Browse Source

feat(core): Implement basic task scheduling mechanism

Relates to #1425
Michael Bromley 9 months ago
parent
commit
6160edd644

+ 20 - 1
package-lock.json

@@ -4088,7 +4088,6 @@
     },
     "node_modules/@clack/prompts/node_modules/is-unicode-supported": {
       "version": "1.3.0",
-      "extraneous": true,
       "inBundle": true,
       "license": "MIT",
       "engines": {
@@ -22242,6 +22241,21 @@
         "node": ">=12.0.0"
       }
     },
+    "node_modules/cron-time-generator": {
+      "version": "2.0.3",
+      "resolved": "https://registry.npmjs.org/cron-time-generator/-/cron-time-generator-2.0.3.tgz",
+      "integrity": "sha512-02Ab5okFEMpcDernEwUXY16hLCryUxATAFGYyzyLymin0xl/udC50LkBFHX+qOeXnwXMRyK+uH4doXzUSpOoQA==",
+      "license": "MIT"
+    },
+    "node_modules/croner": {
+      "version": "9.0.0",
+      "resolved": "https://registry.npmjs.org/croner/-/croner-9.0.0.tgz",
+      "integrity": "sha512-onMB0OkDjkXunhdW9htFjEhqrD54+M94i6ackoUkjHKbRnXdyEyKRelp4nJ1kAz32+s27jP1FsebpJCVl0BsvA==",
+      "license": "MIT",
+      "engines": {
+        "node": ">=18.0"
+      }
+    },
     "node_modules/cross-env": {
       "version": "7.0.3",
       "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz",
@@ -45770,6 +45784,8 @@
         "bcrypt": "^5.1.1",
         "body-parser": "^1.20.2",
         "cookie-session": "^2.1.0",
+        "cron-time-generator": "^2.0.3",
+        "croner": "^9.0.0",
         "csv-parse": "^5.6.0",
         "express": "^5.0.1",
         "fs-extra": "^11.2.0",
@@ -46305,6 +46321,9 @@
         "eslint-plugin-react-refresh": "^0.4.18",
         "globals": "^15.14.0",
         "vite-plugin-dts": "^4.5.3"
+      },
+      "funding": {
+        "url": "https://github.com/sponsors/michaelbromley"
       }
     },
     "packages/dashboard/node_modules/@eslint/eslintrc": {

+ 2 - 0
packages/core/package.json

@@ -53,6 +53,8 @@
         "bcrypt": "^5.1.1",
         "body-parser": "^1.20.2",
         "cookie-session": "^2.1.0",
+        "cron-time-generator": "^2.0.3",
+        "croner": "^9.0.0",
         "csv-parse": "^5.6.0",
         "express": "^5.0.1",
         "fs-extra": "^11.2.0",

+ 2 - 0
packages/core/src/config/config.module.ts

@@ -87,6 +87,7 @@ export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdo
         } = this.configService.authOptions;
         const { taxZoneStrategy, taxLineCalculationStrategy } = this.configService.taxOptions;
         const { jobQueueStrategy, jobBufferStorageStrategy } = this.configService.jobQueueOptions;
+        const { schedulerStrategy } = this.configService.schedulerOptions;
         const {
             mergeStrategy,
             checkoutMergeStrategy,
@@ -156,6 +157,7 @@ export class ConfigModule implements OnApplicationBootstrap, OnApplicationShutdo
             ...refundProcess,
             cacheStrategy,
             ...orderInterceptors,
+            schedulerStrategy,
         ];
     }
 

+ 5 - 0
packages/core/src/config/config.service.ts

@@ -18,6 +18,7 @@ import {
     PaymentOptions,
     PromotionOptions,
     RuntimeVendureConfig,
+    SchedulerOptions,
     ShippingOptions,
     SystemOptions,
     TaxOptions,
@@ -116,6 +117,10 @@ export class ConfigService implements VendureConfig {
         return this.activeConfig.jobQueueOptions;
     }
 
+    get schedulerOptions(): Required<SchedulerOptions> {
+        return this.activeConfig.schedulerOptions;
+    }
+
     get systemOptions(): Required<SystemOptions> {
         return this.activeConfig.systemOptions;
     }

+ 6 - 2
packages/core/src/config/default-config.ts

@@ -1,15 +1,16 @@
 import { LanguageCode } from '@vendure/common/lib/generated-types';
 import {
     DEFAULT_AUTH_TOKEN_HEADER_KEY,
+    DEFAULT_CHANNEL_TOKEN_KEY,
     SUPER_ADMIN_USER_IDENTIFIER,
     SUPER_ADMIN_USER_PASSWORD,
-    DEFAULT_CHANNEL_TOKEN_KEY,
 } from '@vendure/common/lib/shared-constants';
 import { randomBytes } from 'crypto';
 
 import { TypeORMHealthCheckStrategy } from '../health-check/typeorm-health-check-strategy';
 import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy';
 import { InMemoryJobBufferStorageStrategy } from '../job-queue/job-buffer/in-memory-job-buffer-storage-strategy';
+import { NoopSchedulerStrategy } from '../scheduler/noop-scheduler-strategy';
 
 import { DefaultAssetImportStrategy } from './asset-import-strategy/default-asset-import-strategy';
 import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
@@ -24,7 +25,6 @@ import { DefaultProductVariantPriceCalculationStrategy } from './catalog/default
 import { DefaultProductVariantPriceSelectionStrategy } from './catalog/default-product-variant-price-selection-strategy';
 import { DefaultProductVariantPriceUpdateStrategy } from './catalog/default-product-variant-price-update-strategy';
 import { DefaultStockDisplayStrategy } from './catalog/default-stock-display-strategy';
-import { DefaultStockLocationStrategy } from './catalog/default-stock-location-strategy';
 import { MultiChannelStockLocationStrategy } from './catalog/multi-channel-stock-location-strategy';
 import { AutoIncrementIdStrategy } from './entity/auto-increment-id-strategy';
 import { DefaultMoneyStrategy } from './entity/default-money-strategy';
@@ -194,6 +194,10 @@ export const defaultConfig: RuntimeVendureConfig = {
         activeQueues: [],
         prefix: '',
     },
+    schedulerOptions: {
+        schedulerStrategy: new NoopSchedulerStrategy(),
+        tasks: [],
+    },
     customFields: {
         Address: [],
         Administrator: [],

+ 26 - 0
packages/core/src/config/vendure-config.ts

@@ -9,6 +9,8 @@ import { DataSourceOptions } from 'typeorm';
 import { Middleware } from '../common';
 import { PermissionDefinition } from '../common/permission-definition';
 import { JobBufferStorageStrategy } from '../job-queue/job-buffer/job-buffer-storage-strategy';
+import { ScheduledTask } from '../scheduler/scheduled-task';
+import { SchedulerStrategy } from '../scheduler/scheduler-strategy';
 
 import { AssetImportStrategy } from './asset-import-strategy/asset-import-strategy';
 import { AssetNamingStrategy } from './asset-naming-strategy/asset-naming-strategy';
@@ -962,6 +964,22 @@ export interface JobQueueOptions {
     prefix?: string;
 }
 
+export interface SchedulerOptions {
+    /**
+     * @description
+     * The strategy used to execute scheduled tasks.
+     *
+     * @default DefaultSchedulerStrategy
+     */
+    schedulerStrategy?: SchedulerStrategy;
+
+    /**
+     * @description
+     * The tasks to be executed.
+     */
+    tasks?: ScheduledTask[];
+}
+
 /**
  * @description
  * Options relating to the internal handling of entities.
@@ -1211,6 +1229,13 @@ export interface VendureConfig {
      * Configures how the job queue is persisted and processed.
      */
     jobQueueOptions?: JobQueueOptions;
+    /**
+     * @description
+     * Configures the scheduler mechanism and tasks.
+     *
+     * @since 3.3.0
+     */
+    schedulerOptions?: SchedulerOptions;
     /**
      * @description
      * Configures system options
@@ -1236,6 +1261,7 @@ export interface RuntimeVendureConfig extends Required<VendureConfig> {
     entityOptions: Required<Omit<EntityOptions, 'entityIdStrategy'>> & EntityOptions;
     importExportOptions: Required<ImportExportOptions>;
     jobQueueOptions: Required<JobQueueOptions>;
+    schedulerOptions: Required<SchedulerOptions>;
     orderOptions: Required<OrderOptions>;
     promotionOptions: Required<PromotionOptions>;
     shippingOptions: Required<ShippingOptions>;

+ 2 - 0
packages/core/src/plugin/default-scheduler-plugin/constants.ts

@@ -0,0 +1,2 @@
+export const DEFAULT_SCHEDULER_PLUGIN_OPTIONS = Symbol('DEFAULT_SCHEDULER_PLUGIN_OPTIONS');
+export const DEFAULT_TIMEOUT = 60000;

+ 106 - 0
packages/core/src/plugin/default-scheduler-plugin/default-scheduler-strategy.ts

@@ -0,0 +1,106 @@
+import { Cron } from 'croner';
+import ms from 'ms';
+
+import { Injector } from '../../common';
+import { Logger } from '../../config/logger/vendure-logger';
+import { TransactionalConnection } from '../../connection';
+import { ProcessContext } from '../../process-context';
+import { ScheduledTask } from '../../scheduler/scheduled-task';
+import { SchedulerStrategy } from '../../scheduler/scheduler-strategy';
+
+import { DEFAULT_SCHEDULER_PLUGIN_OPTIONS } from './constants';
+import { ScheduledTaskRecord } from './scheduled-task-record.entity';
+import { DefaultSchedulerPluginOptions } from './types';
+
+export class DefaultSchedulerStrategy implements SchedulerStrategy {
+    private connection: TransactionalConnection;
+    private injector: Injector;
+    private processContext: ProcessContext;
+    private tasks: Map<string, { task: ScheduledTask; isRegistered: boolean }> = new Map();
+    private pluginOptions: DefaultSchedulerPluginOptions;
+
+    init(injector: Injector) {
+        this.connection = injector.get(TransactionalConnection);
+        this.processContext = injector.get(ProcessContext);
+        this.pluginOptions = injector.get(DEFAULT_SCHEDULER_PLUGIN_OPTIONS);
+        this.injector = injector;
+    }
+
+    executeTask(task: ScheduledTask) {
+        return async (job: Cron) => {
+            if (this.processContext.isServer) {
+                return;
+            }
+            await this.ensureTaskIsRegistered(task);
+            const taskEntity = await this.connection.rawConnection
+                .getRepository(ScheduledTaskRecord)
+                .createQueryBuilder('task')
+                .update()
+                .set({ lockedAt: new Date() })
+                .where('taskId = :taskId', { taskId: task.id })
+                .andWhere('lockedAt IS NULL')
+                .execute();
+            if (!taskEntity.affected) {
+                return;
+            }
+            try {
+                const timeout = task.options.timeout ?? (this.pluginOptions.defaultTimeout as number);
+                const timeoutMs = typeof timeout === 'number' ? timeout : ms(timeout);
+
+                let timeoutTimer: NodeJS.Timeout | undefined;
+                const timeoutPromise = new Promise((_, reject) => {
+                    timeoutTimer = setTimeout(() => {
+                        Logger.warn(`Scheduled task ${task.id} timed out after ${timeoutMs}ms`);
+                        reject(new Error('Task timed out'));
+                    }, timeoutMs);
+                });
+
+                const result = await Promise.race([task.execute(this.injector), timeoutPromise]);
+
+                if (timeoutTimer) {
+                    clearTimeout(timeoutTimer);
+                }
+
+                await this.connection.rawConnection.getRepository(ScheduledTaskRecord).update(
+                    {
+                        taskId: task.id,
+                    },
+                    {
+                        lastExecutedAt: new Date(),
+                        lockedAt: null,
+                        lastResult: result,
+                    },
+                );
+            } catch (error) {
+                let errorMessage = 'Unknown error';
+                if (error instanceof Error) {
+                    errorMessage = error.message;
+                }
+                Logger.error(`Scheduled task ${task.id} failed with error: ${errorMessage}`);
+                await this.connection.rawConnection.getRepository(ScheduledTaskRecord).update(
+                    {
+                        taskId: task.id,
+                    },
+                    {
+                        lockedAt: null,
+                        lastResult: { error: errorMessage } as any,
+                    },
+                );
+            }
+        };
+    }
+
+    private async ensureTaskIsRegistered(task: ScheduledTask) {
+        if (!this.tasks.get(task.id)?.isRegistered) {
+            await this.connection.rawConnection
+                .getRepository(ScheduledTaskRecord)
+                .createQueryBuilder()
+                .insert()
+                .into(ScheduledTaskRecord)
+                .values({ taskId: task.id })
+                .orIgnore()
+                .execute();
+            this.tasks.set(task.id, { task, isRegistered: true });
+        }
+    }
+}

+ 32 - 0
packages/core/src/plugin/default-scheduler-plugin/default-scheduler.plugin.ts

@@ -0,0 +1,32 @@
+import { PluginCommonModule } from '../../plugin/plugin-common.module';
+import { VendurePlugin } from '../../plugin/vendure-plugin';
+
+import { DEFAULT_SCHEDULER_PLUGIN_OPTIONS, DEFAULT_TIMEOUT } from './constants';
+import { DefaultSchedulerStrategy } from './default-scheduler-strategy';
+import { ScheduledTaskRecord } from './scheduled-task-record.entity';
+import { DefaultSchedulerPluginOptions } from './types';
+
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    entities: [ScheduledTaskRecord],
+    configuration: config => {
+        config.schedulerOptions.schedulerStrategy = new DefaultSchedulerStrategy();
+        return config;
+    },
+    providers: [
+        {
+            provide: DEFAULT_SCHEDULER_PLUGIN_OPTIONS,
+            useValue: DefaultSchedulerPlugin.options,
+        },
+    ],
+})
+export class DefaultSchedulerPlugin {
+    static options: DefaultSchedulerPluginOptions = {
+        defaultTimeout: DEFAULT_TIMEOUT,
+    };
+
+    static init(config: DefaultSchedulerPluginOptions) {
+        this.options = { ...this.options, ...config };
+        return this;
+    }
+}

+ 26 - 0
packages/core/src/plugin/default-scheduler-plugin/scheduled-task-record.entity.ts

@@ -0,0 +1,26 @@
+import { Column, Entity, Unique } from 'typeorm';
+
+import { VendureEntity } from '../../entity/base/base.entity';
+
+@Entity()
+@Unique(['taskId'])
+export class ScheduledTaskRecord extends VendureEntity {
+    constructor(input: Partial<ScheduledTaskRecord>) {
+        super(input);
+    }
+
+    @Column()
+    taskId: string;
+
+    @Column({ default: true })
+    enabled: boolean;
+
+    @Column({ nullable: true, type: 'datetime', precision: 3 })
+    lockedAt: Date | null;
+
+    @Column({ nullable: true, type: 'datetime', precision: 3 })
+    lastExecutedAt: Date | null;
+
+    @Column({ type: 'json', nullable: true })
+    lastResult: Record<string, any> | string | number | null;
+}

+ 9 - 0
packages/core/src/plugin/default-scheduler-plugin/types.ts

@@ -0,0 +1,9 @@
+export interface DefaultSchedulerPluginOptions {
+    /**
+     * @description
+     * The default timeout for scheduled tasks.
+     *
+     * @default 60_000ms
+     */
+    defaultTimeout?: string | number;
+}

+ 1 - 0
packages/core/src/plugin/index.ts

@@ -4,6 +4,7 @@ export * from './default-job-queue-plugin/job-record-buffer.entity';
 export * from './default-job-queue-plugin/sql-job-buffer-storage-strategy';
 export * from './default-cache-plugin/default-cache-plugin';
 export * from './default-cache-plugin/sql-cache-strategy';
+export * from './default-scheduler-plugin/default-scheduler.plugin';
 export * from './redis-cache-plugin/redis-cache-plugin';
 export * from './redis-cache-plugin/redis-cache-strategy';
 export * from './redis-cache-plugin/types';

+ 11 - 0
packages/core/src/scheduler/noop-scheduler-strategy.ts

@@ -0,0 +1,11 @@
+import { Logger } from '../config/logger/vendure-logger';
+
+import { ScheduledTask } from './scheduled-task';
+import { SchedulerStrategy } from './scheduler-strategy';
+
+export class NoopSchedulerStrategy implements SchedulerStrategy {
+    executeTask(task: ScheduledTask) {
+        Logger.warn(`No task scheduler is configured! The task ${task.id} will not be executed.`);
+        return () => Promise.resolve();
+    }
+}

+ 76 - 0
packages/core/src/scheduler/scheduled-task.ts

@@ -0,0 +1,76 @@
+import CronTime from 'cron-time-generator';
+
+import { Injector } from '../common/index';
+/**
+ * @description
+ * The configuration for a scheduled task.
+ *
+ * @since 3.3.0
+ */
+export interface ScheduledTaskConfig {
+    /**
+     * @description
+     * The unique identifier for the scheduled task.
+     */
+    id: string;
+    /**
+     * @description
+     * The cron schedule for the scheduled task. This can be a standard cron expression or
+     * a function that returns a [cron-time-generator](https://www.npmjs.com/package/cron-time-generator)
+     * expression.
+     *
+     * @example
+     * ```ts
+     * // Standard cron expression
+     * { schedule: '0 0-23/5 * * *', } // every 5 hours
+     * { schedule: '0 22 * * *', } // every day at 10:00 PM
+     *
+     * // Cron-time-generator expression
+     * { schedule: cronTime => cronTime.every(2).minutes(), }
+     * { schedule: cronTime => cronTime.every(5).hours(), }
+     * ```
+     */
+    schedule: string | ((cronTime: typeof CronTime) => string);
+    /**
+     * @description
+     * The timeout for the scheduled task. If the task takes longer than the timeout, the task
+     * will be considered to have failed with a timeout error.
+     *
+     * @default 60_000ms
+     */
+    timeout?: number | string;
+    /**
+     * @description
+     * Whether the scheduled task should be prevented from running if it is already running.
+     *
+     * @default true
+     */
+    preventOverlap?: boolean;
+    /**
+     * @description
+     * The function that will be executed when the scheduled task is run.
+     */
+    execute(injector: Injector): Promise<any>;
+}
+
+/**
+ * @description
+ * A scheduled task that will be executed at a given cron schedule.
+ *
+ * @since 3.3.0
+ */
+export class ScheduledTask {
+    constructor(private readonly config: ScheduledTaskConfig) {}
+
+    get id() {
+        return this.config.id;
+    }
+
+    get options() {
+        return this.config;
+    }
+
+    async execute(injector: Injector) {
+        return this.config.execute(injector);
+    }
+}

+ 9 - 0
packages/core/src/scheduler/scheduler-strategy.ts

@@ -0,0 +1,9 @@
+import { Cron } from 'croner';
+
+import { InjectableStrategy } from '../common';
+
+import { ScheduledTask } from './scheduled-task';
+
+export interface SchedulerStrategy extends InjectableStrategy {
+    executeTask(task: ScheduledTask): (job: Cron) => Promise<any> | any;
+}

+ 11 - 0
packages/core/src/scheduler/scheduler.module.ts

@@ -0,0 +1,11 @@
+import { Module } from '@nestjs/common';
+
+import { ConfigModule } from '../config/index';
+
+import { SchedulerService } from './scheduler.service';
+
+@Module({
+    imports: [ConfigModule],
+    providers: [SchedulerService],
+})
+export class SchedulerModule {}

+ 59 - 0
packages/core/src/scheduler/scheduler.service.ts

@@ -0,0 +1,59 @@
+import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
+import CronTime from 'cron-time-generator';
+import { Cron } from 'croner';
+
+import { ConfigService } from '../config/config.service';
+import { Logger } from '../config/logger/vendure-logger';
+
+import { NoopSchedulerStrategy } from './noop-scheduler-strategy';
+import { ScheduledTask } from './scheduled-task';
+
+@Injectable()
+export class SchedulerService implements OnApplicationBootstrap {
+    private jobs: Cron[] = [];
+    constructor(private configService: ConfigService) {}
+
+    onApplicationBootstrap() {
+        const schedulerStrategy = this.configService.schedulerOptions.schedulerStrategy;
+        if (!schedulerStrategy || schedulerStrategy instanceof NoopSchedulerStrategy) {
+            Logger.warn('No scheduler strategy is configured! Scheduled tasks will not be executed.');
+            Logger.warn(
+                'Please use the `DefaultSchedulerPlugin` (or alternative) to enable scheduled tasks.',
+            );
+            return;
+        }
+        const scheduledTasks = this.configService.schedulerOptions.tasks ?? [];
+
+        for (const task of scheduledTasks) {
+            const job = this.createCronJob(task);
+            this.jobs.push(job);
+        }
+    }
+
+    private createCronJob(task: ScheduledTask) {
+        const schedulerStrategy = this.configService.schedulerOptions.schedulerStrategy;
+        const protectCallback = (_job: Cron) => {
+            const currentRun = _job.currentRun();
+            if (currentRun) {
+                Logger.warn(
+                    `Task invocation of ${task.id} at ${new Date().toISOString()} was blocked because an existing task is still running at ${currentRun.toISOString()}`,
+                );
+            }
+        };
+
+        const schedule =
+            typeof task.options.schedule === 'function'
+                ? task.options.schedule(CronTime)
+                : task.options.schedule;
+
+        const job = new Cron(
+            schedule,
+            {
+                name: task.id,
+                protect: task.options.preventOverlap ? protectCallback : undefined,
+            },
+            () => schedulerStrategy.executeTask(task)(job),
+        );
+        return job;
+    }
+}

+ 2 - 1
packages/core/src/service/service.module.ts

@@ -5,6 +5,7 @@ import { ConfigModule } from '../config/config.module';
 import { ConnectionModule } from '../connection/connection.module';
 import { EventBusModule } from '../event-bus/event-bus.module';
 import { JobQueueModule } from '../job-queue/job-queue.module';
+import { SchedulerModule } from '../scheduler/scheduler.module';
 
 import { ActiveOrderService } from './helpers/active-order/active-order.service';
 import { ConfigArgService } from './helpers/config-arg/config-arg.service';
@@ -140,7 +141,7 @@ const helpers = [
  * only run a single time.
  */
 @Module({
-    imports: [ConnectionModule, ConfigModule, EventBusModule, CacheModule, JobQueueModule],
+    imports: [ConnectionModule, ConfigModule, EventBusModule, CacheModule, JobQueueModule, SchedulerModule],
     providers: [...services, ...helpers, InitializerService],
     exports: [...services, ...helpers],
 })