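// [Hosting-site banner captured with this file; not part of the source] Code fetch complete; the page will refresh automatically.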
//
// Created by Administrator on 2015/5/24.
//
#ifdef __ESVR_LINUX__
#include <poll.h>
#endif
#include "es_log.h"
#include "es_socket.h"
#include <algorithm>
#include <cerrno>
#include <vector>
namespace esvr {
void IOManager::main_loop() {
m_io_type = "poll";
std::vector<struct pollfd> fds;
struct pollfd server_pfd;
server_pfd.fd = m_serverfd;
server_pfd.events = POLLIN;
server_pfd.revents = 0;
fds.push_back(server_pfd);
while (m_start) {
auto queue = get_to_close_queue();
socket_t *pfd;
while ((pfd = queue->pop()) != NULL) {
es_close(*pfd);
auto it = std::find_if(
fds.begin(), fds.end(),
[pfd](const struct pollfd &item) { return item.fd == *pfd; });
if (it != fds.end()) {
delete_fd(it->fd);
it->fd = -1;
}
log_info("force close client function:%s, line:%d, errno:%d, active client size:%d", __FUNCTION__, __LINE__,
errno, m_active_clients.size());
// event dispatch
ClosedEvent event{*pfd};
EventBus::get_instance().fire_event(&event);
}
before_loop_once();
int ret = poll(&fds[0], fds.size(), m_wait_millisecond);
after_loop_once();
if (ret < 0) {
log_fatal("poll error function:%s line:%d errno:%d ret:%d", __FUNCTION__, __LINE__, errno, ret);
m_start = false;
break;
} else if (ret == 0) {
on_idle();
continue;
} else if (ret > 0 && fds[0].revents & POLLIN) {
socket_t cli_fd = accept(m_serverfd, NULL, NULL);
if (cli_fd > 0) {
new_fd(cli_fd);
auto it = std::find_if(fds.begin(), fds.end(),
[](const pollfd &it) { return it.fd < 0; });
if (it != fds.end()) {
it->fd = cli_fd;
it->events = POLLIN;
it->revents = 0;
} else {
struct pollfd poll_fd;
poll_fd.fd = cli_fd;
poll_fd.events = POLLIN;
poll_fd.revents = 0;
fds.push_back(poll_fd);
}
ConnectedEvent event{cli_fd};
EventBus::get_instance().fire_event(&event);
} else {
log_error("accept error function:%s line:%d errno:%d", __FUNCTION__, __LINE__, errno);
}
}
for (auto &it : fds) {
if (it.revents & POLLIN && it.fd > 0 && it.fd != m_serverfd) {
char *buffer = NULL;
size_t buffer_size = 0;
writable_buffer(it.fd, buffer, buffer_size);
if (buffer == NULL || buffer_size <= 0) {
LOG_ERROR("writable buffer for socket ", it.fd, " is null!");
continue;
}
int len;
if ((len = es_recv(it.fd, buffer, buffer_size)) <= 0) {
delete_fd(it.fd);
log_info("recv error function:%s line:%d errno:%d ret:%d active client size:%d", __FUNCTION__,
__LINE__, errno, len, m_active_clients.size());
es_close(it.fd);
ClosedEvent event{it.fd};
EventBus::get_instance().fire_event(&event);
it.fd = -1;
} else {
increase_readable(it.fd, static_cast<size_t>(len));
ReadEvent event{it.fd, buffer, static_cast<size_t>(len)};
EventBus::get_instance().fire_event(&event);
}
}
}
}
}
};
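// [Hosting-site moderation notice captured with this file; not part of the source] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing functions.
// If you confirm that the content does not involve inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything that violates national laws and regulations, you can click submit to appeal, and it will be handled as soon as possible.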