From 340c5c15725ec11058c298f9d36d844a9e6570ca Mon Sep 17 00:00:00 2001 From: cjmamo <823038+cjmamo@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:29:13 +0100 Subject: [PATCH] =?UTF-8?q?refactor:=20retrofit=20replay=20functionality?= =?UTF-8?q?=20in=20DHIS2-RapidPro=20solution=20wi=E2=80=A6=20(#198)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: retrofit replay functionality in DHIS2-RapidPro solution with Camel HIE Refs: https://dhis2.atlassian.net/browse/INTEROP-37 * ci: force snapshot dependencies to be updated * fix: customise Camel HIE SQL for PostgreSQL refactor: remove SQL which is no longer needed thanks to Camel HIE * fix: remove bad casting * fix: correct SQL query --------- Co-authored-by: cjmamo <823038+claudemamo@users.noreply.github.com> --- .github/workflows/ci.yml | 4 +- pom.xml | 2 +- .../integration/rapidpro/Application.java | 33 ++++-- .../ContactOrgUnitIdAggrStrategy.java | 2 +- .../route/DeliverReportRouteBuilder.java | 44 ++----- src/main/resources/application.properties | 1 - src/main/resources/banner.txt | 10 -- src/main/resources/schema-h2.sql | 27 ----- src/main/resources/schema-postgresql.sql | 32 ++---- src/main/resources/sql.properties | 18 +-- .../rapidpro/AbstractFunctionalTestCase.java | 2 +- ...rReportRouteBuilderFunctionalTestCase.java | 107 ++++++++---------- ...eEventsRouteBuilderFunctionalTestCase.java | 3 - .../resources/application-test.properties | 3 +- 14 files changed, 96 insertions(+), 192 deletions(-) delete mode 100644 src/main/resources/banner.txt diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60872899..32789632 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,5 +26,5 @@ jobs: java-version: '17' distribution: 'adopt' cache: maven - - run: mvn -B package -Ddhis.image.name=${{ matrix.dhis-image-name }} - - run: mvn -B clean test -Ddhis.image.name=${{ matrix.dhis-image-name }} -Dspring.sql.init.platform=postgresql + - run: mvn -B -U package -Ddhis.image.name=${{ matrix.dhis-image-name }} + - run: mvn -B -U clean test -Ddhis.image.name=${{ matrix.dhis-image-name }} -Dspring.sql.init.platform=postgresql diff --git a/pom.xml b/pom.xml index f10adba3..3b76d28a 100644 --- a/pom.xml +++ b/pom.xml @@ -337,4 +337,4 @@ - + \ No newline at end of file diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/Application.java b/src/main/java/org/hisp/dhis/integration/rapidpro/Application.java index 34d1ac5d..57db0697 100644 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/Application.java +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/Application.java @@ -52,7 +52,9 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer; import org.springframework.boot.autoconfigure.jms.artemis.ArtemisProperties; +import org.springframework.boot.context.event.ApplicationContextInitializedEvent; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; @@ -65,6 +67,7 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.List; @@ -126,9 +129,6 @@ public class Application extends CamelHieBootApp @Autowired private ArtemisProperties artemisProperties; - @Autowired - private KeyStoreGenerator keyStoreGenerator; - @Autowired private CamelContext camelContext; @@ -143,7 +143,7 @@ public class Application extends CamelHieBootApp @Autowired private DataSource dataSource; - + @PostConstruct public void postConstruct() throws @@ -191,10 +191,6 @@ public void postConstruct() testRapidProConnection(); } FileUtils.forceMkdir( new File( routesReloadDirectory ) ); - if ( serverSslEnabled ) - { - keyStoreGenerator.generate(); - } camelContext.getRegistry().bind( "native", nativeDataSonnetLibrary ); } @@ -203,6 +199,27 @@ public static void main( String[] args ) SQLException { SpringApplication springApplication = new SpringApplication( Application.class ); + springApplication.addListeners( event -> { + if ( event instanceof ApplicationContextInitializedEvent ) + { + ApplicationContext applicationContext = ((ApplicationContextInitializedEvent) event).getApplicationContext(); + String property = applicationContext.getEnvironment().getProperty( "server.ssl.enabled" ); + if ( property == null || property.equalsIgnoreCase( "true" ) ) + { + KeyStoreGenerator keyStoreGenerator = new KeyStoreGenerator(); + try + { + keyStoreGenerator.generate(); + } + catch ( MalformedURLException e ) + { + throw new Dhis2RapidProException(); + } + } + + } + } ); + springApplication.run( args ); } diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ContactOrgUnitIdAggrStrategy.java b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ContactOrgUnitIdAggrStrategy.java index 96e80327..cb217a2b 100644 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ContactOrgUnitIdAggrStrategy.java +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ContactOrgUnitIdAggrStrategy.java @@ -48,7 +48,7 @@ public Exchange aggregate( Exchange oldExchange, Exchange newExchange ) Map contact = (Map) newExchange.getMessage().getBody( Iterator.class ).next(); String contactUuid = (String) ((Map) oldExchange.getMessage().getBody( Map.class ) .get( "contact" )).get( "uuid" ); - LOGGER.debug( String.format( "Fetched contact %s => %s ", contactUuid, contact ) ); + LOGGER.debug( "Fetched contact {} => {} ", contactUuid, contact ); oldExchange.getMessage() .setHeader( "orgUnitId", JsonPath.read( contact, "$.fields.dhis2_organisation_unit_id" ) ); return oldExchange; diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java index 8a996779..8e5c46ae 100644 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java @@ -27,7 +27,6 @@ */ package org.hisp.dhis.integration.rapidpro.route; -import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy; @@ -65,26 +64,6 @@ public class DeliverReportRouteBuilder extends AbstractRouteBuilder @Override protected void doConfigure() { - ErrorHandlerFactory errorHandlerDefinition = deadLetterChannel( - "direct:dlq" ).maximumRedeliveries( 3 ).useExponentialBackOff().useCollisionAvoidance() - .allowRedeliveryWhileStopping( false ); - - from( "timer://retryReports?fixedRate=true&period=5000" ) - .routeId( "Retry Reports" ) - .setBody( simple( "${properties:report.retry.dlc.select.{{spring.sql.init.platform}}}" ) ) - .to( "jdbc:dataSource" ) - .split().body() - .setHeader( "id", simple( "${body['id']}" ) ) - .log( LoggingLevel.INFO, LOGGER, "Retrying row with ID ${header.id}" ) - .setHeader( "dataSetCode", simple( "${body['data_set_code']}" ) ) - .setHeader( "reportPeriodOffset", simple( "${body['report_period_offset']}" ) ) - .setHeader( "orgUnitId", simple( "${body['organisation_unit_id']}" ) ) - .setBody( simple( "${body['payload']}" ) ) - .to( "jms:queue:dhis2AggregateReports?exchangePattern=InOnly" ) - .setBody( simple( "${properties:report.processed.dlc.update.{{spring.sql.init.platform}}}" ) ) - .to( "jdbc:dataSource?useHeadersAsParameters=true" ) - .end(); - from( "quartz://dhis2AggregateReports?cron={{report.delivery.schedule.expression}}" ) .routeId( "Schedule Report Delivery" ) .precondition( "'{{report.delivery.schedule.expression:}}' != ''" ) @@ -94,6 +73,7 @@ protected void doConfigure() from( "jms:queue:dhis2AggregateReports" ) .routeId( "Consume Report" ) .precondition( "'{{report.delivery.schedule.expression:}}' == ''" ) + .kamelet( "hie-create-replay-checkpoint-action" ) .to( "direct:deliverReport" ); from( "direct:deliverReport" ) @@ -103,7 +83,6 @@ protected void doConfigure() from( "direct:transformReport" ) .routeId( "Transform Report" ) - .errorHandler( errorHandlerDefinition ) .streamCache("true") .setHeader( "originalPayload", simple( "${body}" ) ) .unmarshal().json() @@ -134,7 +113,6 @@ protected void doConfigure() from( "direct:transmitReport" ) .routeId( "Transmit Report" ) - .errorHandler( errorHandlerDefinition ) .log( LoggingLevel.INFO, LOGGER, "Saving data value set => ${body}" ) .setHeader( "dhisRequest", simple( "${body}" ) ) .toD( "dhis2://post/resource?path=dataValueSets&inBody=resource&client=#dhis2Client" ) @@ -145,20 +123,11 @@ protected void doConfigure() .when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) ) .to( "direct:completeDataSetRegistration" ) .otherwise() - .log( LoggingLevel.ERROR, LOGGER, "Import error from DHIS2 while saving data value set => ${body}" ) - .to( "direct:dlq" ) + .setHeader( "errorMessage", simple( "Import error from DHIS2 while saving data value set => ${body}" ) ) + .log( LoggingLevel.ERROR, LOGGER, "${header.errorMessage}" ) + .kamelet( "hie-fail-replay-checkpoint-action" ) .end(); - from( "direct:dlq" ) - .routeId( "Save Failed Report" ) - .log( LoggingLevel.ERROR, "${exception}" ) - .setHeader( "errorMessage", rootCauseExpr ) - .setHeader( "payload", header( "originalPayload" ) ) - .setHeader( "orgUnitId" ).ognl( "request.headers.orgUnitId" ) - .setHeader( "dataSetCode" ).ognl( "request.headers.dataSetCode" ) - .setBody( simple( "${properties:report.error.dlc.insert.{{spring.sql.init.platform}}}" ) ) - .to( "jdbc:dataSource?useHeadersAsParameters=true" ); - from( "direct:computePeriod" ) .routeId( "Compute Period" ) .toD( "dhis2://get/collection?path=dataSets&arrayName=dataSets&filter=code:eq:${headers['dataSetCode']}&fields=periodType&client=#dhis2Client" ) @@ -177,8 +146,9 @@ protected void doConfigure() .setBody( simple( "${properties:report.success.log.insert.{{spring.sql.init.platform}}}" ) ) .to( "jdbc:dataSource?useHeadersAsParameters=true" ) .otherwise() - .log( LoggingLevel.ERROR, LOGGER, "Error from DHIS2 while completing data set registration => ${body}" ) - .to( "direct:dlq" ) + .setHeader( "errorMessage", simple( "Error from DHIS2 while completing data set registration => ${body}" ) ) + .log( LoggingLevel.ERROR, LOGGER, "${header.errorMessage}" ) + .kamelet( "hie-fail-replay-checkpoint-action" ) .end(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7f2d8e1c..b32f99df 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -52,7 +52,6 @@ camel.springboot.main-run-controller=true camel.springboot.routes-reload-enabled=true camel.springboot.routes-reload-directory=routes camel.springboot.routes-reload-remove-all-routes=false -camel.springboot.routes-include-pattern=file:routes/* rapidpro.webhook.enabled=false rapidpro.contact.scheme=whatsapp \ No newline at end of file diff --git a/src/main/resources/banner.txt b/src/main/resources/banner.txt deleted file mode 100644 index 43dfb95b..00000000 --- a/src/main/resources/banner.txt +++ /dev/null @@ -1,10 +0,0 @@ - - ____ _ _ ___ ____ _ ____ _ _ ____ - | _ \| | | |_ _/ ___| | |_ ___ | _ \ __ _ _ __ (_) __| | _ \ _ __ ___ - | | | | |_| || |\___ \ _____| __/ _ \ _____| |_) / _` | '_ \| |/ _` | |_) | '__/ _ \ - | |_| | _ || | ___) |_____| || (_) |_____| _ < (_| | |_) | | (_| | __/| | | (_) | - |____/|_| |_|___|____/ \__\___/ |_| \_\__,_| .__/|_|\__,_|_| |_| \___/ - |_| - -Version: ${project.version} -Powered by Apache Camel v${camel.version} diff --git a/src/main/resources/schema-h2.sql b/src/main/resources/schema-h2.sql index dc3fd5e1..916fc34c 100644 --- a/src/main/resources/schema-h2.sql +++ b/src/main/resources/schema-h2.sql @@ -1,32 +1,5 @@ -ALTER TABLE IF EXISTS DEAD_LETTER_CHANNEL RENAME TO REPORT_DEAD_LETTER_CHANNEL; ALTER TABLE IF EXISTS SUCCESS_LOG RENAME TO REPORT_SUCCESS_LOG; -CREATE TABLE IF NOT EXISTS REPORT_DEAD_LETTER_CHANNEL ( - id INTEGER PRIMARY KEY AUTO_INCREMENT, - payload VARCHAR NOT NULL, - data_set_code VARCHAR, - report_period_offset INTEGER NOT NULL, - organisation_unit_id VARCHAR, - error_message VARCHAR NOT NULL, - status ENUM('PROCESSED', 'RETRY', 'ERROR') NOT NULL, - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP -); - -CREATE INDEX ON REPORT_DEAD_LETTER_CHANNEL(status); - -CREATE TABLE IF NOT EXISTS EVENT_DEAD_LETTER_CHANNEL ( - id INTEGER PRIMARY KEY AUTO_INCREMENT, - payload VARCHAR NOT NULL, - event_id VARCHAR NOT NULL, - error_message VARCHAR NOT NULL, - status ENUM('PROCESSED', 'RETRY', 'ERROR') NOT NULL, - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP -); - -CREATE INDEX ON EVENT_DEAD_LETTER_CHANNEL(status); - CREATE TABLE IF NOT EXISTS POLLER ( flow_uuid VARCHAR PRIMARY KEY, last_run_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP diff --git a/src/main/resources/schema-postgresql.sql b/src/main/resources/schema-postgresql.sql index cd1ee764..ca405b18 100644 --- a/src/main/resources/schema-postgresql.sql +++ b/src/main/resources/schema-postgresql.sql @@ -1,32 +1,14 @@ -ALTER TABLE IF EXISTS DEAD_LETTER_CHANNEL RENAME TO REPORT_DEAD_LETTER_CHANNEL; ALTER TABLE IF EXISTS SUCCESS_LOG RENAME TO REPORT_SUCCESS_LOG; -CREATE TABLE IF NOT EXISTS REPORT_DEAD_LETTER_CHANNEL ( - id BIGSERIAL PRIMARY KEY, - payload VARCHAR NOT NULL, - data_set_code VARCHAR, - report_period_offset INTEGER NOT NULL, - organisation_unit_id VARCHAR, - error_message VARCHAR NOT NULL, - status VARCHAR NOT NULL CHECK (status = 'PROCESSED' OR status = 'RETRY' OR status = 'ERROR'), - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +CREATE TABLE IF NOT EXISTS MESSAGE_STORE ( + id BIGSERIAL PRIMARY KEY, + key_ VARCHAR NOT NULL, + headers VARCHAR NOT NULL, + body VARCHAR, + context VARCHAR, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP ); -CREATE INDEX ON REPORT_DEAD_LETTER_CHANNEL(status); - -CREATE TABLE IF NOT EXISTS EVENT_DEAD_LETTER_CHANNEL ( - id BIGSERIAL PRIMARY KEY, - payload VARCHAR NOT NULL, - event_id VARCHAR NOT NULL, - error_message VARCHAR NOT NULL, - status VARCHAR NOT NULL CHECK (status = 'PROCESSED' OR status = 'RETRY' OR status = 'ERROR'), - created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP -); - -CREATE INDEX ON EVENT_DEAD_LETTER_CHANNEL(status); - CREATE TABLE IF NOT EXISTS POLLER ( flow_uuid VARCHAR PRIMARY KEY, last_run_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP diff --git a/src/main/resources/sql.properties b/src/main/resources/sql.properties index 83996718..2ff717e5 100644 --- a/src/main/resources/sql.properties +++ b/src/main/resources/sql.properties @@ -1,21 +1,15 @@ report.success.log.insert.h2=INSERT INTO REPORT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, data_set_code, report_period_offset, organisation_unit_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId) -report.retry.dlc.select.h2=SELECT * FROM REPORT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100 -report.error.dlc.insert.h2=INSERT INTO REPORT_DEAD_LETTER_CHANNEL (payload, data_set_code, report_period_offset, organisation_unit_id, status, error_message) VALUES (:?payload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId, 'ERROR', :?errorMessage) -report.processed.dlc.update.h2=UPDATE REPORT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id event.success.log.insert.h2=INSERT INTO EVENT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, event_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?eventId) -event.retry.dlc.select.h2=SELECT * FROM EVENT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100 -event.error.dlc.insert.h2=INSERT INTO EVENT_DEAD_LETTER_CHANNEL (payload, event_id, status, error_message) VALUES (:?payload, :?eventId, 'ERROR', :?errorMessage) -event.processed.dlc.update.h2=UPDATE EVENT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id last.run.select.h2=SELECT * FROM POLLER WHERE flow_uuid = :?flowUuid last.run.upsert.h2=MERGE INTO POLLER (flow_uuid, last_run_at) VALUES (:?flowUuid, :?newLastRunAt) report.success.log.insert.postgresql=INSERT INTO REPORT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, data_set_code, report_period_offset, organisation_unit_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId) -report.retry.dlc.select.postgresql=SELECT * FROM REPORT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100 -report.error.dlc.insert.postgresql=INSERT INTO REPORT_DEAD_LETTER_CHANNEL (payload, data_set_code, report_period_offset, organisation_unit_id, status, error_message) VALUES (:?payload, :?dataSetCode, :?reportPeriodOffset, :?orgUnitId, 'ERROR', :?errorMessage) -report.processed.dlc.update.postgresql=UPDATE REPORT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id event.success.log.insert.postgresql=INSERT INTO EVENT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, event_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?eventId) -event.retry.dlc.select.postgresql=SELECT * FROM EVENT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100 -event.error.dlc.insert.postgresql=INSERT INTO EVENT_DEAD_LETTER_CHANNEL (payload, event_id, status, error_message) VALUES (:?payload, :?event_id, 'ERROR', :?errorMessage) -event.processed.dlc.update.postgresql=UPDATE EVENT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id last.run.select.postgresql=SELECT * FROM POLLER WHERE flow_uuid = :?flowUuid last.run.upsert.postgresql=INSERT INTO POLLER (flow_uuid, last_run_at) VALUES (:?flowUuid, :?newLastRunAt) ON CONFLICT (flow_uuid) DO UPDATE SET last_run_at = :?newLastRunAt + +message.store.insert.postgresql=INSERT INTO MESSAGE_STORE (key_, headers, body, context) VALUES (:?key, :?headers, :?body, :?context) +message.store.select.postgresql=SELECT * FROM MESSAGE_STORE WHERE key_ LIKE :?key +message.store.select.range.postgresql=SELECT * FROM MESSAGE_STORE WHERE key_ LIKE :?key OFFSET :?offset ROWS FETCH FIRST :?limit ROWS ONLY +message.store.delete.postgresql=DELETE FROM MESSAGE_STORE WHERE key_ LIKE :?key RETURNING * +message.store.move.postgresql=WITH OLD AS (SELECT * FROM MESSAGE_STORE WHERE key_ = :?oldKey) UPDATE MESSAGE_STORE SET key_ = :?newKey WHERE key_ = :?oldKey RETURNING *, (SELECT key_ FROM OLD); diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java index 3854c3e8..b2b17103 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java @@ -115,7 +115,7 @@ public void beforeEach() System.clearProperty( "rapidpro.flow.uuids" ); System.clearProperty( "rapidpro.webhook.enabled" ); - jdbcTemplate.execute( "TRUNCATE TABLE REPORT_DEAD_LETTER_CHANNEL" ); + jdbcTemplate.execute( "TRUNCATE TABLE MESSAGE_STORE" ); jdbcTemplate.execute( "TRUNCATE TABLE REPORT_SUCCESS_LOG" ); jdbcTemplate.execute( "TRUNCATE TABLE MESSAGES" ); diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java index e4d02954..ff9bb3c3 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java @@ -30,6 +30,7 @@ import static org.hisp.dhis.integration.rapidpro.Environment.DHIS_IMAGE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -44,7 +45,10 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.camel.CamelExecutionException; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.Message; import org.apache.camel.builder.AdviceWith; import org.apache.camel.component.mock.MockEndpoint; import org.hisp.dhis.api.model.v40_0.DataValue; @@ -53,8 +57,11 @@ import org.hisp.dhis.integration.rapidpro.AbstractFunctionalTestCase; import org.hisp.dhis.integration.rapidpro.Environment; import org.hisp.dhis.integration.sdk.support.period.PeriodBuilder; +import org.hisp.hieboot.camel.spi.MessageRepository; +import org.hisp.hieboot.camel.spi.RepositoryMessage; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.function.Executable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StreamUtils; @@ -65,6 +72,9 @@ public class DeliverReportRouteBuilderFunctionalTestCase extends AbstractFunctio @Autowired private ObjectMapper objectMapper; + @Autowired + private MessageRepository messageRepository; + @Test public void testDataValueSetIsCreated() throws @@ -144,13 +154,11 @@ public void testRecordInDeadLetterChannelIsCreatedGivenWebMessageErrorWhileCreat ExchangePattern.InOut, String.format( webhookMessage, contactUuid ), Map.of( "dataSetCode", "MAL_YEARLY", "orgUnitId", "acme" ) ); - List> deadLetterChannel = jdbcTemplate.queryForList( - "SELECT * FROM REPORT_DEAD_LETTER_CHANNEL" ); - assertEquals( 1, deadLetterChannel.size() ); - assertEquals( "ERROR", - objectMapper.readValue( (String) deadLetterChannel.get( 0 ).get( "error_message" ), - WebMessage.class ) - .getStatus().value() ); + List repositoryMessages = messageRepository.retrieve( "failed:*" ); + assertEquals( 1, repositoryMessages.size() ); + assertEquals( + "Import error from DHIS2 while saving data value set => {code=null, devMessage=null, httpStatus=null, httpStatusCode=null, message=null, response=null, status=ERROR}", + repositoryMessages.get( 0 ).getContext() ); } @Test @@ -173,14 +181,18 @@ public void testRecordInDeadLetterChannelIsCreatedGivenMissingDataSetCode() String webhookMessage = StreamUtils.copyToString( Thread.currentThread().getContextClassLoader().getResourceAsStream( "webhook.json" ), Charset.defaultCharset() ); - producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2AggregateReports", - ExchangePattern.InOut, String.format( webhookMessage, contactUuid ), - Map.of( "orgUnitId", Environment.ORG_UNIT_ID ) ); - List> deadLetterChannel = jdbcTemplate.queryForList( - "SELECT * FROM REPORT_DEAD_LETTER_CHANNEL" ); - assertEquals( 1, deadLetterChannel.size() ); - assertNull( deadLetterChannel.get( 0 ).get( "data_set_code" ) ); + assertThrows( CamelExecutionException.class, + () -> producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2AggregateReports", + ExchangePattern.InOut, String.format( webhookMessage, contactUuid ), + Map.of( "orgUnitId", Environment.ORG_UNIT_ID ) ) ); + + List repositoryMessages = messageRepository.retrieve( "failed:*" ); + assertEquals( 1, repositoryMessages.size() ); + Map payload = objectMapper.readValue( + (String) repositoryMessages.get( 0 ).getMessage().getBody(), + Map.class ); + assertNull( payload.get( "data_set_code" ) ); } @Test @@ -225,19 +237,17 @@ public void testRecordInDeadLetterChannelIsCreatedGivenErrorWhileCreatingDataVal Thread.currentThread().getContextClassLoader().getResourceAsStream( "webhook.json" ), Charset.defaultCharset() ); - producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2AggregateReports", ExchangePattern.InOut, - String.format( webhookMessage, UUID.randomUUID() ), Map.of( "dataSetCode", "MAL_YEARLY" ) ); + assertThrows( CamelExecutionException.class, + () -> producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2AggregateReports", ExchangePattern.InOut, + String.format( webhookMessage, UUID.randomUUID() ), Map.of( "dataSetCode", "MAL_YEARLY" ) ) ); - List> deadLetterChannel = jdbcTemplate.queryForList( - "SELECT * FROM REPORT_DEAD_LETTER_CHANNEL" ); - assertEquals( 1, deadLetterChannel.size() ); - assertEquals( "ERROR", deadLetterChannel.get( 0 ).get( "STATUS" ) ); - assertEquals( deadLetterChannel.get( 0 ).get( "CREATED_AT" ), - deadLetterChannel.get( 0 ).get( "LAST_PROCESSED_AT" ) ); - assertTrue( ((String) deadLetterChannel.get( 0 ).get( "ERROR_MESSAGE" )).startsWith( - "org.apache.camel.CamelExchangeException: Error occurred during aggregation." ) ); - Map payload = objectMapper.readValue( (String) deadLetterChannel.get( 0 ).get( "PAYLOAD" ), + List repositoryMessages = messageRepository.retrieve( "failed:*" ); + assertEquals( 1, repositoryMessages.size() ); + RepositoryMessage repositoryMessage = repositoryMessages.get( 0 ); + Map payload = objectMapper.readValue( (String) repositoryMessage.getMessage().getBody(), Map.class ); + assertTrue( repositoryMessage.getContext().startsWith( + "org.apache.camel.CamelExchangeException: Error occurred during aggregation." ) ); assertEquals( "John Doe", ((Map) payload.get( "contact" )).get( "name" ) ); } @@ -259,48 +269,21 @@ public void testRetryRecordInDeadLetterChannelIsReProcessed() Charset.defaultCharset() ); String wrongContactUuid = UUID.randomUUID().toString(); - producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2AggregateReports", ExchangePattern.InOut, - String.format( webhookMessage, wrongContactUuid ), Map.of( "dataSetCode", "MAL_YEARLY" ) ); + assertThrows( CamelExecutionException.class, + () -> producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2AggregateReports", ExchangePattern.InOut, + String.format( webhookMessage, wrongContactUuid ), Map.of( "dataSetCode", "MAL_YEARLY" ) ) ); + assertEquals( 0, spyEndpoint.getReceivedCounter() ); - String payload = (String) jdbcTemplate.queryForList( "SELECT payload FROM REPORT_DEAD_LETTER_CHANNEL" ).get( 0 ) - .get( "PAYLOAD" ); - jdbcTemplate.execute( - String.format( - "UPDATE REPORT_DEAD_LETTER_CHANNEL SET STATUS = 'RETRY', PAYLOAD = '%s' WHERE STATUS = 'ERROR'", - payload.replace( wrongContactUuid, contactUuid ) ) ); + List delete = messageRepository.delete( "*" ); + delete.get( 0 ).getMessage() + .setBody( ((String) delete.get( 0 ).getMessage().getBody()).replace( wrongContactUuid, contactUuid ) ); + messageRepository.store( delete.get( 0 ).getKey().replace( "failed:", "replay:" ), + delete.get( 0 ).getMessage() ); spyEndpoint.await( 1, TimeUnit.MINUTES ); assertEquals( 1, spyEndpoint.getReceivedCounter() ); - List> deadLetterChannel = jdbcTemplate.queryForList( - "SELECT * FROM REPORT_DEAD_LETTER_CHANNEL" ); - assertEquals( 1, deadLetterChannel.size() ); - assertEquals( "PROCESSED", deadLetterChannel.get( 0 ).get( "STATUS" ) ); - - Object lastProcessedAt = deadLetterChannel.get( 0 ).get( "LAST_PROCESSED_AT" ); - Instant lastProcessedAsInstant; - if ( lastProcessedAt instanceof OffsetDateTime ) - { - lastProcessedAsInstant = ((OffsetDateTime) lastProcessedAt).toInstant(); - } - else - { - lastProcessedAsInstant = ((Timestamp) lastProcessedAt).toInstant(); - } - - Object createdAt = deadLetterChannel.get( 0 ).get( "CREATED_AT" ); - Instant createdAtAsInstant; - if ( createdAt instanceof OffsetDateTime ) - { - createdAtAsInstant = ((OffsetDateTime) createdAt).toInstant(); - } - else - { - createdAtAsInstant = ((Timestamp) createdAt).toInstant(); - } - - assertTrue( lastProcessedAsInstant.isAfter( createdAtAsInstant ) ); } @Test @@ -334,4 +317,4 @@ public void testDataValueSetIsCreatedGivenOrgUnitIdSchemeIsCode() assertEquals( "2", dataValue.get().getValue().get() ); assertTrue( dataValue.get().getComment().isPresent() ); } -} +} \ No newline at end of file diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java index ad54b24a..26f7207f 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java @@ -63,9 +63,6 @@ public class QueueProgramStageEventsRouteBuilderFunctionalTestCase extends Abstr @Override public void doBeforeEach() - throws - IOException, - ParseException { System.setProperty( "dhis2.phone.number.attribute.code", "PHONE_LOCAL" ); System.setProperty( "dhis2.given.name.attribute.code", "GIVEN_NAME" ); diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index 2f98e0c8..cc940e0f 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -3,5 +3,4 @@ dhis2.api.password = district server.ssl.enabled=false spring.datasource.url=jdbc:h2:./target/dhis2rapidpro;AUTO_SERVER=TRUE;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH;CASE_INSENSITIVE_IDENTIFIERS=TRUE logging.file.name=target/dhis2rapidpro.log -camel.springboot.routes-reload-directory=target/routes -camel.springboot.routes-include-pattern=file:target/routes/* \ No newline at end of file +camel.springboot.routes-reload-directory=target/routes \ No newline at end of file