From fe673fd6f57e300f540d932c1830247a1ee53d57 Mon Sep 17 00:00:00 2001 From: Sven Wang Date: Sun, 5 Jun 2022 20:27:25 +0800 Subject: [PATCH] split store and sync, add device single kv store Signed-off-by: Sven Wang --- .../kvdb/include/device_store_impl.h | 35 +++++ .../kvdb/include/kvdb_service_client.h | 5 +- .../kvdb/include/observer_bridge.h | 5 +- .../kvdb/include/single_store_impl.h | 17 ++- .../kvdb/include/store_factory.h | 4 +- .../kvdb/include/store_result_set.h | 9 +- .../innerkitsimpl/kvdb/include/system_api.h | 1 - .../kvdb/src/device_store_impl.cpp | 126 ++++++++++++++++++ .../kvdb/src/kvdb_service_client.cpp | 17 +++ .../kvdb/src/observer_bridge.cpp | 14 +- .../kvdb/src/single_store_impl.cpp | 110 +++++++-------- .../innerkitsimpl/kvdb/src/store_factory.cpp | 13 +- .../innerkitsimpl/kvdb/src/store_manager.cpp | 6 +- .../kvdb/src/store_result_set.cpp | 15 +-- .../kvdb/test/single_store_impl_test.cpp | 44 ++++++ interfaces/innerkits/distributeddata/BUILD.gn | 1 + .../distributeddata/include/data_query.h | 1 + 17 files changed, 332 insertions(+), 91 deletions(-) create mode 100644 frameworks/innerkitsimpl/kvdb/include/device_store_impl.h create mode 100644 frameworks/innerkitsimpl/kvdb/src/device_store_impl.cpp diff --git a/frameworks/innerkitsimpl/kvdb/include/device_store_impl.h b/frameworks/innerkitsimpl/kvdb/include/device_store_impl.h new file mode 100644 index 000000000..5b50a4002 --- /dev/null +++ b/frameworks/innerkitsimpl/kvdb/include/device_store_impl.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_DEVICE_STORE_IMPL_H +#define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_DEVICE_STORE_IMPL_H +#include "single_store_impl.h" +namespace OHOS::DistributedKv { +class DeviceStoreImpl : public SingleStoreImpl { +public: + using SingleStoreImpl::SingleStoreImpl; + ~DeviceStoreImpl() = default; + +protected: + std::vector ToLocalDBKey(const Key &key) const override; + std::vector ToWholeDBKey(const Key &key) const override; + Key ToKey(DBKey &&key) const override; + std::vector GetPrefix(const Key &prefix) const override; + std::vector GetPrefix(const DataQuery &query) const override; + Convert GetConvert() const override; + std::vector ConvertNetwork(const Key &in, bool withLen = false) const; +}; +} +#endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_DEVICE_STORE_IMPL_H diff --git a/frameworks/innerkitsimpl/kvdb/include/kvdb_service_client.h b/frameworks/innerkitsimpl/kvdb/include/kvdb_service_client.h index 674f5eef5..2f77e6420 100644 --- a/frameworks/innerkitsimpl/kvdb/include/kvdb_service_client.h +++ b/frameworks/innerkitsimpl/kvdb/include/kvdb_service_client.h @@ -12,7 +12,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_SERVICE_CLIENT_H #define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_SERVICE_CLIENT_H #include @@ -21,7 +20,7 @@ #include "iremote_broker.h" #include "iremote_proxy.h" #include "kvdb_service.h" - +#include "kvstore_sync_callback_client.h" namespace OHOS::DistributedKv { class API_EXPORT KVDBServiceClient : public IRemoteProxy { public: @@ -47,6 +46,7 @@ public: const std::string &query) override; Status Subscribe(const AppId &appId, const StoreId &storeId, sptr observer) override; Status Unsubscribe(const AppId &appId, const StoreId &storeId, sptr observer) override; + sptr GetSyncAgent(const AppId &appId); private: explicit KVDBServiceClient(const sptr &object); @@ -61,6 +61,7 @@ private: static std::shared_ptr instance_; static std::atomic_bool isWatched_; sptr remote_; + ConcurrentMap> syncAgents_; }; } // namespace OHOS::DistributedKv #endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_SERVICE_CLIENT_H diff --git a/frameworks/innerkitsimpl/kvdb/include/observer_bridge.h b/frameworks/innerkitsimpl/kvdb/include/observer_bridge.h index 584eca638..cb42aea52 100644 --- a/frameworks/innerkitsimpl/kvdb/include/observer_bridge.h +++ b/frameworks/innerkitsimpl/kvdb/include/observer_bridge.h @@ -23,12 +23,13 @@ namespace OHOS::DistributedKv { class IKvStoreObserver; class API_EXPORT ObserverBridge : public DistributedDB::KvStoreObserver { public: - using Convert = std::function; using Observer = DistributedKv::KvStoreObserver; using DBEntry = DistributedDB::Entry; + using DBKey = DistributedDB::Key; using DBChangedData = DistributedDB::KvStoreChangedData; + using Convert = std::function; - ObserverBridge(const AppId &app, const StoreId &store, std::shared_ptr observer, Convert cvt = nullptr); + ObserverBridge(const AppId &app, const StoreId &store, std::shared_ptr observer, Convert convert); ~ObserverBridge(); Status RegisterRemoteObserver(); Status UnregisterRemoteObserver(); diff --git a/frameworks/innerkitsimpl/kvdb/include/single_store_impl.h b/frameworks/innerkitsimpl/kvdb/include/single_store_impl.h index d3cc7b3bf..6c482b572 100644 --- a/frameworks/innerkitsimpl/kvdb/include/single_store_impl.h +++ b/frameworks/innerkitsimpl/kvdb/include/single_store_impl.h @@ -31,7 +31,12 @@ public: using SyncCallback = KvStoreSyncCallback; using ResultSet = KvStoreResultSet; using DBStore = DistributedDB::KvStoreNbDelegate; + using DBEntry = DistributedDB::Entry; + using DBKey = DistributedDB::Key; + using DBValue = DistributedDB::Value; + using DBQuery = DistributedDB::Query; using SyncInfo = KVDBService::SyncInfo; + using Convert = std::function; SingleStoreImpl(const AppId &appId, std::shared_ptr dbStore); ~SingleStoreImpl() = default; StoreId GetStoreId() const override; @@ -71,16 +76,18 @@ public: protected: static constexpr size_t MAX_KEY_LENGTH = 1024; - virtual std::vector ConvertDBKey(const Key &key) const; - virtual Key ConvertKey(DistributedDB::Key &&key) const; - std::function BridgeReleaser(); + virtual std::vector ToLocalDBKey(const Key &key) const; + virtual std::vector ToWholeDBKey(const Key &key) const; + virtual Key ToKey(DBKey &&key) const; + virtual std::vector GetPrefix(const Key &prefix) const; + virtual std::vector GetPrefix(const DataQuery &query) const; + virtual Convert GetConvert() const; private: Status GetResultSet(const DistributedDB::Query &query, std::shared_ptr &resultSet) const; Status GetEntries(const DistributedDB::Query &query, std::vector &entries) const; - std::vector GetPrefix(const DataQuery &query) const; - sptr GetIPCSyncClient(std::shared_ptr service); Status DoSync(const SyncInfo &syncInfo, std::shared_ptr observer); + std::function BridgeReleaser(); std::string appId_; std::string storeId_; diff --git a/frameworks/innerkitsimpl/kvdb/include/store_factory.h b/frameworks/innerkitsimpl/kvdb/include/store_factory.h index 6e544a5f3..26510719b 100644 --- a/frameworks/innerkitsimpl/kvdb/include/store_factory.h +++ b/frameworks/innerkitsimpl/kvdb/include/store_factory.h @@ -24,11 +24,11 @@ namespace OHOS::DistributedKv { class API_EXPORT StoreFactory { public: static StoreFactory &GetInstance(); - std::shared_ptr Create( + std::shared_ptr GetOrOpenStore( const AppId &appId, const StoreId &storeId, const Options &options, const std::string &path, Status &status); Status Delete(const AppId &appId, const StoreId &storeId, const std::string &path); Status Close(const AppId &appId, const StoreId &storeId); - bool IsExits(const AppId &appId, const StoreId &storeId); + bool IsOpen(const AppId &appId, const StoreId &storeId); private: using DBManager = DistributedDB::KvStoreDelegateManager; diff --git a/frameworks/innerkitsimpl/kvdb/include/store_result_set.h b/frameworks/innerkitsimpl/kvdb/include/store_result_set.h index a89f2513a..a861c9931 100644 --- a/frameworks/innerkitsimpl/kvdb/include/store_result_set.h +++ b/frameworks/innerkitsimpl/kvdb/include/store_result_set.h @@ -26,7 +26,10 @@ class API_EXPORT StoreResultSet : public KvStoreResultSet { public: using DBResultSet = DistributedDB::KvStoreResultSet; using DBStore = DistributedDB::KvStoreNbDelegate; - StoreResultSet(DBResultSet *impl, std::shared_ptr dbStore); + using DBEntry = DistributedDB::Entry; + using DBKey = DistributedDB::Key; + using Convert = std::function; + StoreResultSet(DBResultSet *impl, std::shared_ptr dbStore, Convert convert); ~StoreResultSet(); int GetCount() const override; int GetPosition() const override; @@ -43,13 +46,11 @@ public: Status GetEntry(Entry &entry) const override; Status Close() override; -protected: - virtual Key ConvertKey(DistributedDB::Key &&key) const; - private: mutable std::shared_mutex mutex_; DBResultSet *impl_; std::shared_ptr dbStore_; + Convert convert_; }; } // namespace OHOS::DistributedKv #endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_STORE_RESULT_SET_H diff --git a/frameworks/innerkitsimpl/kvdb/include/system_api.h b/frameworks/innerkitsimpl/kvdb/include/system_api.h index 75e2b3fa0..18ca2c5b9 100644 --- a/frameworks/innerkitsimpl/kvdb/include/system_api.h +++ b/frameworks/innerkitsimpl/kvdb/include/system_api.h @@ -12,7 +12,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_SYSTEM_API_H #define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_KVDB_SYSTEM_API_H #include "iprocess_system_api_adapter.h" diff --git a/frameworks/innerkitsimpl/kvdb/src/device_store_impl.cpp b/frameworks/innerkitsimpl/kvdb/src/device_store_impl.cpp new file mode 100644 index 000000000..f582fc35f --- /dev/null +++ b/frameworks/innerkitsimpl/kvdb/src/device_store_impl.cpp @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define LOG_TAG "DeviceStoreImpl" +#include "device_store_impl.h" +#include +#include "dev_manager.h" +#include "log_print.h" +namespace OHOS::DistributedKv { +std::vector DeviceStoreImpl::ToLocalDBKey(const Key &key) const +{ + auto deviceId = DevManager::GetInstance().GetLocalDevice().deviceId; + if (deviceId.empty()) { + return {}; + } + + std::vector input = SingleStoreImpl::GetPrefix(key); + if (input.empty()) { + return {}; + } + + // |local uuid|original key|uuid len| + // |---- -----|------------|---4----| + std::vector dbKey; + dbKey.insert(dbKey.end(), deviceId.begin(), deviceId.end()); + dbKey.insert(dbKey.end(), input.begin(), input.end()); + uint32_t length = deviceId.length(); + length = htole32(length); + dbKey.insert(dbKey.end(), &length, &length + sizeof(length)); + return dbKey; +} + +std::vector DeviceStoreImpl::ToWholeDBKey(const Key &key) const +{ + // | device uuid | original key | uuid len | + // |-------------|--------------|-----4----| + return ConvertNetwork(key); +} + +Key DeviceStoreImpl::ToKey(DBKey &&key) const +{ + // | uuid |original key|uuid len| + // |---- -----|------------|---4----| + uint32_t length = *(reinterpret_cast(&(*(key.end() - sizeof(uint32_t))))); + length = le32toh(length); + key.erase(key.begin(), key.begin() + length); + key.erase(key.end() - sizeof(uint32_t), key.end()); + return std::move(key); +} + +std::vector DeviceStoreImpl::GetPrefix(const Key &prefix) const +{ + // | uuid |original key| + // |---- -----|------------| + return ConvertNetwork(prefix); +} + +std::vector DeviceStoreImpl::GetPrefix(const DataQuery &query) const +{ + std::vector prefix; + uint32_t length = query.deviceId_.size(); + prefix.insert(prefix.end(), &length, &length + sizeof(length)); + prefix.insert(prefix.end(), query.deviceId_.begin(), query.deviceId_.end()); + prefix.insert(prefix.end(), query.prefix_.begin(), query.prefix_.end()); + // | uuid |original key| + // |---- -----|------------| + return ConvertNetwork(std::move(prefix)); +} + +SingleStoreImpl::Convert DeviceStoreImpl::GetConvert() const +{ + return [](const DBKey &key, std::string &deviceId) { + uint32_t length = *(reinterpret_cast(&(*(key.end() - sizeof(uint32_t))))); + length = le32toh(length); + deviceId = { key.begin(), key.begin() + length }; + Key result(std::vector(key.begin() + length, key.end() - sizeof(uint32_t))); + return std::move(key); + }; +} + +std::vector DeviceStoreImpl::ConvertNetwork(const Key &in, bool withLen) const +{ + // input + // | network ID len | networkID | original key| + // |--------4-------|-----------|------------| + if (in.Size() < sizeof(uint32_t)) { + return in.Data(); + } + size_t deviceLen = *(reinterpret_cast(in.Data().data())); + std::string deviceId(in.Data().begin() + sizeof(uint32_t), in.Data().begin() + sizeof(uint32_t) + deviceLen); + std::regex patten("^[0-9]*$"); + if (!std::regex_match(deviceId, patten)) { + ZLOGE("device id length is error."); + return in.Data(); + } + deviceId = DevManager::GetInstance().ToUUID(deviceId); + if (deviceId.empty()) { + return in.Data(); + } + + // output + // | device uuid | original key | uuid len | + // |-------------|--------------|----4-----| + // | Mandatory | Mandatory | Optional | + std::vector out; + out.insert(out.end(), deviceId.begin(), deviceId.end()); + out.insert(out.end(), in.Data().begin() + sizeof(uint32_t) + deviceLen, in.Data().end()); + if (withLen) { + uint32_t length = deviceId.length(); + length = htole32(length); + out.insert(out.end(), &length, &length + sizeof(length)); + } + return out; +} +} \ No newline at end of file diff --git a/frameworks/innerkitsimpl/kvdb/src/kvdb_service_client.cpp b/frameworks/innerkitsimpl/kvdb/src/kvdb_service_client.cpp index 794eb1ead..71902cb79 100644 --- a/frameworks/innerkitsimpl/kvdb/src/kvdb_service_client.cpp +++ b/frameworks/innerkitsimpl/kvdb/src/kvdb_service_client.cpp @@ -54,6 +54,7 @@ namespace OHOS::DistributedKv { std::mutex KVDBServiceClient::mutex_; std::shared_ptr KVDBServiceClient::instance_; std::atomic_bool KVDBServiceClient::isWatched_(false); + std::shared_ptr KVDBServiceClient::GetInstance() { if (!isWatched_.exchange(true)) { @@ -278,4 +279,20 @@ Status KVDBServiceClient::Unsubscribe(const AppId &appId, const StoreId &storeId } return static_cast(status); } + +sptr KVDBServiceClient::GetSyncAgent(const AppId &appId) +{ + auto it = syncAgents_.Find(appId); + if (it.first && it.second == nullptr) { + syncAgents_.Erase(appId); + } + + syncAgents_.ComputeIfAbsent(appId.appId, [this](const std::string &key) { + sptr agent = new (std::nothrow) KvStoreSyncCallbackClient(); + RegisterSyncCallback({ key }, { "" }, agent); + return agent; + }); + + return syncAgents_[appId]; +} } // namespace OHOS::DistributedKv \ No newline at end of file diff --git a/frameworks/innerkitsimpl/kvdb/src/observer_bridge.cpp b/frameworks/innerkitsimpl/kvdb/src/observer_bridge.cpp index 263c7da13..17505a1d3 100644 --- a/frameworks/innerkitsimpl/kvdb/src/observer_bridge.cpp +++ b/frameworks/innerkitsimpl/kvdb/src/observer_bridge.cpp @@ -24,9 +24,14 @@ ObserverBridge::ObserverBridge(const AppId &app, const StoreId &store, std::shar ObserverBridge::~ObserverBridge() { - if (remote_ != nullptr) { - KVDBServiceClient::GetInstance()->Unsubscribe(appId_, storeId_, remote_); + if (remote_ == nullptr) { + return; + } + auto service = KVDBServiceClient::GetInstance(); + if (service == nullptr) { + return; } + service->Unsubscribe(appId_, storeId_, remote_); } Status ObserverBridge::RegisterRemoteObserver() @@ -49,6 +54,7 @@ Status ObserverBridge::UnregisterRemoteObserver() if (remote_ == nullptr) { return SUCCESS; } + auto service = KVDBServiceClient::GetInstance(); if (service == nullptr) { return SERVER_UNAVAILABLE; @@ -62,10 +68,10 @@ Status ObserverBridge::UnregisterRemoteObserver() void ObserverBridge::OnChange(const DBChangedData &data) { std::string deviceId; - ChangeNotification notification(ConvertDB(data.GetEntriesInserted(), deviceId), + ChangeNotification notice(ConvertDB(data.GetEntriesInserted(), deviceId), ConvertDB(data.GetEntriesUpdated(), deviceId), ConvertDB(data.GetEntriesDeleted(), deviceId), deviceId, false); - observer_->OnChange(notification); + observer_->OnChange(notice); } std::vector ObserverBridge::ConvertDB(const std::list &dbEntries, std::string &deviceId) const diff --git a/frameworks/innerkitsimpl/kvdb/src/single_store_impl.cpp b/frameworks/innerkitsimpl/kvdb/src/single_store_impl.cpp index 10c5d96cb..44c27a33d 100644 --- a/frameworks/innerkitsimpl/kvdb/src/single_store_impl.cpp +++ b/frameworks/innerkitsimpl/kvdb/src/single_store_impl.cpp @@ -44,7 +44,7 @@ Status SingleStoreImpl::Put(const Key &key, const Value &value) return ALREADY_CLOSED; } - DistributedDB::Key dbKey = ConvertDBKey(key); + DBKey dbKey = ToLocalDBKey(key); if (dbKey.empty()) { ZLOGE("invalid key:%{public}s, size:%{public}zu", StoreUtil::Anonymous(key.ToString()).c_str(), key.Size()); return INVALID_ARGUMENT; @@ -69,10 +69,10 @@ Status SingleStoreImpl::PutBatch(const std::vector &entries) return ALREADY_CLOSED; } - std::vector dbEntries; - DistributedDB::Entry dbEntry; + std::vector dbEntries; for (const auto &entry : entries) { - dbEntry.key = ConvertDBKey(entry.key); + DBEntry dbEntry; + dbEntry.key = ToLocalDBKey(entry.key); if (dbEntry.key.empty()) { ZLOGE("invalid key:%{public}s, size:%{public}zu", StoreUtil::Anonymous(entry.key.ToString()).c_str(), entry.key.Size()); @@ -99,11 +99,12 @@ Status SingleStoreImpl::Delete(const Key &key) return ALREADY_CLOSED; } - DistributedDB::Key dbKey = ConvertDBKey(key); + DBKey dbKey = ToLocalDBKey(key); if (dbKey.empty()) { ZLOGE("invalid key:%{public}s, size:%{public}zu", StoreUtil::Anonymous(key.ToString()).c_str(), key.Size()); return INVALID_ARGUMENT; } + auto dbStatus = dbStore_->Delete(dbKey); auto status = StoreUtil::ConvertStatus(dbStatus); if (status != SUCCESS) { @@ -122,9 +123,9 @@ Status SingleStoreImpl::DeleteBatch(const std::vector &keys) return ALREADY_CLOSED; } - std::vector dbKeys; + std::vector dbKeys; for (const auto &key : keys) { - DistributedDB::Key dbKey = ConvertDBKey(key); + DBKey dbKey = ToLocalDBKey(key); if (dbKey.empty()) { ZLOGE("invalid key:%{public}s, size:%{public}zu", StoreUtil::Anonymous(key.ToString()).c_str(), key.Size()); return INVALID_ARGUMENT; @@ -217,7 +218,8 @@ Status SingleStoreImpl::SubscribeKvStore(SubscribeType type, std::shared_ptr(new ObserverBridge(appId, storeId, observer), release); + pair.second = std::shared_ptr( + new ObserverBridge(appId, storeId, observer, GetConvert()), release); } bridge = pair.second; realType = (realType & (~pair.first)); @@ -301,13 +303,13 @@ Status SingleStoreImpl::Get(const Key &key, Value &value) return ALREADY_CLOSED; } - DistributedDB::Key dbKey = ConvertDBKey(key); + DBKey dbKey = ToWholeDBKey(key); if (dbKey.empty()) { ZLOGE("invalid key:%{public}s, size:%{public}zu", key.ToString().c_str(), key.Size()); return INVALID_ARGUMENT; } - DistributedDB::Value dbValue; + DBValue dbValue; auto dbStatus = dbStore_->Get(dbKey, dbValue); value = std::move(dbValue); auto status = StoreUtil::ConvertStatus(dbStatus); @@ -320,13 +322,13 @@ Status SingleStoreImpl::Get(const Key &key, Value &value) Status SingleStoreImpl::GetEntries(const Key &prefix, std::vector &entries) const { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); - DistributedDB::Key dbPrefix = ConvertDBKey(prefix); + DBKey dbPrefix = GetPrefix(prefix); if (dbPrefix.empty() && !prefix.Empty()) { ZLOGE("invalid prefix:%{public}s, size:%{public}zu", prefix.ToString().c_str(), prefix.Size()); return INVALID_ARGUMENT; } - DistributedDB::Query dbQuery = DistributedDB::Query::Select(); + DBQuery dbQuery = DBQuery::Select(); dbQuery.PrefixKey(dbPrefix); auto status = GetEntries(dbQuery, entries); if (status != SUCCESS) { @@ -338,7 +340,7 @@ Status SingleStoreImpl::GetEntries(const Key &prefix, std::vector &entrie Status SingleStoreImpl::GetEntries(const DataQuery &query, std::vector &entries) const { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); - DistributedDB::Query dbQuery = *(query.query_); + DBQuery dbQuery = *(query.query_); dbQuery.PrefixKey(GetPrefix(query)); auto status = GetEntries(dbQuery, entries); if (status != SUCCESS) { @@ -350,12 +352,13 @@ Status SingleStoreImpl::GetEntries(const DataQuery &query, std::vector &e Status SingleStoreImpl::GetResultSet(const Key &prefix, std::shared_ptr &resultSet) const { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); - DistributedDB::Key dbPrefix = ConvertDBKey(prefix); + DBKey dbPrefix = GetPrefix(prefix); if (dbPrefix.empty() && !prefix.Empty()) { ZLOGE("invalid prefix:%{public}s, size:%{public}zu", prefix.ToString().c_str(), prefix.Size()); return INVALID_ARGUMENT; } - DistributedDB::Query dbQuery = DistributedDB::Query::Select(); + + DBQuery dbQuery = DistributedDB::Query::Select(); dbQuery.PrefixKey(dbPrefix); auto status = GetResultSet(dbQuery, resultSet); if (status != SUCCESS) { @@ -367,7 +370,7 @@ Status SingleStoreImpl::GetResultSet(const Key &prefix, std::shared_ptr &resultSet) const { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); - DistributedDB::Query dbQuery = *(query.query_); + DBQuery dbQuery = *(query.query_); dbQuery.PrefixKey(GetPrefix(query)); auto status = GetResultSet(dbQuery, resultSet); if (status != SUCCESS) { @@ -401,7 +404,7 @@ Status SingleStoreImpl::GetCount(const DataQuery &query, int &result) const return ALREADY_CLOSED; } - DistributedDB::Query dbQuery = *(query.query_); + DBQuery dbQuery = *(query.query_); dbQuery.PrefixKey(GetPrefix(query)); auto dbStatus = dbStore_->GetCount(dbQuery, result); auto status = StoreUtil::ConvertStatus(dbStatus); @@ -570,20 +573,17 @@ Status SingleStoreImpl::Close() return SUCCESS; } -std::vector SingleStoreImpl::ConvertDBKey(const Key &key) const +std::vector SingleStoreImpl::ToLocalDBKey(const Key &key) const { - auto begin = std::find_if(key.Data().begin(), key.Data().end(), [](int ch) { return !std::isspace(ch); }); - auto rBegin = std::find_if(key.Data().rbegin(), key.Data().rend(), [](int ch) { return !std::isspace(ch); }); - auto end = static_cast(rBegin.base()); - std::vector dbKey; - dbKey.assign(begin, end); - if (dbKey.size() >= MAX_KEY_LENGTH) { - dbKey.clear(); - } - return dbKey; + return GetPrefix(key); +} + +std::vector SingleStoreImpl::ToWholeDBKey(const Key &key) const +{ + return GetPrefix(key); } -Key SingleStoreImpl::ConvertKey(DistributedDB::Key &&key) const +Key SingleStoreImpl::ToKey(DBKey &&key) const { return std::move(key); } @@ -625,7 +625,7 @@ Status SingleStoreImpl::GetResultSet(const DistributedDB::Query &query, std::sha if (dbResultSet == nullptr) { return StoreUtil::ConvertStatus(status); } - resultSet = std::make_shared(dbResultSet, dbStore_); + resultSet = std::make_shared(dbResultSet, dbStore_, GetConvert()); return SUCCESS; } @@ -637,41 +637,43 @@ Status SingleStoreImpl::GetEntries(const DistributedDB::Query &query, std::vecto return ALREADY_CLOSED; } - std::vector dbEntries; + std::vector dbEntries; auto dbStatus = dbStore_->GetEntries(query, dbEntries); entries.resize(dbEntries.size()); auto it = entries.begin(); for (auto &dbEntry : dbEntries) { - auto &entry = *it++; - entry.key = ConvertKey(std::move(dbEntry.key)); + auto &entry = *it; + entry.key = ToKey(std::move(dbEntry.key)); entry.value = std::move(dbEntry.value); + ++it; } return StoreUtil::ConvertStatus(dbStatus); } -std::vector SingleStoreImpl::GetPrefix(const DataQuery &query) const +std::vector SingleStoreImpl::GetPrefix(const Key &prefix) const { - std::string prefix = DevManager::GetInstance().ToUUID(query.deviceId_) + query.prefix_; - return { prefix.begin(), prefix.end() }; + auto begin = std::find_if(prefix.Data().begin(), prefix.Data().end(), [](int ch) { return !std::isspace(ch); }); + auto rBegin = std::find_if(prefix.Data().rbegin(), prefix.Data().rend(), [](int ch) { return !std::isspace(ch); }); + auto end = static_cast(rBegin.base()); + std::vector dbKey; + dbKey.assign(begin, end); + if (dbKey.size() >= MAX_KEY_LENGTH) { + dbKey.clear(); + } + return dbKey; } -sptr SingleStoreImpl::GetIPCSyncClient(std::shared_ptr service) +std::vector SingleStoreImpl::GetPrefix(const DataQuery &query) const { - sptr callback; - { - std::lock_guard lock(mutex_); - if (syncCallback_ != nullptr) { - return syncCallback_; - } - syncCallback_ = new (std::nothrow) KvStoreSyncCallbackClient(); - callback = syncCallback_; - } - - if (service != nullptr) { - service->RegisterSyncCallback({ appId_ }, { storeId_ }, callback); - } + return GetPrefix(Key(query.prefix_)); +} - return callback; +SingleStoreImpl::Convert SingleStoreImpl::GetConvert() const +{ + return [](const DBKey &key, std::string &deviceId) { + deviceId = ""; + return Key(key); + }; } Status SingleStoreImpl::DoSync(const SyncInfo &syncInfo, std::shared_ptr observer) @@ -681,12 +683,12 @@ Status SingleStoreImpl::DoSync(const SyncInfo &syncInfo, std::shared_ptrGetSyncAgent({ appId_ }); + if (syncAgent == nullptr) { + ZLOGE("failed! invalid agent app:%{public}s, store:%{public}s!", appId_.c_str(), storeId_.c_str()); return ILLEGAL_STATE; } - syncClient->AddSyncCallback(observer, syncInfo.seqId); + syncAgent->AddSyncCallback(observer, syncInfo.seqId); return service->Sync({ appId_ }, { storeId_ }, syncInfo); } } // namespace OHOS::DistributedKv \ No newline at end of file diff --git a/frameworks/innerkitsimpl/kvdb/src/store_factory.cpp b/frameworks/innerkitsimpl/kvdb/src/store_factory.cpp index ab0487b5d..2b84024d5 100644 --- a/frameworks/innerkitsimpl/kvdb/src/store_factory.cpp +++ b/frameworks/innerkitsimpl/kvdb/src/store_factory.cpp @@ -15,6 +15,7 @@ #define LOG_TAG "StoreFactory" #include "store_factory.h" +#include "device_store_impl.h" #include "log_print.h" #include "security_manager.h" #include "single_store_impl.h" @@ -33,7 +34,7 @@ StoreFactory::StoreFactory() (void)DBManager::SetProcessSystemAPIAdapter(std::make_shared()); } -std::shared_ptr StoreFactory::Create( +std::shared_ptr StoreFactory::GetOrOpenStore( const AppId &appId, const StoreId &storeId, const Options &options, const std::string &path, Status &status) { DBStatus dbStatus = DBStatus::OK; @@ -46,7 +47,7 @@ std::shared_ptr StoreFactory::Create( auto dbManager = GetDBManager(path, appId); auto password = SecurityManager::GetInstance().GetDBPassword(appId, storeId, path); dbManager->GetKvStore(storeId, GetDBOption(options, password), - [&dbManager, &kvStore, &stores, &appId, &dbStatus](auto status, auto *store) { + [&dbManager, &kvStore, &stores, &appId, &dbStatus, &options](auto status, auto *store) { dbStatus = status; if (store == nullptr) { ZLOGE("Create DBStore failed, status:%{public}d", status); @@ -54,7 +55,11 @@ std::shared_ptr StoreFactory::Create( } auto release = [dbManager](auto *store) { dbManager->CloseKvStore(store); }; auto dbStore = std::shared_ptr(store, release); - kvStore = std::make_shared(appId, dbStore); + if (options.kvStoreType == DEVICE_COLLABORATION) { + kvStore = std::make_shared(appId, dbStore); + } else { + kvStore = std::make_shared(appId, dbStore); + } stores[dbStore->GetStoreId()] = kvStore; }); return !stores.empty(); @@ -88,7 +93,7 @@ Status StoreFactory::Close(const AppId &appId, const StoreId &storeId) return SUCCESS; } -bool StoreFactory::IsExits(const AppId &appId, const StoreId &storeId) +bool StoreFactory::IsOpen(const AppId &appId, const StoreId &storeId) { bool isExits = false; stores_.ComputeIfPresent(appId, [&storeId, &isExits](auto &, auto &values) { diff --git a/frameworks/innerkitsimpl/kvdb/src/store_manager.cpp b/frameworks/innerkitsimpl/kvdb/src/store_manager.cpp index 77cb2cbdf..30e79260c 100644 --- a/frameworks/innerkitsimpl/kvdb/src/store_manager.cpp +++ b/frameworks/innerkitsimpl/kvdb/src/store_manager.cpp @@ -28,15 +28,15 @@ StoreManager &StoreManager::GetInstance() std::shared_ptr StoreManager::GetKVStore( const AppId &appId, const StoreId &storeId, const Options &options, const std::string &path, Status &status) { - if (StoreFactory::GetInstance().IsExits(appId, storeId)) { - return StoreFactory::GetInstance().Create(appId, storeId, options, path, status); + if (StoreFactory::GetInstance().IsOpen(appId, storeId)) { + return StoreFactory::GetInstance().GetOrOpenStore(appId, storeId, options, path, status); } auto service = KVDBServiceClient::GetInstance(); if (service != nullptr) { service->BeforeCreate(appId, storeId, options); } - auto kvStore = StoreFactory::GetInstance().Create(appId, storeId, options, path, status); + auto kvStore = StoreFactory::GetInstance().GetOrOpenStore(appId, storeId, options, path, status); auto password = SecurityManager::GetInstance().GetDBPassword(appId, storeId, path); std::vector pwd(password.GetData(), password.GetData() + password.GetSize()); if (service != nullptr) { diff --git a/frameworks/innerkitsimpl/kvdb/src/store_result_set.cpp b/frameworks/innerkitsimpl/kvdb/src/store_result_set.cpp index 105e3d80a..7d1227d20 100644 --- a/frameworks/innerkitsimpl/kvdb/src/store_result_set.cpp +++ b/frameworks/innerkitsimpl/kvdb/src/store_result_set.cpp @@ -15,12 +15,11 @@ #define LOG_TAG "StoreResultSet" #include "store_result_set.h" -#include "dds_trace.h" #include "log_print.h" #include "store_util.h" namespace OHOS::DistributedKv { -StoreResultSet::StoreResultSet(DBResultSet *impl, std::shared_ptr dbStore) - : impl_(impl), dbStore_(std::move(dbStore)) +StoreResultSet::StoreResultSet(DBResultSet *impl, std::shared_ptr dbStore, Convert convert) + : impl_(impl), dbStore_(std::move(dbStore)), convert_(std::move(convert)) { } @@ -170,14 +169,15 @@ Status StoreResultSet::GetEntry(Entry &entry) const return ALREADY_CLOSED; } - DistributedDB::Entry dbEntry; + DBEntry dbEntry; auto dbStatus = impl_->GetEntry(dbEntry); auto status = StoreUtil::ConvertStatus(dbStatus); if (status != SUCCESS) { ZLOGE("failed! status:%{public}d, position:%{public}d", status, impl_->GetPosition()); return status; } - entry.key = ConvertKey(std::move(dbEntry.key)); + std::string deviceId; + entry.key = convert_ ? convert_(std::move(dbEntry.key), deviceId) : Key(std::move(dbEntry.key)); entry.value = std::move(dbEntry.value); return SUCCESS; } @@ -196,9 +196,4 @@ Status StoreResultSet::Close() } return status; } - -Key StoreResultSet::ConvertKey(DistributedDB::Key &&key) const -{ - return Key(std::move(key)); -} } // namespace OHOS::DistributedKv \ No newline at end of file diff --git a/frameworks/innerkitsimpl/kvdb/test/single_store_impl_test.cpp b/frameworks/innerkitsimpl/kvdb/test/single_store_impl_test.cpp index d95b57f26..83c413322 100644 --- a/frameworks/innerkitsimpl/kvdb/test/single_store_impl_test.cpp +++ b/frameworks/innerkitsimpl/kvdb/test/single_store_impl_test.cpp @@ -573,4 +573,48 @@ HWTEST_F(SingleStoreImplTest, GetSecurityLevel, TestSize.Level0) auto status = kvStore_->GetSecurityLevel(securityLevel); ASSERT_EQ(status, SUCCESS); ASSERT_EQ(securityLevel, S1); +} + +/** +* @tc.name: RegisterSyncCallback +* @tc.desc: register the data sync callback +* @tc.type: FUNC +* @tc.require: I4XVQQ +* @tc.author: Sven Wang +*/ +HWTEST_F(SingleStoreImplTest, RegisterSyncCallback, TestSize.Level0) +{ + ASSERT_NE(kvStore_, nullptr); + class TestSyncCallback : public KvStoreSyncCallback { + public: + void SyncCompleted(const map &results) override + { + } + }; + auto callback = std::make_shared(); + auto status = kvStore_->RegisterSyncCallback(callback); + ASSERT_EQ(status, SUCCESS); +} + +/** +* @tc.name: UnRegisterSyncCallback +* @tc.desc: unregister the data sync callback +* @tc.type: FUNC +* @tc.require: I4XVQQ +* @tc.author: Sven Wang +*/ +HWTEST_F(SingleStoreImplTest, UnRegisterSyncCallback, TestSize.Level0) +{ + ASSERT_NE(kvStore_, nullptr); + class TestSyncCallback : public KvStoreSyncCallback { + public: + void SyncCompleted(const map &results) override + { + } + }; + auto callback = std::make_shared(); + auto status = kvStore_->RegisterSyncCallback(callback); + ASSERT_EQ(status, SUCCESS); + status = kvStore_->UnRegisterSyncCallback(); + ASSERT_EQ(status, SUCCESS); } \ No newline at end of file diff --git a/interfaces/innerkits/distributeddata/BUILD.gn b/interfaces/innerkits/distributeddata/BUILD.gn index ee5600f8f..ce959ec90 100644 --- a/interfaces/innerkits/distributeddata/BUILD.gn +++ b/interfaces/innerkits/distributeddata/BUILD.gn @@ -91,6 +91,7 @@ ohos_shared_library("distributeddata_inner") { kvdb_sources = [ "../../../frameworks/innerkitsimpl/kvdb/src/dev_manager.cpp", + "../../../frameworks/innerkitsimpl/kvdb/src/device_store_impl.cpp", "../../../frameworks/innerkitsimpl/kvdb/src/kvdb_service_client.cpp", "../../../frameworks/innerkitsimpl/kvdb/src/observer_bridge.cpp", "../../../frameworks/innerkitsimpl/kvdb/src/security_manager.cpp", diff --git a/interfaces/innerkits/distributeddata/include/data_query.h b/interfaces/innerkits/distributeddata/include/data_query.h index 070479f59..dcaed066d 100644 --- a/interfaces/innerkits/distributeddata/include/data_query.h +++ b/interfaces/innerkits/distributeddata/include/data_query.h @@ -418,6 +418,7 @@ private: friend class QueryHelper; friend class SingleStoreImpl; + friend class DeviceStoreImpl; // equal to static const char * const EQUAL_TO; -- Gitee