indexer.controller.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. import { Controller } from '@nestjs/common';
  2. import { MessagePattern } from '@nestjs/microservices';
  3. import { InjectConnection } from '@nestjs/typeorm';
  4. import { LanguageCode } from '@vendure/common/lib/generated-types';
  5. import { ID } from '@vendure/common/lib/shared-types';
  6. import { unique } from '@vendure/common/lib/unique';
  7. import { defer, Observable } from 'rxjs';
  8. import { Connection } from 'typeorm';
  9. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  10. import { RequestContext } from '../../../api/common/request-context';
  11. import { AsyncQueue } from '../../../common/async-queue';
  12. import { Logger } from '../../../config/logger/vendure-logger';
  13. import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
  14. import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
  15. import { Product } from '../../../entity/product/product.entity';
  16. import { translateDeep } from '../../../service/helpers/utils/translate-entity';
  17. import { ProductVariantService } from '../../../service/services/product-variant.service';
  18. import { asyncObservable } from '../../../worker/async-observable';
  19. import { SearchIndexItem } from '../search-index-item.entity';
  20. import {
  21. AssignProductToChannelMessage,
  22. DeleteProductMessage,
  23. DeleteVariantMessage,
  24. ReindexMessage,
  25. RemoveProductFromChannelMessage,
  26. UpdateAssetMessage,
  27. UpdateProductMessage,
  28. UpdateVariantMessage,
  29. UpdateVariantsByIdMessage,
  30. } from '../types';
  31. export const BATCH_SIZE = 1000;
  32. export const variantRelations = [
  33. 'product',
  34. 'product.featuredAsset',
  35. 'product.facetValues',
  36. 'product.facetValues.facet',
  37. 'product.channels',
  38. 'featuredAsset',
  39. 'facetValues',
  40. 'facetValues.facet',
  41. 'collections',
  42. 'taxCategory',
  43. ];
  44. export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
  45. @Controller()
  46. export class IndexerController {
  47. private queue = new AsyncQueue('search-index');
  48. constructor(
  49. @InjectConnection() private connection: Connection,
  50. private productVariantService: ProductVariantService,
  51. ) {}
  52. @MessagePattern(ReindexMessage.pattern)
  53. reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
  54. const ctx = RequestContext.deserialize(rawContext);
  55. return asyncObservable(async (observer) => {
  56. const timeStart = Date.now();
  57. const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
  58. const count = await qb.getCount();
  59. Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
  60. const batches = Math.ceil(count / BATCH_SIZE);
  61. await this.connection
  62. .getRepository(SearchIndexItem)
  63. .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
  64. Logger.verbose('Deleted existing index items', workerLoggerCtx);
  65. for (let i = 0; i < batches; i++) {
  66. Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
  67. const variants = await qb
  68. .andWhere('variants__product.deletedAt IS NULL')
  69. .take(BATCH_SIZE)
  70. .skip(i * BATCH_SIZE)
  71. .getMany();
  72. const hydratedVariants = this.hydrateVariants(ctx, variants);
  73. await this.saveVariants(ctx.languageCode, ctx.channelId, hydratedVariants);
  74. observer.next({
  75. total: count,
  76. completed: Math.min((i + 1) * BATCH_SIZE, count),
  77. duration: +new Date() - timeStart,
  78. });
  79. }
  80. Logger.verbose(`Completed reindexing`, workerLoggerCtx);
  81. return {
  82. total: count,
  83. completed: count,
  84. duration: +new Date() - timeStart,
  85. };
  86. });
  87. }
  88. @MessagePattern(UpdateVariantsByIdMessage.pattern)
  89. updateVariantsById({
  90. ctx: rawContext,
  91. ids,
  92. }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
  93. const ctx = RequestContext.deserialize(rawContext);
  94. return asyncObservable(async (observer) => {
  95. const timeStart = Date.now();
  96. if (ids.length) {
  97. const batches = Math.ceil(ids.length / BATCH_SIZE);
  98. Logger.verbose(`Updating ${ids.length} variants...`);
  99. for (let i = 0; i < batches; i++) {
  100. const begin = i * BATCH_SIZE;
  101. const end = begin + BATCH_SIZE;
  102. Logger.verbose(`Updating ids from index ${begin} to ${end}`);
  103. const batchIds = ids.slice(begin, end);
  104. const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
  105. relations: variantRelations,
  106. where: { deletedAt: null },
  107. });
  108. const variants = this.hydrateVariants(ctx, batch);
  109. await this.saveVariants(ctx.languageCode, ctx.channelId, variants);
  110. observer.next({
  111. total: ids.length,
  112. completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
  113. duration: +new Date() - timeStart,
  114. });
  115. }
  116. }
  117. Logger.verbose(`Completed reindexing!`);
  118. return {
  119. total: ids.length,
  120. completed: ids.length,
  121. duration: +new Date() - timeStart,
  122. };
  123. });
  124. }
  125. @MessagePattern(UpdateProductMessage.pattern)
  126. updateProduct(data: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
  127. const ctx = RequestContext.deserialize(data.ctx);
  128. return asyncObservable(async () => {
  129. return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
  130. });
  131. }
  132. @MessagePattern(UpdateVariantMessage.pattern)
  133. updateVariants(data: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
  134. const ctx = RequestContext.deserialize(data.ctx);
  135. return asyncObservable(async () => {
  136. return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
  137. });
  138. }
  139. @MessagePattern(DeleteProductMessage.pattern)
  140. deleteProduct(data: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
  141. const ctx = RequestContext.deserialize(data.ctx);
  142. return asyncObservable(async () => {
  143. return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
  144. });
  145. }
  146. @MessagePattern(DeleteVariantMessage.pattern)
  147. deleteVariant(data: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
  148. const ctx = RequestContext.deserialize(data.ctx);
  149. return asyncObservable(async () => {
  150. const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
  151. if (variants.length) {
  152. await this.removeSearchIndexItems(
  153. ctx.languageCode,
  154. ctx.channelId,
  155. variants.map((v) => v.id),
  156. );
  157. }
  158. return true;
  159. });
  160. }
  161. @MessagePattern(AssignProductToChannelMessage.pattern)
  162. assignProductToChannel(
  163. data: AssignProductToChannelMessage['data'],
  164. ): Observable<AssignProductToChannelMessage['response']> {
  165. const ctx = RequestContext.deserialize(data.ctx);
  166. return asyncObservable(async () => {
  167. return this.updateProductInChannel(ctx, data.productId, data.channelId);
  168. });
  169. }
  170. @MessagePattern(RemoveProductFromChannelMessage.pattern)
  171. removeProductFromChannel(
  172. data: RemoveProductFromChannelMessage['data'],
  173. ): Observable<RemoveProductFromChannelMessage['response']> {
  174. const ctx = RequestContext.deserialize(data.ctx);
  175. return asyncObservable(async () => {
  176. return this.deleteProductInChannel(ctx, data.productId, data.channelId);
  177. });
  178. }
  179. @MessagePattern(UpdateAssetMessage.pattern)
  180. updateAsset(data: UpdateAssetMessage['data']): Observable<UpdateAssetMessage['response']> {
  181. return asyncObservable(async () => {
  182. const id = data.asset.id;
  183. function getFocalPoint(point?: { x: number; y: number }) {
  184. return point && point.x && point.y ? point : null;
  185. }
  186. const focalPoint = getFocalPoint(data.asset.focalPoint);
  187. await this.connection
  188. .getRepository(SearchIndexItem)
  189. .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
  190. await this.connection
  191. .getRepository(SearchIndexItem)
  192. .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
  193. return true;
  194. });
  195. }
  196. private async updateProductInChannel(
  197. ctx: RequestContext,
  198. productId: ID,
  199. channelId: ID,
  200. ): Promise<boolean> {
  201. const product = await this.connection.getRepository(Product).findOne(productId, {
  202. relations: ['variants'],
  203. });
  204. if (product) {
  205. let updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(
  206. product.variants.map((v) => v.id),
  207. {
  208. relations: variantRelations,
  209. where: { deletedAt: null },
  210. },
  211. );
  212. if (product.enabled === false) {
  213. updatedVariants.forEach((v) => (v.enabled = false));
  214. }
  215. Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
  216. updatedVariants = this.hydrateVariants(ctx, updatedVariants);
  217. if (updatedVariants.length) {
  218. await this.saveVariants(ctx.languageCode, channelId, updatedVariants);
  219. }
  220. }
  221. return true;
  222. }
  223. private async updateVariantsInChannel(
  224. ctx: RequestContext,
  225. variantIds: ID[],
  226. channelId: ID,
  227. ): Promise<boolean> {
  228. const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
  229. relations: variantRelations,
  230. where: { deletedAt: null },
  231. });
  232. if (variants) {
  233. const updatedVariants = this.hydrateVariants(ctx, variants);
  234. Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
  235. await this.saveVariants(ctx.languageCode, channelId, updatedVariants);
  236. }
  237. return true;
  238. }
  239. private async deleteProductInChannel(
  240. ctx: RequestContext,
  241. productId: ID,
  242. channelId: ID,
  243. ): Promise<boolean> {
  244. const product = await this.connection.getRepository(Product).findOne(productId, {
  245. relations: ['variants'],
  246. });
  247. if (product) {
  248. const removedVariantIds = product.variants.map((v) => v.id);
  249. if (removedVariantIds.length) {
  250. await this.removeSearchIndexItems(ctx.languageCode, channelId, removedVariantIds);
  251. }
  252. }
  253. return true;
  254. }
  255. private getSearchIndexQueryBuilder(channelId: ID) {
  256. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  257. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  258. relations: variantRelations,
  259. });
  260. FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
  261. qb.leftJoin('variants.product', 'product')
  262. .leftJoin('product.channels', 'channel')
  263. .where('channel.id = :channelId', { channelId })
  264. .andWhere('variants__product.deletedAt IS NULL')
  265. .andWhere('variants.deletedAt IS NULL');
  266. return qb;
  267. }
  268. /**
  269. * Given an array of ProductVariants, this method applies the correct taxes and translations.
  270. */
  271. private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
  272. return variants
  273. .map((v) => this.productVariantService.applyChannelPriceAndTax(v, ctx))
  274. .map((v) => translateDeep(v, ctx.languageCode, ['product']));
  275. }
  276. private async saveVariants(languageCode: LanguageCode, channelId: ID, variants: ProductVariant[]) {
  277. const items = variants.map(
  278. (v: ProductVariant) =>
  279. new SearchIndexItem({
  280. productVariantId: v.id,
  281. channelId,
  282. languageCode,
  283. sku: v.sku,
  284. enabled: v.enabled,
  285. slug: v.product.slug,
  286. price: v.price,
  287. priceWithTax: v.priceWithTax,
  288. productId: v.product.id,
  289. productName: v.product.name,
  290. description: v.product.description,
  291. productVariantName: v.name,
  292. productAssetId: v.product.featuredAsset ? v.product.featuredAsset.id : null,
  293. productPreviewFocalPoint: v.product.featuredAsset
  294. ? v.product.featuredAsset.focalPoint
  295. : null,
  296. productVariantPreviewFocalPoint: v.featuredAsset ? v.featuredAsset.focalPoint : null,
  297. productVariantAssetId: v.featuredAsset ? v.featuredAsset.id : null,
  298. productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
  299. productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
  300. channelIds: v.product.channels.map((c) => c.id as string),
  301. facetIds: this.getFacetIds(v),
  302. facetValueIds: this.getFacetValueIds(v),
  303. collectionIds: v.collections.map((c) => c.id.toString()),
  304. }),
  305. );
  306. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
  307. }
  308. private getFacetIds(variant: ProductVariant): string[] {
  309. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  310. const variantFacetIds = variant.facetValues.map(facetIds);
  311. const productFacetIds = variant.product.facetValues.map(facetIds);
  312. return unique([...variantFacetIds, ...productFacetIds]);
  313. }
  314. private getFacetValueIds(variant: ProductVariant): string[] {
  315. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  316. const variantFacetValueIds = variant.facetValues.map(facetValueIds);
  317. const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
  318. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  319. }
  320. /**
  321. * Remove items from the search index
  322. */
  323. private async removeSearchIndexItems(languageCode: LanguageCode, channelId: ID, variantIds: ID[]) {
  324. const compositeKeys = variantIds.map((id) => ({
  325. productVariantId: id,
  326. channelId,
  327. languageCode,
  328. })) as any[];
  329. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
  330. }
  331. }