一,kafka简介
Kafka是一个分布式流处理平台,最初由LinkedIn公司开发,并基于Scala语言和ZooKeeper进行构建。它已经捐献给Apache基金会,成为开源项目。Kafka的主要特点包括高吞吐量、低延迟、可扩展性、持久性和高性能等。
Kafka的基本原理
Kafka通过将数据流发布到主题(Topic)上,然后订阅该主题的消费者可以实时读取和处理这些数据。每个主题可以有多个生产者和消费者,并且可以进行分区管理以提高性能和可靠性。
其核心组件包括生产者(Producer)、代理(Broker)和消费者(Consumer),其中生产者负责发布消息到指定的主题,消费者则从主题中订阅并消费消息。
Kafka的应用场景
(1)日志收集与分析:这是Kafka最初的设计目标之一,广泛用于各种服务的日志收集和分析。
(2)消息系统:Kafka能够解耦生产者和消费者,缓存消息,从而提高系统的可靠性和可用性。
它常用于不同系统间的数据交流和传递,如订单系统、支付系统和库存系统等。
(3)网站活动追踪:Kafka经常被用来记录Web用户或App用户的活动数据,帮助进行用户行为分析和个性化推荐。
(4)指标监控与报警:Kafka常用于传输监控指标数据,实现系统的实时监控和报警功能。
(5)流处理:结合Spark、Flink等流处理框架,Kafka可以构建复杂的实时数据处理管道,支持大规模数据的实时处理。
(6)事件驱动架构:Kafka支持事件驱动的架构设计,通过将业务逻辑与事件的发生绑定,实现事件驱动的自动化处理
(7)数据变更捕获(CDC) :Kafka可以用于捕获数据库的数据变更,实现数据同步和迁移。
Kafka的优势
(1)高吞吐量:Kafka每秒可以处理几十万条消息,延迟最低只有几毫秒。
(2)低延迟:顺序写入和零拷贝技术使得Kafka的延迟控制在毫秒级。
(3)可扩展性:无需停机即可扩展节点及节点上线。
(4)持久性:数据存储到磁盘上,保障数据的安全性和可靠性。
(5)高性能:Kafka具有非常稳定的性能,即使在TB级的数据量下也能保持高效运行
二,kafka的下载和安装
(1)官网下载 https://kafka.apache.org/downloads.html 或者在Linux下使用wget命令下载kafka;
(2)安装kafka,以Linux为例,下载的kafka资源压缩包上传到Linux服务器的某目录kafka下,并解压:
解压到当前目录的kafka下:tar -zxvf kafka_2.12-3.8.0.tgz -C ./kafka
授权当前用户所有权限:sudo chown -R xLoginUser ./kafka
(3)修改配置文件:
在kafka的安装目录下修改配置,常见的配置文件包括server.properties 、zookeeper.properties 等。需要特别注意的是,如果有多台机器部署Kafka集群,每台机器都需要独立修改 broker.id 和 listeners 等参数。
(4)添加环境变量:
将Kafka的bin目录添加到系统的PATH环境变量中,以便在任何地方都能运行Kafka的脚本
export PATH=$PATH:/opt/kafka/bin
source ~/.bashrc
(5)启动ZooKeeper服务:
Kafka依赖于ZooKeeper来管理集群,因此需要先启动ZooKeeper服务。可以在Kafka的
bin目录下运行以下命令启动ZooKeeper:
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
(6)启动Kafka服务:
启动ZooKeeper之后,就可以启动Kafka服务了。在Kafka的bin目录下运行以下命令启动Kafka:
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
(7)测试Kafka是否正常运行:
# 创建生产者
/opt/kafka/bin/kafka-console producer -- topic=your-topic-name -- property的关键值对
# 创建消费者
/opt/kafka/bin/kafka-console consumer -- topic=your-topic-name -- from beginning -- property的关键值对
三,SpringBoot如何接入kafka的例子
Spring Boot 接入 Kafka 的方法主要是通过 Spring Kafka 项目。以下是一个基本的例子
1,添加依赖到你的 pom.xml 文件中:
<dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <!-- 上面是 kafka 的依赖,下面是SpringBoot的依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>3.2.8</version> </dependency> </dependencies>
2,在 application.properties 或 application.yml 中配置 Kafka 相关属性:
# application.properties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3,创建一个 Kafka 生产者,使用 KafkaTemplate 对象把消息发送到指定的 topic 中:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }
4,创建一个 Kafka 消费者,使用注解 @KafkaListener 监听指定的 topic 和 groupId :
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { @KafkaListener(topics = "myTopic", groupId = "myGroup") public void listen(String message) { System.out.println("Received message in group myGroup: " + message); } }
5,在主类或配置类上添加 @EnableKafka 注解启用 Kafka:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; @SpringBootApplication @EnableKafka public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } }
这里介绍和演示SpringBoot如何接入kafka的例子,欢迎拍砖讨论...