|
|
@@ -3,7 +3,17 @@ import { Controller, Inject } from '@nestjs/common';
|
|
|
import { MessagePattern } from '@nestjs/microservices';
|
|
|
import { InjectConnection } from '@nestjs/typeorm';
|
|
|
import { unique } from '@vendure/common/lib/unique';
|
|
|
-import { FacetValue, ID, JobService, Logger, Product, ProductVariant, ProductVariantService, RequestContext, translateDeep } from '@vendure/core';
|
|
|
+import {
|
|
|
+ FacetValue,
|
|
|
+ ID,
|
|
|
+ JobService,
|
|
|
+ Logger,
|
|
|
+ Product,
|
|
|
+ ProductVariant,
|
|
|
+ ProductVariantService,
|
|
|
+ RequestContext,
|
|
|
+ translateDeep,
|
|
|
+} from '@vendure/core';
|
|
|
import { defer, Observable } from 'rxjs';
|
|
|
import { Connection, SelectQueryBuilder } from 'typeorm';
|
|
|
import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';
|
|
|
@@ -18,8 +28,14 @@ import {
|
|
|
VARIANT_INDEX_NAME,
|
|
|
VARIANT_INDEX_TYPE,
|
|
|
} from './constants';
|
|
|
-import { ElasticsearchOptions } from './plugin';
|
|
|
-import { BulkOperation, BulkOperationDoc, BulkResponseBody, ProductIndexItem, VariantIndexItem } from './types';
|
|
|
+import { ElasticsearchOptions } from './options';
|
|
|
+import {
|
|
|
+ BulkOperation,
|
|
|
+ BulkOperationDoc,
|
|
|
+ BulkResponseBody,
|
|
|
+ ProductIndexItem,
|
|
|
+ VariantIndexItem,
|
|
|
+} from './types';
|
|
|
|
|
|
export const variantRelations = [
|
|
|
'product',
|
|
|
@@ -41,18 +57,27 @@ export interface ReindexMessageResponse {
|
|
|
|
|
|
@Controller()
|
|
|
export class ElasticsearchIndexerController {
|
|
|
-
|
|
|
- constructor(@InjectConnection() private connection: Connection,
|
|
|
- @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
|
|
|
- @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
|
|
|
- private productVariantService: ProductVariantService,
|
|
|
- private jobService: JobService) {}
|
|
|
+ constructor(
|
|
|
+ @InjectConnection() private connection: Connection,
|
|
|
+ @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
|
|
|
+ @Inject(ELASTIC_SEARCH_CLIENT) private client: Client,
|
|
|
+ private productVariantService: ProductVariantService,
|
|
|
+ private jobService: JobService,
|
|
|
+ ) {}
|
|
|
|
|
|
/**
|
|
|
* Updates the search index only for the affected entities.
|
|
|
*/
|
|
|
@MessagePattern(Message.UpdateProductOrVariant)
|
|
|
- updateProductOrVariant({ ctx: rawContext, productId, variantId }: { ctx: any, productId?: ID, variantId?: ID }): Observable<boolean> {
|
|
|
+ updateProductOrVariant({
|
|
|
+ ctx: rawContext,
|
|
|
+ productId,
|
|
|
+ variantId,
|
|
|
+ }: {
|
|
|
+ ctx: any;
|
|
|
+ productId?: ID;
|
|
|
+ variantId?: ID;
|
|
|
+ }): Observable<boolean> {
|
|
|
const ctx = RequestContext.fromObject(rawContext);
|
|
|
let updatedProductVariants: ProductVariant[] = [];
|
|
|
let removedProducts: Product[] = [];
|
|
|
@@ -97,22 +122,25 @@ export class ElasticsearchIndexerController {
|
|
|
await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
|
|
|
}
|
|
|
if (updatedVariants.length) {
|
|
|
- const operations = updatedVariants.reduce((ops, variant) => {
|
|
|
- return [
|
|
|
- ...ops,
|
|
|
- { update: { _id: variant.id.toString() } },
|
|
|
- { doc: this.createVariantIndexItem(variant) },
|
|
|
- ];
|
|
|
- }, [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>);
|
|
|
+ const operations = updatedVariants.reduce(
|
|
|
+ (ops, variant) => {
|
|
|
+ return [
|
|
|
+ ...ops,
|
|
|
+ { update: { _id: variant.id.toString() } },
|
|
|
+ { doc: this.createVariantIndexItem(variant) },
|
|
|
+ ];
|
|
|
+ },
|
|
|
+ [] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
|
|
|
+ );
|
|
|
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
}
|
|
|
if (removedVariantIds.length) {
|
|
|
- const operations = removedVariantIds.reduce((ops, id) => {
|
|
|
- return [
|
|
|
- ...ops,
|
|
|
- { delete: { _id: id.toString() } },
|
|
|
- ];
|
|
|
- }, [] as BulkOperation[]);
|
|
|
+ const operations = removedVariantIds.reduce(
|
|
|
+ (ops, id) => {
|
|
|
+ return [...ops, { delete: { _id: id.toString() } }];
|
|
|
+ },
|
|
|
+ [] as BulkOperation[],
|
|
|
+ );
|
|
|
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
|
|
|
}
|
|
|
|
|
|
@@ -121,7 +149,13 @@ export class ElasticsearchIndexerController {
|
|
|
}
|
|
|
|
|
|
@MessagePattern(Message.UpdateVariantsById)
|
|
|
- updateVariantsById({ ctx: rawContext, ids }: { ctx: any, ids: ID[] }): Observable<ReindexMessageResponse> {
|
|
|
+ updateVariantsById({
|
|
|
+ ctx: rawContext,
|
|
|
+ ids,
|
|
|
+ }: {
|
|
|
+ ctx: any;
|
|
|
+ ids: ID[];
|
|
|
+ }): Observable<ReindexMessageResponse> {
|
|
|
const ctx = RequestContext.fromObject(rawContext);
|
|
|
const { batchSize } = this.options;
|
|
|
|
|
|
@@ -160,8 +194,16 @@ export class ElasticsearchIndexerController {
|
|
|
variantsInProduct = [];
|
|
|
}
|
|
|
}
|
|
|
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
|
|
|
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
|
|
|
+ await this.executeBulkOperations(
|
|
|
+ VARIANT_INDEX_NAME,
|
|
|
+ VARIANT_INDEX_TYPE,
|
|
|
+ variantsToIndex,
|
|
|
+ );
|
|
|
+ await this.executeBulkOperations(
|
|
|
+ PRODUCT_INDEX_NAME,
|
|
|
+ PRODUCT_INDEX_TYPE,
|
|
|
+ productsToIndex,
|
|
|
+ );
|
|
|
observer.next({
|
|
|
total: ids.length,
|
|
|
completed: Math.min((i + 1) * batchSize, ids.length),
|
|
|
@@ -186,63 +228,64 @@ export class ElasticsearchIndexerController {
|
|
|
const { batchSize } = this.options;
|
|
|
|
|
|
return new Observable(observer => {
|
|
|
- (async () => {
|
|
|
- const timeStart = Date.now();
|
|
|
- const qb = this.getSearchIndexQueryBuilder();
|
|
|
- const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
|
|
|
- Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
|
|
|
-
|
|
|
- const batches = Math.ceil(count / batchSize);
|
|
|
- let variantsInProduct: ProductVariant[] = [];
|
|
|
-
|
|
|
- for (let i = 0; i < batches; i++) {
|
|
|
- Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
|
|
|
-
|
|
|
- const variants = await this.getBatch(ctx, qb, i);
|
|
|
- Logger.verbose(`variants count: ${variants.length}`);
|
|
|
-
|
|
|
- const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
|
|
|
- const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
|
|
|
-
|
|
|
- // tslint:disable-next-line:prefer-for-of
|
|
|
- for (let j = 0; j < variants.length; j++) {
|
|
|
- const variant = variants[j];
|
|
|
- variantsInProduct.push(variant);
|
|
|
- variantsToIndex.push({index: {_id: variant.id.toString()}});
|
|
|
- variantsToIndex.push(this.createVariantIndexItem(variant));
|
|
|
-
|
|
|
- const nextVariant = variants[j + 1];
|
|
|
- if (nextVariant && nextVariant.productId !== variant.productId) {
|
|
|
- productsToIndex.push({index: {_id: variant.productId.toString()}});
|
|
|
- productsToIndex.push(this.createProductIndexItem(variantsInProduct) as any);
|
|
|
- variantsInProduct = [];
|
|
|
- }
|
|
|
+ (async () => {
|
|
|
+ const timeStart = Date.now();
|
|
|
+ const qb = this.getSearchIndexQueryBuilder();
|
|
|
+ const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
|
|
|
+ Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
|
|
|
+
|
|
|
+ const batches = Math.ceil(count / batchSize);
|
|
|
+ let variantsInProduct: ProductVariant[] = [];
|
|
|
+
|
|
|
+ for (let i = 0; i < batches; i++) {
|
|
|
+ Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);
|
|
|
+
|
|
|
+ const variants = await this.getBatch(ctx, qb, i);
|
|
|
+ Logger.verbose(`variants count: ${variants.length}`);
|
|
|
+
|
|
|
+ const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
|
|
|
+ const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
|
|
|
+
|
|
|
+ // tslint:disable-next-line:prefer-for-of
|
|
|
+ for (let j = 0; j < variants.length; j++) {
|
|
|
+ const variant = variants[j];
|
|
|
+ variantsInProduct.push(variant);
|
|
|
+ variantsToIndex.push({ index: { _id: variant.id.toString() } });
|
|
|
+ variantsToIndex.push(this.createVariantIndexItem(variant));
|
|
|
+
|
|
|
+ const nextVariant = variants[j + 1];
|
|
|
+ if (nextVariant && nextVariant.productId !== variant.productId) {
|
|
|
+ productsToIndex.push({ index: { _id: variant.productId.toString() } });
|
|
|
+ productsToIndex.push(this.createProductIndexItem(variantsInProduct) as any);
|
|
|
+ variantsInProduct = [];
|
|
|
}
|
|
|
- await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
|
|
|
- await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
|
|
|
- observer.next({
|
|
|
- total: count,
|
|
|
- completed: Math.min((i + 1) * batchSize, count),
|
|
|
- duration: +new Date() - timeStart,
|
|
|
- });
|
|
|
}
|
|
|
- Logger.verbose(`Completed reindexing!`);
|
|
|
+ await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, variantsToIndex);
|
|
|
+ await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, productsToIndex);
|
|
|
observer.next({
|
|
|
total: count,
|
|
|
- completed: count,
|
|
|
+ completed: Math.min((i + 1) * batchSize, count),
|
|
|
duration: +new Date() - timeStart,
|
|
|
});
|
|
|
- observer.complete();
|
|
|
- })();
|
|
|
- },
|
|
|
- );
|
|
|
+ }
|
|
|
+ Logger.verbose(`Completed reindexing!`);
|
|
|
+ observer.next({
|
|
|
+ total: count,
|
|
|
+ completed: count,
|
|
|
+ duration: +new Date() - timeStart,
|
|
|
+ });
|
|
|
+ observer.complete();
|
|
|
+ })();
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- private async executeBulkOperations(indexName: string,
|
|
|
- indexType: string,
|
|
|
- operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>) {
|
|
|
+ private async executeBulkOperations(
|
|
|
+ indexName: string,
|
|
|
+ indexType: string,
|
|
|
+ operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
|
|
|
+ ) {
|
|
|
try {
|
|
|
- const {body}: { body: BulkResponseBody; } = await this.client.bulk({
|
|
|
+ const { body }: { body: BulkResponseBody } = await this.client.bulk({
|
|
|
refresh: 'true',
|
|
|
index: this.options.indexPrefix + indexName,
|
|
|
type: indexType,
|
|
|
@@ -250,7 +293,10 @@ export class ElasticsearchIndexerController {
|
|
|
});
|
|
|
|
|
|
if (body.errors) {
|
|
|
- Logger.error(`Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`, loggerCtx);
|
|
|
+ Logger.error(
|
|
|
+ `Some errors occurred running bulk operations on ${indexType}! Set logger to "debug" to print all errors.`,
|
|
|
+ loggerCtx,
|
|
|
+ );
|
|
|
body.items.forEach(item => {
|
|
|
if (item.index) {
|
|
|
Logger.debug(JSON.stringify(item.index.error, null, 2), loggerCtx);
|
|
|
@@ -284,7 +330,11 @@ export class ElasticsearchIndexerController {
|
|
|
return qb;
|
|
|
}
|
|
|
|
|
|
- private async getBatch(ctx: RequestContext, qb: SelectQueryBuilder<ProductVariant>, batchNumber: string | number): Promise<ProductVariant[]> {
|
|
|
+ private async getBatch(
|
|
|
+ ctx: RequestContext,
|
|
|
+ qb: SelectQueryBuilder<ProductVariant>,
|
|
|
+ batchNumber: string | number,
|
|
|
+ ): Promise<ProductVariant[]> {
|
|
|
const { batchSize } = this.options;
|
|
|
const i = Number.parseInt(batchNumber.toString(), 10);
|
|
|
const variants = await qb
|
|
|
@@ -353,21 +403,27 @@ export class ElasticsearchIndexerController {
|
|
|
description: first.product.description,
|
|
|
facetIds: this.getFacetIds(variants),
|
|
|
facetValueIds: this.getFacetValueIds(variants),
|
|
|
- collectionIds: variants.reduce((ids, v) => [ ...ids, ...v.collections.map(c => c.id)], [] as ID[]),
|
|
|
+ collectionIds: variants.reduce((ids, v) => [...ids, ...v.collections.map(c => c.id)], [] as ID[]),
|
|
|
enabled: first.product.enabled,
|
|
|
};
|
|
|
}
|
|
|
|
|
|
private getFacetIds(variants: ProductVariant[]): string[] {
|
|
|
const facetIds = (fv: FacetValue) => fv.facet.id.toString();
|
|
|
- const variantFacetIds = variants.reduce((ids, v) => [ ...ids, ...v.facetValues.map(facetIds)], [] as string[]);
|
|
|
+ const variantFacetIds = variants.reduce(
|
|
|
+ (ids, v) => [...ids, ...v.facetValues.map(facetIds)],
|
|
|
+ [] as string[],
|
|
|
+ );
|
|
|
const productFacetIds = variants[0].product.facetValues.map(facetIds);
|
|
|
return unique([...variantFacetIds, ...productFacetIds]);
|
|
|
}
|
|
|
|
|
|
private getFacetValueIds(variants: ProductVariant[]): string[] {
|
|
|
const facetValueIds = (fv: FacetValue) => fv.id.toString();
|
|
|
- const variantFacetValueIds = variants.reduce((ids, v) => [ ...ids, ...v.facetValues.map(facetValueIds)], [] as string[]);
|
|
|
+ const variantFacetValueIds = variants.reduce(
|
|
|
+ (ids, v) => [...ids, ...v.facetValues.map(facetValueIds)],
|
|
|
+ [] as string[],
|
|
|
+ );
|
|
|
const productFacetValueIds = variants[0].product.facetValues.map(facetValueIds);
|
|
|
return unique([...variantFacetValueIds, ...productFacetValueIds]);
|
|
|
}
|