# easy_rpc **Repository Path**: deng-changtao/easy_rpc ## Basic Information - **Project Name**: easy_rpc - **Description**: 个人探索rpc框架中~~~ - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 20 - **Forks**: 0 - **Created**: 2023-08-12 - **Last Updated**: 2024-11-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## EASY_RPC 一款轻量,全面,一体化远程调用框架 ​ EASY_RPC 使用 netty 高可用的主从Reactor多线程模型 搭建服务器,底层使用NIO减少连接时产生的资源损耗。其内部包含熔断器,负载均衡器,注册中心等等模块,可由用户手动自定义配置。 ## 快速开始 #### 创建一个注册中心 * **引入依赖:** ``` xml io.github.scarecrowQoQ easy_rpc_registry 0.0.3-release ``` **创建一个Maven项目,这里选择springboot模板** ![](/ReadMe_img/image-20230817214028719.png) **在启动类上开启注册中心服务** ```java @SpringBootApplication @EnableEasyRPC public class ServerDemoApplication { public static void main(String[] args) { SpringApplication.run(ServerDemoApplication.class, args); } } ``` * **编写配置文件(以yaml文件为例)** ```yaml rpc: registry: port: 8344 host: 127.0.0.1 ``` #### 创建一个API模块 ![image-20231010172508397](/ReadMe_img/image-20231010172508397.png) * **编写一个测试接口** * ```java public interface UserOrder { public String getOrder(); } ``` #### 创建一个服务提供者,依然是一个SpringBoot项目 ![image-20231010172639353](/ReadMe_img/image-20231010172639353.png) - **引入依赖** - ```xml io.github.scarecrowQoQ easy_rpc_provider 0.0.3-release ``` * **在SpringBoot启动类上开启服务提供机制** * ```java @SpringBootApplication @EnableRpcProvider public class ProviderDemoApplication { public static void main(String[] args) { SpringApplication.run(ProviderDemoApplication.class, args); } } ``` * 创建一个服务实现类,给定服务名,完成创建 ```java @EasyRpcProvider(serviceName = "userOrder") @Service public class OrderService implements UserOrder { @Override public String getOrder() { return "返回:来自服务1的订单处理"; } } ``` #### 创建服务消费者 ![image-20231010172721274](/ReadMe_img/image-20231010172721274.png) * **引入依赖** * ```xml io.github.scarecrowQoQ easy_rpc_consumer 0.0.3-release ``` * **在SpringBoot启动类上开启服务消费功能** * ```java @SpringBootApplication @EnableRpcConsumer public class RpcConsumerDemoApplication { public static void main(String[] args) { SpringApplication.run(RpcConsumerDemoApplication.class, args); } } ``` * **在消费端创建服务调用类,注入接口,添加服务名.** * ```java @Service public class UserServiceImpl implements UserService { @RpcConsumer(serviceName = "userOrder") UserOrder userOrder; @Override public String getUserOrder() { return userOrder.getOrder(); } } ``` 这里示例为了方便展示,使用HTTP调用消费者触发远程服务调用,添加一个web接口 ```java @RestController public class UserController { @Resource UserService userService; @RequestMapping("/order") public String getUserOrder(){ return userService.getUserOrder(); } } ``` 接下来访问:[localhost:8080/order](http://localhost:8080/order) 成功显示![image-20231010172853254](/ReadMe_img/image-20231010172853254.png) **这样就完成一次简单的远程服务调用** ## RPC服务治理 **项目提供了多样的服务治理,包含负载均衡,服务熔断与降级,服务限流与降级,服务路由,自定义过滤,拦截器,全局链路跟踪,下面将介绍如何快速使用这些功能** ### 猫 负载均衡 源码位置(src/main/java/com/rpc/easy_rpc_govern/loadBalancer): **目前提供了四种负载均衡策略选择,可以在rpc.consumer.loadBalance 属性进行配置,默认为轮询** - 一致性哈希 ( value = consistentHash) - 一致性哈希算法将对rpc中参数的值进行hash运算,相同的请求参数将会定位到同一台机器 - 需要注意哈希策略是对参数的toString()返回值进行hash,所以确保对象重写了toString方法。 - 服务轮询 (value = polling) - 将对该服务下的每一台机器进行轮询访问 - LRU最近最少使用 (value = leastActives) - 消费者会记录一段周期内服务请求的次数,优先选择最少请求次数的服务 - 随机访问 (value = random) - 每次请求都将随机访问到每一个服务提供者 ### 心碎 服务熔断与降级 源码位置(src/main/java/com/rpc/easy_rpc_govern/fuse) **服务熔断开启 rpc.consumer.fuseEnable = true 默认不开启,熔断比率(K)默认为1.2,越低越容易开启拦截** **定义: rate = (总请求数量 - K * 正常通过请求数量) / (总请求数 + 1)** 当rate > 0.3 为熔断器半开状态,此时的请求将会有rate * 100 % 的概率被拦截,当rate > 0.9 时熔断器为全开状态,此时所有请求都将被拦截,当rate < 0.1 熔断器将关闭,不进行任何拦截,拦截器将每10秒进行一次刷新状态,以确保服务恢复后可用 ### 锁定 服务限流:方法限流与统一限流 源码位置 (src/main/java/com/rpc/easy_rpc_govern/limit) 关于限流算法我自己写的博客地址:[手把手教你一个注解搞定指定API的用户限流 - 掘金 (juejin.cn)](https://juejin.cn/post/7270696332737495095) **快速使用,指定最大QPS:** ```java @RateLimit(QPS = 3) public String getUserOrder(){ return userOrderApi.getOrder("dct"); } ``` **指定limitKey,限流粒度从方法细化到参数。已支持深度对象属性的选择,当有多个类中含有相同属性名时将选择靠前类的该属性** ```java @RateLimit(QPS = 3,limitKey = "name") public String getUserOrder(String name){ log.info("{},调用了getUserOrder",name); return userOrderApi.getOrder(name); } ``` **指定降级类,需要注意方法名与参数需要与限流方法一致** ```java @RateLimit(QPS = 3,limitKey = "name",fallBack = UserFallBack.class) public String getUserOrder(String name){ log.info("{},调用了getUserOrder",name); return userOrderApi.getOrder(name); } ``` ```java @Slf4j public class UserFallBack { public String getUserOrder(String name){ log.info("{},调用了getUserOrder 的降级方法",name); return "已降级"; } } ``` #### 配置统一限流处理配置 考虑到限流场景很多都是根据用户id来进行限流,并且大多数项目采用jwt来进行用户鉴权,因此可以选择配置统一限流配置可以更加方便。 在请求头中获取token作为用户唯一标识来进行限流,示例代码: ````java @Component public class CommonLimitConfig extends UnifiedLimitConfig implements Filter { @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { HttpServletRequest httpRequest = (HttpServletRequest) servletRequest; Map headers = Collections.list(httpRequest.getHeaderNames()) .stream() .collect(Collectors.toMap(h -> h, httpRequest::getHeader)); String token = headers.get("token"); super.configUnifiedLimitKey(token, 1, () -> {return "方法被限流了";}); // 1 filterChain.doFilter(servletRequest, servletResponse); } } ```` 在web过滤器继承UnifiedLimitConfig类并在方法中调用此代码,设置标识 key即token,配置最大QPS,配置降级处理方法 ```java super.configUnifiedLimitKey(key,QPS,fallback) ``` 在需要限流的方法中添加限流注解,注意不要配置其他参数比如QPS,否则会走单独的方法限流配置而不是统一配置,方法限流粒度优先级高于统一限流粒度。 ```java @RateLimit public String getUserOrder(String type){ log.info("放行"); return userOrderApi.getOrder(type); } ``` ### 爱04 用户自定义:拦截器与过滤器 **请求过滤器,实现RequestFilterHandler接口,并且需要被spring管理** ```java @Component @Slf4j public class UserRemoteReqFilter implements RequestFilterHandler { @Override public void filterHandler(RpcContext rpcContext) { log.info("调用了请求过滤器"); } } ``` **响应过滤器,实现ResponseFilterHandler接口,并且需要被spring管理** ```java @Component @Slf4j public class UserRemoteResFilter implements ResponseFilterHandler { @Override public void filterHandler(ProviderResponse result) { log.info("调用了响应过滤器"); } } ``` **请求拦截器,实现RequestInterceptHandler接口,并且需要被spring管理。返回false拦截** ```java @Slf4j @Component public class UserInterceptor implements RequestInterceptHandler { @Override public Boolean interceptorHandle(RpcContext context) { log.info("调用了请求拦截器"); return true; } } ``` ### 鸟 服务路由:流量权重与服务分组 --- #### 配置服务权重来实现流量分配 ​ 需要注意当服务集群部署时,如果有机器没有配置权重则该机器无法分配到流量。并且配置了权重后负载均衡失效(逻辑冲突) **以下为实例代码** ​ 服务器1将分配9/10的流量,服务器2分配1/10的流量,分配策略为 当前配置weight/ 总共配置weight ```java @EasyRpcProvider(serviceName = "userOrder",weight = 90) @Service public class UserOrderServiceImpl implements UserOrderApi { @Override public String getOrder(Person person) { return "服务器1处理" + person.getName()+": 的订单"; } } ``` ```java @EasyRpcProvider(serviceName = "userOrder",weight = 10) @Service public class UserOrderServiceImpl implements UserOrderApi { @Override public String getOrder(Person person) { return "服务器2处理" + person.getName()+": 的订单"; } } ``` **对消费者进行配置路由分组** ```java @RpcConsumer(serviceName = "userOrder",group = "group2") UserOrderApi userOrderApi; ``` **对服务者进行配置服务分组** ~~~java @EasyRpcProvider(serviceName = "userOrder",group = "group1") @Service public class UserOrderServiceImpl implements UserOrderApi { @Override public String getOrder(Person person) { return "group1处理" + person.getName()+": 的订单"; } } ~~~ ~~~java @EasyRpcProvider(serviceName = "userOrder",group = "group2") @Service public class UserOrderServiceImpl implements UserOrderApi { @Override public String getOrder(Person person) { return "group2处理" + person.getName()+": 的订单"; } } ~~~ ### 弓箭 (2) 全局链路跟踪 **假设使用微服务来搭建商城项目,用户服务调用商品订单服务来下订单,订单服务调用商品服务来查询商品信息是否正确并调用库存服务来扣减库存。一次购买商品共需要经过四个服务,依次为用户服务,订单服务,商品服务,库存服务** 在消费端开启全局链路跟踪功能,当一个节点开启后后面的请求链路将自动开启跟踪功能,一般只需要在最开始的消费节点开启即可 ```yaml rpc: consumer: traceEnable: true ``` 订单服务,商品服务,库存服务 启动类添加开启服务注册和消费的注解,分别占用端口8081,8082,8083 ```java @SpringBootApplication @EnableRpcConsumer @EnableRpcProvider public class xxxxApplication { public static void main(String[] args) { SpringApplication.run(xxxxxApplication.class, args); } } ``` 作为服务提供者也同时作为消费者去消费服务 **订单服务** ~~~ @rpcProvider(serviceName = "userOrder") @Service public class UserOrderServiceImpl implements UserOrderApi { @RpcConsumer(serviceName = "userGoods") UserGoodsApi userGoodsApi; @Override public String getOrder(Person person){ System.out.println("订单服务生成中.... "); try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); } String GoodsName = "电脑"; String userGoods = userGoodsApi.getUserGoods(GoodsName); return "订单服务生成完毕,购买的商品消息:"+userGoods; } } ~~~ **商品服务** ```java @rpcProvider(serviceName = "userGoods") @Service public class UserGoodsServiceImpl implements UserGoodsApi { @RpcConsumer(serviceName = "repertory") RepertoryApi repertoryApi; @Override public String getUserGoods(String GoodsName) { System.out.println("商品服务查询中...."); try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); } if (repertoryApi.GoodsIsAvailable(GoodsName)){ System.out.println("商品服务查询完毕,商品信息存在,请求库存服务扣减库存...."); } return GoodsName; } } ``` **库存服务** ```java @rpcProvider(serviceName = "repertory") @Service public class RepertoryServiceImpl implements RepertoryApi { @Override public Boolean GoodsIsAvailable(String GoodsName) { System.out.println("库存服务处理中.... "); try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("库存服务处理完毕,还有库存.... "); return true; } } ``` **在注册中心中查看调用链信息,调用起始与结束时间,耗时以及调用方和响应方地址** ~~~ 2024-01-22 14:31:19.598 : Span(spanId=3, traceId=406818372525752321, startTime=1705905079535, endTime=1705905079598, requesterAddress=127.0.0.1:59857, responderAddress=127.0.0.1:8083, costTime=63) 2024-01-22 14:31:19.599 : Span(spanId=2, traceId=406818372525752321, startTime=1705905079473, endTime=1705905079599, requesterAddress=127.0.0.1:59844, responderAddress=127.0.0.1:8082, costTime=126) 2024-01-22 14:31:19.601 : Span(spanId=1, traceId=406818372525752321, startTime=1705905079408, endTime=1705905079600, requesterAddress=127.0.0.1:59967, responderAddress=127.0.0.1:8081, costTime=192) ~~~ ## 注册中心集群部署 **项目支持注册中心集群部署,这对一个分布式项目的高可用是非常关键的,其共识算法选择Raft算法,详情可以查看根目录下Raft落地实现.md文件** 创建五个注册中心应用(最好为奇数数量的应用,这样可以加快选举速度),如下图 ![image-20240125173116710](D/ReadMe_img/node_cluster.png) 在每一个与rpc的应用(注册中心,消费者,服务者)的配置文件中配置集群信息 cluster :开启集群部署。 cluster-address: 以列表的形式添加所有机器的ip地址。 host,port 则为本机占有的地址。 ~~~yml rpc: registry: cluster: true cluster-address: - 127.0.0.1:8001 - 127.0.0.1:8002 - 127.0.0.1:8003 host: 127.0.0.1 port: 8001 ~~~ ---- 一直到 registry-node-3 ~~~~ rpc: registry: cluster: true cluster-address: - 127.0.0.1:8001 - 127.0.0.1:8002 - 127.0.0.1:8003 host: 127.0.0.1 port: 8003 ~~~~ 全部开启服务后即开始选举并完成集群部署 leader节点日志打印 ~~~ c.r.e.c.service.ClusterServiceImpl : 节点全部连接成功!开始进入集群选举阶段 c.r.e.c.service.ClusterServiceImpl : --------------------------------------开始本轮选举,当前选期id: 1 ---------------------------------- c.r.e.c.service.ClusterServiceImpl : 随机睡眠: 11 ms c.r.e.c.service.ClusterServiceImpl : 本节点参与本轮选举 c.r.e.c.service.ClusterServiceImpl : 当前票数:1,已收票数:1,总共应收:3。 c.r.e.cluster.handler.ClusterHandler : 来自127.0.0.1:8001的拉票,但是本节点已经投票。我不选择投票 c.r.e.c.service.ClusterServiceImpl : 接收到来自:127.0.0.1:8002 的投票。当前全部票数:2 c.r.e.c.service.ClusterServiceImpl : 127.0.0.1:8001 不选择投票,当前全部票数:3 c.r.e.c.service.ClusterServiceImpl : 当前票数:2,已收票数:3,总共应收:3。 c.r.e.c.service.ClusterServiceImpl : 本轮投票结束,共耗时:130ms。当前票数:2,大于集群半数节点,当前节点为集群Leader节点 ~~~ ## EASY_RPC 主要类说明 - **easy_rpc_consumer: 消费者模块** - consumer->Consumer Processor:消费者主启动类,包含注解扫描,服务获取 - consumer->ConsumerProxy:消费者代理对象,触发消费请求发送 - net Server->Consumer Handler: 消费者netty处理类,负责注册中心,服务提供者数据接受 - service-> Consumer Service: 消费者服务类,具体执行服务列表拉取,服务请求方法 - **easy_rpc_provider: 服务提供者模块** - provider-> ProviderProcessor: 提供者主要进行,开启服务器,服务列表的注册 - provider-> ProviderFinder:服务提供者扫描类,负责扫描提供者注解,生成服务元信息 - service ->ProviderSerivce: 业务执行具体类,负责如消费处理,列表注册 。 - nettyServer -> NettyServerStarter: 开启服务器 - nettyServer -> ProviderHandler: 服务器处理器,用于处理消费请求,调用服务类执行具体方法 - **easy_rpc_service: 注册中心** - nettyServer -> NettyServiceStater: 注册中心 服务器启动类 - nettyServer -> NettyHandlerinit: 服务器处理器,负责服务列表保存,拉取。 - **easy_rpc_domain: 公共模块** - bean: 公共使用的对象,如服务元数据信息,心跳,请求载体 - annotation: 公共使用的注解。 - enumeration:公共使用的枚举类 - **easy_rpc_govern: 服务治理模块** - config 包:包含一个rpc框架的配置类 - context 包:关于rpc 过程调用的上下文和处理管道 - filter 包:包含rpc请求和响应的过滤处理 - fuse 包: 包含rpc 熔断 处理 - interceptor: 包含 rpc 请求和响应拦截处理 - limit 包:包含rpc 调用限流处理 - loadBalancer 包:包含服务负载均衡处理 - route 包:包含服务选择路由处理 - utils 包:一个spring的工具类 - thread 包:包含rpc线程池 - easy_rpc_protocol: 协议模块 - 主要关于编码与解码,调用协议选择(待完善) ## RPC框架开发整理 ### 主要涉及的功能实现 - 提供者: - 实现向注册中心提交服务列表 - 实现向消费者响应服务消费 - 实现向注册中心保持在线状态-心跳检测 - 消费者 - 实现向注册中心获取服务列表并本地缓存 - 实现向服务提供者提交消费请求 - 实现熔断器,对不可用服务进行熔断 - 实现服务限流 - 实现服务路由 - 注册中心 - 实现保存服务提供者的服务列表提交 - 实现消费者获取服务列表的请求 - 定期检查 服务列表是否过期,进行服务剔除 - 服务剔除后主动与消费者提供服务列表更新 - 在心跳的基础上检查服务列表是否上传