From b3767d62714e332456f00005414e7b5260693352 Mon Sep 17 00:00:00 2001 From: jiangliuwei Date: Mon, 2 Dec 2024 17:48:24 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E5=AE=B9=E5=99=A8=E9=9D=99=E6=80=81IP=E7=9A=84topic=E7=9A=84?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 2 + bin/nanocld/Cargo.toml | 4 +- bin/nanocld/src/eventbus/create.rs | 8 +- bin/nanocld/src/eventbus/get.rs | 116 ++++++++++++++++++ bin/nanocld/src/eventbus/mod.rs | 1 + .../src/eventbus/models/apiserver_cargo.rs | 2 - bin/nanocld/src/eventbus/models/get_ip.rs | 13 ++ bin/nanocld/src/eventbus/models/mod.rs | 3 +- bin/nanocld/src/models/ip.rs | 54 ++++++++ bin/nanocld/src/models/mod.rs | 2 + bin/nanocld/src/utils/container/process.rs | 33 ++++- 11 files changed, 226 insertions(+), 12 deletions(-) create mode 100644 bin/nanocld/src/eventbus/get.rs create mode 100644 bin/nanocld/src/eventbus/models/get_ip.rs create mode 100644 bin/nanocld/src/models/ip.rs diff --git a/Cargo.lock b/Cargo.lock index 60880d49..658cba79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3033,6 +3033,7 @@ dependencies = [ name = "nanocld" version = "0.15.0" dependencies = [ + "anyhow", "bollard-next", "chrono", "clap", @@ -3050,6 +3051,7 @@ dependencies = [ "http 1.1.0", "ipnet", "jsonschema 0.18.3", + "lazy_static", "libc", "log", "metrsd_client", diff --git a/bin/nanocld/Cargo.toml b/bin/nanocld/Cargo.toml index 4e559e67..65ab002c 100644 --- a/bin/nanocld/Cargo.toml +++ b/bin/nanocld/Cargo.toml @@ -103,4 +103,6 @@ http = "1.1.0" dirs = "5.0.1" cri-api = "0.1.4" feventbus = "0.3.0" -fleet_apiserver = "0.3.5" \ No newline at end of file +fleet_apiserver = "0.3.5" +lazy_static = "1.5.0" +anyhow = "1.0.89" \ No newline at end of file diff --git a/bin/nanocld/src/eventbus/create.rs b/bin/nanocld/src/eventbus/create.rs index 6e80db14..d315706e 100644 --- a/bin/nanocld/src/eventbus/create.rs +++ b/bin/nanocld/src/eventbus/create.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use serde_json::Value; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use crate::schema::vm_images::format; fn log_and_return_error( message: &str, @@ -171,13 +172,13 @@ async fn handle_create_cargo_from_apiserver( .and_then(|m| serde_json::from_str(m).ok()); let updated_metadata = insert_nanocl_group(&cargo_meta, ""); cargo_partial.metadata = Some(updated_metadata.to_string()); - + let obj = CargoObjCreateIn { namespace: namespace.clone(), spec: cargo_partial.clone(), version: version.clone(), }; - + match CargoDb::create_obj(&obj, &system_state).await { Ok(cargo) => { let system_pro = Arc::clone(&system_state); @@ -190,8 +191,7 @@ async fn handle_create_cargo_from_apiserver( &kind_key, &ProcessKind::Cargo, &system_pro, - ) - .await + ).await { Ok(_) => { log::info!("cargo create successful: {:?}", cargo.spec.name); diff --git a/bin/nanocld/src/eventbus/get.rs b/bin/nanocld/src/eventbus/get.rs new file mode 100644 index 00000000..8c34d22c --- /dev/null +++ b/bin/nanocld/src/eventbus/get.rs @@ -0,0 +1,116 @@ +use crate::eventbus::eventbus; +use crate::eventbus::models::apiserver_cargo::apiservercargo; +use crate::eventbus::models::get_ip::{IPReq, IpRsp}; +use crate::models::{CargoDb, CargoObjCreateIn, SystemState}; +use crate::objects::generic::ObjCreate; +use crate::utils; +use diesel::serialize::IsNull::No; +use feventbus::impls::nats::nats::NatsCli; +use feventbus::message::Message; +use feventbus::traits::consumer::MessageHandler; +use feventbus::traits::controller::EventBus; +use nanocl_stubs::cargo::Cargo; +use nanocl_stubs::cargo_spec::CargoSpecPartial; +use nanocl_stubs::cargo_spec::ReplicationMode::Auto; +use nanocl_stubs::cri_spec::ContainerMetadataPartial; +use nanocl_stubs::process::ProcessKind; +use nanocl_stubs::statefile::Statefile; +use std::collections::HashMap; +use std::error::Error; +use std::fmt::Debug; +use std::sync::Arc; +use serde_json::Value; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; +use crate::schema::vm_images::format; + +fn log_and_return_error( + message: &str, + error: E, +) -> Result<(), Box> +where + E: Error + Debug + Send + Sync + 'static, +{ + log::error!("{}", message); + Err(Box::new(error)) +} + +// The core logic for creating cargo +async fn handle_create_cargo_from_apiserver( + msg: IPReq, +) -> Result> { + let mut ip_rsp = IpRsp{ + ipv4: "".to_string(), + ipv6: "".to_string(), + }; + if let Some(hm) =crate::models::ip::get_module_values(msg.ip_content.as_str()){ + hm.get("iscas.net/static-ipv4").map(|value| { + ip_rsp.ipv4 = value.clone(); + }); + + hm.get("iscas.net/static-ipv6").map(|value| { + ip_rsp.ipv6 = value.clone(); + }); + }; + match serde_json::to_string(&ip_rsp) { + Ok(json_str) => Ok(json_str), + Err(err) => { + Err(Box::new(err)) + } + } +} + + +// The core logic for creating cargo +pub async fn setup_reply_to_get_resource_topic( + topic: String, +) where + T: Send + + Sync + + serde::de::DeserializeOwned + + serde::Serialize + + Debug + + Clone + + 'static, +{ + let reply_handler: MessageHandler = + Arc::new(move |msg: Message| { + Box::pin(async move { + + + let body = msg.body.ok_or_else(|| { + feventbus::err::Error::MessageHandling( + "Message body is missing".to_string(), + ) + })?; + + if let Ok(ip_req) = + serde_json::from_value::(body.clone()) + { + handle_create_cargo_from_apiserver(ip_req) + .await + .map_err(|e| { + log::error!("Cargo created failed: {:?}", e); + feventbus::err::Error::MessageHandling(format!( + "Cargo created failed: {:?}", + e + )) + }) + }else { + return Err(feventbus::err::Error::MessageHandling( + "create body json unmarshl failed".to_string(), + )); + } + }) + }); + + let nats_cli = Arc::new(NatsCli::new().await.unwrap()); + match eventbus::reply_to_topic(topic.clone(), nats_cli, reply_handler).await { + Ok(_) => { + log::info!("Subscribed to topic: {}", topic); + } + Err(e) => { + log::error!("Failed to subscribe to topic: {}. Error: {:?}", topic, e); + } + } +} diff --git a/bin/nanocld/src/eventbus/mod.rs b/bin/nanocld/src/eventbus/mod.rs index f1d8eabe..b2e4d07d 100644 --- a/bin/nanocld/src/eventbus/mod.rs +++ b/bin/nanocld/src/eventbus/mod.rs @@ -2,3 +2,4 @@ pub mod eventbus; pub mod create; mod delete; mod models; +mod get; diff --git a/bin/nanocld/src/eventbus/models/apiserver_cargo.rs b/bin/nanocld/src/eventbus/models/apiserver_cargo.rs index e2fbb875..a0e56674 100644 --- a/bin/nanocld/src/eventbus/models/apiserver_cargo.rs +++ b/bin/nanocld/src/eventbus/models/apiserver_cargo.rs @@ -3,8 +3,6 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; - - #[derive(Debug, Serialize, Deserialize, Clone)] pub struct apiservercargo { pub content: apiservercargomes, diff --git a/bin/nanocld/src/eventbus/models/get_ip.rs b/bin/nanocld/src/eventbus/models/get_ip.rs new file mode 100644 index 00000000..5d822bb9 --- /dev/null +++ b/bin/nanocld/src/eventbus/models/get_ip.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct IPReq { + pub ip_content: String, +} + + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct IpRsp { + pub ipv4: String, + pub ipv6: String, +} diff --git a/bin/nanocld/src/eventbus/models/mod.rs b/bin/nanocld/src/eventbus/models/mod.rs index 2c860455..e60a9aa7 100644 --- a/bin/nanocld/src/eventbus/models/mod.rs +++ b/bin/nanocld/src/eventbus/models/mod.rs @@ -1,2 +1,3 @@ pub mod apiserver_cargo; -pub mod del_apiserver; \ No newline at end of file +pub mod del_apiserver; +pub(crate) mod get_ip; \ No newline at end of file diff --git a/bin/nanocld/src/models/ip.rs b/bin/nanocld/src/models/ip.rs new file mode 100644 index 00000000..1a87dae4 --- /dev/null +++ b/bin/nanocld/src/models/ip.rs @@ -0,0 +1,54 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use lazy_static::lazy_static; + +type ConfigMap = HashMap>; + +lazy_static! { + static ref CONFIG: Arc> = Arc::new(RwLock::new(HashMap::new())); +} + +/// 设置或更新配置项 +pub fn set_config(module: &str, key: &str, value: &str) { + let mut config = CONFIG.write().unwrap(); + let module_config = config.entry(module.to_string()).or_insert_with(HashMap::new); + module_config.insert(key.to_string(), value.to_string()); +} + +/// 获取配置项 +pub fn get_config(module: &str, key: &str) -> Option { + let config = CONFIG.read().unwrap(); + config.get(module).and_then(|m| m.get(key)).cloned() +} + +/// 获取指定模块的所有值 +pub fn get_module_values(module: &str) -> Option> { + let config = CONFIG.read().unwrap(); + config.get(module).cloned() +} + +/// 删除指定模块的所有值 +pub fn remove_module(module: &str) -> Option> { + let mut config = CONFIG.write().unwrap(); + config.remove(module) +} + +/// 删除配置项 +pub fn remove_config(module: &str, key: &str) -> Option { + let mut config = CONFIG.write().unwrap(); + if let Some(module_config) = config.get_mut(module) { + let value = module_config.remove(key); + if module_config.is_empty() { + config.remove(module); // 如果模块配置为空,则删除模块 + } + value + } else { + None + } +} + +/// 获取所有配置项 +pub fn get_all_configs() -> ConfigMap { + let config = CONFIG.read().unwrap(); + config.clone() // 返回一个副本 +} \ No newline at end of file diff --git a/bin/nanocld/src/models/mod.rs b/bin/nanocld/src/models/mod.rs index 51c3b362..1be6a7d7 100644 --- a/bin/nanocld/src/models/mod.rs +++ b/bin/nanocld/src/models/mod.rs @@ -54,6 +54,8 @@ mod task_manager; pub use task_manager::*; mod object_process_status; +pub(crate) mod ip; + pub use object_process_status::*; pub type Pool = R2D2Pool>; diff --git a/bin/nanocld/src/utils/container/process.rs b/bin/nanocld/src/utils/container/process.rs index 9021cb92..508a21eb 100644 --- a/bin/nanocld/src/utils/container/process.rs +++ b/bin/nanocld/src/utils/container/process.rs @@ -12,6 +12,7 @@ use nanocl_error::{ use nanocl_stubs::cri_spec::CriConfig; use nanocl_stubs::process::{Process, ProcessKind, ProcessPartial}; +use crate::schema::vm_images::format; use crate::{ models::{ProcessDb, SystemState}, repositories::generic::*, @@ -41,6 +42,18 @@ pub async fn create( .create_pod_and_container_req(name.to_string(), item.clone()) .await; + let medata = run_pod_req.clone().config.unwrap().metadata.unwrap(); + let key = format!("{}-{}", medata.name, medata.namespace); + + let annotations = &container_config.annotations; + annotations.get("iscas.net/static-ipv4").map(|value| { + crate::models::ip::set_config(key.as_str(), "iscas.net/static-ipv4", value) + }); + + annotations.get("iscas.net/static-ipv6").map(|val| { + crate::models::ip::set_config(key.as_str(), "iscas.net/static-ipv6", val) + }); + // 第二步:创建container let create_con_rep = state .inner @@ -51,7 +64,10 @@ pub async fn create( Some(run_pod_req), ) .await - .map_err(|e| HttpError::internal_server_error(e.to_string()))?; + .map_err(|e| { + crate::models::ip::remove_module(key.as_str()); + HttpError::internal_server_error(e.to_string()) + })?; // 第三步:start contaienr state @@ -62,7 +78,10 @@ pub async fn create( container_id: create_con_rep.create_container_rep.container_id.clone(), })) .await - .map_err(|e| HttpError::internal_server_error(e.to_string()))?; + .map_err(|e| { + crate::models::ip::remove_module(key.as_str()); + HttpError::internal_server_error(e.to_string()) + })?; // 第三步:inspect sandbox let sandbox_inspect = state @@ -74,7 +93,10 @@ pub async fn create( verbose: true, })) .await - .map_err(|e| HttpError::internal_server_error(e.to_string()))?; + .map_err(|e| { + crate::models::ip::remove_module(key.as_str()); + HttpError::internal_server_error(e.to_string()) + })?; // 第四步:inspect container let contaienr_inspect = state @@ -86,7 +108,10 @@ pub async fn create( verbose: true, })) .await - .map_err(|e| HttpError::internal_server_error(e.to_string()))?; + .map_err(|e| { + crate::models::ip::remove_module(key.as_str()); + HttpError::internal_server_error(e.to_string()) + })?; let created_at = if let Some(status) = contaienr_inspect.clone().status { let created_at_str = status.created_at.to_string(); -- Gitee From e3e89e0093d55df42d313ef5f3d089a0e94e0f26 Mon Sep 17 00:00:00 2001 From: "songpenglei@cnu.edu.cn" <14342538+songpenley@user.noreply.gitee.com> Date: Tue, 3 Dec 2024 16:56:49 +0800 Subject: [PATCH 2/3] resolve json cannot deserialize --- bin/nanocld/src/eventbus/models/get_ip.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bin/nanocld/src/eventbus/models/get_ip.rs b/bin/nanocld/src/eventbus/models/get_ip.rs index 5d822bb9..40aabfc2 100644 --- a/bin/nanocld/src/eventbus/models/get_ip.rs +++ b/bin/nanocld/src/eventbus/models/get_ip.rs @@ -2,12 +2,15 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct IPReq { + #[serde(rename = "ip_content")] pub ip_content: String, } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct IpRsp { + #[serde(rename = "ipv4")] pub ipv4: String, + #[serde(rename = "ipv6")] pub ipv6: String, } -- Gitee From 192f3729a9011013b3e204bedb9c547a5992d5bc Mon Sep 17 00:00:00 2001 From: jiangliuwei Date: Wed, 4 Dec 2024 14:09:12 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=BA=E5=AE=B9?= =?UTF-8?q?=E5=99=A8=E8=AE=BE=E7=BD=AE=E9=9D=99=E6=80=81ip=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 8 +- bin/nanocld/Cargo.toml | 2 +- bin/nanocld/src/eventbus/eventbus.rs | 16 ++- bin/nanocld/src/eventbus/get.rs | 132 ++++++++++----------- bin/nanocld/src/utils/container/process.rs | 4 +- 5 files changed, 82 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 658cba79..e6c2b54c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1585,9 +1585,9 @@ dependencies = [ [[package]] name = "feventbus" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81b7cca7940f29e24592a89727028106825660bd8f9aae1d85283008fea92220" +checksum = "dbdc34ea392307524e52d79bf66e2afc0112ec557574bebd093b2a8763fed75a" dependencies = [ "async-nats", "async-trait", @@ -1645,7 +1645,7 @@ dependencies = [ "diesel_migrations", "dotenv", "env_logger", - "feventbus 0.3.0", + "feventbus 0.3.1", "jsonschema 0.23.0", "k8s-openapi", "lazy_static", @@ -3043,7 +3043,7 @@ dependencies = [ "diesel_migrations", "dirs", "env_logger", - "feventbus 0.3.0", + "feventbus 0.3.1", "fleet_apiserver", "futures", "futures-util", diff --git a/bin/nanocld/Cargo.toml b/bin/nanocld/Cargo.toml index 65ab002c..316b5134 100644 --- a/bin/nanocld/Cargo.toml +++ b/bin/nanocld/Cargo.toml @@ -102,7 +102,7 @@ serde_urlencoded = "0.7.1" http = "1.1.0" dirs = "5.0.1" cri-api = "0.1.4" -feventbus = "0.3.0" +feventbus = "0.3.1" fleet_apiserver = "0.3.5" lazy_static = "1.5.0" anyhow = "1.0.89" \ No newline at end of file diff --git a/bin/nanocld/src/eventbus/eventbus.rs b/bin/nanocld/src/eventbus/eventbus.rs index 0061a3c5..6edcade2 100644 --- a/bin/nanocld/src/eventbus/eventbus.rs +++ b/bin/nanocld/src/eventbus/eventbus.rs @@ -1,4 +1,4 @@ -use crate::eventbus::{create, delete}; +use crate::eventbus::{create, delete, get}; use crate::models::{CargoDb, CargoObjCreateIn, SystemState}; use crate::objects::generic::ObjCreate; use feventbus::impls::nats::nats::NatsCli; @@ -19,7 +19,7 @@ pub fn analyze(state: SystemState) { rt::Arbiter::new().exec_fn(move || { rt::spawn(async move { loop { - if let Err(err) = handle_event_message(state.clone()).await { + if let Err(err) = handle_event_message(state.clone()).await { log::warn!("event::handle_event_message: {:?}", err); } ntex::time::sleep(time::Duration::from_secs(1)).await; @@ -31,7 +31,7 @@ pub async fn handle_event_message( system_state: SystemState, ) -> Result<(), Box> { let system_state = Arc::new(system_state); - + let create_task = tokio::spawn(create::setup_reply_to_create_resource_topic::< serde_json::Value, @@ -42,13 +42,21 @@ pub async fn handle_event_message( serde_json::Value, >("DELETE".to_string(), Arc::clone(&system_state))); + let get_task = tokio::spawn(get::setup_reply_to_get_resource_topic::< + serde_json::Value, + >("GET".to_string())); + let _ = delete_task .await .map_err(|e| Box::new(e) as Box)?; + let _ = create_task .await .map_err(|e| Box::new(e) as Box)?; - + + let _ = get_task + .await + .map_err(|e| Box::new(e) as Box)?; Ok(()) } diff --git a/bin/nanocld/src/eventbus/get.rs b/bin/nanocld/src/eventbus/get.rs index 8c34d22c..e655b3ef 100644 --- a/bin/nanocld/src/eventbus/get.rs +++ b/bin/nanocld/src/eventbus/get.rs @@ -3,6 +3,7 @@ use crate::eventbus::models::apiserver_cargo::apiservercargo; use crate::eventbus::models::get_ip::{IPReq, IpRsp}; use crate::models::{CargoDb, CargoObjCreateIn, SystemState}; use crate::objects::generic::ObjCreate; +use crate::schema::vm_images::format; use crate::utils; use diesel::serialize::IsNull::No; use feventbus::impls::nats::nats::NatsCli; @@ -15,57 +16,54 @@ use nanocl_stubs::cargo_spec::ReplicationMode::Auto; use nanocl_stubs::cri_spec::ContainerMetadataPartial; use nanocl_stubs::process::ProcessKind; use nanocl_stubs::statefile::Statefile; +use serde_json::Value; use std::collections::HashMap; use std::error::Error; use std::fmt::Debug; use std::sync::Arc; -use serde_json::Value; use tokio::sync::oneshot; use tokio::task::JoinHandle; -use crate::schema::vm_images::format; fn log_and_return_error( - message: &str, - error: E, + message: &str, + error: E, ) -> Result<(), Box> where - E: Error + Debug + Send + Sync + 'static, + E: Error + Debug + Send + Sync + 'static, { - log::error!("{}", message); - Err(Box::new(error)) + log::error!("{}", message); + Err(Box::new(error)) } // The core logic for creating cargo -async fn handle_create_cargo_from_apiserver( - msg: IPReq, +async fn handle_get_ip( + msg: IPReq, ) -> Result> { - let mut ip_rsp = IpRsp{ - ipv4: "".to_string(), - ipv6: "".to_string(), - }; - if let Some(hm) =crate::models::ip::get_module_values(msg.ip_content.as_str()){ - hm.get("iscas.net/static-ipv4").map(|value| { - ip_rsp.ipv4 = value.clone(); - }); + let mut ip_rsp = IpRsp { + ipv4: "".to_string(), + ipv6: "".to_string(), + }; + if let Some(hm) = + crate::models::ip::get_module_values(msg.ip_content.as_str()) + { + hm.get("iscas.net/static-ipv4").map(|value| { + ip_rsp.ipv4 = value.clone(); + }); - hm.get("iscas.net/static-ipv6").map(|value| { - ip_rsp.ipv6 = value.clone(); - }); - }; - match serde_json::to_string(&ip_rsp) { - Ok(json_str) => Ok(json_str), - Err(err) => { - Err(Box::new(err)) - } - } + hm.get("iscas.net/static-ipv6").map(|value| { + ip_rsp.ipv6 = value.clone(); + }); + }; + match serde_json::to_string(&ip_rsp) { + Ok(json_str) => Ok(json_str), + Err(err) => Err(Box::new(err)), + } } - // The core logic for creating cargo -pub async fn setup_reply_to_get_resource_topic( - topic: String, -) where - T: Send +pub async fn setup_reply_to_get_resource_topic(topic: String) +where + T: Send + Sync + serde::de::DeserializeOwned + serde::Serialize @@ -73,44 +71,40 @@ pub async fn setup_reply_to_get_resource_topic( + Clone + 'static, { - let reply_handler: MessageHandler = - Arc::new(move |msg: Message| { - Box::pin(async move { - - - let body = msg.body.ok_or_else(|| { - feventbus::err::Error::MessageHandling( - "Message body is missing".to_string(), - ) - })?; - - if let Ok(ip_req) = - serde_json::from_value::(body.clone()) - { - handle_create_cargo_from_apiserver(ip_req) - .await - .map_err(|e| { - log::error!("Cargo created failed: {:?}", e); - feventbus::err::Error::MessageHandling(format!( - "Cargo created failed: {:?}", - e - )) - }) - }else { - return Err(feventbus::err::Error::MessageHandling( - "create body json unmarshl failed".to_string(), - )); - } - }) - }); + let reply_handler: MessageHandler = + Arc::new(move |msg: Message| { + Box::pin(async move { + let body = msg.body.ok_or_else(|| { + feventbus::err::Error::MessageHandling( + "Message body is missing".to_string(), + ) + })?; - let nats_cli = Arc::new(NatsCli::new().await.unwrap()); - match eventbus::reply_to_topic(topic.clone(), nats_cli, reply_handler).await { - Ok(_) => { - log::info!("Subscribed to topic: {}", topic); - } - Err(e) => { - log::error!("Failed to subscribe to topic: {}. Error: {:?}", topic, e); + let cm = crate::models::ip::get_all_configs(); + println!("--------{:#?}-----", cm); + if let Ok(ip_req) = serde_json::from_value::(body.clone()) { + handle_get_ip(ip_req).await.map_err(|e| { + log::error!("Cargo created failed: {:?}", e); + feventbus::err::Error::MessageHandling(format!( + "Cargo created failed: {:?}", + e + )) + }) + } else { + return Err(feventbus::err::Error::MessageHandling( + "create body json unmarshl failed".to_string(), + )); } + }) + }); + + let nats_cli = Arc::new(NatsCli::new().await.unwrap()); + match eventbus::reply_to_topic(topic.clone(), nats_cli, reply_handler).await { + Ok(_) => { + log::info!("Subscribed to topic: {}", topic); + } + Err(e) => { + log::error!("Failed to subscribe to topic: {}. Error: {:?}", topic, e); } + } } diff --git a/bin/nanocld/src/utils/container/process.rs b/bin/nanocld/src/utils/container/process.rs index 508a21eb..159b46fa 100644 --- a/bin/nanocld/src/utils/container/process.rs +++ b/bin/nanocld/src/utils/container/process.rs @@ -112,10 +112,10 @@ pub async fn create( crate::models::ip::remove_module(key.as_str()); HttpError::internal_server_error(e.to_string()) })?; - + crate::models::ip::remove_module(key.as_str()); let created_at = if let Some(status) = contaienr_inspect.clone().status { let created_at_str = status.created_at.to_string(); - log::info!("--------get create at {}--------", created_at_str.clone()); + // log::info!("--------get create at {}--------", created_at_str.clone()); let timestamp: u64 = created_at_str.parse().map_err(|err| { HttpError::internal_server_error(format!("Unable to parse date {err}")) })?; -- Gitee