Răsfoiți Sursa

fix(job-queue-plugin): More accurate determination of BullMQ job state

Michael Bromley 4 ani în urmă
părinte
comite
3b3bb3b4d9

+ 28 - 16
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -65,7 +65,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             );
             const processFn = this.queueNameProcessFnMap.get(queueName);
             if (processFn) {
-                const job = this.createVendureJob(bullJob);
+                const job = await this.createVendureJob(bullJob);
                 try {
                     job.on('progress', _job => bullJob.updateProgress(_job.progress));
                     const result = await processFn(job);
@@ -100,11 +100,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     async cancelJob(jobId: string): Promise<Job | undefined> {
         const bullJob = await this.queue.getJob(jobId);
         if (bullJob) {
+            if (await bullJob.isActive()) {
+                // Not yet possible in BullMQ, see
+                // https://github.com/taskforcesh/bullmq/issues/632
+                throw new InternalServerError(`Cannot cancel a running job`);
+            }
             try {
                 await bullJob.remove();
                 return this.createVendureJob(bullJob);
             } catch (e) {
-                Logger.error(`Error when cancelling job: ${e.message}`, loggerCtx);
+                const message = `Error when cancelling job: ${e.message}`;
+                Logger.error(message, loggerCtx);
+                throw new InternalServerError(message);
             }
         }
     }
@@ -157,17 +164,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
         const totalItems = Object.values(jobCounts).reduce((sum, count) => sum + count, 0);
 
-        return Promise.resolve({
-            items: items
-                .sort((a, b) => b.timestamp - a.timestamp)
-                .map(bullJob => this.createVendureJob(bullJob)),
+        return {
+            items: await Promise.all(
+                items
+                    .sort((a, b) => b.timestamp - a.timestamp)
+                    .map(bullJob => this.createVendureJob(bullJob)),
+            ),
             totalItems,
-        });
+        };
     }
 
     async findManyById(ids: ID[]): Promise<Job[]> {
         const bullJobs = await Promise.all(ids.map(id => this.queue.getJob(id.toString())));
-        return bullJobs.filter(notNullOrUndefined).map(j => this.createVendureJob(j));
+        return Promise.all(bullJobs.filter(notNullOrUndefined).map(j => this.createVendureJob(j)));
     }
 
     async findOne(id: ID): Promise<Job | undefined> {
@@ -226,12 +235,12 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         await this.worker.disconnect();
     }
 
-    private createVendureJob(bullJob: Bull.Job): Job {
+    private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
         const jobJson = bullJob.toJSON();
         return new Job({
             queueName: bullJob.name,
             id: bullJob.id,
-            state: this.getState(bullJob),
+            state: await this.getState(bullJob),
             data: bullJob.data,
             attempts: bullJob.attemptsMade,
             createdAt: new Date(jobJson.timestamp),
@@ -244,24 +253,27 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         });
     }
 
-    private getState(bullJob: Bull.Job): JobState {
+    private async getState(bullJob: Bull.Job): Promise<JobState> {
         const jobJson = bullJob.toJSON();
 
-        if (!jobJson.processedOn && !jobJson.failedReason) {
+        if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) {
             return JobState.PENDING;
         }
-        if (!jobJson.finishedOn) {
+        if (await bullJob.isActive()) {
             return JobState.RUNNING;
         }
-        if (jobJson.failedReason && bullJob.attemptsMade < (bullJob.opts.attempts ?? 0)) {
+        if (await bullJob.isDelayed()) {
             return JobState.RETRYING;
         }
-        if (jobJson.failedReason) {
+        if (await bullJob.isFailed()) {
             return JobState.FAILED;
         }
-        if (jobJson.finishedOn) {
+        if (await bullJob.isCompleted()) {
             return JobState.COMPLETED;
         }
+        if (!jobJson.finishedOn) {
+            return JobState.CANCELLED;
+        }
         throw new InternalServerError('Could not determine job state');
         // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.
     }