Browse Source

feat(core): Add `cancelJob` mutation

Relates to #614
Michael Bromley 5 years ago
parent
commit
2d099cfe72

+ 6 - 0
packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts

@@ -325,6 +325,7 @@ export type Mutation = {
     importProducts?: Maybe<ImportInfo>;
     importProducts?: Maybe<ImportInfo>;
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     removeSettledJobs: Scalars['Int'];
     removeSettledJobs: Scalars['Int'];
+    cancelJob: Job;
     settlePayment: SettlePaymentResult;
     settlePayment: SettlePaymentResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     cancelOrder: CancelOrderResult;
     cancelOrder: CancelOrderResult;
@@ -611,6 +612,10 @@ export type MutationRemoveSettledJobsArgs = {
     olderThan?: Maybe<Scalars['DateTime']>;
     olderThan?: Maybe<Scalars['DateTime']>;
 };
 };
 
 
+export type MutationCancelJobArgs = {
+    jobId: Scalars['ID'];
+};
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -1208,6 +1213,7 @@ export enum JobState {
     COMPLETED = 'COMPLETED',
     COMPLETED = 'COMPLETED',
     RETRYING = 'RETRYING',
     RETRYING = 'RETRYING',
     FAILED = 'FAILED',
     FAILED = 'FAILED',
+    CANCELLED = 'CANCELLED',
 }
 }
 
 
 export type JobList = PaginatedList & {
 export type JobList = PaginatedList & {

+ 8 - 1
packages/common/src/generated-types.ts

@@ -367,6 +367,7 @@ export type Mutation = {
   importProducts?: Maybe<ImportInfo>;
   importProducts?: Maybe<ImportInfo>;
   /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
   /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
   removeSettledJobs: Scalars['Int'];
   removeSettledJobs: Scalars['Int'];
+  cancelJob: Job;
   settlePayment: SettlePaymentResult;
   settlePayment: SettlePaymentResult;
   addFulfillmentToOrder: AddFulfillmentToOrderResult;
   addFulfillmentToOrder: AddFulfillmentToOrderResult;
   cancelOrder: CancelOrderResult;
   cancelOrder: CancelOrderResult;
@@ -698,6 +699,11 @@ export type MutationRemoveSettledJobsArgs = {
 };
 };
 
 
 
 
+export type MutationCancelJobArgs = {
+  jobId: Scalars['ID'];
+};
+
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
   id: Scalars['ID'];
   id: Scalars['ID'];
 };
 };
@@ -1361,7 +1367,8 @@ export enum JobState {
   RUNNING = 'RUNNING',
   RUNNING = 'RUNNING',
   COMPLETED = 'COMPLETED',
   COMPLETED = 'COMPLETED',
   RETRYING = 'RETRYING',
   RETRYING = 'RETRYING',
-  FAILED = 'FAILED'
+  FAILED = 'FAILED',
+  CANCELLED = 'CANCELLED'
 }
 }
 
 
 export type JobList = PaginatedList & {
 export type JobList = PaginatedList & {

+ 3 - 3
packages/core/e2e/fixtures/test-plugins/with-job-queue.ts

@@ -12,13 +12,13 @@ class TestController implements OnModuleInit {
         this.queue = this.jobQueueService.createQueue({
         this.queue = this.jobQueueService.createQueue({
             name: 'test',
             name: 'test',
             concurrency: 1,
             concurrency: 1,
-            process: (job) => {
+            process: job => {
                 PluginWithJobQueue.jobSubject.subscribe({
                 PluginWithJobQueue.jobSubject.subscribe({
-                    complete: () => {
+                    next: () => {
                         PluginWithJobQueue.jobHasDoneWork = true;
                         PluginWithJobQueue.jobHasDoneWork = true;
                         job.complete();
                         job.complete();
                     },
                     },
-                    error: (err) => job.fail(err),
+                    error: err => job.fail(err),
                 });
                 });
             },
             },
         });
         });

+ 18 - 0
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -325,6 +325,7 @@ export type Mutation = {
     importProducts?: Maybe<ImportInfo>;
     importProducts?: Maybe<ImportInfo>;
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     removeSettledJobs: Scalars['Int'];
     removeSettledJobs: Scalars['Int'];
+    cancelJob: Job;
     settlePayment: SettlePaymentResult;
     settlePayment: SettlePaymentResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     cancelOrder: CancelOrderResult;
     cancelOrder: CancelOrderResult;
@@ -611,6 +612,10 @@ export type MutationRemoveSettledJobsArgs = {
     olderThan?: Maybe<Scalars['DateTime']>;
     olderThan?: Maybe<Scalars['DateTime']>;
 };
 };
 
 
+export type MutationCancelJobArgs = {
+    jobId: Scalars['ID'];
+};
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -1208,6 +1213,7 @@ export enum JobState {
     COMPLETED = 'COMPLETED',
     COMPLETED = 'COMPLETED',
     RETRYING = 'RETRYING',
     RETRYING = 'RETRYING',
     FAILED = 'FAILED',
     FAILED = 'FAILED',
+    CANCELLED = 'CANCELLED',
 }
 }
 
 
 export type JobList = PaginatedList & {
 export type JobList = PaginatedList & {
@@ -5683,6 +5689,12 @@ export type GetOrderHistoryQuery = {
     >;
     >;
 };
 };
 
 
+export type CancelJobMutationVariables = Exact<{
+    id: Scalars['ID'];
+}>;
+
+export type CancelJobMutation = { cancelJob: Pick<Job, 'id' | 'state' | 'isSettled' | 'settledAt'> };
+
 export type UpdateOptionGroupMutationVariables = Exact<{
 export type UpdateOptionGroupMutationVariables = Exact<{
     input: UpdateProductOptionGroupInput;
     input: UpdateProductOptionGroupInput;
 }>;
 }>;
@@ -7660,6 +7672,12 @@ export namespace GetOrderHistory {
     >;
     >;
 }
 }
 
 
+export namespace CancelJob {
+    export type Variables = CancelJobMutationVariables;
+    export type Mutation = CancelJobMutation;
+    export type CancelJob = NonNullable<CancelJobMutation['cancelJob']>;
+}
+
 export namespace UpdateOptionGroup {
 export namespace UpdateOptionGroup {
     export type Variables = UpdateOptionGroupMutationVariables;
     export type Variables = UpdateOptionGroupMutationVariables;
     export type Mutation = UpdateOptionGroupMutation;
     export type Mutation = UpdateOptionGroupMutation;

+ 47 - 3
packages/core/e2e/job-queue.e2e-spec.ts

@@ -1,12 +1,13 @@
 import { DefaultJobQueuePlugin, mergeConfig } from '@vendure/core';
 import { DefaultJobQueuePlugin, mergeConfig } from '@vendure/core';
 import { createTestEnvironment } from '@vendure/testing';
 import { createTestEnvironment } from '@vendure/testing';
+import gql from 'graphql-tag';
 import path from 'path';
 import path from 'path';
 
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
 import { initialData } from '../../../e2e-common/e2e-initial-data';
 import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 
 
 import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
 import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
-import { GetRunningJobs, JobState } from './graphql/generated-e2e-admin-types';
+import { CancelJob, GetRunningJobs, JobState } from './graphql/generated-e2e-admin-types';
 import { GET_RUNNING_JOBS } from './graphql/shared-definitions';
 import { GET_RUNNING_JOBS } from './graphql/shared-definitions';
 
 
 describe('JobQueue', () => {
 describe('JobQueue', () => {
@@ -37,10 +38,11 @@ describe('JobQueue', () => {
     }, TEST_SETUP_TIMEOUT_MS);
     }, TEST_SETUP_TIMEOUT_MS);
 
 
     afterAll(async () => {
     afterAll(async () => {
+        PluginWithJobQueue.jobSubject.complete();
         await server.destroy();
         await server.destroy();
     });
     });
 
 
-    function getJobsInTestQueue() {
+    function getJobsInTestQueue(state?: JobState) {
         return adminClient
         return adminClient
             .query<GetRunningJobs.Query, GetRunningJobs.Variables>(GET_RUNNING_JOBS, {
             .query<GetRunningJobs.Query, GetRunningJobs.Variables>(GET_RUNNING_JOBS, {
                 options: {
                 options: {
@@ -48,6 +50,11 @@ describe('JobQueue', () => {
                         queueName: {
                         queueName: {
                             eq: 'test',
                             eq: 'test',
                         },
                         },
+                        ...(state
+                            ? {
+                                  state: { eq: state },
+                              }
+                            : {}),
                     },
                     },
                 },
                 },
             })
             })
@@ -88,7 +95,7 @@ describe('JobQueue', () => {
     );
     );
 
 
     it('complete job after restart', async () => {
     it('complete job after restart', async () => {
-        PluginWithJobQueue.jobSubject.complete();
+        PluginWithJobQueue.jobSubject.next();
 
 
         await sleep(300);
         await sleep(300);
         const jobs = await getJobsInTestQueue();
         const jobs = await getJobsInTestQueue();
@@ -98,8 +105,45 @@ describe('JobQueue', () => {
         expect(jobs.items[0].id).toBe(testJobId);
         expect(jobs.items[0].id).toBe(testJobId);
         expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
         expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
     });
     });
+
+    it('cancels a running job', async () => {
+        PluginWithJobQueue.jobHasDoneWork = false;
+        const restControllerUrl = `http://localhost:${testConfig.apiOptions.port}/run-job`;
+        await adminClient.fetch(restControllerUrl);
+
+        await sleep(300);
+        const jobs = await getJobsInTestQueue(JobState.RUNNING);
+
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].state).toBe(JobState.RUNNING);
+        expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
+        const jobId = jobs.items[0].id;
+
+        const { cancelJob } = await adminClient.query<CancelJob.Mutation, CancelJob.Variables>(CANCEL_JOB, {
+            id: jobId,
+        });
+
+        expect(cancelJob.state).toBe(JobState.CANCELLED);
+        expect(cancelJob.isSettled).toBe(true);
+        expect(cancelJob.settledAt).not.toBeNull();
+
+        const jobs2 = await getJobsInTestQueue(JobState.CANCELLED);
+        expect(jobs.items.length).toBe(1);
+        expect(jobs.items[0].id).toBe(jobId);
+    });
 });
 });
 
 
 function sleep(ms: number): Promise<void> {
 function sleep(ms: number): Promise<void> {
     return new Promise(resolve => setTimeout(resolve, ms));
     return new Promise(resolve => setTimeout(resolve, ms));
 }
 }
+
+const CANCEL_JOB = gql`
+    mutation CancelJob($id: ID!) {
+        cancelJob(jobId: $id) {
+            id
+            state
+            isSettled
+            settledAt
+        }
+    }
+`;

+ 7 - 0
packages/core/src/api/resolvers/admin/job.resolver.ts

@@ -1,5 +1,6 @@
 import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
 import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
 import {
 import {
+    MutationCancelJobArgs,
     MutationRemoveSettledJobsArgs,
     MutationRemoveSettledJobsArgs,
     Permission,
     Permission,
     QueryJobArgs,
     QueryJobArgs,
@@ -43,4 +44,10 @@ export class JobResolver {
     removeSettledJobs(@Args() args: MutationRemoveSettledJobsArgs) {
     removeSettledJobs(@Args() args: MutationRemoveSettledJobsArgs) {
         return this.jobService.removeSettledJobs(args.queueNames || [], args.olderThan);
         return this.jobService.removeSettledJobs(args.queueNames || [], args.olderThan);
     }
     }
+
+    @Mutation()
+    @Allow(Permission.DeleteSettings)
+    cancelJob(@Args() args: MutationCancelJobArgs) {
+        return this.jobService.cancelJob(args.jobId);
+    }
 }
 }

+ 2 - 0
packages/core/src/api/schema/admin-api/job.api.graphql

@@ -8,6 +8,7 @@ type Query {
 type Mutation {
 type Mutation {
     "Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted."
     "Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted."
     removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
     removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
+    cancelJob(jobId: ID!): Job!
 }
 }
 
 
 """
 """
@@ -22,6 +23,7 @@ enum JobState {
     COMPLETED
     COMPLETED
     RETRYING
     RETRYING
     FAILED
     FAILED
+    CANCELLED
 }
 }
 
 
 input JobListOptions
 input JobListOptions

+ 10 - 1
packages/core/src/job-queue/job-queue.service.ts

@@ -70,7 +70,7 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
             const { pollInterval } = this.configService.jobQueueOptions;
             const { pollInterval } = this.configService.jobQueueOptions;
             if (pollInterval < 100) {
             if (pollInterval < 100) {
                 Logger.warn(
                 Logger.warn(
-                    `jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not receommended to set this lower than 100ms`,
+                    `jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not recommended to set this lower than 100ms`,
                 );
                 );
             }
             }
             await new Promise(resolve => setTimeout(resolve, 1000));
             await new Promise(resolve => setTimeout(resolve, 1000));
@@ -149,4 +149,13 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
     removeSettledJobs(queueNames: string[], olderThan?: Date) {
     removeSettledJobs(queueNames: string[], olderThan?: Date) {
         return this.jobQueueStrategy.removeSettledJobs(queueNames, olderThan);
         return this.jobQueueStrategy.removeSettledJobs(queueNames, olderThan);
     }
     }
+
+    async cancelJob(jobId: ID) {
+        const job = await this.jobQueueStrategy.findOne(jobId);
+        if (job) {
+            job.cancel();
+            await this.jobQueueStrategy.update(job);
+            return job;
+        }
+    }
 }
 }

+ 9 - 1
packages/core/src/job-queue/job.ts

@@ -11,7 +11,7 @@ import { JobConfig, JobData } from './types';
  * @docsCategory JobQueue
  * @docsCategory JobQueue
  * @docsPage Job
  * @docsPage Job
  */
  */
-export type JobEventType = 'start' | 'progress' | 'complete' | 'fail';
+export type JobEventType = 'start' | 'progress' | 'complete' | 'fail' | 'cancel';
 
 
 /**
 /**
  * @description
  * @description
@@ -51,6 +51,7 @@ export class Job<T extends JobData<T> = any> {
         progress: [],
         progress: [],
         complete: [],
         complete: [],
         fail: [],
         fail: [],
+        cancel: [],
     };
     };
 
 
     get name(): string {
     get name(): string {
@@ -165,6 +166,13 @@ export class Job<T extends JobData<T> = any> {
         this.fireEvent('fail');
         this.fireEvent('fail');
     }
     }
 
 
+    cancel() {
+        this._progress = 0;
+        this._settledAt = new Date();
+        this._state = JobState.CANCELLED;
+        this.fireEvent('cancel');
+    }
+
     /**
     /**
      * @description
      * @description
      * Sets a RUNNING job back to PENDING. Should be used when the JobQueue is being
      * Sets a RUNNING job back to PENDING. Should be used when the JobQueue is being

+ 6 - 0
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -325,6 +325,7 @@ export type Mutation = {
     importProducts?: Maybe<ImportInfo>;
     importProducts?: Maybe<ImportInfo>;
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     /** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
     removeSettledJobs: Scalars['Int'];
     removeSettledJobs: Scalars['Int'];
+    cancelJob: Job;
     settlePayment: SettlePaymentResult;
     settlePayment: SettlePaymentResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     addFulfillmentToOrder: AddFulfillmentToOrderResult;
     cancelOrder: CancelOrderResult;
     cancelOrder: CancelOrderResult;
@@ -611,6 +612,10 @@ export type MutationRemoveSettledJobsArgs = {
     olderThan?: Maybe<Scalars['DateTime']>;
     olderThan?: Maybe<Scalars['DateTime']>;
 };
 };
 
 
+export type MutationCancelJobArgs = {
+    jobId: Scalars['ID'];
+};
+
 export type MutationSettlePaymentArgs = {
 export type MutationSettlePaymentArgs = {
     id: Scalars['ID'];
     id: Scalars['ID'];
 };
 };
@@ -1208,6 +1213,7 @@ export enum JobState {
     COMPLETED = 'COMPLETED',
     COMPLETED = 'COMPLETED',
     RETRYING = 'RETRYING',
     RETRYING = 'RETRYING',
     FAILED = 'FAILED',
     FAILED = 'FAILED',
+    CANCELLED = 'CANCELLED',
 }
 }
 
 
 export type JobList = PaginatedList & {
 export type JobList = PaginatedList & {

File diff suppressed because it is too large
+ 0 - 0
schema-admin.json


Some files were not shown because too many files changed in this diff