49 lines
1.1 KiB
JavaScript
49 lines
1.1 KiB
JavaScript
|
// Runs queued promise-returning tasks, with only a limited number in flight at one time
|
||
|
/**
 * Task queue that keeps at most `consecutive` tasks in flight at once.
 *
 * A task is a function that is called with no arguments; its return value is
 * awaited, so it normally returns a promise. Tasks are started in the order
 * they were pushed, as soon as a slot is free.
 */
class ConcurrentQueue {
  /**
   * @param {number} consecutive - Maximum number of tasks allowed to run at
   *   the same time. Must compare greater than 0 (`Infinity` is allowed).
   * @throws {RangeError} If the limit is not greater than 0.
   */
  constructor(consecutive) {
    // With a limit that is not > 0 (0, negative, NaN, undefined) the
    // `current < consecutive` test below is never true, so no task would ever
    // start and every push() promise would stay pending forever. Fail loudly.
    if (!(Number(consecutive) > 0)) {
      throw new RangeError(
        `ConcurrentQueue limit must be greater than 0, got ${String(consecutive)}`,
      );
    }
    this.consecutive = consecutive;
    this.queue = []; // waiting entries: { task, resolve, reject }
    this.current = 0; // number of tasks currently running
    this.drainListeners = []; // resolvers of pending waitForDrain() promises
  }

  /**
   * Notifies drain listeners when the queue is idle, then starts waiting
   * tasks until the concurrency limit is reached or the queue is empty.
   */
  _checkQueue() {
    if (this.current === 0 && this.queue.length === 0) {
      // Detach the list before notifying so a listener registered while we
      // are notifying is neither lost nor called twice.
      const listeners = this.drainListeners;
      this.drainListeners = [];
      for (const notify of listeners) {
        notify();
      }
    }
    while (this.current < this.consecutive && this.queue.length > 0) {
      const taskData = this.queue.shift();
      this.current += 1;
      // Intentionally not awaited: _runTask reports the outcome through
      // taskData.resolve / taskData.reject instead of its own promise.
      void this._runTask(taskData);
    }
  }

  /**
   * Runs one queued entry, settles the promise handed out by push(), then
   * frees the slot and re-checks the queue.
   *
   * @param {{task: Function, resolve: Function, reject: Function}} taskData
   * @returns {Promise<void>}
   */
  async _runTask({ task, resolve, reject }) {
    try {
      let outcome;
      try {
        // The task is still started synchronously, as callers may rely on.
        outcome = task();
      } catch (syncError) {
        // Turn a synchronous throw into a rejection so that settling always
        // happens after the `await` below. Otherwise this method would call
        // _checkQueue() re-entrantly from inside its own while-loop, and a
        // backlog of throwing tasks would recurse once per task.
        outcome = Promise.reject(syncError);
      }
      resolve(await outcome);
    } catch (error) {
      reject(error);
    } finally {
      this.current -= 1;
      this._checkQueue();
    }
  }

  /**
   * Adds a task to the queue; it starts immediately if a slot is free.
   *
   * @param {Function} task - Called with no arguments; its result is awaited.
   * @returns {Promise<*>} Resolves with the task's result, or rejects with
   *   whatever the task threw or rejected with.
   */
  push(task) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this._checkQueue();
    });
  }

  /**
   * @returns {Promise<void>} Resolves once no task is running and none is
   *   waiting. If the queue is already idle, it resolves right away.
   */
  waitForDrain() {
    return new Promise((resolve) => {
      this.drainListeners.push(resolve);
      this._checkQueue();
    });
  }
}
|
||
|
|
||
|
// CommonJS export: the module's value is the ConcurrentQueue class itself.
module.exports = ConcurrentQueue;
|