// job-queue-concurrency.e2e-spec.ts (5.2 KB)

  1. import { OnApplicationBootstrap } from '@nestjs/common';
  2. import {
  3. DefaultJobQueuePlugin,
  4. JobQueue,
  5. JobQueueService,
  6. mergeConfig,
  7. PluginCommonModule,
  8. VendurePlugin,
  9. } from '@vendure/core';
  10. import { createTestEnvironment } from '@vendure/testing';
  11. import path from 'path';
  12. import { afterAll, beforeAll, describe, expect, it } from 'vitest';
  13. import { initialData } from '../../../e2e-common/e2e-initial-data';
  14. import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
  /**
   * Test-only plugin that registers two job queues on application bootstrap and
   * records, in static fields, how many jobs of each queue were inside their
   * processor at the same time.
   *
   * Both processors do the same thing: bump the "current" counter, update the
   * high-water mark, wait 100ms, decrement the counter and record the job as
   * processed. Note that the "fast" queue processor waits just as long as the
   * "slow" one; the two differ only in name and in which counters they touch.
   */
  @VendurePlugin({
      imports: [PluginCommonModule],
  })
  class ConcurrencyTestPlugin implements OnApplicationBootstrap {
      /** Highest value `slowQueueCurrent` has reached since the last `reset()`. */
      static slowQueueMaxConcurrent = 0;
      /** Highest value `fastQueueCurrent` has reached since the last `reset()`. */
      static fastQueueMaxConcurrent = 0;
      /** Number of 'test-slow-queue' jobs currently inside the processor. */
      static slowQueueCurrent = 0;
      /** Number of 'test-fast-queue' jobs currently inside the processor. */
      static fastQueueCurrent = 0;
      /** Labels (`slow-<id>` / `fast-<id>`) of finished jobs, in completion order. */
      static processedJobs: string[] = [];
      /** Queue handle for 'test-slow-queue'; assigned in `onApplicationBootstrap`. */
      static slowQueue: JobQueue<{ id: number }>;
      /** Queue handle for 'test-fast-queue'; assigned in `onApplicationBootstrap`. */
      static fastQueue: JobQueue<{ id: number }>;

      constructor(private jobQueueService: JobQueueService) {}

      /**
       * Creates both queues (slow first, then fast) and stores the handles on the
       * class so that tests can add jobs to them.
       */
      async onApplicationBootstrap() {
          const tracker = ConcurrencyTestPlugin;
          // Simulated unit of work shared by both processors.
          const simulateWork = () => new Promise(resolve => setTimeout(resolve, 100));

          tracker.slowQueue = await this.jobQueueService.createQueue({
              name: 'test-slow-queue',
              process: async job => {
                  const inFlight = ++tracker.slowQueueCurrent;
                  if (inFlight > tracker.slowQueueMaxConcurrent) {
                      tracker.slowQueueMaxConcurrent = inFlight;
                  }
                  await simulateWork();
                  tracker.slowQueueCurrent -= 1;
                  tracker.processedJobs.push(`slow-${job.data.id}`);
                  return job.data;
              },
          });

          tracker.fastQueue = await this.jobQueueService.createQueue({
              name: 'test-fast-queue',
              process: async job => {
                  const inFlight = ++tracker.fastQueueCurrent;
                  if (inFlight > tracker.fastQueueMaxConcurrent) {
                      tracker.fastQueueMaxConcurrent = inFlight;
                  }
                  await simulateWork();
                  tracker.fastQueueCurrent -= 1;
                  tracker.processedJobs.push(`fast-${job.data.id}`);
                  return job.data;
              },
          });
      }

      /**
       * Clears all counters and the processed-job log. The queue handles are left
       * untouched.
       */
      static reset() {
          this.processedJobs = [];
          this.slowQueueCurrent = 0;
          this.fastQueueCurrent = 0;
          this.slowQueueMaxConcurrent = 0;
          this.fastQueueMaxConcurrent = 0;
      }
  }
  /**
   * End-to-end check that DefaultJobQueuePlugin applies the `concurrency`
   * callback per queue: 'test-slow-queue' is configured with a limit of 1 and
   * every other queue with a limit of 3.
   */
  describe('Job queue per-queue concurrency', () => {
      const activeConfig = testConfig();
      if (activeConfig.dbConnectionOptions.type === 'sqljs') {
          it.only('skip per-queue concurrency tests for sqljs', () => {
              // The tests in this suite will fail when running on sqljs because
              // the DB state is not persisted correctly with the polling nature
              // of the SQL job queue strategy.
              return;
          });
      }

      const { server, adminClient } = createTestEnvironment(
          mergeConfig(activeConfig, {
              plugins: [
                  DefaultJobQueuePlugin.init({
                      pollInterval: 50,
                      concurrency: (queueName: string) => {
                          if (queueName === 'test-slow-queue') {
                              return 1;
                          }
                          return 3;
                      },
                  }),
                  ConcurrencyTestPlugin,
              ],
          }),
      );

      beforeAll(async () => {
          await server.init({
              initialData,
              productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-empty.csv'),
              customerCount: 0,
          });
          await adminClient.asSuperAdmin();
      }, TEST_SETUP_TIMEOUT_MS);

      afterAll(async () => {
          await server.destroy();
      });

      it('should respect per-queue concurrency limits', async () => {
          ConcurrencyTestPlugin.reset();

          const jobsPerQueue = 5;
          const expectedTotal = jobsPerQueue * 2;

          // Add the same number of jobs to each queue
          const jobPromises: Array<Promise<unknown>> = [];
          for (let i = 0; i < jobsPerQueue; i++) {
              jobPromises.push(ConcurrencyTestPlugin.slowQueue.add({ id: i }));
              jobPromises.push(ConcurrencyTestPlugin.fastQueue.add({ id: i }));
          }
          await Promise.all(jobPromises);

          // Wait for all jobs to complete. Each job takes 100ms, so the slow queue
          // (concurrency 1) needs at least 500ms. Rather than sleeping for a fixed
          // period, which is either wasteful or too short on a slow machine, poll
          // the processed-job log until every job has been recorded. The deadline
          // bounds the wait so that a stuck queue surfaces through the assertions
          // below; it is kept under Vitest's default 5s test timeout.
          const deadline = Date.now() + 4000;
          while (
              ConcurrencyTestPlugin.processedJobs.length < expectedTotal &&
              Date.now() < deadline
          ) {
              await new Promise(resolve => setTimeout(resolve, 50));
          }

          // Verify slow queue never exceeded concurrency of 1
          expect(ConcurrencyTestPlugin.slowQueueMaxConcurrent).toBe(1);

          // Verify fast queue processed multiple jobs concurrently
          expect(ConcurrencyTestPlugin.fastQueueMaxConcurrent).toBeGreaterThan(1);
          expect(ConcurrencyTestPlugin.fastQueueMaxConcurrent).toBeLessThanOrEqual(3);

          // Verify all jobs were processed
          expect(ConcurrencyTestPlugin.processedJobs.filter(j => j.startsWith('slow-')).length).toBe(
              jobsPerQueue,
          );
          expect(ConcurrencyTestPlugin.processedJobs.filter(j => j.startsWith('fast-')).length).toBe(
              jobsPerQueue,
          );
      });
  });