diff --git a/crates/video-streamer/Cargo.toml b/crates/video-streamer/Cargo.toml index a6cf082b5..0e582d31a 100644 --- a/crates/video-streamer/Cargo.toml +++ b/crates/video-streamer/Cargo.toml @@ -19,7 +19,8 @@ tokio = { version = "1.45", features = [ "rt", "rt-multi-thread", "sync", - "macros" + "macros", + "time" ] } tokio-util = { version = "0.7", features = ["codec"] } tracing = "0.1" diff --git a/crates/video-streamer/src/streamer/mod.rs b/crates/video-streamer/src/streamer/mod.rs index cabac095d..2bce9059e 100644 --- a/crates/video-streamer/src/streamer/mod.rs +++ b/crates/video-streamer/src/streamer/mod.rs @@ -6,8 +6,7 @@ use futures_util::SinkExt; use iter::{IteratorError, WebmPositionedIterator}; use protocol::{ProtocolCodeC, UserFriendlyError}; use tag_writers::{EncodeWriterConfig, HeaderWriter, WriterResult}; -use tokio::sync::oneshot::error::RecvError; -use tokio::sync::{Mutex, Notify, mpsc}; +use tokio::sync::{Mutex, Notify, watch}; use tokio_util::codec::Framed; use tracing::Instrument; use webm_iterable::WebmIterator; @@ -64,8 +63,25 @@ pub fn webm_stream( // ChannelWriter is a writer that writes to a channel let (chunk_writer, chunk_receiver) = ChannelWriter::new(); - let (error_sender, error_receiver) = mpsc::channel(1); - let stop_notifier = Arc::new(Notify::new()); + let (shutdown_tx, shutdown_rx) = watch::channel(StreamShutdown::Running); + + // Bridge the external shutdown signal into the watch channel (single source of truth). + // Wrapped in AbortOnDrop so that early returns/bails from webm_stream always abort the + // bridge task, dropping its shutdown_tx clone and allowing control_task to resolve. + struct AbortOnDrop(tokio::task::JoinHandle<()>); + impl Drop for AbortOnDrop { + fn drop(&mut self) { + self.0.abort(); + } + } + let _bridge_guard = AbortOnDrop({ + let shutdown_tx = shutdown_tx.clone(); + tokio::spawn(async move { + shutdown_signal.notified().await; + let _ = shutdown_tx.send(StreamShutdown::ExternalShutdown); + }) + }); + spawn_sending_task( ws_frame, chunk_receiver, @@ -73,9 +89,8 @@ pub fn webm_stream( cadeau::xmf::vpx::VpxCodec::VP8 => Some("vp8".to_owned()), cadeau::xmf::vpx::VpxCodec::VP9 => Some("vp9".to_owned()), }, - shutdown_signal, - error_receiver, - Arc::clone(&stop_notifier), + shutdown_tx.clone(), + shutdown_rx.clone(), ); let mut header_writer = HeaderWriter::new(chunk_writer); @@ -126,18 +141,17 @@ pub fn webm_stream( } retry_count += 1; - match when_eof(&when_new_chunk_appended, Arc::clone(&stop_notifier)) { - Ok(WhenEofControlFlow::Continue) => { + // INVARIANT: `shutdown_rx` must NEVER be consumed (.changed()/.borrow_and_update()) + // in this scope. Clones inherit the "last seen" version from the source, so keeping + // the source unconsumed guarantees every clone will detect any pending shutdown. + match when_eof(&when_new_chunk_appended, shutdown_rx.clone()) { + WhenEofControlFlow::Continue => { webm_itr.rollback_to_last_successful_tag()?; webm_itr.skip(1)?; } - Ok(WhenEofControlFlow::Break) => { + WhenEofControlFlow::Break => { break Ok(()); } - Err(e) => { - error_sender.blocking_send(UserFriendlyError::UnexpectedEOF)?; - anyhow::bail!(e); - } } } Some(Ok(tag)) => { @@ -146,7 +160,7 @@ pub fn webm_stream( if let Some(cut_block_hit_marker) = cut_block_hit_marker.take() { encode_writer.mark_cut_block_hit(cut_block_hit_marker); } else { - error_sender.blocking_send(UserFriendlyError::UnexpectedError)?; + let _ = shutdown_tx.send(StreamShutdown::Error(UserFriendlyError::UnexpectedError)); anyhow::bail!("cut block hit twice"); } } @@ -172,7 +186,7 @@ pub fn webm_stream( } } Some(Err(e)) => { - error_sender.blocking_send(UserFriendlyError::UnexpectedError)?; + let _ = shutdown_tx.send(StreamShutdown::Error(UserFriendlyError::UnexpectedError)); break Err(e.into()); } } @@ -180,12 +194,16 @@ pub fn webm_stream( info!(?result, "WebM streaming finished"); + // _bridge_guard (AbortOnDrop) is dropped here, aborting the bridge task so its + // shutdown_tx clone is dropped. This ensures control_task's shutdown_rx.changed() + // will resolve (Err) instead of hanging forever when webm_stream exits without + // an explicit shutdown signal. return result; fn when_eof( when_new_chunk_appended: &impl Fn() -> tokio::sync::oneshot::Receiver<()>, - stop_notifier: Arc, - ) -> Result { + mut shutdown_rx: watch::Receiver, + ) -> WhenEofControlFlow { let (tx, rx) = tokio::sync::oneshot::channel(); let when_new_chunk_appended_receiver = when_new_chunk_appended(); tokio::spawn(async move { @@ -193,7 +211,7 @@ pub fn webm_stream( _ = when_new_chunk_appended_receiver => { let _ = tx.send(WhenEofControlFlow::Continue); }, - _ = stop_notifier.notified() => { + _ = shutdown_rx.changed() => { let _ = tx.send(WhenEofControlFlow::Break); }, _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { @@ -202,7 +220,13 @@ pub fn webm_stream( } } }); - rx.blocking_recv() + // If the oneshot sender is dropped (task panicked), treat as Break + rx.blocking_recv().unwrap_or(WhenEofControlFlow::Break) + } + + enum WhenEofControlFlow { + Continue, + Break, } } @@ -210,31 +234,41 @@ fn spawn_sending_task( ws_frame: Framed, mut chunk_receiver: ChannelWriterReceiver, codec: Option, - shutdown_signal: Arc, - mut error_receiver: mpsc::Receiver, - stop_notifier: Arc, + shutdown_tx: watch::Sender, + mut shutdown_rx: watch::Receiver, ) where W: tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin + Send + 'static, { use futures_util::stream::StreamExt; let ws_frame = Arc::new(Mutex::new(ws_frame)); let ws_frame_clone = Arc::clone(&ws_frame); + let mut handle_shutdown_rx = shutdown_rx.clone(); + // Spawn a dedicated task to handle incoming messages from the client // Reasoning: tokio::select! will stuck on `chunk_receiver.recv()` when there's no more data to receive // This will disable the ability to receive shutdown signal let handle = tokio::task::spawn(async move { loop { - let client_message = { - let mut ws_frame = ws_frame.lock().await; - ws_frame.next().await + // Select on both the next client message and shutdown, so we don't + // get stuck waiting for a silent client when shutdown is requested. + let client_message = tokio::select! { + msg = async { + let mut ws_frame = ws_frame.lock().await; + ws_frame.next().await + } => msg, + _ = handle_shutdown_rx.changed() => { + break; + } }; match client_message { None => { + let _ = shutdown_tx.send(StreamShutdown::ClientDisconnected); break; } Some(Err(e)) => { warn!(error = %e, "Error while receiving message from client"); + let _ = shutdown_tx.send(StreamShutdown::ClientDisconnected); break; } Some(Ok(protocol::ClientMessage::Start)) => { @@ -249,14 +283,35 @@ fn spawn_sending_task( ) .await; } - Some(Ok(protocol::ClientMessage::Pull)) => match chunk_receiver.recv().await { - Some(data) => { - ws_send(&ws_frame, protocol::ServerMessage::Chunk(&data)).await; - } - _ => { - break; + Some(Ok(protocol::ClientMessage::Pull)) => { + tokio::select! { + chunk = chunk_receiver.recv() => { + match chunk { + Some(data) => { + ws_send(&ws_frame, protocol::ServerMessage::Chunk(&data)).await; + } + None => { + // Channel closed, producer is done + break; + } + } + } + _ = handle_shutdown_rx.changed() => { + break; + } } - }, + } + } + } + // Best-effort: deliver a final message to client before closing the socket. + // Read the shutdown reason to decide whether to send End or Error. + let shutdown_reason = handle_shutdown_rx.borrow().clone(); + match shutdown_reason { + StreamShutdown::Error(err) => { + ws_send(&ws_frame, protocol::ServerMessage::Error(err)).await; + } + _ => { + ws_send(&ws_frame, protocol::ServerMessage::End).await; } } let _ = ws_frame.lock().await.get_mut().shutdown().await; @@ -265,27 +320,31 @@ fn spawn_sending_task( let control_task = async move { info!("Starting streaming task"); - loop { - tokio::select! { - err = error_receiver.recv() => { - if let Some(err) = err { - ws_send(&ws_frame_clone, protocol::ServerMessage::Error(err)).await; - break; - } else { - continue; - } - }, - _ = shutdown_signal.notified() => { + let result = shutdown_rx.changed().await; + if result.is_ok() { + let reason = shutdown_rx.borrow().clone(); + match reason { + StreamShutdown::Error(err) => { + ws_send(&ws_frame_clone, protocol::ServerMessage::Error(err)).await; + } + StreamShutdown::ExternalShutdown => { info!("Received shutdown signal"); ws_send(&ws_frame_clone, protocol::ServerMessage::End).await; - break; - }, + } + StreamShutdown::ClientDisconnected => { + ws_send(&ws_frame_clone, protocol::ServerMessage::End).await; + } + StreamShutdown::Running => { + // Spurious wake, shouldn't happen since we only send non-Running values + warn!("Received shutdown signal with Running state, ignoring"); + } } } + // If result is Err, the sender was dropped — stream is done info!("Stopping streaming task"); let _ = ws_frame_clone.lock().await.get_mut().shutdown().await; - handle.abort(); - stop_notifier.notify_waiters(); + // Wait briefly for handle to finish gracefully instead of aborting + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await; Ok::<_, anyhow::Error>(()) } .instrument(tracing::span!(tracing::Level::INFO, "Streaming WebM task")); @@ -307,7 +366,10 @@ fn spawn_sending_task( } } -enum WhenEofControlFlow { - Continue, - Break, +#[derive(Clone, Debug, PartialEq)] +enum StreamShutdown { + Running, + ClientDisconnected, + ExternalShutdown, + Error(UserFriendlyError), } diff --git a/crates/video-streamer/src/streamer/protocol.rs b/crates/video-streamer/src/streamer/protocol.rs index bc87b9759..b486d65dd 100644 --- a/crates/video-streamer/src/streamer/protocol.rs +++ b/crates/video-streamer/src/streamer/protocol.rs @@ -73,17 +73,15 @@ impl codec::Decoder for ProtocolCodeC { } } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub(crate) enum UserFriendlyError { UnexpectedError, - UnexpectedEOF, } impl UserFriendlyError { pub(crate) fn as_str(&self) -> &'static str { match self { UserFriendlyError::UnexpectedError => "UnexpectedError", - UserFriendlyError::UnexpectedEOF => "UnexpectedEOF", } } }