From e16816307211ae3f2673519f2d016d02e5d795d6 Mon Sep 17 00:00:00 2001 From: wangdong <2504590354@qq.com> Date: Wed, 18 Sep 2024 10:32:43 +0800 Subject: [PATCH] modify recv_timed_func --- src/cm_protocol/cs_pipe.c | 39 +++++++++++++++++++---- src/cm_protocol/cs_pipe.h | 2 ++ src/cm_protocol/cs_ssl.c | 67 ++++++++++++++++++++++++++++++++------- src/cm_protocol/cs_ssl.h | 2 +- src/cm_protocol/cs_tcp.c | 42 +++++++++++++++++++----- src/cm_protocol/cs_tcp.h | 2 +- src/cm_protocol/cs_uds.c | 39 ++++++++++++++++++----- src/cm_protocol/cs_uds.h | 2 +- 8 files changed, 158 insertions(+), 37 deletions(-) diff --git a/src/cm_protocol/cs_pipe.c b/src/cm_protocol/cs_pipe.c index bb0e80f..d09d0bb 100644 --- a/src/cm_protocol/cs_pipe.c +++ b/src/cm_protocol/cs_pipe.c @@ -34,7 +34,7 @@ extern "C" { typedef status_t (*recv_func_t)(const void *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event); typedef status_t (*send_func_t)(const void *link, const char *buf, uint32 size, int32 *send_size); -typedef status_t (*recv_timed_func_t)(void *link, char *buf, uint32 size, uint32 timeout); +typedef status_t (*recv_timed_func_t)(void *link, char *buf, uint32 size, int32 *recv_size, uint32 timeout); typedef status_t (*send_timed_func_t)(void *link, const char *buf, uint32 size, uint32 timeout); typedef status_t (*wait_func_t)(void *link, uint32 wait_for, int32 timeout, bool32 *ready); @@ -86,8 +86,8 @@ static const vio_t g_vio_list[] = { GET_VIO(pipe)->vio_send_timed(&(pipe)->link, buf, size, timeout) #define VIO_RECV(pipe, buf, size, len, wait_event) \ GET_VIO(pipe)->vio_recv(&(pipe)->link, buf, size, len, wait_event) -#define VIO_RECV_TIMED(pipe, buf, size, timeout) \ - GET_VIO(pipe)->vio_recv_timed(&(pipe)->link, buf, size, timeout) +#define VIO_RECV_TIMED(pipe, buf, size, len, timeout) \ + GET_VIO(pipe)->vio_recv_timed(&(pipe)->link, buf, size, len, timeout) #define VIO_WAIT(pipe, ev, timeout, ready) \ GET_VIO(pipe)->vio_wait(&(pipe)->link, ev, timeout, ready) @@ -96,6 +96,7 @@ static status_t cs_send_proto_code(cs_pipe_t *pipe, link_ready_ack_t *ack, bool3 tcp_link_t *link = NULL; bool32 ready = CM_FALSE; link = &pipe->link.tcp; + int32 rsize; if (need_send_version) { LOG_RUN_INF("[MES] cs_send_proto_code, send version and proto code"); @@ -131,7 +132,7 @@ static status_t cs_send_proto_code(cs_pipe_t *pipe, link_ready_ack_t *ack, bool3 } // read link_ready_ack - if (cs_tcp_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { + if (cs_tcp_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), &rsize, CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { LOG_RUN_ERR("[MES] cs_send_proto_code, cs_tcp_recv_timed failed"); return CM_ERROR; } @@ -270,6 +271,7 @@ static status_t cs_open_uds_link(const char *server_path, const char *client_pat bool32 ready = CM_FALSE; uint32 proto_code = CM_PROTO_CODE; uint8 local_endian; + int32 rsize; link = &pipe->link.uds; @@ -299,7 +301,7 @@ static status_t cs_open_uds_link(const char *server_path, const char *client_pat } // read link_ready_ack - if (cs_uds_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { + if (cs_uds_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), &rsize, CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { cs_uds_socket_close(&link->sock); return CM_ERROR; } @@ -421,6 +423,11 @@ status_t cs_read_bytes(cs_pipe_t *pipe, char *buf, uint32 max_size, int32 *size) return VIO_RECV(pipe, buf, max_size, size, &wait_event); } +status_t cs_read_bytes_timed(cs_pipe_t *pipe, char *buf, uint32 size, int32 *recv_size, uint32 timeout) +{ + return VIO_RECV_TIMED(pipe, buf, size, recv_size, timeout); +} + status_t cs_read_fixed_size(cs_pipe_t *pipe, char *buf, uint32 size) { bool32 ready; @@ -554,12 +561,15 @@ static status_t cs_read_packet(cs_pipe_t *pipe, cs_packet_t *pack, bool32 cs_cli int32 remain_size; int32 head_size = (int32)sizeof(cs_packet_head_t); int32 err_code = 0; + int32 rsize; - if (VIO_RECV_TIMED(pipe, pack->buf, head_size, CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { + if (VIO_RECV_TIMED(pipe, pack->buf, head_size, &rsize, CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { err_code = cm_get_error_code(); if (err_code == (int32)ERR_TCP_TIMEOUT) { CM_THROW_ERROR(ERR_TCP_TIMEOUT, cs_client ? "read wait for server response" : "read wait for client request"); + } else { + CM_THROW_ERROR(ERR_TCP_RECV, "tcp", errno); } return CM_ERROR; } @@ -578,11 +588,13 @@ static status_t cs_read_packet(cs_pipe_t *pipe, cs_packet_t *pack, bool32 cs_cli return CM_SUCCESS; } - if (VIO_RECV_TIMED(pipe, pack->buf + head_size, remain_size, CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { + if (VIO_RECV_TIMED(pipe, pack->buf + head_size, remain_size, &rsize, CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) { err_code = cm_get_error_code(); if (err_code == (int32)ERR_TCP_TIMEOUT) { CM_THROW_ERROR(ERR_TCP_TIMEOUT, cs_client ? "read wait for server response" : "read wait for client request"); + } else { + CM_THROW_ERROR(ERR_TCP_RECV, "tcp", errno); } return CM_ERROR; } @@ -691,6 +703,19 @@ int cs_get_pipe_sock(cs_pipe_t *pipe) return CM_ERROR; } +void cs_set_pipe_sock_invalid(cs_pipe_t *pipe) +{ + if (pipe->type == CS_TYPE_TCP) { + pipe->link.tcp.sock = CS_INVALID_SOCKET; + } else if (pipe->type == CS_TYPE_SSL) { + pipe->link.ssl.tcp.sock = CS_INVALID_SOCKET; + } else if (pipe->type == CS_TYPE_DOMAIN_SCOKET) { + pipe->link.uds.sock = CS_INVALID_SOCKET; + } else { + CM_ASSERT(0); + } +} + #ifdef __cplusplus } #endif diff --git a/src/cm_protocol/cs_pipe.h b/src/cm_protocol/cs_pipe.h index 7945670..dc51e3b 100644 --- a/src/cm_protocol/cs_pipe.h +++ b/src/cm_protocol/cs_pipe.h @@ -86,6 +86,7 @@ void cs_disconnect(cs_pipe_t *pipe); void cs_shutdown(const cs_pipe_t *pipe); status_t cs_wait(cs_pipe_t *pipe, uint32 wait_for, int32 timeout, bool32 *ready); status_t cs_read_bytes(cs_pipe_t *pipe, char *buf, uint32 max_size, int32 *size); +status_t cs_read_bytes_timed(cs_pipe_t *pipe, char *buf, uint32 size, int32 *recv_size, uint32 timeout); status_t cs_read_fixed_size(cs_pipe_t *pipe, char *buf, uint32 size); status_t cs_send_fixed_size(cs_pipe_t *pipe, char *buf, int32 size); status_t cs_send_bytes(cs_pipe_t *pipe, const char *buf, uint32 size); @@ -101,6 +102,7 @@ status_t cs_call(cs_pipe_t *pipe, cs_packet_t *req, cs_packet_t *ack); status_t cs_call_timed(cs_pipe_t *pipe, cs_packet_t *req, cs_packet_t *ack); void cs_get_remote_host(cs_pipe_t *pipe, char *os_host); int cs_get_pipe_sock(cs_pipe_t *pipe); +void cs_set_pipe_sock_invalid(cs_pipe_t *pipe); #ifdef __cplusplus } diff --git a/src/cm_protocol/cs_ssl.c b/src/cm_protocol/cs_ssl.c index 8f98297..7a1a273 100644 --- a/src/cm_protocol/cs_ssl.c +++ b/src/cm_protocol/cs_ssl.c @@ -1719,18 +1719,59 @@ status_t cs_ssl_recv(ssl_link_t *link, char *buf, uint32 size, int32 *recv_size, return CM_SUCCESS; } -static status_t cs_ssl_recv_remain(ssl_link_t *link, char *buf, uint32 offset, uint32 remain_size, uint32 wait_event, - uint32 timeout) +status_t cs_ssl_recv_once(ssl_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event) +{ + int32 ret, err; + + if (link->ssl_sock == NULL) { + CM_THROW_ERROR(ERR_PEER_CLOSED_REASON, "ssl"); + return CM_ERROR; + } + SSL *ssl = SSL_SOCK(link->ssl_sock); + + if (size == 0) { + (*recv_size) = 0; + return CM_SUCCESS; + } + + /* clear the error queue before the SSL I/O operation */ + cm_set_sock_error(0); + ERR_clear_error(); + + ret = SSL_read(ssl, (void *)buf, (int32)size); + if (ret > 0) { + *recv_size = ret; + return CM_SUCCESS; + } + + if (!cs_ssl_should_retry(link, ret, wait_event, &err)) { + err = cm_get_sock_error(); + if (err == ECONNRESET) { + CM_THROW_ERROR(ERR_PEER_CLOSED, "ssl"); + } else { + CM_THROW_ERROR(ERR_TCP_RECV, "ssl", err); + } + + return CM_ERROR; + } + + *recv_size = 0; + return CM_SUCCESS; +} + +static status_t cs_ssl_recv_remain( + ssl_link_t *link, char *buf, int32 *offset, uint32 remain_size, uint32 wait_event, uint32 timeout) { int32 recv_size; uint32 wait_interval = 0; bool32 ready = CM_FALSE; + int32 poll_wait = timeout > CM_POLL_WAIT ? CM_POLL_WAIT : timeout; while (remain_size > 0) { - CM_RETURN_IFERR(cs_ssl_wait(link, wait_event, CM_POLL_WAIT, &ready) != CM_SUCCESS); + CM_RETURN_IFERR(cs_ssl_wait(link, wait_event, poll_wait, &ready) != CM_SUCCESS); if (!ready) { - wait_interval += CM_POLL_WAIT; + wait_interval += poll_wait; if (wait_interval >= timeout) { CM_THROW_ERROR(ERR_TCP_TIMEOUT, "recv data"); return CM_ERROR; @@ -1739,27 +1780,31 @@ static status_t cs_ssl_recv_remain(ssl_link_t *link, char *buf, uint32 offset, u continue; } - CM_RETURN_IFERR(cs_ssl_recv(link, buf + offset, remain_size, &recv_size, &wait_event) != CM_SUCCESS); + CM_RETURN_IFERR(cs_ssl_recv_once(link, buf + (*offset), remain_size, &recv_size, &wait_event) != CM_SUCCESS); remain_size -= recv_size; - offset += recv_size; + *offset += recv_size; } return CM_SUCCESS; } -status_t cs_ssl_recv_timed(ssl_link_t *link, char *buf, uint32 size, uint32 timeout) +status_t cs_ssl_recv_timed(ssl_link_t *link, char *buf, uint32 size, int32 *rsize, uint32 timeout) { - uint32 remain_size, offset; + uint32 remain_size; int32 recv_size; uint32 wait_event = 0; remain_size = size; - CM_RETURN_IFERR(cs_ssl_recv(link, buf, remain_size, &recv_size, &wait_event) != CM_SUCCESS); + CM_RETURN_IFERR(cs_ssl_recv_once(link, buf, remain_size, &recv_size, &wait_event) != CM_SUCCESS); remain_size -= recv_size; - offset = (uint32)recv_size; + *rsize = recv_size; wait_event = (wait_event == 0) ? CS_WAIT_FOR_READ : wait_event; + if (timeout == 0) { + return CM_SUCCESS; + } + - return cs_ssl_recv_remain(link, buf, offset, remain_size, wait_event, timeout); + return cs_ssl_recv_remain(link, buf, rsize, remain_size, wait_event, timeout); } status_t cs_ssl_wait(ssl_link_t *link, uint32 wait_for, int32 timeout, bool32 *ready) diff --git a/src/cm_protocol/cs_ssl.h b/src/cm_protocol/cs_ssl.h index 7b99722..8b5606b 100644 --- a/src/cm_protocol/cs_ssl.h +++ b/src/cm_protocol/cs_ssl.h @@ -160,7 +160,7 @@ status_t cs_ssl_send_timed(ssl_link_t *link, const char *buf, uint32 size, uint3 * @retval CM_ERROR other error */ status_t cs_ssl_recv(ssl_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event); -status_t cs_ssl_recv_timed(ssl_link_t *link, char *buf, uint32 size, uint32 timeout); +status_t cs_ssl_recv_timed(ssl_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 timeout); /** * wait on SSL socket, till success or timeout diff --git a/src/cm_protocol/cs_tcp.c b/src/cm_protocol/cs_tcp.c index 4fed32c..3cb3357 100644 --- a/src/cm_protocol/cs_tcp.c +++ b/src/cm_protocol/cs_tcp.c @@ -579,24 +579,50 @@ status_t cs_tcp_recv(const tcp_link_t *link, char *buf, uint32 size, int32 *recv return CM_SUCCESS; } +status_t cs_tcp_recv_once(const tcp_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event) +{ + if (size == 0) { + *recv_size = 0; + return CM_SUCCESS; + } + + int32 rsize = recv(link->sock, buf, size, 0); + if (rsize > 0) { + *recv_size = rsize; + return CM_SUCCESS; + } + + if (rsize == 0) { + CM_THROW_ERROR(ERR_PEER_CLOSED, "tcp"); + } else { + CM_THROW_ERROR(ERR_TCP_RECV, "tcp", cm_get_sock_error()); + } + return CM_ERROR; +} + /* cs_tcp_recv_timed must following cs_tcp_wait */ -status_t cs_tcp_recv_timed(tcp_link_t *link, char *buf, uint32 size, uint32 timeout) +status_t cs_tcp_recv_timed(tcp_link_t *link, char *buf, uint32 size, int32 *rsize, uint32 timeout) { - uint32 remain_size, offset; + uint32 remain_size; uint32 wait_interval = 0; int32 recv_size; bool32 ready = CM_FALSE; remain_size = size; - CM_RETURN_IFERR(cs_tcp_recv(link, buf, remain_size, &recv_size, NULL)); + CM_RETURN_IFERR(cs_tcp_recv_once(link, buf, remain_size, &recv_size, NULL)); remain_size -= recv_size; - offset = (uint32)recv_size; + *rsize = recv_size; + if (timeout == 0) { + return CM_SUCCESS; + } + + int32 poll_wait = timeout > CM_POLL_WAIT ? CM_POLL_WAIT : timeout; while (remain_size > 0) { - CM_RETURN_IFERR(cs_tcp_wait(link, CS_WAIT_FOR_READ, CM_POLL_WAIT, &ready)); + CM_RETURN_IFERR(cs_tcp_wait(link, CS_WAIT_FOR_READ, poll_wait, &ready)); if (!ready) { - wait_interval += CM_POLL_WAIT; + wait_interval += poll_wait; if (wait_interval >= timeout) { CM_THROW_ERROR(ERR_TCP_TIMEOUT, "recv data"); return CM_ERROR; @@ -605,9 +631,9 @@ status_t cs_tcp_recv_timed(tcp_link_t *link, char *buf, uint32 size, uint32 time continue; } - CM_RETURN_IFERR(cs_tcp_recv(link, buf + offset, remain_size, &recv_size, NULL)); + CM_RETURN_IFERR(cs_tcp_recv_once(link, buf + (*rsize), remain_size, &recv_size, NULL)); remain_size -= recv_size; - offset += recv_size; + *rsize += recv_size; } return CM_SUCCESS; diff --git a/src/cm_protocol/cs_tcp.h b/src/cm_protocol/cs_tcp.h index 67462f7..160577d 100644 --- a/src/cm_protocol/cs_tcp.h +++ b/src/cm_protocol/cs_tcp.h @@ -99,7 +99,7 @@ void cs_shutdown_socket(socket_t sock); status_t cs_tcp_send(const tcp_link_t *link, const char *buf, uint32 size, int32 *send_size); status_t cs_tcp_send_timed(tcp_link_t *link, const char *buf, uint32 size, uint32 timeout); status_t cs_tcp_recv(const tcp_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event); -status_t cs_tcp_recv_timed(tcp_link_t *link, char *buf, uint32 size, uint32 timeout); +status_t cs_tcp_recv_timed(tcp_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 timeout); status_t cs_tcp_wait(tcp_link_t *link, uint32 wait_for, int32 timeout, bool32 *ready); status_t cs_tcp_init(void); void cs_tcp_deinit(); diff --git a/src/cm_protocol/cs_uds.c b/src/cm_protocol/cs_uds.c index 2af2b02..681f6ac 100644 --- a/src/cm_protocol/cs_uds.c +++ b/src/cm_protocol/cs_uds.c @@ -316,41 +316,64 @@ status_t cs_uds_recv(const uds_link_t *link, char *buf, uint32 size, int32 *recv return CM_SUCCESS; } -status_t cs_uds_recv_timed(uds_link_t *link, char *buf, uint32 size, uint32 timeout) +status_t cs_uds_recv_once(const uds_link_t *link, char *buf, uint32 size, int32 *recv_size) { - uint32 remain_size, offset; + if (size == 0) { + *recv_size = 0; + return CM_SUCCESS; + } + + int32 rsize = (int32)recv(link->sock, buf, size, 0); + if (rsize > 0) { + *recv_size = rsize; + return CM_SUCCESS; + } + return CM_ERROR; +} + +status_t cs_uds_recv_timed(uds_link_t *link, char *buf, uint32 size, int32 *rsize, uint32 timeout) +{ + uint32 remain_size; uint32 wait_interval = 0; int32 recv_size; bool32 ready = CM_FALSE; remain_size = size; - if (cs_uds_recv(link, buf, remain_size, &recv_size) != CM_SUCCESS) { + if (cs_uds_recv_once(link, buf, remain_size, &recv_size) != CM_SUCCESS) { + CM_THROW_ERROR(ERR_TCP_RECV, "uds", errno); return CM_ERROR; } remain_size -= recv_size; - offset = (uint32)recv_size; + *rsize = recv_size; + if (timeout == 0) { + return CM_SUCCESS; + } + + int32 poll_wait = timeout > CM_POLL_WAIT ? CM_POLL_WAIT : timeout; while (remain_size > 0) { - if (cs_uds_wait(link, CS_WAIT_FOR_READ, CM_POLL_WAIT, &ready) != CM_SUCCESS) { + if (cs_uds_wait(link, CS_WAIT_FOR_READ, poll_wait, &ready) != CM_SUCCESS) { return CM_ERROR; } if (!ready) { - wait_interval += CM_POLL_WAIT; + wait_interval += poll_wait; if (wait_interval >= timeout) { + CM_THROW_ERROR(ERR_TCP_TIMEOUT, "uds recv data"); return CM_ERROR; } continue; } - if (cs_uds_recv(link, buf + offset, remain_size, &recv_size) != CM_SUCCESS) { + if (cs_uds_recv_once(link, buf + (*rsize), remain_size, &recv_size) != CM_SUCCESS) { + CM_THROW_ERROR(ERR_TCP_RECV, "uds", errno); return CM_ERROR; } remain_size -= recv_size; - offset += recv_size; + *rsize += recv_size; } return CM_SUCCESS; diff --git a/src/cm_protocol/cs_uds.h b/src/cm_protocol/cs_uds.h index 4fc619e..cce2bd9 100644 --- a/src/cm_protocol/cs_uds.h +++ b/src/cm_protocol/cs_uds.h @@ -83,7 +83,7 @@ void cs_uds_disconnect(uds_link_t *link); status_t cs_uds_send(const uds_link_t *link, const char *buf, uint32 size, int32 *send_size); status_t cs_uds_send_timed(uds_link_t *link, const char *buf, uint32 size, uint32 timeout); status_t cs_uds_recv(const uds_link_t *link, char *buf, uint32 size, int32 *recv_size); -status_t cs_uds_recv_timed(uds_link_t *link, char *buf, uint32 size, uint32 timeout); +status_t cs_uds_recv_timed(uds_link_t *link, char *buf, uint32 size, int32 *recv_size, uint32 timeout); status_t cs_uds_wait(uds_link_t *link, uint32 wait_for, int32 timeout, bool32 *ready); status_t cs_uds_create_listener(const char *name, socket_t *sock, uint16 permissions); int32 cs_uds_getsockname(socket_t sock_ready, cs_sockaddr_un_t *un); -- Gitee