index.mdx 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. ---
  2. title: 'Worker & Job Queue'
  3. sidebar_position: 5
  4. ---
  5. The Vendure Worker is a Node.js process responsible for running computationally intensive
  6. or otherwise long-running tasks in the background. For example, updating a
  7. search index or sending emails. Running such tasks in the background allows
  8. the server to stay responsive, since a response can be returned immediately
  9. without waiting for the slower tasks to complete.
  10. Put another way, the Worker executes **jobs** which have been placed in the **job queue**.
  11. ![Worker & Job Queue](./worker-job-queue.webp)
  12. ## The worker
  13. The worker is started by calling the [`bootstrapWorker()`](/reference/typescript-api/worker/bootstrap-worker/) function with the same
  14. configuration as is passed to the main server `bootstrap()`. In a standard Vendure installation, this is found
  15. in the `index-worker.ts` file:
  16. ```ts title="src/index-worker.ts"
  17. import { bootstrapWorker } from '@vendure/core';
  18. import { config } from './vendure-config';
  19. bootstrapWorker(config)
  20. .then(worker => worker.startJobQueue())
  21. .catch(err => {
  22. console.log(err);
  23. });
  24. ```
  25. ### Underlying architecture
  26. The Worker is a NestJS standalone application. This means it is almost identical to the main server app,
  27. but does not have any network layer listening for requests. The server communicates with the worker
  28. via a “job queue” architecture. The exact implementation of the job queue is dependent on the
  29. configured [`JobQueueStrategy`](/reference/typescript-api/job-queue/job-queue-strategy/), but by default
  30. the worker polls the database for new jobs.
  31. ### Multiple workers
  32. It is possible to run multiple workers in parallel to better handle heavy loads. Using the
  33. [`JobQueueOptions.activeQueues`](/reference/typescript-api/job-queue/job-queue-options#activequeues) configuration, it is even possible to have particular workers dedicated
  34. to one or more specific types of jobs. For example, if your application does video transcoding,
  35. you might want to set up a dedicated worker just for that task:
  36. ```ts title="src/transcoder-worker.ts"
  37. import { bootstrapWorker, mergeConfig } from '@vendure/core';
  38. import { config } from './vendure-config';
  39. const transcoderConfig = mergeConfig(config, {
  40. jobQueueOptions: {
  41. activeQueues: ['transcode-video'],
  42. }
  43. });
  44. bootstrapWorker(transcoderConfig)
  45. .then(worker => worker.startJobQueue())
  46. .catch(err => {
  47. console.log(err);
  48. });
  49. ```
  50. ### Running jobs on the main process
  51. It is possible to run jobs from the Job Queue on the main server. This is mainly used for testing
  52. and automated tasks, and is not advised for production use, since it negates the benefits of
  53. running long tasks off of the main process. To do so, you need to manually start the JobQueueService:
  54. ```ts title="src/index.ts"
  55. import { bootstrap, JobQueueService } from '@vendure/core';
  56. import { config } from './vendure-config';
  57. bootstrap(config)
  58. .then(app => app.get(JobQueueService).start())
  59. .catch(err => {
  60. console.log(err);
  61. process.exit(1);
  62. });
  63. ```
  64. ### ProcessContext
  65. Sometimes your code may need to be aware of whether it is being run as part of a server or worker process.
  66. In this case you can inject the [`ProcessContext`](/reference/typescript-api/common/process-context/) provider and query it like this:
  67. ```ts title="src/plugins/my-plugin/services/my.service.ts"
  68. import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
  69. import { ProcessContext } from '@vendure/core';
  70. @Injectable()
  71. export class MyService implements OnApplicationBootstrap {
  72. constructor(private processContext: ProcessContext) {}
  73. onApplicationBootstrap() {
  74. if (this.processContext.isServer) {
  75. // code which will only execute when running in
  76. // the server process
  77. }
  78. }
  79. }
  80. ```
  81. ## The job queue
  82. Vendure uses a [job queue](https://en.wikipedia.org/wiki/Job_queue) to handle the processing of certain tasks which are typically too slow to run in the
  83. normal request-response cycle. A normal request-response looks like this:
  84. ![Regular request response](./Vendure_docs-job-queue.webp)
  85. In the normal request-response, all intermediate tasks (looking up data in the database, performing business logic etc.)
  86. occur before the response can be returned. For most operations this is fine, since those intermediate tasks are very fast.
  87. Some operations however will need to perform much longer-running tasks. For example, updating the search index on
  88. thousands of products could take up to a minute or more. In this case, we certainly don’t want to delay the response
  89. until that processing has completed. That’s where a job queue comes in:
  90. ![Request response with job queue](./Vendure_docs-job-queue-2.webp)
  91. ### What does Vendure use the job queue for?
  92. By default, Vendure uses the job queue for the following tasks:
  93. - Re-building the search index
  94. - Updating the search index when changes are made to Products, ProductVariants, Assets etc.
  95. - Updating the contents of Collections
  96. - Sending transactional emails
  97. ### How does the Job Queue work?
  98. This diagram illustrates the job queue mechanism:
  99. ![Job queue sequence](./Vendure_docs-job-queue-3.webp)
  100. The server adds jobs to the queue. The worker then picks up these jobs from the queue and processes them in sequence,
  101. one by one (it is possible to increase job queue throughput by running multiple workers or by increasing the concurrency
  102. of a single worker).
  103. ### JobQueueStrategy
  104. The actual queue part is defined by the configured [`JobQueueStrategy`](/reference/typescript-api/job-queue/job-queue-strategy/).
  105. If no strategy is defined, Vendure uses an [in-memory store](/reference/typescript-api/job-queue/in-memory-job-queue-strategy/)
  106. of the contents of each queue. While this has the advantage
  107. of requiring no external dependencies, it is not suitable for production because when the server is stopped, the entire
  108. queue will be lost and any pending jobs will never be processed. Moreover, it cannot be used when running the worker
  109. as a separate process.
  110. A better alternative is to use the [DefaultJobQueuePlugin](/reference/typescript-api/job-queue/default-job-queue-plugin/)
  111. (which will be used in a standard `@vendure/create` installation), which configures Vendure to use the [SqlJobQueueStrategy](/reference/typescript-api/job-queue/sql-job-queue-strategy).
  112. This strategy uses the database as a queue, and means that even if the Vendure server stops, pending jobs will be persisted and upon re-start, they will be processed.
  113. It is also possible to implement your own JobQueueStrategy to take advantage of other technologies.
  114. Examples include RabbitMQ, Google Cloud Pub Sub & Amazon SQS. It may make sense to implement a custom strategy based on
  115. one of these if the default database-based approach does not meet your performance requirements.
  116. ### Job Queue Performance
  117. It is common for larger Vendure projects to define multiple custom job queues, When using the [DefaultJobQueuePlugin](/reference/typescript-api/job-queue/default-job-queue-plugin/)
  118. with many queues, performance may be impacted. This is because the `SqlJobQueueStrategy` uses polling to check for
  119. new jobs in the database. Each queue will (by default) query the database every 200ms. So if there are 10 queues,
  120. this will result in a constant 50 queries/second.
  121. In this case it is recommended to try the [BullMQJobQueuePlugin](/reference/core-plugins/job-queue-plugin/bull-mqjob-queue-plugin/),
  122. which uses an efficient push-based strategy built on Redis.
  123. ## Using Job Queues in a plugin
  124. If your plugin involves long-running tasks, you can also make use of the job queue.
  125. :::info
  126. A real example of this can be seen in the [EmailPlugin source](https://github.com/vendure-ecommerce/vendure/blob/master/packages/email-plugin/src/plugin.ts)
  127. :::
  128. Let's say you are building a plugin which allows a video URL to be specified, and then that video gets transcoded into a format suitable for streaming on the storefront. This is a long-running task which should not block the main thread, so we will use the job queue to run the task on the worker.
  129. First we'll add a new mutation to the Admin API schema:
  130. ```ts title="src/plugins/product-video/api/api-extensions.ts"
  131. import gql from 'graphql-tag';
  132. export const adminApiExtensions = gql`
  133. extend type Mutation {
  134. addVideoToProduct(productId: ID! videoUrl: String!): Job!
  135. }
  136. `;
  137. ```
  138. The resolver looks like this:
  139. ```ts title="src/plugins/product-video/api/product-video.resolver.ts"
  140. import { Args, Mutation, Resolver } from '@nestjs/graphql';
  141. import { Allow, Ctx, RequestContext, Permission, RequestContext } from '@vendure/core'
  142. import { ProductVideoService } from '../services/product-video.service';
  143. @Resolver()
  144. export class ProductVideoResolver {
  145. constructor(private productVideoService: ProductVideoService) {}
  146. @Mutation()
  147. @Allow(Permission.UpdateProduct)
  148. addVideoToProduct(@Ctx() ctx: RequestContext, @Args() args: { productId: ID; videoUrl: string; }) {
  149. return this.productVideoService.transcodeForProduct(
  150. args.productId,
  151. args.videoUrl,
  152. );
  153. }
  154. }
  155. ```
  156. The resolver just defines how to handle the new `addVideoToProduct` mutation, delegating the actual work to the `ProductVideoService`.
  157. ### Creating a job queue
  158. The [`JobQueueService`](/reference/typescript-api/job-queue/job-queue-service/) creates and manages job queues. The queue is created when the
  159. application starts up (see [NestJS lifecycle events](https://docs.nestjs.com/fundamentals/lifecycle-events)), and then we can use the `add()` method to add jobs to the queue.
  160. ```ts title="src/plugins/product-video/services/product-video.service.ts"
  161. import { Injectable, OnModuleInit } from '@nestjs/common';
  162. import { JobQueue, JobQueueService, ID, Product, TransactionalConnection } from '@vendure/core';
  163. import { transcode } from 'third-party-video-sdk';
  164. @Injectable()
  165. class ProductVideoService implements OnModuleInit {
  166. private jobQueue: JobQueue<{ productId: ID; videoUrl: string; }>;
  167. constructor(private jobQueueService: JobQueueService,
  168. private connection: TransactionalConnection) {
  169. }
  170. async onModuleInit() {
  171. this.jobQueue = await this.jobQueueService.createQueue({
  172. name: 'transcode-video',
  173. process: async job => {
  174. // Inside the `process` function we define how each job
  175. // in the queue will be processed.
  176. // In this case we call out to some imaginary 3rd-party video
  177. // transcoding API, which performs the work and then
  178. // returns a new URL of the transcoded video, which we can then
  179. // associate with the Product via the customFields.
  180. const result = await transcode(job.data.videoUrl);
  181. await this.connection.getRepository(Product).save({
  182. id: job.data.productId,
  183. customFields: {
  184. videoUrl: result.url,
  185. },
  186. });
  187. // The value returned from the `process` function is stored as the "result"
  188. // field of the job (for those JobQueueStrategies that support recording of results).
  189. //
  190. // Any error thrown from this function will cause the job to fail.
  191. return result;
  192. },
  193. });
  194. }
  195. transcodeForProduct(productId: ID, videoUrl: string) {
  196. // Add a new job to the queue and immediately return the
  197. // job itself.
  198. return this.jobQueue.add({productId, videoUrl}, {retries: 2});
  199. }
  200. }
  201. ```
  202. Notice the generic type parameter of the `JobQueue`:
  203. ```ts
  204. JobQueue<{ productId: ID; videoUrl: string; }>
  205. ```
  206. This means that when we call `jobQueue.add()` we must pass in an object of this type. This data will then be available in the `process` function as the `job.data` property.
  207. :::note
  208. The data passed to `jobQueue.add()` must be JSON-serializable, because it gets serialized into a string when stored in the job queue. Therefore you should
  209. avoid passing in complex objects such as `Date` instances, `Buffer`s, etc.
  210. :::
  211. The `ProductVideoService` is in charge of setting up the JobQueue and adding jobs to that queue. Calling
  212. ```ts
  213. productVideoService.transcodeForProduct(id, url);
  214. ```
  215. will add a transcoding job to the queue.
  216. :::tip
  217. Plugin code typically gets executed on both the server _and_ the worker. Therefore, you sometimes need to explicitly check
  218. what context you are in. This can be done with the [ProcessContext](/reference/typescript-api/common/process-context/) provider.
  219. :::
  220. Finally, the `ProductVideoPlugin` brings it all together, extending the GraphQL API, defining the required CustomField to store the transcoded video URL, and registering our service and resolver. The [PluginCommonModule](/reference/typescript-api/plugin/plugin-common-module/) is imported as it exports the `JobQueueService`.
  221. ```ts title="src/plugins/product-video/product-video.plugin.ts"
  222. import gql from 'graphql-tag';
  223. import { PluginCommonModule, VendurePlugin } from '@vendure/core';
  224. import { ProductVideoService } from './services/product-video.service';
  225. import { ProductVideoResolver } from './api/product-video.resolver';
  226. import { adminApiExtensions } from './api/api-extensions';
  227. @VendurePlugin({
  228. imports: [PluginCommonModule],
  229. providers: [ProductVideoService],
  230. adminApiExtensions: {
  231. schema: adminApiExtensions,
  232. resolvers: [ProductVideoResolver]
  233. },
  234. configuration: config => {
  235. config.customFields.Product.push({
  236. name: 'videoUrl',
  237. type: 'string',
  238. });
  239. return config;
  240. }
  241. })
  242. export class ProductVideoPlugin {}
  243. ```
  244. ### Subscribing to job updates
  245. When creating a new job via `JobQueue.add()`, it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.
  246. In the video transcoding example above, we could modify the `transcodeForProduct()` call to look like this:
  247. ```ts title="src/plugins/product-video/services/product-video.service.ts"
  248. import { Injectable, OnModuleInit } from '@nestjs/common';
  249. import { of } from 'rxjs';
  250. import { map, catchError } from 'rxjs/operators';
  251. import { ID, Product, TransactionalConnection } from '@vendure/core';
  252. @Injectable()
  253. class ProductVideoService implements OnModuleInit {
  254. // ... omitted (see above)
  255. transcodeForProduct(productId: ID, videoUrl: string) {
  256. const job = await this.jobQueue.add({productId, videoUrl}, {retries: 2});
  257. return job.updates().pipe(
  258. map(update => {
  259. // The returned Observable will emit a value for every update to the job
  260. // such as when the `progress` or `status` value changes.
  261. Logger.info(`Job ${update.id}: progress: ${update.progress}`);
  262. if (update.state === JobState.COMPLETED) {
  263. Logger.info(`COMPLETED ${update.id}: ${update.result}`);
  264. }
  265. return update.result;
  266. }),
  267. catchError(err => of(err.message)),
  268. );
  269. }
  270. }
  271. ```
  272. If you prefer to work with Promises rather than Rxjs Observables, you can also convert the updates to a promise:
  273. ```ts
  274. const job = await this.jobQueue.add({ productId, videoUrl }, { retries: 2 });
  275. return job.updates().toPromise()
  276. .then(/* ... */)
  277. .catch(/* ... */);
  278. ```