Kafka接收消息

avatar
作者
筋斗云
阅读量:0

文章目录

// 采用监听得方式接收 @Payload标记消息体内容. @KafkaListener(topics = {"test"},groupId = "hello") public void onEvent(@Payload String event,                    @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,                    @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition){    System.out.println("读取到了时间消息: " + event); } 

Acknowledgment

开启手动确认模式;

listener: 	ack-mode: manual 
// 采用监听得方式接收 @Payload标记消息体内容. @KafkaListener(topics = {"test"},groupId = "hello") public void onEvent(@Payload String event,                    @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,                    @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition,                    ConsumerRecord<String,String> record,                    Acknowledgment ack){    ack.acknowledge(); // 手动确认,告诉kafka服务器该消息我已经收到了.     System.out.println("读取到了时间消息: " + event); } 

读消息指定分区

@KafkaListener(groupId = "hello",            topicPartitions = {                @TopicPartition(                        topic = "${kafka.topic.test}",                        partitions = {"0","1","2"}, // 0 1 2分区不限制偏移量                        partitionOffsets = { // 3 分区只读 3偏移量之后的; 4分区只读 4偏移量之后的                                @PartitionOffset(partition = "3",initialOffset = "3"),                                @PartitionOffset(partition = "4",initialOffset = "3")                        })            } ) 

批量消费

修改配置

kafka:     bootstrap-servers: 192.168.225.128:9092     listener:       type: batch     # 每次读取20条     consumer:       max-poll-records: 20 

消费者端接收一个List即可

@KafkaListener(topics = {"hi"},groupId = "batchGroup2") public void onEvent3(List<ConsumerRecord<String,String>> records){     System.out.println(records.size()); } 

消息拦截

在这里插入图片描述

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!