Browse Source

fix(job-queue-plugin): Respect custom prefix for BullMQ Redis keys

Michael Bromley 9 months ago
parent
commit
40f1cb8216
1 changed files with 17 additions and 25 deletions
  1. 17 25
      packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

+ 17 - 25
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -46,9 +46,9 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private worker: Worker;
     private workerProcessor: Processor;
     private options: BullMQPluginOptions;
-    private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
+    private readonly queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
     private cancellationSub: Redis;
-    private cancelRunningJob$ = new Subject<string>();
+    private readonly cancelRunningJob$ = new Subject<string>();
     private readonly CANCEL_JOB_CHANNEL = 'cancel-job';
     private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';
 
@@ -62,19 +62,12 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                     age: 60 * 60 * 24 * 30,
                     count: 5000,
                 },
-                removeOnFail: options.workerOptions?.removeOnFail ?? {
-                    age: 60 * 60 * 24 * 30,
-                    count: 5000,
-                },
+                removeOnFail: options.workerOptions?.removeOnFail ?? { age: 60 * 60 * 24 * 30, count: 5000 },
             },
         };
         this.connectionOptions =
             options.connection ??
-            ({
-                host: 'localhost',
-                port: 6379,
-                maxRetriesPerRequest: null,
-            } as RedisOptions);
+            ({ host: 'localhost', port: 6379, maxRetriesPerRequest: null } as RedisOptions);
 
         this.redisConnection =
             this.connectionOptions instanceof EventEmitter
@@ -92,10 +85,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             Logger.info('Connected to Redis ✔', loggerCtx);
         }
 
-        this.queue = new Queue(QUEUE_NAME, {
-            ...options.queueOptions,
-            connection: this.redisConnection,
-        })
+        this.queue = new Queue(QUEUE_NAME, { ...options.queueOptions, connection: this.redisConnection })
             .on('error', (e: any) =>
                 Logger.error(`BullMQ Queue error: ${JSON.stringify(e.message)}`, loggerCtx, e.stack),
             )
@@ -162,8 +152,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         };
         const customJobOptions = this.options.setJobOptions?.(job.queueName, job) ?? {};
         const bullJob = await this.queue.add(job.queueName, job.data, {
-            attempts: retries + 1,
-            backoff,
+            attempts: typeof retries === 'number' ? retries + 1 : 1,
+            backoff: typeof backoff === 'number' || 'type' in backoff ? backoff : undefined,
             ...customJobOptions,
         });
         return this.createVendureJob(bullJob);
@@ -246,10 +236,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             throw new InternalServerError(e.message);
         }
 
-        return {
-            items: await Promise.all(items.map(bullJob => this.createVendureJob(bullJob))),
-            totalItems,
-        };
+        return { items: await Promise.all(items.map(bullJob => this.createVendureJob(bullJob))), totalItems };
     }
 
     async findManyById(ids: ID[]): Promise<Job[]> {
@@ -312,7 +299,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
     }
 
-    private subscribeToCancellationEvents = (channel: string, jobId: string) => {
+    private readonly subscribeToCancellationEvents = (channel: string, jobId: string) => {
         if (channel === this.CANCEL_JOB_CHANNEL && jobId) {
             this.cancelRunningJob$.next(jobId);
         }
@@ -340,10 +327,14 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                             } (${activeJobs.map(j => j.id).join(', ')})...`,
                             loggerCtx,
                         );
-                        timer = setTimeout(checkActive, 2000);
+                        timer = setTimeout(() => {
+                            void checkActive();
+                        }, 2000);
                     }
                 };
-                timer = setTimeout(checkActive, 2000);
+                timer = setTimeout(() => {
+                    void checkActive();
+                }, 2000);
 
                 await this.worker.close();
                 Logger.info(`Worker closed`, loggerCtx);
@@ -413,8 +404,9 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         args: Args,
     ): Promise<T> {
         return new Promise<T>((resolve, reject) => {
+            const prefix = this.options.workerOptions?.prefix ?? 'bull';
             (this.redisConnection as any)[scriptDef.name](
-                `bull:${this.queue.name}:`,
+                `${prefix}:${this.queue.name}:`,
                 ...args,
                 (err: any, result: any) => {
                     if (err) {