Quellcode durchsuchen

feat(core): Create session cleanup task, add docs to scheduler types

Relates to #1425
Michael Bromley vor 9 Monaten
Ursprung
Commit
f649bd713b

+ 10 - 0
package-lock.json

@@ -22256,6 +22256,15 @@
         "node": ">=18.0"
       }
     },
+    "node_modules/cronstrue": {
+      "version": "2.57.0",
+      "resolved": "https://registry.npmjs.org/cronstrue/-/cronstrue-2.57.0.tgz",
+      "integrity": "sha512-gQOfxJa1RA9uDT4hx37NshhX4dW9t9zTCtIYu15LUziH+mkpuLXYcSEyM8ZewMJ2p8UVuHGjI3n4hGpu3HtCbg==",
+      "license": "MIT",
+      "bin": {
+        "cronstrue": "bin/cli.js"
+      }
+    },
     "node_modules/cross-env": {
       "version": "7.0.3",
       "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz",
@@ -45786,6 +45795,7 @@
         "cookie-session": "^2.1.0",
         "cron-time-generator": "^2.0.3",
         "croner": "^9.0.0",
+        "cronstrue": "^2.57.0",
         "csv-parse": "^5.6.0",
         "express": "^5.0.1",
         "fs-extra": "^11.2.0",

+ 1 - 0
packages/core/src/api/schema/admin-api/scheduled-task.api.graphql

@@ -4,6 +4,7 @@ type Query {
 
 type ScheduledTask {
     id: String!
+    description: String!
     schedule: String!
     scheduleDescription: String!
     lastExecutedAt: DateTime

+ 10 - 3
packages/core/src/config/vendure-config.ts

@@ -964,12 +964,19 @@ export interface JobQueueOptions {
     prefix?: string;
 }
 
+/**
+ * @description
+ * Options related to scheduled tasks..
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
 export interface SchedulerOptions {
     /**
      * @description
-     * The strategy used to execute scheduled tasks.
-     *
-     * @default DefaultSchedulerStrategy
+     * The strategy used to execute scheduled tasks. If you are using the
+     * {@link DefaultSchedulerPlugin} (which is recommended) then this will be set to the
+     * {@link DefaultSchedulerStrategy}.
      */
     schedulerStrategy?: SchedulerStrategy;
 

+ 1 - 0
packages/core/src/index.ts

@@ -16,6 +16,7 @@ export * from './data-import/index';
 export * from './service/index';
 export * from './i18n/index';
 export * from './worker/index';
+export * from './scheduler/index';
 export * from '@vendure/common/lib/shared-types';
 export {
     Permission,

+ 13 - 1
packages/core/src/plugin/default-scheduler-plugin/default-scheduler-strategy.ts

@@ -12,6 +12,15 @@ import { DEFAULT_SCHEDULER_PLUGIN_OPTIONS } from './constants';
 import { ScheduledTaskRecord } from './scheduled-task-record.entity';
 import { DefaultSchedulerPluginOptions } from './types';
 
+/**
+ * @description
+ * The default {@link SchedulerStrategy} implementation that uses the database to
+ * execute scheduled tasks. This strategy is configured when you use the
+ * {@link DefaultSchedulerPlugin}.
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
 export class DefaultSchedulerStrategy implements SchedulerStrategy {
     private connection: TransactionalConnection;
     private injector: Injector;
@@ -43,6 +52,8 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
             if (!taskEntity.affected) {
                 return;
             }
+
+            Logger.verbose(`Executing scheduled task "${task.id}"`);
             try {
                 const timeout = task.options.timeout ?? (this.pluginOptions.defaultTimeout as number);
                 const timeoutMs = typeof timeout === 'number' ? timeout : ms(timeout);
@@ -71,12 +82,13 @@ export class DefaultSchedulerStrategy implements SchedulerStrategy {
                         lastResult: result ?? '',
                     },
                 );
+                Logger.verbose(`Scheduled task "${task.id}" completed successfully`);
             } catch (error) {
                 let errorMessage = 'Unknown error';
                 if (error instanceof Error) {
                     errorMessage = error.message;
                 }
-                Logger.error(`Scheduled task ${task.id} failed with error: ${errorMessage}`);
+                Logger.error(`Scheduled task "${task.id}" failed with error: ${errorMessage}`);
                 await this.connection.rawConnection.getRepository(ScheduledTaskRecord).update(
                     {
                         taskId: task.id,

+ 24 - 0
packages/core/src/plugin/default-scheduler-plugin/default-scheduler.plugin.ts

@@ -6,6 +6,30 @@ import { DefaultSchedulerStrategy } from './default-scheduler-strategy';
 import { ScheduledTaskRecord } from './scheduled-task-record.entity';
 import { DefaultSchedulerPluginOptions } from './types';
 
+/**
+ * @description
+ * This plugin configures a default scheduling strategy that executes scheduled
+ * tasks using the database to ensure that each task is executed exactly once
+ * at the scheduled time, even if there are multiple instances of the worker
+ * running.
+ *
+ * @example
+ * ```ts
+ * import { DefaultSchedulerPlugin, VendureConfig } from '@vendure/core';
+ *
+ * export const config: VendureConfig = {
+ *   plugins: [
+ *     DefaultSchedulerPlugin.init({
+ *       // The default is 60s, but you can override it here
+ *       defaultTimeout: '10s',
+ *     }),
+ *   ],
+ * };
+ * ```
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
 @VendurePlugin({
     imports: [PluginCommonModule],
     entities: [ScheduledTaskRecord],

+ 7 - 0
packages/core/src/plugin/default-scheduler-plugin/types.ts

@@ -1,3 +1,10 @@
+/**
+ * @description
+ * The options for the {@link DefaultSchedulerPlugin}.
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
 export interface DefaultSchedulerPluginOptions {
     /**
      * @description

+ 4 - 0
packages/core/src/scheduler/index.ts

@@ -0,0 +1,4 @@
+export * from './tasks/clean-sessions-task';
+export * from './scheduled-task';
+export * from './scheduler.service';
+export * from './scheduler-strategy';

+ 50 - 5
packages/core/src/scheduler/scheduled-task.ts

@@ -7,12 +7,22 @@ import { Injector } from '../common/index';
  *
  * @since 3.3.0
  */
-export interface ScheduledTaskConfig {
+export interface ScheduledTaskConfig<C extends Record<string, any> = Record<string, any>> {
     /**
      * @description
      * The unique identifier for the scheduled task.
      */
     id: string;
+    /**
+     * @description
+     * The description for the scheduled task.
+     */
+    description?: string;
+    /**
+     * @description
+     * Optional parameters that will be passed to the `execute` function.
+     */
+    params?: C;
     /**
      * @description
      * The cron schedule for the scheduled task. This can be a standard cron expression or
@@ -50,7 +60,7 @@ export interface ScheduledTaskConfig {
      * @description
      * The function that will be executed when the scheduled task is run.
      */
-    execute(injector: Injector): Promise<any>;
+    execute(injector: Injector, config: C): Promise<any>;
 }
 
 /**
@@ -58,9 +68,10 @@ export interface ScheduledTaskConfig {
  * A scheduled task that will be executed at a given cron schedule.
  *
  * @since 3.3.0
+ * @docsCategory scheduled-tasks
  */
-export class ScheduledTask {
-    constructor(private readonly config: ScheduledTaskConfig) {}
+export class ScheduledTask<C extends Record<string, any> = Record<string, any>> {
+    constructor(private readonly config: ScheduledTaskConfig<C>) {}
 
     get id() {
         return this.config.id;
@@ -71,6 +82,40 @@ export class ScheduledTask {
     }
 
     async execute(injector: Injector) {
-        return this.config.execute(injector);
+        return this.config.execute(injector, this.config.params ?? ({} as any));
+    }
+
+    /**
+     * @description
+     * This method allows you to further configure existing scheduled tasks. For example, you may
+     * wish to change the schedule or timeout of a task, without having to define a new task.
+     *
+     * @example
+     * ```ts
+     * import { ScheduledTask } from '\@vendure/core';
+     *
+     * const task = new ScheduledTask({
+     *     id: 'test-job',
+     *     schedule: cron => cron.every(2).minutes(),
+     *     execute: async (injector, params) => {
+     *         // some logic here
+     *     },
+     * });
+     *
+     * // later, you can configure the task
+     * task.configure({ schedule: cron => cron.every(5).minutes() });
+     * ```
+     */
+    configure(additionalConfig: Partial<Pick<ScheduledTaskConfig<C>, 'schedule' | 'timeout' | 'params'>>) {
+        if (additionalConfig.schedule) {
+            this.config.schedule = additionalConfig.schedule;
+        }
+        if (additionalConfig.timeout) {
+            this.config.timeout = additionalConfig.timeout;
+        }
+        if (additionalConfig.params) {
+            this.config.params = additionalConfig.params;
+        }
+        return this;
     }
 }

+ 16 - 0
packages/core/src/scheduler/scheduler-strategy.ts

@@ -12,6 +12,22 @@ export interface TaskReport {
     enabled: boolean;
 }
 
+/**
+ * @description
+ * This strategy is used to define the mechanism by which scheduled tasks are executed
+ * and how they are reported on. The main purpose of this strategy is to ensure
+ * that a given task is executed exactly once at the scheduled time, even if there
+ * are multiple instances of the worker running.
+ *
+ * To do this, the strategy must use some form of shared storage and a method of
+ * locking so that only a single worker is allowed to execute the task.
+ *
+ * By default, the {@link DefaultSchedulerStrategy} will use the database to enable
+ * this functionality.
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
 export interface SchedulerStrategy extends InjectableStrategy {
     executeTask(task: ScheduledTask): (job: Cron) => Promise<any> | any;
     getTasks(): Promise<TaskReport[]>;

+ 23 - 9
packages/core/src/scheduler/scheduler.service.ts

@@ -11,6 +11,7 @@ import { ScheduledTask } from './scheduled-task';
 
 export interface TaskInfo {
     id: string;
+    description: string;
     schedule: string;
     scheduleDescription: string;
     lastExecutedAt: Date | null;
@@ -19,6 +20,13 @@ export interface TaskInfo {
     lastResult: any;
 }
 
+/**
+ * @description
+ * The service that is responsible for setting up and querying the scheduled tasks.
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
 @Injectable()
 export class SchedulerService implements OnApplicationBootstrap {
     private jobs: Map<string, { task: ScheduledTask; job: Cron }> = new Map();
@@ -41,23 +49,29 @@ export class SchedulerService implements OnApplicationBootstrap {
         }
     }
 
+    /**
+     * @description
+     * Returns a list of all the scheduled tasks and their current status.
+     */
     getTaskList(): Promise<TaskInfo[]> {
-        return this.configService.schedulerOptions.schedulerStrategy.getTasks().then(tasks =>
-            tasks
-                .map(task => {
-                    const job = this.jobs.get(task.id)?.job;
-                    if (!job) {
+        return this.configService.schedulerOptions.schedulerStrategy.getTasks().then(taskReports =>
+            taskReports
+                .map(taskReport => {
+                    const job = this.jobs.get(taskReport.id)?.job;
+                    const task = this.jobs.get(taskReport.id)?.task;
+                    if (!job || !task) {
                         return;
                     }
                     const pattern = job.getPattern();
                     return {
-                        id: task.id,
+                        id: taskReport.id,
+                        description: task.options.description ?? '',
                         schedule: pattern ?? 'unknown',
                         scheduleDescription: pattern ? cronstrue.toString(pattern) : 'unknown',
-                        lastExecutedAt: task.lastExecutedAt,
+                        lastExecutedAt: taskReport.lastExecutedAt,
                         nextExecutionAt: job.nextRun(),
-                        isRunning: task.isRunning,
-                        lastResult: task.lastResult,
+                        isRunning: taskReport.isRunning,
+                        lastResult: taskReport.lastResult,
                     };
                 })
                 .filter(x => x !== undefined),

+ 23 - 0
packages/core/src/scheduler/tasks/clean-sessions-task.ts

@@ -0,0 +1,23 @@
+import { SessionService } from '../../service/services/session.service';
+import { ScheduledTask } from '../scheduled-task';
+
+/**
+ * @description
+ * A scheduled task that cleans expired & inactive sessions from the database.
+ *
+ * @since 3.3.0
+ * @docsCategory scheduled-tasks
+ */
+export const cleanSessionsTask = new ScheduledTask({
+    id: 'clean-sessions',
+    description: 'Clean expired & inactive sessions from the database',
+    params: {
+        batchSize: 10_000,
+    },
+    schedule: cron => cron.everyDayAt(0, 0),
+    async execute(injector, params) {
+        const sessionService = injector.get(SessionService);
+        await sessionService.triggerCleanSessionsJob(params.batchSize);
+        return { result: 'Triggered clean sessions job' };
+    },
+});

+ 62 - 4
packages/core/src/service/services/session.service.ts

@@ -1,10 +1,11 @@
-import { Injectable } from '@nestjs/common';
+import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
 import { ID } from '@vendure/common/lib/shared-types';
 import crypto from 'crypto';
 import ms from 'ms';
-import { EntitySubscriberInterface, InsertEvent, RemoveEvent, UpdateEvent } from 'typeorm';
+import { Brackets, EntitySubscriberInterface, InsertEvent, RemoveEvent, UpdateEvent } from 'typeorm';
 
 import { RequestContext } from '../../api/common/request-context';
+import { Logger } from '../../config';
 import { ConfigService } from '../../config/config.service';
 import { CachedSession, SessionCacheStrategy } from '../../config/session-cache/session-cache-strategy';
 import { TransactionalConnection } from '../../connection/transactional-connection';
@@ -15,10 +16,12 @@ import { AnonymousSession } from '../../entity/session/anonymous-session.entity'
 import { AuthenticatedSession } from '../../entity/session/authenticated-session.entity';
 import { Session } from '../../entity/session/session.entity';
 import { User } from '../../entity/user/user.entity';
+import { JobQueue } from '../../job-queue/job-queue';
+import { JobQueueService } from '../../job-queue/job-queue.service';
+import { RequestContextService } from '../helpers/request-context/request-context.service';
 import { getUserChannelsPermissions } from '../helpers/utils/get-user-channels-permissions';
 
 import { OrderService } from './order.service';
-
 /**
  * @description
  * Contains methods relating to {@link Session} entities.
@@ -26,8 +29,9 @@ import { OrderService } from './order.service';
  * @docsCategory services
  */
 @Injectable()
-export class SessionService implements EntitySubscriberInterface {
+export class SessionService implements EntitySubscriberInterface, OnApplicationBootstrap {
     private sessionCacheStrategy: SessionCacheStrategy;
+    private cleanSessionsJobQueue: JobQueue<{ batchSize: number }>;
     private readonly sessionDurationInMs: number;
     private readonly sessionCacheTimeoutMs = 50;
 
@@ -35,6 +39,8 @@ export class SessionService implements EntitySubscriberInterface {
         private connection: TransactionalConnection,
         private configService: ConfigService,
         private orderService: OrderService,
+        private jobQueueService: JobQueueService,
+        private requestContextService: RequestContextService,
     ) {
         this.sessionCacheStrategy = this.configService.authOptions.sessionCacheStrategy;
 
@@ -47,6 +53,22 @@ export class SessionService implements EntitySubscriberInterface {
         this.connection.rawConnection.subscribers.push(this);
     }
 
+    async onApplicationBootstrap() {
+        this.cleanSessionsJobQueue = await this.jobQueueService.createQueue({
+            name: 'clean-sessions',
+            process: async job => {
+                const ctx = await this.requestContextService.create({
+                    apiType: 'admin',
+                });
+                const result = await this.cleanExpiredSessions(ctx, job.data.batchSize);
+                return {
+                    batchSize: job.data.batchSize,
+                    sessionsRemoved: result,
+                };
+            },
+        });
+    }
+
     /** @internal */
     async afterInsert(event: InsertEvent<any>): Promise<any> {
         await this.clearSessionCacheOnDataChange(event);
@@ -298,6 +320,42 @@ export class SessionService implements EntitySubscriberInterface {
         }
     }
 
+    /**
+     * @description
+     * Triggers the clean sessions job.
+     */
+    async triggerCleanSessionsJob(batchSize: number) {
+        await this.cleanSessionsJobQueue.add({ batchSize });
+    }
+
+    /**
+     * @description
+     * Cleans expired sessions from the database & the session cache.
+     */
+    async cleanExpiredSessions(ctx: RequestContext, batchSize: number) {
+        const sessions = await this.connection
+            .getRepository(ctx, Session)
+            .createQueryBuilder('session')
+            .where('session.expires < :now', { now: new Date() })
+            .orWhere(
+                new Brackets(qb1 => {
+                    qb1.where('session.userId IS NULL')
+                        .andWhere('session.activeOrderId IS NULL')
+                        .andWhere('session.updatedAt < :updatedAt', {
+                            updatedAt: new Date(Date.now() - ms('7d')),
+                        });
+                }),
+            )
+            .take(batchSize)
+            .getMany();
+        Logger.verbose(`Cleaning ${sessions.length} expired sessions`);
+        await this.connection.getRepository(ctx, Session).remove(sessions);
+        for (const session of sessions) {
+            await this.withTimeout(this.sessionCacheStrategy.delete(session.token));
+        }
+        Logger.verbose(`Cleaned ${sessions.length} expired sessions`);
+        return sessions.length;
+    }
     /**
      * If we are over half way to the current session's expiry date, then we update it.
      *

Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
schema-admin.json


Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
schema-shop.json


Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.