job-queue.service.spec.ts 16 KB


  1. /* eslint-disable @typescript-eslint/no-non-null-assertion */
  2. import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
  3. import { ModuleRef } from '@nestjs/core';
  4. import { Test, TestingModule } from '@nestjs/testing';
  5. import { JobState } from '@vendure/common/lib/generated-types';
  6. import { Subject } from 'rxjs';
  7. import { take } from 'rxjs/operators';
  8. import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
  9. import { assertFound, Injector } from '../common';
  10. import { ConfigService } from '../config/config.service';
  11. import { ProcessContext, setProcessContext } from '../process-context/process-context';
  12. import { Job } from './job';
  13. import { JobBuffer } from './job-buffer/job-buffer';
  14. import { JobBufferService } from './job-buffer/job-buffer.service';
  15. import { TestingJobBufferStorageStrategy } from './job-buffer/testing-job-buffer-storage-strategy';
  16. import { JobQueue } from './job-queue';
  17. import { JobQueueService } from './job-queue.service';
  18. import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
  /** Poll interval (ms) given to the testing strategy; tests pass the same value to `tick()`. */
  const queuePollInterval = 10;

  /** Spy installed as the strategy's backoff function; configured to always return 0. */
  const backoffStrategySpy = vi.fn();
  backoffStrategySpy.mockReturnValue(0);

  /** One strategy instance, created at module load and shared by every test in this file. */
  const testJobQueueStrategy = new TestingJobQueueStrategy({
    pollInterval: queuePollInterval,
    concurrency: 1,
    backoffStrategy: backoffStrategySpy,
  });

  /** One buffer storage instance, likewise shared by every test in this file. */
  const testJobBufferStorageStrategy = new TestingJobBufferStorageStrategy();
  27. describe('JobQueueService', () => {
  28. let jobQueueService: JobQueueService;
  29. let module: TestingModule;
  /**
   * Looks a job up in the shared testing strategy and passes the lookup through `assertFound`.
   *
   * @param job Either a job id, or a job whose `id` is used for the lookup.
   * @returns Whatever `assertFound` yields for the strategy's `findOne` result.
   */
  function getJob(job: Job | string): Promise<Job> {
    if (typeof job === 'string') {
      return assertFound(testJobQueueStrategy.findOne(job));
    }
    return assertFound(testJobQueueStrategy.findOne(job.id!));
  }
  // Every test runs against a freshly compiled and initialised testing module,
  // with the job queue service already started.
  beforeEach(async () => {
    setProcessContext('server');

    const builder = Test.createTestingModule({
      providers: [
        { provide: ConfigService, useClass: MockConfigService },
        JobQueueService,
        JobBufferService,
        ProcessContext,
      ],
    });
    module = await builder.compile();
    await module.init();

    jobQueueService = module.get(JobQueueService);
    await jobQueueService.start();
  });

  // Close the module after each test so its destroy hooks run.
  afterEach(async () => {
    await module.close();
  });
  // The processor forwards the job's data to a subject; the test asserts on the first value seen.
  it('data is passed into job', async () => {
    const received = new Subject<string>();
    // Created before the job is added, so the first emission is captured.
    const firstValue = received.pipe(take(1)).toPromise();

    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: async job => {
        received.next(job.data);
      },
    });
    await queue.add('hello');

    expect(await firstValue).toBe('hello');
  });
  // Walks one job through PENDING -> RUNNING -> COMPLETED, with the processor's promise
  // held open by a subject until the test emits and completes it.
  it('job marked as complete', async () => {
    const gate = new Subject<string>();
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: () => gate.toPromise(),
    });

    const added = await queue.add('hello');
    expect(added.state).toBe(JobState.PENDING);

    await tick(queuePollInterval);
    const whileRunning = await getJob(added);
    expect(whileRunning.state).toBe(JobState.RUNNING);

    gate.next('yay');
    gate.complete();
    await tick();

    const finishedState = (await getJob(added)).state;
    expect(finishedState).toBe(JobState.COMPLETED);
    const finishedResult = (await getJob(added)).result;
    expect(finishedResult).toBe('yay');
  });
  // The processor throws the value emitted by the subject (a plain string, not an Error);
  // the job is expected to end up FAILED with that string as its error.
  it('job marked as failed when exception thrown', async () => {
    const trigger = new Subject<string>();
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: async () => {
        const thrown = await trigger.toPromise();
        throw thrown;
      },
    });

    const added = await queue.add('hello');
    expect(added.state).toBe(JobState.PENDING);

    await tick(queuePollInterval);
    const whileRunning = await getJob(added);
    expect(whileRunning.state).toBe(JobState.RUNNING);

    trigger.next('uh oh');
    trigger.complete();
    await tick();

    const failedState = (await getJob(added)).state;
    expect(failedState).toBe(JobState.FAILED);
    const failedError = (await getJob(added)).error;
    expect(failedError).toBe('uh oh');
  });
  // The processor rejects straight away with an Error; the job's `error` is expected
  // to hold that Error's message.
  it('job marked as failed when async error thrown', async () => {
    const failure = new Error('something bad happened');
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: () => Promise.reject(failure),
    });

    const added = await queue.add('hello');
    expect(added.state).toBe(JobState.PENDING);

    await tick(queuePollInterval);

    const failedState = (await getJob(added)).state;
    expect(failedState).toBe(JobState.FAILED);
    const failedError = (await getJob(added)).error;
    expect(failedError).toBe(failure.message);
  });
  // Three jobs are added in order. Each processor call waits for one emission of `release`,
  // so the test controls exactly when the currently running job finishes.
  it('jobs processed in FIFO queue', async () => {
    const release = new Subject<void>();
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: () => release.pipe(take(1)).toPromise(),
    });

    const jobs = [await queue.add('1'), await queue.add('2'), await queue.add('3')];

    // Reads the three states one after another, in the order the jobs were added.
    const currentStates = async () => {
      const states: Array<Job['state']> = [];
      for (const job of jobs) {
        states.push((await getJob(job)).state);
      }
      return states;
    };

    // Expected states when `done` jobs have completed and `running` more are in flight.
    const expectedStates = (done: number, running: number) =>
      jobs.map((_, index) => {
        if (index < done) {
          return JobState.COMPLETED;
        }
        return index < done + running ? JobState.RUNNING : JobState.PENDING;
      });

    for (let done = 0; done < jobs.length; done++) {
      await tick(queuePollInterval);
      expect(await currentStates()).toEqual(expectedStates(done, 1));

      release.next();
      await tick();
      expect(await currentStates()).toEqual(expectedStates(done + 1, 0));
    }

    release.complete();
  });
  // Same shape as the FIFO test, but with the strategy's concurrency raised to 2:
  // the first two jobs run together and the third runs on its own.
  it('with concurrency', async () => {
    const strategy = module.get(ConfigService).jobQueueOptions
      .jobQueueStrategy as TestingJobQueueStrategy;
    // NOTE(review): this is the module-level strategy shared by all tests and the value is
    // never restored here, so it carries over to later tests — confirm that is intended.
    strategy.concurrency = 2;

    const release = new Subject<void>();
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: () => release.pipe(take(1)).toPromise(),
    });

    const first = await queue.add('1');
    const second = await queue.add('2');
    const third = await queue.add('3');

    const currentStates = async () => {
      const states: Array<Job['state']> = [];
      for (const job of [first, second, third]) {
        states.push((await getJob(job)).state);
      }
      return states;
    };

    const phases = [
      {
        inFlight: [JobState.RUNNING, JobState.RUNNING, JobState.PENDING],
        released: [JobState.COMPLETED, JobState.COMPLETED, JobState.PENDING],
      },
      {
        inFlight: [JobState.COMPLETED, JobState.COMPLETED, JobState.RUNNING],
        released: [JobState.COMPLETED, JobState.COMPLETED, JobState.COMPLETED],
      },
    ];

    for (const phase of phases) {
      await tick(queuePollInterval);
      expect(await currentStates()).toEqual(phase.inFlight);

      release.next();
      await tick();
      expect(await currentStates()).toEqual(phase.released);
    }

    release.complete();
  });
  // Jobs are seeded into the strategy before the queue exists; creating the queue
  // is expected to get them processed.
  it('processes existing jobs on start', async () => {
    const seeded = ['job-1', 'job-2'].map(
      id =>
        new Job<any>({
          queueName: 'test',
          data: {},
          id,
        }),
    );
    await testJobQueueStrategy.prePopulate(seeded);

    await jobQueueService.createQueue<string>({
      name: 'test',
      process: async () => {
        // Nothing to do: the job resolves immediately.
      },
    });

    const first = await getJob('job-1');
    const second = await getJob('job-2');
    expect(first?.state).toBe(JobState.COMPLETED);
    expect(second?.state).toBe(JobState.RUNNING);

    await tick(queuePollInterval);
    const secondAfterPoll = await getJob('job-2');
    expect(secondAfterPoll.state).toBe(JobState.COMPLETED);
  });
  // A job added with `retries: 2` is failed three times. The first two failures should leave
  // it RETRYING (with one backoff call each); the third should leave it FAILED and settled.
  it('retries', async () => {
    backoffStrategySpy.mockClear();

    const outcome = new Subject<boolean>();
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      // Each attempt waits for one emission and fails when that emission is falsy.
      process: async () => {
        const succeeded = await outcome.pipe(take(1)).toPromise();
        if (!succeeded) {
          throw new Error();
        }
      },
    });
    const added = await queue.add('hello', { retries: 2 });

    const expectJob = async (state: JobState, settled: boolean) => {
      expect((await getJob(added)).state).toBe(state);
      expect((await getJob(added)).isSettled).toBe(settled);
    };

    await tick(queuePollInterval);
    await expectJob(JobState.RUNNING, false);

    for (const attempt of [1, 2]) {
      outcome.next(false);
      await tick();
      await expectJob(JobState.RETRYING, false);

      await tick(queuePollInterval);
      expect(backoffStrategySpy).toHaveBeenCalledTimes(attempt);
      expect(backoffStrategySpy.mock.calls[attempt - 1]).toEqual([
        'test',
        attempt,
        await getJob(added),
      ]);
    }

    outcome.next(false);
    await tick();
    await expectJob(JobState.FAILED, true);
  });
  // The processor never finishes (the subject never emits), so the job is still RUNNING
  // when the queue is stopped; afterwards it is expected to be PENDING again.
  // Runs with an explicit 10 s timeout.
  it(
    'sets long-running jobs to pending on destroy',
    async () => {
      const strategy = module.get(ConfigService).jobQueueOptions
        .jobQueueStrategy as TestingJobQueueStrategy;
      const neverEmits = new Subject<boolean>();
      const queue = await jobQueueService.createQueue<string>({
        name: 'test',
        process: () => neverEmits.pipe(take(1)).toPromise(),
      });
      const added = await queue.add('hello');

      // Looks the job up directly on the strategy; tolerates a missing job via `?.`.
      const currentState = async () => (await strategy.findOne(added.id!))?.state;

      await tick(queuePollInterval);
      expect(await currentState()).toBe(JobState.RUNNING);

      await queue.stop();
      expect(await currentState()).toBe(JobState.PENDING);
    },
    10000,
  );
  // With activeQueues set to ['test'], a queue named 'test' is expected to process
  // its job exactly as in the 'job marked as complete' test.
  it('should start a queue if its name is in the active list', async () => {
    const config = module.get(ConfigService);
    config.jobQueueOptions.activeQueues = ['test'];

    const gate = new Subject();
    const queue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: () => gate.toPromise(),
    });

    const added = await queue.add('hello');
    expect(added.state).toBe(JobState.PENDING);

    await tick(queuePollInterval);
    const whileRunning = await getJob(added);
    expect(whileRunning.state).toBe(JobState.RUNNING);

    gate.next('yay');
    gate.complete();
    await tick();

    const finishedState = (await getJob(added)).state;
    expect(finishedState).toBe(JobState.COMPLETED);
    const finishedResult = (await getJob(added)).result;
    expect(finishedResult).toBe('yay');
  });
  // With activeQueues set to ['another'], the queue named 'test' is not in the list,
  // so its job is expected to stay PENDING throughout.
  // The title previously read "is in the active list", which contradicted the scenario.
  it('should not start a queue if its name is not in the active list', async () => {
    module.get(ConfigService).jobQueueOptions.activeQueues = ['another'];
    const subject = new Subject();
    const testQueue = await jobQueueService.createQueue<string>({
      name: 'test',
      process: () => subject.toPromise(),
    });

    const testJob = await testQueue.add('hello');
    expect(testJob.state).toBe(JobState.PENDING);

    await tick(queuePollInterval);
    expect((await getJob(testJob)).state).toBe(JobState.PENDING);

    subject.next('yay');
    subject.complete();
    // Wait a tick before the final check, as the positive test does, so that any
    // unexpected processing would have had a chance to change the state.
    await tick();
    expect((await getJob(testJob)).state).toBe(JobState.PENDING);
  });
  296. describe('buffering', () => {
  /**
   * Job buffer used by the buffering tests.
   *
   * It collects every job whose queue name is 'buffer-test-queue-1'. When reduced, it
   * returns a single job built from the first collected job, with its id cleared and
   * its data replaced by the data of all collected jobs joined with single spaces.
   */
  class TestJobBuffer implements JobBuffer<string> {
    // Assigned with `=` rather than declared with `:`. A bare `id: 'test-job-buffer'`
    // is only a type annotation and leaves the property undefined at runtime.
    readonly id = 'test-job-buffer';

    /** Returns true for jobs that belong to 'buffer-test-queue-1'. */
    collect(job: Job<string>): boolean | Promise<boolean> {
      return job.queueName === 'buffer-test-queue-1';
    }

    /** Collapses the collected jobs into one job carrying their space-joined data. */
    reduce(collectedJobs: Array<Job<string>>): Array<Job<string>> {
      const concated = collectedJobs.map(j => j.data).join(' ');
      return [
        new Job({
          ...collectedJobs[0],
          id: undefined,
          data: concated,
        }),
      ];
    }
  }
  // Fixtures for the buffering tests. The subjects and the buffer are created once for the
  // whole describe block; the queues are re-created before each test.
  let testQueue1: JobQueue<string>;
  let testQueue2: JobQueue<string>;
  // Nothing visible in this file emits on or completes these subjects, so the promises
  // handed to the processors below stay pending.
  const subject1 = new Subject();
  const subject2 = new Subject();
  const testJobBuffer = new TestJobBuffer();

  beforeEach(async () => {
    testQueue1 = await jobQueueService.createQueue({
      name: 'buffer-test-queue-1',
      process: () => subject1.toPromise(),
    });
    testQueue2 = await jobQueueService.createQueue({
      name: 'buffer-test-queue-2',
      process: () => subject2.toPromise(),
    });
    // Registered after both queues exist.
    jobQueueService.addBuffer(testJobBuffer);
  });
  // Jobs added to queue 1 match the buffer's `collect` and are expected to land in buffer
  // storage rather than in the queue; jobs added to queue 2 are expected to run normally.
  it('buffers the specified jobs', async () => {
    const bufferedHello = await testQueue1.add('hello');
    const bufferedWorld = await testQueue1.add('world');
    const queuedFoo = await testQueue2.add('foo');
    const queuedBar = await testQueue2.add('bar');

    await tick(queuePollInterval);

    // NOTE(review): getJob passes the lookup through assertFound, whose handling of a
    // missing job is not visible in this file — confirm it resolves to undefined here
    // rather than rejecting.
    expect(await getJob(bufferedHello)).toBeUndefined();
    expect(await getJob(bufferedWorld)).toBeUndefined();

    const fooState = (await getJob(queuedFoo)).state;
    expect(fooState).toBe(JobState.RUNNING);
    const barState = (await getJob(queuedBar)).state;
    expect(barState).toBe(JobState.RUNNING);

    const stored = testJobBufferStorageStrategy.getBufferedJobs(testJobBuffer.id);
    const storedData = stored.map(job => job.data);
    expect(storedData).toEqual(['hello', 'world']);
  });
  // This test adds no jobs itself. The expected 'hello world' matches the data added to
  // queue 1 by the preceding test, so it relies on that state still being present
  // (presumably in the module-level buffer storage strategy) and on test order.
  it('flushes and reduces buffered jobs', async () => {
    const flushed = await jobQueueService.flush(testJobBuffer);

    expect(flushed.length).toBe(1);
    const [reduced] = flushed;
    expect(reduced.data).toBe('hello world');
  });
  351. });
  352. });
  /**
   * Returns a promise that resolves after a delay.
   *
   * @param ms Delay in milliseconds. When greater than zero a `setTimeout` of that length
   *   is used; otherwise the promise is resolved via `process.nextTick`.
   */
  function tick(ms: number = 0): Promise<void> {
    if (ms > 0) {
      return new Promise<void>(done => {
        setTimeout(done, ms);
      });
    }
    return new Promise<void>(done => {
      process.nextTick(done);
    });
  }
  /**
   * Stand-in for ConfigService in these tests.
   *
   * It exposes the module-level testing strategies through `jobQueueOptions`, and forwards
   * the application-bootstrap and module-destroy lifecycle hooks to the job queue
   * strategy's `init` and `destroy`.
   */
  @Injectable()
  class MockConfigService implements OnApplicationBootstrap, OnModuleDestroy {
    constructor(private readonly moduleRef: ModuleRef) {}

    jobQueueOptions = {
      jobQueueStrategy: testJobQueueStrategy,
      // Typed explicitly: a bare `[]` is inferred as `never[]` under strict settings,
      // which would reject the `activeQueues = ['test']` assignments made by the tests.
      activeQueues: [] as string[],
      jobBufferStorageStrategy: testJobBufferStorageStrategy,
    };

    systemOptions = {
      errorHandlers: [],
    };

    /** Initialises the job queue strategy with an Injector built from the module ref. */
    async onApplicationBootstrap(): Promise<void> {
      const injector = new Injector(this.moduleRef);
      // Awaited so that, if init returns a promise, the hook does not finish before it
      // and a rejection surfaces here. Awaiting a non-promise result is harmless.
      await this.jobQueueOptions.jobQueueStrategy.init(injector);
    }

    /** Destroys the job queue strategy when the testing module is closed. */
    async onModuleDestroy(): Promise<void> {
      // Awaited for the same reason as init: teardown should finish inside the hook.
      await this.jobQueueOptions.jobQueueStrategy.destroy();
    }
  }