Browse Source

feat(core): Make event bus subscriptions transaction-safe

Fixes #1107. Relates to #520.
This commit introduces a new mechanism to guarantee that event subscribers do not run into issues
with database transactions. It works by postponing the publishing on an event until the
associated transaction has completed. Thus the subscriber can be assured that all data changes
in the transaction are available to read right away.
Michael Bromley 4 years ago
parent
commit
f0fd66258b

+ 31 - 5
packages/core/e2e/database-transactions.e2e-spec.ts

@@ -2,11 +2,16 @@ import { mergeConfig } from '@vendure/core';
 import { createTestEnvironment } from '@vendure/testing';
 import gql from 'graphql-tag';
 import path from 'path';
+import { ReplaySubject } from 'rxjs';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
 import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 
-import { TransactionTestPlugin, TRIGGER_EMAIL } from './fixtures/test-plugins/transaction-test-plugin';
+import {
+    TransactionTestPlugin,
+    TRIGGER_ATTEMPTED_READ_EMAIL,
+    TRIGGER_ATTEMPTED_UPDATE_EMAIL,
+} from './fixtures/test-plugins/transaction-test-plugin';
 
 describe('Transaction infrastructure', () => {
     const { server, adminClient } = createTestEnvironment(
@@ -104,13 +109,25 @@ describe('Transaction infrastructure', () => {
 
     // Testing https://github.com/vendure-ecommerce/vendure/issues/520
     it('passing transaction via EventBus', async () => {
-        TransactionTestPlugin.errorHandler.mockClear();
+        TransactionTestPlugin.reset();
         const { createTestAdministrator } = await adminClient.query(CREATE_ADMIN, {
-            emailAddress: TRIGGER_EMAIL,
+            emailAddress: TRIGGER_ATTEMPTED_UPDATE_EMAIL,
             fail: false,
         });
         await TransactionTestPlugin.eventHandlerComplete$.toPromise();
-        expect(createTestAdministrator.emailAddress).toBe(TRIGGER_EMAIL);
+        expect(createTestAdministrator.emailAddress).toBe(TRIGGER_ATTEMPTED_UPDATE_EMAIL);
+        expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
+    });
+
+    // Testing https://github.com/vendure-ecommerce/vendure/issues/1107
+    it('passing transaction via EventBus with delay in committing transaction', async () => {
+        TransactionTestPlugin.reset();
+        const { createTestAdministrator4 } = await adminClient.query(CREATE_ADMIN4, {
+            emailAddress: TRIGGER_ATTEMPTED_READ_EMAIL,
+            fail: false,
+        });
+        await TransactionTestPlugin.eventHandlerComplete$.toPromise();
+        expect(createTestAdministrator4.emailAddress).toBe(TRIGGER_ATTEMPTED_READ_EMAIL);
         expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
     });
 });
@@ -145,7 +162,7 @@ const CREATE_ADMIN2 = gql`
 `;
 
 const CREATE_ADMIN3 = gql`
-    mutation CreateTestAdmin2($emailAddress: String!, $fail: Boolean!) {
+    mutation CreateTestAdmin3($emailAddress: String!, $fail: Boolean!) {
         createTestAdministrator3(emailAddress: $emailAddress, fail: $fail) {
             ...CreatedAdmin
         }
@@ -153,6 +170,15 @@ const CREATE_ADMIN3 = gql`
     ${ADMIN_FRAGMENT}
 `;
 
+const CREATE_ADMIN4 = gql`
+    mutation CreateTestAdmin4($emailAddress: String!, $fail: Boolean!) {
+        createTestAdministrator4(emailAddress: $emailAddress, fail: $fail) {
+            ...CreatedAdmin
+        }
+    }
+    ${ADMIN_FRAGMENT}
+`;
+
 const VERIFY_TEST = gql`
     query VerifyTest {
         verify {

+ 4 - 4
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -42,7 +42,7 @@ import {
     UpdateCollection,
     UpdateProduct,
     UpdateProductVariants,
-    UpdateTaxRate
+    UpdateTaxRate,
 } from './graphql/generated-e2e-admin-types';
 import { LogicalOperator, SearchProductsShop } from './graphql/generated-e2e-shop-types';
 import {
@@ -565,7 +565,7 @@ describe('Default search plugin', () => {
                 },
             );
             expect(result.search.collections).toEqual([
-                {collection: {id: 'T_2', name: 'Plants',},count: 3,},
+                { collection: { id: 'T_2', name: 'Plants' }, count: 3 },
             ]);
         });
 
@@ -579,8 +579,8 @@ describe('Default search plugin', () => {
                 },
             );
             expect(result.search.collections).toEqual([
-                {collection: {id: 'T_2', name: 'Plants',},count: 3,},
-                ]);
+                { collection: { id: 'T_2', name: 'Plants' }, count: 3 },
+            ]);
         });
 
         it('encodes the productId and productVariantId', async () => {

+ 32 - 4
packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts

@@ -23,7 +23,8 @@ export class TestEvent extends VendureEvent {
     }
 }
 
-export const TRIGGER_EMAIL = 'trigger-email';
+export const TRIGGER_ATTEMPTED_UPDATE_EMAIL = 'trigger-attempted-update-email';
+export const TRIGGER_ATTEMPTED_READ_EMAIL = 'trigger-attempted-read-email';
 
 @Injectable()
 class TestUserService {
@@ -99,6 +100,15 @@ class TestResolver {
         return this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail);
     }
 
+    @Mutation()
+    @Transaction()
+    async createTestAdministrator4(@Ctx() ctx: RequestContext, @Args() args: any) {
+        const admin = await this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail);
+        this.eventBus.publish(new TestEvent(ctx, admin));
+        await new Promise(resolve => setTimeout(resolve, 50));
+        return admin;
+    }
+
     @Query()
     async verify() {
         const admins = await this.connection.getRepository(Administrator).find();
@@ -119,6 +129,7 @@ class TestResolver {
                 createTestAdministrator(emailAddress: String!, fail: Boolean!): Administrator
                 createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator
                 createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator
+                createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator
             }
             type VerifyResult {
                 admins: [Administrator!]!
@@ -138,16 +149,33 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
 
     constructor(private eventBus: EventBus, private connection: TransactionalConnection) {}
 
+    static reset() {
+        this.eventHandlerComplete$ = new ReplaySubject(1);
+        this.errorHandler.mockClear();
+    }
+
     onApplicationBootstrap(): any {
         // This part is used to test how RequestContext with transactions behave
         // when used in an Event subscription
         this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => {
             const { ctx, administrator } = event;
-            if (administrator.emailAddress === TRIGGER_EMAIL) {
+            if (administrator.emailAddress === TRIGGER_ATTEMPTED_UPDATE_EMAIL) {
+                const adminRepository = this.connection.getRepository(ctx, Administrator);
+                await new Promise(resolve => setTimeout(resolve, 50));
                 administrator.lastName = 'modified';
                 try {
-                    await new Promise(resolve => setTimeout(resolve, 50));
-                    await this.connection.getRepository(ctx, Administrator).save(administrator);
+                    await adminRepository.save(administrator);
+                } catch (e) {
+                    TransactionTestPlugin.errorHandler(e);
+                } finally {
+                    TransactionTestPlugin.eventHandlerComplete$.complete();
+                }
+            }
+            if (administrator.emailAddress === TRIGGER_ATTEMPTED_READ_EMAIL) {
+                // note the ctx is not passed here, so we are not inside the ongoing transaction
+                const adminRepository = this.connection.getRepository(Administrator);
+                try {
+                    await adminRepository.findOneOrFail(administrator.id);
                 } catch (e) {
                     TransactionTestPlugin.errorHandler(e);
                 } finally {

+ 1 - 0
packages/core/src/config/config.service.mock.ts

@@ -23,6 +23,7 @@ export class MockConfigService implements MockClass<ConfigService> {
     defaultLanguageCode: jest.Mock<any>;
     roundingStrategy: {};
     entityIdStrategy = new MockIdStrategy();
+    entityOptions = {};
     assetOptions = {
         assetNamingStrategy: {} as any,
         assetStorageStrategy: {} as any,

+ 3 - 3
packages/core/src/connection/connection.module.ts

@@ -6,14 +6,14 @@ import { ConfigModule } from '../config/config.module';
 import { ConfigService } from '../config/config.service';
 import { TypeOrmLogger } from '../config/logger/typeorm-logger';
 
+import { TransactionSubscriber } from './transaction-subscriber';
 import { TransactionalConnection } from './transactional-connection';
 
 let defaultTypeOrmModule: DynamicModule;
 
 @Module({
-    imports: [],
-    providers: [TransactionalConnection],
-    exports: [TransactionalConnection],
+    providers: [TransactionalConnection, TransactionSubscriber],
+    exports: [TransactionalConnection, TransactionSubscriber],
 })
 export class ConnectionModule {
     static forRoot(): DynamicModule {

+ 67 - 0
packages/core/src/connection/transaction-subscriber.ts

@@ -0,0 +1,67 @@
+import { Injectable } from '@nestjs/common';
+import { InjectConnection } from '@nestjs/typeorm';
+import { merge, Subject } from 'rxjs';
+import { filter, map, take } from 'rxjs/operators';
+import { Connection, EntitySubscriberInterface } from 'typeorm';
+import { EntityManager } from 'typeorm/entity-manager/EntityManager';
+import { QueryRunner } from 'typeorm/query-runner/QueryRunner';
+import { TransactionCommitEvent } from 'typeorm/subscriber/event/TransactionCommitEvent';
+import { TransactionRollbackEvent } from 'typeorm/subscriber/event/TransactionRollbackEvent';
+
+export interface TransactionSubscriberEvent {
+    /**
+     * Connection used in the event.
+     */
+    connection: Connection;
+    /**
+     * QueryRunner used in the event transaction.
+     * All database operations in the subscribed event listener should be performed using this query runner instance.
+     */
+    queryRunner: QueryRunner;
+    /**
+     * EntityManager used in the event transaction.
+     * All database operations in the subscribed event listener should be performed using this entity manager instance.
+     */
+    manager: EntityManager;
+}
+
+/**
+ * This subscriber listens to all transaction commit/rollback events emitted by TypeORM
+ * so that we can be notified as soon as a particular queryRunner's transactions ends.
+ *
+ * This is used by the {@link EventBus} to prevent events from being published until their
+ * associated transactions are complete.
+ */
+@Injectable()
+export class TransactionSubscriber implements EntitySubscriberInterface {
+    private commit$ = new Subject<TransactionSubscriberEvent>();
+    private rollback$ = new Subject<TransactionSubscriberEvent>();
+
+    constructor(@InjectConnection() private connection: Connection) {
+        if (!connection.subscribers.find(subscriber => subscriber.constructor === TransactionSubscriber)) {
+            connection.subscribers.push(this);
+        }
+    }
+
+    afterTransactionCommit(event: TransactionCommitEvent) {
+        this.commit$.next(event);
+    }
+
+    afterTransactionRollback(event: TransactionRollbackEvent) {
+        this.rollback$.next(event);
+    }
+
+    awaitRelease(queryRunner: QueryRunner): Promise<QueryRunner> {
+        if (queryRunner.isTransactionActive) {
+            return merge(this.commit$, this.rollback$)
+                .pipe(
+                    filter(event => event.queryRunner === queryRunner),
+                    take(1),
+                    map(event => event.queryRunner),
+                )
+                .toPromise();
+        } else {
+            return Promise.resolve(queryRunner);
+        }
+    }
+}

+ 2 - 6
packages/core/src/connection/transactional-connection.ts

@@ -36,9 +36,7 @@ export interface GetEntityOrThrowOptions<T = any> extends FindOneOptions<T> {
     /**
      * @description
      * If set to a positive integer, it will retry getting the entity in case it is initially not
-     * found. This can be useful when working with the {@link EventBus} and subscribing to the
-     * creation of new Entities which may on first attempt be inaccessible due to an ongoing
-     * transaction.
+     * found.
      *
      * @since 1.1.0
      * @default 0
@@ -157,9 +155,7 @@ export class TransactionalConnection {
     /**
      * @description
      * Finds an entity of the given type by ID, or throws an `EntityNotFoundError` if none
-     * is found. Can be configured to retry (using the `retries` option) in the event of the
-     * entity not being found on the first attempt. This can be useful when attempting to access
-     * an entity which was just created and may be inaccessible due to an ongoing transaction.
+     * is found.
      */
     async getEntityOrThrow<T extends VendureEntity>(
         ctx: RequestContext,

+ 3 - 0
packages/core/src/event-bus/event-bus.module.ts

@@ -1,8 +1,11 @@
 import { Module } from '@nestjs/common';
 
+import { ConnectionModule } from '../connection/connection.module';
+
 import { EventBus } from './event-bus';
 
 @Module({
+    imports: [ConnectionModule],
     providers: [EventBus],
     exports: [EventBus],
 })

+ 35 - 19
packages/core/src/event-bus/event-bus.ts

@@ -1,11 +1,12 @@
 import { Injectable, OnModuleDestroy } from '@nestjs/common';
 import { Type } from '@vendure/common/lib/shared-types';
 import { Observable, Subject } from 'rxjs';
-import { filter, takeUntil } from 'rxjs/operators';
+import { filter, mergeMap, takeUntil } from 'rxjs/operators';
 import { EntityManager } from 'typeorm';
 
 import { RequestContext } from '../api/common/request-context';
 import { TRANSACTION_MANAGER_KEY } from '../common/constants';
+import { TransactionSubscriber } from '../connection/transaction-subscriber';
 
 import { VendureEvent } from './vendure-event';
 
@@ -57,22 +58,30 @@ export class EventBus implements OnModuleDestroy {
     private eventStream = new Subject<VendureEvent>();
     private destroy$ = new Subject();
 
+    constructor(private transactionSubscriber: TransactionSubscriber) {}
+
     /**
      * @description
      * Publish an event which any subscribers can react to.
      */
     publish<T extends VendureEvent>(event: T): void {
-        this.eventStream.next(this.prepareRequestContext(event));
+        this.eventStream.next(event);
     }
 
     /**
      * @description
      * Returns an RxJS Observable stream of events of the given type.
+     * If the event contains a {@link RequestContext} object, the subscriber
+     * will only get called after any active database transactions are complete.
+     *
+     * This means that the subscriber function can safely access all updated
+     * data related to the event.
      */
     ofType<T extends VendureEvent>(type: Type<T>): Observable<T> {
         return this.eventStream.asObservable().pipe(
             takeUntil(this.destroy$),
             filter(e => (e as any).constructor === type),
+            mergeMap(event => this.awaitActiveTransactions(event)),
         ) as Observable<T>;
     }
 
@@ -82,26 +91,33 @@ export class EventBus implements OnModuleDestroy {
     }
 
     /**
-     * If the Event includes a RequestContext property, we need to:
+     * If the Event includes a RequestContext property, we need to check for any active transaction
+     * associated with it, and if there is one, we await that transaction to either commit or rollback
+     * before publishing the event.
+     *
+     * The reason for this is that if the transaction is still active when event subscribers execute,
+     * this can cause a couple of issues:
      *
-     * 1) Set it as a copy of the original
-     * 2) Remove the TRANSACTION_MANAGER_KEY from that copy
+     * 1. If the transaction hasn't completed by the time the subscriber runs, the new data inside
+     *  the transaction will not be available to the subscriber.
+     * 2. If the subscriber gets a reference to the EntityManager which has an active transaction,
+     *   and then the transaction completes, and then the subscriber attempts a DB operation using that
+     *   EntityManager, a fatal QueryRunnerAlreadyReleasedError will be thrown.
      *
-     * The TRANSACTION_MANAGER_KEY is used to track transactions across calls
-     * (this is why we always pass the `ctx` object to get TransactionalConnection.getRepository() method).
-     * However, allowing a transaction to continue in an async event subscriber function _will_ cause
-     * very confusing issues (see https://github.com/vendure-ecommerce/vendure/issues/520), which is why
-     * we simply remove the reference to the transaction manager from the context object altogether.
+     * For more context on these two issues, see:
+     *
+     * * https://github.com/vendure-ecommerce/vendure/issues/520
+     * * https://github.com/vendure-ecommerce/vendure/issues/1107
      */
-    private prepareRequestContext<T extends VendureEvent>(event: T): T {
-        for (const propertyName of Object.getOwnPropertyNames(event)) {
-            const property = event[propertyName as keyof T];
-            if (property instanceof RequestContext) {
-                const ctxCopy = property.copy();
-                delete (ctxCopy as any)[TRANSACTION_MANAGER_KEY];
-                (event[propertyName as keyof T] as any) = ctxCopy;
-            }
+    private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T> {
+        const ctx = Object.values(event).find(value => value instanceof RequestContext);
+        if (!ctx) {
+            return event;
+        }
+        const transactionManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
+        if (!transactionManager?.queryRunner) {
+            return event;
         }
-        return event;
+        return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => event);
     }
 }

+ 0 - 3
packages/core/src/service/services/product-variant.service.ts

@@ -649,9 +649,6 @@ export class ProductVariantService {
             ctx,
             variants.map(v => v.id),
         );
-        // Publish the events at the latest possible stage to decrease the chance of race conditions
-        // whereby an event listener triggers a query which does not yet have access to the changes
-        // within the current transaction.
         for (const variant of variants) {
             this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
         }

+ 63 - 0
packages/dev-server/test-plugins/event-bus-transactions-plugin.ts

@@ -0,0 +1,63 @@
+/* tslint:disable:no-non-null-assertion */
+import { OnModuleInit } from '@nestjs/common';
+import { Args, Mutation, Resolver } from '@nestjs/graphql';
+import {
+    Asset,
+    AssetEvent,
+    AssetService,
+    Ctx,
+    EventBus,
+    ID,
+    Logger,
+    PluginCommonModule,
+    RequestContext,
+    Transaction,
+    TransactionalConnection,
+    VendurePlugin,
+} from '@vendure/core';
+import gql from 'graphql-tag';
+
+@Resolver()
+class TestResolver {
+    constructor(private assetService: AssetService) {}
+
+    @Transaction()
+    @Mutation()
+    async setAssetName(@Ctx() ctx: RequestContext, @Args() args: { id: ID; name: string }) {
+        await this.assetService.update(ctx, {
+            id: args.id,
+            name: args.name,
+        });
+        await new Promise(resolve => setTimeout(resolve, 500));
+        Logger.info(`setAssetName returning`);
+        return true;
+    }
+}
+
+// A plugin to explore solutions to https://github.com/vendure-ecommerce/vendure/issues/1107
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    adminApiExtensions: {
+        schema: gql`
+            extend type Mutation {
+                setAssetName(id: ID!, name: String!): Boolean
+            }
+        `,
+        resolvers: [TestResolver],
+    },
+})
+export class EventBusTransactionsPlugin implements OnModuleInit {
+    constructor(private eventBus: EventBus, private connection: TransactionalConnection) {}
+
+    onModuleInit(): any {
+        this.eventBus.ofType(AssetEvent).subscribe(async event => {
+            Logger.info(`Event handler started`);
+            const repository = this.connection.getRepository(event.ctx, Asset);
+            await new Promise(resolve => setTimeout(resolve, 1000));
+            const asset = await repository.findOne(event.asset.id);
+            Logger.info(`The asset name is ${asset?.name}`);
+            asset!.name = asset!.name + ' modified';
+            await repository.save(asset!);
+        });
+    }
+}