event-bus.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. import { Injectable, OnModuleDestroy } from '@nestjs/common';
  2. import { Type } from '@vendure/common/lib/shared-types';
  3. import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
  4. import { Observable, Subject } from 'rxjs';
  5. import { filter, mergeMap, takeUntil } from 'rxjs/operators';
  6. import { EntityManager } from 'typeorm';
  7. import { RequestContext } from '../api/common/request-context';
  8. import { TRANSACTION_MANAGER_KEY } from '../common/constants';
  9. import { Instrument } from '../common/instrument-decorator';
  10. import { Logger } from '../config/logger/vendure-logger';
  11. import { TransactionSubscriber, TransactionSubscriberError } from '../connection/transaction-subscriber';
  12. import { VendureEvent } from './vendure-event';
  13. /**
  14. * @description
  15. * Options for registering a blocking event handler.
  16. *
  17. * @since 2.2.0
  18. * @docsCategory events
  19. */
  20. export type BlockingEventHandlerOptions<T extends VendureEvent> = {
  21. /**
  22. * @description
  23. * The event type to which the handler should listen.
  24. * Can be a single event type or an array of event types.
  25. */
  26. event: Type<T> | Array<Type<T>>;
  27. /**
  28. * @description
  29. * The handler function which will be executed when the event is published.
  30. * If the handler returns a Promise, the event publishing code will wait for the Promise to resolve
  31. * before continuing. Any errors thrown by the handler will cause the event publishing code to fail.
  32. */
  33. handler: (event: T) => void | Promise<void>;
  34. /**
  35. * @description
  36. * A unique identifier for the handler. This can then be used to specify the order in which
  37. * handlers should be executed using the `before` and `after` options in other handlers.
  38. */
  39. id: string;
  40. /**
  41. * @description
  42. * The ID of another handler which this handler should execute before.
  43. */
  44. before?: string;
  45. /**
  46. * @description
  47. * The ID of another handler which this handler should execute after.
  48. */
  49. after?: string;
  50. };
  51. /**
  52. * @description
  53. * The EventBus is used to globally publish events which can then be subscribed to.
  54. *
  55. * Events are published whenever certain actions take place within the Vendure server, for example:
  56. *
  57. * * when a Product is updated ({@link ProductEvent})
  58. * * when an Order transitions state ({@link OrderStateTransitionEvent})
  59. * * when a Customer registers a new account ({@link AccountRegistrationEvent})
  60. *
  61. * Using the EventBus it is possible to subscribe to an take action when these events occur.
  62. * This is done with the `.ofType()` method, which takes an event type and returns an rxjs observable
  63. * stream of events:
  64. *
  65. * @example
  66. * ```ts
  67. * import { OnApplicationBootstrap } from '\@nestjs/common';
  68. * import { EventBus, PluginCommonModule, VendurePlugin } from '\@vendure/core';
  69. * import { filter } from 'rxjs/operators';
  70. *
  71. * \@VendurePlugin({
  72. * imports: [PluginCommonModule]
  73. * })
  74. * export class MyPlugin implements OnApplicationBootstrap {
  75. *
  76. * constructor(private eventBus: EventBus) {}
  77. *
  78. * async onApplicationBootstrap() {
  79. *
  80. * this.eventBus
  81. * .ofType(OrderStateTransitionEvent)
  82. * .pipe(
  83. * filter(event => event.toState === 'PaymentSettled'),
  84. * )
  85. * .subscribe((event) => {
  86. * // do some action when this event fires
  87. * });
  88. * }
  89. * }
  90. * ```
  91. *
  92. * @docsCategory events
  93. * */
  94. @Injectable()
  95. @Instrument()
  96. export class EventBus implements OnModuleDestroy {
  97. private eventStream = new Subject<VendureEvent>();
  98. private destroy$ = new Subject<void>();
  99. private blockingEventHandlers = new Map<Type<VendureEvent>, Array<BlockingEventHandlerOptions<any>>>();
  100. constructor(private transactionSubscriber: TransactionSubscriber) {}
  101. /**
  102. * @description
  103. * Publish an event which any subscribers can react to.
  104. *
  105. * @example
  106. * ```ts
  107. * await eventBus.publish(new SomeEvent());
  108. * ```
  109. */
  110. async publish<T extends VendureEvent>(event: T): Promise<void> {
  111. this.eventStream.next(event);
  112. await this.executeBlockingEventHandlers(event);
  113. }
  114. /**
  115. * @description
  116. * Returns an RxJS Observable stream of events of the given type.
  117. * If the event contains a {@link RequestContext} object, the subscriber
  118. * will only get called after any active database transactions are complete.
  119. *
  120. * This means that the subscriber function can safely access all updated
  121. * data related to the event.
  122. */
  123. ofType<T extends VendureEvent>(type: Type<T>): Observable<T> {
  124. return this.eventStream.asObservable().pipe(
  125. takeUntil(this.destroy$),
  126. filter(e => e.constructor === type),
  127. mergeMap(event => this.awaitActiveTransactions(event)),
  128. filter(notNullOrUndefined),
  129. ) as Observable<T>;
  130. }
  131. /**
  132. * @description
  133. * Returns an RxJS Observable stream of events filtered by a custom predicate.
  134. * If the event contains a {@link RequestContext} object, the subscriber
  135. * will only get called after any active database transactions are complete.
  136. *
  137. * This means that the subscriber function can safely access all updated
  138. * data related to the event.
  139. */
  140. filter<T extends VendureEvent>(predicate: (event: VendureEvent) => boolean): Observable<T> {
  141. return this.eventStream.asObservable().pipe(
  142. takeUntil(this.destroy$),
  143. filter(e => predicate(e)),
  144. mergeMap(event => this.awaitActiveTransactions(event)),
  145. filter(notNullOrUndefined),
  146. ) as Observable<T>;
  147. }
  148. /**
  149. * @description
  150. * Register an event handler function which will be executed when an event of the given type is published,
  151. * and will block execution of the code which published the event until the handler has completed.
  152. *
  153. * This is useful when you need assurance that the event handler has successfully completed, and you want
  154. * the triggering code to fail if the handler fails.
  155. *
  156. * ::: warning
  157. * This API should be used with caution, as errors or performance issues in the handler can cause the
  158. * associated operation to be slow or fail entirely. For this reason, any handler which takes longer than
  159. * 100ms to execute will log a warning. Any non-trivial task to be performed in a blocking event handler
  160. * should be offloaded to a background job using the {@link JobQueueService}.
  161. *
  162. * Also, be aware that the handler will be executed in the _same database transaction_ as the code which published
  163. * the event (as long as you pass the `ctx` object from the event to any TransactionalConnection calls).
  164. * :::
  165. *
  166. * @example
  167. * ```ts
  168. * eventBus.registerBlockingEventHandler({
  169. * event: OrderStateTransitionEvent,
  170. * id: 'my-order-state-transition-handler',
  171. * handler: async (event) => {
  172. * // perform some synchronous task
  173. * }
  174. * });
  175. * ```
  176. *
  177. * @since 2.2.0
  178. */
  179. registerBlockingEventHandler<T extends VendureEvent>(handlerOptions: BlockingEventHandlerOptions<T>) {
  180. const events = Array.isArray(handlerOptions.event) ? handlerOptions.event : [handlerOptions.event];
  181. for (const event of events) {
  182. let handlers = this.blockingEventHandlers.get(event);
  183. const handlerWithIdAlreadyExists = handlers?.some(h => h.id === handlerOptions.id);
  184. if (handlerWithIdAlreadyExists) {
  185. throw new Error(
  186. `A handler with the id "${handlerOptions.id}" is already registered for the event ${event.name}`,
  187. );
  188. }
  189. if (handlers) {
  190. handlers.push(handlerOptions);
  191. } else {
  192. handlers = [handlerOptions];
  193. }
  194. const orderedHandlers = this.orderEventHandlers(handlers);
  195. this.blockingEventHandlers.set(event, orderedHandlers);
  196. }
  197. }
  198. /** @internal */
  199. onModuleDestroy(): any {
  200. this.destroy$.next();
  201. }
  202. private async executeBlockingEventHandlers<T extends VendureEvent>(event: T): Promise<void> {
  203. const blockingHandlers = this.blockingEventHandlers.get(event.constructor as Type<T>);
  204. for (const options of blockingHandlers || []) {
  205. const timeStart = new Date().getTime();
  206. await options.handler(event);
  207. const timeEnd = new Date().getTime();
  208. const timeTaken = timeEnd - timeStart;
  209. Logger.debug(`Blocking event handler ${options.id} took ${timeTaken}ms`);
  210. if (timeTaken > 100) {
  211. Logger.warn(
  212. [
  213. `Blocking event handler ${options.id} took ${timeTaken}ms`,
  214. `Consider optimizing the handler by moving the logic to a background job or using a more efficient algorithm.`,
  215. ].join('\n'),
  216. );
  217. }
  218. }
  219. }
  220. private orderEventHandlers<T extends VendureEvent>(
  221. handlers: Array<BlockingEventHandlerOptions<T>>,
  222. ): Array<BlockingEventHandlerOptions<T>> {
  223. let orderedHandlers: Array<BlockingEventHandlerOptions<T>> = [];
  224. const handlerMap: Map<string, BlockingEventHandlerOptions<T>> = new Map();
  225. // Create a map of handlers by ID for efficient lookup
  226. for (const handler of handlers) {
  227. handlerMap.set(handler.id, handler);
  228. }
  229. // Helper function to recursively add handlers in correct order
  230. const addHandler = (handler: BlockingEventHandlerOptions<T>) => {
  231. // If the handler is already in the ordered list, skip it
  232. if (orderedHandlers.includes(handler)) {
  233. return;
  234. }
  235. // If an "after" handler is specified, add it recursively
  236. if (handler.after) {
  237. const afterHandler = handlerMap.get(handler.after);
  238. if (afterHandler) {
  239. if (afterHandler.after === handler.id) {
  240. throw new Error(
  241. `Circular dependency detected between event handlers ${handler.id} and ${afterHandler.id}`,
  242. );
  243. }
  244. orderedHandlers = orderedHandlers.filter(h => h.id !== afterHandler.id);
  245. addHandler(afterHandler);
  246. }
  247. }
  248. // Add the current handler
  249. orderedHandlers.push(handler);
  250. // If a "before" handler is specified, add it recursively
  251. if (handler.before) {
  252. const beforeHandler = handlerMap.get(handler.before);
  253. if (beforeHandler) {
  254. if (beforeHandler.before === handler.id) {
  255. throw new Error(
  256. `Circular dependency detected between event handlers ${handler.id} and ${beforeHandler.id}`,
  257. );
  258. }
  259. orderedHandlers = orderedHandlers.filter(h => h.id !== beforeHandler.id);
  260. addHandler(beforeHandler);
  261. }
  262. }
  263. };
  264. // Start adding handlers from the original list
  265. for (const handler of handlers) {
  266. addHandler(handler);
  267. }
  268. return orderedHandlers;
  269. }
  270. /**
  271. * If the Event includes a RequestContext property, we need to check for any active transaction
  272. * associated with it, and if there is one, we await that transaction to either commit or rollback
  273. * before publishing the event.
  274. *
  275. * The reason for this is that if the transaction is still active when event subscribers execute,
  276. * this can cause a couple of issues:
  277. *
  278. * 1. If the transaction hasn't completed by the time the subscriber runs, the new data inside
  279. * the transaction will not be available to the subscriber.
  280. * 2. If the subscriber gets a reference to the EntityManager which has an active transaction,
  281. * and then the transaction completes, and then the subscriber attempts a DB operation using that
  282. * EntityManager, a fatal QueryRunnerAlreadyReleasedError will be thrown.
  283. *
  284. * For more context on these two issues, see:
  285. *
  286. * * https://github.com/vendure-ecommerce/vendure/issues/520
  287. * * https://github.com/vendure-ecommerce/vendure/issues/1107
  288. */
  289. private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T | undefined> {
  290. const entry = Object.entries(event).find(([_, value]) => value instanceof RequestContext);
  291. if (!entry) {
  292. return event;
  293. }
  294. const [key, ctx]: [string, RequestContext] = entry;
  295. const transactionManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
  296. if (!transactionManager?.queryRunner) {
  297. return event;
  298. }
  299. try {
  300. await this.transactionSubscriber.awaitCommit(transactionManager.queryRunner);
  301. // Copy context and remove transaction manager
  302. // This will prevent queries to released query runner
  303. const newContext = ctx.copy();
  304. delete (newContext as any)[TRANSACTION_MANAGER_KEY];
  305. // Reassign new context
  306. (event as any)[key] = newContext;
  307. return event;
  308. } catch (e: any) {
  309. if (e instanceof TransactionSubscriberError) {
  310. // Expected commit, but rollback or something else happened.
  311. // This is still reliable behavior, return undefined
  312. // as event should not be exposed from this transaction
  313. return;
  314. }
  315. throw e;
  316. }
  317. }
  318. }