diff --git a/patch036-backport-RIP65.patch b/patch036-backport-RIP65.patch new file mode 100644 index 0000000000000000000000000000000000000000..d0ed8baa1f6d4bfb4d855357444748fb50cbebea --- /dev/null +++ b/patch036-backport-RIP65.patch @@ -0,0 +1,3173 @@ +From 63130f51e84bda2547c3aa442f14184ccefb9180 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Tue, 21 Nov 2023 13:57:44 +0800 +Subject: [PATCH] [ISSUE #7545] [RIP-65] Support efficient random index for + massive messages (#7546) + +Support efficient random index for massive messages + +Co-authored-by: bareheadtom <1983697019@qq.com> +--- + style/spotbugs-suppressions.xml | 2 +- + tieredstore/pom.xml | 14 + + .../tieredstore/TieredMessageFetcher.java | 103 ++-- + .../file/CompositeQueueFlatFile.java | 29 +- + .../tieredstore/file/TieredConsumeQueue.java | 2 +- + .../tieredstore/file/TieredFlatFile.java | 5 +- + .../file/TieredFlatFileManager.java | 40 +- + .../tieredstore/file/TieredIndexFile.java | 470 ----------------- + .../rocketmq/tieredstore/index/IndexFile.java | 35 ++ + .../rocketmq/tieredstore/index/IndexItem.java | 114 ++++ + .../tieredstore/index/IndexService.java | 62 +++ + .../tieredstore/index/IndexStoreFile.java | 499 ++++++++++++++++++ + .../tieredstore/index/IndexStoreService.java | 362 +++++++++++++ + .../provider/TieredFileSegment.java | 9 +- + .../provider/TieredStoreProvider.java | 10 +- + .../provider/posix/PosixFileSegment.java | 1 + + .../tieredstore/TieredMessageFetcherTest.java | 17 +- + .../tieredstore/file/TieredIndexFileTest.java | 93 ---- + .../tieredstore/index/IndexItemTest.java | 91 ++++ + .../tieredstore/index/IndexStoreFileTest.java | 282 ++++++++++ + .../index/IndexStoreServiceBenchTest.java | 147 ++++++ + .../index/IndexStoreServiceTest.java | 313 +++++++++++ + .../util/MessageBufferUtilTest.java | 1 - + .../src/test/resources/rmq.logback-test.xml | 15 +- + 24 files changed, 2019 insertions(+), 697 deletions(-) + delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java + create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java + delete mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java + +diff --git a/style/spotbugs-suppressions.xml b/style/spotbugs-suppressions.xml +index 5778695e1..6443e029f 100644 +--- a/style/spotbugs-suppressions.xml ++++ b/style/spotbugs-suppressions.xml +@@ -31,7 +31,7 @@ + + + +- ++ + + + +diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml +index b2ea40bf3..9f2a8bf22 100644 +--- a/tieredstore/pom.xml ++++ b/tieredstore/pom.xml +@@ -53,5 +53,19 @@ + commons-io + test + ++ ++ ++ org.openjdk.jmh ++ jmh-core ++ 1.36 ++ provided ++ ++ ++ ++ org.openjdk.jmh ++ jmh-generator-annprocess ++ 1.36 ++ provided ++ + + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +index c948fa3fa..f739773eb 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; + import java.util.concurrent.TimeUnit; + import javax.annotation.Nullable; + import org.apache.commons.lang3.tuple.Pair; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -50,7 +51,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; + import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; + import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; + import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; +-import org.apache.rocketmq.tieredstore.file.TieredIndexFile; ++import org.apache.rocketmq.tieredstore.index.IndexItem; ++import org.apache.rocketmq.tieredstore.index.IndexService; + import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; + import org.apache.rocketmq.tieredstore.metadata.TopicMetadata; + import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant; +@@ -58,7 +60,6 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; + import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +-import org.apache.rocketmq.common.BoundaryType; + + public class TieredMessageFetcher implements MessageStoreFetcher { + +@@ -555,85 +556,51 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + public CompletableFuture queryMessageAsync( + String topic, String key, int maxCount, long begin, long end) { + +- TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig); ++ IndexService indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig); + +- int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key)); + long topicId; + try { + TopicMetadata topicMetadata = metadataStore.getTopic(topic); + if (topicMetadata == null) { +- LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic); ++ LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic); + return CompletableFuture.completedFuture(new QueryMessageResult()); + } + topicId = topicMetadata.getTopicId(); + } catch (Exception e) { +- LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e); ++ LOGGER.error("MessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e); + return CompletableFuture.completedFuture(new QueryMessageResult()); + } + +- return indexFile.queryAsync(topic, key, begin, end) +- .thenCompose(indexBufferList -> { +- QueryMessageResult result = new QueryMessageResult(); +- int resultCount = 0; +- List> futureList = new ArrayList<>(maxCount); +- for (Pair pair : indexBufferList) { +- Long fileBeginTimestamp = pair.getKey(); +- ByteBuffer indexBuffer = pair.getValue(); +- +- if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) { +- LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " + +- "index buffer size {} is not multiple of index item size {}", +- indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE); +- continue; +- } +- +- for (int indexOffset = indexBuffer.position(); +- indexOffset < indexBuffer.limit(); +- indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) { +- +- int indexItemHashCode = indexBuffer.getInt(indexOffset); +- if (indexItemHashCode != hashCode) { +- continue; +- } +- +- int indexItemTopicId = indexBuffer.getInt(indexOffset + 4); +- if (indexItemTopicId != topicId) { +- continue; +- } +- +- int queueId = indexBuffer.getInt(indexOffset + 4 + 4); +- CompositeFlatFile flatFile = +- flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId)); +- if (flatFile == null) { +- continue; +- } +- +- // decode index item +- long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4); +- int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8); +- int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4); +- long indexTimestamp = fileBeginTimestamp + timeDiff; +- if (indexTimestamp < begin || indexTimestamp > end) { +- continue; +- } ++ CompletableFuture> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end); + +- CompletableFuture getMessageFuture = flatFile.getCommitLogAsync(offset, size) +- .thenAccept(messageBuffer -> result.addMessage( +- new SelectMappedBufferResult(0, messageBuffer, size, null))); +- futureList.add(getMessageFuture); +- +- resultCount++; +- if (resultCount >= maxCount) { +- break; +- } +- } +- +- if (resultCount >= maxCount) { +- break; +- } ++ return future.thenCompose(indexItemList -> { ++ QueryMessageResult result = new QueryMessageResult(); ++ List> futureList = new ArrayList<>(maxCount); ++ for (IndexItem indexItem : indexItemList) { ++ if (topicId != indexItem.getTopicId()) { ++ continue; + } +- return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])) +- .thenApply(v -> result); +- }); ++ CompositeFlatFile flatFile = ++ flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, indexItem.getQueueId())); ++ if (flatFile == null) { ++ continue; ++ } ++ CompletableFuture getMessageFuture = flatFile ++ .getCommitLogAsync(indexItem.getOffset(), indexItem.getSize()) ++ .thenAccept(messageBuffer -> result.addMessage( ++ new SelectMappedBufferResult( ++ indexItem.getOffset(), messageBuffer, indexItem.getSize(), null))); ++ futureList.add(getMessageFuture); ++ if (futureList.size() >= maxCount) { ++ break; ++ } ++ } ++ return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result); ++ }).whenComplete((result, throwable) -> { ++ if (result != null) { ++ LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}", ++ result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end); ++ } ++ }); + } + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +index 0a797f465..67d2cf064 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +@@ -17,11 +17,15 @@ + + package org.apache.rocketmq.tieredstore.file; + ++import java.util.Arrays; ++import java.util.HashSet; ++import java.util.Set; + import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.store.DispatchRequest; + import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.index.IndexService; + import org.apache.rocketmq.tieredstore.metadata.QueueMetadata; + import org.apache.rocketmq.tieredstore.metadata.TopicMetadata; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +@@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + private final MessageQueue messageQueue; + private long topicSequenceNumber; + private QueueMetadata queueMetadata; +- private final TieredIndexFile indexFile; ++ private final IndexService indexStoreService; + + public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) { + super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue)); + this.messageQueue = messageQueue; + this.recoverQueueMetadata(); +- this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig); ++ this.indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig); + } + + @Override +@@ -85,24 +89,15 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + return AppendResult.FILE_CLOSED; + } + ++ Set keySet = new HashSet<>( ++ Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR))); + if (StringUtils.isNotBlank(request.getUniqKey())) { +- AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber, +- request.getUniqKey(), request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp()); +- if (result != AppendResult.SUCCESS) { +- return result; +- } ++ keySet.add(request.getUniqKey()); + } + +- for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) { +- if (StringUtils.isNotBlank(key)) { +- AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber, +- key, request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp()); +- if (result != AppendResult.SUCCESS) { +- return result; +- } +- } +- } +- return AppendResult.SUCCESS; ++ return indexStoreService.putKey( ++ messageQueue.getTopic(), (int) topicSequenceNumber, messageQueue.getQueueId(), keySet, ++ request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp()); + } + + public MessageQueue getMessageQueue() { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java +index 35007f8cb..6953db032 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java +@@ -20,9 +20,9 @@ import com.google.common.annotations.VisibleForTesting; + import java.nio.ByteBuffer; + import java.util.concurrent.CompletableFuture; + import org.apache.commons.lang3.tuple.Pair; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.AppendResult; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +-import org.apache.rocketmq.common.BoundaryType; + + public class TieredConsumeQueue { + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +index d96eb6e8f..a41d562d1 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +@@ -141,7 +141,6 @@ public class TieredFlatFile { + return fileType; + } + +- @VisibleForTesting + public List getFileSegmentList() { + return fileSegmentList; + } +@@ -274,7 +273,7 @@ public class TieredFlatFile { + } + + @Nullable +- protected TieredFileSegment getFileByIndex(int index) { ++ public TieredFileSegment getFileByIndex(int index) { + fileSegmentLock.readLock().lock(); + try { + if (index < fileSegmentList.size()) { +@@ -354,7 +353,7 @@ public class TieredFlatFile { + } + } + +- protected List getFileListByTime(long beginTime, long endTime) { ++ public List getFileListByTime(long beginTime, long endTime) { + fileSegmentLock.readLock().lock(); + try { + return fileSegmentList.stream() +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +index 087ea8c9c..ffe0836f1 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +@@ -34,6 +34,8 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; ++import org.apache.rocketmq.tieredstore.index.IndexService; ++import org.apache.rocketmq.tieredstore.index.IndexStoreService; + import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + +@@ -43,7 +45,7 @@ public class TieredFlatFileManager { + private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + + private static volatile TieredFlatFileManager instance; +- private static volatile TieredIndexFile indexFile; ++ private static volatile IndexStoreService indexStoreService; + + private final TieredMetadataStore metadataStore; + private final TieredMessageStoreConfig storeConfig; +@@ -76,25 +78,26 @@ public class TieredFlatFileManager { + return instance; + } + +- public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) { ++ public static IndexService getTieredIndexService(TieredMessageStoreConfig storeConfig) { + if (storeConfig == null) { +- return indexFile; ++ return indexStoreService; + } + +- if (indexFile == null) { ++ if (indexStoreService == null) { + synchronized (TieredFlatFileManager.class) { +- if (indexFile == null) { ++ if (indexStoreService == null) { + try { + String filePath = TieredStoreUtil.toPath(new MessageQueue( + TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0)); +- indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath); ++ indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), filePath); ++ indexStoreService.start(); + } catch (Exception e) { + logger.error("Construct FlatFileManager indexFile error", e); + } + } + } + } +- return indexFile; ++ return indexStoreService; + } + + public void doCommit() { +@@ -120,15 +123,6 @@ public class TieredFlatFileManager { + } + }, delay, TimeUnit.MILLISECONDS); + } +- TieredStoreExecutor.commitExecutor.schedule(() -> { +- try { +- if (indexFile != null) { +- indexFile.commit(true); +- } +- } catch (Throwable e) { +- logger.error("Commit indexFile periodically failed", e); +- } +- }, 0, TimeUnit.MILLISECONDS); + } + + public void doCleanExpiredFile() { +@@ -148,10 +142,6 @@ public class TieredFlatFileManager { + } + }); + } +- if (indexFile != null) { +- indexFile.cleanExpiredFile(expiredTimeStamp); +- indexFile.destroyExpiredFile(); +- } + } + + private void doScheduleTask() { +@@ -244,7 +234,7 @@ public class TieredFlatFileManager { + + private static void cleanStaticReference() { + instance = null; +- indexFile = null; ++ indexStoreService = null; + } + + @Nullable +@@ -271,8 +261,8 @@ public class TieredFlatFileManager { + } + + public void shutdown() { +- if (indexFile != null) { +- indexFile.commit(true); ++ if (indexStoreService != null) { ++ indexStoreService.shutdown(); + } + for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) { + flatFile.shutdown(); +@@ -280,8 +270,8 @@ public class TieredFlatFileManager { + } + + public void destroy() { +- if (indexFile != null) { +- indexFile.destroy(); ++ if (indexStoreService != null) { ++ indexStoreService.destroy(); + } + ImmutableList flatFileList = deepCopyFlatFileToList(); + cleanup(); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java +deleted file mode 100644 +index eda5e0106..000000000 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java ++++ /dev/null +@@ -1,470 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.rocketmq.tieredstore.file; +- +-import com.google.common.annotations.VisibleForTesting; +-import java.io.File; +-import java.io.IOException; +-import java.nio.ByteBuffer; +-import java.nio.MappedByteBuffer; +-import java.nio.file.Paths; +-import java.util.ArrayList; +-import java.util.List; +-import java.util.concurrent.CompletableFuture; +-import java.util.concurrent.Future; +-import java.util.concurrent.TimeUnit; +-import java.util.concurrent.locks.ReentrantLock; +-import org.apache.commons.lang3.tuple.Pair; +-import org.apache.rocketmq.common.message.MessageQueue; +-import org.apache.rocketmq.logging.org.slf4j.Logger; +-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.store.index.IndexHeader; +-import org.apache.rocketmq.store.logfile.DefaultMappedFile; +-import org.apache.rocketmq.store.logfile.MappedFile; +-import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; +-import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +- +-public class TieredIndexFile { +- +- private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); +- +- // header format: +- // magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4) +- public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0; +- public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4; +- public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12; +- public static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20; +- public static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24; +- public static final int INDEX_FILE_HEADER_SIZE = 28; +- +- // index item +- public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4; +- public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8; +- public static final int INDEX_FILE_HASH_SLOT_SIZE = 8; +- public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32; +- public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28; +- +- private static final String INDEX_FILE_DIR_NAME = "tiered_index_file"; +- private static final String CUR_INDEX_FILE_NAME = "0000"; +- private static final String PRE_INDEX_FILE_NAME = "1111"; +- private static final String COMPACT_FILE_NAME = "2222"; +- +- private final TieredMessageStoreConfig storeConfig; +- private final TieredFlatFile flatFile; +- private final int maxHashSlotNum; +- private final int maxIndexNum; +- private final int fileMaxSize; +- private final String curFilePath; +- private final String preFilepath; +- private MappedFile preMappedFile; +- private MappedFile curMappedFile; +- +- private final ReentrantLock curFileLock = new ReentrantLock(); +- private Future inflightCompactFuture = CompletableFuture.completedFuture(null); +- +- protected TieredIndexFile(TieredFileAllocator fileQueueFactory, String filePath) throws IOException { +- this.storeConfig = fileQueueFactory.getStoreConfig(); +- this.flatFile = fileQueueFactory.createFlatFileForIndexFile(filePath); +- if (flatFile.getBaseOffset() == -1) { +- flatFile.setBaseOffset(0); +- } +- this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum(); +- this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum(); +- this.fileMaxSize = IndexHeader.INDEX_HEADER_SIZE +- + this.maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE +- + this.maxIndexNum * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE +- + 4; +- this.curFilePath = Paths.get( +- storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, CUR_INDEX_FILE_NAME).toString(); +- this.preFilepath = Paths.get( +- storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, PRE_INDEX_FILE_NAME).toString(); +- initFile(); +- TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay( +- this::doScheduleTask, 10, 10, TimeUnit.SECONDS); +- } +- +- protected void doScheduleTask() { +- try { +- curFileLock.lock(); +- try { +- synchronized (TieredIndexFile.class) { +- MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer(); +- int indexNum = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION); +- long lastIndexTime = mappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION); +- if (indexNum > 0 && +- System.currentTimeMillis() - lastIndexTime > +- storeConfig.getTieredStoreIndexFileRollingIdleInterval()) { +- mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE); +- rollingFile(); +- } +- if (inflightCompactFuture.isDone() && preMappedFile != null && preMappedFile.isAvailable()) { +- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit( +- new CompactTask(storeConfig, preMappedFile, flatFile), null); +- } +- } +- } finally { +- curFileLock.unlock(); +- } +- } catch (Throwable throwable) { +- logger.error("TieredIndexFile: submit compact index file task failed:", throwable); +- } +- } +- +- private static boolean isFileSealed(MappedFile mappedFile) { +- return mappedFile.getMappedByteBuffer().getInt(mappedFile.getFileSize() - 4) == INDEX_FILE_END_MAGIC_CODE; +- } +- +- private void initIndexFileHeader(MappedFile mappedFile) { +- MappedByteBuffer mappedByteBuffer = mappedFile.getMappedByteBuffer(); +- if (mappedByteBuffer.getInt(0) != INDEX_FILE_BEGIN_MAGIC_CODE) { +- mappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE); +- mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, -1L); +- mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, -1L); +- mappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 0); +- mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 0); +- for (int i = 0; i < maxHashSlotNum; i++) { +- mappedByteBuffer.putInt(INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE, -1); +- } +- mappedByteBuffer.putInt(fileMaxSize - 4, -1); +- } +- } +- +- @VisibleForTesting +- public MappedFile getPreMappedFile() { +- return preMappedFile; +- } +- +- private void initFile() throws IOException { +- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); +- initIndexFileHeader(curMappedFile); +- File preFile = new File(preFilepath); +- boolean preFileExists = preFile.exists(); +- if (preFileExists) { +- preMappedFile = new DefaultMappedFile(preFilepath, fileMaxSize); +- } +- +- if (isFileSealed(curMappedFile)) { +- if (preFileExists) { +- if (preFile.delete()) { +- logger.info("Pre IndexFile deleted success", preFilepath); +- } else { +- logger.error("Pre IndexFile deleted failed", preFilepath); +- } +- } +- boolean rename = curMappedFile.renameTo(preFilepath); +- if (rename) { +- preMappedFile = curMappedFile; +- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); +- initIndexFileHeader(curMappedFile); +- preFileExists = true; +- } +- } +- +- if (preFileExists) { +- synchronized (TieredIndexFile.class) { +- if (inflightCompactFuture.isDone()) { +- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit( +- new CompactTask(storeConfig, preMappedFile, flatFile), null); +- } +- } +- } +- } +- +- public AppendResult append(MessageQueue mq, int topicId, String key, long offset, int size, long timeStamp) { +- return putKey(mq, topicId, indexKeyHashMethod(buildKey(mq.getTopic(), key)), offset, size, timeStamp); +- } +- +- private boolean rollingFile() throws IOException { +- File preFile = new File(preFilepath); +- boolean preFileExists = preFile.exists(); +- if (!preFileExists) { +- boolean rename = curMappedFile.renameTo(preFilepath); +- if (rename) { +- preMappedFile = curMappedFile; +- curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); +- initIndexFileHeader(curMappedFile); +- tryToCompactPreFile(); +- return true; +- } else { +- logger.error("TieredIndexFile#rollingFile: rename current file failed"); +- return false; +- } +- } +- tryToCompactPreFile(); +- return false; +- } +- +- private void tryToCompactPreFile() throws IOException { +- synchronized (TieredIndexFile.class) { +- if (inflightCompactFuture.isDone()) { +- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null); +- } +- } +- } +- +- private AppendResult putKey(MessageQueue mq, int topicId, int hashCode, long offset, int size, long timeStamp) { +- curFileLock.lock(); +- try { +- if (isFileSealed(curMappedFile) && !rollingFile()) { +- return AppendResult.FILE_FULL; +- } +- +- MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer(); +- +- int slotPosition = hashCode % maxHashSlotNum; +- int slotOffset = INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE; +- +- int slotValue = mappedByteBuffer.getInt(slotOffset); +- +- long beginTimeStamp = mappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION); +- if (beginTimeStamp == -1) { +- mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, timeStamp); +- beginTimeStamp = timeStamp; +- } +- +- int indexCount = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION); +- int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE +- + indexCount * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE; +- +- int timeDiff = (int) (timeStamp - beginTimeStamp); +- +- // put hash index +- mappedByteBuffer.putInt(indexOffset, hashCode); +- mappedByteBuffer.putInt(indexOffset + 4, topicId); +- mappedByteBuffer.putInt(indexOffset + 4 + 4, mq.getQueueId()); +- mappedByteBuffer.putLong(indexOffset + 4 + 4 + 4, offset); +- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8, size); +- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4, timeDiff); +- mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4, slotValue); +- +- // put hash slot +- mappedByteBuffer.putInt(slotOffset, indexCount); +- +- // put header +- indexCount += 1; +- mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, indexCount); +- mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, timeStamp); +- if (indexCount == maxIndexNum) { +- mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE); +- rollingFile(); +- } +- return AppendResult.SUCCESS; +- } catch (Exception e) { +- logger.error("TieredIndexFile#putKey: put key failed:", e); +- return AppendResult.IO_ERROR; +- } finally { +- curFileLock.unlock(); +- } +- } +- +- public CompletableFuture>> queryAsync(String topic, String key, long beginTime, +- long endTime) { +- int hashCode = indexKeyHashMethod(buildKey(topic, key)); +- int slotPosition = hashCode % maxHashSlotNum; +- List fileSegmentList = flatFile.getFileListByTime(beginTime, endTime); +- CompletableFuture>> future = null; +- for (int i = fileSegmentList.size() - 1; i >= 0; i--) { +- TieredFileSegment fileSegment = fileSegmentList.get(i); +- CompletableFuture tmpFuture = fileSegment.readAsync(INDEX_FILE_HEADER_SIZE + (long) slotPosition * INDEX_FILE_HASH_SLOT_SIZE, INDEX_FILE_HASH_SLOT_SIZE) +- .thenCompose(slotBuffer -> { +- int indexPosition = slotBuffer.getInt(); +- if (indexPosition == -1) { +- return CompletableFuture.completedFuture(null); +- } +- +- int indexSize = slotBuffer.getInt(); +- if (indexSize <= 0) { +- return CompletableFuture.completedFuture(null); +- } +- return fileSegment.readAsync(indexPosition, indexSize); +- }); +- if (future == null) { +- future = tmpFuture.thenApply(indexBuffer -> { +- List> result = new ArrayList<>(); +- if (indexBuffer != null) { +- result.add(Pair.of(fileSegment.getMinTimestamp(), indexBuffer)); +- } +- return result; +- }); +- } else { +- future = future.thenCombine(tmpFuture, (indexList, indexBuffer) -> { +- if (indexBuffer != null) { +- indexList.add(Pair.of(fileSegment.getMinTimestamp(), indexBuffer)); +- } +- return indexList; +- }); +- } +- } +- return future == null ? CompletableFuture.completedFuture(new ArrayList<>()) : future; +- } +- +- public static String buildKey(String topic, String key) { +- return topic + "#" + key; +- } +- +- public static int indexKeyHashMethod(String key) { +- int keyHash = key.hashCode(); +- int keyHashPositive = Math.abs(keyHash); +- if (keyHashPositive < 0) +- keyHashPositive = 0; +- return keyHashPositive; +- } +- +- public void commit(boolean sync) { +- flatFile.commit(sync); +- if (sync) { +- try { +- inflightCompactFuture.get(); +- } catch (Exception ignore) { +- } +- } +- } +- +- public void cleanExpiredFile(long expireTimestamp) { +- flatFile.cleanExpiredFile(expireTimestamp); +- } +- +- public void destroyExpiredFile() { +- flatFile.destroyExpiredFile(); +- } +- +- public void destroy() { +- inflightCompactFuture.cancel(true); +- if (preMappedFile != null) { +- preMappedFile.destroy(-1); +- } +- if (curMappedFile != null) { +- curMappedFile.destroy(-1); +- } +- String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME; +- File compactFile = new File(compactFilePath); +- if (compactFile.exists()) { +- compactFile.delete(); +- } +- flatFile.destroy(); +- } +- +- static class CompactTask implements Runnable { +- private final TieredMessageStoreConfig storeConfig; +- +- private final int maxHashSlotNum; +- private final int maxIndexNum; +- private final int fileMaxSize; +- private MappedFile originFile; +- private TieredFlatFile fileQueue; +- private MappedFile compactFile; +- +- public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile, +- TieredFlatFile fileQueue) throws IOException { +- this.storeConfig = storeConfig; +- this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum(); +- this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum(); +- this.originFile = originFile; +- this.fileQueue = fileQueue; +- String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME; +- fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (storeConfig.getTieredStoreIndexFileMaxHashSlotNum() * INDEX_FILE_HASH_SLOT_SIZE) + (storeConfig.getTieredStoreIndexFileMaxIndexNum() * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4; +- // TODO check magic code, upload immediately when compact complete +- File compactFile = new File(compactFilePath); +- if (compactFile.exists()) { +- compactFile.delete(); +- } +- this.compactFile = new DefaultMappedFile(compactFilePath, fileMaxSize); +- } +- +- @Override +- public void run() { +- try { +- compact(); +- } catch (Throwable throwable) { +- logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable); +- } +- +- try { +- if (originFile != null) { +- originFile.destroy(-1); +- } +- if (compactFile != null) { +- compactFile.destroy(-1); +- } +- } catch (Throwable throwable) { +- logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable); +- } +- } +- +- public void compact() { +- if (!isFileSealed(originFile)) { +- logger.error("[Bug]TieredIndexFile#CompactTask#compact: try to compact unsealed file"); +- originFile.destroy(-1); +- compactFile.destroy(-1); +- return; +- } +- +- buildCompactFile(); +- fileQueue.append(compactFile.getMappedByteBuffer()); +- fileQueue.commit(true); +- compactFile.destroy(-1); +- originFile.destroy(-1); +- compactFile = null; +- originFile = null; +- } +- +- private void buildCompactFile() { +- MappedByteBuffer originMappedByteBuffer = originFile.getMappedByteBuffer(); +- MappedByteBuffer compactMappedByteBuffer = compactFile.getMappedByteBuffer(); +- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE); +- compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION)); +- compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION)); +- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, maxHashSlotNum); +- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, originMappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION)); +- +- int rePutSlotValue = INDEX_FILE_HEADER_SIZE + (maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE); +- for (int i = 0; i < maxHashSlotNum; i++) { +- int slotOffset = INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE; +- int slotValue = originMappedByteBuffer.getInt(slotOffset); +- if (slotValue != -1) { +- int indexTotalSize = 0; +- int indexPosition = slotValue; +- +- while (indexPosition >= 0 && indexPosition < maxIndexNum) { +- int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE +- + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE; +- int rePutIndexOffset = rePutSlotValue + indexTotalSize; +- +- compactMappedByteBuffer.putInt(rePutIndexOffset, originMappedByteBuffer.getInt(indexOffset)); +- compactMappedByteBuffer.putInt(rePutIndexOffset + 4, originMappedByteBuffer.getInt(indexOffset + 4)); +- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4)); +- compactMappedByteBuffer.putLong(rePutIndexOffset + 4 + 4 + 4, originMappedByteBuffer.getLong(indexOffset + 4 + 4 + 4)); +- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8)); +- compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4)); +- +- indexTotalSize += INDEX_FILE_HASH_COMPACT_INDEX_SIZE; +- indexPosition = originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4); +- } +- compactMappedByteBuffer.putInt(slotOffset, rePutSlotValue); +- compactMappedByteBuffer.putInt(slotOffset + 4, indexTotalSize); +- rePutSlotValue += indexTotalSize; +- } +- } +- compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_END_MAGIC_CODE); +- compactMappedByteBuffer.putInt(rePutSlotValue, INDEX_FILE_BEGIN_MAGIC_CODE); +- compactMappedByteBuffer.limit(rePutSlotValue + 4); +- } +- } +-} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java +new file mode 100644 +index 000000000..d131b9b53 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java +@@ -0,0 +1,35 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.index; ++ ++import java.nio.ByteBuffer; ++ ++public interface IndexFile extends IndexService { ++ ++ /** ++ * Enumeration for the status of the index file. ++ */ ++ enum IndexStatusEnum { ++ SHUTDOWN, UNSEALED, SEALED, UPLOAD ++ } ++ ++ long getTimestamp(); ++ ++ IndexStatusEnum getFileStatus(); ++ ++ ByteBuffer doCompaction(); ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java +new file mode 100644 +index 000000000..24ccc4322 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java +@@ -0,0 +1,114 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.tieredstore.index; ++ ++import java.nio.ByteBuffer; ++ ++public class IndexItem { ++ ++ public static final int INDEX_ITEM_SIZE = 32; ++ public static final int COMPACT_INDEX_ITEM_SIZE = 28; ++ ++ private final int hashCode; ++ private final int topicId; ++ private final int queueId; ++ private final long offset; ++ private final int size; ++ private final int timeDiff; ++ private final int itemIndex; ++ ++ public IndexItem(int topicId, int queueId, long offset, int size, int hashCode, int timeDiff, int itemIndex) { ++ this.hashCode = hashCode; ++ this.topicId = topicId; ++ this.queueId = queueId; ++ this.offset = offset; ++ this.size = size; ++ this.timeDiff = timeDiff; ++ this.itemIndex = itemIndex; ++ } ++ ++ public IndexItem(byte[] bytes) { ++ if (bytes == null || ++ bytes.length != INDEX_ITEM_SIZE && ++ bytes.length != COMPACT_INDEX_ITEM_SIZE) { ++ throw new IllegalArgumentException("Byte array length not correct"); ++ } ++ ++ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); ++ hashCode = byteBuffer.getInt(0); ++ topicId = byteBuffer.getInt(4); ++ queueId = byteBuffer.getInt(8); ++ offset = byteBuffer.getLong(12); ++ size = byteBuffer.getInt(20); ++ timeDiff = byteBuffer.getInt(24); ++ itemIndex = bytes.length == INDEX_ITEM_SIZE ? byteBuffer.getInt(28) : 0; ++ } ++ ++ public ByteBuffer getByteBuffer() { ++ ByteBuffer byteBuffer = ByteBuffer.allocate(32); ++ byteBuffer.putInt(0, hashCode); ++ byteBuffer.putInt(4, topicId); ++ byteBuffer.putInt(8, queueId); ++ byteBuffer.putLong(12, offset); ++ byteBuffer.putInt(20, size); ++ byteBuffer.putInt(24, timeDiff); ++ byteBuffer.putInt(28, itemIndex); ++ return byteBuffer; ++ } ++ ++ public int getHashCode() { ++ return hashCode; ++ } ++ ++ public int getTopicId() { ++ return topicId; ++ } ++ ++ public int getQueueId() { ++ return queueId; ++ } ++ ++ public long getOffset() { ++ return offset; ++ } ++ ++ public int getSize() { ++ return size; ++ } ++ ++ public int getTimeDiff() { ++ return timeDiff; ++ } ++ ++ public int getItemIndex() { ++ return itemIndex; ++ } ++ ++ @Override ++ public String toString() { ++ return "IndexItem{" + ++ "hashCode=" + hashCode + ++ ", topicId=" + topicId + ++ ", queueId=" + queueId + ++ ", offset=" + offset + ++ ", size=" + size + ++ ", timeDiff=" + timeDiff + ++ ", position=" + itemIndex + ++ '}'; ++ } ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java +new file mode 100644 +index 000000000..d4eb854a2 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java +@@ -0,0 +1,62 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.tieredstore.index; ++ ++import java.util.List; ++import java.util.Set; ++import java.util.concurrent.CompletableFuture; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++ ++public interface IndexService { ++ ++ /** ++ * Puts a key into the index. ++ * ++ * @param topic The topic of the key. ++ * @param topicId The ID of the topic. ++ * @param queueId The ID of the queue. ++ * @param keySet The set of keys to be indexed. ++ * @param offset The offset value of the key. ++ * @param size The size of the key. ++ * @param timestamp The timestamp of the key. ++ * @return The result of the put operation. ++ */ ++ AppendResult putKey( ++ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp); ++ ++ /** ++ * Asynchronously queries the index for a specific key within a given time range. ++ * ++ * @param topic The topic of the key. ++ * @param key The key to be queried. ++ * @param beginTime The start time of the query range. ++ * @param endTime The end time of the query range. ++ * @return A CompletableFuture that holds the list of IndexItems matching the query. ++ */ ++ CompletableFuture> queryAsync(String topic, String key, int maxCount, long beginTime, long endTime); ++ ++ /** ++ * Shutdown the index service. ++ */ ++ void shutdown(); ++ ++ /** ++ * Destroys the index service and releases all resources. ++ */ ++ void destroy(); ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +new file mode 100644 +index 000000000..52a686f68 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +@@ -0,0 +1,499 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.index; ++ ++import com.google.common.base.Stopwatch; ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.nio.MappedByteBuffer; ++import java.nio.file.Paths; ++import java.util.ArrayList; ++import java.util.Collections; ++import java.util.List; ++import java.util.Optional; ++import java.util.Set; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicInteger; ++import java.util.concurrent.atomic.AtomicLong; ++import java.util.concurrent.atomic.AtomicReference; ++import java.util.concurrent.locks.ReadWriteLock; ++import java.util.concurrent.locks.ReentrantReadWriteLock; ++import java.util.stream.Collectors; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.store.logfile.DefaultMappedFile; ++import org.apache.rocketmq.store.logfile.MappedFile; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; ++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; ++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++ ++import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.SEALED; ++import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UNSEALED; ++import static org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UPLOAD; ++import static org.apache.rocketmq.tieredstore.index.IndexItem.COMPACT_INDEX_ITEM_SIZE; ++import static org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_COMPACTED_DIRECTORY_NAME; ++import static org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_DIRECTORY_NAME; ++ ++/** ++ * a single IndexFile in indexService ++ */ ++public class IndexStoreFile implements IndexFile { ++ ++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); ++ ++ /** ++ * header format: ++ * magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4) ++ */ ++ public static final int INDEX_MAGIC_CODE = 0; ++ public static final int INDEX_BEGIN_TIME_STAMP = 4; ++ public static final int INDEX_END_TIME_STAMP = 12; ++ public static final int INDEX_SLOT_COUNT = 20; ++ public static final int INDEX_ITEM_INDEX = 24; ++ public static final int INDEX_HEADER_SIZE = 28; ++ ++ public static final int BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4; ++ public static final int END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8; ++ ++ /** ++ * hash slot ++ */ ++ private static final int INVALID_INDEX = 0; ++ private static final int HASH_SLOT_SIZE = Long.BYTES; ++ private static final int MAX_QUERY_COUNT = 512; ++ ++ private final int hashSlotMaxCount; ++ private final int indexItemMaxCount; ++ ++ private final ReadWriteLock fileReadWriteLock; ++ private final AtomicReference fileStatus; ++ private final AtomicLong beginTimestamp = new AtomicLong(-1L); ++ private final AtomicLong endTimestamp = new AtomicLong(-1L); ++ private final AtomicInteger hashSlotCount = new AtomicInteger(0); ++ private final AtomicInteger indexItemCount = new AtomicInteger(0); ++ ++ private MappedFile mappedFile; ++ private ByteBuffer byteBuffer; ++ private MappedFile compactMappedFile; ++ private TieredFileSegment fileSegment; ++ ++ public IndexStoreFile(TieredMessageStoreConfig storeConfig, long timestamp) throws IOException { ++ this.hashSlotMaxCount = storeConfig.getTieredStoreIndexFileMaxHashSlotNum(); ++ this.indexItemMaxCount = storeConfig.getTieredStoreIndexFileMaxIndexNum(); ++ this.fileStatus = new AtomicReference<>(UNSEALED); ++ this.fileReadWriteLock = new ReentrantReadWriteLock(); ++ this.mappedFile = new DefaultMappedFile( ++ Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString(), ++ this.getItemPosition(indexItemMaxCount)); ++ this.byteBuffer = this.mappedFile.getMappedByteBuffer(); ++ ++ this.beginTimestamp.set(timestamp); ++ this.endTimestamp.set(byteBuffer.getLong(INDEX_BEGIN_TIME_STAMP)); ++ this.hashSlotCount.set(byteBuffer.getInt(INDEX_SLOT_COUNT)); ++ this.indexItemCount.set(byteBuffer.getInt(INDEX_ITEM_INDEX)); ++ this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1); ++ } ++ ++ public IndexStoreFile(TieredMessageStoreConfig storeConfig, TieredFileSegment fileSegment) { ++ this.fileSegment = fileSegment; ++ this.fileStatus = new AtomicReference<>(UPLOAD); ++ this.fileReadWriteLock = new ReentrantReadWriteLock(); ++ ++ this.beginTimestamp.set(fileSegment.getMinTimestamp()); ++ this.endTimestamp.set(fileSegment.getMaxTimestamp()); ++ this.hashSlotCount.set(storeConfig.getTieredStoreIndexFileMaxHashSlotNum()); ++ this.indexItemCount.set(storeConfig.getTieredStoreIndexFileMaxIndexNum()); ++ this.hashSlotMaxCount = hashSlotCount.get(); ++ this.indexItemMaxCount = indexItemCount.get(); ++ } ++ ++ @Override ++ public long getTimestamp() { ++ return this.beginTimestamp.get(); ++ } ++ ++ public long getEndTimestamp() { ++ return this.endTimestamp.get(); ++ } ++ ++ public long getHashSlotCount() { ++ return this.hashSlotCount.get(); ++ } ++ ++ public long getIndexItemCount() { ++ return this.indexItemCount.get(); ++ } ++ ++ @Override ++ public IndexStatusEnum getFileStatus() { ++ return this.fileStatus.get(); ++ } ++ ++ protected String buildKey(String topic, String key) { ++ return String.format("%s#%s", topic, key); ++ } ++ ++ protected int hashCode(String keyStr) { ++ int keyHash = keyStr.hashCode(); ++ return (keyHash < 0) ? -keyHash : keyHash; ++ } ++ ++ protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) { ++ byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE); ++ byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get()); ++ byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get()); ++ byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get()); ++ byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get()); ++ } ++ ++ protected int getSlotPosition(int slotIndex) { ++ return INDEX_HEADER_SIZE + slotIndex * HASH_SLOT_SIZE; ++ } ++ ++ protected int getSlotValue(int slotPosition) { ++ return Math.max(this.byteBuffer.getInt(slotPosition), INVALID_INDEX); ++ } ++ ++ protected int getItemPosition(int itemIndex) { ++ return INDEX_HEADER_SIZE + hashSlotMaxCount * HASH_SLOT_SIZE + itemIndex * IndexItem.INDEX_ITEM_SIZE; ++ } ++ ++ @Override ++ public AppendResult putKey( ++ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp) { ++ ++ if (StringUtils.isBlank(topic)) { ++ return AppendResult.UNKNOWN_ERROR; ++ } ++ ++ if (keySet == null || keySet.isEmpty()) { ++ return AppendResult.SUCCESS; ++ } ++ ++ try { ++ fileReadWriteLock.writeLock().lock(); ++ ++ if (!UNSEALED.equals(fileStatus.get())) { ++ return AppendResult.FILE_FULL; ++ } ++ ++ if (this.indexItemCount.get() + keySet.size() >= this.indexItemMaxCount) { ++ this.fileStatus.set(IndexStatusEnum.SEALED); ++ return AppendResult.FILE_FULL; ++ } ++ ++ for (String key : keySet) { ++ int hashCode = this.hashCode(this.buildKey(topic, key)); ++ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); ++ int slotOldValue = this.getSlotValue(slotPosition); ++ int timeDiff = (int) ((timestamp - this.beginTimestamp.get()) / 1000L); ++ ++ IndexItem indexItem = new IndexItem( ++ topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue); ++ int itemIndex = this.indexItemCount.incrementAndGet(); ++ this.byteBuffer.position(this.getItemPosition(itemIndex)); ++ this.byteBuffer.put(indexItem.getByteBuffer()); ++ this.byteBuffer.putInt(slotPosition, itemIndex); ++ ++ if (slotOldValue <= INVALID_INDEX) { ++ this.hashSlotCount.incrementAndGet(); ++ } ++ if (this.endTimestamp.get() < timestamp) { ++ this.endTimestamp.set(timestamp); ++ } ++ this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1); ++ ++ log.trace("IndexStoreFile put key, timestamp: {}, topic: {}, key: {}, slot: {}, item: {}, previous item: {}, content: {}", ++ this.getTimestamp(), topic, key, hashCode % this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem); ++ } ++ return AppendResult.SUCCESS; ++ } catch (Exception e) { ++ log.error("IndexStoreFile put key error, topic: {}, topicId: {}, queueId: {}, keySet: {}, offset: {}, " + ++ "size: {}, timestamp: {}", topic, topicId, queueId, keySet, offset, size, timestamp, e); ++ } finally { ++ fileReadWriteLock.writeLock().unlock(); ++ } ++ ++ return AppendResult.UNKNOWN_ERROR; ++ } ++ ++ @Override ++ public CompletableFuture> queryAsync( ++ String topic, String key, int maxCount, long beginTime, long endTime) { ++ ++ switch (this.fileStatus.get()) { ++ case UNSEALED: ++ case SEALED: ++ return this.queryAsyncFromUnsealedFile(buildKey(topic, key), maxCount, beginTime, endTime); ++ case UPLOAD: ++ return this.queryAsyncFromSegmentFile(buildKey(topic, key), maxCount, beginTime, endTime); ++ case SHUTDOWN: ++ default: ++ return CompletableFuture.completedFuture(new ArrayList<>()); ++ } ++ } ++ ++ protected CompletableFuture> queryAsyncFromUnsealedFile( ++ String key, int maxCount, long beginTime, long endTime) { ++ ++ return CompletableFuture.supplyAsync(() -> { ++ List result = new ArrayList<>(); ++ try { ++ fileReadWriteLock.readLock().lock(); ++ if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) { ++ return result; ++ } ++ ++ if (mappedFile == null || !mappedFile.hold()) { ++ return result; ++ } ++ ++ int hashCode = this.hashCode(key); ++ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); ++ int slotValue = this.getSlotValue(slotPosition); ++ ++ int left = MAX_QUERY_COUNT; ++ while (left > 0 && ++ slotValue > INVALID_INDEX && ++ slotValue <= this.indexItemCount.get()) { ++ ++ byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE]; ++ ByteBuffer buffer = this.byteBuffer.duplicate(); ++ buffer.position(this.getItemPosition(slotValue)); ++ buffer.get(bytes); ++ IndexItem indexItem = new IndexItem(bytes); ++ if (hashCode == indexItem.getHashCode()) { ++ result.add(indexItem); ++ if (result.size() > maxCount) { ++ break; ++ } ++ } ++ slotValue = indexItem.getItemIndex(); ++ left--; ++ } ++ ++ log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " + ++ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", ++ getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime); ++ } catch (Exception e) { ++ log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " + ++ "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e); ++ } finally { ++ fileReadWriteLock.readLock().unlock(); ++ mappedFile.release(); ++ } ++ return result; ++ }, TieredStoreExecutor.fetchDataExecutor); ++ } ++ ++ protected CompletableFuture> queryAsyncFromSegmentFile( ++ String key, int maxCount, long beginTime, long endTime) { ++ ++ if (this.fileSegment == null || !UPLOAD.equals(this.fileStatus.get())) { ++ return CompletableFuture.completedFuture(Collections.emptyList()); ++ } ++ ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ int hashCode = this.hashCode(key); ++ int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); ++ ++ CompletableFuture> future = this.fileSegment.readAsync(slotPosition, HASH_SLOT_SIZE) ++ .thenCompose(slotBuffer -> { ++ if (slotBuffer.remaining() < HASH_SLOT_SIZE) { ++ log.error("IndexStoreFile query from tiered storage return error slot buffer, " + ++ "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime); ++ return CompletableFuture.completedFuture(null); ++ } ++ int indexPosition = slotBuffer.getInt(); ++ int indexTotalSize = Math.min(slotBuffer.getInt(), COMPACT_INDEX_ITEM_SIZE * 1024); ++ if (indexPosition <= INVALID_INDEX || indexTotalSize <= 0) { ++ return CompletableFuture.completedFuture(null); ++ } ++ return this.fileSegment.readAsync(indexPosition, indexTotalSize); ++ }) ++ .thenApply(itemBuffer -> { ++ List result = new ArrayList<>(); ++ if (itemBuffer == null) { ++ return result; ++ } ++ ++ if (itemBuffer.remaining() % COMPACT_INDEX_ITEM_SIZE != 0) { ++ log.error("IndexStoreFile query from tiered storage return error item buffer, " + ++ "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime); ++ return result; ++ } ++ ++ int size = itemBuffer.remaining() / COMPACT_INDEX_ITEM_SIZE; ++ byte[] bytes = new byte[COMPACT_INDEX_ITEM_SIZE]; ++ for (int i = 0; i < size; i++) { ++ itemBuffer.get(bytes); ++ IndexItem indexItem = new IndexItem(bytes); ++ long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); ++ if (hashCode == indexItem.getHashCode() && ++ beginTime <= storeTimestamp && storeTimestamp <= endTime && ++ result.size() < maxCount) { ++ result.add(indexItem); ++ } ++ } ++ return result; ++ }); ++ ++ return future.whenComplete((result, throwable) -> { ++ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); ++ if (throwable != null) { ++ log.error("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, " + ++ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", ++ costTime, getTimestamp(), key, hashCode, maxCount, beginTime, endTime, throwable); ++ } else { ++ String details = Optional.ofNullable(result) ++ .map(r -> r.stream() ++ .map(item -> String.format("%d-%d", item.getQueueId(), item.getOffset())) ++ .collect(Collectors.joining(", "))) ++ .orElse(""); ++ ++ log.debug("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, result size: {}, ({}), " + ++ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", ++ costTime, getTimestamp(), result != null ? result.size() : 0, details, key, hashCode, maxCount, beginTime, endTime); ++ } ++ }); ++ } ++ ++ @Override ++ public ByteBuffer doCompaction() { ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ ByteBuffer buffer; ++ try { ++ buffer = compactToNewFile(); ++ log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms", ++ this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS)); ++ } catch (Exception e) { ++ log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms", ++ this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e); ++ return null; ++ } ++ ++ try { ++ // Make sure there is no read request here ++ fileReadWriteLock.writeLock().lock(); ++ fileStatus.set(IndexStatusEnum.SEALED); ++ } catch (Exception e) { ++ log.error("IndexStoreFile change file status to sealed error, timestamp={}", this.getTimestamp()); ++ } finally { ++ fileReadWriteLock.writeLock().unlock(); ++ } ++ return buffer; ++ } ++ ++ protected String getCompactedFilePath() { ++ return Paths.get(this.mappedFile.getFileName()).getParent() ++ .resolve(FILE_COMPACTED_DIRECTORY_NAME) ++ .resolve(String.valueOf(this.getTimestamp())).toString(); ++ } ++ ++ protected ByteBuffer compactToNewFile() throws IOException { ++ ++ byte[] payload = new byte[IndexItem.INDEX_ITEM_SIZE]; ++ ByteBuffer payloadBuffer = ByteBuffer.wrap(payload); ++ int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount * HASH_SLOT_SIZE); ++ int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE * indexItemCount.get(); ++ ++ compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), fileMaxLength); ++ MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer(); ++ ++ for (int i = 0; i < hashSlotMaxCount; i++) { ++ int slotPosition = this.getSlotPosition(i); ++ int slotValue = this.getSlotValue(slotPosition); ++ int writeBeginPosition = writePosition; ++ ++ while (slotValue > INVALID_INDEX && writePosition < fileMaxLength) { ++ ByteBuffer buffer = this.byteBuffer.duplicate(); ++ buffer.position(this.getItemPosition(slotValue)); ++ buffer.get(payload); ++ int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE); ++ buffer.limit(COMPACT_INDEX_ITEM_SIZE); ++ newBuffer.position(writePosition); ++ newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE); ++ log.trace("IndexStoreFile do compaction, write item, slot: {}, current: {}, next: {}", i, slotValue, newSlotValue); ++ slotValue = newSlotValue; ++ writePosition += COMPACT_INDEX_ITEM_SIZE; ++ } ++ ++ int length = writePosition - writeBeginPosition; ++ newBuffer.putInt(slotPosition, writeBeginPosition); ++ newBuffer.putInt(slotPosition + Integer.BYTES, length); ++ ++ if (length > 0) { ++ log.trace("IndexStoreFile do compaction, write slot, slot: {}, begin: {}, length: {}", i, writeBeginPosition, length); ++ } ++ } ++ ++ this.flushNewMetadata(newBuffer, true); ++ newBuffer.flip(); ++ return newBuffer; ++ } ++ ++ @Override ++ public void shutdown() { ++ try { ++ fileReadWriteLock.writeLock().lock(); ++ this.fileStatus.set(IndexStatusEnum.SHUTDOWN); ++ if (this.mappedFile != null) { ++ this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); ++ this.mappedFile = null; ++ } ++ if (this.compactMappedFile != null) { ++ this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); ++ this.compactMappedFile = null; ++ } ++ } catch (Exception e) { ++ log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); ++ } finally { ++ fileReadWriteLock.writeLock().unlock(); ++ } ++ } ++ ++ @Override ++ public void destroy() { ++ try { ++ fileReadWriteLock.writeLock().lock(); ++ this.shutdown(); ++ switch (this.fileStatus.get()) { ++ case SHUTDOWN: ++ case UNSEALED: ++ case SEALED: ++ if (this.mappedFile != null) { ++ this.mappedFile.destroy(TimeUnit.SECONDS.toMillis(10)); ++ } ++ if (this.compactMappedFile != null) { ++ this.compactMappedFile.destroy(TimeUnit.SECONDS.toMillis(10)); ++ } ++ log.info("IndexStoreService destroy local file, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get()); ++ break; ++ case UPLOAD: ++ log.warn("[BUG] IndexStoreService destroy remote file, timestamp: {}", this.getTimestamp()); ++ } ++ } catch (Exception e) { ++ log.error("IndexStoreService destroy failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); ++ } finally { ++ fileReadWriteLock.writeLock().unlock(); ++ } ++ } ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +new file mode 100644 +index 000000000..14608aa58 +--- /dev/null ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +@@ -0,0 +1,362 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.index; ++ ++import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Stopwatch; ++import java.io.File; ++import java.nio.ByteBuffer; ++import java.nio.file.Paths; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Comparator; ++import java.util.List; ++import java.util.Map; ++import java.util.Set; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ConcurrentNavigableMap; ++import java.util.concurrent.ConcurrentSkipListMap; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicLong; ++import java.util.concurrent.locks.ReadWriteLock; ++import java.util.concurrent.locks.ReentrantReadWriteLock; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.ServiceThread; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.store.logfile.DefaultMappedFile; ++import org.apache.rocketmq.store.logfile.MappedFile; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.file.TieredFileAllocator; ++import org.apache.rocketmq.tieredstore.file.TieredFlatFile; ++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; ++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++ ++public class IndexStoreService extends ServiceThread implements IndexService { ++ ++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); ++ ++ public static final String FILE_DIRECTORY_NAME = "tiered_index_file"; ++ public static final String FILE_COMPACTED_DIRECTORY_NAME = "compacting"; ++ ++ /** ++ * File status in table example: ++ * upload, upload, upload, sealed, sealed, unsealed ++ */ ++ private final TieredMessageStoreConfig storeConfig; ++ private final ConcurrentSkipListMap timeStoreTable; ++ private final ReadWriteLock readWriteLock; ++ private final AtomicLong compactTimestamp; ++ private final String filePath; ++ private final TieredFileAllocator fileAllocator; ++ ++ private IndexFile currentWriteFile; ++ private TieredFlatFile flatFile; ++ ++ public IndexStoreService(TieredFileAllocator fileAllocator, String filePath) { ++ this.storeConfig = fileAllocator.getStoreConfig(); ++ this.filePath = filePath; ++ this.fileAllocator = fileAllocator; ++ this.timeStoreTable = new ConcurrentSkipListMap<>(); ++ this.compactTimestamp = new AtomicLong(0L); ++ this.readWriteLock = new ReentrantReadWriteLock(); ++ this.recover(); ++ } ++ ++ private void doConvertOldFormatFile(String filePath) { ++ try { ++ File file = new File(filePath); ++ if (!file.exists()) { ++ return; ++ } ++ MappedFile mappedFile = new DefaultMappedFile(file.getPath(), (int) file.length()); ++ long timestamp = mappedFile.getMappedByteBuffer().getLong(IndexStoreFile.INDEX_BEGIN_TIME_STAMP); ++ if (timestamp <= 0) { ++ mappedFile.destroy(TimeUnit.SECONDS.toMillis(10)); ++ } else { ++ mappedFile.renameTo(String.valueOf(new File(file.getParent(), String.valueOf(timestamp)))); ++ mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); ++ } ++ } catch (Exception e) { ++ log.error("IndexStoreService do convert old format error, file: {}", filePath, e); ++ } ++ } ++ ++ private void recover() { ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ ++ // recover local ++ File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString()); ++ this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString()); ++ this.doConvertOldFormatFile(Paths.get(dir.getPath(), "1111").toString()); ++ File[] files = dir.listFiles(); ++ ++ if (files != null) { ++ List fileList = Arrays.asList(files); ++ fileList.sort(Comparator.comparing(File::getName)); ++ ++ for (File file : fileList) { ++ if (file.isDirectory() || !StringUtils.isNumeric(file.getName())) { ++ continue; ++ } ++ ++ try { ++ IndexFile indexFile = new IndexStoreFile(storeConfig, Long.parseLong(file.getName())); ++ timeStoreTable.put(indexFile.getTimestamp(), indexFile); ++ log.info("IndexStoreService recover load local file, timestamp: {}", indexFile.getTimestamp()); ++ } catch (Exception e) { ++ log.error("IndexStoreService recover, load local file error", e); ++ } ++ } ++ } ++ ++ if (this.timeStoreTable.isEmpty()) { ++ this.createNewIndexFile(System.currentTimeMillis()); ++ } ++ ++ this.currentWriteFile = this.timeStoreTable.lastEntry().getValue(); ++ this.setCompactTimestamp(this.timeStoreTable.firstKey() - 1); ++ ++ // recover remote ++ this.flatFile = fileAllocator.createFlatFileForIndexFile(filePath); ++ if (this.flatFile.getBaseOffset() == -1) { ++ this.flatFile.setBaseOffset(0); ++ } ++ ++ for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) { ++ IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment); ++ timeStoreTable.put(indexFile.getTimestamp(), indexFile); ++ log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp()); ++ } ++ ++ log.info("IndexStoreService recover finished, entrySize: {}, cost: {}ms, directory: {}", ++ timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS), dir.getAbsolutePath()); ++ } ++ ++ public void createNewIndexFile(long timestamp) { ++ try { ++ this.readWriteLock.writeLock().lock(); ++ IndexFile indexFile = this.currentWriteFile; ++ if (this.timeStoreTable.containsKey(timestamp) || ++ indexFile != null && IndexFile.IndexStatusEnum.UNSEALED.equals(indexFile.getFileStatus())) { ++ return; ++ } ++ IndexStoreFile newStoreFile = new IndexStoreFile(storeConfig, timestamp); ++ this.timeStoreTable.put(timestamp, newStoreFile); ++ this.currentWriteFile = newStoreFile; ++ log.info("IndexStoreService construct next file, timestamp: {}", timestamp); ++ } catch (Exception e) { ++ log.error("IndexStoreService construct next file, timestamp: {}", timestamp, e); ++ } finally { ++ this.readWriteLock.writeLock().unlock(); ++ } ++ } ++ ++ @VisibleForTesting ++ public ConcurrentSkipListMap getTimeStoreTable() { ++ return timeStoreTable; ++ } ++ ++ @Override ++ public AppendResult putKey( ++ String topic, int topicId, int queueId, Set keySet, long offset, int size, long timestamp) { ++ ++ if (StringUtils.isBlank(topic)) { ++ return AppendResult.UNKNOWN_ERROR; ++ } ++ ++ if (keySet == null || keySet.isEmpty()) { ++ return AppendResult.SUCCESS; ++ } ++ ++ for (int i = 0; i < 3; i++) { ++ AppendResult result = this.currentWriteFile.putKey( ++ topic, topicId, queueId, keySet, offset, size, timestamp); ++ ++ if (AppendResult.SUCCESS.equals(result)) { ++ return AppendResult.SUCCESS; ++ } else if (AppendResult.FILE_FULL.equals(result)) { ++ this.createNewIndexFile(timestamp); ++ } ++ } ++ ++ log.error("IndexStoreService put key three times return error, topic: {}, topicId: {}, " + ++ "queueId: {}, keySize: {}, timestamp: {}", topic, topicId, queueId, keySet.size(), timestamp); ++ return AppendResult.UNKNOWN_ERROR; ++ } ++ ++ @Override ++ public CompletableFuture> queryAsync( ++ String topic, String key, int maxCount, long beginTime, long endTime) { ++ ++ CompletableFuture> future = new CompletableFuture<>(); ++ try { ++ readWriteLock.readLock().lock(); ++ ConcurrentNavigableMap pendingMap = ++ this.timeStoreTable.subMap(beginTime, true, endTime, true); ++ List> futureList = new ArrayList<>(pendingMap.size()); ++ ConcurrentHashMap result = new ConcurrentHashMap<>(); ++ ++ for (Map.Entry entry : pendingMap.descendingMap().entrySet()) { ++ CompletableFuture completableFuture = entry.getValue() ++ .queryAsync(topic, key, maxCount, beginTime, endTime) ++ .thenAccept(itemList -> itemList.forEach(indexItem -> { ++ if (result.size() < maxCount) { ++ result.put(String.format( ++ "%d-%d", indexItem.getQueueId(), indexItem.getOffset()), indexItem); ++ } ++ })); ++ futureList.add(completableFuture); ++ } ++ ++ CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])) ++ .whenComplete((v, t) -> { ++ // Try to return the query results as much as possible here ++ // rather than directly throwing exceptions ++ if (result.isEmpty() && t != null) { ++ future.completeExceptionally(t); ++ } else { ++ List resultList = new ArrayList<>(result.values()); ++ future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount))); ++ } ++ }); ++ } catch (Exception e) { ++ future.completeExceptionally(e); ++ } finally { ++ readWriteLock.readLock().unlock(); ++ } ++ return future; ++ } ++ ++ public void doCompactThenUploadFile(IndexFile indexFile) { ++ if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) { ++ log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}", ++ indexFile.getTimestamp(), indexFile.getFileStatus()); ++ return; ++ } ++ ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ ByteBuffer byteBuffer = indexFile.doCompaction(); ++ if (byteBuffer == null) { ++ log.error("IndexStoreService found compaction buffer is null, timestamp: {}", indexFile.getTimestamp()); ++ return; ++ } ++ flatFile.append(byteBuffer); ++ flatFile.commit(true); ++ ++ TieredFileSegment fileSegment = flatFile.getFileByIndex(flatFile.getFileSegmentCount() - 1); ++ if (fileSegment == null || fileSegment.getMinTimestamp() != indexFile.getTimestamp()) { ++ log.warn("IndexStoreService submit compacted file to server failed, timestamp: {}", indexFile.getTimestamp()); ++ return; ++ } ++ ++ try { ++ readWriteLock.writeLock().lock(); ++ IndexFile storeFile = new IndexStoreFile(storeConfig, fileSegment); ++ timeStoreTable.put(indexFile.getTimestamp(), storeFile); ++ indexFile.destroy(); ++ } catch (Exception e) { ++ log.error("IndexStoreService switch file failed, timestamp: {}, cost: {}ms", ++ indexFile.getTimestamp(), stopwatch.elapsed(TimeUnit.MILLISECONDS), e); ++ } finally { ++ readWriteLock.writeLock().unlock(); ++ } ++ } ++ ++ public void destroyExpiredFile(long expireTimestamp) { ++ flatFile.cleanExpiredFile(expireTimestamp); ++ flatFile.destroyExpiredFile(); ++ } ++ ++ public void destroy() { ++ try { ++ readWriteLock.writeLock().lock(); ++ ++ // delete local store file ++ for (Map.Entry entry : timeStoreTable.entrySet()) { ++ IndexFile indexFile = entry.getValue(); ++ if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) { ++ continue; ++ } ++ indexFile.destroy(); ++ } ++ ++ // delete remote ++ if (flatFile != null) { ++ flatFile.destroy(); ++ } ++ } catch (Exception e) { ++ log.error("IndexStoreService destroy all file error", e); ++ } finally { ++ readWriteLock.writeLock().unlock(); ++ } ++ } ++ ++ @Override ++ public String getServiceName() { ++ return IndexStoreService.class.getSimpleName(); ++ } ++ ++ public void setCompactTimestamp(long timestamp) { ++ this.compactTimestamp.set(timestamp); ++ log.info("IndexStoreService compact timestamp has been set to: {}", timestamp); ++ } ++ ++ protected IndexFile getNextSealedFile() { ++ try { ++ Map.Entry entry = ++ this.timeStoreTable.higherEntry(this.compactTimestamp.get()); ++ if (entry != null && entry.getKey() < this.timeStoreTable.lastKey()) { ++ return entry.getValue(); ++ } ++ } catch (Throwable e) { ++ log.error("Error occurred in " + getServiceName(), e); ++ } ++ return null; ++ } ++ ++ @Override ++ public void run() { ++ log.info(this.getServiceName() + " service started"); ++ while (!this.isStopped()) { ++ long expireTimestamp = System.currentTimeMillis() ++ - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); ++ this.destroyExpiredFile(expireTimestamp); ++ ++ IndexFile indexFile = this.getNextSealedFile(); ++ if (indexFile == null) { ++ this.waitForRunning(TimeUnit.SECONDS.toMillis(10)); ++ continue; ++ } ++ this.doCompactThenUploadFile(indexFile); ++ this.setCompactTimestamp(indexFile.getTimestamp()); ++ } ++ log.info(this.getServiceName() + " service shutdown"); ++ } ++ ++ @Override ++ public void shutdown() { ++ super.shutdown(); ++ for (Map.Entry entry : timeStoreTable.entrySet()) { ++ entry.getValue().shutdown(); ++ } ++ this.timeStoreTable.clear(); ++ log.info("IndexStoreService shutdown gracefully"); ++ } ++} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +index 32911a6e8..aad42de98 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +@@ -31,12 +31,14 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; + import org.apache.rocketmq.tieredstore.exception.TieredStoreException; + import org.apache.rocketmq.tieredstore.file.TieredCommitLog; + import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; +-import org.apache.rocketmq.tieredstore.file.TieredIndexFile; + import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; + import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + ++import static org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_BEGIN_TIME_STAMP; ++import static org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_END_TIME_STAMP; ++ + public abstract class TieredFileSegment implements Comparable, TieredStoreProvider { + + private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); +@@ -198,8 +200,9 @@ public abstract class TieredFileSegment implements Comparable + } + + if (fileType == FileSegmentType.INDEX) { +- minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION); +- maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION); ++ minTimestamp = byteBuf.getLong(INDEX_BEGIN_TIME_STAMP); ++ maxTimestamp = byteBuf.getLong(INDEX_END_TIME_STAMP); ++ + appendPosition += byteBuf.remaining(); + // IndexFile is large and not change after compaction, no need deep copy + bufferList.add(byteBuf); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java +index 0db3eaf8f..b9938b7a8 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java +@@ -59,7 +59,7 @@ public interface TieredStoreProvider { + * Get data from backend file system + * + * @param position the index from where the file will be read +- * @param length the data size will be read ++ * @param length the data size will be read + * @return data to be read + */ + CompletableFuture read0(long position, int length); +@@ -68,10 +68,10 @@ public interface TieredStoreProvider { + * Put data to backend file system + * + * @param inputStream data stream +- * @param position backend file position to put, used in append mode +- * @param length data size in stream +- * @param append try to append or create a new file ++ * @param position backend file position to put, used in append mode ++ * @param length data size in stream ++ * @param append try to append or create a new file + * @return put result, true if data successfully write; false otherwise + */ +- CompletableFuture commit0(FileSegmentInputStream inputStream,long position, int length, boolean append); ++ CompletableFuture commit0(FileSegmentInputStream inputStream, long position, int length, boolean append); + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +index 7e949cb28..ee56b1e68 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +@@ -159,6 +159,7 @@ public class PosixFileSegment extends TieredFileSegment { + readFileChannel.position(position); + readFileChannel.read(byteBuffer); + byteBuffer.flip(); ++ byteBuffer.limit(length); + + attributesBuilder.put(LABEL_SUCCESS, true); + long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +index 774c6cf64..4e0d7e697 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +@@ -37,7 +37,6 @@ import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; + import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; + import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; +-import org.apache.rocketmq.tieredstore.file.TieredIndexFile; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +@@ -83,6 +82,7 @@ public class TieredMessageFetcherTest { + Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, getMessageResult.getStatus()); + + CompositeFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq); ++ Assert.assertNotNull(flatFile); + flatFile.initOffset(0); + + getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), mq.getQueueId(), 0, 32, null).join(); +@@ -197,6 +197,7 @@ public class TieredMessageFetcherTest { + public void testGetMessageStoreTimeStampAsync() { + TieredMessageFetcher fetcher = new TieredMessageFetcher(storeConfig); + CompositeFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq); ++ Assert.assertNotNull(flatFile); + flatFile.initOffset(0); + + ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer(); +@@ -270,6 +271,7 @@ public class TieredMessageFetcherTest { + CompositeQueueFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq); + Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join().getMessageMapedList().size()); + ++ Assert.assertNotNull(flatFile); + flatFile.initOffset(0); + ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); + buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0); +@@ -281,20 +283,19 @@ public class TieredMessageFetcherTest { + buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2); + flatFile.appendCommitLog(buffer); + +- DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, null); ++ long timestamp = System.currentTimeMillis(); ++ DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, timestamp, 0, "", "key", 0, 0, null); + flatFile.appendIndexFile(request); +- request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, null); ++ request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 1, 0, "", "key", 0, 0, null); + flatFile.appendIndexFile(request); +- request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "another-key", 0, 0, null); ++ request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 2, 0, "", "another-key", 0, 0, null); + flatFile.appendIndexFile(request); + flatFile.commit(true); +- TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig); +- indexFile.commit(true); + Assert.assertEquals(1, fetcher.queryMessageAsync(mq.getTopic(), "key", 1, 0, Long.MAX_VALUE).join().getMessageMapedList().size()); + + QueryMessageResult result = fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join(); + Assert.assertEquals(2, result.getMessageMapedList().size()); +- Assert.assertEquals(1, result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION)); +- Assert.assertEquals(0, result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION)); ++ Assert.assertEquals(0, result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION)); ++ Assert.assertEquals(1, result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION)); + } + } +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +deleted file mode 100644 +index 2da72bc7a..000000000 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java ++++ /dev/null +@@ -1,93 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.rocketmq.tieredstore.file; +- +-import com.sun.jna.Platform; +-import java.io.IOException; +-import java.nio.ByteBuffer; +-import java.time.Duration; +-import java.util.List; +-import org.apache.commons.lang3.tuple.Pair; +-import org.apache.rocketmq.common.message.MessageQueue; +-import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; +-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; +-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +-import org.awaitility.Awaitility; +-import org.junit.After; +-import org.junit.Assert; +-import org.junit.Before; +-import org.junit.Test; +- +-public class TieredIndexFileTest { +- +- private final String storePath = TieredStoreTestUtil.getRandomStorePath(); +- private MessageQueue mq; +- private TieredMessageStoreConfig storeConfig; +- +- @Before +- public void setUp() { +- storeConfig = new TieredMessageStoreConfig(); +- storeConfig.setBrokerName("IndexFileBroker"); +- storeConfig.setStorePathRootDir(storePath); +- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); +- storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5); +- storeConfig.setTieredStoreIndexFileMaxIndexNum(20); +- mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1); +- TieredStoreUtil.getMetadataStore(storeConfig); +- TieredStoreExecutor.init(); +- } +- +- @After +- public void tearDown() throws IOException { +- TieredStoreTestUtil.destroyMetadataStore(); +- TieredStoreTestUtil.destroyTempDir(storePath); +- TieredStoreExecutor.shutdown(); +- } +- +- @Test +- public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException { +- if (Platform.isWindows()) { +- return; +- } +- +- TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig); +- TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath); +- +- indexFile.append(mq, 0, "key3", 3, 300, 1000); +- indexFile.append(mq, 0, "key2", 2, 200, 1100); +- indexFile.append(mq, 0, "key1", 1, 100, 1200); +- +- // do not do schedule task here +- TieredStoreExecutor.shutdown(); +- List> indexList = +- indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); +- Assert.assertEquals(0, indexList.size()); +- +- // do compaction once +- TieredStoreExecutor.init(); +- storeConfig.setTieredStoreIndexFileRollingIdleInterval(0); +- indexFile.doScheduleTask(); +- Awaitility.await().atMost(Duration.ofSeconds(10)) +- .until(() -> !indexFile.getPreMappedFile().getFile().exists()); +- +- indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); +- Assert.assertEquals(1, indexList.size()); +- +- indexFile.destroy(); +- } +-} +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java +new file mode 100644 +index 000000000..22ed4cc18 +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java +@@ -0,0 +1,91 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.tieredstore.index; ++ ++import java.nio.ByteBuffer; ++import org.junit.Assert; ++import org.junit.Test; ++ ++public class IndexItemTest { ++ ++ private final int topicId = 1; ++ private final int queueId = 2; ++ private final long offset = 3L; ++ private final int size = 4; ++ private final int hashCode = 5; ++ private final int timeDiff = 6; ++ private final int itemIndex = 7; ++ ++ @Test ++ public void indexItemConstructorTest() { ++ IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex); ++ ++ Assert.assertEquals(topicId, indexItem.getTopicId()); ++ Assert.assertEquals(queueId, indexItem.getQueueId()); ++ Assert.assertEquals(offset, indexItem.getOffset()); ++ Assert.assertEquals(size, indexItem.getSize()); ++ Assert.assertEquals(hashCode, indexItem.getHashCode()); ++ Assert.assertEquals(timeDiff, indexItem.getTimeDiff()); ++ Assert.assertEquals(itemIndex, indexItem.getItemIndex()); ++ } ++ ++ @Test ++ public void byteBufferConstructorTest() { ++ ByteBuffer byteBuffer = ByteBuffer.allocate(IndexItem.INDEX_ITEM_SIZE); ++ byteBuffer.putInt(hashCode); ++ byteBuffer.putInt(topicId); ++ byteBuffer.putInt(queueId); ++ byteBuffer.putLong(offset); ++ byteBuffer.putInt(size); ++ byteBuffer.putInt(timeDiff); ++ byteBuffer.putInt(itemIndex); ++ ++ byte[] bytes = byteBuffer.array(); ++ IndexItem indexItem = new IndexItem(bytes); ++ ++ Assert.assertEquals(topicId, indexItem.getTopicId()); ++ Assert.assertEquals(queueId, indexItem.getQueueId()); ++ Assert.assertEquals(offset, indexItem.getOffset()); ++ Assert.assertEquals(size, indexItem.getSize()); ++ Assert.assertEquals(hashCode, indexItem.getHashCode()); ++ Assert.assertEquals(timeDiff, indexItem.getTimeDiff()); ++ Assert.assertEquals(itemIndex, indexItem.getItemIndex()); ++ Assert.assertNotNull(indexItem.toString()); ++ ++ Exception exception = null; ++ try { ++ new IndexItem(null); ++ } catch (Exception e) { ++ exception = e; ++ } ++ Assert.assertNotNull(exception); ++ } ++ ++ @Test ++ public void getByteBufferTest() { ++ IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex); ++ ByteBuffer byteBuffer = indexItem.getByteBuffer(); ++ Assert.assertEquals(hashCode, byteBuffer.getInt(0)); ++ Assert.assertEquals(topicId, byteBuffer.getInt(4)); ++ Assert.assertEquals(queueId, byteBuffer.getInt(8)); ++ Assert.assertEquals(offset, byteBuffer.getLong(12)); ++ Assert.assertEquals(size, byteBuffer.getInt(20)); ++ Assert.assertEquals(timeDiff, byteBuffer.getInt(24)); ++ Assert.assertEquals(itemIndex, byteBuffer.getInt(28)); ++ } ++} +\ No newline at end of file +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java +new file mode 100644 +index 000000000..b408a7c3c +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java +@@ -0,0 +1,282 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.index; ++ ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.nio.file.Paths; ++import java.util.Collections; ++import java.util.List; ++import java.util.Set; ++import java.util.UUID; ++import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.common.FileSegmentType; ++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; ++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; ++import org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++public class IndexStoreFileTest { ++ ++ private static final String TOPIC_NAME = "TopicTest"; ++ private static final int TOPIC_ID = 123; ++ private static final int QUEUE_ID = 2; ++ private static final long MESSAGE_OFFSET = 666L; ++ private static final int MESSAGE_SIZE = 1024; ++ private static final String KEY = "MessageKey"; ++ private static final Set KEY_SET = Collections.singleton(KEY); ++ ++ private String filePath; ++ private TieredMessageStoreConfig storeConfig; ++ private IndexStoreFile indexStoreFile; ++ ++ @Before ++ public void init() throws IOException { ++ TieredStoreExecutor.init(); ++ filePath = UUID.randomUUID().toString().replace("-", "").substring(0, 8); ++ String directory = Paths.get(System.getProperty("user.home"), "store_test", filePath).toString(); ++ storeConfig = new TieredMessageStoreConfig(); ++ storeConfig.setStorePathRootDir(directory); ++ storeConfig.setTieredStoreFilePath(directory); ++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5); ++ storeConfig.setTieredStoreIndexFileMaxIndexNum(20); ++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); ++ indexStoreFile = new IndexStoreFile(storeConfig, System.currentTimeMillis()); ++ } ++ ++ @After ++ public void shutdown() { ++ if (this.indexStoreFile != null) { ++ this.indexStoreFile.shutdown(); ++ this.indexStoreFile.destroy(); ++ } ++ TieredStoreTestUtil.destroyMetadataStore(); ++ TieredStoreTestUtil.destroyTempDir(storeConfig.getStorePathRootDir()); ++ TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath()); ++ TieredStoreExecutor.shutdown(); ++ } ++ ++ @Test ++ public void testIndexHeaderConstants() { ++ Assert.assertEquals(0, IndexStoreFile.INDEX_MAGIC_CODE); ++ Assert.assertEquals(4, IndexStoreFile.INDEX_BEGIN_TIME_STAMP); ++ Assert.assertEquals(12, IndexStoreFile.INDEX_END_TIME_STAMP); ++ Assert.assertEquals(20, IndexStoreFile.INDEX_SLOT_COUNT); ++ Assert.assertEquals(24, IndexStoreFile.INDEX_ITEM_INDEX); ++ Assert.assertEquals(28, IndexStoreFile.INDEX_HEADER_SIZE); ++ Assert.assertEquals(0xCCDDEEFF ^ 1880681586 + 4, IndexStoreFile.BEGIN_MAGIC_CODE); ++ Assert.assertEquals(0xCCDDEEFF ^ 1880681586 + 8, IndexStoreFile.END_MAGIC_CODE); ++ } ++ ++ @Test ++ public void basicMethodTest() throws IOException { ++ long timestamp = System.currentTimeMillis(); ++ IndexStoreFile localFile = new IndexStoreFile(storeConfig, timestamp); ++ Assert.assertEquals(timestamp, localFile.getTimestamp()); ++ ++ // test file status ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, localFile.getFileStatus()); ++ localFile.doCompaction(); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, localFile.getFileStatus()); ++ ++ // test hash ++ Assert.assertEquals("TopicTest#MessageKey", localFile.buildKey(TOPIC_NAME, KEY)); ++ Assert.assertEquals(638347386, indexStoreFile.hashCode(localFile.buildKey(TOPIC_NAME, KEY))); ++ ++ // test calculate position ++ long headerSize = IndexStoreFile.INDEX_HEADER_SIZE; ++ Assert.assertEquals(headerSize + Long.BYTES * 2, indexStoreFile.getSlotPosition(2)); ++ Assert.assertEquals(headerSize + Long.BYTES * 5, indexStoreFile.getSlotPosition(5)); ++ Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 2, ++ indexStoreFile.getItemPosition(2)); ++ Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 5, ++ indexStoreFile.getItemPosition(5)); ++ } ++ ++ @Test ++ public void basicPutGetTest() { ++ long timestamp = indexStoreFile.getTimestamp(); ++ ++ // check metadata ++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp()); ++ Assert.assertEquals(0, indexStoreFile.getEndTimestamp()); ++ Assert.assertEquals(0, indexStoreFile.getIndexItemCount()); ++ Assert.assertEquals(0, indexStoreFile.getHashSlotCount()); ++ ++ // not put success ++ Assert.assertEquals(AppendResult.UNKNOWN_ERROR, indexStoreFile.putKey( ++ null, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, null, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.emptySet(), MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ ++ // first item is invalid ++ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum() - 2; i++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp()); ++ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp()); ++ Assert.assertEquals(1, indexStoreFile.getHashSlotCount()); ++ Assert.assertEquals(i + 1, indexStoreFile.getIndexItemCount()); ++ } ++ ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ ++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp()); ++ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp()); ++ Assert.assertEquals(1, indexStoreFile.getHashSlotCount()); ++ Assert.assertEquals(storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1, indexStoreFile.getIndexItemCount()); ++ } ++ ++ @Test ++ public void differentKeyPutTest() { ++ long timestamp = indexStoreFile.getTimestamp(); ++ for (int i = 0; i < 5; i++) { ++ for (int j = 0; j < 3; j++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME + i, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ } ++ } ++ Assert.assertEquals(timestamp, indexStoreFile.getTimestamp()); ++ Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp()); ++ Assert.assertEquals(5, indexStoreFile.getHashSlotCount()); ++ Assert.assertEquals(5 * 3, indexStoreFile.getIndexItemCount()); ++ } ++ ++ @Test ++ public void concurrentPutTest() throws InterruptedException { ++ long timestamp = indexStoreFile.getTimestamp(); ++ ++ ExecutorService executorService = Executors.newFixedThreadPool( ++ 4, new ThreadFactoryImpl("ConcurrentPutGetTest")); ++ ++ // first item is invalid ++ int indexCount = storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1; ++ CountDownLatch latch = new CountDownLatch(indexCount); ++ for (int i = 0; i < indexCount; i++) { ++ executorService.submit(() -> { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ try { ++ Thread.sleep(100); ++ } catch (InterruptedException ignored) { ++ } ++ latch.countDown(); ++ }); ++ } ++ latch.await(); ++ ++ executorService.shutdown(); ++ Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount()); ++ } ++ ++ @Test ++ public void recoverFileTest() throws IOException { ++ int indexCount = 10; ++ long timestamp = indexStoreFile.getTimestamp(); ++ for (int i = 0; i < indexCount; i++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ } ++ indexStoreFile.shutdown(); ++ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount()); ++ indexStoreFile = new IndexStoreFile(storeConfig, timestamp); ++ Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount()); ++ } ++ ++ @Test ++ public void doCompactionTest() throws Exception { ++ long timestamp = indexStoreFile.getTimestamp(); ++ for (int i = 0; i < 10; i++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ } ++ ++ ByteBuffer byteBuffer = indexStoreFile.doCompaction(); ++ TieredFileSegment fileSegment = new PosixFileSegment( ++ storeConfig, FileSegmentType.INDEX, filePath, 0L); ++ fileSegment.append(byteBuffer, timestamp); ++ fileSegment.commit(); ++ Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize()); ++ fileSegment.destroyFile(); ++ } ++ ++ @Test ++ public void queryAsyncFromUnsealedFileTest() throws Exception { ++ long timestamp = indexStoreFile.getTimestamp(); ++ for (int i = 0; i < 5; i++) { ++ for (int j = 0; j < 3; j++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i, ++ TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis())); ++ } ++ } ++ List itemList = indexStoreFile.queryAsync( ++ TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(3, itemList.size()); ++ } ++ ++ @Test ++ public void queryAsyncFromSegmentFileTest() throws ExecutionException, InterruptedException { ++ long timestamp = indexStoreFile.getTimestamp(); ++ for (int i = 0; i < 5; i++) { ++ for (int j = 0; j < 3; j++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i, ++ TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis())); ++ } ++ } ++ ++ ByteBuffer byteBuffer = indexStoreFile.doCompaction(); ++ TieredFileSegment fileSegment = new PosixFileSegment( ++ storeConfig, FileSegmentType.INDEX, filePath, 0L); ++ fileSegment.append(byteBuffer, timestamp); ++ fileSegment.commit(); ++ Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize()); ++ indexStoreFile.destroy(); ++ ++ indexStoreFile = new IndexStoreFile(storeConfig, fileSegment); ++ ++ // change topic ++ List itemList = indexStoreFile.queryAsync( ++ TOPIC_NAME, KEY, 64, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(0, itemList.size()); ++ ++ // change key ++ itemList = indexStoreFile.queryAsync( ++ TOPIC_NAME, KEY + "1", 64, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(0, itemList.size()); ++ ++ itemList = indexStoreFile.queryAsync( ++ TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(3, itemList.size()); ++ } ++} +\ No newline at end of file +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java +new file mode 100644 +index 000000000..57d00eefe +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java +@@ -0,0 +1,147 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.tieredstore.index; ++ ++import com.google.common.base.Stopwatch; ++import java.io.File; ++import java.io.IOException; ++import java.nio.file.Paths; ++import java.util.Collections; ++import java.util.List; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.LongAdder; ++import org.apache.rocketmq.common.UtilAll; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; ++import org.apache.rocketmq.tieredstore.file.TieredFileAllocator; ++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.junit.Assert; ++import org.junit.Ignore; ++import org.openjdk.jmh.annotations.BenchmarkMode; ++import org.openjdk.jmh.annotations.Fork; ++import org.openjdk.jmh.annotations.Measurement; ++import org.openjdk.jmh.annotations.Mode; ++import org.openjdk.jmh.annotations.OutputTimeUnit; ++import org.openjdk.jmh.annotations.Scope; ++import org.openjdk.jmh.annotations.Setup; ++import org.openjdk.jmh.annotations.State; ++import org.openjdk.jmh.annotations.TearDown; ++import org.openjdk.jmh.annotations.Threads; ++import org.openjdk.jmh.annotations.Warmup; ++import org.openjdk.jmh.results.format.ResultFormatType; ++import org.openjdk.jmh.runner.Runner; ++import org.openjdk.jmh.runner.options.Options; ++import org.openjdk.jmh.runner.options.OptionsBuilder; ++ ++@Ignore ++@State(Scope.Benchmark) ++@Fork(value = 1, jvmArgs = {"-Djava.net.preferIPv4Stack=true", "-Djmh.rmi.port=1099"}) ++public class IndexStoreServiceBenchTest { ++ ++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); ++ private static final String TOPIC_NAME = "TopicTest"; ++ private TieredMessageStoreConfig storeConfig; ++ private IndexStoreService indexStoreService; ++ private final LongAdder failureCount = new LongAdder(); ++ ++ @Setup ++ public void init() throws ClassNotFoundException, NoSuchMethodException { ++ String storePath = Paths.get(System.getProperty("user.home"), "store_test", "index").toString(); ++ UtilAll.deleteFile(new File(storePath)); ++ UtilAll.deleteFile(new File("./e96d41b2_IndexService")); ++ storeConfig = new TieredMessageStoreConfig(); ++ storeConfig.setBrokerClusterName("IndexService"); ++ storeConfig.setBrokerName("IndexServiceBroker"); ++ storeConfig.setStorePathRootDir(storePath); ++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); ++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500 * 1000); ++ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000 * 1000); ++ TieredStoreUtil.getMetadataStore(storeConfig); ++ TieredStoreExecutor.init(); ++ TieredFileAllocator tieredFileAllocator = new TieredFileAllocator(storeConfig); ++ indexStoreService = new IndexStoreService(tieredFileAllocator, storePath); ++ indexStoreService.start(); ++ } ++ ++ @TearDown ++ public void shutdown() throws IOException { ++ indexStoreService.shutdown(); ++ indexStoreService.destroy(); ++ TieredStoreExecutor.shutdown(); ++ } ++ ++ //@Benchmark ++ @Threads(2) ++ @BenchmarkMode(Mode.Throughput) ++ @OutputTimeUnit(TimeUnit.SECONDS) ++ @Warmup(iterations = 1, time = 1) ++ @Measurement(iterations = 1, time = 1) ++ public void doPutThroughputBenchmark() { ++ for (int i = 0; i < 100; i++) { ++ AppendResult result = indexStoreService.putKey( ++ TOPIC_NAME, 123, 2, Collections.singleton(String.valueOf(i)), ++ i * 100L, i * 100, System.currentTimeMillis()); ++ if (AppendResult.SUCCESS.equals(result)) { ++ failureCount.increment(); ++ } ++ } ++ } ++ ++ @Threads(1) ++ @BenchmarkMode(Mode.AverageTime) ++ @OutputTimeUnit(TimeUnit.SECONDS) ++ @Warmup(iterations = 0) ++ @Measurement(iterations = 1, time = 1) ++ public void doGetThroughputBenchmark() throws ExecutionException, InterruptedException { ++ for (int j = 0; j < 10; j++) { ++ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) { ++ indexStoreService.putKey( ++ "TopicTest", 123, j, Collections.singleton(String.valueOf(i)), ++ i * 100L, i * 100, System.currentTimeMillis()); ++ } ++ } ++ ++ int queryCount = 100 * 10000; ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ for (int i = 0; i < queryCount; i++) { ++ List indexItems = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i), ++ 20, 0, System.currentTimeMillis()).get(); ++ Assert.assertEquals(10, indexItems.size()); ++ ++ List indexItems2 = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i), ++ 5, 0, System.currentTimeMillis()).get(); ++ Assert.assertEquals(5, indexItems2.size()); ++ } ++ log.info("DoGetThroughputBenchmark test cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); ++ } ++ ++ public static void main(String[] args) throws Exception { ++ Options opt = new OptionsBuilder() ++ .include(IndexStoreServiceBenchTest.class.getSimpleName()) ++ .warmupIterations(0) ++ .measurementIterations(1) ++ .result("result.json") ++ .resultFormat(ResultFormatType.JSON) ++ .build(); ++ new Runner(opt).run(); ++ } ++} +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +new file mode 100644 +index 000000000..20b4acbfa +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +@@ -0,0 +1,313 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.index; ++ ++import java.io.File; ++import java.io.IOException; ++import java.nio.file.Paths; ++import java.time.Duration; ++import java.util.ArrayList; ++import java.util.Collections; ++import java.util.List; ++import java.util.Set; ++import java.util.UUID; ++import java.util.concurrent.ConcurrentSkipListMap; ++import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.atomic.AtomicInteger; ++import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.store.logfile.DefaultMappedFile; ++import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; ++import org.apache.rocketmq.tieredstore.file.TieredFileAllocator; ++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++import static org.awaitility.Awaitility.await; ++ ++public class IndexStoreServiceTest { ++ ++ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); ++ ++ private static final String TOPIC_NAME = "TopicTest"; ++ private static final int TOPIC_ID = 123; ++ private static final int QUEUE_ID = 2; ++ private static final long MESSAGE_OFFSET = 666; ++ private static final int MESSAGE_SIZE = 1024; ++ private static final Set KEY_SET = Collections.singleton("MessageKey"); ++ ++ private String filePath; ++ private TieredMessageStoreConfig storeConfig; ++ private TieredFileAllocator fileAllocator; ++ private IndexStoreService indexService; ++ ++ @Before ++ public void init() throws IOException, ClassNotFoundException, NoSuchMethodException { ++ TieredStoreExecutor.init(); ++ filePath = UUID.randomUUID().toString().replace("-", "").substring(0, 8); ++ String directory = Paths.get(System.getProperty("user.home"), "store_test", filePath).toString(); ++ storeConfig = new TieredMessageStoreConfig(); ++ storeConfig.setStorePathRootDir(directory); ++ storeConfig.setTieredStoreFilePath(directory); ++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5); ++ storeConfig.setTieredStoreIndexFileMaxIndexNum(20); ++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); ++ fileAllocator = new TieredFileAllocator(storeConfig); ++ } ++ ++ @After ++ public void shutdown() { ++ if (indexService != null) { ++ indexService.shutdown(); ++ indexService.destroy(); ++ } ++ TieredStoreTestUtil.destroyMetadataStore(); ++ TieredStoreTestUtil.destroyTempDir(storeConfig.getStorePathRootDir()); ++ TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath()); ++ TieredStoreExecutor.shutdown(); ++ } ++ ++ @Test ++ public void basicServiceTest() throws InterruptedException { ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ for (int i = 0; i < 50; i++) { ++ Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, i * 100, MESSAGE_SIZE, System.currentTimeMillis())); ++ TimeUnit.MILLISECONDS.sleep(1); ++ } ++ ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable(); ++ Assert.assertEquals(3, timeStoreTable.size()); ++ } ++ ++ @Test ++ public void doConvertOldFormatTest() throws IOException { ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ long timestamp = indexService.getTimeStoreTable().firstKey(); ++ Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); ++ indexService.shutdown(); ++ ++ File file = new File(Paths.get(filePath, IndexStoreService.FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString()); ++ DefaultMappedFile mappedFile = new DefaultMappedFile(file.getName(), (int) file.length()); ++ mappedFile.renameTo(String.valueOf(new File(file.getParent(), "0000"))); ++ mappedFile.shutdown(10 * 1000); ++ ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable(); ++ Assert.assertEquals(1, timeStoreTable.size()); ++ Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey()); ++ mappedFile.destroy(10 * 1000); ++ } ++ ++ @Test ++ public void concurrentPutTest() throws InterruptedException { ++ ExecutorService executorService = Executors.newFixedThreadPool( ++ 4, new ThreadFactoryImpl("ConcurrentPutTest")); ++ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500); ++ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000); ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ long timestamp = System.currentTimeMillis(); ++ ++ // first item is invalid ++ AtomicInteger success = new AtomicInteger(); ++ int indexCount = 5000; ++ CountDownLatch latch = new CountDownLatch(indexCount); ++ for (int i = 0; i < indexCount; i++) { ++ final int index = i; ++ executorService.submit(() -> { ++ try { ++ AppendResult result = indexService.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(index)), ++ index * 100, MESSAGE_SIZE, timestamp + index); ++ if (AppendResult.SUCCESS.equals(result)) { ++ success.incrementAndGet(); ++ } ++ } catch (Exception e) { ++ log.error("ConcurrentPutTest error", e); ++ } finally { ++ latch.countDown(); ++ } ++ }); ++ } ++ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); ++ Assert.assertEquals(3, indexService.getTimeStoreTable().size()); ++ executorService.shutdown(); ++ } ++ ++ @Test ++ public void doCompactionTest() throws InterruptedException { ++ concurrentPutTest(); ++ IndexFile indexFile = indexService.getNextSealedFile(); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, indexFile.getFileStatus()); ++ ++ indexService.doCompactThenUploadFile(indexFile); ++ indexService.setCompactTimestamp(indexFile.getTimestamp()); ++ indexFile.destroy(); ++ ++ List files = new ArrayList<>(indexService.getTimeStoreTable().values()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, files.get(1).getFileStatus()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus()); ++ ++ indexFile = indexService.getNextSealedFile(); ++ indexService.doCompactThenUploadFile(indexFile); ++ indexService.setCompactTimestamp(indexFile.getTimestamp()); ++ files = new ArrayList<>(indexService.getTimeStoreTable().values()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(1).getFileStatus()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus()); ++ ++ indexFile = indexService.getNextSealedFile(); ++ Assert.assertNull(indexFile); ++ files = new ArrayList<>(indexService.getTimeStoreTable().values()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(0).getFileStatus()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, files.get(1).getFileStatus()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, files.get(2).getFileStatus()); ++ } ++ ++ @Test ++ public void runServiceTest() throws InterruptedException { ++ concurrentPutTest(); ++ indexService.start(); ++ await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> { ++ boolean result = true; ++ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values()); ++ result &= IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(0).getFileStatus()); ++ result &= IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(1).getFileStatus()); ++ result &= IndexFile.IndexStatusEnum.UNSEALED.equals(files.get(2).getFileStatus()); ++ return result; ++ }); ++ } ++ ++ @Test ++ public void restartServiceTest() throws InterruptedException { ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ for (int i = 0; i < 20; i++) { ++ AppendResult result = indexService.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)), ++ i * 100L, MESSAGE_SIZE, System.currentTimeMillis()); ++ Assert.assertEquals(AppendResult.SUCCESS, result); ++ TimeUnit.MILLISECONDS.sleep(1); ++ } ++ long timestamp = indexService.getTimeStoreTable().firstKey(); ++ indexService.shutdown(); ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue()); ++ ++ indexService.start(); ++ await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> { ++ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values()); ++ return IndexFile.IndexStatusEnum.UPLOAD.equals(files.get(0).getFileStatus()); ++ }); ++ indexService.shutdown(); ++ ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue()); ++ Assert.assertEquals(2, indexService.getTimeStoreTable().size()); ++ Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, ++ indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus()); ++ } ++ ++ @Test ++ public void queryFromFileTest() throws InterruptedException, ExecutionException { ++ long timestamp = System.currentTimeMillis(); ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ ++ // three files, echo contains 19 items ++ for (int i = 0; i < 3; i++) { ++ for (int j = 0; j < 20 - 1; j++) { ++ AppendResult result = indexService.putKey( ++ TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(j)), ++ i * 100L + j, MESSAGE_SIZE, System.currentTimeMillis()); ++ Assert.assertEquals(AppendResult.SUCCESS, result); ++ TimeUnit.MILLISECONDS.sleep(1); ++ } ++ } ++ ++ ArrayList files = new ArrayList<>(indexService.getTimeStoreTable().values()); ++ Assert.assertEquals(3, files.size()); ++ ++ for (int i = 0; i < 3; i++) { ++ List indexItems = indexService.queryAsync( ++ TOPIC_NAME, String.valueOf(1), 1, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(1, indexItems.size()); ++ ++ indexItems = indexService.queryAsync( ++ TOPIC_NAME, String.valueOf(1), 3, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(3, indexItems.size()); ++ ++ indexItems = indexService.queryAsync( ++ TOPIC_NAME, String.valueOf(1), 5, timestamp, System.currentTimeMillis()).get(); ++ Assert.assertEquals(3, indexItems.size()); ++ } ++ } ++ ++ @Test ++ public void concurrentGetTest() throws InterruptedException { ++ storeConfig.setTieredStoreIndexFileMaxIndexNum(2000); ++ indexService = new IndexStoreService(fileAllocator, filePath); ++ indexService.start(); ++ ++ int fileCount = 10; ++ for (int j = 0; j < fileCount; j++) { ++ for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) { ++ indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)), ++ i * 100L, i * 100, System.currentTimeMillis()); ++ } ++ TimeUnit.MILLISECONDS.sleep(1); ++ } ++ ++ CountDownLatch latch = new CountDownLatch(fileCount * 3); ++ AtomicBoolean result = new AtomicBoolean(true); ++ ExecutorService executorService = Executors.newFixedThreadPool( ++ 4, new ThreadFactoryImpl("ConcurrentGetTest")); ++ ++ for (int i = 0; i < fileCount; i++) { ++ int finalI = i; ++ executorService.submit(() -> { ++ for (int j = 1; j <= 3; j++) { ++ try { ++ List indexItems = indexService.queryAsync( ++ TOPIC_NAME, String.valueOf(finalI), j * 5, 0, System.currentTimeMillis()).get(); ++ if (Math.min(fileCount, j * 5) != indexItems.size()) { ++ result.set(false); ++ } ++ } catch (Exception e) { ++ result.set(false); ++ } finally { ++ latch.countDown(); ++ } ++ } ++ }); ++ } ++ ++ Assert.assertTrue(latch.await(15, TimeUnit.SECONDS)); ++ executorService.shutdown(); ++ Assert.assertTrue(result.get()); ++ } ++} +\ No newline at end of file +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +index a413f2113..68277cacc 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +@@ -135,7 +135,6 @@ public class MessageBufferUtilTest { + Assert.assertEquals("uservalue0", properties.get("userkey")); + } + +- + @Test + public void testGetTotalSize() { + ByteBuffer buffer = buildMockedMessageBuffer(); +diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml +index a7933b5ef..ac0895e05 100644 +--- a/tieredstore/src/test/resources/rmq.logback-test.xml ++++ b/tieredstore/src/test/resources/rmq.logback-test.xml +@@ -19,11 +19,22 @@ + + +- %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n ++ ${CONSOLE_LOG_PATTERN} + + + ++ ++ + +- ++ + ++ ++ ++ ++ ++ ++ ++ ++ + +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 743ffa7adb51c30db5260e0800d88d47b4a9e77e..a2e4839bbeba70bce623526b3ea1d9160f6fc718 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 36 +Release: 37 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -45,6 +45,7 @@ Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch Patch0035: patch035-backport-fix-some-bugs.patch +Patch0036: patch036-backport-RIP65.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -85,6 +86,9 @@ exit 0 %changelog +* Mon Dec 11 2023 ShiZhili - 5.1.3-37 +- backport rip 65 + * Mon Dec 11 2023 ShiZhili - 5.1.3-36 - backport fix some bugs