1 Star 0 Fork 1

endpoint_rust/portable_parallel

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

portable_parallel

介绍

这是一个rust实现的易于使用的并行计算库,目的就是即开即用的进行各种数据的并行计算处理任务。

软件架构

始于一个非常简单的现实需求,当我同时有大量异步io任务与大量cpu密集处理任务的时候,把他们都揉在一起是不合适的, 这时候如果有一个即开即用的并行库可供使用就好了,项目还处于比较早期的关键技术调研验证阶段。

现阶段我的需求:
1)数据并行切割处理

2)数据处理任务带有优先级,并且高优先级的永远优先执行

3)数据处理结果可以汇总,并且保持正确的顺序
初步的想法分这么几个大的部分:
1) 数据的并行切割

2) 切割后的数据以及相关处理共同形成并行任务列表

3) 并行线程池,对并行任务调度执行

4) 并行数据处理结果汇总

5) 一次数据并行提交为一个整体,只要开始处理其中的一部分,就会把这次的处理任务全部做完,然后按序收集结果

性能测试结果

目前纯验证Task/Job调度能力,10个并行线程,180%cpu消耗,大概550W/s 的Job调度能力

其他思考的点

目前数据并行处理结果collect,采用的是unsafe(RefCell<Vec>)的方式,每个并行线程把结果直接填入对应index的位置,

这样也保证了对一个Task处理结果的有序性,同时没有并发消耗。

一个Task的Job是否完全处理完成,所有并行线程是依靠一个AtomicUsize来同步的,跟交错进行的处理Job相比,损耗暂可不计。

关于处理结果,目前采用的是Box的方式进行处理结果返回,由于要同时兼容不同的处理参数以及不同的返回结果,此处还不是泛型的方式。

----------------------------------------------------------------

重点学习并行库rayon的核心机制

WorkerThread之JobRef注册与调度机制

struct WorkerThread{

worker: Worker<JobRef>,  //半内部任务队列

fifo: JobFifo,           //纯内部fifo任务队列

registry: Arc<Registry>, //共享的JobRef注册管理结构体

}

struct Registry{

sleep: Sleep,                    //WorkerThread基于CondVar的睡眠与唤醒管理

injected_jobs: Injector<JobRef>, //全局公共的任务队列

}

通过上面两个结构体,再结合tokio的基于窃取的任务调度机制,基本上对rayon的Job任务调度机制能猜个大概了:

每个WorkerThread都有自己的纯内部任务队列,仅供自己调度,也是fifo的

每个WorkerThread都有一个半内部任务队列,供自己以及其他空闲WorkerThread调度或者窃取

全局的injected任务队列,供所有WorkerThread竞争调度

为防止饿死,所有的WorkerThread都需要在固定节拍之后去看一眼全局injected任务队列

任务是可重入的吗?任务会在这几个队列之间跳转吗?

下面通过细读代码来一一证实或者修正

spawn一个JobRef的逻辑

1. 如果是WorkerThread自身调用spawn,则把JobRef放入WorkerThread的Worker<JobRef>任务队列(push操作,WorkerThread半内部任务队列),并且看情况进行WorkerThread线程唤醒

2. 如果是非WorkerThread自身调用spawn,则把JobRef放入Registry的injected_jobs任务队列(inject操作,公共任务队列,任意空闲WorkerThread都可以来进行任务窃取),并且看情况进行WorkerThread线程唤醒
WorkThread工作模式
unsafe fn wait_until_cold(&self, latch: &CoreLatch) {

    let abort_guard = unwind::AbortIfPanic;

    let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
    while !latch.probe() {

        if let Some(job) = self
            .take_local_job()
            .or_else(|| self.steal())
            .or_else(|| self.registry.pop_injected_job(self.index))
        {
            self.registry.sleep.work_found(idle_state);
            self.execute(job);
            idle_state = self.registry.sleep.start_looking(self.index, latch);
        } else {
            self.registry
                .sleep
                .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
        }
    }

    self.registry.sleep.work_found(idle_state);

    self.log(|| ThreadSawLatchSet {
        worker: self.index,
        latch_addr: latch.addr(),
    });
    mem::forget(abort_guard); // successful execution, do not abort
}

1. 优先从本地worker半内部任务队列里面取任务,否则从其他WorkerThread的半任务队列里面取任务,最后尝试从全局的injected队列里面取任务
2. 如果有成功取到任务,则execute(job)
3. 从其他WorkerThread的半任务队列里面取任务,采用了一种快速伪随机数的方式来选取一个WorkerThread进行任务窃取
从其他WorkerThread进行任务窃取的代码设计
pub struct Stealer<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// The flavor of the queue.
    flavor: Flavor,
}

struct Inner<T> {
    /// The front index.
    front: AtomicIsize,

    /// The back index.
    back: AtomicIsize,

    /// The underlying buffer.
    buffer: CachePadded<Atomic<Buffer<T>>>,
}

真正进行窃取动作的代码

pub fn steal(&self) -> Steal<T> {
  
    let f = self.inner.front.load(Ordering::Acquire);

    if epoch::is_pinned() {
        atomic::fence(Ordering::SeqCst);
    }

    let guard = &epoch::pin();

    let b = self.inner.back.load(Ordering::Acquire);

    if b.wrapping_sub(f) <= 0 {
        return Steal::Empty;
    }

    let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
    let task = unsafe { buffer.deref().read(f) };

    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
        || self
            .inner
            .front
            .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
    {
        return Steal::Retry;
    }

    // Return the stolen task.
    Steal::Success(unsafe { task.assume_init() })
}

基本上就是利用一个AtomicIsize进行CAS操作来窃取任务,跟tokio的窃取机制稍微有点不一样,tokio基于CAS自己取和窃取别人的,操作细节步骤上有细微差异。

空文件

简介

这是一个rust实现的易于使用的并行计算库,目的就是即开即用的进行各种数据的并行计算处理任务。 展开 收起
取消

发行版

暂无发行版

贡献者 (2)

全部

语言

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Rust
1
https://gitee.com/endpoint_rust/portable_parallel.git
git@gitee.com:endpoint_rust/portable_parallel.git
endpoint_rust
portable_parallel
portable_parallel
master

搜索帮助