indexer.controller.ts 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. import { Client } from '@elastic/elasticsearch';
  2. import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  3. import { unique } from '@vendure/common/lib/unique';
  4. import {
  5. Asset,
  6. asyncObservable,
  7. AsyncQueue,
  8. Channel,
  9. Collection,
  10. ConfigService,
  11. FacetValue,
  12. ID,
  13. LanguageCode,
  14. Logger,
  15. Product,
  16. ProductVariant,
  17. ProductVariantService,
  18. RequestContext,
  19. TransactionalConnection,
  20. Translatable,
  21. Translation,
  22. } from '@vendure/core';
  23. import { Observable } from 'rxjs';
  24. import { ELASTIC_SEARCH_OPTIONS, loggerCtx, PRODUCT_INDEX_NAME, VARIANT_INDEX_NAME } from './constants';
  25. import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
  26. import { ElasticsearchOptions } from './options';
  27. import {
  28. BulkOperation,
  29. BulkOperationDoc,
  30. BulkResponseBody,
  31. ProductChannelMessageData,
  32. ProductIndexItem,
  33. ReindexMessageData,
  34. UpdateAssetMessageData,
  35. UpdateProductMessageData,
  36. UpdateVariantMessageData,
  37. UpdateVariantsByIdMessageData,
  38. VariantChannelMessageData,
  39. VariantIndexItem,
  40. } from './types';
  /**
   * Relation paths requested when a Product is loaded for indexing.
   */
  export const productRelations = [
    'variants',
    'featuredAsset',
    // Facet values together with their parent facet.
    'facetValues', 'facetValues.facet',
    // Channels together with their default tax zone.
    'channels', 'channels.defaultTaxZone',
  ];
  /**
   * Relation paths requested when ProductVariants are loaded for indexing.
   */
  export const variantRelations = [
    'featuredAsset',
    // Facet values together with their parent facet.
    'facetValues', 'facetValues.facet',
    'collections',
    'taxCategory',
    // Channels together with their default tax zone.
    'channels', 'channels.defaultTaxZone',
  ];
  /**
   * Progress / result payload emitted by the long-running indexing jobs.
   */
  export interface ReindexMessageResponse {
    /** Number of products the job has to process. */
    total: number;
    /** Number of products processed so far. */
    completed: number;
    /** Milliseconds elapsed since the job started. */
    duration: number;
  }
  /**
   * One line of a bulk request destined for the product index. The `index` tag
   * lets a mixed list of operations be split per target index later on.
   */
  type BulkProductOperation = {
    operation: BulkOperation | BulkOperationDoc<ProductIndexItem>;
    index: typeof PRODUCT_INDEX_NAME;
  };
  /**
   * One line of a bulk request destined for the variant index. The `index` tag
   * lets a mixed list of operations be split per target index later on.
   */
  type BulkVariantOperation = {
    operation: BulkOperation | BulkOperationDoc<VariantIndexItem>;
    index: typeof VARIANT_INDEX_NAME;
  };
  71. @Injectable()
  72. export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
  73. private client: Client;
  74. private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
  /**
   * Every dependency is received as a private parameter property. The plugin
   * options are requested through the ELASTIC_SEARCH_OPTIONS injection token.
   * The Elasticsearch client itself is not created here but in onModuleInit().
   */
  75. constructor(
  76. private connection: TransactionalConnection,
  77. @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
  78. private productVariantService: ProductVariantService,
  79. private configService: ConfigService,
  80. ) {}
  /**
   * Lifecycle hook: creates the Elasticsearch client from the plugin options.
   */
  onModuleInit(): any {
    const { options } = this;
    this.client = getClient(options);
  }
  /**
   * Lifecycle hook: closes the Elasticsearch client.
   *
   * @returns whatever the client's `close()` call returns.
   */
  onModuleDestroy(): any {
    const closing = this.client.close();
    return closing;
  }
  /**
   * Rebuilds the index documents of one product.
   *
   * @returns `true` once the update has run without throwing.
   */
  async updateProduct({ productId }: UpdateProductMessageData): Promise<boolean> {
    const affected = [productId];
    await this.updateProductsInternal(affected);
    return true;
  }
  /**
   * Removes the index documents of one product (and of its variants) by
   * generating the delete operations and executing them as a bulk request.
   *
   * @returns `true` once the bulk request has run without throwing.
   */
  async deleteProduct({ productId }: UpdateProductMessageData): Promise<boolean> {
    await this.executeBulkOperations(await this.deleteProductOperations(productId));
    return true;
  }
  /**
   * Re-indexes the product named in the message. The channel id carried by the
   * message is not read; the product's documents are rebuilt as a whole.
   *
   * @returns `true` once the update has run without throwing.
   */
  async assignProductToChannel(message: ProductChannelMessageData): Promise<boolean> {
    const { productId } = message;
    await this.updateProductsInternal([productId]);
    return true;
  }
  /**
   * Re-indexes the product named in the message. The channel id carried by the
   * message is not read; the product's documents are rebuilt as a whole.
   *
   * @returns `true` once the update has run without throwing.
   */
  async removeProductFromChannel(message: ProductChannelMessageData): Promise<boolean> {
    const { productId } = message;
    await this.updateProductsInternal([productId]);
    return true;
  }
  /**
   * Re-indexes the product that owns the given variant. The channel id carried
   * by the message is not read.
   *
   * @returns `true` once the update has run without throwing.
   */
  async assignVariantToChannel(message: VariantChannelMessageData): Promise<boolean> {
    const owningProductIds = await this.getProductIdsByVariantIds([message.productVariantId]);
    await this.updateProductsInternal(owningProductIds);
    return true;
  }
  /**
   * Re-indexes the product that owns the given variant. The channel id carried
   * by the message is not read.
   *
   * @returns `true` once the update has run without throwing.
   */
  async removeVariantFromChannel(message: VariantChannelMessageData): Promise<boolean> {
    const owningProductIds = await this.getProductIdsByVariantIds([message.productVariantId]);
    await this.updateProductsInternal(owningProductIds);
    return true;
  }
  /**
   * Re-indexes the products owning the given variants. The work is pushed onto
   * `asyncQueue` rather than run directly.
   *
   * @returns the queued task's result, which is `true` on completion.
   */
  async updateVariants({ variantIds }: UpdateVariantMessageData): Promise<boolean> {
    const task = async () => {
      const owningProductIds = await this.getProductIdsByVariantIds(variantIds);
      await this.updateProductsInternal(owningProductIds);
      return true;
    };
    return this.asyncQueue.push(task);
  }
  /**
   * Re-indexes every product that owns one of the given variants, one product
   * at a time.
   *
   * @returns `true` once all affected products have been processed.
   */
  async deleteVariants({ variantIds }: UpdateVariantMessageData): Promise<boolean> {
    const owningProductIds = await this.getProductIdsByVariantIds(variantIds);
    for (const owningProductId of owningProductIds) {
      const single = [owningProductId];
      await this.updateProductsInternal(single);
    }
    return true;
  }
  /**
   * Re-indexes the products owning the given variant ids as a queued task and
   * reports progress through the returned Observable.
   *
   * One progress object is emitted after each product has been updated; the
   * value returned by the task carries the final counts.
   */
  updateVariantsById({ ids }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
    return asyncObservable(async observer => {
      return this.asyncQueue.push(async () => {
        const timeStart = Date.now();
        const elapsed = () => +new Date() - timeStart;

        const productIds = await this.getProductIdsByVariantIds(ids);
        const total = productIds.length;

        // Iterating an empty list is a no-op, so no separate emptiness check is needed.
        let done = 0;
        for (const productId of productIds) {
          await this.updateProductsInternal([productId]);
          done += 1;
          // `done` can never exceed `total` because it grows by one per list element.
          observer.next({ total, completed: done, duration: elapsed() });
        }

        Logger.verbose(`Completed updating variants`, loggerCtx);
        return { total, completed: total, duration: elapsed() };
      });
    });
  }
  /**
   * Rebuilds the product and variant indices and then re-applies every product
   * found in the database.
   *
   * The whole job runs as one task on `asyncQueue`. It has two phases:
   *  1. Index swap (try/catch/finally): create temporary indices, copy the
   *     current documents into them, and move the public names over to them.
   *     A failure here is logged as a warning and the job carries on.
   *  2. Data refresh: collect delete operations for soft-deleted products and
   *     update operations for all other products, then run them as one bulk job.
   *
   * A progress object is emitted through the observer after each product's
   * operations have been generated (the bulk request itself runs afterwards).
   * The task's return value carries the final counts.
   */
  188. reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
  189. return asyncObservable(async observer => {
  190. return this.asyncQueue.push(async () => {
  191. const timeStart = Date.now();
  192. const operations: Array<BulkProductOperation | BulkVariantOperation> = [];
  // Timestamp that makes the temporary names of this run distinct.
  193. const reindexTempName = new Date().getTime();
  194. const productIndexName = this.options.indexPrefix + PRODUCT_INDEX_NAME;
  195. const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
  196. const reindexProductAliasName = productIndexName + `-reindex-${reindexTempName}`;
  197. const reindexVariantAliasName = variantIndexName + `-reindex-${reindexTempName}`;
  198. try {
  // NOTE(review): createIndices lives in ./indexing-utils and is not visible here.
  // Presumably the last two arguments make it create alias-backed indices whose
  // alias carries the temporary suffix - the lookups below rely on that; confirm.
  199. await createIndices(
  200. this.client,
  201. this.options.indexPrefix,
  202. this.options.indexSettings,
  203. this.options.indexMappingProperties,
  204. this.configService.entityIdStrategy.primaryKeyType,
  205. true,
  206. `-reindex-${reindexTempName}`,
  207. );
  // Resolve the concrete index names behind the temporary aliases.
  208. const reindexProductIndexName = await getIndexNameByAlias(
  209. this.client,
  210. reindexProductAliasName,
  211. );
  212. const reindexVariantIndexName = await getIndexNameByAlias(
  213. this.client,
  214. reindexVariantAliasName,
  215. );
  // The public product/variant name may currently be an alias or a concrete
  // index, so both possibilities are probed.
  216. const originalProductAliasExist = await this.client.indices.existsAlias({
  217. name: productIndexName,
  218. });
  219. const originalVariantAliasExist = await this.client.indices.existsAlias({
  220. name: variantIndexName,
  221. });
  222. const originalProductIndexExist = await this.client.indices.exists({
  223. index: productIndexName,
  224. });
  225. const originalVariantIndexExist = await this.client.indices.exists({
  226. index: variantIndexName,
  227. });
  228. const originalProductIndexName = await getIndexNameByAlias(this.client, productIndexName);
  229. const originalVariantIndexName = await getIndexNameByAlias(this.client, variantIndexName);
  // Copy the existing documents into the temporary indices (only when there is
  // something to copy from).
  230. if (originalVariantAliasExist.body || originalVariantIndexExist.body) {
  231. await this.client.reindex({
  232. refresh: true,
  233. body: {
  234. source: {
  235. index: variantIndexName,
  236. },
  237. dest: {
  238. index: reindexVariantAliasName,
  239. },
  240. },
  241. });
  242. }
  243. if (originalProductAliasExist.body || originalProductIndexExist.body) {
  244. await this.client.reindex({
  245. refresh: true,
  246. body: {
  247. source: {
  248. index: productIndexName,
  249. },
  250. dest: {
  251. index: reindexProductAliasName,
  252. },
  253. },
  254. });
  255. }
  // Alias changes sent in a single updateAliases call: drop the temporary
  // aliases and attach the public names to the new indices.
  256. const actions = [
  257. {
  258. remove: {
  259. index: reindexVariantIndexName,
  260. alias: reindexVariantAliasName,
  261. },
  262. },
  263. {
  264. remove: {
  265. index: reindexProductIndexName,
  266. alias: reindexProductAliasName,
  267. },
  268. },
  269. {
  270. add: {
  271. index: reindexVariantIndexName,
  272. alias: variantIndexName,
  273. },
  274. },
  275. {
  276. add: {
  277. index: reindexProductIndexName,
  278. alias: productIndexName,
  279. },
  280. },
  281. ];
  // Old alias: detach it from the previous index in the same updateAliases call.
  // Old concrete index carrying the public name: delete it before the alias
  // update (presumably because an alias cannot share its name with an index -
  // confirm against the Elasticsearch docs).
  282. if (originalProductAliasExist.body) {
  283. actions.push({
  284. remove: {
  285. index: originalProductIndexName,
  286. alias: productIndexName,
  287. },
  288. });
  289. } else if (originalProductIndexExist.body) {
  290. await this.client.indices.delete({
  291. index: [productIndexName],
  292. });
  293. }
  294. if (originalVariantAliasExist.body) {
  295. actions.push({
  296. remove: {
  297. index: originalVariantIndexName,
  298. alias: variantIndexName,
  299. },
  300. });
  301. } else if (originalVariantIndexExist.body) {
  302. await this.client.indices.delete({
  303. index: [variantIndexName],
  304. });
  305. }
  306. await this.client.indices.updateAliases({
  307. body: {
  308. actions,
  309. },
  310. });
  // The indices that were previously behind the public aliases are deleted
  // after the aliases have been moved.
  311. if (originalProductAliasExist.body) {
  312. await this.client.indices.delete({
  313. index: [originalProductIndexName],
  314. });
  315. }
  316. if (originalVariantAliasExist.body) {
  317. await this.client.indices.delete({
  318. index: [originalVariantIndexName],
  319. });
  320. }
  321. } catch (e) {
  // Best effort: the failure is only logged and the data refresh below still runs.
  322. Logger.warn(
  323. `Could not recreate indices. Reindexing continue with existing indices.`,
  324. loggerCtx,
  325. );
  326. Logger.warn(JSON.stringify(e), loggerCtx);
  327. } finally {
  // If a temporary alias still exists at this point, the index behind it is
  // deleted. Note that these calls are not wrapped in their own try/catch, so
  // an error thrown here rejects the whole task.
  328. const reindexVariantAliasExist = await this.client.indices.existsAlias({
  329. name: reindexVariantAliasName,
  330. });
  331. if (reindexVariantAliasExist.body) {
  332. const reindexVariantAliasResult = await this.client.indices.getAlias({
  333. name: reindexVariantAliasName,
  334. });
  335. const reindexVariantIndexName = Object.keys(reindexVariantAliasResult.body)[0];
  336. await this.client.indices.delete({
  337. index: [reindexVariantIndexName],
  338. });
  339. }
  340. const reindexProductAliasExist = await this.client.indices.existsAlias({
  341. name: reindexProductAliasName,
  342. });
  343. if (reindexProductAliasExist.body) {
  344. const reindexProductAliasResult = await this.client.indices.getAlias({
  345. name: reindexProductAliasName,
  346. });
  347. const reindexProductIndexName = Object.keys(reindexProductAliasResult.body)[0];
  348. await this.client.indices.delete({
  349. index: [reindexProductIndexName],
  350. });
  351. }
  352. }
  // Queue delete operations for soft-deleted products (deletedAt is set).
  353. const deletedProductIds = await this.connection
  354. .getRepository(Product)
  355. .createQueryBuilder('product')
  356. .select('product.id')
  357. .where('product.deletedAt IS NOT NULL')
  358. .getMany();
  359. for (const { id: deletedProductId } of deletedProductIds) {
  360. operations.push(...(await this.deleteProductOperations(deletedProductId)));
  361. }
  // Queue update operations for every product that is not soft-deleted.
  362. const productIds = await this.connection
  363. .getRepository(Product)
  364. .createQueryBuilder('product')
  365. .select('product.id')
  366. .where('product.deletedAt IS NULL')
  367. .getMany();
  368. Logger.verbose(`Reindexing ${productIds.length} Products`, loggerCtx);
  369. let finishedProductsCount = 0;
  370. for (const { id: productId } of productIds) {
  371. operations.push(...(await this.updateProductsOperations([productId])));
  372. finishedProductsCount++;
  // Progress counts products whose operations were generated, not executed.
  373. observer.next({
  374. total: productIds.length,
  375. completed: Math.min(finishedProductsCount, productIds.length),
  376. duration: +new Date() - timeStart,
  377. });
  378. }
  // All collected operations are executed together in one bulk job.
  379. Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
  380. await this.executeBulkOperations(operations);
  381. Logger.verbose(`Completed reindexing!`, loggerCtx);
  382. return {
  383. total: productIds.length,
  384. completed: productIds.length,
  385. duration: +new Date() - timeStart,
  386. };
  387. });
  388. });
  389. }
  /**
   * Propagates an asset's focal point to the product index and then to the
   * variant index, and refreshes both indices afterwards.
   *
   * @returns `true` only when both per-index updates reported success.
   */
  async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
    const productIndexOk = await this.updateAssetFocalPointForIndex(PRODUCT_INDEX_NAME, data.asset);
    const variantIndexOk = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);

    const prefix = this.options.indexPrefix;
    await this.client.indices.refresh({
      index: [prefix + PRODUCT_INDEX_NAME, prefix + VARIANT_INDEX_NAME],
    });

    return productIndexOk && variantIndexOk;
  }
  /**
   * Clears references to a deleted asset in the product index and then in the
   * variant index, and refreshes both indices afterwards.
   *
   * @returns `true` only when both per-index updates reported success.
   */
  async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
    const productIndexOk = await this.deleteAssetForIndex(PRODUCT_INDEX_NAME, data.asset);
    const variantIndexOk = await this.deleteAssetForIndex(VARIANT_INDEX_NAME, data.asset);

    const prefix = this.options.indexPrefix;
    await this.client.indices.refresh({
      index: [prefix + PRODUCT_INDEX_NAME, prefix + VARIANT_INDEX_NAME],
    });

    return productIndexOk && variantIndexOk;
  }
  /**
   * Writes the asset's focal point (or `null` when it has none) into the
   * documents of one index that reference the asset.
   *
   * @param indexName un-prefixed index name.
   * @param asset the asset whose focal point is propagated.
   */
  private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
    // Both scripts share the same parameter object.
    const params = { focalPoint: asset.focalPoint || null };
    const productScript = {
      source: 'ctx._source.productPreviewFocalPoint = params.focalPoint',
      params,
    };
    const variantScript = {
      source: 'ctx._source.productVariantPreviewFocalPoint = params.focalPoint',
      params,
    };
    return this.updateAssetForIndex(indexName, asset, productScript, variantScript);
  }
  /**
   * Nulls the asset id fields of the documents in one index that reference the
   * given asset.
   *
   * @param indexName un-prefixed index name.
   * @param asset the asset being removed.
   */
  private async deleteAssetForIndex(indexName: string, asset: Asset): Promise<boolean> {
    const clearProductAsset = { source: 'ctx._source.productAssetId = null' };
    const clearVariantAsset = { source: 'ctx._source.productVariantAssetId = null' };
    return this.updateAssetForIndex(indexName, asset, clearProductAsset, clearVariantAsset);
  }
  /**
   * Runs two update-by-query requests against one index: the first applies
   * `updateProductScript` to documents whose `productAssetId` matches the
   * asset, the second applies `updateVariantScript` to documents whose
   * `productVariantAssetId` matches it. Every reported failure is logged.
   *
   * Fixes over the previous version: the failures of the second request are
   * now the ones logged for it (the first request's failures used to be logged
   * twice), and success is judged by the length of both failure lists (the old
   * comparison of an array with `0` made the method always return `false`).
   *
   * @param indexName un-prefixed index name; `options.indexPrefix` is prepended.
   * @param asset the asset whose id is matched.
   * @param updateProductScript script for documents matching `productAssetId`.
   * @param updateVariantScript script for documents matching `productVariantAssetId`.
   * @returns `true` when neither request reported a failure.
   */
  private async updateAssetForIndex(
    indexName: string,
    asset: Asset,
    updateProductScript: { source: string; params?: any },
    updateVariantScript: { source: string; params?: any },
  ): Promise<boolean> {
    const index = this.options.indexPrefix + indexName;

    const productResult = await this.client.update_by_query({
      index,
      body: {
        script: updateProductScript,
        query: {
          term: {
            productAssetId: asset.id,
          },
        },
      },
    });
    // Treat a missing failures list as "no failures" instead of throwing.
    const productFailures: any[] = productResult.body.failures ?? [];
    for (const failure of productFailures) {
      Logger.error(`${failure.cause.type}: ${failure.cause.reason}`, loggerCtx);
    }

    const variantResult = await this.client.update_by_query({
      index,
      body: {
        script: updateVariantScript,
        query: {
          term: {
            productVariantAssetId: asset.id,
          },
        },
      },
    });
    const variantFailures: any[] = variantResult.body.failures ?? [];
    for (const failure of variantFailures) {
      Logger.error(`${failure.cause.type}: ${failure.cause.reason}`, loggerCtx);
    }

    return productFailures.length === 0 && variantFailures.length === 0;
  }
  /**
   * Generates the bulk operations for the given products and executes them
   * straight away.
   */
  private async updateProductsInternal(productIds: ID[]) {
    await this.executeBulkOperations(await this.updateProductsOperations(productIds));
  }
  /**
   * Builds (without executing) the bulk operations that bring the index in
   * line with the database for each given product.
   *
   * Per product: delete operations for its existing documents come first; if
   * the product still exists and is not soft-deleted, upsert operations for its
   * variants and for one product document per channel and language follow.
   *
   * @param productIds ids of the products to process, in order.
   * @returns the collected operations in generation order.
   */
  private async updateProductsOperations(
    productIds: ID[],
  ): Promise<Array<BulkProductOperation | BulkVariantOperation>> {
    Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
    const bulk: Array<BulkProductOperation | BulkVariantOperation> = [];

    for (const productId of productIds) {
      const removals = await this.deleteProductOperations(productId);
      bulk.push(...removals);

      const product = await this.connection.getRepository(Product).findOne(productId, {
        relations: productRelations,
        where: { deletedAt: null },
      });
      if (!product) {
        // Missing or soft-deleted: only the delete operations remain.
        continue;
      }

      const variantIds = product.variants.map(v => v.id);
      const variants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
        relations: variantRelations,
        where: { deletedAt: null },
        order: { id: 'ASC' },
      });
      for (const variant of variants) {
        // Attach the fully loaded product so index items can read its relations.
        variant.product = product;
        if (!product.enabled) {
          // A disabled product disables all of its variants in the index.
          variant.enabled = false;
        }
      }

      Logger.verbose(`Updating Product (${productId})`, loggerCtx);
      if (variants.length) {
        const variantUpserts = await this.updateVariantsOperations(variants);
        bulk.push(...variantUpserts);
      }

      const languageCodes = product.translations.map(t => t.languageCode);
      for (const channel of product.channels) {
        const channelCtx = new RequestContext({
          channel,
          apiType: 'admin',
          authorizedAsOwnerOnly: false,
          isAuthorized: true,
          session: {} as any,
        });
        const { channelId } = channelCtx;

        const variantsInChannel = variants.filter(v => v.channels.some(c => c.id === channelId));
        for (const variant of variantsInChannel) {
          await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
        }

        for (const languageCode of languageCodes) {
          // Products without variants in this channel get a synthetic document.
          const doc = variantsInChannel.length
            ? this.createProductIndexItem(variantsInChannel, channelId, languageCode)
            : this.createSyntheticProductIndexItem(channelCtx, product, languageCode);
          bulk.push(
            {
              index: PRODUCT_INDEX_NAME,
              operation: { update: { _id: this.getId(product.id, channelId, languageCode) } },
            },
            {
              index: PRODUCT_INDEX_NAME,
              operation: { doc, doc_as_upsert: true },
            },
          );
        }
      }
    }

    return bulk;
  }
  /**
   * Builds (without executing) the upsert operations for the given variants:
   * one update header plus one document per variant, channel and language.
   * Channel price and tax are applied to the variant before its documents for
   * that channel are created.
   *
   * @returns the operations in generation order; empty for an empty input.
   */
  private async updateVariantsOperations(
    productVariants: ProductVariant[],
  ): Promise<BulkVariantOperation[]> {
    const bulk: BulkVariantOperation[] = [];
    if (productVariants.length === 0) {
      return bulk;
    }

    for (const variant of productVariants) {
      const languageCodes = variant.translations.map(t => t.languageCode);
      for (const channel of variant.channels) {
        const channelCtx = new RequestContext({
          channel,
          apiType: 'admin',
          authorizedAsOwnerOnly: false,
          isAuthorized: true,
          session: {} as any,
        });
        await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);

        const { channelId } = channelCtx;
        for (const languageCode of languageCodes) {
          const _id = this.getId(variant.id, channelId, languageCode);
          const doc = this.createVariantIndexItem(variant, channelId, languageCode);
          bulk.push(
            { index: VARIANT_INDEX_NAME, operation: { update: { _id } } },
            { index: VARIANT_INDEX_NAME, operation: { doc, doc_as_upsert: true } },
          );
        }
      }
    }

    Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
    return bulk;
  }
  /**
   * Builds (without executing) the delete operations for one product: one per
   * channel and product language, followed by the delete operations for all of
   * the product's variants. Every channel in the database is covered, not just
   * the channels the product is assigned to.
   *
   * @returns the operations, or an empty list when the product is not found.
   */
  private async deleteProductOperations(
    productId: ID,
  ): Promise<Array<BulkProductOperation | BulkVariantOperation>> {
    const channels = await this.connection
      .getRepository(Channel)
      .createQueryBuilder('channel')
      .select('channel.id')
      .getMany();
    const product = await this.connection.getRepository(Product).findOne(productId, {
      relations: ['variants'],
    });
    if (!product) {
      return [];
    }

    Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
    const channelIds = channels.map(c => c.id);
    const languageCodes = product.translations.map(t => t.languageCode);

    const bulk: Array<BulkProductOperation | BulkVariantOperation> = [];
    for (const channelId of channelIds) {
      for (const languageCode of languageCodes) {
        const _id = this.getId(product.id, channelId, languageCode);
        bulk.push({ index: PRODUCT_INDEX_NAME, operation: { delete: { _id } } });
      }
    }

    const variantRemovals = await this.deleteVariantsInternalOperations(product.variants, channelIds);
    bulk.push(...variantRemovals);
    return bulk;
  }
  /**
   * Builds (without executing) one delete operation per variant, channel and
   * variant language, ordered by variant, then channel, then language.
   */
  private async deleteVariantsInternalOperations(
    variants: ProductVariant[],
    channelIds: ID[],
  ): Promise<BulkVariantOperation[]> {
    Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
    const bulk: BulkVariantOperation[] = [];
    for (const variant of variants) {
      for (const channelId of channelIds) {
        const languageCodes = variant.translations.map(t => t.languageCode);
        languageCodes.forEach(languageCode => {
          const _id = this.getId(variant.id, channelId, languageCode);
          bulk.push({ index: VARIANT_INDEX_NAME, operation: { delete: { _id } } });
        });
      }
    }
    return bulk;
  }
  /**
   * Resolves the de-duplicated ids of the products that own the given variants.
   * Only the `product` relation is loaded; eager relations are switched off.
   */
  private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
    const variantRepository = this.connection.getRepository(ProductVariant);
    const variants = await variantRepository.findByIds(variantIds, {
      relations: ['product'],
      loadEagerRelations: false,
    });
    const owningProductIds = variants.map(variant => variant.product.id);
    return unique(owningProductIds);
  }
  /**
   * Splits a mixed operation list by target index (keeping relative order) and
   * runs the two resulting bulk requests concurrently.
   *
   * @returns the results of the product and variant bulk requests, in that order.
   */
  private async executeBulkOperations(operations: Array<BulkProductOperation | BulkVariantOperation>) {
    const forProducts: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
    const forVariants: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];

    operations.forEach(entry => {
      if (entry.index === PRODUCT_INDEX_NAME) {
        forProducts.push(entry.operation);
      } else {
        forVariants.push(entry.operation);
      }
    });

    const productRun = this.runBulkOperationsOnIndex(PRODUCT_INDEX_NAME, forProducts);
    const variantRun = this.runBulkOperationsOnIndex(VARIANT_INDEX_NAME, forVariants);
    return Promise.all([productRun, variantRun]);
  }
  /**
   * Sends one bulk request (with `refresh: true`) to the prefixed index.
   *
   * Errors are handled on a best-effort basis: per-item errors and a failing
   * request are logged, never thrown.
   *
   * Change over the previous version: when the response is flagged with
   * `errors`, only the items that actually carry an error are written to the
   * debug log. Before, every item was logged, so successful items produced
   * `undefined` log entries that buried the real errors.
   *
   * @param indexName un-prefixed index name; `options.indexPrefix` is prepended.
   * @param operations bulk request lines; nothing is sent when the list is empty.
   * @returns the response body, or `undefined` when nothing was sent or the
   *          request threw.
   */
  private async runBulkOperationsOnIndex(
    indexName: string,
    operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
  ) {
    if (operations.length === 0) {
      return;
    }
    try {
      const fullIndexName = this.options.indexPrefix + indexName;
      const { body }: { body: BulkResponseBody } = await this.client.bulk({
        refresh: true,
        index: fullIndexName,
        body: operations,
      });

      if (body.errors) {
        Logger.error(
          `Some errors occurred running bulk operations on ${fullIndexName}! Set logger to "debug" to print all errors.`,
          loggerCtx,
        );
        for (const item of body.items) {
          // Check each action type; skip the ones that succeeded.
          const itemErrors = [item.index?.error, item.update?.error, item.delete?.error];
          for (const itemError of itemErrors) {
            if (itemError) {
              Logger.debug(JSON.stringify(itemError, null, 2), loggerCtx);
            }
          }
        }
      } else {
        Logger.debug(
          `Executed ${body.items.length} bulk operations on index [${fullIndexName}]`,
          loggerCtx,
        );
      }
      return body;
    } catch (e) {
      // Deliberately swallowed after logging so one failing index does not
      // reject the caller's combined bulk run.
      Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
      Logger.error('Error details: ' + JSON.stringify(e.body?.error, null, 2), loggerCtx);
      return undefined;
    }
  }
  /**
   * Builds the variant-index document for one variant in one channel and
   * language.
   *
   * Texts come from the translation selected by getTranslation(). `price`,
   * `priceWithTax` and `currencyCode` are read from the variant as it is, so
   * the caller must have applied the channel's price and tax beforehand.
   * Custom variant mappings from the plugin options are added last.
   *
   * Fix over the previous version: `productVariantPreviewFocalPoint` is taken
   * from the variant's own featured asset. It used to be copied from the
   * product's asset, which paired the variant preview image with another
   * image's focal point.
   *
   * @param v variant with `product`, `collections`, `channels`, `facetValues`
   *          and translations loaded.
   * @param channelId channel the document is created for.
   * @param languageCode language the document is created for.
   */
  private createVariantIndexItem(
    v: ProductVariant,
    channelId: ID,
    languageCode: LanguageCode,
  ): VariantIndexItem {
    const productAsset = v.product.featuredAsset;
    const variantAsset = v.featuredAsset;
    const productTranslation = this.getTranslation(v.product, languageCode);
    const variantTranslation = this.getTranslation(v, languageCode);
    const collectionTranslations = v.collections.map(c => this.getTranslation(c, languageCode));

    const item: VariantIndexItem = {
      channelId,
      languageCode,
      productVariantId: v.id,
      sku: v.sku,
      slug: productTranslation.slug,
      productId: v.product.id,
      productName: productTranslation.name,
      productAssetId: productAsset ? productAsset.id : undefined,
      productPreview: productAsset ? productAsset.preview : '',
      productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
      productVariantName: variantTranslation.name,
      productVariantAssetId: variantAsset ? variantAsset.id : undefined,
      productVariantPreview: variantAsset ? variantAsset.preview : '',
      productVariantPreviewFocalPoint: variantAsset ? variantAsset.focalPoint || undefined : undefined,
      price: v.price,
      priceWithTax: v.priceWithTax,
      currencyCode: v.currencyCode,
      description: productTranslation.description,
      facetIds: this.getFacetIds([v]),
      channelIds: v.channels.map(c => c.id),
      facetValueIds: this.getFacetValueIds([v]),
      collectionIds: v.collections.map(c => c.id.toString()),
      collectionSlugs: collectionTranslations.map(c => c.slug),
      // A variant is only searchable as enabled when its product is enabled too.
      enabled: v.enabled && v.product.enabled,
    };

    for (const [name, def] of Object.entries(this.options.customProductVariantMappings)) {
      item[name] = def.valueFn(v, languageCode);
    }
    return item;
  }
  /**
   * Builds the product-index document for one product in one channel and
   * language, aggregated over the product's variants in that channel.
   *
   * The first variant supplies sku, variant id/name and currency. Prices are
   * reduced to min/max over all variants. The variant preview comes from the
   * first variant that has a featured asset. Custom product mappings from the
   * plugin options are added last.
   *
   * Fix over the previous version: `productVariantPreviewFocalPoint` is taken
   * from the same asset as the variant preview. It used to be copied from the
   * product's asset.
   *
   * @param variants non-empty list of variants, each with `product`,
   *                 `collections`, `facetValues` and translations loaded.
   * @param channelId channel the document is created for.
   * @param languageCode language the document is created for.
   */
  private createProductIndexItem(
    variants: ProductVariant[],
    channelId: ID,
    languageCode: LanguageCode,
  ): ProductIndexItem {
    const first = variants[0];
    const prices = variants.map(v => v.price);
    const pricesWithTax = variants.map(v => v.priceWithTax);
    const productAsset = first.product.featuredAsset;
    // First variant that has a featured asset, if any.
    const variantAsset = variants.find(v => v.featuredAsset)?.featuredAsset ?? null;
    const productTranslation = this.getTranslation(first.product, languageCode);
    const variantTranslation = this.getTranslation(first, languageCode);
    const collectionTranslations = variants.reduce(
      (translations, variant) => [
        ...translations,
        ...variant.collections.map(c => this.getTranslation(c, languageCode)),
      ],
      [] as Array<Translation<Collection>>,
    );

    const item: ProductIndexItem = {
      channelId,
      languageCode,
      sku: first.sku,
      slug: productTranslation.slug,
      productId: first.product.id,
      productName: productTranslation.name,
      productAssetId: productAsset ? productAsset.id : undefined,
      productPreview: productAsset ? productAsset.preview : '',
      productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
      productVariantId: first.id,
      productVariantName: variantTranslation.name,
      productVariantAssetId: variantAsset ? variantAsset.id : undefined,
      productVariantPreview: variantAsset ? variantAsset.preview : '',
      productVariantPreviewFocalPoint: variantAsset ? variantAsset.focalPoint || undefined : undefined,
      priceMin: Math.min(...prices),
      priceMax: Math.max(...prices),
      priceWithTaxMin: Math.min(...pricesWithTax),
      priceWithTaxMax: Math.max(...pricesWithTax),
      currencyCode: first.currencyCode,
      description: productTranslation.description,
      facetIds: this.getFacetIds(variants),
      facetValueIds: this.getFacetValueIds(variants),
      collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
      collectionSlugs: collectionTranslations.map(c => c.slug),
      channelIds: first.product.channels.map(c => c.id),
      // Enabled when the product is enabled and at least one variant is.
      enabled: variants.some(v => v.enabled) && first.product.enabled,
    };

    for (const [name, def] of Object.entries(this.options.customProductMappings)) {
      item[name] = def.valueFn(first.product, variants, languageCode);
    }
    return item;
  }
  /**
   * If a Product has no variants in a channel, a synthetic product document is
   * created so that the product is still present in the index. Prices are
   * zero, the variant fields are empty, and the document is marked disabled.
   *
   * Fix over the previous version: the translation is selected with the
   * `languageCode` argument, the same language the document is stored under.
   * It used to be selected with `ctx.languageCode`, so the documents for all
   * languages carried the same name, slug and description.
   *
   * @param ctx supplies the channel id and the channel's currency code.
   * @param product product with translations (and optionally `featuredAsset`
   *                and `facetValues`) loaded.
   * @param languageCode language the document is created for.
   */
  private createSyntheticProductIndexItem(
    ctx: RequestContext,
    product: Product,
    languageCode: LanguageCode,
  ): ProductIndexItem {
    const productTranslation = this.getTranslation(product, languageCode);
    return {
      channelId: ctx.channelId,
      languageCode,
      sku: '',
      slug: productTranslation.slug,
      productId: product.id,
      productName: productTranslation.name,
      productAssetId: product.featuredAsset?.id ?? undefined,
      productPreview: product.featuredAsset?.preview ?? '',
      productPreviewFocalPoint: product.featuredAsset?.focalPoint ?? undefined,
      // Placeholder values: there is no real variant behind this document.
      productVariantId: 0,
      productVariantName: productTranslation.name,
      productVariantAssetId: undefined,
      productVariantPreview: '',
      productVariantPreviewFocalPoint: undefined,
      priceMin: 0,
      priceMax: 0,
      priceWithTaxMin: 0,
      priceWithTaxMax: 0,
      currencyCode: ctx.channel.currencyCode,
      description: productTranslation.description,
      facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
      facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
      collectionIds: [],
      collectionSlugs: [],
      channelIds: [ctx.channelId],
      enabled: false,
    };
  }
  /**
   * Picks a translation of the entity with this fallback order: the requested
   * language, then the configured default language, then the first
   * translation in the list.
   */
  private getTranslation<T extends Translatable>(
    translatable: T,
    languageCode: LanguageCode,
  ): Translation<T> {
    const { translations } = translatable;
    const requested = translations.find(t => t.languageCode === languageCode);
    if (requested) {
      return requested as unknown as Translation<T>;
    }
    const inDefaultLanguage = translations.find(
      t => t.languageCode === this.configService.defaultLanguageCode,
    );
    if (inDefaultLanguage) {
      return inDefaultLanguage as unknown as Translation<T>;
    }
    return translations[0] as unknown as Translation<T>;
  }
  /**
   * Collects the de-duplicated facet ids (as strings) of the facet values on
   * all given variants, followed by those on the first variant's product.
   */
  private getFacetIds(variants: ProductVariant[]): string[] {
    const collected: string[] = [];
    for (const variant of variants) {
      for (const facetValue of variant.facetValues) {
        collected.push(facetValue.facet.id.toString());
      }
    }
    for (const facetValue of variants[0].product.facetValues) {
      collected.push(facetValue.facet.id.toString());
    }
    return unique(collected);
  }
  /**
   * Collects the de-duplicated facet value ids (as strings) of all given
   * variants, followed by those of the first variant's product.
   */
  private getFacetValueIds(variants: ProductVariant[]): string[] {
    const collected: string[] = [];
    for (const variant of variants) {
      for (const facetValue of variant.facetValues) {
        collected.push(facetValue.id.toString());
      }
    }
    for (const facetValue of variants[0].product.facetValues) {
      collected.push(facetValue.id.toString());
    }
    return unique(collected);
  }
  /**
   * Composes the document id: channel id, entity id and language code joined
   * by underscores.
   */
  private getId(entityId: ID, channelId: ID, languageCode: LanguageCode): string {
    const segments = [channelId.toString(), entityId.toString(), String(languageCode)];
    return segments.join('_');
  }
  882. }