Browse Source

fix(core): Fix error in SqlJobQueueStrategy when using SQLite

Michael Bromley 4 years ago
parent
commit
c7758220cd

+ 12 - 8
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -54,7 +54,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         return new Promise(async (resolve, reject) => {
             if (isSQLite) {
                 // SQLite driver does not support concurrent transactions. See https://github.com/typeorm/typeorm/issues/1884
-                const result = await this.getNextAndSetAsRunning(connection.manager, queueName);
+                const result = await this.getNextAndSetAsRunning(connection.manager, queueName, false);
                 resolve(result);
             } else {
                 // Selecting the next job is wrapped in a transaction so that we can
@@ -62,7 +62,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
                 // This prevents multiple worker processes from taking the same job when
                 // running concurrent workers.
                 connection.transaction(async transactionManager => {
-                    const result = await this.getNextAndSetAsRunning(transactionManager, queueName);
+                    const result = await this.getNextAndSetAsRunning(transactionManager, queueName, true);
                     resolve(result);
                 });
             }
@@ -72,21 +72,25 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
     private async getNextAndSetAsRunning(
         manager: EntityManager,
         queueName: string,
+        setLock: boolean,
     ): Promise<Job | undefined> {
-        const record = await manager
+        const qb = manager
             .getRepository(JobRecord)
             .createQueryBuilder('record')
             .where('record.queueName = :queueName', { queueName })
             .andWhere(
-                new Brackets(qb => {
-                    qb.where('record.state = :pending', {
+                new Brackets(qb1 => {
+                    qb1.where('record.state = :pending', {
                         pending: JobState.PENDING,
                     }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
                 }),
             )
-            .orderBy('record.createdAt', 'ASC')
-            .setLock('pessimistic_write')
-            .getOne();
+            .orderBy('record.createdAt', 'ASC');
+
+        if (setLock) {
+            qb.setLock('pessimistic_write');
+        }
+        const record = await qb.getOne();
         if (record) {
             const job = this.fromRecord(record);
             job.start();