Browse Source

docs(core): Add documentation to job buffer components

Michael Bromley 4 years ago
parent
commit
a3af77110a

+ 13 - 1
packages/core/src/job-queue/job-buffer/in-memory-job-buffer-storage-strategy.ts

@@ -2,8 +2,20 @@ import { Job } from '../job';
 
 import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
 
+/**
+ * @description
+ * A {@link JobBufferStorageStrategy} which keeps the buffered jobs in memory. Should
+ * _not_ be used in production, since it will lose data in the event of the server
+ * stopping.
+ *
+ * Instead, use the {@link DefaultJobQueuePlugin} with the `useDatabaseForBuffer: true` option set,
+ * or the {@link BullMQJobQueuePlugin} or another custom strategy with persistent storage.
+ *
+ * @since 1.3.0
+ * @docsCategory JobQueue
+ */
 export class InMemoryJobBufferStorageStrategy implements JobBufferStorageStrategy {
-    private bufferStorage = new Map<string, Set<Job>>();
+    protected bufferStorage = new Map<string, Set<Job>>();
 
     async add(bufferId: string, job: Job): Promise<Job> {
         const set = this.getSet(bufferId);

+ 45 - 0
packages/core/src/job-queue/job-buffer/job-buffer-storage-strategy.ts

@@ -1,8 +1,53 @@
 import { InjectableStrategy } from '../../common/types/injectable-strategy';
 import { Job } from '../job';
 
+/**
+ * @description
+ * This strategy defines where to store jobs that have been collected by a
+ * {@link JobBuffer}.
+ *
+ * @since 1.3.0
+ */
 export interface JobBufferStorageStrategy extends InjectableStrategy {
+    /**
+     * @description
+     * Persist a job to the storage medium. The storage format should
+     * take into account the `bufferId` argument, as it is necessary to be
+     * able to later retrieve jobs by that id.
+     */
     add(bufferId: string, job: Job): Promise<Job>;
+
+    /**
+     * @description
+     * Returns an object containing the number of buffered jobs arranged by bufferId.
+     *
+     * Passing bufferIds limits the results to the specified bufferIds.
+     * If the array is empty, sizes will be returned for _all_ bufferIds.
+     *
+     * @example
+     * ```TypeScript
+     * const sizes = await myJobBufferStrategy.bufferSize(['buffer-1', 'buffer-2']);
+     *
+     * // sizes = { 'buffer-1': 12, 'buffer-2': 3 }
+     * ```
+     */
     bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }>;
+
+    /**
+     * @description
+     * Clears all jobs from the storage medium which match the specified bufferIds (if the
+     * array is empty, clear for _all_ bufferIds), and returns those jobs in an object
+     * arranged by bufferId
+     *
+     * @example
+     * ```TypeScript
+     * const result = await myJobBufferStrategy.flush(['buffer-1', 'buffer-2']);
+     *
+     * // result = {
+     * //   'buffer-1': [Job, Job, Job, ...],
+     * //   'buffer-2': [Job, Job, Job, ...],
+     * // };
+     * ```
+     */
     flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }>;
 }

+ 5 - 0
packages/core/src/job-queue/job-buffer/job-buffer.service.ts

@@ -8,6 +8,11 @@ import { Job } from '../job';
 import { JobBuffer } from './job-buffer';
 import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
 
+/**
+ * @description
+ * Used to manage {@link JobBuffer}s.Primarily intended to be used internally by the {@link JobQueueService}, which
+ * exposes its public-facing functionality.
+ */
 @Injectable()
 export class JobBufferService {
     private buffers = new Set<JobBuffer>();

+ 72 - 0
packages/core/src/job-queue/job-buffer/job-buffer.ts

@@ -1,8 +1,80 @@
 import { Job } from '../job';
 import { JobData } from '../types';
 
+/**
+ * @description
+ * A JobBuffer is used to temporarily prevent jobs from being sent to the job queue for processing.
+ * Instead, it collects certain jobs (as specified by the `collect()` method), and stores them.
+ *
+ * How these buffered jobs are stored is determined by the configured {@link JobBufferStorageStrategy}.
+ *
+ * The JobBuffer can be thought of as a kind of "interceptor" of jobs. That is, when a JobBuffer is active,
+ * it sits in between calls to `JobQueue.add()` and the actual adding of the job to the queue.
+ *
+ * At some later point, the buffer can be flushed (by calling `JobQueue.flush()`), at which point all the jobs
+ * that were collected into the buffer will be removed from the buffer and passed to the `JobBuffer.reduce()` method.
+ * This method is able to perform additional logic to e.g. aggregate many jobs into a single job in order to de-duplicate
+ * work.
+ *
+ * @example
+ * ```TypeScript
+ * // This is a buffer which will collect all the
+ * // 'apply-collection-filters' jobs and buffer them.
+ * export class CollectionJobBuffer implements JobBuffer<ApplyCollectionFiltersJobData> {
+ *   readonly id = 'apply-collection-filters-buffer';
+ *
+ *   collect(job: Job): boolean {
+ *     return job.queueName === 'apply-collection-filters';
+ *   }
+ *
+ *
+ *   // When the buffer gets flushed, this function will be passed all the collected jobs
+ *   // and will reduce them down to a single job that has aggregated all of the collectionIds.
+ *   reduce(collectedJobs: Array<Job<ApplyCollectionFiltersJobData>>): Array<Job<any>> {
+ *     // Concatenate all the collectionIds from all the events that were buffered
+ *     const collectionIdsToUpdate = collectedJobs.reduce((result, job) => {
+ *       return [...result, ...job.data.collectionIds];
+ *     }, [] as ID[]);
+ *
+ *     const referenceJob = collectedJobs[0];
+ *
+ *     // Create a new Job containing all the concatenated collectionIds,
+ *     // de-duplicated to include each collectionId only once.
+ *     const batchedCollectionJob = new Job<ApplyCollectionFiltersJobData>({
+ *       ...referenceJob,
+ *       id: undefined,
+ *       data: {
+ *         collectionIds: unique(collectionIdsToUpdate),
+ *         ctx: referenceJob.data.ctx,
+ *         applyToChangedVariantsOnly: referenceJob.data.applyToChangedVariantsOnly,
+ *       },
+ *     });
+ *
+ *     // Only this single job will get added to the job queue
+ *     return [batchedCollectionJob];
+ *   }
+ * }
+ * ```
+ *
+ * @docsCategory JobQueue
+ * @since 1.3.0
+ */
 export interface JobBuffer<Data extends JobData<Data> = {}> {
     readonly id: string;
+
+    /**
+     * @description
+     * This method is called whenever a job is added to the job queue. If it returns `true`, then
+     * the job will be _buffered_ and _not_ added to the job queue. If it returns `false`, the job
+     * will be added to the job queue as normal.
+     */
     collect(job: Job<Data>): boolean | Promise<boolean>;
+
+    /**
+     * @description
+     * This method is called whenever the buffer gets flushed via a call to `JobQueueService.flush()`.
+     * It allows logic to be run on the buffered jobs which enables optimizations such as
+     * aggregating and de-duplicating the work of many jobs into one job.
+     */
     reduce(collectedJobs: Array<Job<Data>>): Array<Job<Data>> | Promise<Array<Job<Data>>>;
 }

+ 46 - 0
packages/core/src/job-queue/job-queue.service.ts

@@ -87,18 +87,64 @@ export class JobQueueService implements OnModuleDestroy {
         }
     }
 
+    /**
+     * @description
+     * Adds a {@link JobBuffer}, which will make it active and begin collecting
+     * jobs to buffer.
+     *
+     * @since 1.3.0
+     */
     addBuffer(buffer: JobBuffer<any>) {
         this.jobBufferService.addBuffer(buffer);
     }
 
+    /**
+     * @description
+     * Removes a {@link JobBuffer}, prevent it from collecting and buffering any
+     * subsequent jobs.
+     *
+     * @since 1.3.0
+     */
     removeBuffer(buffer: JobBuffer<any>) {
         this.jobBufferService.removeBuffer(buffer);
     }
 
+    /**
+     * @description
+     * Returns an object containing the number of buffered jobs arranged by bufferId. This
+     * can be used to decide whether a particular buffer has any jobs to flush.
+     *
+     * Passing in JobBuffer instances _or_ ids limits the results to the specified JobBuffers.
+     * If no argument is passed, sizes will be returned for _all_ JobBuffers.
+     *
+     * @example
+     * ```TypeScript
+     * const sizes = await this.jobQueueService.bufferSize('buffer-1', 'buffer-2');
+     *
+     * // sizes = { 'buffer-1': 12, 'buffer-2': 3 }
+     * ```
+     *
+     * @since 1.3.0
+     */
     bufferSize(...forBuffers: Array<JobBuffer<any> | string>): Promise<{ [bufferId: string]: number }> {
         return this.jobBufferService.bufferSize(forBuffers);
     }
 
+    /**
+     * @description
+     * Flushes the specified buffers, which means that the buffer is cleared and the jobs get
+     * sent to the job queue for processing. Before sending the jobs to the job queue,
+     * they will be passed through each JobBuffer's `reduce()` method, which is can be used
+     * to optimize the amount of work to be done by e.g. de-duplicating identical jobs or
+     * aggregating data over the collected jobs.
+     *
+     * Passing in JobBuffer instances _or_ ids limits the action to the specified JobBuffers.
+     * If no argument is passed, _all_ JobBuffers will be flushed.
+     *
+     * Returns an array of all Jobs which were added to the job queue.
+     *
+     * @since 1.3.0
+     */
     flush(...forBuffers: Array<JobBuffer<any> | string>): Promise<Job[]> {
         return this.jobBufferService.flush(forBuffers);
     }