几百行完成linux socket 服务器 支持 epoll 异步 io

avatar
作者
筋斗云
阅读量:0

一个使用epoll的高性能C++ socket服务器示例。这个服务器使用epoll进行I/O多路复用,支持大数据块传输,并使用线程池处理客户端连接。在构造函数中添加了 signal(SIGPIPE, SIG_IGN); 来忽略 SIGPIPE 信号。这可以防止在写入已关闭的 socket 时程序被终止,以下是代码实现:

```cpp

#include <iostream>  
#include <vector>  
#include <thread>  
#include <atomic>  
#include <unordered_map>  
#include <sys/epoll.h>  
#include <sys/socket.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <fcntl.h>  
#include <unistd.h>  
#include <string.h>  
#include <signal.h>  
#include <errno.h>  

const int MAX_EVENTS = 10000;  
const int BUFFER_SIZE = 1024 * 1024; // 1MB buffer  
const int THREAD_POOL_SIZE = 4;  

class Client {  
public:  
    int fd;  
    std::vector<char> buffer;  
    uint64_t expected_size;  
    uint64_t received_size;  

    Client(int fd) : fd(fd), expected_size(0), received_size(0) {  
        buffer.reserve(BUFFER_SIZE);  
    }  
};  

class Server {  
private:  
    int server_fd, epoll_fd;  
    std::vector<std::thread> thread_pool;  
    std::atomic<bool> running;  
    std::unordered_map<int, std::unique_ptr<Client>> clients;  

public:  
    Server(int port) : running(true) {  
        // 忽略 SIGPIPE 信号  
        signal(SIGPIPE, SIG_IGN);  

        server_fd = socket(AF_INET, SOCK_STREAM, 0);  
        if (server_fd == -1) {  
            throw std::runtime_error("Failed to create socket");  
        }  

        int opt = 1;  
        if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {  
            throw std::runtime_error("Failed to set socket options");  
        }  

        sockaddr_in address;  
        address.sin_family = AF_INET;  
        address.sin_addr.s_addr = INADDR_ANY;  
        address.sin_port = htons(port);  

        if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {  
            throw std::runtime_error("Failed to bind to port");  
        }  

        if (listen(server_fd, SOMAXCONN) < 0) {  
            throw std::runtime_error("Failed to listen on socket");  
        }  

        epoll_fd = epoll_create1(0);  
        if (epoll_fd == -1) {  
            throw std::runtime_error("Failed to create epoll file descriptor");  
        }  

        add_to_epoll(server_fd);  

        for (int i = 0; i < THREAD_POOL_SIZE; ++i) {  
            thread_pool.emplace_back(&Server::worker, this);  
        }  
    }  

    ~Server() {  
        running = false;  
        for (auto& thread : thread_pool) {  
            thread.join();  
        }  
        close(epoll_fd);  
        close(server_fd);  
    }  

    void run() {  
        std::cout << "Server started. Waiting for connections..." << std::endl;  

        while (running) {  
            epoll_event events[MAX_EVENTS];  
            int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);  

            for (int i = 0; i < nfds; ++i) {  
                if (events[i].data.fd == server_fd) {  
                    handle_new_connection();  
                } else {  
                    handle_client_event(events[i]);  
                }  
            }  
        }  
    }  

private:  
    void worker() {  
        while (running) {  
            // Process data or perform other tasks  
            std::this_thread::sleep_for(std::chrono::milliseconds(1));  
        }  
    }  

    void add_to_epoll(int fd) {  
        epoll_event event;  
        event.events = EPOLLIN | EPOLLET;  
        event.data.fd = fd;  
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {  
            throw std::runtime_error("Failed to add file descriptor to epoll");  
        }  
    }  

    void handle_new_connection() {  
        sockaddr_in client_addr;  
        socklen_t client_len = sizeof(client_addr);  
        int client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);  

        if (client_fd == -1) {  
            std::cerr << "Failed to accept client connection" << std::endl;  
            return;  
        }  

        fcntl(client_fd, F_SETFL, O_NONBLOCK);  
        add_to_epoll(client_fd);  

        clients[client_fd] = std::make_unique<Client>(client_fd);  
        std::cout << "New client connected: " << client_fd << std::endl;  
    }  

    void handle_client_event(const epoll_event& event) {  
        int fd = event.data.fd;  
        auto it = clients.find(fd);  
        if (it == clients.end()) {  
            std::cerr << "Client not found: " << fd << std::endl;  
            return;  
        }  

        Client& client = *(it->second);  

        if (event.events & EPOLLIN) {  
            if (client.expected_size == 0) {  
                if (!read_size(client)) {  
                    return;  
                }  
            }  

            if (!read_data(client)) {  
                return;  
            }  

            if (client.received_size == client.expected_size) {  
                process_data(client);  
            }  
        }  

        if (event.events & (EPOLLRDHUP | EPOLLHUP)) {  
            close_connection(fd);  
        }  
    }  

    bool read_size(Client& client) {  
        uint64_t size;  
        ssize_t bytes_read = read(client.fd, &size, sizeof(size));  

        if (bytes_read == sizeof(size)) {  
            client.expected_size = size;  
            client.received_size = 0;  
            client.buffer.clear();  
            return true;  
        } else if (bytes_read == 0) {  
            close_connection(client.fd);  
        } else if (bytes_read == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {  
            std::cerr << "Error reading size from client " << client.fd << ": " << strerror(errno) << std::endl;  
            close_connection(client.fd);  
        }  

        return false;  
    }  

    bool read_data(Client& client) {  
        while (client.received_size < client.expected_size) {  
            ssize_t to_read = std::min(static_cast<uint64_t>(BUFFER_SIZE), client.expected_size - client.received_size);  
            ssize_t bytes_read = read(client.fd, client.buffer.data() + client.received_size, to_read);  

            if (bytes_read > 0) {  
                client.received_size += bytes_read;  
            } else if (bytes_read == 0) {  
                close_connection(client.fd);  
                return false;  
            } else if (bytes_read == -1) {  
                if (errno == EAGAIN || errno == EWOULDBLOCK) {  
                    return true;  
                } else {  
                    std::cerr << "Error reading data from client " << client.fd << ": " << strerror(errno) << std::endl;  
                    close_connection(client.fd);  
                    return false;  
                }  
            }  
        }  

        return true;  
    }  

    void process_data(Client& client) {  
        // Process the received data here  
        std::cout << "Received message of size: " << client.received_size << " bytes from client " << client.fd << std::endl;  

        // Echo back for this example  
        safe_write(client.fd, &client.expected_size, sizeof(client.expected_size));  
        safe_write(client.fd, client.buffer.data(), client.received_size);  

        client.expected_size = 0;  
        client.received_size = 0;  
    }  

    void safe_write(int fd, const void* buffer, size_t length) {  
        const char* ptr = static_cast<const char*>(buffer);  
        size_t remaining = length;  

        while (remaining > 0) {  
            ssize_t written = write(fd, ptr, remaining);  
            if (written > 0) {  
                ptr += written;  
                remaining -= written;  
            } else if (written == -1) {  
                if (errno == EINTR) {  
                    // 被信号中断,继续写入  
                    continue;  
                } else if (errno == EAGAIN || errno == EWOULDBLOCK) {  
                    // 暂时无法写入,稍后重试  
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));  
                } else {  
                    // 其他错误,关闭连接  
                    std::cerr << "Error writing to client " << fd << ": " << strerror(errno) << std::endl;  
                    close_connection(fd);  
                    return;  
                }  
            }  
        }  
    }  

    void close_connection(int fd) {  
        std::cout << "Client disconnected: " << fd << std::endl;  
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);  
        close(fd);  
        clients.erase(fd);  
    }  
};  

int main(int argc, char* argv[]) {  
    if (argc != 2) {  
        std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;  
        return 1;  
    }  

    int port = std::stoi(argv[1]);  

    try {  
        Server server(port);  
        server.run();  
    } catch (const std::exception& e) {  
        std::cerr << "Error: " << e.what() << std::endl;  
        return 1;  
    }  

    return 0;  
}


```

这个服务器具有以下特点:

1. 使用epoll进行I/O多路复用,提高并发性能。

2. 支持多线程,使用线程池处理客户端请求。

3. 支持大数据块传输,使用固定大小的缓冲区(1MB)分块接收大型消息。

4. 实现了简单的协议:先发送8字节的消息大小,然后发送实际数据。

5. 使用非阻塞I/O和边缘触发(Edge-Triggered)模式。

6. 错误处理和资源管理。

7. 支持优雅关闭。

8 .在构造函数中添加了 signal(SIGPIPE, SIG_IGN); 来忽略 SIGPIPE 信号。这可以防止在写入已关闭的 socket 时程序被终止

9.添加了 safe_write 函数来替代直接的 write 调用。这个函数处理了各种可能的错误情况,包括:

(1)EINTR(被信号中断)
(2)EAGAIN 或 EWOULDBLOCK(暂时无法写入)
(3)其他错误(关闭连接)

10.在 process_data 函数中,使用 safe_write 替代了直接的 write 调用。


在 process_data 函数中,使用 safe_write 替代了直接的 write 调用。

要编译和运行这个程序,你需要一个支持C++11或更高版本的编译器。编译命令示例:

```
g++ -std=c++11 server.cpp -o server -pthread
```

运行命令示例(假设你想使用8080端口):

```
./server 8080
```

这个服务器应该能够处理大数据块传输的需求,并且具有良好的并发性能。在实际使用时,你可能还需要根据具体需求进行一些调整和优化。
 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!