diff --git a/services/distributeddataservice/service/data_share/BUILD.gn b/services/distributeddataservice/service/data_share/BUILD.gn index 2d8ffad46a2457495d8ec68dd369e1940ea9d150..9cf48c40b685c19537f26a7ea1b9011a87e7e71e 100644 --- a/services/distributeddataservice/service/data_share/BUILD.gn +++ b/services/distributeddataservice/service/data_share/BUILD.gn @@ -68,6 +68,7 @@ ohos_shared_library("data_share_service") { "strategies/insert_strategy.cpp", "strategies/publish_strategy.cpp", "strategies/query_strategy.cpp", + "strategies/rdb_notify_strategy.cpp", "strategies/subscribe_strategy.cpp", "strategies/template_strategy.cpp", "strategies/update_strategy.cpp", diff --git a/services/distributeddataservice/service/data_share/common/kv_delegate.cpp b/services/distributeddataservice/service/data_share/common/kv_delegate.cpp index d073668504abb73c20587c23fa2e46e8290c3b2c..9e36164adaed8b19c14a2ccf6743d041f5f5cc3a 100644 --- a/services/distributeddataservice/service/data_share/common/kv_delegate.cpp +++ b/services/distributeddataservice/service/data_share/common/kv_delegate.cpp @@ -138,9 +138,9 @@ int32_t KvDelegate::Get(const std::string &collectionName, const Id &id, std::st std::string filter = DistributedData::Serializable::Marshall(id); if (Get(collectionName, filter, "{}", value) != E_OK) { ZLOGE("Get failed, %{public}s %{public}s", collectionName.c_str(), filter.c_str()); - return false; + return E_ERROR; } - return true; + return E_OK; } bool KvDelegate::GetVersion(const std::string &collectionName, const std::string &filter, int &version) diff --git a/services/distributeddataservice/service/data_share/data/published_data.cpp b/services/distributeddataservice/service/data_share/data/published_data.cpp index a3cb46ed155125dc8511c6f697fb33d147de5779..416b27c9e08e7d6ce24b24376540bb017c095c04 100644 --- a/services/distributeddataservice/service/data_share/data/published_data.cpp +++ b/services/distributeddataservice/service/data_share/data/published_data.cpp @@ -16,6 +16,7 @@ #include "published_data.h" #include "log_print.h" +#include "subscriber_managers/published_data_subscriber_manager.h" namespace OHOS::DataShare { bool PublishedData::HasVersion() const @@ -33,8 +34,7 @@ std::string PublishedData::GetValue() const return DistributedData::Serializable::Marshall(value); } -PublishedData::PublishedData(const PublishedDataNode &node, const int version) - : PublishedData(node) +PublishedData::PublishedData(const PublishedDataNode &node, const int version) : PublishedData(node) { value.SetVersion(version); } @@ -84,10 +84,13 @@ bool PublishedDataNode::Unmarshal(const DistributedData::Serializable::json &nod bool ret = GetValue(node, GET_NAME(key), key); ret = ret && GetValue(node, GET_NAME(bundleName), bundleName); ret = ret && GetValue(node, GET_NAME(subscriberId), subscriberId); - ret = ret && GetValue(node, GET_NAME(value), value); + if (ret) { + GetValue(node, GET_NAME(value), value); + VersionData::Unmarshal(node); + } ret = ret && GetValue(node, GET_NAME(timestamp), timestamp); ret = ret && GetValue(node, GET_NAME(userId), userId); - return ret && VersionData::Unmarshal(node); + return ret; } PublishedDataNode::PublishedDataNode(const std::string &key, const std::string &bundleName, int64_t subscriberId, @@ -159,7 +162,10 @@ void PublishedData::ClearAging() return; } std::vector queryResults; - int32_t status = delegate->GetBatch(KvDBDelegate::DATA_TABLE, "{}", "{}", queryResults); + int32_t status = delegate->GetBatch(KvDBDelegate::DATA_TABLE, "{}", + "{\"id_\": true, \"timestamp\": true, \"key\": true, \"bundleName\": true, \"subscriberId\": true, " + "\"userId\": true}", + queryResults); if (status != E_OK) { ZLOGE("db GetBatch failed %{public}d", status); return; @@ -171,8 +177,8 @@ void PublishedData::ClearAging() ZLOGE("Unmarshall %{public}s failed", result.c_str()); continue; } - - if (data.timestamp < lastValidTime) { + if (data.timestamp < lastValidTime && PublishedDataSubscriberManager::GetInstance() + .GetCount(PublishedDataKey(data.key, data.bundleName, data.subscriberId)) == 0) { status = delegate->Delete(KvDBDelegate::DATA_TABLE, Id(PublishedData::GenId(data.key, data.bundleName, data.subscriberId), data.userId)); if (status != E_OK) { @@ -186,4 +192,36 @@ void PublishedData::ClearAging() } return; } + +void PublishedData::UpdateTimestamp( + const std::string &key, const std::string &bundleName, int64_t subscriberId, const int32_t userId) +{ + auto delegate = KvDBDelegate::GetInstance(); + if (delegate == nullptr) { + ZLOGE("db open failed"); + return; + } + std::string queryResult; + int32_t status = + delegate->Get(KvDBDelegate::DATA_TABLE, Id(GenId(key, bundleName, subscriberId), userId), queryResult); + if (status != E_OK) { + ZLOGE("db Get failed, %{private}s %{public}d", queryResult.c_str(), status); + return; + } + PublishedDataNode data; + if (!PublishedDataNode::Unmarshall(queryResult, data)) { + ZLOGE("Unmarshall failed, %{private}s", queryResult.c_str()); + return; + } + auto now = time(nullptr); + if (now <= 0) { + ZLOGE("time failed"); + return; + } + data.timestamp = now; + status = delegate->Upsert(KvDBDelegate::DATA_TABLE, PublishedData(data)); + if (status == E_OK) { + ZLOGI("update timestamp %{private}s", data.key.c_str()); + } +} } // namespace OHOS::DataShare \ No newline at end of file diff --git a/services/distributeddataservice/service/data_share/data/published_data.h b/services/distributeddataservice/service/data_share/data/published_data.h index aba9ee9f448343eb40225dadb74a2a029bd1904b..cff038a3bf972267ceb9219043839cf1aacd0e2b 100644 --- a/services/distributeddataservice/service/data_share/data/published_data.h +++ b/services/distributeddataservice/service/data_share/data/published_data.h @@ -42,6 +42,8 @@ public: explicit PublishedData(const PublishedDataNode &node); static std::vector Query(const std::string &bundleName, int32_t userId); static void Delete(const std::string &bundleName, const int32_t userId); + static void UpdateTimestamp( + const std::string &key, const std::string &bundleName, int64_t subscriberId, const int32_t userId); static void ClearAging(); static int32_t Query(const std::string &filter, PublishedDataNode::Data &publishedData); static std::string GenId(const std::string &key, const std::string &bundleName, int64_t subscriberId); @@ -51,6 +53,7 @@ public: int GetVersion() const override; std::string GetValue() const override; friend class GetDataStrategy; + private: PublishedDataNode value; }; diff --git a/services/distributeddataservice/service/data_share/data_share_service_impl.cpp b/services/distributeddataservice/service/data_share/data_share_service_impl.cpp index d91ae2dd0c15323e769d3333b8d4c6982105fef6..014923768bd00703577e3daf7a452912951f5193 100644 --- a/services/distributeddataservice/service/data_share/data_share_service_impl.cpp +++ b/services/distributeddataservice/service/data_share/data_share_service_impl.cpp @@ -210,7 +210,7 @@ std::vector DataShareServiceImpl::SubscribeRdbData( std::vector results; for (const auto &uri : uris) { auto context = std::make_shared(uri); - results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &observer, &context, this]() -> bool { + results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &observer, &context, this]() { return RdbSubscriberManager::GetInstance().Add( Key(context->uri, id.subscriberId_, id.bundleName_), observer, context, binderInfo_.executors); })); @@ -224,7 +224,7 @@ std::vector DataShareServiceImpl::UnsubscribeRdbData( std::vector results; for (const auto &uri : uris) { auto context = std::make_shared(uri); - results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &context]() -> bool { + results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &context]() { return RdbSubscriberManager::GetInstance().Delete( Key(context->uri, id.subscriberId_, id.bundleName_), context->callerTokenId); })); @@ -238,7 +238,7 @@ std::vector DataShareServiceImpl::EnableRdbSubs( std::vector results; for (const auto &uri : uris) { auto context = std::make_shared(uri); - results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &context]() -> bool { + results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &context]() { return RdbSubscriberManager::GetInstance().Enable( Key(context->uri, id.subscriberId_, id.bundleName_), context); })); @@ -252,7 +252,7 @@ std::vector DataShareServiceImpl::DisableRdbSubs( std::vector results; for (const auto &uri : uris) { auto context = std::make_shared(uri); - results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &context]() -> bool { + results.emplace_back(uri, subscribeStrategy_.Execute(context, [&id, &context]() { return RdbSubscriberManager::GetInstance().Disable( Key(context->uri, id.subscriberId_, id.bundleName_), context->callerTokenId); })); @@ -277,7 +277,7 @@ std::vector DataShareServiceImpl::SubscribePublishedData(const PublishedDataKey key(uri, callerBundleName, subscriberId); context->callerBundleName = callerBundleName; context->calledBundleName = key.bundleName; - result = subscribeStrategy_.Execute(context, [&subscriberId, &observer, &context]() -> bool { + result = subscribeStrategy_.Execute(context, [&subscriberId, &observer, &context]() { return PublishedDataSubscriberManager::GetInstance().Add( PublishedDataKey(context->uri, context->callerBundleName, subscriberId), observer, context->callerTokenId); @@ -285,6 +285,12 @@ std::vector DataShareServiceImpl::SubscribePublishedData(const results.emplace_back(uri, result); if (result == E_OK) { publishedKeys.emplace_back(context->uri, context->callerBundleName, subscriberId); + if (binderInfo_.executors != nullptr) { + binderInfo_.executors->Execute([context, subscriberId]() { + PublishedData::UpdateTimestamp( + context->uri, context->calledBundleName, subscriberId, context->currentUserId); + }); + } userId = context->currentUserId; } } @@ -308,9 +314,16 @@ std::vector DataShareServiceImpl::UnsubscribePublishedData(cons PublishedDataKey key(uri, callerBundleName, subscriberId); context->callerBundleName = callerBundleName; context->calledBundleName = key.bundleName; - results.emplace_back(uri, subscribeStrategy_.Execute(context, [&subscriberId, &context]() -> bool { - return PublishedDataSubscriberManager::GetInstance().Delete( + results.emplace_back(uri, subscribeStrategy_.Execute(context, [&subscriberId, &context, this]() { + auto result = PublishedDataSubscriberManager::GetInstance().Delete( PublishedDataKey(context->uri, context->callerBundleName, subscriberId), context->callerTokenId); + if (result == E_OK && binderInfo_.executors != nullptr) { + binderInfo_.executors->Execute([context, subscriberId]() { + PublishedData::UpdateTimestamp( + context->uri, context->calledBundleName, subscriberId, context->currentUserId); + }); + } + return result; })); } return results; @@ -333,10 +346,16 @@ std::vector DataShareServiceImpl::EnablePubSubs(const std::vect PublishedDataKey key(uri, callerBundleName, subscriberId); context->callerBundleName = callerBundleName; context->calledBundleName = key.bundleName; - result = subscribeStrategy_.Execute(context, [&subscriberId, &context]() -> bool { + result = subscribeStrategy_.Execute(context, [&subscriberId, &context]() { return PublishedDataSubscriberManager::GetInstance().Enable( PublishedDataKey(context->uri, context->callerBundleName, subscriberId), context->callerTokenId); }); + if (result == E_OK && binderInfo_.executors != nullptr) { + binderInfo_.executors->Execute([context, subscriberId]() { + PublishedData::UpdateTimestamp( + context->uri, context->calledBundleName, subscriberId, context->currentUserId); + }); + } results.emplace_back(uri, result); if (result == E_OK) { publishedKeys.emplace_back(context->uri, context->callerBundleName, subscriberId); @@ -363,9 +382,16 @@ std::vector DataShareServiceImpl::DisablePubSubs(const std::vec PublishedDataKey key(uri, callerBundleName, subscriberId); context->callerBundleName = callerBundleName; context->calledBundleName = key.bundleName; - results.emplace_back(uri, subscribeStrategy_.Execute(context, [&subscriberId, &context]() -> bool { - return PublishedDataSubscriberManager::GetInstance().Disable( + results.emplace_back(uri, subscribeStrategy_.Execute(context, [&subscriberId, &context, this]() { + auto result = PublishedDataSubscriberManager::GetInstance().Disable( PublishedDataKey(context->uri, context->callerBundleName, subscriberId), context->callerTokenId); + if (result == E_OK && binderInfo_.executors != nullptr) { + binderInfo_.executors->Execute([context, subscriberId]() { + PublishedData::UpdateTimestamp( + context->uri, context->calledBundleName, subscriberId, context->currentUserId); + }); + } + return result; })); } return results; @@ -430,4 +456,19 @@ int32_t DataShareServiceImpl::OnAppUninstall( RdbHelper::ClearCache(); return EOK; } + +void DataShareServiceImpl::NotifyObserver(const std::string &uri) +{ + ZLOGD("%{private}s try notified", uri.c_str()); + auto context = std::make_shared(uri); + if (!GetCallerBundleName(context->callerBundleName)) { + ZLOGE("get bundleName error, %{private}s", uri.c_str()); + return; + } + auto ret = rdbNotifyStrategy_.Execute(context); + if (ret) { + ZLOGI("%{private}s start notified", uri.c_str()); + RdbSubscriberManager::GetInstance().Emit(uri, context); + } +} } // namespace OHOS::DataShare diff --git a/services/distributeddataservice/service/data_share/data_share_service_impl.h b/services/distributeddataservice/service/data_share/data_share_service_impl.h index 493882da6418fa5e1bf65aaed92f4da247905568..72d102ca4adfc4cbf038f5dfdd12b05685acd7e0 100644 --- a/services/distributeddataservice/service/data_share/data_share_service_impl.h +++ b/services/distributeddataservice/service/data_share/data_share_service_impl.h @@ -27,6 +27,7 @@ #include "insert_strategy.h" #include "publish_strategy.h" #include "query_strategy.h" +#include "rdb_notify_strategy.h" #include "subscribe_strategy.h" #include "template_strategy.h" #include "update_strategy.h" @@ -67,6 +68,7 @@ public: int32_t OnBind(const BindInfo &binderInfo) override; int32_t OnUserChange(uint32_t code, const std::string &user, const std::string &account) override; int32_t OnAppUninstall(const std::string &bundleName, int32_t user, int32_t index, uint32_t tokenId) override; + void NotifyObserver(const std::string &uri) override; private: class Factory { @@ -87,6 +89,7 @@ private: QueryStrategy queryStrategy_; UpdateStrategy updateStrategy_; TemplateStrategy templateStrategy_; + RdbNotifyStrategy rdbNotifyStrategy_; BindInfo binderInfo_; }; } // namespace OHOS::DataShare diff --git a/services/distributeddataservice/service/data_share/data_share_service_stub.cpp b/services/distributeddataservice/service/data_share/data_share_service_stub.cpp index 852b874bd7e3c954bc8b97f25d9df45d60695adc..99383d0849b352dc1b020af6346195ab883ea0c7 100644 --- a/services/distributeddataservice/service/data_share/data_share_service_stub.cpp +++ b/services/distributeddataservice/service/data_share/data_share_service_stub.cpp @@ -329,5 +329,16 @@ int DataShareServiceStub::OnRemoteRequest(uint32_t code, MessageParcel &data, Me } return -1; } + +int32_t DataShareServiceStub::OnRemoteNotifyObserver(MessageParcel &data, MessageParcel &reply) +{ + std::string uri; + if (!ITypesUtil::Unmarshal(data, uri)) { + ZLOGE("read device list failed."); + return -1; + } + NotifyObserver(uri); + return 0; +} } // namespace DataShare } // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/service/data_share/data_share_service_stub.h b/services/distributeddataservice/service/data_share/data_share_service_stub.h index d7e1dc27cc332518c0edac0db751134156e373b3..17ac116ac1b3b4fdd04473f0a894251491bb7281 100644 --- a/services/distributeddataservice/service/data_share/data_share_service_stub.h +++ b/services/distributeddataservice/service/data_share/data_share_service_stub.h @@ -44,6 +44,7 @@ private: int32_t OnRemoteEnablePubSubs(MessageParcel& data, MessageParcel& reply); int32_t OnRemoteDisablePubSubs(MessageParcel& data, MessageParcel& reply); int32_t OnRemoteNotifyConnectDone(MessageParcel& data, MessageParcel& reply); + int32_t OnRemoteNotifyObserver(MessageParcel& data, MessageParcel& reply); using RequestHandle = int (DataShareServiceStub::*)(MessageParcel &, MessageParcel &); static constexpr RequestHandle HANDLERS[DATA_SHARE_SERVICE_CMD_MAX] = { &DataShareServiceStub::OnRemoteInsert, @@ -62,7 +63,8 @@ private: &DataShareServiceStub::OnRemoteUnsubscribePublishedData, &DataShareServiceStub::OnRemoteEnablePubSubs, &DataShareServiceStub::OnRemoteDisablePubSubs, - &DataShareServiceStub::OnRemoteNotifyConnectDone }; + &DataShareServiceStub::OnRemoteNotifyConnectDone, + &DataShareServiceStub::OnRemoteNotifyObserver }; }; } // namespace DataShare } // namespace OHOS diff --git a/services/distributeddataservice/service/data_share/idata_share_service.h b/services/distributeddataservice/service/data_share/idata_share_service.h index 1ae6824fe5edf68db22a02efa72a2efd654e6174..ce2ff44b7c94ad6dd69d2754d078d1fe0b69075e 100644 --- a/services/distributeddataservice/service/data_share/idata_share_service.h +++ b/services/distributeddataservice/service/data_share/idata_share_service.h @@ -46,6 +46,7 @@ public: DATA_SHARE_SERVICE_CMD_ENABLE_SUBSCRIBE_PUBLISHED, DATA_SHARE_SERVICE_CMD_DISABLE_SUBSCRIBE_PUBLISHED, DATA_SHARE_SERVICE_CMD_NOTIFY, + DATA_SHARE_SERVICE_CMD_NOTIFY_OBSERVERS, DATA_SHARE_SERVICE_CMD_MAX }; @@ -79,6 +80,7 @@ public: virtual std::vector DisablePubSubs(const std::vector &uris, const int64_t subscriberId) = 0; virtual void OnConnectDone() = 0; + virtual void NotifyObserver(const std::string &uri) = 0; }; } // namespace OHOS::DataShare #endif diff --git a/services/distributeddataservice/service/data_share/strategies/rdb_notify_strategy.cpp b/services/distributeddataservice/service/data_share/strategies/rdb_notify_strategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2c13eb862d02c939bb3b68fa583d72c6a8ba296c --- /dev/null +++ b/services/distributeddataservice/service/data_share/strategies/rdb_notify_strategy.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define LOG_TAG "RdbNotifyStrategy" + +#include "rdb_notify_strategy.h" + +#include "general/load_config_common_strategy.h" +#include "general/load_config_data_info_strategy.h" +#include "general/load_config_from_bundle_info_strategy.h" +#include "log_print.h" +#include "utils/anonymous.h" + +namespace OHOS::DataShare { +bool RdbNotifyStrategy::Execute(std::shared_ptr context) +{ + auto &preProcess = GetStrategy(); + if (preProcess.IsEmpty()) { + ZLOGE("get strategy fail, maybe memory not enough"); + return false; + } + if (!preProcess(context)) { + ZLOGE("pre process fail, uri: %{public}s", DistributedData::Anonymous::Change(context->uri).c_str()); + return false; + } + if (context->callerBundleName != context->calledBundleName) { + ZLOGE("not your data, cannot notify, callerBundleName: %{public}s, calledBundleName: %{public}s", + context->callerBundleName.c_str(), context->calledBundleName.c_str()); + return false; + } + return true; +} + +SeqStrategy &RdbNotifyStrategy::GetStrategy() +{ + std::lock_guard lock(mutex_); + if (!strategies_.IsEmpty()) { + return strategies_; + } + std::initializer_list list = { + new (std::nothrow)LoadConfigCommonStrategy(), + new (std::nothrow)LoadConfigFromBundleInfoStrategy(), + new (std::nothrow)LoadConfigDataInfoStrategy() + }; + auto ret = strategies_.Init(list); + if (!ret) { + std::for_each(list.begin(), list.end(), [](Strategy *item) { + delete item; + }); + return strategies_; + } + return strategies_; +} +} // namespace OHOS::DataShare \ No newline at end of file diff --git a/services/distributeddataservice/service/data_share/strategies/rdb_notify_strategy.h b/services/distributeddataservice/service/data_share/strategies/rdb_notify_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..d6678ccd741077c4c053e175890c64e9a5b8d92b --- /dev/null +++ b/services/distributeddataservice/service/data_share/strategies/rdb_notify_strategy.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASHARESERVICE_RDB_NOTIFY_STRAGETY_H +#define DATASHARESERVICE_RDB_NOTIFY_STRAGETY_H + +#include + +#include "datashare_values_bucket.h" +#include "seq_strategy.h" + +namespace OHOS::DataShare { +class RdbNotifyStrategy final { +public: + bool Execute(std::shared_ptr context); + +private: + SeqStrategy &GetStrategy(); + std::mutex mutex_; + SeqStrategy strategies_; +}; +} // namespace OHOS::DataShare +#endif diff --git a/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.cpp b/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.cpp index 067c8cda6aa061d419ec03ce4acf0620144a525c..4b7314c88ba32cbdac295d6b30f14089464e8869 100644 --- a/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.cpp +++ b/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.cpp @@ -24,7 +24,7 @@ #include "utils/anonymous.h" namespace OHOS::DataShare { -int32_t SubscribeStrategy::Execute(std::shared_ptr context, std::function process) +int32_t SubscribeStrategy::Execute(std::shared_ptr context, std::function process) { auto &preProcess = GetStrategy(); if (preProcess.IsEmpty()) { diff --git a/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.h b/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.h index 7395b90f5ecaa86e4233f39317758bf5f47af733..3456b66cd4a798112bb168bcbf173d61c53a96de 100644 --- a/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.h +++ b/services/distributeddataservice/service/data_share/strategies/subscribe_strategy.h @@ -23,7 +23,7 @@ namespace OHOS::DataShare { class SubscribeStrategy final { public: - int32_t Execute(std::shared_ptr context, std::function process); + int32_t Execute(std::shared_ptr context, std::function process); private: SeqStrategy &GetStrategy(); diff --git a/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.cpp b/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.cpp index 37bdd91582318eee951c0496b26581ca9382989c..fbcb92d6c5a5eb19823dfc552093d2786a3e5ad0 100644 --- a/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.cpp +++ b/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.cpp @@ -80,6 +80,8 @@ int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, const ui publishedDataCache.ComputeIfPresent(key, [&callerTokenId](const auto &key, std::vector &value) { for (auto it = value.begin(); it != value.end();) { if (it->callerTokenId == callerTokenId) { + ZLOGI("delete publish subscriber, uri %{private}s tokenId %{public}d", key.key.c_str(), + callerTokenId); it = value.erase(it); } else { it++; @@ -175,11 +177,22 @@ void PublishedDataSubscriberManager::PutInto( } } } + void PublishedDataSubscriberManager::Clear() { publishedDataCache.Clear(); } +int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key) +{ + int count = 0; + publishedDataCache.ComputeIfPresent(key, [&count](const auto &key, std::vector &value) { + count = value.size(); + return true; + }); + return count; +} + PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId) : key(key), bundleName(bundle), subscriberId(subscriberId) { diff --git a/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.h b/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.h index f4dc24e2378166362e6406eac4d9ed6a3d9f8530..4e078165347b550c1e66f65881807fa3bf1c09ce 100644 --- a/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.h +++ b/services/distributeddataservice/service/data_share/subscriber_managers/published_data_subscriber_manager.h @@ -49,7 +49,7 @@ public: void Emit(const std::vector &keys, const int32_t userId, const std::string &ownerBundleName, const sptr observer = nullptr); void Clear(); - + int GetCount(const PublishedDataKey &key); private: struct ObserverNode { ObserverNode(const sptr &observer, uint32_t callerTokenId); diff --git a/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp b/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp index 326551a06d534f6a82786cf674044e8055e9d803..a3110b9e9424a93d3ec62acacffdf421d7769dcd 100644 --- a/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp +++ b/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp @@ -232,6 +232,10 @@ void RdbSubscriberManager::Emit(const std::string &uri, std::shared_ptr if (!URIUtils::IsDataProxyURI(uri)) { return; } + if (context->calledSourceDir.empty()) { + LoadConfigDataInfoStrategy loadDataInfo; + loadDataInfo(context); + } rdbCache_.ForEach([&uri, &context, this](const Key &key, std::vector &val) { if (key.uri != uri) { return false;