|
|
|
@ -2,7 +2,7 @@
|
|
|
|
|
import { Duplex, Writable } from 'stream';
|
|
|
|
|
|
|
|
|
|
export class MemoryDuplex extends Duplex {
|
|
|
|
|
protected buffer = new Buffer(this.size);
|
|
|
|
|
protected buffer = Buffer.alloc(this.size);
|
|
|
|
|
protected buffered = 0;
|
|
|
|
|
protected doPush = false;
|
|
|
|
|
constructor(public readonly size: number = 4092) {
|
|
|
|
@ -10,7 +10,7 @@ export class MemoryDuplex extends Duplex {
|
|
|
|
|
}
|
|
|
|
|
// tslint:disable-next-line:function-name
|
|
|
|
|
public _write(chunk: any, encoding: string, callback: (err?: Error) => void) {
|
|
|
|
|
const buffer = chunk instanceof Buffer ? chunk : new Buffer(chunk, encoding);
|
|
|
|
|
const buffer = chunk instanceof Buffer ? chunk : Buffer.from(chunk, encoding);
|
|
|
|
|
const end = this.buffered + buffer.length;
|
|
|
|
|
if (end > this.size) {
|
|
|
|
|
return callback(new Error('Buffer overflow'));
|
|
|
|
@ -23,7 +23,7 @@ export class MemoryDuplex extends Duplex {
|
|
|
|
|
public _read() {
|
|
|
|
|
const slice = this.buffer.slice(0, this.buffered);
|
|
|
|
|
this.buffered = 0;
|
|
|
|
|
this.buffer = new Buffer(this.size);
|
|
|
|
|
this.buffer = Buffer.alloc(this.size);
|
|
|
|
|
this.doPush = this.push(slice);
|
|
|
|
|
}
|
|
|
|
|
public bytesBuffered() {
|
|
|
|
@ -37,7 +37,7 @@ export class WritableFunctionStream extends Writable {
|
|
|
|
|
}
|
|
|
|
|
// tslint:disable-next-line:function-name
|
|
|
|
|
public async _write(chunk: any, encoding: string, callback: (err?: Error) => void) {
|
|
|
|
|
const buffer = chunk instanceof Buffer ? chunk : new Buffer(chunk, encoding);
|
|
|
|
|
const buffer = chunk instanceof Buffer ? chunk : Buffer.from(chunk, encoding);
|
|
|
|
|
try {
|
|
|
|
|
await this.func(buffer);
|
|
|
|
|
callback();
|
|
|
|
|