想真正学习或者提升自己的ETL领域知识的朋友欢迎进群,一起学习,共同进步。文章底部关注我,公众号后台加我微信入群,备注kettle。
1、MQTT介绍
MQTT (Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,设计用于连接低带宽、高延迟或不可靠网络的设备。
MQTT 是基于发布/订阅模式(Publish/Subscribe)的协议,其中设备可以发布消息到一个主题(Topic),其他设备可以订阅这个主题以接收相关消息。这种模式使得设备之间可以进行灵活的通信,而不需要直接连接到彼此。
常见的支持支持MQTT的中间件有RabbitMQ和ActiveMQ。
2、今天我们基于RabbitMQ来进行学习下MQTT 生产者,如下图所示:
3、通过步骤【生成记录】生成10条记录,记录中有一个message字段,类型为String字符串,message的值为我是java小金刚,如下图所示:
4、通过步骤【MQTT producer】将message值推送到RabbitMQ中供其他应用消费。
Step name:自定义步骤名称
Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)
Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。
Specify topic:选择“指定topic”以输入特定的主题名称,静态指定。
Get data from field:动态指定topic。
Topic name:在主题名称字段中,输入您希望发布流数据(消息)的 MQTT 主题的名称。每个 MQTT 生产者步骤将启动一个单独的线程进行发布。
Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)
Message field:设置消息字段,来源于前置步骤,下拉选择需要的字段。
Username:MQTT服务器的用户名,如admin
Password:MQTT服务器的秘密。
Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。
Keep Alive Interval:指定在 PDI 客户端完成传输一个控制数据包和开始传输下一个数据包之间允许经过的最大间隔秒数。
Max Inflight:指定在任何给定时间点处理中的最大消息数量。
Connection Timeout:指定在未收到消息时断开连接的时间,以秒为单位。
Clean Session:指定broker是否会存储或清除会话的消息。请选择以下之一。
True
当设置为 True 时,经纪人不会存储客户端的任何信息。所有来自先前持久会话的信息都将被清除。
False
当设置为 False 时,经纪人将存储客户端的所有订阅。当 QoS(服务质量)参数设置为 1 或 2 时,所有未接收的消息将被存储。
Storage Level:消息是存储在内存中还是在磁盘上的。
默认(留空)是内存。
对于磁盘,请输入有效路径。
Server URIs:指定 MQTT 服务器的统一资源标识符(URI),如mqtt://example.com:1883
MQTT Version:请指定此步骤连接到的 MQTT 协议版本,如MQTT 3.1.1
Automatic Reconnect:客户端在与服务器断开连接时尝试自动重新连接。请选择 True 或 False:
True
是,尝试重新连接到服务器。
False
否,不尝试重新连接到服务器。
5、RabbitMQ MQTT协议配置,
1)需要先安装然RabbitMQ。
2)启用MQTT插件,通过命令rabbitmq-plugins enable rabbitmq_mqtt进行启动。
3)开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883
端口上了。
6、spoon中运行转换文件,此时发现RabbitMQ中已经成功写入数据,如下图所示: