1.前言:
1.1关于生产者与消费者的详细介绍请查看另一篇文章:
使用JavaApi实现模拟Kafka的消息生产者与发送者http://t.csdnimg.cn/ukNSU
1.2 本文使用 KafkaTemplate与 @KafkaListener实现生产者与消费者功能:
Kafka 是一个流行的分布式流处理平台,广泛用于构建实时数据管道和流应用程序。在 Java 应用程序中,Spring Framework 提供了对 Kafka 的集成支持,通过
spring-kafka
模块实现。KafkaTemplate
和@KafkaListener
是 Spring Kafka 中的两个重要组件,它们分别用于发送消息和接收消息。
所需依赖:
!-- spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.6.0</version> </dependency> <!-- kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version> </dependency>
2. KafkaTemplate生产者发送消息到kafka
KafkaTemplate 是Spring Kafka提供的一个高级抽象,用于简化Kafka消息的生产。它封装了KafkaProducer生产者客户端的复杂性,并提供了一系列发送消息的方法。
2.1 主要特性:
- 线程安全:KafkaTemplate 是线程安全的,可以在多个线程中共享使用。
- 消息发送:支持同步和异步发送消息。
- 消息类型:支持发送键值对(Key-Value)消息和仅值消息。
- 事务管理:支持在事务中发送消息。
2.2 常用方法:
send(String topic, V data)
:发送一个仅值消息到指定主题。send(String topic, K key, V data)
:发送一个键值对消息到指定主题。send(Message<?> message)
:发送一个Spring Kafka Message对象。send(String topic, Integer partition, K key, V data)
:发送消息到指定主题的指定分区。
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); }
3.@KafkaListener消费者从kafka中获取消息进行消费
@KafkaListener 是Spring Kafka提供的一个注解,它简化了消息的接收和处理。用于标记方法作为 Kafka 消费者来监听特定的主题。当消息到达时,被标记的方法将会被自动调用来处理这些消息。这使得消息的处理能够以事件驱动的方式进行,无需轮询或显式拉取。
3.1主要特性:
- 消息监听:可以监听一个或多个主题的消息。
- 分区监听:可以指定监听特定分区的消息。
- 消息类型:支持接收键值对(Key-Value)消息和仅值消息。
- 消息转换:支持将接收到的消息转换为具体的对象。
3.2常用属性:
topics
:监听的Kafka主题,可以是多个。topicPattern
:监听的Kafka主题模式,支持正则表达式。groupId
:消费者组ID。containerFactory
:指定KafkaListenerContainerFactory,用于自定义消费者配置。
@KafkaListener(topics = "myTopic", groupId = "myGroup") public void listen(String message) { System.out.println("Received message: " + message); }
在这个例子中,listen
方法将被自动调用,每当 Kafka 消费者从 "myTopic" 主题的 "myGroup" 消费者组接收到消息时。
4. 代码举例
最近有个需求,记录不同系统之间的交互日志,使用拦截器将日志拦截,进行前置与后置处理,最终组装成json发送到kafka中,消费者消费kafka中的数据,最终将日志数据进行入库操作,前端进行展示。
下面只将KafkaTemplate与 @KafkaListener举例出来,其他不过多展示。
4.1 生产者KafkaTemplate
@Slf4j @Tag(name = "运营平台-系统日志管理") @RestController @RequestMapping("/log") public class SystemInteractionLogController { @Autowired private SystemInteractionLogService systemInteractionLogService; @Resource private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/page") @Operation(summary = "日志分页查询") //分组设计 根据messageId分类 public Result<PageData<SystemInteractionLogVO>> page(@RequestBody LogQueryPageDTO dto) { return Result.success(systemInteractionLogService.queryPage(dto)); } @PostMapping("/queryInfoById/{id}") @Operation(summary = "日志详情查询") public Result<SystemInteractionLogInfoVO> queryInfoById(@PathVariable Long id) { Validator.validateNotNull(id, "id不能为空"); return Result.success(systemInteractionLogService.queryInfoById(id)); } @PostMapping ("/testSendLogToKafka") @Operation(summary = "(测试使用)往kafka中发送数据") public void sendMessage(@RequestBody SystemInteractionLog systemInteractionLog) { String topicName = "system-interaction-logs"; if(ObjectUtils.isEmpty(systemInteractionLog)){ log.info("数据为空,不操作"); return; } try { String message = JSONUtil.toJsonStr(systemInteractionLog); log.info("消息为:{}",message); kafkaTemplate.send(topicName, message); }catch (Exception e){ log.error("消息发送失败:{}",e.getMessage()); } } }
4.2 消费者@KafkaListener
/**跨系统交互日志监听 * @author ZhaoShuhao * @data 2024/7/13 9:46 */ @Slf4j @Component public class SystemInteractionLogListener { @Resource private OperateApi operateApi; @KafkaListener(topics="system-interaction-logs",groupId="system-interaction-logs-groupId") public void receiveTask(String message, Acknowledgment ack) { try { log.info("接收到消息:{}", message); if(StringUtils.isNoneBlank(message)){ SystemInteractionLogDto logDto = JSONUtil.toBean(message, SystemInteractionLogDto.class); //保存入库操作 Result<Boolean> result = operateApi.saveLog(logDto); if(!ObjectUtils.isEmpty(result)){ if(result.getCode() == 200){ log.info("消息处理成功:{}", logDto); //只有保存成功后才会确认接收到消息 ack.acknowledge(); }else { log.info("消息保存失败:{}", logDto); } } } }catch (Exception e) { //异常数据不做处理 log.info("消息保存失败,参数为{}", message); log.error(ExceptionUtils.getStackTrace(e)); } } }