indexer.controller.ts 19 KB


  1. import { Client } from '@elastic/elasticsearch';
  2. import { Controller, Inject } from '@nestjs/common';
  3. import { MessagePattern } from '@nestjs/microservices';
  4. import { InjectConnection } from '@nestjs/typeorm';
  5. import { unique } from '@vendure/common/lib/unique';
  6. import {
  7. FacetValue,
  8. ID,
  9. JobService,
  10. Logger,
  11. Product,
  12. ProductVariant,
  13. ProductVariantService,
  14. RequestContext,
  15. translateDeep,
  16. } from '@vendure/core';
  17. import { defer, Observable } from 'rxjs';
  18. import { Connection, SelectQueryBuilder } from 'typeorm';
  19. import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
  20. import {
  21. ELASTIC_SEARCH_CLIENT,
  22. ELASTIC_SEARCH_OPTIONS,
  23. loggerCtx,
  24. PRODUCT_INDEX_NAME,
  25. PRODUCT_INDEX_TYPE,
  26. VARIANT_INDEX_NAME,
  27. VARIANT_INDEX_TYPE,
  28. } from './constants';
  29. import { ElasticsearchOptions } from './options';
  30. import {
  31. BulkOperation,
  32. BulkOperationDoc,
  33. BulkResponseBody,
  34. ProductIndexItem,
  35. ReindexMessage,
  36. UpdateProductOrVariantMessage,
  37. UpdateVariantsByIdMessage,
  38. VariantIndexItem,
  39. } from './types';
  40. export const variantRelations = [
  41. 'product',
  42. 'product.featuredAsset',
  43. 'product.facetValues',
  44. 'product.facetValues.facet',
  45. 'featuredAsset',
  46. 'facetValues',
  47. 'facetValues.facet',
  48. 'collections',
  49. 'taxCategory',
  50. ];
  51. export interface ReindexMessageResponse {
  52. total: number;
  53. completed: number;
  54. duration: number;
  55. }
  56. @Controller()
  57. export class ElasticsearchIndexerController {
  58. constructor(
  59. @InjectConnection() private connection: Connection,
  60. @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
  61. @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
  62. private productVariantService: ProductVariantService,
  63. private jobService: JobService,
  64. ) {}
  65. /**
  66. * Updates the search index only for the affected entities.
  67. */
  68. @MessagePattern(UpdateProductOrVariantMessage.pattern)
  69. updateProductOrVariant({
  70. ctx: rawContext,
  71. productId,
  72. variantId,
  73. }: UpdateProductOrVariantMessage['data']): Observable<boolean> {
  74. const ctx = RequestContext.fromObject(rawContext);
  75. return defer(async () => {
  76. if (productId) {
  77. await this.updateProduct(ctx, productId);
  78. } else if (variantId) {
  79. await this.updateProductVariant(ctx, variantId);
  80. }
  81. return true;
  82. });
  83. }
  84. @MessagePattern(UpdateVariantsByIdMessage.pattern)
  85. updateVariantsById({
  86. ctx: rawContext,
  87. ids,
  88. }: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
  89. const ctx = RequestContext.fromObject(rawContext);
  90. const { batchSize } = this.options;
  91. return new Observable(observer => {
  92. (async () => {
  93. const timeStart = Date.now();
  94. if (ids.length) {
  95. const batches = Math.ceil(ids.length / batchSize);
  96. Logger.verbose(`Updating ${ids.length} variants...`);
  97. let variantsInProduct: ProductVariant[] = [];
  98. for (let i = 0; i < batches; i++) {
  99. const begin = i * batchSize;
  100. const end = begin + batchSize;
  101. Logger.verbose(`Updating ids from index ${begin} to ${end}`);
  102. const batchIds = ids.slice(begin, end);
  103. const variants = await this.getVariantsByIds(ctx, batchIds);
  104. const variantsToIndex: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
  105. const productsToIndex: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
  106. // tslint:disable-next-line:prefer-for-of
  107. for (let j = 0; j < variants.length; j++) {
  108. const variant = variants[j];
  109. variantsInProduct.push(variant);
  110. variantsToIndex.push({ update: { _id: variant.id.toString() } });
  111. variantsToIndex.push({ doc: this.createVariantIndexItem(variant) });
  112. const nextVariant = variants[j + 1];
  113. if (nextVariant && nextVariant.productId !== variant.productId) {
  114. productsToIndex.push({ update: { _id: variant.productId.toString() } });
  115. productsToIndex.push({ doc: this.createProductIndexItem(variantsInProduct) });
  116. variantsInProduct = [];
  117. }
  118. }
  119. await this.executeBulkOperations(
  120. VARIANT_INDEX_NAME,
  121. VARIANT_INDEX_TYPE,
  122. variantsToIndex,
  123. );
  124. await this.executeBulkOperations(
  125. PRODUCT_INDEX_NAME,
  126. PRODUCT_INDEX_TYPE,
  127. productsToIndex,
  128. );
  129. observer.next({
  130. total: ids.length,
  131. completed: Math.min((i + 1) * batchSize, ids.length),
  132. duration: +new Date() - timeStart,
  133. });
  134. }
  135. }
  136. Logger.verbose(`Completed reindexing!`);
  137. observer.next({
  138. total: ids.length,
  139. completed: ids.length,
  140. duration: +new Date() - timeStart,
  141. });
  142. observer.complete();
  143. })();
  144. });
  145. }
  146. @MessagePattern(ReindexMessage.pattern)
  147. reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
  148. const ctx = RequestContext.fromObject(rawContext);
  149. const { batchSize } = this.options;
  150. return new Observable(observer => {
  151. (async () => {
  152. const timeStart = Date.now();
  153. const qb = this.getSearchIndexQueryBuilder();
  154. const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
  155. Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);
  156. const batches = Math.ceil(count / batchSize);
  157. let variantsInProduct: ProductVariant[] = [];
  158. for (let i = 0; i < batches; i++) {
  159. Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
  160. const variants = await this.getBatch(ctx, qb, i);
  161. Logger.verbose(`ProductVariants count: ${variants.length}`);
  162. const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
  163. const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
  164. // tslint:disable-next-line:prefer-for-of
  165. for (let j = 0; j < variants.length; j++) {
  166. const variant = variants[j];
  167. variantsInProduct.push(variant);
  168. variantsToIndex.push({ index: { _id: variant.id.toString() } });
  169. variantsToIndex.push(this.createVariantIndexItem(variant));
  170. const nextVariant = variants[j + 1];
  171. if (nextVariant && nextVariant.productId !== variant.productId) {
  172. productsToIndex.push({ index: { _id: variant.productId.toString() } });
  173. productsToIndex.push(this.createProductIndexItem(variantsInProduct) as any);
  174. variantsInProduct = [];
  175. }
  176. }
  177. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
  178. await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
  179. observer.next({
  180. total: count,
  181. completed: Math.min((i + 1) * batchSize, count),
  182. duration: +new Date() - timeStart,
  183. });
  184. }
  185. Logger.verbose(`Completed reindexing!`);
  186. observer.next({
  187. total: count,
  188. completed: count,
  189. duration: +new Date() - timeStart,
  190. });
  191. observer.complete();
  192. })();
  193. });
  194. }
  195. private async updateProductVariant(ctx: RequestContext, variantId: ID) {
  196. let updatedVariants: ProductVariant[] = [];
  197. let removedVariantId: ID | undefined;
  198. const productVariant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
  199. relations: variantRelations,
  200. });
  201. if (productVariant) {
  202. if (productVariant.deletedAt) {
  203. removedVariantId = variantId;
  204. } else {
  205. updatedVariants = this.hydrateVariants(ctx, [productVariant]);
  206. }
  207. }
  208. if (updatedVariants.length) {
  209. // When ProductVariants change, we need to update the corresponding Product index
  210. // since e.g. price changes must be reflected on the Product level too.
  211. const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
  212. for (const variantProductId of productIdsOfVariants) {
  213. await this.updateProduct(ctx, variantProductId);
  214. }
  215. const operations = updatedVariants.reduce(
  216. (ops, variant) => {
  217. return [
  218. ...ops,
  219. { update: { _id: variant.id.toString() } },
  220. { doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
  221. ];
  222. },
  223. [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
  224. );
  225. Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
  226. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
  227. }
  228. if (removedVariantId) {
  229. Logger.verbose(`Deleting 1 ProductVariant (${removedVariantId})`, loggerCtx);
  230. const operations: BulkOperation[] = [{ delete: { _id: removedVariantId.toString() } }];
  231. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
  232. }
  233. }
  234. private async updateProduct(ctx: RequestContext, productId: ID) {
  235. let updatedProductVariants: ProductVariant[] = [];
  236. let removedProductId: ID | undefined;
  237. let removedVariantIds: ID[] = [];
  238. const product = await this.connection.getRepository(Product).findOne(productId, {
  239. relations: ['variants'],
  240. });
  241. if (product) {
  242. if (product.deletedAt) {
  243. removedProductId = productId;
  244. removedVariantIds = product.variants.map(v => v.id);
  245. } else {
  246. updatedProductVariants = await this.connection
  247. .getRepository(ProductVariant)
  248. .findByIds(product.variants.map(v => v.id), {
  249. relations: variantRelations,
  250. });
  251. }
  252. }
  253. if (updatedProductVariants.length) {
  254. Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
  255. updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
  256. const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
  257. const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
  258. { update: { _id: updatedProductIndexItem.productId.toString() } },
  259. { doc: updatedProductIndexItem, doc_as_upsert: true },
  260. ];
  261. await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
  262. }
  263. if (removedVariantIds.length) {
  264. const operations = removedVariantIds.reduce(
  265. (ops, id) => {
  266. Logger.verbose(`Deleting 1 ProductVariant (${id})`, loggerCtx);
  267. return [...ops, { delete: { _id: id.toString() } }];
  268. },
  269. [] as BulkOperation[],
  270. );
  271. await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
  272. }
  273. if (removedProductId) {
  274. Logger.verbose(`Deleting 1 Product (${removedProductId})`, loggerCtx);
  275. const operations: BulkOperation[] = [{ delete: { _id: removedProductId.toString() } }];
  276. await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
  277. }
  278. }
  279. private async executeBulkOperations(
  280. indexName: string,
  281. indexType: string,
  282. operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
  283. ) {
  284. try {
  285. const fullIndexName = this.options.indexPrefix + indexName;
  286. const { body }: { body: BulkResponseBody } = await this.client.bulk({
  287. refresh: 'true',
  288. index: fullIndexName,
  289. type: indexType,
  290. body: operations,
  291. });
  292. if (body.errors) {
  293. Logger.error(
  294. `Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`,
  295. loggerCtx,
  296. );
  297. body.items.forEach(item => {
  298. if (item.index) {
  299. Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
  300. }
  301. if (item.update) {
  302. Logger.debug(JSON.stringify(item.update.error, null, 2), loggerCtx);
  303. }
  304. if (item.delete) {
  305. Logger.debug(JSON.stringify(item.delete.error, null, 2), loggerCtx);
  306. }
  307. });
  308. } else {
  309. Logger.verbose(`Executed ${body.items.length} bulk operations on index [${fullIndexName}]`);
  310. }
  311. return body;
  312. } catch (e) {
  313. Logger.error(`Error when attempting to run bulk operations [${e.toString()}]`, loggerCtx);
  314. Logger.error('Error details: ' + JSON.stringify(e.body.error, null, 2), loggerCtx);
  315. }
  316. }
  317. private getSearchIndexQueryBuilder() {
  318. const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
  319. FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
  320. relations: variantRelations,
  321. order: {
  322. productId: 'ASC',
  323. },
  324. });
  325. FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
  326. return qb;
  327. }
  328. private async getBatch(
  329. ctx: RequestContext,
  330. qb: SelectQueryBuilder<ProductVariant>,
  331. batchNumber: string | number,
  332. ): Promise<ProductVariant[]> {
  333. const { batchSize } = this.options;
  334. const i = Number.parseInt(batchNumber.toString(), 10);
  335. const variants = await qb
  336. .where('variants__product.deletedAt IS NULL')
  337. .take(batchSize)
  338. .skip(i * batchSize)
  339. .getMany();
  340. return this.hydrateVariants(ctx, variants);
  341. }
  342. private async getVariantsByIds(ctx: RequestContext, ids: ID[]) {
  343. const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, {
  344. relations: variantRelations,
  345. });
  346. return this.hydrateVariants(ctx, variants);
  347. }
  348. /**
  349. * Given an array of ProductVariants, this method applies the correct taxes and translations.
  350. */
  351. private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] {
  352. return variants
  353. .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
  354. .map(v => translateDeep(v, ctx.languageCode, ['product']));
  355. }
  356. private createVariantIndexItem(v: ProductVariant): VariantIndexItem {
  357. return {
  358. sku: v.sku,
  359. slug: v.product.slug,
  360. productId: v.product.id as string,
  361. productName: v.product.name,
  362. productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
  363. productVariantId: v.id as string,
  364. productVariantName: v.name,
  365. productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
  366. price: v.price,
  367. priceWithTax: v.priceWithTax,
  368. currencyCode: v.currencyCode,
  369. description: v.product.description,
  370. facetIds: this.getFacetIds([v]),
  371. facetValueIds: this.getFacetValueIds([v]),
  372. collectionIds: v.collections.map(c => c.id.toString()),
  373. enabled: v.enabled && v.product.enabled,
  374. };
  375. }
  376. private createProductIndexItem(variants: ProductVariant[]): ProductIndexItem {
  377. const first = variants[0];
  378. const prices = variants.map(v => v.price);
  379. const pricesWithTax = variants.map(v => v.priceWithTax);
  380. return {
  381. sku: variants.map(v => v.sku),
  382. slug: variants.map(v => v.product.slug),
  383. productId: first.product.id,
  384. productName: variants.map(v => v.product.name),
  385. productPreview: first.product.featuredAsset ? first.product.featuredAsset.preview : '',
  386. productVariantId: variants.map(v => v.id),
  387. productVariantName: variants.map(v => v.name),
  388. productVariantPreview: variants.filter(v => v.featuredAsset).map(v => v.featuredAsset.preview),
  389. priceMin: Math.min(...prices),
  390. priceMax: Math.max(...prices),
  391. priceWithTaxMin: Math.min(...pricesWithTax),
  392. priceWithTaxMax: Math.max(...pricesWithTax),
  393. currencyCode: first.currencyCode,
  394. description: first.product.description,
  395. facetIds: this.getFacetIds(variants),
  396. facetValueIds: this.getFacetValueIds(variants),
  397. collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
  398. enabled: first.product.enabled,
  399. };
  400. }
  401. private getFacetIds(variants: ProductVariant[]): string[] {
  402. const facetIds = (fv: FacetValue) => fv.facet.id.toString();
  403. const variantFacetIds = variants.reduce(
  404. (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
  405. [] as string[],
  406. );
  407. const productFacetIds = variants[0].product.facetValues.map(facetIds);
  408. return unique([...variantFacetIds, ...productFacetIds]);
  409. }
  410. private getFacetValueIds(variants: ProductVariant[]): string[] {
  411. const facetValueIds = (fv: FacetValue) => fv.id.toString();
  412. const variantFacetValueIds = variants.reduce(
  413. (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
  414. [] as string[],
  415. );
  416. const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
  417. return unique([...variantFacetValueIds, ...productFacetValueIds]);
  418. }
  419. }