C/C+++服务器之libuv的使用实战

avatar
作者
猴君
阅读量:1

libuv

libuv简介

1: 开源跨平台的异步IO库, 主要功能有网络异步,文件异步等。
2: libuv主页: http://libuv.org/
3: libuv是node.js的底层库;
4: libuv的事件循环模型:
epoll, kqueue, IOCP, event ports;
异步 TCP 与 UDP sockets;
DNS 解析
异步文件读写;
信号处理;
高性能定时器;
进程/线程池;

libuv原理

1:异步: 在用户层同时管理多个句柄请求。
2: loop循环等待所有的事件和句柄,管理好所有的这些请求。
3: 当其中一个请求完成后,loop就会监测得到然后调用用户指定的回掉函数处理;
4: 例如loop监听所有的socket,有数据来了后,loop就会处理,然后转到用户指定的回调函数。
5: libuv编写思想:
1> 创建一个对象, 例如socket;
2> 给loop管理这个对象;
3> 并指定一个回调函数,当有事件发生的时候调用这个回调函数, callback;

6: 1>向loop发送请求;
2>指定结束后的回调函数;
3>当请求结束后,调用调函数;

TCP服务器搭建

首先需要下载libuv库,导入到工程中,设置好include的路径和链接上对应的库
代码如下:

#include <stdio.h> #include <string.h> #include <stdlib.h>  #include "uv.h"  /* uv_handle_s 数据结构: UV_HANDLE_FIELDS   uv_stream_t  数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS  uv_tcp_t 数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_TCP_PRIVATE_FIELDS  uv_tcp_t is uv_stream_t is uv_handle_t; */  static uv_loop_t* loop = NULL; static uv_tcp_t l_server; // 监听句柄;  // 当我们的event loop检车到handle上有数据可以读的时候, // 就会调用这个函数, 让这个函数给event loop准备好读入数据的内存; // event loop知道有多少数据,suggested_size, // handle: 发生读时间的handle; // suggested_size: 建议我们分配多大的内存来保存这个数据; // uv_buf_t: 我们准备好的内存,通过uv_buf_t,告诉even loop; static void  uv_alloc_buf(uv_handle_t* handle,                   size_t suggested_size, 				  uv_buf_t* buf) {  	if (handle->data != NULL) { 		free(handle->data); 		handle->data = NULL; 	}  	buf->base = malloc(suggested_size + 1); 	buf->len = suggested_size; 	handle->data = buf->base; }  static void on_close(uv_handle_t* handle) { 	printf("close client\n"); 	if (handle->data) { 		free(handle->data); 		handle->data = NULL; 	} }  static void  on_shutdown(uv_shutdown_t* req, int status) { 	uv_close((uv_handle_t*)req->handle, on_close); 	free(req); }  static void  after_write(uv_write_t* req, int status) { 	if (status == 0) { 		printf("write success\n"); 	} 	uv_buf_t* w_buf = req->data; 	if (w_buf) { 		free(w_buf); 	}  	free(req); }  // 参数:  // uv_stream_t* handle --> uv_tcp_t; // nread: 读到了多少字节的数据; // uv_buf_t: 我们的数据都读到到了哪个buf里面, base; static void after_read(uv_stream_t* stream,                        ssize_t nread, 					   const uv_buf_t* buf) { 	// 连接断开了; 	if (nread < 0) { 		uv_shutdown_t* reg = malloc(sizeof(uv_shutdown_t)); 		memset(reg, 0, sizeof(uv_shutdown_t)); 		uv_shutdown(reg, stream, on_shutdown); 		return; 	} 	// end  	buf->base[nread] = 0; 	printf("recv %d\n", nread); 	printf("%s\n", buf->base);  	// 测试发送给我们的 客户端; 	uv_write_t* w_req = malloc(sizeof(uv_write_t)); 	uv_buf_t* w_buf = malloc(sizeof(uv_buf_t)); 	w_buf->base = buf->base; 	w_buf->len = nread; 	w_req->data = w_buf; 	uv_write(w_req, stream, w_buf, 1, after_write); }  static void uv_connection(uv_stream_t* server, int status) { 	printf("new client comming\n"); 	// 接入客户端; 	uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); 	memset(client, 0, sizeof(uv_tcp_t)); 	uv_tcp_init(loop, client); 	uv_accept(server, (uv_stream_t*)client); 	// end  	// 告诉event loop,让他帮你管理哪个事件; 	uv_read_start((uv_stream_t*)client, uv_alloc_buf, after_read); }  int main(int argc, char** argv) { 	int ret; 	loop = uv_default_loop(); 	// Tcp 监听服务; 	uv_tcp_init(loop, &l_server); // 将l_server监听句柄加入到event loop里面; 	// 你需要event loop来给你做那种管理呢?配置你要的管理类型; 	struct sockaddr_in addr; 	uv_ip4_addr("0.0.0.0", 6080, &addr); // ip地址, 端口 	ret = uv_tcp_bind(&l_server, (const struct sockaddr*) &addr, 0); 	if (ret != 0) { 		goto failed; 	} 	// 让event loop来做监听管理,当我们的l_server句柄上有人连接的时候; 	// event loop就会调用用户指定的这个处理函数uv_connection; 	uv_listen((uv_stream_t*)&l_server, SOMAXCONN, uv_connection);  	uv_run(loop, UV_RUN_DEFAULT);  failed: 	printf("end\n"); 	system("pause"); 	return 0; }    

UDP服务器搭建

客户端
#include <stdio.h> #include <string.h> #include <stdlib.h>  #include <WinSock2.h> #pragma comment(lib, "ws2_32.lib")  int main(int argc, char** argv) { 	WSADATA ws; 	WSAStartup(MAKEWORD(2, 2), &ws);  	SOCKET client = socket(AF_INET, SOCK_DGRAM, 0); 	// 可以bind也可以不绑,如果不要求别人先发给你可以不bind; 	// end  	SOCKADDR_IN addr; 	addr.sin_family = AF_INET; 	addr.sin_port = htons(6080); 	addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); 	int len = sizeof(SOCKADDR_IN);  	int send_len = sendto(client, "Hello", 5, 0, (const SOCKADDR*)&addr, len); 	printf("send_len = %d\n", send_len);   	char buf[128]; 	SOCKADDR_IN sender_addr; // 收到谁发的数据包的地址; 	int recv_len = recvfrom(client, buf, 128, 0, &sender_addr, &len); 	if (recv_len > 0) { 		buf[recv_len] = 0; // 加上结尾符号; 		printf("%s\n", buf); 	} 	  	WSACleanup(); 	system("pause"); 	return 0; }  
不用libuv版本的服务器
#include <stdio.h> #include <string.h> #include <stdlib.h>  #include <WinSock2.h> #pragma comment(lib, "ws2_32.lib")  int main(int argc, char** argv) { 	WSADATA ws; 	WSAStartup(MAKEWORD(2, 2), &ws);  	SOCKET client = socket(AF_INET, SOCK_DGRAM, 0); 	// 可以bind也可以不绑,如果不要求别人先发给你可以不bind; 	// end  	SOCKADDR_IN addr; 	addr.sin_family = AF_INET; 	addr.sin_port = htons(6080); 	addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); 	int len = sizeof(SOCKADDR_IN);  	int send_len = sendto(client, "Hello", 5, 0, (const SOCKADDR*)&addr, len); 	printf("send_len = %d\n", send_len);   	char buf[128]; 	SOCKADDR_IN sender_addr; // 收到谁发的数据包的地址; 	int recv_len = recvfrom(client, buf, 128, 0, &sender_addr, &len); 	if (recv_len > 0) { 		buf[recv_len] = 0; // 加上结尾符号; 		printf("%s\n", buf); 	} 	  	WSACleanup(); 	system("pause"); 	return 0; } 
libuv的UDP服务器
#include <stdio.h> #include <string.h> #include <stdlib.h>  #include "uv.h"  static uv_loop_t* event_loop = NULL; static uv_udp_t server; // UDP的句柄;  static void  uv_alloc_buf(uv_handle_t* handle,              size_t suggested_size, 			 uv_buf_t* buf) { 	if (handle->data != NULL) { 		free(handle->data); 		handle->data = NULL; 	}  	handle->data = malloc(suggested_size + 1); // +1测试的时候,我要收字符串,所以呢要加上1来访结尾符号; 	buf->base = handle->data; 	buf->len = suggested_size; }  static void on_uv_udp_send_end(uv_udp_send_t* req, int status) { 	if (status == 0) { 		printf("send sucess\n"); 	} 	free(req); }  static void  after_uv_udp_recv(uv_udp_t* handle,                ssize_t nread, 			   const uv_buf_t* buf, 			   const struct sockaddr* addr, // 发过来数据包的IP地址 + 端口; 			   unsigned flags) { 	char ip_addr[128]; 	uv_ip4_name((struct sockaddr_in*)addr, ip_addr, 128); 	int port = ntohs(((struct sockaddr_in*)addr)->sin_port); 	printf("ip: %s:%d nread = %d\n", ip_addr, port, nread);  	char* str_buf = handle->data; 	str_buf[nread] = 0; 	printf("recv %s\n", str_buf);  	uv_buf_t w_buf; 	w_buf = uv_buf_init("PING", 4); 	// 写数据; 	uv_udp_send_t* req = malloc(sizeof(uv_udp_send_t)); 	uv_udp_send(req, handle, &w_buf, 1, addr, on_uv_udp_send_end); 	// end  }  int main(int argc, char** argv) { 	event_loop = uv_default_loop(); 	memset(&server, 0 ,sizeof(uv_udp_t));  	uv_udp_init(event_loop, &server); 	// bind端口; 	struct sockaddr_in addr; 	uv_ip4_addr("0.0.0.0", 6080, &addr); 	uv_udp_bind(&server, (const struct sockaddr*)&addr, 0); 	// end   	// 告诉事件循环,你要他管理recv事件; 	uv_udp_recv_start(&server, uv_alloc_buf, after_uv_udp_recv);  	uv_run(event_loop, UV_RUN_DEFAULT); 	system("pause"); 	return 0; }  

定时器设计

源码
 #include <stdio.h> #include <string.h> #include <stdlib.h>  #include "uv.h" #include "time_list.h"  #define my_malloc malloc #define my_free free   struct timer { 	uv_timer_t uv_timer; // libuv timer handle 	void(*on_timer)(void* udata); 	void* udata; 	int repeat_count; // -1一直循环; };  static struct timer*  alloc_timer(void(*on_timer)(void* udata),             void* udata, int repeat_count) { 	struct timer* t = my_malloc(sizeof(struct timer)); 	memset(t, 0, sizeof(struct timer));  	t->on_timer = on_timer; 	t->repeat_count = repeat_count; 	t->udata = udata; 	uv_timer_init(uv_default_loop(), &t->uv_timer); 	return t; }  static void free_timer(struct timer* t) { 	my_free(t); }  static void on_uv_timer(uv_timer_t* handle) { 	struct timer* t = handle->data; 	if (t->repeat_count < 0) { // 不断的触发; 		t->on_timer(t->udata); 	} 	else { 		t->repeat_count --; 		t->on_timer(t->udata); 		if (t->repeat_count == 0) { // 函数time结束 			uv_timer_stop(&t->uv_timer); // 停止这个timer 			free_timer(t); 		} 	} 	 }  struct timer* schedule(void(*on_timer)(void* udata),          void* udata, 		 int after_msec,          int repeat_count) { 	struct timer* t = alloc_timer(on_timer, udata, repeat_count);  	// 启动一个timer; 	t->uv_timer.data = t; 	uv_timer_start(&t->uv_timer, on_uv_timer, after_msec, after_msec); 	// end  	return t; }  void cancel_timer(struct timer* t) { 	if (t->repeat_count == 0) { // 全部触发完成,; 		return; 	} 	uv_timer_stop(&t->uv_timer); 	free_timer(t); }  struct timer* schedule_once(void(*on_timer)(void* udata),               void* udata, 			  int after_msec) { 	return schedule(on_timer, udata, after_msec, 1); }    
#ifndef __MY_TIMER_LIST_H__ #define __MY_TIMER_LIST_H__  // on_timer是一个回掉函数,当timer触发的时候调用; // udata: 是用户传的自定义的数据结构; // on_timer执行的时候 udata,就是你这个udata; // after_sec: 多少秒开始执行; // repeat_count: 执行多少次, repeat_count == -1一直执行; // 返回timer的句柄; struct timer; struct timer* schedule(void(*on_timer)(void* udata),           void* udata,  		 int after_msec, 		 int repeat_count);   // 取消掉这个timer; void  cancel_timer(struct timer* t);  struct timer* schedule_once(void(*on_timer)(void* udata),                void* udata,  			  int after_msec); #endif   
使用
#include <stdio.h> #include <string.h> #include <stdlib.h>   #include "uv.h"  // 获取当前系统从开机到现在运行了多少毫秒; #ifdef WIN32 #include <windows.h> static unsigned int get_cur_ms() { 	return GetTickCount(); } #else #include <sys/time.h>   #include <time.h>  #include <limits.h>  static unsigned int get_cur_ms() { 	struct timeval tv; 	// struct timezone tz; 	gettimeofday(&tv, NULL);  	return ((tv.tv_usec / 1000) + tv.tv_sec * 1000); } #endif    static uv_loop_t* event_loop = NULL;  #include "time_list.h"  struct timer* t = NULL; static void on_time_func(void* udata) { 	static int count = 0; 	char* str = (udata); 	printf("%s\n", str);  	count++; 	if (count == 10) { 		cancel_timer(t); 	} }  static void on_time_func2(void* udata) { 	char* str = (udata); 	printf("%s\n", str); }  int main(int argc, char** argv) { 	event_loop = uv_default_loop();  	// 每隔5秒掉一次,掉4次; 	t = schedule(on_time_func, "HelloWorld!!!", 1000, -1);  	//  	schedule_once(on_time_func2, "CallFunc!!!", 1000);  	uv_run(event_loop, UV_RUN_DEFAULT); 	system("pause"); 	return 0; } 

异步文件读写

#include <stdio.h> #include <string.h> #include <stdlib.h> #include <fcntl.h>  #include "uv.h"  /* uv_fs_open: 	loop: 事件循环, 	uv_fs_t req请求对象; 	path: 文件路径 	flags: 标志0 	mode: 可读,可写... O_RDONLY O_RDWR... */ static uv_loop_t* event_loop = NULL; static uv_fs_t req; static uv_fs_t w_req; static uv_file fs_handle; static char mem_buffer[1024]; /* uv_file 文件句柄对象: 打开文件以后的文件handle uv_fs_t   result,每次请求的结果都是这个值来返回;   打开文件: result返回打开文件句柄对象uv_file;   读文件: result读到的数据长度;   写文件: result为写入的数据长度; */  /* 释放掉这个请求req所占的资源 uv_req_cleanup(req);  */  /* stdin: 标注的输入文件, scanf, cin>> stdout: 标准的输出文件 printf; fprintf(stdout, "xxxxxxx");  每个进程在运行的时候: stdin文件句柄与stdout这个文件句柄始终是打开的; stdin:标准的输入文件,  stdout: 标准的输出; */  static void  after_read(uv_fs_t* req) { 	printf("read %d byte\n", req->result); 	mem_buffer[req->result] = 0; // 字符串结尾; 	printf("%s\n", mem_buffer); 	 	uv_fs_req_cleanup(req);  	uv_fs_close(event_loop, req, fs_handle, NULL); 	uv_fs_req_cleanup(req); }  static void  on_open_fs_cb(uv_fs_t* req) { 	// 打开文件 	fs_handle = req->result; 	uv_fs_req_cleanup(req); 	printf("open success end\n");    	uv_buf_t buf = uv_buf_init(mem_buffer, 1024); 	uv_fs_read(event_loop, req,  	           fs_handle, &buf, 1, 0,  			   after_read); }  int main(int argc, char** argv) { 	event_loop = uv_default_loop();  	// step1:打开文件: 	uv_fs_open(event_loop, &req,  		       "./test.txt", 0,  			   O_RDONLY, on_open_fs_cb);     	uv_buf_t w_buf = uv_buf_init("Good! BYCW!!!!", 12); 	uv_fs_write(event_loop, &w_req, (uv_file)1, &w_buf, 1, 0, NULL); 	uv_fs_req_cleanup(&w_req);  	uv_run(event_loop, UV_RUN_DEFAULT); 	system("pause"); 	return 0; }     

websocket协议

1: websocket是基于TCP的一种协议,是H5的一种传输协议;
2: websocket连接协议;
3: websocket 发送数据协议;
4: websocket 接受数据协议;
5: websocket 关闭协议;

建立连接

1:客户端向服务器发送http报文,服务器处理后回客户端连接报文;
2: 客户端发过来的报文:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

3: 服务器回应客户端报文:
:key+migic , SHA-1 加密, base-64 加密
key=”来自客户端的随机”, migic = “258EAFA5-E914-47DA-95CA-C5AB0DC85B11”;
static char *wb_accept = “HTTP/1.1 101 Switching Protocols\r\n”
“Upgrade:websocket\r\n”
“Connection: Upgrade\r\n”
“Sec-WebSocket-Accept: %s\r\n”
“WebSocket-Protocol:chat\r\n\r\n”;

Sec-WebSocket-Key/Accept的作用

  • 避免服务端收到非法的websocket连接(比如http客户端不小心请求连接websocket服务,此时服务端可以直接拒绝连接)
  • 确保服务端理解websocket连接。因为ws握手阶段采用的是http协议,因此可能ws连接是被一个http服务器处理并返回的,此时客户端可以通过Sec-WebSocket-Key来确保服务端认识ws协议。(并非百分百保险,比如总是存在那么些无聊的http服务器,光处理Sec-WebSocket-Key,但并没有实现ws协议。。。)
  • 用浏览器里发起ajax请求,设置header时,Sec-WebSocket-Key以及其他相关的header是被禁止的。这样可以避免客户端发送ajax请求时,意外请求协议升级(websocket upgrade)
    可以防止反向代理(不理解ws协议)返回错误的数据。比如反向代理前后收到两次ws连接的升级请求,反向代理把第一次请求的返回给cache住,然后第二次请求到来时直接把cache住的请求给返回(无意义的返回)。
  • Sec-WebSocket-Key主要目的并不是确保数据的安全性,因为Sec-WebSocket-Key、Sec-WebSocket-Accept的转换计算公式是公开的,而且非常简单,最主要的作用是预防一些常见的意外情况(非故意的)。
关闭连接

1: 主动关闭socket
2: 客户端关闭socket:
收到 0x88 开头的数据包;
收到tcp socket关闭事件;

发送数据
  1. 固定字节(0x81)
  2. 包长度字节
  3. 原始数据
接收数据

1)固定字节(1000 0001或1000 0010);
2)包长度字节, 去掉最高位, 剩下7为得到一个整数(0, 127);125以内的长度直接表示就可以了;
126表示后面两个字节表示大小,127表示后面的8个字节是数据的长度;(高位存在低地址)
3)mark 掩码为包长之后的 4 个字节
4)兄弟数据:
得到真实数据的方法:将兄弟数据的每一字节 x ,和掩码的第 i%4 字节做 xor 运算,其中 i 是 x 在兄弟数据中的索引

代码
客户端网页
<!DOCTYPE html> <html> <head>   <title>skynet WebSocket example</title> </head> <body>      <script>     var ws = new WebSocket('ws://127.0.0.1:8001/ws');      ws.onopen = function(){      alert("open");      ws.send('WebSocket');      };     ws.onmessage = function(ev){      alert(ev.data);     };     ws.onclose = function(ev){      alert("close");     };     ws.onerror = function(ev){         console.log(ev);      alert("error");     };    </script> </body> </html>   
服务器

该代码在前面的TCP服务器的基础上修改

#include <stdio.h> #include <string.h> #include <stdlib.h>   #include "../3rd/http_parser/http_parser.h" #include "../3rd/crypto/sha1.h" #include "../3rd/crypto/base64_encoder.h"  #include "uv.h"  struct ws_context { 	int is_shake_hand; // 是否已经握手 	char* data; // 读取数据的buf };     static uv_loop_t* loop = NULL; static uv_tcp_t l_server; // 监听句柄;  static void  uv_alloc_buf(uv_handle_t* handle,                   size_t suggested_size, 				  uv_buf_t* buf) {  	struct ws_context* wc = handle->data;  	if (wc->data != NULL) { 		free(wc->data); 		wc->data = NULL; 	}  	buf->base = malloc(suggested_size + 1); 	buf->len = suggested_size; 	wc->data = buf->base; }  static void on_close(uv_handle_t* handle) { 	printf("close client\n"); 	if (handle->data) { 		struct ws_context* wc = handle->data; 		free(wc->data); 		wc->data = NULL; 		free(wc); 		handle->data = NULL; 	}  	free(handle); }  static void  on_shutdown(uv_shutdown_t* req, int status) { 	uv_close((uv_handle_t*)req->handle, on_close); 	free(req); }  static void  after_write(uv_write_t* req, int status) { 	if (status == 0) { 		printf("write success\n"); 	} 	uv_buf_t* w_buf = req->data; 	if (w_buf) { 		free(w_buf->base); 		free(w_buf); 	}  	free(req); }  static void send_data(uv_stream_t* stream, unsigned char* send_data, int send_len) { 	uv_write_t* w_req = malloc(sizeof(uv_write_t)); 	uv_buf_t* w_buf = malloc(sizeof(uv_buf_t));  	unsigned char* send_buf = malloc(send_len); 	memcpy(send_buf, send_data, send_len);  	w_buf->base = send_buf; 	w_buf->len = send_len; 	w_req->data = w_buf; 	uv_write(w_req, stream, w_buf, 1, after_write); }  static char filed_sec_key[512]; static char value_sec_key[512]; static int is_sec_key = 0; static int has_sec_key = 0;  static int  on_ws_header_field(http_parser* p, const char *at, size_t length) { 	if (strncmp(at, "Sec-WebSocket-Key", length) == 0) { 		is_sec_key = 1; 	} 	else { 		is_sec_key = 0; 	} 	return 0; }  static int on_ws_header_value(http_parser* p, const char *at, size_t length) { 	if (!is_sec_key) { 		return 0; 	}  	strncpy(value_sec_key, at, length); 	value_sec_key[length] = 0; 	has_sec_key = 1;  	return 0; }  //  static char* wb_migic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // base64(sha1(key + wb_migic)) static char *wb_accept = "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade:websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n" "WebSocket-Protocol:chat\r\n\r\n";   static void ws_connect_shake_hand(uv_stream_t* stream, unsigned char* data, int data_len) { 	http_parser_settings settings; 	http_parser_settings_init(&settings);  	settings.on_header_field = on_ws_header_field; 	settings.on_header_value = on_ws_header_value;  	http_parser p; 	http_parser_init(&p, HTTP_REQUEST); 	is_sec_key = 0; 	has_sec_key = 0; 	http_parser_execute(&p, &settings, data, data_len);  	if (has_sec_key) { // 解析到了websocket里面的Sec-WebSocket-Key 		printf("Sec-WebSocket-Key: %s\n", value_sec_key); 		// key + migic 		static char key_migic[512]; 		static char sha1_key_migic[SHA1_DIGEST_SIZE]; 		static char send_client[512];  		int sha1_size;  		sprintf(key_migic, "%s%s", value_sec_key, wb_migic); 		crypt_sha1((unsigned char*)key_migic, strlen(key_migic), (unsigned char*)&sha1_key_migic, &sha1_size); 		int base64_len; 		char* base_buf = base64_encode(sha1_key_migic, sha1_size, &base64_len); 		sprintf(send_client, wb_accept, base_buf); 		base64_encode_free(base_buf);  		send_data(stream, (unsigned char*)send_client, strlen(send_client)); 	} }  static void  ws_send_data(uv_stream_t* stream, unsigned char* data, int len) { 	int head_size = 2; 	if (len > 125 && len < 65536) { // 两个字节[0, 65535] 		head_size += 2; 	} 	else if (len >= 65536) { // 不做处理 		head_size += 8; 	}  	unsigned char* data_buf = malloc(head_size + len); 	data_buf[0] = 0x81; 	if (len <= 125) { 		data_buf[1] = len; 	} 	else if (len > 125 && len < 65536) { 		data_buf[1] = 126; 		data_buf[2] = (len & 0x0000ff00) >> 8; 		data_buf[3] = (len & 0x000000ff); 	} 	else { // 127不写了  		return; 	}  	memcpy(data_buf + head_size, data, len);  	send_data(stream, data_buf, head_size + len); 	free(data_buf); }  // 收到的是一个数据包; static void  ws_on_recv_data(uv_stream_t* stream,                  unsigned char* data, unsigned int len) { 	if (data[0] != 0x81 && data[0] != 0x82) { 		return; 	}  	unsigned int data_len = data[1] & 0x0000007f; 	int head_size = 2; 	if (data_len == 126) { // 后面两个字节表示的是数据长度;data[2], data[3] 		data_len = data[3] | (data[2] << 8); 		head_size += 2; 	} 	else if (data_len == 127) { // 后面8个字节表示数据长度; 2, 3, 4, 5 | 6, 7, 8, 9 		unsigned int low = data[5] | (data[4] << 8) | (data[3] << 16) | (data[2] << 24); 		unsigned int hight = data[9] | (data[8] << 8) | (data[7] << 16) | (data[6] << 24);  		data_len = low; 		head_size += 8; 	}  	unsigned char* mask = data + head_size; 	unsigned char* body = data + head_size + 4;  	for (unsigned int i = 0; i < data_len; i++) { // 遍历后面所有的数据; 		body[i] = body[i] ^ mask[i % 4]; 	}  	// test 	static char test_buf[4096]; 	memcpy(test_buf, body, data_len); 	test_buf[data_len] = 0; 	printf("%s\n", test_buf);  	// 发送协议 	ws_send_data(stream, "Hello", strlen("Hello")); 	// end   }  static void after_read(uv_stream_t* stream,                        ssize_t nread, 					   const uv_buf_t* buf) { 	// 连接断开了; 	if (nread < 0) { 		uv_shutdown_t* reg = malloc(sizeof(uv_shutdown_t)); 		memset(reg, 0, sizeof(uv_shutdown_t)); 		uv_shutdown(reg, stream, on_shutdown); 		return; 	} 	// end 	printf("start websocket!!!\n"); 	struct ws_context* wc = stream->data;  	// 如果没有握手,就进入websocket握手协议 	if (wc->is_shake_hand == 0) { 		ws_connect_shake_hand(stream, buf->base, buf->len); 		wc->is_shake_hand = 1; 		return; 	} 	// end   	// 如果客户端主动关闭;0x88, 状态码 	if ((unsigned char)(buf->base[0]) == 0x88) { // 关闭 		printf("ws closing!!!!"); 		return; 	} 	// end   	// ws正常的数据, 暂时不处理粘包这些问题; 	// 假设我们一次性都可以收完websocket发过来的数据包; 	ws_on_recv_data(stream, buf->base, nread); 	// end  	 }  static void uv_connection(uv_stream_t* server, int status) { 	printf("new client comming\n"); 	 	uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); 	memset(client, 0, sizeof(uv_tcp_t)); 	uv_tcp_init(loop, client); 	uv_accept(server, (uv_stream_t*)client); 	 	struct ws_context* wc = malloc(sizeof(struct ws_context)); 	memset(wc, 0, sizeof(struct ws_context)); 	client->data = wc;  	uv_read_start((uv_stream_t*)client, uv_alloc_buf, after_read); }  int main(int argc, char** argv) { 	int ret; 	loop = uv_default_loop();  	uv_tcp_init(loop, &l_server);   	struct sockaddr_in addr; 	uv_ip4_addr("0.0.0.0", 8001, &addr); // ip地址, 端口 	ret = uv_tcp_bind(&l_server, (const struct sockaddr*) &addr, 0); 	if (ret != 0) { 		goto failed; 	}  	uv_listen((uv_stream_t*)&l_server, SOMAXCONN, uv_connection);  	uv_run(loop, UV_RUN_DEFAULT);  failed: 	printf("end\n"); 	system("pause"); 	return 0; }    
总结

WebSocket是一种基于TCP协议的双向通信协议,与HTTP/HTTPS协议相比,具有更低的延迟和更高的实时性。使用WebSocket协议可以实现实时通信、数据推送、在线游戏等功能。但是,WebSocket协议并没有被广泛采用的原因有以下几个方面:

兼容性问题:WebSocket协议是HTML5标准中新增的协议,对于较老的浏览器可能不支持。虽然现代主流浏览器已经支持WebSocket协议,但是在一些特殊情况下(例如企业内部应用、旧版浏览器等),WebSocket协议的兼容性可能成为问题。

安全问题:由于WebSocket协议实现了双向通信,因此在网络安全方面需要更加关注。例如,在进行WebSocket通信时需要进行恰当的身份验证和加密,以避免数据泄露和劫持等问题。

部署和负载问题:WebSocket协议需要建立长连接,因此在部署WebSocket服务时需要考虑服务器负载和连接管理等问题。如果不恰当地部署WebSocket服务,可能会导致服务器资源浪费、连接管理不当等问题。

用处有限:对于大多数网站来说,HTTP请求已经能满足需求。使用WebSocket可以带来实时性改善,但对许多网站功能影响不大。所以如果没有实时交互等强需求,就不一定需要引入WebSocket。

尽管WebSocket协议存在一些问题,但是在特定的场景下仍然是一种非常有用的协议。例如,在实时通信、数据推送、在线游戏等场景下,WebSocket协议可以发挥出其优势。

HTTP服务器

步骤:
1: 等待socket 连接进来;
2: 接收socket发送过来的数据;–> http协议格式的请求数据包
3: http解析url
4: 根据url来找对应的响应处理;
5: 将要返回的数据打包成http响应格式,发给客户端;
6: 关闭客户端的连接;

以Get请求为例:
1: 客户端提交请求;
2: 服务端解析get的url找到对应的响应;
3: 服务端解析get参数;
5: 处理,返回结果给客户端:
“HTTP/1.1 %d %s\r\n” 状态码,状态描述
“transfer-encoding:%s\r\n”, “identity”
“content-length: %d\r\n\r\n” 内容长度
body数据

6: get携带参数格式?uname=xiaoming&key=18074532323

#include <stdio.h> #include <string.h> #include <stdlib.h>  #include "uv.h" #include "../3rd/http_parser/http_parser.h"  /* url 注册管理模块 */  typedef void(*web_get_handle)(uv_stream_t* stream, char* url); typedef void(*web_post_handle)(uv_stream_t* stream, char* url, char* body);  struct url_node {    char* url; // url地址    web_get_handle get; // url地址对应的处理函数;    web_post_handle post; // url地址对应的post函数 };  static struct url_node* alloc_url_node(char* url, web_get_handle get, web_post_handle post) {    struct url_node* node = malloc(sizeof(struct url_node));    memset(node, 0, sizeof(struct url_node));    node->url = strdup(url);     node->get = get;    node->post = post;     return node; }  static struct url_node* url_node[1024]; static int url_count = 0;  static void register_web_handle(char* url, web_get_handle get, web_post_handle post) {    url_node[url_count] = alloc_url_node(url, get, post);    url_count ++; }  static struct url_node*  get_url_node(char* url, int len) {    for (int i = 0; i < url_count; i++) {    	if (strncmp(url, url_node[i]->url, len) == 0) {    		return url_node[i];    	}    }    return NULL; }   /* { [100] = "Continue", [101] = "Switching Protocols", [200] = "OK", [201] = "Created", [202] = "Accepted", [203] = "Non-Authoritative Information", [204] = "No Content", [205] = "Reset Content", [206] = "Partial Content", [300] = "Multiple Choices", [301] = "Moved Permanently", [302] = "Found", [303] = "See Other", [304] = "Not Modified", [305] = "Use Proxy", [307] = "Temporary Redirect", [400] = "Bad Request", [401] = "Unauthorized", [402] = "Payment Required", [403] = "Forbidden", [404] = "Not Found", [405] = "Method Not Allowed", [406] = "Not Acceptable", [407] = "Proxy Authentication Required", [408] = "Request Time-out", [409] = "Conflict", [410] = "Gone", [411] = "Length Required", [412] = "Precondition Failed", [413] = "Request Entity Too Large", [414] = "Request-URI Too Large", [415] = "Unsupported Media Type", [416] = "Requested range not satisfiable", [417] = "Expectation Failed", [500] = "Internal Server Error", [501] = "Not Implemented", [502] = "Bad Gateway", [503] = "Service Unavailable", [504] = "Gateway Time-out", [505] = "HTTP Version not supported", } */  /* uv_handle_s 数据结构: UV_HANDLE_FIELDS  uv_stream_t  数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS  uv_tcp_t 数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_TCP_PRIVATE_FIELDS  uv_tcp_t is uv_stream_t is uv_handle_t; */  static uv_loop_t* loop = NULL; static uv_tcp_t l_server; // 监听句柄;  // 当我们的event loop检车到handle上有数据可以读的时候, // 就会调用这个函数, 让这个函数给event loop准备好读入数据的内存; // event loop知道有多少数据,suggested_size, // handle: 发生读时间的handle; // suggested_size: 建议我们分配多大的内存来保存这个数据; // uv_buf_t: 我们准备好的内存,通过uv_buf_t,告诉even loop; static void  uv_alloc_buf(uv_handle_t* handle,                  size_t suggested_size,    			  uv_buf_t* buf) {     if (handle->data != NULL) {    	free(handle->data);    	handle->data = NULL;    }     buf->base = malloc(suggested_size + 1);    buf->len = suggested_size;    handle->data = buf->base; }  static void on_close(uv_handle_t* handle) {    printf("close client\n");    if (handle->data) {    	free(handle->data);    	handle->data = NULL;    } }  static void  on_shutdown(uv_shutdown_t* req, int status) {    uv_close((uv_handle_t*)req->handle, on_close);    free(req); }  static void  after_write(uv_write_t* req, int status) {    if (status == 0) {    	printf("write success\n");    }    uv_buf_t* w_buf = req->data;    if (w_buf) {    	free(w_buf);    }     free(req); }  static void send_data(uv_stream_t* stream, unsigned char* send_data, int send_len) {    uv_write_t* w_req = malloc(sizeof(uv_write_t));    uv_buf_t* w_buf = malloc(sizeof(uv_buf_t));     unsigned char* send_buf = malloc(send_len);    memcpy(send_buf, send_data, send_len);     w_buf->base = send_buf;    w_buf->len = send_len;    w_req->data = w_buf;    uv_write(w_req, stream, w_buf, 1, after_write); }  static char req_url[4096]; int on_url(http_parser* p, const char *at, size_t length) {    strncpy(req_url, at, length);     req_url[length] = 0;    return 0; }  static int  filter_url(char* url) {    char* walk = url;    int len = 0;    while (*url != '?' && *url != '\0') {    	len++;    	url++;    }     return len; }  static void  on_http_request(uv_stream_t* stream, char* req, int len) {    http_parser_settings settings;    http_parser_settings_init(&settings);    settings.on_url = on_url;    http_parser p;    http_parser_init(&p, HTTP_REQUEST);    http_parser_execute(&p, &settings, req, len);     // http get是可以携带参数的:    // http://www.baidu.com:6080/test?name=xiaoming&age=34    int url_len = filter_url(req_url);    struct url_node* node = get_url_node(req_url, url_len);    printf("%s\n", req_url);     if (node == NULL) {    	return;    }     switch (p.method) { // 请求方法    	case HTTP_GET:    		if (node->get != NULL) {    			node->get(stream, req_url);    		}    	break;    	case HTTP_POST:    		if (node->post != NULL) {    		}    	break;    } } // 参数:  // uv_stream_t* handle --> uv_tcp_t; // nread: 读到了多少字节的数据; // uv_buf_t: 我们的数据都读到到了哪个buf里面, base; static void after_read(uv_stream_t* stream,                       ssize_t nread,    				   const uv_buf_t* buf) {    // 连接断开了;    if (nread < 0) {    	uv_shutdown_t* reg = malloc(sizeof(uv_shutdown_t));    	memset(reg, 0, sizeof(uv_shutdown_t));    	uv_shutdown(reg, stream, on_shutdown);    	return;    }    // end     buf->base[nread] = 0;    printf("recv %d\n", nread);    printf("%s\n", buf->base);     // 处理    on_http_request(stream, buf->base, buf->len);    // end }  static void uv_connection(uv_stream_t* server, int status) {    printf("new client comming\n");    // 接入客户端;    uv_tcp_t* client = malloc(sizeof(uv_tcp_t));    memset(client, 0, sizeof(uv_tcp_t));    uv_tcp_init(loop, client);    uv_accept(server, (uv_stream_t*)client);    // end     // 告诉event loop,让他帮你管理哪个事件;    uv_read_start((uv_stream_t*)client, uv_alloc_buf, after_read); }  static void  test_get(uv_stream_t* stream, char* url) {    printf("%s\n", url);    char* body = "SUCCESS TEST1";     static char respons_buf[4096];    char* walk = respons_buf;    sprintf(walk, "HTTP/1.1 %d %s\r\n", 200, "OK");    walk += strlen(walk);    sprintf(walk, "transfer-encoding:%s\r\n", "identity");    walk += strlen(walk);    sprintf(walk, "content-length: %d\r\n\r\n", strlen(body));    walk += strlen(walk);     sprintf(walk, "%s", body);     send_data(stream, respons_buf, strlen(respons_buf)); }  static void  test2_get(uv_stream_t* stream, char* url) {    printf("%s\n", url);    char* body = "SUCCESS TEST2";     static char respons_buf[4096];    char* walk = respons_buf;    sprintf(walk, "HTTP/1.1 %d %s\r\n", 200, "OK");    walk += strlen(walk);    sprintf(walk, "transfer-encoding:%s\r\n", "identity");    walk += strlen(walk);    sprintf(walk, "content-length: %d\r\n\r\n", strlen(body));    walk += strlen(walk);     sprintf(walk, "%s", body);     send_data(stream, respons_buf, strlen(respons_buf)); }  int main(int argc, char** argv) {    // 注册一下web请求函数    register_web_handle("/test", test_get, NULL);    register_web_handle("/test2", test2_get, NULL);    // end      int ret;    loop = uv_default_loop();    // Tcp 监听服务;    uv_tcp_init(loop, &l_server); // 将l_server监听句柄加入到event loop里面;    // 你需要event loop来给你做那种管理呢?配置你要的管理类型;    struct sockaddr_in addr;    uv_ip4_addr("0.0.0.0", 6080, &addr); // ip地址, 端口    ret = uv_tcp_bind(&l_server, (const struct sockaddr*) &addr, 0);    if (ret != 0) {    	goto failed;    }    // 让event loop来做监听管理,当我们的l_server句柄上有人连接的时候;    // event loop就会调用用户指定的这个处理函数uv_connection;    uv_listen((uv_stream_t*)&l_server, SOMAXCONN, uv_connection);     uv_run(loop, UV_RUN_DEFAULT);  failed:    printf("end\n");    system("pause");    return 0; }  

多线程与工作队列

1: 线程相关函数:
uv_thread_create: 创建一个线程;
uv_thread_self: 获取当前线程id号;
uv_thread_join: 等待线程结束;
2: 锁:
uv_mutex_init: 初始化锁;
uv_mutex_destroy: 销毁锁;
uv_mutex_lock: 获取锁,如果被占用,就等待;
uv_mutex_trylock: 尝试获取锁,如果获取不到,直接返回,不等待;
uv_mutex_unlock: 释放锁;
3: 等待/触发事件;
uv_cond_init: 创建条件事件;
uv_cond_destroy: 销毁条件事件;
uv_cond_signal: 触发条件事件;
uv_cond_broadcast: 广播条件事件;
uv_cond_wait/uv_cond_timedwait: 等待事件/等待事件超时;

工作队列:
1: libuv在启动的时候会创建一个线程池;
2: 线程池默认启动的线程数目是4个,最大是128个线程;
3: uv_queue_work工作队列原理:
step1: libuv启动线程池,等待任务的调度;
step2: 用户给工作队列一个执行函数,工作队列执行完后,通知主线程;
step3: 主线程在执行之前设置的回掉函数;
4: 工作队列的使用:
有很多操作,比如读文件,比如读数据库,比如读redis都会等待,所以使用工作队列,
可以工作队列来执行,然后通知主线程,这样就不会卡主线程了。给工作队列一个任务(函数),并指定好任务完成的回掉函数,线程池就会调度去执行这个任务函数,完成后,通知主线程,主线程调用回掉函数;

#include <stdio.h> #include <string.h> #include <stdlib.h>  #include "uv.h"  // 工作队列的用处: // 1:复杂的算法放到工作队列 ; // 2:IO放到我们工作队列,获取数据库结果; // ....  // 是在线程池里面另外一个线程里面调用,不在主线程; static void  thread_work(uv_work_t* req) { 	//  	printf("user data = %d \n", (int)req->data); 	printf("thread_work id 0x%d:\n", uv_thread_self()); }  // 当工作队列里面的线程执行完上面的任务后,通知主线程; // 主线程调用这个函数; static void  on_work_complete(uv_work_t* req, int status) { 	printf("on_work_complete thread id 0x%d:\n", uv_thread_self()); }  int main(int argc, char** argv) { 	uv_work_t uv_work; 	printf("main id 0x%d:\n", uv_thread_self()); 	uv_work.data = (void*)6; 	uv_queue_work(uv_default_loop(), &uv_work, thread_work, on_work_complete);  	uv_run(uv_default_loop(), UV_RUN_DEFAULT); 	system("pause"); 	return 0; }  

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!