// subscribable-job.ts
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { pick } from '@vendure/common/lib/pick';
  3. import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
  4. import ms from 'ms';
  5. import { interval, Observable } from 'rxjs';
  6. import { distinctUntilChanged, filter, map, switchMap, takeWhile, tap } from 'rxjs/operators';
  7. import { InternalServerError } from '../common/error/errors';
  8. import { isInspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
  9. import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
  10. import { Job } from './job';
  11. import { JobConfig, JobData } from './types';
  /**
   * @description
   * Job update status as emitted by the Observable returned from the {@link SubscribableJob}'s
   * `updates()` method. It is the subset of {@link Job} properties listed below.
   *
   * @docsCategory JobQueue
   * @docsPage types
   */
  export type JobUpdate<T extends JobData<T>> = Pick<
      Job<T>,
      'id' | 'state' | 'progress' | 'result' | 'error' | 'data'
  >;
  /**
   * @description
   * Job update options, which can be passed to the {@link SubscribableJob.updates updates()} method.
   *
   * @docsCategory JobQueue
   * @docsPage types
   */
  export type JobUpdateOptions = {
      /**
       * Polling interval in milliseconds. Defaults to 200ms. Values below 50ms are raised to 50ms.
       */
      pollInterval?: number;
      /**
       * Polling timeout in milliseconds. Defaults to 1 hour. Values smaller than the polling
       * interval are raised to the polling interval.
       */
      timeoutMs?: number;
      /**
       * If true, the Observable sequence ends with an error when the Job fails.
       * Defaults to true.
       */
      errorOnFail?: boolean;
  };
  /**
   * @description
   * This is a type of Job object that allows you to subscribe to updates to the Job. It is returned
   * by the {@link JobQueue}'s `add()` method. Note that the subscription capability is only supported
   * if the {@link JobQueueStrategy} implements the {@link InspectableJobQueueStrategy} interface (e.g.
   * the {@link SqlJobQueueStrategy} does support this).
   *
   * @docsCategory JobQueue
   */
  export class SubscribableJob<T extends JobData<T> = any> extends Job<T> {
      private readonly jobQueueStrategy: JobQueueStrategy;

      /**
       * Creates a SubscribableJob mirroring the given Job.
       *
       * @param job The Job whose configuration is copied into this instance.
       * @param jobQueueStrategy The strategy which will be polled by `updates()`.
       */
      constructor(job: Job<T>, jobQueueStrategy: JobQueueStrategy) {
          const config: JobConfig<T> = {
              ...job,
              // NOTE(review): `state` and `data` are passed explicitly in addition to the spread,
              // presumably because they are accessors on Job and are therefore not copied by
              // an object spread - confirm against the Job class.
              state: job.state,
              data: job.data,
              // Normalize any falsy id (e.g. null) to undefined.
              id: job.id || undefined,
          };
          super(config);
          this.jobQueueStrategy = jobQueueStrategy;
      }

      /**
       * @description
       * Returns an Observable stream of updates to the Job. Works by polling the current JobQueueStrategy's `findOne()` method
       * to obtain updates. If these updates are not subscribed to, then no polling occurs.
       *
       * Polling interval, timeout and other options may be configured with an options argument {@link JobUpdateOptions}.
       *
       * An update is emitted only when the `progress` or `state` of the polled Job differs from the
       * previously emitted one. The stream completes after emitting the first update whose state is
       * `FAILED`, `COMPLETED` or `CANCELLED`. If the state is `FAILED` and `errorOnFail` is not
       * `false`, the stream ends with an error instead of emitting that update.
       *
       * The stream also ends with an error if the polling timeout elapses, or if this Job has no ID.
       *
       * @param options Optional polling configuration.
       * @returns An Observable of {@link JobUpdate} objects.
       * @throws InternalServerError (synchronously) if the configured JobQueueStrategy is not inspectable.
       */
      updates(options?: JobUpdateOptions): Observable<JobUpdate<T>> {
          const pollInterval = Math.max(50, options?.pollInterval ?? 200);
          const timeoutMs = Math.max(pollInterval, options?.timeoutMs ?? ms('1h'));
          const errorOnFail = options?.errorOnFail ?? true;
          const strategy = this.jobQueueStrategy;

          if (!isInspectableJobQueueStrategy(strategy)) {
              throw new InternalServerError(
                  `The configured JobQueueStrategy (${strategy.constructor.name}) is not inspectable, so Job updates cannot be subscribed to`,
              );
          }

          return interval(pollInterval).pipe(
              tap(i => {
                  // interval() emits index `i` only after (i + 1) periods have elapsed, so the
                  // elapsed time at this tick is (i + 1) * pollInterval, not i * pollInterval.
                  if ((i + 1) * pollInterval > timeoutMs) {
                      throw new Error(
                          `Job ${this.id} SubscribableJob update polling timed out after ${timeoutMs}ms. The job may still be running.`,
                      );
                  }
              }),
              switchMap(() => {
                  // Checked inside the stream so that the failure is delivered to the
                  // subscriber's error handler rather than thrown from updates() itself.
                  const id = this.id;
                  if (!id) {
                      throw new Error(`Cannot subscribe to update: Job does not have an ID`);
                  }
                  return strategy.findOne(id);
              }),
              // Skip polls for which the strategy returned no Job.
              filter(notNullOrUndefined),
              // Suppress polls which show no change in progress or state.
              distinctUntilChanged((a, b) => a.progress === b.progress && a.state === b.state),
              // `inclusive = true`: the update carrying the terminal state is still passed on
              // before the stream completes.
              takeWhile(
                  job =>
                      job.state !== JobState.FAILED &&
                      job.state !== JobState.COMPLETED &&
                      job.state !== JobState.CANCELLED,
                  true,
              ),
              tap(job => {
                  if (job.state === JobState.FAILED && errorOnFail) {
                      throw new Error(job.error);
                  }
              }),
              map(job => pick(job, ['id', 'state', 'progress', 'result', 'error', 'data'])),
          );
      }
  }