阅读量:0
C#实现数据采集系统-数据反写
实现步骤
- MQTT订阅,接收消息 链接-MQTT订阅接收消息
- 反写内容写入通信类,添加到写入队列中
- 实现Modbustcp通信写入
具体实现
2. 消息内容写入通信类,添加到写入队列中
在服务类DAqService
中添加通信集合_modbusTcps
用于存储每个设备的通信类,使用键值对Dictionary
存储设备ID和通信类,用于快速查找
然后在启动的时候,订阅各个设备ID的写入主题,添加控制方法DeviceControl
public class DAqService { public static string MainTopic = "DTSDAQ/"; private Dictionary<string, ModbusTcp> _modbusTcps; public DAqService(DAqOption option) { _modbusTcps = new Dictionary<string, ModbusTcp>(); //... } /// <summary> /// 启动服务 /// </summary> public void Start() { MqttControllor = new MqttControllor(_option.MqttConfig); foreach (var item in _deviceLinks) { ModbusTcp modbusTcp = new ModbusTcp(item); modbusTcp.DoMonitor(); modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated; //将 _modbusTcps.Add(item.UID, modbusTcp); MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl); } if (_serviceConfig.IsPushScheduled) { timer.Start(); } } }
实现消息订阅方法-设备控制DeviceControl
处理消息,将消息转换成对应点位和值,然后调用modbustcp的写入方法
<summary> /// 设备控制,反写 /// </summary> /// <param name="topic"></param> /// <param name="msg"></param> private void DeviceControl(string topic, string msg) { var message = JsonSerializer.Deserialize<DeviceMessage>(msg); //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值 if (message != null) { var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象 if (link != null) { var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值 foreach (var item in message.Data) { var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象 if (point != null) { var parseMethod = point.Type.GetMethod( "Parse", BindingFlags.Public | BindingFlags.Static, new[] { typeof(string) } ); point.WriteValue = parseMethod.Invoke( null, new object[] { item.Value.ToString() } ); //通过点位id找到对应的点位对象 } modbusTcp.Write(point); } } } }
在ModbusTcp通信类中,添加一个写入队列和写入方法,写入点位先添加在队列中,然后再读数据间隙中,实现写入
public class ModbusTcp { /// <summary> /// 写入队列 /// </summary> private Queue<RegisterPoint> _writeQueue = new Queue<RegisterPoint>(); · //写入值先加入一个队列 public void Write(RegisterPoint point) { _writeQueue.Enqueue(point); } }
完整代码
public class DAqService { public static string MainTopic = "DTSDAQ/"; private MqttControllor MqttControllor; private Dictionary<string, ModbusTcp> _modbusTcps; private DAqOption _option; private List<DeviceLink> _deviceLinks; private ServiceConfig _serviceConfig; private System.Timers.Timer timer; public DAqService(DAqOption option) { _modbusTcps = new Dictionary<string, ModbusTcp>(); _option = option; _deviceLinks = option.DeviceLinks; _serviceConfig = option.ServiceConfig; timer = new System.Timers.Timer(_serviceConfig.PushTimeSpan * 1000); timer.Elapsed += Timer_Elapsed; } /// <summary> /// 启动服务 /// </summary> public void Start() { MqttControllor = new MqttControllor(_option.MqttConfig); foreach (var item in _deviceLinks) { ModbusTcp modbusTcp = new ModbusTcp(item); modbusTcp.DoMonitor(); modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated; _modbusTcps.Add(item.UID, modbusTcp); MqttControllor.SubscribeTopic($"{MainTopic}{item.UID}/write", DeviceControl); } if (_serviceConfig.IsPushScheduled) { timer.Start(); } } /// <summary> /// 设备控制,反写 /// </summary> /// <param name="topic"></param> /// <param name="msg"></param> private void DeviceControl(string topic, string msg) { var message = JsonSerializer.Deserialize<DeviceMessage>(msg); //如果消息不为null,则通过设备id找到对应的modbustcp对象,并写入值 if (message != null) { var link = _deviceLinks.FirstOrDefault(x => x.UID == message.DeviceId); //通过设备id找到对应的modbustcp对象 if (link != null) { var modbusTcp = _modbusTcps[link.UID]; //通过设备id找到对应的modbustcp对象 //循环消息中的数据对象,然后再设备link对象点位中找到对应的点位对象,并写入值 foreach (var item in message.Data) { var point = link.Points.FirstOrDefault(x => x.UID == item.Key); //通过点位id找到对应的点位对象 if (point != null) { var parseMethod = point.Type.GetMethod( "Parse", BindingFlags.Public | BindingFlags.Static, new[] { typeof(string) } ); point.WriteValue = parseMethod.Invoke( null, new object[] { item.Value.ToString() } ); //通过点位id找到对应的点位对象 } modbusTcp.Write(point); } } } } private void Timer_Elapsed(object? sender, ElapsedEventArgs e) { foreach (var link in _deviceLinks) { try { DeviceMessage device = new DeviceMessage { DeviceId = link.UID }; foreach (RegisterPoint point in link.Points) { // Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}"); device.Data.Add(point.UID, point.Value); } var data = JsonSerializer.Serialize(device); MqttControllor.Publish($"{MainTopic}{link.UID}/Time", data); //定时推送 } catch (Exception ex) { Console.WriteLine(ex.Message); } } } private void ModbusTcp_ValueUpdated(RegisterPoint point, object value) { if (_serviceConfig.IsPushChanged) { try { DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId }; device.Data.Add(point.UID, value); var data = JsonSerializer.Serialize(device); MqttControllor.Publish($"{MainTopic}{point.DeviceId}/Update", data); //采集立刻推送 } catch (Exception ex) { Console.WriteLine(ex.Message); } } Console.WriteLine($"Point:{point.UID}-->Value:{value}"); } }