ソースを参照

fix(core): Improve reliability of Job cancellation

Michael Bromley 4 年 前
コミット
410b4c2c42

+ 23 - 8
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -5,20 +5,26 @@ import { take } from 'rxjs/operators';
 
 @Controller('run-job')
 class TestController implements OnModuleInit {
-    private queue: JobQueue;
+    private queue: JobQueue<{ returnValue?: string }>;
 
     constructor(private jobQueueService: JobQueueService) {}
 
     async onModuleInit(): Promise<void> {
         this.queue = await this.jobQueueService.createQueue({
             name: 'test',
-            process: job => {
-                return PluginWithJobQueue.jobSubject
-                    .pipe(take(1))
-                    .toPromise()
-                    .then(() => {
-                        PluginWithJobQueue.jobHasDoneWork = true;
-                    });
+            process: async job => {
+                if (job.data.returnValue) {
+                    await new Promise(resolve => setTimeout(resolve, 50));
+                    return job.data.returnValue;
+                } else {
+                    return PluginWithJobQueue.jobSubject
+                        .pipe(take(1))
+                        .toPromise()
+                        .then(() => {
+                            PluginWithJobQueue.jobHasDoneWork = true;
+                            return job.data.returnValue;
+                        });
+                }
             },
         });
     }
@@ -28,6 +34,15 @@ class TestController implements OnModuleInit {
         await this.queue.add({});
         return true;
     }
+
+    @Get('subscribe')
+    async runJobAndSubscribe() {
+        const job = await this.queue.add({ returnValue: '42!' });
+        return job
+            .updates()
+            .toPromise()
+            .then(update => update.result);
+    }
 }
 
 @VendurePlugin({

+ 3 - 2
packages/core/src/job-queue/job.ts

@@ -158,13 +158,14 @@ export class Job<T extends JobData<T> = any> {
         if (this.retries >= this._attempts) {
             this._state = JobState.RETRYING;
         } else {
-            this._state = JobState.FAILED;
+            if (this._state !== JobState.CANCELLED) {
+                this._state = JobState.FAILED;
+            }
             this._settledAt = new Date();
         }
     }
 
     cancel() {
-        this._progress = 0;
         this._settledAt = new Date();
         this._state = JobState.CANCELLED;
     }

+ 17 - 4
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -1,6 +1,8 @@
+import { JobState } from '@vendure/common/lib/generated-types';
 import { ID } from '@vendure/common/lib/shared-types';
-import { Subject, Subscription } from 'rxjs';
-import { throttleTime } from 'rxjs/operators';
+import { interval, race, Subject, Subscription } from 'rxjs';
+import { fromPromise } from 'rxjs/internal-compatibility';
+import { filter, switchMap, take, throttleTime } from 'rxjs/operators';
 
 import { Logger } from '../config/logger/vendure-logger';
 
@@ -41,10 +43,21 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
                         await this.jobQueueStrategy.update(nextJob);
                         const onProgress = (job: Job) => this.jobQueueStrategy.update(job);
                         nextJob.on('progress', onProgress);
-                        this.process(nextJob)
+                        const cancellationSignal$ = interval(this.jobQueueStrategy.pollInterval * 5).pipe(
+                            // tslint:disable-next-line:no-non-null-assertion
+                            switchMap(() => this.jobQueueStrategy.findOne(nextJob.id!)),
+                            filter(job => job?.state === JobState.CANCELLED),
+                            take(1),
+                        );
+                        race(fromPromise(this.process(nextJob)), cancellationSignal$)
+                            .toPromise()
                             .then(
                                 result => {
-                                    nextJob.complete(result);
+                                    if (result instanceof Job && result.state === JobState.CANCELLED) {
+                                        nextJob.cancel();
+                                    } else {
+                                        nextJob.complete(result);
+                                    }
                                 },
                                 err => {
                                     nextJob.fail(err);

+ 9 - 2
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -95,7 +95,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
             const job = this.fromRecord(record);
             job.start();
             record.state = JobState.RUNNING;
-            await manager.getRepository(JobRecord).save(record);
+            await manager.getRepository(JobRecord).save(record, { reload: false });
             return job;
         } else {
             return;
@@ -106,7 +106,14 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         if (!this.connectionAvailable(this.connection)) {
             throw new Error('Connection not available');
         }
-        await this.connection.getRepository(JobRecord).save(this.toRecord(job));
+        await this.connection
+            .getRepository(JobRecord)
+            .createQueryBuilder('job')
+            .update()
+            .set(this.toRecord(job))
+            .where('id = :id', { id: job.id })
+            .andWhere('state != :cancelled', { cancelled: JobState.CANCELLED })
+            .execute();
     }
 
     async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {

+ 1 - 1
packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts

@@ -37,7 +37,7 @@ export class JobQueueTestService implements OnModuleInit {
     async startTask(intervalMs: number, shouldFail: boolean, subscribeToResult: boolean) {
         const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 0 });
         if (subscribeToResult) {
-            return job.updates({ timeoutMs: 1000 }).pipe(
+            return job.updates().pipe(
                 map(update => {
                     Logger.info(`Job ${update.id}: progress: ${update.progress}`);
                     if (update.state === JobState.COMPLETED) {