Browse Source

fix(job-queue-plugin): Prevent duplicate cancellation subscriptions

Add `cancellationSubscribed` flag to ensure the cancellation event
subscription only happens once. Previously, when multiple queues
shared the same concurrency value, the subscription block could run
multiple times causing duplicate listeners on `cancellationSub`.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Will Nahmens 3 days ago
parent
commit
d55e625eb4
1 changed files with 12 additions and 6 deletions
  1. 12 6
      packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

+ 12 - 6
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -66,6 +66,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private jobListIndexService: JobListIndexService;
     private readonly queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
     private cancellationSub: Redis;
+    private cancellationSubscribed = false;
     private readonly cancelRunningJob$ = new Subject<string>();
     private readonly CANCEL_JOB_CHANNEL = 'cancel-job';
     private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';
@@ -328,10 +329,12 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                 this.workers.set(concurrency, worker);
             }
 
-            // Subscribe to cancellation events once (on first worker creation).
-            // The `subscribeToCancellationEvents` handler broadcasts to all workers
-            // via `cancelRunningJob$` since any worker may be processing the job.
-            if (this.workers.size === 1) {
+            // Subscribe to cancellation events once. The `subscribeToCancellationEvents` handler
+            // broadcasts to all workers via `cancelRunningJob$` since any worker may be
+            // processing the job. We use a flag to ensure idempotency since multiple queues
+            // may share the same concurrency value and this block runs for each queue.
+            if (!this.cancellationSubscribed) {
+                this.cancellationSubscribed = true;
                 await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
                 this.cancellationSub.on('message', this.subscribeToCancellationEvents);
             }
@@ -361,8 +364,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                     .on('completed', (job: Bull.Job) => {
                         Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx);
                     });
-                await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
-                this.cancellationSub.on('message', this.subscribeToCancellationEvents);
+                if (!this.cancellationSubscribed) {
+                    this.cancellationSubscribed = true;
+                    await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
+                    this.cancellationSub.on('message', this.subscribeToCancellationEvents);
+                }
             }
         }
     }