Browse Source

refactor(core): Simplify cleaning of stale tasks

Michael Bromley 4 months ago
parent
commit
93bf38f83e

+ 3 - 3
packages/core/src/plugin/default-scheduler-plugin/default-scheduler-strategy.ts

@@ -32,13 +32,13 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
     private readonly tasks: Map<string, { task: ScheduledTask; isRegistered: boolean }> = new Map();
     private pluginOptions: DefaultSchedulerPluginOptions;
     private runningTasks: ScheduledTask[] = [];
-    private taskService: StaleTaskService;
+    private staleTaskService: StaleTaskService;
 
     init(injector: Injector) {
         this.connection = injector.get(TransactionalConnection);
         this.pluginOptions = injector.get(DEFAULT_SCHEDULER_PLUGIN_OPTIONS);
         this.injector = injector;
-        this.taskService = injector.get(StaleTaskService);
+        this.staleTaskService = injector.get(StaleTaskService);
 
         const runTriggerCheck =
             injector.get(ConfigService).schedulerOptions.runTasksInWorkerOnly === false ||
@@ -74,7 +74,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
     executeTask(task: ScheduledTask) {
         return async (job?: Cron) => {
             await this.ensureTaskIsRegistered(task);
-            await this.taskService.cleanStaleLocks();
+            await this.staleTaskService.cleanStaleLocksForTask(task);
 
             const taskEntity = await this.connection.rawConnection
                 .getRepository(ScheduledTaskRecord)

+ 50 - 75
packages/core/src/plugin/default-scheduler-plugin/stale-task.service.ts

@@ -5,41 +5,42 @@ import { Cron } from 'croner';
 import { Instrument } from '../../common/instrument-decorator';
 import { Logger } from '../../config';
 import { TransactionalConnection } from '../../connection/transactional-connection';
-import { SchedulerService } from '../../scheduler/scheduler.service';
+import { ScheduledTask } from '../../scheduler';
 
 import { loggerCtx } from './constants';
 import { ScheduledTaskRecord } from './scheduled-task-record.entity';
 
+/**
+ * @description
+ * This service finds and cleans stale locks, which can occur if a worker instance is
+ * non-gracefully shut down while a task is ongoing, and the lock is never released.
+ *
+ * Failure to clean these stale locks will prevent the task from ever running again, since
+ * workers will assume it is locked by another process and will ignore it.
+ */
 @Injectable()
 @Instrument()
 export class StaleTaskService {
-    constructor(
-        private connection: TransactionalConnection,
-        private schedulerService: SchedulerService,
-    ) {}
+    // Cache the interval for each taskId
+    private taskIntervalMap = new Map<string, number>();
+
+    constructor(private connection: TransactionalConnection) {}
 
     /**
      * @description
      * Cleans stale task locks from the database.
      */
-    async cleanStaleLocks() {
+    async cleanStaleLocksForTask(task: ScheduledTask) {
         const now = new Date();
-        Logger.verbose('Cleaning stale task locks', loggerCtx);
-
         try {
-            const lockedTasks = await this.fetchLockedTasks();
-            const staleTasks = await this.extractStaleTasks(lockedTasks, now);
-
-            if (staleTasks.length > 0) {
-                await this.clearLocks(staleTasks);
-            } else {
-                Logger.debug('No stale task locks found', loggerCtx);
+            const lockedTask = await this.fetchLockedTask(task.id);
+            if (!lockedTask) {
+                return;
+            }
+            const intervalMs = this.getScheduleIntervalMs(task);
+            if (this.isStale(lockedTask, now, intervalMs)) {
+                await this.clearStaleLock(lockedTask);
             }
-
-            return {
-                success: true,
-                tasksCleared: staleTasks.length,
-            };
         } catch (error) {
             Logger.error(
                 `Error cleaning up stale task locks: ${error instanceof Error ? error.message : String(error)}`,
@@ -49,16 +50,25 @@ export class StaleTaskService {
         }
     }
 
-    private async fetchLockedTasks(): Promise<ScheduledTaskRecord[]> {
+    private async fetchLockedTask(taskId: string): Promise<ScheduledTaskRecord | null> {
         return this.connection.rawConnection
             .getRepository(ScheduledTaskRecord)
             .createQueryBuilder('task')
             .select(['task.taskId', 'task.lockedAt'])
             .where('task.lockedAt IS NOT NULL')
-            .getMany();
+            .andWhere('task.taskId = :taskId', { taskId })
+            .getOne();
     }
 
-    private getScheduleIntervalMs(schedule: string | ((cronTime: typeof CronTime) => string)): number {
+    /**
+     * Returns the interval in ms between one run of the task, and the next.
+     */
+    private getScheduleIntervalMs(task: ScheduledTask): number {
+        const cachedInterval = this.taskIntervalMap.get(task.id);
+        if (cachedInterval) {
+            return cachedInterval;
+        }
+        const schedule = task.options.schedule;
         const scheduleString = typeof schedule === 'function' ? schedule(CronTime) : schedule;
         const cron = new Cron(scheduleString);
         const nextFn: (d?: Date) => Date | null | undefined =
@@ -73,7 +83,9 @@ export class StaleTaskService {
         if (!next2) {
             throw new Error('Could not compute next run times');
         }
-        return next2.getTime() - next1.getTime();
+        const interval = next2.getTime() - next1.getTime();
+        this.taskIntervalMap.set(task.id, interval);
+        return interval;
     }
 
     private isStale(task: ScheduledTaskRecord, now: Date, intervalMs: number): boolean {
@@ -83,59 +95,22 @@ export class StaleTaskService {
         return now.getTime() - task.lockedAt.getTime() > intervalMs;
     }
 
-    private async extractStaleTasks(
-        lockedTasks: ScheduledTaskRecord[],
-        now: Date,
-    ): Promise<ScheduledTaskRecord[]> {
-        const staleTasks: ScheduledTaskRecord[] = [];
-        const schedulerTasks = await this.schedulerService.getTaskList();
-        const taskInfoById = new Map<string, { id: string; schedule: string }>(
-            schedulerTasks.map((t: { id: string; schedule: string }) => [t.id, t]),
-        );
-
-        for (const task of lockedTasks) {
-            if (!task.lockedAt) {
-                continue;
-            }
-
-            const taskInfo = taskInfoById.get(task.taskId);
-            if (!taskInfo) {
-                Logger.verbose(`Task ${task.taskId} not found in scheduler service`, loggerCtx);
-                continue;
-            }
-
-            try {
-                const intervalMs = this.getScheduleIntervalMs(taskInfo.schedule);
-                if (this.isStale(task, now, intervalMs)) {
-                    staleTasks.push(task);
-                }
-            } catch (e) {
-                Logger.warn(
-                    `Could not parse schedule for task ${task.taskId}: ${e instanceof Error ? e.message : String(e)}`,
-                    loggerCtx,
-                );
-                continue;
-            }
-        }
-
-        return staleTasks;
-    }
-
-    private async clearLocks(staleTasks: ScheduledTaskRecord[]): Promise<void> {
+    private async clearStaleLock(staleTask: ScheduledTaskRecord): Promise<void> {
         const repo = this.connection.rawConnection.getRepository(ScheduledTaskRecord);
-        for (const task of staleTasks) {
-            const result = await repo.update(
-                { taskId: task.taskId, lockedAt: task.lockedAt ?? undefined },
-                { lockedAt: null },
+        const result = await repo.update(
+            { taskId: staleTask.taskId, lockedAt: staleTask.lockedAt ?? undefined },
+            { lockedAt: null },
+        );
+        if (result.affected && result.affected > 0) {
+            Logger.verbose(
+                `Successfully cleaned stale task locks for task "${staleTask.taskId}", which was locked at ${staleTask.lockedAt?.toISOString() ?? 'unknown'}`,
+                loggerCtx,
+            );
+        } else {
+            Logger.debug(
+                `Skipped clearing lock for task "${staleTask.taskId}" because the lock has changed since observation`,
+                loggerCtx,
             );
-            if (result.affected && result.affected > 0) {
-                Logger.verbose(`Successfully cleaned stale task locks for task "${task.taskId}"`, loggerCtx);
-            } else {
-                Logger.debug(
-                    `Skipped clearing lock for task "${task.taskId}" because the lock has changed since observation`,
-                    loggerCtx,
-                );
-            }
         }
     }
 }