From 2751290036e093dfe35062afe3a2c286a53515f3 Mon Sep 17 00:00:00 2001 From: Paul Morris <10599524+pmorris-dev@users.noreply.github.com> Date: Tue, 21 Apr 2026 21:57:20 -0400 Subject: [PATCH] fix: rollback on drop so write conn returns to pool clean (#46) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dropping an ActiveInterruptibleTransaction without commit/rollback left the write connection in the pool with an open transaction. sqlx pools reuse connections rather than closing them, so SQLite's close-time auto-rollback never fired, and the next acquire_writer() got a connection where BEGIN IMMEDIATE failed with "cannot start a transaction within a transaction". Drop now takes the writer and spawns a tokio task that issues ROLLBACK (and detach_if_attached) before the connection is released to the pool. The rollback task is bounded by a 5 s timeout so a stuck ROLLBACK cannot hold the single-writer permit indefinitely. To eliminate a panic risk at Drop time on threads without a tokio thread-local (notably Tauri teardown after a programmatic app_handle.exit(N) on the main thread), ActiveInterruptibleTransaction captures a tokio runtime handle at construction and uses it unconditionally in Drop. The insert()/remove()/abort_all() paths that previously relied on Drop now roll back explicitly. As defense-in-depth against any other writer path that might leak a transaction (e.g. user code that does BEGIN and returns early), the write pool gets an after_release hook that runs ROLLBACK. The hook treats "no transaction is active" as the expected benign case and instructs the pool to discard the connection on any other ROLLBACK failure, so a broken connection is not handed to the next caller. The plugin's ExitRequested handler no longer short-circuits on programmatic exits (Some(code)). 
Treating those the same as user-initiated exits means a user-space app_handle.exit(N) — fatal-error handler, auto-updater, Ctrl+C handler — triggers transaction and database cleanup instead of tearing down plugin state with live interruptible transactions still in the map. The original exit code is preserved through the cleanup detour via ExitGuard. CLEANUP_STATE == 2 is used to recognize the ExitGuard-driven re-exit and let it through. Adds a regression test that mirrors the reporter's MCVE plus a second test covering the attached-database drop path. Both wrap the second begin_interruptible_transaction in tokio::time::timeout so a regression in the drop path fails CI fast instead of hanging. Updates the README to document the auto-rollback-on-drop guarantee (including early returns via ?). Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 10 +- crates/sqlx-sqlite-conn-mgr/src/database.rs | 30 +++- .../sqlx-sqlite-toolkit/src/transactions.rs | 147 +++++++++++++----- .../tests/interruptible_transaction_tests.rs | 138 ++++++++++++++++ src/lib.rs | 57 ++++--- 5 files changed, 322 insertions(+), 60 deletions(-) diff --git a/README.md b/README.md index f2bc71f..93e3a9c 100644 --- a/README.md +++ b/README.md @@ -381,8 +381,10 @@ await tx.commit(); * Only one interruptible transaction can be active per database at a time * The write lock is held for the entire duration - keep transactions short * Uncommitted writes are visible only within the transaction's `read()` method - * Always commit or rollback - abandoned transactions will rollback automatically - on app exit + * If the transaction handle is dropped without calling `commit()` or + `rollback()`, the transaction is automatically rolled back and the write + connection is released back to the pool. This also happens on app exit + and on transaction timeout.
To rollback instead of committing: @@ -784,6 +786,10 @@ println!("Transaction completed: {} statements executed", results.len()); For transactions that need to read data mid-transaction: +If `tx` is dropped without calling `commit()` or `rollback()` — including via +an early return from a `?` operator — the transaction is automatically rolled +back and the write connection is released back to the pool. + ```rust // Assuming user_id, product_id, item_total are defined in your application context let user_id = 123; diff --git a/crates/sqlx-sqlite-conn-mgr/src/database.rs b/crates/sqlx-sqlite-conn-mgr/src/database.rs index 90194ae..a48a86b 100644 --- a/crates/sqlx-sqlite-conn-mgr/src/database.rs +++ b/crates/sqlx-sqlite-conn-mgr/src/database.rs @@ -10,7 +10,7 @@ use sqlx::{ConnectOptions, Pool, Sqlite}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use tracing::error; +use tracing::{error, warn}; /// Analysis limit for PRAGMA optimize on close. /// SQLite recommends 100-1000 for older versions; 3.46.0+ handles automatically. @@ -177,12 +177,40 @@ impl SqliteDatabase { .read_only(false) .optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT); + // Defense-in-depth: when any writer is returned to the pool, issue + // ROLLBACK to discard any transaction that a caller may have left open + // (e.g., a writer dropped after BEGIN without COMMIT/ROLLBACK). SQLite + // only auto-rollbacks on connection close, not on pool return, so + // without this the next acquire_writer() sees "cannot start a + // transaction within a transaction". + // + // Error handling: the expected benign case on a clean connection is + // "cannot rollback - no transaction is active" — recycle normally. + // Anything else means ROLLBACK itself failed or the connection is + // wedged; tell the pool not to recycle so a broken connection isn't + // handed to the next caller. 
let write_conn = SqlitePoolOptions::new() .max_connections(1) .min_connections(0) .idle_timeout(Some(std::time::Duration::from_secs( config.idle_timeout_secs, ))) + .after_release(|conn, _meta| { + Box::pin(async move { + match sqlx::query("ROLLBACK").execute(&mut *conn).await { + Ok(_) => Ok(true), + Err(sqlx::Error::Database(e)) + if e.message().contains("no transaction is active") => + { + Ok(true) + } + Err(err) => { + warn!("after_release ROLLBACK failed, discarding connection: {err}"); + Ok(false) + } + } + }) + }) .connect_with(write_options) .await?; diff --git a/crates/sqlx-sqlite-toolkit/src/transactions.rs b/crates/sqlx-sqlite-toolkit/src/transactions.rs index 01cb709..fafc1ce 100644 --- a/crates/sqlx-sqlite-toolkit/src/transactions.rs +++ b/crates/sqlx-sqlite-toolkit/src/transactions.rs @@ -99,15 +99,28 @@ pub struct ActiveInterruptibleTransaction { transaction_id: String, writer: Option, created_at: Instant, + // Captured at construction so Drop can always spawn the rollback task on a + // valid runtime, even when the struct is dropped from a thread that has no + // tokio thread-local (e.g., Tauri teardown on the main thread). Without a + // stored handle, Drop's synchronous path through PoolConnection::Drop would + // call sqlx's rt::spawn and panic with "this functionality requires a Tokio + // context". + runtime_handle: tokio::runtime::Handle, } impl ActiveInterruptibleTransaction { + /// # Panics + /// + /// Panics if called outside a tokio runtime context. Both production call + /// sites (the plugin command handler and the direct Rust API) run inside + /// async functions, so this is a programming error, not a runtime risk. 
pub fn new(db_path: String, transaction_id: String, writer: TransactionWriter) -> Self { Self { db_path, transaction_id, writer: Some(writer), created_at: Instant::now(), + runtime_handle: tokio::runtime::Handle::current(), } } @@ -230,17 +243,62 @@ impl From<(String, Vec)> for Statement { } } +/// Upper bound on how long the auto-rollback task may hold the writer permit +/// before it is considered hung and the connection is abandoned. +const DROP_ROLLBACK_TIMEOUT: Duration = Duration::from_secs(5); + impl Drop for ActiveInterruptibleTransaction { fn drop(&mut self) { - // If writer is still present, it means commit/rollback wasn't called. - // SQLite will automatically ROLLBACK the transaction when the connection - // is returned to the pool if no explicit COMMIT was issued. - if self.writer.is_some() { - debug!( - "Dropping transaction for db: {}, tx_id: {} (will auto-rollback)", - self.db_path, self.transaction_id - ); - } + // If writer is still present, commit/rollback was not called. The connection + // is about to return to the pool — we must issue ROLLBACK explicitly because + // sqlx pools reuse the connection (SQLite only auto-rollbacks on close, not + // on pool return). Without this, the next acquire_writer() gets a connection + // with an open transaction and "BEGIN IMMEDIATE" fails. + let Some(mut writer) = self.writer.take() else { + return; + }; + let db_path = std::mem::take(&mut self.db_path); + let tx_id = std::mem::take(&mut self.transaction_id); + + debug!( + "Dropping transaction for db: {}, tx_id: {} (auto-rollback scheduled)", + db_path, tx_id + ); + + // No race with the next acquire_writer(): `writer` owns the PoolConnection + // (via WriteGuard / AttachedWriteGuard), which holds the single-writer + // permit. The permit is not released until `writer` drops at the end of + // this task — after ROLLBACK completes. The next acquire_writer() blocks + // on that permit, so it cannot see a connection with a still-open tx. 
+ // + // The timeout bounds how long a pathological ROLLBACK (stuck I/O, a + // rogue busy lock) can keep the single-writer pool stalled. On timeout + // we drop `writer` inside the runtime; after_release then cleans up. + self.runtime_handle.spawn(async move { + let result = tokio::time::timeout(DROP_ROLLBACK_TIMEOUT, async { + if let Err(e) = writer.rollback().await { + warn!( + "auto-rollback on drop failed (db: {}, tx: {}): {}", + db_path, tx_id, e + ); + } + if let Err(e) = writer.detach_if_attached().await { + warn!( + "detach_all after auto-rollback failed (db: {}, tx: {}): {}", + db_path, tx_id, e + ); + } + // writer drops here — connection returns to pool clean + }) + .await; + + if result.is_err() { + warn!( + "auto-rollback on drop timed out after {:?} (db: {}, tx: {}) — pool's after_release hook will reconcile", + DROP_ROLLBACK_TIMEOUT, db_path, tx_id + ); + } + }); } } @@ -288,8 +346,10 @@ impl ActiveInterruptibleTransactions { Ok(()) } Entry::Occupied(mut e) => { - // If the existing transaction has expired, drop it (auto-rollback) and - // replace with the new one. + // If the existing transaction has expired, roll it back and replace + // with the new one. We rollback explicitly (rather than relying on + // Drop) so the writer is guaranteed to return to the pool clean + // before the caller tries to start a new transaction on it. 
if e.get().created_at.elapsed() >= self.timeout { warn!( "Evicting expired transaction for db: {} (age: {:?}, timeout: {:?})", @@ -297,8 +357,10 @@ impl ActiveInterruptibleTransactions { e.get().created_at.elapsed(), self.timeout, ); - // Drop the expired transaction (auto-rollback) before inserting the new one - let _expired = e.insert(tx); + let expired = e.insert(tx); + if let Err(err) = expired.rollback().await { + warn!("rollback of expired transaction failed (db: {db_path}): {err}"); + } Ok(()) } else { Err(Error::TransactionAlreadyActive(db_path)) @@ -308,26 +370,30 @@ impl ActiveInterruptibleTransactions { } pub async fn abort_all(&self) { - let mut txs = self.inner.lock().await; - debug!("Aborting {} active interruptible transaction(s)", txs.len()); - - for db_path in txs.keys() { + // Drain under the lock, then release it before awaiting rollbacks so we + // don't hold the mutex across a chain of awaits. + let drained: Vec<(String, ActiveInterruptibleTransaction)> = { + let mut txs = self.inner.lock().await; + debug!("Aborting {} active interruptible transaction(s)", txs.len()); + txs.drain().collect() + }; + + for (db_path, tx) in drained { debug!( - "Dropping interruptible transaction for database: {}", + "Rolling back interruptible transaction for database: {}", db_path ); + if let Err(err) = tx.rollback().await { + warn!("rollback during abort_all failed (db: {db_path}): {err}"); + } } - - // Clear all transactions to drop WriteGuards and release locks - // Dropping triggers auto-rollback via Drop trait - txs.clear(); } /// Remove and return transaction for commit/rollback. /// /// Returns `Err(Error::TransactionTimedOut)` if the transaction has exceeded the - /// configured timeout. The expired transaction is dropped (auto-rolled-back) in - /// that case. + /// configured timeout. The expired transaction is rolled back before the error + /// is returned. 
pub async fn remove( &self, db_path: &str, @@ -335,7 +401,6 @@ impl ActiveInterruptibleTransactions { ) -> Result { let mut txs = self.inner.lock().await; - // Validate token before removal let tx = txs .get(db_path) .ok_or_else(|| Error::NoActiveTransaction(db_path.to_string()))?; @@ -344,21 +409,27 @@ impl ActiveInterruptibleTransactions { return Err(Error::InvalidTransactionToken); } - // Check if the transaction has expired - if tx.created_at.elapsed() >= self.timeout { - warn!( - "Transaction timed out for db: {} (age: {:?}, timeout: {:?})", - db_path, - tx.created_at.elapsed(), - self.timeout, - ); - // Drop the expired transaction (auto-rollback via Drop) - txs.remove(db_path); - return Err(Error::TransactionTimedOut(db_path.to_string())); + // Happy path: not expired, hand it back to the caller. + if tx.created_at.elapsed() < self.timeout { + // Safe unwrap: we just confirmed the key exists above. + return Ok(txs.remove(db_path).unwrap()); } - // Safe unwrap: we just confirmed the key exists above - Ok(txs.remove(db_path).unwrap()) + // Expired: take it out, release the lock, then rollback without holding + // it so other callers aren't blocked on an unrelated cleanup. 
+ warn!( + "Transaction timed out for db: {} (age: {:?}, timeout: {:?})", + db_path, + tx.created_at.elapsed(), + self.timeout, + ); + let expired = txs.remove(db_path).unwrap(); + drop(txs); + + if let Err(err) = expired.rollback().await { + warn!("rollback of timed-out transaction failed (db: {db_path}): {err}"); + } + Err(Error::TransactionTimedOut(db_path.to_string())) } } diff --git a/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs b/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs index f705902..3958fcd 100644 --- a/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs +++ b/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs @@ -377,3 +377,141 @@ async fn test_execute_transaction_rollback_on_failure() { db.remove().await.unwrap(); } + +/// How long to wait for the post-drop rollback to complete before failing +/// the test. Comfortably larger than the drop-path rollback timeout, so a +/// regression (stuck rollback, leaked writer permit) fails fast rather than +/// hanging CI on the next `acquire_writer()`. +const DROP_TEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +/// Regression test for issue #46: +/// https://github.com/silvermine/tauri-plugin-sqlite/issues/46 +/// +/// Dropping an interruptible transaction without calling commit/rollback must +/// leave the shared write connection in a clean state so the next transaction +/// can start. Previously the writer was returned to the pool with an open +/// transaction, causing "cannot start a transaction within a transaction" on +/// the next BEGIN IMMEDIATE. +#[tokio::test] +async fn test_dropped_transaction_releases_write_connection() { + let (db, _temp) = create_test_db("test.db").await; + + db.execute( + "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)".into(), + vec![], + ) + .await + .unwrap(); + + // Start a transaction and drop it without commit/rollback. 
+ { + let _tx = db + .begin_interruptible_transaction() + .execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Alice")], + )]) + .await + .unwrap(); + } + + // Starting a second interruptible transaction must succeed — the previous + // writer should have been rolled back and returned to the pool clean. + // Wrapped in a timeout so a regression in the drop path (stuck rollback, + // leaked permit) surfaces as a test failure rather than a hung CI. + let tx2 = tokio::time::timeout( + DROP_TEST_TIMEOUT, + db.begin_interruptible_transaction().execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Bob")], + )]), + ) + .await + .expect("second transaction should not be blocked by leaked writer") + .expect("second transaction should start on a clean connection"); + + tx2.commit().await.unwrap(); + + // Only Bob should be present — Alice's insert was rolled back on drop. + let rows = db + .fetch_all("SELECT name FROM users ORDER BY id".into(), vec![]) + .await + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("Bob")); + + db.remove().await.unwrap(); +} + +/// Dropping an interruptible transaction that attached a secondary database +/// must also release the attached-database lock, so the next transaction can +/// attach the same database cleanly. Exercises the `detach_if_attached` branch +/// of the drop path; a regression there would silently hang the next attach. 
+#[tokio::test] +async fn test_dropped_attached_transaction_releases_writer_and_detaches() { + let (main_db, _temp_main) = create_test_db("main.db").await; + let (attached_db, _temp_attached) = create_test_db("attached.db").await; + + main_db + .execute( + "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)".into(), + vec![], + ) + .await + .unwrap(); + + attached_db + .execute( + "CREATE TABLE archive (id INTEGER PRIMARY KEY, user_name TEXT)".into(), + vec![], + ) + .await + .unwrap(); + + let make_spec = || sqlx_sqlite_conn_mgr::AttachedSpec { + database: std::sync::Arc::clone(attached_db.inner_for_testing()), + schema_name: "archive".to_string(), + mode: sqlx_sqlite_conn_mgr::AttachedMode::ReadOnly, + }; + + // Start an attached transaction, then drop it without commit/rollback. + { + let _tx = main_db + .begin_interruptible_transaction() + .attach(vec![make_spec()]) + .execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Alice")], + )]) + .await + .unwrap(); + } + + // A second attached transaction must succeed: the drop path must have + // rolled back the tx, detached the attached db, and released the writer. 
+ let tx2 = tokio::time::timeout( + DROP_TEST_TIMEOUT, + main_db + .begin_interruptible_transaction() + .attach(vec![make_spec()]) + .execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Bob")], + )]), + ) + .await + .expect("second attached transaction should not be blocked by leaked writer or attach") + .expect("second attached transaction should start on a clean connection"); + + tx2.commit().await.unwrap(); + + let rows = main_db + .fetch_all("SELECT name FROM users ORDER BY id".into(), vec![]) + .await + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("Bob")); + + main_db.remove().await.unwrap(); + attached_db.remove().await.unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs index 7f3f51b..f1ff209 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,17 +29,23 @@ const DEFAULT_MAX_DATABASES: usize = 50; /// Tracks cleanup progress during app exit: 0 = not started, 1 = running, 2 = complete. static CLEANUP_STATE: AtomicU8 = AtomicU8::new(0); -/// Guarantees `CLEANUP_STATE` reaches `2` and `app_handle.exit(0)` fires even if the +/// Guarantees `CLEANUP_STATE` reaches `2` and `app_handle.exit(..)` fires even if the /// cleanup task panics. Without this, a panic would leave the state at `1` and subsequent /// user exit attempts would call `prevent_exit()` indefinitely. +/// +/// The exit code carried through is whatever the triggering `ExitRequested` carried — +/// `None` (user-initiated close) becomes `0`, `Some(n)` (programmatic +/// `app_handle.exit(n)`) is preserved so application-level exit codes survive the +/// cleanup detour. struct ExitGuard { app_handle: tauri::AppHandle, + exit_code: i32, } impl Drop for ExitGuard { fn drop(&mut self) { CLEANUP_STATE.store(2, Ordering::SeqCst); - self.app_handle.exit(0); + self.app_handle.exit(self.exit_code); } } @@ -321,27 +327,40 @@ impl Builder { .on_event(|app, event| { match event { RunEvent::ExitRequested { api, code, .. 
} => { - // Only intercept user-initiated exits (code is None). Programmatic - // exits via app_handle.exit() have Some(code) — let those through - // to avoid an infinite ExitRequested loop. - if code.is_some() { - return; - } - - // Claim cleanup ownership once. If another handler invocation won - // the race, keep exit prevented while its cleanup finishes. - if CLEANUP_STATE - .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - if CLEANUP_STATE.load(Ordering::SeqCst) == 1 { + // Claim cleanup ownership once. Three possible CLEANUP_STATE values: + // 0 → claim it, run cleanup + // 1 → cleanup already in progress (another invocation won the + // race). Keep exit prevented while it finishes. + // 2 → cleanup already complete; this ExitRequested is the + // re-exit fired by ExitGuard. Let it through unchanged. + // + // We deliberately do not skip programmatic exits (code.is_some()). + // A user-space app_handle.exit(N) — fatal-error handler, updater, + // Ctrl+C handler — would otherwise tear down plugin state with + // interruptible transactions still live in the map, and the + // captured-runtime Drop path on the toolkit side still relies on + // the runtime being up when it spawns the rollback. Running + // cleanup here is the clean path. + match CLEANUP_STATE.compare_exchange( + 0, + 1, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => {} + Err(2) => return, + Err(_) => { api.prevent_exit(); debug!("Exit requested while database cleanup is in progress"); + return; } - return; } - info!("App exit requested - cleaning up transactions and databases"); + let exit_code = code.unwrap_or(0); + info!( + "App exit requested (code={}) - cleaning up transactions and databases", + exit_code + ); // Prevent immediate exit so we can close connections and checkpoint WAL api.prevent_exit(); @@ -357,7 +376,7 @@ impl Builder { // then trigger a programmatic exit when done. 
ExitGuard ensures // CLEANUP_STATE reaches 2 and exit() fires even on panic. tauri::async_runtime::spawn(async move { - let _guard = ExitGuard { app_handle }; + let _guard = ExitGuard { app_handle, exit_code }; // Scope block: drops the RwLock write guard (from instances_clone) // before _guard fires exit(), whose RunEvent::Exit handler calls