Browse Source

feat(core,job-queue-plugin): Add per-queue concurrency configuration

Allow the `concurrency` option to be a function that receives the queue
name and returns the concurrency limit. This enables different queues
to have different concurrency limits based on their resource requirements.

Supported in:
- DefaultJobQueuePlugin (SQL-based strategy)
- BullMQJobQueuePlugin
- PubSubJobQueuePlugin

Example:
```ts
DefaultJobQueuePlugin.init({
  concurrency: (queueName) => {
    if (queueName === 'apply-collection-filters') {
      return 1;
    }
    return 3;
  },
})
```

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Will Nahmens 3 days ago
parent
commit
04b1aeae1e

+ 26 - 5
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -1,7 +1,7 @@
 import { JobState } from '@vendure/common/lib/generated-types';
 import { ID } from '@vendure/common/lib/shared-types';
 import { isObject } from '@vendure/common/lib/shared-utils';
-import { from, interval, mergeMap, race, Subject, Subscription } from 'rxjs';
+import { from, interval, race, Subject, Subscription } from 'rxjs';
 import { filter, switchMap, take, throttleTime } from 'rxjs/operators';
 
 import { Logger } from '../config/logger/vendure-logger';
@@ -26,9 +26,23 @@ export interface PollingJobQueueStrategyConfig {
      * @description
      * How many jobs from a given queue to process concurrently.
      *
+     * Can be set to a function which receives the queue name and returns
+     * the concurrency limit. This is useful for limiting concurrency on
+     * queues which have resource-intensive jobs.
+     *
+     * @example
+     * ```ts
+     * concurrency: (queueName) => {
+     *   if (queueName === 'apply-collection-filters') {
+     *     return 1;
+     *   }
+     *   return 3;
+     * }
+     * ```
+     *
      * @default 1
      */
-    concurrency?: number;
+    concurrency?: number | ((queueName: string) => number);
     /**
      * @description
      * The interval in ms between polling the database for new jobs.
@@ -76,6 +90,7 @@ class ActiveQueue<Data extends JobData<Data> = object> {
     private queueStopped$ = new Subject<typeof STOP_SIGNAL>();
     private subscription: Subscription;
     private readonly pollInterval: number;
+    private readonly concurrency: number;
 
     constructor(
         private readonly queueName: string,
@@ -86,6 +101,10 @@ class ActiveQueue<Data extends JobData<Data> = object> {
             typeof this.jobQueueStrategy.pollInterval === 'function'
                 ? this.jobQueueStrategy.pollInterval(queueName)
                 : this.jobQueueStrategy.pollInterval;
+        this.concurrency =
+            typeof this.jobQueueStrategy.concurrency === 'function'
+                ? this.jobQueueStrategy.concurrency(queueName)
+                : this.jobQueueStrategy.concurrency;
     }
 
     start() {
@@ -98,7 +117,7 @@ class ActiveQueue<Data extends JobData<Data> = object> {
         const runNextJobs = async () => {
             try {
                 const runningJobsCount = this.activeJobs.length;
-                for (let i = runningJobsCount; i < this.jobQueueStrategy.concurrency; i++) {
+                for (let i = runningJobsCount; i < this.concurrency; i++) {
                     const nextJob = await this.jobQueueStrategy.next(this.queueName);
                     if (nextJob) {
                         this.activeJobs.push(nextJob);
@@ -193,7 +212,9 @@ class ActiveQueue<Data extends JobData<Data> = object> {
 
                 if (timedOut) {
                     Logger.warn(
-                        `Timed out (${stopActiveQueueTimeout}ms) waiting for ${this.activeJobs.length} active jobs in queue "${this.queueName}" to complete. Forcing stop...`,
+                        `Timed out (${stopActiveQueueTimeout}ms) waiting for ` +
+                            `${this.activeJobs.length} active jobs in queue "${this.queueName}" ` +
+                            `to complete. Forcing stop...`,
                     );
                     this.queueStopped$.next(STOP_SIGNAL);
                     clearTimeout(timeout);
@@ -240,7 +261,7 @@ class ActiveQueue<Data extends JobData<Data> = object> {
  * @docsCategory JobQueue
  */
 export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
-    public concurrency: number;
+    public concurrency: number | ((queueName: string) => number);
     public pollInterval: number | ((queueName: string) => number);
     public setRetries: (queueName: string, job: Job) => number;
     public backOffStrategy?: BackoffStrategy;

+ 17 - 1
packages/core/src/plugin/default-job-queue-plugin/types.ts

@@ -23,9 +23,25 @@ export interface DefaultJobQueueOptions {
      * @description
      * How many jobs from a given queue to process concurrently.
      *
+     * Can be set to a function which receives the queue name and returns
+     * the concurrency limit. This is useful for limiting concurrency on
+     * queues which have resource-intensive jobs.
+     *
+     * @example
+     * ```ts
+     * DefaultJobQueuePlugin.init({
+     *   concurrency: (queueName) => {
+     *     if (queueName === 'apply-collection-filters') {
+     *       return 1;
+     *     }
+     *     return 3;
+     *   }
+     * })
+     * ```
+     *
      * @default 1
      */
-    concurrency?: number;
+    concurrency?: number | ((queueName: string) => number);
     /**
      * @description
      * The strategy used to decide how long to wait before retrying a failed job.

+ 78 - 30
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -55,6 +55,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private connectionOptions: ConnectionOptions;
     private queue: Queue;
     private worker: Worker;
+    private workers = new Map<number, Worker>();
     private workerProcessor: Processor;
     private options: BullMQPluginOptions;
     private jobListIndexService: JobListIndexService;
@@ -155,7 +156,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     async destroy() {
-        await Promise.all([this.queue.close(), this.worker?.close()]);
+        const workerClosePromises = Array.from(this.workers.values()).map(w => w.close());
+        await Promise.all([this.queue.close(), this.worker?.close(), ...workerClosePromises]);
     }
 
     async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
@@ -284,32 +286,71 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         process: (job: Job<Data>) => Promise<any>,
     ): Promise<void> {
         this.queueNameProcessFnMap.set(queueName, process);
-        if (!this.worker) {
-            const options: WorkerOptions = {
-                concurrency: DEFAULT_CONCURRENCY,
-                ...this.options.workerOptions,
-                connection: this.redisConnection,
-            };
-            this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
-                .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
-                .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
-                .on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx))
-                .on('failed', (job: Bull.Job | undefined, error) => {
-                    Logger.warn(
-                        `Job ${job?.id ?? '(unknown id)'} [${job?.name ?? 'unknown name'}] failed (attempt ${
-                            job?.attemptsMade ?? 'unknown'
-                        } of ${job?.opts.attempts ?? 1})`,
-                        loggerCtx,
-                    );
-                })
-                .on('stalled', (jobId: string) => {
-                    Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
-                })
-                .on('completed', (job: Bull.Job) => {
-                    Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx);
-                });
-            await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
-            this.cancellationSub.on('message', this.subscribeToCancellationEvents);
+
+        // If concurrency is a function, we use per-concurrency workers
+        if (typeof this.options.concurrency === 'function') {
+            const concurrency = this.options.concurrency(queueName);
+            if (!this.workers.has(concurrency)) {
+                const options: WorkerOptions = {
+                    concurrency,
+                    ...this.options.workerOptions,
+                    connection: this.redisConnection,
+                };
+                const worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
+                    .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
+                    .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
+                    .on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx))
+                    .on('failed', (job: Bull.Job | undefined, error) => {
+                        Logger.warn(
+                            `Job ${job?.id ?? '(unknown id)'} [${job?.name ?? 'unknown name'}] failed (attempt ${
+                                job?.attemptsMade ?? 'unknown'
+                            } of ${job?.opts.attempts ?? 1})`,
+                            loggerCtx,
+                        );
+                    })
+                    .on('stalled', (jobId: string) => {
+                        Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
+                    })
+                    .on('completed', (job: Bull.Job) => {
+                        Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx);
+                    });
+                this.workers.set(concurrency, worker);
+            }
+
+            // Subscribe to cancellation on first worker
+            if (this.workers.size === 1) {
+                await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
+                this.cancellationSub.on('message', this.subscribeToCancellationEvents);
+            }
+        } else {
+            // Original behavior: single worker with global concurrency
+            if (!this.worker) {
+                const options: WorkerOptions = {
+                    concurrency: this.options.concurrency ?? DEFAULT_CONCURRENCY,
+                    ...this.options.workerOptions,
+                    connection: this.redisConnection,
+                };
+                this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
+                    .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
+                    .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
+                    .on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx))
+                    .on('failed', (job: Bull.Job | undefined, error) => {
+                        Logger.warn(
+                            `Job ${job?.id ?? '(unknown id)'} [${job?.name ?? 'unknown name'}] failed (attempt ${
+                                job?.attemptsMade ?? 'unknown'
+                            } of ${job?.opts.attempts ?? 1})`,
+                            loggerCtx,
+                        );
+                    })
+                    .on('stalled', (jobId: string) => {
+                        Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
+                    })
+                    .on('completed', (job: Bull.Job) => {
+                        Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx);
+                    });
+                await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
+                this.cancellationSub.on('message', this.subscribeToCancellationEvents);
+            }
         }
     }
 
@@ -328,7 +369,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         if (!this.stopped) {
             this.stopped = true;
             try {
-                Logger.info(`Closing worker`, loggerCtx);
+                Logger.info(`Closing worker(s)`, loggerCtx);
 
                 let timer: NodeJS.Timeout;
                 const checkActive = async () => {
@@ -350,8 +391,15 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                     void checkActive();
                 }, 2000);
 
-                await this.worker.close();
-                Logger.info(`Worker closed`, loggerCtx);
+                // Close the single worker (if using number concurrency)
+                if (this.worker) {
+                    await this.worker.close();
+                }
+                // Close all workers (if using function concurrency)
+                for (const worker of this.workers.values()) {
+                    await worker.close();
+                }
+                Logger.info(`Worker(s) closed`, loggerCtx);
                 await this.queue.close();
                 clearTimeout(timer);
                 Logger.info(`Queue closed`, loggerCtx);

+ 24 - 2
packages/job-queue-plugin/src/bullmq/types.ts

@@ -1,6 +1,5 @@
 import { Job } from '@vendure/core';
-import { ConnectionOptions, WorkerOptions, Queue } from 'bullmq';
-import { QueueOptions } from 'bullmq';
+import { ConnectionOptions, Queue, QueueOptions, WorkerOptions } from 'bullmq';
 
 /**
  * @description
@@ -41,6 +40,29 @@ export interface BullMQPluginOptions {
      * See the [BullMQ WorkerOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.workeroptions.md)
      */
     workerOptions?: Omit<WorkerOptions, 'connection'>;
+    /**
+     * @description
+     * How many jobs from a given queue to process concurrently.
+     *
+     * Can be set to a function which receives the queue name and returns
+     * the concurrency limit. This is useful for limiting concurrency on
+     * queues which have resource-intensive jobs.
+     *
+     * @example
+     * ```ts
+     * BullMQJobQueuePlugin.init({
+     *   concurrency: (queueName) => {
+     *     if (queueName === 'apply-collection-filters') {
+     *       return 1;
+     *     }
+     *     return 5;
+     *   }
+     * })
+     * ```
+     *
+     * @default 3
+     */
+    concurrency?: number | ((queueName: string) => number);
     /**
      * @description
      * When a job is added to the JobQueue using `JobQueue.add()`, the calling

+ 20 - 1
packages/job-queue-plugin/src/pub-sub/options.ts

@@ -2,8 +2,27 @@ export interface PubSubOptions {
     /**
      * @description
      * Number of jobs that can be inflight at the same time.
+     *
+     * Can be set to a function which receives the queue name and returns
+     * the concurrency limit. This is useful for limiting concurrency on
+     * queues which have resource-intensive jobs.
+     *
+     * @example
+     * ```ts
+     * PubSubPlugin.init({
+     *   concurrency: (queueName) => {
+     *     if (queueName === 'apply-collection-filters') {
+     *       return 1;
+     *     }
+     *     return 20;
+     *   },
+     *   queueNamePubSubPair: new Map([...])
+     * })
+     * ```
+     *
+     * @default 20
      */
-    concurrency?: number;
+    concurrency?: number | ((queueName: string) => number);
     /**
      * @description
      * This is the mapping of Vendure queue names to PubSub Topics and Subscriptions

+ 9 - 2
packages/job-queue-plugin/src/pub-sub/pub-sub-job-queue-strategy.ts

@@ -27,7 +27,7 @@ import { PubSubOptions } from './options';
  * @docsCategory core plugins/JobQueuePlugin
  */
 export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implements JobQueueStrategy {
-    private concurrency: number;
+    private concurrency: number | ((queueName: string) => number);
     private queueNamePubSubPair: Map<string, [string, string]>;
     private pubSubClient: PubSub;
     private topics = new Map<string, Topic>();
@@ -43,6 +43,13 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
         super.init(injector);
     }
 
+    private getConcurrency(queueName: string): number {
+        if (typeof this.concurrency === 'function') {
+            return this.concurrency(queueName);
+        }
+        return this.concurrency;
+    }
+
     destroy() {
         super.destroy();
         for (const subscription of this.subscriptions.values()) {
@@ -162,7 +169,7 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
         const [topicName, subscriptionName] = pair;
         subscription = this.topic(queueName).subscription(subscriptionName, {
             flowControl: {
-                maxMessages: this.concurrency,
+                maxMessages: this.getConcurrency(queueName),
             },
         });
         this.subscriptions.set(queueName, subscription);