bullmq-job-queue-plugin.e2e-spec.ts 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. import { JobState } from '@vendure/common/lib/generated-types';
  2. import { AutoIncrementIdStrategy, JobQueueService, mergeConfig } from '@vendure/core';
  3. import { PluginWithJobQueue } from '@vendure/core/e2e/fixtures/test-plugins/with-job-queue';
  4. import {
  5. CancelJobDocument,
  6. GetRunningJobsDocument,
  7. } from '@vendure/core/e2e/graphql/generated-e2e-admin-types';
  8. import { createTestEnvironment } from '@vendure/testing';
  9. import { removeAllQueueData } from 'bullmq';
  10. import IORedis from 'ioredis';
  11. import path from 'path';
  12. import { afterAll, beforeAll, describe, expect, it } from 'vitest';
  13. import { initialData } from '../../../e2e-common/e2e-initial-data';
  14. import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
  15. import { BullMQJobQueuePlugin } from '../src/bullmq/plugin';
  16. describe('BullMQJobQueuePlugin', () => {
  17. const redisConnection = {
  18. host: '127.0.0.1',
  19. port: process.env.CI ? +(process.env.E2E_REDIS_PORT || 6379) : 6379,
  20. maxRetriesPerRequest: null,
  21. };
  22. const activeConfig = mergeConfig(testConfig(), {
  23. entityOptions: {
  24. entityIdStrategy: new AutoIncrementIdStrategy(),
  25. },
  26. plugins: [
  27. BullMQJobQueuePlugin.init({
  28. connection: redisConnection,
  29. workerOptions: {
  30. prefix: 'e2e',
  31. },
  32. queueOptions: {
  33. prefix: 'e2e',
  34. defaultJobOptions: {
  35. attempts: 3,
  36. },
  37. },
  38. gracefulShutdownTimeout: 1_000,
  39. }),
  40. PluginWithJobQueue,
  41. ],
  42. });
  43. const { server, adminClient } = createTestEnvironment(activeConfig);
  44. beforeAll(async () => {
  45. await removeAllQueueData(new IORedis(redisConnection), 'vendure-queue-test', 'e2e');
  46. await server.init({
  47. initialData,
  48. productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-minimal.csv'),
  49. customerCount: 1,
  50. });
  51. await adminClient.asSuperAdmin();
  52. await sleep(1_000);
  53. }, TEST_SETUP_TIMEOUT_MS);
  54. afterAll(async () => {
  55. PluginWithJobQueue.jobSubject.complete();
  56. await server.destroy();
  57. });
  58. function getJobsInTestQueue(state?: JobState) {
  59. return adminClient
  60. .query(GetRunningJobsDocument, {
  61. options: {
  62. filter: {
  63. queueName: {
  64. eq: 'test',
  65. },
  66. ...(state
  67. ? {
  68. state: { eq: state },
  69. }
  70. : {}),
  71. },
  72. },
  73. })
  74. .then(data => data.jobs);
  75. }
  76. let testJobId: string;
  77. it('creates and starts running a job', async () => {
  78. const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job?retries=2`;
  79. await adminClient.fetch(restControllerUrl);
  80. await sleep(300);
  81. const jobs = await getJobsInTestQueue();
  82. expect(jobs.items.length).toBe(1);
  83. expect(jobs.items[0].state).toBe(JobState.RUNNING);
  84. expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
  85. testJobId = jobs.items[0].id;
  86. });
  87. it(
  88. 'shutdown server before completing job',
  89. async () => {
  90. await server.destroy();
  91. await server.bootstrap();
  92. await adminClient.asSuperAdmin();
  93. await sleep(300);
  94. const jobs = await getJobsInTestQueue();
  95. expect(jobs.items.length).toBe(1);
  96. expect(jobs.items[0].state).toBe(JobState.RUNNING);
  97. expect(jobs.items[0].id).toBe(testJobId);
  98. expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
  99. },
  100. TEST_SETUP_TIMEOUT_MS,
  101. );
  102. it('complete job after restart', async () => {
  103. PluginWithJobQueue.jobSubject.next();
  104. await sleep(300);
  105. const jobs = await getJobsInTestQueue();
  106. expect(jobs.items.length).toBe(1);
  107. expect(jobs.items[0].state).toBe(JobState.COMPLETED);
  108. expect(jobs.items[0].id).toBe(testJobId);
  109. expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
  110. });
  111. it('cancels a running job', async () => {
  112. PluginWithJobQueue.jobHasDoneWork = false;
  113. const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job`;
  114. await adminClient.fetch(restControllerUrl);
  115. await sleep(300);
  116. const jobs = await getJobsInTestQueue(JobState.RUNNING);
  117. expect(jobs.items.length).toBe(1);
  118. expect(jobs.items[0].state).toBe(JobState.RUNNING);
  119. expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
  120. const jobId = jobs.items[0].id;
  121. const { cancelJob } = await adminClient.query(CancelJobDocument, {
  122. id: jobId,
  123. });
  124. expect(cancelJob.state).toBe(JobState.CANCELLED);
  125. expect(cancelJob.isSettled).toBe(true);
  126. expect(cancelJob.settledAt).not.toBeNull();
  127. await sleep(300);
  128. const jobs2 = await getJobsInTestQueue(JobState.CANCELLED);
  129. expect(jobs2.items.length).toBe(1);
  130. expect(jobs2.items[0].id).toBe(jobId);
  131. PluginWithJobQueue.jobSubject.next();
  132. });
  133. // it('subscribe to result of job', async () => {
  134. // const restControllerUrl = `http://localhost:${activeConfig.apiOptions.port}/run-job/subscribe`;
  135. // const result = await adminClient.fetch(restControllerUrl);
  136. //
  137. // expect(await result.text()).toBe('42!');
  138. // const jobs = await getJobsInTestQueue(JobState.RUNNING);
  139. // expect(jobs.items.length).toBe(0);
  140. // });
  141. });
  142. function sleep(ms: number): Promise<void> {
  143. return new Promise(resolve => setTimeout(resolve, ms));
  144. }