Răsfoiți Sursa

fix(core): Fix transaction errors in job queue for better-sqlite3

Michael Bromley 4 ani în urmă
părinte
comite
0043acebca

+ 4 - 2
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -48,9 +48,11 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
         }
         }
         const connection = this.connection;
         const connection = this.connection;
         const connectionType = this.connection.options.type;
         const connectionType = this.connection.options.type;
+        const isSQLite =
+            connectionType === 'sqlite' || connectionType === 'sqljs' || connectionType === 'better-sqlite3';
 
 
         return new Promise(async (resolve, reject) => {
         return new Promise(async (resolve, reject) => {
-            if (connectionType === 'sqlite' || connectionType === 'sqljs') {
+            if (isSQLite) {
                 // SQLite driver does not support concurrent transactions. See https://github.com/typeorm/typeorm/issues/1884
                 // SQLite driver does not support concurrent transactions. See https://github.com/typeorm/typeorm/issues/1884
                 const result = await this.getNextAndSetAsRunning(connection.manager, queueName);
                 const result = await this.getNextAndSetAsRunning(connection.manager, queueName);
                 resolve(result);
                 resolve(result);
@@ -59,7 +61,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
                 // set a lock on that row and immediately update the status to "RUNNING".
                 // set a lock on that row and immediately update the status to "RUNNING".
                 // This prevents multiple worker processes from taking the same job when
                 // This prevents multiple worker processes from taking the same job when
                 // running concurrent workers.
                 // running concurrent workers.
-                connection.transaction(async transactionManager => {
+                await connection.transaction(async transactionManager => {
                     const result = await this.getNextAndSetAsRunning(transactionManager, queueName);
                     const result = await this.getNextAndSetAsRunning(transactionManager, queueName);
                     resolve(result);
                     resolve(result);
                 });
                 });