# netty-test **Repository Path**: Jyokiyi/netty-test ## Basic Information - **Project Name**: netty-test - **Description**: netty的学习和笔记 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2018-04-30 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Netty的学习 Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。 Netty是一款NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化并简化了TCP和UDP套接字服务器等网络编程。Netty的设计经验非常丰富,包括FTP,SMTP,HTTP以及各种基于二进制和基于文本的传统协议等。因此,Netty成功地找到了一种方法来实现轻松的开发,性能,稳定性和灵活性,而无需妥协。  ## 性能 * 吞吐量更好,延迟更低 * 更少的资源消耗 * 最小化不必要的内存拷贝 ## 安全 * 完整的SSL / TLS和StartTLS支持 ## 原理--是什么 ## 使用 ### 编写一个Discard服务器 [Discard协议](https://tools.ietf.org/html/rfc863) 继承关系图:  为了实现DISCARD协议,你需要做的唯一事情就是忽略所有接收到的数据。让我们从处理器实现开始,它处理由Netty生成的I / O事件。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
* DiscardServerHandler继承至ChannelInboundHandlerAdapter类,是一个实现ChannelInboundHandler接口。ChannelInboundHandler接口提供了可以覆盖的各种事件处理程序方法。目前,扩展ChannelInboundHandlerAdapter就足够了,而不是自己实现处理程序接口。
* 我们在这里重写channelRead()处理程序方法。每当接收到来自客户端的新数据时,都会调用此方法。在这个例子中,接收到的消息的类型是ByteBuf。
* 为了实现DISCARD协议,处理程序必须忽略收到的消息。ByteBuf是一个引用计数的对象,必须通过该release()方法明确释放。请记住,处理程序的职责是释放传递给处理程序的任何引用计数的对象。通常,channelRead()处理程序方法的实现如下所示:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
* 该exceptionCaught()事件处理方法被调用,抛出异常时被提出的Netty由于I / O错误或由处理器实现,由于在处理事件引发的异常。在大多数情况下,应该记录捕获到的异常并关闭其相关通道,尽管此方法的实现可能因您想要处理异常情况而需要做的不同而有所不同。例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
1. [NioEventLoopGroup](http://netty.io/4.1/api/io/netty/channel/nio/NioEventLoopGroup.html)是一个处理I/O操作的多线程事件循环。Netty为不同的传输,提供了各种[EventLoopGroup](http://netty.io/4.1/api/io/netty/channel/EventLoopGroup.html)实现。在这个例子中,我们正在实现一个服务器端应用程序,因此将使用两个[NioEventLoopGroup](http://netty.io/4.1/api/io/netty/channel/nio/NioEventLoopGroup.html)。第一个通常被称为“boss”,接受传入的连接。第二个通常称为“worker”,一旦老板接受连接并将接受的连接注册给工作人员,则处理接受连接的流量。使用多少个线程以及它们如何映射到创建的[Channel](http://netty.io/4.1/api/io/netty/channel/Channel.html)s取决于[EventLoopGroup](http://netty.io/4.1/api/io/netty/channel/EventLoopGroup.html)的实现,甚至可以通过构造函数配置。
2. [ServerBootstrap](http://netty.io/4.1/api/io/netty/bootstrap/ServerBootstrap.html)是建立服务器的辅助类。您可以使用[Channel](http://netty.io/4.1/api/io/netty/channel/Channel.html)直接设置服务器。但是,请注意,这是一个乏味的过程,在大多数情况下您不需要这样做。
3. 在这里,我们指定使用[NioServerSocketChannel](http://netty.io/4.1/api/io/netty/channel/socket/nio/NioServerSocketChannel.html)类用于实例化新的[Channel](http://netty.io/4.1/api/io/netty/channel/Channel.html)来接收传入连接。
4. 这里指定的处理程序将始终由新接收的Channel信道进行使用。[ChannelInitializer](http://netty.io/4.1/api/io/netty/channel/ChannelInitializer.html) 是一个专门用来帮助用户配置一个新[Channel](http://netty.io/4.1/api/io/netty/channel/Channel.html)的处理程序。最有可能的是,通过添加一些处理程序(如DiscardServerHandler来配置[ChannelPipeline](http://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html)对于新的Channel,以实现网络应用程序。随着应用程序变得复杂,您可能会向管道中添加更多的处理程序,并最终将此匿名类提取到顶级类中。
5. 可以设置特定于Channel实现的参数。我们正在编写一个TCP / IP服务器,因此我们可以设置诸如tcpNoDelay和的套接字选项keepAlive。请参阅apidocs [ChannelOption](http://netty.io/4.1/api/io/netty/channel/ChannelOption.html)和特定的[ChannelConfig](http://netty.io/4.1/api/io/netty/channel/ChannelConfig.html)实现,以获得关于支持ChannelOption的概述。
6. 你有没有注意到option()和childOption()?option()是为了NioServerSocketChannel接受传入连接。childOption()是为通过父类[ServerChannel](http://netty.io/4.1/api/io/netty/channel/ServerChannel.html)接收Channel,[NioServerSocketChannel](http://netty.io/4.1/api/io/netty/channel/socket/nio/NioServerSocketChannel.html)在这种情况下。
7. 我们现在准备好了。剩下的就是绑定到端口并启动服务器。在这里,我们绑定到机器8080中所有NIC(网卡)的端口。您现在可以bind()根据需要多次调用该方法(使用不同的绑定地址)。
### 编写一个Echo服务器 [回声协议](https://tools.ietf.org/html/rfc862)
让我们学习如何通过实现ECHO协议向客户端写入响应消息,在协议中发送任何接收到的数据。
1. 一个[ChannelHandlerContext](http://netty.io/4.1/api/io/netty/channel/ChannelHandlerContext.html)对象提供了各种操作,使您能够触发各种I/O事件和操作。在这里,我们调用write(Object)逐字写入收到的消息。请注意,我们没有发布收到的消息,与我们在DISCARD示例中所做的不同。这是因为当Netty写出来时,Netty会为你发布它。
2. ctx.write(Object)不会将信息写入线路中。它在内部被缓冲,然后通过ctx.flush()刷写到线路中去。或者,你可以使用简洁方式:ctx.writeAndFlush(msg)。
如果你再次运行telnet命令,你会看到服务器发回你发送给它的任何东西。
### 编写时间服务器 [时间协议](https://tools.ietf.org/html/rfc868)
它与前面的例子不同之处在于,它发送的消息包含一个32位整数,但未收到任何请求,并在发送消息后关闭连接。在本例中,您将学习如何构建和发送消息,并在完成时关闭连接。
因为我们将忽略任何接收到的数据,但只要连接建立就发送消息,我们不能使用此次的channelRead()方法。相反,我们应该重写该channelActive()方法。
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1. 如上所述,channelActive()当连接建立并准备好产生流量时,该方法将被调用。我们来写一个32位整数来表示此方法中的当前时间。
2. 要发送一条新消息,我们需要分配一个新的缓冲区,它将包含该消息。我们将写一个32位整数,因此我们需要一个ByteBuf容量至少为4个字节的数据。获取当前[ByteBufAllocator](http://netty.io/4.1/api/io/netty/buffer/ByteBufAllocator.html)通过ChannelHandlerContext.alloc()并分配一个新的缓冲区。
3. 像往常一样,我们编写构建的消息。
但是等一下,flip在哪里?java.nio.ByteBuffer.flip()在NIO发送消息之前调用。ByteBuf没有这样的方法,因为它有两个指针; 一个用于读取操作,另一个用于写入操作。当你写一些内容时,write index会增加,ByteBuf而阅读器索引(read index)不会改变。阅读器索引和写入器索引分别表示消息开始和结束的位置。
相反,NIO缓冲区不提供一个干净的方式来确定消息内容的起始和结束位置,而不调用flip方法。当您忘记flip缓冲区时,您将遇到麻烦,因为没有或者不正确的数据将被发送。这种错误在Netty中不会发生,因为我们对不同的操作类型有不同的指针。你会发现它让你的生活变得更容易,因为你已经习惯了它 - 一个没有flip的生活!
另一点要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法返回一个[ChannelFuture](http://netty.io/4.1/api/io/netty/channel/ChannelFuture.html)。A [ChannelFuture](http://netty.io/4.1/api/io/netty/channel/ChannelFuture.html)表示尚未发生的I/O操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在Netty中都是异步的。例如,即使在发送消息之前,以下代码可能会关闭连接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,您需要在[ChannelFuture](http://netty.io/4.1/api/io/netty/channel/ChannelFuture.html)完成之后调用close()方法,ChannelFuture是write()方法返回的,并在写入操作完成时通知其侦听器。请注意,close()也可能不会立即关闭连接,因为它也返回一个ChannelFuture。
4. 当写请求完成后,我们如何得到通知?这很简单为返回的ChannelFuture添加一个[ChannelFutureListener](http://netty.io/4.1/api/io/netty/channel/ChannelFutureListener.html)。在这里,我们创建了一个新的匿名[ChannelFutureListener](http://netty.io/4.1/api/io/netty/channel/ChannelFutureListener.html),当操作完成时它关闭Channel。或者,您可以使用预定义的侦听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);
要测试我们的时间服务器是否按预期工作,可以使用UNIX rdate命令:
```rdate -o -p ```
``````你在main()方法中指定的端口号,``````通常是localhost。
### 编写时间客户端
在Netty中,服务器和客户端之间最大和唯一的区别是使用不同的[Bootstrap](http://netty.io/4.1/api/io/netty/bootstrap/Bootstrap.html)和[Channel](http://netty.io/4.1/api/io/netty/channel/Channel.html)的实现。请看下面的代码:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
1. [Bootstrap](http://netty.io/4.1/api/io/netty/bootstrap/Bootstrap.html)与[ServerBootstrap](http://netty.io/4.1/api/io/netty/bootstrap/ServerBootstrap.html)类似,除了它用于非服务器信道,如客户端或无连接信道。
2. 如果您只指定一个[EventLoopGroup](http://netty.io/4.1/api/io/netty/channel/EventLoopGroup.html),它将被用作BOSS组和工作组。不过,BOSS worker并不用于客户端的。
3. 和[NioServerSocketChannel](http://netty.io/4.1/api/io/netty/channel/socket/nio/NioServerSocketChannel.html)相反,[NioSocketChannel](http://netty.io/4.1/api/io/netty/channel/socket/nio/NioSocketChannel.html)用于创建客户端的Channel。
4. 请注意,我们childOption()在这里不使用,不像ServerBootstrap,因为客户端的SocketChannel没有父级。
5. 我们应该调用connect()方法而不是bind()方法。
正如你所看到的,它与服务器端代码并没有什么不同。通道处理程序实现如何?它应该从服务器接收32位整数,将其转换为人类可读格式,打印转换后的时间,并关闭连接:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1. 在TCP/IP中,Netty将从对等体发送来的数据读入到ByteBuf。
它看起来非常简单,与服务器端的示例没有任何区别。然而,这个处理程序有时会拒绝工作IndexOutOfBoundsException。我们在下一节讨论为什么会发生这种情况。
### 关闭你的应用程序
关闭Netty应用程序通常就像关闭你所创建的所有EventLoopGroup一样简单通过shutdownGracefully()方法。它返回一个Future通知你什么时候EventLoopGroup已经完全终止,和属于该组的所有Channel都已关闭。