From 35c54b393114aeb64c33afc4de5055963b8b78a5 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 24 Mar 2022 20:41:31 +0100 Subject: [PATCH 01/19] adds optimized iterators for mapping (.map()) and filtering (.filter()) --- asynciterator.ts | 44 ++++++++++++++++++++++++++-- test/SimpleTransformIterator-test.js | 12 ++++---- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 000fd10..bbc6931 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -456,7 +456,8 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D, self?: any): AsyncIterator { - return this.transform({ map: self ? map.bind(self) : map }); + // return this.transform({ map: self ? map.bind(self) : map }); + return new MappingIterator(this, self ? map.bind(self) : map); } /** @@ -469,7 +470,8 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - return this.transform({ filter: self ? filter.bind(self) : filter }); + return new FilteringIterator(this, self ? filter.bind(self) : filter); + // return this.transform({ filter: self ? filter.bind(self) : filter }); } /** @@ -1251,6 +1253,44 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } +export class MappingIterator extends AsyncIterator { + constructor(source: AsyncIterator, map: (item: S) => D) { + super(); + let item: S | null; + this.read = (): D | null => { + if ((item = source.read()) !== null) + return map.call(this, item); + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + +export class FilteringIterator extends AsyncIterator { + constructor(source: AsyncIterator, filter: (item: T) => boolean) { + super(); + let item: T | null; + this.read = (): T | null => { + while ((item = source.read()) !== null) { + if (filter.call(this, item)) + return item; + } + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + /** An iterator that generates items based on a source iterator diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index e033b8e..78815aa 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -7,6 +7,8 @@ import { ArrayIterator, IntegerIterator, scheduleTask, + MappingIterator, + FilteringIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1110,8 +1112,8 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + it('should be a MappingIterator', () => { + result.should.be.an.instanceof(MappingIterator); }); it('should execute the map function on all items in order', () => { @@ -1146,7 +1148,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(MappingIterator); }); it('should execute the map function on all items in order', () => { @@ -1185,7 +1187,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(FilteringIterator); }); it('should execute the filter function on all items in order', () => { @@ -1219,7 +1221,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(FilteringIterator); }); it('should execute the filter function on all items in order', () => { From 98195c274ffa9ea8f96635070904e2cf5014e2e5 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 24 Mar 2022 21:09:13 +0100 Subject: [PATCH 02/19] adds simpler and faster iterators for skipping and limiting --- asynciterator.ts | 54 ++++++++++++++++++++++++++-- test/SimpleTransformIterator-test.js | 6 ++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index bbc6931..5a8db07 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -512,7 +512,8 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - return this.transform({ offset }); + // return this.transform({ offset }); + return new SkippingIterator(this, offset); } /** @@ -522,7 +523,8 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - return this.transform({ limit }); + // return this.transform({ limit }); + return new LimitingIterator(this, limit); } /** @@ -1291,6 +1293,54 @@ export class FilteringIterator extends AsyncIterator { } } +export class SkippingIterator extends AsyncIterator { + constructor(source: AsyncIterator, skip: number) { + super(); + let item: T | null; + let skipped = 0; + this.read = (): T | null => { + while ((item = source.read()) !== null) { + if (skipped < skip) + skipped += 1; + else + return item; + } + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + +export class LimitingIterator extends AsyncIterator { + constructor(source: AsyncIterator, limit: number) { + super(); + let item: T | null; + let count = 0; + this.read = (): T | null => { + while ((item = source.read()) !== null) { + if (count < limit) { + count += 1; + return item; + } + this.close(); + return null; + } + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + /** An iterator that generates items based on a source iterator diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index 78815aa..71ef501 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -9,6 +9,8 @@ import { scheduleTask, MappingIterator, FilteringIterator, + SkippingIterator, + LimitingIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1349,7 +1351,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(SkippingIterator); }); it('should skip the given number of items', () => { @@ -1379,7 +1381,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(LimitingIterator); }); it('should take the given number of items', () => { From f1e4b14a1c1741a6044d85fe6316a37d098a6887 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 24 Mar 2022 21:31:30 +0100 Subject: [PATCH 03/19] fixes if vs. while in LimitingIterator's .read() --- asynciterator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 5a8db07..96913f8 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1322,7 +1322,7 @@ export class LimitingIterator extends AsyncIterator { let item: T | null; let count = 0; this.read = (): T | null => { - while ((item = source.read()) !== null) { + if ((item = source.read()) !== null) { if (count < limit) { count += 1; return item; From 974f076fcf15802050fc7cb90dd68aeb3db0772c Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 18:08:10 +0100 Subject: [PATCH 04/19] refactors read() functions into class methods --- asynciterator.ts | 102 +++++++++++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 38 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 96913f8..0349194 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1256,14 +1256,13 @@ function destinationFillBuffer(this: InternalSource) { } export class MappingIterator extends AsyncIterator { + protected readonly _map: (item: S) => D; + protected readonly _source: AsyncIterator; + constructor(source: AsyncIterator, map: (item: S) => D) { super(); - let item: S | null; - this.read = (): D | null => { - if ((item = source.read()) !== null) - return map.call(this, item); - return null; - }; + this._source = source; + this._map = map; source.on('end', () => { this.close(); }); @@ -1271,19 +1270,23 @@ export class MappingIterator extends AsyncIterator { this.readable = true; }); } + + read(): D | null { + const item = this._source.read(); + if (item !== null) + return this._map(item); + return null; + } } export class FilteringIterator extends AsyncIterator { + protected readonly _filter: (item: T) => boolean; + protected readonly _source: AsyncIterator; + constructor(source: AsyncIterator, filter: (item: T) => boolean) { super(); - let item: T | null; - this.read = (): T | null => { - while ((item = source.read()) !== null) { - if (filter.call(this, item)) - return item; - } - return null; - }; + this._source = source; + this._filter = filter; source.on('end', () => { this.close(); }); @@ -1291,22 +1294,27 @@ export class FilteringIterator extends AsyncIterator { this.readable = true; }); } + + read(): T | null { + let item; + while ((item = this._source.read()) !== null) { + if (this._filter(item)) + return item; + } + return null; + } } export class SkippingIterator extends AsyncIterator { + protected readonly _source: AsyncIterator; + protected readonly _skip: number; + protected _skipped: number; + constructor(source: AsyncIterator, skip: number) { super(); - let item: T | null; - let skipped = 0; - this.read = (): T | null => { - while ((item = source.read()) !== null) { - if (skipped < skip) - skipped += 1; - else - return item; - } - return null; - }; + this._skip = skip; + this._skipped = 0; + this._source = source; source.on('end', () => { this.close(); }); @@ -1314,24 +1322,29 @@ export class SkippingIterator extends AsyncIterator { this.readable = true; }); } + + read(): T | null { + let item; + while ((item = this._source.read()) !== null) { + if (this._skipped < this._skip) + this._skipped += 1; + else + return item; + } + return null; + } } export class LimitingIterator extends AsyncIterator { + protected readonly _source: AsyncIterator; + protected readonly _limit: number; + protected _count: number; + constructor(source: AsyncIterator, limit: number) { super(); - let item: T | null; - let count = 0; - this.read = (): T | null => { - if ((item = source.read()) !== null) { - if (count < limit) { - count += 1; - return item; - } - this.close(); - return null; - } - return null; - }; + this._source = source; + this._limit = limit; + this._count = 0; source.on('end', () => { this.close(); }); @@ -1339,6 +1352,19 @@ export class LimitingIterator extends AsyncIterator { this.readable = true; }); } + + read(): T | null { + const item = this._source.read(); + if (item !== null) { + if (this._count < this._limit) { + this._count += 1; + return item; + } + this.close(); + return null; + } + return null; + } } From 728f9ba790dc5fd731e70f50450f7f944def416c Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 18:31:10 +0100 Subject: [PATCH 05/19] centralizes common code into shared base class --- asynciterator.ts | 72 +++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 0349194..e11a87e 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1255,20 +1255,40 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } -export class MappingIterator extends AsyncIterator { - protected readonly _map: (item: S) => D; +export class SynchronousTransformIterator extends AsyncIterator { protected readonly _source: AsyncIterator; - constructor(source: AsyncIterator, map: (item: S) => D) { + constructor(source: AsyncIterator) { + /* eslint-disable no-use-before-define */ super(); this._source = source; - this._map = map; - source.on('end', () => { + const cleanup = () => { + source.removeListener('end', onEnd); + source.removeListener('readable', onReadable); + }; + const onEnd = () => { + cleanup(); this.close(); - }); - source.on('readable', () => { + }; + const onReadable = () => { this.readable = true; - }); + }; + source.on('end', onEnd); + source.on('readable', onReadable); + } + + protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { + super._destroy(cause, callback); + this._source.destroy(cause); + } +} + +export class MappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => D; + + constructor(source: AsyncIterator, map: (item: S) => D) { + super(source); + this._map = map; } read(): D | null { @@ -1279,20 +1299,12 @@ export class MappingIterator extends AsyncIterator { } } -export class FilteringIterator extends AsyncIterator { +export class FilteringIterator extends SynchronousTransformIterator { protected readonly _filter: (item: T) => boolean; - protected readonly _source: AsyncIterator; constructor(source: AsyncIterator, filter: (item: T) => boolean) { - super(); - this._source = source; + super(source); this._filter = filter; - source.on('end', () => { - this.close(); - }); - source.on('readable', () => { - this.readable = true; - }); } read(): T | null { @@ -1305,22 +1317,14 @@ export class FilteringIterator extends AsyncIterator { } } -export class SkippingIterator extends AsyncIterator { - protected readonly _source: AsyncIterator; +export class SkippingIterator extends SynchronousTransformIterator { protected readonly _skip: number; protected _skipped: number; constructor(source: AsyncIterator, skip: number) { - super(); + super(source); this._skip = skip; this._skipped = 0; - this._source = source; - source.on('end', () => { - this.close(); - }); - source.on('readable', () => { - this.readable = true; - }); } read(): T | null { @@ -1335,22 +1339,14 @@ export class SkippingIterator extends AsyncIterator { } } -export class LimitingIterator extends AsyncIterator { - protected readonly _source: AsyncIterator; +export class LimitingIterator extends SynchronousTransformIterator { protected readonly _limit: number; protected _count: number; constructor(source: AsyncIterator, limit: number) { - super(); - this._source = source; + super(source); this._limit = limit; this._count = 0; - source.on('end', () => { - this.close(); - }); - source.on('readable', () => { - this.readable = true; - }); } read(): T | null { From c1d47354b922ac693bf55b9b7a71620146795e60 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 10:56:50 +1100 Subject: [PATCH 06/19] chore: remove unecessary commented out lines --- asynciterator.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index e11a87e..f7c4613 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -456,7 +456,6 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D, self?: any): AsyncIterator { - // return this.transform({ map: self ? map.bind(self) : map }); return new MappingIterator(this, self ? map.bind(self) : map); } @@ -471,7 +470,6 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { return new FilteringIterator(this, self ? filter.bind(self) : filter); - // return this.transform({ filter: self ? filter.bind(self) : filter }); } /** @@ -512,7 +510,6 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - // return this.transform({ offset }); return new SkippingIterator(this, offset); } @@ -523,7 +520,6 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - // return this.transform({ limit }); return new LimitingIterator(this, limit); } From 1ec39fbb12497ad46ea5409ba683deb4cbb2079c Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 12:32:20 +1100 Subject: [PATCH 07/19] feat: optimize chained mapping and filtering --- asynciterator.ts | 138 +++++++++++++++++++++++++++++++++++++ test/AsyncIterator-test.js | 84 ++++++++++++++++++++++ 2 files changed, 222 insertions(+) diff --git a/asynciterator.ts b/asynciterator.ts index f7c4613..9ee0446 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -459,6 +459,14 @@ export class AsyncIterator extends EventEmitter { return new MappingIterator(this, self ? map.bind(self) : map); } + /** + MultiMaps items according to a synchronous generator (hence no need for buffering) + @param {Function} multiMap The function to multiMap items with + */ + multiMap(multiMap: (item: T) => Generator): AsyncIterator { + return new MultiMappingIterator(this, multiMap); + } + /** Return items from this iterator that match the filter. After this operation, only read the returned iterator instead of the current one. @@ -1279,6 +1287,32 @@ export class SynchronousTransformIterator extends AsyncIterator { } } +export class MultiMappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => Generator; + private generator?: Generator; + + constructor(source: AsyncIterator, map: (item: S) => Generator) { + super(source); + this._map = map; + } + + read(): D | null { + let _item; + + // eslint-disable-next-line no-constant-condition + while (true) { + if (!this.generator) { + if ((_item = this._source.read()) === null) + return null; + this.generator = this._map(_item); + } + if (!(_item = this.generator.next()).done) + return _item.value; + this.generator = undefined; + } + } +} + export class MappingIterator extends SynchronousTransformIterator { protected readonly _map: (item: S) => D; @@ -1293,6 +1327,28 @@ export class MappingIterator extends SynchronousTransformIterator(map: (item: D) => T, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: { + filter: false, + function: this._map, + }, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: { + filter: false, + function: this._map, + }, + }); + } } export class FilteringIterator extends SynchronousTransformIterator { @@ -1311,6 +1367,28 @@ export class FilteringIterator extends SynchronousTransformIterator { } return null; } + + map(map: (item: T) => D, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: { + filter: true, + function: this._filter, + }, + }); + } + + filter(filter: (item: T) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: { + filter: true, + function: this._filter, + }, + }); + } } export class SkippingIterator extends SynchronousTransformIterator { @@ -1359,6 +1437,66 @@ export class LimitingIterator extends SynchronousTransformIterator { } } +interface Transform { + filter: boolean, + function: Function, + next?: Transform +} + +export class MultiMapFilterTransformIterator extends SynchronousTransformIterator { + private _transformation?: (item: S) => D | null; + + constructor(source: AsyncIterator, private transforms: Transform) { + super(source); + } + + protected transformation(_item: S): D | null { + if (!this._transformation) { + let _transforms: Transform | undefined = this.transforms; + + const { filter, function: func } = _transforms!; + + this._transformation = filter ? + ((item: any) => func(item) ? item : null) : + func as any; + + while ((_transforms = _transforms!.next) !== undefined) { + const { filter: _filter, function: _func } = _transforms; + const t = this._transformation!; + + this._transformation = _filter ? + (item: any) => _func(item) ? t(item) : null : + (item: any) => t(_func(item)); + } + } + return this._transformation!(_item); + } + + read(): D | null { + let item; + while ((item = this._source.read()) !== null) { + if ((item = this.transformation(item)) !== null) + return item; + } + return null; + } + + map(map: (item: D) => T, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: this.transforms, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: this.transforms, + }); + } +} /** An iterator that generates items based on a source iterator diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 37a74d6..d7527b1 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,6 +5,7 @@ import { ENDED, DESTROYED, scheduleTask, + range, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1307,4 +1308,87 @@ describe('AsyncIterator', () => { }); }); }); + describe('Testing chains fo maps and filters', () => { + let iterator; + beforeEach(() => { + iterator = range(0, 2); + }); + it('Should handle no transforms', async () => { + iterator.read().should.equal(0); + iterator.read().should.equal(1); + iterator.read().should.equal(2); + }); + it('Should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + it('Should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + it('Should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + it('Should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + it('Should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + it('Should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + it('Should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + it('Should handle transforms', async () => { + iterator = iterator.multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }); + (await iterator.toArray()).should.deep.equal(['x0', 'y0', 'x1', 'y1', 'x2', 'y2']); + }); + it('Should handle transforms and maps', async () => { + iterator = iterator.multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }).map(x => `z${x}`); + (await iterator.toArray()).should.deep.equal(['zx0', 'zy0', 'zx1', 'zy1', 'zx2', 'zy2']); + }); + it('Should handle maps and transforms', async () => { + iterator = iterator.map(x => `z${x}`).multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }); + (await iterator.toArray()).should.deep.equal(['xz0', 'yz0', 'xz1', 'yz1', 'xz2', 'yz2']); + }); + }); }); From 6141346920244148d1a10d0601268a7058ec4644 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 18:13:49 +0200 Subject: [PATCH 08/19] fixes missing readable events --- asynciterator.ts | 43 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 9ee0446..e9959ff 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1260,25 +1260,52 @@ function destinationFillBuffer(this: InternalSource) { } export class SynchronousTransformIterator extends AsyncIterator { - protected readonly _source: AsyncIterator; + protected _source: AsyncIterator; constructor(source: AsyncIterator) { /* eslint-disable no-use-before-define */ super(); this._source = source; const cleanup = () => { - source.removeListener('end', onEnd); - source.removeListener('readable', onReadable); + source.removeListener('end', onSourceEnd); + source.removeListener('error', onSourceError); + source.removeListener('readable', onSourceReadable); + taskScheduler(() => { + // Delayed as there might be pending tasks using the source at the + // time that cleanup() is called. + delete this._source; + }); }; - const onEnd = () => { + const onSourceEnd = () => { cleanup(); this.close(); }; - const onReadable = () => { - this.readable = true; + const onSourceError = (err: Error) => { + scheduleTask(() => { + this.emit('error', err); + }); + }; + const onSourceReadable = () => { + if (this.readable) { + // TODO: I'm not completely sure as to why this is needed but without + // the following line, some use cases relying on flow mode (i.e. + // consuming items via `on('data', (data) => {})`) do not work. + // It looks like the debouncing that happens in `set readable()` + // in `AsyncIterator` prevents the event from firing as `this` + // is already readable. + scheduleTask(() => { + this.emit('readable'); + }); + } + else { + this.readable = true; + } }; - source.on('end', onEnd); - source.on('readable', onReadable); + source.on('end', onSourceEnd); + source.on('error', onSourceError); + source.on('readable', onSourceReadable); + if (source.readable) + onSourceReadable(); } protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { From d7ab7d9a0e970b05156dcac17505c837bf1386df Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 18:47:10 +0200 Subject: [PATCH 09/19] synchronously emits readable and error events in SychronousTransformIterator --- asynciterator.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index e9959ff..b9a83cf 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1281,9 +1281,7 @@ export class SynchronousTransformIterator extends AsyncIterator { this.close(); }; const onSourceError = (err: Error) => { - scheduleTask(() => { - this.emit('error', err); - }); + this.emit('error', err); }; const onSourceReadable = () => { if (this.readable) { @@ -1293,9 +1291,7 @@ export class SynchronousTransformIterator extends AsyncIterator { // It looks like the debouncing that happens in `set readable()` // in `AsyncIterator` prevents the event from firing as `this` // is already readable. - scheduleTask(() => { - this.emit('readable'); - }); + this.emit('readable'); } else { this.readable = true; From 44fa99ccfc98376a2aa3e44741574a6105d2df3e Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Fri, 25 Mar 2022 15:00:49 +0100 Subject: [PATCH 10/19] faster wrapping of iterator-like sources --- asynciterator.ts | 38 ++++++++++++++-- test/TransformIterator-test.js | 3 +- test/WrappingIterator-test.js | 82 ++++++++++++++++++++++++++++++++++ test/wrap-test.js | 55 +++++++++++++++++++++++ 4 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 test/WrappingIterator-test.js create mode 100644 test/wrap-test.js diff --git a/asynciterator.ts b/asynciterator.ts index b9a83cf..f60e32c 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2117,17 +2117,49 @@ class HistoryReader { } } +export class WrappingIterator extends AsyncIterator { + protected _source?: InternalSource; + + constructor(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise) { + super(); + if (sourceOrPromise instanceof AsyncIterator) + return sourceOrPromise; + Promise.resolve(sourceOrPromise) + .then(source => { + // @ts-ignore - TODO: how to drop this cleanly? + if (!isFunction(source.read) || !isFunction(source.on)) + throw new Error(`Invalid source: ${source}`); + this._source = (source as InternalSource) + .on('end', () => { + this.close(); + }); + this.readable = true; + }) + .catch(error => { + this.emit('error', error); + }); + } + + read(): T | null { + if (this._source) + return this._source.read(); + return null; + } +} + /** Creates an iterator that wraps around a given iterator or readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. After this operation, only read the returned iterator instead of the given one. @function - @param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from + @param {module:asynciterator.AsyncIterator|Readable} [sourceOrPromise] The source this iterator generates items from @param {object} [options] Settings of the iterator @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ -export function wrap(source: EventEmitter | Promise, options?: TransformIteratorOptions) { - return new TransformIterator(source as AsyncIterator | Promise>, options); +export function wrap(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise, options?: TransformIteratorOptions) { + if (options) + return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); + return new WrappingIterator(sourceOrPromise); } /** diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index e5d8116..0f4f7a5 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -4,6 +4,7 @@ import { EmptyIterator, ArrayIterator, TransformIterator, + WrappingIterator, wrap, scheduleTask, } from '../dist/asynciterator.js'; @@ -38,7 +39,7 @@ describe('TransformIterator', () => { before(() => { instance = wrap(); }); it('should be an TransformIterator object', () => { - instance.should.be.an.instanceof(TransformIterator); + instance.should.be.an.instanceof(WrappingIterator); }); it('should be an AsyncIterator object', () => { diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js new file mode 100644 index 0000000..5f86a10 --- /dev/null +++ b/test/WrappingIterator-test.js @@ -0,0 +1,82 @@ +import { AsyncIterator, ArrayIterator, WrappingIterator } from '../dist/asynciterator.js'; +import { EventEmitter } from 'events'; + +describe('WrappingIterator', () => { + describe('The WrappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + + before(() => { + instance = new WrappingIterator({}); + }); + + it('should be a WrappingIterator object', () => { + instance.should.be.an.instanceof(WrappingIterator); + }); + + it('should be a AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + describe('with an invalid source', () => { + it('should emit an error', done => { + const source = {}; + const wrapped = new WrappingIterator(source); + wrapped.on('error', err => { + err; + done(); + }); + }); + }); + describe('with an empty source', () => { + it('should end when the source ends', done => { + const source = new ArrayIterator([]); + const wrapped = new WrappingIterator(source); + wrapped.on('end', () => { + done(); + }); + }); + }); + describe('with a non-empty source', () => { + it('should end when the source ends', done => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = new WrappingIterator(source); + wrapped.on('data', item => { item; }).on('end', () => { + done(); + }); + }); + it('should emit items from the source before ending', done => { + const array = [0, 1, 2, 3, 4]; + const source = new ArrayIterator(array); + const wrapped = new WrappingIterator(source); + let i = 0; + wrapped + .on('data', item => { + item.should.equal(array[i++]); + }) + .on('end', () => { + done(); + }); + }); + }); + describe('with a promise of a non-empty source', () => { + it('should emit items from the source before ending', done => { + const array = [0, 1, 2, 3, 4]; + const source = new ArrayIterator(array); + const wrapped = new WrappingIterator(Promise.resolve(source)); + let i = 0; + wrapped + .on('data', item => { + item.should.equal(array[i++]); + }) + .on('end', () => { + done(); + }); + }); + }); +}); diff --git a/test/wrap-test.js b/test/wrap-test.js new file mode 100644 index 0000000..c54e49b --- /dev/null +++ b/test/wrap-test.js @@ -0,0 +1,55 @@ + +import { + wrap, + ArrayIterator, + TransformIterator, + WrappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class IteratorLike extends EventEmitter { + constructor() { + super(); + this._count = 0; + } + + read() { + if (this._count >= 5) { + this.emit('end'); + return null; + } + return this._count++; + } +} + +describe('The wrap() function', () => { + it('should let an instance of AsyncIterator pass through without wrapping', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source); + wrapped.should.equal(source); + wrapped.should.be.instanceof(ArrayIterator); + }); + + it('should emit an error when an incompatible source is passed', done => { + const source = {}; + const wrapped = wrap(source); + wrapped.on('error', err => { + err; + done(); + }); + }); + + it('should return a TransformIterator when an options object is passed', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const options = { map: num => num * 2 }; + const wrapped = wrap(source, options); + wrapped.should.be.instanceof(TransformIterator); + }); + + it('should return a WrappingIterator when no options object is passed', () => { + const source = new IteratorLike(); + const wrapped = wrap(source); + wrapped.should.be.instanceof(WrappingIterator); + }); +}); From e8f9108eb1a777ba0e11dc2162e60af2739113ed Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Fri, 25 Mar 2022 15:07:34 +0100 Subject: [PATCH 11/19] adds missing readable event handler in WrappingIterator --- asynciterator.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/asynciterator.ts b/asynciterator.ts index f60e32c..f0af6d6 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2132,6 +2132,9 @@ export class WrappingIterator extends AsyncIterator { this._source = (source as InternalSource) .on('end', () => { this.close(); + }) + .on('readable', () => { + this.readable = true; }); this.readable = true; }) From 46c6877295c3bdd238ef94e1ae253d7b2ffb5ca1 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 11:34:07 +0100 Subject: [PATCH 12/19] puts iterator pass-through behind a dedicated option, fixes breaking end events on promisified sources --- asynciterator.ts | 67 ++++++++++++++++++++++++---------- test/TransformIterator-test.js | 19 ---------- test/WrappingIterator-test.js | 15 ++++++-- test/wrap-test.js | 20 ++++++++-- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index f0af6d6..73522be 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2117,30 +2117,54 @@ class HistoryReader { } } +export interface WrappingIteratorOptions { + letIteratorThrough?: boolean; +} + +export type PromiseLike = Pick, 'then' | 'catch'> + +/* eslint-disable arrow-body-style */ +export const isPromiseLike = (item: { [key: string]: any }): item is PromiseLike => { + return isFunction(item.then) && isFunction(item.catch); +}; + export class WrappingIterator extends AsyncIterator { protected _source?: InternalSource; - constructor(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise) { + constructor(sourceOrPromise: EventEmitter | Promise | PromiseLike, options: WrappingIteratorOptions = {}) { super(); - if (sourceOrPromise instanceof AsyncIterator) + if (options.letIteratorThrough === true && sourceOrPromise instanceof AsyncIterator) return sourceOrPromise; - Promise.resolve(sourceOrPromise) - .then(source => { - // @ts-ignore - TODO: how to drop this cleanly? - if (!isFunction(source.read) || !isFunction(source.on)) - throw new Error(`Invalid source: ${source}`); - this._source = (source as InternalSource) - .on('end', () => { - this.close(); - }) - .on('readable', () => { - this.readable = true; - }); - this.readable = true; + if (sourceOrPromise instanceof Promise || isPromiseLike(sourceOrPromise)) { + sourceOrPromise + .then(source => { + WrappingIterator._wrapSource(source, this); + }) + .catch(err => { + this.emit('error', err); + }); + } + else { + WrappingIterator._wrapSource(sourceOrPromise, this); + } + } + + protected static _wrapSource(source: EventEmitter, wrapped: WrappingIterator) { + // @ts-ignore - TODO: how to drop this cleanly? + if (!isFunction(source.read) || !isFunction(source.on)) { + taskScheduler(() => { + wrapped.emit('error', new Error(`Invalid source: ${source}`)); + }); + return; + } + wrapped._source = (source as InternalSource) + .on('end', () => { + wrapped.close(); }) - .catch(error => { - this.emit('error', error); + .on('readable', () => { + wrapped.readable = true; }); + wrapped.readable = true; } read(): T | null { @@ -2159,10 +2183,13 @@ export class WrappingIterator extends AsyncIterator { @param {object} [options] Settings of the iterator @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ -export function wrap(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise, options?: TransformIteratorOptions) { - if (options) +export function wrap( + sourceOrPromise: EventEmitter | Promise, + options: TransformIteratorOptions & WrappingIteratorOptions = {}, +): AsyncIterator { + if ('maxBufferSize' in options || 'autoStart' in options || 'optional' in options || 'destroySource' in options) return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); - return new WrappingIterator(sourceOrPromise); + return new WrappingIterator(sourceOrPromise, options); } /** diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index 0f4f7a5..a6cdcab 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -4,8 +4,6 @@ import { EmptyIterator, ArrayIterator, TransformIterator, - WrappingIterator, - wrap, scheduleTask, } from '../dist/asynciterator.js'; @@ -33,23 +31,6 @@ describe('TransformIterator', () => { instance.should.be.an.instanceof(EventEmitter); }); }); - - describe('the result when called through `wrap`', () => { - let instance; - before(() => { instance = wrap(); }); - - it('should be an TransformIterator object', () => { - instance.should.be.an.instanceof(WrappingIterator); - }); - - it('should be an AsyncIterator object', () => { - instance.should.be.an.instanceof(AsyncIterator); - }); - - it('should be an EventEmitter object', () => { - instance.should.be.an.instanceof(EventEmitter); - }); - }); }); describe('A TransformIterator', () => { diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index 5f86a10..aef904c 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -7,7 +7,7 @@ describe('WrappingIterator', () => { let instance; before(() => { - instance = new WrappingIterator({}); + instance = new WrappingIterator(new ArrayIterator([])); }); it('should be a WrappingIterator object', () => { @@ -23,7 +23,7 @@ describe('WrappingIterator', () => { }); }); }); - describe('with an invalid source', () => { + describe('the result when called with new and with an invalid source', () => { it('should emit an error', done => { const source = {}; const wrapped = new WrappingIterator(source); @@ -33,14 +33,21 @@ describe('WrappingIterator', () => { }); }); }); - describe('with an empty source', () => { - it('should end when the source ends', done => { + describe('with an empty source iterator', () => { + it('should end when the source iterator ends and letIteratorThrough is not set', done => { const source = new ArrayIterator([]); const wrapped = new WrappingIterator(source); wrapped.on('end', () => { done(); }); }); + it('should end when the source iterator ends and letIteratorThrough is set to true', done => { + const source = new ArrayIterator([]); + const wrapped = new WrappingIterator(source, { letIteratorThrough: true }); + wrapped.on('end', () => { + done(); + }); + }); }); describe('with a non-empty source', () => { it('should end when the source ends', done => { diff --git a/test/wrap-test.js b/test/wrap-test.js index c54e49b..127082b 100644 --- a/test/wrap-test.js +++ b/test/wrap-test.js @@ -24,9 +24,23 @@ class IteratorLike extends EventEmitter { } describe('The wrap() function', () => { - it('should let an instance of AsyncIterator pass through without wrapping', () => { + it('should not let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is not set', () => { const source = new ArrayIterator([0, 1, 2, 3, 4]); const wrapped = wrap(source); + wrapped.should.not.equal(source); + wrapped.should.be.instanceof(WrappingIterator); + }); + + it('should not let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is set to false', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source, { letIteratorThrough: false }); + wrapped.should.not.equal(source); + wrapped.should.be.instanceof(WrappingIterator); + }); + + it('should let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is set to true', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source, { letIteratorThrough: true }); wrapped.should.equal(source); wrapped.should.be.instanceof(ArrayIterator); }); @@ -40,9 +54,9 @@ describe('The wrap() function', () => { }); }); - it('should return a TransformIterator when an options object is passed', () => { + it('should return a TransformIterator when transform options are passed', () => { const source = new ArrayIterator([0, 1, 2, 3, 4]); - const options = { map: num => num * 2 }; + const options = { maxBufferSize: 42 }; const wrapped = wrap(source, options); wrapped.should.be.instanceof(TransformIterator); }); From 60a0d150382c1cc806279273e3e970e1a9deaab0 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 11:57:40 +0100 Subject: [PATCH 13/19] better typings for iterator-like and promise-like objects --- asynciterator.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 73522be..f0c58f3 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2121,21 +2121,27 @@ export interface WrappingIteratorOptions { letIteratorThrough?: boolean; } -export type PromiseLike = Pick, 'then' | 'catch'> +export type PromiseLike = Pick, 'then' | 'catch'>; /* eslint-disable arrow-body-style */ export const isPromiseLike = (item: { [key: string]: any }): item is PromiseLike => { return isFunction(item.then) && isFunction(item.catch); }; +export type IteratorLike = EventEmitter & { on: () => any; read: () => T | null }; + +export const isIteratorLike = (item: EventEmitter & { [key: string]: any }): item is IteratorLike => { + return isFunction(item.on) && isFunction(item.read); +}; + export class WrappingIterator extends AsyncIterator { - protected _source?: InternalSource; + protected _source?: IteratorLike; - constructor(sourceOrPromise: EventEmitter | Promise | PromiseLike, options: WrappingIteratorOptions = {}) { + constructor(sourceOrPromise: EventEmitter | PromiseLike, options: WrappingIteratorOptions = {}) { super(); if (options.letIteratorThrough === true && sourceOrPromise instanceof AsyncIterator) return sourceOrPromise; - if (sourceOrPromise instanceof Promise || isPromiseLike(sourceOrPromise)) { + if (isPromiseLike(sourceOrPromise)) { sourceOrPromise .then(source => { WrappingIterator._wrapSource(source, this); @@ -2150,14 +2156,13 @@ export class WrappingIterator extends AsyncIterator { } protected static _wrapSource(source: EventEmitter, wrapped: WrappingIterator) { - // @ts-ignore - TODO: how to drop this cleanly? - if (!isFunction(source.read) || !isFunction(source.on)) { + if (!isIteratorLike(source)) { taskScheduler(() => { wrapped.emit('error', new Error(`Invalid source: ${source}`)); }); return; } - wrapped._source = (source as InternalSource) + wrapped._source = source .on('end', () => { wrapped.close(); }) From 12885ba9ec1bc36932ca06740bd6d67e60452f7c Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sat, 26 Mar 2022 14:25:27 +1100 Subject: [PATCH 14/19] Allow unionIterator to take promises. Closes https://github.com/RubenVerborgh/AsyncIterator/issues/42 --- asynciterator.ts | 20 ++++++--- test/UnionIterator-test.js | 92 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index f0c58f3..07560c7 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1783,7 +1783,7 @@ export class MultiTransformIterator extends TransformIterator { */ export class UnionIterator extends BufferedIterator { private _sources : InternalSource[] = []; - private _pending? : { sources?: AsyncIterator> }; + private _pending? : { sources?: AsyncIterator>> }; private _currentSource = -1; /** @@ -1791,7 +1791,9 @@ export class UnionIterator extends BufferedIterator { @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from @param {object} [options] Settings of the iterator */ - constructor(sources: AsyncIteratorOrArray>, + constructor(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>, options: BufferedIteratorOptions = {}) { super(options); const autoStart = options.autoStart !== false; @@ -1799,14 +1801,14 @@ export class UnionIterator extends BufferedIterator { // Sources have been passed as an iterator if (isEventEmitter(sources)) { sources.on('error', error => this.emit('error', error)); - this._pending = { sources }; + this._pending = { sources: sources as AsyncIterator>> }; if (autoStart) this._loadSources(); } // Sources have been passed as a non-empty array else if (Array.isArray(sources) && sources.length > 0) { for (const source of sources) - this._addSource(source as InternalSource); + this._addSource(source as MaybePromise>); } // Sources are an empty list else if (autoStart) { @@ -1828,7 +1830,7 @@ export class UnionIterator extends BufferedIterator { // Otherwise, set up source reading else { sources.on('data', source => { - this._addSource(source as InternalSource); + this._addSource(source as MaybePromise>); this._fillBufferAsync(); }); sources.on('end', () => { @@ -1839,7 +1841,9 @@ export class UnionIterator extends BufferedIterator { } // Adds the given source to the internal sources array - protected _addSource(source: InternalSource) { + protected _addSource(source: MaybePromise>) { + if (isPromise(source)) + source = wrap(source) as any as InternalSource; if (!source.done) { this._sources.push(source); source._destination = this; @@ -2224,7 +2228,9 @@ export function fromArray(items: Iterable) { Creates an iterator containing all items from the given iterators. @param {Array} items the items */ -export function union(sources: AsyncIteratorOrArray>) { +export function union(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>) { return new UnionIterator(sources); } diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index 308f52d..c7714ec 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -260,6 +260,50 @@ describe('UnionIterator', () => { }); }); + describe('when constructed with an iterator and with autoStart and one source a promise', () => { + let iterator, sourceIterator; + before(() => { + const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; + sourceIterator = new ArrayIterator(sources); + sinon.spy(sourceIterator, 'read'); + iterator = new UnionIterator(sourceIterator, { autoStart: true }); + }); + + describe('before reading', () => { + it('should have read the sources', () => { + sourceIterator.read.should.have.been.called; + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + + it('should pass errors', () => { + const callback = sinon.spy(); + const error = new Error('error'); + iterator.once('error', callback); + sourceIterator.emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('after reading', () => { + let items; + before(async () => { + items = (await toArray(iterator)).sort(); + }); + + it('should have emitted all items', () => { + items.should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + }); + describe('when constructed with an iterator and without autoStart', () => { let iterator, sourceIterator; before(() => { @@ -308,6 +352,54 @@ describe('UnionIterator', () => { }); }); + describe('when constructed with an iterator and without autoStart and one source as a promise', () => { + let iterator, sourceIterator; + before(() => { + const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; + sourceIterator = new ArrayIterator(sources); + sinon.spy(sourceIterator, 'read'); + iterator = new UnionIterator(sourceIterator, { autoStart: false }); + }); + + describe('before reading', () => { + it('should not have read the sources', () => { + sourceIterator.read.should.not.have.been.called; + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + + it('should pass errors', () => { + const callback = sinon.spy(); + const error = new Error('error'); + iterator.once('error', callback); + sourceIterator.emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('after reading', () => { + let items; + before(async () => { + items = (await toArray(iterator)).sort(); + }); + + it('should have read the sources', () => { + sourceIterator.read.should.have.been.called; + }); + + it('should have emitted all items', () => { + items.should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + }); + describe('a UnionIterator with two sources', () => { let iterator, sources; From 730bf645d82257efd27fb23c3b1ef2d0d8bd3621 Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 26 Mar 2022 16:35:01 +0000 Subject: [PATCH 15/19] Release version 3.4.0 of the npm package. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 831df0f..d438e01 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "license": "MIT", "devDependencies": { "@babel/cli": "^7.10.1", diff --git a/package.json b/package.json index 03b490b..edcab1f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "description": "An asynchronous iterator library for advanced object pipelines.", "author": "Ruben Verborgh ", "type": "module", From 55f21c6ea4ff7e70375740230712e2238747defe Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 16:37:05 +1100 Subject: [PATCH 16/19] feat: wrapIterator --- asynciterator.ts | 31 +++++++++++++++++++++++++++++-- test/WrapIterator-test.js | 13 +++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 test/WrapIterator-test.js diff --git a/asynciterator.ts b/asynciterator.ts index 07560c7..51da262 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2153,8 +2153,7 @@ export class WrappingIterator extends AsyncIterator { .catch(err => { this.emit('error', err); }); - } - else { + } else { WrappingIterator._wrapSource(sourceOrPromise, this); } } @@ -2183,6 +2182,22 @@ export class WrappingIterator extends AsyncIterator { } } +class WrapIterator extends AsyncIterator { + constructor(private source: Iterator) { + super(); + this.readable = true; + } + + read(): T | null { + const item = this.source.next(); + if (item.done) { + this.close(); + return null; + } + return item.value; + } +} + /** Creates an iterator that wraps around a given iterator or readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. @@ -2201,6 +2216,18 @@ export function wrap( return new WrappingIterator(sourceOrPromise, options); } +/** + Creates an iterator that wraps around a given synchronous iterator. + Use this to convert an iterator-like object into a full-featured AsyncIterator. + After this operation, only read the returned iterator instead of the given one. + @function + @param {Iterator} [source] The source this iterator generates items from + @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator +*/ +export function wrapIterator(source: Iterator) { + return new WrapIterator(source); +} + /** Creates an empty iterator. */ diff --git a/test/WrapIterator-test.js b/test/WrapIterator-test.js new file mode 100644 index 0000000..a7adec1 --- /dev/null +++ b/test/WrapIterator-test.js @@ -0,0 +1,13 @@ +import { + wrapIterator, +} from '../dist/asynciterator.js'; + +describe('wrapIterator', () => { + it('Should wrap correctly', async () => { + (await wrapIterator((function * () { + yield 1; + yield 2; + yield 3; + })()).toArray()).should.deep.equal([1, 2, 3]); + }); +}); From 2339f4b2be2fa83790c57335b9062827c9a36a57 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 27 Mar 2022 19:19:24 +0200 Subject: [PATCH 17/19] universal wrap() function --- asynciterator.ts | 118 +++++++++++------- ...rator-test.js => IteratorIterator-test.js} | 6 +- 2 files changed, 73 insertions(+), 51 deletions(-) rename test/{WrapIterator-test.js => IteratorIterator-test.js} (64%) diff --git a/asynciterator.ts b/asynciterator.ts index 51da262..7b0b11c 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2121,58 +2121,55 @@ class HistoryReader { } } -export interface WrappingIteratorOptions { - letIteratorThrough?: boolean; -} - -export type PromiseLike = Pick, 'then' | 'catch'>; +export type IteratorLike = EventEmitter & { + on: (event: string | symbol, listener: (...args: any[]) => void) => AsyncIterator; read: () => T | null }; /* eslint-disable arrow-body-style */ -export const isPromiseLike = (item: { [key: string]: any }): item is PromiseLike => { - return isFunction(item.then) && isFunction(item.catch); +export const isIteratorLike = (item: { [key: string]: any }): item is IteratorLike => { + return isFunction(item.on) && isFunction(item.read); }; -export type IteratorLike = EventEmitter & { on: () => any; read: () => T | null }; +export const isIterator = (item: { [key: string]: any }): item is Iterator => { + return isFunction(item.next); +}; -export const isIteratorLike = (item: EventEmitter & { [key: string]: any }): item is IteratorLike => { - return isFunction(item.on) && isFunction(item.read); +export const isIterable = (item: { [key: string]: any }): item is Iterable => { + return Symbol.iterator in item; }; export class WrappingIterator extends AsyncIterator { protected _source?: IteratorLike; - constructor(sourceOrPromise: EventEmitter | PromiseLike, options: WrappingIteratorOptions = {}) { + constructor(sourceOrPromise: WrapSource | Promise>, options: WrapOptions = {}) { super(); - if (options.letIteratorThrough === true && sourceOrPromise instanceof AsyncIterator) - return sourceOrPromise; - if (isPromiseLike(sourceOrPromise)) { + if (isPromise(sourceOrPromise)) { sourceOrPromise .then(source => { - WrappingIterator._wrapSource(source, this); + WrappingIterator._wrapSource(source, this, options); }) .catch(err => { this.emit('error', err); }); - } else { - WrappingIterator._wrapSource(sourceOrPromise, this); + } + else { + WrappingIterator._wrapSource(sourceOrPromise, this, options); } } - protected static _wrapSource(source: EventEmitter, wrapped: WrappingIterator) { - if (!isIteratorLike(source)) { - taskScheduler(() => { - wrapped.emit('error', new Error(`Invalid source: ${source}`)); - }); - return; + protected static _wrapSource(source: WrapSource, iterator: WrappingIterator, options: WrapOptions = {}) { + try { + iterator._source = (isIteratorLike(source) ? source : _wrap(source, options)) + .on('end', () => { + iterator.close(); + }) + .on('readable', () => { + iterator.readable = true; + }); + iterator.readable = true; + } + catch (err) { + scheduleTask(() => iterator.emit('error', err)); } - wrapped._source = source - .on('end', () => { - wrapped.close(); - }) - .on('readable', () => { - wrapped.readable = true; - }); - wrapped.readable = true; } read(): T | null { @@ -2182,7 +2179,7 @@ export class WrappingIterator extends AsyncIterator { } } -class WrapIterator extends AsyncIterator { +export class IteratorIterator extends AsyncIterator { constructor(private source: Iterator) { super(); this.readable = true; @@ -2198,8 +2195,29 @@ class WrapIterator extends AsyncIterator { } } +export interface WrapOptions { + letIteratorThrough?: boolean; +} + +export type WrapSource = T[] | EventEmitter | Iterator | Iterable; + +const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterator => { + if (options.letIteratorThrough && source instanceof AsyncIterator) + return source; + if (Array.isArray(source)) + return new ArrayIterator(source); + if (isIteratorLike(source)) + return new WrappingIterator(source); + if (isIterator(source)) + return new IteratorIterator(source); + if (isIterable(source)) + return new IteratorIterator(source[Symbol.iterator]()); + throw new Error(`Unsupported source ${source}`); +}; + /** - Creates an iterator that wraps around a given iterator or readable stream. + Creates an iterator that wraps around a given array, iterator, iterable or + readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. After this operation, only read the returned iterator instead of the given one. @function @@ -2208,24 +2226,28 @@ class WrapIterator extends AsyncIterator { @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ export function wrap( - sourceOrPromise: EventEmitter | Promise, - options: TransformIteratorOptions & WrappingIteratorOptions = {}, + sourceOrPromise: WrapSource | Promise>, + options: TransformIteratorOptions & WrapOptions = {}, ): AsyncIterator { + // For backward compatibility, passing TransformIteratorOptions results in + // an instance of TransformIterator. + // TODO: consider dropping this in the next major version if ('maxBufferSize' in options || 'autoStart' in options || 'optional' in options || 'destroySource' in options) return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); - return new WrappingIterator(sourceOrPromise, options); -} - -/** - Creates an iterator that wraps around a given synchronous iterator. - Use this to convert an iterator-like object into a full-featured AsyncIterator. - After this operation, only read the returned iterator instead of the given one. - @function - @param {Iterator} [source] The source this iterator generates items from - @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator -*/ -export function wrapIterator(source: Iterator) { - return new WrapIterator(source); + // If the source is promisified, we *need* to use a WrappingIterator as this + // function is a synchronous one. + if (isPromise(sourceOrPromise)) + return new WrappingIterator(sourceOrPromise, options); + // The _wrap function synchronously return an iterator or throws on + // unsupported sources. However, for backward-compatiblity we need + // to relay errors as events of an AsyncIterator instance. + // TODO: consider dropping this in the next major version + try { + return _wrap(sourceOrPromise as WrapSource, options); + } + catch (err) { + return new WrappingIterator(sourceOrPromise); + } } /** diff --git a/test/WrapIterator-test.js b/test/IteratorIterator-test.js similarity index 64% rename from test/WrapIterator-test.js rename to test/IteratorIterator-test.js index a7adec1..868fc67 100644 --- a/test/WrapIterator-test.js +++ b/test/IteratorIterator-test.js @@ -1,10 +1,10 @@ import { - wrapIterator, + IteratorIterator, } from '../dist/asynciterator.js'; -describe('wrapIterator', () => { +describe('IteratorIterator', () => { it('Should wrap correctly', async () => { - (await wrapIterator((function * () { + (await new IteratorIterator((function * () { yield 1; yield 2; yield 3; From 3abf6b76b70df659b6a1e4de339d158bca183c6b Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 07:10:47 +0200 Subject: [PATCH 18/19] adds dedicated from* functions for ES2015 Iterator and Iterable, IteratorLike, adds option to prioritize ES2015 Iterable and Iterator while wrapping --- asynciterator.ts | 48 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 7b0b11c..5316d99 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2196,6 +2196,7 @@ export class IteratorIterator extends AsyncIterator { } export interface WrapOptions { + prioritizeIterable?: boolean; letIteratorThrough?: boolean; } @@ -2204,14 +2205,22 @@ export type WrapSource = T[] | EventEmitter | Iterator | Iterable; const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterator => { if (options.letIteratorThrough && source instanceof AsyncIterator) return source; + if (options.prioritizeIterable) { + if (isIterator(source)) + return fromIterator(source); + if (isIterable(source)) + return fromIterable(source); + } if (Array.isArray(source)) - return new ArrayIterator(source); + return fromArray(source); if (isIteratorLike(source)) - return new WrappingIterator(source); - if (isIterator(source)) - return new IteratorIterator(source); - if (isIterable(source)) - return new IteratorIterator(source[Symbol.iterator]()); + return fromIteratorLike(source); + if (!options.prioritizeIterable) { + if (isIterator(source)) + return fromIterator(source); + if (isIterable(source)) + return fromIterable(source); + } throw new Error(`Unsupported source ${source}`); }; @@ -2269,10 +2278,35 @@ export function single(item: T) { Creates an iterator for the given array. @param {Array} items the items */ -export function fromArray(items: Iterable) { +export function fromArray(items: Iterable): AsyncIterator { return new ArrayIterator(items); } +/** + Creates an iterator for the given ES2015 Iterable. + @param {Iterable} iterable the iterable + */ +export function fromIterable(iterable: Iterable): AsyncIterator { + return new IteratorIterator(iterable[Symbol.iterator]()); +} + +/** + Creates an iterator for the given ES2015 Iterator. + @param {Iterable} iterator the iterator + */ +export function fromIterator(iterator: Iterator): AsyncIterator { + return new IteratorIterator(iterator); +} + +/** + * Creates an iterator for the given iterator-like object + * (AsyncIterator, stream.Readable, ...). + * @param {IteratorLike} iterator + */ +export function fromIteratorLike(iterator: IteratorLike): AsyncIterator { + return new WrappingIterator(iterator); +} + /** Creates an iterator containing all items from the given iterators. @param {Array} items the items From 10308913ee54c9ef6fec220dd5b7dcac80cd2653 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 07:27:58 +0200 Subject: [PATCH 19/19] renames IteratorLike to AsyncIteratorLike to highlight difference from ES2015 Iterator --- asynciterator.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 5316d99..e938963 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2121,11 +2121,11 @@ class HistoryReader { } } -export type IteratorLike = EventEmitter & { +export type AsyncIteratorLike = EventEmitter & { on: (event: string | symbol, listener: (...args: any[]) => void) => AsyncIterator; read: () => T | null }; /* eslint-disable arrow-body-style */ -export const isIteratorLike = (item: { [key: string]: any }): item is IteratorLike => { +export const isAsyncIteratorLike = (item: { [key: string]: any }): item is AsyncIteratorLike => { return isFunction(item.on) && isFunction(item.read); }; @@ -2138,7 +2138,7 @@ export const isIterable = (item: { [key: string]: any }): item is Iterable }; export class WrappingIterator extends AsyncIterator { - protected _source?: IteratorLike; + protected _source?: AsyncIteratorLike; constructor(sourceOrPromise: WrapSource | Promise>, options: WrapOptions = {}) { super(); @@ -2158,7 +2158,7 @@ export class WrappingIterator extends AsyncIterator { protected static _wrapSource(source: WrapSource, iterator: WrappingIterator, options: WrapOptions = {}) { try { - iterator._source = (isIteratorLike(source) ? source : _wrap(source, options)) + iterator._source = (isAsyncIteratorLike(source) ? source : _wrap(source, options)) .on('end', () => { iterator.close(); }) @@ -2213,8 +2213,8 @@ const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterat } if (Array.isArray(source)) return fromArray(source); - if (isIteratorLike(source)) - return fromIteratorLike(source); + if (isAsyncIteratorLike(source)) + return fromAsyncIteratorLike(source); if (!options.prioritizeIterable) { if (isIterator(source)) return fromIterator(source); @@ -2301,9 +2301,9 @@ export function fromIterator(iterator: Iterator): AsyncIterator { /** * Creates an iterator for the given iterator-like object * (AsyncIterator, stream.Readable, ...). - * @param {IteratorLike} iterator + * @param {AsyncIteratorLike} iterator */ -export function fromIteratorLike(iterator: IteratorLike): AsyncIterator { +export function fromAsyncIteratorLike(iterator: AsyncIteratorLike): AsyncIterator { return new WrappingIterator(iterator); }