Reading streams via async iteration in Node.js

[2018-04-01] dev, javascript, nodejs, async

Update 2018-04-24: New section “Reading stdin”, which uses the library stringio to read lines from stdin.


Node.js 10 was released on April 24, 2018. This version provides experimental support for asynchronously iterating over readable streams. This blog post explains how that works.

Reading streams asynchronously  

In this section, we examine two ways of reading data from a stream asynchronously: via callbacks and via asynchronous iteration.

Reading asynchronously via callbacks  

To read the contents of a file asynchronously, you can use callbacks, as follows.

First, you create a readable stream:

const fs = require('fs');

function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  ···

  • The option encoding determines how the stream delivers its content:
    • If it is null (the default), the stream delivers buffers.
    • If it is a string such as 'utf8', it delivers strings, by interpreting the stream’s data according to the specified encoding.
  • The option highWaterMark determines the maximum size (in bytes) of each delivered buffer or string.

Second, you receive the data by listening to the events data and end:

  ···
  readStream.on('data', (chunk) => {
    console.log('>>> '+chunk);
  });
  readStream.on('end', () => {
    console.log('### DONE ###');
  });
} // main
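
As an aside, the effect of encoding is easy to observe directly. The following sketch assumes a file data.txt exists (the name is made up for this example):

// Without an encoding, the stream delivers Buffers:
fs.createReadStream('data.txt')
  .once('data', (chunk) => {
    console.log(Buffer.isBuffer(chunk)); // true
  });

// With an encoding, it delivers strings:
fs.createReadStream('data.txt', { encoding: 'utf8' })
  .once('data', (chunk) => {
    console.log(typeof chunk); // 'string'
  });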

Reading asynchronously via async iteration  

Starting with Node.js v10, you can also use asynchronous iteration to read a file asynchronously. Readable streams have a property whose key is Symbol.asyncIterator, which enables the for-await-of loop to iterate over their chunks. However, this kind of loop is only available within async functions and async generators. That’s why we have to use an async function:

async function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });

  for await (const chunk of readStream) {
    console.log('>>> '+chunk);
  }
  console.log('### DONE ###');
}
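
Roughly, the loop above desugars to explicit calls of the stream’s async iterator. The following sketch is a simplified illustration (it omits the cleanup that a real for-await-of performs):

const fs = require('fs');

async function mainDesugared(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });

  const iterator = readStream[Symbol.asyncIterator]();
  let result = await iterator.next(); // next() returns a Promise of {value, done}
  while (!result.done) {
    console.log('>>> ' + result.value);
    result = await iterator.next();
  }
  console.log('### DONE ###');
}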

Processing async iterables via async generators  

So far, we have seen how you can use async functions as sinks of async iterables. With async generators, you can go one step further: They can be the source of an async iterable. Or they can transform an async iterable (as both sink and source). The latter works as follows. The async generator:

  • Consumes an async iterable via for-await-of.
  • Returns an async iterable and feeds data into it via yield.
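
In outline, such a transforming async generator looks as follows (a minimal sketch; toUpperCase is just an example transformation):

async function* toUpperCase(asyncIterable) {
  for await (const item of asyncIterable) { // sink: consume the input
    yield item.toUpperCase(); // source: produce the output
  }
}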

That is, if you chain async generators, you can process input similarly to Unix piping. Let’s look at a pipe with two async generators.

Generator #1: from chunks to lines  

The following function takes an async iterable over strings and returns an async iterable over lines:

/**
 * Parameter: async iterable of chunks (strings)
 * Result: async iterable of lines (incl. newlines)
 */
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}
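
You can try out chunksToLines() without a file by feeding it an in-memory async iterable. In the following sketch, the helper toAsyncIterable() is made up for the demo:

// Demo helper: turns a synchronous iterable into an asynchronous one
async function* toAsyncIterable(syncIterable) {
  yield* syncIterable;
}

async function demoChunksToLines() {
  const chunks = toAsyncIterable(['First\nSec', 'ond\nThird']);
  for await (const line of chunksToLines(chunks)) {
    console.log(JSON.stringify(line));
  }
}
demoChunksToLines();

// Output:
// "First\n"
// "Second\n"
// "Third"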

Generator #2: from lines to numbered lines  

This function takes lines and numbers them:

/**
 * Parameter: async iterable of lines
 * Result: async iterable of numbered lines
 */
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}
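
numberLines() can be tried in isolation in the same manner (reusing the made-up helper toAsyncIterable() from the previous sketch):

async function demoNumberLines() {
  const lines = toAsyncIterable(['a\n', 'b\n', 'c\n']);
  for await (const numbered of numberLines(lines)) {
    process.stdout.write(numbered);
  }
}
demoNumberLines();

// Output:
// 1: a
// 2: b
// 3: c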

Connecting the generators  

The main() function reads a text file via a readable stream, pipes it through the two async generators (thereby numbering the lines in that file) and prints the result via the helper printAsyncIterable():

async function main() {
  const inputFilePath = process.argv[2];
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  printAsyncIterable(numberLines(chunksToLines(readStream)));
}
main();

// Prints all items of an async iterable (the lines already end with '\n'):
async function printAsyncIterable(iterable) {
  for await (const item of iterable) {
    process.stdout.write(item);
  }
}

One intriguing trait of processing data asynchronously is that the processing steps are interleaved: as soon as the first chunk arrives, it is split into lines and those lines are numbered. Therefore, the code can handle very large files, because it processes them in chunks of 1024 bytes and never has to load a whole file into memory.
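
One way to observe this interleaving is to splice a logging pass-through generator into the pipe. This is a sketch; logItems() is made up for illustration:

async function* logItems(asyncIterable, label) {
  for await (const item of asyncIterable) {
    console.log(label, 'item arrived'); // logged as soon as the item is available
    yield item;
  }
}

// Hypothetical usage inside main():
// printAsyncIterable(numberLines(logItems(chunksToLines(readStream), 'line:')));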

Reading stdin  

In the following example, we use the library stringio to read lines from stdin:

const {chunksToLinesAsync, chomp} = require('@rauschma/stringio');

async function main() {
  for await (const line of chunksToLinesAsync(process.stdin)) {
    console.log('LINE: '+chomp(line));
  }
}
main();

Conveniently, this code processes individual lines as soon as you hit return:

cat | node read_stdin.js

Conclusion  

Having async iterables in Node.js is great. It’s a considerable improvement over callback-based processing. Based on async iteration, we can now have combinators such as .map() and .filter() for asynchronous data.
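
Such a combinator could look like this. This is a sketch; mapAsync() is not built into Node.js, it is only meant as an illustration:

// Maps each item of an async iterable (illustrative, not a built-in API):
async function* mapAsync(asyncIterable, mapFn) {
  for await (const item of asyncIterable) {
    yield mapFn(item);
  }
}

// Hypothetical usage: uppercase every chunk of a stream
// for await (const s of mapAsync(readStream, x => x.toUpperCase())) { ··· }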

Further reading