// indexer.controller.ts (11 KB)


  1. import { Controller } from '@nestjs/common';
  2. import { MessagePattern } from '@nestjs/microservices';
  3. import { InjectConnection } from '@nestjs/typeorm';
  4. import { LanguageCode } from '@vendure/common/lib/generated-types';
  5. import { ID } from '@vendure/common/lib/shared-types';
  6. import { unique } from '@vendure/common/lib/unique';
  7. import { defer, Observable } from 'rxjs';
  8. import { Connection } from 'typeorm';
  9. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  10. import { RequestContext } from '../../../api/common/request-context';
  11. import { Logger } from '../../../config/logger/vendure-logger';
  12. import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
  13. import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
  14. import { Product } from '../../../entity/product/product.entity';
  15. import { translateDeep } from '../../../service/helpers/utils/translate-entity';
  16. import { ProductVariantService } from '../../../service/services/product-variant.service';
  17. import { TaxRateService } from '../../../service/services/tax-rate.service';
  18. import { AsyncQueue } from '../async-queue';
  19. import { SearchIndexItem } from '../search-index-item.entity';
  20. import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from '../types';
  21. export const BATCH_SIZE = 1000;
  22. export const variantRelations = [
  23. 'product',
  24. 'product.featuredAsset',
  25. 'product.facetValues',
  26. 'product.facetValues.facet',
  27. 'featuredAsset',
  28. 'facetValues',
  29. 'facetValues.facet',
  30. 'collections',
  31. 'taxCategory',
  32. ];
  33. export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
  34. @Controller()
  35. export class IndexerController {
  36. private queue = new AsyncQueue('search-index');
  37. constructor(
  38. @InjectConnection() private connection: Connection,
  39. private productVariantService: ProductVariantService,
  40. private taxRateService: TaxRateService,
  41. ) {}
  42. @MessagePattern(ReindexMessage.pattern)
  43. reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
  44. const ctx = RequestContext.fromObject(rawContext);
  45. return new Observable(observer => {
  46. (async () => {
  47. const timeStart = Date.now();
  48. const qb = this.getSearchIndexQueryBuilder();
  49. const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
  50. Logger.verbose(`Reindexing ${count} variants`, workerLoggerCtx);
  51. const batches = Math.ceil(count / BATCH_SIZE);
  52. // Ensure tax rates are up-to-date.
  53. await this.taxRateService.updateActiveTaxRates();
  54. await this.connection
  55. .getRepository(SearchIndexItem)
  56. .delete({ languageCode: ctx.languageCode });
  57. Logger.verbose('Deleted existing index items', workerLoggerCtx);
  58. for (let i = 0; i < batches; i++) {
  59. Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
  60. const variants = await qb
  61. .where('variants__product.deletedAt IS NULL')
  62. .take(BATCH_SIZE)
  63. .skip(i * BATCH_SIZE)
  64. .getMany();
  65. const hydratedVariants = this.hydrateVariants(ctx, variants);
  66. await this.saveVariants(ctx, hydratedVariants);
  67. observer.next({
  68. total: count,
  69. completed: Math.min((i + 1) * BATCH_SIZE, count),
  70. duration: +new Date() - timeStart,
  71. });
  72. }
  73. Logger.verbose(`Completed reindexing`, workerLoggerCtx);
  74. observer.next({
  75. total: count,
  76. completed: count,
  77. duration: +new Date() - timeStart,
  78. });
  79. observer.complete();
  80. })();
  81. });
  82. }
  83. @MessagePattern(UpdateVariantsByIdMessage.pattern)
  84. updateVariantsById({
  85. ctx: rawContext,
  86. ids,
  87. }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
  88. const ctx = RequestContext.fromObject(rawContext);
  89. return new Observable(observer => {
  90. (async () => {
  91. const timeStart = Date.now();
  92. if (ids.length) {
  93. const batches = Math.ceil(ids.length / BATCH_SIZE);
  94. Logger.verbose(`Updating ${ids.length} variants...`);
  95. for (let i = 0; i < batches; i++) {
  96. const begin = i * BATCH_SIZE;
  97. const end = begin + BATCH_SIZE;
  98. Logger.verbose(`Updating ids from index ${begin} to ${end}`);
  99. const batchIds = ids.slice(begin, end);
  100. const batch = await this.connection
  101. .getRepository(ProductVariant)
  102. .findByIds(batchIds, {
  103. relations: variantRelations,
  104. });
  105. const variants = this.hydrateVariants(ctx, batch);
  106. await this.saveVariants(ctx, variants);
  107. observer.next({
  108. total: ids.length,
  109. completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
  110. duration: +new Date() - timeStart,
  111. });
  112. }
  113. }
  114. Logger.verbose(`Completed reindexing!`);
  115. observer.next({
  116. total: ids.length,
  117. completed: ids.length,
  118. duration: +new Date() - timeStart,
  119. });
  120. observer.complete();
  121. })();
  122. });
  123. }
  124. /**
  125. * Updates the search index only for the affected entities.
  126. */
  127. @MessagePattern(UpdateProductOrVariantMessage.pattern)
  128. updateProductOrVariant(data: UpdateProductOrVariantMessage['data']): Observable<boolean> {
  129. const ctx = RequestContext.fromObject(data.ctx);
  130. const { productId, variantId } = data;
  131. let updatedVariants: ProductVariant[] = [];
  132. let removedVariantIds: ID[] = [];
  133. return defer(async () => {
  134. if (data.productId) {
  135. const product = await this.connection.getRepository(Product).findOne(productId, {
  136. relations: ['variants'],
  137. });
  138. if (product) {
  139. if (product.deletedAt) {
  140. removedVariantIds = product.variants.map(v => v.id);
  141. } else {
  142. updatedVariants = await this.connection
  143. .getRepository(ProductVariant)
  144. .findByIds(product.variants.map(v => v.id), {
  145. relations: variantRelations,
  146. });
  147. if (product.enabled === false) {
  148. updatedVariants.forEach(v => (v.enabled = false));
  149. }
  150. }
  151. }
  152. } else {
  153. const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
  154. relations: variantRelations,
  155. });
  156. if (variant) {
  157. updatedVariants = [variant];
  158. }
  159. }
  160. Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
  161. updatedVariants = this.hydrateVariants(ctx, updatedVariants);
  162. if (updatedVariants.length) {
  163. await this.saveVariants(ctx, updatedVariants);
  164. }
  165. if (removedVariantIds.length) {
  166. await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
  167. }
  168. return true;
  169. });
  170. }
  171. private getSearchIndexQueryBuilder() {
  172. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  173. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  174. relations: variantRelations,
  175. });
  176. FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
  177. return qb;
  178. }
  179. /**
  180. * Given an array of ProductVariants, this method applies the correct taxes and translations.
  181. */
  182. private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
  183. return variants
  184. .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
  185. .map(v => translateDeep(v, ctx.languageCode, ['product']));
  186. }
  187. private async saveVariants(ctx: RequestContext, variants: ProductVariant[]) {
  188. const items = variants.map(
  189. (v: ProductVariant) =>
  190. new SearchIndexItem({
  191. sku: v.sku,
  192. enabled: v.enabled,
  193. slug: v.product.slug,
  194. price: v.price,
  195. priceWithTax: v.priceWithTax,
  196. languageCode: ctx.languageCode,
  197. productVariantId: v.id,
  198. productId: v.product.id,
  199. productName: v.product.name,
  200. description: v.product.description,
  201. productVariantName: v.name,
  202. productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
  203. productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
  204. facetIds: this.getFacetIds(v),
  205. facetValueIds: this.getFacetValueIds(v),
  206. collectionIds: v.collections.map(c => c.id.toString()),
  207. }),
  208. );
  209. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
  210. }
  211. private getFacetIds(variant: ProductVariant): string[] {
  212. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  213. const variantFacetIds = variant.facetValues.map(facetIds);
  214. const productFacetIds = variant.product.facetValues.map(facetIds);
  215. return unique([...variantFacetIds, ...productFacetIds]);
  216. }
  217. private getFacetValueIds(variant: ProductVariant): string[] {
  218. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  219. const variantFacetValueIds = variant.facetValues.map(facetValueIds);
  220. const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
  221. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  222. }
  223. /**
  224. * Remove items from the search index
  225. */
  226. private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
  227. const compositeKeys = variantIds.map(id => ({
  228. productVariantId: id,
  229. languageCode,
  230. })) as any[];
  231. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
  232. }
  233. }