From 6bc3726f8287ef1f1c85108ccfbae3275e27076f Mon Sep 17 00:00:00 2001 From: shizhili Date: Fri, 8 Dec 2023 16:32:05 +0800 Subject: [PATCH] backport Fix channel connect issue --- ...5-backport-Fix-channel-connect-issue.patch | 84 +++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 patch025-backport-Fix-channel-connect-issue.patch diff --git a/patch025-backport-Fix-channel-connect-issue.patch b/patch025-backport-Fix-channel-connect-issue.patch new file mode 100644 index 0000000..ce87daf --- /dev/null +++ b/patch025-backport-Fix-channel-connect-issue.patch @@ -0,0 +1,84 @@ +From d73b6013825db9124e39a37db67094e34b9c3d88 Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Mon, 16 Oct 2023 19:06:40 +0800 +Subject: [PATCH] [ISSUE #7330] Fix channel connect issue for goaway (#7467) + +* add waitChannelFuture for goaway + +* add body for retry channel +--- + .../remoting/netty/NettyRemotingClient.java | 41 +++++++++++++------ + 1 file changed, 28 insertions(+), 13 deletions(-) + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +index 4bc51bd83..340daee67 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +@@ -716,20 +716,25 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + } + + if (cw != null) { +- ChannelFuture channelFuture = cw.getChannelFuture(); +- if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { +- if (cw.isOK()) { +- LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); +- return cw.getChannel(); +- } else { +- LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString()); +- } ++ return waitChannelFuture(addr, cw); ++ } ++ ++ return null; ++ } ++ ++ private Channel waitChannelFuture(String addr, ChannelWrapper cw) { ++ ChannelFuture channelFuture = cw.getChannelFuture(); ++ if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { ++ if (cw.isOK()) { ++ LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); ++ return cw.getChannel(); + } else { +- LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), +- channelFuture.toString()); ++ LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString()); + } ++ } else { ++ LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), ++ channelFuture.toString()); + } +- + return null; + } + +@@ -818,8 +823,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); + stopwatch.stop(); + RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); +- Channel retryChannel = channelWrapper.getChannel(); +- if (channel != retryChannel) { ++ retryRequest.setBody(request.getBody()); ++ Channel retryChannel; ++ if (channelWrapper.isOK()) { ++ retryChannel = channelWrapper.getChannel(); ++ } else { ++ retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper); ++ } ++ if (retryChannel != null && channel != retryChannel) { + return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); + } + } +@@ -994,6 +1005,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + this.lastResponseTime = System.currentTimeMillis(); + } + ++ public String getChannelAddress() { ++ return channelAddress; ++ } ++ + public boolean reconnect() { + if (lock.writeLock().tryLock()) { + try { +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index f2fa66d..b0de49f 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 25 +Release: 26 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -34,6 +34,7 @@ Patch0021: patch021-backport-some-enhancements.patch Patch0022: patch022-backport-Support-KV-Storage-for-ConsumeQueue.patch Patch0023: patch023-backport-some-bugfixes.patch Patch0024: patch024-backport-some-format.patch +Patch0025: patch025-backport-Fix-channel-connect-issue.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -74,6 +75,9 @@ exit 0 %changelog +* Fri Dec 8 2023 ShiZhili - 5.1.3-26 +- backport Fix channel connect issue + * Fri Dec 8 2023 ShiZhili - 5.1.3-25 - backport some format and doc -- Gitee