Web streams are a standard for streams that is now supported on all major web platforms: web browsers, Node.js, and Deno. (Streams are an abstraction for reading and writing data sequentially in small pieces from all kinds of sources – files, data hosted on servers, etc.)
For example, the global function fetch()
(which downloads online resources) asynchronously returns a Response which has a property .body
with a web stream.
This blog post covers web streams on Node.js, but most of what we learn applies to all web platforms that support them.
Let’s start with an overview of a few fundamentals of web streams. Afterwards, we’ll quickly move on to examples.
Streams are a data structure for accessing data such as files or data hosted on web servers.
Two of their benefits are:
We can work with large amounts of data because streams allow us to split them up into smaller pieces (so-called chunks) which we can process one at a time.
We can work with the same data structure, streams, while processing different data. That makes it easier to reuse code.
Web streams (“web” is often omitted) are a relatively new standard that originated in web browsers but is now also supported by Node.js and Deno (as shown in this MDN compatibility table).
In web streams, chunks are usually either text chunks (strings) or binary chunks (Uint8Arrays, a kind of TypedArray).
There are three main kinds of web streams:
A ReadableStream is used to read data from a source. Code that does that is called a consumer.
A WritableStream is used to write data to a sink. Code that does that is called a producer.
A TransformStream consists of two streams: it receives input on its writable side and sends output via its readable side.
The idea is to transform data by “piping it through” a TransformStream. That is, we write data to the writable side and read transformed data from the readable side. The following TransformStreams are built into most JavaScript platforms (more on them later):
TextDecoderStream converts UTF-8-encoded binary data to JavaScript strings.
TextEncoderStream converts JavaScript strings to UTF-8-encoded binary data.
DecompressionStream decompresses binary data from GZIP and other compression formats.
ReadableStreams, WritableStreams and TransformStreams can be used to transport text or binary data. We’ll mostly do the former in this post. Byte streams for binary data are briefly mentioned at the end.
Piping is an operation that lets us pipe a ReadableStream to a WritableStream: As long as the ReadableStream produces data, this operation reads that data and writes it to the WritableStream. If we connect just two streams, we get a convenient way of transferring data from one location to another (e.g. to copy a file). However, we can also connect more than two streams and get pipe chains that can process data in a variety of ways. This is an example of a pipe chain:
A ReadableStream is connected to a TransformStream by piping the former to the writable side of the latter. Similarly, a TransformStream is connected to another TransformStream by piping the readable side of the former to the writable side of the latter. And a TransformStream is connected to a WritableStream by piping the readable side of the former to the latter.
One problem in pipe chains is that a member may receive more data than it can handle at the moment. Backpressure is a technique for solving this problem: It enables a receiver of data to tell its sender that it should temporarily stop sending data so that the receiver doesn’t get overwhelmed.
Another way to look at backpressure is as a signal that travels backwards through a pipe chain, from a member that is getting overwhelmed to the beginning of the chain. As an example, consider the following pipe chain:
ReadableStream -pipeTo-> TransformStream -pipeTo-> WritableStream
This is how backpressure travels through this chain: first, the WritableStream signals that it is (temporarily) full. The piping operation then stops reading from the TransformStream, whose internal buffer fills up, so the TransformStream signals that it is full as well. Finally, the piping operation stops reading from the ReadableStream.
We have reached the beginning of the pipe chain. Therefore, no data accumulates inside the ReadableStream (which is also buffered) and the WritableStream has time to recover. Once it does, it signals that it is ready to receive data again. That signal also travels back through the chain until it reaches the ReadableStream and data processing resumes.
In this first look at backpressure, several details were omitted to make things easier to understand. These will be covered later.
In Node.js, web streams are available from two sources: via global variables (the same as in web browsers) and via the module 'node:stream/web'.
At the moment, only one API has direct support for web streams in Node.js – the Fetch API:
const response = await fetch('https://example.com');
const readableStream = response.body;
For other things, we need to use one of the following static methods in module 'node:stream'
to either convert a Node.js stream to a web stream or vice versa:
Readable.toWeb(nodeReadable)
Readable.fromWeb(webReadableStream, options?)
Writable.toWeb(nodeWritable)
Writable.fromWeb(webWritableStream, options?)
Duplex.toWeb(nodeDuplex)
Duplex.fromWeb(webTransformStream, options?)
One other API partially supports web streams: FileHandles have the method .readableWebStream().
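Here is a minimal sketch of using that method (assuming a file data.txt exists and that asynchronous iteration over ReadableStreams is available, as discussed later in this post):

import * as fs from 'node:fs/promises';

const fileHandle = await fs.open('data.txt');
try {
  // .readableWebStream() exposes the file contents
  // as a web ReadableStream with binary chunks
  const webReadableStream = fileHandle.readableWebStream();
  for await (const chunk of webReadableStream) {
    console.log(chunk);
  }
} finally {
  await fileHandle.close();
}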
ReadableStreams let us read chunks of data from various sources. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface ReadableStream<TChunk> {
getReader(): ReadableStreamDefaultReader<TChunk>;
readonly locked: boolean;
[Symbol.asyncIterator](): AsyncIterator<TChunk>;
cancel(reason?: any): Promise<void>;
pipeTo(
destination: WritableStream<TChunk>,
options?: StreamPipeOptions
): Promise<void>;
pipeThrough<TChunk2>(
transform: ReadableWritablePair<TChunk2, TChunk>,
options?: StreamPipeOptions
): ReadableStream<TChunk2>;
// Not used in this blog post:
tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}
interface StreamPipeOptions {
signal?: AbortSignal;
preventClose?: boolean;
preventAbort?: boolean;
preventCancel?: boolean;
}
Explanations of these properties:
.getReader() returns a Reader – an object through which we can read from a ReadableStream. ReadableStreams returning Readers is similar to iterables returning iterators.
.locked: There can only be one active Reader per ReadableStream at a time. While one Reader is in use, the ReadableStream is locked and .getReader() cannot be invoked.
[Symbol.asyncIterator]: This method makes ReadableStreams asynchronously iterable (see https://exploringjs.com/impatient-js/ch_async-iteration.html). It is currently only implemented on some platforms.
.cancel(reason) cancels the stream because the consumer isn’t interested in it anymore. reason is passed on to the .cancel() method of the ReadableStream’s underlying source (more on that later). The returned Promise fulfills when this operation is done.
.pipeTo() feeds the contents of its ReadableStream to a WritableStream. The returned Promise fulfills when this operation is done. .pipeTo() ensures that backpressure, closing, errors, etc. are all correctly propagated through a pipe chain. We can specify options via its second parameter:
.signal lets us pass an AbortSignal to this method, which enables us to abort piping via an AbortController.
.preventClose: If true, it prevents the WritableStream from being closed when the ReadableStream is closed. That is useful when we want to pipe more than one ReadableStream to the same WritableStream.
.pipeThrough() connects its ReadableStream to a ReadableWritablePair (roughly: a TransformStream, more on that later). It returns the resulting ReadableStream (i.e., the readable side of the ReadableWritablePair).
The following subsections cover two ways of consuming ReadableStreams: via Readers and via asynchronous iteration.
We can use Readers to read data from ReadableStreams. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface ReadableStreamGenericReader {
readonly closed: Promise<undefined>;
cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
extends ReadableStreamGenericReader
{
releaseLock(): void;
read(): Promise<ReadableStreamReadResult<TChunk>>;
}
interface ReadableStreamReadResult<TChunk> {
done: boolean;
value: TChunk | undefined;
}
Explanations of these properties:
.closed: This Promise is fulfilled after the stream is closed. It is rejected if the stream errors or if a Reader’s lock is released before the stream is closed.
.cancel(): In an active Reader, this method cancels the associated ReadableStream.
.releaseLock() deactivates the Reader and unlocks its stream.
.read() returns a Promise for a ReadableStreamReadResult (a wrapped chunk) which has two properties:
.done is a boolean that is false as long as chunks can be read and true after the last chunk.
.value is the chunk (or undefined after the last chunk).
ReadableStreamReadResult may look familiar if you know how iteration works: ReadableStreams are similar to iterables, Readers are similar to iterators, and ReadableStreamReadResults are similar to the objects returned by the iterator method .next().
The following code demonstrates the protocol for using Readers:
const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
while (true) {
const {done, value: chunk} = await reader.read(); // (C)
if (done) break;
// Use `chunk`
}
} finally {
reader.releaseLock(); // (D)
}
Getting a Reader. We can’t read directly from readableStream; we first need to acquire a Reader (line A). Each ReadableStream can have at most one Reader. After a Reader is acquired, readableStream is locked (line B). Before we can call .getReader() again, we must call .releaseLock() (line D).
Reading chunks. .read()
returns a Promise for an object with the properties .done
and .value
(line C). After the last chunk was read, .done
is true
. This approach is similar to how asynchronous iteration works in JavaScript.
In the following example, we read chunks (strings) from a text file data.txt
:
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)
const reader = webReadableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}
} finally {
reader.releaseLock();
}
// Output:
// 'Content of text file\n'
We are converting a Node.js Readable to a web ReadableStream (line A). Then we use the previously explained protocol to read the chunks.
In the next example, we concatenate all chunks of a ReadableStream into a string and return it:
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString(readableStream) {
const reader = readableStream.getReader();
try {
let result = '';
while (true) {
const {done, value} = await reader.read();
if (done) {
return result; // (A)
}
result += value;
}
} finally {
reader.releaseLock(); // (B)
}
}
Conveniently, the finally clause is always executed – no matter how we leave the try clause. That is, the lock is correctly released (line B) if we return a result (line A).
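A quick (hypothetical) usage example – the ReadableStream constructor used here is explained later in this post:

import * as assert from 'node:assert';

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello ');
    controller.enqueue('world!');
    controller.close();
  },
});
assert.equal(
  await readableStreamToString(readableStream),
  'Hello world!'
);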
ReadableStreams can also be consumed via asynchronous iteration:
const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
while (true) {
let chunk;
({done: exhaustive, value: chunk} = await iterator.next());
if (exhaustive) break;
console.log(chunk);
}
} finally {
// If the loop was terminated before we could iterate exhaustively
// (via an exception or `return`), we must call `iterator.return()`.
// Check if that was the case.
if (!exhaustive) {
iterator.return();
}
}
Thankfully, the for-await-of
loop handles all the details of asynchronous iteration for us:
for await (const chunk of readableStream) {
console.log(chunk);
}
Let’s redo our previous attempt to read text from a file. This time, we use asynchronous iteration instead of a Reader:
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
console.log(chunk);
}
// Output:
// 'Content of text file'
At the moment, Node.js and Deno support asynchronous iteration over ReadableStreams but web browsers don’t: There is a GitHub issue that links to bug reports.
Given that it’s not yet completely clear how async iteration will be supported on browsers, wrapping is a safer choice than polyfilling. The following code is based on a suggestion in the Chromium bug report:
async function* getAsyncIterableFor(readableStream) {
const reader = readableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
}
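A sketch of how this wrapper might be used, given some readableStream (e.g. one obtained via Readable.toWeb(), as shown earlier):

for await (const chunk of getAsyncIterableFor(readableStream)) {
  console.log(chunk);
}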
ReadableStreams have two methods for creating pipe chains:
readableStream.pipeTo(writableStream)
synchronously returns a Promise p
. It asynchronously reads all chunks of readableStream
and writes them to writableStream
. When it is done, it fulfills p
.
We’ll see examples of .pipeTo()
when we explore WritableStreams, as it provides a convenient way to transfer data into them.
readableStream.pipeThrough(transformStream)
pipes readableStream
into transformStream.writable
and returns transformStream.readable
(every TransformStream has these properties that refer to its writable side and its readable side). Another way to view this operation is that we create a new ReadableStream by connecting a transformStream
to a readableStream
.
We’ll see examples of .pipeThrough()
when we explore TransformStreams, as this method is the main way in which they are used.
If we want to read an external source via a ReadableStream, we can wrap it in an adapter object and pass that object to the ReadableStream
constructor. The adapter object is called the underlying source of the ReadableStream (queuing strategies are explained later, when we take a closer look at backpressure):
new ReadableStream(underlyingSource?, queuingStrategy?)
This is the type of underlying sources (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface UnderlyingSource<TChunk> {
start?(
controller: ReadableStreamController<TChunk>
): void | Promise<void>;
pull?(
controller: ReadableStreamController<TChunk>
): void | Promise<void>;
cancel?(reason?: any): void | Promise<void>;
// Only used in byte streams and ignored in this section:
type: 'bytes' | undefined;
autoAllocateChunkSize: bigint;
}
This is when the ReadableStream calls these methods:
.start(controller)
is called immediately after we invoke the constructor of ReadableStream
.
.pull(controller)
is called whenever there is room in the internal queue of the ReadableStream. It is called repeatedly until the queue is full again. This method will only be called after .start()
is finished. If .pull()
doesn’t enqueue anything, it won’t be called again.
.cancel(reason)
is called if the consumer of a ReadableStream cancels it via readableStream.cancel()
or reader.cancel()
. reason
is the value that was passed to these methods.
Each of these methods can return a Promise and no further steps will be taken until the Promise is settled. That is useful if we want to do something asynchronous.
The parameter controller
of .start()
and .pull()
lets them access the stream. It has the following type:
type ReadableStreamController<TChunk> =
| ReadableStreamDefaultController<TChunk>
| ReadableByteStreamController<TChunk> // ignored here
;
interface ReadableStreamDefaultController<TChunk> {
enqueue(chunk?: TChunk): void;
readonly desiredSize: number | null;
close(): void;
error(err?: any): void;
}
For now, chunks are strings. We’ll later get to byte streams, where Uint8Arrays are common. This is what the methods do:
.enqueue(chunk) adds chunk to the ReadableStream’s internal queue.
.desiredSize indicates how much room there is in the queue into which .enqueue() writes. It is zero if the queue is full and negative if it has exceeded its maximum size. Therefore, if the desired size is zero or negative, we have to stop enqueuing. If a stream is closed, its desired size is zero. If a stream is in error mode, its desired size is null.
.close() closes the ReadableStream. Consumers will still be able to empty the queue, but after that, the stream ends. It’s important that an underlying source calls this method – otherwise, reading its stream will never finish.
.error(err) puts the stream in an error mode: All future interactions with it will fail with the error value err.
In our first example of implementing an underlying source, we only provide the method .start(). We’ll see use cases for .pull() in the next subsection.
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('First line\n'); // (A)
controller.enqueue('Second line\n'); // (B)
controller.close(); // (C)
},
});
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'First line\n'
// 'Second line\n'
We use the controller to create a stream with two chunks (line A and line B). It’s important that we close the stream (line C). Otherwise, the for-await-of
loop would never finish!
Note that this way of enqueuing isn’t completely safe: There is a risk of exceeding the capacity of the internal queue. We’ll see soon how we can avoid that risk.
A common scenario is turning a push source or a pull source into a ReadableStream. The source being push or pull determines how we will hook into the ReadableStream with our UnderlyingSource:
Push source: Such a source notifies us when there is new data. We use .start()
to set up listeners and supporting data structures. If we receive too much data and the desired size isn’t positive anymore, we must tell our source to pause. If .pull()
is called later, we can unpause it. Pausing an external source in reaction to the desired size becoming non-positive is called applying backpressure.
Pull source: We ask such a source for new data – often asynchronously. Therefore, we usually don’t do much in .start()
and retrieve data whenever .pull()
is called.
We’ll see examples for both kinds of sources next.
In the following example, we wrap a ReadableStream around a socket – which pushes its data to us (it calls us). This example is taken from the web stream specification:
function makeReadableBackpressureSocketStream(host, port) {
const socket = createBackpressureSocket(host, port);
return new ReadableStream({
start(controller) {
socket.ondata = event => {
controller.enqueue(event.data);
if (controller.desiredSize <= 0) {
// The internal queue is full, so propagate
// the backpressure signal to the underlying source.
socket.readStop();
}
};
socket.onend = () => controller.close();
socket.onerror = () => controller.error(
new Error('The socket errored!'));
},
pull() {
// This is called if the internal queue has been emptied, but the
// stream’s consumer still wants more data. In that case, restart
// the flow of data if we have previously paused it.
socket.readStart();
},
cancel() {
socket.close();
},
});
}
The tool function iterableToReadableStream()
takes an iterable over chunks and turns it into a ReadableStream:
/**
* @param iterable an iterable (asynchronous or synchronous)
*/
function iterableToReadableStream(iterable) {
return new ReadableStream({
start() {
if (typeof iterable[Symbol.asyncIterator] === 'function') {
this.iterator = iterable[Symbol.asyncIterator]();
} else if (typeof iterable[Symbol.iterator] === 'function') {
this.iterator = iterable[Symbol.iterator]();
} else {
throw new Error('Not an iterable: ' + iterable);
}
},
async pull(controller) {
if (this.iterator === null) return;
// Sync iterators return non-Promise values,
// but `await` doesn’t mind and simply passes them on
const {value, done} = await this.iterator.next();
if (done) {
this.iterator = null;
controller.close();
return;
}
controller.enqueue(value);
},
    cancel() {
      // The consumer is no longer interested; release the iterator.
      this.iterator = null;
    },
});
}
Let’s use an async generator function to create an asynchronous iterable and turn that iterable into a ReadableStream:
async function* genAsyncIterable() {
yield 'how';
yield 'are';
yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'how'
// 'are'
// 'you'
iterableToReadableStream()
also works with synchronous iterables:
const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'hello'
// 'everyone'
There may eventually be a static helper method ReadableStream.from() that provides this functionality (see its pull request for more information).
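If that method lands, usage might look as follows (hypothetical until it is available on a given platform):

const readableStream = ReadableStream.from(genAsyncIterable());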
WritableStreams let us write chunks of data to various sinks. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface WritableStream<TChunk> {
getWriter(): WritableStreamDefaultWriter<TChunk>;
readonly locked: boolean;
close(): Promise<void>;
abort(reason?: any): Promise<void>;
}
Explanations of these properties:
.getWriter() returns a Writer – an object through which we can write to a WritableStream.
.locked: There can only be one active Writer per WritableStream at a time. While one Writer is in use, the WritableStream is locked and .getWriter() cannot be invoked.
.close() closes the stream: Chunks that are already queued are still written to the underlying sink before it is closed. The returned Promise fulfills if that succeeds and rejects if an error happens along the way.
.abort() aborts the stream: The stream is put into an error mode and any queued chunks are discarded. The returned Promise fulfills once the underlying sink has finished shutting down.
The following subsections cover two approaches to sending data to WritableStreams: writing via Writers and piping ReadableStreams to them.
We can use Writers to write to WritableStreams. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface WritableStreamDefaultWriter<TChunk> {
readonly desiredSize: number | null;
readonly ready: Promise<undefined>;
write(chunk?: TChunk): Promise<void>;
releaseLock(): void;
close(): Promise<void>;
readonly closed: Promise<undefined>;
abort(reason?: any): Promise<void>;
}
Explanations of these properties:
.desiredSize indicates how much room there is in this WritableStream’s queue. It is zero if the queue is full and negative if it has exceeded its maximum size. Therefore, if the desired size is zero or negative, we have to stop writing. If a stream is closed, its desired size is zero. If a stream is in error mode, its desired size is null.
.ready returns a Promise that is fulfilled when the desired size changes from non-positive to positive. That means that no backpressure is active and it’s OK to write data. If the desired size later changes back to non-positive, a new pending Promise is created and returned.
.write() writes a chunk to the stream. It returns a Promise that is fulfilled after writing succeeds and rejected if there is an error.
.releaseLock() releases the Writer’s lock on its stream.
.close() has the same effect as closing the Writer’s stream.
.closed returns a Promise that is fulfilled when the stream is closed.
.abort() has the same effect as aborting the Writer’s stream.
The following code shows the protocol for using Writers:
const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
// Writing the chunks (explained later)
} finally {
writer.releaseLock(); // (C)
}
We can’t write directly to writableStream; we first need to acquire a Writer (line A). Each WritableStream can have at most one Writer. After a Writer is acquired, writableStream is locked (line B). Before we can call .getWriter() again, we must call .releaseLock() (line C).
There are three approaches to writing chunks.
The first writing approach (which handles backpressure, but inefficiently) is to await each result of .write():
await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();
The Promise returned by .write() fulfills when the chunk that we passed to it was successfully written. What exactly “successfully written” means depends on how a WritableStream is implemented – e.g., with a file stream, the chunk may have been sent to the operating system but still reside in a cache and therefore not have actually been written to disk.
The Promise returned by .close()
is fulfilled when the stream becomes closed.
A downside of this writing approach is that waiting until writing succeeds means that the queue isn’t used. As a consequence, data throughput may be lower.
In the second writing approach (which ignores backpressure), we ignore the rejections of the Promises returned by .write() and only await the Promise returned by .close():
writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errors
The synchronous invocations of .write()
add chunks to the internal queue of the WritableStream. By not awaiting the returned Promises, we don’t wait until each chunk is written. However, awaiting .close()
ensures that the queue is empty and all writing succeeded before we continue.
Invoking .catch()
in line A and line B is necessary to avoid warnings about unhandled Promise rejections when something goes wrong during writing. Such warnings are often logged to the console. We can afford to ignore the errors reported by .write()
because .close()
will also report them to us.
The previous code can be improved by using a helper function that ignores Promise rejections:
ignoreRejections(
writer.write('Chunk 1'),
writer.write('Chunk 2'),
);
await writer.close(); // reports errors
function ignoreRejections(...promises) {
for (const promise of promises) {
promise.catch(() => {});
}
}
One downside of this approach is that backpressure is ignored: We simply assume that the queue is big enough to hold everything we write.
In the third writing approach, we handle backpressure efficiently by awaiting the Writer getter .ready:
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});
await writer.close(); // reports errors
The Promise in .ready
fulfills whenever the stream transitions from having backpressure to not having backpressure.
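The following helper generalizes this pattern to arbitrarily many chunks. It is only a sketch of the pattern; the function writeIterableToStream() is not part of any API:

async function writeIterableToStream(iterable, writableStream) {
  const writer = writableStream.getWriter();
  try {
    for (const chunk of iterable) {
      await writer.ready; // wait until there is room in the queue
      // Don’t await .write(); errors are reported by .close()
      writer.write(chunk).catch(() => {});
    }
    await writer.close(); // reports errors
  } finally {
    writer.releaseLock();
  }
}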
In this example, we create a text file data.txt
via a WritableStream:
import * as fs from 'node:fs';
import {Writable} from 'node:stream';
const nodeWritable = fs.createWriteStream(
'data.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)
const writer = webWritableStream.getWriter();
try {
await writer.write('First line\n');
await writer.write('Second line\n');
await writer.close();
} finally {
writer.releaseLock()
}
In line A, we create a Node.js stream for the file data.txt
. In line B, we convert this stream to a web stream. Then we use a Writer to write strings to it.
Instead of using Writers, we can also write to WritableStreams by piping ReadableStreams to them:
await readableStream.pipeTo(writableStream);
The Promise returned by .pipeTo()
fulfills when piping finishes successfully.
Piping is performed after the current task completes or pauses. The following code demonstrates that:
const readableStream = new ReadableStream({ // (A)
start(controller) {
controller.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
},
});
const writableStream = new WritableStream({ // (B)
write(chunk) {
console.log('WRITE: ' + JSON.stringify(chunk));
},
close() {
console.log('CLOSE WritableStream');
},
});
console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');
// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'
In line A we create a ReadableStream. In line B we create a WritableStream.
We can see that .pipeTo()
(line C) returns immediately. In a new task, chunks are read and written. Then writableStream
is closed and, finally, promise
is fulfilled.
In the following example, we create a WritableStream for a file and pipe a ReadableStream to it:
const webReadableStream = new ReadableStream({ // (A)
async start(controller) {
controller.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
},
});
const nodeWritable = fs.createWriteStream( // (B)
'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)
await webReadableStream.pipeTo(webWritableStream); // (D)
In line A, we create a ReadableStream. In line B, we create a Node.js stream for the file data.txt
. In line C, we convert this stream to a web stream. In line D, we pipe our webReadableStream
to the WritableStream for the file.
In the following example, we write two ReadableStreams to a single WritableStream.
function createReadableStream(prefix) {
return new ReadableStream({
async start(controller) {
controller.enqueue(prefix + 'chunk 1');
controller.enqueue(prefix + 'chunk 2');
controller.close();
},
});
}
const writableStream = new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
},
close() {
console.log('CLOSE');
},
abort(err) {
console.log('ABORT ' + err);
},
});
await createReadableStream('Stream 1: ')
.pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
.pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();
// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'
We tell .pipeTo()
to not close the WritableStream after the ReadableStream is closed (line A and line B). Therefore, the WritableStream remains open after line A and we can pipe another ReadableStream to it.
If we want to write to an external sink via a WritableStream, we can wrap it in an adapter object and pass that object to the WritableStream
constructor. The adapter object is called the underlying sink of the WritableStream (queuing strategies are explained later, when we take a closer look at backpressure):
new WritableStream(underlyingSink?, queuingStrategy?)
This is the type of underlying sinks (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface UnderlyingSink<TChunk> {
start?(
controller: WritableStreamDefaultController
): void | Promise<void>;
write?(
chunk: TChunk,
controller: WritableStreamDefaultController
): void | Promise<void>;
close?(): void | Promise<void>;
abort?(reason?: any): void | Promise<void>;
}
Explanations of these properties:
.start(controller)
is called immediately after we invoke the constructor of WritableStream
. If we do something asynchronous, we can return a Promise. In this method, we can prepare for writing.
.write(chunk, controller)
is called when a new chunk is ready to be written to the external sink. We can exert backpressure by returning a Promise that fulfills once the backpressure is gone.
.close()
is called after writer.close()
was called and all queued writes succeeded. In this method, we can clean up after writing.
.abort(reason)
is called if writableStream.abort()
or writer.abort()
were invoked. reason
is the value passed to these methods.
The parameter controller
of .start()
and .write()
lets them error the WritableStream. It has the following type:
interface WritableStreamDefaultController {
readonly signal: AbortSignal;
error(err?: any): void;
}
.signal is an AbortSignal that we can listen to if we want to abort a write or close operation when the stream is aborted.
.error(err) errors the WritableStream: It is closed and all future interactions with it fail with the error value err.
In the next example, we pipe a ReadableStream to a WritableStream in order to check how the ReadableStream produces chunks:
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('First chunk');
controller.enqueue('Second chunk');
controller.close();
},
});
await readableStream.pipeTo(
new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
},
close() {
console.log('CLOSE');
},
abort(err) {
console.log('ABORT ' + err);
},
})
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'
In the next example, we create a subclass of WritableStream that collects all written chunks in a string. We can access that string via the method .getString():
class WritableStringStream extends WritableStream {
#string = '';
constructor() {
super({
// We need to access the `this` of `WritableStringStream`.
// Hence the arrow function (and not a method).
write: (chunk) => {
this.#string += chunk;
},
});
}
getString() {
return this.#string;
}
}
const stringStream = new WritableStringStream();
const writer = stringStream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
} finally {
writer.releaseLock()
}
assert.equal(
stringStream.getString(),
'How are you?'
);
A downside of this approach is that we are mixing two APIs: The API of WritableStream
and our new string stream API. An alternative is to delegate to the WritableStream instead of extending it:
function createWritableStringStream() {
let string = '';
return {
stream: new WritableStream({
write(chunk) {
string += chunk;
},
}),
getString() {
return string;
},
};
}
const stringStream = createWritableStringStream();
const writer = stringStream.stream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
} finally {
writer.releaseLock()
}
assert.equal(
stringStream.getString(),
'How are you?'
);
This functionality could also be implemented via a class (instead of as a factory function for objects).
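A sketch of that class-based variant (the name StringSink is made up):

class StringSink {
  #string = '';
  // Expose the WritableStream as a property instead of extending it
  stream = new WritableStream({
    write: (chunk) => {
      this.#string += chunk;
    },
  });
  getString() {
    return this.#string;
  }
}

const stringSink = new StringSink();
const writer = stringSink.stream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock();
}
assert.equal(stringSink.getString(), 'How are you?');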
A TransformStream receives input on its writable side and sends (transformed) output via its readable side.
The most common way to use TransformStreams is to “pipe through” them:
const transformedStream = readableStream.pipeThrough(transformStream);
.pipeThrough()
pipes readableStream
to the writable side of transformStream
and returns its readable side. In other words: We have created a new ReadableStream that is a transformed version of readableStream
.
.pipeThrough()
accepts not only TransformStreams, but any object that has the following shape:
interface ReadableWritablePair<RChunk, WChunk> {
readable: ReadableStream<RChunk>;
writable: WritableStream<WChunk>;
}
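For example, a TransformStream created without a transformer passes chunks through unchanged and has exactly this shape, so the following sketch (given some readableStream) sets up a no-op pipe:

const identityTransform = new TransformStream(); // default transformer: chunks pass through unchanged
const transformed = readableStream.pipeThrough(identityTransform);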
Node.js supports the following standard TransformStreams:
Encoding (WHATWG standard) – TextEncoderStream and TextDecoderStream: A single Unicode code point can be encoded as up to four UTF-8 bytes, and those bytes may end up in different chunks; TextDecoderStream handles these cases correctly. (For platform support, see the compatibility data for TextEncoderStream and TextDecoderStream.)
Compression Streams (W3C Draft Community Group Report) – CompressionStream, DecompressionStream: Supported compression formats are deflate (ZLIB Compressed Data Format), deflate-raw (DEFLATE algorithm) and gzip (GZIP file format). (For platform support, see the compatibility data for CompressionStream and DecompressionStream.)
In the following example, we decode a stream of UTF-8-encoded bytes:
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
response.body
is a ReadableByteStream whose chunks are instances of Uint8Array
(TypedArrays). We pipe that stream through a TextDecoderStream
to get a stream that has string chunks.
Note that translating each byte chunk separately (e.g. via a TextDecoder
) doesn’t work because a single Unicode code point is encoded as up to four bytes in UTF-8 and those bytes might not all be in the same chunk.
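The following sketch demonstrates that: the emoji '😀' is encoded as four UTF-8 bytes, which we deliberately split across two chunks; TextDecoderStream still decodes them correctly (the ReadableStream constructor used here was introduced earlier):

import * as assert from 'node:assert';

const bytes = new TextEncoder().encode('😀'); // four UTF-8 bytes
const byteStream = new ReadableStream({
  start(controller) {
    controller.enqueue(bytes.slice(0, 2)); // first half of the code point
    controller.enqueue(bytes.slice(2)); // second half
    controller.close();
  },
});
const stringStream = byteStream.pipeThrough(new TextDecoderStream('utf-8'));
let result = '';
for await (const chunk of stringStream) {
  result += chunk;
}
assert.equal(result, '😀');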
The following Node.js module logs everything that is sent to it via standard input:
// echo-stdin.mjs
import {Readable} from 'node:stream';
const webStream = Readable.toWeb(process.stdin)
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
console.log('>>>', chunk);
}
We can access standard input via a stream stored in process.stdin
(process
is a global Node.js variable). If we don’t set an encoding for this stream and convert it via Readable.toWeb()
, we get a byte stream. We pipe it through a TextDecoderStream in order to get a text stream.
Note that we process standard input incrementally: As soon as another chunk is available, we log it. In other words, we don’t wait until standard input is finished. That is useful when the data is either large or only sent intermittently.
We can implement a custom TransformStream by passing a Transformer object to the constructor of TransformStream. Such an object has the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):
interface Transformer<TInChunk, TOutChunk> {
start?(
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
transform?(
chunk: TInChunk,
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
flush?(
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
}
Explanations of these properties:
.start(controller) is called immediately after we invoke the constructor of TransformStream. Here we can prepare things before the transformations start.
.transform(chunk, controller) performs the actual transformations. It receives an input chunk and can use its parameter controller to enqueue one or more transformed output chunks. It can also choose not to enqueue anything at all.
.flush(controller) is called after all input chunks were transformed successfully. Here we can perform clean-ups after the transformations are done.
Each of these methods can return a Promise and no further steps will be taken until the Promise is settled. That is useful if we want to do something asynchronous.
The parameter controller
has the following type:
interface TransformStreamDefaultController<TOutChunk> {
enqueue(chunk?: TOutChunk): void;
readonly desiredSize: number | null;
terminate(): void;
error(err?: any): void;
}
.enqueue(chunk) adds chunk to the readable side (output) of the TransformStream.
.desiredSize returns the desired size of the internal queue of the readable side (output) of the TransformStream.
.terminate() closes the readable side (output) and errors the writable side (input) of the TransformStream. It can be used if a transformer is not interested in the remaining chunks of the writable side (input) and wants to skip them.
.error(err) errors the TransformStream: All future interactions with it will fail with the error value err.
What about backpressure in a TransformStream? The class propagates the backpressure from its readable side (output) to its writable side (input). The assumption is that transforming doesn’t change the amount of data much. Therefore, Transformers can get away with ignoring backpressure. However, backpressure could be detected via transformStreamDefaultController.desiredSize and propagated by returning a Promise from transformer.transform().
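A sketch of that second option (someAsyncOperation() is a made-up placeholder): because .transform() is an async method, its Promise delays further writes to the writable side until the work is done.

const asyncTransformStream = new TransformStream({
  async transform(chunk, controller) {
    // While we await, the writable side does not receive the next chunk,
    // which exerts backpressure on whoever is writing to it.
    const transformedChunk = await someAsyncOperation(chunk);
    controller.enqueue(transformedChunk);
  },
});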
The following subclass of TransformStream
converts a stream with arbitrary chunks into a stream where each chunk comprises exactly one line of text. That is, with the possible exception of the last chunk, each chunk ends with an end-of-line (EOL) string: '\n'
on Unix (incl. macOS) and '\r\n'
on Windows.
class ChunksToLinesTransformer {
#previous = '';
transform(chunk, controller) {
let startSearch = this.#previous.length;
this.#previous += chunk;
while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = this.#previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// line includes the EOL
const line = this.#previous.slice(0, eolIndex+1);
controller.enqueue(line);
this.#previous = this.#previous.slice(eolIndex+1);
startSearch = 0;
}
}
flush(controller) {
// Clean up and enqueue any text we’re still holding on to
if (this.#previous.length > 0) {
controller.enqueue(this.#previous);
}
}
}
class ChunksToLinesStream extends TransformStream {
constructor() {
super(new ChunksToLinesTransformer());
}
}
const stream = new ReadableStream({
async start(controller) {
controller.enqueue('multiple\nlines of\ntext');
controller.close();
},
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);
for await (const line of transformed) {
console.log('>>>', JSON.stringify(line));
}
// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'
Note that Deno’s built-in TextLineStream
provides similar functionality.
Due to ReadableStreams being asynchronously iterable, we can use asynchronous generators to transform them. That leads to very elegant code:
const stream = new ReadableStream({
async start(controller) {
controller.enqueue('one');
controller.enqueue('two');
controller.enqueue('three');
controller.close();
},
});
async function* prefixChunks(prefix, asyncIterable) {
for await (const chunk of asyncIterable) {
    yield prefix + chunk;
}
}
const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
console.log(transformedChunk);
}
// Output:
// '> one'
// '> two'
// '> three'
Let’s take a closer look at backpressure. Consider the following pipe chain:
rs.pipeThrough(ts).pipeTo(ws);
rs
is a ReadableStream, ts
is a TransformStream, ws
is a WritableStream. These are the connections that are created by the previous expression (.pipeThrough
uses .pipeTo
to connect rs
to the writable side of ts
):
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
Observations:
The underlying source of rs can be viewed as a pipe chain member that comes before rs.
The underlying sink of ws can be viewed as a pipe chain member that comes after ws.
Let’s assume that the underlying sink of ws is slow and the buffer of ws is eventually full. Then the following steps happen:
1. ws signals that it’s full.
2. pipeTo stops reading from ts.readable.
3. ts.readable signals that it’s full.
4. ts stops moving chunks from ts.writable to ts.readable.
5. ts.writable signals that it’s full.
6. pipeTo stops reading from rs.
7. rs signals to its underlying source that it’s full.
This example illustrates that we need two kinds of functionality: entities receiving data must be able to signal backpressure, and entities sending data must react to that signal by exerting backpressure.
Let’s explore how these functionalities are implemented in the web streams API.
Backpressure is signalled by entities that are receiving data. Web streams have two such entities:
A WritableStream receives data via the Writer method .write().
A ReadableStream receives data when its underlying source calls the controller method .enqueue().
In both cases, the input is buffered via queues. The signal to apply backpressure is when a queue is full. Let’s see how that can be detected.
These are the locations of the queues:
The queue of a ReadableStream sits behind its controller (ReadableStreamDefaultController).
The queue of a WritableStream sits behind its Writer (WritableStreamDefaultWriter).
The desired size of a queue is a number that indicates how much room is left in the queue: it is positive while there is still room, zero if the queue is full and negative if the queue has exceeded its maximum size. Therefore, we have to apply backpressure if the desired size is zero or less. The desired size is available via the getter .desiredSize of the object which contains the queue.
How is the desired size computed? Via an object that specifies a so-called queuing strategy. ReadableStream and WritableStream have default queuing strategies which can be overridden via optional parameters of their constructors. The interface QueuingStrategy has two properties:
.size(chunk) returns a size for chunk.
.highWaterMark specifies the maximum size of a queue.
The desired size of a queue is the high water mark minus the current size of the queue.
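As a sketch, this is how we could override the default queuing strategy of a ReadableStream via the built-in CountQueuingStrategy (which gives every chunk the size 1); the high water mark of 4 is an arbitrary choice:

const readableStream = new ReadableStream(
  {
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    },
  },
  new CountQueuingStrategy({highWaterMark: 4}) // queue can buffer up to 4 chunks
);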
Entities sending data need to react to signalled backpressure by exerting backpressure.
We can await the Promise in writer.ready
. While we do, we are blocked and the desired backpressure is achieved. The Promise is fulfilled once there is room in the queue. Fulfillment is triggered when writer.desiredSize
has a value greater than zero.
Alternatively, we can await the Promise returned by writer.write()
. If we do that, the queue won’t even be filled.
If we want to, we can additionally base the size of our chunks on writer.desiredSize
.
The underlying source object that can be passed to a ReadableStream wraps an external source. In a way, it is also a member of the pipe chain; one that comes before its ReadableStream.
Underlying pull sources are only asked for new data whenever there is room in the queue. While there isn’t, backpressure is exerted automatically because no data is pulled.
Underlying push sources should check controller.desiredSize
after enqueuing something: If it’s zero or less, they should exert backpressure by pausing their external sources.
The underlying sink object that can be passed to a WritableStream wraps an external sink. In a way, it is also a member of the pipe chain; one that comes after its WritableStream.
Each external sink signals backpressure differently (in some cases not at all). The underlying sink can exert backpressure by returning a Promise from method .write()
that is fulfilled once writing is finished. There is an example in the web streams standard that demonstrates how that works.
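A sketch of that technique, with setTimeout() from 'node:timers/promises' standing in for a slow external sink:

import {setTimeout as delay} from 'node:timers/promises';

const slowWritableStream = new WritableStream({
  async write(chunk) {
    // The Promise of this async method exerts backpressure:
    // the stream waits for it before passing on the next queued chunk.
    await delay(100); // simulate a slow external sink
    console.log('WRITE ' + JSON.stringify(chunk));
  },
});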
A TransformStream (.writable → .readable): The TransformStream connects its writable side with its readable side by implementing an underlying sink for the former and an underlying source for the latter. It has an internal slot .[[backpressure]] that indicates if internal backpressure is currently active or not.
Method .write() of the underlying sink of the writable side waits asynchronously until there is no internal backpressure before it feeds another chunk to the TransformStream’s transformer (web streams standard: TransformStreamDefaultSinkWriteAlgorithm). The transformer may then enqueue something via its TransformStreamDefaultController. Note that .write() returns a Promise that fulfills when the method is finished. Until that happens, the WritableStream buffers incoming write requests via its queue. Therefore, backpressure for the writable side is signalled via that queue and its desired size.
The TransformStream’s backpressure is activated if a chunk is enqueued via the TransformStreamDefaultController and the queue of the readable side becomes full (web streams standard: TransformStreamDefaultControllerEnqueue
).
The TransformStream’s backpressure may be deactivated if something is read from the Reader (web streams standard: ReadableStreamDefaultReaderRead):
If there is now room in the queue, it may be time to call .pull() of the underlying source (web streams standard: .[[PullSteps]]).
.pull() of the underlying source of the readable side deactivates the backpressure (web streams standard: TransformStreamDefaultSourcePullAlgorithm).
.pipeTo() (ReadableStream → WritableStream): .pipeTo() reads chunks from the ReadableStream via a Reader and writes them to the WritableStream via a Writer. It pauses whenever writer.desiredSize is zero or less (web streams standard: Step 15 of ReadableStreamPipeTo).
So far, we have only worked with text streams, streams whose chunks were strings. But the web streams API also supports byte streams for binary data, where chunks are Uint8Arrays (TypedArrays):
ReadableStream has a special 'bytes' mode.
WritableStream itself doesn’t care if chunks are strings or Uint8Arrays. Therefore, whether an instance is a text stream or a byte stream depends on what kind of chunks the underlying sink can handle.
What kind of chunks a TransformStream can handle also depends on its Transformer.
Next, we’ll learn how to create readable byte streams.
What kind of stream is created by the ReadableStream constructor depends on the optional property .type of its optional first parameter underlyingSource:
If .type is omitted or no underlying source is provided, the new instance is a text stream.
If .type is the string 'bytes', the new instance is a byte stream:
const readableByteStream = new ReadableStream({
type: 'bytes',
async start() { /*...*/ }
// ...
});
What changes if a ReadableStream is in 'bytes' mode?
In default mode, the underlying source can return any kind of chunk. In bytes mode, the chunks must be ArrayBufferViews, i.e. TypedArrays (such as Uint8Arrays) or DataViews.
Additionally, a readable byte stream can create two kinds of readers:
.getReader() returns an instance of ReadableStreamDefaultReader.
.getReader({mode: 'byob'}) returns an instance of ReadableStreamBYOBReader.
“BYOB“ stands for “Bring Your Own Buffer” and means that we can pass a buffer (an ArrayBufferView) to reader.read(). Afterwards, that ArrayBufferView will be detached and no longer usable. But .read() returns its data in a new ArrayBufferView that has the same type and accesses the same region of the same ArrayBuffer.
Additionally, readable byte streams have different controllers: They are instances of ReadableByteStreamController
(vs. ReadableStreamDefaultController
). Apart from forcing underlying sources to enqueue ArrayBufferViews (TypedArrays or DataViews), it also supports ReadableStreamBYOBReaders via its property .byobRequest
. An underlying source writes its data into the BYOBRequest stored in this property. The web streams standard has two examples of using .byobRequest
in its section “Examples of creating streams”.
In the next example, we create an infinite readable byte stream that fills its chunks with random data (inspiration: example4.mjs in “Implementing the Web Streams API in Node.js”).
import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);
const readableByteStream = new ReadableStream({
type: 'bytes',
async pull(controller) {
const byobRequest = controller.byobRequest;
await asyncRandomFill(byobRequest.view);
byobRequest.respond(byobRequest.view.byteLength);
},
});
const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);
Due to readableByteStream
being infinite, we can’t loop over it. That’s why we only read its first chunk (line B).
The buffer we create in line A is transferred and therefore unreadable after line B.
In the following example, we create a readable byte stream and pipe it through a stream that compresses it to the GZIP format:
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// 256 zeros
controller.enqueue(new Uint8Array(256));
controller.close();
},
});
const transformedStream = readableByteStream.pipeThrough(
new CompressionStream('gzip'));
await logChunks(transformedStream);
async function logChunks(readableByteStream) {
const reader = readableByteStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}
} finally {
reader.releaseLock();
}
}
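As a follow-up sketch, we can check the compression by also piping through a DecompressionStream and counting the decompressed bytes; we expect the 256 zeros back:

const roundTripStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    controller.enqueue(new Uint8Array(256)); // 256 zeros
    controller.close();
  },
})
  .pipeThrough(new CompressionStream('gzip'))
  .pipeThrough(new DecompressionStream('gzip'));

let totalLength = 0;
for await (const chunk of roundTripStream) {
  totalLength += chunk.byteLength;
}
console.log(totalLength); // 256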
The result of fetch() resolves to a response object whose property .body is a readable byte stream. We convert that byte stream to a text stream via TextDecoderStream:
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
Node.js is the only web platform that supports the following helper functions that it calls utility consumers:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';
These functions convert web ReadableStreams, Node.js Readables and AsyncIterators to Promises that are fulfilled with:
ArrayBuffers (arrayBuffer())
Blobs (blob())
Node.js Buffers (buffer())
JSON objects (json())
Strings (text())
Binary data is assumed to be UTF-8-encoded:
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// TextEncoder converts strings to UTF-8 encoded Uint8Arrays
const encoder = new TextEncoder();
const view = encoder.encode('"😀"');
assert.deepEqual(
view,
Uint8Array.of(34, 240, 159, 152, 128, 34)
);
controller.enqueue(view);
controller.close();
},
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');
String streams work as expected:
import * as assert from 'assert';
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
start(controller) {
controller.enqueue('"😀"');
controller.close();
},
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');
This post doesn’t cover every aspect of the web streams API. You can find more information in the web streams standard and in the Node.js documentation for the module 'node:stream/web'.