ソースを参照

feat(core): Implement new blocking event handler API

Relates to #2735
Michael Bromley 1 年間 前
コミット
1c694996be

+ 231 - 27
packages/core/src/event-bus/event-bus.spec.ts

@@ -1,3 +1,4 @@
+import { firstValueFrom, Subject } from 'rxjs';
 import { QueryRunner } from 'typeorm';
 import { beforeEach, describe, expect, it, vi } from 'vitest';
 
@@ -20,7 +21,7 @@ describe('EventBus', () => {
     it('can publish without subscribers', () => {
         const event = new TestEvent('foo');
 
-        expect(() => eventBus.publish(event)).not.toThrow();
+        expect(async () => await eventBus.publish(event)).not.toThrow();
     });
 
     describe('ofType()', () => {
@@ -29,7 +30,7 @@ describe('EventBus', () => {
             const event = new TestEvent('foo');
             eventBus.ofType(TestEvent).subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(1);
@@ -43,9 +44,9 @@ describe('EventBus', () => {
             const event3 = new TestEvent('baz');
             eventBus.ofType(TestEvent).subscribe(handler);
 
-            eventBus.publish(event1);
-            eventBus.publish(event2);
-            eventBus.publish(event3);
+            await eventBus.publish(event1);
+            await eventBus.publish(event2);
+            await eventBus.publish(event3);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(3);
@@ -63,7 +64,7 @@ describe('EventBus', () => {
             eventBus.ofType(TestEvent).subscribe(handler2);
             eventBus.ofType(TestEvent).subscribe(handler3);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler1).toHaveBeenCalledWith(event);
@@ -76,7 +77,7 @@ describe('EventBus', () => {
             const event = new OtherTestEvent('foo');
             eventBus.ofType(TestEvent).subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).not.toHaveBeenCalled();
@@ -87,15 +88,15 @@ describe('EventBus', () => {
             const event = new TestEvent('foo');
             const subscription = eventBus.ofType(TestEvent).subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(1);
 
             subscription.unsubscribe();
 
-            eventBus.publish(event);
-            eventBus.publish(event);
+            await eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(1);
@@ -108,7 +109,7 @@ describe('EventBus', () => {
             const subscription1 = eventBus.ofType(TestEvent).subscribe(handler1);
             const subscription2 = eventBus.ofType(TestEvent).subscribe(handler2);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler1).toHaveBeenCalledTimes(1);
@@ -116,8 +117,8 @@ describe('EventBus', () => {
 
             subscription1.unsubscribe();
 
-            eventBus.publish(event);
-            eventBus.publish(event);
+            await eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler1).toHaveBeenCalledTimes(1);
@@ -131,7 +132,7 @@ describe('EventBus', () => {
             const event = new TestEvent('foo');
             eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(1);
@@ -145,9 +146,9 @@ describe('EventBus', () => {
             const event3 = new TestEvent('baz');
             eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);
 
-            eventBus.publish(event1);
-            eventBus.publish(event2);
-            eventBus.publish(event3);
+            await eventBus.publish(event1);
+            await eventBus.publish(event2);
+            await eventBus.publish(event3);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(3);
@@ -165,7 +166,7 @@ describe('EventBus', () => {
             eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler2);
             eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler3);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler1).toHaveBeenCalledWith(event);
@@ -178,7 +179,7 @@ describe('EventBus', () => {
             const event = new OtherTestEvent('foo');
             eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).not.toHaveBeenCalled();
@@ -189,7 +190,7 @@ describe('EventBus', () => {
             const event = new ChildTestEvent('bar', 'foo');
             eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalled();
@@ -202,15 +203,15 @@ describe('EventBus', () => {
                 .filter(vendureEvent => vendureEvent instanceof TestEvent)
                 .subscribe(handler);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(1);
 
             subscription.unsubscribe();
 
-            eventBus.publish(event);
-            eventBus.publish(event);
+            await eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler).toHaveBeenCalledTimes(1);
@@ -227,7 +228,7 @@ describe('EventBus', () => {
                 .filter(vendureEvent => vendureEvent instanceof TestEvent)
                 .subscribe(handler2);
 
-            eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler1).toHaveBeenCalledTimes(1);
@@ -235,14 +236,214 @@ describe('EventBus', () => {
 
             subscription1.unsubscribe();
 
-            eventBus.publish(event);
-            eventBus.publish(event);
+            await eventBus.publish(event);
+            await eventBus.publish(event);
             await new Promise(resolve => setImmediate(resolve));
 
             expect(handler1).toHaveBeenCalledTimes(1);
             expect(handler2).toHaveBeenCalledTimes(3);
         });
     });
+
+    describe('blocking event handlers', () => {
+        it('calls the handler function', async () => {
+            const event = new TestEvent('foo');
+            const spy = vi.fn((e: VendureEvent) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy(e),
+                id: 'test-handler',
+                event: TestEvent,
+            });
+
+            await eventBus.publish(event);
+
+            expect(spy).toHaveBeenCalledTimes(1);
+            expect(spy).toHaveBeenCalledWith(event);
+        });
+
+        it('throws when attempting to register with a duplicate id', () => {
+            eventBus.registerBlockingEventHandler({
+                handler: e => undefined,
+                id: 'test-handler',
+                event: TestEvent,
+            });
+            expect(() => {
+                eventBus.registerBlockingEventHandler({
+                    handler: e => undefined,
+                    id: 'test-handler',
+                    event: TestEvent,
+                });
+            }).toThrowError(
+                'A handler with the id "test-handler" is already registered for the event TestEvent',
+            );
+        });
+
+        it('calls multiple handler functions', async () => {
+            const event = new TestEvent('foo');
+            const spy1 = vi.fn((e: VendureEvent) => undefined);
+            const spy2 = vi.fn((e: VendureEvent) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy1(e),
+                id: 'test-handler1',
+                event: TestEvent,
+            });
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy2(e),
+                id: 'test-handler2',
+                event: TestEvent,
+            });
+
+            await eventBus.publish(event);
+
+            expect(spy1).toHaveBeenCalledTimes(1);
+            expect(spy1).toHaveBeenCalledWith(event);
+            expect(spy2).toHaveBeenCalledTimes(1);
+            expect(spy2).toHaveBeenCalledWith(event);
+        });
+
+        it('handles multiple events', async () => {
+            const event1 = new TestEvent('foo');
+            const event2 = new OtherTestEvent('bar');
+            const spy = vi.fn((e: VendureEvent) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy(e),
+                id: 'test-handler',
+                event: [TestEvent, OtherTestEvent],
+            });
+
+            await eventBus.publish(event1);
+            expect(spy).toHaveBeenCalledTimes(1);
+            expect(spy).toHaveBeenCalledWith(event1);
+
+            await eventBus.publish(event2);
+            expect(spy).toHaveBeenCalledTimes(2);
+            expect(spy).toHaveBeenCalledWith(event2);
+        });
+
+        it('publish method throws in a handler throws', async () => {
+            const event = new TestEvent('foo');
+            eventBus.registerBlockingEventHandler({
+                handler: () => {
+                    throw new Error('test error');
+                },
+                id: 'test-handler',
+                event: TestEvent,
+            });
+
+            await expect(eventBus.publish(event)).rejects.toThrow('test error');
+        });
+
+        it('order of execution with "before" property', async () => {
+            const event = new TestEvent('foo');
+            const spy = vi.fn((input: string) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy('test-handler1'),
+                id: 'test-handler1',
+                event: TestEvent,
+            });
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy('test-handler2'),
+                id: 'test-handler2',
+                event: TestEvent,
+                before: 'test-handler1',
+            });
+
+            await eventBus.publish(event);
+
+            expect(spy).toHaveBeenCalledTimes(2);
+            expect(spy).toHaveBeenNthCalledWith(1, 'test-handler2');
+            expect(spy).toHaveBeenNthCalledWith(2, 'test-handler1');
+        });
+
+        it('order of execution with "after" property', async () => {
+            const event = new TestEvent('foo');
+            const spy = vi.fn((input: string) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy('test-handler1'),
+                id: 'test-handler1',
+                event: TestEvent,
+                after: 'test-handler2',
+            });
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy('test-handler2'),
+                id: 'test-handler2',
+                event: TestEvent,
+            });
+
+            await eventBus.publish(event);
+
+            expect(spy).toHaveBeenCalledTimes(2);
+            expect(spy).toHaveBeenNthCalledWith(1, 'test-handler2');
+            expect(spy).toHaveBeenNthCalledWith(2, 'test-handler1');
+        });
+
+        it('throws if there is a cycle in before ordering', () => {
+            const spy = vi.fn((input: string) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy('test-handler1'),
+                id: 'test-handler1',
+                event: TestEvent,
+                before: 'test-handler2',
+            });
+
+            expect(() =>
+                eventBus.registerBlockingEventHandler({
+                    handler: e => spy('test-handler2'),
+                    id: 'test-handler2',
+                    event: TestEvent,
+                    before: 'test-handler1',
+                }),
+            ).toThrowError(
+                'Circular dependency detected between event handlers test-handler1 and test-handler2',
+            );
+        });
+
+        it('throws if there is a cycle in after ordering', () => {
+            const spy = vi.fn((input: string) => undefined);
+            eventBus.registerBlockingEventHandler({
+                handler: e => spy('test-handler1'),
+                id: 'test-handler1',
+                event: TestEvent,
+                after: 'test-handler2',
+            });
+
+            expect(() =>
+                eventBus.registerBlockingEventHandler({
+                    handler: e => spy('test-handler2'),
+                    id: 'test-handler2',
+                    event: TestEvent,
+                    after: 'test-handler1',
+                }),
+            ).toThrowError(
+                'Circular dependency detected between event handlers test-handler1 and test-handler2',
+            );
+        });
+
+        it('blocks execution of the publish method', async () => {
+            const event = new TestEvent('foo');
+            const subject = new Subject<void>();
+            eventBus.registerBlockingEventHandler({
+                handler: e => firstValueFrom(subject.asObservable()),
+                id: 'test-handler',
+                event: TestEvent,
+            });
+            const publishPromise = eventBus.publish(event);
+            expect(publishPromise).toBeInstanceOf(Promise);
+
+            let resolved = false;
+            void publishPromise.then(() => (resolved = true));
+
+            expect(resolved).toBe(false);
+            await new Promise(resolve => setTimeout(resolve, 50));
+            expect(resolved).toBe(false);
+            // Handler only resolves after the subject emits
+            subject.next();
+            // Allow the event loop to tick
+            await new Promise(resolve => setTimeout(resolve, 0));
+            // Now the promise should be resolved
+            expect(resolved).toBe(true);
+        });
+    });
 });
 
 class TestEvent extends VendureEvent {
@@ -252,7 +453,10 @@ class TestEvent extends VendureEvent {
 }
 
 class ChildTestEvent extends TestEvent {
-    constructor(public childPayload: string, payload: string) {
+    constructor(
+        public childPayload: string,
+        payload: string,
+    ) {
         super(payload);
     }
 }

+ 168 - 1
packages/core/src/event-bus/event-bus.ts

@@ -7,10 +7,50 @@ import { EntityManager } from 'typeorm';
 
 import { RequestContext } from '../api/common/request-context';
 import { TRANSACTION_MANAGER_KEY } from '../common/constants';
+import { Logger } from '../config/logger/vendure-logger';
 import { TransactionSubscriber, TransactionSubscriberError } from '../connection/transaction-subscriber';
 
 import { VendureEvent } from './vendure-event';
 
+/**
+ * @description
+ * Options for registering a blocking event handler.
+ *
+ * @since 2.2.0
+ * @docsCategory events
+ */
+export type BlockingEventHandlerOptions<T extends VendureEvent> = {
+    /**
+     * @description
+     * The event type to which the handler should listen.
+     * Can be a single event type or an array of event types.
+     */
+    event: Type<T> | Array<Type<T>>;
+    /**
+     * @description
+     * The handler function which will be executed when the event is published.
+     * If the handler returns a Promise, the event publishing code will wait for the Promise to resolve
+     * before continuing. Any errors thrown by the handler will cause the event publishing code to fail.
+     */
+    handler: (event: T) => void | Promise<void>;
+    /**
+     * @description
+     * A unique identifier for the handler. This can then be used to specify the order in which
+     * handlers should be executed using the `before` and `after` options in other handlers.
+     */
+    id: string;
+    /**
+     * @description
+     * The ID of another handler which this handler should execute before.
+     */
+    before?: string;
+    /**
+     * @description
+     * The ID of another handler which this handler should execute after.
+     */
+    after?: string;
+};
+
 /**
  * @description
  * The EventBus is used to globally publish events which can then be subscribed to.
@@ -58,6 +98,7 @@ import { VendureEvent } from './vendure-event';
 export class EventBus implements OnModuleDestroy {
     private eventStream = new Subject<VendureEvent>();
     private destroy$ = new Subject<void>();
+    private blockingEventHandlers = new Map<Type<VendureEvent>, Array<BlockingEventHandlerOptions<any>>>();
 
     constructor(private transactionSubscriber: TransactionSubscriber) {}
 
@@ -65,8 +106,9 @@ export class EventBus implements OnModuleDestroy {
      * @description
      * Publish an event which any subscribers can react to.
      */
-    publish<T extends VendureEvent>(event: T): void {
+    async publish<T extends VendureEvent>(event: T): Promise<void> {
         this.eventStream.next(event);
+        await this.executeBlockingEventHandlers(event);
     }
 
     /**
@@ -105,11 +147,136 @@ export class EventBus implements OnModuleDestroy {
         ) as Observable<T>;
     }
 
+    /**
+     * @description
+     * Register an event handler function which will be executed when an event of the given type is published,
+     * and will block execution of the code which published the event until the handler has completed.
+     *
+     * This is useful when you need assurance that the event handler has successfully completed, and you want
+     * the triggering code to fail if the handler fails.
+     *
+     * This API should be used with caution, as errors or performance issues in the handler can cause the
+     * associated operation to be slow or fail entirely. For this reason, any handler which takes longer than
+     * 100ms to execute will log a warning. Any non-trivial task to be performed in a blocking event handler
+     * should be offloaded to a background job using the {@link JobQueueService}.
+     *
+     * @example
+     * ```ts
+     * eventBus.registerBlockingEventHandler({
+     *   event: OrderStateTransitionEvent,
+     *   id: 'my-order-state-transition-handler',
+     *   handler: async (event) => {
+     *     // perform some synchronous task
+     *   }
+     * });
+     * ```
+     *
+     * @since 2.2.0
+     */
+    registerBlockingEventHandler<T extends VendureEvent>(handlerOptions: BlockingEventHandlerOptions<T>) {
+        const events = Array.isArray(handlerOptions.event) ? handlerOptions.event : [handlerOptions.event];
+
+        for (const event of events) {
+            let handlers = this.blockingEventHandlers.get(event);
+            const handlerWithIdAlreadyExists = handlers?.some(h => h.id === handlerOptions.id);
+            if (handlerWithIdAlreadyExists) {
+                throw new Error(
+                    `A handler with the id "${handlerOptions.id}" is already registered for the event ${event.name}`,
+                );
+            }
+
+            if (handlers) {
+                handlers.push(handlerOptions);
+            } else {
+                handlers = [handlerOptions];
+            }
+            const orderedHandlers = this.orderEventHandlers(handlers);
+            this.blockingEventHandlers.set(event, orderedHandlers);
+        }
+    }
+
     /** @internal */
     onModuleDestroy(): any {
         this.destroy$.next();
     }
 
+    private async executeBlockingEventHandlers<T extends VendureEvent>(event: T): Promise<void> {
+        const blockingHandlers = this.blockingEventHandlers.get(event.constructor as Type<T>);
+        for (const options of blockingHandlers || []) {
+            const timeStart = new Date().getTime();
+            await options.handler(event);
+            const timeEnd = new Date().getTime();
+            const timeTaken = timeEnd - timeStart;
+            Logger.debug(`Blocking event handler ${options.id} took ${timeTaken}ms`);
+            if (timeTaken > 100) {
+                Logger.warn(
+                    [
+                        `Blocking event handler ${options.id} took ${timeTaken}ms`,
+                        `Consider optimizing the handler by moving the logic to a background job or using a more efficient algorithm.`,
+                    ].join('\n'),
+                );
+            }
+        }
+    }
+
+    private orderEventHandlers<T extends VendureEvent>(
+        handlers: Array<BlockingEventHandlerOptions<T>>,
+    ): Array<BlockingEventHandlerOptions<T>> {
+        let orderedHandlers: Array<BlockingEventHandlerOptions<T>> = [];
+        const handlerMap: Map<string, BlockingEventHandlerOptions<T>> = new Map();
+
+        // Create a map of handlers by ID for efficient lookup
+        for (const handler of handlers) {
+            handlerMap.set(handler.id, handler);
+        }
+
+        // Helper function to recursively add handlers in correct order
+        const addHandler = (handler: BlockingEventHandlerOptions<T>) => {
+            // If the handler is already in the ordered list, skip it
+            if (orderedHandlers.includes(handler)) {
+                return;
+            }
+
+            // If an "after" handler is specified, add it recursively
+            if (handler.after) {
+                const afterHandler = handlerMap.get(handler.after);
+                if (afterHandler) {
+                    if (afterHandler.after === handler.id) {
+                        throw new Error(
+                            `Circular dependency detected between event handlers ${handler.id} and ${afterHandler.id}`,
+                        );
+                    }
+                    orderedHandlers = orderedHandlers.filter(h => h.id !== afterHandler.id);
+                    addHandler(afterHandler);
+                }
+            }
+
+            // Add the current handler
+            orderedHandlers.push(handler);
+
+            // If a "before" handler is specified, add it recursively
+            if (handler.before) {
+                const beforeHandler = handlerMap.get(handler.before);
+                if (beforeHandler) {
+                    if (beforeHandler.before === handler.id) {
+                        throw new Error(
+                            `Circular dependency detected between event handlers ${handler.id} and ${beforeHandler.id}`,
+                        );
+                    }
+                    orderedHandlers = orderedHandlers.filter(h => h.id !== beforeHandler.id);
+                    addHandler(beforeHandler);
+                }
+            }
+        };
+
+        // Start adding handlers from the original list
+        for (const handler of handlers) {
+            addHandler(handler);
+        }
+
+        return orderedHandlers;
+    }
+
     /**
      * If the Event includes a RequestContext property, we need to check for any active transaction
      * associated with it, and if there is one, we await that transaction to either commit or rollback