# nnzi-rpc **Repository Path**: wlby/nnzi-rpc ## Basic Information - **Project Name**: nnzi-rpc - **Description**: 类似于 Dubbo 实现的 RPC 框架,注释详细 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2021-09-14 - **Last Updated**: 2021-10-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一、Dubbo 原理 ![在这里插入图片描述](https://img-blog.csdnimg.cn/02f35d227d1d4a79af2dad211d9695f2.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_20,color_FFFFFF,t_70,g_se,x_16) **一次 RPC 调用的流程:** + 服务消费者方(client)一本地调用的形式调用服务。 + Client stub,类似于一个代理对象,将调用的方法具体信息(包括调用的类,方法,参数等)封装为一个消息体,通过注册中心找到服务地址,然后将该消息发送到服务端。 + Server Stub 接受到消息之后,进行解码后传送给服务端,Server 端通过具体信息进行反射调用具体方法,得到结果之后,发送给Server Stub。 + Server Stub 再将结果进行序列化处理之后,通过网络再传输给客户端。 + Client Stub 再对消息进行反序列化,传递给 Client 获得结果。 ## 1. 整体架构 ![在这里插入图片描述](https://img-blog.csdnimg.cn/ca47d47798554ba3ace06004a5b1f889.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_20,color_FFFFFF,t_70,g_se,x_16) Dubbo 整体分为这十层架构,以下对每一层做一个简单解释: + `服务接口层(Service)`:该层是与实际业务逻辑相关的,根据服务提供方和服务消费方的业务设计对应的接口和实现。 + `配置层(Config)`:对外配置接口,以ServiceConfig和ReferenceConfig为中心,可以直接new配置类,也可以通过spring解析配置生成配置类。 + `服务代理层(Proxy)`:服务接口透明代理,生成服务的客户端和服务器端的 Stub,以ServiceProxy为中心,扩展接口为ProxyFactory。 + `服务注册层(Registry)`:封装服务地址的注册与发现,以服务URL为中心,扩展接口为RegistryFactory、Registry和RegistryService。可能没有服务注册中心,此时服务提供方直接暴露服务。 + `集群层(Cluster)`:封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoker为中心,扩展接口为Cluster、Directory、Router和LoadBalance。将多个服务提供方组合为一个服务提供方,实现对服务消费方来透明,只需要与一个服务提供方进行交互。 + `监控层(Monitor)`:RPC调用次数和调用时间监控,以Statistics为中心,扩展接口为MonitorFactory、Monitor和MonitorService。 + `远程调用层(Protocol)`:封将RPC调用,以Invocation和Result为中心,扩展接口为Protocol、Invoker和Exporter。Protocol是服务域,它是Invoker暴露和引用的主功能入口,它负责Invoker的生命周期管理。Invoker是实体域,它是Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。 + `信息交换层(Exchange)`:封装请求响应模式,同步转异步,以Request和Response为中心,扩展接口为Exchanger、ExchangeChannel、ExchangeClient和ExchangeServer。 + `网络传输层(Transport)`:抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec。 + `数据序列化层(Serialize)`:可复用的一些工具,扩展接口为Serialization、 ObjectInput、ObjectOutput和ThreadPool。 ## 2. 服务提供方的暴露服务 当服务启动时,Dubbo 解析完配置文件(或注解),将有用的信息保存了之后,就会将需要注册到注册中心或者直接暴露的服务给暴露出来。 + 会先进入到export方法暴露方法 ```java public synchronized void export() { // 获取并保存一些信息 // ,,, //是否需要延迟暴露 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // 此处就是重点的暴露方法 doExport(); } // 暴露结束方法 exported(); } ``` + 然后进入 doExport()方法将接口地址进行暴露 ```java protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } // 真正对地址进行暴露 doExportUrls(); } ``` + 获取注册中心地址,以及配置的方法,也就是 SpringBoot 添加 @DubboService 的方法。 ```java private void doExportUrls() { // . . . // 从配置文件中获取注册中心的地址 List registryURLs = ConfigValidationUtils.loadRegistries(this, true); // 再从协议配置中取出所有配置注册到注册中心。 for (ProtocolConfig protocolConfig : protocols) { // . . . // 将配置的内容注册到注册中心 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } ``` + 真正获取执行器并且暴露。 ```java private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) { // 对很多值,参数的获取设置与判断 // . . . for (URL registryURL : registryURLs) { // . . . // 真正通过 PROXY_FACTORY 代理工厂获取 Invoker 执行器,包括了该服务的地址,类,方法等种种信息 Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // 再通过 wrapperInvoker 进行包装,类似于使用cglib代理来消除反射的调用 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); /** * 再通过PROTOCOL协议来具体暴露 * 该暴露主要包括两方面 * (1)Dubbo 的暴露:创建 Netty Server 来监听请求 * (2)将服务注册到注册中心 */ Exporter exporter = PROTOCOL.export(wrapperInvoker); // 加入到暴露池中 exporters.add(exporter); } // . . . } ``` **总结:** 服务暴露的过程简单来说就是先通过配置文件或者注解(@DubboService)将需要暴露的服务以及注册中心的信息获取保存到 ServiceConfig,在IOC 容器刷新完成后,通过 export 函数将服务封装为 Invoker 执行器再进行包装为 DelegateProviderMetaDataInvoker ,通过具体使用的协议进行暴露,比如 Dubbo 就是开启 Netty Server 监听请求,并把服务注册到注册中心。 ## 3. 服务消费者方的引用服务 + 对于服务的引用,先定义对应的接口对象(面向接口编程),然后引入的时候,通过一个 @DubboReference 注解自动注入。 ```java @DubboReference private OrderService orderService; ``` + 而该注解就会对应一个 ReferenceBean,该 bean 继承了 FactoryBean 接口,说明可以通过 getBean,CreateBean 的方式来创建获取。 ```java public class ReferenceBean extends ReferenceConfig implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean { ``` + 在getObject() 方法中 通过 get来获取对象。 ```java public synchronized T get() { if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); } if (ref == null) { // 如果容器中没有通过该方法创建 init(); } return ref; } ``` + 在 init 方法中,保存一大堆配置信息之后,通过信息来创建需要的代理对象 ```java public synchronized void init() { // . . . //通过map 中存储的各种信息,来创建该代理对象 ref = createProxy(map); // . . . } ``` + 创建生成代理对象引用远程的执行器。 ```java private T createProxy(Map map) { // . . . if (urls.size() == 1) { /** * 通过 REF_PROTOCOL 对应的协议来引用远程的服务执行器 * interfaceClass, urls.get(0) 分别保存了对应的类信息和注册中心的地址 * 与服务暴露类似,主要做两件事 * (1) Dubbo 创建 Netty Client 可以与 Server 端建立连接 * (2)通过注册中心订阅具体服务提供方地址列表 */ invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { // . . . } // . . . } ``` **总结:** 使用 @DubboReference 注解自动远程服务的代理对象,该注解对应了一个 ReferenceBean 的类,实现了 FactoryBean 接口,可以通过 getBean() 方法获取对象,实际调用中,通过协议生成代理对象,开启 Netty Client 进行网络通信,并订阅注册中心的该服务具体地址。 ## 4. 调用服务 ![在这里插入图片描述](https://img-blog.csdnimg.cn/5b5ab5a1a6b248d58a2d55e776772eb3.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_20,color_FFFFFF,t_70,g_se,x_16) 可以看到 invoker 通过层层封装,包括拦截器拦截记录一些信息,loadBalance 的多个微服务的负载均衡,再通过 Invoker.invoke 使用 Netty 的 client 将调用信息封装为一个 Invocation 来进行网络通信的调用。对应到接受端也会用同样的协议机制来进行反序列化得到具体调用的类以及方法进行调用。 ## 5. SPI 机制 SPI全称为(Service Provider Interface)是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过SPI机制为我们的程序提供拓展功能。 不过,Dubbo 并未使用Java原生的SPI机制,而是对其进行了增强,使其能够更好的满足需求。 ### 5.1 JDK 的 SPI + 定义一个 Person 接口,并用两个类实现它 ```java public interface Person { void hello(); } public class Student implements Person { @Override public void hello() { System.out.println("Student"); } } public class Teacher implements Person{ @Override public void hello() { System.out.println("Teacher"); } } ``` + 创建主测试类, ```java public class SpiTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); scheduledThreadPool.scheduleAtFixedRate(() -> { ServiceLoader loader = ServiceLoader.load(Person.class); Person person = loader.iterator().next(); person.hello(); }, 1, 1, TimeUnit.SECONDS); } } ``` + 可以在 META-INF/services 目录下的文件对 Person 的实现类进行定义。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/16fd26b61c9042b8a541035fd8f630bd.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_13,color_FFFFFF,t_70,g_se,x_16) + 通过配置的方式,就可以为Person 在运行时动态的更换实现类。很神奇吧! ![在这里插入图片描述](https://img-blog.csdnimg.cn/3fa48c0005f547cbbb8c10245a585c55.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_17,color_FFFFFF,t_70,g_se,x_16) #### 实现原理 ```java public static ServiceLoader load(Class service) { // 获取该线程上下文加载器进行加载 ClassLoader cl = Thread.currentThread().getContextClassLoader(); return ServiceLoader.load(service, cl); } ``` ```java private ServiceLoader(Class svc, ClassLoader cl) { service = Objects.requireNonNull(svc, "Service interface cannot be null"); // 如果没有线程上下文加载器,则使用系统类加载器也就是应用类加载器 loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl; acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null; // 重新加载 reload(); } ``` ```java public void reload() { // 将缓存的内容清空 providers.clear(); // 返回一个 LazyIterator lookupIterator = new LazyIterator(service, loader); } ``` 接下来就看看这个返回的 LazyIterator 做了什么。 ```java /** * 定义了寻找类实现的文件 */ private static final String PREFIX = "META-INF/services/"; /** * 遍历实现类 * @return */ private boolean hasNextService() { if (nextName != null) { return true; } // 加载配置文件 if (configs == null) { try { String fullName = PREFIX + service.getName(); if (loader == null) configs = ClassLoader.getSystemResources(fullName); else configs = loader.getResources(fullName); } catch (IOException x) { fail(service, "Error locating configuration files", x); } } // 遍历文件内容 while ((pending == null) || !pending.hasNext()) { if (!configs.hasMoreElements()) { return false; } // 解析文件值 pending = parse(service, configs.nextElement()); } // 获取到实现类的名字 nextName = pending.next(); return true; } ``` ```java /** * 获取下一个服务 * @return */ private S nextService() { if (!hasNextService()) throw new NoSuchElementException(); String cn = nextName; nextName = null; Class c = null; try { c = Class.forName(cn, false, loader); } catch (ClassNotFoundException x) { fail(service, "Provider " + cn + " not found"); } if (!service.isAssignableFrom(c)) { fail(service, "Provider " + cn + " not a subtype"); } try { // 通过反射获取对应的实现类对象 S p = service.cast(c.newInstance()); // 将对象放服务提供者缓存中 providers.put(cn, p); return p; } catch (Throwable x) { fail(service, "Provider " + cn + " could not be instantiated", x); } throw new Error(); // This cannot happen } ``` **总结:** 很明显可以看出,JDK 中的 SPI 机制会将之前获取的缓存清空,并且返回一个新的迭代器,重新通过线程上下文加载器加载所有服务,并且通过反射进行实例化。 缺点也很明显,会加载实例化所有的实现类,不能做到按需加载,浪费空间资源。 ### 5.2 Dubbo 的 SPI Dubbo 的 SPI 对原生的 SPI 进行了优化,可以通过类型得到对应的扩展加载器,然后通过该加载器去获得相应名字的实现类 ```java ServiceRegistry serviceRegistry = ExtensionLoader.getExtensionLoader(ServiceRegistry.class).getExtension("zk"); ``` + getExtensionLoader 方法,获取对应的可扩展类的加载器。 ```java public static ExtensionLoader getExtensionLoader(Class type) { // 传入的类型不能是空 if (type == null) { throw new IllegalArgumentException("Extension type == null"); } // 扩展点必须是接口 if (!type.isInterface()) { throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!"); } // 必须有@SPI注解 if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!"); } // 每个扩展的加载器只会被加载一次,先从缓存卡里面获取 ExtensionLoader loader = (ExtensionLoader) EXTENSION_LOADERS.get(type); if (loader == null) { // 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type)); // 因为上面方法可能返回null 所以这里要重新获取一次 loader = (ExtensionLoader) EXTENSION_LOADERS.get(type); } return loader; } ``` + getExtension 方法获取扩展的类 ```java public T getExtension(String name, boolean wrap) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException("Extension name == null"); } if ("true".equals(name)) { // 默认拓展实现类 return getDefaultExtension(); } // 获取持有目标对象 final Holder holder = getOrCreateHolder(name); Object instance = holder.get(); // 单例模式双检锁 if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { // 创建实例 instance = createExtension(name, wrap); holder.set(instance); } } } return (T) instance; } ``` + 如果不存在该实现类,则通过 createExtension 创建实例 ```java private T createExtension(String name, boolean wrap) { // 加载配置文件所有拓展类,得到配置名-拓展类的map,从map中获取到拓展类 Class clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { // 通过反射创建实例 EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } // 向实例中注入依赖 通过set方法 injectExtension(instance); // . . . // 判断是否实现了Lifecycle调用initialize方法 initExtension(instance); return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); } } ``` #### 自适应扩展 通过 @Adaptive 注解实现,Dubbo中存在很多的扩展类,这些扩展类不可能一开始就全部初始化,那样非常的耗费资源,所以我们应该在使用到该类的时候再进行初始化,也就是懒加载。但是这是比较矛盾的,拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载。 简单来说,就是一个扩展类中可能又存在属性也是扩展类,运行时可以通过传入的URL去动态适配不同的扩展类。 @Adaptive 注解可以标注在类上和方法上: + 标注在类上,表明该类为自定义的适配类,也就是作为其他类的属性时不会生成对应的扩展实现类。 + 标注在方法上,表明需要动态的为该方法创建适配类的实现。会通过传输的参数决定具体使用哪个实现类。 **① 获取对应的扩展实现类** ```java public T getAdaptiveExtension() { // 先从缓存中获取对应的扩展类实现 Object instance = cachedAdaptiveInstance.get(); // 没有的话进行双重加锁创建 if (instance == null) { if (createAdaptiveInstanceError != null) { throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); } } } } return (T) instance; } ``` **② 没有的话进行创建,这样就会得到一个自编译的代理对象。** ```java private T createAdaptiveExtension() { try { // 获取自适应的 Class 通过反射创建,并通过 injectExtension 注入到类中 return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); } } ``` ```java private Class getAdaptiveExtensionClass() { /** * 该方法就是去缓存中获取所有已加载的实现类 * 如果没有就去对应的配置文件中获取所有的实现类全类名,并加载为CLass * 并且会将标有 @Adaptive 注解的扩展类赋值给 cachedAdaptiveClass * 就可以直接返回 */ getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } // 否则就去创建自适应扩展 Class return cachedAdaptiveClass = createAdaptiveExtensionClass(); } ``` ```java private Class createAdaptiveExtensionClass() { // 构造自适应扩展的代码 String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); .// 获取自扩展的编译器,默认是javassist org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); // 通过扩展的编译器动态编译代码 return compiler.compile(code, classLoader); } ``` 之后在调用的时候,就会得到这个代理对象,然后根据传入参数,再通过普通SPI机制去选择对应的实现类进行调用。 ### 5.3 Dubbo 的 IOC 和 AOP #### IOC Dubbo 类似于 Spring,也有一个 IOC 容器,就是 ExtensionFactory 类,该类也有 @SPI 注解,说明也可以进行扩展,该类只有一个方法,就是通过SPI机制,获取对应的扩展实现类 ```java @SPI public interface ExtensionFactory { /** * 根据类型名字获取扩展类 * @param type * @param name * @param * @return */ T getExtension(Class type, String name); } ``` 而依赖注入,就是通过反射获得所有方法,再通过set方法进行注入,注入的对象是从 IOC 容器中获取的。 ```java private T injectExtension(T instance) { // 如果没有 IOC 容器则直接返回 if (objectFactory == null) { return instance; } try { // 就是通过反射获取所有方法 for (Method method : instance.getClass().getMethods()) { // 找到对应的Set方法 if (!isSetter(method)) { continue; } if (method.getAnnotation(DisableInject.class) != null) { continue; } Class pt = method.getParameterTypes()[0]; if (ReflectUtils.isPrimitives(pt)) { continue; } try { String property = getSetterProperty(method); Object object = objectFactory.getExtension(pt, property); if (object != null) { // 通过调用Set方法进行注入 method.invoke(instance, object); } } catch (Exception e) { logger.error("Failed to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; } ``` #### AOP AOP 的实现就是通过装饰器的设计模式,将每个包装的类注入下下一层包装的属性中。最后得到一个经过层层包装的代理对象。 ```java if (wrap) { // 包装类排序 List> wrapperClassesList = new ArrayList<>(); if (cachedWrapperClasses != null) { wrapperClassesList.addAll(cachedWrapperClasses); wrapperClassesList.sort(WrapperComparator.COMPARATOR); Collections.reverse(wrapperClassesList); } /** * AOP 的具体实现 * 就是遍历所有的包装类,将每个类通过依赖注入的方式组合注入到包装类中 * 也就是装饰器模式的运用,最后生成一个包装了所有类的代理类 */ if (CollectionUtils.isNotEmpty(wrapperClassesList)) { for (Class wrapperClass : wrapperClassesList) { Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class); if (wrapper == null || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } } } ``` ### 5.3 Dubbo 的 @Activate 注解 该注解就是判断哪些实现类应该被激活,哪些不需要,在某一个类有很多实现类的时候使用,通过该注解的属性进行判断。 ```java @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Activate { /** * 表示某个组匹配的时候激活 * 例如在消费者端还是生产者端 * @return */ String[] group() default {}; /** * 在 URL 中出现某些参数激活该类 * @return */ String[] value() default {}; /** * 表示实现类的顺序 * @return */ int order() default 0; } ``` ## 6. 序列化协议 ### 6.1 编码和解码 对于 Netty 的解码技术,采用模板设计模式实现了一套编码和解码架构,底层解决了 TCP 粘包和拆包的问题; 对于解决粘包和拆包主要有三种方案: + 在数据报的末尾加入换行符或者特殊标识符号,例如 HTTP,FTP 等;(HTTP 首部之后有一个空行表示之后是实体部分) + 在消息头中指定对应数据包的长度; + 将数据报长度固定死,不足的话则通过空格填补; Netty 对于以上三种方法都有具体的实现: + 对于第一种方案,Netty 中有实现方式 LineBasedFrameDecoder 来判断数据包中是否出现了 “\n” 或者 “\r\n”,来对包进行拆分; + 对于在消息中添加长度的编码器。包括 LengthFieldBasedFrameDecoder 和 LengthFieldDecoder ; + 对于直接限定长度的方式,不常用,比较浪费资源,具体实现的编解码器是 FixedLengthFrameDecoder; ### 6.2 序列化 Dubbo 包括的序列化协议如下: ![在这里插入图片描述](https://img-blog.csdnimg.cn/8872ec6c27464ac9bd7b2b39aa1239c4.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_17,color_FFFFFF,t_70,g_se,x_16) 一般不考虑采用 JDK 自带的序列化协议,原因是 + 不支持跨语言的调用; + 相比于其他序列化协议序列化后的字节数组体积大,因为保存了版本号,类型信息等等要素,传输效率低,性能差。 接下来主要简单讲述本项目使用的跨语言的 protostuff 以及 性能比较优秀的 kryo ### 6.1 protostuff #### 6.1.1 protostuff 的使用: + 可以直接使用封装好的 ProtostuffIOUtil 对象调用对于方法。 + 主要包括的组件: + LinkedBuffer:用户缓冲区,用于存储序列化内容。 + Schema:对象的模式结构。 ```java import io.protostuff.LinkedBuffer; import io.protostuff.ProtostuffIOUtil; import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; /** * @Description: * Protostuff 序列化 * @Date 2021/9/11 20:58 * @author: A.iguodala */ public class ProtostuffSerializer implements Serializer { /** * 使用一个缓存用户空间避免每次都重新开辟 */ private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); /** * 序列化方法 * * @param obj 要序列化的对象 * @return */ @Override public byte[] serialize(Object obj) { // 得到该类的 Class 类模板 Class clazz = obj.getClass(); // 通过 Class类模板获取 Schema 对象模式结构 Schema schema = RuntimeSchema.getSchema(clazz); byte[] bytes; try { // 通过该对象,对象的结构以及缓冲区得到序列化数组 bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER); } finally { BUFFER.clear(); } return bytes; } /** * 反序列化方法 * @param bytes 序列化后的字节数组 * @param clazz 目标类 * @param * @return */ @Override public T deserialize(byte[] bytes, Class clazz) { // 通过 Class 类模板获取 Schema 结构 Schema schema = RuntimeSchema.getSchema(clazz); T obj = schema.newMessage(); // 然后通过结构和byte 数组获取对应对象 ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; } } ``` #### 6.1.2 原理 + 对于序列化,主要就在于 bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER); 这一句方法,由于封装很深,直接展示最后一层。 ```java @Override public final void writeTo(Output output, T message) throws IOException { // 对每个字段进行序列化 for (Field f : getFields()) f.writeTo(output, message); } ``` ```java @Override public void writeTo(Output output, T message) throws IOException { // 将字段内容写入到一个 CharSequence 中 CharSequence value = (CharSequence) us.getObject(message, offset); if (value != null) output.writeString(number, value, false); } ``` 形成类似于该结构的序列化方式: ![在这里插入图片描述](https://img-blog.csdnimg.cn/7f7ccacb9475427f94447b6690c38e63.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_20,color_FFFFFF,t_70,g_se,x_16) + 对于反序列化,主要就是通过序列化得到的字段为对象赋值 ```java @Override public void mergeFrom(Input input, T message) throws IOException { us.putObject(message, offset, input.readString()); } ``` ### 6.2 kryo + kryo 主要也是对属性字段的序列化,这样可以减少序列化的长度,以及通过一些可变长度的字段来代替如 int long 固定字节的字段来减少序列化的长度。 + 另外,kryo 不支持对于bean字段的增删以及不能对不包含无参构造的进行序列化和反序列化。 + kryo 是有线程安全问题的,建议使用 ThreadLocal 保存。 ```java /** * @Description: * Kryo 序列化 * @Date 2021/9/12 20:58 * @author: A.iguodala */ @Slf4j public class KryoSerializer implements Serializer { /** * 因为 Kryo 不是线程安全的需要使用 ThreadLocal 保存 * 并且首先要将需要序列化的类型,注册到 Kryo 中 */ private final ThreadLocal kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo(); kryo.register(NrpcResponse.class); kryo.register(NrpcRequest.class); return kryo; }); /** * 序列化方法 * @param obj 要序列化的对象 * @return */ @Override public byte[] serialize(Object obj) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output output = new Output(byteArrayOutputStream)) { Kryo kryo = kryoThreadLocal.get(); // Object->byte:将对象序列化为byte数组 kryo.writeObject(output, obj); kryoThreadLocal.remove(); return output.toBytes(); } catch (Exception e) { throw new SerializeException("Serialization failed"); } } /** * 反序列化方法 * @param bytes 序列化后的字节数组 * @param clazz 目标类 * @param * @return */ @Override public T deserialize(byte[] bytes, Class clazz) { try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input input = new Input(byteArrayInputStream)) { Kryo kryo = kryoThreadLocal.get(); // byte->Object:从byte数组中反序列化出对对象 Object o = kryo.readObject(input, clazz); kryoThreadLocal.remove(); return clazz.cast(o); } catch (Exception e) { throw new SerializeException("Deserialization failed"); } } } ``` ## 7. 负载均衡 Dubbo 提供的负载均衡策略主要包括以下 5 类: ![在这里插入图片描述](https://img-blog.csdnimg.cn/d46d2ef8b9c94bddbd178c21034c0913.png) ### 7.1 RandomLoadBalance 是一种比较容易实现的负载均衡策略,也是Dubbo 默认使用的负载均衡策略。 就是通过加权的随机,负载均衡分发请求。 具体实现就是通过每个执行者的权重计算总权重,再在总权重中随机一个数,看落在哪个执行者的权重范围内,则选择哪个执行者进行执行。 ```java public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; /** * 随机选择一个 invoker 执行器进行调用 * @param invokers 可用的执行器列表 * @param url 服务的具体地址 * @param invocation 调用信息 * @param * @return The selected invoker */ @Override protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { //服务执行器列表长度 int length = invokers.size(); // 是否每个权重都相同,先默认是 boolean sameWeight = true; // 每个执行器的权重数组 int[] weights = new int[length]; // 第一个执行者权重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 计算总权重 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); weights[i] = weight; totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 在总权重中随机一个数,看落在谁的权重范围内 if (totalWeight > 0 && !sameWeight) { int offset = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 如果权重都相同则随机选择 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } } ``` ### 7.2 LeastActiveLoadBalance 最小活跃度轮询,也就是优先选择活跃度最小的服务进行调用,活跃度简单来说就是服务调用的次数,通过一个 ConcurrentHashMap存储调用服务的次数,获取最小的调用,如果存在多个最小的则通过上面随机的方式调用。 ```java public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; /** * 优先选择最小活跃度的服务,也就是调用次数最小的 * @param invokers * @param url * @param invocation * @param * @return */ @Override protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { // Number of invokers int length = invokers.size(); // The least active value of all invokers int leastActive = -1; // The number of invokers having the same least active value (leastActive) int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexes = new int[length]; // the weight of every invokers int[] weights = new int[length]; // The sum of the warmup weights of all the least active invokers int totalWeight = 0; // The weight of the first least active invoker int firstWeight = 0; // Every least active invoker has the same weight value? boolean sameWeight = true; // 循环遍历所有执行者,找到最小的执行者,并记录权重 for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); // 从每个 invocation 中获取调用次数,(从一个 ConcurrentHashMap 中) int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; // If it is the first invoker or the active number of the invoker is less than the current least active number if (leastActive == -1 || active < leastActive) { // Reset the active number of the current invoker to the least active number leastActive = active; // Reset the number of least active invokers leastCount = 1; // Put the first least active invoker first in leastIndexes leastIndexes[0] = i; // Reset totalWeight totalWeight = afterWarmup; // Record the weight the first least active invoker firstWeight = afterWarmup; // Each invoke has the same weight (only one invoker here) sameWeight = true; // If current invoker's active value equals with leaseActive, then accumulating. } else if (active == leastActive) { // Record the index of the least active invoker in leastIndexes order leastIndexes[leastCount++] = i; // Accumulate the total weight of the least active invoker totalWeight += afterWarmup; // If every invoker has the same weight? if (sameWeight && afterWarmup != firstWeight) { sameWeight = false; } } } // 如果只有一个最小的 if (leastCount == 1) { // 调用最小的活跃值 return invokers.get(leastIndexes[0]); } // 如果不止一个最小的,则通过权重随机 if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 权重相同则直接随机 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } } ``` ### 7.3 ConsistentHashLoadBalance >一致性Hash这篇文章就讲的很好 > [https://www.jianshu.com/p/528ce5cd7e8f](https://www.jianshu.com/p/528ce5cd7e8f) 一致性Hash 就是通过请求参数来具体定位服务的方式,Dubbo 通过一致性Hash 算法得到具体的服务地址,为了防止资源倾斜,又加入了虚拟节点。 ### 7.4 RoundRobinLoadBalance 加权重的轮询算法,通过权重来模拟实现轮询 ![在这里插入图片描述](https://img-blog.csdnimg.cn/0ec9ee0f178c4fe497a11a0517ecce83.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQS5pZ3VvZGFsYQ==,size_20,color_FFFFFF,t_70,g_se,x_16) 如上图所示,每个服务会维护一个静态的权重,以及不断变化的动态权重。 每次服务会选择动态权重最大的服务,然后将该服务的动态权重剪去总权重,再下次计算动态权重就是通过【原权重】+ 【动态权重】得到,也就是权重高的经过选择之后,权重会变低,而没有被选择的服务权重会慢慢变高,起到加权以及轮询的作用。 核心思想就是这样,源码中有太多更新时间,刷新权重以及初始化等操作,所以看起来比较复杂就不贴了。 ### 7.5 ShortestResponseLoadBalance 最短响应负载均衡,也就是调用负载均衡响应时间最多的服务,如果有多个就通过加权的随机来选择,和 LeastActiveLoadBalance 类似,都是去一个 ConcurrentHashMap 中取值,取得历次响应时间的平均,然后比较。 ## 8 总结 本文大概讲述了 RPC 工作的大概流程,到 Dubbo 的具体架构,再到 Dubbo 微内核是如何实现的,通过SPI 机制对Dubbo 进行扩展,明白以上原理,就可以大概构思出一个 RPC 框架的具体实现。 # 二、实现一个 RPC 框架。 ## 1. 如何实现 远程服务调用(Remote Procedure Call,RPC),就是能像调用本地方法一样调用远程别的服务提供的方法。 其实主要就是解决三个问题: + `如何表示数据` + 数据包括了传递给方法的参数,以及方法执行后的返回值。 + 无论是将参数传递给另外一个进程,还是从另外一个进程中取回执行结果,都涉及到它们应该如何表示。 + `如何传递数据` + 是指两个网络服务节点,如何进行数据的传输,交换数据。 + 一般基于 UDP 或者 TCP 来进行应用层的传输。 + `如何确定方法` + 也就是如何在目标服务找到对应的方法。 + 可能会引入注册中心的组件,或者通过一些协议来寻找。 ## 2. 实现 > 具体代码可以看 > [https://gitee.com/wlby/nnzi-rpc](https://gitee.com/wlby/nnzi-rpc) + `common` 模块 + 对一些共用的模块进行定义,例如扩展的机制,一些工具类常量等。 + `provider` 模块 + 也就是服务提供方的实现,包括对于服务的注册发现以及负载均衡等。 + `remoting` 模块 + 也就是网络传输的模块,肯定要定义网络传输的对象,定义的协议数据帧格式,定义请求响应的传输类。 + 其次就是 Netty 对于网络传输的具体实现,客户端服务端的实现。 + `core` 模块 + 该模块可以进行一个如何使用的定义,包括如何和Spring整合或者利用注解实现等。 ## 3. 性能调优 通过 Jmeter 压测显示,在无复杂业务场景以及不是特别高并发的情况下,Netty 构建的 RPC 框架性能也十分不错,甚至不逊色于 Dubbo,主要得益于 Netty 本身的高性能,但是对于真实场景中,还是需要对优化方法有不断的尝试。 对于 Netty 的调优主要包括三个方面: ### 3.1 Linux 系统参数调优 + 例如,在对于百万并发的情况下,可能会遇到文件句柄数过多的异常,可以通过修改单个进程打开文件的句柄最大值参数来解决。 + 也就是在 /etc/sysctl.conf 配置文件末尾,加入 fs.file-max=1000000。 ### 3.2 TCP 参数调优 对于 TCP 参数的调优主要就是在启动客户端或者服务器的时候,通过给服务端启动的引导器增加参数来进行调整,也就是 .childOption() 方法,配置 NioSocketChannel 对应套接字(处理 IO 请求)的参数,以及 .option() 配置 NioServerSocketChannel 对应套接字(处理连接请求)的参数。 主要常用参数包括以下: ```java /** * 以下两个参数用于控制 TCP 发送和接收缓冲区的大小 * 缓冲区大小一般设置为网络吞吐量达到带宽上限的值 * 即:缓冲区大小 = 网络带宽 × 网络时延 */ public static final ChannelOption SO_SNDBUF = valueOf("SO_SNDBUF"); public static final ChannelOption SO_RCVBUF = valueOf("SO_RCVBUF"); /** * 该值用于表示是否开启 TCP 底层的心跳检测 * 但是 TCP 心跳检测空闲时间长,以及依赖操作系统,难以修改 * 所以可以用 Netty 的 IdleStateHandler 来代替 */ public static final ChannelOption SO_KEEPALIVE = valueOf("SO_KEEPALIVE"); /** * 该值表示是否复用 TIME_WAIT 的端口 * 当 Netty 服务器大量调用其他接口或数据库的时候,可以开启该参数,避免出现大量 TIME_WAIT * 大量 TIME_WAIT 会导致 (1)占用系统资源,一个 4 kb;(2)占用端口资源,导致无端口可用从而无法建立连接 */ public static final ChannelOption SO_REUSEADDR = valueOf("SO_REUSEADDR"); /** * 表示是否启用 Nagle 算法,也就是将小数据包堆积为大数据包一起发送 * 减少糊涂窗口综合征的情况,但是要主要粘包的解决 */ public static final ChannelOption TCP_NODELAY = valueOf("TCP_NODELAY"); /** * 该值控制全连接队列大小 * 如果在建立连接比较频繁,并且连接建立比较耗时的场景 * 可以适当的扩大该值 */ public static final ChannelOption SO_BACKLOG = valueOf("SO_BACKLOG"); ``` 以下给出一个参数调优案例,具体的参数设置还是要以实际业务出发选择: ```java public void start() { ServerBootstrap serverBootstrap = new ServerBootstrap(); // . . . // 服务端套接字对应的全连接队列大小 serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048); // 设置接收和发送缓冲区大小为 128 KB。(以千兆网卡为例,假设时延为 1ms 则缓冲区为 1000MB/s × 1ms) serverBootstrap.childOption(ChannelOption.SO_SNDBUF, 128 * 1024); serverBootstrap.childOption(ChannelOption.SO_RCVBUF, 128 * 1024); // 不使用 TCP 的保活机制 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false); // 复用 TIME_WAIT 套接字 serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true); // 开启 Nagle 算法 serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); // . . . } ``` ### 3.3 Netty 服务器应用层优化 + `线程调优` + Netty 本身只使用 Boss 线程处理连接任务以及 worker 线程组处理 IO 事件,但是当 Worker 线程被 IO 阻塞时,就降低了多路复用的效率。 + 所以可以再加入一个线程池用于具体 IO 任务的操作,这就涉及到了线程池参数的调优。 + `JVM 调优` + -Xms 和 -Xmx:表示堆内存初始的大小以及最大的大小,应该根据业务和内存模型计算出合适的值,并不是越大越好,一般尽量小于 32 GB,因为在 32GB 之内,对象指针保存的是对象的偏移量内存占用较小,而超过 32GB 就会直接保存对象的具体位置,内存占用大。 + -Xss:每个线程的栈大小,默认为 1MB,减小线程栈大小可以容纳更多的线程,但如果过小会产生栈的溢出。 + JVM 参数的调优主要是为了减少 full GC 的产生,堆内存比较大时,一般采用 G1 回收器来进行垃圾回收,因为 CMS 会产生内存碎片,导致年轻代没有足够空间升入老年代而导致 full gc,或者并发回收失败导致 full gc,使用 g1 可能让 full gc 频率显著下降。 + `利用其它组件` + 例如使用 Redis 缓存等等,这些就不再讨论了。