|
|
@@ -79,29 +79,11 @@ export class ElasticsearchIndexerController {
|
|
|
variantId?: ID;
|
|
|
}): Observable<boolean> {
|
|
|
const ctx = RequestContext.fromObject(rawContext);
|
|
|
- let updatedProductVariants: ProductVariant[] = [];
|
|
|
- let removedProducts: Product[] = [];
|
|
|
let updatedVariants: ProductVariant[] = [];
|
|
|
- let removedVariantIds: ID[] = [];
|
|
|
|
|
|
return defer(async () => {
|
|
|
if (productId) {
|
|
|
- const product = await this.connection.getRepository(Product).findOne(productId, {
|
|
|
- relations: ['variants'],
|
|
|
- });
|
|
|
- if (product) {
|
|
|
- if (product.deletedAt) {
|
|
|
- removedProducts = [product];
|
|
|
- removedVariantIds = product.variants.map(v => v.id);
|
|
|
- } else {
|
|
|
- const variants = await this.connection
|
|
|
- .getRepository(ProductVariant)
|
|
|
- .findByIds(product.variants.map(v => v.id), {
|
|
|
- relations: variantRelations,
|
|
|
- });
|
|
|
- updatedProductVariants = updatedVariants;
|
|
|
- }
|
|
|
- }
|
|
|
+ await this.updateProduct(ctx, productId);
|
|
|
} else {
|
|
|
const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
|
|
|
relations: variantRelations,
|
|
|
@@ -110,44 +92,69 @@ export class ElasticsearchIndexerController {
|
|
|
updatedVariants = [variant];
|
|
|
}
|
|
|
}
|
|
|
- Logger.verbose(`Updating ${updatedVariants.length} variants`, loggerCtx);
|
|
|
- updatedVariants = this.hydrateVariants(ctx, updatedVariants);
|
|
|
-
|
|
|
- if (updatedProductVariants.length) {
|
|
|
- const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
|
|
|
- const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
|
|
|
- { update: { _id: updatedProductIndexItem.productId.toString() } },
|
|
|
- { doc: updatedProductIndexItem },
|
|
|
- ];
|
|
|
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
|
|
|
- }
|
|
|
if (updatedVariants.length) {
|
|
|
+ // When ProductVariants change, we need to update the corresponding Product index
|
|
|
+ // since e.g. price changes must be reflected on the Product level too.
|
|
|
+ const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
|
|
|
+ for (const variantProductId of productIdsOfVariants) {
|
|
|
+ await this.updateProduct(ctx, variantProductId);
|
|
|
+ }
|
|
|
const operations = updatedVariants.reduce(
|
|
|
(ops, variant) => {
|
|
|
return [
|
|
|
...ops,
|
|
|
{ update: { _id: variant.id.toString() } },
|
|
|
- { doc: this.createVariantIndexItem(variant) },
|
|
|
+ { doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
|
|
|
];
|
|
|
},
|
|
|
[] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
|
|
|
);
|
|
|
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
}
|
|
|
- if (removedVariantIds.length) {
|
|
|
- const operations = removedVariantIds.reduce(
|
|
|
- (ops, id) => {
|
|
|
- return [...ops, { delete: { _id: id.toString() } }];
|
|
|
- },
|
|
|
- [] as BulkOperation[],
|
|
|
- );
|
|
|
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
- }
|
|
|
-
|
|
|
return true;
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ private async updateProduct(ctx: RequestContext, productId: ID) {
|
|
|
+ let updatedProductVariants: ProductVariant[] = [];
|
|
|
+ let removedProducts: Product[] = [];
|
|
|
+ let removedVariantIds: ID[] = [];
|
|
|
+ const product = await this.connection.getRepository(Product).findOne(productId, {
|
|
|
+ relations: ['variants'],
|
|
|
+ });
|
|
|
+ if (product) {
|
|
|
+ if (product.deletedAt) {
|
|
|
+ removedProducts = [product];
|
|
|
+ removedVariantIds = product.variants.map(v => v.id);
|
|
|
+ } else {
|
|
|
+ updatedProductVariants = await this.connection
|
|
|
+ .getRepository(ProductVariant)
|
|
|
+ .findByIds(product.variants.map(v => v.id), {
|
|
|
+ relations: variantRelations,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (updatedProductVariants.length) {
|
|
|
+ Logger.verbose(`Updating 1 product`, loggerCtx);
|
|
|
+ updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
|
|
|
+ const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
|
|
|
+ const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
|
|
|
+ { update: { _id: updatedProductIndexItem.productId.toString() } },
|
|
|
+ { doc: updatedProductIndexItem, doc_as_upsert: true },
|
|
|
+ ];
|
|
|
+ await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
|
|
|
+ }
|
|
|
+ if (removedVariantIds.length) {
|
|
|
+ const operations = removedVariantIds.reduce(
|
|
|
+ (ops, id) => {
|
|
|
+ return [...ops, { delete: { _id: id.toString() } }];
|
|
|
+ },
|
|
|
+ [] as BulkOperation[],
|
|
|
+ );
|
|
|
+ await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@MessagePattern(Message.UpdateVariantsById)
|
|
|
updateVariantsById({
|
|
|
ctx: rawContext,
|