index.js 1.45 KB
Newer Older
Patrick's avatar
Patrick committed
1
'use strict';
2
3
4
const {constants: BufferConstants} = require('buffer');
const stream = require('stream');
const {promisify} = require('util');
Patrick's avatar
Patrick committed
5
6
const bufferStream = require('./buffer-stream');

7
8
9
10
11
12
13
14
15
16
// Promise-returning variant of `stream.pipeline`, created with `util.promisify`,
// so the pipeline can be awaited instead of taking a completion callback.
const streamPipelinePromisified = promisify(stream.pipeline);

/**
 * Error used to signal that more data was buffered than the `maxBuffer`
 * option allows. It takes no arguments: the message and name are fixed.
 */
class MaxBufferError extends Error {
	constructor() {
		const message = 'maxBuffer exceeded';
		super(message);

		// Own `name` property so the error prints as `MaxBufferError: …`.
		const name = 'MaxBufferError';
		this.name = name;
	}
}

/**
 * Pipe `inputStream` into a buffering stream created by `bufferStream(options)`
 * and resolve with that stream's buffered value once the pipeline completes.
 *
 * @param {object} inputStream - Source stream; must be truthy.
 * @param {object} [options] - Forwarded to `bufferStream` after `maxBuffer` is
 *   defaulted to `Infinity`.
 * @param {number} [options.maxBuffer=Infinity] - Largest buffered length
 *   (as reported by `getBufferedLength()`) tolerated before rejecting.
 * @returns {Promise<*>} Whatever `getBufferedValue()` returns for the buffering stream.
 * @throws {Error} (as a rejection) `Expected a stream` when `inputStream` is falsy.
 * @throws {MaxBufferError} (as a rejection) when the buffered length exceeds `maxBuffer`.
 *
 * Any error that rejects the promise gets a `bufferedData` property holding the
 * value buffered so far, unless the buffered length is above
 * `BufferConstants.MAX_LENGTH`.
 */
async function getStream(inputStream, options) {
	if (!inputStream) {
		throw new Error('Expected a stream');
	}

	// Copy instead of reassigning the parameter; caller-supplied keys win over the default.
	const mergedOptions = {
		maxBuffer: Infinity,
		...options
	};

	const {maxBuffer} = mergedOptions;
	// Named `bufferedStream` so the module-level `stream` import is not shadowed.
	const bufferedStream = bufferStream(mergedOptions);

	await new Promise((resolve, reject) => {
		// A promise settles only once. Without this guard every chunk arriving
		// after the limit was hit would build another error and re-read the
		// whole buffered value for nothing.
		let isSettled = false;

		const rejectPromise = error => {
			if (isSettled) {
				return;
			}

			isSettled = true;

			// Don't retrieve an oversized buffer.
			if (error && bufferedStream.getBufferedLength() <= BufferConstants.MAX_LENGTH) {
				error.bufferedData = bufferedStream.getBufferedValue();
			}

			reject(error);
		};

		// The try/catch routes every pipeline failure through `rejectPromise`,
		// so this async IIFE itself never rejects.
		(async () => {
			try {
				await streamPipelinePromisified(inputStream, bufferedStream);
				isSettled = true;
				resolve();
			} catch (error) {
				rejectPromise(error);
			}
		})();

		bufferedStream.on('data', () => {
			if (!isSettled && bufferedStream.getBufferedLength() > maxBuffer) {
				rejectPromise(new MaxBufferError());
			}
		});
	});

	return bufferedStream.getBufferedValue();
}

// The module itself is the `getStream` function; the variants below hang off it.
module.exports = getStream;

// Same as `getStream`, but forces `encoding: 'buffer'` on top of the caller's options.
module.exports.buffer = (inputStream, streamOptions) => {
	const bufferOptions = {...streamOptions, encoding: 'buffer'};
	return getStream(inputStream, bufferOptions);
};

// Same as `getStream`, but forces `array: true` on top of the caller's options.
module.exports.array = (inputStream, streamOptions) => {
	const arrayOptions = {...streamOptions, array: true};
	return getStream(inputStream, arrayOptions);
};

// Exposed so callers can identify maxBuffer rejections.
module.exports.MaxBufferError = MaxBufferError;