|
@@ -55,6 +55,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
private connectionOptions: ConnectionOptions;
|
|
private connectionOptions: ConnectionOptions;
|
|
|
private queue: Queue;
|
|
private queue: Queue;
|
|
|
private worker: Worker;
|
|
private worker: Worker;
|
|
|
|
|
+ /**
|
|
|
|
|
+ * When using function-based concurrency, workers are grouped by their concurrency value.
|
|
|
|
|
+ * Key: concurrency number, Value: Worker instance.
|
|
|
|
|
+ * Multiple Vendure queues with the same concurrency share a single worker.
|
|
|
|
|
+ */
|
|
|
private workers = new Map<number, Worker>();
|
|
private workers = new Map<number, Worker>();
|
|
|
private workerProcessor: Processor;
|
|
private workerProcessor: Processor;
|
|
|
private options: BullMQPluginOptions;
|
|
private options: BullMQPluginOptions;
|
|
@@ -287,7 +292,13 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
): Promise<void> {
|
|
): Promise<void> {
|
|
|
this.queueNameProcessFnMap.set(queueName, process);
|
|
this.queueNameProcessFnMap.set(queueName, process);
|
|
|
|
|
|
|
|
- // If concurrency is a function, we use per-concurrency workers
|
|
|
|
|
|
|
+ // If concurrency is a function, we create workers grouped by concurrency value.
|
|
|
|
|
+ // Note: Workers are stored in `this.workers` keyed by concurrency number, not queue name.
|
|
|
|
|
+ // All Vendure job types share a single BullMQ queue (`QUEUE_NAME`), so any worker can
|
|
|
|
|
+ // process any job type. This means multiple Vendure queues returning the same concurrency
|
|
|
|
|
+ // will share a worker, and the concurrency limit applies to total jobs processed by that
|
|
|
|
|
+ // worker—not strictly per Vendure queue. For strict per-queue isolation, use BullMQ Pro
|
|
|
|
|
+ // Groups or create separate BullMQ queues per Vendure queue.
|
|
|
if (typeof this.options.concurrency === 'function') {
|
|
if (typeof this.options.concurrency === 'function') {
|
|
|
const concurrency = this.options.concurrency(queueName);
|
|
const concurrency = this.options.concurrency(queueName);
|
|
|
if (!this.workers.has(concurrency)) {
|
|
if (!this.workers.has(concurrency)) {
|
|
@@ -317,7 +328,9 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
|
|
|
this.workers.set(concurrency, worker);
|
|
this.workers.set(concurrency, worker);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Subscribe to cancellation on first worker
|
|
|
|
|
|
|
+ // Subscribe to cancellation events once (on first worker creation).
|
|
|
|
|
+ // The `subscribeToCancellationEvents` handler broadcasts to all workers
|
|
|
|
|
+ // via `cancelRunningJob$` since any worker may be processing the job.
|
|
|
if (this.workers.size === 1) {
|
|
if (this.workers.size === 1) {
|
|
|
await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
|
|
await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
|
|
|
this.cancellationSub.on('message', this.subscribeToCancellationEvents);
|
|
this.cancellationSub.on('message', this.subscribeToCancellationEvents);
|