indexer.controller.ts 22 KB


  1. import { Inject, Injectable } from '@nestjs/common';
  2. import { LanguageCode } from '@vendure/common/lib/generated-types';
  3. import { ID } from '@vendure/common/lib/shared-types';
  4. import { unique } from '@vendure/common/lib/unique';
  5. import { Observable } from 'rxjs';
  6. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  7. import { RequestContext } from '../../../api/common/request-context';
  8. import { RequestContextCacheService } from '../../../cache/request-context-cache.service';
  9. import { AsyncQueue } from '../../../common/async-queue';
  10. import { Translatable, Translation } from '../../../common/types/locale-types';
  11. import { asyncObservable, idsAreEqual } from '../../../common/utils';
  12. import { ConfigService } from '../../../config/config.service';
  13. import { Logger } from '../../../config/logger/vendure-logger';
  14. import { TransactionalConnection } from '../../../connection/transactional-connection';
  15. import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
  16. import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
  17. import { Product } from '../../../entity/product/product.entity';
  18. import { ProductPriceApplicator } from '../../../service/helpers/product-price-applicator/product-price-applicator';
  19. import { ProductVariantService } from '../../../service/services/product-variant.service';
  20. import { PLUGIN_INIT_OPTIONS } from '../constants';
  21. import { SearchIndexItem } from '../entities/search-index-item.entity';
  22. import {
  23. DefaultSearchPluginInitOptions,
  24. ProductChannelMessageData,
  25. ReindexMessageData,
  26. ReindexMessageResponse,
  27. UpdateAssetMessageData,
  28. UpdateProductMessageData,
  29. UpdateVariantMessageData,
  30. UpdateVariantsByIdMessageData,
  31. VariantChannelMessageData,
  32. } from '../types';
  33. import { MutableRequestContext } from './mutable-request-context';
  34. export const BATCH_SIZE = 1000;
  35. export const productRelations = ['featuredAsset', 'facetValues', 'facetValues.facet', 'channels'];
  36. export const variantRelations = [
  37. 'featuredAsset',
  38. 'facetValues',
  39. 'facetValues.facet',
  40. 'collections',
  41. 'taxCategory',
  42. 'channels',
  43. 'channels.defaultTaxZone',
  44. ];
  45. export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
  46. @Injectable()
  47. export class IndexerController {
  48. private queue = new AsyncQueue('search-index');
  49. constructor(
  50. private connection: TransactionalConnection,
  51. private productPriceApplicator: ProductPriceApplicator,
  52. private configService: ConfigService,
  53. private requestContextCache: RequestContextCacheService,
  54. private productVariantService: ProductVariantService,
  55. @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
  56. ) {}
  57. reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
  58. const ctx = MutableRequestContext.deserialize(rawContext);
  59. return asyncObservable(async observer => {
  60. const timeStart = Date.now();
  61. const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
  62. const count = await qb.getCount();
  63. Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
  64. const batches = Math.ceil(count / BATCH_SIZE);
  65. await this.connection
  66. .getRepository(SearchIndexItem)
  67. .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
  68. Logger.verbose('Deleted existing index items', workerLoggerCtx);
  69. for (let i = 0; i < batches; i++) {
  70. Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
  71. const variants = await qb
  72. .take(BATCH_SIZE)
  73. .skip(i * BATCH_SIZE)
  74. .getMany();
  75. await this.saveVariants(ctx, variants);
  76. observer.next({
  77. total: count,
  78. completed: Math.min((i + 1) * BATCH_SIZE, count),
  79. duration: +new Date() - timeStart,
  80. });
  81. }
  82. Logger.verbose(`Completed reindexing`, workerLoggerCtx);
  83. return {
  84. total: count,
  85. completed: count,
  86. duration: +new Date() - timeStart,
  87. };
  88. });
  89. }
  90. updateVariantsById({
  91. ctx: rawContext,
  92. ids,
  93. }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
  94. const ctx = MutableRequestContext.deserialize(rawContext);
  95. return asyncObservable(async observer => {
  96. const timeStart = Date.now();
  97. if (ids.length) {
  98. const batches = Math.ceil(ids.length / BATCH_SIZE);
  99. Logger.verbose(`Updating ${ids.length} variants...`);
  100. for (let i = 0; i < batches; i++) {
  101. const begin = i * BATCH_SIZE;
  102. const end = begin + BATCH_SIZE;
  103. Logger.verbose(`Updating ids from index ${begin} to ${end}`);
  104. const batchIds = ids.slice(begin, end);
  105. const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
  106. relations: variantRelations,
  107. where: { deletedAt: null },
  108. });
  109. await this.saveVariants(ctx, batch);
  110. observer.next({
  111. total: ids.length,
  112. completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
  113. duration: +new Date() - timeStart,
  114. });
  115. }
  116. }
  117. Logger.verbose(`Completed reindexing!`);
  118. return {
  119. total: ids.length,
  120. completed: ids.length,
  121. duration: +new Date() - timeStart,
  122. };
  123. });
  124. }
  125. async updateProduct(data: UpdateProductMessageData): Promise<boolean> {
  126. const ctx = MutableRequestContext.deserialize(data.ctx);
  127. return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
  128. }
  129. async updateVariants(data: UpdateVariantMessageData): Promise<boolean> {
  130. const ctx = MutableRequestContext.deserialize(data.ctx);
  131. return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
  132. }
  133. async deleteProduct(data: UpdateProductMessageData): Promise<boolean> {
  134. const ctx = MutableRequestContext.deserialize(data.ctx);
  135. return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
  136. }
  137. async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
  138. const ctx = MutableRequestContext.deserialize(data.ctx);
  139. const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
  140. if (variants.length) {
  141. const languageVariants = unique([
  142. ...variants
  143. .reduce((vt, v) => [...vt, ...v.translations], [] as Array<Translation<ProductVariant>>)
  144. .map(t => t.languageCode),
  145. ]);
  146. await this.removeSearchIndexItems(
  147. ctx.channelId,
  148. variants.map(v => v.id),
  149. languageVariants,
  150. );
  151. }
  152. return true;
  153. }
  154. async assignProductToChannel(data: ProductChannelMessageData): Promise<boolean> {
  155. const ctx = MutableRequestContext.deserialize(data.ctx);
  156. return this.updateProductInChannel(ctx, data.productId, data.channelId);
  157. }
  158. async removeProductFromChannel(data: ProductChannelMessageData): Promise<boolean> {
  159. const ctx = MutableRequestContext.deserialize(data.ctx);
  160. return this.deleteProductInChannel(ctx, data.productId, data.channelId);
  161. }
  162. async assignVariantToChannel(data: VariantChannelMessageData): Promise<boolean> {
  163. const ctx = MutableRequestContext.deserialize(data.ctx);
  164. return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
  165. }
  166. async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
  167. const ctx = MutableRequestContext.deserialize(data.ctx);
  168. const variant = await this.connection.getRepository(ProductVariant).findOne(data.productVariantId);
  169. const languageVariants = variant?.translations.map(t => t.languageCode) ?? [];
  170. await this.removeSearchIndexItems(data.channelId, [data.productVariantId], languageVariants);
  171. return true;
  172. }
  173. async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
  174. const id = data.asset.id;
  175. function getFocalPoint(point?: { x: number; y: number }) {
  176. return point && point.x && point.y ? point : null;
  177. }
  178. const focalPoint = getFocalPoint(data.asset.focalPoint);
  179. await this.connection
  180. .getRepository(SearchIndexItem)
  181. .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
  182. await this.connection
  183. .getRepository(SearchIndexItem)
  184. .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
  185. return true;
  186. }
  187. async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
  188. const id = data.asset.id;
  189. await this.connection
  190. .getRepository(SearchIndexItem)
  191. .update({ productAssetId: id }, { productAssetId: null });
  192. await this.connection
  193. .getRepository(SearchIndexItem)
  194. .update({ productVariantAssetId: id }, { productVariantAssetId: null });
  195. return true;
  196. }
  197. private async updateProductInChannel(
  198. ctx: MutableRequestContext,
  199. productId: ID,
  200. channelId: ID,
  201. ): Promise<boolean> {
  202. const product = await this.connection.getRepository(Product).findOne(productId, {
  203. relations: ['variants'],
  204. });
  205. if (product) {
  206. const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(
  207. product.variants.map(v => v.id),
  208. {
  209. relations: variantRelations,
  210. where: { deletedAt: null },
  211. },
  212. );
  213. if (updatedVariants.length === 0) {
  214. await this.saveSyntheticVariant(ctx, product);
  215. } else {
  216. if (product.enabled === false) {
  217. updatedVariants.forEach(v => (v.enabled = false));
  218. }
  219. const variantsInCurrentChannel = updatedVariants.filter(
  220. v => !!v.channels.find(c => idsAreEqual(c.id, ctx.channelId)),
  221. );
  222. Logger.verbose(`Updating ${variantsInCurrentChannel.length} variants`, workerLoggerCtx);
  223. if (variantsInCurrentChannel.length) {
  224. await this.saveVariants(ctx, variantsInCurrentChannel);
  225. }
  226. }
  227. }
  228. return true;
  229. }
  230. private async updateVariantsInChannel(
  231. ctx: MutableRequestContext,
  232. variantIds: ID[],
  233. channelId: ID,
  234. ): Promise<boolean> {
  235. const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
  236. relations: variantRelations,
  237. where: { deletedAt: null },
  238. });
  239. if (variants) {
  240. Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
  241. await this.saveVariants(ctx, variants);
  242. }
  243. return true;
  244. }
  245. private async deleteProductInChannel(
  246. ctx: RequestContext,
  247. productId: ID,
  248. channelId: ID,
  249. ): Promise<boolean> {
  250. const product = await this.connection.getRepository(Product).findOne(productId, {
  251. relations: ['variants'],
  252. });
  253. if (product) {
  254. const languageVariants = unique([
  255. ...product.translations.map(t => t.languageCode),
  256. ...product.variants
  257. .reduce((vt, v) => [...vt, ...v.translations], [] as Array<Translation<ProductVariant>>)
  258. .map(t => t.languageCode),
  259. ]);
  260. const removedVariantIds = product.variants.map(v => v.id);
  261. if (removedVariantIds.length) {
  262. await this.removeSearchIndexItems(channelId, removedVariantIds, languageVariants);
  263. }
  264. }
  265. return true;
  266. }
  267. private getSearchIndexQueryBuilder(channelId: ID) {
  268. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  269. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  270. relations: variantRelations,
  271. });
  272. FindOptionsUtils.joinEagerRelations(
  273. qb,
  274. qb.alias,
  275. this.connection.rawConnection.getMetadata(ProductVariant),
  276. );
  277. qb.leftJoin('variants.product', 'product')
  278. .leftJoin('product.channels', 'channel')
  279. .where('channel.id = :channelId', { channelId })
  280. .andWhere('product.deletedAt IS NULL')
  281. .andWhere('variants.deletedAt IS NULL');
  282. return qb;
  283. }
  284. private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
  285. const items: SearchIndexItem[] = [];
  286. await this.removeSyntheticVariants(variants);
  287. const productMap = new Map<ID, Product>();
  288. for (const variant of variants) {
  289. let product = productMap.get(variant.productId);
  290. if (!product) {
  291. product = await this.connection.getEntityOrThrow(ctx, Product, variant.productId, {
  292. relations: productRelations,
  293. });
  294. productMap.set(variant.productId, product);
  295. }
  296. const languageVariants = unique([
  297. ...variant.translations.map(t => t.languageCode),
  298. ...product.translations.map(t => t.languageCode),
  299. ]);
  300. for (const languageCode of languageVariants) {
  301. const productTranslation = this.getTranslation(product, languageCode);
  302. const variantTranslation = this.getTranslation(variant, languageCode);
  303. const collectionTranslations = variant.collections.map(c =>
  304. this.getTranslation(c, languageCode),
  305. );
  306. for (const channel of variant.channels) {
  307. ctx.setChannel(channel);
  308. await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
  309. const item = new SearchIndexItem({
  310. channelId: channel.id,
  311. languageCode,
  312. productVariantId: variant.id,
  313. price: variant.price,
  314. priceWithTax: variant.priceWithTax,
  315. sku: variant.sku,
  316. enabled: product.enabled === false ? false : variant.enabled,
  317. slug: productTranslation.slug,
  318. productId: product.id,
  319. productName: productTranslation.name,
  320. description: this.constrainDescription(productTranslation.description),
  321. productVariantName: variantTranslation.name,
  322. productAssetId: product.featuredAsset ? product.featuredAsset.id : null,
  323. productPreviewFocalPoint: product.featuredAsset
  324. ? product.featuredAsset.focalPoint
  325. : null,
  326. productVariantPreviewFocalPoint: variant.featuredAsset
  327. ? variant.featuredAsset.focalPoint
  328. : null,
  329. productVariantAssetId: variant.featuredAsset ? variant.featuredAsset.id : null,
  330. productPreview: product.featuredAsset ? product.featuredAsset.preview : '',
  331. productVariantPreview: variant.featuredAsset ? variant.featuredAsset.preview : '',
  332. channelIds: variant.channels.map(c => c.id as string),
  333. facetIds: this.getFacetIds(variant, product),
  334. facetValueIds: this.getFacetValueIds(variant, product),
  335. collectionIds: variant.collections.map(c => c.id.toString()),
  336. collectionSlugs: collectionTranslations.map(c => c.slug),
  337. });
  338. if (this.options.indexStockStatus) {
  339. item.inStock =
  340. 0 < (await this.productVariantService.getSaleableStockLevel(ctx, variant));
  341. const productInStock = await this.requestContextCache.get(
  342. ctx,
  343. `productVariantsStock-${variant.productId}`,
  344. () =>
  345. this.connection
  346. .getRepository(ctx, ProductVariant)
  347. .find({
  348. loadEagerRelations: false,
  349. where: {
  350. productId: variant.productId,
  351. },
  352. })
  353. .then(_variants =>
  354. Promise.all(
  355. _variants.map(v =>
  356. this.productVariantService.getSaleableStockLevel(ctx, v),
  357. ),
  358. ),
  359. )
  360. .then(stockLevels => stockLevels.some(stockLevel => 0 < stockLevel)),
  361. );
  362. item.productInStock = productInStock;
  363. }
  364. items.push(item);
  365. }
  366. }
  367. }
  368. await this.queue.push(() =>
  369. this.connection.getRepository(SearchIndexItem).save(items, { chunk: 2500 }),
  370. );
  371. }
  372. /**
  373. * If a Product has no variants, we create a synthetic variant for the purposes
  374. * of making that product visible via the search query.
  375. */
  376. private async saveSyntheticVariant(ctx: RequestContext, product: Product) {
  377. const productTranslation = this.getTranslation(product, ctx.languageCode);
  378. const item = new SearchIndexItem({
  379. channelId: ctx.channelId,
  380. languageCode: ctx.languageCode,
  381. productVariantId: 0,
  382. price: 0,
  383. priceWithTax: 0,
  384. sku: '',
  385. enabled: false,
  386. slug: productTranslation.slug,
  387. productId: product.id,
  388. productName: productTranslation.name,
  389. description: this.constrainDescription(productTranslation.description),
  390. productVariantName: productTranslation.name,
  391. productAssetId: product.featuredAsset?.id ?? null,
  392. productPreviewFocalPoint: product.featuredAsset?.focalPoint ?? null,
  393. productVariantPreviewFocalPoint: null,
  394. productVariantAssetId: null,
  395. productPreview: product.featuredAsset?.preview ?? '',
  396. productVariantPreview: '',
  397. channelIds: [ctx.channelId.toString()],
  398. facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
  399. facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
  400. collectionIds: [],
  401. collectionSlugs: [],
  402. });
  403. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(item));
  404. }
  405. /**
  406. * Removes any synthetic variants for the given product
  407. */
  408. private async removeSyntheticVariants(variants: ProductVariant[]) {
  409. const prodIds = unique(variants.map(v => v.productId));
  410. for (const productId of prodIds) {
  411. await this.queue.push(() =>
  412. this.connection.getRepository(SearchIndexItem).delete({
  413. productId,
  414. sku: '',
  415. price: 0,
  416. }),
  417. );
  418. }
  419. }
  420. private getTranslation<T extends Translatable>(
  421. translatable: T,
  422. languageCode: LanguageCode,
  423. ): Translation<T> {
  424. return (translatable.translations.find(t => t.languageCode === languageCode) ||
  425. translatable.translations.find(t => t.languageCode === this.configService.defaultLanguageCode) ||
  426. translatable.translations[0]) as unknown as Translation<T>;
  427. }
  428. private getFacetIds(variant: ProductVariant, product: Product): string[] {
  429. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  430. const variantFacetIds = variant.facetValues.map(facetIds);
  431. const productFacetIds = product.facetValues.map(facetIds);
  432. return unique([...variantFacetIds, ...productFacetIds]);
  433. }
  434. private getFacetValueIds(variant: ProductVariant, product: Product): string[] {
  435. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  436. const variantFacetValueIds = variant.facetValues.map(facetValueIds);
  437. const productFacetValueIds = product.facetValues.map(facetValueIds);
  438. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  439. }
  440. /**
  441. * Remove items from the search index
  442. */
  443. private async removeSearchIndexItems(channelId: ID, variantIds: ID[], languageCodes: LanguageCode[]) {
  444. const keys: Array<Partial<SearchIndexItem>> = [];
  445. for (const productVariantId of variantIds) {
  446. for (const languageCode of languageCodes) {
  447. keys.push({
  448. productVariantId,
  449. channelId,
  450. languageCode,
  451. });
  452. }
  453. }
  454. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(keys as any));
  455. }
  456. /**
  457. * Prevent postgres errors from too-long indices
  458. * https://github.com/vendure-ecommerce/vendure/issues/745
  459. */
  460. private constrainDescription(description: string): string {
  461. const { type } = this.connection.rawConnection.options;
  462. const isPostgresLike = type === 'postgres' || type === 'aurora-data-api-pg' || type === 'cockroachdb';
  463. if (isPostgresLike) {
  464. return description.substring(0, 2600);
  465. }
  466. return description;
  467. }
  468. }