验证中...
Languages: C/C++
Categories: 桌面应用界面
Latest update 2019-05-13 13:42
ScheduleFreequeue.h
Raw Copy
#ifndef SCHEDULEDFREEQUEUE_H
#define SCHEDULEDFREEQUEUE_H
#include <time.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
namespace sysmodule
{
// A deferred unit of work: a callable, the number of seconds it has to wait,
// and the time() stamp taken when the Task was constructed.
struct Task
{
    // func  - callable stored in func_ (moved in, so the argument is not copied twice).
    // delay - number of seconds that Timeout() compares the elapsed time against.
    Task(std::function<void()> func, int delay)
        : func_(std::move(func)), delay_(delay), enter_time_(time(nullptr))
    {
    }

    // Logs the seconds elapsed since construction to std::cout and returns true
    // once that value is strictly greater than delay_.
    //
    // time() has one-second resolution, so the strict comparison means the task
    // is never reported as due before delay_ full seconds have really passed.
    // The method does not modify the Task, hence it is const.
    bool Timeout() const
    {
        const double cost = difftime(time(nullptr), enter_time_);
        std::cout << "cost second : " << cost << std::endl;
        return cost > static_cast<double>(delay_);
    }

    std::function<void()> func_; // work to run once the delay has elapsed
    int delay_;                  // required wait, in seconds
    time_t enter_time_;          // time() value captured at construction
};
// Worker pool with an optional per-task start delay.
//
// Tasks pushed with delay <= 0 go straight onto the FIFO q_, which is served by
// the consumer threads. Tasks pushed with delay > 0 are parked in
// delayed_tasks_; the timer thread polls them once per second and moves every
// task whose Timeout() returns true onto q_.
//
// q_ and delayed_tasks_ are only touched while m_ is held.
//
// NOTE(review): the threads are started in this constructor and joined in this
// destructor, i.e. while a derived part of the object may not be alive.
// Overriding Consume()/DelayLoop() in a subclass is therefore fragile - confirm
// before relying on it.
class ScheduledFreequeue
{
public:
    // Starts max_workers consumer threads (none when max_workers <= 0, in which
    // case queued tasks are never executed) and one timer thread.
    ScheduledFreequeue(int max_workers = 1) : done_(false), notified_(false)
    {
        for (; max_workers > 0; --max_workers)
        {
            consumer_thrds_.emplace_back(&ScheduledFreequeue::Consume, this);
        }
        timer_thrd_ = std::thread(&ScheduledFreequeue::DelayLoop, this);
    }

    // Drops all queued tasks, signals shutdown and joins every thread.
    // A task that is already running is allowed to finish first.
    virtual ~ScheduledFreequeue()
    {
        Clear();
        {
            // done_ must be published under m_: a consumer that has evaluated
            // its wait predicate but not yet blocked would otherwise miss the
            // notification below and never wake up.
            std::lock_guard<std::mutex> lock(m_);
            done_ = true;
            notified_ = true;
        }
        cond_var_.notify_all();
        timer_cond_var_.notify_all();

        for (auto& worker : consumer_thrds_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
        if (timer_thrd_.joinable())
        {
            timer_thrd_.join();
        }
    }

    // Returns true when no task is waiting in the ready queue.
    // Tasks still parked in delayed_tasks_ are not counted.
    virtual bool Empty()
    {
        std::unique_lock<std::mutex> lock(m_);
        return q_.empty();
    }

    // Schedules f(args...) for execution on a consumer thread.
    //
    // delay - seconds to wait before the task becomes runnable; values <= 0
    //         enqueue the task immediately.
    // f     - callable; its return value, if any, is discarded.
    // args  - arguments bound to f with std::bind (stored by value).
    //
    // A task that throws terminates the program, because nothing catches the
    // exception on the consumer thread.
    template<class F, class... Args>
    void Push(int delay, F&& f, Args&&... args)
    {
        // Build the callable before taking the lock to keep the critical
        // section short. The shared_ptr makes copies of the std::function cheap.
        auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        auto shared = std::make_shared<decltype(bound)>(std::move(bound));
        std::function<void()> task_func = [shared]() { (*shared)(); };

        if (delay > 0)
        {
            std::cout << "Delay " << delay << " secs to start task\n";
            std::lock_guard<std::mutex> lock(m_);
            delayed_tasks_.emplace_back(std::move(task_func), delay);
            return;
        }

        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(task_func));
            notified_ = true;
        }
        cond_var_.notify_one();
    }

protected:
    // Discards every task currently waiting in the ready queue.
    virtual void Clear()
    {
        std::unique_lock<std::mutex> lock(m_);
        std::queue<std::function<void()>> empty;
        std::swap(q_, empty);
    }

    // Timer thread body: once per second, moves every delayed task whose
    // Timeout() is true onto the ready queue. Returns as soon as done_ is set.
    virtual void DelayLoop()
    {
        // m_ is held for the whole scan: Push() may grow delayed_tasks_ and
        // reallocate it, which would invalidate the iterators used here.
        std::unique_lock<std::mutex> lock(m_);

        // wait_for releases m_ while sleeping and returns the predicate value,
        // so the loop ends immediately when the destructor sets done_.
        while (!timer_cond_var_.wait_for(lock, std::chrono::seconds(1),
                                         [this] { return done_.load(); }))
        {
            for (auto it = delayed_tasks_.begin(); it != delayed_tasks_.end();)
            {
                if (it->Timeout())
                {
                    q_.push(std::move(it->func_));
                    it = delayed_tasks_.erase(it);
                    notified_ = true;
                    cond_var_.notify_one();
                }
                else
                {
                    ++it;
                }
            }
        }
    }

    // Consumer thread body: blocks until a task is available or shutdown is
    // requested, then runs tasks one at a time outside the lock.
    virtual void Consume()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                // The predicate form handles spurious wakeups and cannot miss
                // a notification, because the state is checked under m_.
                cond_var_.wait(lock, [this] { return done_ || !q_.empty(); });
                if (done_)
                {
                    return; // shutting down: remaining tasks are dropped
                }
                task = std::move(q_.front());
                q_.pop();
                notified_ = !q_.empty();
            }
            task();
        }
    }

    std::mutex m_;                             // guards q_ and delayed_tasks_
    std::condition_variable cond_var_;         // signals consumers: task ready or shutdown
    std::condition_variable timer_cond_var_;   // wakes the timer thread early on shutdown
    std::atomic<bool> done_;                   // set once by the destructor
    std::atomic<bool> notified_;               // hint that q_ may hold work; Consume() waits on q_ itself
    std::queue<std::function<void()>> q_;      // tasks ready to run, FIFO
    std::vector<Task> delayed_tasks_;          // tasks still waiting for their delay
    std::vector<std::thread> consumer_thrds_;  // threads running Consume()
    std::thread timer_thrd_;                   // thread running DelayLoop()

private:
    ScheduledFreequeue(ScheduledFreequeue const &) = delete;
    void operator=(ScheduledFreequeue const &) = delete;
};
}
#endif

Comment list( 0 )

You need to sign in to post a comment

Help Search