Browse Source

fix(core): Fix memory leak in default JobQueueStrategies

Michael Bromley 4 years ago
parent
commit
e9e3c18fa6
1 changed files with 13 additions and 14 deletions
  1. 13 14
      packages/core/src/job-queue/polling-job-queue-strategy.ts

+ 13 - 14
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -11,12 +11,15 @@ import { Job } from './job';
 import { QueueNameProcessStorage } from './queue-name-process-storage';
 import { JobData } from './types';
 
+const STOP_SIGNAL = Symbol('STOP_SIGNAL');
+
 class ActiveQueue<Data extends JobData<Data> = {}> {
     private timer: any;
     private running = false;
     private activeJobs: Array<Job<Data>> = [];
 
     private errorNotifier$ = new Subject<[string, string]>();
+    private queueStopped$ = new Subject<typeof STOP_SIGNAL>();
     private subscription: Subscription;
 
     constructor(
@@ -49,11 +52,15 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
                             filter(job => job?.state === JobState.CANCELLED),
                             take(1),
                         );
-                        race(fromPromise(this.process(nextJob)), cancellationSignal$)
+                        const stopSignal$ = this.queueStopped$.pipe(take(1));
+
+                        race(fromPromise(this.process(nextJob)), cancellationSignal$, stopSignal$)
                             .toPromise()
                             .then(
                                 result => {
-                                    if (result instanceof Job && result.state === JobState.CANCELLED) {
+                                    if (result === STOP_SIGNAL) {
+                                        nextJob.defer();
+                                    } else if (result instanceof Job && result.state === JobState.CANCELLED) {
                                         nextJob.cancel();
                                     } else {
                                         nextJob.complete(result);
@@ -91,29 +98,21 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
 
     stop(): Promise<void> {
         this.running = false;
+        this.queueStopped$.next(STOP_SIGNAL);
         clearTimeout(this.timer);
 
         const start = +new Date();
         // Wait for 2 seconds to allow running jobs to complete
         const maxTimeout = 2000;
+        let pollTimer: any;
         return new Promise(resolve => {
             const pollActiveJobs = async () => {
                 const timedOut = +new Date() - start > maxTimeout;
                 if (this.activeJobs.length === 0 || timedOut) {
-                    // if there are any incomplete jobs after the 2 second
-                    // wait period, set them back to "pending" so they can
-                    // be re-run on next bootstrap.
-                    for (const job of this.activeJobs) {
-                        job.defer();
-                        try {
-                            await this.jobQueueStrategy.update(job);
-                        } catch (err) {
-                            Logger.info(`Error stopping job queue: ${err}`);
-                        }
-                    }
+                    clearTimeout(pollTimer);
                     resolve();
                 } else {
-                    setTimeout(pollActiveJobs, 50);
+                    pollTimer = setTimeout(pollActiveJobs, 50);
                 }
             };
             pollActiveJobs();