subscribable-job.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { pick } from '@vendure/common/lib/pick';
  3. import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
  4. import ms from 'ms';
  5. import { interval, Observable, race, timer } from 'rxjs';
  6. import { distinctUntilChanged, filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
  7. import { InternalServerError } from '../common/error/errors';
  8. import { Logger } from '../config/index';
  9. import { isInspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
  10. import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
  11. import { Job } from './job';
  12. import { JobConfig, JobData } from './types';
  13. /**
  14. * @description
  15. * Job update status as returned from the {@link SubscribableJob}'s `update()` method.
  16. *
  17. * @docsCategory JobQueue
  18. * @docsPage types
  19. */
  20. export type JobUpdate<T extends JobData<T>> = Pick<
  21. Job<T>,
  22. 'id' | 'state' | 'progress' | 'result' | 'error' | 'data'
  23. >;
  24. /**
  25. * @description
  26. * Job update options, that you can specify by calling {@link SubscribableJob} `updates` method.
  27. *
  28. * @docsCategory JobQueue
  29. * @docsPage types
  30. */
  31. export type JobUpdateOptions = {
  32. /**
  33. * Polling interval. Defaults to 200ms
  34. */
  35. pollInterval?: number;
  36. /**
  37. * Polling timeout in milliseconds. Defaults to 1 hour
  38. */
  39. timeoutMs?: number;
  40. /**
  41. * Observable sequence will end with an error if true. Default to true
  42. */
  43. errorOnFail?: boolean;
  44. };
  45. /**
  46. * @description
  47. * This is a type of Job object that allows you to subscribe to updates to the Job. It is returned
  48. * by the {@link JobQueue}'s `add()` method. Note that the subscription capability is only supported
  49. * if the {@link JobQueueStrategy} implements the {@link InspectableJobQueueStrategy} interface (e.g.
  50. * the {@link SqlJobQueueStrategy} does support this).
  51. *
  52. * @docsCategory JobQueue
  53. */
  54. export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
  55. private readonly jobQueueStrategy: JobQueueStrategy;
  56. constructor(job: Job<T>, jobQueueStrategy: JobQueueStrategy) {
  57. const config: JobConfig<T> = {
  58. ...job,
  59. state: job.state,
  60. data: job.data,
  61. id: job.id || undefined,
  62. };
  63. super(config);
  64. this.jobQueueStrategy = jobQueueStrategy;
  65. }
  66. /**
  67. * @description
  68. * Returns an Observable stream of updates to the Job. Works by polling the current JobQueueStrategy's `findOne()` method
  69. * to obtain updates. If the updates are not subscribed to, then no polling occurs.
  70. *
  71. * Polling interval, timeout and other options may be configured with an options arguments {@link JobUpdateOptions}.
  72. *
  73. */
  74. updates(options?: JobUpdateOptions): Observable<JobUpdate<T>> {
  75. const pollInterval = Math.max(50, options?.pollInterval ?? 200);
  76. const timeoutMs = Math.max(1, options?.timeoutMs ?? ms('1h'));
  77. const strategy = this.jobQueueStrategy;
  78. if (!isInspectableJobQueueStrategy(strategy)) {
  79. throw new InternalServerError(
  80. `The configured JobQueueStrategy (${strategy.constructor.name}) is not inspectable, so Job updates cannot be subscribed to`,
  81. );
  82. } else {
  83. // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  84. const updates$ = interval(pollInterval).pipe(
  85. switchMap(() => {
  86. const id = this.id;
  87. if (!id) {
  88. throw new Error('Cannot subscribe to update: Job does not have an ID');
  89. }
  90. return strategy.findOne(id);
  91. }),
  92. filter(notNullOrUndefined),
  93. distinctUntilChanged((a, b) => a?.progress === b?.progress && a?.state === b?.state),
  94. takeWhile(
  95. job =>
  96. job?.state !== JobState.FAILED &&
  97. job.state !== JobState.COMPLETED &&
  98. job.state !== JobState.CANCELLED,
  99. true,
  100. ),
  101. tap(job => {
  102. if (job.state === JobState.FAILED && (options?.errorOnFail ?? true)) {
  103. throw new Error(job.error);
  104. }
  105. }),
  106. map(job => pick(job, ['id', 'state', 'progress', 'result', 'error', 'data'])),
  107. );
  108. const timeout$ = timer(timeoutMs).pipe(
  109. tap(i => {
  110. Logger.error(
  111. `Job ${
  112. this.id ?? ''
  113. } SubscribableJob update polling timed out after ${timeoutMs}ms. The job may still be running.`,
  114. );
  115. }),
  116. map(
  117. () =>
  118. ({
  119. id: this.id,
  120. state: JobState.RUNNING,
  121. data: this.data,
  122. error: this.error,
  123. progress: this.progress,
  124. result: 'Job subscription timed out. The job may still be running',
  125. }) satisfies JobUpdate<any>,
  126. ),
  127. );
  128. // Use race() to return whichever observable emits first and follow it to completion.
  129. // - If updates$ emits first, it will continue emitting until the job settles
  130. // - If timeout$ emits first, it will emit the timeout message and complete
  131. return race(updates$, timeout$) as Observable<JobUpdate<T>>;
  132. }
  133. }
  134. }