Streams are a way of handling input and output, and much like Unix pipes, the output of one command can be the input of another. Streams deal with reading data and piping it. They are defined in the Node.js documentation as follows.

A stream is an abstract interface for working with streaming data in Node.js.

The stream module provides a base API that makes it easy to build objects that implement the stream interface. A key to understanding streams is that they are instances of EventEmitter and implement their own methods on top of that.

Stream Types

There are four types of streams:

  • Readable streams, from which data can be read. fs.createReadStream() in the File System module returns a readable stream.
  • Writable streams, to which data can be written. fs.createWriteStream() in the File System module returns a writable stream.
  • Duplex streams, which are both Readable and Writable, such as the net.Socket class in the Net module.
  • Transform streams, which are Duplex streams that can transform the data as it is written and read. For example, zlib.createDeflate() in the Zlib module.

There is also a fifth type of stream known as classic streams. These streams behave according to the old API (introduced in Node.js 0.4), and a stream falls back to that mode whenever a data listener is registered on it (remember, streams are EventEmitters).

The pipe

The pipe method pairs the output of a readable stream (the source) with the input of a writable stream (the destination). Think of it like the | symbol used to feed the output of one Unix command into another. The pipe method has the following syntax.

readableStream.pipe(writableStream);
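For instance, here is a minimal sketch that pipes a file stream straight to the terminal (input.txt is a hypothetical file name used for illustration).

const fs = require('fs');

// stream the contents of input.txt to the terminal
fs.createReadStream('input.txt').pipe(process.stdout);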

As we know, we can chain multiple pipes in Unix like so.

command1 | command2 | command3 | command4

In the same way, we can chain multiple pipe calls.

stream1.pipe(stream2).pipe(stream3).pipe(stream4);

However, stream2 and stream3 in this case must be Duplex or Transform in order for this chain to work.
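As a sketch of such a chain (the file names are hypothetical), we can read a file, compress it through a Transform stream from the Zlib module, and write the result back to disk.

const fs = require('fs');
const zlib = require('zlib');

// Readable -> Transform (Duplex) -> Writable
fs.createReadStream('input.txt')                 // source
    .pipe(zlib.createGzip())                     // transforms (compresses) the data
    .pipe(fs.createWriteStream('input.txt.gz')); // destination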

Readable Streams

As mentioned, a readable stream reads data from a source (another stream, a file, or a memory buffer). Let's define a Readable stream.

const { Readable } = require('stream');

// pass a no-op read() since we push data into the stream manually
const readableStream = new Readable({ read() {} });

readableStream.push('Hello, ');
readableStream.push('World!');
readableStream.push(null);

readableStream.pipe(process.stdout);

// Hello, World!

First, we required Readable from the stream module and created a new instance of it called readableStream (passing a no-op read() implementation, since we push data into it manually). To send data to be consumed, we use the push method which, when fed null, tells the consumer that the Readable stream has finished outputting data. Finally, we use the pipe method to pipe the source to process.stdout.

We mentioned before that streams are basically event emitters. Therefore, we can rewrite the previous piece of code as follows.

const { Readable } = require('stream');

// pass a no-op read() since we push data into the stream manually
const readableStream = new Readable({ read() {} });

let data = '';

readableStream.push('Hello, ');
readableStream.push('World!');
readableStream.push(null);

readableStream.on('data', chunk => {
    data += chunk;
});

readableStream.on('end', () => {
    console.log(data);
});

Notice how we listened for the data event, read the data in chunks, and appended each chunk to a variable called data. Then we listened for the end event, where we simply output the data. This is essentially a simple implementation of what the pipe method does.

Data is buffered in Readable streams when the implementation calls stream.push(chunk). If the consumer of the Stream does not call stream.read(), the data will sit in the internal queue until it is consumed.

Both Writable and Readable streams will store data in an internal buffer that can be retrieved using writable._writableState.getBuffer() or readable._readableState.buffer, respectively. The amount of data potentially buffered depends on the highWaterMark option passed into the stream's constructor.
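For example, here is a sketch of lowering that threshold for a readable stream by passing highWaterMark to its constructor (the 16-byte value is only illustrative).

const { Readable } = require('stream');

// highWaterMark is measured in bytes for non-object-mode streams
const smallBufferStream = new Readable({
    highWaterMark: 16, // buffer at most ~16 bytes before applying backpressure
    read() {}          // no-op, we push data manually
});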

However, it is advisable to avoid buffering data and instead generate it only when the consumer requests it. To do so, we push chunks from within a _read function. Let's see how we do that.

const { Readable } = require('stream');
const readableStream = new Readable();

const n = 100;

readableStream._read = function() {
    for (let i = 0; i < n; i++) {
        readableStream.push('' + i);
    }

    readableStream.push(null);
};

readableStream.pipe(process.stdout);

We pushed '' + i because push expects a string, Buffer, or Uint8Array rather than a number.

Once the total size of the internal read buffer reaches the threshold specified by highWaterMark, the stream will temporarily stop reading data from the underlying resource until the data currently buffered can be consumed (that is, the stream will stop calling the internal readable._read() method that is used to fill the read buffer).
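A sketch of a read implementation that cooperates with this mechanism checks the return value of push(), which becomes false once the internal buffer reaches highWaterMark, and stops pushing until the stream calls _read again (the counter data here is purely illustrative).

const { Readable } = require('stream');

let current = 0;

const counterStream = new Readable({
    read() {
        // keep pushing until the buffer is full or we run out of data
        while (current < 100) {
            const keepGoing = this.push(current + '\n');
            current++;
            if (!keepGoing) return; // buffer full; wait for the next _read call
        }
        this.push(null); // no more data
    }
});

counterStream.pipe(process.stdout);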

Writable Streams

For a Writable stream, we implement the _write method, and then we can pipe a Readable stream to it.

const { Writable } = require('stream');
const writableStream = new Writable();

writableStream._write = function (chunk, encoding, next) {
    console.log(chunk.toString());
    next();
};

readableStream.pipe(writableStream);

The _write function takes three parameters: chunk, encoding, and next. chunk is the next piece of data coming from the Readable stream. encoding is the encoding of the data; by default, the data read from a stream is a Buffer object, and the encoding can be set using readable.setEncoding(), for example readable.setEncoding('utf8'). next is a callback we call when we are ready to receive the next chunk of data from the readable stream.
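To illustrate the encoding part, here is a small sketch where calling setEncoding('utf8') makes the readable stream deliver its chunks as strings instead of Buffer objects.

const { Readable } = require('stream');

const source = new Readable({ read() {} });
source.setEncoding('utf8'); // deliver chunks as utf8 strings instead of Buffers

source.on('data', chunk => {
    console.log(typeof chunk); // 'string' rather than a Buffer object
});

source.push('Hello, World!');
source.push(null);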

Where to Use Streams

A good and practical example of using streams is when dealing with large amounts of data, for example loading a large file from disk or over a network. Where traditionally the whole file would be loaded into memory and served from there to the client, streams serve chunks of that file, keeping memory usage to a minimum.
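A common sketch of this pattern is an HTTP server that pipes a file directly to the response instead of reading it fully into memory first (big-file.txt and port 3000 are hypothetical choices).

const fs = require('fs');
const http = require('http');

http.createServer((req, res) => {
    // the file is streamed to the client chunk by chunk,
    // so it is never loaded into memory all at once
    fs.createReadStream('big-file.txt').pipe(res);
}).listen(3000);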