Multiple consumers on a RabbitMQ queue consume the same message more than once
@author:shengfq
@date:2024-07-09
@category: Data consistency
@title: Multiple consumers on a RabbitMQ queue consume the same message more than once
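System environment
A direct exchange is bound to the business queues through the routing keys START and CONFORM:
  START    queue.xxx.qm.operationStart   quality listener queue for operation start
  CONFORM  queue.xxx.qm.operationEnd     quality listener queue for operation completion
Several qm service instances run at the same time and all of them listen on these queues, so each queue has multiple consumers.
Problem description
[Factory 2010, production environment, order data]
  Order number:   880006861916
  Order status:   130 (in production)
  Order type:     ZF01 (finished-goods order)
  Material code:  130901000711B
Two tasks were generated for the same operation of the same order, although the inspection plan contains only one inspection specification for that operation.
The TKE logs follow. The requestId (2b7eaabea93843dcb9bf6838f8da1ca3) is identical in the entries from both pods, which indicates that the same message was consumed by more than one consumer; the two resulting tasks are JYZJ2024070501875 and JYZJ2024070501876.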
2024-07-05 10:41:10 | project: itrcp20220217183240276 | namespace: khoros-mom-prod | pod: mom-service-qm-9a049-7665f4899b-62sxd | path: stdout | log_data:
2024-07-05 10:41:10.765 [TID: N/A] WARN [-,,,] 1 --- [Executor-128574] c.s.m.q.t.s.h.OperationStartReceiver : OperationStartReceiver msg:{"oprName":"打胶-玻璃装配","scrappedQuantity":0,"materialNo":"130901000711B","wipOrderNo":"880006861916","oprSequenceNo":"0041","goodQuantity":0,"operationWorkStationName":"打胶-玻璃装配","standOprName":"人工组装","wipOrderOperationId":596309434920902660,"requestId":"2b7eaabea93843dcb9bf6838f8da1ca3","workCenterId":579597295600041993,"progressStatus":130,"facilityCode":"2010","facilityId":571301815799623680,"operationWorkStationCode":"2010JS01A0002","completeSystem":"IMOM","operationWorkStationId":579600364052193290,"orderPlanQuantity":1,"materialId":575618864008929280,"creationDate":1720147195174,"workCenterCode":"303910","wipOrderId":596309434912514048,"createdBy":39020,"workType":0,"tenantId":2,"operationType":"START","standSequenceNo":"ZP00009","oprSerialNo":"000000"} UserDetails:CustomUserDetails{userId=39020, username=hecf6, realName='何超锋', email='null', timeZone='null', language='zh_CN', roleId=null, roleIds=null, siteRoleIds=null, tenantRoleIds=null, roleMergeFlag=null, tenantId=2, tenantNum='null', tenantIds=null, imageUrl='null', organizationId=2, isAdmin=null, clientId=null, clientName='null', clientAuthorizedGrantTypes=null, clientResourceIds=null, clientScope=null, clientRegisteredRedirectUri=null, clientAccessTokenValiditySeconds=null, clientRefreshTokenValiditySeconds=null, clientAutoApproveScopes=null, additionInfo={"facility_ids":"571301815799623680","employee_no":"80033656","account_no":"hecf6","facility_name":"2010-华湘工厂","facility_no":"2010","facility_id":"571301815799623680","phone_number":"15062676297","domain_account":"hecf6"}, apiEncryptFlag=null, apiReplayFlag=null, menuIdFlag=null, roleLabels='null}

2024-07-05 10:41:10 | project: itrcp20220217183240276 | namespace: khoros-mom-prod | pod: mom-service-qm-9a049-7665f4899b-62sxd | path: stdout | log_data:
2024-07-05 10:41:10.766 [TID: N/A] INFO [-,,,] 1 --- [Executor-128574] .s.i.BaseInspectionTaskCreateServiceImpl : 开始创建检验任务: wipOrderNo:880006861916 materialNo:130901000711B type:01,02,04

2024-07-05 10:41:11 | project: itrcp20220217183240276 | namespace: khoros-mom-prod | pod: mom-service-qm-9a049-7665f4899b-jpxkb | path: stdout | log_data:
2024-07-05 10:41:10.783 [TID: N/A] INFO [-,,,] 1 --- [Executor-130591] i.InspectionTaskCreateServiceImplProduce : handle additional field in produce: {"objectVersionNumber":1,"oprName":"打胶-玻璃装配","workStationId":579600364052193290,"materialNo":"130901000711B","lastUpdateDate":1720147270779,"_token":"KAGDNz23HetMDAfJJq/9LYpnI/R6ia9Qcr0KvO1kMzQFdY55i+FzAtP46AJMmH1KyuXjf3zheBwdLiwS6ULciHXbNz74wk0jWRr7otOcfXdCgONlfVsDBuyLU6ij1OJZ","productionLineName":"驾驶室装配线","sampleNo":"ZJYB0705018750001","isPassBack":false,"wipOrderNo":"880006861916","oprSequenceNo":"0041","taskExecutionNumber":0,"isFirst":false,"businessNo":"","taskType":"01","wipOrderOperationId":596309434920902660,"workCenterId":579597295600041993,"flex":{},"supplier":"","wipOperationId":596309434920902660,"wipOrderNoReverse":"619168600088","taskNo":"JYZJ2024070501875","id":597014172781752320,"inspectionPlanId":583340904417832960,"workStationCode":"2010JS01A0002","productionLineId":578627703880851457,"lastUpdatedBy":39020,"facilityId":571301815799623680,"quantity":1,"measureUnit":"","materialId":575618864008929280,"creationDate":1720147270779,"exeStatus":false,"inspectionPlanVersion":"2","deleted":false,"workCenterCode":"303910","recordUnqualifiedNumber":false,"createdBy":39020,"productionLineCode":"2010JS01","tenantId":2,"isDevice":false,"businessType":"01","workStationName":"打胶-玻璃装配","status":"2","oprSerialNo":"000000"}

=================================

2024-07-05 10:41:11 | project: itrcp20220217183240276 | namespace: khoros-mom-prod | pod: mom-service-qm-9a049-7665f4899b-jpxkb | path: stdout | log_data:
2024-07-05 10:41:10.697 [TID: N/A] WARN [-,,,] 1 --- [Executor-130591] c.s.m.q.t.s.h.OperationStartReceiver : OperationStartReceiver msg:{"oprName":"打胶-玻璃装配","scrappedQuantity":0,"materialNo":"130901000711B","wipOrderNo":"880006861916","oprSequenceNo":"0041","goodQuantity":0,"operationWorkStationName":"打胶-玻璃装配","standOprName":"人工组装","wipOrderOperationId":596309434920902660,"requestId":"2b7eaabea93843dcb9bf6838f8da1ca3","workCenterId":579597295600041993,"progressStatus":130,"facilityCode":"2010","facilityId":571301815799623680,"operationWorkStationCode":"2010JS01A0002","completeSystem":"IMOM","operationWorkStationId":579600364052193290,"orderPlanQuantity":1,"materialId":575618864008929280,"creationDate":1720147195174,"workCenterCode":"303910","wipOrderId":596309434912514048,"createdBy":39020,"workType":0,"tenantId":2,"operationType":"START","standSequenceNo":"ZP00009","oprSerialNo":"000000"} UserDetails:CustomUserDetails{userId=39020, username=hecf6, realName='何超锋', email='null', timeZone='null', language='zh_CN', roleId=null, roleIds=null, siteRoleIds=null, tenantRoleIds=null, roleMergeFlag=null, tenantId=2, tenantNum='null', tenantIds=null, imageUrl='null', organizationId=2, isAdmin=null, clientId=null, clientName='null', clientAuthorizedGrantTypes=null, clientResourceIds=null, clientScope=null, clientRegisteredRedirectUri=null, clientAccessTokenValiditySeconds=null, clientRefreshTokenValiditySeconds=null, clientAutoApproveScopes=null, additionInfo={"facility_ids":"571301815799623680","employee_no":"80033656","account_no":"hecf6","facility_name":"2010-华湘工厂","facility_no":"2010","facility_id":"571301815799623680","phone_number":"15062676297","domain_account":"hecf6"}, apiEncryptFlag=null, apiReplayFlag=null, menuIdFlag=null, roleLabels='null}

2024-07-05 10:41:11 | project: itrcp20220217183240276 | namespace: khoros-mom-prod | pod: mom-service-qm-9a049-7665f4899b-jpxkb | path: stdout | log_data:
2024-07-05 10:41:10.698 [TID: N/A] INFO [-,,,] 1 --- [Executor-130591] .s.i.BaseInspectionTaskCreateServiceImpl : 开始创建检验任务: wipOrderNo:880006861916 materialNo:130901000711B type:01,02,04

2024-07-05 10:41:11 | project: itrcp20220217183240276 | namespace: khoros-mom-prod | pod: mom-service-qm-9a049-7665f4899b-62sxd | path: stdout | log_data:
2024-07-05 10:41:10.873 [TID: N/A] INFO [-,,,] 1 --- [Executor-128574] i.InspectionTaskCreateServiceImplProduce : handle additional field in produce: {"objectVersionNumber":1,"oprName":"打胶-玻璃装配","workStationId":579600364052193290,"materialNo":"130901000711B","lastUpdateDate":1720147270864,"_token":"KAGDNz23HetMDAfJJq/9LYpnI/R6ia9Qcr0KvO1kMzQFdY55i+FzAtP46AJMmH1KyuXjf3zheBwdLiwS6ULciHXbNz74wk0jWRr7otOcfXdZr1IoJ3ydQQ21EW6nNQiq","productionLineName":"驾驶室装配线","sampleNo":"ZJYB0705018760001","isPassBack":false,"wipOrderNo":"880006861916","oprSequenceNo":"0041","taskExecutionNumber":0,"isFirst":false,"businessNo":"","taskType":"01","wipOrderOperationId":596309434920902660,"workCenterId":579597295600041993,"flex":{},"supplier":"","wipOperationId":596309434920902660,"wipOrderNoReverse":"619168600088","taskNo":"JYZJ2024070501876","id":597014173138264064,"inspectionPlanId":583340904417832960,"workStationCode":"2010JS01A0002","productionLineId":578627703880851457,"lastUpdatedBy":39020,"facilityId":571301815799623680,"quantity":1,"measureUnit":"","materialId":575618864008929280,"creationDate":1720147270864,"exeStatus":false,"inspectionPlanVersion":"2","deleted":false,"workCenterCode":"303910","recordUnqualifiedNumber":false,"createdBy":39020,"productionLineCode":"2010JS01","tenantId":2,"isDevice":false,"businessType":"01","workStationName":"打胶-玻璃装配","status":"2","oprSerialNo":"000000"}
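Why would several consumers consume the same message more than once?
The logs show that one message from a single queue was consumed twice. The task-creation code does check for duplicates, but when several instances consume at almost the same moment, the duplicate check and the insert race with each other: one instance queries the table before the other instance's row has been written, a situation similar to a phantom read. A check-then-insert guard that only works within a single process is therefore largely ineffective in a distributed deployment and fails with some probability.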
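The race can be reproduced without a database. The following is a minimal sketch under stated assumptions, not the actual qm code: an in-memory set stands in for the task table, `checkThenInsert` imitates a query-then-insert guard, and `atomicInsert` imitates what a unique constraint or an atomic insert-if-absent would provide. The class name, method names, and the key format are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the task-creation service; not the real qm code.
class DuplicateCheckRace {
    // Plays the role of the inspection-task table, keyed by order + operation.
    private final Set<String> taskTable = ConcurrentHashMap.newKeySet();
    final AtomicInteger tasksCreated = new AtomicInteger();

    // Query first, insert later. The latch forces both callers to finish the
    // query before either one inserts, which is the interleaving described above.
    void checkThenInsert(String key, CountDownLatch bothChecked) throws InterruptedException {
        boolean exists = taskTable.contains(key);
        bothChecked.countDown();
        bothChecked.await();
        if (!exists) {
            taskTable.add(key);
            tasksCreated.incrementAndGet();
        }
    }

    // Check and insert in one atomic step, as a unique constraint would enforce.
    void atomicInsert(String key) {
        if (taskTable.add(key)) {
            tasksCreated.incrementAndGet();
        }
    }

    // Runs two concurrent "consumers" on the same key; returns the number of tasks created.
    static int runTwoConsumers(boolean atomic) {
        DuplicateCheckRace service = new DuplicateCheckRace();
        CountDownLatch bothChecked = new CountDownLatch(2);
        String key = "880006861916:0041"; // wipOrderNo:oprSequenceNo taken from the logs
        Runnable consumer = () -> {
            try {
                if (atomic) {
                    service.atomicInsert(key);
                } else {
                    service.checkThenInsert(key, bothChecked);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(consumer);
        Thread b = new Thread(consumer);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return service.tasksCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("check-then-insert: " + runTwoConsumers(false)); // prints 2
        System.out.println("atomic insert:     " + runTwoConsumers(true));  // prints 1
    }
}
```

With the guard split into a query and a later insert, both callers see "no task yet" and both create one. When the check and the insert are a single atomic operation, only one caller succeeds.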
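In RabbitMQ, when horizontally scaled instances of a microservice consume from the same queue, bound to a direct exchange with the same routing key, they follow the competing consumers pattern: several consumer instances listen on one queue, and each message is delivered to only one of them. By default RabbitMQ dispatches messages to consumers round-robin; consumer priorities and the prefetch count can be used to influence the distribution.
The consumers therefore compete for messages. A message that is not acknowledged in time can be requeued, for example when the consumer's channel or connection closes before the ACK is sent, and then delivered again, possibly to a different consumer. The same message can thus be processed more than once, and the production logs confirm that this happened.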
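The redelivery mechanism can be illustrated with a toy model. This is only a sketch of the at-least-once behavior described above and does not use the RabbitMQ client; `RedeliveryModel`, its methods, and the consumer names `qm-1` and `qm-2` are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a queue with manual acknowledgements; an assumption-based
// illustration, not a reproduction of RabbitMQ internals.
class RedeliveryModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Set<String> deliveredBefore = new HashSet<>();
    final List<String> deliveries = new ArrayList<>();

    void publish(String body) {
        ready.addLast(body);
    }

    // Hands the head message to `consumer`. If the consumer never acks
    // (acked == false, e.g. its channel closed mid-processing), the message
    // is put back at the head of the queue and flagged as redelivered next time.
    void deliver(String consumer, boolean acked) {
        String body = ready.pollFirst();
        if (body == null) {
            return; // nothing to deliver
        }
        boolean redelivered = !deliveredBefore.add(body);
        deliveries.add(consumer + " got " + body + " redelivered=" + redelivered);
        if (!acked) {
            ready.addFirst(body); // requeue the unacknowledged message
        }
    }

    public static void main(String[] args) {
        RedeliveryModel queue = new RedeliveryModel();
        queue.publish("requestId=2b7e");
        queue.deliver("qm-1", false); // processed, but the ack never arrives
        queue.deliver("qm-2", true);  // same message, second consumer
        queue.deliveries.forEach(System.out::println);
    }
}
```

In this model the business logic runs on both `qm-1` and `qm-2` for one published message, which is why the handler itself has to tolerate duplicates.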
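Root cause
Message handling is not idempotent. Regardless of whether several consumer instances receive the same message, any interface that requires idempotency must implement it in code. It is not acceptable to assume that duplicates are unlikely: even a single occurrence is a system defect. Either consume serially or make message handling idempotent.
Solutions:
Approach 1. Have the consumer listeners consume serially, with the client acknowledging automatically.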
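# application.yml: enable automatic acknowledgement
# (with acknowledge-mode auto, the listener container acks after the listener returns normally)
spring:
  rabbitmq:
    host: xxx
    port: xxx
    username: xx
    password: xx
    listener:
      simple:
        acknowledge-mode: auto
        prefetch: 1
        retry:
          enabled: true

// Declare the queue in single-active-consumer mode when it is initialized
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Queue;

Map<String, Object> args = new HashMap<>();
args.put("x-single-active-consumer", true);
// Queue(name, durable, exclusive, autoDelete, arguments)
amqpAdmin.declareQueue(new Queue(xx, xx, xx, xx, args));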
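Once this is configured, delete the existing queue, because the arguments of a queue that already exists cannot be changed; the qm service recreates it automatically on startup. Afterwards, the queue's consumer list shows only one consumer in the active state.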
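Approach 2. Idempotent message handling
Consumer-side idempotency usually relies on a global ID or another unique identifier (serial number, timestamp, UUID, order number) to decide whether a message has already been consumed. Redis SETNX can serve the same purpose: it sets the key only if it does not exist yet, so only the first attempt succeeds and duplicates are skipped (Redis is the recommended option).
In other words, the consumer takes a global lock before running its business logic. This adds latency to the handler and makes it depend on Redis availability: if Redis goes down, the handler cannot serve requests. Given the current situation inside the company, this is a trade-off that can be weighed.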
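A minimal sketch of the SETNX-style guard follows. To keep it self-contained, an in-memory map stands in for Redis; `DedupStore`, `InMemoryDedupStore`, `IdempotentConsumer`, and the `qm:consumed:` key prefix are hypothetical names, and the 24-hour TTL is an arbitrary assumption. In a Spring application the `setIfAbsent` contract could plausibly be backed by `StringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout)`.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Contract of Redis SET ... NX: only the first caller for a key gets `true`.
interface DedupStore {
    boolean setIfAbsent(String key, Duration ttl);
}

// In-memory stand-in for Redis so the sketch runs on its own.
// It ignores the TTL; a Redis-backed store would expire the key.
class InMemoryDedupStore implements DedupStore {
    private final ConcurrentMap<String, Boolean> claimed = new ConcurrentHashMap<>();

    @Override
    public boolean setIfAbsent(String key, Duration ttl) {
        return claimed.putIfAbsent(key, Boolean.TRUE) == null;
    }
}

// Hypothetical consumer: skips any message whose requestId was already claimed.
class IdempotentConsumer {
    private final DedupStore store;
    final AtomicInteger tasksCreated = new AtomicInteger();

    IdempotentConsumer(DedupStore store) {
        this.store = store;
    }

    // Returns true if the message was processed, false if it was a duplicate.
    boolean onMessage(String requestId) {
        if (!store.setIfAbsent("qm:consumed:" + requestId, Duration.ofHours(24))) {
            return false; // another instance (or an earlier delivery) already took it
        }
        tasksCreated.incrementAndGet(); // stand-in for creating the inspection task
        return true;
    }

    public static void main(String[] args) {
        DedupStore shared = new InMemoryDedupStore(); // shared like a Redis instance
        IdempotentConsumer qm1 = new IdempotentConsumer(shared);
        IdempotentConsumer qm2 = new IdempotentConsumer(shared);
        String requestId = "2b7eaabea93843dcb9bf6838f8da1ca3";
        System.out.println(qm1.onMessage(requestId)); // true
        System.out.println(qm2.onMessage(requestId)); // false
    }
}
```

One design point this sketch leaves out: if the business logic fails after the key has been claimed, a retry of the same message would be skipped. A real implementation would need to release the key on failure or record completion separately.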
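Final choice
Approach 1 was adopted: it is simple to configure, has a small impact scope, and does not depend on Redis.
In the SIT environment, several qm instances were started to simulate production message consumption, and no duplicate consumption was observed.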
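Patch branch: bugfix-62-imom-28381
Defect: imom-28383
Production fix:
Manually delete the queues:
xxx  quality listener queue for operation start
xxx  quality listener queue for operation completion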
References
Data consistency analysis for microservice architectures (微服务架构数据一致性分析)