Browse Source

refactor(job-queue-plugin): Use multiple BullMQ queues instead of one

David Höck 1 year ago
parent
commit
664e22d34c

+ 11 - 1
packages/core/src/job-queue/job-queue.service.ts

@@ -54,7 +54,10 @@ export class JobQueueService implements OnModuleDestroy {
         return this.configService.jobQueueOptions.jobQueueStrategy;
     }
 
-    constructor(private configService: ConfigService, private jobBufferService: JobBufferService) {}
+    constructor(
+        private configService: ConfigService,
+        private jobBufferService: JobBufferService,
+    ) {}
 
     /** @internal */
     onModuleDestroy() {
@@ -154,6 +157,13 @@ export class JobQueueService implements OnModuleDestroy {
         return this.jobBufferService.flush(forBuffers);
     }
 
+    /**
+     * @description Returns the raw objects representing the JobQueues.
+     */
+    getRawJobQueues() {
+        return this.queues;
+    }
+
     /**
      * @description
      * Returns an array of `{ name: string; running: boolean; }` for each

+ 169 - 45
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -7,6 +7,8 @@ import {
     InternalServerError,
     Job,
     JobData,
+    JobQueue,
+    JobQueueService,
     Logger,
     PaginatedList,
 } from '@vendure/core';
@@ -42,8 +44,8 @@ const DEFAULT_CONCURRENCY = 3;
 export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private redisConnection: Redis | Cluster;
     private connectionOptions: ConnectionOptions;
-    private queue: Queue;
-    private worker: Worker;
+    private queues: Map<string, Queue> = new Map();
+    private workers: Map<string, Worker> = new Map();
     private workerProcessor: Processor;
     private options: BullMQPluginOptions;
     private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
@@ -51,9 +53,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private cancelRunningJob$ = new Subject<string>();
     private readonly CANCEL_JOB_CHANNEL = 'cancel-job';
     private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';
+    private jobQueueService: JobQueueService;
 
     async init(injector: Injector): Promise<void> {
         const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
+        this.jobQueueService = injector.get(JobQueueService);
         this.options = {
             ...options,
             workerOptions: {
@@ -91,19 +95,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             Logger.info('Connected to Redis ✔', loggerCtx);
         }
 
-        this.queue = new Queue(QUEUE_NAME, {
-            ...options.queueOptions,
-            connection: this.redisConnection,
-        })
-            .on('error', (e: any) =>
-                Logger.error(`BullMQ Queue error: ${JSON.stringify(e.message)}`, loggerCtx, e.stack),
-            )
-            .on('resumed', () => Logger.verbose('BullMQ Queue resumed', loggerCtx))
-            .on('paused', () => Logger.verbose('BullMQ Queue paused', loggerCtx));
-
-        if (await this.queue.isPaused()) {
-            await this.queue.resume();
-        }
+        await this.setupQueues();
 
         this.workerProcessor = async bullJob => {
             const queueName = bullJob.name;
@@ -145,12 +137,41 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             }
             throw new InternalServerError(`No processor defined for the queue "${queueName}"`);
         };
+
         // Subscription-mode Redis connection for the cancellation messages
         this.cancellationSub = new Redis(this.connectionOptions as RedisOptions);
     }
 
+    getBullQueueName(queue: JobQueue | string) {
+        return `vendure-queue-${typeof queue === 'string' ? queue : queue.name}`;
+    }
+
+    async setupQueues() {
+        const queues = this.jobQueueService.getRawJobQueues();
+
+        for (const queue of queues) {
+            const bullQueueName = this.getBullQueueName(queue);
+
+            const bullQueue = new Queue(bullQueueName, {
+                ...this.options.queueOptions,
+                connection: this.redisConnection,
+            })
+                .on('error', (e: any) =>
+                    Logger.error(`BullMQ Queue error: ${JSON.stringify(e.message)}`, loggerCtx, e.stack),
+                )
+                .on('resumed', () => Logger.verbose('BullMQ Queue resumed', loggerCtx))
+                .on('paused', () => Logger.verbose('BullMQ Queue paused', loggerCtx));
+
+            if (await bullQueue.isPaused()) {
+                await bullQueue.resume();
+            }
+
+            this.queues.set(bullQueueName, bullQueue);
+        }
+    }
+
     async destroy() {
-        await Promise.all([this.queue.close(), this.worker?.close()]);
+        await Promise.all([this.closeAllQueues(), this.closeAllWorkers()]);
     }
 
     async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
@@ -159,7 +180,15 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             delay: 1000,
             type: 'exponential',
         };
-        const bullJob = await this.queue.add(job.queueName, job.data, {
+
+        const bullQueueName = this.getBullQueueName(job.queueName);
+        const bullQueue = this.queues.get(bullQueueName);
+
+        if (!bullQueue) {
+            throw new InternalServerError(`Queue ${bullQueueName} not found. Could not add new job`);
+        }
+
+        const bullJob = await bullQueue.add(job.queueName, job.data, {
             attempts: retries + 1,
             backoff,
         });
@@ -167,7 +196,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     async cancelJob(jobId: string): Promise<Job | undefined> {
-        const bullJob = await this.queue.getJob(jobId);
+        const bullJob = await this.findOneBullJob(jobId);
         if (bullJob) {
             if (await bullJob.isActive()) {
                 await this.setActiveJobAsCancelled(jobId);
@@ -215,15 +244,16 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
         const settledFilter = options?.filter?.isSettled;
         if (settledFilter?.eq != null) {
-            jobTypes =
-                settledFilter.eq === true
-                    ? ['completed', 'failed']
-                    : ['wait', 'waiting-children', 'active', 'repeat', 'delayed', 'paused'];
+            jobTypes = settledFilter.eq
+                ? ['completed', 'failed']
+                : ['wait', 'waiting-children', 'active', 'repeat', 'delayed', 'paused'];
         }
 
         let items: Bull.Job[] = [];
         let totalItems = 0;
 
+        // TODO: pagination with separated queues is not possible, because there is no getter for jobs of all queues
+
         try {
             const [total, jobIds] = await this.callCustomScript(getJobsByType, [
                 skip,
@@ -250,43 +280,113 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     async findManyById(ids: ID[]): Promise<Job[]> {
-        const bullJobs = await Promise.all(ids.map(id => this.queue.getJob(id.toString())));
-        return Promise.all(bullJobs.filter(notNullOrUndefined).map(j => this.createVendureJob(j)));
+        let bullJobs: Bull.Job[] = [];
+
+        for (const queue of this.queues.values()) {
+            const jobs = await Promise.all(ids.map(id => queue.getJob(id.toString())));
+            bullJobs = bullJobs.concat(jobs.filter(notNullOrUndefined));
+        }
+
+        // TODO: fix this type assertion
+        return bullJobs as unknown as Job[];
     }
 
     async findOne(id: ID): Promise<Job | undefined> {
-        const bullJob = await this.queue.getJob(id.toString());
+        const bullJob = await this.findOneBullJob(id);
+
         if (bullJob) {
             return this.createVendureJob(bullJob);
         }
+
+        Logger.info(`Job with id ${id} not found`, loggerCtx);
+    }
+
+    private async findOneBullJob(id: ID) {
+        let bullJob: Bull.Job | undefined;
+
+        for (const queue of this.queues.values()) {
+            bullJob = await queue.getJob(id.toString());
+            if (bullJob) {
+                break;
+            }
+        }
+
+        return bullJob;
     }
 
-    // TODO V2: actually make it use the olderThan parameter
     async removeSettledJobs(queueNames?: string[], olderThan?: Date): Promise<number> {
+        const queuesToProcess = this.getAllQueues(queueNames);
+
+        const defaultGracePeriod = 100;
+        const gracePeriod = olderThan
+            ? this.calculateGracePeriod(olderThan, defaultGracePeriod)
+            : defaultGracePeriod;
+
         try {
-            const jobCounts = await this.queue.getJobCounts('completed', 'failed');
-            await this.queue.clean(100, 0, 'completed');
-            await this.queue.clean(100, 0, 'failed');
-            return Object.values(jobCounts).reduce((sum, num) => sum + num, 0);
+            let totalRemoved = 0;
+            for (const [bullQueueName, queue] of queuesToProcess) {
+                const jobCounts = await queue.getJobCounts('completed', 'failed');
+
+                await queue.clean(gracePeriod, 0, 'completed');
+                await queue.clean(gracePeriod, 0, 'failed');
+
+                totalRemoved += Object.values(jobCounts).reduce((sum, num) => sum + num, 0);
+            }
+
+            return totalRemoved;
         } catch (e: any) {
             Logger.error(e.message, loggerCtx, e.stack);
             return 0;
         }
     }
 
+    private calculateGracePeriod(olderThan: Date, fallback: number) {
+        const currentTime = new Date().getTime(); // Get the current date and time
+        const gracePeriod = currentTime - olderThan.getTime(); // Calculate the difference in milliseconds
+
+        if (gracePeriod < 0) {
+            return fallback;
+        }
+
+        return gracePeriod;
+    }
+
+    private getAllQueues(queueNames?: string[]) {
+        // TODO: check if there is a better way to do that
+        if (queueNames && queueNames.length > 0) {
+            const queues: Map<string, Queue> = new Map();
+            for (const queueName of queueNames) {
+                const queue = this.queues.get(this.getBullQueueName(queueName));
+                if (queue) {
+                    queues.set(queueName, queue);
+                }
+            }
+
+            return queues;
+        }
+
+        return this.queues;
+    }
+
     // eslint-disable-next-line @typescript-eslint/require-await
     async start<Data extends JobData<Data> = object>(
         queueName: string,
         process: (job: Job<Data>) => Promise<any>,
     ): Promise<void> {
+        const bullQueueName = this.getBullQueueName(queueName);
+
         this.queueNameProcessFnMap.set(queueName, process);
-        if (!this.worker) {
+
+        const worker = this.workers.get(bullQueueName);
+
+        if (!worker) {
             const options: WorkerOptions = {
                 concurrency: DEFAULT_CONCURRENCY,
                 ...this.options.workerOptions,
                 connection: this.redisConnection,
             };
-            this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
+
+            const newWorker = new Worker(bullQueueName, this.workerProcessor, options)
                 .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
                 .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
                 .on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx))
@@ -306,6 +406,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                 });
             await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
             this.cancellationSub.on('message', this.subscribeToCancellationEvents);
+
+            this.workers.set(bullQueueName, newWorker);
         }
     }
 
@@ -328,23 +430,25 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
                 let timer: NodeJS.Timeout;
                 const checkActive = async () => {
-                    const activeCount = await this.queue.getActiveCount();
-                    if (0 < activeCount) {
-                        const activeJobs = await this.queue.getActive();
-                        Logger.info(
-                            `Waiting on ${activeCount} active ${
-                                activeCount > 1 ? 'jobs' : 'job'
-                            } (${activeJobs.map(j => j.id).join(', ')})...`,
-                            loggerCtx,
-                        );
-                        timer = setTimeout(checkActive, 2000);
+                    for (const queue of this.queues.values()) {
+                        const activeCount = await queue.getActiveCount();
+                        if (0 < activeCount) {
+                            const activeJobs = await queue.getActive();
+                            Logger.info(
+                                `Waiting on ${activeCount} active ${
+                                    activeCount > 1 ? 'jobs' : 'job'
+                                } (${activeJobs.map(j => j.id).join(', ')})...`,
+                                loggerCtx,
+                            );
+                            timer = setTimeout(checkActive, 2000);
+                        }
                     }
                 };
                 timer = setTimeout(checkActive, 2000);
 
-                await this.worker.close();
+                await this.closeAllWorkers();
                 Logger.info(`Worker closed`, loggerCtx);
-                await this.queue.close();
+                await this.closeAllQueues();
                 clearTimeout(timer);
                 Logger.info(`Queue closed`, loggerCtx);
                 this.cancellationSub.off('message', this.subscribeToCancellationEvents);
@@ -354,6 +458,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
     }
 
+    private async closeAllWorkers() {
+        for (const worker of this.workers.values()) {
+            await worker.close();
+        }
+    }
+
+    private async closeAllQueues() {
+        for (const queue of this.queues.values()) {
+            await queue.close();
+        }
+    }
+
     private async setActiveJobAsCancelled(jobId: string) {
         // Not yet possible natively in BullMQ, see
         // https://github.com/taskforcesh/bullmq/issues/632
@@ -409,12 +525,20 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     private callCustomScript<T, Args extends any[]>(
+        queueName: string,
         scriptDef: CustomScriptDefinition<T, Args>,
         args: Args,
     ): Promise<T> {
+        const bullQueueName = this.getBullQueueName(queueName);
+        const bullQueue = this.queues.get(bullQueueName);
+
+        if (!bullQueue) {
+            throw new InternalServerError(`Queue ${bullQueueName} not found. Could not call custom script`);
+        }
+
         return new Promise<T>((resolve, reject) => {
             (this.redisConnection as any)[scriptDef.name](
-                `bull:${this.queue.name}:`,
+                `bull:${bullQueue.name}:`,
                 ...args,
                 (err: any, result: any) => {
                     if (err) {