with-job-queue.ts 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import { Controller, Get, OnModuleInit } from '@nestjs/common';
  2. import { JobQueue, JobQueueService, PluginCommonModule, VendurePlugin } from '@vendure/core';
  3. import { Subject } from 'rxjs';
  4. import { take } from 'rxjs/operators';
  5. @Controller('run-job')
  6. class TestController implements OnModuleInit {
  7. private queue: JobQueue<{ returnValue?: string }>;
  8. constructor(private jobQueueService: JobQueueService) {}
  9. async onModuleInit(): Promise<void> {
  10. this.queue = await this.jobQueueService.createQueue({
  11. name: 'test',
  12. process: async job => {
  13. if (job.data.returnValue) {
  14. await new Promise(resolve => setTimeout(resolve, 50));
  15. return job.data.returnValue;
  16. } else {
  17. return PluginWithJobQueue.jobSubject
  18. .pipe(take(1))
  19. .toPromise()
  20. .then(() => {
  21. PluginWithJobQueue.jobHasDoneWork = true;
  22. return job.data.returnValue;
  23. });
  24. }
  25. },
  26. });
  27. }
  28. @Get()
  29. async runJob() {
  30. await this.queue.add({});
  31. return true;
  32. }
  33. @Get('subscribe')
  34. async runJobAndSubscribe() {
  35. const job = await this.queue.add({ returnValue: '42!' });
  36. return job
  37. .updates()
  38. .toPromise()
  39. .then(update => update?.result);
  40. }
  41. }
  42. @VendurePlugin({
  43. imports: [PluginCommonModule],
  44. controllers: [TestController],
  45. })
  46. export class PluginWithJobQueue {
  47. static jobHasDoneWork = false;
  48. static jobSubject = new Subject<void>();
  49. }