Easier Node.js streams via async iteration

[2019-11-07] dev, javascript, nodejs, async

Working with Node.js streams is much more pleasant if we use asynchronous iteration. This blog post explores how to do that.

Recap: asynchronous iteration and asynchronous generators  

Asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning the current “task” may be paused before retrieving an item).
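For example, here is the protocol used directly (just as a sketch; normally, we'd use the for-await-of loop shown below instead of calling .next() manually):

const asyncIterable = (async function* () { // an async generator (explained next)
  yield 'a';
  yield 'b';
})();
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
console.log(await asyncIterator.next()); // { value: 'a', done: false }
console.log(await asyncIterator.next()); // { value: 'b', done: false }
console.log(await asyncIterator.next()); // { value: undefined, done: true }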

Asynchronous generators help with async iteration. For example, this is an asynchronous generator function:

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}
  • The for-await-of loop iterates over the input asyncIterable. This loop is also available in normal asynchronous functions.
  • The yield feeds values into the asynchronous iterable that is returned by this generator.

In the remainder of the blog post, pay close attention to whether a function is an async function or an async generator function:

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }
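To see the difference at runtime, we can inspect what calls to these two functions return (a small sketch, reusing the two functions above):

const promise = asyncFunction();
console.log(promise instanceof Promise); // true

const asyncIterable = asyncGeneratorFunction();
console.log(typeof asyncIterable[Symbol.asyncIterator]); // 'function'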

Streams  

A stream is a pattern whose core idea is to “divide and conquer” a large amount of data: We can handle it if we split it into smaller pieces and handle one portion at a time.

Node.js supports several kinds of streams – for example:

  • Readable streams are streams from which we can read data. In other words, they are sources of data. An example is a readable file stream, which lets us read the contents of a file.

  • Writable streams are streams to which we can write data. In other words, they are sinks for data. An example is a writable file stream, which lets us write data to a file.

  • A transform stream is both readable and writable. As a writable stream, it receives pieces of data, transforms (changes or discards) them and then outputs them as a readable stream.

Pipelining  

To process streamed data in multiple steps, we can pipeline (connect) streams:

  1. Input is received via a readable stream.
  2. Each processing step is performed via a transform stream.
  3. For the last processing step, we have two options:
    • We can write the data in the most recent readable stream into a writable stream. That is, the writable stream is the last element of our pipeline.
    • We can process the data in the most recent readable stream in some other manner.

Step 2 is optional.
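As an illustration, here is a sketch of such a pipeline via Node's built-in transform streams. The file paths tmp/input.txt and tmp/output.txt are hypothetical; the function pipeline() is explained later in this post:

import * as fs from 'fs';
import * as stream from 'stream';
import * as util from 'util';

const pipeline = util.promisify(stream.pipeline);

// Step 2: a transform stream that uppercases text
const uppercase = new stream.Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

await pipeline(
  fs.createReadStream('tmp/input.txt', {encoding: 'utf8'}), // step 1
  uppercase, // step 2
  fs.createWriteStream('tmp/output.txt', {encoding: 'utf8'})); // step 3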

Text encodings  

When creating text streams, it is best to always specify an encoding:

  • The Node.js docs have a list of supported encodings and their default spellings – for example:

    • 'utf8'
    • 'utf16le'
    • 'base64'
  • A few alternative spellings are also allowed. We can use Buffer.isEncoding() to check whether a given spelling is supported:

    > buffer.Buffer.isEncoding('utf8')
    true
    > buffer.Buffer.isEncoding('utf-8')
    true
    > buffer.Buffer.isEncoding('UTF-8')
    true
    > buffer.Buffer.isEncoding('UTF:8')
    false
    

The default value for encodings is null. For writable streams, that is equivalent to 'utf8'; for readable streams, it means that chunks are delivered as Buffers, not strings.
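We can observe the effect of the default directly. The following sketch assumes that the file tmp/test.txt (used in later examples) exists:

import * as fs from 'fs';

// Without an encoding, the chunks are Buffers:
for await (const chunk of fs.createReadStream('tmp/test.txt')) {
  console.log(Buffer.isBuffer(chunk)); // true
}

// With an encoding, the chunks are strings:
const readable = fs.createReadStream('tmp/test.txt', {encoding: 'utf8'});
for await (const chunk of readable) {
  console.log(typeof chunk); // 'string'
}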

Helper function: readableToString()  

We will occasionally use the following helper function. You don’t need to understand how it works, only (roughly) what it does.

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

This function is implemented via the event-based API. We’ll later see a simpler way of doing this – via async iteration.

A few preliminary remarks  

  • We’ll only use text streams in this post.

  • In the examples, we’ll occasionally encounter await being used at the top level. In that case, we imagine that we are inside a module or inside the body of an async function.

  • Several examples check results via assert.equal(). We assume that module assert has been imported like this: import * as assert from 'assert';

  • Whenever there are newlines, we support both:

    • Unix: '\n' (LF)
    • Windows: '\r\n' (CR LF)

    The newline characters of the current platform can be accessed via the constant EOL in module os.
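    For example:

    import {EOL} from 'os';
    console.log(JSON.stringify(EOL)); // '"\n"' on Unix, '"\r\n"' on Windows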

Readable streams  

Creating readable streams  

Creating readable streams from files  

We can use fs.createReadStream() to create readable streams:

import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');

Readable.from(): Creating readable streams from iterables  

The static method Readable.from(iterable, options?) creates a readable stream which holds the data contained in iterable. iterable can be a synchronous iterable or an asynchronous iterable. The parameter options is optional and can, among other things, be used to specify a text encoding.

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');

Creating readable streams from strings  

Readable.from() accepts any iterable and can therefore also be used to convert strings to streams:

import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

At the moment, Readable.from() treats a string like any other iterable and therefore iterates over its code points. That isn’t ideal, performance-wise, but should be OK for most use cases. I expect Readable.from() to be used often with strings, so maybe there will be optimizations in the future.

Reading from readable streams via for-await-of  

Every readable stream is asynchronously iterable, which means that we can use a for-await-of loop to read its contents:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'

Collecting the contents of a readable stream in a string  

The following function is a simpler reimplementation of the helper function readableToString() that we saw at the beginning of this blog post.

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

Note that, in this case, we had to use an async function (not an async generator function) because we wanted to return a Promise.

Transforming readable streams via async generators  

Async iteration provides an elegant alternative to transform streams for processing streamed data in multiple steps:

  • The input is a readable stream.
  • The first transformation is performed by an async generator that iterates over the readable stream and yields as it sees fit.
  • Optionally, we can transform further, by using more async generators.
  • At the end, we have several options for handling the async iterable returned by the last generator:
    • We can convert it to a readable stream via Readable.from() (which can later be piped into a writable stream).
    • We can use an async function to process it.
    • Etc.

To summarize, these are the pieces of such processing pipelines:

readable
→ first async generator [→ … → last async generator]
→ readable or async function

In the following example, the final step is performed by the async function logLines() which logs the items in an iterable to the console.

import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'
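If we instead wanted to take the first of the options listed earlier (converting the result to a readable stream and piping it into a writable stream), the last step could look as follows. This is only a sketch: tmp/numbered.txt is a hypothetical path, and pipeline() is explained in the section on writable streams:

import * as fs from 'fs';
import * as stream from 'stream';
import * as util from 'util';

const pipeline = util.promisify(stream.pipeline);

const chunks = stream.Readable.from(
  'Text with\nmultiple\nlines.\n', {encoding: 'utf8'});
await pipeline(
  stream.Readable.from(numberLines(chunksToLines(chunks)), {encoding: 'utf8'}),
  fs.createWriteStream('tmp/numbered.txt', {encoding: 'utf8'}));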

Writable streams  

Creating writable streams for files  

We can use fs.createWriteStream() to create writable streams:

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

Writing to writable streams  

In this section, we look at two approaches to writing to a writable stream:

  1. Writing directly to the writable stream via its method .write().
  2. Using function pipeline() from module stream to pipe a readable stream into the writable stream.

To demonstrate these approaches, we implement the same function writeIterableToFile() in two different ways. A third option, method .pipe() of readable streams, has a caveat that we discuss at the end of this section.

Writing to a writable stream in an async function  

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

The default version of stream.finished() is callback-based but can be turned into a Promise-based version via util.promisify() (line A).

We used the following two patterns:

  • Writing to a writable stream while handling backpressure (line B):

    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
    
  • Closing a writable stream and waiting until writing is done (line C):

    writable.end();
    await finished(writable);
    

pipeline(readable, writable)  

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

We used the following pattern (line A):

await pipeline(readable, writable);

There is also Readable.prototype.pipe(), but that method has a caveat (if the readable emits an error, then the writable is not closed automatically). stream.pipeline() does not have that caveat.
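If we do use .pipe(), we therefore have to clean up manually when the readable fails. The following sketch shows one way of doing that (readable and writable stand for arbitrary streams):

readable.on('error', (err) => {
  writable.destroy(err); // .pipe() would leave the writable open
});
readable.pipe(writable);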

Quick reference: related APIs  

Module os:

  • const EOL: string (since 0.7.8)

    Contains the end-of-line character sequence used by the current platform.

Module buffer:

  • Buffer.isEncoding(encoding: string): boolean (since 0.9.1)

    Returns true if encoding correctly names one of the supported Node.js encodings for text. Supported encodings include:

    • 'utf8'
    • 'utf16le'
    • 'ascii'
    • 'latin1'
    • 'base64'
    • 'hex' (each byte as two hexadecimal characters)

Module stream:

  • Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any> (since 10.0.0)

    Readable streams are asynchronously iterable. For example, you can use for-await-of loops in async functions or async generator functions to iterate over them.

  • finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => void (since 10.0.0)

    The callback is invoked when reading/writing is done or when there was an error. Once promisified, finished() returns a Promise that is settled when reading/writing is done or there was an error.

    This promisified version is created as follows:

    const finished = util.promisify(stream.finished);
    
  • pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void> (since 10.0.0)

    Pipes between streams. The returned Promise is settled when the pipeline is complete or when there was an error.

    This promisified version is created as follows:

    const pipeline = util.promisify(stream.pipeline);
    
  • Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable (since 12.3.0)

    Converts an iterable into a readable stream.

    interface ReadableOptions {
      highWaterMark?: number;
      encoding?: string;
      objectMode?: boolean;
      read?(this: Readable, size: number): void;
      destroy?(this: Readable, error: Error | null,
        callback: (error: Error | null) => void): void;
      autoDestroy?: boolean;
    }
    

    These options are the same as the options for the Readable constructor and documented there.

Module fs:

  • createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream (since 2.3.0)

    Creates a readable stream. More options are available.

  • createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream (since 2.3.0)

    With option .flags you can specify if you want to write or append and what happens if a file does or does not exist. More options are available.
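    For example, the following sketch appends to the file tmp/log.txt (from an earlier example) instead of overwriting it:

    const appendStream = fs.createWriteStream(
      'tmp/log.txt', {encoding: 'utf8', flags: 'a'}); // 'a' = append; create file if missing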

The static type information in this section is based on Definitely Typed.

Acknowledgement  

  • Thanks to @ehmicky for telling me about several stream-related utility functions.
