indexer.controller.ts 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. import { Injectable } from '@nestjs/common';
  2. import { LanguageCode } from '@vendure/common/lib/generated-types';
  3. import { ID } from '@vendure/common/lib/shared-types';
  4. import { unique } from '@vendure/common/lib/unique';
  5. import { Observable } from 'rxjs';
  6. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  7. import { RequestContext } from '../../../api/common/request-context';
  8. import { AsyncQueue } from '../../../common/async-queue';
  9. import { Translatable, Translation } from '../../../common/types/locale-types';
  10. import { asyncObservable, idsAreEqual } from '../../../common/utils';
  11. import { ConfigService } from '../../../config/config.service';
  12. import { Logger } from '../../../config/logger/vendure-logger';
  13. import { TransactionalConnection } from '../../../connection/transactional-connection';
  14. import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
  15. import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
  16. import { Product } from '../../../entity/product/product.entity';
  17. import { ProductPriceApplicator } from '../../../service/helpers/product-price-applicator/product-price-applicator';
  18. import { SearchIndexItem } from '../entities/search-index-item.entity';
  19. import {
  20. ProductChannelMessageData,
  21. ReindexMessageData,
  22. ReindexMessageResponse,
  23. UpdateAssetMessageData,
  24. UpdateProductMessageData,
  25. UpdateVariantMessageData,
  26. UpdateVariantsByIdMessageData,
  27. VariantChannelMessageData,
  28. } from '../types';
  /**
   * Number of ProductVariants loaded and written to the search index per batch.
   */
  export const BATCH_SIZE = 1000;

  // Relations reached through the variant's parent product.
  const productLevelRelations = [
      'product',
      'product.featuredAsset',
      'product.facetValues',
      'product.facetValues.facet',
      'product.channels',
  ];

  // Relations hanging directly off the variant itself.
  const variantLevelRelations = [
      'featuredAsset',
      'facetValues',
      'facetValues.facet',
      'collections',
      'taxCategory',
      'channels',
      'channels.defaultTaxZone',
  ];

  /**
   * Relation paths which are joined whenever ProductVariants are loaded for indexing.
   */
  export const variantRelations = [...productLevelRelations, ...variantLevelRelations];

  /**
   * Logger context string passed to the Logger calls made by the indexer.
   */
  export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
  /**
   * Builds and maintains the rows of the `SearchIndexItem` table in response to
   * reindex / update / delete messages.
   *
   * The index writes performed by `saveVariants`, `saveSyntheticVariant`,
   * `removeSyntheticVariants` and `removeSearchIndexItems` are pushed through a shared
   * `AsyncQueue` (presumably so that they run one at a time - confirm against the
   * AsyncQueue implementation).
   */
  @Injectable()
  export class IndexerController {
      private queue = new AsyncQueue('search-index');

      constructor(
          private connection: TransactionalConnection,
          private productPriceApplicator: ProductPriceApplicator,
          private configService: ConfigService,
      ) {}

      /**
       * Rebuilds the index for the channel of the given context, processing the variants
       * in batches of `BATCH_SIZE` and emitting a progress value after each batch.
       *
       * Existing items are deleted only for the context's languageCode and channelId,
       * whereas `saveVariants` writes an item for every language and channel of each variant.
       *
       * @returns an Observable of progress reports; the final value reports full completion.
       */
      reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
          const ctx = RequestContext.deserialize(rawContext);
          return asyncObservable(async observer => {
              const timeStart = Date.now();
              const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
              const count = await qb.getCount();
              Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
              const batches = Math.ceil(count / BATCH_SIZE);

              await this.connection
                  .getRepository(SearchIndexItem)
                  .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
              Logger.verbose('Deleted existing index items', workerLoggerCtx);

              for (let i = 0; i < batches; i++) {
                  Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
                  // The "deletedAt IS NULL" conditions are already part of the query builder
                  // (see getSearchIndexQueryBuilder), so they are not re-appended per batch.
                  const variants = await qb
                      .take(BATCH_SIZE)
                      .skip(i * BATCH_SIZE)
                      .getMany();
                  await this.saveVariants(variants);
                  observer.next({
                      total: count,
                      completed: Math.min((i + 1) * BATCH_SIZE, count),
                      duration: Date.now() - timeStart,
                  });
              }
              Logger.verbose(`Completed reindexing`, workerLoggerCtx);
              return {
                  total: count,
                  completed: count,
                  duration: Date.now() - timeStart,
              };
          });
      }

      /**
       * Re-indexes the variants with the given ids in batches of `BATCH_SIZE`, emitting a
       * progress value after each batch. Soft-deleted variants are not loaded.
       *
       * @returns an Observable of progress reports; the final value reports full completion.
       */
      updateVariantsById({ ids }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
          return asyncObservable(async observer => {
              const timeStart = Date.now();
              if (ids.length) {
                  const batches = Math.ceil(ids.length / BATCH_SIZE);
                  Logger.verbose(`Updating ${ids.length} variants...`, workerLoggerCtx);

                  for (let i = 0; i < batches; i++) {
                      const begin = i * BATCH_SIZE;
                      const end = begin + BATCH_SIZE;
                      Logger.verbose(`Updating ids from index ${begin} to ${end}`, workerLoggerCtx);
                      const batchIds = ids.slice(begin, end);
                      const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
                          relations: variantRelations,
                          where: { deletedAt: null },
                      });
                      await this.saveVariants(batch);
                      observer.next({
                          total: ids.length,
                          completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
                          duration: Date.now() - timeStart,
                      });
                  }
              }
              Logger.verbose(`Completed reindexing!`, workerLoggerCtx);
              return {
                  total: ids.length,
                  completed: ids.length,
                  duration: Date.now() - timeStart,
              };
          });
      }

      /** Re-indexes the given product in the channel of the message's context. */
      async updateProduct(data: UpdateProductMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
      }

      /** Re-indexes the given variants. */
      async updateVariants(data: UpdateVariantMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
      }

      /** Removes the index items of the given product's variants from the context's channel. */
      async deleteProduct(data: UpdateProductMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
      }

      /**
       * Removes the index items of the given variants from the context's channel, for
       * every language in which any of those variants has a translation.
       */
      async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
          if (variants.length) {
              await this.removeSearchIndexItems(
                  ctx.channelId,
                  variants.map(v => v.id),
                  this.getLanguageCodes(variants),
              );
          }
          return true;
      }

      /** Re-indexes the given product after it has been assigned to `data.channelId`. */
      async assignProductToChannel(data: ProductChannelMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          return this.updateProductInChannel(ctx, data.productId, data.channelId);
      }

      /** Removes the index items of the given product's variants from `data.channelId`. */
      async removeProductFromChannel(data: ProductChannelMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          return this.deleteProductInChannel(ctx, data.productId, data.channelId);
      }

      /** Re-indexes the given variant after it has been assigned to `data.channelId`. */
      async assignVariantToChannel(data: VariantChannelMessageData): Promise<boolean> {
          const ctx = RequestContext.deserialize(data.ctx);
          return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
      }

      /**
       * Removes the index items of the given variant from `data.channelId`. If the variant
       * cannot be found there are no known languages to remove, and nothing is deleted.
       */
      async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
          const variant = await this.connection.getRepository(ProductVariant).findOne(data.productVariantId);
          const languageVariants = variant ? this.getLanguageCodes([variant]) : [];
          await this.removeSearchIndexItems(data.channelId, [data.productVariantId], languageVariants);
          return true;
      }

      /**
       * Propagates an asset's focal point to every index item which uses that asset as
       * the product preview or as the variant preview.
       */
      async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
          const id = data.asset.id;
          const point = data.asset.focalPoint;
          // Nullish checks rather than truthiness: a coordinate of 0 is a valid position
          // and must not cause the whole focal point to be discarded.
          const focalPoint = point != null && point.x != null && point.y != null ? point : null;
          const repository = this.connection.getRepository(SearchIndexItem);
          await repository.update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
          await repository.update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
          return true;
      }

      /**
       * Clears the asset id from every index item which references the deleted asset as
       * the product asset or as the variant asset.
       */
      async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
          const id = data.asset.id;
          const repository = this.connection.getRepository(SearchIndexItem);
          await repository.update({ productAssetId: id }, { productAssetId: null });
          await repository.update({ productVariantAssetId: id }, { productVariantAssetId: null });
          return true;
      }

      /**
       * Re-indexes the non-deleted variants of the given product. A product without any
       * such variants gets a synthetic index item instead. When the product is disabled,
       * all of its variants are indexed as disabled.
       *
       * NOTE(review): the `channelId` argument is not used here; variants are filtered by
       * `ctx.channelId`. Confirm whether that is intended for the assign-to-channel path.
       */
      private async updateProductInChannel(
          ctx: RequestContext,
          productId: ID,
          channelId: ID,
      ): Promise<boolean> {
          const product = await this.connection.getRepository(Product).findOne(productId, {
              relations: ['variants'],
          });
          if (!product) {
              return true;
          }
          const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(
              product.variants.map(v => v.id),
              {
                  relations: variantRelations,
                  where: { deletedAt: null },
              },
          );
          if (updatedVariants.length === 0) {
              await this.saveSyntheticVariant(ctx, product);
              return true;
          }
          if (product.enabled === false) {
              updatedVariants.forEach(v => (v.enabled = false));
          }
          const variantsInCurrentChannel = updatedVariants.filter(
              v => !!v.channels.find(c => idsAreEqual(c.id, ctx.channelId)),
          );
          Logger.verbose(`Updating ${variantsInCurrentChannel.length} variants`, workerLoggerCtx);
          if (variantsInCurrentChannel.length) {
              await this.saveVariants(variantsInCurrentChannel);
          }
          return true;
      }

      /**
       * Re-indexes the non-deleted variants with the given ids. `ctx` and `channelId` are
       * not consulted: `saveVariants` writes an item for every channel of each variant.
       */
      private async updateVariantsInChannel(
          ctx: RequestContext,
          variantIds: ID[],
          channelId: ID,
      ): Promise<boolean> {
          const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
              relations: variantRelations,
              where: { deletedAt: null },
          });
          if (variants.length) {
              Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
              await this.saveVariants(variants);
          }
          return true;
      }

      /**
       * Removes, from the given channel, the index items of all variants of the product,
       * for every language found on the product or on any of its variants.
       */
      private async deleteProductInChannel(
          ctx: RequestContext,
          productId: ID,
          channelId: ID,
      ): Promise<boolean> {
          const product = await this.connection.getRepository(Product).findOne(productId, {
              relations: ['variants'],
          });
          if (product) {
              const languageVariants = this.getLanguageCodes([product, ...product.variants]);
              const removedVariantIds = product.variants.map(v => v.id);
              if (removedVariantIds.length) {
                  await this.removeSearchIndexItems(channelId, removedVariantIds, languageVariants);
              }
          }
          return true;
      }

      /**
       * Creates the query builder used by `reindex`: ProductVariants with `variantRelations`
       * and eager relations joined, restricted to non-deleted variants of non-deleted
       * products which are assigned to the given channel.
       */
      private getSearchIndexQueryBuilder(channelId: ID) {
          const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
          FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
              relations: variantRelations,
          });
          FindOptionsUtils.joinEagerRelations(
              qb,
              qb.alias,
              this.connection.rawConnection.getMetadata(ProductVariant),
          );
          qb.leftJoin('variants.product', 'product')
              .leftJoin('product.channels', 'channel')
              .where('channel.id = :channelId', { channelId })
              .andWhere('variants__product.deletedAt IS NULL')
              .andWhere('variants.deletedAt IS NULL');
          return qb;
      }

      /**
       * Writes one index item per variant, per language (of the variant or its product)
       * and per channel of the variant. Any synthetic items of the affected products are
       * removed first. The variants must have been loaded with `variantRelations`.
       */
      private async saveVariants(variants: ProductVariant[]) {
          const items: SearchIndexItem[] = [];
          await this.removeSyntheticVariants(variants);

          for (const variant of variants) {
              const languageVariants = this.getLanguageCodes([variant, variant.product]);
              for (const languageCode of languageVariants) {
                  const productTranslation = this.getTranslation(variant.product, languageCode);
                  const variantTranslation = this.getTranslation(variant, languageCode);
                  const collectionTranslations = variant.collections.map(c =>
                      this.getTranslation(c, languageCode),
                  );
                  for (const channel of variant.channels) {
                      const ctx = new RequestContext({
                          channel,
                          apiType: 'admin',
                          authorizedAsOwnerOnly: false,
                          isAuthorized: true,
                          session: {} as any,
                      });
                      // Sets variant.price / variant.priceWithTax for this channel before they are read below.
                      await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
                      items.push(
                          new SearchIndexItem({
                              channelId: channel.id,
                              languageCode,
                              productVariantId: variant.id,
                              price: variant.price,
                              priceWithTax: variant.priceWithTax,
                              sku: variant.sku,
                              enabled: variant.product.enabled === false ? false : variant.enabled,
                              slug: productTranslation.slug,
                              productId: variant.product.id,
                              productName: productTranslation.name,
                              description: this.constrainDescription(productTranslation.description),
                              productVariantName: variantTranslation.name,
                              productAssetId: variant.product.featuredAsset
                                  ? variant.product.featuredAsset.id
                                  : null,
                              productPreviewFocalPoint: variant.product.featuredAsset
                                  ? variant.product.featuredAsset.focalPoint
                                  : null,
                              productVariantPreviewFocalPoint: variant.featuredAsset
                                  ? variant.featuredAsset.focalPoint
                                  : null,
                              productVariantAssetId: variant.featuredAsset ? variant.featuredAsset.id : null,
                              productPreview: variant.product.featuredAsset
                                  ? variant.product.featuredAsset.preview
                                  : '',
                              productVariantPreview: variant.featuredAsset ? variant.featuredAsset.preview : '',
                              channelIds: variant.channels.map(c => c.id as string),
                              facetIds: this.getFacetIds(variant),
                              facetValueIds: this.getFacetValueIds(variant),
                              collectionIds: variant.collections.map(c => c.id.toString()),
                              collectionSlugs: collectionTranslations.map(c => c.slug),
                          }),
                      );
                  }
              }
          }

          await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
      }

      /**
       * If a Product has no variants, we create a synthetic variant for the purposes
       * of making that product visible via the search query.
       *
       * The synthetic item uses productVariantId 0, price 0 and an empty sku, and is
       * written only for the language and channel of the given context.
       */
      private async saveSyntheticVariant(ctx: RequestContext, product: Product) {
          const productTranslation = this.getTranslation(product, ctx.languageCode);
          const item = new SearchIndexItem({
              channelId: ctx.channelId,
              languageCode: ctx.languageCode,
              productVariantId: 0,
              price: 0,
              priceWithTax: 0,
              sku: '',
              enabled: false,
              slug: productTranslation.slug,
              productId: product.id,
              productName: productTranslation.name,
              description: this.constrainDescription(productTranslation.description),
              productVariantName: productTranslation.name,
              productAssetId: product.featuredAsset?.id ?? null,
              productPreviewFocalPoint: product.featuredAsset?.focalPoint ?? null,
              productVariantPreviewFocalPoint: null,
              productVariantAssetId: null,
              productPreview: product.featuredAsset?.preview ?? '',
              productVariantPreview: '',
              channelIds: [ctx.channelId.toString()],
              facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
              facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
              collectionIds: [],
              collectionSlugs: [],
          });
          await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(item));
      }

      /**
       * Removes any synthetic variants for the products of the given variants. Synthetic
       * items are matched by productId together with an empty sku and a price of 0.
       */
      private async removeSyntheticVariants(variants: ProductVariant[]) {
          const prodIds = unique(variants.map(v => v.productId));
          for (const productId of prodIds) {
              await this.queue.push(() =>
                  this.connection.getRepository(SearchIndexItem).delete({
                      productId,
                      sku: '',
                      price: 0,
                  }),
              );
          }
      }

      /**
       * Returns the translation for the given language, falling back first to the
       * configured default language and then to the first available translation.
       */
      private getTranslation<T extends Translatable>(
          translatable: T,
          languageCode: LanguageCode,
      ): Translation<T> {
          return (translatable.translations.find(t => t.languageCode === languageCode) ||
              translatable.translations.find(t => t.languageCode === this.configService.defaultLanguageCode) ||
              translatable.translations[0]) as unknown as Translation<T>;
      }

      /**
       * Returns the distinct language codes of all translations of the given entities,
       * in order of first occurrence.
       */
      private getLanguageCodes(translatables: Translatable[]): LanguageCode[] {
          const languageCodes: LanguageCode[] = [];
          for (const translatable of translatables) {
              for (const translation of translatable.translations) {
                  languageCodes.push(translation.languageCode);
              }
          }
          return unique(languageCodes);
      }

      /** Returns the distinct ids of the Facets of the variant's and its product's FacetValues. */
      private getFacetIds(variant: ProductVariant): string[] {
          const facetIds = (fv: FacetValue) => fv.facet.id.toString();
          const variantFacetIds = variant.facetValues.map(facetIds);
          const productFacetIds = variant.product.facetValues.map(facetIds);
          return unique([...variantFacetIds, ...productFacetIds]);
      }

      /** Returns the distinct ids of the variant's and its product's FacetValues. */
      private getFacetValueIds(variant: ProductVariant): string[] {
          const facetValueIds = (fv: FacetValue) => fv.id.toString();
          const variantFacetValueIds = variant.facetValues.map(facetValueIds);
          const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
          return unique([...variantFacetValueIds, ...productFacetValueIds]);
      }

      /**
       * Remove items from the search index. One key is built for every combination of
       * variant id and language code within the given channel.
       */
      private async removeSearchIndexItems(channelId: ID, variantIds: ID[], languageCodes: LanguageCode[]) {
          const keys: Array<Partial<SearchIndexItem>> = [];
          for (const productVariantId of variantIds) {
              for (const languageCode of languageCodes) {
                  keys.push({
                      productVariantId,
                      channelId,
                      languageCode,
                  });
              }
          }
          if (keys.length === 0) {
              // Nothing to remove. TypeORM rejects a delete() call with an empty list of
              // criteria, so the call must be skipped rather than issued with [].
              return;
          }
          await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(keys as any));
      }

      /**
       * Prevent postgres errors from too-long indices
       * https://github.com/vendure-ecommerce/vendure/issues/745
       *
       * On postgres-like connections the description is cut to its first 2600 characters;
       * on any other connection type it is returned unchanged.
       */
      private constrainDescription(description: string): string {
          const { type } = this.connection.rawConnection.options;
          const isPostgresLike = type === 'postgres' || type === 'aurora-data-api-pg' || type === 'cockroachdb';
          if (isPostgresLike) {
              return description.substring(0, 2600);
          }
          return description;
      }
  }