Browse Source

fix(core): Gracefully handle errors in JobQueue

Closes #635
Michael Bromley 5 years ago
parent
commit
6d1b8c6217
1 changed files with 37 additions and 16 deletions
  1. 37 16
      packages/core/src/job-queue/job-queue.ts

+ 37 - 16
packages/core/src/job-queue/job-queue.ts

@@ -1,6 +1,9 @@
 import { JobState } from '@vendure/common/lib/generated-types';
+import { Subject, Subscription } from 'rxjs';
+import { throttleTime } from 'rxjs/operators';
 
 import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+import { Logger } from '../config/logger/vendure-logger';
 
 import { Job } from './job';
 import { CreateQueueOptions, JobConfig, JobData } from './types';
@@ -22,6 +25,8 @@ export class JobQueue<Data extends JobData<Data> = {}> {
     private timer: any;
     private fooId: number;
     private running = false;
+    private errorNotifier$ = new Subject<[string, string]>();
+    private subscription: Subscription;
 
     get concurrency(): number {
         return this.options.concurrency;
@@ -39,34 +44,49 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         private options: CreateQueueOptions<Data>,
         private jobQueueStrategy: JobQueueStrategy,
         private pollInterval: number,
-    ) {}
+    ) {
+        this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
+            Logger.error(message);
+            Logger.debug(stack);
+        });
+    }
 
     /** @internal */
     start() {
         if (this.running) {
             return;
         }
+        Logger.debug(`Starting JobQueue "${this.options.name}"`);
         this.running = true;
         const concurrency = this.options.concurrency;
         const runNextJobs = async () => {
-            const runningJobsCount = this.activeJobs.length;
-            for (let i = runningJobsCount; i < concurrency; i++) {
-                const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(this.options.name);
-                if (nextJob) {
-                    this.activeJobs.push(nextJob);
-                    await this.jobQueueStrategy.update(nextJob);
-                    nextJob.on('complete', job => this.onFailOrComplete(job));
-                    nextJob.on('progress', job => this.jobQueueStrategy.update(job));
-                    nextJob.on('fail', job => this.onFailOrComplete(job));
-                    try {
-                        const returnVal = this.options.process(nextJob);
-                        if (returnVal instanceof Promise) {
-                            returnVal.catch(err => nextJob.fail(err));
+            try {
+                const runningJobsCount = this.activeJobs.length;
+                for (let i = runningJobsCount; i < concurrency; i++) {
+                    const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(
+                        this.options.name,
+                    );
+                    if (nextJob) {
+                        this.activeJobs.push(nextJob);
+                        await this.jobQueueStrategy.update(nextJob);
+                        nextJob.on('complete', job => this.onFailOrComplete(job));
+                        nextJob.on('progress', job => this.jobQueueStrategy.update(job));
+                        nextJob.on('fail', job => this.onFailOrComplete(job));
+                        try {
+                            const returnVal = this.options.process(nextJob);
+                            if (returnVal instanceof Promise) {
+                                returnVal.catch(err => nextJob.fail(err));
+                            }
+                        } catch (err) {
+                            nextJob.fail(err);
                         }
-                    } catch (err) {
-                        nextJob.fail(err);
                     }
                 }
+            } catch (e) {
+                this.errorNotifier$.next([
+                    `Job queue "${this.options.name}" encountered an error (set log level to Debug for trace): ${e.message}`,
+                    e.stack,
+                ]);
             }
             if (this.running) {
                 this.timer = setTimeout(runNextJobs, this.pollInterval);
@@ -78,6 +98,7 @@ export class JobQueue<Data extends JobData<Data> = {}> {
 
     /** @internal */
     pause() {
+        Logger.debug(`Pausing JobQueue "${this.options.name}"`);
         this.running = false;
         clearTimeout(this.timer);
     }