From cda1aa0726e38d6042af8dde032f7a325587b533 Mon Sep 17 00:00:00 2001 From: htt1997 Date: Sat, 3 Jun 2023 14:58:47 +0800 Subject: [PATCH 1/5] refactor:sync Signed-off-by: htt1997 --- .../framework/cloud/cloud_event.cpp | 9 +- .../framework/include/cloud/cloud_event.h | 6 +- .../framework/include/error/general_error.h | 2 + .../framework/include/store/auto_cache.h | 3 +- .../framework/include/store/general_store.h | 21 +- .../framework/include/store/general_value.h | 19 +- .../framework/include/store/general_watcher.h | 29 +- .../framework/store/auto_cache.cpp | 30 +- .../service/rdb/rdb_general_store.cpp | 7 +- .../service/rdb/rdb_general_store.h | 3 +- .../service/rdb/rdb_notifier_proxy.cpp | 2 +- .../service/rdb/rdb_notifier_proxy.h | 3 +- .../service/rdb/rdb_service_impl.cpp | 319 +++++++++--------- .../service/rdb/rdb_service_impl.h | 89 +++-- .../service/rdb/rdb_service_stub.cpp | 26 +- .../service/rdb/rdb_service_stub.h | 18 - .../service/rdb/rdb_store_observer_impl.cpp | 6 +- .../service/rdb/rdb_store_observer_impl.h | 7 +- .../service/rdb/rdb_syncer.cpp | 104 +++--- .../service/rdb/rdb_syncer.h | 17 +- .../service/rdb/rdb_watcher.cpp | 37 +- .../service/rdb/rdb_watcher.h | 18 +- .../service/test/cloud_data_test.cpp | 4 +- 23 files changed, 376 insertions(+), 403 deletions(-) diff --git a/services/distributeddataservice/framework/cloud/cloud_event.cpp b/services/distributeddataservice/framework/cloud/cloud_event.cpp index 4cc139bd1..8146446d3 100644 --- a/services/distributeddataservice/framework/cloud/cloud_event.cpp +++ b/services/distributeddataservice/framework/cloud/cloud_event.cpp @@ -16,16 +16,11 @@ #include "cloud/cloud_event.h" namespace OHOS::DistributedData { -CloudEvent::CloudEvent(int32_t evtId, CloudEvent::StoreInfo storeInfo, const std::string &featureName) - : Event(evtId), featureName_(featureName), storeInfo_(storeInfo) +CloudEvent::CloudEvent(int32_t evtId, CloudEvent::StoreInfo storeInfo) + : Event(evtId), storeInfo_(std::move(storeInfo)) { } -const std::string& CloudEvent::GetFeatureName() const -{ - return featureName_; -} - const CloudEvent::StoreInfo& CloudEvent::GetStoreInfo() const { return storeInfo_; diff --git a/services/distributeddataservice/framework/include/cloud/cloud_event.h b/services/distributeddataservice/framework/include/cloud/cloud_event.h index fbb09716e..ae67857f0 100644 --- a/services/distributeddataservice/framework/include/cloud/cloud_event.h +++ b/services/distributeddataservice/framework/include/cloud/cloud_event.h @@ -26,6 +26,8 @@ public: FEATURE_INIT = EVT_CLOUD, GET_SCHEMA, DATA_CHANGE, + LOCAL_CHANGE, + CLOUD_SYNC, CLOUD_BUTT }; @@ -37,13 +39,11 @@ public: int32_t user = 0; }; - CloudEvent(int32_t evtId, StoreInfo storeInfo, const std::string &featureName = "relational_store"); + CloudEvent(int32_t evtId, StoreInfo storeInfo); ~CloudEvent() = default; - const std::string& GetFeatureName() const; const StoreInfo& GetStoreInfo() const; private: - std::string featureName_; StoreInfo storeInfo_; }; } // namespace OHOS::DistributedData diff --git a/services/distributeddataservice/framework/include/error/general_error.h b/services/distributeddataservice/framework/include/error/general_error.h index 8c60a9162..27ec5595a 100644 --- a/services/distributeddataservice/framework/include/error/general_error.h +++ b/services/distributeddataservice/framework/include/error/general_error.h @@ -25,6 +25,8 @@ enum GeneralError : int32_t { E_NOT_SUPPORT, E_ALREADY_CONSUMED, E_ALREADY_CLOSED, + E_UNOPENED, + E_RETRY_TIMEOUT, E_BUTT, }; } diff --git a/services/distributeddataservice/framework/include/store/auto_cache.h b/services/distributeddataservice/framework/include/store/auto_cache.h index ca1a5e800..b5e91e689 100644 --- a/services/distributeddataservice/framework/include/store/auto_cache.h +++ b/services/distributeddataservice/framework/include/store/auto_cache.h @@ -64,8 +64,7 @@ private: bool Close(); int32_t GetUser() const; void SetObservers(const Watchers &watchers); - int32_t OnChange(Origin origin, const std::string &id) override; - int32_t OnChange(Origin origin, const std::string &id, const std::vector &values) override; + int32_t OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values) override; private: mutable Time time_; diff --git a/services/distributeddataservice/framework/include/store/general_store.h b/services/distributeddataservice/framework/include/store/general_store.h index 579634d28..ad7985ea2 100644 --- a/services/distributeddataservice/framework/include/store/general_store.h +++ b/services/distributeddataservice/framework/include/store/general_store.h @@ -18,7 +18,6 @@ #include #include -#include "cloud/schema_meta.h" #include "store/cursor.h" #include "store/general_value.h" #include "store/general_watcher.h" @@ -29,9 +28,21 @@ struct Database; class GeneralStore { public: using Watcher = GeneralWatcher; - using AsyncDetail = std::function; - using AsyncStatus = std::function>)>; + using DetailAsync = GenAsync; using Devices = std::vector; + enum SyncMode { + NEARBY_BEGIN, + NEARBY_PUSH = NEARBY_BEGIN, + NEARBY_PULL, + NEARBY_PULL_PUSH, + NEARBY_END, + CLOUD_BEGIN = NEARBY_END, + CLOUD_TIME_FIRST = CLOUD_BEGIN, + CLOUD_NATIVE_FIRST, + CLOUD_ClOUD_FIRST, + CLOUD_END, + MODE_BUTT = CLOUD_END, + }; struct BindInfo { BindInfo(std::shared_ptr db = nullptr, std::shared_ptr loader = nullptr) @@ -57,9 +68,7 @@ public: virtual std::shared_ptr Query(const std::string &table, GenQuery &query) = 0; - virtual int32_t Sync(const Devices &devices, int32_t mode, GenQuery &query, AsyncDetail async, int32_t wait) = 0; - - virtual int32_t Sync(const Devices &devices, int32_t mode, GenQuery &query, AsyncStatus async, int32_t wait) = 0; + virtual int32_t Sync(const Devices &devices, int32_t mode, GenQuery &query, DetailAsync async, int32_t wait) = 0; virtual int32_t Watch(int32_t origin, Watcher &watcher) = 0; diff --git a/services/distributeddataservice/framework/include/store/general_value.h b/services/distributeddataservice/framework/include/store/general_value.h index 9ec687945..18df10a15 100644 --- a/services/distributeddataservice/framework/include/store/general_value.h +++ b/services/distributeddataservice/framework/include/store/general_value.h @@ -15,6 +15,7 @@ #ifndef OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_GENERAL_VALUE_H #define OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_GENERAL_VALUE_H +#include #include #include #include @@ -24,28 +25,28 @@ #include "error/general_error.h" #include "traits.h" namespace OHOS::DistributedData { -enum Progress { +enum GenProgress { SYNC_BEGIN, SYNC_IN_PROGRESS, SYNC_FINISH, }; -struct Statistic { +struct GenStatistic { int32_t total; int32_t success; int32_t failed; int32_t untreated; }; -struct TableDetails { - Statistic upload; - Statistic download; +struct GenTableDetail { + GenStatistic upload; + GenStatistic download; }; -struct ProgressDetails { +struct GenProgressDetail { int32_t progress; int32_t code; - std::map details; + std::map details; }; struct Asset { @@ -61,6 +62,7 @@ struct Asset { struct GenQuery { virtual ~GenQuery() = default; virtual bool IsEqual(uint64_t tid) = 0; + virtual std::vector GetTables() = 0; template int32_t QueryInterface(T *&query) @@ -79,7 +81,8 @@ using Value = std::variant; using VBucket = std::map; using VBuckets = std::vector; - +using GenDetails = std::map; +using GenAsync = std::function; template inline constexpr size_t TYPE_INDEX = Traits::variant_index_of_v; diff --git a/services/distributeddataservice/framework/include/store/general_watcher.h b/services/distributeddataservice/framework/include/store/general_watcher.h index 52130eab7..4583aa9d9 100644 --- a/services/distributeddataservice/framework/include/store/general_watcher.h +++ b/services/distributeddataservice/framework/include/store/general_watcher.h @@ -21,24 +21,33 @@ namespace OHOS::DistributedData { class GeneralWatcher { public: - enum Origin : int32_t { - ORIGIN_CLOUD, - ORIGIN_LOCAL, - ORIGIN_REMOTE, - ORIGIN_ALL, - ORIGIN_BUTT, + struct Origin { + enum OriginType : int32_t { + ORIGIN_LOCAL, + ORIGIN_NEARBY, + ORIGIN_CLOUD, + ORIGIN_ALL, + ORIGIN_BUTT, + }; + int32_t origin = ORIGIN_BUTT; + // origin is ORIGIN_LOCAL, the id is empty + // origin is ORIGIN_NEARBY, the id is networkId; + // origin is ORIGIN_CLOUD, the id is the cloud account id + std::vector id; + std::string store; }; - enum ChangeOp : int32_t { OP_INSERT, OP_UPDATE, OP_DELETE, OP_BUTT, }; - + // PK primary key + using PRIValue = std::variant; + using PRIFields = std::map; + using ChangeInfo = std::map[OP_BUTT]>; virtual ~GeneralWatcher() = default; - virtual int32_t OnChange(Origin origin, const std::string &id) = 0; - virtual int32_t OnChange(Origin origin, const std::string &id, const std::vector &values) = 0; + virtual int32_t OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values) = 0; }; } #endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_GENERAL_WATCHER_H diff --git a/services/distributeddataservice/framework/store/auto_cache.cpp b/services/distributeddataservice/framework/store/auto_cache.cpp index 8b44d688c..ec9dea51e 100644 --- a/services/distributeddataservice/framework/store/auto_cache.cpp +++ b/services/distributeddataservice/framework/store/auto_cache.cpp @@ -147,14 +147,14 @@ AutoCache::Delegate::Delegate(GeneralStore *delegate, const Watchers &watchers, { time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL); if (store_ != nullptr) { - store_->Watch(ORIGIN_ALL, *this); + store_->Watch(Origin::ORIGIN_ALL, *this); } } AutoCache::Delegate::~Delegate() { if (store_ != nullptr) { - store_->Unwatch(ORIGIN_ALL, *this); + store_->Unwatch(Origin::ORIGIN_ALL, *this); store_->Close(); store_ = nullptr; } @@ -175,7 +175,7 @@ bool AutoCache::Delegate::Close() { std::unique_lock lock(mutex_); if (store_ != nullptr) { - store_->Unwatch(ORIGIN_ALL, *this); + store_->Unwatch(Origin::ORIGIN_ALL, *this); auto status = store_->Close(); if (status == Error::E_BUSY) { return false; @@ -196,34 +196,20 @@ int32_t AutoCache::Delegate::GetUser() const return user_; } -int32_t AutoCache::Delegate::OnChange(Origin origin, const std::string &id) +int32_t AutoCache::Delegate::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values) { Watchers watchers; { std::unique_lock lock(mutex_); watchers = watchers_; } - for (auto watcher : watchers) { + size_t remain = watchers.size(); + for (auto &watcher : watchers) { + remain--; if (watcher == nullptr) { continue; } - watcher->OnChange(origin, id); - } - return Error::E_OK; -} - -int32_t AutoCache::Delegate::OnChange(Origin origin, const std::string &id, const std::vector &values) -{ - Watchers watchers; - { - std::unique_lock lock(mutex_); - watchers = watchers_; - } - for (auto watcher : watchers) { - if (watcher == nullptr) { - continue; - } - watcher->OnChange(origin, id, values); + watcher->OnChange(origin, primaryFields, (remain != 0) ? ChangeInfo(values) : std::move(values)); } return Error::E_OK; } diff --git a/services/distributeddataservice/service/rdb/rdb_general_store.cpp b/services/distributeddataservice/service/rdb/rdb_general_store.cpp index 8ccc8d1df..91d58fbb0 100644 --- a/services/distributeddataservice/service/rdb/rdb_general_store.cpp +++ b/services/distributeddataservice/service/rdb/rdb_general_store.cpp @@ -148,12 +148,7 @@ std::shared_ptr RdbGeneralStore::Query(const std::string &table, GenQuer return std::shared_ptr(); } -int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &query, AsyncDetail async, int32_t wait) -{ - return GeneralError::E_NOT_SUPPORT; -} - -int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &query, AsyncStatus async, int32_t wait) +int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &query, DetailAsync async, int32_t wait) { return GeneralError::E_NOT_SUPPORT; } diff --git a/services/distributeddataservice/service/rdb/rdb_general_store.h b/services/distributeddataservice/service/rdb/rdb_general_store.h index f337fe564..30972e02f 100644 --- a/services/distributeddataservice/service/rdb/rdb_general_store.h +++ b/services/distributeddataservice/service/rdb/rdb_general_store.h @@ -46,8 +46,7 @@ public: int32_t Delete(const std::string &table, const std::string &sql, Values &&args) override; std::shared_ptr Query(const std::string &table, const std::string &sql, Values &&args) override; std::shared_ptr Query(const std::string &table, GenQuery &query) override; - int32_t Sync(const Devices &devices, int32_t mode, GenQuery &query, AsyncDetail async, int32_t wait) override; - int32_t Sync(const Devices &devices, int32_t mode, GenQuery &query, AsyncStatus async, int32_t wait) override; + int32_t Sync(const Devices &devices, int32_t mode, GenQuery &query, DetailAsync async, int32_t wait) override; int32_t Watch(int32_t origin, Watcher &watcher) override; int32_t Unwatch(int32_t origin, Watcher &watcher) override; int32_t Close() override; diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp index 8818a0927..97ba6bea9 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp @@ -27,7 +27,7 @@ RdbNotifierProxy::~RdbNotifierProxy() noexcept ZLOGI("destroy"); } -int32_t RdbNotifierProxy::OnComplete(uint32_t seqNum, const SyncResult &result) +int32_t RdbNotifierProxy::OnComplete(uint32_t seqNum, Details &&result) { MessageParcel data; if (!data.WriteInterfaceToken(GetDescriptor())) { diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h index 375cc7456..d0d9c1cb8 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h @@ -29,10 +29,9 @@ public: explicit RdbNotifierProxy(const sptr& object); virtual ~RdbNotifierProxy() noexcept; - int32_t OnComplete(uint32_t seqNum, const SyncResult& result) override; + int32_t OnComplete(uint32_t seqNum, Details &&result) override; int32_t OnChange(const std::string& storeName, const std::vector& devices) override; - private: static inline BrokerDelegator delegator_; }; diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index 24b656daf..bf35f9ad9 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -27,7 +27,6 @@ #include "metadata/appid_meta_data.h" #include "metadata/meta_data_manager.h" #include "metadata/store_meta_data.h" -#include "permission/permission_validator.h" #include "rdb_watcher.h" #include "rdb_notifier_proxy.h" #include "rdb_query.h" @@ -68,25 +67,6 @@ RdbServiceImpl::Factory::~Factory() { } -RdbServiceImpl::DeathRecipientImpl::DeathRecipientImpl(const DeathCallback& callback) - : callback_(callback) -{ - ZLOGI("construct"); -} - -RdbServiceImpl::DeathRecipientImpl::~DeathRecipientImpl() -{ - ZLOGI("destroy"); -} - -void RdbServiceImpl::DeathRecipientImpl::OnRemoteDied(const wptr &object) -{ - ZLOGI("enter"); - if (callback_) { - callback_(); - } -} - RdbServiceImpl::RdbServiceImpl() : autoLaunchObserver_(this) { ZLOGI("construct"); @@ -94,7 +74,7 @@ RdbServiceImpl::RdbServiceImpl() : autoLaunchObserver_(this) [this](const std::string& identifier, DistributedDB::AutoLaunchParam ¶m) { return ResolveAutoLaunch(identifier, param); }); - EventCenter::GetInstance().Subscribe(CloudEvent::DATA_CHANGE, [this](const Event &event) { + auto process = [this](const Event &event) { auto &evt = static_cast(event); auto storeInfo = evt.GetStoreInfo(); StoreMetaData meta; @@ -104,20 +84,18 @@ RdbServiceImpl::RdbServiceImpl() : autoLaunchObserver_(this) meta.instanceId = storeInfo.instanceId; meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid; if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta)) { - ZLOGE("meta empty, bundleName:%{public}s, storeId:%{public}s", - meta.bundleName.c_str(), meta.storeId.c_str()); + ZLOGE("meta empty, bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(), + Anonymous::Change(meta.storeId).c_str()); return; } auto watchers = GetWatchers(meta.tokenId, meta.storeId); auto store = AutoCache::GetInstance().GetStore(meta, watchers); if (store == nullptr) { - ZLOGE("store null, storeId:%{public}s", meta.storeId.c_str()); + ZLOGE("store null, storeId:%{public}s", Anonymous::Change(meta.storeId).c_str()); return; } - for (const auto &watcher : watchers) { // mock for datachange - watcher->OnChange(GeneralWatcher::Origin::ORIGIN_CLOUD, {}); - } - }); + }; + EventCenter::GetInstance().Subscribe(CloudEvent::CLOUD_SYNC, process); } int32_t RdbServiceImpl::ResolveAutoLaunch(const std::string &identifier, DistributedDB::AutoLaunchParam ¶m) @@ -161,6 +139,12 @@ int32_t RdbServiceImpl::ResolveAutoLaunch(const std::string &identifier, Distrib return false; } +int32_t RdbServiceImpl::OnAppExit(pid_t uid, pid_t pid, uint32_t tokenId, const std::string &bundleName) +{ + OnClientDied(pid); + return E_OK; +} + void RdbServiceImpl::OnClientDied(pid_t pid) { ZLOGI("client dead pid=%{public}d", pid); @@ -171,10 +155,10 @@ void RdbServiceImpl::OnClientDied(pid_t pid) } return false; }); - notifiers_.Erase(pid); - identifiers_.EraseIf([pid](const auto& key, pid_t& value) { - return pid == value; + identifiers_.EraseIf([pid](const auto &key, std::pair &value) { + return value.first == pid; }); + syncAgents_.EraseIf([pid](auto &key, SyncAgent &agent) { return agent.pid_ == pid; }); } bool RdbServiceImpl::CheckAccess(const std::string& bundleName, const std::string& storeName) @@ -184,7 +168,7 @@ bool RdbServiceImpl::CheckAccess(const std::string& bundleName, const std::strin storeInfo.tokenId = IPCSkeleton::GetCallingTokenID(); storeInfo.bundleName = bundleName; storeInfo.storeId = RdbSyncer::RemoveSuffix(storeName); - auto instanceId = RdbSyncer::GetInstIndex(storeInfo.tokenId, storeInfo.bundleName); + auto [instanceId, user] = RdbSyncer::GetInstIndexAndUser(storeInfo.tokenId, storeInfo.bundleName); if (instanceId != 0) { return false; } @@ -212,26 +196,23 @@ int32_t RdbServiceImpl::InitNotifier(const RdbSyncerParam ¶m, const sptr(notifier); pid_t pid = IPCSkeleton::GetCallingPid(); - auto recipient = new (std::nothrow) DeathRecipientImpl([this, pid] { - OnClientDied(pid); + uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); + syncAgents_.Compute(tokenId, [¶m, notifierProxy, pid](auto, SyncAgent &agent) { + if (pid != agent.pid_) { + agent.ReInit(pid, param.bundleName_); + } + agent.SetNotifier(notifierProxy); + return true; }); - if (recipient == nullptr) { - ZLOGE("malloc recipient failed"); - return RDB_ERROR; - } - - if (!notifier->AddDeathRecipient(recipient)) { - ZLOGE("link to death failed"); - return RDB_ERROR; - } - notifiers_.Insert(pid, iface_cast(notifier)); - ZLOGI("success pid=%{public}d", pid); + ZLOGI("success tokenId:%{public}x, pid=%{public}d", tokenId, pid); return RDB_OK; } -void RdbServiceImpl::OnDataChange(pid_t pid, const DistributedDB::StoreChangedData &data) +void RdbServiceImpl::OnDataChange(pid_t pid, uint32_t tokenId, const DistributedDB::StoreChangedData &data) { DistributedDB::StoreProperty property; data.GetStoreProperty(property); @@ -239,20 +220,21 @@ void RdbServiceImpl::OnDataChange(pid_t pid, const DistributedDB::StoreChangedDa if (pid == 0) { auto identifier = RelationalStoreManager::GetRelationalStoreIdentifier(property.userId, property.appId, property.storeId); - auto pair = identifiers_.Find(TransferStringToHex(identifier)); - if (!pair.first) { + auto [success, info] = identifiers_.Find(TransferStringToHex(identifier)); + if (!success) { ZLOGI("client doesn't subscribe"); return; } - pid = pair.second; - ZLOGI("fixed pid=%{public}d", pid); + pid = info.first; + tokenId = info.second; + ZLOGI("fixed pid=%{public}d and tokenId=0x%{public}d", pid, tokenId); } - notifiers_.ComputeIfPresent(pid, [&data, &property] (const auto& key, const sptr& value) { + auto [success, agent] = syncAgents_.Find(tokenId); + if (success && agent.notifier_ != nullptr && pid == agent.pid_) { std::string device = data.GetDataChangeDevice(); auto networkId = DmAdapter::GetInstance().ToNetworkID(device); - value->OnChange(property.storeId, { networkId }); - return true; - }); + agent.notifier_->OnChange(property.storeId, { networkId }); + } } void RdbServiceImpl::SyncerTimeout(std::shared_ptr syncer) @@ -260,9 +242,10 @@ void RdbServiceImpl::SyncerTimeout(std::shared_ptr syncer) if (syncer == nullptr) { return; } - ZLOGI("%{public}s", syncer->GetStoreId().c_str()); - syncers_.ComputeIfPresent(syncer->GetPid(), [this, &syncer](const auto& key, StoreSyncersType& syncers) { - syncers.erase(syncer->GetStoreId()); + auto storeId = syncer->GetStoreId(); + ZLOGI("%{public}s", storeId.c_str()); + syncers_.ComputeIfPresent(syncer->GetPid(), [this, storeId](const auto& key, StoreSyncersType& syncers) { + syncers.erase(storeId); syncerNum_--; return true; }); @@ -293,7 +276,7 @@ std::shared_ptr RdbServiceImpl::GetRdbSyncer(const RdbSyncerParam &pa ZLOGE("pid: %{public}d, syncers size: %{public}zu. syncerNum: %{public}d", pid, syncers.size(), syncerNum_); return !syncers.empty(); } - auto rdbObserver = new (std::nothrow) RdbStoreObserverImpl(this, pid); + auto rdbObserver = new (std::nothrow) RdbStoreObserverImpl(this, pid, tokenId); if (rdbObserver == nullptr) { return !syncers.empty(); } @@ -314,14 +297,15 @@ std::shared_ptr RdbServiceImpl::GetRdbSyncer(const RdbSyncerParam &pa }); if (syncer != nullptr) { - identifiers_.Insert(syncer->GetIdentifier(), pid); + identifiers_.Insert(syncer->GetIdentifier(), { pid, tokenId }); } else { ZLOGE("syncer is nullptr"); } return syncer; } -int32_t RdbServiceImpl::SetDistributedTables(const RdbSyncerParam ¶m, const std::vector &tables) +int32_t RdbServiceImpl::SetDistributedTables(const RdbSyncerParam ¶m, const std::vector &tables, + int32_t type) { ZLOGI("enter"); if (!CheckAccess(param.bundleName_, param.storeName_)) { @@ -332,47 +316,48 @@ int32_t RdbServiceImpl::SetDistributedTables(const RdbSyncerParam ¶m, const if (syncer == nullptr) { return RDB_ERROR; } - return syncer->SetDistributedTables(tables); + return syncer->SetDistributedTables(tables, type); } -int32_t RdbServiceImpl::DoSync(const RdbSyncerParam ¶m, const SyncOption &option, - const RdbPredicates &predicates, SyncResult &result) +std::pair RdbServiceImpl::DoSync(const RdbSyncerParam ¶m, const Option &option, + const RdbPredicates &pred) { if (!CheckAccess(param.bundleName_, param.storeName_)) { ZLOGE("permission error"); - return RDB_ERROR; + return {RDB_ERROR, {}}; } auto syncer = GetRdbSyncer(param); if (syncer == nullptr) { - return RDB_ERROR; + return {RDB_ERROR, {}}; } - return syncer->DoSync(option, predicates, result); + Details details = {}; + auto status = syncer->DoSync(option, pred, [&details](auto &&result) mutable { details = std::move(result); }); + return { status, std::move(details) }; } -void RdbServiceImpl::OnAsyncComplete(pid_t pid, uint32_t seqNum, const SyncResult &result) +void RdbServiceImpl::OnAsyncComplete(uint32_t tokenId, uint32_t seqNum, Details &&result) { - ZLOGI("pid=%{public}d seqnum=%{public}u", pid, seqNum); - notifiers_.ComputeIfPresent(pid, [seqNum, &result] (const auto& key, const sptr& value) { - value->OnComplete(seqNum, result); - return true; - }); + ZLOGI("pid=%{public}x seqnum=%{public}u", tokenId, seqNum); + auto [success, agent] = syncAgents_.Find(tokenId); + if (success && agent.notifier_ != nullptr) { + agent.notifier_->OnComplete(seqNum, std::move(result)); + } } -int32_t RdbServiceImpl::DoAsync(const RdbSyncerParam ¶m, uint32_t seqNum, const SyncOption &option, - const RdbPredicates &predicates) +int32_t RdbServiceImpl::DoAsync(const RdbSyncerParam ¶m, const Option &option, const RdbPredicates &pred) { if (!CheckAccess(param.bundleName_, param.storeName_)) { ZLOGE("permission error"); return RDB_ERROR; } - pid_t pid = IPCSkeleton::GetCallingPid(); - ZLOGI("seq num=%{public}u", seqNum); + auto tokenId = IPCSkeleton::GetCallingTokenID(); + ZLOGI("seq num=%{public}u", option.seqNum); auto syncer = GetRdbSyncer(param); if (syncer == nullptr) { return RDB_ERROR; } - return syncer->DoAsync(option, predicates, [this, pid, seqNum](const SyncResult &result) { - OnAsyncComplete(pid, seqNum, result); + return syncer->DoSync(option, pred, [this, tokenId, seqNum = option.seqNum](Details &&result) { + OnAsyncComplete(tokenId, seqNum, std::move(result)); }); } @@ -404,30 +389,91 @@ std::string RdbServiceImpl::GenIdentifier(const RdbSyncerParam ¶m) return TransferStringToHex(identifier); } -int32_t RdbServiceImpl::DoSubscribe(const RdbSyncerParam& param, const SubscribeOption &option) +AutoCache::Watchers RdbServiceImpl::GetWatchers(uint32_t tokenId, const std::string &storeName) +{ + auto [success, agent] = syncAgents_.Find(tokenId); + if (agent.watcher_ == nullptr) { + return {}; + } + return { agent.watcher_ }; +} + +void RdbServiceImpl::SyncAgent::ReInit(pid_t pid, const std::string &bundleName) +{ + pid_ = pid; + count_ = 0; + bundleName_ = bundleName; + notifier_ = nullptr; + if (watcher_ != nullptr) { + watcher_->SetNotifier(nullptr); + } +} + +void RdbServiceImpl::SyncAgent::SetNotifier(sptr notifier) +{ + notifier_ = notifier; + if (watcher_ != nullptr) { + watcher_->SetNotifier(notifier); + } +} + +void RdbServiceImpl::SyncAgent::SetWatcher(std::shared_ptr watcher) +{ + if (watcher_ != watcher) { + watcher_ = watcher; + watcher_->SetNotifier(notifier_); + } +} + +int32_t RdbServiceImpl::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql, + const std::vector& selectionArgs, sptr& resultSet) +{ + if (!CheckAccess(param.bundleName_, param.storeName_)) { + ZLOGE("permission error"); + return RDB_ERROR; + } + auto syncer = GetRdbSyncer(param); + if (syncer == nullptr) { + ZLOGE("syncer is null"); + return RDB_ERROR; + } + return syncer->RemoteQuery(device, sql, selectionArgs, resultSet); +} + +int32_t RdbServiceImpl::Sync(const RdbSyncerParam ¶m, const Option &option, const RdbPredicates &predicates, + const AsyncDetail &async) +{ + if (!option.isAsync) { + auto [status, details] = DoSync(param, option, predicates); + async(std::move(details)); + return status; + } + return DoAsync(param, option, predicates); +} + +int32_t RdbServiceImpl::Subscribe(const RdbSyncerParam ¶m, const SubscribeOption &option, + RdbStoreObserver *observer) { pid_t pid = IPCSkeleton::GetCallingPid(); auto tokenId = IPCSkeleton::GetCallingTokenID(); switch (option.mode) { case SubscribeMode::REMOTE: { auto identifier = GenIdentifier(param); - ZLOGI("%{public}s %{public}.6s %{public}d", param.storeName_.c_str(), identifier.c_str(), pid); - identifiers_.Insert(identifier, pid); + identifiers_.Insert(identifier, std::pair { pid, tokenId }); + ZLOGI("%{public}s %{public}.6s %{public}d", Anonymous::Change(param.storeName_).c_str(), + identifier.c_str(), pid); break; } case SubscribeMode::CLOUD: // fallthrough case SubscribeMode::CLOUD_DETAIL: { - syncAgents_.Compute(tokenId, [this, pid, tokenId, ¶m, &option](auto &key, SyncAgent &value) { - if (pid != value.pid_) { - value.ReInit(pid, param.bundleName_); + syncAgents_.Compute(tokenId, [pid, ¶m](auto &key, SyncAgent &agent) { + if (pid != agent.pid_) { + agent.ReInit(pid, param.bundleName_); } - auto storeName = RdbSyncer::RemoveSuffix(param.storeName_); - auto it = value.watchers_.find(storeName); - if (it == value.watchers_.end()) { - auto watcher = std::make_shared(this, tokenId, storeName); - value.watchers_[storeName] = { watcher }; - value.mode_[storeName] = option.mode; + if (agent.watcher_ == nullptr) { + agent.SetWatcher(std::make_shared()); } + agent.count_++; return true; }); break; @@ -438,63 +484,24 @@ int32_t RdbServiceImpl::DoSubscribe(const RdbSyncerParam& param, const Subscribe return RDB_OK; } -void RdbServiceImpl::OnChange(uint32_t tokenId, const std::string &storeName) -{ - pid_t pid = 0; - syncAgents_.ComputeIfPresent(tokenId, [&pid, &storeName](auto &key, SyncAgent &value) { - pid = value.pid_; - return true; - }); - notifiers_.ComputeIfPresent(pid, [&storeName](const auto& key, const sptr& value) { - value->OnChange(storeName, { storeName }); - return true; - }); -} - -AutoCache::Watchers RdbServiceImpl::GetWatchers(uint32_t tokenId, const std::string &storeName) -{ - AutoCache::Watchers watchers; - syncAgents_.ComputeIfPresent(tokenId, [&storeName, &watchers](auto, SyncAgent &agent) { - auto it = agent.watchers_.find(storeName); - if (it != agent.watchers_.end()) { - watchers = it->second; - } - return true; - }); - return watchers; -} - -void RdbServiceImpl::SyncAgent::ReInit(pid_t pid, const std::string &bundleName) -{ - pid_ = pid; - bundleName_ = bundleName; - watchers_.clear(); - mode_.clear(); -} - -int32_t RdbServiceImpl::DoUnSubscribe(const RdbSyncerParam& param) +int32_t RdbServiceImpl::UnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option, + RdbStoreObserver *observer) { auto identifier = GenIdentifier(param); ZLOGI("%{public}s %{public}.6s", param.storeName_.c_str(), identifier.c_str()); identifiers_.Erase(identifier); + syncAgents_.ComputeIfPresent(IPCSkeleton::GetCallingTokenID(), [](auto &key, SyncAgent &agent) { + if (agent.count_ > 0) { + agent.count_--; + } + if (agent.count_ == 0) { + agent.SetWatcher(nullptr); + } + return true; + }); return RDB_OK; } -int32_t RdbServiceImpl::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql, - const std::vector& selectionArgs, sptr& resultSet) -{ - if (!CheckAccess(param.bundleName_, param.storeName_)) { - ZLOGE("permission error"); - return RDB_ERROR; - } - auto syncer = GetRdbSyncer(param); - if (syncer == nullptr) { - ZLOGE("syncer is null"); - return RDB_ERROR; - } - return syncer->RemoteQuery(device, sql, selectionArgs, resultSet); -} - int32_t RdbServiceImpl::OnInitialize() { return RDB_OK; @@ -512,16 +519,19 @@ int32_t RdbServiceImpl::GetSchema(const RdbSyncerParam ¶m) } if (executors_ != nullptr) { - CloudEvent::StoreInfo storeInfo { IPCSkeleton::GetCallingTokenID(), param.bundleName_, - RdbSyncer::RemoveSuffix(param.storeName_), - RdbSyncer::GetInstIndex(IPCSkeleton::GetCallingTokenID(), param.bundleName_) }; + CloudEvent::StoreInfo storeInfo; + storeInfo.tokenId = IPCSkeleton::GetCallingTokenID(); + storeInfo.bundleName = param.bundleName_; + storeInfo.storeName = RdbSyncer::RemoveSuffix(param.storeName_); + auto [instanceId, user]= RdbSyncer::GetInstIndexAndUser(storeInfo.tokenId, param.bundleName_); + storeInfo.instanceId = instanceId; + storeInfo.user = user; executors_->Execute([storeInfo]() { - auto event = std::make_unique(CloudEvent::GET_SCHEMA, std::move(storeInfo), - "relational_store"); + auto event = std::make_unique(CloudEvent::GET_SCHEMA, std::move(storeInfo)); EventCenter::GetInstance().PostEvent(move(event)); + return; }); } - return RDB_OK; } @@ -530,11 +540,12 @@ StoreMetaData RdbServiceImpl::GetStoreMetaData(const RdbSyncerParam ¶m) StoreMetaData metaData; metaData.uid = IPCSkeleton::GetCallingUid(); metaData.tokenId = IPCSkeleton::GetCallingTokenID(); - metaData.instanceId = RdbSyncer::GetInstIndex(metaData.tokenId, param.bundleName_); + auto [instanceId, user] = RdbSyncer::GetInstIndexAndUser(metaData.tokenId, param.bundleName_); + metaData.instanceId = instanceId; metaData.bundleName = param.bundleName_; metaData.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid; metaData.storeId = RdbSyncer::RemoveSuffix(param.storeName_); - metaData.user = std::to_string(AccountDelegate::GetInstance()->GetUserByToken(metaData.tokenId)); + metaData.user = std::to_string(user); metaData.storeType = param.type_; metaData.securityLevel = param.level_; metaData.area = param.area_; @@ -555,16 +566,16 @@ int32_t RdbServiceImpl::CreateMetaData(const RdbSyncerParam ¶m, StoreMetaDat old.area != meta.area)) { ZLOGE("meta bundle:%{public}s store:%{public}s type:%{public}d->%{public}d encrypt:%{public}d->%{public}d " "area:%{public}d->%{public}d", - meta.bundleName.c_str(), meta.storeId.c_str(), old.storeType, meta.storeType, old.isEncrypt, - meta.isEncrypt, old.area, meta.area); + meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), old.storeType, meta.storeType, + old.isEncrypt, meta.isEncrypt, old.area, meta.area); return RDB_ERROR; } if (!isCreated || meta != old) { Upgrade(param, old); ZLOGD("meta bundle:%{public}s store:%{public}s type:%{public}d->%{public}d encrypt:%{public}d->%{public}d " "area:%{public}d->%{public}d", - meta.bundleName.c_str(), meta.storeId.c_str(), old.storeType, meta.storeType, old.isEncrypt, - meta.isEncrypt, old.area, meta.area); + meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), old.storeType, meta.storeType, + old.isEncrypt, meta.isEncrypt, old.area, meta.area); MetaDataManager::GetInstance().SaveMeta(meta.GetKey(), meta); } AppIDMetaData appIdMeta; @@ -573,8 +584,8 @@ int32_t RdbServiceImpl::CreateMetaData(const RdbSyncerParam ¶m, StoreMetaDat if (!MetaDataManager::GetInstance().SaveMeta(appIdMeta.GetKey(), appIdMeta, true)) { ZLOGE("meta bundle:%{public}s store:%{public}s type:%{public}d->%{public}d encrypt:%{public}d->%{public}d " "area:%{public}d->%{public}d", - meta.bundleName.c_str(), meta.storeId.c_str(), old.storeType, meta.storeType, old.isEncrypt, - meta.isEncrypt, old.area, meta.area); + meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), old.storeType, meta.storeType, + old.isEncrypt, meta.isEncrypt, old.area, meta.area); return RDB_ERROR; } if (!param.isEncrypt_ || param.password_.empty()) { diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.h b/services/distributeddataservice/service/rdb/rdb_service_impl.h index 01831def7..a217cecbd 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.h +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.h @@ -26,8 +26,9 @@ #include "metadata/store_meta_data.h" #include "rdb_notifier_proxy.h" #include "rdb_syncer.h" -#include "store_observer.h" +#include "rdb_watcher.h" #include "store/auto_cache.h" +#include "store_observer.h" #include "visibility.h" namespace OHOS::DistributedRdb { class API_EXPORT RdbServiceImpl : public RdbServiceStub { @@ -41,45 +42,63 @@ public: /* IPC interface */ std::string ObtainDistributedTableName(const std::string& device, const std::string& table) override; - int32_t InitNotifier(const RdbSyncerParam ¶m, const sptr notifier) override; + int32_t InitNotifier(const RdbSyncerParam ¶m, sptr notifier) override; - int32_t SetDistributedTables(const RdbSyncerParam& param, const std::vector& tables) override; + int32_t SetDistributedTables(const RdbSyncerParam ¶m, const std::vector &tables, + int32_t type = DISTRIBUTED_DEVICE) override; int32_t RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql, const std::vector& selectionArgs, sptr& resultSet) override; - void OnDataChange(pid_t pid, const DistributedDB::StoreChangedData& data); + int32_t Sync(const RdbSyncerParam ¶m, const Option &option, const RdbPredicates &predicates, + const AsyncDetail &async) override; + + int32_t Subscribe(const RdbSyncerParam ¶m, const SubscribeOption &option, RdbStoreObserver *observer) override; - void OnChange(uint32_t tokenId, const std::string &storeName); + int32_t UnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option, + RdbStoreObserver *observer) override; + + void OnDataChange(pid_t pid, uint32_t tokenId, const DistributedDB::StoreChangedData& data); int32_t ResolveAutoLaunch(const std::string &identifier, DistributedDB::AutoLaunchParam ¶m) override; int32_t OnInitialize() override; + int32_t OnAppExit(pid_t uid, pid_t pid, uint32_t tokenId, const std::string &bundleName) override; + int32_t GetSchema(const RdbSyncerParam ¶m) override; int32_t OnBind(const BindInfo &bindInfo) override; -protected: - int32_t DoSync(const RdbSyncerParam& param, const SyncOption& option, - const RdbPredicates& predicates, SyncResult& result) override; - - int32_t DoAsync(const RdbSyncerParam& param, uint32_t seqNum, const SyncOption& option, - const RdbPredicates& predicates) override; - - int32_t DoSubscribe(const RdbSyncerParam& param, const SubscribeOption &option) override; - - int32_t DoUnSubscribe(const RdbSyncerParam& param) override; - private: using Watchers = DistributedData::AutoCache::Watchers; struct SyncAgent { pid_t pid_ = 0; + int32_t count_ = 0; std::string bundleName_; - std::map mode_; - std::map watchers_; + sptr notifier_ = nullptr; + std::shared_ptr watcher_ = nullptr; void ReInit(pid_t pid, const std::string &bundleName); + void SetNotifier(sptr notifier); + void SetWatcher(std::shared_ptr watcher); + }; + + class Factory { + public: + Factory(); + ~Factory(); + private: + std::shared_ptr product_; }; + using StoreSyncersType = std::map>; + + static constexpr int32_t MAX_SYNCER_NUM = 50; + static constexpr int32_t MAX_SYNCER_PER_PROCESS = 10; + static constexpr int32_t SYNCER_TIMEOUT = 60 * 1000; // ms + + std::pair DoSync(const RdbSyncerParam ¶m, const Option &option, const RdbPredicates &pred); + + int32_t DoAsync(const RdbSyncerParam ¶m, const Option &option, const RdbPredicates &pred); Watchers GetWatchers(uint32_t tokenId, const std::string &storeName); @@ -91,7 +110,7 @@ private: std::shared_ptr GetRdbSyncer(const RdbSyncerParam& param); - void OnAsyncComplete(pid_t pid, uint32_t seqNum, const SyncResult& result); + void OnAsyncComplete(uint32_t tokenId, uint32_t seqNum, Details&& result); int32_t CreateMetaData(const RdbSyncerParam ¶m, StoreMetaData &old); @@ -101,39 +120,15 @@ private: int32_t Upgrade(const RdbSyncerParam ¶m, const StoreMetaData &old); - class DeathRecipientImpl : public IRemoteObject::DeathRecipient { - public: - using DeathCallback = std::function; - explicit DeathRecipientImpl(const DeathCallback& callback); - ~DeathRecipientImpl() override; - void OnRemoteDied(const wptr &object) override; - private: - const DeathCallback callback_; - }; - class Factory { - public: - Factory(); - ~Factory(); - private: - std::shared_ptr product_; - }; + static std::string TransferStringToHex(const std::string& origStr); - using StoreSyncersType = std::map>; + static Factory factory_; int32_t syncerNum_ {}; ConcurrentMap syncers_; - ConcurrentMap> notifiers_; - ConcurrentMap identifiers_; + ConcurrentMap> identifiers_; + ConcurrentMap syncAgents_; RdbStoreObserverImpl autoLaunchObserver_; - - static Factory factory_; - - static std::string TransferStringToHex(const std::string& origStr); - - static constexpr int32_t MAX_SYNCER_NUM = 50; - static constexpr int32_t MAX_SYNCER_PER_PROCESS = 10; - static constexpr int32_t SYNCER_TIMEOUT = 60 * 1000; // ms std::shared_ptr executors_; - ConcurrentMap syncAgents_; }; } // namespace OHOS::DistributedRdb #endif diff --git a/services/distributeddataservice/service/rdb/rdb_service_stub.cpp b/services/distributeddataservice/service/rdb/rdb_service_stub.cpp index 034e98220..87ec6b42d 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_stub.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_stub.cpp @@ -94,16 +94,16 @@ int32_t RdbServiceStub::OnRemoteSetDistributedTables(MessageParcel &data, Messag int32_t RdbServiceStub::OnRemoteDoSync(MessageParcel &data, MessageParcel &reply) { RdbSyncerParam param; - SyncOption option {}; + Option option {}; RdbPredicates predicates; if (!ITypesUtil::Unmarshal(data, param, option, predicates)) { - ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s tables:%{public}s", param.bundleName_.c_str(), - param.storeName_.c_str(), predicates.table_.c_str()); + ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s tables:%{public}zu", param.bundleName_.c_str(), + param.storeName_.c_str(), predicates.tables_.size()); return IPC_STUB_INVALID_DATA_ERR; } - SyncResult result; - auto status = DoSync(param, option, predicates, result); + Details result = {}; + auto status = Sync(param, option, predicates, [&result](Details &&details) { result = std::move(details); }); if (!ITypesUtil::Marshal(reply, status, result)) { ZLOGE("Marshal status:0x%{public}x result size:%{public}zu", status, result.size()); return IPC_STUB_WRITE_PARCEL_ERR; @@ -114,16 +114,15 @@ int32_t RdbServiceStub::OnRemoteDoSync(MessageParcel &data, MessageParcel &reply int32_t RdbServiceStub::OnRemoteDoAsync(MessageParcel &data, MessageParcel &reply) { RdbSyncerParam param; - uint32_t seqNum; - SyncOption option {}; + Option option {}; RdbPredicates predicates; - if (!ITypesUtil::Unmarshal(data, param, seqNum, option, predicates)) { + if (!ITypesUtil::Unmarshal(data, param, option, predicates)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s seqNum:%{public}u tables:%{public}s", - param.bundleName_.c_str(), param.storeName_.c_str(), seqNum, predicates.table_.c_str()); + param.bundleName_.c_str(), param.storeName_.c_str(), option.seqNum, + (*(predicates.tables_.begin())).c_str()); return IPC_STUB_INVALID_DATA_ERR; } - - auto status = DoAsync(param, seqNum, option, predicates); + auto status = Sync(param, option, predicates, nullptr); if (!ITypesUtil::Marshal(reply, status)) { ZLOGE("Marshal status:0x%{public}x", status); return IPC_STUB_WRITE_PARCEL_ERR; @@ -141,7 +140,7 @@ int32_t RdbServiceStub::OnRemoteDoSubscribe(MessageParcel &data, MessageParcel & return IPC_STUB_INVALID_DATA_ERR; } - auto status = DoSubscribe(param, option); + auto status = Subscribe(param, option, nullptr); if (!ITypesUtil::Marshal(reply, status)) { ZLOGE("Marshal status:0x%{public}x", status); return IPC_STUB_WRITE_PARCEL_ERR; @@ -152,13 +151,14 @@ int32_t RdbServiceStub::OnRemoteDoSubscribe(MessageParcel &data, MessageParcel & int32_t RdbServiceStub::OnRemoteDoUnSubscribe(MessageParcel &data, MessageParcel &reply) { RdbSyncerParam param; + SubscribeOption option; if (!ITypesUtil::Unmarshal(data, param)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s", param.bundleName_.c_str(), param.storeName_.c_str()); return IPC_STUB_INVALID_DATA_ERR; } - auto status = DoUnSubscribe(param); + auto status = UnSubscribe(param, option, nullptr); if (!ITypesUtil::Marshal(reply, status)) { ZLOGE("Marshal status:0x%{public}x", status); return IPC_STUB_WRITE_PARCEL_ERR; diff --git a/services/distributeddataservice/service/rdb/rdb_service_stub.h b/services/distributeddataservice/service/rdb/rdb_service_stub.h index 04c57e577..b67c3e5b1 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_stub.h +++ b/services/distributeddataservice/service/rdb/rdb_service_stub.h @@ -27,24 +27,6 @@ public: DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedRdb.IRdbService"); int OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply) override; - int32_t Sync(const RdbSyncerParam& param, const SyncOption& option, - const RdbPredicates& predicates, const SyncCallback& callback) override - { - return 0; - } - - int32_t Subscribe(const RdbSyncerParam& param, const SubscribeOption& option, - RdbStoreObserver *observer) override - { - return 0; - } - - int32_t UnSubscribe(const RdbSyncerParam& param, const SubscribeOption& option, - RdbStoreObserver *observer) override - { - return 0; - } - private: static bool CheckInterfaceToken(MessageParcel& data); diff --git a/services/distributeddataservice/service/rdb/rdb_store_observer_impl.cpp b/services/distributeddataservice/service/rdb/rdb_store_observer_impl.cpp index 036eedb8d..e9e7ec53a 100644 --- a/services/distributeddataservice/service/rdb/rdb_store_observer_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_store_observer_impl.cpp @@ -20,8 +20,8 @@ #include "log_print.h" namespace OHOS::DistributedRdb { -RdbStoreObserverImpl::RdbStoreObserverImpl(RdbServiceImpl* owner, pid_t pid) - : pid_(pid), owner_(owner) +RdbStoreObserverImpl::RdbStoreObserverImpl(RdbServiceImpl* owner, pid_t pid, uint32_t tokenId) + : pid_(pid), tokenId_(tokenId), owner_(owner) { ZLOGI("construct"); } @@ -35,7 +35,7 @@ void RdbStoreObserverImpl::OnChange(const DistributedDB::StoreChangedData &data) { ZLOGI("enter"); if (owner_ != nullptr) { - owner_->OnDataChange(pid_, data); + owner_->OnDataChange(pid_, tokenId_, data); } } } // namespace OHOS::DistributedRdb diff --git a/services/distributeddataservice/service/rdb/rdb_store_observer_impl.h b/services/distributeddataservice/service/rdb/rdb_store_observer_impl.h index e0043f5f9..0c7af9015 100644 --- a/services/distributeddataservice/service/rdb/rdb_store_observer_impl.h +++ b/services/distributeddataservice/service/rdb/rdb_store_observer_impl.h @@ -23,15 +23,16 @@ namespace OHOS::DistributedRdb { class RdbServiceImpl; class RdbStoreObserverImpl : public DistributedDB::StoreObserver { public: - explicit RdbStoreObserverImpl(RdbServiceImpl* owner, pid_t pid = 0); + explicit RdbStoreObserverImpl(RdbServiceImpl* owner, pid_t pid = 0, uint32_t tokenId = 0); ~RdbStoreObserverImpl() override; void OnChange(const DistributedDB::StoreChangedData &data) override; private: - pid_t pid_ {}; - RdbServiceImpl* owner_ {}; + pid_t pid_ = 0; + uint32_t tokenId_ = 0; + RdbServiceImpl* owner_ = nullptr; }; } // namespace OHOS::DistributedRdb #endif diff --git a/services/distributeddataservice/service/rdb/rdb_syncer.cpp b/services/distributeddataservice/service/rdb/rdb_syncer.cpp index 5422c143b..2ccbe7b78 100644 --- a/services/distributeddataservice/service/rdb/rdb_syncer.cpp +++ b/services/distributeddataservice/service/rdb/rdb_syncer.cpp @@ -22,19 +22,13 @@ #include "checker/checker_manager.h" #include "crypto_manager.h" #include "device_manager_adapter.h" -#include "directory/directory_manager.h" -#include "kvstore_utils.h" #include "log_print.h" -#include "metadata/appid_meta_data.h" #include "metadata/meta_data_manager.h" -#include "metadata/store_meta_data.h" +#include "rdb_query.h" #include "rdb_result_set_impl.h" -#include "types.h" -#include "utils/constant.h" -#include "utils/converter.h" +#include "store/general_store.h" #include "types_export.h" - -using OHOS::DistributedKv::KvStoreUtils; +#include "utils/anonymous.h" using OHOS::DistributedKv::AccountDelegate; using namespace OHOS::Security::AccessToken; using namespace OHOS::DistributedData; @@ -169,11 +163,11 @@ int32_t RdbSyncer::InitDBDelegate(const StoreMetaData &meta) } option.observer = observer_; std::string fileName = meta.dataDir; - ZLOGI("path=%{public}s storeId=%{public}s", fileName.c_str(), meta.storeId.c_str()); + ZLOGI("path=%{public}s storeId=%{public}s", fileName.c_str(), Anonymous::Change(meta.storeId).c_str()); auto status = manager_->OpenStore(fileName, meta.storeId, option, delegate_); if (status != DistributedDB::DBStatus::OK) { ZLOGE("open store failed, path=%{public}s storeId=%{public}s status=%{public}d", - fileName.c_str(), meta.storeId.c_str(), status); + fileName.c_str(), Anonymous::Change(meta.storeId).c_str(), status); return RDB_ERROR; } ZLOGI("open store success"); @@ -182,10 +176,10 @@ int32_t RdbSyncer::InitDBDelegate(const StoreMetaData &meta) return RDB_OK; } -int32_t RdbSyncer::GetInstIndex(uint32_t tokenId, const std::string &bundleName) +std::pair RdbSyncer::GetInstIndexAndUser(uint32_t tokenId, const std::string &bundleName) { if (AccessTokenKit::GetTokenTypeFlag(tokenId) != TOKEN_HAP) { - return 0; + return { 0, 0 }; } HapTokenInfo tokenInfo; @@ -194,9 +188,9 @@ int32_t RdbSyncer::GetInstIndex(uint32_t tokenId, const std::string &bundleName) if (errCode != RET_SUCCESS) { ZLOGE("GetHapTokenInfo error:%{public}d, tokenId:0x%{public}x appId:%{public}s", errCode, tokenId, bundleName.c_str()); - return -1; + return { -1, -1 }; } - return tokenInfo.instIndex; + return { tokenInfo.instIndex, tokenInfo.userID }; } DistributedDB::RelationalStoreDelegate* RdbSyncer::GetDelegate() @@ -205,7 +199,7 @@ DistributedDB::RelationalStoreDelegate* RdbSyncer::GetDelegate() return delegate_; } -int32_t RdbSyncer::SetDistributedTables(const std::vector &tables) +int32_t RdbSyncer::SetDistributedTables(const std::vector &tables, int32_t type) { auto* delegate = GetDelegate(); if (delegate == nullptr) { @@ -215,7 +209,7 @@ int32_t RdbSyncer::SetDistributedTables(const std::vector &tables) for (const auto& table : tables) { ZLOGI("%{public}s", table.c_str()); - auto dBStatus = delegate->CreateDistributedTable(table); + auto dBStatus = delegate->CreateDistributedTable(table, static_cast(type)); if (dBStatus != DistributedDB::DBStatus::OK) { ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d", table.c_str(), dBStatus); return RDB_ERROR; @@ -234,7 +228,7 @@ std::vector RdbSyncer::GetConnectDevices() } ZLOGI("size=%{public}u", static_cast(devices.size())); for (const auto& device: devices) { - ZLOGI("%{public}s", KvStoreUtils::ToBeAnonymous(device).c_str()); + ZLOGI("%{public}s", Anonymous::Change(device).c_str()); } return devices; } @@ -245,19 +239,18 @@ std::vector RdbSyncer::NetworkIdToUUID(const std::vector %{public}s", KvStoreUtils::ToBeAnonymous(networkId).c_str(), - KvStoreUtils::ToBeAnonymous(uuid).c_str()); + ZLOGI("%{public}s <--> %{public}s", Anonymous::Change(networkId).c_str(), Anonymous::Change(uuid).c_str()); } return uuids; } -void RdbSyncer::HandleSyncStatus(const std::map> &syncStatus, - SyncResult &result) +Details RdbSyncer::HandleSyncStatus(const std::map> &syncStatus) { + Details details; for (const auto& status : syncStatus) { auto res = DistributedDB::DBStatus::OK; for (const auto& tableStatus : status.second) { @@ -272,8 +265,10 @@ void RdbSyncer::HandleSyncStatus(const std::map 1) { + query.FromTable(predicates.tables_); + } for (const auto& operation : predicates.operations_) { if (operation.operator_ >= 0 && operation.operator_ < OPERATOR_MAX) { HANDLES[operation.operator_](operation, query); @@ -333,7 +332,7 @@ DistributedDB::Query RdbSyncer::MakeQuery(const RdbPredicates &predicates) return query; } -int32_t RdbSyncer::DoSync(const SyncOption &option, const RdbPredicates &predicates, SyncResult &result) +int32_t RdbSyncer::DoSync(const Option &option, const RdbPredicates &predicates, const AsyncDetail &async) { ZLOGI("enter"); auto* delegate = GetDelegate(); @@ -342,44 +341,19 @@ int32_t RdbSyncer::DoSync(const SyncOption &option, const RdbPredicates &predica return RDB_ERROR; } - std::vector devices; - if (predicates.devices_.empty()) { - devices = NetworkIdToUUID(GetConnectDevices()); - } else { - devices = NetworkIdToUUID(predicates.devices_); + if (option.mode < DistributedData::GeneralStore::NEARBY_END) { + auto &networkIds = predicates.devices_; + auto devices = networkIds.empty() ? NetworkIdToUUID(GetConnectDevices()) : NetworkIdToUUID(networkIds); + return delegate->Sync( + devices, static_cast(option.mode), MakeQuery(predicates), + [async](const std::map> &syncStatus) { + async(HandleSyncStatus(syncStatus)); + }, + option.isAsync); + } else if (option.mode < DistributedData::GeneralStore::CLOUD_END) { + return RDB_OK; } - - ZLOGI("delegate sync"); - return delegate->Sync(devices, static_cast(option.mode), - MakeQuery(predicates), [&result] (const std::map> &syncStatus) { - HandleSyncStatus(syncStatus, result); - }, true); -} - -int32_t RdbSyncer::DoAsync(const SyncOption &option, const RdbPredicates &predicates, const SyncCallback& callback) -{ - auto* delegate = GetDelegate(); - if (delegate == nullptr) { - ZLOGE("delegate is nullptr"); - return RDB_ERROR; - } - - std::vector devices; - if (predicates.devices_.empty()) { - devices = NetworkIdToUUID(GetConnectDevices()); - } else { - devices = NetworkIdToUUID(predicates.devices_); - } - - ZLOGI("delegate sync"); - return delegate->Sync(devices, static_cast(option.mode), - MakeQuery(predicates), [callback] (const std::map> &syncStatus) { - SyncResult result; - HandleSyncStatus(syncStatus, result); - callback(result); - }, false); + return RDB_OK; } int32_t RdbSyncer::RemoteQuery(const std::string& device, const std::string& sql, @@ -417,7 +391,7 @@ int32_t RdbSyncer::RemoveDeviceData() } DistributedDB::DBStatus status = delegate->RemoveDeviceData(); if (status != DistributedDB::DBStatus::OK) { - ZLOGE("DistributedDB RemoveDeviceData failed, status is %{public}d.", status); + ZLOGE("DistributedDB RemoveDeviceData failed, status is %{public}d.", status); return RDB_ERROR; } return RDB_OK; diff --git a/services/distributeddataservice/service/rdb/rdb_syncer.h b/services/distributeddataservice/service/rdb/rdb_syncer.h index 30dc84021..9b63b0f4d 100644 --- a/services/distributeddataservice/service/rdb/rdb_syncer.h +++ b/services/distributeddataservice/service/rdb/rdb_syncer.h @@ -19,18 +19,20 @@ #include #include +#include "iremote_object.h" +#include "metadata/secret_key_meta_data.h" #include "metadata/store_meta_data.h" +#include "rdb_service.h" #include "rdb_store_observer_impl.h" #include "rdb_types.h" #include "relational_store_delegate.h" #include "relational_store_manager.h" -#include "metadata/secret_key_meta_data.h" -#include "iremote_object.h" namespace OHOS::DistributedRdb { class RdbSyncer { public: using StoreMetaData = OHOS::DistributedData::StoreMetaData; using SecretKeyMetaData = DistributedData::SecretKeyMetaData; + using Option = DistributedRdb::RdbService::Option; RdbSyncer(const RdbSyncerParam& param, RdbStoreObserverImpl* observer); ~RdbSyncer() noexcept; @@ -46,11 +48,9 @@ public: std::string GetIdentifier() const; - int32_t SetDistributedTables(const std::vector& tables); - - int32_t DoSync(const SyncOption& option, const RdbPredicates& predicates, SyncResult& result); + int32_t SetDistributedTables(const std::vector &tables, int32_t type); - int32_t DoAsync(const SyncOption& option, const RdbPredicates& predicates, const SyncCallback& callback); + int32_t DoSync(const Option &option, const RdbPredicates &predicates, const AsyncDetail &async); int32_t RemoteQuery(const std::string& device, const std::string& sql, const std::vector& selectionArgs, sptr& resultSet); @@ -59,7 +59,7 @@ public: static std::string RemoveSuffix(const std::string& name); - static int32_t GetInstIndex(uint32_t tokenId, const std::string &bundleName); + static std::pair GetInstIndexAndUser(uint32_t tokenId, const std::string &bundleName); static bool GetPassword(const StoreMetaData &metaData, DistributedDB::CipherPassword &password); @@ -89,8 +89,7 @@ private: static std::vector GetConnectDevices(); static std::vector NetworkIdToUUID(const std::vector& networkIds); - static void HandleSyncStatus(const std::map>& SyncStatus, - SyncResult& result); + static Details HandleSyncStatus(const std::map> &SyncStatus); static void EqualTo(const RdbPredicateOperation& operation, DistributedDB::Query& query); static void NotEqualTo(const RdbPredicateOperation& operation, DistributedDB::Query& query); static void And(const RdbPredicateOperation& operation, DistributedDB::Query& query); diff --git a/services/distributeddataservice/service/rdb/rdb_watcher.cpp b/services/distributeddataservice/service/rdb/rdb_watcher.cpp index 01572668f..6e712bc7c 100644 --- a/services/distributeddataservice/service/rdb/rdb_watcher.cpp +++ b/services/distributeddataservice/service/rdb/rdb_watcher.cpp @@ -16,28 +16,43 @@ #define LOG_TAG "RdbWatcher" #include "rdb_watcher.h" -#include "log_print.h" + #include "error/general_error.h" +#include "log_print.h" namespace OHOS::DistributedRdb { using namespace DistributedData; -using Err = DistributedData::GeneralError; -RdbWatcher::RdbWatcher(RdbServiceImpl *rdbService, uint32_t tokenId, const std::string &storeName) - : rdbService_(rdbService), tokenId_(tokenId), storeName_(storeName) +using Error = DistributedData::GeneralError; +RdbWatcher::RdbWatcher() { } -int32_t RdbWatcher::OnChange(Origin origin, const std::string &id) +int32_t RdbWatcher::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values) { - if (rdbService_ == nullptr) { - return Err::E_ERROR; + auto notifier = GetNotifier(); + if (notifier == nullptr) { + return E_NOT_INIT; } - rdbService_->OnChange(tokenId_, storeName_); - return Err::E_OK; + DistributedRdb::Origin rdbOrigin; + rdbOrigin.origin = origin.origin; + rdbOrigin.id = origin.id; + rdbOrigin.store = origin.store; + // notifier OnChange() + return E_OK; } -int32_t RdbWatcher::OnChange(Origin origin, const std::string &id, const std::vector &values) +sptr RdbWatcher::GetNotifier() const { - return Err::E_NOT_SUPPORT; + std::shared_lock lock(mutex_); + return notifier_; +} + +void RdbWatcher::SetNotifier(sptr notifier) +{ + std::unique_lock lock(mutex_); + if (notifier_ == notifier) { + return; + } + notifier_ = notifier; } } // namespace OHOS::DistributedRdb diff --git a/services/distributeddataservice/service/rdb/rdb_watcher.h b/services/distributeddataservice/service/rdb/rdb_watcher.h index 65af02163..ac898854c 100644 --- a/services/distributeddataservice/service/rdb/rdb_watcher.h +++ b/services/distributeddataservice/service/rdb/rdb_watcher.h @@ -15,23 +15,23 @@ #ifndef OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_WATCHER_H #define OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_WATCHER_H -#include "rdb_service_impl.h" +#include +#include +#include "rdb_notifier_proxy.h" #include "store/general_value.h" #include "store/general_watcher.h" namespace OHOS::DistributedRdb { -class RdbServiceImpl; class RdbWatcher : public DistributedData::GeneralWatcher { public: - explicit RdbWatcher(RdbServiceImpl *rdbService, uint32_t tokenId, const std::string &storeName); - int32_t OnChange(Origin origin, const std::string &id) override; - int32_t OnChange(Origin origin, const std::string &id, - const std::vector &values) override; + RdbWatcher(); + int32_t OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values) override; + sptr GetNotifier() const; + void SetNotifier(sptr notifier); private: - RdbServiceImpl* rdbService_ {}; - uint32_t tokenId_ = 0; - std::string storeName_ {}; + mutable std::shared_mutex mutex_; + sptr notifier_; }; } // namespace OHOS::DistributedRdb #endif // OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_WATCHER_H diff --git a/services/distributeddataservice/service/test/cloud_data_test.cpp b/services/distributeddataservice/service/test/cloud_data_test.cpp index d77bfb04b..fcde19b46 100644 --- a/services/distributeddataservice/service/test/cloud_data_test.cpp +++ b/services/distributeddataservice/service/test/cloud_data_test.cpp @@ -170,7 +170,7 @@ void CloudDataTest::TearDown() {} /** * @tc.name: GetSchema -* @tc.desc: GetSchema from cloud. +* @tc.desc: GetSchema from cloud when no schema in meta. * @tc.type: FUNC * @tc.require: * @tc.author: ht @@ -186,7 +186,7 @@ HWTEST_F(CloudDataTest, GetSchema, TestSize.Level0) ASSERT_FALSE( MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetSchemaKey(TEST_CLOUD_BUNDLE), schemaMeta, true)); CloudEvent::StoreInfo storeInfo { OHOS::IPCSkeleton::GetCallingTokenID(), TEST_CLOUD_BUNDLE, TEST_CLOUD_STORE, 0 }; - auto event = std::make_unique(CloudEvent::GET_SCHEMA, std::move(storeInfo), "test_service"); + auto event = std::make_unique(CloudEvent::GET_SCHEMA, std::move(storeInfo)); EventCenter::GetInstance().PostEvent(move(event)); ASSERT_TRUE( MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetSchemaKey(TEST_CLOUD_BUNDLE), schemaMeta, true)); -- Gitee From a1c323e2db324f5d6025b25056cdd259489d8775 Mon Sep 17 00:00:00 2001 From: htt1997 Date: Tue, 6 Jun 2023 16:20:20 +0800 Subject: [PATCH 2/5] refactor:observer Signed-off-by: htt1997 --- .../service/permission/src/permit_delegate.cpp | 2 +- .../service/rdb/rdb_notifier_proxy.cpp | 4 ++-- .../service/rdb/rdb_notifier_proxy.h | 3 ++- .../service/rdb/rdb_service_impl.cpp | 10 ++++++++-- .../distributeddataservice/service/rdb/rdb_syncer.cpp | 1 + 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/services/distributeddataservice/service/permission/src/permit_delegate.cpp b/services/distributeddataservice/service/permission/src/permit_delegate.cpp index 7a4bd29e6..52b9af7c4 100644 --- a/services/distributeddataservice/service/permission/src/permit_delegate.cpp +++ b/services/distributeddataservice/service/permission/src/permit_delegate.cpp @@ -79,7 +79,7 @@ bool PermitDelegate::SyncActivate(const ActiveParam ¶m) bool PermitDelegate::VerifyPermission(const CheckParam ¶m, uint8_t flag) { ZLOGI("user:%{public}s, appId:%{public}s, storeId:%{public}s, remote devId:%{public}s, instanceId:%{public}d," - "flag:%{public}u", param.userId.c_str(), param.appId.c_str(), param.storeId.c_str(), + "flag:%{public}u", param.userId.c_str(), param.appId.c_str(), Anonymous::Change(param.storeId).c_str(), Anonymous::Change(param.deviceId).c_str(), param.instanceId, flag); auto devId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid; diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp index 97ba6bea9..bd9ce05b1 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp @@ -47,14 +47,14 @@ int32_t RdbNotifierProxy::OnComplete(uint32_t seqNum, Details &&result) return RDB_OK; } -int RdbNotifierProxy::OnChange(const std::string &storeName, const std::vector &devices) +int32_t RdbNotifierProxy::OnChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) { MessageParcel data; if (!data.WriteInterfaceToken(GetDescriptor())) { ZLOGE("write descriptor failed"); return RDB_ERROR; } - if (!ITypesUtil::Marshal(data, storeName, devices)) { + if (!ITypesUtil::Marshal(data, origin, primaries, changeInfo)) { ZLOGE("write store name or devices failed"); return RDB_ERROR; } diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h index d0d9c1cb8..991fcae4f 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h @@ -31,7 +31,8 @@ public: int32_t OnComplete(uint32_t seqNum, Details &&result) override; - int32_t OnChange(const std::string& storeName, const std::vector& devices) override; + int32_t OnChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) override; + private: static inline BrokerDelegator delegator_; }; diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index bf35f9ad9..e2d6de204 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -233,7 +233,11 @@ void RdbServiceImpl::OnDataChange(pid_t pid, uint32_t tokenId, const Distributed if (success && agent.notifier_ != nullptr && pid == agent.pid_) { std::string device = data.GetDataChangeDevice(); auto networkId = DmAdapter::GetInstance().ToNetworkID(device); - agent.notifier_->OnChange(property.storeId, { networkId }); + Origin origin; + origin.origin = Origin::ORIGIN_NEARBY; + origin.store = property.storeId; + origin.id.push_back(networkId); + agent.notifier_->OnChange(origin, {}, {}); } } @@ -445,7 +449,9 @@ int32_t RdbServiceImpl::Sync(const RdbSyncerParam ¶m, const Option &option, { if (!option.isAsync) { auto [status, details] = DoSync(param, option, predicates); - async(std::move(details)); + if (async != nullptr) { + async(std::move(details)); + } return status; } return DoAsync(param, option, predicates); diff --git a/services/distributeddataservice/service/rdb/rdb_syncer.cpp b/services/distributeddataservice/service/rdb/rdb_syncer.cpp index 2ccbe7b78..2453d660e 100644 --- a/services/distributeddataservice/service/rdb/rdb_syncer.cpp +++ b/services/distributeddataservice/service/rdb/rdb_syncer.cpp @@ -353,6 +353,7 @@ int32_t RdbSyncer::DoSync(const Option &option, const RdbPredicates &predicates, } else if (option.mode < DistributedData::GeneralStore::CLOUD_END) { return RDB_OK; } + ZLOGI("delegate sync"); return RDB_OK; } -- Gitee From cc6d1ba1fc69077a47cbd4348e768b200d9f4e9e Mon Sep 17 00:00:00 2001 From: htt1997 Date: Tue, 6 Jun 2023 16:20:20 +0800 Subject: [PATCH 3/5] refactor:observer Signed-off-by: htt1997 --- .../service/permission/src/permit_delegate.cpp | 2 +- .../service/rdb/rdb_notifier_proxy.cpp | 4 ++-- .../service/rdb/rdb_notifier_proxy.h | 3 ++- .../service/rdb/rdb_service_impl.cpp | 10 ++++++++-- .../distributeddataservice/service/rdb/rdb_syncer.cpp | 1 + 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/services/distributeddataservice/service/permission/src/permit_delegate.cpp b/services/distributeddataservice/service/permission/src/permit_delegate.cpp index 7a4bd29e6..52b9af7c4 100644 --- a/services/distributeddataservice/service/permission/src/permit_delegate.cpp +++ b/services/distributeddataservice/service/permission/src/permit_delegate.cpp @@ -79,7 +79,7 @@ bool PermitDelegate::SyncActivate(const ActiveParam ¶m) bool PermitDelegate::VerifyPermission(const CheckParam ¶m, uint8_t flag) { ZLOGI("user:%{public}s, appId:%{public}s, storeId:%{public}s, remote devId:%{public}s, instanceId:%{public}d," - "flag:%{public}u", param.userId.c_str(), param.appId.c_str(), param.storeId.c_str(), + "flag:%{public}u", param.userId.c_str(), param.appId.c_str(), Anonymous::Change(param.storeId).c_str(), Anonymous::Change(param.deviceId).c_str(), param.instanceId, flag); auto devId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid; diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp index 97ba6bea9..bd9ce05b1 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.cpp @@ -47,14 +47,14 @@ int32_t RdbNotifierProxy::OnComplete(uint32_t seqNum, Details &&result) return RDB_OK; } -int RdbNotifierProxy::OnChange(const std::string &storeName, const std::vector &devices) +int32_t RdbNotifierProxy::OnChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) { MessageParcel data; if (!data.WriteInterfaceToken(GetDescriptor())) { ZLOGE("write descriptor failed"); return RDB_ERROR; } - if (!ITypesUtil::Marshal(data, storeName, devices)) { + if (!ITypesUtil::Marshal(data, origin, primaries, changeInfo)) { ZLOGE("write store name or devices failed"); return RDB_ERROR; } diff --git a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h index d0d9c1cb8..991fcae4f 100644 --- a/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h +++ b/services/distributeddataservice/service/rdb/rdb_notifier_proxy.h @@ -31,7 +31,8 @@ public: int32_t OnComplete(uint32_t seqNum, Details &&result) override; - int32_t OnChange(const std::string& storeName, const std::vector& devices) override; + int32_t OnChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) override; + private: static inline BrokerDelegator delegator_; }; diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index bf35f9ad9..e2d6de204 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -233,7 +233,11 @@ void RdbServiceImpl::OnDataChange(pid_t pid, uint32_t tokenId, const Distributed if (success && agent.notifier_ != nullptr && pid == agent.pid_) { std::string device = data.GetDataChangeDevice(); auto networkId = DmAdapter::GetInstance().ToNetworkID(device); - agent.notifier_->OnChange(property.storeId, { networkId }); + Origin origin; + origin.origin = Origin::ORIGIN_NEARBY; + origin.store = property.storeId; + origin.id.push_back(networkId); + agent.notifier_->OnChange(origin, {}, {}); } } @@ -445,7 +449,9 @@ int32_t RdbServiceImpl::Sync(const RdbSyncerParam ¶m, const Option &option, { if (!option.isAsync) { auto [status, details] = DoSync(param, option, predicates); - async(std::move(details)); + if (async != nullptr) { + async(std::move(details)); + } return status; } return DoAsync(param, option, predicates); diff --git a/services/distributeddataservice/service/rdb/rdb_syncer.cpp b/services/distributeddataservice/service/rdb/rdb_syncer.cpp index 2ccbe7b78..2453d660e 100644 --- a/services/distributeddataservice/service/rdb/rdb_syncer.cpp +++ b/services/distributeddataservice/service/rdb/rdb_syncer.cpp @@ -353,6 +353,7 @@ int32_t RdbSyncer::DoSync(const Option &option, const RdbPredicates &predicates, } else if (option.mode < DistributedData::GeneralStore::CLOUD_END) { return RDB_OK; } + ZLOGI("delegate sync"); return RDB_OK; } -- Gitee From c793c975c0ede9ae1372493dd3bb0d5008fd9348 Mon Sep 17 00:00:00 2001 From: htt1997 Date: Wed, 7 Jun 2023 12:19:49 +0800 Subject: [PATCH 4/5] fix:anonymous Signed-off-by: htt1997 --- .../service/rdb/rdb_service_impl.cpp | 8 ++++-- .../service/rdb/rdb_service_stub.cpp | 28 +++++++++---------- .../service/rdb/rdb_watcher.cpp | 2 +- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index e2d6de204..cc7108e42 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -247,7 +247,7 @@ void RdbServiceImpl::SyncerTimeout(std::shared_ptr syncer) return; } auto storeId = syncer->GetStoreId(); - ZLOGI("%{public}s", storeId.c_str()); + ZLOGI("%{public}s", Anonymous::Change(storeId).c_str()); syncers_.ComputeIfPresent(syncer->GetPid(), [this, storeId](const auto& key, StoreSyncersType& syncers) { syncers.erase(storeId); syncerNum_--; @@ -335,7 +335,11 @@ std::pair RdbServiceImpl::DoSync(const RdbSyncerParam ¶m, return {RDB_ERROR, {}}; } Details details = {}; - auto status = syncer->DoSync(option, pred, [&details](auto &&result) mutable { details = std::move(result); }); + auto status = syncer->DoSync(option, pred, [&details, ¶m](auto &&result) mutable { + ZLOGD("Sync complete, bundleName:%{public}s, storeName:%{public}s", param.bundleName_.c_str(), + Anonymous::Change(param.storeName_).c_str()); + details = std::move(result); + }); return { status, std::move(details) }; } diff --git a/services/distributeddataservice/service/rdb/rdb_service_stub.cpp b/services/distributeddataservice/service/rdb/rdb_service_stub.cpp index 87ec6b42d..7c703fbfa 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_stub.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_stub.cpp @@ -22,13 +22,13 @@ #include "utils/anonymous.h" namespace OHOS::DistributedRdb { +using Anonymous = DistributedData::Anonymous; int32_t RdbServiceStub::OnRemoteObtainDistributedTableName(MessageParcel &data, MessageParcel &reply) { std::string device; std::string table; if (!ITypesUtil::Unmarshal(data, device, table)) { - ZLOGE("Unmarshal device:%{public}s table:%{public}s", DistributedData::Anonymous::Change(device).c_str(), - table.c_str()); + ZLOGE("Unmarshal device:%{public}s table:%{public}s", Anonymous::Change(device).c_str(), table.c_str()); return IPC_STUB_INVALID_DATA_ERR; } @@ -45,7 +45,7 @@ int32_t RdbServiceStub::OnGetSchema(MessageParcel &data, MessageParcel &reply) RdbSyncerParam param; if (!ITypesUtil::Unmarshal(data, param)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s", param.bundleName_.c_str(), - param.storeName_.c_str()); + Anonymous::Change(param.storeName_).c_str()); return IPC_STUB_INVALID_DATA_ERR; } auto status = GetSchema(param); @@ -62,7 +62,7 @@ int32_t RdbServiceStub::OnRemoteInitNotifier(MessageParcel &data, MessageParcel sptr notifier; if (!ITypesUtil::Unmarshal(data, param, notifier) || notifier == nullptr) { ZLOGE("Unmarshal bundleName:%{public}s storeName_:%{public}s notifier is nullptr:%{public}d", - param.bundleName_.c_str(), param.storeName_.c_str(), notifier == nullptr); + param.bundleName_.c_str(), Anonymous::Change(param.storeName_).c_str(), notifier == nullptr); return IPC_STUB_INVALID_DATA_ERR; } auto status = InitNotifier(param, notifier); @@ -79,7 +79,7 @@ int32_t RdbServiceStub::OnRemoteSetDistributedTables(MessageParcel &data, Messag std::vector tables; if (!ITypesUtil::Unmarshal(data, param, tables)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s tables size:%{public}zu", - param.bundleName_.c_str(), param.storeName_.c_str(), tables.size()); + param.bundleName_.c_str(), Anonymous::Change(param.storeName_).c_str(), tables.size()); return IPC_STUB_INVALID_DATA_ERR; } @@ -98,7 +98,7 @@ int32_t RdbServiceStub::OnRemoteDoSync(MessageParcel &data, MessageParcel &reply RdbPredicates predicates; if (!ITypesUtil::Unmarshal(data, param, option, predicates)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s tables:%{public}zu", param.bundleName_.c_str(), - param.storeName_.c_str(), predicates.tables_.size()); + Anonymous::Change(param.storeName_).c_str(), predicates.tables_.size()); return IPC_STUB_INVALID_DATA_ERR; } @@ -117,9 +117,9 @@ int32_t RdbServiceStub::OnRemoteDoAsync(MessageParcel &data, MessageParcel &repl Option option {}; RdbPredicates predicates; if (!ITypesUtil::Unmarshal(data, param, option, predicates)) { - ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s seqNum:%{public}u tables:%{public}s", - param.bundleName_.c_str(), param.storeName_.c_str(), option.seqNum, - (*(predicates.tables_.begin())).c_str()); + ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s seqNum:%{public}u table:%{public}s", + param.bundleName_.c_str(), Anonymous::Change(param.storeName_).c_str(), option.seqNum, + predicates.tables_.empty() ? "null" : predicates.tables_.begin()->c_str()); return IPC_STUB_INVALID_DATA_ERR; } auto status = Sync(param, option, predicates, nullptr); @@ -136,7 +136,7 @@ int32_t RdbServiceStub::OnRemoteDoSubscribe(MessageParcel &data, MessageParcel & SubscribeOption option; if (!ITypesUtil::Unmarshal(data, param, option)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s", param.bundleName_.c_str(), - param.storeName_.c_str()); + Anonymous::Change(param.storeName_).c_str()); return IPC_STUB_INVALID_DATA_ERR; } @@ -154,7 +154,7 @@ int32_t RdbServiceStub::OnRemoteDoUnSubscribe(MessageParcel &data, MessageParcel SubscribeOption option; if (!ITypesUtil::Unmarshal(data, param)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s", param.bundleName_.c_str(), - param.storeName_.c_str()); + Anonymous::Change(param.storeName_).c_str()); return IPC_STUB_INVALID_DATA_ERR; } @@ -174,9 +174,9 @@ int32_t RdbServiceStub::OnRemoteDoRemoteQuery(MessageParcel& data, MessageParcel std::vector selectionArgs; if (!ITypesUtil::Unmarshal(data, param, device, sql, selectionArgs)) { ZLOGE("Unmarshal bundleName_:%{public}s storeName_:%{public}s device:%{public}s sql:%{public}s " - "selectionArgs size:%{public}zu", param.bundleName_.c_str(), param.storeName_.c_str(), - DistributedData::Anonymous::Change(device).c_str(), - DistributedData::Anonymous::Change(sql).c_str(), selectionArgs.size()); + "selectionArgs size:%{public}zu", param.bundleName_.c_str(), + Anonymous::Change(param.storeName_).c_str(), Anonymous::Change(device).c_str(), + Anonymous::Change(sql).c_str(), selectionArgs.size()); return IPC_STUB_INVALID_DATA_ERR; } diff --git a/services/distributeddataservice/service/rdb/rdb_watcher.cpp b/services/distributeddataservice/service/rdb/rdb_watcher.cpp index 6e712bc7c..c929a7692 100644 --- a/services/distributeddataservice/service/rdb/rdb_watcher.cpp +++ b/services/distributeddataservice/service/rdb/rdb_watcher.cpp @@ -37,7 +37,7 @@ int32_t RdbWatcher::OnChange(const Origin &origin, const PRIFields &primaryField rdbOrigin.origin = origin.origin; rdbOrigin.id = origin.id; rdbOrigin.store = origin.store; - // notifier OnChange() + notifier->OnChange(rdbOrigin, primaryFields, std::move(values)); return E_OK; } -- Gitee From 1facbc99a60651b57b8f1ef1cedcece9bf8c2212 Mon Sep 17 00:00:00 2001 From: htt1997 Date: Wed, 7 Jun 2023 16:33:07 +0800 Subject: [PATCH 5/5] fix:wait param Signed-off-by: htt1997 --- services/distributeddataservice/service/rdb/rdb_syncer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/distributeddataservice/service/rdb/rdb_syncer.cpp b/services/distributeddataservice/service/rdb/rdb_syncer.cpp index 2453d660e..2faede552 100644 --- a/services/distributeddataservice/service/rdb/rdb_syncer.cpp +++ b/services/distributeddataservice/service/rdb/rdb_syncer.cpp @@ -349,7 +349,7 @@ int32_t RdbSyncer::DoSync(const Option &option, const RdbPredicates &predicates, [async](const std::map> &syncStatus) { async(HandleSyncStatus(syncStatus)); }, - option.isAsync); + !option.isAsync); } else if (option.mode < DistributedData::GeneralStore::CLOUD_END) { return RDB_OK; } -- Gitee