job-queue.ts 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { Subject, Subscription } from 'rxjs';
  3. import { throttleTime } from 'rxjs/operators';
  4. import { JobQueueStrategy } from '../config';
  5. import { Logger } from '../config/logger/vendure-logger';
  6. import { Job } from './job';
  7. import { JobBufferService } from './job-buffer/job-buffer.service';
  8. import { SubscribableJob } from './subscribable-job';
  9. import { CreateQueueOptions, JobConfig, JobData } from './types';
  /**
   * @description
   * A JobQueue is used to process {@link Job}s. A job is added to the queue via the
   * `.add()` method, and the configured {@link JobQueueStrategy} will check for new jobs and process each
   * according to the defined `process` function.
   *
   * *Note*: JobQueue instances should not be directly instantiated. Rather, the
   * {@link JobQueueService} `createQueue()` method should be used (see that service
   * for example usage).
   *
   * @docsCategory JobQueue
   */
  export class JobQueue<Data extends JobData<Data> = object> {
      /** Tracks whether `start()` has been called without a subsequent `stop()`. */
      private running = false;

      /** The queue name, taken from the options passed to the constructor. */
      get name(): string {
          return this.options.name;
      }

      /** `true` between a successful `start()` and the next `stop()`. */
      get started(): boolean {
          return this.running;
      }

      constructor(
          private options: CreateQueueOptions<Data>,
          private jobQueueStrategy: JobQueueStrategy,
          private jobBufferService: JobBufferService,
      ) {}

      /**
       * Asks the strategy to start processing this queue with the configured `process` function.
       * Calling it while the queue is already marked as running is a no-op.
       *
       * @throws Re-throws any error raised by the strategy's `start()`; in that case the queue
       * is left in the "not started" state so that a later call can retry.
       * @internal
       */
      async start(): Promise<void> {
          if (this.running) {
              return;
          }
          // The flag is set before awaiting so that overlapping start() calls
          // do not invoke the strategy twice.
          this.running = true;
          try {
              await this.jobQueueStrategy.start<Data>(this.options.name, this.options.process);
          } catch (err: unknown) {
              // The strategy did not start: undo the flag, otherwise `started` would
              // report true and every subsequent start() would return early.
              this.running = false;
              throw err;
          }
      }

      /**
       * Asks the strategy to stop processing this queue. Calling it while the queue is not
       * marked as running is a no-op.
       *
       * @throws Re-throws any error raised by the strategy's `stop()`; in that case the queue
       * is still marked as started so that a later call can retry.
       * @internal
       */
      async stop(): Promise<void> {
          if (!this.running) {
              return;
          }
          // Cleared before awaiting so that overlapping stop() calls do not invoke the strategy twice.
          this.running = false;
          try {
              await this.jobQueueStrategy.stop(this.options.name, this.options.process);
          } catch (err: unknown) {
              // The strategy did not stop: restore the flag so stop() can be attempted again.
              this.running = true;
              throw err;
          }
      }

      /**
       * @description
       * Adds a new {@link Job} to the queue. The resolved {@link SubscribableJob} allows the
       * calling code to subscribe to updates to the Job:
       *
       * @example
       * ```ts
       * const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 2 });
       * return job.updates().pipe(
       *   map(update => {
       *     // The returned Observable will emit a value for every update to the job
       *     // such as when the `progress` or `status` value changes.
       *     Logger.info(`Job ${update.id}: progress: ${update.progress}`);
       *     if (update.state === JobState.COMPLETED) {
       *       Logger.info(`COMPLETED ${update.id}: ${update.result}`);
       *     }
       *     return update.result;
       *   }),
       *   catchError(err => of(err.message)),
       * );
       * ```
       *
       * Alternatively, if you aren't interested in the intermediate
       * `progress` changes, you can convert to a Promise like this:
       *
       * @example
       * ```ts
       * const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 2 });
       * return job.updates().toPromise()
       *   .then(update => update.result)
       *   .catch(err => err.message);
       * ```
       *
       * @param data The payload for the new job.
       * @param options Optional settings; `retries` defaults to `0` when omitted.
       * @returns A {@link SubscribableJob} wrapping one of:
       * - the job returned by the strategy's `add()`,
       * - the original, un-added job if the strategy's `add()` threw (the error is logged, not re-thrown),
       * - a copy of the job with the id `'buffered'` if the job buffer service accepted the job,
       *   in which case it is not passed to the strategy here.
       */
      async add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>): Promise<SubscribableJob<Data>> {
          const job = new Job<any>({
              data,
              queueName: this.options.name,
              retries: options?.retries ?? 0,
          });

          const isBuffered = await this.jobBufferService.add(job);
          if (isBuffered) {
              // The buffer took ownership of the job, so hand back a placeholder copy
              // carrying a fixed id rather than adding it to the strategy.
              const bufferedJob = new Job({
                  ...job,
                  data: job.data,
                  id: 'buffered',
              });
              return new SubscribableJob(bufferedJob, this.jobQueueStrategy);
          }

          try {
              const addedJob = await this.jobQueueStrategy.add(job);
              return new SubscribableJob(addedJob, this.jobQueueStrategy);
          } catch (err: unknown) {
              // Best-effort: a failure to add is logged rather than propagated. The thrown
              // value is narrowed first because it is not guaranteed to be an Error.
              const stack = err instanceof Error ? err.stack : undefined;
              Logger.error(`Could not add Job to "${this.name}" queue`, undefined, stack);
              return new SubscribableJob(job, this.jobQueueStrategy);
          }
      }
  }