|
|
@@ -1,6 +1,5 @@
|
|
|
import { Client } from '@elastic/elasticsearch';
|
|
|
-import { Controller, Inject, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
|
|
|
-import { MessagePattern } from '@nestjs/microservices';
|
|
|
+import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
|
|
|
import { unique } from '@vendure/common/lib/unique';
|
|
|
import {
|
|
|
Asset,
|
|
|
@@ -30,22 +29,17 @@ import { ELASTIC_SEARCH_OPTIONS, loggerCtx, PRODUCT_INDEX_NAME, VARIANT_INDEX_NA
|
|
|
import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils';
|
|
|
import { ElasticsearchOptions } from './options';
|
|
|
import {
|
|
|
- AssignProductToChannelMessage,
|
|
|
- AssignVariantToChannelMessage,
|
|
|
BulkOperation,
|
|
|
BulkOperationDoc,
|
|
|
BulkResponseBody,
|
|
|
- DeleteAssetMessage,
|
|
|
- DeleteProductMessage,
|
|
|
- DeleteVariantMessage,
|
|
|
+ ProductChannelMessageData,
|
|
|
ProductIndexItem,
|
|
|
- ReindexMessage,
|
|
|
- RemoveProductFromChannelMessage,
|
|
|
- RemoveVariantFromChannelMessage,
|
|
|
- UpdateAssetMessage,
|
|
|
- UpdateProductMessage,
|
|
|
- UpdateVariantMessage,
|
|
|
- UpdateVariantsByIdMessage,
|
|
|
+ ReindexMessageData,
|
|
|
+ UpdateAssetMessageData,
|
|
|
+ UpdateProductMessageData,
|
|
|
+ UpdateVariantMessageData,
|
|
|
+ UpdateVariantsByIdMessageData,
|
|
|
+ VariantChannelMessageData,
|
|
|
VariantIndexItem,
|
|
|
} from './types';
|
|
|
|
|
|
@@ -70,7 +64,7 @@ export interface ReindexMessageResponse {
|
|
|
duration: number;
|
|
|
}
|
|
|
|
|
|
-@Controller()
|
|
|
+@Injectable()
|
|
|
export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
|
|
|
private client: Client;
|
|
|
private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
|
|
|
@@ -96,160 +90,120 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
/**
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
- @MessagePattern(UpdateProductMessage.pattern)
|
|
|
- updateProduct({
|
|
|
- ctx: rawContext,
|
|
|
- productId,
|
|
|
- }: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
|
|
|
+ async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- await this.updateProductInternal(ctx, productId);
|
|
|
- return true;
|
|
|
- });
|
|
|
+ await this.updateProductInternal(ctx, productId);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
- @MessagePattern(DeleteProductMessage.pattern)
|
|
|
- deleteProduct({
|
|
|
- ctx: rawContext,
|
|
|
- productId,
|
|
|
- }: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
|
|
|
+ async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- const product = await this.connection.getRepository(Product).findOne(productId);
|
|
|
- if (!product) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- await this.deleteProductInternal(product, ctx.channelId);
|
|
|
- const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
- await this.deleteVariantsInternal(variants, ctx.channelId);
|
|
|
- return true;
|
|
|
- });
|
|
|
+ const product = await this.connection.getRepository(Product).findOne(productId);
|
|
|
+ if (!product) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ await this.deleteProductInternal(product, ctx.channelId);
|
|
|
+ const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
+ await this.deleteVariantsInternal(variants, ctx.channelId);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
- @MessagePattern(AssignProductToChannelMessage.pattern)
|
|
|
- assignProductsToChannel({
|
|
|
+ async assignProductToChannel({
|
|
|
ctx: rawContext,
|
|
|
productId,
|
|
|
channelId,
|
|
|
- }: AssignProductToChannelMessage['data']): Observable<AssignProductToChannelMessage['response']> {
|
|
|
+ }: ProductChannelMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- await this.updateProductInternal(ctx, productId);
|
|
|
- const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
- await this.updateVariantsInternal(
|
|
|
- ctx,
|
|
|
- variants.map(v => v.id),
|
|
|
- channelId,
|
|
|
- );
|
|
|
- return true;
|
|
|
- });
|
|
|
+ await this.updateProductInternal(ctx, productId);
|
|
|
+ const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
+ await this.updateVariantsInternal(
|
|
|
+ ctx,
|
|
|
+ variants.map(v => v.id),
|
|
|
+ channelId,
|
|
|
+ );
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Updates the search index only for the affected product.
|
|
|
*/
|
|
|
- @MessagePattern(RemoveProductFromChannelMessage.pattern)
|
|
|
- removeProductFromChannel({
|
|
|
+ async removeProductFromChannel({
|
|
|
ctx: rawContext,
|
|
|
productId,
|
|
|
channelId,
|
|
|
- }: RemoveProductFromChannelMessage['data']): Observable<RemoveProductFromChannelMessage['response']> {
|
|
|
+ }: ProductChannelMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- const product = await this.connection.getRepository(Product).findOne(productId);
|
|
|
- if (!product) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- await this.deleteProductInternal(product, channelId);
|
|
|
- const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
- await this.deleteVariantsInternal(variants, channelId);
|
|
|
- return true;
|
|
|
- });
|
|
|
+ const product = await this.connection.getRepository(Product).findOne(productId);
|
|
|
+ if (!product) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ await this.deleteProductInternal(product, channelId);
|
|
|
+ const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
|
|
|
+ await this.deleteVariantsInternal(variants, channelId);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(AssignVariantToChannelMessage.pattern)
|
|
|
- assignVariantToChannel({
|
|
|
+ async assignVariantToChannel({
|
|
|
ctx: rawContext,
|
|
|
productVariantId,
|
|
|
channelId,
|
|
|
- }: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
|
|
|
+ }: VariantChannelMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- await this.updateVariantsInternal(ctx, [productVariantId], channelId);
|
|
|
- return true;
|
|
|
- });
|
|
|
+ await this.updateVariantsInternal(ctx, [productVariantId], channelId);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(RemoveVariantFromChannelMessage.pattern)
|
|
|
- removeVariantFromChannel({
|
|
|
+ async removeVariantFromChannel({
|
|
|
ctx: rawContext,
|
|
|
productVariantId,
|
|
|
channelId,
|
|
|
- }: RemoveVariantFromChannelMessage['data']): Observable<RemoveVariantFromChannelMessage['response']> {
|
|
|
+ }: VariantChannelMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- const productVariant = await this.connection.getEntityOrThrow(
|
|
|
- ctx,
|
|
|
- ProductVariant,
|
|
|
- productVariantId,
|
|
|
- { relations: ['product', 'product.channels'] },
|
|
|
- );
|
|
|
- await this.deleteVariantsInternal([productVariant], channelId);
|
|
|
-
|
|
|
- if (!productVariant.product.channels.find(c => idsAreEqual(c.id, channelId))) {
|
|
|
- await this.deleteProductInternal(productVariant.product, channelId);
|
|
|
- }
|
|
|
- return true;
|
|
|
+ const productVariant = await this.connection.getEntityOrThrow(ctx, ProductVariant, productVariantId, {
|
|
|
+ relations: ['product', 'product.channels'],
|
|
|
});
|
|
|
+ await this.deleteVariantsInternal([productVariant], channelId);
|
|
|
+
|
|
|
+ if (!productVariant.product.channels.find(c => idsAreEqual(c.id, channelId))) {
|
|
|
+ await this.deleteProductInternal(productVariant.product, channelId);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Updates the search index only for the affected entities.
|
|
|
*/
|
|
|
- @MessagePattern(UpdateVariantMessage.pattern)
|
|
|
- updateVariants({
|
|
|
- ctx: rawContext,
|
|
|
- variantIds,
|
|
|
- }: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
|
|
|
+ async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- return this.asyncQueue.push(async () => {
|
|
|
- await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
|
|
|
- return true;
|
|
|
- });
|
|
|
+ return this.asyncQueue.push(async () => {
|
|
|
+ await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
|
|
|
+ return true;
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(DeleteVariantMessage.pattern)
|
|
|
- private deleteVaiants({
|
|
|
- ctx: rawContext,
|
|
|
- variantIds,
|
|
|
- }: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
|
|
|
+ async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
- return asyncObservable(async () => {
|
|
|
- const variants = await this.connection
|
|
|
- .getRepository(ProductVariant)
|
|
|
- .findByIds(variantIds, { relations: ['product'] });
|
|
|
- const productIds = unique(variants.map(v => v.product.id));
|
|
|
- for (const productId of productIds) {
|
|
|
- await this.updateProductInternal(ctx, productId);
|
|
|
- }
|
|
|
- await this.deleteVariantsInternal(variants, ctx.channelId);
|
|
|
- return true;
|
|
|
- });
|
|
|
+ const variants = await this.connection
|
|
|
+ .getRepository(ProductVariant)
|
|
|
+ .findByIds(variantIds, { relations: ['product'] });
|
|
|
+ const productIds = unique(variants.map(v => v.product.id));
|
|
|
+ for (const productId of productIds) {
|
|
|
+ await this.updateProductInternal(ctx, productId);
|
|
|
+ }
|
|
|
+ await this.deleteVariantsInternal(variants, ctx.channelId);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(UpdateVariantsByIdMessage.pattern)
|
|
|
updateVariantsById({
|
|
|
ctx: rawContext,
|
|
|
ids,
|
|
|
- }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
|
|
|
+ }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
const { batchSize } = this.options;
|
|
|
|
|
|
@@ -326,11 +280,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(ReindexMessage.pattern)
|
|
|
- reindex({
|
|
|
- ctx: rawContext,
|
|
|
- dropIndices,
|
|
|
- }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
|
|
|
+ reindex({ ctx: rawContext, dropIndices }: ReindexMessageData): Observable<ReindexMessageResponse> {
|
|
|
const ctx = RequestContext.deserialize(rawContext);
|
|
|
const { batchSize } = this.options;
|
|
|
|
|
|
@@ -401,34 +351,28 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(UpdateAssetMessage.pattern)
|
|
|
- updateAsset(data: UpdateAssetMessage['data']): Observable<UpdateAssetMessage['response']> {
|
|
|
- return asyncObservable(async () => {
|
|
|
- const result1 = await this.updateAssetFocalPointForIndex(PRODUCT_INDEX_NAME, data.asset);
|
|
|
- const result2 = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);
|
|
|
- await this.client.indices.refresh({
|
|
|
- index: [
|
|
|
- this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
- this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
- ],
|
|
|
- });
|
|
|
- return result1 && result2;
|
|
|
+ async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
|
|
|
+ const result1 = await this.updateAssetFocalPointForIndex(PRODUCT_INDEX_NAME, data.asset);
|
|
|
+ const result2 = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);
|
|
|
+ await this.client.indices.refresh({
|
|
|
+ index: [
|
|
|
+ this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
+ this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
+ ],
|
|
|
});
|
|
|
+ return result1 && result2;
|
|
|
}
|
|
|
|
|
|
- @MessagePattern(DeleteAssetMessage.pattern)
|
|
|
- deleteAsset(data: DeleteAssetMessage['data']): Observable<DeleteAssetMessage['response']> {
|
|
|
- return asyncObservable(async () => {
|
|
|
- const result1 = await this.deleteAssetForIndex(PRODUCT_INDEX_NAME, data.asset);
|
|
|
- const result2 = await this.deleteAssetForIndex(VARIANT_INDEX_NAME, data.asset);
|
|
|
- await this.client.indices.refresh({
|
|
|
- index: [
|
|
|
- this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
- this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
- ],
|
|
|
- });
|
|
|
- return result1 && result2;
|
|
|
+ async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
|
|
|
+ const result1 = await this.deleteAssetForIndex(PRODUCT_INDEX_NAME, data.asset);
|
|
|
+ const result2 = await this.deleteAssetForIndex(VARIANT_INDEX_NAME, data.asset);
|
|
|
+ await this.client.indices.refresh({
|
|
|
+ index: [
|
|
|
+ this.options.indexPrefix + PRODUCT_INDEX_NAME,
|
|
|
+ this.options.indexPrefix + VARIANT_INDEX_NAME,
|
|
|
+ ],
|
|
|
});
|
|
|
+ return result1 && result2;
|
|
|
}
|
|
|
|
|
|
private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
|