job-queue.ts 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import { Instrument } from '../common';
  2. import { JobQueueStrategy } from '../config';
  3. import { Job } from './job';
  4. import { JobBufferService } from './job-buffer/job-buffer.service';
  5. import { SubscribableJob } from './subscribable-job';
  6. import { CreateQueueOptions, JobData, JobOptions } from './types';
  7. /**
  8. * @description
  9. * A JobQueue is used to process {@link Job}s. A job is added to the queue via the
  10. * `.add()` method, and the configured {@link JobQueueStrategy} will check for new jobs and process each
  11. * according to the defined `process` function.
  12. *
  13. * *Note*: JobQueue instances should not be directly instantiated. Rather, the
  14. * {@link JobQueueService} `createQueue()` method should be used (see that service
  15. * for example usage).
  16. *
  17. * @docsCategory JobQueue
  18. */
  19. @Instrument()
  20. export class JobQueue<Data extends JobData<Data> = object> {
  21. private running = false;
  22. get name(): string {
  23. return this.options.name;
  24. }
  25. get started(): boolean {
  26. return this.running;
  27. }
  28. constructor(
  29. private options: CreateQueueOptions<Data>,
  30. private jobQueueStrategy: JobQueueStrategy,
  31. private jobBufferService: JobBufferService,
  32. ) {}
  33. /** @internal */
  34. async start() {
  35. if (this.running) {
  36. return;
  37. }
  38. this.running = true;
  39. await this.jobQueueStrategy.start<Data>(this.options.name, this.options.process);
  40. }
  41. /** @internal */
  42. async stop(): Promise<void> {
  43. if (!this.running) {
  44. return;
  45. }
  46. this.running = false;
  47. return this.jobQueueStrategy.stop(this.options.name, this.options.process);
  48. }
  49. /**
  50. * @description
  51. * Adds a new {@link Job} to the queue. The resolved {@link SubscribableJob} allows the
  52. * calling code to subscribe to updates to the Job:
  53. *
  54. * @example
  55. * ```ts
  56. * const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 2 });
  57. * return job.updates().pipe(
  58. * map(update => {
  59. * // The returned Observable will emit a value for every update to the job
  60. * // such as when the `progress` or `status` value changes.
  61. * Logger.info(`Job ${update.id}: progress: ${update.progress}`);
  62. * if (update.state === JobState.COMPLETED) {
  63. * Logger.info(`COMPLETED ${update.id}: ${update.result}`);
  64. * }
  65. * return update.result;
  66. * }),
  67. * catchError(err => of(err.message)),
  68. * );
  69. * ```
  70. *
  71. * Alternatively, if you aren't interested in the intermediate
  72. * `progress` changes, you can convert to a Promise like this:
  73. *
  74. * @example
  75. * ```ts
  76. * const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 2 });
  77. * return job.updates().toPromise()
  78. * .then(update => update.result),
  79. * .catch(err => err.message);
  80. * ```
  81. */
  82. async add(data: Data, options?: JobOptions<Data>): Promise<SubscribableJob<Data>> {
  83. const job = new Job<any>({
  84. data,
  85. queueName: this.options.name,
  86. retries: options?.retries ?? 0,
  87. });
  88. const isBuffered = await this.jobBufferService.add(job);
  89. if (!isBuffered) {
  90. const addedJob = await this.jobQueueStrategy.add(job, options);
  91. return new SubscribableJob(addedJob, this.jobQueueStrategy);
  92. } else {
  93. const bufferedJob = new Job({
  94. ...job,
  95. data: job.data,
  96. id: 'buffered',
  97. });
  98. return new SubscribableJob(bufferedJob, this.jobQueueStrategy);
  99. }
  100. }
  101. }