|
|
@@ -1,4 +1,5 @@
|
|
|
import { Injectable } from '@nestjs/common';
|
|
|
+import CronTime from 'cron-time-generator';
|
|
|
import { Cron } from 'croner';
|
|
|
|
|
|
import { Instrument } from '../../common/instrument-decorator';
|
|
|
@@ -56,17 +57,15 @@ export class TaskService {
|
|
|
.getMany();
|
|
|
}
|
|
|
|
|
|
- private async taskConfigExists(taskId: string): Promise<boolean> {
|
|
|
- const taskConfig = await this.connection.rawConnection
|
|
|
- .getRepository(ScheduledTaskRecord)
|
|
|
- .findOne({ where: { taskId } });
|
|
|
- return !!taskConfig;
|
|
|
- }
|
|
|
-
|
|
|
- private getScheduleIntervalMs(schedule: string): number {
|
|
|
- const cron = new Cron(schedule);
|
|
|
- const next1 = cron.nextRun();
|
|
|
- const next2 = cron.nextRun();
|
|
|
+ private getScheduleIntervalMs(schedule: string | ((cronTime: typeof CronTime) => string)): number {
|
|
|
+ const scheduleString = typeof schedule === 'function' ? schedule(CronTime) : schedule;
|
|
|
+ const cron = new Cron(scheduleString);
|
|
|
+ const nextFn: () => Date | null | undefined =
|
|
|
+ typeof (cron as any).nextRun === 'function'
|
|
|
+ ? (cron as any).nextRun.bind(cron)
|
|
|
+ : (cron as any).next.bind(cron);
|
|
|
+ const next1 = nextFn();
|
|
|
+ const next2 = nextFn();
|
|
|
if (!next1 || !next2) {
|
|
|
throw new Error('Could not compute next run times');
|
|
|
}
|
|
|
@@ -86,14 +85,16 @@ export class TaskService {
|
|
|
): Promise<ScheduledTaskRecord[]> {
|
|
|
const staleTasks: ScheduledTaskRecord[] = [];
|
|
|
const schedulerTasks = await this.schedulerService.getTaskList();
|
|
|
+ const taskInfoById = new Map<string, { id: string; schedule: string }>(
|
|
|
+ schedulerTasks.map((t: { id: string; schedule: string }) => [t.id, t]),
|
|
|
+ );
|
|
|
|
|
|
for (const task of lockedTasks) {
|
|
|
- const exists = await this.taskConfigExists(task.taskId);
|
|
|
- if (!exists || !task.lockedAt) {
|
|
|
+ if (!task.lockedAt) {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- const taskInfo = schedulerTasks.find((t: { id: string }) => t.id === task.taskId);
|
|
|
+ const taskInfo = taskInfoById.get(task.taskId);
|
|
|
if (!taskInfo) {
|
|
|
Logger.verbose(`Task ${task.taskId} not found in scheduler service`, loggerCtx);
|
|
|
continue;
|
|
|
@@ -117,12 +118,22 @@ export class TaskService {
|
|
|
}
|
|
|
|
|
|
private async clearLocks(staleTasks: ScheduledTaskRecord[]): Promise<void> {
|
|
|
- for (const task of staleTasks) {
|
|
|
- await this.connection.rawConnection
|
|
|
- .getRepository(ScheduledTaskRecord)
|
|
|
- .update({ taskId: task.taskId }, { lockedAt: null });
|
|
|
-
|
|
|
- Logger.verbose(`Successfully cleaned stale task locks for task "${task.taskId}"`);
|
|
|
- }
|
|
|
+ await this.connection.withTransaction(async ctx => {
|
|
|
+ const repo = this.connection.getRepository(ctx, ScheduledTaskRecord);
|
|
|
+ for (const task of staleTasks) {
|
|
|
+ const result = await repo.update(
|
|
|
+ { taskId: task.taskId, lockedAt: task.lockedAt ?? undefined },
|
|
|
+ { lockedAt: null },
|
|
|
+ );
|
|
|
+ if (result.affected && result.affected > 0) {
|
|
|
+ Logger.verbose(`Successfully cleaned stale task locks for task "${task.taskId}"`);
|
|
|
+ } else {
|
|
|
+ Logger.debug(
|
|
|
+ `Skipped clearing lock for task "${task.taskId}" because the lock has changed since observation`,
|
|
|
+ loggerCtx,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|