indexer.controller.ts 39 KB


  1. import { Client } from '@elastic/elasticsearch';
  2. import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  3. import { unique } from '@vendure/common/lib/unique';
  4. import {
  5. Asset,
  6. asyncObservable,
  7. AsyncQueue,
  8. Channel,
  9. Collection,
  10. ConfigService,
  11. EntityRelationPaths,
  12. FacetValue,
  13. ID,
  14. LanguageCode,
  15. Logger,
  16. Product,
  17. ProductPriceApplicator,
  18. ProductVariant,
  19. ProductVariantService,
  20. RequestContext,
  21. RequestContextCacheService,
  22. TransactionalConnection,
  23. Translatable,
  24. Translation,
  25. } from '@vendure/core';
  26. import { Observable } from 'rxjs';
  27. import { ELASTIC_SEARCH_OPTIONS, loggerCtx, VARIANT_INDEX_NAME } from '../constants';
  28. import { ElasticsearchOptions } from '../options';
  29. import {
  30. BulkOperation,
  31. BulkOperationDoc,
  32. BulkResponseBody,
  33. ProductChannelMessageData,
  34. ProductIndexItem,
  35. ReindexMessageData,
  36. UpdateAssetMessageData,
  37. UpdateProductMessageData,
  38. UpdateVariantMessageData,
  39. UpdateVariantsByIdMessageData,
  40. VariantChannelMessageData,
  41. VariantIndexItem,
  42. } from '../types';
  43. import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
  44. import { MutableRequestContext } from './mutable-request-context';
  45. const REINDEX_CHUNK_SIZE = 2500;
  46. const REINDEX_OPERATION_CHUNK_SIZE = 3000;
  47. export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
  48. 'variants',
  49. 'featuredAsset',
  50. 'facetValues',
  51. 'facetValues.facet',
  52. 'channels',
  53. 'channels.defaultTaxZone',
  54. ];
  55. export const defaultVariantRelations: Array<EntityRelationPaths<ProductVariant>> = [
  56. 'featuredAsset',
  57. 'facetValues',
  58. 'facetValues.facet',
  59. 'collections',
  60. 'taxCategory',
  61. 'channels',
  62. 'channels.defaultTaxZone',
  63. ];
  64. export interface ReindexMessageResponse {
  65. total: number;
  66. completed: number;
  67. duration: number;
  68. }
  69. type BulkVariantOperation = {
  70. index: typeof VARIANT_INDEX_NAME;
  71. operation: BulkOperation | BulkOperationDoc<VariantIndexItem>;
  72. };
  73. @Injectable()
  74. export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
  75. private client: Client;
  76. private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
  77. private productRelations: Array<EntityRelationPaths<Product>>;
  78. private variantRelations: Array<EntityRelationPaths<ProductVariant>>;
  79. constructor(
  80. private connection: TransactionalConnection,
  81. @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
  82. private productPriceApplicator: ProductPriceApplicator,
  83. private configService: ConfigService,
  84. private productVariantService: ProductVariantService,
  85. private requestContextCache: RequestContextCacheService,
  86. ) {}
  87. onModuleInit(): any {
  88. this.client = getClient(this.options);
  89. this.productRelations = this.getReindexRelationsRelations(
  90. defaultProductRelations,
  91. this.options.hydrateProductRelations,
  92. );
  93. this.variantRelations = this.getReindexRelationsRelations(
  94. defaultVariantRelations,
  95. this.options.hydrateProductVariantRelations,
  96. );
  97. }
  98. onModuleDestroy(): any {
  99. return this.client.close();
  100. }
  101. /**
  102. * Updates the search index only for the affected product.
  103. */
  104. async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
  105. const ctx = MutableRequestContext.deserialize(rawContext);
  106. await this.updateProductsInternal(ctx, [productId]);
  107. return true;
  108. }
  109. /**
  110. * Updates the search index only for the affected product.
  111. */
  112. async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
  113. const operations = await this.deleteProductOperations(
  114. RequestContext.deserialize(rawContext),
  115. productId,
  116. );
  117. await this.executeBulkOperations(operations);
  118. return true;
  119. }
  120. /**
  121. * Updates the search index only for the affected product.
  122. */
  123. async assignProductToChannel({
  124. ctx: rawContext,
  125. productId,
  126. channelId,
  127. }: ProductChannelMessageData): Promise<boolean> {
  128. const ctx = MutableRequestContext.deserialize(rawContext);
  129. await this.updateProductsInternal(ctx, [productId]);
  130. return true;
  131. }
  132. /**
  133. * Updates the search index only for the affected product.
  134. */
  135. async removeProductFromChannel({
  136. ctx: rawContext,
  137. productId,
  138. channelId,
  139. }: ProductChannelMessageData): Promise<boolean> {
  140. const ctx = MutableRequestContext.deserialize(rawContext);
  141. await this.updateProductsInternal(ctx, [productId]);
  142. return true;
  143. }
  144. async assignVariantToChannel({
  145. ctx: rawContext,
  146. productVariantId,
  147. channelId,
  148. }: VariantChannelMessageData): Promise<boolean> {
  149. const productIds = await this.getProductIdsByVariantIds([productVariantId]);
  150. const ctx = MutableRequestContext.deserialize(rawContext);
  151. await this.updateProductsInternal(ctx, productIds);
  152. return true;
  153. }
  154. async removeVariantFromChannel({
  155. ctx: rawContext,
  156. productVariantId,
  157. channelId,
  158. }: VariantChannelMessageData): Promise<boolean> {
  159. const productIds = await this.getProductIdsByVariantIds([productVariantId]);
  160. const ctx = MutableRequestContext.deserialize(rawContext);
  161. await this.updateProductsInternal(ctx, productIds);
  162. return true;
  163. }
  164. /**
  165. * Updates the search index only for the affected entities.
  166. */
  167. async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
  168. const ctx = MutableRequestContext.deserialize(rawContext);
  169. return this.asyncQueue.push(async () => {
  170. const productIds = await this.getProductIdsByVariantIds(variantIds);
  171. await this.updateProductsInternal(ctx, productIds);
  172. return true;
  173. });
  174. }
  175. async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
  176. const ctx = MutableRequestContext.deserialize(rawContext);
  177. const productIds = await this.getProductIdsByVariantIds(variantIds);
  178. for (const productId of productIds) {
  179. await this.updateProductsInternal(ctx, [productId]);
  180. }
  181. return true;
  182. }
  183. updateVariantsById({
  184. ctx: rawContext,
  185. ids,
  186. }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
  187. const ctx = MutableRequestContext.deserialize(rawContext);
  188. return asyncObservable(async observer => {
  189. return this.asyncQueue.push(async () => {
  190. const timeStart = Date.now();
  191. const productIds = await this.getProductIdsByVariantIds(ids);
  192. if (productIds.length) {
  193. let finishedProductsCount = 0;
  194. for (const productId of productIds) {
  195. await this.updateProductsInternal(ctx, [productId]);
  196. finishedProductsCount++;
  197. observer.next({
  198. total: productIds.length,
  199. completed: Math.min(finishedProductsCount, productIds.length),
  200. duration: +new Date() - timeStart,
  201. });
  202. }
  203. }
  204. Logger.verbose(`Completed updating variants`, loggerCtx);
  205. return {
  206. total: productIds.length,
  207. completed: productIds.length,
  208. duration: +new Date() - timeStart,
  209. };
  210. });
  211. });
  212. }
  213. reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
  214. return asyncObservable(async observer => {
  215. return this.asyncQueue.push(async () => {
  216. const timeStart = Date.now();
  217. const ctx = MutableRequestContext.deserialize(rawContext);
  218. const reindexTempName = new Date().getTime();
  219. const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
  220. const variantIndexNameForReindex = VARIANT_INDEX_NAME + `-reindex-${reindexTempName}`;
  221. const reindexVariantAliasName = this.options.indexPrefix + variantIndexNameForReindex;
  222. try {
  223. await createIndices(
  224. this.client,
  225. this.options.indexPrefix,
  226. this.options.indexSettings,
  227. this.options.indexMappingProperties,
  228. true,
  229. `-reindex-${reindexTempName}`,
  230. );
  231. } catch (e) {
  232. Logger.error(`Could not recreate indices.`, loggerCtx);
  233. Logger.error(JSON.stringify(e), loggerCtx);
  234. throw e;
  235. }
  236. const totalProductIds = await this.connection
  237. .getRepository(Product)
  238. .createQueryBuilder('product')
  239. .where('product.deletedAt IS NULL')
  240. .getCount();
  241. Logger.verbose(`Will reindex ${totalProductIds} products`, loggerCtx);
  242. let productIds = [];
  243. let skip = 0;
  244. let finishedProductsCount = 0;
  245. do {
  246. const operations: BulkVariantOperation[] = [];
  247. productIds = await this.connection
  248. .getRepository(Product)
  249. .createQueryBuilder('product')
  250. .select('product.id')
  251. .where('product.deletedAt IS NULL')
  252. .skip(skip)
  253. .take(REINDEX_CHUNK_SIZE)
  254. .getMany();
  255. for (const { id: productId } of productIds) {
  256. operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
  257. finishedProductsCount++;
  258. observer.next({
  259. total: totalProductIds,
  260. completed: Math.min(finishedProductsCount, totalProductIds),
  261. duration: +new Date() - timeStart,
  262. });
  263. }
  264. Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
  265. // Because we can have a huge amount of variant for 1 product, we also chunk update operations
  266. await this.executeBulkOperationsByChunks(
  267. REINDEX_OPERATION_CHUNK_SIZE,
  268. operations,
  269. variantIndexNameForReindex,
  270. );
  271. skip += REINDEX_CHUNK_SIZE;
  272. Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`);
  273. } while (productIds.length >= REINDEX_CHUNK_SIZE);
  274. // Switch the index to the new reindexed one
  275. try {
  276. const reindexVariantAliasExist = await this.client.indices.existsAlias({
  277. name: reindexVariantAliasName,
  278. });
  279. if (reindexVariantAliasExist) {
  280. const reindexVariantIndexName = await getIndexNameByAlias(
  281. this.client,
  282. reindexVariantAliasName,
  283. );
  284. const originalVariantAliasExist = await this.client.indices.existsAlias({
  285. name: variantIndexName,
  286. });
  287. const originalVariantIndexExist = await this.client.indices.exists({
  288. index: variantIndexName,
  289. });
  290. const originalVariantIndexName = await getIndexNameByAlias(
  291. this.client,
  292. variantIndexName,
  293. );
  294. const actions = [
  295. {
  296. remove: {
  297. index: reindexVariantIndexName,
  298. alias: reindexVariantAliasName,
  299. },
  300. },
  301. {
  302. add: {
  303. index: reindexVariantIndexName,
  304. alias: variantIndexName,
  305. },
  306. },
  307. ];
  308. if (originalVariantAliasExist.body) {
  309. actions.push({
  310. remove: {
  311. index: originalVariantIndexName,
  312. alias: variantIndexName,
  313. },
  314. });
  315. } else if (originalVariantIndexExist.body) {
  316. await this.client.indices.delete({
  317. index: [variantIndexName],
  318. });
  319. }
  320. await this.client.indices.updateAliases({
  321. body: {
  322. actions,
  323. },
  324. });
  325. if (originalVariantAliasExist.body) {
  326. await this.client.indices.delete({
  327. index: [originalVariantIndexName],
  328. });
  329. }
  330. }
  331. } catch (e) {
  332. Logger.error('Could not switch indexes');
  333. } finally {
  334. const reindexVariantAliasExist = await this.client.indices.existsAlias({
  335. name: reindexVariantAliasName,
  336. });
  337. if (reindexVariantAliasExist.body) {
  338. const reindexVariantAliasResult = await this.client.indices.getAlias({
  339. name: reindexVariantAliasName,
  340. });
  341. const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
  342. await this.client.indices.delete({
  343. index: [reindexVariantIndexName],
  344. });
  345. }
  346. }
  347. Logger.verbose(`Completed reindexing!`, loggerCtx);
  348. return {
  349. total: totalProductIds,
  350. completed: totalProductIds,
  351. duration: +new Date() - timeStart,
  352. };
  353. });
  354. });
  355. }
  356. async executeBulkOperationsByChunks(
  357. chunkSize: number,
  358. operations: BulkVariantOperation[],
  359. index = VARIANT_INDEX_NAME,
  360. ): Promise<void> {
  361. let i;
  362. let j;
  363. let processedOperation = 0;
  364. for (i = 0, j = operations.length; i < j; i += chunkSize) {
  365. const operationsChunks = operations.slice(i, i + chunkSize);
  366. await this.executeBulkOperations(operationsChunks, index);
  367. processedOperation += operationsChunks.length;
  368. Logger.verbose(
  369. `Executing operation chunks ${processedOperation}/${operations.length}`,
  370. loggerCtx,
  371. );
  372. }
  373. }
  374. async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
  375. const result = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);
  376. await this.client.indices.refresh({
  377. index: [this.options.indexPrefix + VARIANT_INDEX_NAME],
  378. });
  379. return result;
  380. }
  381. async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
  382. const result = await this.deleteAssetForIndex(VARIANT_INDEX_NAME, data.asset);
  383. await this.client.indices.refresh({
  384. index: [this.options.indexPrefix + VARIANT_INDEX_NAME],
  385. });
  386. return result;
  387. }
  388. private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
  389. const focalPoint = asset.focalPoint || null;
  390. const params = { focalPoint };
  391. return this.updateAssetForIndex(
  392. indexName,
  393. asset,
  394. {
  395. source: 'ctx._source.productPreviewFocalPoint = params.focalPoint',
  396. params,
  397. },
  398. {
  399. source: 'ctx._source.productVariantPreviewFocalPoint = params.focalPoint',
  400. params,
  401. },
  402. );
  403. }
  404. private async deleteAssetForIndex(indexName: string, asset: Asset): Promise<boolean> {
  405. return this.updateAssetForIndex(
  406. indexName,
  407. asset,
  408. { source: 'ctx._source.productAssetId = null' },
  409. { source: 'ctx._source.productVariantAssetId = null' },
  410. );
  411. }
  412. private async updateAssetForIndex(
  413. indexName: string,
  414. asset: Asset,
  415. updateProductScript: { source: string; params?: any },
  416. updateVariantScript: { source: string; params?: any },
  417. ): Promise<boolean> {
  418. const result1 = await this.client.update_by_query({
  419. index: this.options.indexPrefix + indexName,
  420. body: {
  421. script: updateProductScript,
  422. query: {
  423. term: {
  424. productAssetId: asset.id,
  425. },
  426. },
  427. },
  428. });
  429. for (const failure of result1.body.failures) {
  430. Logger.error(`${failure.cause.type}: ${failure.cause.reason}`, loggerCtx);
  431. }
  432. const result2 = await this.client.update_by_query({
  433. index: this.options.indexPrefix + indexName,
  434. body: {
  435. script: updateVariantScript,
  436. query: {
  437. term: {
  438. productVariantAssetId: asset.id,
  439. },
  440. },
  441. },
  442. });
  443. for (const failure of result1.body.failures) {
  444. Logger.error(`${failure.cause.type}: ${failure.cause.reason}`, loggerCtx);
  445. }
  446. return result1.body.failures.length === 0 && result2.body.failures === 0;
  447. }
  448. private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
  449. const operations = await this.updateProductsOperations(ctx, productIds);
  450. await this.executeBulkOperations(operations);
  451. }
  452. private async updateProductsOperationsOnly(
  453. ctx: MutableRequestContext,
  454. productId: ID,
  455. ): Promise<BulkVariantOperation[]> {
  456. const operations: BulkVariantOperation[] = [];
  457. let product: Product | undefined;
  458. try {
  459. product = await this.connection.getRepository(Product).findOne(productId, {
  460. relations: this.productRelations,
  461. where: {
  462. deletedAt: null,
  463. },
  464. });
  465. } catch (e) {
  466. Logger.error(e.message, loggerCtx, e.stack);
  467. throw e;
  468. }
  469. if (product) {
  470. const updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
  471. product.variants.map(v => v.id),
  472. {
  473. relations: this.variantRelations,
  474. where: {
  475. deletedAt: null,
  476. },
  477. order: {
  478. id: 'ASC',
  479. },
  480. },
  481. );
  482. // tslint:disable-next-line:no-non-null-assertion
  483. updatedProductVariants.forEach(variant => (variant.product = product!));
  484. if (!product.enabled) {
  485. updatedProductVariants.forEach(v => (v.enabled = false));
  486. }
  487. Logger.debug(`Updating Product (${productId})`, loggerCtx);
  488. const languageVariants: LanguageCode[] = [];
  489. languageVariants.push(...product.translations.map(t => t.languageCode));
  490. for (const variant of product.variants) {
  491. languageVariants.push(...variant.translations.map(t => t.languageCode));
  492. }
  493. const uniqueLanguageVariants = unique(languageVariants);
  494. for (const channel of product.channels) {
  495. ctx.setChannel(channel);
  496. const variantsInChannel = updatedProductVariants.filter(v =>
  497. v.channels.map(c => c.id).includes(ctx.channelId),
  498. );
  499. for (const variant of variantsInChannel) {
  500. await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
  501. }
  502. for (const languageCode of uniqueLanguageVariants) {
  503. if (variantsInChannel.length) {
  504. for (const variant of variantsInChannel) {
  505. operations.push(
  506. {
  507. index: VARIANT_INDEX_NAME,
  508. operation: {
  509. update: {
  510. _id: ElasticsearchIndexerController.getId(
  511. variant.id,
  512. ctx.channelId,
  513. languageCode,
  514. ),
  515. },
  516. },
  517. },
  518. {
  519. index: VARIANT_INDEX_NAME,
  520. operation: {
  521. doc: await this.createVariantIndexItem(
  522. variant,
  523. variantsInChannel,
  524. ctx,
  525. languageCode,
  526. ),
  527. doc_as_upsert: true,
  528. },
  529. },
  530. );
  531. }
  532. } else {
  533. operations.push(
  534. {
  535. index: VARIANT_INDEX_NAME,
  536. operation: {
  537. update: {
  538. _id: ElasticsearchIndexerController.getId(
  539. -product.id,
  540. ctx.channelId,
  541. languageCode,
  542. ),
  543. },
  544. },
  545. },
  546. {
  547. index: VARIANT_INDEX_NAME,
  548. operation: {
  549. doc: this.createSyntheticProductIndexItem(product, ctx, languageCode),
  550. doc_as_upsert: true,
  551. },
  552. },
  553. );
  554. }
  555. }
  556. }
  557. }
  558. return operations;
  559. }
  560. private async updateProductsOperations(
  561. ctx: MutableRequestContext,
  562. productIds: ID[],
  563. ): Promise<BulkVariantOperation[]> {
  564. Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
  565. const operations: BulkVariantOperation[] = [];
  566. for (const productId of productIds) {
  567. operations.push(...(await this.deleteProductOperations(ctx, productId)));
  568. operations.push(...(await this.updateProductsOperationsOnly(ctx, productId)));
  569. }
  570. return operations;
  571. }
  572. /**
  573. * Takes the default relations, and combines them with any extra relations specified in the
  574. * `hydrateProductRelations` and `hydrateProductVariantRelations`. This method also ensures
  575. * that the relation values are unique and that paths are fully expanded.
  576. *
  577. * This means that if a `hydrateProductRelations` value of `['assets.asset']` is specified,
  578. * this method will also add `['assets']` to the relations array, otherwise TypeORM would
  579. * throw an error trying to join a 2nd-level deep relation without the first level also
  580. * being joined.
  581. */
  582. private getReindexRelationsRelations<T extends Product | ProductVariant>(
  583. defaultRelations: Array<EntityRelationPaths<T>>,
  584. hydratedRelations: Array<EntityRelationPaths<T>>,
  585. ): Array<EntityRelationPaths<T>> {
  586. const uniqueRelations = unique([...defaultRelations, ...hydratedRelations]);
  587. for (const relation of hydratedRelations) {
  588. const path = relation.split('.');
  589. const pathToPart: string[] = [];
  590. for (const part of path) {
  591. pathToPart.push(part);
  592. const joinedPath = pathToPart.join('.') as EntityRelationPaths<T>;
  593. if (!uniqueRelations.includes(joinedPath)) {
  594. uniqueRelations.push(joinedPath);
  595. }
  596. }
  597. }
  598. return uniqueRelations;
  599. }
  600. private async deleteProductOperations(
  601. ctx: RequestContext,
  602. productId: ID,
  603. ): Promise<BulkVariantOperation[]> {
  604. const channels = await this.requestContextCache.get(ctx, `elastic-index-all-channels`, () =>
  605. this.connection
  606. .getRepository(Channel)
  607. .createQueryBuilder('channel')
  608. .select('channel.id')
  609. .getMany(),
  610. );
  611. const product = await this.connection.getRepository(Product).findOne(productId, {
  612. relations: ['variants'],
  613. });
  614. if (!product) {
  615. return [];
  616. }
  617. Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
  618. const operations: BulkVariantOperation[] = [];
  619. const languageVariants: LanguageCode[] = [];
  620. languageVariants.push(...product.translations.map(t => t.languageCode));
  621. for (const variant of product.variants) {
  622. languageVariants.push(...variant.translations.map(t => t.languageCode));
  623. }
  624. const uniqueLanguageVariants = unique(languageVariants);
  625. for (const { id: channelId } of channels) {
  626. for (const languageCode of uniqueLanguageVariants) {
  627. operations.push({
  628. index: VARIANT_INDEX_NAME,
  629. operation: {
  630. delete: {
  631. _id: ElasticsearchIndexerController.getId(-product.id, channelId, languageCode),
  632. },
  633. },
  634. });
  635. }
  636. }
  637. operations.push(
  638. ...(await this.deleteVariantsInternalOperations(
  639. product.variants,
  640. channels.map(c => c.id),
  641. uniqueLanguageVariants,
  642. )),
  643. );
  644. return operations;
  645. }
  646. private async deleteVariantsInternalOperations(
  647. variants: ProductVariant[],
  648. channelIds: ID[],
  649. languageVariants: LanguageCode[],
  650. ): Promise<BulkVariantOperation[]> {
  651. Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
  652. const operations: BulkVariantOperation[] = [];
  653. for (const variant of variants) {
  654. for (const channelId of channelIds) {
  655. for (const languageCode of languageVariants) {
  656. operations.push({
  657. index: VARIANT_INDEX_NAME,
  658. operation: {
  659. delete: {
  660. _id: ElasticsearchIndexerController.getId(
  661. variant.id,
  662. channelId,
  663. languageCode,
  664. ),
  665. },
  666. },
  667. });
  668. }
  669. }
  670. }
  671. return operations;
  672. }
  673. private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
  674. const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
  675. relations: ['product'],
  676. loadEagerRelations: false,
  677. });
  678. return unique(variants.map(v => v.product.id));
  679. }
  680. private async executeBulkOperations(operations: BulkVariantOperation[], indexName = VARIANT_INDEX_NAME) {
  681. const variantOperations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
  682. for (const operation of operations) {
  683. variantOperations.push(operation.operation);
  684. }
  685. return Promise.all([this.runBulkOperationsOnIndex(indexName, variantOperations)]);
  686. }
  687. private async runBulkOperationsOnIndex(
  688. indexName: string,
  689. operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
  690. ) {
  691. if (operations.length === 0) {
  692. return;
  693. }
  694. try {
  695. const fullIndexName = this.options.indexPrefix + indexName;
  696. const { body }: { body: BulkResponseBody } = await this.client.bulk({
  697. refresh: true,
  698. index: fullIndexName,
  699. body: operations,
  700. });
  701. if (body.errors) {
  702. Logger.error(
  703. `Some errors occurred running bulk operations on ${fullIndexName}! Set logger to "debug" to print all errors.`,
  704. loggerCtx,
  705. );
  706. body.items.forEach(item => {
  707. if (item.index) {
  708. Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
  709. }
  710. if (item.update) {
  711. Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
  712. }
  713. if (item.delete) {
  714. Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
  715. }
  716. });
  717. } else {
  718. Logger.debug(
  719. `Executed ${body.items.length} bulk operations on index [${fullIndexName}]`,
  720. loggerCtx,
  721. );
  722. }
  723. return body;
  724. } catch (e) {
  725. Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
  726. Logger.error('Error details: ' + JSON.stringify(e.body?.error, null, 2), loggerCtx);
  727. }
  728. }
  729. private async createVariantIndexItem(
  730. v: ProductVariant,
  731. variants: ProductVariant[],
  732. ctx: RequestContext,
  733. languageCode: LanguageCode,
  734. ): Promise<VariantIndexItem> {
  735. try {
  736. const productAsset = v.product.featuredAsset;
  737. const variantAsset = v.featuredAsset;
  738. const productTranslation = this.getTranslation(v.product, languageCode);
  739. const variantTranslation = this.getTranslation(v, languageCode);
  740. const collectionTranslations = v.collections.map(c => this.getTranslation(c, languageCode));
  741. const productCollectionTranslations = variants.reduce(
  742. (translations, variant) => [
  743. ...translations,
  744. ...variant.collections.map(c => this.getTranslation(c, languageCode)),
  745. ],
  746. [] as Array<Translation<Collection>>,
  747. );
  748. const prices = variants.map(variant => variant.price);
  749. const pricesWithTax = variants.map(variant => variant.priceWithTax);
  750. const item: VariantIndexItem = {
  751. channelId: ctx.channelId,
  752. languageCode,
  753. productVariantId: v.id,
  754. sku: v.sku,
  755. slug: productTranslation.slug,
  756. productId: v.product.id,
  757. productName: productTranslation.name,
  758. productAssetId: productAsset ? productAsset.id : undefined,
  759. productPreview: productAsset ? productAsset.preview : '',
  760. productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
  761. productVariantName: variantTranslation.name,
  762. productVariantAssetId: variantAsset ? variantAsset.id : undefined,
  763. productVariantPreview: variantAsset ? variantAsset.preview : '',
  764. productVariantPreviewFocalPoint: variantAsset
  765. ? variantAsset.focalPoint || undefined
  766. : undefined,
  767. price: v.price,
  768. priceWithTax: v.priceWithTax,
  769. currencyCode: v.currencyCode,
  770. description: productTranslation.description,
  771. facetIds: this.getFacetIds([v]),
  772. channelIds: v.channels.map(c => c.id),
  773. facetValueIds: this.getFacetValueIds([v]),
  774. collectionIds: v.collections.map(c => c.id.toString()),
  775. collectionSlugs: collectionTranslations.map(c => c.slug),
  776. enabled: v.enabled && v.product.enabled,
  777. productEnabled: variants.some(variant => variant.enabled) && v.product.enabled,
  778. productPriceMin: Math.min(...prices),
  779. productPriceMax: Math.max(...prices),
  780. productPriceWithTaxMin: Math.min(...pricesWithTax),
  781. productPriceWithTaxMax: Math.max(...pricesWithTax),
  782. productFacetIds: this.getFacetIds(variants),
  783. productFacetValueIds: this.getFacetValueIds(variants),
  784. productCollectionIds: unique(
  785. variants.reduce(
  786. (ids, variant) => [...ids, ...variant.collections.map(c => c.id)],
  787. [] as ID[],
  788. ),
  789. ),
  790. productCollectionSlugs: unique(productCollectionTranslations.map(c => c.slug)),
  791. productChannelIds: v.product.channels.map(c => c.id),
  792. inStock: 0 < (await this.productVariantService.getSaleableStockLevel(ctx, v)),
  793. productInStock: await this.getProductInStockValue(ctx, variants),
  794. };
  795. const variantCustomMappings = Object.entries(this.options.customProductVariantMappings);
  796. for (const [name, def] of variantCustomMappings) {
  797. item[`variant-${name}`] = def.valueFn(v, languageCode);
  798. }
  799. const productCustomMappings = Object.entries(this.options.customProductMappings);
  800. for (const [name, def] of productCustomMappings) {
  801. item[`product-${name}`] = def.valueFn(v.product, variants, languageCode);
  802. }
  803. return item;
  804. } catch (err) {
  805. Logger.error(err.toString());
  806. throw Error(`Error while reindexing!`);
  807. }
  808. }
  809. private async getProductInStockValue(ctx: RequestContext, variants: ProductVariant[]): Promise<boolean> {
  810. return this.requestContextCache.get(
  811. ctx,
  812. `elastic-index-product-in-stock-${variants.map(v => v.id).join(',')}`,
  813. async () => {
  814. const stockLevels = await Promise.all(
  815. variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)),
  816. );
  817. return stockLevels.some(stockLevel => 0 < stockLevel);
  818. },
  819. );
  820. }
  821. /**
  822. * If a Product has no variants, we create a synthetic variant for the purposes
  823. * of making that product visible via the search query.
  824. */
  825. private createSyntheticProductIndexItem(
  826. product: Product,
  827. ctx: RequestContext,
  828. languageCode: LanguageCode,
  829. ): VariantIndexItem {
  830. const productTranslation = this.getTranslation(product, languageCode);
  831. const productAsset = product.featuredAsset;
  832. const item: VariantIndexItem = {
  833. channelId: ctx.channelId,
  834. languageCode,
  835. productVariantId: 0,
  836. sku: '',
  837. slug: productTranslation.slug,
  838. productId: product.id,
  839. productName: productTranslation.name,
  840. productAssetId: productAsset ? productAsset.id : undefined,
  841. productPreview: productAsset ? productAsset.preview : '',
  842. productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
  843. productVariantName: productTranslation.name,
  844. productVariantAssetId: undefined,
  845. productVariantPreview: '',
  846. productVariantPreviewFocalPoint: undefined,
  847. price: 0,
  848. priceWithTax: 0,
  849. currencyCode: ctx.channel.currencyCode,
  850. description: productTranslation.description,
  851. facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
  852. channelIds: [ctx.channelId],
  853. facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
  854. collectionIds: [],
  855. collectionSlugs: [],
  856. enabled: false,
  857. productEnabled: false,
  858. productPriceMin: 0,
  859. productPriceMax: 0,
  860. productPriceWithTaxMin: 0,
  861. productPriceWithTaxMax: 0,
  862. productFacetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
  863. productFacetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
  864. productCollectionIds: [],
  865. productCollectionSlugs: [],
  866. productChannelIds: product.channels.map(c => c.id),
  867. inStock: false,
  868. productInStock: false,
  869. };
  870. const productCustomMappings = Object.entries(this.options.customProductMappings);
  871. for (const [name, def] of productCustomMappings) {
  872. item[`product-${name}`] = def.valueFn(product, [], languageCode);
  873. }
  874. return item;
  875. }
  876. private getTranslation<T extends Translatable>(
  877. translatable: T,
  878. languageCode: LanguageCode,
  879. ): Translation<T> {
  880. return (translatable.translations.find(t => t.languageCode === languageCode) ||
  881. translatable.translations.find(t => t.languageCode === this.configService.defaultLanguageCode) ||
  882. translatable.translations[0]) as unknown as Translation<T>;
  883. }
  884. private getFacetIds(variants: ProductVariant[]): string[] {
  885. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  886. const variantFacetIds = variants.reduce(
  887. (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
  888. [] as string[],
  889. );
  890. const productFacetIds = variants[0].product.facetValues.map(facetIds);
  891. return unique([...variantFacetIds, ...productFacetIds]);
  892. }
  893. private getFacetValueIds(variants: ProductVariant[]): string[] {
  894. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  895. const variantFacetValueIds = variants.reduce(
  896. (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
  897. [] as string[],
  898. );
  899. const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
  900. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  901. }
  902. private static getId(entityId: ID, channelId: ID, languageCode: LanguageCode): string {
  903. return `${channelId.toString()}_${entityId.toString()}_${languageCode}`;
  904. }
  905. }