From 70f0dc2189951076d716703bd0badceb5770354c Mon Sep 17 00:00:00 2001 From: wangtong Date: Thu, 28 Mar 2024 09:58:47 +0800 Subject: [PATCH] feat:add query last cloudsyncinfo interface Signed-off-by: wangtong --- .../framework/include/store/store_info.h | 1 + .../service/cloud/cloud_service_impl.cpp | 41 ++++ .../service/cloud/cloud_service_impl.h | 4 +- .../service/cloud/cloud_service_stub.cpp | 17 +- .../service/cloud/cloud_service_stub.h | 3 +- .../service/cloud/cloud_types_util.cpp | 12 ++ .../service/cloud/cloud_types_util.h | 6 + .../service/cloud/sync_manager.cpp | 177 ++++++++++++++++-- .../service/cloud/sync_manager.h | 21 ++- 9 files changed, 264 insertions(+), 18 deletions(-) diff --git a/services/distributeddataservice/framework/include/store/store_info.h b/services/distributeddataservice/framework/include/store/store_info.h index 8138955aa..8c5d02b2b 100644 --- a/services/distributeddataservice/framework/include/store/store_info.h +++ b/services/distributeddataservice/framework/include/store/store_info.h @@ -26,6 +26,7 @@ struct StoreInfo { int32_t instanceId = 0; int32_t user = 0; std::string deviceId; + uint64_t syncId = 0; }; } // namespace OHOS::DistributedData #endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_STORE_INFO_H diff --git a/services/distributeddataservice/service/cloud/cloud_service_impl.cpp b/services/distributeddataservice/service/cloud/cloud_service_impl.cpp index 1f5f16eee..8bf00c959 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_impl.cpp +++ b/services/distributeddataservice/service/cloud/cloud_service_impl.cpp @@ -458,6 +458,47 @@ int32_t CloudServiceImpl::SetGlobalCloudStrategy(Strategy strategy, const std::v return STRATEGY_SAVERS[strategy](values, hapInfo); } +std::pair CloudServiceImpl::QueryLastSyncInfo(const std::string &id, + const std::string &bundleName, const std::string &storeId) +{ + QueryLastResults results; + auto user = Account::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingTokenID()); + auto [status, cloudInfo] = GetCloudInfo(user); + if (status != SUCCESS) { + return { ERROR, results }; + } + if (cloudInfo.apps.find(bundleName) == cloudInfo.apps.end()) { + ZLOGE("Invalid bundleName: %{public}s", bundleName.c_str()); + return { INVALID_ARGUMENT, results }; + } + std::vector schemas; + auto key = cloudInfo.GetSchemaPrefix(bundleName); + if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) { + return { ERROR, results }; + } + + std::vector queryKeys; + for (const auto &schema : schemas) { + if (schema.bundleName != bundleName) { + continue; + } + for (const auto &database : schema.databases) { + if (storeId.empty() || database.alias == storeId) { + queryKeys.push_back({ id, bundleName, database.name }); + } + } + if (queryKeys.empty()) { + ZLOGE("Invalid storeId: %{public}s", Anonymous::Change(storeId).c_str()); + return { INVALID_ARGUMENT, results }; + } + } + + auto ret = syncManager_.QueryLastSyncInfo(queryKeys, results); + ZLOGI("QueryLastSyncInfo code:%{public}d, accountId:%{public}s, bundleName:%{public}s, storeId:%{public}s", ret, + Anonymous::Change(id).c_str(), bundleName.c_str(), Anonymous::Change(storeId).c_str()); + return { ret, results }; +} + int32_t CloudServiceImpl::OnInitialize() { DistributedDB::RuntimeConfig::SetCloudTranslate(std::make_shared()); diff --git a/services/distributeddataservice/service/cloud/cloud_service_impl.h b/services/distributeddataservice/service/cloud/cloud_service_impl.h index b6dff4b7b..66a36af2b 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_impl.h +++ b/services/distributeddataservice/service/cloud/cloud_service_impl.h @@ -43,6 +43,8 @@ public: int32_t NotifyDataChange(const std::string& eventId, const std::string& extraData, int32_t userId) override; std::pair> QueryStatistics(const std::string &id, const std::string &bundleName, const std::string &storeId) override; + std::pair QueryLastSyncInfo(const std::string &id, const std::string &bundleName, + const std::string &storeId) override; int32_t SetGlobalCloudStrategy(Strategy strategy, const std::vector& values) override; std::pair> AllocResourceAndShare(const std::string& storeId, @@ -170,4 +172,4 @@ private: }; } // namespace OHOS::DistributedData -#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_IMPL_H +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_IMPL_H \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_service_stub.cpp b/services/distributeddataservice/service/cloud/cloud_service_stub.cpp index 4aa3c79bd..ef1aa3902 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_stub.cpp +++ b/services/distributeddataservice/service/cloud/cloud_service_stub.cpp @@ -33,6 +33,7 @@ const CloudServiceStub::Handler CloudServiceStub::HANDLERS[TRANS_BUTT] = { &CloudServiceStub::OnNotifyDataChange, &CloudServiceStub::OnNotifyChange, &CloudServiceStub::OnQueryStatistics, + &CloudServiceStub::OnQueryLastSyncInfo, &CloudServiceStub::OnSetGlobalCloudStrategy, &CloudServiceStub::OnAllocResourceAndShare, &CloudServiceStub::OnShare, @@ -138,6 +139,20 @@ int32_t CloudServiceStub::OnNotifyDataChange(MessageParcel &data, MessageParcel return ITypesUtil::Marshal(reply, result) ? ERR_NONE : IPC_STUB_WRITE_PARCEL_ERR; } +int32_t CloudServiceStub::OnQueryLastSyncInfo(MessageParcel &data, MessageParcel &reply) +{ + std::string id; + std::string bundleName; + std::string storeId; + if (!ITypesUtil::Unmarshal(data, id, bundleName, storeId)) { + ZLOGE("Unmarshal id:%{public}s, bundleName:%{public}s, storeId:%{public}s", Anonymous::Change(id).c_str(), + bundleName.c_str(), Anonymous::Change(storeId).c_str()); + return IPC_STUB_INVALID_DATA_ERR; + } + auto [status, results] = QueryLastSyncInfo(id, bundleName, storeId); + return ITypesUtil::Marshal(reply, status, results) ? ERR_NONE : IPC_STUB_WRITE_PARCEL_ERR; +} + int32_t CloudServiceStub::OnSetGlobalCloudStrategy(MessageParcel &data, MessageParcel &reply) { Strategy strategy; @@ -302,4 +317,4 @@ int32_t CloudServiceStub::OnSetCloudStrategy(MessageParcel &data, MessageParcel auto status = SetCloudStrategy(strategy, values); return ITypesUtil::Marshal(reply, status) ? ERR_NONE : IPC_STUB_WRITE_PARCEL_ERR; } -} // namespace OHOS::CloudData +} // namespace OHOS::CloudData \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_service_stub.h b/services/distributeddataservice/service/cloud/cloud_service_stub.h index 4a8fae736..4d30fcca5 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_stub.h +++ b/services/distributeddataservice/service/cloud/cloud_service_stub.h @@ -33,6 +33,7 @@ private: int32_t OnNotifyDataChange(MessageParcel &data, MessageParcel &reply); int32_t OnNotifyChange(MessageParcel &data, MessageParcel &reply); int32_t OnQueryStatistics(MessageParcel &data, MessageParcel &reply); + int32_t OnQueryLastSyncInfo(MessageParcel &data, MessageParcel &reply); int32_t OnSetGlobalCloudStrategy(MessageParcel &data, MessageParcel &reply); int32_t OnAllocResourceAndShare(MessageParcel &data, MessageParcel &reply); @@ -49,4 +50,4 @@ private: static const Handler HANDLERS[TRANS_BUTT]; }; } // namespace OHOS::CloudData -#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_STUB_H +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_STUB_H \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_types_util.cpp b/services/distributeddataservice/service/cloud/cloud_types_util.cpp index f66d8f0b3..f765a5569 100644 --- a/services/distributeddataservice/service/cloud/cloud_types_util.cpp +++ b/services/distributeddataservice/service/cloud/cloud_types_util.cpp @@ -97,4 +97,16 @@ bool Unmarshalling(Strategy &output, MessageParcel &data) output = static_cast(result); return true; } + +template<> +bool Marshalling(const CloudSyncInfo &input, MessageParcel &data) +{ + return Marshal(data, input.startTime, input.finishTime, input.code); +} + +template<> +bool Unmarshalling(CloudSyncInfo &output, MessageParcel &data) +{ + return Unmarshal(data, output.startTime, output.finishTime, output.code); +} } // namespace OHOS::ITypesUtil \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_types_util.h b/services/distributeddataservice/service/cloud/cloud_types_util.h index 7931c8abb..4e151fa9e 100644 --- a/services/distributeddataservice/service/cloud/cloud_types_util.h +++ b/services/distributeddataservice/service/cloud/cloud_types_util.h @@ -31,6 +31,7 @@ using ValueObject = OHOS::NativeRdb::ValueObject; using ValuesBucket = OHOS::NativeRdb::ValuesBucket; using StatisticInfo = OHOS::CloudData::StatisticInfo; using Strategy = OHOS::CloudData::Strategy; +using CloudSyncInfo = OHOS::CloudData::CloudSyncInfo; template<> bool Marshalling(const Participant &input, MessageParcel &data); @@ -62,5 +63,10 @@ bool Unmarshalling(StatisticInfo &output, MessageParcel &data); template<> bool Unmarshalling(Strategy &output, MessageParcel &data); + +template<> +bool Marshalling(const CloudSyncInfo &input, MessageParcel &data); +template<> +bool Unmarshalling(CloudSyncInfo &output, MessageParcel &data); } // namespace OHOS::ITypesUtil #endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_TYPES_UTIL_H diff --git a/services/distributeddataservice/service/cloud/sync_manager.cpp b/services/distributeddataservice/service/cloud/sync_manager.cpp index cae601c0a..917574665 100644 --- a/services/distributeddataservice/service/cloud/sync_manager.cpp +++ b/services/distributeddataservice/service/cloud/sync_manager.cpp @@ -15,6 +15,8 @@ #define LOG_TAG "SyncManager" #include "sync_manager.h" +#include + #include "account/account_delegate.h" #include "cloud/cloud_server.h" #include "cloud/schema_meta.h" @@ -173,31 +175,31 @@ int32_t SyncManager::StopCloudSync(int32_t user) return E_OK; } -bool SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud) +std::pair SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud) { if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) { info.SetError(E_CLOUD_DISABLED); ZLOGE("cloudInfo invalid:%{public}d, ", cloud.IsValid(), Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str()); - return false; + return { false, E_CLOUD_DISABLED }; } if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) { info.SetError(E_CLOUD_DISABLED); ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str()); - return false; + return { false, E_CLOUD_DISABLED }; } if (!DmAdapter::GetInstance().IsNetworkAvailable()) { info.SetError(E_NETWORK_ERROR); ZLOGD("network unavailable"); - return false; + return { false, E_NETWORK_ERROR }; } if (!Account::GetInstance()->IsVerified(info.user_)) { info.SetError(E_USER_UNLOCK); ZLOGD("user unverified"); - return false; + return { false, E_ERROR }; } - return true; + return { true, E_OK }; } ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo) @@ -207,9 +209,6 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount activeInfos_.Erase(info.syncId_); CloudInfo cloud; cloud.user = info.user_; - if (!IsValid(info, cloud)) { - return; - } std::vector schemas; auto key = cloud.GetSchemaPrefix(info.bundleName_); auto retryer = GetRetryer(times, info); @@ -218,6 +217,14 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT); return; } + + UpdateStartSyncInfo(info, cloud); + auto [status, code] = IsValid(info, cloud); + if (!status) { + BatchUpdate(info, cloud, code); + return; + } + Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC); for (auto &schema : schemas) { if (!cloud.IsOn(schema.bundleName)) { @@ -228,11 +235,13 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount continue; } StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId, - cloud.user }; + cloud.user, "", info.syncId_ }; auto status = syncStrategy_->CheckSyncAction(storeInfo); if (status != SUCCESS) { ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status, storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str()); + QueryKey queryKey { cloud.id, schema.bundleName, database.name }; + UpdateFinishSyncInfo(queryKey, info.syncId_, E_BLOCKED_BY_NETWORK_STRATEGY); info.SetError(status); continue; } @@ -247,7 +256,7 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount std::function SyncManager::GetSyncHandler(Retryer retryer) { - return [retryer](const Event &event) { + return [this, retryer](const Event &event) { auto &evt = static_cast(event); auto &storeInfo = evt.GetStoreInfo(); StoreMetaData meta; @@ -270,15 +279,24 @@ std::function SyncManager::GetSyncHandler(Retryer retryer) ZLOGD("database:<%{public}d:%{public}s:%{public}s> sync start", storeInfo.user, storeInfo.bundleName.c_str(), meta.GetStoreAlias().c_str()); auto status = store->Sync({ SyncInfo::DEFAULT_ID }, evt.GetMode(), *(evt.GetQuery()), evt.AutoRetry() - ? [retryer](const GenDetails &details) { + ? [this, retryer, storeInfo](const GenDetails &details) { if (details.empty()) { ZLOGE("retry, details empty"); return; } int32_t code = details.begin()->second.code; + auto progress = details.begin()->second.progress; + if (progress == GenProgress::SYNC_FINISH) { + QueryKey queryKey { + .accountId = GetAccountId(storeInfo.user), + .bundleName = storeInfo.bundleName, + .storeId = storeInfo.storeName + }; + UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code); + } retryer(code == E_LOCKED_BY_OTHERS ? LOCKED_INTERVAL : RETRY_INTERVAL, code); } - : evt.GetAsyncDetail(), evt.GetWait()); + : GetCallback(evt.GetAsyncDetail(), storeInfo), evt.GetWait()); GenAsync async = evt.GetAsyncDetail(); if (status != E_OK && async) { GenDetails details; @@ -405,4 +423,137 @@ AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, } return store; } + +void SyncManager::GetCloudSyncInfo(std::vector> &cloudSyncInfos, SyncInfo &info, + CloudInfo &cloud) +{ + if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true)) { + ZLOGE("not exist cloud metadata, user: %{public}d.", cloud.user); + return; + } + std::vector schemas; + auto key = cloud.GetSchemaPrefix(info.bundleName_); + if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true)) { + ZLOGE("not exist schema metadata, user: %{public}d.", cloud.user); + return; + } + for (auto &schema : schemas) { + if (!cloud.IsOn(schema.bundleName)) { + continue; + } + for (const auto &database : schema.databases) { + if (!info.Contains(database.name)) { + continue; + } + QueryKey queryKey; + queryKey.accountId = cloud.id; + queryKey.bundleName = schema.bundleName; + queryKey.storeId = database.name; + cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_)); + } + } +} + +int32_t SyncManager::QueryLastSyncInfo(const std::vector &queryKeys, QueryLastResults &results) +{ + for (auto &queryKey : queryKeys) { + auto it = lastSyncInfos_.find(queryKey); + if (it == lastSyncInfos_.end()) { + return SUCCESS; + } + it->second.ForEach([&queryKey, &results](uint64_t syncId, CloudSyncInfo &info) { + results.insert(std::pair(queryKey.storeId, info)); + return SUCCESS; + }); + } + return SUCCESS; +} + +void SyncManager::UpdateStartSyncInfo(SyncInfo &syncInfo, CloudInfo &cloud) +{ + std::vector> cloudSyncInfos; + GetCloudSyncInfo(cloudSyncInfos, syncInfo, cloud); + if (cloudSyncInfos.empty()) { + return; + } + CloudSyncInfo info; + info.startTime = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + for (auto &[queryKey, syncId] : cloudSyncInfos) { + lastSyncInfos_[queryKey][syncId] = info; + } +} + +void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, SyncId syncId, int32_t code) +{ + auto it = lastSyncInfos_.find(queryKey); + if (it == lastSyncInfos_.end()) { + return; + } + auto [status, info] = it->second.Find(syncId); + if (!status) { + return; + } + info.finishTime = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + info.code = code; + lastSyncInfos_[queryKey][syncId] = info; +} + +void SyncManager::BatchUpdate(SyncInfo &info, CloudInfo &cloud, int32_t code) +{ + std::vector> cloudSyncInfos; + GetCloudSyncInfo(cloudSyncInfos, info, cloud); + if (cloudSyncInfos.empty()) { + return; + } + for (auto &[queryKey, syncId] : cloudSyncInfos) { + UpdateFinishSyncInfo(queryKey, syncId, code); + } +} + +std::function SyncManager::GetCallback(const GenAsync &async, + const StoreInfo &storeInfo) +{ + return [this, async, storeInfo](const GenDetails &result) { + if (async != nullptr) { + async(result); + } + + if (result.empty()) { + ZLOGE("result is empty"); + return; + } + + if (result.begin()->second.progress != GenProgress::SYNC_FINISH) { + return; + } + + auto id = GetAccountId(storeInfo.user); + if (id.empty()) { + return; + } + QueryKey queryKey{ + .accountId = id, + .bundleName = storeInfo.bundleName, + .storeId = storeInfo.storeName + }; + + int32_t code = result.begin()->second.code; + UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code); + }; +} + +std::string SyncManager::GetAccountId(int32_t user) +{ + CloudInfo cloudInfo; + cloudInfo.user = user; + if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) { + ZLOGE("not exist meta, user:%{public}d.", cloudInfo.user); + return ""; + } + return cloudInfo.id; +} } // namespace OHOS::CloudData \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/sync_manager.h b/services/distributeddataservice/service/cloud/sync_manager.h index ee1323f70..5a2a3173b 100644 --- a/services/distributeddataservice/service/cloud/sync_manager.h +++ b/services/distributeddataservice/service/cloud/sync_manager.h @@ -24,6 +24,8 @@ #include "concurrent_map.h" #include "cloud/cloud_info.h" #include "cloud/sync_strategy.h" +#include "cloud_types.h" +#include "cloud/cloud_event.h" namespace OHOS::CloudData { class SyncManager { public: @@ -33,6 +35,7 @@ public: using RefCount = DistributedData::RefCount; using AutoCache = DistributedData::AutoCache; using StoreMetaData = DistributedData::StoreMetaData; + using Store = std::string; static AutoCache::Store GetStore(const StoreMetaData &meta, int32_t user, bool mustBind = true); class SyncInfo final { public: @@ -69,6 +72,7 @@ public: int32_t Bind(std::shared_ptr executor); int32_t DoCloudSync(SyncInfo syncInfo); int32_t StopCloudSync(int32_t user = 0); + int32_t QueryLastSyncInfo(const std::vector &queryKeys, QueryLastResults &results); private: using Event = DistributedData::Event; @@ -79,6 +83,10 @@ private: using CloudInfo = DistributedData::CloudInfo; using StoreInfo = DistributedData::StoreInfo; using SyncStrategy = DistributedData::SyncStrategy; + using SyncId = uint64_t; + using SyncIdCloudInfos = ConcurrentMap; + using GeneralError = DistributedData::GeneralError; + using GenProgress = DistributedData::GenProgress; static constexpr ExecutorPool::Duration RETRY_INTERVAL = std::chrono::seconds(10); // second static constexpr ExecutorPool::Duration LOCKED_INTERVAL = std::chrono::seconds(30); // second @@ -95,13 +103,22 @@ private: static uint64_t GenerateId(int32_t user); RefCount GenSyncRef(uint64_t syncId); int32_t Compare(uint64_t syncId, int32_t user); - bool IsValid(SyncInfo &info, CloudInfo &cloud); + std::pair IsValid(SyncInfo &info, CloudInfo &cloud); + void GetCloudSyncInfo(std::vector> &cloudSyncInfos, SyncInfo &info, + CloudInfo &cloud); + void UpdateStartSyncInfo(SyncInfo &syncInfo, CloudInfo &cloud); + void UpdateFinishSyncInfo(const QueryKey &queryKey, SyncId syncId, int32_t code); + void BatchUpdate(SyncInfo &info, CloudInfo &cloud, int32_t code); + std::function GetCallback(const GenAsync &async, + const StoreInfo &storeInfo); + std::string GetAccountId(int32_t user); static std::atomic genId_; std::shared_ptr executor_; ConcurrentMap actives_; ConcurrentMap activeInfos_; std::shared_ptr syncStrategy_; + std::map lastSyncInfos_; }; } // namespace OHOS::CloudData -#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H \ No newline at end of file -- Gitee