Przeglądaj źródła

feat(core): Expose `withTransaction` method on TransactionalConnection

Closes #1129
Michael Bromley 4 lat temu
rodzic
commit
861ef299f3

+ 49 - 0
packages/core/e2e/database-transactions.e2e-spec.ts

@@ -107,6 +107,46 @@ describe('Transaction infrastructure', () => {
         expect(!!verify.users.find((u: any) => u.identifier === 'test4')).toBe(true);
     });
 
+    it('failing mutation inside connection.withTransaction() wrapper with request context', async () => {
+        try {
+            await adminClient.query(CREATE_ADMIN5, {
+                emailAddress: 'test5',
+                fail: true,
+                noContext: false,
+            });
+            fail('Should have thrown');
+        } catch (e) {
+            expect(e.message).toContain('Failed!');
+        }
+
+        const { verify } = await adminClient.query(VERIFY_TEST);
+
+        expect(verify.admins.length).toBe(2);
+        expect(verify.users.length).toBe(3);
+        expect(!!verify.admins.find((a: any) => a.emailAddress === 'test5')).toBe(false);
+        expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
+    });
+
+    it('failing mutation inside connection.withTransaction() wrapper without request context', async () => {
+        try {
+            await adminClient.query(CREATE_ADMIN5, {
+                emailAddress: 'test5',
+                fail: true,
+                noContext: true,
+            });
+            fail('Should have thrown');
+        } catch (e) {
+            expect(e.message).toContain('Failed!');
+        }
+
+        const { verify } = await adminClient.query(VERIFY_TEST);
+
+        expect(verify.admins.length).toBe(2);
+        expect(verify.users.length).toBe(3);
+        expect(!!verify.admins.find((a: any) => a.emailAddress === 'test5')).toBe(false);
+        expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
+    });
+
     // Testing https://github.com/vendure-ecommerce/vendure/issues/520
     it('passing transaction via EventBus', async () => {
         TransactionTestPlugin.reset();
@@ -179,6 +219,15 @@ const CREATE_ADMIN4 = gql`
     ${ADMIN_FRAGMENT}
 `;
 
+const CREATE_ADMIN5 = gql`
+    mutation CreateTestAdmin5($emailAddress: String!, $fail: Boolean!, $noContext: Boolean!) {
+        createTestAdministrator5(emailAddress: $emailAddress, fail: $fail, noContext: $noContext) {
+            ...CreatedAdmin
+        }
+    }
+    ${ADMIN_FRAGMENT}
+`;
+
 const VERIFY_TEST = gql`
     query VerifyTest {
         verify {

+ 28 - 0
packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts

@@ -109,6 +109,29 @@ class TestResolver {
         return admin;
     }
 
+    @Mutation()
+    async createTestAdministrator5(@Ctx() ctx: RequestContext, @Args() args: any) {
+        if (args.noContext === true) {
+            return this.connection.withTransaction(ctx, async _ctx => {
+                const admin = await this.testAdminService.createAdministrator(
+                    _ctx,
+                    args.emailAddress,
+                    args.fail,
+                );
+                return admin;
+            });
+        } else {
+            return this.connection.withTransaction(async _ctx => {
+                const admin = await this.testAdminService.createAdministrator(
+                    _ctx,
+                    args.emailAddress,
+                    args.fail,
+                );
+                return admin;
+            });
+        }
+    }
+
     @Query()
     async verify() {
         const admins = await this.connection.getRepository(Administrator).find();
@@ -130,6 +153,11 @@ class TestResolver {
                 createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator
                 createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator
                 createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator
+                createTestAdministrator5(
+                    emailAddress: String!
+                    fail: Boolean!
+                    noContext: Boolean!
+                ): Administrator
             }
             type VerifyResult {
                 admins: [Administrator!]!

+ 15 - 105
packages/core/src/api/middleware/transaction-interceptor.ts

@@ -1,14 +1,11 @@
 import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
 import { Reflector } from '@nestjs/core';
 import { Observable, of } from 'rxjs';
-import { retryWhen, take, tap } from 'rxjs/operators';
-import { QueryRunner } from 'typeorm';
-import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
 
-import { REQUEST_CONTEXT_KEY, TRANSACTION_MANAGER_KEY } from '../../common/constants';
+import { REQUEST_CONTEXT_KEY } from '../../common/constants';
+import { TransactionWrapper } from '../../connection/transaction-wrapper';
 import { TransactionalConnection } from '../../connection/transactional-connection';
 import { parseContext } from '../common/parse-context';
-import { RequestContext } from '../common/request-context';
 import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/transaction.decorator';
 
 /**
@@ -18,7 +15,11 @@ import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/tr
  */
 @Injectable()
 export class TransactionInterceptor implements NestInterceptor {
-    constructor(private connection: TransactionalConnection, private reflector: Reflector) {}
+    constructor(
+        private connection: TransactionalConnection,
+        private transactionWrapper: TransactionWrapper,
+        private reflector: Reflector,
+    ) {}
     intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
         const { isGraphQL, req } = parseContext(context);
         const ctx = (req as any)[REQUEST_CONTEXT_KEY];
@@ -27,107 +28,16 @@ export class TransactionInterceptor implements NestInterceptor {
                 TRANSACTION_MODE_METADATA_KEY,
                 context.getHandler(),
             );
-            return of(this.withTransaction(ctx, () => next.handle(), transactionMode));
+            return of(
+                this.transactionWrapper.executeInTransaction(
+                    ctx,
+                    () => next.handle(),
+                    transactionMode,
+                    this.connection.rawConnection,
+                ),
+            );
         } else {
             return next.handle();
         }
     }
-
-    /**
-     * @description
-     * Executes the `work` function within the context of a transaction.
-     */
-    private async withTransaction<T>(
-        ctx: RequestContext,
-        work: () => Observable<T>,
-        mode: TransactionMode,
-    ): Promise<T> {
-        const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
-        if (queryRunnerExists) {
-            // If a QueryRunner already exists on the RequestContext, there must be an existing
-            // outer transaction in progress. In that case, we just execute the work function
-            // as usual without needing to further wrap in a transaction.
-            return work().toPromise();
-        }
-        const queryRunner = this.connection.rawConnection.createQueryRunner();
-        if (mode === 'auto') {
-            await this.startTransaction(queryRunner);
-        }
-        (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;
-
-        try {
-            const maxRetries = 5;
-            const result = await work()
-                .pipe(
-                    retryWhen(errors =>
-                        errors.pipe(
-                            tap(err => {
-                                if (!this.isRetriableError(err)) {
-                                    throw err;
-                                }
-                            }),
-                            take(maxRetries),
-                        ),
-                    ),
-                )
-                .toPromise();
-            if (queryRunner.isTransactionActive) {
-                await queryRunner.commitTransaction();
-            }
-            return result;
-        } catch (error) {
-            if (queryRunner.isTransactionActive) {
-                await queryRunner.rollbackTransaction();
-            }
-            throw error;
-        } finally {
-            if (queryRunner?.isReleased === false) {
-                await queryRunner.release();
-            }
-        }
-    }
-
-    /**
-     * Attempts to start a DB transaction, with retry logic in the case that a transaction
-     * is already started for the connection (which is mainly a problem with SQLite/Sql.js)
-     */
-    private async startTransaction(queryRunner: QueryRunner) {
-        const maxRetries = 25;
-        let attempts = 0;
-        let lastError: any;
-        // Returns false if a transaction is already in progress
-        async function attemptStartTransaction(): Promise<boolean> {
-            try {
-                await queryRunner.startTransaction();
-                return true;
-            } catch (err) {
-                lastError = err;
-                if (err instanceof TransactionAlreadyStartedError) {
-                    return false;
-                }
-                throw err;
-            }
-        }
-        while (attempts < maxRetries) {
-            const result = await attemptStartTransaction();
-            if (result) {
-                return;
-            }
-            attempts++;
-            // insert an increasing delay before retrying
-            await new Promise(resolve => setTimeout(resolve, attempts * 20));
-        }
-        throw lastError;
-    }
-
-    /**
-     * If the resolver function throws an error, there are certain cases in which
-     * we want to retry the whole thing again - notably in the case of a deadlock
-     * situation, which can usually be retried with success.
-     */
-    private isRetriableError(err: any): boolean {
-        const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
-        const postgresDeadlock = err.code === 'deadlock_detected';
-        return mysqlDeadlock || postgresDeadlock;
-    }
 }

+ 3 - 2
packages/core/src/connection/connection.module.ts

@@ -7,14 +7,15 @@ import { ConfigService } from '../config/config.service';
 import { TypeOrmLogger } from '../config/logger/typeorm-logger';
 
 import { TransactionSubscriber } from './transaction-subscriber';
+import { TransactionWrapper } from './transaction-wrapper';
 import { TransactionalConnection } from './transactional-connection';
 
 let defaultTypeOrmModule: DynamicModule;
 
 @Module({
     imports: [ConfigModule],
-    providers: [TransactionalConnection, TransactionSubscriber],
-    exports: [TransactionalConnection, TransactionSubscriber],
+    providers: [TransactionalConnection, TransactionSubscriber, TransactionWrapper],
+    exports: [TransactionalConnection, TransactionSubscriber, TransactionWrapper],
 })
 export class ConnectionModule {
     static forRoot(): DynamicModule {

+ 118 - 0
packages/core/src/connection/transaction-wrapper.ts

@@ -0,0 +1,118 @@
+import { from, Observable, of } from 'rxjs';
+import { retryWhen, take, tap } from 'rxjs/operators';
+import { Connection, QueryRunner } from 'typeorm';
+import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
+
+import { RequestContext } from '../api/common/request-context';
+import { TransactionMode } from '../api/decorators/transaction.decorator';
+import { TRANSACTION_MANAGER_KEY } from '../common/constants';
+
+/**
+ * @description
+ * This helper class is used to wrap operations in a TypeORM transaction in order to ensure
+ * atomic operations on the database.
+ */
+export class TransactionWrapper {
+    /**
+     * @description
+     * Executes the `work` function within the context of a transaction. If the `work` function
+     * resolves / completes, then all the DB operations it contains will be committed. If it
+     * throws an error or rejects, then all DB operations will be rolled back.
+     */
+    async executeInTransaction<T>(
+        ctx: RequestContext,
+        work: () => Observable<T> | Promise<T>,
+        mode: TransactionMode,
+        connection: Connection,
+    ): Promise<T> {
+        const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
+        if (queryRunnerExists) {
+            // If a QueryRunner already exists on the RequestContext, there must be an existing
+            // outer transaction in progress. In that case, we just execute the work function
+            // as usual without needing to further wrap in a transaction.
+            return from(work()).toPromise();
+        }
+        const queryRunner = connection.createQueryRunner();
+        if (mode === 'auto') {
+            await this.startTransaction(queryRunner);
+        }
+        (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;
+
+        try {
+            const maxRetries = 5;
+            const result = await from(work())
+                .pipe(
+                    retryWhen(errors =>
+                        errors.pipe(
+                            tap(err => {
+                                if (!this.isRetriableError(err)) {
+                                    throw err;
+                                }
+                            }),
+                            take(maxRetries),
+                        ),
+                    ),
+                )
+                .toPromise();
+            if (queryRunner.isTransactionActive) {
+                await queryRunner.commitTransaction();
+            }
+            return result;
+        } catch (error) {
+            if (queryRunner.isTransactionActive) {
+                await queryRunner.rollbackTransaction();
+            }
+            throw error;
+        } finally {
+            if (queryRunner?.isReleased === false) {
+                await queryRunner.release();
+            }
+        }
+    }
+
+    /**
+     * Attempts to start a DB transaction, with retry logic in the case that a transaction
+     * is already started for the connection (which is mainly a problem with SQLite/Sql.js)
+     */
+    private async startTransaction(queryRunner: QueryRunner) {
+        const maxRetries = 25;
+        let attempts = 0;
+        let lastError: any;
+
+        // Returns false if a transaction is already in progress
+        async function attemptStartTransaction(): Promise<boolean> {
+            try {
+                await queryRunner.startTransaction();
+                return true;
+            } catch (err) {
+                lastError = err;
+                if (err instanceof TransactionAlreadyStartedError) {
+                    return false;
+                }
+                throw err;
+            }
+        }
+
+        while (attempts < maxRetries) {
+            const result = await attemptStartTransaction();
+            if (result) {
+                return;
+            }
+            attempts++;
+            // insert an increasing delay before retrying
+            await new Promise(resolve => setTimeout(resolve, attempts * 20));
+        }
+        throw lastError;
+    }
+
+    /**
+     * If the resolver function throws an error, there are certain cases in which
+     * we want to retry the whole thing again - notably in the case of a deadlock
+     * situation, which can usually be retried with success.
+     */
+    private isRetriableError(err: any): boolean {
+        const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
+        const postgresDeadlock = err.code === 'deadlock_detected';
+        return mysqlDeadlock || postgresDeadlock;
+    }
+}

+ 61 - 1
packages/core/src/connection/transactional-connection.ts

@@ -18,6 +18,7 @@ import { EntityNotFoundError } from '../common/error/errors';
 import { ChannelAware, SoftDeletable } from '../common/types/common-types';
 import { VendureEntity } from '../entity/base/base.entity';
 
+import { TransactionWrapper } from './transaction-wrapper';
 import { GetEntityOrThrowOptions } from './types';
 
 /**
@@ -34,7 +35,10 @@ import { GetEntityOrThrowOptions } from './types';
  */
 @Injectable()
 export class TransactionalConnection {
-    constructor(@InjectConnection() private connection: Connection) {}
+    constructor(
+        @InjectConnection() private connection: Connection,
+        private transactionWrapper: TransactionWrapper,
+    ) {}
 
     /**
      * @description
@@ -81,6 +85,62 @@ export class TransactionalConnection {
         }
     }
 
+    /**
+     * @description
+     * Allows database operations to be wrapped in a transaction, ensuring that in the event of an error being
+     * thrown at any point, the entire transaction will be rolled back and no changes will be saved.
+     *
+     * In the context of API requests, you should instead use the {@link Transaction} decorator on your resolver or
+     * controller method.
+     *
+     * On the other hand, for code that does not run in the context of a GraphQL/REST request, this method
+     * should be used to protect against non-atomic changes to the data which could leave your data in an
+     * inconsistent state.
+     *
+     * Such situations include function processed by the JobQueue or stand-alone scripts which make use
+     * of Vendure internal services.
+     *
+     * If there is already a {@link RequestContext} object available, you should pass it in as the first
+     * argument in order to add a new transaction to it. If not, omit the first argument and an empty
+     * RequestContext object will be created, which is then used to propagate the transaction to
+     * all inner method calls.
+     *
+     * @example
+     * ```TypeScript
+     * private async transferCredit(fromId: ID, toId: ID, amount: number) {
+     *   await this.connection.withTransaction(ctx => {
+     *     await this.giftCardService.updateCustomerCredit(fromId, -amount);
+     *
+     *     // If some intermediate logic here throws an Error,
+     *     // then all DB transactions will be rolled back and neither Customer's
+     *     // credit balance will have changed.
+     *
+     *     await this.giftCardService.updateCustomerCredit(toId, amount);
+     *   })
+     * }
+     * ```
+     *
+     * @since v1.3.0
+     */
+    async withTransaction<T>(work: (ctx: RequestContext) => Promise<T>): Promise<T>;
+    async withTransaction<T>(ctx: RequestContext, work: (ctx: RequestContext) => Promise<T>): Promise<T>;
+    async withTransaction<T>(
+        ctxOrWork: RequestContext | ((ctx: RequestContext) => Promise<T>),
+        maybeWork?: (ctx: RequestContext) => Promise<T>,
+    ): Promise<T> {
+        let ctx: RequestContext;
+        let work: (ctx: RequestContext) => Promise<T>;
+        if (ctxOrWork instanceof RequestContext) {
+            ctx = ctxOrWork;
+            // tslint:disable-next-line:no-non-null-assertion
+            work = maybeWork!;
+        } else {
+            ctx = RequestContext.empty();
+            work = ctxOrWork;
+        }
+        return this.transactionWrapper.executeInTransaction(ctx, () => work(ctx), 'auto', this.rawConnection);
+    }
+
     /**
      * @description
      * Manually start a transaction if one is not already in progress. This method should be used in