From 8ae797293ec9bf1901d90415de397e8785fb57c7 Mon Sep 17 00:00:00 2001 From: "ruizhi.zhao" Date: Tue, 8 Feb 2022 15:39:09 +0800 Subject: [PATCH 1/2] fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 50e146c..9e7f8a0 100644 --- a/README.md +++ b/README.md @@ -11,4 +11,4 @@ keytool -genkey -alias myClientAlias1 -keysize 2048 -validity 365 -keyalg RSA -d keytool -import -trustcacerts -alias mySrvAlias1 -file yqServer.cer -storepass cstorepass654 -keystore yqClient.jks keytool -export -alias myClientAlias1 -keystore yqClient.jks -storepass cstorepass654 -file yqClient.cer keytool -import -trustcacerts -alias myClientSelfAlias -file yqClient.cer -storepass sstorepass456 -keystore yqServer.jks -#add fix +#add fix model -- Gitee From 085b223cd163845743e19798ef5fa29638b73312 Mon Sep 17 00:00:00 2001 From: "ruizhi.zhao" Date: Wed, 9 Feb 2022 15:09:33 +0800 Subject: [PATCH 2/2] add heartbeat --- .../fx/client/netty/ConnectionListener.java | 42 +++++++++++++++ .../rz/fx/client/netty/ProtobufClient.java | 28 ++++++++-- .../netty/handler/HeartBeatHandler.java | 53 +++++++++++++++++++ .../java/com/rz/zim/netty/ProtobufServer.java | 3 +- .../netty/handler/HeartBeatServerHandler.java | 37 +++---------- 5 files changed, 129 insertions(+), 34 deletions(-) create mode 100644 fx/src/main/java/com/rz/fx/client/netty/ConnectionListener.java create mode 100644 fx/src/main/java/com/rz/fx/client/netty/handler/HeartBeatHandler.java diff --git a/fx/src/main/java/com/rz/fx/client/netty/ConnectionListener.java b/fx/src/main/java/com/rz/fx/client/netty/ConnectionListener.java new file mode 100644 index 0000000..662227e --- /dev/null +++ b/fx/src/main/java/com/rz/fx/client/netty/ConnectionListener.java @@ -0,0 +1,42 @@ +package com.rz.fx.client.netty; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoop; +import javafx.fxml.FXML; +import javafx.scene.control.TextField; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * @Description + * @Author ruizhi.zhao + * @Version V1.0.0 + * @Since 1.0 + * @Date 2022/2/9 + */ +@Slf4j +@Component +public class ConnectionListener implements ChannelFutureListener { + @Autowired + ProtobufClient protobufClient; + @FXML + private TextField server; + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (!channelFuture.isSuccess()) { + log.warn("-------------客户端重新连接-----------------"); + final EventLoop loop = channelFuture.channel().eventLoop(); + loop.schedule(new Runnable() { + @SneakyThrows + @Override + public void run() { + protobufClient.doConnect(server.getText()); + } + }, 1L, TimeUnit.SECONDS); + }} +} diff --git a/fx/src/main/java/com/rz/fx/client/netty/ProtobufClient.java b/fx/src/main/java/com/rz/fx/client/netty/ProtobufClient.java index d459b80..20a87be 100644 --- a/fx/src/main/java/com/rz/fx/client/netty/ProtobufClient.java +++ b/fx/src/main/java/com/rz/fx/client/netty/ProtobufClient.java @@ -9,6 +9,7 @@ package com.rz.fx.client.netty; */ import com.rz.fx.client.netty.handler.ChatMsgHandler; +import com.rz.fx.client.netty.handler.HeartBeatHandler; import common.codec.ProtobufDecoder; import common.codec.ProtobufEncoder; import common.ssl.SSLContextFactory; @@ -16,13 +17,18 @@ import config.SSLConfig; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -78,16 +84,16 @@ public class ProtobufClient implements InitializingBean, DisposableBean { sslEngine.setUseClientMode(true); sslEngine.setNeedClientAuth(SSLConfig.isNeedClientAuth()); ch.pipeline().addFirst("ssl", new SslHandler(sslEngine)); + ch.pipeline().addFirst("ping", new IdleStateHandler(30,60,90,TimeUnit.SECONDS)); ch.pipeline().addLast("decoder", new ProtobufDecoder()); ch.pipeline().addLast("encoder", new ProtobufEncoder()); + ch.pipeline().addLast("heartbeat", new HeartBeatHandler()); ch.pipeline().addLast("chatMsgHandler", chatMsgHandler); - //ch.pipeline().addLast("exceptionHandler",exceptionHandler); } }); - // Start the connection attempt. - // Start the client. String[] address = server.split(":"); - Channel ch = b.connect(new InetSocketAddress(address[0], Integer.parseInt(address[1]))).sync().channel(); + Channel ch = b.connect(new InetSocketAddress(address[0], Integer.parseInt(address[1]))).addListeners(new ConnectionListener() + ).sync().channel(); ProtobufClient.ch = ch; ProtobufClient.group = group; ProtobufClient.init.set(true); @@ -122,6 +128,20 @@ public class ProtobufClient implements InitializingBean, DisposableBean { return mb.build(); } + private ProtoMsg.Message buildHeartBeat(String content) { + ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder(); + ProtoMsg.MessageHeartBeat.Builder builder = ProtoMsg.MessageHeartBeat.newBuilder(); + builder.setUid(content); + builder.setSeq(1); + builder.setJson(""); + + ProtoMsg.MessageHeartBeat request = builder.build(); + mb.setHeartBeat(request); + mb.setType(ProtoMsg.HeadType.HEART_BEAT); + mb.setSessionId(""); + return mb.build(); + } + @Override public void destroy() throws Exception { diff --git a/fx/src/main/java/com/rz/fx/client/netty/handler/HeartBeatHandler.java b/fx/src/main/java/com/rz/fx/client/netty/handler/HeartBeatHandler.java new file mode 100644 index 0000000..d1c28ba --- /dev/null +++ b/fx/src/main/java/com/rz/fx/client/netty/handler/HeartBeatHandler.java @@ -0,0 +1,53 @@ +package com.rz.fx.client.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; +import proto.ProtoMsg; + +/** + * @Description + * @Author ruizhi.zhao + * @Version V1.0.0 + * @Since 1.0 + * @Date 2022/2/9 + */ +public class HeartBeatHandler extends ChannelInboundHandlerAdapter { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + // IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑. + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + switch (e.state()) { + case READER_IDLE: + break; + case WRITER_IDLE: + break; + case ALL_IDLE: + handleAllIdle(ctx); + break; + default: + break; + } + } + } + + private void handleAllIdle(ChannelHandlerContext ctx) { + ProtoMsg.Message heartBeat = buildHeartBeat("11111111", "ping"); + ctx.write(heartBeat); + } + + + private ProtoMsg.Message buildHeartBeat(String uid, String content) { + ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder(); + ProtoMsg.MessageHeartBeat.Builder builder = ProtoMsg.MessageHeartBeat.newBuilder(); + builder.setUid(content); + builder.setSeq(1); + builder.setJson("ping"); + ProtoMsg.MessageHeartBeat request = builder.build(); + mb.setHeartBeat(request); + mb.setType(ProtoMsg.HeadType.HEART_BEAT); + mb.setSessionId(content); + return mb.build(); + } +} diff --git a/server/src/main/java/com/rz/zim/netty/ProtobufServer.java b/server/src/main/java/com/rz/zim/netty/ProtobufServer.java index 6b6ccda..9ea24e9 100644 --- a/server/src/main/java/com/rz/zim/netty/ProtobufServer.java +++ b/server/src/main/java/com/rz/zim/netty/ProtobufServer.java @@ -4,6 +4,7 @@ import com.rz.zim.config.ProtobufProperties; import com.rz.zim.netty.engine.EpollProvider; import com.rz.zim.netty.engine.KQueueProvider; import com.rz.zim.netty.handler.ChatRedirectHandler; +import com.rz.zim.netty.handler.HeartBeatServerHandler; import com.rz.zim.netty.handler.LoginHandler; import common.ssl.SSLContextFactory; import common.codec.ProtobufDecoder; @@ -131,7 +132,7 @@ public class ProtobufServer implements DisposableBean { ch.pipeline().addLast("deCoder", new ProtobufDecoder()); ch.pipeline().addLast("enCoder", new ProtobufEncoder()); ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0)); - //ch.pipeline().addLast("heartBeat",new HeartBeatServerHandler()); + ch.pipeline().addLast("heartBeat",new HeartBeatServerHandler()); //ch.pipeline().addFirst("bizHandler",bizHandler); ch.pipeline().addLast("loginHandler", loginHandler); ch.pipeline().addLast("chatRedirect", chatRedirectHandler); diff --git a/server/src/main/java/com/rz/zim/netty/handler/HeartBeatServerHandler.java b/server/src/main/java/com/rz/zim/netty/handler/HeartBeatServerHandler.java index 2a1b92f..5466cf9 100644 --- a/server/src/main/java/com/rz/zim/netty/handler/HeartBeatServerHandler.java +++ b/server/src/main/java/com/rz/zim/netty/handler/HeartBeatServerHandler.java @@ -3,6 +3,7 @@ package com.rz.zim.netty.handler; import common.FutureTaskScheduler; import common.ServerConstants; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; @@ -18,41 +19,19 @@ import java.util.concurrent.TimeUnit; * @Date 2021/6/14 */ @Slf4j -public class HeartBeatServerHandler extends IdleStateHandler { +public class HeartBeatServerHandler extends SimpleChannelInboundHandler { - private static final int READ_IDLE_GAP = 150; - public HeartBeatServerHandler() { - super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS); + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { } - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - //判断消息实例 - if (null == msg || !(msg instanceof ProtoMsg.Message)) { - super.channelRead(ctx, msg); - return; - } - - ProtoMsg.Message pkg = (ProtoMsg.Message) msg; - //判断消息类型 - ProtoMsg.HeadType headType = pkg.getType(); - if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) { - //异步处理,将心跳包,直接回复给客户端 - FutureTaskScheduler.add(() -> { - if (ctx.channel().isActive()) { - ctx.writeAndFlush(msg); - } - }); + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if(evt instanceof IdleStateEvent){ } - super.channelRead(ctx, msg); - - } - - @Override - protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { - log.info(READ_IDLE_GAP + "秒内未读到数据,关闭连接 {}",ctx.channel().attr(ServerConstants.CHANNEL_NAME).get()); + super.userEventTriggered(ctx, evt); } } -- Gitee