indexer.controller.ts 43 KB


  1. import { Client } from '@elastic/elasticsearch';
  2. import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  3. import { ModuleRef } from '@nestjs/core';
  4. import { unique } from '@vendure/common/lib/unique';
  5. import {
  6. Asset,
  7. asyncObservable,
  8. AsyncQueue,
  9. Channel,
  10. Collection,
  11. ConfigService,
  12. EntityRelationPaths,
  13. FacetValue,
  14. ID,
  15. Injector,
  16. InternalServerError,
  17. LanguageCode,
  18. Logger,
  19. MutableRequestContext,
  20. Product,
  21. ProductPriceApplicator,
  22. ProductVariant,
  23. ProductVariantService,
  24. RequestContext,
  25. RequestContextCacheService,
  26. TransactionalConnection,
  27. Translatable,
  28. Translation,
  29. } from '@vendure/core';
  30. import { Observable } from 'rxjs';
  31. import { In, IsNull } from 'typeorm';
  32. import { ELASTIC_SEARCH_OPTIONS, loggerCtx, VARIANT_INDEX_NAME } from '../constants';
  33. import { ElasticsearchOptions } from '../options';
  34. import {
  35. BulkOperation,
  36. BulkOperationDoc,
  37. ProductChannelMessageData,
  38. ProductIndexItem,
  39. ReindexMessageData,
  40. UpdateAssetMessageData,
  41. UpdateProductMessageData,
  42. UpdateVariantMessageData,
  43. UpdateVariantsByIdMessageData,
  44. VariantChannelMessageData,
  45. VariantIndexItem,
  46. } from '../types';
  47. import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
  /**
   * Relations that are always joined when a Product is loaded for indexing.
   * `onModuleInit` merges these with any paths supplied through the
   * `hydrateProductRelations` option.
   */
  export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
    'featuredAsset',
    'facetValues',
    'facetValues.facet',
    'channels',
    'channels.defaultTaxZone',
  ];
  /**
   * Relations that are always joined when ProductVariants are loaded for indexing.
   * `onModuleInit` merges these with any paths supplied through the
   * `hydrateProductVariantRelations` option.
   */
  export const defaultVariantRelations: Array<EntityRelationPaths<ProductVariant>> = [
    'featuredAsset',
    'facetValues',
    'facetValues.facet',
    'collections',
    'taxCategory',
    'channels',
    'channels.defaultTaxZone',
  ];
  /**
   * Progress report emitted while a reindex / bulk variant update is running,
   * and also returned as the final result once the job has finished.
   */
  export interface ReindexMessageResponse {
    /** Number of products the job has to process in total. */
    total: number;
    /** Number of products processed so far. */
    completed: number;
    /** Milliseconds elapsed since the job started. */
    duration: number;
  }
  /**
   * A single line of a bulk request body, tagged with the variant index constant.
   * `operation` is either an action line (e.g. `update` / `delete`) or the document
   * line that accompanies an `update` action.
   */
  type BulkVariantOperation = {
    index: typeof VARIANT_INDEX_NAME;
    operation: BulkOperation | BulkOperationDoc<VariantIndexItem>;
  };
  73. @Injectable()
  74. export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
  /** Elasticsearch client; created in `onModuleInit` and closed in `onModuleDestroy`. */
  private client: Client;
  /** Queue on which `updateVariants`, `updateVariantsById` and `reindex` schedule their work. */
  private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
  /** Relations joined when loading a Product for indexing (defaults plus configured extras). */
  private productRelations: Array<EntityRelationPaths<Product>>;
  /** Relations joined when loading ProductVariants for indexing (defaults plus configured extras). */
  private variantRelations: Array<EntityRelationPaths<ProductVariant>>;
  /** Injector handed to the custom mapping `valueFn` callbacks; created in `onModuleInit`. */
  private injector: Injector;
  /**
   * All collaborators are supplied through dependency injection; the plugin options
   * are resolved via the `ELASTIC_SEARCH_OPTIONS` token.
   */
  constructor(
    private connection: TransactionalConnection,
    @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
    private productPriceApplicator: ProductPriceApplicator,
    private configService: ConfigService,
    private productVariantService: ProductVariantService,
    private requestContextCache: RequestContextCacheService,
    private moduleRef: ModuleRef,
  ) {}
  /**
   * Lifecycle hook: creates the Elasticsearch client, resolves the relation lists used
   * when loading products and variants, and prepares the injector for custom mappings.
   */
  onModuleInit(): any {
    const { hydrateProductRelations, hydrateProductVariantRelations } = this.options;

    this.client = getClient(this.options);
    this.productRelations = this.getReindexRelations(defaultProductRelations, hydrateProductRelations);
    this.variantRelations = this.getReindexRelations(
      defaultVariantRelations,
      hydrateProductVariantRelations,
    );
    this.injector = new Injector(this.moduleRef);
  }
  /** Lifecycle hook: closes the Elasticsearch client and returns the result of `close()`. */
  onModuleDestroy(): any {
    return this.client.close();
  }
  /**
   * Rebuilds the search index entries of the single affected product.
   *
   * @returns `true` once the index has been updated.
   */
  async updateProduct(message: UpdateProductMessageData): Promise<boolean> {
    const ctx = MutableRequestContext.deserialize(message.ctx);
    await this.updateProductsInternal(ctx, [message.productId]);
    return true;
  }
  /**
   * Removes the search index entries of the given product (and of its variants).
   *
   * @returns `true` once the delete operations have been issued.
   */
  async deleteProduct(message: UpdateProductMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(message.ctx);
    await this.deleteProductOperations(ctx, message.productId);
    return true;
  }
  /**
   * Re-indexes the product after it has been assigned to a channel. The whole product
   * is rebuilt, so the `channelId` carried by the message is not needed here.
   *
   * @returns `true` once the index has been updated.
   */
  async assignProductToChannel(message: ProductChannelMessageData): Promise<boolean> {
    const ctx = MutableRequestContext.deserialize(message.ctx);
    await this.updateProductsInternal(ctx, [message.productId]);
    return true;
  }
  /**
   * Re-indexes the product after it has been removed from a channel. The whole product
   * is rebuilt, so the `channelId` carried by the message is not needed here.
   *
   * @returns `true` once the index has been updated.
   */
  async removeProductFromChannel(message: ProductChannelMessageData): Promise<boolean> {
    const ctx = MutableRequestContext.deserialize(message.ctx);
    await this.updateProductsInternal(ctx, [message.productId]);
    return true;
  }
  /**
   * Re-indexes the product owning the variant after the variant has been assigned to
   * a channel.
   *
   * @returns `true` once the index has been updated.
   */
  async assignVariantToChannel(message: VariantChannelMessageData): Promise<boolean> {
    const owningProductIds = await this.getProductIdsByVariantIds([message.productVariantId]);
    const ctx = MutableRequestContext.deserialize(message.ctx);
    await this.updateProductsInternal(ctx, owningProductIds);
    return true;
  }
  /**
   * Re-indexes the product owning the variant after the variant has been removed from
   * a channel.
   *
   * @returns `true` once the index has been updated.
   */
  async removeVariantFromChannel(message: VariantChannelMessageData): Promise<boolean> {
    const owningProductIds = await this.getProductIdsByVariantIds([message.productVariantId]);
    const ctx = MutableRequestContext.deserialize(message.ctx);
    await this.updateProductsInternal(ctx, owningProductIds);
    return true;
  }
  /**
   * Re-indexes the products owning the given variants. The work is scheduled on the
   * controller's async queue.
   *
   * @returns `true` once the queued task has completed.
   */
  async updateVariants(message: UpdateVariantMessageData): Promise<boolean> {
    const ctx = MutableRequestContext.deserialize(message.ctx);
    const task = async (): Promise<boolean> => {
      const owningProductIds = await this.getProductIdsByVariantIds(message.variantIds);
      await this.updateProductsInternal(ctx, owningProductIds);
      return true;
    };
    return this.asyncQueue.push(task);
  }
  /**
   * Handles variant deletion by re-indexing, one product at a time, every product that
   * owns one of the given variants.
   *
   * @returns `true` once all owning products have been re-indexed.
   */
  async deleteVariants(message: UpdateVariantMessageData): Promise<boolean> {
    const ctx = MutableRequestContext.deserialize(message.ctx);
    const owningProductIds = await this.getProductIdsByVariantIds(message.variantIds);
    for (const owningProductId of owningProductIds) {
      await this.updateProductsInternal(ctx, [owningProductId]);
    }
    return true;
  }
  /**
   * Re-indexes the products owning the given variant ids, one product at a time, on the
   * async queue. A progress report is emitted after each product and a final report is
   * returned when all are done.
   */
  updateVariantsById({
    ctx: rawContext,
    ids,
  }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
    const ctx = MutableRequestContext.deserialize(rawContext);
    return asyncObservable(async observer =>
      this.asyncQueue.push(async () => {
        const startedAt = Date.now();
        const productIds = await this.getProductIdsByVariantIds(ids);
        const total = productIds.length;
        let done = 0;
        for (const productId of productIds) {
          await this.updateProductsInternal(ctx, [productId]);
          done += 1;
          observer.next({
            total,
            completed: Math.min(done, total),
            duration: Date.now() - startedAt,
          });
        }
        Logger.verbose('Completed updating variants', loggerCtx);
        return {
          total,
          completed: total,
          duration: Date.now() - startedAt,
        };
      }),
    );
  }
  /**
   * Rebuilds the complete variant index.
   *
   * New indices are created under a `-reindex-<timestamp>` suffix, every non-deleted
   * product is indexed into them chunk by chunk, and finally the live alias is switched
   * over to the new index. The work runs on the async queue; a progress report is emitted
   * after each product and a final report is returned at the end.
   *
   * @throws Re-throws any error raised while creating the temporary indices.
   */
  reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
    return asyncObservable(async observer => {
      return this.asyncQueue.push(async () => {
        const timeStart = Date.now();
        const ctx = MutableRequestContext.deserialize(rawContext);
        const reindexTempName = new Date().getTime();
        const variantIndexName = `${this.options.indexPrefix}${VARIANT_INDEX_NAME}`;
        const variantIndexNameForReindex = `${VARIANT_INDEX_NAME}-reindex-${reindexTempName}`;
        const reindexVariantAliasName = `${this.options.indexPrefix}${variantIndexNameForReindex}`;
        try {
          await createIndices(
            this.client,
            this.options.indexPrefix,
            this.options.indexSettings,
            this.options.indexMappingProperties,
            true,
            `-reindex-${reindexTempName}`,
          );
        } catch (e: any) {
          Logger.error('Could not recreate indices.', loggerCtx);
          Logger.error(JSON.stringify(e), loggerCtx);
          throw e;
        }
        const totalProductIds = await this.connection.rawConnection
          .getRepository(Product)
          .createQueryBuilder('product')
          .where('product.deletedAt IS NULL')
          .getCount();
        Logger.verbose(`Will reindex ${totalProductIds} products`, loggerCtx);
        const chunkSize = this.options.reindexProductsChunkSize;
        let productIds: Product[] = [];
        let skip = 0;
        let finishedProductsCount = 0;
        do {
          productIds = await this.connection.rawConnection
            .getRepository(Product)
            .createQueryBuilder('product')
            .select('product.id')
            .where('product.deletedAt IS NULL')
            // Offset pagination needs a stable order, otherwise consecutive pages may
            // overlap or skip rows and some products would never be indexed.
            .orderBy('product.id', 'ASC')
            .skip(skip)
            .take(chunkSize)
            .getMany();
          for (const { id: productId } of productIds) {
            await this.updateProductsOperationsOnly(ctx, productId, variantIndexNameForReindex);
            finishedProductsCount++;
            observer.next({
              total: totalProductIds,
              completed: Math.min(finishedProductsCount, totalProductIds),
              duration: +new Date() - timeStart,
            });
          }
          skip += chunkSize;
          Logger.verbose(`Done ${finishedProductsCount} / ${totalProductIds} products`, loggerCtx);
          // A page shorter than the chunk size means the last page has been read.
        } while (productIds.length >= chunkSize);
        // Switch the index to the new reindexed one
        await this.switchAlias(reindexVariantAliasName, variantIndexName);
        Logger.verbose('Completed reindexing!', loggerCtx);
        return {
          total: totalProductIds,
          completed: totalProductIds,
          duration: +new Date() - timeStart,
        };
      });
    });
  }
  /**
   * Sends `operations` to Elasticsearch in consecutive chunks of roughly `chunkSize`
   * lines, awaiting each chunk before sending the next.
   *
   * @param chunkSize Desired number of bulk lines per request. Values that are not a
   *   finite number >= 1 cause everything to be sent in a single chunk.
   * @param operations Bulk lines (action lines and their document lines) to send.
   * @param index Index name (without prefix) the operations are executed against.
   */
  async executeBulkOperationsByChunks(
    chunkSize: number,
    operations: BulkVariantOperation[],
    index = VARIANT_INDEX_NAME,
  ): Promise<void> {
    Logger.verbose(
      `Will execute ${operations.length} bulk update operations with index ${index}`,
      loggerCtx,
    );
    // A zero, negative or NaN chunk size would never advance the loop below.
    const step =
      Number.isFinite(chunkSize) && chunkSize >= 1 ? Math.floor(chunkSize) : operations.length;
    let processedOperation = 0;
    let start = 0;
    while (start < operations.length) {
      let end = Math.min(start + step, operations.length);
      // An `update` action line must be followed by its document line in the same
      // request, so never cut a chunk directly in front of a document line.
      while (end < operations.length && 'doc' in operations[end].operation) {
        end++;
      }
      const operationsChunks = operations.slice(start, end);
      await this.executeBulkOperations(operationsChunks, index);
      processedOperation += operationsChunks.length;
      Logger.verbose(
        `Executing operation chunks ${processedOperation}/${operations.length}`,
        loggerCtx,
      );
      start = end;
    }
  }
  /**
   * Writes the asset's current focal point into the indexed documents that reference
   * the asset, then refreshes the variant index.
   *
   * @returns `true` when the update reported no failures.
   */
  async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
    const succeeded = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);
    const index = [this.options.indexPrefix + VARIANT_INDEX_NAME];
    await this.client.indices.refresh({ index });
    return succeeded;
  }
  /**
   * Clears the asset id from the indexed documents that reference the asset, then
   * refreshes the variant index.
   *
   * @returns `true` when the update reported no failures.
   */
  async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
    const succeeded = await this.deleteAssetForIndex(VARIANT_INDEX_NAME, data.asset);
    const index = [this.options.indexPrefix + VARIANT_INDEX_NAME];
    await this.client.indices.refresh({ index });
    return succeeded;
  }
  /**
   * Sets the product / variant preview focal point of every document that uses `asset`
   * to the asset's focal point (or `null` when the asset has none).
   */
  private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
    const params = { focalPoint: asset.focalPoint || null };
    const productScript = {
      source: 'ctx._source.productPreviewFocalPoint = params.focalPoint',
      params,
    };
    const variantScript = {
      source: 'ctx._source.productVariantPreviewFocalPoint = params.focalPoint',
      params,
    };
    return this.updateAssetForIndex(indexName, asset, productScript, variantScript);
  }
  /**
   * Nulls the product / variant asset id of every document that uses `asset`.
   */
  private async deleteAssetForIndex(indexName: string, asset: Asset): Promise<boolean> {
    const productScript = { source: 'ctx._source.productAssetId = null' };
    const variantScript = { source: 'ctx._source.productVariantAssetId = null' };
    return this.updateAssetForIndex(indexName, asset, productScript, variantScript);
  }
  /**
   * Runs two update-by-query requests against the index: first `updateProductScript` on
   * documents whose `productAssetId` is the asset, then `updateVariantScript` on
   * documents whose `productVariantAssetId` is the asset. Reported failures are logged.
   *
   * @returns `true` when neither request reported any failure.
   */
  private async updateAssetForIndex(
    indexName: string,
    asset: Asset,
    updateProductScript: { source: string; params?: any },
    updateVariantScript: { source: string; params?: any },
  ): Promise<boolean> {
    const index = this.options.indexPrefix + indexName;

    // Applies one script to the documents matching `assetField`, logs every failure
    // and reports whether the request was failure-free.
    const applyScript = async (
      script: { source: string; params?: any },
      assetField: 'productAssetId' | 'productVariantAssetId',
    ): Promise<boolean> => {
      const response = await this.client.updateByQuery(
        {
          index,
          script,
          query: { term: { [assetField]: asset.id } },
        },
        { meta: true },
      );
      const failures = response.body.failures ?? [];
      for (const failure of failures) {
        Logger.error(`${failure.cause.type}: ${failure.cause.reason as string}`, loggerCtx);
      }
      return failures.length === 0;
    };

    const productDocsOk = await applyScript(updateProductScript, 'productAssetId');
    const variantDocsOk = await applyScript(updateVariantScript, 'productVariantAssetId');
    return productDocsOk && variantDocsOk;
  }
  /** Re-indexes the given products; thin wrapper around `updateProductsOperations`. */
  private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
    await this.updateProductsOperations(ctx, productIds);
  }
  /**
   * Makes the freshly built reindex index the live one.
   *
   * When the reindex alias exists, the alias `variantIndexName` is moved onto the new
   * index and whatever previously served `variantIndexName` (an aliased index, or a
   * concrete index of that name) is deleted. Errors are logged and not re-thrown.
   * Afterwards, if the reindex alias still exists (i.e. the switch did not happen),
   * the index behind it is deleted so no temporary index is left over.
   *
   * @param reindexVariantAliasName Fully prefixed alias of the temporary reindex index.
   * @param variantIndexName Fully prefixed name of the live variant index / alias.
   */
  private async switchAlias(reindexVariantAliasName: string, variantIndexName: string): Promise<void> {
    try {
      const reindexVariantAliasExist = await this.client.indices.existsAlias(
        { name: reindexVariantAliasName },
        { meta: true },
      );
      // With `meta: true` the call resolves to a result wrapper, which is always truthy;
      // the actual boolean answer is in `.body`.
      if (reindexVariantAliasExist.body) {
        const reindexVariantIndexName = await getIndexNameByAlias(
          this.client,
          reindexVariantAliasName,
        );
        const originalVariantAliasExist = await this.client.indices.existsAlias(
          { name: variantIndexName },
          { meta: true },
        );
        const originalVariantIndexExist = await this.client.indices.exists(
          { index: variantIndexName },
          { meta: true },
        );
        const originalVariantIndexName = await getIndexNameByAlias(this.client, variantIndexName);
        const actions = [
          {
            remove: {
              index: reindexVariantIndexName,
              alias: reindexVariantAliasName,
            },
          },
          {
            add: {
              index: reindexVariantIndexName,
              alias: variantIndexName,
            },
          },
        ];
        if (originalVariantAliasExist.body) {
          actions.push({
            remove: {
              index: originalVariantIndexName,
              alias: variantIndexName,
            },
          });
        } else if (originalVariantIndexExist.body) {
          // The live name is a concrete index; it must go before the name can be
          // re-used as an alias.
          await this.client.indices.delete({
            index: [variantIndexName],
          });
        }
        await this.client.indices.updateAliases({
          actions,
        });
        if (originalVariantAliasExist.body && originalVariantIndexName) {
          await this.client.indices.delete({
            index: [originalVariantIndexName],
          });
        }
      } else {
        Logger.error(
          `Could not switch indexes: alias "${reindexVariantAliasName}" does not exist`,
          loggerCtx,
        );
      }
    } catch (e: any) {
      Logger.error('Could not switch indexes', loggerCtx);
      Logger.error(e?.message ?? JSON.stringify(e), loggerCtx, e?.stack);
    } finally {
      const reindexVariantAliasExist = await this.client.indices.existsAlias(
        { name: reindexVariantAliasName },
        { meta: true },
      );
      if (reindexVariantAliasExist.body) {
        const reindexVariantAliasResult = await this.client.indices.getAlias(
          { name: reindexVariantAliasName },
          { meta: true },
        );
        const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
        await this.client.indices.delete({
          index: [reindexVariantIndexName],
        });
      }
    }
  }
  /**
   * Builds and sends the upsert operations for one product.
   *
   * For every channel of the product and every language found in the product's or its
   * variants' translations, one document is upserted per variant assigned to that
   * channel. If the product has no variant in a channel, a single synthetic product
   * document is upserted instead. Operations are sent in batches once the configured
   * `reindexBulkOperationSizeLimit` is reached.
   *
   * `ctx` is switched to each channel in turn and is always switched back to its
   * original channel, even when indexing fails.
   *
   * @param ctx Mutable request context; its channel is temporarily changed.
   * @param productId Id of the product to index. Nothing happens if no non-deleted
   *   product with this id is found.
   * @param index Index name (without prefix) to write to.
   * @throws Re-throws errors from loading the product and from building documents.
   */
  private async updateProductsOperationsOnly(
    ctx: MutableRequestContext,
    productId: ID,
    index = VARIANT_INDEX_NAME,
  ): Promise<void> {
    const bulkLimit = this.options.reindexBulkOperationSizeLimit;
    let operations: BulkVariantOperation[] = [];

    // Because we can have a huge amount of variant for 1 product, we also chunk update
    // operations: the buffer is sent and emptied as soon as it reaches the limit.
    const flushIfLimitReached = async (): Promise<void> => {
      if (operations.length >= bulkLimit) {
        await this.executeBulkOperationsByChunks(bulkLimit, operations, index);
        operations = [];
      }
    };

    let loadedProduct: Product | undefined;
    try {
      const matches = await this.connection.getRepository(ctx, Product).find({
        where: { id: productId, deletedAt: IsNull() },
        relations: this.productRelations,
      });
      loadedProduct = matches[0] ?? undefined;
    } catch (e: any) {
      Logger.error(e.message, loggerCtx, e.stack);
      throw e;
    }
    if (!loadedProduct) {
      return;
    }
    // `const` binding so the narrowing survives inside the callbacks below.
    const product = loadedProduct;

    let updatedProductVariants: ProductVariant[] = [];
    try {
      updatedProductVariants = await this.connection.rawConnection.getRepository(ProductVariant).find({
        relations: this.variantRelations,
        where: {
          productId,
          deletedAt: IsNull(),
        },
        order: {
          id: 'ASC',
        },
      });
    } catch (e: any) {
      // NOTE(review): a failure here is only logged, so the product is then indexed as
      // if it had no variants — confirm this best-effort behaviour is intended.
      Logger.error(e.message, loggerCtx, e.stack);
    }
    updatedProductVariants.forEach(variant => (variant.product = product));
    if (!product.enabled) {
      updatedProductVariants.forEach(v => (v.enabled = false));
    }
    Logger.debug(`Updating Product (${productId})`, loggerCtx);

    const languageVariants: LanguageCode[] = [];
    languageVariants.push(...product.translations.map(t => t.languageCode));
    for (const variant of updatedProductVariants) {
      languageVariants.push(...variant.translations.map(t => t.languageCode));
    }
    const uniqueLanguageVariants = unique(languageVariants);

    const originalChannel = ctx.channel;
    try {
      for (const channel of product.channels) {
        ctx.setChannel(channel);
        const variantsInChannel = updatedProductVariants.filter(v =>
          v.channels.map(c => c.id).includes(ctx.channelId),
        );
        for (const variant of variantsInChannel) {
          await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
        }
        for (const languageCode of uniqueLanguageVariants) {
          if (variantsInChannel.length) {
            for (const variant of variantsInChannel) {
              operations.push(
                {
                  index: VARIANT_INDEX_NAME,
                  operation: {
                    update: {
                      _id: ElasticsearchIndexerController.getId(
                        variant.id,
                        ctx.channelId,
                        languageCode,
                      ),
                    },
                  },
                },
                {
                  index: VARIANT_INDEX_NAME,
                  operation: {
                    doc: await this.createVariantIndexItem(
                      variant,
                      variantsInChannel,
                      ctx,
                      languageCode,
                    ),
                    doc_as_upsert: true,
                  },
                },
              );
              await flushIfLimitReached();
            }
          } else {
            // No variant in this channel: index a synthetic document, keyed by the
            // negated product id, so the product itself stays searchable.
            operations.push(
              {
                index: VARIANT_INDEX_NAME,
                operation: {
                  update: {
                    _id: ElasticsearchIndexerController.getId(
                      -product.id,
                      ctx.channelId,
                      languageCode,
                    ),
                  },
                },
              },
              {
                index: VARIANT_INDEX_NAME,
                operation: {
                  doc: await this.createSyntheticProductIndexItem(product, ctx, languageCode),
                  doc_as_upsert: true,
                },
              },
            );
            await flushIfLimitReached();
          }
        }
      }
    } finally {
      // Never leave the shared context pointing at a channel we switched to.
      ctx.setChannel(originalChannel);
    }

    // Send whatever is still buffered.
    await this.executeBulkOperationsByChunks(bulkLimit, operations, index);
  }
  /**
   * Re-indexes each product in turn: its existing documents are deleted first, then
   * fresh ones are written.
   */
  private async updateProductsOperations(ctx: MutableRequestContext, productIds: ID[]): Promise<void> {
    Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
    for (const id of productIds) {
      await this.deleteProductOperations(ctx, id);
      await this.updateProductsOperationsOnly(ctx, id);
    }
  }
  /**
   * Merges the default relations with the extra relations configured through
   * `hydrateProductRelations` / `hydrateProductVariantRelations`, without duplicates.
   *
   * Every parent path of a configured relation is added as well: for `'assets.asset'`
   * the result also contains `'assets'`, because TypeORM cannot join a 2nd-level
   * relation unless the first level is joined too.
   *
   * A relation starting with `customFields` is kept as one single path (its parent is
   * not added); custom field relations nested deeper than one level are rejected.
   *
   * @throws InternalServerError for a nested custom field relation.
   */
  private getReindexRelations<T extends Product | ProductVariant>(
    defaultRelations: Array<EntityRelationPaths<T>>,
    hydratedRelations: Array<EntityRelationPaths<T>>,
  ): Array<EntityRelationPaths<T>> {
    const relations = unique([...defaultRelations, ...hydratedRelations]);

    for (const relation of hydratedRelations) {
      const segments = relation.split('.');
      const isCustomField = segments[0] === 'customFields';
      if (isCustomField && segments.length > 2) {
        throw new InternalServerError(
          [
            'hydrateProductRelations / hydrateProductVariantRelations does not currently support nested custom field relations',
            `Received: "${relation}"`,
          ].join('\n'),
        );
      }

      // Either the custom field path as a whole, or every cumulative prefix of the path.
      const pathsToInclude = isCustomField
        ? [segments.join('.')]
        : segments.map((_, position) => segments.slice(0, position + 1).join('.'));

      for (const candidate of pathsToInclude) {
        const relationPath = candidate as EntityRelationPaths<T>;
        if (!relations.includes(relationPath)) {
          relations.push(relationPath);
        }
      }
    }

    return relations;
  }
  /**
   * Deletes all index documents of one product: the synthetic product documents (keyed
   * by the negated product id) and, via `deleteVariantsInternalOperations`, the
   * documents of all its variants — for every channel and every language found in the
   * product's or its variants' translations.
   *
   * Nothing happens when the product is not found in the channel of `ctx`.
   *
   * @param ctx Request context; its channel restricts the product lookup.
   * @param productId Id of the product whose documents are removed.
   * @param index Index name (without prefix) to delete from.
   */
  private async deleteProductOperations(
    ctx: RequestContext,
    productId: ID,
    index: string = VARIANT_INDEX_NAME,
  ): Promise<void> {
    const loadAllChannels = () =>
      this.connection.rawConnection
        .getRepository(Channel)
        .createQueryBuilder('channel')
        .select('channel.id')
        .getMany();
    const channels = await this.requestContextCache.get(
      ctx,
      'elastic-index-all-channels',
      loadAllChannels,
    );

    const product = await this.connection
      .getRepository(ctx, Product)
      .createQueryBuilder('product')
      .select([
        'product.id',
        'productVariant.id',
        'productTranslations.languageCode',
        'productVariantTranslations.languageCode',
      ])
      .leftJoin('product.translations', 'productTranslations')
      .leftJoin('product.variants', 'productVariant')
      .leftJoin('productVariant.translations', 'productVariantTranslations')
      .leftJoin('product.channels', 'channel')
      .where('product.id = :productId', { productId })
      .andWhere('channel.id = :channelId', { channelId: ctx.channelId })
      .getOne();
    if (!product) {
      return;
    }
    Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);

    const languageCodes: LanguageCode[] = product.translations.map(t => t.languageCode);
    for (const variant of product.variants) {
      for (const translation of variant.translations) {
        languageCodes.push(translation.languageCode);
      }
    }
    const uniqueLanguageCodes = unique(languageCodes);

    const bulkLimit = this.options.reindexBulkOperationSizeLimit;
    let pending: BulkVariantOperation[] = [];
    for (const channel of channels) {
      for (const languageCode of uniqueLanguageCodes) {
        const syntheticDocId = ElasticsearchIndexerController.getId(
          -product.id,
          channel.id,
          languageCode,
        );
        pending.push({
          index: VARIANT_INDEX_NAME,
          operation: { delete: { _id: syntheticDocId } },
        });
        // Send in batches so the buffer never grows beyond the configured limit.
        if (pending.length >= bulkLimit) {
          await this.executeBulkOperationsByChunks(bulkLimit, pending, index);
          pending = [];
        }
      }
    }
    // Send whatever is still buffered.
    await this.executeBulkOperationsByChunks(bulkLimit, pending, index);

    const channelIds = channels.map(c => c.id);
    await this.deleteVariantsInternalOperations(
      product.variants,
      channelIds,
      uniqueLanguageCodes,
      index,
    );
  }
  /**
   * Issues a delete operation for every combination of variant, channel id and
   * language, sending them in batches of at most `reindexBulkOperationSizeLimit`.
   *
   * @param variants Variants whose documents are removed.
   * @param channelIds Channels to remove the documents from.
   * @param languageVariants Languages to remove the documents for.
   * @param index Index name (without prefix) to delete from.
   */
  private async deleteVariantsInternalOperations(
    variants: ProductVariant[],
    channelIds: ID[],
    languageVariants: LanguageCode[],
    index = VARIANT_INDEX_NAME,
  ): Promise<void> {
    Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
    const bulkLimit = this.options.reindexBulkOperationSizeLimit;
    let pending: BulkVariantOperation[] = [];

    for (const variant of variants) {
      for (const channelId of channelIds) {
        for (const languageCode of languageVariants) {
          const docId = ElasticsearchIndexerController.getId(variant.id, channelId, languageCode);
          pending.push({
            index: VARIANT_INDEX_NAME,
            operation: { delete: { _id: docId } },
          });
          // Send in batches so the buffer never grows beyond the configured limit.
          if (pending.length >= bulkLimit) {
            await this.executeBulkOperationsByChunks(bulkLimit, pending, index);
            pending = [];
          }
        }
      }
    }

    // Send whatever is still buffered.
    await this.executeBulkOperationsByChunks(bulkLimit, pending, index);
  }
  /**
   * Resolves the distinct ids of the products owning the given variants.
   *
   * @param variantIds Ids of the variants to look up.
   * @returns The unique product ids; an empty array when `variantIds` is empty.
   */
  private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
    // Nothing to look up: skip the database round trip (and an empty `IN ()` clause).
    if (variantIds.length === 0) {
      return [];
    }
    const variants = await this.connection.getRepository(ProductVariant).find({
      where: { id: In(variantIds) },
      relations: ['product'],
      loadEagerRelations: false,
    });
    return unique(variants.map(v => v.product.id));
  }
  /**
   * Strips the index tag from each entry and sends the raw bulk lines to `indexName`.
   * Resolves to a one-element array holding the bulk response.
   */
  private async executeBulkOperations(operations: BulkVariantOperation[], indexName = VARIANT_INDEX_NAME) {
    const bulkLines: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = operations.map(
      entry => entry.operation,
    );
    return Promise.all([this.runBulkOperationsOnIndex(indexName, bulkLines)]);
  }
  /**
   * Sends one bulk request (with `refresh: true`) to the prefixed index.
   *
   * Item-level errors reported in the response are logged: a summary at error level
   * and the individual errors at debug level. A failing request is logged and
   * swallowed, so callers continue with the next batch.
   *
   * @param indexName Index name without the configured prefix.
   * @param operations Raw bulk body lines; nothing is sent when empty.
   * @returns The bulk response body, or `undefined` when `operations` is empty or the
   *   request failed.
   */
  private async runBulkOperationsOnIndex(
    indexName: string,
    operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
  ) {
    if (operations.length === 0) {
      return;
    }
    try {
      const fullIndexName = this.options.indexPrefix + indexName;
      const { body } = await this.client.bulk(
        {
          refresh: true,
          index: fullIndexName,
          body: operations,
        },
        { meta: true },
      );
      if (body.errors) {
        Logger.error(
          `Some errors occurred running bulk operations on ${fullIndexName}! Set logger to "debug" to print all errors.`,
          loggerCtx,
        );
        body.items.forEach(item => {
          // Only failed items carry an `error`; successful ones are skipped so the
          // debug log is not flooded with `undefined` entries.
          for (const result of [item.index, item.update, item.delete]) {
            if (result?.error) {
              Logger.debug(JSON.stringify(result.error, null, 2), loggerCtx);
            }
          }
        });
      } else {
        Logger.debug(
          `Executed ${body.items.length} bulk operations on index [${fullIndexName}]`,
          loggerCtx,
        );
      }
      return body;
    } catch (e: any) {
      Logger.error(
        `Error when attempting to run bulk operations [${JSON.stringify(e)}]`,
        loggerCtx,
        e?.stack,
      );
      Logger.error('Error details: ' + JSON.stringify(e.body?.error, null, 2), loggerCtx);
    }
  }
  /**
   * Builds the index document for one variant in the channel of `ctx` and the given
   * language.
   *
   * Besides the variant's own fields, the document carries product-level aggregates
   * (price range, facets, collections, enabled / in-stock state) computed over
   * `variants`, plus the values of all configured custom variant and product mappings.
   *
   * @param v The variant to index; `v.product` must be populated.
   * @param variants All variants of the same product that take part in the aggregates.
   * @param ctx Request context supplying the channel.
   * @param languageCode Language of the translations to index.
   * @throws Error `'Error while reindexing!'` when anything fails; the underlying error
   *   is logged first.
   */
  private async createVariantIndexItem(
    v: ProductVariant,
    variants: ProductVariant[],
    ctx: RequestContext,
    languageCode: LanguageCode,
  ): Promise<VariantIndexItem> {
    try {
      const productAsset = v.product.featuredAsset;
      const variantAsset = v.featuredAsset;
      const productTranslation = this.getTranslation(v.product, languageCode);
      const variantTranslation = this.getTranslation(v, languageCode);
      const collectionTranslations = v.collections.map(c => this.getTranslation(c, languageCode));

      // Collect the product-wide collection data in one linear pass.
      const productCollectionTranslations: Array<Translation<Collection>> = [];
      const productCollectionIds: ID[] = [];
      for (const variant of variants) {
        for (const collection of variant.collections) {
          productCollectionTranslations.push(this.getTranslation(collection, languageCode));
          productCollectionIds.push(collection.id);
        }
      }

      const prices = variants.map(variant => variant.price);
      const pricesWithTax = variants.map(variant => variant.priceWithTax);
      const item: VariantIndexItem = {
        channelId: ctx.channelId,
        languageCode,
        productVariantId: v.id,
        sku: v.sku,
        slug: productTranslation.slug,
        productId: v.product.id,
        productName: productTranslation.name,
        productAssetId: productAsset ? productAsset.id : undefined,
        productPreview: productAsset ? productAsset.preview : '',
        productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
        productVariantName: variantTranslation.name,
        productVariantAssetId: variantAsset ? variantAsset.id : undefined,
        productVariantPreview: variantAsset ? variantAsset.preview : '',
        productVariantPreviewFocalPoint: variantAsset
          ? variantAsset.focalPoint || undefined
          : undefined,
        price: v.price,
        priceWithTax: v.priceWithTax,
        currencyCode: v.currencyCode,
        description: productTranslation.description,
        facetIds: this.getFacetIds([v]),
        channelIds: v.channels.map(c => c.id),
        facetValueIds: this.getFacetValueIds([v]),
        collectionIds: v.collections.map(c => c.id.toString()),
        collectionSlugs: collectionTranslations.map(c => c.slug),
        enabled: v.enabled && v.product.enabled,
        productEnabled: variants.some(variant => variant.enabled) && v.product.enabled,
        productPriceMin: Math.min(...prices),
        productPriceMax: Math.max(...prices),
        productPriceWithTaxMin: Math.min(...pricesWithTax),
        productPriceWithTaxMax: Math.max(...pricesWithTax),
        productFacetIds: this.getFacetIds(variants),
        productFacetValueIds: this.getFacetValueIds(variants),
        productCollectionIds: unique(productCollectionIds),
        productCollectionSlugs: unique(productCollectionTranslations.map(c => c.slug)),
        productChannelIds: v.product.channels.map(c => c.id),
        inStock: 0 < (await this.productVariantService.getSaleableStockLevel(ctx, v)),
        productInStock: await this.getProductInStockValue(ctx, variants),
      };

      // Custom mappings are stored under prefixed keys so that variant and product
      // mappings of the same name cannot collide.
      const variantCustomMappings = Object.entries(this.options.customProductVariantMappings);
      for (const [name, def] of variantCustomMappings) {
        item[`variant-${name}`] = await def.valueFn(v, languageCode, this.injector, ctx);
      }
      const productCustomMappings = Object.entries(this.options.customProductMappings);
      for (const [name, def] of productCustomMappings) {
        item[`product-${name}`] = await def.valueFn(
          v.product,
          variants,
          languageCode,
          this.injector,
          ctx,
        );
      }
      return item;
    } catch (err: any) {
      // Log with context and stack: the error thrown below no longer carries the cause.
      Logger.error(err.toString(), loggerCtx, err?.stack);
      throw Error('Error while reindexing!');
    }
  }
  /**
   * Determines whether at least one of the given variants has a saleable stock
   * level greater than zero.
   *
   * The result is obtained through `requestContextCache.get`, using a key derived
   * from the comma-joined ids of the variants and a factory which queries the
   * saleable stock level of every variant.
   */
  private async getProductInStockValue(ctx: RequestContext, variants: ProductVariant[]): Promise<boolean> {
    const joinedVariantIds = variants.map(v => v.id).join(',');
    const cacheKey = `elastic-index-product-in-stock-${joinedVariantIds}`;

    const anyVariantInStock = async () => {
      const pendingLevels = variants.map(variant =>
        this.productVariantService.getSaleableStockLevel(ctx, variant),
      );
      const levels = await Promise.all(pendingLevels);
      return levels.some(level => level > 0);
    };

    return this.requestContextCache.get(ctx, cacheKey, anyVariantInStock);
  }
  /**
   * Builds a placeholder ("synthetic") variant index item for a Product which has
   * no variants, so that the product can still be made visible via the search query.
   *
   * All variant-specific fields receive neutral values (id `0`, empty strings, zero
   * prices, `enabled: false`, not in stock). Only the configured custom *product*
   * mappings are evaluated, each being handed an empty variants array.
   */
  private async createSyntheticProductIndexItem(
    product: Product,
    ctx: RequestContext,
    languageCode: LanguageCode,
  ): Promise<VariantIndexItem> {
    const translated = this.getTranslation(product, languageCode);
    const featuredAsset = product.featuredAsset;

    // Each call yields a fresh array, so no array instance is shared between fields.
    const collectFacetIds = () => product.facetValues?.map(fv => fv.facet.id.toString()) ?? [];
    const collectFacetValueIds = () => product.facetValues?.map(fv => fv.id.toString()) ?? [];

    const item: VariantIndexItem = {
      channelId: ctx.channelId,
      languageCode,
      productVariantId: 0,
      sku: '',
      slug: translated.slug,
      productId: product.id,
      productName: translated.name,
      productAssetId: featuredAsset?.id,
      productPreview: featuredAsset ? featuredAsset.preview : '',
      productPreviewFocalPoint: featuredAsset?.focalPoint || undefined,
      // With no real variant available, the product name doubles as the variant name.
      productVariantName: translated.name,
      productVariantAssetId: undefined,
      productVariantPreview: '',
      productVariantPreviewFocalPoint: undefined,
      price: 0,
      priceWithTax: 0,
      currencyCode: ctx.currencyCode,
      description: translated.description,
      facetIds: collectFacetIds(),
      channelIds: [ctx.channelId],
      facetValueIds: collectFacetValueIds(),
      collectionIds: [],
      collectionSlugs: [],
      enabled: false,
      productEnabled: false,
      productPriceMin: 0,
      productPriceMax: 0,
      productPriceWithTaxMin: 0,
      productPriceWithTaxMax: 0,
      productFacetIds: collectFacetIds(),
      productFacetValueIds: collectFacetValueIds(),
      productCollectionIds: [],
      productCollectionSlugs: [],
      productChannelIds: product.channels.map(c => c.id),
      inStock: false,
      productInStock: false,
    };

    // Custom product mappings are resolved one after another and stored under a
    // `product-` prefixed key.
    for (const [mappingName, mapping] of Object.entries(this.options.customProductMappings)) {
      item[`product-${mappingName}`] = await mapping.valueFn(
        product,
        [],
        languageCode,
        this.injector,
        ctx,
      );
    }

    return item;
  }
  /**
   * Picks the translation of the given entity to use for `languageCode`.
   *
   * Preference order: a translation in the requested language, then one in the
   * configured default language, and finally the first translation in the list.
   */
  private getTranslation<T extends Translatable>(
    translatable: T,
    languageCode: LanguageCode,
  ): Translation<T> {
    const { translations } = translatable;

    const requested = translations.find(t => t.languageCode === languageCode);
    if (requested) {
      return requested as unknown as Translation<T>;
    }

    const inDefaultLanguage = translations.find(
      t => t.languageCode === this.configService.defaultLanguageCode,
    );
    if (inDefaultLanguage) {
      return inDefaultLanguage as unknown as Translation<T>;
    }

    return translations[0] as unknown as Translation<T>;
  }
  /**
   * Collects the de-duplicated ids (as strings) of every Facet referenced by the
   * FacetValues of the given variants, followed by those referenced by the
   * FacetValues of the product attached to the first variant.
   *
   * @param variants The variants to inspect; the parent product is read from `variants[0].product`.
   * @returns The unique facet ids, or an empty array when `variants` is empty.
   */
  private getFacetIds(variants: ProductVariant[]): string[] {
    const firstVariant = variants[0];
    if (!firstVariant) {
      // No variants means there is no product to read from either; previously this
      // case threw a TypeError when dereferencing `variants[0].product`.
      return [];
    }
    const toFacetId = (fv: FacetValue) => fv.facet.id.toString();
    // Append into one array rather than re-spreading the accumulator per variant,
    // which copied the whole list on every iteration.
    const facetIds: string[] = [];
    for (const variant of variants) {
      for (const facetValue of variant.facetValues) {
        facetIds.push(toFacetId(facetValue));
      }
    }
    for (const facetValue of firstVariant.product.facetValues) {
      facetIds.push(toFacetId(facetValue));
    }
    return unique(facetIds);
  }
  /**
   * Collects the de-duplicated ids (as strings) of every FacetValue assigned to the
   * given variants, followed by those assigned to the product attached to the
   * first variant.
   *
   * @param variants The variants to inspect; the parent product is read from `variants[0].product`.
   * @returns The unique facet value ids, or an empty array when `variants` is empty.
   */
  private getFacetValueIds(variants: ProductVariant[]): string[] {
    const firstVariant = variants[0];
    if (!firstVariant) {
      // No variants means there is no product to read from either; previously this
      // case threw a TypeError when dereferencing `variants[0].product`.
      return [];
    }
    const toFacetValueId = (fv: FacetValue) => fv.id.toString();
    // Append into one array rather than re-spreading the accumulator per variant,
    // which copied the whole list on every iteration.
    const facetValueIds: string[] = [];
    for (const variant of variants) {
      for (const facetValue of variant.facetValues) {
        facetValueIds.push(toFacetValueId(facetValue));
      }
    }
    for (const facetValue of firstVariant.product.facetValues) {
      facetValueIds.push(toFacetValueId(facetValue));
    }
    return unique(facetValueIds);
  }
  /**
   * Composes an identifier string from the channel id, the entity id and the
   * language code, in that order, separated by underscores.
   */
  private static getId(entityId: ID, channelId: ID, languageCode: LanguageCode): string {
    const segments = [channelId.toString(), entityId.toString(), languageCode];
    return segments.join('_');
  }
  1006. }