代码拉取完成,页面将自动刷新
编译器需支持:C++17
1.CLONE项目
2.mkdir build
3.cd build
4.cmake .. -DCMAKE_CXX_COMPILER={支持C++17的编译器}
5.make
6.初期运行 生成配置文件
7.关闭服务 修改配置文件
测试平台: VMware虚拟机15 core:4 mem:8G 主机:thinpad E14 r5 4500 内存16gb
zeromq:
muduo:50字节 单程延迟46us
50000字节 单程延迟64us
MyRpc:50字节 单程延迟43us
50000字节 单程延迟62us
150000字节 单程延迟112us
压力测试
SERVER 100CPU 单线程 1500字节数据包 转发回Client QPS 90000+
Log:
单线程 80字节信息 9341520+qps
Rpc:
单对单序列RPC 解析RPC 调用RPC
SERVER: 序列化RPC+发送包 一次调用一个数据包 连续发送 每次65.601ns
Client: 接受包+解析RPC+调用 每次40.739ns
RPC:
RPC消息包默认格式: 单位字节
|-------------------------------------------------------------------------------------------------------------|
| 消息包大小 4 | 消息类型(定义在dataformat) 4 | 调用PRC序号 4 | 消息数据大小 4 | 是否调用回调 1 | 消息数据 -- |
|-------------------------------------------------------------------------------------------------------------|
connect->Read 返回对端一次Write的内容 即:消息类型(定义在dataformat) 调用PRC序号 消息数据大小 是否调用回调 消息数据 or 任何数据
一次Write对应一次Read 需要口头规定消息包最大大小 以免一次Read无法读尽
Rpc调用过程:
对端注册一个RPC
Rpc.Init_fun(type(口头约定的类型),fun_address 可以是lambda 可以是函数地址,size 希望的接收到的参数数据包大小(消息数据大小),回调函数(可选))
发送端发送一个RPC
Message::Make_rpc(connnet,(口头约定的类型),缓冲区,是否调用回调,参数...)
// Server:
Main():
myrpc::Message message = myprc::Message(); // 初始化messgae
net_tools::Config config("Server"); // 加载配置
net_tools::Log log; // 初始化LOG
net_tools::Tcpserver tcp; // 创建TCP服务
tcp.Set_connect_over_callback(conn_func); // 设置回调
tcp.Initserver(true,2,0); // 注册TCP服务
tcp.Startserver(); // 开始服务
conn_func(net_tools::net::Connect* connect):
connect->Set_read_callback(read_func); // 设置收到消息包回调(可选)
connect->Set_close_callback(close_func); // 设置对端Close回调(可选)
connect->Set_lose_callback(lose_func); // 设置心跳包失效回调(可选)
char buf[512] = {}; // 用户缓冲区
myrpc::Message::Make_rpc(connect,3,buf,true,12,13,14); //调用 3号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)12 13 14
myrpc::Message::Make_rpc(connect,2,buf,true,15); //调用 2号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)15
// Client:
Main():
myrpc::Message message = myprc::Message(); // 初始化messgae
net_tools::Config config("Client"); // 加载配置
net_tools::Log log; // 初始化LOG
net_tools::Tcpclient tcp; // 创建TCP服务
message.Get_rpc()->Init_fun(3,myrpc::base::Rpc::Test,sizeof_fun(myrpc::base::Rpc::Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc_test" << NT_LOG_ENDL;});
message.Get_rpc()->Init_fun(2,Test,sizeof_fun(Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc" << NT_LOG_ENDL;});
{
int b = 0;
auto fun = [=](int a)->void{NT_LOG_INFO << a << b << NT_LOG_ENDL;};
message->Get_rpc()->Init_fun(1,fun); //接口会保存该lambda 延长生命周期
}
tcp.Set_connect_over_callback(conn_func); // 设置回调
tcp.Startclient(true); // 循环尝试连接
pause();
char buf[200] = {}; // 缓冲区
read_func(net_tools::net::Connect* connect)
connect->Read(buf,200); // 读取消息 buf缓冲区地址 200 大小
message->Consume_message(connect,buf,200); // 消费消息
├── base // 网络库 base
| ├── base_buffer // 能移动的buffer 零拷贝发送
│ ├── bplustree // 模板B+树 Timebplustree调用
│ ├── channel // Channel 包装每一个event事件
│ ├── channelpool // 管理一个线程内所有Channel
│ ├── condition // 封装列posix的条件变量
│ ├── config // 解析配置 JSON
│ ├── count_down_cond // 带计数器的条件变量
│ ├── cpuinfo // 分析平台CPU
│ ├── epoll // 封装列linux epoll
│ ├── eventloop // 事件循环
│ ├── eventloopthread // 包装了事件循环的线程
│ ├── eventloopthreadpool // IO线程池
│ ├── function // 引入std::function
│ ├── hash // 闭散列HASH
│ ├── json // 包装列JSONCPP
│ ├── logbuffer // LOG的缓冲区
│ ├── log // LOG调用
│ ├── logfile // LOG写入的文件
│ ├── logstream // LOG iostream风格调用
│ ├── logthread // LOG线程
│ ├── mempool // 分配固定大小的内存池
│ ├── mutex // 封装posix 互斥量
│ ├── noncopyable // 基类 不允许左值拷贝构造
│ ├── thread // 包装了posix线程
│ ├── threadpool // 简易逻辑线程池
│ ├── timebplustree // 管理单个线程单次定时任务
│ ├── timeevent // 单词定时任务
│ ├── timeeventset // 管理单个对象的单次定时任务
│ ├── timeevery // 循环定时任务
│ ├── timeeverymap // 管理单个对象打循环定时任务
│ ├── timequeue // 管理单个线程打循环任务时间轮
│ ├── timer // 包装了timefd定时器
├── rpc // Rpc框架
│ ├── dataformat // 信息包格式定义
│ ├── dmq // 分布式消息队列
│ ├── message // 信息包注册消费中心
│ ├── nodeformat // 节点格式定义
│ ├── rpc // rpc框架
│ ├── raft // raft实现分布式(未实现)
│ ├── rpccall // 元编程Rpc序列 反序列
│ ├── serverfinder // 服务发现
├── net // 网络库 net
│ ├── accept // 封装posix accpet
│ ├── buffer // TCP连接缓冲区
| ├── buffer_reader // base_buffer迭代器
│ ├── connect // 单个TCP连接
│ ├── connectpool // 管理单个线程的CONNECT对象
│ ├── heartbeat // 心跳系统 FOR SERVER
│ ├── socket // 封装posix socket调用
│ ├── tcpclient // TCP client主体or单次连接
│ ├── tcpserver // TCP server
└── user // 一些示例
eventloop已经过时 该网络库为过时的框架
网络库使用eventloop使得代码需要非阻塞 而很多的业务逻辑需要并行的rpc调用 过分的回调函数实现异步 所以说要使用应用层的线程来改造网络库
至少lock_free 追求wait_free 应用层的线程也容易实现负载均衡 避免一个调用卡住后面的调用导致超时 task_steal等
epoll的模型也能转换为io_uring 减少进入内核的次数 从而提高吞吐量
现阶段的rpc是在eventloop上进行的 只能实现丑陋的回调异步 需要应用层线程来实现join 从而让业务逻辑能够简化并行处理多个rpc返回值的问题
而且可以方便的终止rpc 通过一个rpc的返回决定另外的rpc的处理方式
rpc也需要实现可扩展的格式 这在现基础上 可以使用数据包附带偏移量的指示 按照原来预定的设想 通过编译期的参数推导实现入参 框架集成pb 简化依赖项
为了可扩展性 protobuf可选支持
服务发现需要raft的支持 保证服务发现的高可用
集成管理的接口 不停机配置 维护
底层网络库io_uring+应用层线程+集成协议的rpc+raft支持的服务发现+raft接口拓展为集群功能+可选的解耦中间件(类似kafka的消息队列 减少整个大集群连接的拓扑复杂度)
重构网络库
应用层线程支持
rpc协议更新
高可用的服务发现
raft接口拓展机器为集群功能
往上搭建DMQ 消息队列
实现最终的满足业务开发绝大多数需求的rpc架构 MyRpc
推荐书籍:
自研Rpc框架 两步实现Rpc调用
RPC: 使用元编程实现RPC序列反序列以及调用
自制function 实现类型擦除 相对于多态的类型擦除 调用速度提升7倍
faster_function 相对于多态实现的rpccall 速度提升15倍 qps达到 22m
不使用JSON等序列化工具 支持不定长 支持lambda(带捕获参数) 函数指针 自定义类型 回调函数
参考muduo的事件驱动型网络库
相对于muduo添加了定时器树,心跳树功能
两种定时任务策略 定时器树,TIMERFD
适合作为接入,解析数据包,转发服务器,IO密集
LOG系统 c++ iostream风格
缓冲区系统 内部采用内存池 隔离用户与系统调用 简化用户业务逻辑 支持base_buffer零拷贝
内存池 仿ptmalloc 为缓冲区提供大块内存
逻辑线程池 任务队列 用以执行用户注册的业务函数
IO线程池 EPOLL监听注册IO描述符
定时器树 用B+树作为底层 维护收到连接注册的定时期任务或在用户调用的定时期任务
定时器循环任务map 以任务名为key 注册循环定时器任务 每个任务一个timefd 当定时任务多时 可用前缀树优化
心跳树 使用类B+树 通过划分心跳时间片 高效管理连接心跳
Config 使用json解析配置
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型