indexer.controller.ts 24 KB


  1. import { Client } from '@elastic/elasticsearch';
  2. import { Controller, Inject, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  3. import { MessagePattern } from '@nestjs/microservices';
  4. import { InjectConnection } from '@nestjs/typeorm';
  5. import { unique } from '@vendure/common/lib/unique';
  6. import {
  7. asyncObservable,
  8. AsyncQueue,
  9. FacetValue,
  10. ID,
  11. JobService,
  12. Logger,
  13. Product,
  14. ProductVariant,
  15. ProductVariantService,
  16. RequestContext,
  17. translateDeep,
  18. } from '@vendure/core';
  19. import { Observable } from 'rxjs';
  20. import { Connection, SelectQueryBuilder } from 'typeorm';
  21. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  22. import {
  23. ELASTIC_SEARCH_OPTIONS,
  24. loggerCtx,
  25. PRODUCT_INDEX_NAME,
  26. PRODUCT_INDEX_TYPE,
  27. VARIANT_INDEX_NAME,
  28. VARIANT_INDEX_TYPE,
  29. } from './constants';
  30. import { createIndices, deleteByChannel, deleteIndices } from './indexing-utils';
  31. import { ElasticsearchOptions } from './options';
  32. import {
  33. AssignProductToChannelMessage,
  34. BulkOperation,
  35. BulkOperationDoc,
  36. BulkResponseBody,
  37. DeleteProductMessage,
  38. DeleteVariantMessage,
  39. ProductIndexItem,
  40. ReindexMessage,
  41. RemoveProductFromChannelMessage,
  42. UpdateProductMessage,
  43. UpdateVariantMessage,
  44. UpdateVariantsByIdMessage,
  45. VariantIndexItem,
  46. } from './types';
  47. export const variantRelations = [
  48. 'product',
  49. 'product.featuredAsset',
  50. 'product.facetValues',
  51. 'product.facetValues.facet',
  52. 'product.channels',
  53. 'featuredAsset',
  54. 'facetValues',
  55. 'facetValues.facet',
  56. 'collections',
  57. 'taxCategory',
  58. ];
  59. export interface ReindexMessageResponse {
  60. total: number;
  61. completed: number;
  62. duration: number;
  63. }
  64. @Controller()
  65. export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
  66. private client: Client;
  67. private asyncQueue = new AsyncQueue('elasticsearch-indexer', 5);
  68. constructor(
  69. @InjectConnection() private connection: Connection,
  70. @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
  71. private productVariantService: ProductVariantService,
  72. private jobService: JobService,
  73. ) {}
  74. onModuleInit(): any {
  75. const { host, port } = this.options;
  76. this.client = new Client({
  77. node: `${host}:${port}`,
  78. });
  79. }
  80. onModuleDestroy(): any {
  81. return this.client.close();
  82. }
  83. /**
  84. * Updates the search index only for the affected product.
  85. */
  86. @MessagePattern(UpdateProductMessage.pattern)
  87. updateProduct({
  88. ctx: rawContext,
  89. productId,
  90. }: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
  91. const ctx = RequestContext.fromObject(rawContext);
  92. return asyncObservable(async () => {
  93. await this.updateProductInternal(ctx, productId, ctx.channelId);
  94. return true;
  95. });
  96. }
  97. /**
  98. * Updates the search index only for the affected product.
  99. */
  100. @MessagePattern(DeleteProductMessage.pattern)
  101. deleteProduct({
  102. ctx: rawContext,
  103. productId,
  104. }: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
  105. const ctx = RequestContext.fromObject(rawContext);
  106. return asyncObservable(async () => {
  107. await this.deleteProductInternal(productId, ctx.channelId);
  108. const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
  109. await this.deleteVariantsInternal(variants.map(v => v.id), ctx.channelId);
  110. return true;
  111. });
  112. }
  113. /**
  114. * Updates the search index only for the affected product.
  115. */
  116. @MessagePattern(AssignProductToChannelMessage.pattern)
  117. assignProductsToChannel({
  118. ctx: rawContext,
  119. productId,
  120. channelId,
  121. }: AssignProductToChannelMessage['data']): Observable<AssignProductToChannelMessage['response']> {
  122. const ctx = RequestContext.fromObject(rawContext);
  123. return asyncObservable(async () => {
  124. await this.updateProductInternal(ctx, productId, channelId);
  125. const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
  126. await this.updateVariantsInternal(ctx, variants.map(v => v.id), channelId);
  127. return true;
  128. });
  129. }
  130. /**
  131. * Updates the search index only for the affected product.
  132. */
  133. @MessagePattern(RemoveProductFromChannelMessage.pattern)
  134. removeProductFromChannel({
  135. ctx: rawContext,
  136. productId,
  137. channelId,
  138. }: RemoveProductFromChannelMessage['data']): Observable<RemoveProductFromChannelMessage['response']> {
  139. const ctx = RequestContext.fromObject(rawContext);
  140. return asyncObservable(async () => {
  141. await this.deleteProductInternal(productId, channelId);
  142. const variants = await this.productVariantService.getVariantsByProductId(ctx, productId);
  143. await this.deleteVariantsInternal(variants.map(v => v.id), channelId);
  144. return true;
  145. });
  146. }
  147. /**
  148. * Updates the search index only for the affected entities.
  149. */
  150. @MessagePattern(UpdateVariantMessage.pattern)
  151. updateVariants({
  152. ctx: rawContext,
  153. variantIds,
  154. }: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
  155. const ctx = RequestContext.fromObject(rawContext);
  156. return asyncObservable(async () => {
  157. return this.asyncQueue.push(async () => {
  158. await this.updateVariantsInternal(ctx, variantIds, ctx.channelId);
  159. return true;
  160. });
  161. });
  162. }
  163. @MessagePattern(DeleteVariantMessage.pattern)
  164. private deleteVaiants({
  165. ctx: rawContext,
  166. variantIds,
  167. }: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
  168. const ctx = RequestContext.fromObject(rawContext);
  169. return asyncObservable(async () => {
  170. await this.deleteVariantsInternal(variantIds, ctx.channelId);
  171. return true;
  172. });
  173. }
  174. @MessagePattern(UpdateVariantsByIdMessage.pattern)
  175. updateVariantsById({
  176. ctx: rawContext,
  177. ids,
  178. }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
  179. const ctx = RequestContext.fromObject(rawContext);
  180. const { batchSize } = this.options;
  181. return asyncObservable(async observer => {
  182. return this.asyncQueue.push(async () => {
  183. const timeStart = Date.now();
  184. if (ids.length) {
  185. const batches = Math.ceil(ids.length / batchSize);
  186. Logger.verbose(`Updating ${ids.length} variants...`, loggerCtx);
  187. let variantsInProduct: ProductVariant[] = [];
  188. for (let i = 0; i < batches; i++) {
  189. const begin = i * batchSize;
  190. const end = begin + batchSize;
  191. const batchIds = ids.slice(begin, end);
  192. const variants = await this.getVariantsByIds(ctx, batchIds);
  193. variantsInProduct = await this.processVariantBatch(
  194. variants,
  195. variantsInProduct,
  196. (operations, variant) => {
  197. operations.push(
  198. { update: { _id: this.getId(variant.id, ctx.channelId) } },
  199. { doc: this.createVariantIndexItem(variant, ctx.channelId) },
  200. );
  201. },
  202. (operations, product, _variants) => {
  203. operations.push(
  204. { update: { _id: this.getId(product.id, ctx.channelId) } },
  205. { doc: this.createProductIndexItem(_variants, ctx.channelId) },
  206. );
  207. },
  208. );
  209. observer.next({
  210. total: ids.length,
  211. completed: Math.min((i + 1) * batchSize, ids.length),
  212. duration: +new Date() - timeStart,
  213. });
  214. }
  215. }
  216. Logger.verbose(`Completed updating variants`, loggerCtx);
  217. return {
  218. total: ids.length,
  219. completed: ids.length,
  220. duration: +new Date() - timeStart,
  221. };
  222. });
  223. });
  224. }
  225. @MessagePattern(ReindexMessage.pattern)
  226. reindex({
  227. ctx: rawContext,
  228. dropIndices,
  229. }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
  230. const ctx = RequestContext.fromObject(rawContext);
  231. const { batchSize } = this.options;
  232. return asyncObservable(async observer => {
  233. return this.asyncQueue.push(async () => {
  234. const timeStart = Date.now();
  235. if (dropIndices) {
  236. await deleteIndices(this.client, this.options.indexPrefix);
  237. await createIndices(this.client, this.options.indexPrefix);
  238. } else {
  239. await deleteByChannel(this.client, this.options.indexPrefix, ctx.channelId);
  240. }
  241. const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
  242. const count = await qb.andWhere('variants__product.deletedAt IS NULL').getCount();
  243. Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
  244. const batches = Math.ceil(count / batchSize);
  245. let variantsInProduct: ProductVariant[] = [];
  246. for (let i = 0; i < batches; i++) {
  247. const variants = await this.getBatch(ctx, qb, i);
  248. Logger.verbose(
  249. `Processing batch ${i + 1} of ${batches}. ProductVariants count: ${variants.length}`,
  250. loggerCtx,
  251. );
  252. variantsInProduct = await this.processVariantBatch(
  253. variants,
  254. variantsInProduct,
  255. (operations, variant) => {
  256. operations.push(
  257. { index: { _id: this.getId(variant.id, ctx.channelId) } },
  258. this.createVariantIndexItem(variant, ctx.channelId),
  259. );
  260. },
  261. (operations, product, _variants) => {
  262. operations.push(
  263. { index: { _id: this.getId(product.id, ctx.channelId) } },
  264. this.createProductIndexItem(_variants, ctx.channelId),
  265. );
  266. },
  267. );
  268. observer.next({
  269. total: count,
  270. completed: Math.min((i + 1) * batchSize, count),
  271. duration: +new Date() - timeStart,
  272. });
  273. }
  274. Logger.verbose(`Completed reindexing!`, loggerCtx);
  275. return {
  276. total: count,
  277. completed: count,
  278. duration: +new Date() - timeStart,
  279. };
  280. });
  281. });
  282. }
  283. private async processVariantBatch(
  284. variants: ProductVariant[],
  285. variantsInProduct: ProductVariant[],
  286. processVariants: (
  287. operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem> | VariantIndexItem>,
  288. variant: ProductVariant,
  289. ) => void,
  290. processProducts: (
  291. operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem> | ProductIndexItem>,
  292. product: Product,
  293. variants: ProductVariant[],
  294. ) => void,
  295. ) {
  296. const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
  297. const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
  298. const productIdsIndexed = new Set<ID>();
  299. // tslint:disable-next-line:prefer-for-of
  300. for (let j = 0; j < variants.length; j++) {
  301. const variant = variants[j];
  302. variantsInProduct.push(variant);
  303. processVariants(variantsToIndex, variant);
  304. const nextVariant = variants[j + 1];
  305. const nextVariantIsNewProduct = nextVariant && nextVariant.productId !== variant.productId;
  306. const thisVariantIsLastAndProductNotAdded =
  307. !nextVariant && !productIdsIndexed.has(variant.productId);
  308. if (nextVariantIsNewProduct || thisVariantIsLastAndProductNotAdded) {
  309. processProducts(productsToIndex, variant.product, variantsInProduct);
  310. variantsInProduct = [];
  311. productIdsIndexed.add(variant.productId);
  312. }
  313. }
  314. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
  315. await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
  316. return variantsInProduct;
  317. }
  318. private async updateVariantsInternal(ctx: RequestContext, variantIds: ID[], channelId: ID) {
  319. let updatedVariants: ProductVariant[] = [];
  320. const productVariants = await this.connection.getRepository(ProductVariant).findByIds(variantIds, {
  321. relations: variantRelations,
  322. });
  323. updatedVariants = this.hydrateVariants(ctx, productVariants);
  324. if (updatedVariants.length) {
  325. // When ProductVariants change, we need to update the corresponding Product index
  326. // since e.g. price changes must be reflected on the Product level too.
  327. const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
  328. for (const variantProductId of productIdsOfVariants) {
  329. await this.updateProductInternal(ctx, variantProductId, channelId);
  330. }
  331. const operations = updatedVariants.reduce(
  332. (ops, variant) => {
  333. return [
  334. ...ops,
  335. { update: { _id: this.getId(variant.id, channelId) } },
  336. { doc: this.createVariantIndexItem(variant, channelId), doc_as_upsert: true },
  337. ];
  338. },
  339. [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
  340. );
  341. Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
  342. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
  343. }
  344. }
  345. private async updateProductInternal(ctx: RequestContext, productId: ID, channelId: ID) {
  346. let updatedProductVariants: ProductVariant[] = [];
  347. const product = await this.connection.getRepository(Product).findOne(productId, {
  348. relations: ['variants'],
  349. });
  350. if (product) {
  351. updatedProductVariants = await this.connection
  352. .getRepository(ProductVariant)
  353. .findByIds(product.variants.map(v => v.id), {
  354. relations: variantRelations,
  355. });
  356. if (product.enabled === false) {
  357. updatedProductVariants.forEach(v => (v.enabled = false));
  358. }
  359. }
  360. if (updatedProductVariants.length) {
  361. Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
  362. updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
  363. const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants, channelId);
  364. const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
  365. { update: { _id: this.getId(updatedProductIndexItem.productId, channelId) } },
  366. { doc: updatedProductIndexItem, doc_as_upsert: true },
  367. ];
  368. await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
  369. }
  370. }
  371. private async deleteProductInternal(productId: ID, channelId: ID) {
  372. Logger.verbose(`Deleting 1 Product (${productId})`, loggerCtx);
  373. const operations: BulkOperation[] = [{ delete: { _id: this.getId(productId, channelId) } }];
  374. await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
  375. }
  376. private async deleteVariantsInternal(variantIds: ID[], channelId: ID) {
  377. Logger.verbose(`Deleting ${variantIds.length} ProductVariants`, loggerCtx);
  378. const operations: BulkOperation[] = variantIds.map(id => ({
  379. delete: { _id: this.getId(id, channelId) },
  380. }));
  381. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
  382. }
  383. private async executeBulkOperations(
  384. indexName: string,
  385. indexType: string,
  386. operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
  387. ) {
  388. try {
  389. const fullIndexName = this.options.indexPrefix + indexName;
  390. const { body }: { body: BulkResponseBody } = await this.client.bulk({
  391. refresh: 'true',
  392. index: fullIndexName,
  393. type: indexType,
  394. body: operations,
  395. });
  396. if (body.errors) {
  397. Logger.error(
  398. `Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`,
  399. loggerCtx,
  400. );
  401. body.items.forEach(item => {
  402. if (item.index) {
  403. Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
  404. }
  405. if (item.update) {
  406. Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
  407. }
  408. if (item.delete) {
  409. Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
  410. }
  411. });
  412. } else {
  413. Logger.verbose(
  414. `Executed ${body.items.length} bulk operations on index [${fullIndexName}]`,
  415. loggerCtx,
  416. );
  417. }
  418. return body;
  419. } catch (e) {
  420. Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
  421. Logger.error('Error details: ' + JSON.stringify(e.body && e.body.error, null, 2), loggerCtx);
  422. }
  423. }
  424. private getSearchIndexQueryBuilder(channelId: ID) {
  425. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  426. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  427. relations: variantRelations,
  428. order: {
  429. productId: 'ASC',
  430. },
  431. });
  432. FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
  433. qb.leftJoin('variants.product', '__product')
  434. .leftJoin('__product.channels', '__channel')
  435. .where('__channel.id = :channelId', { channelId });
  436. return qb;
  437. }
  438. private async getBatch(
  439. ctx: RequestContext,
  440. qb: SelectQueryBuilder<ProductVariant>,
  441. batchNumber: string | number,
  442. ): Promise<ProductVariant[]> {
  443. const { batchSize } = this.options;
  444. const i = Number.parseInt(batchNumber.toString(), 10);
  445. const variants = await qb
  446. .andWhere('variants__product.deletedAt IS NULL')
  447. .take(batchSize)
  448. .skip(i * batchSize)
  449. .getMany();
  450. return this.hydrateVariants(ctx, variants);
  451. }
  452. private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
  453. const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
  454. relations: variantRelations,
  455. });
  456. return this.hydrateVariants(ctx, variants);
  457. }
  458. /**
  459. * Given an array of ProductVariants, this method applies the correct taxes and translations.
  460. */
  461. private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
  462. return variants
  463. .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
  464. .map(v => translateDeep(v, ctx.languageCode, ['product']));
  465. }
  466. private createVariantIndexItem(v: ProductVariant, channelId: ID): VariantIndexItem {
  467. const item: VariantIndexItem = {
  468. channelId,
  469. productVariantId: v.id as string,
  470. sku: v.sku,
  471. slug: v.product.slug,
  472. productId: v.product.id as string,
  473. productName: v.product.name,
  474. productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
  475. productVariantName: v.name,
  476. productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
  477. price: v.price,
  478. priceWithTax: v.priceWithTax,
  479. currencyCode: v.currencyCode,
  480. description: v.product.description,
  481. facetIds: this.getFacetIds([v]),
  482. channelIds: v.product.channels.map(c => c.id as string),
  483. facetValueIds: this.getFacetValueIds([v]),
  484. collectionIds: v.collections.map(c => c.id.toString()),
  485. enabled: v.enabled && v.product.enabled,
  486. };
  487. const customMappings = Object.entries(this.options.customProductVariantMappings);
  488. for (const [name, def] of customMappings) {
  489. item[name] = def.valueFn(v);
  490. }
  491. return item;
  492. }
  493. private createProductIndexItem(variants: ProductVariant[], channelId: ID): ProductIndexItem {
  494. const first = variants[0];
  495. const prices = variants.map(v => v.price);
  496. const pricesWithTax = variants.map(v => v.priceWithTax);
  497. const item: ProductIndexItem = {
  498. channelId,
  499. sku: variants.map(v => v.sku),
  500. slug: variants.map(v => v.product.slug),
  501. productId: first.product.id,
  502. productName: variants.map(v => v.product.name),
  503. productPreview: first.product.featuredAsset ? first.product.featuredAsset.preview : '',
  504. productVariantId: variants.map(v => v.id),
  505. productVariantName: variants.map(v => v.name),
  506. productVariantPreview: variants.filter(v => v.featuredAsset).map(v => v.featuredAsset.preview),
  507. priceMin: Math.min(...prices),
  508. priceMax: Math.max(...prices),
  509. priceWithTaxMin: Math.min(...pricesWithTax),
  510. priceWithTaxMax: Math.max(...pricesWithTax),
  511. currencyCode: first.currencyCode,
  512. description: first.product.description,
  513. facetIds: this.getFacetIds(variants),
  514. facetValueIds: this.getFacetValueIds(variants),
  515. collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
  516. channelIds: first.product.channels.map(c => c.id as string),
  517. enabled: variants.some(v => v.enabled),
  518. };
  519. const customMappings = Object.entries(this.options.customProductMappings);
  520. for (const [name, def] of customMappings) {
  521. item[name] = def.valueFn(variants[0].product, variants);
  522. }
  523. return item;
  524. }
  525. private getFacetIds(variants: ProductVariant[]): string[] {
  526. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  527. const variantFacetIds = variants.reduce(
  528. (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
  529. [] as string[],
  530. );
  531. const productFacetIds = variants[0].product.facetValues.map(facetIds);
  532. return unique([...variantFacetIds, ...productFacetIds]);
  533. }
  534. private getFacetValueIds(variants: ProductVariant[]): string[] {
  535. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  536. const variantFacetValueIds = variants.reduce(
  537. (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
  538. [] as string[],
  539. );
  540. const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
  541. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  542. }
  543. private getId(entityId: ID, channelId: ID): string {
  544. return `${channelId.toString()}__${entityId.toString()}`;
  545. }
  546. }