Browse Source

feat(job-queue-plugin): Add support for job options Priority

Michael Bromley 10 months ago
parent
commit
90b5e05ec3

+ 23 - 24
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -159,9 +159,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             delay: 1000,
             type: 'exponential',
         };
+        const customJobOptions = this.options.setJobOptions?.(job.queueName, job) ?? {};
         const bullJob = await this.queue.add(job.queueName, job.data, {
             attempts: retries + 1,
             backoff,
+            ...customJobOptions,
         });
         return this.createVendureJob(bullJob);
     }
@@ -194,7 +196,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         if (stateFilter?.eq) {
             switch (stateFilter.eq) {
                 case 'PENDING':
-                    jobTypes = ['wait'];
+                    jobTypes = ['wait', 'waiting-children', 'prioritized'];
                     break;
                 case 'RUNNING':
                     jobTypes = ['active'];
@@ -218,7 +220,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             jobTypes =
                 settledFilter.eq === true
                     ? ['completed', 'failed']
-                    : ['wait', 'waiting-children', 'active', 'repeat', 'delayed', 'paused'];
+                    : ['wait', 'waiting-children', 'active', 'repeat', 'delayed', 'paused', 'prioritized'];
         }
 
         let items: Bull.Job[] = [];
@@ -382,30 +384,27 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
     private async getState(bullJob: Bull.Job): Promise<JobState> {
         const jobId = bullJob.id?.toString();
-
-        if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) {
-            return JobState.PENDING;
-        }
-        if (await bullJob.isActive()) {
-            const isCancelled =
-                jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId));
-            if (isCancelled) {
-                return JobState.CANCELLED;
-            } else {
-                return JobState.RUNNING;
+        const state = await bullJob.getState();
+        switch (state) {
+            case 'completed':
+                return JobState.COMPLETED;
+            case 'failed':
+                return JobState.FAILED;
+            case 'waiting':
+            case 'waiting-children':
+            case 'prioritized':
+                return JobState.PENDING;
+            case 'delayed':
+                return JobState.RETRYING;
+            case 'active': {
+                const isCancelled =
+                    jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId));
+                return isCancelled ? JobState.CANCELLED : JobState.RUNNING;
             }
+            case 'unknown':
+            default:
+                throw new InternalServerError(`Could not determine job state: ${state}`);
         }
-        if (await bullJob.isDelayed()) {
-            return JobState.RETRYING;
-        }
-        if (await bullJob.isFailed()) {
-            return JobState.FAILED;
-        }
-        if (await bullJob.isCompleted()) {
-            return JobState.COMPLETED;
-        }
-        throw new InternalServerError('Could not determine job state');
-        // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.
     }
 
     private callCustomScript<T, Args extends any[]>(

+ 1 - 0
packages/job-queue-plugin/src/bullmq/constants.ts

@@ -12,4 +12,5 @@ export const ALL_JOB_TYPES: JobType[] = [
     'active',
     'wait',
     'paused',
+    'prioritized',
 ];

+ 40 - 0
packages/job-queue-plugin/src/bullmq/plugin.ts

@@ -129,6 +129,46 @@ import { BullMQPluginOptions } from './types';
  * maximum age of a job in seconds. If both options are specified, then the jobs kept will be the ones that satisfy
  * both properties.
  *
+ * ## Job Priority
+ * Some jobs are more important than others. For example, sending out a timely email after a customer places an order
+ * is probably more important than a routine data import task. Sometimes you can get the situation where lower-priority
+ * jobs are blocking higher-priority jobs.
+ *
+ * Let's say you have a data import job that runs periodically and takes a long time to complete. If you have a high-priority
+ * job that needs to be processed quickly, it could be stuck behind the data import job in the queue. A customer might
+ * not get their confirmation email for 30 minutes while that data import job is processed!
+ *
+ * To solve this problem, you can set the `priority` option on a job. Jobs with a higher priority will be processed before
+ * jobs with a lower priority. By default, all jobs have a priority of 0 (which is the highest).
+ *
+ * Learn more about how priority works in BullMQ in their [documentation](https://docs.bullmq.io/guide/jobs/prioritized).
+ *
+ * You can set the priority by using the `setJobOptions` option (introduced in Vendure v3.2.0):
+ *
+ * @example
+ * ```ts
+ * const config: VendureConfig = {
+ *   plugins: [
+ *     BullMQJobQueuePlugin.init({
+ *       setJobOptions: (queueName, job) => {
+ *         let priority = 10;
+ *         switch (queueName) {
+ *           case 'super-critical-task':
+ *             priority = 0;
+ *             break;
+ *           case 'send-email':
+ *             priority = 5;
+ *             break;
+ *           default:
+ *             priority = 10;
+ *         }
+ *         return { priority };
+ *       }
+ *     }),
+ *   ],
+ * };
+ * ```
+ *
  * @docsCategory core plugins/JobQueuePlugin
  */
 @VendurePlugin({

+ 32 - 1
packages/job-queue-plugin/src/bullmq/types.ts

@@ -1,7 +1,14 @@
 import { Job } from '@vendure/core';
-import { ConnectionOptions, WorkerOptions } from 'bullmq';
+import { ConnectionOptions, WorkerOptions, Queue } from 'bullmq';
 import { QueueOptions } from 'bullmq';
 
+/**
+ * @description
+ * This type is the third parameter to the `Queue.add()` method,
+ * which allows additional options to be specified for the job.
+ */
+export type BullJobsOptions = Parameters<Queue['add']>[2];
+
 /**
  * @description
  * Configuration options for the {@link BullMQJobQueuePlugin}.
@@ -54,6 +61,7 @@ export interface BullMQPluginOptions {
      * }
      *  ```
      *
+     * @deprecated Use `setJobOptions` instead.
      * @since 1.3.0
      */
     setRetries?: (queueName: string, job: Job) => number;
@@ -73,10 +81,33 @@ export interface BullMQPluginOptions {
      *   };
      * }
      * ```
+     *
+     * @deprecated Use `setJobOptions` instead.
      * @since 1.3.0
      * @default 'exponential', 1000
      */
     setBackoff?: (queueName: string, job: Job) => BackoffOptions | undefined;
+    /**
+     * @description
+     * This allows you to specify additional options for a job when it is added to the queue.
+     * The object returned is the BullMQ [JobsOptions](https://api.docs.bullmq.io/types/v5.JobsOptions.html)
+     * type, which includes control over settings such as `delay`, `attempts`, `priority` and much more.
+     *
+     * This function is called every time a job is added to the queue, so you can return different options
+     * based on the job being added.
+     *
+     * @example
+     * ```ts
+     * // Here we want to assign a higher priority to jobs in the 'critical' queue
+     * setJobOptions: (queueName, job) => {
+     *   const priority = queueName === 'critical' ? 1 : 5;
+     *   return { priority };
+     * }
+     * ```
+     *
+     * @since 3.2.0
+     */
+    setJobOptions?: (queueName: string, job: Job) => BullJobsOptions;
 }
 
 /**