Browse Source

test(core): Add e2e tests for per-queue concurrency feature

Add e2e tests to verify that the DefaultJobQueuePlugin correctly
supports function-based per-queue concurrency configuration. The tests
verify:
- Slow queue respects concurrency limit of 1
- Fast queue can process multiple jobs concurrently (up to 3)
- All jobs complete successfully

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Will Nahmens 4 days ago
parent
commit
ec464c21c0
1 changed files with 140 additions and 0 deletions
  1. 140 0
      packages/core/e2e/job-queue-concurrency.e2e-spec.ts

+ 140 - 0
packages/core/e2e/job-queue-concurrency.e2e-spec.ts

@@ -0,0 +1,140 @@
+import { OnApplicationBootstrap } from '@nestjs/common';
+import {
+    DefaultJobQueuePlugin,
+    JobQueue,
+    JobQueueService,
+    mergeConfig,
+    PluginCommonModule,
+    VendurePlugin,
+} from '@vendure/core';
+import { createTestEnvironment } from '@vendure/testing';
+import path from 'path';
+import { afterAll, beforeAll, describe, expect, it } from 'vitest';
+
+import { initialData } from '../../../e2e-common/e2e-initial-data';
+import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+
+@VendurePlugin({
+    imports: [PluginCommonModule],
+})
+class ConcurrencyTestPlugin implements OnApplicationBootstrap {
+    static slowQueueMaxConcurrent = 0;
+    static fastQueueMaxConcurrent = 0;
+    static slowQueueCurrent = 0;
+    static fastQueueCurrent = 0;
+    static processedJobs: string[] = [];
+    static slowQueue: JobQueue<{ id: number }>;
+    static fastQueue: JobQueue<{ id: number }>;
+
+    constructor(private jobQueueService: JobQueueService) {}
+
+    async onApplicationBootstrap() {
+        ConcurrencyTestPlugin.slowQueue = await this.jobQueueService.createQueue({
+            name: 'test-slow-queue',
+            process: async job => {
+                ConcurrencyTestPlugin.slowQueueCurrent++;
+                ConcurrencyTestPlugin.slowQueueMaxConcurrent = Math.max(
+                    ConcurrencyTestPlugin.slowQueueMaxConcurrent,
+                    ConcurrencyTestPlugin.slowQueueCurrent,
+                );
+                await new Promise(resolve => setTimeout(resolve, 100));
+                ConcurrencyTestPlugin.slowQueueCurrent--;
+                ConcurrencyTestPlugin.processedJobs.push(`slow-${job.data.id}`);
+                return job.data;
+            },
+        });
+
+        ConcurrencyTestPlugin.fastQueue = await this.jobQueueService.createQueue({
+            name: 'test-fast-queue',
+            process: async job => {
+                ConcurrencyTestPlugin.fastQueueCurrent++;
+                ConcurrencyTestPlugin.fastQueueMaxConcurrent = Math.max(
+                    ConcurrencyTestPlugin.fastQueueMaxConcurrent,
+                    ConcurrencyTestPlugin.fastQueueCurrent,
+                );
+                await new Promise(resolve => setTimeout(resolve, 100));
+                ConcurrencyTestPlugin.fastQueueCurrent--;
+                ConcurrencyTestPlugin.processedJobs.push(`fast-${job.data.id}`);
+                return job.data;
+            },
+        });
+    }
+
+    static reset() {
+        this.slowQueueMaxConcurrent = 0;
+        this.fastQueueMaxConcurrent = 0;
+        this.slowQueueCurrent = 0;
+        this.fastQueueCurrent = 0;
+        this.processedJobs = [];
+    }
+}
+
+describe('Job queue per-queue concurrency', () => {
+    const activeConfig = testConfig();
+    if (activeConfig.dbConnectionOptions.type === 'sqljs') {
+        it.only('skip per-queue concurrency tests for sqljs', () => {
+            // The tests in this suite will fail when running on sqljs because
+            // the DB state is not persisted correctly with the polling nature
+            // of the SQL job queue strategy.
+            return;
+        });
+    }
+
+    const { server, adminClient } = createTestEnvironment(
+        mergeConfig(activeConfig, {
+            plugins: [
+                DefaultJobQueuePlugin.init({
+                    pollInterval: 50,
+                    concurrency: (queueName: string) => {
+                        if (queueName === 'test-slow-queue') {
+                            return 1;
+                        }
+                        return 3;
+                    },
+                }),
+                ConcurrencyTestPlugin,
+            ],
+        }),
+    );
+
+    beforeAll(async () => {
+        await server.init({
+            initialData,
+            productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-empty.csv'),
+            customerCount: 0,
+        });
+        await adminClient.asSuperAdmin();
+    }, TEST_SETUP_TIMEOUT_MS);
+
+    afterAll(async () => {
+        await server.destroy();
+    });
+
+    it('should respect per-queue concurrency limits', async () => {
+        ConcurrencyTestPlugin.reset();
+
+        // Add 5 jobs to each queue
+        const jobPromises: Array<Promise<any>> = [];
+        for (let i = 0; i < 5; i++) {
+            jobPromises.push(ConcurrencyTestPlugin.slowQueue.add({ id: i }));
+            jobPromises.push(ConcurrencyTestPlugin.fastQueue.add({ id: i }));
+        }
+        await Promise.all(jobPromises);
+
+        // Wait for all jobs to complete (5 jobs * 100ms each / concurrency + buffer)
+        // slow-queue: 5 jobs / 1 concurrency = 500ms
+        // fast-queue: 5 jobs / 3 concurrency = ~200ms
+        await new Promise(resolve => setTimeout(resolve, 1500));
+
+        // Verify slow queue never exceeded concurrency of 1
+        expect(ConcurrencyTestPlugin.slowQueueMaxConcurrent).toBe(1);
+
+        // Verify fast queue processed multiple jobs concurrently
+        expect(ConcurrencyTestPlugin.fastQueueMaxConcurrent).toBeGreaterThan(1);
+        expect(ConcurrencyTestPlugin.fastQueueMaxConcurrent).toBeLessThanOrEqual(3);
+
+        // Verify all jobs were processed
+        expect(ConcurrencyTestPlugin.processedJobs.filter(j => j.startsWith('slow-')).length).toBe(5);
+        expect(ConcurrencyTestPlugin.processedJobs.filter(j => j.startsWith('fast-')).length).toBe(5);
+    });
+});