diff --git a/patch036-backport-RIP65.patch b/patch036-backport-RIP65.patch
new file mode 100644
index 0000000000000000000000000000000000000000..d0ed8baa1f6d4bfb4d855357444748fb50cbebea
--- /dev/null
+++ b/patch036-backport-RIP65.patch
@@ -0,0 +1,3173 @@
+From 63130f51e84bda2547c3aa442f14184ccefb9180 Mon Sep 17 00:00:00 2001
+From: lizhimins <707364882@qq.com>
+Date: Tue, 21 Nov 2023 13:57:44 +0800
+Subject: [PATCH] [ISSUE #7545] [RIP-65] Support efficient random index for
+ massive messages (#7546)
+
+Support efficient random index for massive messages
+
+Co-authored-by: bareheadtom <1983697019@qq.com>
+---
+ style/spotbugs-suppressions.xml | 2 +-
+ tieredstore/pom.xml | 14 +
+ .../tieredstore/TieredMessageFetcher.java | 103 ++--
+ .../file/CompositeQueueFlatFile.java | 29 +-
+ .../tieredstore/file/TieredConsumeQueue.java | 2 +-
+ .../tieredstore/file/TieredFlatFile.java | 5 +-
+ .../file/TieredFlatFileManager.java | 40 +-
+ .../tieredstore/file/TieredIndexFile.java | 470 -----------------
+ .../rocketmq/tieredstore/index/IndexFile.java | 35 ++
+ .../rocketmq/tieredstore/index/IndexItem.java | 114 ++++
+ .../tieredstore/index/IndexService.java | 62 +++
+ .../tieredstore/index/IndexStoreFile.java | 499 ++++++++++++++++++
+ .../tieredstore/index/IndexStoreService.java | 362 +++++++++++++
+ .../provider/TieredFileSegment.java | 9 +-
+ .../provider/TieredStoreProvider.java | 10 +-
+ .../provider/posix/PosixFileSegment.java | 1 +
+ .../tieredstore/TieredMessageFetcherTest.java | 17 +-
+ .../tieredstore/file/TieredIndexFileTest.java | 93 ----
+ .../tieredstore/index/IndexItemTest.java | 91 ++++
+ .../tieredstore/index/IndexStoreFileTest.java | 282 ++++++++++
+ .../index/IndexStoreServiceBenchTest.java | 147 ++++++
+ .../index/IndexStoreServiceTest.java | 313 +++++++++++
+ .../util/MessageBufferUtilTest.java | 1 -
+ .../src/test/resources/rmq.logback-test.xml | 15 +-
+ 24 files changed, 2019 insertions(+), 697 deletions(-)
+ delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+ create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
+ create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
+ create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
+ create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+ create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+ delete mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+ create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
+ create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+ create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
+ create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+
+diff --git a/style/spotbugs-suppressions.xml b/style/spotbugs-suppressions.xml
+index 5778695e1..6443e029f 100644
+--- a/style/spotbugs-suppressions.xml
++++ b/style/spotbugs-suppressions.xml
+@@ -31,7 +31,7 @@
+
+
+
+-
++
+
+
+
+diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
+index b2ea40bf3..9f2a8bf22 100644
+--- a/tieredstore/pom.xml
++++ b/tieredstore/pom.xml
+@@ -53,5 +53,19 @@
+ commons-io
+ test
+
++
++
++ org.openjdk.jmh
++ jmh-core
++ 1.36
++ provided
++
++
++
++ org.openjdk.jmh
++ jmh-generator-annprocess
++ 1.36
++ provided
++
+
+
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+index c948fa3fa..f739773eb 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+ import javax.annotation.Nullable;
+ import org.apache.commons.lang3.tuple.Pair;
++import org.apache.rocketmq.common.BoundaryType;
+ import org.apache.rocketmq.common.message.MessageQueue;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+@@ -50,7 +51,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
+ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
+ import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
+ import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
+-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
++import org.apache.rocketmq.tieredstore.index.IndexItem;
++import org.apache.rocketmq.tieredstore.index.IndexService;
+ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
+ import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
+ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
+@@ -58,7 +60,6 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+ import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
+ import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+-import org.apache.rocketmq.common.BoundaryType;
+
+ public class TieredMessageFetcher implements MessageStoreFetcher {
+
+@@ -555,85 +556,51 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
+ public CompletableFuture queryMessageAsync(
+ String topic, String key, int maxCount, long begin, long end) {
+
+- TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
++ IndexService indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
+
+- int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
+ long topicId;
+ try {
+ TopicMetadata topicMetadata = metadataStore.getTopic(topic);
+ if (topicMetadata == null) {
+- LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
++ LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
+ return CompletableFuture.completedFuture(new QueryMessageResult());
+ }
+ topicId = topicMetadata.getTopicId();
+ } catch (Exception e) {
+- LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
++ LOGGER.error("MessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
+ return CompletableFuture.completedFuture(new QueryMessageResult());
+ }
+
+- return indexFile.queryAsync(topic, key, begin, end)
+- .thenCompose(indexBufferList -> {
+- QueryMessageResult result = new QueryMessageResult();
+- int resultCount = 0;
+- List> futureList = new ArrayList<>(maxCount);
+- for (Pair pair : indexBufferList) {
+- Long fileBeginTimestamp = pair.getKey();
+- ByteBuffer indexBuffer = pair.getValue();
+-
+- if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
+- LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " +
+- "index buffer size {} is not multiple of index item size {}",
+- indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
+- continue;
+- }
+-
+- for (int indexOffset = indexBuffer.position();
+- indexOffset < indexBuffer.limit();
+- indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
+-
+- int indexItemHashCode = indexBuffer.getInt(indexOffset);
+- if (indexItemHashCode != hashCode) {
+- continue;
+- }
+-
+- int indexItemTopicId = indexBuffer.getInt(indexOffset + 4);
+- if (indexItemTopicId != topicId) {
+- continue;
+- }
+-
+- int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
+- CompositeFlatFile flatFile =
+- flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
+- if (flatFile == null) {
+- continue;
+- }
+-
+- // decode index item
+- long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
+- int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
+- int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
+- long indexTimestamp = fileBeginTimestamp + timeDiff;
+- if (indexTimestamp < begin || indexTimestamp > end) {
+- continue;
+- }
++ CompletableFuture> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end);
+
+- CompletableFuture getMessageFuture = flatFile.getCommitLogAsync(offset, size)
+- .thenAccept(messageBuffer -> result.addMessage(
+- new SelectMappedBufferResult(0, messageBuffer, size, null)));
+- futureList.add(getMessageFuture);
+-
+- resultCount++;
+- if (resultCount >= maxCount) {
+- break;
+- }
+- }
+-
+- if (resultCount >= maxCount) {
+- break;
+- }
++ return future.thenCompose(indexItemList -> {
++ QueryMessageResult result = new QueryMessageResult();
++ List> futureList = new ArrayList<>(maxCount);
++ for (IndexItem indexItem : indexItemList) {
++ if (topicId != indexItem.getTopicId()) {
++ continue;
+ }
+- return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
+- .thenApply(v -> result);
+- });
++ CompositeFlatFile flatFile =
++ flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, indexItem.getQueueId()));
++ if (flatFile == null) {
++ continue;
++ }
++ CompletableFuture getMessageFuture = flatFile
++ .getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
++ .thenAccept(messageBuffer -> result.addMessage(
++ new SelectMappedBufferResult(
++ indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
++ futureList.add(getMessageFuture);
++ if (futureList.size() >= maxCount) {
++ break;
++ }
++ }
++ return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
++ }).whenComplete((result, throwable) -> {
++ if (result != null) {
++ LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
++ result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
++ }
++ });
+ }
+ }
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+index 0a797f465..67d2cf064 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+@@ -17,11 +17,15 @@
+
+ package org.apache.rocketmq.tieredstore.file;
+
++import java.util.Arrays;
++import java.util.HashSet;
++import java.util.Set;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.rocketmq.common.message.MessageConst;
+ import org.apache.rocketmq.common.message.MessageQueue;
+ import org.apache.rocketmq.store.DispatchRequest;
+ import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.index.IndexService;
+ import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
+ import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
+ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+@@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
+ private final MessageQueue messageQueue;
+ private long topicSequenceNumber;
+ private QueueMetadata queueMetadata;
+- private final TieredIndexFile indexFile;
++ private final IndexService indexStoreService;
+
+ public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
+ super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
+ this.messageQueue = messageQueue;
+ this.recoverQueueMetadata();
+- this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
++ this.indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
+ }
+
+ @Override
+@@ -85,24 +89,15 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
+ return AppendResult.FILE_CLOSED;
+ }
+
++ Set keySet = new HashSet<>(
++ Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR)));
+ if (StringUtils.isNotBlank(request.getUniqKey())) {
+- AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
+- request.getUniqKey(), request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
+- if (result != AppendResult.SUCCESS) {
+- return result;
+- }
++ keySet.add(request.getUniqKey());
+ }
+
+- for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) {
+- if (StringUtils.isNotBlank(key)) {
+- AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
+- key, request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
+- if (result != AppendResult.SUCCESS) {
+- return result;
+- }
+- }
+- }
+- return AppendResult.SUCCESS;
++ return indexStoreService.putKey(
++ messageQueue.getTopic(), (int) topicSequenceNumber, messageQueue.getQueueId(), keySet,
++ request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
+ }
+
+ public MessageQueue getMessageQueue() {
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+index 35007f8cb..6953db032 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+@@ -20,9 +20,9 @@ import com.google.common.annotations.VisibleForTesting;
+ import java.nio.ByteBuffer;
+ import java.util.concurrent.CompletableFuture;
+ import org.apache.commons.lang3.tuple.Pair;
++import org.apache.rocketmq.common.BoundaryType;
+ import org.apache.rocketmq.tieredstore.common.AppendResult;
+ import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+-import org.apache.rocketmq.common.BoundaryType;
+
+ public class TieredConsumeQueue {
+
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+index d96eb6e8f..a41d562d1 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+@@ -141,7 +141,6 @@ public class TieredFlatFile {
+ return fileType;
+ }
+
+- @VisibleForTesting
+ public List getFileSegmentList() {
+ return fileSegmentList;
+ }
+@@ -274,7 +273,7 @@ public class TieredFlatFile {
+ }
+
+ @Nullable
+- protected TieredFileSegment getFileByIndex(int index) {
++ public TieredFileSegment getFileByIndex(int index) {
+ fileSegmentLock.readLock().lock();
+ try {
+ if (index < fileSegmentList.size()) {
+@@ -354,7 +353,7 @@ public class TieredFlatFile {
+ }
+ }
+
+- protected List getFileListByTime(long beginTime, long endTime) {
++ public List getFileListByTime(long beginTime, long endTime) {
+ fileSegmentLock.readLock().lock();
+ try {
+ return fileSegmentList.stream()
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+index 087ea8c9c..ffe0836f1 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+@@ -34,6 +34,8 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+ import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
++import org.apache.rocketmq.tieredstore.index.IndexService;
++import org.apache.rocketmq.tieredstore.index.IndexStoreService;
+ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
+ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+@@ -43,7 +45,7 @@ public class TieredFlatFileManager {
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ private static volatile TieredFlatFileManager instance;
+- private static volatile TieredIndexFile indexFile;
++ private static volatile IndexStoreService indexStoreService;
+
+ private final TieredMetadataStore metadataStore;
+ private final TieredMessageStoreConfig storeConfig;
+@@ -76,25 +78,26 @@ public class TieredFlatFileManager {
+ return instance;
+ }
+
+- public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) {
++ public static IndexService getTieredIndexService(TieredMessageStoreConfig storeConfig) {
+ if (storeConfig == null) {
+- return indexFile;
++ return indexStoreService;
+ }
+
+- if (indexFile == null) {
++ if (indexStoreService == null) {
+ synchronized (TieredFlatFileManager.class) {
+- if (indexFile == null) {
++ if (indexStoreService == null) {
+ try {
+ String filePath = TieredStoreUtil.toPath(new MessageQueue(
+ TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
+- indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
++ indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), filePath);
++ indexStoreService.start();
+ } catch (Exception e) {
+ logger.error("Construct FlatFileManager indexFile error", e);
+ }
+ }
+ }
+ }
+- return indexFile;
++ return indexStoreService;
+ }
+
+ public void doCommit() {
+@@ -120,15 +123,6 @@ public class TieredFlatFileManager {
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+- TieredStoreExecutor.commitExecutor.schedule(() -> {
+- try {
+- if (indexFile != null) {
+- indexFile.commit(true);
+- }
+- } catch (Throwable e) {
+- logger.error("Commit indexFile periodically failed", e);
+- }
+- }, 0, TimeUnit.MILLISECONDS);
+ }
+
+ public void doCleanExpiredFile() {
+@@ -148,10 +142,6 @@ public class TieredFlatFileManager {
+ }
+ });
+ }
+- if (indexFile != null) {
+- indexFile.cleanExpiredFile(expiredTimeStamp);
+- indexFile.destroyExpiredFile();
+- }
+ }
+
+ private void doScheduleTask() {
+@@ -244,7 +234,7 @@ public class TieredFlatFileManager {
+
+ private static void cleanStaticReference() {
+ instance = null;
+- indexFile = null;
++ indexStoreService = null;
+ }
+
+ @Nullable
+@@ -271,8 +261,8 @@ public class TieredFlatFileManager {
+ }
+
+ public void shutdown() {
+- if (indexFile != null) {
+- indexFile.commit(true);
++ if (indexStoreService != null) {
++ indexStoreService.shutdown();
+ }
+ for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) {
+ flatFile.shutdown();
+@@ -280,8 +270,8 @@ public class TieredFlatFileManager {
+ }
+
+ public void destroy() {
+- if (indexFile != null) {
+- indexFile.destroy();
++ if (indexStoreService != null) {
++ indexStoreService.destroy();
+ }
+ ImmutableList flatFileList = deepCopyFlatFileToList();
+ cleanup();
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+deleted file mode 100644
+index eda5e0106..000000000
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
++++ /dev/null
+@@ -1,470 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.rocketmq.tieredstore.file;
+-
+-import com.google.common.annotations.VisibleForTesting;
+-import java.io.File;
+-import java.io.IOException;
+-import java.nio.ByteBuffer;
+-import java.nio.MappedByteBuffer;
+-import java.nio.file.Paths;
+-import java.util.ArrayList;
+-import java.util.List;
+-import java.util.concurrent.CompletableFuture;
+-import java.util.concurrent.Future;
+-import java.util.concurrent.TimeUnit;
+-import java.util.concurrent.locks.ReentrantLock;
+-import org.apache.commons.lang3.tuple.Pair;
+-import org.apache.rocketmq.common.message.MessageQueue;
+-import org.apache.rocketmq.logging.org.slf4j.Logger;
+-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+-import org.apache.rocketmq.store.index.IndexHeader;
+-import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+-import org.apache.rocketmq.store.logfile.MappedFile;
+-import org.apache.rocketmq.tieredstore.common.AppendResult;
+-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+-import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+-
+-public class TieredIndexFile {
+-
+- private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+-
+- // header format:
+- // magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4)
+- public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
+- public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
+- public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
+- public static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
+- public static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
+- public static final int INDEX_FILE_HEADER_SIZE = 28;
+-
+- // index item
+- public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
+- public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
+- public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
+- public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
+- public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
+-
+- private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
+- private static final String CUR_INDEX_FILE_NAME = "0000";
+- private static final String PRE_INDEX_FILE_NAME = "1111";
+- private static final String COMPACT_FILE_NAME = "2222";
+-
+- private final TieredMessageStoreConfig storeConfig;
+- private final TieredFlatFile flatFile;
+- private final int maxHashSlotNum;
+- private final int maxIndexNum;
+- private final int fileMaxSize;
+- private final String curFilePath;
+- private final String preFilepath;
+- private MappedFile preMappedFile;
+- private MappedFile curMappedFile;
+-
+- private final ReentrantLock curFileLock = new ReentrantLock();
+- private Future inflightCompactFuture = CompletableFuture.completedFuture(null);
+-
+- protected TieredIndexFile(TieredFileAllocator fileQueueFactory, String filePath) throws IOException {
+- this.storeConfig = fileQueueFactory.getStoreConfig();
+- this.flatFile = fileQueueFactory.createFlatFileForIndexFile(filePath);
+- if (flatFile.getBaseOffset() == -1) {
+- flatFile.setBaseOffset(0);
+- }
+- this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+- this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+- this.fileMaxSize = IndexHeader.INDEX_HEADER_SIZE
+- + this.maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+- + this.maxIndexNum * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE
+- + 4;
+- this.curFilePath = Paths.get(
+- storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, CUR_INDEX_FILE_NAME).toString();
+- this.preFilepath = Paths.get(
+- storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, PRE_INDEX_FILE_NAME).toString();
+- initFile();
+- TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(
+- this::doScheduleTask, 10, 10, TimeUnit.SECONDS);
+- }
+-
+- protected void doScheduleTask() {
+- try {
+- curFileLock.lock();
+- try {
+- synchronized (TieredIndexFile.class) {
+- MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
+- int indexNum = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
+- long lastIndexTime = mappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+- if (indexNum > 0 &&
+- System.currentTimeMillis() - lastIndexTime >
+- storeConfig.getTieredStoreIndexFileRollingIdleInterval()) {
+- mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
+- rollingFile();
+- }
+- if (inflightCompactFuture.isDone() && preMappedFile != null && preMappedFile.isAvailable()) {
+- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(
+- new CompactTask(storeConfig, preMappedFile, flatFile), null);
+- }
+- }
+- } finally {
+- curFileLock.unlock();
+- }
+- } catch (Throwable throwable) {
+- logger.error("TieredIndexFile: submit compact index file task failed:", throwable);
+- }
+- }
+-
+- private static boolean isFileSealed(MappedFile mappedFile) {
+- return mappedFile.getMappedByteBuffer().getInt(mappedFile.getFileSize() - 4) == INDEX_FILE_END_MAGIC_CODE;
+- }
+-
+- private void initIndexFileHeader(MappedFile mappedFile) {
+- MappedByteBuffer mappedByteBuffer = mappedFile.getMappedByteBuffer();
+- if (mappedByteBuffer.getInt(0) != INDEX_FILE_BEGIN_MAGIC_CODE) {
+- mappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
+- mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, -1L);
+- mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, -1L);
+- mappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 0);
+- mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 0);
+- for (int i = 0; i < maxHashSlotNum; i++) {
+- mappedByteBuffer.putInt(INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE, -1);
+- }
+- mappedByteBuffer.putInt(fileMaxSize - 4, -1);
+- }
+- }
+-
+- @VisibleForTesting
+- public MappedFile getPreMappedFile() {
+- return preMappedFile;
+- }
+-
+- private void initFile() throws IOException {
+- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+- initIndexFileHeader(curMappedFile);
+- File preFile = new File(preFilepath);
+- boolean preFileExists = preFile.exists();
+- if (preFileExists) {
+- preMappedFile = new DefaultMappedFile(preFilepath, fileMaxSize);
+- }
+-
+- if (isFileSealed(curMappedFile)) {
+- if (preFileExists) {
+- if (preFile.delete()) {
+- logger.info("Pre IndexFile deleted success", preFilepath);
+- } else {
+- logger.error("Pre IndexFile deleted failed", preFilepath);
+- }
+- }
+- boolean rename = curMappedFile.renameTo(preFilepath);
+- if (rename) {
+- preMappedFile = curMappedFile;
+- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+- initIndexFileHeader(curMappedFile);
+- preFileExists = true;
+- }
+- }
+-
+- if (preFileExists) {
+- synchronized (TieredIndexFile.class) {
+- if (inflightCompactFuture.isDone()) {
+- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(
+- new CompactTask(storeConfig, preMappedFile, flatFile), null);
+- }
+- }
+- }
+- }
+-
+- public AppendResult append(MessageQueue mq, int topicId, String key, long offset, int size, long timeStamp) {
+- return putKey(mq, topicId, indexKeyHashMethod(buildKey(mq.getTopic(), key)), offset, size, timeStamp);
+- }
+-
+- private boolean rollingFile() throws IOException {
+- File preFile = new File(preFilepath);
+- boolean preFileExists = preFile.exists();
+- if (!preFileExists) {
+- boolean rename = curMappedFile.renameTo(preFilepath);
+- if (rename) {
+- preMappedFile = curMappedFile;
+- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+- initIndexFileHeader(curMappedFile);
+- tryToCompactPreFile();
+- return true;
+- } else {
+- logger.error("TieredIndexFile#rollingFile: rename current file failed");
+- return false;
+- }
+- }
+- tryToCompactPreFile();
+- return false;
+- }
+-
+- private void tryToCompactPreFile() throws IOException {
+- synchronized (TieredIndexFile.class) {
+- if (inflightCompactFuture.isDone()) {
+- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null);
+- }
+- }
+- }
+-
+- private AppendResult putKey(MessageQueue mq, int topicId, int hashCode, long offset, int size, long timeStamp) {
+- curFileLock.lock();
+- try {
+- if (isFileSealed(curMappedFile) && !rollingFile()) {
+- return AppendResult.FILE_FULL;
+- }
+-
+- MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
+-
+- int slotPosition = hashCode % maxHashSlotNum;
+- int slotOffset = INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE;
+-
+- int slotValue = mappedByteBuffer.getInt(slotOffset);
+-
+- long beginTimeStamp = mappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
+- if (beginTimeStamp == -1) {
+- mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, timeStamp);
+- beginTimeStamp = timeStamp;
+- }
+-
+- int indexCount = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
+- int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+- + indexCount * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
+-
+- int timeDiff = (int) (timeStamp - beginTimeStamp);
+-
+- // put hash index
+- mappedByteBuffer.putInt(indexOffset, hashCode);
+- mappedByteBuffer.putInt(indexOffset + 4, topicId);
+- mappedByteBuffer.putInt(indexOffset + 4 + 4, mq.getQueueId());
+- mappedByteBuffer.putLong(indexOffset + 4 + 4 + 4, offset);
+- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8, size);
+- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4, timeDiff);
+- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4, slotValue);
+-
+- // put hash slot
+- mappedByteBuffer.putInt(slotOffset, indexCount);
+-
+- // put header
+- indexCount += 1;
+- mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, indexCount);
+- mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, timeStamp);
+- if (indexCount == maxIndexNum) {
+- mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
+- rollingFile();
+- }
+- return AppendResult.SUCCESS;
+- } catch (Exception e) {
+- logger.error("TieredIndexFile#putKey: put key failed:", e);
+- return AppendResult.IO_ERROR;
+- } finally {
+- curFileLock.unlock();
+- }
+- }
+-
+- public CompletableFuture>> queryAsync(String topic, String key, long beginTime,
+- long endTime) {
+- int hashCode = indexKeyHashMethod(buildKey(topic, key));
+- int slotPosition = hashCode % maxHashSlotNum;
+- List fileSegmentList = flatFile.getFileListByTime(beginTime, endTime);
+- CompletableFuture>> future = null;
+- for (int i = fileSegmentList.size() - 1; i >= 0; i--) {
+- TieredFileSegment fileSegment = fileSegmentList.get(i);
+- CompletableFuture tmpFuture = fileSegment.readAsync(INDEX_FILE_HEADER_SIZE + (long) slotPosition * INDEX_FILE_HASH_SLOT_SIZE, INDEX_FILE_HASH_SLOT_SIZE)
+- .thenCompose(slotBuffer -> {
+- int indexPosition = slotBuffer.getInt();
+- if (indexPosition == -1) {
+- return CompletableFuture.completedFuture(null);
+- }
+-
+- int indexSize = slotBuffer.getInt();
+- if (indexSize <= 0) {
+- return CompletableFuture.completedFuture(null);
+- }
+- return fileSegment.readAsync(indexPosition, indexSize);
+- });
+- if (future == null) {
+- future = tmpFuture.thenApply(indexBuffer -> {
+- List> result = new ArrayList<>();
+- if (indexBuffer != null) {
+- result.add(Pair.of(fileSegment.getMinTimestamp(), indexBuffer));
+- }
+- return result;
+- });
+- } else {
+- future = future.thenCombine(tmpFuture, (indexList, indexBuffer) -> {
+- if (indexBuffer != null) {
+- indexList.add(Pair.of(fileSegment.getMinTimestamp(), indexBuffer));
+- }
+- return indexList;
+- });
+- }
+- }
+- return future == null ? CompletableFuture.completedFuture(new ArrayList<>()) : future;
+- }
+-
+- public static String buildKey(String topic, String key) {
+- return topic + "#" + key;
+- }
+-
+- public static int indexKeyHashMethod(String key) {
+- int keyHash = key.hashCode();
+- int keyHashPositive = Math.abs(keyHash);
+- if (keyHashPositive < 0)
+- keyHashPositive = 0;
+- return keyHashPositive;
+- }
+-
+- public void commit(boolean sync) {
+- flatFile.commit(sync);
+- if (sync) {
+- try {
+- inflightCompactFuture.get();
+- } catch (Exception ignore) {
+- }
+- }
+- }
+-
+- public void cleanExpiredFile(long expireTimestamp) {
+- flatFile.cleanExpiredFile(expireTimestamp);
+- }
+-
+- public void destroyExpiredFile() {
+- flatFile.destroyExpiredFile();
+- }
+-
+- public void destroy() {
+- inflightCompactFuture.cancel(true);
+- if (preMappedFile != null) {
+- preMappedFile.destroy(-1);
+- }
+- if (curMappedFile != null) {
+- curMappedFile.destroy(-1);
+- }
+- String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
+- File compactFile = new File(compactFilePath);
+- if (compactFile.exists()) {
+- compactFile.delete();
+- }
+- flatFile.destroy();
+- }
+-
+- static class CompactTask implements Runnable {
+- private final TieredMessageStoreConfig storeConfig;
+-
+- private final int maxHashSlotNum;
+- private final int maxIndexNum;
+- private final int fileMaxSize;
+- private MappedFile originFile;
+- private TieredFlatFile fileQueue;
+- private MappedFile compactFile;
+-
+- public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile,
+- TieredFlatFile fileQueue) throws IOException {
+- this.storeConfig = storeConfig;
+- this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+- this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+- this.originFile = originFile;
+- this.fileQueue = fileQueue;
+- String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
+- fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (storeConfig.getTieredStoreIndexFileMaxHashSlotNum() * INDEX_FILE_HASH_SLOT_SIZE) + (storeConfig.getTieredStoreIndexFileMaxIndexNum() * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
+- // TODO check magic code, upload immediately when compact complete
+- File compactFile = new File(compactFilePath);
+- if (compactFile.exists()) {
+- compactFile.delete();
+- }
+- this.compactFile = new DefaultMappedFile(compactFilePath, fileMaxSize);
+- }
+-
+- @Override
+- public void run() {
+- try {
+- compact();
+- } catch (Throwable throwable) {
+- logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable);
+- }
+-
+- try {
+- if (originFile != null) {
+- originFile.destroy(-1);
+- }
+- if (compactFile != null) {
+- compactFile.destroy(-1);
+- }
+- } catch (Throwable throwable) {
+- logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable);
+- }
+- }
+-
+- public void compact() {
+- if (!isFileSealed(originFile)) {
+- logger.error("[Bug]TieredIndexFile#CompactTask#compact: try to compact unsealed file");
+- originFile.destroy(-1);
+- compactFile.destroy(-1);
+- return;
+- }
+-
+- buildCompactFile();
+- fileQueue.append(compactFile.getMappedByteBuffer());
+- fileQueue.commit(true);
+- compactFile.destroy(-1);
+- originFile.destroy(-1);
+- compactFile = null;
+- originFile = null;
+- }
+-
+- private void buildCompactFile() {
+- MappedByteBuffer originMappedByteBuffer = originFile.getMappedByteBuffer();
+- MappedByteBuffer compactMappedByteBuffer = compactFile.getMappedByteBuffer();
+- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
+- compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION));
+- compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION));
+- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, maxHashSlotNum);
+- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, originMappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION));
+-
+- int rePutSlotValue = INDEX_FILE_HEADER_SIZE + (maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE);
+- for (int i = 0; i < maxHashSlotNum; i++) {
+- int slotOffset = INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE;
+- int slotValue = originMappedByteBuffer.getInt(slotOffset);
+- if (slotValue != -1) {
+- int indexTotalSize = 0;
+- int indexPosition = slotValue;
+-
+- while (indexPosition >= 0 && indexPosition < maxIndexNum) {
+- int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+- + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
+- int rePutIndexOffset = rePutSlotValue + indexTotalSize;
+-
+- compactMappedByteBuffer.putInt(rePutIndexOffset, originMappedByteBuffer.getInt(indexOffset));
+- compactMappedByteBuffer.putInt(rePutIndexOffset + 4, originMappedByteBuffer.getInt(indexOffset + 4));
+- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4));
+- compactMappedByteBuffer.putLong(rePutIndexOffset + 4 + 4 + 4, originMappedByteBuffer.getLong(indexOffset + 4 + 4 + 4));
+- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8));
+- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4));
+-
+- indexTotalSize += INDEX_FILE_HASH_COMPACT_INDEX_SIZE;
+- indexPosition = originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4);
+- }
+- compactMappedByteBuffer.putInt(slotOffset, rePutSlotValue);
+- compactMappedByteBuffer.putInt(slotOffset + 4, indexTotalSize);
+- rePutSlotValue += indexTotalSize;
+- }
+- }
+- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_END_MAGIC_CODE);
+- compactMappedByteBuffer.putInt(rePutSlotValue, INDEX_FILE_BEGIN_MAGIC_CODE);
+- compactMappedByteBuffer.limit(rePutSlotValue + 4);
+- }
+- }
+-}
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
+new file mode 100644
+index 000000000..d131b9b53
+--- /dev/null
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.tieredstore.index;
++
++import java.nio.ByteBuffer;
++
++public interface IndexFile extends IndexService {
++
++ /**
++ * Enumeration for the status of the index file.
++ */
++ enum IndexStatusEnum {
++ SHUTDOWN, UNSEALED, SEALED, UPLOAD
++ }
++
++ long getTimestamp();
++
++ IndexStatusEnum getFileStatus();
++
++ ByteBuffer doCompaction();
++}
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
+new file mode 100644
+index 000000000..24ccc4322
+--- /dev/null
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
+@@ -0,0 +1,114 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.tieredstore.index;
++
++import java.nio.ByteBuffer;
++
++public class IndexItem {
++
++ public static final int INDEX_ITEM_SIZE = 32;
++ public static final int COMPACT_INDEX_ITEM_SIZE = 28;
++
++ private final int hashCode;
++ private final int topicId;
++ private final int queueId;
++ private final long offset;
++ private final int size;
++ private final int timeDiff;
++ private final int itemIndex;
++
++ public IndexItem(int topicId, int queueId, long offset, int size, int hashCode, int timeDiff, int itemIndex) {
++ this.hashCode = hashCode;
++ this.topicId = topicId;
++ this.queueId = queueId;
++ this.offset = offset;
++ this.size = size;
++ this.timeDiff = timeDiff;
++ this.itemIndex = itemIndex;
++ }
++
++ public IndexItem(byte[] bytes) {
++ if (bytes == null ||
++ bytes.length != INDEX_ITEM_SIZE &&
++ bytes.length != COMPACT_INDEX_ITEM_SIZE) {
++ throw new IllegalArgumentException("Byte array length not correct");
++ }
++
++ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
++ hashCode = byteBuffer.getInt(0);
++ topicId = byteBuffer.getInt(4);
++ queueId = byteBuffer.getInt(8);
++ offset = byteBuffer.getLong(12);
++ size = byteBuffer.getInt(20);
++ timeDiff = byteBuffer.getInt(24);
++ itemIndex = bytes.length == INDEX_ITEM_SIZE ? byteBuffer.getInt(28) : 0;
++ }
++
++ public ByteBuffer getByteBuffer() {
++ ByteBuffer byteBuffer = ByteBuffer.allocate(32);
++ byteBuffer.putInt(0, hashCode);
++ byteBuffer.putInt(4, topicId);
++ byteBuffer.putInt(8, queueId);
++ byteBuffer.putLong(12, offset);
++ byteBuffer.putInt(20, size);
++ byteBuffer.putInt(24, timeDiff);
++ byteBuffer.putInt(28, itemIndex);
++ return byteBuffer;
++ }
++
++ public int getHashCode() {
++ return hashCode;
++ }
++
++ public int getTopicId() {
++ return topicId;
++ }
++
++ public int getQueueId() {
++ return queueId;
++ }
++
++ public long getOffset() {
++ return offset;
++ }
++
++ public int getSize() {
++ return size;
++ }
++
++ public int getTimeDiff() {
++ return timeDiff;
++ }
++
++ public int getItemIndex() {
++ return itemIndex;
++ }
++
++ @Override
++ public String toString() {
++ return "IndexItem{" +
++ "hashCode=" + hashCode +
++ ", topicId=" + topicId +
++ ", queueId=" + queueId +
++ ", offset=" + offset +
++ ", size=" + size +
++ ", timeDiff=" + timeDiff +
++ ", position=" + itemIndex +
++ '}';
++ }
++}
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
+new file mode 100644
+index 000000000..d4eb854a2
+--- /dev/null
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
+@@ -0,0 +1,62 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.tieredstore.index;
++
++import java.util.List;
++import java.util.Set;
++import java.util.concurrent.CompletableFuture;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++
++public interface IndexService {
++
++ /**
++ * Puts a key into the index.
++ *
++ * @param topic The topic of the key.
++ * @param topicId The ID of the topic.
++ * @param queueId The ID of the queue.
++ * @param keySet The set of keys to be indexed.
++ * @param offset The offset value of the key.
++ * @param size The size of the key.
++ * @param timestamp The timestamp of the key.
++ * @return The result of the put operation.
++ */
++ AppendResult putKey(
++ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp);
++
++ /**
++ * Asynchronously queries the index for a specific key within a given time range.
++ *
++ * @param topic The topic of the key.
++ * @param key The key to be queried.
++ * @param beginTime The start time of the query range.
++ * @param endTime The end time of the query range.
++ * @return A CompletableFuture that holds the list of IndexItems matching the query.
++ */
++ CompletableFuture> queryAsync(String topic, String key, int maxCount, long beginTime, long endTime);
++
++ /**
++ * Shutdown the index service.
++ */
++ void shutdown();
++
++ /**
++ * Destroys the index service and releases all resources.
++ */
++ void destroy();
++}
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+new file mode 100644
+index 000000000..52a686f68
+--- /dev/null
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+@@ -0,0 +1,499 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.tieredstore.index;
++
++import com.google.common.base.Stopwatch;
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.nio.MappedByteBuffer;
++import java.nio.file.Paths;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.Optional;
++import java.util.Set;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicLong;
++import java.util.concurrent.atomic.AtomicReference;
++import java.util.concurrent.locks.ReadWriteLock;
++import java.util.concurrent.locks.ReentrantReadWriteLock;
++import java.util.stream.Collectors;
++import org.apache.commons.lang3.StringUtils;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.store.logfile.DefaultMappedFile;
++import org.apache.rocketmq.store.logfile.MappedFile;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
++
++import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.SEALED;
++import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UNSEALED;
++import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UPLOAD;
++import static org.apache.rocketmq.tieredstore.index.IndexItem.COMPACT_INDEX_ITEM_SIZE;
++import static org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_COMPACTED_DIRECTORY_NAME;
++import static org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_DIRECTORY_NAME;
++
++/**
++ * a single IndexFile in indexService
++ */
++public class IndexStoreFile implements IndexFile {
++
++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
++
++ /**
++ * header format:
++ * magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4)
++ */
++ public static final int INDEX_MAGIC_CODE = 0;
++ public static final int INDEX_BEGIN_TIME_STAMP = 4;
++ public static final int INDEX_END_TIME_STAMP = 12;
++ public static final int INDEX_SLOT_COUNT = 20;
++ public static final int INDEX_ITEM_INDEX = 24;
++ public static final int INDEX_HEADER_SIZE = 28;
++
++ public static final int BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
++ public static final int END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
++
++ /**
++ * hash slot
++ */
++ private static final int INVALID_INDEX = 0;
++ private static final int HASH_SLOT_SIZE = Long.BYTES;
++ private static final int MAX_QUERY_COUNT = 512;
++
++ private final int hashSlotMaxCount;
++ private final int indexItemMaxCount;
++
++ private final ReadWriteLock fileReadWriteLock;
++ private final AtomicReference fileStatus;
++ private final AtomicLong beginTimestamp = new AtomicLong(-1L);
++ private final AtomicLong endTimestamp = new AtomicLong(-1L);
++ private final AtomicInteger hashSlotCount = new AtomicInteger(0);
++ private final AtomicInteger indexItemCount = new AtomicInteger(0);
++
++ private MappedFile mappedFile;
++ private ByteBuffer byteBuffer;
++ private MappedFile compactMappedFile;
++ private TieredFileSegment fileSegment;
++
++ public IndexStoreFile(TieredMessageStoreConfig storeConfig, long timestamp) throws IOException {
++ this.hashSlotMaxCount = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
++ this.indexItemMaxCount = storeConfig.getTieredStoreIndexFileMaxIndexNum();
++ this.fileStatus = new AtomicReference<>(UNSEALED);
++ this.fileReadWriteLock = new ReentrantReadWriteLock();
++ this.mappedFile = new DefaultMappedFile(
++ Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString(),
++ this.getItemPosition(indexItemMaxCount));
++ this.byteBuffer = this.mappedFile.getMappedByteBuffer();
++
++ this.beginTimestamp.set(timestamp);
++ this.endTimestamp.set(byteBuffer.getLong(INDEX_BEGIN_TIME_STAMP));
++ this.hashSlotCount.set(byteBuffer.getInt(INDEX_SLOT_COUNT));
++ this.indexItemCount.set(byteBuffer.getInt(INDEX_ITEM_INDEX));
++ this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1);
++ }
++
++ public IndexStoreFile(TieredMessageStoreConfig storeConfig, TieredFileSegment fileSegment) {
++ this.fileSegment = fileSegment;
++ this.fileStatus = new AtomicReference<>(UPLOAD);
++ this.fileReadWriteLock = new ReentrantReadWriteLock();
++
++ this.beginTimestamp.set(fileSegment.getMinTimestamp());
++ this.endTimestamp.set(fileSegment.getMaxTimestamp());
++ this.hashSlotCount.set(storeConfig.getTieredStoreIndexFileMaxHashSlotNum());
++ this.indexItemCount.set(storeConfig.getTieredStoreIndexFileMaxIndexNum());
++ this.hashSlotMaxCount = hashSlotCount.get();
++ this.indexItemMaxCount = indexItemCount.get();
++ }
++
++ @Override
++ public long getTimestamp() {
++ return this.beginTimestamp.get();
++ }
++
++ public long getEndTimestamp() {
++ return this.endTimestamp.get();
++ }
++
++ public long getHashSlotCount() {
++ return this.hashSlotCount.get();
++ }
++
++ public long getIndexItemCount() {
++ return this.indexItemCount.get();
++ }
++
++ @Override
++ public IndexStatusEnum getFileStatus() {
++ return this.fileStatus.get();
++ }
++
++ protected String buildKey(String topic, String key) {
++ return String.format("%s#%s", topic, key);
++ }
++
++ protected int hashCode(String keyStr) {
++ int keyHash = keyStr.hashCode();
++ return (keyHash < 0) ? -keyHash : keyHash;
++ }
++
++ protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) {
++ byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
++ byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
++ byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
++ byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
++ byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
++ }
++
++ protected int getSlotPosition(int slotIndex) {
++ return INDEX_HEADER_SIZE + slotIndex * HASH_SLOT_SIZE;
++ }
++
++ protected int getSlotValue(int slotPosition) {
++ return Math.max(this.byteBuffer.getInt(slotPosition), INVALID_INDEX);
++ }
++
++ protected int getItemPosition(int itemIndex) {
++ return INDEX_HEADER_SIZE + hashSlotMaxCount * HASH_SLOT_SIZE + itemIndex * IndexItem.INDEX_ITEM_SIZE;
++ }
++
++ @Override
++ public AppendResult putKey(
++ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp) {
++
++ if (StringUtils.isBlank(topic)) {
++ return AppendResult.UNKNOWN_ERROR;
++ }
++
++ if (keySet == null || keySet.isEmpty()) {
++ return AppendResult.SUCCESS;
++ }
++
++ try {
++ fileReadWriteLock.writeLock().lock();
++
++ if (!UNSEALED.equals(fileStatus.get())) {
++ return AppendResult.FILE_FULL;
++ }
++
++ if (this.indexItemCount.get() + keySet.size() >= this.indexItemMaxCount) {
++ this.fileStatus.set(IndexStatusEnum.SEALED);
++ return AppendResult.FILE_FULL;
++ }
++
++ for (String key : keySet) {
++ int hashCode = this.hashCode(this.buildKey(topic, key));
++ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
++ int slotOldValue = this.getSlotValue(slotPosition);
++ int timeDiff = (int) ((timestamp - this.beginTimestamp.get()) / 1000L);
++
++ IndexItem indexItem = new IndexItem(
++ topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue);
++ int itemIndex = this.indexItemCount.incrementAndGet();
++ this.byteBuffer.position(this.getItemPosition(itemIndex));
++ this.byteBuffer.put(indexItem.getByteBuffer());
++ this.byteBuffer.putInt(slotPosition, itemIndex);
++
++ if (slotOldValue <= INVALID_INDEX) {
++ this.hashSlotCount.incrementAndGet();
++ }
++ if (this.endTimestamp.get() < timestamp) {
++ this.endTimestamp.set(timestamp);
++ }
++ this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1);
++
++ log.trace("IndexStoreFile put key, timestamp: {}, topic: {}, key: {}, slot: {}, item: {}, previous item: {}, content: {}",
++ this.getTimestamp(), topic, key, hashCode % this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
++ }
++ return AppendResult.SUCCESS;
++ } catch (Exception e) {
++ log.error("IndexStoreFile put key error, topic: {}, topicId: {}, queueId: {}, keySet: {}, offset: {}, " +
++ "size: {}, timestamp: {}", topic, topicId, queueId, keySet, offset, size, timestamp, e);
++ } finally {
++ fileReadWriteLock.writeLock().unlock();
++ }
++
++ return AppendResult.UNKNOWN_ERROR;
++ }
++
++ @Override
++ public CompletableFuture> queryAsync(
++ String topic, String key, int maxCount, long beginTime, long endTime) {
++
++ switch (this.fileStatus.get()) {
++ case UNSEALED:
++ case SEALED:
++ return this.queryAsyncFromUnsealedFile(buildKey(topic, key), maxCount, beginTime, endTime);
++ case UPLOAD:
++ return this.queryAsyncFromSegmentFile(buildKey(topic, key), maxCount, beginTime, endTime);
++ case SHUTDOWN:
++ default:
++ return CompletableFuture.completedFuture(new ArrayList<>());
++ }
++ }
++
++ protected CompletableFuture> queryAsyncFromUnsealedFile(
++ String key, int maxCount, long beginTime, long endTime) {
++
++ return CompletableFuture.supplyAsync(() -> {
++ List result = new ArrayList<>();
++ try {
++ fileReadWriteLock.readLock().lock();
++ if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
++ return result;
++ }
++
++ if (mappedFile == null || !mappedFile.hold()) {
++ return result;
++ }
++
++ int hashCode = this.hashCode(key);
++ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
++ int slotValue = this.getSlotValue(slotPosition);
++
++ int left = MAX_QUERY_COUNT;
++ while (left > 0 &&
++ slotValue > INVALID_INDEX &&
++ slotValue <= this.indexItemCount.get()) {
++
++ byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
++ ByteBuffer buffer = this.byteBuffer.duplicate();
++ buffer.position(this.getItemPosition(slotValue));
++ buffer.get(bytes);
++ IndexItem indexItem = new IndexItem(bytes);
++ if (hashCode == indexItem.getHashCode()) {
++ result.add(indexItem);
++ if (result.size() > maxCount) {
++ break;
++ }
++ }
++ slotValue = indexItem.getItemIndex();
++ left--;
++ }
++
++ log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
++ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
++ getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
++ } catch (Exception e) {
++ log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
++ "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
++ } finally {
++ fileReadWriteLock.readLock().unlock();
++ mappedFile.release();
++ }
++ return result;
++ }, TieredStoreExecutor.fetchDataExecutor);
++ }
++
++ protected CompletableFuture> queryAsyncFromSegmentFile(
++ String key, int maxCount, long beginTime, long endTime) {
++
++ if (this.fileSegment == null || !UPLOAD.equals(this.fileStatus.get())) {
++ return CompletableFuture.completedFuture(Collections.emptyList());
++ }
++
++ Stopwatch stopwatch = Stopwatch.createStarted();
++ int hashCode = this.hashCode(key);
++ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
++
++ CompletableFuture> future = this.fileSegment.readAsync(slotPosition, HASH_SLOT_SIZE)
++ .thenCompose(slotBuffer -> {
++ if (slotBuffer.remaining() < HASH_SLOT_SIZE) {
++ log.error("IndexStoreFile query from tiered storage return error slot buffer, " +
++ "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime);
++ return CompletableFuture.completedFuture(null);
++ }
++ int indexPosition = slotBuffer.getInt();
++ int indexTotalSize = Math.min(slotBuffer.getInt(), COMPACT_INDEX_ITEM_SIZE * 1024);
++ if (indexPosition <= INVALID_INDEX || indexTotalSize <= 0) {
++ return CompletableFuture.completedFuture(null);
++ }
++ return this.fileSegment.readAsync(indexPosition, indexTotalSize);
++ })
++ .thenApply(itemBuffer -> {
++ List result = new ArrayList<>();
++ if (itemBuffer == null) {
++ return result;
++ }
++
++ if (itemBuffer.remaining() % COMPACT_INDEX_ITEM_SIZE != 0) {
++ log.error("IndexStoreFile query from tiered storage return error item buffer, " +
++ "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime);
++ return result;
++ }
++
++ int size = itemBuffer.remaining() / COMPACT_INDEX_ITEM_SIZE;
++ byte[] bytes = new byte[COMPACT_INDEX_ITEM_SIZE];
++ for (int i = 0; i < size; i++) {
++ itemBuffer.get(bytes);
++ IndexItem indexItem = new IndexItem(bytes);
++ long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
++ if (hashCode == indexItem.getHashCode() &&
++ beginTime <= storeTimestamp && storeTimestamp <= endTime &&
++ result.size() < maxCount) {
++ result.add(indexItem);
++ }
++ }
++ return result;
++ });
++
++ return future.whenComplete((result, throwable) -> {
++ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
++ if (throwable != null) {
++ log.error("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, " +
++ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
++ costTime, getTimestamp(), key, hashCode, maxCount, beginTime, endTime, throwable);
++ } else {
++ String details = Optional.ofNullable(result)
++ .map(r -> r.stream()
++ .map(item -> String.format("%d-%d", item.getQueueId(), item.getOffset()))
++ .collect(Collectors.joining(", ")))
++ .orElse("");
++
++ log.debug("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, result size: {}, ({}), " +
++ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
++ costTime, getTimestamp(), result != null ? result.size() : 0, details, key, hashCode, maxCount, beginTime, endTime);
++ }
++ });
++ }
++
++ @Override
++ public ByteBuffer doCompaction() {
++ Stopwatch stopwatch = Stopwatch.createStarted();
++ ByteBuffer buffer;
++ try {
++ buffer = compactToNewFile();
++ log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms",
++ this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS));
++ } catch (Exception e) {
++ log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms",
++ this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e);
++ return null;
++ }
++
++ try {
++ // Make sure there is no read request here
++ fileReadWriteLock.writeLock().lock();
++ fileStatus.set(IndexStatusEnum.SEALED);
++ } catch (Exception e) {
++ log.error("IndexStoreFile change file status to sealed error, timestamp={}", this.getTimestamp());
++ } finally {
++ fileReadWriteLock.writeLock().unlock();
++ }
++ return buffer;
++ }
++
++ protected String getCompactedFilePath() {
++ return Paths.get(this.mappedFile.getFileName()).getParent()
++ .resolve(FILE_COMPACTED_DIRECTORY_NAME)
++ .resolve(String.valueOf(this.getTimestamp())).toString();
++ }
++
++ protected ByteBuffer compactToNewFile() throws IOException {
++
++ byte[] payload = new byte[IndexItem.INDEX_ITEM_SIZE];
++ ByteBuffer payloadBuffer = ByteBuffer.wrap(payload);
++ int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount * HASH_SLOT_SIZE);
++ int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE * indexItemCount.get();
++
++ compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), fileMaxLength);
++ MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer();
++
++ for (int i = 0; i < hashSlotMaxCount; i++) {
++ int slotPosition = this.getSlotPosition(i);
++ int slotValue = this.getSlotValue(slotPosition);
++ int writeBeginPosition = writePosition;
++
++ while (slotValue > INVALID_INDEX && writePosition < fileMaxLength) {
++ ByteBuffer buffer = this.byteBuffer.duplicate();
++ buffer.position(this.getItemPosition(slotValue));
++ buffer.get(payload);
++ int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
++ buffer.limit(COMPACT_INDEX_ITEM_SIZE);
++ newBuffer.position(writePosition);
++ newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
++ log.trace("IndexStoreFile do compaction, write item, slot: {}, current: {}, next: {}", i, slotValue, newSlotValue);
++ slotValue = newSlotValue;
++ writePosition += COMPACT_INDEX_ITEM_SIZE;
++ }
++
++ int length = writePosition - writeBeginPosition;
++ newBuffer.putInt(slotPosition, writeBeginPosition);
++ newBuffer.putInt(slotPosition + Integer.BYTES, length);
++
++ if (length > 0) {
++ log.trace("IndexStoreFile do compaction, write slot, slot: {}, begin: {}, length: {}", i, writeBeginPosition, length);
++ }
++ }
++
++ this.flushNewMetadata(newBuffer, true);
++ newBuffer.flip();
++ return newBuffer;
++ }
++
++ @Override
++ public void shutdown() {
++ try {
++ fileReadWriteLock.writeLock().lock();
++ this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
++ if (this.mappedFile != null) {
++ this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
++ this.mappedFile = null;
++ }
++ if (this.compactMappedFile != null) {
++ this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
++ this.compactMappedFile = null;
++ }
++ } catch (Exception e) {
++ log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
++ } finally {
++ fileReadWriteLock.writeLock().unlock();
++ }
++ }
++
++ @Override
++ public void destroy() {
++ try {
++ fileReadWriteLock.writeLock().lock();
++ this.shutdown();
++ switch (this.fileStatus.get()) {
++ case SHUTDOWN:
++ case UNSEALED:
++ case SEALED:
++ if (this.mappedFile != null) {
++ this.mappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
++ }
++ if (this.compactMappedFile != null) {
++ this.compactMappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
++ }
++ log.info("IndexStoreService destroy local file, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get());
++ break;
++ case UPLOAD:
++ log.warn("[BUG] IndexStoreService destroy remote file, timestamp: {}", this.getTimestamp());
++ }
++ } catch (Exception e) {
++ log.error("IndexStoreService destroy failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
++ } finally {
++ fileReadWriteLock.writeLock().unlock();
++ }
++ }
++}
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+new file mode 100644
+index 000000000..14608aa58
+--- /dev/null
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+@@ -0,0 +1,362 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.tieredstore.index;
++
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Stopwatch;
++import java.io.File;
++import java.nio.ByteBuffer;
++import java.nio.file.Paths;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ConcurrentNavigableMap;
++import java.util.concurrent.ConcurrentSkipListMap;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicLong;
++import java.util.concurrent.locks.ReadWriteLock;
++import java.util.concurrent.locks.ReentrantReadWriteLock;
++import org.apache.commons.lang3.StringUtils;
++import org.apache.rocketmq.common.ServiceThread;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.store.logfile.DefaultMappedFile;
++import org.apache.rocketmq.store.logfile.MappedFile;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
++import org.apache.rocketmq.tieredstore.file.TieredFlatFile;
++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
++
++public class IndexStoreService extends ServiceThread implements IndexService {
++
++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
++
++ public static final String FILE_DIRECTORY_NAME = "tiered_index_file";
++ public static final String FILE_COMPACTED_DIRECTORY_NAME = "compacting";
++
++ /**
++ * File status in table example:
++ * upload, upload, upload, sealed, sealed, unsealed
++ */
++ private final TieredMessageStoreConfig storeConfig;
++ private final ConcurrentSkipListMap timeStoreTable;
++ private final ReadWriteLock readWriteLock;
++ private final AtomicLong compactTimestamp;
++ private final String filePath;
++ private final TieredFileAllocator fileAllocator;
++
++ private IndexFile currentWriteFile;
++ private TieredFlatFile flatFile;
++
++ public IndexStoreService(TieredFileAllocator fileAllocator, String filePath) {
++ this.storeConfig = fileAllocator.getStoreConfig();
++ this.filePath = filePath;
++ this.fileAllocator = fileAllocator;
++ this.timeStoreTable = new ConcurrentSkipListMap<>();
++ this.compactTimestamp = new AtomicLong(0L);
++ this.readWriteLock = new ReentrantReadWriteLock();
++ this.recover();
++ }
++
++ private void doConvertOldFormatFile(String filePath) {
++ try {
++ File file = new File(filePath);
++ if (!file.exists()) {
++ return;
++ }
++ MappedFile mappedFile = new DefaultMappedFile(file.getPath(), (int) file.length());
++ long timestamp = mappedFile.getMappedByteBuffer().getLong(IndexStoreFile.INDEX_BEGIN_TIME_STAMP);
++ if (timestamp <= 0) {
++ mappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
++ } else {
++ mappedFile.renameTo(String.valueOf(new File(file.getParent(), String.valueOf(timestamp))));
++ mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
++ }
++ } catch (Exception e) {
++ log.error("IndexStoreService do convert old format error, file: {}", filePath, e);
++ }
++ }
++
++ private void recover() {
++ Stopwatch stopwatch = Stopwatch.createStarted();
++
++ // recover local
++ File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString());
++ this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString());
++ this.doConvertOldFormatFile(Paths.get(dir.getPath(), "1111").toString());
++ File[] files = dir.listFiles();
++
++ if (files != null) {
++ List fileList = Arrays.asList(files);
++ fileList.sort(Comparator.comparing(File::getName));
++
++ for (File file : fileList) {
++ if (file.isDirectory() || !StringUtils.isNumeric(file.getName())) {
++ continue;
++ }
++
++ try {
++ IndexFile indexFile = new IndexStoreFile(storeConfig, Long.parseLong(file.getName()));
++ timeStoreTable.put(indexFile.getTimestamp(), indexFile);
++ log.info("IndexStoreService recover load local file, timestamp: {}", indexFile.getTimestamp());
++ } catch (Exception e) {
++ log.error("IndexStoreService recover, load local file error", e);
++ }
++ }
++ }
++
++ if (this.timeStoreTable.isEmpty()) {
++ this.createNewIndexFile(System.currentTimeMillis());
++ }
++
++ this.currentWriteFile = this.timeStoreTable.lastEntry().getValue();
++ this.setCompactTimestamp(this.timeStoreTable.firstKey() - 1);
++
++ // recover remote
++ this.flatFile = fileAllocator.createFlatFileForIndexFile(filePath);
++ if (this.flatFile.getBaseOffset() == -1) {
++ this.flatFile.setBaseOffset(0);
++ }
++
++ for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
++ IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
++ timeStoreTable.put(indexFile.getTimestamp(), indexFile);
++ log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp());
++ }
++
++ log.info("IndexStoreService recover finished, entrySize: {}, cost: {}ms, directory: {}",
++ timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS), dir.getAbsolutePath());
++ }
++
++ public void createNewIndexFile(long timestamp) {
++ try {
++ this.readWriteLock.writeLock().lock();
++ IndexFile indexFile = this.currentWriteFile;
++ if (this.timeStoreTable.containsKey(timestamp) ||
++ indexFile != null && IndexFile.IndexStatusEnum.UNSEALED.equals(indexFile.getFileStatus())) {
++ return;
++ }
++ IndexStoreFile newStoreFile = new IndexStoreFile(storeConfig, timestamp);
++ this.timeStoreTable.put(timestamp, newStoreFile);
++ this.currentWriteFile = newStoreFile;
++ log.info("IndexStoreService construct next file, timestamp: {}", timestamp);
++ } catch (Exception e) {
++ log.error("IndexStoreService construct next file, timestamp: {}", timestamp, e);
++ } finally {
++ this.readWriteLock.writeLock().unlock();
++ }
++ }
++
++ @VisibleForTesting
++ public ConcurrentSkipListMap getTimeStoreTable() {
++ return timeStoreTable;
++ }
++
++ @Override
++ public AppendResult putKey(
++ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp) {
++
++ if (StringUtils.isBlank(topic)) {
++ return AppendResult.UNKNOWN_ERROR;
++ }
++
++ if (keySet == null || keySet.isEmpty()) {
++ return AppendResult.SUCCESS;
++ }
++
++ for (int i = 0; i < 3; i++) {
++ AppendResult result = this.currentWriteFile.putKey(
++ topic, topicId, queueId, keySet, offset, size, timestamp);
++
++ if (AppendResult.SUCCESS.equals(result)) {
++ return AppendResult.SUCCESS;
++ } else if (AppendResult.FILE_FULL.equals(result)) {
++ this.createNewIndexFile(timestamp);
++ }
++ }
++
++ log.error("IndexStoreService put key three times return error, topic: {}, topicId: {}, " +
++ "queueId: {}, keySize: {}, timestamp: {}", topic, topicId, queueId, keySet.size(), timestamp);
++ return AppendResult.UNKNOWN_ERROR;
++ }
++
++ @Override
++ public CompletableFuture> queryAsync(
++ String topic, String key, int maxCount, long beginTime, long endTime) {
++
++ CompletableFuture> future = new CompletableFuture<>();
++ try {
++ readWriteLock.readLock().lock();
++ ConcurrentNavigableMap pendingMap =
++ this.timeStoreTable.subMap(beginTime, true, endTime, true);
++ List> futureList = new ArrayList<>(pendingMap.size());
++ ConcurrentHashMap result = new ConcurrentHashMap<>();
++
++ for (Map.Entry entry : pendingMap.descendingMap().entrySet()) {
++ CompletableFuture completableFuture = entry.getValue()
++ .queryAsync(topic, key, maxCount, beginTime, endTime)
++ .thenAccept(itemList -> itemList.forEach(indexItem -> {
++ if (result.size() < maxCount) {
++ result.put(String.format(
++ "%d-%d", indexItem.getQueueId(), indexItem.getOffset()), indexItem);
++ }
++ }));
++ futureList.add(completableFuture);
++ }
++
++ CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
++ .whenComplete((v, t) -> {
++ // Try to return the query results as much as possible here
++ // rather than directly throwing exceptions
++ if (result.isEmpty() && t != null) {
++ future.completeExceptionally(t);
++ } else {
++ List resultList = new ArrayList<>(result.values());
++ future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount)));
++ }
++ });
++ } catch (Exception e) {
++ future.completeExceptionally(e);
++ } finally {
++ readWriteLock.readLock().unlock();
++ }
++ return future;
++ }
++
++ public void doCompactThenUploadFile(IndexFile indexFile) {
++ if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
++ log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
++ indexFile.getTimestamp(), indexFile.getFileStatus());
++ return;
++ }
++
++ Stopwatch stopwatch = Stopwatch.createStarted();
++ ByteBuffer byteBuffer = indexFile.doCompaction();
++ if (byteBuffer == null) {
++ log.error("IndexStoreService found compaction buffer is null, timestamp: {}", indexFile.getTimestamp());
++ return;
++ }
++ flatFile.append(byteBuffer);
++ flatFile.commit(true);
++
++ TieredFileSegment fileSegment = flatFile.getFileByIndex(flatFile.getFileSegmentCount() - 1);
++ if (fileSegment == null || fileSegment.getMinTimestamp() != indexFile.getTimestamp()) {
++ log.warn("IndexStoreService submit compacted file to server failed, timestamp: {}", indexFile.getTimestamp());
++ return;
++ }
++
++ try {
++ readWriteLock.writeLock().lock();
++ IndexFile storeFile = new IndexStoreFile(storeConfig, fileSegment);
++ timeStoreTable.put(indexFile.getTimestamp(), storeFile);
++ indexFile.destroy();
++ } catch (Exception e) {
++ log.error("IndexStoreService switch file failed, timestamp: {}, cost: {}ms",
++ indexFile.getTimestamp(), stopwatch.elapsed(TimeUnit.MILLISECONDS), e);
++ } finally {
++ readWriteLock.writeLock().unlock();
++ }
++ }
++
++ public void destroyExpiredFile(long expireTimestamp) {
++ flatFile.cleanExpiredFile(expireTimestamp);
++ flatFile.destroyExpiredFile();
++ }
++
++ public void destroy() {
++ try {
++ readWriteLock.writeLock().lock();
++
++ // delete local store file
++ for (Map.Entry entry : timeStoreTable.entrySet()) {
++ IndexFile indexFile = entry.getValue();
++ if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
++ continue;
++ }
++ indexFile.destroy();
++ }
++
++ // delete remote
++ if (flatFile != null) {
++ flatFile.destroy();
++ }
++ } catch (Exception e) {
++ log.error("IndexStoreService destroy all file error", e);
++ } finally {
++ readWriteLock.writeLock().unlock();
++ }
++ }
++
++ @Override
++ public String getServiceName() {
++ return IndexStoreService.class.getSimpleName();
++ }
++
++ public void setCompactTimestamp(long timestamp) {
++ this.compactTimestamp.set(timestamp);
++ log.info("IndexStoreService compact timestamp has been set to: {}", timestamp);
++ }
++
++ protected IndexFile getNextSealedFile() {
++ try {
++ Map.Entry entry =
++ this.timeStoreTable.higherEntry(this.compactTimestamp.get());
++ if (entry != null && entry.getKey() < this.timeStoreTable.lastKey()) {
++ return entry.getValue();
++ }
++ } catch (Throwable e) {
++ log.error("Error occurred in " + getServiceName(), e);
++ }
++ return null;
++ }
++
++ @Override
++ public void run() {
++ log.info(this.getServiceName() + " service started");
++ while (!this.isStopped()) {
++ long expireTimestamp = System.currentTimeMillis()
++ - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
++ this.destroyExpiredFile(expireTimestamp);
++
++ IndexFile indexFile = this.getNextSealedFile();
++ if (indexFile == null) {
++ this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
++ continue;
++ }
++ this.doCompactThenUploadFile(indexFile);
++ this.setCompactTimestamp(indexFile.getTimestamp());
++ }
++ log.info(this.getServiceName() + " service shutdown");
++ }
++
++ @Override
++ public void shutdown() {
++ super.shutdown();
++ for (Map.Entry entry : timeStoreTable.entrySet()) {
++ entry.getValue().shutdown();
++ }
++ this.timeStoreTable.clear();
++ log.info("IndexStoreService shutdown gracefully");
++ }
++}
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+index 32911a6e8..aad42de98 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+@@ -31,12 +31,14 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
+ import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
+ import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
+ import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
+-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
+ import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+ import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
+ import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
++import static org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_BEGIN_TIME_STAMP;
++import static org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_END_TIME_STAMP;
++
+ public abstract class TieredFileSegment implements Comparable, TieredStoreProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+@@ -198,8 +200,9 @@ public abstract class TieredFileSegment implements Comparable
+ }
+
+ if (fileType == FileSegmentType.INDEX) {
+- minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
+- maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
++ minTimestamp = byteBuf.getLong(INDEX_BEGIN_TIME_STAMP);
++ maxTimestamp = byteBuf.getLong(INDEX_END_TIME_STAMP);
++
+ appendPosition += byteBuf.remaining();
+ // IndexFile is large and not change after compaction, no need deep copy
+ bufferList.add(byteBuf);
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+index 0db3eaf8f..b9938b7a8 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+@@ -59,7 +59,7 @@ public interface TieredStoreProvider {
+ * Get data from backend file system
+ *
+ * @param position the index from where the file will be read
+- * @param length the data size will be read
++ * @param length the data size will be read
+ * @return data to be read
+ */
+ CompletableFuture read0(long position, int length);
+@@ -68,10 +68,10 @@ public interface TieredStoreProvider {
+ * Put data to backend file system
+ *
+ * @param inputStream data stream
+- * @param position backend file position to put, used in append mode
+- * @param length data size in stream
+- * @param append try to append or create a new file
++ * @param position backend file position to put, used in append mode
++ * @param length data size in stream
++ * @param append try to append or create a new file
+ * @return put result, true
if data successfully write; false
otherwise
+ */
+- CompletableFuture commit0(FileSegmentInputStream inputStream,long position, int length, boolean append);
++ CompletableFuture commit0(FileSegmentInputStream inputStream, long position, int length, boolean append);
+ }
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+index 7e949cb28..ee56b1e68 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+@@ -159,6 +159,7 @@ public class PosixFileSegment extends TieredFileSegment {
+ readFileChannel.position(position);
+ readFileChannel.read(byteBuffer);
+ byteBuffer.flip();
++ byteBuffer.limit(length);
+
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+index 774c6cf64..4e0d7e697 100644
+--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+@@ -37,7 +37,6 @@ import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+ import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
+ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
+ import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
+-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
+ import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+ import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+@@ -83,6 +82,7 @@ public class TieredMessageFetcherTest {
+ Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, getMessageResult.getStatus());
+
+ CompositeFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq);
++ Assert.assertNotNull(flatFile);
+ flatFile.initOffset(0);
+
+ getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
+@@ -197,6 +197,7 @@ public class TieredMessageFetcherTest {
+ public void testGetMessageStoreTimeStampAsync() {
+ TieredMessageFetcher fetcher = new TieredMessageFetcher(storeConfig);
+ CompositeFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq);
++ Assert.assertNotNull(flatFile);
+ flatFile.initOffset(0);
+
+ ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
+@@ -270,6 +271,7 @@ public class TieredMessageFetcherTest {
+ CompositeQueueFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq);
+ Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
+
++ Assert.assertNotNull(flatFile);
+ flatFile.initOffset(0);
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+ buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
+@@ -281,20 +283,19 @@ public class TieredMessageFetcherTest {
+ buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2);
+ flatFile.appendCommitLog(buffer);
+
+- DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, null);
++ long timestamp = System.currentTimeMillis();
++ DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, timestamp, 0, "", "key", 0, 0, null);
+ flatFile.appendIndexFile(request);
+- request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, null);
++ request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 1, 0, "", "key", 0, 0, null);
+ flatFile.appendIndexFile(request);
+- request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "another-key", 0, 0, null);
++ request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 2, 0, "", "another-key", 0, 0, null);
+ flatFile.appendIndexFile(request);
+ flatFile.commit(true);
+- TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
+- indexFile.commit(true);
+ Assert.assertEquals(1, fetcher.queryMessageAsync(mq.getTopic(), "key", 1, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
+
+ QueryMessageResult result = fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join();
+ Assert.assertEquals(2, result.getMessageMapedList().size());
+- Assert.assertEquals(1, result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
+- Assert.assertEquals(0, result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
++ Assert.assertEquals(0, result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
++ Assert.assertEquals(1, result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
+ }
+ }
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+deleted file mode 100644
+index 2da72bc7a..000000000
+--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
++++ /dev/null
+@@ -1,93 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.rocketmq.tieredstore.file;
+-
+-import com.sun.jna.Platform;
+-import java.io.IOException;
+-import java.nio.ByteBuffer;
+-import java.time.Duration;
+-import java.util.List;
+-import org.apache.commons.lang3.tuple.Pair;
+-import org.apache.rocketmq.common.message.MessageQueue;
+-import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
+-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+-import org.awaitility.Awaitility;
+-import org.junit.After;
+-import org.junit.Assert;
+-import org.junit.Before;
+-import org.junit.Test;
+-
+-public class TieredIndexFileTest {
+-
+- private final String storePath = TieredStoreTestUtil.getRandomStorePath();
+- private MessageQueue mq;
+- private TieredMessageStoreConfig storeConfig;
+-
+- @Before
+- public void setUp() {
+- storeConfig = new TieredMessageStoreConfig();
+- storeConfig.setBrokerName("IndexFileBroker");
+- storeConfig.setStorePathRootDir(storePath);
+- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+- storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+- storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+- mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1);
+- TieredStoreUtil.getMetadataStore(storeConfig);
+- TieredStoreExecutor.init();
+- }
+-
+- @After
+- public void tearDown() throws IOException {
+- TieredStoreTestUtil.destroyMetadataStore();
+- TieredStoreTestUtil.destroyTempDir(storePath);
+- TieredStoreExecutor.shutdown();
+- }
+-
+- @Test
+- public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException {
+- if (Platform.isWindows()) {
+- return;
+- }
+-
+- TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig);
+- TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath);
+-
+- indexFile.append(mq, 0, "key3", 3, 300, 1000);
+- indexFile.append(mq, 0, "key2", 2, 200, 1100);
+- indexFile.append(mq, 0, "key1", 1, 100, 1200);
+-
+- // do not do schedule task here
+- TieredStoreExecutor.shutdown();
+- List> indexList =
+- indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
+- Assert.assertEquals(0, indexList.size());
+-
+- // do compaction once
+- TieredStoreExecutor.init();
+- storeConfig.setTieredStoreIndexFileRollingIdleInterval(0);
+- indexFile.doScheduleTask();
+- Awaitility.await().atMost(Duration.ofSeconds(10))
+- .until(() -> !indexFile.getPreMappedFile().getFile().exists());
+-
+- indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
+- Assert.assertEquals(1, indexList.size());
+-
+- indexFile.destroy();
+- }
+-}
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
+new file mode 100644
+index 000000000..22ed4cc18
+--- /dev/null
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
+@@ -0,0 +1,91 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.tieredstore.index;
++
++import java.nio.ByteBuffer;
++import org.junit.Assert;
++import org.junit.Test;
++
++public class IndexItemTest {
++
++ private final int topicId = 1;
++ private final int queueId = 2;
++ private final long offset = 3L;
++ private final int size = 4;
++ private final int hashCode = 5;
++ private final int timeDiff = 6;
++ private final int itemIndex = 7;
++
++ @Test
++ public void indexItemConstructorTest() {
++ IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex);
++
++ Assert.assertEquals(topicId, indexItem.getTopicId());
++ Assert.assertEquals(queueId, indexItem.getQueueId());
++ Assert.assertEquals(offset, indexItem.getOffset());
++ Assert.assertEquals(size, indexItem.getSize());
++ Assert.assertEquals(hashCode, indexItem.getHashCode());
++ Assert.assertEquals(timeDiff, indexItem.getTimeDiff());
++ Assert.assertEquals(itemIndex, indexItem.getItemIndex());
++ }
++
++ @Test
++ public void byteBufferConstructorTest() {
++ ByteBuffer byteBuffer = ByteBuffer.allocate(IndexItem.INDEX_ITEM_SIZE);
++ byteBuffer.putInt(hashCode);
++ byteBuffer.putInt(topicId);
++ byteBuffer.putInt(queueId);
++ byteBuffer.putLong(offset);
++ byteBuffer.putInt(size);
++ byteBuffer.putInt(timeDiff);
++ byteBuffer.putInt(itemIndex);
++
++ byte[] bytes = byteBuffer.array();
++ IndexItem indexItem = new IndexItem(bytes);
++
++ Assert.assertEquals(topicId, indexItem.getTopicId());
++ Assert.assertEquals(queueId, indexItem.getQueueId());
++ Assert.assertEquals(offset, indexItem.getOffset());
++ Assert.assertEquals(size, indexItem.getSize());
++ Assert.assertEquals(hashCode, indexItem.getHashCode());
++ Assert.assertEquals(timeDiff, indexItem.getTimeDiff());
++ Assert.assertEquals(itemIndex, indexItem.getItemIndex());
++ Assert.assertNotNull(indexItem.toString());
++
++ Exception exception = null;
++ try {
++ new IndexItem(null);
++ } catch (Exception e) {
++ exception = e;
++ }
++ Assert.assertNotNull(exception);
++ }
++
++ @Test
++ public void getByteBufferTest() {
++ IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex);
++ ByteBuffer byteBuffer = indexItem.getByteBuffer();
++ Assert.assertEquals(hashCode, byteBuffer.getInt(0));
++ Assert.assertEquals(topicId, byteBuffer.getInt(4));
++ Assert.assertEquals(queueId, byteBuffer.getInt(8));
++ Assert.assertEquals(offset, byteBuffer.getLong(12));
++ Assert.assertEquals(size, byteBuffer.getInt(20));
++ Assert.assertEquals(timeDiff, byteBuffer.getInt(24));
++ Assert.assertEquals(itemIndex, byteBuffer.getInt(28));
++ }
++}
+\ No newline at end of file
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+new file mode 100644
+index 000000000..b408a7c3c
+--- /dev/null
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+@@ -0,0 +1,282 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.tieredstore.index;
++
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.nio.file.Paths;
++import java.util.Collections;
++import java.util.List;
++import java.util.Set;
++import java.util.UUID;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import org.apache.rocketmq.common.ThreadFactoryImpl;
++import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.common.FileSegmentType;
++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
++import org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++
++public class IndexStoreFileTest {
++
++ private static final String TOPIC_NAME = "TopicTest";
++ private static final int TOPIC_ID = 123;
++ private static final int QUEUE_ID = 2;
++ private static final long MESSAGE_OFFSET = 666L;
++ private static final int MESSAGE_SIZE = 1024;
++ private static final String KEY = "MessageKey";
++ private static final Set KEY_SET = Collections.singleton(KEY);
++
++ private String filePath;
++ private TieredMessageStoreConfig storeConfig;
++ private IndexStoreFile indexStoreFile;
++
++ @Before
++ public void init() throws IOException {
++ TieredStoreExecutor.init();
++ filePath = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
++ String directory = Paths.get(System.getProperty("user.home"), "store_test", filePath).toString();
++ storeConfig = new TieredMessageStoreConfig();
++ storeConfig.setStorePathRootDir(directory);
++ storeConfig.setTieredStoreFilePath(directory);
++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
++ storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
++ indexStoreFile = new IndexStoreFile(storeConfig, System.currentTimeMillis());
++ }
++
++ @After
++ public void shutdown() {
++ if (this.indexStoreFile != null) {
++ this.indexStoreFile.shutdown();
++ this.indexStoreFile.destroy();
++ }
++ TieredStoreTestUtil.destroyMetadataStore();
++ TieredStoreTestUtil.destroyTempDir(storeConfig.getStorePathRootDir());
++ TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath());
++ TieredStoreExecutor.shutdown();
++ }
++
++ @Test
++ public void testIndexHeaderConstants() {
++ Assert.assertEquals(0, IndexStoreFile.INDEX_MAGIC_CODE);
++ Assert.assertEquals(4, IndexStoreFile.INDEX_BEGIN_TIME_STAMP);
++ Assert.assertEquals(12, IndexStoreFile.INDEX_END_TIME_STAMP);
++ Assert.assertEquals(20, IndexStoreFile.INDEX_SLOT_COUNT);
++ Assert.assertEquals(24, IndexStoreFile.INDEX_ITEM_INDEX);
++ Assert.assertEquals(28, IndexStoreFile.INDEX_HEADER_SIZE);
++ Assert.assertEquals(0xCCDDEEFF ^ 1880681586 + 4, IndexStoreFile.BEGIN_MAGIC_CODE);
++ Assert.assertEquals(0xCCDDEEFF ^ 1880681586 + 8, IndexStoreFile.END_MAGIC_CODE);
++ }
++
++ @Test
++ public void basicMethodTest() throws IOException {
++ long timestamp = System.currentTimeMillis();
++ IndexStoreFile localFile = new IndexStoreFile(storeConfig, timestamp);
++ Assert.assertEquals(timestamp, localFile.getTimestamp());
++
++ // test file status
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, localFile.getFileStatus());
++ localFile.doCompaction();
++ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, localFile.getFileStatus());
++
++ // test hash
++ Assert.assertEquals("TopicTest#MessageKey", localFile.buildKey(TOPIC_NAME, KEY));
++ Assert.assertEquals(638347386, indexStoreFile.hashCode(localFile.buildKey(TOPIC_NAME, KEY)));
++
++ // test calculate position
++ long headerSize = IndexStoreFile.INDEX_HEADER_SIZE;
++ Assert.assertEquals(headerSize + Long.BYTES * 2, indexStoreFile.getSlotPosition(2));
++ Assert.assertEquals(headerSize + Long.BYTES * 5, indexStoreFile.getSlotPosition(5));
++ Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 2,
++ indexStoreFile.getItemPosition(2));
++ Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 5,
++ indexStoreFile.getItemPosition(5));
++ }
++
++ @Test
++ public void basicPutGetTest() {
++ long timestamp = indexStoreFile.getTimestamp();
++
++ // check metadata
++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
++ Assert.assertEquals(0, indexStoreFile.getEndTimestamp());
++ Assert.assertEquals(0, indexStoreFile.getIndexItemCount());
++ Assert.assertEquals(0, indexStoreFile.getHashSlotCount());
++
++ // not put success
++ Assert.assertEquals(AppendResult.UNKNOWN_ERROR, indexStoreFile.putKey(
++ null, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, null, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.emptySet(), MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++
++ // first item is invalid
++ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum() - 2; i++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
++ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
++ Assert.assertEquals(1, indexStoreFile.getHashSlotCount());
++ Assert.assertEquals(i + 1, indexStoreFile.getIndexItemCount());
++ }
++
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++
++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
++ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
++ Assert.assertEquals(1, indexStoreFile.getHashSlotCount());
++ Assert.assertEquals(storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1, indexStoreFile.getIndexItemCount());
++ }
++
++ @Test
++ public void differentKeyPutTest() {
++ long timestamp = indexStoreFile.getTimestamp();
++ for (int i = 0; i < 5; i++) {
++ for (int j = 0; j < 3; j++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME + i, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ }
++ }
++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
++ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
++ Assert.assertEquals(5, indexStoreFile.getHashSlotCount());
++ Assert.assertEquals(5 * 3, indexStoreFile.getIndexItemCount());
++ }
++
++ @Test
++ public void concurrentPutTest() throws InterruptedException {
++ long timestamp = indexStoreFile.getTimestamp();
++
++ ExecutorService executorService = Executors.newFixedThreadPool(
++ 4, new ThreadFactoryImpl("ConcurrentPutGetTest"));
++
++ // first item is invalid
++ int indexCount = storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1;
++ CountDownLatch latch = new CountDownLatch(indexCount);
++ for (int i = 0; i < indexCount; i++) {
++ executorService.submit(() -> {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ try {
++ Thread.sleep(100);
++ } catch (InterruptedException ignored) {
++ }
++ latch.countDown();
++ });
++ }
++ latch.await();
++
++ executorService.shutdown();
++ Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
++ }
++
++ @Test
++ public void recoverFileTest() throws IOException {
++ int indexCount = 10;
++ long timestamp = indexStoreFile.getTimestamp();
++ for (int i = 0; i < indexCount; i++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ }
++ indexStoreFile.shutdown();
++ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
++ indexStoreFile = new IndexStoreFile(storeConfig, timestamp);
++ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
++ }
++
++ @Test
++ public void doCompactionTest() throws Exception {
++ long timestamp = indexStoreFile.getTimestamp();
++ for (int i = 0; i < 10; i++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ }
++
++ ByteBuffer byteBuffer = indexStoreFile.doCompaction();
++ TieredFileSegment fileSegment = new PosixFileSegment(
++ storeConfig, FileSegmentType.INDEX, filePath, 0L);
++ fileSegment.append(byteBuffer, timestamp);
++ fileSegment.commit();
++ Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
++ fileSegment.destroyFile();
++ }
++
++ @Test
++ public void queryAsyncFromUnsealedFileTest() throws Exception {
++ long timestamp = indexStoreFile.getTimestamp();
++ for (int i = 0; i < 5; i++) {
++ for (int j = 0; j < 3; j++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i,
++ TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis()));
++ }
++ }
++ List itemList = indexStoreFile.queryAsync(
++ TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(3, itemList.size());
++ }
++
++ @Test
++ public void queryAsyncFromSegmentFileTest() throws ExecutionException, InterruptedException {
++ long timestamp = indexStoreFile.getTimestamp();
++ for (int i = 0; i < 5; i++) {
++ for (int j = 0; j < 3; j++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i,
++ TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis()));
++ }
++ }
++
++ ByteBuffer byteBuffer = indexStoreFile.doCompaction();
++ TieredFileSegment fileSegment = new PosixFileSegment(
++ storeConfig, FileSegmentType.INDEX, filePath, 0L);
++ fileSegment.append(byteBuffer, timestamp);
++ fileSegment.commit();
++ Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
++ indexStoreFile.destroy();
++
++ indexStoreFile = new IndexStoreFile(storeConfig, fileSegment);
++
++ // change topic
++ List itemList = indexStoreFile.queryAsync(
++ TOPIC_NAME, KEY, 64, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(0, itemList.size());
++
++ // change key
++ itemList = indexStoreFile.queryAsync(
++ TOPIC_NAME, KEY + "1", 64, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(0, itemList.size());
++
++ itemList = indexStoreFile.queryAsync(
++ TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(3, itemList.size());
++ }
++}
+\ No newline at end of file
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
+new file mode 100644
+index 000000000..57d00eefe
+--- /dev/null
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
+@@ -0,0 +1,147 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.tieredstore.index;
++
++import com.google.common.base.Stopwatch;
++import java.io.File;
++import java.io.IOException;
++import java.nio.file.Paths;
++import java.util.Collections;
++import java.util.List;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.LongAdder;
++import org.apache.rocketmq.common.UtilAll;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
++import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
++import org.junit.Assert;
++import org.junit.Ignore;
++import org.openjdk.jmh.annotations.BenchmarkMode;
++import org.openjdk.jmh.annotations.Fork;
++import org.openjdk.jmh.annotations.Measurement;
++import org.openjdk.jmh.annotations.Mode;
++import org.openjdk.jmh.annotations.OutputTimeUnit;
++import org.openjdk.jmh.annotations.Scope;
++import org.openjdk.jmh.annotations.Setup;
++import org.openjdk.jmh.annotations.State;
++import org.openjdk.jmh.annotations.TearDown;
++import org.openjdk.jmh.annotations.Threads;
++import org.openjdk.jmh.annotations.Warmup;
++import org.openjdk.jmh.results.format.ResultFormatType;
++import org.openjdk.jmh.runner.Runner;
++import org.openjdk.jmh.runner.options.Options;
++import org.openjdk.jmh.runner.options.OptionsBuilder;
++
++@Ignore
++@State(Scope.Benchmark)
++@Fork(value = 1, jvmArgs = {"-Djava.net.preferIPv4Stack=true", "-Djmh.rmi.port=1099"})
++public class IndexStoreServiceBenchTest {
++
++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
++ private static final String TOPIC_NAME = "TopicTest";
++ private TieredMessageStoreConfig storeConfig;
++ private IndexStoreService indexStoreService;
++ private final LongAdder failureCount = new LongAdder();
++
++ @Setup
++ public void init() throws ClassNotFoundException, NoSuchMethodException {
++ String storePath = Paths.get(System.getProperty("user.home"), "store_test", "index").toString();
++ UtilAll.deleteFile(new File(storePath));
++ UtilAll.deleteFile(new File("./e96d41b2_IndexService"));
++ storeConfig = new TieredMessageStoreConfig();
++ storeConfig.setBrokerClusterName("IndexService");
++ storeConfig.setBrokerName("IndexServiceBroker");
++ storeConfig.setStorePathRootDir(storePath);
++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500 * 1000);
++ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000 * 1000);
++ TieredStoreUtil.getMetadataStore(storeConfig);
++ TieredStoreExecutor.init();
++ TieredFileAllocator tieredFileAllocator = new TieredFileAllocator(storeConfig);
++ indexStoreService = new IndexStoreService(tieredFileAllocator, storePath);
++ indexStoreService.start();
++ }
++
++ @TearDown
++ public void shutdown() throws IOException {
++ indexStoreService.shutdown();
++ indexStoreService.destroy();
++ TieredStoreExecutor.shutdown();
++ }
++
++ //@Benchmark
++ @Threads(2)
++ @BenchmarkMode(Mode.Throughput)
++ @OutputTimeUnit(TimeUnit.SECONDS)
++ @Warmup(iterations = 1, time = 1)
++ @Measurement(iterations = 1, time = 1)
++ public void doPutThroughputBenchmark() {
++ for (int i = 0; i < 100; i++) {
++ AppendResult result = indexStoreService.putKey(
++ TOPIC_NAME, 123, 2, Collections.singleton(String.valueOf(i)),
++ i * 100L, i * 100, System.currentTimeMillis());
++ if (AppendResult.SUCCESS.equals(result)) {
++ failureCount.increment();
++ }
++ }
++ }
++
++ @Threads(1)
++ @BenchmarkMode(Mode.AverageTime)
++ @OutputTimeUnit(TimeUnit.SECONDS)
++ @Warmup(iterations = 0)
++ @Measurement(iterations = 1, time = 1)
++ public void doGetThroughputBenchmark() throws ExecutionException, InterruptedException {
++ for (int j = 0; j < 10; j++) {
++ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
++ indexStoreService.putKey(
++ "TopicTest", 123, j, Collections.singleton(String.valueOf(i)),
++ i * 100L, i * 100, System.currentTimeMillis());
++ }
++ }
++
++ int queryCount = 100 * 10000;
++ Stopwatch stopwatch = Stopwatch.createStarted();
++ for (int i = 0; i < queryCount; i++) {
++ List indexItems = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i),
++ 20, 0, System.currentTimeMillis()).get();
++ Assert.assertEquals(10, indexItems.size());
++
++ List indexItems2 = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i),
++ 5, 0, System.currentTimeMillis()).get();
++ Assert.assertEquals(5, indexItems2.size());
++ }
++ log.info("DoGetThroughputBenchmark test cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
++ }
++
++ public static void main(String[] args) throws Exception {
++ Options opt = new OptionsBuilder()
++ .include(IndexStoreServiceBenchTest.class.getSimpleName())
++ .warmupIterations(0)
++ .measurementIterations(1)
++ .result("result.json")
++ .resultFormat(ResultFormatType.JSON)
++ .build();
++ new Runner(opt).run();
++ }
++}
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+new file mode 100644
+index 000000000..20b4acbfa
+--- /dev/null
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+@@ -0,0 +1,313 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.tieredstore.index;
++
++import java.io.File;
++import java.io.IOException;
++import java.nio.file.Paths;
++import java.time.Duration;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.Set;
++import java.util.UUID;
++import java.util.concurrent.ConcurrentSkipListMap;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
++import org.apache.rocketmq.common.ThreadFactoryImpl;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.store.logfile.DefaultMappedFile;
++import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
++import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++
++import static org.awaitility.Awaitility.await;
++
++public class IndexStoreServiceTest {
++
++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
++
++ private static final String TOPIC_NAME = "TopicTest";
++ private static final int TOPIC_ID = 123;
++ private static final int QUEUE_ID = 2;
++ private static final long MESSAGE_OFFSET = 666;
++ private static final int MESSAGE_SIZE = 1024;
++ private static final Set KEY_SET = Collections.singleton("MessageKey");
++
++ private String filePath;
++ private TieredMessageStoreConfig storeConfig;
++ private TieredFileAllocator fileAllocator;
++ private IndexStoreService indexService;
++
++ @Before
++ public void init() throws IOException, ClassNotFoundException, NoSuchMethodException {
++ TieredStoreExecutor.init();
++ filePath = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
++ String directory = Paths.get(System.getProperty("user.home"), "store_test", filePath).toString();
++ storeConfig = new TieredMessageStoreConfig();
++ storeConfig.setStorePathRootDir(directory);
++ storeConfig.setTieredStoreFilePath(directory);
++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
++ storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
++ fileAllocator = new TieredFileAllocator(storeConfig);
++ }
++
++ @After
++ public void shutdown() {
++ if (indexService != null) {
++ indexService.shutdown();
++ indexService.destroy();
++ }
++ TieredStoreTestUtil.destroyMetadataStore();
++ TieredStoreTestUtil.destroyTempDir(storeConfig.getStorePathRootDir());
++ TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath());
++ TieredStoreExecutor.shutdown();
++ }
++
++ @Test
++ public void basicServiceTest() throws InterruptedException {
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ for (int i = 0; i < 50; i++) {
++ Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, i * 100, MESSAGE_SIZE, System.currentTimeMillis()));
++ TimeUnit.MILLISECONDS.sleep(1);
++ }
++ ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable();
++ Assert.assertEquals(3, timeStoreTable.size());
++ }
++
++ @Test
++ public void doConvertOldFormatTest() throws IOException {
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ long timestamp = indexService.getTimeStoreTable().firstKey();
++ Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
++ indexService.shutdown();
++
++ File file = new File(Paths.get(filePath, IndexStoreService.FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString());
++ DefaultMappedFile mappedFile = new DefaultMappedFile(file.getName(), (int) file.length());
++ mappedFile.renameTo(String.valueOf(new File(file.getParent(), "0000")));
++ mappedFile.shutdown(10 * 1000);
++
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable();
++ Assert.assertEquals(1, timeStoreTable.size());
++ Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey());
++ mappedFile.destroy(10 * 1000);
++ }
++
++ @Test
++ public void concurrentPutTest() throws InterruptedException {
++ ExecutorService executorService = Executors.newFixedThreadPool(
++ 4, new ThreadFactoryImpl("ConcurrentPutTest"));
++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500);
++ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ long timestamp = System.currentTimeMillis();
++
++ // first item is invalid
++ AtomicInteger success = new AtomicInteger();
++ int indexCount = 5000;
++ CountDownLatch latch = new CountDownLatch(indexCount);
++ for (int i = 0; i < indexCount; i++) {
++ final int index = i;
++ executorService.submit(() -> {
++ try {
++ AppendResult result = indexService.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(index)),
++ index * 100, MESSAGE_SIZE, timestamp + index);
++ if (AppendResult.SUCCESS.equals(result)) {
++ success.incrementAndGet();
++ }
++ } catch (Exception e) {
++ log.error("ConcurrentPutTest error", e);
++ } finally {
++ latch.countDown();
++ }
++ });
++ }
++ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
++ Assert.assertEquals(3, indexService.getTimeStoreTable().size());
++ executorService.shutdown();
++ }
++
++ @Test
++ public void doCompactionTest() throws InterruptedException {
++ concurrentPutTest();
++ IndexFile indexFile = indexService.getNextSealedFile();
++ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, indexFile.getFileStatus());
++
++ indexService.doCompactThenUploadFile(indexFile);
++ indexService.setCompactTimestamp(indexFile.getTimestamp());
++ indexFile.destroy();
++
++ List files = new ArrayList<>(indexService.getTimeStoreTable().values());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, files.get(1).getFileStatus());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus());
++
++ indexFile = indexService.getNextSealedFile();
++ indexService.doCompactThenUploadFile(indexFile);
++ indexService.setCompactTimestamp(indexFile.getTimestamp());
++ files = new ArrayList<>(indexService.getTimeStoreTable().values());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(1).getFileStatus());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus());
++
++ indexFile = indexService.getNextSealedFile();
++ Assert.assertNull(indexFile);
++ files = new ArrayList<>(indexService.getTimeStoreTable().values());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(1).getFileStatus());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus());
++ }
++
++ @Test
++ public void runServiceTest() throws InterruptedException {
++ concurrentPutTest();
++ indexService.start();
++ await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> {
++ boolean result = true;
++ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values());
++ result &= IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(0).getFileStatus());
++ result &= IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(1).getFileStatus());
++ result &= IndexFile.IndexStatusEnum.UNSEALED.equals(files.get(2).getFileStatus());
++ return result;
++ });
++ }
++
++ @Test
++ public void restartServiceTest() throws InterruptedException {
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ for (int i = 0; i < 20; i++) {
++ AppendResult result = indexService.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)),
++ i * 100L, MESSAGE_SIZE, System.currentTimeMillis());
++ Assert.assertEquals(AppendResult.SUCCESS, result);
++ TimeUnit.MILLISECONDS.sleep(1);
++ }
++ long timestamp = indexService.getTimeStoreTable().firstKey();
++ indexService.shutdown();
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
++
++ indexService.start();
++ await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> {
++ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values());
++ return IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(0).getFileStatus());
++ });
++ indexService.shutdown();
++
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
++ Assert.assertEquals(2, indexService.getTimeStoreTable().size());
++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
++ indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
++ }
++
++ @Test
++ public void queryFromFileTest() throws InterruptedException, ExecutionException {
++ long timestamp = System.currentTimeMillis();
++ indexService = new IndexStoreService(fileAllocator, filePath);
++
++ // three files, echo contains 19 items
++ for (int i = 0; i < 3; i++) {
++ for (int j = 0; j < 20 - 1; j++) {
++ AppendResult result = indexService.putKey(
++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(j)),
++ i * 100L + j, MESSAGE_SIZE, System.currentTimeMillis());
++ Assert.assertEquals(AppendResult.SUCCESS, result);
++ TimeUnit.MILLISECONDS.sleep(1);
++ }
++ }
++
++ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values());
++ Assert.assertEquals(3, files.size());
++
++ for (int i = 0; i < 3; i++) {
++ List indexItems = indexService.queryAsync(
++ TOPIC_NAME, String.valueOf(1), 1, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(1, indexItems.size());
++
++ indexItems = indexService.queryAsync(
++ TOPIC_NAME, String.valueOf(1), 3, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(3, indexItems.size());
++
++ indexItems = indexService.queryAsync(
++ TOPIC_NAME, String.valueOf(1), 5, timestamp, System.currentTimeMillis()).get();
++ Assert.assertEquals(3, indexItems.size());
++ }
++ }
++
++ @Test
++ public void concurrentGetTest() throws InterruptedException {
++ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
++ indexService = new IndexStoreService(fileAllocator, filePath);
++ indexService.start();
++
++ int fileCount = 10;
++ for (int j = 0; j < fileCount; j++) {
++ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
++ indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)),
++ i * 100L, i * 100, System.currentTimeMillis());
++ }
++ TimeUnit.MILLISECONDS.sleep(1);
++ }
++
++ CountDownLatch latch = new CountDownLatch(fileCount * 3);
++ AtomicBoolean result = new AtomicBoolean(true);
++ ExecutorService executorService = Executors.newFixedThreadPool(
++ 4, new ThreadFactoryImpl("ConcurrentGetTest"));
++
++ for (int i = 0; i < fileCount; i++) {
++ int finalI = i;
++ executorService.submit(() -> {
++ for (int j = 1; j <= 3; j++) {
++ try {
++ List indexItems = indexService.queryAsync(
++ TOPIC_NAME, String.valueOf(finalI), j * 5, 0, System.currentTimeMillis()).get();
++ if (Math.min(fileCount, j * 5) != indexItems.size()) {
++ result.set(false);
++ }
++ } catch (Exception e) {
++ result.set(false);
++ } finally {
++ latch.countDown();
++ }
++ }
++ });
++ }
++
++ Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
++ executorService.shutdown();
++ Assert.assertTrue(result.get());
++ }
++}
+\ No newline at end of file
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+index a413f2113..68277cacc 100644
+--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+@@ -135,7 +135,6 @@ public class MessageBufferUtilTest {
+ Assert.assertEquals("uservalue0", properties.get("userkey"));
+ }
+
+-
+ @Test
+ public void testGetTotalSize() {
+ ByteBuffer buffer = buildMockedMessageBuffer();
+diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml
+index a7933b5ef..ac0895e05 100644
+--- a/tieredstore/src/test/resources/rmq.logback-test.xml
++++ b/tieredstore/src/test/resources/rmq.logback-test.xml
+@@ -19,11 +19,22 @@
+
+
+- %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
++ ${CONSOLE_LOG_PATTERN}
+
+
+
++
++
+
+-
++
+
++
++
++
++
++
++
++
++
+
+--
+2.32.0.windows.2
+
diff --git a/rocketmq.spec b/rocketmq.spec
index 743ffa7adb51c30db5260e0800d88d47b4a9e77e..a2e4839bbeba70bce623526b3ea1d9160f6fc718 100644
--- a/rocketmq.spec
+++ b/rocketmq.spec
@@ -5,7 +5,7 @@
Summary: Cloud-Native, Distributed Messaging and Streaming
Name: rocketmq
Version: 5.1.5
-Release: 36
+Release: 37
License: Apache-2.0
Group: Applications/Message
URL: https://rocketmq.apache.org/
@@ -45,6 +45,7 @@ Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch
Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch
Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch
Patch0035: patch035-backport-fix-some-bugs.patch
+Patch0036: patch036-backport-RIP65.patch
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
Requires: java-1.8.0-openjdk-devel
@@ -85,6 +86,9 @@ exit 0
%changelog
+* Mon Dec 11 2023 ShiZhili - 5.1.3-37
+- backport rip 65
+
* Mon Dec 11 2023 ShiZhili - 5.1.3-36
- backport fix some bugs