Skip to content

Commit dbcd781

Browse files
committed
Shut down watchers before exiting
1 parent 8dabcce commit dbcd781

2 files changed

Lines changed: 60 additions & 6 deletions

File tree

src/core.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ pub enum RuntimeEffect {
109109
LogInfo {
110110
message: String,
111111
},
112+
StopWatching,
112113
StopAllProcesses,
113114
Exit,
114115
}
@@ -377,6 +378,7 @@ impl RuntimeMachine {
377378
self.pending_effects.push_back(RuntimeEffect::LogInfo {
378379
message: "received ctrl-c, shutting down".into(),
379380
});
381+
self.pending_effects.push_back(RuntimeEffect::StopWatching);
380382
self.pending_effects
381383
.push_back(RuntimeEffect::StopAllProcesses);
382384
self.pending_effects.push_back(RuntimeEffect::Exit);
@@ -849,6 +851,7 @@ mod tests {
849851
message: "received ctrl-c, shutting down".into()
850852
})
851853
);
854+
assert_eq!(runtime.next_effect(), Some(RuntimeEffect::StopWatching));
852855
assert_eq!(runtime.next_effect(), Some(RuntimeEffect::StopAllProcesses));
853856
assert_eq!(runtime.next_effect(), Some(RuntimeEffect::Exit));
854857
assert_eq!(runtime.next_effect(), None);

src/engine.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::collections::{BTreeMap, BTreeSet};
22
use std::path::Path;
3+
use std::sync::Arc;
4+
use std::sync::atomic::{AtomicBool, Ordering};
35
use std::sync::mpsc;
46
use std::time::Duration;
57

@@ -52,6 +54,7 @@ trait RuntimeEffectAdapter {
5254
async fn maintain_processes(&mut self) -> Result<()>;
5355
async fn poll_observed_hook(&mut self, hook: &str) -> Result<bool>;
5456
async fn log_info(&mut self, message: String) -> Result<()>;
57+
async fn stop_watching(&mut self) -> Result<()>;
5558
async fn stop_all_processes(&mut self) -> Result<()>;
5659
}
5760

@@ -66,6 +69,7 @@ struct LiveRuntimeAdapter<'a, 'b> {
6669
processes: &'a mut ProcessManager<'b>,
6770
state: &'a SessionState,
6871
watcher: &'a mut RecommendedWatcher,
72+
watcher_shutdown: Arc<AtomicBool>,
6973
external_event_tx: tokio::sync::mpsc::UnboundedSender<ExternalEventMessage>,
7074
external_event_server: Option<ExternalEventServer>,
7175
browser_reload_server: Option<BrowserReloadServer>,
@@ -88,14 +92,11 @@ impl Engine {
8892
let (tx, rx) = mpsc::channel();
8993
let (external_event_tx, mut external_event_rx) = tokio::sync::mpsc::unbounded_channel();
9094
let tx_watcher = tx.clone();
95+
let watcher_shutdown = Arc::new(AtomicBool::new(false));
96+
let watcher_shutdown_callback = watcher_shutdown.clone();
9197
let mut watcher = RecommendedWatcher::new(
9298
move |result| {
93-
if let Err(error) = tx_watcher.send(result) {
94-
error!(
95-
"failed to forward watcher event into runtime loop: {}",
96-
error
97-
);
98-
}
99+
forward_watcher_event(&tx_watcher, &watcher_shutdown_callback, result);
99100
},
100101
NotifyConfig::default(),
101102
)?;
@@ -111,6 +112,7 @@ impl Engine {
111112
processes: &mut processes,
112113
state: &state,
113114
watcher: &mut watcher,
115+
watcher_shutdown,
114116
external_event_tx,
115117
external_event_server: None,
116118
browser_reload_server: None,
@@ -295,6 +297,12 @@ impl RuntimeEffectAdapter for LiveRuntimeAdapter<'_, '_> {
295297
Ok(())
296298
}
297299

300+
async fn stop_watching(&mut self) -> Result<()> {
301+
self.watcher_shutdown.store(true, Ordering::Relaxed);
302+
self.watcher.unwatch(&self.config.root)?;
303+
Ok(())
304+
}
305+
298306
async fn stop_all_processes(&mut self) -> Result<()> {
299307
self.processes.stop_all(self.state).await
300308
}
@@ -337,6 +345,7 @@ async fn execute_runtime_effects<A: RuntimeEffectAdapter>(
337345
}
338346
}
339347
RuntimeEffect::LogInfo { message } => adapter.log_info(message).await?,
348+
RuntimeEffect::StopWatching => adapter.stop_watching().await?,
340349
RuntimeEffect::StopAllProcesses => adapter.stop_all_processes().await?,
341350
RuntimeEffect::Exit => return Ok(true),
342351
}
@@ -345,6 +354,21 @@ async fn execute_runtime_effects<A: RuntimeEffectAdapter>(
345354
Ok(false)
346355
}
347356

357+
fn forward_watcher_event(
358+
tx: &mpsc::Sender<notify::Result<Event>>,
359+
shutting_down: &AtomicBool,
360+
result: notify::Result<Event>,
361+
) {
362+
if let Err(error) = tx.send(result)
363+
&& !shutting_down.load(Ordering::Relaxed)
364+
{
365+
error!(
366+
"failed to forward watcher event into runtime loop: {}",
367+
error
368+
);
369+
}
370+
}
371+
348372
#[cfg(test)]
349373
async fn run_workflow(
350374
config: &Config,
@@ -903,6 +927,7 @@ mod tests {
903927
calls: Vec<String>,
904928
changed_hooks: BTreeMap<String, bool>,
905929
workflow_errors: BTreeMap<String, String>,
930+
watching: bool,
906931
}
907932

908933
impl MockRuntimeAdapter {
@@ -911,6 +936,7 @@ mod tests {
911936
calls: Vec::new(),
912937
changed_hooks: BTreeMap::new(),
913938
workflow_errors: BTreeMap::new(),
939+
watching: false,
914940
}
915941
}
916942
}
@@ -953,6 +979,7 @@ mod tests {
953979

954980
async fn start_watching(&mut self) -> Result<()> {
955981
self.calls.push("watch".into());
982+
self.watching = true;
956983
Ok(())
957984
}
958985

@@ -971,6 +998,12 @@ mod tests {
971998
Ok(())
972999
}
9731000

1001+
async fn stop_watching(&mut self) -> Result<()> {
1002+
self.calls.push("stop_watch".into());
1003+
self.watching = false;
1004+
Ok(())
1005+
}
1006+
9741007
async fn stop_all_processes(&mut self) -> Result<()> {
9751008
self.calls.push("stop_all".into());
9761009
Ok(())
@@ -1055,11 +1088,29 @@ mod tests {
10551088
"poll:current_post_slug",
10561089
"workflow:publish_post_url:",
10571090
"log:received ctrl-c, shutting down",
1091+
"stop_watch",
10581092
"stop_all",
10591093
]
10601094
);
10611095
}
10621096

1097+
#[test]
1098+
fn forward_watcher_event_ignores_send_failures_after_shutdown() {
1099+
let (tx, rx) = mpsc::channel();
1100+
let shutdown = AtomicBool::new(true);
1101+
drop(rx);
1102+
1103+
forward_watcher_event(
1104+
&tx,
1105+
&shutdown,
1106+
Ok(Event {
1107+
kind: EventKind::Any,
1108+
paths: vec![PathBuf::from("content/layout.html")],
1109+
attrs: Default::default(),
1110+
}),
1111+
);
1112+
}
1113+
10631114
#[tokio::test]
10641115
async fn runtime_machine_does_not_run_observed_workflow_when_hook_state_is_unchanged() {
10651116
let mut config = Config {

0 commit comments

Comments
 (0)