job-queue-test-plugin.ts 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import { Injectable, OnModuleInit } from '@nestjs/common';
  2. import { Args, Mutation, Resolver } from '@nestjs/graphql';
  3. import { JobState } from '@vendure/common/lib/generated-types';
  4. import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
  5. import { gql } from 'apollo-server-core';
  6. import { of } from 'rxjs';
  7. import { catchError, map, tap } from 'rxjs/operators';
  8. interface TaskConfigInput {
  9. intervalMs: number;
  10. shouldFail: boolean;
  11. retries: number;
  12. subscribeToResult: boolean;
  13. }
  14. @Injectable()
  15. export class JobQueueTestService implements OnModuleInit {
  16. private myQueue: JobQueue<{ intervalMs: number; shouldFail: boolean }>;
  17. constructor(private jobQueueService: JobQueueService) {}
  18. async onModuleInit() {
  19. this.myQueue = await this.jobQueueService.createQueue({
  20. name: 'my-queue',
  21. process: async job => {
  22. Logger.info(`Starting job ${job.id}, shouldFail: ${JSON.stringify(job.data.shouldFail)}`);
  23. let progress = 0;
  24. while (progress < 100) {
  25. // Logger.info(`Job ${job.id} progress: ${progress}`);
  26. await new Promise(resolve => setTimeout(resolve, job.data.intervalMs));
  27. progress += 10;
  28. job.setProgress(progress);
  29. if (progress > 70 && job.data.shouldFail) {
  30. Logger.warn(`Job ${job.id} will fail`);
  31. throw new Error(`Job failed!!`);
  32. }
  33. }
  34. Logger.info(`Completed job ${job.id}`);
  35. return 'Done!';
  36. },
  37. });
  38. }
  39. async startTask(input: TaskConfigInput) {
  40. const { intervalMs, shouldFail, subscribeToResult, retries } = input;
  41. const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries });
  42. if (subscribeToResult) {
  43. return job.updates().pipe(
  44. map(update => {
  45. Logger.info(`Job ${update.id}: progress: ${update.progress}`);
  46. if (update.state === JobState.COMPLETED) {
  47. Logger.info(`COMPLETED: ${JSON.stringify(update.result, null, 2)}`);
  48. return update.result;
  49. }
  50. return update.progress;
  51. }),
  52. catchError(err => of(err.message)),
  53. );
  54. } else {
  55. return 'running in background';
  56. }
  57. }
  58. }
  59. @Resolver()
  60. export class JobQueueTestResolver {
  61. constructor(private service: JobQueueTestService) {}
  62. @Mutation()
  63. startTask(@Args() args: any) {
  64. return this.service.startTask(args.input);
  65. }
  66. }
  67. /**
  68. * A plugin which can be used to test job queue strategies. Exposes a mutation `startTask` in
  69. * the Admin API which triggers a job.
  70. */
  71. @VendurePlugin({
  72. imports: [PluginCommonModule],
  73. adminApiExtensions: {
  74. resolvers: [JobQueueTestResolver],
  75. schema: gql`
  76. input TaskConfigInput {
  77. intervalMs: Int!
  78. shouldFail: Boolean!
  79. retries: Int
  80. subscribeToResult: Boolean
  81. }
  82. extend type Mutation {
  83. startTask(input: TaskConfigInput!): JSON!
  84. }
  85. `,
  86. },
  87. providers: [JobQueueTestService],
  88. })
  89. export class JobQueueTestPlugin {}