job-queue.service.ts 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import { Injectable, OnModuleDestroy } from '@nestjs/common';
  2. import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types';
  3. import { Instrument } from '../common';
  4. import { ConfigService, JobQueueStrategy, Logger } from '../config';
  5. import { loggerCtx } from './constants';
  6. import { Job } from './job';
  7. import { JobBuffer } from './job-buffer/job-buffer';
  8. import { JobBufferService } from './job-buffer/job-buffer.service';
  9. import { JobQueue } from './job-queue';
  10. import { CreateQueueOptions, JobData } from './types';
  11. /**
  12. * @description
  13. * The JobQueueService is used to create new {@link JobQueue} instances and access
  14. * existing jobs.
  15. *
  16. * @example
  17. * ```ts
  18. * // A service which transcodes video files
  19. * class VideoTranscoderService {
  20. *
  21. * private jobQueue: JobQueue<{ videoId: string; }>;
  22. *
  23. * async onModuleInit() {
  24. * // The JobQueue is created on initialization
  25. * this.jobQueue = await this.jobQueueService.createQueue({
  26. * name: 'transcode-video',
  27. * process: async job => {
  28. * return await this.transcodeVideo(job.data.videoId);
  29. * },
  30. * });
  31. * }
  32. *
  33. * addToTranscodeQueue(videoId: string) {
  34. * this.jobQueue.add({ videoId, })
  35. * }
  36. *
  37. * private async transcodeVideo(videoId: string) {
  38. * // e.g. call some external transcoding service
  39. * }
  40. *
  41. * }
  42. * ```
  43. *
  44. * @docsCategory JobQueue
  45. */
  46. @Injectable()
  47. @Instrument()
  48. export class JobQueueService implements OnModuleDestroy {
  49. private queues: Array<JobQueue<any>> = [];
  50. private hasStarted = false;
  51. private get jobQueueStrategy(): JobQueueStrategy {
  52. return this.configService.jobQueueOptions.jobQueueStrategy;
  53. }
  54. constructor(
  55. private configService: ConfigService,
  56. private jobBufferService: JobBufferService,
  57. ) {}
  58. /** @internal */
  59. onModuleDestroy() {
  60. this.hasStarted = false;
  61. return Promise.all(this.queues.map(q => q.stop()));
  62. }
  63. /**
  64. * @description
  65. * Configures and creates a new {@link JobQueue} instance.
  66. */
  67. async createQueue<Data extends JobData<Data>>(
  68. options: CreateQueueOptions<Data>,
  69. ): Promise<JobQueue<Data>> {
  70. if (this.configService.jobQueueOptions.prefix) {
  71. options = { ...options, name: `${this.configService.jobQueueOptions.prefix}${options.name}` };
  72. }
  73. const wrappedProcessFn = this.createWrappedProcessFn(options.process);
  74. options = { ...options, process: wrappedProcessFn };
  75. const queue = new JobQueue(options, this.jobQueueStrategy, this.jobBufferService);
  76. if (this.hasStarted && this.shouldStartQueue(queue.name)) {
  77. await queue.start();
  78. }
  79. this.queues.push(queue);
  80. return queue;
  81. }
  82. async start(): Promise<void> {
  83. this.hasStarted = true;
  84. for (const queue of this.queues) {
  85. if (!queue.started && this.shouldStartQueue(queue.name)) {
  86. Logger.info(`Starting queue: ${queue.name}`, loggerCtx);
  87. await queue.start();
  88. }
  89. }
  90. }
  91. /**
  92. * @description
  93. * Adds a {@link JobBuffer}, which will make it active and begin collecting
  94. * jobs to buffer.
  95. *
  96. * @since 1.3.0
  97. */
  98. addBuffer(buffer: JobBuffer<any>) {
  99. this.jobBufferService.addBuffer(buffer);
  100. }
  101. /**
  102. * @description
  103. * Removes a {@link JobBuffer}, prevent it from collecting and buffering any
  104. * subsequent jobs.
  105. *
  106. * @since 1.3.0
  107. */
  108. removeBuffer(buffer: JobBuffer<any>) {
  109. this.jobBufferService.removeBuffer(buffer);
  110. }
  111. /**
  112. * @description
  113. * Returns an object containing the number of buffered jobs arranged by bufferId. This
  114. * can be used to decide whether a particular buffer has any jobs to flush.
  115. *
  116. * Passing in JobBuffer instances _or_ ids limits the results to the specified JobBuffers.
  117. * If no argument is passed, sizes will be returned for _all_ JobBuffers.
  118. *
  119. * @example
  120. * ```ts
  121. * const sizes = await this.jobQueueService.bufferSize('buffer-1', 'buffer-2');
  122. *
  123. * // sizes = { 'buffer-1': 12, 'buffer-2': 3 }
  124. * ```
  125. *
  126. * @since 1.3.0
  127. */
  128. bufferSize(...forBuffers: Array<JobBuffer<any> | string>): Promise<{ [bufferId: string]: number }> {
  129. return this.jobBufferService.bufferSize(forBuffers);
  130. }
  131. /**
  132. * @description
  133. * Flushes the specified buffers, which means that the buffer is cleared and the jobs get
  134. * sent to the job queue for processing. Before sending the jobs to the job queue,
  135. * they will be passed through each JobBuffer's `reduce()` method, which is can be used
  136. * to optimize the amount of work to be done by e.g. de-duplicating identical jobs or
  137. * aggregating data over the collected jobs.
  138. *
  139. * Passing in JobBuffer instances _or_ ids limits the action to the specified JobBuffers.
  140. * If no argument is passed, _all_ JobBuffers will be flushed.
  141. *
  142. * Returns an array of all Jobs which were added to the job queue.
  143. *
  144. * @since 1.3.0
  145. */
  146. flush(...forBuffers: Array<JobBuffer<any> | string>): Promise<Job[]> {
  147. return this.jobBufferService.flush(forBuffers);
  148. }
  149. /**
  150. * @description
  151. * Returns an array of `{ name: string; running: boolean; }` for each
  152. * registered JobQueue.
  153. */
  154. getJobQueues(): GraphQlJobQueue[] {
  155. return this.queues.map(queue => ({
  156. name: queue.name,
  157. running: queue.started,
  158. }));
  159. }
  160. /**
  161. * We wrap the process function in order to catch any errors thrown and pass them to
  162. * any configured ErrorHandlerStrategies.
  163. */
  164. private createWrappedProcessFn<Data extends JobData<Data>>(
  165. processFn: (job: Job<Data>) => Promise<any>,
  166. ): (job: Job<Data>) => Promise<any> {
  167. const { errorHandlers } = this.configService.systemOptions;
  168. return async (job: Job<Data>) => {
  169. try {
  170. return await processFn(job);
  171. } catch (e) {
  172. for (const handler of errorHandlers) {
  173. if (e instanceof Error) {
  174. void handler.handleWorkerError(e, { job });
  175. }
  176. }
  177. throw e;
  178. }
  179. };
  180. }
  181. private shouldStartQueue(queueName: string): boolean {
  182. if (this.configService.jobQueueOptions.activeQueues.length > 0) {
  183. if (!this.configService.jobQueueOptions.activeQueues.includes(queueName)) {
  184. return false;
  185. }
  186. }
  187. return true;
  188. }
  189. }