在 Golang 语言标准库之中提供了,对于TCP/IP链接、侦听器的高级封装支持,这易于上层开发人员轻松基于这些BCL(基础类库)实现期望的功能。
TCP/IP链接(客户端)
net.Conn 接口
TCP/IP侦听器(服务器)
net.Listener
Golang 提供了易用的写入数据到远程(对端)实现,而不比像 C/C++ 这类传统的编程语言,人们需要自行处理发送的字节数。
例如:
原生:send、WSASend、WSPSend 等函数
ASIO:stream::async_write_some 等函数
它与 Microsoft .NET 提供的 System.Net.Socket 类的发送函数功能是类似的,调用该函数发送数据,它会确保数据全部发送到对端(远端),否则视为失败。
在实际生产环境之中,绝大多数的场景上面,人们的确不需要调用一次发送函数,但不保证本次期望传送数据全部发送成功,而是潜在的可能只发送一部分,还需要开发人员自行处理,这样繁琐的TCP网络程序发送实现的。
但这在一些特定场景的网络程序上面是有意义的,例如我们需要知道已用掉了多少的流量,因为这一次缓冲区发送并没有全部传送到远端,但已经传送了一部分也生产了网络带宽资源的浪费,所以,像这种问题,Golang 不提供类似接口,它这块的不自由,是会有一些问题的。
较为庆幸的是:
net.Conn 接口提供的 Read 函数并非是保证一定读入期望BUF大小的,否则这个在很多类型的网络程序上面就很坑人了。
它就相当于传统阻塞的 recv,不会出现非阻塞的EAGIN要求开发人员重试的操作的问题,所有它只有返回已收到的字节数,或发生错误。
当然人们仍需处理一个特殊的情况,recv 可能返回FIN 0字节,但并非错误,这是因为对端正确的关闭了TCP链接时产生的。
但遇到类似这类型的场景还是用 C/C++、或者CGO调用原生C API来实现把,功能上面都可以解决,只是用GO语言整会很麻烦就是了。
本文提供一个简单的网络传输协议,适用四个字节来表示长度,一个字节来表示关键帧字,不考虑对于流的效验合(checksum)的计算及验证,人们若有需求可以自行修改,在大多数的TCP应用协议服务器上面,它都可以经过少量修改集成到解决方案之中。(Go 语言之中或许该称为集成到 Package 程序包之中)
四个字节长度,可以描述到一帧最大 INT32_MAX(2147483647)字节封装传送,其实绝大多数情况传递大包是没有太大意义的,人们可以自行评估调整。
值得一提,在绝大多数的场景之中,如若产生大包,三个字节来表示长度,人们自行位运算即可,这是因为过大的帧长,可能会导致网络程序在接受这些大数据帧时,产生严重的内存恐慌问题。
个人一个好的建议是,对于追求网络吞吐性能的TCP应用协议,人们在适用 Golang 应该直接废弃掉,没有任何意义的各种接口及封装实现,如返回 io.Reader,并且应当适用固定缓冲区的最大帧对齐,如:4K,即用户不要发送超过最大对齐(4K)的单帧报文。
随机内存分配会导致碎片化的产生,影响网络程序的吞吐能力,同时频繁的内存复制也会导致内存、及CPU计算资源负载升高。
但在大多数场景的网络程序来说,并不需要在意这块的优化,因为没有太大意义,但对于纯网络IO密集型应用来说,这是有很大必要的。
本文提供的实现不适用上述场景,但可以适用于略微带一些大包处理(即用户不愿意在业务层分片、组片的场景),但本人更希望大家趋近于共同学习目的。
运行测试:
go run -race test.go
服务器及客户端实现及封装:(含测试用例)
main.go
package main import ( "encoding/binary" "errors" "fmt" "io" "math" "math/rand" "net" "strconv" "sync" "time" ) type _ConnectionReader struct { owner *Connection length int offset int checksum uint32 header_recv []byte lock_recv sync.Mutex } type Connection struct { disposed bool connection net.Conn header_send []byte lock_sent sync.Mutex reader *_ConnectionReader listener *Listener } type Listener struct { sync.Mutex disposed bool listener net.Listener connections map[*Connection]bool } /* #pragma pack(push, 1) typedef struct { BYTE bKf; // 关键帧字 DWORD dwLength; // 载荷长度 } PACKET_HEADER; #pragma pack(pop) static constexpr int PACKET_HEADER_SIZE = sizeof(PACKET_HEADER); // 4 + 1 = 5 BYTE */ const ( _CONNECTION_PACKET_HEADER_KF = 0x2A // 置关键帧字 _CONNECTION_PACKET_HEADER_SIZE = 5 CONNECTION_MIN_PORT = 0 CONNECTION_MAX_PORT = math.MaxUint16 ) var ErrConnectionClosed = errors.New("connection has been closed") var ErrConnectionArgP = errors.New("the parameter p cannot be incorrectly null or array length 0") var ErrConnectionProtocolKf = errors.New("network protocol error, kf check error") var ErrConnectionProtocolLength = errors.New("network protocol error, length check error") var ErrConnectionArgAcceptor = errors.New("the acceptor parameter cannot be null") var ErrConnectionDisconnect = errors.New("connection has been disconnect") // 功能名:发送数据 // 返回值: // < 0 发送错误(ERR) // == 0 链接断开(FIN) // > 0 已发送字节数 func (my *Connection) Send(buffer []byte, offset int, length int) int { // 对于欲发送数据的参数检查 if buffer == nil || offset < 0 || length < 1 { return -1 } // 检查是否溢出BUFF缓存大小 len := len(buffer) if offset+length > len { return -1 } // 检查链接是否存在 connection := my.connection if connection == nil { return -1 } // 预备环境及变量 bytes_transferred := 0 sync := &my.lock_sent header := my.header_send payload := buffer[offset : offset+length] // 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。 sync.Lock() defer sync.Unlock() // 检查当前链接是否已经释放 if my.disposed { return -1 } // 先发送协议帧头 header[0] = _CONNECTION_PACKET_HEADER_KF binary.BigEndian.PutUint32(header[1:], uint32(length)) written_size, err := connection.Write(header) if err != nil { return -1 } else { bytes_transferred += written_size } // 在发送协议载荷 written_size, err = connection.Write(payload) if err != nil { return -1 } // 加上已传送的字节数 bytes_transferred += written_size return bytes_transferred } // 功能名:收取数据 // 上个 Reader 未完成之前一直阻塞当前协程直到对方结束后返回 func (my *Connection) Receive() io.Reader { // 检查当前链接是否已经释放 if my.disposed { return nil } // 检查链接是否存在 connection := my.connection if connection == nil { return nil } // 返回帧读入器 reader := my.reader reader.lock_recv.Lock() return reader } // 功能名:实例化一个链接对象 func NewConnection(conn net.Conn, listener *Listener) *Connection { var connection *Connection if conn != nil { connection = &Connection{ disposed: false, connection: conn, listener: listener, header_send: make([]byte, _CONNECTION_PACKET_HEADER_SIZE), } connection.reader = &_ConnectionReader{ owner: connection, length: 0, offset: 0, checksum: 0, header_recv: make([]byte, _CONNECTION_PACKET_HEADER_SIZE), } } return connection } // 功能名:链接主机 func Connect(host string, port int) *Connection { // 检查端口参数的有效性 if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT { return nil } // 服务器主机地址不可为空 if len(host) < 1 { return nil } // 服务器地址并且尝试链接 address := host + ":" + strconv.Itoa(port) conn, err := net.Dial("tcp", address) if err != nil { return nil } // 返回TCP链接的封装对象 return NewConnection(conn, nil) } // 功能名:关闭链接(网络) func (my *Connection) close(connection net.Conn) error { // 强制关闭链接,但可能会失败 if my.disposed { return nil } my.disposed = true return connection.Close() } // 功能名:关闭链接 func (my *Connection) Close(await bool) (err error) { // 检查链接是否存在 connection := my.connection if connection == nil { return } // 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。 sync := &my.lock_sent if await { sync.Lock() sync.Unlock() // 检查当前链接是否已经释放 err = my.close(connection) } else { err = my.close(connection) } // 如果是服务器接受的链接对象,就从服务器列表之中删除这个链接实例。 listener := my.listener if listener != nil { listener.Lock() delete(listener.connections, my) listener.Unlock() } return } func (my *Connection) connection_get_ip_end_point(remote bool) string { connection := my.connection if connection == nil { return "" } var address net.Addr if remote { address = connection.RemoteAddr() } else { address = connection.LocalAddr() } if address == nil { return "" } return address.String() } // 功能名:获取远程地址 func (my *Connection) GetRemoteEndPoint() string { return my.connection_get_ip_end_point(true) } // 功能名:获取本地地址 func (my *Connection) GetLocalEndPoint() string { return my.connection_get_ip_end_point(false) } // 功能名:读入帧数据 func (my *_ConnectionReader) Read(p []byte) (n int, err error) { // 检查当前链接是否已经释放 owner := my.owner if owner.disposed { return 0, ErrConnectionClosed } // 检查参数P不可以为NUL或数组长度为0 length := len(p) if length < 1 { return 0, ErrConnectionArgP } // 帧已经被全部收取完成 if my.length < 0 { my.length = 0 my.lock_recv.Unlock() return 0, io.EOF } // 收取协议报文的头部 if my.length == 0 { header := my.header_recv n, err := io.ReadFull(owner.connection, header) if err != nil { return n, err } // 判断协议关键帧字 kf := header[0] if kf != _CONNECTION_PACKET_HEADER_KF { return 0, ErrConnectionProtocolKf } // 检查载荷的总长度 my.length = int(binary.BigEndian.Uint32(header[1:])) my.offset = 0 my.checksum = 0 if my.length < 1 { return 0, ErrConnectionProtocolLength } } // 循环收取数据到缓存区P之中 remain := my.length - my.offset if length <= remain { n, err = owner.connection.Read(p) } else { n, err = owner.connection.Read(p[:remain]) } // 从链接之中读入数据出现错误 if err != nil { return n, err } // 是否收取到FIN字节(0) if n < 1 { return n, ErrConnectionDisconnect } // 计算当前帧是否已经收取完毕 my.offset += n if my.offset < my.length { return n, nil } else { my.offset = 0 my.length = -1 my.checksum = 0 return n, nil } } // 功能名:实例化一个侦听器 func NewListener(host string, port int) *Listener { // 检查端口参数的有效性 if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT { return nil } // 服务器主机地址不可为空 if len(host) < 1 { return nil } // 服务器地址并且尝试绑定 address := host + ":" + strconv.Itoa(port) listener, err := net.Listen("tcp", address) if err != nil { return nil } return &Listener{ disposed: false, listener: listener, connections: make(map[*Connection]bool), } } // 功能名:侦听服务器 func (my *Listener) ListenAndServe(acceptor func(*Connection)) error { // 接收器参数不可以为空 if acceptor == nil { return ErrConnectionArgAcceptor } // 网络侦听器已经关闭 if my.disposed { return ErrConnectionClosed } any := false listener := my.listener for { // 网络如果已经被关闭了 if my.disposed { return nil } // 尝试接收一个网络链接 conn, err := listener.Accept() if err != nil { if any { return nil } else { return err } } // 如果没有获取到链接的引用则迭代到下个链接接受 if conn == nil { continue } // 构建一个封装的网络链接对象 connection := NewConnection(conn, my) my.Lock() my.connections[connection] = true my.Unlock() // 启动对于链接处理的协同程序 go acceptor(connection) } } // 功能名:关闭全部链接 func (my *Listener) Close() { // 强制关闭服务器的侦听器 listener := my.listener if listener != nil { listener.Close() } // 释放全部持有的托管资源 my.Lock() my.disposed = true connections := my.connections my.connections = make(map[*Connection]bool) my.Unlock() // 强制关闭全部的网络链接 for connection := range connections { connection.Close(false) } } func test() { rand.Seed(time.Now().UnixNano()) // 链接服务器 packet := 0 connection := Connect("127.0.0.1", 11111) for i, c := 0, rand.Intn(100)+1; i < c; i++ { length := rand.Intn(128) + 1 buffer := make([]byte, length) for j := 0; j < length; j++ { buffer[j] = byte(rand.Intn(26)) + 97 } // 发送数据 transferred := connection.Send(buffer, 0, length) if transferred < 1 { break } else { // 接受数据 r := connection.Receive() if r == nil { break } // 读取全部数据(一帧) buf, err := io.ReadAll(r) if err != nil { break } else if len(buf) < 1 { break } // 打印收到的帧数据 packet++ fmt.Printf("[%s]: client packet=%d length=%d string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), string(buf)) } } // 关闭链接 connection.Close(true) // 客户端关闭网络链接 fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "client connection closed") } func main() { // 运行客户端测试协程 go test() // 打开服务器侦听器哟 listener := NewListener("127.0.0.1", 11111) listener.ListenAndServe(func(c *Connection) { packet := 0 remoteEP := c.GetRemoteEndPoint() for { // 获取网络接收器 r := c.Receive() if r == nil { break } // 读取全部数据(一帧) buf, err := io.ReadAll(r) if err != nil { break } else if len(buf) < 1 { break } // 打印收到的帧数据 packet++ fmt.Printf("[%s]: server packet=%d length=%d remote=%s string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), remoteEP, string(buf)) // 回显客户端的数据 transferred := c.Send(buf, 0, len(buf)) if transferred < 1 { break } } // 关闭客户端链接 c.Close(true) // 服务器关闭网络链接 fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "server connection closed") }) }