worker.service.ts 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
  2. import { ClientProxy } from '@nestjs/microservices';
  3. import { BehaviorSubject, Observable } from 'rxjs';
  4. import { filter, finalize, mergeMap, take } from 'rxjs/operators';
  5. import { VENDURE_WORKER_CLIENT } from './constants';
  6. import { WorkerMessage } from './types';
  7. /**
  8. * @description
  9. * This service is responsible for sending messages to the Worker process. See the {@link WorkerMessage}
  10. * docs for an example of usage.
  11. *
  12. * @docsCategory worker
  13. */
  14. @Injectable()
  15. export class WorkerService implements OnModuleDestroy {
  16. private pendingConnection = false;
  17. private initialConnection = new BehaviorSubject(false);
  18. constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy) {}
  19. /**
  20. * @description
  21. * Sends a {@link WorkerMessage} to the worker process, where there should be a Controller with a method
  22. * listening out for the message's pattern.
  23. */
  24. send<T, R>(message: WorkerMessage<T, R>): Observable<R> {
  25. // The rather convoluted logic here is required in order to prevent more than
  26. // one connection being opened in the event that the `send` method is called multiple
  27. // times in the same event loop tick.
  28. // On the first invokation, the first path is taken, which establishes the single
  29. // connection (implicit in the first call to ClientProxt.send()). All subsequent
  30. // invokations take the second code path.
  31. if (!this.pendingConnection && this.initialConnection.value === false) {
  32. this.pendingConnection = true;
  33. return this.client
  34. .send<R, T>((message.constructor as typeof WorkerMessage).pattern, message.data)
  35. .pipe(
  36. finalize(() => {
  37. this.initialConnection.next(true);
  38. this.pendingConnection = false;
  39. }),
  40. );
  41. } else {
  42. return this.initialConnection.pipe(
  43. filter(val => val),
  44. take(1),
  45. mergeMap(() => {
  46. return this.client.send<R, T>(
  47. (message.constructor as typeof WorkerMessage).pattern,
  48. message.data,
  49. );
  50. }),
  51. );
  52. }
  53. }
  54. /** @internal */
  55. onModuleDestroy() {
  56. this.client.close();
  57. }
  58. }