|
|
@@ -98,7 +98,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
ctx: rawContext,
|
|
|
productId,
|
|
|
}: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
return asyncObservable(async () => {
|
|
|
await this.updateProductInternal(ctx, productId, ctx.channelId);
|
|
|
return true;
|
|
|
@@ -113,11 +113,14 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
ctx: rawContext,
|
|
|
productId,
|
|
|
}: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
return asyncObservable(async () => {
|
|
|
await this.deleteProductInternal(productId, ctx.channelId);
|
|
|
const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
- await this.deleteVariantsInternal(variants.map(v => v.id), ctx.channelId);
|
|
|
+ await this.deleteVariantsInternal(
|
|
|
+ variants.map((v) => v.id),
|
|
|
+ ctx.channelId,
|
|
|
+ );
|
|
|
return true;
|
|
|
});
|
|
|
}
|
|
|
@@ -131,11 +134,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
productId,
|
|
|
channelId,
|
|
|
}: AssignProductToChannelMessage['data']): Observable<AssignProductToChannelMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
return asyncObservable(async () => {
|
|
|
await this.updateProductInternal(ctx, productId, channelId);
|
|
|
const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
- await this.updateVariantsInternal(ctx, variants.map(v => v.id), channelId);
|
|
|
+ await this.updateVariantsInternal(
|
|
|
+ ctx,
|
|
|
+ variants.map((v) => v.id),
|
|
|
+ channelId,
|
|
|
+ );
|
|
|
return true;
|
|
|
});
|
|
|
}
|
|
|
@@ -149,11 +156,14 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
productId,
|
|
|
channelId,
|
|
|
}: RemoveProductFromChannelMessage['data']): Observable<RemoveProductFromChannelMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
return asyncObservable(async () => {
|
|
|
await this.deleteProductInternal(productId, channelId);
|
|
|
const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
- await this.deleteVariantsInternal(variants.map(v => v.id), channelId);
|
|
|
+ await this.deleteVariantsInternal(
|
|
|
+ variants.map((v) => v.id),
|
|
|
+ channelId,
|
|
|
+ );
|
|
|
return true;
|
|
|
});
|
|
|
}
|
|
|
@@ -166,7 +176,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
ctx: rawContext,
|
|
|
variantIds,
|
|
|
}: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
return asyncObservable(async () => {
|
|
|
return this.asyncQueue.push(async () => {
|
|
|
await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
|
|
|
@@ -180,12 +190,12 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
ctx: rawContext,
|
|
|
variantIds,
|
|
|
}: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
return asyncObservable(async () => {
|
|
|
const variants = await this.connection
|
|
|
.getRepository(ProductVariant)
|
|
|
.findByIds(variantIds, { relations: ['product'] });
|
|
|
- const productIds = unique(variants.map(v => v.product.id));
|
|
|
+ const productIds = unique(variants.map((v) => v.product.id));
|
|
|
for (const productId of productIds) {
|
|
|
await this.updateProductInternal(ctx, productId, ctx.channelId);
|
|
|
}
|
|
|
@@ -199,10 +209,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
ctx: rawContext,
|
|
|
ids,
|
|
|
}: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
const { batchSize } = this.options;
|
|
|
|
|
|
- return asyncObservable(async observer => {
|
|
|
+ return asyncObservable(async (observer) => {
|
|
|
return this.asyncQueue.push(async () => {
|
|
|
const timeStart = Date.now();
|
|
|
if (ids.length) {
|
|
|
@@ -254,10 +264,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
ctx: rawContext,
|
|
|
dropIndices,
|
|
|
}: ReindexMessage['data']): Observable<ReindexMessage['response']> {
|
|
|
- const ctx = RequestContext.fromObject(rawContext);
|
|
|
+ const ctx = RequestContext.deserialize(rawContext);
|
|
|
const { batchSize } = this.options;
|
|
|
|
|
|
- return asyncObservable(async observer => {
|
|
|
+ return asyncObservable(async (observer) => {
|
|
|
return this.asyncQueue.push(async () => {
|
|
|
const timeStart = Date.now();
|
|
|
|
|
|
@@ -422,20 +432,17 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
if (updatedVariants.length) {
|
|
|
// When ProductVariants change, we need to update the corresponding Product index
|
|
|
// since e.g. price changes must be reflected on the Product level too.
|
|
|
- const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
|
|
|
+ const productIdsOfVariants = unique(updatedVariants.map((v) => v.productId));
|
|
|
for (const variantProductId of productIdsOfVariants) {
|
|
|
await this.updateProductInternal(ctx, variantProductId, channelId);
|
|
|
}
|
|
|
- const operations = updatedVariants.reduce(
|
|
|
- (ops, variant) => {
|
|
|
- return [
|
|
|
- ...ops,
|
|
|
- { update: { _id: this.getId(variant.id, channelId) } },
|
|
|
- { doc: this.createVariantIndexItem(variant, channelId), doc_as_upsert: true },
|
|
|
- ];
|
|
|
- },
|
|
|
- [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
|
|
|
- );
|
|
|
+ const operations = updatedVariants.reduce((ops, variant) => {
|
|
|
+ return [
|
|
|
+ ...ops,
|
|
|
+ { update: { _id: this.getId(variant.id, channelId) } },
|
|
|
+ { doc: this.createVariantIndexItem(variant, channelId), doc_as_upsert: true },
|
|
|
+ ];
|
|
|
+ }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
|
|
|
Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
|
|
|
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
}
|
|
|
@@ -447,16 +454,17 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
relations: ['variants'],
|
|
|
});
|
|
|
if (product) {
|
|
|
- updatedProductVariants = await this.connection
|
|
|
- .getRepository(ProductVariant)
|
|
|
- .findByIds(product.variants.map(v => v.id), {
|
|
|
+ updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
|
|
|
+ product.variants.map((v) => v.id),
|
|
|
+ {
|
|
|
relations: variantRelations,
|
|
|
where: {
|
|
|
deletedAt: null,
|
|
|
},
|
|
|
- });
|
|
|
+ },
|
|
|
+ );
|
|
|
if (product.enabled === false) {
|
|
|
- updatedProductVariants.forEach(v => (v.enabled = false));
|
|
|
+ updatedProductVariants.forEach((v) => (v.enabled = false));
|
|
|
}
|
|
|
}
|
|
|
if (updatedProductVariants.length) {
|
|
|
@@ -479,7 +487,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
|
|
|
private async deleteVariantsInternal(variantIds: ID[], channelId: ID) {
|
|
|
Logger.verbose(`Deleting ${variantIds.length} ProductVariants`, loggerCtx);
|
|
|
- const operations: BulkOperation[] = variantIds.map(id => ({
|
|
|
+ const operations: BulkOperation[] = variantIds.map((id) => ({
|
|
|
delete: { _id: this.getId(id, channelId) },
|
|
|
}));
|
|
|
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
@@ -504,7 +512,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
`Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`,
|
|
|
loggerCtx,
|
|
|
);
|
|
|
- body.items.forEach(item => {
|
|
|
+ body.items.forEach((item) => {
|
|
|
if (item.index) {
|
|
|
Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
|
|
|
}
|
|
|
@@ -578,8 +586,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
*/
|
|
|
private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
|
|
|
return variants
|
|
|
- .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
|
|
|
- .map(v => translateDeep(v, ctx.languageCode, ['product']));
|
|
|
+ .map((v) => this.productVariantService.applyChannelPriceAndTax(v, ctx))
|
|
|
+ .map((v) => translateDeep(v, ctx.languageCode, ['product']));
|
|
|
}
|
|
|
|
|
|
private createVariantIndexItem(v: ProductVariant, channelId: ID): VariantIndexItem {
|
|
|
@@ -604,9 +612,9 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
currencyCode: v.currencyCode,
|
|
|
description: v.product.description,
|
|
|
facetIds: this.getFacetIds([v]),
|
|
|
- channelIds: v.product.channels.map(c => c.id as string),
|
|
|
+ channelIds: v.product.channels.map((c) => c.id as string),
|
|
|
facetValueIds: this.getFacetValueIds([v]),
|
|
|
- collectionIds: v.collections.map(c => c.id.toString()),
|
|
|
+ collectionIds: v.collections.map((c) => c.id.toString()),
|
|
|
enabled: v.enabled && v.product.enabled,
|
|
|
};
|
|
|
const customMappings = Object.entries(this.options.customProductVariantMappings);
|
|
|
@@ -618,11 +626,11 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
|
|
|
private createProductIndexItem(variants: ProductVariant[], channelId: ID): ProductIndexItem {
|
|
|
const first = variants[0];
|
|
|
- const prices = variants.map(v => v.price);
|
|
|
- const pricesWithTax = variants.map(v => v.priceWithTax);
|
|
|
+ const prices = variants.map((v) => v.price);
|
|
|
+ const pricesWithTax = variants.map((v) => v.priceWithTax);
|
|
|
const productAsset = first.product.featuredAsset;
|
|
|
- const variantAsset = variants.filter(v => v.featuredAsset).length
|
|
|
- ? variants.filter(v => v.featuredAsset)[0].featuredAsset
|
|
|
+ const variantAsset = variants.filter((v) => v.featuredAsset).length
|
|
|
+ ? variants.filter((v) => v.featuredAsset)[0].featuredAsset
|
|
|
: null;
|
|
|
const item: ProductIndexItem = {
|
|
|
channelId,
|
|
|
@@ -646,9 +654,12 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
description: first.product.description,
|
|
|
facetIds: this.getFacetIds(variants),
|
|
|
facetValueIds: this.getFacetValueIds(variants),
|
|
|
- collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
|
|
|
- channelIds: first.product.channels.map(c => c.id as string),
|
|
|
- enabled: variants.some(v => v.enabled),
|
|
|
+ collectionIds: variants.reduce(
|
|
|
+ (ids, v) => [...ids, ...v.collections.map((c) => c.id)],
|
|
|
+ [] as ID[],
|
|
|
+ ),
|
|
|
+ channelIds: first.product.channels.map((c) => c.id as string),
|
|
|
+ enabled: variants.some((v) => v.enabled),
|
|
|
};
|
|
|
|
|
|
const customMappings = Object.entries(this.options.customProductMappings);
|