job-queue-test-plugin.ts 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import { Injectable, OnModuleInit } from '@nestjs/common';
  2. import { Args, Mutation, Resolver } from '@nestjs/graphql';
  3. import { JobState } from '@vendure/common/lib/generated-types';
  4. import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
  5. import { gql } from 'apollo-server-core';
  6. import { of } from 'rxjs';
  7. import { catchError, map, tap } from 'rxjs/operators';
  /**
   * Service behind the job queue test plugin. When the module initialises it registers a
   * queue named `my-queue`; `startTask` then adds jobs to that queue.
   */
  @Injectable()
  export class JobQueueTestService implements OnModuleInit {
    private myQueue: JobQueue<{ intervalMs: number; shouldFail: boolean }>;

    constructor(private jobQueueService: JobQueueService) {}

    /**
     * Creates the `my-queue` queue and keeps a reference to it.
     *
     * The queue's process function waits `intervalMs` milliseconds, raises the job's
     * progress by 10 and repeats until progress reaches 100, then returns `'Done!'`.
     * If the job data has `shouldFail` set, it throws once progress exceeds 70.
     */
    async onModuleInit() {
      this.myQueue = await this.jobQueueService.createQueue({
        name: 'my-queue',
        process: async job => {
          const { intervalMs, shouldFail } = job.data;
          Logger.info(`Starting job ${job.id}, shouldFail: ${JSON.stringify(shouldFail)}`);
          // Progress is reported in ten steps: 10, 20, ..., 100.
          for (let percent = 10; percent <= 100; percent += 10) {
            await new Promise(resolve => setTimeout(resolve, intervalMs));
            job.setProgress(percent);
            if (shouldFail && percent > 70) {
              Logger.warn(`Job ${job.id} will fail`);
              throw new Error(`Job failed!!`);
            }
          }
          Logger.info(`Completed job ${job.id}`);
          return 'Done!';
        },
      });
    }

    /**
     * Adds a job to the queue with `retries: 0`.
     *
     * @param intervalMs Delay, in milliseconds, between progress steps of the job.
     * @param shouldFail Whether the job should throw once its progress exceeds 70.
     * @param subscribeToResult Whether to return the job's update stream instead of
     * a fixed acknowledgement string.
     * @returns `'running in background'` when `subscribeToResult` is false. Otherwise the
     * job's updates piped so that each update yields its progress, or its result when the
     * update's state is `JobState.COMPLETED`; an error on the stream is replaced by its message.
     */
    async startTask(intervalMs: number, shouldFail: boolean, subscribeToResult: boolean) {
      const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 0 });
      if (!subscribeToResult) {
        return 'running in background';
      }
      return job.updates().pipe(
        map(update => {
          Logger.info(`Job ${update.id}: progress: ${update.progress}`);
          if (update.state !== JobState.COMPLETED) {
            return update.progress;
          }
          Logger.info(`COMPLETED: ${JSON.stringify(update.result, null, 2)}`);
          return update.result;
        }),
        catchError(err => of(err.message)),
      );
    }
  }
  /**
   * GraphQL resolver exposing the `startTask` mutation, which delegates to
   * `JobQueueTestService.startTask`.
   */
  @Resolver()
  export class JobQueueTestResolver {
    constructor(private service: JobQueueTestService) {}

    /**
     * Forwards the mutation arguments `intervalMs`, `shouldFail` and `subscribeToResult`
     * to the service and returns whatever the service returns.
     */
    @Mutation()
    startTask(@Args() args: any) {
      const { intervalMs, shouldFail, subscribeToResult } = args;
      return this.service.startTask(intervalMs, shouldFail, subscribeToResult);
    }
  }
  /**
   * Test plugin for exercising job queue strategies. It registers `JobQueueTestService`
   * as a provider and extends the Admin API with a `startTask` mutation that triggers a job.
   */
  @VendurePlugin({
    imports: [PluginCommonModule],
    providers: [JobQueueTestService],
    adminApiExtensions: {
      schema: gql`
        extend type Mutation {
          startTask(intervalMs: Int, shouldFail: Boolean!, subscribeToResult: Boolean!): JSON!
        }
      `,
      resolvers: [JobQueueTestResolver],
    },
  })
  export class JobQueueTestPlugin {}