In this blog post, I present Communicating Sequential Processes (CSP) and how they can be used as an alternative to async generators. In the process, we will also take a look at the ECMAScript proposal that async generators are a part of: “asynchronous iteration”.
Let’s start by refreshing our knowledge of asynchronous iteration. It consists of three parts:
for-await-of: a version of the for-of loop that works with data that is async iterable.
The next sections take a look at each of these parts. Afterwards, I’ll explain CSP and how it is an alternative to the third part, async generators.
Synchronous iteration is supported by many languages. What all approaches have in common is that you need two things:
However, they differ in the specifics. For example:
Java: iterators have two methods: next() returns values. hasNext() returns a boolean indicating whether there are more values.
Python: values are delivered via the iterator method __next__(). That method raises the exception StopIteration if it is called after the last value.
Swift: iterators have a single method next() that returns nil after the last value.
For JavaScript, the idea was to use a single method without an exception, much like Swift does. However, unlike Swift, TC39 wanted to avoid bugs caused by end-of-iteration values being stored in collections. That’s why in JavaScript, the iterator method next() always returns {value,done}. For values, done is false. After the last value, done is true. You could say that the end-of-sequence value is truly metadata and kept out of band.
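To make the protocol concrete, the following sketch drains a sync iterable by calling next() manually. The helper name collect() is made up for this illustration.

```javascript
// Hypothetical helper: drains any sync iterable by calling next() manually.
function collect(iterable) {
    const iterator = iterable[Symbol.iterator]();
    const result = [];
    while (true) {
        const { value, done } = iterator.next();
        if (done) break; // end of sequence is signaled out of band
        result.push(value);
    }
    return result;
}

// undefined is an ordinary value here; it is not mistaken for the end
console.log(collect(['a', undefined, 'b']));
```

Because the end of the sequence is signaled via done, a value such as undefined can be stored in a collection without cutting the iteration short.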
In async iteration, the {value,done} objects are delivered via Promises. That does make everything even more complex, but the added complexity is necessary to fully support asynchronicity. Moreover, since Promises are a standard mechanism, async functions help when working with async iteration.
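The following sketch shows the async version of the protocol without any special syntax. It assumes an engine that supports Symbol.asyncIterator. The names createAsyncIterable() and collectAsync() are made up for this illustration.

```javascript
// Hypothetical async iterable: delivers the given values, one per Promise.
function createAsyncIterable(values) {
    return {
        [Symbol.asyncIterator]() {
            let index = 0;
            return {
                next() {
                    const done = index >= values.length;
                    const value = done ? undefined : values[index++];
                    // Each {value, done} object is wrapped in a Promise
                    return Promise.resolve({ value, done });
                },
            };
        },
    };
}

// An async function can simply await each Promise returned by next()
async function collectAsync(asyncIterable) {
    const iterator = asyncIterable[Symbol.asyncIterator]();
    const result = [];
    while (true) {
        const { value, done } = await iterator.next();
        if (done) break;
        result.push(value);
    }
    return result;
}
```

The only difference from the sync loop is the await in front of iterator.next().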
for-await-of
The for-await-of loop looks as follows. It is based on the async iteration protocol and can be used inside async functions and async generators:
for await (const x of asyncIterable) {
    console.log(x);
}
As of ES2017, we already have the following invokable entities in JavaScript:
Async generators would add to this already long list:
The core idea of async generators is:
Input is received via await, like in async functions.
Output is produced via yield, like in generators, but via async iteration, not via sync iteration.
It would be nice if we didn’t need to introduce yet another callable entity to JavaScript. In the next section, I present Communicating Sequential Processes, which can be implemented via just a library in ES2017 and are an alternative to async generators.
CSP are a pattern for concurrent programming that involves two key abstractions:
Processes: tasks that are, in general, executed concurrently. The code defining a Process is executed sequentially.
Channels: first-in first-out (FIFO) queues that are used for communication between Processes. You put an element into a Channel via the operation put and take an element out of a Channel via the operation take.
For this blog post, I’m using the CSP library async-csp by @dvlsg:
It implements Channel as a class and put and take as Promise-based methods.
Processes are simply async functions that use await to block on put and take. await leads to a Process yielding execution to other JavaScript code, including other Processes, leading to a relatively coarse-grained cooperative form of multitasking.
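To illustrate the two abstractions, here is a minimal sketch of a Channel and two Processes. MiniChannel is a hypothetical class written for this illustration; it is not the implementation or the API of async-csp. In this sketch, put() only resolves once a matching take() has arrived.

```javascript
// Hypothetical minimal channel; NOT the async-csp implementation.
class MiniChannel {
    constructor() {
        this.pendingPuts = [];  // {value, resolve} entries waiting for a taker
        this.pendingTakes = []; // resolve functions waiting for a value
    }
    put(value) {
        return new Promise(resolve => {
            if (this.pendingTakes.length > 0) {
                this.pendingTakes.shift()(value); // hand over directly
                resolve();
            } else {
                this.pendingPuts.push({ value, resolve });
            }
        });
    }
    take() {
        return new Promise(resolve => {
            if (this.pendingPuts.length > 0) {
                const { value, resolve: resolvePut } = this.pendingPuts.shift();
                resolvePut(); // unblock the Process that called put()
                resolve(value);
            } else {
                this.pendingTakes.push(resolve);
            }
        });
    }
}

// Two Processes: async functions that block on put and take via await
async function producer(channel, log) {
    for (const x of ['a', 'b']) {
        log.push(`put ${x}`);
        await channel.put(x);
    }
}
async function consumer(channel, log) {
    for (let i = 0; i < 2; i++) {
        const x = await channel.take();
        log.push(`took ${x}`);
    }
}
```

Starting both via Promise.all([producer(ch, log), consumer(ch, log)]) lets them take turns: each await gives the other Process a chance to run.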
In this section, I’ll compare async generators with CSP and you’ll see that they are quite similar.
The GitHub repository /async-iter-demo contains code that processes a file asynchronously in the following steps:
readFile(): read the contents of a file asynchronously.
splitLines(): split the result into lines.
numberLines(): number the resulting lines.
logLines(): log the numbered lines to the console.
Async generators. You can’t directly feed asynchronous input from legacy sources into async generators. You need a data structure that is asynchronously iterable. AsyncQueue is such a data structure that I implemented as a simple utility class. The following function starts asynchronous processing by reading a file, feeding its contents to an AsyncQueue and returning that queue:
// Signature: string → AsyncIterable
function readFile(fileName) {
    const queue = new AsyncQueue(); // (A)
    const readStream = createReadStream(fileName,
        { encoding: 'utf8', bufferSize: 1024 });
    readStream.on('data', buffer => {
        const str = buffer.toString('utf8');
        queue.enqueue(str); // (B)
    });
    readStream.on('error', err => {
        queue.enqueue(err); // (C)
        queue.close();
    });
    readStream.on('end', () => {
        // Signal end of output sequence
        queue.close(); // (D)
    });
    return queue;
}
The queue is created in line A. Input is added to it in line B. Errors are added like normal queue elements (line C), but are thrown when they are taken out of the queue. Once the stream ends, the queue is closed (line D).
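The post does not show AsyncQueue itself. The following sketch is one way such a class could work, based only on the behavior described above (enqueue(), close(), errors thrown when taken out). SimpleAsyncQueue and its internals are assumptions; the real utility class may differ.

```javascript
// Hypothetical stand-in for AsyncQueue; the real utility class may differ.
class SimpleAsyncQueue {
    constructor() {
        this.items = [];   // values (or Errors) that were not consumed yet
        this.waiting = []; // {resolve, reject} of consumers waiting for an item
        this.closed = false;
    }
    [Symbol.asyncIterator]() {
        return this; // the queue is its own async iterator
    }
    enqueue(item) {
        if (this.closed) throw new Error('Queue is closed');
        if (this.waiting.length > 0) {
            this.settle(this.waiting.shift(), item);
        } else {
            this.items.push(item);
        }
    }
    close() {
        this.closed = true;
        // Consumers that are still waiting will never receive a value
        for (const { resolve } of this.waiting.splice(0)) {
            resolve({ value: undefined, done: true });
        }
    }
    next() {
        return new Promise((resolve, reject) => {
            if (this.items.length > 0) {
                this.settle({ resolve, reject }, this.items.shift());
            } else if (this.closed) {
                resolve({ value: undefined, done: true });
            } else {
                this.waiting.push({ resolve, reject });
            }
        });
    }
    // Errors travel through the queue like values, but reject on delivery
    settle({ resolve, reject }, item) {
        if (item instanceof Error) {
            reject(item);
        } else {
            resolve({ value: item, done: false });
        }
    }
}
```

Since a rejected Promise from next() makes for-await-of throw, a consumer can handle stream errors with an ordinary try-catch around the loop.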
CSP. ChannelWithErrors augments the library’s Channel with error handling. Similarly to the previous code, readFile() starts asynchronous processing:
// Signature: string → Channel
function readFile(fileName : string) : Channel {
    const channel = new ChannelWithErrors();
    const readStream = fs.createReadStream(fileName,
        { encoding: 'utf8', bufferSize: 1024 });
    readStream.on('data', buffer => {
        const str = buffer.toString('utf8');
        channel.put(str);
    });
    readStream.on('error', err => {
        channel.putError(err); // (A)
    });
    readStream.on('end', () => {
        // Signal end of output sequence
        channel.close();
    });
    return channel;
}
This piece of code works almost the same as the previous one. Note the special method putError() in line A.
In this post, we are skipping the relatively complex splitLines() and advancing to numberLines(), which is a good demonstration of a function that takes asynchronous input, transforms it and produces asynchronous output.
Async generators receive input via await and produce output via yield:
// Signature: AsyncIterable → AsyncIterable
async function* numberLines(lines) {
    let n = 1;
    for await (const line of lines) {
        yield `${n} ${line}`;
        n++;
    }
}
You can see that for-await-of is really useful, but you need to have an async iterable.
CSP. The CSP version of numberLines() receives its input via one channel and produces its output via another channel:
// Signature: Channel × Channel → void
async function numberLines(inputChannel, outputChannel) {
    for (let n=1;; n++) {
        const line = await inputChannel.takeWithErrors();
        if (line === ChannelWithErrors.DONE) {
            break;
        }
        outputChannel.put(`${n} ${line}`);
    }
    outputChannel.close();
}
We’ll see later how functions that have the signature Channel × Channel → void can be composed to produce a single channel. For now, it is enough to see that a function with such a signature is easy to understand and that you only need things that ES2017 already has (async functions, Promises) plus library code.
I opted against using await for put() and close(). Whether or not that works depends on the Channel implementation one is using.
Function main() puts everything together.
Async generators. Since each of the functions involved produces and possibly consumes async iterables, simple function application is enough to connect them. The result is an asynchronous iterable asyncIterable (line A) whose contents can be logged via logLines().
async function main() {
    const fileName = process.argv[2];
    const asyncIterable = numberLines(splitLines(readFile(fileName))); // (A)
    await logLines(asyncIterable);
}
// Signature: AsyncIterable → void
async function logLines(asyncIterable) {
    for await (const line of asyncIterable) {
        console.log(line);
    }
}
CSP. The custom tool function filter() (line A) connects a channel (created via readFile()) with functions that have two channels as parameters (input and output). The result is yet another channel whose contents can be logged via logLines().
async function main() {
    const fileName = process.argv[2];
    const ch = filter(readFile(fileName), splitLines, numberLines); // (A)
    await logLines(ch);
}
// Signature: Channel → void
async function logLines(channel) {
    while (true) {
        const line = await channel.takeWithErrors();
        if (line === ChannelWithErrors.DONE) break;
        console.log(line);
    }
}
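The implementation of filter() is not shown in this post. The following sketch is a guess at how such a tool function could compose functions with the signature Channel × Channel → void. SketchChannel is a hypothetical, unbounded stand-in for ChannelWithErrors that only borrows the method names used above, and forwarding a failed Process’s error to its output channel is my own assumption.

```javascript
// Hypothetical stand-in for ChannelWithErrors: an unbounded FIFO buffer.
class SketchChannel {
    constructor() {
        this.buffer = []; // {value} or {error} entries that were not taken yet
        this.takers = []; // resolve functions of pending takes
        this.closed = false;
    }
    put(value) { this.deliver({ value }); }
    putError(error) { this.deliver({ error }); }
    close() {
        this.closed = true;
        for (const taker of this.takers.splice(0)) {
            taker({ value: SketchChannel.DONE });
        }
    }
    deliver(item) {
        if (this.takers.length > 0) this.takers.shift()(item);
        else this.buffer.push(item);
    }
    async takeWithErrors() {
        const item = await new Promise(resolve => {
            if (this.buffer.length > 0) resolve(this.buffer.shift());
            else if (this.closed) resolve({ value: SketchChannel.DONE });
            else this.takers.push(resolve);
        });
        if ('error' in item) throw item.error; // errors are thrown when taken
        return item.value;
    }
}
SketchChannel.DONE = Symbol('DONE');

// Sketch of filter(): chains Processes, giving each one a fresh output channel
function filter(inputChannel, ...processes) {
    let current = inputChannel;
    for (const proc of processes) {
        const output = new SketchChannel();
        // Start the Process without awaiting it, so that all stages run
        // concurrently. Assumption: a failure is passed on downstream.
        Promise.resolve(proc(current, output))
            .catch(err => output.putError(err));
        current = output;
    }
    return current;
}

// Example Process, same shape as the CSP version of numberLines()
async function numberLines(inputChannel, outputChannel) {
    for (let n = 1; ; n++) {
        const line = await inputChannel.takeWithErrors();
        if (line === SketchChannel.DONE) break;
        outputChannel.put(`${n} ${line}`);
    }
    outputChannel.close();
}
```

The output channel of each Process becomes the input channel of the next one, which is why the result of filter() is again a single channel.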
I hope that the example showed that CSP are an interesting alternative to async generators. I’ve drawn a few more conclusions from working with both CSP and async generators:
If we do get async generators, they should come bundled with an asynchronously iterable data structure, because it is essential for connecting async iteration with legacy sources of async data. WHATWG streams are such a data structure, but they are not an ECMAScript feature.
Async generators do error handling well. With CSP, you need to manually feed errors into channels.
I like that CSP focus on the Channel as their core abstraction. It is something tangible and a location where lots of configuration will occur (buffering, handling backpressure, etc.).
CSP would benefit from two parts of the async iteration proposal: Channel could implement the async iteration protocol (via a method [Symbol.asyncIterator]), which would enable the handy for-await-of and interoperability with tools based on async iteration (incl. async generators).
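As a sketch of that last point, this is how a channel could implement the async iteration protocol. IterableChannel is a hypothetical, simplified class (unbounded buffer, no error handling), not part of async-csp, and the optional log parameter of logLines() is only there to make the function easy to test.

```javascript
// Hypothetical channel with just enough features for the demonstration.
class IterableChannel {
    constructor() {
        this.buffer = []; // values that were not taken yet
        this.takers = []; // resolve functions of pending takes
        this.closed = false;
    }
    put(value) {
        if (this.takers.length > 0) this.takers.shift()(value);
        else this.buffer.push(value);
    }
    close() {
        this.closed = true;
        for (const taker of this.takers.splice(0)) taker(IterableChannel.DONE);
    }
    take() {
        return new Promise(resolve => {
            if (this.buffer.length > 0) resolve(this.buffer.shift());
            else if (this.closed) resolve(IterableChannel.DONE);
            else this.takers.push(resolve);
        });
    }
    // Async iteration protocol: translate take() results into {value, done}
    [Symbol.asyncIterator]() {
        return {
            next: async () => {
                const value = await this.take();
                return value === IterableChannel.DONE
                    ? { value: undefined, done: true }
                    : { value, done: false };
            },
        };
    }
}
IterableChannel.DONE = Symbol('DONE');

// With the protocol in place, logLines() no longer needs a manual loop
async function logLines(channel, log = console.log) {
    for await (const line of channel) {
        log(line);
    }
}
```

The DONE check moves out of every consumer and into one place, the next() method.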
Sync iteration: chapter “Iterables and iterators” in “Exploring ES6”
Chapter “Promises for asynchronous programming” in “Exploring ES6”
Chapter “Callable entities in ECMAScript 6” in “Exploring ES6”
Chapter “Async functions” in “Exploring ES2016 and ES2017”
Async iteration: post “ES proposal: asynchronous iteration” on 2ality