Browse Source

fix(core): Reduce chance of index err in assigning variants to channels

Michael Bromley 5 years ago
parent
commit
8a1ff82c5d

+ 1 - 10
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -78,6 +78,7 @@ describe('Default search plugin', () => {
     }, TEST_SETUP_TIMEOUT_MS);
 
     afterAll(async () => {
+        await awaitRunningJobs(adminClient);
         await server.destroy();
     });
 
@@ -938,11 +939,6 @@ describe('Default search plugin', () => {
                 });
                 await awaitRunningJobs(adminClient);
 
-                if (process.env.DB === 'postgres') {
-                    // The postgres test is kinda flaky so we stick in a pause for good measure
-                    await new Promise(resolve => setTimeout(resolve, 1000));
-                }
-
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
 
                 const { search: searchGrouped } = await doAdminSearchQuery({ groupByProduct: true });
@@ -969,11 +965,6 @@ describe('Default search plugin', () => {
                 });
                 await awaitRunningJobs(adminClient);
 
-                if (process.env.DB === 'postgres') {
-                    // The postgres test is kinda flaky so we stick in a pause for good measure
-                    await new Promise(resolve => setTimeout(resolve, 1000));
-                }
-
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
 
                 const { search: searchGrouped } = await doAdminSearchQuery({ groupByProduct: true });

+ 16 - 4
packages/core/src/service/services/product-variant.service.ts

@@ -528,12 +528,18 @@ export class ProductVariantService {
                 variant.price * priceFactor,
                 input.channelId,
             );
-            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
         }
-        return this.findByIds(
+        const result = await this.findByIds(
             ctx,
             variants.map(v => v.id),
         );
+        // Publish the events at the latest possible stage to decrease the chance of race conditions
+        // whereby an event listener triggers a query which does not yet have access to the changes
+        // within the current transaction.
+        for (const variant of variants) {
+            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
+        }
+        return result;
     }
 
     async removeProductVariantsFromChannel(
@@ -575,12 +581,18 @@ export class ProductVariantService {
                     input.channelId,
                 ]);
             }
-            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
         }
-        return this.findByIds(
+        const result = await this.findByIds(
             ctx,
             variants.map(v => v.id),
         );
+        // Publish the events at the latest possible stage to decrease the chance of race conditions
+        // whereby an event listener triggers a query which does not yet have access to the changes
+        // within the current transaction.
+        for (const variant of variants) {
+            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
+        }
+        return result;
     }
 
     private async validateVariantOptionIds(ctx: RequestContext, input: CreateProductVariantInput) {

+ 1 - 10
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -131,6 +131,7 @@ describe('Elasticsearch plugin', () => {
     }, TEST_SETUP_TIMEOUT_MS);
 
     afterAll(async () => {
+        await awaitRunningJobs(adminClient);
         await server.destroy();
     });
 
@@ -808,11 +809,6 @@ describe('Elasticsearch plugin', () => {
                 });
                 await awaitRunningJobs(adminClient);
 
-                if (process.env.DB === 'postgres') {
-                    // The postgres test is kinda flaky so we stick in a pause for good measure
-                    await new Promise(resolve => setTimeout(resolve, 1000));
-                }
-
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
 
                 const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
@@ -843,11 +839,6 @@ describe('Elasticsearch plugin', () => {
                 });
                 await awaitRunningJobs(adminClient);
 
-                if (process.env.DB === 'postgres') {
-                    // The postgres test is kinda flaky so we stick in a pause for good measure
-                    await new Promise(resolve => setTimeout(resolve, 1000));
-                }
-
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
 
                 const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {

+ 1 - 1
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -191,7 +191,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         ctx: rawContext,
         productVariantId,
         channelId,
-    }: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
+    }: RemoveVariantFromChannelMessage['data']): Observable<RemoveVariantFromChannelMessage['response']> {
         const ctx = RequestContext.deserialize(rawContext);
         return asyncObservable(async () => {
             const productVariant = await this.connection.getEntityOrThrow(