Browse Source

fix(core): Prevent race conditions when updating search index

Make use of an async task queue which limits concurrent update operations.
Michael Bromley 6 years ago
parent
commit
8872a9467c

+ 18 - 13
packages/core/src/config/collection/default-collection-filters.ts

@@ -13,19 +13,24 @@ export const facetValueCollectionFilter = new CollectionFilter({
     code: 'facet-value-filter',
     description: 'Filter by FacetValues',
     apply: (qb, args) => {
-        qb.leftJoin('productVariant.product', 'product')
-            .leftJoin('product.facetValues', 'productFacetValues')
-            .leftJoin('productVariant.facetValues', 'variantFacetValues')
-            .andWhere(
-                new Brackets(qb1 => {
-                    const ids = args.facetValueIds;
-                    return qb1
-                        .where(`productFacetValues.id IN (:...ids)`, { ids })
-                        .orWhere(`variantFacetValues.id IN (:...ids)`, { ids });
-                }),
-            )
-            .groupBy('productVariant.id')
-            .having(`COUNT(1) >= :count`, { count: args.facetValueIds.length });
+        if (args.facetValueIds.length) {
+            qb.leftJoin('productVariant.product', 'product')
+                .leftJoin('product.facetValues', 'productFacetValues')
+                .leftJoin('productVariant.facetValues', 'variantFacetValues')
+                .andWhere(
+                    new Brackets(qb1 => {
+                        const ids = args.facetValueIds;
+                        return qb1
+                            .where(`productFacetValues.id IN (:...ids)`, {ids})
+                            .orWhere(`variantFacetValues.id IN (:...ids)`, {ids});
+                    }),
+                )
+                .groupBy('productVariant.id')
+                .having(`COUNT(1) >= :count`, {count: args.facetValueIds.length});
+        } else {
+            // If no facetValueIds are specified, no ProductVariants will be matched.
+            qb.andWhere('1 = 0');
+        }
         return qb;
     },
 });

+ 2 - 3
packages/core/src/data-import/providers/import-parser/import-parser.ts

@@ -1,9 +1,8 @@
 import { Injectable } from '@nestjs/common';
-import parse from 'csv-parse';
-import { Stream } from 'stream';
-
 import { normalizeString } from '@vendure/common/lib/normalize-string';
 import { unique } from '@vendure/common/lib/unique';
+import parse from 'csv-parse';
+import { Stream } from 'stream';
 
 export type BaseProductRecord = {
     name?: string;

+ 71 - 0
packages/core/src/plugin/default-search-plugin/async-queue.ts

@@ -0,0 +1,71 @@
+export type Task<T = any> = () => Promise<T> | T;
+export type Resolve<T> = (val: T) => void;
+export type Reject<T> = (val: T) => void;
+type TaskQueueItem = { task: Task; resolve: Resolve<any>; reject: Reject<any>; };
+
+/**
+ * A queue class for limiting concurrent async tasks. This can be used e.g. to prevent
+ * race conditions when working on a shared resource such as writing to a database.
+ *
+ * The task queue itself is shared across instances (even across processes) by means of the
+ * 'label' constructor argument.
+ */
+export class AsyncQueue {
+    private static running: { [label: string]: number } = {};
+    private static taskQueue: { [label: string]: TaskQueueItem[]; } = {};
+
+    constructor(private label: string = 'default', private concurrency: number = 1) {
+        if (!AsyncQueue.taskQueue[label]) {
+            AsyncQueue.taskQueue[label] = [];
+            AsyncQueue.running[label] = 0;
+        }
+    }
+
+    private get running(): number {
+        return AsyncQueue.running[this.label];
+    }
+
+    private inc() {
+        AsyncQueue.running[this.label]++;
+    }
+
+    private dec() {
+        AsyncQueue.running[this.label]--;
+    }
+
+    /**
+     * Pushes a new task onto the queue, upon which the task will either execute immediately or
+     * (if the number of running tasks is equal to the concurrency limit) enqueue the task to
+     * be executed at the soonest opportunity.
+     */
+    push<T>(task: Task<T>): Promise<T> {
+        return new Promise<T>((resolve, reject) => {
+            this.running < this.concurrency ? this.runTask(task, resolve, reject) : this.enqueueTask(task, resolve, reject);
+        });
+    }
+
+    private async runTask<T>(task: Task<T>, resolve: Resolve<T>, reject: Reject<T>) {
+        this.inc();
+        try {
+            const result = await task();
+            resolve(result);
+        } catch (e) {
+            reject(e);
+        }
+        this.dec();
+        if (this.getQueue().length > 0) {
+            const nextTask = this.getQueue().shift();
+            if (nextTask) {
+                await this.runTask(nextTask.task, nextTask.resolve, nextTask.reject);
+            }
+        }
+    }
+
+    private enqueueTask<T>(task: Task<T>, resolve: Resolve<T>, reject: Reject<T>) {
+        this.getQueue().push({ task, resolve, reject });
+    }
+
+    private getQueue(): TaskQueueItem[] {
+        return AsyncQueue.taskQueue[this.label];
+    }
+}

+ 24 - 13
packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts

@@ -16,6 +16,7 @@ import { FacetValueService } from '../../service/services/facet-value.service';
 import { ProductVariantService } from '../../service/services/product-variant.service';
 import { SearchService } from '../../service/services/search.service';
 
+import { AsyncQueue } from './async-queue';
 import { DefaultSearchReindexResponse } from './default-search-plugin';
 import { SearchIndexItem } from './search-index-item.entity';
 import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy';
@@ -29,6 +30,7 @@ import { SqliteSearchStrategy } from './search-strategy/sqlite-search-strategy';
  */
 @Injectable()
 export class FulltextSearchService implements SearchService {
+    private taskQueue = new AsyncQueue('search-service', 1);
     private searchStrategy: SearchStrategy;
     private readonly minTermLength = 2;
     private readonly variantRelations = [
@@ -70,7 +72,7 @@ export class FulltextSearchService implements SearchService {
     async facetValues(
         ctx: RequestContext,
         input: SearchInput,
-        enabledOnly: boolean = false
+        enabledOnly: boolean = false,
     ): Promise<Array<{ facetValue: FacetValue; count: number }>> {
         const facetValueIdsMap = await this.searchStrategy.getFacetValueIds(ctx, input, enabledOnly);
         const facetValues = await this.facetValueService.findByIds(
@@ -96,8 +98,12 @@ export class FulltextSearchService implements SearchService {
         });
         FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
         const variants = await qb.where('variants__product.deletedAt IS NULL').getMany();
-        await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
-        await this.saveSearchIndexItems(ctx, variants);
+
+        await this.taskQueue.push(async () => {
+            await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
+            await this.saveSearchIndexItems(ctx, variants);
+        });
+
         return {
             success: true,
             indexedItemCount: variants.length,
@@ -137,20 +143,25 @@ export class FulltextSearchService implements SearchService {
                 updatedVariants = [variant];
             }
         }
-        if (updatedVariants.length) {
-            await this.saveSearchIndexItems(ctx, updatedVariants);
-        }
-        if (removedVariantIds.length) {
-            await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
-        }
+        await this.taskQueue.push(async () => {
+            if (updatedVariants.length) {
+                await this.saveSearchIndexItems(ctx, updatedVariants);
+            }
+            if (removedVariantIds.length) {
+                await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
+            }
+        });
+
     }
 
     async updateVariantsById(ctx: RequestContext, ids: ID[]) {
         if (ids.length) {
-            const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
-                relations: this.variantRelations,
+            this.taskQueue.push(async () => {
+                const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
+                    relations: this.variantRelations,
+                });
+                await this.saveSearchIndexItems(ctx, updatedVariants);
             });
-            await this.saveSearchIndexItems(ctx, updatedVariants);
         }
     }
 
@@ -203,7 +214,7 @@ export class FulltextSearchService implements SearchService {
                         collectionIds: v.collections.map(c => c.id.toString()),
                     }),
             );
-        await this.connection.getRepository(SearchIndexItem).save(items);
+        return this.connection.getRepository(SearchIndexItem).save(items);
     }
 
     /**