小阿轩yx-zookeeper+kafka群集
消息队列(Message Queue)
- 是分布式系统中重要的组件
通用的使用场景可以简单地描述为
- 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。
消息队列
什么是消息队列
消息(Message)
- 指在应用间传送的数据。
消息队列(Message Queue)
- 一种应用间的通信方式
- 消息发送后可以立即返回
- 由消息系统来确保消息的可靠传递
为什么需要消息队列
解耦
- 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
冗余
- 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
扩展性
- 消息队列解耦了处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
灵活性 & 峰值处理能力
- 访问量剧增情况下应用仍然需要继续发挥作用。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。
- 使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
可恢复性
- 系统一些组件失效时不会影响整个系统
- 消息队列降低了进程间的耦合度,加入的消息仍然可以在系统恢复后被处理
顺序保证
- 大部分消息队列本就是排序的,且能保证数据会按特定的顺序处理(Kafka 保证一个 Partition 内的消息的有序性)
缓冲
- 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
异步通信
- 有时用户不想也不需要立即处理消息
- 消息队列提供异步处理机制,允许用户把消息放入队列但不立即处理,队列中想放多少就放多少,在需要时再处理
Kafka 基础与入门
kafka 基本概念
- Kafka 是一种高吞吐量的分布式发布/订阅消息系统,这是官方对 kafka 的定义
- 是 Apache 组织下的一个开源系统
举个例子
- 现在是个大数据时代,各种商业、社交、搜索、浏览都会产生大量的数据。
如何快速收集这些数据,如何实时的分析这些数据,是一个必须要解决的问题,同时,这也形成了一个业务需求模型
- 即生产者生产(produce)各种数据,消费者(consume)消费(分析、处理)这些数据。
面对这些需求,如何高效、稳定的完成数据的生产和消费呢?
- 这就需要在生产者与消费者之间,建立一个通信的桥梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间如何传递消息。
最大的特性
- 可以实时的处理大量数据以满足各种需求场景
- 比如基于hadoop 平台的数据分析、低时延的实时系统storm/spark 流式处理引擎等。
现在已被多家大型公司作为多种类型的数据管道和消息系统使用。
kafka 角色术语
核心概念和角色
- Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)。
- Topic:每条发布到 Kafka 集群的消息都有一个分类,这个类别被称为Topic(主题)。
- Producer:指消息的生产者,负责发布消息到kafka broker。
- Consumer:指消息的消费者,从kafka broker 拉取数据,并消费这些已发布的消息。
- Partition:Partition 是物理上的概念,每个Topic 包含一个或多个 Partition,每个partition 都是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。
- Consumer Group:消费者组,可以给每个Consumer 指定消费组,若不指定消费者组,则属于默认的 group。
- Message:消息,通信的基本单位,每个producer 可以向一个topic 发布一些消息。
kafka 拓步架构
一个典型的 kafka 集群包含
- 若干 Producer
- 若干 broker
- 若干 Consumer group
- 一个 Zookeeper 集群
kafka
- 通过 Zookeeper 管理群集配置,选举leader
- 支持消息持久化存储
- 每条消息写到 partition 中,顺序写入磁盘,效率高
Topic 和 partition
- Kafka 中 topic(主题)以 partition 的形式存放
- 每一个 topic 都可以设置它的partition 数量,Partition 的数量决定了组成 topic 的 log 的数量。
- 推荐 partition 的数量一定要大于同时运行的 consumer 的数量。
- 在存储结构上,每个 partition 在物理上对应一个文件夹,该文件夹下存储这个 partition的所有消息和索引文件。
- partiton 命名规则为 topic 名称+序号,第一个 partiton 序号从0开始,序号最大值为 partitions 数量减 1。
建议 partition 的数量要小于等于集群 broker 的数量,这样消息数据就可以均匀的分布在各个broker 中
Product 生产机制
- Producer 是消息和数据的生产者
- 它发送消息到 broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。
- 如果 Partition 机制设置的合理,所有消息都可以均匀分布到不同的 Partition 里,这样就实现了数据的负载均衡。
- 如果一个 Topic 对应一个文件,那这个文件所在的机器 I/0 将会成为这个Topic 的性能瓶颈,而有了 Partition后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。
Consumer 消费机制
Kafka 发布消息通常有两种模式
- 队列模式(queuing)
- 发布/订阅模式(publish-subscribe)
Kafka 中的 Producer 和 consumer 采用的两种模式
- push
- pull
zookeeeper 概念介绍
- 是一种分布式协调技术
- 也是一种为分布式应用所设计的高可用、高性能的开源协调服务
- 提供了一项基本服务:分布式锁服务
- 也提供了数据的维护和管理机制(如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等)
分布式协调技术
- 主要用来解决分布式环境当中多个进程之间的同步控制
- 让他们有序的去访问某种共享资源
- 防止造成资源竞争(脑裂)的后果
脑裂
- 指在主备切换时,由于切换不彻底或其他原因,导致客户端和 Slave 误以为出现两个activemaster,最终使得整个集群处于混乱状态
分布式系统
- 在不同地域分布的多个服务器,共同组成的一个应用系统来为用户提供服务,在分布式系统中最重要的是进程的调度
分布式锁
- 属于分布式环境下
- 分布式协调技术实现的核心内派
分布式锁的实现者
- Google 的 Chubby
- Apache 的 ZooKeeper
经过验证的优势
- 可靠性
- 可用性
zookeeper 应用举例
什么是单点故障问题?
在主从分布式系统中
- 主节点负责任务调度分发,从节点负责任务处理
- 主节点发生故障时整个应用系统就瘫痪了
解决方法
- 通过对集群 master 角色获取解决问题
传统的方式是怎么解决单点故障的?以及有哪些缺点?
- 采用一个备用节点
- 备用节点定期向主节点发送ping包,主节点收到ping包以后向备用节点回复Ack信息
- 当备用节点收到回复时就会认为当前主节点运行正常,让它继续提供服务
- 当主节点故障时,备节点就无法收到回复信息了,备节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务
传统解决单点故障方法隐患
网络问题
- 主节点没有出现故障,只在回复 ack 响应时网络发生故障,备用节点无法收到回复,就会认为主节点出现故障,备节点接管主节点服务成为新主节点
- 分布式系统就出现两个主节点(双Master节点),会导致分布式系统服务发生混乱,整个分布式系统将不可用
zookeeper 的工作原理是什么?
master 启动
- 分布式系统引入 Zookeeper 后可以配置多个主节点,两个 A(master000001) 、B(master000002) 主节点启动后,都会向 Zookeeper 中注册节点信息,注册完会进行选举,编号最小节点A选举获胜成为主节点,B成为备用节点,这样 Zookeeper 完成了对两个 Master 进程调度。完成主、备节点的分配和协作
master 故障
- 节点 A 发生故障,在 Zookeepr 的节点会被自动删除,Zookeepr 会自动感知节点变化,发现节点 A 故障后再次选举,节点 B 将获胜替代主节点 A 成为新主节点
master 恢复
- 主节点恢复,会再次向 Zookeeper 注册自身节点信息,只是这时节点信息会变成 master000003
- ZooKeeper 会感知节点变化再次发动选举, B 会继续担任主节点,A 会担任备用节点
zookeeper 集群架构
- 一般通过集群结构提供服务
集群主要角色有
- server
- client
server分为三个角色
- leader
- follower
- observer
每个角色的含义
Leader:领导者角色
- 主要负责投票的发起和决议
- 更新系统状态
follower:跟随着角色
- 接收客户端的请求并返回结果给客户端
- 在选举过程中参与投票
observer:观察者角色
- 接收客户端的请求
- 并将写请求转发给leader
- 同时同步leader状态
- 但是不参与投票
observer
- 扩展系统
- 提高伸缩性
client:客户端角色
- 向 zookeeper 发起请求
zookeeper 的工作流程
修改数据的流程
- 集群中每个 Server 在内存中存储了一份数据
- Zookeeper 启动时,从实例中选举一个 Server 作为 leader,Leader 负责处理数据更新等操作
- 当且仅当大多数 Server 在内存中成功修改数据,才认为数据修改成功。
写的流程
- 客户端 client 首先和一个 Server 或者 Observe 通信,发起写请求
- 然后 Server 将写请求转发给 Leader,Leader 再将写请求转发给其它 Server,其它 Server 在接收到写请求后写入数据并响应 Leader,Leader 在接收到大多数写成功回应后,认为数据写成功,最后响应 client,完成一次写操作过程。
Zookeeper 在 Kafka 中的作用
Broker 注册
- 是分布式部署并且相互之间相互独立
- 但是需要有一个注册系统能够将整个集群中的Broker 管理起来,此时就使用到了 Zookeeper。
- 在 Zookeeper 上会有一个专门用来进行 Broker服务器列表记录的节点:/brokers/ids
- 每个 Broker 在启动时,都会到Zookeeper 上进行注册,即到/brokers/ids 下创建属于自己的节点,如/brokers/ids/[0...N]。
Kafka
- 使用了全局唯一的数字来指代每个 Broker 服务器
- 不同的 Broker 必须使用不同的 BrokerID 进行注册
- 创建完节点后,每个 Broker 就会将自己的 IP 地址和端口信息记录到该节点中去。
- 其中,Broker 创建的节点类型是临时节点,一旦 Broker 宕机,则对应的临时节点也会被自动删除。
Topic 注册
- Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护,由专门的节点来记录
- Kafka中每个Topic 都会以 /brokers/topics/[topic] 的形式被记录
- Broker 服务器启动后,会到对应 Topic 节点 (/brokers/topics) 上注册自己的 Broker ID 并写入针对该 Topic 的分区总数
生产者负载均衡
- 同一个 Topic 消息会被分区并将其分布在多个 Broker 上
- 生产者需要将消息合理地发送到这些分布式的 Broker 上
四层负载均衡
- 根据生产者的 IP 地址和端口来为其确定一个相关联的 Broker。
- 一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该 Broker。这种方式逻辑简单
- 但是,其无法做到真正的负载均衡,实际系统中的每个生产者产生的消息量及每个 Broker 的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的 Broker 接收到的消息总数差异巨大,同时,生产者也无法实时感知到 Broker 的新增和删除。
使用 Zookeeper 进行负载均衡
- 每个 Broker 启动时,都会完成 Broker 注册过程,生产者会通过该节点的变化来动态地感知到 Broker 服务器列表的变更,这样就可以实现动态的负载均衡机制。
消费者负载均衡
- 与生产者类似
- Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker 服务器上接收消息
- 每个消费者分组包含若干消费者
- 每条消息都只会发送给分组中的一个消费者
- 不同的消费者分组消费自己特定的Topic下面的消息,互不干扰
记录消息区分于消费者的关系
- 消费组(Consumer Group)下有多个 Consumer(消费者)。
- Kafka 会为每个消费者组(consumerGroup)分配一个全局唯一的 Group ID,Group 内部的所有消费者共享该 ID。
- 订阅的 topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他 group)。
- 同时,Kafka为每个消费者分配一个 Consumer ID,通常采用"Hostname:UUID"形式表示。
- 在 Kafka 中,规定了每个消息分区只能被同组的一个消费者进行消费,需要在Zookeeper 上记录消息分区与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其 Consumer ID写入到 Zookeeper 对应消息分区的临时节点上。
消息消费进度 Offset 记录
- 在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度 offset记录到 zookeeper 上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后能够从之前的进度开始继续进行消息消费。
Offset 在 Zookeeper 中由一个专门节点进行记录,节点路径为
- /consumers/[group id]/offsets/[topic]/[broker id-partition id]
- 节点内容就是 Offset 的值。
消费者注册
消费到消费者分组
- 每个消费者服务器启动时,都会到 zookeeper 的指定节点下创建一个属于自己的消费者节点,完成节点创建后,消费者就会将自己订阅的Topic 信息写入该临时节点。
对消费者分组中的消费者的变化注册监听
- 即对/consumers/[group_id]/ids 节点注册子节点变化的 Watcher 监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。
对 Broker 服务器变化注册监听
- 消费者需要对/broker/ids/[0-N]中的节点进行监听
- 如果发现Broker服务器列表发生变化,就根据具体情况来决定是否需要进行消费者负载均衡。
进行消费者负载均衡
- 让同一个 Topic 下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程
- 对于一个消费者分组,如果组内的消费者服务器发生变更或 Broker 服务器发生变更,会发出消费者负载均衡。
单节点部署 kafka
主机
192.168.10.101
zookeeper 和 kafka 源码包上传XShell
设置 hosts 文件
- 192.168.10.101 kafka1
单节点不需要关闭防火墙、内核机制
首先下载 java 依赖
[root@kafka1 ~]# yum -y install java
解压
[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
移动 zookeeper 软件
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
进入目录
[root@kafka1 ~]# cd /etc/zookeeper/conf
复制配置文件
[root@kafka1 conf]# mv zoo_sample.cfg zoo.cfg
修改配置文件
[root@kafka1 conf]# vim zoo.cfg dataDir=/etc/zookeeper/zookeeper-data
进入 zookeeper 目录
[root@kafka1 conf]# cd /etc/zookeeper/
创建目录
[root@kafka1 kafka]# mkdir zookeeper-data
启动服务
[root@kafka1 zookeeper]# ./bin/zkServer.sh start
查看服务状态
[root@kafka1 zookeeper]# ./bin/zkServer.sh status
解压 kafka
[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
移动软件
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
进入目录
[root@kafka1 ~]# cd /etc/kafka/
修改配置文件
[root@kafka1 kafka]# vim config/server.properties //60行 log.dirs=/etc/kafka/kafka-logs
创建目录
[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
启动脚本开启服务
[root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties &
检查两个端口的开启状态
[root@kafka1 kafka]# netstat -anpt | grep 2181 tcp6 0 0 :::2181 :::* LISTEN 2029/java tcp6 0 0 ::1:35678 ::1:2181 ESTABLISHED 2109/java tcp6 0 0 ::1::2181 ::1:35678 ESTABLISHED 2029/java [root@kafka1 kafka]# netstat -anpt | grep 9092 tcp6 0 0 :::9092 :::* LISTEN 2109/java tcp6 0 0 127.0.0.1:38952 127.0.0.1:9092 ESTABLISHED 2109/java tcp6 0 0 127.0.0.1:9092 127.0.0.1:38952 ESTABLISHED 2109/java
启动时先启动 zookeeper,关闭时先关闭 kafka
如果要关闭 kafka
[root@kafka1]# ./kafka-server-stop.sh
如果关不了,就 kill 杀死该进程
先创建 topic
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
列出 topic
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper kafka1:2181 test
生产消息
[root@localhost bin]# ./kafka-console-producer.sh --broker-list kafka1:9092 -topic test >zhangsan >lisi
消费消息(打开一个新的终端,一边生产消息,一边查看消费消息)
[root@localhost bin]# ./kafka-console-producer.sh --bootstrap-server kafka1:9092 -topic test zhangsan lisi
删除消息
[root@localhost bin]# ./kafka-topic.sh --delete --zookeeper kafka1:2181 --topic test
这里恢复快照
群集部署 kafka
主机
kafka1:192.168.10.101
kafka2:192.168.10.102
kafka3:192.168.10.103
上传 zookeeper 和 kafka 源码包至 XShell
修改主机 hosts 文件(所有主机都配置)
修改主机名
[root@localhost ~]# hostnamectl set-hostname kafka1 [root@localhost ~]# bash
[root@localhost ~]# hostnamectl set-hostname kafka2 [root@localhost ~]# bash
[root@localhost ~]# hostnamectl set-hostname kafka3 [root@localhost ~]# bash
同步会话,修改文件
[root@kafka1 ~]# vim /etc/hosts //添加主机地址 192.168.10.101 kafka1 192.168.10.102 kafka2 192.168.10.103 kafka3
关闭防火墙、内核机制
[root@kafka1 ~]# systemctl stop firewalld [root@kafka1 ~]# setenforce 0
下载 java 依赖
[root@kafka1 ~]# yum -y install java
解压 zookeeper
[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
移动
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
进入目录
[root@kafka1 ~]# cd /etc/zookeeper
创建数据目录
[root@kafka1 ~]# mkdir zookeeper-data
切换目录
[root@kafka1 ~]# cd conf/
复制文件
[root@kafka1 conf]# cp zoo_sample.cfg zoo.cfg
修改配置文件
[root@kafka1 conf~]# vim zoo.cfg dataDir=/etc/zookeeper/zookeeper-data server.1=192.168.10.101:2888:3888 server.2=192.168.10.102:2888:3888 server.3=192.168.10.103:2888:3888
- zookeeper 只用的端口
- 2181:对 cline 端提供服务
- 3888:选举 leader 使用
- 2888:集群内机器通讯使用(Leader 监听此端口)
切换到 zookeeper-data 目录
[root@kafka1 conf]# cd /etc/zookeeper/zookeeper-data
创建节点 id 文件(按 server 编号设置这个 id,三个机器不同)
节点1
[root@kafka1 zookeeper-data]# echo 1 > myid [root@kafka1 zookeeper-data]# cat myid 1
节点2
[root@kafka1 zookeeper-data]# echo 2 > myid [root@kafka1 zookeeper-data]# cat myid 2
节点3
[root@kafka1 zookeeper-data]# echo 3 > myid [root@kafka1 zookeeper-data]# cat myid 3
同步会话,切换目录
[root@kafka1 zookeeper-data]# cd [root@kafka1 ~]# cd /etc/zookeeper [root@kafka1 zookeeper]# cd bin
三个节点启动服务
[root@kafka1 bin]# ./zkServer.sh start /usr/bin/java ZooKeeper JMX enabled by default Using config: /etc/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
查看状态
[root@kafkal bin]# ./zkServer.sh status /usr/bin/iava ZooKeeper JMX enabled by default Using config: /etc/zookeeper/bin/../conf/zoo.cfg Client port found: 2181. client address: localhost. Mode: followerr
kafka 的部署
kafka 的安装(三个节点的配置相同)
解压 kafka
[root@kafak1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
移动
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
切换目录
[root@kafka1 ~]# cd /etc/kafka [root@kafka1 kafka]# cd config/
三个节点修改配置文件
[root@kafka1 config]# vim server.properties //21行 修改,注意其他两个的id分别是2和3 broker.id=1 //31行 修改,其他节点改成各自的IP地址 listeners=PLAINTEXT://192.168.10.101:9092 //60行 修改 log.dirs=/etc/kafka/kafka-logs //65行 分片数量,不能超过节点数 num.partitions=1 zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:2181
9092 是 kafka 的监听端口
三个节点创建 logs 目录
[root@kafka1 config]# mkdir /etc/kafka/kafka-logs
切换目录
[root@kafka1 config]# cd .. [root@kafka1 kafka]# cd bin
三个节点启动服务
[root@kafka1 bin]# ./kafka-server-start.sh /etc/kafka/config/server.properties [1] 2212
如果不能启动,可以将/etc/kafka/kafka-logs 中的数据清除再试试
检查端口
[root@kafka1 bin]# netstat -anpt | grep 9092 tcp6 0 0 192.168.10.101:9092 :::* LISTEN 2121/java 212/java tcp6 0 0 192.168.10.101:9092 192.168.10.102:58058 ESTABLISHED 2121/java [root@kafka1 bin]# netstat -anpt | grep 2181 tcp6 0 0 :::2181 :::* LISTEN 2121/java tcp6 0 0 192.168.10.101:2181 192.168.10.101:34214 ESTABLISHED 2212/java tcp6 0 0 192.168.10.101:34214 192.168.10.101:2181 ESTABLISHED 2121/java [root@kafka1 bin]# netstat -anpt | grep 2888 tcp6 0 0 192.168.10.101:59564 192.168.10.103:2888 ESTABLISHED 2121/java [root@kafka1 bin]# netstat -anpt | grep 3888 tcp6 0 0 192.168.10.101:3888 :::* LISTEN 2121/java tcp6 0 0 192.168.10.101:3888 192.168.10.103:49496 ESTABLISHED 2121/java tcp6 0 0 192.168.10.101:3888 192.168.10.102:58838 ESTABLISHED 2121/java
三个节点取消同步
切花目录
[root@kafka1 bin]# cd [root@kafka1 ~]# cd/etc/kafka/bin
创建 topic 目录
[root@kafka1 bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test Created topic test.
列出 topic,(任意一个节点)
[root@kafka2 bin]# ./kafka-topics.sh --list --zookeeper kafka1:2181 test [root@kafka2 bin]# ./kafka-topics.sh --list --zookeeper kafka2:2181 test [root@kafka2 bin]# ./kafka-topics.sh --list --zookeeper kafka3:2181 test
生产消息
[root@kafka1 bin]# ./kafka-console-producer.sh --broker-list kafkal:9092 -topic test >hyx >zhangsan >lisi
任意一个节点查看消费消息
[root@kafka3 bin]# ./kafka-console-consumer.sh --bootstrap-server kafkal:9092 --topic test hyx zhangsan lisi
错误提示
Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
解决方法
删除所有日志文件
[root@kafka1 kafka]# rm -rf /tmp/kafka-logs/*
pkill 杀死 kafka 的进程号
[root@kafka1 kafka]# netstat -anpt | grep 9092
启动服务
[root@kafka2 kafka]# ./bin/kafka-server-start.sh config/server.properties &
小阿轩yx-zookeeper+kafka群集