Przeglądaj źródła

feat(core): Add scheduled task to clean up jobs from the DB

Michael Bromley 8 miesięcy temu
rodzic
commit
ed28280719

+ 52 - 0
packages/core/src/plugin/default-job-queue-plugin/clean-jobs-task.ts

@@ -0,0 +1,52 @@
+import { Logger } from '../../config';
+import { TransactionalConnection } from '../../connection';
+import { ScheduledTask } from '../../scheduler/scheduled-task';
+
+import { DEFAULT_JOB_QUEUE_PLUGIN_OPTIONS, DEFAULT_KEEP_JOBS_COUNT } from './constants';
+import { JobRecord } from './job-record.entity';
+import { DefaultJobQueueOptions } from './types';
+
+/**
+ * @description
+ * A {@link ScheduledTask} that cleans up old jobs from the database when using the DefaultJobQueuePlugin.
+ * You can configure the settings & schedule of the task via the {@link DefaultJobQueuePlugin} options.
+ *
+ * @since 3.3.0
+ * @docsCategory JobQueue
+ * @docsPage DefaultJobQueuePlugin
+ */
+export const cleanJobsTask = new ScheduledTask({
+    id: 'clean-jobs',
+    description: 'Clean completed, failed, and cancelled jobs from the database',
+    schedule: cron => cron.every(2).hours(),
+    async execute({ injector }) {
+        const options = injector.get<DefaultJobQueueOptions>(DEFAULT_JOB_QUEUE_PLUGIN_OPTIONS);
+        // `??` (not `||`) so that an explicit `keepJobsCount: 0` is honoured rather than replaced by the default.
+        const keepJobsCount = options.keepJobsCount ?? DEFAULT_KEEP_JOBS_COUNT;
+        const repository = injector.get(TransactionalConnection).rawConnection.getRepository(JobRecord);
+        // Oldest first. The sort column is alias-qualified so TypeORM resolves it to the mapped column.
+        const qb = repository
+            .createQueryBuilder('job')
+            .where(`job.state IN (:...states)`, {
+                states: ['COMPLETED', 'FAILED', 'CANCELLED'],
+            })
+            .orderBy('job.createdAt', 'ASC');
+
+        const count = await qb.getCount();
+
+        const BATCH_SIZE = 100;
+        const numberOfJobsToDelete = Math.max(count - keepJobsCount, 0);
+        if (0 < numberOfJobsToDelete) {
+            const jobsToDelete = await qb.select('id').limit(numberOfJobsToDelete).getRawMany();
+            Logger.verbose(`Cleaning up ${jobsToDelete.length} jobs...`);
+            // Delete in batches so that no single delete call carries more than BATCH_SIZE ids.
+            for (let i = 0; i < jobsToDelete.length; i += BATCH_SIZE) {
+                const batch = jobsToDelete.slice(i, i + BATCH_SIZE);
+                await repository.delete(batch.map(job => job.id) as string[]);
+            }
+        }
+        return {
+            jobRecordsDeleted: numberOfJobsToDelete,
+        };
+    },
+});

+ 2 - 0
packages/core/src/plugin/default-job-queue-plugin/constants.ts

@@ -0,0 +1,2 @@
+export const DEFAULT_JOB_QUEUE_PLUGIN_OPTIONS = Symbol('DEFAULT_JOB_QUEUE_PLUGIN_OPTIONS'); // Injection token for the plugin options
+export const DEFAULT_KEEP_JOBS_COUNT = 1000; // Fallback used by the clean-jobs task when no `keepJobsCount` is configured

+ 37 - 82
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -1,94 +1,15 @@
 import { Type } from '@vendure/common/lib/shared-types';
 
-import { Job } from '../../job-queue/job';
-import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy';
 import { PluginCommonModule } from '../plugin-common.module';
 import { VendurePlugin } from '../vendure-plugin';
 
+import { cleanJobsTask } from './clean-jobs-task';
+import { DEFAULT_JOB_QUEUE_PLUGIN_OPTIONS } from './constants';
 import { JobRecordBuffer } from './job-record-buffer.entity';
 import { JobRecord } from './job-record.entity';
 import { SqlJobBufferStorageStrategy } from './sql-job-buffer-storage-strategy';
 import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
-
-/**
- * @description
- * Configuration options for the DefaultJobQueuePlugin. These values get passed into the
- * {@link SqlJobQueueStrategy}.
- *
- * @docsCategory JobQueue
- * @docsPage DefaultJobQueuePlugin
- */
-export interface DefaultJobQueueOptions {
-    /**
-     * @description
-     * The interval in ms between polling the database for new jobs. If many job queues
-     * are active, the polling may cause undue load on the database, in which case this value
-     * should be increased to e.g. 1000.
-     *
-     * @default 200
-     */
-    pollInterval?: number | ((queueName: string) => number);
-    /**
-     * @description
-     * How many jobs from a given queue to process concurrently.
-     *
-     * @default 1
-     */
-    concurrency?: number;
-    /**
-     * @description
-     * The strategy used to decide how long to wait before retrying a failed job.
-     *
-     * @default () => 1000
-     */
-    backoffStrategy?: BackoffStrategy;
-    /**
-     * @description
-     * When a job is added to the JobQueue using `JobQueue.add()`, the calling
-     * code may specify the number of retries in case of failure. This option allows
-     * you to override that number and specify your own number of retries based on
-     * the job being added.
-     *
-     * @example
-     * ```ts
-     * setRetries: (queueName, job) => {
-     *   if (queueName === 'send-email') {
-     *     // Override the default number of retries
-     *     // for the 'send-email' job because we have
-     *     // a very unreliable email service.
-     *     return 10;
-     *   }
-     *   return job.retries;
-     * }
-     *  ```
-     * @param queueName
-     * @param job
-     */
-    setRetries?: (queueName: string, job: Job) => number;
-    /**
-     * @description
-     * If set to `true`, the database will be used to store buffered jobs. This is
-     * recommended for production.
-     *
-     * When enabled, a new `JobRecordBuffer` database entity will be defined which will
-     * require a migration when first enabling this option.
-     *
-     * @since 1.3.0
-     */
-    useDatabaseForBuffer?: boolean;
-    /**
-     * @description
-     * The timeout in ms which the queue will use when attempting a graceful shutdown.
-     * That means when the server is shut down but a job is running, the job queue will
-     * wait for the job to complete before allowing the server to shut down. If the job
-     * does not complete within this timeout window, the job will be forced to stop
-     * and the server will shut down anyway.
-     *
-     * @since 2.2.0
-     * @default 20_000
-     */
-    gracefulShutdownTimeout?: number;
-}
+import { DefaultJobQueueOptions } from './types';
 
 /**
  * @description
@@ -177,7 +98,30 @@ export interface DefaultJobQueueOptions {
  * };
  * ```
  *
+ * ### Removing old jobs
+ * Since v3.3, the job queue will automatically remove old jobs from the database. This is done by a scheduled task
+ * which runs every 2 hours by default. The number of jobs to keep in the database can be configured using the
+ * `keepJobsCount` option. The default is 1000.
+ *
+ * @example
+ * ```ts
+ * export const config: VendureConfig = {
+ *   plugins: [
+ *     DefaultJobQueuePlugin.init({
+ *       // The number of completed/failed/cancelled
+ *       // jobs to keep in the database. The default is 1000.
+ *       keepJobsCount: 100,
+ *       // The interval at which to run the clean-up task.
+ *       // This can be a standard cron expression or a function
+ *       // that returns a cron expression. The default is every 2 hours.
+ *       cleanJobsSchedule: cron => cron.every(5).hours(),
+ *     }),
+ *   ],
+ * };
+ * ```
+ *
  * @docsCategory JobQueue
+ * @docsPage DefaultJobQueuePlugin
  * @docsWeight 0
  */
 @VendurePlugin({
@@ -199,8 +143,19 @@ export interface DefaultJobQueueOptions {
         if (DefaultJobQueuePlugin.options.useDatabaseForBuffer === true) {
             config.jobQueueOptions.jobBufferStorageStrategy = new SqlJobBufferStorageStrategy();
         }
+        config.schedulerOptions.tasks.push(
+            cleanJobsTask.configure({
+                schedule: DefaultJobQueuePlugin.options.cleanJobsSchedule,
+            }),
+        );
         return config;
     },
+    providers: [
+        {
+            provide: DEFAULT_JOB_QUEUE_PLUGIN_OPTIONS,
+            useFactory: () => DefaultJobQueuePlugin.options,
+        },
+    ],
     compatibility: '>0.0.0',
 })
 export class DefaultJobQueuePlugin {

+ 102 - 0
packages/core/src/plugin/default-job-queue-plugin/types.ts

@@ -0,0 +1,102 @@
+import { BackoffStrategy, Job } from '../../job-queue';
+import { ScheduledTaskConfig } from '../../scheduler';
+
+/**
+ * @description
+ * Configuration options for the DefaultJobQueuePlugin. These values get passed into the
+ * {@link SqlJobQueueStrategy}.
+ *
+ * @docsCategory JobQueue
+ * @docsPage DefaultJobQueuePlugin
+ */
+export interface DefaultJobQueueOptions {
+    /**
+     * @description
+     * The interval in ms between polling the database for new jobs. If many job queues
+     * are active, the polling may cause undue load on the database, in which case this value
+     * should be increased to e.g. 1000.
+     *
+     * @default 200
+     */
+    pollInterval?: number | ((queueName: string) => number);
+    /**
+     * @description
+     * How many jobs from a given queue to process concurrently.
+     *
+     * @default 1
+     */
+    concurrency?: number;
+    /**
+     * @description
+     * The strategy used to decide how long to wait before retrying a failed job.
+     *
+     * @default () => 1000
+     */
+    backoffStrategy?: BackoffStrategy;
+    /**
+     * @description
+     * When a job is added to the JobQueue using `JobQueue.add()`, the calling
+     * code may specify the number of retries in case of failure. This option allows
+     * you to override that number and specify your own number of retries based on
+     * the job being added.
+     *
+     * @example
+     * ```ts
+     * setRetries: (queueName, job) => {
+     *   if (queueName === 'send-email') {
+     *     // Override the default number of retries
+     *     // for the 'send-email' job because we have
+     *     // a very unreliable email service.
+     *     return 10;
+     *   }
+     *   return job.retries;
+     * }
+     *  ```
+     * @param queueName
+     * @param job
+     */
+    setRetries?: (queueName: string, job: Job) => number;
+    /**
+     * @description
+     * If set to `true`, the database will be used to store buffered jobs. This is
+     * recommended for production.
+     *
+     * When enabled, a new `JobRecordBuffer` database entity will be defined which will
+     * require a migration when first enabling this option.
+     *
+     * @since 1.3.0
+     */
+    useDatabaseForBuffer?: boolean;
+    /**
+     * @description
+     * The timeout in ms which the queue will use when attempting a graceful shutdown.
+     * That means when the server is shut down but a job is running, the job queue will
+     * wait for the job to complete before allowing the server to shut down. If the job
+     * does not complete within this timeout window, the job will be forced to stop
+     * and the server will shut down anyway.
+     *
+     * @since 2.2.0
+     * @default 20_000
+     */
+    gracefulShutdownTimeout?: number;
+    /**
+     * @description
+     * The number of completed/failed/cancelled jobs to keep in the database. This is useful for
+     * debugging and auditing purposes, but if you have a lot of jobs, it may be
+     * desirable to limit the number of records in the database.
+     *
+     * @since 3.3.0
+     * @default 1000
+     */
+    keepJobsCount?: number;
+    /**
+     * @description
+     * The schedule for the "clean-jobs" task. This task will run periodically to clean up
+     * old jobs from the database. The schedule can be a cron expression or a function
+     * that returns a cron expression.
+     *
+     * @since 3.3.0
+     * @default cron => cron.every(2).hours()
+     */
+    cleanJobsSchedule?: ScheduledTaskConfig['schedule'];
+}

+ 1 - 0
packages/core/src/plugin/index.ts

@@ -12,3 +12,4 @@ export * from './vendure-plugin';
 export * from './plugin-common.module';
 export * from './plugin-utils';
 export * from './plugin-metadata';
+export * from './default-job-queue-plugin/types';

+ 1 - 1
packages/core/src/scheduler/tasks/clean-sessions-task.ts

@@ -3,7 +3,7 @@ import { ScheduledTask } from '../scheduled-task';
 
 /**
  * @description
- * A scheduled task that cleans expired & inactive sessions from the database.
+ * A {@link ScheduledTask} that cleans expired & inactive sessions from the database.
  *
  * @example
  * ```ts