springboot使用@KafkaListener监听多个kafka配置

avatar
作者
猴君
阅读量:0

        背景: 使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用

        方案: 可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:

1. 导入依赖

        <dependency> 			<groupId>org.springframework.kafka</groupId> 			<artifactId>spring-kafka</artifactId> 		</dependency>

2. yml配置

kafka:   # 默认消费者配置   default-consumer:     # 自动提交已消费offset     enable-auto-commit: true     # 自动提交间隔时间     auto-commit-interval: 1000     # 消费的超时时间     poll-timeout: 1500     # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量     auto.offset.reset: latest     # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)     session.timeout.ms: 120000     # 消费请求超时时间     request.timeout.ms: 180000   # 1号kafka配置   test1:     bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx     consumer:       group-id: xxx       sasl.mechanism: xxxx       security.protocol: xxxx       sasl.jaas.config: xxxx   # 2号kafka配置   test2:     bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx     consumer:       group-id: xxx       sasl.mechanism: xxxx       security.protocol: xxxx       sasl.jaas.config: xxxx 

3. 容器工厂配置

package com.zhdx.modules.backstage.config;  import com.google.common.collect.Maps;  import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  import java.util.Map;  /**  * kafka监听容器工厂配置  * <p>  * 拓展其他消费者配置只需配置指定的属性和bean即可  */ @EnableKafka @Configuration @RefreshScope public class KafkaListenerContainerFactoryConfig {      /**      *  test1 kafka配置      */     @Value("${kafka.test1.bootstrap-servers}")     private String test1KafkaServerUrls;      @Value("${kafka.test1.consumer.group-id}")     private String test1GroupId;      @Value("${kafka.test1.consumer.sasl.mechanism}")     private String test1SaslMechanism;      @Value("${kafka.test1.consumer.security.protocol}")     private String test1SecurityProtocol;      @Value("${kafka.test1.consumer.sasl.jaas.config}")     private String test1SaslJaasConfig;     /**      *  test2 kafka配置      */     @Value("${kafka.test2.bootstrap-servers}")     private String test2KafkaServerUrls;      @Value("${kafka.test2.consumer.group-id}")     private String test2GroupId;      @Value("${kafka.test2.consumer.sasl.mechanism}")     private String test2SaslMechanism;      @Value("${kafka.test2.consumer.security.protocol}")     private String test2SecurityProtocol;      @Value("${kafka.test2.consumer.sasl.jaas.config}")     private String test2SaslJaasConfig;      /**      * 默认消费者配置      */     @Value("${kafka.default-consumer.enable-auto-commit}")     private boolean enableAutoCommit;      @Value("${kafka.default-consumer.poll-timeout}")     private int pollTimeout;      @Value("${kafka.default-consumer.auto.offset.reset}")     private String autoOffsetReset;      @Value("${kafka.default-consumer.session.timeout.ms}")     private int sessionTimeoutMs;      @Value("${kafka.default-consumer.request.timeout.ms}")     private int requestTimeoutMs;      /**      * test1消费者配置      */     public Map<String, Object> test1ConsumerConfigs() {         Map<String, Object> props = getDefaultConsumerConfigs();         // broker server地址         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls);         // 消费者组         props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId);         // 加密         props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism);         props.put("security.protocol", test1SecurityProtocol);         // 账号密码         props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig);         return props;     }          /**      * test2消费者配置      */     public Map<String, Object> test2ConsumerConfigs() {         Map<String, Object> props = getDefaultConsumerConfigs();         // broker server地址         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls);         // 消费者组         props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId);         // 加密         props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism);         props.put("security.protocol", test2SecurityProtocol);         // 账号密码         props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig);         return props;     }      /**      * 默认消费者配置      */     private Map<String, Object> getDefaultConsumerConfigs() {         Map<String, Object> props = Maps.newHashMap();         // 自动提交(按周期)已消费offset 批量消费下设置false         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);         // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);         // 消费请求超时时间         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);         // 序列化         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);         return props;     }      /**      * 消费者工厂类      */     public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) {         return new DefaultKafkaConsumerFactory<>(consumerConfigs);     }      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(         Map<String, Object> consumerConfigs) {         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();         factory.setConsumerFactory(initConsumerFactory(consumerConfigs));         // 是否开启批量消费         factory.setBatchListener(false);         // 消费的超时时间         factory.getContainerProperties().setPollTimeout(pollTimeout);         return factory;     }      /**      * 创建test1 Kafka 监听容器工厂。      *      * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象      */     @Bean(name = "test1KafkaListenerContainerFactory")     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() {         Map<String, Object> consumerConfigs = this.test1ConsumerConfigs();         return initKafkaListenerContainerFactory(consumerConfigs);     }           /**      * 创建test2 Kafka 监听容器工厂。      *      * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象      */     @Bean(name = "test2KafkaListenerContainerFactory")     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() {         Map<String, Object> consumerConfigs = this.test2ConsumerConfigs();         return initKafkaListenerContainerFactory(consumerConfigs);     } }
4. @KafkaListener使用 
package com.zhdx.modules.backstage.kafka;  import com.alibaba.fastjson.JSON;  import lombok.extern.slf4j.Slf4j;  import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;  /**  * kafka监听器  */ @Slf4j @Component public class test1KafkaListener {     @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")     public void handleHyPm(ConsumerRecord<String, String> record) {         log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value()));     } }
 

        

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!