Assuming you have Kafka broker and Zookeeper running locally in your environment with default settings. Following topics need to be created to run this program.
This topic has sample events generated by PolicyEventGenerator
class.
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic policyPaid
This topic will have a generated stream created by PolicyEventProcessor
class.
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic policyAnalytic
Make a note to how the binding to the actual Kafka topic happens through application.properties. The actual code still remains independant of your underlying messaging platform. This can bea strong design pattern over time.
- The property 'useNativeEnCoding' and 'useNativeDecoding' is very important. By setting these values to be 'true' we are letting Kafka stream serializer serialize and deserialize the objects. In this example since, I want Long value to be serialized by using LongSerde. In thise case, Kafka stream serializer should be enabled by setting followng property,
spring.cloud.stream.bindings.policyPaidAnalytic.producer.useNativeEncoding=true
- For the
policyPaid
topic, I need the application/json content-type to be serialized by spring-kafka binder implementation. For this topic the encoding property is set to false. Followig property is commented out,
#spring.cloud.stream.bindings.policyPaid.producer.useNativeEncoding=true