Просмотр исходного кода

fix(core): Improve fault-tolerance of default scheduler

Michael Bromley 7 месяцев назад
Родитель
Сommit
852cdcec66

+ 5 - 0
packages/core/src/plugin/default-scheduler-plugin/default-scheduler-strategy.ts

@@ -181,6 +181,11 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
     }
 
     private async checkForManuallyTriggeredTasks() {
+        // Since this is run on an interval, there is an edge case where, during shutdown,
+        // the connection may not be initialized anymore.
+        if (!this.connection.rawConnection.isInitialized) {
+            return;
+        }
         const taskEntities = await this.connection.rawConnection
             .getRepository(ScheduledTaskRecord)
             .createQueryBuilder('task')

+ 20 - 2
packages/core/src/scheduler/scheduler.service.ts

@@ -35,6 +35,7 @@ export interface TaskInfo {
 export class SchedulerService implements OnApplicationBootstrap, OnApplicationShutdown {
     private readonly jobs: Map<string, { task: ScheduledTask; job: Cron }> = new Map();
     private shouldRunTasks = false;
+    private runningTasks = 0;
     constructor(
         private readonly configService: ConfigService,
         private readonly processContext: ProcessContext,
@@ -70,10 +71,19 @@ export class SchedulerService implements OnApplicationBootstrap, OnApplicationSh
         }
     }
 
-    onApplicationShutdown(signal?: string) {
+    async onApplicationShutdown(signal?: string) {
         for (const job of this.jobs.values()) {
             job.job.stop();
         }
+        const startTime = Date.now();
+        // If any tasks are still running, wait a short time for them to finish
+        const maxWaitTime = 10_000;
+        while (this.runningTasks > 0 && Date.now() - startTime < maxWaitTime) {
+            Logger.warn(
+                `Waiting for ${this.runningTasks} running tasks to finish before shutting down (signal: ${signal ?? 'unknown'})`,
+            );
+            await new Promise(resolve => setTimeout(resolve, 100));
+        }
     }
 
     /**
@@ -166,7 +176,15 @@ export class SchedulerService implements OnApplicationBootstrap, OnApplicationSh
                     // Only execute the cron task on the worker process
                     // so that any expensive logic does not affect
                     // the responsiveness of server processes
-                    schedulerStrategy.executeTask(task)(job);
+                    this.runningTasks++;
+                    try {
+                        schedulerStrategy.executeTask(task)(job);
+                    } catch (e: any) {
+                        const message = e instanceof Error ? e.message : String(e);
+                        Logger.error(`Error executing scheduled task ${task.id}: ${message}`);
+                    } finally {
+                        this.runningTasks--;
+                    }
                 }
             },
         );