2.Netty TCP服务器(TcpServer)

avatar
作者
猴君
阅读量:1

目录


Netty专栏目录(点击进入…)


Netty TCP服务器(TcpServer)

Reactor Netty提供了一个易于使用和配置的TcpServer。它隐藏Netty了创建TCP服务器所需的大部分功能并增加了Reactive Streams背压(Reactive Streams是具有无阻塞背压的异步流处理的标准)

启动和停止

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create()		// 创建一个TcpServer 准备好配置的实例 				         .bindNow();	// 以阻塞方式启动服务器并等待它完成初始化  		server.onDispose() 		      .block(); 	} } 

启动和停止(主机和端口)

要在特定的host和上提供服务port,可以将以下配置应用到TCP服务器:

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application { 	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create() 				         .host("localhost")  				         .port(8080)         				         .bindNow(); 		server.onDispose() 		      .block(); 	} } 
import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class MultiAddressApplication {  	public static void main(String[] args) { 		TcpServer tcpServer = TcpServer.create();                    // 配置第一台TCP服务器主机、端口 		DisposableServer server1 = tcpServer 				.host("localhost")  				.port(8080)         				.bindNow();           // 配置第二台TCP服务器主机、端口 		DisposableServer server2 = tcpServer 				.host("0.0.0.0")  				.port(8081)       				.bindNow();  		Mono.when(server1.onDispose(), server2.onDispose()) 				.block(); 	} } 

急切初始化

默认情况下,TcpServer资源的初始化是按需进行的。这意味着bind operation吸收了初始化和加载所需的额外时间:

①:事件循环组
②:本机传输库(使用本机传输时)
③:用于安全性的本机库(在的情况下OpenSsl)

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		TcpServer tcpServer = 				TcpServer.create() 				         .handle((inbound, outbound) -> inbound.receive().then());  		tcpServer.warmup()  // 初始化并加载事件循环组、本机传输库和用于安全性的本机库 		         .block(); 		DisposableServer server = tcpServer.bindNow(); 		server.onDispose() 		      .block(); 	} 	 } 

消费客户端数据

为了从连接的客户端接收数据,必须附加一个I/O 处理程序。I/O 处理程序可以访问NettyInbound以读取数据。

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create()                           // 从连接的客户端接收数据 				         .handle((inbound, outbound) -> inbound.receive().then())  				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

生命周期回调

TcpServer提供了以下生命周期回调以便扩展。

回调函数描述
doOnBind在服务器通道即将绑定时调用
doOnBound在绑定服务器通道时调用
doOnChannelInit初始化通道时调用
doOnConnection连接远程客户端时调用
doOnUnbound当服务器通道未绑定时调用

使用doOnConnection和doOnChannelInit回调:

import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import java.util.concurrent.TimeUnit;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create()                          // 当连接远程客户端时,Netty管道使用ReadTimeoutHandler进行扩展 				         .doOnConnection(conn -> 				             conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS)))                           // 初始化通道时,Netty管道使用LoggingHandler进行扩展 				         .doOnChannelInit((observer, channel, remoteAddress) -> 				             channel.pipeline() 				                    .addFirst(new LoggingHandler("reactor.netty.examples"))) 				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

TCP-level配置(三种配置)

(1)Setting Channel Options:设置通道参数选项

默认情况下,TCP服务器配置有以下选项:

Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2); childOptions.put(ChannelOption.AUTO_READ, false); childOptions.put(ChannelOption.TCP_NODELAY, true);  this.config = new TcpServerConfig( 	Collections.singletonMap(ChannelOption.SO_REUSEADDR, true), 	childOptions, 	() -> new InetSocketAddress(DEFAULT_PORT) ); 

如果需要其他选项或需要更改当前选项,可以应用以下配置:

import io.netty.channel.ChannelOption; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create() 				         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) 				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

(2)Wire Logger:连线日志记录

Reactor Netty提供连线日志记录,用于何时需要检查对等点之间的流量。

默认情况下,线路日志记录处于禁用状态。要启用它,必须将记录器reactor.netty.tcp.TcpServer级别设置为DEBUG并应用以下配置:

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create() 				         .wiretap(true) // 启用连线记录 				         .bindNow(); 		server.onDispose() 		      .block(); 	} 	 } 

Wire Logger格式化程序
Reactor Netty支持3种不同的格式化程序:

连线日志格式化描述
AdvancedByteBufFormat#HEX_DUM同时记录事件和内容。内容将采用十六进制格式(默认)
AdvancedByteBufFormat#SIMPLE使用此格式启用连线记录时,仅记录事件
AdvancedByteBufFormat#TEXTUAL同时记录事件和内容。内容将采用纯文本格式

当需要更改默认格式化程序时,使用方式:

import io.netty.handler.logging.LogLevel; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import reactor.netty.transport.logging.AdvancedByteBufFormat;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create()                          // 启用连线记录, AdvancedByteBufFormat#TEXTUAL用于打印内容 				         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL)  				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 
①:AdvancedByteBufFormat#HEX_DUM - 默认

使用此格式启用连线记录时,将同时记录事件和内容。内容将采用十六进制格式

reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B   *          +-------------------------------------------------+  *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |  * +--------+-------------------------------------------------+----------------+  * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|  * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|  * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|  * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|  * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|  * ...  * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B  *          +-------------------------------------------------+  *          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |  * +--------+-------------------------------------------------+----------------+  * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|  * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|  * |00000020| 20 30 0d 0a 0d 0a                               | 0....          |  * +--------+-------------------------------------------------+----------------+ 
②:AdvancedByteBufFormat#SIMPLE

使用此格式启用连线记录时,仅记录事件

reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B 
③:AdvancedByteBufFormat#TEXTUAL

使用此格式启用连线记录时,将同时记录事件和内容。内容将采用纯文本格式。

reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1  * Content-Type: text/plain  * user-agent: ReactorNetty/dev  * ...  * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK  * content-length: 0 

(3)Event Loop Group:事件循环组

默认情况下,TCP服务器使用“事件循环组”,其中工作线程的数量等于初始化时运行时可用的处理器数量(但最小值为4)。当需要不同的配置时,可以使用LoopResource#create方法之一。

默认配置Event Loop Group如下:

/** *默认工作线程数,回退到可用处理器(但最小值为4)  */ public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount"; /**  *默认选择器线程计数,回退到-1(无选择器线程)  */ public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount"; /** *UDP的默认工作线程数,回退到可用处理器(但最小值为4)  */ public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount"; /** *默认的静默期,保证不会发生对底层循环资源的处置,回退到2秒。 */ public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod"; /** *默认情况下,无论任务是否在静默期内提交,在处理底层资源之前等待的最长时间为15秒。  */ public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";  /** *默认值是否首选本机传输(epoll、kqueue),回退在可用时是否首选 */ public static final String NATIVE = "reactor.netty.native"; 

如果需要更改这些设置,可以应用以下配置:

import reactor.netty.DisposableServer; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		LoopResources loop = LoopResources.create("event-loop", 1, 4, true);  		DisposableServer server = 				TcpServer.create() 				         .runOn(loop) 				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

Option和childOption参数设置

Option和childOption参数设置(点击进入…)


SSL和TLS

当需要SSL或TLS时,可以应用下一个清单中显示的配置。默认情况下,如果OpenSSL可用,SslProvider.OPENSSL则使用provider作为提供者。否则SslProvider.JDK使用。可以通过SslContextBuilder或通过设置来切换提供程序“-Dio.netty.handler.ssl.noOpenSsl=true”

SSL(Secure Socket Layer,安全套接字层)

位于可靠的面向连接的网络层协议和应用层协议之间的一种协议层。SSL通过互相认证、使用数字签名确保完整性、使用加密确保私密性,以实现客户端和服务器之间的安全通讯。该协议由两层组成:①SSL记录协议、②SSL握手协议

TLS(Transport Layer Security,传输层安全协议)

用于两个应用程序之间提供保密性和数据完整性。该协议由两层组成:①TLS记录协议、②TLS握手协议

SSL是Netscape开发的专门用户保护Web通讯的,目前版本为3.0。最新版本的TLS 1.0是IETF(工程任务组)制定的一种新的协议,它建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本。两者差别极小,可以理解为SSL 3.1,它是写入了RFC的。

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import reactor.netty.tcp.TcpSslContextSpec;  import java.io.File;  public class Application {  	public static void main(String[] args) { 		File cert = new File("certificate.crt"); 		File key = new File("private.key");  		TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);  		DisposableServer server = 				TcpServer.create() 				         .secure(spec -> spec.sslContext(tcpSslContextSpec)) 				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

服务器域名
可以配置TCP多个SslContext映射到特定域的服务器。配置SNI映射时可以使用确切的域名或包含通配符的域名

使用包含通配符的域名:

public class Application {  	public static void main(String[] args) throws Exception { 		File defaultCert = new File("default_certificate.crt"); 		File defaultKey = new File("default_private.key");  		File testDomainCert = new File("default_certificate.crt"); 		File testDomainKey = new File("default_private.key");  		SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build(); 		SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();  		DisposableServer server = 				TcpServer.create() 				         .secure(spec -> spec.sslContext(defaultSslContext) 				                             .addSniMapping("*.test.com", 				                                     testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext))) 				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

Metrics(监控指标)

TCP服务器支持与Micrometer,它公开了前缀为reactor.netty.tcp.server的所有指标

在应用程序中,通常会记录日志以便事后分析,在很多情况下是产生了问题之后,再去查看日志,是一种事后的静态分析。在很多时候,可能需要了解整个系统在当前,或者某一时刻运行的情况,比如一个系统后台服务,可能需要了解一些实时监控的数据
1、每秒钟的请求数是多少(TPS)?
2、平均每个请求处理的时间?
3、请求处理的最长耗时?
4、请求处理的响应的直方图?
5、请求处理正确响应率?
6、等待处理的请求队列长度?
7、查看整个系统的的CPU使用率、内存占用、jvm运行情况;以及系统运行出错率等等一系列的实时数据采集时,最简单的方法就是在系统的入口、出口和关键位置设置埋点,然后将采集到的信息发送到实时监控平台或者存入到缓存和DB中做进一步的分析和展示。

Metrics作为一款监控指标的度量类库,提供了许多工具帮助开发者来完成各项数据的监控。
Metrics提供5种基本的度量类型:Meters、Gauges、Counters、Histograms和Timers

启用该集成:

import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create() 				         .metrics(true) // 启用与 Micrometer 的内置集成 				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

当需要TCP服务器指标与其他系统集成时,Micrometer或者想提供自己的集成Micrometer,可以提供自己的指标记录器,如下所示:

import reactor.netty.DisposableServer; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.tcp.TcpServer;  import java.net.SocketAddress; import java.time.Duration;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create() 						 // 启用 TCP 服务器指标并提供ChannelMetricsRecorder实现。 				         .metrics(true, CustomChannelMetricsRecorder::new)  				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

TCP服务器支持与Micrometer,它公开了前缀为reactor.netty.tcp.server的所有指标。

(1)汇总ConnectionProvider指标

metric nametype描述
reactor.netty.tcp.server.data.receivedDistributionSummary接收的数据量,以字节为单位
reactor.netty.tcp.server.data.sentDistributionSummary发送的数据量,以字节为单位
reactor.netty.tcp.server.errorsCounter发生的错误数
reactor.netty.tcp.server.tls.handshake.timeTimerTLS握手所花费的时间

(2)ByteBufAllocator指标

metric nametype描述
reactor.netty.bytebuf.allocator.used.heap.memoryGauge堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memoryGauge直接内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenasGauge堆区域的数量(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.direct.arenasGauge直接竞技场的数量(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.threadlocal.cachesGauge线程本地缓存的数量(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.small.cache.sizeGauge小缓存的大小(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.normal.cache.sizeGauge正常缓存的大小(当PooledByteBufAllocator)
reactor.netty.bytebuf.allocator.used.chunk.sizeGauge竞技场的块大小(当PooledByteBufAllocator)

(3)EventLoop指标:

metric nametype描述
reactor.netty.eventloop.pending.tasksGauge事件循环中待处理的任务数

Unix域套接字

TCP当使用本机传输时,客户端支持Unix域套接字(UDS)。

使用UDS支持:

import io.netty.channel.unix.DomainSocketAddress; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer;  public class Application {  	public static void main(String[] args) { 		DisposableServer server = 				TcpServer.create()                          // 指定DomainSocketAddress将被使用 				         .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))  				         .bindNow();  		server.onDispose() 		      .block(); 	} 	 } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!