| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- import { Client } from '@elastic/elasticsearch';
- import { Controller, Inject } from '@nestjs/common';
- import { MessagePattern } from '@nestjs/microservices';
- import { InjectConnection } from '@nestjs/typeorm';
- import { unique } from '@vendure/common/lib/unique';
- import {
- FacetValue,
- ID,
- JobService,
- Logger,
- Product,
- ProductVariant,
- ProductVariantService,
- RequestContext,
- translateDeep,
- } from '@vendure/core';
- import { defer, Observable } from 'rxjs';
- import { Connection, SelectQueryBuilder } from 'typeorm';
- import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
- import {
- ELASTIC_SEARCH_CLIENT,
- ELASTIC_SEARCH_OPTIONS,
- loggerCtx,
- PRODUCT_INDEX_NAME,
- PRODUCT_INDEX_TYPE,
- VARIANT_INDEX_NAME,
- VARIANT_INDEX_TYPE,
- } from './constants';
- import { ElasticsearchOptions } from './options';
- import {
- BulkOperation,
- BulkOperationDoc,
- BulkResponseBody,
- ProductIndexItem,
- ReindexMessage,
- UpdateProductOrVariantMessage,
- UpdateVariantsByIdMessage,
- VariantIndexItem,
- } from './types';
- export const variantRelations = [
- 'product',
- 'product.featuredAsset',
- 'product.facetValues',
- 'product.facetValues.facet',
- 'featuredAsset',
- 'facetValues',
- 'facetValues.facet',
- 'collections',
- 'taxCategory',
- ];
- export interface ReindexMessageResponse {
- total: number;
- completed: number;
- duration: number;
- }
- @Controller()
- export class ElasticsearchIndexerController {
- constructor(
- @InjectConnection() private connection: Connection,
- @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
- @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
- private productVariantService: ProductVariantService,
- private jobService: JobService,
- ) {}
- /**
- * Updates the search index only for the affected entities.
- */
- @MessagePattern(UpdateProductOrVariantMessage.pattern)
- updateProductOrVariant({
- ctx: rawContext,
- productId,
- variantId,
- }: UpdateProductOrVariantMessage['data']): Observable<boolean> {
- const ctx = RequestContext.fromObject(rawContext);
- return defer(async () => {
- if (productId) {
- await this.updateProduct(ctx, productId);
- } else if (variantId) {
- await this.updateProductVariant(ctx, variantId);
- }
- return true;
- });
- }
- @MessagePattern(UpdateVariantsByIdMessage.pattern)
- updateVariantsById({
- ctx: rawContext,
- ids,
- }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- const { batchSize } = this.options;
- return new Observable(observer => {
- (async () => {
- const timeStart = Date.now();
- if (ids.length) {
- const batches = Math.ceil(ids.length / batchSize);
- Logger.verbose(`Updating ${ids.length} variants...`);
- let variantsInProduct: ProductVariant[] = [];
- for (let i = 0; i < batches; i++) {
- const begin = i * batchSize;
- const end = begin + batchSize;
- Logger.verbose(`Updating ids from index ${begin} to ${end}`);
- const batchIds = ids.slice(begin, end);
- const variants = await this.getVariantsByIds(ctx, batchIds);
- const variantsToIndex: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
- const productsToIndex: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
- // tslint:disable-next-line:prefer-for-of
- for (let j = 0; j < variants.length; j++) {
- const variant = variants[j];
- variantsInProduct.push(variant);
- variantsToIndex.push({ update: { _id: variant.id.toString() } });
- variantsToIndex.push({ doc: this.createVariantIndexItem(variant) });
- const nextVariant = variants[j + 1];
- if (nextVariant && nextVariant.productId !== variant.productId) {
- productsToIndex.push({ update: { _id: variant.productId.toString() } });
- productsToIndex.push({ doc: this.createProductIndexItem(variantsInProduct) });
- variantsInProduct = [];
- }
- }
- await this.executeBulkOperations(
- VARIANT_INDEX_NAME,
- VARIANT_INDEX_TYPE,
- variantsToIndex,
- );
- await this.executeBulkOperations(
- PRODUCT_INDEX_NAME,
- PRODUCT_INDEX_TYPE,
- productsToIndex,
- );
- observer.next({
- total: ids.length,
- completed: Math.min((i + 1) * batchSize, ids.length),
- duration: +new Date() - timeStart,
- });
- }
- }
- Logger.verbose(`Completed reindexing!`);
- observer.next({
- total: ids.length,
- completed: ids.length,
- duration: +new Date() - timeStart,
- });
- observer.complete();
- })();
- });
- }
- @MessagePattern(ReindexMessage.pattern)
- reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
- const ctx = RequestContext.fromObject(rawContext);
- const { batchSize } = this.options;
- return new Observable(observer => {
- (async () => {
- const timeStart = Date.now();
- const qb = this.getSearchIndexQueryBuilder();
- const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
- Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
- const batches = Math.ceil(count / batchSize);
- let variantsInProduct: ProductVariant[] = [];
- for (let i = 0; i < batches; i++) {
- Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
- const variants = await this.getBatch(ctx, qb, i);
- Logger.verbose(`ProductVariants count: ${variants.length}`);
- const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
- const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
- // tslint:disable-next-line:prefer-for-of
- for (let j = 0; j < variants.length; j++) {
- const variant = variants[j];
- variantsInProduct.push(variant);
- variantsToIndex.push({ index: { _id: variant.id.toString() } });
- variantsToIndex.push(this.createVariantIndexItem(variant));
- const nextVariant = variants[j + 1];
- if (nextVariant && nextVariant.productId !== variant.productId) {
- productsToIndex.push({ index: { _id: variant.productId.toString() } });
- productsToIndex.push(this.createProductIndexItem(variantsInProduct) as any);
- variantsInProduct = [];
- }
- }
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
- observer.next({
- total: count,
- completed: Math.min((i + 1) * batchSize, count),
- duration: +new Date() - timeStart,
- });
- }
- Logger.verbose(`Completed reindexing!`);
- observer.next({
- total: count,
- completed: count,
- duration: +new Date() - timeStart,
- });
- observer.complete();
- })();
- });
- }
- private async updateProductVariant(ctx: RequestContext, variantId: ID) {
- let updatedVariants: ProductVariant[] = [];
- let removedVariantId: ID | undefined;
- const productVariant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
- relations: variantRelations,
- });
- if (productVariant) {
- if (productVariant.deletedAt) {
- removedVariantId = variantId;
- } else {
- updatedVariants = this.hydrateVariants(ctx, [productVariant]);
- }
- }
- if (updatedVariants.length) {
- // When ProductVariants change, we need to update the corresponding Product index
- // since e.g. price changes must be reflected on the Product level too.
- const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
- for (const variantProductId of productIdsOfVariants) {
- await this.updateProduct(ctx, variantProductId);
- }
- const operations = updatedVariants.reduce(
- (ops, variant) => {
- return [
- ...ops,
- { update: { _id: variant.id.toString() } },
- { doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
- ];
- },
- [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
- );
- Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
- }
- if (removedVariantId) {
- Logger.verbose(`Deleting 1 ProductVariant (${removedVariantId})`, loggerCtx);
- const operations: BulkOperation[] = [{ delete: { _id: removedVariantId.toString() } }];
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
- }
- }
- private async updateProduct(ctx: RequestContext, productId: ID) {
- let updatedProductVariants: ProductVariant[] = [];
- let removedProductId: ID | undefined;
- let removedVariantIds: ID[] = [];
- const product = await this.connection.getRepository(Product).findOne(productId, {
- relations: ['variants'],
- });
- if (product) {
- if (product.deletedAt) {
- removedProductId = productId;
- removedVariantIds = product.variants.map(v => v.id);
- } else {
- updatedProductVariants = await this.connection
- .getRepository(ProductVariant)
- .findByIds(product.variants.map(v => v.id), {
- relations: variantRelations,
- });
- }
- }
- if (updatedProductVariants.length) {
- Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
- updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
- const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
- const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
- { update: { _id: updatedProductIndexItem.productId.toString() } },
- { doc: updatedProductIndexItem, doc_as_upsert: true },
- ];
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
- }
- if (removedVariantIds.length) {
- const operations = removedVariantIds.reduce(
- (ops, id) => {
- Logger.verbose(`Deleting 1 ProductVariant (${id})`, loggerCtx);
- return [...ops, { delete: { _id: id.toString() } }];
- },
- [] as BulkOperation[],
- );
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
- }
- if (removedProductId) {
- Logger.verbose(`Deleting 1 Product (${removedProductId})`, loggerCtx);
- const operations: BulkOperation[] = [{ delete: { _id: removedProductId.toString() } }];
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
- }
- }
- private async executeBulkOperations(
- indexName: string,
- indexType: string,
- operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
- ) {
- try {
- const fullIndexName = this.options.indexPrefix + indexName;
- const { body }: { body: BulkResponseBody } = await this.client.bulk({
- refresh: 'true',
- index: fullIndexName,
- type: indexType,
- body: operations,
- });
- if (body.errors) {
- Logger.error(
- `Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`,
- loggerCtx,
- );
- body.items.forEach(item => {
- if (item.index) {
- Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
- }
- if (item.update) {
- Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
- }
- if (item.delete) {
- Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
- }
- });
- } else {
- Logger.verbose(`Executed ${body.items.length} bulk operations on index [${fullIndexName}]`);
- }
- return body;
- } catch (e) {
- Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
- Logger.error('Error details: ' + JSON.stringify(e.body.error, null, 2), loggerCtx);
- }
- }
- private getSearchIndexQueryBuilder() {
- const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
- FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
- relations: variantRelations,
- order: {
- productId: 'ASC',
- },
- });
- FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
- return qb;
- }
- private async getBatch(
- ctx: RequestContext,
- qb: SelectQueryBuilder<ProductVariant>,
- batchNumber: string | number,
- ): Promise<ProductVariant[]> {
- const { batchSize } = this.options;
- const i = Number.parseInt(batchNumber.toString(), 10);
- const variants = await qb
- .where('variants__product.deletedAt IS NULL')
- .take(batchSize)
- .skip(i * batchSize)
- .getMany();
- return this.hydrateVariants(ctx, variants);
- }
- private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
- const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
- relations: variantRelations,
- });
- return this.hydrateVariants(ctx, variants);
- }
- /**
- * Given an array of ProductVariants, this method applies the correct taxes and translations.
- */
- private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
- return variants
- .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
- .map(v => translateDeep(v, ctx.languageCode, ['product']));
- }
- private createVariantIndexItem(v: ProductVariant): VariantIndexItem {
- return {
- sku: v.sku,
- slug: v.product.slug,
- productId: v.product.id as string,
- productName: v.product.name,
- productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
- productVariantId: v.id as string,
- productVariantName: v.name,
- productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
- price: v.price,
- priceWithTax: v.priceWithTax,
- currencyCode: v.currencyCode,
- description: v.product.description,
- facetIds: this.getFacetIds([v]),
- facetValueIds: this.getFacetValueIds([v]),
- collectionIds: v.collections.map(c => c.id.toString()),
- enabled: v.enabled && v.product.enabled,
- };
- }
- private createProductIndexItem(variants: ProductVariant[]): ProductIndexItem {
- const first = variants[0];
- const prices = variants.map(v => v.price);
- const pricesWithTax = variants.map(v => v.priceWithTax);
- return {
- sku: variants.map(v => v.sku),
- slug: variants.map(v => v.product.slug),
- productId: first.product.id,
- productName: variants.map(v => v.product.name),
- productPreview: first.product.featuredAsset ? first.product.featuredAsset.preview : '',
- productVariantId: variants.map(v => v.id),
- productVariantName: variants.map(v => v.name),
- productVariantPreview: variants.filter(v => v.featuredAsset).map(v => v.featuredAsset.preview),
- priceMin: Math.min(...prices),
- priceMax: Math.max(...prices),
- priceWithTaxMin: Math.min(...pricesWithTax),
- priceWithTaxMax: Math.max(...pricesWithTax),
- currencyCode: first.currencyCode,
- description: first.product.description,
- facetIds: this.getFacetIds(variants),
- facetValueIds: this.getFacetValueIds(variants),
- collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
- enabled: first.product.enabled,
- };
- }
- private getFacetIds(variants: ProductVariant[]): string[] {
- const facetIds = (fv: FacetValue) => fv.facet.id.toString();
- const variantFacetIds = variants.reduce(
- (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
- [] as string[],
- );
- const productFacetIds = variants[0].product.facetValues.map(facetIds);
- return unique([...variantFacetIds, ...productFacetIds]);
- }
- private getFacetValueIds(variants: ProductVariant[]): string[] {
- const facetValueIds = (fv: FacetValue) => fv.id.toString();
- const variantFacetValueIds = variants.reduce(
- (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
- [] as string[],
- );
- const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
- return unique([...variantFacetValueIds, ...productFacetValueIds]);
- }
- }
|