Browse Source

fix(job-queue-plugin): Partially fix BullMQ shutdown error

Relates to #1649. The first part of the error is fixed ('close' of undefined)
but the other part (connection is closed) is proving more difficult to resolve,
and seems to be related to an issue inside the BullMQ library itself.
Michael Bromley 3 years ago
parent
commit
3835f8b22c
1 changed files with 46 additions and 14 deletions
  1. 46 14
      packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

+ 46 - 14
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -10,7 +10,9 @@ import {
     Logger,
     Logger,
     PaginatedList,
     PaginatedList,
 } from '@vendure/core';
 } from '@vendure/core';
-import Bull, { Processor, Queue, QueueScheduler, Worker, WorkerOptions } from 'bullmq';
+import Bull, { ConnectionOptions, Processor, Queue, QueueScheduler, Worker, WorkerOptions } from 'bullmq';
+import { EventEmitter } from 'events';
+import Redis, { RedisOptions } from 'ioredis';
 
 
 import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
 import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
 import { RedisHealthIndicator } from './redis-health-indicator';
 import { RedisHealthIndicator } from './redis-health-indicator';
@@ -27,6 +29,8 @@ const DEFAULT_CONCURRENCY = 3;
  * @docsCategory job-queue-plugin
  * @docsCategory job-queue-plugin
  */
  */
 export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
+    private redisConnection: Redis.Redis | Redis.Cluster;
+    private connectionOptions: ConnectionOptions;
     private queue: Queue;
     private queue: Queue;
     private worker: Worker;
     private worker: Worker;
     private scheduler: QueueScheduler;
     private scheduler: QueueScheduler;
@@ -37,6 +41,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     async init(injector: Injector): Promise<void> {
     async init(injector: Injector): Promise<void> {
         const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
         const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
         this.options = options;
         this.options = options;
+        this.connectionOptions =
+            options.connection ??
+            ({
+                host: 'localhost',
+                port: 6379,
+                maxRetriesPerRequest: null,
+            } as RedisOptions);
+
+        this.redisConnection =
+            this.connectionOptions instanceof EventEmitter
+                ? this.connectionOptions
+                : new Redis(this.connectionOptions);
 
 
         const redisHealthIndicator = injector.get(RedisHealthIndicator);
         const redisHealthIndicator = injector.get(RedisHealthIndicator);
         Logger.info(`Checking Redis connection...`, loggerCtx);
         Logger.info(`Checking Redis connection...`, loggerCtx);
@@ -49,8 +65,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
 
         this.queue = new Queue(QUEUE_NAME, {
         this.queue = new Queue(QUEUE_NAME, {
             ...options.queueOptions,
             ...options.queueOptions,
-            connection: options.connection,
-        }).on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack));
+            connection: this.redisConnection,
+        })
+            .on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack))
+            .on('resumed', () => Logger.verbose(`BullMQ Queue resumed`, loggerCtx))
+            .on('paused', () => Logger.verbose(`BullMQ Queue paused`, loggerCtx));
 
 
         if (await this.queue.isPaused()) {
         if (await this.queue.isPaused()) {
             await this.queue.resume();
             await this.queue.resume();
@@ -80,12 +99,15 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
 
         this.scheduler = new QueueScheduler(QUEUE_NAME, {
         this.scheduler = new QueueScheduler(QUEUE_NAME, {
             ...options.schedulerOptions,
             ...options.schedulerOptions,
-            connection: options.connection,
-        }).on('error', (e: any) => Logger.error(`BullMQ Scheduler error: ${e.message}`, loggerCtx, e.stack));
+            connection: this.redisConnection,
+        })
+            .on('error', (e: any) => Logger.error(`BullMQ Scheduler error: ${e.message}`, loggerCtx, e.stack))
+            .on('stalled', jobId => Logger.warn(`BullMQ Scheduler stalled on job ${jobId}`, loggerCtx))
+            .on('failed', jobId => Logger.warn(`BullMQ Scheduler failed on job ${jobId}`, loggerCtx));
     }
     }
 
 
     async destroy() {
     async destroy() {
-        await Promise.all([this.queue.close(), this.worker.close(), this.scheduler.close()]);
+        await Promise.all([this.queue.close(), this.worker?.close(), this.scheduler.close()]);
     }
     }
 
 
     async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
     async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
@@ -211,13 +233,13 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             const options: WorkerOptions = {
             const options: WorkerOptions = {
                 concurrency: DEFAULT_CONCURRENCY,
                 concurrency: DEFAULT_CONCURRENCY,
                 ...this.options.workerOptions,
                 ...this.options.workerOptions,
-                connection: this.options.connection,
+                connection: this.redisConnection,
             };
             };
             this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
             this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
-                .on('error', (e: any) =>
-                    Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack),
-                )
-                .on('failed', (job: Bull.Job, failedReason: string) => {
+                .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
+                .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
+                .on('closed', () => Logger.verbose(`BullMQ Worker closed`))
+                .on('failed', (job: Bull.Job, failedReason) => {
                     Logger.warn(
                     Logger.warn(
                         `Job ${job.id} [${job.name}] failed (attempt ${job.attemptsMade} of ${
                         `Job ${job.id} [${job.name}] failed (attempt ${job.attemptsMade} of ${
                             job.opts.attempts ?? 1
                             job.opts.attempts ?? 1
@@ -230,13 +252,23 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
         }
     }
     }
 
 
+    private stopped = false;
     async stop<Data extends JobData<Data> = {}>(
     async stop<Data extends JobData<Data> = {}>(
         queueName: string,
         queueName: string,
         process: (job: Job<Data>) => Promise<any>,
         process: (job: Job<Data>) => Promise<any>,
     ): Promise<void> {
     ): Promise<void> {
-        await this.scheduler.disconnect();
-        await this.queue.disconnect();
-        await this.worker.disconnect();
+        if (!this.stopped) {
+            this.stopped = true;
+            try {
+                await Promise.all([
+                    this.scheduler.disconnect(),
+                    this.queue.disconnect(),
+                    this.worker.disconnect(),
+                ]);
+            } catch (e: any) {
+                Logger.error(e, loggerCtx, e.stack);
+            }
+        }
     }
     }
 
 
     private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
     private async createVendureJob(bullJob: Bull.Job): Promise<Job> {