job-queue.service.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import { Injectable, OnModuleDestroy } from '@nestjs/common';
  2. import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
  3. import { ConfigService, JobQueueStrategy, Logger } from '../config';
  4. import { loggerCtx } from './constants';
  5. import { Job } from './job';
  6. import { JobBuffer } from './job-buffer/job-buffer';
  7. import { JobBufferService } from './job-buffer/job-buffer.service';
  8. import { JobQueue } from './job-queue';
  9. import { CreateQueueOptions, JobData } from './types';
  /**
   * @description
   * The JobQueueService is used to create new {@link JobQueue} instances, to start the
   * queues it has created, and to manage the {@link JobBuffer}s which collect jobs
   * before they are sent to a queue.
   *
   * @example
   * ```TypeScript
   * // A service which transcodes video files
   * class VideoTranscoderService {
   *
   *   private jobQueue: JobQueue<{ videoId: string; }>;
   *
   *   async onModuleInit() {
   *     // The JobQueue is created on initialization
   *     this.jobQueue = await this.jobQueueService.createQueue({
   *       name: 'transcode-video',
   *       process: async job => {
   *         return await this.transcodeVideo(job.data.videoId);
   *       },
   *     });
   *   }
   *
   *   addToTranscodeQueue(videoId: string) {
   *     this.jobQueue.add({ videoId, })
   *   }
   *
   *   private async transcodeVideo(videoId: string) {
   *     // e.g. call some external transcoding service
   *   }
   *
   * }
   * ```
   *
   * @docsCategory JobQueue
   */
  @Injectable()
  export class JobQueueService implements OnModuleDestroy {
      /** Every queue created through this service, in creation order. */
      private queues: Array<JobQueue<any>> = [];
      /** Set by `start()`, cleared again when the module is destroyed. */
      private hasStarted = false;

      /** The strategy currently configured under `jobQueueOptions`. */
      private get jobQueueStrategy(): JobQueueStrategy {
          const { jobQueueStrategy } = this.configService.jobQueueOptions;
          return jobQueueStrategy;
      }

      constructor(private configService: ConfigService, private jobBufferService: JobBufferService) {}

      /** @internal */
      onModuleDestroy() {
          this.hasStarted = false;
          // Ask every registered queue to stop; the combined promise resolves
          // once all of them have done so.
          const stopping = this.queues.map(queue => queue.stop());
          return Promise.all(stopping);
      }

      /**
       * @description
       * Configures and creates a new {@link JobQueue} instance. If a `prefix` is set in the
       * `jobQueueOptions`, it is prepended to the queue name. When the service has already
       * been started, the new queue is started right away (provided it is an active queue).
       */
      async createQueue<Data extends JobData<Data>>(
          options: CreateQueueOptions<Data>,
      ): Promise<JobQueue<Data>> {
          const { prefix } = this.configService.jobQueueOptions;
          const queueOptions: CreateQueueOptions<Data> = prefix
              ? { ...options, name: `${prefix}${options.name}` }
              : options;
          const queue = new JobQueue(queueOptions, this.jobQueueStrategy, this.jobBufferService);
          const startImmediately = this.hasStarted && this.shouldStartQueue(queue.name);
          if (startImmediately) {
              await queue.start();
          }
          // The queue is only registered after any immediate start has completed.
          this.queues.push(queue);
          return queue;
      }

      /**
       * @description
       * Marks the service as started and then starts, one after another, each registered
       * queue which is not yet running and which is allowed to run by `activeQueues`.
       */
      async start(): Promise<void> {
          this.hasStarted = true;
          for (const queue of this.queues) {
              if (queue.started || !this.shouldStartQueue(queue.name)) {
                  continue;
              }
              Logger.info(`Starting queue: ${queue.name}`, loggerCtx);
              await queue.start();
          }
      }

      /**
       * @description
       * Adds a {@link JobBuffer}, which will make it active and begin collecting
       * jobs to buffer.
       *
       * @since 1.3.0
       */
      addBuffer(buffer: JobBuffer<any>) {
          this.jobBufferService.addBuffer(buffer);
      }

      /**
       * @description
       * Removes a {@link JobBuffer}, preventing it from collecting and buffering any
       * subsequent jobs.
       *
       * @since 1.3.0
       */
      removeBuffer(buffer: JobBuffer<any>) {
          this.jobBufferService.removeBuffer(buffer);
      }

      /**
       * @description
       * Returns an object containing the number of buffered jobs arranged by bufferId. This
       * can be used to decide whether a particular buffer has any jobs to flush.
       *
       * Passing in JobBuffer instances _or_ ids limits the results to the specified JobBuffers.
       * If no argument is passed, sizes will be returned for _all_ JobBuffers.
       *
       * @example
       * ```TypeScript
       * const sizes = await this.jobQueueService.bufferSize('buffer-1', 'buffer-2');
       *
       * // sizes = { 'buffer-1': 12, 'buffer-2': 3 }
       * ```
       *
       * @since 1.3.0
       */
      bufferSize(...forBuffers: Array<JobBuffer<any> | string>): Promise<{ [bufferId: string]: number }> {
          return this.jobBufferService.bufferSize(forBuffers);
      }

      /**
       * @description
       * Flushes the specified buffers, which means that the buffer is cleared and the jobs get
       * sent to the job queue for processing. Before sending the jobs to the job queue,
       * they will be passed through each JobBuffer's `reduce()` method, which can be used
       * to optimize the amount of work to be done by e.g. de-duplicating identical jobs or
       * aggregating data over the collected jobs.
       *
       * Passing in JobBuffer instances _or_ ids limits the action to the specified JobBuffers.
       * If no argument is passed, _all_ JobBuffers will be flushed.
       *
       * Returns an array of all Jobs which were added to the job queue.
       *
       * @since 1.3.0
       */
      flush(...forBuffers: Array<JobBuffer<any> | string>): Promise<Job[]> {
          return this.jobBufferService.flush(forBuffers);
      }

      /**
       * @description
       * Returns an array of `{ name: string; running: boolean; }` for each
       * registered JobQueue.
       */
      getJobQueues(): GraphQlJobQueue[] {
          return this.queues.map(({ name, started }) => ({ name, running: started }));
      }

      /**
       * Decides whether the named queue may be started: an empty `activeQueues` list
       * allows every queue, otherwise the name must appear in the list.
       */
      private shouldStartQueue(queueName: string): boolean {
          const { activeQueues } = this.configService.jobQueueOptions;
          const isRestricted = activeQueues.length > 0;
          return !isRestricted || activeQueues.includes(queueName);
      }
  }