|
|
@@ -10,7 +10,9 @@ import {
|
|
|
Logger,
|
|
|
PaginatedList,
|
|
|
} from '@vendure/core';
|
|
|
-import Bull, { JobType, Processor, Queue, QueueScheduler, Worker, WorkerOptions } from 'bullmq';
|
|
|
+import Bull, { JobType, ConnectionOptions, Processor, Queue, QueueScheduler, Worker, WorkerOptions } from 'bullmq';
|
|
|
+import { EventEmitter } from 'events';
|
|
|
+import Redis, { RedisOptions } from 'ioredis';
|
|
|
|
|
|
import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
|
|
|
import { RedisHealthIndicator } from './redis-health-indicator';
|
|
|
@@ -27,6 +29,8 @@ const DEFAULT_CONCURRENCY = 3;
|
|
|
* @docsCategory job-queue-plugin
|
|
|
*/
|
|
|
export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
+ private redisConnection: Redis.Redis | Redis.Cluster;
|
|
|
+ private connectionOptions: ConnectionOptions;
|
|
|
private queue: Queue;
|
|
|
private worker: Worker;
|
|
|
private scheduler: QueueScheduler;
|
|
|
@@ -37,6 +41,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
async init(injector: Injector): Promise<void> {
|
|
|
const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
|
|
|
this.options = options;
|
|
|
+ this.connectionOptions =
|
|
|
+ options.connection ??
|
|
|
+ ({
|
|
|
+ host: 'localhost',
|
|
|
+ port: 6379,
|
|
|
+ maxRetriesPerRequest: null,
|
|
|
+ } as RedisOptions);
|
|
|
+
|
|
|
+ this.redisConnection =
|
|
|
+ this.connectionOptions instanceof EventEmitter
|
|
|
+ ? this.connectionOptions
|
|
|
+ : new Redis(this.connectionOptions);
|
|
|
|
|
|
const redisHealthIndicator = injector.get(RedisHealthIndicator);
|
|
|
Logger.info(`Checking Redis connection...`, loggerCtx);
|
|
|
@@ -49,8 +65,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
|
|
|
this.queue = new Queue(QUEUE_NAME, {
|
|
|
...options.queueOptions,
|
|
|
- connection: options.connection ?? {},
|
|
|
- }).on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack));
|
|
|
+ connection: this.redisConnection,
|
|
|
+ })
|
|
|
+ .on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack))
|
|
|
+ .on('resumed', () => Logger.verbose(`BullMQ Queue resumed`, loggerCtx))
|
|
|
+ .on('paused', () => Logger.verbose(`BullMQ Queue paused`, loggerCtx));
|
|
|
|
|
|
if (await this.queue.isPaused()) {
|
|
|
await this.queue.resume();
|
|
|
@@ -80,12 +99,15 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
|
|
|
this.scheduler = new QueueScheduler(QUEUE_NAME, {
|
|
|
...options.schedulerOptions,
|
|
|
- connection: options.connection ?? {},
|
|
|
- }).on('failed', (e: any) => Logger.error(`BullMQ Scheduler error: ${e.message}`, loggerCtx, e.stack));
|
|
|
+ connection: this.redisConnection,
|
|
|
+ })
|
|
|
+ .on('error', (e: any) => Logger.error(`BullMQ Scheduler error: ${e.message}`, loggerCtx, e.stack))
|
|
|
+ .on('stalled', jobId => Logger.warn(`BullMQ Scheduler stalled on job ${jobId}`, loggerCtx))
|
|
|
+ .on('failed', jobId => Logger.warn(`BullMQ Scheduler failed on job ${jobId}`, loggerCtx));
|
|
|
}
|
|
|
|
|
|
async destroy() {
|
|
|
- await Promise.all([this.queue.close(), this.worker.close(), this.scheduler.close()]);
|
|
|
+ await Promise.all([this.queue.close(), this.worker?.close(), this.scheduler.close()]);
|
|
|
}
|
|
|
|
|
|
async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
|
|
|
@@ -211,13 +233,13 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
const options: WorkerOptions = {
|
|
|
concurrency: DEFAULT_CONCURRENCY,
|
|
|
...this.options.workerOptions,
|
|
|
- connection: this.options.connection ?? {},
|
|
|
+ connection: this.redisConnection,
|
|
|
};
|
|
|
this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
|
|
|
- .on('error', (e: any) =>
|
|
|
- Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack),
|
|
|
- )
|
|
|
- .on('failed', (job: Bull.Job, error: Error) => {
|
|
|
+ .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
|
|
|
+ .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
|
|
|
+ .on('closed', () => Logger.verbose(`BullMQ Worker closed`))
|
|
|
+ .on('failed', (job: Bull.Job, failedReason) => {
|
|
|
Logger.warn(
|
|
|
`Job ${job.id} [${job.name}] failed (attempt ${job.attemptsMade} of ${
|
|
|
job.opts.attempts ?? 1
|
|
|
@@ -230,13 +252,23 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private stopped = false;
|
|
|
async stop<Data extends JobData<Data> = {}>(
|
|
|
queueName: string,
|
|
|
process: (job: Job<Data>) => Promise<any>,
|
|
|
): Promise<void> {
|
|
|
- await this.scheduler.disconnect();
|
|
|
- await this.queue.disconnect();
|
|
|
- await this.worker.disconnect();
|
|
|
+ if (!this.stopped) {
|
|
|
+ this.stopped = true;
|
|
|
+ try {
|
|
|
+ await Promise.all([
|
|
|
+ this.scheduler.disconnect(),
|
|
|
+ this.queue.disconnect(),
|
|
|
+ this.worker.disconnect(),
|
|
|
+ ]);
|
|
|
+ } catch (e: any) {
|
|
|
+ Logger.error(e, loggerCtx, e.stack);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
|