diff --git a/services/core/BUILD.gn b/services/core/BUILD.gn index d18689224ac8a0a263fc5006348e2a5a1c039018..f06723f404ac7f765838521088bc3d2c4c9cb806 100755 --- a/services/core/BUILD.gn +++ b/services/core/BUILD.gn @@ -48,6 +48,7 @@ ohos_shared_library("distributed_device_profile") { "src/dbstorage/device_profile_storage_manager.cpp", "src/dbstorage/kvstore_death_recipient.cpp", "src/dbstorage/online_sync_table.cpp", + "src/dbstorage/sync_coordinator.cpp", "src/devicemanager/device_info.cpp", "src/devicemanager/device_manager.cpp", "src/distributed_device_profile_service.cpp", diff --git a/services/core/include/dbstorage/device_profile_storage.h b/services/core/include/dbstorage/device_profile_storage.h index 3c9da4593560b1ecc95e625c85b4b7d743bd1e85..24cd615e094072da5f15a7687422d61b0c4425a8 100755 --- a/services/core/include/dbstorage/device_profile_storage.h +++ b/services/core/include/dbstorage/device_profile_storage.h @@ -47,14 +47,14 @@ public: const std::vector& values); virtual int32_t SyncDeviceProfile(const std::vector& deviceIdList, DistributedKv::SyncMode syncMode); + virtual int32_t RegisterSyncCallback(const std::shared_ptr& sycnCb); + virtual int32_t UnRegisterSyncCallback(); void SetOptions(const DistributedKv::Options& options); StorageInitStatus GetInitStatus(); bool RegisterKvStoreInitCallback(const KvStoreInitCallback& callback); int32_t SubscribeKvStore(const std::shared_ptr& observer); int32_t UnSubscribeKvStore(const std::shared_ptr& observer); - int32_t RegisterSyncCallback(const std::shared_ptr& sycnCb); - int32_t UnRegisterSyncCallback(); private: bool TryGetKvStore(); diff --git a/services/core/include/dbstorage/device_profile_storage_manager.h b/services/core/include/dbstorage/device_profile_storage_manager.h index 352d32f54e93dac866fb0abe78a1094b7d0d1587..aee09107888b4612fce41e9ca43c6aa4f6d821ac 100755 --- a/services/core/include/dbstorage/device_profile_storage_manager.h +++ b/services/core/include/dbstorage/device_profile_storage_manager.h @@ -70,6 +70,7 @@ private: std::string GenerateKey(const std::string& udid, const std::string& key, KeyType keyType); void OnNodeOnline(const std::shared_ptr deviceInfo); + void PostOnlineSync(const std::string& deviceId, int32_t retryTimes); bool CheckSyncOption(const SyncOptions& syncOptions); int32_t NotifySyncStart(const sptr& profileEventNotifier); void SetServiceType(const std::string& udid, const std::string& serviceId, ServiceCharacteristicProfile& profile); @@ -78,7 +79,7 @@ private: std::mutex serviceLock_; std::mutex callbackLock_; nlohmann::json servicesJson_; - std::unique_ptr onlineSyncTbl_; + std::shared_ptr onlineSyncTbl_; std::shared_ptr storageHandler_; sptr kvStoreDeathRecipient_; std::string localUdid_; diff --git a/services/core/include/dbstorage/online_sync_table.h b/services/core/include/dbstorage/online_sync_table.h index b08c0659993376d1dd4a521ba5da2bcc2f3c7fc5..49ff99d2429857215d61c5abec1e577a0689d663 100755 --- a/services/core/include/dbstorage/online_sync_table.h +++ b/services/core/include/dbstorage/online_sync_table.h @@ -20,12 +20,25 @@ namespace OHOS { namespace DeviceProfile { -class OnlineSyncTable : public DeviceProfileStorage { +class OnlineSyncTable : public std::enable_shared_from_this, + public DeviceProfileStorage, + public DistributedKv::KvStoreSyncCallback { public: OnlineSyncTable(); ~OnlineSyncTable() = default; void Init() override; + int32_t RegisterSyncCallback(const std::shared_ptr& sycnCb) override; + int32_t UnRegisterSyncCallback() override; + int32_t SyncDeviceProfile(const std::vector& deviceIds, DistributedKv::SyncMode syncMode) override; + + void SyncCompleted(const std::map& results) override; + +private: + std::shared_ptr manualSyncCallback_; + std::vector onlineDeviceIds_; + std::atomic retrySyncTimes_ {0}; + std::mutex tableLock_; }; } // namespace DeviceProfile } // namespace OHOS diff --git a/services/core/include/dbstorage/sync_coordinator.h b/services/core/include/dbstorage/sync_coordinator.h new file mode 100755 index 0000000000000000000000000000000000000000..d82bc0d2cde6a53f332cbdfe5c84a942c4cde59c --- /dev/null +++ b/services/core/include/dbstorage/sync_coordinator.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_DISTRIBUTED_PROFILE_SYNC_COORDINATOR_H +#define OHOS_DISTRIBUTED_PROFILE_SYNC_COORDINATOR_H + +#include +#include + +#include "single_instance.h" + +#include "event_handler.h" + +namespace OHOS { +namespace DeviceProfile { +class SyncCoordinator { + DECLARE_SINGLE_INSTANCE(SyncCoordinator); +public: + using SyncTask = std::function; + + bool Init(); + bool AcquireSync(); + void ReleaseSync(); + void SetSyncTrigger(bool isOnlineTrigger); + bool IsOnlineSync(); + bool DispatchSyncTask(const SyncTask& syncTask); + +private: + std::atomic isOnlineTrigger_ {false}; + std::atomic isOnSync_ {false}; + std::shared_ptr syncHandler_; +}; +} // namespace DeviceProfile +} // namespace OHOS +#endif // OHOS_DISTRIBUTED_PROFILE_SYNC_COORDINATOR_H \ No newline at end of file diff --git a/services/core/src/dbstorage/device_profile_storage.cpp b/services/core/src/dbstorage/device_profile_storage.cpp index eb5908057f7f45d25db77d41e48e3e942c2b24c1..f49931f74815eb775b5a30afabec4c69093997dc 100755 --- a/services/core/src/dbstorage/device_profile_storage.cpp +++ b/services/core/src/dbstorage/device_profile_storage.cpp @@ -248,6 +248,7 @@ int32_t DeviceProfileStorage::DeleteDeviceProfile(const std::string& key) int32_t DeviceProfileStorage::SyncDeviceProfile(const std::vector& deviceIdList, DistributedKv::SyncMode syncMode) { + HILOGI("called"); std::unique_lock writeLock(storageLock_); if (kvStorePtr_ == nullptr) { return ERR_DP_INVALID_PARAMS; diff --git a/services/core/src/dbstorage/device_profile_storage_manager.cpp b/services/core/src/dbstorage/device_profile_storage_manager.cpp index 2f1344f26b779c5241327c83557943e945d10d41..6d89e9caeea78781e27d0cffc592d42da2cfe4bf 100755 --- a/services/core/src/dbstorage/device_profile_storage_manager.cpp +++ b/services/core/src/dbstorage/device_profile_storage_manager.cpp @@ -22,6 +22,7 @@ #include "device_profile_errors.h" #include "device_profile_log.h" #include "device_profile_utils.h" +#include "sync_coordinator.h" #include "ipc_object_proxy.h" #include "ipc_skeleton.h" @@ -41,6 +42,8 @@ const std::string TAG = "DeviceProfileStorageManager"; const std::string SERVICE_TYPE = "type"; const std::string SERVICES = "services"; constexpr int32_t RETRY_TIMES_WAIT_KV_DATA = 30; +constexpr int32_t INTREVAL_POST_ONLINE_SYNC_MS = 50; +constexpr int32_t RETRY_TIMES_POST_ONLINE_SYNC = 15; } IMPLEMENT_SINGLE_INSTANCE(DeviceProfileStorageManager); @@ -48,12 +51,16 @@ IMPLEMENT_SINGLE_INSTANCE(DeviceProfileStorageManager); bool DeviceProfileStorageManager::Init() { if (!inited_) { + if (!SyncCoordinator::GetInstance().Init()) { + HILOGE("SyncCoordinator init failed"); + return false; + } DeviceManager::GetInstance().GetLocalDeviceUdid(localUdid_); if (localUdid_.empty()) { HILOGE("get local udid failed"); return false; } - onlineSyncTbl_ = std::make_unique(); + onlineSyncTbl_ = std::make_shared(); if (onlineSyncTbl_ == nullptr) { return false; } @@ -290,6 +297,7 @@ int32_t DeviceProfileStorageManager::SyncDeviceProfile(const SyncOptions& syncOp if (devicesList.empty()) { DeviceManager::GetInstance().GetDeviceIdList(devicesList); } + SyncCoordinator::GetInstance().SetSyncTrigger(false); std::vector devicesVector(std::vector { devicesList.begin(), devicesList.end() }); int32_t result = onlineSyncTbl_->SyncDeviceProfile(devicesVector, syncOptions.GetSyncMode()); if (result != ERR_OK) { @@ -298,8 +306,8 @@ int32_t DeviceProfileStorageManager::SyncDeviceProfile(const SyncOptions& syncOp return; } }; - if (!storageHandler_->PostTask(syncTask)) { - HILOGE("post task failed"); + if (!SyncCoordinator::GetInstance().DispatchSyncTask(syncTask)) { + HILOGE("post sync task failed"); NotifySyncCompleted(); return ERR_DP_POST_TASK_FAILED; } @@ -308,13 +316,13 @@ int32_t DeviceProfileStorageManager::SyncDeviceProfile(const SyncOptions& syncOp int32_t DeviceProfileStorageManager::NotifySyncStart(const sptr& profileEventNotifier) { + if (!SyncCoordinator::GetInstance().AcquireSync()) { + HILOGW("sync busy"); + return ERR_DP_DEVICE_SYNC_BUSY; + } + { std::lock_guard autoLock(profileSyncLock_); - if (isSync_) { - HILOGW("sync busy"); - return ERR_DP_DEVICE_SYNC_BUSY; - } - isSync_ = true; syncEventNotifier_ = profileEventNotifier; } @@ -326,8 +334,8 @@ int32_t DeviceProfileStorageManager::NotifySyncStart(const sptr& if (SubscribeManager::GetInstance().SubscribeProfileEvents( subscribeInfos, profileEventNotifier, failedEvents) != ERR_OK) { HILOGE("subscribe sync event failed"); + SyncCoordinator::GetInstance().ReleaseSync(); std::lock_guard autoLock(profileSyncLock_); - isSync_ = false; syncEventNotifier_ = nullptr; return ERR_DP_SUBSCRIBE_FAILED; } @@ -337,17 +345,16 @@ int32_t DeviceProfileStorageManager::NotifySyncStart(const sptr& void DeviceProfileStorageManager::NotifySyncCompleted() { HILOGI("called"); + SyncCoordinator::GetInstance().ReleaseSync(); + std::lock_guard autoLock(profileSyncLock_); std::list profileEvents; profileEvents.emplace_back(ProfileEvent::EVENT_SYNC_COMPLETED); std::list failedEvents; - std::lock_guard autoLock(profileSyncLock_); int32_t ret = SubscribeManager::GetInstance().UnsubscribeProfileEvents( profileEvents, syncEventNotifier_, failedEvents); if (ret != ERR_OK) { HILOGW("unsubscribe sync event failed"); } - - isSync_ = false; syncEventNotifier_ = nullptr; } @@ -359,7 +366,7 @@ void DeviceProfileStorageManager::NotifySubscriberDied(const sptr return; } - isSync_ = false; + SyncCoordinator::GetInstance().ReleaseSync(); syncEventNotifier_ = nullptr; } @@ -488,11 +495,34 @@ int32_t DeviceProfileStorageManager::UnRegisterSyncCallback() void DeviceProfileStorageManager::OnNodeOnline(const std::shared_ptr deviceInfo) { - HILOGI("called"); - std::vector onlineDevice = { deviceInfo->GetDeviceId() }; - int32_t errCode = onlineSyncTbl_->SyncDeviceProfile(onlineDevice, SyncMode::PUSH); - if (errCode != ERR_OK) { - HILOGE("online sync errCode = %{public}d", errCode); + std::string deviceId = deviceInfo->GetDeviceId(); + HILOGI("online deviceId %{public}s", DeviceProfileUtils::AnonymizeDeviceId(deviceId).c_str()); + PostOnlineSync(deviceId, 0); +} + +void DeviceProfileStorageManager::PostOnlineSync(const std::string& deviceId, int32_t retryTimes) +{ + if (retryTimes >= RETRY_TIMES_POST_ONLINE_SYNC) { + HILOGE("reach max retry times"); + return; + } + + auto onlineSyncTaks = [this, deviceId = std::move(deviceId), retryTimes = retryTimes]() mutable { + if (!SyncCoordinator::GetInstance().AcquireSync()) { + PostOnlineSync(deviceId, retryTimes++); + return; + } + HILOGI("current retry times = %{public}d", retryTimes); + std::vector onlineDeviceId = { deviceId }; + SyncCoordinator::GetInstance().SetSyncTrigger(true); + int32_t errCode = onlineSyncTbl_->SyncDeviceProfile(onlineDeviceId, SyncMode::PUSH); + if (errCode != ERR_OK) { + HILOGE("online sync errCode = %{public}d", errCode); + } + }; + if (!storageHandler_->PostTask(onlineSyncTaks, INTREVAL_POST_ONLINE_SYNC_MS)) { + HILOGE("post task failed"); + return; } } } // namespace DeviceProfile diff --git a/services/core/src/dbstorage/online_sync_table.cpp b/services/core/src/dbstorage/online_sync_table.cpp index 5a7bf456a5cb6af36277185d9ec3ff0d2d4725dc..aaaf63b0a4f6da581876c2a7c9ed0d8cf1714a49 100755 --- a/services/core/src/dbstorage/online_sync_table.cpp +++ b/services/core/src/dbstorage/online_sync_table.cpp @@ -15,7 +15,11 @@ #include "online_sync_table.h" +#include "device_manager.h" +#include "device_profile_errors.h" #include "device_profile_log.h" +#include "device_profile_utils.h" +#include "sync_coordinator.h" namespace OHOS { namespace DeviceProfile { @@ -26,6 +30,7 @@ const std::string TAG = "OnlineSyncTable"; const std::string APP_ID = "distributed_device_profile_service"; const std::string STORE_ID = "online_sync_storage"; +constexpr int32_t MAX_RETRY_SYNC_TIMES = 10; } OnlineSyncTable::OnlineSyncTable() : DeviceProfileStorage(APP_ID, STORE_ID) @@ -43,6 +48,88 @@ void OnlineSyncTable::Init() }; SetOptions(options); DeviceProfileStorage::Init(); + int32_t errCode = DeviceProfileStorage::RegisterSyncCallback(shared_from_this()); + if (errCode != ERR_OK) { + HILOGE("register sync callback failed, errCode = %{public}d", errCode); + } +} + +int32_t OnlineSyncTable::RegisterSyncCallback(const std::shared_ptr& syncCb) +{ + if (syncCb == nullptr) { + return ERR_DP_INVALID_PARAMS; + } + std::lock_guard autoLock(tableLock_); + manualSyncCallback_ = syncCb; + return ERR_OK; +} + +int32_t OnlineSyncTable::UnRegisterSyncCallback() +{ + std::lock_guard autoLock(tableLock_); + manualSyncCallback_ = nullptr; + return ERR_OK; +} + +int32_t OnlineSyncTable::SyncDeviceProfile(const std::vector& deviceIds, + DistributedKv::SyncMode syncMode) +{ + HILOGI("called"); + auto syncTask = [this, deviceIds = std::move(deviceIds), syncMode]() { + HILOGI("start sync task"); + retrySyncTimes_ = 0; + int32_t errCode = DeviceProfileStorage::SyncDeviceProfile(deviceIds, syncMode); + if (errCode != ERR_OK) { + SyncCoordinator::GetInstance().ReleaseSync(); + HILOGE("sync errCode = %{public}d", errCode); + } + }; + if (!SyncCoordinator::GetInstance().DispatchSyncTask(syncTask)) { + HILOGE("post online sync task failed"); + return ERR_DP_POST_TASK_FAILED; + } + return ERR_OK; +} + +void OnlineSyncTable::SyncCompleted(const std::map& results) +{ + if (!SyncCoordinator::GetInstance().IsOnlineSync()) { + std::lock_guard autoLock(tableLock_); + if (manualSyncCallback_ != nullptr) { + HILOGI("manual sync callback"); + manualSyncCallback_->SyncCompleted(results); + } + return; + } + + HILOGI("online sync callback"); + std::vector failedDeviceIds; + for (const auto& [deviceId, result] : results) { + HILOGI("deviceId = %{public}s, result = %{public}d", + DeviceProfileUtils::AnonymizeDeviceId(deviceId).c_str(), result); + if (result != Status::SUCCESS) { + std::string networkId; + if (!DeviceManager::GetInstance().TransformDeviceId(deviceId, networkId, + DeviceIdType::NETWORKID)) { + HILOGE("transform to networkid failed"); + continue; + } + failedDeviceIds.emplace_back(std::move(networkId)); + } + } + + HILOGI("retry times = %{public}d", retrySyncTimes_.load()); + if ((retrySyncTimes_++ < MAX_RETRY_SYNC_TIMES) && !failedDeviceIds.empty()) { + auto retrySyncTask = [this, deviceIds = std::move(failedDeviceIds)]() { + HILOGI("retrying sync..."); + SyncDeviceProfile(deviceIds, SyncMode::PUSH); + }; + if (!SyncCoordinator::GetInstance().DispatchSyncTask(retrySyncTask)) { + HILOGE("post online sync retry task failed"); + return; + } + } + SyncCoordinator::GetInstance().ReleaseSync(); } } // namespace DeviceProfile -} // namespace OHOS +} // namespace OHOS \ No newline at end of file diff --git a/services/core/src/dbstorage/sync_coordinator.cpp b/services/core/src/dbstorage/sync_coordinator.cpp new file mode 100755 index 0000000000000000000000000000000000000000..a51dcd7494c5f1a8059c86433e70f41703c2bee4 --- /dev/null +++ b/services/core/src/dbstorage/sync_coordinator.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "sync_coordinator.h" + +#include + +#include "device_profile_log.h" + +namespace OHOS { +namespace DeviceProfile { +namespace { +const std::string TAG = "SyncCoordinator"; +} + +IMPLEMENT_SINGLE_INSTANCE(SyncCoordinator); + +bool SyncCoordinator::Init() +{ + auto runner = AppExecFwk::EventRunner::Create("syncHandler"); + syncHandler_ = std::make_shared(runner); + if (syncHandler_ == nullptr) { + return false; + } + return true; +} + +bool SyncCoordinator::AcquireSync() +{ + if (isOnSync_) { + HILOGI("acquire from %{publiic}s", isOnlineTrigger_ ? "online" : "manual"); + return false; + } + isOnSync_ = true; + return true; +} + +void SyncCoordinator::ReleaseSync() +{ + isOnSync_ = false; +} + +void SyncCoordinator::SetSyncTrigger(bool isOnlineTrigger) +{ + isOnlineTrigger_ = isOnlineTrigger; +} + +bool SyncCoordinator::IsOnlineSync() +{ + return isOnlineTrigger_; +} + +bool SyncCoordinator::DispatchSyncTask(const SyncTask& syncTask) +{ + if (!syncHandler_->PostTask(syncTask)) { + HILOGE("post task failed"); + isOnSync_ = false; + return false; + } + return true; +} +} // namespace DeviceProfile +} // namespace OHOS \ No newline at end of file