Browse Source

feat(core): Add DB-based buffer storage support to DefaultJobQueuePlugin

Relates to #1137
Michael Bromley 4 years ago
parent
commit
f26ad4bdd2

+ 0 - 1
packages/core/src/job-queue/index.ts

@@ -3,7 +3,6 @@ export * from './job-buffer/in-memory-job-buffer-storage-strategy';
 export * from './job-buffer/job-buffer';
 export * from './job-buffer/job-buffer';
 export * from './job-buffer/job-buffer-storage-strategy';
-export * from './job-buffer/sql-job-buffer-storage-strategy';
 export * from './job';
 export * from './job-queue';
 export * from './job-queue.service';

+ 0 - 25
packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts

@@ -1,25 +0,0 @@
-import { Injector } from '../../common/injector';
-import { TransactionalConnection } from '../../connection/transactional-connection';
-import { Job } from '../job';
-
-import { JobBufferStorageStrategy } from './job-buffer-storage-strategy';
-
-export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
-    private connection: TransactionalConnection;
-
-    init(injector: Injector) {
-        this.connection = injector.get(TransactionalConnection);
-    }
-
-    add(processorId: string, job: Job): Promise<Job> {
-        return Promise.resolve(job);
-    }
-
-    bufferSize(processorIds?: string[]) {
-        return Promise.resolve({});
-    }
-
-    flush(processorIds?: string[]) {
-        return Promise.resolve({});
-    }
-}

+ 21 - 2
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -5,7 +5,9 @@ import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
 
+import { JobRecordBuffer } from './job-record-buffer.entity';
 import { JobRecord } from './job-record.entity';
+import { SqlJobBufferStorageStrategy } from './sql-job-buffer-storage-strategy';
 import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
 
 /**
@@ -21,6 +23,17 @@ export interface DefaultJobQueueOptions {
     concurrency?: number;
     backoffStrategy?: BackoffStrategy;
     setRetries?: (queueName: string, job: Job) => number;
+    /**
+     * @description
+     * If set to `true`, the database will be used to store buffered jobs. This is
+     * recommended for production.
+     *
+     * When enabled, a new `JobRecordBuffer` database entity will be defined which will
+     * require a migration when first enabling this option.
+     *
+     * @since 1.3.0
+     */
+    useDatabaseForBuffer?: boolean;
 }
 
 /**
@@ -115,7 +128,10 @@ export interface DefaultJobQueueOptions {
  */
 @VendurePlugin({
     imports: [PluginCommonModule],
-    entities: [JobRecord],
+    entities: () =>
+        DefaultJobQueuePlugin.options.useDatabaseForBuffer === true
+            ? [JobRecord, JobRecordBuffer]
+            : [JobRecord],
     configuration: config => {
         const { pollInterval, concurrency, backoffStrategy, setRetries } =
             DefaultJobQueuePlugin.options ?? {};
@@ -125,12 +141,15 @@ export interface DefaultJobQueueOptions {
             backoffStrategy,
             setRetries,
         });
+        if (DefaultJobQueuePlugin.options.useDatabaseForBuffer === true) {
+            config.jobQueueOptions.jobBufferStorageStrategy = new SqlJobBufferStorageStrategy();
+        }
         return config;
     },
 })
 export class DefaultJobQueuePlugin {
     /** @internal */
-    static options: DefaultJobQueueOptions;
+    static options: DefaultJobQueueOptions = {};
 
     static init(options: DefaultJobQueueOptions): Type<DefaultJobQueuePlugin> {
         DefaultJobQueuePlugin.options = options;

+ 18 - 0
packages/core/src/plugin/default-job-queue-plugin/job-record-buffer.entity.ts

@@ -0,0 +1,18 @@
+import { DeepPartial } from '@vendure/common/lib/shared-types';
+import { Column, Entity } from 'typeorm';
+
+import { VendureEntity } from '../../entity/base/base.entity';
+import { JobConfig } from '../../job-queue/types';
+
+@Entity()
+export class JobRecordBuffer extends VendureEntity {
+    constructor(input: DeepPartial<JobRecordBuffer>) {
+        super(input);
+    }
+
+    @Column()
+    bufferId: string;
+
+    @Column('simple-json')
+    job: JobConfig<any>;
+}

+ 75 - 0
packages/core/src/plugin/default-job-queue-plugin/sql-job-buffer-storage-strategy.ts

@@ -0,0 +1,75 @@
+import { Injector } from '../../common/injector';
+import { TransactionalConnection } from '../../connection/transactional-connection';
+import { Job } from '../../job-queue/job';
+import { JobBufferStorageStrategy } from '../../job-queue/job-buffer/job-buffer-storage-strategy';
+import { JobConfig } from '../../job-queue/types';
+
+import { JobRecordBuffer } from './job-record-buffer.entity';
+
+/**
+ * @description
+ * This stores the buffered jobs in the database.
+ */
+export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
+    private connection: TransactionalConnection;
+
+    init(injector: Injector) {
+        this.connection = injector.get(TransactionalConnection);
+    }
+
+    async add(bufferId: string, job: Job): Promise<Job> {
+        await this.connection.getRepository(JobRecordBuffer).save(
+            new JobRecordBuffer({
+                bufferId,
+                job: this.toJobConfig(job),
+            }),
+        );
+
+        return job;
+    }
+
+    async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
+        const rows = await this.connection
+            .getRepository(JobRecordBuffer)
+            .createQueryBuilder('record')
+            .select(`COUNT(*)`, 'count')
+            .addSelect(`record.bufferId`, 'bufferId')
+            .groupBy('record.bufferId')
+            .getRawMany();
+
+        const result: { [bufferId: string]: number } = {};
+        for (const row of rows) {
+            result[row.bufferId] = +row.count;
+        }
+        return result;
+    }
+
+    async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
+        const selectQb = this.connection.getRepository(JobRecordBuffer).createQueryBuilder('record');
+        if (bufferIds?.length) {
+            selectQb.where(`record.bufferId IN (:...bufferIds)`, { bufferIds });
+        }
+        const rows = await selectQb.getMany();
+        const result: { [bufferId: string]: Job[] } = {};
+        for (const row of rows) {
+            if (!result[row.bufferId]) {
+                result[row.bufferId] = [];
+            }
+            result[row.bufferId].push(new Job(row.job));
+        }
+        const deleteQb = this.connection.rawConnection.createQueryBuilder().delete().from(JobRecordBuffer);
+        if (bufferIds?.length) {
+            deleteQb.where(`bufferId IN (:...bufferIds)`, { bufferIds });
+        }
+        await deleteQb.execute();
+        return result;
+    }
+
+    private toJobConfig(job: Job<any>): JobConfig<any> {
+        return {
+            ...job,
+            data: job.data,
+            id: job.id ?? undefined,
+        };
+    }
+}

+ 2 - 0
packages/core/src/plugin/index.ts

@@ -1,5 +1,7 @@
 export * from './default-search-plugin/default-search-plugin';
 export * from './default-job-queue-plugin/default-job-queue-plugin';
+export * from './default-job-queue-plugin/job-record-buffer.entity';
+export * from './default-job-queue-plugin/sql-job-buffer-storage-strategy';
 export * from './vendure-plugin';
 export * from './plugin-common.module';
 export * from './plugin-utils';