# disruptor-learn **Repository Path**: muye56/disruptor-learn ## Basic Information - **Project Name**: disruptor-learn - **Description**: disruptor学习 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2021-01-09 - **Last Updated**: 2021-01-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## Disruptor 基本模型 Event: 事件(Disruptor中传递数据) EventFactory: 事件工厂(创建生产事件单元) 需要实现EventFactory接口 EventHandler: 事件处理器(消费数据单元) 需要实现EventHandler接口 EventProducer: 事件提供者(向事件单元中塞数据) RingBuffer: 基于数组环状的缓存实现,也是创建sequencer与定义WaitStrategy的入口 Disruptor: 持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等引用 ## Disruptor 核心组件 ### Sequence - 通过顺序递增的序号来编号,管理进行交换的数据(事件) - 对数据(事件)的处理过程总是沿着序号逐个递增处理 - 一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Producer/Consumer)的处理进度 - Sequence可以看成是一个AtomicLong用于标识进度 - 还有另一个目的就是防止不同Sequence之间cpu缓存伪共享(Flase Sharing)的问题 ### Sequencer: Sequencer是Disruptor的真正核心 - 此接口有两个实现类 - SingleProducerSequencer 单生产者模型 - MultiProducerSequencer 多生产者模型 - 主要实现生产者和消费者之间快速、正确地传递数据的并发算法 ### Sequence Barrier: 用于保持对RingBuffer的 Main Published Sequence (Producer)和Consumer之间的平衡关系; Sequence Barrier还定义了决定Consumer是否还有可处理的事件的逻辑。 ### WaitStrategy 决定一个消费者将如何等待生产者将Event置入Disruptor! **主要策略有:** - BlockingWaitStrategy: 是最低效的策略,但对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现(用到锁机制) - SleepingWaitStrategy: 性能表现跟`BlockingWaitStrategy`差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景(无锁) - YieldingWaitStrategy: 性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性(无锁机制) ### Event 从生产者到消费者过程中所处理的数据单元,Disruptor中没有代码表示Event,因为它完全是由用户定义的 ### EventProcessor 主要事件循环,处理Disruptor中的Event,拥有消费者的Sequence\ 它有一个实现类是**`BatchEventProcessor`**,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象 ### EventHandler 由用户实现并且代表了Disruptor中的一个消费者的接口,也就是我们的消费者逻辑都需要写在这里 ### WorkProcessor 确保每个sequence只被一个processor消费,在同一个WorkPool中处理多个WorkProcessor不会消费同样的sequence! ## 并行计算-串、并行操作 EventHandlerGroup handleEventsWith(final EventHandler... handlers) - 串行操作: 使用链式调用的方式 - 并行操作: 使用单独调用的方式 ## Disruptor-多生产者模型讲解 依赖Disruptor配置实现多生产者 ``` create(ProducerType producerType,EventFactory factory,int BufferSize, WaitStrategy waitStrategy) ``` - 依赖`WorkerPool`实现多消费者 ``` workerPool(final RingBuffer ringBuffer,final SequenceBarrier sequenceBarrier,final ExceptionHandler exceptionHandler, final WorkHandler... workHandlers) ``` ## 并发知识汇总 - 并发编程核心 - 并发容器类 - 并发编程核心 - Volatile、Atomic、UnSafe - 并发编程核心 - JUC工具类: CountDownLatch、CyclicBarrier、Future、Exchange、ForkJoin、Semaphore - 并发编程核心 - ReentrantLock、Condition、ReadWriteLock、LockSupport ## Disruptor为何底层性能如此高效? - 高性能之道-数据结构-内存预加载机制 - 数据结构层面: 1. 使用环形结构、数组、内存预加载 2. 使用单线程写方式、内存屏障 3. 消除为共享(填充缓存行) 4. 序号栅栏和序号配合使用来消除锁和CAS - 高性能之道-内核-使用单线程写 Disruptor的RingBuffer,之所以可以做到完全无锁,也是因为"单线程写",这是所有"前提的前提",离开了这个前提条件,没有任何技术可以做到完全无锁; Redis、Netty等等高性能技术框架的设计都是这个核心思想 - 高性能之道-系统内存优化-内存屏障 - 要真正的实现无锁,还需要另外一个关键技术:内存屏障。 - 对应到Java语言,就是valotile变量与happens before语义。 - 内存屏障 - Linux的smp_wmb()/smp_rmb() - 系统内核:拿Linux的kfifo来举例: smp_wmb(), 无论是底层的读写都是使用了Linux的smp_wmb `http://github.com/opennetworklinux/linux-3.8.13/blob/master/kernel/kfifo.c` - 高性能之道-系统缓存优化-消除伪共享 缓存系统中是以缓存行 (cache line) 为单位存储的,缓存行是2的整数幂个连续字节,一般为32-256个字节,最常见的缓存行大小是64个字节 当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享 **在CPU缓存架构中,一般会存在三级缓存L1,L2,L3 容量递增,速度也会递减, L1、L2基本都是被单核独享,L3被插槽上的CPU的所有核共享** 可以参考这篇文章`https://zhuanlan.zhihu.com/p/187593289` ```java class LhsPadding { protected long p1; protected long p2; protected long p3; protected long p4; protected long p5; protected long p6; protected long p7; LhsPadding() { } } class Value extends LhsPadding { protected volatile long value; Value() { } } class RhsPadding extends Value { protected long p9; protected long p10; protected long p11; protected long p12; protected long p13; protected long p14; protected long p15; RhsPadding() { } } ``` 使用以上方法让value独占一个缓存行 - 高性能之道-算法优化-序号栅栏机制 - 我们在生产者进行投递Event的时候,总是会使用: `long sequence = ringBuffer.next();` - Disruptor3.0中,序号栅栏SequenceBarrier和序号Sequence搭配使用,协调和管理消费者与生产者的工作节奏,避免了锁和CAS的使用 - 在Disruptor3.0中,各个消费者和生产者持有自己的序号,这些序号的变化必须满足如下基本条件: 1. 消费者序号数值必须小于生产者序号数值 2. 消费者序号数值必须小于其前置 (依赖关系) 消费者的序号数值 3. 生产者序号数值不能大于消费者中最小的序号数值,以避免生产者速度过快,将还未来得及消费的消息覆盖 - 获取下一个可用序号深度分析 - WaitStrategy等待策略深度分析 Disruptor之所以说是高性能,其实也有一部分原因取决于它等待策略的实现: `WaitStrategy`接口 - EventProcessor核心机制深度分析 EventProcessor实现类: BatchEventProcessor代码核心分析 - EventHandler深度分析