polling-job-queue-strategy.ts 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { ID } from '@vendure/common/lib/shared-types';
  3. import { isObject } from '@vendure/common/lib/shared-utils';
  4. import { interval, race, Subject, Subscription } from 'rxjs';
  5. import { fromPromise } from 'rxjs/internal-compatibility';
  6. import { filter, switchMap, take, throttleTime } from 'rxjs/operators';
  7. import { Logger } from '../config/logger/vendure-logger';
  8. import { InjectableJobQueueStrategy } from './injectable-job-queue-strategy';
  9. import { Job } from './job';
  10. import { QueueNameProcessStorage } from './queue-name-process-storage';
  11. import { JobData } from './types';
  /**
   * @description
   * Defines the backoff strategy used when retrying failed jobs. Returns the delay in
   * ms that should pass before the failed job is retried.
   *
   * @docsCategory JobQueue
   * @docsPage types
   */
  export type BackoffStrategy = (queueName: string, attemptsMade: number, job: Job) => number;
  /**
   * @description
   * Configuration object accepted by the {@link PollingJobQueueStrategy} constructor.
   * Every option is optional; an omitted option falls back to the default documented on it.
   */
  export interface PollingJobQueueStrategyConfig {
      /**
       * @description
       * How many jobs from a given queue to process concurrently.
       *
       * @default 1
       */
      concurrency?: number;
      /**
       * @description
       * The interval in ms between polling the database for new jobs. May also be given
       * as a function which receives the queue name and returns the interval for that queue.
       *
       * @default 200
       */
      pollInterval?: number | ((queueName: string) => number);
      /**
       * @description
       * The strategy used to decide how long to wait before retrying a failed job.
       *
       * @default () => 1000
       */
      backoffStrategy?: BackoffStrategy;
  }
  /**
   * Sentinel value emitted when a queue is stopped. A running job whose race resolves
   * to this value is deferred rather than completed.
   */
  const STOP_SIGNAL = Symbol('STOP_SIGNAL');
  /**
   * Polls a single named queue on behalf of a {@link PollingJobQueueStrategy} and runs the
   * jobs it returns, keeping at most `concurrency` jobs in flight at any one time.
   */
  class ActiveQueue<Data extends JobData<Data> = {}> {
      /** Handle of the pending `setTimeout` which triggers the next poll. */
      private timer: any;
      /** True between `start()` and `stop()`. */
      private running = false;
      /** Jobs which have been picked up and whose final state has not yet been persisted. */
      private activeJobs: Array<Job<Data>> = [];
      /** Carries `[message, stack]` pairs; throttled so a failing store does not flood the log. */
      private errorNotifier$ = new Subject<[string, string]>();
      /** Emits `STOP_SIGNAL` when `stop()` is called so that running jobs can be deferred. */
      private queueStopped$ = new Subject<typeof STOP_SIGNAL>();
      private subscription: Subscription;
      /** Poll interval in ms, resolved once for this queue in the constructor. */
      private readonly pollInterval: number;

      /**
       * @param queueName The name of the queue to poll.
       * @param process The function invoked for each job taken from the queue.
       * @param jobQueueStrategy The strategy supplying `next`, `update`, `findOne` and the
       * concurrency / poll interval settings.
       */
      constructor(
          private readonly queueName: string,
          private readonly process: (job: Job<Data>) => Promise<any>,
          private readonly jobQueueStrategy: PollingJobQueueStrategy,
      ) {
          this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
              Logger.error(message);
              if (stack) {
                  Logger.debug(stack);
              }
          });
          this.pollInterval =
              typeof this.jobQueueStrategy.pollInterval === 'function'
                  ? this.jobQueueStrategy.pollInterval(queueName)
                  : this.jobQueueStrategy.pollInterval;
      }

      /**
       * Starts polling. Each poll fills the free concurrency slots with jobs returned by
       * the strategy's `next()` and then schedules the following poll after `pollInterval` ms.
       */
      start() {
          Logger.debug(`Starting JobQueue "${this.queueName}"`);
          this.running = true;
          const runNextJobs = async () => {
              try {
                  const runningJobsCount = this.activeJobs.length;
                  // `this.running` is re-checked on every iteration because `stop()` may be
                  // called while a previous `next()` / `update()` call is still being awaited.
                  for (let i = runningJobsCount; this.running && i < this.jobQueueStrategy.concurrency; i++) {
                      const nextJob = await this.jobQueueStrategy.next(this.queueName);
                      if (nextJob) {
                          await this.startJob(nextJob);
                      }
                  }
              } catch (e) {
                  this.notifyError(e);
              }
              if (this.running) {
                  this.timer = setTimeout(runNextJobs, this.pollInterval);
              }
          };
          runNextJobs();
      }

      /**
       * Stops polling and signals all running jobs to be deferred. The returned promise
       * resolves once no jobs remain active, or after 2 seconds, whichever comes first.
       */
      stop(): Promise<void> {
          this.running = false;
          this.queueStopped$.next(STOP_SIGNAL);
          clearTimeout(this.timer);
          const start = Date.now();
          // Wait for 2 seconds to allow running jobs to complete
          const maxTimeout = 2000;
          return new Promise<void>(resolve => {
              const pollActiveJobs = () => {
                  const timedOut = Date.now() - start > maxTimeout;
                  if (this.activeJobs.length === 0 || timedOut) {
                      resolve();
                  } else {
                      setTimeout(pollActiveJobs, 50);
                  }
              };
              pollActiveJobs();
          });
      }

      /**
       * Registers the job as active, persists its current state and kicks off processing.
       * Resolves as soon as processing has been started; it does not wait for the job to finish.
       * Rejects (after releasing the concurrency slot) if the initial `update()` fails.
       */
      private async startJob(nextJob: Job<Data>): Promise<void> {
          this.activeJobs.push(nextJob);
          try {
              await this.jobQueueStrategy.update(nextJob);
          } catch (e) {
              // Without this the job would occupy a concurrency slot forever.
              this.removeJobFromActive(nextJob);
              throw e;
          }
          // A failed progress update must not surface as an unhandled promise rejection.
          const onProgress = (job: Job) =>
              this.jobQueueStrategy.update(job).catch(err => {
                  Logger.warn(`Error updating job info: ${err}`);
              });
          nextJob.on('progress', onProgress);
          const cancellationSignal$ = interval(this.pollInterval * 5).pipe(
              switchMap(() =>
                  // A failed lookup is reported and treated as "not cancelled"; otherwise the
                  // error would win the race below and fail a job which is still running.
                  // tslint:disable-next-line:no-non-null-assertion
                  this.jobQueueStrategy.findOne(nextJob.id!).catch(err => {
                      this.notifyError(err);
                      return undefined;
                  }),
              ),
              filter(job => job?.state === JobState.CANCELLED),
              take(1),
          );
          const stopSignal$ = this.queueStopped$.pipe(take(1));
          // The async wrapper turns a synchronous throw from `process` into a rejection,
          // so that it is handled by the same path as an asynchronous failure.
          const runProcess = async () => this.process(nextJob);
          race(fromPromise(runProcess()), cancellationSignal$, stopSignal$)
              .toPromise()
              .then(
                  result => {
                      if (result === STOP_SIGNAL) {
                          nextJob.defer();
                      } else if (result instanceof Job && result.state === JobState.CANCELLED) {
                          nextJob.cancel();
                      } else {
                          nextJob.complete(result);
                      }
                  },
                  err => {
                      nextJob.fail(err);
                  },
              )
              .finally(() => {
                  nextJob.off('progress', onProgress);
                  if (!this.running && nextJob.state !== JobState.PENDING) {
                      // The queue has been stopped, so the final state is not persisted,
                      // but the job is still released so that stop() can resolve promptly.
                      this.removeJobFromActive(nextJob);
                      return;
                  }
                  return this.onFailOrComplete(nextJob);
              })
              .catch(err => {
                  Logger.warn(`Error updating job info: ${err}`);
              });
      }

      /** Forwards an error to the throttled logger; non-Error values are wrapped in an Error. */
      private notifyError(e: unknown): void {
          const error = e instanceof Error ? e : new Error(String(e));
          this.errorNotifier$.next([
              `Job queue "${this.queueName}" encountered an error (set log level to Debug for trace): ${error.message}`,
              error.stack ?? '',
          ]);
      }

      /** Persists the job's final state and releases its slot even if persisting fails. */
      private async onFailOrComplete(job: Job<Data>) {
          try {
              await this.jobQueueStrategy.update(job);
          } finally {
              this.removeJobFromActive(job);
          }
      }

      /** Removes the job from the active list; does nothing if it is not in the list. */
      private removeJobFromActive(job: Job<Data>) {
          const index = this.activeJobs.indexOf(job);
          // splice(-1, 1) would remove the last (unrelated) job.
          if (index !== -1) {
              this.activeJobs.splice(index, 1);
          }
      }
  }
  /**
   * @description
   * This class allows easier implementation of {@link JobQueueStrategy} in a polling style.
   * Instead of providing {@link JobQueueStrategy} `start()` you should provide a `next` method.
   *
   * This class should be extended by any strategy which does not support a push-based system
   * to notify on new jobs. It is used by the {@link SqlJobQueueStrategy} and {@link InMemoryJobQueueStrategy}.
   *
   * @docsCategory JobQueue
   */
  export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
      /** Maximum number of jobs from one queue which are processed at the same time. */
      public concurrency: number;
      /** Poll interval in ms, or a function returning the interval for a given queue name. */
      public pollInterval: number | ((queueName: string) => number);
      /** Only set when the strategy is constructed with a config object. */
      public backOffStrategy?: BackoffStrategy;
      private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();

      /**
       * Config-object form. Omitted options default to `concurrency: 1`, `pollInterval: 200`
       * and `backoffStrategy: () => 1000`.
       */
      constructor(config?: PollingJobQueueStrategyConfig);
      /**
       * Positional form. Defaults to `concurrency: 1` and `pollInterval: 200`; the
       * `backOffStrategy` property is left unset.
       */
      constructor(concurrency?: number, pollInterval?: number);
      constructor(concurrencyOrConfig?: number | PollingJobQueueStrategyConfig, maybePollInterval?: number) {
          super();
          if (concurrencyOrConfig && isObject(concurrencyOrConfig)) {
              this.concurrency = concurrencyOrConfig.concurrency ?? 1;
              this.pollInterval = concurrencyOrConfig.pollInterval ?? 200;
              this.backOffStrategy = concurrencyOrConfig.backoffStrategy ?? (() => 1000);
          } else {
              this.concurrency = concurrencyOrConfig ?? 1;
              this.pollInterval = maybePollInterval ?? 200;
          }
      }

      /**
       * Starts polling the given queue with the given process function. If the strategy has
       * not been initialized yet, the pair is only recorded in `this.started` (presumably so
       * that the base class can start it later; that logic is not part of this class). Calling
       * this again for a queue/process pair which is already active is a no-op.
       */
      async start<Data extends JobData<Data> = {}>(
          queueName: string,
          process: (job: Job<Data>) => Promise<any>,
      ) {
          if (!this.hasInitialized) {
              this.started.set(queueName, process);
              return;
          }
          if (this.activeQueues.has(queueName, process)) {
              return;
          }
          const active = new ActiveQueue<Data>(queueName, process, this);
          active.start();
          this.activeQueues.set(queueName, process, active);
      }

      /**
       * Stops polling for the given queue/process pair and waits for the underlying queue
       * to stop. Does nothing if the pair is not active.
       */
      async stop<Data extends JobData<Data> = {}>(
          queueName: string,
          process: (job: Job<Data>) => Promise<any>,
      ) {
          const active = this.activeQueues.getAndDelete(queueName, process);
          if (!active) {
              return;
          }
          await active.stop();
      }

      /**
       * Marks the job with the given id as cancelled and persists that change.
       *
       * @returns The cancelled job, or `undefined` if no job with that id was found.
       */
      async cancelJob(jobId: ID): Promise<Job | undefined> {
          const job = await this.findOne(jobId);
          if (!job) {
              return undefined;
          }
          job.cancel();
          await this.update(job);
          return job;
      }

      /**
       * @description
       * Should return the next job in the given queue. The implementation is
       * responsible for returning the correct job according to the time of
       * creation.
       */
      abstract next(queueName: string): Promise<Job | undefined>;

      /**
       * @description
       * Update the job details in the store.
       */
      abstract update(job: Job): Promise<void>;

      /**
       * @description
       * Returns a job by its id.
       */
      abstract findOne(id: ID): Promise<Job | undefined>;
  }