Browse Source

fix(core): Add retry logic in case of transaction deadlocks

Fixes #527.
Michael Bromley 5 years ago
parent
commit
3b60bcbe72

+ 86 - 0
packages/core/e2e/fixtures/test-plugins/slow-mutation-plugin.ts

@@ -0,0 +1,86 @@
+import { Args, Mutation, Resolver } from '@nestjs/graphql';
+import {
+    Asset,
+    AssetType,
+    Country,
+    Ctx,
+    PluginCommonModule,
+    Product,
+    ProductAsset,
+    RequestContext,
+    TaxCategory,
+    TaxRate,
+    Transaction,
+    TransactionalConnection,
+    VendurePlugin,
+} from '@vendure/core';
+import gql from 'graphql-tag';
+
+@Resolver()
+export class SlowMutationResolver {
+    constructor(private connection: TransactionalConnection) {}
+
+    /**
+     * A mutation which simulates some slow DB operations occurring within a transaction.
+     */
+    @Transaction()
+    @Mutation()
+    async slowMutation(@Ctx() ctx: RequestContext, @Args() args: { delay: number }) {
+        const delay = Math.round(args.delay / 2);
+        const country = await this.connection.getRepository(ctx, Country).findOneOrFail({
+            where: {
+                code: 'AT',
+            },
+        });
+        country.enabled = false;
+        await new Promise(resolve => setTimeout(resolve, delay));
+        await this.connection.getRepository(ctx, Country).save(country);
+        country.enabled = true;
+        await new Promise(resolve => setTimeout(resolve, delay));
+        await this.connection.getRepository(ctx, Country).save(country);
+        return true;
+    }
+
+    /**
+     * This mutation attempts to cause a deadlock
+     */
+    @Transaction()
+    @Mutation()
+    async attemptDeadlock(@Ctx() ctx: RequestContext) {
+        const product = await this.connection.getRepository(ctx, Product).findOneOrFail(1);
+        const asset = await this.connection.getRepository(ctx, Asset).save(
+            new Asset({
+                name: 'test',
+                type: AssetType.BINARY,
+                mimeType: 'test/test',
+                fileSize: 1,
+                source: '',
+                preview: '',
+            }),
+        );
+        await new Promise(resolve => setTimeout(resolve, 100));
+        const productAsset = await this.connection.getRepository(ctx, ProductAsset).save(
+            new ProductAsset({
+                assetId: asset.id,
+                productId: product.id,
+                position: 0,
+            }),
+        );
+        await this.connection.getRepository(ctx, Product).update(product.id, { enabled: false });
+        return true;
+    }
+}
+
+@VendurePlugin({
+    imports: [PluginCommonModule],
+    adminApiExtensions: {
+        resolvers: [SlowMutationResolver],
+        schema: gql`
+            extend type Mutation {
+                slowMutation(delay: Int!): Boolean!
+                attemptDeadlock: Boolean!
+            }
+        `,
+    },
+})
+export class SlowMutationPlugin {}

File diff suppressed because it is too large
+ 400 - 529
packages/core/e2e/graphql/generated-e2e-admin-types.ts


+ 50 - 0
packages/core/e2e/graphql/shared-definitions.ts

@@ -669,3 +669,53 @@ export const ADMIN_TRANSITION_TO_STATE = gql`
     }
     ${ORDER_FRAGMENT}
 `;
+
+export const PRODUCT_OPTION_GROUP_FRAGMENT = gql`
+    fragment ProductOptionGroup on ProductOptionGroup {
+        id
+        code
+        name
+        options {
+            id
+            code
+            name
+        }
+        translations {
+            id
+            languageCode
+            name
+        }
+    }
+`;
+
+export const CREATE_PRODUCT_OPTION_GROUP = gql`
+    mutation CreateProductOptionGroup($input: CreateProductOptionGroupInput!) {
+        createProductOptionGroup(input: $input) {
+            ...ProductOptionGroup
+        }
+    }
+    ${PRODUCT_OPTION_GROUP_FRAGMENT}
+`;
+
+export const PRODUCT_WITH_OPTIONS_FRAGMENT = gql`
+    fragment ProductWithOptions on Product {
+        id
+        optionGroups {
+            id
+            code
+            options {
+                id
+                code
+            }
+        }
+    }
+`;
+
+export const ADD_OPTION_GROUP_TO_PRODUCT = gql`
+    mutation AddOptionGroupToProduct($productId: ID!, $optionGroupId: ID!) {
+        addOptionGroupToProduct(productId: $productId, optionGroupId: $optionGroupId) {
+            ...ProductWithOptions
+        }
+    }
+    ${PRODUCT_WITH_OPTIONS_FRAGMENT}
+`;

+ 143 - 0
packages/core/e2e/parallel-transactions.e2e-spec.ts

@@ -0,0 +1,143 @@
+import gql from 'graphql-tag';
+import path from 'path';
+
+import { initialData } from '../../../e2e-common/e2e-initial-data';
+import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
+import { createTestEnvironment } from '../../testing/lib/create-test-environment';
+
+import { SlowMutationPlugin } from './fixtures/test-plugins/slow-mutation-plugin';
+import {
+    AddOptionGroupToProduct,
+    CreateProduct,
+    CreateProductOptionGroup,
+    CreateProductVariants,
+    LanguageCode,
+} from './graphql/generated-e2e-admin-types';
+import {
+    ADD_OPTION_GROUP_TO_PRODUCT,
+    CREATE_PRODUCT,
+    CREATE_PRODUCT_OPTION_GROUP,
+    CREATE_PRODUCT_VARIANTS,
+} from './graphql/shared-definitions';
+
+describe('Parallel transactions', () => {
+    const { server, adminClient, shopClient } = createTestEnvironment({
+        ...testConfig,
+        plugins: [SlowMutationPlugin],
+    });
+
+    beforeAll(async () => {
+        await server.init({
+            initialData,
+            productsCsvPath: path.join(__dirname, 'fixtures/e2e-products-minimal.csv'),
+            customerCount: 2,
+        });
+        await adminClient.asSuperAdmin();
+    }, TEST_SETUP_TIMEOUT_MS);
+
+    afterAll(async () => {
+        await server.destroy();
+    });
+
+    it('does not fail on many concurrent, slow transactions', async () => {
+        const CONCURRENCY_LIMIT = 20;
+
+        const slowMutations = Array.from({ length: CONCURRENCY_LIMIT }).map(i =>
+            adminClient.query(SLOW_MUTATION, { delay: 50 }),
+        );
+        const result = await Promise.all(slowMutations);
+
+        expect(result).toEqual(Array.from({ length: CONCURRENCY_LIMIT }).map(() => ({ slowMutation: true })));
+    }, 100000);
+
+    it('does not fail on attempted deadlock', async () => {
+        const CONCURRENCY_LIMIT = 4;
+
+        const slowMutations = Array.from({ length: CONCURRENCY_LIMIT }).map(i =>
+            adminClient.query(ATTEMPT_DEADLOCK),
+        );
+        const result = await Promise.all(slowMutations);
+
+        expect(result).toEqual(
+            Array.from({ length: CONCURRENCY_LIMIT }).map(() => ({ attemptDeadlock: true })),
+        );
+    }, 100000);
+
+    // A real-world error-case originally reported in https://github.com/vendure-ecommerce/vendure/issues/527
+    it('does not deadlock on concurrent creating ProductVariants', async () => {
+        const CONCURRENCY_LIMIT = 4;
+
+        const { createProduct } = await adminClient.query<CreateProduct.Mutation, CreateProduct.Variables>(
+            CREATE_PRODUCT,
+            {
+                input: {
+                    translations: [
+                        { languageCode: LanguageCode.en, name: 'Test', slug: 'test', description: 'test' },
+                    ],
+                },
+            },
+        );
+
+        const sizes = Array.from({ length: CONCURRENCY_LIMIT }).map(i => `size-${i}`);
+
+        const { createProductOptionGroup } = await adminClient.query<
+            CreateProductOptionGroup.Mutation,
+            CreateProductOptionGroup.Variables
+        >(CREATE_PRODUCT_OPTION_GROUP, {
+            input: {
+                code: 'size',
+                options: sizes.map(size => ({
+                    code: size,
+                    translations: [{ languageCode: LanguageCode.en, name: size }],
+                })),
+                translations: [{ languageCode: LanguageCode.en, name: 'size' }],
+            },
+        });
+
+        await adminClient.query<AddOptionGroupToProduct.Mutation, AddOptionGroupToProduct.Variables>(
+            ADD_OPTION_GROUP_TO_PRODUCT,
+            {
+                productId: createProduct.id,
+                optionGroupId: createProductOptionGroup.id,
+            },
+        );
+
+        const createVariantMutations = createProductOptionGroup.options
+            .filter((_, index) => index < CONCURRENCY_LIMIT)
+            .map((option, i) => {
+                return adminClient.query<CreateProductVariants.Mutation, CreateProductVariants.Variables>(
+                    CREATE_PRODUCT_VARIANTS,
+                    {
+                        input: [
+                            {
+                                sku: `VARIANT-${i}`,
+                                productId: createProduct.id,
+                                optionIds: [option.id],
+                                translations: [{ languageCode: LanguageCode.en, name: `Variant ${i}` }],
+                                price: 1000,
+                                taxCategoryId: 'T_1',
+                                facetValueIds: ['T_1', 'T_2'],
+                                featuredAssetId: 'T_1',
+                                assetIds: ['T_1'],
+                            },
+                        ],
+                    },
+                );
+            });
+
+        const results = await Promise.all(createVariantMutations);
+        expect(results.length).toBe(CONCURRENCY_LIMIT);
+    }, 100000);
+});
+
+const SLOW_MUTATION = gql`
+    mutation SlowMutation($delay: Int!) {
+        slowMutation(delay: $delay)
+    }
+`;
+
+const ATTEMPT_DEADLOCK = gql`
+    mutation AttemptDeadlock {
+        attemptDeadlock
+    }
+`;

+ 2 - 28
packages/core/e2e/product-option.e2e-spec.ts

@@ -3,7 +3,7 @@ import gql from 'graphql-tag';
 import path from 'path';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 import { omit } from '../../common/lib/omit';
 
 import {
@@ -14,6 +14,7 @@ import {
     UpdateProductOption,
     UpdateProductOptionGroup,
 } from './graphql/generated-e2e-admin-types';
+import { CREATE_PRODUCT_OPTION_GROUP, PRODUCT_OPTION_GROUP_FRAGMENT } from './graphql/shared-definitions';
 import { assertThrowsWithMessage } from './utils/assert-throws-with-message';
 
 // tslint:disable:no-non-null-assertion
@@ -150,33 +151,6 @@ describe('ProductOption resolver', () => {
     });
 });
 
-const PRODUCT_OPTION_GROUP_FRAGMENT = gql`
-    fragment ProductOptionGroup on ProductOptionGroup {
-        id
-        code
-        name
-        options {
-            id
-            code
-            name
-        }
-        translations {
-            id
-            languageCode
-            name
-        }
-    }
-`;
-
-const CREATE_PRODUCT_OPTION_GROUP = gql`
-    mutation CreateProductOptionGroup($input: CreateProductOptionGroupInput!) {
-        createProductOptionGroup(input: $input) {
-            ...ProductOptionGroup
-        }
-    }
-    ${PRODUCT_OPTION_GROUP_FRAGMENT}
-`;
-
 const UPDATE_PRODUCT_OPTION_GROUP = gql`
     mutation UpdateProductOptionGroup($input: UpdateProductOptionGroupInput!) {
         updateProductOptionGroup(input: $input) {

+ 3 - 25
packages/core/e2e/product.e2e-spec.ts

@@ -6,7 +6,7 @@ import gql from 'graphql-tag';
 import path from 'path';
 
 import { initialData } from '../../../e2e-common/e2e-initial-data';
-import { TEST_SETUP_TIMEOUT_MS, testConfig } from '../../../e2e-common/test-config';
+import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';
 
 import {
     AddOptionGroupToProduct,
@@ -32,6 +32,7 @@ import {
     UpdateProductVariants,
 } from './graphql/generated-e2e-admin-types';
 import {
+    ADD_OPTION_GROUP_TO_PRODUCT,
     CREATE_PRODUCT,
     CREATE_PRODUCT_VARIANTS,
     DELETE_PRODUCT,
@@ -40,11 +41,11 @@ import {
     GET_PRODUCT_LIST,
     GET_PRODUCT_SIMPLE,
     GET_PRODUCT_WITH_VARIANTS,
+    PRODUCT_WITH_OPTIONS_FRAGMENT,
     UPDATE_PRODUCT,
     UPDATE_PRODUCT_VARIANTS,
 } from './graphql/shared-definitions';
 import { assertThrowsWithMessage } from './utils/assert-throws-with-message';
-import { sortById } from './utils/test-order-utils';
 
 // tslint:disable:no-non-null-assertion
 
@@ -1168,29 +1169,6 @@ describe('Product resolver', () => {
     });
 });
 
-const PRODUCT_WITH_OPTIONS_FRAGMENT = gql`
-    fragment ProductWithOptions on Product {
-        id
-        optionGroups {
-            id
-            code
-            options {
-                id
-                code
-            }
-        }
-    }
-`;
-
-export const ADD_OPTION_GROUP_TO_PRODUCT = gql`
-    mutation AddOptionGroupToProduct($productId: ID!, $optionGroupId: ID!) {
-        addOptionGroupToProduct(productId: $productId, optionGroupId: $optionGroupId) {
-            ...ProductWithOptions
-        }
-    }
-    ${PRODUCT_WITH_OPTIONS_FRAGMENT}
-`;
-
 export const REMOVE_OPTION_GROUP_FROM_PRODUCT = gql`
     mutation RemoveOptionGroupFromProduct($productId: ID!, $optionGroupId: ID!) {
         removeOptionGroupFromProduct(productId: $productId, optionGroupId: $optionGroupId) {

+ 71 - 6
packages/core/src/api/middleware/transaction-interceptor.ts

@@ -1,12 +1,15 @@
 import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
 import { Reflector } from '@nestjs/core';
 import { Observable, of } from 'rxjs';
+import { retryWhen, take, tap } from 'rxjs/operators';
+import { QueryRunner } from 'typeorm';
+import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';
 
 import { REQUEST_CONTEXT_KEY, TRANSACTION_MANAGER_KEY } from '../../common/constants';
 import { TransactionalConnection } from '../../service/transaction/transactional-connection';
 import { parseContext } from '../common/parse-context';
 import { RequestContext } from '../common/request-context';
-import { TRANSACTION_MODE_METADATA_KEY, TransactionMode } from '../decorators/transaction.decorator';
+import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/transaction.decorator';
 
 /**
  * @description
@@ -24,7 +27,7 @@ export class TransactionInterceptor implements NestInterceptor {
                 TRANSACTION_MODE_METADATA_KEY,
                 context.getHandler(),
             );
-            return of(this.withTransaction(ctx, () => next.handle().toPromise(), transactionMode));
+            return of(this.withTransaction(ctx, () => next.handle(), transactionMode));
         } else {
             return next.handle();
         }
@@ -34,22 +37,40 @@ export class TransactionInterceptor implements NestInterceptor {
      * @description
      * Executes the `work` function within the context of a transaction.
      */
-    private async withTransaction<T>(ctx: RequestContext, work: () => T, mode: TransactionMode): Promise<T> {
+    private async withTransaction<T>(
+        ctx: RequestContext,
+        work: () => Observable<T>,
+        mode: TransactionMode,
+    ): Promise<T> {
         const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
         if (queryRunnerExists) {
             // If a QueryRunner already exists on the RequestContext, there must be an existing
             // outer transaction in progress. In that case, we just execute the work function
             // as usual without needing to further wrap in a transaction.
-            return work();
+            return work().toPromise();
         }
         const queryRunner = this.connection.rawConnection.createQueryRunner();
         if (mode === 'auto') {
-            await queryRunner.startTransaction();
+            await this.startTransaction(queryRunner);
         }
         (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;
 
         try {
-            const result = await work();
+            const maxRetries = 5;
+            const result = await work()
+                .pipe(
+                    retryWhen(errors =>
+                        errors.pipe(
+                            tap(err => {
+                                if (!this.isRetriableError(err)) {
+                                    throw err;
+                                }
+                            }),
+                            take(maxRetries),
+                        ),
+                    ),
+                )
+                .toPromise();
             if (queryRunner.isTransactionActive) {
                 await queryRunner.commitTransaction();
             }
@@ -65,4 +86,48 @@ export class TransactionInterceptor implements NestInterceptor {
             }
         }
     }
+
+    /**
+     * Attempts to start a DB transaction, with retry logic in the case that a transaction
+     * is already started for the connection (which is mainly a problem with SQLite/Sql.js)
+     */
+    private async startTransaction(queryRunner: QueryRunner) {
+        const maxRetries = 25;
+        let attempts = 0;
+        let lastError: any;
+        // Returns false if a transaction is already in progress
+        async function attemptStartTransaction(): Promise<boolean> {
+            try {
+                await queryRunner.startTransaction();
+                return true;
+            } catch (err) {
+                lastError = err;
+                if (err instanceof TransactionAlreadyStartedError) {
+                    return false;
+                }
+                throw err;
+            }
+        }
+        while (attempts < maxRetries) {
+            const result = await attemptStartTransaction();
+            if (result) {
+                return;
+            }
+            attempts++;
+            // insert an increasing delay before retrying
+            await new Promise(resolve => setTimeout(resolve, attempts * 20));
+        }
+        throw lastError;
+    }
+
+    /**
+     * If the resolver function throws an error, there are certain cases in which
+     * we want to retry the whole thing again - notably in the case of a deadlock
+     * situation, which can usually be retried with success.
+     */
+    private isRetriableError(err: any): boolean {
+        const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
+        const postgresDeadlock = err.code === 'deadlock_detected';
+        return mysqlDeadlock || postgresDeadlock;
+    }
 }

+ 1 - 1
scripts/codegen/generate-graphql-types.ts

@@ -13,7 +13,7 @@ const CLIENT_QUERY_FILES = path.join(
 );
 const E2E_ADMIN_QUERY_FILES = path.join(
     __dirname,
-    '../../packages/core/e2e/**/!(import.e2e-spec|plugin.e2e-spec|shop-definitions|custom-fields.e2e-spec|price-calculation-strategy.e2e-spec|list-query-builder.e2e-spec|shop-order.e2e-spec|database-transactions.e2e-spec).ts',
+    '../../packages/core/e2e/**/!(import.e2e-spec|plugin.e2e-spec|shop-definitions|custom-fields.e2e-spec|price-calculation-strategy.e2e-spec|list-query-builder.e2e-spec|shop-order.e2e-spec|database-transactions.e2e-spec|parallel-transactions.e2e-spec).ts',
 );
 const E2E_SHOP_QUERY_FILES = [path.join(__dirname, '../../packages/core/e2e/graphql/shop-definitions.ts')];
 const E2E_ELASTICSEARCH_PLUGIN_QUERY_FILES = path.join(

Some files were not shown because too many files changed in this diff