From 617707f53e691aa9800b77b5470728b8dba9f7f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 21 Apr 2023 22:39:32 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E5=88=86=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smart-mqtt-broker/pom.xml | 2 +- .../mqtt/broker/BrokerConfigure.java | 2 +- .../broker/processor/ConnectProcessor.java | 2 +- smart-mqtt-client/pom.xml | 2 +- .../org/smartboot/mqtt/client/MqttClient.java | 2 +- smart-mqtt-common/pom.xml | 2 +- .../mqtt/common/AbstractSession.java | 3 +- .../smartboot/mqtt/common/InflightQueue.java | 38 ++++++++++++++----- .../mqtt/common/message/MqttFixedHeader.java | 9 ++++- .../common/protocol/MqttMessageFactory.java | 6 +-- 10 files changed, 48 insertions(+), 20 deletions(-) diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index 720ebb4d..23847762 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.18 + 0.19 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index 223c0e2b..f26c8480 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -38,7 +38,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.18"; + public static final String VERSION = "v0.19"; static final Map SystemEnvironments = new HashMap<>(); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index 1710700d..a0b3bf15 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -83,7 +83,7 @@ public class ConnectProcessor implements MqttProcessor { } else { receiveMaximum = context.getBrokerConfigure().getMaxInflight(); } - session.setInflightQueue(new InflightQueue(session, receiveMaximum)); + session.setInflightQueue(new InflightQueue(session, receiveMaximum,false)); //如果服务端收到清理会话(CleanSession)标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0 之外, // 还必须将 CONNACK 报文中的当前会话设置(Session Present)标志为 0。 diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index 6350be7a..1da20a7d 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.18 + 0.19 ../pom.xml 4.0.0 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index a8688d27..dab09140 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -164,7 +164,7 @@ public class MqttClient extends AbstractSession { //连接成功,注册订阅消息 if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(this, 16)); + setInflightQueue(new InflightQueue(this, 16, true)); connected = true; consumeTask(); //重连情况下重新触发订阅逻辑 diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index c8d8bc1c..60c42c94 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.18 + 0.19 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 69e2ddc1..33093da2 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -81,6 +81,7 @@ public abstract class AbstractSession { //重新发送subscribe或unSubscribe消息 QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> { if (!ackMessage.isCommit()) { + mqttMessage.getFixedHeader().setDup(true); write(mqttMessage, consumer); } }, 1, TimeUnit.SECONDS); @@ -100,7 +101,7 @@ public abstract class AbstractSession { qosMessage.setCommit(true); qosMessage.getConsumer().accept(message); } else { - LOGGER.info("message is null"); + LOGGER.info("message is null," + message); } } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index af08078a..6edc9673 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttVariableMessage; @@ -48,12 +49,13 @@ public class InflightQueue { private final AbstractSession session; private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); + private final boolean client; - - public InflightQueue(AbstractSession session, int size) { + public InflightQueue(AbstractSession session, int size, boolean client) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; this.session = session; + this.client = client; } public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { @@ -121,7 +123,9 @@ public class InflightQueue { switch (inflightMessage.getExpectMessageType()) { case PUBACK: case PUBREC: - session.write(inflightMessage.getOriginalMessage()); + MqttMessage mqttMessage = inflightMessage.getOriginalMessage(); + mqttMessage.getFixedHeader().setDup(true); + session.write(mqttMessage); break; case PUBCOMP: ReasonProperties properties = null; @@ -131,6 +135,7 @@ public class InflightQueue { MqttVariableMessage message = inflightMessage.getOriginalMessage(); MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); + pubRelMessage.getFixedHeader().setDup(true); session.write(pubRelMessage); break; default: @@ -148,19 +153,24 @@ public class InflightQueue { */ public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type"); - ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); - inflightMessage.setResponseMessage(message); - inflightMessage.setLatestTime(System.currentTimeMillis()); switch (message.getFixedHeader().getMessageType()) { case SUBACK: case UNSUBACK: case PUBACK: case PUBCOMP: { + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type :" + message + "expect message type:" + inflightMessage.getExpectMessageType()); + ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); + inflightMessage.setResponseMessage(message); + inflightMessage.setLatestTime(System.currentTimeMillis()); commit(inflightMessage); break; } case PUBREC: + boolean dup = inflightMessage.getExpectMessageType() == MqttMessageType.PUBCOMP; + ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType() || dup, "invalid message type :" + message + "expect message type:" + inflightMessage.getExpectMessageType()); + ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); + inflightMessage.setResponseMessage(message); + inflightMessage.setLatestTime(System.currentTimeMillis()); inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP); //todo ReasonProperties properties = null; @@ -169,6 +179,9 @@ public class InflightQueue { } MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); + if (dup) { + pubRelMessage.getFixedHeader().setDup(true); + } session.write(pubRelMessage, false); break; default: @@ -189,10 +202,14 @@ public class InflightQueue { if (takeIndex == queue.length) { takeIndex = 0; } - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + if (client) { + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + } while (count > 0 && queue[takeIndex].isCommit()) { inflightMessage = queue[takeIndex]; - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + if (client) { + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + } queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; @@ -206,6 +223,9 @@ public class InflightQueue { InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } + if (!client) { + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + } while (count < queue.length) { Runnable runnable = runnables.poll(); if (runnable != null) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java index 68147763..cf0c3a35 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttFixedHeader.java @@ -36,10 +36,13 @@ public class MqttFixedHeader extends ToString { public static final MqttFixedHeader PUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_REC_HEADER = new MqttFixedHeader(MqttMessageType.PUBREC, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PUB_REL_HEADER = new MqttFixedHeader(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE); + public static final MqttFixedHeader PUB_REL_HEADER_DUP = new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_LEAST_ONCE, false); public static final MqttFixedHeader PUB_COMP_HEADER = new MqttFixedHeader(MqttMessageType.PUBCOMP, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader SUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, MqttQoS.AT_LEAST_ONCE); + public static final MqttFixedHeader SUBSCRIBE_HEADER_DUP = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, true, MqttQoS.AT_LEAST_ONCE, false); public static final MqttFixedHeader SUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.SUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader UNSUBSCRIBE_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, MqttQoS.AT_LEAST_ONCE); + public static final MqttFixedHeader UNSUBSCRIBE_HEADER_DUP = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, true, MqttQoS.AT_LEAST_ONCE, false); public static final MqttFixedHeader UNSUB_ACK_HEADER = new MqttFixedHeader(MqttMessageType.UNSUBACK, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PING_REQ_HEADER = new MqttFixedHeader(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE); public static final MqttFixedHeader PING_RESP_HEADER = new MqttFixedHeader(MqttMessageType.PINGRESP, MqttQoS.AT_MOST_ONCE); @@ -55,7 +58,7 @@ public class MqttFixedHeader extends ToString { /** * 重发标志 */ - private final boolean dup; + private boolean dup; private final MqttQoS qosLevel; /** * 保留标志,是否存储消息 @@ -81,6 +84,10 @@ public class MqttFixedHeader extends ToString { return dup; } + public void setDup(boolean dup) { + this.dup = dup; + } + public MqttQoS getQosLevel() { return qosLevel; } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java index 7d11ade5..4734a4dd 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttMessageFactory.java @@ -41,13 +41,13 @@ final class MqttMessageFactory { case CONNACK: return MqttFixedHeader.CONN_ACK_HEADER; case SUBSCRIBE: - return MqttFixedHeader.SUBSCRIBE_HEADER; + return dup ? MqttFixedHeader.SUBSCRIBE_HEADER_DUP : MqttFixedHeader.SUBSCRIBE_HEADER; case SUBACK: return MqttFixedHeader.SUB_ACK_HEADER; case UNSUBACK: return MqttFixedHeader.UNSUB_ACK_HEADER; case UNSUBSCRIBE: - return MqttFixedHeader.UNSUBSCRIBE_HEADER; + return dup ? MqttFixedHeader.UNSUBSCRIBE_HEADER_DUP : MqttFixedHeader.UNSUBSCRIBE_HEADER; case PUBLISH: if (dup || retain) { return new MqttFixedHeader(messageType, dup, MqttQoS.valueOf(qosLevel), retain); @@ -67,7 +67,7 @@ final class MqttMessageFactory { case PUBREC: return MqttFixedHeader.PUB_REC_HEADER; case PUBREL: - return MqttFixedHeader.PUB_REL_HEADER; + return dup ? MqttFixedHeader.PUB_REL_HEADER_DUP : MqttFixedHeader.PUB_REL_HEADER; case PUBCOMP: return MqttFixedHeader.PUB_COMP_HEADER; case PINGREQ: -- Gitee From f532a65c71a1889a448dbd916b9c3a015f8283cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 22 Apr 2023 14:01:13 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E5=88=86=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/MqttBrokerMessageProcessor.java | 5 +- .../broker/processor/MqttAckProcessor.java | 2 +- .../broker/processor/PubRelProcessor.java | 43 +++++++++++++++++ .../broker/processor/PublishProcessor.java | 23 +-------- .../mqtt/client/MqttClientProcessor.java | 3 +- .../client/processor/MqttAckProcessor.java | 2 +- .../client/processor/PubRelProcessor.java | 43 +++++++++++++++++ .../client/processor/PublishProcessor.java | 17 +------ .../mqtt/common/AbstractSession.java | 48 ++++--------------- .../smartboot/mqtt/common/InflightQueue.java | 26 ++++++---- 10 files changed, 120 insertions(+), 92 deletions(-) create mode 100644 smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java create mode 100644 smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index 773d7036..c5911efc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -17,6 +17,7 @@ import org.smartboot.mqtt.broker.processor.DisConnectProcessor; import org.smartboot.mqtt.broker.processor.MqttAckProcessor; import org.smartboot.mqtt.broker.processor.MqttProcessor; import org.smartboot.mqtt.broker.processor.PingReqProcessor; +import org.smartboot.mqtt.broker.processor.PubRelProcessor; import org.smartboot.mqtt.broker.processor.PublishProcessor; import org.smartboot.mqtt.broker.processor.SubscribeProcessor; import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; @@ -37,6 +38,7 @@ import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.extension.plugins.RateLimiterPlugin; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; import org.smartboot.socket.util.Attachment; @@ -68,10 +70,11 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor()); - processorMap.put(MqttPubRelMessage.class, new MqttAckProcessor<>()); + processorMap.put(MqttPubRelMessage.class, new PubRelProcessor()); processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor<>()); processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor()); + addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512)); } public MqttBrokerMessageProcessor(BrokerContext mqttContext) { diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java index 22d8ee88..6196f641 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java @@ -21,6 +21,6 @@ import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; public class MqttAckProcessor extends AuthorizedMqttProcessor { @Override public void process0(BrokerContext context, MqttSession session, T t) { - session.notifyResponse(t); + session.getInflightQueue().notify(t); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java new file mode 100644 index 00000000..7b8573e2 --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PubRelProcessor.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.processor; + +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.common.message.MqttPubCompMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; +import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 4/22/23 + */ +public class PubRelProcessor extends AuthorizedMqttProcessor { + @Override + public void process0(BrokerContext context, MqttSession session, MqttPubRelMessage message) { + //发送pubRel消息。 + //todo + MqttPubQosVariableHeader qosVariableHeader; + //todo + byte code = 0; + if (code != 0) { + ReasonProperties properties = new ReasonProperties(); + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); + qosVariableHeader.setReasonCode(code); + } else { + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), null); + } + MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); + session.write(pubRelMessage, false); + session.notifyPubComp(message.getVariableHeader().getPacketId()); + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java index eaf5f6f1..bdd518f1 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/PublishProcessor.java @@ -21,13 +21,10 @@ import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttReasonCode; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; -import org.smartboot.mqtt.common.util.ValidateUtils; /** * 发布Topic @@ -112,24 +109,6 @@ public class PublishProcessor extends AuthorizedMqttProcessor { - ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message"); - //发送pubRel消息。 - //todo - MqttPubQosVariableHeader qosVariableHeader; - //todo - byte code = 0; - if (code != 0) { - ReasonProperties properties = new ReasonProperties(); - qosVariableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), properties); - qosVariableHeader.setReasonCode(code); - } else { - qosVariableHeader = new MqttPubQosVariableHeader(mqttPublishMessage.getVariableHeader().getPacketId(), null); - } - MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); - session.write(pubRelMessage, false); - // 消息投递至消息总线 - context.getEventBus().publish(ServerEventType.RECEIVE_PUBLISH_MESSAGE, EventObject.newEventObject(session, mqttPublishMessage)); - }); + session.write(pubRecMessage, () -> context.getEventBus().publish(ServerEventType.RECEIVE_PUBLISH_MESSAGE, EventObject.newEventObject(session, mqttPublishMessage))); } } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java index cb87d9b0..b15876b6 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java @@ -16,6 +16,7 @@ import org.smartboot.mqtt.client.processor.ConnAckProcessor; import org.smartboot.mqtt.client.processor.MqttAckProcessor; import org.smartboot.mqtt.client.processor.MqttPingRespProcessor; import org.smartboot.mqtt.client.processor.MqttProcessor; +import org.smartboot.mqtt.client.processor.PubRelProcessor; import org.smartboot.mqtt.client.processor.PublishProcessor; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; @@ -50,7 +51,7 @@ public class MqttClientProcessor extends AbstractMessageProcessor { processors.put(MqttPublishMessage.class, new PublishProcessor()); processors.put(MqttPubRecMessage.class, new MqttAckProcessor()); processors.put(MqttPubCompMessage.class, new MqttAckProcessor()); - processors.put(MqttPubRelMessage.class, new MqttAckProcessor()); + processors.put(MqttPubRelMessage.class, new PubRelProcessor()); processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java index cb398df7..302f4fee 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/MqttAckProcessor.java @@ -20,6 +20,6 @@ import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; public class MqttAckProcessor implements MqttProcessor { @Override public void process(MqttClient mqttClient, T message) { - mqttClient.notifyResponse(message); + mqttClient.getInflightQueue().notify(message); } } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java new file mode 100644 index 00000000..fd8dfb9e --- /dev/null +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PubRelProcessor.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.client.processor; + +import org.smartboot.mqtt.client.MqttClient; +import org.smartboot.mqtt.common.message.MqttPubCompMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; +import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; +import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 4/22/23 + */ +public class PubRelProcessor implements MqttProcessor { + + @Override + public void process(MqttClient mqttClient, MqttPubRelMessage message) { + //发送pubRel消息。 + //todo + MqttPubQosVariableHeader qosVariableHeader; + //todo + byte code = 0; + if (code != 0) { + ReasonProperties properties = new ReasonProperties(); + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); + qosVariableHeader.setReasonCode(code); + } else { + qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), null); + } + MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); + mqttClient.write(pubRelMessage, false); + mqttClient.notifyPubComp(message.getVariableHeader().getPacketId()); + } +} diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java index f4559f9c..3b053b07 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java @@ -18,15 +18,12 @@ import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; import org.smartboot.mqtt.common.util.MqttUtil; -import org.smartboot.mqtt.common.util.ValidateUtils; /** * 发布Topic @@ -104,19 +101,7 @@ public class PublishProcessor implements MqttProcessor { MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(messageId, properties); MqttPubRecMessage pubRecMessage = new MqttPubRecMessage(variableHeader); - session.write(pubRecMessage, message -> { - ValidateUtils.isTrue(message instanceof MqttPubRelMessage, "invalid message"); - //todo - ReasonProperties reasonProperties = null; - if (mqttPublishMessage.getVersion() == MqttVersion.MQTT_5) { - reasonProperties = new ReasonProperties(); - } - MqttPubQosVariableHeader qosVariableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), reasonProperties); - MqttPubCompMessage pubRelMessage = new MqttPubCompMessage(qosVariableHeader); - session.write(pubRelMessage, false); - - processPublishMessage(mqttPublishMessage, session); - }); + session.write(pubRecMessage, () -> processPublishMessage(mqttPublishMessage, session)); } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 33093da2..8c7d78a6 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -12,29 +12,21 @@ package org.smartboot.mqtt.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttSubscribeMessage; -import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; -import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.protocol.MqttProtocol; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.transport.AioSession; import org.smartboot.socket.util.Attachment; -import org.smartboot.socket.util.QuickTimerTask; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; /** * @author 三刀(zhengjunweimail@163.com) @@ -63,49 +55,25 @@ public abstract class AbstractSession { private MqttVersion mqttVersion; private InflightQueue inflightQueue; - private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); + private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); public AbstractSession(EventBus eventBus) { this.eventBus = eventBus; } - public final void write(MqttPacketIdentifierMessage mqttMessage, Consumer> consumer) { - QosMessage ackMessage = new QosMessage(mqttMessage, consumer); - switch (mqttMessage.getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - ValidateUtils.isTrue(mqttMessage instanceof MqttPubRecMessage, "invalid message instance"); - //超时移除即可, - break; - case AT_LEAST_ONCE: - ValidateUtils.isTrue(mqttMessage instanceof MqttSubscribeMessage || mqttMessage instanceof MqttUnsubscribeMessage, "invalid message instance"); - //重新发送subscribe或unSubscribe消息 - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(() -> { - if (!ackMessage.isCommit()) { - mqttMessage.getFixedHeader().setDup(true); - write(mqttMessage, consumer); - } - }, 1, TimeUnit.SECONDS); - default: - throw new UnsupportedOperationException(); - } - ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), ackMessage); + public final void write(MqttPubRecMessage mqttMessage, Runnable callback) { + ackMessageCacheMap.put(mqttMessage.getVariableHeader().getPacketId(), callback); write(mqttMessage, false); } - public final void notifyResponse(MqttPacketIdentifierMessage message) { - if (message.getFixedHeader().getMessageType() != MqttMessageType.PUBREL) { - inflightQueue.notify(message); - } else { - QosMessage qosMessage = ackMessageCacheMap.remove(message.getVariableHeader().getPacketId()); - if (qosMessage != null) { - qosMessage.setCommit(true); - qosMessage.getConsumer().accept(message); - } else { - LOGGER.info("message is null," + message); - } + public final void notifyPubComp(int packetId) { + Runnable consumer = ackMessageCacheMap.remove(packetId); + if (consumer != null) { + consumer.run(); } } + public final synchronized void write(MqttMessage mqttMessage, boolean autoFlush) { try { if (disconnect) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 6edc9673..c1494fe1 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -119,13 +119,14 @@ public class InflightQueue { return; } inflightMessage.setLatestTime(System.currentTimeMillis()); - LOGGER.info("message:{} time out,retry...", inflightMessage.getOriginalMessage().getFixedHeader()); + LOGGER.info("message:{} time out,retry...", inflightMessage.getExpectMessageType()); switch (inflightMessage.getExpectMessageType()) { case PUBACK: case PUBREC: MqttMessage mqttMessage = inflightMessage.getOriginalMessage(); mqttMessage.getFixedHeader().setDup(true); session.write(mqttMessage); + System.out.println("relPublish.."); break; case PUBCOMP: ReasonProperties properties = null; @@ -158,17 +159,21 @@ public class InflightQueue { case UNSUBACK: case PUBACK: case PUBCOMP: { - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType(), "invalid message type :" + message + "expect message type:" + inflightMessage.getExpectMessageType()); - ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); + if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) { + System.out.println("maybe dup ack,ignore:" + message.getFixedHeader().getMessageType()); + break; + } inflightMessage.setResponseMessage(message); inflightMessage.setLatestTime(System.currentTimeMillis()); commit(inflightMessage); break; } case PUBREC: - boolean dup = inflightMessage.getExpectMessageType() == MqttMessageType.PUBCOMP; - ValidateUtils.isTrue(message.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType() || dup, "invalid message type :" + message + "expect message type:" + inflightMessage.getExpectMessageType()); - ValidateUtils.isTrue(message.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message packetId " + message.getVariableHeader().getPacketId() + " " + inflightMessage.getAssignedPacketId()); + //说明此前出现过重复publish,切已经收到过REC,并发送过REL消息 + if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) { + System.out.println("maybe dup pubRec,ignore"); + break; + } inflightMessage.setResponseMessage(message); inflightMessage.setLatestTime(System.currentTimeMillis()); inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP); @@ -179,13 +184,14 @@ public class InflightQueue { } MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); - if (dup) { - pubRelMessage.getFixedHeader().setDup(true); - } session.write(pubRelMessage, false); + if ((inflightMessage.getAssignedPacketId() - 1) % queue.length == takeIndex) { + Attachment attachment = session.session.getAttachment(); + attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(inflightMessage)); + } break; default: - throw new RuntimeException(); + throw new RuntimeException(message.toString()); } } -- Gitee From e38c84e61910a4f20dc0c66ab55803a8b6796647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 22 Apr 2023 16:31:09 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E5=88=86=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dashboard/components.d.ts | 2 ++ dashboard/vite.config.ts | 3 ++- docker-compose.yml | 2 +- pom.xml | 4 ++-- .../org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java | 3 +-- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dashboard/components.d.ts b/dashboard/components.d.ts index a3e44469..990f1edb 100644 --- a/dashboard/components.d.ts +++ b/dashboard/components.d.ts @@ -10,6 +10,7 @@ declare module '@vue/runtime-core' { LayAffix: typeof import('@layui/layui-vue')['LayAffix'] LayAvatar: typeof import('@layui/layui-vue')['LayAvatar'] LayBadge: typeof import('@layui/layui-vue')['LayBadge'] + LayBarcode: typeof import('@layui/layui-vue')['LayBarcode'] LayBody: typeof import('@layui/layui-vue')['LayBody'] LayBreadcrumb: typeof import('@layui/layui-vue')['LayBreadcrumb'] LayBreadcrumbItem: typeof import('@layui/layui-vue')['LayBreadcrumbItem'] @@ -39,6 +40,7 @@ declare module '@vue/runtime-core' { LayMenuItem: typeof import('@layui/layui-vue')['LayMenuItem'] LayPanel: typeof import('@layui/layui-vue')['LayPanel'] LayProgress: typeof import('@layui/layui-vue')['LayProgress'] + LayQrcode: typeof import('@layui/layui-vue')['LayQrcode'] LayResult: typeof import('@layui/layui-vue')['LayResult'] LayRow: typeof import('@layui/layui-vue')['LayRow'] LayScroll: typeof import('@layui/layui-vue')['LayScroll'] diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts index b3fba418..e13d05e7 100644 --- a/dashboard/vite.config.ts +++ b/dashboard/vite.config.ts @@ -11,7 +11,8 @@ export default defineConfig({ server:{ proxy:{ '/api': { - target: 'http://127.0.0.1:18083/api/', + // target: 'http://127.0.0.1:18083/api/', + target: 'http://82.157.162.230:8083/api/', changeOrigin: true, rewrite: path => path.replace(/^\/api/, '') } diff --git a/docker-compose.yml b/docker-compose.yml index 6e8aacf4..530ba050 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,6 @@ services: options: max-size: "100m" max-file: "1" - command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe + command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe # command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file diff --git a/pom.xml b/pom.xml index 55e604a7..8e5c86a0 100644 --- a/pom.xml +++ b/pom.xml @@ -4,12 +4,12 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.18 + 0.19 4.0.0 mqtt broker - 0.18 + 0.19 1.5.26 1.1.22 2.6 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index c5911efc..e0239b09 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -38,7 +38,6 @@ import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.socket.StateMachineEnum; -import org.smartboot.socket.extension.plugins.RateLimiterPlugin; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; import org.smartboot.socket.util.Attachment; @@ -74,7 +73,7 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor()); processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor()); - addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512)); +// addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512)); } public MqttBrokerMessageProcessor(BrokerContext mqttContext) { -- Gitee From 334add4e57fa9929a94f957f4a64e94150ab4b6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 22 Apr 2023 16:56:15 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E5=88=86=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/mqtt/common/InflightQueue.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index c1494fe1..8ad01f77 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -97,7 +97,7 @@ public class InflightQueue { /** * 超时重发 */ - void retry(InflightMessage inflightMessage) { + private void retry(InflightMessage inflightMessage) { if (inflightMessage.isCommit() || session.isDisconnect()) { return; } @@ -126,7 +126,6 @@ public class InflightQueue { MqttMessage mqttMessage = inflightMessage.getOriginalMessage(); mqttMessage.getFixedHeader().setDup(true); session.write(mqttMessage); - System.out.println("relPublish.."); break; case PUBCOMP: ReasonProperties properties = null; @@ -160,7 +159,7 @@ public class InflightQueue { case PUBACK: case PUBCOMP: { if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) { - System.out.println("maybe dup ack,ignore:" + message.getFixedHeader().getMessageType()); +// System.out.println("maybe dup ack,ignore:" + message.getFixedHeader().getMessageType()); break; } inflightMessage.setResponseMessage(message); @@ -171,7 +170,7 @@ public class InflightQueue { case PUBREC: //说明此前出现过重复publish,切已经收到过REC,并发送过REL消息 if (message.getFixedHeader().getMessageType() != inflightMessage.getExpectMessageType() || message.getVariableHeader().getPacketId() != inflightMessage.getAssignedPacketId()) { - System.out.println("maybe dup pubRec,ignore"); +// System.out.println("maybe dup pubRec,ignore"); break; } inflightMessage.setResponseMessage(message); @@ -185,10 +184,6 @@ public class InflightQueue { MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties); MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader); session.write(pubRelMessage, false); - if ((inflightMessage.getAssignedPacketId() - 1) % queue.length == takeIndex) { - Attachment attachment = session.session.getAttachment(); - attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(inflightMessage)); - } break; default: throw new RuntimeException(message.toString()); -- Gitee From 18d346eb308518424f29e1653372f32021459c08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 22 Apr 2023 16:57:46 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E8=BF=98=E5=8E=9F=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/broker/processor/ConnectProcessor.java | 2 +- .../org/smartboot/mqtt/client/MqttClient.java | 2 +- .../org/smartboot/mqtt/common/InflightQueue.java | 16 ++++------------ 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java index a0b3bf15..1710700d 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java @@ -83,7 +83,7 @@ public class ConnectProcessor implements MqttProcessor { } else { receiveMaximum = context.getBrokerConfigure().getMaxInflight(); } - session.setInflightQueue(new InflightQueue(session, receiveMaximum,false)); + session.setInflightQueue(new InflightQueue(session, receiveMaximum)); //如果服务端收到清理会话(CleanSession)标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0 之外, // 还必须将 CONNACK 报文中的当前会话设置(Session Present)标志为 0。 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index dab09140..a8688d27 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -164,7 +164,7 @@ public class MqttClient extends AbstractSession { //连接成功,注册订阅消息 if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { - setInflightQueue(new InflightQueue(this, 16, true)); + setInflightQueue(new InflightQueue(this, 16)); connected = true; consumeTask(); //重连情况下重新触发订阅逻辑 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 8ad01f77..992eac6e 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -49,13 +49,11 @@ public class InflightQueue { private final AbstractSession session; private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); - private final boolean client; - public InflightQueue(AbstractSession session, int size, boolean client) { + public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); this.queue = new InflightMessage[size]; this.session = session; - this.client = client; } public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { @@ -203,14 +201,10 @@ public class InflightQueue { if (takeIndex == queue.length) { takeIndex = 0; } - if (client) { - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); - } + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); while (count > 0 && queue[takeIndex].isCommit()) { inflightMessage = queue[takeIndex]; - if (client) { - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); - } + inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; @@ -224,9 +218,7 @@ public class InflightQueue { InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } - if (!client) { - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); - } + while (count < queue.length) { Runnable runnable = runnables.poll(); if (runnable != null) { -- Gitee From fe9642a53bf5884941f64f97d4ef41b0217be675 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 22 Apr 2023 17:52:58 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/client/MqttClientBootstrap.java | 18 ++++++++++-------- .../mqtt/common/util/MqttMessageBuilders.java | 3 --- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java index e29e0080..e7a7de8c 100644 --- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java +++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java @@ -31,20 +31,22 @@ public class MqttClientBootstrap { //订阅主题 client.subscribe("test", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> { System.out.println("subscribe message:" + new String(publishMessage.getPayload().getPayload())); + }, (mqttClient, mqttQoS) -> { + //最多分发一次 + client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); + //至少分发一次 + client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); + //只分发一次 + client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); }); client.subscribe("test/#", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> { System.out.println("subscribe test/# message:" + new String(publishMessage.getPayload().getPayload())); + }, (mqttClient, mqttQoS) -> { + //只分发一次 + client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); }); - //最多分发一次 - client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); - //至少分发一次 - client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); - //只分发一次 - client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); - //只分发一次 - client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId)); } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java index 7f7518ba..e88fd486 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java @@ -110,9 +110,6 @@ public final class MqttMessageBuilders { public MqttPublishMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained); - if (qos != MqttQoS.AT_LEAST_ONCE && qos != MqttQoS.EXACTLY_ONCE) { - packetId = -packetId; - } MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(packetId, topic, publishProperties); return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload); } -- Gitee From 2abc75687db29bed4b099f3ef4285cf720f1a954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 22 Apr 2023 18:50:23 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 530ba050..66659d7a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,5 +54,5 @@ services: max-size: "100m" max-file: "1" command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe -# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=1000 -Dqos=2 -Dcount=3 -Dpayload=128 org.smartboot.bench.mqtt.Publish +# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file -- Gitee