pub-sub-job-queue-strategy.ts 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import { Message, PubSub, Subscription, Topic } from '@google-cloud/pubsub';
  2. import { JobState } from '@vendure/common/lib/generated-types';
  3. import {
  4. InjectableJobQueueStrategy,
  5. Injector,
  6. Job,
  7. JobData,
  8. JobQueueStrategy,
  9. Logger,
  10. QueueNameProcessStorage,
  11. } from '@vendure/core';
  12. import { loggerCtx, PUB_SUB_OPTIONS } from './constants';
  13. import { PubSubOptions } from './options';
  14. export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implements JobQueueStrategy {
  15. private concurrency: number;
  16. private queueNamePubSubPair: Map<string, [string, string]>;
  17. private pubSubClient: PubSub;
  18. private topics = new Map<string, Topic>();
  19. private subscriptions = new Map<string, Subscription>();
  20. private listeners = new QueueNameProcessStorage<(message: Message) => void>();
  21. init(injector: Injector) {
  22. this.pubSubClient = injector.get(PubSub);
  23. const options = injector.get<PubSubOptions>(PUB_SUB_OPTIONS);
  24. this.concurrency = options.concurrency ?? 20;
  25. this.queueNamePubSubPair = options.queueNamePubSubPair ?? new Map();
  26. super.init(injector);
  27. }
  28. destroy() {
  29. super.destroy();
  30. for (const subscription of this.subscriptions.values()) {
  31. subscription.removeAllListeners('message');
  32. }
  33. this.subscriptions.clear();
  34. this.topics.clear();
  35. }
  36. async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
  37. if (!this.hasInitialized) {
  38. throw new Error('Cannot add job before init');
  39. }
  40. const id = await this.topic(job.queueName).publish(Buffer.from(JSON.stringify(job.data)));
  41. Logger.debug(`Sent message ${job.queueName}: ${id}`);
  42. return new Job<Data>({
  43. id,
  44. queueName: job.queueName,
  45. data: job.data,
  46. attempts: 0,
  47. state: JobState.PENDING,
  48. createdAt: new Date(),
  49. });
  50. }
  51. async start<Data extends JobData<Data> = object>(
  52. queueName: string,
  53. process: (job: Job<Data>) => Promise<any>,
  54. ) {
  55. if (!this.hasInitialized) {
  56. this.started.set(queueName, process);
  57. return;
  58. }
  59. if (this.listeners.has(queueName, process)) {
  60. return;
  61. }
  62. const subscription = this.subscription(queueName);
  63. const processMessage = async (message: Message) => {
  64. Logger.verbose(`Received message: ${queueName}: ${message.id}`, loggerCtx);
  65. const job = new Job<Data>({
  66. id: message.id,
  67. queueName,
  68. data: JSON.parse(message.data.toString()),
  69. attempts: message.deliveryAttempt,
  70. state: JobState.RUNNING,
  71. startedAt: new Date(),
  72. createdAt: message.publishTime,
  73. });
  74. await process(job);
  75. };
  76. const listener = (message: Message) => {
  77. processMessage(message)
  78. .then(() => {
  79. message.ack();
  80. Logger.verbose(`Finished handling: ${queueName}: ${message.id}`, loggerCtx);
  81. })
  82. .catch(err => {
  83. message.nack();
  84. Logger.error(
  85. `Error handling: ${queueName}: ${message.id}: ${String(err.message)}`,
  86. loggerCtx,
  87. );
  88. });
  89. };
  90. this.listeners.set(queueName, process, listener);
  91. subscription.on('message', listener);
  92. }
  93. async stop<Data extends JobData<Data> = object>(
  94. queueName: string,
  95. process: (job: Job<Data>) => Promise<any>,
  96. ) {
  97. const listener = this.listeners.getAndDelete(queueName, process);
  98. if (!listener) {
  99. return;
  100. }
  101. this.subscription(queueName).off('message', listener);
  102. }
  103. private topic(queueName: string): Topic {
  104. let topic = this.topics.get(queueName);
  105. if (topic) {
  106. return topic;
  107. }
  108. const pair = this.queueNamePubSubPair.get(queueName);
  109. if (!pair) {
  110. throw new Error(`Topic name not set for queue: ${queueName}`);
  111. }
  112. const [topicName, subscriptionName] = pair;
  113. topic = this.pubSubClient.topic(topicName);
  114. this.topics.set(queueName, topic);
  115. return topic;
  116. }
  117. private subscription(queueName: string): Subscription {
  118. let subscription = this.subscriptions.get(queueName);
  119. if (subscription) {
  120. return subscription;
  121. }
  122. const pair = this.queueNamePubSubPair.get(queueName);
  123. if (!pair) {
  124. throw new Error(`Subscription name not set for queue: ${queueName}`);
  125. }
  126. const [topicName, subscriptionName] = pair;
  127. subscription = this.topic(queueName).subscription(subscriptionName, {
  128. flowControl: {
  129. maxMessages: this.concurrency,
  130. },
  131. });
  132. this.subscriptions.set(queueName, subscription);
  133. return subscription;
  134. }
  135. }