with-job-queue.ts 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import { Controller, Get, OnModuleInit } from '@nestjs/common';
  2. import { JobQueue, JobQueueService, Logger, PluginCommonModule, VendurePlugin } from '@vendure/core';
  3. import { Subject } from 'rxjs';
  4. import { take } from 'rxjs/operators';
  5. @Controller('run-job')
  6. class TestController implements OnModuleInit {
  7. private queue: JobQueue<{ returnValue?: string }>;
  8. constructor(private jobQueueService: JobQueueService) {}
  9. async onModuleInit(): Promise<void> {
  10. this.queue = await this.jobQueueService.createQueue({
  11. name: 'test',
  12. process: async job => {
  13. if (job.data.returnValue) {
  14. await new Promise(resolve => setTimeout(resolve, 50));
  15. return job.data.returnValue;
  16. } else {
  17. const interval = setInterval(() => {
  18. Logger.info(`Job is running...`);
  19. if (job.state === 'CANCELLED') {
  20. clearInterval(interval);
  21. PluginWithJobQueue.jobSubject.next();
  22. }
  23. }, 500);
  24. return PluginWithJobQueue.jobSubject
  25. .pipe(take(1))
  26. .toPromise()
  27. .then(() => {
  28. PluginWithJobQueue.jobHasDoneWork = true;
  29. clearInterval(interval);
  30. return 'job result';
  31. });
  32. }
  33. },
  34. });
  35. }
  36. @Get()
  37. async runJob() {
  38. await this.queue.add({});
  39. return true;
  40. }
  41. @Get('subscribe')
  42. async runJobAndSubscribe() {
  43. const job = await this.queue.add({ returnValue: '42!' });
  44. return job
  45. .updates()
  46. .toPromise()
  47. .then(update => update?.result);
  48. }
  49. @Get('subscribe-timeout')
  50. async runJobAndSubscribeTimeout() {
  51. const job = await this.queue.add({});
  52. const result = await job
  53. .updates({ timeoutMs: 100 })
  54. .toPromise()
  55. .then(update => update?.result);
  56. return result;
  57. }
  58. }
  59. @VendurePlugin({
  60. imports: [PluginCommonModule],
  61. controllers: [TestController],
  62. })
  63. export class PluginWithJobQueue {
  64. static jobHasDoneWork = false;
  65. static jobSubject = new Subject<void>();
  66. }