# SimpleMqttPool **Repository Path**: kami_xenos/mqtt_pool ## Basic Information - **Project Name**: SimpleMqttPool - **Description**: 基于org.eclipse.paho.client.mqttv3实现的一个简易的Mqtt连接池 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 10 - **Forks**: 6 - **Created**: 2020-09-01 - **Last Updated**: 2024-11-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 重构 SimpleMqttPool ## 介绍 在《[用Spring Boot实现一个简易的MQTT客户端](https://gitee.com/kami_xenos/simple-mqtt-client)》中介绍了一个自定义的MQTT的客户端的实现,但单一的客户端并不能满足实际的开发需求,这里基于之前实现的自定义客户端实现一个简单的连接池 ## 设计思路 之前的版本是用一个`List`存储所有的连接对象,每次获取时从`List`中删除,释放连接时重新加入`List`,然后用`synchronized` 关键字做并发控制。再参考了部分的`Druid`连接池的代码后,有了这个版本的设计。 > 1. 用一个数组 `activeConnections` 存储所有的连接,定义一个指针,指向当前数组中当前可用连接对象的最后一个下标 > > 2. 对池中的连接对象进行操作时,只要维持指针的位置,保证指针左边的所有对象都是可用的(滑动窗口) > > 3. 从池中取走一个对象时,向左移动指针;放回一个对象时,向右移动指针 > > 4. 之前的设计的连接对象在发送数据后会处于忙碌的状态,只有等待回调方法执行成功后,才会处于空闲可用的状态,所以用了一个数组 `busyConnections` 存储这些忙碌中的连接对象,等到这些对象空闲后,才放回到可用池中 > > 5. 对于需要进行删除的对象同样也用一个数组 `discardConnections` 进行存储,以上两个数组都只需要知道连接对象在 `activeConnections` 的下标即可,不需要存储一个完整的对象 > > 6. 用 `juc` 包下的类做并发控制,提高并发的效率 ## 具体实现 - 定义相关变量 ```java /** * 当前连接总数 */ private int activeCount; /** * 当前可以从池中拿取的连接数 */ private int poolingCount; /** * 存储所有连接的数组 */ private volatile Connection[] activeConnections; /** * 存储连接在 activeConnection 中的下标 */ private volatile Map connectionIndexDict; /** * 处于忙碌状态的连接,存储 activeConnections 中连接的ID */ private volatile String[] busyConnections; /** * 处于忙碌状态的连接数 */ private int busyCount; /** * 即将丢弃的连接, 存储 activeConnections 中连接的ID */ private volatile String[] discardConnections; /** * 将要丢弃的连接数 */ private int discardingCount; ``` - 连接池的初始化 ![init-pool](img/initPool.PNG) ```java /** * 连接池初始化 */ public void initialize() throws PoolInitializeException { if (isInitialized) { return; } ReentrantLock lock = this.lock; lock.lock(); try { // 一些配置文件属性的判断 // ... ... // 创建数组和字典 activeConnections = new Connection[maxSize]; connectionIndexDict = new ConcurrentHashMap<>(maxSize); busyConnections = new String[maxSize]; discardConnections = new String[2 * maxSize]; // 创建最小数量的连接 while (activeCount < minSize) { Connection connection = createConnection(); if (connection != null) { activeConnections[activeCount] = connection; connectionIndexDict.put(connection.getClientId(), activeCount++); poolingCount++; } } // 创建并启动 连接创建线程 createAndStartCreatorThread(); // 创建并启动 连接销毁线程 createAndStartDestroyThread(); // 创建并启动 连接状态刷新线程 createAndStartRefreshThread(); // 创建并启动 连接状态监视线程 createAndStartWatchThread(); // 等待所有线程启动 initLatch.await(); this.initTime = LocalDateTime.now(); this.isInitialized = true; } catch (Exception e) { throw new PoolInitializeException("连接池初始化发生异常", e); } finally { lock.unlock(); log.info("连接池初始化已完成,当前可用连接数为 " + poolingCount); } } ``` - 获取连接 ![get-connection](img/getConnection.PNG) ```java private Connection getLast() throws InterruptedException { try { // 如果当前可以弹出的连接数为 0 while (poolingCount == 0) { // 唤醒创建线程 empty.signal(); // 当前等待获取连接的线程数 +1 waitThreadCount++; try { // 等待 连接创建线程 创建新的连接,获取有连接被释放时唤醒 notEmpty.await(); } finally { // 执行到这里说明已经有新的连接可以被获取 waitThreadCount--; } } } catch (InterruptedException e) { notEmpty.signal(); throw e; } Connection last = null; try { // 判断当前连接是否可用,不可用的连接会被放入待删除的数组中等待删除 while (poolingCount > 0 && !testConnection(poolingCount - 1)) { poolingCount--; if (poolingCount == 0) { empty.signal(); waitThreadCount++; try { notEmpty.await(); } finally { waitThreadCount--; } } } } catch (InterruptedException e) { notEmpty.signal(); throw e; } last = activeConnections[--poolingCount]; return last; } ``` 如果当前连接对象不够,并且连接数未达到最大值,由连接创建线程去创建新的连接,过程如下 ![createConnection](img/createConnection.PNG) - 将连接放回池中 将连接放回去时,并不会直接将连接认为是可用的,而是放入到 忙碌连接数组中 ```java public void releaseConnection(Connection connection) { if (connection == null) { return; } lock.lock(); try { String clientId = connection.getClientId(); if (!connectionIndexDict.containsKey(clientId)) { return; } // 放入忙碌池中 busyConnections[busyCount++] = clientId; refreshCondition.signal(); } finally { lock.unlock(); } } ``` 然后由忙碌连接处理线程去做连接状态的更新操作,过程如下: ![refreshConnection](img/refreshConnection.PNG) - 清除无效连接 无效连接指那些连接断开的连接或者是已经超时的连接(长时间没有被使用的连接),它由状态监视线程监视产生并放入到待删除连接数组中,删除过程如下: ![closeConnection](img/closeConnection.PNG) 检查连接是否可用以及监视线程的操作 ```java /** * 判断一个连接是否可用,如果不可用,则加入到待销毁列表中 * @param connectionIndex 连接的下标 * @return 是否可用 */ private boolean testConnection(int connectionIndex) { if (connectionIndex > activeConnections.length) { return false; } Connection connection = activeConnections[connectionIndex]; long lastActiveTimeMillis = connection.lastActiveTimeMillis; long currMillis = System.currentTimeMillis(); int minSize = properties.getPool().getMinPoolSize(); long maxWaitTime = properties.getPool().getMaxWaitTime(); boolean needDiscard = connection.isInTrouble() || (activeCount > minSize && currMillis - lastActiveTimeMillis > maxWaitTime); if (needDiscard) { // 加入到销毁列表中 discardConnections[discardingCount++] = connection.getClientId(); // 唤醒销毁线程 discardCondition.signal(); return false; } return true; } /** * 创建并启动监视线程,定期扫描可用数组,将数组中超时的、不可用的连接移入到待销毁数组中 */ private void createAndStartWatchThread() { initLatch.countDown(); long maxWaitTime = properties.getPool().getMaxWaitTime(); WatcherThreadPool.THREAD_POOL.scheduledExecute(() -> { lock.lock(); try { int minPoolSize = properties.getPool().getMinPoolSize(); int index = poolingCount - 1; while (poolingCount > minPoolSize) { boolean isUsableConnection = testConnection(index); if (!isUsableConnection) { // 一旦检查到失效连接,更新可用连接的边界 swapAndUpdateIndex(index, --poolingCount); } index--; } } finally { lock.unlock(); } }, maxWaitTime, maxWaitTime); } ```