Skip to content

Commit 27a62df

Browse files
authored
feat: SSE (#1487)
Add backend for SSE so that it sends alert message only if state = triggered
1 parent bc409f9 commit 27a62df

File tree

6 files changed

+365
-28
lines changed

6 files changed

+365
-28
lines changed

Cargo.lock

Lines changed: 123 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ parquet = "57.1.0"
3030

3131
# Web server and HTTP-related
3232
actix-cors = "0.7.0"
33+
actix-web-lab = "0.24.3"
3334
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
3435
actix-web-httpauth = "0.8"
3536
actix-web-prometheus = { version = "0.1" }
@@ -83,7 +84,7 @@ tokio = { version = "^1.43", default-features = false, features = [
8384
"fs",
8485
"rt-multi-thread",
8586
] }
86-
tokio-stream = { version = "0.1", features = ["fs"] }
87+
tokio-stream = { version = "0.1.17", features = ["fs"] }
8788
tokio-util = { version = "0.7" }
8889

8990
# # Logging and Metrics

src/alerts/mod.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,10 @@ use crate::alerts::alert_types::ThresholdAlert;
5858
use crate::alerts::target::{NotificationConfig, TARGETS};
5959
use crate::handlers::http::fetch_schema;
6060
use crate::metastore::MetastoreError;
61-
// use crate::handlers::http::query::create_streams_for_distributed;
62-
// use crate::option::Mode;
6361
use crate::parseable::{PARSEABLE, StreamNotFound};
6462
use crate::query::{QUERY_SESSION, resolve_stream_names};
65-
use crate::rbac::map::SessionKey;
63+
use crate::rbac::map::{SessionKey, sessions};
64+
use crate::sse::{SSE_HANDLER, SSEAlertInfo, SSEEvent};
6665
use crate::storage;
6766
use crate::storage::ObjectStorageError;
6867
use crate::sync::alert_runtime;
@@ -606,12 +605,39 @@ impl AlertConfig {
606605

607606
pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> {
608607
let mut context = self.get_context();
609-
context.message = message;
608+
context.message.clone_from(&message);
609+
610610
for target_id in &self.targets {
611611
let target = TARGETS.get_target_by_id(target_id).await?;
612612
trace!("Target (trigger_notifications)-\n{target:?}");
613613
target.call(context.clone());
614614
}
615+
616+
// get active sessions
617+
let active_sessions = sessions().get_active_sessions();
618+
let mut broadcast_to = vec![];
619+
for session in active_sessions {
620+
if user_auth_for_query(&session, &self.query).await.is_ok()
621+
&& let SessionKey::SessionId(id) = &session
622+
{
623+
broadcast_to.push(*id);
624+
}
625+
}
626+
627+
if self.state.eq(&AlertState::Triggered)
628+
&& let Ok(msg) = &serde_json::to_string(&SSEEvent {
629+
criticality: crate::sse::Criticality::Error,
630+
message: crate::sse::Message::AlertEvent(SSEAlertInfo {
631+
id: self.id,
632+
state: self.state,
633+
name: self.title.clone(),
634+
}),
635+
})
636+
&& !broadcast_to.is_empty()
637+
{
638+
SSE_HANDLER.broadcast(msg, Some(&broadcast_to)).await;
639+
}
640+
615641
Ok(())
616642
}
617643

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub mod prism;
4242
pub mod query;
4343
pub mod rbac;
4444
mod response;
45+
pub mod sse;
4546
mod static_schema;
4647
mod stats;
4748
pub mod storage;

src/rbac/map.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use super::{
2828
user,
2929
};
3030
use chrono::{DateTime, Utc};
31+
use itertools::Itertools;
3132
use once_cell::sync::{Lazy, OnceCell};
3233
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
3334
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -168,6 +169,10 @@ pub struct Sessions {
168169
}
169170

170171
impl Sessions {
172+
pub fn get_active_sessions(&self) -> Vec<SessionKey> {
173+
self.active_sessions.keys().cloned().collect_vec()
174+
}
175+
171176
// only checks if the session is expired or not
172177
pub fn is_session_expired(&self, key: &SessionKey) -> bool {
173178
// fetch userid from session key

0 commit comments

Comments
 (0)