diff --git a/services/distributeddataservice/framework/include/store/store_info.h b/services/distributeddataservice/framework/include/store/store_info.h index 8138955aaabd97e38a5bc4c431a8499c5c63c398..8c5d02b2bd2775da2e3035c68d255aa3a0fce251 100644 --- a/services/distributeddataservice/framework/include/store/store_info.h +++ b/services/distributeddataservice/framework/include/store/store_info.h @@ -26,6 +26,7 @@ struct StoreInfo { int32_t instanceId = 0; int32_t user = 0; std::string deviceId; + uint64_t syncId = 0; }; } // namespace OHOS::DistributedData #endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_STORE_INFO_H diff --git a/services/distributeddataservice/service/cloud/cloud_service_impl.cpp b/services/distributeddataservice/service/cloud/cloud_service_impl.cpp index b1d75b09ed70a80514ffe629aba23fd4e774c941..41e33ed3aad53503d021c06f989e641a60134100 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_impl.cpp +++ b/services/distributeddataservice/service/cloud/cloud_service_impl.cpp @@ -458,6 +458,47 @@ int32_t CloudServiceImpl::SetGlobalCloudStrategy(Strategy strategy, const std::v return STRATEGY_SAVERS[strategy](values, hapInfo); } +std::pair CloudServiceImpl::QueryLastSyncInfo(const std::string &id, + const std::string &bundleName, const std::string &storeId) +{ + QueryLastResults results; + auto user = Account::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingTokenID()); + auto [status, cloudInfo] = GetCloudInfo(user); + if (status != SUCCESS) { + return { ERROR, results }; + } + if (cloudInfo.apps.find(bundleName) == cloudInfo.apps.end()) { + ZLOGE("Invalid bundleName: %{public}s", bundleName.c_str()); + return { INVALID_ARGUMENT, results }; + } + std::vector schemas; + auto key = cloudInfo.GetSchemaPrefix(bundleName); + if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) { + return { ERROR, results }; + } + + std::vector queryKeys; + for (const auto &schema : schemas) { + if (schema.bundleName != bundleName) { + continue; + } + for (const auto &database : schema.databases) { + if (storeId.empty() || database.alias == storeId) { + queryKeys.push_back({ id, bundleName, database.name }); + } + } + if (queryKeys.empty()) { + ZLOGE("Invalid storeId: %{public}s", Anonymous::Change(storeId).c_str()); + return { INVALID_ARGUMENT, results }; + } + } + + auto ret = syncManager_.QueryLastSyncInfo(queryKeys, results); + ZLOGI("code:%{public}d, accountId:%{public}s, bundleName:%{public}s, storeId:%{public}s", ret, + Anonymous::Change(id).c_str(), bundleName.c_str(), Anonymous::Change(storeId).c_str()); + return { ret, results }; +} + int32_t CloudServiceImpl::OnInitialize() { DistributedDB::RuntimeConfig::SetCloudTranslate(std::make_shared()); diff --git a/services/distributeddataservice/service/cloud/cloud_service_impl.h b/services/distributeddataservice/service/cloud/cloud_service_impl.h index b6dff4b7ba109ed77f182e39cbc6898338c983c6..66a36af2bcbbb5b064ba638ed1169f05ef839f78 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_impl.h +++ b/services/distributeddataservice/service/cloud/cloud_service_impl.h @@ -43,6 +43,8 @@ public: int32_t NotifyDataChange(const std::string& eventId, const std::string& extraData, int32_t userId) override; std::pair> QueryStatistics(const std::string &id, const std::string &bundleName, const std::string &storeId) override; + std::pair QueryLastSyncInfo(const std::string &id, const std::string &bundleName, + const std::string &storeId) override; int32_t SetGlobalCloudStrategy(Strategy strategy, const std::vector& values) override; std::pair> AllocResourceAndShare(const std::string& storeId, @@ -170,4 +172,4 @@ private: }; } // namespace OHOS::DistributedData -#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_IMPL_H +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_IMPL_H \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_service_stub.cpp b/services/distributeddataservice/service/cloud/cloud_service_stub.cpp index 4aa3c79bdb01d7c4e778a7dc04671d4e1dde7054..ef1aa39029588b36167ca65cfa897e8612568d6b 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_stub.cpp +++ b/services/distributeddataservice/service/cloud/cloud_service_stub.cpp @@ -33,6 +33,7 @@ const CloudServiceStub::Handler CloudServiceStub::HANDLERS[TRANS_BUTT] = { &CloudServiceStub::OnNotifyDataChange, &CloudServiceStub::OnNotifyChange, &CloudServiceStub::OnQueryStatistics, + &CloudServiceStub::OnQueryLastSyncInfo, &CloudServiceStub::OnSetGlobalCloudStrategy, &CloudServiceStub::OnAllocResourceAndShare, &CloudServiceStub::OnShare, @@ -138,6 +139,20 @@ int32_t CloudServiceStub::OnNotifyDataChange(MessageParcel &data, MessageParcel return ITypesUtil::Marshal(reply, result) ? ERR_NONE : IPC_STUB_WRITE_PARCEL_ERR; } +int32_t CloudServiceStub::OnQueryLastSyncInfo(MessageParcel &data, MessageParcel &reply) +{ + std::string id; + std::string bundleName; + std::string storeId; + if (!ITypesUtil::Unmarshal(data, id, bundleName, storeId)) { + ZLOGE("Unmarshal id:%{public}s, bundleName:%{public}s, storeId:%{public}s", Anonymous::Change(id).c_str(), + bundleName.c_str(), Anonymous::Change(storeId).c_str()); + return IPC_STUB_INVALID_DATA_ERR; + } + auto [status, results] = QueryLastSyncInfo(id, bundleName, storeId); + return ITypesUtil::Marshal(reply, status, results) ? ERR_NONE : IPC_STUB_WRITE_PARCEL_ERR; +} + int32_t CloudServiceStub::OnSetGlobalCloudStrategy(MessageParcel &data, MessageParcel &reply) { Strategy strategy; @@ -302,4 +317,4 @@ int32_t CloudServiceStub::OnSetCloudStrategy(MessageParcel &data, MessageParcel auto status = SetCloudStrategy(strategy, values); return ITypesUtil::Marshal(reply, status) ? ERR_NONE : IPC_STUB_WRITE_PARCEL_ERR; } -} // namespace OHOS::CloudData +} // namespace OHOS::CloudData \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_service_stub.h b/services/distributeddataservice/service/cloud/cloud_service_stub.h index 4a8fae7368a68e50feac44c64597d66aa25b4646..4d30fcca5d08202fbe1240f5dceb6baafd77a756 100644 --- a/services/distributeddataservice/service/cloud/cloud_service_stub.h +++ b/services/distributeddataservice/service/cloud/cloud_service_stub.h @@ -33,6 +33,7 @@ private: int32_t OnNotifyDataChange(MessageParcel &data, MessageParcel &reply); int32_t OnNotifyChange(MessageParcel &data, MessageParcel &reply); int32_t OnQueryStatistics(MessageParcel &data, MessageParcel &reply); + int32_t OnQueryLastSyncInfo(MessageParcel &data, MessageParcel &reply); int32_t OnSetGlobalCloudStrategy(MessageParcel &data, MessageParcel &reply); int32_t OnAllocResourceAndShare(MessageParcel &data, MessageParcel &reply); @@ -49,4 +50,4 @@ private: static const Handler HANDLERS[TRANS_BUTT]; }; } // namespace OHOS::CloudData -#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_STUB_H +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_SERVICE_STUB_H \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_types_util.cpp b/services/distributeddataservice/service/cloud/cloud_types_util.cpp index f66d8f0b38b73ff89f2811034d7087d72278f678..f765a556977a6168473767ef3bfd6f8ea0a89979 100644 --- a/services/distributeddataservice/service/cloud/cloud_types_util.cpp +++ b/services/distributeddataservice/service/cloud/cloud_types_util.cpp @@ -97,4 +97,16 @@ bool Unmarshalling(Strategy &output, MessageParcel &data) output = static_cast(result); return true; } + +template<> +bool Marshalling(const CloudSyncInfo &input, MessageParcel &data) +{ + return Marshal(data, input.startTime, input.finishTime, input.code); +} + +template<> +bool Unmarshalling(CloudSyncInfo &output, MessageParcel &data) +{ + return Unmarshal(data, output.startTime, output.finishTime, output.code); +} } // namespace OHOS::ITypesUtil \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/cloud_types_util.h b/services/distributeddataservice/service/cloud/cloud_types_util.h index 7931c8abb46d70d8ba69ec5958e6d981fe92c0be..4e151fa9eedc757dcd99554740f6f67f525e3fdc 100644 --- a/services/distributeddataservice/service/cloud/cloud_types_util.h +++ b/services/distributeddataservice/service/cloud/cloud_types_util.h @@ -31,6 +31,7 @@ using ValueObject = OHOS::NativeRdb::ValueObject; using ValuesBucket = OHOS::NativeRdb::ValuesBucket; using StatisticInfo = OHOS::CloudData::StatisticInfo; using Strategy = OHOS::CloudData::Strategy; +using CloudSyncInfo = OHOS::CloudData::CloudSyncInfo; template<> bool Marshalling(const Participant &input, MessageParcel &data); @@ -62,5 +63,10 @@ bool Unmarshalling(StatisticInfo &output, MessageParcel &data); template<> bool Unmarshalling(Strategy &output, MessageParcel &data); + +template<> +bool Marshalling(const CloudSyncInfo &input, MessageParcel &data); +template<> +bool Unmarshalling(CloudSyncInfo &output, MessageParcel &data); } // namespace OHOS::ITypesUtil #endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_CLOUD_TYPES_UTIL_H diff --git a/services/distributeddataservice/service/cloud/sync_manager.cpp b/services/distributeddataservice/service/cloud/sync_manager.cpp index 3e5ed16d3ec08929ade923f86f77bed9cb8b512a..bd3ee7232be516ae5c2dc2abc1c5ecb82777d8b2 100644 --- a/services/distributeddataservice/service/cloud/sync_manager.cpp +++ b/services/distributeddataservice/service/cloud/sync_manager.cpp @@ -15,6 +15,8 @@ #define LOG_TAG "SyncManager" #include "sync_manager.h" +#include + #include "account/account_delegate.h" #include "cloud/cloud_server.h" #include "cloud/schema_meta.h" @@ -179,52 +181,37 @@ int32_t SyncManager::StopCloudSync(int32_t user) return E_OK; } -bool SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud) +GeneralError SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud) { if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) { info.SetError(E_CLOUD_DISABLED); ZLOGE("cloudInfo invalid:%{public}d, ", cloud.IsValid(), Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str()); - return false; + return E_CLOUD_DISABLED; } if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) { info.SetError(E_CLOUD_DISABLED); ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str()); - return false; + return E_CLOUD_DISABLED; } if (!DmAdapter::GetInstance().IsNetworkAvailable()) { info.SetError(E_NETWORK_ERROR); ZLOGD("network unavailable"); - return false; + return E_NETWORK_ERROR; } if (!Account::GetInstance()->IsVerified(info.user_)) { info.SetError(E_USER_UNLOCK); ZLOGD("user unverified"); - return false; + return E_ERROR; } - return true; + return E_OK; } -ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo) +std::function SyncManager::GetPostEventTask(const std::vector &schemas, CloudInfo &cloud, + SyncInfo &info, bool retry) { - times++; - return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable { - activeInfos_.Erase(info.syncId_); - CloudInfo cloud; - cloud.user = info.user_; - if (!IsValid(info, cloud)) { - return; - } - std::vector schemas; - auto key = cloud.GetSchemaPrefix(info.bundleName_); - auto retryer = GetRetryer(times, info); - if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) { - UpdateSchema(info); - retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT); - return; - } - Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC); + return [this, &cloud, &info, &schemas, retry]() { for (auto &schema : schemas) { if (!cloud.IsOn(schema.bundleName)) { continue; @@ -234,27 +221,66 @@ ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount continue; } StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId, - cloud.user }; + cloud.user, "", info.syncId_ }; auto status = syncStrategy_->CheckSyncAction(storeInfo); if (status != SUCCESS) { ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status, storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str()); + QueryKey queryKey{ cloud.id, schema.bundleName, database.name }; + UpdateFinishSyncInfo(queryKey, info.syncId_, E_BLOCKED_BY_NETWORK_STRATEGY); info.SetError(status); continue; } auto query = info.GenerateQuery(database.name, database.GetTableNames()); SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_ }; auto evt = std::make_unique(std::move(storeInfo), - SyncEvent::EventInfo { syncParam, retry, std::move(query), info.async_ }); + SyncEvent::EventInfo{ syncParam, retry, std::move(query), info.async_ }); EventCenter::GetInstance().PostEvent(std::move(evt)); } } }; } +ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo) +{ + times++; + return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable { + activeInfos_.Erase(info.syncId_); + CloudInfo cloud; + cloud.user = info.user_; + auto retryer = GetRetryer(times, info); + auto schemas = GetSchemaMeta(cloud, info.bundleName_); + if (schemas.empty()) { + UpdateSchema(info); + retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT); + return; + } + + std::vector> cloudSyncInfos; + GetCloudSyncInfo(info, cloud, cloudSyncInfos); + if (cloudSyncInfos.empty()) { + return; + } + UpdateStartSyncInfo(info, cloud, cloudSyncInfos); + auto code = IsValid(info, cloud); + if (code != E_OK) { + for (const auto &[queryKey, syncId] : cloudSyncInfos) { + UpdateFinishSyncInfo(queryKey, syncId, code); + } + return; + } + + Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC); + auto task = GetPostEventTask(schemas, cloud, info, retry); + if (task != nullptr) { + task(); + } + }; +} + std::function SyncManager::GetSyncHandler(Retryer retryer) { - return [retryer](const Event &event) { + return [this, retryer](const Event &event) { auto &evt = static_cast(event); auto &storeInfo = evt.GetStoreInfo(); GenAsync async = evt.GetAsyncDetail(); @@ -266,19 +292,13 @@ std::function SyncManager::GetSyncHandler(Retryer retryer) if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) { ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(), meta.GetStoreAlias().c_str()); - if (async) { - detail.code = E_ERROR; - async(std::move(details)); - } + DoExceptionalCallback(async, details, storeInfo); return; } auto store = GetStore(meta, storeInfo.user); if (store == nullptr) { ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str()); - if (async) { - detail.code = E_ERROR; - async(std::move(details)); - } + DoExceptionalCallback(async, details, storeInfo); return; } @@ -286,15 +306,19 @@ std::function SyncManager::GetSyncHandler(Retryer retryer) meta.GetStoreAlias().c_str()); SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation() }; auto status = store->Sync({ SyncInfo::DEFAULT_ID }, *(evt.GetQuery()), evt.AutoRetry() - ? [retryer](const GenDetails &details) { + ? [this, retryer, storeInfo](const GenDetails &details) { if (details.empty()) { ZLOGE("retry, details empty"); return; } int32_t code = details.begin()->second.code; + if (details.begin()->second.progress == GenProgress::SYNC_FINISH) { + QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, storeInfo.storeName }; + UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code); + } retryer(GetInterval(code), code); } - : evt.GetAsyncDetail(), syncParam); + : GetCallback(evt.GetAsyncDetail(), storeInfo), syncParam); if (status != E_OK && async) { detail.code = status; async(std::move(details)); @@ -322,7 +346,7 @@ std::function SyncManager::GetClientChangeHandler() SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo) { if (times >= RETRY_TIMES) { - return [info = SyncInfo(syncInfo)](Duration, int32_t code) mutable { + return [info = SyncInfo(syncInfo)](Duration, int32_t code) mutable { if (code == E_OK) { return true; } @@ -423,6 +447,129 @@ AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, return store; } +void SyncManager::GetCloudSyncInfo(SyncInfo &info, CloudInfo &cloud, + std::vector> &cloudSyncInfos) +{ + if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true)) { + ZLOGE("not exist cloud metadata, user: %{public}d.", cloud.user); + return; + } + auto schemas = GetSchemaMeta(cloud, info.bundleName_); + if (schemas.empty()) { + ZLOGE("not exist schema metadata, user: %{public}d, bundleName: %{public}s", cloud.user, + info.bundleName_.c_str()); + return; + } + for (auto &schema : schemas) { + if (!cloud.IsOn(schema.bundleName)) { + continue; + } + for (const auto &database : schema.databases) { + if (!info.Contains(database.name)) { + continue; + } + QueryKey queryKey{ .accountId = cloud.id, .bundleName = schema.bundleName, .storeId = database.name }; + cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_)); + } + } +} + +int32_t SyncManager::QueryLastSyncInfo(const std::vector &queryKeys, QueryLastResults &results) +{ + for (auto &queryKey : queryKeys) { + auto it = lastSyncInfos_.find(queryKey); + if (it == lastSyncInfos_.end()) { + return SUCCESS; + } + std::lock_guard lock(mutex_); + it->second.ForEach([&queryKey, &results](uint64_t syncId, CloudSyncInfo &info) { + // -1 means sync not finish + if (info.code != -1) { + results.insert(std::pair(queryKey.storeId, info)); + } + return SUCCESS; + }); + } + return SUCCESS; +} + +void SyncManager::UpdateStartSyncInfo(SyncInfo &syncInfo, CloudInfo &cloud, + std::vector> &cloudSyncInfos) +{ + CloudSyncInfo info; + info.startTime = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + for (auto &[queryKey, syncId] : cloudSyncInfos) { + lastSyncInfos_[queryKey][syncId] = info; + } +} + +void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code) +{ + auto it = lastSyncInfos_.find(queryKey); + if (it == lastSyncInfos_.end()) { + return; + } + auto [isExist, info] = it->second.Find(syncId); + if (!isExist) { + return; + } + std::lock_guard lock(mutex_); + it->second.EraseIf([syncId](uint64_t id, CloudSyncInfo &info) { + // -1 means sync not finish + return syncId != id && info.code != -1; + }); + info.finishTime = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + info.code = code; + lastSyncInfos_[queryKey][syncId] = info; +} + +std::function SyncManager::GetCallback(const GenAsync &async, + const StoreInfo &storeInfo) +{ + return [this, async, storeInfo](const GenDetails &result) { + if (async != nullptr) { + async(result); + } + + if (result.empty()) { + ZLOGE("result is empty"); + return; + } + + if (result.begin()->second.progress != GenProgress::SYNC_FINISH) { + return; + } + + auto id = GetAccountId(storeInfo.user); + if (id.empty()) { + return; + } + QueryKey queryKey{ + .accountId = id, + .bundleName = storeInfo.bundleName, + .storeId = storeInfo.storeName + }; + + int32_t code = result.begin()->second.code; + UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code); + }; +} + +std::string SyncManager::GetAccountId(int32_t user) +{ + CloudInfo cloudInfo; + cloudInfo.user = user; + if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) { + ZLOGE("not exist meta, user:%{public}d.", cloudInfo.user); + return ""; + } + return cloudInfo.id; +} + ExecutorPool::Duration SyncManager::GetInterval(int32_t code) { switch (code) { @@ -434,4 +581,23 @@ ExecutorPool::Duration SyncManager::GetInterval(int32_t code) return RETRY_INTERVAL; } } + +std::vector SyncManager::GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName) +{ + std::vector schemas; + auto key = cloud.GetSchemaPrefix(bundleName); + MetaDataManager::GetInstance().LoadMeta(key, schemas, true); + return schemas; +} + +void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo) +{ + auto &detail = details[SyncInfo::DEFAULT_ID]; + if (async) { + detail.code = E_ERROR; + async(std::move(details)); + } + QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, storeInfo.storeName }; + UpdateFinishSyncInfo(queryKey, storeInfo.syncId, E_ERROR); +} } // namespace OHOS::CloudData \ No newline at end of file diff --git a/services/distributeddataservice/service/cloud/sync_manager.h b/services/distributeddataservice/service/cloud/sync_manager.h index b862ab0ee18cbc468fd65790c9dc380a934b6cdd..a32dc782b178307c1189fc8720404ca2090c3356 100644 --- a/services/distributeddataservice/service/cloud/sync_manager.h +++ b/services/distributeddataservice/service/cloud/sync_manager.h @@ -24,6 +24,8 @@ #include "concurrent_map.h" #include "cloud/cloud_info.h" #include "cloud/sync_strategy.h" +#include "cloud_types.h" +#include "cloud/cloud_event.h" namespace OHOS::CloudData { class SyncManager { public: @@ -33,6 +35,8 @@ public: using RefCount = DistributedData::RefCount; using AutoCache = DistributedData::AutoCache; using StoreMetaData = DistributedData::StoreMetaData; + using Store = std::string; + using SchemaMeta = DistributedData::SchemaMeta; static AutoCache::Store GetStore(const StoreMetaData &meta, int32_t user, bool mustBind = true); class SyncInfo final { public: @@ -71,6 +75,7 @@ public: int32_t Bind(std::shared_ptr executor); int32_t DoCloudSync(SyncInfo syncInfo); int32_t StopCloudSync(int32_t user = 0); + int32_t QueryLastSyncInfo(const std::vector &queryKeys, QueryLastResults &results); private: using Event = DistributedData::Event; @@ -81,6 +86,11 @@ private: using CloudInfo = DistributedData::CloudInfo; using StoreInfo = DistributedData::StoreInfo; using SyncStrategy = DistributedData::SyncStrategy; + using SyncId = uint64_t; + using SyncIdCloudInfos = ConcurrentMap; + using GeneralError = DistributedData::GeneralError; + using GenProgress = DistributedData::GenProgress; + using GenDetails = DistributedData::GenDetails; static constexpr ExecutorPool::Duration RETRY_INTERVAL = std::chrono::seconds(10); // second static constexpr ExecutorPool::Duration LOCKED_INTERVAL = std::chrono::seconds(30); // second @@ -99,13 +109,27 @@ private: Retryer GetRetryer(int32_t times, const SyncInfo &syncInfo); RefCount GenSyncRef(uint64_t syncId); int32_t Compare(uint64_t syncId, int32_t user); - bool IsValid(SyncInfo &info, CloudInfo &cloud); + GeneralError IsValid(SyncInfo &info, CloudInfo &cloud); + void GetCloudSyncInfo(SyncInfo &info, CloudInfo &cloud, + std::vector> &cloudSyncInfos); + void UpdateStartSyncInfo(SyncInfo &syncInfo, CloudInfo &cloud, + std::vector> &cloudSyncInfos); + void UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code); + std::function GetCallback(const GenAsync &async, + const StoreInfo &storeInfo); + std::string GetAccountId(int32_t user); + std::function GetPostEventTask(const std::vector &schemas, CloudInfo &cloud, SyncInfo &info, + bool retry); + std::vector GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName); + void DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo); static std::atomic genId_; std::shared_ptr executor_; ConcurrentMap actives_; ConcurrentMap activeInfos_; std::shared_ptr syncStrategy_; + std::map lastSyncInfos_; + std::mutex mutex_; }; } // namespace OHOS::CloudData -#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H +#endif // OHOS_DISTRIBUTED_DATA_SERVICES_CLOUD_SYNC_MANAGER_H \ No newline at end of file