SpringBoot如何接入kafka的例子

avatar
作者
猴君
阅读量:0

一,kafka简介

Kafka是一个分布式流处理平台,最初由LinkedIn公司开发,并基于Scala语言和ZooKeeper进行构建。它已经捐献给Apache基金会,成为开源项目。Kafka的主要特点包括高吞吐量、低延迟、可扩展性、持久性和高性能等。

Kafka的基本原理
Kafka通过将数据流发布到主题(Topic)上,然后订阅该主题的消费者可以实时读取和处理这些数据。每个主题可以有多个生产者和消费者,并且可以进行分区管理以提高性能和可靠性。
核心组件包括生产者(Producer)代理(Broker)消费者(Consumer),其中生产者负责发布消息到指定的主题,消费者则从主题中订阅并消费消息。

Kafka的应用场景
(1)日志收集与分析:这是Kafka最初的设计目标之一,广泛用于各种服务的日志收集和分析。
(2)消息系统:Kafka能够解耦生产者和消费者,缓存消息,从而提高系统的可靠性和可用性。
它常用于不同系统间的数据交流和传递,如订单系统、支付系统和库存系统等。
(3)网站活动追踪:Kafka经常被用来记录Web用户或App用户的活动数据,帮助进行用户行为分析和个性化推荐。
(4)指标监控与报警:Kafka常用于传输监控指标数据,实现系统的实时监控和报警功能。
(5)流处理:结合Spark、Flink等流处理框架,Kafka可以构建复杂的实时数据处理管道,支持大规模数据的实时处理。
(6)事件驱动架构:Kafka支持事件驱动的架构设计,通过将业务逻辑与事件的发生绑定,实现事件驱动的自动化处理
(7)数据变更捕获(CDC) :Kafka可以用于捕获数据库的数据变更,实现数据同步和迁移。

Kafka的优势
(1)高吞吐量:Kafka每秒可以处理几十万条消息,延迟最低只有几毫秒。
(2)低延迟:顺序写入和零拷贝技术使得Kafka的延迟控制在毫秒级。
(3)可扩展性:无需停机即可扩展节点及节点上线。
(4)持久性:数据存储到磁盘上,保障数据的安全性和可靠性。
(5)高性能:Kafka具有非常稳定的性能,即使在TB级的数据量下也能保持高效运行
 

二,kafka的下载和安装

(1)官网下载 https://kafka.apache.org/downloads.html 或者在Linux下使用wget命令下载kafka;

(2)安装kafka,以Linux为例,下载的kafka资源压缩包上传到Linux服务器的某目录kafka下,并解压:
解压到当前目录的kafka下:tar -zxvf kafka_2.12-3.8.0.tgz -C ./kafka
授权当前用户所有权限:sudo chown -R xLoginUser ./kafka

(3)修改配置文件:
在kafka的安装目录下修改配置,常见的配置文件包括server.properties 、zookeeper.properties 等。需要特别注意的是,如果有多台机器部署Kafka集群,每台机器都需要独立修改 broker.id 和 listeners 等参数。

(4)添加环境变量:
将Kafka的bin目录添加到系统的PATH环境变量中,以便在任何地方都能运行Kafka的脚本
export PATH=$PATH:/opt/kafka/bin
source ~/.bashrc

(5)启动ZooKeeper服务:
Kafka依赖于ZooKeeper来管理集群,因此需要先启动ZooKeeper服务。可以在Kafka的
bin目录下运行以下命令启动ZooKeeper:
/opt/kafka/bin/zookeeper-server-start.sh  /opt/kafka/config/zookeeper.properties 

(6)启动Kafka服务:
启动ZooKeeper之后,就可以启动Kafka服务了。在Kafka的bin目录下运行以下命令启动Kafka:
/opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties 

(7)测试Kafka是否正常运行:
# 创建生产者
/opt/kafka/bin/kafka-console producer -- topic=your-topic-name -- property的关键值对

# 创建消费者
/opt/kafka/bin/kafka-console consumer -- topic=your-topic-name -- from beginning -- property的关键值对

三,SpringBoot如何接入kafka的例子

Spring Boot 接入 Kafka 的方法主要是通过 Spring Kafka 项目。以下是一个基本的例子
1,添加依赖到你的 pom.xml 文件中

<dependencies>     <dependency>         <groupId>org.springframework.kafka</groupId>         <artifactId>spring-kafka</artifactId>     </dependency>     <dependency>         <groupId>org.apache.kafka</groupId>         <artifactId>kafka-clients</artifactId>     </dependency> 	 	<!-- 上面是 kafka 的依赖,下面是SpringBoot的依赖--> 	<dependency> 		<groupId>org.springframework.boot</groupId> 		<artifactId>spring-boot-starter-web</artifactId> 		<version>3.2.8</version> 	</dependency> </dependencies>

2,在 application.properties 或 application.yml 中配置 Kafka 相关属性

# application.properties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3,创建一个 Kafka 生产者,使用 KafkaTemplate 对象把消息发送到指定的 topic 中:

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service;   @Service public class KafkaProducer {       @Autowired     private KafkaTemplate<String, String> kafkaTemplate;       public void sendMessage(String topic, String message) {         kafkaTemplate.send(topic, message);     } } 

4,创建一个 Kafka 消费者,使用注解 @KafkaListener 监听指定的 topic 和 groupId :

import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;   @Component public class KafkaConsumer {       @KafkaListener(topics = "myTopic", groupId = "myGroup")     public void listen(String message) {         System.out.println("Received message in group myGroup: " + message);     } } 

5,在主类或配置类上添加 @EnableKafka 注解启用 Kafka:

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka;   @SpringBootApplication @EnableKafka public class KafkaApplication {       public static void main(String[] args) {         SpringApplication.run(KafkaApplication.class, args);     } } 

这里介绍和演示SpringBoot如何接入kafka的例子,欢迎拍砖讨论...

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!