فهرست منبع

feat(core): Support save points (nested transactions) (#1579)

deps: TypeORM upgrade 0.2.41 -> 0.2.45.

BREAKING CHANGE (TypeORM): Due to an update of the TypeORM version, there is a potential breaking change if you make use of TypeORM's soft-remove feature in combination with listeners/subscribers. Namely, update listeners and subscriber no longer triggered by soft-remove and recover (https://github.com/typeorm/typeorm/blob/master/CHANGELOG.md#0242-2022-02-16). This is not used in Vendure core and is a relatively obscure edge-case.
Alexander Shitikov 3 سال پیش
والد
کامیت
9813d11556

+ 42 - 0
packages/core/e2e/database-transactions.e2e-spec.ts

@@ -11,6 +11,7 @@ import {
     TransactionTestPlugin,
     TRIGGER_ATTEMPTED_READ_EMAIL,
     TRIGGER_ATTEMPTED_UPDATE_EMAIL,
+    TRIGGER_NO_OPERATION,
 } from './fixtures/test-plugins/transaction-test-plugin';
 
 type DBType = 'mysql' | 'postgres' | 'sqlite' | 'sqljs';
@@ -196,6 +197,41 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
     });
 
+    it('non-failing mutation inside connection.withTransaction() wrapper with failing nested transactions and request context', async () => {
+        await adminClient.query(CREATE_N_ADMINS3, {
+            emailAddress: 'testNestedTransactionsN-',
+            failFactor: 0.5,
+            n: 2
+        })
+
+        const { verify } = await adminClient.query(VERIFY_TEST);
+
+        expect(verify.admins.length).toBe(3);
+        expect(verify.users.length).toBe(4);
+        expect(verify.admins.filter((a: any) => a.emailAddress.includes('testNestedTransactionsN'))).toHaveLength(1);
+        expect(verify.users.filter((u: any) => u.identifier.includes('testNestedTransactionsN'))).toHaveLength(1);
+    });
+
+    it('event do not publish after transaction rollback', async () => {
+        TransactionTestPlugin.reset();
+        try {
+            await adminClient.query(CREATE_N_ADMINS, {
+                emailAddress: TRIGGER_NO_OPERATION,
+                failFactor: 0.5,
+                n: 2
+            });
+            fail('Should have thrown');
+        } catch (e) {
+            expect(e.message).toContain('Failed!');
+        }
+
+        // Wait a bit to see an events in handlers
+        await new Promise(resolve => setTimeout(resolve, 100));
+        
+        expect(TransactionTestPlugin.callHandler).not.toHaveBeenCalled();
+        expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
+    });
+
     // Testing https://github.com/vendure-ecommerce/vendure/issues/520
     it('passing transaction via EventBus', async () => {
         TransactionTestPlugin.reset();
@@ -289,6 +325,12 @@ const CREATE_N_ADMINS2 = gql`
     }
 `;
 
+const CREATE_N_ADMINS3 = gql`
+    mutation CreateNTestAdmins3($emailAddress: String!, $failFactor: Float!, $n: Int!) {
+        createNTestAdministrators3(emailAddress: $emailAddress, failFactor: $failFactor, n: $n)
+    }
+`;
+
 const VERIFY_TEST = gql`
     query VerifyTest {
         verify {

+ 96 - 38
packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts

@@ -23,6 +23,7 @@ export class TestEvent extends VendureEvent {
     }
 }
 
+export const TRIGGER_NO_OPERATION = 'trigger-no-operation';
 export const TRIGGER_ATTEMPTED_UPDATE_EMAIL = 'trigger-attempted-update-email';
 export const TRIGGER_ATTEMPTED_READ_EMAIL = 'trigger-attempted-read-email';
 
@@ -48,7 +49,7 @@ class TestUserService {
         );
 
         return this.connection.getRepository(ctx, User).findOne({
-            where: { identifier }
+            where: { identifier },
         });
     }
 }
@@ -143,21 +144,30 @@ class TestResolver {
     async createNTestAdministrators(@Ctx() ctx: RequestContext, @Args() args: any) {
         let error: any;
 
-        const promises: Promise<any>[] = []
+        const promises: Promise<any>[] = [];
         for (let i = 0; i < args.n; i++) {
             promises.push(
-                new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
-                    this.testAdminService.createAdministrator(ctx, `${args.emailAddress}${i}`, i < args.n * args.failFactor)
-                )
-            )
+                new Promise(resolve => setTimeout(resolve, i * 10))
+                    .then(() =>
+                        this.testAdminService.createAdministrator(
+                            ctx,
+                            `${args.emailAddress}${i}`,
+                            i < args.n * args.failFactor,
+                        ),
+                    )
+                    .then(admin => {
+                        this.eventBus.publish(new TestEvent(ctx, admin));
+                        return admin;
+                    }),
+            );
         }
 
         const result = await Promise.all(promises).catch((e: any) => {
             error = e;
-        })
+        });
+
+        await this.allSettled(promises);
 
-        await this.allSettled(promises)
-    
         if (error) {
             throw error;
         }
@@ -169,23 +179,29 @@ class TestResolver {
     async createNTestAdministrators2(@Ctx() ctx: RequestContext, @Args() args: any) {
         let error: any;
 
-        const promises: Promise<any>[] = []
-        const result = await this.connection.withTransaction(ctx, _ctx => {
-            for (let i = 0; i < args.n; i++) {
-                promises.push(
-                    new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
-                        this.testAdminService.createAdministrator(_ctx, `${args.emailAddress}${i}`, i < args.n * args.failFactor)
-                    )
-                )
-            }
+        const promises: Promise<any>[] = [];
+        const result = await this.connection
+            .withTransaction(ctx, _ctx => {
+                for (let i = 0; i < args.n; i++) {
+                    promises.push(
+                        new Promise(resolve => setTimeout(resolve, i * 10)).then(() =>
+                            this.testAdminService.createAdministrator(
+                                _ctx,
+                                `${args.emailAddress}${i}`,
+                                i < args.n * args.failFactor,
+                            ),
+                        ),
+                    );
+                }
 
-            return Promise.all(promises);
-        }).catch((e: any) => {
-            error = e;
-        })
+                return Promise.all(promises);
+            })
+            .catch((e: any) => {
+                error = e;
+            });
+
+        await this.allSettled(promises);
 
-        await this.allSettled(promises)
-    
         if (error) {
             throw error;
         }
@@ -193,6 +209,36 @@ class TestResolver {
         return result;
     }
 
+    @Mutation()
+    @Transaction()
+    async createNTestAdministrators3(@Ctx() ctx: RequestContext, @Args() args: any) {
+        const result: any[] = [];
+
+        const admin = await this.testAdminService.createAdministrator(
+            ctx,
+            `${args.emailAddress}${args.n}`,
+            args.failFactor >= 1,
+        );
+
+        result.push(admin);
+
+        if (args.n > 0) {
+            try {
+                const admins = await this.connection.withTransaction(ctx, _ctx =>
+                    this.createNTestAdministrators3(_ctx, {
+                        ...args,
+                        n: args.n - 1,
+                        failFactor: args.n * args.failFactor / (args.n - 1)
+                    })
+                );
+    
+                result.push(...admins);
+            } catch(e) {}
+        }
+
+        return result;
+    }
+
     @Query()
     async verify() {
         const admins = await this.connection.getRepository(Administrator).find();
@@ -205,20 +251,22 @@ class TestResolver {
 
     // Promise.allSettled polyfill
     // Same as Promise.all but waits until all promises will be fulfilled or rejected.
-    private allSettled<T>(promises: Promise<T>[]): Promise<({status: 'fulfilled', value: T} | { status: 'rejected', reason: any})[]> {
+    private allSettled<T>(
+        promises: Promise<T>[],
+    ): Promise<({ status: 'fulfilled'; value: T } | { status: 'rejected'; reason: any })[]> {
         return Promise.all(
             promises.map((promise, i) =>
-              promise
-                .then(value => ({
-                  status: "fulfilled" as const,
-                  value,
-                }))
-                .catch(reason => ({
-                  status: "rejected" as const,
-                  reason,
-                }))
-            )
-          );
+                promise
+                    .then(value => ({
+                        status: 'fulfilled' as const,
+                        value,
+                    }))
+                    .catch(reason => ({
+                        status: 'rejected' as const,
+                        reason,
+                    })),
+            ),
+        );
     }
 }
 
@@ -239,6 +287,7 @@ class TestResolver {
                 ): Administrator
                 createNTestAdministrators(emailAddress: String!, failFactor: Float!, n: Int!): JSON
                 createNTestAdministrators2(emailAddress: String!, failFactor: Float!, n: Int!): JSON
+                createNTestAdministrators3(emailAddress: String!, failFactor: Float!, n: Int!): JSON
             }
             type VerifyResult {
                 admins: [Administrator!]!
@@ -253,6 +302,7 @@ class TestResolver {
 })
 export class TransactionTestPlugin implements OnApplicationBootstrap {
     private subscription: Subscription;
+    static callHandler = jest.fn();
     static errorHandler = jest.fn();
     static eventHandlerComplete$ = new ReplaySubject(1);
 
@@ -260,6 +310,7 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
 
     static reset() {
         this.eventHandlerComplete$ = new ReplaySubject(1);
+        this.callHandler.mockClear();
         this.errorHandler.mockClear();
     }
 
@@ -268,7 +319,13 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
         // when used in an Event subscription
         this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => {
             const { ctx, administrator } = event;
-            if (administrator.emailAddress === TRIGGER_ATTEMPTED_UPDATE_EMAIL) {
+
+            if (administrator.emailAddress?.includes(TRIGGER_NO_OPERATION)) {
+                TransactionTestPlugin.callHandler();
+                TransactionTestPlugin.eventHandlerComplete$.complete();
+            }
+            if (administrator.emailAddress?.includes(TRIGGER_ATTEMPTED_UPDATE_EMAIL)) {
+                TransactionTestPlugin.callHandler();
                 const adminRepository = this.connection.getRepository(ctx, Administrator);
                 await new Promise(resolve => setTimeout(resolve, 50));
                 administrator.lastName = 'modified';
@@ -280,7 +337,8 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {
                     TransactionTestPlugin.eventHandlerComplete$.complete();
                 }
             }
-            if (administrator.emailAddress === TRIGGER_ATTEMPTED_READ_EMAIL) {
+            if (administrator.emailAddress?.includes(TRIGGER_ATTEMPTED_READ_EMAIL)) {
+                TransactionTestPlugin.callHandler();
                 // note the ctx is not passed here, so we are not inside the ongoing transaction
                 const adminRepository = this.connection.getRepository(Administrator);
                 try {

+ 1 - 1
packages/core/package.json

@@ -76,7 +76,7 @@
     "progress": "^2.0.3",
     "reflect-metadata": "^0.1.13",
     "rxjs": "^6.6.3",
-    "typeorm": "0.2.41"
+    "typeorm": "0.2.45"
   },
   "devDependencies": {
     "@types/bcrypt": "^3.0.0",

+ 49 - 8
packages/core/src/connection/transaction-subscriber.ts

@@ -1,14 +1,28 @@
 import { Injectable } from '@nestjs/common';
 import { InjectConnection } from '@nestjs/typeorm';
-import { merge, Subject } from 'rxjs';
-import { delay, filter, map, take } from 'rxjs/operators';
+import { merge, ObservableInput, Subject } from 'rxjs';
+import { delay, filter, map, take, tap } from 'rxjs/operators';
 import { Connection, EntitySubscriberInterface } from 'typeorm';
 import { EntityManager } from 'typeorm/entity-manager/EntityManager';
 import { QueryRunner } from 'typeorm/query-runner/QueryRunner';
 import { TransactionCommitEvent } from 'typeorm/subscriber/event/TransactionCommitEvent';
 import { TransactionRollbackEvent } from 'typeorm/subscriber/event/TransactionRollbackEvent';
 
+/**
+ * This error should be thrown by an event subscription if types do not match
+ * 
+ * @internal
+ */
+export class TransactionSubscriberError extends Error {}
+
+export type TransactionSubscriberEventType = 'commit' | 'rollback';
+
 export interface TransactionSubscriberEvent {
+    /**
+     * Event type. Either commit or rollback.
+     */
+    type: TransactionSubscriberEventType;
+
     /**
      * Connection used in the event.
      */
@@ -34,8 +48,7 @@ export interface TransactionSubscriberEvent {
  */
 @Injectable()
 export class TransactionSubscriber implements EntitySubscriberInterface {
-    private commit$ = new Subject<TransactionSubscriberEvent>();
-    private rollback$ = new Subject<TransactionSubscriberEvent>();
+    private subject$ = new Subject<TransactionSubscriberEvent>();
 
     constructor(@InjectConnection() private connection: Connection) {
         if (!connection.subscribers.find(subscriber => subscriber.constructor === TransactionSubscriber)) {
@@ -44,19 +57,47 @@ export class TransactionSubscriber implements EntitySubscriberInterface {
     }
 
     afterTransactionCommit(event: TransactionCommitEvent) {
-        this.commit$.next(event);
+        this.subject$.next({
+            type: 'commit',
+            ...event,
+        });
     }
 
     afterTransactionRollback(event: TransactionRollbackEvent) {
-        this.rollback$.next(event);
+        this.subject$.next({
+            type: 'rollback',
+            ...event,
+        });
+    }
+
+    awaitCommit(queryRunner: QueryRunner): Promise<QueryRunner> {
+        return this.awaitTransactionEvent(queryRunner, 'commit');
+    }
+
+    awaitRollback(queryRunner: QueryRunner): Promise<QueryRunner> {
+        return this.awaitTransactionEvent(queryRunner, 'rollback');
     }
 
     awaitRelease(queryRunner: QueryRunner): Promise<QueryRunner> {
+        return this.awaitTransactionEvent(queryRunner);
+    }
+
+    private awaitTransactionEvent(
+        queryRunner: QueryRunner,
+        type?: TransactionSubscriberEventType,
+    ): Promise<QueryRunner> {
         if (queryRunner.isTransactionActive) {
-            return merge(this.commit$, this.rollback$)
+            return this.subject$
                 .pipe(
-                    filter(event => event.queryRunner === queryRunner),
+                    filter(
+                        event => !event.queryRunner.isTransactionActive && event.queryRunner === queryRunner,
+                    ),
                     take(1),
+                    tap(event => {
+                        if (type && event.type !== type) {
+                            throw new TransactionSubscriberError(`Unexpected event type: ${event.type}. Expected ${type}.`);
+                        }
+                    }),
                     map(event => event.queryRunner),
                     // This `delay(0)` call appears to be necessary with the upgrade to TypeORM
                     // v0.2.41, otherwise an active queryRunner can still get picked up in an event

+ 9 - 10
packages/core/src/connection/transaction-wrapper.ts

@@ -1,6 +1,6 @@
 import { from, Observable, of } from 'rxjs';
 import { retryWhen, take, tap } from 'rxjs/operators';
-import { Connection, QueryRunner } from 'typeorm';
+import { Connection, EntityManager, QueryRunner } from 'typeorm';
 import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
 
 import { RequestContext } from '../api/common/request-context';
@@ -32,14 +32,9 @@ export class TransactionWrapper {
         // Copy to make sure original context will remain valid after transaction completes
         const ctx = originalCtx.copy();
 
-        const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
-        if (queryRunnerExists) {
-            // If a QueryRunner already exists on the RequestContext, there must be an existing
-            // outer transaction in progress. In that case, we just execute the work function
-            // as usual without needing to further wrap in a transaction.
-            return from(work(ctx)).toPromise();
-        }
-        const queryRunner = connection.createQueryRunner();
+        const entityManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
+        const queryRunner = entityManager?.queryRunner || connection.createQueryRunner();
+
         if (mode === 'auto') {
             await this.startTransaction(queryRunner);
         }
@@ -71,7 +66,11 @@ export class TransactionWrapper {
             }
             throw error;
         } finally {
-            if (queryRunner?.isReleased === false) {
+            if (!queryRunner.isTransactionActive 
+                && queryRunner.isReleased === false) {
+                // There is a check for an active transaction
+                // because this could be a nested transaction (savepoint).
+
                 await queryRunner.release();
             }
         }

+ 17 - 4
packages/core/src/event-bus/event-bus.ts

@@ -3,10 +3,11 @@ import { Type } from '@vendure/common/lib/shared-types';
 import { Observable, Subject } from 'rxjs';
 import { filter, mergeMap, takeUntil } from 'rxjs/operators';
 import { EntityManager } from 'typeorm';
+import { notNullOrUndefined } from '../../../common/lib/shared-utils';
 
 import { RequestContext } from '../api/common/request-context';
 import { TRANSACTION_MANAGER_KEY } from '../common/constants';
-import { TransactionSubscriber } from '../connection/transaction-subscriber';
+import { TransactionSubscriber, TransactionSubscriberError } from '../connection/transaction-subscriber';
 
 import { VendureEvent } from './vendure-event';
 
@@ -82,6 +83,7 @@ export class EventBus implements OnModuleDestroy {
             takeUntil(this.destroy$),
             filter(e => (e as any).constructor === type),
             mergeMap(event => this.awaitActiveTransactions(event)),
+            filter(notNullOrUndefined)
         ) as Observable<T>;
     }
 
@@ -109,7 +111,7 @@ export class EventBus implements OnModuleDestroy {
      * * https://github.com/vendure-ecommerce/vendure/issues/520
      * * https://github.com/vendure-ecommerce/vendure/issues/1107
      */
-    private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T> {
+    private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T | undefined> {
         const entry = Object.entries(event).find(([_, value]) => value instanceof RequestContext);
 
         if (!entry) {
@@ -123,7 +125,9 @@ export class EventBus implements OnModuleDestroy {
             return event;
         }
 
-        return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => {
+        try {
+            await this.transactionSubscriber.awaitCommit(transactionManager.queryRunner);
+
             // Copy context and remove transaction manager
             // This will prevent queries to released query runner
             const newContext = ctx.copy();
@@ -133,6 +137,15 @@ export class EventBus implements OnModuleDestroy {
             (event as any)[key] = newContext
 
             return event;
-        });
+        } catch (e: any) {
+            if (e instanceof TransactionSubscriberError) {
+                // Expected commit, but rollback or something else happened.
+                // This is still reliable behavior, return undefined
+                // as event should not be exposed from this transaction
+                return;
+            }
+
+            throw e;
+        }
     }
 }

+ 6 - 5
yarn.lock

@@ -18611,10 +18611,10 @@ typedarray@^0.0.6:
   resolved "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777"
   integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=
 
-typeorm@0.2.41:
-  version "0.2.41"
-  resolved "https://registry.npmjs.org/typeorm/-/typeorm-0.2.41.tgz#88758101ac158dc0a0a903d70eaacea2974281cc"
-  integrity sha512-/d8CLJJxKPgsnrZWiMyPI0rz2MFZnBQrnQ5XP3Vu3mswv2WPexb58QM6BEtmRmlTMYN5KFWUz8SKluze+wS9xw==
+typeorm@0.2.45:
+  version "0.2.45"
+  resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.2.45.tgz#e5bbb3af822dc4646bad96cfa48cd22fa4687cea"
+  integrity sha512-c0rCO8VMJ3ER7JQ73xfk0zDnVv0WDjpsP6Q1m6CVKul7DB9iVdWLRjPzc8v2eaeBuomsbZ2+gTaYr8k1gm3bYA==
   dependencies:
     "@sqltools/formatter" "^1.2.2"
     app-root-path "^3.0.0"
@@ -18629,6 +18629,7 @@ typeorm@0.2.41:
     reflect-metadata "^0.1.13"
     sha.js "^2.4.11"
     tslib "^2.1.0"
+    uuid "^8.3.2"
     xml2js "^0.4.23"
     yargs "^17.0.1"
     zen-observable-ts "^1.0.0"
@@ -18924,7 +18925,7 @@ uuid@8.3.1:
   resolved "https://registry.npmjs.org/uuid/-/uuid-8.3.1.tgz#2ba2e6ca000da60fce5a196954ab241131e05a31"
   integrity sha512-FOmRr+FmWEIG8uhZv6C2bTgEVXsHk08kE7mPlrBbEe+c3r9pjceVPgupIfNIhc4yx55H69OXANrUaSuu9eInKg==
 
-uuid@8.3.2, uuid@^8.0.0, uuid@^8.2.0:
+uuid@8.3.2, uuid@^8.0.0, uuid@^8.2.0, uuid@^8.3.2:
   version "8.3.2"
   resolved "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
   integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==