// async-queue.ts
  /**
   * A unit of work accepted by the queue: a zero-argument function that yields
   * either a value or a promise of a value.
   */
  export type Task<T = any> = () => T | Promise<T>;

  /** Callback invoked with the value a task produced. */
  export type Resolve<T> = (value: T) => void;

  /** Callback invoked with whatever a task threw or rejected with. */
  export type Reject<T> = (reason: T) => void;

  /**
   * A task waiting for a free slot, stored together with the callbacks that
   * settle the promise handed back to whoever submitted it.
   */
  interface TaskQueueItem {
    task: Task;
    resolve: Resolve<any>;
    reject: Reject<any>;
  }
  /**
   * @description
   * A queue class for limiting concurrent async tasks. This can be used e.g. to prevent
   * race conditions when working on a shared resource such as writing to a database.
   *
   * Queue state is static and keyed by label: every instance created with the same label
   * shares one list of pending tasks and one running-task counter. Each instance compares
   * that shared counter against its own `concurrency` value when deciding whether to start
   * a task immediately.
   *
   * @docsCategory common
   */
  export class AsyncQueue {
    // Maps rather than plain objects: a label such as 'constructor' or 'toString' must not
    // resolve to a member inherited from Object.prototype.
    private static readonly running = new Map<string, number>();
    private static readonly taskQueue = new Map<string, TaskQueueItem[]>();

    /**
     * @param label Identifies the shared queue this instance operates on.
     * @param concurrency Maximum number of tasks this instance lets run at once. A task is
     * only started while the running count is strictly below this value, so a value below 1
     * means pushed tasks are queued and never started by this instance.
     */
    constructor(private label: string = 'default', private concurrency: number = 1) {
      if (!AsyncQueue.taskQueue.has(label)) {
        AsyncQueue.taskQueue.set(label, []);
        AsyncQueue.running.set(label, 0);
      }
    }

    /** Number of tasks currently executing under this instance's label. */
    private get running(): number {
      return AsyncQueue.running.get(this.label) ?? 0;
    }

    /** Marks one more task as running for this label. */
    private inc(): void {
      AsyncQueue.running.set(this.label, this.running + 1);
    }

    /** Marks one task as finished for this label. */
    private dec(): void {
      AsyncQueue.running.set(this.label, this.running - 1);
    }

    /**
     * @description
     * Pushes a new task onto the queue, upon which the task will either execute immediately or
     * (if the number of running tasks is equal to the concurrency limit) enqueue the task to
     * be executed at the soonest opportunity.
     *
     * @returns A promise that resolves with the task's result, or rejects with whatever the
     * task threw or rejected with.
     */
    push<T>(task: Task<T>): Promise<T> {
      return new Promise<T>((resolve, reject) => {
        if (this.running < this.concurrency) {
          void this.runTask(task, resolve, reject);
        } else {
          this.enqueueTask(task, resolve, reject);
        }
      });
    }

    /**
     * Executes a task, settles the caller's promise through `resolve` / `reject`, releases
     * the slot and then starts the next pending task, if any.
     */
    private async runTask<T>(task: Task<T>, resolve: Resolve<T>, reject: Reject<T>): Promise<void> {
      this.inc();
      try {
        resolve(await task());
      } catch (e: any) {
        // Synchronous throws and asynchronous rejections both end up here.
        reject(e);
      } finally {
        this.dec();
      }
      const nextTask = this.getQueue().shift();
      if (nextTask) {
        // Deliberately not awaited: awaiting would keep this call (and everything it
        // captured) alive until the whole queue drains, which on a permanently busy
        // queue grows without bound.
        void this.runTask(nextTask.task, nextTask.resolve, nextTask.reject);
      }
    }

    /** Appends a task and its settle callbacks to the pending list for this label. */
    private enqueueTask<T>(task: Task<T>, resolve: Resolve<T>, reject: Reject<T>): void {
      this.getQueue().push({ task, resolve, reject });
    }

    /** Returns the pending-task list for this label, creating it if it is missing. */
    private getQueue(): TaskQueueItem[] {
      let queue = AsyncQueue.taskQueue.get(this.label);
      if (!queue) {
        queue = [];
        AsyncQueue.taskQueue.set(this.label, queue);
      }
      return queue;
    }
  }