Skip to content

Commit 3f21b0b

Browse files
authored
Merge pull request #3 from CDOT-CV/deduplicator--ode-4.0-compatibility
Updated Deduplicator for Compatibility with ode 4.0
2 parents 6e701ef + 3c7e1de commit 3f21b0b

File tree

5 files changed

+22
-19
lines changed

5 files changed

+22
-19
lines changed

README.md

+1-3
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,7 @@ The JPO-Deduplicator is a Kafka Java spring-boot application designed to reduce
205205

206206
### Deduplication Config
207207

208-
When running the jpo-deduplication as a submodule in jpo-utils, the deduplicator will automatically turn on deduplication for a topic when that topic is created. For example if the KAFKA_TOPIC_CREATE_GEOJSONCONVERTER environment variable is set to true, the deduplicator will start performing deduplication for ProcessedMap, ProcessedMapWKT, and ProcessedSpat data.
209-
210-
To manually configure deduplication for a topic, the following environment variables can also be used.
208+
When running the jpo-deduplication as a submodule in jpo-utils, the deduplicator will automatically configure an algorithm as enabled or disabled depending on if the corresponding subcomponent is also active. For example if the KAFKA_TOPIC_CREATE_GEOJSONCONVERTER environment variable is set to true, the deduplicator will start performing deduplication for ProcessedMap, ProcessedMapWKT, and ProcessedSpat data. If the KAFKA_TOPIC_CREATE_GEOJSONCONVERTER is set to false, the deduplicator will disable deduplication for those same topics. To manually configure deduplication for a topic, the following environment variables can also be used. If no value is passed for a given environment variable, the corresponding deduplication algorithm will default to enabled.
211209

212210
| Environment Variable | Description |
213211
|---|---|

jpo-deduplicator/jpo-deduplicator/pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -101,23 +101,23 @@
101101
<dependency>
102102
<groupId>usdot.jpo.ode</groupId>
103103
<artifactId>jpo-ode-core</artifactId>
104-
<version>3.0.0</version>
104+
<version>4.0.0</version>
105105
</dependency>
106106
<dependency>
107107
<groupId>usdot.jpo.ode</groupId>
108108
<artifactId>jpo-ode-plugins</artifactId>
109-
<version>3.0.0</version>
109+
<version>4.0.0</version>
110110
</dependency>
111111
<dependency>
112112
<groupId>usdot.jpo.ode</groupId>
113113
<artifactId>jpo-geojsonconverter</artifactId>
114-
<version>1.4.2</version>
114+
<version>2.0.0</version>
115115
</dependency>
116116
<dependency>
117117
<groupId>usdot.jpo.ode</groupId>
118118
<artifactId>jpo-conflictmonitor</artifactId>
119119
<version>1.5.0</version>
120-
</dependency>
120+
</dependency>
121121
<dependency>
122122
<groupId>junit</groupId>
123123
<artifactId>junit</artifactId>

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedSpatDeduplicatorTopology.java

-2
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ public Topology buildTopology() {
6666
Serdes.String(), JsonSerdes.ProcessedSpat()));
6767

6868
KStream<String, ProcessedSpat> deduplicatedStream = inputStream.process(new ProcessedSpatProcessorSupplier(props), props.getKafkaStateStoreProcessedSpatName());
69-
70-
deduplicatedStream.print(Printed.toSysOut());
7169

7270
deduplicatedStream.to(props.getKafkaTopicDeduplicatedProcessedSpat(), Produced.with(Serdes.String(), JsonSerdes.ProcessedSpat()));
7371

jpo-deduplicator/jpo-deduplicator/src/main/resources/application.yaml

+6-6
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,27 @@ log4j.logger.org.apache.kafka: OFF
1717
# Processed Map Configuration
1818
kafkaTopicProcessedMap: topic.ProcessedMap
1919
kafkaTopicDeduplicatedProcessedMap: topic.DeduplicatedProcessedMap
20-
enableProcessedMapDeduplication: false
20+
enableProcessedMapDeduplication: true
2121

2222
# Processed Map WKT Configuration
2323
kafkaTopicProcessedMapWKT: topic.ProcessedMapWKT
2424
kafkaTopicDeduplicatedProcessedMapWKT: topic.DeduplicatedProcessedMapWKT
25-
enableProcessedMapWktDeduplication: false
25+
enableProcessedMapWktDeduplication: true
2626

2727
# Ode Map Json Configuration
2828
kafkaTopicOdeMapJson: topic.OdeMapJson
2929
kafkaTopicDeduplicatedOdeMapJson: topic.DeduplicatedOdeMapJson
30-
enableOdeMapDeduplication: false
30+
enableOdeMapDeduplication: true
3131

3232
# Ode Tim Json Configuration
3333
kafkaTopicOdeTimJson: topic.OdeTimJson
3434
kafkaTopicDeduplicatedOdeTimJson: topic.DeduplicatedOdeTimJson
35-
enableOdeTimDeduplication: false
35+
enableOdeTimDeduplication: true
3636

3737
# Ode Raw Encoded Tim Json Configuration
3838
kafkaTopicOdeRawEncodedTimJson: topic.OdeRawEncodedTIMJson
3939
kafkaTopicDeduplicatedOdeRawEncodedTimJson: topic.DeduplicatedOdeRawEncodedTIMJson
40-
enableOdeRawEncodedTimDeduplication: false
40+
enableOdeRawEncodedTimDeduplication: true
4141

4242
# Ode Bsm Json Configuration
4343
kafkaTopicOdeBsmJson: topic.OdeBsmJson
@@ -50,7 +50,7 @@ odeBsmAlwaysIncludeAtSpeed: 1 # Meter / Second
5050
# Processed Map Configuration
5151
kafkaTopicProcessedSpat: topic.ProcessedSpat
5252
kafkaTopicDeduplicatedProcessedSpat: topic.DeduplicatedProcessedSpat
53-
enableProcessedSpatDeduplication: false
53+
enableProcessedSpatDeduplication: true
5454

5555

5656
# Amount of time to wait to try and increase batching

jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/MapDeduplicatorTopologyTest.java

+11-4
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)