diff --git a/device/services/ipc/include/socket_context.h b/device/services/ipc/include/socket_context.h index d16cd4ae64d9a205852bac295fb0f4be5a427120..e456aa334dcba2e4876e72bbf83d09420f8ca8d6 100644 --- a/device/services/ipc/include/socket_context.h +++ b/device/services/ipc/include/socket_context.h @@ -21,6 +21,8 @@ #include #endif #include +#include +#include "service_entry.h" #if defined(__i386__) || defined(__x86_64__) const static char DEFAULT_UNIX_SOCKET_PATH[] = "hiprofiler_unix_socket"; @@ -87,6 +89,8 @@ protected: private: static bool ReceiveData(int sock, uint8_t* databuf, uint32_t size); static void* UnixSocketRecv(void* pp, void (*callback)() = nullptr); + static bool AdjustBufferSize(std::vector& buf, ProtocolHead*& pph, uint32_t protoSize); + static void ProcessProtocol(SocketContext* pssr, ProtocolHead* pph, uint32_t head_size); std::thread recvThread_; }; diff --git a/device/services/ipc/include/unix_socket_server.h b/device/services/ipc/include/unix_socket_server.h index 606bf187038e5ac7832a9209f36e22792064f427..dd28dad64c47444cd3d3b4cc0599837b1b3300ed 100644 --- a/device/services/ipc/include/unix_socket_server.h +++ b/device/services/ipc/include/unix_socket_server.h @@ -39,6 +39,10 @@ public: private: void UnixSocketAccept(void (*callback)(int) = nullptr); + bool InitEpoll(); + void EventLoop(void (*callback)(int) = nullptr); + bool HandleAcceptEvent(); + void HandleHangupEvent(int fd, void (*callback)(int) = nullptr); std::map> socketClients_; std::thread acceptThread_; std::mutex mtx_; diff --git a/device/services/ipc/src/proto_encoder_plugin_generator.cpp b/device/services/ipc/src/proto_encoder_plugin_generator.cpp index 65f72ae0f39ed874ab96494a37ece1ebd6963a3c..8fe2b20e5f43a47d424f3e1b43d7524a1d282d7e 100755 --- a/device/services/ipc/src/proto_encoder_plugin_generator.cpp +++ b/device/services/ipc/src/proto_encoder_plugin_generator.cpp @@ -187,123 +187,147 @@ public: // field->is_repeated() for (int i = 0; i < message->field_count(); ++i) { const FieldDescriptor* field = message->field(i); - if (field->type() == FieldDescriptor::TYPE_MESSAGE) { - printer_->Print("inline @typename@* @mora@_@name@()\n", - "mora", field->is_repeated()?"add":"mutable", - "typename", field->message_type()->name(), - "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("return AddSubMessage<@typename@>(FIELDID_@name@);\n", - "typename", field->message_type()->name(), - "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - } else if (field->type() == FieldDescriptor::TYPE_BYTES) { - printer_->Print("inline void set_@name@(const void* bytes, uint32_t size)\n", - "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("AddBytes(FIELDID_@name@, bytes, size);\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - printer_->Print("void set_@name@(GetDataCallback getData)\n", "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("return AddBytesByCallBack(FIELDID_@name@, getData);\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - printer_->Print("inline RandomWriteCtx* startAdd_@name@()\n", "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("return StartAddBytes(FIELDID_@name@);\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - printer_->Print("inline void finishAdd_@name@(int32_t size)\n", "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("return FinishAddBytes(size);\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - } else if (field->type() == FieldDescriptor::TYPE_STRING) { - printer_->Print("inline void @sora@_@name@(const std::string& str)\n", - "sora", field->is_repeated()?"add":"set", - "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("AddBytes(FIELDID_@name@, str.data(), str.size());\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - printer_->Print("inline void @sora@_@name@(std::string&& str)\n", - "sora", field->is_repeated()?"add":"set", - "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("AddBytes(FIELDID_@name@, str.data(), str.size());\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - printer_->Print("inline void @sora@_@name@(const char* str)\n", - "sora", field->is_repeated()?"add":"set", - "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("AddBytes(FIELDID_@name@, str, strlen(str));\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - printer_->Print("inline void @sora@_@name@(const char* str, uint32_t len)\n", - "sora", field->is_repeated()?"add":"set", - "name", Tolowercase(field->name())); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("AddBytes(FIELDID_@name@, str, len);\n", "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - } else { - // varint, fix32, fix64 - printer_->Print("inline void @sora@_@name@(@paramtype@ v)\n", - "sora", field->is_repeated()?"add":"set", "name", Tolowercase(field->name()), - "paramtype", GetParamType(field)); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("@type@(FIELDID_@name@, v);\n", "type", - GetInnerType(field), "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - // packed - if (!field->is_repeated()) { - continue; - } - if (field->type() == FieldDescriptor::TYPE_SINT32 || - field->type() == FieldDescriptor::TYPE_SINT64) { - perror("repeated signed(zigzag) fields are not supported in libprotobuf\n"); - continue; - } - if (!field->is_packed()) { - continue; - } - printer_->Print("inline void add_@name@(const @paramtype@* array, uint32_t size)\n", - "name", Tolowercase(field->name()), - "paramtype", GetParamType(field)); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("@type@(FIELDID_@name@, array, size);\n", - "type", GetInnerType(field, true), "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); - - printer_->Print("inline void add_@name@(const std::vector<@paramtype@>& array)\n", - "name", Tolowercase(field->name()), - "paramtype", GetParamType(field)); - printer_->Print("{\n"); - printer_->Indent(); - printer_->Print("@type@(FIELDID_@name@, array.data(), array.size());\n", - "type", GetInnerType(field, true), "name", field->name()); - printer_->Outdent(); - printer_->Print("}\n"); + switch (field->type()) { + case FieldDescriptor::TYPE_MESSAGE: + GenerateMessageField(field); + break; + case FieldDescriptor::TYPE_BYTES: + GenerateBytesField(field); + break; + case FieldDescriptor::TYPE_STRING: + GenerateStringField(field); + break; + default: + GeneratePrimitiveField(field); + break; } } } + void GenerateMessageField(const FieldDescriptor* field) + { + printer_->Print("inline @typename@* @mora@_@name@()\n", + "mora", field->is_repeated() ? "add" : "mutable", + "typename", field->message_type()->name(), + "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("return AddSubMessage<@typename@>(FIELDID_@name@);\n", + "typename", field->message_type()->name(), + "name", field->name()); + }); + } + + void GenerateBytesField(const FieldDescriptor* field) + { + printer_->Print("inline void set_@name@(const void* bytes, uint32_t size)\n", + "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("AddBytes(FIELDID_@name@, bytes, size);\n", "name", field->name()); + }); + + printer_->Print("void set_@name@(GetDataCallback getData)\n", "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("return AddBytesByCallBack(FIELDID_@name@, getData);\n", "name", field->name()); + }); + + printer_->Print("inline RandomWriteCtx* startAdd_@name@()\n", "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("return StartAddBytes(FIELDID_@name@);\n", "name", field->name()); + }); + + printer_->Print("inline void finishAdd_@name@(int32_t size)\n", "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("return FinishAddBytes(size);\n", "name", field->name()); + }); + } + + void GenerateStringField(const FieldDescriptor* field) + { + const std::string prefix = field->is_repeated() ? "add" : "set"; + printer_->Print("inline void @sora@_@name@(const std::string& str)\n", + "sora", prefix, + "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("AddBytes(FIELDID_@name@, str.data(), str.size());\n", "name", field->name()); + }); + + printer_->Print("inline void @sora@_@name@(std::string&& str)\n", + "sora", prefix, + "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("AddBytes(FIELDID_@name@, str.data(), str.size());\n", "name", field->name()); + }); + + printer_->Print("inline void @sora@_@name@(const char* str)\n", + "sora", prefix, + "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("AddBytes(FIELDID_@name@, str, strlen(str));\n", "name", field->name()); + }); + + printer_->Print("inline void @sora@_@name@(const char* str, uint32_t len)\n", + "sora", prefix, + "name", Tolowercase(field->name())); + PrintBlock([&]() { + printer_->Print("AddBytes(FIELDID_@name@, str, len);\n", "name", field->name()); + }); + } + + void GeneratePrimitiveField(const FieldDescriptor* field) { + // varint, fix32, fix64 + const std::string prefix = field->is_repeated() ? "add" : "set"; + printer_->Print("inline void @sora@_@name@(@paramtype@ v)\n", + "sora", prefix, + "name", Tolowercase(field->name()), + "paramtype", GetParamType(field)); + PrintBlock([&]() { + printer_->Print("@type@(FIELDID_@name@, v);\n", + "type", GetInnerType(field), + "name", field->name()); + }); + // packed + if (!field->is_repeated()) { + return; + } + + if (field->type() == FieldDescriptor::TYPE_SINT32 || + field->type() == FieldDescriptor::TYPE_SINT64) { + perror("repeated signed(zigzag) fields are not supported in libprotobuf\n"); + return; + } + + if (!field->is_packed()) { + return; + } + + printer_->Print("inline void add_@name@(const @paramtype@* array, uint32_t size)\n", + "name", Tolowercase(field->name()), + "paramtype", GetParamType(field)); + PrintBlock([&]() { + printer_->Print("@type@(FIELDID_@name@, array, size);\n", + "type", GetInnerType(field, true), + "name", field->name()); + }); + + printer_->Print("inline void add_@name@(const std::vector<@paramtype@>& array)\n", + "name", Tolowercase(field->name()), + "paramtype", GetParamType(field)); + PrintBlock([&]() { + printer_->Print("@type@(FIELDID_@name@, array.data(), array.size());\n", + "type", GetInnerType(field, true), + "name", field->name()); + }); + } + + template + void PrintBlock(Func&& func) { + printer_->Print("{\n"); + printer_->Indent(); + func(); + printer_->Outdent(); + printer_->Print("}\n"); + } + static std::string GetParamType(const FieldDescriptor* field) { switch (field->type()) { diff --git a/device/services/ipc/src/socket_context.cpp b/device/services/ipc/src/socket_context.cpp index 5b7817998f4acf296d3b9779cfed6f90a651f4d3..e626db0e6b8fbb495c8ed823f50a66d4cc9bcb0b 100644 --- a/device/services/ipc/src/socket_context.cpp +++ b/device/services/ipc/src/socket_context.cpp @@ -101,60 +101,77 @@ bool SocketContext::ReceiveData(int sock, uint8_t* databuf, uint32_t size) return true; } +bool SocketContext::AdjustBufferSize(std::vector& buf, ProtocolHead*& pph, uint32_t protoSize) +{ + if (protoSize > buf.size()) { + if (protoSize > PROTO_SIZE_MAX) { + PROFILER_LOG_ERROR(LOG_CORE, "buffer size out of range %d/%d", protoSize, PROTO_SIZE_MAX); + return false; + } + uint32_t newSize = ((protoSize / MEMORY_BLOCK_UNIT) + 1) * MEMORY_BLOCK_UNIT; + buf.resize(newSize); + pph = (ProtocolHead*)buf.data(); + } + return true; +} + +void SocketContext::ProcessProtocol(SocketContext* pssr, ProtocolHead* pph, uint32_t head_size) +{ + switch (pph->protoType & PROTOCOL_TYPE_FILTER) { + case PROTOCOL_TYPE_RAW: + pssr->RawProtocolProc(pph->protoType & (~PROTOCOL_TYPE_FILTER), pph->datas, pph->protoSize - head_size); + break; + case PROTOCOL_TYPE_PROTOBUF: + if (pssr->serviceBase_ != nullptr) { + pssr->serviceBase_->ProtocolProc(*pssr, pph->protoType & (~PROTOCOL_TYPE_FILTER), pph->datas, + pph->protoSize - head_size); + } + break; + default: + PROFILER_LOG_ERROR(LOG_CORE, "unknown protocol %d", pph->protoType); + break; + } +} + void* SocketContext::UnixSocketRecv(void* pp, void (*callback)()) { pthread_setname_np(pthread_self(), "UnixSocketRecv"); - uint32_t bufferSize = MEMORY_BLOCK_UNIT; - SocketContext* pssr = (SocketContext*)pp; - std::vector buf(bufferSize); + CHECK_TRUE(pssr->socketHandle_ != -1, nullptr, "UnixSocketRecv pssr->socketHandle_ == -1"); - struct ProtocolHead* pph = (struct ProtocolHead*)buf.data(); - uint32_t head_size = sizeof(struct ProtocolHead); + const uint32_t head_size = sizeof(ProtocolHead); + uint32_t bufferSize = MEMORY_BLOCK_UNIT; + std::vector buf(bufferSize); + ProtocolHead* pph = (ProtocolHead*)buf.data(); - CHECK_TRUE(pssr->socketHandle_ != -1, nullptr, "UnixSocketRecv pssr->socketHandle_ ==-1"); while (pssr->socketHandle_ >= 0) { if (!ReceiveData(pssr->socketHandle_, buf.data(), head_size)) { PROFILER_LOG_DEBUG(LOG_CORE, "====== IPC LOST CONNECT ======"); break; } - if (pph->protoSize > bufferSize) { - if (pph->protoSize > PROTO_SIZE_MAX) { - PROFILER_LOG_ERROR(LOG_CORE, "buffer size out of range %d/%d", pph->protoSize, PROTO_SIZE_MAX); - break; - } - bufferSize = (pph->protoSize / MEMORY_BLOCK_UNIT + 1) * MEMORY_BLOCK_UNIT; - buf.resize(bufferSize); - pph = (struct ProtocolHead*)buf.data(); + + if (!AdjustBufferSize(buf, pph, pph->protoSize)) { + break; } + if (pph->protoSize < head_size) { PROFILER_LOG_ERROR(LOG_CORE, "pph->protoSize is less than head_size!"); break; } + if (!ReceiveData(pssr->socketHandle_, buf.data() + head_size, pph->protoSize - head_size)) { PROFILER_LOG_DEBUG(LOG_CORE, "====== IPC LOST CONNECT ======"); break; } - switch (pph->protoType & PROTOCOL_TYPE_FILTER) { - case PROTOCOL_TYPE_RAW: { - pssr->RawProtocolProc(pph->protoType & (~PROTOCOL_TYPE_FILTER), pph->datas, pph->protoSize - head_size); - break; - } - case PROTOCOL_TYPE_PROTOBUF: - if (pssr->serviceBase_ != nullptr) { - pssr->serviceBase_->ProtocolProc(*pssr, pph->protoType & (~PROTOCOL_TYPE_FILTER), pph->datas, - pph->protoSize - head_size); - } - break; - default: - PROFILER_LOG_ERROR(LOG_CORE, "unknown protocol %d", pph->protoType); - break; - } + + ProcessProtocol(pssr, pph, head_size); } + if (callback) { callback(); } + pssr->clientState_ = CLIENT_STAT_THREAD_EXITED; PROFILER_LOG_DEBUG(LOG_CORE, "UnixSocketRecv thread exit"); return nullptr; diff --git a/device/services/ipc/src/unix_socket_server.cpp b/device/services/ipc/src/unix_socket_server.cpp index 28479d4918dbbf6d93082a2a3042620a0b20048b..5a4f7ff2a6b05236f65085233a7a18b0940e8efb 100644 --- a/device/services/ipc/src/unix_socket_server.cpp +++ b/device/services/ipc/src/unix_socket_server.cpp @@ -78,18 +78,26 @@ void UnixSocketServer::RemoveContext(int fd) PROFILER_LOG_ERROR(LOG_CORE, "RemoveContext Client %d not exist", fd); } } - -void UnixSocketServer::UnixSocketAccept(void (*callback)(int)) +bool UnixSocketServer::InitEpoll() { - pthread_setname_np(pthread_self(), "UnixSocketAccept"); - CHECK_TRUE(socketHandle_ != -1, NO_RETVAL, "Unix Socket Accept socketHandle_ == -1"); epfd_ = epoll_create(1); + if (epfd_ == -1) { + PROFILER_LOG_ERROR(LOG_CORE, "epoll_create failed, errno: %s", strerror(errno)); + return false; + } + struct epoll_event evt; evt.data.fd = socketHandle_; evt.events = EPOLLIN | EPOLLHUP; - CHECK_TRUE(epoll_ctl(epfd_, EPOLL_CTL_ADD, socketHandle_, &evt) != -1, NO_RETVAL, "Unix Socket Server Exit"); + CHECK_TRUE(epoll_ctl(epfd_, EPOLL_CTL_ADD, socketHandle_, &evt) != -1, false, "Unix Socket Server Exit"); + return true; +} + +void UnixSocketServer::EventLoop(void (*callback)(int)) +{ struct epoll_event events[EPOLL_MAX_TASK_COUNT]; int retryCount = 0; + while (socketHandle_ != -1) { int nfds = epoll_wait(epfd_, events, EPOLL_MAX_TASK_COUNT, EPOLL_WAIT_TIMEOUT); // timeout value set 1000. if (nfds == -1) { @@ -101,50 +109,75 @@ void UnixSocketServer::UnixSocketAccept(void (*callback)(int)) return; } } + for (int32_t i = 0; i < nfds; ++i) { if (events[i].events & EPOLLIN) { - int clientSocket = accept(socketHandle_, nullptr, nullptr); - CHECK_TRUE(clientSocket != -1, NO_RETVAL, "Accept Failed"); - PROFILER_LOG_INFO(LOG_CORE, "Accept A Client %d", clientSocket); - - struct epoll_event clientEvt; - clientEvt.data.fd = clientSocket; - clientEvt.events = EPOLLHUP; - CHECK_TRUE(epoll_ctl(epfd_, EPOLL_CTL_ADD, clientSocket, &clientEvt) != -1, - NO_RETVAL, "Unix Socket Server Exit"); - std::unique_lock lock(mtx_); - if (socketClients_.find(clientSocket) == socketClients_.end()) { - PROFILER_LOG_DEBUG(LOG_CORE, "new socketClients_ socketClients_.size() = %zu", - socketClients_.size()); - socketClients_[clientSocket] = std::make_shared(clientSocket, *serviceEntry_); - } else { - PROFILER_LOG_ERROR(LOG_CORE, "Client %d exist", clientSocket); - } + CHECK_TRUE(HandleAcceptEvent(), NO_RETVAL, "HandleAcceptEvent failed"); } else if (events[i].events & EPOLLHUP) { - std::unique_lock lock(mtx_); - if (socketClients_.find(events[i].data.fd) != socketClients_.end()) { - struct epoll_event delEvt; - delEvt.data.fd = events[i].data.fd; - delEvt.events = EPOLLHUP; - if (epoll_ctl(epfd_, EPOLL_CTL_DEL, events[i].data.fd, &delEvt) == -1) { - PROFILER_LOG_ERROR(LOG_CORE, "UnixSocketServer epoll_ctl failed, errno: %s", strerror(errno)); - } - lock.unlock(); - if (callback != nullptr) { - callback(events[i].data.fd); - } - std::unique_lock socketMapLock(mtx_); - PROFILER_LOG_DEBUG(LOG_CORE, "socketClients disconnect socketClients_.size() = %zu", - socketClients_.size()); - socketClients_.erase(events[i].data.fd); - } else { - PROFILER_LOG_ERROR(LOG_CORE, "Client %d not exist", events[i].data.fd); - } + HandleHangupEvent(events[i].data.fd, callback); } } } } +bool UnixSocketServer::HandleAcceptEvent() +{ + int clientSocket = accept(socketHandle_, nullptr, nullptr); + CHECK_TRUE(clientSocket != -1, false, "Accept Failed"); + PROFILER_LOG_INFO(LOG_CORE, "Accept A Client %d", clientSocket); + + struct epoll_event clientEvt; + clientEvt.data.fd = clientSocket; + clientEvt.events = EPOLLHUP; + + CHECK_TRUE(epoll_ctl(epfd_, EPOLL_CTL_ADD, clientSocket, &clientEvt) != -1, + false, "Unix Socket Server Exit"); + + std::unique_lock lock(mtx_); + if (socketClients_.find(clientSocket) == socketClients_.end()) { + PROFILER_LOG_DEBUG(LOG_CORE, "new socketClients_ socketClients_.size() = %zu", socketClients_.size()); + socketClients_[clientSocket] = std::make_shared(clientSocket, *serviceEntry_); + } else { + PROFILER_LOG_ERROR(LOG_CORE, "Client %d exist", clientSocket); + } + return true; +} + +void UnixSocketServer::HandleHangupEvent(int fd, void (*callback)(int)) +{ + std::unique_lock lock(mtx_); + if (socketClients_.find(fd) != socketClients_.end()) { + struct epoll_event delEvt; + delEvt.data.fd = fd; + delEvt.events = EPOLLHUP; + + if (epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, &delEvt) == -1) { + PROFILER_LOG_ERROR(LOG_CORE, "UnixSocketServer epoll_ctl failed, errno: %s", strerror(errno)); + } + lock.unlock(); + + if (callback != nullptr) { + callback(fd); + } + + std::unique_lock socketMapLock(mtx_); + PROFILER_LOG_DEBUG(LOG_CORE, "socketClients disconnect socketClients_.size() = %zu", socketClients_.size()); + socketClients_.erase(fd); + } else { + PROFILER_LOG_ERROR(LOG_CORE, "Client %d not exist", fd); + } +} + +void UnixSocketServer::UnixSocketAccept(void (*callback)(int)) +{ + pthread_setname_np(pthread_self(), "UnixSocketAccept"); + CHECK_TRUE(socketHandle_ != -1, NO_RETVAL, "Unix Socket Accept socketHandle_ == -1"); + + CHECK_TRUE(InitEpoll(), NO_RETVAL, "InitEpoll failed"); + + EventLoop(callback); +} + bool UnixSocketServer::StartServer(const std::string& addrname, ServiceEntry& p, void (*callback)(int)) { CHECK_TRUE(socketHandle_ == -1, false, "StartServer FAIL socketHandle_ != -1"); diff --git a/device/services/plugin_service/include/plugin_service.h b/device/services/plugin_service/include/plugin_service.h index 29fb314f060a07ac7d3983be0b31bec09f29b75b..ade0b496fda3566326a27162918ebda3cc2b5a34 100644 --- a/device/services/plugin_service/include/plugin_service.h +++ b/device/services/plugin_service/include/plugin_service.h @@ -115,6 +115,11 @@ public: private: bool StartService(const std::string& unixSocketName); + bool AddNewPlugin(const PluginInfo& pluginInfo); + bool UpdatePlugin(const PluginInfo& pluginInfo); + bool ProcessPluginStatus(const PluginResult& pr); + bool ProcessPluginData(const PluginResult& pr); + bool UpdatePluginOutFileName(const PluginResult& pr); SemaphorePtr GetSemaphore(uint32_t) const; void ReadShareMemoryOffline(PluginContext&); diff --git a/device/services/plugin_service/src/plugin_service.cpp b/device/services/plugin_service/src/plugin_service.cpp index 79ebf41f6798a12eddd421202f7bc835fa4b7406..59076b2615f14d745e1d3d6cc92e166bfecf07c6 100644 --- a/device/services/plugin_service/src/plugin_service.cpp +++ b/device/services/plugin_service/src/plugin_service.cpp @@ -342,64 +342,77 @@ PluginContextPtr PluginService::GetPluginContextById(uint32_t id) return pluginContext_[id]; } -bool PluginService::AddPluginInfo(const PluginInfo& pluginInfo) +bool PluginService::AddNewPlugin(const PluginInfo& pluginInfo) +{ + auto pluginCtx = std::make_shared(); + CHECK_NOTNULL(pluginCtx, false, "create PluginContext failed!"); + + ProfilerPluginCapability capability; + capability.set_path(pluginInfo.path); + capability.set_name(pluginInfo.name); + CHECK_TRUE(ProfilerCapabilityManager::GetInstance().AddCapability(capability), false, + "AddPluginInfo AddCapability FAIL"); + + pluginCtx->name = pluginInfo.name; + pluginCtx->path = pluginInfo.path; + pluginCtx->context = pluginInfo.context; + pluginCtx->config.set_name(pluginInfo.name); + pluginCtx->config.set_plugin_sha256(pluginInfo.sha256); + pluginCtx->profilerPluginState.set_name(pluginInfo.name); + pluginCtx->profilerPluginState.set_state(ProfilerPluginState::REGISTERED); + pluginCtx->sha256 = pluginInfo.sha256; + pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint; + pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData; + pluginCtx->outFileName = pluginInfo.outFileName; + pluginCtx->pluginVersion = pluginInfo.pluginVersion; + + uint32_t pluginId = ++pluginIdCounter_; + std::unique_lock lock(mutex_); + pluginContext_[pluginId] = pluginCtx; + nameIndex_[pluginInfo.name] = pluginId; + + PROFILER_LOG_DEBUG(LOG_CORE, "AddPluginInfo for %s done!", pluginInfo.name.c_str()); + return true; +} + +bool PluginService::UpdatePlugin(const PluginInfo& pluginInfo) { - if (nameIndex_.find(pluginInfo.name) == nameIndex_.end()) { // add new plugin - auto pluginCtx = std::make_shared(); - CHECK_NOTNULL(pluginCtx, false, "create PluginContext failed!"); - - ProfilerPluginCapability capability; - capability.set_path(pluginInfo.path); - capability.set_name(pluginInfo.name); - CHECK_TRUE(ProfilerCapabilityManager::GetInstance().AddCapability(capability), false, - "AddPluginInfo AddCapability FAIL"); - - pluginCtx->name = pluginInfo.name; - pluginCtx->path = pluginInfo.path; - pluginCtx->context = pluginInfo.context; - pluginCtx->config.set_name(pluginInfo.name); - pluginCtx->config.set_plugin_sha256(pluginInfo.sha256); - pluginCtx->profilerPluginState.set_name(pluginInfo.name); - pluginCtx->profilerPluginState.set_state(ProfilerPluginState::REGISTERED); + std::unique_lock lock(mutex_); + CHECK_TRUE(nameIndex_.count(pluginInfo.name) > 0, false, "plugin name %s not found!", pluginInfo.name.c_str()); + + uint32_t pluginId = nameIndex_[pluginInfo.name]; + CHECK_TRUE(pluginContext_.count(pluginId) > 0, false, "plugin id %u not found!", pluginId); + auto pluginCtx = pluginContext_[pluginId]; + + if (!pluginInfo.sha256.empty()) { pluginCtx->sha256 = pluginInfo.sha256; + } + if (pluginInfo.bufferSizeHint != 0) { pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint; + } + if (pluginInfo.isStandaloneFileData) { pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData; + } + if (!pluginInfo.outFileName.empty()) { pluginCtx->outFileName = pluginInfo.outFileName; + } + if (!pluginInfo.pluginVersion.empty()) { pluginCtx->pluginVersion = pluginInfo.pluginVersion; - - uint32_t pluginId = ++pluginIdCounter_; - std::unique_lock lock(mutex_); - pluginContext_[pluginId] = pluginCtx; - nameIndex_[pluginInfo.name] = pluginId; - } else { // update sha256 or bufferSizeHint - std::unique_lock lock(mutex_); - CHECK_TRUE(nameIndex_.count(pluginInfo.name) > 0, false, "plugin name %s not found!", pluginInfo.name.c_str()); - - uint32_t pluginId = nameIndex_[pluginInfo.name]; - CHECK_TRUE(pluginContext_.count(pluginId) > 0, false, "plugin id %u not found!", pluginId); - auto pluginCtx = pluginContext_[pluginId]; - - if (pluginInfo.sha256 != "") { - pluginCtx->sha256 = pluginInfo.sha256; - } - if (pluginInfo.bufferSizeHint != 0) { - pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint; - } - if (pluginInfo.isStandaloneFileData != false) { - pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData; - } - if (pluginInfo.outFileName != "") { - pluginCtx->outFileName = pluginInfo.outFileName; - } - if (pluginInfo.pluginVersion != "") { - pluginCtx->pluginVersion = pluginInfo.pluginVersion; - } } - PROFILER_LOG_DEBUG(LOG_CORE, "AddPluginInfo for %s done!", pluginInfo.name.c_str()); + PROFILER_LOG_DEBUG(LOG_CORE, "UpdatePluginInfo for %s done!", pluginInfo.name.c_str()); return true; } +bool PluginService::AddPluginInfo(const PluginInfo& pluginInfo) +{ + if (nameIndex_.find(pluginInfo.name) == nameIndex_.end()) { + return AddNewPlugin(pluginInfo); + } else { + return UpdatePlugin(pluginInfo); + } +} + bool PluginService::GetPluginInfo(const std::string& pluginName, PluginInfo& pluginInfo) { uint32_t pluginId = 0; @@ -547,6 +560,76 @@ void PluginService::FlushAllData(const std::string& pluginName) PROFILER_LOG_INFO(LOG_CORE, "FlushAllData for %s done!", pluginName.c_str()); } +bool PluginService::ProcessPluginStatus(const PluginResult& pr) +{ + ProfilerPluginState status = pr.status(); + uint32_t pluginId = pr.plugin_id(); + + if (!COMMON::CheckSubscribeVersion(status.version())) { + return true; + } + + PluginContextPtr pluginCtx = GetPluginContextById(pluginId); + CHECK_NOTNULL(pluginCtx, false, "plugin id %u not found!", pluginId); + + if (pluginCtx->profilerStateRepeater == nullptr) { + PROFILER_LOG_ERROR(LOG_CORE, "AppendResult profilerStateRepeater==nullptr %s %d", + status.name().c_str(), pluginId); + return false; + } + + if (!pluginCtx->profilerStateRepeater->PutPluginData(std::make_shared(status))) { + return false; + } + + return true; +} + +bool PluginService::ProcessPluginData(const PluginResult& pr) +{ + if (pr.data().empty()) { + return true; + } + + PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult Size : %zu", pr.data().size()); + + uint32_t pluginId = pr.plugin_id(); + PluginContextPtr pluginCtx = GetPluginContextById(pluginId); + CHECK_NOTNULL(pluginCtx, false, "plugin id %u not found!", pluginId); + + if (pluginCtx->profilerDataRepeater == nullptr) { + PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult profilerDataRepeater==nullptr %s %d", + pr.status().name().c_str(), pluginId); + return false; + } + + auto pluginData = std::make_shared(); + pluginData->set_name(pr.status().name()); + pluginData->set_status(0); + pluginData->set_data(pr.data()); + + if (!pluginCtx->profilerDataRepeater->PutPluginData(pluginData)) { + return false; + } + + return true; +} + +bool PluginService::UpdatePluginOutFileName(const PluginResult& pr) +{ + if (pr.data().size() > 0 || pr.out_file_name().empty()) { + return true; + } + + uint32_t pluginId = pr.plugin_id(); + std::unique_lock lock(mutex_); + CHECK_TRUE(pluginContext_.count(pluginId) > 0, false, "plugin id %u not found!", pluginId); + + pluginContext_[pluginId]->outFileName = pr.out_file_name(); + + return true; +} + bool PluginService::AppendResult(NotifyResultRequest& request) { pluginCommandBuilder_->GetedCommandResponse(request.command_id()); @@ -557,43 +640,22 @@ bool PluginService::AppendResult(NotifyResultRequest& request) int size = request.result_size(); PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult size:%d, cmd id:%d", size, request.command_id()); + for (int i = 0; i < size; i++) { - PluginResult pr = request.result(i); - ProfilerPluginState status = pr.status(); - uint32_t pluginId = pr.plugin_id(); - if (COMMON::CheckSubscribeVersion(status.version())) { - PluginContextPtr pluginCtx = GetPluginContextById(pluginId); - CHECK_NOTNULL(pluginCtx, false, "plugin id %u not found!", pluginId); - if (pluginCtx->profilerStateRepeater == nullptr) { - PROFILER_LOG_ERROR(LOG_CORE, "AppendResult profilerStateRepeater==nullptr %s %d", - pr.status().name().c_str(), pluginId); - return false; - } - if (!pluginCtx->profilerStateRepeater->PutPluginData(std::make_shared(status))) { - return false; - } + const PluginResult& pr = request.result(i); + + if (!ProcessPluginStatus(pr)) { + return false; + } + + if (!ProcessPluginData(pr)) { + return false; + } + + if (!UpdatePluginOutFileName(pr)) { + return false; } - if (pr.data().size() > 0) { - PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult Size : %zu", pr.data().size()); - PluginContextPtr pluginCtx = GetPluginContextById(pluginId); - CHECK_NOTNULL(pluginCtx, false, "plugin id %u not found!", pluginId); - if (pluginCtx->profilerDataRepeater == nullptr) { - PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult profilerDataRepeater==nullptr %s %d", - pr.status().name().c_str(), pluginId); - return false; - } - auto pluginData = std::make_shared(); - pluginData->set_name(pr.status().name()); - pluginData->set_status(0); - pluginData->set_data(pr.data()); - if (!pluginCtx->profilerDataRepeater->PutPluginData(pluginData)) { - return false; - } - } else if (pr.out_file_name() != "") { // updata plugin outFileName - std::unique_lock lock(mutex_); - CHECK_TRUE(pluginContext_.count(pluginId) > 0, false, "plugin id %u not found!", pluginId); - pluginContext_[pluginId]->outFileName = pr.out_file_name(); - } else { + if (pr.data().empty() && pr.out_file_name().empty()) { PROFILER_LOG_DEBUG(LOG_CORE, "Flush?Data From ShareMemory?"); } } diff --git a/device/services/profiler_service/src/profiler_service.cpp b/device/services/profiler_service/src/profiler_service.cpp index b5e37d1e815ccdf6dfd37bd5753a30fcdd15b01e..584b337773a9c86f0e97b6e63a16d067fb78a8bb 100644 --- a/device/services/profiler_service/src/profiler_service.cpp +++ b/device/services/profiler_service/src/profiler_service.cpp @@ -424,89 +424,168 @@ bool ProfilerService::RemoveSessionContext(uint32_t sessionId) return false; } -void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName, - const std::string& outputFile, const std::string& pluginVersion) +namespace { +bool CheckParamsAndPaths(const std::string& pluginName, const std::string& outputFile, + const std::string& resultFile, + std::string& checkedOutputFile, std::string& checkedResultFile) { if (pluginName.empty() || outputFile.empty()) { PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)", pluginName.c_str(), outputFile.c_str()); - return; + return false; } - auto retFile = COMMON::CheckNotExistsFilePath(outputFile); - if (!retFile.first) { + + auto retOutput = COMMON::CheckNotExistsFilePath(outputFile); + if (!retOutput.first) { PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, outputFile.c_str()); - return; - } - std::ifstream fsFile {}; // read from output file - fsFile.open(retFile.second, std::ios_base::in | std::ios_base::binary); - if (!fsFile.good()) { - PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate()); - return; + return false; } - auto targetFile = COMMON::CheckNotExistsFilePath(resultFile); - if (!targetFile.first) { + checkedOutputFile = retOutput.second; + + auto retResult = COMMON::CheckNotExistsFilePath(resultFile); + if (!retResult.first) { PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, resultFile.c_str()); - return; + return false; } - std::ofstream fsTarget {}; // write to profiler ouput file - fsTarget.open(targetFile.second, std::ios_base::in | std::ios_base::out | std::ios_base::binary); - if (!fsTarget.good()) { - PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate()); - return; + checkedResultFile = retResult.second; + + return true; +} + +bool OpenFiles(const std::string& inputFile, const std::string& outputFile, + std::ifstream& fsInput, std::ofstream& fsOutput) +{ + fsInput.open(inputFile, std::ios_base::in | std::ios_base::binary); + if (!fsInput.good()) { + PROFILER_LOG_ERROR(LOG_CORE, "Open input file(%s) failed: %d", inputFile.c_str(), fsInput.rdstate()); + return false; } - fsTarget.seekp(0, std::ios_base::end); - int posFile = fsTarget.tellp(); // for update sha256 - TraceFileHeader header {}; + fsOutput.open(outputFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary); + if (!fsOutput.good()) { + PROFILER_LOG_ERROR(LOG_CORE, "Open output file(%s) failed: %d", outputFile.c_str(), fsOutput.rdstate()); + return false; + } + + return true; +} + +bool WriteHeader(std::ofstream& fsOutput, const std::string& pluginName, + const std::string& pluginVersion, uint64_t fileSize, + TraceFileHeader& header) +{ if (pluginName == "hiperf-plugin") { header.data_.dataType = DataType::HIPERF_DATA; } else { header.data_.dataType = DataType::STANDALONE_DATA; } - fsFile.seekg(0, std::ios_base::end); - uint64_t fileSize = (uint64_t)(fsFile.tellg()); header.data_.length += fileSize; - size_t pluginSize = sizeof(header.data_.standalonePluginName); - int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1); + + int ret = strncpy_s(header.data_.standalonePluginName, sizeof(header.data_.standalonePluginName), + pluginName.c_str(), sizeof(header.data_.standalonePluginName) - 1); if (ret != EOK) { PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str()); - return; + return false; } - pluginSize = sizeof(header.data_.pluginVersion); - ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1); + + ret = strncpy_s(header.data_.pluginVersion, sizeof(header.data_.pluginVersion), + pluginVersion.c_str(), sizeof(header.data_.pluginVersion) - 1); if (ret != EOK) { PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str()); - return; + return false; } - fsTarget.write(reinterpret_cast(&header), sizeof(header)); - if (!fsTarget.good()) { - PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate()); - return; + + fsOutput.write(reinterpret_cast(&header), sizeof(header)); + if (!fsOutput.good()) { + PROFILER_LOG_ERROR(LOG_CORE, "Write header to file failed: %d", fsOutput.rdstate()); + return false; } - SHA256_CTX sha256Ctx; - SHA256_Init(&sha256Ctx); + return true; +} + +bool CopyFileAndCalculateSHA256(std::ifstream& fsInput, std::ofstream& fsOutput, + uint64_t fileSize, SHA256_CTX& sha256Ctx) +{ constexpr uint64_t bufSize = 4 * 1024 * 1024; std::vector buf(bufSize); - uint64_t readSize = 0; - fsFile.seekg(0); - while ((readSize = std::min(bufSize, fileSize)) > 0) { - fsFile.read(buf.data(), readSize); - fsTarget.write(buf.data(), readSize); - if (!fsTarget.good()) { - PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate()); - return; + + fsInput.seekg(0); + SHA256_Init(&sha256Ctx); + + while (fileSize > 0) { + uint64_t readSize = std::min(bufSize, fileSize); + fsInput.read(buf.data(), readSize); + if (!fsInput.good() && !fsInput.eof()) { + PROFILER_LOG_ERROR(LOG_CORE, "Read input file failed"); + return false; } - fileSize -= readSize; + fsOutput.write(buf.data(), readSize); + if (!fsOutput.good()) { + PROFILER_LOG_ERROR(LOG_CORE, "Write output file failed: %d", fsOutput.rdstate()); + return false; + } + fileSize -= readSize; SHA256_Update(&sha256Ctx, buf.data(), readSize); } + + return true; +} + +bool UpdateHeaderWithSHA256(std::ofstream& fsOutput, const TraceFileHeader& header, int64_t posFile) +{ + fsOutput.seekp(posFile, std::ios_base::beg); + fsOutput.write(reinterpret_cast(&header), sizeof(header)); + if (!fsOutput.good()) { + PROFILER_LOG_ERROR(LOG_CORE, "Update header with SHA256 failed: %d", fsOutput.rdstate()); + return false; + } + return true; +} +} + +void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName, + const std::string& outputFile, const std::string& pluginVersion) +{ + std::string checkedOutputFile; + std::string checkedResultFile; + if (!CheckParamsAndPaths(pluginName, outputFile, resultFile, checkedOutputFile, checkedResultFile)) { + return; + } + + std::ifstream fsInput; + std::ofstream fsOutput; + if (!OpenFiles(checkedOutputFile, checkedResultFile, fsInput, fsOutput)) { + return; + } + + fsOutput.seekp(0, std::ios_base::end); + int64_t posFile = fsOutput.tellp(); + + fsInput.seekg(0, std::ios_base::end); + uint64_t fileSize = static_cast(fsInput.tellg()); + + TraceFileHeader header {}; + if (!WriteHeader(fsOutput, pluginName, pluginVersion, fileSize, header)) { + PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed", resultFile.c_str()); + return; + } + + SHA256_CTX sha256Ctx; + if (!CopyFileAndCalculateSHA256(fsInput, fsOutput, fileSize, sha256Ctx)) { + PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed", resultFile.c_str()); + return; + } + SHA256_Final(header.data_.sha256, &sha256Ctx); - fsTarget.seekp(posFile, std::ios_base::beg); - fsTarget.write(reinterpret_cast(&header), sizeof(header)); - fsFile.close(); - fsTarget.close(); + if (!UpdateHeaderWithSHA256(fsOutput, header, posFile)) { + return; + } + + fsInput.close(); + fsOutput.close(); PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str()); }