Browse Source

fix(core): Reduce chance of index err in assigning variants to channels

Michael Bromley 5 years ago
parent
commit
58e3f7b6f1

+ 1 - 2
packages/core/e2e/default-search-plugin.e2e-spec.ts

@@ -78,6 +78,7 @@ describe('Default search plugin', () => {
     }, TEST_SETUP_TIMEOUT_MS);
 
     afterAll(async () => {
+        await awaitRunningJobs(adminClient);
         await server.destroy();
     });
 
@@ -937,8 +938,6 @@ describe('Default search plugin', () => {
                     input: { channelId: secondChannel.id, productVariantIds: ['T_10', 'T_15'] },
                 });
                 await awaitRunningJobs(adminClient);
-                // The postgres test is kinda flaky so we stick in a pause for good measure
-                await new Promise(resolve => setTimeout(resolve, 500));
 
                 adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
 

+ 16 - 4
packages/core/src/service/services/product-variant.service.ts

@@ -497,12 +497,18 @@ export class ProductVariantService {
                 variant.price * priceFactor,
                 input.channelId,
             );
-            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
         }
-        return this.findByIds(
+        const result = await this.findByIds(
             ctx,
             variants.map(v => v.id),
         );
+        // Publish the events at the latest possible stage to decrease the chance of race conditions
+        // whereby an event listener triggers a query which does not yet have access to the changes
+        // within the current transaction.
+        for (const variant of variants) {
+            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
+        }
+        return result;
     }
 
     async removeProductVariantsFromChannel(
@@ -544,12 +550,18 @@ export class ProductVariantService {
                     input.channelId,
                 ]);
             }
-            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
         }
-        return this.findByIds(
+        const result = await this.findByIds(
             ctx,
             variants.map(v => v.id),
         );
+        // Publish the events at the latest possible stage to decrease the chance of race conditions
+        // whereby an event listener triggers a query which does not yet have access to the changes
+        // within the current transaction.
+        for (const variant of variants) {
+            this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
+        }
+        return result;
     }
 
     private async validateVariantOptionIds(ctx: RequestContext, input: CreateProductVariantInput) {

+ 1 - 0
packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts

@@ -131,6 +131,7 @@ describe('Elasticsearch plugin', () => {
     }, TEST_SETUP_TIMEOUT_MS);
 
     afterAll(async () => {
+        await awaitRunningJobs(adminClient);
         await server.destroy();
     });
 

+ 1 - 1
packages/elasticsearch-plugin/src/indexer.controller.ts

@@ -191,7 +191,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
         ctx: rawContext,
         productVariantId,
         channelId,
-    }: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
+    }: RemoveVariantFromChannelMessage['data']): Observable<RemoveVariantFromChannelMessage['response']> {
         const ctx = RequestContext.deserialize(rawContext);
         return asyncObservable(async () => {
             const productVariant = await this.connection.getEntityOrThrow(