diff --git a/crates/taurus/src/client/mod.rs b/crates/taurus/src/client/mod.rs index 51d602a..940ca04 100644 --- a/crates/taurus/src/client/mod.rs +++ b/crates/taurus/src/client/mod.rs @@ -1 +1,2 @@ pub mod runtime_status; +pub mod runtime_usage; diff --git a/crates/taurus/src/client/runtime_usage.rs b/crates/taurus/src/client/runtime_usage.rs new file mode 100644 index 0000000..a638c66 --- /dev/null +++ b/crates/taurus/src/client/runtime_usage.rs @@ -0,0 +1,39 @@ +use code0_flow::flow_service::retry::create_channel_with_retry; +use tonic::transport::Channel; +use tucana::{ + aquila::{RuntimeUsageRequest, runtime_usage_service_client::RuntimeUsageServiceClient}, + shared::RuntimeUsage, +}; + +pub struct TaurusRuntimeUsageService { + channel: Channel, +} + +impl TaurusRuntimeUsageService { + pub async fn from_url(aquila_url: String) -> Self { + let channel = create_channel_with_retry("Aquila", aquila_url).await; + TaurusRuntimeUsageService { channel } + } + + pub async fn update_runtime_usage(&self, runtime_usage: RuntimeUsage) { + log::info!("Updating the current Runtime Status!"); + let mut client = RuntimeUsageServiceClient::new(self.channel.clone()); + + let request = RuntimeUsageRequest { + runtime_usage: vec![runtime_usage], + }; + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the RuntimeStatus accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update RuntimeStatus: {:?}", err); + } + } + } +} + diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 1058a49..2d26e87 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -2,6 +2,7 @@ mod client; mod config; use crate::client::runtime_status::TaurusRuntimeStatusService; +use crate::client::runtime_usage::TaurusRuntimeUsageService; use crate::config::Config; use code0_flow::flow_service::FlowUpdateService; @@ -11,6 +12,7 @@ use futures_lite::StreamExt; use log::error; use prost::Message; use std::collections::HashMap; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use taurus_core::context::context::Context; use taurus_core::context::executor::Executor; use taurus_core::context::registry::FunctionStore; @@ -18,9 +20,12 @@ use taurus_core::context::signal::Signal; use tokio::signal; use tonic_health::pb::health_server::HealthServer; use tucana::shared::value::Kind; -use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value}; +use tucana::shared::{ + ExecutionFlow, NodeFunction, RuntimeFeature, RuntimeUsage, Translation, Value, +}; -fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal { +fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, RuntimeUsage) { + let start = Instant::now(); let mut context = Context::default(); let node_functions: HashMap = flow @@ -29,7 +34,17 @@ fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal { .map(|node| (node.database_id, node)) .collect(); - Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true) + let signal = + Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true); + let duration_millis = start.elapsed().as_millis() as i64; + + ( + signal, + RuntimeUsage { + flow_id: flow.flow_id, + duration: duration_millis, + }, + ) } #[tokio::main] @@ -43,6 +58,7 @@ async fn main() { let config = Config::new(); let store = FunctionStore::default(); let mut runtime_status_service: Option = None; + let mut runtime_usage_service: Option = None; let client = match async_nats::connect(config.nats_url.clone()).await { Ok(client) => { @@ -92,6 +108,9 @@ async fn main() { .send() .await; + let usage_service = TaurusRuntimeUsageService::from_url(config.aquila_url.clone()).await; + runtime_usage_service = Some(usage_service); + let status_service = TaurusRuntimeStatusService::from_url( config.aquila_url.clone(), "taurus".into(), @@ -143,7 +162,8 @@ async fn main() { }; let flow_id = flow.flow_id; - let value = match handle_message(flow, &store) { + let result = handle_message(flow, &store); + let value = match result.0 { Signal::Failure(error) => { log::error!( "RuntimeError occurred, execution failed because: {:?}", @@ -180,6 +200,10 @@ async fn main() { Err(err) => log::error!("Failed to send response: {:?}", err), } } + + if let Some(usage_service) = &runtime_usage_service { + usage_service.update_runtime_usage(result.1).await; + } } log::info!("NATS worker loop ended");