Web Streams API
History
No longer experimental.
Use of this API no longer emits a runtime warning.
An implementation of the WHATWG Streams Standard.
The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.
There are three primary types of objects:
ReadableStream - Represents a source of streaming data.
WritableStream - Represents a destination for streaming data.
TransformStream - Represents an algorithm for transforming streaming data.
This example creates a simple ReadableStream that pushes the current
performance.now() timestamp once every second forever. An async iterable
is used to read the data from the stream.
import {
  ReadableStream,
} from 'node:stream/web';
import {
  setInterval as every,
} from 'node:timers/promises';
import {
  performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});
for await (const value of stream)
  console.log(value);

Node.js streams can be converted to web streams and vice versa via the toWeb and fromWeb methods present on stream.Readable, stream.Writable and stream.Duplex objects.
For more details refer to the relevant documentation:
stream.Readable.toWeb
stream.Readable.fromWeb
stream.Writable.toWeb
stream.Writable.fromWeb
stream.Duplex.toWeb
stream.Duplex.fromWeb
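As a rough illustration of the conversion described above, the sketch below sends a Node.js readable through a web stream and back again. The helper name roundTrip and the sample chunks are made up for this example, and objectMode is passed to Readable.fromWeb() on the assumption that the chunks should stay strings rather than be converted to Buffers.

```javascript
// Sketch only: roundTrip() is a hypothetical helper, not part of any API.
async function roundTrip(chunks) {
  // A dynamic import keeps the snippet usable from ESM and CommonJS files.
  const { Readable } = await import('node:stream');

  // Node.js readable -> web ReadableStream -> Node.js readable.
  const nodeReadable = Readable.from(chunks);
  const webStream = Readable.toWeb(nodeReadable);
  const backToNode = Readable.fromWeb(webStream, { objectMode: true });

  const received = [];
  for await (const chunk of backToNode)
    received.push(chunk);
  return received;
}

roundTrip(['a', 'b', 'c']).then((received) => console.log(received));
```

The same pattern should apply to stream.Writable and stream.Duplex through their own toWeb and fromWeb methods.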
ReadableStream
History
This class is now exposed on the global object.
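new ReadableStream(underlyingSource?, strategy?)
underlyingSource <Object>
  start <Function> A user-defined function that is invoked immediately when the ReadableStream is created.
    controller <ReadableStreamDefaultController> | <ReadableByteStreamController>
    Returns: undefined or a promise fulfilled with undefined.
  pull <Function> A user-defined function that is called repeatedly when the ReadableStream internal queue is not full. The operation may be sync or async. If async, the function will not be called again until the previously returned promise is fulfilled.
    controller <ReadableStreamDefaultController> | <ReadableByteStreamController>
    Returns: A promise fulfilled with undefined.
  cancel <Function> A user-defined function that is called when the ReadableStream is canceled.
    reason <any>
    Returns: A promise fulfilled with undefined.
  type <string> Must be 'bytes' or undefined.
  autoAllocateChunkSize <number> Used only when type is equal to 'bytes'. When set to a non-zero value, a view buffer is automatically allocated to ReadableByteStreamController.byobRequest. When not set, one must use the stream's internal queues to transfer data via the default reader ReadableStreamDefaultReader.
strategy <Object>
  highWaterMark <number> The maximum internal queue size before backpressure is applied.
  size <Function> A user-defined function used to identify the size of each chunk of data.
    chunk <any>
    Returns: <number>
readableStream.locked
Type: <boolean> Set to true if there is an active reader for this <ReadableStream>.
The readableStream.locked property is false by default, and is switched to true while there is an active reader consuming the stream's data.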
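The behavior of the locked flag can be observed directly. This is a minimal sketch; lockStates() is a hypothetical helper name used only here.

```javascript
// Sketch only: records readableStream.locked before, during, and after
// a reader holds the stream.
function lockStates() {
  const stream = new ReadableStream();
  const states = [stream.locked]; // No reader yet.
  const reader = stream.getReader();
  states.push(stream.locked); // The reader now holds the lock.
  reader.releaseLock();
  states.push(stream.locked); // The lock has been released again.
  return states;
}

console.log(lockStates()); // Prints: [ false, true, false ]
```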
readableStream.cancel(reason?): Promise
reason <any>
Returns: A promise fulfilled with undefined once cancelation has been completed.
readableStream.getReader(options?): ReadableStreamDefaultReader | ReadableStreamBYOBReader
options <Object>
  mode <string> 'byob' or undefined
Returns: <ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>

import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());

Causes the readableStream.locked to be true.
readableStream.pipeThrough(transform, options?): ReadableStream
transform <Object>
  readable <ReadableStream> The ReadableStream to which transform.writable will push the potentially modified data it receives from this ReadableStream.
  writable <WritableStream> The WritableStream to which this ReadableStream's data will be written.
options <Object>
  preventAbort <boolean> When true, errors in this ReadableStream will not cause transform.writable to be aborted.
  preventCancel <boolean> When true, errors in the destination transform.writable do not cause this ReadableStream to be canceled.
  preventClose <boolean> When true, closing this ReadableStream does not cause transform.writable to be closed.
  signal <AbortSignal> Allows the transfer of data to be canceled using an <AbortController>.
Returns: <ReadableStream> From transform.readable.
Connects this <ReadableStream> to the pair of <ReadableStream> and <WritableStream> provided in the transform argument such that the data from this <ReadableStream> is written into transform.writable, possibly transformed, then pushed to transform.readable. Once the pipeline is configured, transform.readable is returned.
Causes the readableStream.locked to be true while the pipe operation is active.
import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    // Close the stream so the for await loop below can finish.
    controller.close();
  },
});
const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: A

readableStream.pipeTo(destination, options?): Promise
destination <WritableStream> A <WritableStream> to which this ReadableStream's data will be written.
options <Object>
  preventAbort <boolean> When true, errors in this ReadableStream will not cause destination to be aborted.
  preventCancel <boolean> When true, errors in the destination will not cause this ReadableStream to be canceled.
  preventClose <boolean> When true, closing this ReadableStream does not cause destination to be closed.
  signal <AbortSignal> Allows the transfer of data to be canceled using an <AbortController>.
Returns: A promise fulfilled with undefined.
Causes the readableStream.locked to be true while the pipe operation is active.
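To make the preventClose option more concrete, the sketch below pipes two sources into one destination in sequence. The helpers streamOf() and pipeTwoSources() are hypothetical names introduced for this example only.

```javascript
// Hypothetical helper: a ReadableStream that emits the given values, then closes.
function streamOf(values) {
  return new ReadableStream({
    start(controller) {
      for (const value of values)
        controller.enqueue(value);
      controller.close();
    },
  });
}

async function pipeTwoSources() {
  const received = [];
  const destination = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  });
  // preventClose keeps the destination open after the first source ends.
  await streamOf(['a', 'b']).pipeTo(destination, { preventClose: true });
  // By default, the destination is closed when this second source ends.
  await streamOf(['c', 'd']).pipeTo(destination);
  return received;
}

pipeTwoSources().then((received) => console.log(received));
// Prints: [ 'a', 'b', 'c', 'd' ]
```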
readableStream.tee
History
Support teeing a readable byte stream.
readableStream.tee(): ReadableStream[]
Returns: <ReadableStream[]>
Returns a pair of new <ReadableStream> instances to which this ReadableStream's data will be forwarded. Each will receive the same data.
Causes the readableStream.locked to be true.
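A short sketch of teeing, assuming a source that emits two numbers. The helper names collect() and teeExample() are illustrative and not part of the API.

```javascript
// Hypothetical helper: gathers every chunk of a stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream)
    chunks.push(chunk);
  return chunks;
}

async function teeExample() {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.close();
    },
  });
  const [left, right] = source.tee();
  // Both branches are consumed concurrently and see the same chunks.
  const [fromLeft, fromRight] =
    await Promise.all([collect(left), collect(right)]);
  // The original stream stays locked because tee() holds its reader.
  return { fromLeft, fromRight, sourceLocked: source.locked };
}

teeExample().then((result) => console.log(result));
```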
readableStream.values(options?): AsyncIterator
options <Object>
  preventCancel <boolean> When true, prevents the <ReadableStream> from being closed when the async iterator abruptly terminates. Default: false.
Creates and returns an async iterator usable for consuming this ReadableStream's data.
Causes the readableStream.locked to be true while the async iterator is active.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString());

The <ReadableStream> object supports the async iterator protocol using for await syntax.

import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString());

The async iterator will consume the <ReadableStream> until it terminates.
By default, if the async iterator exits early (via either a break,
return, or a throw), the <ReadableStream> will be closed. To prevent
automatic closing of the <ReadableStream>, use the readableStream.values()
method to acquire the async iterator and set the preventCancel option to
true.
The <ReadableStream> must not be locked (that is, it must not have an existing
active reader). During the async iteration, the <ReadableStream> will be locked.
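The effect of preventCancel can be sketched by leaving a loop early and then resuming iteration on the same stream. The function name iterateInTwoPasses() is hypothetical.

```javascript
async function iterateInTwoPasses() {
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.enqueue('c');
      controller.close();
    },
  });

  const first = [];
  // With preventCancel, breaking out releases the lock without
  // canceling the stream.
  for await (const chunk of stream.values({ preventCancel: true })) {
    first.push(chunk);
    break;
  }

  const rest = [];
  // The stream is unlocked again, so the remaining chunks can be read.
  for await (const chunk of stream)
    rest.push(chunk);

  return { first, rest };
}

iterateInTwoPasses().then((result) => console.log(result));
// Prints: { first: [ 'a' ], rest: [ 'b', 'c' ] }
```

Without preventCancel, the early break would cancel the stream and the second loop would receive no chunks.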
A <ReadableStream> instance can be transferred using a <MessagePort>.
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};
port2.postMessage(stream, [stream]);

ReadableStream.from(iterable): ReadableStream
iterable <Iterable> Object implementing the Symbol.asyncIterator or Symbol.iterator iterable protocol.
Returns: <ReadableStream>
A utility method that creates a new <ReadableStream> from an iterable.
import { ReadableStream } from 'node:stream/web';
async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'

To pipe the resulting <ReadableStream> into a <WritableStream> the <Iterable>
should yield a sequence of <Buffer>, <TypedArray>, or <DataView> objects.

import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';
async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
await stream.pipeTo(createWritableStreamSomehow());

ReadableStreamDefaultReader
History
This class is now exposed on the global object.
By default, calling readableStream.getReader() with no arguments
will return an instance of ReadableStreamDefaultReader. The default
reader treats the chunks of data passed through the stream as opaque
values, which allows the <ReadableStream> to work with generally any
JavaScript value.
new ReadableStreamDefaultReader(stream)
stream <ReadableStream>
Creates a new <ReadableStreamDefaultReader> that is locked to the given <ReadableStream>.
readableStreamDefaultReader.cancel(reason?): Promise
reason <any>
Returns: A promise fulfilled with undefined.
Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamDefaultReader.closed
Type: <Promise> Fulfilled with undefined when the associated <ReadableStream> is closed or rejected if the stream errors or the reader's lock is released before the stream finishes closing.
readableStreamDefaultReader.read(): Promise
Returns: A promise fulfilled with an object:
  value <any>
  done <boolean>
Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.
readableStreamDefaultReader.releaseLock(): void
Releases this reader's lock on the underlying <ReadableStream>.
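The reader methods above are commonly combined into a read loop. The sketch below is one way to write it; drain() is a hypothetical helper name, and the sample stream is invented for the example.

```javascript
// Hypothetical helper: reads a stream to completion with a default reader.
async function drain(stream) {
  const reader = stream.getReader();
  const chunks = [];
  try {
    while (true) {
      // Each read() resolves with { value, done }.
      const { value, done } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
  } finally {
    // Always give the lock back, even if a read rejects.
    reader.releaseLock();
  }
  return chunks;
}

const sampleStream = new ReadableStream({
  start(controller) {
    // The default reader treats chunks as opaque values of any type.
    controller.enqueue('text');
    controller.enqueue({ any: 'value' });
    controller.close();
  },
});

drain(sampleStream).then((chunks) => console.log(chunks.length));
// Prints: 2
```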
ReadableStreamBYOBReader
History
This class is now exposed on the global object.
The ReadableStreamBYOBReader is an alternative consumer for
byte-oriented <ReadableStream>s (those that are created with underlyingSource.type set equal to 'bytes' when the
ReadableStream was created).
BYOB is short for "bring your own buffer". This is a
pattern that allows for more efficient reading of byte-oriented
data by avoiding extraneous copying.
import {
  open,
} from 'node:fs/promises';
import {
  ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;
  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }
  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });
    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}
const stream = new ReadableStream(new Source());
async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });
  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);
  return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());

new ReadableStreamBYOBReader(stream)
stream <ReadableStream>
Creates a new ReadableStreamBYOBReader that is locked to the given <ReadableStream>.
readableStreamBYOBReader.cancel(reason?): Promise
reason <any>
Returns: A promise fulfilled with undefined.
Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamBYOBReader.closed
Type: <Promise> Fulfilled with undefined when the associated <ReadableStream> is closed or rejected if the stream errors or the reader's lock is released before the stream finishes closing.
readableStreamBYOBReader.read(view, options?): Promise
view <Buffer> | <TypedArray> | <DataView>
options <Object>
Returns: A promise fulfilled with an object:
  value <TypedArray> | <DataView>
  done <boolean>
Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.
Do not pass a pooled <Buffer> object instance in to this method.
Pooled Buffer objects are created using Buffer.allocUnsafe(),
or Buffer.from(), or are often returned by various node:fs module
callbacks. These types of Buffers use a shared underlying
<ArrayBuffer> object that contains all of the data from all of
the pooled Buffer instances. When a Buffer, <TypedArray>,
or <DataView> is passed in to readableStreamBYOBReader.read(),
the view's underlying ArrayBuffer is detached, invalidating
all existing views that may exist on that ArrayBuffer. This
can have disastrous consequences for your application.
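The detaching behavior described above can be seen with a small byte stream. This is a sketch under the assumption that the caller allocates a dedicated Uint8Array, which is not pooled; readIntoOwnBuffer() is a hypothetical helper name.

```javascript
async function readIntoOwnBuffer() {
  const stream = new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3]));
      controller.close();
    },
  });
  const reader = stream.getReader({ mode: 'byob' });

  // A freshly allocated typed array owns its ArrayBuffer exclusively,
  // so detaching it cannot invalidate anyone else's views.
  const mine = new Uint8Array(8);
  const { value } = await reader.read(mine);

  return {
    bytes: Array.from(value), // Data arrives in the returned view.
    lengthAfterRead: mine.byteLength, // The original view is detached.
  };
}

readIntoOwnBuffer().then((result) => console.log(result));
```

After the read, only the returned value should be used. The view that was passed in is expected to report a byteLength of 0 because its ArrayBuffer has been detached.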
readableStreamBYOBReader.releaseLock(): void
Releases this reader's lock on the underlying <ReadableStream>.
ReadableStreamDefaultController
Every <ReadableStream> has a controller that is responsible for the internal state and management of the stream's queue. The ReadableStreamDefaultController is the default controller implementation for ReadableStreams that are not byte-oriented.
readableStreamDefaultController.close(): void
Closes the <ReadableStream> to which this controller is associated.
readableStreamDefaultController.desiredSize
Type: <number>
Returns the amount of data remaining to fill the <ReadableStream>'s queue.
readableStreamDefaultController.enqueue(chunk?): void
chunk <any>
Appends a new chunk of data to the <ReadableStream>'s queue.
readableStreamDefaultController.error(error?): void
error <any>
Signals an error that causes the <ReadableStream> to error and close.
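As an illustration of how the queue fills, the sketch below records the controller's desiredSize property (the name used by the WHATWG Streams Standard for the remaining-capacity value described above) while chunks are enqueued. The highWaterMark of 2 and the helper name observeDesiredSize() are assumptions made for this example.

```javascript
function observeDesiredSize() {
  const sizes = [];
  new ReadableStream({
    start(controller) {
      sizes.push(controller.desiredSize); // Empty queue.
      for (const chunk of ['a', 'b', 'c']) {
        controller.enqueue(chunk);
        // Each chunk counts as 1 because no size() function is given.
        sizes.push(controller.desiredSize);
      }
      controller.close();
    },
  }, { highWaterMark: 2 });
  return sizes;
}

console.log(observeDesiredSize()); // Prints: [ 2, 1, 0, -1 ]
```

A value of zero or less indicates that the queue is full, which is the point at which a well-behaved source would stop producing data.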
ReadableByteStreamController
History
Support handling a BYOB pull request from a released reader.
Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The ReadableByteStreamController is for byte-oriented ReadableStreams.
readableByteStreamController.close(): void
Closes the <ReadableStream> to which this controller is associated.
readableByteStreamController.desiredSize
Type: <number>
Returns the amount of data remaining to fill the <ReadableStream>'s queue.
readableByteStreamController.enqueue(chunk): void
chunk <Buffer> | <TypedArray> | <DataView>
Appends a new chunk of data to the <ReadableStream>'s queue.
readableByteStreamController.error(error?): void
error <any>
Signals an error that causes the <ReadableStream> to error and close.
ReadableStreamBYOBRequest
History
This class is now exposed on the global object.
When using ReadableByteStreamController in byte-oriented
streams, and when using the ReadableStreamBYOBReader,
the readableByteStreamController.byobRequest property
provides access to a ReadableStreamBYOBRequest instance
that represents the current read request. The object
is used to gain access to the ArrayBuffer/TypedArray
that has been provided for the read request to fill,
and provides methods for signaling that the data has
been provided.
readableStreamBYOBRequest.respond(bytesWritten): void
bytesWritten <number>
Signals that a bytesWritten number of bytes have been written to readableStreamBYOBRequest.view.
readableStreamBYOBRequest.respondWithNewView(view): void
view <Buffer> | <TypedArray> | <DataView>
Signals that the request has been fulfilled with bytes written to a new Buffer, TypedArray, or DataView.
readableStreamBYOBRequest.view
Type: <Buffer> | <TypedArray> | <DataView>
WritableStream
History
This class is now exposed on the global object.
The WritableStream is a destination to which stream data is sent.
import {
  WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});
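await stream.getWriter().write('Hello World');

new WritableStream(underlyingSink?, strategy?)
underlyingSink <Object>
  start <Function> A user-defined function that is invoked immediately when the WritableStream is created.
    controller <WritableStreamDefaultController>
    Returns: undefined or a promise fulfilled with undefined.
  write <Function> A user-defined function that is invoked when a chunk of data has been written to the WritableStream.
    chunk <any>
    controller <WritableStreamDefaultController>
    Returns: A promise fulfilled with undefined.
  close <Function> A user-defined function that is called when the WritableStream is closed.
    Returns: A promise fulfilled with undefined.
  abort <Function> A user-defined function that is called to abruptly close the WritableStream.
    reason <any>
    Returns: A promise fulfilled with undefined.
  type <any> The type option is reserved for future use and must be undefined.
strategy <Object>
  highWaterMark <number> The maximum internal queue size before backpressure is applied.
  size <Function> A user-defined function used to identify the size of each chunk of data.
    chunk <any>
    Returns: <number>
writableStream.abort(reason?): Promise
reason <any>
Returns: A promise fulfilled with undefined.
Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.
writableStream.close(): Promise
Returns: A promise fulfilled with undefined.
Closes the WritableStream when no additional writes are expected.
writableStream.getWriter(): WritableStreamDefaultWriter
Returns: <WritableStreamDefaultWriter>
Creates and returns a new writer instance that can be used to write data into the WritableStream.
writableStream.locked
Type: <boolean>
The writableStream.locked property is false by default, and is switched to true while there is an active writer attached to this WritableStream.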
A <WritableStream> instance can be transferred using a <MessagePort>.
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);

WritableStreamDefaultWriter
History
This class is now exposed on the global object.
new WritableStreamDefaultWriter(stream)
stream <WritableStream>
Creates a new WritableStreamDefaultWriter that is locked to the given WritableStream.
writableStreamDefaultWriter.abort(reason?): Promise
reason <any>
Returns: A promise fulfilled with undefined.
Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.
writableStreamDefaultWriter.close(): Promise
Returns: A promise fulfilled with undefined.
Closes the WritableStream when no additional writes are expected.
writableStreamDefaultWriter.closed
Type: <Promise> Fulfilled with undefined when the associated <WritableStream> is closed or rejected if the stream errors or the writer's lock is released before the stream finishes closing.
writableStreamDefaultWriter.desiredSize
Type: <number>
The amount of data required to fill the <WritableStream>'s queue.
writableStreamDefaultWriter.ready
Type: <Promise> Fulfilled with undefined when the writer is ready to be used.
writableStreamDefaultWriter.releaseLock(): void
Releases this writer's lock on the underlying <WritableStream>.
writableStreamDefaultWriter.write(chunk?): Promise
chunk <any>
Returns: A promise fulfilled with undefined.
Appends a new chunk of data to the <WritableStream>'s queue.
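A writer is typically used by waiting on its ready promise before each write, so that the producer respects backpressure. The sketch below shows one way to do this; writeAll() is a hypothetical helper and the sink simply records what it receives.

```javascript
async function writeAll(chunks) {
  const written = [];
  const stream = new WritableStream({
    write(chunk) {
      written.push(chunk);
    },
  });
  const writer = stream.getWriter();
  for (const chunk of chunks) {
    // Resolves once the stream's queue has room for more data.
    await writer.ready;
    // Resolves once the sink has accepted this chunk.
    await writer.write(chunk);
  }
  // Signals that no further writes are expected.
  await writer.close();
  return written;
}

writeAll(['Hello', 'World']).then((written) => console.log(written));
// Prints: [ 'Hello', 'World' ]
```

Awaiting every write keeps the sketch simple and surfaces sink errors immediately. A throughput-sensitive producer might await only ready and let writes queue up.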
WritableStreamDefaultController
History
This class is now exposed on the global object.
The WritableStreamDefaultController manages the <WritableStream>'s
internal state.
writableStreamDefaultController.error(error?): void
error <any>
Called by user-code to signal that an error has occurred while processing the WritableStream data. When called, the <WritableStream> will be aborted, with currently pending writes canceled.
writableStreamDefaultController.signal
Type: <AbortSignal> An AbortSignal that can be used to cancel pending write or close operations when a <WritableStream> is aborted.
TransformStream
History
This class is now exposed on the global object.
A TransformStream consists of a <ReadableStream> and a <WritableStream> that
are connected such that the data written to the WritableStream is received,
and potentially transformed, before being pushed into the ReadableStream's
queue.
import {
  TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});
await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
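]);

new TransformStream(transformer?, writableStrategy?, readableStrategy?)
transformer <Object>
  start <Function> A user-defined function that is invoked immediately when the TransformStream is created.
    controller <TransformStreamDefaultController>
    Returns: undefined or a promise fulfilled with undefined.
  transform <Function> A user-defined function that receives, and potentially modifies, a chunk of data written to transformStream.writable, before forwarding that on to transformStream.readable.
    chunk <any>
    controller <TransformStreamDefaultController>
    Returns: A promise fulfilled with undefined.
  flush <Function> A user-defined function that is called immediately before the writable side of the TransformStream is closed, signaling the end of the transformation process.
    controller <TransformStreamDefaultController>
    Returns: A promise fulfilled with undefined.
  readableType <any> The readableType option is reserved for future use and must be undefined.
  writableType <any> The writableType option is reserved for future use and must be undefined.
writableStrategy <Object>
  highWaterMark <number> The maximum internal queue size before backpressure is applied.
  size <Function> A user-defined function used to identify the size of each chunk of data.
readableStrategy <Object>
  highWaterMark <number> The maximum internal queue size before backpressure is applied.
  size <Function> A user-defined function used to identify the size of each chunk of data.
transformStream.readable
Type: <ReadableStream>
transformStream.writable
Type: <WritableStream>
A <TransformStream> instance can be transferred using a <MessagePort>.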
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};
port2.postMessage(stream, [stream]);

TransformStreamDefaultController
History
This class is now exposed on the global object.
The TransformStreamDefaultController manages the internal state
of the TransformStream.
transformStreamDefaultController.desiredSize
Type: <number>
The amount of data required to fill the readable side's queue.
transformStreamDefaultController.enqueue(chunk?): void
chunk <any>
Appends a chunk of data to the readable side's queue.
transformStreamDefaultController.error(reason?): void
reason <any>
Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.
transformStreamDefaultController.terminate(): void
Closes the readable side of the transform and causes the writable side to be abruptly closed with an error.
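To show how the controller is used from both the transform and flush callbacks of a transformer, here is a minimal line-splitting sketch. The names createLineSplitter() and splitLines() are hypothetical, and the input is assumed to consist of string chunks.

```javascript
// Hypothetical transformer: re-chunks incoming strings into whole lines.
function createLineSplitter() {
  let buffered = '';
  return new TransformStream({
    transform(chunk, controller) {
      buffered += chunk;
      const lines = buffered.split('\n');
      // The last piece may be an incomplete line, so keep it for later.
      buffered = lines.pop();
      for (const line of lines)
        controller.enqueue(line);
    },
    flush(controller) {
      // Called when the writable side closes: emit any trailing text.
      if (buffered !== '')
        controller.enqueue(buffered);
    },
  });
}

async function splitLines(parts) {
  const source = new ReadableStream({
    start(controller) {
      for (const part of parts)
        controller.enqueue(part);
      controller.close();
    },
  });
  const lines = [];
  for await (const line of source.pipeThrough(createLineSplitter()))
    lines.push(line);
  return lines;
}

splitLines(['ab\nc', 'd\n', 'e']).then((lines) => console.log(lines));
// Prints: [ 'ab', 'cd', 'e' ]
```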
ByteLengthQueuingStrategy
History
This class is now exposed on the global object.
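new ByteLengthQueuingStrategy(init)
init <Object>
  highWaterMark <number>
byteLengthQueuingStrategy.highWaterMark
Type: <number>
byteLengthQueuingStrategy.size
Type: <Function>
CountQueuingStrategy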
History
This class is now exposed on the global object.
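new CountQueuingStrategy(init)
init <Object>
  highWaterMark <number>
countQueuingStrategy.highWaterMark
Type: <number>
countQueuingStrategy.size
Type: <Function>
TextEncoderStream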
History
This class is now exposed on the global object.
new TextEncoderStream()
Creates a new TextEncoderStream instance.
textEncoderStream.encoding
Type: <string>
The encoding supported by the TextEncoderStream instance.
textEncoderStream.readable
Type: <ReadableStream>
textEncoderStream.writable
Type: <WritableStream>
TextDecoderStream
History
This class is now exposed on the global object.
new TextDecoderStream(encoding?, options?)
encoding <string> Identifies the encoding that this TextDecoder instance supports. Default: 'utf-8'.
options <Object>
  fatal <boolean> true if decoding failures are fatal.
  ignoreBOM <boolean> When true, the TextDecoderStream will include the byte order mark in the decoded result. When false, the byte order mark will be removed from the output. This option is only used when encoding is 'utf-8', 'utf-16be', or 'utf-16le'. Default: false.
Creates a new TextDecoderStream instance.
textDecoderStream.encoding
Type: <string>
The encoding supported by the TextDecoderStream instance.
textDecoderStream.fatal
Type: <boolean>
The value will be true if decoding errors result in a TypeError being thrown.
textDecoderStream.ignoreBOM
Type: <boolean>
The value will be true if the decoding result will include the byte order mark.
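The encoder and decoder streams are usually placed in a pipeline with pipeThrough(). The sketch below round-trips a few strings through both; encodeThenDecode() is a hypothetical helper and the fatal option is set only to illustrate the constructor arguments.

```javascript
async function encodeThenDecode(strings) {
  const source = new ReadableStream({
    start(controller) {
      for (const value of strings)
        controller.enqueue(value);
      controller.close();
    },
  });

  const decoded = source
    // Strings in, UTF-8 encoded Uint8Array chunks out.
    .pipeThrough(new TextEncoderStream())
    // Bytes in, strings out; fatal makes malformed input an error.
    .pipeThrough(new TextDecoderStream('utf-8', { fatal: true }));

  let result = '';
  for await (const chunk of decoded)
    result += chunk;
  return result;
}

encodeThenDecode(['héllo ', 'wörld']).then((result) => console.log(result));
// Prints: héllo wörld
```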
textDecoderStream.readable
Type: <ReadableStream>
textDecoderStream.writable
Type: <WritableStream>
CompressionStream
History
This class is now exposed on the global object.
new CompressionStream(format)
format <string> One of 'deflate', 'deflate-raw', 'gzip', or 'brotli'.
compressionStream.readable
Type: <ReadableStream>
compressionStream.writable
Type: <WritableStream>
DecompressionStream
History
This class is now exposed on the global object.
new DecompressionStream(format)
format <string> One of 'deflate', 'deflate-raw', 'gzip', or 'brotli'.
decompressionStream.readable
Type: <ReadableStream>
decompressionStream.writable
Type: <WritableStream>
Utility Consumers
History
The utility consumer functions provide common options for consuming streams.
They are accessed using:
import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

streamConsumers.arrayBuffer(stream): Promise
stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
Returns: <Promise> Fulfills with an ArrayBuffer containing the full contents of the stream.

import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76

streamConsumers.blob(stream): Promise
stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
Returns: <Promise> Fulfills with a <Blob> containing the full contents of the stream.

import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27

streamConsumers.buffer(stream): Promise
stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
Returns: <Promise> Fulfills with a <Buffer> containing the full contents of the stream.

import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27

streamConsumers.bytes(stream): Promise
stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
Returns: <Promise> Fulfills with a <Uint8Array> containing the full contents of the stream.

import { bytes } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await bytes(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27

streamConsumers.json(stream): Promise
stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
Returns: <Promise> Fulfills with the contents of the stream parsed as a UTF-8 encoded string that is then passed through JSON.parse().

import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100

streamConsumers.text(stream): Promise
stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
Returns: <Promise> Fulfills with the contents of the stream parsed as a UTF-8 encoded string.

import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
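The examples above feed Node.js readables to the consumers. Since the consumers also accept a web <ReadableStream>, the following sketch applies text() and json() to web streams. The helpers byteStreamOf() and consumeWebStreams() are hypothetical, and the sample payloads are invented for the example.

```javascript
// Hypothetical helper: a web ReadableStream holding one UTF-8 encoded chunk.
function byteStreamOf(string) {
  return new ReadableStream({
    start(controller) {
      controller.enqueue(new TextEncoder().encode(string));
      controller.close();
    },
  });
}

async function consumeWebStreams() {
  // A dynamic import keeps the snippet usable from ESM and CommonJS files.
  const { text, json } = await import('node:stream/consumers');
  const asText = await text(byteStreamOf('hello'));
  const asJson = await json(byteStreamOf('{"ok":true}'));
  return { asText, asJson };
}

consumeWebStreams().then((result) => console.log(result));
// Prints: { asText: 'hello', asJson: { ok: true } }
```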