Skip to content

Commit

Permalink
Merge pull request #3 from phantasmicmeans/feature-2
Browse files Browse the repository at this point in the history
[#2] Support dynamic retry topics and retry consumers
  • Loading branch information
phantasmicmeans authored Feb 3, 2021
2 parents c4c4146 + cfc639d commit 888c5cd
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 37 deletions.
80 changes: 46 additions & 34 deletions src/main/java/com/boot/kafa/consumer/dlq/CustomMessageListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,24 @@
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.backoff.FixedBackOff;

import com.boot.kafa.consumer.dlq.model.CustomMessage;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -38,24 +31,15 @@
*/
@Slf4j
@Service
@RequiredArgsConstructor
@Profile(value = "original")
public class CustomMessageListener {

@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;

public ProducerFactory<String, Object> dlqProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(dlqProducerFactory());
}
@Value(value = "${dynamic-kafka.dlq}")
private String dlqTopic;

public Map<String, Object> jsonDeserializeConsumerConfigs(String groupId) {
Map<String, Object> props = new HashMap<>();
Expand All @@ -68,29 +52,61 @@ public Map<String, Object> jsonDeserializeConsumerConfigs(String groupId) {

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CustomMessage> customMessageKafkaListenerContainerFactory(
KafkaOperations<String, Object> operations,
@Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
KafkaOperations<String, Object> operations) {
ConcurrentKafkaListenerContainerFactory<String, CustomMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(jsonDeserializeConsumerConfigs("custom-message-processor"),
new StringDeserializer(),
new JsonDeserializer<>(CustomMessage.class)
));

factory.setRetryTemplate(retryTemplate);
// dlq
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(operations, (cr, e) -> new TopicPartition("history-5m-retry", cr.partition())),
new FixedBackOff(1000L, 1)));
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CustomMessage> retry5mKafkaListenerContainerFactory(
        KafkaOperations<String, Object> operations) {
    // Consumer side: JSON-deserialize record values into CustomMessage.
    // NOTE(review): the group id is passed as "" here — presumably overridden by the
    // @KafkaListener groupId/id attribute at registration time; verify against the listener config.
    DefaultKafkaConsumerFactory<String, CustomMessage> consumerFactory =
            new DefaultKafkaConsumerFactory<>(
                    jsonDeserializeConsumerConfigs(""),
                    new StringDeserializer(),
                    new JsonDeserializer<>(CustomMessage.class));

    // On listener failure: retry once after 1s (FixedBackOff), then publish the failed
    // record to the next retry stage topic "history-10m-retry" on the same partition.
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(operations,
                    (record, ex) -> new TopicPartition("history-10m-retry", record.partition())),
            new FixedBackOff(1000L, 1));

    ConcurrentKafkaListenerContainerFactory<String, CustomMessage> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory);
    listenerFactory.setErrorHandler(errorHandler);
    return listenerFactory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CustomMessage> retry10mKafkaListenerContainerFactory(
        KafkaOperations<String, Object> operations) {
    // Consumer side: JSON-deserialize record values into CustomMessage.
    // NOTE(review): the group id is passed as "" here — presumably overridden by the
    // @KafkaListener groupId/id attribute at registration time; verify against the listener config.
    DefaultKafkaConsumerFactory<String, CustomMessage> consumerFactory =
            new DefaultKafkaConsumerFactory<>(
                    jsonDeserializeConsumerConfigs(""),
                    new StringDeserializer(),
                    new JsonDeserializer<>(CustomMessage.class));

    // On listener failure: retry once after 1s (FixedBackOff), then publish the failed
    // record to the terminal topic "custom-message-dlq" on the same partition.
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(operations,
                    (record, ex) -> new TopicPartition("custom-message-dlq", record.partition())),
            new FixedBackOff(1000L, 1));

    ConcurrentKafkaListenerContainerFactory<String, CustomMessage> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory);
    listenerFactory.setErrorHandler(errorHandler);
    return listenerFactory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CustomMessage> dltContainerFactory(KafkaTemplate<String, Object> template) {
public ConcurrentKafkaListenerContainerFactory<String, CustomMessage> retry20mKafkaListenerContainerFactory(
KafkaOperations<String, Object> operations) {
ConcurrentKafkaListenerContainerFactory<String, CustomMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(jsonDeserializeConsumerConfigs("content-message-processor-dlt"),
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(jsonDeserializeConsumerConfigs(""),
new StringDeserializer(),
new JsonDeserializer<>(CustomMessage.class)
));

factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(operations, (cr, e) -> new TopicPartition(dlqTopic, cr.partition())),
new FixedBackOff(1000L, 1)));
return factory;
}

Expand All @@ -100,7 +116,7 @@ public ConcurrentKafkaListenerContainerFactory<String, CustomMessage> dltContain
containerFactory = "customMessageKafkaListenerContainerFactory"
)
public void listen(CustomMessage value) {
log.info("listen() " + value.getData());
log.info("\nlisten() " + value.getData() + "\n");

/**
* do something
Expand All @@ -109,12 +125,8 @@ public void listen(CustomMessage value) {
throw new RuntimeException(); // exception -> to dead-letter-topic
}

@KafkaListener(
id = "custom-message-processor-dlt",
topics = "custom-message-dlq",
containerFactory = "dltContainerFactory"
)
public void dltListen(CustomMessage message) {
log.info("dltListen() " + message.getData());
@KafkaListener
public void listener5m(CustomMessage message) {
log.info("\nlistener5m() " + message.getData() + "\n");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

import com.boot.kafa.consumer.dlq.dynamic.config.KafkaDynamicEndpointBindingConfigurationProperties;

@EnableConfigurationProperties(KafkaDynamicEndpointBindingConfigurationProperties.class)
@SpringBootApplication
public class SpringBootKafkaConsumerDlqApplication {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package com.boot.kafa.consumer.dlq.dynamic;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.stereotype.Component;

import com.boot.kafa.consumer.dlq.dynamic.config.KafkaDynamicEndpointBindingConfigurationProperties;
import com.boot.kafa.consumer.dlq.dynamic.config.KafkaDynamicRetryProperties;
import com.boot.kafa.consumer.dlq.utils.JsonUtils;

import lombok.Getter;

@Component
@ConditionalOnProperty(name = "dynamic-kafka.enable", havingValue = "true")
public class KafkaDynamicEndpointProcessor extends KafkaListenerAnnotationBeanPostProcessor<String, Object> implements InitializingBean {

    private final ApplicationContext context;
    private final KafkaDynamicEndpointBindingConfigurationProperties bindingProperties;

    public KafkaDynamicEndpointProcessor(ApplicationContext context,
            KafkaDynamicEndpointBindingConfigurationProperties bindingProperties) {
        this.context = context;
        this.bindingProperties = bindingProperties;
    }

    /**
     * Resolves the default {@code @KafkaListener} method declared under
     * {@code dynamic-kafka.default} and kicks off dynamic endpoint registration.
     */
    @Override
    public void afterPropertiesSet() {
        Class<?> defaultClazzName = bindingProperties.getDefaultProperties().getClassPath();
        String methodName = bindingProperties.getDefaultProperties().getMethodName();

        Tuple<Method, KafkaListener> listenerTuple = findListenerInfo(defaultClazzName, methodName);
        run(listenerTuple);
    }

    /**
     * Registers the main listener endpoint, then — for each topic configured under
     * {@code dynamic-kafka.retry} — rewrites the proxied {@code @KafkaListener}
     * annotation's attribute map in place (topics/id/groupId/containerFactory) and
     * registers an additional endpoint for that retry topic.
     *
     * @param listenerTuple the default listener method and its merged annotation
     * @throws IllegalStateException    if the annotation proxy cannot be introspected
     * @throws NoSuchElementException   if the annotation has no 'topics' attribute
     */
    public void run(Tuple<Method, KafkaListener> listenerTuple) {
        Object handler = Proxy.getInvocationHandler(listenerTuple.getV());
        Field field;

        try {
            // NOTE(review): reaches into the JDK's internal annotation invocation handler
            // ("memberValues" map). On JDK 16+ strong encapsulation this likely needs
            // --add-opens java.base/sun.reflect.annotation=ALL-UNNAMED — confirm target JDK.
            field = handler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }

        Class<?> defaultClazzType = bindingProperties.getDefaultProperties().getClassPath();
        assert defaultClazzType != null;
        Object bean = context.getBean(defaultClazzType);

        registerMainListener(listenerTuple, bean);
        Map<String, Object> properties = bindingProperties.getRetry();

        for (String topic : properties.keySet()) {
            KafkaDynamicRetryProperties retryProperties = JsonUtils.convert(properties.get(topic), KafkaDynamicRetryProperties.class);
            Method method = listenerTuple.getK();
            KafkaListener kafkaListener = listenerTuple.getV();

            String methodName = retryProperties.getMethodName();
            Class<?> clazzType = retryProperties.getClassPath();

            // A retry entry may name its own listener method (optionally on its own class);
            // otherwise the default listener method/annotation is reused and mutated.
            if (methodName != null && !methodName.isEmpty()) {
                clazzType = (clazzType == null) ? defaultClazzType : clazzType;
                listenerTuple = findListenerInfo(clazzType, methodName);

                method = listenerTuple.getK();
                kafkaListener = listenerTuple.getV();
                handler = Proxy.getInvocationHandler(kafkaListener);
            }

            Map<String, Object> attributes;
            try {
                // The annotation handler's memberValues is always Map<String, Object>.
                @SuppressWarnings("unchecked")
                Map<String, Object> memberValues = (Map<String, Object>) field.get(handler);
                attributes = memberValues;
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }

            if (!attributes.containsKey("topics")) {
                throw new NoSuchElementException("KafkaListener not contains 'topics' attributes");
            }

            String[] newTopics = topics(topic);

            // Mutate the shared annotation instance before re-registering it as a new endpoint.
            attributes.put("topics", newTopics);
            attributes.put("id", retryProperties.getId());
            attributes.put("groupId", retryProperties.getGroupId());
            attributes.put("containerFactory", retryProperties.getContainerFactory());

            super.processKafkaListener(kafkaListener, method, bean, retryProperties.getId());
        }
    }

    /**
     * Finds the {@code @KafkaListener}-annotated method with the given name on the class.
     *
     * @throws NoSuchElementException if no annotated method with that name exists
     */
    private Tuple<Method, KafkaListener> findListenerInfo(Class<?> clazz, String methodName) {
        Map<Method, KafkaListener> annotatedMethod = MethodIntrospector.selectMethods(clazz,
                (MethodIntrospector.MetadataLookup<KafkaListener>) method -> AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class));

        Set<Method> keys = annotatedMethod.keySet();

        Method listenerMethod = keys.stream()
                .filter(method -> method.getName().equals(methodName))
                .findAny()
                // Fix: original used String.format with a MessageFormat-style "{0}" placeholder,
                // so the message always printed the literal "{0}" instead of the method name.
                .orElseThrow(() -> new NoSuchElementException(String.format("No Method named %s.", methodName)));

        return Tuple.of(listenerMethod, annotatedMethod.get(listenerMethod));
    }

    /** Registers the default listener endpoint under its own annotation-declared id. */
    public void registerMainListener(Tuple<Method, KafkaListener> tuple, Object bean) {
        Map<String, Object> parentAttributes = AnnotatedElementUtils.findMergedAnnotationAttributes(tuple.getK(), KafkaListener.class, true, true);
        assert parentAttributes != null;
        super.processKafkaListener(tuple.getV(), tuple.getK(), bean, (String) parentAttributes.get("id"));
    }

    /** Wraps the varargs into the String[] shape the annotation attribute map expects. */
    private String[] topics(String... topic) {
        return topic;
    }

    /** Immutable pair of a listener Method (k) and its KafkaListener annotation (v). */
    @Getter
    static class Tuple<K, V> {
        private final K k;
        private final V v;

        private Tuple(K k, V v) {
            this.k = k;
            this.v = v;
        }

        static <K, V> Tuple<K, V> of(K k, V v) {
            return new Tuple<>(k, v);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.boot.kafa.consumer.dlq.dynamic.config;

import java.util.Map;

import org.springframework.boot.context.properties.ConfigurationProperties;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Getter;
import lombok.Setter;

/**
 * Binds the {@code dynamic-kafka.*} configuration tree:
 * {@code default} (class + method of the main listener), {@code retry}
 * (per-topic retry settings as a raw map, converted later to
 * KafkaDynamicRetryProperties), and {@code dlq} (terminal dead-letter topic).
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "dynamic-kafka")
public class KafkaDynamicEndpointBindingConfigurationProperties {
    // "default" is a Java keyword, so the field is named defaultProperties;
    // the explicit getDefault/setDefault pair below lets Spring bind the
    // "dynamic-kafka.default" key to it.
    @JsonProperty("default")
    private Default defaultProperties;
    // keyed by retry-topic name; values converted via JsonUtils on use
    private Map<String, Object> retry;
    // terminal dead-letter topic name
    private String dlq;

    /** Location of the default @KafkaListener method: declaring class + method name. */
    @Getter
    @Setter
    public static class Default {
        private Class<?> classPath;
        private String methodName;
    }

    // Hidden from Jackson so serialization exposes only the "default" key.
    @JsonIgnore
    public Default getDefaultProperties() {
        return defaultProperties;
    }

    // Accessor pair used by Spring's relaxed binding for "dynamic-kafka.default".
    public Default getDefault() {
        return this.defaultProperties;
    }

    public void setDefault(Default aDefault) {
        this.defaultProperties = aDefault;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.boot.kafa.consumer.dlq.dynamic.config;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Per-retry-topic settings converted from the raw {@code dynamic-kafka.retry.*} map:
 * listener id, container factory, consumer group, retry interval, and an optional
 * override of the listener method (class + method name).
 *
 * Lombok annotations expanded into explicit constructors and accessors;
 * the generated API is identical.
 */
public class KafkaDynamicRetryProperties {
    private String interval;
    private String id;
    private String containerFactory;
    private String groupId;
    private Class<?> classPath;
    private String methodName;

    public KafkaDynamicRetryProperties() {
    }

    public KafkaDynamicRetryProperties(String interval, String id, String containerFactory,
            String groupId, Class<?> classPath, String methodName) {
        this.interval = interval;
        this.id = id;
        this.containerFactory = containerFactory;
        this.groupId = groupId;
        this.classPath = classPath;
        this.methodName = methodName;
    }

    public String getInterval() {
        return interval;
    }

    public void setInterval(String interval) {
        this.interval = interval;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContainerFactory() {
        return containerFactory;
    }

    public void setContainerFactory(String containerFactory) {
        this.containerFactory = containerFactory;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public Class<?> getClassPath() {
        return classPath;
    }

    public void setClassPath(Class<?> classPath) {
        this.classPath = classPath;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
}
Loading

0 comments on commit 888c5cd

Please sign in to comment.