Watch 5 Star 31 Fork 17

RockYang / tio-starterJavaApache-2.0

Create your Gitee Account
Explore and code with more than 5 million developers,Free private repositories !:)
Sign up
starter components for Tio spread retract

Clone or download
Loading...
README.md

Tio-Starter

Tio 通用 TCP 服务的 spring-boot-starter 实现

项目地址

快速开始

首先,你需要在 pom.xml 中引入 tio-core-spring-boot-starter 构件

<dependency>
	<groupId>org.t-io</groupId>
	<artifactId>tio-core-spring-boot-starter</artifactId>
    <version>1.2.2</version>
</dependency>

编写服务端程序

一、给SpringBoot Application 主类添加 @EnableTioServerServer 注解

@SpringBootApplication
@EnableTioServerServer
public class TioServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TioServerApplication.class, args);
    }
}

二、接下来,修改配置文件

tio:
  core:
    server:
      # websocket port default 9876
      port: 6789
      # 心跳时间
      heartbeat-timeout: 60000
      # 监控时段,多个之间用逗号隔开
      ip-stat-durations: [60]
      # 集群配置 默认关闭
    cluster:
      enabled: false
      # 集群是通过redis的Pub/Sub实现,所以需要配置Redis
      redis:
        ip: 127.0.0.1
        port: 6379
      all: true
      group: true
      ip: true
      user: true
      # SSL 配置
      ssl:
        enabled: false
        key-store:
        password:
        trust-store:

三、编写消息处理类

/**
 * 消息处理 handler, 通过加 {@link TioMsgHandler} 注解启用,否则不会启用
 * 注意: handler 是必须要启用的,否则启动会抛出 {@link TioMsgHandlerNotFoundException} 异常
 *
 * @author yangjian
 */
@TioMsgHandler
public class HelloServerMsgHandler implements ServerAioHandler {


    /**
     * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包
     * 总的消息结构:消息头 + 消息体
     * 消息头结构:    4个字节,存储消息体的长度
     * 消息体结构:   对象的json串的byte[]
     */
    @Override
    public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException
    {
        //提醒:buffer的开始位置并不一定是0,应用需要从buffer.position()开始读取数据
        //收到的数据组不了业务包,则返回null以告诉框架数据不够
        if (readableLength < HelloPacket.HEADER_LENGTH) {
            return null;
        }

        //读取消息体的长度
        int bodyLength = buffer.getInt();

        //数据不正确,则抛出AioDecodeException异常
        if (bodyLength < 0) {
            throw new AioDecodeException("bodyLength [" + bodyLength + "] is not right, remote:" + channelContext.getClientNode());
        }

        //计算本次需要的数据长度
        int neededLength = HelloPacket.HEADER_LENGTH + bodyLength;
        //收到的数据是否足够组包
        int isDataEnough = readableLength - neededLength;
        // 不够消息体长度(剩下的buffe组不了消息体)
        if (isDataEnough < 0) {
            return null;
        } else //组包成功
        {
            HelloPacket imPacket = new HelloPacket();
            if (bodyLength > 0) {
                byte[] dst = new byte[bodyLength];
                buffer.get(dst);
                imPacket.setBody(dst);
            }
            return imPacket;
        }
    }

    /**
     * 编码:把业务消息包编码为可以发送的ByteBuffer
     * 总的消息结构:消息头 + 消息体
     * 消息头结构:    4个字节,存储消息体的长度
     * 消息体结构:   对象的json串的byte[]
     */
    @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext)
    {
        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        int bodyLen = 0;
        if (body != null) {
            bodyLen = body.length;
        }

        //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
        int allLen = HelloPacket.HEADER_LENGTH + bodyLen;
        //创建一个新的bytebuffer
        ByteBuffer buffer = ByteBuffer.allocate(allLen);
        //设置字节序
        buffer.order(groupContext.getByteOrder());

        //写入消息头----消息头的内容就是消息体的长度
        buffer.putInt(bodyLen);

        //写入消息体
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
    }


    /**
     * 处理消息
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception 
    {
        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        if (body != null) {
            String str = new String(body, HelloPacket.CHARSET);
            System.out.println("收到消息:" + str);

            HelloPacket resppacket = new HelloPacket();
            resppacket.setBody(("收到了你的消息,你的消息是:" + str).getBytes(HelloPacket.CHARSET));
            Tio.send(channelContext, resppacket);
        }
        return;
    }
}

四、实现消息实体包

/**
 * 消息包实体
 *
 * @author yangjian
 */
public class HelloPacket extends Packet {
	private static final long serialVersionUID = -172060606924066412L;
	public static final int HEADER_LENGTH = 4;//消息头的长度
	public static final String CHARSET = "utf-8";
	private byte[] body;

	/**
	 * @return the body
	 */
	public byte[] getBody() {
		return body;
	}

	/**
	 * @param body the body to set
	 */
	public void setBody(byte[] body) {
		this.body = body;
	}
}

接下来启动服务端

编写客户端程序

客户端采用 Tio 的常规程序启动,只有三个文件,启动非常简单。

一、编写常量类

public interface Const {
	/**
	 * 服务器地址
	 */
	String SERVER = "127.0.0.1";
	
	/**
	 * 监听端口
	 */
	int PORT = 6789;

	/**
	 * 心跳超时时间
	 */
	int TIMEOUT = 5000;
}

二、消息处理类

public class HelloClientAioHandler implements ClientAioHandler {
	private static HelloPacket heartbeatPacket = new HelloPacket();


	/**
	 * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包
	 * 总的消息结构:消息头 + 消息体
	 * 消息头结构:    4个字节,存储消息体的长度
	 * 消息体结构:   对象的json串的byte[]
	 */
	@Override
	public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException
	{
		return PacketUtil.decode(buffer, limit, position, readableLength, channelContext);
	}

	/**
	 * 编码:把业务消息包编码为可以发送的ByteBuffer
	 * 总的消息结构:消息头 + 消息体
	 * 消息头结构:    4个字节,存储消息体的长度
	 * 消息体结构:   对象的json串的byte[]
	 */
	@Override
	public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext)
	{
		HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        int bodyLen = 0;
        if (body != null) {
            bodyLen = body.length;
        }

        //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度
        int allLen = HelloPacket.HEADER_LENGTH + bodyLen;
        //创建一个新的bytebuffer
        ByteBuffer buffer = ByteBuffer.allocate(allLen);
        //设置字节序
        buffer.order(groupContext.getByteOrder());

        //写入消息头----消息头的内容就是消息体的长度
        buffer.putInt(bodyLen);

        //写入消息体
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
	}
	
	/**
	 * 处理消息
	 */
	@Override
	public void handler(Packet packet, ChannelContext channelContext) throws Exception {
		HelloPacket helloPacket = (HelloPacket) packet;
		byte[] body = helloPacket.getBody();
		if (body != null) {
			String str = new String(body, HelloPacket.CHARSET);
			System.out.println("收到消息:" + str);
		}

		return;
	}

	/**
	 * 此方法如果返回null,框架层面则不会发心跳;如果返回非null,框架层面会定时发本方法返回的消息包
	 */
	@Override
	public HelloPacket heartbeatPacket(ChannelContext channelContext) {
		return heartbeatPacket;
	}
}

三、客户端启动类

public class HelloClientStarter 
{
	//服务器节点
	public static Node serverNode = new Node(Const.SERVER, Const.PORT);

	//handler, 包括编码、解码、消息处理
	public static ClientAioHandler tioClientHandler = new HelloClientAioHandler();

	//事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
	public static ClientAioListener aioListener = null;

	//断链后自动连接的,不想自动连接请设为null
	private static ReconnConf reconnConf = new ReconnConf(5000L);

	//一组连接共用的上下文对象
	public static ClientGroupContext clientGroupContext = new ClientGroupContext(tioClientHandler, aioListener, reconnConf);

	public static TioClient tioClient = null;
	public static ClientChannelContext clientChannelContext = null;

	/**
	 * 启动程序入口
	 */
	public static void main(String[] args) throws Exception {
		clientGroupContext.setHeartbeatTimeout(Const.TIMEOUT);
		tioClient = new TioClient(clientGroupContext);
		clientChannelContext = tioClient.connect(serverNode);
	
		send();
	}

	private static void send() throws Exception {
		HelloPacket packet = new HelloPacket();
		packet.setBody("hello world".getBytes(HelloPacket.CHARSET));
		Tio.send(clientChannelContext, packet);
	}
}

启动客户端端,查看终端输出。

服务端输出

原生回调接口支持

跟 handler 一样,其他原生回调接口的使用方法保持不变,只需要在对应的实现类上加上对应的注解就 OK 了。

//最主要的逻辑处理类,必须要写,否则抛异常
public class HelloServerMsgHandler implements ServerAioHandler {}
//可不写,通过加 @TioAioListener 注解启用,否则不会启用
public class HelloServerAioListener implements SocketServerAioListener {}
//可不写, 通过加 @TioGroupListener 注解启用,否则不会启用
public class HelloServerGroupListener implements GroupListener{}
//可不写,通过加 @link TioIpStatListener 注解启用,否则不会启用
public class HelloServerIpStatListener implements IpStatListener {}

这里注意:每个对应的回调接口都需要通过添加注解手动启用,否则默认不启用,不会自动扫描

服务端主动推送

这个也非常简单,只需获取到 TioServerBootstrap ,其他都变得非常简单。

@RestController
public class HelloController {

	static Logger logger = LoggerFactory.getLogger(HelloController.class);

	@Autowired
	private TioServerBootstrap bootstrap;

	@GetMapping("/")
	public String index()
	{
		return "Hello, tio-spring-boot-starter !!!";
	}

	/**
	 * 推送消息到客户端
	 * @throws Exception
	 */
	@GetMapping("/push")
	public String pushMessage() throws Exception {
		HelloPacket packet = new HelloPacket();
		packet.setBody("This message is pushed by Tio Server.".getBytes(HelloPacket.CHARSET));
		Tio.sendToAll(bootstrap.getServerGroupContext(), packet);
		logger.info("Push a message to client successfully");
		return "Push a message to client successfully";
	}
}

客户端输出截图

SSL 支持

# SSL 配置
    ssl:
      enabled: true
      key-store: key-store path
      password: password
      trust-store: trust-store path

集群支持

# 集群配置 默认关闭
    cluster:
      enabled: false
      # 集群是通过redis的Pub/Sub实现,所以需要配置Redis
      redis:
        ip: 127.0.0.1
        port: 6379
      all: true
      group: true
      ip: true
      user: true

完整功能项目源码

Comments ( 10 )

Sign in for post a comment

Java
1
https://gitee.com/blackfox/tio-starter.git
git@gitee.com:blackfox/tio-starter.git
blackfox
tio-starter
tio-starter
master

Search