1 Star 8 Fork 4

voidwx/netty-client-connect-pool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

一、引言

1.1 场景

最近存在一个如下的场景需求。本人需要实现一个雾服务器,其主要功能是接收安卓端的Https请求,然后将该请求中的数据转发给对应的后台认证服务。

其中,在安卓和雾服务器间使用https是因为需要传输userIDasessionKeytoken等可能的敏感信息,https使用nginx代理实现。雾服务器和后台认证服务在一台服务器上,所以这两者之间直接使用socket进行通信。

场景需求

1.2 技术选型

安卓->雾服务器:springboot

雾服务器->后台认证服务(连接池):netty(指定)

此处通信使用载体的都为JSON

1.3 声明

本文在Netty Client实战——高并发连接池方案_itboyer的博客-CSDN博客_netty客户端连接池的基础上更改了部分内容以及添加了注释。由于本人的水平有限,代码可能不够简洁高效,也存在部分问题未解决,请见谅。

二、线程模型

描述:当有任务需要连接服务器时,会新建一个线程从连接池中获取一个连接。

2.1 连接池中连接的获取与使用

因为在这里我们需要服务端返回的结果,所以线程池中的线程是以实现Callable接口实现的。

这里引用原博客中的问题:

Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题:

a. 请求和响应的正确匹配。

当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。(在本文中,使用randomId来完成请求和响应的正确匹配)

b. 请求线程和响应线程的通信。

因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。(此部分在NettyClientHandler中由RESULT_MAP来控制实现)

2.1.1 ChannelTaskThread.java

子线程,通过目的服务器地址和全局随机数从连接池中获取对应服务器的channel(nettyClientPool.getChannel(random,socketAddress);),这里的随机数是要做为channel的属性来实现标识。

/**
 * 多线程获取连接池中的连接
 * Callable在任务完成之后会有返回值
 * Callable<String>:一个具有类型参数的泛型
 */
public class ChannelTaskThread implements Callable<String> {
    // 获取netty连接池,NettyClientPool为单例模式,可以获取全局唯一的连接池
    private final Logger logger = LoggerFactory.getLogger(getClass());
    final NettyClientPool nettyClientPool = NettyClientPool.getInstance();
    private String message;
    private InetSocketAddress socketAddress;

    public ChannelTaskThread(String message, InetSocketAddress socketAddress) {
        this.message = message;
        this.socketAddress = socketAddress;
    }

    @Override
    public String call() throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        // 同一个线程使用同一个全局唯一的随机数
        long random = Long.parseLong(sdf.format(new Date())) * 1000000 + Math.round(Math.random()*1000000);

        Channel channel = nettyClientPool.getChannel(random,socketAddress);
        logger.info("在链接池中取到的Channel:{}",channel.id());
        // UnpooledByteBufAllocator: 非池化的内存分配器,用于从堆上或直接内存上进行内存的分配和释放;false:在堆上开启buffer
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buf = allocator.buffer(20);

        String msg =  message;
        // 先写长度,之后再读取长度
        byte[] bytes = msg.getBytes();
        buf.writeInt(bytes.length);
        buf.writeBytes(msg.getBytes());
        // 根据特定的handler类型返回对应的handler
        NettyClientHandler tcpHandler = channel.pipeline().get(NettyClientHandler.class);
        ChannelId id = channel.id();
        logger.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]",random,id);
        // 这里的serverMsg就是返回的结果
        String serverMsg = tcpHandler.sendMessage(buf,channel);
        // 释放连接
        NettyClientPool.release(channel,socketAddress);
        logger.info("接受到返回值:{}",serverMsg);
        return serverMsg;

    }
}

2.1.2 NettyClientHandler.java

1、请求和响应的匹配

channel会传输服务器的返回值,因为我们可以通过channel的全局随机数属性randomId来完成请求与响应的正确匹配。具体的,在获取到channel执行任务时sendMessage,为每个channel建立一个阻塞队列linked,并将相关信息添加到RESULT_MAPsendMessage会不断尝试从linked中获取值,当channel还没有返回值,sendMessage会一直阻塞。当存在返回值,会被channelRead()读取,此时channelRead会根据channel的randomId将返回值放入对应的阻塞队列linked,此时由于linked存在值了,sendMessage可以取到值并返回对应请求的响应。这里的思想可以简化成一个消费者-生产者问题,消费者sendMessage只有当channelRead往阻塞队列中添加值才能继续运行,否则将一直阻塞。

总的来说,这里利用全局随机值实现了请求和响应的匹配。

2、对于通道的回收

对于Netty连接池来说,不需要维护过多的连接,因此,当通道A空闲时,可以判断当前连接池的活跃连接数是否大于预设值,如果大于预设值,则将通道A回收。那么如何统计不同连接池的活跃连接数呢?

NettyClientHandler中建立一个Map来存储对应连接池活跃的连接数volatile static Map<Integer, Set<Channel>> coreChannel = new HashMap<>();。然后在检查通道空闲状态时,将不同连接池的通道分别添加进该map,这样实现了对不同连接池活跃连接数的统计。

但是这样存在一个问题:提前完成任务的channel会一直被保留

3、心跳消息的处理

连接池会保持着多个与服务器连接的channel,如果服务器对每个心跳消息都做回应,会造成通信资源的浪费,因此在这里的设计是服务器会对接受到的消息字段进行判断,如果发现是心跳机制将不会回复消息。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 使用阻塞式LinkedBlockingQueue,对应响应结果保存,并发安全,响应结果为String
     * 用于记录通道响应的结果集合
     */
    private static final Map<Long, LinkedBlockingDeque<String>> RESULT_MAP = new ConcurrentHashMap<>();
    volatile static Map<Integer, Set<Channel>> coreChannel = new HashMap<>();
    private final Logger logger = LoggerFactory.getLogger(getClass());


    public String sendMessage(ByteBuf message, Channel ch){
        // 容量为1的阻塞队列
        LinkedBlockingDeque<String> linked = new LinkedBlockingDeque<>(1);
        // 获取channel中存储的全局唯一随机值
        Long randomId = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        RESULT_MAP.put(randomId,linked);
        // 发送message
        ch.writeAndFlush(message);
        String res = null;
        try {
            // 设置3分钟的获取超时时间或使用take()---获取不到返回结果则一直阻塞
            res = RESULT_MAP.get(randomId).poll(3, TimeUnit.MINUTES);
            RESULT_MAP.remove(randomId);
        }catch (Exception e){
            e.printStackTrace();
        }
        return res;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.debug("into channelRead");
        String message = null;
        if (msg instanceof String){
            message = msg.toString();
        }else if (msg instanceof ByteBuf){
            message = ((ByteBuf)msg).toString(Charset.defaultCharset());
        }
        // 获取channel中存储的全局唯一随机值
        Long randomId = ctx.channel().attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        // 替换为log

        logger.info("READ INFO 服务端返回结果:{}", message);
        // 将服务端返回结果返回对应的channel中
        LinkedBlockingDeque<String> linked = RESULT_MAP.get(randomId);
        if (message != null){
            linked.add(message);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        boolean active = ctx.channel().isActive();
        logger.debug("[此时通道状态]{}",active);
    }

    /**
     * 心跳机制实现连接的动态回收
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        logger.info("[客户端心跳监测发送] 通道编号:{}",ctx.channel().id());
        Channel channel = ctx.channel();
        if (evt instanceof IdleStateEvent){
            // 当客户端开始发送心跳检测时,说明没有业务请求,释放通道数设定的CORE_CONNECTIONS
            if (channel.isActive()){
                // 使用pool的hash作为key,维护CORE_CONNECTIONS个通道数,多余关闭
                int poolHash = NettyClientPool.getPoolHash(channel);
                // 获取poolHash对应的连接集合
                Set<Channel> channels = coreChannel.get(poolHash);
                channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels;
                channels.add(channel);
                if (channels.stream().filter(Channel::isActive).count() > DataBusConstant.CORE_CONNECTIONS){
                    logger.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}",channel.id());
                    channels.remove(channel);
                    channel.close();
                }
                // 将更新后的连接集合到coreChannel中
                coreChannel.put(poolHash,channels);
            }
            String heartBeat = DataBusConstant.HEART_BEAT;
            byte[] bytes = heartBeat.getBytes();
            UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
            ByteBuf buf = allocator.buffer(20);
            buf.writeInt(bytes.length);
            buf.writeBytes(bytes);
            channel.writeAndFlush(buf);
        } else {
            super.userEventTriggered(ctx,evt);
        }
    }
}

2.2 连接池的创建

官方提供的FixedChannelPool支持固定连接的连接池,但是不支持连接池的动态回收,通道的动态回收结合心跳机制实现(见上):

2.2.1 NettyClientPool.java

这里需要对Line50处的代码进行说明:

getInetAddresses(address);
// poolMap.get(key)方法会对不存在的key值创建一个新的channelPool
// 为对应的IP+PORT创建channelPool
for (InetSocketAddress address : addressList){
    pools.put(address,poolMap.get(address));
  }
}

在此时,代码中的poolMap并没有进行初始化,其实现的是ChannelPoolMap接口,其get(key)方法,如果key不存在,则会新建一个FixedChannelPool,在这里即执行line46的语句。

public class NettyClientPool {
    // volatile用来确保将变量的更新操作通知到其他线程。
    volatile private static NettyClientPool nettyClientPool;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * key为目标主机的InetSocketAddress对象,value为目标主机对应的连接池
     * InetSocketAddress可以为ip+port,也可以为hostname+port
     * FixedChannelPool:ChannelPool,可以强制保持一个最大的连接并发
     */
    public ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;

    final EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap bootstrap = new Bootstrap();
    private static final String address = "127.0.0.1:8000,127.0.0.1:7000,127.0.0.1:9000";
    // 注意这里的pools是private,poolMap应该是用来跟pools进行信息更新
    volatile private static Map<InetSocketAddress,FixedChannelPool> pools = new HashMap<>(3);
    // 注意此处使用了volatile 进行了隔离
    volatile private static List<InetSocketAddress> addressList;

    private NettyClientPool(){
        build();
    }

    public static NettyClientPool getInstance(){
        if (nettyClientPool == null){
            // 同步操作,即加锁
            synchronized (NettyClientPool.class){
                // 为了避免多次初始化,此处又重新做了一次null值判断
                if (nettyClientPool == null){
                    nettyClientPool = new NettyClientPool();
                }
            }
        }
        return nettyClientPool;
    }

    public void build(){
        logger.info("NettyClientPool build...");
        bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
                .option(ChannelOption.SO_KEEPALIVE,true);
        poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
            @Override
            protected FixedChannelPool newPool(InetSocketAddress key) {
                // DataBusConstant.MAX_CONNECTIONS 最大连接数
                // bootstrap.remoteAddress(key):bootstrap对remoteKey进行连接
                return new FixedChannelPool(bootstrap.remoteAddress(key),new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS);
            }
        };
        // 获取server段的addressList,此处的address应该是多个服务器信息的连写,如"127.0.0.1:80,127.0.0.1:90"
        getInetAddresses(address);
        // poolMap.get(key)方法会对不存在的key值创建一个新的channelPool
        // 为对应的IP+PORT创建channelPool
        for (InetSocketAddress address : addressList){
            pools.put(address,poolMap.get(address));
        }
    }
    /**
     * 功能描述:
     *  根据随机数取出的server对应pool,从pool中取出channel
     *  连接池的动态扩容: 指定最大连接数为Integer.MAX_VALUE,如果连接池队列中取不到channel,会自动创建channel,默认使用FIFO的获取方式,回收的channel优先被再次get到
     *  SERVER的宕机自动切换: 指定重试次数,get()发生连接异常,会重新获取
     */

    public Channel getChannel(long random, InetSocketAddress address){
        int retry = 0;
        Channel channel = null;
        try {
            // random是一个关于时间戳的随机数
            // 根据address获取对应的连接池
            FixedChannelPool pool = pools.get(address);
            // 从连接池获取连接
            Future<Channel> future = pool.acquire();
            channel = future.get();
            // 为channel设置key:random随机数
            AttributeKey<Long> randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY);
            channel.attr(randomID).set(random);
        } catch (ExecutionException e) {  //如果是因为服务端挂掉,连接失败而获取不到channel,则随机数执行+1操作,从下一个池获取
            logger.info(e.getMessage());
            // 每个池子尝试获取2次
            int count = 2;
            if(retry < addressList.size() * count){
                retry++;
                return getChannel( random,address);
            } else {
                logger.info("没有可以获取到channel连接的server,server list [{}]",addressList);
                throw new RuntimeException("没有可以获取到channel连接的server");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return channel;
    }

    /**
     *  回收channel进池,需要保证随机值和getChannel获取到的随机值是同一个,才能从同一个pool中释放资源
     * @param ch
     */
    public static void release(Channel ch,InetSocketAddress socketAddress){
       long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
       ch.flush();
        pools.get(socketAddress).release(ch);
    }

    /**
     * 获取线程池的hash值
     */
    public static int getPoolHash(Channel ch){
        // 获取random随机值
        long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
        InetSocketAddress address = (InetSocketAddress) ch.remoteAddress();
        return System.identityHashCode(pools.get(address));
    }

    /**
     * 获取服务端server列表,每个server对应一个pool
     */
    public void getInetAddresses(String addresses){
        addressList = new ArrayList<>(3);
        // 此处需要注意看是否会出错
        if (StringUtil.isNullOrEmpty(addresses)){
            throw new RuntimeException("address列表为空");
        }
        String[] splits = addresses.split(",");
        for (String address : splits){
            String[] split = address.split(":");
            if (split.length==0){
                throw new RuntimeException("["+address+"]不符合IP:PORT格式");
            }
            addressList.add(new InetSocketAddress(split[0],Integer.parseInt(split[1])));
        }
    }
}

2.2.2 NettyChannelPoolHandler.java

注意这里为了解决TCP粘包的问题,使用的是自定义长度帧解码器LengthFieldBasedFrameDecoder,其用法参考LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园 (cnblogs.com),该类解码器需要在主要消息之前添加消息长度,因此可以看见NettyClientHandler中的sendMessage会先添加消息长度。

关于此处为何要选中使用自定义长度解码器而不是分隔符解码器(DelimiterBasedFrameDecoder),因为在使用python构建服务器时(测试用),不知道以何种方式来完成分隔符解码器,故采用自定义长度解码器。

public class NettyChannelPoolHandler implements ChannelPoolHandler {
    // 分隔符
    static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes());
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void channelReleased(Channel ch) throws Exception {
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
        logger.info("|-->回收channel.Channel ID:"+ch.id());
    }

    @Override
    public void channelAcquired(Channel ch) throws Exception {
        logger.info("|-->获取Channel. Channel ID: " + ch.id());
    }

    @Override
    public void channelCreated(Channel ch) throws Exception {
        logger.info("|-->创建Channel. Channel ID: " + ch.id()
                +"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch));
        SocketChannel channel = (SocketChannel) ch;
        channel.config().setKeepAlive(true);
        channel.config().setTcpNoDelay(true);
        channel.pipeline()
                // 开启netty自带的心跳处理器,每10秒发一次心跳
                .addLast(new IdleStateHandler(0,0,10, TimeUnit.SECONDS))
                .addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4))
                .addLast(new StringDecoder())
                .addLast(new NettyClientHandler());
    }
}

2.3 辅助类

2.3.1 线程池

public class NettyTaskPool {
    /**
     * 线程池线程数量,对应cachedThreadPoolExecutor
     */
    private static final int CORE_POLL_SIZE = 3;
    private static final int MAX_POLL_SIZE = Integer.MAX_VALUE;
    
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POLL_SIZE,
            MAX_POLL_SIZE,
            3,
            TimeUnit.MINUTES,
            new LinkedBlockingDeque<>(),
            new ThreadPoolExecutor.DiscardOldestPolicy()
    );

    public static String submitTask(String message, InetSocketAddress socketAddress) throws Exception{
        // 单个任务在线程池内分配单个线程,用于同步等待封装的返回结果
        Future<String> submit = threadPool.submit(new ChannelTaskThread(message,socketAddress));
        // Future.get() 获取任务的结果,若存在异常,则抛出ExecutionException
        String response = submit.get();
        return response;
    }
}

2.3.2 常量类

public class DataBusConstant {
    public static final String DELIMITER = "%#_#%";

    public static final String HEART_BEAT = "{\"HeatBeat\":\"ping-pong-ping-pong\"}";

    /**
     * 最大连接数
     */
    public static final int MAX_CONNECTIONS = Integer.MAX_VALUE;

    /**
     * 核心链接数,该数目内的通道 在没有业务请求时发送心跳防止失活,超过部分的通道close掉
     */
    public static final int CORE_CONNECTIONS = 0;

    /**
     * 同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值
     */
    public static final String RANDOM_KEY = "randomID";

    /**
     * 服务端丢失心跳次数,达到该次数,则关闭通道,默认3次
     */
    public static final int LOOS_HEART_BEAT_COUNT = 3;

    public static final String HOST_NOT_REACHABLE = "{\"msg\":\"服务未开启\"}";
}

2.3.3 连接常量类

可与常量类合并

public class ConnectionUtil {
    public static final InetSocketAddress USER_AUTH = new InetSocketAddress("127.0.0.1",7000);
    public static final InetSocketAddress BIO_ATH = new InetSocketAddress("127.0.0.1",8000);
    public static final InetSocketAddress LIVE_CHECK = new InetSocketAddress("127.0.0.1",9000);

}

2.3.4 结果类

public class Result {
    // 前端成功是20000,这里先做一点更改
    /**
     * code:200-请求成功处理;5000-出错
     */
    private int code;
    private String message;
    private Object data;

    public Result(int code, String message, Object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }
    public Result(){};

    public static Result OK(){
        return OK(null);
    }
    public static Result OK(Object data){
        return new Result(200,"操作成功",data);
    }
    public static Result ERROR(){
        return ERROR("操作失败");
    }
    public static Result ERROR(String message){
        return new Result(5000,"操作失败",message);
    }


    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

2.3.5 结果工具类

转换对应消息格式

public class ResultUtil {
    public static Result getResult(String json){
        JsonObject jsonObject = (JsonObject) new JsonParser().parse(json);
        int code = jsonObject.remove("code").getAsInt();
        if(code == 200){
            return Result.OK(json);
        }
        return Result.ERROR(json);
    }
}

2.4 测试

接受http请求后,调用上述服务

@RestController
public class TestController {

    // 接受json数据
    @PostMapping("/testUser")
    public Result testUser(@RequestBody Map params) throws Exception {
        String jsonString = new Gson().toJson(params);
        String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.USER_AUTH);
        return ResultUtil.getResult(res);
    }
    @PostMapping("/testBio")
    public Result testBio(@RequestBody Map params) throws Exception {
        String jsonString = new Gson().toJson(params);
        String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.BIO_ATH);
        return ResultUtil.getResult(res);
    }
    @PostMapping("/testLive")
    public Result testLive(@RequestBody Map params) throws Exception {
        String jsonString = new Gson().toJson(params);
        String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.LIVE_CHECK);
        return ResultUtil.getResult(res);
    }
}

post请求测试例子:

{"username":"admin","password":"admin123","userId":"xiawei","sessionKey":"sessionKey","token":"token","code":200}

2.5 服务器

服务器使用python编写

这里在将消息长度写入的时候,需要注意要将其转换为大端序,即 size = struct.pack('>i',len(sendData)),在接收的时候,需要将其转换为小端序,即lenOfData = struct.unpack('<i',self.request.recv(4))[0]

import socketserver
import json
import struct

ip_port = ("127.0.0.1", 8000)

class UserAuthServer(socketserver.BaseRequestHandler):
    def handle(self):
        print("conn is:",self.request)
        print("addr id:", self.client_address)

        while True:
            try:
                lenOfData  = struct.unpack('<i',self.request.recv(4))[0]
                data = self.request.recv(lenOfData)
                if not data: break
                
                data = data.decode("utf-8")
                if("HeatBeat" in json.loads(data)):
                    continue
                print("接收到的消息是:",data)
                
                sendData = data
                print("服务器发送的数据为:",sendData)
                sendData = data.encode("utf-8")
                # 需要转换为大端序
                size = struct.pack('>i',len(sendData))
                sendData = size + sendData
                self.request.sendall(sendData)
            except Exception as e:
                print(e)
                break 

if __name__ == "__main__":
    s = socketserver.ThreadingTCPServer(ip_port,UserAuthServer)
    s.serve_forever()

三、问题

1、channel连接失败无法返回异常

在“雾服务器”运行期间,如果认证服务未开启,此时channel是无法连接成功,但是此时会抛出异常,会重复尝试连接,这样请求的发起者(安卓)会一直处于等待状态,无法返回异常(即服务器未开启)。

四、参考连接

Netty Client实战——高并发连接池方案_itboyer的博客-CSDN博客_netty客户端连接池

python)解决TCP下的粘包问题_Monicx的博客-CSDN博客_python tcp 粘包

LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园 (cnblogs.com)

源码地址:https://gitee.com/voidwx/netty-client-connect-pool.git

空文件

简介

取消

发行版

暂无发行版

贡献者

全部

语言

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/voidwx/netty-client-connect-pool.git
git@gitee.com:voidwx/netty-client-connect-pool.git
voidwx
netty-client-connect-pool
netty-client-connect-pool
master

搜索帮助