diff --git a/docker-compose.yml b/docker-compose.yml
index 3543fe0f74eae2550fc2e4ab5b75b7df9744137e..f00444ced09f21ca9ea779ee8e7bcdc2a33f4e80 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,7 +2,7 @@ networks:
mqtt-network:
driver: bridge
services:
- smart-mqtt:
+ mqtt-broker:
container_name: smart-mqtt
hostname: mqtt-broker
image: smartboot/smart-mqtt:latest
@@ -18,7 +18,8 @@ services:
options:
max-size: "100m"
max-file: "1"
-# emqx:
+
+# mqtt-broker:
# container_name: emqx
# hostname: mqtt-broker
# image: emqx/emqx:5.0.3
@@ -36,8 +37,7 @@ services:
smart-mqtt-bench:
depends_on:
-# - emqx
- - smart-mqtt
+ - mqtt-broker
image: smartboot/smart-mqtt-bench:latest
read_only: true
restart: always
@@ -51,6 +51,6 @@ services:
options:
max-size: "100m"
max-file: "1"
- command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dpublisher=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
-# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish
+ command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
+# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish
version: '3.7'
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index df998158249bafaf5a7c7f611c3dc20344b16ee2..dd9ff4f03d4613468096461d6271b6ba7be8aa14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,12 +4,12 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.15
+ 0.16
4.0.0
mqtt broker
- 0.15
+ 0.16
1.5.25
1.1.22
2.6
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index e594976cb02069091f854df52f6b3089d4edad91..6be6788478bf8572d221d0159384caebfc66ac5a 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.15
+ 0.16
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index 6f0e231c11a1e567a0b03347cb4427a6b2c59d1f..c06ab594e5b6b898d081a66381671326204679de 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -28,7 +28,7 @@ public class BrokerConfigure extends ToString {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.15";
+ public static final String VERSION = "v0.16";
static final Map SystemEnvironments = new HashMap<>();
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
index 24852427dc94c93de6d5ae422afe15adba679833..7a198172800ed495fce239b3edebf44254ebede9 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
@@ -20,7 +20,6 @@ import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.enums.MqttMetricEnum;
-import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
@@ -36,6 +35,7 @@ import org.smartboot.mqtt.common.to.MetricItemTO;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
+import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
import org.smartboot.socket.extension.plugins.AbstractPlugin;
import org.smartboot.socket.transport.AioQuickServer;
import org.smartboot.socket.transport.AioSession;
@@ -44,6 +44,7 @@ import org.yaml.snakeyaml.Yaml;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
+import java.nio.channels.AsynchronousChannelGroup;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -58,6 +59,7 @@ import java.util.ServiceLoader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -109,6 +111,7 @@ public class BrokerContextImpl implements BrokerContext {
* 统计指标
*/
private final Map metricMap = new HashMap<>();
+ private AsynchronousChannelGroup asynchronousChannelGroup;
@Override
public void init() throws IOException {
@@ -129,10 +132,18 @@ public class BrokerContextImpl implements BrokerContext {
try {
+ asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
+ int i;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "smart-mqtt-broker-" + (++i));
+ }
+ });
pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true);
server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor);
server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum()));
- server.start();
+ server.start(asynchronousChannelGroup);
System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")");
System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt");
System.out.println("Github: https://github.com/smartboot/smart-mqtt");
@@ -230,6 +241,8 @@ public class BrokerContextImpl implements BrokerContext {
providers.setConnectAuthenticationProvider(new ConfiguredConnectAuthenticationProviderImpl(this));
}
+ private final TopicSubscriber BREAK = new TopicSubscriber(null, null, null, 0, 0);
+
private void initPushThread() {
if (brokerConfigure.getTopicLimit() <= 0) {
brokerConfigure.setTopicLimit(10);
@@ -269,15 +282,24 @@ public class BrokerContextImpl implements BrokerContext {
}
try {
//存在待输出消息
- Collection subscribers = brokerTopic.getConsumeOffsets().values();
- subscribers.stream().filter(topicSubscriber -> topicSubscriber.isReady() && topicSubscriber.getPushVersion() != brokerTopic.getVersion().get()).forEach(topicSubscriber -> topicSubscriber.batchPublish(BrokerContextImpl.this));
- brokerTopic.setPushing(false);
- for (TopicSubscriber subscriber : subscribers) {
- if (subscriber.getPushVersion() != brokerTopic.getVersion().get()) {
- notifyPush(brokerTopic);
- break;
+ ConcurrentLinkedQueue subscribers = brokerTopic.getQueue();
+ subscribers.offer(BREAK);
+ TopicSubscriber subscriber = null;
+ int version = brokerTopic.getVersion().get();
+ while ((subscriber = subscribers.poll()) != BREAK) {
+// if (subscriber == BREAK) {
+// break;
+// }
+ try {
+ subscriber.batchPublish(BrokerContextImpl.this);
+ } catch (Exception e) {
+ LOGGER.error("batch publish exception:{}", e.getMessage());
}
}
+ brokerTopic.getSemaphore().release();
+ if (version != brokerTopic.getVersion().get() && !subscribers.isEmpty()) {
+ notifyPush(brokerTopic);
+ }
} catch (Exception e) {
LOGGER.error("brokerTopic:{} push message exception", brokerTopic.getTopic(), e);
}
@@ -333,8 +355,8 @@ public class BrokerContextImpl implements BrokerContext {
AsyncTask task = this;
PersistenceMessage storedMessage = providers.getRetainMessageProvider().get(subscriber.getTopic().getTopic(), subscriber.getRetainConsumerOffset());
if (storedMessage == null || storedMessage.getCreateTime() > subscriber.getLatestSubscribeTime()) {
- subscriber.setReady(true);
BrokerTopic topic = subscriber.getTopic();
+ topic.getQueue().offer(subscriber);
notifyPush(topic);
//完成retain消息的消费,正式开始监听Topic
@@ -344,51 +366,35 @@ public class BrokerContextImpl implements BrokerContext {
MqttSession session = subscriber.getMqttSession();
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(storedMessage.getPayload()).qos(subscriber.getMqttQoS()).topicName(storedMessage.getTopic());
- if (subscriber.getMqttQoS() == MqttQoS.AT_LEAST_ONCE || subscriber.getMqttQoS() == MqttQoS.EXACTLY_ONCE) {
- publishBuilder.packetId(session.newPacketId());
- }
if (session.getMqttVersion() == MqttVersion.MQTT_5) {
publishBuilder.publishProperties(new PublishProperties());
}
- MqttPublishMessage publishMessage = publishBuilder.build();
InflightQueue inflightQueue = session.getInflightQueue();
- int index = inflightQueue.offer(publishMessage, storedMessage.getOffset());
- session.publish(publishMessage, packetId -> {
- LOGGER.info("publish retain to client:{} success ,message:{} ", session.getClientId(), publishMessage);
- long offset = inflightQueue.commit(index);
- if (offset != -1) {
- subscriber.setRetainConsumerOffset(offset + 1);
- retainPushThreadPool.execute(task);
- } else {
- LOGGER.error("error...");
- }
-
- });
+ inflightQueue.offer(publishBuilder, offset -> {
+ LOGGER.info("publish retain to client:{} success ", session.getClientId());
+ subscriber.setRetainConsumerOffset(offset + 1);
+ retainPushThreadPool.execute(task);
+ }, storedMessage.getOffset());
+ session.flush();
}
});
}
});
eventBus.subscribe(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, (eventType, subscriber) -> {
LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getTopic().getTopic());
- subscriber.setReady(true);
+ subscriber.getTopic().getQueue().offer(subscriber);
});
}
- private void notifyPush(BrokerTopic topic) {
- if (topic.isPushing()) {
+ void notifyPush(BrokerTopic topic) {
+ if (!topic.getSemaphore().tryAcquire()) {
return;
}
- synchronized (topic) {
- //已加入推送队列
- if (topic.isPushing()) {
- return;
- }
- try {
- topic.setPushing(true);
- pushTopicQueue.put(topic);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ //已加入推送队列
+ try {
+ pushTopicQueue.put(topic);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
@@ -573,6 +579,7 @@ public class BrokerContextImpl implements BrokerContext {
pushTopicQueue.offer(SHUTDOWN_TOPIC);
pushThreadPool.shutdown();
server.shutdown();
+ asynchronousChannelGroup.shutdown();
pagePool.release();
//卸载插件
plugins.forEach(Plugin::uninstall);
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerQosPublisher.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerQosPublisher.java
deleted file mode 100644
index 28f825d6af87a0d7f632cf97ea36e8b5f3620f11..0000000000000000000000000000000000000000
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerQosPublisher.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.smartboot.mqtt.broker;
-
-import org.smartboot.mqtt.common.AbstractSession;
-import org.smartboot.mqtt.common.AsyncTask;
-import org.smartboot.mqtt.common.QosPublisher;
-import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.socket.util.QuickTimerTask;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2022/4/25
- */
-public class BrokerQosPublisher extends QosPublisher {
- private final BrokerContext mqttContext;
-
- public BrokerQosPublisher(BrokerContext mqttContext) {
- this.mqttContext = mqttContext;
- }
-
- @Override
- protected void retry(CompletableFuture future, AbstractSession session, MqttMessage mqttMessage) {
- //注册重试
- QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() {
- @Override
- public void execute() {
- if (!future.isDone()) {
- // 如果客户端发生过断链,则 mqttSession!=session
- System.out.println("retry...");
- MqttSession mqttSession = mqttContext.getSession(session.getClientId());
- mqttSession.write(mqttMessage);
- }
- }
- }, 1, TimeUnit.SECONDS);
- }
-}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java
index aa646f314bc6e670def34b174951402618cc229f..c127ee452c7fd560c5d965b1c1a239cf2323d8de 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java
@@ -4,6 +4,8 @@ import org.smartboot.mqtt.common.Topic;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -21,7 +23,9 @@ public class BrokerTopic extends Topic {
/**
* 当前Topic是否圈闭推送完成
*/
- private boolean pushing;
+ private final Semaphore semaphore = new Semaphore(1);
+
+ private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>();
public BrokerTopic(String topic) {
super(topic);
@@ -35,11 +39,11 @@ public class BrokerTopic extends Topic {
return version;
}
- public boolean isPushing() {
- return pushing;
+ public Semaphore getSemaphore() {
+ return semaphore;
}
- public void setPushing(boolean pushing) {
- this.pushing = pushing;
+ public ConcurrentLinkedQueue getQueue() {
+ return queue;
}
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
index 8bcf6fe69f93f84edecc9fc8e9b1f08724cfc438..048732d41541778f69ef5dc87240932f7538dbfe 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
@@ -11,7 +11,6 @@ import org.smartboot.mqtt.broker.processor.PublishProcessor;
import org.smartboot.mqtt.broker.processor.SubscribeProcessor;
import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor;
import org.smartboot.mqtt.common.DefaultMqttWriter;
-import org.smartboot.mqtt.common.QosPublisher;
import org.smartboot.mqtt.common.enums.MqttMetricEnum;
import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
@@ -51,7 +50,6 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor onlineSessions = new ConcurrentHashMap<>();
private final Map, MqttProcessor> processorMap = new HashMap<>();
- private final QosPublisher qosPublisher;
{
processorMap.put(MqttPingReqMessage.class, new PingReqProcessor());
@@ -68,7 +66,6 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor sessionState.getSubscribers().put(topicSubscriber.getTopicFilterToken().getTopicFilter(), topicSubscriber.getMqttQoS()));
mqttContext.getProviders().getSessionStateProvider().store(clientId, sessionState);
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
index 8dd931c5480760d00221478305d792ee95fe85a8..2e2ce334a96e9801ab616021d4fd268f1ffe46ce 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
@@ -10,9 +10,10 @@ import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventType;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
+import java.util.concurrent.Semaphore;
+
/**
* Topic订阅者
*
@@ -47,9 +48,8 @@ public class TopicSubscriber {
private final long latestSubscribeTime = System.currentTimeMillis();
private TopicToken topicFilterToken;
- private int pushVersion = -1;
- private boolean ready = false;
+ private final Semaphore semaphore = new Semaphore(0);
public TopicSubscriber(BrokerTopic topic, MqttSession session, MqttQoS mqttQoS, long nextConsumerOffset, long retainConsumerOffset) {
this.topic = topic;
@@ -60,62 +60,65 @@ public class TopicSubscriber {
}
public void batchPublish(BrokerContext brokerContext) {
- nextConsumerOffset = publish0(brokerContext, 0, nextConsumerOffset);
+ if (mqttSession.isDisconnect()) {
+ return;
+ }
+ semaphore.release();
+ publish0(brokerContext, 0);
+ mqttSession.flush();
}
- private long publish0(BrokerContext brokerContext, int depth, long expectConsumerOffset) {
+ private void publish0(BrokerContext brokerContext, int depth) {
PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider();
- int version = topic.getVersion().get();
- PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), expectConsumerOffset);
+ PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset);
if (persistenceMessage == null) {
- pushVersion = version;
- mqttSession.flush();
- return expectConsumerOffset;
+ if (semaphore.tryAcquire()) {
+ topic.getQueue().offer(this);
+ }
+ return;
}
if (depth > 16) {
- mqttSession.flush();
-// System.out.println("退出递归...");
- return expectConsumerOffset;
+// LOGGER.info("退出递归...");
+ return;
}
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic());
- if (mqttQoS == MqttQoS.AT_LEAST_ONCE || mqttQoS == MqttQoS.EXACTLY_ONCE) {
- publishBuilder.packetId(mqttSession.newPacketId());
- }
if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
publishBuilder.publishProperties(new PublishProperties());
}
- MqttPublishMessage publishMessage = publishBuilder.build();
-
InflightQueue inflightQueue = mqttSession.getInflightQueue();
- int index = inflightQueue.offer(publishMessage, persistenceMessage.getOffset());
- // 飞行队列已满
- if (index == -1) {
- mqttSession.flush();
-// System.out.println("queue is full..." + expectConsumerOffset);
- return expectConsumerOffset;
- }
- long start = System.currentTimeMillis();
- mqttSession.publish(publishMessage, packetId -> {
- //最早发送的消息若收到响应,则更新点位
- long offset = inflightQueue.commit(index);
- if (offset == -1) {
- return;
+ boolean suc = inflightQueue.offer(publishBuilder, offset -> {
+ if (mqttQoS == MqttQoS.AT_MOST_ONCE) {
+ nextConsumerOffset = persistenceMessage.getOffset() + 1;
}
+ //最早发送的消息若收到响应,则更新点位
commitNextConsumerOffset(offset + 1);
if (persistenceMessage.isRetained()) {
setRetainConsumerOffset(getRetainConsumerOffset() + 1);
}
commitRetainConsumerTimestamp(persistenceMessage.getCreateTime());
- }, false);
+// if (inflightQueue.getCount() == 0) {
+ publish0(brokerContext, 0);
+// }
+ }, persistenceMessage.getOffset());
+ // 飞行队列已满
+ if (!suc) {
+// LOGGER.info("queue is full..." + expectConsumerOffset);
+ return;
+ }
+ long start = System.currentTimeMillis();
+ if (mqttQoS != MqttQoS.AT_MOST_ONCE) {
+ nextConsumerOffset = persistenceMessage.getOffset() + 1;
+ }
+
long cost = System.currentTimeMillis() - start;
if (cost > 100) {
System.out.println("publish busy ,cost: " + cost);
}
brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession);
//递归处理下一个消息
- return publish0(brokerContext, ++depth, expectConsumerOffset + 1);
+ publish0(brokerContext, ++depth);
}
public BrokerTopic getTopic() {
@@ -162,16 +165,4 @@ public class TopicSubscriber {
public void setTopicFilterToken(TopicToken topicFilterToken) {
this.topicFilterToken = topicFilterToken;
}
-
- public boolean isReady() {
- return ready;
- }
-
- public void setReady(boolean ready) {
- this.ready = ready;
- }
-
- public int getPushVersion() {
- return pushVersion;
- }
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java
index 1ebfdb53da3060b7b3ad225b2ac90bd942093cfa..2542c27f43a036481992a5c291b0a2930acc5d1c 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java
@@ -9,7 +9,6 @@ import org.smartboot.mqtt.common.eventbus.EventBusSubscriber;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.socket.util.QuickTimerTask;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
@@ -22,21 +21,17 @@ public class ConnectIdleTimeMonitorSubscriber implements EventBusSubscriber map = new ConcurrentHashMap<>();
-
public ConnectIdleTimeMonitorSubscriber(BrokerContext context) {
this.context = context;
}
@Override
public void subscribe(EventType eventType, MqttSession session) {
- map.put(session, session);
- context.getEventBus().subscribe(ServerEventType.CONNECT, (eventType1, object) -> map.remove(object.getSession()));
QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() {
@Override
public void execute() {
- if (map.remove(session) != null) {
- LOGGER.debug("长时间未收到客户端:{} 的Connect消息,连接断开!", session.getClientId());
+ if (!session.isAuthorized()) {
+ LOGGER.info("长时间未收到客户端:{} 的Connect消息,连接断开!", session.getClientId());
session.disconnect();
}
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java
index c141992e8a279f9a10567786ecf547f0ed507b83..45d55b5f35a7e9a79063862ac567bd928ed5e669 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java
@@ -11,6 +11,10 @@ import org.smartboot.mqtt.broker.openapi.controller.DashBoardController;
import org.smartboot.mqtt.broker.openapi.controller.SubscriptionController;
import org.smartboot.mqtt.broker.plugin.Plugin;
import org.smartboot.mqtt.broker.plugin.PluginException;
+import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
+
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ThreadFactory;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -19,6 +23,9 @@ import org.smartboot.mqtt.broker.plugin.PluginException;
public class OpenApiPlugin extends Plugin {
private static final Logger LOGGER = LoggerFactory.getLogger(OpenApiPlugin.class);
private static final String CONFIG_JSON_PATH = "$['broker']['openapi']";
+ private RestfulBootstrap restfulBootstrap;
+
+ private AsynchronousChannelGroup asynchronousChannelGroup;
@Override
protected void initPlugin(BrokerContext brokerContext) {
@@ -28,7 +35,15 @@ public class OpenApiPlugin extends Plugin {
return;
}
try {
- RestfulBootstrap restfulBootstrap = RestfulBootstrap.getInstance(new StaticResourceHandler());
+ asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
+ int i;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "openApi-" + (++i));
+ }
+ });
+ restfulBootstrap = RestfulBootstrap.getInstance(new StaticResourceHandler());
restfulBootstrap.inspect((httpRequest, response) -> {
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Headers", "*");
@@ -39,7 +54,8 @@ public class OpenApiPlugin extends Plugin {
HttpBootstrap bootstrap = restfulBootstrap.bootstrap();
bootstrap.setPort(config.getPort());
- bootstrap.configuration().bannerEnabled(false).host(config.getHost()).readBufferSize(1024 * 8);
+ bootstrap.configuration().bannerEnabled(false).host(config.getHost()).readBufferSize(1024 * 8).group(asynchronousChannelGroup);
+
bootstrap.start();
brokerContext.getProviders().setOpenApiBootStrap(restfulBootstrap);
LOGGER.info("openapi server start success!");
@@ -48,4 +64,10 @@ public class OpenApiPlugin extends Plugin {
throw new PluginException("start openapi exception");
}
}
+
+ @Override
+ protected void destroyPlugin() {
+ restfulBootstrap.bootstrap().shutdown();
+ asynchronousChannelGroup.shutdown();
+ }
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
index 0160113f168799a7af4dd1738903a6fb0469e988..6f57c96e424411e70bb35daf5a5beef3918a202d 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
@@ -80,7 +80,7 @@ public class ConnectProcessor implements MqttProcessor {
} else {
receiveMaximum = context.getBrokerConfigure().getMaxInflight();
}
- session.setInflightQueue(new InflightQueue(receiveMaximum));
+ session.setInflightQueue(new InflightQueue(session, receiveMaximum));
//如果服务端收到清理会话(CleanSession)标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0 之外,
// 还必须将 CONNACK 报文中的当前会话设置(Session Present)标志为 0。
@@ -160,15 +160,11 @@ public class ConnectProcessor implements MqttProcessor {
SessionStateProvider sessionStateProvider = context.getProviders().getSessionStateProvider();
SessionState sessionState = sessionStateProvider.get(session.getClientId());
if (sessionState != null) {
- session.getResponseConsumers().putAll(sessionState.getResponseConsumers());
sessionState.getSubscribers().forEach(session::subscribe);
//客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发
//任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或
//服务端重发消息的情况。
- session.getResponseConsumers().forEach((key, ackMessage) -> {
- session.getResponseConsumers().put(key, ackMessage);
- session.write(ackMessage.getOriginalMessage());
- });
+
}
}
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
index 4b17dc2f4c8cec0f6ef37d1928c806082ba2f6ff..ed4b7b1713c445e03ebf5de2f454cbb7803fca92 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
@@ -2,14 +2,13 @@ package org.smartboot.mqtt.broker.processor;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.MqttSession;
-import org.smartboot.mqtt.common.message.MqttVariableMessage;
-import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
/**
* @author 三刀(zhengjunweimail@163.com)
* @version V1.0 , 2022/4/15
*/
-public class MqttAckProcessor> extends AuthorizedMqttProcessor {
+public class MqttAckProcessor extends AuthorizedMqttProcessor {
@Override
public void process0(BrokerContext context, MqttSession session, T t) {
session.notifyResponse(t);
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
index 2c3778038eccad88ea2037a406abd6e888039a87..d997d6ab8fe746df9e532a3e5bb42c85967c5f03 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java
@@ -13,13 +13,10 @@ import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
-import java.util.function.Consumer;
-
/**
* 发布Topic
*
@@ -78,7 +75,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor) message -> {
+ session.write(pubRecMessage, message -> {
//发送pubRel消息。
//todo
MqttPubQosVariableHeader qosVariableHeader;
@@ -116,7 +114,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor
smart-mqtt
org.smartboot.mqtt
- 0.15
+ 0.16
../pom.xml
4.0.0
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/ClientQosPublisher.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/ClientQosPublisher.java
deleted file mode 100644
index 9feb5a13c8429e61e1ee7401d773d0bbf64ca349..0000000000000000000000000000000000000000
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/ClientQosPublisher.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.smartboot.mqtt.client;
-
-import org.smartboot.mqtt.common.AbstractSession;
-import org.smartboot.mqtt.common.AsyncTask;
-import org.smartboot.mqtt.common.QosPublisher;
-import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.socket.util.QuickTimerTask;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2022/4/25
- */
-public class ClientQosPublisher extends QosPublisher {
-
- @Override
- protected void retry(CompletableFuture future, AbstractSession session, MqttMessage mqttMessage) {
- //注册重试
- QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() {
- @Override
- public void execute() {
- if (!future.isDone()) {
- session.write(mqttMessage);
- }
- }
- }, 1, TimeUnit.SECONDS);
- }
-}
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
index e5f3c80a893eeac827cdd0b9849803b9bb90cdb3..e42ba2a2d25309f4f6df2a79f66da0c78dc07138 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
@@ -4,7 +4,6 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.AbstractSession;
-import org.smartboot.mqtt.common.AckMessage;
import org.smartboot.mqtt.common.DefaultMqttWriter;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.MqttMessageBuilders;
@@ -18,6 +17,7 @@ import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPingReqMessage;
import org.smartboot.mqtt.common.message.MqttPingRespMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
@@ -29,6 +29,7 @@ import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import org.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import org.smartboot.mqtt.common.message.payload.WillMessage;
import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
@@ -57,7 +58,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class MqttClient extends AbstractSession {
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqttClient.class);
/**
* 客户端配置项
*/
@@ -95,7 +96,7 @@ public class MqttClient extends AbstractSession {
}
public MqttClient(String host, int port, String clientId, MqttVersion mqttVersion) {
- super(new ClientQosPublisher(), new EventBusImpl(EventType.types()));
+ super(new EventBusImpl(EventType.types()));
clientConfigure.setHost(host);
clientConfigure.setPort(port);
clientConfigure.setMqttVersion(mqttVersion);
@@ -154,7 +155,7 @@ public class MqttClient extends AbstractSession {
//连接成功,注册订阅消息
if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
- setInflightQueue(new InflightQueue(16));
+ setInflightQueue(new InflightQueue(this, 16));
connected = true;
Runnable runnable;
while ((runnable = registeredTasks.poll()) != null) {
@@ -169,7 +170,7 @@ public class MqttClient extends AbstractSession {
//任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或
//服务端重发消息的情况。
if (!clientConfigure.isCleanSession()) {
- responseConsumers.values().forEach(ackMessage -> write(ackMessage.getOriginalMessage()));
+ //todo
}
consumer.accept(mqttConnAckMessage);
connected = true;
@@ -282,13 +283,16 @@ public class MqttClient extends AbstractSession {
}
MqttUnsubscribeMessage unsubscribedMessage = unsubscribeBuilder.build(properties);
// wait ack message.
- responseConsumers.put(unsubscribedMessage.getVariableHeader().getPacketId(), new AckMessage(unsubscribedMessage, mqttMessage -> {
- ValidateUtils.isTrue(mqttMessage instanceof MqttUnsubAckMessage, "uncorrected message type.");
- for (String unsubscribedTopic : unsubscribedTopics) {
- subscribes.remove(unsubscribedTopic);
- wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter()));
+ responseConsumers.put(unsubscribedMessage.getVariableHeader().getPacketId(), new Consumer>() {
+ @Override
+ public void accept(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
+ ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type.");
+ for (String unsubscribedTopic : unsubscribedTopics) {
+ subscribes.remove(unsubscribedTopic);
+ wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter()));
+ }
}
- }));
+ });
write(unsubscribedMessage);
}
@@ -326,26 +330,30 @@ public class MqttClient extends AbstractSession {
}
MqttSubscribeMessage subscribeMessage = subscribeBuilder.build();
- responseConsumers.put(subscribeMessage.getVariableHeader().getPacketId(), new AckMessage(subscribeMessage, mqttMessage -> {
- List qosValues = ((MqttSubAckMessage) mqttMessage).getPayload().grantedQoSLevels();
- ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
- int i = 0;
- for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) {
- MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++)));
- clientConfigure.getTopicListener().subscribe(subscription.getTopicFilter(), subscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : minQos);
- if (subscription.getQualityOfService() != MqttQoS.FAILURE) {
- subscribes.put(subscription.getTopicFilter(), new Subscribe(subscription.getTopicFilter(), minQos, consumer));
- //缓存统配匹配的topic
- TopicToken topicToken = new TopicToken(subscription.getTopicFilter());
- if (topicToken.isWildcards()) {
- wildcardsToken.add(topicToken);
+ responseConsumers.put(subscribeMessage.getVariableHeader().getPacketId(), new Consumer>() {
+ @Override
+ public void accept(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
+ List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels();
+ ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
+ ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
+ int i = 0;
+ for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) {
+ MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++)));
+ clientConfigure.getTopicListener().subscribe(subscription.getTopicFilter(), subscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : minQos);
+ if (subscription.getQualityOfService() != MqttQoS.FAILURE) {
+ subscribes.put(subscription.getTopicFilter(), new Subscribe(subscription.getTopicFilter(), minQos, consumer));
+ //缓存统配匹配的topic
+ TopicToken topicToken = new TopicToken(subscription.getTopicFilter());
+ if (topicToken.isWildcards()) {
+ wildcardsToken.add(topicToken);
+ }
+ } else {
+ LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter());
}
- } else {
- LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter());
+ subAckConsumer.accept(MqttClient.this, minQos);
}
- subAckConsumer.accept(this, minQos);
}
- }));
+ });
write(subscribeMessage);
}
@@ -373,46 +381,39 @@ public class MqttClient extends AbstractSession {
public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, Consumer consumer) {
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().topicName(topic).qos(qos).payload(payload).retained(retain);
- if (qos.value() > 0) {
- int packetId = newPacketId();
- publishBuilder.packetId(packetId);
- }
//todo
if (getMqttVersion() == MqttVersion.MQTT_5) {
publishBuilder.publishProperties(new PublishProperties());
}
- MqttPublishMessage message = publishBuilder.build();
if (connected) {
- publish(message, consumer);
+ publish(publishBuilder, consumer);
} else {
- registeredTasks.offer(() -> publish(message, consumer));
+ registeredTasks.offer(() -> publish(publishBuilder, consumer));
}
}
- @Override
- public synchronized void publish(MqttPublishMessage message, Consumer consumer) {
+ private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) {
InflightQueue inflightQueue = getInflightQueue();
- int index = inflightQueue.offer(message, 1);
- if (index == -1) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- publish(message, consumer);
- return;
- }
- super.publish(message, packetId -> {
- consumer.accept(packetId);
+ boolean suc = inflightQueue.offer(publishBuilder, offset -> {
+ consumer.accept(publishBuilder.getPacketId());
//最早发送的消息若收到响应,则更新点位
- long offset = inflightQueue.commit(index);
if (offset != -1) {
synchronized (MqttClient.this) {
MqttClient.this.notifyAll();
}
}
-
- }, false);
+ }, 1);
+ flush();
+ if (!suc) {
+ try {
+ synchronized (this) {
+ wait();
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ publish(publishBuilder, consumer);
+ }
}
public MqttClientConfigure getClientConfigure() {
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
index cfc786a2d6e9c84c97714a13f08d48975cbfce1a..798f3c324a8e6e0413b84deab6b13dbb7454fc0e 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
@@ -32,24 +32,24 @@ import java.util.Map;
public class MqttClientProcessor extends AbstractMessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttClientProcessor.class);
private final MqttClient mqttClient;
- private final Map, MqttProcessor extends MqttMessage>> processorMap = new HashMap<>();
+ private static final Map, MqttProcessor extends MqttMessage>> processors = new HashMap<>();
public MqttClientProcessor(MqttClient mqttClient) {
this.mqttClient = mqttClient;
- processorMap.put(MqttConnAckMessage.class, new ConnAckProcessor());
- processorMap.put(MqttPubAckMessage.class, new MqttAckProcessor());
- processorMap.put(MqttPublishMessage.class, new PublishProcessor());
- processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor());
- processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor());
- processorMap.put(MqttPubRelMessage.class, new MqttAckProcessor());
- processorMap.put(MqttSubAckMessage.class, new MqttAckProcessor());
- processorMap.put(MqttPingRespMessage.class, new MqttPingRespProcessor());
+ processors.put(MqttConnAckMessage.class, new ConnAckProcessor());
+ processors.put(MqttPubAckMessage.class, new MqttAckProcessor());
+ processors.put(MqttPublishMessage.class, new PublishProcessor());
+ processors.put(MqttPubRecMessage.class, new MqttAckProcessor());
+ processors.put(MqttPubCompMessage.class, new MqttAckProcessor());
+ processors.put(MqttPubRelMessage.class, new MqttAckProcessor());
+ processors.put(MqttSubAckMessage.class, new MqttAckProcessor());
+ processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor());
}
@Override
public void process0(AioSession session, MqttMessage msg) {
mqttClient.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(mqttClient, msg));
- MqttProcessor processor = processorMap.get(msg.getClass());
+ MqttProcessor processor = processors.get(msg.getClass());
// LOGGER.info("receive msg:{}", msg);
if (processor != null) {
processor.process(mqttClient, msg);
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java
index 44d8778ea5cb877211fbba7c073148b1c225acf2..e7bef87cc63f11bf0e35242753988058d6590248 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java
@@ -1,14 +1,13 @@
package org.smartboot.mqtt.client.processor;
import org.smartboot.mqtt.client.MqttClient;
-import org.smartboot.mqtt.common.message.MqttVariableMessage;
-import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
/**
* @author 三刀(zhengjunweimail@163.com)
* @version V1.0 , 2022/4/7
*/
-public class MqttAckProcessor> implements MqttProcessor {
+public class MqttAckProcessor implements MqttProcessor {
@Override
public void process(MqttClient mqttClient, T message) {
mqttClient.notifyResponse(message);
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
index f3338c4b1116742f80e95341c16e75b582dc0ca3..dd45017987b38a5549f01d4f0a36b4335a104c8f 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
@@ -10,15 +10,12 @@ import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.util.TopicTokenUtil;
-import java.util.function.Consumer;
-
/**
* 发布Topic
*
@@ -82,7 +79,7 @@ public class PublishProcessor implements MqttProcessor {
}
MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), properties);
MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(variableHeader);
- mqttClient.write(pubAckMessage);
+ mqttClient.write(pubAckMessage, false);
}
private void processQos2(MqttClient session, MqttPublishMessage mqttPublishMessage) {
@@ -95,7 +92,7 @@ public class PublishProcessor implements MqttProcessor {
MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(messageId, properties);
MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader);
- session.write(pubRecMessage, (Consumer) message -> {
+ session.write(pubRecMessage, message -> {
//todo
ReasonProperties reasonProperties = null;
if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) {
@@ -103,7 +100,7 @@ public class PublishProcessor implements MqttProcessor {
}
MqttPubQosVariableHeader qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), reasonProperties);
MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader);
- session.write(pubRelMessage);
+ session.write(pubRelMessage, false);
processPublishMessage(mqttPublishMessage, session);
});
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index f0a3e11ba9fa5da7376235caf90581613152e0a0..6203e97b48a5242c242d94538b43ea303c464c88 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.15
+ 0.16
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
index 37f0dfe1fa5218ccdd7a47ec1a3a37c73775d455..5ab9f5294b542b024eaabc17824c1376fe3ea890 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
@@ -2,14 +2,14 @@ package org.smartboot.mqtt.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
-import org.smartboot.mqtt.common.message.MqttVariableMessage;
+import org.smartboot.mqtt.common.message.MqttPubQosMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.ValidateUtils;
@@ -30,11 +30,6 @@ import java.util.function.Consumer;
public abstract class AbstractSession {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class);
private static final int QOS0_PACKET_ID = 0;
- /**
- * req-resp 消息模式的处理回调
- */
- protected final Map responseConsumers = new ConcurrentHashMap<>();
- private final QosPublisher qosPublisher;
/**
* 用于生成当前会话的报文标识符
*/
@@ -60,26 +55,22 @@ public abstract class AbstractSession {
private MqttVersion mqttVersion;
private InflightQueue inflightQueue;
+ protected Map>> responseConsumers = new ConcurrentHashMap<>();
- public AbstractSession(QosPublisher publisher, EventBus eventBus) {
- this.qosPublisher = publisher;
+ public AbstractSession(EventBus eventBus) {
this.eventBus = eventBus;
}
- public final synchronized void write(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> mqttMessage, Consumer extends MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader>> consumer) {
- responseConsumers.put(mqttMessage.getVariableHeader().getPacketId(), new AckMessage(mqttMessage, consumer));
- write(mqttMessage);
- }
-
- public Map getResponseConsumers() {
- return responseConsumers;
+ public final void write(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> mqttMessage, Consumer> consumer) {
+ responseConsumers.put(mqttMessage.getVariableHeader().getPacketId(), consumer);
+ write(mqttMessage, false);
}
- public final void notifyResponse(MqttVariableMessage extends MqttPacketIdVariableHeader> message) {
- AckMessage ackMessage = responseConsumers.remove(message.getVariableHeader().getPacketId());
- if (ackMessage != null) {
- ackMessage.setDone(true);
- ackMessage.getConsumer().accept(message);
+ public final void notifyResponse(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
+ if (message instanceof MqttPubQosMessage && message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) {
+ inflightQueue.notify((MqttPubQosMessage) message);
+ } else {
+ responseConsumers.remove(message.getVariableHeader().getPacketId()).accept(message);
}
}
@@ -102,7 +93,7 @@ public abstract class AbstractSession {
}
}
- public final synchronized void write(MqttMessage mqttMessage) {
+ public final void write(MqttMessage mqttMessage) {
write(mqttMessage, true);
}
@@ -112,32 +103,6 @@ public abstract class AbstractSession {
}
}
- public void publish(MqttPublishMessage message, Consumer consumer) {
- publish(message, consumer, true);
- }
-
- /**
- * 若发送的Qos为0,则回调的consumer packetId为0
- */
- public void publish(MqttPublishMessage message, Consumer consumer, boolean autoFlush) {
-// LOGGER.info("publish to client:{}, topic:{} packetId:{}", clientId, message.getMqttPublishVariableHeader().topicName(), message.getMqttPublishVariableHeader().packetId());
- switch (message.getFixedHeader().getQosLevel()) {
- case AT_MOST_ONCE:
- try {
- write(message, autoFlush);
- } finally {
- consumer.accept(QOS0_PACKET_ID);
- }
- break;
- case AT_LEAST_ONCE:
- qosPublisher.publishQos1(this, message, consumer, autoFlush);
- break;
- case EXACTLY_ONCE:
- qosPublisher.publishQos2(this, message, consumer, autoFlush);
- break;
- }
- }
-
public long getLatestSendMessageTime() {
return latestSendMessageTime;
}
@@ -163,9 +128,6 @@ public abstract class AbstractSession {
packetIdCreator.set(0);
return newPacketId();
}
- if (responseConsumers.containsKey(packageId)) {
- return newPacketId();
- }
return packageId;
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java
index a2600709fa5c8e8485887c048a05b448a309b6d8..9d480c6d9d37d930e8e6233c7e3f371d1ca80f3f 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java
@@ -1,8 +1,8 @@
package org.smartboot.mqtt.common;
-import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
-import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
+import org.smartboot.mqtt.common.enums.MqttMessageType;
+import org.smartboot.mqtt.common.enums.MqttQoS;
+import org.smartboot.mqtt.common.message.MqttPublishMessage;
import java.util.function.Consumer;
@@ -14,40 +14,62 @@ public class AckMessage {
/**
* 原始消息
*/
- private final MqttMessage originalMessage;
+ private final MqttPublishMessage originalMessage;
+
+ private MqttMessageType expectMessageType;
/**
* 回调事件
*/
- private Consumer extends MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader>> consumer;
+ private final Consumer consumer;
- /**
- * 执行状态
- */
- private boolean done;
+ private final long offset;
- public AckMessage(MqttMessage originalMessage, Consumer extends MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader>> consumer) {
+ private boolean commit;
+
+ private final int packetId;
+
+ public AckMessage(MqttPublishMessage originalMessage, int packetId, Consumer consumer, long offset) {
this.originalMessage = originalMessage;
this.consumer = consumer;
+ this.offset = offset;
+ this.packetId = packetId;
+ if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE) {
+ this.expectMessageType = MqttMessageType.PUBACK;
+ } else if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE) {
+ this.expectMessageType = MqttMessageType.PUBREC;
+ }
}
- public MqttMessage getOriginalMessage() {
+ public MqttPublishMessage getOriginalMessage() {
return originalMessage;
}
- public Consumer getConsumer() {
+ public Consumer getConsumer() {
return consumer;
}
- public void setConsumer(Consumer extends MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader>> consumer) {
- this.consumer = consumer;
+ public MqttMessageType getExpectMessageType() {
+ return expectMessageType;
+ }
+
+ public void setExpectMessageType(MqttMessageType expectMessageType) {
+ this.expectMessageType = expectMessageType;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public boolean isCommit() {
+ return commit;
}
- public boolean isDone() {
- return done;
+ public void setCommit(boolean commit) {
+ this.commit = commit;
}
- public void setDone(boolean done) {
- this.done = done;
+ public int getPacketId() {
+ return packetId;
}
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
index b2e5d444b8b586803bf236382d939c81c3de44ee..fa38e953040ce61fcfc95f6ee6ab083e8debd241 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
@@ -1,76 +1,129 @@
package org.smartboot.mqtt.common;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.common.enums.MqttMessageType;
+import org.smartboot.mqtt.common.enums.MqttQoS;
+import org.smartboot.mqtt.common.enums.MqttVersion;
+import org.smartboot.mqtt.common.message.MqttPubQosMessage;
+import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
+import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.util.ValidateUtils;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
/**
* @author 三刀(zhengjunweimail@163.com)
* @version V1.0 , 2022/4/26
*/
public class InflightQueue {
- private final MqttPublishMessage[] queue;
- private final long[] offsets;
+ private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class);
+ private final AckMessage[] queue;
private int takeIndex;
private int putIndex;
private int count;
- private final ReentrantLock lock = new ReentrantLock(false);
- private final Condition notFull = lock.newCondition();
+ private final AtomicInteger packetId = new AtomicInteger(0);
- public InflightQueue(int size) {
+ private final AbstractSession session;
+
+ public InflightQueue(AbstractSession session, int size) {
ValidateUtils.isTrue(size > 0, "inflight must >0");
- this.queue = new MqttPublishMessage[size];
- this.offsets = new long[size];
+ this.queue = new AckMessage[size];
+ this.session = session;
}
- public int offer(MqttPublishMessage mqttMessage, long offset) {
- lock.lock();
- try {
+ public boolean offer(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer, long offset) {
+ int id = 0;
+ MqttPublishMessage mqttMessage;
+ synchronized (this) {
if (count == queue.length) {
- return -1;
+ return false;
+ }
+ id = packetId.incrementAndGet();
+ // 16位无符号最大值65535
+ if (id > 65535) {
+ id = id % queue.length + queue.length;
+ packetId.set(id);
}
- queue[putIndex] = mqttMessage;
- offsets[putIndex] = offset;
- int index = putIndex++;
+ publishBuilder.packetId(id);
+ mqttMessage = publishBuilder.build();
+ queue[putIndex++] = new AckMessage(mqttMessage, id, consumer, offset);
if (putIndex == queue.length) {
putIndex = 0;
}
count++;
- return index;
- } finally {
- lock.unlock();
+// System.out.println("publish...");
+
}
+ session.write(mqttMessage, false);
+ // QOS直接响应
+ if (mqttMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) {
+ long offset1 = commit(id);
+// ValidateUtils.isTrue(offset1 == -1 || offset1 == offset, "invalid offset");
+ consumer.accept(offset);
+ }
+ return true;
}
- public long commit(int commitIndex) {
- lock.lock();
- try {
- if (commitIndex != takeIndex) {
- //转负数表示以提交
- offsets[commitIndex] = offsets[commitIndex] | Long.MIN_VALUE;
- return -1;
+ public void notify(MqttPubQosMessage message) {
+ AckMessage ackMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length];
+ ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == ackMessage.getExpectMessageType(), "invalid message type");
+ switch (message.getFixedHeader().getMessageType()) {
+ case PUBACK: {
+ long offset = commit(message.getVariableHeader().getPacketId());
+ ackMessage.getConsumer().accept(offset);
+ break;
}
- long offset = offsets[takeIndex++];
- count--;
+ case PUBREC:
+ ackMessage.setExpectMessageType(MqttMessageType.PUBCOMP);
+ //todo
+ ReasonProperties properties = null;
+ if (message.getVersion() == MqttVersion.MQTT_5) {
+ properties = new ReasonProperties();
+ }
+ MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
+ MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
+ session.write(pubRelMessage, false);
+ break;
+ case PUBCOMP:
+ long offset = commit(message.getVariableHeader().getPacketId());
+ ackMessage.getConsumer().accept(offset);
+ break;
+ default:
+ throw new RuntimeException();
+ }
+ }
+
+ private synchronized long commit(int packetId) {
+ int commitIndex = (packetId - 1) % queue.length;
+ AckMessage ackMessage = queue[commitIndex];
+ ValidateUtils.isTrue(ackMessage.getPacketId() == packetId, "invalid message");
+ ackMessage.setCommit(true);
+
+ if (commitIndex != takeIndex) {
+ return -1;
+ }
+ queue[takeIndex++] = null;
+ count--;
+ if (takeIndex == queue.length) {
+ takeIndex = 0;
+ }
+ while (count > 0 && queue[takeIndex].isCommit()) {
+ ackMessage = queue[takeIndex];
+ queue[takeIndex++] = null;
if (takeIndex == queue.length) {
takeIndex = 0;
}
- while (count > 0 && offsets[takeIndex] < 0) {
- offset = offsets[takeIndex] & ~Long.MIN_VALUE;
- offsets[takeIndex] = 0;
- queue[takeIndex++] = null;
- if (takeIndex == queue.length) {
- takeIndex = 0;
- }
- count--;
- }
- notFull.signal();
- return offset;
- } finally {
- lock.unlock();
+ count--;
}
+ return ackMessage.getOffset();
+ }
+
+ public int getCount() {
+ return count;
}
-}
+}
\ No newline at end of file
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java
index 52763a8556e59f956a2ba9ade36eb41b895932aa..f70d9843199c96a15498a4b746262aacbfee3e38 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java
@@ -78,8 +78,12 @@ public final class MqttMessageBuilders {
return this;
}
+ public int getPacketId() {
+ return packetId;
+ }
+
public MqttPublishMessage build() {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0);
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained);
if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) {
packetId = -1;
}
@@ -119,7 +123,7 @@ public final class MqttMessageBuilders {
}
public MqttSubscribeMessage build() {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false);
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload();
mqttSubscribePayload.setTopicSubscriptions(subscriptions);
MqttSubscribeVariableHeader variableHeader = new MqttSubscribeVariableHeader(packetId, subscribeProperties);
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java
deleted file mode 100644
index 15ca5c73983ec5c04f7487ac71c56649313b4a47..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.smartboot.mqtt.common;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.smartboot.mqtt.common.enums.MqttMessageType;
-import org.smartboot.mqtt.common.enums.MqttQoS;
-import org.smartboot.mqtt.common.enums.MqttVersion;
-import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
-import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
-import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
-import org.smartboot.mqtt.common.util.ValidateUtils;
-
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-public abstract class QosPublisher {
- private static final Logger LOGGER = LoggerFactory.getLogger(QosPublisher.class);
-
- void publishQos1(AbstractSession session, MqttPublishMessage publishMessage, Consumer consumer, boolean autoFlush) {
- MqttQoS qos = publishMessage.getFixedHeader().getQosLevel();
- ValidateUtils.notNull(qos == MqttQoS.AT_LEAST_ONCE, "qos is null");
- CompletableFuture future = new CompletableFuture<>();
- Integer cacheKey = publishMessage.getVariableHeader().getPacketId();
- //至少一次
-
- session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> {
- ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type");
- future.complete(true);
- session.responseConsumers.remove(cacheKey);
- LOGGER.info("Qos1消息发送成功...");
- consumer.accept(publishMessage.getVariableHeader().getPacketId());
- }));
- session.write(publishMessage, autoFlush);
- //注册重试
- retry(future, session, publishMessage);
- }
-
- void publishQos2(AbstractSession session, MqttPublishMessage publishMessage, Consumer consumer, boolean autoFlush) {
- MqttQoS qos = publishMessage.getFixedHeader().getQosLevel();
- ValidateUtils.notNull(qos == MqttQoS.EXACTLY_ONCE, "qos is null");
- Integer cacheKey = publishMessage.getVariableHeader().getPacketId();
- CompletableFuture publishFuture = new CompletableFuture<>();
- //只有一次
- session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> {
- ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type");
- ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId");
- publishFuture.complete(true);
-
- //todo
- ReasonProperties properties = null;
- if (message.getVersion() == MqttVersion.MQTT_5) {
- properties = new ReasonProperties();
- }
- MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
- MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
- CompletableFuture pubRelFuture = new CompletableFuture<>();
- session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> {
- ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type");
- ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId");
- pubRelFuture.complete(true);
- LOGGER.info("Qos2消息发送成功...");
- consumer.accept(compMessage.getVariableHeader().getPacketId());
- }));
- //注册重试
- retry(pubRelFuture, session, pubRelMessage);
- session.write(pubRelMessage);
- }));
- session.write(publishMessage, false);
- retry(publishFuture, session, publishMessage);
- }
-
- protected abstract void retry(CompletableFuture future, AbstractSession session, MqttMessage mqttMessage);
-}
\ No newline at end of file
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/QosCallbackTypeEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/QosCallbackTypeEnum.java
deleted file mode 100644
index 0b0c8ab383c632d5efd922ab9c33c1609ae738ab..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/QosCallbackTypeEnum.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.smartboot.mqtt.common.enums;
-
-/**
- * @author qinluo
- * @date 2022-04-11 16:30:57
- * @since 1.0.0
- */
-public enum QosCallbackTypeEnum {
-
- /**
- * QosLevel 1. (server -> client)
- */
- PUBACK(1, MqttMessageType.PUBACK.value(), 0, null),
-
- /*
- * PUBLISH QosLevel 2 (client -> server)
- *
- */
-
- /**
- * QosLevel 2 (server -> client)
- */
- PUBREC(2, MqttMessageType.PUBREC.value(), 0, null),
-
- /**
- * QosLevel 2 (client -> server)
- */
- PUBREL(3, MqttMessageType.PUBREL.value(), 1, null),
-
- /**
- * QosLevel 2 (server -> client)
- */
- PUBCOMP(4, MqttMessageType.PUBCOMP.value(), 0, null),
-
- ;
-
- static {
- PUBREC.next = PUBREL;
- PUBREL.next = PUBCOMP;
- }
-
- private final int type;
- private final int msgType;
- private final int waitSide;
- private QosCallbackTypeEnum next;
-
- QosCallbackTypeEnum(int type, int msgType, int waitSide, QosCallbackTypeEnum next) {
- this.type = type;
- this.msgType = msgType;
- this.waitSide = waitSide;
- this.next = next;
- }
-
- public int getType() {
- return type;
- }
-
- public int getMsgType() {
- return msgType;
- }
-
- public int getWaitSide() {
- return waitSide;
- }
-
- public static QosCallbackTypeEnum getInstance(int type) {
- for (QosCallbackTypeEnum instance : values()) {
- if (instance.type == type) {
- return instance;
- }
- }
-
- return null;
- }
-
- public static QosCallbackTypeEnum next(int type, int side) {
- QosCallbackTypeEnum instance = getInstance(type);
- if (instance == null) {
- return null;
- }
-
- instance = instance.next;
-
- while (instance != null) {
- if (instance.waitSide == side) {
- return instance;
- }
- instance = instance.next;
- }
-
- return null;
-
- }
-}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java
index f1ccc67d3a99e2fb937711b7ef8ef01d1c2471c4..5bf8d6a1ac308c772f6cc1edba424a87b8691a25 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java
@@ -2,7 +2,6 @@ package org.smartboot.mqtt.common.message;
import org.smartboot.mqtt.common.MqttWriter;
-import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.exception.MqttException;
import org.smartboot.socket.util.BufferUtils;
import org.smartboot.socket.util.DecoderException;
@@ -20,47 +19,6 @@ public final class MqttCodecUtil {
private MqttCodecUtil() {
}
- public static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
- switch (mqttFixedHeader.getMessageType()) {
- case CONNECT:
- case CONNACK:
- case PUBACK:
- case PUBREC:
- case PUBCOMP:
- case SUBACK:
- case UNSUBACK:
- case PINGREQ:
- case PINGRESP:
- case DISCONNECT:
- if (mqttFixedHeader.isDup() ||
- mqttFixedHeader.getQosLevel() != MqttQoS.AT_MOST_ONCE ||
- mqttFixedHeader.isRetain()) {
- return new MqttFixedHeader(
- mqttFixedHeader.getMessageType(),
- false,
- MqttQoS.AT_MOST_ONCE,
- false,
- mqttFixedHeader.remainingLength());
- }
- return mqttFixedHeader;
- case PUBREL:
- case SUBSCRIBE:
- case UNSUBSCRIBE:
- if (mqttFixedHeader.isRetain()) {
- return new MqttFixedHeader(
- mqttFixedHeader.getMessageType(),
- mqttFixedHeader.isDup(),
- mqttFixedHeader.getQosLevel(),
- false,
- mqttFixedHeader.remainingLength());
- }
- return mqttFixedHeader;
- default:
- return mqttFixedHeader;
- }
- }
-
-
/**
* 解码变长字节整数,规范1.5.5
*/
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java
index 4da49d2d223186a99637de8dc48aa7b53cdbbfeb..4e152214bb2a5f8cd1635c8473e20d8ea11994ca 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java
@@ -26,12 +26,20 @@ public class MqttFixedHeader {
public static final MqttFixedHeader PUB_REC_HEADER = new MqttFixedHeader(MqttMessageType.PUBREC, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PUB_REL_HEADER = new MqttFixedHeader(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE);
public static final MqttFixedHeader PUB_COMP_HEADER = new MqttFixedHeader(MqttMessageType.PUBCOMP, MqttQoS.AT_MOST_ONCE);
+ public static final MqttFixedHeader SUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, MqttQoS.AT_LEAST_ONCE);
public static final MqttFixedHeader SUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.SUBACK, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader UNSUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, MqttQoS.AT_LEAST_ONCE);
public static final MqttFixedHeader UNSUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBACK, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PING_REQ_HEADER = new MqttFixedHeader(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader PING_RESP_HEADER = new MqttFixedHeader(MqttMessageType.PINGRESP, MqttQoS.AT_MOST_ONCE);
public static final MqttFixedHeader DISCONNECT_HEADER = new MqttFixedHeader(MqttMessageType.DISCONNECT, MqttQoS.AT_MOST_ONCE);
+
+ public static final MqttFixedHeader PUB_QOS0_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false);
+ public static final MqttFixedHeader PUB_QOS1_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false);
+ public static final MqttFixedHeader PUB_QOS2_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.EXACTLY_ONCE, false);
+
+ public static final MqttFixedHeader PUB_FAILURE_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.FAILURE, false);
+
private final MqttMessageType messageType;
/**
* 重发标志
@@ -42,18 +50,16 @@ public class MqttFixedHeader {
* 保留标志,是否存储消息
*/
private final boolean retain;
- private final int remainingLength;
- public MqttFixedHeader(MqttMessageType messageType, boolean dup, MqttQoS qosLevel, boolean retain, int remainingLength) {
+ public MqttFixedHeader(MqttMessageType messageType, boolean dup, MqttQoS qosLevel, boolean retain) {
this.messageType = messageType;
this.dup = dup;
this.qosLevel = qosLevel;
this.retain = retain;
- this.remainingLength = remainingLength;
}
public MqttFixedHeader(MqttMessageType messageType, MqttQoS qosLevel) {
- this(messageType, false, qosLevel, false, 0);
+ this(messageType, false, qosLevel, false);
}
public MqttMessageType getMessageType() {
@@ -71,9 +77,4 @@ public class MqttFixedHeader {
public boolean isRetain() {
return retain;
}
-
- public int remainingLength() {
- return remainingLength;
- }
-
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java
index a1abe66761f813b0b2d0b9c1167ec1228be04470..b3c21fb98277d3fbb1b8301a648d62ac518b4197 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java
@@ -32,6 +32,10 @@ public abstract class MqttMessage extends ToString {
*/
protected final MqttFixedHeader fixedHeader;
protected MqttVersion version;
+ /**
+ * 剩余长度
+ */
+ private int remainingLength;
public MqttMessage(MqttFixedHeader mqttFixedHeader) {
this.fixedHeader = mqttFixedHeader;
@@ -84,6 +88,13 @@ public abstract class MqttMessage extends ToString {
return messageId;
}
+ public int getRemainingLength() {
+ return remainingLength;
+ }
+
+ public void setRemainingLength(int remainingLength) {
+ this.remainingLength = remainingLength;
+ }
public final MqttVersion getVersion() {
return version;
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java
index 6e9ca561bd05e0b3b70d481949a4f2cc05aed31d..ee919235b6373f41d0a137374b041e462d98e6fa 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java
@@ -28,11 +28,11 @@ public class MqttPubQosMessage extends MqttPacketIdentifierMessage 2) {
+ if (getRemainingLength() > 2) {
reasonCode = buffer.get();
}
//如果剩余长度小于4,则表示没有属性长度字段。
- if (fixedHeader.remainingLength() >= 4) {
+ if (getRemainingLength() >= 4) {
ReasonProperties properties = new ReasonProperties();
properties.decode(buffer);
header = new MqttPubQosVariableHeader(packetId, properties);
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
index 7e32b4f28f3f90393f687485fdc7cab1209188d1..2172884513861d052cde4604a4716dc5279f241f 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java
@@ -53,7 +53,7 @@ public class MqttPublishMessage extends MqttVariableMessage grantedQos = new ArrayList();
int limit = buffer.limit();
buffer.limit(buffer.position() + payloadLength);
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java
index eda1761a34624c315f1e89cf5256ff413948d152..2132b532d1dc4ebe3fae0265408a21095420bfbd 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java
@@ -47,7 +47,7 @@ public class MqttSubscribeMessage extends MqttPacketIdentifierMessage subscribeTopics = new ArrayList<>();
- int payloadLength = fixedHeader.remainingLength() - getVariableHeaderLength();
+ int payloadLength = getRemainingLength() - getVariableHeaderLength();
ValidateUtils.isTrue(buffer.remaining() >= payloadLength, "数据不足");
int limit = buffer.limit();
buffer.limit(buffer.position() + payloadLength);
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java
index b8e637e2e9ac7e230c2a57778f2e81fbb7314eb2..53c142bd2bd2822fdb10fa8721ee320ebb29cfd5 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java
@@ -48,7 +48,7 @@ public class MqttUnsubscribeMessage extends MqttPacketIdentifierMessage unsubscribeTopics = new ArrayList();
- int payloadLength = fixedHeader.remainingLength() - getVariableHeaderLength();
+ int payloadLength = getRemainingLength() - getVariableHeaderLength();
int limit = buffer.limit();
buffer.limit(buffer.position() + payloadLength);
while (buffer.hasRemaining()) {
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java
index b52e06b6ec7e9be3167482afec4f8c65e4a3c329..6ba963fa2159b4576ac5887cc38e13b4c6d600ab 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java
@@ -11,7 +11,7 @@ public abstract class MqttPacketIdVariableHeader e
/**
* 报文标识符
*/
- private final int packetId;
+ private int packetId;
public MqttPacketIdVariableHeader(int packetId, T properties) {
super(properties);
@@ -22,4 +22,8 @@ public abstract class MqttPacketIdVariableHeader e
public int getPacketId() {
return packetId;
}
+
+ public void setPacketId(int packetId) {
+ this.packetId = packetId;
+ }
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java
index 06f5f08321fbac42bf9ec2713a5d429b8be6dddd..3e17d8d8a469520cc23258a04bd8aeeec614f389 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java
@@ -3,6 +3,7 @@ package org.smartboot.mqtt.common.message.variable;
import org.smartboot.mqtt.common.MqttWriter;
import org.smartboot.mqtt.common.message.MqttCodecUtil;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
+import org.smartboot.mqtt.common.util.MqttUtil;
import java.io.IOException;
@@ -31,7 +32,7 @@ public class MqttPublishVariableHeader extends MqttPacketIdVariableHeader 0 ? 2 : 0;
- topicNameBytes = MqttCodecUtil.encodeUTF8(topicName);
+ topicNameBytes = MqttUtil.encodeCache(topicName);
length += topicNameBytes.length;
return length;
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java
index 968857aaf7377e92aca8ef9b6d0e64d6de7ee49c..0d2e9c241642e7b245aadf2f09dde900480abe34 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java
@@ -1,5 +1,7 @@
package org.smartboot.mqtt.common.protocol;
+import org.smartboot.mqtt.common.enums.MqttMessageType;
+import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
@@ -22,6 +24,53 @@ final class MqttMessageFactory {
private MqttMessageFactory() {
}
+ public static MqttFixedHeader newMqttFixedHeader(MqttMessageType messageType, boolean dup, int qosLevel, boolean retain) {
+ switch (messageType) {
+ case CONNECT:
+ return MqttFixedHeader.CONNECT_HEADER;
+ case CONNACK:
+ return MqttFixedHeader.CONN_ACK_HEADER;
+ case SUBSCRIBE:
+ return MqttFixedHeader.SUBSCRIBE_HEADER;
+ case SUBACK:
+ return MqttFixedHeader.SUB_ACK_HEADER;
+ case UNSUBACK:
+ return MqttFixedHeader.UNSUB_ACK_HEADER;
+ case UNSUBSCRIBE:
+ return MqttFixedHeader.UNSUBSCRIBE_HEADER;
+ case PUBLISH:
+ if (dup || retain) {
+ return new MqttFixedHeader(messageType, dup, MqttQoS.valueOf(qosLevel), retain);
+ }
+ switch (qosLevel) {
+ case 0:
+ return MqttFixedHeader.PUB_QOS0_HEADER;
+ case 1:
+ return MqttFixedHeader.PUB_QOS1_HEADER;
+ case 2:
+ return MqttFixedHeader.PUB_QOS2_HEADER;
+ default:
+ return MqttFixedHeader.PUB_FAILURE_HEADER;
+ }
+ case PUBACK:
+ return MqttFixedHeader.PUB_ACK_HEADER;
+ case PUBREC:
+ return MqttFixedHeader.PUB_REC_HEADER;
+ case PUBREL:
+ return MqttFixedHeader.PUB_REL_HEADER;
+ case PUBCOMP:
+ return MqttFixedHeader.PUB_COMP_HEADER;
+ case PINGREQ:
+ return MqttFixedHeader.PING_REQ_HEADER;
+ case PINGRESP:
+ return MqttFixedHeader.PING_RESP_HEADER;
+ case DISCONNECT:
+ return MqttFixedHeader.DISCONNECT_HEADER;
+ default:
+ throw new IllegalArgumentException("unknown message type: " + messageType);
+ }
+ }
+
public static MqttMessage newMessage(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.getMessageType()) {
case CONNECT:
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java
index 7ccc5c87444879c6aa791bcdbda56c90131b4f69..2ac415e2561dacb3868abeeec07b00ae5022a30b 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java
@@ -3,9 +3,7 @@ package org.smartboot.mqtt.common.protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
-import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
-import org.smartboot.mqtt.common.message.MqttCodecUtil;
import org.smartboot.mqtt.common.message.MqttFixedHeader;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.util.ValidateUtils;
@@ -83,27 +81,19 @@ public class MqttProtocol implements Protocol {
}
buffer.mark();
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
- MqttCodecUtil.resetUnusedFields(mqttFixedHeader);
-// switch (mqttFixedHeader.getMessageType()) {
-// case PUBREL:
-// case SUBSCRIBE:
-// case UNSUBSCRIBE:
-// if (mqttFixedHeader.getQosLevel() != MqttQoS.AT_LEAST_ONCE) {
-// throw new DecoderException(mqttFixedHeader.getMessageType().name() + " message must have QoS 1");
-// }
-// }
- unit.mqttMessage = MqttMessageFactory.newMessage(mqttFixedHeader);
- //非MqttConnectMessage对象为null,
- if (unit.mqttMessage.getVersion() == null) {
- unit.mqttMessage.setVersion(attachment.get(MQTT_VERSION_ATTACH_KEY));
- }
+ MqttFixedHeader mqttFixedHeader = MqttMessageFactory.newMqttFixedHeader(messageType, dupFlag, qosLevel, retain);
+ unit.mqttMessage = MqttMessageFactory.newMessage(mqttFixedHeader);
+ unit.mqttMessage.setRemainingLength(remainingLength);
+ //非MqttConnectMessage对象为null,
+ if (unit.mqttMessage.getVersion() == null) {
+ unit.mqttMessage.setVersion(attachment.get(MQTT_VERSION_ATTACH_KEY));
+ }
- unit.state = READ_VARIABLE_HEADER;
+ unit.state = READ_VARIABLE_HEADER;
}
case READ_VARIABLE_HEADER: {
- int remainingLength = unit.mqttMessage.getFixedHeader().remainingLength();
+ int remainingLength = unit.mqttMessage.getRemainingLength();
if (remainingLength > maxBytesInMessage) {
throw new DecoderException("too large message: " + remainingLength + " bytes");
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java
index 42428f63bac43eafcf1251a7aa9f4ecd51383615..de90f3edd7bca7bb9afc25f0a1ad53993a0d6cd4 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java
@@ -3,8 +3,11 @@ package org.smartboot.mqtt.common.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.AbstractSession;
+import org.smartboot.mqtt.common.message.MqttCodecUtil;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -17,6 +20,8 @@ public class MqttUtil {
*/
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
+ private static final Map cache = new ConcurrentHashMap<>();
+
public static boolean containsTopicWildcards(String topicName) {
for (char c : TOPIC_WILDCARDS) {
if (topicName.indexOf(c) >= 0) {
@@ -38,4 +43,8 @@ public class MqttUtil {
return "";
}
}
+
+ public static byte[] encodeCache(String topicName) {
+ return cache.computeIfAbsent(topicName, s -> MqttCodecUtil.encodeUTF8(topicName));
+ }
}