This action will force synchronization from 百度开源/incubator-brpc, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
类似于kylin的ExecMan, ExecutionQueue提供了异步串行执行的功能。ExecutionQueue的相关技术最早使用在RPC中实现多线程向同一个fd写数据. 在r31345之后加入到bthread。 ExecutionQueue 提供了如下基本功能:
和ExecMan的主要区别:
在多核并发编程领域, Message passing作为一种解决竞争的手段得到了比较广泛的应用,它按照业务依赖的资源将逻辑拆分成若干个独立actor,每个actor负责对应资源的维护工作,当一个流程需要修改某个资源的时候, 就转化为一个消息发送给对应actor,这个actor(通常在另外的上下文中)根据命令内容对这个资源进行相应的修改,之后可以选择唤醒调用者(同步)或者提交到下一个actor(异步)的方式进行后续处理。
ExecutionQueue和mutex都可以用来在多线程场景中消除竞争. 相比较使用mutex, 使用ExecutionQueue有着如下几个优点:
但是缺点也同样明显:
不考虑性能和复杂度,理论上任何系统都可以只使用mutex或者ExecutionQueue来消除竞争. 但是复杂系统的设计上,建议根据不同的场景灵活决定如何使用这两个工具:
总之,多线程编程没有万能的模型,需要根据具体的场景,结合丰富的profliling工具,最终在复杂度和性能之间找到合适的平衡。
特别指出一点,Linux中mutex无竞争的lock/unlock只有需要几条原子指令,在绝大多数场景下的开销都可以忽略不计.
// Iterate over the given tasks
//
// Example:
//
// #include <bthread/execution_queue.h>
//
// int demo_execute(void* meta, TaskIterator<T>& iter) {
// if (iter.is_stopped()) {
// // destroy meta and related resources
// return 0;
// }
// for (; iter; ++iter) {
// // do_something(meta, *iter)
// // or do_something(meta, iter->a_member_of_T)
// }
// return 0;
// }
template <typename T>
class TaskIterator;
// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
// default options.
// Returns 0 on success, errno otherwise
// NOTE: type |T| can be non-POD but must be copy-constructible
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>& iter),
void* meta);
创建的返回值是一个64位的id, 相当于ExecutionQueue实例的一个弱引用, 可以wait-free的在O(1)时间内定位一个ExecutionQueue, 你可以到处拷贝这个id, 甚至可以放在RPC中,作为远端资源的定位工具。 你必须保证meta的生命周期,在对应的ExecutionQueue真正停止前不会释放.
// Stop the ExecutionQueue.
// After this function is called:
// - All the following calls to execution_queue_execute would fail immediately.
// - The executor will call |execute| with TaskIterator::is_queue_stopped() being
// true exactly once when all the pending tasks have been executed, and after
// this point it's ok to release the resource referenced by |meta|.
// Returns 0 on success, errno othrwise
template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);
// Wait until the the stop task (Iterator::is_queue_stopped() returns true) has
// been executed
template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);
stop和join都可以多次调用, 都会又合理的行为。stop可以随时调用而不用当心线程安全性问题。
和fd的close类似,如果stop不被调用, 相应的资源会永久泄露。
安全释放meta的时机: 可以在execute函数中收到iter.is_queue_stopped()==true的任务的时候释放,也可以等到join返回之后释放. 注意不要double-free
struct TaskOptions {
TaskOptions();
TaskOptions(bool high_priority, bool in_place_if_possible);
// Executor would execute high-priority tasks in the FIFO order but before
// all pending normal-priority tasks.
// NOTE: We don't guarantee any kind of real-time as there might be tasks still
// in process which are uninterruptible.
//
// Default: false
bool high_priority;
// If |in_place_if_possible| is true, execution_queue_execute would call
// execute immediately instead of starting a bthread if possible
//
// Note: Running callbacks in place might cause the dead lock issue, you
// should be very careful turning this flag on.
//
// Default: false
bool in_place_if_possible;
};
const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(/*high_priority=*/ false, /*in_place_if_possible=*/ false);
const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(/*high_priority=*/ true, /*in_place_if_possible=*/ false);
const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(/*high_priority=*/ false, /*in_place_if_possible=*/ true);
// Thread-safe and Wait-free.
// Execute a task with defaut TaskOptions (normal task);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task);
// Thread-safe and Wait-free.
// Execute a task with options. e.g
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, we will use default options (normal task)
// If |handle| is not NULL, we will assign it with the hanlder of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle);
high_priority的task之间的执行顺序也会严格按照提交顺序, 这点和ExecMan不同, ExecMan的QueueExecEmergent的AsyncContex执行顺序是undefined. 但是这也意味着你没有办法将任何任务插队到一个high priority的任务之前执行.
开启inplace_if_possible, 在无竞争的场景中可以省去一次线程调度和cache同步的开销. 但是可能会造成死锁或者递归层数过多(比如不停的ping-pong)的问题,开启前请确定你的代码中不存在这些问题。
/// [Thread safe and ABA free] Cancel the corresponding task.
// Returns:
// -1: The task was executed or h is an invalid handle
// 0: Success
// 1: The task is executing
int execution_queue_cancel(const TaskHandle& h);
返回非0仅仅意味着ExecutionQueue已经将对应的task递给过execute, 真实的逻辑中可能将这个task缓存在另外的容器中,所以这并不意味着逻辑上的task已经结束,你需要在自己的业务上保证这一点.
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。