|
|
@@ -128,24 +128,40 @@ export class ElasticsearchIndexerController {
|
|
|
return new Observable(observer => {
|
|
|
(async () => {
|
|
|
const timeStart = Date.now();
|
|
|
+
|
|
|
if (ids.length) {
|
|
|
const batches = Math.ceil(ids.length / batchSize);
|
|
|
Logger.verbose(`Updating ${ids.length} variants...`);
|
|
|
|
|
|
+ let variantsInProduct: ProductVariant[] = [];
|
|
|
+
|
|
|
for (let i = 0; i < batches; i++) {
|
|
|
const begin = i * batchSize;
|
|
|
const end = begin + batchSize;
|
|
|
Logger.verbose(`Updating ids from index ${begin} to ${end}`);
|
|
|
const batchIds = ids.slice(begin, end);
|
|
|
- const batch = await this.getBatchByIds(ctx, batchIds);
|
|
|
- const operations = batch.reduce((ops, variant) => {
|
|
|
- return [
|
|
|
- ...ops,
|
|
|
- { update: { _id: variant.id.toString() } },
|
|
|
- { doc: this.createVariantIndexItem(variant) },
|
|
|
- ];
|
|
|
- }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
|
|
|
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
+
|
|
|
+ const variants = await this.getVariantsByIds(ctx, batchIds);
|
|
|
+
|
|
|
+ const variantsToIndex: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
|
|
|
+ const productsToIndex: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
|
|
|
+
|
|
|
+ // tslint:disable-next-line:prefer-for-of
|
|
|
+ for (let j = 0; j < variants.length; j++) {
|
|
|
+ const variant = variants[j];
|
|
|
+ variantsInProduct.push(variant);
|
|
|
+ variantsToIndex.push({ update: { _id: variant.id.toString() } });
|
|
|
+ variantsToIndex.push({ doc: this.createVariantIndexItem(variant) });
|
|
|
+
|
|
|
+ const nextVariant = variants[j + 1];
|
|
|
+ if (nextVariant && nextVariant.productId !== variant.productId) {
|
|
|
+ productsToIndex.push({ update: { _id: variant.productId.toString() } });
|
|
|
+ productsToIndex.push({ doc: this.createProductIndexItem(variantsInProduct) });
|
|
|
+ variantsInProduct = [];
|
|
|
+ }
|
|
|
+ }
|
|
|
+ await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
|
|
|
+ await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
|
|
|
observer.next({
|
|
|
total: ids.length,
|
|
|
completed: Math.min((i + 1) * batchSize, ids.length),
|
|
|
@@ -280,7 +296,7 @@ export class ElasticsearchIndexerController {
|
|
|
return this.hydrateVariants(ctx, variants);
|
|
|
}
|
|
|
|
|
|
- private async getBatchByIds(ctx: RequestContext, ids: ID[]) {
|
|
|
+ private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
|
|
|
const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
|
|
|
relations: variantRelations,
|
|
|
});
|