job-queue.e2e-spec.ts 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import { DefaultJobQueuePlugin, mergeConfig } from '@vendure/core';
  2. import { createTestEnvironment } from '@vendure/testing';
  3. import gql from 'graphql-tag';
  4. import path from 'path';
  5. import { afterAll, beforeAll, describe, expect, it } from 'vitest';
  6. import { initialData } from '../../../e2e-common/e2e-initial-data';
  7. import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
  8. import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
  9. import {
  10. CancelJobMutation,
  11. CancelJobMutationVariables,
  12. GetRunningJobsQuery,
  13. GetRunningJobsQueryVariables,
  14. JobState,
  15. } from './graphql/generated-e2e-admin-types';
  16. import { GET_RUNNING_JOBS } from './graphql/shared-definitions';
  /**
   * End-to-end suite for the job queue. A test server is created with
   * DefaultJobQueuePlugin and PluginWithJobQueue, jobs are triggered by calling
   * HTTP routes on that server, and their state is inspected through the admin
   * client using the GET_RUNNING_JOBS query filtered to the `test` queue.
   *
   * The tests share state (`testJobId`, the jobs already stored in the queue and
   * the static members of PluginWithJobQueue), so they depend on running in
   * declaration order.
   */
  describe('JobQueue', () => {
      const activeConfig = testConfig();

      if (activeConfig.dbConnectionOptions.type === 'sqljs') {
          // Registering this placeholder with `.only` is what causes the remaining
          // tests of the suite to be skipped.
          it.only('skip JobQueue tests for sqljs', () => {
              // The tests in this suite will fail when running on sqljs because
              // the DB state is not persisted after shutdown. In this case it is
              // an acceptable tradeoff to just skip them, since the other DB engines
              // _will_ run in CI, and sqljs is less of a production use-case anyway.
              return;
          });
      }

      const { server, adminClient } = createTestEnvironment(
          mergeConfig(activeConfig, {
              plugins: [
                  DefaultJobQueuePlugin.init({
                      pollInterval: 50,
                      gracefulShutdownTimeout: 1_000,
                  }),
                  PluginWithJobQueue,
              ],
          }),
      );

      beforeAll(async () => {
          await server.init({
              initialData,
              productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-full.csv'),
              customerCount: 1,
          });
          await adminClient.asSuperAdmin();
          // NOTE(review): fixed pause before the first test; presumably gives the
          // job queue time to start polling - confirm.
          await sleep(1000);
      }, TEST_SETUP_TIMEOUT_MS);

      afterAll(async () => {
          // NOTE(review): the subject is completed before shutdown, presumably so
          // that a job still waiting on it does not block `destroy()` - confirm.
          PluginWithJobQueue.jobSubject.complete();
          await server.destroy();
      });

      /**
       * Builds the URL of an HTTP route on the test server.
       *
       * @param route Path relative to the server root, without a leading slash.
       */
      function restUrl(route: string): string {
          return `http://localhost:${activeConfig.apiOptions.port}/${route}`;
      }

      /**
       * Fetches the jobs of the `test` queue through the admin client.
       *
       * @param state When given, only jobs in exactly this state are returned;
       *              otherwise jobs in any state are returned.
       * @returns The `jobs` field of the GET_RUNNING_JOBS response.
       */
      async function getJobsInTestQueue(state?: JobState) {
          const stateFilter = state ? { state: { eq: state } } : {};
          const data = await adminClient.query<GetRunningJobsQuery, GetRunningJobsQueryVariables>(
              GET_RUNNING_JOBS,
              {
                  options: {
                      filter: {
                          queueName: {
                              eq: 'test',
                          },
                          ...stateFilter,
                      },
                  },
              },
          );
          return data.jobs;
      }

      // ID of the job created by the first test; later tests use it to check that
      // they are still looking at the same job.
      let testJobId: string;

      it('creates and starts running a job', async () => {
          await adminClient.fetch(restUrl('run-job'));
          await sleep(300);

          const jobs = await getJobsInTestQueue();
          expect(jobs.items.length).toBe(1);
          expect(jobs.items[0].state).toBe(JobState.RUNNING);
          expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
          testJobId = jobs.items[0].id;
      });

      it(
          'shutdown server before completing job',
          async () => {
              // Restart the server while the job from the previous test is still
              // RUNNING. The very same job must still be reported as RUNNING.
              await server.destroy();
              await server.bootstrap();
              await adminClient.asSuperAdmin();
              await sleep(300);

              const jobs = await getJobsInTestQueue();
              expect(jobs.items.length).toBe(1);
              expect(jobs.items[0].state).toBe(JobState.RUNNING);
              expect(jobs.items[0].id).toBe(testJobId);
              expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
          },
          TEST_SETUP_TIMEOUT_MS,
      );

      it('complete job after restart', async () => {
          // Emitting on the subject is the signal after which the job is expected
          // to finish its work.
          PluginWithJobQueue.jobSubject.next();
          await sleep(300);

          const jobs = await getJobsInTestQueue();
          expect(jobs.items.length).toBe(1);
          expect(jobs.items[0].state).toBe(JobState.COMPLETED);
          expect(jobs.items[0].id).toBe(testJobId);
          expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
      });

      it('cancels a running job', async () => {
          // Reset the flag that the previous test left set to `true`.
          PluginWithJobQueue.jobHasDoneWork = false;
          await adminClient.fetch(restUrl('run-job'));
          await sleep(300);

          const runningJobs = await getJobsInTestQueue(JobState.RUNNING);
          expect(runningJobs.items.length).toBe(1);
          expect(runningJobs.items[0].state).toBe(JobState.RUNNING);
          expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
          const jobId = runningJobs.items[0].id;

          const { cancelJob } = await adminClient.query<CancelJobMutation, CancelJobMutationVariables>(
              CANCEL_JOB,
              {
                  id: jobId,
              },
          );
          expect(cancelJob.state).toBe(JobState.CANCELLED);
          expect(cancelJob.isSettled).toBe(true);
          expect(cancelJob.settledAt).not.toBeNull();

          // Verify against the freshly fetched CANCELLED list. Re-checking the
          // RUNNING list fetched before the mutation would pass regardless of
          // whether the cancellation was actually stored.
          const cancelledJobs = await getJobsInTestQueue(JobState.CANCELLED);
          expect(cancelledJobs.items.length).toBe(1);
          expect(cancelledJobs.items[0].id).toBe(jobId);

          // NOTE(review): presumably releases the cancelled job's pending work so
          // it does not linger into the following tests - confirm.
          PluginWithJobQueue.jobSubject.next();
      });

      it('subscribe to result of job', async () => {
          const result = await adminClient.fetch(restUrl('run-job/subscribe'));
          expect(await result.text()).toBe('42!');

          const jobs = await getJobsInTestQueue(JobState.RUNNING);
          expect(jobs.items.length).toBe(0);
      });

      it('subscribable that times out', async () => {
          const result = await adminClient.fetch(restUrl('run-job/subscribe-timeout'));
          expect(result.status).toBe(200);
          expect(await result.text()).toBe('Job subscription timed out. The job may still be running');

          // The subscription gave up, but the job itself is expected to still be
          // running.
          const jobs = await getJobsInTestQueue(JobState.RUNNING);
          expect(jobs.items.length).toBe(1);
      });

      it('server still running after timeout', async () => {
          // A successful query shows the server still answers after the
          // subscription timeout of the previous test.
          const jobs = await getJobsInTestQueue(JobState.RUNNING);
          expect(jobs.items.length).toBe(1);
      });

      // https://github.com/vendure-ecommerce/vendure/issues/4112
      it('updates() should emit multiple times until job completes', async () => {
          const response = await adminClient.fetch(restUrl('run-job/subscribe-all-updates'));
          const result = JSON.parse(await response.text());

          // Job does 4 progress steps (25%, 50%, 75%, 100%), so we should see at least 3 distinct updates
          expect(result.updateCount).toBeGreaterThanOrEqual(3);
          // Final state should be COMPLETED
          expect(result.finalState).toBe(JobState.COMPLETED);
          // Final result should be what the job returned
          expect(result.finalResult).toBe('completed');
      });
  });
  /**
   * Returns a promise that resolves (with no value) once `ms` milliseconds have
   * elapsed, as scheduled by `setTimeout`.
   *
   * @param ms Delay in milliseconds.
   */
  function sleep(ms: number): Promise<void> {
      return new Promise<void>(done => {
          setTimeout(done, ms);
      });
  }
  /**
   * GraphQL mutation that cancels the job with the given ID and selects the
   * fields asserted on by the cancellation test: `state`, `isSettled` and
   * `settledAt` (plus `id`).
   */
  const CANCEL_JOB = gql`
      mutation CancelJob($id: ID!) {
          cancelJob(jobId: $id) {
              id
              state
              isSettled
              settledAt
          }
      }
  `;