Parcourir la source

fix(core): Fix scheduled task concurrent execution on Postgres (#4069)

Michael Bromley il y a 4 semaines
Parent
commit
a80d24b408

+ 63 - 10
packages/core/src/plugin/default-scheduler-plugin/default-scheduler-strategy.ts

@@ -76,16 +76,8 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
             await this.ensureTaskIsRegistered(task);
             await this.staleTaskService.cleanStaleLocksForTask(task);
 
-            const taskEntity = await this.connection.rawConnection
-                .getRepository(ScheduledTaskRecord)
-                .createQueryBuilder('task')
-                .update()
-                .set({ lockedAt: new Date() })
-                .where('taskId = :taskId', { taskId: task.id })
-                .andWhere('lockedAt IS NULL')
-                .andWhere('enabled = TRUE')
-                .execute();
-            if (!taskEntity.affected) {
+            const lockAcquired = await this.tryAcquireLock(task);
+            if (!lockAcquired) {
                 return;
             }
 
@@ -236,6 +228,67 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
         }
     }
 
+    /**
+     * Attempts to acquire a lock for the given task.
+     *
+     * For databases that support pessimistic locking (PostgreSQL, MySQL, MariaDB),
+     * we use SELECT ... FOR UPDATE to ensure only one worker can acquire the lock.
+     * This is necessary because PostgreSQL's MVCC can allow multiple concurrent
+     * UPDATE statements to succeed when using a simple "UPDATE ... WHERE lockedAt IS NULL" pattern.
+     *
+     * For databases that don't support pessimistic locking (SQLite, SQL.js),
+     * we fall back to the atomic UPDATE approach which works correctly for single-connection scenarios.
+     */
+    private async tryAcquireLock(task: ScheduledTask): Promise<boolean> {
+        const dbType = this.connection.rawConnection.options.type;
+        const supportsPessimisticLocking = ['postgres', 'mysql', 'mariadb'].includes(dbType);
+
+        if (supportsPessimisticLocking) {
+            // Use a transaction with pessimistic locking to ensure only one worker
+            // can acquire the lock.
+            return this.connection.rawConnection.transaction(async manager => {
+                // First, try to select the task row with a FOR UPDATE lock.
+                // This will block other transactions trying to select the same row
+                // until this transaction commits or rolls back.
+                const taskRecord = await manager
+                    .getRepository(ScheduledTaskRecord)
+                    .createQueryBuilder('task')
+                    .setLock('pessimistic_write')
+                    .where('task.taskId = :taskId', { taskId: task.id })
+                    .andWhere('task.lockedAt IS NULL')
+                    .andWhere('task.enabled = TRUE')
+                    .getOne();
+
+                if (!taskRecord) {
+                    // Task is either already locked, disabled, or doesn't exist
+                    return false;
+                }
+
+                // Now update the lock within the same transaction
+                await manager
+                    .getRepository(ScheduledTaskRecord)
+                    .update({ id: taskRecord.id }, { lockedAt: new Date() });
+
+                return true;
+            });
+        } else {
+            // For databases without pessimistic locking support (SQLite, SQL.js),
+            // use the atomic UPDATE approach. This works for single-connection scenarios
+            // but may have race conditions with multiple connections.
+            const result = await this.connection.rawConnection
+                .getRepository(ScheduledTaskRecord)
+                .createQueryBuilder('task')
+                .update()
+                .set({ lockedAt: new Date() })
+                .where('taskId = :taskId', { taskId: task.id })
+                .andWhere('lockedAt IS NULL')
+                .andWhere('enabled = TRUE')
+                .execute();
+
+            return !!result.affected;
+        }
+    }
+
     private async ensureTaskIsRegistered(taskOrId: ScheduledTask | string) {
         const taskId = typeof taskOrId === 'string' ? taskOrId : taskOrId.id;
         const task = this.tasks.get(taskId);

+ 1 - 0
packages/dev-server/test-plugins/scheduler-race-test/index.ts

@@ -0,0 +1 @@
+export * from './scheduler-race-test-task';

+ 65 - 0
packages/dev-server/test-plugins/scheduler-race-test/scheduler-race-test-task.ts

@@ -0,0 +1,65 @@
+import { Logger, ScheduledTask } from '@vendure/core';
+import fs from 'fs';
+import path from 'path';
+
+const LOG_FILE = path.join(__dirname, '../../scheduler-race-log.json');
+const WORKER_ID = `worker-${process.pid}`;
+
+/**
+ * A scheduled task designed to test for race conditions in the DefaultSchedulerStrategy.
+ *
+ * This task:
+ * - Runs every 5 seconds
+ * - Logs execution details with worker PID to a JSON file
+ * - Simulates 2 seconds of work to increase the window for race conditions
+ *
+ * To test:
+ * 1. Start multiple workers: `DB=postgres npm run dev:worker` (in separate terminals)
+ * 2. Let them run for a minute or two
+ * 3. Check the log file for duplicate triggerKeys from different workers
+ *
+ * If the same triggerKey appears with different workerIds, that's a race condition!
+ */
+export const schedulerRaceTestTask = new ScheduledTask({
+    id: 'scheduler-race-test',
+    description: 'Tests for race conditions in scheduled task execution',
+    // Every 5 seconds (6-part cron: seconds minutes hours day month weekday)
+    schedule: '*/5 * * * * *',
+    async execute({ injector }) {
+        const startTime = new Date().toISOString();
+
+        // Create a trigger key by rounding to nearest 5-second boundary
+        // This groups executions that should have been the same trigger
+        const triggerTime = new Date();
+        triggerTime.setMilliseconds(0);
+        triggerTime.setSeconds(Math.floor(triggerTime.getSeconds() / 5) * 5);
+        const triggerKey = triggerTime.toISOString();
+
+        Logger.info(`[${WORKER_ID}] ⏰ Starting scheduled task at ${startTime} (trigger: ${triggerKey})`);
+
+        // Simulate some work (2 seconds) - increases the chance of detecting race conditions
+        await new Promise(resolve => setTimeout(resolve, 2000));
+
+        const endTime = new Date().toISOString();
+        const logEntry = {
+            workerId: WORKER_ID,
+            triggerKey,
+            startTime,
+            endTime,
+        };
+
+        // Append to log file
+        let logs: any[] = [];
+        try {
+            const existing = fs.readFileSync(LOG_FILE, 'utf-8');
+            logs = JSON.parse(existing);
+        } catch (e) {
+            // File doesn't exist yet, that's fine
+        }
+        logs.push(logEntry);
+        fs.writeFileSync(LOG_FILE, JSON.stringify(logs, null, 2));
+
+        Logger.info(`[${WORKER_ID}] ✅ Finished scheduled task at ${endTime}`);
+        return { workerId: WORKER_ID, triggerKey, startTime, endTime };
+    },
+});