Browse Source

fix(core): Make SqlJobQueueStrategy concurrency-safe

Michael Bromley 4 years ago
parent
commit
5e5e55a0a0

+ 6 - 1
packages/core/src/job-queue/job.ts

@@ -74,7 +74,12 @@ export class Job<T extends JobData<T> = any> {
     }
 
     get isSettled(): boolean {
-        return !!this._settledAt;
+        return (
+            !!this._settledAt &&
+            (this._state === JobState.COMPLETED ||
+                this._state === JobState.FAILED ||
+                this._state === JobState.CANCELLED)
+        );
     }
 
     get startedAt(): Date | undefined {

+ 31 - 2
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -1,6 +1,6 @@
 import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
-import { Brackets, Connection, FindConditions, In, LessThan } from 'typeorm';
+import { Brackets, Connection, EntityManager, FindConditions, In, LessThan } from 'typeorm';
 
 import { Injector } from '../../common/injector';
 import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
@@ -46,7 +46,32 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         if (!this.connectionAvailable(this.connection)) {
             throw new Error('Connection not available');
         }
-        const record = await this.connection
+        const connection = this.connection;
+        const connectionType = this.connection.options.type;
+
+        return new Promise(async (resolve, reject) => {
+            if (connectionType === 'sqlite' || connectionType === 'sqljs') {
+                // SQLite driver does not support concurrent transactions. See https://github.com/typeorm/typeorm/issues/1884
+                const result = await this.getNextAndSetAsRunning(connection.manager, queueName);
+                resolve(result);
+            } else {
+                // Selecting the next job is wrapped in a transaction so that we can
+                // set a lock on that row and immediately update the status to "RUNNING".
+                // This prevents multiple worker processes from taking the same job when
+                // running concurrent workers.
+                connection.transaction(async transactionManager => {
+                    const result = await this.getNextAndSetAsRunning(transactionManager, queueName);
+                    resolve(result);
+                });
+            }
+        });
+    }
+
+    private async getNextAndSetAsRunning(
+        manager: EntityManager,
+        queueName: string,
+    ): Promise<Job | undefined> {
+        const record = await manager
             .getRepository(JobRecord)
             .createQueryBuilder('record')
             .where('record.queueName = :queueName', { queueName })
@@ -62,7 +87,11 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         if (record) {
             const job = this.fromRecord(record);
             job.start();
+            record.state = JobState.RUNNING;
+            await manager.getRepository(JobRecord).save(record);
             return job;
+        } else {
+            return;
         }
     }
 

+ 65 - 0
packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts

@@ -0,0 +1,65 @@
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { Args, Mutation, Resolver } from '@nestjs/graphql';
+import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
+import { gql } from 'apollo-server-core';
+
+@Injectable()
+export class JobQueueTestService implements OnModuleInit {
+    private myQueue: JobQueue<{ intervalMs: number; shouldFail: boolean }>;
+
+    constructor(private jobQueueService: JobQueueService) {}
+
+    async onModuleInit() {
+        this.myQueue = await this.jobQueueService.createQueue({
+            name: 'my-queue',
+            process: async job => {
+                Logger.info(`Starting job ${job.id}, shouldFail: ${JSON.stringify(job.data.shouldFail)}`);
+                let progress = 0;
+                while (progress < 100) {
+                    // Logger.info(`Job ${job.id} progress: ${progress}`);
+                    await new Promise(resolve => setTimeout(resolve, job.data.intervalMs));
+                    progress += 10;
+                    job.setProgress(progress);
+                    if (progress > 70 && job.data.shouldFail) {
+                        Logger.warn(`Job ${job.id} will fail`);
+                        throw new Error(`Job failed!!`);
+                    }
+                }
+                Logger.info(`Completed job ${job.id}`);
+            },
+        });
+    }
+
+    async startTask(intervalMs: number, shouldFail: boolean) {
+        await this.myQueue.add({ intervalMs, shouldFail }, { retries: 3 });
+        return true;
+    }
+}
+
+@Resolver()
+export class JobQueueTestResolver {
+    constructor(private service: JobQueueTestService) {}
+
+    @Mutation()
+    startTask(@Args() args: any) {
+        return this.service.startTask(args.intervalMs, args.shouldFail);
+    }
+}
+
+/**
+ * A plugin which can be used to test job queue strategies. Exposes a mutation `startTask` in
+ * the Admin API which triggers a job.
+ */
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    adminApiExtensions: {
+        resolvers: [JobQueueTestResolver],
+        schema: gql`
+            extend type Mutation {
+                startTask(intervalMs: Int, shouldFail: Boolean!): Boolean!
+            }
+        `,
+    },
+    providers: [JobQueueTestService],
+})
+export class JobQueueTestPlugin {}