Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/video-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ tokio = { version = "1.45", features = [
"rt",
"rt-multi-thread",
"sync",
"macros"
"macros",
"time"
] }
tokio-util = { version = "0.7", features = ["codec"] }
tracing = "0.1"
Expand Down
164 changes: 113 additions & 51 deletions crates/video-streamer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use futures_util::SinkExt;
use iter::{IteratorError, WebmPositionedIterator};
use protocol::{ProtocolCodeC, UserFriendlyError};
use tag_writers::{EncodeWriterConfig, HeaderWriter, WriterResult};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{Mutex, Notify, mpsc};
use tokio::sync::{Mutex, Notify, watch};
use tokio_util::codec::Framed;
use tracing::Instrument;
use webm_iterable::WebmIterator;
Expand Down Expand Up @@ -64,18 +63,34 @@ pub fn webm_stream(

// ChannelWriter is a writer that writes to a channel
let (chunk_writer, chunk_receiver) = ChannelWriter::new();
let (error_sender, error_receiver) = mpsc::channel(1);
let stop_notifier = Arc::new(Notify::new());
let (shutdown_tx, shutdown_rx) = watch::channel(StreamShutdown::Running);

// Bridge the external shutdown signal into the watch channel (single source of truth).
// Wrapped in AbortOnDrop so that early returns/bails from webm_stream always abort the
// bridge task, dropping its shutdown_tx clone and allowing control_task to resolve.
struct AbortOnDrop(tokio::task::JoinHandle<()>);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
let _bridge_guard = AbortOnDrop({
let shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
shutdown_signal.notified().await;
let _ = shutdown_tx.send(StreamShutdown::ExternalShutdown);
})
});

spawn_sending_task(
ws_frame,
chunk_receiver,
match encode_writer_config.codec {
cadeau::xmf::vpx::VpxCodec::VP8 => Some("vp8".to_owned()),
cadeau::xmf::vpx::VpxCodec::VP9 => Some("vp9".to_owned()),
},
shutdown_signal,
error_receiver,
Arc::clone(&stop_notifier),
shutdown_tx.clone(),
shutdown_rx.clone(),
);

let mut header_writer = HeaderWriter::new(chunk_writer);
Expand Down Expand Up @@ -126,18 +141,17 @@ pub fn webm_stream(
}

retry_count += 1;
match when_eof(&when_new_chunk_appended, Arc::clone(&stop_notifier)) {
Ok(WhenEofControlFlow::Continue) => {
// INVARIANT: `shutdown_rx` must NEVER be consumed (.changed()/.borrow_and_update())
// in this scope. Clones inherit the "last seen" version from the source, so keeping
// the source unconsumed guarantees every clone will detect any pending shutdown.
match when_eof(&when_new_chunk_appended, shutdown_rx.clone()) {
WhenEofControlFlow::Continue => {
webm_itr.rollback_to_last_successful_tag()?;
webm_itr.skip(1)?;
}
Ok(WhenEofControlFlow::Break) => {
WhenEofControlFlow::Break => {
break Ok(());
}
Err(e) => {
error_sender.blocking_send(UserFriendlyError::UnexpectedEOF)?;
anyhow::bail!(e);
}
}
}
Some(Ok(tag)) => {
Expand All @@ -146,7 +160,7 @@ pub fn webm_stream(
if let Some(cut_block_hit_marker) = cut_block_hit_marker.take() {
encode_writer.mark_cut_block_hit(cut_block_hit_marker);
} else {
error_sender.blocking_send(UserFriendlyError::UnexpectedError)?;
let _ = shutdown_tx.send(StreamShutdown::Error(UserFriendlyError::UnexpectedError));
anyhow::bail!("cut block hit twice");
}
}
Expand All @@ -172,28 +186,32 @@ pub fn webm_stream(
}
}
Some(Err(e)) => {
error_sender.blocking_send(UserFriendlyError::UnexpectedError)?;
let _ = shutdown_tx.send(StreamShutdown::Error(UserFriendlyError::UnexpectedError));
break Err(e.into());
}
}
};

info!(?result, "WebM streaming finished");

// _bridge_guard (AbortOnDrop) is dropped here, aborting the bridge task so its
// shutdown_tx clone is dropped. This ensures control_task's shutdown_rx.changed()
// will resolve (Err) instead of hanging forever when webm_stream exits without
// an explicit shutdown signal.
return result;

fn when_eof(
when_new_chunk_appended: &impl Fn() -> tokio::sync::oneshot::Receiver<()>,
stop_notifier: Arc<Notify>,
) -> Result<WhenEofControlFlow, RecvError> {
mut shutdown_rx: watch::Receiver<StreamShutdown>,
) -> WhenEofControlFlow {
let (tx, rx) = tokio::sync::oneshot::channel();
let when_new_chunk_appended_receiver = when_new_chunk_appended();
tokio::spawn(async move {
tokio::select! {
_ = when_new_chunk_appended_receiver => {
let _ = tx.send(WhenEofControlFlow::Continue);
},
_ = stop_notifier.notified() => {
_ = shutdown_rx.changed() => {
let _ = tx.send(WhenEofControlFlow::Break);
},
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
Expand All @@ -202,39 +220,55 @@ pub fn webm_stream(
}
}
});
rx.blocking_recv()
// If the oneshot sender is dropped (task panicked), treat as Break
rx.blocking_recv().unwrap_or(WhenEofControlFlow::Break)
}

enum WhenEofControlFlow {
Continue,
Break,
}
}

fn spawn_sending_task<W>(
ws_frame: Framed<W, ProtocolCodeC>,
mut chunk_receiver: ChannelWriterReceiver,
codec: Option<String>,
shutdown_signal: Arc<Notify>,
mut error_receiver: mpsc::Receiver<UserFriendlyError>,
stop_notifier: Arc<Notify>,
shutdown_tx: watch::Sender<StreamShutdown>,
mut shutdown_rx: watch::Receiver<StreamShutdown>,
) where
W: tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin + Send + 'static,
{
use futures_util::stream::StreamExt;
let ws_frame = Arc::new(Mutex::new(ws_frame));
let ws_frame_clone = Arc::clone(&ws_frame);
let mut handle_shutdown_rx = shutdown_rx.clone();

// Spawn a dedicated task to handle incoming messages from the client
// Reasoning: tokio::select! will stuck on `chunk_receiver.recv()` when there's no more data to receive
// This will disable the ability to receive shutdown signal
let handle = tokio::task::spawn(async move {
loop {
let client_message = {
let mut ws_frame = ws_frame.lock().await;
ws_frame.next().await
// Select on both the next client message and shutdown, so we don't
// get stuck waiting for a silent client when shutdown is requested.
let client_message = tokio::select! {
msg = async {
let mut ws_frame = ws_frame.lock().await;
ws_frame.next().await
} => msg,
_ = handle_shutdown_rx.changed() => {
break;
}
};

match client_message {
None => {
let _ = shutdown_tx.send(StreamShutdown::ClientDisconnected);
break;
}
Some(Err(e)) => {
warn!(error = %e, "Error while receiving message from client");
let _ = shutdown_tx.send(StreamShutdown::ClientDisconnected);
break;
}
Some(Ok(protocol::ClientMessage::Start)) => {
Expand All @@ -249,14 +283,35 @@ fn spawn_sending_task<W>(
)
.await;
}
Some(Ok(protocol::ClientMessage::Pull)) => match chunk_receiver.recv().await {
Some(data) => {
ws_send(&ws_frame, protocol::ServerMessage::Chunk(&data)).await;
}
_ => {
break;
Some(Ok(protocol::ClientMessage::Pull)) => {
tokio::select! {
chunk = chunk_receiver.recv() => {
match chunk {
Some(data) => {
ws_send(&ws_frame, protocol::ServerMessage::Chunk(&data)).await;
}
None => {
// Channel closed, producer is done
break;
}
}
}
_ = handle_shutdown_rx.changed() => {
break;
}
}
},
}
}
}
// Best-effort: deliver a final message to client before closing the socket.
// Read the shutdown reason to decide whether to send End or Error.
let shutdown_reason = handle_shutdown_rx.borrow().clone();
match shutdown_reason {
StreamShutdown::Error(err) => {
ws_send(&ws_frame, protocol::ServerMessage::Error(err)).await;
}
_ => {
ws_send(&ws_frame, protocol::ServerMessage::End).await;
}
}
let _ = ws_frame.lock().await.get_mut().shutdown().await;
Expand All @@ -265,27 +320,31 @@ fn spawn_sending_task<W>(

let control_task = async move {
info!("Starting streaming task");
loop {
tokio::select! {
err = error_receiver.recv() => {
if let Some(err) = err {
ws_send(&ws_frame_clone, protocol::ServerMessage::Error(err)).await;
break;
} else {
continue;
}
},
_ = shutdown_signal.notified() => {
let result = shutdown_rx.changed().await;
if result.is_ok() {
let reason = shutdown_rx.borrow().clone();
match reason {
StreamShutdown::Error(err) => {
ws_send(&ws_frame_clone, protocol::ServerMessage::Error(err)).await;
}
StreamShutdown::ExternalShutdown => {
info!("Received shutdown signal");
ws_send(&ws_frame_clone, protocol::ServerMessage::End).await;
break;
},
}
StreamShutdown::ClientDisconnected => {
ws_send(&ws_frame_clone, protocol::ServerMessage::End).await;
}
StreamShutdown::Running => {
// Spurious wake, shouldn't happen since we only send non-Running values
warn!("Received shutdown signal with Running state, ignoring");
}
}
}
// If result is Err, the sender was dropped — stream is done
info!("Stopping streaming task");
let _ = ws_frame_clone.lock().await.get_mut().shutdown().await;
handle.abort();
stop_notifier.notify_waiters();
// Wait briefly for handle to finish gracefully instead of aborting
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
Ok::<_, anyhow::Error>(())
}
.instrument(tracing::span!(tracing::Level::INFO, "Streaming WebM task"));
Expand All @@ -307,7 +366,10 @@ fn spawn_sending_task<W>(
}
}

enum WhenEofControlFlow {
Continue,
Break,
#[derive(Clone, Debug, PartialEq)]
enum StreamShutdown {
Running,
ClientDisconnected,
ExternalShutdown,
Error(UserFriendlyError),
}
4 changes: 1 addition & 3 deletions crates/video-streamer/src/streamer/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,15 @@ impl codec::Decoder for ProtocolCodeC {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum UserFriendlyError {
UnexpectedError,
UnexpectedEOF,
}

impl UserFriendlyError {
pub(crate) fn as_str(&self) -> &'static str {
match self {
UserFriendlyError::UnexpectedError => "UnexpectedError",
UserFriendlyError::UnexpectedEOF => "UnexpectedEOF",
}
}
}
Expand Down