indexer.controller.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import { Controller } from '@nestjs/common';
  2. import { MessagePattern } from '@nestjs/microservices';
  3. import { InjectConnection } from '@nestjs/typeorm';
  4. import { LanguageCode } from '@vendure/common/lib/generated-types';
  5. import { ID } from '@vendure/common/lib/shared-types';
  6. import { unique } from '@vendure/common/lib/unique';
  7. import { defer, Observable } from 'rxjs';
  8. import { Connection } from 'typeorm';
  9. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  10. import { RequestContext } from '../../../api/common/request-context';
  11. import { Logger } from '../../../config/logger/vendure-logger';
  12. import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
  13. import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
  14. import { Product } from '../../../entity/product/product.entity';
  15. import { translateDeep } from '../../../service/helpers/utils/translate-entity';
  16. import { ProductVariantService } from '../../../service/services/product-variant.service';
  17. import { TaxRateService } from '../../../service/services/tax-rate.service';
  18. import { AsyncQueue } from '../async-queue';
  19. import { loggerCtx, Message } from '../constants';
  20. import { SearchIndexItem } from '../search-index-item.entity';
  21. export const BATCH_SIZE = 1000;
  22. export const variantRelations = [
  23. 'product',
  24. 'product.featuredAsset',
  25. 'product.facetValues',
  26. 'product.facetValues.facet',
  27. 'featuredAsset',
  28. 'facetValues',
  29. 'facetValues.facet',
  30. 'collections',
  31. 'taxCategory',
  32. ];
  33. export interface ReindexMessageResponse {
  34. total: number;
  35. completed: number;
  36. duration: number;
  37. }
  38. @Controller()
  39. export class IndexerController {
  40. private queue = new AsyncQueue('search-index');
  41. constructor(
  42. @InjectConnection() private connection: Connection,
  43. private productVariantService: ProductVariantService,
  44. private taxRateService: TaxRateService,
  45. ) {}
  46. @MessagePattern(Message.Reindex)
  47. reindex({ ctx: rawContext }: { ctx: any }): Observable<ReindexMessageResponse> {
  48. const ctx = RequestContext.fromObject(rawContext);
  49. return new Observable(observer => {
  50. (async () => {
  51. const timeStart = Date.now();
  52. const qb = this.getSearchIndexQueryBuilder();
  53. const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
  54. Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
  55. const batches = Math.ceil(count / BATCH_SIZE);
  56. // Ensure tax rates are up-to-date.
  57. await this.taxRateService.updateActiveTaxRates();
  58. await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode });
  59. Logger.verbose('Deleted existing index items', loggerCtx);
  60. for (let i = 0; i < batches; i++) {
  61. Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
  62. const variants = await qb
  63. .where('variants__product.deletedAt IS NULL')
  64. .take(BATCH_SIZE)
  65. .skip(i * BATCH_SIZE)
  66. .getMany();
  67. const hydratedVariants = this.hydrateVariants(ctx, variants);
  68. await this.saveVariants(ctx, hydratedVariants);
  69. observer.next({
  70. total: count,
  71. completed: Math.min((i + 1) * BATCH_SIZE, count),
  72. duration: +new Date() - timeStart,
  73. });
  74. }
  75. Logger.verbose(`Completed reindexing!`);
  76. observer.next({
  77. total: count,
  78. completed: count,
  79. duration: +new Date() - timeStart,
  80. });
  81. observer.complete();
  82. })();
  83. });
  84. }
  85. @MessagePattern(Message.UpdateVariantsById)
  86. updateVariantsById({ ctx: rawContext, ids }: { ctx: any, ids: ID[] }): Observable<ReindexMessageResponse> {
  87. const ctx = RequestContext.fromObject(rawContext);
  88. return new Observable(observer => {
  89. (async () => {
  90. const timeStart = Date.now();
  91. if (ids.length) {
  92. const batches = Math.ceil(ids.length / BATCH_SIZE);
  93. Logger.verbose(`Updating ${ids.length} variants...`);
  94. for (let i = 0; i < batches; i++) {
  95. const begin = i * BATCH_SIZE;
  96. const end = begin + BATCH_SIZE;
  97. Logger.verbose(`Updating ids from index ${begin} to ${end}`);
  98. const batchIds = ids.slice(begin, end);
  99. const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
  100. relations: variantRelations,
  101. });
  102. const variants = this.hydrateVariants(ctx, batch);
  103. await this.saveVariants(ctx, variants);
  104. observer.next({
  105. total: ids.length,
  106. completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
  107. duration: +new Date() - timeStart,
  108. });
  109. }
  110. }
  111. Logger.verbose(`Completed reindexing!`);
  112. observer.next({
  113. total: ids.length,
  114. completed: ids.length,
  115. duration: +new Date() - timeStart,
  116. });
  117. observer.complete();
  118. })();
  119. });
  120. }
  121. /**
  122. * Updates the search index only for the affected entities.
  123. */
  124. @MessagePattern(Message.UpdateProductOrVariant)
  125. updateProductOrVariant({ ctx: rawContext, productId, variantId }: { ctx: any, productId?: ID, variantId?: ID }): Observable<boolean> {
  126. const ctx = RequestContext.fromObject(rawContext);
  127. let updatedVariants: ProductVariant[] = [];
  128. let removedVariantIds: ID[] = [];
  129. return defer(async () => {
  130. if (productId) {
  131. const product = await this.connection.getRepository(Product).findOne(productId, {
  132. relations: ['variants'],
  133. });
  134. if (product) {
  135. if (product.deletedAt) {
  136. removedVariantIds = product.variants.map(v => v.id);
  137. } else {
  138. updatedVariants = await this.connection
  139. .getRepository(ProductVariant)
  140. .findByIds(product.variants.map(v => v.id), {
  141. relations: variantRelations,
  142. });
  143. if (product.enabled === false) {
  144. updatedVariants.forEach(v => v.enabled = false);
  145. }
  146. }
  147. }
  148. } else {
  149. const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
  150. relations: variantRelations,
  151. });
  152. if (variant) {
  153. updatedVariants = [variant];
  154. }
  155. }
  156. Logger.verbose(`Updating ${updatedVariants.length} variants`, loggerCtx);
  157. updatedVariants = this.hydrateVariants(ctx, updatedVariants);
  158. if (updatedVariants.length) {
  159. await this.saveVariants(ctx, updatedVariants);
  160. }
  161. if (removedVariantIds.length) {
  162. await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
  163. }
  164. return true;
  165. });
  166. }
  167. private getSearchIndexQueryBuilder() {
  168. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  169. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  170. relations: variantRelations,
  171. });
  172. FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
  173. return qb;
  174. }
  175. /**
  176. * Given an array of ProductVariants, this method applies the correct taxes and translations.
  177. */
  178. private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
  179. return variants
  180. .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
  181. .map(v => translateDeep(v, ctx.languageCode, ['product']));
  182. }
  183. private async saveVariants(ctx: RequestContext, variants: ProductVariant[]) {
  184. const items = variants.map((v: ProductVariant) =>
  185. new SearchIndexItem({
  186. sku: v.sku,
  187. enabled: v.enabled,
  188. slug: v.product.slug,
  189. price: v.price,
  190. priceWithTax: v.priceWithTax,
  191. languageCode: ctx.languageCode,
  192. productVariantId: v.id,
  193. productId: v.product.id,
  194. productName: v.product.name,
  195. description: v.product.description,
  196. productVariantName: v.name,
  197. productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
  198. productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
  199. facetIds: this.getFacetIds(v),
  200. facetValueIds: this.getFacetValueIds(v),
  201. collectionIds: v.collections.map(c => c.id.toString()),
  202. }),
  203. );
  204. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
  205. }
  206. private getFacetIds(variant: ProductVariant): string[] {
  207. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  208. const variantFacetIds = variant.facetValues.map(facetIds);
  209. const productFacetIds = variant.product.facetValues.map(facetIds);
  210. return unique([...variantFacetIds, ...productFacetIds]);
  211. }
  212. private getFacetValueIds(variant: ProductVariant): string[] {
  213. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  214. const variantFacetValueIds = variant.facetValues.map(facetValueIds);
  215. const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
  216. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  217. }
  218. /**
  219. * Remove items from the search index
  220. */
  221. private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
  222. const compositeKeys = variantIds.map(id => ({
  223. productVariantId: id,
  224. languageCode,
  225. })) as any[];
  226. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
  227. }
  228. }