|
|
@@ -126,10 +126,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
|
|
|
- await this.deleteProductOperations(
|
|
|
- RequestContext.deserialize(rawContext),
|
|
|
- productId,
|
|
|
- );
|
|
|
+ await this.deleteProductOperations(RequestContext.deserialize(rawContext), productId);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
@@ -258,8 +255,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
throw e;
|
|
|
}
|
|
|
|
|
|
- const totalProductIds = await this.connection
|
|
|
- .rawConnection
|
|
|
+ const totalProductIds = await this.connection.rawConnection
|
|
|
.getRepository(Product)
|
|
|
.createQueryBuilder('product')
|
|
|
.where('product.deletedAt IS NULL')
|
|
|
@@ -271,8 +267,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
let skip = 0;
|
|
|
let finishedProductsCount = 0;
|
|
|
do {
|
|
|
- productIds = await this.connection
|
|
|
- .rawConnection
|
|
|
+ productIds = await this.connection.rawConnection
|
|
|
.getRepository(Product)
|
|
|
.createQueryBuilder('product')
|
|
|
.select('product.id')
|
|
|
@@ -281,7 +276,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
.take(this.options.reindexProductsChunkSize)
|
|
|
.getMany();
|
|
|
|
|
|
- for (const {id: productId} of productIds) {
|
|
|
+ for (const { id: productId } of productIds) {
|
|
|
await this.updateProductsOperationsOnly(ctx, productId, variantIndexNameForReindex);
|
|
|
finishedProductsCount++;
|
|
|
observer.next({
|
|
|
@@ -294,7 +289,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
skip += this.options.reindexProductsChunkSize;
|
|
|
|
|
|
Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
|
|
|
- } while (productIds.length >= this.options.reindexProductsChunkSize)
|
|
|
+ } while (productIds.length >= this.options.reindexProductsChunkSize);
|
|
|
|
|
|
// Switch the index to the new reindexed one
|
|
|
await this.switchAlias(reindexVariantAliasName, variantIndexName);
|
|
|
@@ -307,7 +302,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
duration: +new Date() - timeStart,
|
|
|
};
|
|
|
});
|
|
|
- })
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
async executeBulkOperationsByChunks(
|
|
|
@@ -315,7 +310,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
operations: BulkVariantOperation[],
|
|
|
index = VARIANT_INDEX_NAME,
|
|
|
): Promise<void> {
|
|
|
- Logger.verbose(`Will execute ${operations.length} bulk update operations with index ${index}`, loggerCtx);
|
|
|
+ Logger.verbose(
|
|
|
+ `Will execute ${operations.length} bulk update operations with index ${index}`,
|
|
|
+ loggerCtx,
|
|
|
+ );
|
|
|
let i;
|
|
|
let j;
|
|
|
let processedOperation = 0;
|
|
|
@@ -414,10 +412,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
await this.updateProductsOperations(ctx, productIds);
|
|
|
}
|
|
|
|
|
|
- private async switchAlias(
|
|
|
- reindexVariantAliasName: string,
|
|
|
- variantIndexName: string,
|
|
|
- ): Promise<void> {
|
|
|
+ private async switchAlias(reindexVariantAliasName: string, variantIndexName: string): Promise<void> {
|
|
|
try {
|
|
|
const reindexVariantAliasExist = await this.client.indices.existsAlias({
|
|
|
name: reindexVariantAliasName,
|
|
|
@@ -434,10 +429,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
index: variantIndexName,
|
|
|
});
|
|
|
|
|
|
- const originalVariantIndexName = await getIndexNameByAlias(
|
|
|
- this.client,
|
|
|
- variantIndexName,
|
|
|
- );
|
|
|
+ const originalVariantIndexName = await getIndexNameByAlias(this.client, variantIndexName);
|
|
|
|
|
|
const actions = [
|
|
|
{
|
|
|
@@ -489,7 +481,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
const reindexVariantAliasResult = await this.client.indices.getAlias({
|
|
|
name: reindexVariantAliasName,
|
|
|
});
|
|
|
- const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0]
|
|
|
+ const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
|
|
|
await this.client.indices.delete({
|
|
|
index: [reindexVariantIndexName],
|
|
|
});
|
|
|
@@ -500,7 +492,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
private async updateProductsOperationsOnly(
|
|
|
ctx: MutableRequestContext,
|
|
|
productId: ID,
|
|
|
- index = VARIANT_INDEX_NAME
|
|
|
+ index = VARIANT_INDEX_NAME,
|
|
|
): Promise<void> {
|
|
|
let operations: BulkVariantOperation[] = [];
|
|
|
let product: Product | undefined;
|
|
|
@@ -509,7 +501,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
.getRepository(ctx, Product)
|
|
|
.find({
|
|
|
relationLoadStrategy: 'query',
|
|
|
- where: {id: productId, deletedAt: IsNull()},
|
|
|
+ where: { id: productId, deletedAt: IsNull() },
|
|
|
relations: this.productRelations,
|
|
|
})
|
|
|
.then(result => result[0] ?? undefined);
|
|
|
@@ -517,33 +509,40 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
Logger.error(e.message, loggerCtx, e.stack);
|
|
|
throw e;
|
|
|
}
|
|
|
- if (!product)
|
|
|
+ if (!product) {
|
|
|
return;
|
|
|
+ }
|
|
|
|
|
|
- const updatedProductVariants = await this.connection.getRepository(ctx, ProductVariant).find({
|
|
|
- relations: this.variantRelations,
|
|
|
- where: {
|
|
|
- productId,
|
|
|
- deletedAt: IsNull(),
|
|
|
- },
|
|
|
- order: {
|
|
|
- id: 'ASC',
|
|
|
- },
|
|
|
- relationLoadStrategy: 'query',
|
|
|
- });
|
|
|
+ let updatedProductVariants: ProductVariant[] = [];
|
|
|
+ try {
|
|
|
+ updatedProductVariants = await this.connection.rawConnection.getRepository(ProductVariant).find({
|
|
|
+ relations: this.variantRelations,
|
|
|
+ where: {
|
|
|
+ productId,
|
|
|
+ deletedAt: IsNull(),
|
|
|
+ },
|
|
|
+ order: {
|
|
|
+ id: 'ASC',
|
|
|
+ },
|
|
|
+ relationLoadStrategy: 'query',
|
|
|
+ });
|
|
|
+ } catch (e: any) {
|
|
|
+ console.log(e);
|
|
|
+ }
|
|
|
|
|
|
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
|
|
updatedProductVariants.forEach(variant => (variant.product = product!));
|
|
|
if (!product.enabled) {
|
|
|
updatedProductVariants.forEach(v => (v.enabled = false));
|
|
|
}
|
|
|
|
|
|
- Logger.debug(`Updating Product (${productId})`, loggerCtx)
|
|
|
- const languageVariants: LanguageCode[] = []
|
|
|
+ Logger.debug(`Updating Product (${productId})`, loggerCtx);
|
|
|
+ const languageVariants: LanguageCode[] = [];
|
|
|
languageVariants.push(...product.translations.map(t => t.languageCode));
|
|
|
for (const variant of updatedProductVariants)
|
|
|
languageVariants.push(...variant.translations.map(t => t.languageCode));
|
|
|
|
|
|
- const uniqueLanguageVariants = unique(languageVariants)
|
|
|
+ const uniqueLanguageVariants = unique(languageVariants);
|
|
|
for (const channel of product.channels) {
|
|
|
ctx.setChannel(channel);
|
|
|
const variantsInChannel = updatedProductVariants.filter(v =>
|
|
|
@@ -580,7 +579,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
doc_as_upsert: true,
|
|
|
},
|
|
|
},
|
|
|
- )
|
|
|
+ );
|
|
|
|
|
|
if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
|
|
|
// Because we can have a huge amount of variant for 1 product, we also chunk update operations
|
|
|
@@ -589,7 +588,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
operations,
|
|
|
index,
|
|
|
);
|
|
|
- operations = []
|
|
|
+ operations = [];
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
@@ -622,7 +621,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
operations,
|
|
|
index,
|
|
|
);
|
|
|
- operations = []
|
|
|
+ operations = [];
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -637,11 +636,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- private async updateProductsOperations(
|
|
|
- ctx: MutableRequestContext,
|
|
|
- productIds: ID[],
|
|
|
- ): Promise<void> {
|
|
|
- Logger.debug(`Updating ${productIds.length} Products`, loggerCtx)
|
|
|
+ private async updateProductsOperations(ctx: MutableRequestContext, productIds: ID[]): Promise<void> {
|
|
|
+ Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
|
|
|
for (const productId of productIds) {
|
|
|
await this.deleteProductOperations(ctx, productId);
|
|
|
await this.updateProductsOperationsOnly(ctx, productId);
|
|
|
@@ -695,8 +691,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
index: string = VARIANT_INDEX_NAME,
|
|
|
): Promise<void> {
|
|
|
const channels = await this.requestContextCache.get(ctx, 'elastic-index-all-channels', () =>
|
|
|
- this.connection
|
|
|
- .rawConnection
|
|
|
+ this.connection.rawConnection
|
|
|
.getRepository(Channel)
|
|
|
.createQueryBuilder('channel')
|
|
|
.select('channel.id')
|
|
|
@@ -720,8 +715,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
.andWhere('channel.id = :channelId', { channelId: ctx.channelId })
|
|
|
.getOne();
|
|
|
|
|
|
- if (!product)
|
|
|
- return;
|
|
|
+ if (!product) return;
|
|
|
|
|
|
Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
|
|
|
let operations: BulkVariantOperation[] = [];
|
|
|
@@ -732,8 +726,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
|
|
|
const uniqueLanguageVariants = unique(languageVariants);
|
|
|
|
|
|
-
|
|
|
- for (const {id: channelId} of channels) {
|
|
|
+ for (const { id: channelId } of channels) {
|
|
|
for (const languageCode of uniqueLanguageVariants) {
|
|
|
operations.push({
|
|
|
index: VARIANT_INDEX_NAME,
|
|
|
@@ -750,7 +743,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
operations,
|
|
|
index,
|
|
|
);
|
|
|
- operations = []
|
|
|
+ operations = [];
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -777,7 +770,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
languageVariants: LanguageCode[],
|
|
|
index = VARIANT_INDEX_NAME,
|
|
|
): Promise<void> {
|
|
|
- Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx)
|
|
|
+ Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
|
|
|
let operations: BulkVariantOperation[] = [];
|
|
|
for (const variant of variants) {
|
|
|
for (const channelId of channelIds) {
|
|
|
@@ -793,15 +786,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
),
|
|
|
},
|
|
|
},
|
|
|
- })
|
|
|
+ });
|
|
|
if (operations.length >= this.options.reindexBulkOperationSizeLimit) {
|
|
|
// Because we can have a huge amount of variant for 1 product, we also chunk update operations
|
|
|
await this.executeBulkOperationsByChunks(
|
|
|
this.options.reindexBulkOperationSizeLimit,
|
|
|
operations,
|
|
|
index,
|
|
|
- )
|
|
|
- operations = []
|
|
|
+ );
|
|
|
+ operations = [];
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -811,8 +804,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
this.options.reindexBulkOperationSizeLimit,
|
|
|
operations,
|
|
|
index,
|
|
|
- )
|
|
|
- return
|
|
|
+ );
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
|