Browse Source

test(core): Add unit tests for job buffering

Michael Bromley 4 years ago
parent
commit
06d7d519cb

+ 12 - 0
packages/core/src/job-queue/job-buffer/testing-job-buffer-storage-strategy.ts

@@ -0,0 +1,12 @@
+import { Job } from '../job';
+
+import { InMemoryJobBufferStorageStrategy } from './in-memory-job-buffer-storage-strategy';
+
+/**
+ * This strategy is only intended to be used for automated testing.
+ */
+export class TestingJobBufferStorageStrategy extends InMemoryJobBufferStorageStrategy {
+    getBufferedJobs(bufferId: string): Job[] {
+        return Array.from(this.bufferStorage.get(bufferId) ?? []);
+    }
+}

+ 73 - 0
packages/core/src/job-queue/job-queue.service.spec.ts

@@ -11,7 +11,10 @@ import { ConfigService } from '../config/config.service';
 import { ProcessContext, setProcessContext } from '../process-context/process-context';
 import { ProcessContext, setProcessContext } from '../process-context/process-context';
 
 
 import { Job } from './job';
 import { Job } from './job';
+import { JobBuffer } from './job-buffer/job-buffer';
 import { JobBufferService } from './job-buffer/job-buffer.service';
 import { JobBufferService } from './job-buffer/job-buffer.service';
+import { TestingJobBufferStorageStrategy } from './job-buffer/testing-job-buffer-storage-strategy';
+import { JobQueue } from './job-queue';
 import { JobQueueService } from './job-queue.service';
 import { JobQueueService } from './job-queue.service';
 import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
 import { TestingJobQueueStrategy } from './testing-job-queue-strategy';
 
 
@@ -22,6 +25,7 @@ const testJobQueueStrategy = new TestingJobQueueStrategy({
     pollInterval: queuePollInterval,
     pollInterval: queuePollInterval,
     backoffStrategy: backoffStrategySpy.mockReturnValue(0),
     backoffStrategy: backoffStrategySpy.mockReturnValue(0),
 });
 });
+const testJobBufferStorageStrategy = new TestingJobBufferStorageStrategy();
 
 
 describe('JobQueueService', () => {
 describe('JobQueueService', () => {
     let jobQueueService: JobQueueService;
     let jobQueueService: JobQueueService;
@@ -368,6 +372,74 @@ describe('JobQueueService', () => {
 
 
         expect((await getJob(testJob)).state).toBe(JobState.PENDING);
         expect((await getJob(testJob)).state).toBe(JobState.PENDING);
     });
     });
+
+    describe('buffering', () => {
+        class TestJobBuffer implements JobBuffer<string> {
+            readonly id: 'test-job-buffer';
+
+            collect(job: Job<string>): boolean | Promise<boolean> {
+                return job.queueName === 'buffer-test-queue-1';
+            }
+
+            reduce(collectedJobs: Array<Job<string>>): Array<Job<string>> {
+                const concated = collectedJobs.map(j => j.data).join(' ');
+                return [
+                    new Job({
+                        ...collectedJobs[0],
+                        id: undefined,
+                        data: concated,
+                    }),
+                ];
+            }
+        }
+
+        let testQueue1: JobQueue<string>;
+        let testQueue2: JobQueue<string>;
+        const subject1 = new Subject();
+        const subject2 = new Subject();
+        const testJobBuffer = new TestJobBuffer();
+
+        beforeEach(async () => {
+            testQueue1 = await jobQueueService.createQueue({
+                name: 'buffer-test-queue-1',
+                process: job => {
+                    return subject1.toPromise();
+                },
+            });
+            testQueue2 = await jobQueueService.createQueue({
+                name: 'buffer-test-queue-2',
+                process: job => {
+                    return subject2.toPromise();
+                },
+            });
+
+            jobQueueService.addBuffer(testJobBuffer);
+        });
+
+        it('buffers the specified jobs', async () => {
+            const testJob1_1 = await testQueue1.add('hello');
+            const testJob1_2 = await testQueue1.add('world');
+            const testJob2_1 = await testQueue2.add('foo');
+            const testJob2_2 = await testQueue2.add('bar');
+
+            await tick(queuePollInterval);
+            expect(await getJob(testJob1_1)).toBeUndefined();
+            expect(await getJob(testJob1_2)).toBeUndefined();
+
+            expect((await getJob(testJob2_1)).state).toBe(JobState.RUNNING);
+            expect((await getJob(testJob2_2)).state).toBe(JobState.RUNNING);
+
+            const bufferedJobs = testJobBufferStorageStrategy.getBufferedJobs(testJobBuffer.id);
+            expect(bufferedJobs.map(j => j.data)).toEqual(['hello', 'world']);
+        });
+
+        it('flushes and reduces buffered jobs', async () => {
+            const result = await jobQueueService.flush(testJobBuffer);
+
+            expect(result.length).toBe(1);
+            expect(result[0].data).toBe('hello world');
+        });
+    });
 });
 });
 
 
 function tick(ms: number = 0): Promise<void> {
 function tick(ms: number = 0): Promise<void> {
@@ -387,6 +459,7 @@ class MockConfigService implements OnApplicationBootstrap, OnModuleDestroy {
     jobQueueOptions = {
     jobQueueOptions = {
         jobQueueStrategy: testJobQueueStrategy,
         jobQueueStrategy: testJobQueueStrategy,
         activeQueues: [],
         activeQueues: [],
+        jobBufferStorageStrategy: testJobBufferStorageStrategy,
     };
     };
 
 
     async onApplicationBootstrap() {
     async onApplicationBootstrap() {