job-queue.ts 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
  3. import { Job } from './job';
  4. import { CreateQueueOptions, JobConfig, JobData } from './types';
  5. /**
  6. * @description
  7. * A JobQueue is used to process {@link Job}s. A job is added to the queue via the
  8. * `.add()` method, and the queue will then poll for new jobs and process each
  9. * according to the defined `process` function.
  10. *
  11. * *Note*: JobQueue instances should not be directly instantiated. Rather, the
  12. * {@link JobQueueService} `createQueue()` method should be used (see that service
  13. * for example usage).
  14. *
  15. * @docsCategory JobQueue
  16. */
  export class JobQueue<Data extends JobData<Data> = {}> {
      /** Jobs handed out by the strategy which have not yet completed or failed. */
      private activeJobs: Array<Job<Data>> = [];
      /** Handle of the pending poll timeout, if one has been scheduled. */
      private timer: ReturnType<typeof setTimeout> | undefined;
      /** True between a call to `start()` and the next `pause()` / `destroy()`. */
      private running = false;

      /** The maximum number of jobs this queue processes at the same time. */
      get concurrency(): number {
          return this.options.concurrency;
      }

      /** The name of this queue, as passed in the creation options. */
      get name(): string {
          return this.options.name;
      }

      /** Whether the queue is currently polling for jobs. */
      get started(): boolean {
          return this.running;
      }

      /**
       * @param options The queue options: name, concurrency and the `process` function.
       * @param jobQueueStrategy The strategy used to fetch, persist and add jobs.
       * @param pollInterval Delay in milliseconds between two polls for new jobs.
       */
      constructor(
          private options: CreateQueueOptions<Data>,
          private jobQueueStrategy: JobQueueStrategy,
          private pollInterval: number,
      ) {}

      /**
       * Starts polling the strategy for jobs. Calling it on an already-started
       * queue is a no-op.
       *
       * @internal
       */
      start() {
          if (this.running) {
              return;
          }
          this.running = true;
          const concurrency = this.options.concurrency;

          const runNextJobs = async () => {
              try {
                  // Only fill the slots which are not already taken by active jobs.
                  const runningJobsCount = this.activeJobs.length;
                  for (let i = runningJobsCount; i < concurrency; i++) {
                      const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(
                          this.options.name,
                      );
                      if (nextJob) {
                          await this.launchJob(nextJob);
                      }
                  }
              } finally {
                  // Re-arm the timer even when the strategy threw, otherwise a single
                  // transient error would silently stop the queue while `started`
                  // still reports true. The error itself keeps propagating.
                  if (this.running) {
                      this.timer = setTimeout(runNextJobs, this.pollInterval);
                  }
              }
          };
          runNextJobs();
      }

      /**
       * Stops polling for new jobs. Jobs which are already active are left alone.
       *
       * @internal
       */
      pause() {
          this.running = false;
          clearTimeout(this.timer);
      }

      /**
       * Stops polling and waits up to 2 seconds for the active jobs to finish.
       * Any job still active after that is deferred and persisted via the strategy.
       * The returned promise rejects if persisting a deferred job fails.
       *
       * @internal
       */
      async destroy(): Promise<void> {
          this.running = false;
          clearTimeout(this.timer);
          const start = Date.now();
          // Wait for 2 seconds to allow running jobs to complete
          const maxTimeout = 2000;
          const pollDelay = 50;
          while (this.activeJobs.length > 0 && Date.now() - start <= maxTimeout) {
              await new Promise<void>(resolve => setTimeout(resolve, pollDelay));
          }
          // If there are any incomplete jobs after the 2 second wait period,
          // set them back to "pending" so they can be re-run on next bootstrap.
          for (const job of this.activeJobs) {
              job.defer();
              await this.jobQueueStrategy.update(job);
          }
      }

      /**
       * @description
       * Adds a new {@link Job} to the queue.
       *
       * @param data The payload of the job.
       * @param options Optional settings; `retries` defaults to 0.
       * @returns Whatever the strategy's `add()` method returns for the new job.
       */
      add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>) {
          const job = new Job<any>({
              data,
              queueName: this.options.name,
              retries: options?.retries ?? 0,
          });
          return this.jobQueueStrategy.add(job);
      }

      /**
       * Marks the job as active, persists it, wires up its event handlers and
       * invokes the configured `process` function on it.
       */
      private async launchJob(job: Job<Data>): Promise<void> {
          this.activeJobs.push(job);
          try {
              await this.jobQueueStrategy.update(job);
          } catch (err) {
              // The job was never handed to `process`, so nothing will ever emit
              // 'complete' or 'fail' for it: release the slot instead of leaking it.
              this.removeJobFromActive(job);
              throw err;
          }
          job.on('complete', finished => this.onFailOrComplete(finished));
          job.on('progress', updated => this.jobQueueStrategy.update(updated));
          job.on('fail', failed => this.onFailOrComplete(failed));
          try {
              const returnVal = this.options.process(job);
              if (returnVal instanceof Promise) {
                  // Asynchronous failures are reported on the job itself.
                  returnVal.catch(err => job.fail(err));
              }
          } catch (err) {
              // Synchronous failures are reported the same way.
              job.fail(err);
          }
      }

      /**
       * Persists the final state of a job and releases its concurrency slot.
       * The slot is released even when persisting fails.
       */
      private async onFailOrComplete(job: Job<Data>) {
          try {
              await this.jobQueueStrategy.update(job);
          } finally {
              this.removeJobFromActive(job);
          }
      }

      /**
       * Removes the job from the list of active jobs. Does nothing if the job
       * is not (or no longer) in the list.
       */
      private removeJobFromActive(job: Job<Data>) {
          const index = this.activeJobs.indexOf(job);
          // `splice(-1, 1)` would drop the LAST element, i.e. an unrelated job.
          if (index !== -1) {
              this.activeJobs.splice(index, 1);
          }
      }
  }