diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index 3d1638982d936057e9a4e074e7724f116e3a0787..a87ff3eb14265c4d462641eec91b5eecd7add9e1 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -91,6 +91,7 @@ private: std::string DelConnect(int32_t socket); void StartCloseSessionTask(const std::string &deviceId); Task GetCloseSessionTask(); + bool CloseSession(const std::string &networkId); static constexpr const char *PKG_NAME = "distributeddata-default"; static constexpr Time INVALID_NEXT = std::chrono::steady_clock::time_point::max(); static constexpr uint32_t QOS_COUNT = 3; diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index e5237c4a531be5a360fffcb76ed08b05ef459c5f..9e1f1c6ec158d0471aa191cf33f62b62e6ab5523 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -17,6 +17,7 @@ #include #include "communicator_context.h" +#include "communication/connect_manager.h" #include "data_level.h" #include "device_manager_adapter.h" #include "dfx_types.h" @@ -108,6 +109,15 @@ SoftBusAdapter::SoftBusAdapter() Context::GetInstance().SetSessionListener([this](const std::string &deviceId) { StartCloseSessionTask(deviceId); }); + + ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) { + return CloseSession(networkId); + }); + ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) { + auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId); + Context::GetInstance().NotifySessionClose(uuid); + }); + ConnectManager::GetInstance()->OnStart(); } SoftBusAdapter::~SoftBusAdapter() @@ -117,6 +127,7 @@ SoftBusAdapter::~SoftBusAdapter() UnregDataLevelChangeCb(PKG_NAME); } connects_.Clear(); + ConnectManager::GetInstance()->OnDestory(); } std::shared_ptr SoftBusAdapter::GetInstance() @@ -178,7 +189,20 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device if (conn == nullptr) { return Status::ERROR; } - auto status = conn->SendData(dataInfo, &clientListener_); + auto status = conn->CheckStatus(); + if (status == Status::RATE_LIMIT) { + return Status::RATE_LIMIT; + } + if (status != Status::SUCCESS) { + auto task = [this, conn]() { + conn->OpenConnect(&clientListener_); + }; + auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId; + ConnectManager::GetInstance()->ApplyConnect(networkId, task); + return Status::RATE_LIMIT; + } + + status = conn->SendData(dataInfo, &clientListener_); if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) { Time now = std::chrono::steady_clock::now(); auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now; @@ -243,7 +267,7 @@ SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask() }); connects_.EraseIf([](const auto &key, const auto &conn) -> bool { if (conn.empty()) { - Context::GetInstance().NotifySessionClose(key); + ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId); } return conn.empty(); }); @@ -518,5 +542,21 @@ void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info, { return; } + +bool SoftBusAdapter::CloseSession(const std::string &networkId) +{ + bool hasSession = false; + auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId); + connects_.Compute(uuid, [&hasSession](const auto &key, auto &connects) { + if (!connects.empty()) { + hasSession = true; + } + return false; + }); + if (hasSession) { + ConnectManager::GetInstance()->OnSessionClose(networkId); + } + return hasSession; +} } // namespace AppDistributedKv } // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp index f46d4e0d9d89b28ee7ffb3da675bad367320e745..304b8cd204974fd2da2bac1368c6ea046a50660c 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.cpp @@ -17,6 +17,7 @@ #include "softbus_client.h" #include "communicator_context.h" +#include "communication/connect_manager.h" #include "device_manager_adapter.h" #include "inner_socket.h" #include "kvstore_utils.h" @@ -64,7 +65,7 @@ uint32_t SoftBusClient::GetTimeout() const Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener *listener) { std::lock_guard lock(mutex_); - auto result = OpenConnect(listener); + auto result = CheckStatus(); if (result != Status::SUCCESS) { return result; } @@ -81,15 +82,14 @@ Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener * Status SoftBusClient::OpenConnect(const ISocketListener *listener) { - if (bindState_ == 0) { - return Status ::SUCCESS; + std::lock_guard lock(mutex_); + auto status = CheckStatus(); + if (status == Status::SUCCESS || status == Status::RATE_LIMIT) { + return status; } if (isOpening_.exchange(true)) { return Status::RATE_LIMIT; } - if (bindState_ == 0) { - return Status ::SUCCESS; - } SocketInfo socketInfo; std::string peerName = pipe_.pipeId; socketInfo.peerName = const_cast(peerName.c_str()); @@ -122,6 +122,20 @@ Status SoftBusClient::OpenConnect(const ISocketListener *listener) return Status::RATE_LIMIT; } +Status SoftBusClient::CheckStatus() +{ + if (bindState_ == 0) { + return Status::SUCCESS; + } + if (isOpening_.load()) { + return Status::RATE_LIMIT; + } + if (bindState_ == 0) { + return Status::SUCCESS; + } + return Status::ERROR; +} + Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListener *listener) { int32_t status = ::Bind(socket, qos, QOS_COUNT, listener); @@ -149,6 +163,7 @@ Status SoftBusClient::Open(int32_t socket, const QosTV qos[], const ISocketListe } ZLOGI("open %{public}s, session:%{public}s success, socket:%{public}d", KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket_); + ConnectManager::GetInstance()->OnSessionOpen(DmAdapter::GetInstance().GetDeviceInfo(device_.deviceId).networkId); return Status::SUCCESS; } diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_client.h b/services/distributeddataservice/adapter/communicator/src/softbus_client.h index b20cf012d05ac8a873192fcb7a7d6fba6bcc8966..8f2b5a0dc6d899ed655006f12ced0a159912e5c4 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_client.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_client.h @@ -37,6 +37,8 @@ public: using Time = std::chrono::steady_clock::time_point; using Duration = std::chrono::steady_clock::duration; + Status CheckStatus(); + Status OpenConnect(const ISocketListener *listener); Status SendData(const DataInfo &dataInfo, const ISocketListener *listener); bool operator==(int32_t socket) const; bool operator==(const std::string &deviceId) const; @@ -48,7 +50,6 @@ public: void UpdateExpireTime(); private: - Status OpenConnect(const ISocketListener *listener); Status Open(int32_t socket, const QosTV qos[], const ISocketListener *listener); std::pair GetMtu(int32_t socket); Time CalcExpireTime() const; diff --git a/services/distributeddataservice/adapter/communicator/test/fuzztest/softbusadapter_fuzzer/BUILD.gn b/services/distributeddataservice/adapter/communicator/test/fuzztest/softbusadapter_fuzzer/BUILD.gn index a7c54aff81950fd9b817d994d126740b98060939..3ee6efe2e6612b7a1f2bf07d736ce21f5c7efb9b 100644 --- a/services/distributeddataservice/adapter/communicator/test/fuzztest/softbusadapter_fuzzer/BUILD.gn +++ b/services/distributeddataservice/adapter/communicator/test/fuzztest/softbusadapter_fuzzer/BUILD.gn @@ -26,6 +26,7 @@ ohos_fuzztest("SoftBusAdapterFuzzTest") { "${data_service_path}/adapter/include/autils", "${data_service_path}/adapter/include/utils", "${data_service_path}/adapter/communicator/src", + "${data_service_path}/framework/include", "${dsoftbus_core_path}", "${kv_store_common_path}", "${kv_store_distributeddb_path}/interfaces/include", diff --git a/services/distributeddataservice/framework/BUILD.gn b/services/distributeddataservice/framework/BUILD.gn index f069d43e881f8669394ea2865e563a37d7ddd43f..99934dbfd2bce6104239f58f8f50ef51f6f61a41 100644 --- a/services/distributeddataservice/framework/BUILD.gn +++ b/services/distributeddataservice/framework/BUILD.gn @@ -65,6 +65,7 @@ ohos_shared_library("distributeddatasvcfwk") { "cloud/subscription.cpp", "cloud/sync_event.cpp", "cloud/sync_strategy.cpp", + "communication/connect_manager.cpp", "directory/directory_manager.cpp", "dump/dump_manager.cpp", "eventcenter/event.cpp", diff --git a/services/distributeddataservice/framework/communication/connect_manager.cpp b/services/distributeddataservice/framework/communication/connect_manager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..74f8f70a88d4e0120b66cfc9e4eada414b975c55 --- /dev/null +++ b/services/distributeddataservice/framework/communication/connect_manager.cpp @@ -0,0 +1,137 @@ +/* +* Copyright (c) 2024 Huawei Device Co., Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#define LOG_TAG "ConnectManager" +#include "communication/connect_manager.h" + +#include "log_print.h" + +namespace OHOS::AppDistributedKv { +std::mutex ConnectManager::mtx_; +std::shared_ptr ConnectManager::instance_ = nullptr; +ConnectManager::CloseSessionTask ConnectManager::closeSessionTask_ = nullptr; +ConcurrentMap ConnectManager::sessionCloseListener_; +ConcurrentMap ConnectManager::sessionOpenListener_; + +std::shared_ptr ConnectManager::GetInstance() +{ + static std::once_flag onceFlag; + std::call_once(onceFlag, [&]() { + std::lock_guard lock(mtx_); + if (instance_ == nullptr) { + instance_ = std::make_shared(); + } + }); + return instance_; +} + +bool ConnectManager::RegisterInstance(std::shared_ptr instance) +{ + std::lock_guard lock(mtx_); + if (instance_ != nullptr) { + ZLOGW("ConnectManager instance has been replaced!"); + } + instance_ = instance; + return true; +} + +bool ConnectManager::CloseSession(const std::string &networkId) +{ + if (closeSessionTask_ != nullptr) { + return closeSessionTask_(networkId); + } + return false; +} + +bool ConnectManager::RegisterCloseSessionTask(CloseSessionTask task) +{ + if (closeSessionTask_ != nullptr) { + ZLOGE("Register close session task error, task already exists."); + return false; + } + closeSessionTask_ = std::move(task); + return true; +} + +bool ConnectManager::RegisterSessionCloseListener(const std::string &name, SessionCloseListener listener) +{ + bool success = false; + sessionCloseListener_.Compute(name, [&success, &listener](const auto &key, auto &value) { + if (value != nullptr) { + ZLOGE("Register session close listener error, type:%{public}s already exists.", key.c_str()); + return true; + } + value = std::move(listener); + success = true; + return true; + }); + return success; +} + +void ConnectManager::UnRegisterSessionCloseListener(const std::string &name) +{ + sessionCloseListener_.Erase(name); +} + +void ConnectManager::OnSessionClose(const std::string &networkId) +{ + sessionCloseListener_.ForEach([&networkId](const auto &key, auto &listener) { + listener(networkId); + return false; + }); +} + +bool ConnectManager::RegisterSessionOpenListener(const std::string &name, SessionOpenListener listener) +{ + bool success = false; + sessionOpenListener_.Compute(name, [&success, &listener](const auto &key, auto &value) { + if (value != nullptr) { + ZLOGE("Register session open listener error, type:%{public}s already exists.", key.c_str()); + return true; + } + value = std::move(listener); + success = true; + return true; + }); + return success; +} + +void ConnectManager::UnRegisterSessionOpenListener(const std::string &name) +{ + sessionOpenListener_.Erase(name); +} + +void ConnectManager::OnSessionOpen(const std::string &networkId) +{ + sessionOpenListener_.ForEach([&networkId](const auto &key, auto &listener) { + listener(networkId); + return false; + }); +} + +void ConnectManager::OnStart() +{ +} + +void ConnectManager::OnDestory() +{ +} + +int32_t ConnectManager::ApplyConnect(__attribute__((unused)) const std::string &networkId, ConnectTask task) +{ + task(); + return 0; +} +} // OHOS::AppDistributedKv \ No newline at end of file diff --git a/services/distributeddataservice/framework/include/communication/connect_manager.h b/services/distributeddataservice/framework/include/communication/connect_manager.h new file mode 100644 index 0000000000000000000000000000000000000000..5e9093c75964e43dbd631edb5bbe67062129005e --- /dev/null +++ b/services/distributeddataservice/framework/include/communication/connect_manager.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_COMMUNICATION_CONNECT_MANAGER_H +#define OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_COMMUNICATION_CONNECT_MANAGER_H + +#include +#include +#include +#include + +#include "concurrent_map.h" +#include "visibility.h" +namespace OHOS { +namespace AppDistributedKv { +class API_EXPORT ConnectManager { +public: + using ConnectTask = std::function; + using CloseSessionTask = std::function; + using SessionCloseListener = std::function; + using SessionOpenListener = std::function; + + API_EXPORT static std::shared_ptr GetInstance(); + API_EXPORT static bool RegisterInstance(std::shared_ptr instance); + + API_EXPORT static bool CloseSession(const std::string &networkId); + API_EXPORT static bool RegisterCloseSessionTask(CloseSessionTask task); + + API_EXPORT static bool RegisterSessionCloseListener(const std::string &name, SessionCloseListener listener); + API_EXPORT static void UnRegisterSessionCloseListener(const std::string &name); + API_EXPORT static void OnSessionClose(const std::string &networkId); + + API_EXPORT static bool RegisterSessionOpenListener(const std::string &name, SessionOpenListener listener); + API_EXPORT static void UnRegisterSessionOpenListener(const std::string &name); + API_EXPORT static void OnSessionOpen(const std::string &networkId); + + ConnectManager() = default; + virtual ~ConnectManager() = default; + + virtual void OnStart(); + virtual void OnDestory(); + virtual int32_t ApplyConnect(const std::string &networkId, ConnectTask task); + +private: + static std::mutex mtx_; + static std::shared_ptr instance_; + static CloseSessionTask closeSessionTask_; + static ConcurrentMap sessionCloseListener_; + static ConcurrentMap sessionOpenListener_; +}; +} // namespace AppDistributedKv +} // namespace OHOS +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_COMMUNICATION_CONNECT_MANAGER_H \ No newline at end of file