pub-sub-job-queue-strategy.ts 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import { Message, PubSub, Subscription, Topic } from '@google-cloud/pubsub';
  2. import { JobState } from '@vendure/common/lib/generated-types';
  3. import {
  4. InjectableJobQueueStrategy,
  5. Injector,
  6. Job,
  7. JobData,
  8. JobQueueStrategy,
  9. Logger,
  10. QueueNameProcessStorage,
  11. } from '@vendure/core';
  12. import { loggerCtx, PUB_SUB_OPTIONS } from './constants';
  13. import { PubSubOptions } from './options';
  14. export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implements JobQueueStrategy {
  15. private concurrency: number;
  16. private queueNamePubSubPair: Map<string, [string, string]>;
  17. private pubSubClient: PubSub;
  18. private topics = new Map<string, Topic>();
  19. private subscriptions = new Map<string, Subscription>();
  20. private listeners = new QueueNameProcessStorage<(message: Message) => void>();
  21. init(injector: Injector) {
  22. this.pubSubClient = injector.get(PubSub);
  23. const options = injector.get<PubSubOptions>(PUB_SUB_OPTIONS);
  24. this.concurrency = options.concurrency ?? 20;
  25. this.queueNamePubSubPair = options.queueNamePubSubPair ?? new Map();
  26. super.init(injector);
  27. }
  28. destroy() {
  29. super.destroy();
  30. for (const subscription of this.subscriptions.values()) {
  31. subscription.removeAllListeners('message');
  32. }
  33. this.subscriptions.clear();
  34. this.topics.clear();
  35. }
  36. async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
  37. if (!this.hasInitialized) {
  38. throw new Error('Cannot add job before init');
  39. }
  40. const id = await this.topic(job.queueName).publish(Buffer.from(JSON.stringify(job.data)));
  41. Logger.debug(`Sent message ${job.queueName}: ${id}`);
  42. return new Job<Data>({
  43. id,
  44. queueName: job.queueName,
  45. data: job.data,
  46. attempts: 0,
  47. state: JobState.PENDING,
  48. createdAt: new Date(),
  49. });
  50. }
  51. async start<Data extends JobData<Data> = {}>(
  52. queueName: string,
  53. process: (job: Job<Data>) => Promise<any>,
  54. ) {
  55. if (!this.hasInitialized) {
  56. this.started.set(queueName, process);
  57. return;
  58. }
  59. if (this.listeners.has(queueName, process)) {
  60. return;
  61. }
  62. const subscription = this.subscription(queueName);
  63. const listener = (message: Message) => {
  64. Logger.debug(`Received message: ${queueName}: ${message.id}`, loggerCtx);
  65. const job = new Job<Data>({
  66. id: message.id,
  67. queueName,
  68. data: JSON.parse(message.data.toString()),
  69. attempts: message.deliveryAttempt,
  70. state: JobState.RUNNING,
  71. startedAt: new Date(),
  72. createdAt: message.publishTime,
  73. });
  74. process(job)
  75. .then(() => {
  76. message.ack();
  77. })
  78. .catch(err => {
  79. message.nack();
  80. });
  81. };
  82. this.listeners.set(queueName, process, listener);
  83. subscription.on('message', listener);
  84. }
  85. async stop<Data extends JobData<Data> = {}>(
  86. queueName: string,
  87. process: (job: Job<Data>) => Promise<any>,
  88. ) {
  89. const listener = this.listeners.getAndDelete(queueName, process);
  90. if (!listener) {
  91. return;
  92. }
  93. this.subscription(queueName).off('message', listener);
  94. }
  95. private topic(queueName: string): Topic {
  96. let topic = this.topics.get(queueName);
  97. if (topic) {
  98. return topic;
  99. }
  100. const pair = this.queueNamePubSubPair.get(queueName);
  101. if (!pair) {
  102. throw new Error(`Topic name not set for queue: ${queueName}`);
  103. }
  104. const [topicName, subscriptionName] = pair;
  105. topic = this.pubSubClient.topic(topicName);
  106. this.topics.set(queueName, topic);
  107. return topic;
  108. }
  109. private subscription(queueName: string): Subscription {
  110. let subscription = this.subscriptions.get(queueName);
  111. if (subscription) {
  112. return subscription;
  113. }
  114. const pair = this.queueNamePubSubPair.get(queueName);
  115. if (!pair) {
  116. throw new Error(`Subscription name not set for queue: ${queueName}`);
  117. }
  118. const [topicName, subscriptionName] = pair;
  119. subscription = this.topic(queueName).subscription(subscriptionName, {
  120. flowControl: {
  121. maxMessages: this.concurrency,
  122. },
  123. });
  124. this.subscriptions.set(queueName, subscription);
  125. return subscription;
  126. }
  127. }