Communicating Sequential Processes: an alternative to async generators

[2017-03-13] dev, javascript, esnext, async

In this blog post, I present Communicating Sequential Processes (CSP) and how they can be used as an alternative to async generators. In the process, we will also take a look at the ECMAScript proposal that async generators are a part of: “asynchronous iteration”.

Asynchronous iteration  

Let’s start by refreshing our knowledge of asynchronous iteration. It consists of three parts:

  1. Async iteration protocol: a combination of the protocol for synchronous iteration and Promises (which enable asynchronicity).
  2. for-await-of: a version of the for-of loop that works with data that is async iterable.
  3. Async generators: generators whose yielded values are delivered via async iteration.

The next sections take a look at each of these parts. Afterwards, I’ll explain CSP and how it is an alternative to #3.

Async iteration protocol  

Synchronous iteration is supported by many languages. What all approaches have in common is that you need two things:

  • A way to deliver values
  • A way to signal the end of the sequence of values

However, they differ in the specifics. For example:

  • Java: iterators have two methods: next() returns values. hasNext() returns a boolean indicating whether there are more values.

  • Python: values are delivered via the iterator method __next__(). That method raises the exception StopIteration if it is called after the last value.

  • Swift: iterators have a single method next() that returns nil after the last value.

For JavaScript, the idea was to use a single method without an exception, much like Swift does. However, unlike Swift, TC39 wanted to avoid bugs caused by end-of-iteration values being stored in collections. That’s why in JavaScript, the iterator method next() always returns {value,done}. For values, done is false. After the last value, done is true. You could say that the end-of-sequence value is truly metadata and kept out of band.

In async iteration, the {value,done} objects are delivered via Promises. That does make everything even more complex, but the added complexity is necessary to fully support asynchronicity. Moreover, since Promises are a standard mechanism, async functions help when working with async iteration.
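To make the protocol concrete, here is a minimal sketch of an async iterable written by hand, without any generator. It assumes an engine that supports Symbol.asyncIterator; the helper names createAsyncIterable() and collect() are made up for this illustration.

```javascript
// Hypothetical helper: wraps an Array in a hand-written async iterable
function createAsyncIterable(values) {
    return {
        [Symbol.asyncIterator]() {
            let index = 0;
            return {
                // next() returns a Promise for a {value, done} object
                next() {
                    if (index < values.length) {
                        return Promise.resolve(
                            { value: values[index++], done: false });
                    }
                    // End of sequence: signaled out of band via `done`
                    return Promise.resolve({ value: undefined, done: true });
                },
            };
        },
    };
}

// Hypothetical helper: consumes the protocol manually, without for-await-of
async function collect(asyncIterable) {
    const iterator = asyncIterable[Symbol.asyncIterator]();
    const result = [];
    while (true) {
        const { value, done } = await iterator.next();
        if (done) break;
        result.push(value);
    }
    return result;
}

collect(createAsyncIterable(['a', 'b', 'c']))
    .then(arr => console.log(arr)); // logs the Array with 'a', 'b', 'c'
```

Note how every step of the consumer needs an await: that is the price for delivering {value,done} via Promises.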

for-await-of  

The for-await-of loop looks as follows. It is based on the async iteration protocol and can be used inside async functions and async generators:

for await (const x of asyncIterable) {
    console.log(x);
}
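The loop above only runs inside an async function or async generator. As a small runnable sketch: for-await-of also accepts a synchronous iterable and awaits each of its elements, so an Array of Promises is enough to try it out. The function name sumAll() is made up for this example.

```javascript
// Hypothetical helper: sums the values delivered by an (async) iterable
async function sumAll(iterable) {
    let sum = 0;
    // Each element is awaited before the loop body runs
    for await (const x of iterable) {
        sum += x;
    }
    return sum;
}

// An Array is only synchronously iterable, but its Promises are awaited
sumAll([Promise.resolve(1), 2, Promise.resolve(3)])
    .then(sum => console.log(sum)); // logs 6
```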

Asynchronous generators  

As of ES2017, we already have the following invokable entities in JavaScript:

  • Function expressions, function declarations
  • Arrow functions
  • Method definitions
  • Classes
  • Async function expressions, async function declarations, async arrow functions, async method definitions

Async generators would add to this already long list:

  • Asynchronous generator function declarations
  • Asynchronous generator function expressions
  • Asynchronous generator method definitions

The core idea of async generators is:

  • Receive input via await, like async functions
  • Produce output via yield, like generators, but via async iteration, not via sync iteration.
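A minimal sketch of that core idea, assuming an engine that supports async generators. The names delay(), delayedUpperCase() and toArray() are invented for this example:

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds
function delay(ms, value) {
    return new Promise(resolve => setTimeout(() => resolve(value), ms));
}

async function* delayedUpperCase(words, ms) {
    for (const word of words) {
        const received = await delay(ms, word); // input via await
        yield received.toUpperCase();           // output via yield
    }
}

// Hypothetical helper: collects everything an async iterable delivers
async function toArray(asyncIterable) {
    const result = [];
    for await (const x of asyncIterable) {
        result.push(x);
    }
    return result;
}

toArray(delayedUpperCase(['a', 'b'], 10))
    .then(arr => console.log(arr)); // logs the Array with 'A', 'B'
```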

It would be nice if we didn’t need to introduce yet another callable entity to JavaScript. In the next section, I present Communicating Sequential Processes, which can be implemented via just a library in ES2017 and are an alternative to async generators.

Communicating Sequential Processes (CSP)  

CSP are a pattern for concurrent programming that involves two key abstractions:

  • Processes are tasks that, in general, execute concurrently. The code defining a Process is executed sequentially.

  • Channels are first-in first-out (FIFO) queues that are used for communication between Processes. You put an element into a Channel via the operation put and take an element out of a Channel via the operation take.

async-csp: a JavaScript implementation of CSP  

For this blog post, I’m using the CSP library async-csp by @dvlsg:

  • It implements Channel as a class and put and take as Promise-based methods.

  • Processes are simply async functions that use await to block on put and take. await leads to a Process yielding execution to other JavaScript code, including other Processes, leading to a relatively coarse-grained cooperative form of multitasking.
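To illustrate the pattern without depending on the library, here is a minimal sketch of an unbuffered channel and two Processes. This is not async-csp's implementation or API: the class SimpleChannel, the marker DONE and the functions producer() and consumer() are all assumptions made for this example.

```javascript
// Hypothetical end-of-sequence marker
const DONE = Symbol('DONE');

// Hypothetical unbuffered channel: put() blocks until the value is taken
class SimpleChannel {
    constructor() {
        this.pendingPuts = [];  // entries: { value, resolve }
        this.pendingTakes = []; // entries: resolve functions
        this.closed = false;
    }
    put(value) {
        if (this.closed) {
            return Promise.reject(new Error('Cannot put into a closed channel'));
        }
        if (this.pendingTakes.length > 0) {
            // A Process is already waiting for a value: hand it over
            this.pendingTakes.shift()(value);
            return Promise.resolve();
        }
        return new Promise(resolve => {
            this.pendingPuts.push({ value, resolve });
        });
    }
    take() {
        if (this.pendingPuts.length > 0) {
            const { value, resolve } = this.pendingPuts.shift();
            resolve(); // unblock the Process that put the value
            return Promise.resolve(value);
        }
        if (this.closed) {
            return Promise.resolve(DONE);
        }
        return new Promise(resolve => {
            this.pendingTakes.push(resolve);
        });
    }
    close() {
        this.closed = true;
        for (const resolveTake of this.pendingTakes.splice(0)) {
            resolveTake(DONE);
        }
    }
}

// Two Processes: async functions that block via await
async function producer(channel) {
    for (const word of ['ping', 'pong']) {
        await channel.put(word); // yields execution until taken
    }
    channel.close();
}
async function consumer(channel) {
    const received = [];
    while (true) {
        const value = await channel.take();
        if (value === DONE) break;
        received.push(value);
    }
    return received;
}

const channel = new SimpleChannel();
producer(channel);
consumer(channel).then(received => console.log(received));
```

Each await is a point where one Process pauses and the other one may continue, which is the cooperative multitasking described above.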

Async generators vs. CSP  

In this section, I’ll compare async generators with CSP and you’ll see that they are quite similar.

Example: reading and processing a file asynchronously  

The GitHub repository /async-iter-demo contains code that processes a file asynchronously in the following steps:

  1. readFile(): read the contents of a file asynchronously.
  2. splitLines(): split the result into lines.
  3. numberLines(): number the resulting lines.
  4. logLines(): log the numbered lines to the console.

Starting processing  

Async generators. You can’t directly feed asynchronous input from legacy sources into async generators. You need a data structure that is asynchronously iterable. AsyncQueue is such a data structure that I implemented as a simple utility class. The following function starts asynchronous processing by reading a file, feeding its contents to an AsyncQueue and returning that queue:

// Signature: string → AsyncIterable
function readFile(fileName) {
    const queue = new AsyncQueue(); // (A)
    const readStream = createReadStream(fileName,
        { encoding: 'utf8', highWaterMark: 1024 }); // chunk size in bytes
    readStream.on('data', buffer => {
        const str = buffer.toString('utf8');
        queue.enqueue(str); // (B)
    });
    readStream.on('error', err => {
        queue.enqueue(err); // (C)
        queue.close();
    });
    readStream.on('end', () => {
        // Signal end of output sequence
        queue.close(); // (D)
    });
    return queue;
}

The queue is created in line A. Input is added to it in line B. Errors are added like normal queue elements (line C), but are thrown when they are taken out of the queue. Once the stream ends, the queue is closed (line D).
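The code of AsyncQueue is not shown in this post. The following is only a sketch of how such a class could work, under the assumptions stated in the text: values are delivered via the async iteration protocol and enqueued Error instances are thrown when they are taken out. The name AsyncQueueSketch and all internals are made up.

```javascript
// Hypothetical stand-in for AsyncQueue (not the original implementation)
class AsyncQueueSketch {
    constructor() {
        this._values = [];   // enqueued, but not yet consumed
        this._settlers = []; // pending next() calls: { resolve, reject }
        this._closed = false;
    }
    // The queue is its own async iterator
    [Symbol.asyncIterator]() {
        return this;
    }
    enqueue(value) {
        if (this._closed) {
            throw new Error('Queue is closed');
        }
        if (this._settlers.length > 0) {
            // A consumer is already waiting: settle its Promise directly
            const settler = this._settlers.shift();
            if (value instanceof Error) {
                settler.reject(value);
            } else {
                settler.resolve({ value, done: false });
            }
        } else {
            this._values.push(value);
        }
    }
    next() {
        if (this._values.length > 0) {
            const value = this._values.shift();
            // Errors travel like normal elements, but are thrown on removal
            return value instanceof Error
                ? Promise.reject(value)
                : Promise.resolve({ value, done: false });
        }
        if (this._closed) {
            return Promise.resolve({ value: undefined, done: true });
        }
        return new Promise((resolve, reject) => {
            this._settlers.push({ resolve, reject });
        });
    }
    close() {
        this._closed = true;
        for (const settler of this._settlers.splice(0)) {
            settler.resolve({ value: undefined, done: true });
        }
    }
}

async function demo() {
    const queue = new AsyncQueueSketch();
    queue.enqueue('first chunk');
    queue.enqueue(new Error('read failed'));
    queue.close();
    try {
        for await (const chunk of queue) {
            console.log(chunk);
        }
    } catch (err) {
        console.log('Caught:', err.message);
    }
}
demo();
```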

CSP. ChannelWithErrors augments the library’s Channel with error handling. Similarly to the previous code, readFile() starts asynchronous processing:

// Signature: string → Channel
function readFile(fileName) {
    const channel = new ChannelWithErrors();
    const readStream = fs.createReadStream(fileName,
        { encoding: 'utf8', highWaterMark: 1024 }); // chunk size in bytes
    readStream.on('data', buffer => {
        const str = buffer.toString('utf8');
        channel.put(str);
    });
    readStream.on('error', err => {
        channel.putError(err); // (A)
    });
    readStream.on('end', () => {
        // Signal end of output sequence
        channel.close();
    });
    return channel;
}

This piece of code works almost the same as the previous one. Note the special method putError() in line A.
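ChannelWithErrors is not shown here either. One way to get its behavior is to wrap errors in a marker object when they are put and to unwrap and throw them when they are taken. The sketch below follows that idea. It is self-contained and does not build on async-csp, so ChannelWithErrorsSketch, ErrorWrapper and the unbounded buffer are all assumptions, not the original code.

```javascript
// Hypothetical marker that distinguishes errors from normal values
class ErrorWrapper {
    constructor(error) {
        this.error = error;
    }
}

// Hypothetical stand-in for ChannelWithErrors, with an unbounded buffer
class ChannelWithErrorsSketch {
    constructor() {
        this._buffer = [];
        this._takers = []; // resolve functions of pending take() calls
        this._closed = false;
    }
    put(value) {
        if (this._closed) {
            return Promise.reject(new Error('Channel is closed'));
        }
        if (this._takers.length > 0) {
            this._takers.shift()(value);
        } else {
            this._buffer.push(value);
        }
        // The buffer is unbounded, so put() never has to wait
        return Promise.resolve();
    }
    putError(err) {
        return this.put(new ErrorWrapper(err));
    }
    take() {
        if (this._buffer.length > 0) {
            return Promise.resolve(this._buffer.shift());
        }
        if (this._closed) {
            return Promise.resolve(ChannelWithErrorsSketch.DONE);
        }
        return new Promise(resolve => this._takers.push(resolve));
    }
    // Like take(), but wrapped errors are thrown
    async takeWithErrors() {
        const value = await this.take();
        if (value instanceof ErrorWrapper) {
            throw value.error;
        }
        return value;
    }
    close() {
        this._closed = true;
        for (const taker of this._takers.splice(0)) {
            taker(ChannelWithErrorsSketch.DONE);
        }
    }
}
ChannelWithErrorsSketch.DONE = Symbol('DONE');

async function demo() {
    const channel = new ChannelWithErrorsSketch();
    channel.put('first chunk');
    channel.putError(new Error('read failed'));
    channel.close();
    console.log(await channel.takeWithErrors());
    try {
        await channel.takeWithErrors();
    } catch (err) {
        console.log('Caught:', err.message);
    }
}
demo();
```

Because put() never waits in this sketch, calling it without await is safe here. With a bounded or unbuffered channel, that would no longer hold.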

Transforming a sequence of values  

In this post, we are skipping the relatively complex splitLines() and advancing to numberLines(), which is a good demonstration of a function that takes asynchronous input, transforms it and produces asynchronous output.

Async generators receive input via await and produce output via yield:

// Signature: AsyncIterable → AsyncIterable
async function* numberLines(lines) {
    let n = 1;
    for await (const line of lines) {
        yield `${n} ${line}`;
        n++;
    }
}

You can see that for-await-of is really useful, but you need to have an async iterable.

CSP. The CSP version of numberLines() receives its input via one channel and produces its output via another channel:

// Signature: Channel × Channel → void
async function numberLines(inputChannel, outputChannel) {
    for (let n=1;; n++) {
        const line = await inputChannel.takeWithErrors();
        if (line === ChannelWithErrors.DONE) {
            break;
        }
        outputChannel.put(`${n} ${line}`);
    }
    outputChannel.close();
}

We’ll see later how functions that have the signature

Channel × Channel → void

can be composed to produce a single channel. For now, it is enough to see that a function with such a signature is easy to understand and that you only need things that ES2017 already has (async functions, Promises) plus library code.

I opted against using await for put() and close(). Whether or not that works depends on the Channel implementation one is using.

Putting everything together  

Function main() puts everything together.

Async generators. Since each of the functions that are involved produces and possibly consumes async iterables, simple function application is enough to connect them. The result is an asynchronous iterable asyncIterable (line A) whose contents can be logged via logLines().

async function main() {
    const fileName = process.argv[2];
    const asyncIterable = numberLines(splitLines(readFile(fileName))); // (A)
    await logLines(asyncIterable);
}

// Signature: AsyncIterable → void
async function logLines(asyncIterable) {
    for await (const line of asyncIterable) {
        console.log(line);
    }
}

CSP. The custom tool function filter() (line A) connects a channel (created via readFile()) with functions that have two channels as parameters (input and output). The result is yet another channel whose contents can be logged via logLines().

async function main() {
    const fileName = process.argv[2];

    const ch = filter(readFile(fileName), splitLines, numberLines); // (A)
    await logLines(ch);
}

// Signature: Channel → void
async function logLines(channel) {
    while (true) {
        const line = await channel.takeWithErrors();
        if (line === ChannelWithErrors.DONE) break;
        console.log(line);
    }
}
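The implementation of filter() is not part of this post. The following sketch shows one way such a tool function could chain functions with the signature Channel × Channel → void: it creates a fresh channel per stage and feeds the output of each stage into the next one. Everything here is an assumption made for illustration: MiniChannel is a tiny buffered stand-in for a real channel, filterSketch() is a guess at the idea behind filter(), and errors are not handled.

```javascript
const DONE = Symbol('DONE'); // hypothetical end-of-sequence marker

// Hypothetical minimal channel with an unbounded buffer
class MiniChannel {
    constructor() {
        this.buffer = [];
        this.takers = [];
        this.closed = false;
    }
    put(value) {
        if (this.takers.length > 0) this.takers.shift()(value);
        else this.buffer.push(value);
    }
    take() {
        if (this.buffer.length > 0) return Promise.resolve(this.buffer.shift());
        if (this.closed) return Promise.resolve(DONE);
        return new Promise(resolve => this.takers.push(resolve));
    }
    close() {
        this.closed = true;
        for (const taker of this.takers.splice(0)) taker(DONE);
    }
}

// Signature: Channel × (Channel × Channel → void)* → Channel
function filterSketch(inputChannel, ...stages) {
    let current = inputChannel;
    for (const stage of stages) {
        const output = new MiniChannel();
        // Starts the Process. Its Promise is ignored in this sketch,
        // so a rejection inside a stage would go unhandled.
        stage(current, output);
        current = output;
    }
    return current;
}

// Two example stages
async function upperCaseLines(input, output) {
    while (true) {
        const line = await input.take();
        if (line === DONE) break;
        output.put(line.toUpperCase());
    }
    output.close();
}
async function numberLines(input, output) {
    for (let n = 1;; n++) {
        const line = await input.take();
        if (line === DONE) break;
        output.put(`${n} ${line}`);
    }
    output.close();
}

// Hypothetical helper: collects everything a channel delivers
async function drain(channel) {
    const result = [];
    while (true) {
        const value = await channel.take();
        if (value === DONE) return result;
        result.push(value);
    }
}

const source = new MiniChannel();
source.put('a');
source.put('b');
source.close();
drain(filterSketch(source, upperCaseLines, numberLines))
    .then(lines => console.log(lines)); // logs the Array with '1 A', '2 B'
```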

Conclusions  

I hope that the example showed that CSP are an interesting alternative to async generators. I’ve drawn a few more conclusions from working with both CSP and async generators:

  • If we do get async generators, they should come bundled with an asynchronously iterable data structure, because it is essential for connecting async iteration with legacy sources of async data. WHATWG streams are such a data structure, but they are not an ECMAScript feature.

  • Async generators do error handling well. With CSP, you need to manually feed errors into channels.

  • I like that CSP focus on the Channel as their core abstraction. It is something tangible and a location where lots of configuration will occur (buffering, handling backpressure, etc.).

  • CSP would benefit from two parts of the async iteration proposal: Channel could implement the async iteration protocol (via a method [Symbol.asyncIterator]), which would enable the handy for-await-of and interoperability with tools based on async iteration (incl. async generators).
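As a sketch of the last point: anything with a Promise-based take() can be adapted to the async iteration protocol in a few lines. The names makeAsyncIterable(), createFakeChannel() and DONE are made up for this example, and the fake channel only stands in for a real one.

```javascript
const DONE = Symbol('DONE'); // hypothetical end-of-sequence marker

// Hypothetical adapter: exposes a channel via the async iteration protocol
function makeAsyncIterable(channel, doneValue) {
    return {
        [Symbol.asyncIterator]() {
            return {
                async next() {
                    const value = await channel.take();
                    return value === doneValue
                        ? { value: undefined, done: true }
                        : { value, done: false };
                },
            };
        },
    };
}

// Stand-in for a real channel: serves preset values, then DONE
function createFakeChannel(values) {
    const remaining = [...values];
    return {
        take() {
            return Promise.resolve(
                remaining.length > 0 ? remaining.shift() : DONE);
        },
    };
}

// With the adapter, logLines() can use for-await-of again
async function logLines(channel) {
    for await (const line of makeAsyncIterable(channel, DONE)) {
        console.log(line);
    }
}
logLines(createFakeChannel(['1 first line', '2 second line']));
```

A real Channel class could provide the same thing directly as a method [Symbol.asyncIterator]().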

Further reading