Commit 340c5c1
refactor: retrofit replay functionality in DHIS2-RapidPro solution with Camel HIE (#198)

* refactor: retrofit replay functionality in DHIS2-RapidPro solution with Camel HIE

Refs: https://dhis2.atlassian.net/browse/INTEROP-37

* ci: force snapshot dependencies to be updated

* fix: customise Camel HIE SQL for PostgreSQL

refactor: remove SQL which is no longer needed thanks to Camel HIE

* fix: remove bad casting

* fix: correct SQL query

---------

Co-authored-by: cjmamo <823038+claudemamo@users.noreply.github.com>
cjmamo and cjmamo authored Dec 11, 2024
1 parent e7406b6 commit 340c5c1
Showing 14 changed files with 96 additions and 192 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -26,5 +26,5 @@ jobs:
java-version: '17'
distribution: 'adopt'
cache: maven
- - run: mvn -B package -Ddhis.image.name=${{ matrix.dhis-image-name }}
- - run: mvn -B clean test -Ddhis.image.name=${{ matrix.dhis-image-name }} -Dspring.sql.init.platform=postgresql
+ - run: mvn -B -U package -Ddhis.image.name=${{ matrix.dhis-image-name }}
+ - run: mvn -B -U clean test -Ddhis.image.name=${{ matrix.dhis-image-name }} -Dspring.sql.init.platform=postgresql
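
Note on the change: the added -U flag (short for --update-snapshots) makes Maven check remote repositories for newer SNAPSHOT artifacts on every build instead of trusting the local cache, which is what the "ci: force snapshot dependencies to be updated" message is after; presumably the Camel HIE dependency is consumed as a snapshot.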
2 changes: 1 addition & 1 deletion pom.xml
@@ -337,4 +337,4 @@
</snapshots>
</repository>
</repositories>
- </project>
+ </project>
33 changes: 25 additions & 8 deletions src/main/java/org/hisp/dhis/integration/rapidpro/Application.java
@@ -52,7 +52,9 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer;
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisProperties;
+ import org.springframework.boot.context.event.ApplicationContextInitializedEvent;
import org.springframework.boot.context.event.ApplicationReadyEvent;
+ import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
@@ -65,6 +67,7 @@
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+ import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.List;
@@ -126,9 +129,6 @@ public class Application extends CamelHieBootApp
@Autowired
private ArtemisProperties artemisProperties;

- @Autowired
- private KeyStoreGenerator keyStoreGenerator;
-
@Autowired
private CamelContext camelContext;

@@ -143,7 +143,7 @@ public class Application extends CamelHieBootApp

@Autowired
private DataSource dataSource;

@PostConstruct
public void postConstruct()
throws
@@ -191,10 +191,6 @@ public void postConstruct()
testRapidProConnection();
}
FileUtils.forceMkdir( new File( routesReloadDirectory ) );
- if ( serverSslEnabled )
- {
-     keyStoreGenerator.generate();
- }
camelContext.getRegistry().bind( "native", nativeDataSonnetLibrary );
}

@@ -203,6 +199,27 @@ public static void main( String[] args )
SQLException
{
SpringApplication springApplication = new SpringApplication( Application.class );
+ springApplication.addListeners( event -> {
+     if ( event instanceof ApplicationContextInitializedEvent )
+     {
+         ApplicationContext applicationContext = ((ApplicationContextInitializedEvent) event).getApplicationContext();
+         String property = applicationContext.getEnvironment().getProperty( "server.ssl.enabled" );
+         if ( property == null || property.equalsIgnoreCase( "true" ) )
+         {
+             KeyStoreGenerator keyStoreGenerator = new KeyStoreGenerator();
+             try
+             {
+                 keyStoreGenerator.generate();
+             }
+             catch ( MalformedURLException e )
+             {
+                 throw new Dhis2RapidProException();
+             }
+         }
+
+     }
+ } );
+
springApplication.run( args );
}

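Reviewer note: key store generation moves out of the injected KeyStoreGenerator field and the @PostConstruct hook into an ApplicationContextInitializedEvent listener registered before springApplication.run(). Spring Boot publishes that event once the Environment is prepared but before any bean definitions are loaded, so the key store is created ahead of bean wiring, presumably so that it already exists when the embedded server's SSL connector starts. Also note that server.ssl.enabled is read straight from the Environment and treated as enabled when the property is absent.
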
@@ -48,7 +48,7 @@ public Exchange aggregate( Exchange oldExchange, Exchange newExchange )
Map<String, Object> contact = (Map<String, Object>) newExchange.getMessage().getBody( Iterator.class ).next();
String contactUuid = (String) ((Map<String, Object>) oldExchange.getMessage().getBody( Map.class )
.get( "contact" )).get( "uuid" );
- LOGGER.debug( String.format( "Fetched contact %s => %s ", contactUuid, contact ) );
+ LOGGER.debug( "Fetched contact {} => {} ", contactUuid, contact );
oldExchange.getMessage()
.setHeader( "orgUnitId", JsonPath.read( contact, "$.fields.dhis2_organisation_unit_id" ) );
return oldExchange;
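The logging tweak replaces eager String.format() with SLF4J's parameterized form, which defers string construction until the DEBUG level is confirmed enabled. A minimal illustration of the difference:

    // Eager: the message is formatted even when DEBUG logging is off
    LOGGER.debug( String.format( "Fetched contact %s => %s", contactUuid, contact ) );

    // Lazy: SLF4J substitutes the {} placeholders only if DEBUG is enabled
    LOGGER.debug( "Fetched contact {} => {}", contactUuid, contact );
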
@@ -27,7 +27,6 @@
*/
package org.hisp.dhis.integration.rapidpro.route;

- import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
@@ -65,26 +64,6 @@ public class DeliverReportRouteBuilder extends AbstractRouteBuilder
@Override
protected void doConfigure()
{
- ErrorHandlerFactory errorHandlerDefinition = deadLetterChannel(
-     "direct:dlq" ).maximumRedeliveries( 3 ).useExponentialBackOff().useCollisionAvoidance()
-     .allowRedeliveryWhileStopping( false );
-
- from( "timer://retryReports?fixedRate=true&period=5000" )
-     .routeId( "Retry Reports" )
-     .setBody( simple( "${properties:report.retry.dlc.select.{{spring.sql.init.platform}}}" ) )
-     .to( "jdbc:dataSource" )
-     .split().body()
-         .setHeader( "id", simple( "${body['id']}" ) )
-         .log( LoggingLevel.INFO, LOGGER, "Retrying row with ID ${header.id}" )
-         .setHeader( "dataSetCode", simple( "${body['data_set_code']}" ) )
-         .setHeader( "reportPeriodOffset", simple( "${body['report_period_offset']}" ) )
-         .setHeader( "orgUnitId", simple( "${body['organisation_unit_id']}" ) )
-         .setBody( simple( "${body['payload']}" ) )
-         .to( "jms:queue:dhis2AggregateReports?exchangePattern=InOnly" )
-         .setBody( simple( "${properties:report.processed.dlc.update.{{spring.sql.init.platform}}}" ) )
-         .to( "jdbc:dataSource?useHeadersAsParameters=true" )
-     .end();
-
from( "quartz://dhis2AggregateReports?cron={{report.delivery.schedule.expression}}" )
.routeId( "Schedule Report Delivery" )
.precondition( "'{{report.delivery.schedule.expression:}}' != ''" )
@@ -94,6 +73,7 @@ protected void doConfigure()
from( "jms:queue:dhis2AggregateReports" )
.routeId( "Consume Report" )
.precondition( "'{{report.delivery.schedule.expression:}}' == ''" )
.kamelet( "hie-create-replay-checkpoint-action" )
.to( "direct:deliverReport" );

from( "direct:deliverReport" )
@@ -103,7 +83,6 @@ protected void doConfigure()

from( "direct:transformReport" )
.routeId( "Transform Report" )
- .errorHandler( errorHandlerDefinition )
.streamCache("true")
.setHeader( "originalPayload", simple( "${body}" ) )
.unmarshal().json()
@@ -134,7 +113,6 @@ protected void doConfigure()

from( "direct:transmitReport" )
.routeId( "Transmit Report" )
- .errorHandler( errorHandlerDefinition )
.log( LoggingLevel.INFO, LOGGER, "Saving data value set => ${body}" )
.setHeader( "dhisRequest", simple( "${body}" ) )
.toD( "dhis2://post/resource?path=dataValueSets&inBody=resource&client=#dhis2Client" )
@@ -145,20 +123,11 @@ protected void doConfigure()
.when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) )
.to( "direct:completeDataSetRegistration" )
.otherwise()
- .log( LoggingLevel.ERROR, LOGGER, "Import error from DHIS2 while saving data value set => ${body}" )
- .to( "direct:dlq" )
+ .setHeader( "errorMessage", simple( "Import error from DHIS2 while saving data value set => ${body}" ) )
+ .log( LoggingLevel.ERROR, LOGGER, "${header.errorMessage}" )
+ .kamelet( "hie-fail-replay-checkpoint-action" )
.end();

from( "direct:dlq" )
.routeId( "Save Failed Report" )
.log( LoggingLevel.ERROR, "${exception}" )
.setHeader( "errorMessage", rootCauseExpr )
.setHeader( "payload", header( "originalPayload" ) )
.setHeader( "orgUnitId" ).ognl( "request.headers.orgUnitId" )
.setHeader( "dataSetCode" ).ognl( "request.headers.dataSetCode" )
.setBody( simple( "${properties:report.error.dlc.insert.{{spring.sql.init.platform}}}" ) )
.to( "jdbc:dataSource?useHeadersAsParameters=true" );

from( "direct:computePeriod" )
.routeId( "Compute Period" )
.toD( "dhis2://get/collection?path=dataSets&arrayName=dataSets&filter=code:eq:${headers['dataSetCode']}&fields=periodType&client=#dhis2Client" )
@@ -177,8 +146,9 @@ protected void doConfigure()
.setBody( simple( "${properties:report.success.log.insert.{{spring.sql.init.platform}}}" ) )
.to( "jdbc:dataSource?useHeadersAsParameters=true" )
.otherwise()
- .log( LoggingLevel.ERROR, LOGGER, "Error from DHIS2 while completing data set registration => ${body}" )
- .to( "direct:dlq" )
+ .setHeader( "errorMessage", simple( "Error from DHIS2 while completing data set registration => ${body}" ) )
+ .log( LoggingLevel.ERROR, LOGGER, "${header.errorMessage}" )
+ .kamelet( "hie-fail-replay-checkpoint-action" )
.end();
}
}
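
Taken together, the route changes drop the hand-rolled resilience machinery (the deadLetterChannel error handler, the direct:dlq route that wrote failed reports to REPORT_DEAD_LETTER_CHANNEL, and the timer route that resubmitted RETRY rows every five seconds) in favour of Camel HIE's replay checkpoint kamelets: hie-create-replay-checkpoint-action records the exchange as it is consumed, and hie-fail-replay-checkpoint-action marks the checkpoint as failed so the exchange can be replayed later. A minimal sketch of the pattern under those assumptions; the endpoint URIs and the success test are illustrative, not taken from this codebase:

    import org.apache.camel.builder.RouteBuilder;

    public class ReplayCheckpointSketch extends RouteBuilder
    {
        @Override
        public void configure()
        {
            from( "jms:queue:incoming" )
                // persist a replay checkpoint of the inbound exchange before processing
                .kamelet( "hie-create-replay-checkpoint-action" )
                .to( "direct:process" );

            from( "direct:process" )
                .choice()
                    .when( simple( "${body} != null" ) ) // illustrative success test
                        .log( "processed OK" )
                    .otherwise()
                        .setHeader( "errorMessage", constant( "processing failed" ) )
                        // flag the stored checkpoint as failed so the exchange can be replayed
                        .kamelet( "hie-fail-replay-checkpoint-action" )
                .end();
        }
    }

One behavioural consequence worth flagging: the old timer route retried RETRY rows automatically every five seconds, whereas failed checkpoints are now replayed through whatever replay mechanism Camel HIE exposes, so redelivery timing is no longer handled inside this route builder.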
1 change: 0 additions & 1 deletion src/main/resources/application.properties
@@ -52,7 +52,6 @@ camel.springboot.main-run-controller=true
camel.springboot.routes-reload-enabled=true
camel.springboot.routes-reload-directory=routes
camel.springboot.routes-reload-remove-all-routes=false
- camel.springboot.routes-include-pattern=file:routes/*

rapidpro.webhook.enabled=false
rapidpro.contact.scheme=whatsapp
10 changes: 0 additions & 10 deletions src/main/resources/banner.txt

This file was deleted.

27 changes: 0 additions & 27 deletions src/main/resources/schema-h2.sql
@@ -1,32 +1,5 @@
ALTER TABLE IF EXISTS DEAD_LETTER_CHANNEL RENAME TO REPORT_DEAD_LETTER_CHANNEL;
ALTER TABLE IF EXISTS SUCCESS_LOG RENAME TO REPORT_SUCCESS_LOG;

- CREATE TABLE IF NOT EXISTS REPORT_DEAD_LETTER_CHANNEL (
-     id INTEGER PRIMARY KEY AUTO_INCREMENT,
-     payload VARCHAR NOT NULL,
-     data_set_code VARCHAR,
-     report_period_offset INTEGER NOT NULL,
-     organisation_unit_id VARCHAR,
-     error_message VARCHAR NOT NULL,
-     status ENUM('PROCESSED', 'RETRY', 'ERROR') NOT NULL,
-     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-     last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
- );
-
- CREATE INDEX ON REPORT_DEAD_LETTER_CHANNEL(status);
-
- CREATE TABLE IF NOT EXISTS EVENT_DEAD_LETTER_CHANNEL (
-     id INTEGER PRIMARY KEY AUTO_INCREMENT,
-     payload VARCHAR NOT NULL,
-     event_id VARCHAR NOT NULL,
-     error_message VARCHAR NOT NULL,
-     status ENUM('PROCESSED', 'RETRY', 'ERROR') NOT NULL,
-     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-     last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
- );
-
- CREATE INDEX ON EVENT_DEAD_LETTER_CHANNEL(status);
-
CREATE TABLE IF NOT EXISTS POLLER (
flow_uuid VARCHAR PRIMARY KEY,
last_run_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
32 changes: 7 additions & 25 deletions src/main/resources/schema-postgresql.sql
@@ -1,32 +1,14 @@
ALTER TABLE IF EXISTS DEAD_LETTER_CHANNEL RENAME TO REPORT_DEAD_LETTER_CHANNEL;
ALTER TABLE IF EXISTS SUCCESS_LOG RENAME TO REPORT_SUCCESS_LOG;

- CREATE TABLE IF NOT EXISTS REPORT_DEAD_LETTER_CHANNEL (
-     id BIGSERIAL PRIMARY KEY,
-     payload VARCHAR NOT NULL,
-     data_set_code VARCHAR,
-     report_period_offset INTEGER NOT NULL,
-     organisation_unit_id VARCHAR,
-     error_message VARCHAR NOT NULL,
-     status VARCHAR NOT NULL CHECK (status = 'PROCESSED' OR status = 'RETRY' OR status = 'ERROR'),
-     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-     last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
+ CREATE TABLE IF NOT EXISTS MESSAGE_STORE (
+     id BIGSERIAL PRIMARY KEY,
+     key_ VARCHAR NOT NULL,
+     headers VARCHAR NOT NULL,
+     body VARCHAR,
+     context VARCHAR,
+     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

- CREATE INDEX ON REPORT_DEAD_LETTER_CHANNEL(status);
-
- CREATE TABLE IF NOT EXISTS EVENT_DEAD_LETTER_CHANNEL (
-     id BIGSERIAL PRIMARY KEY,
-     payload VARCHAR NOT NULL,
-     event_id VARCHAR NOT NULL,
-     error_message VARCHAR NOT NULL,
-     status VARCHAR NOT NULL CHECK (status = 'PROCESSED' OR status = 'RETRY' OR status = 'ERROR'),
-     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
-     last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
- );
-
- CREATE INDEX ON EVENT_DEAD_LETTER_CHANNEL(status);
-
CREATE TABLE IF NOT EXISTS POLLER (
flow_uuid VARCHAR PRIMARY KEY,
last_run_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
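
The new MESSAGE_STORE table is the PostgreSQL backing store for those replay checkpoints: key_ identifies a checkpoint (the queries below match it with LIKE, which suggests hierarchical keys), headers, body and context hold the serialised exchange, and created_at timestamps it. It supersedes both dead-letter tables, since failed reports and events are now replayed from stored exchanges instead of bespoke DLC rows. No equivalent DDL is added for H2, presumably because Camel HIE ships its own H2 schema and only the PostgreSQL dialect needed customising, which matches the "fix: customise Camel HIE SQL for PostgreSQL" commit message.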
18 changes: 6 additions & 12 deletions src/main/resources/sql.properties
@@ -1,21 +1,15 @@
report.success.log.insert.h2=INSERT INTO REPORT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, data_set_code, report_period_offset, organisation_unit_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId)
- report.retry.dlc.select.h2=SELECT * FROM REPORT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100
- report.error.dlc.insert.h2=INSERT INTO REPORT_DEAD_LETTER_CHANNEL (payload, data_set_code, report_period_offset, organisation_unit_id, status, error_message) VALUES (:?payload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId, 'ERROR', :?errorMessage)
- report.processed.dlc.update.h2=UPDATE REPORT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id
event.success.log.insert.h2=INSERT INTO EVENT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, event_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?eventId)
- event.retry.dlc.select.h2=SELECT * FROM EVENT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100
- event.error.dlc.insert.h2=INSERT INTO EVENT_DEAD_LETTER_CHANNEL (payload, event_id, status, error_message) VALUES (:?payload, :?eventId, 'ERROR', :?errorMessage)
- event.processed.dlc.update.h2=UPDATE EVENT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id
last.run.select.h2=SELECT * FROM POLLER WHERE flow_uuid = :?flowUuid
last.run.upsert.h2=MERGE INTO POLLER (flow_uuid, last_run_at) VALUES (:?flowUuid, :?newLastRunAt)

report.success.log.insert.postgresql=INSERT INTO REPORT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, data_set_code, report_period_offset, organisation_unit_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId)
- report.retry.dlc.select.postgresql=SELECT * FROM REPORT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100
- report.error.dlc.insert.postgresql=INSERT INTO REPORT_DEAD_LETTER_CHANNEL (payload, data_set_code, report_period_offset, organisation_unit_id, status, error_message) VALUES (:?payload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId, 'ERROR', :?errorMessage)
- report.processed.dlc.update.postgresql=UPDATE REPORT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id
event.success.log.insert.postgresql=INSERT INTO EVENT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, event_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?eventId)
- event.retry.dlc.select.postgresql=SELECT * FROM EVENT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100
- event.error.dlc.insert.postgresql=INSERT INTO EVENT_DEAD_LETTER_CHANNEL (payload, event_id, status, error_message) VALUES (:?payload, :?event_id, 'ERROR', :?errorMessage)
- event.processed.dlc.update.postgresql=UPDATE EVENT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id
last.run.select.postgresql=SELECT * FROM POLLER WHERE flow_uuid = :?flowUuid
last.run.upsert.postgresql=INSERT INTO POLLER (flow_uuid, last_run_at) VALUES (:?flowUuid, :?newLastRunAt) ON CONFLICT (flow_uuid) DO UPDATE SET last_run_at = :?newLastRunAt

+ message.store.insert.postgresql=INSERT INTO MESSAGE_STORE (key_, headers, body, context) VALUES (:?key, :?headers, :?body, :?context)
+ message.store.select.postgresql=SELECT * FROM MESSAGE_STORE WHERE key_ LIKE :?key
+ message.store.select.range.postgresql=SELECT * FROM MESSAGE_STORE WHERE key_ LIKE :?key OFFSET :?offset ROWS FETCH FIRST :?limit ROWS ONLY
+ message.store.delete.postgresql=DELETE FROM MESSAGE_STORE WHERE key_ LIKE :?key RETURNING *
+ message.store.move.postgresql=WITH OLD AS (SELECT * FROM MESSAGE_STORE WHERE key_ = :?oldKey) UPDATE MESSAGE_STORE SET key_ = :?newKey WHERE key_ = :?oldKey RETURNING *, (SELECT key_ FROM OLD);
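
These statements use Camel's named-parameter placeholders (:?name), which the jdbc endpoint binds from message headers when useHeadersAsParameters=true is set; it is the same mechanism the existing report.success.log and last.run queries rely on. A hypothetical sketch of how one of the message.store queries could be issued (route name and header values are illustrative):

    import org.apache.camel.builder.RouteBuilder;

    public class MessageStoreQuerySketch extends RouteBuilder
    {
        @Override
        public void configure()
        {
            from( "direct:findCheckpoints" )
                // headers supply the :?key, :?offset and :?limit placeholders
                .setHeader( "key", constant( "dhis2AggregateReports/%" ) )
                .setHeader( "offset", constant( 0 ) )
                .setHeader( "limit", constant( 100 ) )
                .setBody( simple( "${properties:message.store.select.range.postgresql}" ) )
                .to( "jdbc:dataSource?useHeadersAsParameters=true" );
        }
    }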
@@ -115,7 +115,7 @@ public void beforeEach()
System.clearProperty( "rapidpro.flow.uuids" );
System.clearProperty( "rapidpro.webhook.enabled" );

jdbcTemplate.execute( "TRUNCATE TABLE REPORT_DEAD_LETTER_CHANNEL" );
jdbcTemplate.execute( "TRUNCATE TABLE MESSAGE_STORE" );
jdbcTemplate.execute( "TRUNCATE TABLE REPORT_SUCCESS_LOG" );
jdbcTemplate.execute( "TRUNCATE TABLE MESSAGES" );
