diff --git a/README.md b/README.md index f30f9473..c5551099 100644 --- a/README.md +++ b/README.md @@ -527,19 +527,18 @@ Besides being highly configurable, just about any piece of DHIS-to-RapidPro's fu Integration flows in DHIS-to-RapidPro, known as [routes](https://camel.apache.org/manual/routes.html) in Apache Camel, are named according to their purpose. You can override any route if you know its name. The following is a list of the important routes that you may want to override: -| Route name | Description | -|------------------------|----------------------------------------------------------------------------------------| -| RapidPro Webhook | Accepts and queues RapidPro webhook messages | -| Consume Report | De-queues the report for delivery to DHIS2 | -| Transform Report | Maps and enriches the report as received by RapidPro prior to transmitting it to DHIS2 | -| Transmit Report | Transmits the report to DHIS2 | -| Retry Reports | Re-queues reports marked for replay | -| Scan RapidPro Flows | Polls RapidPro for flow runs and queues them | -| Broadcast Reminders | Queries DHIS2 for overdue reports and sends any reminders to RapidPro | -| Set up RapidPro | Configures RapidPro for integration with DHIS2 | -| Create RapidPro Fields | Creates contact fields on RapidPro | -| Create RapidPro Group | Creates contact group on RapidPro | -| Sync RapidPro Contacts | Synchronises RapidPro contacts with DHIS2 users | +| Route ID | Description | +|----------------------|----------------------------------------------------------------------------------------| +| rapidproWebhook | Accepts and queues RapidPro webhook messages | +| consumeReport | De-queues the report for delivery to DHIS2 | +| transformReport | Maps and enriches the report as received by RapidPro prior to transmitting it to DHIS2 | +| transmitReport | Transmits the report to DHIS2 | +| scanRapidproFlows | Polls RapidPro for flow runs and queues them | +| broadcastReminders | Queries DHIS2 for overdue reports and sends any reminders to RapidPro | +| setUpRapidpro | Configures RapidPro for integration with DHIS2 | +| createRapidproFields | Creates contact fields on RapidPro | +| createRapidproGroup | Creates contact group on RapidPro | +| syncRapidproContacts | Synchronises RapidPro contacts with DHIS2 users | You should place the file or files containing the custom routes in a directory named `routes` within DHIS-to-RapidPro's current directory. The custom route will override the inbuilt route if the routes match by name. DHIS-to-RapidPro can reload the routes while its running therefore you have the option to extend the application at runtime. @@ -549,7 +548,7 @@ What follows is an example of a custom YAML route that overrides the inbuilt `Tr ```yaml - route: - id: "Transmit Report" + id: "transmitReport" from: uri: "direct:transmitReport" steps: diff --git a/pom.xml b/pom.xml index 3b76d28a..7dee9522 100644 --- a/pom.xml +++ b/pom.xml @@ -14,9 +14,9 @@ UTF-8 17 17 - 4.7.0 + 4.8.2 3.1.2 - 3.3.3 + 3.4.1 2.17.7 2.38.3.1 h2 @@ -87,7 +87,6 @@ spring-boot-maven-plugin ${spring.boot-version} - true ZIP diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/CompleteDataSetRegistrationFunction.java b/src/main/java/org/hisp/dhis/integration/rapidpro/CompleteDataSetRegistrationFunction.java deleted file mode 100644 index 8a871c02..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/CompleteDataSetRegistrationFunction.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro; - -import org.apache.camel.Exchange; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Map; -import java.util.function.Function; - -@Component -public class CompleteDataSetRegistrationFunction implements Function -{ - - @Override - public Object apply( Exchange exchange ) - { - return Map.of( "completeDataSetRegistrations", List.of( - Map.of( "completed", true, "dataSet", exchange.getMessage().getHeader( "dataSetCode" ), "organisationUnit", - exchange.getMessage().getHeader( "orgUnitId" ), "period", - exchange.getMessage().getHeader( "period" ) ) ) ); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/DataElementCodeAggrStrategy.java b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/DataElementCodeAggrStrategy.java new file mode 100644 index 00000000..87861bff --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/DataElementCodeAggrStrategy.java @@ -0,0 +1,18 @@ +package org.hisp.dhis.integration.rapidpro.aggregationStrategy; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; + +import java.util.List; + +import static org.apache.camel.builder.Builder.jsonpath; + +public class DataElementCodeAggrStrategy implements AggregationStrategy +{ + @Override + public Exchange aggregate( Exchange oldExchange, Exchange newExchange ) + { + oldExchange.getMessage().setHeader( "dataElementCodes", jsonpath( "$.dataElements..code" ).evaluate( newExchange, List.class ) ); + return oldExchange; + } +} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/PeriodAggrStrategy.java b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/PeriodAggrStrategy.java new file mode 100644 index 00000000..892b3968 --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/PeriodAggrStrategy.java @@ -0,0 +1,14 @@ +package org.hisp.dhis.integration.rapidpro.aggregationStrategy; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; + +public class PeriodAggrStrategy implements AggregationStrategy +{ + @Override + public Exchange aggregate( Exchange oldExchange, Exchange newExchange ) + { + oldExchange.getMessage().setHeader( "period", newExchange.getMessage().getBody() ); + return oldExchange; + } +} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageEventsAggrStrategy.java b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageEventsAggrStrategy.java index 31488973..ac4a7435 100644 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageEventsAggrStrategy.java +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageEventsAggrStrategy.java @@ -41,8 +41,6 @@ public class ProgramStageEventsAggrStrategy extends AbstractAggregationStrategy @Override public Exchange doAggregate( Exchange oldExchange, Exchange newExchange ) - throws - Exception { if ( oldExchange == null ) { diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/expression/FlowUuidsExpr.java b/src/main/java/org/hisp/dhis/integration/rapidpro/expression/FlowUuidsExpr.java new file mode 100644 index 00000000..7278838b --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/expression/FlowUuidsExpr.java @@ -0,0 +1,48 @@ +package org.hisp.dhis.integration.rapidpro.expression; + +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.hisp.dhis.integration.rapidpro.ProgramStageToFlowMap; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +public class FlowUuidsExpr implements Expression +{ + @Autowired + private ProgramStageToFlowMap programStageToFlowMap; + + @Value( "${rapidpro.flow.uuids:}" ) + private String aggregateReportFlowUuids; + + @Override + public T evaluate( Exchange exchange, Class type ) + { + String programStageFlowUuids = programStageToFlowMap.getFlowUuids(); + String flowUuids = (programStageFlowUuids.isEmpty() && aggregateReportFlowUuids.isEmpty()) ? + "" : + String.join( ",", programStageFlowUuids, aggregateReportFlowUuids ); + return (T) flowUuids; + } + + public ProgramStageToFlowMap getProgramStageToFlowMap() + { + return programStageToFlowMap; + } + + public void setProgramStageToFlowMap( ProgramStageToFlowMap programStageToFlowMap ) + { + this.programStageToFlowMap = programStageToFlowMap; + } + + public String getAggregateReportFlowUuids() + { + return aggregateReportFlowUuids; + } + + public void setAggregateReportFlowUuids( String aggregateReportFlowUuids ) + { + this.aggregateReportFlowUuids = aggregateReportFlowUuids; + } +} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunAtColumnReader.java b/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunAtColumnReaderExpr.java similarity index 98% rename from src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunAtColumnReader.java rename to src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunAtColumnReaderExpr.java index 14ec88a6..9e47cb93 100644 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunAtColumnReader.java +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunAtColumnReaderExpr.java @@ -41,7 +41,7 @@ import java.util.Map; @Component -public class LastRunAtColumnReader implements Expression +public class LastRunAtColumnReaderExpr implements Expression { @Override public T evaluate( Exchange exchange, Class type ) diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunCalculator.java b/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunCalculatorExpr.java similarity index 97% rename from src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunCalculator.java rename to src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunCalculatorExpr.java index e42254f3..03da913a 100644 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunCalculator.java +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/expression/LastRunCalculatorExpr.java @@ -33,11 +33,10 @@ import java.sql.Timestamp; import java.time.Instant; -import java.util.List; import java.util.Map; @Component -public class LastRunCalculator implements Expression +public class LastRunCalculatorExpr implements Expression { @Override public T evaluate( Exchange exchange, Class type ) diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java b/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java deleted file mode 100644 index f8d4be80..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventStatusUpdateProcessor.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.processor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Component -public class EventStatusUpdateProcessor implements Processor -{ - @Autowired - private ObjectMapper objectMapper; - - @Override - public void process( Exchange exchange ) - throws - Exception - { - Map eventPayload = exchange.getProperty( "eventPayload", Map.class ); - eventPayload.put( "status", "ACTIVE" ); - eventPayload.put( "occurredAt", LocalDateTime.now().format( DateTimeFormatter.ofPattern( "yyyy-MM-dd" ) ) ); - exchange.getMessage().setBody( Map.of( "events", List.of( eventPayload ) ) ); - Map queryParams = new HashMap<>(); - queryParams.put( "async", "false" ); - queryParams.put( "importStrategy", "UPDATE" ); - exchange.getMessage().setHeader( "CamelDhis2.queryParams", queryParams ); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/processor/FetchDueEventsQueryParamSetter.java b/src/main/java/org/hisp/dhis/integration/rapidpro/processor/FetchDueEventsQueryParamSetter.java deleted file mode 100644 index cbbcda3a..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/processor/FetchDueEventsQueryParamSetter.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.processor; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.springframework.stereotype.Component; -import ujson.True; - -import java.time.LocalDate; -import java.util.HashMap; -import java.util.Map; - -@Component -public class FetchDueEventsQueryParamSetter implements Processor -{ - @Override - public void process( Exchange exchange ) - throws - Exception - { - String programStageId = (String) exchange.getProperty( "programStage" ); - String todayString = LocalDate.now().toString(); - Map queryParams = new HashMap<>(); - queryParams.put( "status", "SCHEDULE" ); - queryParams.put( "programStage", programStageId ); - queryParams.put("skipPaging", "false"); - queryParams.put( "occurredBefore", todayString ); - queryParams.put( "scheduledBefore", todayString ); - exchange.getMessage().setHeader( "CamelDhis2.queryParams", queryParams ); - - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/AbstractRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/AbstractRouteBuilder.java deleted file mode 100644 index 7d0a9f64..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/AbstractRouteBuilder.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.http.base.HttpOperationFailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class AbstractRouteBuilder extends RouteBuilder -{ - protected static final Logger LOGGER = LoggerFactory.getLogger( AbstractRouteBuilder.class ); - - @Override - public void configure() - throws Exception - { - onException( HttpOperationFailedException.class ) - .log( LoggingLevel.ERROR, LOGGER, - "HTTP response body => ${exchangeProperty.CamelExceptionCaught.responseBody}" ) - .process( exchange -> { - throw (Exception) exchange.getProperty( Exchange.EXCEPTION_CAUGHT ); - } ); - - doConfigure(); - } - - protected abstract void doConfigure() - throws Exception; -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java deleted file mode 100644 index 8e5c46ae..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilder.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy; -import org.hisp.dhis.api.model.v40_0.DataSet; -import org.hisp.dhis.integration.rapidpro.CompleteDataSetRegistrationFunction; -import org.hisp.dhis.integration.rapidpro.aggregationStrategy.ContactOrgUnitIdAggrStrategy; -import org.hisp.dhis.integration.rapidpro.expression.RootCauseExpr; -import org.hisp.dhis.integration.rapidpro.processor.CurrentPeriodCalculator; -import org.hisp.dhis.integration.rapidpro.processor.IdSchemeQueryParamSetter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Map; -import java.util.function.Function; - -@Component -public class DeliverReportRouteBuilder extends AbstractRouteBuilder -{ - @Autowired - private CurrentPeriodCalculator currentPeriodCalculator; - - @Autowired - private RootCauseExpr rootCauseExpr; - - @Autowired - private IdSchemeQueryParamSetter idSchemeQueryParamSetter; - - @Autowired - private ContactOrgUnitIdAggrStrategy contactOrgUnitIdAggrStrategy; - - @Autowired - private CompleteDataSetRegistrationFunction completeDataSetRegistrationFunction; - - @Override - protected void doConfigure() - { - from( "quartz://dhis2AggregateReports?cron={{report.delivery.schedule.expression}}" ) - .routeId( "Schedule Report Delivery" ) - .precondition( "'{{report.delivery.schedule.expression:}}' != ''" ) - .pollEnrich( "jms:queue:dhis2AggregateReports" ) - .to( "direct:deliverReport" ); - - from( "jms:queue:dhis2AggregateReports" ) - .routeId( "Consume Report" ) - .precondition( "'{{report.delivery.schedule.expression:}}' == ''" ) - .kamelet( "hie-create-replay-checkpoint-action" ) - .to( "direct:deliverReport" ); - - from( "direct:deliverReport" ) - .routeId( "Deliver Report" ) - .to( "direct:transformReport" ) - .to( "direct:transmitReport" ); - - from( "direct:transformReport" ) - .routeId( "Transform Report" ) - .streamCache("true") - .setHeader( "originalPayload", simple( "${body}" ) ) - .unmarshal().json() - .choice().when( header( "reportPeriodOffset" ).isNull() ) - .setHeader( "reportPeriodOffset", constant( -1 ) ) - .end() - .enrich() - .simple( "dhis2://get/resource?path=dataElements&filter=dataSetElements.dataSet.code:eq:${headers['dataSetCode']}&fields=code&client=#dhis2Client" ) - .aggregationStrategy( ( oldExchange, newExchange ) -> { - oldExchange.getMessage().setHeader( "dataElementCodes", - jsonpath( "$.dataElements..code" ).evaluate( newExchange, List.class ) ); - return oldExchange; - } ) - .choice().when( header( "orgUnitId" ).isNull() ) - .setHeader( "uuid", simple( "${body[contact][uuid]}" ) ) - .enrich().simple( "kamelet:hie-rapidpro-get-contacts-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .aggregationStrategy( contactOrgUnitIdAggrStrategy ) - .end() - .end() - .enrich( "direct:computePeriod", ( oldExchange, newExchange ) -> { - oldExchange.getMessage().setHeader( "period", newExchange.getMessage().getBody() ); - return oldExchange; - } ) - .transform( datasonnet( "resource:classpath:dataValueSet.ds", Map.class, "application/x-java-object", - "application/x-java-object" ) ) - .process( idSchemeQueryParamSetter ) - .marshal().json().transform().body( String.class ); - - from( "direct:transmitReport" ) - .routeId( "Transmit Report" ) - .log( LoggingLevel.INFO, LOGGER, "Saving data value set => ${body}" ) - .setHeader( "dhisRequest", simple( "${body}" ) ) - .toD( "dhis2://post/resource?path=dataValueSets&inBody=resource&client=#dhis2Client" ) - .setBody( (Function) exchange -> exchange.getMessage().getBody( String.class ) ) - .setHeader( "dhisResponse", simple( "${body}" ) ) - .unmarshal().json() - .choice() - .when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) ) - .to( "direct:completeDataSetRegistration" ) - .otherwise() - .setHeader( "errorMessage", simple( "Import error from DHIS2 while saving data value set => ${body}" ) ) - .log( LoggingLevel.ERROR, LOGGER, "${header.errorMessage}" ) - .kamelet( "hie-fail-replay-checkpoint-action" ) - .end(); - - from( "direct:computePeriod" ) - .routeId( "Compute Period" ) - .toD( "dhis2://get/collection?path=dataSets&arrayName=dataSets&filter=code:eq:${headers['dataSetCode']}&fields=periodType&client=#dhis2Client" ) - .split().body().aggregationStrategy( new GroupedBodyAggregationStrategy() ) - .convertBodyTo( DataSet.class ) - .end() - .process( currentPeriodCalculator ); - - from( "direct:completeDataSetRegistration" ) - .setBody( completeDataSetRegistrationFunction ) - .toD( "dhis2://post/resource?path=completeDataSetRegistrations&inBody=resource&client=#dhis2Client" ) - .unmarshal().json() - .choice() - .when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) ) - .setHeader( "rapidProPayload", header( "originalPayload" ) ) - .setBody( simple( "${properties:report.success.log.insert.{{spring.sql.init.platform}}}" ) ) - .to( "jdbc:dataSource?useHeadersAsParameters=true" ) - .otherwise() - .setHeader( "errorMessage", simple( "Error from DHIS2 while completing data set registration => ${body}" ) ) - .log( LoggingLevel.ERROR, LOGGER, "${header.errorMessage}" ) - .kamelet( "hie-fail-replay-checkpoint-action" ) - .end(); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilder.java deleted file mode 100644 index 8920a29d..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilder.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.hisp.dhis.integration.rapidpro.ProgramStageToFlowMap; -import org.hisp.dhis.integration.rapidpro.expression.LastRunCalculator; -import org.hisp.dhis.integration.rapidpro.expression.LastRunAtColumnReader; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import java.util.Map; - -@Component -public class PullRapidProFlowsRouteBuilder extends AbstractRouteBuilder -{ - @Autowired - private LastRunCalculator lastRunCalculator; - - @Autowired - private LastRunAtColumnReader lastRunAtColumnReader; - - @Autowired - private ProgramStageToFlowMap programStageToFlowMap; - - @Value( "${rapidpro.flow.uuids:}" ) - private String aggregateReportFlowUuids; - - @Override - protected void doConfigure() - { - from( "servlet:tasks/scan?muteException=true" ) - .removeHeaders( "*" ) - .to( "direct:pull" ) - .setHeader( Exchange.CONTENT_TYPE, constant( "application/json" ) ) - .setBody( constant( Map.of("status", "success", "data", "Scanned RapidPro flow runs") ) ) - .marshal().json(); - - from( "quartz://pull?cron={{scan.reports.schedule.expression:0 0/30 * * * ?}}&stateful=true" ) - .to( "direct:pull" ); - - from( "direct:pull" ) - .routeId( "Scan RapidPro Flows" ) - .streamCache("true") - .process( exchange -> { - String programStageFlowUuids = programStageToFlowMap.getFlowUuids(); - String flowUuids = (programStageFlowUuids.isEmpty() && aggregateReportFlowUuids.isEmpty()) ? - "" : - String.join( ",", programStageFlowUuids, aggregateReportFlowUuids ); - exchange.setProperty( "flowUuids", flowUuids ); - } ) - .split( simple( "${exchangeProperty.flowUuids}" ), "," ) - .setHeader( "flowUuid", simple( "${body}" ) ) - .setBody( simple( "${properties:last.run.select.{{spring.sql.init.platform}}}" ) ) - .to( "jdbc:dataSource?useHeadersAsParameters=true" ) - .setProperty( "lastRunAt", lastRunAtColumnReader ) - .setHeader( "newLastRunAt" ).ognl( "@java.sql.Timestamp@from(@java.time.Instant@now())" ) - .setHeader( "reverse", constant( true ) ) - .setHeader( "flow", header( "flowUuid" ) ) - .setHeader( "after", exchangeProperty( "lastRunAt" ) ) - .to("kamelet:hie-rapidpro-get-flow-runs-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}") - .split(body()) - .log( LoggingLevel.DEBUG, LOGGER, "Fetched flow runs after ${header.after} for flow ${header.flow} => ${body}" ) - .setHeader( "newLastRunAt", lastRunCalculator ) - .filter( simple( "${body[exited_on]} != null && ${body[exit_type]} == 'completed'" ) ) - .choice() - .when().simple("${body[values][data_set_code]} != null && ${body[values][event_id]} == null" ) - .to("direct:queueAggregateReport") - .when().simple( "${body[values][data_set_code]} == null && ${body[values][event_id]} != null") - .to("direct:queueProgramStageEvent") - .otherwise() - .log( LoggingLevel.ERROR, LOGGER, - "Cannot process flow run for flow definition ${header.flowUuid} because one of the required flow results is missing. Hint: for aggregate data reports, save the data set code to a flow result named 'data_set_code' in RapidPro. For program stage events, save the value '@trigger.params.eventId' to a flow result named 'event_id' in RapidPro." ) - .end() - .end() - .setBody( simple( "${properties:last.run.upsert.{{spring.sql.init.platform}}}" ) ) - .to( "jdbc:dataSource?useHeadersAsParameters=true" ) - .end(); - - from( "direct:queueAggregateReport" ) - .routeId("Queue Aggregate Report") - .setHeader( "dataSetCode", simple( "${body[values][data_set_code][value]}" ) ) - .setHeader( "orgUnitId" ).ognl( - "request.body['values']['org_unit_id'] == null ? null : request.body['values']['org_unit_id']['value']" ) - .setHeader( "reportPeriodOffset" ).ognl( - "request.body['values']['report_period_offset'] == null ? null : request.body['values']['report_period_offset']['value']" ) - .transform( datasonnet( "resource:classpath:webhook.ds", String.class, "application/x-java-object", - "application/json" ) ) - .to( "jms:queue:dhis2AggregateReports?exchangePattern=InOnly" ) - .log( LoggingLevel.DEBUG, LOGGER, - "Enqueued aggregate report flow run [data set code = ${header.dataSetCode}, report period offset = ${header.reportPeriodOffset}, content = ${body}]" ); - - from( "direct:queueProgramStageEvent" ) - .routeId( "Queue Program Stage Event" ) - .setHeader( "eventId", simple( "${body[values][event_id][value]}" ) ) - .transform( datasonnet( "resource:classpath:webhook.ds", String.class, "application/x-java-object", - "application/json" ) ) - .to( "jms:queue:dhis2ProgramStageEvents?exchangePattern=InOnly" ) - .log( LoggingLevel.DEBUG, LOGGER, "Enqueued program stage event flow run [event Id = ${header.eventId}]" ); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilder.java deleted file mode 100644 index a4aa7149..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilder.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.hisp.dhis.integration.rapidpro.aggregationStrategy.AttributesAggrStrategy; -import org.hisp.dhis.integration.rapidpro.aggregationStrategy.ProgramStageEventsAggrStrategy; -import org.hisp.dhis.integration.rapidpro.aggregationStrategy.TrackedEntityIdAggrStrategy; -import org.hisp.dhis.integration.rapidpro.processor.EventStatusUpdateProcessor; -import org.hisp.dhis.integration.rapidpro.processor.FetchDueEventsQueryParamSetter; -import org.hisp.dhis.integration.rapidpro.processor.SetAttributesEndpointProcessor; -import org.hisp.dhis.integration.rapidpro.processor.SetProgramStagesPropertyProcessor; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.Map; - -@Component -public class QueueProgramStageEventsRouteBuilder extends AbstractRouteBuilder -{ - @Autowired - private SetProgramStagesPropertyProcessor setProgramStagesPropertyProcessor; - - @Autowired - private ProgramStageEventsAggrStrategy programStageEventsAggrStrategy; - - @Autowired - private FetchDueEventsQueryParamSetter fetchDueEventsQueryParamSetter; - - @Autowired - private TrackedEntityIdAggrStrategy trackedEntityIdAggrStrategy; - - @Autowired - private AttributesAggrStrategy attributesAggrStrategy; - - @Autowired - private SetAttributesEndpointProcessor setAttributesEndpointProcessor; - - @Autowired - private EventStatusUpdateProcessor eventStatusUpdateProcessor; - - @Override - protected void doConfigure() - throws - Exception - { - from( "servlet:tasks/syncEvents?muteException=true" ) - .precondition( "{{sync.dhis2.events.to.rapidpro.flows}}" ) - .removeHeaders( "*" ) - .to( "direct:queueEvents" ) - .setHeader( Exchange.CONTENT_TYPE, constant( "application/json" ) ) - .setBody( constant( Map.of( "status", "success", "data", "Fetched and enqueued due program stage events" ) ) ) - .marshal().json(); - - from( "quartz://fetchDueEvents?cron={{sync.events.schedule.expression:0 0/30 * * * ?}}&stateful=true" ) - .precondition( "{{sync.dhis2.events.to.rapidpro.flows}}" ) - .to( "direct:queueEvents" ); - - from("direct:queueEvents") - .routeId( "Queue Program Stage Events" ) - .to( "direct:fetchDueEvents" ) - .split( simple( "${exchangeProperty.dueEvents}" ) ) - .setProperty( "eventPayload", simple( "${body}" ) ) - .to( "direct:fetchAttributes" ) - .transform( datasonnet( "resource:classpath:event.ds", String.class, "application/x-java-object", "application/json" ) ) - .to( "jms:queue:events?exchangePattern=InOnly" ) - .unmarshal().json() - .log( LoggingLevel.DEBUG, LOGGER, "Enqueued event [eventId => ${body[event]}, programStage => ${body[programStage]}]" ) - .to( "direct:updateDhisProgramStageEventStatus" ); - - from( "direct:fetchDueEvents" ) - .routeId( "Fetch Due Events" ) - .process( setProgramStagesPropertyProcessor ) - .split( simple( "${exchangeProperty.programStages}" ) ).aggregationStrategy( programStageEventsAggrStrategy ) - .setProperty( "programStage", simple( "${body}" ) ) - .process( fetchDueEventsQueryParamSetter ) - .toD( "dhis2://get/collection?path=tracker/events&paging=true&arrayName=instances&fields=enrollment,programStage,orgUnit,scheduledAt,occurredAt,event,status&client=#dhis2Client" ) - .end() - .setProperty( "dueEvents", simple( "${body}" ) ) - .setProperty( "dueEventsCount", simple( "${body.size}" ) ) - .log( LoggingLevel.INFO, LOGGER, "Fetched ${body.size} due events from DHIS2" ); - - from( "direct:fetchAttributes" ) - .routeId( "Fetch Attributes" ) - .enrich() - .simple( "dhis2://get/resource?path=tracker/enrollments/${body[enrollment]}&fields=trackedEntity,attributes[code]&client=#dhis2Client" ) - .aggregationStrategy( trackedEntityIdAggrStrategy ) - .process( setAttributesEndpointProcessor ) - .enrich() - .simple( "${exchangeProperty.attributesEndpoint}" ) - .aggregationStrategy( attributesAggrStrategy ) - .choice().when( simple( "${body[contactUrn]} == null" ) ) - .log( LoggingLevel.ERROR, LOGGER, "Error while fetching phone number attribute from DHIS2 enrollment ${body[enrollment]}. Hint: Be sure to set the 'dhis2.phone.number.attribute.code' config property." ) - .stop(); - - from("direct:updateDhisProgramStageEventStatus") - .routeId( "Update DHIS Program Stage Event Status" ) - .process( eventStatusUpdateProcessor ) - .marshal().json().convertBodyTo( String.class ) - .toD( "dhis2://post/resource?path=tracker&inBody=resource&client=#dhis2Client" ) - .unmarshal().json() - .choice().when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) ) - .log( LoggingLevel.DEBUG, LOGGER, "Successfully updated DHIS program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}" ) - .otherwise() - .log( LoggingLevel.ERROR, LOGGER, "Unexpected status code when updating the dhis program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}. HTTP ${header.CamelHttpResponseCode}. HTTP response body => ${body}" ) - .end(); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/ReminderRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/ReminderRouteBuilder.java deleted file mode 100644 index 186214d8..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/ReminderRouteBuilder.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.hisp.dhis.integration.rapidpro.aggregationStrategy.MapAggregationStrategy; -import org.hisp.dhis.integration.rapidpro.processor.SetReportRateQueryParamProcessor; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Map; - -@Component -public class ReminderRouteBuilder extends AbstractRouteBuilder -{ - @Autowired - private SetReportRateQueryParamProcessor setReportRateQueryParamProcessor; - - @Override - protected void doConfigure() - { - from( "servlet:tasks/reminders?muteException=true" ) - .removeHeaders( "*" ) - .to( "direct:reminders" ) - .setHeader( Exchange.CONTENT_TYPE, constant( "application/json" ) ) - .setBody( constant( Map.of("status", "success", "data", "Sent reminders of overdue reports") ) ) - .marshal().json(); - - from( "quartz://reminders?cron={{reminder.schedule.expression:0 0 9 ? * *}}" ) - .to( "direct:reminders" ); - - from( "direct:reminders" ) - .routeId( "Broadcast Reminders" ) - .log( LoggingLevel.INFO, LOGGER, "Reminding RapidPro contacts of overdue reports..." ) - .setProperty( "orgUnitIdScheme", simple( "{{org.unit.id.scheme}}" ) ) - .choice().when( simple( "{{sync.rapidpro.contacts}} == true" ) ) - .to( "direct:sync" ) - .end() - .split( simple( "{{reminder.data.set.codes:}}" ), "," ) - .setProperty( "dataSetCode", body() ) - .to( "direct:fetchDataSet" ) - .choice().when( body().isNull() ) - .log( LoggingLevel.WARN, LOGGER, "Cannot remind contacts given unknown data set code '${exchangeProperty.dataSetCode}'" ) - .otherwise() - .setProperty( "dataSet", body() ) - .setHeader( "group", constant( "DHIS2" ) ) - .to("kamelet:hie-rapidpro-get-contacts-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}") - .split(body(), new MapAggregationStrategy() ) - .setBody().groovy( "[(body.fields.dhis2_organisation_unit_id) : body.uuid]" ) - .end() - .setProperty( "orgUnitIdsAndContactIds", body() ) - .to( "direct:fetchReportRate" ) - .split( simple( "${body['rows']}" ) ) - .filter().ognl( "@java.lang.Double@parseDouble(request.body[4]) < 100" ) - .to( "direct:sendBroadcast" ) - .end() - .end() - .end(); - - from( "direct:fetchDataSet" ) - .toD( "dhis2://get/resource?path=dataSets&filter=code:eq:${body}&fields=id,name,periodType,organisationUnits[id,${exchangeProperty.orgUnitIdScheme.toLowerCase()}]&client=#dhis2Client" ) - .setProperty( "dataSetCount", jsonpath( "$.dataSets.length()" ) ) - .choice().when().simple( "${exchangeProperty.dataSetCount} > 0" ) - .transform( jsonpath( "$.dataSets[0]" ) ) - .otherwise() - .setBody( simple( "${null}" ) ) - .end(); - - from( "direct:fetchReportRate" ) - .process( setReportRateQueryParamProcessor ) - .to( "dhis2://get/resource?path=analytics&client=#dhis2Client" ) - .unmarshal().json( Map.class ); - - from( "direct:sendBroadcast" ) - .setHeader( "contacts", - datasonnet( "resource:classpath:broadcastContacts.ds", List.class, "application/x-java-object", - "application/x-java-object" ) ) - .setHeader( "text" ).groovy( - "java.text.MessageFormat.format(java.util.ResourceBundle.getBundle('reminder').getString('text'), exchangeProperties.dataSet.name)" ) - .to( "kamelet:hie-rapidpro-send-broadcast-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .log( LoggingLevel.INFO, LOGGER, "Overdue report reminder sent => ${body}" ); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteBuilder.java deleted file mode 100644 index 11feeac2..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteBuilder.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.LoggingLevel; -import org.springframework.stereotype.Component; - -@Component -public class SetUpRapidProRouteBuilder extends AbstractRouteBuilder -{ - @Override - protected void doConfigure() - { - from( "direct:prepareRapidPro" ).routeId( "Set up RapidPro" ).to( "direct:createFieldsRoute" ) - .to( "direct:createGroupRoute" ); - - setUpCreateFieldsRoute(); - setUpCreateGroupRoute(); - } - - private void setUpCreateFieldsRoute() - { - from( "direct:createFieldsRoute" ).routeId( "Create RapidPro Fields" ) - .setHeader( "key", constant( "dhis2_organisation_unit_id" ) ) - .to( "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .removeHeaders( "*" ) - .choice().when().groovy( "!body.iterator().hasNext()" ) - .log( LoggingLevel.INFO, LOGGER, "Creating DHIS2 Organisation Unit ID fields in RapidPro..." ) - .setHeader( "label", constant( "DHIS2 Organisation Unit ID" ) ) - .setHeader( "type", constant( "text" ) ) - .to( "kamelet:hie-rapidpro-create-field-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .removeHeaders( "*" ) - .end() - .setHeader( "key", constant( "dhis2_user_id" ) ) - .to( "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .removeHeaders( "*" ) - .choice().when().groovy( "!body.iterator().hasNext()" ) - .log( LoggingLevel.INFO, LOGGER, "Creating DHIS2 User ID field in RapidPro..." ) - .setHeader( "label", constant( "DHIS2 User ID" ) ) - .setHeader( "type", constant( "text" ) ) - .to( "kamelet:hie-rapidpro-create-field-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .removeHeaders( "*" ) - .end(); - } - - private void setUpCreateGroupRoute() - { - from( "direct:createGroupRoute" ).routeId( "Create RapidPro Group" ) - .setHeader( "name", constant( "DHIS2" ) ) - .to( "kamelet:hie-rapidpro-get-groups-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .choice().when().groovy( "!body.iterator().hasNext()" ) - .log( LoggingLevel.INFO, LOGGER, "Creating DHIS2 group in RapidPro..." ) - .to( "kamelet:hie-rapidpro-create-group-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) - .setProperty( "groupUuid", simple( "$.uuid" ) ) - .otherwise() - .setProperty( "groupUuid", simple( "$.uuid" ) ); - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/SyncRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/SyncRouteBuilder.java deleted file mode 100644 index 7a62338e..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/SyncRouteBuilder.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.hisp.dhis.api.model.v40_0.User; -import org.hisp.dhis.integration.rapidpro.IsContactPoint; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import java.util.Iterator; -import java.util.Map; - -@Component -public class SyncRouteBuilder extends AbstractRouteBuilder -{ - @Autowired - private IsContactPoint isContactPoint; - - @Value( "${org.unit.id.scheme}" ) - private String orgUnitIdScheme; - - @Override - public void doConfigure() - { - from( "servlet:tasks/sync?muteException=true" ) - .precondition( "{{sync.rapidpro.contacts}}" ) - .removeHeaders( "*" ) - .to( "direct:sync" ) - .setHeader( Exchange.CONTENT_TYPE, constant( "application/json" ) ) - .setBody( constant( Map.of("status", "success", "data", "Synchronised RapidPro contacts with DHIS2 users") ) ) - .marshal().json(); - - from( "quartz://sync?cron={{sync.schedule.expression:0 0/30 * * * ?}}&stateful=true" ) - .precondition( "{{sync.rapidpro.contacts}}" ) - .to( "direct:sync" ); - - from( "direct:sync" ) - .precondition( "{{sync.rapidpro.contacts}}" ) - .routeId( "Sync RapidPro Contacts" ) - .log( LoggingLevel.INFO, LOGGER, "Synchronising RapidPro contacts..." ) - .to( "direct:prepareRapidPro" ) - .setProperty( "orgUnitIdScheme", simple( "{{org.unit.id.scheme}}" ) ) - .toD( "dhis2://get/collection?path=users&arrayName=users&fields=id,firstName,surname,phoneNumber,telegram,whatsApp,twitter,facebookMessenger,organisationUnits[${exchangeProperty.orgUnitIdScheme.toLowerCase()}~rename(id)]&filter=organisationUnits.id:!null&client=#dhis2Client" ) - .split(body()) - .to( "direct:createOrUpdateContact" ) - .end() - .log( LoggingLevel.INFO, LOGGER, "Completed synchronisation of RapidPro contacts with DHIS2 users" ); - - from( "direct:createOrUpdateContact" ) - .convertBodyTo( User.class ) - .filter( isContactPoint ) - .setProperty( "dhis2UserId" ).groovy( "body.id.get()" ) - .setHeader( "urn", simple( "ext:${exchangeProperty.dhis2UserId}" )) - .setHeader( "group", constant( "DHIS2" ) ) - .toV( "kamelet:hie-rapidpro-get-contacts-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}", null, "rapidProContact" ) - .removeHeader( "urn" ) - .removeHeader( "group" ) - .setHeader( "groups").groovy( "['DHIS2']" ) - .setHeader( "contactName" ).groovy( "body.firstName.get() + ' ' + body.surname.get()" ) - .setHeader( "phoneNumber" ).groovy( "body.phoneNumber.orElse(null)" ) - .setHeader( "telegram" ).groovy( "body.telegram.orElse(null)" ) - .setHeader( "whatsApp" ).groovy( "body.whatsApp.orElse(null)" ) - .setHeader( "facebookMessenger" ).groovy( "body.facebookMessenger.orElse(null)" ) - .setHeader( "twitterId" ).groovy( "body.twitter.orElse(null)" ) - .choice() - .when( exchange -> !exchange.getVariable( "rapidProContact", null, Iterator.class ).hasNext() ) - .log( LoggingLevel.DEBUG, LOGGER, "Creating RapidPro contact for DHIS2 user ${exchangeProperty.dhis2UserId}" ) - .setHeader( "external", exchangeProperty( "dhis2UserId" ) ) - .setHeader( "fields").groovy( "[dhis2_organisation_unit_id : body.organisationUnits.get()[0].id, dhis2_user_id : exchangeProperties.dhis2UserId]" ) - .to( "kamelet:hie-rapidpro-create-or-update-contact-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}&httpOkStatusRange=200-499" ) - .otherwise() - .log( LoggingLevel.DEBUG, LOGGER, "Updating RapidPro contact for DHIS2 user ${exchangeProperty.dhis2UserId}" ) - .setHeader( "uuid" ).groovy( "variables.rapidProContact.iterator().next().uuid" ) - .to( "kamelet:hie-rapidpro-create-or-update-contact-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}&httpOkStatusRange=200-499" ) - .end(); - - } -} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/WebHookRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/WebHookRouteBuilder.java deleted file mode 100644 index 0edf7047..00000000 --- a/src/main/java/org/hisp/dhis/integration/rapidpro/route/WebHookRouteBuilder.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2004-2022, University of Oslo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * Neither the name of the HISP project nor the names of its contributors may - * be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR - * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON - * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package org.hisp.dhis.integration.rapidpro.route; - -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.springframework.stereotype.Component; - -@Component -public class WebHookRouteBuilder extends AbstractRouteBuilder -{ - @Override - protected void doConfigure() - { - from( "servlet:webhook?httpMethodRestrict=POST&muteException=true" ) - .precondition( "{{rapidpro.webhook.enabled}}" ) - .routeId( "RapidPro Webhook" ) - .removeHeader( Exchange.HTTP_URI ) - .to( "jms:queue:dhis2AggregateReports?exchangePattern=InOnly" ) - .log( LoggingLevel.DEBUG, LOGGER, "Enqueued webhook message [data set code = ${header.dataSetCode},report period offset = ${header.reportPeriodOffset},orgUnitId = ${header.orgUnitId},content = ${body}]" ) - .setHeader( Exchange.HTTP_RESPONSE_CODE, constant( 202 ) ) - .setBody().simple( "${null}" ); - } -} diff --git a/src/main/resources/routes/deliver-report.camel.yaml b/src/main/resources/routes/deliver-report.camel.yaml new file mode 100644 index 00000000..eb6d5018 --- /dev/null +++ b/src/main/resources/routes/deliver-report.camel.yaml @@ -0,0 +1,167 @@ +- route: + id: scheduleReportDelivery + precondition: "'{{report.delivery.schedule.expression:}}' != ''" + from: + uri: "quartz://dhis2AggregateReports?cron={{report.delivery.schedule.expression}}" + steps: + - pollEnrich: + constant: jms:queue:dhis2AggregateReports + - to: direct:deliverReport + +- route: + precondition: "'{{report.delivery.schedule.expression:}}' == ''" + from: + uri: jms:queue:dhis2AggregateReports + steps: + - kamelet: hie-create-replay-checkpoint-action + - to: direct:deliverReport + +- route: + id: deliverReport + from: + uri: direct:deliverReport + steps: + - to: direct:transformReport + - to: direct:transmitReport + +- route: + id: transformReport + streamCache: true + from: + uri: direct:transformReport + steps: + - setHeader: + name: originalPayload + simple: ${body} + - unmarshal: + json: {} + - choice: + when: + - simple: ${headers.reportPeriodOffset} == null + steps: + - setHeader: + name: reportPeriodOffset + groovy: -1 + - enrich: + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.DataElementCodeAggrStrategy" + simple: "dhis2://get/resource?path=dataElements&filter=dataSetElements.dataSet.code:eq:${headers['dataSetCode']}&fields=code&client=#dhis2Client" + - choice: + when: + - simple: ${headers.orgUnitId} == null + steps: + - setHeader: + name: uuid + simple: ${body[contact][uuid]} + - enrich: + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.ContactOrgUnitIdAggrStrategy" + constant: "kamelet:hie-rapidpro-get-contacts-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - enrich: + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.PeriodAggrStrategy" + constant: direct:computePeriod + - transform: + datasonnet: + outputMediaType: application/x-java-object + resultType: java.util.Map + bodyMediaType: application/x-java-object + expression: resource:classpath:dataValueSet.ds + - process: + ref: "#class:org.hisp.dhis.integration.rapidpro.processor.IdSchemeQueryParamSetter" + - marshal: + json: {} + - transform: + groovy: request.getBody(String.class) + +- route: + id: transmitReport + from: + uri: direct:transmitReport + steps: + - log: + message: "Saving data value set => ${body}" + loggingLevel: INFO + - setHeader: + name: dhisRequest + simple: ${body} + - to: + uri: dhis2://post/resource + parameters: + path: dataValueSets + inBody: resource + client: "#dhis2Client" + - setBody: + groovy: request.getBody(String.class) + - setHeader: + name: dhisResponse + simple: ${body} + - unmarshal: + json: {} + - choice: + when: + - simple: "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" + steps: + - to: direct:completeDataSetRegistration + otherwise: + steps: + - setHeader: + name: errorMessage + simple: "Import error from DHIS2 while saving data value set => ${body}" + - log: + message: "${header.errorMessage}" + loggingLevel: ERROR + - kamelet: hie-fail-replay-checkpoint-action + +- route: + id: computePeriod + from: + uri: direct:computePeriod + steps: + - toD: + uri: dhis2://get/collection + parameters: + path: dataSets + arrayName: dataSets + filter: code:eq:${headers['dataSetCode']} + fields: periodType + client: "#dhis2Client" + - split: + aggregationStrategy: "#class:org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy" + simple: ${body} + steps: + - convertBodyTo: + type: org.hisp.dhis.api.model.v40_0.DataSet + - process: + ref: "#class:org.hisp.dhis.integration.rapidpro.processor.CurrentPeriodCalculator" + +- route: + from: + uri: direct:completeDataSetRegistration + steps: + - setBody: + groovy: "['completeDataSetRegistrations': [['completed': true, 'dataSet': headers.dataSetCode, 'organisationUnit': headers.orgUnitId, 'period': headers.period]]]" + - to: + uri: dhis2://post/resource + parameters: + path: completeDataSetRegistrations + inBody: resource + client: "#dhis2Client" + - unmarshal: + json: {} + - choice: + when: + - simple: "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" + steps: + - setHeader: + name: rapidProPayload + header: originalPayload + - setBody: + simple: "${properties:report.success.log.insert.{{spring.sql.init.platform}}}" + - to: jdbc:dataSource?useHeadersAsParameters=true + otherwise: + steps: + - setHeader: + name: errorMessage + simple: "Error from DHIS2 while completing data set registration => ${body}" + - log: + message: ${header.errorMessage} + loggingLevel: ERROR + - kamelet: hie-fail-replay-checkpoint-action \ No newline at end of file diff --git a/src/main/resources/routes/exception-handler.camel.yaml b/src/main/resources/routes/exception-handler.camel.yaml new file mode 100644 index 00000000..bcac0ddf --- /dev/null +++ b/src/main/resources/routes/exception-handler.camel.yaml @@ -0,0 +1,11 @@ +- routeConfiguration: + onException: + - onException: + exception: + - org.apache.camel.http.base.HttpOperationFailedException + handled: + constant: false + steps: + - log: + message: HTTP response body => ${exchangeProperty.CamelExceptionCaught.responseBody} + loggingLevel: ERROR \ No newline at end of file diff --git a/src/main/resources/routes/pull-rapidpro-flows.camel.yaml b/src/main/resources/routes/pull-rapidpro-flows.camel.yaml new file mode 100644 index 00000000..cacd28b1 --- /dev/null +++ b/src/main/resources/routes/pull-rapidpro-flows.camel.yaml @@ -0,0 +1,139 @@ +- beans: + - name: lastRunAtColumnReaderExpr + type: "#class:org.hisp.dhis.integration.rapidpro.expression.LastRunAtColumnReaderExpr" + - name: lastRunCalculatorExpr + type: "#class:org.hisp.dhis.integration.rapidpro.expression.LastRunCalculatorExpr" + - name: flowUuidsExpr + type: "#class:org.hisp.dhis.integration.rapidpro.expression.FlowUuidsExpr" + +- route: + from: + uri: servlet:tasks/scan?muteException=true + steps: + - removeHeaders: + pattern: '*' + - to: direct:pull + - setHeader: + name: Content-Type + constant: application/json + - setBody: + groovy: "['status': 'success', 'data': 'Scanned RapidPro flow runs']" + - marshal: + json: {} + +- route: + from: + uri: "quartz://pull?cron={{scan.reports.schedule.expression:0 0/30 * * * ?}}&stateful=true" + steps: + - to: direct:pull + +- route: + id: scanRapidproFlows + streamCache: true + from: + uri: direct:pull + steps: + - setProperty: + name: flowUuids + ref: flowUuidsExpr + - split: + delimiter: "," + simple: ${exchangeProperty.flowUuids} + steps: + - setHeader: + name: flowUuid + simple: ${body} + - setBody: + simple: "${properties:last.run.select.{{spring.sql.init.platform}}}" + - to: jdbc:dataSource?useHeadersAsParameters=true + - setProperty: + name: lastRunAt + ref: lastRunAtColumnReaderExpr + - setHeader: + name: newLastRunAt + ognl: '@java.sql.Timestamp@from(@java.time.Instant@now())' + - setHeader: + name: reverse + constant: true + - setHeader: + name: flow + header: flowUuid + - setHeader: + name: after + exchangeProperty: lastRunAt + - to: "kamelet:hie-rapidpro-get-flow-runs-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - split: + simple: "${body}" + steps: + - log: + message: "Fetched flow runs after ${header.after} for flow ${header.flow} => ${body}" + loggingLevel: DEBUG + - setHeader: + name: newLastRunAt + ref: lastRunCalculatorExpr + - filter: + simple: "${body[exited_on]} != null && ${body[exit_type]} == 'completed'" + - choice: + when: + - simple: "${body[values][data_set_code]} != null && ${body[values][event_id]} == null" + steps: + - to: direct:queueAggregateReport + - simple: "${body[values][data_set_code]} == null && ${body[values][event_id]} != null" + steps: + - to: direct:queueProgramStageEvent + otherwise: + steps: + - log: + message: "Cannot process flow run for flow definition ${header.flowUuid}\ + \ because one of the required flow results is missing. Hint: for aggregate data\ + \ reports, save the data set code to a flow result named 'data_set_code'\ + \ in RapidPro. For program stage events, save the value '@trigger.params.eventId'\ + \ to a flow result named 'event_id' in RapidPro." + loggingLevel: ERROR + - setBody: + simple: "${properties:last.run.upsert.{{spring.sql.init.platform}}}" + - to: jdbc:dataSource?useHeadersAsParameters=true + +- route: + id: queueAggregateReport + from: + uri: direct:queueAggregateReport + steps: + - setHeader: + name: dataSetCode + simple: "${body[values][data_set_code][value]}" + - setHeader: + name: orgUnitId + ognl: "request.body['values']['org_unit_id'] == null ? null : request.body['values']['org_unit_id']['value']" + - setHeader: + name: reportPeriodOffset + ognl: "request.body['values']['report_period_offset'] == null ? null : request.body['values']['report_period_offset']['value']" + - transform: + datasonnet: + outputMediaType: application/json + resultType: String + bodyMediaType: application/x-java-object + expression: resource:classpath:webhook.ds + - to: jms:queue:dhis2AggregateReports?exchangePattern=InOnly + - log: + message: "Enqueued aggregate report flow run [data set code = ${header.dataSetCode}, report period offset = ${header.reportPeriodOffset}, content = ${body}]" + loggingLevel: DEBUG + +- route: + id: queueProgramStageEvent + from: + uri: direct:queueProgramStageEvent + steps: + - setHeader: + name: eventId + simple: "${body[values][event_id][value]}" + - transform: + datasonnet: + outputMediaType: application/json + resultType: String + bodyMediaType: application/x-java-object + expression: resource:classpath:webhook.ds + - to: jms:queue:dhis2ProgramStageEvents?exchangePattern=InOnly + - log: + message: "Enqueued program stage event flow run [event Id = ${header.eventId}]" + loggingLevel: DEBUG \ No newline at end of file diff --git a/src/main/resources/routes/queue-program-stage-events.camel.yaml b/src/main/resources/routes/queue-program-stage-events.camel.yaml new file mode 100644 index 00000000..f6f0ba79 --- /dev/null +++ b/src/main/resources/routes/queue-program-stage-events.camel.yaml @@ -0,0 +1,150 @@ +- route: + precondition: "{{sync.dhis2.events.to.rapidpro.flows}}" + from: + uri: servlet:tasks/syncEvents?muteException=true + steps: + - removeHeaders: + pattern: '*' + - to: direct:queueEvents + - setHeader: + name: Content-Type + constant: application/json + - setBody: + groovy: "['status': 'success', 'data': 'Fetched and enqueued due program stage events']" + - marshal: + json: {} + +- route: + precondition: "{{sync.dhis2.events.to.rapidpro.flows}}" + from: + uri: "quartz://fetchDueEvents?cron={{sync.events.schedule.expression:0 0/30 * * * ?}}&stateful=true" + steps: + - to: direct:queueEvents + +- route: + id: queueProgramStageEvents + from: + uri: direct:queueEvents + steps: + - to: + uri: direct:fetchDueEvents + - split: + simple: "${exchangeProperty.dueEvents}" + steps: + - setProperty: + name: eventPayload + simple: "${body}" + - to: direct:fetchAttributes + - transform: + datasonnet: + outputMediaType: application/json + resultType: String + bodyMediaType: application/x-java-object + expression: resource:classpath:event.ds + - to: jms:queue:events?exchangePattern=InOnly + - unmarshal: + json: {} + - log: + message: "Enqueued event [eventId => ${body[event]}, programStage => ${body[programStage]}]" + loggingLevel: DEBUG + - to: direct:updateDhis2ProgramStageEventStatus + +- route: + id: fetchDueEvents + from: + uri: direct:fetchDueEvents + steps: + - process: + ref: "#class:org.hisp.dhis.integration.rapidpro.processor.SetProgramStagesPropertyProcessor" + - split: + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.ProgramStageEventsAggrStrategy" + simple: "${exchangeProperty.programStages}" + steps: + - setProperty: + name: programStage + simple: "${body}" + - setHeader: + name: CamelDhis2.queryParams + groovy: | + today = java.time.LocalDate.now().toString() + ['status': 'SCHEDULE', 'programStage': exchangeProperties.programStage, 'skipPaging': 'false', 'occurredBefore': today, 'scheduledBefore': today] + - to: + uri: dhis2://get/collection + parameters: + path: tracker/events + paging: true + arrayName: instances + fields: enrollment,programStage,orgUnit,scheduledAt,occurredAt,event,status + client: "#dhis2Client" + - setProperty: + name: dueEvents + simple: "${body}" + - setProperty: + name: dueEventsCount + simple: "${body.size}" + - log: + message: "Fetched ${headers.dueEventsCount} due events from DHIS2" + loggingLevel: INFO + +- route: + id: fetchAttributes + from: + uri: direct:fetchAttributes + steps: + - enrich: + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.TrackedEntityIdAggrStrategy" + simple: "dhis2://get/resource?path=tracker/enrollments/${body[enrollment]}&fields=trackedEntity,attributes[code]&client=#dhis2Client" + - process: + ref: "#class:org.hisp.dhis.integration.rapidpro.processor.SetAttributesEndpointProcessor" + - enrich: + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.AttributesAggrStrategy" + simple: "${exchangeProperty.attributesEndpoint}" + - choice: + when: + - simple: "${body[contactUrn]} == null" + steps: + - log: + message: "Error while fetching phone number attribute from DHIS2 enrollment ${body[enrollment]}.\ + \ Hint: Be sure to set the 'dhis2.phone.number.attribute.code' config property." + loggingLevel: ERROR + - stop: {} + +- route: + id: updateDhis2ProgramStageEventStatus + from: + uri: direct:updateDhis2ProgramStageEventStatus + steps: + - setBody: + groovy: | + exchangeProperties.eventPayload.status = 'ACTIVE' + exchangeProperties.eventPayload.occurredAt = java.time.LocalDate.now().format(java.time.format.DateTimeFormatter.ofPattern('yyyy-MM-dd')) + ['events': [exchangeProperties.eventPayload]] + - marshal: + json: {} + - convertBodyTo: + type: java.lang.String + - setHeader: + name: CamelDhis2.queryParams + groovy: "['async': 'false', 'importStrategy': 'UPDATE']" + - to: + uri: dhis2://post/resource + parameters: + path: tracker + inBody: resource + client: "#dhis2Client" + - unmarshal: + json: {} + - choice: + when: + - simple: "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" + steps: + - log: + message: "Successfully updated DHIS2 program stage event status for event with ID => ${exchangeProperty.eventPayload['event']}" + loggingLevel: DEBUG + otherwise: + steps: + - log: + message: "Unexpected status code when updating the dhis program stage event status for event with ID\ + \ => ${exchangeProperty.eventPayload['event']}. HTTP ${header.CamelHttpResponseCode}. HTTP response\ + \ body => ${body}" + loggingLevel: ERROR \ No newline at end of file diff --git a/src/main/resources/routes/reminders.camel.yaml b/src/main/resources/routes/reminders.camel.yaml new file mode 100644 index 00000000..eb10737c --- /dev/null +++ b/src/main/resources/routes/reminders.camel.yaml @@ -0,0 +1,139 @@ +- route: + from: + uri: servlet:tasks/reminders?muteException=true + steps: + - removeHeaders: + pattern: '*' + - to: direct:reminders + - setHeader: + name: Content-Type + constant: application/json + - setBody: + groovy: "['status':'success', 'data': 'Sent reminders of overdue reports']" + - marshal: + json: {} + +- route: + from: + uri: "quartz://reminders?cron={{reminder.schedule.expression:0 0 9 ? * *}}" + steps: + - to: direct:reminders + +- route: + id: broadcastReminders + from: + uri: direct:reminders + steps: + - log: + message: Reminding RapidPro contacts of overdue reports... + loggingLevel: INFO + - setProperty: + name: orgUnitIdScheme + simple: "{{org.unit.id.scheme}}" + - choice: + when: + - simple: "{{sync.rapidpro.contacts}} == true" + steps: + - to: direct:sync + - split: + delimiter: "," + simple: "{{reminder.data.set.codes:}}" + steps: + - setProperty: + name: dataSetCode + simple: ${body} + - to: direct:fetchDataSet + - choice: + when: + - simple: ${body} == null + steps: + - log: + message: Cannot remind contacts given unknown data set code '${exchangeProperty.dataSetCode}' + loggingLevel: WARN + otherwise: + steps: + - setProperty: + name: dataSet + simple: ${body} + - setHeader: + name: group + constant: DHIS2 + - to: + uri: kamelet:hie-rapidpro-get-contacts-sink + parameters: + rapidProApiToken: "{{rapidpro.api.token}}" + rapidProApiUrl: "{{rapidpro.api.url}}" + - split: + simple: ${body} + aggregationStrategy: "#class:org.hisp.dhis.integration.rapidpro.aggregationStrategy.MapAggregationStrategy" + steps: + - setBody: + groovy: "[(body.fields.dhis2_organisation_unit_id) : body.uuid]" + - setProperty: + name: orgUnitIdsAndContactIds + simple: ${body} + - to: direct:fetchReportRate + - split: + simple: ${body['rows']} + steps: + - filter: + ognl: "@java.lang.Double@parseDouble(request.body[4]) < 100" + steps: + - to: direct:sendBroadcast + +- route: + from: + uri: direct:fetchDataSet + steps: + - toD: + uri: dhis2://get/resource + parameters: + path: dataSets + filter: code:eq:${body} + fields: id,name,periodType,organisationUnits[id,${exchangeProperty.orgUnitIdScheme.toLowerCase()}] + client: "#dhis2Client" + - setProperty: + name: dataSetCount + jsonpath: $.dataSets.length() + - choice: + when: + - simple: "${exchangeProperty.dataSetCount} > 0" + steps: + - transform: + jsonpath: "$.dataSets[0]" + otherwise: + steps: + - setBody: + simple: "${null}" +- route: + from: + uri: direct:fetchReportRate + steps: + - process: + ref: "#class:org.hisp.dhis.integration.rapidpro.processor.SetReportRateQueryParamProcessor" + - to: + uri: dhis2://get/resource + parameters: + path: analytics + client: "#dhis2Client" + - unmarshal: + json: {} + +- route: + from: + uri: direct:sendBroadcast + steps: + - setHeader: + name: contacts + datasonnet: + outputMediaType: application/x-java-object + resultType: java.util.List + bodyMediaType: application/x-java-object + expression: resource:classpath:broadcastContacts.ds + - setHeader: + name: text + groovy: "java.text.MessageFormat.format(java.util.ResourceBundle.getBundle('reminder').getString('text'), exchangeProperties.dataSet.name)" + - to: "kamelet:hie-rapidpro-send-broadcast-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - log: + message: "Overdue report reminder sent => ${body}" + loggingLevel: INFO \ No newline at end of file diff --git a/src/main/resources/routes/set-up-rapidpro.camel.yaml b/src/main/resources/routes/set-up-rapidpro.camel.yaml new file mode 100644 index 00000000..32d583f4 --- /dev/null +++ b/src/main/resources/routes/set-up-rapidpro.camel.yaml @@ -0,0 +1,84 @@ +- route: + id: setUpRapidpro + from: + uri: direct:prepareRapidPro + steps: + - to: direct:createFieldsRoute + - to: direct:createGroupRoute + +- route: + id: createRapidproFields + from: + uri: direct:createFieldsRoute + steps: + - setHeader: + name: key + constant: dhis2_organisation_unit_id + - to: "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - removeHeaders: + pattern: '*' + - choice: + when: + - groovy: '!body.iterator().hasNext()' + steps: + - log: + message: Creating DHIS2 Organisation Unit ID fields in RapidPro... + loggingLevel: INFO + - setHeader: + name: label + constant: DHIS2 Organisation Unit ID + - setHeader: + name: type + constant: text + - to: "kamelet:hie-rapidpro-create-field-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - removeHeaders: + pattern: '*' + - setHeader: + name: key + constant: dhis2_user_id + - to: + uri: "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - removeHeaders: + pattern: '*' + - choice: + when: + - groovy: '!body.iterator().hasNext()' + steps: + - log: + message: Creating DHIS2 User ID field in RapidPro... + loggingLevel: INFO + - setHeader: + name: label + constant: DHIS2 User ID + - setHeader: + name: type + constant: text + - to: "kamelet:hie-rapidpro-create-field-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - removeHeaders: + pattern: '*' + +- route: + id: createRapidproGroup + from: + uri: direct:createGroupRoute + steps: + - setHeader: + name: name + constant: DHIS2 + - to: "kamelet:hie-rapidpro-get-groups-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - choice: + when: + - groovy: '!body.iterator().hasNext()' + steps: + - log: + message: Creating DHIS2 group in RapidPro... + loggingLevel: INFO + - to: "kamelet:hie-rapidpro-create-group-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + - setProperty: + name: groupUuid + simple: $.uuid + otherwise: + steps: + - setProperty: + name: groupUuid + simple: $.uuid \ No newline at end of file diff --git a/src/main/resources/routes/sync-contacts.camel.yaml b/src/main/resources/routes/sync-contacts.camel.yaml new file mode 100644 index 00000000..e269a3be --- /dev/null +++ b/src/main/resources/routes/sync-contacts.camel.yaml @@ -0,0 +1,125 @@ +- beans: + - name: isContactPoint + type: "#class:org.hisp.dhis.integration.rapidpro.IsContactPoint" + +- route: + precondition: "{{sync.rapidpro.contacts}}" + from: + uri: servlet:tasks/sync?muteException=true + steps: + - removeHeaders: + pattern: '*' + - to: direct:sync + - setHeader: + name: Content-Type + constant: application/json + - setBody: + groovy: "['status': 'success', 'data': 'Synchronised RapidPro contacts with DHIS2 users']" + - marshal: + json: {} + +- route: + precondition: "{{sync.rapidpro.contacts}}" + from: + uri: "quartz://sync?cron={{sync.schedule.expression:0 0/30 * * * ?}}&stateful=true" + steps: + - to: direct:sync + +- route: + id: syncRapidproContacts + precondition: "{{sync.rapidpro.contacts}}" + from: + uri: direct:sync + steps: + - log: + message: Synchronising RapidPro contacts... + loggingLevel: INFO + - to: direct:prepareRapidPro + - setProperty: + name: orgUnitIdScheme + simple: "{{org.unit.id.scheme}}" + - toD: + uri: dhis2://get/collection + parameters: + path: users + arrayName: users + fields: id,firstName,surname,phoneNumber,telegram,whatsApp,twitter,facebookMessenger,organisationUnits[${exchangeProperty.orgUnitIdScheme.toLowerCase()}~rename(id)] + filter: organisationUnits.id:!null + client: "#dhis2Client" + - split: + simple: "${body}" + steps: + - to: direct:createOrUpdateContact + - log: + message: Completed synchronisation of RapidPro contacts with DHIS2 users + loggingLevel: INFO + +- route: + from: + uri: direct:createOrUpdateContact + steps: + - convertBodyTo: + type: org.hisp.dhis.api.model.v40_0.User + - filter: + ref: isContactPoint + - setProperty: + name: dhis2UserId + groovy: body.id.get() + - setHeader: + name: urn + simple: "ext:${exchangeProperty.dhis2UserId}" + - setHeader: + name: group + constant: DHIS2 + - to: + uri: "kamelet:hie-rapidpro-get-contacts-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" + variableReceive: rapidProContact + - removeHeader: + name: urn + - removeHeader: + name: group + - setHeader: + name: groups + groovy: "['DHIS2']" + - setHeader: + name: contactName + groovy: body.firstName.get() + ' ' + body.surname.get() + - setHeader: + name: phoneNumber + groovy: body.phoneNumber.orElse(null) + - setHeader: + name: telegram + groovy: body.telegram.orElse(null) + - setHeader: + name: whatsApp + groovy: body.whatsApp.orElse(null) + - setHeader: + name: facebookMessenger + groovy: body.facebookMessenger.orElse(null) + - setHeader: + name: twitterId + groovy: body.twitter.orElse(null) + - choice: + when: + - groovy: "!variables.rapidProContact.iterator().hasNext()" + steps: + - log: + message: "Creating RapidPro contact for DHIS2 user ${exchangeProperty.dhis2UserId}" + loggingLevel: DEBUG + - setHeader: + name: external + exchangeProperty: dhis2UserId + - setHeader: + name: fields + groovy: + expression: "[dhis2_organisation_unit_id : body.organisationUnits.get()[0].id, dhis2_user_id : exchangeProperties.dhis2UserId]" + - to: "kamelet:hie-rapidpro-create-or-update-contact-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}&httpOkStatusRange=200-499" + otherwise: + steps: + - log: + message: "Updating RapidPro contact for DHIS2 user ${exchangeProperty.dhis2UserId}" + loggingLevel: DEBUG + - setHeader: + name: uuid + groovy: variables.rapidProContact.iterator().next().uuid + - to: "kamelet:hie-rapidpro-create-or-update-contact-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}&httpOkStatusRange=200-499" \ No newline at end of file diff --git a/src/main/resources/routes/webhook.camel.yaml b/src/main/resources/routes/webhook.camel.yaml new file mode 100644 index 00000000..73504f81 --- /dev/null +++ b/src/main/resources/routes/webhook.camel.yaml @@ -0,0 +1,19 @@ +- route: + id: rapidproWebhook + precondition: "{{rapidpro.webhook.enabled}}" + from: + uri: servlet:webhook?httpMethodRestrict=POST&muteException=true + steps: + - removeHeader: + name: CamelHttpUri + - to: jms:queue:dhis2AggregateReports?exchangePattern=InOnly + - log: + message: "Enqueued webhook message [data set code = ${header.dataSetCode},report\ + \ period offset = ${header.reportPeriodOffset},orgUnitId = ${header.orgUnitId},content\ + \ = ${body}]" + loggingLevel: DEBUG + - setHeader: + name: CamelHttpResponseCode + constant: 202 + - setBody: + simple: "${null}" \ No newline at end of file diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/ExtensibilityTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/ExtensibilityTestCase.java index 3948470a..9173f4f5 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/ExtensibilityTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/ExtensibilityTestCase.java @@ -78,7 +78,7 @@ public void configure() while ( true ) { Thread.sleep( 5000 ); - Route deliverReportRoute = camelContext.getRoute( "Transmit Report" ); + Route deliverReportRoute = camelContext.getRoute( "transmitReport" ); if ( deliverReportRoute.getSourceLocationShort() != null && deliverReportRoute.getSourceLocationShort() .equals( "deliverReport.yaml:4" ) ) { diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/WebhookFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/WebhookFunctionalTestCase.java index ca0cb329..de10a30c 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/WebhookFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/WebhookFunctionalTestCase.java @@ -50,7 +50,7 @@ public void testWebhook() System.setProperty( "sync.rapidpro.contacts", "true" ); System.setProperty( "rapidpro.webhook.enabled", "true" ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java index ff9bb3c3..94c82f81 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverReportRouteBuilderFunctionalTestCase.java @@ -81,7 +81,7 @@ public void testDataValueSetIsCreated() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); @@ -136,8 +136,8 @@ public void testRecordInDeadLetterChannelIsCreatedGivenWebMessageErrorWhileCreat Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - AdviceWith.adviceWith( camelContext, "Transmit Report", - r -> r.weaveByToUri( "dhis2://post/resource?path=dataValueSets&inBody=resource&client=#dhis2Client" ) + AdviceWith.adviceWith( camelContext, "transmitReport", + r -> r.weaveByToUri( "dhis2://post/resource?client=#dhis2Client&inBody=resource&path=dataValueSets" ) .replace().to( "mock:dhis2" ) ); MockEndpoint fakeDhis2Endpoint = camelContext.getEndpoint( "mock:dhis2", MockEndpoint.class ); fakeDhis2Endpoint.whenAnyExchangeReceived( @@ -167,8 +167,8 @@ public void testRecordInDeadLetterChannelIsCreatedGivenMissingDataSetCode() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - AdviceWith.adviceWith( camelContext, "Transmit Report", - r -> r.weaveByToUri( "dhis2://post/resource?path=dataValueSets&inBody=resource&client=#dhis2Client" ) + AdviceWith.adviceWith( camelContext, "transmitReport", + r -> r.weaveByToUri( "dhis2://post/resource?client=#dhis2Client&inBody=resource&path=dataValueSets" ) .replace().to( "mock:dhis2" ) ); MockEndpoint fakeDhis2Endpoint = camelContext.getEndpoint( "mock:dhis2", MockEndpoint.class ); fakeDhis2Endpoint.whenAnyExchangeReceived( @@ -203,12 +203,12 @@ public void testScheduledReportDelivery() { System.setProperty( "sync.rapidpro.contacts", "true" ); System.setProperty( "report.delivery.schedule.expression", "0 0/1 * * * ?" ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); - camelContext.getRouteController().stopRoute( "Schedule Report Delivery" ); + camelContext.getRouteController().stopRoute( "scheduleReportDelivery" ); String contactUuid = syncContactsAndFetchFirstContactUuid(); String webhookMessage = StreamUtils.copyToString( @@ -220,7 +220,7 @@ public void testScheduledReportDelivery() spyEndpoint.await( 30, TimeUnit.SECONDS ); assertEquals( 0, spyEndpoint.getReceivedCounter() ); - camelContext.getRouteController().startRoute( "Schedule Report Delivery" ); + camelContext.getRouteController().startRoute( "scheduleReportDelivery" ); spyEndpoint.await(); assertEquals( 1, spyEndpoint.getReceivedCounter() ); @@ -257,7 +257,7 @@ public void testRetryRecordInDeadLetterChannelIsReProcessed() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilderFunctionalTestCase.java index debc2ed9..a6eee449 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilderFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/PullRapidProFlowsRouteBuilderFunctionalTestCase.java @@ -29,23 +29,21 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.restassured.http.ContentType; -import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.builder.AdviceWith; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.model.TransformDefinition; import org.apache.camel.spi.CamelLogger; -import org.apache.camel.spring.boot.SpringBootCamelContext; import org.hisp.dhis.api.model.v40_0.DataValue; import org.hisp.dhis.api.model.v40_0.DataValueSet; import org.hisp.dhis.api.model.v40_0.WebMessage; import org.hisp.dhis.integration.rapidpro.AbstractFunctionalTestCase; import org.hisp.dhis.integration.rapidpro.Environment; +import org.hisp.dhis.integration.rapidpro.expression.FlowUuidsExpr; import org.hisp.dhis.integration.sdk.support.period.PeriodBuilder; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.util.ReflectionTestUtils; import java.io.IOException; import java.time.Instant; @@ -69,7 +67,7 @@ public class PullRapidProFlowsRouteBuilderFunctionalTestCase extends AbstractFun private String programStageEventFlowUuid; @Autowired - private PullRapidProFlowsRouteBuilder pullRapidProFlowsRouteBuilder; + private FlowUuidsExpr flowUuidsExpr; @Override public void doBeforeEach() @@ -93,9 +91,9 @@ public void testPullGivenNoAggregateOrEventFlowUuids() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - ReflectionTestUtils.setField( pullRapidProFlowsRouteBuilder, "aggregateReportFlowUuids", - "" ); - AdviceWith.adviceWith( camelContext, "Scan RapidPro Flows", r -> r.weaveAddLast().to( "mock:spy" ) ); + flowUuidsExpr.setAggregateReportFlowUuids( "" ); + + AdviceWith.adviceWith( camelContext, "scanRapidproFlows", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); @@ -109,9 +107,9 @@ public void testPullAggregateReportFlowGivenNoPriorFlowRun() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - ReflectionTestUtils.setField( pullRapidProFlowsRouteBuilder, "aggregateReportFlowUuids", - aggregateReportFlowUuid ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:spy" ) ); + flowUuidsExpr.setAggregateReportFlowUuids( aggregateReportFlowUuid ); + + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 0 ); camelContext.start(); @@ -127,9 +125,9 @@ public void testPullAggregateReportFlowGivenPriorFlowRun() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - ReflectionTestUtils.setField( pullRapidProFlowsRouteBuilder, "aggregateReportFlowUuids", - aggregateReportFlowUuid ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:spy" ) ); + flowUuidsExpr.setAggregateReportFlowUuids( aggregateReportFlowUuid ); + + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); camelContext.start(); @@ -176,7 +174,7 @@ public void testPullProgramStageEventFlowGivenNoPriorFlowRun() { System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); programStageToFlowMap.add( "program-stage-id", programStageEventFlowUuid ); - AdviceWith.adviceWith( camelContext, "Queue Program Stage Event", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "queueProgramStageEvent", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 0 ); camelContext.start(); @@ -192,7 +190,7 @@ public void testPullProgramStageEventFlowGivenPriorFlowRun() { System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); programStageToFlowMap.add( "program-stage-id", programStageEventFlowUuid ); - AdviceWith.adviceWith( camelContext, "Queue Program Stage Event", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "queueProgramStageEvent", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); String eventId = createTrackedEntityAndFetchEventId( "12345678" ); @@ -211,7 +209,7 @@ public void testPullProgramStageEventFlowWithMissingEventId() { System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); programStageToFlowMap.add( "program-stage-id", programStageEventFlowUuid ); - AdviceWith.adviceWith( camelContext, "Queue Program Stage Event", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "queueProgramStageEvent", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 0 ); createTrackedEntityAndFetchEventId( "12345678" ); @@ -254,14 +252,14 @@ public void testConsecutivePullsWithInterleavingFlowRuns() { System.setProperty( "sync.rapidpro.contacts", "true" ); System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); - ReflectionTestUtils.setField( pullRapidProFlowsRouteBuilder, "aggregateReportFlowUuids", - aggregateReportFlowUuid ); + flowUuidsExpr.setAggregateReportFlowUuids( aggregateReportFlowUuid ); + programStageToFlowMap.add( "program-stage-id", programStageEventFlowUuid ); - AdviceWith.adviceWith( camelContext, "Transmit Report", r -> r.weaveAddLast().to( "mock:reportSpy" ) ); + AdviceWith.adviceWith( camelContext, "transmitReport", r -> r.weaveAddLast().to( "mock:reportSpy" ) ); MockEndpoint reportSpyEndpoint = camelContext.getEndpoint( "mock:reportSpy", MockEndpoint.class ); - AdviceWith.adviceWith( camelContext, "Queue Program Stage Event", r -> r.weaveAddLast().to( "mock:eventSpy" ) ); + AdviceWith.adviceWith( camelContext, "queueProgramStageEvent", r -> r.weaveAddLast().to( "mock:eventSpy" ) ); MockEndpoint eventSpyEndpoint = camelContext.getEndpoint( "mock:eventSpy", MockEndpoint.class ); camelContext.start(); @@ -326,9 +324,9 @@ public void testPullFetchesFlowRunsByModifiedAtInAscendingOrder() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - ReflectionTestUtils.setField( pullRapidProFlowsRouteBuilder, "aggregateReportFlowUuids", - aggregateReportFlowUuid ); - AdviceWith.adviceWith( camelContext, "Queue Aggregate Report", + flowUuidsExpr.setAggregateReportFlowUuids( aggregateReportFlowUuid ); + + AdviceWith.adviceWith( camelContext, "queueAggregateReport", r -> r.weaveByType( TransformDefinition.class ).before().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); @@ -374,9 +372,8 @@ public void testPullGivenOrgUnitIdInFlowRunResult() Exception { System.setProperty( "sync.rapidpro.contacts", "true" ); - ReflectionTestUtils.setField( pullRapidProFlowsRouteBuilder, "aggregateReportFlowUuids", - aggregateReportFlowUuid ); - AdviceWith.adviceWith( camelContext, "Scan RapidPro Flows", + flowUuidsExpr.setAggregateReportFlowUuids( aggregateReportFlowUuid ); + AdviceWith.adviceWith( camelContext, "scanRapidproFlows", r -> r.weaveByToUri( "kamelet:hie-rapidpro-get-flow-runs-sink*" ).replace().to( "mock:rapidPro" ) ); MockEndpoint rapidProMockEndpoint = camelContext.getEndpoint( "mock:rapidPro", MockEndpoint.class ); rapidProMockEndpoint.whenAnyExchangeReceived( exchange -> { @@ -392,8 +389,8 @@ public void testPullGivenOrgUnitIdInFlowRunResult() exchange.getMessage().setBody( List.of( result ) ); } ); - AdviceWith.adviceWith( camelContext, "Transmit Report", - r -> r.weaveByToUri( "dhis2://post/resource?path=dataValueSets&inBody=resource&client=#dhis2Client" ) + AdviceWith.adviceWith( camelContext, "transmitReport", + r -> r.weaveByToUri( "dhis2://post/resource?client=#dhis2Client&inBody=resource&path=dataValueSets" ) .replace().to( "mock:dhis2" ) ); MockEndpoint fakeDhis2Endpoint = camelContext.getEndpoint( "mock:dhis2", MockEndpoint.class ); fakeDhis2Endpoint.setExpectedCount( 1 ); diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java index 26f7207f..69e7b091 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/QueueProgramStageEventsRouteBuilderFunctionalTestCase.java @@ -49,9 +49,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.hisp.dhis.integration.rapidpro.Environment.DHIS2_CLIENT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class QueueProgramStageEventsRouteBuilderFunctionalTestCase extends AbstractFunctionalTestCase { @@ -86,7 +88,7 @@ public void testDueEventsCountWithSingleProgramStage() Exception { Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 10, List.of( "ZP5HZ87wzc0" ) ); - AdviceWith.adviceWith( camelContext, "Fetch Due Events", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchDueEvents", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.expectedMessageCount( 1 ); camelContext.start(); @@ -108,7 +110,7 @@ public void testDueEventsCountWithMultipleProgramStages() List.of( "Ish2wk3eLg3", "ZP5HZ87wzc0" ) ); Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 2, List.of( "Ish2wk3eLg3" ) ); - AdviceWith.adviceWith( camelContext, "Fetch Due Events", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchDueEvents", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.expectedMessageCount( 1 ); camelContext.start(); @@ -125,7 +127,7 @@ public void testDueEventsCountWithZeroProgramStages() throws Exception { - AdviceWith.adviceWith( camelContext, "Fetch Due Events", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchDueEvents", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.expectedMessageCount( 1 ); camelContext.start(); @@ -146,7 +148,7 @@ public void testDueEventsCountUnderLoad() Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 50, List.of( "Ish2wk3eLg3" ) ); Environment.createDhis2TrackedEntitiesWithEnrollment( Environment.ORG_UNIT_ID, 250, List.of( "Ish2wk3eLg3", "ZP5HZ87wzc0" ) ); - AdviceWith.adviceWith( camelContext, "Fetch Due Events", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchDueEvents", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.expectedMessageCount( 1 ); camelContext.start(); @@ -165,7 +167,7 @@ public void testFetchTrackedEntityId() { String enrollmentId = Environment.createDhis2TrackedEntityWithEnrollment( Environment.ORG_UNIT_ID, "1234", "ID-1234", "John", List.of( "ZP5HZ87wzc0" ) ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); @@ -189,7 +191,7 @@ public void testAttributesEndpointWhenProgramAttributes() { String enrollmentId = Environment.createDhis2TrackedEntityWithEnrollment( Environment.ORG_UNIT_ID, "1234", "ID-12345", "John", List.of( "ZP5HZ87wzc0" ) ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); @@ -216,13 +218,13 @@ public void testAttributesEndpointWhenTypeAttributes() .returnAs( Map.class ).get( "trackedEntity" ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> { + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> { r.interceptSendToEndpoint( "dhis2://get/resource?path=tracker/enrollments/" + enrollmentId + "&fields=trackedEntity,attributes[code]&client=#dhis2Client" ) .skipSendToOriginalEndpoint() .setBody( exchange -> "{\"trackedEntity\": \"" + trackedEntityId + "\", \"attributes\": [] }" ); } ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); @@ -247,7 +249,7 @@ public void testFetchAttributesWhenTrackedEntityProgramAttributes() { String enrollmentId = Environment.createDhis2TrackedEntityWithEnrollment( Environment.ORG_UNIT_ID, "12345678", "ID-1234567", "John", List.of( "ZP5HZ87wzc0" ) ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); @@ -275,7 +277,7 @@ public void testFetchAttributesWhenTrackedEntityTypeAttributes() .returnAs( Map.class ).get( "trackedEntity" ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> { + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> { r.interceptSendToEndpoint( "dhis2://get/resource?path=tracker/enrollments/" + enrollmentId + "&fields=trackedEntity,attributes[code]&client=#dhis2Client" ) .skipSendToOriginalEndpoint() @@ -292,7 +294,7 @@ public void testFetchAttributesWhenTrackedEntityTypeAttributes() ) ) .withOrgUnit( Environment.ORG_UNIT_ID ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> { + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> { r.interceptSendToEndpoint( "dhis2://get/resource?path=tracker/trackedEntities/" + trackedEntityId + "&fields=attributes[attribute,code,value]&client=#dhis2Client" ) .skipSendToOriginalEndpoint() @@ -308,7 +310,7 @@ public void testFetchAttributesWhenTrackedEntityTypeAttributes() } } ); } ); - AdviceWith.adviceWith( camelContext, "Fetch Attributes", r -> r.weaveAddLast().to( "mock:spy" ) ); + AdviceWith.adviceWith( camelContext, "fetchAttributes", r -> r.weaveAddLast().to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); camelContext.start(); @@ -362,7 +364,7 @@ public void testEventTransformationReturnsExpectedValues() String programStage = "ZP5HZ87wzc0"; String enrollmentId = Environment.createDhis2TrackedEntityWithEnrollment( Environment.ORG_UNIT_ID, phoneNumber, "ID-1", givenName, List.of( programStage ) ); - AdviceWith.adviceWith( camelContext, "Queue Program Stage Events", + AdviceWith.adviceWith( camelContext, "queueProgramStageEvents", r -> r.interceptSendToEndpoint( "jms:queue:events?exchangePattern=InOnly" ).to( "mock:spy" ).stop() ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 1 ); @@ -380,7 +382,7 @@ public void testEventTransformationReturnsExpectedValues() } @Test - public void testUpdateDhisProgramStageEventStatusWithValidEventId() + public void testUpdateDhis2ProgramStageEventStatusWithValidEventId() throws Exception { @@ -394,8 +396,8 @@ public void testUpdateDhisProgramStageEventStatusWithValidEventId() CountDownLatch expectedLogMessage = new CountDownLatch( 1 ); camelContext.getCamelContextExtension() .addLogListener( ( Exchange exchange, CamelLogger camelLogger, String message ) -> { - if ( camelLogger.getLevel().name().equals( "DEBUG" ) && message.startsWith( - String.format( "Successfully updated DHIS program stage event status for event with ID => %s", + if ( message.startsWith( + String.format( "Successfully updated DHIS2 program stage event status for event with ID => %s", event.get( "event" ) ) ) ) { expectedLogMessage.countDown(); @@ -404,9 +406,8 @@ public void testUpdateDhisProgramStageEventStatusWithValidEventId() } ); camelContext.start(); - producerTemplate.sendBodyAndProperty( "direct:updateDhisProgramStageEventStatus", null, "eventPayload", event ); - Thread.sleep( 1000 ); - assertEquals( 0, expectedLogMessage.getCount() ); + producerTemplate.sendBodyAndProperty( "direct:updateDhis2ProgramStageEventStatus", null, "eventPayload", event ); + assertTrue( expectedLogMessage.await( 5000, TimeUnit.MILLISECONDS ) ); } @Test @@ -418,15 +419,14 @@ public void testQueueEvents() CountDownLatch expectedLogMessage = new CountDownLatch( 10 ); camelContext.getCamelContextExtension() .addLogListener( ( Exchange exchange, CamelLogger camelLogger, String message ) -> { - if ( camelLogger.getLevel().name().equals( "DEBUG" ) && message.startsWith( - "Successfully updated DHIS program stage event status for event with ID =>" ) ) + if ( message.startsWith( "Successfully updated DHIS2 program stage event status for event with ID =>" ) ) { expectedLogMessage.countDown(); } return message; } ); - AdviceWith.adviceWith( camelContext, "Queue Program Stage Events", + AdviceWith.adviceWith( camelContext, "queueProgramStageEvents", r -> r.interceptSendToEndpoint( "jms:queue:events?exchangePattern=InOnly" ).to( "mock:spy" ) ); MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); spyEndpoint.setExpectedCount( 10 ); diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteTestCase.java index e2572a1b..4f6ab09f 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/SetUpRapidProRouteTestCase.java @@ -72,7 +72,7 @@ public void testDhis2OrgUnitIdAndDhis2UserIdFieldsAreCreatedWhenTheyDoNotExistOn throws Exception { - AdviceWith.adviceWith( camelContext, "Create RapidPro Fields", + AdviceWith.adviceWith( camelContext, "createRapidproFields", r -> r.interceptSendToEndpoint( "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) .skipSendToOriginalEndpoint().setBody( @@ -85,7 +85,7 @@ public T evaluate( Exchange exchange, Class type ) } } ) ); - AdviceWith.adviceWith( camelContext, "Create RapidPro Fields", + AdviceWith.adviceWith( camelContext, "createRapidproFields", r -> r.weaveByToUri( "kamelet:hie-rapidpro-create-field-sink*" ) .replace().to( "mock:hie-rapidpro-create-field-sink" ) ); @@ -104,7 +104,7 @@ public void testDhis2OrgUnitIdFieldIsCreatedWhenItDoesNotExistOnRapidPro() throws Exception { - AdviceWith.adviceWith( camelContext, "Create RapidPro Fields", + AdviceWith.adviceWith( camelContext, "createRapidproFields", r -> r.interceptSendToEndpoint( "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) .skipSendToOriginalEndpoint().setBody( @@ -122,7 +122,7 @@ public T evaluate( Exchange exchange, Class type ) } ) ); - AdviceWith.adviceWith( camelContext, "Create RapidPro Fields", + AdviceWith.adviceWith( camelContext, "createRapidproFields", r -> r.weaveByToUri( "kamelet:hie-rapidpro-create-field-sink*" ) .replace().to( "mock:hie-rapidpro-create-field-sink" ) ); @@ -140,7 +140,7 @@ public void testDhis2UserIdFieldIsCreatedWhenItDoesNotExistOnRapidPro() throws Exception { - AdviceWith.adviceWith( camelContext, "Create RapidPro Fields", + AdviceWith.adviceWith( camelContext, "createRapidproFields", r -> r.interceptSendToEndpoint( "kamelet:hie-rapidpro-get-fields-sink?rapidProApiToken={{rapidpro.api.token}}&rapidProApiUrl={{rapidpro.api.url}}" ) .skipSendToOriginalEndpoint().setBody( @@ -158,7 +158,7 @@ public T evaluate( Exchange exchange, Class type ) } ) ); - AdviceWith.adviceWith( camelContext, "Create RapidPro Fields", + AdviceWith.adviceWith( camelContext, "createRapidproFields", r -> r.weaveByToUri( "kamelet:hie-rapidpro-create-field-sink*" ) .replace().to( "mock:hie-rapidpro-create-field-sink" ) ); diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index cc940e0f..6637caec 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -3,4 +3,5 @@ dhis2.api.password = district server.ssl.enabled=false spring.datasource.url=jdbc:h2:./target/dhis2rapidpro;AUTO_SERVER=TRUE;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH;CASE_INSENSITIVE_IDENTIFIERS=TRUE logging.file.name=target/dhis2rapidpro.log +logging.level.updateDhis2ProgramStageEventStatus=DEBUG camel.springboot.routes-reload-directory=target/routes \ No newline at end of file diff --git a/src/test/resources/deliverReport.yaml b/src/test/resources/deliverReport.yaml index dbe9d5ec..78a6173b 100644 --- a/src/test/resources/deliverReport.yaml +++ b/src/test/resources/deliverReport.yaml @@ -1,5 +1,5 @@ - route: - id: "Transmit Report" + id: "transmitReport" from: uri: "direct:transmitReport" steps: