Просмотр исходного кода

fix(core): More reliable job subscriber timeout handling

Michael Bromley 6 месяцев назад
Родитель
Сommit
5e124d5abf

+ 1 - 1
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -56,7 +56,7 @@ class TestController implements OnModuleInit {
     async runJobAndSubscribeTimeout() {
         const job = await this.queue.add({});
         const result = await job
-            .updates({ timeoutMs: 50 })
+            .updates({ timeoutMs: 100 })
             .toPromise()
             .then(update => update?.result);
         return result;

+ 7 - 4
packages/core/src/job-queue/subscribable-job.ts

@@ -2,8 +2,8 @@ import { JobState } from '@vendure/common/lib/generated-types';
 import { pick } from '@vendure/common/lib/pick';
 import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
 import ms from 'ms';
-import { interval, Observable, race, timer } from 'rxjs';
-import { distinctUntilChanged, filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
+import { interval, merge, Observable, timer } from 'rxjs';
+import { distinctUntilChanged, filter, map, switchMap, take, takeWhile, tap } from 'rxjs/operators';
 
 import { InternalServerError } from '../common/error/errors';
 import { Logger } from '../config/index';
@@ -73,7 +73,7 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
     /**
      * @description
      * Returns an Observable stream of updates to the Job. Works by polling the current JobQueueStrategy's `findOne()` method
-     * to obtain updates. If this updates are not subscribed to, then no polling occurs.
+     * to obtain updates. If the updates are not subscribed to, then no polling occurs.
      *
      * Polling interval, timeout and other options may be configured with an options arguments {@link JobUpdateOptions}.
      *
@@ -132,7 +132,10 @@ export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
                         }) satisfies JobUpdate<any>,
                 ),
             );
-            return race(updates$, timeout$);
+
+            // Use merge + take(1) instead of race() to handle immediate timer emissions more reliably
+            // This prevents race conditions where the timer might fire before race() can capture it
+            return merge(updates$, timeout$).pipe(take(1));
         }
     }
 }