Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions adapter/cron/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use base::traits::{IdentifiableFlow, LoadConfig, Server};
use chrono::{DateTime, Datelike, Timelike, Utc};
use cron::Schedule;
use std::str::FromStr;
use tucana::shared::ValidationFlow;
use tucana::shared::value::Kind;
use tucana::shared::{RuntimeFeature, Translation, ValidationFlow};

#[derive(Default)]
struct Cron {}
Expand All @@ -24,7 +24,19 @@ impl LoadConfig for CronConfig {
async fn main() {
let server = Cron::default();
let runner = ServerRunner::new(server).await.unwrap();
runner.serve().await.unwrap();

let featues = vec![RuntimeFeature {
name: vec![Translation {
code: "en-US".to_string(),
content: "Cron Adapter".to_string(),
}],
description: vec![Translation {
code: "en-US".to_string(),
content: "A Cron-Adapter is a time-based scheduler that runs commands or scripts automatically at specified times or intervals.".to_string(),
}],
}];

runner.serve(featues, vec![]).await.unwrap();
}

struct Time {
Expand Down
71 changes: 32 additions & 39 deletions adapter/rest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tonic::async_trait;
use tucana::shared::value::Kind::StructValue;
use tucana::shared::{ListValue, value::Kind};
use tucana::shared::{AdapterConfiguration, RuntimeFeature, value::Kind};
use tucana::shared::{Struct, ValidationFlow, Value};
use tucana::shared::{Translation, value::Kind::StructValue};

mod config;
mod route;
Expand All @@ -39,7 +39,30 @@ async fn main() {
Err(err) => panic!("Failed to create server runner: {:?}", err),
};
log::info!("Successfully created runner for http service");
match runner.serve().await {

let addr = runner.get_server_config().port;
let host = runner.get_server_config().host.clone();

let featues = vec![RuntimeFeature {
name: vec![Translation {
code: "en-US".to_string(),
content: "Rest Adapter".to_string(),
}],
description: vec![Translation {
code: "en-US".to_string(),
content: "A Rest-Adapter is a server that exposes resources through HTTP URLs (endpoints). Clients use methods like GET, POST, PUT, and DELETE to retrieve or modify data, typically exchanged as JSON.".to_string(),
}],
}];

let configs = vec![AdapterConfiguration {
data: Some(tucana::shared::adapter_configuration::Data::Endpoint(
format!(
r"{}:{}/${{project_slug}}/${{flow_setting_identifier}}",
host, addr
),
)),
}];
match runner.serve(featues, configs).await {
Ok(_) => (),
Err(err) => panic!("Failed to start server runner: {:?}", err),
};
Expand Down Expand Up @@ -186,8 +209,8 @@ async fn execute_flow_to_hyper_response(
// headers struct
let Value {
kind:
Some(Kind::ListValue(ListValue {
values: header_fields,
Some(Kind::StructValue(Struct {
fields: header_fields,
})),
} = headers_val
else {
Expand All @@ -199,46 +222,16 @@ async fn execute_flow_to_hyper_response(

let http_headers: HashMap<String, String> = header_fields
.iter()
.filter_map(|x| {
.filter_map(|(k, v)| {
if let Value {
kind: Some(StructValue(Struct { fields: f })),
} = x
kind: Some(Kind::StringValue(x)),
} = v
{
Some(f)
Some((k.clone(), x.clone()))
} else {
None
}
})
.filter_map(|f| {
let key = match f.get("key") {
Some(value) => {
if let Value {
kind: Some(Kind::StringValue(x)),
} = value
{
x
} else {
return None;
}
}
None => return None,
};
let value = match f.get("value") {
Some(value) => {
if let Value {
kind: Some(Kind::StringValue(x)),
} = value
{
x
} else {
return None;
}
}
None => return None,
};

Some((key.clone(), value.clone()))
})
.collect();

// status_code number
Expand Down
130 changes: 130 additions & 0 deletions crates/base/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::time::{SystemTime, UNIX_EPOCH};

use tokio::time::sleep;
use tonic::transport::{Channel, Endpoint};
use tucana::{
aquila::{
RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient,
runtime_status_update_request::Status,
},
shared::{AdapterConfiguration, AdapterRuntimeStatus, RuntimeFeature},
};

pub struct DracoRuntimeStatusService {
channel: Channel,
identifier: String,
features: Vec<RuntimeFeature>,
configs: Vec<AdapterConfiguration>,
}

const MAX_BACKOFF: u64 = 2000 * 60;
const MAX_RETRIES: i8 = 10;

// Will create a channel and retry if its not possible
pub async fn create_channel_with_retry(channel_name: &str, url: String) -> Channel {
let mut backoff = 100;
let mut retries = 0;

loop {
let channel = match Endpoint::from_shared(url.clone()) {
Ok(c) => {
log::debug!("Creating a new endpoint for the: {} Service", channel_name);
c.connect_timeout(std::time::Duration::from_secs(2))
.timeout(std::time::Duration::from_secs(10))
}
Err(err) => {
panic!(
"Cannot create Endpoint for Service: `{}`. Reason: {:?}",
channel_name, err
);
}
};

match channel.connect().await {
Ok(ch) => {
return ch;
}
Err(err) => {
log::warn!(
"Retry connect to `{}` using url: `{}` failed: {:?}, retrying in {}ms",
channel_name,
url,
err,
backoff
);
sleep(std::time::Duration::from_millis(backoff)).await;

backoff = (backoff * 2).min(MAX_BACKOFF);
retries += 1;

if retries >= MAX_RETRIES {
panic!("Reached max retries to url {}", url)
}
}
}
}
}
impl DracoRuntimeStatusService {
pub async fn from_url(
aquila_url: String,
identifier: String,
features: Vec<RuntimeFeature>,
configs: Vec<AdapterConfiguration>,
) -> Self {
let channel = create_channel_with_retry("Aquila", aquila_url).await;
Self::new(channel, identifier, features, configs)
}

pub fn new(
channel: Channel,
identifier: String,
features: Vec<RuntimeFeature>,
configs: Vec<AdapterConfiguration>,
) -> Self {
DracoRuntimeStatusService {
channel,
identifier,
features,
configs,
}
}

pub async fn update_runtime_status_by_status(
&self,
status: tucana::shared::adapter_runtime_status::Status,
) {
log::info!("Updating the current Runtime Status!");
let mut client = RuntimeStatusServiceClient::new(self.channel.clone());

let now = SystemTime::now();
let timestamp = match now.duration_since(UNIX_EPOCH) {
Ok(time) => time.as_secs(),
Err(err) => {
log::error!("cannot get current system time: {:?}", err);
0
}
};

let request = RuntimeStatusUpdateRequest {
status: Some(Status::AdapterRuntimeStatus(AdapterRuntimeStatus {
status: status.into(),
timestamp: timestamp as i64,
identifier: self.identifier.clone(),
features: self.features.clone(),
configurations: self.configs.clone(),
})),
};

match client.update(request).await {
Ok(response) => {
log::info!(
"Was the update of the RuntimeStatus accepted by Sagittarius? {}",
response.into_inner().success
);
}
Err(err) => {
log::error!("Failed to update RuntimeStatus: {:?}", err);
}
}
}
}
1 change: 1 addition & 0 deletions crates/base/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod client;
pub mod config;
pub mod runner;
pub mod store;
Expand Down
44 changes: 43 additions & 1 deletion crates/base/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
client::DracoRuntimeStatusService,
config::AdapterConfig,
store::AdapterStore,
traits::{LoadConfig, Server as AdapterServer},
Expand All @@ -8,6 +9,7 @@ use std::sync::Arc;
use tokio::signal;
use tonic::transport::Server;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::{AdapterConfiguration, RuntimeFeature};

/// Context passed to adapter server implementations containing all shared resources
pub struct ServerContext<C: LoadConfig> {
Expand All @@ -23,6 +25,10 @@ pub struct ServerRunner<C: LoadConfig> {
}

impl<C: LoadConfig> ServerRunner<C> {
pub fn get_server_config(&self) -> Arc<C> {
self.context.server_config.clone()
}

pub async fn new<S: AdapterServer<C>>(server: S) -> anyhow::Result<Self> {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Debug)
Expand Down Expand Up @@ -50,11 +56,34 @@ impl<C: LoadConfig> ServerRunner<C> {
})
}

pub async fn serve(self) -> anyhow::Result<()> {
pub async fn serve(
self,
runtime_feature: Vec<RuntimeFeature>,
runtime_config: Vec<AdapterConfiguration>,
) -> anyhow::Result<()> {
let config = self.context.adapter_config.clone();
let mut runtime_status_service: Option<DracoRuntimeStatusService> = None;

log::info!("Starting Draco Variant: {}", config.draco_variant);

if !config.is_static() {
runtime_status_service = Some(
DracoRuntimeStatusService::from_url(
config.aquila_url.clone(),
config.draco_variant.clone(),
runtime_feature,
runtime_config,
)
.await,
);

if let Some(ser) = &runtime_status_service {
ser.update_runtime_status_by_status(
tucana::shared::adapter_runtime_status::Status::NotReady,
)
.await;
};

let definition_service = FlowUpdateService::from_url(
config.aquila_url.clone(),
config.definition_path.as_str(),
Expand Down Expand Up @@ -95,6 +124,13 @@ impl<C: LoadConfig> ServerRunner<C> {
} = self;
// Init the adapter server (e.g. create underlying HTTP server)
server.init(&context).await?;

if let Some(ser) = &runtime_status_service {
ser.update_runtime_status_by_status(
tucana::shared::adapter_runtime_status::Status::Running,
)
.await;
};
log::info!("Draco successfully initialized.");

#[cfg(unix)]
Expand Down Expand Up @@ -158,6 +194,12 @@ impl<C: LoadConfig> ServerRunner<C> {
}
}
}
if let Some(ser) = &runtime_status_service {
ser.update_runtime_status_by_status(
tucana::shared::adapter_runtime_status::Status::Stopped,
)
.await;
};

log::info!("Draco shutdown complete");
Ok(())
Expand Down