Node.js streams make managing big chunks of data a breeze. Think of a stream like a conveyor belt in a factory, moving items bit by bit.
In Node.js, streams do something similar with data. They let you handle things like reading files or sending data over the internet in smaller, more manageable parts, almost like taking small bites out of a huge sandwich instead of trying to eat it all in one mouthful.
This means your Node.js apps run smoother and faster, especially when they're dealing with a lot of information.
The basics of Node.js streams
There are four main types of streams, and understanding them is key to mastering how data flows in your applications.
Readable Streams: Think of these like a book you're reading. Just as you read words from a page, readable streams let your app read data from a source, like a file or a web request. Whenever you're pulling in data, you're likely using a readable stream.
Writable Streams: Now, imagine you're writing in a journal. Writable streams are like that – they let your app write data to a destination. Whether you're saving data to a file or sending it to another server, you're using a writable stream.
Duplex Streams: These are the multitaskers. Duplex streams are like walkie-talkies; you can both speak into them and listen to them. In Node.js, duplex streams let you both read and write data through the same stream. They’re handy when you need to do both, like when you're chatting in an app.
Transform Streams: Transform streams take in data, change it into something else, and then output the new form. It’s like putting raw ingredients into a blender and getting a smoothie out. They’re great for things like compressing files, converting data formats or really any data processing between reading and writing data.
In my experience, Duplex Streams are the ones you'll reach for least often. Most of the time you'll end up building a pipeline: a Readable Stream, passing through one or more Transform Streams, and ending with a Writable Stream.
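For instance, here's a minimal sketch of that typical shape, using the built-in zlib module to gzip a file (the file names are just placeholders):
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { pipeline } from 'stream/promises';
// Readable -> Transform -> Writable: read the file, compress each chunk
// as it flows through, and write the compressed result to disk
await pipeline(
  createReadStream('input.txt'),     // Readable: the data source
  createGzip(),                      // Transform: gzip compression
  createWriteStream('input.txt.gz')  // Writable: the destination
);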
How streams work in Node.js
Let's dive into how Node.js streams really work, without getting too technical. Imagine you have a water tank and a cup. You want to fill the cup, but you can't just dump the whole tank's water into it at once. That's where the concept of buffering and flow control in streams comes into play.
Buffering: In Node.js streams, buffering is like having a small container that temporarily holds a bit of water from the tank before it goes into the cup. This container makes sure the cup gets just the right amount of water at the right time, without overflowing or running dry. Similarly, buffering in streams ensures that data is fed into the system at a pace it can handle.
Flow Control: This is all about managing the speed and volume of the water (or data) flow. If the water flows too fast, the cup might overflow. Too slow, and it takes forever to fill up. Node.js streams automatically adjust this flow, ensuring data moves smoothly and efficiently from source to destination, just like controlling the valve of the water tank to fill the cup at an optimal rate.
// Importing the necessary modules from the 'fs' package
import { createReadStream, createWriteStream } from 'fs';
// File paths
const sourceFilePath = './source.txt';
const destinationFilePath = './destination.txt';
// Creating readable and writable streams
const readStream = createReadStream(sourceFilePath);
const writeStream = createWriteStream(destinationFilePath);
// Using the pipe method to transfer data from readStream to writeStream
readStream.pipe(writeStream);
// The 'finish' event fires once all data has been flushed to the destination
writeStream.on('finish', () => {
  console.log('File has been copied successfully.');
});
This reads data from source.txt and writes it to destination.txt using Node.js streams. The pipe method takes care of buffering and flow control automatically for us.
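If you prefer promises, the same copy can be written with pipeline from stream/promises, which is awaitable and forwards errors for you. We'll lean on pipeline heavily in the demo project below:
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
// Same copy as above, but awaitable and with error propagation built in
await pipeline(
  createReadStream('./source.txt'),
  createWriteStream('./destination.txt')
);
console.log('File has been copied successfully.');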
Demo project
Let's have a look at a demo project designed as a learning tool for understanding and utilizing Node.js streams. It focuses on two primary applications:
Data Generation: Utilizing streams to generate a large data file efficiently
Report Generation: Demonstrating how to process data and generate a report without consuming excessive memory
Both applications run in a matter of seconds and are very memory efficient. On my machine, the Data Generation app took about 14 seconds to generate 1M lines into a ~280 MB file, and the Report Generation app took under 3 seconds to build an aggregated report from that file.
You can browse the full source code here: https://github.com/phillippelevidad/nodejs-streams
Efficient data generation with streams
This is the meat of the [src/1.generateData.js](https://github.com/phillippelevidad/nodejs-streams/blob/main/src/1.generateData.js) file of the demo project.
The script demonstrates a memory-efficient way to generate a large number of data objects (in this case, orders), serialize them into NDJSON format, and write them to a file.
The use of asynchronous generators, along with the streaming capabilities of Node.js, makes this script highly efficient for handling large-scale data operations.
import fs from "fs";
import { pipeline } from "stream/promises";
import { stringify } from "ndjson";
const NUMBER_OF_ORDERS = 1_000_000;
// Generate orders using a generator function
// The fact that it's a generator function helps performance
// because it doesn't need to keep all orders in memory
async function* generateOrders() {
for (let i = 0; i < NUMBER_OF_ORDERS; i++) {
const order = {
"customer": "Tracey Stark",
"date": "2022-12-27T17:00:47.514Z",
"orderTotal": 2946,
"items": [
{
"product": "Tasty Plastic Chair",
"quantity": 3,
"unitPrice": 982,
"category": "Jewelery"
}
]
};
yield order;
}
}
// Other steps of the processing pipeline
const serialize = stringify();
// config (defined in the full source) holds file paths such as ORDERS_FILE
const output = fs.createWriteStream(config.ORDERS_FILE);
// Create a pipeline that will generate orders, serialize them to NDJSON
// and write them to the output file. This is a very efficient way of
// processing large amounts of data. Each order is processed as soon as
// it's generated, without having to accumulate all orders in memory.
await pipeline(generateOrders, serialize, output);
The generateOrders generator function
This is an async generator function that produces a sequence of order objects. I used a hard-coded order object for simplicity, but the original source code generates random orders.
The use of a generator function is key for performance, as it avoids keeping all generated orders in memory at once: in each iteration it builds a single order object and yields it.
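Because it's an async generator, you can consume it lazily with for await...of. Here's a tiny standalone sketch (not part of the demo project) that pulls just the first few orders:
// Minimal sketch: consume the generator lazily, one order at a time
let count = 0;
for await (const order of generateOrders()) {
  console.log(order.customer, order.orderTotal);
  if (++count === 3) break; // only three orders are ever created
}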
Other components of the pipeline
serialize: A transform stream created by stringify() from the ndjson module. It takes JavaScript objects and converts them into NDJSON format.
output: A writable stream created by fs.createWriteStream, intended to write data to a file specified by config.ORDERS_FILE.
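For context, NDJSON just means newline-delimited JSON: one JSON object per line. With the hard-coded order above, every line of the output file looks like this:
{"customer":"Tracey Stark","date":"2022-12-27T17:00:47.514Z","orderTotal":2946,"items":[{"product":"Tasty Plastic Chair","quantity":3,"unitPrice":982,"category":"Jewelery"}]}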
Creating and executing the pipeline
The pipeline function is used to create a processing pipeline. It connects the output of generateOrders (which produces orders one by one) to the serialize stream (which converts these orders to NDJSON format) and then to the output stream (which writes the serialized data to a file).
The use of await makes it wait for the entire pipeline to finish processing all orders.
This approach is memory-efficient because each order is processed and serialized as soon as it's generated, without accumulating all orders in memory.
Streamlining report generation with Node.js streams
Streams aren't just great for creating data; they're also perfect for reading and processing it. This comes in super handy for tasks like generating reports from large files.
Instead of loading our huge file into memory with its 1M lines of JSON, we use Node.js streams to load and process the file in smaller sections.
Let's inspect a streamlined version of the [2.generateReport.js](https://github.com/phillippelevidad/nodejs-streams/blob/main/src/2.generateReport.js) file.
import fs from "fs";
import { pipeline } from "stream/promises";
import split from "split2";
const report = {
grandTotal: 0,
customers: {},
months: {},
categories: {},
};
// First step of the pipeline
// Read the orders file in chunks
const input = fs.createReadStream(config.ORDERS_FILE);
// Second step of the pipeline
// Split the orders into lines and parse each line as JSON
const splitAndParse = split(JSON.parse);
// Third step of the pipeline
// Aggregate the data
// Accumulate the report data in memory
async function* aggregate(stream) {
for await (const data of stream) {
report.grandTotal += data.orderTotal;
report.customers[data.customer] =
(report.customers[data.customer] ?? 0) + data.orderTotal;
report.months[data.date.slice(0, 7)] =
(report.months[data.date.slice(0, 7)] ?? 0) + data.orderTotal;
for (const item of data.items) {
report.categories[item.category] =
(report.categories[item.category] ?? 0) +
item.quantity * item.unitPrice;
}
}
yield JSON.stringify(report, null, 2);
}
// Fourth step of the pipeline
// Write the report to a file
const output = fs.createWriteStream(config.REPORT_FILE);
// Create a pipeline that will read the orders file, split it into lines,
// parse each line as JSON, aggregate the data and write the report to a file.
// This is a very efficient way of processing large amounts of data. Each order
// is processed as soon as it's read from the file, without having to
// accumulate all orders in memory.
await pipeline(input, splitAndParse, aggregate, output);
Reading the orders.ndjson data file
input: A readable stream created by fs.createReadStream to read the orders file specified by config.ORDERS_FILE.
Splitting and parsing the data
splitAndParse: A transform stream created using split that splits the data from the input stream into lines and parses each line as JSON.
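If you haven't used split2 before, here's a tiny standalone sketch of what splitAndParse does, feeding it two NDJSON lines from an in-memory string instead of a file:
import { Readable } from "stream";
import split from "split2";
// Two NDJSON lines arriving as a single chunk, like they would from the file
const chunk = '{"orderTotal":10}\n{"orderTotal":20}\n';
// split(JSON.parse) splits on newlines and parses each line into an object
for await (const order of Readable.from([chunk]).pipe(split(JSON.parse))) {
  console.log(order.orderTotal); // 10, then 20
}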
Aggregating the data for the report
aggregate: An async generator function that processes each piece of data (order) from the stream.
Inside this function, we update the report object by aggregating data like the grand total of orders, totals per customer, per month, and per category. After processing all data, it yields the final report as a JSON string.
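For reference, the report written at the end has this general shape (the numbers below are made up; the actual keys and totals depend on the generated data):
{
  "grandTotal": 123456,
  "customers": { "Tracey Stark": 2946 },
  "months": { "2022-12": 2946 },
  "categories": { "Jewelery": 2946 }
}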
Writing the report to a file
output: A writable stream created by fs.createWriteStream for writing the aggregated report to a file specified by config.REPORT_FILE.
Executing the pipeline
The pipeline function is used to chain these streams together: reading from the input, processing through splitAndParse and aggregate, and finally writing to output.
We await the completion of the entire pipeline. This approach processes each order as it is read from the file, without needing to load all orders into memory at once.
On my machine, this was able to load and process 1M orders in under 3 seconds without any memory spikes. Cool, right?
Advanced stream techniques
Error handling in Node.js streams
Of course, errors can occur for various reasons, like file read/write issues or network problems. Robust error handling ensures that these issues don't bring your entire application to a halt.
Here are a few strategies to effectively manage errors:
Listening for Errors: Always listen for the 'error' event on your streams. This is the most fundamental step in stream error handling. When an error occurs, this event is emitted, allowing you to handle it gracefully, like logging the error or cleaning up resources.
import { createReadStream } from 'fs';
const stream = createReadStream('path/to/file.txt');
stream.on('error', (error) => {
console.error('An error occurred:', error);
// Additional error handling logic here
});
Try-Catch with Async Functions: If you’re using async functions or promises with streams, wrap your code in try-catch blocks to handle any errors that might be thrown.
import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
async function processStream() {
try {
const stream = createReadStream('path/to/file.txt');
await pipeline(stream, process.stdout);
} catch (error) {
console.error('Error processing stream:', error);
}
}
processStream();
Cleaning Up Resources: In the event of an error, it's important to properly close any open streams or files to prevent memory leaks and other issues. This might involve destroying the stream or closing file descriptors.
import { createReadStream, createWriteStream } from 'fs';
const readStream = createReadStream('source.txt');
const writeStream = createWriteStream('destination.txt');
readStream.on('error', (error) => {
console.error('Read stream error:', error);
readStream.destroy();
writeStream.end();
});
writeStream.on('error', (error) => {
console.error('Write stream error:', error);
writeStream.destroy();
readStream.destroy();
});
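Alternatively, when you wire streams together with pipeline, as the demo project does, failed streams are destroyed for you and the error surfaces in a single place:
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
try {
  await pipeline(
    createReadStream('source.txt'),
    createWriteStream('destination.txt')
  );
} catch (error) {
  // pipeline destroys both streams on failure, so no manual cleanup is needed
  console.error('Pipeline failed:', error);
}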
Performance optimization tips
Choosing the Right Stream Type: Understanding the different types of streams (readable, writable, duplex, and transform) and choosing the appropriate one for your task is essential. Using the wrong type can lead to inefficiencies.
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
// Readable stream example
const readStream = createReadStream('input.txt');
// Writable stream example
const writeStream = createWriteStream('output.txt');
// Duplex stream example (net.Socket is a duplex stream)
// This one is best for things like a Chat application.
import net from 'net';
const duplexStream = new net.Socket();
// Transform stream example
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
Buffer Management: Properly managing the buffer size can have a significant impact on performance. While Node.js handles this automatically to a large extent, in some cases, adjusting the buffer size can lead to better performance, especially in high-throughput scenarios.
import { createReadStream } from 'fs';
// Adjusting the buffer size
const bufferSize = 1024 * 1024; // 1 MB
const readStream = createReadStream('large-file.txt', { highWaterMark: bufferSize });
readStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
Backpressure Management: Backpressure occurs when a readable stream produces data faster than the writable stream can handle. You can manage it by pausing the readable stream until the writable stream catches up.
import { createReadStream, createWriteStream } from 'fs';
const readStream = createReadStream('large-file.txt');
const writeStream = createWriteStream('output.txt');
readStream.on('data', (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
console.log('Pausing read stream due to backpressure');
readStream.pause();
writeStream.once('drain', () => {
console.log('Resuming read stream');
readStream.resume();
});
}
});
Using Transform Streams for Data Manipulation: If you need to modify data as it passes through your stream, using transform streams is usually more efficient than manually manipulating the data and then writing it to a stream.
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
const transformStream = new Transform({
transform(chunk, encoding, callback) {
const transformedChunk = chunk.toString().toUpperCase();
callback(null, transformedChunk);
}
});
const readStream = createReadStream('input.txt');
const writeStream = createWriteStream('output.txt');
readStream.pipe(transformStream).pipe(writeStream);
Avoid Synchronous Operations: In stream callbacks or handlers, avoid synchronous operations or heavy computation, as they can block the event loop and reduce the overall throughput of your streams.
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
const transformStream = new Transform({
transform(chunk, encoding, callback) {
// Turn a synchronous operation into an asynchronous one
// setImmediate defers the operation to the end of the current
// event loop cycle
setImmediate(() => {
// toString().toUpperCase() is just a silly example of a sync
// operation that wouldn't call for this type of optimization,
// but imagine that it would be a heavy sync operation here
const transformedChunk = chunk.toString().toUpperCase();
callback(null, transformedChunk);
});
}
});
const readStream = createReadStream('input.txt');
const writeStream = createWriteStream('output.txt');
readStream.pipe(transformStream).pipe(writeStream);
Further reading and resources
We really just scratched the surface here. If you need more, there are tons of great resources out there to help you. Here are some of my favorites:
Node.js Official Documentation: It's always good to start with the basics, and the Node.js official documentation is the perfect place. It's clear, thorough, and straight from the source.
"Streams Handbook" by Substack: This online guide is super focused and easy to follow. Read it on GitHub.
"Node.js Streams: Everything You Need to Know" by Samer Buna: This article on FreeCodeCamp’s blog breaks down streams in a really approachable way. Give it a read on FreeCodeCamp.
Also, the "Node.js Design Patterns" by Mario Casciaro book is on my reading list. Check it out on Amazon (this is not an affiliate link).
Ok, that's it, then! Grab a cup of coffee, get comfy, and start refactoring some code using Node.js streams! 🌟👩💻👨💻🌟