Explorar o código

fix(core): Fix SubscribableJob.updates() completing after single emission (#4120)

Michael Bromley hai 22 horas
pai
achega
45c2e66db0

+ 59 - 0
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -6,6 +6,7 @@ import { take } from 'rxjs/operators';
 @Controller('run-job')
 class TestController implements OnModuleInit {
     private queue: JobQueue<{ returnValue?: string }>;
+    private progressQueue: JobQueue<{ duration: number }>;
 
     constructor(private jobQueueService: JobQueueService) {}
 
@@ -35,6 +36,21 @@ class TestController implements OnModuleInit {
                 }
             },
         });
+
+        // Queue for testing that updates() emits multiple times until job completion
+        this.progressQueue = await this.jobQueueService.createQueue({
+            name: 'test-progress',
+            process: async job => {
+                const duration = job.data.duration;
+                const steps = 4;
+                const stepDuration = duration / steps;
+                for (let i = 1; i <= steps; i++) {
+                    await new Promise(resolve => setTimeout(resolve, stepDuration));
+                    job.setProgress(i * 25);
+                }
+                return 'completed';
+            },
+        });
     }
 
     @Get()
@@ -61,6 +77,49 @@ class TestController implements OnModuleInit {
             .then(update => update?.result);
         return result;
     }
+
+    /**
+     * This endpoint tests that job.updates() emits multiple times as the job progresses,
+     * and continues until the job reaches a terminal state (COMPLETED).
+     * See https://github.com/vendure-ecommerce/vendure/issues/4112
+     */
+    @Get('subscribe-all-updates')
+    async runJobAndSubscribeAllUpdates() {
+        const job = await this.progressQueue.add({ duration: 500 });
+        const allUpdates: Array<{ state: string; progress: number; result: any }> = [];
+        return new Promise(resolve => {
+            job.updates({ pollInterval: 50, timeoutMs: 10000 }).subscribe({
+                next: update => {
+                    allUpdates.push({
+                        state: update.state as string,
+                        progress: update.progress,
+                        result: update.result,
+                    });
+                },
+                error: err => {
+                    resolve(
+                        JSON.stringify({
+                            updateCount: allUpdates.length,
+                            states: allUpdates.map(u => u.state),
+                            finalState: allUpdates[allUpdates.length - 1]?.state,
+                            finalResult: allUpdates[allUpdates.length - 1]?.result,
+                            error: err.message,
+                        }),
+                    );
+                },
+                complete: () => {
+                    resolve(
+                        JSON.stringify({
+                            updateCount: allUpdates.length,
+                            states: allUpdates.map(u => u.state),
+                            finalState: allUpdates[allUpdates.length - 1]?.state,
+                            finalResult: allUpdates[allUpdates.length - 1]?.result,
+                        }),
+                    );
+                },
+            });
+        });
+    }
 }
 
 @VendurePlugin({

+ 15 - 1
packages/core/e2e/job-queue.e2e-spec.ts

@@ -5,7 +5,7 @@ import path from 'path';
 import { afterAll, beforeAll, describe, expect, it } from 'vitest';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
 
 import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
 import {
@@ -174,6 +174,20 @@ describe('JobQueue', () => {
         const jobs = await getJobsInTestQueue(JobState.RUNNING);
         expect(jobs.items.length).toBe(1);
     });
+
+    // https://github.com/vendure-ecommerce/vendure/issues/4112
+    it('updates() should emit multiple times until job completes', async () => {
+        const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job/subscribe-all-updates`;
+        const response = await adminClient.fetch(restControllerUrl);
+        const result = JSON.parse(await response.text());
+
+        // Job does 4 progress steps (25%, 50%, 75%, 100%), so we should see at least 3 distinct updates
+        expect(result.updateCount).toBeGreaterThanOrEqual(3);
+        // Final state should be COMPLETED
+        expect(result.finalState).toBe(JobState.COMPLETED);
+        // Final result should be what the job returned
+        expect(result.finalResult).toBe('completed');
+    });
 });
 
 function sleep(ms: number): Promise<void> {

+ 8 - 7
packages/core/src/job-queue/subscribable-job.ts

@@ -2,8 +2,8 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { pick } from '@vendure/common/lib/pick';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import ms from 'ms';
-import { interval, merge, Observable, timer } from 'rxjs';
-import { distinctUntilChanged, filter, map, switchMap, take, takeWhile, tap } from 'rxjs/operators';
+import { interval, Observable, race, timer } from 'rxjs';
+import { distinctUntilChanged, filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
 
 import { InternalServerError } from '../common/error/errors';
 import { Logger } from '../config/index';
@@ -42,7 +42,7 @@ export type JobUpdateOptions = {
      */
     timeoutMs?: number;
     /**
-     * Observable sequence will end with an error if true. Default to false
+     * Observable sequence will end with an error if true. Default to true
      */
     errorOnFail?: boolean;
 };
@@ -80,7 +80,7 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
      */
     updates(options?: JobUpdateOptions): Observable<JobUpdate<T>> {
         const pollInterval = Math.max(50, options?.pollInterval ?? 200);
-        const timeoutMs = Math.max(pollInterval, options?.timeoutMs ?? ms('1h'));
+        const timeoutMs = Math.max(1, options?.timeoutMs ?? ms('1h'));
         const strategy = this.jobQueueStrategy;
         if (!isInspectableJobQueueStrategy(strategy)) {
             throw new InternalServerError(
@@ -133,9 +133,10 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
                 ),
             );
 
-            // Use merge + take(1) instead of race() to handle immediate timer emissions more reliably
-            // This prevents race conditions where the timer might fire before race() can capture it
-            return merge(updates$, timeout$).pipe(take(1));
+            // Use race() to return whichever observable emits first and follow it to completion.
+            // - If updates$ emits first, it will continue emitting until the job settles
+            // - If timeout$ emits first, it will emit the timeout message and complete
+            return race(updates$, timeout$) as Observable<JobUpdate<T>>;
         }
     }
 }