Bladeren bron

fix(job-queue-plugin): Add missing logging & backoff settings

Michael Bromley 4 jaren geleden
bovenliggende
commit
6f7cc34acd
1 gewijzigde bestanden met toevoegingen van 24 en 5 verwijderingen
  1. 24 5
      packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

+ 24 - 5
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -51,7 +51,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             ...options.queueOptions,
             connection: options.connection,
         }).on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack));
-        const client = await this.queue.client;
 
         if (await this.queue.isPaused()) {
             await this.queue.resume();
@@ -59,6 +58,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
         this.workerProcessor = async bullJob => {
             const queueName = bullJob.name;
+            Logger.debug(
+                `Job ${bullJob.id} [${queueName}] starting (attempt ${bullJob.attemptsMade + 1} of ${
+                    bullJob.opts.attempts ?? 1
+                })`,
+            );
             const processFn = this.queueNameProcessFnMap.get(queueName);
             if (processFn) {
                 const job = this.createVendureJob(bullJob);
@@ -82,7 +86,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
     async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
         const bullJob = await this.queue.add(job.queueName, job.data, {
-            attempts: job.retries,
+            attempts: job.retries + 1,
+            backoff: {
+                delay: 1000,
+                type: 'exponential',
+            },
         });
         return this.createVendureJob(bullJob);
     }
@@ -187,9 +195,20 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                 ...this.options.workerOptions,
                 connection: this.options.connection,
             };
-            this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options).on('error', (e: any) =>
-                Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack),
-            );
+            this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
+                .on('error', (e: any) =>
+                    Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack),
+                )
+                .on('failed', (job: Bull.Job, failedReason: string) => {
+                    Logger.warn(
+                        `Job ${job.id} [${job.name}] failed (attempt ${job.attemptsMade} of ${
+                            job.opts.attempts ?? 1
+                        })`,
+                    );
+                })
+                .on('completed', (job: Bull.Job, failedReason: string) => {
+                    Logger.debug(`Job ${job.id} [${job.name}] completed`);
+                });
         }
     }