58 lines
1.4 KiB
TypeScript
58 lines
1.4 KiB
TypeScript
interface TaskData<T> {
|
|
task: (() => Promise<T> | T);
|
|
resolve: ((result: T) => void);
|
|
reject: ((error: Error) => void);
|
|
}
|
|
|
|
// Runs a limited number of promises at one time
|
|
export default class ConcurrentQueue<T> {
|
|
private consecutive: number;
|
|
private queue: TaskData<T>[];
|
|
private current: number;
|
|
private drainListeners: (() => void)[];
|
|
|
|
constructor(consecutive: number) {
|
|
this.consecutive = consecutive;
|
|
this.queue = [];
|
|
this.current = 0;
|
|
this.drainListeners = [];
|
|
}
|
|
|
|
_checkQueue() {
|
|
if (this.current == 0 && this.queue.length == 0) {
|
|
for (let drainListener of this.drainListeners) {
|
|
drainListener();
|
|
}
|
|
this.drainListeners = [];
|
|
}
|
|
while (this.current < this.consecutive && this.queue.length > 0) {
|
|
let taskData = this.queue.shift(); if (taskData === undefined) break;
|
|
this.current += 1;
|
|
(async () => {
|
|
try {
|
|
taskData.resolve(await taskData.task());
|
|
} catch (e) {
|
|
taskData.reject(e);
|
|
}
|
|
this.current -= 1;
|
|
this._checkQueue();
|
|
})();
|
|
}
|
|
}
|
|
|
|
// returns a promise that can be awaited to get the resolution or rejection of the task's execution
|
|
async push(task: () => Promise<T> | T) {
|
|
return new Promise((resolve, reject) => {
|
|
this.queue.push({ task, resolve, reject })
|
|
this._checkQueue();
|
|
});
|
|
}
|
|
|
|
async waitForDrain(): Promise<void> {
|
|
return new Promise((resolve) => {
|
|
this.drainListeners.push(() => { resolve(); });
|
|
this._checkQueue();
|
|
});
|
|
}
|
|
}
|