From 3736067046bdea2f1da3db4465cf3ecb8a318474 Mon Sep 17 00:00:00 2001 From: yanhui Date: Thu, 7 Nov 2024 11:25:07 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E9=80=8F=E4=BC=A0=E6=80=BB=E7=BA=BF=E9=94=99=E8=AF=AF=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: yanhui Change-Id: I28d62e953129d43e81ac9378bdffb1bcd942731b --- .../communicator/src/communicator_context.cpp | 17 +++++++++++------ .../src/process_communicator_impl.cpp | 4 ++-- .../adapter/communicator/src/softbus_client.cpp | 17 ++++++++--------- .../adapter/communicator/src/softbus_client.h | 2 +- .../communicator/app_device_change_listener.h | 2 +- .../include/communicator/communicator_context.h | 2 +- .../communicator/process_communicator_impl.h | 2 +- .../app/src/kvstore_device_listener.cpp | 3 ++- .../app/src/kvstore_device_listener.h | 2 +- .../service/kvdb/kvdb_service_impl.cpp | 15 +++++++++++++-- 10 files changed, 41 insertions(+), 25 deletions(-) diff --git a/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp b/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp index 691b0aed2..ec11a5c8a 100644 --- a/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp @@ -17,6 +17,7 @@ #include "communicator_context.h" #include "log_print.h" #include "kvstore_utils.h" +#include "softbus_error_code.h" namespace OHOS::DistributedData { using KvUtils = OHOS::DistributedKv::KvStoreUtils; @@ -72,28 +73,32 @@ Status CommunicatorContext::UnRegSessionListener(const DevChangeListener *observ return Status::SUCCESS; } -void CommunicatorContext::NotifySessionReady(const std::string &deviceId) +void CommunicatorContext::NotifySessionReady(const std::string &deviceId, const int &errCode) { if (deviceId.empty()) { ZLOGE("deviceId empty"); return; } - devices_.Insert(deviceId, deviceId); + if (errCode == SOFTBUS_OK) { + devices_.Insert(deviceId, deviceId); + } DeviceInfo devInfo; devInfo.uuid = deviceId; { std::lock_guard lock(mutex_); for (const auto &observer : observers_) { if (observer != nullptr) { - observer->OnSessionReady(devInfo); + observer->OnSessionReady(devInfo, errCode); } } ZLOGI("Notify session begin, deviceId:%{public}s, observer count:%{public}zu", KvUtils::ToBeAnonymous(deviceId).c_str(), observers_.size()); } - std::lock_guard sessionLockGard(sessionMutex_); - if (closeListener_) { - closeListener_(deviceId); + if (errCode == SOFTBUS_OK) { + std::lock_guard sessionLockGard(sessionMutex_); + if (closeListener_) { + closeListener_(deviceId); + } } } diff --git a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp index 2de88fc75..6fb5e1377 100644 --- a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp @@ -221,7 +221,7 @@ void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const Devi onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE)); } -void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info) const +void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info, const int &errCode) const { std::lock_guard lock(sessionMutex_); if (sessionListener_ == nullptr) { @@ -229,7 +229,7 @@ void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info) const } DeviceInfos devInfos; devInfos.identifier = info.uuid; - sessionListener_(devInfos); + sessionListener_(devInfos, errCode); } std::shared_ptr ProcessCommunicatorImpl::GetExtendHeaderHandle(const ExtendInfo &info) diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp index 742628d5e..b01fadfbe 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -114,10 +114,8 @@ Status SoftBusClient::OpenConnect(const ISocketListener *listener) } ZLOGI("Bind Start, device:%{public}s socket:%{public}d type:%{public}u", KvStoreUtils::ToBeAnonymous(client->device_.deviceId).c_str(), clientSocket, type); - auto status = client->Open(clientSocket, QOS_INFOS[type % QOS_BUTT], listener); - if (status == Status::SUCCESS) { - Context::GetInstance().NotifySessionReady(client->device_.deviceId); - } + int32_t status = client->Open(clientSocket, QOS_INFOS[type % QOS_BUTT], listener); + Context::GetInstance().NotifySessionReady(client->device_.deviceId, status); client->isOpening_.store(false); }; Context::GetInstance().GetThreadPool()->Execute(task); @@ -138,7 +136,7 @@ Status SoftBusClient::CheckStatus() return Status::ERROR; } -Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener) +int32_t SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener) { int32_t status = ::Bind(socket, qos, QOS_COUNT, listener); ZLOGI("Bind %{public}s,session:%{public}s,socketId:%{public}d", @@ -148,14 +146,15 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe ZLOGE("[Bind] device:%{public}s socket failed, session:%{public}s,result:%{public}d", KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), status); ::Shutdown(socket); - return Status::NETWORK_ERROR; + return status; } UpdateExpireTime(); uint32_t mtu = 0; std::tie(status, mtu) = GetMtu(socket); if (status != SOFTBUS_OK) { - ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d", pipe_.pipeId.c_str(), socket_); - return Status::NETWORK_ERROR; + ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d, status:%{public}d", pipe_.pipeId.c_str(), socket_, + status); + return status; } { std::lock_guard lock(mutex_); @@ -166,7 +165,7 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe ZLOGI("open %{public}s, session:%{public}s success, socket:%{public}d", KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket_); ConnectManager::GetInstance()->OnSessionOpen(DmAdapter::GetInstance().GetDeviceInfo(device_.deviceId).networkId); - return Status::SUCCESS; + return status; } SoftBusClient::Time SoftBusClient::GetExpireTime() const diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h index 898a7ada7..4a9eeef21 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -52,7 +52,7 @@ public: bool isReuse = false; private: - Status Open(int32_t socket, const QosTV qos[], const ISocketListener *listener); + int32_t Open(int32_t socket, const QosTV qos[], const ISocketListener *listener); std::pair GetMtu(int32_t socket); Time CalcExpireTime() const; diff --git a/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h b/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h index 8d460299d..bba789978 100644 --- a/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h +++ b/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h @@ -32,7 +32,7 @@ public: { return ChangeLevelType::HIGH; } - API_EXPORT virtual void OnSessionReady(const DeviceInfo &info) const {} + API_EXPORT virtual void OnSessionReady(const DeviceInfo &info, const int &errCode) const {} }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/include/communicator/communicator_context.h b/services/distributeddataservice/adapter/include/communicator/communicator_context.h index a77a2db3a..1020c57de 100644 --- a/services/distributeddataservice/adapter/include/communicator/communicator_context.h +++ b/services/distributeddataservice/adapter/include/communicator/communicator_context.h @@ -36,7 +36,7 @@ public: std::shared_ptr GetThreadPool(); Status RegSessionListener(const DevChangeListener *observer); Status UnRegSessionListener(const DevChangeListener *observer); - void NotifySessionReady(const std::string &deviceId); + void NotifySessionReady(const std::string &deviceId, const int &errCode); void NotifySessionClose(const std::string &deviceId); void SetSessionListener(const OnCloseAble &closeAbleCallback); bool IsSessionReady(const std::string &deviceId); diff --git a/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h b/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h index ee32d1418..f16c43d54 100644 --- a/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h +++ b/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h @@ -57,7 +57,7 @@ public: std::vector GetRemoteOnlineDeviceInfosList() override; bool IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo) override; void OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const override; - void OnSessionReady(const DeviceInfo &info) const override; + void OnSessionReady(const DeviceInfo &info, const int &errCode) const override; API_EXPORT std::shared_ptr GetExtendHeaderHandle( const DistributedDB::ExtendInfo &info) override; diff --git a/services/distributeddataservice/app/src/kvstore_device_listener.cpp b/services/distributeddataservice/app/src/kvstore_device_listener.cpp index 1104cfc12..d3cd9cd52 100644 --- a/services/distributeddataservice/app/src/kvstore_device_listener.cpp +++ b/services/distributeddataservice/app/src/kvstore_device_listener.cpp @@ -34,8 +34,9 @@ void KvStoreDeviceListener::OnDeviceChanged( ZLOGI("device is %{public}d", type); } -void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info) const +void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info, const int &errCode) const { + (void)errCode; kvStoreDataService_.OnSessionReady(info); } diff --git a/services/distributeddataservice/app/src/kvstore_device_listener.h b/services/distributeddataservice/app/src/kvstore_device_listener.h index 037c367d1..849afdabe 100644 --- a/services/distributeddataservice/app/src/kvstore_device_listener.h +++ b/services/distributeddataservice/app/src/kvstore_device_listener.h @@ -26,7 +26,7 @@ public: void OnDeviceChanged( const AppDistributedKv::DeviceInfo &info, const AppDistributedKv::DeviceChangeType &type) const override; AppDistributedKv::ChangeLevelType GetChangeLevelType() const override; - void OnSessionReady(const AppDistributedKv::DeviceInfo &info) const override; + void OnSessionReady(const AppDistributedKv::DeviceInfo &info, const int &errCode = 0) const override; private: KvStoreDataService &kvStoreDataService_; diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp index 00911ffe6..9b61ca257 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp @@ -973,7 +973,7 @@ Status KVDBServiceImpl::DoSyncInOrder( if (uuids.empty()) { ZLOGW("no device seqId:0x%{public}" PRIx64 " remote:%{public}zu appId:%{public}s storeId:%{public}s", info.seqId, info.devices.size(), meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str()); - return Status::ERROR; + return Status::DEVICE_NOT_ONLINE; } if (IsNeedMetaSync(meta, uuids)) { auto recv = DeviceMatrix::GetInstance().GetRecvLevel(uuids[0], @@ -1122,7 +1122,18 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in std::to_string(info.syncId), DATA_TYPE, meta.dataType); std::map result; for (auto &[key, status] : dbResult) { - result[key] = ConvertDbStatus(status); + if (status < 0) { // pass on softbus error code + result[key] = static_cast(status); + } else { + if (status == DBStatus::COMM_FAILURE) { + if (DMAdapter::GetInstance().ToUUID(key).empty()) { + result[key] = Status::DEVICE_NOT_ONLINE; + } else { + result[key] = Status::PEER_DATABASE_NOT_EXIST; + } + } + result[key] = ConvertDbStatus(status); + } } for (const auto &device : info.devices) { auto it = result.find(device); -- Gitee From 2f590927e827dacdcd2899920682056d55de1871 Mon Sep 17 00:00:00 2001 From: yanhui Date: Mon, 11 Nov 2024 15:09:29 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E9=9D=9Eratelimit=E5=9C=BA=E6=99=AF?= =?UTF-8?q?=E7=9A=84=E9=80=8F=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: yanhui Change-Id: I149e7aed49ada9886c0e051da6513ea67f8fb993 --- .../communicator/src/app_pipe_handler.cpp | 4 +-- .../communicator/src/app_pipe_handler.h | 2 +- .../adapter/communicator/src/app_pipe_mgr.cpp | 8 ++--- .../adapter/communicator/src/app_pipe_mgr.h | 2 +- .../src/communication_provider_impl.cpp | 4 +-- .../src/communication_provider_impl.h | 2 +- .../communicator/src/communicator_context.cpp | 2 +- .../src/process_communicator_impl.cpp | 17 +++++---- .../communicator/src/softbus_adapter.h | 6 ++-- .../src/softbus_adapter_standard.cpp | 36 +++++++++++-------- .../communicator/src/softbus_client.cpp | 7 ++++ .../adapter/communicator/src/softbus_client.h | 2 ++ .../communication_provider_impl_test.cpp | 20 +++++------ .../softbus_adapter_standard_test.cpp | 4 +-- .../communicator/app_device_change_listener.h | 2 +- .../communicator/communication_provider.h | 5 +-- .../communicator/communicator_context.h | 2 +- .../communicator/process_communicator_impl.h | 2 +- .../app/src/kvstore_device_listener.cpp | 2 +- .../app/src/kvstore_device_listener.h | 2 +- .../service/kvdb/kvdb_service_impl.cpp | 21 ++++++----- 21 files changed, 87 insertions(+), 65 deletions(-) diff --git a/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.cpp b/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.cpp index c29f60aa3..3820f25fe 100644 --- a/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.cpp +++ b/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.cpp @@ -43,8 +43,8 @@ AppPipeHandler::AppPipeHandler(const PipeInfo &pipeInfo) softbusAdapter_ = SoftBusAdapter::GetInstance(); } -Status AppPipeHandler::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, - uint32_t totalLength, const MessageInfo &info) +std::pair AppPipeHandler::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info) { return softbusAdapter_->SendData(pipeInfo, deviceId, dataInfo, totalLength, info); } diff --git a/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.h b/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.h index 2a2508c5d..1fddf667b 100644 --- a/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.h +++ b/services/distributeddataservice/adapter/communicator/src/app_pipe_handler.h @@ -42,7 +42,7 @@ public: // stop DataChangeListener to watch data change; Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo); // Send data to other device, function will be called back after sent to notify send result. - Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, + std::pair SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info); bool IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer); diff --git a/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.cpp b/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.cpp index f37941470..8073d0c83 100644 --- a/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.cpp +++ b/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.cpp @@ -58,13 +58,13 @@ Status AppPipeMgr::StopWatchDataChange(const AppDataChangeListener *observer, co } // Send data to other device, function will be called back after sent to notify send result. -Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, - uint32_t totalLength, const MessageInfo &info) +std::pair AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info) { if (dataInfo.length > DataBuffer::MAX_TRANSFER_SIZE || dataInfo.length == 0 || dataInfo.data == nullptr || pipeInfo.pipeId.empty() || deviceId.deviceId.empty()) { ZLOGW("Input is invalid, maxSize:%u, current size:%u", DataBuffer::MAX_TRANSFER_SIZE, dataInfo.length); - return Status::ERROR; + return std::make_pair(Status::ERROR, 0); } ZLOGD("pipeInfo:%s ,size:%u, total length:%u", pipeInfo.pipeId.c_str(), dataInfo.length, totalLength); std::shared_ptr appPipeHandler; @@ -73,7 +73,7 @@ Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, auto it = dataBusMap_.find(pipeInfo.pipeId); if (it == dataBusMap_.end()) { ZLOGW("pipeInfo:%s not found", pipeInfo.pipeId.c_str()); - return Status::KEY_NOT_FOUND; + return std::make_pair(Status::KEY_NOT_FOUND, 0); } appPipeHandler = it->second; } diff --git a/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.h b/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.h index 222248c5a..1ffb56599 100644 --- a/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.h +++ b/services/distributeddataservice/adapter/communicator/src/app_pipe_mgr.h @@ -36,7 +36,7 @@ public: Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo); // Send data to other device, function will be called back after sent to notify send result. - Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, + std::pair SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info); // start server Status Start(const PipeInfo &pipeInfo); diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp index 8b8eb4888..65869012d 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp @@ -48,8 +48,8 @@ Status CommunicationProviderImpl::StopWatchDataChange(const AppDataChangeListene return appPipeMgr_.StopWatchDataChange(observer, pipeInfo); } -Status CommunicationProviderImpl::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, - uint32_t totalLength, const MessageInfo &info) +std::pair CommunicationProviderImpl::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info) { return appPipeMgr_.SendData(pipeInfo, deviceId, dataInfo, totalLength, info); } diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h index aac05a9fe..5f13ab415 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h @@ -36,7 +36,7 @@ public: Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo) override; // Send data to other device, function will be called back after sent to notify send result. - Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, + std::pair SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t totalLength, const MessageInfo &info) override; // start 1 server to listen data from other devices; diff --git a/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp b/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp index ec11a5c8a..76f63db9f 100644 --- a/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communicator_context.cpp @@ -73,7 +73,7 @@ Status CommunicatorContext::UnRegSessionListener(const DevChangeListener *observ return Status::SUCCESS; } -void CommunicatorContext::NotifySessionReady(const std::string &deviceId, const int &errCode) +void CommunicatorContext::NotifySessionReady(const std::string &deviceId, int32_t errCode) { if (deviceId.empty()) { ZLOGE("deviceId empty"); diff --git a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp index 6fb5e1377..361214579 100644 --- a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp @@ -139,14 +139,17 @@ DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const const DataInfo dataInfo = { const_cast(data), length}; DeviceId destination; destination.deviceId = dstDevInfo.identifier; - Status errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength); - if (errCode == Status::RATE_LIMIT) { - ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast(errCode)); + auto errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength); + if (errCode.first == Status::RATE_LIMIT) { + ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast(errCode.second)); return DBStatus::RATE_LIMIT; } - if (errCode != Status::SUCCESS) { - ZLOGE("commProvider_ SendData Fail."); - return DBStatus::DB_ERROR; + if (errCode.first != Status::SUCCESS) { + ZLOGE("commProvider_ SendData Fail. code:%{public}d", errCode.second); + if (errCode.second == 0) { + return DBStatus::DB_ERROR; + } + return static_cast(errCode.second); } return DBStatus::OK; } @@ -221,7 +224,7 @@ void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const Devi onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE)); } -void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info, const int &errCode) const +void ProcessCommunicatorImpl::OnSessionReady(const DeviceInfo &info, int32_t errCode) const { std::lock_guard lock(sessionMutex_); if (sessionListener_ == nullptr) { diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index c1d5ffeaf..f478aaef7 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -52,8 +52,8 @@ public: Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo); // Send data to other device, function will be called back after sent to notify send result. - Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t length, - const MessageInfo &info); + std::pair SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const DataInfo &dataInfo, uint32_t length, const MessageInfo &info); bool IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer); @@ -83,7 +83,6 @@ public: void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, const AppDistributedKv::DeviceChangeType &type) const override; - private: using Time = std::chrono::steady_clock::time_point; using Duration = std::chrono::steady_clock::duration; @@ -95,6 +94,7 @@ private: void Reuse(const PipeInfo &pipeInfo, const DeviceId &deviceId, uint32_t qosType, std::shared_ptr &conn); void GetExpireTime(std::shared_ptr &conn); + std::pair GetParams(const std::string &deviceId); static constexpr const char *PKG_NAME = "distributeddata-default"; static constexpr Time INVALID_NEXT = std::chrono::steady_clock::time_point::max(); static constexpr uint32_t QOS_COUNT = 3; diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index 8edf7755d..d0aefae8c 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -189,40 +189,46 @@ void SoftBusAdapter::GetExpireTime(std::shared_ptr &conn) } } -Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, - uint32_t length, const MessageInfo &info) +std::pair SoftBusAdapter::GetParams(const std::string &deviceId) { - std::shared_ptr conn; - bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId); + bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId); uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR; + return std::make_pair(qosType, isOHOSType); +} + +std::pair SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const DataInfo &dataInfo, uint32_t length, const MessageInfo &info) +{ + std::shared_ptr conn; + auto param = GetParams(deviceId.deviceId); bool isReuse = false; - connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, isOHOSType, &isReuse](const auto &key, + connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, ¶m, &isReuse](const auto &key, std::vector> &connects) -> bool { for (auto &connect : connects) { - if (connect->GetQoSType() != qosType) { + if (connect->GetQoSType() != param.first) { continue; } - if (!isOHOSType && connect->needRemove) { + if (!param.second && connect->needRemove) { isReuse = true; return false; } conn = connect; return true; } - auto connect = std::make_shared(pipeInfo, deviceId, qosType); + auto connect = std::make_shared(pipeInfo, deviceId, param.first); connects.emplace_back(connect); conn = connect; return true; }); - if (!isOHOSType && isReuse) { - Reuse(pipeInfo, deviceId, qosType, conn); + if (!param.second && isReuse) { + Reuse(pipeInfo, deviceId, param.first, conn); } if (conn == nullptr) { - return Status::ERROR; + return std::make_pair(Status::ERROR, 0); } auto status = conn->CheckStatus(); if (status == Status::RATE_LIMIT) { - return Status::RATE_LIMIT; + return std::make_pair(Status::RATE_LIMIT, 0); } if (status != Status::SUCCESS) { auto task = [this, connect = std::weak_ptr(conn)]() { @@ -233,14 +239,14 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device }; auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId; ConnectManager::GetInstance()->ApplyConnect(networkId, task); - return Status::RATE_LIMIT; + return std::make_pair(Status::RATE_LIMIT, 0); } - status = conn->SendData(dataInfo, &clientListener_); if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) { GetExpireTime(conn); } - return status; + auto errCode = conn->GetInnerStatus(); + return std::make_pair(status, errCode); } void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId) diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp index b01fadfbe..e80ffb91f 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -75,12 +75,19 @@ Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener * if (ret != SOFTBUS_OK) { expireTime_ = std::chrono::steady_clock::now(); ZLOGE("send data to socket%{public}d failed, ret:%{public}d.", socket_, ret); + innerError_ = ret; return Status::ERROR; } + innerError_ = 0; expireTime_ = CalcExpireTime(); return Status::SUCCESS; } +int32_t SoftBusClient::GetInnerStatus() +{ + return innerError_; +} + Status SoftBusClient::OpenConnect(const ISocketListener *listener) { std::lock_guard lock(mutex_); diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h index 4a9eeef21..9c5dcf4c2 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -48,6 +48,7 @@ public: int32_t GetSocket() const; uint32_t GetQoSType() const; void UpdateExpireTime(); + int32_t GetInnerStatus(); bool needRemove = false; bool isReuse = false; @@ -85,6 +86,7 @@ private: int32_t socket_ = INVALID_SOCKET_ID; int32_t bindState_ = -1; + int32_t innerError_ = 0; }; } // namespace OHOS::AppDistributedKv diff --git a/services/distributeddataservice/adapter/communicator/test/unittest/communication_provider_impl_test.cpp b/services/distributeddataservice/adapter/communicator/test/unittest/communication_provider_impl_test.cpp index 58c8e5340..6c82c8b76 100644 --- a/services/distributeddataservice/adapter/communicator/test/unittest/communication_provider_impl_test.cpp +++ b/services/distributeddataservice/adapter/communicator/test/unittest/communication_provider_impl_test.cpp @@ -133,8 +133,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider005, TestSize.Level const uint8_t *t = reinterpret_cast(content.c_str()); DeviceId di17 = {"127.0.0.2"}; DataInfo data = { const_cast(t), static_cast(content.length())}; - Status status = CommunicationProvider::GetInstance().SendData(id17, di17, data, 0); - EXPECT_NE(status, Status::SUCCESS); + auto status = CommunicationProvider::GetInstance().SendData(id17, di17, data, 0); + EXPECT_NE(status.first, Status::SUCCESS); CommunicationProvider::GetInstance().StopWatchDataChange(dataListener17, id17); CommunicationProvider::GetInstance().Stop(id17); delete dataListener17; @@ -234,8 +234,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider011, TestSize.Level const uint8_t *t = reinterpret_cast(content.c_str()); DeviceId di = {"DeviceId"}; DataInfo data = { const_cast(t), static_cast(content.length())}; - Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); - EXPECT_EQ(status, Status::ERROR); + auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); + EXPECT_EQ(status.first, Status::ERROR); CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id); CommunicationProvider::GetInstance().Stop(id); delete dataListener; @@ -259,8 +259,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider012, TestSize.Level const uint8_t *t = reinterpret_cast(content.c_str()); DeviceId di = {""}; DataInfo data = { const_cast(t), static_cast(content.length())}; - Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); - EXPECT_EQ(status, Status::ERROR); + auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); + EXPECT_EQ(status.first, Status::ERROR); CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id); CommunicationProvider::GetInstance().Stop(id); delete dataListener; @@ -282,8 +282,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider013, TestSize.Level CommunicationProvider::GetInstance().Start(id); DeviceId di = {"DeviceId"}; DataInfo data = {nullptr, 0}; - Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); - EXPECT_EQ(status, Status::ERROR); + auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); + EXPECT_EQ(status.first, Status::ERROR); CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id); CommunicationProvider::GetInstance().Stop(id); delete dataListener; @@ -307,8 +307,8 @@ HWTEST_F(CommunicationProviderImplTest, CommunicationProvider014, TestSize.Level const uint8_t *t = reinterpret_cast(content.c_str()); DeviceId di = {"DeviceId"}; DataInfo data = { const_cast(t), static_cast(content.length())}; - Status status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); - EXPECT_EQ(status, Status::ERROR); + auto status = CommunicationProvider::GetInstance().SendData(id, di, data, 0); + EXPECT_EQ(status.first, Status::ERROR); CommunicationProvider::GetInstance().StopWatchDataChange(dataListener, id); CommunicationProvider::GetInstance().Stop(id); delete dataListener; diff --git a/services/distributeddataservice/adapter/communicator/test/unittest/softbus_adapter_standard_test.cpp b/services/distributeddataservice/adapter/communicator/test/unittest/softbus_adapter_standard_test.cpp index 455f5249a..df8207da8 100644 --- a/services/distributeddataservice/adapter/communicator/test/unittest/softbus_adapter_standard_test.cpp +++ b/services/distributeddataservice/adapter/communicator/test/unittest/softbus_adapter_standard_test.cpp @@ -155,8 +155,8 @@ HWTEST_F(SoftbusAdapterStandardTest, SendData, TestSize.Level1) const uint8_t *t = reinterpret_cast(content.c_str()); DeviceId di = {"DeviceId"}; DataInfo data = { const_cast(t), static_cast(content.length())}; - Status status = SoftBusAdapter::GetInstance()->SendData(id, di, data, 11, { MessageType::DEFAULT }); - EXPECT_NE(status, Status::SUCCESS); + auto status = SoftBusAdapter::GetInstance()->SendData(id, di, data, 11, { MessageType::DEFAULT }); + EXPECT_NE(status.first, Status::SUCCESS); SoftBusAdapter::GetInstance()->StopWatchDataChange(dataListener, id); delete dataListener; } diff --git a/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h b/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h index bba789978..94b6b9f83 100644 --- a/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h +++ b/services/distributeddataservice/adapter/include/communicator/app_device_change_listener.h @@ -32,7 +32,7 @@ public: { return ChangeLevelType::HIGH; } - API_EXPORT virtual void OnSessionReady(const DeviceInfo &info, const int &errCode) const {} + API_EXPORT virtual void OnSessionReady(const DeviceInfo &info, int32_t errCode) const {} }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/include/communicator/communication_provider.h b/services/distributeddataservice/adapter/include/communicator/communication_provider.h index e694ae534..94cd0ed4e 100644 --- a/services/distributeddataservice/adapter/include/communicator/communication_provider.h +++ b/services/distributeddataservice/adapter/include/communicator/communication_provider.h @@ -45,8 +45,9 @@ public: virtual Status StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo) = 0; // Send data to other device, function will be called back after sent to notify send result - virtual Status SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, - uint32_t totalLength, const MessageInfo &info = { MessageType::DEFAULT }) = 0; + virtual std::pair SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const DataInfo &dataInfo, uint32_t totalLength, + const MessageInfo &info = { MessageType::DEFAULT }) = 0; // start one server to listen data from other devices; virtual Status Start(const PipeInfo &pipeInfo) = 0; diff --git a/services/distributeddataservice/adapter/include/communicator/communicator_context.h b/services/distributeddataservice/adapter/include/communicator/communicator_context.h index 1020c57de..ce09ab58a 100644 --- a/services/distributeddataservice/adapter/include/communicator/communicator_context.h +++ b/services/distributeddataservice/adapter/include/communicator/communicator_context.h @@ -36,7 +36,7 @@ public: std::shared_ptr GetThreadPool(); Status RegSessionListener(const DevChangeListener *observer); Status UnRegSessionListener(const DevChangeListener *observer); - void NotifySessionReady(const std::string &deviceId, const int &errCode); + void NotifySessionReady(const std::string &deviceId, int32_t errCode); void NotifySessionClose(const std::string &deviceId); void SetSessionListener(const OnCloseAble &closeAbleCallback); bool IsSessionReady(const std::string &deviceId); diff --git a/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h b/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h index f16c43d54..4620c0fc4 100644 --- a/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h +++ b/services/distributeddataservice/adapter/include/communicator/process_communicator_impl.h @@ -57,7 +57,7 @@ public: std::vector GetRemoteOnlineDeviceInfosList() override; bool IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo) override; void OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const override; - void OnSessionReady(const DeviceInfo &info, const int &errCode) const override; + void OnSessionReady(const DeviceInfo &info, int32_t errCode) const override; API_EXPORT std::shared_ptr GetExtendHeaderHandle( const DistributedDB::ExtendInfo &info) override; diff --git a/services/distributeddataservice/app/src/kvstore_device_listener.cpp b/services/distributeddataservice/app/src/kvstore_device_listener.cpp index d3cd9cd52..65a741b59 100644 --- a/services/distributeddataservice/app/src/kvstore_device_listener.cpp +++ b/services/distributeddataservice/app/src/kvstore_device_listener.cpp @@ -34,7 +34,7 @@ void KvStoreDeviceListener::OnDeviceChanged( ZLOGI("device is %{public}d", type); } -void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info, const int &errCode) const +void KvStoreDeviceListener::OnSessionReady(const AppDistributedKv::DeviceInfo &info, int32_t errCode) const { (void)errCode; kvStoreDataService_.OnSessionReady(info); diff --git a/services/distributeddataservice/app/src/kvstore_device_listener.h b/services/distributeddataservice/app/src/kvstore_device_listener.h index 849afdabe..f9ee50585 100644 --- a/services/distributeddataservice/app/src/kvstore_device_listener.h +++ b/services/distributeddataservice/app/src/kvstore_device_listener.h @@ -26,7 +26,7 @@ public: void OnDeviceChanged( const AppDistributedKv::DeviceInfo &info, const AppDistributedKv::DeviceChangeType &type) const override; AppDistributedKv::ChangeLevelType GetChangeLevelType() const override; - void OnSessionReady(const AppDistributedKv::DeviceInfo &info, const int &errCode = 0) const override; + void OnSessionReady(const AppDistributedKv::DeviceInfo &info, int32_t errCode = 0) const override; private: KvStoreDataService &kvStoreDataService_; diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp index 9b61ca257..1b4ea6fb4 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp @@ -1122,16 +1122,13 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in std::to_string(info.syncId), DATA_TYPE, meta.dataType); std::map result; for (auto &[key, status] : dbResult) { - if (status < 0) { // pass on softbus error code - result[key] = static_cast(status); - } else { - if (status == DBStatus::COMM_FAILURE) { - if (DMAdapter::GetInstance().ToUUID(key).empty()) { - result[key] = Status::DEVICE_NOT_ONLINE; - } else { - result[key] = Status::PEER_DATABASE_NOT_EXIST; - } + if (status == DBStatus::COMM_FAILURE) { + if (DMAdapter::GetInstance().ToUUID(key).empty()) { + result[key] = Status::DEVICE_NOT_ONLINE; + } else { + result[key] = Status::PEER_DATABASE_NOT_EXIST; } + } else { result[key] = ConvertDbStatus(status); } } @@ -1179,6 +1176,12 @@ uint32_t KVDBServiceImpl::GetSyncDelayTime(uint32_t delay, const StoreId &storeI Status KVDBServiceImpl::ConvertDbStatus(DBStatus status) const { + auto innerStatus = static_cast(status); + if (innerStatus < 0) { + ZLOGW("passthrough error code:%{public}d", innerStatus); + return static_cast(status); + } + switch (status) { case DBStatus::BUSY: // fallthrough case DBStatus::DB_ERROR: -- Gitee From 14ab547e313619951b0630bdacf2921784167faf Mon Sep 17 00:00:00 2001 From: yanhui Date: Tue, 12 Nov 2024 14:33:36 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E9=9A=94=E7=A6=BB=E5=AF=B9JS=E4=BE=A7?= =?UTF-8?q?=E7=9A=84=E5=BD=B1=E5=93=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: yanhui Change-Id: I990774166cc6ac76b9ea14c6989ccf0449031397 --- .../service/kvdb/kvdb_service_impl.cpp | 35 +++++++++++-------- .../service/kvdb/kvdb_service_impl.h | 1 + 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp index 1b4ea6fb4..c6e6ae977 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp @@ -1121,14 +1121,10 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in SYNC_STORE_ID, Anonymous::Change(meta.storeId), SYNC_APP_ID, meta.bundleName, CONCURRENT_ID, std::to_string(info.syncId), DATA_TYPE, meta.dataType); std::map result; - for (auto &[key, status] : dbResult) { - if (status == DBStatus::COMM_FAILURE) { - if (DMAdapter::GetInstance().ToUUID(key).empty()) { - result[key] = Status::DEVICE_NOT_ONLINE; - } else { - result[key] = Status::PEER_DATABASE_NOT_EXIST; - } - } else { + if (AccessTokenKit::GetTokenTypeFlag(meta.tokenId) != TOKEN_HAP) { + result = ConvertSyncStatusForNative(dbResult); + } else { + for (auto &[key, status] : dbResult) { result[key] = ConvertDbStatus(status); } } @@ -1153,6 +1149,23 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in return SUCCESS; } +std::map KVDBServiceImpl::ConvertSyncStatusForNative(const DBResult &dbResult) +{ + std::map result; + for (auto &[key, status] : dbResult) { + auto innerStatus = static_cast(status); + if (innerStatus < 0) { + ZLOGW("Directly transmit error code. code:%{public}d", innerStatus); + result[key] = static_cast(status); + } else if (status == DBStatus::COMM_FAILURE) { + result[key] = Status::DEVICE_NOT_ONLINE; + } else { + result[key] = ConvertDbStatus(status); + } + } + return result; +} + uint32_t KVDBServiceImpl::GetSyncDelayTime(uint32_t delay, const StoreId &storeId) { if (delay != 0) { @@ -1176,12 +1189,6 @@ uint32_t KVDBServiceImpl::GetSyncDelayTime(uint32_t delay, const StoreId &storeI Status KVDBServiceImpl::ConvertDbStatus(DBStatus status) const { - auto innerStatus = static_cast(status); - if (innerStatus < 0) { - ZLOGW("passthrough error code:%{public}d", innerStatus); - return static_cast(status); - } - switch (status) { case DBStatus::BUSY: // fallthrough case DBStatus::DB_ERROR: diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.h b/services/distributeddataservice/service/kvdb/kvdb_service_impl.h index c2fcef042..3d07db047 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.h +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.h @@ -150,6 +150,7 @@ private: void TryToSync(const StoreMetaData &metaData, bool force = false); bool IsRemoteChange(const StoreMetaData &metaData, const std::string &device); bool IsOHOSType(const std::vector &ids); + std::map ConvertSyncStatusForNative(const DBResult &dbResult); static Factory factory_; ConcurrentMap syncAgents_; std::shared_ptr executors_; -- Gitee From 79bbc98b60208eeaa8e31a7d6463610fec8cad47 Mon Sep 17 00:00:00 2001 From: yanhui Date: Tue, 12 Nov 2024 23:21:43 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E8=AF=84=E5=AE=A1=E6=84=8F=E8=A7=81?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: yanhui Change-Id: If60053a03ed15440f1c62786926b3ba2ec13f278 --- .../src/process_communicator_impl.cpp | 15 +++--- .../communicator/src/softbus_adapter.h | 2 +- .../src/softbus_adapter_standard.cpp | 48 +++++++++---------- .../communicator/src/softbus_client.cpp | 9 ++-- .../adapter/communicator/src/softbus_client.h | 4 +- .../service/kvdb/kvdb_service_impl.cpp | 25 +++++----- .../service/kvdb/kvdb_service_impl.h | 2 +- 7 files changed, 52 insertions(+), 53 deletions(-) diff --git a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp index 361214579..b3d43e80a 100644 --- a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp @@ -139,17 +139,18 @@ DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const const DataInfo dataInfo = { const_cast(data), length}; DeviceId destination; destination.deviceId = dstDevInfo.identifier; - auto errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength); - if (errCode.first == Status::RATE_LIMIT) { - ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast(errCode.second)); + auto [errCode, softBusErrCode] = + CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength); + if (errCode == Status::RATE_LIMIT) { + ZLOGD("commProvider_ opening session, status:%{public}d.", static_cast(softBusErrCode)); return DBStatus::RATE_LIMIT; } - if (errCode.first != Status::SUCCESS) { - ZLOGE("commProvider_ SendData Fail. code:%{public}d", errCode.second); - if (errCode.second == 0) { + if (errCode != Status::SUCCESS) { + ZLOGE("commProvider_ SendData Fail. code:%{public}d", softBusErrCode); + if (softBusErrCode == 0) { return DBStatus::DB_ERROR; } - return static_cast(errCode.second); + return static_cast(softBusErrCode); } return DBStatus::OK; } diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index f478aaef7..5e70c3013 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -94,7 +94,7 @@ private: void Reuse(const PipeInfo &pipeInfo, const DeviceId &deviceId, uint32_t qosType, std::shared_ptr &conn); void GetExpireTime(std::shared_ptr &conn); - std::pair GetParams(const std::string &deviceId); + std::pair OpenConnect(const std::shared_ptr &conn, const DeviceId &deviceId); static constexpr const char *PKG_NAME = "distributeddata-default"; static constexpr Time INVALID_NEXT = std::chrono::steady_clock::time_point::max(); static constexpr uint32_t QOS_COUNT = 3; diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index d0aefae8c..73d1bbe1c 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -189,39 +189,33 @@ void SoftBusAdapter::GetExpireTime(std::shared_ptr &conn) } } -std::pair SoftBusAdapter::GetParams(const std::string &deviceId) -{ - bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId); - uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR; - return std::make_pair(qosType, isOHOSType); -} - std::pair SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo, uint32_t length, const MessageInfo &info) { std::shared_ptr conn; - auto param = GetParams(deviceId.deviceId); + bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId); + uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR; bool isReuse = false; - connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, ¶m, &isReuse](const auto &key, + connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, isOHOSType, &isReuse](const auto &key, std::vector> &connects) -> bool { for (auto &connect : connects) { - if (connect->GetQoSType() != param.first) { + if (connect->GetQoSType() != qosType) { continue; } - if (!param.second && connect->needRemove) { + if (!isOHOSType && connect->needRemove) { isReuse = true; return false; } conn = connect; return true; } - auto connect = std::make_shared(pipeInfo, deviceId, param.first); + auto connect = std::make_shared(pipeInfo, deviceId, qosType); connects.emplace_back(connect); conn = connect; return true; }); - if (!param.second && isReuse) { - Reuse(pipeInfo, deviceId, param.first, conn); + if (!isOHOSType && isReuse) { + Reuse(pipeInfo, deviceId, qosType, conn); } if (conn == nullptr) { return std::make_pair(Status::ERROR, 0); @@ -231,24 +225,30 @@ std::pair SoftBusAdapter::SendData(const PipeInfo &pipeInfo, co return std::make_pair(Status::RATE_LIMIT, 0); } if (status != Status::SUCCESS) { - auto task = [this, connect = std::weak_ptr(conn)]() { - auto conn = connect.lock(); - if (conn != nullptr) { - conn->OpenConnect(&clientListener_); - } - }; - auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId; - ConnectManager::GetInstance()->ApplyConnect(networkId, task); - return std::make_pair(Status::RATE_LIMIT, 0); + return OpenConnect(conn, deviceId); } status = conn->SendData(dataInfo, &clientListener_); if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) { GetExpireTime(conn); } - auto errCode = conn->GetInnerStatus(); + auto errCode = conn->GetSoftBusError(); return std::make_pair(status, errCode); } +std::pair SoftBusAdapter::OpenConnect(const std::shared_ptr &conn, + const DeviceId &deviceId) +{ + auto task = [this, connect = std::weak_ptr(conn)]() { + auto conn = connect.lock(); + if (conn != nullptr) { + conn->OpenConnect(&clientListener_); + } + }; + auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId; + ConnectManager::GetInstance()->ApplyConnect(networkId, task); + return std::make_pair(Status::RATE_LIMIT, 0); +} + void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId) { std::shared_ptr conn; diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp index e80ffb91f..9dfb23b2c 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -75,17 +75,18 @@ Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener * if (ret != SOFTBUS_OK) { expireTime_ = std::chrono::steady_clock::now(); ZLOGE("send data to socket%{public}d failed, ret:%{public}d.", socket_, ret); - innerError_ = ret; + softBusError_ = ret; return Status::ERROR; } - innerError_ = 0; + softBusError_ = 0; expireTime_ = CalcExpireTime(); return Status::SUCCESS; } -int32_t SoftBusClient::GetInnerStatus() +int32_t SoftBusClient::GetSoftBusError() { - return innerError_; + std::lock_guard lock(mutex_); + return softBusError_; } Status SoftBusClient::OpenConnect(const ISocketListener *listener) diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h index 9c5dcf4c2..1547e32d8 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -48,7 +48,7 @@ public: int32_t GetSocket() const; uint32_t GetQoSType() const; void UpdateExpireTime(); - int32_t GetInnerStatus(); + int32_t GetSoftBusError(); bool needRemove = false; bool isReuse = false; @@ -86,7 +86,7 @@ private: int32_t socket_ = INVALID_SOCKET_ID; int32_t bindState_ = -1; - int32_t innerError_ = 0; + int32_t softBusError_ = 0; }; } // namespace OHOS::AppDistributedKv diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp index c6e6ae977..6748a6b6b 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp @@ -1122,7 +1122,9 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in std::to_string(info.syncId), DATA_TYPE, meta.dataType); std::map result; if (AccessTokenKit::GetTokenTypeFlag(meta.tokenId) != TOKEN_HAP) { - result = ConvertSyncStatusForNative(dbResult); + for (auto &[key, status] : dbResult) { + result[key] = ConvertDbStatusNative(status); + } } else { for (auto &[key, status] : dbResult) { result[key] = ConvertDbStatus(status); @@ -1149,21 +1151,16 @@ Status KVDBServiceImpl::DoComplete(const StoreMetaData &meta, const SyncInfo &in return SUCCESS; } -std::map KVDBServiceImpl::ConvertSyncStatusForNative(const DBResult &dbResult) +Status KVDBServiceImpl::ConvertDbStatusNative(DBStatus status) { - std::map result; - for (auto &[key, status] : dbResult) { - auto innerStatus = static_cast(status); - if (innerStatus < 0) { - ZLOGW("Directly transmit error code. code:%{public}d", innerStatus); - result[key] = static_cast(status); - } else if (status == DBStatus::COMM_FAILURE) { - result[key] = Status::DEVICE_NOT_ONLINE; - } else { - result[key] = ConvertDbStatus(status); - } + auto innerStatus = static_cast(status); + if (innerStatus < 0) { + return static_cast(status); + } else if (status == DBStatus::COMM_FAILURE) { + return Status::DEVICE_NOT_ONLINE; + } else { + return ConvertDbStatus(status); } - return result; } uint32_t KVDBServiceImpl::GetSyncDelayTime(uint32_t delay, const StoreId &storeId) diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.h b/services/distributeddataservice/service/kvdb/kvdb_service_impl.h index 3d07db047..249578772 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.h +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.h @@ -150,7 +150,7 @@ private: void TryToSync(const StoreMetaData &metaData, bool force = false); bool IsRemoteChange(const StoreMetaData &metaData, const std::string &device); bool IsOHOSType(const std::vector &ids); - std::map ConvertSyncStatusForNative(const DBResult &dbResult); + Status ConvertDbStatusNative(DBStatus status); static Factory factory_; ConcurrentMap syncAgents_; std::shared_ptr executors_; -- Gitee