43 Star 114 Fork 38

eric_1989 / jnet

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Jnet设计文档

[TOC]

背景说明

JDK7中提供了对网络IO的新的API也就是AIO。其异步特性相对于NIO来说使得编程更加容易,API也更容易理解。但要构筑一个完善的网络IO层仍然需要花费很多的心思和实践。为了简化基于AIO的Java网络IO编程,设计并构建了Jnet框架。

Jnet框架是Java AIO接口体系中一层薄封装,仅进一步降低其编程复杂性,不提供额外的抽象。

托管地址

开源中国:https://gitee.com/eric_ds/jnet

总体架构

Jnet中包含几个重要的模块,如下图所示: mark

简要介绍下几个模块的作用:

  • 读完成器:一个读完成器实例与一个通道绑定。每当通道上数据读取完毕后读完成器就会将本次读取的数据一起送到通过上下文中进行处理
  • 通道上下文:一个通道上下文实例封装了一个SocketChannel。通道完成器对外提供了写出接口,并且可以处理读完成器送入的数据。其内部维持了一个处理器构成的有序链表。读完成器送入的数据在该有序链表上被处理。
  • 写完成器:一个写完成器与一个通道绑定。通道上下文提供的写接口是通过代理了写完成器的输出能力提供的。

在Socket上的数据流动顺序为读完成器-->通道上下文-->写完成器,这是一个单向过程。在整体的数据处理过程中,会需要申请大量的数组或者ByteBuffer用于处理或准备数据。为了降低频繁新建ByteBuffer或者数组带来的GC开销,采用内存分配池的方案来提升性能。

概要设计

模块设计

在总体架构中的三个模块互相之间配合完成了数据的接收,处理,响应值写出等工作。下面会逐一介绍每一个模块的基本设计思路。而在这个之前,首先需要介绍下整个框架体系中的基础数据结构IoBuffer。

IoBuffer

JDK原生自带的ByteBuffer并不能很好的满足框架对Buffer的需求。因此框架设计了一个扩展版本的IoBuffer。IoBuffer与JDK原生的ByteBuffer对比主要有两个不同的特性

  • 独立的读写指针:不像ByteBuffer那样需要执行Flip后才能由写转变为读。IoBuffer具备两个互相独立的读写指针。指针的初始位置均为0.
  • 自动扩容:在执行写入时,如果空间不足,无法完成本次写入,则IoBuffer会自动扩容以容纳需要写入的数据。且扩容本身对外完全无感知,扩容后,所有参数均与原来一致。
  • 不使用时执行free方法:与ByteBuffer在不使用时不关心,交给GC处理不同。IoBuffer在使用完毕不再使用时,需要调用IoBuffer.free()完成释放,否则可能会出现内存泄漏的问题。

读完成器

读完成器接口是继承自JDK的java.nio.channels.CompletionHandler<V,A>接口,只是简单的添加了一个无参的start方法供外部进行首次启动。

读完成器对接口中的方法void completed(V result, A attachment)的实现也很简单。从通道中读取完毕数据,将本次读取的字节数累加到本次读取使用的IoBuffer中。然后将该IoBuffer提供给通道上下文对象进行数据处理。

当出现数据处理异常,或者读取通道数据异常时,会触发JDK的failed(Throwable, A)方法,此时首先将IoBuffer执行free方法进行释放。然后关闭SocketChannel即可。

通道上下文

通道上下文与SocketChannel实例对应,提供了数据处理接口和数据写出接口。通道上下文内部存储着一个由数据处理器构成的有序链表。数据处理器是一个接口,定义如下

public interface DataProcessor<T>
{
 /**
  * 通道初始化时被调用
  * 
  * @param channelContext
  */
 void bind(ChannelContext channelContext);
    
 /**
  * 处理由上一个Invoker传递过来的数据
  * 
  * @param data
  * @param next
  * @throws Throwable
  */
 void process(T data, ProcessorInvoker next) throws Throwable;
}

由数据处理接口传入的IoBuffer参数,会调用数据处理器的有序链表进行处理。由于读处理器需要等待方法返回后才能继续在通道上监听数据,可知第一个数据处理器是工作在单线程环境下的。还有一点需要注意,读处理器传入的IoBuffer参数在方法返回后仍然继续服务于通道数据监听,因此数据处理器进行业务处理器时需要拷贝一份数据出来或者重新申请一个IoBuffer实例,而不能使用入参的IoBuffer实例。

通道上下文的写出接口实际是代理了写完成器的写出接口。

写完成器

写完成器也继承自JDK的java.nio.channels.CompletionHandler<V,A>接口,并且提供了写出接口。定义如下

/**
  * 提供数据供写出<br/>
  * 如果当前写完成器已经处于停止状态,则抛出非法状态异常
  * 
  * @param buffer
  * @throws IllegalStateException
  */
 void offer(IoBuffer buffer) throws IllegalStateException;

该接口工作于多线程环境下。入参的IoBuffer不能另做他用,即使是在方法返回之后。当入参的IoBuffer最终写出完毕后,写完成器会负责调用其free方法进行释放。写完成器内部使用一个队列存储推入的待写出数据。当写完成器的Complete方法被IO线程调用时并且当前的数据已经写出完毕,则会从队列中取出下一个IoBuffer进行写出。

由于一个通道实例只绑定了一个写完成器,因此写完成器的Complete方法是工作于单线程环境,与工作于多线程环境的offer方法一起,实际上构成了一个多生产者单消费者情形。因此内部的存储队列可以采用MPSCqueue。

写完成器是有状态的。其状态机如下

mark

初始情况下,写完成器处于空闲状态。当多线程调用offer方法时,其内部是通过状态的CAS操作,来确保最终只有一个线程可以在通道上执行写出操作。offer方法的流程如下

mark

弃权操作是一个比较负责的二次确认子流程,流程如下

mark

当通道上的数据写出完毕时,写完成器的Complete方法会被IO线程调用,该方法的具体内容主要分为三部分

  • 将当前IoBuffer的内容写出完全
  • 从队列中取得下一个IoBuffer执行写出
  • 队列为空时执行弃权操作

服务端设计

有了三个基本模块后,构建服务端就很简单了。在通道AsynchronousServerSocketChannel上执行方法accept(A, CompletionHandler<AsynchronousSocketChannel, ? super A>)来获得建立起链接的客户端链接。得到SocketChannel实例后,构建起三个模块实例,就可以在这个通道上提供服务了。这里的难点主要在于如何让用户可以更容易的切入构建的环节。由于数据处理器必须由使用者传入,其实现本身不要求线程安全,却也不承诺运行于单线程环境中,所以数据处理器的实例化动作与三大模块的实例化动作是有相关性的。

框架在这里应该提供一定的默认实现供用户选择,但用户仍然应该具备最大的自主权,必要时可以自己实现accept方法中的CompleteHandler实例来控制三大模块的初始化。

客户端设计

客户端设计与服务端设计涉及的细节基本相同,这里不再赘述。唯一一个不同点在于,客户端提供的写出能力中,需要关注是否提供断线后重连这一功能。该功能并不在接口要求中,而是不同的子类实现需要关注的问题。

背压模式

设计缘由

JDK的AIO中,每一个监听端口对应的通道可以绑定一个线程池或者使用默认线程池。链接到该端口的通道会使用监听端口对应通道的线程池。这也就意味着,在一个监听端口上,链接,读取,写出,均只能使用一个线程池。考虑这样的场景:

读完成器不断读取数据并且向业务处理器发送,业务处理器处理后,将响应结果发送至写完成器等待写出,写完成器待发送队列为有限容量队列;当读完成器读取数据发送数据速度快于响应数据的发出,业务处理器因为无法推送数据到写完成器而不断重试,读完成器则等待业务处理器的方法返回,导致当前线程资源被占用。当同一时间这种情况大于线程池线程数量时,就会耗光所有的线程资源。由于没有线程资源供写完成器被调用,导致业务处理器一直无法完成推送数据到写完成器的待发送队列。系统出现了假死情况:程序在运作,没有死锁,CPU100%,无法对业务响应结果数据。

产生上面的问题,在于没有背压机制,读完成器无法在系统到达处理瓶颈时停止读取以减轻下游的处理压力。最终导致整个IO线程池饥饿进而假死。

实现方式

增加背压机制需要整个链路的协同。具体如下

  • 写完成器接收待发送数据时,如果到达容量瓶颈,返回提示信息(比如接口修改返回值为布尔变量,当遭遇瓶颈返回false标识本次推送失败)。
  • 业务处理器可以传递写完成器的推送结果直至最上游,也就是传递至读完成器。
  • 读完成器在接收到写完成器的容量瓶颈信号时,停止数据读取工作。
  • 当写完成器有新的空间可以容纳数据时,发送信号使得读完成器从之前中断的地方恢复。

读完成器工作在单线程环境中,其自我中断并且保存现场很容易。实现上述协同的难点主要有:

  1. 以什么方式进行投递失败的数据重送
  2. 在什么时机进行读完成器的恢复工作

对于问题1,有两种可能性:

  1. 当前线程为IO线程,遭遇下游的压力反馈时,不再继续发送新的数据,并且向上级传递压力反馈。由最末尾也就是最靠近写完成器的业务处理器将投递失败的数据放入背压线程池中进行重投递工作。
  2. 当前线程为非IO线程时,此时既可以将数据投递到背压线程池中进行重投递工作,也可以在当前线程以一定策略重试直到成功或者异常而终止。

对于问题2也有不同可能性:

  1. 写完成器数据发送完成后,执行恢复动作。
  2. 背压线程池重投递成功后,执行恢复动作。

方式1,写完成器本身不存储读完成器的信息,因此每次发送完成后都会执行恢复动作。这就要求在读完成器内部具备一个标志位以避免在不需要的时候执行恢复动作。考虑一种极端情况,压力反馈的传递速度较慢,此时写完成器已经完成了数据写出并执行恢复动作。而由于读完成器尚未收到压力反馈,未设置标志位,就会导致写完成器错过此次的恢复机会。该错过可能会导致后续再也没有机会彼此唤醒。所以方式1实际上是不可行的

方式2,如果一个通道存在多个线程执行重投递任务,会导致读完成器恢复动作的并发竞争。方式2和方式1一样,也会遇到重投递任务完成早于压力反馈的传递的情况,因此无法通过标志位的设定来避免并发竞争。这样一来,在背压线程池中不能存在一个通道的多个重投递任务。

综合考虑问题1和问题2的情况以及解决方式最终会得到如下的背压实现要求:

  • 读完成器收到下游压力反馈时,停止读取,不再注册读取事件。
  • 当前线程为IO线程时,收到下游的压力反馈,不再继续发送新数据,并且向上级传递压力反馈。
  • 当前线程为非IO线程时,出现压力反馈,只能在当前线程内自行重试投递直到成功或者异常情况。
  • 背压线程池内一个通道只能有一个重试任务。

上面的实现要求并未明确何时创建背压任务。基于4个要求,明确背压任务的创建时机为当前线程为IO线程时,最接近触发背压反馈的业务处理器执行背压任务创建;业务处理器顺序A-->B,B触发了背压反馈,A创建背压任务

空文件

简介

全新的jnet Jnet框架是Java AIO接口体系中一层薄封装,仅进一步降低其编程复杂性,不提供额外的抽象。 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/eric_ds/jnet.git
git@gitee.com:eric_ds/jnet.git
eric_ds
jnet
jnet
master

搜索帮助