
Commit 6f45623

Merge pull request #7 from CDOT-CV/tim-compatibility
Tim compatibility and CI updates
2 parents: 2ef6d89 + 69113ca

File tree: 14 files changed (+161 −42 lines)


.github/workflows/ci.yml

+28
@@ -0,0 +1,28 @@
+name: CI
+
+on:
+  pull_request:
+    paths:
+      - 'jpo-deduplicator/**'
+
+jobs:
+  deduplicator-test:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout Repository
+        uses: actions/checkout@v4
+
+      - name: Set up JDK
+        uses: actions/setup-java@v4
+        with:
+          java-version: "21"
+          distribution: "temurin"
+          cache: 'maven'
+
+      - name: Run tests
+        env:
+          MAVEN_GITHUB_TOKEN: ${{ secrets.MAVEN_GITHUB_TOKEN }}
+          MAVEN_GITHUB_ORG: ${{ github.repository_owner }}
+        run: |
+          cd $GITHUB_WORKSPACE/jpo-deduplicator/jpo-deduplicator
+          mvn verify -s ./settings.xml

.github/workflows/docker.yml

+30 −1
@@ -23,4 +23,33 @@ jobs:
     secrets: |
       MAVEN_GITHUB_TOKEN: ${{ secrets.MAVEN_GITHUB_TOKEN }}
     cache-from: type=gha
-    cache-to: type=gha,mode=max
+    cache-to: type=gha,mode=max
+
+  jpo-jikkou:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+      - name: Build
+        uses: docker/build-push-action@v3
+        with:
+          context: jikkou
+          file: jikkou/Dockerfile.jikkou
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
+
+  jpo-kafka-connect:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+      - name: Build
+        uses: docker/build-push-action@v3
+        with:
+          context: kafka-connect
+          cache-from: type=gha
+          cache-to: type=gha,mode=max

.github/workflows/dockerhub.yml

+54 −2
@@ -21,7 +21,7 @@ jobs:
           username: ${{ secrets.DOCKERHUB_USERNAME }}
           password: ${{ secrets.DOCKERHUB_TOKEN }}
 
-      - name: Replcae Docker tag
+      - name: Replace Docker tag
         id: set_tag
         run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV
 
@@ -38,4 +38,56 @@ jobs:
     secrets: |
       MAVEN_GITHUB_TOKEN: ${{ secrets.MAVEN_GITHUB_TOKEN }}
     cache-from: type=gha
-    cache-to: type=gha,mode=max
+    cache-to: type=gha,mode=max
+
+  dockerhub-jpo-jikkou:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+      - name: Login to DockerHub
+        uses: docker/login-action@v2
+        with:
+          username: ${{ secrets.DOCKERHUB_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+      - name: Replace Docker tag
+        id: set_tag
+        run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV
+
+      - name: Build
+        uses: docker/build-push-action@v3
+        with:
+          file: jikkou/Dockerfile.jikkou
+          push: true
+          tags: usdotjpoode/jpo-jikkou:${{ env.TAG }}
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
+
+  dockerhub-jpo-kafka-connect:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+      - name: Login to DockerHub
+        uses: docker/login-action@v2
+        with:
+          username: ${{ secrets.DOCKERHUB_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+      - name: Replace Docker tag
+        id: set_tag
+        run: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')" >> $GITHUB_ENV
+
+      - name: Build
+        uses: docker/build-push-action@v3
+        with:
+          context: kafka-connect
+          push: true
+          tags: usdotjpoode/jpo-kafka-connect:${{ env.TAG }}
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
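For readers unfamiliar with the shell in the set_tag steps: ${GITHUB_REF##*/} keeps only the text after the last slash of the Git ref, and the following sed swaps any remaining slashes for dashes (a no-op once the first step has run, kept as a safeguard). A minimal Java sketch of the equivalent transformation, purely for illustration:

public final class DockerTagExample {
    // Mirrors: echo "TAG=$(echo ${GITHUB_REF##*/} | sed 's/\//-/g')"
    static String dockerTag(String githubRef) {
        // ${GITHUB_REF##*/}: drop everything up to and including the last '/'.
        String tail = githubRef.substring(githubRef.lastIndexOf('/') + 1);
        // sed 's/\//-/g': replace any remaining slashes with dashes.
        return tail.replace('/', '-');
    }

    public static void main(String[] args) {
        System.out.println(dockerTag("refs/heads/develop")); // prints "develop"
    }
}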

docker-compose-kafka.yml

+1 −1
@@ -121,7 +121,7 @@ services:
       - kafka_full
       - kafka_ui
     hostname: kafka-ui
-    image: ghcr.io/kafbat/kafka-ui:v1.0.0
+    image: ghcr.io/kafbat/kafka-ui:v1.1.0
     restart: ${RESTART_POLICY}
     deploy:
       resources:

jpo-deduplicator/jpo-deduplicator/pom.xml

+2 −2
@@ -208,9 +208,9 @@
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
-        <configuration>
+        <!-- <configuration>
           <jvmArguments>@{argLine}</jvmArguments>
-        </configuration>
+        </configuration> -->
         <executions>
           <execution>
             <id>build-info</id>

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/DeduplicatorServiceController.java

+7
@@ -47,6 +47,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
         try {
 
             if(props.isEnableProcessedMapDeduplication()){
+                logger.info("Starting Processed Map Deduplicator");
                 ProcessedMapDeduplicatorTopology processedMapDeduplicatorTopology = new ProcessedMapDeduplicatorTopology(
                     props,
                     props.createStreamProperties("ProcessedMapDeduplicator")
@@ -55,6 +56,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
             }
 
             if(props.isEnableProcessedMapWktDeduplication()){
+                logger.info("Starting Processed Map WKT Deduplicator");
                 ProcessedMapWktDeduplicatorTopology processedMapWktDeduplicatorTopology = new ProcessedMapWktDeduplicatorTopology(
                     props,
                     props.createStreamProperties("ProcessedMapWKTdeduplicator")
@@ -63,6 +65,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
             }
 
             if(props.isEnableProcessedMapDeduplication()){
+                logger.info("Starting Map Deduplicator");
                 MapDeduplicatorTopology mapDeduplicatorTopology = new MapDeduplicatorTopology(
                     props,
                     props.createStreamProperties("MapDeduplicator")
@@ -71,6 +74,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
             }
 
             if(props.isEnableOdeTimDeduplication()){
+                logger.info("Starting Tim Deduplicator");
                 TimDeduplicatorTopology timDeduplicatorTopology = new TimDeduplicatorTopology(
                     props,
                     props.createStreamProperties("TimDeduplicator")
@@ -79,6 +83,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
             }
 
             if(props.isEnableOdeRawEncodedTimDeduplication()){
+                logger.info("Starting Raw Encoded TIM Deduplicator");
                 OdeRawEncodedTimDeduplicatorTopology odeRawEncodedTimDeduplicatorTopology = new OdeRawEncodedTimDeduplicatorTopology(
                     props,
                     props.createStreamProperties("OdeRawEncodedTimDeduplicator")
@@ -87,6 +92,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
             }
 
             if(props.isEnableProcessedSpatDeduplication()){
+                logger.info("Starting Processed Spat Deduplicator");
                 ProcessedSpatDeduplicatorTopology processedSpatDeduplicatorTopology = new ProcessedSpatDeduplicatorTopology(
                     props,
                     props.createStreamProperties("ProcessedSpatDeduplicator")
@@ -95,6 +101,7 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
             }
 
             if(props.isEnableOdeBsmDeduplication()){
+                logger.info("Starting BSM Deduplicator");
                 BsmDeduplicatorTopology bsmDeduplicatorTopology = new BsmDeduplicatorTopology(props);
                 bsmDeduplicatorTopology.start();
             }

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/OdeTimJsonProcessor.java

+7 −6
@@ -4,11 +4,11 @@
 import java.time.Instant;
 import java.time.format.DateTimeFormatter;
 
-import com.fasterxml.jackson.databind.JsonNode;
-
 import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
+import us.dot.its.jpo.ode.model.OdeTimData;
+import us.dot.its.jpo.ode.model.OdeTimMetadata;
 
-public class OdeTimJsonProcessor extends DeduplicationProcessor<JsonNode>{
+public class OdeTimJsonProcessor extends DeduplicationProcessor<OdeTimData>{
 
     DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
 
@@ -20,17 +20,18 @@ public OdeTimJsonProcessor(DeduplicatorProperties props){
 
 
     @Override
-    public Instant getMessageTime(JsonNode message) {
+    public Instant getMessageTime(OdeTimData message) {
         try {
-            String time = message.get("metadata").get("odeReceivedAt").asText();
+            // String time = message.get("metadata").get("odeReceivedAt").asText();
+            String time = ((OdeTimMetadata)message.getMetadata()).getOdeReceivedAt();
             return Instant.from(formatter.parse(time));
         } catch (Exception e) {
             return Instant.ofEpochMilli(0);
         }
     }
 
     @Override
-    public boolean isDuplicate(JsonNode lastMessage, JsonNode newMessage) {
+    public boolean isDuplicate(OdeTimData lastMessage, OdeTimData newMessage) {
         Instant oldValueTime = getMessageTime(lastMessage);
         Instant newValueTime = getMessageTime(newMessage);
 
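For orientation: the processors in this commit extend the repository's DeduplicationProcessor base class, which this page does not show. Below is a hypothetical sketch of that contract, inferred from the overrides above and from the supplier's Processor<String, OdeTimData, String, OdeTimData> signature; the real base class also implements the Processor lifecycle and the state-store lookup, which are omitted here.

import java.time.Instant;

import org.apache.kafka.streams.processor.api.Processor;

// Hypothetical sketch (not the repository's code): the two hooks that
// concrete processors such as OdeTimJsonProcessor override.
public abstract class DeduplicationProcessorSketch<T> implements Processor<String, T, String, T> {

    // Event time used to compare messages that share a key.
    public abstract Instant getMessageTime(T message);

    // True when newMessage repeats lastMessage and can be dropped.
    public abstract boolean isDuplicate(T lastMessage, T newMessage);
}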

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/OdeTimJsonProcessorSupplier.java

+3 −3
@@ -2,12 +2,12 @@
 
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import com.fasterxml.jackson.databind.JsonNode;
 
 import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
 import us.dot.its.jpo.deduplicator.deduplicator.processors.OdeTimJsonProcessor;
+import us.dot.its.jpo.ode.model.OdeTimData;
 
-public class OdeTimJsonProcessorSupplier implements ProcessorSupplier<String, JsonNode, String, JsonNode> {
+public class OdeTimJsonProcessorSupplier implements ProcessorSupplier<String, OdeTimData, String, OdeTimData> {
 
     String storeName;
     DeduplicatorProperties props;
@@ -16,7 +16,7 @@ public OdeTimJsonProcessorSupplier(DeduplicatorProperties props){
     }
 
     @Override
-    public Processor<String, JsonNode, String, JsonNode> get() {
+    public Processor<String, OdeTimData, String, OdeTimData> get() {
         return new OdeTimJsonProcessor(props);
     }
 }

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/serialization/JsonSerdes.java

+1 −1
@@ -10,7 +10,7 @@
 import us.dot.its.jpo.ode.model.OdeTimData;
 
 public class JsonSerdes {
-    public static Serde<OdeTimData> Tim() {
+    public static Serde<OdeTimData> OdeTim() {
         return Serdes.serdeFrom(
             new JsonSerializer<OdeTimData>(),
             new JsonDeserializer<>(OdeTimData.class));
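A brief usage sketch of the renamed serde with the Kafka Streams DSL. The topic literal is illustrative only (the deduplicator reads topic names from DeduplicatorProperties); everything else uses classes shown in this commit.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import us.dot.its.jpo.deduplicator.deduplicator.serialization.JsonSerdes;
import us.dot.its.jpo.ode.model.OdeTimData;

public class OdeTimSerdeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Consume typed OdeTimData records with the renamed OdeTim() factory.
        KStream<String, OdeTimData> tims = builder.stream(
                "topic.OdeTimJson", // illustrative topic name
                Consumed.with(Serdes.String(), JsonSerdes.OdeTim()));
        tims.foreach((key, tim) -> System.out.println(key));
    }
}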

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/OdeRawEncodedTimDeduplicatorTopology.java

−1
@@ -83,7 +83,6 @@ public Topology buildTopology() {
 
         KStream<String, JsonNode> timRekeyedStream = inputStream.selectKey((key, value)->{
             try{
-
                 String messageBytes = value.get("payload")
                     .get("data")
                     .get("bytes").asText();

jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedSpatDeduplicatorTopology.java

+1
@@ -66,6 +66,7 @@ public Topology buildTopology() {
             Serdes.String(), JsonSerdes.ProcessedSpat()));
 
         KStream<String, ProcessedSpat> deduplicatedStream = inputStream.process(new ProcessedSpatProcessorSupplier(props), props.getKafkaStateStoreProcessedSpatName());
+
 
         deduplicatedStream.to(props.getKafkaTopicDeduplicatedProcessedSpat(), Produced.with(Serdes.String(), JsonSerdes.ProcessedSpat()));
jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/TimDeduplicatorTopology.java

+21 −19
@@ -7,8 +7,12 @@
 import org.apache.kafka.streams.KafkaStreams.StateListener;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 
-import us.dot.its.jpo.deduplicator.deduplicator.serialization.JsonSerdes;
 import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
+import us.dot.its.jpo.ode.model.OdeTimData;
+import us.dot.its.jpo.ode.model.OdeTimMetadata;
+import us.dot.its.jpo.ode.model.OdeTimPayload;
+import us.dot.its.jpo.ode.plugin.j2735.OdeTravelerInformationMessage;
+import us.dot.its.jpo.ode.plugin.j2735.travelerinformation.TravelerInformation;
 
 import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.state.Stores;
@@ -23,6 +27,7 @@
 
 import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
 import us.dot.its.jpo.deduplicator.deduplicator.processors.suppliers.OdeTimJsonProcessorSupplier;
+import us.dot.its.jpo.deduplicator.deduplicator.serialization.JsonSerdes;
 
 
 public class TimDeduplicatorTopology {
@@ -65,37 +70,34 @@ public JsonNode genJsonNode() {
     public Topology buildTopology() {
         StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<Void, JsonNode> inputStream = builder.stream(props.getKafkaTopicOdeTimJson(),
-            Consumed.with(Serdes.Void(), JsonSerdes.JSON()));
+        KStream<String, OdeTimData> inputStream = builder.stream(props.getKafkaTopicOdeTimJson(),
+            Consumed.with(Serdes.String(), JsonSerdes.OdeTim()));
 
         builder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(props.getKafkaStateStoreOdeTimJsonName()),
-            Serdes.String(), JsonSerdes.JSON()));
+            Serdes.String(), JsonSerdes.OdeTim()));
 
-
-
-        KStream<String, JsonNode> timRekeyedStream = inputStream.selectKey((key, value) -> {
+        KStream<String, OdeTimData> timRekeyedStream = inputStream.selectKey((key, value) -> {
             try {
-
-                JsonNode travellerInformation = value.get("payload")
-                    .get("data")
-                    .get("MessageFrame")
-                    .get("value")
-                    .get("TravelerInformation");
 
-                String rsuIP = value.get("metadata").get("originIp").asText();
-                String packetId = travellerInformation.get("packetID").asText();
-                String msgCnt = travellerInformation.get("msgCnt").asText();
+                TravelerInformation travellerInformation = (TravelerInformation)value.getPayload().getData();
+
+
+                String rsuIP = ((OdeTimMetadata)value.getMetadata()).getOriginIp();
+                // String packetId = ((OdeTimPayload)value.getPayload()).getData();// .get("packetID").asText();
+                String packetId = travellerInformation.getPacketID().toString();
+                int msgCnt = travellerInformation.getMsgCnt().intValue();
 
                 String newKey = rsuIP + "_" + packetId + "_" + msgCnt;
                 return newKey;
             } catch (Exception e) {
+                System.out.println(e);
                 return "";
             }
-        }).repartition(Repartitioned.with(Serdes.String(), JsonSerdes.JSON()));
+        }).repartition(Repartitioned.with(Serdes.String(), JsonSerdes.OdeTim()));
 
-        KStream<String, JsonNode> deduplicatedStream = timRekeyedStream.process(new OdeTimJsonProcessorSupplier(props), props.getKafkaStateStoreOdeTimJsonName());
+        KStream<String, OdeTimData> deduplicatedStream = timRekeyedStream.process(new OdeTimJsonProcessorSupplier(props), props.getKafkaStateStoreOdeTimJsonName());
 
-        deduplicatedStream.to(props.getKafkaTopicDeduplicatedOdeTimJson(), Produced.with(Serdes.String(), JsonSerdes.JSON()));
+        deduplicatedStream.to(props.getKafkaTopicDeduplicatedOdeTimJson(), Produced.with(Serdes.String(), JsonSerdes.OdeTim()));
 
         return builder.build();
 
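To follow the rekeying change outside the topology: the selectKey lambda now derives the deduplication key from typed jpo-ode model objects instead of raw JsonNode lookups. Below is an illustrative sketch of that derivation as a standalone helper; it is not code from the commit, and it reuses only calls that appear in the diff above.

import us.dot.its.jpo.ode.model.OdeTimData;
import us.dot.its.jpo.ode.model.OdeTimMetadata;
import us.dot.its.jpo.ode.plugin.j2735.travelerinformation.TravelerInformation;

public final class TimKeyExample {
    // Build the key <originIp>_<packetID>_<msgCnt>, as the new selectKey does.
    static String deduplicationKey(OdeTimData value) {
        try {
            TravelerInformation ti = (TravelerInformation) value.getPayload().getData();
            String rsuIp = ((OdeTimMetadata) value.getMetadata()).getOriginIp();
            return rsuIp + "_" + ti.getPacketID().toString() + "_" + ti.getMsgCnt().intValue();
        } catch (Exception e) {
            // Unkeyable messages fall back to the empty key, matching the topology.
            return "";
        }
    }
}

Because unparseable messages all map to the empty key, they land in the same partition after the repartition step; the added System.out.println(e) in the topology at least makes such failures visible.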

jpo-deduplicator/jpo-deduplicator/src/main/resources/application.yaml

+1 −1
@@ -50,7 +50,7 @@ odeBsmAlwaysIncludeAtSpeed: 1 # Meter / Second
 # Processed Map Configuration
 kafkaTopicProcessedSpat: topic.ProcessedSpat
 kafkaTopicDeduplicatedProcessedSpat: topic.DeduplicatedProcessedSpat
-enableProcessedSpatDeduplication: true
+enableProcessedSpatDeduplication: true
 
 
 # Amount of time to wait to try and increase batching
