Node.js Streams for Efficient Data Processing At Scale

Node.js streams make managing big chunks of data a breeze. Think of a stream like a conveyor belt in a factory, moving items bit by bit.

In Node.js, streams do something similar with data. They let you handle things like reading files or sending data over the internet in smaller, more manageable parts, almost like taking small bites out of a huge sandwich instead of trying to eat it all in one mouthful.

This means your Node.js apps run smoother and faster, especially when they're dealing with a lot of information.

The basics of Node.js streams

There are four main types of streams, and understanding them is key to mastering how data flows in your applications.

  1. Readable Streams: Think of these like a book you're reading. Just as you read words from a page, readable streams let your app read data from a source, like a file or a web request. Whenever you're pulling in data, you're likely using a readable stream.

  2. Writable Streams: Now, imagine you're writing in a journal. Writable streams are like that – they let your app write data to a destination. Whether you're saving data to a file or sending it to another server, you're using a writable stream.

  3. Duplex Streams: These are the multitaskers. Duplex streams are like walkie-talkies; you can both speak into them and listen to them. In Node.js, duplex streams let you both read and write data through the same stream. They’re handy when you need to do both, like when you're chatting in an app.

  4. Transform Streams: Transform streams take in data, change it into something else, and then output the new form. It’s like putting raw ingredients into a blender and getting a smoothie out. They’re great for things like compressing files, converting data formats or really any data processing between reading and writing data.

In my experience, Duplex Streams are the ones you'll build least often. Most of the time you'll end up with a pipeline that starts with a Readable Stream, goes through one or more Transform Streams, and ends with a Writable Stream.
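
As a rough sketch of that shape, the snippet below wires an in-memory readable source through two transform steps into a writable sink. The names trim, upperCase, and collected are made up for this illustration, and the source is a plain array standing in for a file or a request.

```javascript
import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Readable: an in-memory source standing in for a file or a web request
const source = Readable.from(['  apple ', ' banana', 'cherry  ']);

// Transform #1: strip surrounding whitespace from each chunk
const trim = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().trim());
  },
});

// Transform #2: upper-case each chunk
const upperCase = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Writable: collects whatever reaches the end of the pipeline
const collected = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    collected.push(chunk);
    callback();
  },
});

await pipeline(source, trim, upperCase, sink);
console.log(collected);
```

Each chunk travels through every step before the next one is pulled from the source, so nothing has to be held in memory beyond the chunk in flight.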

How streams work in Node.js

Let's dive into how Node.js streams really work, without getting too technical. Imagine you have a water tank and a cup. You want to fill the cup, but you can't just dump the whole tank's water into it at once. That's where the concept of buffering and flow control in streams comes into play.

  1. Buffering: In Node.js streams, buffering is like having a small container that temporarily holds a bit of water from the tank before it goes into the cup. This container makes sure the cup gets just the right amount of water at the right time, without overflowing or running dry. Similarly, buffering in streams ensures that data is fed into the system at a pace it can handle.

  2. Flow Control: This is all about managing the speed and volume of the water (or data) flow. If the water flows too fast, the cup might overflow. Too slow, and it takes forever to fill up. When streams are connected with pipe or pipeline, Node.js adjusts this flow automatically: the source is paused while the destination's buffer is full and resumed once it has drained, just like controlling the valve of the water tank to fill the cup at an optimal rate.
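
To make the buffer a bit more tangible, here is a small sketch with a deliberately slow writable stream and a tiny buffer limit. The 4-byte highWaterMark and the 10 ms delay are arbitrary values picked for the demo; slowSink is a made-up name.

```javascript
import { Writable } from 'stream';

// A deliberately slow destination with a tiny 4-byte buffer limit
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // pretend every write takes 10 ms
  },
});

// write() returns true while the pending data is below highWaterMark,
// and false once the limit has been reached
const first = slowSink.write('ab');    // 2 bytes pending, under the limit
const second = slowSink.write('cdef'); // 6 bytes pending, over the limit

// How much data is waiting, and what the limit is
const pending = slowSink.writableLength;
const limit = slowSink.writableHighWaterMark;

console.log({ first, second, pending, limit });
slowSink.end();
```

A false return value is the signal that the "cup" is full. Nothing is lost at that point, but a well-behaved producer stops writing until the stream emits 'drain'.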

// Importing the necessary modules from the 'fs' package
import { createReadStream, createWriteStream } from 'fs';

// File paths
const sourceFilePath = './source.txt';
const destinationFilePath = './destination.txt';

// Creating readable and writable streams
const readStream = createReadStream(sourceFilePath);
const writeStream = createWriteStream(destinationFilePath);

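// Using the pipe method to transfer data from readStream to writeStream
readStream.pipe(writeStream);

// pipe() returns immediately, so wait for 'finish' before reporting success.
// 'finish' fires once all data has been flushed to the destination
writeStream.on('finish', () => {
  console.log('File has been copied successfully.');
});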

This reads data from source.txt and writes it to destination.txt using Node.js streams. The pipe method takes care of buffering and flow control automatically for us.
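
One caveat with pipe is that it returns before the copy is done and does not forward errors from one stream to the other. A possible alternative, sketched below, is the promise-based pipeline function, which resolves when the copy has finished and rejects if either side fails. The copyWithPipeline helper and the temporary file names are assumptions made for this example.

```javascript
import { createReadStream, createWriteStream } from 'fs';
import { mkdtemp, readFile, writeFile } from 'fs/promises';
import { tmpdir } from 'os';
import { join } from 'path';
import { pipeline } from 'stream/promises';

// Copies a file and resolves only after all data has been flushed.
// Rejects if reading or writing fails.
async function copyWithPipeline(sourcePath, destinationPath) {
  await pipeline(
    createReadStream(sourcePath),
    createWriteStream(destinationPath)
  );
}

// Usage with throwaway files in a temporary directory
const dir = await mkdtemp(join(tmpdir(), 'stream-copy-'));
const sourceFilePath = join(dir, 'source.txt');
const destinationFilePath = join(dir, 'destination.txt');
await writeFile(sourceFilePath, 'hello streams\n');

try {
  await copyWithPipeline(sourceFilePath, destinationFilePath);
  console.log('File has been copied successfully.');
} catch (error) {
  console.error('Copy failed:', error);
}

const copied = await readFile(destinationFilePath, 'utf8');
```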

Demo project

Let's have a look at a demo project designed as a learning tool for understanding and utilizing Node.js streams. It focuses on two primary applications:

  1. Data Generation: Utilizing streams to generate a large data file efficiently

  2. Report Generation: Demonstrating how to process data and generate a report without consuming excessive memory

Both applications run in a matter of seconds and are very memory efficient. On my machine, the Data Generation app took about 14 seconds to generate 1M lines into a ~280 MB file, and the Report Generation app took under 3 seconds to generate an aggregated report from the generated file.

You can browse the full source code here: https://github.com/phillippelevidad/nodejs-streams

Efficient data generation with streams

This is the meat of the [src/1.generateData.js](https://github.com/phillippelevidad/nodejs-streams/blob/main/src/1.generateData.js) file of the demo project.

The script demonstrates a memory-efficient way to generate a large number of data objects (in this case, orders), serialize them into NDJSON format, and write them to a file.

The use of asynchronous generators, along with the streaming capabilities of Node.js, makes this script highly efficient for handling large-scale data operations.

import fs from "fs";
import { pipeline } from "stream/promises";
import { stringify } from "ndjson";

const NUMBER_OF_ORDERS = 1_000_000;

// Generate orders using a generator function
// The fact that it's a generator function helps performance
// because it doesn't need to keep all orders in memory
async function* generateOrders() {
  for (let i = 0; i < NUMBER_OF_ORDERS; i++) {
    const order = {
      "customer": "Tracey Stark",
      "date": "2022-12-27T17:00:47.514Z",
      "orderTotal": 2946,
      "items": [
        {
          "product": "Tasty Plastic Chair",
          "quantity": 3,
          "unitPrice": 982,
          "category": "Jewelery"
        }
      ]
    };
    yield order;
  }
}

// Other steps of the processing pipeline
// (`config` is not defined in this excerpt; in the demo project it holds the file paths)
const serialize = stringify();
const output = fs.createWriteStream(config.ORDERS_FILE);

// Create a pipeline that will generate orders, serialize them to NDJSON
// and write them to the output file. This is a very efficient way of
// processing large amounts of data. Each order is processed as soon as
// it's generated, without having to accumulate all orders in memory.
await pipeline(generateOrders, serialize, output);

The generateOrders Generator Function

  • This is an async generator function that produces a sequence of order objects. I used a hard-coded order object for simplicity, but the original source code generates random orders.

  • The use of a generator function is key for performance, as it avoids the need to keep all generated orders in memory at once. Instead, it yields one order at a time.

  • In each iteration, it generates an order object and yields it.

Other components of the pipeline

  • serialize: A transform stream created by stringify() from the ndjson module. It takes JavaScript objects and converts them into NDJSON format.

  • output: A writable stream created by fs.createWriteStream, intended to write data to a file specified by config.ORDERS_FILE.

Creating and executing the pipeline

  • The pipeline function is used to create a processing pipeline. It connects the output of generateOrders (which produces orders one by one) to the serialize stream (which converts these orders to NDJSON format) and then to the output stream (which writes the serialized data to a file).

  • The use of await makes it wait for the entire pipeline to finish processing all orders.

  • This approach is memory-efficient because each order is processed and serialized as soon as it's generated, without accumulating all orders in memory.
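
If you want to try the same idea without installing anything, the sketch below swaps the ndjson package for a hand-written serialization step and the output file for an in-memory sink. This is not how the ndjson package is implemented; toNdjson, the id field, and the small order count are assumptions made to keep the example self-contained.

```javascript
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

const NUMBER_OF_ORDERS = 5;

// Source: yields one order object at a time (the `id` field is made up)
async function* generateOrders() {
  for (let i = 0; i < NUMBER_OF_ORDERS; i++) {
    yield { id: i + 1, customer: 'Tracey Stark', orderTotal: 2946 };
  }
}

// Hand-written stand-in for the serializer: one JSON document per line
async function* toNdjson(orders) {
  for await (const order of orders) {
    yield JSON.stringify(order) + '\n';
  }
}

// Sink: keeps the output in memory instead of writing to a file
let written = '';
const output = new Writable({
  write(chunk, encoding, callback) {
    written += chunk.toString();
    callback();
  },
});

await pipeline(generateOrders, toNdjson, output);
console.log(written);
```

The pipeline function accepts generator functions alongside regular streams, which is why the source and the serialization step can both be written as plain async generators.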

Streamlining report generation with Node.js streams

Streams aren't just great for creating data; they're also perfect for reading and processing it. This comes in super handy for tasks like generating reports from large files.

Instead of loading our huge file into memory with its 1M lines of JSON, we use Node.js streams to load and process the file in smaller sections.

Let's inspect a streamlined version of the [2.generateReport.js](https://github.com/phillippelevidad/nodejs-streams/blob/main/src/2.generateReport.js) file.

import fs from "fs";
import { pipeline } from "stream/promises";
import split from "split2";

const report = {
  grandTotal: 0,
  customers: {},
  months: {},
  categories: {},
};

// First step of the pipeline
// Read the orders file in chunks
const input = fs.createReadStream(config.ORDERS_FILE);

// Second step of the pipeline
// Split the orders into lines and parse each line as JSON
const splitAndParse = split(JSON.parse);

// Third step of the pipeline
// Aggregate the data
// Accumulate the report data in memory
async function* aggregate(stream) {
  for await (const data of stream) {
    report.grandTotal += data.orderTotal;

    report.customers[data.customer] =
      (report.customers[data.customer] ?? 0) + data.orderTotal;

    report.months[data.date.slice(0, 7)] =
      (report.months[data.date.slice(0, 7)] ?? 0) + data.orderTotal;

    for (const item of data.items) {
      report.categories[item.category] =
        (report.categories[item.category] ?? 0) +
        item.quantity * item.unitPrice;
    }
  }

  yield JSON.stringify(report, null, 2);
}

// Fourth step of the pipeline
// Write the report to a file
const output = fs.createWriteStream(config.REPORT_FILE);

// Create a pipeline that will read the orders file, split it into lines,
// parse each line as JSON, aggregate the data and write the report to a file.
// This is a very efficient way of processing large amounts of data. Each order
// is processed as soon as it's read from the file, without having to
// accumulate all orders in memory.
await pipeline(input, splitAndParse, aggregate, output);

Reading the orders.ndjson data file

  • input: A readable stream created by fs.createReadStream to read the orders file specified by config.ORDERS_FILE.

Splitting and parsing the data

  • splitAndParse: A transform stream created with split (from the split2 package) that splits the data from the input stream into lines and parses each line as JSON.

Aggregating the data for the report

  • aggregate: An async generator function that processes each piece of data (order) from the stream.

  • Inside this function, we update the report object by aggregating data like the grand total of orders, totals per customer, per month, and per category.

  • After processing all data, it yields the final report as a JSON string.

Writing the report to a file

  • output: A writable stream created by fs.createWriteStream for writing the aggregated report to a file specified by config.REPORT_FILE.

Executing the pipeline

  • The pipeline function is used to chain these streams together: reading from the input, processing through splitAndParse and aggregate, and finally writing to output.

  • We await the entire pipeline to complete. This approach processes each order as it is read from the file, without needing to load all orders into memory at once.

On my machine, this was able to load and process 1M orders in under 3 seconds without any memory spikes. Cool, right?
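
For comparison, here is a minimal sketch of the same read, split, parse, and aggregate idea using only built-in modules. It swaps split2 for the readline module and reads from an in-memory stream instead of the orders file, so treat it as an alternative under those assumptions rather than the demo project's approach. The aggregateOrders function and the sample orders are invented for this example.

```javascript
import { createInterface } from 'readline';
import { Readable } from 'stream';

// Aggregates NDJSON orders from any readable stream of bytes
async function aggregateOrders(input) {
  const report = { grandTotal: 0, customers: {} };

  // readline reassembles lines even when a line is split across chunks
  const lines = createInterface({ input, crlfDelay: Infinity });

  for await (const line of lines) {
    if (line.trim() === '') continue; // skip blank lines
    const order = JSON.parse(line);
    report.grandTotal += order.orderTotal;
    report.customers[order.customer] =
      (report.customers[order.customer] ?? 0) + order.orderTotal;
  }

  return report;
}

// Usage: two chunks that cut the second order in half
const input = Readable.from([
  Buffer.from('{"customer":"Ann","orderTotal":10}\n{"customer":"Bob",'),
  Buffer.from('"orderTotal":5}\n{"customer":"Ann","orderTotal":7}\n'),
]);

const report = await aggregateOrders(input);
console.log(report);
```

Only the running totals live in memory; each line is parsed, added to the report, and then discarded.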

Advanced stream techniques

Error handling in Node.js streams

Of course, errors can occur for various reasons, like file read/write issues or network problems. Robust error handling ensures that these issues don't bring your entire application to a halt.

Here are a few strategies to effectively manage errors:

Listening for Errors: Always listen for the 'error' event on your streams. This is the most fundamental step in stream error handling. When an error occurs, this event is emitted, allowing you to handle it gracefully, like logging the error or cleaning up resources.

import { createReadStream } from 'fs';
const stream = createReadStream('path/to/file.txt');
stream.on('error', (error) => {
    console.error('An error occurred:', error);
    // Additional error handling logic here
});

Try-Catch with Async Functions: If you’re using async functions or promises with streams, wrap your code in try-catch blocks to handle any errors that might be thrown.

import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
async function processStream() {
    try {
        const stream = createReadStream('path/to/file.txt');
        await pipeline(stream, process.stdout);
    } catch (error) {
        console.error('Error processing stream:', error);
    }
}
processStream();

Cleaning Up Resources: In the event of an error, it's important to properly close any open streams or files to prevent memory leaks and other issues. This might involve destroying the stream or closing file descriptors.

import { createReadStream, createWriteStream } from 'fs';
const readStream = createReadStream('source.txt');
const writeStream = createWriteStream('destination.txt');
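readStream.on('error', (error) => {
    console.error('Read stream error:', error);
    readStream.destroy();
    // Close the destination after flushing what was already written
    writeStream.end();
});
writeStream.on('error', (error) => {
    console.error('Write stream error:', error);
    writeStream.destroy();
    // Stop reading: nothing is left to consume the data
    readStream.destroy();
});
readStream.pipe(writeStream);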
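
If the streams are connected with pipeline, this cleanup comes for free: when any step fails, pipeline destroys every stream in the chain. The sketch below shows that behavior with in-memory streams; failingStep and the 'boom' error are invented to force a failure.

```javascript
import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';

const source = Readable.from(['a', 'b', 'c']);

// A transform step that fails on the first chunk it receives
const failingStep = new Transform({
  transform(chunk, encoding, callback) {
    callback(new Error('boom'));
  },
});

const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

let caught = null;
try {
  await pipeline(source, failingStep, sink);
} catch (error) {
  caught = error;
}

// Every stream in the chain has been destroyed, not just the one that failed
console.log(caught.message, source.destroyed, failingStep.destroyed, sink.destroyed);
```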

Performance optimization tips

Choosing the Right Stream Type: Understanding the different types of streams (readable, writable, duplex, and transform) and choosing the appropriate one for your task is essential. Using the wrong type can lead to inefficiencies.

import { createReadStream, createWriteStream } from 'fs';
import net from 'net';
import { Transform } from 'stream';

// Readable stream example
const readStream = createReadStream('input.txt');

// Writable stream example
const writeStream = createWriteStream('output.txt');

// Duplex stream example (net.Socket is a duplex stream)
// This one is best for things like a chat application.
const duplexStream = new net.Socket();

// Transform stream example
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

Buffer Management: Properly managing the buffer size can have a significant impact on performance. While Node.js handles this automatically to a large extent, in some cases, adjusting the buffer size can lead to better performance, especially in high-throughput scenarios.

import { createReadStream } from 'fs';

// Adjusting the buffer size
const bufferSize = 1024 * 1024; // 1 MB
const readStream = createReadStream('large-file.txt', { highWaterMark: bufferSize });

readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
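
To see what highWaterMark actually changes, the following sketch reads the same file twice with different values and counts the chunks. The countChunks helper, the 256 KiB sample file, and the two sizes are arbitrary choices for the demo; whether a larger value helps in practice depends on your workload, so measure before tuning.

```javascript
import { createReadStream } from 'fs';
import { mkdtemp, writeFile } from 'fs/promises';
import { tmpdir } from 'os';
import { join } from 'path';

// Reads a file and reports how many chunks it arrived in
async function countChunks(path, highWaterMark) {
  let chunks = 0;
  let bytes = 0;
  for await (const chunk of createReadStream(path, { highWaterMark })) {
    chunks += 1;
    bytes += chunk.length;
  }
  return { chunks, bytes };
}

// Create a throwaway 256 KiB file filled with zeros
const dir = await mkdtemp(join(tmpdir(), 'stream-hwm-'));
const filePath = join(dir, 'sample.bin');
await writeFile(filePath, Buffer.alloc(256 * 1024));

const small = await countChunks(filePath, 16 * 1024);
const large = await countChunks(filePath, 256 * 1024);

console.log({ small, large });
```

A smaller highWaterMark means more, smaller reads and less memory per chunk; a larger one means fewer reads at the cost of bigger buffers.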

Backpressure Management: Backpressure occurs when a readable stream produces data faster than the writable stream can handle. You can manage it by pausing the readable stream until the writable stream catches up.

import { createReadStream, createWriteStream } from 'fs';

const readStream = createReadStream('large-file.txt');
const writeStream = createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  const canContinue = writeStream.write(chunk);
  if (!canContinue) {
    console.log('Pausing read stream due to backpressure');
    readStream.pause();

    writeStream.once('drain', () => {
      console.log('Resuming read stream');
      readStream.resume();
    });
  }
});
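
The same idea can be written without manual pause and resume calls by iterating over the readable stream and awaiting the 'drain' event whenever write returns false. This is a sketch with in-memory streams; writeAll and slowDestination are hypothetical names, and the 2-byte highWaterMark is chosen only to force backpressure on every write.

```javascript
import { once } from 'events';
import { Readable, Writable } from 'stream';
import { finished } from 'stream/promises';

// Writes every chunk from source to destination, waiting for 'drain'
// whenever the destination's buffer is full
async function writeAll(source, destination) {
  for await (const chunk of source) {
    if (!destination.write(chunk)) {
      await once(destination, 'drain');
    }
  }
  destination.end();
  await finished(destination);
}

// Usage: a slow destination with a tiny buffer
const received = [];
const slowDestination = new Writable({
  highWaterMark: 2,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 5); // pretend every write takes 5 ms
  },
});

await writeAll(Readable.from(['one', 'two', 'three']), slowDestination);
console.log(received);
```

Because the loop only pulls the next chunk after the previous write has been accepted, the source never runs ahead of the destination. In most cases pipe or pipeline does this for you; the manual version is mainly useful when you need custom logic between reading and writing.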

Using Transform Streams for Data Manipulation: If you need to modify data as it passes through your stream, using transform streams is usually more efficient than manually manipulating the data and then writing it to a stream.

import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    const transformedChunk = chunk.toString().toUpperCase();
    callback(null, transformedChunk);
  }
});

const readStream = createReadStream('input.txt');
const writeStream = createWriteStream('output.txt');

readStream.pipe(transformStream).pipe(writeStream);

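Avoid Synchronous Operations: In stream callbacks or handlers, avoid synchronous operations or heavy computation, as they block the event loop and reduce the overall throughput of your streams. Deferring work with setImmediate lets pending I/O run between chunks, but it doesn't make the work itself non-blocking; genuinely CPU-heavy work is better served by an asynchronous API or a worker thread.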

import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    // setImmediate defers the work until pending I/O callbacks have
    // run, so other events get a turn between chunks. The work itself
    // still runs synchronously once it starts
    setImmediate(() => {
      // toString().toUpperCase() is just a silly example of a sync
      // operation that wouldn't call for this type of optimization,
      // but imagine that it would be a heavy sync operation here
      const transformedChunk = chunk.toString().toUpperCase();
      callback(null, transformedChunk);
    });
  }
});

const readStream = createReadStream('input.txt');
const writeStream = createWriteStream('output.txt');
readStream.pipe(transformStream).pipe(writeStream);
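
When a library offers both a blocking and a non-blocking version of an operation, using the non-blocking one inside the transform is usually the simpler fix. In the sketch below, crypto.pbkdf2 stands in for a heavy per-chunk computation; that choice, the deriveKey name, the salt, and the iteration count are assumptions for illustration. Node.js runs pbkdf2 on its thread pool, so the event loop stays free while the key is derived.

```javascript
import { pbkdf2, pbkdf2Sync } from 'crypto';
import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { promisify } from 'util';

const pbkdf2Async = promisify(pbkdf2);

// Stand-in for a heavy step: derive a key from every chunk
// using the asynchronous API instead of pbkdf2Sync
const deriveKey = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    pbkdf2Async(chunk, 'demo-salt', 1000, 16, 'sha256').then(
      (key) => callback(null, key.toString('hex')),
      (error) => callback(error)
    );
  },
});

// Sink: collects the derived keys in memory
const keys = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    keys.push(chunk);
    callback();
  },
});

await pipeline(Readable.from(['alpha', 'beta']), deriveKey, sink);

// The blocking variant gives the same result, but would stall
// the event loop for the duration of every call
const blockingResult = pbkdf2Sync('alpha', 'demo-salt', 1000, 16, 'sha256').toString('hex');
console.log(keys[0] === blockingResult);
```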

Further reading and resources

We really just scratched the surface here. If you need more, there are tons of great resources out there to help you. Here are some of my favorites:

  1. Node.js Official Documentation: It's always good to start with the basics, and the Node.js official documentation is the perfect place. It's clear, thorough, and straight from the source.

  2. "Streams Handbook" by Substack: This online guide is super focused and easy to follow. Read it on GitHub.

  3. "Node.js Streams: Everything You Need to Know" by Samer Buna: This article on FreeCodeCamp’s blog breaks down streams in a really approachable way. Give it a read on FreeCodeCamp.

Also, the book "Node.js Design Patterns" by Mario Casciaro is on my reading list. Check it out on Amazon (this is not an affiliate link).

Ok, that's it, then! Grab a cup of coffee, get comfy, and start refactoring some code using Node.js streams! 🌟👩‍💻👨‍💻🌟