Răsfoiți Sursa

feat(core): Implement removeSettledJobs mutation

Michael Bromley 5 ani în urmă
părinte
comite
82af7f69dd

+ 8 - 0
packages/common/src/generated-types.ts

@@ -1798,6 +1798,8 @@ export type Mutation = {
   deleteFacetValues: Array<DeletionResponse>;
   updateGlobalSettings: GlobalSettings;
   importProducts?: Maybe<ImportInfo>;
+  /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
+  removeSettledJobs: Scalars['Int'];
   settlePayment: Payment;
   fulfillOrder: Fulfillment;
   cancelOrder: Order;
@@ -2060,6 +2062,12 @@ export type MutationImportProductsArgs = {
 };
 
 
+export type MutationRemoveSettledJobsArgs = {
+  queueNames?: Maybe<Array<Scalars['String']>>;
+  olderThan?: Maybe<Scalars['DateTime']>;
+};
+
+
 export type MutationSettlePaymentArgs = {
   id: Scalars['ID'];
 };

+ 8 - 3
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -1801,6 +1801,8 @@ export type Mutation = {
     deleteFacetValues: Array<DeletionResponse>;
     updateGlobalSettings: GlobalSettings;
     importProducts?: Maybe<ImportInfo>;
+    /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
+    removeSettledJobs: Scalars['Int'];
     settlePayment: Payment;
     fulfillOrder: Fulfillment;
     cancelOrder: Order;
@@ -2027,6 +2029,11 @@ export type MutationImportProductsArgs = {
     csvFile: Scalars['Upload'];
 };
 
+export type MutationRemoveSettledJobsArgs = {
+    queueNames?: Maybe<Array<Scalars['String']>>;
+    olderThan?: Maybe<Scalars['DateTime']>;
+};
+
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
 };
@@ -3676,9 +3683,7 @@ export type GetFacetValuesQuery = { __typename?: 'Query' } & {
     };
 };
 
-export type GetCollectionsQueryVariables = {
-    options?: Maybe<CollectionListOptions>;
-};
+export type GetCollectionsQueryVariables = {};
 
 export type GetCollectionsQuery = { __typename?: 'Query' } & {
     collections: { __typename?: 'CollectionList' } & {

+ 12 - 5
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -1,5 +1,6 @@
-import { Args, Query, Resolver } from '@nestjs/graphql';
+import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
 import {
+    MutationRemoveSettledJobsArgs,
     Permission,
     QueryJobArgs,
     QueryJobsArgs,
@@ -14,26 +15,32 @@ export class JobResolver {
     constructor(private jobService: JobQueueService) {}
 
     @Query()
-    @Allow(Permission.Authenticated)
+    @Allow(Permission.ReadSettings)
     job(@Args() args: QueryJobArgs) {
         return this.jobService.getJob(args.jobId);
     }
 
     @Query()
-    @Allow(Permission.Authenticated)
+    @Allow(Permission.ReadSettings)
     jobs(@Args() args: QueryJobsArgs) {
         return this.jobService.getJobs(args.options || undefined);
     }
 
     @Query()
-    @Allow(Permission.Authenticated)
+    @Allow(Permission.ReadSettings)
     jobsById(@Args() args: QueryJobsByIdArgs) {
         return this.jobService.getJobsById(args.jobIds || undefined);
     }
 
     @Query()
-    @Allow(Permission.Authenticated)
+    @Allow(Permission.ReadSettings)
     jobQueues() {
         return this.jobService.getJobQueues();
     }
+
+    @Mutation()
+    @Allow(Permission.DeleteSettings)
+    removeSettledJobs(@Args() args: MutationRemoveSettledJobsArgs) {
+        return this.jobService.removeSettledJobs(args.queueNames || [], args.olderThan);
+    }
 }

+ 5 - 0
packages/core/src/api/schema/admin-api/job.api.graphql

@@ -5,6 +5,11 @@ type Query {
     jobQueues: [JobQueue!]!
 }
 
+type Mutation {
+    "Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted."
+    removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
+}
+
 enum JobState {
     PENDING
     RUNNING

+ 10 - 0
packages/core/src/config/job-queue/job-queue-strategy.ts

@@ -73,4 +73,14 @@ export interface JobQueueStrategy {
      * Returns an array of jobs for the given ids.
      */
     findManyById(ids: ID[]): Promise<Job[]>;
+
+    /**
+     * @description
+     * Remove all settled jobs in the specified queues older than the given date.
+     * If no queueName is passed, all queues will be considered. If no olderThan
+     * date is passed, all jobs older than the current time will be removed.
+     *
+     * Returns a promise of the number of jobs removed.
+     */
+    removeSettledJobs(queueNames?: string[], olderThan?: Date): Promise<number>;
 }

+ 36 - 20
packages/core/src/job-queue/in-memory-job-queue-strategy.ts

@@ -78,7 +78,7 @@ export class InMemoryJobQueueStrategy implements JobQueueStrategy {
     }
 
     async findManyById(ids: ID[]): Promise<Job[]> {
-        return ids.map((id) => this.jobs.get(id)).filter(notNullOrUndefined);
+        return ids.map(id => this.jobs.get(id)).filter(notNullOrUndefined);
     }
 
     async next(queueName: string): Promise<Job | undefined> {
@@ -97,6 +97,29 @@ export class InMemoryJobQueueStrategy implements JobQueueStrategy {
         this.jobs.set(job.id!, job);
     }
 
+    async removeSettledJobs(queueNames: string[] = [], olderThan?: Date): Promise<number> {
+        let removed = 0;
+        for (const job of this.jobs.values()) {
+            if (0 < queueNames.length && !queueNames.includes(job.queueName)) {
+                continue;
+            }
+            if (job.isSettled) {
+                if (olderThan) {
+                    if (job.settledAt && job.settledAt < olderThan) {
+                        // tslint:disable-next-line:no-non-null-assertion
+                        this.jobs.delete(job.id!);
+                        removed++;
+                    }
+                } else {
+                    // tslint:disable-next-line:no-non-null-assertion
+                    this.jobs.delete(job.id!);
+                    removed++;
+                }
+            }
+        }
+        return removed;
+    }
+
     private applySort(items: Job[], sort: JobSortParameter): Job[] {
         for (const [prop, direction] of Object.entries(sort)) {
             const key = prop as keyof Required<JobSortParameter>;
@@ -110,40 +133,40 @@ export class InMemoryJobQueueStrategy implements JobQueueStrategy {
         for (const [prop, operator] of Object.entries(filters)) {
             const key = prop as keyof Required<JobFilterParameter>;
             if (operator?.eq !== undefined) {
-                items = items.filter((i) => i[key] === operator.eq);
+                items = items.filter(i => i[key] === operator.eq);
             }
 
             const contains = (operator as StringOperators)?.contains;
             if (contains) {
-                items = items.filter((i) => (i[key] as string).includes(contains));
+                items = items.filter(i => (i[key] as string).includes(contains));
             }
             const gt = (operator as NumberOperators)?.gt;
             if (gt) {
-                items = items.filter((i) => (i[key] as number) > gt);
+                items = items.filter(i => (i[key] as number) > gt);
             }
             const gte = (operator as NumberOperators)?.gte;
             if (gte) {
-                items = items.filter((i) => (i[key] as number) >= gte);
+                items = items.filter(i => (i[key] as number) >= gte);
             }
             const lt = (operator as NumberOperators)?.lt;
             if (lt) {
-                items = items.filter((i) => (i[key] as number) < lt);
+                items = items.filter(i => (i[key] as number) < lt);
             }
             const lte = (operator as NumberOperators)?.lte;
             if (lte) {
-                items = items.filter((i) => (i[key] as number) <= lte);
+                items = items.filter(i => (i[key] as number) <= lte);
             }
             const before = (operator as DateOperators)?.before;
             if (before) {
-                items = items.filter((i) => (i[key] as Date) <= before);
+                items = items.filter(i => (i[key] as Date) <= before);
             }
             const after = (operator as DateOperators)?.after;
             if (after) {
-                items = items.filter((i) => (i[key] as Date) >= after);
+                items = items.filter(i => (i[key] as Date) >= after);
             }
             const between = (operator as NumberOperators)?.between;
             if (between) {
-                items = items.filter((i) => {
+                items = items.filter(i => {
                     const num = i[key] as number;
                     return num > between.start && num < between.end;
                 });
@@ -164,16 +187,9 @@ export class InMemoryJobQueueStrategy implements JobQueueStrategy {
      * grows indefinitely.
      */
     private evictSettledJobs = () => {
-        for (const job of this.jobs.values()) {
-            if (job.isSettled) {
-                const settledAtMs = job.settledAt ? +job.settledAt : 0;
-                const nowMs = +new Date();
-                if (nowMs - settledAtMs > this.evictJobsAfterMs) {
-                    // tslint:disable-next-line:no-non-null-assertion
-                    this.jobs.delete(job.id!);
-                }
-            }
-        }
+        const nowMs = +new Date();
+        const olderThanMs = nowMs - this.evictJobsAfterMs;
+        this.removeSettledJobs([], new Date(olderThanMs));
         this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
     };
 }

+ 7 - 3
packages/core/src/job-queue/job-queue.service.ts

@@ -38,7 +38,7 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
                     `jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not receommended to set this lower than 100ms`,
                 );
             }
-            await new Promise((resolve) => setTimeout(resolve, 1000));
+            await new Promise(resolve => setTimeout(resolve, 1000));
             this.hasInitialized = true;
             for (const queue of this.queues) {
                 if (!queue.started) {
@@ -50,7 +50,7 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
 
     /** @internal */
     onModuleDestroy() {
-        return Promise.all(this.queues.map((q) => q.destroy()));
+        return Promise.all(this.queues.map(q => q.destroy()));
     }
 
     /**
@@ -95,9 +95,13 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
     }
 
     getJobQueues(): GraphQlJobQueue[] {
-        return this.queues.map((queue) => ({
+        return this.queues.map(queue => ({
             name: queue.name,
             running: queue.started,
         }));
     }
+
+    removeSettledJobs(queueNames: string[], olderThan?: Date) {
+        return this.jobQueueStrategy.removeSettledJobs(queueNames, olderThan);
+    }
 }

+ 8 - 5
packages/core/src/job-queue/job-queue.ts

@@ -51,19 +51,22 @@ export class JobQueue<Data extends JobData<Data> = {}> {
                 if (nextJob) {
                     this.activeJobs.push(nextJob);
                     await this.jobQueueStrategy.update(nextJob);
-                    nextJob.on('complete', (job) => this.onFailOrComplete(job));
-                    nextJob.on('fail', (job) => this.onFailOrComplete(job));
+                    nextJob.on('complete', job => this.onFailOrComplete(job));
+                    nextJob.on('progress', job => this.jobQueueStrategy.update(job));
+                    nextJob.on('fail', job => this.onFailOrComplete(job));
                     try {
                         const returnVal = this.options.process(nextJob);
                         if (returnVal instanceof Promise) {
-                            returnVal.catch((err) => nextJob.fail(err));
+                            returnVal.catch(err => nextJob.fail(err));
                         }
                     } catch (err) {
                         nextJob.fail(err);
                     }
                 }
             }
-            this.timer = setTimeout(runNextJobs, this.pollInterval);
+            if (this.running) {
+                this.timer = setTimeout(runNextJobs, this.pollInterval);
+            }
         };
 
         runNextJobs();
@@ -82,7 +85,7 @@ export class JobQueue<Data extends JobData<Data> = {}> {
         const start = +new Date();
         // Wait for 2 seconds to allow running jobs to complete
         const maxTimeout = 2000;
-        return new Promise((resolve) => {
+        return new Promise(resolve => {
             const pollActiveJobs = async () => {
                 const timedOut = +new Date() - start > maxTimeout;
                 if (this.activeJobs.length === 0 || timedOut) {

+ 3 - 1
packages/core/src/job-queue/job.ts

@@ -10,7 +10,7 @@ import { JobConfig, JobData } from './types';
  * @docsCategory JobQueue
  * @docsPage Job
  */
-export type JobEventType = 'start' | 'complete' | 'fail';
+export type JobEventType = 'start' | 'progress' | 'complete' | 'fail';
 
 /**
  * @description
@@ -46,6 +46,7 @@ export class Job<T extends JobData<T> = any> {
     private _settledAt?: Date;
     private readonly eventListeners: { [type in JobEventType]: Array<JobEventListener<T>> } = {
         start: [],
+        progress: [],
         complete: [],
         fail: [],
     };
@@ -130,6 +131,7 @@ export class Job<T extends JobData<T> = any> {
      */
     setProgress(percent: number) {
         this._progress = Math.min(percent, 100);
+        this.fireEvent('progress');
     }
 
     /**

+ 18 - 3
packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts

@@ -2,7 +2,7 @@ import { ModuleRef } from '@nestjs/core';
 import { getConnectionToken } from '@nestjs/typeorm';
 import { JobListOptions, JobState } from '@vendure/common/lib/generated-types';
 import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
-import { Brackets, Connection } from 'typeorm';
+import { Brackets, Connection, FindConditions, In, LessThan, Not } from 'typeorm';
 
 import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy';
 import { Job } from '../../job-queue/job';
@@ -41,7 +41,7 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
             .createQueryBuilder('record')
             .where('record.queueName = :queueName', { queueName })
             .andWhere(
-                new Brackets((qb) => {
+                new Brackets(qb => {
                     qb.where('record.state = :pending', {
                         pending: JobState.PENDING,
                     }).orWhere('record.state = :retrying', { retrying: JobState.RETRYING });
@@ -96,7 +96,22 @@ export class SqlJobQueueStrategy implements JobQueueStrategy {
         return this.connection
             .getRepository(JobRecord)
             .findByIds(ids)
-            .then((records) => records.map(this.fromRecord));
+            .then(records => records.map(this.fromRecord));
+    }
+
+    async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) {
+        if (!this.connectionAvailable(this.connection)) {
+            return 0;
+        }
+        const findOptions: FindConditions<JobRecord> = {
+            ...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),
+            isSettled: true,
+            settledAt: LessThan(olderThan || new Date()),
+        };
+        const toDelete = await this.connection.getRepository(JobRecord).find({ where: findOptions });
+        const deleteCount = await this.connection.getRepository(JobRecord).count({ where: findOptions });
+        await this.connection.getRepository(JobRecord).delete(findOptions);
+        return deleteCount;
     }
 
     private connectionAvailable(connection: Connection | undefined): connection is Connection {

Fișier diff suprimat deoarece este prea mare
+ 0 - 0
schema-admin.json


Unele fișiere nu au fost afișate deoarece prea multe fișiere au fost modificate în acest diff