A toolkit of 1→N stream combinators — sinks that distribute every chunk to N downstream sinks under different dispatch shapes, with proper backpressure handling. Three primitives cover the three useful control-flow shapes:
-
fork— broadcast: every chunk to every live output, slowest gates. -
route— single-target dispatch: per-chunk picker selects one output. -
filter— subset broadcast: per-output predicates decide who receives.
Available in two flavors that share the same options surface: Node Streams (the default stream-fork entry) and Web Streams (stream-fork/web). Zero runtime dependencies. Part of the stream-chain / stream-json family.
npm i stream-forkNode 22+ required (or any host with a WritableStream global for the Web variant — modern browsers, Deno 2+, Bun 1+).
import fork from 'stream-fork';
import fs from 'node:fs';
import zlib from 'node:zlib';
const gzip = zlib.createGzip();
gzip.pipe(fs.createWriteStream('log.txt.gz'));
// push every chunk to both the gzip chain and stdout
dataSource.pipe(fork([gzip, process.stdout]));import fork from 'stream-fork/web';
// dataSource is a ReadableStream, sinkA/B are WritableStreams
await dataSource.pipeTo(fork([sinkA, sinkB]));The Web fork is a backpressure-preserving generalization of ReadableStream.tee() to N outputs — unlike tee, it does not buffer per branch (a slow branch slows upstream rather than ballooning a queue).
The Node and Web primitives share the same options surface. Replace Writable[] with WritableStream[] in the signatures below for the Web flavor.
Broadcast sink. Every chunk goes to every live output; Promise.all over the per-output writes gates upstream backpressure to the slowest downstream.
-
outputs— array of downstream sinks. -
options— Writable options (Node) or{queuingStrategy}(Web). Default{objectMode: true}on Node.-
options.ignoreErrors— when truthy, downstream errors are silently dropped and the failing stream is removed fromoutputs.
-
import fork from 'stream-fork';
source.pipe(fork([sinkA, sinkB, sinkC]));Per-chunk single-target dispatch.
-
outputs— non-empty array of downstream sinks. -
options.pick(chunk[, encoding]) => number | undefined— required picker. Returns the index of the output to forward to, or any non-index value to drop the chunk. - Plus any inner-stream options and
ignoreErrors.
import route from 'stream-fork/route.js';
source.pipe(
route([evenSink, oddSink], {
pick: chunk => (chunk % 2 === 0 ? 0 : 1)
})
);Per-chunk subset broadcast.
-
outputs— non-empty array of downstream sinks. -
options.predicates— array of predicates, one per output (same length). - Plus any inner-stream options and
ignoreErrors.
import filter from 'stream-fork/filter.js';
source.pipe(
filter([auditSink, errorSink, allSink], {
predicates: [log => log.audit, log => log.level === 'error', () => true]
})
);Shared between the Node and Web trees — pure functions, no runtime imports.
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
import pickByHash from 'stream-fork/utils/pick-by-hash.js';
import pickByKey from 'stream-fork/utils/pick-by-key.js';
import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';-
pickRoundRobin(count)— cycles0..count-1. Load-balance across N workers. -
pickByHash(keyFn, count)— stablehash(key) % countsharding. -
pickByKey(keyFn, table)— explicitkey → indexmap (object orMap). -
pickFirstMatch(predicates)— first matching predicate's index; append() => truefor catch-all.
Example: round-robin load balance (Web variant).
import route from 'stream-fork/web/route.js';
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
await source.pipeTo(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));For detailed usage docs see the wiki.
- 2.0.0 ESM, new API:
fork(...), nonew,route,filter. - 1.0.5 technical release.
- 1.0.4 bugfix: forward errors correctly, thx dbubovych.
- 1.0.3 technical release to support Node 14.
- 1.0.2 workaround for Node 6: use
'finish'event instead of_final(). - 1.0.1 improved documentation.
- 1.0.0 the initial release.
The full release notes are in the wiki: Release notes.