diff --git a/services/sink/transport/include/distributed_input_sink_transport.h b/services/sink/transport/include/distributed_input_sink_transport.h index df48cc2967bd8419bbbf218dd4713ba4323574d4..cab916ca45a18d2c70ea70bd3d693e5eacd0f158 100644 --- a/services/sink/transport/include/distributed_input_sink_transport.h +++ b/services/sink/transport/include/distributed_input_sink_transport.h @@ -95,6 +95,8 @@ private: void RecordEventLog(const std::string &dhId, int32_t type, int32_t code, int32_t value); void RecordEventLog(const std::vector &events); + + void DoSendMsgBatch(const int32_t sessionId, const std::vector &events); private: std::string mySessionName_; std::shared_ptr eventHandler_; diff --git a/services/sink/transport/src/distributed_input_sink_transport.cpp b/services/sink/transport/src/distributed_input_sink_transport.cpp index c5b74f262045434d365f742976842a40bbc799fc..38362866a70ac8eb2c50425c1aa2cfd982e86869 100644 --- a/services/sink/transport/src/distributed_input_sink_transport.cpp +++ b/services/sink/transport/src/distributed_input_sink_transport.cpp @@ -37,6 +37,10 @@ namespace OHOS { namespace DistributedHardware { namespace DistributedInput { +namespace { + // each time, we send msg batch with MAX 20 events. + constexpr int32_t MSG_BTACH_MAX_SIZE = 20; +} DistributedInputSinkTransport::DistributedInputSinkTransport() : mySessionName_("") { std::shared_ptr runner = AppExecFwk::EventRunner::Create(true); @@ -230,6 +234,25 @@ void DistributedInputSinkTransport::SendKeyStateNodeMsgBatch(const int32_t sessi } DHLOGI("SendKeyStateNodeMsgBatch sessionId: %d, event size: %d ", sessionId, events.size()); + int32_t cnt = 0; + std::vector eventBatch; + for (auto ev : events) { + eventBatch.push_back(ev); + cnt++; + if (cnt == MSG_BTACH_MAX_SIZE) { + DoSendMsgBatch(sessionId, eventBatch); + eventBatch.clear(); + cnt = 0; + } + } + + if (!eventBatch.empty()) { + DoSendMsgBatch(sessionId, eventBatch); + } +} + +void DistributedInputSinkTransport::DoSendMsgBatch(const int32_t sessionId, const std::vector &events) +{ int64_t currentTimeNs = GetCurrentTimeUs() * 1000LL; std::shared_ptr eventsJsonArr = std::make_shared(); for (const auto &ev : events) { diff --git a/services/transportbase/src/distributed_input_transport_base.cpp b/services/transportbase/src/distributed_input_transport_base.cpp index f3344d7bbe1c618adcc6282fd8379806e190481f..06a021f3ae56d7f4a8ec3b5d3dfaff09cf0b9c3a 100644 --- a/services/transportbase/src/distributed_input_transport_base.cpp +++ b/services/transportbase/src/distributed_input_transport_base.cpp @@ -521,7 +521,7 @@ void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std:: int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message) { if (message.size() > MSG_MAX_SIZE) { - DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE"); + DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE, msg size: %d", message.size()); return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE; } uint8_t *buf = reinterpret_cast(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));