Skip to content

Commit 149d47c

Browse files
committed
Acquire tokio mutex before spawn_blocking to avoid blocking pool threads while waiting for locks
Signed-off-by: Jorge Prendes <jorge.prendes@gmail.com>
1 parent 6f0fba2 commit 149d47c

1 file changed

Lines changed: 152 additions & 107 deletions

File tree

src/js-host-api/src/lib.rs

Lines changed: 152 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
2828
use napi::{tokio, Status};
2929
use napi_derive::napi;
3030
use serde_json::Value as JsonValue;
31-
use tokio::sync::oneshot;
31+
use tokio::sync::{oneshot, Mutex as AsyncMutex, OwnedMappedMutexGuard, OwnedMutexGuard};
3232

3333
// ── napi-rs wrapper architecture ──────────────────────────────────────
3434
//
@@ -47,6 +47,20 @@ use tokio::sync::oneshot;
4747
// `spawn_blocking` closure on a background thread — we clone the `Arc` so the
4848
// wrapper struct remains valid on the JS side while the Rust side works.
4949
//
50+
// ## Mutex choice: std vs tokio
51+
//
52+
// Most wrappers (`SandboxBuilderWrapper`, `ProtoJSSandboxWrapper`,
53+
// `JSSandboxWrapper`) use `std::sync::Mutex` since they only hold locks
54+
// briefly for quick operations.
55+
//
56+
// `LoadedJSSandboxWrapper` uses `tokio::sync::Mutex` because `call_handler()`
57+
// holds the lock for the **entire duration** of guest code execution
58+
// (potentially seconds). Using an async mutex allows:
59+
// - Async methods (`unload`, `dispose`) to `.lock().await` without blocking a thread
60+
// - Long-running methods (`call_handler`, `snapshot`, `restore`) to acquire the
61+
// lock asynchronously, then move the `OwnedMutexGuard` into `spawn_blocking`
62+
// for CPU-intensive guest execution
63+
//
5064
// ## Why `LoadedJSSandboxWrapper` stores fields outside the Mutex
5165
//
5266
// `call_handler()` holds the Mutex for the **entire duration** of
@@ -786,7 +800,7 @@ impl JSSandboxWrapper {
786800
let interrupt = loaded_sandbox.interrupt_handle();
787801
let poisoned_flag = Arc::new(AtomicBool::new(loaded_sandbox.poisoned()));
788802
Ok(LoadedJSSandboxWrapper {
789-
inner: Arc::new(Mutex::new(Some(loaded_sandbox))),
803+
inner: Arc::new(AsyncMutex::new(Some(loaded_sandbox))),
790804
interrupt,
791805
poisoned_flag,
792806
last_call_stats: Arc::new(ArcSwapOption::empty()),
@@ -836,7 +850,7 @@ impl JSSandboxWrapper {
836850
/// ```
837851
#[napi(js_name = "LoadedJSSandbox")]
838852
pub struct LoadedJSSandboxWrapper {
839-
inner: Arc<Mutex<Option<LoadedJSSandbox>>>,
853+
inner: Arc<AsyncMutex<Option<LoadedJSSandbox>>>,
840854

841855
/// Stored **outside** the Mutex so callers can `kill()` a running handler.
842856
///
@@ -875,6 +889,68 @@ pub struct LoadedJSSandboxWrapper {
875889
disposed_flag: Arc<AtomicBool>,
876890
}
877891

892+
type LoadedJSSandboxGuard = OwnedMappedMutexGuard<Option<LoadedJSSandbox>, LoadedJSSandbox>;
893+
894+
impl LoadedJSSandboxWrapper {
895+
/// Lock the async mutex and pass `f` an owned guard mapped to the inner sandbox, or return a consumed-state error if it has already been taken.
896+
async fn with_inner<R>(
897+
&self,
898+
f: impl AsyncFnOnce(LoadedJSSandboxGuard) -> napi::Result<R>,
899+
) -> napi::Result<R> {
900+
let sandbox = self.inner.clone().lock_owned().await;
901+
let sandbox = OwnedMutexGuard::try_map(sandbox, Option::as_mut)
902+
.map_err(|_| consumed_error("LoadedJSSandbox"))?;
903+
f(sandbox).await
904+
}
905+
906+
/// Lock the async mutex and borrow the inner value mutably, or return a consumed-state error if it has already been taken.
907+
/// The closure `f` will run using spawn_blocking, so it can perform long-running operations without
908+
/// blocking the Node.js event loop. This is the main way to interact with the inner `LoadedJSSandbox`.
909+
async fn with_blocking_inner<R: Send + 'static>(
910+
&self,
911+
f: impl FnOnce(LoadedJSSandboxGuard) -> napi::Result<R> + Send + 'static,
912+
) -> napi::Result<R> {
913+
self.with_inner(async move |sandbox| {
914+
tokio::task::spawn_blocking(move || f(sandbox))
915+
.await
916+
.map_err(join_error)?
917+
})
918+
.await
919+
}
920+
921+
/// Take ownership of the inner value, returning a consumed-state error if
922+
/// this instance has already been used.
923+
async fn take_inner_with<R>(
924+
&self,
925+
f: impl AsyncFnOnce(LoadedJSSandbox) -> napi::Result<R>,
926+
) -> napi::Result<R> {
927+
let sandbox = self
928+
.inner
929+
.lock()
930+
.await
931+
.take()
932+
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
933+
self.disposed_flag.store(true, Ordering::Release);
934+
f(sandbox).await
935+
}
936+
937+
/// Take ownership of the inner value, returning a consumed-state error if
938+
/// this instance has already been used.
939+
/// The closure `f` will run using spawn_blocking, so it can perform long-running operations without
940+
/// blocking the Node.js event loop. Use this for operations that consume the inner `LoadedJSSandbox`, such as `unload`.
941+
async fn take_blocking_inner_with<R: Send + 'static>(
942+
&self,
943+
f: impl FnOnce(LoadedJSSandbox) -> napi::Result<R> + Send + 'static,
944+
) -> napi::Result<R> {
945+
self.take_inner_with(async move |sandbox| {
946+
tokio::task::spawn_blocking(move || f(sandbox))
947+
.await
948+
.map_err(join_error)?
949+
})
950+
.await
951+
}
952+
}
953+
878954
#[napi]
879955
impl LoadedJSSandboxWrapper {
880956
/// Invoke a handler function with the given event data, optionally
@@ -939,7 +1015,6 @@ impl LoadedJSSandboxWrapper {
9391015
)));
9401016
}
9411017

942-
let inner = self.inner.clone();
9431018
let poisoned_flag = self.poisoned_flag.clone();
9441019
let last_call_stats_store = self.last_call_stats.clone();
9451020
let gc = options.gc;
@@ -950,68 +1025,64 @@ impl LoadedJSSandboxWrapper {
9501025
let event_json = serde_json::to_string(&event_data)
9511026
.map_err(|e| invalid_arg_error(&format!("Failed to serialize event: {e}")))?;
9521027

953-
let result_json = tokio::task::spawn_blocking(move || {
954-
let mut guard = inner.lock().map_err(|_| lock_error())?;
955-
let sandbox = guard
956-
.as_mut()
957-
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
958-
959-
// Dispatch to the appropriate Rust method based on whether
960-
// any monitor timeouts are specified.
961-
//
962-
// The three `handle_event_with_monitor` arms look duplicated, but
963-
// each constructs a different concrete monitor type (single or tuple).
964-
// The sealed `MonitorSet` trait is not object-safe, so we can't
965-
// erase the type behind a `dyn` — the match is structurally required.
966-
let result = match (wall_clock_timeout_ms, cpu_timeout_ms) {
967-
// No monitors — fast path
968-
(None, None) => sandbox
969-
.handle_event(handler_name, event_json, gc)
970-
.map_err(to_napi_error),
971-
// Both — tuple with OR semantics (recommended)
972-
(Some(wall_ms), Some(cpu_ms)) => {
973-
let monitor = (
974-
WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
975-
.map_err(to_napi_error)?,
976-
CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
977-
.map_err(to_napi_error)?,
978-
);
979-
sandbox
980-
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
981-
.map_err(to_napi_error)
982-
}
983-
// Wall-clock only
984-
(Some(wall_ms), None) => {
985-
let monitor = WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
986-
.map_err(to_napi_error)?;
1028+
let result_json = self
1029+
.with_blocking_inner(move |mut sandbox| {
1030+
// Dispatch to the appropriate Rust method based on whether
1031+
// any monitor timeouts are specified.
1032+
//
1033+
// The three `handle_event_with_monitor` arms look duplicated, but
1034+
// each constructs a different concrete monitor type (single or tuple).
1035+
// The sealed `MonitorSet` trait is not object-safe, so we can't
1036+
// erase the type behind a `dyn` — the match is structurally required.
1037+
let result = match (wall_clock_timeout_ms, cpu_timeout_ms) {
1038+
// No monitors — fast path
1039+
(None, None) => sandbox
1040+
.handle_event(handler_name, event_json, gc)
1041+
.map_err(to_napi_error),
1042+
// Both — tuple with OR semantics (recommended)
1043+
(Some(wall_ms), Some(cpu_ms)) => {
1044+
let monitor = (
1045+
WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
1046+
.map_err(to_napi_error)?,
1047+
CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
1048+
.map_err(to_napi_error)?,
1049+
);
1050+
sandbox
1051+
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
1052+
.map_err(to_napi_error)
1053+
}
1054+
// Wall-clock only
1055+
(Some(wall_ms), None) => {
1056+
let monitor = WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
1057+
.map_err(to_napi_error)?;
1058+
sandbox
1059+
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
1060+
.map_err(to_napi_error)
1061+
}
1062+
// CPU only
1063+
(None, Some(cpu_ms)) => {
1064+
let monitor = CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
1065+
.map_err(to_napi_error)?;
1066+
sandbox
1067+
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
1068+
.map_err(to_napi_error)
1069+
}
1070+
};
1071+
// Update poisoned flag while we hold the lock — keeps the getter
1072+
// lock-free so it never blocks the Node.js event loop.
1073+
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
1074+
1075+
// Copy execution stats while we still hold the lock.
1076+
last_call_stats_store.store(
9871077
sandbox
988-
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
989-
.map_err(to_napi_error)
990-
}
991-
// CPU only
992-
(None, Some(cpu_ms)) => {
993-
let monitor = CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
994-
.map_err(to_napi_error)?;
995-
sandbox
996-
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
997-
.map_err(to_napi_error)
998-
}
999-
};
1000-
// Update poisoned flag while we hold the lock — keeps the getter
1001-
// lock-free so it never blocks the Node.js event loop.
1002-
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
1078+
.last_call_stats()
1079+
.map(|s| Arc::new(CallStats::from(s))),
1080+
);
10031081

1004-
// Copy execution stats while we still hold the lock.
1005-
last_call_stats_store.store(
1006-
sandbox
1007-
.last_call_stats()
1008-
.map(|s| Arc::new(CallStats::from(s))),
1009-
);
1082+
result
1083+
})
1084+
.await?;
10101085

1011-
result
1012-
})
1013-
.await
1014-
.map_err(join_error)??;
10151086
// Parse the JSON string result back into a JS object
10161087
serde_json::from_str(&result_json).map_err(|e| {
10171088
hl_error(
@@ -1032,18 +1103,9 @@ impl LoadedJSSandboxWrapper {
10321103
/// @throws If already consumed
10331104
#[napi]
10341105
pub async fn unload(&self) -> napi::Result<JSSandboxWrapper> {
1035-
let inner = self.inner.clone();
1036-
let disposed = self.disposed_flag.clone();
1037-
let js_sandbox = tokio::task::spawn_blocking(move || {
1038-
let mut guard = inner.lock().map_err(|_| lock_error())?;
1039-
let loaded = guard
1040-
.take()
1041-
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
1042-
disposed.store(true, Ordering::Release);
1043-
loaded.unload().map_err(to_napi_error)
1044-
})
1045-
.await
1046-
.map_err(join_error)??;
1106+
let js_sandbox = self
1107+
.take_blocking_inner_with(|sandbox| sandbox.unload().map_err(to_napi_error))
1108+
.await?;
10471109
Ok(JSSandboxWrapper {
10481110
inner: Arc::new(Mutex::new(Some(js_sandbox))),
10491111
})
@@ -1145,19 +1207,15 @@ impl LoadedJSSandboxWrapper {
11451207
/// @throws If already consumed
11461208
#[napi]
11471209
pub async fn snapshot(&self) -> napi::Result<SnapshotWrapper> {
1148-
let inner = self.inner.clone();
11491210
let poisoned_flag = self.poisoned_flag.clone();
1150-
let snapshot = tokio::task::spawn_blocking(move || {
1151-
let mut guard = inner.lock().map_err(|_| lock_error())?;
1152-
let sandbox = guard
1153-
.as_mut()
1154-
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
1155-
let result = sandbox.snapshot().map_err(to_napi_error);
1156-
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
1157-
result
1158-
})
1159-
.await
1160-
.map_err(join_error)??;
1211+
1212+
let snapshot = self
1213+
.with_blocking_inner(move |mut sandbox| {
1214+
let result = sandbox.snapshot().map_err(to_napi_error);
1215+
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
1216+
result
1217+
})
1218+
.await?;
11611219
Ok(SnapshotWrapper { inner: snapshot })
11621220
}
11631221

@@ -1172,20 +1230,17 @@ impl LoadedJSSandboxWrapper {
11721230
/// @throws If the snapshot doesn't match this sandbox, or if consumed
11731231
#[napi]
11741232
pub async fn restore(&self, snapshot: &SnapshotWrapper) -> napi::Result<()> {
1175-
let inner = self.inner.clone();
11761233
let snap = snapshot.inner.clone();
11771234
let poisoned_flag = self.poisoned_flag.clone();
1178-
tokio::task::spawn_blocking(move || {
1179-
let mut guard = inner.lock().map_err(|_| lock_error())?;
1180-
let sandbox = guard
1181-
.as_mut()
1182-
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
1235+
1236+
self.with_blocking_inner(move |mut sandbox| {
11831237
let result = sandbox.restore(snap).map_err(to_napi_error);
11841238
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
11851239
result
11861240
})
1187-
.await
1188-
.map_err(join_error)?
1241+
.await?;
1242+
1243+
Ok(())
11891244
}
11901245

11911246
/// Eagerly release the underlying sandbox resources.
@@ -1198,18 +1253,8 @@ impl LoadedJSSandboxWrapper {
11981253
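/// Calling `dispose()` on an already-consumed sandbox is a no-op. NOTE(review): the body below propagates the consumed-state error from `take_inner_with`, so a second `dispose()` (or `dispose()` after `unload()`) now returns an error instead — confirm and restore the no-op behaviour.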
11991254
#[napi]
12001255
pub async fn dispose(&self) -> napi::Result<()> {
1201-
if self.disposed_flag.load(Ordering::Acquire) {
1202-
return Ok(());
1203-
}
1204-
let inner = self.inner.clone();
1205-
let disposed = self.disposed_flag.clone();
1206-
tokio::task::spawn_blocking(move || {
1207-
let _ = inner.lock().map_err(|_| lock_error())?.take();
1208-
disposed.store(true, Ordering::Release);
1209-
Ok(())
1210-
})
1211-
.await
1212-
.map_err(join_error)?
1256+
self.take_inner_with(async |_| Ok(())).await?;
1257+
Ok(())
12131258
}
12141259
}
12151260

0 commit comments

Comments
 (0)