目录
Netty TCP服务器(TcpServer)
Reactor Netty提供了一个易于使用和配置的TcpServer。它隐藏Netty了创建TCP服务器所需的大部分功能并增加了Reactive Streams背压(Reactive Streams是具有无阻塞背压的异步流处理的标准)
启动和停止
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() // 创建一个TcpServer 准备好配置的实例 .bindNow(); // 以阻塞方式启动服务器并等待它完成初始化 server.onDispose() .block(); } }
启动和停止(主机和端口)
要在特定的host和上提供服务port,可以将以下配置应用到TCP服务器:
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() .host("localhost") .port(8080) .bindNow(); server.onDispose() .block(); } }
import reactor.core.publisher.Mono; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class MultiAddressApplication { public static void main(String[] args) { TcpServer tcpServer = TcpServer.create(); // 配置第一台TCP服务器主机、端口 DisposableServer server1 = tcpServer .host("localhost") .port(8080) .bindNow(); // 配置第二台TCP服务器主机、端口 DisposableServer server2 = tcpServer .host("0.0.0.0") .port(8081) .bindNow(); Mono.when(server1.onDispose(), server2.onDispose()) .block(); } }
急切初始化
默认情况下,TcpServer资源的初始化是按需进行的。这意味着bind operation吸收了初始化和加载所需的额外时间:
①:事件循环组
②:本机传输库(使用本机传输时)
③:用于安全性的本机库(在的情况下OpenSsl)
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { TcpServer tcpServer = TcpServer.create() .handle((inbound, outbound) -> inbound.receive().then()); tcpServer.warmup() // 初始化并加载事件循环组、本机传输库和用于安全性的本机库 .block(); DisposableServer server = tcpServer.bindNow(); server.onDispose() .block(); } }
消费客户端数据
为了从连接的客户端接收数据,必须附加一个I/O 处理程序。I/O 处理程序可以访问NettyInbound以读取数据。
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() // 从连接的客户端接收数据 .handle((inbound, outbound) -> inbound.receive().then()) .bindNow(); server.onDispose() .block(); } }
生命周期回调
TcpServer提供了以下生命周期回调以便扩展。
回调函数 | 描述 |
---|---|
doOnBind | 在服务器通道即将绑定时调用 |
doOnBound | 在绑定服务器通道时调用 |
doOnChannelInit | 初始化通道时调用 |
doOnConnection | 连接远程客户端时调用 |
doOnUnbound | 当服务器通道未绑定时调用 |
使用doOnConnection和doOnChannelInit回调:
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import java.util.concurrent.TimeUnit; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() // 当连接远程客户端时,Netty管道使用ReadTimeoutHandler进行扩展 .doOnConnection(conn -> conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) // 初始化通道时,Netty管道使用LoggingHandler进行扩展 .doOnChannelInit((observer, channel, remoteAddress) -> channel.pipeline() .addFirst(new LoggingHandler("reactor.netty.examples"))) .bindNow(); server.onDispose() .block(); } }
TCP-level配置(三种配置)
(1)Setting Channel Options:设置通道参数选项
默认情况下,TCP服务器配置有以下选项:
Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2); childOptions.put(ChannelOption.AUTO_READ, false); childOptions.put(ChannelOption.TCP_NODELAY, true); this.config = new TcpServerConfig( Collections.singletonMap(ChannelOption.SO_REUSEADDR, true), childOptions, () -> new InetSocketAddress(DEFAULT_PORT) );
如果需要其他选项或需要更改当前选项,可以应用以下配置:
import io.netty.channel.ChannelOption; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .bindNow(); server.onDispose() .block(); } }
(2)Wire Logger:连线日志记录
Reactor Netty提供连线日志记录,用于何时需要检查对等点之间的流量。
默认情况下,线路日志记录处于禁用状态。要启用它,必须将记录器reactor.netty.tcp.TcpServer级别设置为DEBUG并应用以下配置:
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() .wiretap(true) // 启用连线记录 .bindNow(); server.onDispose() .block(); } }
Wire Logger格式化程序
Reactor Netty支持3种不同的格式化程序:
连线日志格式化 | 描述 |
---|---|
AdvancedByteBufFormat#HEX_DUM | 同时记录事件和内容。内容将采用十六进制格式(默认) |
AdvancedByteBufFormat#SIMPLE | 使用此格式启用连线记录时,仅记录事件 |
AdvancedByteBufFormat#TEXTUAL | 同时记录事件和内容。内容将采用纯文本格式 |
当需要更改默认格式化程序时,使用方式:
import io.netty.handler.logging.LogLevel; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import reactor.netty.transport.logging.AdvancedByteBufFormat; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() // 启用连线记录, AdvancedByteBufFormat#TEXTUAL用于打印内容 .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) .bindNow(); server.onDispose() .block(); } }
①:AdvancedByteBufFormat#HEX_DUM - 默认
使用此格式启用连线记录时,将同时记录事件和内容。内容将采用十六进制格式
reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B * +-------------------------------------------------+ * | 0 1 2 3 4 5 6 7 8 9 a b c d e f | * +--------+-------------------------------------------------+----------------+ * |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World| * |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte| * |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl| * |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:| * |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de| * ... * reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B * +-------------------------------------------------+ * | 0 1 2 3 4 5 6 7 8 9 a b c d e f | * +--------+-------------------------------------------------+----------------+ * |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.| * |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:| * |00000020| 20 30 0d 0a 0d 0a | 0.... | * +--------+-------------------------------------------------+----------------+
②:AdvancedByteBufFormat#SIMPLE
使用此格式启用连线记录时,仅记录事件
reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B
③:AdvancedByteBufFormat#TEXTUAL
使用此格式启用连线记录时,将同时记录事件和内容。内容将采用纯文本格式。
reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1 * Content-Type: text/plain * user-agent: ReactorNetty/dev * ... * reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK * content-length: 0
(3)Event Loop Group:事件循环组
默认情况下,TCP服务器使用“事件循环组”,其中工作线程的数量等于初始化时运行时可用的处理器数量(但最小值为4)。当需要不同的配置时,可以使用LoopResource#create方法之一。
默认配置Event Loop Group如下:
/** *默认工作线程数,回退到可用处理器(但最小值为4) */ public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount"; /** *默认选择器线程计数,回退到-1(无选择器线程) */ public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount"; /** *UDP的默认工作线程数,回退到可用处理器(但最小值为4) */ public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount"; /** *默认的静默期,保证不会发生对底层循环资源的处置,回退到2秒。 */ public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod"; /** *默认情况下,无论任务是否在静默期内提交,在处理底层资源之前等待的最长时间为15秒。 */ public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout"; /** *默认值是否首选本机传输(epoll、kqueue),回退在可用时是否首选 */ public static final String NATIVE = "reactor.netty.native";
如果需要更改这些设置,可以应用以下配置:
import reactor.netty.DisposableServer; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { LoopResources loop = LoopResources.create("event-loop", 1, 4, true); DisposableServer server = TcpServer.create() .runOn(loop) .bindNow(); server.onDispose() .block(); } }
Option和childOption参数设置
SSL和TLS
当需要SSL或TLS时,可以应用下一个清单中显示的配置。默认情况下,如果OpenSSL可用,SslProvider.OPENSSL则使用provider作为提供者。否则SslProvider.JDK使用。可以通过SslContextBuilder或通过设置来切换提供程序“-Dio.netty.handler.ssl.noOpenSsl=true”
SSL(Secure Socket Layer,安全套接字层)
位于可靠的面向连接的网络层协议和应用层协议之间的一种协议层。SSL通过互相认证、使用数字签名确保完整性、使用加密确保私密性,以实现客户端和服务器之间的安全通讯。该协议由两层组成:①SSL记录协议、②SSL握手协议
TLS(Transport Layer Security,传输层安全协议)
用于两个应用程序之间提供保密性和数据完整性。该协议由两层组成:①TLS记录协议、②TLS握手协议
SSL是Netscape开发的专门用户保护Web通讯的,目前版本为3.0。最新版本的TLS 1.0是IETF(工程任务组)制定的一种新的协议,它建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本。两者差别极小,可以理解为SSL 3.1,它是写入了RFC的。
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import reactor.netty.tcp.TcpSslContextSpec; import java.io.File; public class Application { public static void main(String[] args) { File cert = new File("certificate.crt"); File key = new File("private.key"); TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key); DisposableServer server = TcpServer.create() .secure(spec -> spec.sslContext(tcpSslContextSpec)) .bindNow(); server.onDispose() .block(); } }
服务器域名
可以配置TCP多个SslContext映射到特定域的服务器。配置SNI映射时可以使用确切的域名或包含通配符的域名
使用包含通配符的域名:
public class Application { public static void main(String[] args) throws Exception { File defaultCert = new File("default_certificate.crt"); File defaultKey = new File("default_private.key"); File testDomainCert = new File("default_certificate.crt"); File testDomainKey = new File("default_private.key"); SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build(); SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build(); DisposableServer server = TcpServer.create() .secure(spec -> spec.sslContext(defaultSslContext) .addSniMapping("*.test.com", testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext))) .bindNow(); server.onDispose() .block(); } }
Metrics(监控指标)
TCP服务器支持与Micrometer,它公开了前缀为reactor.netty.tcp.server的所有指标。
在应用程序中,通常会记录日志以便事后分析,在很多情况下是产生了问题之后,再去查看日志,是一种事后的静态分析。在很多时候,可能需要了解整个系统在当前,或者某一时刻运行的情况,比如一个系统后台服务,可能需要了解一些实时监控的数据
1、每秒钟的请求数是多少(TPS)?
2、平均每个请求处理的时间?
3、请求处理的最长耗时?
4、请求处理的响应的直方图?
5、请求处理正确响应率?
6、等待处理的请求队列长度?
7、查看整个系统的的CPU使用率、内存占用、jvm运行情况;以及系统运行出错率等等一系列的实时数据采集时,最简单的方法就是在系统的入口、出口和关键位置设置埋点,然后将采集到的信息发送到实时监控平台或者存入到缓存和DB中做进一步的分析和展示。
Metrics作为一款监控指标的度量类库,提供了许多工具帮助开发者来完成各项数据的监控。
Metrics提供5种基本的度量类型:Meters、Gauges、Counters、Histograms和Timers
启用该集成:
import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() .metrics(true) // 启用与 Micrometer 的内置集成 .bindNow(); server.onDispose() .block(); } }
当需要TCP服务器指标与其他系统集成时,Micrometer或者想提供自己的集成Micrometer,可以提供自己的指标记录器,如下所示:
import reactor.netty.DisposableServer; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.tcp.TcpServer; import java.net.SocketAddress; import java.time.Duration; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() // 启用 TCP 服务器指标并提供ChannelMetricsRecorder实现。 .metrics(true, CustomChannelMetricsRecorder::new) .bindNow(); server.onDispose() .block(); } }
TCP服务器支持与Micrometer,它公开了前缀为reactor.netty.tcp.server的所有指标。
(1)汇总ConnectionProvider指标
metric name | type | 描述 |
---|---|---|
reactor.netty.tcp.server.data.received | DistributionSummary | 接收的数据量,以字节为单位 |
reactor.netty.tcp.server.data.sent | DistributionSummary | 发送的数据量,以字节为单位 |
reactor.netty.tcp.server.errors | Counter | 发生的错误数 |
reactor.netty.tcp.server.tls.handshake.time | Timer | TLS握手所花费的时间 |
(2)ByteBufAllocator指标
metric name | type | 描述 |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | 堆内存的字节数 |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | 直接内存的字节数 |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | 堆区域的数量(当PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | 直接竞技场的数量(当PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | 线程本地缓存的数量(当PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | 小缓存的大小(当PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | 正常缓存的大小(当PooledByteBufAllocator) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | 竞技场的块大小(当PooledByteBufAllocator) |
(3)EventLoop指标:
metric name | type | 描述 |
---|---|---|
reactor.netty.eventloop.pending.tasks | Gauge | 事件循环中待处理的任务数 |
Unix域套接字
TCP当使用本机传输时,客户端支持Unix域套接字(UDS)。
使用UDS支持:
import io.netty.channel.unix.DomainSocketAddress; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; public class Application { public static void main(String[] args) { DisposableServer server = TcpServer.create() // 指定DomainSocketAddress将被使用 .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) .bindNow(); server.onDispose() .block(); } }