From e4cceeb55192a4f238a16c4a4d50e1d721b097a2 Mon Sep 17 00:00:00 2001
From: xjf <965921784@qq.com>
Date: Tue, 29 Apr 2025 17:31:04 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E5=96=84v6-dev=E7=9A=84?=
=?UTF-8?q?=E9=99=90=E6=B5=81=E5=99=A8=EF=BC=8C=E5=90=8C=E6=97=B6=E5=AE=9E?=
=?UTF-8?q?=E7=8E=B0=E5=8F=AF=E8=87=AA=E5=AE=9A=E4=B9=89=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E5=AF=B9=E5=BA=94=E7=9A=84=E9=99=90=E6=B5=81=E5=99=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../ratelimiter/FixedWindowRateLimiter.java | 105 ++++++++
.../ratelimiter/LeakyBucketRateLimiter.java | 99 ++++++++
.../core/thread/ratelimiter/RateLimiter.java | 4 +-
.../ratelimiter/RateLimiterFactory.java | 81 ++++++
.../thread/ratelimiter/RateLimiterType.java | 41 +++
.../ratelimiter/SlidingWindowRateLimiter.java | 208 +++++++++++++++
.../FixedWindowRateLimiterTest.java | 160 ++++++++++++
.../LeakyBucketRateLimiterTest.java | 158 ++++++++++++
.../ratelimiter/RateLimiterFactoryTest.java | 238 ++++++++++++++++++
.../SlidingWindowRateLimiterTest.java | 193 ++++++++++++++
10 files changed, 1286 insertions(+), 1 deletion(-)
create mode 100644 hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiter.java
create mode 100644 hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiter.java
create mode 100644 hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactory.java
create mode 100644 hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterType.java
create mode 100644 hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiter.java
create mode 100644 hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiterTest.java
create mode 100644 hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiterTest.java
create mode 100644 hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactoryTest.java
create mode 100644 hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiterTest.java
diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiter.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiter.java
new file mode 100644
index 0000000000..975a80d956
--- /dev/null
+++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiter.java
@@ -0,0 +1,105 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.NamedThreadFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * 固定窗口(Fixed Window)限流器
+ * 固定窗口算法将时间分为固定大小的窗口,并在每个窗口内限制请求数量
+ *
+ *
+ * 窗口处理:通过scheduler定时器,定时重置窗口,重置周期为{@link RateLimiterConfig#getRefreshPeriod()}
+ * 每个窗口的容量为{@link RateLimiterConfig#getCapacity()}
+ *
+ *
+ *
+ * 请求处理:通过{@link #tryAcquire(int)} 方法请求,如果当前窗口已满,则返回false,表示请求失败。
+ *
+ *
+ * @author junfeng Xu
+ * @since 6.0.0
+ */
+public class FixedWindowRateLimiter extends SemaphoreRateLimiter implements Closeable {
+
+ protected final ScheduledExecutorService scheduler;
+ protected final AtomicInteger currentRequests;
+
+ /**
+ * 构造
+ *
+ * @param config 配置
+ */
+ public FixedWindowRateLimiter(final RateLimiterConfig config) {
+ super(config, null);
+ this.currentRequests = new AtomicInteger(0);
+ this.scheduler = configureScheduler();
+ //启动定时器
+ scheduleLimitRefresh();
+ }
+
+ @Override
+ public boolean tryAcquire(final int permits) {
+ // 检查当前窗口是否有足够的容量
+ int currentCount = currentRequests.get();
+ if (currentCount + permits > config.getCapacity()) {
+ return false;
+ }
+
+ // 增加当前窗口的请求计数
+ if (currentRequests.addAndGet(permits) <= config.getCapacity()) {
+ // 获取信号量,允许请求通过
+ return semaphore.tryAcquire(permits);
+ } else {
+ // 如果超过容量,回滚计数
+ currentRequests.addAndGet(-permits);
+ return false;
+ }
+ }
+
+ @Override
+ public void refreshLimit() {
+ // 重置窗口,释放所有信号量
+ final int permits = currentRequests.getAndSet(0);
+ if (permits > 0) {
+ // 重置信号量,准备下一个窗口
+ semaphore.release(permits);
+ }
+ }
+
+ @Override
+ public void close() {
+ scheduler.shutdown();
+ }
+
+ /**
+ * 创建定时器
+ *
+ * @return 定时器
+ */
+ private static ScheduledExecutorService configureScheduler() {
+ final ThreadFactory threadFactory = new NamedThreadFactory("FixedWindowLimiterScheduler-", true);
+ return new ScheduledThreadPoolExecutor(1, threadFactory);
+ }
+
+ /**
+ * 启动定时器,未定义则不启动
+ */
+ private void scheduleLimitRefresh() {
+ if (null == this.scheduler) {
+ return;
+ }
+ final long limitRefreshPeriod = this.config.getRefreshPeriod().toNanos();
+ scheduler.scheduleAtFixedRate(
+ this::refreshLimit,
+ limitRefreshPeriod,
+ limitRefreshPeriod,
+ TimeUnit.NANOSECONDS
+ );
+ }
+}
diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiter.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiter.java
new file mode 100644
index 0000000000..208fdd5aa4
--- /dev/null
+++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiter.java
@@ -0,0 +1,99 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.NamedThreadFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * 漏桶(Leaky Bucket)限流器
+ * 漏桶算法以固定的速率处理请求,不管请求速率如何变化,都按照固定速率处理,多余的请求会被丢弃或等待
+ *
+ *
+ * 漏桶处理:通过scheduler定时器,定时从漏桶中释放请求,释放速率固定。
+ * 释放周期为{@link RateLimiterConfig#getRefreshPeriod()},周期内释放个数为{@link RateLimiterConfig#getMaxReleaseCount()}
+ *
+ *
+ *
+ * 请求处理:通过{@link #tryAcquire(int)} 方法请求,如果漏桶已满,则返回false,表示请求失败。
+ *
+ *
+ * @author junfeng Xu
+ * @since 6.0.0
+ */
+public class LeakyBucketRateLimiter extends SemaphoreRateLimiter implements Closeable {
+
+ protected final ScheduledExecutorService scheduler;
+ protected final AtomicInteger pendingRequests;
+
+ /**
+ * 构造
+ *
+ * @param config 配置
+ */
+ public LeakyBucketRateLimiter(final RateLimiterConfig config) {
+ super(config, null);
+ this.pendingRequests = new AtomicInteger(0);
+ this.scheduler = configureScheduler();
+ //启动定时器
+ scheduleLimitRefresh();
+ }
+
+ @Override
+ public boolean tryAcquire(final int permits) {
+ // 检查是否有足够的容量接收新请求
+ if (pendingRequests.get() + permits > config.getCapacity()) {
+ return false;
+ }
+
+ // 增加待处理请求计数
+ pendingRequests.addAndGet(permits);
+ return true;
+ }
+
+ @Override
+ public void refreshLimit() {
+ // 每个周期释放固定数量的请求
+ int toRelease = Math.min(pendingRequests.get(), config.getMaxReleaseCount());
+ if (toRelease > 0) {
+ pendingRequests.addAndGet(-toRelease);
+ // 释放信号量,允许新的请求进入
+ semaphore.release(toRelease);
+ }
+ }
+
+ @Override
+ public void close() {
+ scheduler.shutdown();
+ }
+
+ /**
+ * 创建定时器
+ *
+ * @return 定时器
+ */
+ private static ScheduledExecutorService configureScheduler() {
+ final ThreadFactory threadFactory = new NamedThreadFactory("LeakyBucketLimiterScheduler-", true);
+ return new ScheduledThreadPoolExecutor(1, threadFactory);
+ }
+
+ /**
+ * 启动定时器,未定义则不启动
+ */
+ private void scheduleLimitRefresh() {
+ if (null == this.scheduler) {
+ return;
+ }
+ final long limitRefreshPeriod = this.config.getRefreshPeriod().toNanos();
+ scheduler.scheduleAtFixedRate(
+ this::refreshLimit,
+ limitRefreshPeriod,
+ limitRefreshPeriod,
+ TimeUnit.NANOSECONDS
+ );
+ }
+}
diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiter.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiter.java
index 34373cd513..ecb28cf583 100644
--- a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiter.java
+++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiter.java
@@ -16,6 +16,8 @@
package org.dromara.hutool.core.thread.ratelimiter;
+import java.io.Closeable;
+
/**
* 限流接口
* 通过实现此接口以实现不同的限流策略,如令牌桶、分布式限流等
@@ -23,7 +25,7 @@ package org.dromara.hutool.core.thread.ratelimiter;
* @author Looly
* @since 6.0.0
*/
-public interface RateLimiter {
+public interface RateLimiter extends Closeable {
/**
* 尝试获取许可,非阻塞方法
diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactory.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactory.java
new file mode 100644
index 0000000000..463276bf2f
--- /dev/null
+++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactory.java
@@ -0,0 +1,81 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.lang.Assert;
+
+/**
+ * 限流器工厂类,用于创建不同类型的限流器
+ *
+ * @author junfeng Xu
+ * @since 6.0.0
+ */
+public class RateLimiterFactory {
+
+ /**
+ * 创建限流器
+ *
+ * @param type 限流器类型
+ * @param config 限流器配置
+ * @return 限流器实例
+ */
+ public static RateLimiter create(final RateLimiterType type, final RateLimiterConfig config) {
+ Assert.notNull(type, "RateLimiter type must not be null");
+ Assert.notNull(config, "RateLimiterConfig must not be null");
+
+ RateLimiter rateLimiter = null;
+ switch (type) {
+ case TOKEN_BUCKET:
+ rateLimiter = new TokenBucketRateLimiter(config);
+ break;
+ case LEAKY_BUCKET:
+ rateLimiter = new LeakyBucketRateLimiter(config);
+ break;
+ case FIXED_WINDOW:
+ rateLimiter = new FixedWindowRateLimiter(config);
+ break;
+ case SLIDING_WINDOW:
+ rateLimiter = new SlidingWindowRateLimiter(config);
+ break;
+ }
+ return rateLimiter;
+ }
+
+ /**
+ * 创建令牌桶限流器
+ *
+ * @param config 限流器配置
+ * @return 令牌桶限流器
+ */
+ public static TokenBucketRateLimiter createTokenBucket(final RateLimiterConfig config) {
+ return new TokenBucketRateLimiter(config);
+ }
+
+ /**
+ * 创建漏桶限流器
+ *
+ * @param config 限流器配置
+ * @return 漏桶限流器
+ */
+ public static LeakyBucketRateLimiter createLeakyBucket(final RateLimiterConfig config) {
+ return new LeakyBucketRateLimiter(config);
+ }
+
+ /**
+ * 创建固定窗口限流器
+ *
+ * @param config 限流器配置
+ * @return 固定窗口限流器
+ */
+ public static FixedWindowRateLimiter createFixedWindow(final RateLimiterConfig config) {
+ return new FixedWindowRateLimiter(config);
+ }
+
+ /**
+ * 创建滑动窗口限流器
+ *
+ * @param config 限流器配置
+ * @return 滑动窗口限流器
+ */
+ public static SlidingWindowRateLimiter createSlidingWindow(final RateLimiterConfig config) {
+ return new SlidingWindowRateLimiter(config);
+ }
+}
diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterType.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterType.java
new file mode 100644
index 0000000000..83c01d3621
--- /dev/null
+++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterType.java
@@ -0,0 +1,41 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+/**
+ * 限流器类型枚举
+ *
+ * @author junfeng Xu
+ * @since 6.0.0
+ */
+public enum RateLimiterType {
+ /**
+ * 令牌桶限流算法
+ *
+ * 令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输
+ *
+ */
+ TOKEN_BUCKET,
+
+ /**
+ * 漏桶限流算法
+ *
+ * 漏桶算法以固定的速率处理请求,不管请求速率如何变化,都按照固定速率处理,多余的请求会被丢弃或等待
+ *
+ */
+ LEAKY_BUCKET,
+
+ /**
+ * 固定窗口限流算法
+ *
+ * 固定窗口算法将时间分为固定大小的窗口,并在每个窗口内限制请求数量
+ *
+ */
+ FIXED_WINDOW,
+
+ /**
+ * 滑动窗口限流算法
+ *
+ * 滑动窗口算法是对固定窗口的改进,它将时间分为多个小窗口,并随着时间推移滑动,避免了边界突刺问题
+ *
+ */
+ SLIDING_WINDOW
+}
diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiter.java b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiter.java
new file mode 100644
index 0000000000..1f9a2dddc9
--- /dev/null
+++ b/hutool-core/src/main/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiter.java
@@ -0,0 +1,208 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.NamedThreadFactory;
+
+import java.io.Closeable;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * 滑动窗口(Sliding Window)限流器
+ * 滑动窗口算法是对固定窗口的改进,它将时间分为多个小窗口,并随着时间推移滑动,避免了边界突刺问题
+ *
+ *
+ * 窗口处理:将一个大的时间窗口分成多个小窗口,每个小窗口记录通过的请求数量。
+ * 随着时间推移,小窗口会滑动,过期的小窗口会被移除,新的小窗口会被添加。
+ * 滑动窗口的总容量为{@link RateLimiterConfig#getCapacity()},刷新周期为{@link RateLimiterConfig#getRefreshPeriod()}
+ *
+ *
+ *
+ * 请求处理:通过{@link #tryAcquire(int)} 方法请求,如果当前滑动窗口内的请求总数已达到容量上限,则返回false,表示请求失败。
+ *
+ *
+ * @author junfeng Xu
+ * @since 6.0.0
+ */
+public class SlidingWindowRateLimiter extends SemaphoreRateLimiter implements Closeable {
+
+ protected final ScheduledExecutorService scheduler;
+ protected final AtomicInteger totalRequests;
+ protected final Queue windowSlots;
+ protected final ReentrantLock lock;
+
+ // 小窗口的数量,默认为10个
+ private final int subWindowCount;
+ // 每个小窗口的时间长度(纳秒)
+ private final long subWindowTimeNanos;
+
+ /**
+ * 构造
+ *
+ * @param config 配置
+ */
+ public SlidingWindowRateLimiter(final RateLimiterConfig config) {
+ this(config, 10); // 默认将窗口分为10个小窗口
+ }
+
+ /**
+ * 构造
+ *
+ * @param config 配置
+ * @param subWindowCount 小窗口数量
+ */
+ public SlidingWindowRateLimiter(final RateLimiterConfig config, final int subWindowCount) {
+ super(config, null);
+ this.subWindowCount = subWindowCount;
+ this.subWindowTimeNanos = config.getRefreshPeriod().toNanos() / subWindowCount;
+ this.totalRequests = new AtomicInteger(0);
+ this.windowSlots = new LinkedList<>();
+ this.lock = new ReentrantLock();
+ this.scheduler = configureScheduler();
+
+ // 初始化窗口
+ initializeWindow();
+
+ // 启动定时器
+ scheduleLimitRefresh();
+ }
+
+ @Override
+ public boolean tryAcquire(final int permits) {
+ // 检查当前滑动窗口是否有足够的容量
+ if (totalRequests.get() + permits > config.getCapacity()) {
+ return false;
+ }
+
+ lock.lock();
+ try {
+ // 再次检查容量(因为可能在获取锁的过程中已经变化)
+ if (totalRequests.get() + permits > config.getCapacity()) {
+ return false;
+ }
+
+ // 将请求添加到当前窗口
+ WindowSlot currentSlot = windowSlots.peek();
+ if (currentSlot != null) {
+ currentSlot.addRequests(permits);
+ totalRequests.addAndGet(permits);
+
+ // 获取信号量,允许请求通过
+ return semaphore.tryAcquire(permits);
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void refreshLimit() {
+ long currentTimeNanos = System.nanoTime();
+
+ lock.lock();
+ try {
+ // 移除过期的窗口
+ while (!windowSlots.isEmpty()) {
+ WindowSlot oldestSlot = windowSlots.peek();
+ if (currentTimeNanos - oldestSlot.getStartTime() > config.getRefreshPeriod().toNanos()) {
+ // 移除过期窗口,减少总请求数
+ WindowSlot removed = windowSlots.poll();
+ int releasedPermits = removed.getRequestCount();
+ totalRequests.addAndGet(-releasedPermits);
+
+ // 释放信号量
+ if (releasedPermits > 0) {
+ semaphore.release(releasedPermits);
+ }
+
+ // 添加新窗口
+ long newSlotStartTime = removed.getStartTime() + subWindowTimeNanos * subWindowCount;
+ windowSlots.offer(new WindowSlot(newSlotStartTime));
+ } else {
+ break;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ scheduler.shutdown();
+ }
+
+ /**
+ * 初始化滑动窗口
+ */
+ private void initializeWindow() {
+ long currentTimeNanos = System.nanoTime();
+ lock.lock();
+ try {
+ // 创建初始的小窗口
+ for (int i = 0; i < subWindowCount; i++) {
+ long startTime = currentTimeNanos - (subWindowCount - 1 - i) * subWindowTimeNanos;
+ windowSlots.offer(new WindowSlot(startTime));
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * 创建定时器
+ *
+ * @return 定时器
+ */
+ private static ScheduledExecutorService configureScheduler() {
+ final ThreadFactory threadFactory = new NamedThreadFactory("SlidingWindowLimiterScheduler-", true);
+ return new ScheduledThreadPoolExecutor(1, threadFactory);
+ }
+
+ /**
+ * 启动定时器,未定义则不启动
+ */
+ private void scheduleLimitRefresh() {
+ if (null == this.scheduler) {
+ return;
+ }
+ // 使用小窗口的时间长度作为刷新周期,使滑动更平滑
+ scheduler.scheduleAtFixedRate(
+ this::refreshLimit,
+ subWindowTimeNanos,
+ subWindowTimeNanos,
+ TimeUnit.NANOSECONDS
+ );
+ }
+
+ /**
+ * 窗口槽,表示一个小的时间窗口
+ */
+ private static class WindowSlot {
+ private final long startTime;
+ private final AtomicInteger requestCount;
+
+ public WindowSlot(long startTime) {
+ this.startTime = startTime;
+ this.requestCount = new AtomicInteger(0);
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public int getRequestCount() {
+ return requestCount.get();
+ }
+
+ public void addRequests(int count) {
+ requestCount.addAndGet(count);
+ }
+ }
+}
diff --git a/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiterTest.java b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiterTest.java
new file mode 100644
index 0000000000..c0a2fee5cd
--- /dev/null
+++ b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/FixedWindowRateLimiterTest.java
@@ -0,0 +1,160 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.ThreadUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link FixedWindowRateLimiter} 的单元测试
+ *
+ * @author junfeng Xu
+ */
+public class FixedWindowRateLimiterTest {
+
+ @Test
+ void testBasicRateLimiting() {
+ // 创建配置:容量为10
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(500));
+
+ // 创建固定窗口限流器
+ final FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(config);
+
+ try {
+ // 测试基本限流功能
+ // 一次性获取部分容量
+ final boolean b1 = rateLimiter.tryAcquire(8);
+ Assertions.assertTrue(b1, "应该能够获取部分容量");
+
+ // 再获取部分容量
+ final boolean b2 = rateLimiter.tryAcquire(2);
+ Assertions.assertTrue(b2, "应该能够获取剩余容量");
+
+ // 超过容量限制,应该被拒绝
+ final boolean b3 = rateLimiter.tryAcquire(1);
+ Assertions.assertFalse(b3, "超过容量限制应该被拒绝");
+
+ // 等待一个完整的刷新周期
+ ThreadUtil.sleep(600);
+
+ // 窗口应该已经重置,可以再次获取容量
+ final boolean b4 = rateLimiter.tryAcquire(10);
+ Assertions.assertTrue(b4, "窗口重置后应该能够再次获取全部容量");
+ } finally {
+ // 关闭限流器,释放资源
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testWindowReset() {
+ // 创建配置:容量为5,刷新周期为200ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(200));
+
+ // 创建固定窗口限流器
+ final FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(config);
+
+ try {
+ // 获取部分容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(3), "应该能够获取部分容量");
+
+ // 等待不足一个周期的时间
+ ThreadUtil.sleep(100);
+
+ // 获取剩余容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "应该能够获取剩余容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待一个完整的刷新周期
+ ThreadUtil.sleep(200);
+
+ // 窗口应该已经重置
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "窗口重置后应该能够获取全部容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待多个刷新周期
+ ThreadUtil.sleep(500);
+
+ // 窗口应该已经重置多次,但容量仍然是固定的
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "多个周期后窗口重置应该能够获取全部容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testConcurrentRequests() throws InterruptedException {
+ // 创建配置:容量为15,刷新周期为500ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(15)
+ .setRefreshPeriod(Duration.ofMillis(500));
+
+ // 创建固定窗口限流器
+ final FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(config);
+
+ try {
+ // 创建线程池
+ final ExecutorService executorService = Executors.newFixedThreadPool(10);
+ final CountDownLatch latch = new CountDownLatch(10);
+ final AtomicInteger successCount = new AtomicInteger(0);
+
+ // 10个线程同时尝试获取许可
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ // 每个线程尝试获取2个许可
+ if (rateLimiter.tryAcquire(2)) {
+ successCount.incrementAndGet();
+ }
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // 等待所有线程完成
+ latch.await();
+ executorService.shutdown();
+
+ // 验证结果:容量为15,每个线程请求2个,最多允许7个线程成功
+ Assertions.assertEquals(7, successCount.get(), "并发请求时应该限制在容量范围内");
+
+ // 等待刷新周期
+ ThreadUtil.sleep(600);
+
+ // 验证窗口已重置
+ Assertions.assertTrue(rateLimiter.tryAcquire(15), "窗口重置后应该恢复全部容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testResourceRelease() {
+ // 创建配置
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(100));
+
+ // 创建限流器
+ final FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(config);
+
+ // 获取许可
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "应该能够获取所有容量");
+
+ // 关闭限流器
+ rateLimiter.close();
+
+ // 验证关闭后的行为,此处主要是确保close()方法不会抛出异常
+ Assertions.assertDoesNotThrow(() -> rateLimiter.close(), "重复关闭不应抛出异常");
+ }
+}
diff --git a/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiterTest.java b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiterTest.java
new file mode 100644
index 0000000000..60b35581b3
--- /dev/null
+++ b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/LeakyBucketRateLimiterTest.java
@@ -0,0 +1,158 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.ThreadUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link LeakyBucketRateLimiter} 的单元测试
+ *
+ * @author junfeng Xu
+ */
+public class LeakyBucketRateLimiterTest {
+
+ @Test
+ void testBasicRateLimiting() {
+ // 创建配置:容量为10,刷新周期为500ms,每个周期释放5个请求
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(500))
+ .setMaxReleaseCount(5);
+
+ // 创建漏桶限流器
+ final LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(config);
+
+ try {
+ // 测试基本限流功能
+ // 一次性添加8个请求到漏桶
+ final boolean b1 = rateLimiter.tryAcquire(8);
+ Assertions.assertTrue(b1, "应该能够接收请求数量在容量范围内的请求");
+
+ // 超过容量限制,应该被拒绝
+ final boolean b2 = rateLimiter.tryAcquire(3);
+ Assertions.assertFalse(b2, "超过容量限制应该被拒绝");
+
+ // 等待一个完整的刷新周期
+ ThreadUtil.sleep(600);
+
+ // 漏桶应该已经释放了5个请求的空间
+ final boolean b3 = rateLimiter.tryAcquire(5);
+ Assertions.assertTrue(b3, "漏桶释放空间后应该能够接收新请求");
+
+ // 再次超过容量限制,应该被拒绝
+ final boolean b4 = rateLimiter.tryAcquire(3);
+ Assertions.assertFalse(b4, "再次超过容量限制应该被拒绝");
+ } finally {
+ // 关闭限流器,释放资源
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testContinuousReleasing() {
+ // 创建配置:容量为10,刷新周期为200ms,每个周期释放2个请求
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(200))
+ .setMaxReleaseCount(2);
+
+ // 创建漏桶限流器
+ final LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(config);
+
+ try {
+ // 填满漏桶
+ Assertions.assertTrue(rateLimiter.tryAcquire(10), "应该能够接收容量范围内的请求");
+
+ // 等待一个刷新周期
+ ThreadUtil.sleep(250);
+
+ // 应该释放了2个请求的空间
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "应该释放了2个请求的空间");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过释放空间应该被拒绝");
+
+ // 再等待两个刷新周期
+ ThreadUtil.sleep(450);
+
+ // 应该又释放了4个请求的空间
+ Assertions.assertTrue(rateLimiter.tryAcquire(4), "应该释放了4个请求的空间");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过释放空间应该被拒绝");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testConcurrentRequests() throws InterruptedException {
+ // 创建配置:容量为20,刷新周期为500ms,每个周期释放10个请求
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(20)
+ .setRefreshPeriod(Duration.ofMillis(500))
+ .setMaxReleaseCount(10);
+
+ // 创建漏桶限流器
+ final LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(config);
+
+ try {
+ // 创建线程池
+ final ExecutorService executorService = Executors.newFixedThreadPool(10);
+ final CountDownLatch latch = new CountDownLatch(10);
+ final AtomicInteger successCount = new AtomicInteger(0);
+
+ // 10个线程同时尝试获取许可
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ // 每个线程尝试获取3个许可
+ if (rateLimiter.tryAcquire(3)) {
+ successCount.incrementAndGet();
+ }
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // 等待所有线程完成
+ latch.await();
+ executorService.shutdown();
+
+ // 验证结果:容量为20,每个线程请求3个,最多允许6个线程成功
+ Assertions.assertEquals(6, successCount.get(), "并发请求时应该限制在容量范围内");
+
+ // 等待刷新周期
+ ThreadUtil.sleep(600);
+
+ // 验证漏桶已释放部分空间
+ Assertions.assertTrue(rateLimiter.tryAcquire(10), "刷新后应该释放部分容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testResourceRelease() {
+ // 创建配置
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(100))
+ .setMaxReleaseCount(1);
+
+ // 创建限流器
+ final LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(config);
+
+ // 获取许可
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "应该能够获取所有容量");
+
+ // 关闭限流器
+ rateLimiter.close();
+
+ // 验证关闭后的行为,此处主要是确保close()方法不会抛出异常
+ Assertions.assertDoesNotThrow(() -> rateLimiter.close(), "重复关闭不应抛出异常");
+ }
+}
diff --git a/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactoryTest.java b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactoryTest.java
new file mode 100644
index 0000000000..0b9328f077
--- /dev/null
+++ b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/RateLimiterFactoryTest.java
@@ -0,0 +1,238 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.ThreadUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link RateLimiterFactory} 的单元测试
+ *
+ * @author junfeng Xu
+ */
+public class RateLimiterFactoryTest {
+
+ @Test
+ void testCreateByType() throws IOException {
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(500));
+
+ // 测试创建令牌桶限流器
+ final RateLimiter tokenBucketLimiter = RateLimiterFactory.create(RateLimiterType.TOKEN_BUCKET, config);
+ Assertions.assertTrue(tokenBucketLimiter instanceof TokenBucketRateLimiter, "应该创建TokenBucketRateLimiter实例");
+
+ // 测试创建漏桶限流器
+ final RateLimiter leakyBucketLimiter = RateLimiterFactory.create(RateLimiterType.LEAKY_BUCKET, config);
+ Assertions.assertTrue(leakyBucketLimiter instanceof LeakyBucketRateLimiter, "应该创建LeakyBucketRateLimiter实例");
+
+ // 测试创建固定窗口限流器
+ final RateLimiter fixedWindowLimiter = RateLimiterFactory.create(RateLimiterType.FIXED_WINDOW, config);
+ Assertions.assertTrue(fixedWindowLimiter instanceof FixedWindowRateLimiter, "应该创建FixedWindowRateLimiter实例");
+
+ // 测试创建滑动窗口限流器
+ final RateLimiter slidingWindowLimiter = RateLimiterFactory.create(RateLimiterType.SLIDING_WINDOW, config);
+ Assertions.assertTrue(slidingWindowLimiter instanceof SlidingWindowRateLimiter, "应该创建SlidingWindowRateLimiter实例");
+
+ // 关闭所有限流器
+ tokenBucketLimiter.close();
+ leakyBucketLimiter.close();
+ fixedWindowLimiter.close();
+ slidingWindowLimiter.close();
+ }
+
+ @Test
+ void testCreateTokenBucket() {
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(200))
+ .setMaxReleaseCount(2);
+
+ // 创建令牌桶限流器
+ final TokenBucketRateLimiter rateLimiter = RateLimiterFactory.createTokenBucket(config);
+
+ try {
+ // 测试基本限流功能
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "应该能够获取所有容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待一个刷新周期,应该释放2个令牌
+ ThreadUtil.sleep(300);
+
+ // 应该能获取2个令牌(最大释放数)
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "应该能够获取释放的令牌");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过释放数应该被拒绝");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testCreateLeakyBucket() {
+ // 创建配置:容量为5,刷新周期为200ms,最大释放数为2
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(200))
+ .setMaxReleaseCount(2);
+
+ // 创建漏桶限流器
+ final LeakyBucketRateLimiter rateLimiter = RateLimiterFactory.createLeakyBucket(config);
+
+ try {
+ // 测试基本限流功能
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "应该能够获取所有容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待一个刷新周期,应该释放2个令牌
+ ThreadUtil.sleep(300);
+
+ // 应该能获取2个令牌(最大释放数)
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "应该能够获取释放的令牌");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过释放数应该被拒绝");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testCreateFixedWindow() {
+ // 创建配置:容量为10,刷新周期为300ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(300));
+
+ // 创建固定窗口限流器
+ final FixedWindowRateLimiter rateLimiter = RateLimiterFactory.createFixedWindow(config);
+
+ try {
+ // 测试基本限流功能
+ Assertions.assertTrue(rateLimiter.tryAcquire(8), "应该能够获取部分容量");
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "应该能够获取剩余容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待一个完整的刷新周期
+ ThreadUtil.sleep(400);
+
+ // 窗口应该已经重置,可以再次获取容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(10), "窗口重置后应该能够再次获取全部容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testCreateSlidingWindow() {
+ // 创建配置:容量为10,刷新周期为500ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(500));
+
+ // 使用工厂方法创建滑动窗口限流器
+ final SlidingWindowRateLimiter rateLimiter = RateLimiterFactory.createSlidingWindow(config);
+
+ try {
+ // 测试基本限流功能
+ Assertions.assertTrue(rateLimiter.tryAcquire(6), "应该能够获取部分容量");
+ Assertions.assertTrue(rateLimiter.tryAcquire(4), "应该能够获取剩余容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待部分刷新周期
+ ThreadUtil.sleep(300);
+
+ // 应该释放部分容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "应该能够获取部分释放的容量");
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过释放容量应该被拒绝");
+
+ // 等待完整刷新周期
+ ThreadUtil.sleep(600);
+
+ // 窗口应该完全重置
+ Assertions.assertTrue(rateLimiter.tryAcquire(10), "窗口完全重置后应该能够获取全部容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testConcurrentFactoryCreation() throws InterruptedException {
+ // 创建配置:容量为20,刷新周期为500ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(20)
+ .setRefreshPeriod(Duration.ofMillis(500));
+
+ // 创建线程池
+ final ExecutorService executorService = Executors.newFixedThreadPool(4);
+ final CountDownLatch latch = new CountDownLatch(4);
+ final AtomicInteger successCount = new AtomicInteger(0);
+
+ // 并发创建不同类型的限流器
+ executorService.submit(() -> {
+ try {
+ final RateLimiter limiter = RateLimiterFactory.create(RateLimiterType.TOKEN_BUCKET, config);
+ if (limiter.tryAcquire(5)) {
+ successCount.incrementAndGet();
+ }
+ limiter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ executorService.submit(() -> {
+ try {
+ final RateLimiter limiter = RateLimiterFactory.create(RateLimiterType.LEAKY_BUCKET, config);
+ if (limiter.tryAcquire(5)) {
+ successCount.incrementAndGet();
+ }
+ limiter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ executorService.submit(() -> {
+ try {
+ final RateLimiter limiter = RateLimiterFactory.create(RateLimiterType.FIXED_WINDOW, config);
+ if (limiter.tryAcquire(5)) {
+ successCount.incrementAndGet();
+ }
+ limiter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ executorService.submit(() -> {
+ try {
+ final RateLimiter limiter = RateLimiterFactory.create(RateLimiterType.SLIDING_WINDOW, config);
+ if (limiter.tryAcquire(5)) {
+ successCount.incrementAndGet();
+ }
+ limiter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ // 等待所有线程完成
+ latch.await();
+ executorService.shutdown();
+
+ // 验证所有限流器都能正常工作
+ Assertions.assertEquals(4, successCount.get(), "所有限流器都应该能够正常获取许可");
+ }
+}
diff --git a/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiterTest.java b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiterTest.java
new file mode 100644
index 0000000000..f1e183f49d
--- /dev/null
+++ b/hutool-core/src/test/java/org/dromara/hutool/core/thread/ratelimiter/SlidingWindowRateLimiterTest.java
@@ -0,0 +1,193 @@
+package org.dromara.hutool.core.thread.ratelimiter;
+
+import org.dromara.hutool.core.thread.ThreadUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link SlidingWindowRateLimiter} 的单元测试
+ *
+ * @author junfeng Xu
+ */
+public class SlidingWindowRateLimiterTest {
+
+ @Test
+ void testBasicRateLimiting() {
+ // 创建配置:容量为5,刷新周期为500ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(500));
+
+ // 创建滑动窗口限流器
+ final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(config);
+
+ try {
+ // 测试基本限流功能
+ // 一次性获取所有容量
+ final boolean b1 = rateLimiter.tryAcquire(5);
+ Assertions.assertTrue(b1, "应该能够获取所有可用容量");
+
+ // 超过容量限制,应该被拒绝
+ final boolean b2 = rateLimiter.tryAcquire(1);
+ Assertions.assertFalse(b2, "超过容量限制应该被拒绝");
+
+ // 等待一个完整的刷新周期
+ ThreadUtil.sleep(600);
+
+ // 窗口应该已经滑动,释放了容量
+ final boolean b3 = rateLimiter.tryAcquire(5);
+ Assertions.assertTrue(b3, "窗口滑动后应该能够再次获取容量");
+ } finally {
+ // 关闭限流器,释放资源
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testSlidingWindowMechanism() {
+ // 创建配置:容量为10,刷新周期为1000ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(1000));
+
+ // 创建滑动窗口限流器,设置5个小窗口
+ final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(config, 5);
+
+ try {
+ // 获取部分容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(4), "应该能够获取部分容量");
+
+ // 等待一个小窗口的时间(200ms)
+ ThreadUtil.sleep(250);
+
+ // 再获取部分容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(4), "应该能够获取更多容量");
+
+ // 再获取部分容量,此时应该接近限制
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "应该能够获取剩余容量");
+
+ // 超过限制,应该被拒绝
+ Assertions.assertFalse(rateLimiter.tryAcquire(1), "超过容量限制应该被拒绝");
+
+ // 等待一个小窗口的时间
+ ThreadUtil.sleep(250);
+
+ // 第一个小窗口应该已经滑出,释放了部分容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(2), "窗口滑动后应该释放部分容量");
+
+ // 等待剩余时间,使所有窗口滑出
+ ThreadUtil.sleep(800);
+
+ // 所有窗口都应该已经滑出,完全恢复容量
+ Assertions.assertTrue(rateLimiter.tryAcquire(10), "所有窗口滑出后应该完全恢复容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testConcurrentRequests() throws InterruptedException {
+ // 创建配置:容量为20,刷新周期为1000ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(20)
+ .setRefreshPeriod(Duration.ofMillis(1000));
+
+ // 创建滑动窗口限流器
+ final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(config);
+
+ try {
+ // 创建线程池
+ final ExecutorService executorService = Executors.newFixedThreadPool(10);
+ final CountDownLatch latch = new CountDownLatch(10);
+ final AtomicInteger successCount = new AtomicInteger(0);
+
+ // 10个线程同时尝试获取许可
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ // 每个线程尝试获取3个许可
+ if (rateLimiter.tryAcquire(3)) {
+ successCount.incrementAndGet();
+ }
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // 等待所有线程完成
+ latch.await();
+ executorService.shutdown();
+
+ // 验证结果:容量为20,每个线程请求3个,最多允许6个线程成功
+ Assertions.assertEquals(6, successCount.get(), "并发请求时应该限制在容量范围内");
+
+ // 等待刷新周期
+ ThreadUtil.sleep(1100);
+
+ // 验证容量已恢复
+ Assertions.assertTrue(rateLimiter.tryAcquire(20), "刷新后应该恢复全部容量");
+ } finally {
+ rateLimiter.close();
+ }
+ }
+
+ @Test
+ void testDifferentWindowSizes() {
+ // 创建配置:容量为10,刷新周期为1000ms
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(10)
+ .setRefreshPeriod(Duration.ofMillis(1000));
+
+ // 创建两个不同窗口数量的限流器
+ final SlidingWindowRateLimiter rateLimiter1 = new SlidingWindowRateLimiter(config, 2); // 2个窗口
+ final SlidingWindowRateLimiter rateLimiter2 = new SlidingWindowRateLimiter(config, 10); // 10个窗口
+
+ try {
+ // 两个限流器都获取5个许可
+ Assertions.assertTrue(rateLimiter1.tryAcquire(5), "应该能够获取部分容量");
+ Assertions.assertTrue(rateLimiter2.tryAcquire(5), "应该能够获取部分容量");
+
+ // 等待半个刷新周期
+ ThreadUtil.sleep(500);
+
+ // 对于2个窗口的限流器,第一个窗口应该已经滑出,释放了一半容量
+ Assertions.assertTrue(rateLimiter1.tryAcquire(5), "2个窗口的限流器应该释放一半容量");
+
+ // 对于10个窗口的限流器,应该只释放了一小部分容量
+ Assertions.assertTrue(rateLimiter2.tryAcquire(1), "10个窗口的限流器应该释放少量容量");
+ Assertions.assertFalse(rateLimiter2.tryAcquire(5), "10个窗口的限流器不应该释放太多容量");
+ } finally {
+ rateLimiter1.close();
+ rateLimiter2.close();
+ }
+ }
+
+ @Test
+ void testResourceRelease() {
+ // 创建配置
+ final RateLimiterConfig config = RateLimiterConfig.of()
+ .setCapacity(5)
+ .setRefreshPeriod(Duration.ofMillis(100));
+
+ // 创建限流器
+ final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(config);
+
+ // 获取许可
+ Assertions.assertTrue(rateLimiter.tryAcquire(5), "应该能够获取所有容量");
+
+ // 关闭限流器
+ rateLimiter.close();
+
+ // 验证关闭后的行为,此处主要是确保close()方法不会抛出异常
+ // 由于调度器已关闭,不应再刷新窗口,但tryAcquire可能仍然工作
+ // 这里我们只是验证close()方法能正常执行
+ Assertions.assertDoesNotThrow(() -> rateLimiter.close(), "重复关闭不应抛出异常");
+ }
+}
--
Gitee