# boot-protocol
**Repository Path**: liang-tian-yu/boot-protocol
## Basic Information
- **Project Name**: boot-protocol
- **Description**: springboot对接tcp,mqtt协议
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-06-27
- **Last Updated**: 2025-07-22
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## boot-protocol
springboot对接协议
- mqtt协议
- tcp协议
具体实现看protocol文件夹
## Mqtt
### 安装软件
[emqx服务器下载](https://docs.emqx.com/zh/emqx/v5.3/deploy/install-windows.html)
[MQTTX客户端下载](https://mqttx.app/zh/downloads)
emqx安装包解压后,进入bin目录,执行以下命令:
```
# 启动
emqx.cmd console
# 关闭
emqx.cmd stop
```
- 使用
控制台访问地址:http://localhost:18083
API地址:http://localhost:18083/api-docs/index.html#/
输入默认账号和默认密码,登录MQTT服务器后台管理界面。默认账号为`admin`,默认密码为`public`
**MQTTX客户端**
MQTT5.0客户端工具的语言设置为“简体中文”
### SpringBoot集成
[参考文档SpringBoot集成MQTT客户端](https://juejin.cn/post/7514319877285691455?from=search-suggest#heading-28)
- 导入依赖
```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
```
- application.yml配置
```
# MQTT配置
mqtt:
  # MQTT Broker地址
  broker-url: tcp://127.0.0.1:1883
  # MQTT 客户端ID
  client-id: mqtt-client
  # MQTT 用户名
  username: admin
  # MQTT 密码
  password: public
  # MQTT默认主题
  default-topic: testtopic
```
- MqttConfig
```
package com.lty.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring Integration wiring for the MQTT client.
 *
 * <p>Declares the Paho client factory, one outbound channel/handler pair for publishing
 * and one inbound adapter/channel/handler chain for receiving. Connection settings are
 * read from the {@code mqtt.*} properties.
 */
@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    @Autowired
    private MqttReceiver mqttReceiver;

    /**
     * Builds the Paho client factory from the configured broker URL and credentials.
     *
     * @return the factory shared by the outbound handler and the inbound adapter
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[] { brokerUrl });
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());

        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }

    /**
     * Channel that application code sends to in order to publish a message.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes everything arriving on {@code mqttOutboundChannel}.
     * It uses the client id suffixed with {@code -out}, is switched to async mode and
     * is given the configured default topic.
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        String publisherClientId = clientId + "-out";
        MqttPahoMessageHandler publisher =
                new MqttPahoMessageHandler(publisherClientId, mqttClientFactory());
        publisher.setDefaultTopic(defaultTopic);
        publisher.setAsync(true);
        return publisher;
    }

    /**
     * Channel onto which the inbound adapter places received messages.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter subscribed to the default topic. It uses the client id suffixed
     * with {@code -in}, QoS 1, a completion timeout of 5000 and the default Paho
     * message converter, and forwards to {@link #mqttInputChannel()}.
     *
     * @return the message-driven inbound adapter
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String subscriberClientId = clientId + "-in";
        MqttPahoMessageDrivenChannelAdapter subscriber =
                new MqttPahoMessageDrivenChannelAdapter(
                        subscriberClientId, mqttClientFactory(), defaultTopic);
        subscriber.setCompletionTimeout(5000);
        subscriber.setConverter(new DefaultPahoMessageConverter());
        subscriber.setQos(1);
        subscriber.setOutputChannel(mqttInputChannel());
        return subscriber;
    }

    /**
     * Routes everything arriving on {@code mqttInputChannel} to the injected receiver.
     *
     * @return the injected {@link MqttReceiver}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return mqttReceiver;
    }
}
```
- MqttReceiver
```
package com.lty.mqtt;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
public class MqttReceiver implements MessageHandler {
@Override
public void handleMessage(Message> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
System.out.println("Received message from topic: " + topic + ", payload: " + payload);
// TODO 这里可以添加你的业务逻辑
}
}
```
- MqttSender
```
package com.lty.mqtt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
/**
 * Publishes string payloads by sending messages to the {@code mqttOutboundChannel}.
 */
@Service
public class MqttSender {

    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Sends a payload, optionally addressed to a specific topic.
     *
     * @param topic   target topic; when {@code null} no {@code mqtt_topic} header is set,
     *                leaving topic selection to the outbound handler (presumably its
     *                configured default topic — confirm against the handler setup)
     * @param payload message content
     */
    public void sendMessage(String topic, String payload) {
        MessageBuilder<String> builder = MessageBuilder.withPayload(payload);
        if (topic != null) {
            // Only set the header when a topic was given instead of relying on
            // setHeader(name, null) removing it.
            builder.setHeader("mqtt_topic", topic);
        }
        Message<String> message = builder.build();
        mqttOutboundChannel.send(message);
    }

    /**
     * Sends a payload without naming a topic.
     *
     * @param payload message content
     */
    public void sendMessage(String payload) {
        sendMessage(null, payload);
    }
}
```
- dto
```
package com.lty.protocol.mqtt;
import lombok.Data;
/**
 * Request body for sending an MQTT message: an optional topic plus the message text.
 * Lombok's {@code @Data} generates the accessors.
 */
@Data
public class MqttSendTopic {
/**
 * Topic name.
 */
public String topic;
/**
 * Message content.
 */
public String message;
}
```
- MqttController
```
package com.lty.controller;
import com.lty.protocol.mqtt.MqttSendTopic;
import com.lty.protocol.mqtt.MqttSender;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST endpoints that forward request bodies to {@link MqttSender}.
 */
@RestController
public class MqttController {

    @Resource
    private MqttSender mqttSender;

    /**
     * Sends the message text of the request without naming a topic.
     *
     * @param mqttSendTopic request body; only its message is used
     * @return a confirmation string echoing the message
     */
    @ApiOperation("发送消息到默认主题")
    @PostMapping("/send")
    public String sendMessage(@RequestBody MqttSendTopic mqttSendTopic) {
        String content = mqttSendTopic.getMessage();
        mqttSender.sendMessage(content);
        return "Message sent: " + content;
    }

    /**
     * Sends the message text of the request to the topic named in the request.
     *
     * @param mqttSendTopic request body carrying topic and message
     * @return a confirmation string, or {@code "No topics provided"} when the topic is null
     */
    @ApiOperation("发送到指定主题")
    @PostMapping("/sendTo")
    public String sendToTopic(@RequestBody MqttSendTopic mqttSendTopic) {
        String topic = mqttSendTopic.getTopic();
        if (topic != null) {
            String content = mqttSendTopic.getMessage();
            mqttSender.sendMessage(topic, content);
            return "Message sent to topic " + topic + ": " + content;
        }
        return "No topics provided";
    }
}
```
## Netty
SpringBoot整合Netty实现tcp协议连接
测试工具:netassist网络调试助手
- 导入依赖
```
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.100.Final</version>
</dependency>
```
### 服务端
1. 客户端连接管理channelActive(ChannelHandlerContext ctx)
当客户端连接到服务器时触发。
初始化客户端连接信息(如 IP 地址、端口)。
获取对应的数据库连接。
向客户端发送初始化消息(byte 数组)。
在数据库中记录或更新客户端状态(新增或更新连接次数)。
2. channelInactive(ChannelHandlerContext ctx)
当客户端断开连接时触发。
清除内存中的客户端状态。
更新数据库中客户端的状态为“禁用”。
3. 处理收到的消息
```
package com.lty.protocol.tcp;
import com.lty.protocol.tcp.handler.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @ClassName TcpServer
* @Description tcp服务端启动类
*/
@Slf4j
@Component
public class TcpServer implements DisposableBean {
private static final Integer port = 18888;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelFuture future;
@PostConstruct
public void init() {
log.info("正在启动tcp服务器……");
Map map = new HashMap<>();
bossGroup = new NioEventLoopGroup();// 主线程组
workerGroup = new NioEventLoopGroup();// 工作线程组
try {
ServerBootstrap bootstrap = new ServerBootstrap();// 引导对象
bootstrap.group(bossGroup, workerGroup);// 配置工作线程组
bootstrap.channel(NioServerSocketChannel.class);// 配置为NIO的socket通道
bootstrap.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel ch) throws Exception {// 绑定通道参数
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ServerHandler()); // 添加自定义的处理器
}
});
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 缓冲区
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);// ChannelOption对象设置TCP套接字的参数,非必须步骤
future = bootstrap.bind(port).sync();// 使用了Future来启动线程,并绑定了端口
log.info("启动tcp服务器启动成功,正在监听端口:" + port);
/*future.channel().closeFuture().sync();//以异步的方式关闭端口*/
} catch (Exception e) {
log.error("tcp协议端口打开异常,请联系管理员!" + e.getMessage());
// 如果启动失败,应该关闭EventLoopGroup,避免资源泄露
shutdown();
}
}
@Override
public void destroy() throws Exception {
log.info("正在关闭tcp服务器……");
shutdown();
}
/* public static void main(String[] args) {
new TcpServer(8777).init();
}*/
private void shutdown() {
if (future != null && !future.isDone()) {
future.channel().closeFuture().syncUninterruptibly();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);
}
if (bossGroup != null) {
bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);
}
}
}
```
```
package com.lty.protocol.tcp.handler;
import com.lty.protocol.tcp.dto.RemoteClient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.ResultSet;
import java.text.MessageFormat;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
 * Inbound handler for the TCP server: records connecting clients, logs and processes
 * received messages, and writes a fixed reply code back to the client.
 */
@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Reply code written to the client on connect and after every received message. */
    private static final String COMMON_CODE = "client";

    /**
     * Remote client info keyed by channel id.
     * NOTE(review): this is an instance field; if a new handler is created for every
     * channel, each map holds at most one entry — confirm whether a shared registry
     * was intended.
     */
    private final Map<String, RemoteClient> remoteClients = new HashMap<>();

    /**
     * Registers the newly connected client and sends it the initial reply code.
     *
     * @param ctx channel handler context of the new connection
     * @throws Exception propagated from the superclass call
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        RemoteClient remoteClient = new RemoteClient();
        remoteClient.setAddr(remoteIp(channel));
        remoteClient.setLastRecordTime(LocalDateTime.now().toString());
        remoteClient.setLastData("0");
        remoteClients.put(channel.id().toString(), remoteClient);

        initByte(channel);

        // TODO record or update the client state in the database

        super.channelActive(ctx);
    }

    /**
     * Logs the received message, hands it to {@link #handleReadData}, and replies with
     * the reply code. The message is not forwarded to later handlers.
     *
     * @param ctx channel handler context
     * @param msg received message
     * @throws Exception if processing the message fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        log.info("channelId:{}>>>>>>>>ip:{}>>>>>>>>>msg:{}",
                channel.id().toString(), remoteIp(channel), msg);
        handleReadData(ctx, msg);
        sendResponseByte(channel);
        // Call super.channelRead(ctx, msg) here if later handlers must see the message.
    }

    /**
     * Processes a received message. Only non-blank {@code String} messages are handled;
     * anything else is ignored.
     *
     * @param ctx channel handler context (currently unused; kept for the parsing TODO)
     * @param msg received message
     * @throws ParseException reserved for the parsing logic still to be implemented
     */
    private void handleReadData(ChannelHandlerContext ctx, Object msg) throws ParseException {
        String payload = (msg instanceof String) ? (String) msg : "";
        if (StringUtils.isNotBlank(payload)) {
            // TODO parse the payload
            log.info("解析数据: {}", payload);
        }
    }

    /**
     * Forgets the disconnected client.
     *
     * @param ctx channel handler context of the closed connection
     * @throws Exception propagated from the superclass call
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        remoteClients.remove(ctx.channel().id().toString());
        // TODO mark the client as inactive in the database; use a parameterized
        // statement rather than concatenating the address into the SQL text.
        super.channelInactive(ctx);
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   channel handler context
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("发生异常: {}", cause.getMessage(), cause);
        ctx.close();
    }

    /**
     * Writes the reply code to the client.
     *
     * @param channel channel to write to
     */
    private void sendResponseByte(Channel channel) {
        writeCommonCode(channel);
    }

    /**
     * Logs and writes the initial reply code to a newly connected client.
     *
     * @param channel channel to write to
     */
    private void initByte(Channel channel) {
        log.info("初始化客户端信息:{}", COMMON_CODE);
        writeCommonCode(channel);
    }

    /**
     * Encodes the reply code as UTF-8 and writes it to the channel.
     *
     * @param channel channel to write to
     */
    private void writeCommonCode(Channel channel) {
        // Explicit charset instead of the platform default.
        byte[] bytes = COMMON_CODE.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
        channel.writeAndFlush(buffer);
    }

    /**
     * Returns the remote IP of a channel without parsing the address's string form,
     * so IPv6 addresses and "host/ip" renderings are handled.
     *
     * @param channel channel whose remote address is read
     * @return the host address for an {@link InetSocketAddress} (or its host string if
     *         unresolved); otherwise the string form of whatever address is present
     */
    private static String remoteIp(Channel channel) {
        Object remote = channel.remoteAddress();
        if (remote instanceof InetSocketAddress) {
            InetSocketAddress inet = (InetSocketAddress) remote;
            return inet.getAddress() != null
                    ? inet.getAddress().getHostAddress()
                    : inet.getHostString();
        }
        return String.valueOf(remote);
    }
}
```
```
package com.lty.protocol.tcp.dto;
import lombok.*;
import java.io.Serializable;
/**
 * Information kept about a remote TCP client.
 * Lombok generates accessors plus the no-args and all-args constructors.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RemoteClient implements Serializable {

    private static final long serialVersionUID = 5802160595706522337L;

    /** Remote address of the client. */
    String addr = "";

    /** Line identifier. NOTE(review): not set anywhere in the code shown — confirm meaning. */
    String lineId = "";

    /** Time of the last record, stored as text. */
    String lastRecordTime = "";

    /** Last data associated with the client, stored as text. */
    String lastData = "";
}
```