Browse Source

fix(elasticsearch-plugin): Correctly remove deleted items from index

Michael Bromley 6 years ago
parent
commit
f0a56fa455
1 changed files with 95 additions and 71 deletions
  1. 95 71
      packages/elasticsearch-plugin/src/indexer.controller.ts

+ 95 - 71
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -79,82 +79,17 @@ export class ElasticsearchIndexerController {
         variantId?: ID;
     }): Observable<boolean> {
         const ctx = RequestContext.fromObject(rawContext);
-        let updatedVariants: ProductVariant[] = [];
 
         return defer(async () => {
             if (productId) {
                 await this.updateProduct(ctx, productId);
-            } else {
-                const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
-                    relations: variantRelations,
-                });
-                if (variant) {
-                    updatedVariants = [variant];
-                }
-            }
-            if (updatedVariants.length) {
-                // When ProductVariants change, we need to update the corresponding Product index
-                // since e.g. price changes must be reflected on the Product level too.
-                const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
-                for (const variantProductId of productIdsOfVariants) {
-                    await this.updateProduct(ctx, variantProductId);
-                }
-                const operations = updatedVariants.reduce(
-                    (ops, variant) => {
-                        return [
-                            ...ops,
-                            { update: { _id: variant.id.toString() } },
-                            { doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
-                        ];
-                    },
-                    [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
-                );
-                await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+            } else if (variantId) {
+                await this.updateProductVariant(ctx, variantId);
             }
             return true;
         });
     }
 
-    private async updateProduct(ctx: RequestContext, productId: ID) {
-        let updatedProductVariants: ProductVariant[] = [];
-        let removedProducts: Product[] = [];
-        let removedVariantIds: ID[] = [];
-        const product = await this.connection.getRepository(Product).findOne(productId, {
-            relations: ['variants'],
-        });
-        if (product) {
-            if (product.deletedAt) {
-                removedProducts = [product];
-                removedVariantIds = product.variants.map(v => v.id);
-            } else {
-                updatedProductVariants = await this.connection
-                    .getRepository(ProductVariant)
-                    .findByIds(product.variants.map(v => v.id), {
-                        relations: variantRelations,
-                    });
-            }
-        }
-        if (updatedProductVariants.length) {
-            Logger.verbose(`Updating 1 product`, loggerCtx);
-            updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
-            const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
-            const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
-                { update: { _id: updatedProductIndexItem.productId.toString() } },
-                { doc: updatedProductIndexItem, doc_as_upsert: true },
-            ];
-            await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
-        }
-        if (removedVariantIds.length) {
-            const operations = removedVariantIds.reduce(
-                (ops, id) => {
-                    return [...ops, { delete: { _id: id.toString() } }];
-                },
-                [] as BulkOperation[],
-            );
-            await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
-        }
-    }
-
     @MessagePattern(Message.UpdateVariantsById)
     updateVariantsById({
         ctx: rawContext,
@@ -239,7 +174,7 @@ export class ElasticsearchIndexerController {
                 const timeStart = Date.now();
                 const qb = this.getSearchIndexQueryBuilder();
                 const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
-                Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
+                Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
 
                 const batches = Math.ceil(count / batchSize);
                 let variantsInProduct: ProductVariant[] = [];
@@ -248,7 +183,7 @@ export class ElasticsearchIndexerController {
                     Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
 
                     const variants = await this.getBatch(ctx, qb, i);
-                    Logger.verbose(`variants count: ${variants.length}`);
+                    Logger.verbose(`ProductVariants count: ${variants.length}`);
 
                     const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
                     const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
@@ -286,15 +221,104 @@ export class ElasticsearchIndexerController {
         });
     }
 
+    private async updateProductVariant(ctx: RequestContext, variantId: ID) {
+        let updatedVariants: ProductVariant[] = [];
+        let removedVariantId: ID | undefined;
+
+        const productVariant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
+            relations: variantRelations,
+        });
+        if (productVariant) {
+            if (productVariant.deletedAt) {
+                removedVariantId = variantId;
+            } else {
+                updatedVariants = this.hydrateVariants(ctx, [productVariant]);
+            }
+        }
+
+        if (updatedVariants.length) {
+            // When ProductVariants change, we need to update the corresponding Product index
+            // since e.g. price changes must be reflected on the Product level too.
+            const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
+            for (const variantProductId of productIdsOfVariants) {
+                await this.updateProduct(ctx, variantProductId);
+            }
+            const operations = updatedVariants.reduce(
+                (ops, variant) => {
+                    return [
+                        ...ops,
+                        { update: { _id: variant.id.toString() } },
+                        { doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
+                    ];
+                },
+                [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
+            );
+            Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
+            await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+        }
+        if (removedVariantId) {
+            Logger.verbose(`Deleting 1 ProductVariant (${removedVariantId})`, loggerCtx);
+            const operations: BulkOperation[] = [{ delete: { _id: removedVariantId.toString() } }];
+            await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+        }
+    }
+
+    private async updateProduct(ctx: RequestContext, productId: ID) {
+        let updatedProductVariants: ProductVariant[] = [];
+        let removedProductId: ID | undefined;
+        let removedVariantIds: ID[] = [];
+        const product = await this.connection.getRepository(Product).findOne(productId, {
+            relations: ['variants'],
+        });
+        if (product) {
+            if (product.deletedAt) {
+                removedProductId = productId;
+                removedVariantIds = product.variants.map(v => v.id);
+            } else {
+                updatedProductVariants = await this.connection
+                    .getRepository(ProductVariant)
+                    .findByIds(product.variants.map(v => v.id), {
+                        relations: variantRelations,
+                    });
+            }
+        }
+        if (updatedProductVariants.length) {
+            Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
+            updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
+            const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
+            const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
+                { update: { _id: updatedProductIndexItem.productId.toString() } },
+                { doc: updatedProductIndexItem, doc_as_upsert: true },
+            ];
+            await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
+        }
+        if (removedVariantIds.length) {
+            const operations = removedVariantIds.reduce(
+                (ops, id) => {
+                    Logger.verbose(`Deleting 1 ProductVariant (${id})`, loggerCtx);
+                    return [...ops, { delete: { _id: id.toString() } }];
+                },
+                [] as BulkOperation[],
+            );
+            await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
+        }
+        if (removedProductId) {
+            Logger.verbose(`Deleting 1 Product (${removedProductId})`, loggerCtx);
+            const operations: BulkOperation[] = [{ delete: { _id: removedProductId.toString() } }];
+            await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
+        }
+    }
+
     private async executeBulkOperations(
         indexName: string,
         indexType: string,
         operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
     ) {
         try {
+            const fullIndexName = this.options.indexPrefix + indexName;
             const { body }: { body: BulkResponseBody } = await this.client.bulk({
                 refresh: 'true',
-                index: this.options.indexPrefix + indexName,
+                index: fullIndexName,
                 type: indexType,
                 body: operations,
             });
@@ -316,7 +340,7 @@ export class ElasticsearchIndexerController {
                     }
                 });
             } else {
-                Logger.verbose(`Executed ${body.items.length} bulk operations on ${indexType}`);
+                Logger.verbose(`Executed ${body.items.length} bulk operations on index [${fullIndexName}]`);
             }
             return body;
         } catch (e) {