|
|
@@ -23,7 +23,7 @@ import {
|
|
|
import { Observable } from 'rxjs';
|
|
|
|
|
|
import { ELASTIC_SEARCH_OPTIONS, loggerCtx, PRODUCT_INDEX_NAME, VARIANT_INDEX_NAME } from './constants';
|
|
|
-import { createIndices, deleteIndices } from './indexing-utils';
|
|
|
+import { createIndices } from './indexing-utils';
|
|
|
import { ElasticsearchOptions } from './options';
|
|
|
import {
|
|
|
BulkOperation,
|
|
|
@@ -84,7 +84,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
@Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
|
|
|
private productVariantService: ProductVariantService,
|
|
|
private configService: ConfigService,
|
|
|
- ) {}
|
|
|
+ ) {
|
|
|
+ }
|
|
|
|
|
|
onModuleInit(): any {
|
|
|
const { host, port } = this.options;
|
|
|
@@ -118,10 +119,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
async assignProductToChannel({
|
|
|
- ctx: rawContext,
|
|
|
- productId,
|
|
|
- channelId,
|
|
|
- }: ProductChannelMessageData): Promise<boolean> {
|
|
|
+ ctx: rawContext,
|
|
|
+ productId,
|
|
|
+ channelId,
|
|
|
+ }: ProductChannelMessageData): Promise<boolean> {
|
|
|
await this.updateProductsInternal([productId]);
|
|
|
return true;
|
|
|
}
|
|
|
@@ -130,29 +131,29 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
async removeProductFromChannel({
|
|
|
- ctx: rawContext,
|
|
|
- productId,
|
|
|
- channelId,
|
|
|
- }: ProductChannelMessageData): Promise<boolean> {
|
|
|
+ ctx: rawContext,
|
|
|
+ productId,
|
|
|
+ channelId,
|
|
|
+ }: ProductChannelMessageData): Promise<boolean> {
|
|
|
await this.updateProductsInternal([productId]);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
async assignVariantToChannel({
|
|
|
- ctx: rawContext,
|
|
|
- productVariantId,
|
|
|
- channelId,
|
|
|
- }: VariantChannelMessageData): Promise<boolean> {
|
|
|
+ ctx: rawContext,
|
|
|
+ productVariantId,
|
|
|
+ channelId,
|
|
|
+ }: VariantChannelMessageData): Promise<boolean> {
|
|
|
const productIds = await this.getProductIdsByVariantIds([productVariantId]);
|
|
|
await this.updateProductsInternal(productIds);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
async removeVariantFromChannel({
|
|
|
- ctx: rawContext,
|
|
|
- productVariantId,
|
|
|
- channelId,
|
|
|
- }: VariantChannelMessageData): Promise<boolean> {
|
|
|
+ ctx: rawContext,
|
|
|
+ productVariantId,
|
|
|
+ channelId,
|
|
|
+ }: VariantChannelMessageData): Promise<boolean> {
|
|
|
const productIds = await this.getProductIdsByVariantIds([productVariantId]);
|
|
|
await this.updateProductsInternal(productIds);
|
|
|
return true;
|
|
|
@@ -178,9 +179,9 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
}
|
|
|
|
|
|
updateVariantsById({
|
|
|
- ctx: rawContext,
|
|
|
- ids,
|
|
|
- }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
|
|
|
+ ctx: rawContext,
|
|
|
+ ids,
|
|
|
+ }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
|
|
|
return asyncObservable(async observer => {
|
|
|
return this.asyncQueue.push(async () => {
|
|
|
const timeStart = Date.now();
|
|
|
@@ -207,19 +208,179 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- reindex({ ctx: rawContext, dropIndices }: ReindexMessageData): Observable<ReindexMessageResponse> {
|
|
|
+ reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
|
|
|
return asyncObservable(async observer => {
|
|
|
return this.asyncQueue.push(async () => {
|
|
|
const timeStart = Date.now();
|
|
|
const operations: Array<BulkProductOperation | BulkVariantOperation> = [];
|
|
|
|
|
|
- if (dropIndices) {
|
|
|
- await deleteIndices(this.client, this.options.indexPrefix);
|
|
|
+ const reindexTempName = new Date().getTime();
|
|
|
+ try {
|
|
|
+
|
|
|
+ const getIndexNameByAlias = async (aliasName: string) => {
|
|
|
+ const aliasExist = await this.client.indices.existsAlias({ name: aliasName });
|
|
|
+ if (aliasExist.body) {
|
|
|
+ const alias = await this.client.indices.getAlias(
|
|
|
+ {
|
|
|
+ name: aliasName,
|
|
|
+ },
|
|
|
+ );
|
|
|
+ return Object.keys(alias.body)[0];
|
|
|
+ } else {
|
|
|
+ return aliasName;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
await createIndices(
|
|
|
this.client,
|
|
|
this.options.indexPrefix,
|
|
|
+ this.options.indexSettings,
|
|
|
+ this.options.indexMappingProperties,
|
|
|
this.configService.entityIdStrategy.primaryKeyType,
|
|
|
+ true,
|
|
|
+ `-reindex-${reindexTempName}`,
|
|
|
);
|
|
|
+ const reindexProductAliasName = this.options.indexPrefix + PRODUCT_INDEX_NAME + `-reindex-${reindexTempName}`;
|
|
|
+ const reindexVariantAliasName = this.options.indexPrefix + VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`;
|
|
|
+ const reindexProductIndexName = await getIndexNameByAlias(reindexProductAliasName);
|
|
|
+ const reindexVariantIndexName = await getIndexNameByAlias(reindexVariantAliasName);
|
|
|
+
|
|
|
+ const originalProductAliasExist = await this.client.indices.existsAlias({ name: this.options.indexPrefix + PRODUCT_INDEX_NAME });
|
|
|
+ const originalVariantAliasExist = await this.client.indices.existsAlias({ name: this.options.indexPrefix + VARIANT_INDEX_NAME });
|
|
|
+ const originalProductIndexExist = await this.client.indices.exists({ index: this.options.indexPrefix + PRODUCT_INDEX_NAME });
|
|
|
+ const originalVariantIndexExist = await this.client.indices.exists({ index: this.options.indexPrefix + PRODUCT_INDEX_NAME });
|
|
|
+
|
|
|
+ const originalProductIndexName = await getIndexNameByAlias(this.options.indexPrefix + PRODUCT_INDEX_NAME);
|
|
|
+ const originalVariantIndexName = await getIndexNameByAlias(this.options.indexPrefix + VARIANT_INDEX_NAME);
|
|
|
+
|
|
|
+ if (originalVariantAliasExist.body || originalVariantIndexExist.body) {
|
|
|
+ await this.client.reindex({
|
|
|
+ refresh: true,
|
|
|
+ body: {
|
|
|
+ source: {
|
|
|
+ index: this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
+ },
|
|
|
+ dest: {
|
|
|
+ index: this.options.indexPrefix + VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if (originalProductAliasExist.body || originalProductIndexExist.body) {
|
|
|
+ await this.client.reindex({
|
|
|
+ refresh: true,
|
|
|
+ body: {
|
|
|
+ source: {
|
|
|
+ index: this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
+ },
|
|
|
+ dest: {
|
|
|
+ index: this.options.indexPrefix + PRODUCT_INDEX_NAME + `-reindex-${reindexTempName}`,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ const actions = [
|
|
|
+ {
|
|
|
+ remove: {
|
|
|
+ index: reindexVariantIndexName,
|
|
|
+ alias: this.options.indexPrefix + VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ remove: {
|
|
|
+ index: reindexProductIndexName,
|
|
|
+ alias: this.options.indexPrefix + PRODUCT_INDEX_NAME + `-reindex-${reindexTempName}`,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ add: {
|
|
|
+ index: reindexVariantIndexName,
|
|
|
+ alias: this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ add: {
|
|
|
+ index: reindexProductIndexName,
|
|
|
+ alias: this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ ];
|
|
|
+
|
|
|
+ if (originalProductAliasExist.body) {
|
|
|
+ actions.push({
|
|
|
+ remove: {
|
|
|
+ index: originalProductIndexName,
|
|
|
+ alias: this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ );
|
|
|
+ } else if (originalProductIndexExist.body) {
|
|
|
+ await this.client.indices.delete({
|
|
|
+ index: [this.options.indexPrefix + PRODUCT_INDEX_NAME],
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ if (originalVariantAliasExist.body) {
|
|
|
+ actions.push({
|
|
|
+ remove: {
|
|
|
+ index: originalVariantIndexName,
|
|
|
+ alias: this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ );
|
|
|
+ } else if (originalVariantIndexExist.body) {
|
|
|
+ await this.client.indices.delete({
|
|
|
+ index: [this.options.indexPrefix + VARIANT_INDEX_NAME],
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ await this.client.indices.updateAliases({
|
|
|
+ body: {
|
|
|
+ actions,
|
|
|
+ },
|
|
|
+ });
|
|
|
+
|
|
|
+ if (originalProductAliasExist.body)
|
|
|
+ {
|
|
|
+ await this.client.indices.delete({
|
|
|
+ index: [originalProductIndexName],
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if (originalVariantAliasExist.body)
|
|
|
+ {
|
|
|
+ await this.client.indices.delete({
|
|
|
+ index: [originalVariantIndexName],
|
|
|
+ });
|
|
|
+ }
|
|
|
+ } catch (e) {
|
|
|
+ Logger.warn(`Could not recreate indices. Reindexing continue with existing indices.`, loggerCtx);
|
|
|
+ Logger.warn(JSON.stringify(e), loggerCtx);
|
|
|
+ } finally {
|
|
|
+ const reindexVariantAliasExist = await this.client.indices.existsAlias({ name: this.options.indexPrefix + VARIANT_INDEX_NAME + `-reindex-${reindexTempName}` });
|
|
|
+ if (reindexVariantAliasExist.body) {
|
|
|
+ const reindexVariantAliasResult = await this.client.indices.getAlias(
|
|
|
+ {
|
|
|
+ name: this.options.indexPrefix + VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`,
|
|
|
+ },
|
|
|
+ );
|
|
|
+ const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
|
|
|
+ await this.client.indices.delete({
|
|
|
+ index: [reindexVariantIndexName],
|
|
|
+ });
|
|
|
+ }
|
|
|
+ const reindexProductAliasExist = await this.client.indices.existsAlias({ name: this.options.indexPrefix + PRODUCT_INDEX_NAME + `-reindex-${reindexTempName}` });
|
|
|
+ if (reindexProductAliasExist.body) {
|
|
|
+ const reindexProductAliasResult = await this.client.indices.getAlias(
|
|
|
+ {
|
|
|
+ name: this.options.indexPrefix + PRODUCT_INDEX_NAME + `-reindex-${reindexTempName}`,
|
|
|
+ },
|
|
|
+ );
|
|
|
+ const reindexProductIndexName = Object.keys(reindexProductAliasResult.body)[0];
|
|
|
+ await this.client.indices.delete({
|
|
|
+ index: [reindexProductIndexName],
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
const deletedProductIds = await this.connection
|
|
|
@@ -384,7 +545,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
},
|
|
|
);
|
|
|
updatedProductVariants.forEach(variant => (variant.product = product));
|
|
|
- if (product.enabled === false) {
|
|
|
+ if (!product.enabled) {
|
|
|
updatedProductVariants.forEach(v => (v.enabled = false));
|
|
|
}
|
|
|
Logger.verbose(`Updating Product (${productId})`, loggerCtx);
|
|
|
@@ -424,15 +585,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
operation: {
|
|
|
doc: variantsInChannel.length
|
|
|
? this.createProductIndexItem(
|
|
|
- variantsInChannel,
|
|
|
- channelCtx.channelId,
|
|
|
- languageCode,
|
|
|
- )
|
|
|
+ variantsInChannel,
|
|
|
+ channelCtx.channelId,
|
|
|
+ languageCode,
|
|
|
+ )
|
|
|
: this.createSyntheticProductIndexItem(
|
|
|
- channelCtx,
|
|
|
- product,
|
|
|
- languageCode,
|
|
|
- ),
|
|
|
+ channelCtx,
|
|
|
+ product,
|
|
|
+ languageCode,
|
|
|
+ ),
|
|
|
doc_as_upsert: true,
|
|
|
},
|
|
|
},
|