diff --git a/services/distributeddataservice/app/src/kvstore_data_service.cpp b/services/distributeddataservice/app/src/kvstore_data_service.cpp index 22532caa335d14388d9351d73331329f6bde79de..57b0120c51ae12329a3b359f7e58ebcc48553643 100644 --- a/services/distributeddataservice/app/src/kvstore_data_service.cpp +++ b/services/distributeddataservice/app/src/kvstore_data_service.cpp @@ -610,7 +610,7 @@ void KvStoreDataService::AccountEventChanged(const AccountEventInfo &eventInfo) if (meta.user != eventInfo.userId) { continue; } - ZLOGI("bundlname:%s, user:%s", meta.bundleName.c_str(), meta.user.c_str()); + ZLOGI("bundleName:%{public}s, user:%{public}s", meta.bundleName.c_str(), meta.user.c_str()); MetaDataManager::GetInstance().DelMeta(meta.GetKey()); MetaDataManager::GetInstance().DelMeta(meta.GetStrategyKey()); MetaDataManager::GetInstance().DelMeta(meta.GetSecretKey(), true); diff --git a/services/distributeddataservice/framework/include/store/auto_cache.h b/services/distributeddataservice/framework/include/store/auto_cache.h index 7bb8b4493093223a442059240760d66ef78d3e4c..7967fe87d0aeafe9aae2f1fe7a36eba54a10cfaa 100644 --- a/services/distributeddataservice/framework/include/store/auto_cache.h +++ b/services/distributeddataservice/framework/include/store/auto_cache.h @@ -15,6 +15,7 @@ #ifndef OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_STORE_AUTO_CACHE_H #define OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_STORE_AUTO_CACHE_H +#include #include #include #include @@ -32,6 +33,7 @@ class AutoCache { public: using Error = GeneralError; using Store = std::shared_ptr; + using Stores = std::list>; using Watcher = GeneralWatcher; using Watchers = std::set>; using Time = std::chrono::steady_clock::time_point; @@ -46,6 +48,8 @@ public: API_EXPORT Store GetStore(const StoreMetaData &meta, const Watchers &watchers); + API_EXPORT Stores GetStoresIfPresent(uint32_t tokenId, const std::string &storeName = ""); + API_EXPORT void CloseStore(uint32_t tokenId, const std::string &storeId); API_EXPORT void CloseStore(uint32_t tokenId); diff --git a/services/distributeddataservice/framework/include/store/general_store.h b/services/distributeddataservice/framework/include/store/general_store.h index c39a328b4fd4f5395d7ace0c98805ee44d87d6c9..05f3bfa9862e1d454d4be21fac49464d0854b673 100644 --- a/services/distributeddataservice/framework/include/store/general_store.h +++ b/services/distributeddataservice/framework/include/store/general_store.h @@ -43,6 +43,10 @@ public: CLOUD_END, MODE_BUTT = CLOUD_END, }; + enum HighMode { + MANUAL_SYNC_MODE = 0x00000, + AUTO_SYNC_MODE = 0x10000, + }; enum CleanMode { NEARBY_DATA = 0, CLOUD_DATA, @@ -51,6 +55,21 @@ public: CLEAN_MODE_BUTT }; + static inline int32_t MixMode(int32_t syncMode, int32_t HighMode) + { + return syncMode | HighMode; + } + + static inline int32_t GetSyncMode(int32_t mixMode) + { + return mixMode & 0xFFFF; + } + + static inline int32_t GetHighMode(int32_t mixMode) + { + return mixMode & ~0xFFFF; + } + struct BindInfo { BindInfo(std::shared_ptr db = nullptr, std::shared_ptr loader = nullptr) : db_(std::move(db)), loader_(std::move(loader)) @@ -87,6 +106,10 @@ public: virtual int32_t Unwatch(int32_t origin, Watcher &watcher) = 0; + virtual int32_t RegisterDetailProgressObserver(DetailAsync async) = 0; + + virtual int32_t UnregisterDetailProgressObserver() = 0; + virtual int32_t Close() = 0; virtual int32_t AddRef() = 0; diff --git a/services/distributeddataservice/framework/store/auto_cache.cpp b/services/distributeddataservice/framework/store/auto_cache.cpp index acc206e4c3065de535de06e9c16afc5827a86e93..b4f8647c6096bfd3a86f16641f27f4759ce9e436 100644 --- a/services/distributeddataservice/framework/store/auto_cache.cpp +++ b/services/distributeddataservice/framework/store/auto_cache.cpp @@ -83,6 +83,25 @@ AutoCache::Store AutoCache::GetStore(const StoreMetaData &meta, const Watchers & return store; } +AutoCache::Stores AutoCache::GetStoresIfPresent(uint32_t tokenId, const std::string& storeName) +{ + Stores stores; + stores_.ComputeIfPresent(tokenId, [&stores, &storeName](auto&, std::map& delegates) -> bool { + if (storeName.empty()) { + for (auto& [_, delegate] : delegates) { + stores.push_back(delegate); + } + } else { + auto it = delegates.find(storeName); + if (it != delegates.end()) { + stores.push_back(it->second); + } + } + return !stores.empty(); + }); + return stores; +} + // Should be used within stores_'s thread safe methods void AutoCache::StartTimer() { diff --git a/services/distributeddataservice/service/cloud/sync_manager.h b/services/distributeddataservice/service/cloud/sync_manager.h index d31fa28621f22659ed40d8b172ab97efcec74cf5..67ae2e149af2bb57c5139f4f05e238c4fbee1a32 100644 --- a/services/distributeddataservice/service/cloud/sync_manager.h +++ b/services/distributeddataservice/service/cloud/sync_manager.h @@ -53,7 +53,7 @@ public: private: friend SyncManager; uint64_t syncId_ = 0; - int32_t mode_ = GenStore::CLOUD_TIME_FIRST; + int32_t mode_ = GenStore::MixMode(GenStore::CLOUD_TIME_FIRST, GenStore::AUTO_SYNC_MODE); int32_t user_ = 0; int32_t wait_ = 0; std::string id_ = DEFAULT_ID; diff --git a/services/distributeddataservice/service/rdb/rdb_general_store.cpp b/services/distributeddataservice/service/rdb/rdb_general_store.cpp index 5385952e3b5fa5aaa654f8b5ca520963669586b0..348b58034a2d34e82a39c691590c448372cac11d 100644 --- a/services/distributeddataservice/service/rdb/rdb_general_store.cpp +++ b/services/distributeddataservice/service/rdb/rdb_general_store.cpp @@ -204,7 +204,8 @@ int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &qu } else { dbQuery = rdbQuery->GetQuery(); } - auto dbMode = DistributedDB::SyncMode(mode); + auto syncMode = GeneralStore::GetSyncMode(mode); + auto dbMode = DistributedDB::SyncMode(syncMode); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, " @@ -212,10 +213,10 @@ int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &qu devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode, wait); return GeneralError::E_ALREADY_CLOSED; } - auto status = (mode < NEARBY_END) + auto status = (syncMode < NEARBY_END) ? delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), wait != 0) - : (mode > NEARBY_END && mode < CLOUD_END) - ? delegate_->Sync(devices, dbMode, dbQuery, GetDBProcessCB(std::move(async)), wait) + : (syncMode > NEARBY_END && syncMode < CLOUD_END) + ? delegate_->Sync(devices, dbMode, dbQuery, GetDBProcessCB(std::move(async), GetHighMode(mode)), wait) : DistributedDB::INVALID_ARGS; return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR; } @@ -297,13 +298,13 @@ RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async) }; } -RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async) +RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, int32_t highMode) { - if (!async) { - return [](auto &) {}; + if (!async && (highMode == MANUAL_SYNC_MODE || !async_)) { + return [](auto&) {}; } - return [async = std::move(async)](const std::map &processes) { + return [async, autoAsync = async_, highMode](const std::map& processes) { DistributedData::GenDetails details; for (auto &[id, process] : processes) { auto &detail = details[id]; @@ -322,6 +323,9 @@ RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async) } } async(details); + if (highMode == AUTO_SYNC_MODE && autoAsync) { + autoAsync(details); + } }; } @@ -409,6 +413,18 @@ bool RdbGeneralStore::IsValid() return delegate_ != nullptr; } +int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async) +{ + async_ = std::move(async); + return GenErr::E_OK; +} + +int32_t RdbGeneralStore::UnregisterDetailProgressObserver() +{ + async_ = nullptr; + return GenErr::E_OK; +} + void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data) { if (!HasWatcher()) { diff --git a/services/distributeddataservice/service/rdb/rdb_general_store.h b/services/distributeddataservice/service/rdb/rdb_general_store.h index 915f7b5552f43a382596e3e4b13d77f5a166be15..5a8ea4b468f04ea5c5e7a1dc13998836d523934d 100644 --- a/services/distributeddataservice/service/rdb/rdb_general_store.h +++ b/services/distributeddataservice/service/rdb/rdb_general_store.h @@ -57,6 +57,8 @@ public: int32_t Clean(const std::vector &devices, int32_t mode, const std::string &tableName) override; int32_t Watch(int32_t origin, Watcher &watcher) override; int32_t Unwatch(int32_t origin, Watcher &watcher) override; + int32_t RegisterDetailProgressObserver(DetailAsync async) override; + int32_t UnregisterDetailProgressObserver() override; int32_t Close() override; int32_t AddRef() override; int32_t Release() override; @@ -88,13 +90,14 @@ private: std::string storeId_; }; DBBriefCB GetDBBriefCB(DetailAsync async); - DBProcessCB GetDBProcessCB(DetailAsync async); + DBProcessCB GetDBProcessCB(DetailAsync async, int32_t highMode = AUTO_SYNC_MODE); std::shared_ptr RemoteQuery(const std::string &device, const DistributedDB::RemoteCondition &remoteCondition); ObserverProxy observer_; RdbManager manager_; RdbDelegate *delegate_ = nullptr; + DetailAsync async_ = nullptr; std::shared_ptr rdbCloud_ {}; std::shared_ptr rdbLoader_ {}; BindInfo bindInfo_; diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp index 1d9d74015d16237f7c5896830d665ba17fbd8b44..ede8209fb6c60f045839654e634ff9b21c9b92e3 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp @@ -16,6 +16,7 @@ #include "rdb_notifier_proxy.h" #include "itypes_util.h" #include "log_print.h" +#include "utils/anonymous.h" namespace OHOS::DistributedRdb { using NotifierIFCode = RelationalStore::IRdbNotifierInterfaceCode; @@ -44,7 +45,7 @@ int32_t RdbNotifierProxy::OnComplete(uint32_t seqNum, Details &&result) MessageOption option(MessageOption::TF_ASYNC); if (Remote()->SendRequest( static_cast(NotifierIFCode::RDB_NOTIFIER_CMD_SYNC_COMPLETE), data, reply, option) != 0) { - ZLOGE("send request failed"); + ZLOGE("seqNum:%{public}u, send request failed", seqNum); return RDB_ERROR; } return RDB_OK; @@ -66,7 +67,28 @@ int32_t RdbNotifierProxy::OnChange(const Origin &origin, const PrimaryFields &pr MessageOption option; if (Remote()->SendRequest( static_cast(NotifierIFCode::RDB_NOTIFIER_CMD_DATA_CHANGE), data, reply, option) != 0) { - ZLOGE("send request failed"); + ZLOGE("storeName:%{public}s, send request failed", DistributedData::Anonymous::Change(origin.store).c_str()); + return RDB_ERROR; + } + return RDB_OK; +} + +int32_t RdbNotifierProxy::OnComplete(const std::string& storeName, Details&& result) +{ + MessageParcel data; + if (!data.WriteInterfaceToken(GetDescriptor())) { + ZLOGE("write descriptor failed"); + return RDB_ERROR; + } + if (!ITypesUtil::Marshal(data, storeName, result)) { + return RDB_ERROR; + } + + MessageParcel reply; + MessageOption option(MessageOption::TF_ASYNC); + if (Remote()->SendRequest( + static_cast(NotifierIFCode::RDB_NOTIFIER_CMD_AUTO_SYNC_COMPLETE), data, reply, option) != 0) { + ZLOGE("storeName:%{public}s, send request failed", DistributedData::Anonymous::Change(storeName).c_str()); return RDB_ERROR; } return RDB_OK; diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h index 991fcae4f7e6106589828e1b00f12149b0f17d2b..d9b1d66a745539da6c4bfbae157627e28df82269 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h @@ -31,6 +31,8 @@ public: int32_t OnComplete(uint32_t seqNum, Details &&result) override; + int32_t OnComplete(const std::string& storeName, Details &&result) override; + int32_t OnChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) override; private: diff --git a/services/distributeddataservice/service/rdb/rdb_query.cpp b/services/distributeddataservice/service/rdb/rdb_query.cpp index 733427c97057620bd7b52c07dd55722765186e94..f6fca44ac20f8b7c9748bae2ad183fda7b168149 100644 --- a/services/distributeddataservice/service/rdb/rdb_query.cpp +++ b/services/distributeddataservice/service/rdb/rdb_query.cpp @@ -19,9 +19,6 @@ #include "value_proxy.h" namespace OHOS::DistributedRdb { using namespace DistributedData; - -RdbQuery::RdbQuery(bool isRemote) : isRemote_(isRemote) {} - bool RdbQuery::IsEqual(uint64_t tid) { return tid == TYPE_ID; @@ -32,13 +29,10 @@ std::vector RdbQuery::GetTables() return tables_; } -void RdbQuery::SetDevices(const std::vector &devices) -{ - devices_ = devices; -} - -void RdbQuery::SetSql(const std::string &sql, DistributedData::Values &&args) +void RdbQuery::MakeRemoteQuery(const std::string& devices, const std::string& sql, Values&& args) { + isRemote_ = true; + devices_ = { devices }; sql_ = sql; args_ = std::move(args); } diff --git a/services/distributeddataservice/service/rdb/rdb_query.h b/services/distributeddataservice/service/rdb/rdb_query.h index 41088bd49ae29f7728c3656ace03ee48999d3621..9c236121e1a61045349fa5308b8c2c555b3eccac 100644 --- a/services/distributeddataservice/service/rdb/rdb_query.h +++ b/services/distributeddataservice/service/rdb/rdb_query.h @@ -25,7 +25,6 @@ public: using Predicates = NativeRdb::RdbPredicates; static constexpr uint64_t TYPE_ID = 0x20000001; RdbQuery() = default; - explicit RdbQuery(bool isRemote); ~RdbQuery() override = default; @@ -35,10 +34,9 @@ public: DistributedDB::Query GetQuery() const; DistributedDB::RemoteCondition GetRemoteCondition() const; bool IsRemoteQuery(); - void SetDevices(const std::vector &devices); - void SetSql(const std::string &sql, DistributedData::Values &&args); void FromTable(const std::vector &tables); void MakeQuery(const PredicatesMemo &predicates); + void MakeRemoteQuery(const std::string &devices, const std::string &sql, DistributedData::Values &&args); private: void EqualTo(const RdbPredicateOperation& operation); diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index 2f955dd7d7fe497ded4d18c7d805b39ae3faf1f0..695fce0cc1080d100b82e93485a222927e87a152 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -89,7 +89,7 @@ RdbServiceImpl::RdbServiceImpl() }); auto process = [this](const Event &event) { auto &evt = static_cast(event); - auto storeInfo = evt.GetStoreInfo(); + auto &storeInfo = evt.GetStoreInfo(); StoreMetaData meta; meta.storeId = storeInfo.storeName; meta.bundleName = storeInfo.bundleName; @@ -107,6 +107,7 @@ RdbServiceImpl::RdbServiceImpl() ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str()); return; } + store->RegisterDetailProgressObserver(GetCallbacks(meta.tokenId, storeInfo.storeName)); }; EventCenter::GetInstance().Subscribe(CloudEvent::CLOUD_SYNC, process); } @@ -155,22 +156,23 @@ int32_t RdbServiceImpl::ResolveAutoLaunch(const std::string &identifier, Distrib int32_t RdbServiceImpl::OnAppExit(pid_t uid, pid_t pid, uint32_t tokenId, const std::string &bundleName) { - OnClientDied(pid); - return E_OK; -} - -void RdbServiceImpl::OnClientDied(pid_t pid) -{ - ZLOGI("client dead pid=%{public}d", pid); - syncAgents_.EraseIf([pid](auto &key, SyncAgent &agent) { + ZLOGI("client dead, tokenId:%{public}d, pid:%{public}d ", tokenId, pid); + syncAgents_.EraseIf([pid](auto& key, SyncAgent& agent) { if (agent.pid_ != pid) { return false; } if (agent.watcher_ != nullptr) { agent.watcher_->SetNotifier(nullptr); } + auto stores = AutoCache::GetInstance().GetStoresIfPresent(key); + for (auto store : stores) { + if (store != nullptr) { + store->UnregisterDetailProgressObserver(); + } + } return true; }); + return E_OK; } bool RdbServiceImpl::CheckAccess(const std::string& bundleName, const std::string& storeName) @@ -286,6 +288,25 @@ AutoCache::Watchers RdbServiceImpl::GetWatchers(uint32_t tokenId, const std::str return { agent.watcher_ }; } +RdbServiceImpl::DetailAsync RdbServiceImpl::GetCallbacks(uint32_t tokenId, const std::string& storeName) +{ + sptr notifier = nullptr; + syncAgents_.ComputeIfPresent(tokenId, [&storeName, ¬ifier](auto, SyncAgent& syncAgent) { + if (syncAgent.callBackStores_.count(storeName) != 0) { + notifier = syncAgent.notifier_; + } + return true; + }); + if (notifier == nullptr) { + return nullptr; + } + return [notifier, storeName](const GenDetails& details) { + if (notifier != nullptr) { + notifier->OnComplete(storeName, HandleGenDetails(details)); + } + }; +} + int32_t RdbServiceImpl::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql, const std::vector& selectionArgs, sptr& resultSet) { @@ -298,10 +319,8 @@ int32_t RdbServiceImpl::RemoteQuery(const RdbSyncerParam& param, const std::stri ZLOGE("store is null"); return RDB_ERROR; } - auto values = ValueProxy::Convert(selectionArgs); - RdbQuery rdbQuery(true); - rdbQuery.SetDevices(DmAdapter::ToUUID({ device })); - rdbQuery.SetSql(sql, std::move(values)); + RdbQuery rdbQuery; + rdbQuery.MakeRemoteQuery(device, sql, ValueProxy::Convert(selectionArgs)); auto cursor = store->Query("", rdbQuery); if (cursor == nullptr) { ZLOGE("Query failed, cursor is null"); @@ -390,9 +409,12 @@ void RdbServiceImpl::DoCloudSync(const RdbSyncerParam ¶m, const RdbService:: async(HandleGenDetails(details)); } }; - - auto info = ChangeEvent::EventInfo(option.mode, (option.isAsync ? 0 : WAIT_TIME), - (option.isAsync && option.seqNum == 0), query, (option.isAsync ? asyncCallback : syncCallback)); + auto mixMode = GeneralStore::MixMode(option.mode, + option.isAutoSync ? GeneralStore::AUTO_SYNC_MODE : GeneralStore::MANUAL_SYNC_MODE); + auto info = ChangeEvent::EventInfo(mixMode, (option.isAsync ? 0 : WAIT_TIME), option.isAutoSync, query, + option.isAutoSync ? nullptr + : option.isAsync ? asyncCallback + : syncCallback); auto evt = std::make_unique(std::move(storeInfo), std::move(info)); EventCenter::GetInstance().PostEvent(std::move(evt)); } @@ -420,7 +442,7 @@ int32_t RdbServiceImpl::Subscribe(const RdbSyncerParam ¶m, const SubscribeOp }); if (isCreate) { AutoCache::GetInstance().SetObserver(tokenId, RemoveSuffix(param.storeName_), - GetWatchers(tokenId, param.storeName_)); + GetWatchers(tokenId, param.storeName_)); } return RDB_OK; } @@ -445,7 +467,63 @@ int32_t RdbServiceImpl::UnSubscribe(const RdbSyncerParam ¶m, const Subscribe return true; }); if (destroyed) { - AutoCache::GetInstance().SetObserver(tokenId, param.storeName_, GetWatchers(tokenId, param.storeName_)); + AutoCache::GetInstance().SetObserver(tokenId, RemoveSuffix(param.storeName_), + GetWatchers(tokenId, param.storeName_)); + } + return RDB_OK; +} + +int32_t RdbServiceImpl::RegisterAutoSyncCallback(const RdbSyncerParam& param, + std::shared_ptr observer) +{ + pid_t pid = IPCSkeleton::GetCallingPid(); + auto tokenId = IPCSkeleton::GetCallingTokenID(); + auto storeName = RemoveSuffix(param.storeName_); + bool isCreate = false; + syncAgents_.Compute(tokenId, [pid, ¶m, &storeName, &isCreate](auto, SyncAgent& agent) { + if (pid != agent.pid_) { + agent.ReInit(pid, param.bundleName_); + } + auto it = agent.callBackStores_.find(storeName); + if (it == agent.callBackStores_.end()) { + agent.callBackStores_.insert(std::make_pair(storeName, 0)); + isCreate = true; + } + agent.callBackStores_[storeName]++; + return true; + }); + if (isCreate) { + auto stores = AutoCache::GetInstance().GetStoresIfPresent(tokenId, storeName); + if (!stores.empty() && *stores.begin() != nullptr) { + (*stores.begin())->RegisterDetailProgressObserver(GetCallbacks(tokenId, storeName)); + } + } + return RDB_OK; +} + +int32_t RdbServiceImpl::UnregisterAutoSyncCallback(const RdbSyncerParam& param, + std::shared_ptr observer) +{ + auto tokenId = IPCSkeleton::GetCallingTokenID(); + auto storeName = RemoveSuffix(param.storeName_); + bool destroyed = false; + syncAgents_.ComputeIfPresent(tokenId, [&storeName, &destroyed](auto, SyncAgent& agent) { + auto it = agent.callBackStores_.find(storeName); + if (it == agent.callBackStores_.end()) { + return true; + } + it->second--; + if (it->second <= 0) { + agent.callBackStores_.erase(it); + destroyed = true; + } + return true; + }); + if (destroyed) { + auto stores = AutoCache::GetInstance().GetStoresIfPresent(tokenId, storeName); + if (!stores.empty() && *stores.begin() != nullptr) { + (*stores.begin())->UnregisterDetailProgressObserver(); + } } return RDB_OK; } @@ -654,6 +732,7 @@ void RdbServiceImpl::SyncAgent::ReInit(pid_t pid, const std::string &bundleName) { pid_ = pid; count_ = 0; + callBackStores_.clear(); bundleName_ = bundleName; notifier_ = nullptr; if (watcher_ != nullptr) { diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.h b/services/distributeddataservice/service/rdb/rdb_service_impl.h index 771c1ced64653fd6dc070a8639bf7e8c338893e6..75d878b40a456ea0e94e70efb3187fb24c1f6b95 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.h +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.h @@ -38,12 +38,11 @@ class API_EXPORT RdbServiceImpl : public RdbServiceStub { public: using StoreMetaData = OHOS::DistributedData::StoreMetaData; using SecretKeyMetaData = DistributedData::SecretKeyMetaData; + using DetailAsync = DistributedData::GeneralStore::DetailAsync; using Handler = std::function> &)>; RdbServiceImpl(); virtual ~RdbServiceImpl(); - void OnClientDied(pid_t pid); - /* IPC interface */ std::string ObtainDistributedTableName(const std::string& device, const std::string& table) override; @@ -63,6 +62,12 @@ public: int32_t UnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option, RdbStoreObserver *observer) override; + int32_t RegisterAutoSyncCallback(const RdbSyncerParam& param, + std::shared_ptr observer) override; + + int32_t UnregisterAutoSyncCallback(const RdbSyncerParam& param, + std::shared_ptr observer) override; + int32_t ResolveAutoLaunch(const std::string &identifier, DistributedDB::AutoLaunchParam ¶m) override; int32_t OnAppExit(pid_t uid, pid_t pid, uint32_t tokenId, const std::string &bundleName) override; @@ -81,6 +86,7 @@ private: struct SyncAgent { pid_t pid_ = 0; int32_t count_ = 0; + std::map callBackStores_; std::string bundleName_; sptr notifier_ = nullptr; std::shared_ptr watcher_ = nullptr; @@ -126,6 +132,8 @@ private: Watchers GetWatchers(uint32_t tokenId, const std::string &storeName); + DetailAsync GetCallbacks(uint32_t tokenId, const std::string &storeName); + bool CheckAccess(const std::string& bundleName, const std::string& storeName); std::shared_ptr GetStore(const RdbSyncerParam& param); diff --git a/services/distributeddataservice/service/rdb/rdb_service_stub.cpp b/services/distributeddataservice/service/rdb/rdb_service_stub.cpp index 2e42628bb02e1350b45e69cdf4b5f75cfd90f2e5..9c461576b491cd09e2e366d1cf61140303e8a60b 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_stub.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_stub.cpp @@ -227,4 +227,38 @@ int RdbServiceStub::OnRemoteRequest(uint32_t code, MessageParcel& data, MessageP } return RDB_ERROR; } + +int32_t RdbServiceStub::OnRemoteRegisterDetailProgressObserver(MessageParcel& data, MessageParcel& reply) +{ + RdbSyncerParam param; + if (!ITypesUtil::Unmarshal(data, param)) { + ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s", param.bundleName_.c_str(), + Anonymous::Change(param.storeName_).c_str()); + return IPC_STUB_INVALID_DATA_ERR; + } + + auto status = RegisterAutoSyncCallback(param, nullptr); + if (!ITypesUtil::Marshal(reply, status)) { + ZLOGE("Marshal status:0x%{public}x", status); + return IPC_STUB_WRITE_PARCEL_ERR; + } + return RDB_OK; +} + +int32_t RdbServiceStub::OnRemoteUnregisterDetailProgressObserver(MessageParcel& data, MessageParcel& reply) +{ + RdbSyncerParam param; + if (!ITypesUtil::Unmarshal(data, param)) { + ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s", param.bundleName_.c_str(), + Anonymous::Change(param.storeName_).c_str()); + return IPC_STUB_INVALID_DATA_ERR; + } + + auto status = UnregisterAutoSyncCallback(param, nullptr); + if (!ITypesUtil::Marshal(reply, status)) { + ZLOGE("Marshal status:0x%{public}x", status); + return IPC_STUB_WRITE_PARCEL_ERR; + } + return RDB_OK; +} } // namespace OHOS::DistributedRdb diff --git a/services/distributeddataservice/service/rdb/rdb_service_stub.h b/services/distributeddataservice/service/rdb/rdb_service_stub.h index 8b4aff5f3f16ed68fb212749b47063a6953c46fb..5ccd8e98c14e4eff7523b1c596fc7b9710cfe364 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_stub.h +++ b/services/distributeddataservice/service/rdb/rdb_service_stub.h @@ -50,6 +50,10 @@ private: int32_t OnRemoteDoUnSubscribe(MessageParcel& data, MessageParcel& reply); + int32_t OnRemoteRegisterDetailProgressObserver(MessageParcel& data, MessageParcel& reply); + + int32_t OnRemoteUnregisterDetailProgressObserver(MessageParcel& data, MessageParcel& reply); + int32_t OnRemoteDoRemoteQuery(MessageParcel& data, MessageParcel& reply); using RequestHandle = int (RdbServiceStub::*)(MessageParcel &, MessageParcel &); @@ -65,7 +69,11 @@ private: [static_cast(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE)] = &RdbServiceStub::OnRemoteDoUnSubscribe, [static_cast(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY)] = &RdbServiceStub::OnRemoteDoRemoteQuery, [static_cast(RdbServiceCode::RDB_SERVICE_CMD_GET_SCHEMA)] = &RdbServiceStub::OnGetSchema, - [static_cast(RdbServiceCode::RDB_SERVICE_CMD_DELETE)] = &RdbServiceStub::OnDelete + [static_cast(RdbServiceCode::RDB_SERVICE_CMD_DELETE)] = &RdbServiceStub::OnDelete, + [static_cast(RdbServiceCode::RDB_SERVICE_CMD_REGISTER_AUTOSYNC_PROGRESS_OBSERVER)] = + &RdbServiceStub::OnRemoteRegisterDetailProgressObserver, + [static_cast(RdbServiceCode::RDB_SERVICE_CMD_UNREGISTER_AUTOSYNC_PROGRESS_OBSERVER)] = + &RdbServiceStub::OnRemoteUnregisterDetailProgressObserver }; }; } // namespace OHOS::DistributedRdb