Browse Source

feat(core): Add `gracefulShutdownTimeout` to DefaultJobQueuePlugin

Michael Bromley 1 year ago
parent
commit
cba06e054d

+ 16 - 1
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -51,6 +51,18 @@ export interface PollingJobQueueStrategyConfig {
      * @default () => 1000
      */
     backoffStrategy?: BackoffStrategy;
+    /**
+     * @description
+     * The timeout in ms which the queue will use when attempting a graceful shutdown.
+     * That means, when the server is shut down but a job is running, the job queue will
+     * wait for the job to complete before allowing the server to shut down. If the job
+     * does not complete within this timeout window, the job will be forced to stop
+     * and the server will shut down anyway.
+     *
+     * @since 2.2.0
+     * @default 20_000
+     */
+    gracefulShutdownTimeout?: number;
 }
 
 const STOP_SIGNAL = Symbol('STOP_SIGNAL');
@@ -232,6 +244,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
     public pollInterval: number | ((queueName: string) => number);
     public setRetries: (queueName: string, job: Job) => number;
     public backOffStrategy?: BackoffStrategy;
+    public gracefulShutdownTimeout: number;
 
     protected activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
 
@@ -245,10 +258,12 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
             this.pollInterval = concurrencyOrConfig.pollInterval ?? 200;
             this.backOffStrategy = concurrencyOrConfig.backoffStrategy ?? (() => 1000);
             this.setRetries = concurrencyOrConfig.setRetries ?? ((_, job) => job.retries);
+            this.gracefulShutdownTimeout = concurrencyOrConfig.gracefulShutdownTimeout ?? 20_000;
         } else {
             this.concurrency = concurrencyOrConfig ?? 1;
             this.pollInterval = maybePollInterval ?? 200;
             this.setRetries = (_, job) => job.retries;
+            this.gracefulShutdownTimeout = 20_000;
         }
     }
 
@@ -276,7 +291,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
         if (!active) {
             return;
         }
-        await active.stop();
+        await active.stop(this.gracefulShutdownTimeout);
     }
 
     async cancelJob(jobId: ID): Promise<Job | undefined> {

+ 14 - 1
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -76,6 +76,18 @@ export interface DefaultJobQueueOptions {
      * @since 1.3.0
      */
     useDatabaseForBuffer?: boolean;
+    /**
+     * @description
+     * The timeout in ms which the queue will use when attempting a graceful shutdown.
+     * That means when the server is shut down but a job is running, the job queue will
+     * wait for the job to complete before allowing the server to shut down. If the job
+     * does not complete within this timeout window, the job will be forced to stop
+     * and the server will shut down anyway.
+     *
+     * @since 2.2.0
+     * @default 20_000
+     */
+    gracefulShutdownTimeout?: number;
 }
 
 /**
@@ -175,13 +187,14 @@ export interface DefaultJobQueueOptions {
             ? [JobRecord, JobRecordBuffer]
             : [JobRecord],
     configuration: config => {
-        const { pollInterval, concurrency, backoffStrategy, setRetries } =
+        const { pollInterval, concurrency, backoffStrategy, setRetries, gracefulShutdownTimeout } =
             DefaultJobQueuePlugin.options ?? {};
         config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({
             concurrency,
             pollInterval,
             backoffStrategy,
             setRetries,
+            gracefulShutdownTimeout,
         });
         if (DefaultJobQueuePlugin.options.useDatabaseForBuffer === true) {
             config.jobQueueOptions.jobBufferStorageStrategy = new SqlJobBufferStorageStrategy();