Sfoglia il codice sorgente

feat(docs): Add docs on JobQueue & examples

Michael Bromley 5 anni fa
parent
commit
8b45d86b16

+ 45 - 0
docs/content/docs/developer-guide/job-queue/index.md

@@ -0,0 +1,45 @@
+---
+title: 'Job Queue'
+showtoc: true
+---
+
+# The Vendure Job Queue
+
+## What is a job queue?
+
+Vendure uses a [job queue](https://en.wikipedia.org/wiki/Job_queue) to handle the processing of certain tasks which are typically too slow to run in the normal request-response cycle. A normal request-response looks like this:
+
+{{< figure src="./job_queue_req_res.png" >}}
+
+In the normal request-response, all intermediate tasks (looking up data in the database, performing business logic etc.) occur before the response can be returned. For most operations this is fine, since those intermediate tasks are very fast.
+
+Some operations however will need to perform much longer-running tasks. For example, updating the search index on thousands of products could take up to a minute or more. In this case, we certainly don't want to delay the reponse until that processing has completed. That's where a job queue comes in:
+
+{{< figure src="./job_queue_req_res_with_queue.png" >}}
+
+## What does Vendure use the job queue for?
+
+-   Re-building the search index
+-   Updating the search index when changes are made to Products, ProductVariants, Assets etc.
+-   Updating the contents of Collections
+-   Sending transactional emails
+
+## Job Queue persistence
+
+When a job is added to the queue, that fact must be persisted to some kind of storage. In Vendure, the storage mechanism is defined by the [JobQueueStrategy]({{< relref "job-queue-strategy" >}}).
+
+By default, Vendure uses an [in-memory store]({{< relref "in-memory-job-queue-strategy" >}}) of the contents of each queue. While this has the advantage of requiring no external dependencies, it is not suitable for production because when the server is stopped, the entire queue will be lost and any pending jobs will never be processed.
+
+A better alternative is to use the [DefaultJobQueuePlugin]({{< relref "default-job-queue-plugin" >}}), which configures Vendure to use the [SqlJobQueueStrategy]({{< relref "sql-job-queue-strategy" >}}). This means that event if the Vendure server stops, pending jobs will be persisted and upon re-start, they will be processed.
+
+It is also possible to implement your own JobQueueStrategy to enable other persistence mechanisms, e.g. Redis.
+
+## Using Job Queues in a plugin
+
+If you create a [Vendure plugin]({{< relref "/docs/plugins" >}}) which involves some long-running tasks, you can also make use of the job queue. See the [JobQueue plugin example]({{< relref "plugin-examples" >}}#using-the-jobqueueservice) for a detailed annotated example.
+
+{{% alert "primary" %}}
+Note: The [JobQueueService]({{< relref "job-queue-service" >}}) combines well with the [WorkerService]({{< relref "worker-service" >}}).
+
+A real example of this can be seen in the [EmailPlugin source](https://github.com/vendure-ecommerce/vendure/blob/07e1958f1ad1766e6fd3dae80f526bb688c0288e/packages/email-plugin/src/plugin.ts#L201-L210)
+{{% /alert %}}

BIN
docs/content/docs/developer-guide/job-queue/job_queue_req_res.png


BIN
docs/content/docs/developer-guide/job-queue/job_queue_req_res_with_queue.png


+ 10 - 0
docs/content/docs/developer-guide/vendure-worker.md

@@ -36,3 +36,13 @@ Internally, the Worker is an instance of a [NestJS microservice](https://docs.ne
 ## Running on the main process
 
 There is an option `runInMainProcess` which, if set to `true` will cause the Worker to be bootstrapped along with the main application, without the need for a separate process running `bootstrapWorker`. This is mainly used for testing and development, and is not advised for production use, since it negates the benefits of running long tasks off of the main process.
+
+## Running custom code on the worker
+
+If you are authoring a [Vendure plugin]({{< relref "/docs/plugins" >}}) to implement custom functionality, you can also make use of the worker process in order to handle long-running or computationally-demanding tasks. See the [Plugin Examples]({{< relref "plugin-examples" >}}#running-processes-on-the-worker) page for an example of this.
+
+{{% alert "primary" %}}
+Note: The [WorkerService]({{< relref "worker-service" >}}) combines well with the [JobQueueService]({{< relref "job-queue-service" >}}).
+
+A real example of this can be seen in the [EmailPlugin source](https://github.com/vendure-ecommerce/vendure/blob/07e1958f1ad1766e6fd3dae80f526bb688c0288e/packages/email-plugin/src/plugin.ts#L201-L210)
+{{% /alert %}}

+ 112 - 1
docs/content/docs/plugins/plugin-examples.md

@@ -169,7 +169,7 @@ Side note: since this uses no Vendure-specific metadata, it could also be writte
 
 This example shows how to set up a microservice running on the Worker process, as well as subscribing to events via the [EventBus]({{< relref "event-bus" >}}).
 
-Also see the docs for [WorkerService]({{< relref "worker-service" >}}) and 
+Also see the docs for [WorkerService]({{< relref "worker-service" >}}).
 
 ```TypeScript
 // order-processing.controller.ts
@@ -241,3 +241,114 @@ export class OrderAnalyticsPlugin implements OnVendureBootstrap {
 
 }
 ```
+
+## Using the JobQueueService
+
+If your plugin involves long-running tasks, you can take advantage of the [job queue system]({{< relref "/docs/developer-guide/job-queue" >}}) that comes with Vendure. This example defines a mutation that can be used to transcode and link a video to a Product's customFields.
+
+```TypeScript
+// product-video.resolver.ts
+import { Args, Mutation, Resolver } from '@nestjs/graphql';
+import { Ctx, RequestContext } from '@vendure/core'
+import { ProductVideoService } from './product-video.service';
+
+@Resolver()
+class ProductVideoResolver {
+
+  constructor(private productVideoService: ProductVideoService) {}
+
+  @Mutation()
+  addVideoToProduct(@Args() args: any) {
+    return this.productVideoService.transcodeForProduct(
+      args.productId, 
+      args.videoUrl,
+    );
+  }
+
+}
+```
+The resolver just defines how to handle the new `addVideoToProduct` mutation, delegating the actual work to the `ProductVideoService`:
+```TypeScript
+// product-video.service.ts
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { InjectConnection } from '@nestjs/typeorm';
+import { Connection } from 'typeorm';
+import { JobQueue, JobQueueService, ID, Product } from '@vendure/core';
+import { transcode } from 'third-party-video-sdk'; 
+
+let jobQueue: JobQueue<{ productId: ID; videoUrl: string; }>;
+
+@Injectable()
+class ProductVideoService implements OnModuleInit { 
+  
+  constructor(private jobQueueService: JobQueueService, 
+              @InjectConnection() private connection: Connection) {}
+
+  onModuleInit() {
+    // This check ensures that only a single JobQueue is created, even if this
+    // service gets instantiated more than once.
+    if (!jobQueue) {
+      jobQueue = this.jobQueueService.createQueue({
+        name: 'transcode-video',
+        concurrency: 5,
+        process: async job => {
+          // Here we define how each job in the queue will be processed.
+          // In this case we call out to some imaginary 3rd-party video
+          // transcoding API, which performs the work and then
+          // returns a new URL of the transcoded video, which we can then
+          // associate with the Product via the customFields.
+          try {
+            const result = await transcode(job.data.videoId);
+            await this.connection.getRepository(Product).save({
+              id: job.data.productId,
+              customFields: {
+                videoUrl: result.url,
+              },
+            });
+            job.complete(result);
+          } catch (e) {
+            job.fail(e);
+          }
+        },
+      });
+    }
+  }
+
+  transcodeForProduct(productId: ID, videoUrl: string) { 
+    // Add a new job to the queue and immediately return the
+    // job itself.
+    return jobQueue.add({ productId, videoUrl });
+  }
+}
+```
+The `ProductVideoService` is in charge of setting up the JobQueue and adding jobs to that queue.
+
+```TypeScript
+// product-video.plugin.ts
+import gql from 'graphql-tag';
+import { PluginCommonModule, VendurePlugin } from '@vendure/core';
+import { ProductVideoService } from './product-video.service'
+import { ProductVideoResolver } from './product-video.resolver'
+
+@VendurePlugin({
+  imports: [PluginCommonModule],
+  providers: [ProductVideoService],
+  adminApiExtensions: {
+    schema: gql`
+      extend type Mutation {
+        addVideoToProduct(productId: ID! videoUrl: String!): Job!
+      }
+    `,
+    resolvers: [ProductVideoResolver]
+  },
+  configure: config => {
+    config.customFields.Product.push({
+      name: 'videoUrl',
+      type: 'string',
+    });
+    return config;
+  }
+})
+export class ProductVideoPlugin {}
+```
+Finally, the `ProductVideoPlugin` brings it all together, extending the GraphQL API, defining the required CustomField to store the transcoded video URL, and registering our service and resolver. The [PluginCommonModule]({{< relref "plugin-common-module" >}}) is imported as it exports the JobQueueService.

+ 12 - 0
docs/diagrams/job-queue-req-res-with-queue.puml

@@ -0,0 +1,12 @@
+@startuml
+!include theme.puml
+hide footbox
+title Request-response with job queue
+Client -> "Vendure API": mutation `reindex`
+"Vendure API" -> JobQueue: add "reindex" job to queue
+JobQueue --> "Vendure API": job is pending
+"Vendure API" --> Client: reindex response
+...some time later...
+JobQueue -> Worker: Run reindex logic
+Worker --> JobQueue: Reindexing complete
+@enduml

+ 9 - 0
docs/diagrams/job-queue-req-res.puml

@@ -0,0 +1,9 @@
+@startuml
+!include theme.puml
+hide footbox
+title Regular request-response
+Client -> "Vendure API": query `product(ID: 1)`
+"Vendure API" -> DB #99FF99: Lookup data
+DB --> "Vendure API": Return data
+"Vendure API" --> Client: product query response
+@enduml

+ 9 - 3
packages/admin-ui/src/lib/core/src/common/generated-types.ts

@@ -583,7 +583,7 @@ export type CreateZoneInput = {
 /**
  * @description
  * ISO 4217 currency code
- *
+ * 
  * @docsCategory common
  */
 export enum CurrencyCode {
@@ -1322,6 +1322,12 @@ export type JobSortParameter = {
   duration?: Maybe<SortOrder>;
 };
 
+/**
+ * @description
+ * The state of a Job in the JobQueue
+ * 
+ * @docsCategory common
+ */
 export enum JobState {
   PENDING = 'PENDING',
   RUNNING = 'RUNNING',
@@ -1334,7 +1340,7 @@ export enum JobState {
 /**
  * @description
  * ISO 639-1 language code
- *
+ * 
  * @docsCategory common
  */
 export enum LanguageCode {
@@ -2526,7 +2532,7 @@ export type PaymentMethodSortParameter = {
  * @description
  * Permissions for administrators and customers. Used to control access to
  * GraphQL resolvers via the {@link Allow} decorator.
- *
+ * 
  * @docsCategory common
  */
 export enum Permission {

+ 6 - 0
packages/asset-server-plugin/e2e/graphql/generated-e2e-asset-server-plugin-types.ts

@@ -1318,6 +1318,12 @@ export type JobSortParameter = {
     duration?: Maybe<SortOrder>;
 };
 
+/**
+ * @description
+ * The state of a Job in the JobQueue
+ *
+ * @docsCategory common
+ */
 export enum JobState {
     PENDING = 'PENDING',
     RUNNING = 'RUNNING',

+ 6 - 0
packages/common/src/generated-types.ts

@@ -1314,6 +1314,12 @@ export type JobSortParameter = {
   duration?: Maybe<SortOrder>;
 };
 
+/**
+ * @description
+ * The state of a Job in the JobQueue
+ * 
+ * @docsCategory common
+ */
 export enum JobState {
   PENDING = 'PENDING',
   RUNNING = 'RUNNING',

+ 6 - 0
packages/core/e2e/graphql/generated-e2e-admin-types.ts

@@ -1318,6 +1318,12 @@ export type JobSortParameter = {
     duration?: Maybe<SortOrder>;
 };
 
+/**
+ * @description
+ * The state of a Job in the JobQueue
+ *
+ * @docsCategory common
+ */
 export enum JobState {
     PENDING = 'PENDING',
     RUNNING = 'RUNNING',

+ 6 - 0
packages/core/src/api/schema/admin-api/job.api.graphql

@@ -10,6 +10,12 @@ type Mutation {
     removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
 }
 
+"""
+@description
+The state of a Job in the JobQueue
+
+@docsCategory common
+"""
 enum JobState {
     PENDING
     RUNNING

+ 4 - 2
packages/core/src/config/vendure-config.ts

@@ -384,6 +384,8 @@ export interface WorkerOptions {
 /**
  * @description
  * Options related to the built-in job queue.
+ *
+ * @docsCategory JobQueue
  */
 export interface JobQueueOptions {
     /**
@@ -395,10 +397,10 @@ export interface JobQueueOptions {
     jobQueueStrategy?: JobQueueStrategy;
     /**
      * @description
-     * Defines the interval in ms used by the JobService to poll for new
+     * Defines the interval in ms used by the {@link JobQueueService} to poll for new
      * jobs in the queue to process.
      *
-     * @default 100
+     * @default 200
      */
     pollInterval?: number;
 }

+ 12 - 0
packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts

@@ -10,6 +10,18 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
  * plugin to an existing Vendure installation, you'll need to run a [database migration](/docs/developer-guide/migrations), since this
  * plugin will add a new "job_record" table to the database.
  *
+ * @example
+ * ```TypeScript
+ * import { DefaultJobQueuePlugin, VendureConfig } from '\@vendure/core';
+ *
+ * export const config: VendureConfig = {
+ *   // Add an instance of the plugin to the plugins array
+ *   plugins: [
+ *     DefaultJobQueuePlugin,
+ *   ],
+ * };
+ * ```
+ *
  * @docsCategory JobQueue
  */
 @VendurePlugin({

+ 11 - 11
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -41,9 +41,9 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
  *
  * @example
  * ```ts
- * import { DefaultSearchPlugin } from '\@vendure/core';
+ * import { DefaultSearchPlugin, VendureConfig } from '\@vendure/core';
  *
- * const config: VendureConfig = {
+ * export const config: VendureConfig = {
  *   // Add an instance of the plugin to the plugins array
  *   plugins: [
  *     DefaultSearchPlugin,
@@ -69,21 +69,21 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
     async onVendureBootstrap() {
         this.searchIndexService.initJobQueue();
 
-        this.eventBus.ofType(ProductEvent).subscribe((event) => {
+        this.eventBus.ofType(ProductEvent).subscribe(event => {
             if (event.type === 'deleted') {
                 return this.searchIndexService.deleteProduct(event.ctx, event.product);
             } else {
                 return this.searchIndexService.updateProduct(event.ctx, event.product);
             }
         });
-        this.eventBus.ofType(ProductVariantEvent).subscribe((event) => {
+        this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
             if (event.type === 'deleted') {
                 return this.searchIndexService.deleteVariant(event.ctx, event.variants);
             } else {
                 return this.searchIndexService.updateVariants(event.ctx, event.variants);
             }
         });
-        this.eventBus.ofType(AssetEvent).subscribe((event) => {
+        this.eventBus.ofType(AssetEvent).subscribe(event => {
             if (event.type === 'updated') {
                 return this.searchIndexService.updateAsset(event.ctx, event.asset);
             }
@@ -91,7 +91,7 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
                 return this.searchIndexService.deleteAsset(event.ctx, event.asset);
             }
         });
-        this.eventBus.ofType(ProductChannelEvent).subscribe((event) => {
+        this.eventBus.ofType(ProductChannelEvent).subscribe(event => {
             if (event.type === 'assigned') {
                 return this.searchIndexService.assignProductToChannel(
                     event.ctx,
@@ -112,18 +112,18 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
         collectionModification$
             .pipe(
                 buffer(closingNotifier$),
-                filter((events) => 0 < events.length),
-                map((events) => ({
+                filter(events => 0 < events.length),
+                map(events => ({
                     ctx: events[0].ctx,
                     ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
                 })),
-                filter((e) => 0 < e.ids.length),
+                filter(e => 0 < e.ids.length),
             )
-            .subscribe((events) => {
+            .subscribe(events => {
                 return this.searchIndexService.updateVariantsById(events.ctx, events.ids);
             });
 
-        this.eventBus.ofType(TaxRateModificationEvent).subscribe((event) => {
+        this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
                 return this.searchIndexService.reindex(event.ctx);

+ 5 - 4
packages/core/src/plugin/plugin-common.module.ts

@@ -14,10 +14,11 @@ import { WorkerServiceModule } from '../worker/worker-service.module';
  *
  * The PluginCommonModule exports:
  *
- * * EventBusModule, allowing the injection of the {@link EventBus} service.
- * * ServiceModule allowing the injection of any of the various entity services such as ProductService, OrderService etc.
- * * ConfigModule, allowing the injection of the ConfigService.
- * * WorkerServiceModule, allowing the injection of the {@link WorkerService}.
+ * * `EventBusModule`, allowing the injection of the {@link EventBus} service.
+ * * `ServiceModule` allowing the injection of any of the various entity services such as ProductService, OrderService etc.
+ * * `ConfigModule`, allowing the injection of the ConfigService.
+ * * `WorkerServiceModule`, allowing the injection of the {@link WorkerService}.
+ * * `JobQueueModule`, allowing the injection of the {@link JobQueueService}.
  *
  * @docsCategory plugin
  */

+ 6 - 0
packages/elasticsearch-plugin/e2e/graphql/generated-e2e-elasticsearch-plugin-types.ts

@@ -1318,6 +1318,12 @@ export type JobSortParameter = {
     duration?: Maybe<SortOrder>;
 };
 
+/**
+ * @description
+ * The state of a Job in the JobQueue
+ *
+ * @docsCategory common
+ */
 export enum JobState {
     PENDING = 'PENDING',
     RUNNING = 'RUNNING',

File diff suppressed because it is too large
+ 0 - 0
schema-admin.json


Some files were not shown because too many files changed in this diff