| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- import { Inject, Injectable } from '@nestjs/common';
- import { LanguageCode } from '@vendure/common/lib/generated-types';
- import { ID } from '@vendure/common/lib/shared-types';
- import { unique } from '@vendure/common/lib/unique';
- import { Observable } from 'rxjs';
- import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
- import { RequestContext } from '../../../api/common/request-context';
- import { RequestContextCacheService } from '../../../cache/request-context-cache.service';
- import { AsyncQueue } from '../../../common/async-queue';
- import { Translatable, Translation } from '../../../common/types/locale-types';
- import { asyncObservable, idsAreEqual } from '../../../common/utils';
- import { ConfigService } from '../../../config/config.service';
- import { Logger } from '../../../config/logger/vendure-logger';
- import { TransactionalConnection } from '../../../connection/transactional-connection';
- import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
- import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
- import { Product } from '../../../entity/product/product.entity';
- import { ProductPriceApplicator } from '../../../service/helpers/product-price-applicator/product-price-applicator';
- import { ProductVariantService } from '../../../service/services/product-variant.service';
- import { PLUGIN_INIT_OPTIONS } from '../constants';
- import { SearchIndexItem } from '../entities/search-index-item.entity';
- import {
- DefaultSearchPluginInitOptions,
- ProductChannelMessageData,
- ReindexMessageData,
- ReindexMessageResponse,
- UpdateAssetMessageData,
- UpdateProductMessageData,
- UpdateVariantMessageData,
- UpdateVariantsByIdMessageData,
- VariantChannelMessageData,
- } from '../types';
- import { MutableRequestContext } from './mutable-request-context';
/**
 * Maximum number of variants fetched and written to the index in one batch
 * by the reindex / update-by-id operations.
 */
export const BATCH_SIZE = 1000;

/**
 * Relation paths passed as the `relations` option whenever ProductVariants
 * are loaded for indexing.
 */
export const variantRelations = [
    // Product-level relations
    'product',
    'product.featuredAsset',
    'product.facetValues',
    'product.facetValues.facet',
    'product.channels',
    // Variant-level relations
    'featuredAsset',
    'facetValues',
    'facetValues.facet',
    'collections',
    'taxCategory',
    'channels',
    'channels.defaultTaxZone',
];

/**
 * Context label passed to `Logger` calls made by the indexer.
 */
export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
/**
 * Builds and maintains the `SearchIndexItem` rows used by the default search
 * plugin. Each public method handles one kind of message (reindex, update,
 * delete, channel assignment, asset change) and translates it into inserts,
 * updates or deletes on the `SearchIndexItem` repository.
 */
@Injectable()
export class IndexerController {
    /**
     * Queue through which index writes made by `saveVariants`,
     * `saveSyntheticVariant`, `removeSyntheticVariants` and
     * `removeSearchIndexItems` are pushed.
     * NOTE(review): presumably this serialises those writes - confirm against AsyncQueue.
     */
    private queue = new AsyncQueue('search-index');

    constructor(
        private connection: TransactionalConnection,
        private productPriceApplicator: ProductPriceApplicator,
        private configService: ConfigService,
        private requestContextCache: RequestContextCacheService,
        private productVariantService: ProductVariantService,
        @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
    ) {}

    /**
     * Rebuilds the index for the channel of the given context.
     *
     * Existing items matching the context's language code and channel are deleted
     * first, then all non-deleted variants of the channel are loaded and indexed in
     * batches of `BATCH_SIZE`.
     *
     * @param data - message carrying the serialized request context
     * @returns an Observable emitting a progress report after each batch, followed by
     * the final totals
     */
    reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
        const ctx = MutableRequestContext.deserialize(rawContext);

        return asyncObservable(async observer => {
            const timeStart = Date.now();
            const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
            const count = await qb.getCount();
            Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
            const batches = Math.ceil(count / BATCH_SIZE);

            await this.connection
                .getRepository(SearchIndexItem)
                .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
            Logger.verbose('Deleted existing index items', workerLoggerCtx);

            for (let i = 0; i < batches; i++) {
                Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);

                // The query builder already filters out deleted products and variants
                // (see getSearchIndexQueryBuilder). It is a single mutable instance, so
                // adding conditions here would grow the WHERE clause on every batch.
                const variants = await qb
                    .take(BATCH_SIZE)
                    .skip(i * BATCH_SIZE)
                    .getMany();
                await this.saveVariants(ctx, variants);
                observer.next({
                    total: count,
                    completed: Math.min((i + 1) * BATCH_SIZE, count),
                    duration: Date.now() - timeStart,
                });
            }
            Logger.verbose(`Completed reindexing`, workerLoggerCtx);
            return {
                total: count,
                completed: count,
                duration: Date.now() - timeStart,
            };
        });
    }

    /**
     * Re-indexes the variants with the given ids, in batches of `BATCH_SIZE`.
     *
     * @param data - message carrying the serialized request context and the variant ids
     * @returns an Observable emitting a progress report after each batch, followed by
     * the final totals
     */
    updateVariantsById({
        ctx: rawContext,
        ids,
    }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
        const ctx = MutableRequestContext.deserialize(rawContext);

        return asyncObservable(async observer => {
            const timeStart = Date.now();
            if (ids.length) {
                const batches = Math.ceil(ids.length / BATCH_SIZE);
                Logger.verbose(`Updating ${ids.length} variants...`, workerLoggerCtx);

                for (let i = 0; i < batches; i++) {
                    const begin = i * BATCH_SIZE;
                    const end = begin + BATCH_SIZE;
                    Logger.verbose(`Updating ids from index ${begin} to ${end}`, workerLoggerCtx);
                    // slice() clamps `end` to the array length, so the last batch may be shorter.
                    const batchIds = ids.slice(begin, end);

                    const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
                        relations: variantRelations,
                        where: { deletedAt: null },
                    });
                    await this.saveVariants(ctx, batch);
                    observer.next({
                        total: ids.length,
                        completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
                        duration: Date.now() - timeStart,
                    });
                }
            }
            Logger.verbose(`Completed reindexing!`, workerLoggerCtx);
            return {
                total: ids.length,
                completed: ids.length,
                duration: Date.now() - timeStart,
            };
        });
    }

    /**
     * Re-indexes all variants of a product, using the channel of the message context.
     */
    async updateProduct(data: UpdateProductMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
    }

    /**
     * Re-indexes the given variants, using the channel of the message context.
     */
    async updateVariants(data: UpdateVariantMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
    }

    /**
     * Removes the index items of all variants of a product from the context's channel.
     */
    async deleteProduct(data: UpdateProductMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
    }

    /**
     * Removes the index items of the given variants from the context's channel, for
     * every language code found among the variants' translations.
     */
    async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
        if (variants.length) {
            const languageVariants = unique([
                ...variants
                    .reduce((vt, v) => [...vt, ...v.translations], [] as Array<Translation<ProductVariant>>)
                    .map(t => t.languageCode),
            ]);
            await this.removeSearchIndexItems(
                ctx.channelId,
                variants.map(v => v.id),
                languageVariants,
            );
        }
        return true;
    }

    /**
     * Re-indexes a product after it has been assigned to the channel named in the message.
     */
    async assignProductToChannel(data: ProductChannelMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        return this.updateProductInChannel(ctx, data.productId, data.channelId);
    }

    /**
     * Removes a product's index items from the channel named in the message.
     */
    async removeProductFromChannel(data: ProductChannelMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        return this.deleteProductInChannel(ctx, data.productId, data.channelId);
    }

    /**
     * Re-indexes a single variant after it has been assigned to the channel named in the message.
     */
    async assignVariantToChannel(data: VariantChannelMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
    }

    /**
     * Removes a single variant's index items from the channel named in the message.
     * If the variant cannot be found there are no language codes to target and
     * nothing is removed.
     */
    async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
        const ctx = MutableRequestContext.deserialize(data.ctx);
        const variant = await this.connection.getRepository(ProductVariant).findOne(data.productVariantId);
        const languageVariants = variant?.translations.map(t => t.languageCode) ?? [];
        await this.removeSearchIndexItems(data.channelId, [data.productVariantId], languageVariants);
        return true;
    }

    /**
     * Propagates an asset's focal point to every index item that references the asset
     * as its product asset or its variant asset.
     */
    async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
        const id = data.asset.id;

        // Only accept a focal point that has both numeric coordinates. A truthiness
        // check would wrongly discard valid points lying on the 0 axis.
        function getFocalPoint(point?: { x: number; y: number }) {
            return point && typeof point.x === 'number' && typeof point.y === 'number' ? point : null;
        }

        const focalPoint = getFocalPoint(data.asset.focalPoint);
        await this.connection
            .getRepository(SearchIndexItem)
            .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
        await this.connection
            .getRepository(SearchIndexItem)
            .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
        return true;
    }

    /**
     * Clears the asset id on every index item that references the given asset as its
     * product asset or its variant asset.
     */
    async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
        const id = data.asset.id;
        await this.connection
            .getRepository(SearchIndexItem)
            .update({ productAssetId: id }, { productAssetId: null });
        await this.connection
            .getRepository(SearchIndexItem)
            .update({ productVariantAssetId: id }, { productVariantAssetId: null });
        return true;
    }

    /**
     * Re-indexes the variants of a product. A product without any non-deleted variant
     * gets a synthetic index item instead. When the product is disabled, all of its
     * variants are indexed as disabled.
     *
     * NOTE(review): `channelId` is currently unused; variants are filtered by
     * `ctx.channelId`. Confirm whether the target channel should be used instead.
     */
    private async updateProductInChannel(
        ctx: MutableRequestContext,
        productId: ID,
        channelId: ID,
    ): Promise<boolean> {
        const product = await this.connection.getRepository(Product).findOne(productId, {
            relations: ['variants'],
        });
        if (product) {
            const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(
                product.variants.map(v => v.id),
                {
                    relations: variantRelations,
                    where: { deletedAt: null },
                },
            );
            if (updatedVariants.length === 0) {
                await this.saveSyntheticVariant(ctx, product);
            } else {
                if (product.enabled === false) {
                    updatedVariants.forEach(v => (v.enabled = false));
                }
                const variantsInCurrentChannel = updatedVariants.filter(
                    v => !!v.channels.find(c => idsAreEqual(c.id, ctx.channelId)),
                );
                Logger.verbose(`Updating ${variantsInCurrentChannel.length} variants`, workerLoggerCtx);
                if (variantsInCurrentChannel.length) {
                    await this.saveVariants(ctx, variantsInCurrentChannel);
                }
            }
        }
        return true;
    }

    /**
     * Loads the given non-deleted variants with all indexing relations and re-indexes them.
     *
     * NOTE(review): `channelId` is currently unused; `saveVariants` writes an item for
     * every channel each variant belongs to.
     */
    private async updateVariantsInChannel(
        ctx: MutableRequestContext,
        variantIds: ID[],
        channelId: ID,
    ): Promise<boolean> {
        const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
            relations: variantRelations,
            where: { deletedAt: null },
        });
        // An array is always truthy, so test the length to skip the no-op case.
        if (variants.length) {
            Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
            await this.saveVariants(ctx, variants);
        }
        return true;
    }

    /**
     * Removes the index items of all variants of a product from the given channel, for
     * every language code found among the product's and its variants' translations.
     */
    private async deleteProductInChannel(
        ctx: RequestContext,
        productId: ID,
        channelId: ID,
    ): Promise<boolean> {
        const product = await this.connection.getRepository(Product).findOne(productId, {
            relations: ['variants'],
        });
        if (product) {
            const languageVariants = unique([
                ...product.translations.map(t => t.languageCode),
                ...product.variants
                    .reduce((vt, v) => [...vt, ...v.translations], [] as Array<Translation<ProductVariant>>)
                    .map(t => t.languageCode),
            ]);
            const removedVariantIds = product.variants.map(v => v.id);
            if (removedVariantIds.length) {
                await this.removeSearchIndexItems(channelId, removedVariantIds, languageVariants);
            }
        }
        return true;
    }

    /**
     * Creates a query builder selecting the variants whose product belongs to the given
     * channel, with `variantRelations` and eager relations joined, excluding variants
     * and products that have a `deletedAt` value.
     */
    private getSearchIndexQueryBuilder(channelId: ID) {
        const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
        FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
            relations: variantRelations,
        });
        FindOptionsUtils.joinEagerRelations(
            qb,
            qb.alias,
            this.connection.rawConnection.getMetadata(ProductVariant),
        );
        qb.leftJoin('variants.product', 'product')
            .leftJoin('product.channels', 'channel')
            .where('channel.id = :channelId', { channelId })
            .andWhere('variants__product.deletedAt IS NULL')
            .andWhere('variants.deletedAt IS NULL');
        return qb;
    }

    /**
     * Builds and saves one `SearchIndexItem` per variant, per language code (taken from
     * the variant's and its product's translations) and per channel of the variant.
     *
     * Synthetic items of the affected products are removed first. For each channel the
     * context's channel is switched via `ctx.setChannel` so that the channel price and
     * tax are applied to the variant before the item is built.
     */
    private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
        const items: SearchIndexItem[] = [];

        await this.removeSyntheticVariants(variants);

        for (const variant of variants) {
            const languageVariants = unique([
                ...variant.translations.map(t => t.languageCode),
                ...variant.product.translations.map(t => t.languageCode),
            ]);
            for (const languageCode of languageVariants) {
                const productTranslation = this.getTranslation(variant.product, languageCode);
                const variantTranslation = this.getTranslation(variant, languageCode);
                const collectionTranslations = variant.collections.map(c =>
                    this.getTranslation(c, languageCode),
                );

                for (const channel of variant.channels) {
                    // Mutates the shared context so price, tax and stock are resolved per channel.
                    ctx.setChannel(channel);
                    await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
                    const item = new SearchIndexItem({
                        channelId: channel.id,
                        languageCode,
                        productVariantId: variant.id,
                        price: variant.price,
                        priceWithTax: variant.priceWithTax,
                        sku: variant.sku,
                        // A disabled product disables all of its variants in the index.
                        enabled: variant.product.enabled === false ? false : variant.enabled,
                        slug: productTranslation.slug,
                        productId: variant.product.id,
                        productName: productTranslation.name,
                        description: this.constrainDescription(productTranslation.description),
                        productVariantName: variantTranslation.name,
                        productAssetId: variant.product.featuredAsset
                            ? variant.product.featuredAsset.id
                            : null,
                        productPreviewFocalPoint: variant.product.featuredAsset
                            ? variant.product.featuredAsset.focalPoint
                            : null,
                        productVariantPreviewFocalPoint: variant.featuredAsset
                            ? variant.featuredAsset.focalPoint
                            : null,
                        productVariantAssetId: variant.featuredAsset ? variant.featuredAsset.id : null,
                        productPreview: variant.product.featuredAsset
                            ? variant.product.featuredAsset.preview
                            : '',
                        productVariantPreview: variant.featuredAsset ? variant.featuredAsset.preview : '',
                        channelIds: variant.channels.map(c => c.id as string),
                        facetIds: this.getFacetIds(variant),
                        facetValueIds: this.getFacetValueIds(variant),
                        collectionIds: variant.collections.map(c => c.id.toString()),
                        collectionSlugs: collectionTranslations.map(c => c.slug),
                    });
                    if (this.options.indexStockStatus) {
                        item.inStock =
                            0 < (await this.productVariantService.getSaleableStockLevel(ctx, variant));
                        // The product counts as in stock when any of its variants has saleable
                        // stock. The result is stored in the request context cache per product id.
                        const productInStock = await this.requestContextCache.get(
                            ctx,
                            `productVariantsStock-${variant.productId}`,
                            () =>
                                this.connection
                                    .getRepository(ctx, ProductVariant)
                                    .find({
                                        loadEagerRelations: false,
                                        where: {
                                            productId: variant.productId,
                                        },
                                    })
                                    .then(_variants =>
                                        Promise.all(
                                            _variants.map(v =>
                                                this.productVariantService.getSaleableStockLevel(ctx, v),
                                            ),
                                        ),
                                    )
                                    .then(stockLevels => stockLevels.some(stockLevel => 0 < stockLevel)),
                        );
                        item.productInStock = productInStock;
                    }
                    items.push(item);
                }
            }
        }

        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items, { chunk: 2500 }));
    }

    /**
     * If a Product has no variants, we create a synthetic variant for the purposes
     * of making that product visible via the search query.
     *
     * The synthetic item uses variant id 0, price 0, an empty sku and is disabled;
     * it is written only for the context's channel and language code.
     */
    private async saveSyntheticVariant(ctx: RequestContext, product: Product) {
        const productTranslation = this.getTranslation(product, ctx.languageCode);
        const item = new SearchIndexItem({
            channelId: ctx.channelId,
            languageCode: ctx.languageCode,
            productVariantId: 0,
            price: 0,
            priceWithTax: 0,
            sku: '',
            enabled: false,
            slug: productTranslation.slug,
            productId: product.id,
            productName: productTranslation.name,
            description: this.constrainDescription(productTranslation.description),
            productVariantName: productTranslation.name,
            productAssetId: product.featuredAsset?.id ?? null,
            productPreviewFocalPoint: product.featuredAsset?.focalPoint ?? null,
            productVariantPreviewFocalPoint: null,
            productVariantAssetId: null,
            productPreview: product.featuredAsset?.preview ?? '',
            productVariantPreview: '',
            channelIds: [ctx.channelId.toString()],
            facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
            facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
            collectionIds: [],
            collectionSlugs: [],
        });
        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(item));
    }

    /**
     * Removes any synthetic variants for the products of the given variants.
     * Synthetic items are matched by product id together with an empty sku and a price of 0.
     */
    private async removeSyntheticVariants(variants: ProductVariant[]) {
        const prodIds = unique(variants.map(v => v.productId));
        for (const productId of prodIds) {
            await this.queue.push(() =>
                this.connection.getRepository(SearchIndexItem).delete({
                    productId,
                    sku: '',
                    price: 0,
                }),
            );
        }
    }

    /**
     * Picks the translation for the requested language code, falling back to the
     * configured default language and finally to the first available translation.
     */
    private getTranslation<T extends Translatable>(
        translatable: T,
        languageCode: LanguageCode,
    ): Translation<T> {
        return (translatable.translations.find(t => t.languageCode === languageCode) ||
            translatable.translations.find(t => t.languageCode === this.configService.defaultLanguageCode) ||
            translatable.translations[0]) as unknown as Translation<T>;
    }

    /**
     * Returns the de-duplicated ids (as strings) of the facets of all facet values
     * assigned to the variant or to its product.
     */
    private getFacetIds(variant: ProductVariant): string[] {
        const facetIds = (fv: FacetValue) => fv.facet.id.toString();
        const variantFacetIds = variant.facetValues.map(facetIds);
        const productFacetIds = variant.product.facetValues.map(facetIds);
        return unique([...variantFacetIds, ...productFacetIds]);
    }

    /**
     * Returns the de-duplicated ids (as strings) of all facet values assigned to the
     * variant or to its product.
     */
    private getFacetValueIds(variant: ProductVariant): string[] {
        const facetValueIds = (fv: FacetValue) => fv.id.toString();
        const variantFacetValueIds = variant.facetValues.map(facetValueIds);
        const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
        return unique([...variantFacetValueIds, ...productFacetValueIds]);
    }

    /**
     * Remove items from the search index.
     *
     * One delete key is built for every combination of variant id and language code
     * in the given channel. When there are no combinations nothing is deleted.
     */
    private async removeSearchIndexItems(channelId: ID, variantIds: ID[], languageCodes: LanguageCode[]) {
        const keys: Array<Partial<SearchIndexItem>> = [];
        for (const productVariantId of variantIds) {
            for (const languageCode of languageCodes) {
                keys.push({
                    productVariantId,
                    channelId,
                    languageCode,
                });
            }
        }
        // TypeORM rejects a delete() call with empty criteria, so bail out early
        // (e.g. when the variant was not found and no language codes are known).
        if (keys.length === 0) {
            return;
        }
        await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(keys as any));
    }

    /**
     * Prevent postgres errors from too-long indices
     * https://github.com/vendure-ecommerce/vendure/issues/745
     *
     * On postgres-like connection types the description is truncated to 2600
     * characters; on all other types it is returned unchanged.
     */
    private constrainDescription(description: string): string {
        const { type } = this.connection.rawConnection.options;
        const isPostgresLike = type === 'postgres' || type === 'aurora-data-api-pg' || type === 'cockroachdb';
        if (isPostgresLike) {
            return description.substring(0, 2600);
        }
        return description;
    }
}
|