# jmqx

**Repository Path**: istyle/jmqx

## Basic Information

- **Project Name**: jmqx
- **Description**: Another Open Source MQTT Broker (high-performance, scalable, cluster-supported based on reactor-netty)
- **Primary Language**: Java
- **License**: MIT
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-12-09
- **Last Updated**: 2025-12-09

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Jmqx

An open-source, lightweight MQTT message broker.

Jmqx is a refactored version of [SMQTT 1.x](https://github.com/quickmsg/smqtt). The refactoring focuses on the MQTT implementation and fixes a number of issues. SMQTT is a complete standalone application, whereas Jmqx is meant to be embedded in a user application as a library, giving that application MQTT device access so that users can build their own IoT platform. Many thanks to the SMQTT author for open-sourcing such an excellent project.

## Similarities with SMQTT

1. Supports the MQTT v3.1, v3.1.1, and v5 protocols (the v5 implementation is incomplete)
2. Supports clustering

## Differences from SMQTT

1. No message persistence (only an in-memory implementation is provided; if you need persistence, refer to the SMQTT or SMQTTX implementations)
2. No rule engine
3. No configuration file loading (see jmqx-example for how to inject configuration)

## Improvements

1. Can listen on MQTT, MQTTS, MQTT-WS, and MQTT-WSS ports at the same time, so devices with different requirements can connect
2. Supports constructor injection of user-defined device authentication, topic access control, and device lifecycle listener modules, which makes integration with frameworks such as Spring Boot straightforward
3. Easy to embed in a user application as a library (see jmqx-example)
4. Fixes problems with subscription message QoS, incorrect Retained state, retained message redelivery, and out-of-order messages

## Usage Examples

- Start from a unit test: single node

Add the dependency

```xml
<dependency>
    <groupId>plus.jmqx.iot</groupId>
    <artifactId>jmqx-broker</artifactId>
    <version>1.4.1</version>
</dependency>
```

Write the test case

```java
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
// Imports for the jmqx types (Bootstrap, MqttConfiguration, PlatformDispatcher
// and the message classes) are omitted here.

/**
 * MQTT Broker test case
 */
@Slf4j
class BootstrapTest {

    @Test
    void brokerTest() throws Exception {
        // Logging configuration (optional)
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        loggerContext.getLogger("root").setLevel(Level.INFO);
        loggerContext.getLogger("reactor.netty").setLevel(Level.INFO);
        loggerContext.getLogger("plus.jmqx.broker").setLevel(Level.INFO);
        loggerContext.getLogger("plus.jmqx.broker.mqtt.message.impl").setLevel(Level.DEBUG);
        // Build the configuration
        MqttConfiguration config = new MqttConfiguration();
        // Enable SSL (optional)
        config.setSslEnable(true);
        config.setSslCa(Objects.requireNonNull(BootstrapTest.class.getResource("/ca.crt")).getPath());
        config.setSslCrt(Objects.requireNonNull(BootstrapTest.class.getResource("/server.crt")).getPath());
        config.setSslKey(Objects.requireNonNull(BootstrapTest.class.getResource("/server.key")).getPath());
        // Inject the configuration (required) and the device lifecycle subscriber (optional) through the constructor
        Bootstrap bootstrap = new Bootstrap(config, new PlatformDispatcher() {
            @Override
            public Mono<Void> onConnect(ConnectMessage message) {
                return Mono.fromRunnable(() -> log.info("Device online: {}", message));
            }

            @Override
            public Mono<Void> onDisconnect(DisconnectMessage message) {
                return Mono.fromRunnable(() -> log.info("Device disconnected: {}", message));
            }

            @Override
            public Mono<Void> onConnectionLost(ConnectionLostMessage message) {
                return Mono.fromRunnable(() -> log.info("Device offline: {}", message));
            }

            @Override
            public Mono<Void> onPublish(PublishMessage message) {
                return Mono.fromRunnable(() -> log.info(
                        "Device published message: PublishMessage(clientId={}, username={}, topic={}, payload={})",
                        message.getClientId(), message.getUsername(), message.getTopic(),
                        new String(message.getPayload(), StandardCharsets.UTF_8)));
            }
        });
        bootstrap.start().block();
        // Keep the broker running for one hour, then shut it down
        Thread.sleep(3600 * 1000);
        bootstrap.shutdown();
    }
}
```

Console output when the test case starts

```shell
14:13:53.734 [jmqx-event-loop-select-nio-2] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt broker start success host 0:0:0:0:0:0:0:0 port 1883
14:13:53.901 [jmqx-event-loop-select-nio-3] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtts broker start success host 0:0:0:0:0:0:0:0 port 8883
14:13:53.902 [jmqx-event-loop-select-nio-4] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt-ws broker start success host 0:0:0:0:0:0:0:0 port 1884
14:13:53.906 [jmqx-event-loop-select-nio-5] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt-wss broker start success host 0:0:0:0:0:0:0:0 port 8884
```

- Start from a unit test: local cluster

Add the dependency

```xml
<dependency>
    <groupId>plus.jmqx.iot</groupId>
    <artifactId>jmqx-cluster</artifactId>
    <version>1.4.1</version>
</dependency>
```

Write the test cases

```java
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
// Imports for the jmqx types (Bootstrap, MqttConfiguration) are omitted here.

/**
 * Cluster test case
 */
@Slf4j
public class BootstrapTest {

    @Test
    void cluster01() throws Exception {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        loggerContext.getLogger("root").setLevel(Level.INFO);
        loggerContext.getLogger("plus.jmqx.broker").setLevel(Level.INFO);
        loggerContext.getLogger("reactor.netty").setLevel(Level.INFO);
        // Node 1 keeps the default MQTT ports and listens for cluster traffic on 7771
        MqttConfiguration config = new MqttConfiguration();
        config.getClusterConfig().setEnable(true);
        config.getClusterConfig().setUrl("127.0.0.1:7771,127.0.0.1:7772");
        config.getClusterConfig().setPort(7771);
        config.getClusterConfig().setNode("node-1");
        config.getClusterConfig().setNamespace("jmqx");
        Bootstrap bootstrap = new Bootstrap(config);
        bootstrap.start().block();
        Thread.sleep(3600 * 1000);
        bootstrap.shutdown();
    }

    @Test
    void cluster02() throws Exception {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        loggerContext.getLogger("root").setLevel(Level.INFO);
        loggerContext.getLogger("plus.jmqx.broker").setLevel(Level.INFO);
        loggerContext.getLogger("reactor.netty").setLevel(Level.INFO);
        // Node 2 runs on the same machine, so it needs its own MQTT ports and cluster port
        MqttConfiguration config = new MqttConfiguration();
        config.setPort(2883);
        config.setSecurePort(9883);
        config.setWebsocketPort(2884);
        config.setWebsocketSecurePort(9884);
        config.getClusterConfig().setEnable(true);
        config.getClusterConfig().setUrl("127.0.0.1:7771,127.0.0.1:7772");
        config.getClusterConfig().setPort(7772);
        config.getClusterConfig().setNode("node-2");
        config.getClusterConfig().setNamespace("jmqx");
        Bootstrap bootstrap = new Bootstrap(config);
        bootstrap.start().block();
        Thread.sleep(3600 * 1000);
        bootstrap.shutdown();
    }
}
```

Console output when the test cases start (node-1 first, then node-2)

```shell
14:21:52.411 [main] INFO io.scalecube.cluster.Cluster - [null][doStart] Starting, config: ClusterConfig[metadata=null, metadataTimeout=3000, metadataCodec=io.scalecube.cluster.metadata.JdkMetadataCodec@57ad2aa7, memberId='null', memberAlias='node-1', externalHost='null', externalPort=null, transportConfig=TransportConfig[port=7771, clientSecured=false, connectTimeout=3000, messageCodec=plus.jmqx.broker.cluster.JacksonMessageCodec@5b3f61ff, maxFrameLength=2097152, transportFactory=io.scalecube.transport.netty.tcp.TcpTransportFactory@3e2059ae, addressMapper=java.util.function.Function$$Lambda$1/1560911714@398dada8], failureDetectorConfig=FailureDetectorConfig[pingInterval=1000, pingTimeout=500, pingReqMembers=3], gossipConfig=GossipConfig[gossipFanout=3, gossipInterval=200, gossipRepeatMult=3, gossipSegmentationThreshold=1000], membershipConfig=MembershipConfig[seedMembers=[127.0.0.1:7771, 127.0.0.1:7772], syncInterval=30000, syncTimeout=3000, suspicionMult=5, namespace='jmqx', removedMembersHistorySize=42]]
14:21:52.518 [sc-cluster-io-nio-1] INFO io.scalecube.cluster.transport.api.Transport - [start][/0:0:0:0:0:0:0:0:7771] Bound cluster transport
14:21:52.536 [sc-cluster-io-nio-1] WARN io.scalecube.cluster.membership.MembershipProtocol - [jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771] Filtering out seed address: 127.0.0.1:7771
14:21:52.540 [sc-cluster-io-nio-1] INFO io.scalecube.cluster.membership.MembershipProtocol - [jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771] Making initial Sync to all seed members: [127.0.0.1:7772]
14:21:52.619 [sc-cluster-io-nio-1] WARN io.scalecube.cluster.transport.api.Transport - [127.0.0.1:7771][connect][error] remoteAddress: 127.0.0.1:7772, cause: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:7772
14:21:52.619 [sc-cluster-io-nio-1] WARN io.scalecube.cluster.membership.MembershipProtocol - [jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771] Exception on initial Sync, cause: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:7772
14:21:52.620 [sc-cluster-7771-1] INFO io.scalecube.cluster.Cluster - [jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771][doStart] Started
14:21:52.625 [jmqx-event-loop-select-nio-2] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt broker start success host 0:0:0:0:0:0:0:0 port 1883
14:21:52.626 [jmqx-event-loop-select-nio-3] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt-ws broker start success host 0:0:0:0:0:0:0:0 port 1884
```

```shell
14:22:39.104 [main] INFO io.scalecube.cluster.Cluster - [null][doStart] Starting, config: ClusterConfig[metadata=null, metadataTimeout=3000, metadataCodec=io.scalecube.cluster.metadata.JdkMetadataCodec@57ad2aa7, memberId='null', memberAlias='node-2', externalHost='null', externalPort=null, transportConfig=TransportConfig[port=7772, clientSecured=false, connectTimeout=3000, messageCodec=plus.jmqx.broker.cluster.JacksonMessageCodec@5b3f61ff, maxFrameLength=2097152, transportFactory=io.scalecube.transport.netty.tcp.TcpTransportFactory@3e2059ae, addressMapper=java.util.function.Function$$Lambda$1/1560911714@398dada8], failureDetectorConfig=FailureDetectorConfig[pingInterval=1000, pingTimeout=500, pingReqMembers=3], gossipConfig=GossipConfig[gossipFanout=3, gossipInterval=200, gossipRepeatMult=3, gossipSegmentationThreshold=1000], membershipConfig=MembershipConfig[seedMembers=[127.0.0.1:7771, 127.0.0.1:7772], syncInterval=30000, syncTimeout=3000, suspicionMult=5, namespace='jmqx', removedMembersHistorySize=42]]
14:22:39.212 [sc-cluster-io-nio-1] INFO io.scalecube.cluster.transport.api.Transport - [start][/0:0:0:0:0:0:0:0:7772] Bound cluster transport
14:22:39.228 [sc-cluster-io-nio-1] WARN io.scalecube.cluster.membership.MembershipProtocol - [jmqx:node-2:259b35b190434e83@127.0.0.1:7772] Filtering out seed address: 127.0.0.1:7772
14:22:39.232 [sc-cluster-io-nio-1] INFO io.scalecube.cluster.membership.MembershipProtocol - [jmqx:node-2:259b35b190434e83@127.0.0.1:7772] Making initial Sync to all seed members: [127.0.0.1:7771]
14:22:39.511 [sc-cluster-7772-1] INFO io.scalecube.cluster.membership.MembershipProtocol - [jmqx:node-2:259b35b190434e83@127.0.0.1:7772][publishEvent] MembershipEvent[type=ADDED, member=jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771, oldMetadata=null, newMetadata=7e16449-5, timestamp=2025-04-23T06:22:39.510Z]
14:22:39.514 [sc-cluster-7772-1] INFO plus.jmqx.broker.cluster.ScubeClusterRegistry - cluster onMembershipEvent jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771 MembershipEvent[type=ADDED, member=jmqx:node-1:c2bb3af63614e2a@127.0.0.1:7771, oldMetadata=null, newMetadata=7e16449-5, timestamp=2025-04-23T06:22:39.510Z]
14:22:39.515 [sc-cluster-7772-1] INFO io.scalecube.cluster.Cluster - [jmqx:node-2:259b35b190434e83@127.0.0.1:7772][doStart] Started
14:22:39.518 [jmqx-event-loop-select-nio-2] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt broker start success host 0:0:0:0:0:0:0:0 port 2883
14:22:39.519 [jmqx-event-loop-select-nio-3] INFO plus.jmqx.broker.mqtt.transport.impl.MqttTransport - mqtt-ws broker start success host 0:0:0:0:0:0:0:0 port 2884
```

## Generating Certificates

> This example uses [generate-CA.sh](https://github.com/owntracks/tools/blob/master/TLS/generate-CA.sh) to generate self-signed certificates for mutual (two-way) TLS authentication.

Download the certificate generation script from https://github.com/owntracks/tools/blob/master/TLS/generate-CA.sh

```shell
# Generate the server certificate
./generate-CA.sh server
# Generate the client certificate
./generate-CA.sh client client
```
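
Before pointing `setSslCa`, `setSslCrt`, and `setSslKey` at the generated files, it can help to confirm that each file contains what its name suggests. The sketch below is a hypothetical helper and not part of jmqx: the class name `PemPreflight` and its methods are invented for illustration. It assumes the script writes PEM-encoded `ca.crt`, `server.crt`, and `server.key` files (the names used in the single-node test case). It only looks at PEM markers and does not validate the certificate chain or the key material.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of jmqx): checks PEM markers before SSL is enabled. */
class PemPreflight {

    // Matches a PEM header such as "-----BEGIN CERTIFICATE-----" and captures its label.
    private static final Pattern BEGIN = Pattern.compile("-----BEGIN ([A-Z0-9 ]+)-----");

    /** Returns the labels of all PEM blocks that also have a matching END line, in order. */
    static List<String> blockTypes(String pem) {
        List<String> types = new ArrayList<>();
        Matcher m = BEGIN.matcher(pem);
        while (m.find()) {
            String label = m.group(1);
            // Count the block only if its END marker appears after the BEGIN marker.
            if (pem.indexOf("-----END " + label + "-----", m.end()) >= 0) {
                types.add(label);
            }
        }
        return types;
    }

    /** True if the text contains at least one complete CERTIFICATE block. */
    static boolean looksLikeCertificate(String pem) {
        return blockTypes(pem).contains("CERTIFICATE");
    }

    /** True if the text contains a complete private key block (RSA, EC, PKCS#8, ...). */
    static boolean looksLikePrivateKey(String pem) {
        return blockTypes(pem).stream().anyMatch(t -> t.endsWith("PRIVATE KEY"));
    }

    /** Reads a file as text; ISO-8859-1 maps every byte to a char, so decoding cannot fail. */
    static String read(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java PemPreflight <dir containing ca.crt, server.crt, server.key>");
            return;
        }
        Path dir = Paths.get(args[0]);
        // File names are an assumption taken from the test resources in the example above.
        System.out.println("ca.crt     certificate: " + looksLikeCertificate(read(dir.resolve("ca.crt"))));
        System.out.println("server.crt certificate: " + looksLikeCertificate(read(dir.resolve("server.crt"))));
        System.out.println("server.key private key: " + looksLikePrivateKey(read(dir.resolve("server.key"))));
    }
}
```

Run it against the directory where the script wrote its output, for example `java PemPreflight.java .` on Java 11 or later. A `false` on any line suggests that the file is missing its PEM markers or was truncated, which is worth ruling out before debugging TLS handshake failures on the broker side.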