Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/taurus/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod runtime_status;
pub mod runtime_usage;
39 changes: 39 additions & 0 deletions crates/taurus/src/client/runtime_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use code0_flow::flow_service::retry::create_channel_with_retry;
use tonic::transport::Channel;
use tucana::{
aquila::{RuntimeUsageRequest, runtime_usage_service_client::RuntimeUsageServiceClient},
shared::RuntimeUsage,
};

pub struct TaurusRuntimeUsageService {
channel: Channel,
}

impl TaurusRuntimeUsageService {
pub async fn from_url(aquila_url: String) -> Self {
let channel = create_channel_with_retry("Aquila", aquila_url).await;
TaurusRuntimeUsageService { channel }
}

pub async fn update_runtime_usage(&self, runtime_usage: RuntimeUsage) {
log::info!("Updating the current Runtime Status!");
let mut client = RuntimeUsageServiceClient::new(self.channel.clone());

let request = RuntimeUsageRequest {
runtime_usage: vec![runtime_usage],
};

match client.update(request).await {
Ok(response) => {
log::info!(
"Was the update of the RuntimeStatus accepted by Sagittarius? {}",
response.into_inner().success
);
}
Err(err) => {
log::error!("Failed to update RuntimeStatus: {:?}", err);
}
}
}
}

32 changes: 28 additions & 4 deletions crates/taurus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod client;
mod config;

use crate::client::runtime_status::TaurusRuntimeStatusService;
use crate::client::runtime_usage::TaurusRuntimeUsageService;
use crate::config::Config;
use code0_flow::flow_service::FlowUpdateService;

Expand All @@ -11,16 +12,20 @@ use futures_lite::StreamExt;
use log::error;
use prost::Message;
use std::collections::HashMap;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use taurus_core::context::context::Context;
use taurus_core::context::executor::Executor;
use taurus_core::context::registry::FunctionStore;
use taurus_core::context::signal::Signal;
use tokio::signal;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::value::Kind;
use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value};
use tucana::shared::{
ExecutionFlow, NodeFunction, RuntimeFeature, RuntimeUsage, Translation, Value,
};

fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal {
fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, RuntimeUsage) {
let start = Instant::now();
let mut context = Context::default();

let node_functions: HashMap<i64, NodeFunction> = flow
Expand All @@ -29,7 +34,17 @@ fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal {
.map(|node| (node.database_id, node))
.collect();

Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true)
let signal =
Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true);
let duration_millis = start.elapsed().as_millis() as i64;

(
signal,
RuntimeUsage {
flow_id: flow.flow_id,
duration: duration_millis,
},
)
}

#[tokio::main]
Expand All @@ -43,6 +58,7 @@ async fn main() {
let config = Config::new();
let store = FunctionStore::default();
let mut runtime_status_service: Option<TaurusRuntimeStatusService> = None;
let mut runtime_usage_service: Option<TaurusRuntimeUsageService> = None;

let client = match async_nats::connect(config.nats_url.clone()).await {
Ok(client) => {
Expand Down Expand Up @@ -92,6 +108,9 @@ async fn main() {
.send()
.await;

let usage_service = TaurusRuntimeUsageService::from_url(config.aquila_url.clone()).await;
runtime_usage_service = Some(usage_service);

let status_service = TaurusRuntimeStatusService::from_url(
config.aquila_url.clone(),
"taurus".into(),
Expand Down Expand Up @@ -143,7 +162,8 @@ async fn main() {
};

let flow_id = flow.flow_id;
let value = match handle_message(flow, &store) {
let result = handle_message(flow, &store);
let value = match result.0 {
Signal::Failure(error) => {
log::error!(
"RuntimeError occurred, execution failed because: {:?}",
Expand Down Expand Up @@ -180,6 +200,10 @@ async fn main() {
Err(err) => log::error!("Failed to send response: {:?}", err),
}
}

if let Some(usage_service) = &runtime_usage_service {
usage_service.update_runtime_usage(result.1).await;
}
}

log::info!("NATS worker loop ended");
Expand Down