job-queue.e2e-spec.ts 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import { DefaultJobQueuePlugin, mergeConfig } from '@vendure/core';
  2. import { createTestEnvironment } from '@vendure/testing';
  3. import gql from 'graphql-tag';
  4. import path from 'path';
  5. import { initialData } from '../../../e2e-common/e2e-initial-data';
  6. import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
  7. import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
  8. import { CancelJob, GetRunningJobs, JobState } from './graphql/generated-e2e-admin-types';
  9. import { GET_RUNNING_JOBS } from './graphql/shared-definitions';
  /**
   * End-to-end tests for the job queue, run against a test server configured
   * with DefaultJobQueuePlugin and the PluginWithJobQueue fixture.
   *
   * The tests are order-dependent: the job created by the first test is expected
   * to still be RUNNING after the server is destroyed and bootstrapped again, and
   * is then completed by the third test. The final test starts a second job and
   * cancels it through the `cancelJob` mutation.
   */
  describe('JobQueue', () => {
      if (testConfig.dbConnectionOptions.type === 'sqljs') {
          it.only('skip JobQueue tests for sqljs', () => {
              // The tests in this suite will fail when running on sqljs because
              // the DB state is not persisted after shutdown. In this case it is
              // an acceptable tradeoff to just skip them, since the other DB engines
              // _will_ run in CI, and sqljs is less of a production use-case anyway.
              return;
          });
      }

      const { server, adminClient } = createTestEnvironment(
          mergeConfig(testConfig, {
              plugins: [DefaultJobQueuePlugin, PluginWithJobQueue],
          }),
      );

      beforeAll(async () => {
          await server.init({
              initialData,
              productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-full.csv'),
              customerCount: 1,
          });
          await adminClient.asSuperAdmin();
          // NOTE(review): presumably gives the job queue time to settle after
          // startup before the first test runs; confirm the reason for this delay.
          await sleep(1000);
      }, TEST_SETUP_TIMEOUT_MS);

      afterAll(async () => {
          // Complete the fixture's subject before tearing the server down.
          PluginWithJobQueue.jobSubject.complete();
          await server.destroy();
      });

      /**
       * Queries the jobs whose queueName is 'test', optionally restricted to a
       * single job state.
       *
       * @param state When given, only jobs in exactly this state are returned.
       * @returns The `jobs` field of the GET_RUNNING_JOBS query result.
       */
      function getJobsInTestQueue(state?: JobState) {
          return adminClient
              .query<GetRunningJobs.Query, GetRunningJobs.Variables>(GET_RUNNING_JOBS, {
                  options: {
                      filter: {
                          queueName: {
                              eq: 'test',
                          },
                          ...(state
                              ? {
                                    state: { eq: state },
                                }
                              : {}),
                      },
                  },
              })
              .then(data => data.jobs);
      }

      /**
       * Requests the `/run-job` REST endpoint on the test server. The tests
       * below expect this to result in one RUNNING job in the 'test' queue.
       */
      function triggerTestJob() {
          const restControllerUrl = `http://localhost:${testConfig.apiOptions.port}/run-job`;
          return adminClient.fetch(restControllerUrl);
      }

      // Id of the job created by the first test; later tests assert that the
      // same job is still present after the server restart.
      let testJobId: string;

      it('creates and starts running a job', async () => {
          await triggerTestJob();
          await sleep(300);

          const jobs = await getJobsInTestQueue();
          expect(jobs.items.length).toBe(1);
          expect(jobs.items[0].state).toBe(JobState.RUNNING);
          expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
          testJobId = jobs.items[0].id;
      });

      it(
          'shutdown server before completing job',
          async () => {
              await server.destroy();
              await server.bootstrap();
              await adminClient.asSuperAdmin();
              await sleep(300);

              // The job must still be there, untouched, after the restart.
              const jobs = await getJobsInTestQueue();
              expect(jobs.items.length).toBe(1);
              expect(jobs.items[0].state).toBe(JobState.RUNNING);
              expect(jobs.items[0].id).toBe(testJobId);
              expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
          },
          TEST_SETUP_TIMEOUT_MS,
      );

      it('complete job after restart', async () => {
          // Emit on the fixture's subject; the job is then expected to finish.
          PluginWithJobQueue.jobSubject.next();
          await sleep(300);

          const jobs = await getJobsInTestQueue();
          expect(jobs.items.length).toBe(1);
          expect(jobs.items[0].state).toBe(JobState.COMPLETED);
          expect(jobs.items[0].id).toBe(testJobId);
          expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
      });

      it('cancels a running job', async () => {
          // Reset the flag that the previous test left set to true.
          PluginWithJobQueue.jobHasDoneWork = false;
          await triggerTestJob();
          await sleep(300);

          const jobs = await getJobsInTestQueue(JobState.RUNNING);
          expect(jobs.items.length).toBe(1);
          expect(jobs.items[0].state).toBe(JobState.RUNNING);
          expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
          const jobId = jobs.items[0].id;

          const { cancelJob } = await adminClient.query<CancelJob.Mutation, CancelJob.Variables>(CANCEL_JOB, {
              id: jobId,
          });
          expect(cancelJob.state).toBe(JobState.CANCELLED);
          expect(cancelJob.isSettled).toBe(true);
          expect(cancelJob.settledAt).not.toBeNull();

          // Assert against the freshly fetched CANCELLED list (jobs2), not the
          // earlier RUNNING result, so the check actually verifies that the
          // cancelled job is returned by the filtered query.
          const jobs2 = await getJobsInTestQueue(JobState.CANCELLED);
          expect(jobs2.items.length).toBe(1);
          expect(jobs2.items[0].id).toBe(jobId);
      });
  });
  /**
   * Creates a promise that settles once the given delay has elapsed.
   *
   * @param ms Delay in milliseconds, passed straight to `setTimeout`.
   * @returns A promise that resolves with no value when the timer fires.
   */
  function sleep(ms: number): Promise<void> {
      return new Promise<void>(done => {
          setTimeout(done, ms);
      });
  }
  /**
   * GraphQL document for the `cancelJob` mutation. It takes the job id as the
   * `$id` variable (passed to the mutation's `jobId` argument) and selects the
   * job's `id`, `state`, `isSettled` and `settledAt` fields, which the
   * "cancels a running job" test asserts on.
   */
  115. const CANCEL_JOB = gql`
  116. mutation CancelJob($id: ID!) {
  117. cancelJob(jobId: $id) {
  118. id
  119. state
  120. isSettled
  121. settledAt
  122. }
  123. }
  124. `;