# chatIM **Repository Path**: Boeing-777/chatIM ## Basic Information - **Project Name**: chatIM - **Description**: chatIM是一个通过netty和websocket实现IM项目,主要实现两人聊天功能 Vue+SpringBoot Netty + WebSocket 实现即时通讯 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-01-07 - **Last Updated**: 2024-03-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # chatIM是一个通过netty和websocket实现IM项目,主要实现两人聊天功能 ## 一、技术栈 ### 前端 - vue3 - Vant4 UI - axios 请求库 - Vue Router 路由管理 ### 后端 - Java 8 + Spring Boot 框架(spring boot 2.6.7) - Spring MVC + Mybatis Plus 框架 - Knife4j + Swagger 生成接口文档 - MySQL 8.x (数据存储) + Redis(缓存) - **Netty + WebSocket 实现即时通讯** **(主要学习)** - RabbitMQ 消息队列 - MinIO 实现文件存储 ## 二、 效果展示 ![登录页面](%E6%95%88%E6%9E%9C%E5%B1%95%E7%A4%BA%E5%9B%BE%E7%89%87%E6%88%AA%E5%B1%8F2024-03-18%2020.18.59.png) ![聊天功能](%E6%95%88%E6%9E%9C%E5%B1%95%E7%A4%BA%E5%9B%BE%E7%89%87%E6%88%AA%E5%B1%8F2024-03-18%2020.42.10.png) ![输入图片说明](%E6%95%88%E6%9E%9C%E5%B1%95%E7%A4%BA%E5%9B%BE%E7%89%87%E6%88%AA%E5%B1%8F2024-03-18%2020.42.25.png) ## 三、WebSocket模块 ### 服务端推送web方案 #### 1.短轮询 短轮询,就是web端不停地间隔一段时间向服务端发一个 HTTP 请求,如果有新消息,就会在某次请求返回。 ![输入图片说明](%E5%9B%BE%E7%89%8782e47e8a6bb399b32db496db7b948f0e.png) #####适用场景: (1)扫码登录:短时间内频繁查询二维码状态 (2)小OA系统:客户端使用量不大的情况下可以使用 缺点: (1) 大量无效请求:大量的无效请求,浪费服务器资源 (2) 服务端请求压力大:万人群聊频繁访问,上万并发服务扛不住。 #### 2.长轮询 长轮询和短轮询相比,一个最大的改进之处在于: (1) 短轮询模式下,服务端不管本轮有没有新消息产生,都会马上响应并返回。而长轮询模式当本次请求没有获取到新消息时,并不会马上结束返回,而是会在服务端“悬挂(hang)”,等待一段时间; (2)如果在等待的这段时间内有新消息产生,就能马上响应返回。 这也意味着web端的请求超时时长得设置长一些。 优点:相比短轮询模式 (1)大幅降低短轮询模式中客户端高频无用的轮询导致的网络开销和功耗开销 (2)降低了服务端处理请求的 QPS 缺点: (1)无效请求:长轮询在超时时间内没有获取到消息时,会结束返回,因此仍然没有完全解决客户端“无效”请求的问题。 (2)服务端压力大:服务端悬挂(hang)住请求,只是降低了入口请求的 QPS,并没有减少对后端资源轮询的压力。假如有 1000 个请求在等待消息,可能意味着有 1000 个线程在不断轮询消息存储资源。(轮询转移到了后端) #### 3.Websocket长连接 长轮询和短轮询都算作是服务端没法主动向客户端推送的一种曲线救国的方式,那最好的方案,就是能不能解决这个问题,因此诞生了websocket。 实现原理:客户端和服务器之间维持一个 TCP/IP 长连接,全双工通道。 ![输入图片说明](%E5%9B%BE%E7%89%87f4b91d0c1f1eb6c68733bcf55d1686d3.png) 基本弥补了上面的缺点,唯一的缺点就是实现起来可能会有些复杂,我们需要去管理链接 #### 4、为啥我选netty不用tomcat? 1. netty是nio基于事件驱动的多路复用框架,使用单线程或少量线程处理大量的并发连接。相比之下,Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/O)模型。所以这个点并不是重点。 2. Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理器,可以轻松处理复杂的协议和数据格式。Netty 的扩展性也非常好,可以根据需要添加自定义的组件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连接的状态。这些都是netty的优势。 #### 5、netty实现websocket ``` public void run() throws InterruptedException { // 服务器启动引导对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //30秒客户端没有向服务器发送心跳则关闭连接 pipeline.addLast(new IdleStateHandler(30, 0, 0)); // 因为使用http协议,所以需要使用http的编码器,解码器 pipeline.addLast(new HttpServerCodec()); // 以块方式写,添加 chunkedWriter 处理器 pipeline.addLast(new ChunkedWriteHandler()); /** * 说明: * 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来; * 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因 */ pipeline.addLast(new HttpObjectAggregator(8192)); //保存用户ip pipeline.addLast(new HttpHeadersHandler()); /** * 说明: * 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的; * 2. 可以看到 WebSocketFrame 下面有6个子类 * 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri * 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接; * 是通过一个状态码 101 来切换的 */ pipeline.addLast(new WebSocketServerProtocolHandler("/")); // 自定义handler ,处理业务逻辑 pipeline.addLast(new NettyWebSocketServerHandler()); } }); // 启动服务器,监听端口,阻塞直到启动成功 serverBootstrap.bind(WEB_SOCKET_PORT).sync(); System.out.println("启动成功"); } ``` 明白了websocket的升级过程,对netty的处理的就比较简单了。websocket初期是通过http请求,进行升级,建立双方的连接。 1.所以编解码器需要用到HttpServerCodec。 2.WebSocketServerProtocolHandler是netty进行websocket升级的处理器。在这期间会抹除http相关的信息,比如请求头啥的。如果想获取相关信息,需要在这之前获取。 3.HttpHeadersHandler是我们自己的处理器。赶在websocket升级之前,获取用户的ip地址,然后保存到channel的附件里。 4.NettyWebSocketServerHandler是我们的业务处理器,里面处理客户端的事件。 5.IdleStateHandler实现心跳检测。 #### 5、获取用户ip和连接token 我们获取用户ip的点有两处,注册和连接认证。两处都是需要从socket建立连接的时候,赶在协议升级前,保存用户的ip地址。 ``` public class HttpHeadersHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { FullHttpRequest request = (FullHttpRequest) msg; UrlBuilder urlBuilder = UrlBuilder.ofHttp(request.uri()); // 获取token参数 String token = Optional.ofNullable(urlBuilder.getQuery()).map(k->k.get("token")).map(CharSequence::toString).orElse(""); NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); // 获取请求路径 request.setUri(urlBuilder.getPath().toString()); HttpHeaders headers = request.headers(); String ip = headers.get("X-Real-IP"); if (StringUtils.isEmpty(ip)) {//如果没经过nginx,就直接获取远端地址 InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); ip = address.getAddress().getHostAddress(); } NettyUtil.setAttr(ctx.channel(), NettyUtil.IP, ip); ctx.pipeline().remove(this); ctx.fireChannelRead(request); }else { ctx.fireChannelRead(msg); } } } ``` 在协议升级前,消息体还是FullHttpRequest类型的,这个if逻辑只会走一次,以后都不会了。所以我们要趁着在升级前,获取请求头里的ip,保存到channel的附件中,以后需要的时候都能提取。 token的原理我们会在后面的握手认证介绍。 #### 6、心跳包 如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。 直接接入netty的现有组件new IdleStateHandler(30, 0, 0)可以实现30秒链接没有读请求,就主动关闭链接。我们的web前端需要保持每10s发送一个心跳包。 #### 7、请求处理 自己实现的处理器NettyWebSocketServerHandler接受websocket信息。根据消息类型进行路由处理。 目前请求对websocket依赖很低,只做这一件事 ``` @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { WSBaseReq wsBaseReq = JSONUtil.toBean(msg.text(), WSBaseReq.class); WSReqTypeEnum wsReqTypeEnum = WSReqTypeEnum.of(wsBaseReq.getType()); switch (wsReqTypeEnum) { case LOGIN: this.webSocketService.handleLoginReq(ctx.channel()); log.info("请求二维码 = " + msg.text()); break; case HEARTBEAT: break; default: log.info("未知类型"); } } ``` ## 四、IM顶层设计 ### 消息时序性 消息时序性,主要解决的就是消息展示的顺序问题。它为啥那么难?如果用户a给用户b同时发了三条消息 aa bb cc。而服务端接收到的消息是并发的,可能入库就是aa cc bb。 就产生了发送方顺序和接收方顺序不一致的情况。 ![输入图片说明](%E5%9B%BE%E7%89%87%E6%88%AA%E5%B1%8F2024-03-18%2022.15.32.png) ### 客户端排序 这时候可以让a发送的每条消息带上时间戳,服务存储a的消息时间戳。b的展示根据时间戳排序。 ![输入图片说明](%E5%9B%BE%E7%89%87image.png) 参考腾讯sdk的实现。 给消息设置一个本地的自增id,发送消息的时候带上。排序整体以服务器的时间为准,相同秒内的排序以自增id为准。 ![输入图片说明](%E5%9B%BE%E7%89%87image%20(1).png) 通常没有绝对的客户端排序,单聊场景可以用客户端seq。保证单位时间内多条消息的顺序性。 消息不仅要保证时序性,也要保证唯一性。通常用消息id一个字段满足两个需求。 ### 消息id 消息id被我们给予厚望,不仅要唯一,还要有序(递增) 保证唯一很简单,随便一个分布式id都能实现。 消息在整个IM系统都是唯一且递增的。一般对于单表来说主键就自然保证了递增。但是如果消息量大了,省不了分库分表,分库分表后的消息递增,通常采用分布式id。但是分布式id通常保证的是趋势递增,而不是单调递增。 ### 消息可靠ACK 消息时序性和可靠性是IM产品最重要的功能。如果你发出去的消息,都不能确保对方能不能收到。那谁还会用这个软件呢? IM的消息发送一般分为两个场景。 1、发送方发送消息给服务端,服务端入库成功返回ack 2、服务端推送消息给接收方,接收方返回ack ## 五、总结 **仅仅只是实现运行了被人的代码,并试图理解这个过程。 群聊这个过程还无法理解,希望以后有时间再研究**