Browse Source

fix(core): Backoff strategy does not block next jobs

Fixes #832
Michael Bromley 4 years ago
parent
commit
709cdff0a2

+ 6 - 2
packages/core/src/job-queue/in-memory-job-queue-strategy.ts

@@ -100,9 +100,13 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
         return ids.map(id => this.jobs.get(id)).filter(notNullOrUndefined);
     }
 
-    async next(queueName: string): Promise<Job | undefined> {
+    async next(queueName: string, waitingJobs: Job[] = []): Promise<Job | undefined> {
         this.checkProcessContext();
-        const next = this.unsettledJobs[queueName]?.shift();
+        const nextIndex = this.unsettledJobs[queueName]?.findIndex(item => !waitingJobs.includes(item.job));
+        if (nextIndex === -1) {
+            return;
+        }
+        const next = this.unsettledJobs[queueName]?.splice(nextIndex, 1)[0];
         if (next) {
             if (next.job.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') {
                 const msSinceLastFailure = Date.now() - +next.updatedAt;

+ 9 - 1
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -73,6 +73,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         manager: EntityManager,
         queueName: string,
         setLock: boolean,
+        waitingJobIds: ID[] = [],
     ): Promise<Job | undefined> {
         const qb = manager
             .getRepository(JobRecord)
@@ -87,6 +88,10 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
             )
             .orderBy('record.createdAt', 'ASC');
 
+        if (waitingJobIds.length) {
+            qb.andWhere('record.id NOT IN (:...waitingJobIds)', { waitingJobIds });
+        }
+
         if (setLock) {
             qb.setLock('pessimistic_write');
         }
@@ -97,7 +102,10 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
                 const msSinceLastFailure = Date.now() - +record.updatedAt;
                 const backOffDelayMs = this.backOffStrategy(queueName, record.attempts, job);
                 if (msSinceLastFailure < backOffDelayMs) {
-                    return;
+                    return await this.getNextAndSetAsRunning(manager, queueName, setLock, [
+                        ...waitingJobIds,
+                        record.id,
+                    ]);
                 }
             }
             job.start();