// Listing metadata: forked from enviPath/enviPy; 700 lines, 22 KiB, JavaScript, executable file.
/* eslint max-nested-callbacks:[1] */
|
|
|
|
'use strict';
|
|
|
|
var assert = require('assert');
|
|
var Duplexer = require('../src');
|
|
var Stream = require('readable-stream');
|
|
var streamtest = require('streamtest');
|
|
|
|
describe('Duplexer', function() {
|
|
|
|
streamtest.versions.forEach(function(version) {
|
|
describe('for ' + version + ' streams', function() {
|
|
|
|
describe('in binary mode', function() {
|
|
|
|
describe('and with async streams', function() {
|
|
|
|
// Functional API: `Duplexer` is invoked without `new` and must still return
// a Duplexer instance. Data piped into the duplex has to reach `writable`,
// while the duplex output has to carry the content of `readable`.
it('should work with functionnal API', function(done) {
  var createDuplexStream = Duplexer;
  var readable = streamtest[version].fromChunks(['biba', 'beloola']);
  var writable = new Stream.PassThrough();
  var duplex = createDuplexStream({}, writable, readable);

  assert(duplex instanceof Duplexer);

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    done();
  }));

  // Feeding the duplex input side
  streamtest[version].fromChunks(['oude', 'lali'])
    .pipe(duplex);
});
|
|
|
|
// Constructor API: the duplex is built with `new Duplexer(options, writable,
// readable)`. Data piped into the duplex has to reach `writable`, while the
// duplex output has to carry the content of `readable`.
it('should work with POO API', function(done) {
  var readable = streamtest[version].fromChunks(['biba', 'beloola']);
  var writable = new Stream.PassThrough();
  var duplex = new Duplexer({}, writable, readable);

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    done();
  }));

  // Feeding the duplex input side
  streamtest[version].fromChunks(['oude', 'lali'])
    .pipe(duplex);
});
|
|
|
|
// Default error handling, with the options argument omitted: each of the two
// underlying streams emits one 'error', and both must be observed on the
// duplex (errorsCount === 2) without preventing the data from going through.
it('should reemit errors', function(done) {
  var readable = new Stream.PassThrough();
  var writable = new Stream.PassThrough();
  var duplex = new Duplexer(writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    assert.strictEqual(errorsCount, 2);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Writes are deferred to a later turn of the event loop
  setImmediate(function() {
    // Writing content to duplex
    duplex.write('oude');
    writable.emit('error', new Error('hip'));
    duplex.write('lali');
    duplex.end();

    // Writing content to readable
    readable.write('biba');
    readable.emit('error', new Error('hip'));
    readable.write('beloola');
    readable.end();
  });
});
|
|
|
|
// With `reemitErrors: false`, errors emitted by the underlying streams must
// not be observed on the duplex (errorsCount stays at 0) and the data must
// still go through.
it('should not reemit errors when option is set', function(done) {
  var readable = new Stream.PassThrough();
  var writable = new Stream.PassThrough();
  var duplex = new Duplexer({ reemitErrors: false }, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    assert.strictEqual(errorsCount, 0);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Catch error events: an 'error' emitted without any listener would throw
  readable.on('error', function() {});
  writable.on('error', function() {});

  // Writes are deferred to a later turn of the event loop
  setImmediate(function() {
    // Writing content to duplex
    duplex.write('oude');
    writable.emit('error', new Error('hip'));
    duplex.write('lali');
    duplex.end();

    // Writing content to readable
    readable.write('biba');
    readable.emit('error', new Error('hip'));
    readable.write('beloola');
    readable.end();
  });
});
|
|
|
|
});
|
|
|
|
describe('and with sync streams', function() {
|
|
|
|
// Functional API with direct writes: `Duplexer` is invoked without `new` and
// both sides are written right after the setup, with no deferral.
it('should work with functionnal API', function(done) {
  var createDuplexStream = Duplexer;
  var readable = new Stream.PassThrough();
  var writable = new Stream.PassThrough();
  var duplex = createDuplexStream({}, writable, readable);

  assert(duplex instanceof Duplexer);

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    done();
  }));

  // Writing content to duplex
  duplex.write('oude');
  duplex.write('lali');
  duplex.end();

  // Writing content to readable
  readable.write('biba');
  readable.write('beloola');
  readable.end();
});
|
|
|
|
// Constructor API with direct writes; the options argument is omitted here,
// so only the two streams are given to the constructor.
it('should work with POO API', function(done) {
  var readable = new Stream.PassThrough();
  var writable = new Stream.PassThrough();
  var duplex = new Duplexer(writable, readable);

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    done();
  }));

  // Writing content to duplex
  duplex.write('oude');
  duplex.write('lali');
  duplex.end();

  // Writing content to readable
  readable.write('biba');
  readable.write('beloola');
  readable.end();
});
|
|
|
|
// Default error handling with `null` given as options and direct writes:
// each underlying stream emits one 'error', and both must be observed on the
// duplex (errorsCount === 2) without preventing the data from going through.
it('should reemit errors', function(done) {
  var readable = new Stream.PassThrough();
  var writable = new Stream.PassThrough();
  var duplex = new Duplexer(null, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    assert.strictEqual(errorsCount, 2);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Writing content to duplex
  duplex.write('oude');
  writable.emit('error', new Error('hip'));
  duplex.write('lali');
  duplex.end();

  // Writing content to readable
  readable.write('biba');
  readable.emit('error', new Error('hip'));
  readable.write('beloola');
  readable.end();
});
|
|
|
|
// With `reemitErrors: false` and direct writes, errors emitted by the
// underlying streams must not be observed on the duplex (errorsCount stays
// at 0) and the data must still go through.
it('should not reemit errors when option is set', function(done) {
  var readable = new Stream.PassThrough();
  var writable = new Stream.PassThrough();
  var duplex = new Duplexer({ reemitErrors: false }, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.equal() coerces with ==
    assert.strictEqual(text, 'oudelali');
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toText(function(err, text) {
    if(err) {
      return done(err);
    }
    assert.strictEqual(text, 'bibabeloola');
    assert.strictEqual(errorsCount, 0);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Catch error events: an 'error' emitted without any listener would throw
  readable.on('error', function() {});
  writable.on('error', function() {});

  // Writing content to duplex
  duplex.write('oude');
  writable.emit('error', new Error('hip'));
  duplex.write('lali');
  duplex.end();

  // Writing content to readable
  readable.write('biba');
  readable.emit('error', new Error('hip'));
  readable.write('beloola');
  readable.end();
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
describe('in object mode', function() {
|
|
// Fixture objects exchanged through the streams by the object mode tests;
// each one wraps one of the text chunks used by the binary mode tests.
var obj1 = { cnt: 'oude' };
|
|
var obj2 = { cnt: 'lali' };
|
|
var obj3 = { cnt: 'biba' };
|
|
var obj4 = { cnt: 'beloola' };
|
|
|
|
describe('and with async streams', function() {
|
|
|
|
// Functional API in object mode: `Duplexer` is invoked without `new` with
// `objectMode: true`. Objects piped into the duplex have to reach
// `writable`, while the duplex output has to carry the objects of `readable`.
it('should work with functionnal API', function(done) {
  var createDuplexStream = Duplexer;
  var readable = streamtest[version].fromObjects([obj1, obj2]);
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = createDuplexStream({ objectMode: true }, writable, readable);

  assert(duplex instanceof Duplexer);

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj3, obj4]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj1, obj2]);
    done();
  }));

  // Feeding the duplex input side
  streamtest[version].fromObjects([obj3, obj4])
    .pipe(duplex);
});
|
|
|
|
// Functional API through `Duplexer.obj(writable, readable)`, called here
// without any options object. The title is extended so that this test can be
// told apart from the other functional API test of this suite in reports.
it('should work with functionnal API and obj() shortcut', function(done) {
  var readable = streamtest[version].fromObjects([obj1, obj2]);
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = Duplexer.obj(writable, readable);

  assert(duplex instanceof Duplexer);

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj3, obj4]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj1, obj2]);
    done();
  }));

  // Feeding the duplex input side
  streamtest[version].fromObjects([obj3, obj4])
    .pipe(duplex);
});
|
|
|
|
// Constructor API in object mode: the duplex is built with `new Duplexer`
// and `objectMode: true`. Objects piped into the duplex have to reach
// `writable`, while the duplex output has to carry the objects of `readable`.
it('should work with POO API', function(done) {
  var readable = streamtest[version].fromObjects([obj1, obj2]);
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = new Duplexer({ objectMode: true }, writable, readable);

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj3, obj4]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj1, obj2]);
    done();
  }));

  // Feeding the duplex input side
  streamtest[version].fromObjects([obj3, obj4])
    .pipe(duplex);
});
|
|
|
|
// Default error handling in object mode: each underlying stream emits one
// 'error', and both must be observed on the duplex (errorsCount === 2)
// without preventing the objects from going through.
it('should reemit errors', function(done) {
  var readable = new Stream.PassThrough({ objectMode: true });
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = new Duplexer({ objectMode: true }, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj1, obj2]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj3, obj4]);
    assert.strictEqual(errorsCount, 2);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Writes are deferred to a later turn of the event loop
  setImmediate(function() {
    // Writing content to duplex
    duplex.write(obj1);
    writable.emit('error', new Error('hip'));
    duplex.write(obj2);
    duplex.end();

    // Writing content to readable
    readable.write(obj3);
    readable.emit('error', new Error('hip'));
    readable.write(obj4);
    readable.end();
  });
});
|
|
|
|
// `Duplexer.obj` with `reemitErrors: false`: errors emitted by the
// underlying streams must not be observed on the duplex (errorsCount stays
// at 0) and the objects must still go through.
it('should not reemit errors when option is set', function(done) {
  var readable = new Stream.PassThrough({ objectMode: true });
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = Duplexer.obj({ reemitErrors: false }, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj1, obj2]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj3, obj4]);
    assert.strictEqual(errorsCount, 0);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Catch error events: an 'error' emitted without any listener would throw
  readable.on('error', function() {});
  writable.on('error', function() {});

  // Writes are deferred to a later turn of the event loop
  setImmediate(function() {
    // Writing content to duplex
    duplex.write(obj1);
    writable.emit('error', new Error('hip'));
    duplex.write(obj2);
    duplex.end();

    // Writing content to readable
    readable.write(obj3);
    readable.emit('error', new Error('hip'));
    readable.write(obj4);
    readable.end();
  });
});
|
|
|
|
});
|
|
|
|
describe('and with sync streams', function() {
|
|
|
|
// Functional API in object mode with direct writes: `Duplexer` is invoked
// without `new` and both sides are written right after the setup.
it('should work with functionnal API', function(done) {
  var createDuplexStream = Duplexer;
  var readable = new Stream.PassThrough({ objectMode: true });
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = createDuplexStream({ objectMode: true }, writable, readable);

  assert(duplex instanceof Duplexer);

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj1, obj2]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj3, obj4]);
    done();
  }));

  // Writing content to duplex
  duplex.write(obj1);
  duplex.write(obj2);
  duplex.end();

  // Writing content to readable
  readable.write(obj3);
  readable.write(obj4);
  readable.end();
});
|
|
|
|
// Constructor API in object mode with direct writes: objects written to the
// duplex have to reach `writable`, while the duplex output has to carry the
// objects written to `readable`.
it('should work with POO API', function(done) {
  var readable = new Stream.PassThrough({ objectMode: true });
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = new Duplexer({ objectMode: true }, writable, readable);

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj1, obj2]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj3, obj4]);
    done();
  }));

  // Writing content to duplex
  duplex.write(obj1);
  duplex.write(obj2);
  duplex.end();

  // Writing content to readable
  readable.write(obj3);
  readable.write(obj4);
  readable.end();
});
|
|
|
|
// Default error handling in object mode with direct writes: each underlying
// stream emits one 'error', and both must be observed on the duplex
// (errorsCount === 2) without preventing the objects from going through.
it('should reemit errors', function(done) {
  var readable = new Stream.PassThrough({ objectMode: true });
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = new Duplexer({ objectMode: true }, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj1, obj2]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj3, obj4]);
    assert.strictEqual(errorsCount, 2);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Writing content to duplex
  duplex.write(obj1);
  writable.emit('error', new Error('hip'));
  duplex.write(obj2);
  duplex.end();

  // Writing content to readable
  readable.write(obj3);
  readable.emit('error', new Error('hip'));
  readable.write(obj4);
  readable.end();
});
|
|
|
|
// Object mode with `reemitErrors: false` and direct writes: errors emitted
// by the underlying streams must not be observed on the duplex (errorsCount
// stays at 0) and the objects must still go through.
it('should not reemit errors when option is set', function(done) {
  var readable = new Stream.PassThrough({ objectMode: true });
  var writable = new Stream.PassThrough({ objectMode: true });
  var duplex = new Duplexer({
    objectMode: true,
    reemitErrors: false,
  }, writable, readable);
  var errorsCount = 0;

  // Checking writable content
  writable.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    // Strict comparison: the legacy assert.deepEqual() coerces with ==
    assert.deepStrictEqual(objs, [obj1, obj2]);
  }));

  // Checking duplex output (this callback is the one completing the test)
  duplex.pipe(streamtest[version].toObjects(function(err, objs) {
    if(err) {
      return done(err);
    }
    assert.deepStrictEqual(objs, [obj3, obj4]);
    assert.strictEqual(errorsCount, 0);
    done();
  }));

  // Counting the errors surfacing on the duplex itself
  duplex.on('error', function() {
    errorsCount++;
  });

  // Catch error events: an 'error' emitted without any listener would throw
  readable.on('error', function() {});
  writable.on('error', function() {});

  // Writing content to duplex
  duplex.write(obj1);
  writable.emit('error', new Error('hip'));
  duplex.write(obj2);
  duplex.end();

  // Writing content to readable
  readable.write(obj3);
  readable.emit('error', new Error('hip'));
  readable.write(obj4);
  readable.end();
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
});
|
|
|
|
});
|