Browse Source

fix(core): Queue concurrent search index writes to avoid key conflicts

Michael Bromley 6 years ago
parent
commit
ae1145a4a3

+ 3 - 1
packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts

@@ -6,6 +6,7 @@ import { unique } from '../../../../../common/lib/unique';
 import { RequestContext } from '../../../api/common/request-context';
 import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
 import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
+import { AsyncQueue } from '../async-queue';
 import { SearchIndexItem } from '../search-index-item.entity';
 
 import { CompletedMessage, ConnectedMessage, Message, MessageType, ReturnRawBatchMessage, SaveVariantsPayload, VariantsSavedMessage } from './ipc';
@@ -39,6 +40,7 @@ export class IndexBuilder {
     private connection: Connection;
     private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;
     private onMessageHandlers = new Set<(message: string) => void>();
+    private queue = new AsyncQueue('search-index');
 
     /**
      * When running in the main process, it should be constructed with the existing connection.
@@ -139,7 +141,7 @@ export class IndexBuilder {
                 collectionIds: v.collections.map(c => c.id.toString()),
             }),
         );
-        await this.connection.getRepository(SearchIndexItem).save(items);
+        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
         if (batch === total - 1) {
             return new CompletedMessage(true);
         } else {