From 5d784a485dbc574add33afacac1e3fbecfa226d8 Mon Sep 17 00:00:00 2001 From: Himaja Dhanyamraju Date: Mon, 11 Sep 2023 13:01:21 +0530 Subject: [PATCH 1/3] MOSIP-29320 --- consolidator/consolidator_service.bal | 66 +++++++++++++++++++ consolidator/init_consolidator.bal | 7 ++ .../modules/config/configurations.bal | 11 ++++ .../modules/health_check/health_check.bal | 10 +++ .../kafka_health_check/kafka_health_check.bal | 20 ++++++ 5 files changed, 114 insertions(+) create mode 100644 consolidator/modules/health_check/health_check.bal create mode 100644 consolidator/modules/kafka_health_check/kafka_health_check.bal diff --git a/consolidator/consolidator_service.bal b/consolidator/consolidator_service.bal index 147fa75..68219fa 100644 --- a/consolidator/consolidator_service.bal +++ b/consolidator/consolidator_service.bal @@ -22,6 +22,72 @@ import consolidatorService.config; import consolidatorService.util; import consolidatorService.connections as conn; import consolidatorService.persistence as persist; +import ballerina/http; +import consolidatorService.health_check as healthcheck; +import consolidatorService.kafka_health_check as kafkahealthcheck; +import ballerina/jballerina.java; + +http:Service healthCheckService = service object { + + resource function get .() returns http:Ok|http:ServiceUnavailable { + //diskspace + string diskSpaceStatus = "DOWN"; + handle handleStr = java:fromString(config:CURRENT_WORKING_DIR); + handle fileObj = newFile(java:fromString(getCurrent(handleStr).toString())); + int usableSpace = getUsableSpace(fileObj); + int totalSpace = getTotalSpace(fileObj); + int threshold = config:DISK_SPACE_THRESHOLD; + healthcheck:DiskSpaceMetaData diskSpaceMetaData = {free: usableSpace, total: totalSpace, threshold: threshold}; + if (usableSpace >= threshold) { + diskSpaceStatus = "UP"; + } + healthcheck:HealthCheckResp diskSpace = {status: diskSpaceStatus, details: {diskSpaceMetaData}}; + + //kafka + string kafkaStatus = "DOWN"; + handle|error? producerResult = kafkahealthcheck:describeTopicKafka(config:CONSOLIDATED_WEBSUB_SUBSCRIBERS_TOPIC); + if (producerResult is handle) { + kafkaStatus = "UP"; + } + healthcheck:HealthCheckResp kafkaHealth = {status: kafkaStatus, details: {}}; + //add to main map + map details = { + "diskSpace": diskSpace, + "kafka": kafkaHealth + }; + + string resultStatus = "DOWN"; + if(diskSpaceStatus == "UP" && kafkaStatus == "UP"){ + resultStatus = "UP"; + healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}}; + http:Ok res = {body: healthCheckResp}; + return res; + } + + //main object + healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}}; + http:ServiceUnavailable res = {body: healthCheckResp}; + return res; + } +}; + +function newFile(handle c) returns handle = @java:Constructor { + 'class: "java.io.File", + paramTypes: ["java.lang.String"] +} external; + +function getCurrent(handle prop) returns handle = @java:Method { + name: "getProperty", + 'class: "java.lang.System" +} external; + +function getUsableSpace(handle fileObj) returns int = @java:Method { + 'class: "java.io.File" +} external; + +isolated function getTotalSpace(handle fileObj) returns int = @java:Method { + 'class: "java.io.File" +} external; isolated function startConsolidator() returns error? { do { diff --git a/consolidator/init_consolidator.bal b/consolidator/init_consolidator.bal index 573974f..dee88db 100644 --- a/consolidator/init_consolidator.bal +++ b/consolidator/init_consolidator.bal @@ -22,6 +22,8 @@ import consolidatorService.config; import consolidatorService.inittopic as initt; import consolidatorService.util; import consolidatorService.connections as conn; +import ballerina/http; +import ballerina/lang.runtime; isolated map registeredTopicsCache = {}; isolated map subscribersCache = {}; @@ -31,6 +33,11 @@ public function main() returns error? { if result is error { return result; } + // Start the Hub + http:Listener httpListener = check new (config:CONSOLIDATOR_PORT); + check httpListener.attach(healthCheckService, config:CONSOLIDATOR_HEALTH_ENDPOINT); + check httpListener.'start(); + runtime:registerListener(httpListener); // Initialize consolidator-service state check syncRegsisteredTopicsCache(); _ = check conn:consolidatedTopicsConsumer->close(config:GRACEFUL_CLOSE_PERIOD); diff --git a/consolidator/modules/config/configurations.bal b/consolidator/modules/config/configurations.bal index 57c4c68..467e0df 100644 --- a/consolidator/modules/config/configurations.bal +++ b/consolidator/modules/config/configurations.bal @@ -44,3 +44,14 @@ public configurable decimal POLLING_INTERVAL = 10; public configurable decimal GRACEFUL_CLOSE_PERIOD = 5; public final string CONSTRUCTED_CONSUMER_ID = util:generateRandomString(); + +public final string CURRENT_WORKING_DIR = "user.dir"; + +# The disk space threshold for healthcheck +public configurable int DISK_SPACE_THRESHOLD = 10485760; + +# The port that is used to start the consolidator +public configurable int CONSOLIDATOR_PORT = 9192; + +# consolidator health endpoint +public configurable string CONSOLIDATOR_HEALTH_ENDPOINT = "/consolidator/actuator/health"; \ No newline at end of file diff --git a/consolidator/modules/health_check/health_check.bal b/consolidator/modules/health_check/health_check.bal new file mode 100644 index 0000000..c708e59 --- /dev/null +++ b/consolidator/modules/health_check/health_check.bal @@ -0,0 +1,10 @@ +public type HealthCheckResp record {| + string status; + map details; +|}; + +public type DiskSpaceMetaData record {| + int total; + int free; + int threshold; +|}; \ No newline at end of file diff --git a/consolidator/modules/kafka_health_check/kafka_health_check.bal b/consolidator/modules/kafka_health_check/kafka_health_check.bal new file mode 100644 index 0000000..8a5009c --- /dev/null +++ b/consolidator/modules/kafka_health_check/kafka_health_check.bal @@ -0,0 +1,20 @@ +import ballerina/jballerina.java; +import consolidatorService.config; + +public function describeTopicKafka(string topic) returns handle|error? { + handle bootStrapServer = java:fromString(config:KAFKA_BOOTSTRAP_NODE); + handle newMosipKafkaAdminClientResult = newMosipKafkaAdminClient(bootStrapServer); + handle|error? result = trap describeTopic(newMosipKafkaAdminClientResult, java:fromString(topic)); + return result; +} + +function newMosipKafkaAdminClient(handle bootstrapServers) returns handle = @java:Constructor { + 'class: "io.mosip.kafkaadminclient.MosipKafkaAdminClient", + paramTypes: ["java.lang.String"] +} external; + +function describeTopic(handle adminClinetObject, handle topic) returns handle|error? = @java:Method { + name: "describeTopic", + 'class: "io.mosip.kafkaadminclient.MosipKafkaAdminClient", + paramTypes: ["java.lang.String"] +} external; \ No newline at end of file From 5cd45e086a37e09b8b18eee5ca71dc661fb5ed0b Mon Sep 17 00:00:00 2001 From: Himaja Dhanyamraju Date: Thu, 14 Sep 2023 11:50:15 +0530 Subject: [PATCH 2/3] MOSIP-29320: Added consolidator healthcheck --- hub/hub_service.bal | 27 +++++++++++++++++---------- hub/modules/config/configurations.bal | 6 ++++++ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/hub/hub_service.bal b/hub/hub_service.bal index b0b0df2..b27c7a0 100644 --- a/hub/hub_service.bal +++ b/hub/hub_service.bal @@ -42,21 +42,28 @@ http:Service healthCheckService = service object { } healthcheck:HealthCheckResp diskSpace = {status: diskSpaceStatus, details: {diskSpaceMetaData}}; - //kafka - string kafkaStatus = "DOWN"; - kafka:TopicPartition[]|kafka:Error producerResult = conn:statePersistProducer->getTopicPartitions(config:REGISTERED_WEBSUB_TOPICS_TOPIC); - if (producerResult is kafka:TopicPartition[]) { - kafkaStatus = "UP"; + //consolidator + string consolidatorStatus = "DOWN"; + http:Client|http:ClientError clientEndpoint = new (config:CONSOLIDATOR_BASE_URL); + if(clientEndpoint is http:ClientError){ + log:printError(clientEndpoint.message()); + }else{ + healthcheck:HealthCheckResp|error consolidatorHealth = clientEndpoint -> get(config:CONSOLIDATOR_HEALTH_ENDPOINT); + if(consolidatorHealth is healthcheck:HealthCheckResp){ + consolidatorStatus = consolidatorHealth.status; } - healthcheck:HealthCheckResp kafkaHealth = {status: kafkaStatus, details: {}}; + } + healthcheck:HealthCheckResp consolidatorSHealth = {status: consolidatorStatus, details: {}}; //add to main map map details = { "diskSpace": diskSpace, - "kafka": kafkaHealth + "consolidator": consolidatorSHealth }; - - //main object - healthcheck:HealthCheckResp healthCheckResp = {status: "UP", details: {details}}; + string resultStatus = "DOWN"; + if(diskSpaceStatus == "UP" && consolidatorStatus == "UP"){ + resultStatus = "UP"; + } + healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}}; return healthCheckResp; } }; diff --git a/hub/modules/config/configurations.bal b/hub/modules/config/configurations.bal index 01e2e22..083a3d4 100644 --- a/hub/modules/config/configurations.bal +++ b/hub/modules/config/configurations.bal @@ -94,3 +94,9 @@ public configurable int KAFKA_CONSUMER_FETCH_MAX_BYTES = 3145728; # The maximum retry count public configurable int KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES = 524288; + +# consolidator base url +public configurable string CONSOLIDATOR_BASE_URL = "http://websub-consolidator"; + +# consolidator health endpoint +public configurable string CONSOLIDATOR_HEALTH_ENDPOINT = "/consolidator/actuator/health"; \ No newline at end of file From c014036466ae60ed33d75d83e2657eeb5e02a2c4 Mon Sep 17 00:00:00 2001 From: Himaja Dhanyamraju Date: Fri, 22 Sep 2023 15:07:27 +0530 Subject: [PATCH 3/3] MOSIP-29320: Response type changed for healthcheck --- hub/hub_service.bal | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hub/hub_service.bal b/hub/hub_service.bal index b27c7a0..d8eb445 100644 --- a/hub/hub_service.bal +++ b/hub/hub_service.bal @@ -28,7 +28,7 @@ import ballerinax/kafka; http:Service healthCheckService = service object { - resource function get .() returns healthcheck:HealthCheckResp { + resource function get .() returns http:Ok|http:ServiceUnavailable { //diskspace string diskSpaceStatus = "DOWN"; handle handleStr = java:fromString(config:CURRENT_WORKING_DIR); @@ -62,9 +62,13 @@ http:Service healthCheckService = service object { string resultStatus = "DOWN"; if(diskSpaceStatus == "UP" && consolidatorStatus == "UP"){ resultStatus = "UP"; + healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}}; + http:Ok res = {body: healthCheckResp}; + return res; } healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}}; - return healthCheckResp; + http:ServiceUnavailable res = {body: healthCheckResp}; + return res; } };