1
0

job-queue-test-plugin.ts 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import { Injectable, OnModuleInit } from '@nestjs/common';
  2. import { Args, Mutation, Resolver } from '@nestjs/graphql';
  3. import { JobState } from '@vendure/common/lib/generated-types';
  4. import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
  5. import { gql } from 'graphql-tag';
  6. import { forkJoin, Observable, of } from 'rxjs';
  7. import { catchError, map } from 'rxjs/operators';
  8. interface TaskConfigInput {
  9. intervalMs: number;
  10. shouldFail: boolean;
  11. retries: number;
  12. subscribeToResult: boolean;
  13. }
  14. let queueCount = 1;
  15. @Injectable()
  16. export class JobQueueTestService implements OnModuleInit {
  17. private queues: Array<JobQueue<{ intervalMs: number; shouldFail: boolean }>> = [];
  18. constructor(private jobQueueService: JobQueueService) {}
  19. async onModuleInit() {
  20. for (let i = 0; i < queueCount; i++) {
  21. const queue: JobQueue<{
  22. intervalMs: number;
  23. shouldFail: boolean;
  24. }> = await this.jobQueueService.createQueue({
  25. name: `test-queue-${i + 1}`,
  26. process: async job => {
  27. Logger.info(`Starting job ${job.id}, shouldFail: ${JSON.stringify(job.data.shouldFail)}`);
  28. let progress = 0;
  29. while (progress < 100) {
  30. // Logger.info(`Job ${job.id} progress: ${progress}`);
  31. await new Promise(resolve => setTimeout(resolve, job.data.intervalMs));
  32. progress += 10;
  33. job.setProgress(progress);
  34. if (progress > 70 && job.data.shouldFail) {
  35. Logger.warn(`Job ${job.id} will fail`);
  36. throw new Error(`Job failed!!`);
  37. }
  38. }
  39. Logger.info(`Completed job ${job.id}`);
  40. return 'Done!';
  41. },
  42. });
  43. this.queues.push(queue);
  44. }
  45. }
  46. async startTask(input: TaskConfigInput) {
  47. const { intervalMs, shouldFail, subscribeToResult, retries } = input;
  48. const updates: Array<Observable<number>> = [];
  49. for (const queue of this.queues) {
  50. const job = await queue.add({ intervalMs, shouldFail }, { retries });
  51. if (subscribeToResult) {
  52. updates.push(
  53. job.updates().pipe(
  54. map(update => {
  55. Logger.info(`Job ${update.id}: progress: ${update.progress}`);
  56. if (update.state === JobState.COMPLETED) {
  57. Logger.info(`COMPLETED: ${JSON.stringify(update.result, null, 2)}`);
  58. return update.result;
  59. }
  60. return update.progress;
  61. }),
  62. catchError(err => of(err.message)),
  63. ),
  64. );
  65. }
  66. }
  67. if (subscribeToResult) {
  68. return forkJoin(...updates);
  69. } else {
  70. return 'running in background';
  71. }
  72. }
  73. }
  74. @Resolver()
  75. export class JobQueueTestResolver {
  76. constructor(private service: JobQueueTestService) {}
  77. @Mutation()
  78. startTask(@Args() args: any) {
  79. return this.service.startTask(args.input);
  80. }
  81. }
  82. /**
  83. * A plugin which can be used to test job queue strategies. Exposes a mutation `startTask` in
  84. * the Admin API which triggers a job.
  85. */
  86. @VendurePlugin({
  87. imports: [PluginCommonModule],
  88. adminApiExtensions: {
  89. resolvers: [JobQueueTestResolver],
  90. schema: gql`
  91. input TaskConfigInput {
  92. intervalMs: Int!
  93. shouldFail: Boolean!
  94. retries: Int
  95. subscribeToResult: Boolean
  96. }
  97. extend type Mutation {
  98. startTask(input: TaskConfigInput!): JSON!
  99. }
  100. `,
  101. },
  102. providers: [JobQueueTestService],
  103. })
  104. export class JobQueueTestPlugin {
  105. static init(options: { queueCount: number }) {
  106. queueCount = options.queueCount;
  107. return this;
  108. }
  109. }