From f6927ec550ee2839d607ae8258af8ce32eba4e42 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Tue, 29 Oct 2024 08:51:20 +0000 Subject: [PATCH] =?UTF-8?q?1.=E9=87=8D=E6=9E=84node=20join=E9=80=BB?= =?UTF-8?q?=E8=BE=912.node=20join=E5=90=8E=E5=90=88=E5=B9=B6=E4=B8=A4?= =?UTF-8?q?=E4=B8=AA=E8=8A=82=E7=82=B9=E7=9A=84=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 51 ++++++++- bin/nanocl/src/commands/generic.rs | 34 ++++-- bin/nanocld/Cargo.toml | 2 + bin/nanocld/src/services/crud.rs | 128 +++++++++++++++++++++++ bin/nanocld/src/services/mod.rs | 4 +- bin/nanocld/src/services/node.rs | 48 ++++----- crates/nanocld_client/src/http_client.rs | 31 ++++-- 7 files changed, 256 insertions(+), 42 deletions(-) create mode 100644 bin/nanocld/src/services/crud.rs diff --git a/Cargo.lock b/Cargo.lock index 321542f7..af40b4ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1102,6 +1102,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fancy-regex" version = "0.13.0" @@ -1455,6 +1467,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash 0.7.8", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1467,6 +1488,15 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown 0.11.2", +] + [[package]] name = "heck" version = "0.4.1" @@ -1976,9 +2006,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.30.1" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14" dependencies = [ "pkg-config", "vcpkg", @@ -2300,6 +2330,7 @@ dependencies = [ "env_logger", "futures", "futures-util", + "gethostname", "http 1.1.0", "ipnet", "jsonschema", @@ -2318,6 +2349,7 @@ dependencies = [ "nvml-wrapper", "openssl", "rand", + "rusqlite", "serde", "serde_json", "serde_urlencoded", @@ -3525,6 +3557,21 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88f8660c1ff60292143c98d08fc6e2f654d722db50410e3f3797d40baaf9d8f3" +[[package]] +name = "rusqlite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85127183a999f7db96d1a976a309eebbfb6ea3b0b400ddd8340190129de6eb7a" +dependencies = [ + "bitflags 1.3.2", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "memchr", + "smallvec", +] + [[package]] name = "rust-embed" version = "8.5.0" diff --git a/bin/nanocl/src/commands/generic.rs b/bin/nanocl/src/commands/generic.rs index ea8abbfd..5fad10f2 100644 --- a/bin/nanocl/src/commands/generic.rs +++ b/bin/nanocl/src/commands/generic.rs @@ -1,11 +1,10 @@ use clap::Args; use ntex::http::StatusCode; use std::fs::File; -use std::io::Write; +use std::io::{Read, Write}; use dirs::home_dir; use serde_json::json; use nanocld_client::stubs::node::Node; -use std::fs; use nanocl_error::{ http_client::HttpClientError, @@ -20,8 +19,6 @@ use nanocld_client::{ }; use serde::{de::DeserializeOwned, Serialize}; use gethostname::gethostname; -use ntex::http::client::ClientResponse; -use url::quirks::hostname; use crate::{ config::CliConfig, models::{ @@ -319,14 +316,27 @@ pub trait GenericCommandJoin: GenericCommand { opts: &GenericJoinOpts, ) -> IoResult<()> { - let mut local_client = client.clone(); - if local_client.unix_socket != None { + if local_client.unix_socket == None { local_client.url = "http://localhost".to_owned(); local_client.unix_socket = Some(String::from("/run/nanocl/nanocl.sock")); } + let mut path_res = match local_client.send_get( + &format!("/database/get_path"), + None::<()> + ).await { + Ok(res) => res, + Err(_) => { + println!("Get file path of database error!"); + return Ok(()) + } + }; + + let database_path: String = path_res.json().await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + let hostname = gethostname().into_string().unwrap_or_else(|_| "Unknown".to_string()); + let mut res = match local_client .send_get( &format!("/{}/get_by_name?node_name={}", Self::object_name(), hostname), @@ -355,11 +365,11 @@ pub trait GenericCommandJoin: GenericCommand { let mut master_client = client.clone(); master_client.unix_socket = None; master_client.url = format!("http://{}:8585",opts.master_ip); - let res = master_client.send_post( + let res = master_client.send_get( &format!("/{}/join", Self::object_name()), - Some(node_info.clone()), None::<()> ).await?; + if res.status().is_success() { client.send_post( &format!("/{}/update_by_name", Self::object_name()), @@ -399,6 +409,14 @@ pub trait GenericCommandJoin: GenericCommand { // 将 JSON 转换为字符串并写入文件 let host_content_str = serde_json::to_string_pretty(&host_content)?; host_file.write_all(host_content_str.as_bytes())?; + + let mut database_file = File::open(database_path).expect("Failed to open file"); + let mut buffer = Vec::new(); + database_file.read_to_end(&mut buffer).expect("Failed to read file"); + master_client.send_post_file( + &"/database/insert_whole_database".to_string(), + Some(buffer), + ).await?; } Ok(()) } diff --git a/bin/nanocld/Cargo.toml b/bin/nanocld/Cargo.toml index 68404498..8ab82897 100644 --- a/bin/nanocld/Cargo.toml +++ b/bin/nanocld/Cargo.toml @@ -38,6 +38,8 @@ nanocl_stubs = { version = "0.15", features = ["clap"] } serde_yaml = "0.9" [dependencies] +rusqlite = "0.27" +gethostname = "0.2" nvml-wrapper = "0.10.0" sysinfo = "0.27" nanocl_error = { version = "0.4", features = [ diff --git a/bin/nanocld/src/services/crud.rs b/bin/nanocld/src/services/crud.rs new file mode 100644 index 00000000..b5c6b321 --- /dev/null +++ b/bin/nanocld/src/services/crud.rs @@ -0,0 +1,128 @@ +use std::fs::File; +use std::fs; +use ntex::{ + web, +}; +use std::path::Path; +use nanocl_error::http::HttpResult; +use futures::StreamExt; +use std::io::Write; +use rusqlite::Connection; +use crate::{ + models::{SystemState}, + +}; + +pub async fn merge_databases() -> HttpResult { + if let Err(_) = try_merge_databases() { + let s = "Can not merge database"; + return Ok(web::HttpResponse::InternalServerError().json(&s)); + } + Ok(web::HttpResponse::Ok().finish()) +} + +fn try_merge_databases() -> Result<(), Box> { + let conn = Connection::open("database.sqlite")?; + + // 附加外部数据库 + conn.execute("ATTACH DATABASE 'received_database.sqlite' AS db1", [])?; + + // 需要合并的表列表 + let tables = [ + "jobs", "secrets", "specs", "vm_images", "resources", "vms", "metrics", + "cargoes", "events", "resource_kinds", "namespaces", "node_group_links", + "node_groups", "nodes", "object_process_statuses", "processes" + ]; + + // 循环执行 INSERT OR IGNORE 操作 + for table in &tables { + let sql = format!("INSERT OR IGNORE INTO main.{} SELECT * FROM db1.{}", table, table); + conn.execute(&sql, [])?; + } + + // 分离附加的数据库 + conn.execute("DETACH DATABASE db1", [])?; + + // 删除已合并的数据库文件 + fs::remove_file("received_database.sqlite")?; + Ok(()) +} + +/// Insert whole database to another database +#[cfg_attr(feature = "dev", utoipa::path( + post, + tag = "Database", + path = "/database/insert_whole_database", + request_body = Database, + responses( + (status = 200, description = "Database insert successfully"), + (status = 500, description = "Failed to insert database") + ) +))] +#[web::post("/database/insert_whole_database")] +pub async fn insert_whole_database( + state: web::types::State, + mut payload: web::types::Payload +) -> HttpResult { + // 定义保存文件的路径 + let file_path = Path::new("received_database.sqlite"); + + // 创建文件 + let mut file = File::create(&file_path).expect("Failed to create file"); + + // 从请求体流中逐步接收文件内容并写入文件 + while let Some(chunk) = payload.next().await { + let data = chunk.expect("Failed to read chunk"); + file.write_all(&data).expect("Failed to write to file"); + } + + merge_databases().await +} + +/// get the file path of database +#[cfg_attr(feature = "dev", utoipa::path( + get, + tag = "Database", + path = "/database/get_path", + request_body = None, + responses( + (status = 200, description = "Get the file path of database successfully"), + (status = 500, description = "Failed to get the file path of database") + ) +))] +#[web::get("/database/get_path")] +pub async fn get_path( + state: web::types::State, +) -> HttpResult { + let store_addr = state.inner.config.store_addr.clone(); + match store_addr { + Some(addr) => { + // 尝试将路径转换为绝对路径 + let absolute_path = Path::new(&addr).canonicalize(); + match absolute_path { + Ok(path) => { + Ok(web::HttpResponse::Ok().json(&path.display().to_string())) + } + Err(e) => { + // 处理无法转换为绝对路径的错误 + println!("Failed to get absolute path: {}", e); + let s = "Failed to get absolute path"; + Ok(web::HttpResponse::InternalServerError().json(&s)) + } + } + } + None => { + let s = "Can not find the file path of database"; + Ok(web::HttpResponse::InternalServerError().json(&s)) + } + } +} + + + + +pub fn ntex_config(config: &mut web::ServiceConfig) { + config.service(insert_whole_database); + config.service(get_path); +} + diff --git a/bin/nanocld/src/services/mod.rs b/bin/nanocld/src/services/mod.rs index 56f18251..6205a9b8 100644 --- a/bin/nanocld/src/services/mod.rs +++ b/bin/nanocld/src/services/mod.rs @@ -19,6 +19,7 @@ mod secret; mod system; mod vm; mod vm_image; +mod crud; pub async fn unhandled() -> HttpResult { Err(HttpError::not_found("Route or method unhandled")) @@ -63,7 +64,8 @@ pub fn ntex_config(config: &mut web::ServiceConfig) { .configure(process::ntex_config) .configure(job::ntex_config) .configure(event::ntex_config) - .configure(resource_kind::ntex_config), + .configure(resource_kind::ntex_config) + .configure(crud::ntex_config), ); } diff --git a/bin/nanocld/src/services/node.rs b/bin/nanocld/src/services/node.rs index 64fef185..c425276a 100644 --- a/bin/nanocld/src/services/node.rs +++ b/bin/nanocld/src/services/node.rs @@ -1,5 +1,5 @@ use std::{cell::RefCell, fs, rc::Rc, time::Instant}; -use std::io::ErrorKind; +use gethostname::gethostname; use futures::future::ready; use crate::schema::nodes::dsl::*; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; @@ -67,7 +67,8 @@ pub async fn get_node_by_name( let node = NodeDb::read_by_pk(node_name, &state.inner.pool).await?; Ok(web::HttpResponse::Ok().json(&node)) } else { - Ok(web::HttpResponse::BadRequest().body("Missing node_name in request body")) + let s = "Missing node_name in request body"; + Ok(web::HttpResponse::InternalServerError().json(&s)) } } @@ -91,7 +92,10 @@ pub async fn delete_conf() -> HttpResult { // 尝试删除文件 match fs::remove_file(conf_file_path) { Ok(_) => Ok(web::HttpResponse::Ok().finish()), - Err(_) => Ok(web::HttpResponse::BadRequest().body("Delete node conf error!")), + Err(_) => { + let s = "Delete node conf error!"; + Ok(web::HttpResponse::InternalServerError().json(&s)) + }, } } @@ -117,7 +121,8 @@ pub async fn delete_node_by_name( NodeDb::del_by_pk(node_name, &state.inner.pool).await?; Ok(web::HttpResponse::Ok().finish()) } else { - Ok(web::HttpResponse::BadRequest().body("Missing node_name in request body")) + let s = "Missing node_name in request body"; + Ok(web::HttpResponse::InternalServerError().json(&s)) } } @@ -164,41 +169,34 @@ pub async fn update_node_by_name( Ok(web::HttpResponse::Ok().finish()) } Err(e) => { - Ok(web::HttpResponse::InternalServerError().finish()) + let s = "Fail to update node by name!"; + Ok(web::HttpResponse::InternalServerError().json(&s)) } } } /// Join a node #[cfg_attr(feature = "dev", utoipa::path( - post, + get, tag = "Nodes", path = "/nodes/join", - request_body = Node, + request_body = None, responses( - (status = 200, description = "Node added successfully"), - (status = 500, description = "Failed to add node") + (status = 200, description = "Node can join"), + (status = 500, description = "Node can not join") ) ))] -#[web::post("/nodes/join")] +#[web::get("/nodes/join")] pub async fn join_node( state: web::types::State, - new_node: web::types::Json ) -> HttpResult { - match NodeDb::create_if_not_exists(&new_node, &state.inner.pool).await { - Ok(_) => { - // 返回成功信息 - Ok(web::HttpResponse::Ok().finish()) - } - Err(e) => { - // 处理数据库插入错误,返回错误响应 - if e.inner.kind() == ErrorKind::AlreadyExists { - let _ = NodeDb::update_if_exist(&new_node, &state.inner.pool).await?; - Ok(web::HttpResponse::Ok().finish()) - } else{ - Ok(web::HttpResponse::InternalServerError().finish()) - } - } + let hostname = gethostname().into_string().unwrap_or_else(|_| "Unknown".to_string()); + let node = NodeDb::read_by_pk(&hostname, &state.inner.pool).await?; + if node.role == "master" { + Ok(web::HttpResponse::Ok().finish()) + } else { + let s = format!("node {} is a site!", node.endpoint); + Ok(web::HttpResponse::InternalServerError().json(&s)) } } diff --git a/crates/nanocld_client/src/http_client.rs b/crates/nanocld_client/src/http_client.rs index a23045af..a964fd12 100644 --- a/crates/nanocld_client/src/http_client.rs +++ b/crates/nanocld_client/src/http_client.rs @@ -264,7 +264,6 @@ impl NanocldClient { })?; Ok(query) } - pub async fn send_post( &self, url: &str, @@ -272,19 +271,39 @@ impl NanocldClient { query: Option, ) -> Result where - B: serde::Serialize, - Q: serde::Serialize, + B: serde::Serialize, + Q: serde::Serialize, { let mut req = self.post(url)?; if let Some(query) = query { req = req - .query(&query) - .map_err(|err| err.map_err_context(|| "Query"))?; + .query(&query) + .map_err(|err| err.map_err_context(|| "Query"))?; } let mut res = match body { None => req.send().await.map_err(|err| self.send_error(err))?, Some(body) => req - .send_json(&body) + .send_json(&body) + .await + .map_err(|err| self.send_error(err))?, + }; + let status = res.status(); + is_api_error(&mut res, &status).await?; + Ok(res) + } + + pub async fn send_post_file( + &self, + url: &str, + body: Option>, + ) -> Result + { + let mut req = self.post(url)?; + let mut res = match body { + None => req.send().await.map_err(|err| self.send_error(err))?, + Some(body) => req + .header("Content-Type", "application/octet-stream") + .send_body(Bytes::from(body)) .await .map_err(|err| self.send_error(err))?, }; -- Gitee