Browse Source

feat(core): EventBus exposes Observable event stream with .ofType()

Michael Bromley 6 years ago
parent
commit
506a0fae16

+ 94 - 0
packages/core/src/event-bus/event-bus.spec.ts

@@ -118,6 +118,100 @@ describe('EventBus', () => {
         expect(handler1).toHaveBeenCalledTimes(1);
         expect(handler2).toHaveBeenCalledTimes(3);
     });
+
+    describe('ofType()', () => {
+        it('single handler is called once', () => {
+            const handler = jest.fn();
+            const event = new TestEvent('foo');
+            eventBus.ofType(TestEvent).subscribe(handler);
+
+            eventBus.publish(event);
+
+            expect(handler).toHaveBeenCalledTimes(1);
+            expect(handler).toHaveBeenCalledWith(event);
+        });
+
+        it('single handler is called on multiple events', () => {
+            const handler = jest.fn();
+            const event1 = new TestEvent('foo');
+            const event2 = new TestEvent('bar');
+            const event3 = new TestEvent('baz');
+            eventBus.ofType(TestEvent).subscribe(handler);
+
+            eventBus.publish(event1);
+            eventBus.publish(event2);
+            eventBus.publish(event3);
+
+            expect(handler).toHaveBeenCalledTimes(3);
+            expect(handler).toHaveBeenCalledWith(event1);
+            expect(handler).toHaveBeenCalledWith(event2);
+            expect(handler).toHaveBeenCalledWith(event3);
+        });
+
+        it('multiple handlers are called', () => {
+            const handler1 = jest.fn();
+            const handler2 = jest.fn();
+            const handler3 = jest.fn();
+            const event = new TestEvent('foo');
+            eventBus.ofType(TestEvent).subscribe(handler1);
+            eventBus.ofType(TestEvent).subscribe(handler2);
+            eventBus.ofType(TestEvent).subscribe(handler3);
+
+            eventBus.publish(event);
+
+            expect(handler1).toHaveBeenCalledWith(event);
+            expect(handler2).toHaveBeenCalledWith(event);
+            expect(handler3).toHaveBeenCalledWith(event);
+        });
+
+        it('handler is not called for other events', () => {
+            const handler = jest.fn();
+            const event = new OtherTestEvent('foo');
+            eventBus.ofType(TestEvent).subscribe(handler);
+
+            eventBus.publish(event);
+
+            expect(handler).not.toHaveBeenCalled();
+        });
+
+        it('ofType() returns a subscription', () => {
+            const handler = jest.fn();
+            const event = new TestEvent('foo');
+            const subscription = eventBus.ofType(TestEvent).subscribe(handler);
+
+            eventBus.publish(event);
+
+            expect(handler).toHaveBeenCalledTimes(1);
+
+            subscription.unsubscribe();
+
+            eventBus.publish(event);
+            eventBus.publish(event);
+
+            expect(handler).toHaveBeenCalledTimes(1);
+        });
+
+        it('unsubscribe() only unsubscribes own handler', () => {
+            const handler1 = jest.fn();
+            const handler2 = jest.fn();
+            const event = new TestEvent('foo');
+            const subscription1 = eventBus.ofType(TestEvent).subscribe(handler1);
+            const subscription2 = eventBus.ofType(TestEvent).subscribe(handler2);
+
+            eventBus.publish(event);
+
+            expect(handler1).toHaveBeenCalledTimes(1);
+            expect(handler2).toHaveBeenCalledTimes(1);
+
+            subscription1.unsubscribe();
+
+            eventBus.publish(event);
+            eventBus.publish(event);
+
+            expect(handler1).toHaveBeenCalledTimes(1);
+            expect(handler2).toHaveBeenCalledTimes(3);
+        });
+    });
 });
 
 class TestEvent extends VendureEvent {

+ 28 - 3
packages/core/src/event-bus/event-bus.ts

@@ -1,5 +1,7 @@
-import { Injectable } from '@nestjs/common';
+import { Injectable, OnModuleDestroy } from '@nestjs/common';
 import { Type } from '@vendure/common/lib/shared-types';
+import { Observable, Subject } from 'rxjs';
+import { filter, takeUntil } from 'rxjs/operators';
 
 import { VendureEvent } from './vendure-event';
 
@@ -13,14 +15,16 @@ export type UnsubscribeFn = () => void;
  * @docsCategory events
  * */
 @Injectable()
-export class EventBus {
+export class EventBus implements OnModuleDestroy {
     private subscriberMap = new Map<Type<VendureEvent>, Array<EventHandler<any>>>();
+    private eventStream = new Subject<VendureEvent>();
+    private destroy$ = new Subject();
 
     /**
      * @description
      * Publish an event which any subscribers can react to.
      */
-    publish(event: VendureEvent): void {
+    publish<T extends VendureEvent>(event: T): void {
         const eventType = (event as any).constructor;
         const handlers = this.subscriberMap.get(eventType);
         if (handlers) {
@@ -29,12 +33,28 @@ export class EventBus {
                 handlers[i](event);
             }
         }
+        this.eventStream.next(event);
     }
 
     /**
      * @description
+     * Returns an RxJS Observable stream of events of the given type.
+     */
+    ofType<T extends VendureEvent>(type: Type<T>): Observable<T> {
+        return this.eventStream.asObservable().pipe(
+            takeUntil(this.destroy$),
+            filter(e => (e as any).constructor === type),
+        ) as Observable<T>;
+    }
+
+    /**
+     * @description
+     * Deprecated: use `ofType()` instead.
+     *
      * Subscribe to the given event type. Returns an unsubscribe function which can be used
      * to unsubscribe the handler from the event.
+     *
+     * @deprecated
      */
     subscribe<T extends VendureEvent>(type: Type<T>, handler: EventHandler<T>): UnsubscribeFn {
         const handlers = this.subscriberMap.get(type) || [];
@@ -44,4 +64,9 @@ export class EventBus {
         this.subscriberMap.set(type, handlers);
         return () => this.subscriberMap.set(type, handlers.filter(h => h !== handler));
     }
+
+    /** @internal */
+    onModuleDestroy(): any {
+        this.destroy$.next();
+    }
 }