rabbitMQ
常见的消息队列产品
rocketMQ(火箭)
阿里出品开源
kakfa
较少的核心提供超高的吞吐量,高可用高可靠高可扩展,但是建议支持较少的topic来保证其高吞吐量,适合大数据计算与日志收集。
rabbitMQ
基于erlang语言开发,天生具有高并发承载能力,性能很好,延迟很低。如果并发要求不超百万级建议使用。
rabbitMQ
常用术语
大体与kafka相同只有个别不同
queue(队列)
存放消息的地方
交换器
将收到的消息发送到不同的队列
虚拟主机
一个队列可以被一个虚拟主机管理,可以设置管理用户,权限等等。提供了逻辑的分组与隔离。
消息队列的两种工作方式
PTP:point to point
两方应用程序一对一对接,在queue中一个消息被消费后就会被释放,然后整体往前移一位(offset)。
pub/sub(发布/订阅)
将信息发布到某个队列,当该消息被消费后并不会被释放,仍可以被多个消费者消费。即一对多。
消息队列的缺点
系统可用性降低
越简单的架构可用性越高
系统复杂性提高
对于编程者需要考虑消息在队列中如何处理等,对于运维者就多了一个需要维护的组件,以及要实现消息队列的集群特性等。
数据一致性无法保证
因为增加了中间件,消息的产生与处理,包括中间转发等等因素就可能导致数据不一致。
部署rabbitMQ
单机部署rabbitMQ
准备前奏
处理防火墙与内核安全机制
光盘基础源没有该程序,需要用到网络仓库源。
rabbitMQ使用erlang语言编写的
yum -y install erlang
yum -y install rabbitMQ-server
启动该服务用systemd即可
利用ps aux 查看进行是否开启
开启web管理界面
rabbitMQ提供了web的管理界面,但是需要开启
rabbitmq-plugins list
列出所有可用插件
rabbitmq-plugins enable 插件名
web管理页面插件名为:rabbitmq_management
启用插件后可以在列出一下查看是否启用,启用为E,依赖为e。之后需要重启该服务才能生效。
管理界面的web访问端口为15672,为了安全和记忆考虑可以搭建代理来进行访问,建议使用nginx
在对应目录/etc/nginx/conf.d/下创建后缀名为.conf的文件添加如下参数
server { listen 80; location / { proxy_pass http://127.0.0.1:15672; } }
这里需要注意location的匹配,仅仅作为代理使用。
初始账号密码都为guest
测试
可以利用程序去对接rabbitmq
本案例使用python作为测试工具
安装python3
python3需要安装一个函数库
pip3 install pika
编写测试程序
消息生产者,发送端
import pika # 连接到RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101')) channel = connection.channel() # 声明队列;如果队列不存在会被创建 channel.queue_declare(queue='test_queue', durable=True) # 发送消息到队列中 channel.basic_publish( exchange='', routing_key='test_queue', body='Hello, RabbitMQ!', properties=pika.BasicProperties( delivery_mode=2, # 使消息持久化 ) ) print("消息发送完毕") # 关闭连接 connection.close()
消息消费者,接收端
import pika # 连接到RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101')) channel = connection.channel() # 声明队列,确保RabbitMQ中有一个名为'test_queue'的队列 channel.queue_declare(queue='test_queue', durable=True) # 定义回调函数来处理消息 def callback(ch, method, properties, body): print(f"Received {body.decode()}") # 消费队列中的消息,回调函数为callback channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True) print("Waiting for messages. To exit press CTRL+C") # 开始监听消息 channel.start_consuming()
执行命令
python3 程序文件名
在执行时可以利用web管理界面查看
rabbitmq在命令行模式的一些常用命令
虚拟主机
rabbitmqctl list_vhosts ##列出所有的虚拟主机
rabbitmqctl add_vhost fll ##创建名字叫fll的虚拟主机
rabbitmqctl delete_vhost fll ##删除名字叫fll的虚拟主机
用户管理
rabbitmqctl add_user user1 user1_passwd ##创建user1用户,密码为user1_passwd
rabbitmqctl list_users
rabbitmqctl change_password user1 new_passwd ##更改user1的密码为new_passwd
rabbitmqctl delete_user user1 #删除user1用户
rabbitmqctl list_users #列出用户 rabbitmqctl add_user <username> <password> #创建用户 rabbitmqctl change_password <username> <password> ##更改用户密码 rabbitmqctl delete_user <username> #删除用户 rabbitmqctl clear_password <username> #清除用户密码
权限管理
权限角色介绍
(1) 超级管理员(administrator):guest
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
(2) 监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。
(4) 普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
赋予帐号权限
rabbitmqctl set_user_tags user1 角色
针对虚拟主机添加用户,并赋予对应权限。
rabbitmqctl set_permissions -p fll user1 '.*' '.*' '.*'
用户设置所有的配置,读写queue,exchange,分别对应了三个‘.*’
说明:用户权限指的是用户对exchange(交换器),queue(队列)的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。
列出对应权限
rabbitmqctl list_permissions -p fll
-p 指定虚拟主机
清除权限
rabbitmqctl clear_permissions -p fll user2
针对f11的虚拟主机上的用户清除所有权限。
限制
其他
部署rabbitmq集群
rabbitmq是用erlang编写的,本身就支持分布式,通过各个节点的cookie来实现。
集群中有两个概念
内存节点
内存结点需要将所有的元数据信息存储到内存中
元数据:队列,交换器,绑定,虚拟主机等组成了AMQP(高级队列消息协议)的地处被称为元数据。
磁盘节点
磁盘节点也会将元数据存储到内存上,并且会将数据持久化到磁盘,
实验过程
为了确保数据安全性,磁盘节点与内存节点都要存在。
准备前奏
在三台主机上都修改主机名,并且编辑hosts文件实现通过域名访问。
处理内核安全机制与防火墙
yum安装erlang与rabbitmq-server,需要联网
安装完毕后启动并查看进程
可以选择开启web管理插件
在/var/lib/rabbitmq/下有一个隐藏的cookie文件,.erlang.cookie需要保证三台主机的cookie一致,可利用scp拷贝。或者复制都行。最最最重要的要重启系统否则后续无法正常进行。
内存节点需要单独处理
先关闭rabbitmq
rabbitmq stop_app
该命令只会停止掉rabbitmq,并不会停止erlang
将内存节点连接到磁盘节点
rabbitmqctl join_cluster --ram rabbit@mq01
再次开启rabbitmq
rabbitmqctl start_app
查看集群状态
rabbitmqctl cluster_status
全选复制到xmind后即可形成条理性笔记。