diff --git a/crm-microservice/pom.xml b/crm-microservice/pom.xml new file mode 100644 index 0000000..ca1e9ad --- /dev/null +++ b/crm-microservice/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.3.1.RELEASE + + + com.example + crm-microservice + 0.0.1-SNAPSHOT + crm-microservice + Spring Boot Project for CRM Service + + + 11 + + + + + org.springframework.boot + spring-boot-starter-data-mongodb + + + org.springframework.boot + spring-boot-starter-web + + + io.springfox + springfox-swagger2 + 2.9.2 + + + org.hibernate + hibernate-validator + 6.1.5.Final + + + io.swagger + swagger-annotations + 1.5.24 + + + io.springfox + springfox-swagger-ui + 2.9.2 + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + org.springframework.kafka + spring-kafka + + + org.springframework.kafka + spring-kafka-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/crm-microservice/src/main/java/com/example/crm/config/SwaggerConfig.java b/crm-microservice/src/main/java/com/example/crm/config/SwaggerConfig.java new file mode 100644 index 0000000..1e1fd22 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/config/SwaggerConfig.java @@ -0,0 +1,74 @@ +package com.example.crm.config; + +import java.util.Date; + +import javax.servlet.ServletContext; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +import springfox.documentation.builders.ApiInfoBuilder; +import springfox.documentation.builders.PathSelectors; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.paths.RelativePathProvider; +import springfox.documentation.spring.web.plugins.Docket; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + +/** + * + * @author Binnur Kurt + * + */ +@Configuration +@EnableSwagger2 +public class SwaggerConfig implements WebMvcConfigurer { + @Value("${major.version}") + private String majorVersion; + @Value("${minor.version}") + private String minorVersion; + @Value("${timestamp}") + private long timestamp; + @Value("${server.servlet.context-path}") + private String contextPath; + @Value("${spring.mvc.servlet.path}") + private String servletPath; + + @Value("${server.address}") + private String host; + + @Value("${server.port}") + private long port; + + @Bean + public Docket api(ServletContext servletContext) { + return new Docket(DocumentationType.SWAGGER_2).select().apis(RequestHandlerSelectors.any()) + .paths(PathSelectors.any()).build().host(host.concat(":").concat(Long.toString(port))) + .pathProvider(new RelativePathProvider(servletContext) { + @Override + public String getApplicationBasePath() { + return contextPath; + } + }).apiInfo(apiInfo()); + } + + private ApiInfo apiInfo() { + + return new ApiInfoBuilder().title("Market Service") + .description("Client FrontEnd API

Updated: [" + (new Date(timestamp)).toString() + + " ]" + " ") + .version(majorVersion + "." + minorVersion).build(); + } + + @Override + public void addResourceHandlers(ResourceHandlerRegistry registry) { + registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/"); + + registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/"); + } +} diff --git a/crm-microservice/src/main/java/com/example/crm/controller/CustomerCommandRestController.java b/crm-microservice/src/main/java/com/example/crm/controller/CustomerCommandRestController.java new file mode 100644 index 0000000..d466337 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/controller/CustomerCommandRestController.java @@ -0,0 +1,47 @@ +package com.example.crm.controller; + +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.CrossOrigin; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.PatchMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.annotation.RequestScope; + +import com.example.crm.document.Customer; +import com.example.crm.service.CustomerCommandService; + +@RestController +@RequestScope +@RequestMapping("customers") +@CrossOrigin +public class CustomerCommandRestController { + @Autowired + private CustomerCommandService customerService; + + @PostMapping + public Customer createCustomer(@RequestBody Customer customer) { + return customerService.createCustomer(customer); + } + + @PutMapping("{identity}") + public Customer updateCustomer(@PathVariable String identity, @RequestBody Customer customer) { + return customerService.updateCustomerByIdentity(identity, customer); + } + + @PatchMapping("{identity}") + public Customer patchCustomer(@PathVariable String identity, @RequestBody Map request) { + return customerService.updateCustomerByIdentity(identity, request); + } + + @DeleteMapping("{identity}") + public Customer removeCustomerByIdentity(@PathVariable String identity) { + return customerService.deleteCustomerByIdentity(identity); + } +} diff --git a/crm-microservice/src/main/java/com/example/crm/controller/CustomerQueryRestController.java b/crm-microservice/src/main/java/com/example/crm/controller/CustomerQueryRestController.java new file mode 100644 index 0000000..5217724 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/controller/CustomerQueryRestController.java @@ -0,0 +1,35 @@ +package com.example.crm.controller; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.CrossOrigin; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.annotation.RequestScope; + +import com.example.crm.document.Customer; +import com.example.crm.service.CustomerQueryService; + +@RestController +@RequestScope +@RequestMapping("customers") +@CrossOrigin +public class CustomerQueryRestController { + @Autowired + private CustomerQueryService customerService; + + @GetMapping(params = { "pagesize", "pageno" }) + public List getAllCustomers(@RequestParam int pagesize, @RequestParam int pageno) { + return customerService.findAllCustomers(pagesize, pageno); + } + + @GetMapping("{identity}") + public Customer getCustomerByIdentity(@PathVariable String identity) { + return customerService.findCustomerByIdentity(identity); + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/document/Customer.java b/crm-microservice/src/main/java/com/example/crm/document/Customer.java new file mode 100644 index 0000000..f45d5f7 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/document/Customer.java @@ -0,0 +1,128 @@ +package com.example.crm.document; + +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +@Document(collection = "customers") +public class Customer { + @Id + private String identity; + private String fullname; + private String homeAddress; + private String businessAddress; + private String email; + private String sms; + private int birthDay; + private String photo; + + public Customer() { + } + + public Customer(String identity, String fullname, String homeAddress, String businessAddress, String email, + String sms, int birthDay) { + this.identity = identity; + this.fullname = fullname; + this.homeAddress = homeAddress; + this.businessAddress = businessAddress; + this.email = email; + this.sms = sms; + this.birthDay = birthDay; + } + + public String getIdentity() { + return identity; + } + + public void setIdentity(String identity) { + this.identity = identity; + } + + public String getFullname() { + return fullname; + } + + public void setFullname(String fullname) { + this.fullname = fullname; + } + + public String getHomeAddress() { + return homeAddress; + } + + public void setHomeAddress(String homeAddress) { + this.homeAddress = homeAddress; + } + + public String getBusinessAddress() { + return businessAddress; + } + + public void setBusinessAddress(String businessAddress) { + this.businessAddress = businessAddress; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getSms() { + return sms; + } + + public void setSms(String sms) { + this.sms = sms; + } + + public int getBirthDay() { + return birthDay; + } + + public void setBirthDay(int birthDay) { + this.birthDay = birthDay; + } + + public String getPhoto() { + return photo; + } + + public void setPhoto(String photo) { + this.photo = photo; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((identity == null) ? 0 : identity.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Customer other = (Customer) obj; + if (identity == null) { + if (other.identity != null) + return false; + } else if (!identity.equals(other.identity)) + return false; + return true; + } + + @Override + public String toString() { + return "Customer [identity=" + identity + ", fullname=" + fullname + ", homeAddress=" + homeAddress + + ", businessAddress=" + businessAddress + ", email=" + email + ", sms=" + sms + ", birthDay=" + + birthDay + "]"; + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/events/CustomerBaseEvent.java b/crm-microservice/src/main/java/com/example/crm/events/CustomerBaseEvent.java new file mode 100644 index 0000000..3769e64 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/events/CustomerBaseEvent.java @@ -0,0 +1,64 @@ +package com.example.crm.events; + +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +@Document(collection = "events") +public class CustomerBaseEvent { + @Id + private String eventId; + private long conversationId; + private long sequenceId; + private String sourceId; + private String identity; + + public CustomerBaseEvent() { + } + + public String getEventId() { + return eventId; + } + + public void setEventId(String eventId) { + this.eventId = eventId; + } + + public long getConversationId() { + return conversationId; + } + + public void setConversationId(long conversationId) { + this.conversationId = conversationId; + } + + public long getSequenceId() { + return sequenceId; + } + + public void setSequenceId(long sequenceId) { + this.sequenceId = sequenceId; + } + + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + public String getIdentity() { + return identity; + } + + public void setIdentity(String identity) { + this.identity = identity; + } + + @Override + public String toString() { + return "CustomerBaseEvent [eventId=" + eventId + ", conversationId=" + conversationId + ", sequenceId=" + + sequenceId + ", sourceId=" + sourceId + ", identity=" + identity + "]"; + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/events/CustomerCreatedEvent.java b/crm-microservice/src/main/java/com/example/crm/events/CustomerCreatedEvent.java new file mode 100644 index 0000000..b89edc2 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/events/CustomerCreatedEvent.java @@ -0,0 +1,21 @@ +package com.example.crm.events; + +import com.example.crm.document.Customer; + +public class CustomerCreatedEvent extends CustomerBaseEvent { + private Customer customer; + + public Customer getCustomer() { + return customer; + } + + public void setCustomer(Customer customer) { + this.customer = customer; + } + + @Override + public String toString() { + return "CustomerCreatedEvent [customer=" + customer + "]"; + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/events/CustomerEmailChangedEvent.java b/crm-microservice/src/main/java/com/example/crm/events/CustomerEmailChangedEvent.java new file mode 100644 index 0000000..4a26ef1 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/events/CustomerEmailChangedEvent.java @@ -0,0 +1,19 @@ +package com.example.crm.events; + +public class CustomerEmailChangedEvent extends CustomerBaseEvent { + private String email; + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + @Override + public String toString() { + return "CustomerEmailChangedEvent [email=" + email + "]"; + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/events/CustomerPhotoChangedEvent.java b/crm-microservice/src/main/java/com/example/crm/events/CustomerPhotoChangedEvent.java new file mode 100644 index 0000000..e862d20 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/events/CustomerPhotoChangedEvent.java @@ -0,0 +1,19 @@ +package com.example.crm.events; + +public class CustomerPhotoChangedEvent extends CustomerBaseEvent { + private String photo; + + public String getPhoto() { + return photo; + } + + public void setPhoto(String photo) { + this.photo = photo; + } + + @Override + public String toString() { + return "CustomerPhotoChangedEvent []"; + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/repository/BaseEventRepository.java b/crm-microservice/src/main/java/com/example/crm/repository/BaseEventRepository.java new file mode 100644 index 0000000..cca2a16 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/repository/BaseEventRepository.java @@ -0,0 +1,13 @@ +package com.example.crm.repository; + +import java.util.List; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import com.example.crm.events.CustomerBaseEvent; + +public interface BaseEventRepository extends MongoRepository { + + List findAllByIdentity(String identity); + +} diff --git a/crm-microservice/src/main/java/com/example/crm/repository/CustomerRepository.java b/crm-microservice/src/main/java/com/example/crm/repository/CustomerRepository.java new file mode 100644 index 0000000..3e33e41 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/repository/CustomerRepository.java @@ -0,0 +1,9 @@ +package com.example.crm.repository; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import com.example.crm.document.Customer; + +public interface CustomerRepository extends MongoRepository{ + +} diff --git a/crm-microservice/src/main/java/com/example/crm/service/CustomerCommandService.java b/crm-microservice/src/main/java/com/example/crm/service/CustomerCommandService.java new file mode 100644 index 0000000..97d0ac9 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/service/CustomerCommandService.java @@ -0,0 +1,17 @@ +package com.example.crm.service; + +import java.util.Map; + +import com.example.crm.document.Customer; + +public interface CustomerCommandService { + + Customer createCustomer(Customer customer); + + Customer updateCustomerByIdentity(String identity, Customer customer); + + Customer updateCustomerByIdentity(String identity, Map request); + + Customer deleteCustomerByIdentity(String identity); + +} diff --git a/crm-microservice/src/main/java/com/example/crm/service/CustomerQueryService.java b/crm-microservice/src/main/java/com/example/crm/service/CustomerQueryService.java new file mode 100644 index 0000000..1d3ccea --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/service/CustomerQueryService.java @@ -0,0 +1,13 @@ +package com.example.crm.service; + +import java.util.List; + +import com.example.crm.document.Customer; + +public interface CustomerQueryService { + + List findAllCustomers(int pagesize, int pageno); + + Customer findCustomerByIdentity(String identity); + +} diff --git a/crm-microservice/src/main/java/com/example/crm/service/CustomerService.java b/crm-microservice/src/main/java/com/example/crm/service/CustomerService.java new file mode 100644 index 0000000..935b069 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/service/CustomerService.java @@ -0,0 +1,22 @@ +package com.example.crm.service; + +import java.util.List; +import java.util.Map; + +import com.example.crm.document.Customer; + +public interface CustomerService { + + List findAllCustomers(int pagesize, int pageno); + + Customer findCustomerByIdentity(String identity); + + Customer createCustomer(Customer customer); + + Customer updateCustomerByIdentity(String identity, Customer customer); + + Customer updateCustomerByIdentity(String identity, Map request); + + Customer deleteCustomerByIdentity(String identity); + +} diff --git a/crm-microservice/src/main/java/com/example/crm/service/business/StandardCustomerCommandService.java b/crm-microservice/src/main/java/com/example/crm/service/business/StandardCustomerCommandService.java new file mode 100644 index 0000000..f51c300 --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/service/business/StandardCustomerCommandService.java @@ -0,0 +1,96 @@ +package com.example.crm.service.business; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.UUID; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import com.example.crm.document.Customer; +import com.example.crm.events.CustomerBaseEvent; +import com.example.crm.events.CustomerCreatedEvent; +import com.example.crm.events.CustomerPhotoChangedEvent; +import com.example.crm.repository.BaseEventRepository; +import com.example.crm.repository.CustomerRepository; +import com.example.crm.service.CustomerCommandService; + +@Service +public class StandardCustomerCommandService implements CustomerCommandService { + + @Autowired + private CustomerRepository customerRepository; + + @Autowired + private BaseEventRepository eventRepository; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Override + public Customer createCustomer(Customer customer) { + String identity = customer.getIdentity(); + var customerEvents = eventRepository.findAllByIdentity(identity); + if (customerEvents.size()>0) { + throw new IllegalArgumentException("Customer already exists"); + } + customerRepository.save(customer); + CustomerCreatedEvent customerEvent = new CustomerCreatedEvent(); + customerEvent.setIdentity(identity); + customerEvent.setEventId(UUID.randomUUID().toString()); + customerEvent.setCustomer(customer); + eventRepository.save(customerEvent); + kafkaTemplate.send("customer-events", customerEvent); + return customer; + } + + @Override + public Customer updateCustomerByIdentity(String identity, Customer customer) { + var managedCustomer = customerRepository.findById(identity); + if (managedCustomer.isPresent()) { + return customerRepository.save(customer); + } + throw new IllegalArgumentException("Customer does not exist"); + } + + @Override + public Customer updateCustomerByIdentity(String identity, Map request) { + var managedCustomer = customerRepository.findById(identity); + if (managedCustomer.isPresent()) { + var customer = managedCustomer.get(); + request.forEach((field, value) -> { + Field declaredField; + try { + declaredField = Customer.class.getDeclaredField(field); + if (field.equals("photo")) { + CustomerPhotoChangedEvent event = new CustomerPhotoChangedEvent(); + event.setIdentity(identity); + event.setEventId(UUID.randomUUID().toString()); + event.setPhoto(value.toString()); + eventRepository.save(event); + } + declaredField.setAccessible(true); + declaredField.set(customer, value); + declaredField.setAccessible(false); + } catch (Exception e) { + System.err.println(e.getMessage()); + } + }); + return customerRepository.save(customer); + } + throw new IllegalArgumentException("Customer does not exist"); + } + + @Override + public Customer deleteCustomerByIdentity(String identity) { + var customer = customerRepository.findById(identity); + if (customer.isPresent()) { + Customer removedCustomer = customer.get(); + customerRepository.delete(removedCustomer); + return removedCustomer; + } + throw new IllegalArgumentException("Customer does not exist"); + } + +} diff --git a/crm-microservice/src/main/java/com/example/crm/service/business/StandardCustomerQueryService.java b/crm-microservice/src/main/java/com/example/crm/service/business/StandardCustomerQueryService.java new file mode 100644 index 0000000..30719ef --- /dev/null +++ b/crm-microservice/src/main/java/com/example/crm/service/business/StandardCustomerQueryService.java @@ -0,0 +1,37 @@ +package com.example.crm.service.business; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.example.crm.document.Customer; +import com.example.crm.events.CustomerCreatedEvent; +import com.example.crm.repository.BaseEventRepository; +import com.example.crm.repository.CustomerRepository; +import com.example.crm.service.CustomerQueryService; + +@Service +public class StandardCustomerQueryService implements CustomerQueryService { + + @Autowired + private CustomerRepository customerRepository; + @Autowired + private BaseEventRepository eventRepository; + + @Override + public List findAllCustomers(int pagesize, int pageno) { + return customerRepository.findAll(); + } + + @Override + public Customer findCustomerByIdentity(String identity) { + var events = eventRepository.findAllByIdentity(identity); + if (events.isEmpty()) + throw new IllegalArgumentException("Cannot find customer"); + Customer customer = CustomerCreatedEvent.class.cast(events.get(0)).getCustomer(); + // TODO: use the remainings events to reconstruct final customer state + return customer; + } + +} diff --git a/crm-microservice/src/main/resources/application.properties b/crm-microservice/src/main/resources/application.properties new file mode 100644 index 0000000..bf9d7da --- /dev/null +++ b/crm-microservice/src/main/resources/application.properties @@ -0,0 +1,23 @@ +# REST API URL BASE +# http(s)://localhost:8101/crm/api/v1 +server.address=localhost +server.port=8100 +server.servlet.context-path=/crm +spring.mvc.servlet.path=/api/v1 + +major.version=1 +minor.version=0 +timestamp=1581420934 +#http://localhost:8100/crm/api/v1/swagger-ui.html + +#mongodb configuration +spring.data.mongodb.uri=mongodb://localhost:27017/crm + +# kafka configuration +spring.kafka.consumer.group-id=hr +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.consumer.properties.spring.json.trusted.packages=* \ No newline at end of file diff --git a/hr-domain/src/com/example/hr/application/business/SimpleEmployeeApplication.java b/hr-domain/src/com/example/hr/application/business/SimpleEmployeeApplication.java index e04c687..12916fb 100644 --- a/hr-domain/src/com/example/hr/application/business/SimpleEmployeeApplication.java +++ b/hr-domain/src/com/example/hr/application/business/SimpleEmployeeApplication.java @@ -17,8 +17,8 @@ * */ public class SimpleEmployeeApplication implements EmployeeApplication { - private EmployeeRepository employeeRepository; - private EventPushlisher eventPushlisher; + private EmployeeRepository employeeRepository; // SPI #1 + private EventPushlisher eventPushlisher; // SPI #1 public void setEmployeeRepository(EmployeeRepository employeeRepository) { this.employeeRepository = employeeRepository; diff --git a/identity-card-microservice/src/main/java/com/example/icard/service/SimpleKafkaConsumer.java b/identity-card-microservice/src/main/java/com/example/icard/service/SimpleKafkaConsumer.java new file mode 100644 index 0000000..b4b60a9 --- /dev/null +++ b/identity-card-microservice/src/main/java/com/example/icard/service/SimpleKafkaConsumer.java @@ -0,0 +1,37 @@ +package com.example.icard.service; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import javax.annotation.PostConstruct; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.stereotype.Service; + +import com.example.hr.events.BusinessEvent; + +@Service +public class SimpleKafkaConsumer { + private KafkaConsumer kafkaConsumer; + + //TODO: @PostConstruct + public void init() { + Properties configs = new Properties(); + // TODO: set kafka properties + this.kafkaConsumer = new KafkaConsumer<>(configs); + this.kafkaConsumer.subscribe(List.of("employees")); + new Thread(this::pollMessages).start(); + } + + public void pollMessages() { + while(true) { + this.kafkaConsumer.poll(Duration.ofMillis(100)).forEach( event -> { + CompletableFuture.runAsync( () -> { + System.out.println(event); + }); + }); + } + } +}