job-queue.ts 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
  3. import { Job } from './job';
  4. import { CreateQueueOptions, JobConfig, JobData } from './types';
  /**
   * @description
   * A JobQueue is used to process {@link Job}s. A job is added to the queue via the
   * `.add()` method, and the queue will then poll for new jobs and process each
   * according to the `process` function supplied in the queue options.
   *
   * @docsCategory JobQueue
   */
  export class JobQueue<Data extends JobData<Data> = {}> {
    /** Jobs handed to `process` that have not yet been removed after a `complete` or `fail` event. */
    private activeJobs: Array<Job<Data>> = [];
    /** Handle of the pending poll timer, if one is currently scheduled. */
    private timer: ReturnType<typeof setTimeout> | undefined;
    /** Incremented on every `start()` so that a stale, still in-flight poll loop can tell it was superseded. */
    private pollGeneration = 0;
    /** True between `start()` and the next `pause()` / `destroy()`. */
    private running = false;

    /** The maximum number of jobs this queue processes at the same time. */
    get concurrency(): number {
      return this.options.concurrency;
    }

    /** The name of this queue, as passed to the JobQueueStrategy. */
    get name(): string {
      return this.options.name;
    }

    /** Whether the queue is currently polling for jobs. */
    get started(): boolean {
      return this.running;
    }

    constructor(
      private options: CreateQueueOptions<Data>,
      private jobQueueStrategy: JobQueueStrategy,
      private pollInterval: number,
    ) {}

    /**
     * Starts polling the strategy for jobs every `pollInterval` ms, keeping at most
     * `concurrency` jobs active. Calling it on an already-started queue is a no-op.
     *
     * @internal
     */
    start() {
      if (this.running) {
        return;
      }
      this.running = true;
      const generation = ++this.pollGeneration;
      const concurrency = this.options.concurrency;
      // The loop is async, so pause()/destroy() (or a pause() followed by a new start())
      // can happen while it is suspended at an `await`. Clearing the timer cannot stop a
      // loop that is already executing, so the loop itself re-checks this predicate.
      const isCurrent = () => this.running && this.pollGeneration === generation;

      const runNextJobs = async () => {
        if (!isCurrent()) {
          return;
        }
        const runningJobsCount = this.activeJobs.length;
        for (let i = runningJobsCount; i < concurrency; i++) {
          if (!isCurrent()) {
            // Stopped while awaiting: do not pull any further jobs from the strategy.
            break;
          }
          const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(this.options.name);
          if (nextJob) {
            // A job that was already handed out is always processed, even if the queue
            // was paused in the meantime; otherwise it would never be run.
            this.activeJobs.push(nextJob);
            nextJob.on('complete', (job) => this.onFailOrComplete(job));
            nextJob.on('fail', (job) => this.onFailOrComplete(job));
            try {
              const returnVal = this.options.process(nextJob);
              if (returnVal instanceof Promise) {
                // Asynchronous failures of the process function fail the job.
                returnVal.catch((err) => nextJob.fail(err));
              }
            } catch (err) {
              // Synchronous failures of the process function fail the job.
              nextJob.fail(err);
            }
            await this.jobQueueStrategy.update(nextJob);
          }
        }
        if (isCurrent()) {
          this.timer = setTimeout(runNextJobs, this.pollInterval);
        }
      };

      void runNextJobs();
    }

    /**
     * Stops polling for new jobs. Jobs that are already active are left to finish.
     *
     * @internal
     */
    pause() {
      this.running = false;
      clearTimeout(this.timer);
    }

    /**
     * Stops polling and resolves once there are no active jobs left, or after roughly
     * 5 seconds, whichever comes first.
     *
     * @internal
     */
    async destroy(): Promise<void> {
      this.running = false;
      clearTimeout(this.timer);
      const start = Date.now();
      const maxTimeout = 5000;
      return new Promise<void>((resolve) => {
        const pollActiveJobs = () => {
          const timedOut = Date.now() - start > maxTimeout;
          if (this.activeJobs.length === 0 || timedOut) {
            resolve();
          } else {
            setTimeout(pollActiveJobs, 50);
          }
        };
        pollActiveJobs();
      });
    }

    /**
     * @description
     * Adds a new {@link Job} to the queue.
     *
     * @param data The payload of the new job.
     * @param options Optional job settings; `retries` defaults to 0 when omitted.
     * @returns Whatever the JobQueueStrategy's `add()` returns for the new job.
     */
    add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>) {
      const job = new Job<any>({
        data,
        queueName: this.options.name,
        retries: options?.retries ?? 0,
      });
      return this.jobQueueStrategy.add(job);
    }

    /**
     * Persists the final state of a job via the strategy and stops tracking it as active.
     */
    private async onFailOrComplete(job: Job<Data>) {
      try {
        await this.jobQueueStrategy.update(job);
      } finally {
        // Free the concurrency slot even if persisting the job state failed.
        this.removeJobFromActive(job);
      }
    }

    /**
     * Removes the given job from the list of active jobs. Does nothing if the job is
     * not currently tracked.
     */
    private removeJobFromActive(job: Job<Data>) {
      const index = this.activeJobs.indexOf(job);
      // indexOf() yields -1 for an untracked job, and splice(-1, 1) would then drop the
      // last (unrelated) active job.
      if (index !== -1) {
        this.activeJobs.splice(index, 1);
      }
    }
  }