Browse Source

docs: Add docs on job cancellation handling

Relates to #1127, relates to #2650.
Michael Bromley 1 year ago
parent
commit
d4229684e3
1 changed files with 100 additions and 0 deletions
  1. 100 0
      docs/docs/guides/developer-guide/worker-job-queue/index.mdx

+ 100 - 0
docs/docs/guides/developer-guide/worker-job-queue/index.mdx

@@ -318,6 +318,106 @@ import { adminApiExtensions } from './api/api-extensions';
 })
 export class ProductVideoPlugin {}
 ```
+
+### Passing the RequestContext
+
+It is common to need to pass the [RequestContext object](/reference/typescript-api/request/request-context) to the `process` function of a job, since `ctx` is required by many Vendure
+service methods that you may be using inside your `process` function. However, the `RequestContext` object itself is not serializable,
+so it cannot be passed directly to the `JobQueue.add()` method. Instead, you can serialize the `RequestContext` using the [`RequestContext.serialize()`
+method](/reference/typescript-api/request/request-context/#serialize), and then deserialize it in the `process` function using the static `deserialize` method:
+
+```ts
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { JobQueue, JobQueueService, Product, TransactionalConnection,
+    SerializedRequestContext, RequestContext } from '@vendure/core';
+
+@Injectable()
+class ProductExportService implements OnModuleInit {
+
+    // highlight-next-line
+    private jobQueue: JobQueue<{ ctx: SerializedRequestContext; }>;
+
+    constructor(private jobQueueService: JobQueueService,
+                private connection: TransactionalConnection) {
+    }
+
+    async onModuleInit() {
+        this.jobQueue = await this.jobQueueService.createQueue({
+            name: 'export-products',
+            process: async job => {
+                // highlight-next-line
+                const ctx = RequestContext.deserialize(job.data.ctx);
+                const allProducts = await this.connection.getRepository(ctx, Product).find();
+                // ... logic to export the product omitted for brevity
+            },
+        });
+    }
+
+    exportAllProducts(ctx: RequestContext) {
+        // highlight-next-line
+        return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) });
+    }
+}
+```
+
+### Handling job cancellation
+
+It is possible for an administrator to cancel a running job. Doing so will cause the configured job queue strategy to mark the job as cancelled, but
+on its own this will not stop the job from running. This is because the job queue itself has no direct control over the `process` function once
+it has been started.
+
+It is up to the `process` function to check for cancellation and stop processing if the job has been cancelled. This can be done by checking the
+`job.state` property, and if the job is cancelled, the `process` function can throw an error to indicate that the job was interrupted
+by early cancellation:
+
+```ts
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { JobQueue, JobQueueService, Product, TransactionalConnection,
+    SerializedRequestContext, RequestContext, Job, JobState } from '@vendure/core';
+import { IsNull } from 'typeorm';
+
+@Injectable()
+class ProductExportService implements OnModuleInit {
+
+    private jobQueue: JobQueue<{ ctx: SerializedRequestContext; }>;
+
+    constructor(private jobQueueService: JobQueueService,
+                private connection: TransactionalConnection) {
+    }
+
+    async onModuleInit() {
+        this.jobQueue = await this.jobQueueService.createQueue({
+            name: 'export-products',
+            process: async job => {
+                const ctx = RequestContext.deserialize(job.data.ctx);
+                const allProducts = await this.connection.getRepository(ctx, Product).find({
+                    where: { deletedAt: IsNull() }
+                });
+                let successfulExportCount = 0;
+                for (const product of allProducts) {
+                    // highlight-start
+                    if (job.state === JobState.CANCELLED) {
+                        // If the job has been cancelled, stop processing
+                        // to prevent unnecessary work.
+                        throw new Error('Job was cancelled');
+                    }
+                    // highlight-end
+
+                    // ... logic to export the product omitted for brevity
+                    successfulExportCount++;
+                }
+                return { successfulExportCount };
+            },
+        });
+    }
+
+    exportAllProducts(ctx: RequestContext) {
+        return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) });
+    }
+}
+```
+
+
 ### Subscribing to job updates
 
 When creating a new job via `JobQueue.add()`, it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.