| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589 |
- import { Client } from '@elastic/elasticsearch';
- import { Controller, Inject, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
- import { MessagePattern } from '@nestjs/microservices';
- import { InjectConnection } from '@nestjs/typeorm';
- import { unique } from '@vendure/common/lib/unique';
- import {
- asyncObservable,
- AsyncQueue,
- FacetValue,
- ID,
- JobService,
- Logger,
- Product,
- ProductVariant,
- ProductVariantService,
- RequestContext,
- translateDeep,
- } from '@vendure/core';
- import { Observable } from 'rxjs';
- import { Connection, SelectQueryBuilder } from 'typeorm';
- import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
- import {
- ELASTIC_SEARCH_OPTIONS,
- loggerCtx,
- PRODUCT_INDEX_NAME,
- PRODUCT_INDEX_TYPE,
- VARIANT_INDEX_NAME,
- VARIANT_INDEX_TYPE,
- } from './constants';
- import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils';
- import { ElasticsearchOptions } from './options';
- import {
- AssignProductToChannelMessage,
- BulkOperation,
- BulkOperationDoc,
- BulkResponseBody,
- DeleteProductMessage,
- DeleteVariantMessage,
- ProductIndexItem,
- ReindexMessage,
- RemoveProductFromChannelMessage,
- UpdateProductMessage,
- UpdateVariantMessage,
- UpdateVariantsByIdMessage,
- VariantIndexItem,
- } from './types';
- export const variantRelations = [
- 'product',
- 'product.featuredAsset',
- 'product.facetValues',
- 'product.facetValues.facet',
- 'product.channels',
- 'featuredAsset',
- 'facetValues',
- 'facetValues.facet',
- 'collections',
- 'taxCategory',
- ];
- export interface ReindexMessageResponse {
- total: number;
- completed: number;
- duration: number;
- }
- @Controller()
- export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
- private client: Client;
- private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
- constructor(
- @InjectConnection() private connection: Connection,
- @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
- private productVariantService: ProductVariantService,
- private jobService: JobService,
- ) {}
- onModuleInit(): any {
- const { host, port } = this.options;
- this.client = new Client({
- node: `${host}:${port}`,
- });
- }
- onModuleDestroy(): any {
- return this.client.close();
- }
- /**
- * Updates the search index only for the affected product.
- */
- @MessagePattern(UpdateProductMessage.pattern)
- updateProduct({
- ctx: rawContext,
- productId,
- }: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- return asyncObservable(async () => {
- await this.updateProductInternal(ctx, productId, ctx.channelId);
- return true;
- });
- }
- /**
- * Updates the search index only for the affected product.
- */
- @MessagePattern(DeleteProductMessage.pattern)
- deleteProduct({
- ctx: rawContext,
- productId,
- }: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- return asyncObservable(async () => {
- await this.deleteProductInternal(productId, ctx.channelId);
- const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
- await this.deleteVariantsInternal(variants.map(v => v.id), ctx.channelId);
- return true;
- });
- }
- /**
- * Updates the search index only for the affected product.
- */
- @MessagePattern(AssignProductToChannelMessage.pattern)
- assignProductsToChannel({
- ctx: rawContext,
- productId,
- channelId,
- }: AssignProductToChannelMessage['data']): Observable<AssignProductToChannelMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- return asyncObservable(async () => {
- await this.updateProductInternal(ctx, productId, channelId);
- const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
- await this.updateVariantsInternal(ctx, variants.map(v => v.id), channelId);
- return true;
- });
- }
- /**
- * Updates the search index only for the affected product.
- */
- @MessagePattern(RemoveProductFromChannelMessage.pattern)
- removeProductFromChannel({
- ctx: rawContext,
- productId,
- channelId,
- }: RemoveProductFromChannelMessage['data']): Observable<RemoveProductFromChannelMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- return asyncObservable(async () => {
- await this.deleteProductInternal(productId, channelId);
- const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
- await this.deleteVariantsInternal(variants.map(v => v.id), channelId);
- return true;
- });
- }
- /**
- * Updates the search index only for the affected entities.
- */
- @MessagePattern(UpdateVariantMessage.pattern)
- updateVariants({
- ctx: rawContext,
- variantIds,
- }: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- return asyncObservable(async () => {
- return this.asyncQueue.push(async () => {
- await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
- return true;
- });
- });
- }
- @MessagePattern(DeleteVariantMessage.pattern)
- private deleteVaiants({
- ctx: rawContext,
- variantIds,
- }: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- return asyncObservable(async () => {
- await this.deleteVariantsInternal(variantIds, ctx.channelId);
- return true;
- });
- }
- @MessagePattern(UpdateVariantsByIdMessage.pattern)
- updateVariantsById({
- ctx: rawContext,
- ids,
- }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- const { batchSize } = this.options;
- return asyncObservable(async observer => {
- return this.asyncQueue.push(async () => {
- const timeStart = Date.now();
- if (ids.length) {
- const batches = Math.ceil(ids.length / batchSize);
- Logger.verbose(`Updating ${ids.length} variants...`, loggerCtx);
- let variantsInProduct: ProductVariant[] = [];
- for (let i = 0; i < batches; i++) {
- const begin = i * batchSize;
- const end = begin + batchSize;
- const batchIds = ids.slice(begin, end);
- const variants = await this.getVariantsByIds(ctx, batchIds);
- variantsInProduct = await this.processVariantBatch(
- variants,
- variantsInProduct,
- (operations, variant) => {
- operations.push(
- { update: { _id: this.getId(variant.id, ctx.channelId) } },
- { doc: this.createVariantIndexItem(variant, ctx.channelId) },
- );
- },
- (operations, product, _variants) => {
- operations.push(
- { update: { _id: this.getId(product.id, ctx.channelId) } },
- { doc: this.createProductIndexItem(_variants, ctx.channelId) },
- );
- },
- );
- observer.next({
- total: ids.length,
- completed: Math.min((i + 1) * batchSize, ids.length),
- duration: +new Date() - timeStart,
- });
- }
- }
- Logger.verbose(`Completed updating variants`, loggerCtx);
- return {
- total: ids.length,
- completed: ids.length,
- duration: +new Date() - timeStart,
- };
- });
- });
- }
- @MessagePattern(ReindexMessage.pattern)
- reindex({
- ctx: rawContext,
- dropIndices,
- }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- const { batchSize } = this.options;
- return asyncObservable(async observer => {
- return this.asyncQueue.push(async () => {
- const timeStart = Date.now();
- if (dropIndices) {
- await deleteIndices(this.client, this.options.indexPrefix);
- await createIndices(this.client, this.options.indexPrefix);
- } else {
- await deleteByChannel(this.client, this.options.indexPrefix, ctx.channelId);
- }
- const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
- const count = await qb.andWhere('variants__product.deletedAt IS NULL').getCount();
- Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
- const batches = Math.ceil(count / batchSize);
- let variantsInProduct: ProductVariant[] = [];
- for (let i = 0; i < batches; i++) {
- const variants = await this.getBatch(ctx, qb, i);
- Logger.verbose(
- `Processing batch ${i + 1} of ${batches}. ProductVariants count: ${variants.length}`,
- loggerCtx,
- );
- variantsInProduct = await this.processVariantBatch(
- variants,
- variantsInProduct,
- (operations, variant) => {
- operations.push(
- { index: { _id: this.getId(variant.id, ctx.channelId) } },
- this.createVariantIndexItem(variant, ctx.channelId),
- );
- },
- (operations, product, _variants) => {
- operations.push(
- { index: { _id: this.getId(product.id, ctx.channelId) } },
- this.createProductIndexItem(_variants, ctx.channelId),
- );
- },
- );
- observer.next({
- total: count,
- completed: Math.min((i + 1) * batchSize, count),
- duration: +new Date() - timeStart,
- });
- }
- Logger.verbose(`Completed reindexing!`, loggerCtx);
- return {
- total: count,
- completed: count,
- duration: +new Date() - timeStart,
- };
- });
- });
- }
- private async processVariantBatch(
- variants: ProductVariant[],
- variantsInProduct: ProductVariant[],
- processVariants: (
- operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem> | VariantIndexItem>,
- variant: ProductVariant,
- ) => void,
- processProducts: (
- operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem> | ProductIndexItem>,
- product: Product,
- variants: ProductVariant[],
- ) => void,
- ) {
- const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
- const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
- const productIdsIndexed = new Set<ID>();
- // tslint:disable-next-line:prefer-for-of
- for (let j = 0; j < variants.length; j++) {
- const variant = variants[j];
- variantsInProduct.push(variant);
- processVariants(variantsToIndex, variant);
- const nextVariant = variants[j + 1];
- const nextVariantIsNewProduct = nextVariant && nextVariant.productId !== variant.productId;
- const thisVariantIsLastAndProductNotAdded =
- !nextVariant && !productIdsIndexed.has(variant.productId);
- if (nextVariantIsNewProduct || thisVariantIsLastAndProductNotAdded) {
- processProducts(productsToIndex, variant.product, variantsInProduct);
- variantsInProduct = [];
- productIdsIndexed.add(variant.productId);
- }
- }
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
- return variantsInProduct;
- }
- private async updateVariantsInternal(ctx: RequestContext, variantIds: ID[], channelId: ID) {
- let updatedVariants: ProductVariant[] = [];
- const productVariants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
- relations: variantRelations,
- });
- updatedVariants = this.hydrateVariants(ctx, productVariants);
- if (updatedVariants.length) {
- // When ProductVariants change, we need to update the corresponding Product index
- // since e.g. price changes must be reflected on the Product level too.
- const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
- for (const variantProductId of productIdsOfVariants) {
- await this.updateProductInternal(ctx, variantProductId, channelId);
- }
- const operations = updatedVariants.reduce(
- (ops, variant) => {
- return [
- ...ops,
- { update: { _id: this.getId(variant.id, channelId) } },
- { doc: this.createVariantIndexItem(variant, channelId), doc_as_upsert: true },
- ];
- },
- [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
- );
- Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
- }
- }
- private async updateProductInternal(ctx: RequestContext, productId: ID, channelId: ID) {
- let updatedProductVariants: ProductVariant[] = [];
- const product = await this.connection.getRepository(Product).findOne(productId, {
- relations: ['variants'],
- });
- if (product) {
- updatedProductVariants = await this.connection
- .getRepository(ProductVariant)
- .findByIds(product.variants.map(v => v.id), {
- relations: variantRelations,
- });
- if (product.enabled === false) {
- updatedProductVariants.forEach(v => (v.enabled = false));
- }
- }
- if (updatedProductVariants.length) {
- Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
- updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
- const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants, channelId);
- const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
- { update: { _id: this.getId(updatedProductIndexItem.productId, channelId) } },
- { doc: updatedProductIndexItem, doc_as_upsert: true },
- ];
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
- }
- }
- private async deleteProductInternal(productId: ID, channelId: ID) {
- Logger.verbose(`Deleting 1 Product (${productId})`, loggerCtx);
- const operations: BulkOperation[] = [{ delete: { _id: this.getId(productId, channelId) } }];
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
- }
- private async deleteVariantsInternal(variantIds: ID[], channelId: ID) {
- Logger.verbose(`Deleting ${variantIds.length} ProductVariants`, loggerCtx);
- const operations: BulkOperation[] = variantIds.map(id => ({
- delete: { _id: this.getId(id, channelId) },
- }));
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
- }
- private async executeBulkOperations(
- indexName: string,
- indexType: string,
- operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
- ) {
- try {
- const fullIndexName = this.options.indexPrefix + indexName;
- const { body }: { body: BulkResponseBody } = await this.client.bulk({
- refresh: 'true',
- index: fullIndexName,
- type: indexType,
- body: operations,
- });
- if (body.errors) {
- Logger.error(
- `Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`,
- loggerCtx,
- );
- body.items.forEach(item => {
- if (item.index) {
- Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
- }
- if (item.update) {
- Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
- }
- if (item.delete) {
- Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
- }
- });
- } else {
- Logger.verbose(
- `Executed ${body.items.length} bulk operations on index [${fullIndexName}]`,
- loggerCtx,
- );
- }
- return body;
- } catch (e) {
- Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
- Logger.error('Error details: ' + JSON.stringify(e.body && e.body.error, null, 2), loggerCtx);
- }
- }
- private getSearchIndexQueryBuilder(channelId: ID) {
- const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
- FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
- relations: variantRelations,
- order: {
- productId: 'ASC',
- },
- });
- FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
- qb.leftJoin('variants.product', '__product')
- .leftJoin('__product.channels', '__channel')
- .where('__channel.id = :channelId', { channelId });
- return qb;
- }
- private async getBatch(
- ctx: RequestContext,
- qb: SelectQueryBuilder<ProductVariant>,
- batchNumber: string | number,
- ): Promise<ProductVariant[]> {
- const { batchSize } = this.options;
- const i = Number.parseInt(batchNumber.toString(), 10);
- const variants = await qb
- .andWhere('variants__product.deletedAt IS NULL')
- .take(batchSize)
- .skip(i * batchSize)
- .getMany();
- return this.hydrateVariants(ctx, variants);
- }
- private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
- const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
- relations: variantRelations,
- });
- return this.hydrateVariants(ctx, variants);
- }
- /**
- * Given an array of ProductVariants, this method applies the correct taxes and translations.
- */
- private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
- return variants
- .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
- .map(v => translateDeep(v, ctx.languageCode, ['product']));
- }
- private createVariantIndexItem(v: ProductVariant, channelId: ID): VariantIndexItem {
- const item: VariantIndexItem = {
- channelId,
- productVariantId: v.id as string,
- sku: v.sku,
- slug: v.product.slug,
- productId: v.product.id as string,
- productName: v.product.name,
- productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
- productVariantName: v.name,
- productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
- price: v.price,
- priceWithTax: v.priceWithTax,
- currencyCode: v.currencyCode,
- description: v.product.description,
- facetIds: this.getFacetIds([v]),
- channelIds: v.product.channels.map(c => c.id as string),
- facetValueIds: this.getFacetValueIds([v]),
- collectionIds: v.collections.map(c => c.id.toString()),
- enabled: v.enabled && v.product.enabled,
- };
- const customMappings = Object.entries(this.options.customProductVariantMappings);
- for (const [name, def] of customMappings) {
- item[name] = def.valueFn(v);
- }
- return item;
- }
- private createProductIndexItem(variants: ProductVariant[], channelId: ID): ProductIndexItem {
- const first = variants[0];
- const prices = variants.map(v => v.price);
- const pricesWithTax = variants.map(v => v.priceWithTax);
- const item: ProductIndexItem = {
- channelId,
- sku: variants.map(v => v.sku),
- slug: variants.map(v => v.product.slug),
- productId: first.product.id,
- productName: variants.map(v => v.product.name),
- productPreview: first.product.featuredAsset ? first.product.featuredAsset.preview : '',
- productVariantId: variants.map(v => v.id),
- productVariantName: variants.map(v => v.name),
- productVariantPreview: variants.filter(v => v.featuredAsset).map(v => v.featuredAsset.preview),
- priceMin: Math.min(...prices),
- priceMax: Math.max(...prices),
- priceWithTaxMin: Math.min(...pricesWithTax),
- priceWithTaxMax: Math.max(...pricesWithTax),
- currencyCode: first.currencyCode,
- description: first.product.description,
- facetIds: this.getFacetIds(variants),
- facetValueIds: this.getFacetValueIds(variants),
- collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
- channelIds: first.product.channels.map(c => c.id as string),
- enabled: variants.some(v => v.enabled),
- };
- const customMappings = Object.entries(this.options.customProductMappings);
- for (const [name, def] of customMappings) {
- item[name] = def.valueFn(variants[0].product, variants);
- }
- return item;
- }
- private getFacetIds(variants: ProductVariant[]): string[] {
- const facetIds = (fv: FacetValue) => fv.facet.id.toString();
- const variantFacetIds = variants.reduce(
- (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
- [] as string[],
- );
- const productFacetIds = variants[0].product.facetValues.map(facetIds);
- return unique([...variantFacetIds, ...productFacetIds]);
- }
- private getFacetValueIds(variants: ProductVariant[]): string[] {
- const facetValueIds = (fv: FacetValue) => fv.id.toString();
- const variantFacetValueIds = variants.reduce(
- (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
- [] as string[],
- );
- const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
- return unique([...variantFacetValueIds, ...productFacetValueIds]);
- }
- private getId(entityId: ID, channelId: ID): string {
- return `${channelId.toString()}__${entityId.toString()}`;
- }
- }
|