1 Star 0 Fork 356

xiaohonghong-debug / workflow

forked from 搜狗开源 / workflow 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
WFTask.h 13.38 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _WFTASK_H_
#define _WFTASK_H_
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <atomic>
#include <utility>
#include <functional>
#include "Executor.h"
#include "ExecRequest.h"
#include "Communicator.h"
#include "CommScheduler.h"
#include "CommRequest.h"
#include "SleepRequest.h"
#include "IORequest.h"
#include "Workflow.h"
#include "WFConnection.h"
enum
{
WFT_STATE_UNDEFINED = -1,
WFT_STATE_SUCCESS = CS_STATE_SUCCESS,
WFT_STATE_TOREPLY = CS_STATE_TOREPLY, /* for server task only */
WFT_STATE_NOREPLY = CS_STATE_TOREPLY + 1, /* for server task only */
WFT_STATE_SYS_ERROR = CS_STATE_ERROR,
WFT_STATE_SSL_ERROR = 65,
WFT_STATE_DNS_ERROR = 66, /* for client task only */
WFT_STATE_TASK_ERROR = 67,
WFT_STATE_ABORTED = CS_STATE_STOPPED /* main process terminated */
};
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
INPUT *get_input() { return &this->input; }
OUTPUT *get_output() { return &this->output; }
public:
void *user_data;
public:
int get_state() const { return this->state; }
int get_error() const { return this->error; }
public:
void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
INPUT input;
OUTPUT output;
std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;
public:
WFThreadTask(ExecQueue *queue, Executor *executor,
std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
ExecRequest(queue, executor),
callback(std::move(cb))
{
this->user_data = NULL;
this->state = WFT_STATE_UNDEFINED;
this->error = 0;
}
protected:
virtual ~WFThreadTask() { }
};
template<class INPUT, class OUTPUT>
class WFMultiThreadTask : public ParallelTask
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
INPUT *get_input(size_t index)
{
return static_cast<Thread *>(this->subtasks[index])->get_input();
}
OUTPUT *get_output(size_t index)
{
return static_cast<Thread *>(this->subtasks[index])->get_output();
}
public:
void *user_data;
public:
int get_state(size_t index) const
{
return static_cast<const Thread *>(this->subtasks[index])->get_state();
}
int get_error(size_t index) const
{
return static_cast<const Thread *>(this->subtasks[index])->get_error();
}
public:
void set_callback(
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)> callback;
protected:
using Thread = WFThreadTask<INPUT, OUTPUT>;
public:
WFMultiThreadTask(Thread *const tasks[], size_t n,
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)>&& cb) :
ParallelTask(new SubTask *[n], n),
callback(std::move(cb))
{
size_t i;
for (i = 0; i < n; i++)
this->subtasks[i] = tasks[i];
this->user_data = NULL;
}
protected:
virtual ~WFMultiThreadTask()
{
size_t n = this->subtasks_nr;
while (n > 0)
delete this->subtasks[--n];
delete []this->subtasks;
}
};
template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{
public:
/* start(), dismiss() are for client tasks only. */
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
REQ *get_req() { return &this->req; }
RESP *get_resp() { return &this->resp; }
public:
void *user_data;
public:
int get_state() const { return this->state; }
int get_error() const { return this->error; }
/* Call when error is ETIMEDOUT, return values:
* TOR_NOT_TIMEOUT, TOR_WAIT_TIMEOUT, TOR_CONNECT_TIMEOUT,
* TOR_TRANSMIT_TIMEOUT (send or receive).
* SSL connect timeout also returns TOR_CONNECT_TIMEOUT. */
int get_timeout_reason() const { return this->timeout_reason; }
/* Call only in callback or server's process. */
long long get_task_seq() const
{
if (!this->target)
{
errno = ENOTCONN;
return -1;
}
return this->get_seq();
}
int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;
virtual WFConnection *get_connection() const = 0;
public:
/* All in milliseconds. timeout == -1 for unlimited. */
void set_send_timeout(int timeout) { this->send_timeo = timeout; }
void set_receive_timeout(int timeout) { this->receive_timeo = timeout; }
void set_keep_alive(int timeout) { this->keep_alive_timeo = timeout; }
public:
/* noreply(), push() are for server tasks only. */
void noreply()
{
if (this->state == WFT_STATE_TOREPLY)
this->state = WFT_STATE_NOREPLY;
}
virtual int push(const void *buf, size_t size)
{
return this->scheduler->push(buf, size, this);
}
public:
void set_callback(std::function<void (WFNetworkTask<REQ, RESP> *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual int send_timeout() { return this->send_timeo; }
virtual int receive_timeout() { return this->receive_timeo; }
virtual int keep_alive_timeout() { return this->keep_alive_timeo; }
protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->state == WFT_STATE_SYS_ERROR && this->error < 0)
{
this->state = WFT_STATE_SSL_ERROR;
this->error = -this->error;
}
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
int send_timeo;
int receive_timeo;
int keep_alive_timeo;
REQ req;
RESP resp;
std::function<void (WFNetworkTask<REQ, RESP> *)> callback;
protected:
WFNetworkTask(CommSchedObject *object, CommScheduler *scheduler,
std::function<void (WFNetworkTask<REQ, RESP> *)>&& cb) :
CommRequest(object, scheduler),
callback(std::move(cb))
{
this->user_data = NULL;
this->send_timeo = -1;
this->receive_timeo = -1;
this->keep_alive_timeo = 0;
this->target = NULL;
this->timeout_reason = TOR_NOT_TIMEOUT;
this->state = WFT_STATE_UNDEFINED;
this->error = 0;
}
virtual ~WFNetworkTask() { }
};
class WFTimerTask : public SleepRequest
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
void *user_data;
public:
int get_state() const { return this->state; }
int get_error() const { return this->error; }
public:
void set_callback(std::function<void (WFTimerTask *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
std::function<void (WFTimerTask *)> callback;
public:
WFTimerTask(CommScheduler *scheduler,
std::function<void (WFTimerTask *)> cb) :
SleepRequest(scheduler),
callback(std::move(cb))
{
this->user_data = NULL;
this->state = WFT_STATE_UNDEFINED;
this->error = 0;
}
protected:
virtual ~WFTimerTask() { }
};
template<class ARGS>
class WFFileTask : public IORequest
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
ARGS *get_args() { return &this->args; }
long get_retval() const
{
if (this->state == WFT_STATE_SUCCESS)
return this->get_res();
else
return -1;
}
public:
void *user_data;
public:
int get_state() const { return this->state; }
int get_error() const { return this->error; }
public:
void set_callback(std::function<void (WFFileTask<ARGS> *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
ARGS args;
std::function<void (WFFileTask<ARGS> *)> callback;
public:
WFFileTask(IOService *service,
std::function<void (WFFileTask<ARGS> *)>&& cb) :
IORequest(service),
callback(std::move(cb))
{
this->user_data = NULL;
this->state = WFT_STATE_UNDEFINED;
this->error = 0;
}
protected:
virtual ~WFFileTask() { }
};
class WFGenericTask : public SubTask
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
void *user_data;
public:
int get_state() const { return this->state; }
int get_error() const { return this->error; }
protected:
virtual void dispatch()
{
this->subtask_done();
}
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
delete this;
return series->pop();
}
protected:
int state;
int error;
public:
WFGenericTask()
{
this->user_data = NULL;
this->state = WFT_STATE_UNDEFINED;
this->error = 0;
}
protected:
virtual ~WFGenericTask() { }
};
class WFCounterTask : public WFGenericTask
{
public:
virtual void count()
{
if (--this->value == 0)
{
this->state = WFT_STATE_SUCCESS;
this->subtask_done();
}
}
public:
void set_callback(std::function<void (WFCounterTask *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual void dispatch()
{
this->WFCounterTask::count();
}
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
std::atomic<unsigned int> value;
std::function<void (WFCounterTask *)> callback;
public:
WFCounterTask(unsigned int target_value,
std::function<void (WFCounterTask *)>&& cb) :
value(target_value + 1),
callback(std::move(cb))
{
}
protected:
virtual ~WFCounterTask() { }
};
class WFMailboxTask : public WFGenericTask
{
public:
void send(void *msg)
{
*this->next++ = msg;
this->count();
}
void **get_mailbox(size_t *n)
{
*n = this->next - this->mailbox;
return this->mailbox;
}
public:
void set_callback(std::function<void (WFMailboxTask *)> cb)
{
this->callback = std::move(cb);
}
public:
virtual void count()
{
if (--this->value == 0)
{
this->state = WFT_STATE_SUCCESS;
this->subtask_done();
}
}
protected:
virtual void dispatch()
{
this->WFMailboxTask::count();
}
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
void **mailbox;
std::atomic<void **> next;
std::atomic<size_t> value;
std::function<void (WFMailboxTask *)> callback;
public:
WFMailboxTask(void **mailbox, size_t size,
std::function<void (WFMailboxTask *)>&& cb) :
next(mailbox),
value(size + 1),
callback(std::move(cb))
{
this->mailbox = mailbox;
}
WFMailboxTask(std::function<void (WFMailboxTask *)>&& cb) :
next(&this->user_data),
value(2),
callback(std::move(cb))
{
this->mailbox = &this->user_data;
}
protected:
virtual ~WFMailboxTask() { }
};
class WFConditional : public WFGenericTask
{
public:
virtual void signal(void *msg)
{
*this->msgbuf = msg;
if (this->flag.exchange(true))
this->subtask_done();
}
protected:
virtual void dispatch()
{
series_of(this)->push_front(this->task);
this->task = NULL;
if (this->flag.exchange(true))
this->subtask_done();
}
protected:
std::atomic<bool> flag;
SubTask *task;
void **msgbuf;
public:
WFConditional(SubTask *task, void **msgbuf) :
flag(false)
{
this->task = task;
this->msgbuf = msgbuf;
}
WFConditional(SubTask *task) :
flag(false)
{
this->task = task;
this->msgbuf = &this->user_data;
}
protected:
virtual ~WFConditional()
{
delete this->task;
}
};
class WFGoTask : public ExecRequest
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
void dismiss()
{
assert(!series_of(this));
delete this;
}
public:
void *user_data;
public:
int get_state() const { return this->state; }
int get_error() const { return this->error; }
public:
void set_callback(std::function<void (WFGoTask *)> cb)
{
this->callback = std::move(cb);
}
protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
protected:
std::function<void (WFGoTask *)> callback;
public:
WFGoTask(ExecQueue *queue, Executor *executor) :
ExecRequest(queue, executor)
{
this->user_data = NULL;
this->state = WFT_STATE_UNDEFINED;
this->error = 0;
}
protected:
virtual ~WFGoTask() { }
};
#include "WFTask.inl"
#endif
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/xiaohonghong-debug/workflow.git
git@gitee.com:xiaohonghong-debug/workflow.git
xiaohonghong-debug
workflow
workflow
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891