diff --git a/ext/jt/jt-808-server-spring-boot-starter-reactive/src/main/java/io/github/hylexus/xtream/codec/ext/jt808/extensions/handler/Jt808InstructionServerTcpHandlerAdapter.java b/ext/jt/jt-808-server-spring-boot-starter-reactive/src/main/java/io/github/hylexus/xtream/codec/ext/jt808/extensions/handler/Jt808InstructionServerTcpHandlerAdapter.java index c75f8f101758ed80d3d9696eee45d2f747201e78..c328f04c25e0f5f8d6a0fd2434580abed6869c89 100644 --- a/ext/jt/jt-808-server-spring-boot-starter-reactive/src/main/java/io/github/hylexus/xtream/codec/ext/jt808/extensions/handler/Jt808InstructionServerTcpHandlerAdapter.java +++ b/ext/jt/jt-808-server-spring-boot-starter-reactive/src/main/java/io/github/hylexus/xtream/codec/ext/jt808/extensions/handler/Jt808InstructionServerTcpHandlerAdapter.java @@ -33,6 +33,8 @@ import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import java.net.InetSocketAddress; +import java.net.SocketException; +import java.util.function.Consumer; /** * @author hylexus @@ -60,11 +62,43 @@ public class Jt808InstructionServerTcpHandlerAdapter extends DefaultTcpXtreamNet log.error("Unexpected Exception", throwable); return Mono.empty(); }); + }).doOnError(SocketException.class, throwable -> { + this.doWithSessionManager(manager -> { + boolean closed = false; + Channel channel = inboundInfo.channel(); + if (channel == null) { + return; + } + try { + final String sessionId = sessionManager.sessionIdGenerator().generateTcpSessionId(channel); + closed = sessionManager.closeSessionById(sessionId, XtreamSessionEventListener.DefaultSessionCloseReason.CLOSED_BY_CLIENT); + } finally { + if (!closed) { + closeAndIgnoreException(channel, "Close channel because of [Socket Exception]: {}"); + } + } + }); }).onErrorResume(throwable -> { log.error("Unexpected Error", throwable); return Mono.empty(); }); } + protected void doWithSessionManager(Consumer> consumer) { + if (this.sessionManager != null) { + consumer.accept(this.sessionManager); + } else { + throw new IllegalStateException("No session manager found"); + } + } + + private static void closeAndIgnoreException(Channel channel, String message) { + try { + log.info(message, channel); + channel.close(); + } catch (Exception ignored) { + // ignored + } + } @Override protected Mono handleSingleRequest(NettyInbound nettyInbound, NettyOutbound nettyOutbound, ByteBuf payload, InboundInfo inboundInfo) {