From 9a8230c8cba1b4c88191987f43fd16f790b10e41 Mon Sep 17 00:00:00 2001 From: houpengtao Date: Fri, 11 Nov 2022 15:43:34 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=B8=8A=E7=BA=BF=E9=98=B6=E6=AE=B5?= =?UTF-8?q?=E6=8C=89=E7=85=A7=E6=95=B0=E6=8D=AE=E5=A4=A7=E5=B0=8F=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E9=80=9A=E9=81=93=EF=BC=8C=E5=B9=B6=E4=BB=8E=E8=BD=AF?= =?UTF-8?q?=E6=80=BB=E7=BA=BF=E8=8E=B7=E5=8F=96MTU=E5=A4=A7=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: houpengtao --- .../adapter/communicator/BUILD.gn | 5 +- .../src/communication_provider_impl.cpp | 8 +- .../src/communication_provider_impl.h | 1 + .../src/communication_strategy.cpp | 65 ++---- .../src/communication_strategy_impl.cpp | 37 ++++ ...rategy.h => communication_strategy_impl.h} | 25 +-- .../src/process_communicator_impl.cpp | 12 +- .../communicator/src/softbus_adapter.h | 27 +-- .../src/softbus_adapter_standard.cpp | 195 +++++------------- .../communicator/src/softbus_client.cpp | 162 +++++++++++++++ .../adapter/communicator/src/softbus_client.h | 65 ++++++ .../communicator/calc_sync_data_size.h | 28 +++ .../communicator/communication_provider.h | 1 + .../communicator/communication_strategy.h | 40 ++++ services/distributeddataservice/app/BUILD.gn | 2 + .../calc_sync_data_size_impl.cpp | 105 ++++++++++ .../calc_sync_data/calc_sync_data_size_impl.h | 38 ++++ .../app/src/kvstore_data_service.cpp | 4 + .../app/src/kvstore_meta_manager.h | 4 +- .../distributeddataservice/service/BUILD.gn | 1 + .../service/kvdb/calc_kvdb_sync_data_size.cpp | 43 ++++ .../service/kvdb/calc_kvdb_sync_data_size.h | 30 +++ 22 files changed, 650 insertions(+), 248 deletions(-) create mode 100644 services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp rename services/distributeddataservice/adapter/communicator/src/{communication_strategy.h => communication_strategy_impl.h} (47%) create mode 100644 services/distributeddataservice/adapter/communicator/src/softbus_client.cpp create mode 100644 services/distributeddataservice/adapter/communicator/src/softbus_client.h create mode 100644 services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h create mode 100644 services/distributeddataservice/adapter/include/communicator/communication_strategy.h create mode 100644 services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp create mode 100644 services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h create mode 100644 services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp create mode 100644 services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h diff --git a/services/distributeddataservice/adapter/communicator/BUILD.gn b/services/distributeddataservice/adapter/communicator/BUILD.gn index 7df58e1ac..b8f0199bc 100755 --- a/services/distributeddataservice/adapter/communicator/BUILD.gn +++ b/services/distributeddataservice/adapter/communicator/BUILD.gn @@ -23,12 +23,15 @@ ohos_static_library("distributeddata_communicator_static") { "src/communication_provider_impl.cpp", "src/communication_provider_impl.h", "src/communication_strategy.cpp", - "src/communication_strategy.h", + "src/communication_strategy_impl.cpp", + "src/communication_strategy_impl.h", "src/data_buffer.cpp", "src/device_manager_adapter.cpp", "src/process_communicator_impl.cpp", "src/softbus_adapter.h", "src/softbus_adapter_standard.cpp", + "src/softbus_client.h", + "src/softbus_client.cpp", ] include_dirs = [ diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp index 9611d11c4..167e5bbb4 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp @@ -14,8 +14,6 @@ */ #include "communication_provider_impl.h" - -#include "communication_strategy.h" #include "device_manager_adapter.h" #include "log_print.h" @@ -37,7 +35,6 @@ CommunicationProviderImpl::~CommunicationProviderImpl() Status CommunicationProviderImpl::Initialize() { - CommunicationStrategy::GetInstance().Init(); DmAdapter::GetInstance().Init(); return Status::SUCCESS; } @@ -135,5 +132,10 @@ int32_t CommunicationProviderImpl::ListenBroadcastMsg(const PipeInfo &pipeInfo, { return SoftBusAdapter::GetInstance()->ListenBroadcastMsg(pipeInfo, std::move(listener)); } + +uint32_t CommunicationProviderImpl::GetMtuSize(const DeviceId &deviceId) const +{ + return SoftBusAdapter::GetInstance()->GetMtuSize(deviceId); +} } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h index e66a0d982..a3a550d0b 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h @@ -70,6 +70,7 @@ public: int32_t Broadcast(const PipeInfo &pipeInfo, uint16_t mask) override; int32_t ListenBroadcastMsg(const PipeInfo &pipeInfo, std::function listener) override; + uint32_t GetMtuSize(const DeviceId &deviceId) const override; protected: virtual Status Initialize(); diff --git a/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp b/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp index 3a77fcc94..863bc3731 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp @@ -14,60 +14,19 @@ */ #include "communication_strategy.h" -#include "log_print.h" -#include "device_manager_adapter.h" -#include "kvstore_utils.h" -#ifdef LOG_TAG -#undef LOG_TAG -#endif -#define LOG_TAG "CommunicationStrategy" +#include "communication_strategy_impl.h" -namespace OHOS { -namespace AppDistributedKv { -using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter; -using KvStoreUtils = OHOS::DistributedKv::KvStoreUtils; -CommunicationStrategy &CommunicationStrategy::GetInstance() +namespace OHOS::AppDistributedKv { +std::shared_ptr CommunicationStrategy::instance_; +std::mutex CommunicationStrategy::mutex_; +std::shared_ptr CommunicationStrategy::GetInstance() { - static CommunicationStrategy instance; - return instance; -} - -Status CommunicationStrategy::Init() -{ - return DmAdapter::GetInstance().StartWatchDeviceChange(this, {"strategy"}); -} - -void CommunicationStrategy::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, - const AppDistributedKv::DeviceChangeType &type) const -{ - UpdateCommunicationStrategy(info, type); -} - -void CommunicationStrategy::UpdateCommunicationStrategy(const AppDistributedKv::DeviceInfo &info, - const AppDistributedKv::DeviceChangeType &type) const -{ - ZLOGD("[UpdateCommunicationStrategy] to %{public}s, type:%{public}d", - KvStoreUtils::ToBeAnonymous(info.uuid).c_str(), type); - if (type == AppDistributedKv::DeviceChangeType::DEVICE_ONLINE) { - strategys_.InsertOrAssign(info.uuid, true); - } else if (type == AppDistributedKv::DeviceChangeType::DEVICE_ONREADY) { - strategys_.Erase(info.uuid); - } else { - ; + if (instance_ == nullptr) { + std::lock_guard lock(mutex_); + if (instance_ == nullptr) { + instance_ = std::make_shared(); + } } + return instance_; } - -void CommunicationStrategy::GetStrategy(const std::string &deviceId, int32_t dataLen, std::vector &linkTypes) -{ - if (!strategys_.Contains(deviceId)) { - return; - } - - linkTypes.emplace_back(LINK_TYPE_WIFI_WLAN_5G); - linkTypes.emplace_back(LINK_TYPE_WIFI_WLAN_2G); - linkTypes.emplace_back(LINK_TYPE_WIFI_P2P); - linkTypes.emplace_back(LINK_TYPE_BR); - return; -} -} // namespace AppDistributedKv -} // namespace OHOS \ No newline at end of file +} // namespace OHOS::AppDistributedKv \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp new file mode 100644 index 000000000..79c57e276 --- /dev/null +++ b/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "CommunicationStrategyImpl" +#include "communication_strategy_impl.h" +#include "log_print.h" +#include "kvstore_utils.h" + +namespace OHOS { +namespace AppDistributedKv { +using KvStoreUtils = OHOS::DistributedKv::KvStoreUtils; +void CommunicationStrategyImpl::RegObject(std::shared_ptr object) +{ + calcSyncDataSize_ = object; +} + +CommunicationStrategy::Strategy CommunicationStrategyImpl::GetStrategy(const std::string &deviceId) +{ + if (calcSyncDataSize_ == nullptr || calcSyncDataSize_->CalcDataSize(deviceId) < SWITCH_CONNECTION_THRESHOLD) { + return CommunicationStrategy::Strategy::DEFAULT; + } + return CommunicationStrategy::Strategy::ON_LINE_SELECT_CHANNEL; +} +} // namespace AppDistributedKv +} // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/communication_strategy.h b/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h similarity index 47% rename from services/distributeddataservice/adapter/communicator/src/communication_strategy.h rename to services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h index d2633023d..4bc4b500e 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_strategy.h +++ b/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h @@ -17,29 +17,18 @@ #define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRAGETY_H #include -#include "concurrent_map.h" -#include "app_device_change_listener.h" -#include "session.h" +#include "communication_strategy.h" +#include "visibility.h" namespace OHOS { namespace AppDistributedKv { -class CommunicationStrategy : public AppDistributedKv::AppDeviceChangeListener { +class CommunicationStrategyImpl : public CommunicationStrategy { public: - static CommunicationStrategy &GetInstance(); - Status Init(); - void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, - const AppDistributedKv::DeviceChangeType &type) const override; - void GetStrategy(const std::string &deviceId, int32_t dataLen, std::vector &linkTypes); + void RegObject(std::shared_ptr object) override; + virtual CommunicationStrategy::Strategy GetStrategy(const std::string &deviceId) override; private: - CommunicationStrategy() = default; - ~CommunicationStrategy() = default; - CommunicationStrategy(CommunicationStrategy const &) = delete; - void operator=(CommunicationStrategy const &) = delete; - CommunicationStrategy(CommunicationStrategy &&) = delete; - CommunicationStrategy &operator=(CommunicationStrategy &&) = delete; - void UpdateCommunicationStrategy(const AppDistributedKv::DeviceInfo &info, - const AppDistributedKv::DeviceChangeType &type) const; - mutable ConcurrentMap strategys_; + static constexpr uint32_t SWITCH_CONNECTION_THRESHOLD = 75 * 1024; + std::shared_ptr calcSyncDataSize_; }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp index 5b3712e7c..536a3c31c 100644 --- a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp @@ -146,13 +146,11 @@ uint32_t ProcessCommunicatorImpl::GetMtuSize(const DeviceInfos &devInfo) { ZLOGI("GetMtuSize start"); const auto &comm = CommunicationProvider::GetInstance(); - DeviceInfo deviceInfo = comm.GetDeviceInfo(devInfo.identifier); - DeviceType deviceType = GetDeviceType(deviceInfo.deviceType); - if (deviceType == SMART_WATCH || deviceType == KID_WATCH) { - ZLOGI("GetMtuSize deviceType: %{public}d", deviceInfo.deviceType); - return MTU_SIZE_WATCH; - } - return MTU_SIZE; + DeviceId deviceId = { + .deviceId = devInfo.identifier + }; + + return comm.GetMtuSize(deviceId); } DeviceInfos ProcessCommunicatorImpl::GetLocalDeviceInfos() diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index b5220e0a7..2293e0c70 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -26,10 +26,10 @@ #include #include "app_data_change_listener.h" -#include "block_data.h" #include "platform_specific.h" #include "session.h" #include "softbus_bus_center.h" +#include "softbus_client.h" namespace OHOS { namespace AppDistributedKv { class SoftBusAdapter { @@ -38,8 +38,6 @@ public: ~SoftBusAdapter(); static std::shared_ptr GetInstance(); - static std::string ToBeAnonymous(const std::string &name); - // add DataChangeListener to watch data change; Status StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo); @@ -58,35 +56,28 @@ public: int RemoveSessionServerAdapter(const std::string &sessionName) const; - void InsertSession(const std::string &sessionName, int32_t connId); - - std::string DeleteSession(int32_t connId); - void NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId, const PipeInfo &pipeInfo); - int32_t GetSessionStatus(int32_t connId); - void OnSessionOpen(int32_t connId, int32_t status); - void OnSessionClose(int32_t connId); + std::string OnSessionClose(int32_t connId); int32_t Broadcast(const PipeInfo &pipeInfo, uint16_t mask); void OnBroadcast(const DeviceId &device, uint16_t mask); int32_t ListenBroadcastMsg(const PipeInfo &pipeInfo, std::function listener); + uint32_t GetMtuSize(const DeviceId &deviceId); private: - std::shared_ptr> GetSemaphore(int32_t connId); - Status GetConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId, int32_t dataSize, int32_t &connId); - Status OpenConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId, int32_t dataSize, int32_t &connId); - void InitSessionAttribute(const PipeInfo &pipeInfo, const DeviceId &deviceId, int32_t dataSize, - SessionAttribute &attr); + using KvScheduler = OHOS::DistributedKv::KvScheduler; + std::function CloseIdleConnect(); + + static constexpr int32_t CONNECT_IDLE_CLOSE_COUNT = 60; static std::shared_ptr instance_; ConcurrentMap dataChangeListeners_{}; - ConcurrentMap connects_{}; + std::mutex connMutex_{}; + std::map> connects_ {}; bool flag_ = true; // only for br flag ISessionListener sessionListener_{}; - std::mutex statusMutex_{}; - std::map>> sessionsStatus_; std::function onBroadcast_; }; } // namespace AppDistributedKv diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index 09b4abe04..4dc0a9260 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -15,9 +15,9 @@ #include #include -#include "communication_strategy.h" #include "device_manager_adapter.h" #include "dfx_types.h" +#include "kvstore_utils.h" #include "log_print.h" #include "reporter.h" #include "securec.h" @@ -32,13 +32,6 @@ namespace OHOS { namespace AppDistributedKv { -constexpr int32_t HEAD_SIZE = 3; -constexpr int32_t END_SIZE = 3; -constexpr int32_t MIN_SIZE = HEAD_SIZE + END_SIZE + 3; -constexpr const char *REPLACE_CHAIN = "***"; -constexpr const char *DEFAULT_ANONYMOUS = "******"; -constexpr int32_t SOFTBUS_OK = 0; -constexpr int32_t SOFTBUS_ERR = 1; enum SoftBusAdapterErrorCode : int32_t { SESSION_ID_INVALID = 2, MY_SESSION_NAME_INVALID, @@ -48,7 +41,7 @@ enum SoftBusAdapterErrorCode : int32_t { }; constexpr int32_t SESSION_NAME_SIZE_MAX = 65; constexpr int32_t DEVICE_ID_SIZE_MAX = 65; -constexpr uint32_t WAIT_MAX_TIME = 5; +static constexpr int32_t DEFAULT_MTU_SIZE = 4096; using namespace std; using namespace OHOS::DistributedDataDfx; using namespace OHOS::DistributedKv; @@ -124,19 +117,7 @@ SoftBusAdapter::~SoftBusAdapter() if (onBroadcast_) { UnregNodeDeviceStateCb(&g_callback); } -} - -std::string SoftBusAdapter::ToBeAnonymous(const std::string &name) -{ - if (name.length() <= HEAD_SIZE) { - return DEFAULT_ANONYMOUS; - } - - if (name.length() < MIN_SIZE) { - return (name.substr(0, HEAD_SIZE) + REPLACE_CHAIN); - } - - return (name.substr(0, HEAD_SIZE) + REPLACE_CHAIN + name.substr(name.length() - END_SIZE, END_SIZE)); + connects_.clear(); } std::shared_ptr SoftBusAdapter::GetInstance() @@ -173,119 +154,68 @@ Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppData return Status::ERROR; } -Status SoftBusAdapter::OpenConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId, int32_t dataSize, - int32_t &connId) -{ - SessionAttribute attr = { 0 }; - InitSessionAttribute(pipeInfo, deviceId, dataSize, attr); - int id = OpenSession(pipeInfo.pipeId.c_str(), pipeInfo.pipeId.c_str(), - DmAdapter::GetInstance().ToNetworkID(deviceId.deviceId).c_str(), "GROUP_ID", &attr); - ZLOGI("[OpenSession] to %{public}s,session:%{public}s, connId:%{public}d, linkNum:%{public}d", - ToBeAnonymous(deviceId.deviceId).c_str(), pipeInfo.pipeId.c_str(), id, attr.linkTypeNum); - if (connId < 0) { - ZLOGW("OpenSession %{public}s, type:%{public}d failed, connId:%{public}d", pipeInfo.pipeId.c_str(), - attr.dataType, id); - return Status::NETWORK_ERROR; - } - int state = GetSessionStatus(id); - ZLOGI("waited for notification, state:%{public}d connId:%{public}d", state, id); - if (state != SOFTBUS_OK) { - ZLOGE("OpenSession callback result error"); - CloseSession(id); - return Status::NETWORK_ERROR; - } - - connId = id; - return Status::SUCCESS; -} - -void SoftBusAdapter::InitSessionAttribute(const PipeInfo &pipeInfo, const DeviceId &deviceId, int32_t dataSize, - SessionAttribute &attr) +Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const uint8_t *data, int size, + const MessageInfo &info) { - attr.dataType = TYPE_BYTES; - std::vector linkTypes; - CommunicationStrategy::GetInstance().GetStrategy(deviceId.deviceId, dataSize, linkTypes); - int index = 0; - for (auto const &element : linkTypes) { - attr.linkType[index++] = element; - if (index >= LINK_TYPE_MAX) { - break; + std::shared_ptr conn; + { + lock_guard lock(connMutex_); + std::string key = pipeInfo.pipeId + deviceId.deviceId; + if (connects_.find(key) == connects_.end()) { + connects_.emplace(key, std::make_shared(pipeInfo, deviceId)); } + conn = connects_[key]; } - attr.linkTypeNum = index; - ZLOGD("set session attr pipeInfo:%{public}s deviceId:%{public}s link Num: %{public}d, link size: %{public}zu," - " size: %{public}d", pipeInfo.pipeId.c_str(), - ToBeAnonymous(deviceId.deviceId).c_str(), attr.linkTypeNum, linkTypes.size(), dataSize); -} -Status SoftBusAdapter::GetConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId, int32_t dataSize, int32_t &connId) -{ - auto result = connects_.Find(pipeInfo.pipeId + deviceId.deviceId); - if (result.first) { - connId = result.second; - return Status::SUCCESS; + if (conn) { + return conn->Send(data, size); } - return OpenConnect(pipeInfo, deviceId, dataSize, connId); + return Status::ERROR; } -Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const uint8_t *data, int size, - const MessageInfo &info) +uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId) { - int connId = 0; - auto state = GetConnect(pipeInfo, deviceId, size, connId); - if (state != Status::SUCCESS) { - ZLOGW("get connect %{public}s, type:%{public}d failed, status:%{public}d", pipeInfo.pipeId.c_str(), - info.msgType, state); - return state; - } - - ZLOGD("[SendData] to %{public}s, session:%{public}s, send len:%{public}d, connId:%{public}d", - ToBeAnonymous(deviceId.deviceId).c_str(), pipeInfo.pipeId.c_str(), size, connId); - int32_t ret = SendBytes(connId, data, size); - if (ret != SOFTBUS_OK) { - ZLOGE("[SendBytes] to %{public}d failed, ret:%{public}d.", connId, ret); - return Status::ERROR; + lock_guard lock(connMutex_); + for (const auto& conn : connects_) { + if (*conn.second == deviceId) { + return conn.second->GetMtuSize(); + } } - return Status::SUCCESS; -} -int32_t SoftBusAdapter::GetSessionStatus(int32_t connId) -{ - auto semaphore = GetSemaphore(connId); - return semaphore->GetValue(); + return DEFAULT_MTU_SIZE; } void SoftBusAdapter::OnSessionOpen(int32_t connId, int32_t status) { - auto semaphore = GetSemaphore(connId); - semaphore->SetValue(status); -} - -void SoftBusAdapter::OnSessionClose(int32_t connId) -{ - lock_guard lock(statusMutex_); - auto it = sessionsStatus_.find(connId); - if (it != sessionsStatus_.end()) { - it->second->Clear(SOFTBUS_ERR); - sessionsStatus_.erase(it); + lock_guard lock(connMutex_); + for (const auto& conn : connects_) { + if (*conn.second == connId) { + conn.second->OnConnected(status); + break; + } } } -std::shared_ptr> SoftBusAdapter::GetSemaphore(int32_t connId) +std::string SoftBusAdapter::OnSessionClose(int32_t connId) { - lock_guard lock(statusMutex_); - if (sessionsStatus_.find(connId) == sessionsStatus_.end()) { - sessionsStatus_.emplace(connId, std::make_shared>(WAIT_MAX_TIME, SOFTBUS_ERR)); + lock_guard lock(connMutex_); + std::string name = ""; + for (const auto& conn : connects_) { + if (*conn.second == connId) { + name = conn.first; + connects_.erase(conn.first); + break; + } } - return sessionsStatus_[connId]; + return name; } bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, __attribute__((unused)) const struct DeviceId &peer) { - ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(), ToBeAnonymous(peer.deviceId).c_str()); - + ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(), + KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str()); return true; } @@ -307,26 +237,6 @@ int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) c return RemoveSessionServer("ohos.distributeddata", sessionName.c_str()); } -void SoftBusAdapter::InsertSession(const std::string &sessionName, int32_t connId) -{ - ZLOGD("[InsertSession] connId:%{public}d", connId); - connects_.InsertOrAssign(sessionName, connId); -} - -std::string SoftBusAdapter::DeleteSession(int32_t connId) -{ - std::string name = ""; - connects_.EraseIf([&connId, &name](auto &key, int32_t id) { - if (connId == id) { - name = key; - return true; - } - return false; - }); - - return name; -} - void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId, const PipeInfo &pipeInfo) { @@ -334,7 +244,7 @@ void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const st auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId, [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) { ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(), - ToBeAnonymous(deviceId).c_str()); + KvStoreUtils::ToBeAnonymous(deviceId).c_str()); DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId); value->OnMessage(deviceInfo, data, size, pipeInfo); TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size }; @@ -354,9 +264,10 @@ int32_t SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, uint16_t mask) void SoftBusAdapter::OnBroadcast(const DeviceId &device, uint16_t mask) { - ZLOGI("device:%{public}s mask:0x%{public}x", ToBeAnonymous(device.deviceId).c_str(), mask); + ZLOGI("device:%{public}s mask:0x%{public}x", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str(), mask); if (!onBroadcast_) { - ZLOGW("no listener device:%{public}s mask:0x%{public}x", ToBeAnonymous(device.deviceId).c_str(), mask); + ZLOGW("no listener device:%{public}s mask:0x%{public}x", + KvStoreUtils::ToBeAnonymous(device.deviceId).c_str(), mask); return; } onBroadcast_(device.deviceId, mask); @@ -426,13 +337,8 @@ int AppDataListenerWrap::OnConnectOpened(int connId, int result) } ZLOGD("[OnConnectOpened] conn id:%{public}d, my name:%{public}s, peer name:%{public}s, " - "peer devId:%{public}s, side:%{public}d", - connId, connInfo.myName, connInfo.peerName, SoftBusAdapter::ToBeAnonymous(connInfo.peerDevUuid).c_str(), - connInfo.side); - // only manage the connects opened by the local - if (connInfo.side == 1) { - softBusAdapter_->InsertSession(std::string(connInfo.myName) + connInfo.peerDevUuid, connId); - } + "peer devId:%{public}s, side:%{public}d", connId, connInfo.myName, connInfo.peerName, + KvStoreUtils::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side); return 0; } @@ -440,9 +346,8 @@ void AppDataListenerWrap::OnConnectClosed(int connId) { // when the local close the session, this callback function will not be triggered; // when the current function is called, soft bus has released the session resource, only connId is valid; - softBusAdapter_->OnSessionClose(connId); - std::string name = softBusAdapter_->DeleteSession(connId); - ZLOGI("[SessionClosed] connId:%{public}d, name:%{public}s", connId, SoftBusAdapter::ToBeAnonymous(name).c_str()); + std::string name = softBusAdapter_->OnSessionClose(connId); + ZLOGI("[SessionClosed] connId:%{public}d, name:%{public}s", connId, KvStoreUtils::ToBeAnonymous(name).c_str()); } void AppDataListenerWrap::OnBytesReceived(int connId, const void *data, unsigned int dataLen) @@ -455,8 +360,8 @@ void AppDataListenerWrap::OnBytesReceived(int connId, const void *data, unsigned } ZLOGD("[OnBytesReceived] conn id:%{public}d, peer name:%{public}s, " - "peer devId:%{public}s, side:%{public}d", - connId, connInfo.peerName, SoftBusAdapter::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side); + "peer devId:%{public}s, side:%{public}d, data len:%{public}u", connId, connInfo.peerName, + KvStoreUtils::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side, dataLen); NotifyDataListeners(reinterpret_cast(data), dataLen, connInfo.peerDevUuid, { std::string(connInfo.peerName), "" }); diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp new file mode 100644 index 000000000..017a53e94 --- /dev/null +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "SoftBusClient" +#include "device_manager_adapter.h" +#include "kvstore_utils.h" +#include "log_print.h" +#include "softbus_client.h" +#include "softbus_error_code.h" + +namespace OHOS::AppDistributedKv { +using namespace OHOS::DistributedKv; +using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter; +SoftBusClient::SoftBusClient(const PipeInfo &pipeInfo, const DeviceId &deviceId) : pipe_(pipeInfo), device_(deviceId) +{ + block_ = std::make_shared>(WAIT_MAX_TIME, SOFTBUS_ERR); + mtu_ = DEFAULT_MTU_SIZE; +} + +SoftBusClient::~SoftBusClient() +{ + if (block_) { + block_->Clear(SOFTBUS_ERR); + } + CloseSession(connId_); +} + +bool SoftBusClient::operator==(int32_t connId) +{ + return connId_ == connId; +} + +bool SoftBusClient::operator==(const DeviceId &deviceId) +{ + return device_.deviceId == deviceId.deviceId; +} + +void SoftBusClient::OnConnected(int32_t status) +{ + if (block_) { + block_->SetValue(status); + } +} + +void SoftBusClient::RestoreDefaultValue() +{ + connId_ = INVALID_CONNECT_ID; + status_ = ConnectStatus::DISCONNECT; + strategy_ = Strategy::DEFAULT; + mtu_ = DEFAULT_MTU_SIZE; +} + +uint32_t SoftBusClient::GetMtuSize() const +{ + ZLOGD("get mtu size connId:%{public}d mtu:%{public}d", connId_, mtu_); + return mtu_; +} + +Status SoftBusClient::Send(const uint8_t *data, int size) +{ + std::lock_guard lock(mutex_); + auto result = OpenConnect(); + if (result != Status::SUCCESS) { + return result; + } + + ZLOGD("send data connId:%{public}d, data size:%{public}d.", connId_, size); + int32_t ret = SendBytes(connId_, data, size); + if (ret != SOFTBUS_OK) { + ZLOGE("send data to connId%{public}d failed, ret:%{public}d.", connId_, ret); + return Status::ERROR; + } + + return Status::SUCCESS; +} + +Status SoftBusClient::OpenConnect() +{ + std::vector linkTypes; + Strategy strategy = CommunicationStrategy::GetInstance()->GetStrategy(device_.deviceId); + if (strategy != strategy_) { + ZLOGI("close connId:%{public}d,strategy current:%{public}d, new:%{public}d", connId_, strategy_, strategy); + CloseSession(connId_); + RestoreDefaultValue(); + } + + if (status_ == ConnectStatus::CONNECT_OK) { + return Status::SUCCESS; + } + + auto result = Open(strategy); + if (result != Status::SUCCESS) { + return result; + } + status_ = ConnectStatus::CONNECT_OK; + UpdateMtuSize(); + return Status::SUCCESS; +} + +Status SoftBusClient::Open(Strategy strategy) +{ + block_->Clear(SOFTBUS_ERR); + SessionAttribute attr = { 0 }; + InitSessionAttribute(strategy, attr); + int id = OpenSession(pipe_.pipeId.c_str(), pipe_.pipeId.c_str(), + DmAdapter::GetInstance().ToNetworkID(device_.deviceId).c_str(), "GROUP_ID", &attr); + ZLOGI("open %{public}s,session:%{public}s,connId:%{public}d,linkNum:%{public}d,strategy:%{public}d", + KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), id, attr.linkTypeNum, strategy); + if (id < 0) { + ZLOGW("Open %{public}s, type:%{public}d failed, connId:%{public}d", + pipe_.pipeId.c_str(), attr.dataType, id); + return Status::NETWORK_ERROR; + } + + connId_ = id; + strategy_ = strategy; + int state = block_->GetValue(); + ZLOGI("waited for notification, state:%{public}d connId:%{public}d", state, id); + if (state != SOFTBUS_OK) { + ZLOGE("open callback result error"); + return Status::NETWORK_ERROR; + } + return Status::SUCCESS; +} + +void SoftBusClient::InitSessionAttribute(Strategy strategy, SessionAttribute &attr) +{ + attr.dataType = TYPE_BYTES; + if (strategy == Strategy::DEFAULT) { + return; + } + + int index = 0; + attr.linkType[index++] = LINK_TYPE_WIFI_WLAN_5G; + attr.linkType[index++] = LINK_TYPE_WIFI_WLAN_2G; + attr.linkType[index++] = LINK_TYPE_WIFI_P2P; + attr.linkType[index++] = LINK_TYPE_BR; + attr.linkTypeNum = index; +} + +void SoftBusClient::UpdateMtuSize() +{ + uint32_t mtu = 0; + auto result = GetSessionOption(connId_, SESSION_OPTION_MAX_SENDBYTES_SIZE, &mtu, sizeof(mtu)); + if (result != SOFTBUS_OK) { + return; + } + mtu_ = mtu; +} +} \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h new file mode 100644 index 000000000..9f6bf6a2f --- /dev/null +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_SOFTBUS_CLIENT_H +#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_SOFTBUS_CLIENT_H + +#include +#include + +#include "block_data.h" +#include "communication_strategy.h" +#include "session.h" +#include "softbus_bus_center.h" +namespace OHOS::AppDistributedKv { +class SoftBusClient { +public: + SoftBusClient(const PipeInfo &pipeInfo, const DeviceId &deviceId); + ~SoftBusClient(); + + Status Send(const uint8_t *data, int size); + bool operator==(int32_t connId); + bool operator==(const DeviceId &deviceId); + void OnConnected(int32_t status); + uint32_t GetMtuSize() const; +private: + enum class ConnectStatus : int32_t { + CONNECT_OK, + DISCONNECT, + }; + using Strategy = CommunicationStrategy::Strategy; + Status OpenConnect(); + Status Open(Strategy strategy); + bool IsReconnect(Strategy strategy); + void InitSessionAttribute(Strategy strategy, SessionAttribute &attr); + void RestoreDefaultValue(); + void UpdateMtuSize(); + + static constexpr int32_t INVALID_CONNECT_ID = -1; + static constexpr uint32_t WAIT_MAX_TIME = 10; + static constexpr int32_t DEFAULT_MTU_SIZE = 4096; + int32_t connId_ = INVALID_CONNECT_ID; + Strategy strategy_ = Strategy::DEFAULT; + ConnectStatus status_ = ConnectStatus::DISCONNECT; + std::mutex mutex_; + std::shared_ptr > block_; + PipeInfo pipe_; + DeviceId device_; + uint32_t mtu_; +}; +} + + +#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_SOFTBUS_CLIENT_H diff --git a/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h b/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h new file mode 100644 index 000000000..1a3434af0 --- /dev/null +++ b/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H +#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H +#include "visibility.h" +namespace OHOS::DistributedData { +class API_EXPORT CalcSyncDataSize { +public: + virtual ~CalcSyncDataSize() = default; + + virtual uint32_t CalcDataSize(const std::string &deviceId) = 0; + +}; +} +#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H diff --git a/services/distributeddataservice/adapter/include/communicator/communication_provider.h b/services/distributeddataservice/adapter/include/communicator/communication_provider.h index c5c072857..3e5edad5c 100644 --- a/services/distributeddataservice/adapter/include/communicator/communication_provider.h +++ b/services/distributeddataservice/adapter/include/communicator/communication_provider.h @@ -81,6 +81,7 @@ public: virtual int32_t Broadcast(const PipeInfo &pipeInfo, uint16_t mask) = 0; virtual int32_t ListenBroadcastMsg(const PipeInfo &pipeInfo, std::function listener) = 0; + virtual uint32_t GetMtuSize(const DeviceId &deviceId) const = 0; }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/include/communicator/communication_strategy.h b/services/distributeddataservice/adapter/include/communicator/communication_strategy.h new file mode 100644 index 000000000..c1ed9f44b --- /dev/null +++ b/services/distributeddataservice/adapter/include/communicator/communication_strategy.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRATEGY_H +#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRATEGY_H +#include +#include + +#include "calc_sync_data_size.h" +namespace OHOS::AppDistributedKv { +using namespace OHOS::DistributedData; +class API_EXPORT CommunicationStrategy { +public: + enum class Strategy : int32_t { + DEFAULT, + ON_LINE_SELECT_CHANNEL, + }; + virtual ~CommunicationStrategy() = default; + static std::shared_ptr GetInstance(); + virtual void RegObject(std::shared_ptr object) = 0; + virtual CommunicationStrategy::Strategy GetStrategy(const std::string &deviceId) = 0; +private: + static std::mutex mutex_; + static std::shared_ptr instance_; +}; +} + +#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRATEGY_H diff --git a/services/distributeddataservice/app/BUILD.gn b/services/distributeddataservice/app/BUILD.gn index e7ffbb25a..ec981dfc2 100644 --- a/services/distributeddataservice/app/BUILD.gn +++ b/services/distributeddataservice/app/BUILD.gn @@ -72,6 +72,7 @@ config("module_private_config") { "src", "src/security", "src/backup_rule/include", + "src/calc_sync_data", "//third_party/json/single_include", ] @@ -94,6 +95,7 @@ ohos_shared_library("distributeddataservice") { "src/session_manager/route_head_handler_impl.cpp", "src/session_manager/session_manager.cpp", "src/session_manager/upgrade_manager.cpp", + "src/calc_sync_data/calc_sync_data_size_impl.cpp", ] if (datamgr_service_power) { diff --git a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp new file mode 100644 index 000000000..32caa7537 --- /dev/null +++ b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "CalcSyncDataSizeImpl" +#include "calc_kvdb_sync_data_size.h" +#include "calc_sync_data_size_impl.h" +#include "communication_provider.h" +#include "device_manager_adapter.h" +#include "eventcenter/event_center.h" +#include "kvstore_meta_manager.h" +#include "kvstore_utils.h" +#include "log_print.h" +#include "metadata/meta_data_manager.h" +#include "metadata/store_meta_data_local.h" + +namespace OHOS::DistributedData { +using namespace OHOS::DistributedKv; +using namespace OHOS::AppDistributedKv; +using Commu = AppDistributedKv::CommunicationProvider; +using KvStoreUtils = OHOS::DistributedKv::KvStoreUtils; +using DMAdapter = DistributedData::DeviceManagerAdapter; +CalcSyncDataSizeImpl::CalcSyncDataSizeImpl() +{ + Commu::GetInstance().StartWatchDeviceChange(this, { "calcSyncDataSize" }); +} + +uint32_t CalcSyncDataSizeImpl::CalcDataSize(const std::string &deviceId) +{ + auto it = dataSizes_.Find(deviceId); + if (!it.first) { + return 0; + } + return it.second; +} + +void CalcSyncDataSizeImpl::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, + const AppDistributedKv::DeviceChangeType &type) const +{ + switch (type) { + case AppDistributedKv::DeviceChangeType::DEVICE_ONLINE: + CalcSyncDataSize(info.uuid); + break; + case AppDistributedKv::DeviceChangeType::DEVICE_ONREADY: + dataSizes_.Erase(info.uuid); + break; + default: + break; + } +} + +void CalcSyncDataSizeImpl::CalcSyncDataSize(const std::string &deviceId) const +{ + uint32_t metaSize = CalcMetaDataSize(deviceId); + uint32_t dataSize = CalcKvDataSize(deviceId); + uint32_t totalSize = metaSize + dataSize; + dataSizes_.InsertOrAssign(deviceId, totalSize); + ZLOGI("deviceId: %{public}s, sync total size:%{public}u, meta:%{public}u data:%{public}u", + KvStoreUtils::ToBeAnonymous(deviceId).c_str(), totalSize, metaSize, dataSize); + return; +} + +uint32_t CalcSyncDataSizeImpl::CalcMetaDataSize(const std::string &deviceId) const +{ + uint32_t dataSize = 0; + auto store = KvStoreMetaManager::GetInstance().GetMetaKvStore(); + store->CalculateSyncDataSize(deviceId, dataSize); + return dataSize; +} + +uint32_t CalcSyncDataSizeImpl::CalcKvDataSize(const std::string &deviceId) const +{ + std::vector metaData; + auto prefix = StoreMetaData::GetPrefix({ DMAdapter::GetInstance().GetLocalDevice().uuid }); + if (!MetaDataManager::GetInstance().LoadMeta(prefix, metaData)) { + ZLOGE("load meta failed!"); + return 0; + } + + uint32_t totalSize = 0; + for (const auto &data : metaData) { + StoreMetaDataLocal localMetaData; + MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), localMetaData, true); + if (!localMetaData.HasPolicy(PolicyType::IMMEDIATE_SYNC_ON_ONLINE)) { + continue; + } + + DistributedDB::DBStatus status; + uint32_t dataSize = CalcKvSyncDataSize::CalcSyncDataSize(data, status); + totalSize += dataSize; + } + return totalSize; +} +} \ No newline at end of file diff --git a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h new file mode 100644 index 000000000..b86446362 --- /dev/null +++ b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H +#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H + +#include "app_device_change_listener.h" +#include "concurrent_map.h" +#include "calc_sync_data_size.h" +namespace OHOS::DistributedData { +class API_EXPORT CalcSyncDataSizeImpl : public CalcSyncDataSize, public AppDistributedKv::AppDeviceChangeListener { +public: + CalcSyncDataSizeImpl(); + virtual ~CalcSyncDataSizeImpl() = default; + void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, + const AppDistributedKv::DeviceChangeType &type) const override; + uint32_t CalcDataSize(const std::string &deviceId) override; +private: + void CalcSyncDataSize(const std::string &deviceId) const; + uint32_t CalcMetaDataSize(const std::string &deviceId) const; + uint32_t CalcKvDataSize(const std::string &deviceId) const; + + mutable ConcurrentMap dataSizes_; +}; +} +#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H diff --git a/services/distributeddataservice/app/src/kvstore_data_service.cpp b/services/distributeddataservice/app/src/kvstore_data_service.cpp index 221d484a9..a3916a939 100644 --- a/services/distributeddataservice/app/src/kvstore_data_service.cpp +++ b/services/distributeddataservice/app/src/kvstore_data_service.cpp @@ -26,8 +26,10 @@ #include "auth_delegate.h" #include "auto_launch_export.h" #include "bootstrap.h" +#include "calc_sync_data_size_impl.h" #include "checker/checker_manager.h" #include "communication_provider.h" +#include "communication_strategy.h" #include "config_factory.h" #include "constant.h" #include "dds_trace.h" @@ -64,6 +66,7 @@ using namespace std::chrono; using namespace OHOS::DistributedData; using namespace OHOS::DistributedDataDfx; using namespace OHOS::Security::AccessToken; +using namespace OHOS::AppDistributedKv; using KvStoreDelegateManager = DistributedDB::KvStoreDelegateManager; using SecretKeyMeta = DistributedData::SecretKeyMetaData; using StrategyMetaData = DistributedData::StrategyMeta; @@ -109,6 +112,7 @@ void KvStoreDataService::Initialize() KvStoreMetaManager::GetInstance().InitMetaParameter(); accountEventObserver_ = std::make_shared(*this); AccountDelegate::GetInstance()->Subscribe(accountEventObserver_); + CommunicationStrategy::GetInstance()->RegObject(std::make_shared()); deviceInnerListener_ = std::make_unique(*this); AppDistributedKv::CommunicationProvider::GetInstance().StartWatchDeviceChange( deviceInnerListener_.get(), { "innerListener" }); diff --git a/services/distributeddataservice/app/src/kvstore_meta_manager.h b/services/distributeddataservice/app/src/kvstore_meta_manager.h index a8fd3031d..33de6aaf4 100644 --- a/services/distributeddataservice/app/src/kvstore_meta_manager.h +++ b/services/distributeddataservice/app/src/kvstore_meta_manager.h @@ -175,11 +175,9 @@ public: bool GetKvStoreMetaDataByAppId(const std::string &appId, KvStoreMetaData &metaData); bool GetFullMetaData(std::map &entries, enum DatabaseType type = KVDB); - -private: using NbDelegate = std::shared_ptr; NbDelegate GetMetaKvStore(); - +private: NbDelegate CreateMetaKvStore(); void ConfigMetaDataManager(); diff --git a/services/distributeddataservice/service/BUILD.gn b/services/distributeddataservice/service/BUILD.gn index c81d4dc02..65ff84b01 100644 --- a/services/distributeddataservice/service/BUILD.gn +++ b/services/distributeddataservice/service/BUILD.gn @@ -74,6 +74,7 @@ ohos_shared_library("distributeddatasvc") { "data_share/uri_utils.cpp", "directory/src/directory_manager.cpp", "kvdb/auth_delegate.cpp", + "kvdb/calc_kvdb_sync_data_size.cpp", "kvdb/executor_factory.cpp", "kvdb/kvdb_exporter.cpp", "kvdb/kvdb_service_impl.cpp", diff --git a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp new file mode 100644 index 000000000..24872cf15 --- /dev/null +++ b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "calc_kvdb_sync_data_size.h" +#include "directory_manager.h" +#include "store_cache.h" +namespace OHOS::DistributedKv { +using DBManager = DistributedDB::KvStoreDelegateManager; +using DBStore = DistributedDB::KvStoreNbDelegate; +uint32_t CalcKvSyncDataSize::CalcSyncDataSize(const StoreMetaData &data, DBStatus &status) +{ + DBStore *dbStore = nullptr; + DBManager manager(data.appId, data.user, data.instanceId); + manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) }); + StoreCache cache; + manager.GetKvStore(data.storeId, cache.GetDBOption(data, cache.GetDBPassword(data)), + [&status, &dbStore](auto dbStatus, auto *tmpStore) { + status = dbStatus; + dbStore = tmpStore; + }); + + if (dbStore == nullptr) { + return 0; + } + + uint32_t dataSize = 0; + dbStore->CalculateSyncDataSize(data.deviceId, dataSize); + manager.CloseKvStore(dbStore); + return dataSize; +} +} \ No newline at end of file diff --git a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h new file mode 100644 index 000000000..b273ec798 --- /dev/null +++ b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H +#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H + +#include "metadata/store_meta_data.h" +#include "store_types.h" +#include "visibility.h" +namespace OHOS::DistributedKv { +using namespace DistributedDB; +using namespace OHOS::DistributedData; +class API_EXPORT CalcKvSyncDataSize final { +public: + static uint32_t CalcSyncDataSize(const StoreMetaData &data, DBStatus &status); +}; +} +#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H -- Gitee From d5e85179472ebdc6c48e97915dd888dd03c3597a Mon Sep 17 00:00:00 2001 From: houpengtao Date: Tue, 15 Nov 2022 09:27:02 +0800 Subject: [PATCH 2/3] fixed code check err and review comments Signed-off-by: houpengtao --- .../adapter/communicator/src/communication_provider_impl.cpp | 1 - .../adapter/communicator/src/softbus_adapter.h | 3 --- .../adapter/communicator/src/softbus_client.cpp | 5 +++-- .../adapter/communicator/src/softbus_client.h | 2 +- .../adapter/include/communicator/calc_sync_data_size.h | 2 +- .../adapter/include/communicator/communication_strategy.h | 5 ++++- .../app/src/calc_sync_data/calc_sync_data_size_impl.cpp | 4 ++-- .../app/src/calc_sync_data/calc_sync_data_size_impl.h | 2 +- .../service/kvdb/calc_kvdb_sync_data_size.cpp | 4 ++-- .../service/kvdb/calc_kvdb_sync_data_size.h | 2 +- 10 files changed, 15 insertions(+), 15 deletions(-) diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp index 548f2fc0b..873e9e4f1 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp @@ -35,7 +35,6 @@ CommunicationProviderImpl::~CommunicationProviderImpl() Status CommunicationProviderImpl::Initialize() { - DmAdapter::GetInstance().Init(); return Status::SUCCESS; } diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index 2293e0c70..2000ed5cb 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -68,9 +68,6 @@ public: uint32_t GetMtuSize(const DeviceId &deviceId); private: - using KvScheduler = OHOS::DistributedKv::KvScheduler; - std::function CloseIdleConnect(); - static constexpr int32_t CONNECT_IDLE_CLOSE_COUNT = 60; static std::shared_ptr instance_; ConcurrentMap dataChangeListeners_{}; diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp index 017a53e94..9a4c82277 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -17,8 +17,8 @@ #include "device_manager_adapter.h" #include "kvstore_utils.h" #include "log_print.h" -#include "softbus_client.h" #include "softbus_error_code.h" +#include "softbus_client.h" namespace OHOS::AppDistributedKv { using namespace OHOS::DistributedKv; @@ -90,7 +90,7 @@ Status SoftBusClient::OpenConnect() { std::vector linkTypes; Strategy strategy = CommunicationStrategy::GetInstance()->GetStrategy(device_.deviceId); - if (strategy != strategy_) { + if (strategy != strategy_ && connId_ > 0) { ZLOGI("close connId:%{public}d,strategy current:%{public}d, new:%{public}d", connId_, strategy_, strategy); CloseSession(connId_); RestoreDefaultValue(); @@ -138,6 +138,7 @@ Status SoftBusClient::Open(Strategy strategy) void SoftBusClient::InitSessionAttribute(Strategy strategy, SessionAttribute &attr) { attr.dataType = TYPE_BYTES; + // If the dataType is BYTES, the default strategy is wifi_5G > wifi_2.4G > BR, without P2P; if (strategy == Strategy::DEFAULT) { return; } diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h index 9f6bf6a2f..13ef49a52 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -62,4 +62,4 @@ private: } -#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_SOFTBUS_CLIENT_H +#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_SOFTBUS_CLIENT_H diff --git a/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h b/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h index 1a3434af0..3b2d2c706 100644 --- a/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h +++ b/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h @@ -25,4 +25,4 @@ public: }; } -#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H +#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H diff --git a/services/distributeddataservice/adapter/include/communicator/communication_strategy.h b/services/distributeddataservice/adapter/include/communicator/communication_strategy.h index c1ed9f44b..10de989a0 100644 --- a/services/distributeddataservice/adapter/include/communicator/communication_strategy.h +++ b/services/distributeddataservice/adapter/include/communicator/communication_strategy.h @@ -24,7 +24,10 @@ using namespace OHOS::DistributedData; class API_EXPORT CommunicationStrategy { public: enum class Strategy : int32_t { + // If AP is available, the AP is preferred. When AP is not available, only BR can be used; p2p is not support DEFAULT, + // If AP is available, the AP is preferred. When AP is not available, BR is used for a small amount of data + // and P2P is used for a large amount of data; The strategy takes effect only at the device online stage; ON_LINE_SELECT_CHANNEL, }; virtual ~CommunicationStrategy() = default; @@ -37,4 +40,4 @@ private: }; } -#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRATEGY_H +#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRATEGY_H diff --git a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp index 32caa7537..a1c43c5b1 100644 --- a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp +++ b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp @@ -15,7 +15,6 @@ #define LOG_TAG "CalcSyncDataSizeImpl" #include "calc_kvdb_sync_data_size.h" -#include "calc_sync_data_size_impl.h" #include "communication_provider.h" #include "device_manager_adapter.h" #include "eventcenter/event_center.h" @@ -24,6 +23,7 @@ #include "log_print.h" #include "metadata/meta_data_manager.h" #include "metadata/store_meta_data_local.h" +#include "calc_sync_data_size_impl.h" namespace OHOS::DistributedData { using namespace OHOS::DistributedKv; @@ -82,7 +82,7 @@ uint32_t CalcSyncDataSizeImpl::CalcMetaDataSize(const std::string &deviceId) con uint32_t CalcSyncDataSizeImpl::CalcKvDataSize(const std::string &deviceId) const { std::vector metaData; - auto prefix = StoreMetaData::GetPrefix({ DMAdapter::GetInstance().GetLocalDevice().uuid }); + auto prefix = StoreMetaData::GetPrefix({DMAdapter::GetInstance().GetLocalDevice().uuid}); if (!MetaDataManager::GetInstance().LoadMeta(prefix, metaData)) { ZLOGE("load meta failed!"); return 0; diff --git a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h index b86446362..605d2f756 100644 --- a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h +++ b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h @@ -35,4 +35,4 @@ private: mutable ConcurrentMap dataSizes_; }; } -#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H +#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H diff --git a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp index 24872cf15..c78804839 100644 --- a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp +++ b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp @@ -23,7 +23,7 @@ uint32_t CalcKvSyncDataSize::CalcSyncDataSize(const StoreMetaData &data, DBStatu { DBStore *dbStore = nullptr; DBManager manager(data.appId, data.user, data.instanceId); - manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) }); + manager.SetKvStoreConfig({DirectoryManager::GetInstance().GetStorePath(data)}); StoreCache cache; manager.GetKvStore(data.storeId, cache.GetDBOption(data, cache.GetDBPassword(data)), [&status, &dbStore](auto dbStatus, auto *tmpStore) { @@ -31,7 +31,7 @@ uint32_t CalcKvSyncDataSize::CalcSyncDataSize(const StoreMetaData &data, DBStatu dbStore = tmpStore; }); - if (dbStore == nullptr) { + if (status != DistributedDB::DBStatus::OK || dbStore == nullptr) { return 0; } diff --git a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h index b273ec798..065bdea2c 100644 --- a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h +++ b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h @@ -27,4 +27,4 @@ public: static uint32_t CalcSyncDataSize(const StoreMetaData &data, DBStatus &status); }; } -#endif //DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H +#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H -- Gitee From f6aa99800dfddb3911ada697afc34a15d812781d Mon Sep 17 00:00:00 2001 From: houpengtao Date: Thu, 17 Nov 2022 17:26:27 +0800 Subject: [PATCH 3/3] modify review comments Signed-off-by: houpengtao --- .../adapter/communicator/BUILD.gn | 4 +- .../src/communication_provider_impl.cpp | 5 - .../src/communication_provider_impl.h | 1 - .../src/communication_strategy.cpp | 64 +++++++++-- .../src/communication_strategy_impl.cpp | 37 ------ .../src/communication_strategy_impl.h | 36 ------ .../src/process_communicator_impl.cpp | 8 +- .../communicator/src/softbus_adapter.h | 19 +++- .../src/softbus_adapter_standard.cpp | 104 ++++++++++++++--- .../communicator/src/softbus_client.cpp | 47 ++++---- .../adapter/communicator/src/softbus_client.h | 18 +-- .../communicator/calc_sync_data_size.h | 28 ----- .../communicator/communication_provider.h | 1 - .../communicator/communication_strategy.h | 29 +++-- services/distributeddataservice/app/BUILD.gn | 2 - .../calc_sync_data_size_impl.cpp | 105 ------------------ .../calc_sync_data/calc_sync_data_size_impl.h | 38 ------- .../app/src/kvstore_data_service.cpp | 4 - .../app/src/kvstore_meta_manager.cpp | 15 +++ .../app/src/kvstore_meta_manager.h | 4 +- .../distributeddataservice/service/BUILD.gn | 1 - .../service/kvdb/calc_kvdb_sync_data_size.cpp | 43 ------- .../service/kvdb/calc_kvdb_sync_data_size.h | 30 ----- .../service/kvdb/kvdb_service_impl.cpp | 30 +++++ .../service/kvdb/kvdb_service_impl.h | 2 +- 25 files changed, 261 insertions(+), 414 deletions(-) delete mode 100644 services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp delete mode 100644 services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h delete mode 100644 services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h delete mode 100644 services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp delete mode 100644 services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h delete mode 100644 services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp delete mode 100644 services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h diff --git a/services/distributeddataservice/adapter/communicator/BUILD.gn b/services/distributeddataservice/adapter/communicator/BUILD.gn index b8f0199bc..90916afdd 100755 --- a/services/distributeddataservice/adapter/communicator/BUILD.gn +++ b/services/distributeddataservice/adapter/communicator/BUILD.gn @@ -23,15 +23,13 @@ ohos_static_library("distributeddata_communicator_static") { "src/communication_provider_impl.cpp", "src/communication_provider_impl.h", "src/communication_strategy.cpp", - "src/communication_strategy_impl.cpp", - "src/communication_strategy_impl.h", "src/data_buffer.cpp", "src/device_manager_adapter.cpp", "src/process_communicator_impl.cpp", "src/softbus_adapter.h", "src/softbus_adapter_standard.cpp", - "src/softbus_client.h", "src/softbus_client.cpp", + "src/softbus_client.h", ] include_dirs = [ diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp index 873e9e4f1..75b71e644 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp @@ -84,10 +84,5 @@ int32_t CommunicationProviderImpl::ListenBroadcastMsg(const PipeInfo &pipeInfo, { return SoftBusAdapter::GetInstance()->ListenBroadcastMsg(pipeInfo, std::move(listener)); } - -uint32_t CommunicationProviderImpl::GetMtuSize(const DeviceId &deviceId) const -{ - return SoftBusAdapter::GetInstance()->GetMtuSize(deviceId); -} } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h index ee831375c..469a7101d 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.h @@ -51,7 +51,6 @@ public: int32_t Broadcast(const PipeInfo &pipeInfo, uint16_t mask) override; int32_t ListenBroadcastMsg(const PipeInfo &pipeInfo, std::function listener) override; - uint32_t GetMtuSize(const DeviceId &deviceId) const override; protected: virtual Status Initialize(); diff --git a/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp b/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp index 863bc3731..f4e72eba7 100644 --- a/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_strategy.cpp @@ -12,21 +12,61 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + +#define LOG_TAG "CommunicationStrategy" #include "communication_strategy.h" -#include "communication_strategy_impl.h" +#include "log_print.h" +#include "kvstore_utils.h" + +namespace OHOS { +namespace AppDistributedKv { +CommunicationStrategy &CommunicationStrategy::GetInstance() +{ + static CommunicationStrategy instance; + return instance; +} -namespace OHOS::AppDistributedKv { -std::shared_ptr CommunicationStrategy::instance_; -std::mutex CommunicationStrategy::mutex_; -std::shared_ptr CommunicationStrategy::GetInstance() +void CommunicationStrategy::RegGetSyncDataSize(const std::string &type, + const std::function &getDataSize) { - if (instance_ == nullptr) { - std::lock_guard lock(mutex_); - if (instance_ == nullptr) { - instance_ = std::make_shared(); + calcDataSizes_.InsertOrAssign(type, getDataSize); +} + +size_t CommunicationStrategy::CalcSyncDataSize(const std::string &deviceId) +{ + size_t dataSize = 0; + calcDataSizes_.ForEach([&dataSize, &deviceId](const std::string &key, auto &value) { + if (value) { + dataSize += value(deviceId); } + return false; + }); + ZLOGD("calc data size:%{public}zu.", dataSize); + return dataSize; +} + +void CommunicationStrategy::SetStrategy(const std::string &deviceId, Strategy strategy, + const std::function &action) +{ + auto value = strategy; + if (strategy == Strategy::ON_LINE_SELECT_CHANNEL && CalcSyncDataSize(deviceId) < SWITCH_CONNECTION_THRESHOLD) { + value = Strategy::DEFAULT; + } + if (action) { + action(deviceId, value); + } + strategys_.InsertOrAssign(deviceId, value); + return ; +} + +CommunicationStrategy::Strategy CommunicationStrategy::GetStrategy(const std::string &deviceId) +{ + auto result = strategys_.Find(deviceId); + if (!result.first) { + return Strategy::DEFAULT; } - return instance_; + + return result.second; } -} // namespace OHOS::AppDistributedKv \ No newline at end of file +} // namespace AppDistributedKv +} // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp deleted file mode 100644 index 79c57e276..000000000 --- a/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.cpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define LOG_TAG "CommunicationStrategyImpl" -#include "communication_strategy_impl.h" -#include "log_print.h" -#include "kvstore_utils.h" - -namespace OHOS { -namespace AppDistributedKv { -using KvStoreUtils = OHOS::DistributedKv::KvStoreUtils; -void CommunicationStrategyImpl::RegObject(std::shared_ptr object) -{ - calcSyncDataSize_ = object; -} - -CommunicationStrategy::Strategy CommunicationStrategyImpl::GetStrategy(const std::string &deviceId) -{ - if (calcSyncDataSize_ == nullptr || calcSyncDataSize_->CalcDataSize(deviceId) < SWITCH_CONNECTION_THRESHOLD) { - return CommunicationStrategy::Strategy::DEFAULT; - } - return CommunicationStrategy::Strategy::ON_LINE_SELECT_CHANNEL; -} -} // namespace AppDistributedKv -} // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h b/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h deleted file mode 100644 index 4bc4b500e..000000000 --- a/services/distributeddataservice/adapter/communicator/src/communication_strategy_impl.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRAGETY_H -#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRAGETY_H - -#include - -#include "communication_strategy.h" -#include "visibility.h" -namespace OHOS { -namespace AppDistributedKv { -class CommunicationStrategyImpl : public CommunicationStrategy { -public: - void RegObject(std::shared_ptr object) override; - virtual CommunicationStrategy::Strategy GetStrategy(const std::string &deviceId) override; -private: - static constexpr uint32_t SWITCH_CONNECTION_THRESHOLD = 75 * 1024; - std::shared_ptr calcSyncDataSize_; -}; -} // namespace AppDistributedKv -} // namespace OHOS - -#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_COMMUNICATION_STRAGETY_H diff --git a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp index 0007a58d5..380a01a1b 100644 --- a/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/process_communicator_impl.cpp @@ -15,11 +15,10 @@ #define LOG_TAG "processCommunication" -#include "process_communicator_impl.h" - #include "device_manager_adapter.h" #include "log_print.h" - +#include "softbus_adapter.h" +#include "process_communicator_impl.h" namespace OHOS { namespace AppDistributedKv { using namespace DistributedDB; @@ -147,12 +146,11 @@ uint32_t ProcessCommunicatorImpl::GetMtuSize() uint32_t ProcessCommunicatorImpl::GetMtuSize(const DeviceInfos &devInfo) { ZLOGI("GetMtuSize start"); - const auto &comm = CommunicationProvider::GetInstance(); DeviceId deviceId = { .deviceId = devInfo.identifier }; - return comm.GetMtuSize(deviceId); + return SoftBusAdapter::GetInstance()->GetMtuSize(deviceId); } DeviceInfos ProcessCommunicatorImpl::GetLocalDeviceInfos() diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index 2000ed5cb..277d2d73d 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -26,6 +26,8 @@ #include #include "app_data_change_listener.h" +#include "app_device_change_listener.h" +#include "block_data.h" #include "platform_specific.h" #include "session.h" #include "softbus_bus_center.h" @@ -58,6 +60,8 @@ public: void NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId, const PipeInfo &pipeInfo); + int32_t GetSessionStatus(int32_t connId); + void OnSessionOpen(int32_t connId, int32_t status); std::string OnSessionClose(int32_t connId); @@ -67,15 +71,28 @@ public: int32_t ListenBroadcastMsg(const PipeInfo &pipeInfo, std::function listener); uint32_t GetMtuSize(const DeviceId &deviceId); + std::shared_ptr GetConnect(const std::string &deviceId); + + class SofBusDeviceChangeListenerImpl : public AppDistributedKv::AppDeviceChangeListener { + void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, + const AppDistributedKv::DeviceChangeType &type) const override; + }; private: - static constexpr int32_t CONNECT_IDLE_CLOSE_COUNT = 60; + std::shared_ptr> GetSemaphore(int32_t connId); + std::string DelConnect(int32_t connId); + void DelSessionStatus(int32_t connId); + void AfterStrategyUpdate(const std::string &deviceId); + static constexpr uint32_t WAIT_MAX_TIME = 10; static std::shared_ptr instance_; ConcurrentMap dataChangeListeners_{}; std::mutex connMutex_{}; std::map> connects_ {}; bool flag_ = true; // only for br flag ISessionListener sessionListener_{}; + std::mutex statusMutex_{}; + std::map>> sessionsStatus_; std::function onBroadcast_; + static SofBusDeviceChangeListenerImpl listener_; }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index 4dc0a9260..e86d19455 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -41,11 +41,12 @@ enum SoftBusAdapterErrorCode : int32_t { }; constexpr int32_t SESSION_NAME_SIZE_MAX = 65; constexpr int32_t DEVICE_ID_SIZE_MAX = 65; -static constexpr int32_t DEFAULT_MTU_SIZE = 4096; +constexpr uint32_t DEFAULT_MTU_SIZE = 4096u; using namespace std; using namespace OHOS::DistributedDataDfx; using namespace OHOS::DistributedKv; using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter; +using Strategy = CommunicationStrategy::Strategy; struct ConnDetailsInfo { char myName[SESSION_NAME_SIZE_MAX] = ""; char peerName[SESSION_NAME_SIZE_MAX] = ""; @@ -99,7 +100,7 @@ INodeStateCb g_callback = { .onNodeStatusChanged = OnCareEvent, }; } // namespace - +SoftBusAdapter::SofBusDeviceChangeListenerImpl SoftBusAdapter::listener_; SoftBusAdapter::SoftBusAdapter() { ZLOGI("begin"); @@ -109,6 +110,11 @@ SoftBusAdapter::SoftBusAdapter() sessionListener_.OnSessionClosed = AppDataListenerWrap::OnConnectClosed; sessionListener_.OnBytesReceived = AppDataListenerWrap::OnBytesReceived; sessionListener_.OnMessageReceived = AppDataListenerWrap::OnBytesReceived; + + auto status = DmAdapter::GetInstance().StartWatchDeviceChange(&listener_, {"softBusAdapter"}); + if (status != Status::SUCCESS) { + ZLOGW("register device change failed, status:%d", static_cast(status)); + } } SoftBusAdapter::~SoftBusAdapter() @@ -162,45 +168,44 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device lock_guard lock(connMutex_); std::string key = pipeInfo.pipeId + deviceId.deviceId; if (connects_.find(key) == connects_.end()) { - connects_.emplace(key, std::make_shared(pipeInfo, deviceId)); + connects_.emplace(key, std::make_shared(pipeInfo, deviceId, [this](int32_t connId) { + return GetSessionStatus(connId); + })); } conn = connects_[key]; } - if (conn) { + if (conn != nullptr) { return conn->Send(data, size); } return Status::ERROR; } -uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId) +std::shared_ptr SoftBusAdapter::GetConnect(const std::string &deviceId) { lock_guard lock(connMutex_); for (const auto& conn : connects_) { if (*conn.second == deviceId) { - return conn.second->GetMtuSize(); + return conn.second; } } - - return DEFAULT_MTU_SIZE; + return nullptr; } -void SoftBusAdapter::OnSessionOpen(int32_t connId, int32_t status) +uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId) { - lock_guard lock(connMutex_); - for (const auto& conn : connects_) { - if (*conn.second == connId) { - conn.second->OnConnected(status); - break; - } + std::shared_ptr conn = GetConnect(deviceId.deviceId); + if (conn != nullptr) { + return conn->GetMtuSize(); } + return DEFAULT_MTU_SIZE; } -std::string SoftBusAdapter::OnSessionClose(int32_t connId) +std::string SoftBusAdapter::DelConnect(int32_t connId) { lock_guard lock(connMutex_); - std::string name = ""; + std::string name; for (const auto& conn : connects_) { if (*conn.second == connId) { name = conn.first; @@ -211,6 +216,43 @@ std::string SoftBusAdapter::OnSessionClose(int32_t connId) return name; } +void SoftBusAdapter::DelSessionStatus(int32_t connId) +{ + lock_guard lock(statusMutex_); + auto it = sessionsStatus_.find(connId); + if (it != sessionsStatus_.end()) { + it->second->Clear(SOFTBUS_ERR); + sessionsStatus_.erase(it); + } +} + +int32_t SoftBusAdapter::GetSessionStatus(int32_t connId) +{ + auto semaphore = GetSemaphore(connId); + return semaphore->GetValue(); +} + +void SoftBusAdapter::OnSessionOpen(int32_t connId, int32_t status) +{ + auto semaphore = GetSemaphore(connId); + semaphore->SetValue(status); +} + +std::string SoftBusAdapter::OnSessionClose(int32_t connId) +{ + DelSessionStatus(connId); + return DelConnect(connId); +} + +std::shared_ptr> SoftBusAdapter::GetSemaphore(int32_t connId) +{ + lock_guard lock(statusMutex_); + if (sessionsStatus_.find(connId) == sessionsStatus_.end()) { + sessionsStatus_.emplace(connId, std::make_shared>(WAIT_MAX_TIME, SOFTBUS_ERR)); + } + return sessionsStatus_[connId]; +} + bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, __attribute__((unused)) const struct DeviceId &peer) { @@ -372,5 +414,33 @@ void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int siz { softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo); } + +void SoftBusAdapter::SofBusDeviceChangeListenerImpl::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, + const AppDistributedKv::DeviceChangeType &type) const +{ + Strategy strategy = Strategy::BUTT; + switch (type) { + case AppDistributedKv::DeviceChangeType::DEVICE_ONLINE: + strategy = Strategy::ON_LINE_SELECT_CHANNEL; + break; + case AppDistributedKv::DeviceChangeType::DEVICE_ONREADY: + strategy = Strategy::DEFAULT; + break; + default: + break; + } + + if (strategy >= Strategy::BUTT) { + return; + } + + CommunicationStrategy::GetInstance().SetStrategy(info.uuid, strategy, + [this](const std::string deviceId, Strategy strategy) { + std::shared_ptr conn = SoftBusAdapter::GetInstance()->GetConnect(deviceId); + if (conn != nullptr) { + conn->AfterStrategyUpdate(strategy); + } + }); +} } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp index 9a4c82277..20aa2f6b3 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -23,18 +23,18 @@ namespace OHOS::AppDistributedKv { using namespace OHOS::DistributedKv; using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter; -SoftBusClient::SoftBusClient(const PipeInfo &pipeInfo, const DeviceId &deviceId) : pipe_(pipeInfo), device_(deviceId) +SoftBusClient::SoftBusClient(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const std::function &getConnStatus) + : pipe_(pipeInfo), device_(deviceId), getConnStatus_(getConnStatus) { - block_ = std::make_shared>(WAIT_MAX_TIME, SOFTBUS_ERR); mtu_ = DEFAULT_MTU_SIZE; } SoftBusClient::~SoftBusClient() { - if (block_) { - block_->Clear(SOFTBUS_ERR); + if (connId_ > 0) { + CloseSession(connId_); } - CloseSession(connId_); } bool SoftBusClient::operator==(int32_t connId) @@ -42,16 +42,9 @@ bool SoftBusClient::operator==(int32_t connId) return connId_ == connId; } -bool SoftBusClient::operator==(const DeviceId &deviceId) +bool SoftBusClient::operator==(const std::string &deviceId) { - return device_.deviceId == deviceId.deviceId; -} - -void SoftBusClient::OnConnected(int32_t status) -{ - if (block_) { - block_->SetValue(status); - } + return device_.deviceId == deviceId; } void SoftBusClient::RestoreDefaultValue() @@ -88,19 +81,11 @@ Status SoftBusClient::Send(const uint8_t *data, int size) Status SoftBusClient::OpenConnect() { - std::vector linkTypes; - Strategy strategy = CommunicationStrategy::GetInstance()->GetStrategy(device_.deviceId); - if (strategy != strategy_ && connId_ > 0) { - ZLOGI("close connId:%{public}d,strategy current:%{public}d, new:%{public}d", connId_, strategy_, strategy); - CloseSession(connId_); - RestoreDefaultValue(); - } - if (status_ == ConnectStatus::CONNECT_OK) { return Status::SUCCESS; } - auto result = Open(strategy); + auto result = Open(); if (result != Status::SUCCESS) { return result; } @@ -109,9 +94,9 @@ Status SoftBusClient::OpenConnect() return Status::SUCCESS; } -Status SoftBusClient::Open(Strategy strategy) +Status SoftBusClient::Open() { - block_->Clear(SOFTBUS_ERR); + Strategy strategy = CommunicationStrategy::GetInstance().GetStrategy(device_.deviceId); SessionAttribute attr = { 0 }; InitSessionAttribute(strategy, attr); int id = OpenSession(pipe_.pipeId.c_str(), pipe_.pipeId.c_str(), @@ -126,7 +111,7 @@ Status SoftBusClient::Open(Strategy strategy) connId_ = id; strategy_ = strategy; - int state = block_->GetValue(); + int state = getConnStatus_(connId_); ZLOGI("waited for notification, state:%{public}d connId:%{public}d", state, id); if (state != SOFTBUS_OK) { ZLOGE("open callback result error"); @@ -160,4 +145,14 @@ void SoftBusClient::UpdateMtuSize() } mtu_ = mtu; } + +void SoftBusClient::AfterStrategyUpdate(Strategy strategy) +{ + std::lock_guard lock(mutex_); + if (strategy != strategy_ && connId_ > 0) { + ZLOGI("close connId:%{public}d,strategy current:%{public}d, new:%{public}d", connId_, strategy_, strategy); + CloseSession(connId_); + RestoreDefaultValue(); + } +} } \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h index 13ef49a52..a3c228e9e 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -19,45 +19,45 @@ #include #include -#include "block_data.h" #include "communication_strategy.h" #include "session.h" #include "softbus_bus_center.h" namespace OHOS::AppDistributedKv { class SoftBusClient { public: - SoftBusClient(const PipeInfo &pipeInfo, const DeviceId &deviceId); + SoftBusClient(const PipeInfo &pipeInfo, const DeviceId &deviceId, + const std::function &getConnStatus); ~SoftBusClient(); + using Strategy = CommunicationStrategy::Strategy; Status Send(const uint8_t *data, int size); bool operator==(int32_t connId); - bool operator==(const DeviceId &deviceId); - void OnConnected(int32_t status); + bool operator==(const std::string &deviceId); uint32_t GetMtuSize() const; + void AfterStrategyUpdate(Strategy strategy); private: enum class ConnectStatus : int32_t { CONNECT_OK, DISCONNECT, }; - using Strategy = CommunicationStrategy::Strategy; + Status OpenConnect(); - Status Open(Strategy strategy); - bool IsReconnect(Strategy strategy); + Status Open(); void InitSessionAttribute(Strategy strategy, SessionAttribute &attr); void RestoreDefaultValue(); void UpdateMtuSize(); static constexpr int32_t INVALID_CONNECT_ID = -1; static constexpr uint32_t WAIT_MAX_TIME = 10; - static constexpr int32_t DEFAULT_MTU_SIZE = 4096; + static constexpr uint32_t DEFAULT_MTU_SIZE = 4096u; int32_t connId_ = INVALID_CONNECT_ID; Strategy strategy_ = Strategy::DEFAULT; ConnectStatus status_ = ConnectStatus::DISCONNECT; std::mutex mutex_; - std::shared_ptr > block_; PipeInfo pipe_; DeviceId device_; uint32_t mtu_; + std::function getConnStatus_; }; } diff --git a/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h b/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h deleted file mode 100644 index 3b2d2c706..000000000 --- a/services/distributeddataservice/adapter/include/communicator/calc_sync_data_size.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H -#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H -#include "visibility.h" -namespace OHOS::DistributedData { -class API_EXPORT CalcSyncDataSize { -public: - virtual ~CalcSyncDataSize() = default; - - virtual uint32_t CalcDataSize(const std::string &deviceId) = 0; - -}; -} -#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_H diff --git a/services/distributeddataservice/adapter/include/communicator/communication_provider.h b/services/distributeddataservice/adapter/include/communicator/communication_provider.h index b831b1ff8..0418b0c32 100644 --- a/services/distributeddataservice/adapter/include/communicator/communication_provider.h +++ b/services/distributeddataservice/adapter/include/communicator/communication_provider.h @@ -64,7 +64,6 @@ public: virtual int32_t Broadcast(const PipeInfo &pipeInfo, uint16_t mask) = 0; virtual int32_t ListenBroadcastMsg(const PipeInfo &pipeInfo, std::function listener) = 0; - virtual uint32_t GetMtuSize(const DeviceId &deviceId) const = 0; }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/include/communicator/communication_strategy.h b/services/distributeddataservice/adapter/include/communicator/communication_strategy.h index 10de989a0..de747e153 100644 --- a/services/distributeddataservice/adapter/include/communicator/communication_strategy.h +++ b/services/distributeddataservice/adapter/include/communicator/communication_strategy.h @@ -18,9 +18,9 @@ #include #include -#include "calc_sync_data_size.h" +#include "concurrent_map.h" +#include "visibility.h" namespace OHOS::AppDistributedKv { -using namespace OHOS::DistributedData; class API_EXPORT CommunicationStrategy { public: enum class Strategy : int32_t { @@ -29,14 +29,27 @@ public: // If AP is available, the AP is preferred. When AP is not available, BR is used for a small amount of data // and P2P is used for a large amount of data; The strategy takes effect only at the device online stage; ON_LINE_SELECT_CHANNEL, + BUTT }; - virtual ~CommunicationStrategy() = default; - static std::shared_ptr GetInstance(); - virtual void RegObject(std::shared_ptr object) = 0; - virtual CommunicationStrategy::Strategy GetStrategy(const std::string &deviceId) = 0; + static CommunicationStrategy &GetInstance(); + using Strategy = CommunicationStrategy::Strategy; + void RegGetSyncDataSize(const std::string &type, const std::function &getDataSize); + CommunicationStrategy::Strategy GetStrategy(const std::string &deviceId); + void SetStrategy(const std::string &deviceId, Strategy strategy, + const std::function &action); private: - static std::mutex mutex_; - static std::shared_ptr instance_; + CommunicationStrategy() = default; + ~CommunicationStrategy() = default; + CommunicationStrategy(CommunicationStrategy const &) = delete; + void operator=(CommunicationStrategy const &) = delete; + CommunicationStrategy(CommunicationStrategy &&) = delete; + CommunicationStrategy &operator=(CommunicationStrategy &&) = delete; + + size_t CalcSyncDataSize(const std::string &deviceId); + + static constexpr uint32_t SWITCH_CONNECTION_THRESHOLD = 75 * 1024u; + ConcurrentMap> calcDataSizes_; + ConcurrentMap strategys_; }; } diff --git a/services/distributeddataservice/app/BUILD.gn b/services/distributeddataservice/app/BUILD.gn index ec981dfc2..e7ffbb25a 100644 --- a/services/distributeddataservice/app/BUILD.gn +++ b/services/distributeddataservice/app/BUILD.gn @@ -72,7 +72,6 @@ config("module_private_config") { "src", "src/security", "src/backup_rule/include", - "src/calc_sync_data", "//third_party/json/single_include", ] @@ -95,7 +94,6 @@ ohos_shared_library("distributeddataservice") { "src/session_manager/route_head_handler_impl.cpp", "src/session_manager/session_manager.cpp", "src/session_manager/upgrade_manager.cpp", - "src/calc_sync_data/calc_sync_data_size_impl.cpp", ] if (datamgr_service_power) { diff --git a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp deleted file mode 100644 index a1c43c5b1..000000000 --- a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define LOG_TAG "CalcSyncDataSizeImpl" -#include "calc_kvdb_sync_data_size.h" -#include "communication_provider.h" -#include "device_manager_adapter.h" -#include "eventcenter/event_center.h" -#include "kvstore_meta_manager.h" -#include "kvstore_utils.h" -#include "log_print.h" -#include "metadata/meta_data_manager.h" -#include "metadata/store_meta_data_local.h" -#include "calc_sync_data_size_impl.h" - -namespace OHOS::DistributedData { -using namespace OHOS::DistributedKv; -using namespace OHOS::AppDistributedKv; -using Commu = AppDistributedKv::CommunicationProvider; -using KvStoreUtils = OHOS::DistributedKv::KvStoreUtils; -using DMAdapter = DistributedData::DeviceManagerAdapter; -CalcSyncDataSizeImpl::CalcSyncDataSizeImpl() -{ - Commu::GetInstance().StartWatchDeviceChange(this, { "calcSyncDataSize" }); -} - -uint32_t CalcSyncDataSizeImpl::CalcDataSize(const std::string &deviceId) -{ - auto it = dataSizes_.Find(deviceId); - if (!it.first) { - return 0; - } - return it.second; -} - -void CalcSyncDataSizeImpl::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, - const AppDistributedKv::DeviceChangeType &type) const -{ - switch (type) { - case AppDistributedKv::DeviceChangeType::DEVICE_ONLINE: - CalcSyncDataSize(info.uuid); - break; - case AppDistributedKv::DeviceChangeType::DEVICE_ONREADY: - dataSizes_.Erase(info.uuid); - break; - default: - break; - } -} - -void CalcSyncDataSizeImpl::CalcSyncDataSize(const std::string &deviceId) const -{ - uint32_t metaSize = CalcMetaDataSize(deviceId); - uint32_t dataSize = CalcKvDataSize(deviceId); - uint32_t totalSize = metaSize + dataSize; - dataSizes_.InsertOrAssign(deviceId, totalSize); - ZLOGI("deviceId: %{public}s, sync total size:%{public}u, meta:%{public}u data:%{public}u", - KvStoreUtils::ToBeAnonymous(deviceId).c_str(), totalSize, metaSize, dataSize); - return; -} - -uint32_t CalcSyncDataSizeImpl::CalcMetaDataSize(const std::string &deviceId) const -{ - uint32_t dataSize = 0; - auto store = KvStoreMetaManager::GetInstance().GetMetaKvStore(); - store->CalculateSyncDataSize(deviceId, dataSize); - return dataSize; -} - -uint32_t CalcSyncDataSizeImpl::CalcKvDataSize(const std::string &deviceId) const -{ - std::vector metaData; - auto prefix = StoreMetaData::GetPrefix({DMAdapter::GetInstance().GetLocalDevice().uuid}); - if (!MetaDataManager::GetInstance().LoadMeta(prefix, metaData)) { - ZLOGE("load meta failed!"); - return 0; - } - - uint32_t totalSize = 0; - for (const auto &data : metaData) { - StoreMetaDataLocal localMetaData; - MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), localMetaData, true); - if (!localMetaData.HasPolicy(PolicyType::IMMEDIATE_SYNC_ON_ONLINE)) { - continue; - } - - DistributedDB::DBStatus status; - uint32_t dataSize = CalcKvSyncDataSize::CalcSyncDataSize(data, status); - totalSize += dataSize; - } - return totalSize; -} -} \ No newline at end of file diff --git a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h b/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h deleted file mode 100644 index 605d2f756..000000000 --- a/services/distributeddataservice/app/src/calc_sync_data/calc_sync_data_size_impl.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H -#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H - -#include "app_device_change_listener.h" -#include "concurrent_map.h" -#include "calc_sync_data_size.h" -namespace OHOS::DistributedData { -class API_EXPORT CalcSyncDataSizeImpl : public CalcSyncDataSize, public AppDistributedKv::AppDeviceChangeListener { -public: - CalcSyncDataSizeImpl(); - virtual ~CalcSyncDataSizeImpl() = default; - void OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, - const AppDistributedKv::DeviceChangeType &type) const override; - uint32_t CalcDataSize(const std::string &deviceId) override; -private: - void CalcSyncDataSize(const std::string &deviceId) const; - uint32_t CalcMetaDataSize(const std::string &deviceId) const; - uint32_t CalcKvDataSize(const std::string &deviceId) const; - - mutable ConcurrentMap dataSizes_; -}; -} -#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_SYNC_DATA_SIZE_IMPL_H diff --git a/services/distributeddataservice/app/src/kvstore_data_service.cpp b/services/distributeddataservice/app/src/kvstore_data_service.cpp index 252b8e0d6..55b9d70e0 100644 --- a/services/distributeddataservice/app/src/kvstore_data_service.cpp +++ b/services/distributeddataservice/app/src/kvstore_data_service.cpp @@ -26,10 +26,8 @@ #include "auth_delegate.h" #include "auto_launch_export.h" #include "bootstrap.h" -#include "calc_sync_data_size_impl.h" #include "checker/checker_manager.h" #include "communication_provider.h" -#include "communication_strategy.h" #include "config_factory.h" #include "constant.h" #include "dds_trace.h" @@ -66,7 +64,6 @@ using namespace std::chrono; using namespace OHOS::DistributedData; using namespace OHOS::DistributedDataDfx; using namespace OHOS::Security::AccessToken; -using namespace OHOS::AppDistributedKv; using KvStoreDelegateManager = DistributedDB::KvStoreDelegateManager; using SecretKeyMeta = DistributedData::SecretKeyMetaData; using StrategyMetaData = DistributedData::StrategyMeta; @@ -113,7 +110,6 @@ void KvStoreDataService::Initialize() KvStoreMetaManager::GetInstance().InitMetaParameter(); accountEventObserver_ = std::make_shared(*this); AccountDelegate::GetInstance()->Subscribe(accountEventObserver_); - CommunicationStrategy::GetInstance()->RegObject(std::make_shared()); deviceInnerListener_ = std::make_unique(*this); DmAdapter::GetInstance().StartWatchDeviceChange(deviceInnerListener_.get(), { "innerListener" }); } diff --git a/services/distributeddataservice/app/src/kvstore_meta_manager.cpp b/services/distributeddataservice/app/src/kvstore_meta_manager.cpp index 619357adf..c3e09a11d 100644 --- a/services/distributeddataservice/app/src/kvstore_meta_manager.cpp +++ b/services/distributeddataservice/app/src/kvstore_meta_manager.cpp @@ -27,6 +27,7 @@ #include "account_delegate.h" #include "bootstrap.h" #include "communication_provider.h" +#include "communication_strategy.h" #include "constant.h" #include "crypto_manager.h" #include "device_manager_adapter.h" @@ -51,6 +52,7 @@ using DmAdapter = DistributedData::DeviceManagerAdapter; using namespace std::chrono; using namespace OHOS::DistributedData; using namespace DistributedDB; +using namespace OHOS::AppDistributedKv; // APPID: distributeddata // USERID: default @@ -64,6 +66,9 @@ KvStoreMetaManager::KvStoreMetaManager() delegateManager_(Bootstrap::GetInstance().GetProcessLabel(), "default") { ZLOGI("begin."); + CommunicationStrategy::GetInstance().RegGetSyncDataSize("meta_store", [this](const std::string &deviceId) { + return this->GetSyncDataSize(deviceId); + }); } KvStoreMetaManager::~KvStoreMetaManager() @@ -587,5 +592,15 @@ std::string KvStoreMetaManager::GetBackupPath() const return (DirectoryManager::GetInstance().GetMetaBackupPath() + "/" + Crypto::Sha256(label_ + "_" + Bootstrap::GetInstance().GetMetaDBName())); } + +size_t KvStoreMetaManager::GetSyncDataSize(const std::string &deviceId) +{ + auto metaDelegate = GetMetaKvStore(); + if (metaDelegate == nullptr) { + return 0; + } + + return metaDelegate->GetSyncDataSize(deviceId); +} } // namespace DistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/app/src/kvstore_meta_manager.h b/services/distributeddataservice/app/src/kvstore_meta_manager.h index 33de6aaf4..bded1ab24 100644 --- a/services/distributeddataservice/app/src/kvstore_meta_manager.h +++ b/services/distributeddataservice/app/src/kvstore_meta_manager.h @@ -175,9 +175,11 @@ public: bool GetKvStoreMetaDataByAppId(const std::string &appId, KvStoreMetaData &metaData); bool GetFullMetaData(std::map &entries, enum DatabaseType type = KVDB); + size_t GetSyncDataSize(const std::string &deviceId); +private: using NbDelegate = std::shared_ptr; NbDelegate GetMetaKvStore(); -private: + NbDelegate CreateMetaKvStore(); void ConfigMetaDataManager(); diff --git a/services/distributeddataservice/service/BUILD.gn b/services/distributeddataservice/service/BUILD.gn index 65ff84b01..c81d4dc02 100644 --- a/services/distributeddataservice/service/BUILD.gn +++ b/services/distributeddataservice/service/BUILD.gn @@ -74,7 +74,6 @@ ohos_shared_library("distributeddatasvc") { "data_share/uri_utils.cpp", "directory/src/directory_manager.cpp", "kvdb/auth_delegate.cpp", - "kvdb/calc_kvdb_sync_data_size.cpp", "kvdb/executor_factory.cpp", "kvdb/kvdb_exporter.cpp", "kvdb/kvdb_service_impl.cpp", diff --git a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp deleted file mode 100644 index c78804839..000000000 --- a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "calc_kvdb_sync_data_size.h" -#include "directory_manager.h" -#include "store_cache.h" -namespace OHOS::DistributedKv { -using DBManager = DistributedDB::KvStoreDelegateManager; -using DBStore = DistributedDB::KvStoreNbDelegate; -uint32_t CalcKvSyncDataSize::CalcSyncDataSize(const StoreMetaData &data, DBStatus &status) -{ - DBStore *dbStore = nullptr; - DBManager manager(data.appId, data.user, data.instanceId); - manager.SetKvStoreConfig({DirectoryManager::GetInstance().GetStorePath(data)}); - StoreCache cache; - manager.GetKvStore(data.storeId, cache.GetDBOption(data, cache.GetDBPassword(data)), - [&status, &dbStore](auto dbStatus, auto *tmpStore) { - status = dbStatus; - dbStore = tmpStore; - }); - - if (status != DistributedDB::DBStatus::OK || dbStore == nullptr) { - return 0; - } - - uint32_t dataSize = 0; - dbStore->CalculateSyncDataSize(data.deviceId, dataSize); - manager.CloseKvStore(dbStore); - return dataSize; -} -} \ No newline at end of file diff --git a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h b/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h deleted file mode 100644 index 065bdea2c..000000000 --- a/services/distributeddataservice/service/kvdb/calc_kvdb_sync_data_size.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2022 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H -#define DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H - -#include "metadata/store_meta_data.h" -#include "store_types.h" -#include "visibility.h" -namespace OHOS::DistributedKv { -using namespace DistributedDB; -using namespace OHOS::DistributedData; -class API_EXPORT CalcKvSyncDataSize final { -public: - static uint32_t CalcSyncDataSize(const StoreMetaData &data, DBStatus &status); -}; -} -#endif // DISTRIBUTEDDATAMGR_DATAMGR_SERVICE_CALC_KVDB_SYNC_DATA_SIZE_H diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp index 23bb5388a..f352ebd04 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp @@ -23,6 +23,7 @@ #include "backup_manager.h" #include "checker/checker_manager.h" #include "communication_provider.h" +#include "communication_strategy.h" #include "crypto_manager.h" #include "device_manager_adapter.h" #include "directory_manager.h" @@ -61,6 +62,10 @@ KVDBServiceImpl::Factory::~Factory() KVDBServiceImpl::KVDBServiceImpl() { + CommunicationStrategy::GetInstance().RegGetSyncDataSize("kv_store", [this](const std::string &deviceId) { + return GetSyncDataSize(deviceId); + }); + EventCenter::GetInstance().Subscribe(DeviceMatrix::MATRIX_META_FINISHED, [this](const Event &event) { auto &matrixEvent = static_cast(event); auto deviceId = matrixEvent.GetDeviceId(); @@ -724,4 +729,29 @@ void KVDBServiceImpl::SyncAgent::ReInit(pid_t pid, const AppId &appId) delayTimes_.clear(); observers_.clear(); } + +size_t KVDBServiceImpl::GetSyncDataSize(const std::string &deviceId) +{ + std::vector metaData; + auto prefix = StoreMetaData::GetPrefix({DMAdapter::GetInstance().GetLocalDevice().uuid}); + if (!MetaDataManager::GetInstance().LoadMeta(prefix, metaData)) { + ZLOGE("load meta failed!"); + return 0; + } + + size_t totalSize = 0; + for (const auto &data : metaData) { + DistributedDB::DBStatus status; + auto observers = GetObservers(data.tokenId, data.storeId); + auto store = storeCache_.GetStore(data, observers, status); + if (store == nullptr) { + ZLOGE("failed! status:%{public}d appId:%{public}s storeId:%{public}s dir:%{public}s", status, + data.bundleName.c_str(), data.storeId.c_str(), data.dataDir.c_str()); + continue; + } + totalSize += store->GetSyncDataSize(deviceId); + } + + return totalSize; +} } // namespace OHOS::DistributedKv \ No newline at end of file diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.h b/services/distributeddataservice/service/kvdb/kvdb_service_impl.h index 8dcee7ca8..c2aecdd74 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.h +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.h @@ -58,7 +58,7 @@ public: int32_t ResolveAutoLaunch(const std::string &identifier, DBLaunchParam ¶m) override; int32_t OnUserChange(uint32_t code, const std::string &user, const std::string &account) override; int32_t OnReady(const std::string &device) override; - + size_t GetSyncDataSize(const std::string &deviceId); private: using StoreMetaData = OHOS::DistributedData::StoreMetaData; using StrategyMeta = OHOS::DistributedData::StrategyMeta; -- Gitee