Просмотр исходного кода

WIP: still buggy, need to revisit

Michael Bromley 1 год назад
Родитель
Сommit
0367a0a5e9

+ 1 - 0
packages/core/src/config/default-config.ts

@@ -190,6 +190,7 @@ export const defaultConfig: RuntimeVendureConfig = {
         jobQueueStrategy: new InMemoryJobQueueStrategy(),
         jobBufferStorageStrategy: new InMemoryJobBufferStorageStrategy(),
         activeQueues: [],
+        excludedQueues: [],
         prefix: '',
     },
     customFields: {

+ 15 - 1
packages/core/src/config/vendure-config.ts

@@ -934,12 +934,26 @@ export interface JobQueueOptions {
      * @description
      * Defines the queues that will run in this process.
      * This can be used to configure only certain queues to run in this process.
-     * If its empty all queues will be run. Note: this option is primarily intended
+     *
+     * If its empty all queues will be run, except for any queues
+     * listed in `excludedQueues`.
+     *
+     * Note: this option is primarily intended
      * to apply to the Worker process. Jobs will _always_ get published to the queue
      * regardless of this setting, but this setting determines whether they get
      * _processed_ or not.
      */
     activeQueues?: string[];
+    /**
+     * @description
+     * Defines the queues that will be excluded from running in this process.
+     * Any queue in this list will not be processed by the worker process.
+     *
+     * If a queue is in both `activeQueues` and `excludeQueues`, it will be excluded.
+     *
+     * @since 3.1.0
+     */
+    excludedQueues?: string[];
     /**
      * @description
      * Prefixes all job queue names with the passed string. This is useful with multiple deployments

+ 5 - 0
packages/core/src/job-queue/job-queue.service.ts

@@ -199,6 +199,11 @@ export class JobQueueService implements OnModuleDestroy {
     }
 
     private shouldStartQueue(queueName: string): boolean {
+        if (this.configService.jobQueueOptions.excludedQueues.length > 0) {
+            if (this.configService.jobQueueOptions.excludedQueues.includes(queueName)) {
+                return false;
+            }
+        }
         if (this.configService.jobQueueOptions.activeQueues.length > 0) {
             if (!this.configService.jobQueueOptions.activeQueues.includes(queueName)) {
                 return false;

+ 4 - 2
packages/dev-server/dev-config.ts

@@ -12,6 +12,7 @@ import {
     LogLevel,
     VendureConfig,
 } from '@vendure/core';
+import { PluginWithJobQueue } from '@vendure/core/e2e/fixtures/test-plugins/with-job-queue';
 import { ElasticsearchPlugin } from '@vendure/elasticsearch-plugin';
 import { defaultEmailHandlers, EmailPlugin, FileBasedTemplateLoader } from '@vendure/email-plugin';
 import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq';
@@ -72,14 +73,15 @@ export const devConfig: VendureConfig = {
         //     platformFeePercent: 10,
         //     platformFeeSKU: 'FEE',
         // }),
+        PluginWithJobQueue,
         AssetServerPlugin.init({
             route: 'assets',
             assetUploadDir: path.join(__dirname, 'assets'),
         }),
         DefaultSearchPlugin.init({ bufferUpdates: false, indexStockStatus: false }),
         // Enable if you need to debug the job queue
-        // BullMQJobQueuePlugin.init({}),
-        DefaultJobQueuePlugin.init({}),
+        BullMQJobQueuePlugin.init({}),
+        // DefaultJobQueuePlugin.init({}),
         // JobQueueTestPlugin.init({ queueCount: 10 }),
         // ElasticsearchPlugin.init({
         //     host: 'http://localhost',

+ 8 - 2
packages/dev-server/index-worker.ts

@@ -1,8 +1,14 @@
-import { bootstrapWorker } from '@vendure/core';
+import { bootstrapWorker, mergeConfig } from '@vendure/core';
 
 import { devConfig } from './dev-config';
 
-bootstrapWorker(devConfig)
+bootstrapWorker(
+    mergeConfig(devConfig, {
+        jobQueueOptions: {
+            // excludedQueues: ['update-search-index'],
+        },
+    }),
+)
     .then(worker => worker.startJobQueue())
     // .then(worker => worker.startHealthCheckServer({ port: 3001 }))
     .catch(err => {

+ 17 - 47
packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts

@@ -16,21 +16,17 @@ import {
 import Bull, { ConnectionOptions, JobType, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
 import { EventEmitter } from 'events';
 import { Cluster, Redis, RedisOptions } from 'ioredis';
-import { firstValueFrom, Subject, Subscription, lastValueFrom } from 'rxjs';
-import { map, tap, filter, takeUntil, debounceTime } from 'rxjs/operators';
+import { firstValueFrom, Subject, Subscription } from 'rxjs';
+import { map, takeUntil } from 'rxjs/operators';
 
 import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
+import { getGlobalId, MAX_QUEUE_ID, parseGlobalId } from './global-id';
 import { RedisHealthIndicator } from './redis-health-indicator';
 import { BullMQPluginOptions } from './types';
 
 const QUEUE_NAME_PREFIX = 'vendure-queue-';
 const DEFAULT_CONCURRENCY = 3;
 
-const QUEUE_ID_BITS = 12; // 12 bits for the queue ID (supports 4096 queues)
-const JOB_ID_BITS = 41; // 41 bits for the job ID (supports ~2 trillion jobs per queue)
-// eslint-disable-next-line no-bitwise
-const MAX_QUEUE_ID = (1 << QUEUE_ID_BITS) - 1; // Max queue ID (65535)
-
 export class GracefulShutdownTimeoutError extends Error {
     constructor(message: string) {
         super(message);
@@ -290,19 +286,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
         }
     }
 
-    getGlobalId(queueName: string, jobId: number) {
-        const queueID = this.queueIds.get(queueName);
-        if (queueID == null) {
-            throw new Error(`Queue "${queueName}" not found`);
-        }
-        // eslint-disable-next-line no-bitwise
-        if (jobId >= 1 << JOB_ID_BITS) {
-            throw new Error('Job ID exceeds maximum allowed value');
-        }
-        // eslint-disable-next-line no-bitwise
-        return (queueID << JOB_ID_BITS) | jobId;
-    }
-
     async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
         const skip = options?.skip ?? 0;
         const take = options?.take ?? 10;
@@ -346,22 +329,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
             if (queueNameIsEqualFilter) {
                 const bullQueue = await this.getOrCreateBullQueue(queueNameIsEqualFilter);
                 items = (await bullQueue?.getJobs(jobTypes, skip, take)) ?? [];
-
-                // items = (
-                //     await Promise.all(
-                //         items
-                //             .filter(job => notNullOrUndefined(job.id))
-                //             .map(job => {
-                //                 return this.findOneBullJob(
-                //                     this.buildUniqueJobId(
-                //                         this.getBullQueueName(queueNameIsEqualFilter),
-                //                         Number(job.id),
-                //                     ),
-                //                 );
-                //             }),
-                //     )
-                // ).filter(notNullOrUndefined);
-
                 const jobCounts = (await bullQueue?.getJobCounts(...jobTypes)) ?? 0;
                 totalItems = Object.values(jobCounts).reduce((sum, num) => sum + num, 0);
             }
@@ -398,10 +365,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
     }
 
     private async getBullJobFromGlobalId(globalId: number): Promise<Bull.Job | undefined> {
-        // eslint-disable-next-line no-bitwise
-        const queueId = (globalId >> JOB_ID_BITS) & MAX_QUEUE_ID;
-        // eslint-disable-next-line no-bitwise
-        const jobId = globalId & ((1 << JOB_ID_BITS) - 1);
+        const { queueId, jobId } = parseGlobalId(globalId);
         const queueName = Array.from(this.queueIds.entries()).find(([_, index]) => index === queueId)?.[0];
 
         if (!queueName) {
@@ -526,7 +490,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                 Logger.info(`Closing worker`, loggerCtx);
                 const gracefulShutdownTimeout = this.options.gracefulShutdownTimeout ?? 1000 * 60 * 10;
                 const startTime = Date.now();
-                let timer: NodeJS.Timeout;
                 const checkActive = async (resolve: (value: boolean) => void) => {
                     let activeCount = 0;
                     const activeJobs: Bull.Job[] = [];
@@ -542,12 +505,16 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                         Logger.info(
                             `Waiting on ${activeCount} active ${
                                 activeCount > 1 ? 'jobs' : 'job'
-                            } (${activeJobs.map(j => this.getGlobalId(j.queueName, Number(j.id))).join(', ')})...`,
+                            } (${activeJobs
+                                .map(j => {
+                                    const queueId = this.queueIds.get(j.queueName);
+                                    return queueId ? getGlobalId(queueId, Number(j.id)) : 'unknown queue';
+                                })
+                                .join(', ')})...`,
                             loggerCtx,
                         );
                         if (Date.now() - startTime > gracefulShutdownTimeout) {
                             // If we've waited too long, just close the worker
-                            // timer = setTimeout(checkActive, 2000);
                             Logger.warn(
                                 `The graceful shutdown timeout of ${gracefulShutdownTimeout}ms has been exceeded. ` +
                                     `Setting ${activeCount} jobs as failed...`,
@@ -557,18 +524,17 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
                             Logger.warn('All active jobs set as failed', loggerCtx);
                             resolve(false);
                         } else {
-                            timer = setTimeout(() => checkActive(resolve), 2000);
+                            setTimeout(() => checkActive(resolve), 2000);
                         }
                     } else {
                         resolve(true);
                     }
                 };
-                const gracefullyStopped = await new Promise(resolve => checkActive(resolve));
+                await new Promise(resolve => checkActive(resolve));
 
                 await this.closeAllWorkers();
                 Logger.info(`Worker closed`, loggerCtx);
                 await this.closeAllQueues();
-                // clearTimeout(timer);
                 Logger.info(`Queue closed`, loggerCtx);
                 this.cancellationSub.off('message', this.subscribeToCancellationEvents);
             } catch (e: any) {
@@ -599,9 +565,13 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
 
     private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
         const jobJson = bullJob.toJSON();
+        const queueId = this.queueIds.get(this.getBullQueueName(bullJob.queueName));
+        if (queueId == null) {
+            throw new InternalServerError(`Queue ID not found for queue ${bullJob.queueName}`);
+        }
         return new Job({
             queueName: bullJob.name,
-            id: this.getGlobalId(bullJob.queueName, Number(bullJob.id)),
+            id: getGlobalId(queueId, Number(bullJob.id)),
             state: await this.getState(bullJob),
             data: bullJob.data,
             attempts: bullJob.attemptsMade,

+ 18 - 0
packages/job-queue-plugin/src/bullmq/global-id.spec.ts

@@ -0,0 +1,18 @@
+import { describe, expect, it } from 'vitest';
+
+import { getGlobalId, parseGlobalId } from './global-id';
+
+describe('global id functions', () => {
+    it('works', () => {
+        // repeat the above for 10,000 randomly generated values
+        for (let i = 0; i < 10_000; i++) {
+            const queueId = Math.floor(Math.random() * 65535);
+            const jobId = Math.floor(Math.random() * 2_000_000_000);
+            const globalId = getGlobalId(queueId, jobId);
+            const parsed = parseGlobalId(globalId);
+
+            expect(parsed.queueId).toBe(queueId);
+            expect(parsed.jobId).toBe(jobId);
+        }
+    });
+});

+ 65 - 0
packages/job-queue-plugin/src/bullmq/global-id.ts

@@ -0,0 +1,65 @@
+// const QUEUE_ID_BITS = 12; // 12 bits for the queue ID (supports 4096 queues)
+// const JOB_ID_BITS = 41; // 41 bits for the job ID (supports ~2 trillion jobs per queue)
+// // eslint-disable-next-line no-bitwise
+// export const MAX_QUEUE_ID = (1 << QUEUE_ID_BITS) - 1; // Max queue ID (65535)
+//
+// /**
+//  * Combines a queueId and jobId into a single number, which will be unique across
+//  * all queues.
+//  *
+//  * To generate globally unique integer IDs for jobs across multiple queues while being able
+//  * to derive the queue and job ID from the global ID, you can combine the queue ID and job ID
+//  * into a single number using bitwise operations.
+//  */
+// export function getGlobalId(queueId: number, jobId: number) {
+//     if (queueId == null) {
+//         throw new Error(`Queue not found`);
+//     }
+//     // eslint-disable-next-line no-bitwise
+//     if (jobId >= 1 << JOB_ID_BITS) {
+//         throw new Error('Job ID exceeds maximum allowed value');
+//     }
+//     // eslint-disable-next-line no-bitwise
+//     return (queueId << JOB_ID_BITS) | jobId;
+// }
+//
+// /**
+//  * Splits a global ID into its queueId and jobId components.
+//  */
+// export function parseGlobalId(globalId: number) {
+//     // eslint-disable-next-line no-bitwise
+//     const queueId = (globalId >> JOB_ID_BITS) & MAX_QUEUE_ID;
+//     // eslint-disable-next-line no-bitwise
+//     const jobId = globalId & ((1 << JOB_ID_BITS) - 1);
+//     return { queueId, jobId };
+// }
+
+// import { ParsedGlobalId } from './types';
+// import { validateId } from './validation';
+
+export const MAX_QUEUE_ID = Math.pow(2, 21) - 1; // 2,097,151
+const SHIFT_BITS = 32;
+
+/**
+ * Combines queueId and jobId into a global identifier
+ * @param queueId - The queue identifier
+ * @param jobId - The job identifier
+ * @returns The combined global identifier as a number
+ */
+export function getGlobalId(queueId: number, jobId: number): number {
+    // Shift queueId left by 32 bits and combine with jobId
+    return queueId * Math.pow(2, SHIFT_BITS) + jobId;
+}
+
+/**
+ * Parses a global identifier back into its queueId and jobId components
+ * @param globalId - The global identifier to parse
+ * @returns The parsed queue and job IDs
+ */
+export function parseGlobalId(globalId: number) {
+    // Extract queueId and jobId using bit operations
+    const queueId = Math.floor(globalId / Math.pow(2, SHIFT_BITS));
+    const jobId = globalId % Math.pow(2, SHIFT_BITS);
+
+    return { queueId, jobId };
+}