目录
边缘层
随着智能化发展,越来越多的设备需要联网,海量的数据需要汇总处理。传统的云计算压力太大,所以越来越多的任务由云端转到边缘端进行处理。边缘设备是部署在网络边缘侧的高性能嵌入式设备,一般会使用高性能的处理器,搭配Linux或者安卓等智能操作系统来实现。通过网络联接、协议转换等功能联接物理和数字世界,提供轻量化的联接管理、实时数据分析及应用管理功能。比较常见的就是智能家居中智能音箱(蓝牙网关)+路由器(wifi网关),工厂里的工业网关等。它们通常扮演着一个区域内中心网关的功能,即负责终端设备的网络连接,也负责各终端数据的采集以及远程控制。同时,又提供数据上云的功能。
架构说明
- 多进程设计,遵循软件设计中的高内聚,低耦合。
- 数据上报进程负责上报规则更新,并根据上报规则将最新的数据上报给客户端。
- 客户端和上报进程使用mqtt进行通信,具体通信协议遵循json格式。
- 设备相关的采集进程使用共享内存和上报进程进行通信,采集进程获取到最新数据后,刷新到共享内存,上报进程从共享内存读取后上报即可。
- 客户端下行控制指令通过mqtt发送给上报进程后,上报进程通过消息队列方式发送给采集进程,采集进程再进行具体设备的控制。
- 每个进程包含多个线程,可根据具体功能进行再拆分。
包含知识点
两大网联网场景:消费物联网、工业物联网两大场景全覆盖
边缘网关新概念:物联网边缘网关中边缘采集、边缘计算两大主流技术
基础综合运用:C、shell、Makefile、C++、QT、单片机、数据库等基础知识大融合
Linux开发技术点:进程间通信、多线程程序设计、文件操作、网络编程、应用协议
物联网主流通信技术:Json、modbus、mqtt
各种调试工具学习:mqtt.fx、modbus slave、modbus poll、wireshark、网络调试、串口调试等
数据上报进程
功能描述
上报进程主要实现以下功能:
●建立mqtt服务器连接。
●根据上报策略,将采集到的数据上报给客户端。
●分析客户端的设备控制命令,并转发给其它模块。
●接收客户端上报模式修改命令,并写到配置文件永久生效。
●采集的数据点按照一定的时间周期来定时存储到数据库。
●根据上位机要求过滤设备的历史数据并发送给客户端。
上报进程是最先启动的进程,需要连接mqtt服务、建立数据库、解析点表,映射为共享内存的节点以供采集进程使用。主进程(main)完成上述动作后,即可启动不同的线程来处理不同的功能。
功能开发
上报线程
根据上报方式的不同,采用不同的分支策略。
- 刷新上报
不需要主动轮询,只需要等待客户端的查询指令后,轮询共享内存后上报即可。本质上属于控制指令的响应。
- 定时上报
根据点表配置的上报周期,按周期轮询共享内存后上报。
- 变化上报
不断轮询共享内存的节点,并比较新旧值是否相等,如果不相等,单独上报相应的数据点。变化上报需要先主动进行一次刷新上报(这里的主动指网关这边先把所有数据点上报一次,防止有些点不变化导致永远没有机会上报),全部数据点上报一次后,以此为基准进行后续比较、上报。
数据存储线程
按照一定的周期(不用太快,分钟级别即可),将共享内存中的数据点的值存储到数据库中。数据库表记录字段至少应该包括:时间戳、数据点key值、数据点值。存储后等待客户端获取指令即可。
指令处理线程
指令线程逻辑相对复杂,需要区分指令类型,然后根据指令要求做出相应的响应。
- 如果收到的是控制命令指令,需要区分控制指令发送的目标,这里从客户端拿到的只有设备的key值,可以用过共享内存中结构体的
dev_type
字段来确定是发给哪个设备,然后通过消息队列发送即可。 - 如果收到的是刷新上报指令,说明上报方式为刷新上报,此时用户通过客户端发起了刷新请求。只需要获取到共享内存中所有数据点并按照规定的协议,上报给客户端即可。
- 如果收到的是模式修改指令,代表客户需要修改当前的上报方式,需要解析指令并将新的模式写到配置文件中,等待下次重启生效,这里不需要做进程热重启,真实产品中会直接重启设备。
- 如果收到的是历史数据请求指令,那么根据客户端时间戳要求(如果有),过滤出数据库中相应数据点的历史数据,并将结果回传给客户端。
项目源码
代码仅供参考
上报模块.c代码:
(数据库是在外部用sql语句创建的)
CREATE TABLE histry( key INTEGER, val TEXT, time TEXT );
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include "MQTTClient.h" #include "cJSON.h" #include"msg_queue_peer.h" #include"shmem.h" #include <sqlite3.h> #include <time.h> void mqtt_send(char *p); #define CLIENTID "ExampleClientPub" #define QOS 1 #define TIMEOUT 10000L cJSON *allList; sqlite3 *db; char *errmsg; volatile MQTTClient_deliveryToken deliveredtoken; MQTTClient_deliveryToken token; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient client; char ADDRESS[32]; int PERIOD=0; union val_t { int b_val; //bool类型存储空间 int i_val; //整形值存储空间 float f_val; //浮点值存储空间 }; struct std_node { int key; //唯一键值 int type; //数据点类型 int dev_type; //数据点属于哪个设备,根据网关支持的设备自行定义 union val_t old_val; //变化上报后需要更新旧值 union val_t new_val; //从共享内存取出最新数据,放到new_val中 int ret; //默认为-1,采集成功后设置为0,采集失败再置-1 }; struct msg { long type; // 消息类型,用于区分32与modbus设备 int key; union val_t val; }; int *num =NULL; struct std_node *std = NULL; void delivered(void *context, MQTTClient_deliveryToken dt) { deliveredtoken = dt; } void sendAllData() { cJSON *root = cJSON_CreateObject(); cJSON_AddItemToObject(root, "type", cJSON_CreateNumber(1)); cJSON_AddItemToObject(root, "result", cJSON_CreateNumber(0)); cJSON *arr = cJSON_CreateArray(); cJSON_AddItemToObject(root, "data", arr); for(int i =0 ;i<*num;i++) { cJSON *data1 = cJSON_CreateObject(); cJSON_AddItemToArray(arr, data1); char buf[8]=""; if( (std+i)->type==1 || (std+i)->type==2 ) sprintf(buf,"%d",(std+i)->new_val.i_val); else sprintf(buf,"%.1f",(std+i)->new_val.f_val); cJSON_AddItemToObject(data1, "key", cJSON_CreateNumber((std+i)->key)); cJSON_AddItemToObject(data1, "val", cJSON_CreateString(buf)); } char *p = cJSON_Print(root); mqtt_send(p); printf("上报\n"); free(p); cJSON_Delete(root); } void setDev(cJSON *key,cJSON *val) { struct msg msg_set; msg_set.key=key->valueint; for(int i=0;i<*num;i++) { if((std+i)->key == key->valueint) { if((std+i)->dev_type == 0) { msg_set.type = 3; }else if((std+i)->dev_type==1) { msg_set.type =1; } char *ch = val->valuestring; msg_set.val.b_val=*ch-'0'; printf("key=%d\n",msg_set.key); printf("type=%ld\n",msg_set.type); printf("val=%d\n",msg_set.val.b_val); msg_queue_send("set_msg", &msg_set, sizeof(msg_set), 0); } } printf("setDev ok\n"); } void setAllList(int mod,int period) { printf("setAllList ok\n"); cJSON *Report = cJSON_GetObjectItem(allList, "report"); cJSON_ReplaceItemInObject(Report,"type",cJSON_CreateNumber(mod)); cJSON_ReplaceItemInObject(Report,"period",cJSON_CreateNumber(period)); char *p = cJSON_Print(allList); FILE * fp; fp = fopen("./node.json","w"); if(fp == NULL) { perror("fopen err"); return ; } fwrite(p, strlen(p), 1,fp); fclose(fp); } int key_his=0; pthread_t tid_his; void *getHistory(void *arg) { char sql[68] = ""; sprintf(sql,"SELECT * FROM histry WHERE key=%d;",key_his); int KEY = key_his; char **resultp; int n_row, n_cloum; int i, j; if (sqlite3_get_table(db, sql, &resultp, &n_row, &n_cloum, &errmsg) != SQLITE_OK) { printf("err:%s\n", errmsg); return NULL; } char **p; p = resultp + n_cloum; char data[64]=""; for (i = 0; i < n_row; i++) { sprintf(data,"key:%s val:%s time:%s",p[(i * n_cloum) + 0],p[(i * n_cloum) + 1],p[(i * n_cloum) + 2]); cJSON *root = cJSON_CreateObject(); cJSON_AddItemToObject(root, "type", cJSON_CreateNumber(4)); cJSON_AddItemToObject(root, "result", cJSON_CreateNumber(0)); cJSON_AddItemToObject(root, "key", cJSON_CreateNumber(KEY)); cJSON_AddItemToObject(root, "data", cJSON_CreateString(data)); char *p = cJSON_Print(root); mqtt_send(p); free(p); printf("找到一条历史记录\n"); } printf("历史记录:%d条\n",n_row); } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; payloadptr = message->payload; //** analysis command***// cJSON *root = cJSON_Parse(payloadptr); cJSON *type = cJSON_GetObjectItem(root, "type"); if(type->valueint==1) { sendAllData(); }else if(type->valueint == 2) { cJSON *data = cJSON_GetObjectItem(root, "data"); cJSON *key = cJSON_GetObjectItem(data, "key"); cJSON *val = cJSON_GetObjectItem(data, "val"); setDev(key,val); }else if(type->valueint == 3) { cJSON *data = cJSON_GetObjectItem(root, "data"); cJSON *type_mod = cJSON_GetObjectItem(data, "type"); cJSON *type_period = cJSON_GetObjectItem(data, "period"); setAllList(type_mod->valueint,type_period->valueint); }else if(type->valueint == 4) { cJSON *data = cJSON_GetObjectItem(root, "data"); cJSON *key = cJSON_GetObjectItem(data, "key"); // cJSON *limit = cJSON_GetObjectItem(data, "limit"); key_his=key->valueint; pthread_create(&tid_his, NULL, getHistory, NULL); // getHistory(key->valueint); } cJSON_Delete(root); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } void mqtt_send(char *p) { pubmsg.qos = QOS; pubmsg.retained = 0; pubmsg.payload = p; pubmsg.payloadlen = (int)strlen(p); MQTTClient_publishMessage(client, "/app/data/up", &pubmsg, &token); } void *up_period(void *arg) { while (1) { sleep(PERIOD); sendAllData(); } } void *up_vary(void *arg) { cJSON *root = cJSON_CreateObject(); cJSON_AddItemToObject(root, "type", cJSON_CreateNumber(1)); cJSON_AddItemToObject(root, "result", cJSON_CreateNumber(0)); cJSON *arr = cJSON_CreateArray(); cJSON_AddItemToObject(root, "data", arr); cJSON *data1 = cJSON_CreateObject(); cJSON_AddItemToArray(arr, data1); cJSON_AddItemToObject(data1, "key", cJSON_CreateNumber(0)); cJSON_AddItemToObject(data1, "val", cJSON_CreateString("")); while (1) { for(int i =0 ;i<*num;i++) { if(memcmp(&(std+i)->old_val,&(std+i)->new_val ,sizeof(union val_t)) != 0) { char buf[8]=""; if( (std+i)->type==1 || (std+i)->type==2 ) { sprintf(buf,"%d",(std+i)->new_val.i_val); printf("new:%d\n",(std+i)->new_val.i_val); printf("old:%d\n",(std+i)->old_val.i_val); } else { sprintf(buf,"%.1f",(std+i)->new_val.f_val); printf("new:%f\n",(std+i)->new_val.f_val); printf("old:%f\n",(std+i)->old_val.f_val); } cJSON_ReplaceItemInObject(data1,"key",cJSON_CreateNumber((std+i)->key)); cJSON_ReplaceItemInObject(data1,"val",cJSON_CreateString(buf)); char *p = cJSON_Print(root); mqtt_send(p); free(p); (std+i)->old_val.i_val = (std+i)->new_val.i_val; printf("数据有变化\n"); } } } cJSON_Delete(root); } int main(int argc, char* argv[]) { time_t raw_time; // ******get list**** FILE * fp; pthread_t tid; char buf[1024] = ""; fp = fopen("./node.json","r"); if(fp == NULL) { perror("fopen err"); return -1; } fread(buf,2048,1,fp); fclose(fp); //**** open&init db ***// sqlite3_open("Histry.db", &db); //******* shmem init *******// struct shm_param para; if(shm_init(¶,"SHMEM",512)<0) { perror("init err"); return 0; } num = shm_getaddr(¶); if(num==NULL) { perror("getaddr err"); return 0; } std = (struct std_node*)(num+1); allList = cJSON_Parse(buf); cJSON *Stm32 = cJSON_GetObjectItem(allList, "stm32"); cJSON *Data_32 = cJSON_GetObjectItem(Stm32, "data"); cJSON *Modbus = cJSON_GetObjectItem(allList, "modbus"); cJSON *Data_modbus = cJSON_GetObjectItem(Modbus, "data"); *num = cJSON_GetArraySize(Data_32) + cJSON_GetArraySize(Data_modbus); struct std_node *temp=std; for(int i = 0; i < cJSON_GetArraySize(Data_32); i++) { cJSON *item = cJSON_GetArrayItem(Data_32, i); cJSON *t = cJSON_GetObjectItem(item, "key"); temp->key = t->valueint; t = cJSON_GetObjectItem(item, "type"); temp->type = t->valueint; temp->dev_type = 0; temp->ret = -1; temp++; } for(int i = 0; i < cJSON_GetArraySize(Data_modbus); i++) { cJSON *item = cJSON_GetArrayItem(Data_modbus, i); cJSON *t = cJSON_GetObjectItem(item, "key"); temp->key = t->valueint; t = cJSON_GetObjectItem(item, "type"); temp->type = t->valueint; temp->dev_type = 1; temp->ret = -1; temp++; } temp = NULL; //***** connect mqtt *******// cJSON *mqtt_server = cJSON_GetObjectItem(allList, "mqtt_server"); cJSON *addr = cJSON_GetObjectItem(mqtt_server, "addr"); cJSON *port = cJSON_GetObjectItem(mqtt_server, "port"); sprintf(ADDRESS,"tcp://%s:%d",addr->valuestring,port->valueint); // MqttConnect(); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } MQTTClient_subscribe(client, "/app/data/down", QOS); // ******** UpMod******* cJSON *report = cJSON_GetObjectItem(allList, "report"); cJSON *up_mod = cJSON_GetObjectItem(report, "type"); if(up_mod->valueint == 0) // no { printf("当前上报模式:不上报\n"); }else if(up_mod->valueint == 1) // vary { printf("当前上报模式:变化上报\n"); pthread_create(&tid, NULL, up_vary, NULL); }else if(up_mod->valueint == 2) // period { printf("当前上报模式:周期上报\n"); cJSON *period = cJSON_GetObjectItem(report, "period"); PERIOD=period->valueint; pthread_create(&tid, NULL, up_period, NULL); } while (1) { sleep(60); time(&raw_time); struct tm *time_info = localtime(&raw_time); char * time = asctime(time_info); char sql[128]=""; for(int i = 0;i<*num;i++) { char val[8]=""; sprintf(val,"%d",(std+i)->new_val.i_val); sprintf(sql,"INSERT INTO histry VALUES (%d,'%s','%s');",(std+i)->key,val ,time); sqlite3_exec(db, sql, NULL, NULL, &errmsg); } } return 0; }
上报模块Makefile代码:
#指定生成的文件名 OJB_OUT = test #指定每一个c文件对应的.o文件 OBJS = cJSON.o UpData.o msg_queue_peer.o shmem.o #指定编译器 CC = gcc #指定需要的库 ULDFLAGS = -lpthread -lm -lpaho-mqtt3c -lsqlite3 ########################################### #以下的内容不需要修改 ########################################### all:$(OJB_OUT) $(OJB_OUT):$(OBJS) $(CC) -o $@ $^ $(ULDFLAGS) dep_files := $(foreach f,$(OBJS),.$(f).d) dep_files := $(wildcard $(dep_files)) ifneq ($(dep_files),) include $(dep_files) endif %.o:%.c $(CC) -Wp,-MD,.$@.d -c $< -o $@ clean: rm -rf .*.o.d *.o $(OJB_OUT)
STM32采集模块.c代码
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <sys/types.h> /* See NOTES */ #include <netinet/ip.h> /* superset of previous */ #include <string.h> #include <unistd.h> #include "cJSON.h" #include <stdlib.h> #include <errno.h> #include <strings.h> #include <pthread.h> #include <sys/select.h> #include <sys/time.h> #include <sys/stat.h> #include <fcntl.h> #include "shmem.h" #include "msg_queue_peer.h" #define N 1024 /***************************************共享内存***********************************/ union val_t { int b_val; //bool类型存储空间 int i_val; //整形值存储空间 float f_val; //浮点值存储空间 }; struct std_node { int key; //唯一键值 int type; //数据点类型 int dev_type; //数据点属于哪个设备,根据网关支持的设备自行定义 union val_t old_val; //变化上报后需要更新旧值 union val_t new_val; //从共享内存取出最新数据,放到new_val中 int ret; //默认为-1,采集成功后设置为0,采集失败再置-1 }; /***************************************消息队列***********************************/ struct msg //消息队列的消息类型 { long type; //类型 0:STM32 1:modbus int key; //key 值 union val_t val; //数据(共用体类型) }; struct shm_param para; //反序列化 struct std_node *st=NULL; int *num = NULL; void aliviar(char *buf) //解除序列化 { cJSON *root = cJSON_Parse(buf); cJSON *item = cJSON_GetObjectItem(root, "data"); int size = cJSON_GetArraySize(item); // printf("%d",size); cJSON *arr; for (int i = 0; i < size; i++) { arr = cJSON_GetArrayItem(item, i); cJSON *arr1 = cJSON_GetObjectItem(arr, "key"); cJSON *arr2 = cJSON_GetObjectItem(arr, "name"); cJSON *arr3 = cJSON_GetObjectItem(arr, "val"); printf("%s:%s\n", arr1->string, arr1->valuestring); //key int printf("%s:%s\n", arr2->string, arr2->valuestring); // name string if (strcmp(arr2->valuestring, "temp") == 0) { printf("%s:%.1f\n", arr3->string, arr3->valuedouble); } else { printf("%s:%d\n", arr3->string, arr3->valueint); //val double } int key = atoi(arr1->valuestring); /*存入共享内存 */ for (int i = 0; i < *num; i++) { if ((st + i)->key == key) { if (key == 202) { (st + i)->new_val.f_val = arr3->valuedouble; } else if (key == 201) { (st + i)->new_val.f_val = arr3->valuedouble; } else { (st + i)->new_val.i_val = arr3->valueint; } } } } cJSON_Delete(root); } int clientfd = 0; //线程执行从消息对列中读取数据并发送 void *handler_thread(void *arg) { struct msg msg1; while (1) { printf("111\n"); long type = 3; //用消息队列接收指令 msg_queue_recv("set_msg", &msg1, sizeof(struct msg), type, 0); printf("%d\n", msg1.key); printf("msg=%d\n", msg1.val.i_val); if (msg1.key == 203) { printf("%d\n", msg1.key); printf("msg=%d\n", msg1.val.i_val); cJSON *root = cJSON_CreateObject(); cJSON_AddItemToObject(root, "key", cJSON_CreateString("203")); cJSON_AddItemToObject(root, "val", cJSON_CreateNumber(msg1.val.i_val)); char *p = cJSON_PrintUnformatted(root); printf("p=%s\n", p); int res= send(clientfd, p, 128, 0); printf("%d\n",res); free(p); cJSON_Delete(root); } } pthread_exit(NULL); //退出线程 return NULL; } int main(int argc, char const *argv[]) { if (shm_init(¶, "SHMEM", 512) < 0) { perror("shm_init err\n"); return 0; } if ((num = shm_getaddr(¶)) == NULL) { perror("shm getaddr err"); return 0; } st = (struct std_node *)(++num); //创建流式套接 int serverfd = socket(AF_INET, SOCK_STREAM, 0); if (serverfd < 0) { perror("socket err"); return -1; } //绑定自己的地址 struct sockaddr_in myaddr; socklen_t addrlen = sizeof(myaddr); memset(&myaddr, 0, addrlen); myaddr.sin_family = AF_INET; myaddr.sin_port = htons(8888); myaddr.sin_addr.s_addr = INADDR_ANY; int ret = bind(serverfd, (struct sockaddr *)&myaddr, addrlen); if (ret < 0) { perror("bind err"); return -1; } //启动监听 ret = listen(serverfd, 5); if (ret < 0) { perror(";istenerr"); return -1; } //构建文件描述符 fd_set readfd, tmpreadfd; FD_ZERO(&readfd); FD_SET(serverfd, &readfd); int maxfd = serverfd; pthread_t tid; if (pthread_create(&tid, NULL, handler_thread, NULL) != 0) { printf("create thread err"); return -1; } pthread_detach(tid); while (1) //创建监听表监听单片机发送的数据并存入共享内存 { //临时监听表 tmpreadfd = readfd; select(maxfd + 1, &tmpreadfd, NULL, NULL, NULL); for (int i = 0; i < maxfd + 1; i++) { //判断此时描述符有数据吗 if (FD_ISSET(i, &tmpreadfd)) { //判断这个描述符是谁?服务器/客户端 if (i == serverfd) { clientfd = accept(serverfd, NULL, NULL); printf("new client fd=%d\n", clientfd); //把描述符加入表 FD_SET(clientfd, &readfd); if (clientfd > maxfd) { //更新maxfd maxfd = clientfd; } } else { char buf[N] = {}; bzero(buf, N); int len = read(i, buf, N); if (len < 0) { perror("read err"); break; } else if (len == 0) //如果客户端退出则退出监听表的描述符 { printf("client %dquit\n", i); FD_CLR(i, &readfd); close(i); break; } else //接受到的数据处理 { //打印读到的数据 printf("read from %d client=%s\n", i, buf); // printf("%s\n", buf); //反序列化//并发生给共享内存 aliviar(buf); printf("%s\n", buf); fflush(NULL); } } } } } close(serverfd); return 0; }
设备搜索响应模块Linux部分.c代码
#include <stdio.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <netinet/in.h> #include <netinet/ip.h> /* superset of previous */ #include <arpa/inet.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <string.h> #include <pthread.h> #include "cJSON.h" #define N 64 void *new_sock(void *arg) { printf("will recv new connect\n"); int sockfd, qtfilefd; char buf[1200]; int addrlen = sizeof(struct sockaddr); struct sockaddr_in addr, clientaddr; int nbytes; // 1创建一个套接字--socket sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket err"); exit(-1); } // 2定义套接字地址--sockaddr_in bzero(&addr, addrlen); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr("192.168.50.132"); addr.sin_port = htons(6666); //新的连接用6666端口 // 3绑定套接字--bind if (bind(sockfd, (struct sockaddr *)&addr, addrlen) < 0) { perror("bind err"); exit(-1); } // 4启动监听--listen if (listen(sockfd, 5) < 0) { perror("listen err"); exit(-1); } // 5接收连接--accept qtfilefd = accept(sockfd, (struct sockaddr *)&clientaddr, &addrlen); if (qtfilefd < 0) { perror("accept err"); exit(-1); } //第二个tcp接收到QT链接 printf("recv qt client\n"); // 6收发点表--recv/send bzero(buf, 1200); if (recv(qtfilefd, buf, 1200, 0) > 0) { printf("copy....\n"); //拷贝配置文件 int fd = open("/home/hq/project/port.json", O_WRONLY | O_CREAT | O_TRUNC, 0777); write(fd, buf, 1048); printf("copy success!\n"); } else { perror("copy err\n"); } pthread_exit(NULL); } int main(int argc, char const *argv[]) { int broadfd; //创建一个socket文件描述符 broadfd = socket(AF_INET, SOCK_DGRAM, 0); if (broadfd < 0) { perror("sock err"); return -1; } //绑定套接字(ip+port) struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(8888); //端口号 addr.sin_addr.s_addr = INADDR_ANY; int addrlen = sizeof(addr); if (bind(broadfd, (struct sockaddr *)&addr, addrlen) < 0) { perror("bind err"); return -1; } ssize_t len; char buf[N] = {0}; struct sockaddr_in cliaddr; //接收"coffee"搜索包 bzero(buf, N); len = recvfrom(broadfd, buf, N, 0, (struct sockaddr *)&cliaddr, &addrlen); //判断是否是本公司产品:收到的数据是否"group" if (strcmp(buf, "group") != 0) { printf("not my group\n"); return -1; } //回复yes,告诉软件,我收到了搜索协议,并且回复地址 sendto(broadfd, "yes", 4, 0, (struct sockaddr *)&cliaddr, addrlen); bzero(buf, N); //变身为TCP服务器,准备接收软件的升级文件 int tcpfd = socket(AF_INET, SOCK_STREAM, 0); if (tcpfd < 0) { perror("sock err"); return -1; } if (bind(tcpfd, (struct sockaddr *)&addr, addrlen) < 0) { perror("bind err"); return -1; } //监听套接字 if (listen(tcpfd, 5) < 0) { perror("listen err"); return -1; } //接收客户端的连接 printf("wait qt connect\n"); int clifd; //接收对端的地址 clifd = accept(tcpfd, NULL, NULL); if (clifd < 0) { perror("accept err"); return -1; } //接收到QT端链接 printf("qt cononect coming\n"); send(clifd, "yes", 3, 0); // bzero(buf, N); // recv(clifd, buf, 4, 0); // printf("%s\n", buf); while (1) { bzero(buf, N); int len = recv(clifd, buf, N, 0); cJSON *root = cJSON_Parse(buf); cJSON *item = cJSON_GetObjectItem(root, "type"); int type = item->valueint; if (type == 1) { printf("tcp链接成功\n"); pthread_t tid; pthread_create(&tid, NULL, new_sock, NULL); } else if (type == 3) { if (send(clifd, "living", 7, 0) < 0) { perror("send err\n"); } } } return 0; }
设备搜索响应模块Qt端代码.h
#ifndef M0WIG_H #define M0WIG_H #include <QWidget> #define UDPIP "192.168.50.255" #include <QDebug> #include <QMessageBox> //UDP库 #include <QUdpSocket> #include <QHostAddress> //TCP库 #include <QTcpSocket> //文本流 #include <QTextStream> #include <QFile> //定时类 #include <QTimer> //QJSON类 #include <QJsonDocument> #include <QJsonObject> namespace Ui { class M0Wig; } class M0Wig : public QWidget { Q_OBJECT public: explicit M0Wig(QWidget *parent = 0); ~M0Wig(); private: Ui::M0Wig *ui; QUdpSocket *udpSocket;//udp socket对象 QTcpSocket *socket;//TCP客户端对象 QTcpSocket *fliesocket;//tcp文件对象 QString tcpip;//服务器IP地址 QTimer *timer;//心跳包定时器 private slots: void btnSearchSlot(); void n1tcpReadSlot(); void btnLoginSlot(); void udpRecvSlot();//接收广播回复 void connectFileSlot(); void heartTimerSlot(); }; #endif // M0WIG_H
设备搜索响应模块Qt端代码.cpp
#include "m0wig.h" #include "ui_m0wig.h" //json对象 QJsonObject json; M0Wig::M0Wig(QWidget *parent) : QWidget(parent), ui(new Ui::M0Wig) { ui->setupUi(this); //创建udp对象 udpSocket = new QUdpSocket(this); //创建tcp客户端对象 socket = new QTcpSocket(this); fliesocket=new QTcpSocket(this); //发送广播槽函数 connect(ui->pushButtonSearch,SIGNAL(clicked()),this,SLOT(btnSearchSlot())); //接收广播回复 connect(udpSocket, SIGNAL(readyRead()), this,SLOT(udpRecvSlot())); //建立第一个TCP连接 connect(ui->pushButtonLogin,SIGNAL(clicked()),this,SLOT(btnLoginSlot())); //接收服务器数据 connect(socket,SIGNAL(readyRead()),this,SLOT(n1tcpReadSlot())); //第二个tcp发送文件 connect(fliesocket,SIGNAL(connected()),this,SLOT(connectFileSlot())); // 创建定时器,每隔1秒发送心跳包 timer = new QTimer(this); connect(timer, SIGNAL(timeout()), this,SLOT(heartTimerSlot())); timer->start(1000); // 1000毫秒为间隔 } M0Wig::~M0Wig() { delete ui; //关闭广播 udpSocket->close(); } //进行UDP广播 void M0Wig::btnSearchSlot() { //绑定广播地址 QHostAddress broadcastAddress(UDPIP); QByteArray datagram = "group"; // 你想要广播的消息 if (udpSocket->writeDatagram(datagram, datagram.size(), broadcastAddress, 8888)) { // 8888是端口号 // 消息成功发送 qDebug()<<"广播成功"; } else { // 消息发送失败 qDebug()<<"广播失败"; } } void M0Wig::n1tcpReadSlot() { QByteArray buffer = socket->readAll(); // QByteArray → QString QString text(buffer); if(text=="yes") { QMessageBox::information(this,"信息","tcp连接成功"); json["type"] = 1; // 创建JSON文档 QJsonDocument jsonDoc(json); // 将JSON文档转换为字节数组 QByteArray jsonData = jsonDoc.toJson(); // 发送JSON数据 socket->write(jsonData); // text="port"; // QTextStream output(socket); // output << text; //建立第二个TCP连接 fliesocket->connectToHost(tcpip,6666); } else if(text == "living") { qDebug()<<"process living"; } else qDebug()<<"tcp连接失败"; } //建立TCP连接 void M0Wig::btnLoginSlot() { //建立第一个TCP连接 socket->connectToHost(tcpip,8888); } void M0Wig::udpRecvSlot() { while (udpSocket->hasPendingDatagrams()) { QByteArray datagram; datagram.resize(udpSocket->pendingDatagramSize()); QHostAddress sender; quint16 senderPort; udpSocket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort); tcpip = sender.toString();//获取服务器地址 // 打印发送者地址和接收到的数据 qDebug() << "Received datagram from:" << sender.toString() << datagram.data(); ui->textBrowser->append(sender.toString()); udpSocket->close();//关闭广播 } } void M0Wig::connectFileSlot() { QFile readFile(":/new/prefix1/js.json"); readFile.open(QIODevice::ReadOnly); QByteArray file; qint64 totalSize = readFile.size(); qDebug()<<totalSize; file = readFile.read(totalSize); qDebug()<<file; fliesocket->write(file); } void M0Wig::heartTimerSlot() { // 添加其他键值对 json["type"] = 3; // 创建JSON文档 QJsonDocument jsonDoc(json); // 将JSON文档转换为字节数组 QByteArray jsonData = jsonDoc.toJson(); // 发送JSON数据 socket->write(jsonData); // 等待数据发送完毕 if (socket->waitForBytesWritten()) { qDebug() << "JSON data sent successfully1"; } else { qDebug() << "Failed to send JSON data"; } }
Modbus采集进程.c代码
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <modbus.h> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> #include <pthread.h> #include <sys/ipc.h> #include <sys/shm.h> #include <errno.h> #include "msg_queue_peer.h" #include "shmem.h" #define MODBUS_IP "192.168.50.249" #define MODBUS_PORT 502 union val_t { int b_val; //bool类型存储空间 int i_val; //整形值存储空间 float f_val; //浮点值存储空间 }; struct msg //消息队列的消息类型 { long type; //类型 3:STM32 1:modbus int key; //key 值 union val_t val; //数据(共用体类型) }; struct std_node { int key; //唯一键值 int type; //数据点类型 int dev_type; //数据点属于哪个设备,根据网关支持的设备自行定义 union val_t old_val; //变化上报后需要更新旧值 union val_t new_val; //从共享内存取出最新数据,放到new_val中 int ret; //默认为-1,采集成功后设置为0,采集失败再置-1 }; modbus_t *ctx; // modbus读取设备线程 void *modbus_thread(void *arg) { //初始化创建共享内存 struct shm_param para; if (shm_init(¶, "SHMEM", 512)) { perror("shm_init err\n"); } int *num = shm_getaddr(¶); //指针指向共享内存的地址->映射 if (NULL == num) { perror("shm getaddr err"); } struct std_node *std; std=(struct student*)(num+1); uint16_t buf[1024] = {}; uint16_t dataa[128] = {}; uint8_t data[128] = {}; while (1) //循环检测modbus salve里线圈和寄存器信息 { sleep(2); //联调后把延时和打印注释掉 //读保持寄存器 功能码03 //ctx :Modbus实例 //addr :寄存器起始地址 //nb :寄存器个数 //dest :得到的状态值 modbus_read_registers(ctx, 0, 10, dataa); // printf("湿度:%d\n光照强度:%d\nCO2浓度:%d\n", dataa[0], dataa[2], dataa[4]); //读线圈 功能码01 modbus_read_bits(ctx, 0, 10, data); //printf("水泵开关:%d 灯光开关:%d 排气扇:%d\n",data[0], data[1], data[2]); //printf("num=%d\n", *num); for (int i = 0; i < *num; i++) { if ((std + i)->key == 101) //土壤湿度 { (std+i)->new_val.i_val=dataa[0]; } if ((std + i)->key == 102) //光照强调 { (std + i)->new_val.i_val = dataa[2]; } if ((std + i)->key == 105) //CO2浓度 { (std + i)->new_val.i_val = dataa[4]; } if ((std + i)->key == 104) //开关水泵 { (std + i)->new_val.i_val = data[0]; } if ((std + i)->key == 103) //开关灯 { (std + i)->new_val.i_val = data[1]; } if ((std + i)->key == 106) //排气扇 { (std + i)->new_val.i_val = data[2]; } } for (int i = 0; i < *num; i++) { if ((std + i)->key == 101) //土壤湿度 { printf("土壤湿度:%d\n", (std + i)->new_val.i_val); } if ((std + i)->key == 102) //光照强调 { printf("光照强度:%d\n", (std + i)->new_val.i_val); } if ((std + i)->key == 105) //CO2浓度 { printf("CO2浓度:%d\n", (std + i)->new_val.i_val); } if ((std + i)->key == 104) //开关水泵 { printf("水泵状态:%d\n", (std + i)->new_val.i_val); } if ((std + i)->key == 103) //开关灯 { printf("灯状态:%d\n", (std + i)->new_val.i_val); } if ((std + i)->key == 106) //排气扇 { printf("排气扇状态:%d\n", (std + i)->new_val.i_val); } putchar(10); } memset(dataa, 0, sizeof(dataa)); memset(data, 0, sizeof(data)); } shm_del(¶); pthread_exit(NULL); } //modbus写入线圈 void *modbus_write(void *arg) { struct msg msg1; while (1) { printf("111\n"); //用消息队列接收指令 msg_queue_recv("set_msg",&msg1,sizeof(struct msg),1,0); printf("msg1.key=%d\n",msg1.key); // modbus_write_bit(ctx, 0, msg1->val.i_val); printf("msg1.val.i_val=%d\n",msg1.val.i_val); if (msg1.key==104) { modbus_write_bit(ctx,0,msg1.val.i_val); } if (msg1.key==103) { modbus_write_bit(ctx,1,msg1.val.i_val); } if (msg1.key==106) { modbus_write_bit(ctx,2,msg1.val.i_val); } } pthread_exit(NULL); //pause(); } int main(int argc, char const *argv[]) { ctx = modbus_new_tcp(MODBUS_IP, MODBUS_PORT); if (ctx == NULL) { perror("modbus create err"); return -1; } if (modbus_set_slave(ctx, 1) != 0) { perror("set id err"); return -1; } if (modbus_connect(ctx) != 0) { perror("connect err"); return -1; } pthread_t tid, tid1; pthread_create(&tid, NULL, modbus_thread, NULL); pthread_create(&tid1, NULL, modbus_write, NULL); pthread_detach(tid); pthread_detach(tid1); while (1); modbus_free(ctx); modbus_close(ctx); return 0; }