job-buffer.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import { Job } from '../job';
  2. import { JobData } from '../types';
  3. /**
  4. * @description
  5. * A JobBuffer is used to temporarily prevent jobs from being sent to the job queue for processing.
  6. * Instead, it collects certain jobs (as specified by the `collect()` method), and stores them.
  7. *
  8. * How these buffered jobs are stored is determined by the configured {@link JobBufferStorageStrategy}.
  9. *
  10. * The JobBuffer can be thought of as a kind of "interceptor" of jobs. That is, when a JobBuffer is active,
  11. * it sits in between calls to `JobQueue.add()` and the actual adding of the job to the queue.
  12. *
  13. * At some later point, the buffer can be flushed (by calling `JobQueueService.flush()`), at which point all the jobs
  14. * that were collected into the buffer will be removed from the buffer and passed to the `JobBuffer.reduce()` method.
  15. * This method is able to perform additional logic to e.g. aggregate many jobs into a single job in order to de-duplicate
  16. * work.
  17. *
  18. * @example
  19. * ```ts
  20. * // This is a buffer which will collect all the
  21. * // 'apply-collection-filters' jobs and buffer them.
  22. * export class CollectionJobBuffer implements JobBuffer<ApplyCollectionFiltersJobData> {
  23. * readonly id = 'apply-collection-filters-buffer';
  24. *
  25. * collect(job: Job): boolean {
  26. * return job.queueName === 'apply-collection-filters';
  27. * }
  28. *
  29. *
  30. * // When the buffer gets flushed, this function will be passed all the collected jobs
  31. * // and will reduce them down to a single job that has aggregated all of the collectionIds.
  32. * reduce(collectedJobs: Array<Job<ApplyCollectionFiltersJobData>>): Array<Job<any>> {
  33. * // Concatenate all the collectionIds from all the events that were buffered
  34. * const collectionIdsToUpdate = collectedJobs.reduce((result, job) => {
  35. * return [...result, ...job.data.collectionIds];
  36. * }, [] as ID[]);
  37. *
  38. * const referenceJob = collectedJobs[0];
  39. *
  40. * // Create a new Job containing all the concatenated collectionIds,
  41. * // de-duplicated to include each collectionId only once.
  42. * const batchedCollectionJob = new Job<ApplyCollectionFiltersJobData>({
  43. * ...referenceJob,
  44. * id: undefined,
  45. * data: {
  46. * collectionIds: unique(collectionIdsToUpdate),
  47. * ctx: referenceJob.data.ctx,
  48. * applyToChangedVariantsOnly: referenceJob.data.applyToChangedVariantsOnly,
  49. * },
  50. * });
  51. *
  52. * // Only this single job will get added to the job queue
  53. * return [batchedCollectionJob];
  54. * }
  55. * }
  56. * ```
  57. *
  58. * A JobBuffer is used by adding it to the {@link JobQueueService}, at which point it will become active
  59. * and start collecting jobs.
  60. *
  61. * At some later point, the buffer can be flushed, causing the buffered jobs to be passed through the
  62. * `reduce()` method and sent to the job queue.
  63. *
  64. * @example
  65. * ```ts
  66. * const collectionBuffer = new CollectionJobBuffer();
  67. *
  68. * await this.jobQueueService.addBuffer(collectionBuffer);
  69. *
  70. * // Here you can perform some work which would ordinarily
  71. * // trigger the 'apply-collection-filters' job, such as updating
  72. * // collection filters or changing ProductVariant prices.
  73. *
  74. * await this.jobQueueService.flush(collectionBuffer);
  75. *
  76. * await this.jobQueueService.removeBuffer(collectionBuffer);
  77. * ```
  78. *
  79. * @docsCategory JobQueue
  80. * @since 1.3.0
  81. */
  export interface JobBuffer<Data extends JobData<Data> = object> {
      /**
       * @description
       * Identifier of this buffer, e.g. `'apply-collection-filters-buffer'`.
       *
       * NOTE(review): presumably expected to be unique among the registered buffers - confirm
       * against the configured storage strategy.
       */
      readonly id: string;

      /**
       * @description
       * Invoked whenever a job is added to the job queue, to decide whether this buffer should
       * take the job. A result of `true` means the job is _buffered_ and is _not_ added to the
       * job queue; a result of `false` means the job is added to the job queue as normal.
       * The result may be returned directly or wrapped in a Promise.
       */
      collect(job: Job<Data>): boolean | Promise<boolean>;

      /**
       * @description
       * Invoked whenever the buffer is flushed via a call to `JobQueueService.flush()`, receiving
       * the jobs which were collected. This is the place to run logic over the buffered jobs,
       * enabling optimizations such as aggregating and de-duplicating the work of many jobs into
       * one job. The returned jobs are the ones which get sent on to the job queue.
       */
      reduce(collectedJobs: Array<Job<Data>>): Array<Job<Data>> | Promise<Array<Job<Data>>>;
  }