本文探究一种轻量级的 pool 实现 ftp 连接。
一、背景
简要介绍:业务中一般使用较多的是各种开源组件,设计有点重,因此本文探究一种轻量级的 pool 池的思想实现。
期望:设置连接池最大连接数为 N 时,批量执行 M 个 FTP 请求,所有请求都可以成功。
关键点: 使用池的思想存储 FTP 链接,同时控制 FTP 连接的数量。
二、池思想及 sync.Pool 重点分析
池思想设计模式
Golang 是有自动垃圾回收机制的编程语言,使用三色并发标记算法标记对象并回收。但是如果想开发一个高性能的应用程序,就必须考虑垃圾回收给性能带来的影响。因为 Go 的垃圾回收机制有一个 STW 时间,而且在堆上大量创建对象,也会影响垃圾回收标记时间。
通常采用对象池的方式,将不用的对象回收起来,避免垃圾回收。另外,像数据库连接、TCP 长连接等,连接的创建是非常耗时操作,如果每次使用都创建新链接,则整个业务很多时间都花在了创建链接上了。因此,若将这些链接保存起来,避免每次使用时都重新创建,则能大大降低业务耗时,提升系统整体性能。
sync.Pool 要点
golang 中提供了 sync.Pool 数据结构,可以使用它来创建池化对象。不过使用时有几个重点是要关注的,避免踩坑:
- sync.Pool 本身是线程安全的,多个 goroutine 可以并发地调用存取对象。
- sync.Pool 不可用在使用之后复制使用。关于这一点 context 包里面有大量使用,不再赘述。
- sync.Pool 用来保存的是一组可独立访问的“临时”对象。注意这里的“临时”,这表明池中的对象可能在未来某个时间被毫无预兆的移除(因为长久不使用被 GC 回收了)。
关于第 3 点非常重要,下面我们实现一个 demo 来详细说明:
package main import ( "fmt" "net/http" "sync" "time" ) func main() { var p sync.Pool // 创建一个对象池 p.New = func() interface{} { return &http.Client{ Timeout: 5 * time.Second, } } var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func() { defer wg.Done() client := p.Get().(*http.Client) defer p.Put(client) //获取http请求并打印返回码 resp, err := client.Get("https://www.baidu.com") if err != nil { fmt.Println("http get error", err) return } resp.Body.Close() fmt.Println("success", resp.Status) }() } //等待所有请求结束 wg.Wait() }
这里我们使用 New 定义了创建 http.Client 的方法,然后启动 10 个 goroutine 并发访问网址,使用的 http.Client 对象都是从池中获取的,使用完毕后再放回到池子。
实际上,这个池中可能创建了 10 个 http.Client ,也可能创建了 8 个,还有可能创建了 3 个。取决于每个请求执行时池中是否有空闲的 http.Client ,以及其它的 goroutine 是否及时的放回去。
另外这里要注意的是,我们设置了 New 字段,当没有空闲请求时,Get 方法会调用 New 重新生成一个新的 http.Client。这种方式实现的好处是不必担心没有 http.Client 可用,缺点是数量不可控。你可能会想,不设置 New 字段是否可以?也是可以的,实现如下:
package main import ( "fmt" "net/http" "sync" "time" ) func main() { var p sync.Pool // 创建一个对象池 for i := 0; i < 5; i++ { p.Put(&http.Client{Timeout: 5 * time.Second}) // 不设置 New 字段,初始化时就放入5个可重用对象 } var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func() { defer wg.Done() client, ok := p.Get().(*http.Client) if !ok { fmt.Println("get client is nil") return } defer p.Put(client) resp, err := client.Get("https://www.baidu.com") if err != nil { fmt.Println("http get error", err) return } resp.Body.Close() fmt.Println("success", resp.Status) }() } //等待所有请求结束 wg.Wait() }
在初始化时直接放入一定数量的可重用对象,从而达到了控制数量的目的。但是不设置 New 字段的风险很大,因为池化的对象如果长时间没有被调用,可能会被回收,而我们是无法预知什么时候池化的对象是会被回收的。因此一般不会使用这种方式,而是通过其它的方式来实现并发数量控制。
至此,也清楚了我们想实现的诉求:既要通过池满足连接复用,也要控制连接数量。(我们已经知道,仅仅依靠 sync.Pool 是实现不了的)
三、FTP 连接池的实现
- 创建 ftp docker 容器
docker run -d --name ftp_server \ -p 2100:21 \ -p 30010-30019:30010-30019 \ -e "FTP_PASSIVE_PORTS=30010:30019" \ -e FTP_USER_HOME=/home/test \ -e FTP_USER_NAME=test \ -e FTP_USER_PASS=123456 \ -e FTP_USER_LIMIT=30 \ -e "PUBLICHOST=localhost" \ stilliard/pure-ftpd
- 使用 golang ftp client 库进行代码开发
package main import ( "bytes" "fmt" "time" "github.com/jlaffaye/ftp" ) type FTPConnectionPool struct { conns chan *ftp.ServerConn maxConns int } func NewFTPConnectionPool(server, username, password string, maxConns int) (*FTPConnectionPool, error) { pool := &FTPConnectionPool{ conns: make(chan *ftp.ServerConn, maxConns), maxConns: maxConns, } for i := 0; i < maxConns; i++ { conn, err := ftp.Dial(server, ftp.DialWithTimeout(5*time.Second)) if err != nil { return nil, err } err = conn.Login(username, password) if err != nil { return nil, err } pool.conns <- conn } return pool, nil } func (p *FTPConnectionPool) GetConnection() (*ftp.ServerConn, error) { return <-p.conns, nil } func (p *FTPConnectionPool) ReleaseConnection(conn *ftp.ServerConn) { p.conns <- conn } func (p *FTPConnectionPool) Close() { close(p.conns) for conn := range p.conns { _ = conn.Quit() } } func (p *FTPConnectionPool) StoreFileWithPool(remotePath string, buffer []byte) error { conn, err := p.GetConnection() if err != nil { return err } defer p.ReleaseConnection(conn) data := bytes.NewBuffer(buffer) err = conn.Stor(remotePath, data) if err != nil { return fmt.Errorf("failed to upload file: %w", err) } return nil } func main() { fmt.Println("hello world") }
- 性能测试
package main import ( "fmt" "log" "sync" "testing" ) func BenchmarkFTPClient_StoreFileWithMaxConnections(b *testing.B) { // Assume NewFTPConnectionPool has been called elsewhere to initialize the pool // with a maxConns value of 4. For example: pool, err := NewFTPConnectionPool("localhost:2100", "jovy", "123456", 5) if err != nil { log.Fatalf("Failed to initialize FTP connection pool: %v", err) } defer pool.Close() var wg sync.WaitGroup buffer := []byte("test data for benchmarking") b.ResetTimer() for i := 0; i < 50; i++ { wg.Add(1) go func(i int) { defer wg.Done() // Use the connection pool to store the file err := pool.StoreFileWithPool(fmt.Sprintf("file_%d.txt", i), buffer) if err != nil { b.Errorf("Failed to store file: %v", err) } }(i) } wg.Wait() }
可以看到,池化后性能这块已经达到了极致。
至此,整个功能也实现差不多了,后续的错误处理及代码抽象可以在此基础上继续优化,感兴趣的同学可以测试看看。
四、参考
- 《深入理解 Go 并发编程》 鸟窝
- 在容器中搭建运行 FTP 服务器 https://www.niwoxuexi.com/blog/hangge/article/903.html
- linux开启ftp服务和golang实现ftp_server_client https://www.liuvv.com/p/d43abcbd.html