|
|
@@ -61,13 +61,16 @@ export class TaskService {
|
|
|
private getScheduleIntervalMs(schedule: string | ((cronTime: typeof CronTime) => string)): number {
|
|
|
const scheduleString = typeof schedule === 'function' ? schedule(CronTime) : schedule;
|
|
|
const cron = new Cron(scheduleString);
|
|
|
- const nextFn: () => Date | null | undefined =
|
|
|
+ const nextFn: (d?: Date) => Date | null | undefined =
|
|
|
typeof (cron as any).nextRun === 'function'
|
|
|
? (cron as any).nextRun.bind(cron)
|
|
|
: (cron as any).next.bind(cron);
|
|
|
const next1 = nextFn();
|
|
|
- const next2 = nextFn();
|
|
|
- if (!next1 || !next2) {
|
|
|
+ if (!next1) {
|
|
|
+ throw new Error('Could not compute next run times');
|
|
|
+ }
|
|
|
+ const next2 = nextFn(next1);
|
|
|
+ if (!next2) {
|
|
|
throw new Error('Could not compute next run times');
|
|
|
}
|
|
|
return next2.getTime() - next1.getTime();
|
|
|
@@ -119,25 +122,20 @@ export class TaskService {
|
|
|
}
|
|
|
|
|
|
private async clearLocks(staleTasks: ScheduledTaskRecord[]): Promise<void> {
|
|
|
- await this.connection.withTransaction(async ctx => {
|
|
|
- const repo = this.connection.getRepository(ctx, ScheduledTaskRecord);
|
|
|
- for (const task of staleTasks) {
|
|
|
- const result = await repo.update(
|
|
|
- { taskId: task.taskId, lockedAt: task.lockedAt ?? undefined },
|
|
|
- { lockedAt: null },
|
|
|
+ const repo = this.connection.rawConnection.getRepository(ScheduledTaskRecord);
|
|
|
+ for (const task of staleTasks) {
|
|
|
+ const result = await repo.update(
|
|
|
+ { taskId: task.taskId, lockedAt: task.lockedAt ?? undefined },
|
|
|
+ { lockedAt: null },
|
|
|
+ );
|
|
|
+ if (result.affected && result.affected > 0) {
|
|
|
+ Logger.verbose(`Successfully cleaned stale task locks for task "${task.taskId}"`, loggerCtx);
|
|
|
+ } else {
|
|
|
+ Logger.debug(
|
|
|
+ `Skipped clearing lock for task "${task.taskId}" because the lock has changed since observation`,
|
|
|
+ loggerCtx,
|
|
|
);
|
|
|
- if (result.affected && result.affected > 0) {
|
|
|
- Logger.verbose(
|
|
|
- `Successfully cleaned stale task locks for task "${task.taskId}"`,
|
|
|
- loggerCtx,
|
|
|
- );
|
|
|
- } else {
|
|
|
- Logger.debug(
|
|
|
- `Skipped clearing lock for task "${task.taskId}" because the lock has changed since observation`,
|
|
|
- loggerCtx,
|
|
|
- );
|
|
|
- }
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
}
|