Skip to content

Commit

Permalink
Merge pull request #218 from HimajaDhanyamraju2/1.1.5.3
Browse files Browse the repository at this point in the history
MOSIP-29320: Added health check to consolidator
  • Loading branch information
vishwa-vyom authored Sep 27, 2023
2 parents 66dcf37 + c014036 commit d817a3c
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 12 deletions.
66 changes: 66 additions & 0 deletions consolidator/consolidator_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,72 @@ import consolidatorService.config;
import consolidatorService.util;
import consolidatorService.connections as conn;
import consolidatorService.persistence as persist;
import ballerina/http;
import consolidatorService.health_check as healthcheck;
import consolidatorService.kafka_health_check as kafkahealthcheck;
import ballerina/jballerina.java;

http:Service healthCheckService = service object {

resource function get .() returns http:Ok|http:ServiceUnavailable {
//diskspace
string diskSpaceStatus = "DOWN";
handle handleStr = java:fromString(config:CURRENT_WORKING_DIR);
handle fileObj = newFile(java:fromString(getCurrent(handleStr).toString()));
int usableSpace = getUsableSpace(fileObj);
int totalSpace = getTotalSpace(fileObj);
int threshold = config:DISK_SPACE_THRESHOLD;
healthcheck:DiskSpaceMetaData diskSpaceMetaData = {free: usableSpace, total: totalSpace, threshold: threshold};
if (usableSpace >= threshold) {
diskSpaceStatus = "UP";
}
healthcheck:HealthCheckResp diskSpace = {status: diskSpaceStatus, details: {diskSpaceMetaData}};

//kafka
string kafkaStatus = "DOWN";
handle|error? producerResult = kafkahealthcheck:describeTopicKafka(config:CONSOLIDATED_WEBSUB_SUBSCRIBERS_TOPIC);
if (producerResult is handle) {
kafkaStatus = "UP";
}
healthcheck:HealthCheckResp kafkaHealth = {status: kafkaStatus, details: {}};
//add to main map
map<healthcheck:HealthCheckResp> details = {
"diskSpace": diskSpace,
"kafka": kafkaHealth
};

string resultStatus = "DOWN";
if(diskSpaceStatus == "UP" && kafkaStatus == "UP"){
resultStatus = "UP";
healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}};
http:Ok res = {body: healthCheckResp};
return res;
}

//main object
healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}};
http:ServiceUnavailable res = {body: healthCheckResp};
return res;
}
};

function newFile(handle c) returns handle = @java:Constructor {
'class: "java.io.File",
paramTypes: ["java.lang.String"]
} external;

function getCurrent(handle prop) returns handle = @java:Method {
name: "getProperty",
'class: "java.lang.System"
} external;

function getUsableSpace(handle fileObj) returns int = @java:Method {
'class: "java.io.File"
} external;

isolated function getTotalSpace(handle fileObj) returns int = @java:Method {
'class: "java.io.File"
} external;

isolated function startConsolidator() returns error? {
do {
Expand Down
7 changes: 7 additions & 0 deletions consolidator/init_consolidator.bal
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import consolidatorService.config;
import consolidatorService.inittopic as initt;
import consolidatorService.util;
import consolidatorService.connections as conn;
import ballerina/http;
import ballerina/lang.runtime;

isolated map<websubhub:TopicRegistration> registeredTopicsCache = {};
isolated map<websubhub:VerifiedSubscription> subscribersCache = {};
Expand All @@ -31,6 +33,11 @@ public function main() returns error? {
if result is error {
return result;
}
// Start the Hub
http:Listener httpListener = check new (config:CONSOLIDATOR_PORT);
check httpListener.attach(healthCheckService, config:CONSOLIDATOR_HEALTH_ENDPOINT);
check httpListener.'start();
runtime:registerListener(httpListener);
// Initialize consolidator-service state
check syncRegsisteredTopicsCache();
_ = check conn:consolidatedTopicsConsumer->close(config:GRACEFUL_CLOSE_PERIOD);
Expand Down
11 changes: 11 additions & 0 deletions consolidator/modules/config/configurations.bal
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,14 @@ public configurable decimal POLLING_INTERVAL = 10;
public configurable decimal GRACEFUL_CLOSE_PERIOD = 5;

public final string CONSTRUCTED_CONSUMER_ID = util:generateRandomString();

public final string CURRENT_WORKING_DIR = "user.dir";

# The disk space threshold for healthcheck
public configurable int DISK_SPACE_THRESHOLD = 10485760;

# The port that is used to start the consolidator
public configurable int CONSOLIDATOR_PORT = 9192;

# consolidator health endpoint
public configurable string CONSOLIDATOR_HEALTH_ENDPOINT = "/consolidator/actuator/health";
10 changes: 10 additions & 0 deletions consolidator/modules/health_check/health_check.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
public type HealthCheckResp record {|
string status;
map<anydata> details;
|};

public type DiskSpaceMetaData record {|
int total;
int free;
int threshold;
|};
20 changes: 20 additions & 0 deletions consolidator/modules/kafka_health_check/kafka_health_check.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import ballerina/jballerina.java;
import consolidatorService.config;

public function describeTopicKafka(string topic) returns handle|error? {
handle bootStrapServer = java:fromString(config:KAFKA_BOOTSTRAP_NODE);
handle newMosipKafkaAdminClientResult = newMosipKafkaAdminClient(bootStrapServer);
handle|error? result = trap describeTopic(newMosipKafkaAdminClientResult, java:fromString(topic));
return result;
}

function newMosipKafkaAdminClient(handle bootstrapServers) returns handle = @java:Constructor {
'class: "io.mosip.kafkaadminclient.MosipKafkaAdminClient",
paramTypes: ["java.lang.String"]
} external;

function describeTopic(handle adminClinetObject, handle topic) returns handle|error? = @java:Method {
name: "describeTopic",
'class: "io.mosip.kafkaadminclient.MosipKafkaAdminClient",
paramTypes: ["java.lang.String"]
} external;
35 changes: 23 additions & 12 deletions hub/hub_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import ballerinax/kafka;

http:Service healthCheckService = service object {

resource function get .() returns healthcheck:HealthCheckResp {
resource function get .() returns http:Ok|http:ServiceUnavailable {
//diskspace
string diskSpaceStatus = "DOWN";
handle handleStr = java:fromString(config:CURRENT_WORKING_DIR);
Expand All @@ -42,22 +42,33 @@ http:Service healthCheckService = service object {
}
healthcheck:HealthCheckResp diskSpace = {status: diskSpaceStatus, details: {diskSpaceMetaData}};

//kafka
string kafkaStatus = "DOWN";
kafka:TopicPartition[]|kafka:Error producerResult = conn:statePersistProducer->getTopicPartitions(config:REGISTERED_WEBSUB_TOPICS_TOPIC);
if (producerResult is kafka:TopicPartition[]) {
kafkaStatus = "UP";
//consolidator
string consolidatorStatus = "DOWN";
http:Client|http:ClientError clientEndpoint = new (config:CONSOLIDATOR_BASE_URL);
if(clientEndpoint is http:ClientError){
log:printError(clientEndpoint.message());
}else{
healthcheck:HealthCheckResp|error consolidatorHealth = clientEndpoint -> get(config:CONSOLIDATOR_HEALTH_ENDPOINT);
if(consolidatorHealth is healthcheck:HealthCheckResp){
consolidatorStatus = consolidatorHealth.status;
}
healthcheck:HealthCheckResp kafkaHealth = {status: kafkaStatus, details: {}};
}
healthcheck:HealthCheckResp consolidatorSHealth = {status: consolidatorStatus, details: {}};
//add to main map
map<healthcheck:HealthCheckResp> details = {
"diskSpace": diskSpace,
"kafka": kafkaHealth
"consolidator": consolidatorSHealth
};

//main object
healthcheck:HealthCheckResp healthCheckResp = {status: "UP", details: {details}};
return healthCheckResp;
string resultStatus = "DOWN";
if(diskSpaceStatus == "UP" && consolidatorStatus == "UP"){
resultStatus = "UP";
healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}};
http:Ok res = {body: healthCheckResp};
return res;
}
healthcheck:HealthCheckResp healthCheckResp = {status: resultStatus, details: {details}};
http:ServiceUnavailable res = {body: healthCheckResp};
return res;
}
};

Expand Down
6 changes: 6 additions & 0 deletions hub/modules/config/configurations.bal
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,9 @@ public configurable int KAFKA_CONSUMER_FETCH_MAX_BYTES = 3145728;

# The maximum retry count
public configurable int KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES = 524288;

# consolidator base url
public configurable string CONSOLIDATOR_BASE_URL = "http://websub-consolidator";

# consolidator health endpoint
public configurable string CONSOLIDATOR_HEALTH_ENDPOINT = "/consolidator/actuator/health";

0 comments on commit d817a3c

Please sign in to comment.