一个使用epoll的高性能C++ socket服务器示例。这个服务器使用epoll进行I/O多路复用,支持大数据块传输,并使用线程池处理客户端连接。在构造函数中添加了 signal(SIGPIPE, SIG_IGN);
来忽略 SIGPIPE 信号。这可以防止在写入已关闭的 socket 时程序被终止,以下是代码实现:
```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <unordered_map>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
const int MAX_EVENTS = 10000;
const int BUFFER_SIZE = 1024 * 1024; // 1MB buffer
const int THREAD_POOL_SIZE = 4;
class Client {
public:
int fd;
std::vector<char> buffer;
uint64_t expected_size;
uint64_t received_size;
Client(int fd) : fd(fd), expected_size(0), received_size(0) {
buffer.reserve(BUFFER_SIZE);
}
};
class Server {
private:
int server_fd, epoll_fd;
std::vector<std::thread> thread_pool;
std::atomic<bool> running;
std::unordered_map<int, std::unique_ptr<Client>> clients;
public:
Server(int port) : running(true) {
// 忽略 SIGPIPE 信号
signal(SIGPIPE, SIG_IGN);
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
throw std::runtime_error("Failed to create socket");
}
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
throw std::runtime_error("Failed to set socket options");
}
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
throw std::runtime_error("Failed to bind to port");
}
if (listen(server_fd, SOMAXCONN) < 0) {
throw std::runtime_error("Failed to listen on socket");
}
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
throw std::runtime_error("Failed to create epoll file descriptor");
}
add_to_epoll(server_fd);
for (int i = 0; i < THREAD_POOL_SIZE; ++i) {
thread_pool.emplace_back(&Server::worker, this);
}
}
~Server() {
running = false;
for (auto& thread : thread_pool) {
thread.join();
}
close(epoll_fd);
close(server_fd);
}
void run() {
std::cout << "Server started. Waiting for connections..." << std::endl;
while (running) {
epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == server_fd) {
handle_new_connection();
} else {
handle_client_event(events[i]);
}
}
}
}
private:
void worker() {
while (running) {
// Process data or perform other tasks
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
void add_to_epoll(int fd) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
throw std::runtime_error("Failed to add file descriptor to epoll");
}
}
void handle_new_connection() {
sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd == -1) {
std::cerr << "Failed to accept client connection" << std::endl;
return;
}
fcntl(client_fd, F_SETFL, O_NONBLOCK);
add_to_epoll(client_fd);
clients[client_fd] = std::make_unique<Client>(client_fd);
std::cout << "New client connected: " << client_fd << std::endl;
}
void handle_client_event(const epoll_event& event) {
int fd = event.data.fd;
auto it = clients.find(fd);
if (it == clients.end()) {
std::cerr << "Client not found: " << fd << std::endl;
return;
}
Client& client = *(it->second);
if (event.events & EPOLLIN) {
if (client.expected_size == 0) {
if (!read_size(client)) {
return;
}
}
if (!read_data(client)) {
return;
}
if (client.received_size == client.expected_size) {
process_data(client);
}
}
if (event.events & (EPOLLRDHUP | EPOLLHUP)) {
close_connection(fd);
}
}
bool read_size(Client& client) {
uint64_t size;
ssize_t bytes_read = read(client.fd, &size, sizeof(size));
if (bytes_read == sizeof(size)) {
client.expected_size = size;
client.received_size = 0;
client.buffer.clear();
return true;
} else if (bytes_read == 0) {
close_connection(client.fd);
} else if (bytes_read == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
std::cerr << "Error reading size from client " << client.fd << ": " << strerror(errno) << std::endl;
close_connection(client.fd);
}
return false;
}
bool read_data(Client& client) {
while (client.received_size < client.expected_size) {
ssize_t to_read = std::min(static_cast<uint64_t>(BUFFER_SIZE), client.expected_size - client.received_size);
ssize_t bytes_read = read(client.fd, client.buffer.data() + client.received_size, to_read);
if (bytes_read > 0) {
client.received_size += bytes_read;
} else if (bytes_read == 0) {
close_connection(client.fd);
return false;
} else if (bytes_read == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return true;
} else {
std::cerr << "Error reading data from client " << client.fd << ": " << strerror(errno) << std::endl;
close_connection(client.fd);
return false;
}
}
}
return true;
}
void process_data(Client& client) {
// Process the received data here
std::cout << "Received message of size: " << client.received_size << " bytes from client " << client.fd << std::endl;
// Echo back for this example
safe_write(client.fd, &client.expected_size, sizeof(client.expected_size));
safe_write(client.fd, client.buffer.data(), client.received_size);
client.expected_size = 0;
client.received_size = 0;
}
void safe_write(int fd, const void* buffer, size_t length) {
const char* ptr = static_cast<const char*>(buffer);
size_t remaining = length;
while (remaining > 0) {
ssize_t written = write(fd, ptr, remaining);
if (written > 0) {
ptr += written;
remaining -= written;
} else if (written == -1) {
if (errno == EINTR) {
// 被信号中断,继续写入
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 暂时无法写入,稍后重试
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} else {
// 其他错误,关闭连接
std::cerr << "Error writing to client " << fd << ": " << strerror(errno) << std::endl;
close_connection(fd);
return;
}
}
}
}
void close_connection(int fd) {
std::cout << "Client disconnected: " << fd << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
clients.erase(fd);
}
};
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;
return 1;
}
int port = std::stoi(argv[1]);
try {
Server server(port);
server.run();
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
```
这个服务器具有以下特点:
1. 使用epoll进行I/O多路复用,提高并发性能。
2. 支持多线程,使用线程池处理客户端请求。
3. 支持大数据块传输,使用固定大小的缓冲区(1MB)分块接收大型消息。
4. 实现了简单的协议:先发送8字节的消息大小,然后发送实际数据。
5. 使用非阻塞I/O和边缘触发(Edge-Triggered)模式。
6. 错误处理和资源管理。
7. 支持优雅关闭。
8 .在构造函数中添加了 signal(SIGPIPE, SIG_IGN);
来忽略 SIGPIPE 信号。这可以防止在写入已关闭的 socket 时程序被终止
9.添加了 safe_write
函数来替代直接的 write
调用。这个函数处理了各种可能的错误情况,包括:
(1)EINTR(被信号中断)
(2)EAGAIN 或 EWOULDBLOCK(暂时无法写入)
(3)其他错误(关闭连接)
10.在 process_data 函数中,使用 safe_write 替代了直接的 write 调用。
在 process_data 函数中,使用 safe_write 替代了直接的 write 调用。
要编译和运行这个程序,你需要一个支持C++11或更高版本的编译器。编译命令示例:
```
g++ -std=c++11 server.cpp -o server -pthread
```
运行命令示例(假设你想使用8080端口):
```
./server 8080
```
这个服务器应该能够处理大数据块传输的需求,并且具有良好的并发性能。在实际使用时,你可能还需要根据具体需求进行一些调整和优化。