// indexer.controller.ts (37 KB)
import { Client } from '@elastic/elasticsearch';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { unique } from '@vendure/common/lib/unique';
import {
  Asset,
  asyncObservable,
  AsyncQueue,
  Collection,
  ConfigService,
  FacetValue,
  ID,
  idsAreEqual,
  LanguageCode,
  Logger,
  Product,
  ProductVariant,
  ProductVariantService,
  RequestContext,
  TransactionalConnection,
  Translatable,
  translateDeep,
  Translation,
} from '@vendure/core';
import { Observable } from 'rxjs';
import { SelectQueryBuilder } from 'typeorm';
import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';

import { ELASTIC_SEARCH_OPTIONS, loggerCtx, PRODUCT_INDEX_NAME, VARIANT_INDEX_NAME } from './constants';
import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils';
import { ElasticsearchOptions } from './options';
import {
  BulkOperation,
  BulkOperationDoc,
  BulkResponseBody,
  ProductChannelMessageData,
  ProductIndexItem,
  ReindexMessageData,
  UpdateAssetMessageData,
  UpdateProductMessageData,
  UpdateVariantMessageData,
  UpdateVariantsByIdMessageData,
  VariantChannelMessageData,
  VariantIndexItem,
} from './types';

  44. export const variantRelations = [
  45. 'product',
  46. 'product.featuredAsset',
  47. 'product.facetValues',
  48. 'product.facetValues.facet',
  49. 'product.channels',
  50. 'featuredAsset',
  51. 'facetValues',
  52. 'facetValues.facet',
  53. 'collections',
  54. 'taxCategory',
  55. 'channels',
  56. 'channels.defaultTaxZone',
  57. ];
  58. export interface ReindexMessageResponse {
  59. total: number;
  60. completed: number;
  61. duration: number;
  62. }
  63. @Injectable()
  64. export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
  65. private client: Client;
  66. private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
  67. constructor(
  68. private connection: TransactionalConnection,
  69. @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
  70. private productVariantService: ProductVariantService,
  71. private configService: ConfigService,
  72. ) {}
  /**
   * Creates the Elasticsearch client from the configured `host` and `port` options.
   */
  onModuleInit(): any {
    const { host, port } = this.options;
    this.client = new Client({
      node: `${host}:${port}`,
    });
  }

  /**
   * Closes the Elasticsearch client and returns whatever `client.close()` returns.
   */
  onModuleDestroy(): any {
    return this.client.close();
  }

  /**
   * Updates the search index only for the affected product.
   *
   * @returns always `true` once the update has been attempted.
   */
  async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    await this.updateProductInternal(ctx, productId);
    return true;
  }

  /**
   * Removes the index entries of the given product, and of all of its variants, for every
   * channel the product is assigned to.
   *
   * @returns `false` if the product cannot be found, otherwise `true`.
   */
  async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    const product = await this.connection.getRepository(Product).findOne(productId, {
      relations: ['channels'],
    });
    if (!product) {
      return false;
    }
    const channelIds = product.channels.map(c => c.id);
    await this.deleteProductInternal(product, channelIds);
    const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
    await this.deleteVariantsInternal(variants, channelIds);
    return true;
  }

  /**
   * Indexes a product, and all of its variants, after it has been assigned to a channel.
   *
   * @returns always `true` once the updates have been attempted.
   */
  async assignProductToChannel({
    ctx: rawContext,
    productId,
    channelId,
  }: ProductChannelMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    await this.updateProductInternal(ctx, productId);
    const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
    await this.updateVariantsInternal(
      ctx,
      variants.map(v => v.id),
      channelId,
    );
    return true;
  }

  /**
   * Removes the index entries of a product, and of all of its variants, for the single
   * channel it has been removed from.
   *
   * @returns `false` if the product cannot be found, otherwise `true`.
   */
  async removeProductFromChannel({
    ctx: rawContext,
    productId,
    channelId,
  }: ProductChannelMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    const product = await this.connection.getRepository(Product).findOne(productId);
    if (!product) {
      return false;
    }
    await this.deleteProductInternal(product, [channelId]);
    const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
    await this.deleteVariantsInternal(variants, [channelId]);
    return true;
  }

  /**
   * Indexes a single variant (and, via `updateVariantsInternal`, its product) after the
   * variant has been assigned to a channel.
   *
   * @returns always `true` once the update has been attempted.
   */
  async assignVariantToChannel({
    ctx: rawContext,
    productVariantId,
    channelId,
  }: VariantChannelMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    await this.updateVariantsInternal(ctx, [productVariantId], channelId);
    return true;
  }

  /**
   * Removes a variant's index entries for the given channel. If the variant's product is
   * itself no longer assigned to that channel, the product's entries are removed as well.
   *
   * @returns always `true`; `getEntityOrThrow` is used to load the variant, so a missing
   * variant is expected to surface as an error rather than as `false`.
   */
  async removeVariantFromChannel({
    ctx: rawContext,
    productVariantId,
    channelId,
  }: VariantChannelMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    const productVariant = await this.connection.getEntityOrThrow(ctx, ProductVariant, productVariantId, {
      relations: ['product', 'product.channels'],
    });
    await this.deleteVariantsInternal([productVariant], [channelId]);

    const productStillInChannel = productVariant.product.channels.find(c => idsAreEqual(c.id, channelId));
    if (!productStillInChannel) {
      await this.deleteProductInternal(productVariant.product, [channelId]);
    }
    return true;
  }

  /**
   * Updates the search index only for the affected entities.
   *
   * The work is pushed onto the controller's async queue; the returned promise is the one
   * handed back by the queue.
   */
  async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    return this.asyncQueue.push(async () => {
      await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
      return true;
    });
  }

  /**
   * Removes the index entries of the given variants from every channel any of them is
   * assigned to, after first refreshing the index entries of the products they belong to.
   *
   * @returns always `true` once the operations have been attempted.
   */
  async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
    const ctx = RequestContext.deserialize(rawContext);
    const variants = await this.connection
      .getRepository(ProductVariant)
      .findByIds(variantIds, { relations: ['product', 'channels'] });

    const productIds = unique(variants.map(v => v.product.id));
    for (const productId of productIds) {
      await this.updateProductInternal(ctx, productId);
    }

    // Collect channel ids with a plain loop: the reduce + spread it replaces copied the
    // accumulator once per variant.
    const allChannelIds: ID[] = [];
    for (const variant of variants) {
      for (const channel of variant.channels) {
        allChannelIds.push(channel.id);
      }
    }
    await this.deleteVariantsInternal(variants, unique(allChannelIds));
    return true;
  }

  /**
   * Updates the index entries of the variants with the given ids, and of the products they
   * belong to, in batches of `options.batchSize`.
   *
   * The job runs on the controller's async queue. After every batch a progress report is
   * emitted through the observer, and the queued task resolves with a final report.
   * The bulk `update` operations carry only a partial `doc` (no `doc_as_upsert`), so they
   * target index entries that already exist.
   */
  updateVariantsById({
    ctx: rawContext,
    ids,
  }: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
    const ctx = RequestContext.deserialize(rawContext);
    const { batchSize } = this.options;

    return asyncObservable(async observer => {
      return this.asyncQueue.push(async () => {
        const timeStart = Date.now();
        if (ids.length) {
          const batches = Math.ceil(ids.length / batchSize);
          Logger.verbose(`Updating ${ids.length} variants...`, loggerCtx);

          // Variants of a product whose product entry has not been written yet; carried
          // from one batch to the next by processVariantBatch.
          let variantsInProduct: ProductVariant[] = [];

          for (let i = 0; i < batches; i++) {
            const begin = i * batchSize;
            const end = begin + batchSize;
            const batchIds = ids.slice(begin, end);
            const variants = await this.getVariantsByIds(ctx, batchIds);

            variantsInProduct = await this.processVariantBatch(
              variants,
              variantsInProduct,
              (operations, variant) => {
                const languageVariants = variant.translations.map(t => t.languageCode);
                for (const languageCode of languageVariants) {
                  operations.push(
                    {
                      update: {
                        _id: this.getId(variant.id, ctx.channelId, languageCode),
                      },
                    },
                    {
                      doc: this.createVariantIndexItem(variant, ctx.channelId, languageCode),
                    },
                  );
                }
              },
              (operations, product, _variants) => {
                const languageVariants = product.translations.map(t => t.languageCode);
                for (const languageCode of languageVariants) {
                  operations.push(
                    {
                      update: {
                        _id: this.getId(product.id, ctx.channelId, languageCode),
                      },
                    },
                    {
                      doc: this.createProductIndexItem(_variants, ctx.channelId, languageCode),
                    },
                  );
                }
              },
            );

            observer.next({
              total: ids.length,
              completed: Math.min((i + 1) * batchSize, ids.length),
              duration: Date.now() - timeStart,
            });
          }
        }
        Logger.verbose(`Completed updating variants`, loggerCtx);
        return {
          total: ids.length,
          completed: ids.length,
          duration: Date.now() - timeStart,
        };
      });
    });
  }

  /**
   * Rebuilds the search index for the channel of the given context.
   *
   * With `dropIndices` set, the indices are deleted and re-created first; otherwise only the
   * entries belonging to the context's channel are deleted. All matching variants are then
   * indexed in batches of `options.batchSize`, with a progress report emitted through the
   * observer after every batch and a final report returned by the queued task.
   */
  reindex({ ctx: rawContext, dropIndices }: ReindexMessageData): Observable<ReindexMessageResponse> {
    const ctx = RequestContext.deserialize(rawContext);
    const { batchSize } = this.options;

    return asyncObservable(async observer => {
      return this.asyncQueue.push(async () => {
        const timeStart = Date.now();

        if (dropIndices) {
          await deleteIndices(this.client, this.options.indexPrefix);
          await createIndices(
            this.client,
            this.options.indexPrefix,
            this.configService.entityIdStrategy.primaryKeyType,
          );
        } else {
          await deleteByChannel(this.client, this.options.indexPrefix, ctx.channelId);
        }

        const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
        const count = await qb.getCount();
        Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);

        const batches = Math.ceil(count / batchSize);
        // Variants of a product whose product entry has not been written yet; carried
        // from one batch to the next by processVariantBatch.
        let variantsInProduct: ProductVariant[] = [];

        for (let i = 0; i < batches; i++) {
          const variants = await this.getBatch(ctx, qb, i);
          Logger.verbose(
            `Processing batch ${i + 1} of ${batches}. ProductVariants count: ${variants.length}`,
            loggerCtx,
          );

          variantsInProduct = await this.processVariantBatch(
            variants,
            variantsInProduct,
            (operations, variant) => {
              const languageVariants = variant.translations.map(t => t.languageCode);
              for (const languageCode of languageVariants) {
                operations.push(
                  { index: { _id: this.getId(variant.id, ctx.channelId, languageCode) } },
                  this.createVariantIndexItem(variant, ctx.channelId, languageCode),
                );
              }
            },
            (operations, product, _variants) => {
              const languageVariants = product.translations.map(t => t.languageCode);
              for (const languageCode of languageVariants) {
                operations.push(
                  { index: { _id: this.getId(product.id, ctx.channelId, languageCode) } },
                  this.createProductIndexItem(_variants, ctx.channelId, languageCode),
                );
              }
            },
          );

          observer.next({
            total: count,
            completed: Math.min((i + 1) * batchSize, count),
            duration: Date.now() - timeStart,
          });
        }

        Logger.verbose(`Completed reindexing!`, loggerCtx);
        return {
          total: count,
          completed: count,
          duration: Date.now() - timeStart,
        };
      });
    });
  }

  /**
   * Propagates an asset's focal point to every product and variant index entry that uses
   * the asset, then refreshes both indices.
   *
   * @returns `true` only if the update reported no failures for either index.
   */
  async updateAsset(data: UpdateAssetMessageData): Promise<boolean> {
    const result1 = await this.updateAssetFocalPointForIndex(PRODUCT_INDEX_NAME, data.asset);
    const result2 = await this.updateAssetFocalPointForIndex(VARIANT_INDEX_NAME, data.asset);
    await this.client.indices.refresh({
      index: [
        this.options.indexPrefix + PRODUCT_INDEX_NAME,
        this.options.indexPrefix + VARIANT_INDEX_NAME,
      ],
    });
    return result1 && result2;
  }

  /**
   * Clears the asset id from every product and variant index entry that references the
   * given asset, then refreshes both indices.
   *
   * @returns `true` only if the update reported no failures for either index.
   */
  async deleteAsset(data: UpdateAssetMessageData): Promise<boolean> {
    const result1 = await this.deleteAssetForIndex(PRODUCT_INDEX_NAME, data.asset);
    const result2 = await this.deleteAssetForIndex(VARIANT_INDEX_NAME, data.asset);
    await this.client.indices.refresh({
      index: [
        this.options.indexPrefix + PRODUCT_INDEX_NAME,
        this.options.indexPrefix + VARIANT_INDEX_NAME,
      ],
    });
    return result1 && result2;
  }

  /**
   * Writes the asset's focal point (or `null` when it has none) into the
   * `productPreviewFocalPoint` / `productVariantPreviewFocalPoint` fields of the entries in
   * the given index that reference the asset.
   */
  private async updateAssetFocalPointForIndex(indexName: string, asset: Asset): Promise<boolean> {
    const focalPoint = asset.focalPoint || null;
    const params = { focalPoint };
    return this.updateAssetForIndex(
      indexName,
      asset,
      {
        source: 'ctx._source.productPreviewFocalPoint = params.focalPoint',
        params,
      },
      {
        source: 'ctx._source.productVariantPreviewFocalPoint = params.focalPoint',
        params,
      },
    );
  }

  /**
   * Sets `productAssetId` / `productVariantAssetId` to null on the entries in the given
   * index that reference the asset.
   */
  private async deleteAssetForIndex(indexName: string, asset: Asset): Promise<boolean> {
    return this.updateAssetForIndex(
      indexName,
      asset,
      { source: 'ctx._source.productAssetId = null' },
      { source: 'ctx._source.productVariantAssetId = null' },
    );
  }

  /**
   * Runs two update-by-query requests against the given index: one applying
   * `updateProductScript` to entries whose `productAssetId` equals the asset id, and one
   * applying `updateVariantScript` to entries whose `productVariantAssetId` equals it.
   * Every failure reported by either request is logged.
   *
   * @returns `true` only if neither request reported any failures.
   */
  private async updateAssetForIndex(
    indexName: string,
    asset: Asset,
    updateProductScript: { source: string; params?: any },
    updateVariantScript: { source: string; params?: any },
  ): Promise<boolean> {
    const index = this.options.indexPrefix + indexName;

    const result1 = await this.client.update_by_query({
      index,
      body: {
        script: updateProductScript,
        query: {
          term: {
            productAssetId: asset.id,
          },
        },
      },
    });
    for (const failure of result1.body.failures) {
      Logger.error(`${failure.cause.type}: ${failure.cause.reason}`, loggerCtx);
    }

    const result2 = await this.client.update_by_query({
      index,
      body: {
        script: updateVariantScript,
        query: {
          term: {
            productVariantAssetId: asset.id,
          },
        },
      },
    });
    // Log the failures of the second request (previously the first request's failures
    // were logged a second time here).
    for (const failure of result2.body.failures) {
      Logger.error(`${failure.cause.type}: ${failure.cause.reason}`, loggerCtx);
    }

    // `failures` is an array, so compare its length; comparing the array itself to 0 was
    // always false and made this method report failure unconditionally.
    return result1.body.failures.length === 0 && result2.body.failures.length === 0;
  }

  /**
   * Turns one batch of variants into bulk operations and executes them.
   *
   * `processVariants` is called once per variant and appends that variant's operations.
   * `processProducts` is called whenever the next variant in the batch belongs to a different
   * product, or when the last variant of the batch is reached and its product has not been
   * handled earlier in this batch; it receives the variants accumulated for that product.
   *
   * Note that the passed-in `variantsInProduct` array is appended to in place.
   *
   * @param variants The batch; the product-boundary detection compares each variant with
   * the one that follows it, so variants of one product are expected to be adjacent.
   * @param variantsInProduct Variants carried over from the previous batch.
   * @returns The variants accumulated since the last product was processed, to be passed
   * into the next call.
   */
  private async processVariantBatch(
    variants: ProductVariant[],
    variantsInProduct: ProductVariant[],
    processVariants: (
      operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem> | VariantIndexItem>,
      variant: ProductVariant,
    ) => void,
    processProducts: (
      operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem> | ProductIndexItem>,
      product: Product,
      variants: ProductVariant[],
    ) => void,
  ) {
    const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
    const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
    const productIdsIndexed = new Set<ID>();

    // An index loop is used because each iteration looks ahead to the next variant.
    // tslint:disable-next-line:prefer-for-of
    for (let j = 0; j < variants.length; j++) {
      const variant = variants[j];
      variantsInProduct.push(variant);
      processVariants(variantsToIndex, variant);

      const nextVariant = variants[j + 1];
      const nextVariantIsNewProduct = nextVariant && nextVariant.productId !== variant.productId;
      const thisVariantIsLastAndProductNotAdded =
        !nextVariant && !productIdsIndexed.has(variant.productId);

      if (nextVariantIsNewProduct || thisVariantIsLastAndProductNotAdded) {
        processProducts(productsToIndex, variant.product, variantsInProduct);
        variantsInProduct = [];
        productIdsIndexed.add(variant.productId);
      }
    }

    await this.executeBulkOperations(VARIANT_INDEX_NAME, variantsToIndex);
    await this.executeBulkOperations(PRODUCT_INDEX_NAME, productsToIndex);
    return variantsInProduct;
  }

  /**
   * Upserts the index entries of the given (non-deleted) variants for every channel each
   * variant is assigned to and every language it is translated into, after first updating
   * the index entries of the products the variants belong to.
   *
   * @param ctx Context of the triggering request; passed on to `updateProductInternal`.
   * @param variantIds Ids of the variants to index.
   * @param channelId Not read by this method: entries are written for all of a variant's
   * channels. Kept so existing callers continue to work.
   */
  private async updateVariantsInternal(ctx: RequestContext, variantIds: ID[], channelId: ID) {
    const productVariants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
      relations: variantRelations,
      where: {
        deletedAt: null,
      },
      order: {
        id: 'ASC',
      },
    });

    if (productVariants.length) {
      // When ProductVariants change, we need to update the corresponding Product index
      // since e.g. price changes must be reflected on the Product level too.
      const productIdsOfVariants = unique(productVariants.map(v => v.productId));
      for (const variantProductId of productIdsOfVariants) {
        await this.updateProductInternal(ctx, variantProductId);
      }

      const operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
      for (const variant of productVariants) {
        const languageVariants = variant.translations.map(t => t.languageCode);
        for (const channel of variant.channels) {
          const channelCtx = new RequestContext({
            channel,
            apiType: 'admin',
            authorizedAsOwnerOnly: false,
            isAuthorized: true,
            session: {} as any,
          });
          // Price and tax must be resolved for the channel being indexed, so the
          // channel-specific context is used here (as updateProductInternal does),
          // not the context of the triggering request.
          await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
          for (const languageCode of languageVariants) {
            operations.push(
              { update: { _id: this.getId(variant.id, channel.id, languageCode) } },
              {
                doc: this.createVariantIndexItem(variant, channel.id, languageCode),
                doc_as_upsert: true,
              },
            );
          }
        }
      }

      Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
      await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
    }
  }

  /**
   * Upserts the product-index entries of a single product. Does nothing if the product
   * cannot be found.
   *
   * If the product has non-deleted variants, one entry is written per channel and language,
   * built from the variants assigned to that channel with channel-specific price and tax
   * applied. If it has none, a single synthetic entry is written for the channel and
   * language of `ctx`.
   */
  private async updateProductInternal(ctx: RequestContext, productId: ID) {
    const product = await this.connection.getRepository(Product).findOne(productId, {
      relations: ['variants', 'channels', 'channels.defaultTaxZone'],
    });
    if (product) {
      const updatedProductVariants = await this.connection.getRepository(ProductVariant).findByIds(
        product.variants.map(v => v.id),
        {
          relations: variantRelations,
          where: {
            deletedAt: null,
          },
        },
      );
      // A disabled product is indexed with all of its variants marked as disabled.
      if (product.enabled === false) {
        updatedProductVariants.forEach(v => (v.enabled = false));
      }

      const operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];

      if (updatedProductVariants.length) {
        Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
        const languageVariants = product.translations.map(t => t.languageCode);

        for (const channel of product.channels) {
          const channelCtx = new RequestContext({
            channel,
            apiType: 'admin',
            authorizedAsOwnerOnly: false,
            isAuthorized: true,
            session: {} as any,
          });

          const variantsInChannel = updatedProductVariants.filter(v =>
            v.channels.map(c => c.id).includes(channel.id),
          );
          for (const variant of variantsInChannel) {
            await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
          }

          for (const languageCode of languageVariants) {
            const updatedProductIndexItem = this.createProductIndexItem(
              variantsInChannel,
              channel.id,
              languageCode,
            );
            operations.push(
              {
                update: {
                  _id: this.getId(updatedProductIndexItem.productId, channel.id, languageCode),
                },
              },
              { doc: updatedProductIndexItem, doc_as_upsert: true },
            );
          }
        }
      } else {
        const syntheticIndexItem = this.createSyntheticProductIndexItem(ctx, product);
        operations.push(
          {
            update: {
              _id: this.getId(syntheticIndexItem.productId, ctx.channelId, ctx.languageCode),
            },
          },
          { doc: syntheticIndexItem, doc_as_upsert: true },
        );
      }

      await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
    }
  }

  /**
   * Deletes the product-index entries of the given product for every combination of the
   * product's translation languages and the given channels.
   */
  private async deleteProductInternal(product: Product, channelIds: ID[]) {
    Logger.verbose(`Deleting 1 Product (${product.id})`, loggerCtx);
    const operations: BulkOperation[] = [];
    const languageVariants = product.translations.map(t => t.languageCode);
    for (const languageCode of languageVariants) {
      for (const channelId of channelIds) {
        operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } });
      }
    }
    await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
  }

  /**
   * Deletes the variant-index entries of the given variants for every combination of each
   * variant's translation languages and the given channels.
   */
  private async deleteVariantsInternal(variants: ProductVariant[], channelIds: ID[]) {
    Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
    const operations: BulkOperation[] = [];
    for (const variant of variants) {
      const languageVariants = variant.translations.map(t => t.languageCode);
      for (const languageCode of languageVariants) {
        for (const channelId of channelIds) {
          operations.push({
            delete: { _id: this.getId(variant.id, channelId, languageCode) },
          });
        }
      }
    }
    await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
  }

  /**
   * Sends the given operations to the Elasticsearch bulk endpoint of the prefixed index,
   * with `refresh: true`.
   *
   * Errors never propagate to the caller: item-level errors reported in the response are
   * logged (details at debug level), and a rejected request is caught and logged.
   *
   * @returns The bulk response body, or `undefined` if there was nothing to send or the
   * request itself failed.
   */
  private async executeBulkOperations(
    indexName: string,
    operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
  ) {
    if (operations.length === 0) {
      // Callers routinely end up with nothing to write (e.g. a batch that contains no
      // product boundary). The bulk endpoint expects a non-empty body, so skip the
      // round-trip instead of provoking and logging an error.
      return;
    }
    try {
      const fullIndexName = this.options.indexPrefix + indexName;
      const { body }: { body: BulkResponseBody } = await this.client.bulk({
        refresh: true,
        index: fullIndexName,
        body: operations,
      });

      if (body.errors) {
        Logger.error(
          `Some errors occurred running bulk operations on ${fullIndexName}! Set logger to "debug" to print all errors.`,
          loggerCtx,
        );
        body.items.forEach(item => {
          if (item.index) {
            Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
          }
          if (item.update) {
            Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
          }
          if (item.delete) {
            Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
          }
        });
      } else {
        Logger.verbose(
          `Executed ${body.items.length} bulk operations on index [${fullIndexName}]`,
          loggerCtx,
        );
      }
      return body;
    } catch (e) {
      Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
      Logger.error('Error details: ' + JSON.stringify(e.body && e.body.error, null, 2), loggerCtx);
    }
  }

  /**
   * Builds the query used by `reindex`: all non-deleted variants, with `variantRelations`
   * and eager relations joined, whose (non-deleted) product is assigned to the given
   * channel, ordered by product id.
   */
  private getSearchIndexQueryBuilder(channelId: ID) {
    const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
    FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
      relations: variantRelations,
      order: {
        productId: 'ASC',
      },
    });
    FindOptionsUtils.joinEagerRelations(
      qb,
      qb.alias,
      this.connection.rawConnection.getMetadata(ProductVariant),
    );
    // `variants__product` is presumably the alias FindOptionsUtils gives the `product`
    // relation join - TODO confirm against the TypeORM version in use.
    qb.leftJoin('variants.product', '__product')
      .leftJoin('__product.channels', '__channel')
      .where('__channel.id = :channelId', { channelId })
      .andWhere('variants__product.deletedAt IS NULL')
      .andWhere('variants.deletedAt IS NULL');
    return qb;
  }

  /**
   * Loads one page of `options.batchSize` variants from the given query builder, with a
   * secondary ordering by variant id, and hydrates them via `hydrateVariants`.
   *
   * @param batchNumber Zero-based page number; accepted as a string or a number.
   */
  private async getBatch(
    ctx: RequestContext,
    qb: SelectQueryBuilder<ProductVariant>,
    batchNumber: string | number,
  ): Promise<ProductVariant[]> {
    const { batchSize } = this.options;
    const i = Number.parseInt(batchNumber.toString(), 10);
    const variants = await qb
      .take(batchSize)
      .skip(i * batchSize)
      .addOrderBy('variants.id', 'ASC')
      .getMany();

    return this.hydrateVariants(ctx, variants);
  }

  /**
   * Loads the non-deleted variants with the given ids (with `variantRelations`, ordered by
   * id) and hydrates them via `hydrateVariants`.
   */
  private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
    const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
      relations: variantRelations,
      where: {
        deletedAt: null,
      },
      order: {
        id: 'ASC',
      },
    });
    return this.hydrateVariants(ctx, variants);
  }

  /**
   * Given an array of ProductVariants, this method applies the correct taxes and translations.
   *
   * Price and tax are applied per variant using `ctx`; each variant is then translated into
   * `ctx.languageCode`, including its `product` and `collections` relations.
   */
  private async hydrateVariants(
    ctx: RequestContext,
    variants: ProductVariant[],
  ): Promise<ProductVariant[]> {
    const pricedVariants = await Promise.all(
      variants.map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx)),
    );
    return pricedVariants.map(v => translateDeep(v, ctx.languageCode, ['product', 'collections']));
  }

  /**
   * Builds the variant-index document for one variant in one channel and language.
   *
   * Translated fields come from `getTranslation`, which falls back to another translation
   * when the requested language is missing. Any `customProductVariantMappings` from the
   * plugin options are evaluated and added to the document under their configured names.
   */
  private createVariantIndexItem(
    v: ProductVariant,
    channelId: ID,
    languageCode: LanguageCode,
  ): VariantIndexItem {
    const productAsset = v.product.featuredAsset;
    const variantAsset = v.featuredAsset;
    const productTranslation = this.getTranslation(v.product, languageCode);
    const variantTranslation = this.getTranslation(v, languageCode);
    const collectionTranslations = v.collections.map(c => this.getTranslation(c, languageCode));

    const item: VariantIndexItem = {
      channelId,
      languageCode,
      productVariantId: v.id,
      sku: v.sku,
      slug: productTranslation.slug,
      productId: v.product.id,
      productName: productTranslation.name,
      productAssetId: productAsset ? productAsset.id : undefined,
      productPreview: productAsset ? productAsset.preview : '',
      productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
      productVariantName: variantTranslation.name,
      productVariantAssetId: variantAsset ? variantAsset.id : undefined,
      productVariantPreview: variantAsset ? variantAsset.preview : '',
      // The variant preview's focal point belongs to the variant's own asset, matching
      // productVariantAssetId / productVariantPreview above (it was previously read from
      // the product asset).
      productVariantPreviewFocalPoint: variantAsset ? variantAsset.focalPoint || undefined : undefined,
      price: v.price,
      priceWithTax: v.priceWithTax,
      currencyCode: v.currencyCode,
      description: productTranslation.description,
      facetIds: this.getFacetIds([v]),
      channelIds: v.channels.map(c => c.id),
      facetValueIds: this.getFacetValueIds([v]),
      collectionIds: v.collections.map(c => c.id.toString()),
      collectionSlugs: collectionTranslations.map(c => c.slug),
      enabled: v.enabled && v.product.enabled,
    };

    const customMappings = Object.entries(this.options.customProductVariantMappings);
    for (const [name, def] of customMappings) {
      item[name] = def.valueFn(v, languageCode);
    }
    return item;
  }

  /**
   * Builds the product-index document for one product in one channel and language from the
   * product's variants.
   *
   * Product-level fields are read from the first variant's `product`; prices are the
   * min/max across all given variants; the variant asset is that of the first variant that
   * has a featured asset. Any `customProductMappings` from the plugin options are evaluated
   * and added to the document under their configured names.
   *
   * @param variants Variants of a single product. Must not be empty: the first element is
   * dereferenced unconditionally.
   */
  private createProductIndexItem(
    variants: ProductVariant[],
    channelId: ID,
    languageCode: LanguageCode,
  ): ProductIndexItem {
    const first = variants[0];
    const prices = variants.map(v => v.price);
    const pricesWithTax = variants.map(v => v.priceWithTax);
    const productAsset = first.product.featuredAsset;
    const variantAsset = variants.find(v => v.featuredAsset)?.featuredAsset ?? null;
    const productTranslation = this.getTranslation(first.product, languageCode);
    const variantTranslation = this.getTranslation(first, languageCode);
    const collectionTranslations = variants.reduce(
      (translations, variant) => [
        ...translations,
        ...variant.collections.map(c => this.getTranslation(c, languageCode)),
      ],
      [] as Array<Translation<Collection>>,
    );

    const item: ProductIndexItem = {
      channelId,
      languageCode,
      sku: first.sku,
      slug: productTranslation.slug,
      productId: first.product.id,
      productName: productTranslation.name,
      productAssetId: productAsset ? productAsset.id : undefined,
      productPreview: productAsset ? productAsset.preview : '',
      productPreviewFocalPoint: productAsset ? productAsset.focalPoint || undefined : undefined,
      productVariantId: first.id,
      productVariantName: variantTranslation.name,
      productVariantAssetId: variantAsset ? variantAsset.id : undefined,
      productVariantPreview: variantAsset ? variantAsset.preview : '',
      // The variant preview's focal point belongs to the variant asset, matching
      // productVariantAssetId / productVariantPreview above (it was previously read from
      // the product asset).
      productVariantPreviewFocalPoint: variantAsset ? variantAsset.focalPoint || undefined : undefined,
      priceMin: Math.min(...prices),
      priceMax: Math.max(...prices),
      priceWithTaxMin: Math.min(...pricesWithTax),
      priceWithTaxMax: Math.max(...pricesWithTax),
      currencyCode: first.currencyCode,
      description: productTranslation.description,
      facetIds: this.getFacetIds(variants),
      facetValueIds: this.getFacetValueIds(variants),
      collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
      collectionSlugs: collectionTranslations.map(c => c.slug),
      channelIds: first.product.channels.map(c => c.id),
      enabled: variants.some(v => v.enabled) && first.product.enabled,
    };

    const customMappings = Object.entries(this.options.customProductMappings);
    for (const [name, def] of customMappings) {
      item[name] = def.valueFn(variants[0].product, variants, languageCode);
    }
    return item;
  }

  /**
   * If a Product has no variants, we create a synthetic variant for the purposes
   * of making that product visible via the search query.
   *
   * The resulting document uses the channel and language of `ctx`, zero prices, a
   * `productVariantId` of 0, no collections, and is always marked as not enabled. Asset and
   * facet fields are filled only if those relations are present on the product.
   */
  private createSyntheticProductIndexItem(ctx: RequestContext, product: Product): ProductIndexItem {
    const productTranslation = this.getTranslation(product, ctx.languageCode);
    return {
      channelId: ctx.channelId,
      languageCode: ctx.languageCode,
      sku: '',
      slug: productTranslation.slug,
      productId: product.id,
      productName: productTranslation.name,
      productAssetId: product.featuredAsset?.id ?? undefined,
      productPreview: product.featuredAsset?.preview ?? '',
      productPreviewFocalPoint: product.featuredAsset?.focalPoint ?? undefined,
      productVariantId: 0,
      productVariantName: productTranslation.name,
      productVariantAssetId: undefined,
      productVariantPreview: '',
      productVariantPreviewFocalPoint: undefined,
      priceMin: 0,
      priceMax: 0,
      priceWithTaxMin: 0,
      priceWithTaxMax: 0,
      currencyCode: ctx.channel.currencyCode,
      description: productTranslation.description,
      facetIds: product.facetValues?.map(fv => fv.facet.id.toString()) ?? [],
      facetValueIds: product.facetValues?.map(fv => fv.id.toString()) ?? [],
      collectionIds: [],
      collectionSlugs: [],
      channelIds: [ctx.channelId],
      enabled: false,
    };
  }

  /**
   * Picks the translation of an entity for the given language, falling back first to the
   * configured default language and then to the entity's first translation.
   */
  private getTranslation<T extends Translatable>(
    translatable: T,
    languageCode: LanguageCode,
  ): Translation<T> {
    return ((translatable.translations.find(t => t.languageCode === languageCode) ||
      translatable.translations.find(t => t.languageCode === this.configService.defaultLanguageCode) ||
      translatable.translations[0]) as unknown) as Translation<T>;
  }

  /**
   * Returns the de-duplicated ids (as strings) of all facets referenced by the facet values
   * of the given variants and of the first variant's product.
   *
   * @param variants Must not be empty: the first element is dereferenced unconditionally.
   */
  private getFacetIds(variants: ProductVariant[]): string[] {
    const facetIds = (fv: FacetValue) => fv.facet.id.toString();
    const variantFacetIds = variants.reduce(
      (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
      [] as string[],
    );
    const productFacetIds = variants[0].product.facetValues.map(facetIds);
    return unique([...variantFacetIds, ...productFacetIds]);
  }

  /**
   * Returns the de-duplicated ids (as strings) of all facet values of the given variants
   * and of the first variant's product.
   *
   * @param variants Must not be empty: the first element is dereferenced unconditionally.
   */
  private getFacetValueIds(variants: ProductVariant[]): string[] {
    const facetValueIds = (fv: FacetValue) => fv.id.toString();
    const variantFacetValueIds = variants.reduce(
      (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
      [] as string[],
    );
    const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
    return unique([...variantFacetValueIds, ...productFacetValueIds]);
  }

  /**
   * Builds the Elasticsearch document id for an entity in a given channel and language,
   * in the form `<channelId>_<entityId>_<languageCode>`.
   */
  private getId(entityId: ID, channelId: ID, languageCode: LanguageCode): string {
    return `${channelId.toString()}_${entityId.toString()}_${languageCode}`;
  }
}