# boot-protocol **Repository Path**: liang-tian-yu/boot-protocol ## Basic Information - **Project Name**: boot-protocol - **Description**: springboot对接tcp,mqtt协议 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-06-27 - **Last Updated**: 2025-07-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## boot-protocol springboot对接协议 - mqtt协议 - tcp协议 具体实现看protocol文件夹 ## Mqtt ### 安装软件 [emqx服务器下载](https://docs.emqx.com/zh/emqx/v5.3/deploy/install-windows.html) [MQTTX客户端下载](https://mqttx.app/zh/downloads) emqx安装解压后,到bin目录文件夹下 ``` # 启动 emqx.cmd console # 关闭 emqx.cmd stop ``` - 使用 控制台访问地址:http://localhost:18083 API地址:http://localhost:18083/api-docs/index.html#/ 输入默认账号和默认密码,登录MQTT服务器后台管里界面,输入默认账号为`admin`,默认密码为`public` **MQTTX客户端** MQTT5.0客户端工具的语言设置为“简体中文” ### SpringBoot集成 [参考文档SpringBoot集成MQTT客户端](https://juejin.cn/post/7514319877285691455?from=search-suggest#heading-28) - 导入依赖 ``` org.springframework.integration spring-integration-mqtt ``` - application.yml配置 ``` # MQTT配置 mqtt: # MQTT Broker地址 broker-url: tcp://127.0.0.1:1883 # MQTT 客户端ID client-id: mqtt-client # MQTT 用户名 username: admin # MQTT 密码 password: public # MQTT默认主题 default-topic: testtopic ``` MqttConfig ``` package com.lty.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration public class MqttConfig { @Value("${mqtt.broker-url}") private String brokerUrl; @Value("${mqtt.client-id}") private String clientId; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.default-topic}") private String defaultTopic; @Autowired private MqttReceiver mqttReceiver; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { brokerUrl }); options.setUserName(username); options.setPassword(password.toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-out", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MqttPahoMessageDrivenChannelAdapter inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-in", mqttClientFactory(), defaultTopic); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return mqttReceiver; } } ``` - MqttReceiver ``` package com.lty.mqtt; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; @Component public class MqttReceiver implements MessageHandler { @Override public void handleMessage(Message message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String payload = message.getPayload().toString(); System.out.println("Received message from topic: " + topic + ", payload: " + payload); // TODO 这里可以添加你的业务逻辑 } } ``` - MqttSender ``` package com.lty.mqtt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Service; @Service public class MqttSender { @Autowired private MessageChannel mqttOutboundChannel; public void sendMessage(String topic, String payload) { Message message = MessageBuilder.withPayload(payload) .setHeader("mqtt_topic", topic) .build(); mqttOutboundChannel.send(message); } public void sendMessage(String payload) { sendMessage(null, payload); // 使用默认主题 } } ``` - dto ``` package com.lty.protocol.mqtt; import lombok.Data; /** * mqtt发送消息 */ @Data public class MqttSendTopic { /** * 主题名称 */ public String topic; /** * 消息内容 */ public String message; } ``` - MqttController ``` package com.lty.controller; import com.lty.protocol.mqtt.MqttSendTopic; import com.lty.protocol.mqtt.MqttSender; import io.swagger.annotations.ApiOperation; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class MqttController { @Resource private MqttSender mqttSender; @ApiOperation("发送消息到默认主题") @PostMapping("/send") public String sendMessage(@RequestBody MqttSendTopic mqttSendTopic) { mqttSender.sendMessage(mqttSendTopic.getMessage()); return "Message sent: " + mqttSendTopic.getMessage(); } @ApiOperation("发送到指定主题") @PostMapping("/sendTo") public String sendToTopic(@RequestBody MqttSendTopic mqttSendTopic) { if (mqttSendTopic.getTopic() == null) { return "No topics provided"; } mqttSender.sendMessage(mqttSendTopic.getTopic(), mqttSendTopic.getMessage()); return "Message sent to topic " + mqttSendTopic.getTopic() + ": " + mqttSendTopic.getMessage(); } } ``` ## Netty SpringBoot整合Netty实现tcp协议连接 测试工具:netassist网络调试助手 - 导入依赖 ``` io.netty netty-all 4.1.100.Final ``` ### 服务端 1. 客户端连接管理channelActive(ChannelHandlerContext ctx) 当客户端连接到服务器时触发。 初始化客户端连接信息(如 IP 地址、端口)。 获取对应的数据库连接。 向客户端发送初始化消息(byte 数组)。 在数据库中记录或更新客户端状态(新增或更新连接次数)。 2. channelInactive(ChannelHandlerContext ctx) 当客户端断开连接时触发。 清除内存中的客户端状态。 更新数据库中客户端的状态为“禁用”。 3. 处理收到的消息 ``` package com.lty.protocol.tcp; import com.lty.protocol.tcp.handler.ServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** * @ClassName TcpServer * @Description tcp服务端启动类 */ @Slf4j @Component public class TcpServer implements DisposableBean { private static final Integer port = 18888; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ChannelFuture future; @PostConstruct public void init() { log.info("正在启动tcp服务器……"); Map map = new HashMap<>(); bossGroup = new NioEventLoopGroup();// 主线程组 workerGroup = new NioEventLoopGroup();// 工作线程组 try { ServerBootstrap bootstrap = new ServerBootstrap();// 引导对象 bootstrap.group(bossGroup, workerGroup);// 配置工作线程组 bootstrap.channel(NioServerSocketChannel.class);// 配置为NIO的socket通道 bootstrap.childHandler(new ChannelInitializer() { protected void initChannel(SocketChannel ch) throws Exception {// 绑定通道参数 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ServerHandler()); // 添加自定义的处理器 } }); bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 缓冲区 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);// ChannelOption对象设置TCP套接字的参数,非必须步骤 future = bootstrap.bind(port).sync();// 使用了Future来启动线程,并绑定了端口 log.info("启动tcp服务器启动成功,正在监听端口:" + port); /*future.channel().closeFuture().sync();//以异步的方式关闭端口*/ } catch (Exception e) { log.error("tcp协议端口打开异常,请联系管理员!" + e.getMessage()); // 如果启动失败,应该关闭EventLoopGroup,避免资源泄露 shutdown(); } } @Override public void destroy() throws Exception { log.info("正在关闭tcp服务器……"); shutdown(); } /* public static void main(String[] args) { new TcpServer(8777).init(); }*/ private void shutdown() { if (future != null && !future.isDone()) { future.channel().closeFuture().syncUninterruptibly(); } if (workerGroup != null) { workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS); } if (bossGroup != null) { bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS); } } } ``` ``` package com.lty.protocol.tcp.handler; import com.lty.protocol.tcp.dto.RemoteClient; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.net.InetSocketAddress; import java.sql.Connection; import java.sql.ResultSet; import java.text.MessageFormat; import java.text.ParseException; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; /** * @ClassName ServerHandler * @Description 服务处理器 */ @Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { // 存储远程客户端信息(key: channelId, value: RemoteClient) private Map remoteClients = new HashMap<>(); // 默认客户回码 private static String commonCode = "client"; private Connection connection = null; private ResultSet resultSet = null; /** * 客户端连接处理 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String remoteAddress = ctx.channel().remoteAddress().toString().replace("/", "").split(":")[0]; String channelId = ctx.channel().id().toString(); RemoteClient remoteClient = new RemoteClient(); remoteClient.setAddr(remoteAddress); remoteClient.setLastRecordTime(LocalDateTime.now().toString()); remoteClient.setLastData("0"); remoteClients.put(channelId, remoteClient); // 客户端连接时触发,可以记录连接信息 InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String clientAdd = socketAddress.getAddress().getHostAddress(); Integer port = socketAddress.getPort(); // 初始化客户端 initByte(ctx.channel()); // 发送回码给客户端 // sendResponseByte(ctx.channel()); try { // TODO 数据库中记录或更新客户端状态 } catch (Exception e) { log.error("初始化客户端信息失败: {}", e.getMessage()); } super.channelActive(ctx); } /** * 处理接收到的消息 * * @param ctx 上下文 * @param msg 接收到的消息 * @throws Exception 异常 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel(); String channelId = channel.id().toString(); String remoteAddress = channel.remoteAddress().toString().replace("/", "").split(":")[0]; String logOut = MessageFormat.format("channelId:{0}>>>>>>>>ip:{1}>>>>>>>>>msg:{2}", channelId, remoteAddress, msg); log.info(logOut); handleReadData(ctx, msg); // 处理解析数据 sendResponseByte(ctx.channel()); // 发送回码给客户端 // 如果tcpServer后续还有待处理的Handler则需要传递消息 // super.channelRead(ctx, msg); } /** * 处理解析数据 * * @param ctx 上下文 * @param msg 消息 * @throws ParseException 解析异常 */ private void handleReadData(ChannelHandlerContext ctx, Object msg) throws ParseException { Channel channel = ctx.channel(); String channelId = channel.id().toString(); String remoteAddress = channel.remoteAddress().toString().replace("/", "").split(":")[0]; String payload = ""; if (msg instanceof String) { payload = (String) msg; } // 解析数据 if (StringUtils.isNotBlank(payload)) { // TODO 解析数据方法 System.out.println("解析数据: " + payload); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String channelId = channel.id().toString(); remoteClients.remove(channelId); String clientInfoId = ""; // TODO 更新客户端状态为不活跃 // if (connection != null) { // clientUpdateSqlInActive = clientUpdateSqlInActive + "'" + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress() + "';"; // JdbcUtil.executeUpdate(clientUpdateSqlInActive, connection); // } super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 记录异常日志 log.error("发生异常: {}", cause.getMessage(), cause); // 关闭连接 ctx.close(); } /** * 发送响应回码给客户端 */ private void sendResponseByte(Channel ctx) { byte[] responseBytes = commonCode.getBytes(); ByteBuf response = Unpooled.wrappedBuffer(responseBytes); // 发送回码给客户端 ctx.writeAndFlush(response); } /** * 初始化客户端信息 */ private void initByte(Channel ctx) { byte[] initByte = commonCode.getBytes(); log.info("初始化客户端信息:" + commonCode); ByteBuf responseInit = Unpooled.wrappedBuffer(initByte); // 发送回码给客户端 ctx.writeAndFlush(responseInit); } } ``` ``` package com.lty.protocol.tcp.dto; import lombok.*; import java.io.Serializable; /** * @ClassName RemoteClient * @Description 远程客户端信息 */ @Data @NoArgsConstructor @AllArgsConstructor public class RemoteClient implements Serializable { String addr = ""; String lineId = ""; String lastRecordTime = ""; String lastData = ""; private static final long serialVersionUID = 5802160595706522337L; } ```