Browse Source

feat(core): Allow DefaultJobQueue retries to be configured per queue

Relates to #1111
Michael Bromley 4 years ago
parent
commit
5017622ba2

+ 1 - 0
packages/core/src/job-queue/in-memory-job-queue-strategy.ts

@@ -64,6 +64,7 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
                 .toString()
                 .padEnd(10, '0');
         }
+        (job as any).retries = this.setRetries(job.queueName, job);
         // tslint:disable-next-line:no-non-null-assertion
         this.jobs.set(job.id!, job);
         if (!this.unsettledJobs[job.queueName]) {

+ 11 - 0
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -37,6 +37,14 @@ export interface PollingJobQueueStrategyConfig {
      * @description 200
      */
     pollInterval?: number | ((queueName: string) => number);
+    /**
+     * @description
+     * When a job is added to the JobQueue using `JobQueue.add()`, the calling
+     * code may specify the number of retries in case of failure. This option allows
+     * you to override that number and specify your own number of retries based on
+     * the job being added.
+     */
+    setRetries?: (queueName: string, job: Job) => number;
     /**
      * @description
      * The strategy used to decide how long to wait before retrying a failed job.
@@ -183,6 +191,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
 export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
     public concurrency: number;
     public pollInterval: number | ((queueName: string) => number);
+    public setRetries: (queueName: string, job: Job) => number;
     public backOffStrategy?: BackoffStrategy;
 
     private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
@@ -196,9 +205,11 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
             this.concurrency = concurrencyOrConfig.concurrency ?? 1;
             this.pollInterval = concurrencyOrConfig.pollInterval ?? 200;
             this.backOffStrategy = concurrencyOrConfig.backoffStrategy ?? (() => 1000);
+            this.setRetries = concurrencyOrConfig.setRetries ?? ((_, job) => job.retries);
         } else {
             this.concurrency = concurrencyOrConfig ?? 1;
             this.pollInterval = maybePollInterval ?? 200;
+            this.setRetries = (_, job) => job.retries;
         }
     }
 

+ 15 - 2
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -1,5 +1,6 @@
 import { Type } from '@vendure/common/lib/shared-types';
 
+import { Job } from '../../job-queue/job';
 import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
@@ -19,6 +20,7 @@ export interface DefaultJobQueueOptions {
     pollInterval?: number | ((queueName: string) => number);
     concurrency?: number;
     backoffStrategy?: BackoffStrategy;
+    setRetries?: (queueName: string, job: Job) => number;
 }
 
 /**
@@ -74,7 +76,7 @@ export interface DefaultJobQueueOptions {
  * Defines the backoff strategy used when retrying failed jobs. In other words, if a job fails
  * and is configured to be re-tried, how long should we wait before the next attempt?
  *
- * By default a job will be retried as soon as possible, but in some cases this is not desirable. For example,
+ * By default, a job will be retried as soon as possible, but in some cases this is not desirable. For example,
  * a job may interact with an unreliable 3rd-party API which is sensitive to too many requests. In this case, an
  * exponential backoff may be used which progressively increases the delay between each subsequent retry.
  *
@@ -94,6 +96,15 @@ export interface DefaultJobQueueOptions {
  *         // A default delay for all other queues
  *         return 1000;
  *       },
+ *       retries: (queueName, job) => {
+ *         if (queueName === 'send-email') {
+ *           // Override the default number of retries
+ *           // for the 'send-email' job because we have
+ *           // a very unreliable email service.
+ *           return 10;
+ *         }
+ *         return job.retries;
+ *       }
  *     }),
  *   ],
  * };
@@ -106,11 +117,13 @@ export interface DefaultJobQueueOptions {
     imports: [PluginCommonModule],
     entities: [JobRecord],
     configuration: config => {
-        const { pollInterval, concurrency, backoffStrategy } = DefaultJobQueuePlugin.options ?? {};
+        const { pollInterval, concurrency, backoffStrategy, setRetries } =
+            DefaultJobQueuePlugin.options ?? {};
         config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({
             concurrency,
             pollInterval,
             backoffStrategy,
+            setRetries,
         });
         return config;
     },

+ 3 - 3
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -39,7 +39,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
             throw new Error('Connection not available');
         }
         const constrainedData = this.constrainDataSize(job);
-        const newRecord = this.toRecord(job, constrainedData);
+        const newRecord = this.toRecord(job, constrainedData, this.setRetries(job.queueName, job));
         const record = await this.connection.getRepository(JobRecord).save(newRecord);
         return this.fromRecord(record);
     }
@@ -222,7 +222,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         return !!this.connection && this.connection.isConnected;
     }
 
-    private toRecord(job: Job<any>, data?: any): JobRecord {
+    private toRecord(job: Job<any>, data?: any, retries?: number): JobRecord {
         return new JobRecord({
             id: job.id || undefined,
             queueName: job.queueName,
@@ -234,7 +234,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
             startedAt: job.startedAt,
             settledAt: job.settledAt,
             isSettled: job.isSettled,
-            retries: job.retries,
+            retries: retries ?? job.retries,
             attempts: job.attempts,
         });
     }