# simple-rpc **Repository Path**: feiyi_codes/simple-rpc ## Basic Information - **Project Name**: simple-rpc - **Description**: 基于netty + zookeeper 实现的RPC远程远程调用基础框架 - **Primary Language**: Unknown - **License**: Unlicense - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-02-06 - **Last Updated**: 2025-02-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: Netty, rpc, zookeeper, SpringBoot ## README ## 基于Netty实现简易RPC框架 #### 模块说明 rpc-client-spring-boot-starter 消息消费方引用(客户端) rpc-server-spring-boot-starter 消息服务方引用(服务器) rpc-api-common 公共模块,统一的消息请求及响应格式 rpc-simple-example 使用示例 rpc-customer 消息消费方 rpc-provider 消息生产者 #### 使用说明 1. 服务端引入以下依赖 ``` com.feiyi rpc-provider-api ${project.version} ``` 需要对外提供服务的接口实现了上加入@RpcService注解,这样服务端自动时会自动将服务接口注册到ZK服务器上 ``` @RpcService public class UserInfoServiceImpl implements IUserInfoService { private static final Logger logger = LoggerFactory.getLogger(UserInfoServiceImpl.class); @Override public List getUserInfoList() { return Collections.singletonList(new UserInfo(ThreadLocalRandom.current().nextInt(),"非邑",39)); } @Override public void addUserInfo(UserInfo userInfo) { logger.info("接收用户数据:{}-{}-{}",userInfo.getId(),userInfo.getUsername(),userInfo.getAge()); } } ``` 2. 客户端使用引入以下依赖 ``` com.feiyi rpc-client-spring-boot-starter ${rpc-client.version} com.feiyi rpc-provider-api 0.0.1-SNAPSHOT ``` 需要调用服务端的的接口在具体的实例上加入@RpcServiceClient注解,支持JSON、Kryo、Protostuff三种序列化方式 ``` @RestController @RequestMapping public class UserController { @RpcServiceClient(serializeType= SerializeTypeEnum.Protostuff) IUserInfoService userInfoService; @GetMapping("/list") public ResponseEntity> getUserList(){ return ResponseEntity.ok(userInfoService.getUserInfoList()); } @PostMapping("/add") public ResponseEntity add(@RequestBody UserInfo userInfo){ userInfoService.addUserInfo(userInfo); return ResponseEntity.ok("成功"); } } ``` #### 知识点 ##### 服务的注册与发现 ###### 服务的注册 将需要对外提供服务能力的接口,注册到zk上供消费者识别并消息。服务启动时通过获取有@RpcService注解的服务 在启动netty服务时,注册到ZK服务器,代码片段如下: ``` // 服务的检索 public RpcNettyServer rpcNettyServer() { RpcNettyServer rpcNettyServer = new RpcNettyServer(rpcProperties,applicationContext.getBean(ZookeeperService.class)); Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcService.class); Map serviceBean=new HashMap<>(beansWithAnnotation.size()); for (Object object : beansWithAnnotation.values()) { Class clazz = object.getClass(); RpcService annotation = clazz.getAnnotation(RpcService.class); String serverName = annotation.serverName(); String version = annotation.version(); if (StringUtils.hasLength(serverName)) { String key=serverName.concat("-").concat(version); if (serviceBean.containsKey(key)) { throw new RuntimeException("服务名已存在:".concat(key)); } serviceBean.put(key, object); }else { Class[] interfaces = clazz.getInterfaces(); for (Class anInterface : interfaces) { String key = anInterface.getSimpleName().concat("-").concat(version); if (serviceBean.containsKey(key)) { throw new RuntimeException("服务名已存在:".concat(key)); } serviceBean.put(key, object); } } } logger.info("服务列表:{}", serviceBean); rpcNettyServer.start(serviceBean); return rpcNettyServer; } //服务的注册 for (String key : serversMap.keySet()) { zk.register(key, rpcProperties.getRpcHost() + ":" + rpcProperties.getRpcPort()); logger.info("注册服务到ZK:{} - 地址:{}", key, rpcProperties.getRpcHost() + ":" + rpcProperties.getRpcPort()); } ``` ###### 服务的注册 客户端通过ZK-client 获取相应的服务接口。代码片段如下 ``` //对点点调用,不经过ZK if (StringUtils.hasLength(host) && port > 0) { f = clientBoot.connect(host, port).sync(); } else { String rpcServer = new String(zookeeperClientService.discover(requestModel.getClassName())); String[] serviceAddress = rpcServer.split(":"); f = clientBoot.connect(serviceAddress[0], Integer.parseInt(serviceAddress[1])).sync(); } ``` ##### 序列化与反序列化 支持JSON、Kryo、protobuf多种序列化方式,应对多种通讯场景 | 方式 | 描述 | |----------------------|---------------------------------------------------------------------------------------------------------| | Java内置 Serialization | 简单易用实现Serialization接口即可,序列化的数据包含类的元数据,导致数据量较大
性能不高,序列化及反序列化速度慢
兼容性差,类结构变化导致反序列化失败 | | Json序列化 | 常见的三方库fastjson、gson
可读性强,支持前后兼容
性能较好,但是不如二进制高效,这里的高效主要是指数据体积、传输效率等 | | 谷歌protobuf | 二进制序列化,高效的二进制格式需要定义.proto文件生成相应的java类
性能优越,支持前后兼容 | | Kryo | 二进制序列化,与谷歌protobuf类似,不需要定义文件,手动注册类,使用方便
依赖字节码生成,号称是java中最快的序列化工具
对于数据结构变化支持不是太友好,也就是不太支持前后兼容 | 客户端通过@RpcServiceClient注解的serializeType()指定序列化方式,默认为JSON ``` public class UserController { @RpcServiceClient(serializeType= SerializeTypeEnum.Protostuff) IUserInfoService userInfoService; ``` 服务端通过接收到序列化类型动态添加不同的编码器和解码器 ``` logger.info("原始消息:{}",msg); ByteBuf in = (ByteBuf) msg; byte serializeTypeByte = in.readByte(); logger.info("序列化类型:{}",serializeTypeByte); ChannelPipeline p = ctx.pipeline(); switch (serializeTypeByte) { case 1: p.addLast(new RpcJsonDecode(RequestModel.class)); p.addLast(new RpcJsonEncode(ResponseModel.class)); break; case 2: p.addLast(new RpcKryoDecode(RequestModel.class)); p.addLast(new RpcKryoEncode(ResponseModel.class)); break; case 3: p.addLast(new RpcProtostuffDecode(RequestModel.class)); p.addLast(new RpcProtostuffEncode(ResponseModel.class)); break; default: logger.warn("未知的序列化类型: {}", serializeTypeByte); p.addLast(new SimpleServerHandler()); break; } ``` ##### 动态代理 在客户端调用远程服务接口时,客户端只有服务端提供的服务接口并没有具体的实现,为了能够在实际使用 时能够引入一个具体的对象实例,则需要通过动态代理的方式来构建一个代理对象。通过实现BeanPostProcessor 接口,对还有@RpcServiceClient 注解的对象创建一个代理实例,代码片段如下 ``` Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { RpcServiceClient rpcServiceClient = AnnotationUtils.getAnnotation(field, RpcServiceClient.class); if (rpcServiceClient != null) { Object proxy = Proxy.newProxyInstance( field.getType().getClassLoader(), new Class[]{field.getType()}, new RpcInvocationHandler(bean,zookeeperClientService,rpcServiceClient.host(), rpcServiceClient.port(), StringUtils.hasLength(rpcServiceClient.serverName()) ? rpcServiceClient.serverName() : field.getType().getSimpleName(), rpcServiceClient.version(),rpcServiceClient.serializeType())); field.setAccessible(true); //将代理对象注入到需要实例化的对象比如UserController try { field.set(bean, proxy); } catch (IllegalAccessException e) { throw new RuntimeException(e); } } } return bean; ``` ##### 反射 在服务端收到客户端发起的调用请求时,通过对请求数据的解析,得到具体的需要调用的实现类className, 服务端通过className 进行反射得到具体的实现类,然后对外提供相应的服务 ``` public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("收到数据请求:{}",msg); RequestModel requestModel = (RequestModel) msg; ResponseModel responseModel = new ResponseModel(); Object serviceBean = serversMap.get(requestModel.getClassName()); if (serviceBean == null) { logger.error("服务不存:{}", requestModel.getClassName()); responseModel.setCode(ResponseCodeEnum.NOT_FOUND.getCode()); responseModel.setError_msg(ResponseCodeEnum.NOT_FOUND.getMessage()); }else { Class serviceClass = serviceBean.getClass(); String methodName = requestModel.getMethodName(); Class[] parameterTypes = requestModel.getParameterTypes(); Object[] parameters = requestModel.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); Object data =method.invoke(serviceBean, analyzeParameters(parameterTypes,parameters)); responseModel.setCode(ResponseCodeEnum.SUCCESS.getCode()); responseModel.setRequestId(requestModel.getRequestId()); responseModel.setData(data); } logger.info("响应数据:{}-{}",responseModel.getCode(),responseModel.getData()); // 请求数据返回给客户端 ctx.writeAndFlush(responseModel); } ``` ##### 方法的参数类型及参数值解析 通过反射调用具体的方法时,方法的参数类型和参数值是需要对应的,对于不同的序列化方式需要不同的解码 才能正确的对应参数类型及参数值,对于JSON的序列化需要特殊处理 ``` Object data =method.invoke(serviceBean, analyzeParameters(parameterTypes,parameters)); private Object[] analyzeParameters(Class[] parameterTypes,Object[] parameters){ if (parameters==null || parameters.length==0){ return parameters; }else{ Object[] new_parameters = new Object[parameters.length]; for(int i=0;i