边缘计算网关项目(含上报进程、32&Modbus采集进程、设备搜索响应进程源码)

avatar
作者
猴君
阅读量:0

目录

边缘层

架构说明

包含知识点

数据上报进程

功能描述

功能开发

上报线程

数据存储线程

指令处理线程

 项目源码

 上报模块.c代码:

 上报模块Makefile代码:

 STM32采集模块.c代码

设备搜索响应模块Linux部分.c代码

 设备搜索响应模块Qt端代码.h

设备搜索响应模块Qt端代码.cpp

Modbus采集进程.c代码


边缘层

随着智能化发展,越来越多的设备需要联网,海量的数据需要汇总处理。传统的云计算压力太大,所以越来越多的任务由云端转到边缘端进行处理。边缘设备是部署在网络边缘侧的高性能嵌入式设备,一般会使用高性能的处理器,搭配Linux或者安卓等智能操作系统来实现。通过网络联接、协议转换等功能联接物理和数字世界,提供轻量化的联接管理、实时数据分析及应用管理功能。比较常见的就是智能家居中智能音箱(蓝牙网关)+路由器(wifi网关),工厂里的工业网关等。它们通常扮演着一个区域内中心网关的功能,即负责终端设备的网络连接,也负责各终端数据的采集以及远程控制。同时,又提供数据上云的功能。

架构说明

  • 多进程设计,遵循软件设计中的高内聚,低耦合。
  • 数据上报进程负责上报规则更新,并根据上报规则将最新的数据上报给客户端。
  • 客户端和上报进程使用mqtt进行通信,具体通信协议遵循json格式。
  • 设备相关的采集进程使用共享内存和上报进程进行通信,采集进程获取到最新数据后,刷新到共享内存,上报进程从共享内存读取后上报即可。
  • 客户端下行控制指令通过mqtt发送给上报进程后,上报进程通过消息队列方式发送给采集进程,采集进程再进行具体设备的控制。
  • 每个进程包含多个线程,可根据具体功能进行再拆分。

包含知识点

两大网联网场景:消费物联网、工业物联网两大场景全覆盖

边缘网关新概念:物联网边缘网关中边缘采集、边缘计算两大主流技术

基础综合运用:C、shell、Makefile、C++、QT、单片机、数据库等基础知识大融合

Linux开发技术点:进程间通信、多线程程序设计、文件操作、网络编程、应用协议

物联网主流通信技术:Json、modbus、mqtt

各种调试工具学习:mqtt.fx、modbus slave、modbus poll、wireshark、网络调试、串口调试等

数据上报进程

功能描述

上报进程主要实现以下功能:
●建立mqtt服务器连接。
●根据上报策略,将采集到的数据上报给客户端。
●分析客户端的设备控制命令,并转发给其它模块。
●接收客户端上报模式修改命令,并写到配置文件永久生效。
●采集的数据点按照一定的时间周期来定时存储到数据库。
●根据上位机要求过滤设备的历史数据并发送给客户端。
上报进程是最先启动的进程,需要连接mqtt服务、建立数据库、解析点表,映射为共享内存的节点以供采集进程使用。主进程(main)完成上述动作后,即可启动不同的线程来处理不同的功能。

功能开发

上报线程

根据上报方式的不同,采用不同的分支策略。

  • 刷新上报

不需要主动轮询,只需要等待客户端的查询指令后,轮询共享内存后上报即可。本质上属于控制指令的响应。

  • 定时上报

根据点表配置的上报周期,按周期轮询共享内存后上报。

  • 变化上报

不断轮询共享内存的节点,并比较新旧值是否相等,如果不相等,单独上报相应的数据点。变化上报需要先主动进行一次刷新上报(这里的主动指网关这边先把所有数据点上报一次,防止有些点不变化导致永远没有机会上报),全部数据点上报一次后,以此为基准进行后续比较、上报。

数据存储线程

按照一定的周期(不用太快,分钟级别即可),将共享内存中的数据点的值存储到数据库中。数据库表记录字段至少应该包括:时间戳、数据点key值、数据点值。存储后等待客户端获取指令即可。

指令处理线程

指令线程逻辑相对复杂,需要区分指令类型,然后根据指令要求做出相应的响应。

  • 如果收到的是控制命令指令,需要区分控制指令发送的目标,这里从客户端拿到的只有设备的key值,可以用过共享内存中结构体的dev_type字段来确定是发给哪个设备,然后通过消息队列发送即可。
  • 如果收到的是刷新上报指令,说明上报方式为刷新上报,此时用户通过客户端发起了刷新请求。只需要获取到共享内存中所有数据点并按照规定的协议,上报给客户端即可。
  • 如果收到的是模式修改指令,代表客户需要修改当前的上报方式,需要解析指令并将新的模式写到配置文件中,等待下次重启生效,这里不需要做进程热重启,真实产品中会直接重启设备。
  • 如果收到的是历史数据请求指令,那么根据客户端时间戳要求(如果有),过滤出数据库中相应数据点的历史数据,并将结果回传给客户端。

 项目源码


 代码仅供参考


 上报模块.c代码:

(数据库是在外部用sql语句创建的)

CREATE TABLE histry(    key INTEGER,    val TEXT,    time TEXT );
 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include "MQTTClient.h" #include "cJSON.h" #include"msg_queue_peer.h" #include"shmem.h" #include <sqlite3.h> #include <time.h>  void mqtt_send(char *p);  #define CLIENTID    "ExampleClientPub"  #define QOS         1 #define TIMEOUT     10000L cJSON *allList; sqlite3 *db; char *errmsg; volatile MQTTClient_deliveryToken deliveredtoken; MQTTClient_deliveryToken token; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient client; char ADDRESS[32]; int PERIOD=0;  union val_t {      int b_val;  //bool类型存储空间     int i_val;   //整形值存储空间     float f_val;  //浮点值存储空间 };  struct std_node { 	int key;  //唯一键值 	int type;  //数据点类型     int dev_type;  //数据点属于哪个设备,根据网关支持的设备自行定义 	union val_t old_val;  //变化上报后需要更新旧值     union val_t new_val;  //从共享内存取出最新数据,放到new_val中     int ret;  //默认为-1,采集成功后设置为0,采集失败再置-1 };  struct msg {     long type; // 消息类型,用于区分32与modbus设备     int key;     union val_t val; };  int *num =NULL; struct std_node *std = NULL;  void delivered(void *context, MQTTClient_deliveryToken dt) {     deliveredtoken = dt; }  void sendAllData() {     cJSON *root = cJSON_CreateObject();     cJSON_AddItemToObject(root, "type", cJSON_CreateNumber(1));     cJSON_AddItemToObject(root, "result", cJSON_CreateNumber(0));     cJSON *arr = cJSON_CreateArray();     cJSON_AddItemToObject(root, "data", arr);      for(int i =0 ;i<*num;i++)     {         cJSON *data1 = cJSON_CreateObject();         cJSON_AddItemToArray(arr, data1);          char buf[8]="";          if( (std+i)->type==1 || (std+i)->type==2 )             sprintf(buf,"%d",(std+i)->new_val.i_val);         else              sprintf(buf,"%.1f",(std+i)->new_val.f_val);          cJSON_AddItemToObject(data1, "key", cJSON_CreateNumber((std+i)->key));         cJSON_AddItemToObject(data1, "val", cJSON_CreateString(buf));     }     char *p = cJSON_Print(root);     mqtt_send(p);     printf("上报\n");     free(p);     cJSON_Delete(root); }   void setDev(cJSON *key,cJSON *val) {     struct msg msg_set;     msg_set.key=key->valueint;     for(int i=0;i<*num;i++)     {         if((std+i)->key == key->valueint)         {                          if((std+i)->dev_type == 0)             {                 msg_set.type = 3;             }else if((std+i)->dev_type==1)             {                 msg_set.type =1;             }             char *ch = val->valuestring;             msg_set.val.b_val=*ch-'0';             printf("key=%d\n",msg_set.key);             printf("type=%ld\n",msg_set.type);                 printf("val=%d\n",msg_set.val.b_val);            msg_queue_send("set_msg", &msg_set, sizeof(msg_set), 0);         }     }      printf("setDev ok\n");  } void setAllList(int mod,int period) {     printf("setAllList ok\n");     cJSON *Report = cJSON_GetObjectItem(allList, "report");     cJSON_ReplaceItemInObject(Report,"type",cJSON_CreateNumber(mod));     cJSON_ReplaceItemInObject(Report,"period",cJSON_CreateNumber(period));     char *p = cJSON_Print(allList);     FILE * fp;     fp = fopen("./node.json","w");     if(fp == NULL)     {         perror("fopen err");         return ;     }     fwrite(p, strlen(p), 1,fp);     fclose(fp);  }   int key_his=0; pthread_t tid_his; void *getHistory(void *arg)  {     char sql[68] = "";     sprintf(sql,"SELECT * FROM histry WHERE key=%d;",key_his);     int KEY = key_his;     char **resultp;     int n_row, n_cloum;     int i, j;     if (sqlite3_get_table(db, sql, &resultp, &n_row, &n_cloum, &errmsg) != SQLITE_OK)     {         printf("err:%s\n", errmsg);         return NULL;     }      char **p;     p = resultp + n_cloum;     char data[64]="";     for (i = 0; i < n_row; i++)     {         sprintf(data,"key:%s  val:%s time:%s",p[(i * n_cloum) + 0],p[(i * n_cloum) + 1],p[(i * n_cloum) + 2]);          cJSON *root = cJSON_CreateObject();         cJSON_AddItemToObject(root, "type", cJSON_CreateNumber(4));         cJSON_AddItemToObject(root, "result", cJSON_CreateNumber(0));         cJSON_AddItemToObject(root, "key", cJSON_CreateNumber(KEY));         cJSON_AddItemToObject(root, "data", cJSON_CreateString(data));         char *p = cJSON_Print(root);         mqtt_send(p);         free(p);         printf("找到一条历史记录\n");     }     printf("历史记录:%d条\n",n_row);  }  int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {     int i;     char* payloadptr;      payloadptr = message->payload;     //** analysis command***//     cJSON *root = cJSON_Parse(payloadptr);     cJSON *type = cJSON_GetObjectItem(root, "type");      if(type->valueint==1)     {         sendAllData();      }else if(type->valueint == 2)     {         cJSON *data = cJSON_GetObjectItem(root, "data");         cJSON *key = cJSON_GetObjectItem(data, "key");         cJSON *val = cJSON_GetObjectItem(data, "val");         setDev(key,val);      }else if(type->valueint == 3)     {         cJSON *data = cJSON_GetObjectItem(root, "data");         cJSON *type_mod = cJSON_GetObjectItem(data, "type");         cJSON *type_period = cJSON_GetObjectItem(data, "period");         setAllList(type_mod->valueint,type_period->valueint);     }else if(type->valueint == 4)     {         cJSON *data = cJSON_GetObjectItem(root, "data");         cJSON *key = cJSON_GetObjectItem(data, "key");         // cJSON *limit = cJSON_GetObjectItem(data, "limit");         key_his=key->valueint;         pthread_create(&tid_his, NULL, getHistory, NULL);         // getHistory(key->valueint);     }      cJSON_Delete(root);     MQTTClient_freeMessage(&message);     MQTTClient_free(topicName);      return 1; }  void connlost(void *context, char *cause) {     printf("\nConnection lost\n");     printf("     cause: %s\n", cause); }       void mqtt_send(char *p)     {         pubmsg.qos = QOS;         pubmsg.retained = 0;         pubmsg.payload = p;         pubmsg.payloadlen = (int)strlen(p);         MQTTClient_publishMessage(client, "/app/data/up", &pubmsg, &token);     }  void *up_period(void *arg)  {     while (1)     {         sleep(PERIOD);                  sendAllData();     } }   void *up_vary(void *arg) {     cJSON *root = cJSON_CreateObject();     cJSON_AddItemToObject(root, "type", cJSON_CreateNumber(1));     cJSON_AddItemToObject(root, "result", cJSON_CreateNumber(0));     cJSON *arr = cJSON_CreateArray();     cJSON_AddItemToObject(root, "data", arr);     cJSON *data1 = cJSON_CreateObject();     cJSON_AddItemToArray(arr, data1);     cJSON_AddItemToObject(data1, "key", cJSON_CreateNumber(0));     cJSON_AddItemToObject(data1, "val", cJSON_CreateString(""));      while (1)     {         for(int i =0 ;i<*num;i++)         {             if(memcmp(&(std+i)->old_val,&(std+i)->new_val ,sizeof(union val_t)) != 0)             {                 char buf[8]="";                 if( (std+i)->type==1 || (std+i)->type==2 )                 {                     sprintf(buf,"%d",(std+i)->new_val.i_val);                     printf("new:%d\n",(std+i)->new_val.i_val);                     printf("old:%d\n",(std+i)->old_val.i_val);                 }                 else                  {                     sprintf(buf,"%.1f",(std+i)->new_val.f_val);                 printf("new:%f\n",(std+i)->new_val.f_val);                 printf("old:%f\n",(std+i)->old_val.f_val);                 }                 cJSON_ReplaceItemInObject(data1,"key",cJSON_CreateNumber((std+i)->key));                 cJSON_ReplaceItemInObject(data1,"val",cJSON_CreateString(buf));                 char *p = cJSON_Print(root);                 mqtt_send(p);                 free(p);                 (std+i)->old_val.i_val = (std+i)->new_val.i_val;                 printf("数据有变化\n");             }         }     }     cJSON_Delete(root); }   int main(int argc, char* argv[]) {     time_t raw_time;      // ******get list****     FILE * fp;     pthread_t tid;     char buf[1024] = "";     fp = fopen("./node.json","r");     if(fp == NULL)     {         perror("fopen err");         return -1;     }     fread(buf,2048,1,fp);     fclose(fp);      //**** open&init db ***//          sqlite3_open("Histry.db", &db);       //******* shmem init *******//     struct shm_param para;     if(shm_init(&para,"SHMEM",512)<0)     {         perror("init err");         return 0;     }     num = shm_getaddr(&para);     if(num==NULL)     {         perror("getaddr err");         return 0;     }     std = (struct std_node*)(num+1);              allList = cJSON_Parse(buf);     cJSON *Stm32 = cJSON_GetObjectItem(allList, "stm32");     cJSON *Data_32 = cJSON_GetObjectItem(Stm32, "data");      cJSON *Modbus = cJSON_GetObjectItem(allList, "modbus");     cJSON *Data_modbus = cJSON_GetObjectItem(Modbus, "data");     *num = cJSON_GetArraySize(Data_32) + cJSON_GetArraySize(Data_modbus);     struct std_node *temp=std;     for(int i = 0; i < cJSON_GetArraySize(Data_32); i++)     {         cJSON *item = cJSON_GetArrayItem(Data_32, i);         cJSON *t = cJSON_GetObjectItem(item, "key");         temp->key = t->valueint;         t = cJSON_GetObjectItem(item, "type");         temp->type = t->valueint;         temp->dev_type = 0;         temp->ret = -1;         temp++;     }     for(int i = 0; i < cJSON_GetArraySize(Data_modbus); i++)     {         cJSON *item = cJSON_GetArrayItem(Data_modbus, i);         cJSON *t = cJSON_GetObjectItem(item, "key");         temp->key = t->valueint;         t = cJSON_GetObjectItem(item, "type");         temp->type = t->valueint;         temp->dev_type = 1;         temp->ret = -1;         temp++;     }     temp = NULL;   //***** connect mqtt *******//          cJSON *mqtt_server = cJSON_GetObjectItem(allList, "mqtt_server");     cJSON *addr = cJSON_GetObjectItem(mqtt_server, "addr");     cJSON *port = cJSON_GetObjectItem(mqtt_server, "port");     sprintf(ADDRESS,"tcp://%s:%d",addr->valuestring,port->valueint);     // MqttConnect();     MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;     int rc;      MQTTClient_create(&client, ADDRESS, CLIENTID,         MQTTCLIENT_PERSISTENCE_NONE, NULL);     conn_opts.keepAliveInterval = 20;     conn_opts.cleansession = 1;     MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);      if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)     {         printf("Failed to connect, return code %d\n", rc);         exit(EXIT_FAILURE);     }      MQTTClient_subscribe(client, "/app/data/down", QOS);        // ******** UpMod*******      cJSON *report = cJSON_GetObjectItem(allList, "report");     cJSON *up_mod = cJSON_GetObjectItem(report, "type");     if(up_mod->valueint == 0) // no     {         printf("当前上报模式:不上报\n");     }else if(up_mod->valueint == 1) // vary      {         printf("当前上报模式:变化上报\n");         pthread_create(&tid, NULL, up_vary, NULL);      }else if(up_mod->valueint == 2) // period     {         printf("当前上报模式:周期上报\n");         cJSON *period = cJSON_GetObjectItem(report, "period");         PERIOD=period->valueint;         pthread_create(&tid, NULL, up_period, NULL);     }              while (1)     {         sleep(60);         time(&raw_time);         struct tm *time_info = localtime(&raw_time);         char * time = asctime(time_info);         char sql[128]="";         for(int i = 0;i<*num;i++)         {             char val[8]="";             sprintf(val,"%d",(std+i)->new_val.i_val);             sprintf(sql,"INSERT INTO histry VALUES (%d,'%s','%s');",(std+i)->key,val ,time);             sqlite3_exec(db, sql, NULL, NULL, &errmsg);         }                  }          return 0; } 

 上报模块Makefile代码:

#指定生成的文件名 OJB_OUT = test  #指定每一个c文件对应的.o文件 OBJS = cJSON.o UpData.o msg_queue_peer.o shmem.o  #指定编译器 CC = gcc  #指定需要的库 ULDFLAGS = -lpthread -lm -lpaho-mqtt3c -lsqlite3  ########################################### #以下的内容不需要修改 ########################################### all:$(OJB_OUT)  $(OJB_OUT):$(OBJS) 	$(CC) -o $@ $^ $(ULDFLAGS)  dep_files := $(foreach f,$(OBJS),.$(f).d) dep_files := $(wildcard $(dep_files))  ifneq ($(dep_files),)   include $(dep_files) endif  %.o:%.c 	$(CC) -Wp,-MD,.$@.d -c $< -o $@      clean: 	rm -rf .*.o.d *.o $(OJB_OUT) 

 STM32采集模块.c代码

#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <sys/types.h>  /* See NOTES */ #include <netinet/ip.h> /* superset of previous */ #include <string.h> #include <unistd.h> #include "cJSON.h" #include <stdlib.h> #include <errno.h> #include <strings.h> #include <pthread.h> #include <sys/select.h> #include <sys/time.h> #include <sys/stat.h> #include <fcntl.h> #include "shmem.h" #include "msg_queue_peer.h"  #define N 1024 /***************************************共享内存***********************************/ union val_t {     int b_val;   //bool类型存储空间     int i_val;   //整形值存储空间     float f_val; //浮点值存储空间 };  struct std_node {     int key;             //唯一键值     int type;            //数据点类型     int dev_type;        //数据点属于哪个设备,根据网关支持的设备自行定义     union val_t old_val; //变化上报后需要更新旧值     union val_t new_val; //从共享内存取出最新数据,放到new_val中     int ret;             //默认为-1,采集成功后设置为0,采集失败再置-1 };  /***************************************消息队列***********************************/ struct msg //消息队列的消息类型 {     long type;       //类型 0:STM32  1:modbus     int key;         //key 值     union val_t val; //数据(共用体类型) };         struct shm_param para; //反序列化         struct std_node *st=NULL;         int *num = NULL;  void aliviar(char *buf) //解除序列化 {     cJSON *root = cJSON_Parse(buf);     cJSON *item = cJSON_GetObjectItem(root, "data");     int size = cJSON_GetArraySize(item);     // printf("%d",size);     cJSON *arr;      for (int i = 0; i < size; i++)     {         arr = cJSON_GetArrayItem(item, i);         cJSON *arr1 = cJSON_GetObjectItem(arr, "key");         cJSON *arr2 = cJSON_GetObjectItem(arr, "name");         cJSON *arr3 = cJSON_GetObjectItem(arr, "val");          printf("%s:%s\n", arr1->string, arr1->valuestring); //key   int         printf("%s:%s\n", arr2->string, arr2->valuestring); // name string          if (strcmp(arr2->valuestring, "temp") == 0)         {             printf("%s:%.1f\n", arr3->string, arr3->valuedouble);         }         else         {             printf("%s:%d\n", arr3->string, arr3->valueint); //val double         }          int key = atoi(arr1->valuestring);         /*存入共享内存 */          for (int i = 0; i < *num; i++)         {             if ((st + i)->key == key)             {                 if (key == 202)                 {                     (st + i)->new_val.f_val = arr3->valuedouble;                 }                  else if (key == 201)                 {                     (st + i)->new_val.f_val = arr3->valuedouble;                 }                 else                 {                     (st + i)->new_val.i_val = arr3->valueint;                 }             }         }      }      cJSON_Delete(root); } int clientfd = 0;  //线程执行从消息对列中读取数据并发送 void *handler_thread(void *arg) {     struct msg msg1;     while (1)     {         printf("111\n");         long type = 3;         //用消息队列接收指令         msg_queue_recv("set_msg", &msg1, sizeof(struct msg), type, 0);         printf("%d\n", msg1.key);         printf("msg=%d\n", msg1.val.i_val);         if (msg1.key == 203)         {                      printf("%d\n", msg1.key);         printf("msg=%d\n", msg1.val.i_val);             cJSON *root = cJSON_CreateObject();             cJSON_AddItemToObject(root, "key", cJSON_CreateString("203"));             cJSON_AddItemToObject(root, "val", cJSON_CreateNumber(msg1.val.i_val));              char *p = cJSON_PrintUnformatted(root);              printf("p=%s\n", p);             int res= send(clientfd, p, 128, 0);             printf("%d\n",res);                free(p);             cJSON_Delete(root);         }     }      pthread_exit(NULL); //退出线程      return NULL; }  int main(int argc, char const *argv[]) {     if (shm_init(&para, "SHMEM", 512) < 0)         {             perror("shm_init err\n");             return 0;         }         if ((num = shm_getaddr(&para)) == NULL)         {             perror("shm getaddr err");             return 0;         }         st = (struct std_node *)(++num);       //创建流式套接     int serverfd = socket(AF_INET, SOCK_STREAM, 0);     if (serverfd < 0)     {         perror("socket err");         return -1;     }     //绑定自己的地址     struct sockaddr_in myaddr;     socklen_t addrlen = sizeof(myaddr);     memset(&myaddr, 0, addrlen);     myaddr.sin_family = AF_INET;     myaddr.sin_port = htons(8888);     myaddr.sin_addr.s_addr = INADDR_ANY;     int ret = bind(serverfd, (struct sockaddr *)&myaddr, addrlen);     if (ret < 0)     {         perror("bind err");         return -1;     }      //启动监听     ret = listen(serverfd, 5);     if (ret < 0)     {         perror(";istenerr");         return -1;     }      //构建文件描述符     fd_set readfd, tmpreadfd;     FD_ZERO(&readfd);     FD_SET(serverfd, &readfd);      int maxfd = serverfd;      pthread_t tid;     if (pthread_create(&tid, NULL, handler_thread, NULL) != 0)     {         printf("create thread err");         return -1;     }      pthread_detach(tid);      while (1) //创建监听表监听单片机发送的数据并存入共享内存     {         //临时监听表         tmpreadfd = readfd;         select(maxfd + 1, &tmpreadfd, NULL, NULL, NULL);         for (int i = 0; i < maxfd + 1; i++)         {             //判断此时描述符有数据吗             if (FD_ISSET(i, &tmpreadfd))             {                 //判断这个描述符是谁?服务器/客户端                 if (i == serverfd)                 {                     clientfd = accept(serverfd, NULL, NULL);                     printf("new client fd=%d\n", clientfd);                     //把描述符加入表                     FD_SET(clientfd, &readfd);                     if (clientfd > maxfd)                     {                         //更新maxfd                         maxfd = clientfd;                     }                 }                 else                 {                     char buf[N] = {};                     bzero(buf, N);                     int len = read(i, buf, N);                     if (len < 0)                     {                         perror("read err");                         break;                     }                     else if (len == 0) //如果客户端退出则退出监听表的描述符                     {                         printf("client %dquit\n", i);                         FD_CLR(i, &readfd);                         close(i);                         break;                     }                     else //接受到的数据处理                     {                         //打印读到的数据                         printf("read from %d client=%s\n", i, buf);                         //  printf("%s\n", buf);                          //反序列化//并发生给共享内存                         aliviar(buf);                          printf("%s\n", buf);                          fflush(NULL);                     }                 }             }         }     }     close(serverfd);     return 0; }

设备搜索响应模块Linux部分.c代码

#include <stdio.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <netinet/in.h> #include <netinet/ip.h> /* superset of previous */ #include <arpa/inet.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <string.h> #include <pthread.h> #include "cJSON.h"  #define N 64 void *new_sock(void *arg) {     printf("will recv new connect\n");     int sockfd, qtfilefd;     char buf[1200];     int addrlen = sizeof(struct sockaddr);     struct sockaddr_in addr, clientaddr;     int nbytes;     // 1创建一个套接字--socket     sockfd = socket(AF_INET, SOCK_STREAM, 0);     if (sockfd < 0)     {         perror("socket err");         exit(-1);     }      // 2定义套接字地址--sockaddr_in     bzero(&addr, addrlen);     addr.sin_family = AF_INET;     addr.sin_addr.s_addr = inet_addr("192.168.50.132");     addr.sin_port = htons(6666); //新的连接用6666端口      // 3绑定套接字--bind     if (bind(sockfd, (struct sockaddr *)&addr, addrlen) < 0)     {         perror("bind err");         exit(-1);     }      // 4启动监听--listen     if (listen(sockfd, 5) < 0)     {         perror("listen err");         exit(-1);     }      // 5接收连接--accept     qtfilefd = accept(sockfd, (struct sockaddr *)&clientaddr, &addrlen);     if (qtfilefd < 0)     {         perror("accept err");         exit(-1);     }     //第二个tcp接收到QT链接     printf("recv qt client\n");      // 6收发点表--recv/send     bzero(buf, 1200);     if (recv(qtfilefd, buf, 1200, 0) > 0)     {         printf("copy....\n");         //拷贝配置文件         int fd = open("/home/hq/project/port.json", O_WRONLY | O_CREAT | O_TRUNC, 0777);         write(fd, buf, 1048);          printf("copy success!\n");     }     else     {         perror("copy err\n");     }     pthread_exit(NULL); } int main(int argc, char const *argv[]) {     int broadfd;      //创建一个socket文件描述符     broadfd = socket(AF_INET, SOCK_DGRAM, 0);     if (broadfd < 0)     {         perror("sock err");         return -1;     }     //绑定套接字(ip+port)     struct sockaddr_in addr;     addr.sin_family = AF_INET;     addr.sin_port = htons(8888); //端口号     addr.sin_addr.s_addr = INADDR_ANY;      int addrlen = sizeof(addr);      if (bind(broadfd, (struct sockaddr *)&addr, addrlen) < 0)     {         perror("bind err");         return -1;     }      ssize_t len;     char buf[N] = {0};     struct sockaddr_in cliaddr;      //接收"coffee"搜索包     bzero(buf, N);     len = recvfrom(broadfd, buf, N, 0, (struct sockaddr *)&cliaddr, &addrlen);      //判断是否是本公司产品:收到的数据是否"group"     if (strcmp(buf, "group") != 0)     {         printf("not my group\n");         return -1;     }      //回复yes,告诉软件,我收到了搜索协议,并且回复地址     sendto(broadfd, "yes", 4, 0, (struct sockaddr *)&cliaddr, addrlen);     bzero(buf, N);      //变身为TCP服务器,准备接收软件的升级文件     int tcpfd = socket(AF_INET, SOCK_STREAM, 0);     if (tcpfd < 0)     {         perror("sock err");         return -1;     }      if (bind(tcpfd, (struct sockaddr *)&addr, addrlen) < 0)     {         perror("bind err");         return -1;     }      //监听套接字     if (listen(tcpfd, 5) < 0)     {         perror("listen err");         return -1;     }      //接收客户端的连接     printf("wait qt connect\n");     int clifd;      //接收对端的地址     clifd = accept(tcpfd, NULL, NULL);     if (clifd < 0)     {         perror("accept err");         return -1;     }      //接收到QT端链接     printf("qt cononect coming\n");      send(clifd, "yes", 3, 0);     // bzero(buf, N);     // recv(clifd, buf, 4, 0);     // printf("%s\n", buf);     while (1)     {         bzero(buf, N);         int len = recv(clifd, buf, N, 0);         cJSON *root = cJSON_Parse(buf);         cJSON *item = cJSON_GetObjectItem(root, "type");         int type = item->valueint;         if (type == 1)         {             printf("tcp链接成功\n");             pthread_t tid;             pthread_create(&tid, NULL, new_sock, NULL);         }         else if (type == 3)         {             if (send(clifd, "living", 7, 0) < 0)             {                 perror("send err\n");             }         }     }      return 0; } 

 设备搜索响应模块Qt端代码.h

#ifndef M0WIG_H #define M0WIG_H   #include <QWidget> #define UDPIP "192.168.50.255" #include <QDebug> #include <QMessageBox> //UDP库 #include <QUdpSocket> #include <QHostAddress> //TCP库 #include <QTcpSocket> //文本流 #include <QTextStream> #include <QFile> //定时类 #include <QTimer> //QJSON类 #include <QJsonDocument> #include <QJsonObject>  namespace Ui { class M0Wig; }  class M0Wig : public QWidget {     Q_OBJECT  public:     explicit M0Wig(QWidget *parent = 0);     ~M0Wig();  private:     Ui::M0Wig *ui;      QUdpSocket *udpSocket;//udp socket对象     QTcpSocket *socket;//TCP客户端对象     QTcpSocket *fliesocket;//tcp文件对象     QString tcpip;//服务器IP地址     QTimer *timer;//心跳包定时器  private slots:     void btnSearchSlot();     void n1tcpReadSlot();     void btnLoginSlot();     void udpRecvSlot();//接收广播回复     void connectFileSlot();     void heartTimerSlot(); };  #endif // M0WIG_H 

设备搜索响应模块Qt端代码.cpp

#include "m0wig.h" #include "ui_m0wig.h"  //json对象 QJsonObject json;  M0Wig::M0Wig(QWidget *parent) :     QWidget(parent),     ui(new Ui::M0Wig) {     ui->setupUi(this);      //创建udp对象     udpSocket = new QUdpSocket(this);      //创建tcp客户端对象     socket = new QTcpSocket(this);     fliesocket=new QTcpSocket(this);     //发送广播槽函数     connect(ui->pushButtonSearch,SIGNAL(clicked()),this,SLOT(btnSearchSlot()));     //接收广播回复     connect(udpSocket, SIGNAL(readyRead()), this,SLOT(udpRecvSlot()));     //建立第一个TCP连接     connect(ui->pushButtonLogin,SIGNAL(clicked()),this,SLOT(btnLoginSlot()));     //接收服务器数据     connect(socket,SIGNAL(readyRead()),this,SLOT(n1tcpReadSlot()));     //第二个tcp发送文件     connect(fliesocket,SIGNAL(connected()),this,SLOT(connectFileSlot()));     // 创建定时器,每隔1秒发送心跳包     timer = new QTimer(this);     connect(timer, SIGNAL(timeout()), this,SLOT(heartTimerSlot()));     timer->start(1000); // 1000毫秒为间隔 }  M0Wig::~M0Wig() {     delete ui;     //关闭广播     udpSocket->close(); } //进行UDP广播 void M0Wig::btnSearchSlot() {     //绑定广播地址     QHostAddress broadcastAddress(UDPIP);      QByteArray datagram = "group"; // 你想要广播的消息      if (udpSocket->writeDatagram(datagram, datagram.size(), broadcastAddress, 8888)) { // 8888是端口号         // 消息成功发送         qDebug()<<"广播成功";     } else {         // 消息发送失败         qDebug()<<"广播失败";     } }  void M0Wig::n1tcpReadSlot() {     QByteArray buffer = socket->readAll();     // QByteArray → QString     QString text(buffer);     if(text=="yes")     {         QMessageBox::information(this,"信息","tcp连接成功");         json["type"] = 1;         // 创建JSON文档         QJsonDocument jsonDoc(json);         // 将JSON文档转换为字节数组         QByteArray jsonData = jsonDoc.toJson();         // 发送JSON数据         socket->write(jsonData); //        text="port"; //        QTextStream output(socket); //        output << text;         //建立第二个TCP连接         fliesocket->connectToHost(tcpip,6666);     }     else if(text == "living")     {         qDebug()<<"process living";     }     else         qDebug()<<"tcp连接失败";   } //建立TCP连接 void M0Wig::btnLoginSlot() {     //建立第一个TCP连接     socket->connectToHost(tcpip,8888); }  void M0Wig::udpRecvSlot() {     while (udpSocket->hasPendingDatagrams()) {         QByteArray datagram;         datagram.resize(udpSocket->pendingDatagramSize());         QHostAddress sender;         quint16 senderPort;         udpSocket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort);         tcpip = sender.toString();//获取服务器地址         // 打印发送者地址和接收到的数据         qDebug() << "Received datagram from:" << sender.toString() << datagram.data();         ui->textBrowser->append(sender.toString());         udpSocket->close();//关闭广播     } }  void M0Wig::connectFileSlot() {     QFile readFile(":/new/prefix1/js.json");     readFile.open(QIODevice::ReadOnly);     QByteArray file;     qint64 totalSize = readFile.size();     qDebug()<<totalSize;     file = readFile.read(totalSize);     qDebug()<<file;     fliesocket->write(file); }  void M0Wig::heartTimerSlot() {         // 添加其他键值对         json["type"] = 3;         // 创建JSON文档         QJsonDocument jsonDoc(json);          // 将JSON文档转换为字节数组         QByteArray jsonData = jsonDoc.toJson();          // 发送JSON数据         socket->write(jsonData);          // 等待数据发送完毕         if (socket->waitForBytesWritten()) {              qDebug() << "JSON data sent successfully1";         } else {             qDebug() << "Failed to send JSON data";         } } 

Modbus采集进程.c代码

#include <stdio.h> #include <string.h> #include <stdlib.h> #include <modbus.h> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> #include <pthread.h> #include <sys/ipc.h> #include <sys/shm.h> #include <errno.h> #include "msg_queue_peer.h" #include "shmem.h"  #define MODBUS_IP "192.168.50.249" #define MODBUS_PORT 502  union val_t {     int b_val;   //bool类型存储空间     int i_val;   //整形值存储空间     float f_val; //浮点值存储空间 }; struct msg //消息队列的消息类型 {     long type;       //类型 3:STM32  1:modbus     int key;         //key 值     union val_t val; //数据(共用体类型) }; struct std_node {     int key;             //唯一键值     int type;            //数据点类型     int dev_type;        //数据点属于哪个设备,根据网关支持的设备自行定义     union val_t old_val; //变化上报后需要更新旧值     union val_t new_val; //从共享内存取出最新数据,放到new_val中     int ret;             //默认为-1,采集成功后设置为0,采集失败再置-1 }; modbus_t *ctx; // modbus读取设备线程 void *modbus_thread(void *arg) {          //初始化创建共享内存     struct shm_param para;     if (shm_init(&para, "SHMEM", 512))     {         perror("shm_init err\n");     }     int *num = shm_getaddr(&para); //指针指向共享内存的地址->映射     if (NULL == num)     {         perror("shm getaddr err");     }     struct std_node *std;     std=(struct student*)(num+1);     uint16_t buf[1024] = {};     uint16_t dataa[128] = {};     uint8_t data[128] = {};     while (1) //循环检测modbus salve里线圈和寄存器信息     {         sleep(2); //联调后把延时和打印注释掉         //读保持寄存器 功能码03         //ctx   :Modbus实例         //addr :寄存器起始地址         //nb   :寄存器个数         //dest :得到的状态值         modbus_read_registers(ctx, 0, 10, dataa);         // printf("湿度:%d\n光照强度:%d\nCO2浓度:%d\n", dataa[0], dataa[2], dataa[4]);         //读线圈 功能码01         modbus_read_bits(ctx, 0, 10, data);         //printf("水泵开关:%d 灯光开关:%d 排气扇:%d\n",data[0], data[1], data[2]);         //printf("num=%d\n", *num);         for (int i = 0; i < *num; i++)         {             if ((std + i)->key == 101) //土壤湿度             {                 (std+i)->new_val.i_val=dataa[0];             }             if ((std + i)->key == 102) //光照强调             {                 (std + i)->new_val.i_val = dataa[2];             }             if ((std + i)->key == 105) //CO2浓度             {                 (std + i)->new_val.i_val = dataa[4];             }                          if ((std + i)->key == 104) //开关水泵             {                 (std + i)->new_val.i_val = data[0];             }             if ((std + i)->key == 103) //开关灯             {                 (std + i)->new_val.i_val = data[1];             }             if ((std + i)->key == 106) //排气扇             {                 (std + i)->new_val.i_val = data[2];             }         }         for (int i = 0; i < *num; i++)         {                          if ((std + i)->key == 101) //土壤湿度             {                 printf("土壤湿度:%d\n", (std + i)->new_val.i_val);             }             if ((std + i)->key == 102) //光照强调             {                 printf("光照强度:%d\n", (std + i)->new_val.i_val);             }             if ((std + i)->key == 105) //CO2浓度             {                 printf("CO2浓度:%d\n", (std + i)->new_val.i_val);             }               if ((std + i)->key == 104) //开关水泵             {                 printf("水泵状态:%d\n", (std + i)->new_val.i_val);             }             if ((std + i)->key == 103) //开关灯             {                 printf("灯状态:%d\n", (std + i)->new_val.i_val);             }             if ((std + i)->key == 106) //排气扇             {                 printf("排气扇状态:%d\n", (std + i)->new_val.i_val);             }             putchar(10);         }         memset(dataa, 0, sizeof(dataa));         memset(data, 0, sizeof(data));     }     shm_del(&para);     pthread_exit(NULL); } //modbus写入线圈 void *modbus_write(void *arg) {     struct msg msg1;     while (1)     {         printf("111\n");         //用消息队列接收指令         msg_queue_recv("set_msg",&msg1,sizeof(struct msg),1,0);         printf("msg1.key=%d\n",msg1.key);         // modbus_write_bit(ctx, 0, msg1->val.i_val);         printf("msg1.val.i_val=%d\n",msg1.val.i_val);         if (msg1.key==104)         {             modbus_write_bit(ctx,0,msg1.val.i_val);         }         if (msg1.key==103)         {             modbus_write_bit(ctx,1,msg1.val.i_val);         }         if (msg1.key==106)         {             modbus_write_bit(ctx,2,msg1.val.i_val);          }        }     pthread_exit(NULL);     //pause(); } int main(int argc, char const *argv[]) {     ctx = modbus_new_tcp(MODBUS_IP, MODBUS_PORT);     if (ctx == NULL)     {         perror("modbus create err");         return -1;     }     if (modbus_set_slave(ctx, 1) != 0)     {         perror("set id err");         return -1;     }     if (modbus_connect(ctx) != 0)     {         perror("connect err");         return -1;     }     pthread_t tid, tid1;     pthread_create(&tid, NULL, modbus_thread, NULL);     pthread_create(&tid1, NULL, modbus_write, NULL);     pthread_detach(tid);     pthread_detach(tid1);     while (1);     modbus_free(ctx);     modbus_close(ctx);     return 0; } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!