internal.pipeline - Node documentation
function internal.pipeline

Usage in Deno

import { internal } from "node:stream";
const { pipeline } = internal;
pipeline<
A extends PipelineSource<any>,
B extends PipelineDestination<A, any>,
>
(
source: A,
destination: B,
callback?: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
);

The pipeline API provides a promise version.

stream.pipeline() will call stream.destroy(err) on all streams except:

  • Readable streams which have emitted 'end' or 'close'.
  • Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. If the last stream is readable, dangling event listeners will be removed so that the last stream can be consumed later.

stream.pipeline() closes all the streams when an error is raised. The IncomingRequest usage with pipeline could lead to an unexpected behavior once it would destroy the socket without sending the expected response. See the example below:

const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
});

Type Parameters

A extends PipelineSource<any>
B extends PipelineDestination<A, any>

Parameters

source: A
destination: B
optional
callback: PipelineCallback<B>

Called when the pipeline is fully done.

Return Type

B extends WritableStream ? B : WritableStream
pipeline<
A extends PipelineSource<any>,
T1 extends PipelineTransform<A, any>,
B extends PipelineDestination<T1, any>,
>
(
source: A,
transform1: T1,
destination: B,
callback?: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
B extends PipelineDestination<T1, any>

Parameters

source: A
transform1: T1
destination: B
optional
callback: PipelineCallback<B>

Return Type

B extends WritableStream ? B : WritableStream
pipeline<
A extends PipelineSource<any>,
T1 extends PipelineTransform<A, any>,
T2 extends PipelineTransform<T1, any>,
B extends PipelineDestination<T2, any>,
>
(
source: A,
transform1: T1,
transform2: T2,
destination: B,
callback?: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
B extends PipelineDestination<T2, any>

Parameters

source: A
transform1: T1
transform2: T2
destination: B
optional
callback: PipelineCallback<B>

Return Type

B extends WritableStream ? B : WritableStream
pipeline<
A extends PipelineSource<any>,
T1 extends PipelineTransform<A, any>,
T2 extends PipelineTransform<T1, any>,
T3 extends PipelineTransform<T2, any>,
B extends PipelineDestination<T3, any>,
>
(
source: A,
transform1: T1,
transform2: T2,
transform3: T3,
destination: B,
callback?: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
T3 extends PipelineTransform<T2, any>
B extends PipelineDestination<T3, any>

Parameters

source: A
transform1: T1
transform2: T2
transform3: T3
destination: B
optional
callback: PipelineCallback<B>

Return Type

B extends WritableStream ? B : WritableStream
pipeline<
A extends PipelineSource<any>,
T1 extends PipelineTransform<A, any>,
T2 extends PipelineTransform<T1, any>,
T3 extends PipelineTransform<T2, any>,
T4 extends PipelineTransform<T3, any>,
B extends PipelineDestination<T4, any>,
>
(
source: A,
transform1: T1,
transform2: T2,
transform3: T3,
transform4: T4,
destination: B,
callback?: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
T3 extends PipelineTransform<T2, any>
T4 extends PipelineTransform<T3, any>
B extends PipelineDestination<T4, any>

Parameters

source: A
transform1: T1
transform2: T2
transform3: T3
transform4: T4
destination: B
optional
callback: PipelineCallback<B>

Return Type

B extends WritableStream ? B : WritableStream
pipeline(
streams: ReadonlyArray<
ReadableStream
| WritableStream
| ReadWriteStream
>
,
callback?: (err: ErrnoException | null) => void,
): WritableStream

Parameters

streams: ReadonlyArray<
ReadableStream
| WritableStream
| ReadWriteStream
>
optional
callback: (err: ErrnoException | null) => void

Return Type

WritableStream
pipeline(
stream1: ReadableStream,
stream2: ReadWriteStream | WritableStream,
...streams: Array<
ReadWriteStream
| WritableStream
| ((err: ErrnoException | null) => void)
>
,
): WritableStream

Parameters

stream1: ReadableStream
stream2: ReadWriteStream | WritableStream
...streams: Array<
ReadWriteStream
| WritableStream
| ((err: ErrnoException | null) => void)
>

Return Type

WritableStream