Browse Source

feat(core): JobQueueStrategy pollInterval accepts function

Allows finer control over polling intervals based on queue type.
Michael Bromley 4 years ago
parent
commit
c2701b945d

+ 9 - 4
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -24,7 +24,7 @@ export type BackoffStrategy = (queueName: string, attemptsMade: number, job: Job
 
 export interface PollingJobQueueStrategyConfig {
     concurrency?: number;
-    pollInterval?: number;
+    pollInterval?: number | ((queueName: string) => number);
     backoffStrategy?: BackoffStrategy;
 }
 
@@ -38,6 +38,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
     private errorNotifier$ = new Subject<[string, string]>();
     private queueStopped$ = new Subject<typeof STOP_SIGNAL>();
     private subscription: Subscription;
+    private readonly pollInterval: number;
 
     constructor(
         private readonly queueName: string,
@@ -48,6 +49,10 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
             Logger.error(message);
             Logger.debug(stack);
         });
+        this.pollInterval =
+            typeof this.jobQueueStrategy.pollInterval === 'function'
+                ? this.jobQueueStrategy.pollInterval(queueName)
+                : this.jobQueueStrategy.pollInterval;
     }
 
     start() {
@@ -63,7 +68,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
                         await this.jobQueueStrategy.update(nextJob);
                         const onProgress = (job: Job) => this.jobQueueStrategy.update(job);
                         nextJob.on('progress', onProgress);
-                        const cancellationSignal$ = interval(this.jobQueueStrategy.pollInterval * 5).pipe(
+                        const cancellationSignal$ = interval(this.pollInterval * 5).pipe(
                             // tslint:disable-next-line:no-non-null-assertion
                             switchMap(() => this.jobQueueStrategy.findOne(nextJob.id!)),
                             filter(job => job?.state === JobState.CANCELLED),
@@ -106,7 +111,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
                 ]);
             }
             if (this.running) {
-                this.timer = setTimeout(runNextJobs, this.jobQueueStrategy.pollInterval);
+                this.timer = setTimeout(runNextJobs, this.pollInterval);
             }
         };
 
@@ -159,7 +164,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
  */
 export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
     public concurrency: number;
-    public pollInterval: number;
+    public pollInterval: number | ((queueName: string) => number);
     public backOffStrategy?: BackoffStrategy;
 
     private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();

+ 37 - 7
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -7,6 +7,20 @@ import { VendurePlugin } from '../vendure-plugin';
 import { JobRecord } from './job-record.entity';
 import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
 
+/**
+ * @description
+ * Configuration options for the DefaultJobQueuePlugin. These values get passed into the
+ * {@link SqlJobQueueStrategy}.
+ *
+ * @docsCategory JobQueue
+ * @docsPage DefaultJobQueuePlugin
+ */
+export interface DefaultJobQueueOptions {
+    pollInterval?: number | ((queueName: string) => number);
+    concurrency?: number;
+    backoffStrategy?: BackoffStrategy;
+}
+
 /**
  * @description
  * A plugin which configures Vendure to use the SQL database to persist the JobQueue jobs using the {@link SqlJobQueueStrategy}. If you add this
@@ -32,8 +46,27 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
  * ### pollInterval
  * The interval in ms between polling for new jobs. The default is 200ms.
  * Using a longer interval reduces load on the database but results in a slight
- * delay in processing jobs.
+ * delay in processing jobs. For more control, it is possible to supply a function which can specify
+ * a pollInterval based on the queue name:
  *
+ * @example
+ * ```TypeScript
+ * export const config: VendureConfig = {
+ *   plugins: [
+ *     DefaultJobQueuePlugin.init({
+ *       pollInterval: queueName => {
+ *         if (queueName === 'cart-recovery-email') {
+ *           // This queue does not need to be polled so frequently,
+ *           // so we set a longer interval in order to reduce load
+ *           // on the database.
+ *           return 10000;
+ *         }
+ *         return 200;
+ *       },
+ *     }),
+ *   ],
+ * };
+ * ```
  * ### concurrency
  * The number of jobs to process concurrently per worker. Defaults to 1.
  *
@@ -67,6 +100,7 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
  * ```
  *
  * @docsCategory JobQueue
+ * @docsWeight 0
  */
 @VendurePlugin({
     imports: [PluginCommonModule],
@@ -83,13 +117,9 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
 })
 export class DefaultJobQueuePlugin {
     /** @internal */
-    static options: { pollInterval?: number; concurrency?: number; backoffStrategy?: BackoffStrategy };
+    static options: DefaultJobQueueOptions;
 
-    static init(options: {
-        pollInterval?: number;
-        concurrency?: number;
-        backoffStrategy?: BackoffStrategy;
-    }): Type<DefaultJobQueuePlugin> {
+    static init(options: DefaultJobQueueOptions): Type<DefaultJobQueuePlugin> {
         DefaultJobQueuePlugin.options = options;
         return DefaultJobQueuePlugin;
     }