Prechádzať zdrojové kódy

fix(core): Improve handling of active jobs on worker shutdown

Michael Bromley 1 rok pred
rodič
commit
e1e0987290

+ 48 - 15
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -118,9 +118,9 @@ class ActiveQueue<Data extends JobData<Data> = object> {
                                 },
                             )
                             .finally(() => {
-                                if (!this.running && nextJob.state !== JobState.PENDING) {
-                                    return;
-                                }
+                                // if (!this.running && nextJob.state !== JobState.PENDING) {
+                                //     return;
+                                // }
                                 nextJob.off('progress', onProgress);
                                 return this.onFailOrComplete(nextJob);
                             })
@@ -145,24 +145,55 @@ class ActiveQueue<Data extends JobData<Data> = object> {
         void runNextJobs();
     }
 
-    stop(): Promise<void> {
+    async stop(): Promise<void> {
         this.running = false;
-        this.queueStopped$.next(STOP_SIGNAL);
         clearTimeout(this.timer);
+        await this.awaitRunningJobsOrTimeout();
+        Logger.info(`Stopped queue: ${this.queueName}`);
+        // Allow any job status changes to be persisted
+        // before we permit the application shutdown to continue.
+        // Otherwise, the DB connection will close before our
+        // changes are persisted.
+        await new Promise(resolve => setTimeout(resolve, 1000));
+    }
 
+    private awaitRunningJobsOrTimeout(): Promise<void> {
         const start = +new Date();
-        // Wait for 2 seconds to allow running jobs to complete
-        const maxTimeout = 2000;
-        let pollTimer: any;
+        const stopActiveQueueTimeout = 20_000;
+        let timeout: ReturnType<typeof setTimeout>;
         return new Promise(resolve => {
-            const pollActiveJobs = async () => {
-                const timedOut = +new Date() - start > maxTimeout;
-                if (this.activeJobs.length === 0 || timedOut) {
-                    clearTimeout(pollTimer);
+            let lastStatusUpdate = +new Date();
+            const pollActiveJobs = () => {
+                const now = +new Date();
+                const timedOut =
+                    stopActiveQueueTimeout === undefined ? false : now - start > stopActiveQueueTimeout;
+
+                if (this.activeJobs.length === 0) {
+                    clearTimeout(timeout);
+                    resolve();
+                    return;
+                }
+
+                if (timedOut) {
+                    Logger.warn(
+                        `Timed out (${stopActiveQueueTimeout}ms) waiting for ${this.activeJobs.length} active jobs in queue "${this.queueName}" to complete. Forcing stop...`,
+                    );
+                    this.queueStopped$.next(STOP_SIGNAL);
+                    clearTimeout(timeout);
                     resolve();
-                } else {
-                    pollTimer = setTimeout(pollActiveJobs, 50);
+                    return;
                 }
+
+                if (this.activeJobs.length > 0) {
+                    if (now - lastStatusUpdate > 2000) {
+                        Logger.info(
+                            `Stopping queue: ${this.queueName} - waiting for ${this.activeJobs.length} active jobs to complete...`,
+                        );
+                        lastStatusUpdate = now;
+                    }
+                }
+
+                timeout = setTimeout(pollActiveJobs, 200);
             };
             void pollActiveJobs();
         });
@@ -175,7 +206,9 @@ class ActiveQueue<Data extends JobData<Data> = object> {
 
     private removeJobFromActive(job: Job<Data>) {
         const index = this.activeJobs.indexOf(job);
-        this.activeJobs.splice(index, 1);
+        if (index !== -1) {
+            this.activeJobs.splice(index, 1);
+        }
     }
 }