| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import { Controller } from '@nestjs/common';
- import { MessagePattern } from '@nestjs/microservices';
- import { InjectConnection } from '@nestjs/typeorm';
- import { LanguageCode } from '@vendure/common/lib/generated-types';
- import { ID } from '@vendure/common/lib/shared-types';
- import { unique } from '@vendure/common/lib/unique';
- import { defer, Observable } from 'rxjs';
- import { Connection } from 'typeorm';
- import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
- import { RequestContext } from '../../../api/common/request-context';
- import { Logger } from '../../../config/logger/vendure-logger';
- import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
- import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
- import { Product } from '../../../entity/product/product.entity';
- import { translateDeep } from '../../../service/helpers/utils/translate-entity';
- import { ProductVariantService } from '../../../service/services/product-variant.service';
- import { TaxRateService } from '../../../service/services/tax-rate.service';
- import { AsyncQueue } from '../async-queue';
- import { loggerCtx, Message } from '../constants';
- import { SearchIndexItem } from '../search-index-item.entity';
- export const BATCH_SIZE = 1000;
- export const variantRelations = [
- 'product',
- 'product.featuredAsset',
- 'product.facetValues',
- 'product.facetValues.facet',
- 'featuredAsset',
- 'facetValues',
- 'facetValues.facet',
- 'collections',
- 'taxCategory',
- ];
- export interface ReindexMessageResponse {
- total: number;
- completed: number;
- duration: number;
- }
- @Controller()
- export class IndexerController {
- private queue = new AsyncQueue('search-index');
- constructor(
- @InjectConnection() private connection: Connection,
- private productVariantService: ProductVariantService,
- private taxRateService: TaxRateService,
- ) {}
- @MessagePattern(Message.Reindex)
- reindex({ ctx: rawContext }: { ctx: any }): Observable<ReindexMessageResponse> {
- const ctx = RequestContext.fromObject(rawContext);
- return new Observable(observer => {
- (async () => {
- const timeStart = Date.now();
- const qb = this.getSearchIndexQueryBuilder();
- const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
- Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
- const batches = Math.ceil(count / BATCH_SIZE);
- // Ensure tax rates are up-to-date.
- await this.taxRateService.updateActiveTaxRates();
- await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
- Logger.verbose('Deleted existing index items', loggerCtx);
- for (let i = 0; i < batches; i++) {
- Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
- const variants = await qb
- .where('variants__product.deletedAt IS NULL')
- .take(BATCH_SIZE)
- .skip(i * BATCH_SIZE)
- .getMany();
- const hydratedVariants = this.hydrateVariants(ctx, variants);
- await this.saveVariants(ctx, hydratedVariants);
- observer.next({
- total: count,
- completed: Math.min((i + 1) * BATCH_SIZE, count),
- duration: +new Date() - timeStart,
- });
- }
- Logger.verbose(`Completed reindexing!`);
- observer.next({
- total: count,
- completed: count,
- duration: +new Date() - timeStart,
- });
- observer.complete();
- })();
- });
- }
- @MessagePattern(Message.UpdateVariantsById)
- updateVariantsById({ ctx: rawContext, ids }: { ctx: any, ids: ID[] }): Observable<ReindexMessageResponse> {
- const ctx = RequestContext.fromObject(rawContext);
- return new Observable(observer => {
- (async () => {
- const timeStart = Date.now();
- if (ids.length) {
- const batches = Math.ceil(ids.length / BATCH_SIZE);
- Logger.verbose(`Updating ${ids.length} variants...`);
- for (let i = 0; i < batches; i++) {
- const begin = i * BATCH_SIZE;
- const end = begin + BATCH_SIZE;
- Logger.verbose(`Updating ids from index ${begin} to ${end}`);
- const batchIds = ids.slice(begin, end);
- const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
- relations: variantRelations,
- });
- const variants = this.hydrateVariants(ctx, batch);
- await this.saveVariants(ctx, variants);
- observer.next({
- total: ids.length,
- completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
- duration: +new Date() - timeStart,
- });
- }
- }
- Logger.verbose(`Completed reindexing!`);
- observer.next({
- total: ids.length,
- completed: ids.length,
- duration: +new Date() - timeStart,
- });
- observer.complete();
- })();
- });
- }
- /**
- * Updates the search index only for the affected entities.
- */
- @MessagePattern(Message.UpdateProductOrVariant)
- updateProductOrVariant({ ctx: rawContext, productId, variantId }: { ctx: any, productId?: ID, variantId?: ID }): Observable<boolean> {
- const ctx = RequestContext.fromObject(rawContext);
- let updatedVariants: ProductVariant[] = [];
- let removedVariantIds: ID[] = [];
- return defer(async () => {
- if (productId) {
- const product = await this.connection.getRepository(Product).findOne(productId, {
- relations: ['variants'],
- });
- if (product) {
- if (product.deletedAt) {
- removedVariantIds = product.variants.map(v => v.id);
- } else {
- updatedVariants = await this.connection
- .getRepository(ProductVariant)
- .findByIds(product.variants.map(v => v.id), {
- relations: variantRelations,
- });
- if (product.enabled === false) {
- updatedVariants.forEach(v => v.enabled = false);
- }
- }
- }
- } else {
- const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
- relations: variantRelations,
- });
- if (variant) {
- updatedVariants = [variant];
- }
- }
- Logger.verbose(`Updating ${updatedVariants.length} variants`, loggerCtx);
- updatedVariants = this.hydrateVariants(ctx, updatedVariants);
- if (updatedVariants.length) {
- await this.saveVariants(ctx, updatedVariants);
- }
- if (removedVariantIds.length) {
- await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
- }
- return true;
- });
- }
- private getSearchIndexQueryBuilder() {
- const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
- FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
- relations: variantRelations,
- });
- FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
- return qb;
- }
- /**
- * Given an array of ProductVariants, this method applies the correct taxes and translations.
- */
- private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
- return variants
- .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
- .map(v => translateDeep(v, ctx.languageCode, ['product']));
- }
- private async saveVariants(ctx: RequestContext, variants: ProductVariant[]) {
- const items = variants.map((v: ProductVariant) =>
- new SearchIndexItem({
- sku: v.sku,
- enabled: v.enabled,
- slug: v.product.slug,
- price: v.price,
- priceWithTax: v.priceWithTax,
- languageCode: ctx.languageCode,
- productVariantId: v.id,
- productId: v.product.id,
- productName: v.product.name,
- description: v.product.description,
- productVariantName: v.name,
- productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
- productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
- facetIds: this.getFacetIds(v),
- facetValueIds: this.getFacetValueIds(v),
- collectionIds: v.collections.map(c => c.id.toString()),
- }),
- );
- await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
- }
- private getFacetIds(variant: ProductVariant): string[] {
- const facetIds = (fv: FacetValue) => fv.facet.id.toString();
- const variantFacetIds = variant.facetValues.map(facetIds);
- const productFacetIds = variant.product.facetValues.map(facetIds);
- return unique([...variantFacetIds, ...productFacetIds]);
- }
- private getFacetValueIds(variant: ProductVariant): string[] {
- const facetValueIds = (fv: FacetValue) => fv.id.toString();
- const variantFacetValueIds = variant.facetValues.map(facetValueIds);
- const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
- return unique([...variantFacetValueIds, ...productFacetValueIds]);
- }
- /**
- * Remove items from the search index
- */
- private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
- const compositeKeys = variantIds.map(id => ({
- productVariantId: id,
- languageCode,
- })) as any[];
- await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
- }
- }
|