|
@@ -1,7 +1,9 @@
|
|
|
|
|
+import { UpdateScheduledTaskInput } from '@vendure/common/lib/generated-types';
|
|
|
import { Cron } from 'croner';
|
|
import { Cron } from 'croner';
|
|
|
import ms from 'ms';
|
|
import ms from 'ms';
|
|
|
|
|
|
|
|
import { Injector } from '../../common';
|
|
import { Injector } from '../../common';
|
|
|
|
|
+import { assertFound } from '../../common/utils';
|
|
|
import { Logger } from '../../config/logger/vendure-logger';
|
|
import { Logger } from '../../config/logger/vendure-logger';
|
|
|
import { TransactionalConnection } from '../../connection';
|
|
import { TransactionalConnection } from '../../connection';
|
|
|
import { ProcessContext } from '../../process-context';
|
|
import { ProcessContext } from '../../process-context';
|
|
@@ -11,7 +13,6 @@ import { SchedulerStrategy, TaskReport } from '../../scheduler/scheduler-strateg
|
|
|
import { DEFAULT_SCHEDULER_PLUGIN_OPTIONS } from './constants';
|
|
import { DEFAULT_SCHEDULER_PLUGIN_OPTIONS } from './constants';
|
|
|
import { ScheduledTaskRecord } from './scheduled-task-record.entity';
|
|
import { ScheduledTaskRecord } from './scheduled-task-record.entity';
|
|
|
import { DefaultSchedulerPluginOptions } from './types';
|
|
import { DefaultSchedulerPluginOptions } from './types';
|
|
|
-
|
|
|
|
|
/**
|
|
/**
|
|
|
* @description
|
|
* @description
|
|
|
* The default {@link SchedulerStrategy} implementation that uses the database to
|
|
* The default {@link SchedulerStrategy} implementation that uses the database to
|
|
@@ -27,6 +28,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
private processContext: ProcessContext;
|
|
private processContext: ProcessContext;
|
|
|
private tasks: Map<string, { task: ScheduledTask; isRegistered: boolean }> = new Map();
|
|
private tasks: Map<string, { task: ScheduledTask; isRegistered: boolean }> = new Map();
|
|
|
private pluginOptions: DefaultSchedulerPluginOptions;
|
|
private pluginOptions: DefaultSchedulerPluginOptions;
|
|
|
|
|
+ private runningTasks: ScheduledTask[] = [];
|
|
|
|
|
|
|
|
init(injector: Injector) {
|
|
init(injector: Injector) {
|
|
|
this.connection = injector.get(TransactionalConnection);
|
|
this.connection = injector.get(TransactionalConnection);
|
|
@@ -35,6 +37,15 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
this.injector = injector;
|
|
this.injector = injector;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ async destroy() {
|
|
|
|
|
+ for (const task of this.runningTasks) {
|
|
|
|
|
+ await this.connection.rawConnection
|
|
|
|
|
+ .getRepository(ScheduledTaskRecord)
|
|
|
|
|
+ .update({ taskId: task.id }, { lockedAt: null });
|
|
|
|
|
+ Logger.info(`Released lock for task "${task.id}"`);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
executeTask(task: ScheduledTask) {
|
|
executeTask(task: ScheduledTask) {
|
|
|
return async (job: Cron) => {
|
|
return async (job: Cron) => {
|
|
|
if (this.processContext.isServer) {
|
|
if (this.processContext.isServer) {
|
|
@@ -48,6 +59,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
.set({ lockedAt: new Date() })
|
|
.set({ lockedAt: new Date() })
|
|
|
.where('taskId = :taskId', { taskId: task.id })
|
|
.where('taskId = :taskId', { taskId: task.id })
|
|
|
.andWhere('lockedAt IS NULL')
|
|
.andWhere('lockedAt IS NULL')
|
|
|
|
|
+ .andWhere('enabled = TRUE')
|
|
|
.execute();
|
|
.execute();
|
|
|
if (!taskEntity.affected) {
|
|
if (!taskEntity.affected) {
|
|
|
return;
|
|
return;
|
|
@@ -55,6 +67,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
|
|
|
|
|
Logger.verbose(`Executing scheduled task "${task.id}"`);
|
|
Logger.verbose(`Executing scheduled task "${task.id}"`);
|
|
|
try {
|
|
try {
|
|
|
|
|
+ this.runningTasks.push(task);
|
|
|
const timeout = task.options.timeout ?? (this.pluginOptions.defaultTimeout as number);
|
|
const timeout = task.options.timeout ?? (this.pluginOptions.defaultTimeout as number);
|
|
|
const timeoutMs = typeof timeout === 'number' ? timeout : ms(timeout);
|
|
const timeoutMs = typeof timeout === 'number' ? timeout : ms(timeout);
|
|
|
|
|
|
|
@@ -83,6 +96,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
},
|
|
},
|
|
|
);
|
|
);
|
|
|
Logger.verbose(`Scheduled task "${task.id}" completed successfully`);
|
|
Logger.verbose(`Scheduled task "${task.id}" completed successfully`);
|
|
|
|
|
+ this.runningTasks = this.runningTasks.filter(t => t !== task);
|
|
|
} catch (error) {
|
|
} catch (error) {
|
|
|
let errorMessage = 'Unknown error';
|
|
let errorMessage = 'Unknown error';
|
|
|
if (error instanceof Error) {
|
|
if (error instanceof Error) {
|
|
@@ -98,6 +112,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
lastResult: { error: errorMessage } as any,
|
|
lastResult: { error: errorMessage } as any,
|
|
|
},
|
|
},
|
|
|
);
|
|
);
|
|
|
|
|
+ this.runningTasks = this.runningTasks.filter(t => t !== task);
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
}
|
|
}
|
|
@@ -121,6 +136,17 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
.then(task => (task ? this.entityToReport(task) : undefined));
|
|
.then(task => (task ? this.entityToReport(task) : undefined));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ async updateTask(input: UpdateScheduledTaskInput): Promise<TaskReport> {
|
|
|
|
|
+ await this.connection.rawConnection
|
|
|
|
|
+ .getRepository(ScheduledTaskRecord)
|
|
|
|
|
+ .createQueryBuilder('task')
|
|
|
|
|
+ .update()
|
|
|
|
|
+ .set({ enabled: input.enabled })
|
|
|
|
|
+ .where('taskId = :id', { id: input.id })
|
|
|
|
|
+ .execute();
|
|
|
|
|
+ return assertFound(this.getTask(input.id));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
private entityToReport(task: ScheduledTaskRecord): TaskReport {
|
|
private entityToReport(task: ScheduledTaskRecord): TaskReport {
|
|
|
return {
|
|
return {
|
|
|
id: task.taskId,
|
|
id: task.taskId,
|
|
@@ -141,6 +167,7 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
|
|
|
.values({ taskId: task.id })
|
|
.values({ taskId: task.id })
|
|
|
.orIgnore()
|
|
.orIgnore()
|
|
|
.execute();
|
|
.execute();
|
|
|
|
|
+
|
|
|
this.tasks.set(task.id, { task, isRegistered: true });
|
|
this.tasks.set(task.id, { task, isRegistered: true });
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|