Quellcode durchsuchen

test(core): Fix failing job queue tests

Michael Bromley vor 1 Jahr
Ursprung
Commit
3cf61bd90f

+ 4 - 5
packages/core/src/job-queue/polling-job-queue-strategy.ts

@@ -145,10 +145,10 @@ class ActiveQueue<Data extends JobData<Data> = object> {
         void runNextJobs();
         void runNextJobs();
     }
     }
 
 
-    async stop(): Promise<void> {
+    async stop(stopActiveQueueTimeout = 20_000): Promise<void> {
         this.running = false;
         this.running = false;
         clearTimeout(this.timer);
         clearTimeout(this.timer);
-        await this.awaitRunningJobsOrTimeout();
+        await this.awaitRunningJobsOrTimeout(stopActiveQueueTimeout);
         Logger.info(`Stopped queue: ${this.queueName}`);
         Logger.info(`Stopped queue: ${this.queueName}`);
         // Allow any job status changes to be persisted
         // Allow any job status changes to be persisted
         // before we permit the application shutdown to continue.
         // before we permit the application shutdown to continue.
@@ -157,9 +157,8 @@ class ActiveQueue<Data extends JobData<Data> = object> {
         await new Promise(resolve => setTimeout(resolve, 1000));
         await new Promise(resolve => setTimeout(resolve, 1000));
     }
     }
 
 
-    private awaitRunningJobsOrTimeout(): Promise<void> {
+    private awaitRunningJobsOrTimeout(stopActiveQueueTimeout = 20_000): Promise<void> {
         const start = +new Date();
         const start = +new Date();
-        const stopActiveQueueTimeout = 20_000;
         let timeout: ReturnType<typeof setTimeout>;
         let timeout: ReturnType<typeof setTimeout>;
         return new Promise(resolve => {
         return new Promise(resolve => {
             let lastStatusUpdate = +new Date();
             let lastStatusUpdate = +new Date();
@@ -228,7 +227,7 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
     public setRetries: (queueName: string, job: Job) => number;
     public setRetries: (queueName: string, job: Job) => number;
     public backOffStrategy?: BackoffStrategy;
     public backOffStrategy?: BackoffStrategy;
 
 
-    private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
+    protected activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
 
 
     constructor(config?: PollingJobQueueStrategyConfig);
     constructor(config?: PollingJobQueueStrategyConfig);
     constructor(concurrency?: number, pollInterval?: number);
     constructor(concurrency?: number, pollInterval?: number);

+ 12 - 0
packages/core/src/job-queue/testing-job-queue-strategy.ts

@@ -1,5 +1,6 @@
 import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy';
 import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy';
 import { Job } from './job';
 import { Job } from './job';
+import { JobData } from './types';
 
 
 /**
 /**
  * @description
  * @description
@@ -11,4 +12,15 @@ export class TestingJobQueueStrategy extends InMemoryJobQueueStrategy {
             await this.add(job);
             await this.add(job);
         }
         }
     }
     }
+
+    override async stop<Data extends JobData<Data> = object>(
+        queueName: string,
+        process: (job: Job<Data>) => Promise<any>,
+    ) {
+        const active = this.activeQueues.getAndDelete(queueName, process);
+        if (!active) {
+            return;
+        }
+        await active.stop(1_000);
+    }
 }
 }