Black lives matter
Portrait Dr. Axel Rauschmayer
Dr. Axel Rauschmayer
Homepage | Twitter
Cover of book “JavaScript for impatient programmers”
Book, exercises, quizzes
(free to read online)
Cover of book “Deep JavaScript”
Book (50% free online)
Cover of book “Tackling TypeScript”
Book (first part free online)
Logo of newsletter “ES.next news”
Newsletter (free)

Using web streams on Node.js

[2022-06-17] dev, javascript, nodejs
(Ad, please don’t block)

Web streams are a standard for streams that is now supported on all major web platforms: web browsers, Node.js, and Deno. (Streams are an abstraction for reading and writing data sequentially in small pieces from all kinds of sources – files, data hosted on servers, etc.)

For example, the global function fetch() (which downloads online resources) asynchronously returns a Response which has a property .body with a web stream.

This blog post covers web streams on Node.js, but most of what we learn applies to all web platforms that support them.


Table of contents:


What are web streams?  

Let’s start with an overview of a few fundamentals of web streams. Afterwards, we’ll quickly move on to examples.

Streams are a data structure for accessing data such as:

  • Files
  • Data hosted on web servers
  • Etc.

Two of their benefits are:

  • We can work with large amounts of data because streams allow us to split them up into smaller pieces (so-called chunks) which we can process one at a time.

  • We can work with the same data structure, streams, while processing different data. That makes it easier to reuse code.

Web streams (“web” is often omitted) are a relatively new standard that originated in web browsers but is now also supported by Node.js and Deno (as shown in this MDN compatibility table).

In web streams, chunks are usually either:

Kinds of streams  

There are three main kinds of web streams:

  • A ReadableStream is used to read data from a source. Code that does that is called a consumer.

  • A WritableStream is used to write data to a sink. Code that does that is called a producer.

  • A TransformStream consists of two streams:

    • It receives input from its writable side, a WritableStream.
    • It sends output to its readable side, a ReadableStream.

    The idea is to transform data by “piping it through” a TransformStream. That is, we write data to the writable side and read transformed data from the readable side. The following TransformStreams are built into most JavaScript platforms (more on them later):

    • Because JavaScript strings are UTF-16 encoded, UTF-8 encoded data is treated as binary in JavaScript. A TextDecoderStream converts such data to strings.
    • A TextEncoderStream converts JavaScript strings to UTF-8 data.
    • A ``CompressionStream` compresses binary data to GZIP and other compression formats.
    • A DecompressionStream decompresses binary data from GZIP and other compression formats.

ReadableStreams, WritableStreams and TransformStreams can be used to transport text or binary data. We’ll mostly do the former in this post. Byte streams for binary data are briefly mentioned at the end.

Pipe chains  

Piping is an operation that lets us pipe a ReadableStream to a WritableStream: As long as the ReadableStream produces data, this operation reads that data and writes it to the WritableStream. If we connect just two streams, we get a convenient way of transferring data from one location to another (e.g. to copy a file). However, we can also connect more than two streams and get pipe chains that can process data in a variety of ways. This is an example of a pipe chain:

  • It starts with a ReadableStream.
  • Next are one or more TransformStreams.
  • The chain ends with a WritableStream.

A ReadableStream is connected to a TransformStream by piping the former to the writable side of the latter. Similarly, a TransformStream is connected to another TransformStream by piping the readable side of the former to the writable side of the latter. And a TransformStream is connected to a WritableStream by piping the readable side of the former to the latter.

Backpressure  

One problem in pipe chains is that a member may receive more data than it can handle at the moment. Backpressure is a technique for solving this problem: It enables a receiver of data to tell its sender that it should temporarily stop sending data so that the receiver doesn’t get overwhelmed.

Another way to look at backpressure is as a signal that travels backwards through a pipe chain, from a member that is getting overwhelmed to the beginning of the chain. As an example, consider the following pipe chain:

ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream

This is how backpressure travels through this chain:

  • Initially, the WriteableStream signals that it can’t process more data at the moment.
  • The pipe stops reading from the TransformStream.
  • Input accumulates inside the TransformStream (which is buffered).
  • The TransformStream signals that it’s full.
  • The pipe stops reading from the ReadableStream.

We have reached the beginning of the pipe chain. Therefore, no data accumulates inside the ReadableStream (which is also buffered) and the WriteableStream has time to recover. Once it does, it signals that it is ready to receive data again. That signal also travels back through the chain until it reaches the ReadableStream and data processing resumes.

In this first look at backpressure, several details were omitted to make things easier to understand. These will be covered later.

Support for web streams in Node.js  

In Node.js, web streams are available from two sources:

At the moment, only one API has direct support for web streams in Node.js – the Fetch API:

const response = await fetch('https://example.com');
const readableStream = response.body;

For other things, we need to use one of the following static methods in module 'node:stream' to either convert a Node.js stream to a web stream or vice versa:

  • Node.js Readables can be converted to and from WritableStreams:
    • Readable.toWeb(nodeReadable)
    • Readable.fromWeb(webReadableStream, options?)
  • Node.js Writables can be converted to and from ReadableStreams:
    • Writable.toWeb(nodeWritable)
    • Writable.fromWeb(webWritableStream, options?)
  • Node.js Duplexes can be converted to and from TransformStreams:
    • Duplex.toWeb(nodeDuplex)
    • Duplex.fromWeb(webTransformStream, options?)

One other API partially supports web streams: FileHandles have the method .readableWebStream().

Reading from ReadableStreams  

ReadableStreams let us read chunks of data from various sources. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface ReadableStream<TChunk> {
  getReader(): ReadableStreamDefaultReader<TChunk>;
  readonly locked: boolean;
  [Symbol.asyncIterator](): AsyncIterator<TChunk>;

  cancel(reason?: any): Promise<void>;

  pipeTo(
    destination: WritableStream<TChunk>,
    options?: StreamPipeOptions
  ): Promise<void>;
  pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
  ): ReadableStream<TChunk2>;
  
  // Not used in this blog post:
  tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}

interface StreamPipeOptions {
  signal?: AbortSignal;
  preventClose?: boolean;
  preventAbort?: boolean;
  preventCancel?: boolean;
}

Explanations of these properties:

  • .getReader() returns a Reader – an object through which we can read from a ReadableStream. ReadableStreams returning Readers is similar to iterables returning iterators.
  • .locked: There can only be one active Reader per ReadableStream at a time. While one Reader is in use, the ReadableStream is locked and .getReader() cannot be invoked.
  • [Symbol.asyncIterator](https://exploringjs.com/impatient-js/ch_async-iteration.html): This method makes ReadableStreams asynchronously iterable. It is currently only implemented on some platforms.
  • .cancel(reason) cancels the stream because the consumer isn’t interested in it anymore. reason is passed on to the .cancel() method of the ReadableStream’s underlying source (more on that later). The returned Promise fulfills when this operation is done.
  • .pipeTo() feeds the contents of its ReadableStream to a WritableStream. The returned Promise fulfills when this operation is done. .pipeTo() ensures that backpressure, closing, errors, etc. are all correctly propagated through a pipe chain. We can specify options via its second parameter:
    • .signal lets us pass an AbortSignal to this method, which enables us to abort piping via an AbortController.
    • .preventClose: If true, it prevents the WritableStream from being closed when the ReadableStream is closed. That is useful when we want to pipe more than one ReadableStream to the same WritableStream.
    • The remaining options are beyond the scope of this blog post. They are documented in the web streams specification.
  • .pipeThrough() connects its ReadableStream to a ReadableWritablePair (roughly: a TransformStream, more on that later). It returns the resulting ReadableStream (i.e., the readable side of the ReadableWritablePair).

The following subsections cover two ways of consuming ReadableStreams:

  • Reading via Readers
  • Reading via asynchronous iteration

Consuming ReadableStreams via Readers  

We can use Readers to read data from ReadableStreams. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface ReadableStreamGenericReader {
  readonly closed: Promise<undefined>;
  cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
  extends ReadableStreamGenericReader
{
  releaseLock(): void;
  read(): Promise<ReadableStreamReadResult<TChunk>>;
}

interface ReadableStreamReadResult<TChunk> {
  done: boolean;
  value: TChunk | undefined;
}

Explanations of these properties:

  • .closed: This Promise is fulfilled after the stream is closed. It is rejected if the stream errors or if a Reader’s lock is released before the stream is closed.
  • .cancel(): In an active Reader, this method cancels the associated ReadableStream.
  • .releaseLock() deactivates the Reader and unlocks its stream.
  • .read() returns a Promise for a ReadableStreamReadResult (a wrapped chunk) which has two properties:
    • .done is a boolean that is false as long as chunks can be read and true after the last chunk.
    • .value is the chunk (or undefined after the last chunk).

ReadableStreamReadResult may look familiar if you know how iteration works: ReadableStreams are similar to iterables, Readers are similar to iterators, and ReadableStreamReadResults are similar to the objects returned by the iterator method .next().

The following code demonstrates the protocol for using Readers:

const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
  while (true) {
    const {done, value: chunk} = await reader.read(); // (C)
    if (done) break;
    // Use `chunk`
  }
} finally {
  reader.releaseLock(); // (D)
}

Getting a Reader. We can’t read directly from readableStream, we first need to acquire a Reader (line A). Each ReadableStream can have at most one Reader. After a Reader was acquired, readableStream is locked (line B). Before we can call .getReader() again, we must call .releaseLock() (line D).

Reading chunks. .read() returns a Promise for an object with the properties .done and .value (line C). After the last chunk was read, .done is true. This approach is similar to how asynchronous iteration works in JavaScript.

Example: reading a file via a ReadableStream  

In the following example, we read chunks (strings) from a text file data.txt:

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)

const reader = webReadableStream.getReader();
try {
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    console.log(value);
  }
} finally {
  reader.releaseLock();
}
// Output:
// 'Content of text file\n'

We are converting a Node.js Readable to a web ReadableStream (line A). Then we use the previously explained protocol to read the chunks.

Example: assembling a string with the contents of a ReadableStream  

In the next example, we concatenate all chunks of a ReadableStream into a string and return it:

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString(readableStream) {
  const reader = readableStream.getReader();
  try {
    let result = '';
    while (true) {
      const {done, value} = await reader.read();
      if (done) {
        return result; // (A)
      }
      result += value;
    }
  } finally {
    reader.releaseLock(); // (B)
  }
}

Conveniently, the finally clause is always executed – now matter how we leave the try clause. That is, the lock is correctly released (line B) if we return a result (line A).

Consuming ReadableStreams via asynchronous iteration  

ReadableStreams can also be consumed via asynchronous iteration:

const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
  while (true) {
    let chunk;
    ({done: exhaustive, value: chunk} = await iterator.next());
    if (exhaustive) break;
    console.log(chunk);
  }
} finally {
  // If the loop was terminated before we could iterate exhaustively
  // (via an exception or `return`), we must call `iterator.return()`.
  // Check if that was the case.
  if (!exhaustive) {
    iterator.return();
  }
}

Thankfully, the for-await-of loop handles all the details of asynchronous iteration for us:

for await (const chunk of readableStream) {
  console.log(chunk);
}

Example: using asynchronous iteration to read a stream  

Let’s redo our previous attempt to read text from a file. This time, we use asynchronous iteration instead of a Reader:

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
  console.log(chunk);
}
// Output:
// 'Content of text file'

Caveat: Browsers don’t support asynchronous iteration over ReadableStreams  

At the moment, Node.js and Deno support asynchronous iteration over ReadableStreams but web browsers don’t: There is a GitHub issue that links to bug reports.

Given that it’s not yet completely clear how async iteration will be supported on browsers, wrapping is a safer choice than polyfilling. The following code is based on a suggestion in the Chromium bug report:

async function* getAsyncIterableFor(readableStream) {
  const reader = readableStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

Creating pipe chains  

ReadableStreams have two methods for creating pipe chains:

  • readableStream.pipeTo(writeableStream) synchronously returns a Promise p. It asynchronously reads all chunks of readableStream and writes them to writableStream. When it is done, it fulfills p.

    We’ll see examples of .pipeTo() when we explore WritableStreams, as it provides a convenient way to transfer data into them.

  • readableStream.pipeThrough(transformStream) pipes readableStream into transformStream.writable and returns transformStream.readable (every TransformStream has these properties that refer to its writable side and its readable side). Another way to view this operation is that we create a new ReadableStream by connecting a transformStream to a readableStream.

    We’ll see examples of .pipeThrough() when we explore TransformStreams, as this method is the main way in which they are used.

Turning data sources into ReadableStreams via wrapping  

If we want to read an external source via a ReadableStream, we can wrap it in an adapter object and pass that object to the ReadableStream constructor. The adapter object is called the underlying source of the ReadableStream (queuing strategies are explained later, when we take a closer look at backpressure):

new ReadableStream(underlyingSource?, queuingStrategy?)

This is the type of underlying sources (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface UnderlyingSource<TChunk> {
  start?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  pull?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  cancel?(reason?: any): void | Promise<void>;

  // Only used in byte streams and ignored in this section:
  type: 'bytes' | undefined;
  autoAllocateChunkSize: bigint;
}

This is when the ReadableStream calls these methods:

  • .start(controller) is called immediately after we invoke the constructor of ReadableStream.

  • .pull(controller) is called whenever there is room in the internal queue of the ReadableStream. It is called repeatedly until the queue is full again. This method will only be called after .start() is finished. If .pull() doesn’t enqueue anything, it won’t be called again.

  • .cancel(reason) is called if the consumer of a ReadableStream cancels it via readableStream.cancel() or reader.cancel(). reason is the value that was passed to these methods.

Each of these methods can return a Promise and no further steps will be taken until the Promise is settled. That is useful if we want to do something asynchronous.

The parameter controller of .start() and .pull() lets them access the stream. It has the following type:

type ReadableStreamController<TChunk> =
  | ReadableStreamDefaultController<TChunk>
  | ReadableByteStreamController<TChunk> // ignored here
;

interface ReadableStreamDefaultController<TChunk> {
  enqueue(chunk?: TChunk): void;
  readonly desiredSize: number | null;
  close(): void;
  error(err?: any): void;
}

For now, chunks are strings. We’ll later get to byte streams, where Uint8Arrays are common. This is what the methods do:

  • .enqueue(chunk) adds chunk to the ReadableStream’s internal queue.
  • .desiredSize indicates how much room there is in the queue into which .enqueue() writes. It is zero if the queue is full and negative if it has exceeded its maximum size. Therefore, if the desired size is zero or negative, we have to stop enqueuing.
    • If a stream is closed, its desired size is zero.
    • If a stream is in error mode, its desired size is null.
  • .close() closes the ReadableStream. Consumers will still be able to empty the queue, but after that, the stream ends. It’s important that an underlying source calls this method – otherwise, reading its stream will never finish.
  • .error(err) puts the stream in an error mode: All future interactions with it will fail with the error value err.

A first example of implementing an underlying source  

In our first example of implementing an underlying source, we only provide method .start(). We’ll see use cases for .pull() in the next subsection.

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First line\n'); // (A)
    controller.enqueue('Second line\n'); // (B)
    controller.close(); // (C)
  },
});
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'First line\n'
// 'Second line\n'

We use the controller to create a stream with two chunks (line A and line B). It’s important that we close the stream (line C). Otherwise, the for-await-of loop would never finish!

Note that this way of enqueuing isn’t completely safe: There is a risk of exceding the capacity of the internal queue. We’ll see soon how we can avoid that risk.

Using a ReadableStream to wrap a push source or a pull source  

A common scenario is turning a push source or a pull source into a ReadableStream. The source being push or pull determines how we will hook into the ReadableStream with our UnderlyingSource:

  • Push source: Such a source notifies us when there is new data. We use .start() to set up listeners and supporting data structures. If we receive too much data and the desired size isn’t positive anymore, we must tell our source to pause. If .pull() is called later, we can unpause it. Pausing an external source in reaction to the desired size becoming non-positive is called applying backpressure.

  • Pull source: We ask such a source for new data – often asynchronously. Therefore, we usually don’t do much in .start() and retrieve data whenever .pull() is called.

We’ll see examples for both kinds of sources next.

Example: creating a ReadableStream from a push source with backpressure support  

In the following example, we wrap a ReadableStream around a socket – which pushes its data to us (it calls us). This example is taken from the web stream specification:

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(
        new Error('The socket errored!'));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    },
  });
}

Example: creating a ReadableStream from a pull source  

The tool function iterableToReadableStream() takes an iterable over chunks and turns it into a ReadableStream:

/**
 * @param iterable an iterable (asynchronous or synchronous)
 */
 function iterableToReadableStream(iterable) {
  return new ReadableStream({
    start() {
      if (typeof iterable[Symbol.asyncIterator] === 'function') {
        this.iterator = iterable[Symbol.asyncIterator]();
      } else if (typeof iterable[Symbol.iterator] === 'function') {
        this.iterator = iterable[Symbol.iterator]();
      } else {
        throw new Error('Not an iterable: ' + iterable);
      }
    },

    async pull(controller) {
      if (this.iterator === null) return;
      // Sync iterators return non-Promise values,
      // but `await` doesn’t mind and simply passes them on
      const {value, done} = await this.iterator.next();
      if (done) {
        this.iterator = null;
        controller.close();
        return;
      }
      controller.enqueue(value);
    },

    cancel() {
      this.iterator = null;
      controller.close();
    },
  });
}

Let’s use an async generator function to create an asynchronous iterable and turn that iterable into a ReadableStream:

async function* genAsyncIterable() {
  yield 'how';
  yield 'are';
  yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'how'
// 'are'
// 'you'

iterableToReadableStream() also works with synchronous iterables:

const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'hello'
// 'everyone'

There may eventually by a static helper method ReadableStream.from() that provides this functionality (see its pull request for more information).

Writing to WritableStreams  

WritableStreams let us write chunks of data to various sinks. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface WritableStream<TChunk> {
  getWriter(): WritableStreamDefaultWriter<TChunk>;
  readonly locked: boolean;

  close(): Promise<void>;
  abort(reason?: any): Promise<void>;
}

Explanations of these properties:

  • .getWriter() returns a Writer – an object through which we can write to a WritableStream.
  • .locked: There can only be one active Writer per WritableStream at a time. While one Writer is in use, the WritableStream is locked and .getWriter() cannot be invoked.
  • .close() closes the stream:
    • The underlying sink (more on that later) will still receive all queued chunks before it’s closed.
    • From now on, all attempts to write will fail silently (without errors).
    • The method returns a Promise that will be fulfilled if the sink succeeds in writing all queued chunks and closing. It will be rejected if any errors occur during these steps.
  • .abort() aborts the stream:
    • It puts the stream in error mode.
    • The returned Promise fulfills if the sink shuts down successfully and rejects if errors occur.

The following subsections cover two approaches to sending data to WritableStreams:

  • Writing to WritableStreams via Writers
  • Piping to WritableStreams

Writing to WritableStreams via Writers  

We can use Writers to write to WritableStreams. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface WritableStreamDefaultWriter<TChunk> {
  readonly desiredSize: number | null;
  readonly ready: Promise<undefined>;
  write(chunk?: TChunk): Promise<void>;
  releaseLock(): void;

  close(): Promise<void>;
  readonly closed: Promise<undefined>;
  abort(reason?: any): Promise<void>;
}

Explanations of these properties:

  • .desiredSize indicates how much room there is in this WriteStream’s queue. It is zero if the queue is full and negative if it has exceeded its maximum size. Therefore, if the desired size is zero or negative, we have to stop writing.

    • If a stream is closed, its desired size is zero.
    • If a stream is in error mode, its desired size is null.
  • .ready returns a Promise that is fulfilled when the desired size changes from non-positive to positive. That means that no backpressure is active and it’s OK to write data. If the desired size later changes back to non-positive, a new pending Promise is created and returned.

  • .write() writes a chunk to the stream. It returns a Promise that is fulfilled after writing succeeds and rejected if there is an error.

  • .releaseLock() releases the Writer’s lock on its stream.

  • .close() has the same effect as closing the Writer’s stream.

  • .closed returns a Promise that is fulfilled when the stream is closed.

  • .abort() has the same effect as aborting the Writer’s stream.

The following code shows the protocol for using Writers:

const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
  // Writing the chunks (explained later)
} finally {
  writer.releaseLock(); // (C)
}

We can’t write directly to a writableStream, we first need to acquire a Writer (line A). Each WritableStream can have at most one Writer. After a Writer was acquired, writableStream is locked (line B). Before we can call .getWriter() again, we must call .releaseLock() (line C).

There are three approaches to writing chunks.

Writing approach 1: awaiting .write() (handling backpressure inefficiently)  

The first writing approach is to await each result of .write():

await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();

The Promise returned by .write() fulfills when the chunk that we passed to it, was successfully written. What exactly “successfully written” means, depends on how a WritableStream is implemented – e.g., with a file stream, the chunk may have been sent to the operating system but still reside in a cache and therefore not have actually been written to disk.

The Promise returned by .close() is fulfilled when the stream becomes closed.

A downside of this writing approach is that waiting until writing succeeds means that the queue isn’t used. As a consequence, data throughput may be lower.

Writing approach 2: ignoring .write() rejections (ignoring backpressure)  

In the second writing approach, we ignore the Promises returned by .write() and only await the Promise returned by .close():

writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errors

The synchronous invocations of .write() add chunks to the internal queue of the WritableStream. By not awaiting the returned Promises, we don’t wait until each chunk is written. However, awaiting .close() ensures that the queue is empty and all writing succeeded before we continue.

Invoking .catch() in line A and line B is necessary to avoid warnings about unhandled Promise rejections when something goes wrong during writing. Such warnings are often logged to the console. We can afford to ignore the errors reported by .write() because .close() will also report them to us.

The previous code can be improved by using a helper function that ignores Promise rejections:

ignoreRejections(
  writer.write('Chunk 1'),
  writer.write('Chunk 2'),
);
await writer.close(); // reports errors

function ignoreRejections(...promises) {
  for (const promise of promises) {
    promise.catch(() => {});
  }
}

One downside of this approach is that backpressure is ignored: We simply assume that the queue is big enough to hold everything we write.

Writing approach 3: awaiting .ready (handling backpressure efficiently)  

In this writing approach, we handle backpressure efficiently by awaiting the Writer getter .ready:

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});

await writer.close(); // reports errors

The Promise in .ready fulfills whenever the stream transitions from having backpressure to not having backpressure.

Example: writing to a file via a Writer  

In this example, we create a text file data.txt via a WritableStream:

import * as fs from 'node:fs';
import {Writable} from 'node:stream';

const nodeWritable = fs.createWriteStream(
  'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)

const writer = webWritableStream.getWriter();
try {
  await writer.write('First line\n');
  await writer.write('Second line\n');
  await writer.close();
} finally {
  writer.releaseLock()
}

In line A, we create a Node.js stream for the file data.txt. In line B, we convert this stream to a web stream. Then we use a Writer to write strings to it.

Piping to WritableStreams  

Instead of using Writers, we can also write to WritableStreams by piping ReadableStreams to them:

await readableStream.pipeTo(writableStream);

The Promise returned by .pipeTo() fulfills when piping finishes successfully.

Piping happens asynchronously  

Piping is performed after the current task completes or pauses. The following code demonstrates that:

const readableStream = new ReadableStream({ // (A)
  start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});
const writableStream = new WritableStream({ // (B)
  write(chunk) {
    console.log('WRITE: ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE WritableStream');
  },
});


console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');

// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'

In line A we create a ReadableStream. In line B we create a WritableStream.

We can see that .pipeTo() (line C) returns immediately. In a new task, chunks are read and written. Then writableStream is closed and, finally, promise is fulfilled.

Example: piping to a WritableStream for a file  

In the following example, we create a WritableStream for a file and pipe a ReadableStream to it:

const webReadableStream = new ReadableStream({ // (A)
  async start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});

const nodeWritable = fs.createWriteStream( // (B)
  'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)

await webReadableStream.pipeTo(webWritableStream); // (D)

In line A, we create a ReadableStream. In line B, we create a Node.js stream for the file data.txt. In line C, we convert this stream to a web stream. In line D, we pipe our webReadableStream to the WritableStream for the file.

Example: writing two ReadableStreams to a WritableStream  

In the following example, we write two ReadableStreams to a single WritableStream.

function createReadableStream(prefix) {
  return new ReadableStream({
    async start(controller) {
      controller.enqueue(prefix + 'chunk 1');
      controller.enqueue(prefix + 'chunk 2');
      controller.close();
    },
  });
}

const writableStream = new WritableStream({
  write(chunk) {
    console.log('WRITE ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE');
  },
  abort(err) {
    console.log('ABORT ' + err);
  },
});

await createReadableStream('Stream 1: ')
  .pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
  .pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();

// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'

We tell .pipeTo() to not close the WritableStream after the ReadableStream is closed (line A and line B). Therefore, the WritableStream remains open after line A and we can pipe another ReadableStream to it.

Turning data sinks into WritableStreams via wrapping  

If we want to write to an external sink via a WritableStream, we can wrap it in an adapter object and pass that object to the WritableStream constructor. The adapter object is called the underlying sink of the WritableStream (queuing strategies are explained later, when we take a closer look at backpressure):

new WritableStream(underlyingSink?, queuingStrategy?)

This is the type of underlying sinks (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface UnderlyingSink<TChunk> {
  start?(
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  write?(
    chunk: TChunk,
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  close?(): void | Promise<void>;;
  abort?(reason?: any): void | Promise<void>;
}

Explanations of these properties:

  • .start(controller) is called immediately after we invoke the constructor of WritableStream. If we do something asynchronous, we can return a Promise. In this method, we can prepare for writing.

  • .write(chunk, controller) is called when a new chunk is ready to be written to the external sink. We can exert backpressure by returning a Promise that fulfills once the backpressure is gone.

  • .close() is called after writer.close() was called and all queued writes succeeded. In this method, we can clean up after writing.

  • .abort(reason) is called if writeStream.abort() or writer.abort() were invoked. reason is the value passed to these methods.

The parameter controller of .start() and .write() lets them error the WritableStream. It has the following type:

interface WritableStreamDefaultController {
  readonly signal: AbortSignal;
  error(err?: any): void;
}
  • .signal is an AbortSignal that we can listen to if we want to abort a write or close operation when the stream is aborted.
  • .error(err) errors the WritableStream: It is closed and all future interactions with it fail with the error value err.

Example: tracing a ReadableStream  

In the next example, we pipe a ReadableStream to a WritableStream in order to check how the ReadableStream produces chunks:

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First chunk');
    controller.enqueue('Second chunk');
    controller.close();
  },
});
await readableStream.pipeTo(
  new WritableStream({
    write(chunk) {
      console.log('WRITE ' + JSON.stringify(chunk));
    },
    close() {
      console.log('CLOSE');
    },
    abort(err) {
      console.log('ABORT ' + err);
    },
  })
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'

Example: collecting written chunks in a string  

In the next example, we create a subclass of WriteStream that collects all written chunks in a string. We can access that string via method .getString():

class WritableStringStream extends WritableStream {
  #string = '';
  constructor() {
    super({
      // We need to access the `this` of `WritableStringStream`.
      // Hence the arrow function (and not a method).
      write: (chunk) => {
        this.#string += chunk;
      },
    });
  }
  getString() {
    return this.#string;
  }
}
const stringStream = new WritableStringStream();
const writer = stringStream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

A downside of this approach is that we are mixing two APIs: The API of WritableStream and our new string stream API. An alternative is to delegate to the WritableStream instead of extending it:

function createWritableStringStream() {
  let string = '';
  return {
    stream: new WritableStream({
      write(chunk) {
        string += chunk;
      },
    }),
    getString() {
      return string;
    },
  };
}

const stringStream = createWritableStringStream();
const writer = stringStream.stream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

This functionality could also be implemented via a class (instead of as a factory function for objects).

Using TransformStreams  

A TransformStream:

  • Receives input via its writable side, a WritableStream.
  • It then may or may not transform this input.
  • The result can be read via a ReadableStream, its readable side.

The most common way to use TransformStreams is to “pipe through” them:

const transformedStream = readableStream.pipeThrough(transformStream);

.pipeThrough() pipes readableStream to the writable side of transformStream and returns its readable side. In other words: We have created a new ReadableStream that is a transformed version of readableStream.

.pipeThrough() accepts not only TransformStreams, but any object that has the following shape:

interface ReadableWritablePair<RChunk, WChunk> {
  readable: ReadableStream<RChunk>;
  writable: WritableStream<WChunk>;
}

Standard TransformStreams  

Node.js supports the following standard TransformStreams:

Example: decoding a stream of UTF-8-encoded bytes  

In the following example, we decode a stream of UTF-8-encoded bytes:

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

response.body is a ReadableByteStream whose chunks are instances of Uint8Array (TypedArrays). We pipe that stream through a TextDecoderStream to get a stream that has string chunks.

Note that translating each byte chunk separately (e.g. via a TextDecoder) doesn’t work because a single Unicode code point is encoded as up to four bytes in UTF-8 and those bytes might not all be in the same chunk.

Example: creating a readable text stream for standard input  

The following Node.js module logs everything that is sent to it via standard input:

// echo-stdin.mjs
import {Readable} from 'node:stream';

const webStream = Readable.toWeb(process.stdin)
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
  console.log('>>>', chunk);
}

We can access standard input via a stream stored in process.stdin (process is a global Node.js variable). If we don’t set an encoding for this stream and convert it via Readable.toWeb(), we get a byte stream. We pipe it through a TextDecoderStream in order to get a text stream.

Note that we process standard input incrementally: As soon as another chunk is available, we log it. In other words, we don’t wait until standard input is finished. That is useful when the data is either large or only sent intermittently.

Implementing custom TransformStreams  

We can implement a custom TransformStream by passing a Transformer object to the constructor of TransformStream. Such has object has the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface Transformer<TInChunk, TOutChunk> {
  start?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  transform?(
    chunk: TInChunk,
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  flush?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
}

Explanations of these properties:

  • .start(controller) is called immediately after we invoke the constructor of TransformStream. Here we can prepare things before the transformations start.
  • .transform(chunk, controller) performs the actual transformations. It receives an input chunk and can use its parameter controller to enqueue one or more transformed output chunks. It can also choose not to enqueue anything at all.
  • .flush(controller) is called after all input chunks were transformed successfully. Here we can perform clean-ups after the transformations are done.

Each of these methods can return a Promise and no further steps will be taken until the Promise is settled. That is useful if we want to do something asynchronous.

The parameter controller has the following type:

interface TransformStreamDefaultController<TOutChunk> {
  enqueue(chunk?: TOutChunk): void;
  readonly desiredSize: number | null;
  terminate(): void;
  error(err?: any): void;
}
  • .enqueue(chunk) adds chunk to the readable side (output) of the TransformStream.
  • .desiredSize returns the desired size of the internal queue of the readable side (output) of the TransformStream.
  • .terminate() closes the readable side (output) and errors the writable side (input) of the TransformStream. It can be used if a transformer is not interested in the remaining chunks of the writable side (input) and wants to skip them.
  • .error(err) errors the TransformStream: All future interactions with it will fail with the error value err.

What about backpressure in a TransformStream? The class propagates the backpressure from its readable side (output) to its writable side (input). The assumption is that transforming doesn’t change the amount of data much. Therefore, Transforms can get away with ignoring backpressure. However, it could be detected via transformStreamDefaultController.desiredSize and propagated by returning a Promise from transformer.transform().

Example: transforming a stream of arbitrary chunks to a stream of lines  

The following subclass of TransformStream converts a stream with arbitrary chunks into a stream where each chunk comprises exactly one line of text. That is, with the possible exception of the last chunk, each chunk ends with an end-of-line (EOL) string: '\n' on Unix (incl. macOS) and '\r\n' on Windows.

class ChunksToLinesTransformer {
  #previous = '';

  transform(chunk, controller) {
    let startSearch = this.#previous.length;
    this.#previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = this.#previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // line includes the EOL
      const line = this.#previous.slice(0, eolIndex+1);
      controller.enqueue(line);
      this.#previous = this.#previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }

  flush(controller) {
    // Clean up and enqueue any text we’re still holding on to
    if (this.#previous.length > 0) {
      controller.enqueue(this.#previous);
    }
  }
}
class ChunksToLinesStream extends TransformStream {
  constructor() {
    super(new ChunksToLinesTransformer());
  }
}

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('multiple\nlines of\ntext');
    controller.close();
  },
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);

for await (const line of transformed) {
  console.log('>>>', JSON.stringify(line));
}

// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'

Note that Deno’s built-in TextLineStream provides similar functionality.

Tip: async generators are also great for transforming streams  

Due to ReadableStreams being asynchronously iterable, we can use asynchronous generators to transform them. That leads to very elegant code:

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('one');
    controller.enqueue('two');
    controller.enqueue('three');
    controller.close();
  },
});

async function* prefixChunks(prefix, asyncIterable) {
  for await (const chunk of asyncIterable) {
    yield '> ' + chunk;
  }
}

const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
  console.log(transformedChunk);
}

// Output:
// '> one'
// '> two'
// '> three'

A closer look at backpressure  

Let’s take a closer look at backpressure. Consider the following pipe chain:

rs.pipeThrough(ts).pipeTo(ws);

rs is a ReadableStream, ts is a TransformStream, ws is a WritableStream. These are the connections that are created by the previous expression (.pipeThrough uses .pipeTo to connect rs to the writable side of ts):

rs -pipeTo-> ts{writable,readable} -pipeTo-> ws

Observations:

  • The underlying source of rs can be viewed as a pipe chain member that comes before rs.
  • The underlying sink of ws can be viewed as a pipe chain member that comes after ws.
  • Each stream has an internal buffer: ReadableStreams buffers after their underlying sources. WritableStreams have buffers before their underlying sinks.

Let’s assume that the underlying sink of ws is slow and the buffer of ws is eventually full. Then the following steps happen:

  • ws signals it’s full.
  • pipeTo stops reading from ts.readable.
  • ts.readable signals it’s full.
  • ts stops moving chunks from ts.writable to ts.readable.
  • ts.writable signals it’s full.
  • pipeTo stops reading from rs.
  • rs signals it’s full to its underlying source.
  • The underlying source pauses.

This example illustrates that we need two kinds of functionality:

  • Entities receiving data need to be able to signal backpressure.
  • Entities sending data need to react to signals by exerting backpressure.

Let’s explore how these functionalities are implemented in the web streams API.

Signalling backpressure  

Backpressure is signalled by entities that are receiving data. Web streams have two such entities:

  • A WritableStream receives data via the Writer method .write().
  • A ReadableStream receives data when its underlying source calls the ReadableStreamDefaultController method .enqueue().

In both cases, the input is buffered via queues. The signal to apply backpressure is when a queue is full. Let’s see how that can be detected.

These are the locations of the queues:

The desired size of a queue is a number that indicates how much room is left in the queue:

  • It is positive if there is still room in the queue.
  • It is zero if the queue has reached its maximum size.
  • It is negative if the queue has exceeded its maximum size.

Therefore, we have to apply backpressure if the desired size is zero or less. It is available via the getter .desiredSize of the object which contains the queue.

How is the desired size computed? Via an object that specifies a so-called queuing strategy. ReadableStream and WritableStream have default queuing strategies which can be overridden via optional parameters of their constructors. The interface QueuingStrategy has two properties:

  • Method .size(chunk) returns a size for chunk.
    • The current size of a queue is the sum of the sizes of the chunks it contains.
  • Property .highWaterMark specifies the maximum size of a queue.

The desired size of a queue is the high water mark minus the current size of the queue.

Reacting to backpressure  

Entities sending data need to react to signalled backpressure by exerting backpressure.

Code writing to a WritableStream via a Writer  

  • We can await the Promise in writer.ready. While we do, we are blocked and the desired backpressure is achieved. The Promise is fulfilled once there is room in the queue. Fulfillment is triggered when writer.desiredSize has a value greater than zero.

  • Alternatively, we can await the Promise returned by writer.write(). If we do that, the queue won’t even be filled.

If we want to, we can additionally base the size of our chunks on writer.desiredSize.

The underlying source of a ReadableStream  

The underlying source object that can be passed to a ReadableStream wraps an external source. In a way, it is also a member of the pipe chain; one that comes before its ReadableStream.

  • Underlying pull sources are only asked for new data whenever there is room in the queue. While there isn’t, backpressure is exerted automatically because no data is pulled.

  • Underlying push sources should check controller.desiredSize after enqueuing something: If it’s zero or less, they should exert backpressure by pausing their external sources.

The underlying sink of a WritableStream  

The underlying sink object that can be passed to a WritableStream wraps an external sink. In a way, it is also a member of the pipe chain; one that comes after its WritableStream.

Each external sink signals backpressure differently (in some cases not at all). The underlying sink can exert backpressure by returning a Promise from method .write() that is fulfilled once writing is finished. There is an example in the web streams standard that demonstrates how that works.

A transformStream (.writable .readable)  

The TransformStream connects its writable side with its readable side by implementing an underlying sink for the former and an underlying source for the latter. It has an internal slot .[[backpressure]] that indicates if internal backpressure is currently active or not.

  • Method .write() of the underlying sink of the writable side waits asynchronously until there is no internal backpressure before it feeds another chunk to the TransformStream’s transformer (web streams standard: TransformStreamDefaultSinkWriteAlgorithm). The transformer may then enqueue something via its TransformStreamDefaultController. Note that .write() returns a Promise that fulfills when the method is finished. Until that happens, the WriteStream buffers incoming write requests via its queue. Therefore, backpressure for the writable side is signalled via that queue and its desired size.

  • The TransformStream’s backpressure is activated if a chunk is enqueued via the TransformStreamDefaultController and the queue of the readable side becomes full (web streams standard: TransformStreamDefaultControllerEnqueue).

  • The TransformStream’s backpressure may be deactivated if something is read from the Reader (web streams standard: ReadableStreamDefaultReaderRead):

    • If there is room in the queue now, it may be time to call .pull() of the underlying source (web streams standard: .[[PullSteps]]).
    • .pull() of the underlying source of the readable side deactivates the backpressure (web streams standard: TransformStreamDefaultSourcePullAlgorithm).

.pipeTo() (ReadableStream WritableStream)  

.pipeTo() reads chunks from the ReadableStream via a reader and write them to the WritableStream via a Writer. It pauses whenever writer.desiredSize is zero or less (web streams standard: Step 15 of ReadableStreamPipeTo).

Byte streams  

So far, we have only worked with text streams, streams whose chunks were strings. But the web streams API also supports byte streams for binary data, where chunks are Uint8Arrays (TypedArrays):

  • ReadableStream has a special 'bytes' mode.
  • WritableStream itself doesn’t care if chunks are strings or Uint8Arrays. Therefore, whether an instance is a text stream or a byte stream depends on what kind of chunks the underlying sink can handle.
  • What kind of chunks a TransformStream can handle also depends on its Transformer.

Next, we’ll learn how to create readable byte streams.

Readable byte streams  

What kind of stream is created by the ReadableStream constructor depends on the optional property .type of its optional first parameter underlyingSource:

  • If .type is omitted or no underlying source is provided, the new instance is a text stream.
  • If .type is the string 'bytes', the new instance is a byte stream:
    const readableByteStream = new ReadableStream({
      type: 'bytes',
      async start() { /*...*/ }
      // ...
    });
    

What changes if a ReadableStream is in 'bytes' mode?

In default mode, the underlying source can return any kind of chunk. In bytes mode, the chunks must be ArrayBufferViews, i.e. TypedArrays (such as Uint8Arrays) or DataViews.

Additionally, a readable byte stream can create two kinds of readers:

  • .getReader() returns an instance of ReadableStreamDefaultReader.
  • .getReader({mode: 'byob'}) returns an instance of ReadableStreamBYOBReader.

“BYOB“ stands for “Bring Your Own Buffer” and means that we can pass a buffer (an ArrayBufferView) to reader.read(). Afterwards, that ArrayBufferView will be detached and no longer usable. But .read() returns its data in a new ArrayBufferView that has the same type and accesses the same region of the same ArrayBuffer.

Additionally, readable byte streams have different controllers: They are instances of ReadableByteStreamController (vs. ReadableStreamDefaultController). Apart from forcing underlying sources to enqueue ArrayBufferViews (TypedArrays or DataViews), it also supports ReadableStreamBYOBReaders via its property .byobRequest. An underlying source writes its data into the BYOBRequest stored in this property. The web streams standard has two examples of using .byobRequest in its section “Examples of creating streams”.

Example: an infinite readable byte stream filled with random data  

In the next example, create an infinite readable byte stream that fills its chunks with random data (inspiration: example4.mjs in “Implementing the Web Streams API in Node.js”).

import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);

const readableByteStream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const byobRequest = controller.byobRequest;
    await asyncRandomFill(byobRequest.view);
    byobRequest.respond(byobRequest.view.byteLength);
  },
});

const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);

Due to readableByteStream being infinite, we can’t loop over it. That’s why we only read its first chunk (line B).

The buffer we create in line A is transferred and therefore unreadable after line B.

Example: compressing a readable byte stream  

In the following example, we create a readable byte stream and pipe it through a stream that compresses it to the GZIP format:

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // 256 zeros
    controller.enqueue(new Uint8Array(256));
    controller.close();
  },
});
const transformedStream = readableByteStream.pipeThrough(
  new CompressionStream('gzip'));
await logChunks(transformedStream);

async function logChunks(readableByteStream) {
  const reader = transformedStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) break;
      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

Example: reading a web page via fetch()  

The result of fetch() resolves to a response object whose property .body is a readable byte stream. We convert that byte stream to a text stream via TextDecoderStream:

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
  new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

Node.js-specific helpers  

Node.js is the only web platform that supports the following helper functions that it calls utility consumers:

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

These functions convert web ReadableStreams, Node.js Readables and AsyncIterators to Promises that are fulfilled with:

  • ArrayBuffers (arrayBuffer())
  • Blobs (blob())
  • Node.js Buffers (buffer())
  • JSON objects (json())
  • Strings (text())

Binary data is assumed to be UTF-8-encoded:

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // TextEncoder converts strings to UTF-8 encoded Uint8Arrays
    const encoder = new TextEncoder();
    const view = encoder.encode('"😀"');
    assert.deepEqual(
      view,
      Uint8Array.of(34, 240, 159, 152, 128, 34)
    );
    controller.enqueue(view);
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

String streams work as expected:

import * as assert from 'assert';

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  start(controller) {
    controller.enqueue('"😀"');
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

Further reading  

All of the material mentioned in this section was a source for this blog post.

This post doesn’t cover every aspect of the web streams API. You can find more information here:

More material: