소스 검색

fix(core): Copy context on transaction start. Do not allow to run queries after transaction aborts. (#1481)

* fix(core): More restrictive check on released query runner. Copy context on execute transaction

* fix(core): Corrected docs

* fix(core): Added some tests to reproduce an issue. Move copy operation to executeInTransaction() function. Copy context on firing event subscription, associate context with handlers.
Alexander Shitikov 3 년 전
부모
커밋
6050279860

+ 62 - 1
packages/core/e2e/database-transactions.e2e-spec.ts

@@ -13,6 +13,14 @@ import {
     TRIGGER_ATTEMPTED_UPDATE_EMAIL,
 } from './fixtures/test-plugins/transaction-test-plugin';
 
+type DBType = 'mysql' | 'postgres' | 'sqlite' | 'sqljs';
+
+const itIfDb = (dbs: DBType[]) => {
+    return dbs.includes(process.env.DB as DBType || 'sqljs')
+        ? it
+        : it.skip
+} 
+
 describe('Transaction infrastructure', () => {
     const { server, adminClient } = createTestEnvironment(
         mergeConfig(testConfig(), {
@@ -69,6 +77,26 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.users.find((u: any) => u.identifier === 'test2')).toBe(false);
     });
 
+    it('failing mutation with promise concurrent execution', async () => {
+        try {
+            await adminClient.query(CREATE_N_ADMINS, {
+                emailAddress: 'testN-',
+                failFactor: 0.4,
+                n: 10
+            })
+            fail('Should have thrown');
+        } catch (e) {
+            expect(e.message).toContain('Failed!');
+        }
+
+        const { verify } = await adminClient.query(VERIFY_TEST);
+
+        expect(verify.admins.length).toBe(2);
+        expect(verify.users.length).toBe(2);
+        expect(!!verify.admins.find((a: any) => a.emailAddress.includes('testN'))).toBe(false);
+        expect(!!verify.users.find((u: any) => u.identifier.includes('testN'))).toBe(false);
+    });
+
     it('failing manual mutation', async () => {
         try {
             await adminClient.query(CREATE_ADMIN2, {
@@ -127,6 +155,27 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
     });
 
+    itIfDb(['postgres', 'mysql'])('failing mutation inside connection.withTransaction() wrapper with context and promise concurrent execution', async () => {
+        try {
+            await adminClient.query(CREATE_N_ADMINS2, {
+                emailAddress: 'testN-',
+                failFactor: 0.4,
+                n: 10
+            })
+            fail('Should have thrown');
+        } catch (e) {
+            expect(e.message)
+                .toMatch(/^Failed!|Query runner already released. Cannot run queries anymore.$/);
+        }
+
+        const { verify } = await adminClient.query(VERIFY_TEST);
+
+        expect(verify.admins.length).toBe(2);
+        expect(verify.users.length).toBe(3);
+        expect(!!verify.admins.find((a: any) => a.emailAddress.includes('testN'))).toBe(false);
+        expect(!!verify.users.find((u: any) => u.identifier.includes('testN'))).toBe(false);
+    });
+
     it('failing mutation inside connection.withTransaction() wrapper without request context', async () => {
         try {
             await adminClient.query(CREATE_ADMIN5, {
@@ -228,6 +277,18 @@ const CREATE_ADMIN5 = gql`
     ${ADMIN_FRAGMENT}
 `;
 
+const CREATE_N_ADMINS = gql`
+    mutation CreateNTestAdmins($emailAddress: String!, $failFactor: Float!, $n: Int!) {
+        createNTestAdministrators(emailAddress: $emailAddress, failFactor: $failFactor, n: $n)
+    }
+`;
+
+const CREATE_N_ADMINS2 = gql`
+    mutation CreateNTestAdmins2($emailAddress: String!, $failFactor: Float!, $n: Int!) {
+        createNTestAdministrators2(emailAddress: $emailAddress, failFactor: $failFactor, n: $n)
+    }
+`;
+
 const VERIFY_TEST = gql`
     query VerifyTest {
         verify {
@@ -241,4 +302,4 @@ const VERIFY_TEST = gql`
             }
         }
     }
-`;
+`;

+ 85 - 4
packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts

@@ -37,7 +37,8 @@ class TestUserService {
                 passwordHash: 'abc',
             }),
         );
-        const user = await this.connection.getRepository(ctx, User).save(
+
+        await this.connection.getRepository(ctx, User).insert(
             new User({
                 authenticationMethods: [authMethod],
                 identifier,
@@ -45,7 +46,10 @@ class TestUserService {
                 verified: true,
             }),
         );
-        return user;
+
+        return this.connection.getRepository(ctx, User).findOne({
+            where: { identifier }
+        });
     }
 }
 
@@ -55,9 +59,11 @@ class TestAdminService {
 
     async createAdministrator(ctx: RequestContext, emailAddress: string, fail: boolean) {
         const user = await this.userService.createUser(ctx, emailAddress);
+
         if (fail) {
             throw new InternalServerError('Failed!');
         }
+
         const admin = await this.connection.getRepository(ctx, Administrator).save(
             new Administrator({
                 emailAddress,
@@ -112,7 +118,7 @@ class TestResolver {
     @Mutation()
     async createTestAdministrator5(@Ctx() ctx: RequestContext, @Args() args: any) {
         if (args.noContext === true) {
-            return this.connection.withTransaction(ctx, async _ctx => {
+            return this.connection.withTransaction(async _ctx => {
                 const admin = await this.testAdminService.createAdministrator(
                     _ctx,
                     args.emailAddress,
@@ -121,7 +127,7 @@ class TestResolver {
                 return admin;
             });
         } else {
-            return this.connection.withTransaction(async _ctx => {
+            return this.connection.withTransaction(ctx, async _ctx => {
                 const admin = await this.testAdminService.createAdministrator(
                     _ctx,
                     args.emailAddress,
@@ -132,6 +138,61 @@ class TestResolver {
         }
     }
 
+    @Mutation()
+    @Transaction()
+    async createNTestAdministrators(@Ctx() ctx: RequestContext, @Args() args: any) {
+        let error: any;
+
+        const promises: Promise<any>[] = []
+        for (let i = 0; i < args.n; i++) {
+            promises.push(
+                new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
+                    this.testAdminService.createAdministrator(ctx, `${args.emailAddress}${i}`, i < args.n * args.failFactor)
+                )
+            )
+        }
+
+        const result = await Promise.all(promises).catch((e: any) => {
+            error = e;
+        })
+
+        await this.allSettled(promises)
+    
+        if (error) {
+            throw error;
+        }
+
+        return result;
+    }
+
+    @Mutation()
+    async createNTestAdministrators2(@Ctx() ctx: RequestContext, @Args() args: any) {
+        let error: any;
+
+        const promises: Promise<any>[] = []
+        const result = await this.connection.withTransaction(ctx, _ctx => {
+            for (let i = 0; i < args.n; i++) {
+                promises.push(
+                    new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
+                        this.testAdminService.createAdministrator(_ctx, `${args.emailAddress}${i}`, i < args.n * args.failFactor)
+                    )
+                )
+            }
+
+            return Promise.all(promises);
+        }).catch((e: any) => {
+            error = e;
+        })
+
+        await this.allSettled(promises)
+    
+        if (error) {
+            throw error;
+        }
+
+        return result;
+    }
+
     @Query()
     async verify() {
         const admins = await this.connection.getRepository(Administrator).find();
@@ -141,6 +202,24 @@ class TestResolver {
             users,
         };
     }
+
+    // Promise.allSettled polyfill
+    // Same as Promise.all but waits until all promises will be fulfilled or rejected.
+    private allSettled<T>(promises: Promise<T>[]): Promise<({status: 'fulfilled', value: T} | { status: 'rejected', reason: any})[]> {
+        return Promise.all(
+            promises.map((promise, i) =>
+              promise
+                .then(value => ({
+                  status: "fulfilled" as const,
+                  value,
+                }))
+                .catch(reason => ({
+                  status: "rejected" as const,
+                  reason,
+                }))
+            )
+          );
+    }
 }
 
 @VendurePlugin({
@@ -158,6 +237,8 @@ class TestResolver {
                     fail: Boolean!
                     noContext: Boolean!
                 ): Administrator
+                createNTestAdministrators(emailAddress: String!, failFactor: Float!, n: Int!): JSON
+                createNTestAdministrators2(emailAddress: String!, failFactor: Float!, n: Int!): JSON
             }
             type VerifyResult {
                 admins: [Administrator!]!

+ 12 - 3
packages/core/src/api/decorators/request-context.decorator.ts

@@ -1,6 +1,6 @@
 import { ContextType, createParamDecorator, ExecutionContext } from '@nestjs/common';
 
-import { REQUEST_CONTEXT_KEY } from '../../common/constants';
+import { REQUEST_CONTEXT_KEY, REQUEST_CONTEXT_MAP_KEY } from '../../common/constants';
 
 /**
  * @description
@@ -19,11 +19,20 @@ import { REQUEST_CONTEXT_KEY } from '../../common/constants';
  * @docsPage Ctx Decorator
  */
 export const Ctx = createParamDecorator((data, ctx: ExecutionContext) => {
+    const getContext = (req: any) => {
+        const map: Map<Function, any> | undefined = req[REQUEST_CONTEXT_MAP_KEY];
+
+        // If a map contains associated transactional context with this handler
+        // we have to use it. It means that this handler was wrapped with @Transaction decorator.
+        // Otherwise use default context.
+        return map?.get(ctx.getHandler()) || req[REQUEST_CONTEXT_KEY];
+    }
+
     if (ctx.getType<ContextType | 'graphql'>() === 'graphql') {
         // GraphQL request
-        return ctx.getArgByIndex(2).req[REQUEST_CONTEXT_KEY];
+        return getContext(ctx.getArgByIndex(2).req);
     } else {
         // REST request
-        return ctx.switchToHttp().getRequest()[REQUEST_CONTEXT_KEY];
+        return getContext(ctx.switchToHttp().getRequest());
     }
 });

+ 26 - 4
packages/core/src/api/middleware/transaction-interceptor.ts

@@ -1,8 +1,9 @@
 import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
 import { Reflector } from '@nestjs/core';
 import { Observable, of } from 'rxjs';
+import { RequestContext } from '..';
 
-import { REQUEST_CONTEXT_KEY } from '../../common/constants';
+import { REQUEST_CONTEXT_KEY, REQUEST_CONTEXT_MAP_KEY } from '../../common/constants';
 import { TransactionWrapper } from '../../connection/transaction-wrapper';
 import { TransactionalConnection } from '../../connection/transactional-connection';
 import { parseContext } from '../common/parse-context';
@@ -20,24 +21,45 @@ export class TransactionInterceptor implements NestInterceptor {
         private transactionWrapper: TransactionWrapper,
         private reflector: Reflector,
     ) {}
+
     intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
         const { isGraphQL, req } = parseContext(context);
-        const ctx = (req as any)[REQUEST_CONTEXT_KEY];
+        const ctx: RequestContext | undefined = (req as any)[REQUEST_CONTEXT_KEY];
+
         if (ctx) {
             const transactionMode = this.reflector.get<TransactionMode>(
                 TRANSACTION_MODE_METADATA_KEY,
                 context.getHandler(),
             );
+            
             return of(
                 this.transactionWrapper.executeInTransaction(
                     ctx,
-                    () => next.handle(),
+                    (ctx) => {
+                        this.registerTransactionalContext(ctx, context.getHandler(), req);
+
+                        return next.handle()
+                    },
                     transactionMode,
                     this.connection.rawConnection,
-                ),
+                )
             );
         } else {
             return next.handle();
         }
     }
+
+    /**
+     * Registers transactional request context associated with execution handler function
+     * 
+     * @param ctx transactional request context
+     * @param handler handler function from ExecutionContext
+     * @param req Request object
+     */
+    registerTransactionalContext(ctx: RequestContext, handler: Function, req: any) {
+        const map: Map<Function, RequestContext> = req[REQUEST_CONTEXT_MAP_KEY] || new Map();
+        map.set(handler, ctx);
+
+        req[REQUEST_CONTEXT_MAP_KEY] = map;
+    }
 }

+ 1 - 0
packages/core/src/common/constants.ts

@@ -9,6 +9,7 @@ import { CrudPermissionDefinition, PermissionDefinition, PermissionMetadata } fr
 export const DEFAULT_LANGUAGE_CODE = LanguageCode.en;
 export const TRANSACTION_MANAGER_KEY = Symbol('TRANSACTION_MANAGER');
 export const REQUEST_CONTEXT_KEY = 'vendureRequestContext';
+export const REQUEST_CONTEXT_MAP_KEY = 'vendureRequestContextMap';
 export const DEFAULT_PERMISSIONS: PermissionDefinition[] = [
     new PermissionDefinition({
         name: 'Authenticated',

+ 11 - 4
packages/core/src/connection/transaction-wrapper.ts

@@ -18,19 +18,26 @@ export class TransactionWrapper {
      * Executes the `work` function within the context of a transaction. If the `work` function
      * resolves / completes, then all the DB operations it contains will be committed. If it
      * throws an error or rejects, then all DB operations will be rolled back.
+     * 
+     * @note
+     * This function does not mutate your context. Instead, this function makes a copy and passes
+     * context to work function.
      */
     async executeInTransaction<T>(
-        ctx: RequestContext,
-        work: () => Observable<T> | Promise<T>,
+        originalCtx: RequestContext,
+        work: (ctx: RequestContext) => Observable<T> | Promise<T>,
         mode: TransactionMode,
         connection: Connection,
     ): Promise<T> {
+        // Copy to make sure original context will remain valid after transaction completes
+        const ctx = originalCtx.copy();
+
         const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
         if (queryRunnerExists) {
             // If a QueryRunner already exists on the RequestContext, there must be an existing
             // outer transaction in progress. In that case, we just execute the work function
             // as usual without needing to further wrap in a transaction.
-            return from(work()).toPromise();
+            return from(work(ctx)).toPromise();
         }
         const queryRunner = connection.createQueryRunner();
         if (mode === 'auto') {
@@ -40,7 +47,7 @@ export class TransactionWrapper {
 
         try {
             const maxRetries = 5;
-            const result = await from(work())
+            const result = await from(work(ctx))
                 .pipe(
                     retryWhen(errors =>
                         errors.pipe(

+ 11 - 6
packages/core/src/connection/transactional-connection.ts

@@ -73,8 +73,9 @@ export class TransactionalConnection {
     ): Repository<Entity> {
         if (ctxOrTarget instanceof RequestContext) {
             const transactionManager = this.getTransactionManager(ctxOrTarget);
-            if (transactionManager && maybeTarget && !transactionManager.queryRunner?.isReleased) {
-                return transactionManager.getRepository(maybeTarget);
+            if (transactionManager) {
+                // tslint:disable-next-line:no-non-null-assertion
+                return transactionManager.getRepository(maybeTarget!);
             } else {
                 // tslint:disable-next-line:no-non-null-assertion
                 return getRepository(maybeTarget!);
@@ -101,15 +102,19 @@ export class TransactionalConnection {
      * of Vendure internal services.
      *
      * If there is already a {@link RequestContext} object available, you should pass it in as the first
-     * argument in order to add a new transaction to it. If not, omit the first argument and an empty
+     * argument in order to create transactional context as the copy. If not, omit the first argument and an empty
      * RequestContext object will be created, which is then used to propagate the transaction to
      * all inner method calls.
      *
      * @example
      * ```TypeScript
-     * private async transferCredit(fromId: ID, toId: ID, amount: number) {
-     *   await this.connection.withTransaction(ctx => {
+     * private async transferCredit(outerCtx: RequestContext, fromId: ID, toId: ID, amount: number) {
+     *   await this.connection.withTransaction(outerCtx, ctx => {
      *     await this.giftCardService.updateCustomerCredit(fromId, -amount);
+     * 
+     *     // Note you must not use outerCtx here, instead use ctx. Otherwise this query
+     *     // will be executed outside of transaction
+     *     await this.connection.getRepository(ctx, GiftCard).update(fromId, { transferred: true })
      *
      *     // If some intermediate logic here throws an Error,
      *     // then all DB transactions will be rolled back and neither Customer's
@@ -138,7 +143,7 @@ export class TransactionalConnection {
             ctx = RequestContext.empty();
             work = ctxOrWork;
         }
-        return this.transactionWrapper.executeInTransaction(ctx, () => work(ctx), 'auto', this.rawConnection);
+        return this.transactionWrapper.executeInTransaction(ctx, work, 'auto', this.rawConnection);
     }
 
     /**

+ 18 - 3
packages/core/src/event-bus/event-bus.ts

@@ -110,14 +110,29 @@ export class EventBus implements OnModuleDestroy {
      * * https://github.com/vendure-ecommerce/vendure/issues/1107
      */
     private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T> {
-        const ctx = Object.values(event).find(value => value instanceof RequestContext);
-        if (!ctx) {
+        const entry = Object.entries(event).find(([_, value]) => value instanceof RequestContext);
+
+        if (!entry) {
             return event;
         }
+
+        const [key, ctx]: [string, RequestContext] = entry;
+        
         const transactionManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
         if (!transactionManager?.queryRunner) {
             return event;
         }
-        return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => event);
+
+        return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => {
+            // Copy context and remove transaction manager
+            // This will prevent queries to released query runner
+            const newContext = ctx.copy();
+            delete (newContext as any)[TRANSACTION_MANAGER_KEY];
+
+            // Reassign new context
+            (event as any)[key] = newContext
+
+            return event;
+        });
     }
 }