Skip to content

Commit 2bd506d

Browse files
committed
Drain active streams before codec switch
1 parent cc2585f commit 2bd506d

2 files changed

Lines changed: 37 additions & 7 deletions

File tree

server/src/api/routes.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tokio::net::TcpStream;
3131
use tokio::process::Command;
3232
use tokio::sync::mpsc;
3333
use tokio::task;
34-
use tokio::time::timeout;
34+
use tokio::time::{sleep, timeout};
3535
use tower_http::trace::TraceLayer;
3636

3737
#[derive(Clone)]
@@ -741,12 +741,7 @@ async fn set_video_codec(
741741
let codec = normalize_video_codec(&payload.codec).ok_or_else(|| {
742742
AppError::bad_request("Request body must include codec: hevc, h264, or h264-software.")
743743
})?;
744-
let active_streams = state.metrics.active_streams.load(Ordering::Relaxed);
745-
if active_streams > 0 {
746-
return Err(AppError::conflict(format!(
747-
"Cannot switch codec while {active_streams} stream(s) are active."
748-
)));
749-
}
744+
wait_for_streams_to_drain(&state, &udid).await?;
750745
std::env::set_var("SIMDECK_VIDEO_CODEC", codec);
751746
state.registry.remove(&udid);
752747
Ok(json(json_value!({
@@ -755,6 +750,30 @@ async fn set_video_codec(
755750
})))
756751
}
757752

753+
async fn wait_for_streams_to_drain(state: &AppState, udid: &str) -> Result<(), AppError> {
754+
const DRAIN_ATTEMPTS: usize = 40;
755+
const DRAIN_INTERVAL: Duration = Duration::from_millis(100);
756+
757+
for attempt in 0..DRAIN_ATTEMPTS {
758+
let active_streams = state.metrics.active_streams.load(Ordering::Relaxed);
759+
if active_streams == 0 {
760+
return Ok(());
761+
}
762+
if attempt == 0 {
763+
crate::transport::webrtc::cancel_media_stream(udid);
764+
}
765+
sleep(DRAIN_INTERVAL).await;
766+
}
767+
768+
let active_streams = state.metrics.active_streams.load(Ordering::Relaxed);
769+
if active_streams == 0 {
770+
return Ok(());
771+
}
772+
Err(AppError::conflict(format!(
773+
"Timed out waiting for {active_streams} active stream(s) to close before switching codec."
774+
)))
775+
}
776+
758777
async fn open_url(
759778
State(state): State<AppState>,
760779
Path(udid): Path<String>,

server/src/transport/webrtc.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,17 @@ fn clear_webrtc_media_stream(udid: &str, token: &broadcast::Sender<()>) {
408408
}
409409
}
410410

411+
pub fn cancel_media_stream(udid: &str) -> bool {
412+
let Some(streams) = WEBRTC_MEDIA_STREAMS.get() else {
413+
return false;
414+
};
415+
let Some(stream) = streams.lock().unwrap().get(udid).cloned() else {
416+
return false;
417+
};
418+
let _ = stream.send(());
419+
true
420+
}
421+
411422
fn ice_servers() -> Vec<RTCIceServer> {
412423
let mut urls = std::env::var("SIMDECK_WEBRTC_ICE_SERVERS")
413424
.ok()

0 commit comments

Comments
 (0)