diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs index 16360ed..04c0d63 100644 --- a/adapter/cron/src/main.rs +++ b/adapter/cron/src/main.rs @@ -5,8 +5,8 @@ use base::traits::{IdentifiableFlow, LoadConfig, Server}; use chrono::{DateTime, Datelike, Timelike, Utc}; use cron::Schedule; use std::str::FromStr; -use tucana::shared::ValidationFlow; use tucana::shared::value::Kind; +use tucana::shared::{RuntimeFeature, Translation, ValidationFlow}; #[derive(Default)] struct Cron {} @@ -24,7 +24,19 @@ impl LoadConfig for CronConfig { async fn main() { let server = Cron::default(); let runner = ServerRunner::new(server).await.unwrap(); - runner.serve().await.unwrap(); + + let featues = vec![RuntimeFeature { + name: vec![Translation { + code: "en-US".to_string(), + content: "Cron Adapter".to_string(), + }], + description: vec![Translation { + code: "en-US".to_string(), + content: "A Cron-Adapter is a time-based scheduler that runs commands or scripts automatically at specified times or intervals.".to_string(), + }], + }]; + + runner.serve(featues, vec![]).await.unwrap(); } struct Time { diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index e19a985..5dcbcf2 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -21,9 +21,9 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpListener; use tonic::async_trait; -use tucana::shared::value::Kind::StructValue; -use tucana::shared::{ListValue, value::Kind}; +use tucana::shared::{AdapterConfiguration, RuntimeFeature, value::Kind}; use tucana::shared::{Struct, ValidationFlow, Value}; +use tucana::shared::{Translation, value::Kind::StructValue}; mod config; mod route; @@ -39,7 +39,30 @@ async fn main() { Err(err) => panic!("Failed to create server runner: {:?}", err), }; log::info!("Successfully created runner for http service"); - match runner.serve().await { + + let addr = runner.get_server_config().port; + let host = runner.get_server_config().host.clone(); + + let featues = vec![RuntimeFeature { + name: vec![Translation { + code: "en-US".to_string(), + content: "Rest Adapter".to_string(), + }], + description: vec![Translation { + code: "en-US".to_string(), + content: "A Rest-Adapter is a server that exposes resources through HTTP URLs (endpoints). Clients use methods like GET, POST, PUT, and DELETE to retrieve or modify data, typically exchanged as JSON.".to_string(), + }], + }]; + + let configs = vec![AdapterConfiguration { + data: Some(tucana::shared::adapter_configuration::Data::Endpoint( + format!( + r"{}:{}/${{project_slug}}/${{flow_setting_identifier}}", + host, addr + ), + )), + }]; + match runner.serve(featues, configs).await { Ok(_) => (), Err(err) => panic!("Failed to start server runner: {:?}", err), }; @@ -186,8 +209,8 @@ async fn execute_flow_to_hyper_response( // headers struct let Value { kind: - Some(Kind::ListValue(ListValue { - values: header_fields, + Some(Kind::StructValue(Struct { + fields: header_fields, })), } = headers_val else { @@ -199,46 +222,16 @@ async fn execute_flow_to_hyper_response( let http_headers: HashMap = header_fields .iter() - .filter_map(|x| { + .filter_map(|(k, v)| { if let Value { - kind: Some(StructValue(Struct { fields: f })), - } = x + kind: Some(Kind::StringValue(x)), + } = v { - Some(f) + Some((k.clone(), x.clone())) } else { None } }) - .filter_map(|f| { - let key = match f.get("key") { - Some(value) => { - if let Value { - kind: Some(Kind::StringValue(x)), - } = value - { - x - } else { - return None; - } - } - None => return None, - }; - let value = match f.get("value") { - Some(value) => { - if let Value { - kind: Some(Kind::StringValue(x)), - } = value - { - x - } else { - return None; - } - } - None => return None, - }; - - Some((key.clone(), value.clone())) - }) .collect(); // status_code number diff --git a/crates/base/src/client/mod.rs b/crates/base/src/client/mod.rs new file mode 100644 index 0000000..623f354 --- /dev/null +++ b/crates/base/src/client/mod.rs @@ -0,0 +1,130 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use tokio::time::sleep; +use tonic::transport::{Channel, Endpoint}; +use tucana::{ + aquila::{ + RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient, + runtime_status_update_request::Status, + }, + shared::{AdapterConfiguration, AdapterRuntimeStatus, RuntimeFeature}, +}; + +pub struct DracoRuntimeStatusService { + channel: Channel, + identifier: String, + features: Vec, + configs: Vec, +} + +const MAX_BACKOFF: u64 = 2000 * 60; +const MAX_RETRIES: i8 = 10; + +// Will create a channel and retry if its not possible +pub async fn create_channel_with_retry(channel_name: &str, url: String) -> Channel { + let mut backoff = 100; + let mut retries = 0; + + loop { + let channel = match Endpoint::from_shared(url.clone()) { + Ok(c) => { + log::debug!("Creating a new endpoint for the: {} Service", channel_name); + c.connect_timeout(std::time::Duration::from_secs(2)) + .timeout(std::time::Duration::from_secs(10)) + } + Err(err) => { + panic!( + "Cannot create Endpoint for Service: `{}`. Reason: {:?}", + channel_name, err + ); + } + }; + + match channel.connect().await { + Ok(ch) => { + return ch; + } + Err(err) => { + log::warn!( + "Retry connect to `{}` using url: `{}` failed: {:?}, retrying in {}ms", + channel_name, + url, + err, + backoff + ); + sleep(std::time::Duration::from_millis(backoff)).await; + + backoff = (backoff * 2).min(MAX_BACKOFF); + retries += 1; + + if retries >= MAX_RETRIES { + panic!("Reached max retries to url {}", url) + } + } + } + } +} +impl DracoRuntimeStatusService { + pub async fn from_url( + aquila_url: String, + identifier: String, + features: Vec, + configs: Vec, + ) -> Self { + let channel = create_channel_with_retry("Aquila", aquila_url).await; + Self::new(channel, identifier, features, configs) + } + + pub fn new( + channel: Channel, + identifier: String, + features: Vec, + configs: Vec, + ) -> Self { + DracoRuntimeStatusService { + channel, + identifier, + features, + configs, + } + } + + pub async fn update_runtime_status_by_status( + &self, + status: tucana::shared::adapter_runtime_status::Status, + ) { + log::info!("Updating the current Runtime Status!"); + let mut client = RuntimeStatusServiceClient::new(self.channel.clone()); + + let now = SystemTime::now(); + let timestamp = match now.duration_since(UNIX_EPOCH) { + Ok(time) => time.as_secs(), + Err(err) => { + log::error!("cannot get current system time: {:?}", err); + 0 + } + }; + + let request = RuntimeStatusUpdateRequest { + status: Some(Status::AdapterRuntimeStatus(AdapterRuntimeStatus { + status: status.into(), + timestamp: timestamp as i64, + identifier: self.identifier.clone(), + features: self.features.clone(), + configurations: self.configs.clone(), + })), + }; + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the RuntimeStatus accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update RuntimeStatus: {:?}", err); + } + } + } +} diff --git a/crates/base/src/lib.rs b/crates/base/src/lib.rs index 3840a1c..5b90a8c 100644 --- a/crates/base/src/lib.rs +++ b/crates/base/src/lib.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod config; pub mod runner; pub mod store; diff --git a/crates/base/src/runner.rs b/crates/base/src/runner.rs index dc6c67e..aa04a27 100644 --- a/crates/base/src/runner.rs +++ b/crates/base/src/runner.rs @@ -1,4 +1,5 @@ use crate::{ + client::DracoRuntimeStatusService, config::AdapterConfig, store::AdapterStore, traits::{LoadConfig, Server as AdapterServer}, @@ -8,6 +9,7 @@ use std::sync::Arc; use tokio::signal; use tonic::transport::Server; use tonic_health::pb::health_server::HealthServer; +use tucana::shared::{AdapterConfiguration, RuntimeFeature}; /// Context passed to adapter server implementations containing all shared resources pub struct ServerContext { @@ -23,6 +25,10 @@ pub struct ServerRunner { } impl ServerRunner { + pub fn get_server_config(&self) -> Arc { + self.context.server_config.clone() + } + pub async fn new>(server: S) -> anyhow::Result { env_logger::Builder::from_default_env() .filter_level(log::LevelFilter::Debug) @@ -50,11 +56,34 @@ impl ServerRunner { }) } - pub async fn serve(self) -> anyhow::Result<()> { + pub async fn serve( + self, + runtime_feature: Vec, + runtime_config: Vec, + ) -> anyhow::Result<()> { let config = self.context.adapter_config.clone(); + let mut runtime_status_service: Option = None; + log::info!("Starting Draco Variant: {}", config.draco_variant); if !config.is_static() { + runtime_status_service = Some( + DracoRuntimeStatusService::from_url( + config.aquila_url.clone(), + config.draco_variant.clone(), + runtime_feature, + runtime_config, + ) + .await, + ); + + if let Some(ser) = &runtime_status_service { + ser.update_runtime_status_by_status( + tucana::shared::adapter_runtime_status::Status::NotReady, + ) + .await; + }; + let definition_service = FlowUpdateService::from_url( config.aquila_url.clone(), config.definition_path.as_str(), @@ -95,6 +124,13 @@ impl ServerRunner { } = self; // Init the adapter server (e.g. create underlying HTTP server) server.init(&context).await?; + + if let Some(ser) = &runtime_status_service { + ser.update_runtime_status_by_status( + tucana::shared::adapter_runtime_status::Status::Running, + ) + .await; + }; log::info!("Draco successfully initialized."); #[cfg(unix)] @@ -158,6 +194,12 @@ impl ServerRunner { } } } + if let Some(ser) = &runtime_status_service { + ser.update_runtime_status_by_status( + tucana::shared::adapter_runtime_status::Status::Stopped, + ) + .await; + }; log::info!("Draco shutdown complete"); Ok(())