Kaynağa Gözat

fix(core): Fix server crash when subscribable job times out

Fixes #3397
Michael Bromley 10 ay önce
ebeveyn
işleme
7f851c3906

+ 20 - 2
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -1,5 +1,5 @@
 import { Controller, Get, OnModuleInit } from '@nestjs/common';
-import { JobQueue, JobQueueService, PluginCommonModule, VendurePlugin } from '@vendure/core';
+import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
 import { Subject } from 'rxjs';
 import { take } from 'rxjs/operators';
 
@@ -17,12 +17,20 @@ class TestController implements OnModuleInit {
                     await new Promise(resolve => setTimeout(resolve, 50));
                     return job.data.returnValue;
                 } else {
+                    const interval = setInterval(() => {
+                        Logger.info(`Job is running...`);
+                        if (job.state === 'CANCELLED') {
+                            clearInterval(interval);
+                            PluginWithJobQueue.jobSubject.next();
+                        }
+                    }, 500);
                     return PluginWithJobQueue.jobSubject
                         .pipe(take(1))
                         .toPromise()
                         .then(() => {
                             PluginWithJobQueue.jobHasDoneWork = true;
-                            return job.data.returnValue;
+                            clearInterval(interval);
+                            return 'job result';
                         });
                 }
             },
@@ -43,6 +51,16 @@ class TestController implements OnModuleInit {
             .toPromise()
             .then(update => update?.result);
     }
+
+    @Get('subscribe-timeout')
+    async runJobAndSubscribeTimeout() {
+        const job = await this.queue.add({});
+        const result = await job
+            .updates({ timeoutMs: 50 })
+            .toPromise()
+            .then(update => update?.result);
+        return result;
+    }
 }
 
 @VendurePlugin({

+ 15 - 0
packages/core/e2e/job-queue.e2e-spec.ts

@@ -159,6 +159,21 @@ describe('JobQueue', () => {
         const jobs = await getJobsInTestQueue(JobState.RUNNING);
         expect(jobs.items.length).toBe(0);
     });
+
+    it('subscribable that times out', async () => {
+        const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job/subscribe-timeout`;
+        const result = await adminClient.fetch(restControllerUrl);
+
+        expect(result.status).toBe(200);
+        expect(await result.text()).toBe('Job subscription timed out. The job may still be running');
+        const jobs = await getJobsInTestQueue(JobState.RUNNING);
+        expect(jobs.items.length).toBe(1);
+    });
+
+    it('server still running after timeout', async () => {
+        const jobs = await getJobsInTestQueue(JobState.RUNNING);
+        expect(jobs.items.length).toBe(1);
+    });
 });
 
 function sleep(ms: number): Promise<void> {

+ 24 - 11
packages/core/src/job-queue/subscribable-job.ts

@@ -2,10 +2,11 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { pick } from '@vendure/common/lib/pick';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import ms from 'ms';
-import { interval, Observable } from 'rxjs';
+import { interval, Observable, race, timer } from 'rxjs';
 import { distinctUntilChanged, filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
 
 import { InternalServerError } from '../common/error/errors';
+import { Logger } from '../config/index';
 import { isInspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
 import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
 
@@ -87,16 +88,7 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
             );
         } else {
             // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-            return interval(pollInterval).pipe(
-                tap(i => {
-                    if (timeoutMs < i * pollInterval) {
-                        throw new Error(
-                            `Job ${
-                                this.id ?? ''
-                            } SubscribableJob update polling timed out after ${timeoutMs}ms. The job may still be running.`,
-                        );
-                    }
-                }),
+            const updates$ = interval(pollInterval).pipe(
                 switchMap(() => {
                     const id = this.id;
                     if (!id) {
@@ -120,6 +112,27 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
                 }),
                 map(job => pick(job, ['id', 'state', 'progress', 'result', 'error', 'data'])),
             );
+            const timeout$ = timer(timeoutMs).pipe(
+                tap(i => {
+                    Logger.error(
+                        `Job ${
+                            this.id ?? ''
+                        } SubscribableJob update polling timed out after ${timeoutMs}ms. The job may still be running.`,
+                    );
+                }),
+                map(
+                    () =>
+                        ({
+                            id: this.id,
+                            state: JobState.RUNNING,
+                            data: this.data,
+                            error: this.error,
+                            progress: this.progress,
+                            result: 'Job subscription timed out. The job may still be running',
+                        }) satisfies JobUpdate<any>,
+                ),
+            );
+            return race(updates$, timeout$);
         }
     }
 }