Pārlūkot izejas kodu

feat(elasticsearch-plugin): Add health check

Relates to #289
Michael Bromley 5 gadi atpakaļ
vecāks
revīzija
47a8cb99db

+ 36 - 0
packages/elasticsearch-plugin/src/elasticsearch.health.ts

@@ -0,0 +1,36 @@
+import { Injectable } from '@nestjs/common';
+import { HealthCheckError, HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
+
+import { ElasticsearchService } from './elasticsearch.service';
+
+@Injectable()
+export class ElasticsearchHealthIndicator extends HealthIndicator {
+    constructor(private elasticsearchService: ElasticsearchService) {
+        super();
+    }
+
+    async isHealthy(): Promise<HealthIndicatorResult> {
+        let isHealthy = false;
+        let error = '';
+        try {
+            await this.elasticsearchService.checkConnection();
+            isHealthy = true;
+        } catch (e) {
+            error = e.message;
+        }
+        const result = this.getStatus('elasticsearch', isHealthy, { message: error });
+        if (isHealthy) {
+            return result;
+        }
+        this.throwHealthCheckError(result);
+    }
+
+    startupCheckFailed(message: string): never {
+        const result = this.getStatus('elasticsearch', false, { message });
+        return this.throwHealthCheckError(result);
+    }
+
+    private throwHealthCheckError(result: HealthIndicatorResult): never {
+        throw new HealthCheckError('Elasticsearch not available', result);
+    }
+}

+ 20 - 9
packages/elasticsearch-plugin/src/plugin.ts

@@ -3,6 +3,7 @@ import {
     CollectionModificationEvent,
     CollectionModificationEvent,
     DeepRequired,
     DeepRequired,
     EventBus,
     EventBus,
+    HealthCheckRegistryService,
     ID,
     ID,
     idsAreEqual,
     idsAreEqual,
     Logger,
     Logger,
@@ -21,6 +22,7 @@ import { ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
 import { CustomMappingsResolver } from './custom-mappings.resolver';
 import { CustomMappingsResolver } from './custom-mappings.resolver';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
 import { ElasticsearchIndexService } from './elasticsearch-index.service';
 import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
 import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
+import { ElasticsearchHealthIndicator } from './elasticsearch.health';
 import { ElasticsearchService } from './elasticsearch.service';
 import { ElasticsearchService } from './elasticsearch.service';
 import { generateSchemaExtensions } from './graphql-schema-extensions';
 import { generateSchemaExtensions } from './graphql-schema-extensions';
 import { ElasticsearchIndexerController } from './indexer.controller';
 import { ElasticsearchIndexerController } from './indexer.controller';
@@ -192,6 +194,7 @@ import { ElasticsearchOptions, mergeWithDefaults } from './options';
     providers: [
     providers: [
         ElasticsearchIndexService,
         ElasticsearchIndexService,
         ElasticsearchService,
         ElasticsearchService,
+        ElasticsearchHealthIndicator,
         { provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
         { provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
     ],
     ],
     adminApiExtensions: { resolvers: [AdminElasticSearchResolver] },
     adminApiExtensions: { resolvers: [AdminElasticSearchResolver] },
@@ -217,6 +220,8 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
         private eventBus: EventBus,
         private eventBus: EventBus,
         private elasticsearchService: ElasticsearchService,
         private elasticsearchService: ElasticsearchService,
         private elasticsearchIndexService: ElasticsearchIndexService,
         private elasticsearchIndexService: ElasticsearchIndexService,
+        private elasticsearchHealthIndicator: ElasticsearchHealthIndicator,
+        private healthCheckRegistryService: HealthCheckRegistryService,
     ) {}
     ) {}
 
 
     /**
     /**
@@ -235,28 +240,34 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
         } catch (e) {
         } catch (e) {
             Logger.error(`Could not connect to Elasticsearch instance at "${host}:${port}"`, loggerCtx);
             Logger.error(`Could not connect to Elasticsearch instance at "${host}:${port}"`, loggerCtx);
             Logger.error(JSON.stringify(e), loggerCtx);
             Logger.error(JSON.stringify(e), loggerCtx);
+            this.healthCheckRegistryService.registerIndicatorFunction(() =>
+                this.elasticsearchHealthIndicator.startupCheckFailed(e.message),
+            );
             return;
             return;
         }
         }
         Logger.info(`Sucessfully connected to Elasticsearch instance at "${host}:${port}"`, loggerCtx);
         Logger.info(`Sucessfully connected to Elasticsearch instance at "${host}:${port}"`, loggerCtx);
 
 
         await this.elasticsearchService.createIndicesIfNotExists();
         await this.elasticsearchService.createIndicesIfNotExists();
         this.elasticsearchIndexService.initJobQueue();
         this.elasticsearchIndexService.initJobQueue();
+        this.healthCheckRegistryService.registerIndicatorFunction(() =>
+            this.elasticsearchHealthIndicator.isHealthy(),
+        );
 
 
-        this.eventBus.ofType(ProductEvent).subscribe((event) => {
+        this.eventBus.ofType(ProductEvent).subscribe(event => {
             if (event.type === 'deleted') {
             if (event.type === 'deleted') {
                 return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product);
                 return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product);
             } else {
             } else {
                 return this.elasticsearchIndexService.updateProduct(event.ctx, event.product);
                 return this.elasticsearchIndexService.updateProduct(event.ctx, event.product);
             }
             }
         });
         });
-        this.eventBus.ofType(ProductVariantEvent).subscribe((event) => {
+        this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
             if (event.type === 'deleted') {
             if (event.type === 'deleted') {
                 return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants);
                 return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants);
             } else {
             } else {
                 return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants);
                 return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants);
             }
             }
         });
         });
-        this.eventBus.ofType(AssetEvent).subscribe((event) => {
+        this.eventBus.ofType(AssetEvent).subscribe(event => {
             if (event.type === 'updated') {
             if (event.type === 'updated') {
                 return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset);
                 return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset);
             }
             }
@@ -265,7 +276,7 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
             }
             }
         });
         });
 
 
-        this.eventBus.ofType(ProductChannelEvent).subscribe((event) => {
+        this.eventBus.ofType(ProductChannelEvent).subscribe(event => {
             if (event.type === 'assigned') {
             if (event.type === 'assigned') {
                 return this.elasticsearchIndexService.assignProductToChannel(
                 return this.elasticsearchIndexService.assignProductToChannel(
                     event.ctx,
                     event.ctx,
@@ -286,18 +297,18 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
         collectionModification$
         collectionModification$
             .pipe(
             .pipe(
                 buffer(closingNotifier$),
                 buffer(closingNotifier$),
-                filter((events) => 0 < events.length),
-                map((events) => ({
+                filter(events => 0 < events.length),
+                map(events => ({
                     ctx: events[0].ctx,
                     ctx: events[0].ctx,
                     ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
                     ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
                 })),
                 })),
-                filter((e) => 0 < e.ids.length),
+                filter(e => 0 < e.ids.length),
             )
             )
-            .subscribe((events) => {
+            .subscribe(events => {
                 return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids);
                 return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids);
             });
             });
 
 
-        this.eventBus.ofType(TaxRateModificationEvent).subscribe((event) => {
+        this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             const defaultTaxZone = event.ctx.channel.defaultTaxZone;
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
             if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
                 return this.elasticsearchService.updateAll(event.ctx);
                 return this.elasticsearchService.updateAll(event.ctx);