diff --git a/bin/nanocld/src/main.rs b/bin/nanocld/src/main.rs index 62b1071480bc99302af5fcf1717d25e211a32950..bff2b2a2777331958afa452486a46bde2f140bcf 100644 --- a/bin/nanocld/src/main.rs +++ b/bin/nanocld/src/main.rs @@ -1,5 +1,5 @@ use clap::Parser; -use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; +use diesel::{update, ExpressionMethods, QueryDsl, RunQueryDsl}; use serde_json::json; use nanocl_error::io::FromIo; use nanocl_utils::logger; @@ -8,6 +8,7 @@ use crate::{ repositories::generic::* }; use std::time::Duration; +use chrono::NaiveDateTime; use sysinfo::{System, SystemExt, CpuExt}; use nvml_wrapper::Nvml; use ntex::rt; @@ -98,7 +99,7 @@ async fn start_monitoring_task(shutdown_rx: watch::Receiver, daemon_state: let nvml = match Nvml::init() { Ok(nvml) => Some(nvml), Err(e) => { - log::warn!("NVML 初始化失败,可能机器中没有 NVIDIA 显卡: {}", e); + log::warn!("NVML init error,maybe there is no NVIDIA GPU in this machine: {}", e); None // 没有 NVIDIA 显卡的情况下继续执行程序 } }; @@ -163,7 +164,7 @@ async fn start_monitoring_task(shutdown_rx: watch::Receiver, daemon_state: rt::spawn_blocking(move || { use crate::schema::nodes::dsl::*; let mut conn = utils::store::get_pool_conn(&db_pool).unwrap(); - diesel::update(nodes.find(hostname)) + update(nodes.find(hostname)) .set(( metadata.eq(meta_data), updated_at.eq(chrono::Utc::now().naive_utc().format("%Y-%m-%d %H:%M:%S").to_string()), @@ -209,6 +210,34 @@ async fn start_monitoring_task(shutdown_rx: watch::Receiver, daemon_state: } } println!("send heartbeat succeed!"); + } else { + use crate::schema::nodes::dsl::*; + let mut conn = utils::store::get_pool_conn(&db_pool).unwrap(); + + let now = chrono::Utc::now().naive_utc(); + + // 查询所有 role 为 "site" 的行 + let target_nodes = nodes + .filter(role.eq("site")) + .load::(&mut conn).unwrap(); + + for node in target_nodes { + // 将 updated_at 字段转换为 NaiveDateTime + if let Ok(updated_at_time) = NaiveDateTime::parse_from_str(&node.updated_at, "%Y-%m-%d %H:%M:%S") { + // 计算时间差 + let duration = now.signed_duration_since(updated_at_time); + + // 如果时间差超过两分钟 + if duration > chrono::Duration::minutes(2) { + // 更新 status 为 NotReady + update(nodes.find(node.name)) + .set(status.eq("NotReady")) + .execute(&mut conn).expect("heartbeat update node status error!"); + } + } else { + println!("format node updated_error when heartbeat try to update node status: {}", node.updated_at); + } + } } } Err(e) => { diff --git a/bin/nanocld/src/repositories/node.rs b/bin/nanocld/src/repositories/node.rs index b401204937a6fb22466c2ee96b1eda7e151521f5..9c5107bbae3f5d89e7ef4e040b29173c0f48891c 100644 --- a/bin/nanocld/src/repositories/node.rs +++ b/bin/nanocld/src/repositories/node.rs @@ -1,17 +1,13 @@ use std::{collections::HashMap, net::IpAddr}; - +use std::io::Error; +use clap::error::ErrorKind; use diesel::prelude::*; - +use diesel::update; use nanocl_error::io::{IoError, IoResult}; use nanocl_stubs::generic::GenericFilter; -use crate::{ - gen_sql_multiple, gen_sql_order_by, gen_sql_query, - models::{ColumnType, NodeDb, Pool, SystemState}, - schema::nodes, - vars, -}; +use crate::{gen_sql_multiple, gen_sql_order_by, gen_sql_query, models::{ColumnType, NodeDb, Pool, SystemState}, schema::nodes, utils, vars}; use super::generic::*; @@ -70,13 +66,42 @@ impl RepositoryCountBy for NodeDb { } impl NodeDb { + pub async fn update_if_exist( + node: &NodeDb, + pool: &Pool, + ) -> IoResult { + match NodeDb::read_by_pk(&node.name, pool).await { + Err(_) => Err(IoError::new("node doesn't exist", Error::new(std::io::ErrorKind::NotFound, "node doesn't exist"))), + Ok(_) => { + use crate::schema::nodes::dsl::*; + let mut conn = utils::store::get_pool_conn(&pool).unwrap(); + let new_node = node.clone(); + update(nodes.find(node.name.clone())) + .set(( + role.eq(new_node.role.clone()), + created_at.eq(new_node.created_at.clone()), + updated_at.eq(new_node.updated_at.clone()), + ip_address.eq(new_node.ip_address.clone()), + endpoint.eq(new_node.endpoint.clone()), + master_endpoint.eq(new_node.master_endpoint.clone()), + version.eq(new_node.version.clone()), + metadata.eq(new_node.metadata.clone()), + status.eq(new_node.status.clone()), + )) + .execute(&mut conn).expect("update NodeDb error by creat_if_node_exists"); + Ok(new_node) + }, + } + } pub async fn create_if_not_exists( node: &NodeDb, pool: &Pool, ) -> IoResult { match NodeDb::read_by_pk(&node.name, pool).await { Err(_) => NodeDb::create_from(node.clone(), pool).await, - Ok(node) => Ok(node), + Ok(_) => { + Err(IoError::new("node is exist", Error::new(std::io::ErrorKind::AlreadyExists, "node is exist"))) + }, } } @@ -103,7 +128,14 @@ impl NodeDb { metadata: None, status: "Ready".parse().unwrap(), }; - NodeDb::create_if_not_exists(&node, &state.inner.pool).await?; - Ok(()) + match NodeDb::create_if_not_exists(&node, &state.inner.pool).await { + Ok(_) => Ok(()), + Err(e) => { + if e.inner.kind() == std::io::ErrorKind::AlreadyExists { + NodeDb::update_if_exist(&node, &state.inner.pool).await?; + } + Ok(()) + } + } } } diff --git a/bin/nanocld/src/services/node.rs b/bin/nanocld/src/services/node.rs index 74787e12853c25c950cad404c448e84065ca3ceb..5aba6876b27604efa594ff195dd997c0406e4ee2 100644 --- a/bin/nanocld/src/services/node.rs +++ b/bin/nanocld/src/services/node.rs @@ -1,4 +1,5 @@ use std::{cell::RefCell, rc::Rc, time::Instant}; +use std::io::ErrorKind; use futures::future::ready; use crate::schema::nodes::dsl::*; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; @@ -138,9 +139,14 @@ pub async fn join_node( // 返回成功信息 Ok(web::HttpResponse::Ok().finish()) } - Err(_) => { + Err(e) => { // 处理数据库插入错误,返回错误响应 - Ok(web::HttpResponse::InternalServerError().finish()) + if e.inner.kind() == ErrorKind::AlreadyExists { + let _ = NodeDb::update_if_exist(&new_node, &state.inner.pool).await?; + Ok(web::HttpResponse::Ok().finish()) + } else{ + Ok(web::HttpResponse::InternalServerError().finish()) + } } } } diff --git a/crates/nanocl_error/src/io.rs b/crates/nanocl_error/src/io.rs index a14b7fc3ac9391d9a990b514f6a88637b67a0f39..7ef4a7c6bbe0f642d8d39b468027040b08cdea7a 100644 --- a/crates/nanocl_error/src/io.rs +++ b/crates/nanocl_error/src/io.rs @@ -18,7 +18,7 @@ impl Clone for IoError { impl IoError { pub fn new(context: T, inner: std::io::Error) -> Self where - T: Into, + T: Into, { Self { context: Some(context.into()), @@ -35,7 +35,7 @@ impl IoError { pub fn invalid_data(context: M, message: M) -> Self where - M: ToString + std::fmt::Display, + M: ToString + std::fmt::Display, { Self::new( context.to_string(), @@ -45,7 +45,7 @@ impl IoError { pub fn other(context: M, message: M) -> Self where - M: ToString + std::fmt::Display, + M: ToString + std::fmt::Display, { Self::new( context.to_string(), @@ -55,7 +55,7 @@ impl IoError { pub fn invalid_input(context: M, message: M) -> Self where - M: ToString + std::fmt::Display, + M: ToString + std::fmt::Display, { Self::new( context.to_string(), @@ -68,7 +68,7 @@ impl IoError { pub fn not_found(context: M, message: M) -> Self where - M: ToString + std::fmt::Display, + M: ToString + std::fmt::Display, { Self::new( context.to_string(), @@ -78,7 +78,7 @@ impl IoError { pub fn interrupted(context: M, message: M) -> Self where - M: ToString + std::fmt::Display, + M: ToString + std::fmt::Display, { Self::new( context.to_string(), @@ -185,13 +185,13 @@ pub type IoResult = Result; pub trait FromIo { fn map_err_context(self, context: impl FnOnce() -> C) -> T where - C: ToString + std::fmt::Display; + C: ToString + std::fmt::Display; } impl FromIo for TryLockError { fn map_err_context(self, context: impl FnOnce() -> C) -> IoError where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { IoError::interrupted((context)().to_string(), self.to_string()) } @@ -206,7 +206,7 @@ impl From> for IoError { impl FromIo for PoisonError { fn map_err_context(self, context: impl FnOnce() -> C) -> IoError where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { IoError::interrupted((context)().to_string(), self.to_string()) } @@ -221,7 +221,7 @@ impl From> for IoError { impl FromIo for IoError { fn map_err_context(self, context: impl FnOnce() -> C) -> IoError where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { IoError { context: Some((context)().to_string()), @@ -233,7 +233,7 @@ impl FromIo for IoError { impl FromIo> for std::io::Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -245,7 +245,7 @@ impl FromIo> for std::io::Error { impl FromIo> for std::string::FromUtf8Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -279,7 +279,7 @@ impl From for std::io::Error { impl FromIo> for serde_json::Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -299,7 +299,7 @@ impl From for IoError { impl FromIo> for serde_yaml::Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -312,7 +312,7 @@ impl FromIo> for serde_yaml::Error { impl FromIo> for serde_urlencoded::ser::Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -328,7 +328,7 @@ impl FromIo> for serde_urlencoded::ser::Error { impl FromIo> for bollard_next::errors::Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -351,7 +351,7 @@ impl From for IoError { impl FromIo> for diesel::result::Error { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { let inner = match self { diesel::result::Error::NotFound => { @@ -381,7 +381,7 @@ impl FromIo> for diesel::result::Error { #[cfg(feature = "ntex")] impl From> for IoError where - T: std::fmt::Debug, + T: std::fmt::Debug, { fn from(f: ntex::http::error::BlockingError) -> Self { match f { @@ -399,7 +399,7 @@ where impl FromIo> for ntex::http::client::error::SendRequestError { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { let inner = match self { ntex::http::client::error::SendRequestError::Timeout => { @@ -432,7 +432,7 @@ impl FromIo> for ntex::http::client::error::SendRequestError { impl FromIo> for ntex::http::client::error::JsonPayloadError { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -448,7 +448,7 @@ impl FromIo> for ntex::http::client::error::JsonPayloadError { impl FromIo> for ntex::http::error::PayloadError { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -464,7 +464,7 @@ impl FromIo> for ntex::http::error::PayloadError { impl FromIo> for ntex::ws::error::WsClientBuilderError { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -480,7 +480,7 @@ impl FromIo> for ntex::ws::error::WsClientBuilderError { impl FromIo> for ntex::ws::error::WsClientError { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()), @@ -496,7 +496,7 @@ impl FromIo> for ntex::ws::error::WsClientError { impl FromIo> for tokio::task::JoinError { fn map_err_context(self, context: impl FnOnce() -> C) -> Box where - C: ToString + std::fmt::Display, + C: ToString + std::fmt::Display, { Box::new(IoError { context: Some((context)().to_string()),