with-job-queue.ts 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import { Controller, Get, OnModuleInit } from '@nestjs/common';
  2. import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
  3. import { Subject } from 'rxjs';
  4. import { take } from 'rxjs/operators';
  5. @Controller('run-job')
  6. class TestController implements OnModuleInit {
  7. private queue: JobQueue<{ returnValue?: string }>;
  8. private progressQueue: JobQueue<{ duration: number }>;
  9. constructor(private jobQueueService: JobQueueService) {}
  10. async onModuleInit(): Promise<void> {
  11. this.queue = await this.jobQueueService.createQueue({
  12. name: 'test',
  13. process: async job => {
  14. if (job.data.returnValue) {
  15. await new Promise(resolve => setTimeout(resolve, 50));
  16. return job.data.returnValue;
  17. } else {
  18. const interval = setInterval(() => {
  19. Logger.info(`Job is running...`);
  20. if (job.state === 'CANCELLED') {
  21. clearInterval(interval);
  22. PluginWithJobQueue.jobSubject.next();
  23. }
  24. }, 500);
  25. return PluginWithJobQueue.jobSubject
  26. .pipe(take(1))
  27. .toPromise()
  28. .then(() => {
  29. PluginWithJobQueue.jobHasDoneWork = true;
  30. clearInterval(interval);
  31. return 'job result';
  32. });
  33. }
  34. },
  35. });
  36. // Queue for testing that updates() emits multiple times until job completion
  37. this.progressQueue = await this.jobQueueService.createQueue({
  38. name: 'test-progress',
  39. process: async job => {
  40. const duration = job.data.duration;
  41. const steps = 4;
  42. const stepDuration = duration / steps;
  43. for (let i = 1; i <= steps; i++) {
  44. await new Promise(resolve => setTimeout(resolve, stepDuration));
  45. job.setProgress(i * 25);
  46. }
  47. return 'completed';
  48. },
  49. });
  50. }
  51. @Get()
  52. async runJob() {
  53. await this.queue.add({});
  54. return true;
  55. }
  56. @Get('subscribe')
  57. async runJobAndSubscribe() {
  58. const job = await this.queue.add({ returnValue: '42!' });
  59. return job
  60. .updates()
  61. .toPromise()
  62. .then(update => update?.result);
  63. }
  64. @Get('subscribe-timeout')
  65. async runJobAndSubscribeTimeout() {
  66. const job = await this.queue.add({});
  67. const result = await job
  68. .updates({ timeoutMs: 100 })
  69. .toPromise()
  70. .then(update => update?.result);
  71. return result;
  72. }
  73. /**
  74. * This endpoint tests that job.updates() emits multiple times as the job progresses,
  75. * and continues until the job reaches a terminal state (COMPLETED).
  76. * See https://github.com/vendure-ecommerce/vendure/issues/4112
  77. */
  78. @Get('subscribe-all-updates')
  79. async runJobAndSubscribeAllUpdates() {
  80. const job = await this.progressQueue.add({ duration: 500 });
  81. const allUpdates: Array<{ state: string; progress: number; result: any }> = [];
  82. return new Promise(resolve => {
  83. job.updates({ pollInterval: 50, timeoutMs: 10000 }).subscribe({
  84. next: update => {
  85. allUpdates.push({
  86. state: update.state as string,
  87. progress: update.progress,
  88. result: update.result,
  89. });
  90. },
  91. error: err => {
  92. resolve(
  93. JSON.stringify({
  94. updateCount: allUpdates.length,
  95. states: allUpdates.map(u => u.state),
  96. finalState: allUpdates[allUpdates.length - 1]?.state,
  97. finalResult: allUpdates[allUpdates.length - 1]?.result,
  98. error: err.message,
  99. }),
  100. );
  101. },
  102. complete: () => {
  103. resolve(
  104. JSON.stringify({
  105. updateCount: allUpdates.length,
  106. states: allUpdates.map(u => u.state),
  107. finalState: allUpdates[allUpdates.length - 1]?.state,
  108. finalResult: allUpdates[allUpdates.length - 1]?.result,
  109. }),
  110. );
  111. },
  112. });
  113. });
  114. }
  115. }
  116. @VendurePlugin({
  117. imports: [PluginCommonModule],
  118. controllers: [TestController],
  119. })
  120. export class PluginWithJobQueue {
  121. static jobHasDoneWork = false;
  122. static jobSubject = new Subject<void>();
  123. }