|
|
@@ -15,7 +15,6 @@ import Bull, {
|
|
|
JobType,
|
|
|
Processor,
|
|
|
Queue,
|
|
|
- QueueScheduler,
|
|
|
Worker,
|
|
|
WorkerOptions,
|
|
|
} from 'bullmq';
|
|
|
@@ -41,7 +40,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
private connectionOptions: ConnectionOptions;
|
|
|
private queue: Queue;
|
|
|
private worker: Worker;
|
|
|
- private scheduler: QueueScheduler;
|
|
|
private workerProcessor: Processor;
|
|
|
private options: BullMQPluginOptions;
|
|
|
private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
|
|
|
@@ -106,20 +104,10 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
}
|
|
|
throw new InternalServerError(`No processor defined for the queue "${queueName}"`);
|
|
|
};
|
|
|
-
|
|
|
- this.scheduler = new QueueScheduler(QUEUE_NAME, {
|
|
|
- ...options.schedulerOptions,
|
|
|
- connection: this.redisConnection,
|
|
|
- })
|
|
|
- .on('error', (e: any) =>
|
|
|
- Logger.error(`BullMQ Scheduler error: ${JSON.stringify(e.message)}`, loggerCtx, e.stack),
|
|
|
- )
|
|
|
- .on('stalled', jobId => Logger.warn(`BullMQ Scheduler stalled on job ${jobId}`, loggerCtx))
|
|
|
- .on('failed', jobId => Logger.warn(`BullMQ Scheduler failed on job ${jobId}`, loggerCtx));
|
|
|
}
|
|
|
|
|
|
async destroy() {
|
|
|
- await Promise.all([this.queue.close(), this.worker?.close(), this.scheduler.close()]);
|
|
|
+ await Promise.all([this.queue.close(), this.worker?.close()]);
|
|
|
}
|
|
|
|
|
|
async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
|
|
|
@@ -250,16 +238,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
|
|
|
.on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
|
|
|
.on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
|
|
|
- .on('closed', () => Logger.verbose('BullMQ Worker closed'))
|
|
|
- .on('failed', (job: Bull.Job, failedReason) => {
|
|
|
+ .on('closed', () => Logger.verbose(`BullMQ Worker closed`))
|
|
|
+ .on('failed', (job: Bull.Job | undefined, error) => {
|
|
|
Logger.warn(
|
|
|
- `Job ${job.id ?? ''} [${job.name}] failed (attempt ${job.attemptsMade} of ${
|
|
|
- job.opts.attempts ?? 1
|
|
|
+ `Job ${job?.id} [${job?.name}] failed (attempt ${job?.attemptsMade} of ${
|
|
|
+ job?.opts.attempts ?? 1
|
|
|
})`,
|
|
|
);
|
|
|
})
|
|
|
- .on('completed', (job: Bull.Job, failedReason: string) => {
|
|
|
- Logger.debug(`Job ${job.id ?? ''} [${job.name}] completed`);
|
|
|
+ .on('stalled', (jobId: string) => {
|
|
|
+ Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
|
|
|
+ })
|
|
|
+ .on('completed', (job: Bull.Job) => {
|
|
|
+ Logger.debug(`Job ${job.id} [${job.name}] completed`);
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
@@ -273,7 +264,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
this.stopped = true;
|
|
|
try {
|
|
|
await Promise.all([
|
|
|
- this.scheduler.disconnect(),
|
|
|
this.queue.disconnect(),
|
|
|
this.worker.disconnect(),
|
|
|
]);
|