indexer.controller.ts 22 KB


  1. import { Inject, Injectable } from '@nestjs/common';
  2. import { LanguageCode } from '@vendure/common/lib/generated-types';
  3. import { ID } from '@vendure/common/lib/shared-types';
  4. import { unique } from '@vendure/common/lib/unique';
  5. import { Observable } from 'rxjs';
  6. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  7. import { RequestContext } from '../../../api/common/request-context';
  8. import { RequestContextCacheService } from '../../../cache/request-context-cache.service';
  9. import { AsyncQueue } from '../../../common/async-queue';
  10. import { Translatable, Translation } from '../../../common/types/locale-types';
  11. import { asyncObservable, idsAreEqual } from '../../../common/utils';
  12. import { ConfigService } from '../../../config/config.service';
  13. import { Logger } from '../../../config/logger/vendure-logger';
  14. import { TransactionalConnection } from '../../../connection/transactional-connection';
  15. import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
  16. import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
  17. import { Product } from '../../../entity/product/product.entity';
  18. import { ProductPriceApplicator } from '../../../service/helpers/product-price-applicator/product-price-applicator';
  19. import { ProductVariantService } from '../../../service/services/product-variant.service';
  20. import { PLUGIN_INIT_OPTIONS } from '../constants';
  21. import { SearchIndexItem } from '../entities/search-index-item.entity';
  22. import {
  23. DefaultSearchPluginInitOptions,
  24. ProductChannelMessageData,
  25. ReindexMessageData,
  26. ReindexMessageResponse,
  27. UpdateAssetMessageData,
  28. UpdateProductMessageData,
  29. UpdateVariantMessageData,
  30. UpdateVariantsByIdMessageData,
  31. VariantChannelMessageData,
  32. } from '../types';
  33. import { MutableRequestContext } from './mutable-request-context';
  34. export const BATCH_SIZE = 1000;
  35. export const variantRelations = [
  36. 'product',
  37. 'product.featuredAsset',
  38. 'product.facetValues',
  39. 'product.facetValues.facet',
  40. 'product.channels',
  41. 'featuredAsset',
  42. 'facetValues',
  43. 'facetValues.facet',
  44. 'collections',
  45. 'taxCategory',
  46. 'channels',
  47. 'channels.defaultTaxZone',
  48. ];
  49. export const workerLoggerCtx = 'DefaultSearchPlugin Worker';
  50. @Injectable()
  51. export class IndexerController {
  52. private queue = new AsyncQueue('search-index');
  53. constructor(
  54. private connection: TransactionalConnection,
  55. private productPriceApplicator: ProductPriceApplicator,
  56. private configService: ConfigService,
  57. private requestContextCache: RequestContextCacheService,
  58. private productVariantService: ProductVariantService,
  59. @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
  60. ) {}
  61. reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
  62. const ctx = MutableRequestContext.deserialize(rawContext);
  63. return asyncObservable(async observer => {
  64. const timeStart = Date.now();
  65. const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
  66. const count = await qb.getCount();
  67. Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
  68. const batches = Math.ceil(count / BATCH_SIZE);
  69. await this.connection
  70. .getRepository(SearchIndexItem)
  71. .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
  72. Logger.verbose('Deleted existing index items', workerLoggerCtx);
  73. for (let i = 0; i < batches; i++) {
  74. Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
  75. const variants = await qb
  76. .andWhere('variants__product.deletedAt IS NULL')
  77. .take(BATCH_SIZE)
  78. .skip(i * BATCH_SIZE)
  79. .getMany();
  80. await this.saveVariants(ctx, variants);
  81. observer.next({
  82. total: count,
  83. completed: Math.min((i + 1) * BATCH_SIZE, count),
  84. duration: +new Date() - timeStart,
  85. });
  86. }
  87. Logger.verbose(`Completed reindexing`, workerLoggerCtx);
  88. return {
  89. total: count,
  90. completed: count,
  91. duration: +new Date() - timeStart,
  92. };
  93. });
  94. }
  95. updateVariantsById({
  96. ctx: rawContext,
  97. ids,
  98. }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
  99. const ctx = MutableRequestContext.deserialize(rawContext);
  100. return asyncObservable(async observer => {
  101. const timeStart = Date.now();
  102. if (ids.length) {
  103. const batches = Math.ceil(ids.length / BATCH_SIZE);
  104. Logger.verbose(`Updating ${ids.length} variants...`);
  105. for (let i = 0; i < batches; i++) {
  106. const begin = i * BATCH_SIZE;
  107. const end = begin + BATCH_SIZE;
  108. Logger.verbose(`Updating ids from index ${begin} to ${end}`);
  109. const batchIds = ids.slice(begin, end);
  110. const batch = await this.connection.getRepository(ProductVariant).findByIds(batchIds, {
  111. relations: variantRelations,
  112. where: { deletedAt: null },
  113. });
  114. await this.saveVariants(ctx, batch);
  115. observer.next({
  116. total: ids.length,
  117. completed: Math.min((i + 1) * BATCH_SIZE, ids.length),
  118. duration: +new Date() - timeStart,
  119. });
  120. }
  121. }
  122. Logger.verbose(`Completed reindexing!`);
  123. return {
  124. total: ids.length,
  125. completed: ids.length,
  126. duration: +new Date() - timeStart,
  127. };
  128. });
  129. }
  130. async updateProduct(data: UpdateProductMessageData): Promise<boolean> {
  131. const ctx = MutableRequestContext.deserialize(data.ctx);
  132. return this.updateProductInChannel(ctx, data.productId, ctx.channelId);
  133. }
  134. async updateVariants(data: UpdateVariantMessageData): Promise<boolean> {
  135. const ctx = MutableRequestContext.deserialize(data.ctx);
  136. return this.updateVariantsInChannel(ctx, data.variantIds, ctx.channelId);
  137. }
  138. async deleteProduct(data: UpdateProductMessageData): Promise<boolean> {
  139. const ctx = MutableRequestContext.deserialize(data.ctx);
  140. return this.deleteProductInChannel(ctx, data.productId, ctx.channelId);
  141. }
  142. async deleteVariant(data: UpdateVariantMessageData): Promise<boolean> {
  143. const ctx = MutableRequestContext.deserialize(data.ctx);
  144. const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
  145. if (variants.length) {
  146. const languageVariants = unique([
  147. ...variants
  148. .reduce((vt, v) => [...vt, ...v.translations], [] as Array<Translation<ProductVariant>>)
  149. .map(t => t.languageCode),
  150. ]);
  151. await this.removeSearchIndexItems(
  152. ctx.channelId,
  153. variants.map(v => v.id),
  154. languageVariants,
  155. );
  156. }
  157. return true;
  158. }
  159. async assignProductToChannel(data: ProductChannelMessageData): Promise<boolean> {
  160. const ctx = MutableRequestContext.deserialize(data.ctx);
  161. return this.updateProductInChannel(ctx, data.productId, data.channelId);
  162. }
  163. async removeProductFromChannel(data: ProductChannelMessageData): Promise<boolean> {
  164. const ctx = MutableRequestContext.deserialize(data.ctx);
  165. return this.deleteProductInChannel(ctx, data.productId, data.channelId);
  166. }
  167. async assignVariantToChannel(data: VariantChannelMessageData): Promise<boolean> {
  168. const ctx = MutableRequestContext.deserialize(data.ctx);
  169. return this.updateVariantsInChannel(ctx, [data.productVariantId], data.channelId);
  170. }
  171. async removeVariantFromChannel(data: VariantChannelMessageData): Promise<boolean> {
  172. const ctx = MutableRequestContext.deserialize(data.ctx);
  173. const variant = await this.connection.getRepository(ProductVariant).findOne(data.productVariantId);
  174. const languageVariants = variant?.translations.map(t => t.languageCode) ?? [];
  175. await this.removeSearchIndexItems(data.channelId, [data.productVariantId], languageVariants);
  176. return true;
  177. }
  178. async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
  179. const id = data.asset.id;
  180. function getFocalPoint(point?: { x: number; y: number }) {
  181. return point && point.x && point.y ? point : null;
  182. }
  183. const focalPoint = getFocalPoint(data.asset.focalPoint);
  184. await this.connection
  185. .getRepository(SearchIndexItem)
  186. .update({ productAssetId: id }, { productPreviewFocalPoint: focalPoint });
  187. await this.connection
  188. .getRepository(SearchIndexItem)
  189. .update({ productVariantAssetId: id }, { productVariantPreviewFocalPoint: focalPoint });
  190. return true;
  191. }
  192. async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
  193. const id = data.asset.id;
  194. await this.connection
  195. .getRepository(SearchIndexItem)
  196. .update({ productAssetId: id }, { productAssetId: null });
  197. await this.connection
  198. .getRepository(SearchIndexItem)
  199. .update({ productVariantAssetId: id }, { productVariantAssetId: null });
  200. return true;
  201. }
  202. private async updateProductInChannel(
  203. ctx: MutableRequestContext,
  204. productId: ID,
  205. channelId: ID,
  206. ): Promise<boolean> {
  207. const product = await this.connection.getRepository(Product).findOne(productId, {
  208. relations: ['variants'],
  209. });
  210. if (product) {
  211. const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(
  212. product.variants.map(v => v.id),
  213. {
  214. relations: variantRelations,
  215. where: { deletedAt: null },
  216. },
  217. );
  218. if (updatedVariants.length === 0) {
  219. await this.saveSyntheticVariant(ctx, product);
  220. } else {
  221. if (product.enabled === false) {
  222. updatedVariants.forEach(v => (v.enabled = false));
  223. }
  224. const variantsInCurrentChannel = updatedVariants.filter(
  225. v => !!v.channels.find(c => idsAreEqual(c.id, ctx.channelId)),
  226. );
  227. Logger.verbose(`Updating ${variantsInCurrentChannel.length} variants`, workerLoggerCtx);
  228. if (variantsInCurrentChannel.length) {
  229. await this.saveVariants(ctx, variantsInCurrentChannel);
  230. }
  231. }
  232. }
  233. return true;
  234. }
  235. private async updateVariantsInChannel(
  236. ctx: MutableRequestContext,
  237. variantIds: ID[],
  238. channelId: ID,
  239. ): Promise<boolean> {
  240. const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
  241. relations: variantRelations,
  242. where: { deletedAt: null },
  243. });
  244. if (variants) {
  245. Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
  246. await this.saveVariants(ctx, variants);
  247. }
  248. return true;
  249. }
  250. private async deleteProductInChannel(
  251. ctx: RequestContext,
  252. productId: ID,
  253. channelId: ID,
  254. ): Promise<boolean> {
  255. const product = await this.connection.getRepository(Product).findOne(productId, {
  256. relations: ['variants'],
  257. });
  258. if (product) {
  259. const languageVariants = unique([
  260. ...product.translations.map(t => t.languageCode),
  261. ...product.variants
  262. .reduce((vt, v) => [...vt, ...v.translations], [] as Array<Translation<ProductVariant>>)
  263. .map(t => t.languageCode),
  264. ]);
  265. const removedVariantIds = product.variants.map(v => v.id);
  266. if (removedVariantIds.length) {
  267. await this.removeSearchIndexItems(channelId, removedVariantIds, languageVariants);
  268. }
  269. }
  270. return true;
  271. }
  272. private getSearchIndexQueryBuilder(channelId: ID) {
  273. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  274. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  275. relations: variantRelations,
  276. });
  277. FindOptionsUtils.joinEagerRelations(
  278. qb,
  279. qb.alias,
  280. this.connection.rawConnection.getMetadata(ProductVariant),
  281. );
  282. qb.leftJoin('variants.product', 'product')
  283. .leftJoin('product.channels', 'channel')
  284. .where('channel.id = :channelId', { channelId })
  285. .andWhere('variants__product.deletedAt IS NULL')
  286. .andWhere('variants.deletedAt IS NULL');
  287. return qb;
  288. }
  289. private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
  290. const items: SearchIndexItem[] = [];
  291. await this.removeSyntheticVariants(variants);
  292. for (const variant of variants) {
  293. const languageVariants = unique([
  294. ...variant.translations.map(t => t.languageCode),
  295. ...variant.product.translations.map(t => t.languageCode),
  296. ]);
  297. for (const languageCode of languageVariants) {
  298. const productTranslation = this.getTranslation(variant.product, languageCode);
  299. const variantTranslation = this.getTranslation(variant, languageCode);
  300. const collectionTranslations = variant.collections.map(c =>
  301. this.getTranslation(c, languageCode),
  302. );
  303. for (const channel of variant.channels) {
  304. ctx.setChannel(channel);
  305. await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
  306. const item = new SearchIndexItem({
  307. channelId: channel.id,
  308. languageCode,
  309. productVariantId: variant.id,
  310. price: variant.price,
  311. priceWithTax: variant.priceWithTax,
  312. sku: variant.sku,
  313. enabled: variant.product.enabled === false ? false : variant.enabled,
  314. slug: productTranslation.slug,
  315. productId: variant.product.id,
  316. productName: productTranslation.name,
  317. description: this.constrainDescription(productTranslation.description),
  318. productVariantName: variantTranslation.name,
  319. productAssetId: variant.product.featuredAsset
  320. ? variant.product.featuredAsset.id
  321. : null,
  322. productPreviewFocalPoint: variant.product.featuredAsset
  323. ? variant.product.featuredAsset.focalPoint
  324. : null,
  325. productVariantPreviewFocalPoint: variant.featuredAsset
  326. ? variant.featuredAsset.focalPoint
  327. : null,
  328. productVariantAssetId: variant.featuredAsset ? variant.featuredAsset.id : null,
  329. productPreview: variant.product.featuredAsset
  330. ? variant.product.featuredAsset.preview
  331. : '',
  332. productVariantPreview: variant.featuredAsset ? variant.featuredAsset.preview : '',
  333. channelIds: variant.channels.map(c => c.id as string),
  334. facetIds: this.getFacetIds(variant),
  335. facetValueIds: this.getFacetValueIds(variant),
  336. collectionIds: variant.collections.map(c => c.id.toString()),
  337. collectionSlugs: collectionTranslations.map(c => c.slug),
  338. });
  339. if (this.options.indexStockStatus) {
  340. item.inStock =
  341. 0 < (await this.productVariantService.getSaleableStockLevel(ctx, variant));
  342. const productInStock = await this.requestContextCache.get(
  343. ctx,
  344. `productVariantsStock-${variant.productId}`,
  345. () =>
  346. this.connection
  347. .getRepository(ctx, ProductVariant)
  348. .find({
  349. loadEagerRelations: false,
  350. where: {
  351. productId: variant.productId,
  352. },
  353. })
  354. .then(_variants =>
  355. Promise.all(
  356. _variants.map(v =>
  357. this.productVariantService.getSaleableStockLevel(ctx, v),
  358. ),
  359. ),
  360. )
  361. .then(stockLevels => stockLevels.some(stockLevel => 0 < stockLevel)),
  362. );
  363. item.productInStock = productInStock;
  364. }
  365. items.push(item);
  366. }
  367. }
  368. }
  369. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items, { chunk: 2500 }));
  370. }
  371. /**
  372. * If a Product has no variants, we create a synthetic variant for the purposes
  373. * of making that product visible via the search query.
  374. */
  375. private async saveSyntheticVariant(ctx: RequestContext, product: Product) {
  376. const productTranslation = this.getTranslation(product, ctx.languageCode);
  377. const item = new SearchIndexItem({
  378. channelId: ctx.channelId,
  379. languageCode: ctx.languageCode,
  380. productVariantId: 0,
  381. price: 0,
  382. priceWithTax: 0,
  383. sku: '',
  384. enabled: false,
  385. slug: productTranslation.slug,
  386. productId: product.id,
  387. productName: productTranslation.name,
  388. description: this.constrainDescription(productTranslation.description),
  389. productVariantName: productTranslation.name,
  390. productAssetId: product.featuredAsset?.id ?? null,
  391. productPreviewFocalPoint: product.featuredAsset?.focalPoint ?? null,
  392. productVariantPreviewFocalPoint: null,
  393. productVariantAssetId: null,
  394. productPreview: product.featuredAsset?.preview ?? '',
  395. productVariantPreview: '',
  396. channelIds: [ctx.channelId.toString()],
  397. facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
  398. facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
  399. collectionIds: [],
  400. collectionSlugs: [],
  401. });
  402. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(item));
  403. }
  404. /**
  405. * Removes any synthetic variants for the given product
  406. */
  407. private async removeSyntheticVariants(variants: ProductVariant[]) {
  408. const prodIds = unique(variants.map(v => v.productId));
  409. for (const productId of prodIds) {
  410. await this.queue.push(() =>
  411. this.connection.getRepository(SearchIndexItem).delete({
  412. productId,
  413. sku: '',
  414. price: 0,
  415. }),
  416. );
  417. }
  418. }
  419. private getTranslation<T extends Translatable>(
  420. translatable: T,
  421. languageCode: LanguageCode,
  422. ): Translation<T> {
  423. return (translatable.translations.find(t => t.languageCode === languageCode) ||
  424. translatable.translations.find(t => t.languageCode === this.configService.defaultLanguageCode) ||
  425. translatable.translations[0]) as unknown as Translation<T>;
  426. }
  427. private getFacetIds(variant: ProductVariant): string[] {
  428. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  429. const variantFacetIds = variant.facetValues.map(facetIds);
  430. const productFacetIds = variant.product.facetValues.map(facetIds);
  431. return unique([...variantFacetIds, ...productFacetIds]);
  432. }
  433. private getFacetValueIds(variant: ProductVariant): string[] {
  434. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  435. const variantFacetValueIds = variant.facetValues.map(facetValueIds);
  436. const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
  437. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  438. }
  439. /**
  440. * Remove items from the search index
  441. */
  442. private async removeSearchIndexItems(channelId: ID, variantIds: ID[], languageCodes: LanguageCode[]) {
  443. const keys: Array<Partial<SearchIndexItem>> = [];
  444. for (const productVariantId of variantIds) {
  445. for (const languageCode of languageCodes) {
  446. keys.push({
  447. productVariantId,
  448. channelId,
  449. languageCode,
  450. });
  451. }
  452. }
  453. await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(keys as any));
  454. }
  455. /**
  456. * Prevent postgres errors from too-long indices
  457. * https://github.com/vendure-ecommerce/vendure/issues/745
  458. */
  459. private constrainDescription(description: string): string {
  460. const { type } = this.connection.rawConnection.options;
  461. const isPostgresLike = type === 'postgres' || type === 'aurora-data-api-pg' || type === 'cockroachdb';
  462. if (isPostgresLike) {
  463. return description.substring(0, 2600);
  464. }
  465. return description;
  466. }
  467. }