Prechádzať zdrojové kódy

test(job-queue-plugin): Add e2e tests for BullMQJobQueueStrategy

Michael Bromley 1 rok pred
rodič
commit
254b025b97

+ 8 - 3
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -1,4 +1,4 @@
-import { Controller, Get, OnModuleInit } from '@nestjs/common';
+import { Controller, Get, OnModuleInit, Query } from '@nestjs/common';
 import { JobQueue, JobQueueService, PluginCommonModule, VendurePlugin } from '@vendure/core';
 import { Subject } from 'rxjs';
 import { take } from 'rxjs/operators';
@@ -30,8 +30,13 @@ class TestController implements OnModuleInit {
     }
 
     @Get()
-    async runJob() {
-        await this.queue.add({});
+    async runJob(@Query('retries') retries?: string) {
+        await this.queue.add(
+            {},
+            {
+                retries: retries ? parseInt(retries, 10) : undefined,
+            },
+        );
         return true;
     }
 

+ 144 - 44
packages/job-queue-plugin/e2e/bullmq-job-queue-plugin.e2e-spec.ts

@@ -1,69 +1,169 @@
-import { DefaultLogger, LogLevel, mergeConfig } from '@vendure/core';
+import { JobState } from '@vendure/common/lib/generated-types';
+import { AutoIncrementIdStrategy, JobQueueService, mergeConfig } from '@vendure/core';
+import { PluginWithJobQueue } from '@vendure/core/e2e/fixtures/test-plugins/with-job-queue';
+import {
+    CancelJobDocument,
+    GetRunningJobsDocument,
+} from '@vendure/core/e2e/graphql/generated-e2e-admin-types';
 import { createTestEnvironment } from '@vendure/testing';
-import { RedisConnection } from 'bullmq';
+import { removeAllQueueData } from 'bullmq';
+import IORedis from 'ioredis';
 import path from 'path';
+import { afterAll, beforeAll, describe, expect, it } from 'vitest';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
-import { awaitRunningJobs } from '../../core/e2e/utils/await-running-jobs';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
 import { BullMQJobQueuePlugin } from '../src/bullmq/plugin';
 
-// eslint-disable-next-line @typescript-eslint/no-var-requires
-const Redis = require('ioredis');
-
-// eslint-disable-next-line @typescript-eslint/no-var-requires
-const { redisHost, redisPort } = require('./constants');
-
-jest.setTimeout(10 * 3000);
-
-// TODO: How to solve issues with Jest open handles after test suite finishes?
-// See https://github.com/luin/ioredis/issues/1088
-
 describe('BullMQJobQueuePlugin', () => {
-    const redisConnection: any = new Redis({
-        host: redisHost,
-        port: redisPort,
+    const redisConnection = {
+        host: '127.0.0.1',
+        port: process.env.CI ? +(process.env.E2E_REDIS_PORT || 6379) : 6379,
+        maxRetriesPerRequest: null,
+    };
+    const activeConfig = mergeConfig(testConfig(), {
+        entityOptions: {
+            entityIdStrategy: new AutoIncrementIdStrategy(),
+        },
+        plugins: [
+            BullMQJobQueuePlugin.init({
+                connection: redisConnection,
+                workerOptions: {
+                    prefix: 'e2e',
+                },
+                queueOptions: {
+                    prefix: 'e2e',
+                    defaultJobOptions: {
+                        attempts: 3,
+                    },
+                },
+                gracefulShutdownTimeout: 1_000,
+            }),
+            PluginWithJobQueue,
+        ],
     });
 
-    const { server, adminClient, shopClient } = createTestEnvironment(
-        mergeConfig(testConfig(), {
-            apiOptions: {
-                port: 4050,
-            },
-            logger: new DefaultLogger({ level: LogLevel.Info }),
-            plugins: [
-                BullMQJobQueuePlugin.init({
-                    connection: redisConnection,
-                    workerOptions: {
-                        prefix: 'e2e',
-                    },
-                    queueOptions: {
-                        prefix: 'e2e',
-                    },
-                }),
-            ],
-        }),
-    );
+    const { server, adminClient } = createTestEnvironment(activeConfig);
 
     beforeAll(async () => {
+        await removeAllQueueData(new IORedis(redisConnection), 'vendure-queue-test', 'e2e');
         await server.init({
             initialData,
             productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-minimal.csv'),
             customerCount: 1,
         });
         await adminClient.asSuperAdmin();
+        await sleep(1_000);
     }, TEST_SETUP_TIMEOUT_MS);
 
     afterAll(async () => {
-        await awaitRunningJobs(adminClient);
+        PluginWithJobQueue.jobSubject.complete();
         await server.destroy();
-        // redis.quit() creates a thread to close the connection.
-        // We wait until all threads have been run once to ensure the connection closes.
-        // See https://stackoverflow.com/a/54560610/772859
-        await new Promise(resolve => setTimeout(resolve, 100));
     });
 
-    it('works', () => {
-        expect(1).toBe(1);
+    function getJobsInTestQueue(state?: JobState) {
+        return adminClient
+            .query(GetRunningJobsDocument, {
+                options: {
+                    filter: {
+                        queueName: {
+                            eq: 'test',
+                        },
+                        ...(state
+                            ? {
+                                  state: { eq: state },
+                              }
+                            : {}),
+                    },
+                },
+            })
+            .then(data => data.jobs);
+    }
+
+    let testJobId: string;
+
+    it('creates and starts running a job', async () => {
+        const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job?retries=2`;
+        await adminClient.fetch(restControllerUrl);
+
+        await sleep(300);
+        const jobs = await getJobsInTestQueue();
+
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].state).toBe(JobState.RUNNING);
+        expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
+        testJobId = jobs.items[0].id;
+    });
+
+    it(
+        'shutdown server before completing job',
+        async () => {
+            await server.destroy();
+            await server.bootstrap();
+            await adminClient.asSuperAdmin();
+
+            await sleep(300);
+            const jobs = await getJobsInTestQueue();
+
+            expect(jobs.items.length).toBe(1);
+            expect(jobs.items[0].state).toBe(JobState.RUNNING);
+            expect(jobs.items[0].id).toBe(testJobId);
+            expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
+        },
+        TEST_SETUP_TIMEOUT_MS,
+    );
+
+    it('complete job after restart', async () => {
+        PluginWithJobQueue.jobSubject.next();
+
+        await sleep(300);
+        const jobs = await getJobsInTestQueue();
+
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].state).toBe(JobState.COMPLETED);
+        expect(jobs.items[0].id).toBe(testJobId);
+        expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
+    });
+
+    it('cancels a running job', async () => {
+        PluginWithJobQueue.jobHasDoneWork = false;
+        const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job`;
+        await adminClient.fetch(restControllerUrl);
+
+        await sleep(300);
+        const jobs = await getJobsInTestQueue(JobState.RUNNING);
+
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].state).toBe(JobState.RUNNING);
+        expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
+        const jobId = jobs.items[0].id;
+
+        const { cancelJob } = await adminClient.query(CancelJobDocument, {
+            id: jobId,
+        });
+
+        expect(cancelJob.state).toBe(JobState.CANCELLED);
+        expect(cancelJob.isSettled).toBe(true);
+        expect(cancelJob.settledAt).not.toBeNull();
+
+        await sleep(300);
+        const jobs2 = await getJobsInTestQueue(JobState.CANCELLED);
+        expect(jobs2.items.length).toBe(1);
+        expect(jobs2.items[0].id).toBe(jobId);
+
+        PluginWithJobQueue.jobSubject.next();
     });
+
+    // it('subscribe to result of job', async () => {
+    //     const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job/subscribe`;
+    //     const result = await adminClient.fetch(restControllerUrl);
+    //
+    //     expect(await result.text()).toBe('42!');
+    //     const jobs = await getJobsInTestQueue(JobState.RUNNING);
+    //     expect(jobs.items.length).toBe(0);
+    // });
 });
+
+function sleep(ms: number): Promise<void> {
+    return new Promise(resolve => setTimeout(resolve, ms));
+}

+ 2 - 2
packages/job-queue-plugin/e2e/check-connection.js

@@ -2,8 +2,8 @@ const { RedisConnection } = require('bullmq');
 const { redisHost, redisPort } = require('./constants');
 
 const connection = new RedisConnection({
-    port: redisPort,
-    host: redisHost,
+    host: '127.0.0.1',
+    port: process.env.CI ? +(process.env.E2E_REDIS_PORT || 6379) : 6379,
 });
 
 let timer;

+ 1 - 1
packages/job-queue-plugin/package.json

@@ -13,7 +13,7 @@
         "build": "rimraf package && tsc -p ./tsconfig.build.json",
         "lint": "eslint --fix .",
         "test": "vitest --run",
-        "e2e-wip": "node e2e/check-connection.js || jest --config ../../e2e-common/jest-config.js --runInBand --package=job-queue-plugin",
+        "e2e": "node e2e/check-connection.js || cross-env PACKAGE=job-queue-plugin vitest --config ../../e2e-common/vitest.config.mts --run",
         "ci": "npm run build"
     },
     "homepage": "https://www.vendure.io/",

+ 155 - 79
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -2,6 +2,7 @@ import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import {
     ID,
+    idsAreEqual,
     Injector,
     InspectableJobQueueStrategy,
     InternalServerError,
@@ -12,26 +13,30 @@ import {
     Logger,
     PaginatedList,
 } from '@vendure/core';
-import Bull, {
-    ConnectionOptions,
-    JobType,
-    Processor,
-    Queue,
-    Worker,
-    WorkerOptions,
-    Job as BullJob,
-} from 'bullmq';
+import Bull, { ConnectionOptions, JobType, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
 import { EventEmitter } from 'events';
 import { Cluster, Redis, RedisOptions } from 'ioredis';
-import { Subject } from 'rxjs';
-import { filter, takeUntil } from 'rxjs/operators';
+import { firstValueFrom, Subject, Subscription, lastValueFrom } from 'rxjs';
+import { map, tap, filter, takeUntil, debounceTime } from 'rxjs/operators';
 
 import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
 import { RedisHealthIndicator } from './redis-health-indicator';
 import { BullMQPluginOptions } from './types';
 
+const QUEUE_NAME_PREFIX = 'vendure-queue-';
 const DEFAULT_CONCURRENCY = 3;
 
+const QUEUE_ID_BITS = 12; // 12 bits for the queue ID (supports 4096 queues)
+const JOB_ID_BITS = 41; // 41 bits for the job ID (supports ~2 trillion jobs per queue)
+// eslint-disable-next-line no-bitwise
+const MAX_QUEUE_ID = (1 << QUEUE_ID_BITS) - 1; // Max queue ID (65535)
+
+export class GracefulShutdownTimeoutError extends Error {
+    constructor(message: string) {
+        super(message);
+    }
+}
+
 /**
  * @description
  * This JobQueueStrategy uses [BullMQ](https://docs.bullmq.io/) to implement a push-based job queue
@@ -43,12 +48,17 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     private redisConnection: Redis | Cluster;
     private connectionOptions: ConnectionOptions;
     private queues: Map<string, Queue> = new Map();
+    // A unique integer ID assigned to each queue, used to generate a unique job ID
+    // from the combination of queue ID and job ID.
+    private queueIds: Map<string, number> = new Map();
     private workers: Map<string, Worker> = new Map();
     private workerProcessor: Processor;
     private options: BullMQPluginOptions;
     private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
     private cancellationSub: Redis;
+    // emits a globalId of a job to cancel
     private cancelRunningJob$ = new Subject<string>();
+    private forceFailRunningJob$ = new Subject<string>();
     private readonly CANCEL_JOB_CHANNEL = 'cancel-job';
     private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';
     private jobQueueService: JobQueueService;
@@ -69,6 +79,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         this.options = {
             ...options,
             workerOptions: {
+                ...options.workerOptions,
                 removeOnComplete: options.workerOptions?.removeOnComplete ?? {
                     age: 60 * 60 * 24 * 30,
                     count: 5000,
@@ -105,6 +116,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
         this.workerProcessor = async bullJob => {
             const queueName = bullJob.name;
+            const cancelled$ = this.cancelRunningJob$.asObservable();
             Logger.debug(
                 `Job ${bullJob.id ?? ''} [${queueName}] starting (attempt ${bullJob.attemptsMade + 1} of ${
                     bullJob.opts.attempts ?? 1
@@ -114,20 +126,42 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             if (processFn) {
                 const job = await this.createVendureJob(bullJob);
                 const completed$ = new Subject<void>();
+                let subscription: Subscription | undefined;
                 try {
                     // eslint-disable-next-line
                     job.on('progress', _job => bullJob.updateProgress(_job.progress));
 
-                    this.cancelRunningJob$
-                        .pipe(
-                            filter(jobId => jobId === job.id),
-                            takeUntil(completed$),
-                        )
-                        .subscribe(() => {
-                            Logger.info(`Setting job ${job.id ?? ''} as cancelled`, loggerCtx);
-                            job.cancel();
+                    const jobWasCancelled = new Promise(resolve => {
+                        subscription = cancelled$.subscribe(val => {
+                            if (idsAreEqual(val, job.id)) {
+                                Logger.warn(`Setting job ${job.id ?? ''} as cancelled`, loggerCtx);
+                                job.cancel();
+                                // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+                                resolve(new Error(`Job ${job.name}:${job.id} was cancelled`));
+                            }
                         });
-                    const result = await processFn(job);
+                    });
+
+                    const result = await Promise.race([
+                        processFn(job),
+                        firstValueFrom(
+                            this.forceFailRunningJob$.asObservable().pipe(
+                                takeUntil(completed$),
+                                map(
+                                    () =>
+                                        new GracefulShutdownTimeoutError(
+                                            `Job ${job.name} was force failed on shutdown because it was still running after the gracefulShutdownTimeout period`,
+                                        ),
+                                ),
+                            ),
+                        ),
+                        jobWasCancelled,
+                    ]);
+
+                    // await new Promise(resolve => setTimeout(resolve, 100));
+                    if (result instanceof Error) {
+                        throw result;
+                    }
 
                     await bullJob.updateProgress(100);
                     return result;
@@ -139,6 +173,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                     }
                     completed$.next();
                     completed$.complete();
+                    subscription?.unsubscribe();
                 }
             }
             throw new InternalServerError(`No processor defined for the queue "${queueName}"`);
@@ -149,7 +184,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     getBullQueueName(queue: JobQueue | string) {
-        return `vendure-queue-${typeof queue === 'string' ? queue : queue.name}`;
+        const namePrefixRe = new RegExp(`^${QUEUE_NAME_PREFIX}`);
+        const queueNameString = typeof queue === 'string' ? queue : queue.name;
+        return namePrefixRe.test(queueNameString)
+            ? queueNameString
+            : `${QUEUE_NAME_PREFIX}${queueNameString}`;
     }
 
     getOrCreateBullQueue(queueName: string) {
@@ -172,6 +211,13 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     private async setupQueue(queue: JobQueue | string) {
+        const allQueues = this.jobQueueService.getRawJobQueues();
+        const queueIndex = allQueues.findIndex(
+            q => q.name === (typeof queue === 'string' ? queue : queue.name),
+        );
+        if (queueIndex > MAX_QUEUE_ID) {
+            throw new Error('Exceeded maximum number of queues');
+        }
         const bullQueueName = this.getBullQueueName(queue);
         const bullQueue = new Queue(bullQueueName, {
             ...this.options.queueOptions,
@@ -190,6 +236,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         Logger.info(`Queue "${bullQueueName}" created.`, loggerCtx);
 
         this.queues.set(bullQueueName, bullQueue);
+        this.queueIds.set(bullQueueName, queueIndex);
 
         return bullQueue;
     }
@@ -215,12 +262,20 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         return this.createVendureJob(bullJob);
     }
 
-    async cancelJob(jobId: string): Promise<Job | undefined> {
-        const bullJob = await this.findOneBullJob(jobId);
+    async cancelJob(globalId: number): Promise<Job | undefined> {
+        const bullJob = await this.getBullJobFromGlobalId(globalId);
         if (bullJob) {
-            if (await bullJob.isActive()) {
-                await this.setActiveJobAsCancelled(jobId);
-                return this.createVendureJob(bullJob);
+            if ((await bullJob.isActive()) && bullJob.id) {
+                await this.setActiveJobAsCancelled(globalId);
+                const jobToCancel = await this.createVendureJob(bullJob);
+                return new Job({
+                    ...jobToCancel,
+                    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+                    id: jobToCancel.id!,
+                    state: JobState.CANCELLED,
+                    settledAt: new Date(),
+                    data: jobToCancel.data,
+                });
             } else {
                 try {
                     const job = await this.createVendureJob(bullJob);
@@ -235,8 +290,17 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
     }
 
-    buildUniqueJobId(queueName: string, jobId: ID | undefined) {
-        return `${queueName}_${jobId ?? 0}`;
+    getGlobalId(queueName: string, jobId: number) {
+        const queueID = this.queueIds.get(queueName);
+        if (queueID == null) {
+            throw new Error(`Queue "${queueName}" not found`);
+        }
+        // eslint-disable-next-line no-bitwise
+        if (jobId >= 1 << JOB_ID_BITS) {
+            throw new Error('Job ID exceeds maximum allowed value');
+        }
+        // eslint-disable-next-line no-bitwise
+        return (queueID << JOB_ID_BITS) | jobId;
     }
 
     async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
@@ -283,17 +347,20 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                 const bullQueue = await this.getOrCreateBullQueue(queueNameIsEqualFilter);
                 items = (await bullQueue?.getJobs(jobTypes, skip, take)) ?? [];
 
-                items = (
-                    await Promise.all(
-                        items
-                            .filter(job => notNullOrUndefined(job.id))
-                            .map(job => {
-                                return this.findOneBullJob(
-                                    this.buildUniqueJobId(queueNameIsEqualFilter, job.id),
-                                );
-                            }),
-                    )
-                ).filter(notNullOrUndefined);
+                // items = (
+                //     await Promise.all(
+                //         items
+                //             .filter(job => notNullOrUndefined(job.id))
+                //             .map(job => {
+                //                 return this.findOneBullJob(
+                //                     this.buildUniqueJobId(
+                //                         this.getBullQueueName(queueNameIsEqualFilter),
+                //                         Number(job.id),
+                //                     ),
+                //                 );
+                //             }),
+                //     )
+                // ).filter(notNullOrUndefined);
 
                 const jobCounts = (await bullQueue?.getJobCounts(...jobTypes)) ?? 0;
                 totalItems = Object.values(jobCounts).reduce((sum, num) => sum + num, 0);
@@ -308,21 +375,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         };
     }
 
-    resolveQueueNameAndJobIdFromRedisKey(bullString: string) {
-        const regex = /^bull:vendure-queue-(.+):(\d+)$/;
-        const match = bullString.match(regex);
-
-        if (match) {
-            const queueName = match[1]; // Captured <queue-name>
-            const jobId = match[2]; // Captured <job-id>
-
-            return { queueName, jobId };
-        }
-
-        // If the string doesn't match the pattern
-        return undefined;
-    }
-
     async findManyById(ids: ID[]): Promise<Job[]> {
         let bullJobs: Bull.Job[] = [];
 
@@ -335,25 +387,27 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         return bullJobs as unknown as Job[];
     }
 
-    async findOne(id: ID): Promise<Job | undefined> {
-        const bullJob = await this.findOneBullJob(id);
+    async findOne(globalId: ID): Promise<Job | undefined> {
+        const bullJob = await this.getBullJobFromGlobalId(+globalId);
 
         if (bullJob) {
             return this.createVendureJob(bullJob);
         }
 
-        Logger.info(`Job with id ${id} not found`, loggerCtx);
+        Logger.info(`Job with id ${globalId} not found`, loggerCtx);
     }
 
-    private async findOneBullJob(id: ID) {
-        const [queueName, jobId] = typeof id == 'string' ? id.split('_') : [undefined, id];
+    private async getBullJobFromGlobalId(globalId: number): Promise<Bull.Job | undefined> {
+        // eslint-disable-next-line no-bitwise
+        const queueId = (globalId >> JOB_ID_BITS) & MAX_QUEUE_ID;
+        // eslint-disable-next-line no-bitwise
+        const jobId = globalId & ((1 << JOB_ID_BITS) - 1);
+        const queueName = Array.from(this.queueIds.entries()).find(([_, index]) => index === queueId)?.[0];
 
         if (!queueName) {
             return undefined;
         }
-
         const bullQueue = await this.getOrCreateBullQueue(queueName);
-
         return bullQueue?.getJob(jobId.toString());
     }
 
@@ -470,29 +524,51 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             this.stopped = true;
             try {
                 Logger.info(`Closing worker`, loggerCtx);
-
+                const gracefulShutdownTimeout = this.options.gracefulShutdownTimeout ?? 1000 * 60 * 10;
+                const startTime = Date.now();
                 let timer: NodeJS.Timeout;
-                const checkActive = async () => {
+                const checkActive = async (resolve: (value: boolean) => void) => {
+                    let activeCount = 0;
+                    const activeJobs: Bull.Job[] = [];
                     for (const queue of this.queues.values()) {
-                        const activeCount = await queue.getActiveCount();
-                        if (0 < activeCount) {
-                            const activeJobs = await queue.getActive();
-                            Logger.info(
-                                `Waiting on ${activeCount} active ${
-                                    activeCount > 1 ? 'jobs' : 'job'
-                                } (${activeJobs.map(j => j.id).join(', ')})...`,
+                        const queueActiveCount = await queue.getActiveCount();
+                        activeCount += queueActiveCount;
+                        if (0 < queueActiveCount) {
+                            const queueActiveJobs = await queue.getActive();
+                            activeJobs.push(...queueActiveJobs);
+                        }
+                    }
+                    if (0 < activeCount) {
+                        Logger.info(
+                            `Waiting on ${activeCount} active ${
+                                activeCount > 1 ? 'jobs' : 'job'
+                            } (${activeJobs.map(j => this.getGlobalId(j.queueName, Number(j.id))).join(', ')})...`,
+                            loggerCtx,
+                        );
+                        if (Date.now() - startTime > gracefulShutdownTimeout) {
+                            // If we've waited too long, just close the worker
+                            // timer = setTimeout(checkActive, 2000);
+                            Logger.warn(
+                                `The graceful shutdown timeout of ${gracefulShutdownTimeout}ms has been exceeded. ` +
+                                    `Setting ${activeCount} jobs as failed...`,
                                 loggerCtx,
                             );
-                            timer = setTimeout(checkActive, 2000);
+                            this.forceFailRunningJob$.next('all');
+                            Logger.warn('All active jobs set as failed', loggerCtx);
+                            resolve(false);
+                        } else {
+                            timer = setTimeout(() => checkActive(resolve), 2000);
                         }
+                    } else {
+                        resolve(true);
                     }
                 };
-                timer = setTimeout(checkActive, 2000);
+                const gracefullyStopped = await new Promise(resolve => checkActive(resolve));
 
                 await this.closeAllWorkers();
                 Logger.info(`Worker closed`, loggerCtx);
                 await this.closeAllQueues();
-                clearTimeout(timer);
+                // clearTimeout(timer);
                 Logger.info(`Queue closed`, loggerCtx);
                 this.cancellationSub.off('message', this.subscribeToCancellationEvents);
             } catch (e: any) {
@@ -501,9 +577,9 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
     }
 
-    private async closeAllWorkers() {
+    private async closeAllWorkers(force = false) {
         for (const worker of this.workers.values()) {
-            await worker.close();
+            await worker.close(force);
         }
     }
 
@@ -513,19 +589,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
     }
 
-    private async setActiveJobAsCancelled(jobId: string) {
+    private async setActiveJobAsCancelled(globalId: number) {
         // Not yet possible natively in BullMQ, see
         // https://github.com/taskforcesh/bullmq/issues/632
         // So we have our own custom method of marking a job as cancelled.
-        await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, jobId);
-        await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, jobId.toString());
+        await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, globalId.toString());
+        await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, globalId.toString());
     }
 
     private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
         const jobJson = bullJob.toJSON();
         return new Job({
             queueName: bullJob.name,
-            id: this.buildUniqueJobId(bullJob.queueName, bullJob.id),
+            id: this.getGlobalId(bullJob.queueName, Number(bullJob.id)),
             state: await this.getState(bullJob),
             data: bullJob.data,
             attempts: bullJob.attemptsMade,

+ 10 - 0
packages/job-queue-plugin/src/bullmq/types.ts

@@ -77,6 +77,16 @@ export interface BullMQPluginOptions {
      * @default 'exponential', 1000
      */
     setBackoff?: (queueName: string, job: Job) => BackoffOptions | undefined;
+    /**
+     * @description
+     * The maximum time in ms to wait for the graceful shutdown of the job queue
+     * when a shutdown event is received and there are jobs which are still
+     * running.
+     *
+     * @since 3.1.0
+     * @default 1000 * 60 * 10
+     */
+    gracefulShutdownTimeout?: number;
 }
 
 /**