libuv
libuv简介
1: 开源跨平台的异步IO库, 主要功能有网络异步,文件异步等。
2: libuv主页: http://libuv.org/
3: libuv是node.js的底层库;
4: libuv的事件循环模型:
epoll, kqueue, IOCP, event ports;
异步 TCP 与 UDP sockets;
DNS 解析
异步文件读写;
信号处理;
高性能定时器;
进程/线程池;
libuv原理
1:异步: 在用户层同时管理多个句柄请求。
2: loop循环等待所有的事件和句柄,管理好所有的这些请求。
3: 当其中一个请求完成后,loop就会监测得到然后调用用户指定的回掉函数处理;
4: 例如loop监听所有的socket,有数据来了后,loop就会处理,然后转到用户指定的回调函数。
5: libuv编写思想:
1> 创建一个对象, 例如socket;
2> 给loop管理这个对象;
3> 并指定一个回调函数,当有事件发生的时候调用这个回调函数, callback;
6: 1>向loop发送请求;
2>指定结束后的回调函数;
3>当请求结束后,调用调函数;
TCP服务器搭建
首先需要下载libuv库,导入到工程中,设置好include的路径和链接上对应的库
代码如下:
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "uv.h" /* uv_handle_s 数据结构: UV_HANDLE_FIELDS uv_stream_t 数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS uv_tcp_t 数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_TCP_PRIVATE_FIELDS uv_tcp_t is uv_stream_t is uv_handle_t; */ static uv_loop_t* loop = NULL; static uv_tcp_t l_server; // 监听句柄; // 当我们的event loop检车到handle上有数据可以读的时候, // 就会调用这个函数, 让这个函数给event loop准备好读入数据的内存; // event loop知道有多少数据,suggested_size, // handle: 发生读时间的handle; // suggested_size: 建议我们分配多大的内存来保存这个数据; // uv_buf_t: 我们准备好的内存,通过uv_buf_t,告诉even loop; static void uv_alloc_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { if (handle->data != NULL) { free(handle->data); handle->data = NULL; } buf->base = malloc(suggested_size + 1); buf->len = suggested_size; handle->data = buf->base; } static void on_close(uv_handle_t* handle) { printf("close client\n"); if (handle->data) { free(handle->data); handle->data = NULL; } } static void on_shutdown(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, on_close); free(req); } static void after_write(uv_write_t* req, int status) { if (status == 0) { printf("write success\n"); } uv_buf_t* w_buf = req->data; if (w_buf) { free(w_buf); } free(req); } // 参数: // uv_stream_t* handle --> uv_tcp_t; // nread: 读到了多少字节的数据; // uv_buf_t: 我们的数据都读到到了哪个buf里面, base; static void after_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { // 连接断开了; if (nread < 0) { uv_shutdown_t* reg = malloc(sizeof(uv_shutdown_t)); memset(reg, 0, sizeof(uv_shutdown_t)); uv_shutdown(reg, stream, on_shutdown); return; } // end buf->base[nread] = 0; printf("recv %d\n", nread); printf("%s\n", buf->base); // 测试发送给我们的 客户端; uv_write_t* w_req = malloc(sizeof(uv_write_t)); uv_buf_t* w_buf = malloc(sizeof(uv_buf_t)); w_buf->base = buf->base; w_buf->len = nread; w_req->data = w_buf; uv_write(w_req, stream, w_buf, 1, after_write); } static void uv_connection(uv_stream_t* server, int status) { printf("new client comming\n"); // 接入客户端; uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); memset(client, 0, sizeof(uv_tcp_t)); uv_tcp_init(loop, client); uv_accept(server, (uv_stream_t*)client); // end // 告诉event loop,让他帮你管理哪个事件; uv_read_start((uv_stream_t*)client, uv_alloc_buf, after_read); } int main(int argc, char** argv) { int ret; loop = uv_default_loop(); // Tcp 监听服务; uv_tcp_init(loop, &l_server); // 将l_server监听句柄加入到event loop里面; // 你需要event loop来给你做那种管理呢?配置你要的管理类型; struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", 6080, &addr); // ip地址, 端口 ret = uv_tcp_bind(&l_server, (const struct sockaddr*) &addr, 0); if (ret != 0) { goto failed; } // 让event loop来做监听管理,当我们的l_server句柄上有人连接的时候; // event loop就会调用用户指定的这个处理函数uv_connection; uv_listen((uv_stream_t*)&l_server, SOMAXCONN, uv_connection); uv_run(loop, UV_RUN_DEFAULT); failed: printf("end\n"); system("pause"); return 0; }
UDP服务器搭建
客户端
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <WinSock2.h> #pragma comment(lib, "ws2_32.lib") int main(int argc, char** argv) { WSADATA ws; WSAStartup(MAKEWORD(2, 2), &ws); SOCKET client = socket(AF_INET, SOCK_DGRAM, 0); // 可以bind也可以不绑,如果不要求别人先发给你可以不bind; // end SOCKADDR_IN addr; addr.sin_family = AF_INET; addr.sin_port = htons(6080); addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); int len = sizeof(SOCKADDR_IN); int send_len = sendto(client, "Hello", 5, 0, (const SOCKADDR*)&addr, len); printf("send_len = %d\n", send_len); char buf[128]; SOCKADDR_IN sender_addr; // 收到谁发的数据包的地址; int recv_len = recvfrom(client, buf, 128, 0, &sender_addr, &len); if (recv_len > 0) { buf[recv_len] = 0; // 加上结尾符号; printf("%s\n", buf); } WSACleanup(); system("pause"); return 0; }
不用libuv版本的服务器
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <WinSock2.h> #pragma comment(lib, "ws2_32.lib") int main(int argc, char** argv) { WSADATA ws; WSAStartup(MAKEWORD(2, 2), &ws); SOCKET client = socket(AF_INET, SOCK_DGRAM, 0); // 可以bind也可以不绑,如果不要求别人先发给你可以不bind; // end SOCKADDR_IN addr; addr.sin_family = AF_INET; addr.sin_port = htons(6080); addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); int len = sizeof(SOCKADDR_IN); int send_len = sendto(client, "Hello", 5, 0, (const SOCKADDR*)&addr, len); printf("send_len = %d\n", send_len); char buf[128]; SOCKADDR_IN sender_addr; // 收到谁发的数据包的地址; int recv_len = recvfrom(client, buf, 128, 0, &sender_addr, &len); if (recv_len > 0) { buf[recv_len] = 0; // 加上结尾符号; printf("%s\n", buf); } WSACleanup(); system("pause"); return 0; }
libuv的UDP服务器
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "uv.h" static uv_loop_t* event_loop = NULL; static uv_udp_t server; // UDP的句柄; static void uv_alloc_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { if (handle->data != NULL) { free(handle->data); handle->data = NULL; } handle->data = malloc(suggested_size + 1); // +1测试的时候,我要收字符串,所以呢要加上1来访结尾符号; buf->base = handle->data; buf->len = suggested_size; } static void on_uv_udp_send_end(uv_udp_send_t* req, int status) { if (status == 0) { printf("send sucess\n"); } free(req); } static void after_uv_udp_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, // 发过来数据包的IP地址 + 端口; unsigned flags) { char ip_addr[128]; uv_ip4_name((struct sockaddr_in*)addr, ip_addr, 128); int port = ntohs(((struct sockaddr_in*)addr)->sin_port); printf("ip: %s:%d nread = %d\n", ip_addr, port, nread); char* str_buf = handle->data; str_buf[nread] = 0; printf("recv %s\n", str_buf); uv_buf_t w_buf; w_buf = uv_buf_init("PING", 4); // 写数据; uv_udp_send_t* req = malloc(sizeof(uv_udp_send_t)); uv_udp_send(req, handle, &w_buf, 1, addr, on_uv_udp_send_end); // end } int main(int argc, char** argv) { event_loop = uv_default_loop(); memset(&server, 0 ,sizeof(uv_udp_t)); uv_udp_init(event_loop, &server); // bind端口; struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", 6080, &addr); uv_udp_bind(&server, (const struct sockaddr*)&addr, 0); // end // 告诉事件循环,你要他管理recv事件; uv_udp_recv_start(&server, uv_alloc_buf, after_uv_udp_recv); uv_run(event_loop, UV_RUN_DEFAULT); system("pause"); return 0; }
定时器设计
源码
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "uv.h" #include "time_list.h" #define my_malloc malloc #define my_free free struct timer { uv_timer_t uv_timer; // libuv timer handle void(*on_timer)(void* udata); void* udata; int repeat_count; // -1一直循环; }; static struct timer* alloc_timer(void(*on_timer)(void* udata), void* udata, int repeat_count) { struct timer* t = my_malloc(sizeof(struct timer)); memset(t, 0, sizeof(struct timer)); t->on_timer = on_timer; t->repeat_count = repeat_count; t->udata = udata; uv_timer_init(uv_default_loop(), &t->uv_timer); return t; } static void free_timer(struct timer* t) { my_free(t); } static void on_uv_timer(uv_timer_t* handle) { struct timer* t = handle->data; if (t->repeat_count < 0) { // 不断的触发; t->on_timer(t->udata); } else { t->repeat_count --; t->on_timer(t->udata); if (t->repeat_count == 0) { // 函数time结束 uv_timer_stop(&t->uv_timer); // 停止这个timer free_timer(t); } } } struct timer* schedule(void(*on_timer)(void* udata), void* udata, int after_msec, int repeat_count) { struct timer* t = alloc_timer(on_timer, udata, repeat_count); // 启动一个timer; t->uv_timer.data = t; uv_timer_start(&t->uv_timer, on_uv_timer, after_msec, after_msec); // end return t; } void cancel_timer(struct timer* t) { if (t->repeat_count == 0) { // 全部触发完成,; return; } uv_timer_stop(&t->uv_timer); free_timer(t); } struct timer* schedule_once(void(*on_timer)(void* udata), void* udata, int after_msec) { return schedule(on_timer, udata, after_msec, 1); }
#ifndef __MY_TIMER_LIST_H__ #define __MY_TIMER_LIST_H__ // on_timer是一个回掉函数,当timer触发的时候调用; // udata: 是用户传的自定义的数据结构; // on_timer执行的时候 udata,就是你这个udata; // after_sec: 多少秒开始执行; // repeat_count: 执行多少次, repeat_count == -1一直执行; // 返回timer的句柄; struct timer; struct timer* schedule(void(*on_timer)(void* udata), void* udata, int after_msec, int repeat_count); // 取消掉这个timer; void cancel_timer(struct timer* t); struct timer* schedule_once(void(*on_timer)(void* udata), void* udata, int after_msec); #endif
使用
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "uv.h" // 获取当前系统从开机到现在运行了多少毫秒; #ifdef WIN32 #include <windows.h> static unsigned int get_cur_ms() { return GetTickCount(); } #else #include <sys/time.h> #include <time.h> #include <limits.h> static unsigned int get_cur_ms() { struct timeval tv; // struct timezone tz; gettimeofday(&tv, NULL); return ((tv.tv_usec / 1000) + tv.tv_sec * 1000); } #endif static uv_loop_t* event_loop = NULL; #include "time_list.h" struct timer* t = NULL; static void on_time_func(void* udata) { static int count = 0; char* str = (udata); printf("%s\n", str); count++; if (count == 10) { cancel_timer(t); } } static void on_time_func2(void* udata) { char* str = (udata); printf("%s\n", str); } int main(int argc, char** argv) { event_loop = uv_default_loop(); // 每隔5秒掉一次,掉4次; t = schedule(on_time_func, "HelloWorld!!!", 1000, -1); // schedule_once(on_time_func2, "CallFunc!!!", 1000); uv_run(event_loop, UV_RUN_DEFAULT); system("pause"); return 0; }
异步文件读写
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <fcntl.h> #include "uv.h" /* uv_fs_open: loop: 事件循环, uv_fs_t req请求对象; path: 文件路径 flags: 标志0 mode: 可读,可写... O_RDONLY O_RDWR... */ static uv_loop_t* event_loop = NULL; static uv_fs_t req; static uv_fs_t w_req; static uv_file fs_handle; static char mem_buffer[1024]; /* uv_file 文件句柄对象: 打开文件以后的文件handle uv_fs_t result,每次请求的结果都是这个值来返回; 打开文件: result返回打开文件句柄对象uv_file; 读文件: result读到的数据长度; 写文件: result为写入的数据长度; */ /* 释放掉这个请求req所占的资源 uv_req_cleanup(req); */ /* stdin: 标注的输入文件, scanf, cin>> stdout: 标准的输出文件 printf; fprintf(stdout, "xxxxxxx"); 每个进程在运行的时候: stdin文件句柄与stdout这个文件句柄始终是打开的; stdin:标准的输入文件, stdout: 标准的输出; */ static void after_read(uv_fs_t* req) { printf("read %d byte\n", req->result); mem_buffer[req->result] = 0; // 字符串结尾; printf("%s\n", mem_buffer); uv_fs_req_cleanup(req); uv_fs_close(event_loop, req, fs_handle, NULL); uv_fs_req_cleanup(req); } static void on_open_fs_cb(uv_fs_t* req) { // 打开文件 fs_handle = req->result; uv_fs_req_cleanup(req); printf("open success end\n"); uv_buf_t buf = uv_buf_init(mem_buffer, 1024); uv_fs_read(event_loop, req, fs_handle, &buf, 1, 0, after_read); } int main(int argc, char** argv) { event_loop = uv_default_loop(); // step1:打开文件: uv_fs_open(event_loop, &req, "./test.txt", 0, O_RDONLY, on_open_fs_cb); uv_buf_t w_buf = uv_buf_init("Good! BYCW!!!!", 12); uv_fs_write(event_loop, &w_req, (uv_file)1, &w_buf, 1, 0, NULL); uv_fs_req_cleanup(&w_req); uv_run(event_loop, UV_RUN_DEFAULT); system("pause"); return 0; }
websocket协议
1: websocket是基于TCP的一种协议,是H5的一种传输协议;
2: websocket连接协议;
3: websocket 发送数据协议;
4: websocket 接受数据协议;
5: websocket 关闭协议;
建立连接
1:客户端向服务器发送http报文,服务器处理后回客户端连接报文;
2: 客户端发过来的报文:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
3: 服务器回应客户端报文:
:key+migic , SHA-1 加密, base-64 加密
key=”来自客户端的随机”, migic = “258EAFA5-E914-47DA-95CA-C5AB0DC85B11”;
static char *wb_accept = “HTTP/1.1 101 Switching Protocols\r\n”
“Upgrade:websocket\r\n”
“Connection: Upgrade\r\n”
“Sec-WebSocket-Accept: %s\r\n”
“WebSocket-Protocol:chat\r\n\r\n”;
Sec-WebSocket-Key/Accept的作用
- 避免服务端收到非法的websocket连接(比如http客户端不小心请求连接websocket服务,此时服务端可以直接拒绝连接)
- 确保服务端理解websocket连接。因为ws握手阶段采用的是http协议,因此可能ws连接是被一个http服务器处理并返回的,此时客户端可以通过Sec-WebSocket-Key来确保服务端认识ws协议。(并非百分百保险,比如总是存在那么些无聊的http服务器,光处理Sec-WebSocket-Key,但并没有实现ws协议。。。)
- 用浏览器里发起ajax请求,设置header时,Sec-WebSocket-Key以及其他相关的header是被禁止的。这样可以避免客户端发送ajax请求时,意外请求协议升级(websocket upgrade)
可以防止反向代理(不理解ws协议)返回错误的数据。比如反向代理前后收到两次ws连接的升级请求,反向代理把第一次请求的返回给cache住,然后第二次请求到来时直接把cache住的请求给返回(无意义的返回)。 - Sec-WebSocket-Key主要目的并不是确保数据的安全性,因为Sec-WebSocket-Key、Sec-WebSocket-Accept的转换计算公式是公开的,而且非常简单,最主要的作用是预防一些常见的意外情况(非故意的)。
关闭连接
1: 主动关闭socket
2: 客户端关闭socket:
收到 0x88 开头的数据包;
收到tcp socket关闭事件;
发送数据
- 固定字节(0x81)
- 包长度字节
- 原始数据
接收数据
1)固定字节(1000 0001或1000 0010);
2)包长度字节, 去掉最高位, 剩下7为得到一个整数(0, 127);125以内的长度直接表示就可以了;
126表示后面两个字节表示大小,127表示后面的8个字节是数据的长度;(高位存在低地址)
3)mark 掩码为包长之后的 4 个字节
4)兄弟数据:
得到真实数据的方法:将兄弟数据的每一字节 x ,和掩码的第 i%4 字节做 xor 运算,其中 i 是 x 在兄弟数据中的索引
代码
客户端网页
<!DOCTYPE html> <html> <head> <title>skynet WebSocket example</title> </head> <body> <script> var ws = new WebSocket('ws://127.0.0.1:8001/ws'); ws.onopen = function(){ alert("open"); ws.send('WebSocket'); }; ws.onmessage = function(ev){ alert(ev.data); }; ws.onclose = function(ev){ alert("close"); }; ws.onerror = function(ev){ console.log(ev); alert("error"); }; </script> </body> </html>
服务器
该代码在前面的TCP服务器的基础上修改
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "../3rd/http_parser/http_parser.h" #include "../3rd/crypto/sha1.h" #include "../3rd/crypto/base64_encoder.h" #include "uv.h" struct ws_context { int is_shake_hand; // 是否已经握手 char* data; // 读取数据的buf }; static uv_loop_t* loop = NULL; static uv_tcp_t l_server; // 监听句柄; static void uv_alloc_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { struct ws_context* wc = handle->data; if (wc->data != NULL) { free(wc->data); wc->data = NULL; } buf->base = malloc(suggested_size + 1); buf->len = suggested_size; wc->data = buf->base; } static void on_close(uv_handle_t* handle) { printf("close client\n"); if (handle->data) { struct ws_context* wc = handle->data; free(wc->data); wc->data = NULL; free(wc); handle->data = NULL; } free(handle); } static void on_shutdown(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, on_close); free(req); } static void after_write(uv_write_t* req, int status) { if (status == 0) { printf("write success\n"); } uv_buf_t* w_buf = req->data; if (w_buf) { free(w_buf->base); free(w_buf); } free(req); } static void send_data(uv_stream_t* stream, unsigned char* send_data, int send_len) { uv_write_t* w_req = malloc(sizeof(uv_write_t)); uv_buf_t* w_buf = malloc(sizeof(uv_buf_t)); unsigned char* send_buf = malloc(send_len); memcpy(send_buf, send_data, send_len); w_buf->base = send_buf; w_buf->len = send_len; w_req->data = w_buf; uv_write(w_req, stream, w_buf, 1, after_write); } static char filed_sec_key[512]; static char value_sec_key[512]; static int is_sec_key = 0; static int has_sec_key = 0; static int on_ws_header_field(http_parser* p, const char *at, size_t length) { if (strncmp(at, "Sec-WebSocket-Key", length) == 0) { is_sec_key = 1; } else { is_sec_key = 0; } return 0; } static int on_ws_header_value(http_parser* p, const char *at, size_t length) { if (!is_sec_key) { return 0; } strncpy(value_sec_key, at, length); value_sec_key[length] = 0; has_sec_key = 1; return 0; } // static char* wb_migic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // base64(sha1(key + wb_migic)) static char *wb_accept = "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade:websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: %s\r\n" "WebSocket-Protocol:chat\r\n\r\n"; static void ws_connect_shake_hand(uv_stream_t* stream, unsigned char* data, int data_len) { http_parser_settings settings; http_parser_settings_init(&settings); settings.on_header_field = on_ws_header_field; settings.on_header_value = on_ws_header_value; http_parser p; http_parser_init(&p, HTTP_REQUEST); is_sec_key = 0; has_sec_key = 0; http_parser_execute(&p, &settings, data, data_len); if (has_sec_key) { // 解析到了websocket里面的Sec-WebSocket-Key printf("Sec-WebSocket-Key: %s\n", value_sec_key); // key + migic static char key_migic[512]; static char sha1_key_migic[SHA1_DIGEST_SIZE]; static char send_client[512]; int sha1_size; sprintf(key_migic, "%s%s", value_sec_key, wb_migic); crypt_sha1((unsigned char*)key_migic, strlen(key_migic), (unsigned char*)&sha1_key_migic, &sha1_size); int base64_len; char* base_buf = base64_encode(sha1_key_migic, sha1_size, &base64_len); sprintf(send_client, wb_accept, base_buf); base64_encode_free(base_buf); send_data(stream, (unsigned char*)send_client, strlen(send_client)); } } static void ws_send_data(uv_stream_t* stream, unsigned char* data, int len) { int head_size = 2; if (len > 125 && len < 65536) { // 两个字节[0, 65535] head_size += 2; } else if (len >= 65536) { // 不做处理 head_size += 8; } unsigned char* data_buf = malloc(head_size + len); data_buf[0] = 0x81; if (len <= 125) { data_buf[1] = len; } else if (len > 125 && len < 65536) { data_buf[1] = 126; data_buf[2] = (len & 0x0000ff00) >> 8; data_buf[3] = (len & 0x000000ff); } else { // 127不写了 return; } memcpy(data_buf + head_size, data, len); send_data(stream, data_buf, head_size + len); free(data_buf); } // 收到的是一个数据包; static void ws_on_recv_data(uv_stream_t* stream, unsigned char* data, unsigned int len) { if (data[0] != 0x81 && data[0] != 0x82) { return; } unsigned int data_len = data[1] & 0x0000007f; int head_size = 2; if (data_len == 126) { // 后面两个字节表示的是数据长度;data[2], data[3] data_len = data[3] | (data[2] << 8); head_size += 2; } else if (data_len == 127) { // 后面8个字节表示数据长度; 2, 3, 4, 5 | 6, 7, 8, 9 unsigned int low = data[5] | (data[4] << 8) | (data[3] << 16) | (data[2] << 24); unsigned int hight = data[9] | (data[8] << 8) | (data[7] << 16) | (data[6] << 24); data_len = low; head_size += 8; } unsigned char* mask = data + head_size; unsigned char* body = data + head_size + 4; for (unsigned int i = 0; i < data_len; i++) { // 遍历后面所有的数据; body[i] = body[i] ^ mask[i % 4]; } // test static char test_buf[4096]; memcpy(test_buf, body, data_len); test_buf[data_len] = 0; printf("%s\n", test_buf); // 发送协议 ws_send_data(stream, "Hello", strlen("Hello")); // end } static void after_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { // 连接断开了; if (nread < 0) { uv_shutdown_t* reg = malloc(sizeof(uv_shutdown_t)); memset(reg, 0, sizeof(uv_shutdown_t)); uv_shutdown(reg, stream, on_shutdown); return; } // end printf("start websocket!!!\n"); struct ws_context* wc = stream->data; // 如果没有握手,就进入websocket握手协议 if (wc->is_shake_hand == 0) { ws_connect_shake_hand(stream, buf->base, buf->len); wc->is_shake_hand = 1; return; } // end // 如果客户端主动关闭;0x88, 状态码 if ((unsigned char)(buf->base[0]) == 0x88) { // 关闭 printf("ws closing!!!!"); return; } // end // ws正常的数据, 暂时不处理粘包这些问题; // 假设我们一次性都可以收完websocket发过来的数据包; ws_on_recv_data(stream, buf->base, nread); // end } static void uv_connection(uv_stream_t* server, int status) { printf("new client comming\n"); uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); memset(client, 0, sizeof(uv_tcp_t)); uv_tcp_init(loop, client); uv_accept(server, (uv_stream_t*)client); struct ws_context* wc = malloc(sizeof(struct ws_context)); memset(wc, 0, sizeof(struct ws_context)); client->data = wc; uv_read_start((uv_stream_t*)client, uv_alloc_buf, after_read); } int main(int argc, char** argv) { int ret; loop = uv_default_loop(); uv_tcp_init(loop, &l_server); struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", 8001, &addr); // ip地址, 端口 ret = uv_tcp_bind(&l_server, (const struct sockaddr*) &addr, 0); if (ret != 0) { goto failed; } uv_listen((uv_stream_t*)&l_server, SOMAXCONN, uv_connection); uv_run(loop, UV_RUN_DEFAULT); failed: printf("end\n"); system("pause"); return 0; }
总结
WebSocket是一种基于TCP协议的双向通信协议,与HTTP/HTTPS协议相比,具有更低的延迟和更高的实时性。使用WebSocket协议可以实现实时通信、数据推送、在线游戏等功能。但是,WebSocket协议并没有被广泛采用的原因有以下几个方面:
兼容性问题:WebSocket协议是HTML5标准中新增的协议,对于较老的浏览器可能不支持。虽然现代主流浏览器已经支持WebSocket协议,但是在一些特殊情况下(例如企业内部应用、旧版浏览器等),WebSocket协议的兼容性可能成为问题。
安全问题:由于WebSocket协议实现了双向通信,因此在网络安全方面需要更加关注。例如,在进行WebSocket通信时需要进行恰当的身份验证和加密,以避免数据泄露和劫持等问题。
部署和负载问题:WebSocket协议需要建立长连接,因此在部署WebSocket服务时需要考虑服务器负载和连接管理等问题。如果不恰当地部署WebSocket服务,可能会导致服务器资源浪费、连接管理不当等问题。
用处有限:对于大多数网站来说,HTTP请求已经能满足需求。使用WebSocket可以带来实时性改善,但对许多网站功能影响不大。所以如果没有实时交互等强需求,就不一定需要引入WebSocket。
尽管WebSocket协议存在一些问题,但是在特定的场景下仍然是一种非常有用的协议。例如,在实时通信、数据推送、在线游戏等场景下,WebSocket协议可以发挥出其优势。
HTTP服务器
步骤:
1: 等待socket 连接进来;
2: 接收socket发送过来的数据;–> http协议格式的请求数据包
3: http解析url
4: 根据url来找对应的响应处理;
5: 将要返回的数据打包成http响应格式,发给客户端;
6: 关闭客户端的连接;
以Get请求为例:
1: 客户端提交请求;
2: 服务端解析get的url找到对应的响应;
3: 服务端解析get参数;
5: 处理,返回结果给客户端:
“HTTP/1.1 %d %s\r\n” 状态码,状态描述
“transfer-encoding:%s\r\n”, “identity”
“content-length: %d\r\n\r\n” 内容长度
body数据
6: get携带参数格式?uname=xiaoming&key=18074532323
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "uv.h" #include "../3rd/http_parser/http_parser.h" /* url 注册管理模块 */ typedef void(*web_get_handle)(uv_stream_t* stream, char* url); typedef void(*web_post_handle)(uv_stream_t* stream, char* url, char* body); struct url_node { char* url; // url地址 web_get_handle get; // url地址对应的处理函数; web_post_handle post; // url地址对应的post函数 }; static struct url_node* alloc_url_node(char* url, web_get_handle get, web_post_handle post) { struct url_node* node = malloc(sizeof(struct url_node)); memset(node, 0, sizeof(struct url_node)); node->url = strdup(url); node->get = get; node->post = post; return node; } static struct url_node* url_node[1024]; static int url_count = 0; static void register_web_handle(char* url, web_get_handle get, web_post_handle post) { url_node[url_count] = alloc_url_node(url, get, post); url_count ++; } static struct url_node* get_url_node(char* url, int len) { for (int i = 0; i < url_count; i++) { if (strncmp(url, url_node[i]->url, len) == 0) { return url_node[i]; } } return NULL; } /* { [100] = "Continue", [101] = "Switching Protocols", [200] = "OK", [201] = "Created", [202] = "Accepted", [203] = "Non-Authoritative Information", [204] = "No Content", [205] = "Reset Content", [206] = "Partial Content", [300] = "Multiple Choices", [301] = "Moved Permanently", [302] = "Found", [303] = "See Other", [304] = "Not Modified", [305] = "Use Proxy", [307] = "Temporary Redirect", [400] = "Bad Request", [401] = "Unauthorized", [402] = "Payment Required", [403] = "Forbidden", [404] = "Not Found", [405] = "Method Not Allowed", [406] = "Not Acceptable", [407] = "Proxy Authentication Required", [408] = "Request Time-out", [409] = "Conflict", [410] = "Gone", [411] = "Length Required", [412] = "Precondition Failed", [413] = "Request Entity Too Large", [414] = "Request-URI Too Large", [415] = "Unsupported Media Type", [416] = "Requested range not satisfiable", [417] = "Expectation Failed", [500] = "Internal Server Error", [501] = "Not Implemented", [502] = "Bad Gateway", [503] = "Service Unavailable", [504] = "Gateway Time-out", [505] = "HTTP Version not supported", } */ /* uv_handle_s 数据结构: UV_HANDLE_FIELDS uv_stream_t 数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS uv_tcp_t 数据结构; UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_TCP_PRIVATE_FIELDS uv_tcp_t is uv_stream_t is uv_handle_t; */ static uv_loop_t* loop = NULL; static uv_tcp_t l_server; // 监听句柄; // 当我们的event loop检车到handle上有数据可以读的时候, // 就会调用这个函数, 让这个函数给event loop准备好读入数据的内存; // event loop知道有多少数据,suggested_size, // handle: 发生读时间的handle; // suggested_size: 建议我们分配多大的内存来保存这个数据; // uv_buf_t: 我们准备好的内存,通过uv_buf_t,告诉even loop; static void uv_alloc_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { if (handle->data != NULL) { free(handle->data); handle->data = NULL; } buf->base = malloc(suggested_size + 1); buf->len = suggested_size; handle->data = buf->base; } static void on_close(uv_handle_t* handle) { printf("close client\n"); if (handle->data) { free(handle->data); handle->data = NULL; } } static void on_shutdown(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, on_close); free(req); } static void after_write(uv_write_t* req, int status) { if (status == 0) { printf("write success\n"); } uv_buf_t* w_buf = req->data; if (w_buf) { free(w_buf); } free(req); } static void send_data(uv_stream_t* stream, unsigned char* send_data, int send_len) { uv_write_t* w_req = malloc(sizeof(uv_write_t)); uv_buf_t* w_buf = malloc(sizeof(uv_buf_t)); unsigned char* send_buf = malloc(send_len); memcpy(send_buf, send_data, send_len); w_buf->base = send_buf; w_buf->len = send_len; w_req->data = w_buf; uv_write(w_req, stream, w_buf, 1, after_write); } static char req_url[4096]; int on_url(http_parser* p, const char *at, size_t length) { strncpy(req_url, at, length); req_url[length] = 0; return 0; } static int filter_url(char* url) { char* walk = url; int len = 0; while (*url != '?' && *url != '\0') { len++; url++; } return len; } static void on_http_request(uv_stream_t* stream, char* req, int len) { http_parser_settings settings; http_parser_settings_init(&settings); settings.on_url = on_url; http_parser p; http_parser_init(&p, HTTP_REQUEST); http_parser_execute(&p, &settings, req, len); // http get是可以携带参数的: // http://www.baidu.com:6080/test?name=xiaoming&age=34 int url_len = filter_url(req_url); struct url_node* node = get_url_node(req_url, url_len); printf("%s\n", req_url); if (node == NULL) { return; } switch (p.method) { // 请求方法 case HTTP_GET: if (node->get != NULL) { node->get(stream, req_url); } break; case HTTP_POST: if (node->post != NULL) { } break; } } // 参数: // uv_stream_t* handle --> uv_tcp_t; // nread: 读到了多少字节的数据; // uv_buf_t: 我们的数据都读到到了哪个buf里面, base; static void after_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { // 连接断开了; if (nread < 0) { uv_shutdown_t* reg = malloc(sizeof(uv_shutdown_t)); memset(reg, 0, sizeof(uv_shutdown_t)); uv_shutdown(reg, stream, on_shutdown); return; } // end buf->base[nread] = 0; printf("recv %d\n", nread); printf("%s\n", buf->base); // 处理 on_http_request(stream, buf->base, buf->len); // end } static void uv_connection(uv_stream_t* server, int status) { printf("new client comming\n"); // 接入客户端; uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); memset(client, 0, sizeof(uv_tcp_t)); uv_tcp_init(loop, client); uv_accept(server, (uv_stream_t*)client); // end // 告诉event loop,让他帮你管理哪个事件; uv_read_start((uv_stream_t*)client, uv_alloc_buf, after_read); } static void test_get(uv_stream_t* stream, char* url) { printf("%s\n", url); char* body = "SUCCESS TEST1"; static char respons_buf[4096]; char* walk = respons_buf; sprintf(walk, "HTTP/1.1 %d %s\r\n", 200, "OK"); walk += strlen(walk); sprintf(walk, "transfer-encoding:%s\r\n", "identity"); walk += strlen(walk); sprintf(walk, "content-length: %d\r\n\r\n", strlen(body)); walk += strlen(walk); sprintf(walk, "%s", body); send_data(stream, respons_buf, strlen(respons_buf)); } static void test2_get(uv_stream_t* stream, char* url) { printf("%s\n", url); char* body = "SUCCESS TEST2"; static char respons_buf[4096]; char* walk = respons_buf; sprintf(walk, "HTTP/1.1 %d %s\r\n", 200, "OK"); walk += strlen(walk); sprintf(walk, "transfer-encoding:%s\r\n", "identity"); walk += strlen(walk); sprintf(walk, "content-length: %d\r\n\r\n", strlen(body)); walk += strlen(walk); sprintf(walk, "%s", body); send_data(stream, respons_buf, strlen(respons_buf)); } int main(int argc, char** argv) { // 注册一下web请求函数 register_web_handle("/test", test_get, NULL); register_web_handle("/test2", test2_get, NULL); // end int ret; loop = uv_default_loop(); // Tcp 监听服务; uv_tcp_init(loop, &l_server); // 将l_server监听句柄加入到event loop里面; // 你需要event loop来给你做那种管理呢?配置你要的管理类型; struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", 6080, &addr); // ip地址, 端口 ret = uv_tcp_bind(&l_server, (const struct sockaddr*) &addr, 0); if (ret != 0) { goto failed; } // 让event loop来做监听管理,当我们的l_server句柄上有人连接的时候; // event loop就会调用用户指定的这个处理函数uv_connection; uv_listen((uv_stream_t*)&l_server, SOMAXCONN, uv_connection); uv_run(loop, UV_RUN_DEFAULT); failed: printf("end\n"); system("pause"); return 0; }
多线程与工作队列
1: 线程相关函数:
uv_thread_create: 创建一个线程;
uv_thread_self: 获取当前线程id号;
uv_thread_join: 等待线程结束;
2: 锁:
uv_mutex_init: 初始化锁;
uv_mutex_destroy: 销毁锁;
uv_mutex_lock: 获取锁,如果被占用,就等待;
uv_mutex_trylock: 尝试获取锁,如果获取不到,直接返回,不等待;
uv_mutex_unlock: 释放锁;
3: 等待/触发事件;
uv_cond_init: 创建条件事件;
uv_cond_destroy: 销毁条件事件;
uv_cond_signal: 触发条件事件;
uv_cond_broadcast: 广播条件事件;
uv_cond_wait/uv_cond_timedwait: 等待事件/等待事件超时;
工作队列:
1: libuv在启动的时候会创建一个线程池;
2: 线程池默认启动的线程数目是4个,最大是128个线程;
3: uv_queue_work工作队列原理:
step1: libuv启动线程池,等待任务的调度;
step2: 用户给工作队列一个执行函数,工作队列执行完后,通知主线程;
step3: 主线程在执行之前设置的回掉函数;
4: 工作队列的使用:
有很多操作,比如读文件,比如读数据库,比如读redis都会等待,所以使用工作队列,
可以工作队列来执行,然后通知主线程,这样就不会卡主线程了。给工作队列一个任务(函数),并指定好任务完成的回掉函数,线程池就会调度去执行这个任务函数,完成后,通知主线程,主线程调用回掉函数;
#include <stdio.h> #include <string.h> #include <stdlib.h> #include "uv.h" // 工作队列的用处: // 1:复杂的算法放到工作队列 ; // 2:IO放到我们工作队列,获取数据库结果; // .... // 是在线程池里面另外一个线程里面调用,不在主线程; static void thread_work(uv_work_t* req) { // printf("user data = %d \n", (int)req->data); printf("thread_work id 0x%d:\n", uv_thread_self()); } // 当工作队列里面的线程执行完上面的任务后,通知主线程; // 主线程调用这个函数; static void on_work_complete(uv_work_t* req, int status) { printf("on_work_complete thread id 0x%d:\n", uv_thread_self()); } int main(int argc, char** argv) { uv_work_t uv_work; printf("main id 0x%d:\n", uv_thread_self()); uv_work.data = (void*)6; uv_queue_work(uv_default_loop(), &uv_work, thread_work, on_work_complete); uv_run(uv_default_loop(), UV_RUN_DEFAULT); system("pause"); return 0; }