ftp pool 功能分析及 golang 实现

avatar
作者
筋斗云
阅读量:1

本文探究一种轻量级的 pool 实现 ftp 连接。

一、背景

简要介绍:业务中一般使用较多的是各种开源组件,设计有点重,因此本文探究一种轻量级的 pool 池的思想实现。

期望:设置连接池最大连接数为 N 时,批量执行 M 个 FTP 请求,所有请求都可以成功。

关键点: 使用池的思想存储 FTP 链接,同时控制 FTP 连接的数量。

二、池思想及 sync.Pool 重点分析

池思想设计模式

Golang 是有自动垃圾回收机制的编程语言,使用三色并发标记算法标记对象并回收。但是如果想开发一个高性能的应用程序,就必须考虑垃圾回收给性能带来的影响。因为 Go 的垃圾回收机制有一个 STW 时间,而且在堆上大量创建对象,也会影响垃圾回收标记时间。

通常采用对象池的方式,将不用的对象回收起来,避免垃圾回收。另外,像数据库连接、TCP 长连接等,连接的创建是非常耗时操作,如果每次使用都创建新链接,则整个业务很多时间都花在了创建链接上了。因此,若将这些链接保存起来,避免每次使用时都重新创建,则能大大降低业务耗时,提升系统整体性能。

sync.Pool 要点

在这里插入图片描述

golang 中提供了 sync.Pool 数据结构,可以使用它来创建池化对象。不过使用时有几个重点是要关注的,避免踩坑:

  1. sync.Pool 本身是线程安全的,多个 goroutine 可以并发地调用存取对象。
  2. sync.Pool 不可用在使用之后复制使用。关于这一点 context 包里面有大量使用,不再赘述。
  3. sync.Pool 用来保存的是一组可独立访问的“临时”对象。注意这里的“临时”,这表明池中的对象可能在未来某个时间被毫无预兆的移除(因为长久不使用被 GC 回收了)。

关于第 3 点非常重要,下面我们实现一个 demo 来详细说明:

package main  import ( 	"fmt" 	"net/http" 	"sync" 	"time" )  func main() { 	var p sync.Pool // 创建一个对象池 	p.New = func() interface{} { 		return &http.Client{ 			Timeout: 5 * time.Second, 		} 	} 	var wg sync.WaitGroup 	wg.Add(10) 	for i := 0; i < 10; i++ { 		go func() { 			defer wg.Done() 			client := p.Get().(*http.Client) 			defer p.Put(client) 			//获取http请求并打印返回码 			resp, err := client.Get("https://www.baidu.com") 			if err != nil { 				fmt.Println("http get error", err) 				return 			} 			resp.Body.Close() 			fmt.Println("success", resp.Status) 		}() 	} 	//等待所有请求结束 	wg.Wait() } 

这里我们使用 New 定义了创建 http.Client 的方法,然后启动 10 个 goroutine 并发访问网址,使用的 http.Client 对象都是从池中获取的,使用完毕后再放回到池子。

实际上,这个池中可能创建了 10 个 http.Client ,也可能创建了 8 个,还有可能创建了 3 个。取决于每个请求执行时池中是否有空闲的 http.Client ,以及其它的 goroutine 是否及时的放回去。

另外这里要注意的是,我们设置了 New 字段,当没有空闲请求时,Get 方法会调用 New 重新生成一个新的 http.Client。这种方式实现的好处是不必担心没有 http.Client 可用,缺点是数量不可控。你可能会想,不设置 New 字段是否可以?也是可以的,实现如下:

package main  import ( 	"fmt" 	"net/http" 	"sync" 	"time" )  func main() { 	var p sync.Pool // 创建一个对象池 	for i := 0; i < 5; i++ { 		p.Put(&http.Client{Timeout: 5 * time.Second}) // 不设置 New 字段,初始化时就放入5个可重用对象 	} 	var wg sync.WaitGroup 	wg.Add(10) 	for i := 0; i < 10; i++ { 		go func() { 			defer wg.Done() 			client, ok := p.Get().(*http.Client) 			if !ok { 				fmt.Println("get client is nil") 				return 			} 			defer p.Put(client) 			resp, err := client.Get("https://www.baidu.com") 			if err != nil { 				fmt.Println("http get error", err) 				return 			} 			resp.Body.Close() 			fmt.Println("success", resp.Status) 		}() 	} 	//等待所有请求结束 	wg.Wait() }  

在这里插入图片描述

在初始化时直接放入一定数量的可重用对象,从而达到了控制数量的目的。但是不设置 New 字段的风险很大,因为池化的对象如果长时间没有被调用,可能会被回收,而我们是无法预知什么时候池化的对象是会被回收的。因此一般不会使用这种方式,而是通过其它的方式来实现并发数量控制。

至此,也清楚了我们想实现的诉求:既要通过池满足连接复用,也要控制连接数量。(我们已经知道,仅仅依靠 sync.Pool 是实现不了的)

三、FTP 连接池的实现

  1. 创建 ftp docker 容器
 docker run -d --name ftp_server \ -p 2100:21 \ -p 30010-30019:30010-30019 \ -e "FTP_PASSIVE_PORTS=30010:30019" \ -e FTP_USER_HOME=/home/test \ -e FTP_USER_NAME=test \ -e FTP_USER_PASS=123456 \ -e FTP_USER_LIMIT=30 \   -e "PUBLICHOST=localhost" \ stilliard/pure-ftpd 
  1. 使用 golang ftp client 库进行代码开发
package main  import ( 	"bytes" 	"fmt" 	"time"  	"github.com/jlaffaye/ftp" )  type FTPConnectionPool struct { 	conns    chan *ftp.ServerConn 	maxConns int }  func NewFTPConnectionPool(server, username, password string, maxConns int) (*FTPConnectionPool, error) { 	pool := &FTPConnectionPool{ 		conns:    make(chan *ftp.ServerConn, maxConns), 		maxConns: maxConns, 	}  	for i := 0; i < maxConns; i++ { 		conn, err := ftp.Dial(server, ftp.DialWithTimeout(5*time.Second)) 		if err != nil { 			return nil, err 		} 		err = conn.Login(username, password) 		if err != nil { 			return nil, err 		} 		pool.conns <- conn 	}  	return pool, nil }  func (p *FTPConnectionPool) GetConnection() (*ftp.ServerConn, error) { 	return <-p.conns, nil }  func (p *FTPConnectionPool) ReleaseConnection(conn *ftp.ServerConn) { 	p.conns <- conn }  func (p *FTPConnectionPool) Close() { 	close(p.conns) 	for conn := range p.conns { 		_ = conn.Quit() 	} }  func (p *FTPConnectionPool) StoreFileWithPool(remotePath string, buffer []byte) error { 	conn, err := p.GetConnection() 	if err != nil { 		return err 	} 	defer p.ReleaseConnection(conn)  	data := bytes.NewBuffer(buffer) 	err = conn.Stor(remotePath, data) 	if err != nil { 		return fmt.Errorf("failed to upload file: %w", err) 	} 	return nil }  func main() { 	fmt.Println("hello world") }  
  1. 性能测试
package main  import ( 	"fmt" 	"log" 	"sync" 	"testing" )  func BenchmarkFTPClient_StoreFileWithMaxConnections(b *testing.B) { 	// Assume NewFTPConnectionPool has been called elsewhere to initialize the pool 	// with a maxConns value of 4. For example: 	pool, err := NewFTPConnectionPool("localhost:2100", "jovy", "123456", 5) 	if err != nil { 		log.Fatalf("Failed to initialize FTP connection pool: %v", err) 	} 	defer pool.Close()  	var wg sync.WaitGroup 	buffer := []byte("test data for benchmarking")  	b.ResetTimer()  	for i := 0; i < 50; i++ { 		wg.Add(1) 		go func(i int) { 			defer wg.Done() 			// Use the connection pool to store the file 			err := pool.StoreFileWithPool(fmt.Sprintf("file_%d.txt", i), buffer) 			if err != nil { 				b.Errorf("Failed to store file: %v", err) 			} 		}(i) 	}  	wg.Wait() }  

可以看到,池化后性能这块已经达到了极致。
至此,整个功能也实现差不多了,后续的错误处理及代码抽象可以在此基础上继续优化,感兴趣的同学可以测试看看。

四、参考

  • 《深入理解 Go 并发编程》 鸟窝
  • 在容器中搭建运行 FTP 服务器 https://www.niwoxuexi.com/blog/hangge/article/903.html
  • linux开启ftp服务和golang实现ftp_server_client https://www.liuvv.com/p/d43abcbd.html

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!