| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- ---
- title: 'Worker & Job Queue'
- sidebar_position: 5
- ---
- The Vendure Worker is a Node.js process responsible for running computationally intensive
- or otherwise long-running tasks in the background. For example, updating a
- search index or sending emails. Running such tasks in the background allows
- the server to stay responsive, since a response can be returned immediately
- without waiting for the slower tasks to complete.
- Put another way, the Worker executes **jobs** which have been placed in the **job queue**.
- 
- ## The worker
- The worker is started by calling the [`bootstrapWorker()`](/reference/typescript-api/worker/bootstrap-worker/) function with the same
- configuration as is passed to the main server `bootstrap()`. In a standard Vendure installation, this is found
- in the `index-worker.ts` file:
- ```ts title="src/index-worker.ts"
- import { bootstrapWorker } from '@vendure/core';
- import { config } from './vendure-config';
- bootstrapWorker(config)
- .then(worker => worker.startJobQueue())
- .catch(err => {
- console.log(err);
- });
- ```
- ### Underlying architecture
- The Worker is a NestJS standalone application. This means it is almost identical to the main server app,
- but does not have any network layer listening for requests. The server communicates with the worker
- via a “job queue” architecture. The exact implementation of the job queue is dependent on the
- configured [`JobQueueStrategy`](/reference/typescript-api/job-queue/job-queue-strategy/), but by default
- the worker polls the database for new jobs.
- ### Multiple workers
- It is possible to run multiple workers in parallel to better handle heavy loads. Using the
- [`JobQueueOptions.activeQueues`](/reference/typescript-api/job-queue/job-queue-options#activequeues) configuration, it is even possible to have particular workers dedicated
- to one or more specific types of jobs. For example, if your application does video transcoding,
- you might want to set up a dedicated worker just for that task:
- ```ts title="src/transcoder-worker.ts"
- import { bootstrapWorker, mergeConfig } from '@vendure/core';
- import { config } from './vendure-config';
- const transcoderConfig = mergeConfig(config, {
- jobQueueOptions: {
- activeQueues: ['transcode-video'],
- }
- });
- bootstrapWorker(transcoderConfig)
- .then(worker => worker.startJobQueue())
- .catch(err => {
- console.log(err);
- });
- ```
- ### Running jobs on the main process
- It is possible to run jobs from the Job Queue on the main server. This is mainly used for testing
- and automated tasks, and is not advised for production use, since it negates the benefits of
- running long tasks off of the main process. To do so, you need to manually start the JobQueueService:
- ```ts title="src/index.ts"
- import { bootstrap, JobQueueService } from '@vendure/core';
- import { config } from './vendure-config';
- bootstrap(config)
- .then(app => app.get(JobQueueService).start())
- .catch(err => {
- console.log(err);
- process.exit(1);
- });
- ```
- ### ProcessContext
- Sometimes your code may need to be aware of whether it is being run as part of a server or worker process.
- In this case you can inject the [`ProcessContext`](/reference/typescript-api/common/process-context/) provider and query it like this:
- ```ts title="src/plugins/my-plugin/services/my.service.ts"
- import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
- import { ProcessContext } from '@vendure/core';
- @Injectable()
- export class MyService implements OnApplicationBootstrap {
- constructor(private processContext: ProcessContext) {}
- onApplicationBootstrap() {
- if (this.processContext.isServer) {
- // code which will only execute when running in
- // the server process
- }
- }
- }
- ```
- ## The job queue
- Vendure uses a [job queue](https://en.wikipedia.org/wiki/Job_queue) to handle the processing of certain tasks which are typically too slow to run in the
- normal request-response cycle. A normal request-response looks like this:
- 
- In the normal request-response, all intermediate tasks (looking up data in the database, performing business logic etc.)
- occur before the response can be returned. For most operations this is fine, since those intermediate tasks are very fast.
- Some operations however will need to perform much longer-running tasks. For example, updating the search index on
- thousands of products could take up to a minute or more. In this case, we certainly don’t want to delay the response
- until that processing has completed. That’s where a job queue comes in:
- 
- ### What does Vendure use the job queue for?
- By default, Vendure uses the job queue for the following tasks:
- - Re-building the search index
- - Updating the search index when changes are made to Products, ProductVariants, Assets etc.
- - Updating the contents of Collections
- - Sending transactional emails
- ### How does the Job Queue work?
- This diagram illustrates the job queue mechanism:
- 
- The server adds jobs to the queue. The worker then picks up these jobs from the queue and processes them in sequence,
- one by one (it is possible to increase job queue throughput by running multiple workers or by increasing the concurrency
- of a single worker).
- ### JobQueueStrategy
- The actual queue part is defined by the configured [`JobQueueStrategy`](/reference/typescript-api/job-queue/job-queue-strategy/).
- If no strategy is defined, Vendure uses an [in-memory store](/reference/typescript-api/job-queue/in-memory-job-queue-strategy/)
- of the contents of each queue. While this has the advantage
- of requiring no external dependencies, it is not suitable for production because when the server is stopped, the entire
- queue will be lost and any pending jobs will never be processed. Moreover, it cannot be used when running the worker
- as a separate process.
- A better alternative is to use the [DefaultJobQueuePlugin](/reference/typescript-api/job-queue/default-job-queue-plugin/)
- (which will be used in a standard `@vendure/create` installation), which configures Vendure to use the [SqlJobQueueStrategy](/reference/typescript-api/job-queue/sql-job-queue-strategy).
- This strategy uses the database as a queue, and means that even if the Vendure server stops, pending jobs will be persisted and upon re-start, they will be processed.
- It is also possible to implement your own JobQueueStrategy to take advantage of other technologies.
- Examples include RabbitMQ, Google Cloud Pub Sub & Amazon SQS. It may make sense to implement a custom strategy based on
- one of these if the default database-based approach does not meet your performance requirements.
- ### Job Queue Performance
- It is common for larger Vendure projects to define multiple custom job queues, When using the [DefaultJobQueuePlugin](/reference/typescript-api/job-queue/default-job-queue-plugin/)
- with many queues, performance may be impacted. This is because the `SqlJobQueueStrategy` uses polling to check for
- new jobs in the database. Each queue will (by default) query the database every 200ms. So if there are 10 queues,
- this will result in a constant 50 queries/second.
- In this case it is recommended to try the [BullMQJobQueuePlugin](/reference/core-plugins/job-queue-plugin/bull-mqjob-queue-plugin/),
- which uses an efficient push-based strategy built on Redis.
- ## Using Job Queues in a plugin
- If your plugin involves long-running tasks, you can also make use of the job queue.
- :::info
- A real example of this can be seen in the [EmailPlugin source](https://github.com/vendure-ecommerce/vendure/blob/master/packages/email-plugin/src/plugin.ts)
- :::
- Let's say you are building a plugin which allows a video URL to be specified, and then that video gets transcoded into a format suitable for streaming on the storefront. This is a long-running task which should not block the main thread, so we will use the job queue to run the task on the worker.
- First we'll add a new mutation to the Admin API schema:
- ```ts title="src/plugins/product-video/api/api-extensions.ts"
- import gql from 'graphql-tag';
- export const adminApiExtensions = gql`
- extend type Mutation {
- addVideoToProduct(productId: ID! videoUrl: String!): Job!
- }
- `;
- ```
- The resolver looks like this:
- ```ts title="src/plugins/product-video/api/product-video.resolver.ts"
- import { Args, Mutation, Resolver } from '@nestjs/graphql';
- import { Allow, Ctx, RequestContext, Permission, RequestContext } from '@vendure/core'
- import { ProductVideoService } from '../services/product-video.service';
- @Resolver()
- export class ProductVideoResolver {
- constructor(private productVideoService: ProductVideoService) {}
- @Mutation()
- @Allow(Permission.UpdateProduct)
- addVideoToProduct(@Ctx() ctx: RequestContext, @Args() args: { productId: ID; videoUrl: string; }) {
- return this.productVideoService.transcodeForProduct(
- args.productId,
- args.videoUrl,
- );
- }
- }
- ```
- The resolver just defines how to handle the new `addVideoToProduct` mutation, delegating the actual work to the `ProductVideoService`.
- ### Creating a job queue
- The [`JobQueueService`](/reference/typescript-api/job-queue/job-queue-service/) creates and manages job queues. The queue is created when the
- application starts up (see [NestJS lifecycle events](https://docs.nestjs.com/fundamentals/lifecycle-events)), and then we can use the `add()` method to add jobs to the queue.
- ```ts title="src/plugins/product-video/services/product-video.service.ts"
- import { Injectable, OnModuleInit } from '@nestjs/common';
- import { JobQueue, JobQueueService, ID, Product, TransactionalConnection } from '@vendure/core';
- import { transcode } from 'third-party-video-sdk';
- @Injectable()
- class ProductVideoService implements OnModuleInit {
- private jobQueue: JobQueue<{ productId: ID; videoUrl: string; }>;
- constructor(private jobQueueService: JobQueueService,
- private connection: TransactionalConnection) {
- }
- async onModuleInit() {
- this.jobQueue = await this.jobQueueService.createQueue({
- name: 'transcode-video',
- process: async job => {
- // Inside the `process` function we define how each job
- // in the queue will be processed.
- // In this case we call out to some imaginary 3rd-party video
- // transcoding API, which performs the work and then
- // returns a new URL of the transcoded video, which we can then
- // associate with the Product via the customFields.
- const result = await transcode(job.data.videoUrl);
- await this.connection.getRepository(Product).save({
- id: job.data.productId,
- customFields: {
- videoUrl: result.url,
- },
- });
- // The value returned from the `process` function is stored as the "result"
- // field of the job (for those JobQueueStrategies that support recording of results).
- //
- // Any error thrown from this function will cause the job to fail.
- return result;
- },
- });
- }
- transcodeForProduct(productId: ID, videoUrl: string) {
- // Add a new job to the queue and immediately return the
- // job itself.
- return this.jobQueue.add({productId, videoUrl}, {retries: 2});
- }
- }
- ```
- Notice the generic type parameter of the `JobQueue`:
- ```ts
- JobQueue<{ productId: ID; videoUrl: string; }>
- ```
- This means that when we call `jobQueue.add()` we must pass in an object of this type. This data will then be available in the `process` function as the `job.data` property.
- :::note
- The data passed to `jobQueue.add()` must be JSON-serializable, because it gets serialized into a string when stored in the job queue. Therefore you should
- avoid passing in complex objects such as `Date` instances, `Buffer`s, etc.
- :::
- The `ProductVideoService` is in charge of setting up the JobQueue and adding jobs to that queue. Calling
- ```ts
- productVideoService.transcodeForProduct(id, url);
- ```
- will add a transcoding job to the queue.
- :::tip
- Plugin code typically gets executed on both the server _and_ the worker. Therefore, you sometimes need to explicitly check
- what context you are in. This can be done with the [ProcessContext](/reference/typescript-api/common/process-context/) provider.
- :::
- Finally, the `ProductVideoPlugin` brings it all together, extending the GraphQL API, defining the required CustomField to store the transcoded video URL, and registering our service and resolver. The [PluginCommonModule](/reference/typescript-api/plugin/plugin-common-module/) is imported as it exports the `JobQueueService`.
- ```ts title="src/plugins/product-video/product-video.plugin.ts"
- import gql from 'graphql-tag';
- import { PluginCommonModule, VendurePlugin } from '@vendure/core';
- import { ProductVideoService } from './services/product-video.service';
- import { ProductVideoResolver } from './api/product-video.resolver';
- import { adminApiExtensions } from './api/api-extensions';
- @VendurePlugin({
- imports: [PluginCommonModule],
- providers: [ProductVideoService],
- adminApiExtensions: {
- schema: adminApiExtensions,
- resolvers: [ProductVideoResolver]
- },
- configuration: config => {
- config.customFields.Product.push({
- name: 'videoUrl',
- type: 'string',
- });
- return config;
- }
- })
- export class ProductVideoPlugin {}
- ```
- ### Subscribing to job updates
- When creating a new job via `JobQueue.add()`, it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.
- In the video transcoding example above, we could modify the `transcodeForProduct()` call to look like this:
- ```ts title="src/plugins/product-video/services/product-video.service.ts"
- import { Injectable, OnModuleInit } from '@nestjs/common';
- import { of } from 'rxjs';
- import { map, catchError } from 'rxjs/operators';
- import { ID, Product, TransactionalConnection } from '@vendure/core';
- @Injectable()
- class ProductVideoService implements OnModuleInit {
- // ... omitted (see above)
- transcodeForProduct(productId: ID, videoUrl: string) {
- const job = await this.jobQueue.add({productId, videoUrl}, {retries: 2});
- return job.updates().pipe(
- map(update => {
- // The returned Observable will emit a value for every update to the job
- // such as when the `progress` or `status` value changes.
- Logger.info(`Job ${update.id}: progress: ${update.progress}`);
- if (update.state === JobState.COMPLETED) {
- Logger.info(`COMPLETED ${update.id}: ${update.result}`);
- }
- return update.result;
- }),
- catchError(err => of(err.message)),
- );
- }
- }
- ```
- If you prefer to work with Promises rather than Rxjs Observables, you can also convert the updates to a promise:
- ```ts
- const job = await this.jobQueue.add({ productId, videoUrl }, { retries: 2 });
- return job.updates().toPromise()
- .then(/* ... */)
- .catch(/* ... */);
- ```
|