diff --git a/patch011-backport-optimize-config.patch b/patch011-backport-optimize-config.patch new file mode 100644 index 0000000000000000000000000000000000000000..fc8e2eceb992e95246d55319e080bd08fb22aae5 --- /dev/null +++ b/patch011-backport-optimize-config.patch @@ -0,0 +1,1390 @@ +From 50d1050437ed8748f86ee50261b50a1e1f63162e Mon Sep 17 00:00:00 2001 +From: Jixiang Jin +Date: Wed, 16 Aug 2023 21:15:00 +0800 +Subject: [PATCH 1/7] To config the cardinalityLimit for openTelemetry metrics + exporting and fix logging config for metrics (#7196) + +--- + WORKSPACE | 14 +++--- + .../broker/metrics/BrokerMetricsManager.java | 47 ++++++++++++++----- + .../broker/metrics/PopMetricsManager.java | 11 +++-- + .../src/main/resources/rmq.broker.logback.xml | 17 ++++--- + .../apache/rocketmq/common/BrokerConfig.java | 9 ++++ + .../metrics/ControllerMetricsManager.java | 6 +-- + pom.xml | 4 +- + .../metrics/RemotingMetricsManager.java | 10 ++-- + .../rocketmq/store/DefaultMessageStore.java | 24 +++++----- + .../apache/rocketmq/store/MessageStore.java | 6 +-- + .../metrics/DefaultStoreMetricsManager.java | 4 +- + .../plugin/AbstractPluginMessageStore.java | 6 +-- + .../tieredstore/TieredMessageStore.java | 6 +-- + .../metrics/TieredStoreMetricsManager.java | 23 +++++---- + 14 files changed, 110 insertions(+), 77 deletions(-) + +diff --git a/WORKSPACE b/WORKSPACE +index a8a0aafe9..3126f2d1d 100644 +--- a/WORKSPACE ++++ b/WORKSPACE +@@ -88,14 +88,14 @@ maven_install( + "io.grpc:grpc-api:1.47.0", + "io.grpc:grpc-testing:1.47.0", + "org.springframework:spring-core:5.3.26", +- "io.opentelemetry:opentelemetry-exporter-otlp:1.19.0", +- "io.opentelemetry:opentelemetry-exporter-prometheus:1.19.0-alpha", +- "io.opentelemetry:opentelemetry-exporter-logging:1.19.0", +- "io.opentelemetry:opentelemetry-sdk:1.19.0", ++ "io.opentelemetry:opentelemetry-exporter-otlp:1.29.0", ++ "io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha", ++ "io.opentelemetry:opentelemetry-exporter-logging:1.29.0", ++ "io.opentelemetry:opentelemetry-sdk:1.29.0", + "com.squareup.okio:okio-jvm:3.0.0", +- "io.opentelemetry:opentelemetry-api:1.19.0", +- "io.opentelemetry:opentelemetry-sdk-metrics:1.19.0", +- "io.opentelemetry:opentelemetry-sdk-common:1.19.0", ++ "io.opentelemetry:opentelemetry-api:1.29.0", ++ "io.opentelemetry:opentelemetry-sdk-metrics:1.29.0", ++ "io.opentelemetry:opentelemetry-sdk-common:1.29.0", + "io.github.aliyunmq:rocketmq-slf4j-api:1.0.0", + "io.github.aliyunmq:rocketmq-logback-classic:1.0.0", + "org.slf4j:jul-to-slf4j:2.0.6", +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +index f0b76107e..6af5afc14 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +@@ -34,8 +34,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType; + import io.opentelemetry.sdk.metrics.SdkMeterProvider; + import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; + import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import io.opentelemetry.sdk.metrics.data.AggregationTemporality; + import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; ++import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; + import io.opentelemetry.sdk.resources.Resource; + import java.util.ArrayList; + import java.util.Arrays; +@@ -361,22 +363,45 @@ public class BrokerMetricsManager { + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_MESSAGE_SIZE) + .build(); +- View messageSizeView = View.builder() +- .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)) +- .build(); +- providerBuilder.registerView(messageSizeSelector, messageSizeView); +- +- for (Pair selectorViewPair : RemotingMetricsManager.getMetricsView()) { +- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); ++ ViewBuilder messageSizeViewBuilder = View.builder() ++ .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)); ++ // To config the cardinalityLimit for openTelemetry metrics exporting. ++ SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); ++ providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build()); ++ ++ for (Pair selectorViewPair : RemotingMetricsManager.getMetricsView()) { ++ ViewBuilder viewBuilder = selectorViewPair.getObject2(); ++ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); ++ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build()); + } + +- for (Pair selectorViewPair : messageStore.getMetricsView()) { +- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); ++ for (Pair selectorViewPair : messageStore.getMetricsView()) { ++ ViewBuilder viewBuilder = selectorViewPair.getObject2(); ++ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); ++ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build()); + } + +- for (Pair selectorViewPair : PopMetricsManager.getMetricsView()) { +- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); ++ for (Pair selectorViewPair : PopMetricsManager.getMetricsView()) { ++ ViewBuilder viewBuilder = selectorViewPair.getObject2(); ++ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); ++ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build()); + } ++ ++ // default view builder for all counter. ++ InstrumentSelector defaultCounterSelector = InstrumentSelector.builder() ++ .setType(InstrumentType.COUNTER) ++ .build(); ++ ViewBuilder defaultCounterViewBuilder = View.builder().setDescription("default view for counter."); ++ SdkMeterProviderUtil.setCardinalityLimit(defaultCounterViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); ++ providerBuilder.registerView(defaultCounterSelector, defaultCounterViewBuilder.build()); ++ ++ //default view builder for all observable gauge. ++ InstrumentSelector defaultGaugeSelector = InstrumentSelector.builder() ++ .setType(InstrumentType.OBSERVABLE_GAUGE) ++ .build(); ++ ViewBuilder defaultGaugeViewBuilder = View.builder().setDescription("default view for gauge."); ++ SdkMeterProviderUtil.setCardinalityLimit(defaultGaugeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit()); ++ providerBuilder.registerView(defaultGaugeSelector, defaultGaugeViewBuilder.build()); + } + + private void initStatsMetrics() { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java +index 463371d7e..2de220da1 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java +@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation; + import io.opentelemetry.sdk.metrics.InstrumentSelector; + import io.opentelemetry.sdk.metrics.InstrumentType; + import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.time.Duration; + import java.util.Arrays; + import java.util.List; +@@ -63,7 +64,7 @@ public class PopMetricsManager { + private static LongCounter popReviveGetTotal = new NopLongCounter(); + private static LongCounter popReviveRetryMessageTotal = new NopLongCounter(); + +- public static List> getMetricsView() { ++ public static List> getMetricsView() { + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(10).toMillis(), +@@ -76,10 +77,10 @@ public class PopMetricsManager { + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME) + .build(); +- View popBufferScanTimeConsumeView = View.builder() +- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) +- .build(); +- return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView)); ++ ViewBuilder popBufferScanTimeConsumeViewBuilder = View.builder() ++ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)); ++ ++ return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeViewBuilder)); + } + + public static void initMetrics(Meter meter, BrokerController brokerController, +diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml +index 7d49f6664..3c51e59d4 100644 +--- a/broker/src/main/resources/rmq.broker.logback.xml ++++ b/broker/src/main/resources/rmq.broker.logback.xml +@@ -559,27 +559,27 @@ + + + +- ++ + + brokerContainerLogDir + ${file.separator} + + +- + +- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metric.log ++ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metrics.log + + true + + +- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metric.%i.log.gz ++ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metrics.%i.log.gz + + 1 +- 10 ++ 3 + + +- 500MB ++ 512MB + + + %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n +@@ -588,6 +588,9 @@ + + + ++ ++ ++ + + + +@@ -670,7 +673,7 @@ + + + +- ++ + + + +diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +index 99a5db5ad..45d26b29c 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +@@ -350,6 +350,7 @@ public class BrokerConfig extends BrokerIdentity { + + private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE; + ++ private int metricsOtelCardinalityLimit = 50 * 1000; + private String metricsGrpcExporterTarget = ""; + private String metricsGrpcExporterHeader = ""; + private long metricGrpcExporterTimeOutInMills = 3 * 1000; +@@ -1531,6 +1532,14 @@ public class BrokerConfig extends BrokerIdentity { + this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType); + } + ++ public int getMetricsOtelCardinalityLimit() { ++ return metricsOtelCardinalityLimit; ++ } ++ ++ public void setMetricsOtelCardinalityLimit(int metricsOtelCardinalityLimit) { ++ this.metricsOtelCardinalityLimit = metricsOtelCardinalityLimit; ++ } ++ + public String getMetricsGrpcExporterTarget() { + return metricsGrpcExporterTarget; + } +diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java +index 9b30a3b43..650740bcc 100644 +--- a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java ++++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java +@@ -203,7 +203,7 @@ public class ControllerMetricsManager { + 10 * s + ); + +- View latecyView = View.builder() ++ View latencyView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets)) + .build(); + +@@ -217,8 +217,8 @@ public class ControllerMetricsManager { + .setName(HISTOGRAM_DLEDGER_OP_LATENCY) + .build(); + +- providerBuilder.registerView(requestLatencySelector, latecyView); +- providerBuilder.registerView(dLedgerOpLatencySelector, latecyView); ++ providerBuilder.registerView(requestLatencySelector, latencyView); ++ providerBuilder.registerView(dLedgerOpLatencySelector, latencyView); + } + + private void initMetric(Meter meter) { +diff --git a/pom.xml b/pom.xml +index 3a08d75f2..9f0b3eb96 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -133,8 +133,8 @@ + 2.9.3 + 5.3.27 + 3.0.0 +- 1.26.0 +- 1.26.0-alpha ++ 1.29.0 ++ 1.29.0-alpha + 2.0.6 + 2.20.29 + 1.0.3 +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java +index 34136f94f..2e0d70856 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java +@@ -26,6 +26,7 @@ import io.opentelemetry.sdk.metrics.Aggregation; + import io.opentelemetry.sdk.metrics.InstrumentSelector; + import io.opentelemetry.sdk.metrics.InstrumentType; + import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.time.Duration; + import java.util.Arrays; + import java.util.List; +@@ -61,7 +62,7 @@ public class RemotingMetricsManager { + .build(); + } + +- public static List> getMetricsView() { ++ public static List> getMetricsView() { + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(3).toMillis(), +@@ -77,10 +78,9 @@ public class RemotingMetricsManager { + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_RPC_LATENCY) + .build(); +- View view = View.builder() +- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) +- .build(); +- return Lists.newArrayList(new Pair<>(selector, view)); ++ ViewBuilder viewBuilder = View.builder() ++ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)); ++ return Lists.newArrayList(new Pair<>(selector, viewBuilder)); + } + + public static String getWriteAndFlushResult(Future future) { +diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +index 25e4a166f..6115ead59 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +@@ -22,7 +22,7 @@ import io.openmessaging.storage.dledger.entry.DLedgerEntry; + import io.opentelemetry.api.common.AttributesBuilder; + import io.opentelemetry.api.metrics.Meter; + import io.opentelemetry.sdk.metrics.InstrumentSelector; +-import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.io.File; + import java.io.IOException; + import java.io.RandomAccessFile; +@@ -42,23 +42,24 @@ import java.util.Map; + import java.util.Objects; + import java.util.Optional; + import java.util.Set; +-import java.util.concurrent.LinkedBlockingQueue; +-import java.util.concurrent.TimeUnit; +-import java.util.concurrent.TimeoutException; +-import java.util.concurrent.ExecutionException; +-import java.util.concurrent.Executors; +-import java.util.concurrent.ExecutorService; +-import java.util.concurrent.ThreadPoolExecutor; +-import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.CompletableFuture; +-import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentLinkedQueue; ++import java.util.concurrent.ConcurrentMap; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.LinkedBlockingQueue; ++import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ThreadPoolExecutor; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.TimeoutException; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; + import java.util.function.Supplier; + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.AbstractBrokerRunnable; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.BrokerConfig; + import org.apache.rocketmq.common.BrokerIdentity; + import org.apache.rocketmq.common.MixAll; +@@ -82,7 +83,6 @@ import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.CleanupPolicyUtils; + import org.apache.rocketmq.common.utils.QueueTypeUtils; + import org.apache.rocketmq.common.utils.ServiceProvider; +-import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +@@ -3268,7 +3268,7 @@ public class DefaultMessageStore implements MessageStore { + } + + @Override +- public List> getMetricsView() { ++ public List> getMetricsView() { + return DefaultStoreMetricsManager.getMetricsView(); + } + +diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +index 31bbb907f..989cbbe31 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +@@ -19,8 +19,7 @@ package org.apache.rocketmq.store; + import io.opentelemetry.api.common.AttributesBuilder; + import io.opentelemetry.api.metrics.Meter; + import io.opentelemetry.sdk.metrics.InstrumentSelector; +-import io.opentelemetry.sdk.metrics.View; +- ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.nio.ByteBuffer; + import java.util.HashMap; + import java.util.LinkedList; +@@ -28,7 +27,6 @@ import java.util.List; + import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.function.Supplier; +- + import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.SystemClock; +@@ -964,7 +962,7 @@ public interface MessageStore { + * + * @return List of metrics selector and view pair + */ +- List> getMetricsView(); ++ List> getMetricsView(); + + /** + * Init store metrics +diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java +index ff87f6369..45a6bbc68 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java ++++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java +@@ -23,7 +23,7 @@ import io.opentelemetry.api.metrics.LongCounter; + import io.opentelemetry.api.metrics.Meter; + import io.opentelemetry.api.metrics.ObservableLongGauge; + import io.opentelemetry.sdk.metrics.InstrumentSelector; +-import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.io.File; + import java.util.List; + import java.util.function.Supplier; +@@ -69,7 +69,7 @@ public class DefaultStoreMetricsManager { + public static LongCounter timerDequeueTotal = new NopLongCounter(); + public static LongCounter timerEnqueueTotal = new NopLongCounter(); + +- public static List> getMetricsView() { ++ public static List> getMetricsView() { + return Lists.newArrayList(); + } + +diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +index 25e947512..ab9fc6da7 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +@@ -20,8 +20,7 @@ package org.apache.rocketmq.store.plugin; + import io.opentelemetry.api.common.AttributesBuilder; + import io.opentelemetry.api.metrics.Meter; + import io.opentelemetry.sdk.metrics.InstrumentSelector; +-import io.opentelemetry.sdk.metrics.View; +- ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.nio.ByteBuffer; + import java.util.HashMap; + import java.util.LinkedList; +@@ -29,7 +28,6 @@ import java.util.List; + import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.function.Supplier; +- + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.SystemClock; + import org.apache.rocketmq.common.message.MessageExt; +@@ -643,7 +641,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore { + } + + @Override +- public List> getMetricsView() { ++ public List> getMetricsView() { + return next.getMetricsView(); + } + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index ced1fb818..5240ac8e9 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -21,7 +21,7 @@ import io.opentelemetry.api.common.Attributes; + import io.opentelemetry.api.common.AttributesBuilder; + import io.opentelemetry.api.metrics.Meter; + import io.opentelemetry.sdk.metrics.InstrumentSelector; +-import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.util.List; + import java.util.Set; + import java.util.concurrent.CompletableFuture; +@@ -352,8 +352,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + } + + @Override +- public List> getMetricsView() { +- List> res = super.getMetricsView(); ++ public List> getMetricsView() { ++ List> res = super.getMetricsView(); + res.addAll(TieredStoreMetricsManager.getMetricsView()); + return res; + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +index 3ca0fb614..d8a07f0a7 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation; + import io.opentelemetry.sdk.metrics.InstrumentSelector; + import io.opentelemetry.sdk.metrics.InstrumentType; + import io.opentelemetry.sdk.metrics.View; ++import io.opentelemetry.sdk.metrics.ViewBuilder; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.HashMap; +@@ -101,8 +102,8 @@ public class TieredStoreMetricsManager { + public static ObservableLongGauge storageSize = new NopObservableLongGauge(); + public static ObservableLongGauge storageMessageReserveTime = new NopObservableLongGauge(); + +- public static List> getMetricsView() { +- ArrayList> res = new ArrayList<>(); ++ public static List> getMetricsView() { ++ ArrayList> res = new ArrayList<>(); + + InstrumentSelector providerRpcLatencySelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) +@@ -114,10 +115,9 @@ public class TieredStoreMetricsManager { + .setName(HISTOGRAM_API_LATENCY) + .build(); + +- View rpcLatencyView = View.builder() ++ ViewBuilder rpcLatencyViewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 3d, 5d, 7d, 10d, 100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000))) +- .setDescription("tiered_store_rpc_latency_view") +- .build(); ++ .setDescription("tiered_store_rpc_latency_view"); + + InstrumentSelector uploadBufferSizeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) +@@ -129,15 +129,14 @@ public class TieredStoreMetricsManager { + .setName(HISTOGRAM_DOWNLOAD_BYTES) + .build(); + +- View bufferSizeView = View.builder() ++ ViewBuilder bufferSizeViewBuilder = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d * TieredStoreUtil.KB, 10d * TieredStoreUtil.KB, 100d * TieredStoreUtil.KB, 1d * TieredStoreUtil.MB, 10d * TieredStoreUtil.MB, 32d * TieredStoreUtil.MB, 50d * TieredStoreUtil.MB, 100d * TieredStoreUtil.MB))) +- .setDescription("tiered_store_buffer_size_view") +- .build(); ++ .setDescription("tiered_store_buffer_size_view"); + +- res.add(new Pair<>(rpcLatencySelector, rpcLatencyView)); +- res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyView)); +- res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeView)); +- res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeView)); ++ res.add(new Pair<>(rpcLatencySelector, rpcLatencyViewBuilder)); ++ res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyViewBuilder)); ++ res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeViewBuilder)); ++ res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeViewBuilder)); + return res; + } + +-- +2.32.0.windows.2 + + +From a4bcc2a74d8bec9c9d34565536e87df06e0b11c1 Mon Sep 17 00:00:00 2001 +From: Ziyi Tan +Date: Thu, 17 Aug 2023 13:53:48 +0800 +Subject: [PATCH 2/7] [ISSUE #7178] refresh metadata after broker startup + +Signed-off-by: Ziy1-Tan +--- + .../rocketmq/broker/BrokerController.java | 24 +++++++++---------- + 1 file changed, 12 insertions(+), 12 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 30b1d2299..13f9d002b 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -663,7 +663,7 @@ public class BrokerController { + BrokerController.this.getSlaveSynchronize().syncAll(); + lastSyncTimeMs = System.currentTimeMillis(); + } +- ++ + //timer checkpoint, latency-sensitive, so sync it more frequently + if (messageStoreConfig.isTimerWheelEnable()) { + BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint(); +@@ -698,17 +698,6 @@ public class BrokerController { + + initializeBrokerScheduledTasks(); + +- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { +- @Override +- public void run() { +- try { +- BrokerController.this.brokerOuterAPI.refreshMetadata(); +- } catch (Exception e) { +- LOG.error("ScheduledTask refresh metadata exception", e); +- } +- } +- }, 10, 5, TimeUnit.SECONDS); +- + if (this.brokerConfig.getNamesrvAddr() != null) { + this.updateNamesrvAddr(); + LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); +@@ -1682,6 +1671,17 @@ public class BrokerController { + if (brokerConfig.isSkipPreOnline()) { + startServiceWithoutCondition(); + } ++ ++ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ++ @Override ++ public void run() { ++ try { ++ BrokerController.this.brokerOuterAPI.refreshMetadata(); ++ } catch (Exception e) { ++ LOG.error("ScheduledTask refresh metadata exception", e); ++ } ++ } ++ }, 10, 5, TimeUnit.SECONDS); + } + + protected void scheduleSendHeartbeat() { +-- +2.32.0.windows.2 + + +From 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Thu, 17 Aug 2023 13:59:04 +0800 +Subject: [PATCH 3/7] [ISSUE #7201] Remove the DefaultMessageStore.class + dependency in TransientStorePool + +Co-authored-by: guyinyou +--- + .../rocketmq/store/AllocateMappedFileService.java | 6 +++--- + .../apache/rocketmq/store/DefaultMessageStore.java | 7 +++++-- + .../apache/rocketmq/store/TransientStorePool.java | 13 ++++--------- + 3 files changed, 12 insertions(+), 14 deletions(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +index dca7d5325..c8420fea1 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java ++++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +@@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread { + if (this.messageStore.isTransientStorePoolEnable()) { + if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() + && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool +- canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size(); ++ canSubmitRequests = this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size(); + } + } + +@@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread { + if (nextPutOK) { + if (canSubmitRequests <= 0) { + log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + +- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums()); ++ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs()); + this.requestTable.remove(nextFilePath); + return null; + } +@@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread { + if (nextNextPutOK) { + if (canSubmitRequests <= 0) { + log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + +- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums()); ++ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs()); + this.requestTable.remove(nextNextFilePath); + } else { + boolean offerOK = this.requestQueue.offer(nextNextReq); +diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +index 6115ead59..f2a54ddf6 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +@@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore { + this.reputMessageService = new ConcurrentReputMessageService(); + } + +- this.transientStorePool = new TransientStorePool(this); ++ this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog()); + + this.scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); +@@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore { + } + + public int remainTransientStoreBufferNumbs() { +- return this.transientStorePool.availableBufferNums(); ++ if (this.isTransientStorePoolEnable()) { ++ return this.transientStorePool.availableBufferNums(); ++ } ++ return Integer.MAX_VALUE; + } + + @Override +diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java +index 8c1a5338b..0d42ee69e 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java ++++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java +@@ -33,13 +33,11 @@ public class TransientStorePool { + private final int poolSize; + private final int fileSize; + private final Deque availableBuffers; +- private final DefaultMessageStore messageStore; + private volatile boolean isRealCommit = true; + +- public TransientStorePool(final DefaultMessageStore messageStore) { +- this.messageStore = messageStore; +- this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize(); +- this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); ++ public TransientStorePool(final int poolSize, final int fileSize) { ++ this.poolSize = poolSize; ++ this.fileSize = fileSize; + this.availableBuffers = new ConcurrentLinkedDeque<>(); + } + +@@ -81,10 +79,7 @@ public class TransientStorePool { + } + + public int availableBufferNums() { +- if (messageStore.isTransientStorePoolEnable()) { +- return availableBuffers.size(); +- } +- return Integer.MAX_VALUE; ++ return availableBuffers.size(); + } + + public boolean isRealCommit() { +-- +2.32.0.windows.2 + + +From 2b93e1e32fd458d9df2091e89ea259ddd4d54061 Mon Sep 17 00:00:00 2001 +From: iamgd67 +Date: Thu, 17 Aug 2023 15:31:14 +0800 +Subject: [PATCH 4/7] Update mqbroker to use runbroker.sh instead of + runserver.sh when use --enable-proxy (#7150) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +Update mqbroker to use runbroker.sh instead of runserver.sh when enabling `--enable-proxy` +this allow JVM `heap` and `gc` configuration using broker's settings instead of other common serverices'(proxy,namenode, etc). +our main purpose, like the filename `mqbroker` suggest, is to start broker (which embeds a proxy), so use broker's config is reasonable + +chinese version +mqbroker的--enable-proxy选项是启动内嵌了proxy的broker,而不是内嵌broker的proxy,而且broker的工作量和重要程度大于proxy,所以使用broker的gc和heap配置更合适 +--- + distribution/bin/mqbroker | 4 ++-- + 1 file changed, 2 insertions(+), 2 deletions(-) + +diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker +index 3758ed597..35eb93c44 100644 +--- a/distribution/bin/mqbroker ++++ b/distribution/bin/mqbroker +@@ -68,11 +68,11 @@ if [ "$enable_proxy" = true ]; then + if [ "$broker_config" != "" ]; then + args_for_proxy=${args_for_proxy}" -bc "${broker_config} + fi +- sh ${ROCKETMQ_HOME}/bin/runserver.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy} ++ sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy} + else + args_for_broker=$other_args + if [ "$broker_config" != "" ]; then + args_for_broker=${args_for_broker}" -c "${broker_config} + fi + sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup ${args_for_broker} +-fi +\ No newline at end of file ++fi +-- +2.32.0.windows.2 + + +From 05e7cde610255ed9410fffb0f153efe7c2c8a326 Mon Sep 17 00:00:00 2001 +From: yao-wenbin +Date: Fri, 18 Aug 2023 09:49:59 +0800 +Subject: [PATCH 5/7] [ISSUE #7042] maven-compile job failed, Because TlsTest's + serverRejectsSSLClient test case will throw TooLongFrameException (#7179) + +--- + .../remoting/netty/NettyRemotingServer.java | 2 +- + .../java/org/apache/rocketmq/remoting/TlsTest.java | 14 ++++++++++++-- + 2 files changed, 13 insertions(+), 3 deletions(-) + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index 90e358ce3..17f138f86 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -502,7 +502,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + case DISABLED: + ctx.close(); + log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode"); +- break; ++ throw new UnsupportedOperationException("The NettyRemotingServer in SSL disabled mode doesn't support ssl client"); + case PERMISSIVE: + case ENFORCING: + if (null != sslContext) { +diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java +index de7edbbfb..a4890d73d 100644 +--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java ++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java +@@ -144,8 +144,13 @@ public class TlsTest { + tlsClientKeyPath = ""; + tlsClientCertPath = ""; + clientConfig.setUseTLS(false); +- } else if ("serverRejectsSSLClient".equals(name.getMethodName())) { ++ } else if ("disabledServerRejectsSSLClient".equals(name.getMethodName())) { + tlsMode = TlsMode.DISABLED; ++ } else if ("disabledServerAcceptUnAuthClient".equals(name.getMethodName())) { ++ tlsMode = TlsMode.DISABLED; ++ tlsClientKeyPath = ""; ++ tlsClientCertPath = ""; ++ clientConfig.setUseTLS(false); + } else if ("reloadSslContextForServer".equals(name.getMethodName())) { + tlsClientAuthServer = false; + tlsServerNeedClientAuth = "none"; +@@ -211,7 +216,7 @@ public class TlsTest { + } + + @Test +- public void serverRejectsSSLClient() throws Exception { ++ public void disabledServerRejectsSSLClient() throws Exception { + try { + RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 1000 * 5); + failBecauseExceptionWasNotThrown(RemotingSendRequestException.class); +@@ -219,6 +224,11 @@ public class TlsTest { + } + } + ++ @Test ++ public void disabledServerAcceptUnAuthClient() throws Exception { ++ requestThenAssertResponse(); ++ } ++ + /** + * Tests that a server configured to require client authentication refuses to accept connections + * from a client that has an untrusted certificate. +-- +2.32.0.windows.2 + + +From 72d796f2b20b3ec6aebca8c004d9275d7c749a95 Mon Sep 17 00:00:00 2001 +From: lk +Date: Fri, 18 Aug 2023 11:55:39 +0800 +Subject: [PATCH 6/7] [ISSUE #7205] support batch ack for pop orderly (#7206) + +--- + .../broker/processor/AckMessageProcessor.java | 99 ++++++----- + .../rocketmq/client/impl/MQClientAPIImpl.java | 91 ++++++++-- + .../test/client/rmq/RMQPopClient.java | 22 +++ + .../client/consumer/pop/BasePopNormally.java | 6 + + .../test/client/consumer/pop/BatchAckIT.java | 159 ++++++++++++++++++ + 5 files changed, 322 insertions(+), 55 deletions(-) + create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +index 687811409..244b459d6 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor; + import com.alibaba.fastjson.JSON; + import io.netty.channel.Channel; + import io.netty.channel.ChannelHandlerContext; ++import java.util.BitSet; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.broker.metrics.PopMetricsManager; + import org.apache.rocketmq.common.KeyBuilder; +@@ -186,46 +187,7 @@ public class AckMessageProcessor implements NettyRequestProcessor { + invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); + + if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { +- // order +- String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; +- long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); +- if (ackOffset < oldOffset) { +- return; +- } +- while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { +- } +- try { +- oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); +- if (ackOffset < oldOffset) { +- return; +- } +- long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( +- topic, consumeGroup, +- qId, ackOffset, +- popTime); +- if (nextOffset > -1) { +- if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( +- topic, consumeGroup, qId)) { +- this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), +- consumeGroup, topic, qId, nextOffset); +- } +- if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, +- consumeGroup, qId, invisibleTime)) { +- this.brokerController.getPopMessageProcessor().notifyMessageArriving( +- topic, consumeGroup, qId); +- } +- } else if (nextOffset == -1) { +- String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", +- lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); +- POP_LOGGER.warn(errorInfo); +- response.setCode(ResponseCode.MESSAGE_ILLEGAL); +- response.setRemark(errorInfo); +- return; +- } +- } finally { +- this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); +- } +- brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); ++ ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response); + return; + } + +@@ -250,17 +212,22 @@ public class AckMessageProcessor implements NettyRequestProcessor { + } + + BatchAckMsg batchAckMsg = new BatchAckMsg(); +- for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) { +- if (!batchAck.getBitSet().get(i)) { +- continue; ++ BitSet bitSet = batchAck.getBitSet(); ++ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { ++ if (i == Integer.MAX_VALUE) { ++ break; + } + long offset = startOffset + i; + if (offset < minOffset || offset > maxOffset) { + continue; + } +- batchAckMsg.getAckOffsetList().add(offset); ++ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { ++ ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response); ++ } else { ++ batchAckMsg.getAckOffsetList().add(offset); ++ } + } +- if (batchAckMsg.getAckOffsetList().isEmpty()) { ++ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) { + return; + } + +@@ -311,4 +278,46 @@ public class AckMessageProcessor implements NettyRequestProcessor { + PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); + brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); + } ++ ++ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) { ++ String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId; ++ long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); ++ if (ackOffset < oldOffset) { ++ return; ++ } ++ while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) { ++ } ++ try { ++ oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId); ++ if (ackOffset < oldOffset) { ++ return; ++ } ++ long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( ++ topic, consumeGroup, ++ qId, ackOffset, ++ popTime); ++ if (nextOffset > -1) { ++ if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( ++ topic, consumeGroup, qId)) { ++ this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), ++ consumeGroup, topic, qId, nextOffset); ++ } ++ if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, ++ consumeGroup, qId, invisibleTime)) { ++ this.brokerController.getPopMessageProcessor().notifyMessageArriving( ++ topic, consumeGroup, qId); ++ } ++ } else if (nextOffset == -1) { ++ String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", ++ lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); ++ POP_LOGGER.warn(errorInfo); ++ response.setCode(ResponseCode.MESSAGE_ILLEGAL); ++ response.setRemark(errorInfo); ++ return; ++ } ++ } finally { ++ this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey); ++ } ++ brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1); ++ } + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +index 5101ffc8e..213c26fd6 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException; + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Arrays; ++import java.util.BitSet; + import java.util.Collections; + import java.util.HashMap; + import java.util.Iterator; +@@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback; + import org.apache.rocketmq.client.producer.SendResult; + import org.apache.rocketmq.client.producer.SendStatus; + import org.apache.rocketmq.common.AclConfig; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.MQVersion; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.Pair; +@@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback; + import org.apache.rocketmq.common.namesrv.TopAddressing; + import org.apache.rocketmq.common.sysflag.PullSysFlag; + import org.apache.rocketmq.common.topic.TopicValidator; +-import org.apache.rocketmq.common.BoundaryType; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.CommandCustomHeader; + import org.apache.rocketmq.remoting.InvokeCallback; + import org.apache.rocketmq.remoting.RPCHook; +@@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; + import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; + import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; ++import org.apache.rocketmq.remoting.protocol.body.BatchAck; ++import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; + import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; ++import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; + import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; + import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody; + import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; +@@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; + import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; + import org.apache.rocketmq.remoting.protocol.body.GroupList; + import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +-import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; + import org.apache.rocketmq.remoting.protocol.body.KVTable; + import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; + import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody; +@@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig + import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; ++import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; ++import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; ++import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; ++import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; +@@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH + import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; +-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader; +-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; +-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +-import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; + import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; + import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; + import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +@@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden; + import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook; + import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook; +-import org.apache.rocketmq.logging.org.slf4j.Logger; +-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + + import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; + +@@ -885,9 +888,77 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + final String addr, + final long timeOut, + final AckCallback ackCallback, +- final AckMessageRequestHeader requestHeader // ++ final AckMessageRequestHeader requestHeader ++ ) throws RemotingException, MQBrokerException, InterruptedException { ++ ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null); ++ } ++ ++ public void batchAckMessageAsync( ++ final String addr, ++ final long timeOut, ++ final AckCallback ackCallback, ++ final String topic, ++ final String consumerGroup, ++ final List extraInfoList ++ ) throws RemotingException, MQBrokerException, InterruptedException { ++ String brokerName = null; ++ Map batchAckMap = new HashMap<>(); ++ for (String extraInfo : extraInfoList) { ++ String[] extraInfoData = ExtraInfoUtil.split(extraInfo); ++ if (brokerName == null) { ++ brokerName = ExtraInfoUtil.getBrokerName(extraInfoData); ++ } ++ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + ++ ExtraInfoUtil.getQueueId(extraInfoData) + "@" + ++ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" + ++ ExtraInfoUtil.getPopTime(extraInfoData); ++ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> { ++ BatchAck newBatchAck = new BatchAck(); ++ newBatchAck.setConsumerGroup(consumerGroup); ++ newBatchAck.setTopic(topic); ++ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData)); ++ newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData)); ++ newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData)); ++ newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData)); ++ newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData)); ++ newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData)); ++ newBatchAck.setBitSet(new BitSet()); ++ return newBatchAck; ++ }); ++ bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData))); ++ } ++ ++ BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody(); ++ requestBody.setBrokerName(brokerName); ++ requestBody.setAcks(new ArrayList<>(batchAckMap.values())); ++ batchAckMessageAsync(addr, timeOut, ackCallback, requestBody); ++ } ++ ++ public void batchAckMessageAsync( ++ final String addr, ++ final long timeOut, ++ final AckCallback ackCallback, ++ final BatchAckMessageRequestBody requestBody + ) throws RemotingException, MQBrokerException, InterruptedException { +- final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); ++ ackMessageAsync(addr, timeOut, ackCallback, null, requestBody); ++ } ++ ++ protected void ackMessageAsync( ++ final String addr, ++ final long timeOut, ++ final AckCallback ackCallback, ++ final AckMessageRequestHeader requestHeader, ++ final BatchAckMessageRequestBody requestBody ++ ) throws RemotingException, MQBrokerException, InterruptedException { ++ RemotingCommand request; ++ if (requestHeader != null) { ++ request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); ++ } else { ++ request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); ++ if (requestBody != null) { ++ request.setBody(requestBody.encode()); ++ } ++ } + this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) { + + @Override +diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +index 496bd6da4..09c60c0b4 100644 +--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java ++++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +@@ -17,6 +17,7 @@ + + package org.apache.rocketmq.test.client.rmq; + ++import java.util.List; + import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.client.ClientConfig; + import org.apache.rocketmq.client.consumer.AckCallback; +@@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer { + return future; + } + ++ public CompletableFuture batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup, ++ List extraInfoList) { ++ CompletableFuture future = new CompletableFuture<>(); ++ try { ++ this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() { ++ @Override ++ public void onSuccess(AckResult ackResult) { ++ future.complete(ackResult); ++ } ++ ++ @Override ++ public void onException(Throwable e) { ++ future.completeExceptionally(e); ++ } ++ }, topic, consumerGroup, extraInfoList); ++ } catch (Throwable t) { ++ future.completeExceptionally(t); ++ } ++ return future; ++ } ++ + public CompletableFuture changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic, + String consumerGroup, String extraInfo, long invisibleTime) { + String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo); +diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java +index 952fbe3f5..2e29b95a5 100644 +--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java ++++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java +@@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop { + brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true, + ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); + } ++ ++ protected CompletableFuture popMessageAsync(long invisibleTime, int maxNums) { ++ return client.popMessageAsync( ++ brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false, ++ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); ++ } + } +diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java +new file mode 100644 +index 000000000..ec9153ccc +--- /dev/null ++++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java +@@ -0,0 +1,159 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.test.client.consumer.pop; ++ ++import java.time.Duration; ++import java.util.ArrayList; ++import java.util.List; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicInteger; ++import java.util.function.Supplier; ++import org.apache.rocketmq.client.consumer.AckResult; ++import org.apache.rocketmq.client.consumer.AckStatus; ++import org.apache.rocketmq.client.consumer.PopResult; ++import org.apache.rocketmq.client.consumer.PopStatus; ++import org.apache.rocketmq.common.attribute.CQType; ++import org.apache.rocketmq.common.attribute.TopicMessageType; ++import org.apache.rocketmq.common.constant.ConsumeInitMode; ++import org.apache.rocketmq.common.filter.ExpressionType; ++import org.apache.rocketmq.common.message.MessageConst; ++import org.apache.rocketmq.common.message.MessageExt; ++import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.test.base.IntegrationTestBase; ++import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; ++import org.apache.rocketmq.test.client.rmq.RMQPopClient; ++import org.apache.rocketmq.test.util.MQRandomUtils; ++import org.junit.After; ++import org.junit.Before; ++import org.junit.Test; ++ ++import static org.awaitility.Awaitility.await; ++import static org.junit.Assert.assertEquals; ++ ++public class BatchAckIT extends BasePop { ++ ++ protected String topic; ++ protected String group; ++ protected RMQNormalProducer producer = null; ++ protected RMQPopClient client = null; ++ protected String brokerAddr; ++ protected MessageQueue messageQueue; ++ ++ @Before ++ public void setUp() { ++ brokerAddr = brokerController1.getBrokerAddr(); ++ topic = MQRandomUtils.getRandomTopic(); ++ group = initConsumerGroup(); ++ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL); ++ producer = getProducer(NAMESRV_ADDR, topic); ++ client = getRMQPopClient(); ++ messageQueue = new MessageQueue(topic, BROKER1_NAME, -1); ++ } ++ ++ @After ++ public void tearDown() { ++ shutdown(); ++ } ++ ++ @Test ++ public void testBatchAckNormallyWithPopBuffer() throws Throwable { ++ brokerController1.getBrokerConfig().setEnablePopBufferMerge(true); ++ brokerController2.getBrokerConfig().setEnablePopBufferMerge(true); ++ ++ testBatchAck(() -> { ++ try { ++ return popMessageAsync().get(); ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ }); ++ } ++ ++ @Test ++ public void testBatchAckNormallyWithOutPopBuffer() throws Throwable { ++ brokerController1.getBrokerConfig().setEnablePopBufferMerge(false); ++ brokerController2.getBrokerConfig().setEnablePopBufferMerge(false); ++ ++ testBatchAck(() -> { ++ try { ++ return popMessageAsync().get(); ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ }); ++ } ++ ++ @Test ++ public void testBatchAckOrderly() throws Throwable { ++ testBatchAck(() -> { ++ try { ++ return popMessageOrderlyAsync().get(); ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ }); ++ } ++ ++ public void testBatchAck(Supplier popResultSupplier) throws Throwable { ++ // Send 10 messages but do not ack, let them enter the retry topic ++ producer.send(10); ++ AtomicInteger firstMsgRcvNum = new AtomicInteger(); ++ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { ++ PopResult popResult = popResultSupplier.get(); ++ if (popResult.getPopStatus().equals(PopStatus.FOUND)) { ++ firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size()); ++ } ++ assertEquals(10, firstMsgRcvNum.get()); ++ }); ++ // sleep 6s, expect messages to enter the retry topic ++ TimeUnit.SECONDS.sleep(6); ++ ++ producer.send(20); ++ List extraInfoList = new ArrayList<>(); ++ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { ++ PopResult popResult = popResultSupplier.get(); ++ if (popResult.getPopStatus().equals(PopStatus.FOUND)) { ++ for (MessageExt messageExt : popResult.getMsgFoundList()) { ++ extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK)); ++ } ++ } ++ assertEquals(30, extraInfoList.size()); ++ }); ++ ++ AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, group, extraInfoList).get(); ++ assertEquals(AckStatus.OK, ackResult.getStatus()); ++ ++ // sleep 6s, expected that messages that have been acked will not be re-consumed ++ TimeUnit.SECONDS.sleep(6); ++ PopResult popResult = popResultSupplier.get(); ++ assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus()); ++ } ++ ++ private CompletableFuture popMessageAsync() { ++ return client.popMessageAsync( ++ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false, ++ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); ++ } ++ ++ private CompletableFuture popMessageOrderlyAsync() { ++ return client.popMessageAsync( ++ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false, ++ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null); ++ } ++} +-- +2.32.0.windows.2 + + +From cc16a1b51216e1e80c22011b8b01e060bb4af8b3 Mon Sep 17 00:00:00 2001 +From: rongtong +Date: Tue, 22 Aug 2023 10:42:25 +0800 +Subject: [PATCH 7/7] Set table reference the same object for + setSubscriptionGroupTable method (#7204) + +--- + .../broker/subscription/SubscriptionGroupManager.java | 5 +---- + 1 file changed, 1 insertion(+), 4 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +index 74e39c0fe..e63b93058 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +@@ -341,10 +341,7 @@ public class SubscriptionGroupManager extends ConfigManager { + + + public void setSubscriptionGroupTable(ConcurrentMap subscriptionGroupTable) { +- this.subscriptionGroupTable.clear(); +- for (String key : subscriptionGroupTable.keySet()) { +- putSubscriptionGroupConfig(subscriptionGroupTable.get(key)); +- } ++ this.subscriptionGroupTable = subscriptionGroupTable; + } + + public boolean containsSubscriptionGroup(String group) { +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 55f859f8a0a27090ba8dd40526703e3a6c775f85..1909b2ff93b69c1f0b383be690a5ef9de9197209 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 11 +Release: 12 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -20,6 +20,7 @@ Patch0007: patch007-backport-fix-some-bugs.patch Patch0008: patch008-backport-Allow-BoundaryType.patch Patch0009: patch009-backport-Support-KV-Storage.patch Patch0010: patch010-backport-add-some-fixes.patch +Patch0011: patch011-backport-optimize-config.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -54,6 +55,9 @@ exit 0 %changelog +* Wed Oct 1 2023 ShiZhili - 5.1.3-12 +- backport optimize config + * Wed Oct 1 2023 ShiZhili - 5.1.3-11 - backport add some fixes