Преглед на файлове

feat(core): Extract SQL-based JobQueueStrategy in a bundled plugin

By default, Vendure will use an in-memory JobQueueStrategy. For production the DefaultJobQueuePlugin should be used which will persist the job queue to the SQL database.
Michael Bromley преди 5 години
родител
ревизия
a2069f6375

+ 2 - 2
packages/core/src/config/default-config.ts

@@ -3,7 +3,7 @@ import { LanguageCode } from '@vendure/common/lib/generated-types';
 import { DEFAULT_AUTH_TOKEN_HEADER_KEY } from '@vendure/common/lib/shared-constants';
 
 import { generatePublicId } from '../common/generate-public-id';
-import { SqlJobQueueStrategy } from '../job-queue/sql-job-queue-strategy';
+import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy';
 
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
 import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
@@ -94,7 +94,7 @@ export const defaultConfig: RuntimeVendureConfig = {
         },
     },
     jobQueueOptions: {
-        jobQueueStrategy: new SqlJobQueueStrategy(),
+        jobQueueStrategy: new InMemoryJobQueueStrategy(),
         pollInterval: 200,
     },
     customFields: {

+ 2 - 0
packages/core/src/config/vendure-config.ts

@@ -389,6 +389,8 @@ export interface JobQueueOptions {
     /**
      * @description
      * Defines how the jobs in the queue are persisted and accessed.
+     *
+     * @default InMemoryJobQueueStrategy
      */
     jobQueueStrategy?: JobQueueStrategy;
     /**

+ 0 - 2
packages/core/src/entity/entities.ts

@@ -17,7 +17,6 @@ import { Fulfillment } from './fulfillment/fulfillment.entity';
 import { GlobalSettings } from './global-settings/global-settings.entity';
 import { HistoryEntry } from './history-entry/history-entry.entity';
 import { OrderHistoryEntry } from './history-entry/order-history-entry.entity';
-import { JobRecord } from './job-record/job-record.entity';
 import { OrderItem } from './order-item/order-item.entity';
 import { OrderLine } from './order-line/order-line.entity';
 import { Order } from './order/order.entity';
@@ -75,7 +74,6 @@ export const coreEntitiesMap = {
     Fulfillment,
     GlobalSettings,
     HistoryEntry,
-    JobRecord,
     Order,
     OrderHistoryEntry,
     OrderItem,

+ 124 - 0
packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts

@@ -0,0 +1,124 @@
+/* tslint:disable:no-non-null-assertion */
+import { JobListOptions, SortOrder } from '@vendure/common/lib/generated-types';
+
+import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy';
+import { Job } from './job';
+
+describe('InMemoryJobQueueStrategy', () => {
+    let strategy: InMemoryJobQueueStrategy;
+    beforeEach(() => {
+        strategy = new InMemoryJobQueueStrategy();
+    });
+
+    describe('findMany options', () => {
+        beforeEach(() => {
+            strategy.add(
+                new Job({
+                    id: 'video-1',
+                    queueName: 'video',
+                    data: {},
+                    createdAt: new Date('2020-04-03T10:00:00.000Z'),
+                }),
+            );
+            strategy.add(
+                new Job({
+                    id: 'video-2',
+                    queueName: 'video',
+                    data: {},
+                    createdAt: new Date('2020-04-03T10:01:00.000Z'),
+                }),
+            );
+            strategy.add(
+                new Job({
+                    id: 'email-1',
+                    queueName: 'email',
+                    data: {},
+                    createdAt: new Date('2020-04-03T10:02:00.000Z'),
+                }),
+            );
+            strategy.add(
+                new Job({
+                    id: 'video-3',
+                    queueName: 'video',
+                    data: {},
+                    createdAt: new Date('2020-04-03T10:03:00.000Z'),
+                }),
+            );
+            strategy.add(
+                new Job({
+                    id: 'email-2',
+                    queueName: 'email',
+                    data: {},
+                    createdAt: new Date('2020-04-03T10:04:00.000Z'),
+                }),
+            );
+        });
+
+        async function getIdResultsFor(options: JobListOptions): Promise<string[]> {
+            const result = await strategy.findMany(options);
+            return result.items.map((j) => j.id as string);
+        }
+
+        it('take & skip', async () => {
+            expect(await getIdResultsFor({ take: 1 })).toEqual(['video-1']);
+            expect(await getIdResultsFor({ take: 1, skip: 1 })).toEqual(['video-2']);
+            expect(await getIdResultsFor({ take: 10, skip: 2 })).toEqual(['email-1', 'video-3', 'email-2']);
+        });
+
+        it('sort createdAt', async () => {
+            expect(await getIdResultsFor({ sort: { createdAt: SortOrder.DESC } })).toEqual([
+                'email-2',
+                'video-3',
+                'email-1',
+                'video-2',
+                'video-1',
+            ]);
+            expect(await getIdResultsFor({ sort: { createdAt: SortOrder.ASC } })).toEqual([
+                'video-1',
+                'video-2',
+                'email-1',
+                'video-3',
+                'email-2',
+            ]);
+        });
+
+        it('sort id', async () => {
+            expect(await getIdResultsFor({ sort: { id: SortOrder.DESC } })).toEqual([
+                'video-3',
+                'video-2',
+                'video-1',
+                'email-2',
+                'email-1',
+            ]);
+            expect(await getIdResultsFor({ sort: { id: SortOrder.ASC } })).toEqual([
+                'email-1',
+                'email-2',
+                'video-1',
+                'video-2',
+                'video-3',
+            ]);
+        });
+
+        it('filter queueName', async () => {
+            expect(await getIdResultsFor({ filter: { queueName: { eq: 'video' } } })).toEqual([
+                'video-1',
+                'video-2',
+                'video-3',
+            ]);
+
+            expect(await getIdResultsFor({ filter: { queueName: { contains: 'vid' } } })).toEqual([
+                'video-1',
+                'video-2',
+                'video-3',
+            ]);
+        });
+
+        it('filter isSettled', async () => {
+            const video1 = await strategy.findOne('video-1');
+            video1?.complete();
+            await strategy.update(video1!);
+
+            expect(await getIdResultsFor({ filter: { isSettled: { eq: true } } })).toEqual(['video-1']);
+        });
+    });
+});

+ 177 - 0
packages/core/src/job-queue/in-memory-job-queue-strategy.ts

@@ -0,0 +1,177 @@
+import {
+    DateOperators,
+    JobFilterParameter,
+    JobListOptions,
+    JobSortParameter,
+    JobState,
+    NumberOperators,
+    StringOperators,
+} from '@vendure/common/lib/generated-types';
+import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
+import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
+
+import { generatePublicId } from '../common/generate-public-id';
+import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
+import { Logger } from '../config/logger/vendure-logger';
+
+import { Job } from './job';
+
+/**
+ * @description
+ * An in-memory {@link JobQueueStrategy}. This is the default strategy if not using a dedicated
+ * JobQueue plugin (e.g. {@link DefaultJobQueuePlugin}). Not recommended for production, since
+ * the queue will be cleared when the server stops.
+ * Completed jobs will be evicted from the store every 2 hours to prevent a memory leak.
+ *
+ * @docsCategory JobQueue
+ */
+export class InMemoryJobQueueStrategy implements JobQueueStrategy {
+    protected jobs = new Map<ID, Job>();
+    protected unsettledJobs: { [queueName: string]: Job[] } = {};
+    private timer: any;
+    private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours
+
+    init() {
+        this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
+    }
+
+    destroy() {
+        clearTimeout(this.timer);
+    }
+
+    async add(job: Job): Promise<Job> {
+        if (!job.id) {
+            (job as any).id = generatePublicId();
+        }
+        // tslint:disable-next-line:no-non-null-assertion
+        this.jobs.set(job.id!, job);
+        if (!this.unsettledJobs[job.queueName]) {
+            this.unsettledJobs[job.queueName] = [];
+        }
+        this.unsettledJobs[job.queueName].push(job);
+        return job;
+    }
+
+    async findOne(id: ID): Promise<Job | undefined> {
+        return this.jobs.get(id);
+    }
+
+    async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
+        let items = [...this.jobs.values()];
+        if (options) {
+            if (options.sort) {
+                items = this.applySort(items, options.sort);
+            }
+            if (options.filter) {
+                items = this.applyFilters(items, options.filter);
+            }
+            if (options.skip || options.take) {
+                items = this.applyPagination(items, options.skip, options.take);
+            }
+        }
+        return {
+            items,
+            totalItems: items.length,
+        };
+    }
+
+    async findManyById(ids: ID[]): Promise<Job[]> {
+        return ids.map((id) => this.jobs.get(id)).filter(notNullOrUndefined);
+    }
+
+    async next(queueName: string): Promise<Job | undefined> {
+        const next = this.unsettledJobs[queueName]?.shift();
+        if (next) {
+            next.start();
+            return next;
+        }
+    }
+
+    async update(job: Job): Promise<void> {
+        if (job.state === JobState.RETRYING || job.state === JobState.PENDING) {
+            this.unsettledJobs[job.queueName].unshift(job);
+        }
+        // tslint:disable-next-line:no-non-null-assertion
+        this.jobs.set(job.id!, job);
+    }
+
+    private applySort(items: Job[], sort: JobSortParameter): Job[] {
+        for (const [prop, direction] of Object.entries(sort)) {
+            const key = prop as keyof Required<JobSortParameter>;
+            const dir = direction === 'ASC' ? -1 : 1;
+            items = items.sort((a, b) => ((a[key] || 0) < (b[key] || 0) ? 1 * dir : -1 * dir));
+        }
+        return items;
+    }
+
+    private applyFilters(items: Job[], filters: JobFilterParameter): Job[] {
+        for (const [prop, operator] of Object.entries(filters)) {
+            const key = prop as keyof Required<JobFilterParameter>;
+            if (operator?.eq !== undefined) {
+                items = items.filter((i) => i[key] === operator.eq);
+            }
+
+            const contains = (operator as StringOperators)?.contains;
+            if (contains) {
+                items = items.filter((i) => (i[key] as string).includes(contains));
+            }
+            const gt = (operator as NumberOperators)?.gt;
+            if (gt) {
+                items = items.filter((i) => (i[key] as number) > gt);
+            }
+            const gte = (operator as NumberOperators)?.gte;
+            if (gte) {
+                items = items.filter((i) => (i[key] as number) >= gte);
+            }
+            const lt = (operator as NumberOperators)?.lt;
+            if (lt) {
+                items = items.filter((i) => (i[key] as number) < lt);
+            }
+            const lte = (operator as NumberOperators)?.lte;
+            if (lte) {
+                items = items.filter((i) => (i[key] as number) <= lte);
+            }
+            const before = (operator as DateOperators)?.before;
+            if (before) {
+                items = items.filter((i) => (i[key] as Date) <= before);
+            }
+            const after = (operator as DateOperators)?.after;
+            if (after) {
+                items = items.filter((i) => (i[key] as Date) >= after);
+            }
+            const between = (operator as NumberOperators)?.between;
+            if (between) {
+                items = items.filter((i) => {
+                    const num = i[key] as number;
+                    return num > between.start && num < between.end;
+                });
+            }
+        }
+        return items;
+    }
+
+    private applyPagination(items: Job[], skip?: number | null, take?: number | null): Job[] {
+        const start = skip || 0;
+        const end = take != null ? start + take : undefined;
+        return items.slice(start, end);
+    }
+
+    /**
+     * Delete old jobs from the `jobs` Map if they are settled and older than the value
+     * defined in `this.pruneJobsAfterMs`. This prevents a memory leak as the job queue
+     * grows indefinitely.
+     */
+    private evictSettledJobs = () => {
+        for (const job of this.jobs.values()) {
+            if (job.isSettled) {
+                const settledAtMs = job.settledAt ? +job.settledAt : 0;
+                const nowMs = +new Date();
+                if (nowMs - settledAtMs > this.evictJobsAfterMs) {
+                    // tslint:disable-next-line:no-non-null-assertion
+                    this.jobs.delete(job.id!);
+                }
+            }
+        }
+        this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
+    };
+}

+ 1 - 0
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -6,6 +6,7 @@ import { ConfigService } from '../config/config.service';
 import { ProcessContext, ServerProcessContext } from '../process-context/process-context';
 
 import { Job } from './job';
+import { JobQueue } from './job-queue';
 import { JobQueueService } from './job-queue.service';
 import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
 

+ 5 - 55
packages/core/src/job-queue/testing-job-queue-strategy.ts

@@ -1,64 +1,14 @@
-import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
-import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
-
-import { generatePublicId } from '../common/generate-public-id';
-import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
-
+import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy';
 import { Job } from './job';
 
 /**
  * @description
- * An in-memory {@link JobQueueStrategy} design for testing purposes. Not to be used in production
- * since all jobs are lost when the server stops.
+ * An in-memory {@link JobQueueStrategy} design for testing purposes.
  */
-export class TestingJobQueueStrategy implements JobQueueStrategy {
-    private jobs: Job[] = [];
-
+export class TestingJobQueueStrategy extends InMemoryJobQueueStrategy {
     prePopulate(jobs: Job[]) {
-        this.jobs.push(...jobs);
-    }
-
-    async add(job: Job): Promise<Job> {
-        (job as any).id = generatePublicId();
-        this.jobs.push(job);
-        return job;
-    }
-
-    async findOne(id: ID): Promise<Job | undefined> {
-        return this.jobs.find((j) => j.id === id);
-    }
-
-    async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
-        // The sort, filter, paginate logic is not implemented because
-        // it is not needed for testing purposes.
-        const items = this.jobs;
-        return {
-            items,
-            totalItems: items.length,
-        };
-    }
-
-    async findManyById(ids: ID[]): Promise<Job[]> {
-        return this.jobs.filter((job) => job.id && ids.includes(job.id));
-    }
-
-    async next(queueName: string): Promise<Job | undefined> {
-        const next = this.jobs.find((job) => {
-            return (
-                (job.state === JobState.PENDING || job.state === JobState.RETRYING) &&
-                job.queueName === queueName
-            );
-        });
-        if (next) {
-            next.start();
-            return next;
-        }
-    }
-
-    async update(job: Job): Promise<void> {
-        const index = this.jobs.findIndex((j) => j.id === job.id);
-        if (-1 < index) {
-            this.jobs.splice(index, 1, job);
+        for (const job of jobs) {
+            this.add(job);
         }
     }
 }

+ 21 - 0
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -0,0 +1,21 @@
+import { PluginCommonModule } from '../plugin-common.module';
+import { VendurePlugin } from '../vendure-plugin';
+
+import { JobRecord } from './job-record.entity';
+import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
+
+/**
+ * @description
+ * A plugin which configures Vendure to use the SQL database to persist the JobQueue jobs.
+ *
+ * @docsCategory JobQueue
+ */
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    entities: [JobRecord],
+    configuration: (config) => {
+        config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy();
+        return config;
+    },
+})
+export class DefaultJobQueuePlugin {}

+ 1 - 1
packages/core/src/entity/job-record/job-record.entity.ts → packages/core/src/plugin/default-job-queue-plugin/job-record.entity.ts

@@ -2,7 +2,7 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { DeepPartial } from '@vendure/common/lib/shared-types';
 import { Column, Entity } from 'typeorm';
 
-import { VendureEntity } from '../base/base.entity';
+import { VendureEntity } from '../../entity/base/base.entity';
 
 @Entity()
 export class JobRecord extends VendureEntity {

+ 5 - 5
packages/core/src/job-queue/sql-job-queue-strategy.ts → packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -4,12 +4,12 @@ import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
 import { Brackets, Connection } from 'typeorm';
 
-import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
-import { JobRecord } from '../entity/job-record/job-record.entity';
-import { ProcessContext } from '../process-context/process-context';
-import { ListQueryBuilder } from '../service/helpers/list-query-builder/list-query-builder';
+import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy';
+import { Job } from '../../job-queue/job';
+import { ProcessContext } from '../../process-context/process-context';
+import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';
 
-import { Job } from './job';
+import { JobRecord } from './job-record.entity';
 
 export class SqlJobQueueStrategy implements JobQueueStrategy {
     private connection: Connection | undefined;

+ 1 - 0
packages/core/src/plugin/index.ts

@@ -1,4 +1,5 @@
 export * from './default-search-plugin/default-search-plugin';
+export * from './default-job-queue-plugin/default-job-queue-plugin';
 export * from './vendure-plugin';
 export * from './plugin-common.module';
 export { createProxyHandler, ProxyOptions } from './plugin-utils';

+ 2 - 0
packages/dev-server/dev-config.ts

@@ -3,6 +3,7 @@ import { AdminUiPlugin } from '@vendure/admin-ui-plugin';
 import { AssetServerPlugin } from '@vendure/asset-server-plugin';
 import { ADMIN_API_PATH, API_PORT, SHOP_API_PATH } from '@vendure/common/lib/shared-constants';
 import {
+    DefaultJobQueuePlugin,
     DefaultLogger,
     DefaultSearchPlugin,
     examplePaymentHandler,
@@ -51,6 +52,7 @@ export const devConfig: VendureConfig = {
             port: 5002,
         }),
         DefaultSearchPlugin,
+        DefaultJobQueuePlugin,
         // ElasticsearchPlugin.init({
         //     host: 'http://192.168.99.100',
         //     port: 9200,