代码拉取完成,页面将自动刷新
#include "workthread.h"
#include <unistd.h>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <QDebug>
#include "event2/bufferevent.h"
using namespace std;
/// Constructs a worker that only records its id; the pipe, event base and
/// resolver are created later by Init().
///
/// @param wid Identifier stored in wid_ and printed by Stop() in its log line.
WorkThread::WorkThread(unsigned short wid)
    : wid_(wid)
{
}
/// Releases everything the worker still owns.
///
/// If the worker thread was never stopped it is stopped and joined first, so
/// that Run() cannot keep touching members of an object that is being
/// destroyed. After that the pipe event, any remaining acceptors and the
/// resolver are released.
///
/// NOTE(review): the descriptors stored in pipefd_ are not closed here or
/// anywhere else in this file; confirm where they are meant to be released.
WorkThread::~WorkThread()
{
    // Same sequence as Stop(), minus the log line: raise the flag, wait for
    // Run() to return, then drop the thread object.
    if (pthread_)
    {
        is_stop_ = true;
        if (pthread_->joinable())
        {
            pthread_->join();
        }
        delete pthread_;
        pthread_ = nullptr;
    }

    if (pipe_event_)
    {
        event_del(pipe_event_);
        event_free(pipe_event_);
        pipe_event_ = nullptr;
    }

    // Acceptors are owned by this worker; destroy whatever is still listed.
    while (!acceptor_list_.empty())
    {
        Acceptor *ac = acceptor_list_.front();
        acceptor_list_.pop_front();
        delete ac;
    }

    if (resolver_)
    {
        delete resolver_;
        resolver_ = nullptr;
    }
}
/// Prepares the worker: stores the SSL context, lazily creates the resolver,
/// opens the notification pipe, creates the event base and registers a
/// persistent read event on the pipe's read end.
///
/// @param ssl_ctx SSL context that is later handed to each Acceptor::Init().
/// @return true when every step succeeded. Every failure path reports through
///         qFatal(), which normally aborts the process, so the `return false`
///         statements are only reached if qFatal() returns.
bool WorkThread::Init(ssl_ctx_st *ssl_ctx)
{
    ssl_ctx_ = ssl_ctx;
    if (!resolver_)
    {
        resolver_ = new HandleResolver();
        resolver_->Init();
    }

    // Create the notification pipe (pipe() is POSIX-only).
    if (pipe(pipefd_) == -1)
    {
        qFatal("管道创建失败");
        return false;
    }

    // Build the event base that will watch the pipe. The base is created
    // without internal locking (EVENT_BASE_FLAG_NOLOCK).
    event_config *ev_conf = event_config_new();
    if (!ev_conf)
    {
        // Without this check a failed allocation would be dereferenced by
        // event_config_free() below.
        qFatal("工作线程初始化失败");
        return false;
    }
    event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
    this->ev_base_ = event_base_new_with_config(ev_conf);
    event_config_free(ev_conf);
    if (!ev_base_)
    {
        qFatal("工作线程初始化失败");
        return false;
    }

    // Listen for readability on the pipe's read end; EventCB receives `this`.
    pipe_event_ = event_new(this->ev_base_, pipefd_[0], EV_READ | EV_PERSIST, EventCB, this);
    if (!pipe_event_ || event_add(pipe_event_, nullptr) != 0)
    {
        // A missing or unregistered pipe event means Notify() would never be
        // seen by the loop, so treat it as an initialization failure.
        qFatal("工作线程初始化失败");
        return false;
    }
    return true;
}
void WorkThread::Start()
{
is_stop_ = false;
pthread_ = new thread(&WorkThread::Run, this);
}
/// Asks the worker loop to finish, waits for the thread to exit and releases
/// the thread object. A log line with the worker id is always emitted.
void WorkThread::Stop()
{
    // Run() polls this flag on every iteration of its loop.
    is_stop_ = true;

    if (pthread_ != nullptr)
    {
        pthread_->join();
        delete pthread_;
        pthread_ = nullptr;
    }

    qInfo("工作线程<%i>退出", wid_);
}
void WorkThread::Notify(const char *sign)
{
write(this->pipefd_[1], sign, strlen(sign));
}
/// Queues a descriptor at the back of fd_list_ under mtx_; Activated() later
/// takes descriptors from the front of the same list.
///
/// @param fd Descriptor to hand over to this worker.
void WorkThread::AddSocket(int fd)
{
    const std::lock_guard<std::mutex> guard(mtx_);
    fd_list_.push_back(fd);
}
/// Logs the disconnect, removes the acceptor from acceptor_list_ and destroys
/// it, all while holding mtx_.
///
/// @param ac Acceptor to drop; the pointer must not be used after this call
///           because the object is deleted here.
void WorkThread::Disconnection(Acceptor *ac)
{
    qInfo("客户端连接断开...");

    const std::lock_guard<std::mutex> guard(mtx_);
    acceptor_list_.remove(ac);
    delete ac;
}
/// Reports how many acceptors are currently listed for this worker.
///
/// NOTE(review): the list is read without taking mtx_, unlike the other
/// accessors of acceptor_list_ in this file; confirm callers tolerate that.
///
/// @return Number of entries in acceptor_list_, narrowed to int.
int WorkThread::Size() const
{
    return static_cast<int>(acceptor_list_.size());
}
void WorkThread::Run()
{
while (!is_stop_)
{
event_base_loop(this->ev_base_, EVLOOP_NONBLOCK);
this_thread::sleep_for(chrono::milliseconds(1));
}
event_base_free(this->ev_base_);
}
void WorkThread::EventCB(int fd, short what, void *arg)
{
WorkThread *work = (WorkThread *)arg;
if(what == EV_READ)
{
work->Activated(fd);
}
}
void WorkThread::Activated(int fd)
{
char buf[2] = {0};
read(fd, buf, 1);
std::lock_guard<std::mutex> gm(mtx_);
if (strcmp(buf, "c") == 0)
{
if(!fd_list_.empty())
{
int fd = fd_list_.front();
fd_list_.pop_front();
Acceptor *ac = new Acceptor(this, ev_base_, resolver_, fd);
if(ac->Init(ssl_ctx_))
{
acceptor_list_.push_back(ac);
}
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。