代码拉取完成,页面将自动刷新
这是一个rust实现的易于使用的并行计算库,目的就是即开即用的进行各种数据的并行计算处理任务。
始于一个非常简单的现实需求,当我同时有大量异步io任务与大量cpu密集处理任务的时候,把他们都揉在一起是不合适的, 这时候如果有一个即开即用的并行库可供使用就好了,项目还处于比较早期的关键技术调研验证阶段。
1)数据并行切割处理
2)数据处理任务带有优先级,并且高优先级的永远优先执行
3)数据处理结果可以汇总,并且保持正确的顺序
1) 数据的并行切割
2) 切割后的数据以及相关处理共同形成并行任务列表
3) 并行线程池,对并行任务调度执行
4) 并行数据处理结果汇总
5) 一次数据并行提交为一个整体,只要开始处理其中的一部分,就会把这次的处理任务全部做完,然后按序收集结果
目前纯验证Task/Job调度能力,10个并行线程,180%cpu消耗,大概550W/s 的Job调度能力
目前数据并行处理结果collect,采用的是unsafe(RefCell<Vec>)的方式,每个并行线程把结果直接填入对应index的位置,
这样也保证了对一个Task处理结果的有序性,同时没有并发消耗。
一个Task的Job是否完全处理完成,所有并行线程是依靠一个AtomicUsize来同步的,跟交错进行的处理Job相比,损耗暂可不计。
关于处理结果,目前采用的是Box的方式进行处理结果返回,由于要同时兼容不同的处理参数以及不同的返回结果,此处还不是泛型的方式。
struct WorkerThread{
worker: Worker<JobRef>, //半内部任务队列
fifo: JobFifo, //纯内部fifo任务队列
registry: Arc<Registry>, //共享的JobRef注册管理结构体
}
struct Registry{
sleep: Sleep, //WorkerThread基于CondVar的睡眠与唤醒管理
injected_jobs: Injector<JobRef>, //全局公共的任务队列
}
通过上面两个结构体,再结合tokio的基于窃取的任务调度机制,基本上对rayon的Job任务调度机制能猜个大概了:
每个WorkerThread都有自己的纯内部任务队列,仅供自己调度,也是fifo的
每个WorkerThread都有一个半内部任务队列,供自己以及其他空闲WorkerThread调度或者窃取
全局的injected任务队列,供所有WorkerThread竞争调度
为防止饿死,所有的WorkerThread都需要在固定节拍之后去看一眼全局injected任务队列
任务是可重入的吗?任务会在这几个队列之间跳转吗?
下面通过细读代码来一一证实或者修正
spawn一个JobRef的逻辑
1. 如果是WorkerThread自身调用spawn,则把JobRef放入WorkerThread的Worker<JobRef>任务队列(push操作,WorkerThread半内部任务队列),并且看情况进行WorkerThread线程唤醒
2. 如果是非WorkerThread自身调用spawn,则把JobRef放入Registry的injected_jobs任务队列(inject操作,公共任务队列,任意空闲WorkerThread都可以来进行任务窃取),并且看情况进行WorkerThread线程唤醒
unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
let abort_guard = unwind::AbortIfPanic;
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
if let Some(job) = self
.take_local_job()
.or_else(|| self.steal())
.or_else(|| self.registry.pop_injected_job(self.index))
{
self.registry.sleep.work_found(idle_state);
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
} else {
self.registry
.sleep
.no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
}
}
self.registry.sleep.work_found(idle_state);
self.log(|| ThreadSawLatchSet {
worker: self.index,
latch_addr: latch.addr(),
});
mem::forget(abort_guard); // successful execution, do not abort
}
1. 优先从本地worker半内部任务队列里面取任务,否则从其他WorkerThread的半任务队列里面取任务,最后尝试从全局的injected队列里面取任务
2. 如果有成功取到任务,则execute(job)
3. 从其他WorkerThread的半任务队列里面取任务,采用了一种快速伪随机数的方式来选取一个WorkerThread进行任务窃取
pub struct Stealer<T> {
/// A reference to the inner representation of the queue.
inner: Arc<CachePadded<Inner<T>>>,
/// The flavor of the queue.
flavor: Flavor,
}
struct Inner<T> {
/// The front index.
front: AtomicIsize,
/// The back index.
back: AtomicIsize,
/// The underlying buffer.
buffer: CachePadded<Atomic<Buffer<T>>>,
}
真正进行窃取动作的代码
pub fn steal(&self) -> Steal<T> {
let f = self.inner.front.load(Ordering::Acquire);
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
let b = self.inner.back.load(Ordering::Acquire);
if b.wrapping_sub(f) <= 0 {
return Steal::Empty;
}
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
let task = unsafe { buffer.deref().read(f) };
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
return Steal::Retry;
}
// Return the stolen task.
Steal::Success(unsafe { task.assume_init() })
}
基本上就是利用一个AtomicIsize进行CAS操作来窃取任务,跟tokio的窃取机制稍微有点不一样,tokio基于CAS自己取和窃取别人的,操作细节步骤上有细微差异。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。