job.ts 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { isClassInstance, isObject } from '@vendure/common/lib/shared-utils';
  3. import { Logger } from '../config/logger/vendure-logger';
  4. import { JobConfig, JobData } from './types';
  5. /**
  6. * @description
  7. * An event raised by a Job.
  8. *
  9. * @docsCategory JobQueue
  10. * @docsPage Job
  11. */
  12. export type JobEventType = 'progress';
  13. /**
  14. * @description
  15. * The signature of the event handler expected by the `Job.on()` method.
  16. *
  17. * @docsCategory JobQueue
  18. * @docsPage Job
  19. */
  20. export type JobEventListener<T extends JobData<T>> = (job: Job<T>) => void;
  21. /**
  22. * @description
  23. * A Job represents a piece of work to be run in the background, i.e. outside the request-response cycle.
  24. * It is intended to be used for long-running work triggered by API requests. Jobs should now generally
  25. * be directly instantiated. Rather, the {@link JobQueue} `add()` method should be used to create and
  26. * add a new Job to a queue.
  27. *
  28. * @docsCategory JobQueue
  29. * @docsPage Job
  30. * @docsWeight 0
  31. */
  32. export class Job<T extends JobData<T> = any> {
  33. readonly id: number | string | null;
  34. readonly queueName: string;
  35. readonly retries: number;
  36. readonly createdAt: Date;
  37. private readonly _data: T;
  38. private _state: JobState;
  39. private _progress: number;
  40. private _result?: any;
  41. private _error?: any;
  42. private _attempts: number;
  43. private _startedAt?: Date;
  44. private _settledAt?: Date;
  45. private readonly eventListeners: { [type in JobEventType]: Array<JobEventListener<T>> } = {
  46. progress: [],
  47. };
  48. get name(): string {
  49. return this.queueName;
  50. }
  51. get data(): T {
  52. return this._data;
  53. }
  54. get state(): JobState {
  55. return this._state;
  56. }
  57. get progress(): number {
  58. return this._progress;
  59. }
  60. get result(): any {
  61. return this._result;
  62. }
  63. get error(): any {
  64. return this._error;
  65. }
  66. get isSettled(): boolean {
  67. return (
  68. !!this._settledAt &&
  69. (this._state === JobState.COMPLETED ||
  70. this._state === JobState.FAILED ||
  71. this._state === JobState.CANCELLED)
  72. );
  73. }
  74. get startedAt(): Date | undefined {
  75. return this._startedAt;
  76. }
  77. get settledAt(): Date | undefined {
  78. return this._settledAt;
  79. }
  80. get duration(): number {
  81. if (this.state === JobState.PENDING || this.state === JobState.RETRYING) {
  82. return 0;
  83. }
  84. const end = this._settledAt || new Date();
  85. return +end - +(this._startedAt || end);
  86. }
  87. get attempts(): number {
  88. return this._attempts;
  89. }
  90. constructor(config: JobConfig<T>) {
  91. this.queueName = config.queueName;
  92. this._data = this.ensureDataIsSerializable(config.data);
  93. this.id = config.id || null;
  94. this._state = config.state || JobState.PENDING;
  95. this.retries = config.retries || 0;
  96. this._attempts = config.attempts || 0;
  97. this._progress = config.progress || 0;
  98. this.createdAt = config.createdAt || new Date();
  99. this._result = config.result;
  100. this._error = config.error;
  101. this._startedAt = config.startedAt;
  102. this._settledAt = config.settledAt;
  103. }
  104. /**
  105. * @description
  106. * Calling this signifies that the job work has started. This method should be
  107. * called in the {@link JobQueueStrategy} `next()` method.
  108. */
  109. start() {
  110. if (this._state === JobState.PENDING || this._state === JobState.RETRYING) {
  111. this._state = JobState.RUNNING;
  112. this._startedAt = new Date();
  113. this._attempts++;
  114. Logger.debug(
  115. `Job ${this.id} [${this.queueName}] starting (attempt ${this._attempts} of ${
  116. this.retries + 1
  117. })`,
  118. );
  119. }
  120. }
  121. /**
  122. * @description
  123. * Sets the progress (0 - 100) of the job.
  124. */
  125. setProgress(percent: number) {
  126. this._progress = Math.min(percent || 0, 100);
  127. this.fireEvent('progress');
  128. }
  129. /**
  130. * @description
  131. * Calling this method signifies that the job succeeded. The result
  132. * will be stored in the `Job.result` property.
  133. */
  134. complete(result?: any) {
  135. this._result = result;
  136. this._progress = 100;
  137. this._state = JobState.COMPLETED;
  138. this._settledAt = new Date();
  139. Logger.debug(`Job ${this.id} [${this.queueName}] completed`);
  140. }
  141. /**
  142. * @description
  143. * Calling this method signifies that the job failed.
  144. */
  145. fail(err?: any) {
  146. this._error = err?.message ? err.message : String(err);
  147. this._progress = 0;
  148. if (this.retries >= this._attempts) {
  149. this._state = JobState.RETRYING;
  150. Logger.warn(
  151. `Job ${this.id} [${this.queueName}] failed (attempt ${this._attempts} of ${
  152. this.retries + 1
  153. })`,
  154. );
  155. } else {
  156. if (this._state !== JobState.CANCELLED) {
  157. this._state = JobState.FAILED;
  158. Logger.warn(`Job ${this.id} [${this.queueName}] failed and will not retry.`);
  159. }
  160. this._settledAt = new Date();
  161. }
  162. }
  163. cancel() {
  164. this._settledAt = new Date();
  165. this._state = JobState.CANCELLED;
  166. }
  167. /**
  168. * @description
  169. * Sets a RUNNING job back to PENDING. Should be used when the JobQueue is being
  170. * destroyed before the job has been completed.
  171. */
  172. defer() {
  173. if (this._state === JobState.RUNNING) {
  174. this._state = JobState.PENDING;
  175. this._attempts = 0;
  176. Logger.debug(`Job ${this.id} [${this.queueName}] deferred back to PENDING state`);
  177. }
  178. }
  179. /**
  180. * @description
  181. * Used to register event handlers for job events
  182. */
  183. on(eventType: JobEventType, listener: JobEventListener<T>) {
  184. this.eventListeners[eventType].push(listener);
  185. }
  186. off(eventType: JobEventType, listener: JobEventListener<T>) {
  187. const idx = this.eventListeners[eventType].indexOf(listener);
  188. if (idx !== -1) {
  189. this.eventListeners[eventType].splice(idx, 1);
  190. }
  191. }
  192. private fireEvent(eventType: JobEventType) {
  193. for (const listener of this.eventListeners[eventType]) {
  194. listener(this);
  195. }
  196. }
  197. /**
  198. * All data in a job must be serializable. This method handles certain problem cases such as when
  199. * the data is a class instance with getters. Even though technically the "data" object should
  200. * already be serializable per the TS type, in practice data can slip through due to loss of
  201. * type safety.
  202. */
  203. private ensureDataIsSerializable(data: any, depth = 0): any {
  204. if (10 < depth) {
  205. return '[max depth reached]';
  206. }
  207. depth++;
  208. let output: any;
  209. if (data instanceof Date) {
  210. return data.toISOString();
  211. } else if (isObject(data)) {
  212. if (!output) {
  213. output = {};
  214. }
  215. for (const key of Object.keys(data)) {
  216. output[key] = this.ensureDataIsSerializable((data as any)[key], depth);
  217. }
  218. if (isClassInstance(data)) {
  219. const descriptors = Object.getOwnPropertyDescriptors(Object.getPrototypeOf(data));
  220. for (const name of Object.keys(descriptors)) {
  221. const descriptor = descriptors[name];
  222. if (typeof descriptor.get === 'function') {
  223. output[name] = (data as any)[name];
  224. }
  225. }
  226. }
  227. } else if (Array.isArray(data)) {
  228. if (!output) {
  229. output = [];
  230. }
  231. data.forEach((item, i) => {
  232. output[i] = this.ensureDataIsSerializable(item, depth);
  233. });
  234. } else {
  235. return data;
  236. }
  237. return output;
  238. }
  239. }