diff --git a/docker-compose.yml b/docker-compose.yml index 3543fe0f74eae2550fc2e4ab5b75b7df9744137e..f00444ced09f21ca9ea779ee8e7bcdc2a33f4e80 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ networks: mqtt-network: driver: bridge services: - smart-mqtt: + mqtt-broker: container_name: smart-mqtt hostname: mqtt-broker image: smartboot/smart-mqtt:latest @@ -18,7 +18,8 @@ services: options: max-size: "100m" max-file: "1" -# emqx: + +# mqtt-broker: # container_name: emqx # hostname: mqtt-broker # image: emqx/emqx:5.0.3 @@ -36,8 +37,7 @@ services: smart-mqtt-bench: depends_on: -# - emqx - - smart-mqtt + - mqtt-broker image: smartboot/smart-mqtt-bench:latest read_only: true restart: always @@ -51,6 +51,6 @@ services: options: max-size: "100m" max-file: "1" - command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dpublisher=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe -# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish + command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe +# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file diff --git a/pom.xml b/pom.xml index df998158249bafaf5a7c7f611c3dc20344b16ee2..dd9ff4f03d4613468096461d6271b6ba7be8aa14 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.15 + 0.16 4.0.0 mqtt broker - 0.15 + 0.16 1.5.25 1.1.22 2.6 diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index e594976cb02069091f854df52f6b3089d4edad91..6be6788478bf8572d221d0159384caebfc66ac5a 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.15 + 0.16 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 6f0e231c11a1e567a0b03347cb4427a6b2c59d1f..c06ab594e5b6b898d081a66381671326204679de 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -28,7 +28,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.15"; + public static final String VERSION = "v0.16"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 24852427dc94c93de6d5ae422afe15adba679833..7a198172800ed495fce239b3edebf44254ebede9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -20,7 +20,6 @@ import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.MqttMessageBuilders; import org.smartboot.mqtt.common.enums.MqttMetricEnum; -import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventBusImpl; @@ -36,6 +35,7 @@ import org.smartboot.mqtt.common.to.MetricItemTO; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; +import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; import org.smartboot.socket.extension.plugins.AbstractPlugin; import org.smartboot.socket.transport.AioQuickServer; import org.smartboot.socket.transport.AioSession; @@ -44,6 +44,7 @@ import org.yaml.snakeyaml.Yaml; import java.io.IOException; import java.io.InputStream; import java.lang.management.ManagementFactory; +import java.nio.channels.AsynchronousChannelGroup; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -58,6 +59,7 @@ import java.util.ServiceLoader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -109,6 +111,7 @@ public class BrokerContextImpl implements BrokerContext { * 统计指标 */ private final Map metricMap = new HashMap<>(); + private AsynchronousChannelGroup asynchronousChannelGroup; @Override public void init() throws IOException { @@ -129,10 +132,18 @@ public class BrokerContextImpl implements BrokerContext { try { + asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + int i; + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "smart-mqtt-broker-" + (++i)); + } + }); pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); - server.start(); + server.start(asynchronousChannelGroup); System.out.println(BrokerConfigure.BANNER + "\r\n :: smart-mqtt broker" + "::\t(" + BrokerConfigure.VERSION + ")"); System.out.println("❤️Gitee: https://gitee.com/smartboot/smart-mqtt"); System.out.println("Github: https://github.com/smartboot/smart-mqtt"); @@ -230,6 +241,8 @@ public class BrokerContextImpl implements BrokerContext { providers.setConnectAuthenticationProvider(new ConfiguredConnectAuthenticationProviderImpl(this)); } + private final TopicSubscriber BREAK = new TopicSubscriber(null, null, null, 0, 0); + private void initPushThread() { if (brokerConfigure.getTopicLimit() <= 0) { brokerConfigure.setTopicLimit(10); @@ -269,15 +282,24 @@ public class BrokerContextImpl implements BrokerContext { } try { //存在待输出消息 - Collection subscribers = brokerTopic.getConsumeOffsets().values(); - subscribers.stream().filter(topicSubscriber -> topicSubscriber.isReady() && topicSubscriber.getPushVersion() != brokerTopic.getVersion().get()).forEach(topicSubscriber -> topicSubscriber.batchPublish(BrokerContextImpl.this)); - brokerTopic.setPushing(false); - for (TopicSubscriber subscriber : subscribers) { - if (subscriber.getPushVersion() != brokerTopic.getVersion().get()) { - notifyPush(brokerTopic); - break; + ConcurrentLinkedQueue subscribers = brokerTopic.getQueue(); + subscribers.offer(BREAK); + TopicSubscriber subscriber = null; + int version = brokerTopic.getVersion().get(); + while ((subscriber = subscribers.poll()) != BREAK) { +// if (subscriber == BREAK) { +// break; +// } + try { + subscriber.batchPublish(BrokerContextImpl.this); + } catch (Exception e) { + LOGGER.error("batch publish exception:{}", e.getMessage()); } } + brokerTopic.getSemaphore().release(); + if (version != brokerTopic.getVersion().get() && !subscribers.isEmpty()) { + notifyPush(brokerTopic); + } } catch (Exception e) { LOGGER.error("brokerTopic:{} push message exception", brokerTopic.getTopic(), e); } @@ -333,8 +355,8 @@ public class BrokerContextImpl implements BrokerContext { AsyncTask task = this; PersistenceMessage storedMessage = providers.getRetainMessageProvider().get(subscriber.getTopic().getTopic(), subscriber.getRetainConsumerOffset()); if (storedMessage == null || storedMessage.getCreateTime() > subscriber.getLatestSubscribeTime()) { - subscriber.setReady(true); BrokerTopic topic = subscriber.getTopic(); + topic.getQueue().offer(subscriber); notifyPush(topic); //完成retain消息的消费,正式开始监听Topic @@ -344,51 +366,35 @@ public class BrokerContextImpl implements BrokerContext { MqttSession session = subscriber.getMqttSession(); MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(storedMessage.getPayload()).qos(subscriber.getMqttQoS()).topicName(storedMessage.getTopic()); - if (subscriber.getMqttQoS() == MqttQoS.AT_LEAST_ONCE || subscriber.getMqttQoS() == MqttQoS.EXACTLY_ONCE) { - publishBuilder.packetId(session.newPacketId()); - } if (session.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } - MqttPublishMessage publishMessage = publishBuilder.build(); InflightQueue inflightQueue = session.getInflightQueue(); - int index = inflightQueue.offer(publishMessage, storedMessage.getOffset()); - session.publish(publishMessage, packetId -> { - LOGGER.info("publish retain to client:{} success ,message:{} ", session.getClientId(), publishMessage); - long offset = inflightQueue.commit(index); - if (offset != -1) { - subscriber.setRetainConsumerOffset(offset + 1); - retainPushThreadPool.execute(task); - } else { - LOGGER.error("error..."); - } - - }); + inflightQueue.offer(publishBuilder, offset -> { + LOGGER.info("publish retain to client:{} success ", session.getClientId()); + subscriber.setRetainConsumerOffset(offset + 1); + retainPushThreadPool.execute(task); + }, storedMessage.getOffset()); + session.flush(); } }); } }); eventBus.subscribe(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, (eventType, subscriber) -> { LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getTopic().getTopic()); - subscriber.setReady(true); + subscriber.getTopic().getQueue().offer(subscriber); }); } - private void notifyPush(BrokerTopic topic) { - if (topic.isPushing()) { + void notifyPush(BrokerTopic topic) { + if (!topic.getSemaphore().tryAcquire()) { return; } - synchronized (topic) { - //已加入推送队列 - if (topic.isPushing()) { - return; - } - try { - topic.setPushing(true); - pushTopicQueue.put(topic); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + //已加入推送队列 + try { + pushTopicQueue.put(topic); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @@ -573,6 +579,7 @@ public class BrokerContextImpl implements BrokerContext { pushTopicQueue.offer(SHUTDOWN_TOPIC); pushThreadPool.shutdown(); server.shutdown(); + asynchronousChannelGroup.shutdown(); pagePool.release(); //卸载插件 plugins.forEach(Plugin::uninstall); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerQosPublisher.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerQosPublisher.java deleted file mode 100644 index 28f825d6af87a0d7f632cf97ea36e8b5f3620f11..0000000000000000000000000000000000000000 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerQosPublisher.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.smartboot.mqtt.broker; - -import org.smartboot.mqtt.common.AbstractSession; -import org.smartboot.mqtt.common.AsyncTask; -import org.smartboot.mqtt.common.QosPublisher; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.socket.util.QuickTimerTask; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2022/4/25 - */ -public class BrokerQosPublisher extends QosPublisher { - private final BrokerContext mqttContext; - - public BrokerQosPublisher(BrokerContext mqttContext) { - this.mqttContext = mqttContext; - } - - @Override - protected void retry(CompletableFuture future, AbstractSession session, MqttMessage mqttMessage) { - //注册重试 - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { - @Override - public void execute() { - if (!future.isDone()) { - // 如果客户端发生过断链,则 mqttSession!=session - System.out.println("retry..."); - MqttSession mqttSession = mqttContext.getSession(session.getClientId()); - mqttSession.write(mqttMessage); - } - } - }, 1, TimeUnit.SECONDS); - } -} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java index aa646f314bc6e670def34b174951402618cc229f..c127ee452c7fd560c5d965b1c1a239cf2323d8de 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerTopic.java @@ -4,6 +4,8 @@ import org.smartboot.mqtt.common.Topic; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; /** @@ -21,7 +23,9 @@ public class BrokerTopic extends Topic { /** * 当前Topic是否圈闭推送完成 */ - private boolean pushing; + private final Semaphore semaphore = new Semaphore(1); + + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); public BrokerTopic(String topic) { super(topic); @@ -35,11 +39,11 @@ public class BrokerTopic extends Topic { return version; } - public boolean isPushing() { - return pushing; + public Semaphore getSemaphore() { + return semaphore; } - public void setPushing(boolean pushing) { - this.pushing = pushing; + public ConcurrentLinkedQueue getQueue() { + return queue; } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 8bcf6fe69f93f84edecc9fc8e9b1f08724cfc438..048732d41541778f69ef5dc87240932f7538dbfe 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -11,7 +11,6 @@ import org.smartboot.mqtt.broker.processor.PublishProcessor; import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.common.DefaultMqttWriter; -import org.smartboot.mqtt.common.QosPublisher; import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; @@ -51,7 +50,6 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor onlineSessions = new ConcurrentHashMap<>(); private final Map, MqttProcessor> processorMap = new HashMap<>(); - private final QosPublisher qosPublisher; { processorMap.put(MqttPingReqMessage.class, new PingReqProcessor()); @@ -68,7 +66,6 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor sessionState.getSubscribers().put(topicSubscriber.getTopicFilterToken().getTopicFilter(), topicSubscriber.getMqttQoS())); mqttContext.getProviders().getSessionStateProvider().store(clientId, sessionState); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 8dd931c5480760d00221478305d792ee95fe85a8..2e2ce334a96e9801ab616021d4fd268f1ffe46ce 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -10,9 +10,10 @@ import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventType; -import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; +import java.util.concurrent.Semaphore; + /** * Topic订阅者 * @@ -47,9 +48,8 @@ public class TopicSubscriber { private final long latestSubscribeTime = System.currentTimeMillis(); private TopicToken topicFilterToken; - private int pushVersion = -1; - private boolean ready = false; + private final Semaphore semaphore = new Semaphore(0); public TopicSubscriber(BrokerTopic topic, MqttSession session, MqttQoS mqttQoS, long nextConsumerOffset, long retainConsumerOffset) { this.topic = topic; @@ -60,62 +60,65 @@ public class TopicSubscriber { } public void batchPublish(BrokerContext brokerContext) { - nextConsumerOffset = publish0(brokerContext, 0, nextConsumerOffset); + if (mqttSession.isDisconnect()) { + return; + } + semaphore.release(); + publish0(brokerContext, 0); + mqttSession.flush(); } - private long publish0(BrokerContext brokerContext, int depth, long expectConsumerOffset) { + private void publish0(BrokerContext brokerContext, int depth) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); - int version = topic.getVersion().get(); - PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), expectConsumerOffset); + PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { - pushVersion = version; - mqttSession.flush(); - return expectConsumerOffset; + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + } + return; } if (depth > 16) { - mqttSession.flush(); -// System.out.println("退出递归..."); - return expectConsumerOffset; +// LOGGER.info("退出递归..."); + return; } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic()); - if (mqttQoS == MqttQoS.AT_LEAST_ONCE || mqttQoS == MqttQoS.EXACTLY_ONCE) { - publishBuilder.packetId(mqttSession.newPacketId()); - } if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } - MqttPublishMessage publishMessage = publishBuilder.build(); - InflightQueue inflightQueue = mqttSession.getInflightQueue(); - int index = inflightQueue.offer(publishMessage, persistenceMessage.getOffset()); - // 飞行队列已满 - if (index == -1) { - mqttSession.flush(); -// System.out.println("queue is full..." + expectConsumerOffset); - return expectConsumerOffset; - } - long start = System.currentTimeMillis(); - mqttSession.publish(publishMessage, packetId -> { - //最早发送的消息若收到响应,则更新点位 - long offset = inflightQueue.commit(index); - if (offset == -1) { - return; + boolean suc = inflightQueue.offer(publishBuilder, offset -> { + if (mqttQoS == MqttQoS.AT_MOST_ONCE) { + nextConsumerOffset = persistenceMessage.getOffset() + 1; } + //最早发送的消息若收到响应,则更新点位 commitNextConsumerOffset(offset + 1); if (persistenceMessage.isRetained()) { setRetainConsumerOffset(getRetainConsumerOffset() + 1); } commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - }, false); +// if (inflightQueue.getCount() == 0) { + publish0(brokerContext, 0); +// } + }, persistenceMessage.getOffset()); + // 飞行队列已满 + if (!suc) { +// LOGGER.info("queue is full..." + expectConsumerOffset); + return; + } + long start = System.currentTimeMillis(); + if (mqttQoS != MqttQoS.AT_MOST_ONCE) { + nextConsumerOffset = persistenceMessage.getOffset() + 1; + } + long cost = System.currentTimeMillis() - start; if (cost > 100) { System.out.println("publish busy ,cost: " + cost); } brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, mqttSession); //递归处理下一个消息 - return publish0(brokerContext, ++depth, expectConsumerOffset + 1); + publish0(brokerContext, ++depth); } public BrokerTopic getTopic() { @@ -162,16 +165,4 @@ public class TopicSubscriber { public void setTopicFilterToken(TopicToken topicFilterToken) { this.topicFilterToken = topicFilterToken; } - - public boolean isReady() { - return ready; - } - - public void setReady(boolean ready) { - this.ready = ready; - } - - public int getPushVersion() { - return pushVersion; - } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java index 1ebfdb53da3060b7b3ad225b2ac90bd942093cfa..2542c27f43a036481992a5c291b0a2930acc5d1c 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ConnectIdleTimeMonitorSubscriber.java @@ -9,7 +9,6 @@ import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.socket.util.QuickTimerTask; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -22,21 +21,17 @@ public class ConnectIdleTimeMonitorSubscriber implements EventBusSubscriber map = new ConcurrentHashMap<>(); - public ConnectIdleTimeMonitorSubscriber(BrokerContext context) { this.context = context; } @Override public void subscribe(EventType eventType, MqttSession session) { - map.put(session, session); - context.getEventBus().subscribe(ServerEventType.CONNECT, (eventType1, object) -> map.remove(object.getSession())); QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { @Override public void execute() { - if (map.remove(session) != null) { - LOGGER.debug("长时间未收到客户端:{} 的Connect消息,连接断开!", session.getClientId()); + if (!session.isAuthorized()) { + LOGGER.info("长时间未收到客户端:{} 的Connect消息,连接断开!", session.getClientId()); session.disconnect(); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java index c141992e8a279f9a10567786ecf547f0ed507b83..45d55b5f35a7e9a79063862ac567bd928ed5e669 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/OpenApiPlugin.java @@ -11,6 +11,10 @@ import org.smartboot.mqtt.broker.openapi.controller.DashBoardController; import org.smartboot.mqtt.broker.openapi.controller.SubscriptionController; import org.smartboot.mqtt.broker.plugin.Plugin; import org.smartboot.mqtt.broker.plugin.PluginException; +import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; + +import java.nio.channels.AsynchronousChannelGroup; +import java.util.concurrent.ThreadFactory; /** * @author 三刀(zhengjunweimail@163.com) @@ -19,6 +23,9 @@ import org.smartboot.mqtt.broker.plugin.PluginException; public class OpenApiPlugin extends Plugin { private static final Logger LOGGER = LoggerFactory.getLogger(OpenApiPlugin.class); private static final String CONFIG_JSON_PATH = "$['broker']['openapi']"; + private RestfulBootstrap restfulBootstrap; + + private AsynchronousChannelGroup asynchronousChannelGroup; @Override protected void initPlugin(BrokerContext brokerContext) { @@ -28,7 +35,15 @@ public class OpenApiPlugin extends Plugin { return; } try { - RestfulBootstrap restfulBootstrap = RestfulBootstrap.getInstance(new StaticResourceHandler()); + asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + int i; + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "openApi-" + (++i)); + } + }); + restfulBootstrap = RestfulBootstrap.getInstance(new StaticResourceHandler()); restfulBootstrap.inspect((httpRequest, response) -> { response.setHeader("Access-Control-Allow-Origin", "*"); response.setHeader("Access-Control-Allow-Headers", "*"); @@ -39,7 +54,8 @@ public class OpenApiPlugin extends Plugin { HttpBootstrap bootstrap = restfulBootstrap.bootstrap(); bootstrap.setPort(config.getPort()); - bootstrap.configuration().bannerEnabled(false).host(config.getHost()).readBufferSize(1024 * 8); + bootstrap.configuration().bannerEnabled(false).host(config.getHost()).readBufferSize(1024 * 8).group(asynchronousChannelGroup); + bootstrap.start(); brokerContext.getProviders().setOpenApiBootStrap(restfulBootstrap); LOGGER.info("openapi server start success!"); @@ -48,4 +64,10 @@ public class OpenApiPlugin extends Plugin { throw new PluginException("start openapi exception"); } } + + @Override + protected void destroyPlugin() { + restfulBootstrap.bootstrap().shutdown(); + asynchronousChannelGroup.shutdown(); + } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 0160113f168799a7af4dd1738903a6fb0469e988..6f57c96e424411e70bb35daf5a5beef3918a202d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -80,7 +80,7 @@ public class ConnectProcessor implements MqttProcessor { } else { receiveMaximum = context.getBrokerConfigure().getMaxInflight(); } - session.setInflightQueue(new InflightQueue(receiveMaximum)); + session.setInflightQueue(new InflightQueue(session, receiveMaximum)); //如果服务端收到清理会话(CleanSession)标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0 之外, // 还必须将 CONNACK 报文中的当前会话设置(Session Present)标志为 0。 @@ -160,15 +160,11 @@ public class ConnectProcessor implements MqttProcessor { SessionStateProvider sessionStateProvider = context.getProviders().getSessionStateProvider(); SessionState sessionState = sessionStateProvider.get(session.getClientId()); if (sessionState != null) { - session.getResponseConsumers().putAll(sessionState.getResponseConsumers()); sessionState.getSubscribers().forEach(session::subscribe); //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 //服务端重发消息的情况。 - session.getResponseConsumers().forEach((key, ackMessage) -> { - session.getResponseConsumers().put(key, ackMessage); - session.write(ackMessage.getOriginalMessage()); - }); + } } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java index 4b17dc2f4c8cec0f6ef37d1928c806082ba2f6ff..ed4b7b1713c445e03ebf5de2f454cbb7803fca92 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java @@ -2,14 +2,13 @@ package org.smartboot.mqtt.broker.processor; import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.MqttSession; -import org.smartboot.mqtt.common.message.MqttVariableMessage; -import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; /** * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/4/15 */ -public class MqttAckProcessor> extends AuthorizedMqttProcessor { +public class MqttAckProcessor extends AuthorizedMqttProcessor { @Override public void process0(BrokerContext context, MqttSession session, T t) { session.notifyResponse(t); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index 2c3778038eccad88ea2037a406abd6e888039a87..d997d6ab8fe746df9e532a3e5bb42c85967c5f03 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -13,13 +13,10 @@ import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; -import java.util.function.Consumer; - /** * 发布Topic * @@ -78,7 +75,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor) message -> { + session.write(pubRecMessage, message -> { //发送pubRel消息。 //todo MqttPubQosVariableHeader qosVariableHeader; @@ -116,7 +114,7 @@ public class PublishProcessor extends AuthorizedMqttProcessor smart-mqtt org.smartboot.mqtt - 0.15 + 0.16 ../pom.xml 4.0.0 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/ClientQosPublisher.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/ClientQosPublisher.java deleted file mode 100644 index 9feb5a13c8429e61e1ee7401d773d0bbf64ca349..0000000000000000000000000000000000000000 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/ClientQosPublisher.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.smartboot.mqtt.client; - -import org.smartboot.mqtt.common.AbstractSession; -import org.smartboot.mqtt.common.AsyncTask; -import org.smartboot.mqtt.common.QosPublisher; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.socket.util.QuickTimerTask; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2022/4/25 - */ -public class ClientQosPublisher extends QosPublisher { - - @Override - protected void retry(CompletableFuture future, AbstractSession session, MqttMessage mqttMessage) { - //注册重试 - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { - @Override - public void execute() { - if (!future.isDone()) { - session.write(mqttMessage); - } - } - }, 1, TimeUnit.SECONDS); - } -} diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index e5f3c80a893eeac827cdd0b9849803b9bb90cdb3..e42ba2a2d25309f4f6df2a79f66da0c78dc07138 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -4,7 +4,6 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; -import org.smartboot.mqtt.common.AckMessage; import org.smartboot.mqtt.common.DefaultMqttWriter; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.MqttMessageBuilders; @@ -18,6 +17,7 @@ import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPingReqMessage; import org.smartboot.mqtt.common.message.MqttPingRespMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; @@ -29,6 +29,7 @@ import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.payload.MqttConnectPayload; import org.smartboot.mqtt.common.message.payload.WillMessage; import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; @@ -57,7 +58,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; public class MqttClient extends AbstractSession { - private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(MqttClient.class); /** * 客户端配置项 */ @@ -95,7 +96,7 @@ public class MqttClient extends AbstractSession { } public MqttClient(String host, int port, String clientId, MqttVersion mqttVersion) { - super(new ClientQosPublisher(), new EventBusImpl(EventType.types())); + super(new EventBusImpl(EventType.types())); clientConfigure.setHost(host); clientConfigure.setPort(port); clientConfigure.setMqttVersion(mqttVersion); @@ -154,7 +155,7 @@ public class MqttClient extends AbstractSession { //连接成功,注册订阅消息 if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(16)); + setInflightQueue(new InflightQueue(this, 16)); connected = true; Runnable runnable; while ((runnable = registeredTasks.poll()) != null) { @@ -169,7 +170,7 @@ public class MqttClient extends AbstractSession { //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 //服务端重发消息的情况。 if (!clientConfigure.isCleanSession()) { - responseConsumers.values().forEach(ackMessage -> write(ackMessage.getOriginalMessage())); + //todo } consumer.accept(mqttConnAckMessage); connected = true; @@ -282,13 +283,16 @@ public class MqttClient extends AbstractSession { } MqttUnsubscribeMessage unsubscribedMessage = unsubscribeBuilder.build(properties); // wait ack message. - responseConsumers.put(unsubscribedMessage.getVariableHeader().getPacketId(), new AckMessage(unsubscribedMessage, mqttMessage -> { - ValidateUtils.isTrue(mqttMessage instanceof MqttUnsubAckMessage, "uncorrected message type."); - for (String unsubscribedTopic : unsubscribedTopics) { - subscribes.remove(unsubscribedTopic); - wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); + responseConsumers.put(unsubscribedMessage.getVariableHeader().getPacketId(), new Consumer>() { + @Override + public void accept(MqttPacketIdentifierMessage message) { + ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); + for (String unsubscribedTopic : unsubscribedTopics) { + subscribes.remove(unsubscribedTopic); + wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); + } } - })); + }); write(unsubscribedMessage); } @@ -326,26 +330,30 @@ public class MqttClient extends AbstractSession { } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - responseConsumers.put(subscribeMessage.getVariableHeader().getPacketId(), new AckMessage(subscribeMessage, mqttMessage -> { - List qosValues = ((MqttSubAckMessage) mqttMessage).getPayload().grantedQoSLevels(); - ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); - int i = 0; - for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) { - MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++))); - clientConfigure.getTopicListener().subscribe(subscription.getTopicFilter(), subscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : minQos); - if (subscription.getQualityOfService() != MqttQoS.FAILURE) { - subscribes.put(subscription.getTopicFilter(), new Subscribe(subscription.getTopicFilter(), minQos, consumer)); - //缓存统配匹配的topic - TopicToken topicToken = new TopicToken(subscription.getTopicFilter()); - if (topicToken.isWildcards()) { - wildcardsToken.add(topicToken); + responseConsumers.put(subscribeMessage.getVariableHeader().getPacketId(), new Consumer>() { + @Override + public void accept(MqttPacketIdentifierMessage message) { + List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); + ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); + ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); + int i = 0; + for (MqttTopicSubscription subscription : subscribeMessage.getPayload().getTopicSubscriptions()) { + MqttQoS minQos = MqttQoS.valueOf(Math.min(subscription.getQualityOfService().value(), qosValues.get(i++))); + clientConfigure.getTopicListener().subscribe(subscription.getTopicFilter(), subscription.getQualityOfService() == MqttQoS.FAILURE ? MqttQoS.FAILURE : minQos); + if (subscription.getQualityOfService() != MqttQoS.FAILURE) { + subscribes.put(subscription.getTopicFilter(), new Subscribe(subscription.getTopicFilter(), minQos, consumer)); + //缓存统配匹配的topic + TopicToken topicToken = new TopicToken(subscription.getTopicFilter()); + if (topicToken.isWildcards()) { + wildcardsToken.add(topicToken); + } + } else { + LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); } - } else { - LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); + subAckConsumer.accept(MqttClient.this, minQos); } - subAckConsumer.accept(this, minQos); } - })); + }); write(subscribeMessage); } @@ -373,46 +381,39 @@ public class MqttClient extends AbstractSession { public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, Consumer consumer) { MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().topicName(topic).qos(qos).payload(payload).retained(retain); - if (qos.value() > 0) { - int packetId = newPacketId(); - publishBuilder.packetId(packetId); - } //todo if (getMqttVersion() == MqttVersion.MQTT_5) { publishBuilder.publishProperties(new PublishProperties()); } - MqttPublishMessage message = publishBuilder.build(); if (connected) { - publish(message, consumer); + publish(publishBuilder, consumer); } else { - registeredTasks.offer(() -> publish(message, consumer)); + registeredTasks.offer(() -> publish(publishBuilder, consumer)); } } - @Override - public synchronized void publish(MqttPublishMessage message, Consumer consumer) { + private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) { InflightQueue inflightQueue = getInflightQueue(); - int index = inflightQueue.offer(message, 1); - if (index == -1) { - try { - wait(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - publish(message, consumer); - return; - } - super.publish(message, packetId -> { - consumer.accept(packetId); + boolean suc = inflightQueue.offer(publishBuilder, offset -> { + consumer.accept(publishBuilder.getPacketId()); //最早发送的消息若收到响应,则更新点位 - long offset = inflightQueue.commit(index); if (offset != -1) { synchronized (MqttClient.this) { MqttClient.this.notifyAll(); } } - - }, false); + }, 1); + flush(); + if (!suc) { + try { + synchronized (this) { + wait(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + publish(publishBuilder, consumer); + } } public MqttClientConfigure getClientConfigure() { diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java index cfc786a2d6e9c84c97714a13f08d48975cbfce1a..798f3c324a8e6e0413b84deab6b13dbb7454fc0e 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java @@ -32,24 +32,24 @@ import java.util.Map; public class MqttClientProcessor extends AbstractMessageProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(MqttClientProcessor.class); private final MqttClient mqttClient; - private final Map, MqttProcessor> processorMap = new HashMap<>(); + private static final Map, MqttProcessor> processors = new HashMap<>(); public MqttClientProcessor(MqttClient mqttClient) { this.mqttClient = mqttClient; - processorMap.put(MqttConnAckMessage.class, new ConnAckProcessor()); - processorMap.put(MqttPubAckMessage.class, new MqttAckProcessor()); - processorMap.put(MqttPublishMessage.class, new PublishProcessor()); - processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor()); - processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor()); - processorMap.put(MqttPubRelMessage.class, new MqttAckProcessor()); - processorMap.put(MqttSubAckMessage.class, new MqttAckProcessor()); - processorMap.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); + processors.put(MqttConnAckMessage.class, new ConnAckProcessor()); + processors.put(MqttPubAckMessage.class, new MqttAckProcessor()); + processors.put(MqttPublishMessage.class, new PublishProcessor()); + processors.put(MqttPubRecMessage.class, new MqttAckProcessor()); + processors.put(MqttPubCompMessage.class, new MqttAckProcessor()); + processors.put(MqttPubRelMessage.class, new MqttAckProcessor()); + processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); + processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); } @Override public void process0(AioSession session, MqttMessage msg) { mqttClient.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(mqttClient, msg)); - MqttProcessor processor = processorMap.get(msg.getClass()); + MqttProcessor processor = processors.get(msg.getClass()); // LOGGER.info("receive msg:{}", msg); if (processor != null) { processor.process(mqttClient, msg); diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java index 44d8778ea5cb877211fbba7c073148b1c225acf2..e7bef87cc63f11bf0e35242753988058d6590248 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java @@ -1,14 +1,13 @@ package org.smartboot.mqtt.client.processor; import org.smartboot.mqtt.client.MqttClient; -import org.smartboot.mqtt.common.message.MqttVariableMessage; -import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; /** * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/4/7 */ -public class MqttAckProcessor> implements MqttProcessor { +public class MqttAckProcessor implements MqttProcessor { @Override public void process(MqttClient mqttClient, T message) { mqttClient.notifyResponse(message); diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java index f3338c4b1116742f80e95341c16e75b582dc0ca3..dd45017987b38a5549f01d4f0a36b4335a104c8f 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java @@ -10,15 +10,12 @@ import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; import org.smartboot.mqtt.common.util.TopicTokenUtil; -import java.util.function.Consumer; - /** * 发布Topic * @@ -82,7 +79,7 @@ public class PublishProcessor implements MqttProcessor { } MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), properties); MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(variableHeader); - mqttClient.write(pubAckMessage); + mqttClient.write(pubAckMessage, false); } private void processQos2(MqttClient session, MqttPublishMessage mqttPublishMessage) { @@ -95,7 +92,7 @@ public class PublishProcessor implements MqttProcessor { MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(messageId, properties); MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader); - session.write(pubRecMessage, (Consumer) message -> { + session.write(pubRecMessage, message -> { //todo ReasonProperties reasonProperties = null; if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) { @@ -103,7 +100,7 @@ public class PublishProcessor implements MqttProcessor { } MqttPubQosVariableHeader qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), reasonProperties); MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); - session.write(pubRelMessage); + session.write(pubRelMessage, false); processPublishMessage(mqttPublishMessage, session); }); diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index f0a3e11ba9fa5da7376235caf90581613152e0a0..6203e97b48a5242c242d94538b43ea303c464c88 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.15 + 0.16 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 37f0dfe1fa5218ccdd7a47ec1a3a37c73775d455..5ab9f5294b542b024eaabc17824c1376fe3ea890 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -2,14 +2,14 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; -import org.smartboot.mqtt.common.message.MqttVariableMessage; +import org.smartboot.mqtt.common.message.MqttPubQosMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.ValidateUtils; @@ -30,11 +30,6 @@ import java.util.function.Consumer; public abstract class AbstractSession { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class); private static final int QOS0_PACKET_ID = 0; - /** - * req-resp 消息模式的处理回调 - */ - protected final Map responseConsumers = new ConcurrentHashMap<>(); - private final QosPublisher qosPublisher; /** * 用于生成当前会话的报文标识符 */ @@ -60,26 +55,22 @@ public abstract class AbstractSession { private MqttVersion mqttVersion; private InflightQueue inflightQueue; + protected Map>> responseConsumers = new ConcurrentHashMap<>(); - public AbstractSession(QosPublisher publisher, EventBus eventBus) { - this.qosPublisher = publisher; + public AbstractSession(EventBus eventBus) { this.eventBus = eventBus; } - public final synchronized void write(MqttPacketIdentifierMessage mqttMessage, Consumer> consumer) { - responseConsumers.put(mqttMessage.getVariableHeader().getPacketId(), new AckMessage(mqttMessage, consumer)); - write(mqttMessage); - } - - public Map getResponseConsumers() { - return responseConsumers; + public final void write(MqttPacketIdentifierMessage mqttMessage, Consumer> consumer) { + responseConsumers.put(mqttMessage.getVariableHeader().getPacketId(), consumer); + write(mqttMessage, false); } - public final void notifyResponse(MqttVariableMessage message) { - AckMessage ackMessage = responseConsumers.remove(message.getVariableHeader().getPacketId()); - if (ackMessage != null) { - ackMessage.setDone(true); - ackMessage.getConsumer().accept(message); + public final void notifyResponse(MqttPacketIdentifierMessage message) { + if (message instanceof MqttPubQosMessage && message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) { + inflightQueue.notify((MqttPubQosMessage) message); + } else { + responseConsumers.remove(message.getVariableHeader().getPacketId()).accept(message); } } @@ -102,7 +93,7 @@ public abstract class AbstractSession { } } - public final synchronized void write(MqttMessage mqttMessage) { + public final void write(MqttMessage mqttMessage) { write(mqttMessage, true); } @@ -112,32 +103,6 @@ public abstract class AbstractSession { } } - public void publish(MqttPublishMessage message, Consumer consumer) { - publish(message, consumer, true); - } - - /** - * 若发送的Qos为0,则回调的consumer packetId为0 - */ - public void publish(MqttPublishMessage message, Consumer consumer, boolean autoFlush) { -// LOGGER.info("publish to client:{}, topic:{} packetId:{}", clientId, message.getMqttPublishVariableHeader().topicName(), message.getMqttPublishVariableHeader().packetId()); - switch (message.getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - try { - write(message, autoFlush); - } finally { - consumer.accept(QOS0_PACKET_ID); - } - break; - case AT_LEAST_ONCE: - qosPublisher.publishQos1(this, message, consumer, autoFlush); - break; - case EXACTLY_ONCE: - qosPublisher.publishQos2(this, message, consumer, autoFlush); - break; - } - } - public long getLatestSendMessageTime() { return latestSendMessageTime; } @@ -163,9 +128,6 @@ public abstract class AbstractSession { packetIdCreator.set(0); return newPacketId(); } - if (responseConsumers.containsKey(packageId)) { - return newPacketId(); - } return packageId; } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java index a2600709fa5c8e8485887c048a05b448a309b6d8..9d480c6d9d37d930e8e6233c7e3f371d1ca80f3f 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AckMessage.java @@ -1,8 +1,8 @@ package org.smartboot.mqtt.common; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; -import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; +import org.smartboot.mqtt.common.enums.MqttMessageType; +import org.smartboot.mqtt.common.enums.MqttQoS; +import org.smartboot.mqtt.common.message.MqttPublishMessage; import java.util.function.Consumer; @@ -14,40 +14,62 @@ public class AckMessage { /** * 原始消息 */ - private final MqttMessage originalMessage; + private final MqttPublishMessage originalMessage; + + private MqttMessageType expectMessageType; /** * 回调事件 */ - private Consumer> consumer; + private final Consumer consumer; - /** - * 执行状态 - */ - private boolean done; + private final long offset; - public AckMessage(MqttMessage originalMessage, Consumer> consumer) { + private boolean commit; + + private final int packetId; + + public AckMessage(MqttPublishMessage originalMessage, int packetId, Consumer consumer, long offset) { this.originalMessage = originalMessage; this.consumer = consumer; + this.offset = offset; + this.packetId = packetId; + if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE) { + this.expectMessageType = MqttMessageType.PUBACK; + } else if (originalMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE) { + this.expectMessageType = MqttMessageType.PUBREC; + } } - public MqttMessage getOriginalMessage() { + public MqttPublishMessage getOriginalMessage() { return originalMessage; } - public Consumer getConsumer() { + public Consumer getConsumer() { return consumer; } - public void setConsumer(Consumer> consumer) { - this.consumer = consumer; + public MqttMessageType getExpectMessageType() { + return expectMessageType; + } + + public void setExpectMessageType(MqttMessageType expectMessageType) { + this.expectMessageType = expectMessageType; + } + + public long getOffset() { + return offset; + } + + public boolean isCommit() { + return commit; } - public boolean isDone() { - return done; + public void setCommit(boolean commit) { + this.commit = commit; } - public void setDone(boolean done) { - this.done = done; + public int getPacketId() { + return packetId; } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index b2e5d444b8b586803bf236382d939c81c3de44ee..fa38e953040ce61fcfc95f6ee6ab083e8debd241 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -1,76 +1,129 @@ package org.smartboot.mqtt.common; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.common.enums.MqttMessageType; +import org.smartboot.mqtt.common.enums.MqttQoS; +import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.message.MqttPubQosMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; import org.smartboot.mqtt.common.util.ValidateUtils; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; /** * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/4/26 */ public class InflightQueue { - private final MqttPublishMessage[] queue; - private final long[] offsets; + private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class); + private final AckMessage[] queue; private int takeIndex; private int putIndex; private int count; - private final ReentrantLock lock = new ReentrantLock(false); - private final Condition notFull = lock.newCondition(); + private final AtomicInteger packetId = new AtomicInteger(0); - public InflightQueue(int size) { + private final AbstractSession session; + + public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); - this.queue = new MqttPublishMessage[size]; - this.offsets = new long[size]; + this.queue = new AckMessage[size]; + this.session = session; } - public int offer(MqttPublishMessage mqttMessage, long offset) { - lock.lock(); - try { + public boolean offer(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer, long offset) { + int id = 0; + MqttPublishMessage mqttMessage; + synchronized (this) { if (count == queue.length) { - return -1; + return false; + } + id = packetId.incrementAndGet(); + // 16位无符号最大值65535 + if (id > 65535) { + id = id % queue.length + queue.length; + packetId.set(id); } - queue[putIndex] = mqttMessage; - offsets[putIndex] = offset; - int index = putIndex++; + publishBuilder.packetId(id); + mqttMessage = publishBuilder.build(); + queue[putIndex++] = new AckMessage(mqttMessage, id, consumer, offset); if (putIndex == queue.length) { putIndex = 0; } count++; - return index; - } finally { - lock.unlock(); +// System.out.println("publish..."); + } + session.write(mqttMessage, false); + // QOS直接响应 + if (mqttMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_MOST_ONCE) { + long offset1 = commit(id); +// ValidateUtils.isTrue(offset1 == -1 || offset1 == offset, "invalid offset"); + consumer.accept(offset); + } + return true; } - public long commit(int commitIndex) { - lock.lock(); - try { - if (commitIndex != takeIndex) { - //转负数表示以提交 - offsets[commitIndex] = offsets[commitIndex] | Long.MIN_VALUE; - return -1; + public void notify(MqttPubQosMessage message) { + AckMessage ackMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == ackMessage.getExpectMessageType(), "invalid message type"); + switch (message.getFixedHeader().getMessageType()) { + case PUBACK: { + long offset = commit(message.getVariableHeader().getPacketId()); + ackMessage.getConsumer().accept(offset); + break; } - long offset = offsets[takeIndex++]; - count--; + case PUBREC: + ackMessage.setExpectMessageType(MqttMessageType.PUBCOMP); + //todo + ReasonProperties properties = null; + if (message.getVersion() == MqttVersion.MQTT_5) { + properties = new ReasonProperties(); + } + MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); + MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); + session.write(pubRelMessage, false); + break; + case PUBCOMP: + long offset = commit(message.getVariableHeader().getPacketId()); + ackMessage.getConsumer().accept(offset); + break; + default: + throw new RuntimeException(); + } + } + + private synchronized long commit(int packetId) { + int commitIndex = (packetId - 1) % queue.length; + AckMessage ackMessage = queue[commitIndex]; + ValidateUtils.isTrue(ackMessage.getPacketId() == packetId, "invalid message"); + ackMessage.setCommit(true); + + if (commitIndex != takeIndex) { + return -1; + } + queue[takeIndex++] = null; + count--; + if (takeIndex == queue.length) { + takeIndex = 0; + } + while (count > 0 && queue[takeIndex].isCommit()) { + ackMessage = queue[takeIndex]; + queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; } - while (count > 0 && offsets[takeIndex] < 0) { - offset = offsets[takeIndex] & ~Long.MIN_VALUE; - offsets[takeIndex] = 0; - queue[takeIndex++] = null; - if (takeIndex == queue.length) { - takeIndex = 0; - } - count--; - } - notFull.signal(); - return offset; - } finally { - lock.unlock(); + count--; } + return ackMessage.getOffset(); + } + + public int getCount() { + return count; } -} +} \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java index 52763a8556e59f956a2ba9ade36eb41b895932aa..f70d9843199c96a15498a4b746262aacbfee3e38 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttMessageBuilders.java @@ -78,8 +78,12 @@ public final class MqttMessageBuilders { return this; } + public int getPacketId() { + return packetId; + } + public MqttPublishMessage build() { - MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0); + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained); if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) { packetId = -1; } @@ -119,7 +123,7 @@ public final class MqttMessageBuilders { } public MqttSubscribeMessage build() { - MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false); MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(); mqttSubscribePayload.setTopicSubscriptions(subscriptions); MqttSubscribeVariableHeader variableHeader = new MqttSubscribeVariableHeader(packetId, subscribeProperties); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java deleted file mode 100644 index 15ca5c73983ec5c04f7487ac71c56649313b4a47..0000000000000000000000000000000000000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosPublisher.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.smartboot.mqtt.common; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.common.enums.MqttMessageType; -import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.enums.MqttVersion; -import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; -import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; -import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; -import org.smartboot.mqtt.common.util.ValidateUtils; - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - -public abstract class QosPublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(QosPublisher.class); - - void publishQos1(AbstractSession session, MqttPublishMessage publishMessage, Consumer consumer, boolean autoFlush) { - MqttQoS qos = publishMessage.getFixedHeader().getQosLevel(); - ValidateUtils.notNull(qos == MqttQoS.AT_LEAST_ONCE, "qos is null"); - CompletableFuture future = new CompletableFuture<>(); - Integer cacheKey = publishMessage.getVariableHeader().getPacketId(); - //至少一次 - - session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBACK, "invalid message type"); - future.complete(true); - session.responseConsumers.remove(cacheKey); - LOGGER.info("Qos1消息发送成功..."); - consumer.accept(publishMessage.getVariableHeader().getPacketId()); - })); - session.write(publishMessage, autoFlush); - //注册重试 - retry(future, session, publishMessage); - } - - void publishQos2(AbstractSession session, MqttPublishMessage publishMessage, Consumer consumer, boolean autoFlush) { - MqttQoS qos = publishMessage.getFixedHeader().getQosLevel(); - ValidateUtils.notNull(qos == MqttQoS.EXACTLY_ONCE, "qos is null"); - Integer cacheKey = publishMessage.getVariableHeader().getPacketId(); - CompletableFuture publishFuture = new CompletableFuture<>(); - //只有一次 - session.responseConsumers.put(cacheKey, new AckMessage(publishMessage, message -> { - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == MqttMessageType.PUBREC, "invalid message type"); - ValidateUtils.isTrue(Objects.equals(message.getVariableHeader().getPacketId(), publishMessage.getVariableHeader().getPacketId()), "invalid packetId"); - publishFuture.complete(true); - - //todo - ReasonProperties properties = null; - if (message.getVersion() == MqttVersion.MQTT_5) { - properties = new ReasonProperties(); - } - MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); - MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); - CompletableFuture pubRelFuture = new CompletableFuture<>(); - session.responseConsumers.put(cacheKey, new AckMessage(pubRelMessage, compMessage -> { - ValidateUtils.isTrue(compMessage.getFixedHeader().getMessageType() == MqttMessageType.PUBCOMP, "invalid message type"); - ValidateUtils.isTrue(Objects.equals(compMessage.getVariableHeader().getPacketId(), pubRelMessage.getVariableHeader().getPacketId()), "invalid packetId"); - pubRelFuture.complete(true); - LOGGER.info("Qos2消息发送成功..."); - consumer.accept(compMessage.getVariableHeader().getPacketId()); - })); - //注册重试 - retry(pubRelFuture, session, pubRelMessage); - session.write(pubRelMessage); - })); - session.write(publishMessage, false); - retry(publishFuture, session, publishMessage); - } - - protected abstract void retry(CompletableFuture future, AbstractSession session, MqttMessage mqttMessage); -} \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/QosCallbackTypeEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/QosCallbackTypeEnum.java deleted file mode 100644 index 0b0c8ab383c632d5efd922ab9c33c1609ae738ab..0000000000000000000000000000000000000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/QosCallbackTypeEnum.java +++ /dev/null @@ -1,94 +0,0 @@ -package org.smartboot.mqtt.common.enums; - -/** - * @author qinluo - * @date 2022-04-11 16:30:57 - * @since 1.0.0 - */ -public enum QosCallbackTypeEnum { - - /** - * QosLevel 1. (server -> client) - */ - PUBACK(1, MqttMessageType.PUBACK.value(), 0, null), - - /* - * PUBLISH QosLevel 2 (client -> server) - * - */ - - /** - * QosLevel 2 (server -> client) - */ - PUBREC(2, MqttMessageType.PUBREC.value(), 0, null), - - /** - * QosLevel 2 (client -> server) - */ - PUBREL(3, MqttMessageType.PUBREL.value(), 1, null), - - /** - * QosLevel 2 (server -> client) - */ - PUBCOMP(4, MqttMessageType.PUBCOMP.value(), 0, null), - - ; - - static { - PUBREC.next = PUBREL; - PUBREL.next = PUBCOMP; - } - - private final int type; - private final int msgType; - private final int waitSide; - private QosCallbackTypeEnum next; - - QosCallbackTypeEnum(int type, int msgType, int waitSide, QosCallbackTypeEnum next) { - this.type = type; - this.msgType = msgType; - this.waitSide = waitSide; - this.next = next; - } - - public int getType() { - return type; - } - - public int getMsgType() { - return msgType; - } - - public int getWaitSide() { - return waitSide; - } - - public static QosCallbackTypeEnum getInstance(int type) { - for (QosCallbackTypeEnum instance : values()) { - if (instance.type == type) { - return instance; - } - } - - return null; - } - - public static QosCallbackTypeEnum next(int type, int side) { - QosCallbackTypeEnum instance = getInstance(type); - if (instance == null) { - return null; - } - - instance = instance.next; - - while (instance != null) { - if (instance.waitSide == side) { - return instance; - } - instance = instance.next; - } - - return null; - - } -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java index f1ccc67d3a99e2fb937711b7ef8ef01d1c2471c4..5bf8d6a1ac308c772f6cc1edba424a87b8691a25 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttCodecUtil.java @@ -2,7 +2,6 @@ package org.smartboot.mqtt.common.message; import org.smartboot.mqtt.common.MqttWriter; -import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.socket.util.BufferUtils; import org.smartboot.socket.util.DecoderException; @@ -20,47 +19,6 @@ public final class MqttCodecUtil { private MqttCodecUtil() { } - public static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) { - switch (mqttFixedHeader.getMessageType()) { - case CONNECT: - case CONNACK: - case PUBACK: - case PUBREC: - case PUBCOMP: - case SUBACK: - case UNSUBACK: - case PINGREQ: - case PINGRESP: - case DISCONNECT: - if (mqttFixedHeader.isDup() || - mqttFixedHeader.getQosLevel() != MqttQoS.AT_MOST_ONCE || - mqttFixedHeader.isRetain()) { - return new MqttFixedHeader( - mqttFixedHeader.getMessageType(), - false, - MqttQoS.AT_MOST_ONCE, - false, - mqttFixedHeader.remainingLength()); - } - return mqttFixedHeader; - case PUBREL: - case SUBSCRIBE: - case UNSUBSCRIBE: - if (mqttFixedHeader.isRetain()) { - return new MqttFixedHeader( - mqttFixedHeader.getMessageType(), - mqttFixedHeader.isDup(), - mqttFixedHeader.getQosLevel(), - false, - mqttFixedHeader.remainingLength()); - } - return mqttFixedHeader; - default: - return mqttFixedHeader; - } - } - - /** * 解码变长字节整数,规范1.5.5 */ diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java index 4da49d2d223186a99637de8dc48aa7b53cdbbfeb..4e152214bb2a5f8cd1635c8473e20d8ea11994ca 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java @@ -26,12 +26,20 @@ public class MqttFixedHeader { public static final MqttFixedHeader PUB_REC_HEADER = new MqttFixedHeader(MqttMessageType.PUBREC, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_REL_HEADER = new MqttFixedHeader(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE); public static final MqttFixedHeader PUB_COMP_HEADER = new MqttFixedHeader(MqttMessageType.PUBCOMP, MqttQoS.AT_MOST_ONCE); + public static final MqttFixedHeader SUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, MqttQoS.AT_LEAST_ONCE); public static final MqttFixedHeader SUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.SUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader UNSUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, MqttQoS.AT_LEAST_ONCE); public static final MqttFixedHeader UNSUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PING_REQ_HEADER = new MqttFixedHeader(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PING_RESP_HEADER = new MqttFixedHeader(MqttMessageType.PINGRESP, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader DISCONNECT_HEADER = new MqttFixedHeader(MqttMessageType.DISCONNECT, MqttQoS.AT_MOST_ONCE); + + public static final MqttFixedHeader PUB_QOS0_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false); + public static final MqttFixedHeader PUB_QOS1_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false); + public static final MqttFixedHeader PUB_QOS2_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.EXACTLY_ONCE, false); + + public static final MqttFixedHeader PUB_FAILURE_HEADER = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.FAILURE, false); + private final MqttMessageType messageType; /** * 重发标志 @@ -42,18 +50,16 @@ public class MqttFixedHeader { * 保留标志,是否存储消息 */ private final boolean retain; - private final int remainingLength; - public MqttFixedHeader(MqttMessageType messageType, boolean dup, MqttQoS qosLevel, boolean retain, int remainingLength) { + public MqttFixedHeader(MqttMessageType messageType, boolean dup, MqttQoS qosLevel, boolean retain) { this.messageType = messageType; this.dup = dup; this.qosLevel = qosLevel; this.retain = retain; - this.remainingLength = remainingLength; } public MqttFixedHeader(MqttMessageType messageType, MqttQoS qosLevel) { - this(messageType, false, qosLevel, false, 0); + this(messageType, false, qosLevel, false); } public MqttMessageType getMessageType() { @@ -71,9 +77,4 @@ public class MqttFixedHeader { public boolean isRetain() { return retain; } - - public int remainingLength() { - return remainingLength; - } - } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java index a1abe66761f813b0b2d0b9c1167ec1228be04470..b3c21fb98277d3fbb1b8301a648d62ac518b4197 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttMessage.java @@ -32,6 +32,10 @@ public abstract class MqttMessage extends ToString { */ protected final MqttFixedHeader fixedHeader; protected MqttVersion version; + /** + * 剩余长度 + */ + private int remainingLength; public MqttMessage(MqttFixedHeader mqttFixedHeader) { this.fixedHeader = mqttFixedHeader; @@ -84,6 +88,13 @@ public abstract class MqttMessage extends ToString { return messageId; } + public int getRemainingLength() { + return remainingLength; + } + + public void setRemainingLength(int remainingLength) { + this.remainingLength = remainingLength; + } public final MqttVersion getVersion() { return version; diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java index 6e9ca561bd05e0b3b70d481949a4f2cc05aed31d..ee919235b6373f41d0a137374b041e462d98e6fa 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPubQosMessage.java @@ -28,11 +28,11 @@ public class MqttPubQosMessage extends MqttPacketIdentifierMessage 2) { + if (getRemainingLength() > 2) { reasonCode = buffer.get(); } //如果剩余长度小于4,则表示没有属性长度字段。 - if (fixedHeader.remainingLength() >= 4) { + if (getRemainingLength() >= 4) { ReasonProperties properties = new ReasonProperties(); properties.decode(buffer); header = new MqttPubQosVariableHeader(packetId, properties); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java index 7e32b4f28f3f90393f687485fdc7cab1209188d1..2172884513861d052cde4604a4716dc5279f241f 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttPublishMessage.java @@ -53,7 +53,7 @@ public class MqttPublishMessage extends MqttVariableMessage grantedQos = new ArrayList(); int limit = buffer.limit(); buffer.limit(buffer.position() + payloadLength); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java index eda1761a34624c315f1e89cf5256ff413948d152..2132b532d1dc4ebe3fae0265408a21095420bfbd 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttSubscribeMessage.java @@ -47,7 +47,7 @@ public class MqttSubscribeMessage extends MqttPacketIdentifierMessage subscribeTopics = new ArrayList<>(); - int payloadLength = fixedHeader.remainingLength() - getVariableHeaderLength(); + int payloadLength = getRemainingLength() - getVariableHeaderLength(); ValidateUtils.isTrue(buffer.remaining() >= payloadLength, "数据不足"); int limit = buffer.limit(); buffer.limit(buffer.position() + payloadLength); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java index b8e637e2e9ac7e230c2a57778f2e81fbb7314eb2..53c142bd2bd2822fdb10fa8721ee320ebb29cfd5 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttUnsubscribeMessage.java @@ -48,7 +48,7 @@ public class MqttUnsubscribeMessage extends MqttPacketIdentifierMessage unsubscribeTopics = new ArrayList(); - int payloadLength = fixedHeader.remainingLength() - getVariableHeaderLength(); + int payloadLength = getRemainingLength() - getVariableHeaderLength(); int limit = buffer.limit(); buffer.limit(buffer.position() + payloadLength); while (buffer.hasRemaining()) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java index b52e06b6ec7e9be3167482afec4f8c65e4a3c329..6ba963fa2159b4576ac5887cc38e13b4c6d600ab 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPacketIdVariableHeader.java @@ -11,7 +11,7 @@ public abstract class MqttPacketIdVariableHeader e /** * 报文标识符 */ - private final int packetId; + private int packetId; public MqttPacketIdVariableHeader(int packetId, T properties) { super(properties); @@ -22,4 +22,8 @@ public abstract class MqttPacketIdVariableHeader e public int getPacketId() { return packetId; } + + public void setPacketId(int packetId) { + this.packetId = packetId; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java index 06f5f08321fbac42bf9ec2713a5d429b8be6dddd..3e17d8d8a469520cc23258a04bd8aeeec614f389 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/variable/MqttPublishVariableHeader.java @@ -3,6 +3,7 @@ package org.smartboot.mqtt.common.message.variable; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.message.MqttCodecUtil; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; +import org.smartboot.mqtt.common.util.MqttUtil; import java.io.IOException; @@ -31,7 +32,7 @@ public class MqttPublishVariableHeader extends MqttPacketIdVariableHeader 0 ? 2 : 0; - topicNameBytes = MqttCodecUtil.encodeUTF8(topicName); + topicNameBytes = MqttUtil.encodeCache(topicName); length += topicNameBytes.length; return length; } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java index 968857aaf7377e92aca8ef9b6d0e64d6de7ee49c..0d2e9c241642e7b245aadf2f09dde900480abe34 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java @@ -1,5 +1,7 @@ package org.smartboot.mqtt.common.protocol; +import org.smartboot.mqtt.common.enums.MqttMessageType; +import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.message.MqttDisconnectMessage; @@ -22,6 +24,53 @@ final class MqttMessageFactory { private MqttMessageFactory() { } + public static MqttFixedHeader newMqttFixedHeader(MqttMessageType messageType, boolean dup, int qosLevel, boolean retain) { + switch (messageType) { + case CONNECT: + return MqttFixedHeader.CONNECT_HEADER; + case CONNACK: + return MqttFixedHeader.CONN_ACK_HEADER; + case SUBSCRIBE: + return MqttFixedHeader.SUBSCRIBE_HEADER; + case SUBACK: + return MqttFixedHeader.SUB_ACK_HEADER; + case UNSUBACK: + return MqttFixedHeader.UNSUB_ACK_HEADER; + case UNSUBSCRIBE: + return MqttFixedHeader.UNSUBSCRIBE_HEADER; + case PUBLISH: + if (dup || retain) { + return new MqttFixedHeader(messageType, dup, MqttQoS.valueOf(qosLevel), retain); + } + switch (qosLevel) { + case 0: + return MqttFixedHeader.PUB_QOS0_HEADER; + case 1: + return MqttFixedHeader.PUB_QOS1_HEADER; + case 2: + return MqttFixedHeader.PUB_QOS2_HEADER; + default: + return MqttFixedHeader.PUB_FAILURE_HEADER; + } + case PUBACK: + return MqttFixedHeader.PUB_ACK_HEADER; + case PUBREC: + return MqttFixedHeader.PUB_REC_HEADER; + case PUBREL: + return MqttFixedHeader.PUB_REL_HEADER; + case PUBCOMP: + return MqttFixedHeader.PUB_COMP_HEADER; + case PINGREQ: + return MqttFixedHeader.PING_REQ_HEADER; + case PINGRESP: + return MqttFixedHeader.PING_RESP_HEADER; + case DISCONNECT: + return MqttFixedHeader.DISCONNECT_HEADER; + default: + throw new IllegalArgumentException("unknown message type: " + messageType); + } + } + public static MqttMessage newMessage(MqttFixedHeader mqttFixedHeader) { switch (mqttFixedHeader.getMessageType()) { case CONNECT: diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java index 7ccc5c87444879c6aa791bcdbda56c90131b4f69..2ac415e2561dacb3868abeeec07b00ae5022a30b 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java @@ -3,9 +3,7 @@ package org.smartboot.mqtt.common.protocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; -import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; -import org.smartboot.mqtt.common.message.MqttCodecUtil; import org.smartboot.mqtt.common.message.MqttFixedHeader; import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.util.ValidateUtils; @@ -83,27 +81,19 @@ public class MqttProtocol implements Protocol { } buffer.mark(); - MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength); - MqttCodecUtil.resetUnusedFields(mqttFixedHeader); -// switch (mqttFixedHeader.getMessageType()) { -// case PUBREL: -// case SUBSCRIBE: -// case UNSUBSCRIBE: -// if (mqttFixedHeader.getQosLevel() != MqttQoS.AT_LEAST_ONCE) { -// throw new DecoderException(mqttFixedHeader.getMessageType().name() + " message must have QoS 1"); -// } -// } - unit.mqttMessage = MqttMessageFactory.newMessage(mqttFixedHeader); - //非MqttConnectMessage对象为null, - if (unit.mqttMessage.getVersion() == null) { - unit.mqttMessage.setVersion(attachment.get(MQTT_VERSION_ATTACH_KEY)); - } + MqttFixedHeader mqttFixedHeader = MqttMessageFactory.newMqttFixedHeader(messageType, dupFlag, qosLevel, retain); + unit.mqttMessage = MqttMessageFactory.newMessage(mqttFixedHeader); + unit.mqttMessage.setRemainingLength(remainingLength); + //非MqttConnectMessage对象为null, + if (unit.mqttMessage.getVersion() == null) { + unit.mqttMessage.setVersion(attachment.get(MQTT_VERSION_ATTACH_KEY)); + } - unit.state = READ_VARIABLE_HEADER; + unit.state = READ_VARIABLE_HEADER; } case READ_VARIABLE_HEADER: { - int remainingLength = unit.mqttMessage.getFixedHeader().remainingLength(); + int remainingLength = unit.mqttMessage.getRemainingLength(); if (remainingLength > maxBytesInMessage) { throw new DecoderException("too large message: " + remainingLength + " bytes"); } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java index 42428f63bac43eafcf1251a7aa9f4ecd51383615..de90f3edd7bca7bb9afc25f0a1ad53993a0d6cd4 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttUtil.java @@ -3,8 +3,11 @@ package org.smartboot.mqtt.common.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; +import org.smartboot.mqtt.common.message.MqttCodecUtil; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; /** * @author 三刀(zhengjunweimail@163.com) @@ -17,6 +20,8 @@ public class MqttUtil { */ private static final char[] TOPIC_WILDCARDS = {'#', '+'}; + private static final Map cache = new ConcurrentHashMap<>(); + public static boolean containsTopicWildcards(String topicName) { for (char c : TOPIC_WILDCARDS) { if (topicName.indexOf(c) >= 0) { @@ -38,4 +43,8 @@ public class MqttUtil { return ""; } } + + public static byte[] encodeCache(String topicName) { + return cache.computeIfAbsent(topicName, s -> MqttCodecUtil.encodeUTF8(topicName)); + } }