Golang TCP/IP服务器/客户端应用程序,设计一个简单可靠帧传送通信协议。(并且正确处理基于流式控制协议,带来的应用层沾帧[沾包]问题)

avatar
作者
筋斗云
阅读量:7

在 Golang 语言标准库之中提供了,对于TCP/IP链接、侦听器的高级封装支持,这易于上层开发人员轻松基于这些BCL(基础类库)实现期望的功能。

TCP/IP链接(客户端)

net.Conn 接口

TCP/IP侦听器(服务器)

net.Listener

Golang 提供了易用的写入数据到远程(对端)实现,而不比像 C/C++ 这类传统的编程语言,人们需要自行处理发送的字节数。

例如:

原生:send、WSASend、WSPSend 等函数

ASIO:stream::async_write_some    等函数

它与 Microsoft .NET 提供的 System.Net.Socket 类的发送函数功能是类似的,调用该函数发送数据,它会确保数据全部发送到对端(远端),否则视为失败。

在实际生产环境之中,绝大多数的场景上面,人们的确不需要调用一次发送函数,但不保证本次期望传送数据全部发送成功,而是潜在的可能只发送一部分,还需要开发人员自行处理,这样繁琐的TCP网络程序发送实现的。

但这在一些特定场景的网络程序上面是有意义的,例如我们需要知道已用掉了多少的流量,因为这一次缓冲区发送并没有全部传送到远端,但已经传送了一部分也生产了网络带宽资源的浪费,所以,像这种问题,Golang 不提供类似接口,它这块的不自由,是会有一些问题的。

较为庆幸的是:

net.Conn 接口提供的 Read 函数并非是保证一定读入期望BUF大小的,否则这个在很多类型的网络程序上面就很坑人了。

它就相当于传统阻塞的 recv,不会出现非阻塞的EAGIN要求开发人员重试的操作的问题,所有它只有返回已收到的字节数,或发生错误。

当然人们仍需处理一个特殊的情况,recv 可能返回FIN 0字节,但并非错误,这是因为对端正确的关闭了TCP链接时产生的。

但遇到类似这类型的场景还是用 C/C++、或者CGO调用原生C API来实现把,功能上面都可以解决,只是用GO语言整会很麻烦就是了。

本文提供一个简单的网络传输协议,适用四个字节来表示长度,一个字节来表示关键帧字,不考虑对于流的效验合(checksum)的计算及验证,人们若有需求可以自行修改,在大多数的TCP应用协议服务器上面,它都可以经过少量修改集成到解决方案之中。(Go 语言之中或许该称为集成到 Package 程序包之中)

四个字节长度,可以描述到一帧最大 INT32_MAX(2147483647)字节封装传送,其实绝大多数情况传递大包是没有太大意义的,人们可以自行评估调整。

值得一提,在绝大多数的场景之中,如若产生大包,三个字节来表示长度,人们自行位运算即可,这是因为过大的帧长,可能会导致网络程序在接受这些大数据帧时,产生严重的内存恐慌问题。

个人一个好的建议是,对于追求网络吞吐性能的TCP应用协议,人们在适用 Golang 应该直接废弃掉,没有任何意义的各种接口及封装实现,如返回  io.Reader,并且应当适用固定缓冲区的最大帧对齐,如:4K,即用户不要发送超过最大对齐(4K)的单帧报文。

随机内存分配会导致碎片化的产生,影响网络程序的吞吐能力,同时频繁的内存复制也会导致内存、及CPU计算资源负载升高。

但在大多数场景的网络程序来说,并不需要在意这块的优化,因为没有太大意义,但对于纯网络IO密集型应用来说,这是有很大必要的。

本文提供的实现不适用上述场景,但可以适用于略微带一些大包处理(即用户不愿意在业务层分片、组片的场景),但本人更希望大家趋近于共同学习目的。

运行测试:

go run -race test.go

服务器及客户端实现及封装:(含测试用例)

main.go

package main  import ( 	"encoding/binary" 	"errors" 	"fmt" 	"io" 	"math" 	"math/rand" 	"net" 	"strconv" 	"sync" 	"time" )  type _ConnectionReader struct { 	owner       *Connection 	length      int 	offset      int 	checksum    uint32 	header_recv []byte 	lock_recv   sync.Mutex }  type Connection struct { 	disposed    bool 	connection  net.Conn 	header_send []byte 	lock_sent   sync.Mutex 	reader      *_ConnectionReader 	listener    *Listener }  type Listener struct { 	sync.Mutex 	disposed    bool 	listener    net.Listener 	connections map[*Connection]bool }  /* #pragma pack(push, 1) typedef struct { 	BYTE  bKf;       // 关键帧字 	DWORD dwLength;  // 载荷长度 } PACKET_HEADER; #pragma pack(pop)  static constexpr int PACKET_HEADER_SIZE = sizeof(PACKET_HEADER); // 4 + 1 = 5 BYTE */  const ( 	_CONNECTION_PACKET_HEADER_KF   = 0x2A // 置关键帧字 	_CONNECTION_PACKET_HEADER_SIZE = 5 	CONNECTION_MIN_PORT            = 0 	CONNECTION_MAX_PORT            = math.MaxUint16 )  var ErrConnectionClosed = errors.New("connection has been closed") var ErrConnectionArgP = errors.New("the parameter p cannot be incorrectly null or array length 0") var ErrConnectionProtocolKf = errors.New("network protocol error, kf check error") var ErrConnectionProtocolLength = errors.New("network protocol error, length check error") var ErrConnectionArgAcceptor = errors.New("the acceptor parameter cannot be null") var ErrConnectionDisconnect = errors.New("connection has been disconnect")  // 功能名:发送数据 // 返回值: // <  0 发送错误(ERR) // == 0 链接断开(FIN) // >  0 已发送字节数 func (my *Connection) Send(buffer []byte, offset int, length int) int { 	// 对于欲发送数据的参数检查 	if buffer == nil || offset < 0 || length < 1 { 		return -1 	}  	// 检查是否溢出BUFF缓存大小 	len := len(buffer) 	if offset+length > len { 		return -1 	}  	// 检查链接是否存在 	connection := my.connection 	if connection == nil { 		return -1 	}  	// 预备环境及变量 	bytes_transferred := 0 	sync := &my.lock_sent 	header := my.header_send 	payload := buffer[offset : offset+length]  	// 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。 	sync.Lock() 	defer sync.Unlock()  	// 检查当前链接是否已经释放 	if my.disposed { 		return -1 	}  	// 先发送协议帧头 	header[0] = _CONNECTION_PACKET_HEADER_KF 	binary.BigEndian.PutUint32(header[1:], uint32(length))  	written_size, err := connection.Write(header) 	if err != nil { 		return -1 	} else { 		bytes_transferred += written_size 	}  	// 在发送协议载荷 	written_size, err = connection.Write(payload) 	if err != nil { 		return -1 	}  	// 加上已传送的字节数 	bytes_transferred += written_size 	return bytes_transferred }  // 功能名:收取数据 // 上个 Reader 未完成之前一直阻塞当前协程直到对方结束后返回 func (my *Connection) Receive() io.Reader { 	// 检查当前链接是否已经释放 	if my.disposed { 		return nil 	}  	// 检查链接是否存在 	connection := my.connection 	if connection == nil { 		return nil 	}  	// 返回帧读入器 	reader := my.reader 	reader.lock_recv.Lock() 	return reader }  // 功能名:实例化一个链接对象 func NewConnection(conn net.Conn, listener *Listener) *Connection { 	var connection *Connection  	if conn != nil { 		connection = &Connection{ 			disposed:    false, 			connection:  conn, 			listener:    listener, 			header_send: make([]byte, _CONNECTION_PACKET_HEADER_SIZE), 		} 		connection.reader = &_ConnectionReader{ 			owner:       connection, 			length:      0, 			offset:      0, 			checksum:    0, 			header_recv: make([]byte, _CONNECTION_PACKET_HEADER_SIZE), 		} 	} 	return connection }  // 功能名:链接主机 func Connect(host string, port int) *Connection { 	// 检查端口参数的有效性 	if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT { 		return nil 	}  	// 服务器主机地址不可为空 	if len(host) < 1 { 		return nil 	}  	// 服务器地址并且尝试链接 	address := host + ":" + strconv.Itoa(port) 	conn, err := net.Dial("tcp", address) 	if err != nil { 		return nil 	}  	// 返回TCP链接的封装对象 	return NewConnection(conn, nil) }  // 功能名:关闭链接(网络) func (my *Connection) close(connection net.Conn) error { 	// 强制关闭链接,但可能会失败 	if my.disposed { 		return nil 	}  	my.disposed = true 	return connection.Close() }  // 功能名:关闭链接 func (my *Connection) Close(await bool) (err error) { 	// 检查链接是否存在 	connection := my.connection 	if connection == nil { 		return 	}  	// 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。 	sync := &my.lock_sent 	if await { 		sync.Lock() 		sync.Unlock()  		// 检查当前链接是否已经释放 		err = my.close(connection) 	} else { 		err = my.close(connection) 	}  	// 如果是服务器接受的链接对象,就从服务器列表之中删除这个链接实例。 	listener := my.listener 	if listener != nil { 		listener.Lock() 		delete(listener.connections, my) 		listener.Unlock() 	}  	return }  func (my *Connection) connection_get_ip_end_point(remote bool) string { 	connection := my.connection 	if connection == nil { 		return "" 	}  	var address net.Addr 	if remote { 		address = connection.RemoteAddr() 	} else { 		address = connection.LocalAddr() 	}  	if address == nil { 		return "" 	}  	return address.String() }  // 功能名:获取远程地址 func (my *Connection) GetRemoteEndPoint() string { 	return my.connection_get_ip_end_point(true) }  // 功能名:获取本地地址 func (my *Connection) GetLocalEndPoint() string { 	return my.connection_get_ip_end_point(false) }  // 功能名:读入帧数据 func (my *_ConnectionReader) Read(p []byte) (n int, err error) { 	// 检查当前链接是否已经释放 	owner := my.owner 	if owner.disposed { 		return 0, ErrConnectionClosed 	}  	// 检查参数P不可以为NUL或数组长度为0 	length := len(p) 	if length < 1 { 		return 0, ErrConnectionArgP 	}  	// 帧已经被全部收取完成 	if my.length < 0 { 		my.length = 0 		my.lock_recv.Unlock() 		return 0, io.EOF 	}  	// 收取协议报文的头部 	if my.length == 0 { 		header := my.header_recv 		n, err := io.ReadFull(owner.connection, header) 		if err != nil { 			return n, err 		}  		// 判断协议关键帧字 		kf := header[0] 		if kf != _CONNECTION_PACKET_HEADER_KF { 			return 0, ErrConnectionProtocolKf 		}  		// 检查载荷的总长度 		my.length = int(binary.BigEndian.Uint32(header[1:])) 		my.offset = 0 		my.checksum = 0 		if my.length < 1 { 			return 0, ErrConnectionProtocolLength 		} 	}  	// 循环收取数据到缓存区P之中 	remain := my.length - my.offset 	if length <= remain { 		n, err = owner.connection.Read(p) 	} else { 		n, err = owner.connection.Read(p[:remain]) 	}  	// 从链接之中读入数据出现错误 	if err != nil { 		return n, err 	}  	// 是否收取到FIN字节(0) 	if n < 1 { 		return n, ErrConnectionDisconnect 	}  	// 计算当前帧是否已经收取完毕 	my.offset += n 	if my.offset < my.length { 		return n, nil 	} else { 		my.offset = 0 		my.length = -1 		my.checksum = 0 		return n, nil 	} }  // 功能名:实例化一个侦听器 func NewListener(host string, port int) *Listener { 	// 检查端口参数的有效性 	if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT { 		return nil 	}  	// 服务器主机地址不可为空 	if len(host) < 1 { 		return nil 	}  	// 服务器地址并且尝试绑定 	address := host + ":" + strconv.Itoa(port) 	listener, err := net.Listen("tcp", address) 	if err != nil { 		return nil 	}  	return &Listener{ 		disposed:    false, 		listener:    listener, 		connections: make(map[*Connection]bool), 	} }  // 功能名:侦听服务器 func (my *Listener) ListenAndServe(acceptor func(*Connection)) error { 	// 接收器参数不可以为空 	if acceptor == nil { 		return ErrConnectionArgAcceptor 	}  	// 网络侦听器已经关闭 	if my.disposed { 		return ErrConnectionClosed 	}  	any := false 	listener := my.listener 	for { 		// 网络如果已经被关闭了 		if my.disposed { 			return nil 		}  		// 尝试接收一个网络链接 		conn, err := listener.Accept() 		if err != nil { 			if any { 				return nil 			} else { 				return err 			} 		}  		// 如果没有获取到链接的引用则迭代到下个链接接受 		if conn == nil { 			continue 		}  		// 构建一个封装的网络链接对象 		connection := NewConnection(conn, my) 		my.Lock() 		my.connections[connection] = true 		my.Unlock()  		// 启动对于链接处理的协同程序 		go acceptor(connection) 	} }  // 功能名:关闭全部链接 func (my *Listener) Close() { 	// 强制关闭服务器的侦听器 	listener := my.listener 	if listener != nil { 		listener.Close() 	}  	// 释放全部持有的托管资源 	my.Lock() 	my.disposed = true 	connections := my.connections 	my.connections = make(map[*Connection]bool) 	my.Unlock()  	// 强制关闭全部的网络链接 	for connection := range connections { 		connection.Close(false) 	} }  func test() { 	rand.Seed(time.Now().UnixNano())  	// 链接服务器 	packet := 0 	connection := Connect("127.0.0.1", 11111) 	for i, c := 0, rand.Intn(100)+1; i < c; i++ { 		length := rand.Intn(128) + 1 		buffer := make([]byte, length) 		for j := 0; j < length; j++ { 			buffer[j] = byte(rand.Intn(26)) + 97 		}  		// 发送数据 		transferred := connection.Send(buffer, 0, length) 		if transferred < 1 { 			break 		} else { 			// 接受数据 			r := connection.Receive() 			if r == nil { 				break 			}  			// 读取全部数据(一帧) 			buf, err := io.ReadAll(r) 			if err != nil { 				break 			} else if len(buf) < 1 { 				break 			}  			// 打印收到的帧数据 			packet++ 			fmt.Printf("[%s]: client packet=%d length=%d string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), string(buf)) 		} 	}  	// 关闭链接 	connection.Close(true)  	// 客户端关闭网络链接 	fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "client connection closed") }  func main() { 	// 运行客户端测试协程 	go test()  	// 打开服务器侦听器哟 	listener := NewListener("127.0.0.1", 11111) 	listener.ListenAndServe(func(c *Connection) { 		packet := 0 		remoteEP := c.GetRemoteEndPoint() 		for { 			// 获取网络接收器 			r := c.Receive() 			if r == nil { 				break 			}  			// 读取全部数据(一帧) 			buf, err := io.ReadAll(r) 			if err != nil { 				break 			} else if len(buf) < 1 { 				break 			}  			// 打印收到的帧数据 			packet++ 			fmt.Printf("[%s]: server packet=%d length=%d remote=%s string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), remoteEP, string(buf))  			// 回显客户端的数据 			transferred := c.Send(buf, 0, len(buf)) 			if transferred < 1 { 				break 			} 		}  		// 关闭客户端链接 		c.Close(true)  		// 服务器关闭网络链接 		fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "server connection closed") 	}) } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!