# netty-study **Repository Path**: zhuhuijie/netty-study ## Basic Information - **Project Name**: netty-study - **Description**: Netty框架的学习 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 1 - **Created**: 2021-05-05 - **Last Updated**: 2025-03-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Netty 异步高性能通信框架 > 互联网行业:RPC框架大量引入Netty,Dubbo 中默认使用Netty做通信框架,大型网络游戏,地图服务器,在大数据领域(AVRO实现数据文件共享)默认采用Netty做跨界点通信,Netty Service 对Netty二次封装... ## 1. IO模型 ### 1.1 BIO 模型 特点:每建立一个连接就会创建一个线程,没有连接就会阻塞等待 ```java package com.zhj.test.bio; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author zhj */ public class BIOServer { public static void main(String[] args) throws IOException { // 线程池机制 // 思路 // 1. 创建一个线程 // 2. 如果有客户端连接,就创建一个线程,与之通信(单独写一个方法) ExecutorService executorService = Executors.newCachedThreadPool(); // 创建ServerSocket ServerSocket serverSocket = new ServerSocket(6666); System.out.println("服务器程序启动!"); while (true) { // 监听,等待客户端连接 System.out.println("等待连接!!!"); final Socket socket = serverSocket.accept(); System.out.println("连接一个客户端(socket)!"); // 创建一个线程与之通讯 executorService.execute(new Runnable() { @Override public void run() { // 可以与客户端通讯 handler(socket); } }); } } /** * 与客户端通讯 */ public static void handler(Socket socket) { byte[] bytes = new byte[1024]; try { InputStream inputStream = socket.getInputStream(); // 循环读取客户端读取的数据 while (true) { System.out.println("等待输入数据!!!"); int read = inputStream.read(bytes); if (read != -1) { System.out.println(Thread.currentThread().getName() + " : " + Thread.currentThread().getId()); System.out.println("接收:" + new String(bytes, 0, read)); } else { break; } } } catch (IOException e) { e.printStackTrace(); } finally { System.out.println("关闭与客户端的连接!"); try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } ``` ### 1.2 NIO > NIO 全称java non-blocking IO 是指JDK提供的新的API。从JDK1.4开始,Java提供了一系列改进输入输出的新特性,被统称NIO(New IO),是同步非阻塞的。 **三大核心部分:Channel(通道),Buffer缓存区),Selector(选择题)** NIO是面向缓冲区,或者面向块编程的,数据读到一个它稍后处理的缓冲区,需要时可在缓冲区前后移动,这就增加了它处理过程中的灵活性,使他可以提供非阻塞式的高伸缩性网络。 特点: - 非阻塞 不需要线程一直等待,有别的任务线程也可以去执行 - 一个线程可以处理多个连接,当大量请求到服务器,不需要每个连接开一个线程 HTTP2.0采用多路复用技术,同一个连接处理多个请求。 三大核心组件的关系 - 每个Channel都会对应一个Buffer - Selector对应一个线程,一个线程对应多个Channel连接 - 该图反应了三个Channel 注册到改Selector 程序 - 程序切换到那个Channel是由事件决定的,Event就是一个重要的概念 - Selector会根据不同的时间再各个通道上切换 - Buffer就是一个内容块,底层是与一个数组的 - 数据的读取写入是通过Buffer,这个与BIO有本质区别,BIO要么是输入流,要么是输出流,不能是双向的,NIO的Buffer是可以读也可以写的,需要flip方法切换 - Channel是双向的,可以返回底层操作系统的情况,Linux底层的操作系统就是双向的 #### 1.2.1 Buffer缓冲区的使用 - Capacity 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变 - Limit 表示缓冲区当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 - Position 位置,下一个要读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写操作做准备 - Mark 标记 ```java package com.zhj.test.bio; import java.nio.IntBuffer; /** * @author zhj */ public class BasicBuffer { public static void main(String[] args) { // 举例说明Buffer 的使用 // 创建一个Buffer IntBuffer intBuffer = IntBuffer.allocate(5); // 向Buffer 存数据 for (int i = 0; i < intBuffer.capacity(); i++) { intBuffer.put(i * 2); } // 从Buffer 读取数据 // 将Buffer转换,读写切换 /* public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } */ intBuffer.flip(); // 设置读取位置 intBuffer.position(2); // 设置读取结束位置 intBuffer.limit(4); while (intBuffer.hasRemaining()) { System.out.println(intBuffer.get()); } } } public class NIOByteBufferPutGet { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(64); buffer.putInt(100); buffer.putLong(9L); buffer.putChar('强'); buffer.putShort((short) 4); buffer.flip(); System.out.println(buffer.getInt()); System.out.println(buffer.getLong()); System.out.println(buffer.getChar()); System.out.println(buffer.getShort()); } } public class ReadOnlyBuffer { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(64); for (int i = 0; i < 64; i++) { buffer.put((byte) i); } buffer.flip(); ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); System.out.println(readOnlyBuffer.getClass()); while (readOnlyBuffer.hasRemaining()) { System.out.println(readOnlyBuffer.get()); } // 只读不能放数据 // readOnlyBuffer.put((byte) 1); } } /** * MappedByteBuffer 说明 * 1. 可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次 * @author zhj */ public class MappedByteBufferTest { public static void main(String[] args) throws Exception { File file1 = new File("E:\\data_file\\log1.txt"); File file2 = new File("E:\\data_file\\log2.txt"); RandomAccessFile randomAccessFile = new RandomAccessFile(file1, "rw"); FileChannel fileChannel = randomAccessFile.getChannel(); /** * 参数(1读写模式,2起始位置,3映射到内存大小) */ MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5); mappedByteBuffer.put(0, (byte) 'H'); mappedByteBuffer.put(3, (byte) '9'); randomAccessFile.close(); System.out.println("修改成功~"); } } ``` #### 1.2.2 Channel通道的使用 **基本介绍** 1)NIO的通道类似与流,但区别如下 - 通道可以同时进行读写,而流只能进行读或者写 - 通道可以实现异步读写数据 - 通道可以从缓冲区读取数据,也可以写数据到缓冲区 2)BIO中的stream 是单向的,如FileinputStream对象只能进行读取数据的操作,而NIO中的通道是双向的,可以读,也可以写 3)Channel 在NIO中是一个接口 4)常用的Channel类有 FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel 5)FileChannel用于文件的数据读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写 FileChannel 类 - read 将通道数据读取到缓冲区中 - write 把缓冲区的数据写到通道 - transferFrom() 从目标通道中复制数据到当前通道 - transferTo() 把数据从当前通道复制给目标通道 ```java // 案例 // 写 public class NIOFileChannel01 { public static void main(String[] args) throws IOException { String str = "hello world"; // 创建一个输出流 FileOutputStream fileOutputStream = new FileOutputStream("E:\\data_file\\log.txt"); // 通过fileOutputStream 获取对应fileChannel // 这个fileChannel 真实类型是 FileChannelImpl FileChannel fileChannel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 将str 放入 byteBuffer.put(str.getBytes()); // 读写切换 byteBuffer.flip(); // 写入Channel fileChannel.write(byteBuffer); fileOutputStream.close(); } } // 读 public class NIOFileChannel02 { public static void main(String[] args) throws IOException { File file = new File("E:\\data_file\\log.txt"); // 创建一个输出流 FileInputStream fileInputStream = new FileInputStream(file); // 通过fileOutputStream 获取对应fileChannel // 这个fileChannel 真实类型是 FileChannelImpl FileChannel fileChannel = fileInputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length()); // 将文件 读入缓冲区 fileChannel.read(byteBuffer); // 读写切换 // byteBuffer.flip(); System.out.println(new String(byteBuffer.array())); fileInputStream.close(); } } // 读写 } // 将buffer 写入到 fileChannel02 byteBuffer.flip(); fileChannel02.write(byteBuffer); } fileInputStream.close(); fileOutputStream.close(); } } // 文件拷贝 public class NIOFileChannel04 { public static void main(String[] args) throws IOException { File file1 = new File("E:\\data_file\\img01.jpg"); File file2 = new File("E:\\data_file\\img02.jpg"); // 创建一个输出流 FileInputStream fileInputStream = new FileInputStream(file1); FileChannel fileChannel01 = fileInputStream.getChannel(); // 创建一个输出流 FileOutputStream fileOutputStream = new FileOutputStream(file2); FileChannel fileChannel02 = fileOutputStream.getChannel(); fileChannel02.transferFrom(fileChannel01,0, fileChannel01.size()); fileInputStream.close(); fileOutputStream.close(); } } ``` **ScatteringAndGathering 分散聚集** ```java /** * Scattering 将数据写入到buffer,可采用buffer数组,依次写入 * Gathering 将数据读出到buffer * @author zhj */ public class ScatteringAndGatheringTest { public static void main(String[] args) throws IOException { // 使用ServerSocketChannel 和SocketChannel 网络 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(7000); // 绑定端口到Socket并启动 serverSocketChannel.socket().bind(inetSocketAddress); // 创建buffer数组 ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(3); // 等客户端连接 SocketChannel socketChannel = serverSocketChannel.accept(); int messageLength = 8; // 假定从客户端接收8个 while (true) { int byteRead = 0; while (byteRead < messageLength) { long read = socketChannel.read(byteBuffers); byteRead += read; // System.out.println("byteRead = " + byteRead); Arrays.asList(byteBuffers).stream().map( buffer -> "postion = " + buffer.position() + ", limit = " + buffer.limit()) .forEach(System.out::println); } // buffer 反转 Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip()); // 将数据显示到客户端 long byteWrite = 0; while (byteWrite < messageLength) { long write = socketChannel.write(byteBuffers); byteWrite += write; } Arrays.asList(byteBuffers).forEach(buffer -> buffer.clear()); System.out.println("byteRead = " + byteRead); System.out.println("byteWrite = " + byteWrite); System.out.println("messageLength = " + messageLength); } } } ``` #### 1.2.3 Selector 选择器的使用 **特点** 1. Netty 的IO线程NioEventLoop 聚合了Selector (选择器,也叫多路复用器),可以同时并发处理成百上千个客户端的连接。 2. 当线程从某客户端Socket通道进行读写时,若没有数据可用时,该线程可以进行其他任务。 3. 线程常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入和输出通道。 4. 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致线程挂起。 5. 一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程的模型,架构性能、弹性伸缩能力和可靠性都得到了极大的提升。 方法:open() 获得 - selector.select() 阻塞 - selector.select(1000) 阻塞1s,返回 - selector.wakeup() 唤醒 - selector.selectNow() 不阻塞 #### 1.2.4 NIO实现 NIO入门案例 ```java ator.next(); // 事件驱动 if (key.isAcceptable()) { System.out.println("有新的客户端连接"); SocketChannel socketChannel = serverSocketChannel.accept(); // 设置为非阻塞 socketChannel.configureBlocking(false); // 注册selector 关联Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("生成非阻塞socketChannel:" + socketChannel.hashCode()); } if (key.isReadable()) { // 通过key反向获取对应channel SocketChannel socketChannel = (SocketChannel) key.channel(); // 获取到该channel 关联的 Buffer ByteBuffer byteBuffer = (ByteBuffer) key.attachment(); socketChannel.read(byteBuffer); System.out.println("客户端:" + new String(byteBuffer.array())); } // 手动从集合移出当前key 防止多线程发生重复读取 iterator.remove(); } } } } public class NIOClient { public static void main(String[] args) throws Exception { // 1.得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); // 2.设置非阻塞 socketChannel.configureBlocking(false); InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); // 3.连接服务器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("客户端因为连接需要时间,客户端不会阻塞"); } } System.out.println("客户端连接服务器连接成功!"); // 4.设置发送内容 String str = "hello world!!!"; // 5.将数据放入缓冲区 wrap可以根据字节数组大小分配大小 ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); socketChannel.write(buffer); System.in.read(); } } ``` ### 1.3 NIO与BIO的比较 1. BIO是以流的方式处理的,而NIO以块的方式处理数据,块IO的效率比流IO的高很多 2. BIO是阻塞的,NIO是非阻塞的 3. BIO基于字节流和字符流进行操作,而NIO基于Channel通道和Buffer缓冲区进行操作,数据总是从通道读到缓冲区中,或者从缓冲区写入到通道中。Sellector选择器用于监听多个通道的事件比如连接请求,数据到达等,因此使用单个线程就可以监听多个客户端通道 ## 2 NIO群聊 服务端 ```java public class GroupChatServer { // 定义相关属性 private Selector selector; private ServerSocketChannel serverSocketChannel; private static final int PORT = 8888; //构造器 public GroupChatServer() { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } // 读取客户端信息 private void readData(SelectionKey key) { // 定义 SocketChannel socketChannel = null; try { socketChannel = (SocketChannel) key.channel(); // 创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = socketChannel.read(buffer); if (count > 0) { // 把缓冲区数据转字符串输出 String msg = new String(buffer.array()); // 输出该消息 System.out.println("From 客户端:" + msg); // 转发消息 sendInfoToOtherClients(msg, socketChannel); } } catch (Exception e) { try { System.out.println(socketChannel.getRemoteAddress() + "离线了"); key.cancel(); socketChannel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } private void sendInfoToOtherClients(String msg, SocketChannel self) { System.out.println("服务器转发消息中。。。"); // 遍历所有注册在selector 并排除自己 try { for (SelectionKey key : selector.keys()) { Channel targetChannel = key.channel(); if (targetChannel instanceof SocketChannel && targetChannel != self) { // 转型 SocketChannel socketChannel = (SocketChannel) targetChannel; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer); } } } catch (IOException e) { e.printStackTrace(); } } // 监听 public void listen() { try { while (true) { int count = selector.select(2000); if (count > 0) { Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress() + "上线了!"); } if (key.isReadable()) { // 处理读方法 readData(key); } iterator.remove(); } } else { // System.out.println("等待。。。"); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { GroupChatServer server = new GroupChatServer(); server.listen(); } } ``` 客户端 ```java public class GroupChatClient { private final String HOST = "127.0.0.1"; private final int PORT = 8888; private Selector selector; private SocketChannel socketChannel; private String username; public GroupChatClient() { try { selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + "准备就绪。。。"); } catch (Exception e) { e.printStackTrace(); } } public void sendInfo(String info) { info = username + " : " + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (Exception e) { e.printStackTrace(); } } public void readInfo() { try { int readChannels = selector.select(); if (readChannels > 0) { Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); sc.read(buffer); String msg = new String(buffer.array()); System.out.println(msg.trim()); } iterator.remove(); } } else { // System.out.println("没有可以用的通道。。。"); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { GroupChatClient client = new GroupChatClient(); new Thread() { public void run() { while (true) { client.readInfo(); try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } }.start(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); client.sendInfo(msg); } } } ``` ## 3 零拷贝 1. 从操作系统的角度来说,因为内核缓冲区之间,没有数据是重复的 2. 零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的CPU缓存伪共享以及无CPU校验和计算 sendFile优化 2.1版本不是 2.4版本是零拷贝 **mmap 和 sendfile 的区别** - mmap 适合小数据量读写,sendFile 适合大文件传输 - mmap 需要4次上下文切换,3次数据拷贝:sendFile 需要3次上下文切换,最少2次数据拷贝 - sendFile 可以利用DMA方式,减少CPU拷贝,mmap 则不能(必须从内核拷贝到Socket缓冲区) **案例** ```java // 传统IO服务器端 public class OldIOServer { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(7001); while (true) { Socket socket = serverSocket.accept(); DataInputStream dataInputStream = new DataInputStream(socket.getInputStream()); try { byte[] bytes = new byte[4096]; while (true) { int readCount = dataInputStream.read(bytes, 0, bytes.length); if (-1 == readCount) { break; } } } catch (IOException e) { e.printStackTrace(); } } } } // 传统IO服务器端 public class OldIOClient { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 7001); String fileName = ""; InputStream inputStream = new FileInputStream(fileName); DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream()); byte[] bytes = new byte[4096]; long readCount; long total = 0; long startTime = System.currentTimeMillis(); while ((readCount = inputStream.read(bytes)) > 0) { total += readCount; dataOutputStream.write(bytes); } System.out.println("发送总字节数: " + total + " 耗时:" + (System.currentTimeMillis()-startTime)); dataOutputStream.close(); socket.close(); inputStream.close(); } } // 新 public class NewIOServer { public static void main(String[] args) throws Exception { InetSocketAddress address = new InetSocketAddress(7002); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ServerSocket socket = serverSocketChannel.socket(); socket.bind(address); ByteBuffer byteBuffer = ByteBuffer.allocate(4096); while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); int readCount = 0; while (-1 != readCount) { try { readCount = socketChannel.read(byteBuffer); } catch (IOException e) { e.printStackTrace(); break; } byteBuffer.rewind(); // 倒带 position = 0 mark = -1(作废) } } } } public class NewIOClient { public static void main(String[] args) throws Exception { InetSocketAddress address = new InetSocketAddress("127.0.0.1",7002); SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(address); String fileName = ""; FileChannel channel = new FileInputStream(fileName).getChannel(); long startTime = System.currentTimeMillis(); // linux 下一个transferTo方法就可以完成传输 // windows 下调用只能发8m,就需要分段传输文件,要注意传输位置 需要循环计算 // 使用零拷贝 long transferCount = channel.transferTo(0, channel.size(), socketChannel); System.out.println("发送总字节数: " + transferCount + " 耗时:" + (System.currentTimeMillis()-startTime)); channel.close(); } } ``` ## 4 AIO 了解 1. JDK 7 引入Asynchronous I/O ,即AIO.在进行I/O编程中,常用到两种模式;Reactor 和 Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应处理 2. AIO即NIO 2.0 ,叫异步不阻塞IO.AIO引入异步通道的概念,采用了proactor模式,简化了程序的编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用 3. 目前AIO没有被广泛应用,Netty也是基于NIO,而不是AIO | | BIO | NIO | AIO | | -------- | -------- | ---------------------- | ---------- | | IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 | | 编程难度 | 简单 | 复杂 | 负载 | | 可靠性 | 差 | 好 | 好 | | 吞吐量 | 低 | 高 | 高 | ## 5 Netty 概述 > 异步的基于事件驱动的网络应用的框架,用于快速开发高性能,高可靠的网络IO程序 **原生NIO存在的问题** 1. NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel,SocketChannel、ByteBuffer等。 2. 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到 Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。一 3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。 4. JDK NIO的Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector空轮询,最终导致CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决。 **Netty 的优点** Netty对JDK自带的NIO的API进行了封装,解决了上述问题。 1. 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池 2. 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK 5 (Netty3.x或6 (Netty 4.x)就足够了。 3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。 4. 安全:完整的SSL/TLS和StartTLS支持。 5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更多的新功能会被加入 **Netty版本说明** 1. netty版本分为netty3.x和netty4.x、netty5.x 2. 因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本 3. 目前在官网可下载的版本netty3.x netty4.0.x和netty4.1.x4) 4. 在本套课程中,我们讲解Netty4.1.x版本 5. netty下载地址:https://bintray.com/netty/downloads/nettyl ## 6 Netty 线程模型 ### 6.1 线程模型 > 传统阻塞I/O服务模型 和 Reactor模式(单Reactor单线程、单Reactor多线程、主从Reactor多线程) > > Netty基于主从Reactor多线程模型 **传统IO模型** 缺点 1. 当并发数很大,就会创建大量的线程,占用很大系统资源 2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费 **Reactor(反应器模式,分发者模式,通知者模式)** 针对传统阻塞I/o服务模型的2个缺点,解决方案: 1. 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理 2. 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。 ![原理](https://img-blog.csdnimg.cn/20210501161039399.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) **Reactor模式中核心组成:** 1. Reactor: Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对I0O事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人; 2. Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。 **单Reactor单线程** ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210501162223305.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) **单Reactor多线程** ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210501162326923.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) **方案说明** 1. Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发 2. 如果建立连接请求,则右Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件 3. 如果不是连接请求,则由reactor分发调用连接对 应的handler来处理 4. handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务 5. worker线程池会分配独立线程完成真正的业务,并将结果返回给handler 6. handler收到响应后,通过send将结果返回给client **主从Reactor多线程** ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210503161318137.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) **方案优缺点说明:** 1. 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。 2. 优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。 3. 缺点:编程复杂度较高 结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持 **Netty模型** ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210503171111410.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) ### 6.2 Netty简单案例 ```pom io.netty netty-all 4.1.20.Final ``` ```java // 服务端 public class SimpleNettyServer { public static void main(String[] args) throws InterruptedException { // 创建两个线程组 BossGroup 和 WorkerGroup // BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理 // 两个线程组都是无限循环 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(32); // 创建服务器端启动对象,配置参数 try { ServerBootstrap serverBootstrap = new ServerBootstrap(); // 链式编程 serverBootstrap.group(bossGroup,workerGroup) // 设置线程组 .channel(NioServerSocketChannel.class) // 设置NIO通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数 .childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态 .childHandler(new ChannelInitializer() { // 创建通道初始化对象 // 给pipeline 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new SimpleNettyServerHandler()); } }); // 给workerGroup管道设置处理器 System.out.println("服务器初始化完毕!!!"); // 启动服务器,并绑定端口,并且同步处理 ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); // 对关闭通道进行监听 (异步模型) channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } // 服务端处理器 public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取数据 * @param ctx 上下文对象,含有管道pipeline,通道channel,地址 * @param msg 客户端发送的数据 默认是Object * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName()); System.out.println("server ctx = " + ctx); // 将msg转为一个ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址为:" + ctx.channel().remoteAddress()); // 比如 这有一个非常耗时的业务 需要异步执行 // 解决方案1 用户自定义普通任务 taskQueue ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2",CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("服务端发生异常了!!!"); } } }); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { // 15 秒 Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵3",CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("服务端发生异常了!!!"); } } }); // 解决方案2 用户自定义定时任务 scheduleTaskQueue ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { // 20 秒 Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4",CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("服务端发生异常了!!!"); } } },5, TimeUnit.SECONDS); // 解决方案3 非当前Reactor 线程调用Channel的各种方法 System.out.println("Go to..."); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // write + flush 将数据写入缓冲并刷新 // 发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 发生异常,关闭通道 ctx.close(); } } // 客户端 public class SimpleNettyClient { public static void main(String[] args) throws InterruptedException { // 客户端需要一个事件循环组 EventLoopGroup eventExecutors = new NioEventLoopGroup(); // 创建客户端启动对象 try { Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new SimpleNettyClientHandler()); } }); System.out.println("客户端启动完成!!!"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } } // 客户端处理器 public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 通道就绪就可以发送消息 System.out.println("client active ctx = " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务器!",CharsetUtil.UTF_8)); } /** * 通道有读取数据时,会触发 * @param ctx 上下文对象,含有管道pipeline,通道channel,地址 * @param msg 客户端发送的数据 默认是Object * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("client read ctx = " + ctx); // 将msg转为一个ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器端地址为:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 发生异常,关闭通道 ctx.close(); } } ``` ## 7 Netty异步模型 **基本介绍** 1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。 2. Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。 3. 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。 4. Netty的异步模型是建立在future和callback的之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制) ## 8 Netty入门实例 Http服务 ```java public class TestHttpServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new TestHttpServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class TestHttpServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 向管道加入处理器 // 得到管道 ChannelPipeline pipeline = socketChannel.pipeline(); // 加入一个netty 提供的httpServerCodec => [coder - decoder] // HttpServerCodec 说明 // 1.HttpServerCodec 是netty提供的处理http的编码解码器 pipeline.addLast("MyHttpServerCodeC", new HttpServerCodec()); // 2.增加自定义handler pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler()); } } /** * 说明 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject * @author zhj */ public class TestHttpServerHandler extends SimpleChannelInboundHandler { // 读取客户端数据 @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { HttpRequest httpRequest = (HttpRequest) msg; URI uri = new URI(httpRequest.uri()); if ("/favicon.ico".equals(uri.getPath())) { System.out.println("请求了favicon.ico资源 不做处理!!!"); return; } // 每次请求都会产生新的 System.out.println("pipeline hashcode" + ctx.pipeline().hashCode()); System.out.println("TestHttpServerHandler hashcode" + this.hashCode()); System.out.println("msg 类型 : " + msg.getClass()); System.out.println("客户端地址 : " + ctx.channel().remoteAddress()); // 回复信息给浏览器 [http协议] ByteBuf content = Unpooled.copiedBuffer("Hello,服务器", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); // 将构建好 response 返回 ctx.writeAndFlush(response); } } } ``` ## 9 Netty 核心模块 **Bootstrap、ServerBootstrap** 1. Bootstrap意思是引导,一个Netty应用通常由一个 Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中 Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类 2. 常见的方法有 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoopGrouppublic B channel(Class channelClass),该方法用来设置一个服务器端的通道实现public B option(ChannelOption option, Tvalue),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption, Tvalue),用来给接收到的通道添加配置 public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) WorkerGroup public ServerBootstrap Handler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) BossGroup public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort),该方法用于客户端,用来连接服务器端 **Future、ChannelFuture** 1. Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和—ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件 2. 常见的方法有Channel channel(),返回当前正在进行IO操作的通道ChannelFutyre sync(),等待异步操作执行完毕 **Channel** 1. Netty 网络通信的组件,能够用于执行网络I/o操作。 2. 通过Channel可获得当前网络连接的通道的状态 3. 通过Channel可获得网络连接的配置参数(例如接收缓冲区大小) 4. Channel提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成 5. 调用立即返回一个 ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方 6. 支持关联I/O操作与对应的处理程序 7. 不同协议、不同的阻塞类型的连接都有不同的 Channel类型与之对应 常用的Channel类型: - NioSocketChannel,异步的客户端 TCP Socket 连接。 - NioServerSocketChannel,异步的服务器端TCP Socket连接。 - NioDatagramChannel,异步的UDP连接。 - NioSctpChannel,异步的客户端 Sctp连接。 - NioSctpServerChannel,异步的Sctp服务器端连接,这些通道涵盖了UDP和TCP网络IO以及文件IO。 **Selector** 1. Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的 Channel事件。 2. 当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select) 这些注册的Channel是否有己就绪的I/O事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个Channel ## 10 Netty 群聊 ```java // 服务端 public class GroupChatServer { private int port; public GroupChatServer(int port) { this.port = port; } public void run() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 加入解码器 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new GroupChatServerHandler()); } }); ChannelFuture cf = serverBootstrap.bind(port).sync(); System.out.println("【服务器】启动完成~"); cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GroupChatServer(7000).run(); } } public class GroupChatServerHandler extends SimpleChannelInboundHandler { // 定义一个channelGroup 管理所有的channel // GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单列 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 私聊 // private static Map channelMap = new HashMap<>(); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 连接建立,第一个被执行 将 channel 加入 channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // 将客户加入聊天的信息推送其他客户端 // 不需要自己遍历 channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 加入聊天室。" + sdf.format(new Date()) + "\n"); channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // 可以自动执行 channelGroup.remove(channel); channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 离开聊天室。" + sdf.format(new Date()) + "\n"); } // channel 处于活动状态 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 上线了。\n"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 跑路了。\n"); } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch -> { if (channel != ch) { ch.writeAndFlush("【客户端】" + channel.remoteAddress() + " : " + s + "\n"); } else { // 回显 ch.writeAndFlush("【我】" + " : " + s + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("【服务端】 开小差了~"); ctx.close(); } } // 客户端 public class GroupChatClient { private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; } public void run() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(bossGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 加入解码器 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture cf = bootstrap.connect(host, port).sync(); Channel channel = cf.channel(); System.out.println("【客户端】" + channel.localAddress() + " 启动完成~"); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); channel.writeAndFlush(msg + "\r\n"); } } finally { bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GroupChatClient("127.0.0.1", 7000).run(); } } public class GroupChatClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(s.trim()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("【客户端】 开小差了~"); ctx.close(); } } ``` ## 11 Netty 实现Websocket **改变http协议的状态码为101,升级成为ws协议实现全双工长连接通信** 服务端 ```java package com.zhj.test.netty.websocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * @author zhj */ public class WebSocketServer { public static void main(String[] args) throws InterruptedException { // 创建两个线程组 BossGroup 和 WorkerGroup // BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理 // 两个线程组都是无限循环 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建服务器端启动对象,配置参数 try { ServerBootstrap serverBootstrap = new ServerBootstrap(); // 链式编程 serverBootstrap.group(bossGroup,workerGroup) // 设置线程组 .channel(NioServerSocketChannel.class) // 设置NIO通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数 .childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { // 创建通道初始化对象 // 给pipeline 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 基于Http协议的,添加Http编解码器 pipeline.addLast(new HttpServerCodec()); // 以块的方式写,添加ChunkedWriteHandler pipeline.addLast(new ChunkedWriteHandler()); // http数据在传输是分段的 HttpObjectAggregator 可以将多段聚合 pipeline.addLast(new HttpObjectAggregator(8192)); // websocket 数据以帧(frame)形式传递 // websocket 有六个子类 // 浏览器请求时 ws://localhost:7000/xxx 表示请求的uri // websocket 核心功能是将http协议升级为ws 保持长连接 pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); // 自定义handler ,处理业务逻辑 pipeline.addLast(new WebSocketHandler()); } }); // 给workerGroup管道设置处理器 System.out.println("服务器初始化完毕!!!"); // 启动服务器,并绑定端口,并且同步处理 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); // 对关闭通道进行监听 (异步模型) channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } ``` ```java package com.zhj.test.netty.websocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; /** * 这里TextWebSocketFrame 类型,表示一个文本帧(frame) * @author zhj */ public class WebSocketHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服务器接收消息:" + msg.text()); // 回复消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器响应时间:" + LocalDateTime.now() + msg.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // id 表示唯一的一个值 System.out.println("handler added 被调用 " + ctx.channel().id().asLongText()); System.out.println("handler added 被调用 " + ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handler removed 被调用 " + ctx.channel().id().asLongText()); System.out.println("handler removed 被调用 " + ctx.channel().id().asShortText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生 " + cause.getMessage()); ctx.close(); } } ``` 客户端 ```html websocket
``` ## 12 Protobuf 序列化数据 **Protobuf基本介绍** 1. Protobuf是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC[远程过程调用remote procedure call ]数据交换格式。 **自前很多公司http+json ---> tcp+protobuf** 2. 参考文档:https://developers.google.com/protocol-buffers/docs/proto 语言指南 3. Protobuf是以 message的方式来管理数据的. 4. 支持跨平台、跨语言,即客户端和服务器端可以是不同的语言编写的(支持目前绝 大多数语言,例如C++、C#、Java、python等) 5. 高性能,高可靠性 6. 使用propobuf编译能自动生成代码,Protobuf是将类定义使用.proto文件进行描述。说明,在idea中编写.proto文件时,会自动提示是否下载.ptotot编写插件.可以让语法高亮。 7. 然后通过protoc.exe编译器根据.proto自动生成.java文件 8. protobuf使用 user.proto -> protoc.exe ->user.java 编码 传递二进制 服务端解码 ```doc protoc.exe --java_out=. Student.proto ``` ```protobuf syntax = "proto3"; // 版本 option java_outer_classname = "StudentPOJO"; // 生成的外部类名 同时也是文件名 // protobuf 使用message 管理数据 message Student { // 内部类 真正发送的POJO对象 1表示属性序号 int32 id = 1; string name = 2; } syntax = "proto3"; // 版本 option optimize_for = SPEED; // 加快解析 option java_package = "com.zhj.test.netty.codec2"; // 指生成到哪个包下 option java_outer_classname = "DataInfo"; // 生成的外部类名 同时也是文件名 // protobuf 使用message 管理数据 其他的message message MyMessage { // 定义一个枚举 enum DataType { StudentType = 0; // 在proto3 要求enum 编号从0开始 workerType = 1; } // 用data_type来标识传的是哪一个枚举类型 DataType data_type = 1; // 表示每次枚举类型只能出现其中的一个,节省空间 oneof dataBody { Student student = 2; Worker worker = 3; } } message Student { // 内部类 真正发送的POJO对象 1表示属性序号 int32 id = 1; string name = 2; } message Worker { string name = 1; int32 age = 2; } ``` ## 13 Netty 编解码 > 入站先解码,再执行自己的业务处理器,出站先执行自己的业务处理器,再编码 解码器-ReplayingDecoder 1. public abstract class ReplayingDecoder extends ByteToMessageDecoder 2. ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数T指定了用户状态管理的类型,其中Void代表不需要状态管理 3. ReplayingDecoder使用方便,但它也有一些局限性:并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedoperationException。ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢 其它解码器 1. LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(In或者Irln)作为分隔符来解析数据。 2. DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。 3. HttpObjectDecoder:一个HTTP数据的解码器 4. LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。 ## 14 TCP粘包和拆包基本介绍 1. TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有—一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为**面向流的通信是无消息保护边界的** 2. 由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210505134858828.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) 传输对象(协议包) ```java package com.zhj.test.netty.protocoltcp; import java.util.Arrays; /** * 协议包 * @author zhj */ public class MessageProtocol { private int len; private byte[] content; public MessageProtocol() { } public MessageProtocol(int len, byte[] content) { this.len = len; this.content = content; } public int getLen() { return len; } public void setLen(int len) { this.len = len; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } @Override public String toString() { return "MessageProtocol{" + "len=" + len + ", content=" + Arrays.toString(content) + '}'; } } ``` 编解码器 ```java package com.zhj.test.netty.protocoltcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; /** * @author zhj */ public class MyMessageDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { System.out.println("MyMessageDecoder decode 方法被调用"); int length = byteBuf.readInt(); byte[] content = new byte[length]; byteBuf.readBytes(content); MessageProtocol messageProtocol = new MessageProtocol(length, content); list.add(messageProtocol); } } package com.zhj.test.netty.protocoltcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * @author zhj */ public class MyMessageEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception { System.out.println("MyMessageEncoder encoder方法被调用"); byteBuf.writeInt(messageProtocol.getLen()); byteBuf.writeBytes(messageProtocol.getContent()); } } ``` 服务端 ```java package com.zhj.test.netty.protocoltcp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author zhj */ public class TcpClient { public static void main(String[] args) throws InterruptedException { // 客户端需要一个事件循环组 EventLoopGroup eventExecutors = new NioEventLoopGroup(); // 创建客户端启动对象 try { Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new TcpClientInitializer()); System.out.println("客户端启动完成!!!"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } } package com.zhj.test.netty.protocoltcp; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @author zhj */ public class TcpServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 向管道加入处理器 // 得到管道 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new MyMessageDecoder()); pipeline.addLast(new MyMessageEncoder()); // 2.增加自定义handler pipeline.addLast("MyTcpServerHandler", new TcpServerHandler()); } } package com.zhj.test.netty.protocoltcp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import java.nio.charset.StandardCharsets; import java.util.UUID; /** * 说明 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject * @author zhj */ public class TcpServerHandler extends SimpleChannelInboundHandler { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { // System.out.println(new String(msg.array(), CharsetUtil.UTF_8)); System.out.println("服务端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8)); System.out.println("服务端接收的数据长度 " + msg.getLen()); System.out.println("服务器接收到的消息量:" + (++this.count)); String responseContent = UUID.randomUUID().toString(); int len = responseContent.getBytes(StandardCharsets.UTF_8).length; MessageProtocol messageProtocol = new MessageProtocol(len, responseContent.getBytes(StandardCharsets.UTF_8)); ctx.writeAndFlush(messageProtocol); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务器异常:" + cause.getMessage()); ctx.close(); } } ``` 客户端 ```java package com.zhj.test.netty.protocoltcp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author zhj */ public class TcpClient { public static void main(String[] args) throws InterruptedException { // 客户端需要一个事件循环组 EventLoopGroup eventExecutors = new NioEventLoopGroup(); // 创建客户端启动对象 try { Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new TcpClientInitializer()); System.out.println("客户端启动完成!!!"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } } package com.zhj.test.netty.protocoltcp; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @author zhj */ public class TcpClientInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 向管道加入处理器 // 得到管道 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new MyMessageEncoder()); pipeline.addLast(new MyMessageDecoder()); // 2.增加自定义handler pipeline.addLast("MyTestTcpClientHandler", new TcpClientHandler()); } } package com.zhj.test.netty.protocoltcp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import java.nio.charset.StandardCharsets; /** * 说明 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject * @author zhj */ public class TcpClientHandler extends SimpleChannelInboundHandler { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 使用客户端发送10条数据 hello server for (int i = 0; i < 60; i++) { String mes = "今天真带劲。。。"; byte[] content = mes.getBytes(StandardCharsets.UTF_8); int length = mes.getBytes(StandardCharsets.UTF_8).length; MessageProtocol messageProtocol = new MessageProtocol(length, content); ctx.writeAndFlush(messageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { System.out.println("客户端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8)); System.out.println("客户端接收的数据长度 " + msg.getLen()); System.out.println("客户端接收到的消息量:" + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常信息:" + cause.getMessage()); ctx.close(); } } ``` ## 15 RPC 调用流程 RPC 基本介绍 1. RPC (Remote Procedure Call) —远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程 2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样 3. 常见的RPC框架:Dubbo、google 的 gRPC、Go语言的rpxc、Apache的thrift,Spring旗下的Spring Cloud ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210505171957266.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poajUyNjY2,size_16,color_FFFFFF,t_70) **自己实现dubbo RPC(基于Netty)需求说明** 1. dubbo底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框 2. 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.x **设计说明** 1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。 2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。 3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据 代码实现: