Browse Source

feat(core): Add channel handling to DefaultSearchPlugin

BREAKING CHANGE: The `SearchIndexItem` entity used by the `DefaultSearchPlugin` has a couple of new fields related to Channel handling. Once the schema is updated (either by synchronizing or running a migration), the search index should be rebuilt.
Michael Bromley 6 years ago
parent
commit
280a38bf91

+ 14 - 6
packages/core/src/plugin/default-search-plugin/default-search-plugin.ts

@@ -3,11 +3,10 @@ import { ID } from '@vendure/common/lib/shared-types';
 import { buffer, debounceTime, filter, map } from 'rxjs/operators';
 
 import { idsAreEqual } from '../../common/utils';
-import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
-import { Product } from '../../entity/product/product.entity';
 import { EventBus } from '../../event-bus/event-bus';
-import { CatalogModificationEvent } from '../../event-bus/events/catalog-modification-event';
 import { CollectionModificationEvent } from '../../event-bus/events/collection-modification-event';
+import { ProductEvent } from '../../event-bus/events/product-event';
+import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
 import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
 import { PluginCommonModule } from '../plugin-common.module';
 import { OnVendureBootstrap, VendurePlugin } from '../vendure-plugin';
@@ -66,9 +65,18 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {
 
     /** @internal */
     async onVendureBootstrap() {
-        this.eventBus.ofType(CatalogModificationEvent).subscribe(event => {
-            if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
-                return this.searchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
+        this.eventBus.ofType(ProductEvent).subscribe(event => {
+            if (event.type === 'deleted') {
+                return this.searchIndexService.deleteProduct(event.ctx, event.product).start();
+            } else {
+                return this.searchIndexService.updateProduct(event.ctx, event.product).start();
+            }
+        });
+        this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
+            if (event.type === 'deleted') {
+                return this.searchIndexService.deleteVariant(event.ctx, event.variants).start();
+            } else {
+                return this.searchIndexService.updateVariants(event.ctx, event.variants).start();
             }
         });
 

+ 88 - 47
packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts

@@ -18,7 +18,14 @@ import { ProductVariantService } from '../../../service/services/product-variant
 import { TaxRateService } from '../../../service/services/tax-rate.service';
 import { AsyncQueue } from '../async-queue';
 import { SearchIndexItem } from '../search-index-item.entity';
-import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from '../types';
+import {
+    DeleteProductMessage,
+    DeleteVariantMessage,
+    ReindexMessage,
+    UpdateProductMessage,
+    UpdateVariantMessage,
+    UpdateVariantsByIdMessage,
+} from '../types';
 
 export const BATCH_SIZE = 1000;
 export const variantRelations = [
@@ -26,6 +33,7 @@ export const variantRelations = [
     'product.featuredAsset',
     'product.facetValues',
     'product.facetValues.facet',
+    'product.channels',
     'featuredAsset',
     'facetValues',
     'facetValues.facet',
@@ -51,9 +59,12 @@ export class IndexerController {
         return new Observable(observer => {
             (async () => {
                 const timeStart = Date.now();
-                const qb = this.getSearchIndexQueryBuilder();
-                const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
-                Logger.verbose(`Reindexing ${count} variants`, workerLoggerCtx);
+                const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
+                const count = await qb.getCount();
+                Logger.verbose(
+                    `Reindexing ${count} variants for channel ${ctx.channel.code}`,
+                    workerLoggerCtx,
+                );
                 const batches = Math.ceil(count / BATCH_SIZE);
 
                 // Ensure tax rates are up-to-date.
@@ -61,14 +72,14 @@ export class IndexerController {
 
                 await this.connection
                     .getRepository(SearchIndexItem)
-                    .delete({ languageCode: ctx.languageCode });
+                    .delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
                 Logger.verbose('Deleted existing index items', workerLoggerCtx);
 
                 for (let i = 0; i < batches; i++) {
                     Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
 
                     const variants = await qb
-                        .where('variants__product.deletedAt IS NULL')
+                        .andWhere('variants__product.deletedAt IS NULL')
                         .take(BATCH_SIZE)
                         .skip(i * BATCH_SIZE)
                         .getMany();
@@ -135,60 +146,87 @@ export class IndexerController {
         });
     }
 
-    /**
-     * Updates the search index only for the affected entities.
-     */
-    @MessagePattern(UpdateProductOrVariantMessage.pattern)
-    updateProductOrVariant(data: UpdateProductOrVariantMessage['data']): Observable<boolean> {
+    @MessagePattern(UpdateProductMessage.pattern)
+    updateProduct(data: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
         const ctx = RequestContext.fromObject(data.ctx);
-        const { productId, variantId } = data;
-        let updatedVariants: ProductVariant[] = [];
-        let removedVariantIds: ID[] = [];
         return defer(async () => {
-            if (data.productId) {
-                const product = await this.connection.getRepository(Product).findOne(productId, {
-                    relations: ['variants'],
-                });
-                if (product) {
-                    if (product.deletedAt) {
-                        removedVariantIds = product.variants.map(v => v.id);
-                    } else {
-                        updatedVariants = await this.connection
-                            .getRepository(ProductVariant)
-                            .findByIds(product.variants.map(v => v.id), {
-                                relations: variantRelations,
-                            });
-                        if (product.enabled === false) {
-                            updatedVariants.forEach(v => (v.enabled = false));
-                        }
-                    }
+            const product = await this.connection.getRepository(Product).findOne(data.productId, {
+                relations: ['variants'],
+            });
+            if (product) {
+                let updatedVariants = await this.connection
+                    .getRepository(ProductVariant)
+                    .findByIds(product.variants.map(v => v.id), {
+                        relations: variantRelations,
+                    });
+                if (product.enabled === false) {
+                    updatedVariants.forEach(v => (v.enabled = false));
                 }
-            } else {
-                const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
-                    relations: variantRelations,
-                });
-                if (variant) {
-                    updatedVariants = [variant];
+                Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
+                updatedVariants = this.hydrateVariants(ctx, updatedVariants);
+                if (updatedVariants.length) {
+                    await this.saveVariants(ctx, updatedVariants);
                 }
             }
-            Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
-            updatedVariants = this.hydrateVariants(ctx, updatedVariants);
-            if (updatedVariants.length) {
+            return true;
+        });
+    }
+
+    @MessagePattern(UpdateVariantMessage.pattern)
+    updateVariants(data: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
+        const ctx = RequestContext.fromObject(data.ctx);
+        return defer(async () => {
+            const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds, {
+                relations: variantRelations,
+            });
+            if (variants) {
+                const updatedVariants = this.hydrateVariants(ctx, variants);
+                Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
                 await this.saveVariants(ctx, updatedVariants);
             }
-            if (removedVariantIds.length) {
-                await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
+            return true;
+        });
+    }
+
+    @MessagePattern(DeleteProductMessage.pattern)
+    deleteProduct(data: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
+        const ctx = RequestContext.fromObject(data.ctx);
+        return defer(async () => {
+            const product = await this.connection.getRepository(Product).findOne(data.productId, {
+                relations: ['variants'],
+            });
+            if (product && product.deletedAt) {
+                const removedVariantIds = product.variants.map(v => v.id);
+                if (removedVariantIds.length) {
+                    await this.removeSearchIndexItems(ctx, removedVariantIds);
+                }
+            }
+            return true;
+        });
+    }
+
+    @MessagePattern(DeleteVariantMessage.pattern)
+    deleteVariant(data: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
+        const ctx = RequestContext.fromObject(data.ctx);
+        return defer(async () => {
+            const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
+            if (variants.length) {
+                await this.removeSearchIndexItems(ctx, variants.map(v => v.id));
             }
             return true;
         });
     }
 
-    private getSearchIndexQueryBuilder() {
+    private getSearchIndexQueryBuilder(channelId: ID) {
         const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
         FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
             relations: variantRelations,
         });
         FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
+        qb.leftJoin('variants.product', 'product')
+            .leftJoin('product.channels', 'channel')
+            .where('channel.id = :channelId', { channelId })
+            .andWhere('variants__product.deletedAt IS NULL');
         return qb;
     }
 
@@ -205,19 +243,21 @@ export class IndexerController {
         const items = variants.map(
             (v: ProductVariant) =>
                 new SearchIndexItem({
+                    productVariantId: v.id,
+                    channelId: ctx.channelId,
+                    languageCode: ctx.languageCode,
                     sku: v.sku,
                     enabled: v.enabled,
                     slug: v.product.slug,
                     price: v.price,
                     priceWithTax: v.priceWithTax,
-                    languageCode: ctx.languageCode,
-                    productVariantId: v.id,
                     productId: v.product.id,
                     productName: v.product.name,
                     description: v.product.description,
                     productVariantName: v.name,
                     productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
                     productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
+                    channelIds: v.product.channels.map(c => c.id as string),
                     facetIds: this.getFacetIds(v),
                     facetValueIds: this.getFacetValueIds(v),
                     collectionIds: v.collections.map(c => c.id.toString()),
@@ -243,10 +283,11 @@ export class IndexerController {
     /**
      * Remove items from the search index
      */
-    private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
+    private async removeSearchIndexItems(ctx: RequestContext, variantIds: ID[]) {
         const compositeKeys = variantIds.map(id => ({
             productVariantId: id,
-            languageCode,
+            channelId: ctx.channelId,
+            languageCode: ctx.languageCode,
         })) as any[];
         await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
     }

+ 55 - 24
packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts

@@ -7,11 +7,15 @@ import { ProductVariant } from '../../../entity/product-variant/product-variant.
 import { Product } from '../../../entity/product/product.entity';
 import { Job } from '../../../service/helpers/job-manager/job';
 import { JobReporter, JobService } from '../../../service/services/job.service';
+import { WorkerMessage } from '../../../worker/types';
 import { WorkerService } from '../../../worker/worker.service';
 import {
+    DeleteProductMessage,
+    DeleteVariantMessage,
     ReindexMessage,
     ReindexMessageResponse,
-    UpdateProductOrVariantMessage,
+    UpdateProductMessage,
+    UpdateVariantMessage,
     UpdateVariantsByIdMessage,
 } from '../types';
 
@@ -33,29 +37,37 @@ export class SearchIndexService {
         });
     }
 
-    /**
-     * Updates the search index only for the affected entities.
-     */
-    updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
-        return this.jobService.createJob({
-            name: 'update-index',
-            metadata: {
-                entity: updatedEntity.constructor.name,
-                id: updatedEntity.id,
-            },
-            work: reporter => {
-                const data =
-                    updatedEntity instanceof Product
-                        ? { ctx, productId: updatedEntity.id }
-                        : { ctx, variantId: updatedEntity.id };
-                this.workerService.send(new UpdateProductOrVariantMessage(data)).subscribe({
-                    complete: () => reporter.complete(true),
-                    error: err => {
-                        Logger.error(err);
-                        reporter.complete(false);
-                    },
-                });
-            },
+    updateProduct(ctx: RequestContext, product: Product) {
+        const data = { ctx, productId: product.id };
+        return this.createShortWorkerJob(new UpdateProductMessage(data), {
+            entity: 'Product',
+            id: product.id,
+        });
+    }
+
+    updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
+        const variantIds = variants.map(v => v.id);
+        const data = { ctx, variantIds };
+        return this.createShortWorkerJob(new UpdateVariantMessage(data), {
+            entity: 'ProductVariant',
+            ids: variantIds,
+        });
+    }
+
+    deleteProduct(ctx: RequestContext, product: Product) {
+        const data = { ctx, productId: product.id };
+        return this.createShortWorkerJob(new DeleteProductMessage(data), {
+            entity: 'Product',
+            id: product.id,
+        });
+    }
+
+    deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
+        const variantIds = variants.map(v => v.id);
+        const data = { ctx, variantIds };
+        return this.createShortWorkerJob(new DeleteVariantMessage(data), {
+            entity: 'ProductVariant',
+            id: variantIds,
         });
     }
 
@@ -74,6 +86,25 @@ export class SearchIndexService {
         });
     }
 
+    /**
+     * Creates a short-running job that does not expect progress updates.
+     */
+    private createShortWorkerJob<T extends WorkerMessage<any, any>>(message: T, metadata: any) {
+        return this.jobService.createJob({
+            name: 'update-index',
+            metadata,
+            work: reporter => {
+                this.workerService.send(message).subscribe({
+                    complete: () => reporter.complete(true),
+                    error: err => {
+                        Logger.error(err);
+                        reporter.complete(false);
+                    },
+                });
+            },
+        });
+    }
+
     private createObserver(reporter: JobReporter) {
         let total: number | undefined;
         let duration = 0;

+ 6 - 0
packages/core/src/plugin/default-search-plugin/search-index-item.entity.ts

@@ -20,6 +20,9 @@ export class SearchIndexItem {
     @PrimaryColumn('varchar')
     languageCode: LanguageCode;
 
+    @EntityId({ primary: true })
+    channelId: ID;
+
     @EntityId()
     productId: ID;
 
@@ -61,6 +64,9 @@ export class SearchIndexItem {
     @Column('simple-array')
     collectionIds: string[];
 
+    @Column('simple-array')
+    channelIds: string[];
+
     @Column()
     productPreview: string;
 

+ 16 - 4
packages/core/src/plugin/default-search-plugin/search-strategy/mysql-search-strategy.ts

@@ -17,14 +17,18 @@ export class MysqlSearchStrategy implements SearchStrategy {
 
     constructor(private connection: Connection) {}
 
-    async getFacetValueIds(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<Map<ID, number>> {
+    async getFacetValueIds(
+        ctx: RequestContext,
+        input: SearchInput,
+        enabledOnly: boolean,
+    ): Promise<Map<ID, number>> {
         const facetValuesQb = this.connection
             .getRepository(SearchIndexItem)
             .createQueryBuilder('si')
             .select(['productId', 'productVariantId'])
             .addSelect('GROUP_CONCAT(facetValueIds)', 'facetValues');
 
-        this.applyTermAndFilters(facetValuesQb, input);
+        this.applyTermAndFilters(ctx, facetValuesQb, input);
         if (!input.groupByProduct) {
             facetValuesQb.groupBy('productVariantId');
         }
@@ -35,7 +39,11 @@ export class MysqlSearchStrategy implements SearchStrategy {
         return createFacetIdCountMap(facetValuesResult);
     }
 
-    async getSearchResults(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<SearchResult[]> {
+    async getSearchResults(
+        ctx: RequestContext,
+        input: SearchInput,
+        enabledOnly: boolean,
+    ): Promise<SearchResult[]> {
         const take = input.take || 25;
         const skip = input.skip || 0;
         const sort = input.sort;
@@ -46,7 +54,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
                 .addSelect('MIN(priceWithTax)', 'minPriceWithTax')
                 .addSelect('MAX(priceWithTax)', 'maxPriceWithTax');
         }
-        this.applyTermAndFilters(qb, input);
+        this.applyTermAndFilters(ctx, qb, input);
         if (input.term && input.term.length > this.minTermLength) {
             qb.orderBy('score', 'DESC');
         }
@@ -71,6 +79,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
 
     async getTotalCount(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<number> {
         const innerQb = this.applyTermAndFilters(
+            ctx,
             this.connection.getRepository(SearchIndexItem).createQueryBuilder('si'),
             input,
         );
@@ -87,6 +96,7 @@ export class MysqlSearchStrategy implements SearchStrategy {
     }
 
     private applyTermAndFilters(
+        ctx: RequestContext,
         qb: SelectQueryBuilder<SearchIndexItem>,
         input: SearchInput,
     ): SelectQueryBuilder<SearchIndexItem> {
@@ -122,6 +132,8 @@ export class MysqlSearchStrategy implements SearchStrategy {
         if (collectionId) {
             qb.andWhere(`FIND_IN_SET (:collectionId, collectionIds)`, { collectionId });
         }
+        qb.andWhere('languageCode = :languageCode', { languageCode: ctx.languageCode });
+        qb.andWhere('channelId = :channelId', { channelId: ctx.channelId });
         if (input.groupByProduct === true) {
             qb.groupBy('productId');
         }

+ 18 - 6
packages/core/src/plugin/default-search-plugin/search-strategy/postgres-search-strategy.ts

@@ -17,14 +17,18 @@ export class PostgresSearchStrategy implements SearchStrategy {
 
     constructor(private connection: Connection) {}
 
-    async getFacetValueIds(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<Map<ID, number>> {
+    async getFacetValueIds(
+        ctx: RequestContext,
+        input: SearchInput,
+        enabledOnly: boolean,
+    ): Promise<Map<ID, number>> {
         const facetValuesQb = this.connection
             .getRepository(SearchIndexItem)
             .createQueryBuilder('si')
             .select(['"si"."productId"', 'MAX("si"."productVariantId")'])
             .addSelect(`string_agg("si"."facetValueIds",',')`, 'facetValues');
 
-        this.applyTermAndFilters(facetValuesQb, input, true);
+        this.applyTermAndFilters(ctx, facetValuesQb, input, true);
         if (!input.groupByProduct) {
             facetValuesQb.groupBy('"si"."productVariantId", "si"."productId"');
         }
@@ -35,7 +39,11 @@ export class PostgresSearchStrategy implements SearchStrategy {
         return createFacetIdCountMap(facetValuesResult);
     }
 
-    async getSearchResults(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<SearchResult[]> {
+    async getSearchResults(
+        ctx: RequestContext,
+        input: SearchInput,
+        enabledOnly: boolean,
+    ): Promise<SearchResult[]> {
         const take = input.take || 25;
         const skip = input.skip || 0;
         const sort = input.sort;
@@ -49,7 +57,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
                 .addSelect('MIN("priceWithTax")', 'minPriceWithTax')
                 .addSelect('MAX("priceWithTax")', 'maxPriceWithTax');
         }
-        this.applyTermAndFilters(qb, input);
+        this.applyTermAndFilters(ctx, qb, input);
         if (input.term && input.term.length > this.minTermLength) {
             qb.orderBy('score', 'DESC');
         }
@@ -75,6 +83,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
 
     async getTotalCount(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<number> {
         const innerQb = this.applyTermAndFilters(
+            ctx,
             this.connection
                 .getRepository(SearchIndexItem)
                 .createQueryBuilder('si')
@@ -93,6 +102,7 @@ export class PostgresSearchStrategy implements SearchStrategy {
     }
 
     private applyTermAndFilters(
+        ctx: RequestContext,
         qb: SelectQueryBuilder<SearchIndexItem>,
         input: SearchInput,
         forceGroup: boolean = false,
@@ -110,8 +120,8 @@ export class PostgresSearchStrategy implements SearchStrategy {
                     (ts_rank_cd(to_tsvector(${minIfGrouped('si.sku')}), to_tsquery(:term)) * 10 +
                     ts_rank_cd(to_tsvector(${minIfGrouped('si.productName')}), to_tsquery(:term)) * 2 +
                     ts_rank_cd(to_tsvector(${minIfGrouped(
-                    'si.productVariantName',
-                )}), to_tsquery(:term)) * 1.5 +
+                        'si.productVariantName',
+                    )}), to_tsquery(:term)) * 1.5 +
                     ts_rank_cd(to_tsvector(${minIfGrouped('si.description')}), to_tsquery(:term)) * 1)
                             `,
                 'score',
@@ -137,6 +147,8 @@ export class PostgresSearchStrategy implements SearchStrategy {
         if (collectionId) {
             qb.andWhere(`:collectionId = ANY (string_to_array(si.collectionIds, ','))`, { collectionId });
         }
+        qb.andWhere('languageCode = :languageCode', { languageCode: ctx.languageCode });
+        qb.andWhere('channelId = :channelId', { channelId: ctx.channelId });
         if (input.groupByProduct === true) {
             qb.groupBy('si.productId');
         }

+ 1 - 0
packages/core/src/plugin/default-search-plugin/search-strategy/search-strategy-utils.ts

@@ -29,6 +29,7 @@ export function mapToSearchResult(raw: any, currencyCode: CurrencyCode): SearchR
         facetIds: raw.si_facetIds.split(',').map((x: string) => x.trim()),
         facetValueIds: raw.si_facetValueIds.split(',').map((x: string) => x.trim()),
         collectionIds: raw.si_collectionIds.split(',').map((x: string) => x.trim()),
+        channelIds: raw.si_channelIds.split(',').map((x: string) => x.trim()),
         productPreview: raw.si_productPreview,
         productVariantPreview: raw.si_productVariantPreview,
         score: raw.score || 0,

+ 6 - 2
packages/core/src/plugin/default-search-plugin/search-strategy/sqlite-search-strategy.ts

@@ -29,7 +29,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
             .select(['productId', 'productVariantId'])
             .addSelect('GROUP_CONCAT(si.facetValueIds)', 'facetValues');
 
-        this.applyTermAndFilters(facetValuesQb, input);
+        this.applyTermAndFilters(ctx, facetValuesQb, input);
         if (!input.groupByProduct) {
             facetValuesQb.groupBy('productVariantId');
         }
@@ -56,7 +56,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
                 'maxPriceWithTax',
             );
         }
-        this.applyTermAndFilters(qb, input);
+        this.applyTermAndFilters(ctx, qb, input);
         if (input.term && input.term.length > this.minTermLength) {
             qb.orderBy('score', 'DESC');
         }
@@ -83,6 +83,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
 
     async getTotalCount(ctx: RequestContext, input: SearchInput, enabledOnly: boolean): Promise<number> {
         const innerQb = this.applyTermAndFilters(
+            ctx,
             this.connection.getRepository(SearchIndexItem).createQueryBuilder('si'),
             input,
         );
@@ -100,6 +101,7 @@ export class SqliteSearchStrategy implements SearchStrategy {
     }
 
     private applyTermAndFilters(
+        ctx: RequestContext,
         qb: SelectQueryBuilder<SearchIndexItem>,
         input: SearchInput,
     ): SelectQueryBuilder<SearchIndexItem> {
@@ -140,6 +142,8 @@ export class SqliteSearchStrategy implements SearchStrategy {
                 collectionId: `%,${collectionId},%`,
             });
         }
+        qb.andWhere('languageCode = :languageCode', { languageCode: ctx.languageCode });
+        qb.andWhere('channelId = :channelId', { channelId: ctx.channelId });
         if (input.groupByProduct === true) {
             qb.groupBy('productId');
         }

+ 18 - 5
packages/core/src/plugin/default-search-plugin/types.ts

@@ -9,10 +9,14 @@ export interface ReindexMessageResponse {
     duration: number;
 }
 
-export type UpdateProductOrVariantMessageData = {
+export type UpdateProductMessageData = {
     ctx: RequestContext;
-    productId?: ID;
-    variantId?: ID;
+    productId: ID;
+};
+
+export type UpdateVariantMessageData = {
+    ctx: RequestContext;
+    variantIds: ID[];
 };
 
 export interface UpdateVariantsByIdMessageData {
@@ -23,8 +27,17 @@ export interface UpdateVariantsByIdMessageData {
 export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> {
     static readonly pattern = 'Reindex';
 }
-export class UpdateProductOrVariantMessage extends WorkerMessage<UpdateProductOrVariantMessageData, boolean> {
-    static readonly pattern = 'UpdateProductOrVariant';
+export class UpdateVariantMessage extends WorkerMessage<UpdateVariantMessageData, boolean> {
+    static readonly pattern = 'UpdateProduct';
+}
+export class UpdateProductMessage extends WorkerMessage<UpdateProductMessageData, boolean> {
+    static readonly pattern = 'UpdateVariant';
+}
+export class DeleteVariantMessage extends WorkerMessage<UpdateVariantMessageData, boolean> {
+    static readonly pattern = 'DeleteProduct';
+}
+export class DeleteProductMessage extends WorkerMessage<UpdateProductMessageData, boolean> {
+    static readonly pattern = 'DeleteVariant';
 }
 export class UpdateVariantsByIdMessage extends WorkerMessage<
     UpdateVariantsByIdMessageData,