stream-fork

A toolkit of 1→N stream combinators: sinks that distribute incoming chunks across N downstream sinks under different dispatch shapes, with proper backpressure handling. Three primitives cover the three most useful control-flow shapes:

  • fork: broadcast. Every chunk goes to every live output, and the slowest output gates upstream.
  • route: single-target dispatch. A per-chunk picker selects one output.
  • filter: subset broadcast. Per-output predicates decide which outputs receive each chunk.

Available in two flavors that share the same options surface: Node Streams (the default stream-fork entry) and Web Streams (stream-fork/web). Zero runtime dependencies. Part of the stream-chain / stream-json family.

Installation

npm i stream-fork

Node 22+ required (or any host with a WritableStream global for the Web variant — modern browsers, Deno 2+, Bun 1+).

Quick start (Node Streams)

import fork from 'stream-fork';
import fs from 'node:fs';
import zlib from 'node:zlib';

const gzip = zlib.createGzip();
gzip.pipe(fs.createWriteStream('log.txt.gz'));

// dataSource is any Readable stream, assumed to exist for this example
// push every chunk to both the gzip chain and stdout
dataSource.pipe(fork([gzip, process.stdout]));

Quick start (Web Streams)

import fork from 'stream-fork/web';

// dataSource is a ReadableStream, sinkA/B are WritableStreams
await dataSource.pipeTo(fork([sinkA, sinkB]));

The Web fork is a backpressure-preserving generalization of ReadableStream.tee() to N outputs — unlike tee, it does not buffer per branch (a slow branch slows upstream rather than ballooning a queue).
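
To make the contrast with tee() concrete, here is a minimal sketch of a backpressure-preserving broadcast built only on standard Web Streams globals. It illustrates the idea and is not the library's source: broadcastSketch, collect, and demo are hypothetical names, and error handling is deliberately minimal.

```javascript
// Hypothetical sketch, not stream-fork's implementation.
function broadcastSketch(outputs) {
  const writers = outputs.map(output => output.getWriter());
  return new WritableStream({
    // write() resolves only after every output has finished writing the chunk,
    // so the slowest output sets the pace for upstream and nothing queues per branch.
    async write(chunk) {
      await Promise.all(writers.map(writer => writer.write(chunk)));
    },
    async close() {
      await Promise.all(writers.map(writer => writer.close()));
    },
    async abort(reason) {
      await Promise.all(writers.map(writer => writer.abort(reason)));
    }
  });
}

// Hypothetical helper: a sink that records what it receives.
function collect(into) {
  return new WritableStream({
    write(chunk) {
      into.push(chunk);
    }
  });
}

async function demo() {
  const a = [];
  const b = [];
  const source = new ReadableStream({
    start(controller) {
      for (const n of [1, 2, 3]) controller.enqueue(n);
      controller.close();
    }
  });
  await source.pipeTo(broadcastSketch([collect(a), collect(b)]));
  return {a, b}; // both arrays hold the same chunks in the same order
}
```

Because the sink's write() does not resolve until every branch has taken the chunk, pipeTo() stops pulling from the source while any branch is busy.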

API

The Node and Web primitives share the same options surface. Replace Writable[] with WritableStream[] in the signatures below for the Web flavor.

fork(outputs[, options])

Broadcast sink. Every chunk goes to every live output. A write completes only when Promise.all over the per-output writes resolves, so upstream backpressure follows the slowest downstream.

  • outputs — array of downstream sinks.
  • options — Writable options (Node) or {queuingStrategy} (Web). Default {objectMode: true} on Node.
    • options.ignoreErrors — when truthy, downstream errors are silently dropped and the failing stream is removed from outputs.

import fork from 'stream-fork';

// source is any Readable; sinkA, sinkB, and sinkC are Writables
source.pipe(fork([sinkA, sinkB, sinkC]));
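
The ignoreErrors behavior described above (drop the error, remove the failing output, keep going) can be pictured with the following sketch on standard Web Streams. It is an assumption about the shape of the behavior, not the library's code; broadcastIgnoringErrors and demoIgnoreErrors are hypothetical names.

```javascript
// Hypothetical sketch of the described ignoreErrors semantics; not the library source.
function broadcastIgnoringErrors(outputs) {
  const live = new Set(outputs.map(output => output.getWriter()));
  return new WritableStream({
    async write(chunk) {
      await Promise.all(
        [...live].map(writer =>
          // a failed write removes that output instead of failing the whole sink
          writer.write(chunk).catch(() => {
            live.delete(writer);
          })
        )
      );
    },
    async close() {
      // only outputs that are still live get closed
      await Promise.all([...live].map(writer => writer.close().catch(() => {})));
    }
  });
}

async function demoIgnoreErrors() {
  const good = [];
  const goodSink = new WritableStream({
    write(chunk) {
      good.push(chunk);
    }
  });
  // this sink fails on the second chunk and is then dropped from the live set
  const flakySink = new WritableStream({
    write(chunk) {
      if (chunk === 2) throw new Error('boom');
    }
  });
  const source = new ReadableStream({
    start(controller) {
      for (const n of [1, 2, 3]) controller.enqueue(n);
      controller.close();
    }
  });
  await source.pipeTo(broadcastIgnoringErrors([goodSink, flakySink]));
  return good; // the healthy sink still receives every chunk
}
```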

route(outputs, options)

Per-chunk single-target dispatch.

  • outputs — non-empty array of downstream sinks.
  • options.pick(chunk[, encoding]) => number | undefined — required picker. Returns the index of the output to forward to, or any non-index value to drop the chunk.
  • Plus any options for the inner stream (Writable options on Node, {queuingStrategy} on Web) and ignoreErrors.

import route from 'stream-fork/route.js';

source.pipe(
  route([evenSink, oddSink], {
    pick: chunk => (chunk % 2 === 0 ? 0 : 1)
  })
);
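
Since any non-index return value drops the chunk, a picker can also act as a filter. A minimal sketch, assuming log-record chunks with a hypothetical level field; pickBySeverity is an illustrative name, not part of the library.

```javascript
// Hypothetical picker: the chunk shape ({level}) is assumed for illustration.
// Index 0 is meant for an error sink, index 1 for a warning sink.
const pickBySeverity = chunk => {
  if (chunk.level === 'error') return 0;
  if (chunk.level === 'warn') return 1;
  return undefined; // non-index value: per the description above, the chunk is dropped
};
```

It would be passed as route([errorSink, warnSink], {pick: pickBySeverity}), where both sinks are assumed to exist.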

filter(outputs, options)

Per-chunk subset broadcast.

  • outputs — non-empty array of downstream sinks.
  • options.predicates — array of predicates, one per output (same length).
  • Plus any options for the inner stream (Writable options on Node, {queuingStrategy} on Web) and ignoreErrors.

import filter from 'stream-fork/filter.js';

source.pipe(
  filter([auditSink, errorSink, allSink], {
    predicates: [log => log.audit, log => log.level === 'error', () => true]
  })
);
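
To see which outputs a given chunk would reach, the predicates from the example can be evaluated by hand. The helper below is purely illustrative (matchingOutputs is a hypothetical name, and the chunk shapes are assumed); it does not touch any streams.

```javascript
// Hypothetical helper, for illustration only: lists the indexes of the outputs
// whose predicate accepts the chunk.
const matchingOutputs = (predicates, chunk) =>
  predicates.flatMap((predicate, index) => (predicate(chunk) ? [index] : []));

// same predicates as in the filter example: audit sink, error sink, catch-all sink
const predicates = [log => log.audit, log => log.level === 'error', () => true];

const auditError = matchingOutputs(predicates, {audit: true, level: 'error'});
const plainInfo = matchingOutputs(predicates, {audit: false, level: 'info'});
// auditError is [0, 1, 2]; plainInfo is [2]
```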

Picker helpers

Shared between the Node and Web trees — pure functions, no runtime imports.

import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
import pickByHash from 'stream-fork/utils/pick-by-hash.js';
import pickByKey from 'stream-fork/utils/pick-by-key.js';
import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';

  • pickRoundRobin(count): cycles through 0..count-1; useful for load-balancing across N workers.
  • pickByHash(keyFn, count): stable sharding by hash(key) % count.
  • pickByKey(keyFn, table): explicit key → index map (table is an object or a Map).
  • pickFirstMatch(predicates): index of the first matching predicate; append () => true as a catch-all.
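
The behavior of the four helpers can be approximated with a few lines each. These are illustrative re-implementations of the descriptions above, not the library source: the names ending in Sketch are hypothetical, the hash function is an assumption, and returning undefined for "no match" relies on the rule that a non-index value drops the chunk.

```javascript
// Illustrative sketches only; the real helpers may differ in details.

// cycles 0, 1, ..., count-1, 0, 1, ...
const roundRobinSketch = count => {
  let next = 0;
  return () => {
    const index = next;
    next = (next + 1) % count;
    return index;
  };
};

// Assumed hash: a simple 32-bit polynomial string hash. The library's hash may differ.
const hashString = text => {
  let hash = 0;
  for (let i = 0; i < text.length; ++i) {
    hash = (hash * 31 + text.charCodeAt(i)) >>> 0; // keep an unsigned 32-bit integer
  }
  return hash;
};

// same key always lands on the same index in 0..count-1
const byHashSketch = (keyFn, count) => chunk => hashString(String(keyFn(chunk))) % count;

// explicit lookup in a plain object or a Map; unknown keys yield undefined
const byKeySketch = (keyFn, table) => chunk => {
  const key = keyFn(chunk);
  if (table instanceof Map) return table.get(key);
  return Object.hasOwn(table, key) ? table[key] : undefined;
};

// index of the first predicate that accepts the chunk, or undefined if none does
const firstMatchSketch = predicates => chunk => {
  const index = predicates.findIndex(predicate => predicate(chunk));
  return index < 0 ? undefined : index;
};
```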

Example: round-robin load balance (Web variant).

import route from 'stream-fork/web/route.js';
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';

await source.pipeTo(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));

For detailed usage docs see the wiki.

Release History

  • 2.0.0 ESM, new API: fork(...) is called without new; added route and filter.
  • 1.0.5 technical release.
  • 1.0.4 bugfix: forward errors correctly, thx dbubovych.
  • 1.0.3 technical release to support Node 14.
  • 1.0.2 workaround for Node 6: use 'finish' event instead of _final().
  • 1.0.1 improved documentation.
  • 1.0.0 the initial release.

The full release notes are in the wiki: Release notes.