最近存在一个如下的场景需求。本人需要实现一个雾服务器,其主要功能是接收安卓端的Https请求,然后将该请求中的数据转发给对应的后台认证服务。
其中,在安卓和雾服务器间使用https是因为需要传输userID
、asessionKey
和token
等可能的敏感信息,https使用nginx代理实现。雾服务器和后台认证服务在一台服务器上,所以这两者之间直接使用socket进行通信。
安卓->雾服务器:springboot
雾服务器->后台认证服务(连接池):netty(指定)
此处通信使用载体的都为JSON
本文在Netty Client实战——高并发连接池方案_itboyer的博客-CSDN博客_netty客户端连接池的基础上更改了部分内容以及添加了注释。由于本人的水平有限,代码可能不够简洁高效,也存在部分问题未解决,请见谅。
描述:当有任务需要连接服务器时,会新建一个线程从连接池中获取一个连接。
因为在这里我们需要服务端返回的结果,所以线程池中的线程是以实现Callable
接口实现的。
这里引用原博客中的问题:
Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题:
a. 请求和响应的正确匹配。
当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。(在本文中,使用randomId来完成请求和响应的正确匹配)
b. 请求线程和响应线程的通信。
因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。(此部分在
NettyClientHandler
中由RESULT_MAP来控制实现)
ChannelTaskThread.java
子线程,通过目的服务器地址和全局随机数从连接池中获取对应服务器的channel(nettyClientPool.getChannel(random,socketAddress);
),这里的随机数是要做为channel的属性来实现标识。
/**
* 多线程获取连接池中的连接
* Callable在任务完成之后会有返回值
* Callable<String>:一个具有类型参数的泛型
*/
public class ChannelTaskThread implements Callable<String> {
// 获取netty连接池,NettyClientPool为单例模式,可以获取全局唯一的连接池
private final Logger logger = LoggerFactory.getLogger(getClass());
final NettyClientPool nettyClientPool = NettyClientPool.getInstance();
private String message;
private InetSocketAddress socketAddress;
public ChannelTaskThread(String message, InetSocketAddress socketAddress) {
this.message = message;
this.socketAddress = socketAddress;
}
@Override
public String call() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
// 同一个线程使用同一个全局唯一的随机数
long random = Long.parseLong(sdf.format(new Date())) * 1000000 + Math.round(Math.random()*1000000);
Channel channel = nettyClientPool.getChannel(random,socketAddress);
logger.info("在链接池中取到的Channel:{}",channel.id());
// UnpooledByteBufAllocator: 非池化的内存分配器,用于从堆上或直接内存上进行内存的分配和释放;false:在堆上开启buffer
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buf = allocator.buffer(20);
String msg = message;
// 先写长度,之后再读取长度
byte[] bytes = msg.getBytes();
buf.writeInt(bytes.length);
buf.writeBytes(msg.getBytes());
// 根据特定的handler类型返回对应的handler
NettyClientHandler tcpHandler = channel.pipeline().get(NettyClientHandler.class);
ChannelId id = channel.id();
logger.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]",random,id);
// 这里的serverMsg就是返回的结果
String serverMsg = tcpHandler.sendMessage(buf,channel);
// 释放连接
NettyClientPool.release(channel,socketAddress);
logger.info("接受到返回值:{}",serverMsg);
return serverMsg;
}
}
NettyClientHandler.java
1、请求和响应的匹配
channel会传输服务器的返回值,因为我们可以通过channel的全局随机数属性randomId来完成请求与响应的正确匹配。具体的,在获取到channel执行任务时sendMessage
,为每个channel建立一个阻塞队列linked
,并将相关信息添加到RESULT_MAP
,sendMessage
会不断尝试从linked
中获取值,当channel还没有返回值,sendMessage
会一直阻塞。当存在返回值,会被channelRead()
读取,此时channelRead
会根据channel的randomId将返回值放入对应的阻塞队列linked
,此时由于linked
存在值了,sendMessage
可以取到值并返回对应请求的响应。这里的思想可以简化成一个消费者-生产者问题,消费者sendMessage
只有当channelRead
往阻塞队列中添加值才能继续运行,否则将一直阻塞。
总的来说,这里利用全局随机值实现了请求和响应的匹配。
2、对于通道的回收
对于Netty连接池来说,不需要维护过多的连接,因此,当通道A空闲时,可以判断当前连接池的活跃连接数是否大于预设值,如果大于预设值,则将通道A回收。那么如何统计不同连接池的活跃连接数呢?
在NettyClientHandler
中建立一个Map来存储对应连接池活跃的连接数volatile static Map<Integer, Set<Channel>> coreChannel = new HashMap<>();
。然后在检查通道空闲状态时,将不同连接池的通道分别添加进该map,这样实现了对不同连接池活跃连接数的统计。
但是这样存在一个问题:提前完成任务的channel会一直被保留。
3、心跳消息的处理
连接池会保持着多个与服务器连接的channel,如果服务器对每个心跳消息都做回应,会造成通信资源的浪费,因此在这里的设计是服务器会对接受到的消息字段进行判断,如果发现是心跳机制将不会回复消息。
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 使用阻塞式LinkedBlockingQueue,对应响应结果保存,并发安全,响应结果为String
* 用于记录通道响应的结果集合
*/
private static final Map<Long, LinkedBlockingDeque<String>> RESULT_MAP = new ConcurrentHashMap<>();
volatile static Map<Integer, Set<Channel>> coreChannel = new HashMap<>();
private final Logger logger = LoggerFactory.getLogger(getClass());
public String sendMessage(ByteBuf message, Channel ch){
// 容量为1的阻塞队列
LinkedBlockingDeque<String> linked = new LinkedBlockingDeque<>(1);
// 获取channel中存储的全局唯一随机值
Long randomId = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
RESULT_MAP.put(randomId,linked);
// 发送message
ch.writeAndFlush(message);
String res = null;
try {
// 设置3分钟的获取超时时间或使用take()---获取不到返回结果则一直阻塞
res = RESULT_MAP.get(randomId).poll(3, TimeUnit.MINUTES);
RESULT_MAP.remove(randomId);
}catch (Exception e){
e.printStackTrace();
}
return res;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("into channelRead");
String message = null;
if (msg instanceof String){
message = msg.toString();
}else if (msg instanceof ByteBuf){
message = ((ByteBuf)msg).toString(Charset.defaultCharset());
}
// 获取channel中存储的全局唯一随机值
Long randomId = ctx.channel().attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
// 替换为log
logger.info("READ INFO 服务端返回结果:{}", message);
// 将服务端返回结果返回对应的channel中
LinkedBlockingDeque<String> linked = RESULT_MAP.get(randomId);
if (message != null){
linked.add(message);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
boolean active = ctx.channel().isActive();
logger.debug("[此时通道状态]{}",active);
}
/**
* 心跳机制实现连接的动态回收
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.info("[客户端心跳监测发送] 通道编号:{}",ctx.channel().id());
Channel channel = ctx.channel();
if (evt instanceof IdleStateEvent){
// 当客户端开始发送心跳检测时,说明没有业务请求,释放通道数设定的CORE_CONNECTIONS
if (channel.isActive()){
// 使用pool的hash作为key,维护CORE_CONNECTIONS个通道数,多余关闭
int poolHash = NettyClientPool.getPoolHash(channel);
// 获取poolHash对应的连接集合
Set<Channel> channels = coreChannel.get(poolHash);
channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels;
channels.add(channel);
if (channels.stream().filter(Channel::isActive).count() > DataBusConstant.CORE_CONNECTIONS){
logger.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}",channel.id());
channels.remove(channel);
channel.close();
}
// 将更新后的连接集合到coreChannel中
coreChannel.put(poolHash,channels);
}
String heartBeat = DataBusConstant.HEART_BEAT;
byte[] bytes = heartBeat.getBytes();
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buf = allocator.buffer(20);
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
channel.writeAndFlush(buf);
} else {
super.userEventTriggered(ctx,evt);
}
}
}
官方提供的FixedChannelPool
支持固定连接的连接池,但是不支持连接池的动态回收,通道的动态回收结合心跳机制实现(见上):
NettyClientPool.java
这里需要对Line50处的代码进行说明:
getInetAddresses(address);
// poolMap.get(key)方法会对不存在的key值创建一个新的channelPool
// 为对应的IP+PORT创建channelPool
for (InetSocketAddress address : addressList){
pools.put(address,poolMap.get(address));
}
}
在此时,代码中的poolMap
并没有进行初始化,其实现的是ChannelPoolMap
接口,其get(key)
方法,如果key不存在,则会新建一个FixedChannelPool
,在这里即执行line46的语句。
public class NettyClientPool {
// volatile用来确保将变量的更新操作通知到其他线程。
volatile private static NettyClientPool nettyClientPool;
private final Logger logger = LoggerFactory.getLogger(getClass());
/**
* key为目标主机的InetSocketAddress对象,value为目标主机对应的连接池
* InetSocketAddress可以为ip+port,也可以为hostname+port
* FixedChannelPool:ChannelPool,可以强制保持一个最大的连接并发
*/
public ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;
final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
private static final String address = "127.0.0.1:8000,127.0.0.1:7000,127.0.0.1:9000";
// 注意这里的pools是private,poolMap应该是用来跟pools进行信息更新
volatile private static Map<InetSocketAddress,FixedChannelPool> pools = new HashMap<>(3);
// 注意此处使用了volatile 进行了隔离
volatile private static List<InetSocketAddress> addressList;
private NettyClientPool(){
build();
}
public static NettyClientPool getInstance(){
if (nettyClientPool == null){
// 同步操作,即加锁
synchronized (NettyClientPool.class){
// 为了避免多次初始化,此处又重新做了一次null值判断
if (nettyClientPool == null){
nettyClientPool = new NettyClientPool();
}
}
}
return nettyClientPool;
}
public void build(){
logger.info("NettyClientPool build...");
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
.option(ChannelOption.SO_KEEPALIVE,true);
poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
@Override
protected FixedChannelPool newPool(InetSocketAddress key) {
// DataBusConstant.MAX_CONNECTIONS 最大连接数
// bootstrap.remoteAddress(key):bootstrap对remoteKey进行连接
return new FixedChannelPool(bootstrap.remoteAddress(key),new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS);
}
};
// 获取server段的addressList,此处的address应该是多个服务器信息的连写,如"127.0.0.1:80,127.0.0.1:90"
getInetAddresses(address);
// poolMap.get(key)方法会对不存在的key值创建一个新的channelPool
// 为对应的IP+PORT创建channelPool
for (InetSocketAddress address : addressList){
pools.put(address,poolMap.get(address));
}
}
/**
* 功能描述:
* 根据随机数取出的server对应pool,从pool中取出channel
* 连接池的动态扩容: 指定最大连接数为Integer.MAX_VALUE,如果连接池队列中取不到channel,会自动创建channel,默认使用FIFO的获取方式,回收的channel优先被再次get到
* SERVER的宕机自动切换: 指定重试次数,get()发生连接异常,会重新获取
*/
public Channel getChannel(long random, InetSocketAddress address){
int retry = 0;
Channel channel = null;
try {
// random是一个关于时间戳的随机数
// 根据address获取对应的连接池
FixedChannelPool pool = pools.get(address);
// 从连接池获取连接
Future<Channel> future = pool.acquire();
channel = future.get();
// 为channel设置key:random随机数
AttributeKey<Long> randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY);
channel.attr(randomID).set(random);
} catch (ExecutionException e) { //如果是因为服务端挂掉,连接失败而获取不到channel,则随机数执行+1操作,从下一个池获取
logger.info(e.getMessage());
// 每个池子尝试获取2次
int count = 2;
if(retry < addressList.size() * count){
retry++;
return getChannel( random,address);
} else {
logger.info("没有可以获取到channel连接的server,server list [{}]",addressList);
throw new RuntimeException("没有可以获取到channel连接的server");
}
} catch (Exception e) {
e.printStackTrace();
}
return channel;
}
/**
* 回收channel进池,需要保证随机值和getChannel获取到的随机值是同一个,才能从同一个pool中释放资源
* @param ch
*/
public static void release(Channel ch,InetSocketAddress socketAddress){
long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
ch.flush();
pools.get(socketAddress).release(ch);
}
/**
* 获取线程池的hash值
*/
public static int getPoolHash(Channel ch){
// 获取random随机值
long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
InetSocketAddress address = (InetSocketAddress) ch.remoteAddress();
return System.identityHashCode(pools.get(address));
}
/**
* 获取服务端server列表,每个server对应一个pool
*/
public void getInetAddresses(String addresses){
addressList = new ArrayList<>(3);
// 此处需要注意看是否会出错
if (StringUtil.isNullOrEmpty(addresses)){
throw new RuntimeException("address列表为空");
}
String[] splits = addresses.split(",");
for (String address : splits){
String[] split = address.split(":");
if (split.length==0){
throw new RuntimeException("["+address+"]不符合IP:PORT格式");
}
addressList.add(new InetSocketAddress(split[0],Integer.parseInt(split[1])));
}
}
}
NettyChannelPoolHandler.java
注意这里为了解决TCP粘包的问题,使用的是自定义长度帧解码器LengthFieldBasedFrameDecoder
,其用法参考LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园 (cnblogs.com),该类解码器需要在主要消息之前添加消息长度,因此可以看见NettyClientHandler
中的sendMessage
会先添加消息长度。
关于此处为何要选中使用自定义长度解码器而不是分隔符解码器(DelimiterBasedFrameDecoder
),因为在使用python构建服务器时(测试用),不知道以何种方式来完成分隔符解码器,故采用自定义长度解码器。
public class NettyChannelPoolHandler implements ChannelPoolHandler {
// 分隔符
static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes());
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void channelReleased(Channel ch) throws Exception {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
logger.info("|-->回收channel.Channel ID:"+ch.id());
}
@Override
public void channelAcquired(Channel ch) throws Exception {
logger.info("|-->获取Channel. Channel ID: " + ch.id());
}
@Override
public void channelCreated(Channel ch) throws Exception {
logger.info("|-->创建Channel. Channel ID: " + ch.id()
+"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch));
SocketChannel channel = (SocketChannel) ch;
channel.config().setKeepAlive(true);
channel.config().setTcpNoDelay(true);
channel.pipeline()
// 开启netty自带的心跳处理器,每10秒发一次心跳
.addLast(new IdleStateHandler(0,0,10, TimeUnit.SECONDS))
.addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4))
.addLast(new StringDecoder())
.addLast(new NettyClientHandler());
}
}
public class NettyTaskPool {
/**
* 线程池线程数量,对应cachedThreadPoolExecutor
*/
private static final int CORE_POLL_SIZE = 3;
private static final int MAX_POLL_SIZE = Integer.MAX_VALUE;
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POLL_SIZE,
MAX_POLL_SIZE,
3,
TimeUnit.MINUTES,
new LinkedBlockingDeque<>(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
public static String submitTask(String message, InetSocketAddress socketAddress) throws Exception{
// 单个任务在线程池内分配单个线程,用于同步等待封装的返回结果
Future<String> submit = threadPool.submit(new ChannelTaskThread(message,socketAddress));
// Future.get() 获取任务的结果,若存在异常,则抛出ExecutionException
String response = submit.get();
return response;
}
}
public class DataBusConstant {
public static final String DELIMITER = "%#_#%";
public static final String HEART_BEAT = "{\"HeatBeat\":\"ping-pong-ping-pong\"}";
/**
* 最大连接数
*/
public static final int MAX_CONNECTIONS = Integer.MAX_VALUE;
/**
* 核心链接数,该数目内的通道 在没有业务请求时发送心跳防止失活,超过部分的通道close掉
*/
public static final int CORE_CONNECTIONS = 0;
/**
* 同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值
*/
public static final String RANDOM_KEY = "randomID";
/**
* 服务端丢失心跳次数,达到该次数,则关闭通道,默认3次
*/
public static final int LOOS_HEART_BEAT_COUNT = 3;
public static final String HOST_NOT_REACHABLE = "{\"msg\":\"服务未开启\"}";
}
可与常量类合并
public class ConnectionUtil {
public static final InetSocketAddress USER_AUTH = new InetSocketAddress("127.0.0.1",7000);
public static final InetSocketAddress BIO_ATH = new InetSocketAddress("127.0.0.1",8000);
public static final InetSocketAddress LIVE_CHECK = new InetSocketAddress("127.0.0.1",9000);
}
public class Result {
// 前端成功是20000,这里先做一点更改
/**
* code:200-请求成功处理;5000-出错
*/
private int code;
private String message;
private Object data;
public Result(int code, String message, Object data) {
this.code = code;
this.message = message;
this.data = data;
}
public Result(){};
public static Result OK(){
return OK(null);
}
public static Result OK(Object data){
return new Result(200,"操作成功",data);
}
public static Result ERROR(){
return ERROR("操作失败");
}
public static Result ERROR(String message){
return new Result(5000,"操作失败",message);
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}
转换对应消息格式
public class ResultUtil {
public static Result getResult(String json){
JsonObject jsonObject = (JsonObject) new JsonParser().parse(json);
int code = jsonObject.remove("code").getAsInt();
if(code == 200){
return Result.OK(json);
}
return Result.ERROR(json);
}
}
接受http请求后,调用上述服务
@RestController
public class TestController {
// 接受json数据
@PostMapping("/testUser")
public Result testUser(@RequestBody Map params) throws Exception {
String jsonString = new Gson().toJson(params);
String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.USER_AUTH);
return ResultUtil.getResult(res);
}
@PostMapping("/testBio")
public Result testBio(@RequestBody Map params) throws Exception {
String jsonString = new Gson().toJson(params);
String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.BIO_ATH);
return ResultUtil.getResult(res);
}
@PostMapping("/testLive")
public Result testLive(@RequestBody Map params) throws Exception {
String jsonString = new Gson().toJson(params);
String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.LIVE_CHECK);
return ResultUtil.getResult(res);
}
}
post请求测试例子:
{"username":"admin","password":"admin123","userId":"xiawei","sessionKey":"sessionKey","token":"token","code":200}
服务器使用python编写
这里在将消息长度写入的时候,需要注意要将其转换为大端序,即 size = struct.pack('>i',len(sendData))
,在接收的时候,需要将其转换为小端序,即lenOfData = struct.unpack('<i',self.request.recv(4))[0]
import socketserver
import json
import struct
ip_port = ("127.0.0.1", 8000)
class UserAuthServer(socketserver.BaseRequestHandler):
def handle(self):
print("conn is:",self.request)
print("addr id:", self.client_address)
while True:
try:
lenOfData = struct.unpack('<i',self.request.recv(4))[0]
data = self.request.recv(lenOfData)
if not data: break
data = data.decode("utf-8")
if("HeatBeat" in json.loads(data)):
continue
print("接收到的消息是:",data)
sendData = data
print("服务器发送的数据为:",sendData)
sendData = data.encode("utf-8")
# 需要转换为大端序
size = struct.pack('>i',len(sendData))
sendData = size + sendData
self.request.sendall(sendData)
except Exception as e:
print(e)
break
if __name__ == "__main__":
s = socketserver.ThreadingTCPServer(ip_port,UserAuthServer)
s.serve_forever()
1、channel连接失败无法返回异常
在“雾服务器”运行期间,如果认证服务未开启,此时channel是无法连接成功,但是此时会抛出异常,会重复尝试连接,这样请求的发起者(安卓)会一直处于等待状态,无法返回异常(即服务器未开启)。
Netty Client实战——高并发连接池方案_itboyer的博客-CSDN博客_netty客户端连接池
python)解决TCP下的粘包问题_Monicx的博客-CSDN博客_python tcp 粘包
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。