Skip to content

Commit

Permalink
add consumer properties
Browse files Browse the repository at this point in the history
  • Loading branch information
deepcloudlabs committed Jun 29, 2020
1 parent 92b8694 commit 8fec39b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -35,11 +34,6 @@ public Customer createCustomer(@RequestBody Customer customer) {
return customerService.createCustomer(customer);
}

@PutMapping("{identity}")
public Customer updateCustomer(@PathVariable String identity, @RequestBody Customer customer) {
return customerService.updateCustomerByIdentity(identity, customer);
}

@PatchMapping("{identity}")
public Customer patchCustomer(@PathVariable String identity, @RequestBody Map<String, Object> request) {
return customerService.updateCustomerByIdentity(identity, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ public interface CustomerCommandService {

Customer createCustomer(Customer customer);

Customer updateCustomerByIdentity(String identity, Customer customer);

Customer updateCustomerByIdentity(String identity, Map<String, Object> request);

Customer deleteCustomerByIdentity(String identity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ public Customer createCustomer(Customer customer) {
return customer;
}

@Override
public Customer updateCustomerByIdentity(String identity, Customer customer) {
var managedCustomer = customerRepository.findById(identity);
if (managedCustomer.isPresent()) {
return customerRepository.save(customer);
}
throw new IllegalArgumentException("Customer does not exist");
}

@Override
public Customer updateCustomerByIdentity(String identity, Map<String, Object> request) {
var managedCustomer = customerRepository.findById(identity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,38 @@
import javax.annotation.PostConstruct;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.stereotype.Service;

import com.example.hr.events.BusinessEvent;

@Service
public class SimpleKafkaConsumer {
private KafkaConsumer<String, BusinessEvent> kafkaConsumer;
//TODO: @PostConstruct

@PostConstruct
public void init() {
Properties configs = new Properties();
// TODO: set kafka properties
this.kafkaConsumer = new KafkaConsumer<>(configs);
var consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "localhost:9092");
consumerProperties.put("group.id", "hr");
consumerProperties.put("zookeeper.session.timeout.ms", "6000");
consumerProperties.put("zookeeper.sync.time.ms", "2000");
consumerProperties.put("auto.commit.enable", "false");
consumerProperties.put("auto.commit.interval.ms", "1000");
consumerProperties.put("consumer.timeout.ms", "-1");
consumerProperties.put("max.poll.records", "1");
consumerProperties.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
this.kafkaConsumer = new KafkaConsumer<>(consumerProperties);
this.kafkaConsumer.subscribe(List.of("employees"));
new Thread(this::pollMessages).start();
}

public void pollMessages() {
while(true) {
this.kafkaConsumer.poll(Duration.ofMillis(100)).forEach( event -> {
CompletableFuture.runAsync( () -> {
while (true) {
this.kafkaConsumer.poll(Duration.ofMillis(100)).forEach(event -> {
CompletableFuture.runAsync(() -> {
System.out.println(event);
});
});
Expand Down

0 comments on commit 8fec39b

Please sign in to comment.