cordis/concurrent-queue/concurrent-queue.ts
2021-10-30 12:26:41 -05:00

58 lines
1.4 KiB
TypeScript

/**
 * A queued unit of work paired with the settle callbacks of the promise
 * that `ConcurrentQueue.push` handed back to the caller.
 */
interface TaskData<T> {
  /** The work to run; may return a value directly or a promise of one. */
  task: () => Promise<T> | T;
  /** Fulfils the promise returned by `push` with the task's result. */
  resolve: (result: T) => void;
  /**
   * Rejects the promise returned by `push`. The reason is whatever the task
   * threw or rejected with, so it is not assumed to be an `Error`.
   */
  reject: (reason: unknown) => void;
}

/**
 * Runs a limited number of tasks at one time.
 *
 * Tasks are started in the order they were pushed. At most `consecutive`
 * tasks are in flight at once; the rest wait in the queue until a running
 * task settles.
 */
export default class ConcurrentQueue<T> {
  /** Maximum number of tasks allowed to run at the same time. */
  private readonly consecutive: number;
  /** Tasks that have been pushed but not started yet, oldest first. */
  private queue: TaskData<T>[];
  /** Number of tasks that have been started and have not settled yet. */
  private current: number;
  /** Callbacks to invoke the next time the queue is found completely idle. */
  private drainListeners: (() => void)[];

  /**
   * @param consecutive Maximum number of tasks to run at once. If this is
   * less than 1 (or NaN), the start condition is never met and no task is
   * ever started.
   */
  constructor(consecutive: number) {
    this.consecutive = consecutive;
    this.queue = [];
    this.current = 0;
    this.drainListeners = [];
  }

  /**
   * Notifies drain listeners if nothing is running or waiting, then starts
   * queued tasks until the concurrency limit is reached or the queue is empty.
   * Called after every push, every `waitForDrain` call and every task settlement.
   */
  _checkQueue(): void {
    if (this.current === 0 && this.queue.length === 0) {
      for (const drainListener of this.drainListeners) {
        drainListener();
      }
      this.drainListeners = [];
    }

    while (this.current < this.consecutive) {
      // `const` keeps the `undefined` narrowing valid inside the async closure below.
      const entry = this.queue.shift();
      if (entry === undefined) break;

      this.current += 1;
      // Fire and forget: the outcome is reported through entry.resolve / entry.reject.
      void (async () => {
        try {
          entry.resolve(await entry.task());
        } catch (reason) {
          // Forward the thrown value unchanged, whatever its type.
          entry.reject(reason);
        }
        // Free the slot and let the next queued task (or drain listeners) run.
        this.current -= 1;
        this._checkQueue();
      })();
    }
  }

  /**
   * Enqueues a task.
   *
   * @param task Function to run once a slot is free.
   * @returns A promise that can be awaited to get the resolution or rejection
   * of the task's execution.
   */
  push(task: () => Promise<T> | T): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this._checkQueue();
    });
  }

  /**
   * @returns A promise that resolves once no task is running and none is
   * queued. If the queue is already idle when this is called, the promise is
   * resolved during the call.
   */
  waitForDrain(): Promise<void> {
    return new Promise<void>((resolve) => {
      this.drainListeners.push(() => {
        resolve();
      });
      this._checkQueue();
    });
  }
}