Bladeren bron

feat(job-queue-plugin): Implement cancellation mechanism

Relates to #1127, relates to #2650. This commit adds a mechanism to
track cancellation of jobs in Redis using pub/sub on a custom
set which tracks the IDs which have been cancelled.

The job process functions will still continue to execute unless
there is specific logic to check the job state and throw an
error on cancellation.
Michael Bromley 1 jaar geleden
bovenliggende
commit
d0e97caaf0

+ 88 - 18
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -21,6 +21,8 @@ import Bull, {
 } from 'bullmq';
 import { EventEmitter } from 'events';
 import { Cluster, Redis, RedisOptions } from 'ioredis';
+import { Subject } from 'rxjs';
+import { filter, takeUntil } from 'rxjs/operators';
 
 import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
 import { RedisHealthIndicator } from './redis-health-indicator';
@@ -45,6 +47,10 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private workerProcessor: Processor;
     private options: BullMQPluginOptions;
     private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
+    private cancellationSub: Redis;
+    private cancelRunningJob$ = new Subject<string>();
+    private readonly CANCEL_JOB_CHANNEL = 'cancel-job';
+    private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';
 
     async init(injector: Injector): Promise<void> {
         const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
@@ -109,18 +115,38 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             const processFn = this.queueNameProcessFnMap.get(queueName);
             if (processFn) {
                 const job = await this.createVendureJob(bullJob);
+                const completed$ = new Subject<void>();
                 try {
                     // eslint-disable-next-line
                     job.on('progress', _job => bullJob.updateProgress(_job.progress));
+
+                    this.cancelRunningJob$
+                        .pipe(
+                            filter(jobId => jobId === job.id),
+                            takeUntil(completed$),
+                        )
+                        .subscribe(() => {
+                            Logger.info(`Cancelling job ${job.id ?? ''}`, loggerCtx);
+                            job.cancel();
+                        });
                     const result = await processFn(job);
+
                     await bullJob.updateProgress(100);
                     return result;
                 } catch (e: any) {
                     throw e;
+                } finally {
+                    if (job.id) {
+                        await this.redisConnection.srem(this.CANCELLED_JOB_LIST_NAME, job.id?.toString());
+                    }
+                    completed$.next();
+                    completed$.complete();
                 }
             }
             throw new InternalServerError(`No processor defined for the queue "${queueName}"`);
         };
+        // Subscription-mode Redis connection for the cancellation messages
+        this.cancellationSub = new Redis(this.connectionOptions as RedisOptions);
     }
 
     async destroy() {
@@ -144,17 +170,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         const bullJob = await this.queue.getJob(jobId);
         if (bullJob) {
             if (await bullJob.isActive()) {
-                // Not yet possible in BullMQ, see
-                // https://github.com/taskforcesh/bullmq/issues/632
-                throw new InternalServerError('Cannot cancel a running job');
-            }
-            try {
-                await bullJob.remove();
+                await this.setActiveJobAsCancelled(jobId);
                 return this.createVendureJob(bullJob);
-            } catch (e: any) {
-                const message = `Error when cancelling job: ${JSON.stringify(e.message)}`;
-                Logger.error(message, loggerCtx);
-                throw new InternalServerError(message);
+            } else {
+                try {
+                    const job = await this.createVendureJob(bullJob);
+                    await bullJob.remove();
+                    return job;
+                } catch (e: any) {
+                    const message = `Error when cancelling job: ${JSON.stringify(e.message)}`;
+                    Logger.error(message, loggerCtx);
+                    throw new InternalServerError(message);
+                }
             }
         }
     }
@@ -262,23 +289,32 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
                 .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
                 .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
-                .on('closed', () => Logger.verbose('BullMQ Worker closed'))
+                .on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx))
                 .on('failed', (job: Bull.Job | undefined, error) => {
                     Logger.warn(
                         `Job ${job?.id ?? '(unknown id)'} [${job?.name ?? 'unknown name'}] failed (attempt ${
                             job?.attemptsMade ?? 'unknown'
                         } of ${job?.opts.attempts ?? 1})`,
+                        loggerCtx,
                     );
                 })
                 .on('stalled', (jobId: string) => {
                     Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
                 })
                 .on('completed', (job: Bull.Job) => {
-                    Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`);
+                    Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx);
                 });
+            await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
+            this.cancellationSub.on('message', this.subscribeToCancellationEvents);
         }
     }
 
+    private subscribeToCancellationEvents = (channel: string, jobId: string) => {
+        if (channel === this.CANCEL_JOB_CHANNEL && jobId) {
+            this.cancelRunningJob$.next(jobId);
+        }
+    };
+
     private stopped = false;
 
     async stop<Data extends JobData<Data> = object>(
@@ -288,13 +324,44 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         if (!this.stopped) {
             this.stopped = true;
             try {
-                await Promise.all([this.queue.close(), this.worker.close()]);
+                Logger.info(`Closing worker`, loggerCtx);
+
+                let timer: NodeJS.Timeout;
+                const checkActive = async () => {
+                    const activeCount = await this.queue.getActiveCount();
+                    if (0 < activeCount) {
+                        const activeJobs = await this.queue.getActive();
+                        Logger.info(
+                            `Waiting on ${activeCount} active ${
+                                activeCount > 1 ? 'jobs' : 'job'
+                            } (${activeJobs.map(j => j.id).join(', ')})...`,
+                            loggerCtx,
+                        );
+                        timer = setTimeout(checkActive, 2000);
+                    }
+                };
+                timer = setTimeout(checkActive, 2000);
+
+                await this.worker.close();
+                Logger.info(`Worker closed`, loggerCtx);
+                await this.queue.close();
+                clearTimeout(timer);
+                Logger.info(`Queue closed`, loggerCtx);
+                this.cancellationSub.off('message', this.subscribeToCancellationEvents);
             } catch (e: any) {
                 Logger.error(e, loggerCtx, e.stack);
             }
         }
     }
 
+    private async setActiveJobAsCancelled(jobId: string) {
+        // Not yet possible natively in BullMQ, see
+        // https://github.com/taskforcesh/bullmq/issues/632
+        // So we have our own custom method of marking a job as cancelled.
+        await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, jobId);
+        await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, jobId.toString());
+    }
+
     private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
         const jobJson = bullJob.toJSON();
         return new Job({
@@ -314,13 +381,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     private async getState(bullJob: Bull.Job): Promise<JobState> {
-        const jobJson = bullJob.toJSON();
+        const jobId = bullJob.id?.toString();
 
         if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) {
             return JobState.PENDING;
         }
         if (await bullJob.isActive()) {
-            return JobState.RUNNING;
+            const isCancelled =
+                jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId));
+            if (isCancelled) {
+                return JobState.CANCELLED;
+            } else {
+                return JobState.RUNNING;
+            }
         }
         if (await bullJob.isDelayed()) {
             return JobState.RETRYING;
@@ -331,9 +404,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         if (await bullJob.isCompleted()) {
             return JobState.COMPLETED;
         }
-        if (!jobJson.finishedOn) {
-            return JobState.CANCELLED;
-        }
         throw new InternalServerError('Could not determine job state');
         // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.
     }

+ 2 - 2
packages/job-queue-plugin/src/bullmq/types.ts

@@ -26,14 +26,14 @@ export interface BullMQPluginOptions {
      * Queue instance.
      * See the [BullMQ QueueOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.queueoptions.md)
      */
-    queueOptions?: Exclude<QueueOptions, 'connection'>;
+    queueOptions?: Omit<QueueOptions, 'connection'>;
     /**
      * @description
      * Additional options used when instantiating the BullMQ
      * Worker instance.
      * See the [BullMQ WorkerOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.workeroptions.md)
      */
-    workerOptions?: Exclude<WorkerOptions, 'connection'>;
+    workerOptions?: Omit<WorkerOptions, 'connection'>;
     /**
      * @description
      * When a job is added to the JobQueue using `JobQueue.add()`, the calling