Преглед на файлове

fix(core): Mitigate QueryRunnerAlreadyReleasedError in EventBus handlers

Fixes #520
Michael Bromley преди 5 години
родител
ревизия
739e56cb77

+ 13 - 1
packages/core/e2e/database-transactions.e2e-spec.ts

@@ -6,7 +6,7 @@ import path from 'path';
 import { initialData } from '../../../e2e-common/e2e-initial-data';
 import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 
-import { TransactionTestPlugin } from './fixtures/test-plugins/transaction-test-plugin';
+import { TransactionTestPlugin, TRIGGER_EMAIL } from './fixtures/test-plugins/transaction-test-plugin';
 
 describe('Transaction infrastructure', () => {
     const { server, adminClient } = createTestEnvironment(
@@ -101,6 +101,18 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.admins.find((a: any) => a.emailAddress === 'test4')).toBe(false);
         expect(!!verify.users.find((u: any) => u.identifier === 'test4')).toBe(true);
     });
+
+    // Testing https://github.com/vendure-ecommerce/vendure/issues/520
+    it('passing transaction via EventBus', async () => {
+        TransactionTestPlugin.errorHandler.mockClear();
+        const { createTestAdministrator } = await adminClient.query(CREATE_ADMIN, {
+            emailAddress: TRIGGER_EMAIL,
+            fail: false,
+        });
+        await TransactionTestPlugin.eventHandlerComplete$.toPromise();
+        expect(createTestAdministrator.emailAddress).toBe(TRIGGER_EMAIL);
+        expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
+    });
 });
 
 const ADMIN_FRAGMENT = gql`

+ 47 - 5
packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts

@@ -1,8 +1,9 @@
-import { Injectable } from '@nestjs/common';
+import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
 import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
 import {
     Administrator,
     Ctx,
+    EventBus,
     InternalServerError,
     NativeAuthenticationMethod,
     PluginCommonModule,
@@ -10,9 +11,19 @@ import {
     Transaction,
     TransactionalConnection,
     User,
+    VendureEvent,
     VendurePlugin,
 } from '@vendure/core';
 import gql from 'graphql-tag';
+import { ReplaySubject, Subscription } from 'rxjs';
+
+export class TestEvent extends VendureEvent {
+    constructor(public ctx: RequestContext, public administrator: Administrator) {
+        super();
+    }
+}
+
+export const TRIGGER_EMAIL = 'trigger-email';
 
 @Injectable()
 class TestUserService {
@@ -60,12 +71,18 @@ class TestAdminService {
 
 @Resolver()
 class TestResolver {
-    constructor(private testAdminService: TestAdminService, private connection: TransactionalConnection) {}
+    constructor(
+        private testAdminService: TestAdminService,
+        private connection: TransactionalConnection,
+        private eventBus: EventBus,
+    ) {}
 
     @Mutation()
     @Transaction()
-    createTestAdministrator(@Ctx() ctx: RequestContext, @Args() args: any) {
-        return this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail);
+    async createTestAdministrator(@Ctx() ctx: RequestContext, @Args() args: any) {
+        const admin = await this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail);
+        this.eventBus.publish(new TestEvent(ctx, admin));
+        return admin;
     }
 
     @Mutation()
@@ -114,4 +131,29 @@ class TestResolver {
         resolvers: [TestResolver],
     },
 })
-export class TransactionTestPlugin {}
+export class TransactionTestPlugin implements OnApplicationBootstrap {
+    private subscription: Subscription;
+    static errorHandler = jest.fn();
+    static eventHandlerComplete$ = new ReplaySubject(1);
+
+    constructor(private eventBus: EventBus, private connection: TransactionalConnection) {}
+
+    onApplicationBootstrap(): any {
+        // This part is used to test how RequestContext with transactions behave
+        // when used in an Event subscription
+        this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => {
+            const { ctx, administrator } = event;
+            if (administrator.emailAddress === TRIGGER_EMAIL) {
+                administrator.lastName = 'modified';
+                try {
+                    await new Promise(resolve => setTimeout(resolve, 1));
+                    await this.connection.getRepository(ctx, Administrator).save(administrator);
+                } catch (e) {
+                    TransactionTestPlugin.errorHandler(e);
+                } finally {
+                    TransactionTestPlugin.eventHandlerComplete$.complete();
+                }
+            }
+        });
+    }
+}

+ 106 - 56
packages/core/src/api/common/request-context.spec.ts

@@ -9,97 +9,147 @@ import { Zone } from '../../entity/zone/zone.entity';
 import { RequestContext, SerializedRequestContext } from './request-context';
 
 describe('RequestContext', () => {
-    describe('fromObject()', () => {
+    describe('serialize/deserialize', () => {
+        let serializedCtx: SerializedRequestContext;
         let original: RequestContext;
-        let ctxObject: SerializedRequestContext;
-        let session: CachedSession;
-        let channel: Channel;
-        let activeOrder: Order;
-        let zone: Zone;
 
         beforeAll(() => {
-            activeOrder = new Order({
-                id: '55555',
-                active: true,
-                code: 'ADAWDJAWD',
-            });
-            session = {
-                cacheExpiry: Number.MAX_SAFE_INTEGER,
-                expires: new Date(),
-                id: '1234',
-                token: '2d37187e9e8fc47807fe4f58ca',
-                activeOrderId: '123',
-                user: {
-                    id: '8833774',
-                    identifier: 'user',
-                    verified: true,
-                    channelPermissions: [],
-                },
-            };
-            zone = new Zone({
-                id: '62626',
-                name: 'Europe',
-            });
-            channel = new Channel({
-                token: 'oiajwodij09au3r',
-                id: '995859',
-                code: '__default_channel__',
-                currencyCode: CurrencyCode.EUR,
-                pricesIncludeTax: true,
-                defaultLanguageCode: LanguageCode.en,
-                defaultShippingZone: zone,
-                defaultTaxZone: zone,
-            });
-            original = new RequestContext({
-                apiType: 'admin',
-                languageCode: LanguageCode.en,
-                channel,
-                session,
-                isAuthorized: true,
-                authorizedAsOwnerOnly: false,
-            });
-
-            ctxObject = original.serialize();
+            original = createRequestContext();
+            serializedCtx = original.serialize();
         });
 
         it('apiType', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.apiType).toBe(original.apiType);
         });
 
         it('channelId', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.channelId).toBe(original.channelId);
         });
 
         it('languageCode', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.languageCode).toBe(original.languageCode);
         });
 
         it('activeUserId', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.activeUserId).toBe(original.activeUserId);
         });
 
         it('isAuthorized', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.isAuthorized).toBe(original.isAuthorized);
         });
 
         it('authorizedAsOwnerOnly', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.authorizedAsOwnerOnly).toBe(original.authorizedAsOwnerOnly);
         });
 
         it('channel', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.channel).toEqual(original.channel);
         });
 
         it('session', () => {
-            const result = RequestContext.deserialize(ctxObject);
+            const result = RequestContext.deserialize(serializedCtx);
             expect(result.session).toEqual(original.session);
         });
     });
+
+    describe('copy', () => {
+        let original: RequestContext;
+
+        beforeAll(() => {
+            original = createRequestContext();
+        });
+
+        it('is a RequestContext instance', () => {
+            const copy = original.copy();
+            expect(copy instanceof RequestContext).toBe(true);
+        });
+
+        it('is not identical to original', () => {
+            const copy = original.copy();
+            expect(copy === original).toBe(false);
+        });
+
+        it('getters work', () => {
+            const copy = original.copy();
+
+            expect(copy.apiType).toEqual(original.apiType);
+            expect(copy.channelId).toEqual(original.channelId);
+            expect(copy.languageCode).toEqual(original.languageCode);
+            expect(copy.activeUserId).toEqual(original.activeUserId);
+            expect(copy.isAuthorized).toEqual(original.isAuthorized);
+            expect(copy.authorizedAsOwnerOnly).toEqual(original.authorizedAsOwnerOnly);
+            expect(copy.channel).toEqual(original.channel);
+            expect(copy.session).toEqual(original.session);
+        });
+
+        it('mutating copy leaves original intact', () => {
+            const copy = original.copy();
+            (copy as any).foo = 'bar';
+
+            expect((copy as any).foo).toBe('bar');
+            expect((original as any).foo).toBeUndefined();
+        });
+
+        it('mutating deep property affects both', () => {
+            const copy = original.copy();
+            copy.channel.code = 'changed';
+
+            expect(copy.channel.code).toBe('changed');
+            expect(original.channel.code).toBe('changed');
+        });
+    });
+
+    function createRequestContext() {
+        let session: CachedSession;
+        let channel: Channel;
+        let activeOrder: Order;
+        let zone: Zone;
+        activeOrder = new Order({
+            id: '55555',
+            active: true,
+            code: 'ADAWDJAWD',
+        });
+        session = {
+            cacheExpiry: Number.MAX_SAFE_INTEGER,
+            expires: new Date(),
+            id: '1234',
+            token: '2d37187e9e8fc47807fe4f58ca',
+            activeOrderId: '123',
+            user: {
+                id: '8833774',
+                identifier: 'user',
+                verified: true,
+                channelPermissions: [],
+            },
+        };
+        zone = new Zone({
+            id: '62626',
+            name: 'Europe',
+        });
+        channel = new Channel({
+            token: 'oiajwodij09au3r',
+            id: '995859',
+            code: '__default_channel__',
+            currencyCode: CurrencyCode.EUR,
+            pricesIncludeTax: true,
+            defaultLanguageCode: LanguageCode.en,
+            defaultShippingZone: zone,
+            defaultTaxZone: zone,
+        });
+        return new RequestContext({
+            apiType: 'admin',
+            languageCode: LanguageCode.en,
+            channel,
+            session,
+            isAuthorized: true,
+            authorizedAsOwnerOnly: false,
+        });
+    }
 });

+ 18 - 2
packages/core/src/api/common/request-context.ts

@@ -85,8 +85,8 @@ export class RequestContext {
 
     /**
      * @description
-     * Creates a new RequestContext object from a plain object which is the result of
-     * a JSON serialization - deserialization operation.
+     * Creates a new RequestContext object from a serialized object created by the
+     * `serialize()` method.
      */
     static deserialize(ctxObject: SerializedRequestContext): RequestContext {
         return new RequestContext({
@@ -102,10 +102,26 @@ export class RequestContext {
         });
     }
 
+    /**
+     * @description
+     * Serializes the RequestContext object into a JSON-compatible simple object.
+     * This is useful when you need to send a RequestContext object to another
+     * process, e.g. to pass it to the Worker process via the {@link WorkerService}.
+     */
     serialize(): SerializedRequestContext {
         return JSON.parse(JSON.stringify(this));
     }
 
+    /**
+     * @description
+     * Creates a shallow copy of the RequestContext instance. This means that
+     * mutations to the copy itself will not affect the original, but deep mutations
+     * (e.g. copy.channel.code = 'new') *will* also affect the original.
+     */
+    copy(): RequestContext {
+        return Object.assign(Object.create(Object.getPrototypeOf(this)), this);
+    }
+
     get apiType(): ApiType {
         return this._apiType;
     }

+ 33 - 2
packages/core/src/event-bus/event-bus.ts

@@ -3,6 +3,9 @@ import { Type } from '@vendure/common/lib/shared-types';
 import { Observable, Subject } from 'rxjs';
 import { filter, takeUntil } from 'rxjs/operators';
 
+import { RequestContext } from '../api/common/request-context';
+import { TRANSACTION_MANAGER_KEY } from '../common/constants';
+
 import { VendureEvent } from './vendure-event';
 
 export type EventHandler<T extends VendureEvent> = (event: T) => void;
@@ -33,7 +36,7 @@ export class EventBus implements OnModuleDestroy {
                 handlers[i](event);
             }
         }
-        this.eventStream.next(event);
+        this.eventStream.next(this.prepareRequestContext(event));
     }
 
     /**
@@ -62,11 +65,39 @@ export class EventBus implements OnModuleDestroy {
             handlers.push(handler);
         }
         this.subscriberMap.set(type, handlers);
-        return () => this.subscriberMap.set(type, handlers.filter(h => h !== handler));
+        return () =>
+            this.subscriberMap.set(
+                type,
+                handlers.filter(h => h !== handler),
+            );
     }
 
     /** @internal */
     onModuleDestroy(): any {
         this.destroy$.next();
     }
+
+    /**
+     * If the Event includes a RequestContext property, we need to:
+     *
+     * 1) Set it as a copy of the original
+     * 2) Remove the TRANSACTION_MANAGER_KEY from that copy
+     *
+     * The TRANSACTION_MANAGER_KEY is used to track transactions across calls
+     * (this is why we always pass the `ctx` object to get TransactionalConnection.getRepository() method).
+     * However, allowing a transaction to continue in an async event subscriber function _will_ cause
+     * very confusing issues (see https://github.com/vendure-ecommerce/vendure/issues/520), which is why
+     * we simply remove the reference to the transaction manager from the context object altogether.
+     */
+    private prepareRequestContext<T extends VendureEvent>(event: T): T {
+        for (const propertyName of Object.getOwnPropertyNames(event)) {
+            const property = event[propertyName as keyof T];
+            if (property instanceof RequestContext) {
+                const ctxCopy = property.copy();
+                delete (ctxCopy as any)[TRANSACTION_MANAGER_KEY];
+                (event[propertyName as keyof T] as any) = ctxCopy;
+            }
+        }
+        return event;
+    }
 }