diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index c9662f01a2e1ad169a01b0d56b2ce1226f19aca4..a6e80ddfdd777170aef3e09201628de8ecf79a24 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -99,7 +99,7 @@ RdbServiceImpl::Factory::~Factory() { } -RdbServiceImpl::RdbServiceImpl() +RdbServiceImpl::RdbServiceImpl() : eventContainer_(std::make_shared()) { ZLOGI("construct"); DistributedDB::RelationalStoreManager::SetAutoLaunchRequestCallback( @@ -1433,6 +1433,33 @@ int32_t RdbServiceImpl::NotifyDataChange( return RDB_OK; } +void RdbServiceImpl::GlobalEvent::AddEvent(const std::string& path, + const DistributedData::DataChangeEvent::EventInfo& newEvent) +{ + std::lock_guard lock(mutex); + auto& storedEvent = events_[path]; + for (const auto& [tableName, properties] : newEvent.tableProperties) { + auto& globalProps = storedEvent.tableProperties[tableName]; + globalProps.isTrackedDataChange |= properties.isTrackedDataChange; + globalProps.isP2pSyncDataChange |= properties.isP2pSyncDataChange; + } + storedEvent.isFull |= newEvent.isFull; +} + +std::optional RdbServiceImpl::GlobalEvent::GetEvent( + const std::string& path) +{ + std::lock_guard lock(mutex); + auto it = events_.find(path); + if (it == events_.end()) { + ZLOGE("The events to the path:%{public}s does not exist", Anonymous::Change(path).c_str()); + return std::nullopt; + } + auto eventInfo = std::move(it->second); + events_.erase(it); + return eventInfo; +} + bool RdbServiceImpl::IsPostImmediately(const int32_t callingPid, const RdbNotifyConfig &rdbNotifyConfig, StoreInfo &storeInfo, DataChangeEvent::EventInfo &eventInfo, const std::string &path) { @@ -1452,17 +1479,27 @@ bool RdbServiceImpl::IsPostImmediately(const int32_t callingPid, const RdbNotify tasks.erase(path); return !tasks.empty(); } - - if (executors_ != nullptr) { - auto task = [storeInfoInner = storeInfo, eventInfoInner = eventInfo]() { - auto evt = std::make_unique(std::move(storeInfoInner), std::move(eventInfoInner)); + if (executors_ == nullptr) { + tasks.insert_or_assign(path, taskId); + return true; + } + eventContainer_->AddEvent(path, eventInfo); + auto weakContainer = std::weak_ptr(eventContainer_); + auto task = [storeInfoInner = storeInfo, path, weakContainer]() { + auto container = weakContainer.lock(); + if (container == nullptr) { + return; + } + if (auto eventOpt = container->GetEvent(path)) { + auto evt = std::make_unique(std::move(storeInfoInner), std::move(*eventOpt)); EventCenter::GetInstance().PostEvent(std::move(evt)); - }; - if (taskId == ExecutorPool::INVALID_TASK_ID) { - taskId = executors_->Schedule(std::chrono::milliseconds(rdbNotifyConfig.delay_), task); - } else { - taskId = executors_->Reset(taskId, std::chrono::milliseconds(rdbNotifyConfig.delay_)); } + }; + if (taskId == ExecutorPool::INVALID_TASK_ID) { + taskId = executors_->Schedule(std::chrono::milliseconds(rdbNotifyConfig.delay_), task); + } else { + executors_->Remove(taskId); + taskId = executors_->Schedule(std::chrono::milliseconds(rdbNotifyConfig.delay_), task); } tasks.insert_or_assign(path, taskId); return true; diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.h b/services/distributeddataservice/service/rdb/rdb_service_impl.h index 326ece088b4ebe721b393e33e8c2c129cb57f746..523a6c1652e08c83f07e649771c7d7d843ea685c 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.h +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.h @@ -142,6 +142,14 @@ private: }; using SyncAgents = std::map; + struct GlobalEvent { + void AddEvent(const std::string& path, const DistributedData::DataChangeEvent::EventInfo& eventInfo); + std::optional GetEvent(const std::string& path); + private: + std::mutex mutex; + std::map events_; + }; + class RdbStatic : public StaticActs { public: ~RdbStatic() override {}; @@ -266,6 +274,7 @@ private: static Factory factory_; ConcurrentMap syncAgents_; std::shared_ptr executors_; + std::shared_ptr eventContainer_; ConcurrentMap> heartbeatTaskIds_; }; } // namespace OHOS::DistributedRdb diff --git a/services/distributeddataservice/service/test/rdb_service_impl_test.cpp b/services/distributeddataservice/service/test/rdb_service_impl_test.cpp index 5161b9fd9112ccaf09cc659ebd7ba6ee2fa2b794..ba711e979a8fafd5dac4bae94b23e5aab574ab33 100644 --- a/services/distributeddataservice/service/test/rdb_service_impl_test.cpp +++ b/services/distributeddataservice/service/test/rdb_service_impl_test.cpp @@ -2315,5 +2315,167 @@ HWTEST_F(RdbServiceImplTest, SaveSecretKeyMeta_CloneKeyUpdate_NoUpdate_003, Test EXPECT_EQ(MetaDataManager::GetInstance().DelMeta(meta.GetCloneSecretKey(), true), true); EXPECT_EQ(MetaDataManager::GetInstance().DelMeta(meta.GetKey(), true), true); } + +/** + * @tc.name: IsPostImmediately001 + * @tc.desc: Test that tasks does not contain path and rdbNotifyConfig.delay_ == 0. + * @tc.type: FUNC + * @tc.expect: IsPostImmediately() returns true for postImmediately + */ +HWTEST_F(RdbServiceImplTest, IsPostImmediately001, TestSize.Level0) +{ + int32_t callingPid = 123; + RdbNotifyConfig config; + config.delay_ = 0; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + std::string path = "/test/path"; + RdbServiceImpl service; + bool postImmediately = service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + + EXPECT_TRUE(postImmediately); +} + +/** + * @tc.name: IsPostImmediately002 + * @tc.desc: Test that tasks does not contain path and rdbNotifyConfig.delay_ != 0 and executors_ == nullptr. + * @tc.type: FUNC + * @tc.expect: IsPostImmediately() returns false for postImmediately + */ +HWTEST_F(RdbServiceImplTest, IsPostImmediately002, TestSize.Level0) +{ + int32_t callingPid = 123; + RdbNotifyConfig config; + config.delay_ = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + std::string path = "/test/path"; + RdbServiceImpl service; + bool postImmediately = service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + + EXPECT_FALSE(postImmediately); +} + +/** + * @tc.name: IsPostImmediately003 + * @tc.desc: Test if the task already exists, rdbNotifyConfig.delay_ is 0. + * @tc.type: FUNC + * @tc.expect: IsPostImmediately() returns true for postImmediately + */ +HWTEST_F(RdbServiceImplTest, IsPostImmediately003, TestSize.Level0) +{ + int32_t callingPid = 123; + RdbNotifyConfig config; + config.delay_ = 0; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + std::string path = "/test/path"; + RdbServiceImpl service; + + service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + + bool postImmediately = service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + EXPECT_TRUE(postImmediately); +} + +/** + * @tc.name: IsPostImmediately004 + * @tc.desc: Test if the task already exists, rdbNotifyConfig.delay_ is not 0. + * @tc.type: FUNC + * @tc.expect: IsPostImmediately() returns false for postImmediately + */ +HWTEST_F(RdbServiceImplTest, IsPostImmediately004, TestSize.Level0) +{ + int32_t callingPid = 123; + RdbNotifyConfig config; + config.delay_ = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + eventInfo.isFull = true; + eventInfo.tableProperties["table1"] = {1, 0}; + std::string path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + + bool postImmediately = service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + EXPECT_FALSE(postImmediately); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[path]; + service.executors_->Remove(taskId); +} + +/** + * @tc.name: IsPostImmediately005 + * @tc.desc: Test if the task already exists, rdbNotifyConfig.delay_ is not 0 and executors_ is not nullptr. + * @tc.type: FUNC + * @tc.expect: IsPostImmediately() returns false for postImmediately + */ +HWTEST_F(RdbServiceImplTest, IsPostImmediately005, TestSize.Level0) +{ + int32_t callingPid = 456; + RdbNotifyConfig config; + config.delay_ = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + eventInfo.isFull = true; + eventInfo.tableProperties["table1"] = {1, 0}; + std::string path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + DataChangeEvent::EventInfo eventInfo_again; + eventInfo_again.isFull = false; + eventInfo_again.tableProperties["table1"] = {0, 0}; + eventInfo_again.tableProperties["table2"] = {1, 0}; + + bool postImmediately = service.IsPostImmediately(callingPid, config, storeInfo, eventInfo_again, path); + EXPECT_FALSE(postImmediately); + auto globalEvents = service.eventContainer_->events_[path]; + EXPECT_EQ(globalEvents.tableProperties["table1"].isTrackedDataChange, 1); + EXPECT_EQ(globalEvents.tableProperties["table1"].isP2pSyncDataChange, 0); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[path]; + service.executors_->Remove(taskId); +} + +/** + * @tc.name: IsPostImmediately006 + * @tc.desc: Test container == nullptr. + * @tc.type: FUNC + * @tc.expect: IsPostImmediately() returns false for postImmediately + */ +HWTEST_F(RdbServiceImplTest, IsPostImmediately006, TestSize.Level0) +{ + int32_t callingPid = 789; + RdbNotifyConfig config; + config.delay_ = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + std::string path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + bool postImmediately = service.IsPostImmediately(callingPid, config, storeInfo, eventInfo, path); + + EXPECT_FALSE(postImmediately); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[path]; + service.executors_->Remove(taskId); +} + +/** + * @tc.name: GetEvent001 + * @tc.desc: Test path is not in events_. + * @tc.type: FUNC + * @tc.expect: GetEvent returns nullopt + */ +HWTEST_F(RdbServiceImplTest, GetEvent001, TestSize.Level0) +{ + const std::string testPath = "/test/path"; + DataChangeEvent::EventInfo testEventInfo; + RdbServiceImpl service; + auto result = service.eventContainer_->GetEvent(testPath); + EXPECT_EQ(result, std::nullopt); +} } // namespace DistributedRDBTest } // namespace OHOS::Test