# RPC
**Repository Path**: candydingding/rpc
## Basic Information
- **Project Name**: RPC
- **Description**: A hand-written RPC framework
Video link: https://www.imooc.com/video/20219
Documentation notes: https://www.yuque.com/lililil-9bxsv/kb/tg9xha
Code repository: https://gitee.com/candydingding/rpc
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2022-04-04
- **Last Updated**: 2023-02-24
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
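# 1. RPC Overview
## 1.1 Introduction to RPC
RPC (remote procedure call) is a common communication method in distributed systems. It has been in use for well over a decade, covering everything from cross-process calls to calls across physical machines.
Advantage: it makes a remote call as simple as calling a local method.
## 1.2 System Interaction
Systems interact in two ways: direct interaction and indirect interaction (through middleware). Both are introduced below.
### 1.2.1 Direct interaction

### 1.2.2 Indirect interaction

## 1.3 Comparison of RPC frameworks

## 1.4 Core principles of RPC
### 1.4.1 How an RPC call works
step1 The server registers its services with the registry
step2 The client subscribes to the registry to obtain information about the services it needs
step3 If the server information changes, the registry notifies the subscribers of the change
step4 When the client wants to make a call, it calls the server directly using the information obtained from the registry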
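
The register, subscribe, and notify steps above can be sketched with a tiny in-memory registry. This is only an illustration under assumptions: `RegistrySketch`, its method names, and the `host:port` address format are hypothetical and are not part of this project, which connects the client to a fixed server address.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry; not part of the project described here.
class RegistrySketch {
    // service name -> current address ("host:port")
    private final Map<String, String> addresses = new ConcurrentHashMap<>();
    // service name -> listeners to notify when the address changes
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Steps 1 and 3: a server registers (or updates) its address; subscribers are notified.
    void register(String service, String address) {
        addresses.put(service, address);
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(service, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(address);
        }
    }

    // Step 2: a client subscribes and immediately receives the current address, if any.
    void subscribe(String service, Consumer<String> listener) {
        subscribers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        String current = addresses.get(service);
        if (current != null) {
            listener.accept(current);
        }
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        List<String> seen = new ArrayList<>();
        registry.register("CalcService", "127.0.0.1:3000");
        registry.subscribe("CalcService", seen::add);
        registry.register("CalcService", "127.0.0.1:3001"); // the address changed
        System.out.println(seen); // prints [127.0.0.1:3000, 127.0.0.1:3001]
    }
}
```

Step 4 is then an ordinary call against the most recent address the listener received.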

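### 1.4.2 The call process
step1 The client calls an interface method (the interface method on the stub)
step2 The call information is serialized so that it can be transmitted over the network
step3 The client and server establish a network connection
step4 The server deserializes the transmitted object
step5 The server-side stub looks up the method to call and its parameters
step6 The server finds the actual implementation object, obtains the result via reflection, and passes it back to the stub
step7 The stub serializes the result object
step8 The server and client establish a network connection to return the result
step9 The client deserializes the transmitted object
step10 The client obtains the call result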
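
The ten steps above can be compressed into a single-process sketch. It is a simplification under assumptions: `Greeter`, `Call`, and `CallFlowSketch` are hypothetical names, JDK serialization stands in for the JSON codec used later in this document, and a direct method call replaces the network hop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface and implementation, used only for this illustration.
interface Greeter {
    String greet(String name);
}

class GreeterImpl implements Greeter {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

// The call information that travels "over the wire".
class Call implements Serializable {
    String method;
    Class<?>[] paramTypes;
    Object[] args;
}

class CallFlowSketch {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    // Server side (steps 4 to 7): deserialize, find the method, invoke it, serialize the result.
    static byte[] serve(Object target, byte[] requestBytes) throws Exception {
        Call call = (Call) deserialize(requestBytes);
        Method m = target.getClass().getMethod(call.method, call.paramTypes);
        return serialize(m.invoke(target, call.args));
    }

    // Client side (steps 1, 2, 9, 10): the stub turns a method call into bytes and back.
    static Greeter stubFor(Object remoteTarget) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                (proxy, method, args) -> {
                    Call call = new Call();
                    call.method = method.getName();
                    call.paramTypes = method.getParameterTypes();
                    call.args = args;
                    // A direct method call replaces the network hop (steps 3 and 8).
                    byte[] responseBytes = serve(remoteTarget, serialize(call));
                    return deserialize(responseBytes);
                });
    }

    public static void main(String[] args) {
        Greeter greeter = stubFor(new GreeterImpl());
        System.out.println(greeter.greet("rpc")); // prints "hello rpc"
    }
}
```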

## 1.5 Technology stack




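# 2. Implementing RPC by Hand
## 2.1 Project setup, protocol definition, and common utility methods
### 2.1.1 Project class diagram
Five major modules in total

### 2.1.2 Project setup
step1 Create a new project
step2 Under the project, create 6 modules and delete the src directory
client: client module
server: server module
codec: encoding and decoding module
common: common module
proto: protocol module
transport: network communication module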

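### 2.1.3 Writing and importing the parent dependencies
```xml
<dependencies>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-servlet</artifactId>
        <version>9.4.19.v20190610</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.26</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>
```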
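### 2.1.4 Compiler version control
```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.3</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>
```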
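### 2.1.5 Unified version configuration for submodules
```xml
<properties>
    <java.version>1.8</java.version>
    <common.version>2.5</common.version>
    <jetty.version>9.4.19.v20190610</jetty.version>
    <fastjson.version>1.2.44</fastjson.version>
    <lombok.version>1.18.8</lombok.version>
    <slf4j.version>1.7.26</slf4j.version>
    <logback.version>1.2.3</logback.version>
    <junit.version>4.12</junit.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${common.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
    </plugins>
</build>
```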
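### 2.1.6 Installing the Lombok plugin
Search for it in IDEA's plugin settings and install it

### 2.1.8 Configuring annotation processing

## 2.2 Writing the protocol module
The protocol module mainly consists of the network endpoint, the request object, the response object, and the service descriptor class
### 2.2.1 Network endpoint class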
```java
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* Represents an endpoint of network transmission
*/
@Data
@AllArgsConstructor
public class Peer {
private String host;
private int port;
}
```
### 2.2.2 Service class
```java
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Describes a service
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
private String clazz;
private String method;
private String returnType;
private String[] parameterTypes;
}
```
### 2.2.3 Request class
```java
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* Represents a request
*/
@Data
public class Request {
private ServiceDescriptor service;
private Object[] parameters;
}
```
### 2.2.4 Response class
```java
package com.smgeek.gkrpc;
import lombok.Data;
/**
* Represents a response
*/
@Data
public class Response {
/**
* Service return code
* 0: success
* non-zero: failure
*/
private int code=0;
/**
* Detailed error message of the response
*/
private String message="ok";
/**
* Returned data
*/
private Object data;
}
```
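The code convention documented in `Response` (0 for success, non-zero for failure) can be illustrated from both sides of a call. The sketch below is an assumption-laden illustration: `ResponseSketch` is a local stand-in with the same three fields, and `capture` and `unwrap` are hypothetical helper names rather than methods of this project.

```java
import java.util.function.Supplier;

// Local stand-in mirroring the fields of Response, so the sketch compiles on its own.
class ResponseSketch {
    int code = 0;
    String message = "ok";
    Object data;
}

class ResponseConventionSketch {
    // Server side: run the call and translate any runtime failure into a non-zero code.
    static ResponseSketch capture(Supplier<Object> call) {
        ResponseSketch resp = new ResponseSketch();
        try {
            resp.data = call.get();
        } catch (RuntimeException e) {
            resp.code = 1;
            resp.message = e.getClass().getName() + ":" + e.getMessage();
        }
        return resp;
    }

    // Client side: return the data on success, fail loudly otherwise.
    static Object unwrap(ResponseSketch resp) {
        if (resp == null || resp.code != 0) {
            throw new IllegalStateException(
                    "remote call failed: " + (resp == null ? "no response" : resp.message));
        }
        return resp.data;
    }

    public static void main(String[] args) {
        System.out.println(unwrap(capture(() -> 1 + 2))); // prints 3
        ResponseSketch failed = capture(() -> {
            throw new IllegalArgumentException("bad input");
        });
        // prints "1 java.lang.IllegalArgumentException:bad input"
        System.out.println(failed.code + " " + failed.message);
    }
}
```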
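## 2.3 Writing the common module
The common module contains only one class: a reflection utility that supports the dynamic proxy
### 2.3.1 JDK dynamic proxy utility class
Dynamic proxy (reflection utilities)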
```java
package com.smgeek.gkrpc.common.utils;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
/**
* Reflection utility class
*/
public class ReflectionUtils {
/**
* Create an object from a class
* @param clazz the class of the object to create
* @param <T> the object type
* @return the created object
*/
public static <T> T newInstance(Class<T> clazz){
try{
return clazz.newInstance();
}catch (Exception e){
throw new IllegalStateException(e);
}
}
/**
* Get the public methods of a class
* @param clazz any class
* @return the public methods declared by the class
*/
public static Method[] getPublicMethods(Class clazz){
// Get all methods declared by this class
Method[] methods=clazz.getDeclaredMethods();
List<Method> pmethods=new ArrayList<>();
for(Method m:methods){
if(Modifier.isPublic(m.getModifiers())){
pmethods.add(m);
}
}
return pmethods.toArray(new Method[0]);
}
/**
* Invoke the given method on the given object
*
* @param obj the object whose method is invoked
* @param method the method to invoke
* @param args the method arguments
* @return the return value of the invoked method
*/
public static Object invoke(Object obj,Method method,Object... args){
try{
return method.invoke(obj,args);
}catch (Exception e){
throw new IllegalStateException(e);
}
}
}
```
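One detail of `getPublicMethods` worth seeing in isolation: `Class.getDeclaredMethods()` returns only the methods declared by the class itself, so inherited public methods are left out. The sketch below demonstrates that behaviour with hypothetical classes (`Base`, `Child`, `DeclaredMethodsSketch`) that are not part of this project.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes used only to show which methods are picked up.
class Base {
    public String inherited() {
        return "base";
    }
}

class Child extends Base {
    public String own() {
        return "own";
    }

    private String hidden() {
        return "hidden";
    }
}

class DeclaredMethodsSketch {
    // Same filtering idea as getPublicMethods: declared methods, public ones only.
    static List<String> publicDeclared(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Method m : clazz.getDeclaredMethods()) {
            if (Modifier.isPublic(m.getModifiers())) {
                names.add(m.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // inherited() comes from Base and hidden() is private, so only own() remains.
        System.out.println(publicDeclared(Child.class)); // prints [own]
    }
}
```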
### 2.3.2 Testing the proxy utility class
```java
package com.smgeek.gkrpc.common.utils;
import org.junit.Test;
public class TestClass {
private String a(){
return "a";
}
public String b(){
return "b";
}
protected String c(){
return "c";
}
}
```
```java
package com.smgeek.gkrpc.common.utils;
import org.junit.Test;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ReflectionUtilsTest {
@Test
public void newInstance(){
TestClass t=ReflectionUtils.newInstance(TestClass.class);
assertNotNull(t);
}
@Test
public void getPublicMethods(){
Method[] methods=ReflectionUtils.getPublicMethods(TestClass.class);
assertEquals(1,methods.length);
String mname=methods[0].getName();
assertEquals("b",mname);
}
@Test
public void invoke(){
Method[] methods=ReflectionUtils.getPublicMethods(TestClass.class);
Method b=methods[0];
TestClass t=new TestClass();
Object r = ReflectionUtils.invoke(t, b);
assertEquals("b",r);
}
}
```
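## 2.4 Writing the serialization module
Objects must be serialized and deserialized so that they can be transmitted over the network
### 2.4.1 Encoder and decoder interfaces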
```java
package com.smgeek.gkrpc.codec;
/**
* Serialization: converts an object to a byte array
*/
public interface Encoder {
byte[] encode(Object obj);
}
```
```java
package com.smgeek.gkrpc.codec;
/**
* Deserialization: converts a byte array to an object
*/
public interface Decoder {
<T> T decode(byte[] bytes,Class<T> clazz);
}
```
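### 2.4.2 Adding fastjson to the pom
```xml
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>
```
### 2.4.3 Implementing the serialization and deserialization interfaces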
```java
package com.smgeek.gkrpc.codec;
import com.alibaba.fastjson.JSON;
/**
* JSON-based serialization implementation
*/
public class JSONEncoder implements Encoder {
@Override
public byte[] encode(Object obj) {
return JSON.toJSONBytes(obj);
}
}
```
```java
package com.smgeek.gkrpc.codec;
import com.alibaba.fastjson.JSON;
/**
* JSON-based deserialization
*/
public class JSONDecoder implements Decoder{
@Override
public <T> T decode(byte[] bytes, Class<T> clazz) {
return JSON.parseObject(bytes,clazz);
}
}
```
### 2.4.4 Testing the implementations
```java
package com.smgeek.gkrpc.codec;
import lombok.Data;
@Data
public class TestBean {
private String name;
private int age;
}
```
```java
package com.smgeek.gkrpc.codec;
import org.junit.Test;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class JSONEncoderTest {
@Test
public void encode(){
Encoder encoder=new JSONEncoder();
TestBean bean=new TestBean();
bean.setName("smgeek");
bean.setAge(18);
byte[] bytes=encoder.encode(bean);
assertNotNull(bytes);
}
}
```
```java
package com.smgeek.gkrpc.codec;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
public class JSONDecoderTest {
@Test
public void decode() {
Encoder encoder=new JSONEncoder();
TestBean bean=new TestBean();
bean.setName("smgeek");
bean.setAge(18);
byte[] bytes=encoder.encode(bean);
Decoder decoder=new JSONDecoder();
TestBean bean2 = decoder.decode(bytes, TestBean.class);
assertEquals(bean.getName(),bean2.getName());
assertEquals(bean.getAge(),bean2.getAge());
}
}
```
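## 2.5 Network module
### 2.5.1 Adding dependencies
```xml
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-proto</artifactId>
    <version>${project.version}</version>
</dependency>
```
### 2.5.2 Client-side network transport interface
Main responsibilities of the client
- 1 Create a connection
- 2 Send data and wait for the response
- 3 Close the connection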
```java
package com.smgeek.gkrpc.transport;
import com.smgeek.gkrpc.Peer;
import java.io.InputStream;
/**
* 1 Create a connection
* 2 Send data and wait for the response
* 3 Close the connection
*/
public interface TransportClient {
void connect(Peer peer);
InputStream write(InputStream data);
void close();
}
```
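### 2.5.3 Server-side network transport interface
Main responsibilities of the server
- 1 Start up and listen on a port
- 2 Accept requests
- 3 Stop listening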
```java
package com.smgeek.gkrpc.transport;
/**
* 1 Start up and listen on a port
* 2 Accept requests
* 3 Stop listening
*/
public interface TransportServer {
void init(int port,RequestHandler handler);
void start();
void stop();
}
```
### 2.5.4 Request handler interface
```java
package com.smgeek.gkrpc.transport;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Handler that processes network requests
*/
public interface RequestHandler {
void onRequest(InputStream recive, OutputStream toResp);
}
```
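To make the stream-based contract of `RequestHandler` concrete, here is a minimal sketch of a handler driven by in-memory streams. It rests on assumptions: `RequestHandlerSketch` is a local stand-in for the interface, and `UpperCaseHandler` and its `handle` helper are hypothetical. The handler reads until end of stream, because `InputStream.available()` only estimates how many bytes can be read without blocking.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Local stand-in for the RequestHandler interface so the sketch compiles on its own.
interface RequestHandlerSketch {
    void onRequest(InputStream receive, OutputStream toResp);
}

// Hypothetical handler: upper-cases the request body and writes it back.
class UpperCaseHandler implements RequestHandlerSketch {
    @Override
    public void onRequest(InputStream receive, OutputStream toResp) {
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            // Read until end of stream rather than relying on available().
            while ((n = receive.read(buf)) != -1) {
                body.write(buf, 0, n);
            }
            String text = new String(body.toByteArray(), StandardCharsets.UTF_8);
            toResp.write(text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Drives the handler with in-memory streams, the way a transport would with network streams.
    static String handle(String request) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        new UpperCaseHandler().onRequest(
                new ByteArrayInputStream(request.getBytes(StandardCharsets.UTF_8)), out);
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(handle("ping")); // prints "PING"
    }
}
```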
### 2.5.5 Interface implementations
```java
package com.smgeek.gkrpc.transport;
import com.smgeek.gkrpc.Peer;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
/**
* HTTP client
*/
public class HTTPTransportClient implements TransportClient {
private String url;
@Override
public void connect(Peer peer) {
this.url="http://"+peer.getHost()+":"+peer.getPort();
}
@Override
public InputStream write(InputStream data) {
try {
HttpURLConnection httpConn =(HttpURLConnection)new URL(url).openConnection();
httpConn.setDoOutput(true);
httpConn.setDoInput(true);
httpConn.setRequestMethod("POST");
httpConn.connect();
IOUtils.copy(data,httpConn.getOutputStream());
int resultCode=httpConn.getResponseCode();
if(resultCode==HttpURLConnection.HTTP_OK){
return httpConn.getInputStream();
}else{
return httpConn.getErrorStream();
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public void close() {
}
}
```
```java
package com.smgeek.gkrpc.transport;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* HTTP server
*/
@Slf4j
public class HTTPTransportServer implements TransportServer{
private RequestHandler handler;
private Server server;
@Override
public void init(int port, RequestHandler handler) {
this.handler=handler;
this.server=new Server(port);
// The servlet receives the requests
ServletContextHandler ctx=new ServletContextHandler();
server.setHandler(ctx);
ServletHolder holder=new ServletHolder(new ResquestServlet());
ctx.addServlet(holder,"/*");
}
@Override
public void start() {
try {
server.start();
server.join();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
@Override
public void stop() {
try {
server.stop();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
class ResquestServlet extends HttpServlet{
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
log.info("client connect");
InputStream in=req.getInputStream();
OutputStream out=resp.getOutputStream();
if(handler!=null){
handler.onRequest(in,out);
}
out.flush();
}
}
}
```
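## 2.6 Server module
### 2.6.1 Adding dependencies
```xml
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-proto</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-codec</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-transport</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-common</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
</dependency>
```
### 2.6.2 Revising ServiceDescriptor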
```java
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
/**
* Describes a service
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
private String clazz;
private String method;
private String returnType;
private String[] parameterTypes;
public static ServiceDescriptor from(Class clazz, Method method){
ServiceDescriptor sdp=new ServiceDescriptor();
sdp.setClazz(clazz.getName());
sdp.setMethod(method.getName());
sdp.setReturnType(method.getReturnType().getName());
Class[] parameterClasses=method.getParameterTypes();
String[] parameterTypes=new String[parameterClasses.length];
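for(int i=0;i<parameterClasses.length;i++){
parameterTypes[i]=parameterClasses[i].getName();
}
sdp.setParameterTypes(parameterTypes);
return sdp;
}
// ... (code omitted in the source) ...
public <T> void register(Class<T> interfaceClass,T bean ){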
serviceManager.register(interfaceClass,bean);
}
public void start(){
this.net.start();
}
public void stop(){
this.net.stop();
}
private RequestHandler handler= new RequestHandler() {
@Override
public void onRequest(InputStream recive, OutputStream toResp) {
Response resp=new Response();
try {
byte[] inBytes= IOUtils.readFully(recive,recive.available());
Request request=decoder.decode(inBytes,Request.class);
log.info("get request:{}",request);
ServiceInstance sis=serviceManager.lookup(request);
Object ret=serviceInvoker.invoke(sis,request);
resp.setData(ret);
} catch (Exception e) {
log.warn(e.getMessage(),e);
resp.setCode(1);
resp.setMessage("RpcServer got error:"+e.getClass().getName()+":"+e.getMessage());
}finally {
try {
byte[] outBytes=encoder.encode(resp);
toResp.write(outBytes);
} catch (IOException e) {
log.warn(e.getMessage(),e);
}
}
}
};
}
```
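The request handler above looks up a service instance for the incoming request and then invokes it. As a rough illustration of that lookup-and-invoke idea, the sketch below keys registered methods by the same fields a `ServiceDescriptor` carries. Everything in it is an assumption: `ServiceRegistrySketch`, `Calc`, `CalcImpl`, and the key format are hypothetical, and the project's own service management classes may be organised differently.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service used only for this illustration.
interface Calc {
    int add(int a, int b);
}

class CalcImpl implements Calc {
    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

// Hypothetical registry; the project's own service management classes may look different.
class ServiceRegistrySketch {
    private static final class Entry {
        final Object target;
        final Method method;

        Entry(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    private final Map<String, Entry> services = new HashMap<>();

    // Builds a lookup key from the same fields a ServiceDescriptor carries.
    static String key(String clazz, String method, String[] parameterTypes) {
        return clazz + "#" + method + Arrays.toString(parameterTypes);
    }

    // Registers every public method of the interface against the bean.
    <T> void register(Class<T> interfaceClass, T bean) {
        for (Method m : interfaceClass.getMethods()) {
            String[] types = Arrays.stream(m.getParameterTypes())
                    .map(Class::getName)
                    .toArray(String[]::new);
            services.put(key(interfaceClass.getName(), m.getName(), types), new Entry(bean, m));
        }
    }

    // Looks up the target by descriptor fields and invokes it via reflection.
    Object invoke(String clazz, String method, String[] parameterTypes, Object... args) {
        Entry entry = services.get(key(clazz, method, parameterTypes));
        if (entry == null) {
            throw new IllegalStateException("service not found: " + clazz + "#" + method);
        }
        try {
            return entry.method.invoke(entry.target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        registry.register(Calc.class, new CalcImpl());
        Object sum = registry.invoke(
                Calc.class.getName(), "add", new String[]{"int", "int"}, 1, 2);
        System.out.println(sum); // prints 3
    }
}
```

Including the parameter type names in the key keeps overloaded methods apart, which a key built from class and method name alone would not.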
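## 2.7 Client module
### 2.7.1 Adding dependencies
```xml
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-proto</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-codec</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-transport</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-common</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
</dependency>
```
### 2.7.2 Network connection interface and implementation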
```java
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.transport.TransportClient;
import java.util.List;
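/**
* @Describe Selects which server to use
* @Author CandyDingDing
* @Version 1.0
* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Date 2022/4/3
*/
public interface TransportSelector {
/**
* Initialize the selector
* @param peers endpoint information of the servers that can be connected to
* @param count how many connections the client establishes with the server
* @param clazz the client implementation class
*/
void init(List<Peer> peers, int count,Class<? extends TransportClient> clazz);
/**
* Select a transport to interact with the server
*
* @return the network client
*/
TransportClient select();
/**
* Release a client that is no longer in use
* @param client TransportClient
*/
void release(TransportClient client);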
void close();
}
```
```java
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@Slf4j
public class RandomTransportSelector implements TransportSelector {
/**
* Holds the connected clients
*/
private List<TransportClient> clients;
public RandomTransportSelector() {
clients=new ArrayList<>();
}
@Override
public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
count=Math.max(count,1);
for(Peer peer:peers){
for(int i=0;i<count;i++){
// ... (code omitted in the source) ...
private Class<? extends TransportClient> transportClass= HTTPTransportClient.class;
private Class<? extends Encoder> encoderClass =JSONEncoder.class;
private Class<? extends Decoder> decoderClass = JSONDecoder.class;
private Class<? extends TransportSelector> selectorClass=RandomTransportSelector.class;
private int connectCount=1;
private List<Peer> severs=Arrays.asList(new Peer("127.0.0.1",3000));
}
```
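The `select` and `release` contract of `TransportSelector` can be illustrated with a small generic pool that hands out a random idle client and takes it back afterwards. This is a sketch under assumptions, not the project's `RandomTransportSelector`: `RandomPoolSketch`, its constructor, and `idleCount` are hypothetical, and plain strings stand in for transport clients.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical generic pool; it is not the project's RandomTransportSelector.
class RandomPoolSketch<C> {
    private final List<C> idle;
    private final Random random;

    RandomPoolSketch(List<C> clients, Random random) {
        this.idle = new ArrayList<>(clients);
        this.random = random;
    }

    // Picks a random idle client and removes it from the pool while it is in use.
    synchronized C select() {
        if (idle.isEmpty()) {
            throw new IllegalStateException("no idle client available");
        }
        return idle.remove(random.nextInt(idle.size()));
    }

    // Returns a client to the pool once the call has finished.
    synchronized void release(C client) {
        idle.add(client);
    }

    synchronized int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        RandomPoolSketch<String> pool =
                new RandomPoolSketch<>(Arrays.asList("conn-1", "conn-2"), new Random());
        String client = pool.select();
        System.out.println("idle while in use: " + pool.idleCount()); // prints 1
        pool.release(client);
        System.out.println("idle after release: " + pool.idleCount()); // prints 2
    }
}
```

Releasing in a `finally` block, as `RemoteInvoker` does with its selector, keeps a failed call from permanently draining the pool.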
### 2.7.4 RPC client
```java
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import java.lang.reflect.Proxy;
/**
* @Author CandyDingDing
* @Version 1.0
* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Describe
* @Date 2022/4/4
*/
public class RpcClient {
private RpcClientConfig config;
private Encoder encoder;
private Decoder decoder;
private TransportSelector selector;
public RpcClient() {
this(new RpcClientConfig());
}
public RpcClient(RpcClientConfig config) {
this.config = config;
this.encoder= ReflectionUtils.newInstance(this.config.getEncoderClass());
this.decoder= ReflectionUtils.newInstance(this.config.getDecoderClass());
this.selector= ReflectionUtils.newInstance(this.config.getSelectorClass());
this.selector.init(this.config.getSevers(),this.config.getConnectCount(),this.config.getTransportClass());
}
public <T> T getProxy(Class<T> clazz){
return (T) Proxy.newProxyInstance(
getClass().getClassLoader(),
new Class[]{clazz},
new RemoteInvoker(clazz,encoder,decoder,selector)
);
}
}
```
```java
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.Response;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* @Author CandyDingDing
* @Version 1.0
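* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Describe Proxy class for invoking remote services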
* @Date 2022/4/4
*/
@Slf4j
public class RemoteInvoker implements InvocationHandler {
private Class clazz;
private Encoder encoder;
private Decoder decoder;
private TransportSelector selector;
public RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder,TransportSelector selector){
this.decoder=decoder;
this.encoder=encoder;
this.selector=selector;
this.clazz=clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request=new Request();
request.setService(ServiceDescriptor.from(clazz,method));
request.setParameters(args);
Response resp = invokeRemote(request);
if(resp==null || resp.getCode()!=0){
throw new IllegalStateException("fail to invoke remote: "+resp);
}
return resp.getData();
}
private Response invokeRemote(Request request) {
TransportClient client=null;
Response resp=null;
try{
client=selector.select();
byte[] outBytes=encoder.encode(request);
InputStream revice = client.write(new ByteArrayInputStream(outBytes));
byte[] inBytes=IOUtils.readFully(revice,revice.available());
resp=decoder.decode(inBytes,Response.class);
}catch (Exception e){
log.warn(e.getMessage(),e);
resp=new Response();
resp.setCode(1);
resp.setMessage("RpcClient got error :"+e.getClass()+":"+e.getMessage());
} finally {
if(client!=null){
selector.release(client);
}
}
return resp;
}
}
```
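## 2.8 Using the RPC framework
### 2.8.1 Create the example module

### 2.8.2 Adding dependencies
```xml
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-client</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>hand.candy</groupId>
    <artifactId>gk-rpc-server</artifactId>
    <version>${project.version}</version>
</dependency>
```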
### 2.8.3 Client
```java
package single.rpc.example;
import single.rpc.client.RpcClient;
/**
* @Author CandyDingDing
* @Version 1.0
* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Describe
* @Date 2022/4/4
*/
public class Client {
public static void main(String[] args) {
RpcClient client = new RpcClient();
CalcService service = client.getProxy(CalcService.class);
int add = service.add(1, 2);
int minus = service.minus(1, 2);
System.out.println(add);
System.out.println(minus);
}
}
```
### 2.8.4 Server
```java
package single.rpc.example;
import single.rpc.server.RpcServer;
import single.rpc.server.RpcServerConfig;
/**
* @Author CandyDingDing
* @Version 1.0
* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Describe
* @Date 2022/4/4
*/
public class Server {
public static void main(String[] args) {
RpcServer server = new RpcServer(new RpcServerConfig());
server.register(CalcService.class, new CalcServiceImpl());
server.start();
}
}
```
### 2.8.5 CalcService
```java
package single.rpc.example;
/**
* @Author CandyDingDing
* @Version 1.0
* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Describe
* @Date 2022/4/4
*/
public interface CalcService {
int add(int a, int b);
int minus(int a, int b);
}
```
### 2.8.6 CalcServiceImpl
```java
package single.rpc.example;
/**
* @Author CandyDingDing
* @Version 1.0
* @Motto Regard the doubting eyes of others as will-o'-the-wisps, and boldly walk your own night road
* @Describe Git mirror: https://hub.fastgit.xyz/
* @Date 2022/4/4
*/
public class CalcServiceImpl implements CalcService {
@Override
public int add(int a, int b) {
return a + b;
}
@Override
public int minus(int a, int b) {
return a - b;
}
}
```
### 2.8.7 Results
The code from the video course has problems:
for the corrected code, see [https://gitee.com/candydingding/rpc](https://gitee.com/candydingding/rpc)
Start the server first, then start the client
Server output

Client output

# 3. References
Video link: [https://www.imooc.com/video/20219](https://www.imooc.com/video/20219)
Documentation notes: [https://www.yuque.com/lililil-9bxsv/kb/tg9xha](https://www.yuque.com/lililil-9bxsv/kb/tg9xha)
Code repository: [https://gitee.com/candydingding/rpc](https://gitee.com/candydingding/rpc)