diff --git a/README.md b/README.md index f2bc71f..93e3a9c 100644 --- a/README.md +++ b/README.md @@ -381,8 +381,10 @@ await tx.commit(); * Only one interruptible transaction can be active per database at a time * The write lock is held for the entire duration - keep transactions short * Uncommitted writes are visible only within the transaction's `read()` method - * Always commit or rollback - abandoned transactions will rollback automatically - on app exit + * If the transaction handle is dropped without calling `commit()` or + `rollback()`, the transaction is automatically rolled back and the write + connection is released back to the pool. This also happens on app exit + and on transaction timeout. To rollback instead of committing: @@ -784,6 +786,10 @@ println!("Transaction completed: {} statements executed", results.len()); For transactions that need to read data mid-transaction: +If `tx` is dropped without calling `commit()` or `rollback()` — including via +an early return from a `?` operator — the transaction is automatically rolled +back and the write connection is released back to the pool. + ```rust // Assuming user_id, product_id, item_total are defined in your application context let user_id = 123; diff --git a/crates/sqlx-sqlite-conn-mgr/src/database.rs b/crates/sqlx-sqlite-conn-mgr/src/database.rs index 90194ae..a48a86b 100644 --- a/crates/sqlx-sqlite-conn-mgr/src/database.rs +++ b/crates/sqlx-sqlite-conn-mgr/src/database.rs @@ -10,7 +10,7 @@ use sqlx::{ConnectOptions, Pool, Sqlite}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use tracing::error; +use tracing::{error, warn}; /// Analysis limit for PRAGMA optimize on close. /// SQLite recommends 100-1000 for older versions; 3.46.0+ handles automatically. @@ -177,12 +177,40 @@ impl SqliteDatabase { .read_only(false) .optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT); + // Defense-in-depth: when any writer is returned to the pool, issue + // ROLLBACK to discard any transaction that a caller may have left open + // (e.g., a writer dropped after BEGIN without COMMIT/ROLLBACK). SQLite + // only auto-rollbacks on connection close, not on pool return, so + // without this the next acquire_writer() sees "cannot start a + // transaction within a transaction". + // + // Error handling: the expected benign case on a clean connection is + // "cannot rollback - no transaction is active" — recycle normally. + // Anything else means ROLLBACK itself failed or the connection is + // wedged; tell the pool not to recycle so a broken connection isn't + // handed to the next caller. let write_conn = SqlitePoolOptions::new() .max_connections(1) .min_connections(0) .idle_timeout(Some(std::time::Duration::from_secs( config.idle_timeout_secs, ))) + .after_release(|conn, _meta| { + Box::pin(async move { + match sqlx::query("ROLLBACK").execute(&mut *conn).await { + Ok(_) => Ok(true), + Err(sqlx::Error::Database(e)) + if e.message().contains("no transaction is active") => + { + Ok(true) + } + Err(err) => { + warn!("after_release ROLLBACK failed, discarding connection: {err}"); + Ok(false) + } + } + }) + }) .connect_with(write_options) .await?; diff --git a/crates/sqlx-sqlite-toolkit/src/transactions.rs b/crates/sqlx-sqlite-toolkit/src/transactions.rs index 01cb709..fafc1ce 100644 --- a/crates/sqlx-sqlite-toolkit/src/transactions.rs +++ b/crates/sqlx-sqlite-toolkit/src/transactions.rs @@ -99,15 +99,28 @@ pub struct ActiveInterruptibleTransaction { transaction_id: String, writer: Option, created_at: Instant, + // Captured at construction so Drop can always spawn the rollback task on a + // valid runtime, even when the struct is dropped from a thread that has no + // tokio thread-local (e.g., Tauri teardown on the main thread). Without a + // stored handle, Drop's synchronous path through PoolConnection::Drop would + // call sqlx's rt::spawn and panic with "this functionality requires a Tokio + // context". + runtime_handle: tokio::runtime::Handle, } impl ActiveInterruptibleTransaction { + /// # Panics + /// + /// Panics if called outside a tokio runtime context. Both production call + /// sites (the plugin command handler and the direct Rust API) run inside + /// async functions, so this is a programming error, not a runtime risk. pub fn new(db_path: String, transaction_id: String, writer: TransactionWriter) -> Self { Self { db_path, transaction_id, writer: Some(writer), created_at: Instant::now(), + runtime_handle: tokio::runtime::Handle::current(), } } @@ -230,17 +243,62 @@ impl From<(String, Vec)> for Statement { } } +/// Upper bound on how long the auto-rollback task may hold the writer permit +/// before it is considered hung and the connection is abandoned. +const DROP_ROLLBACK_TIMEOUT: Duration = Duration::from_secs(5); + impl Drop for ActiveInterruptibleTransaction { fn drop(&mut self) { - // If writer is still present, it means commit/rollback wasn't called. - // SQLite will automatically ROLLBACK the transaction when the connection - // is returned to the pool if no explicit COMMIT was issued. - if self.writer.is_some() { - debug!( - "Dropping transaction for db: {}, tx_id: {} (will auto-rollback)", - self.db_path, self.transaction_id - ); - } + // If writer is still present, commit/rollback was not called. The connection + // is about to return to the pool — we must issue ROLLBACK explicitly because + // sqlx pools reuse the connection (SQLite only auto-rollbacks on close, not + // on pool return). Without this, the next acquire_writer() gets a connection + // with an open transaction and "BEGIN IMMEDIATE" fails. + let Some(mut writer) = self.writer.take() else { + return; + }; + let db_path = std::mem::take(&mut self.db_path); + let tx_id = std::mem::take(&mut self.transaction_id); + + debug!( + "Dropping transaction for db: {}, tx_id: {} (auto-rollback scheduled)", + db_path, tx_id + ); + + // No race with the next acquire_writer(): `writer` owns the PoolConnection + // (via WriteGuard / AttachedWriteGuard), which holds the single-writer + // permit. The permit is not released until `writer` drops at the end of + // this task — after ROLLBACK completes. The next acquire_writer() blocks + // on that permit, so it cannot see a connection with a still-open tx. + // + // The timeout bounds how long a pathological ROLLBACK (stuck I/O, a + // rogue busy lock) can keep the single-writer pool stalled. On timeout + // we drop `writer` inside the runtime; after_release then cleans up. + self.runtime_handle.spawn(async move { + let result = tokio::time::timeout(DROP_ROLLBACK_TIMEOUT, async { + if let Err(e) = writer.rollback().await { + warn!( + "auto-rollback on drop failed (db: {}, tx: {}): {}", + db_path, tx_id, e + ); + } + if let Err(e) = writer.detach_if_attached().await { + warn!( + "detach_all after auto-rollback failed (db: {}, tx: {}): {}", + db_path, tx_id, e + ); + } + // writer drops here — connection returns to pool clean + }) + .await; + + if result.is_err() { + warn!( + "auto-rollback on drop timed out after {:?} (db: {}, tx: {}) — pool's after_release hook will reconcile", + DROP_ROLLBACK_TIMEOUT, db_path, tx_id + ); + } + }); } } @@ -288,8 +346,10 @@ impl ActiveInterruptibleTransactions { Ok(()) } Entry::Occupied(mut e) => { - // If the existing transaction has expired, drop it (auto-rollback) and - // replace with the new one. + // If the existing transaction has expired, roll it back and replace + // with the new one. We rollback explicitly (rather than relying on + // Drop) so the writer is guaranteed to return to the pool clean + // before the caller tries to start a new transaction on it. if e.get().created_at.elapsed() >= self.timeout { warn!( "Evicting expired transaction for db: {} (age: {:?}, timeout: {:?})", @@ -297,8 +357,10 @@ impl ActiveInterruptibleTransactions { e.get().created_at.elapsed(), self.timeout, ); - // Drop the expired transaction (auto-rollback) before inserting the new one - let _expired = e.insert(tx); + let expired = e.insert(tx); + if let Err(err) = expired.rollback().await { + warn!("rollback of expired transaction failed (db: {db_path}): {err}"); + } Ok(()) } else { Err(Error::TransactionAlreadyActive(db_path)) @@ -308,26 +370,30 @@ impl ActiveInterruptibleTransactions { } pub async fn abort_all(&self) { - let mut txs = self.inner.lock().await; - debug!("Aborting {} active interruptible transaction(s)", txs.len()); - - for db_path in txs.keys() { + // Drain under the lock, then release it before awaiting rollbacks so we + // don't hold the mutex across a chain of awaits. + let drained: Vec<(String, ActiveInterruptibleTransaction)> = { + let mut txs = self.inner.lock().await; + debug!("Aborting {} active interruptible transaction(s)", txs.len()); + txs.drain().collect() + }; + + for (db_path, tx) in drained { debug!( - "Dropping interruptible transaction for database: {}", + "Rolling back interruptible transaction for database: {}", db_path ); + if let Err(err) = tx.rollback().await { + warn!("rollback during abort_all failed (db: {db_path}): {err}"); + } } - - // Clear all transactions to drop WriteGuards and release locks - // Dropping triggers auto-rollback via Drop trait - txs.clear(); } /// Remove and return transaction for commit/rollback. /// /// Returns `Err(Error::TransactionTimedOut)` if the transaction has exceeded the - /// configured timeout. The expired transaction is dropped (auto-rolled-back) in - /// that case. + /// configured timeout. The expired transaction is rolled back before the error + /// is returned. pub async fn remove( &self, db_path: &str, @@ -335,7 +401,6 @@ impl ActiveInterruptibleTransactions { ) -> Result { let mut txs = self.inner.lock().await; - // Validate token before removal let tx = txs .get(db_path) .ok_or_else(|| Error::NoActiveTransaction(db_path.to_string()))?; @@ -344,21 +409,27 @@ impl ActiveInterruptibleTransactions { return Err(Error::InvalidTransactionToken); } - // Check if the transaction has expired - if tx.created_at.elapsed() >= self.timeout { - warn!( - "Transaction timed out for db: {} (age: {:?}, timeout: {:?})", - db_path, - tx.created_at.elapsed(), - self.timeout, - ); - // Drop the expired transaction (auto-rollback via Drop) - txs.remove(db_path); - return Err(Error::TransactionTimedOut(db_path.to_string())); + // Happy path: not expired, hand it back to the caller. + if tx.created_at.elapsed() < self.timeout { + // Safe unwrap: we just confirmed the key exists above. + return Ok(txs.remove(db_path).unwrap()); } - // Safe unwrap: we just confirmed the key exists above - Ok(txs.remove(db_path).unwrap()) + // Expired: take it out, release the lock, then rollback without holding + // it so other callers aren't blocked on an unrelated cleanup. + warn!( + "Transaction timed out for db: {} (age: {:?}, timeout: {:?})", + db_path, + tx.created_at.elapsed(), + self.timeout, + ); + let expired = txs.remove(db_path).unwrap(); + drop(txs); + + if let Err(err) = expired.rollback().await { + warn!("rollback of timed-out transaction failed (db: {db_path}): {err}"); + } + Err(Error::TransactionTimedOut(db_path.to_string())) } } diff --git a/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs b/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs index f705902..3958fcd 100644 --- a/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs +++ b/crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs @@ -377,3 +377,141 @@ async fn test_execute_transaction_rollback_on_failure() { db.remove().await.unwrap(); } + +/// How long to wait for the post-drop rollback to complete before failing +/// the test. Comfortably larger than the drop-path rollback timeout, so a +/// regression (stuck rollback, leaked writer permit) fails fast rather than +/// hanging CI on the next `acquire_writer()`. +const DROP_TEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +/// Regression test for issue #46: +/// https://github.com/silvermine/tauri-plugin-sqlite/issues/46 +/// +/// Dropping an interruptible transaction without calling commit/rollback must +/// leave the shared write connection in a clean state so the next transaction +/// can start. Previously the writer was returned to the pool with an open +/// transaction, causing "cannot start a transaction within a transaction" on +/// the next BEGIN IMMEDIATE. +#[tokio::test] +async fn test_dropped_transaction_releases_write_connection() { + let (db, _temp) = create_test_db("test.db").await; + + db.execute( + "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)".into(), + vec![], + ) + .await + .unwrap(); + + // Start a transaction and drop it without commit/rollback. + { + let _tx = db + .begin_interruptible_transaction() + .execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Alice")], + )]) + .await + .unwrap(); + } + + // Starting a second interruptible transaction must succeed — the previous + // writer should have been rolled back and returned to the pool clean. + // Wrapped in a timeout so a regression in the drop path (stuck rollback, + // leaked permit) surfaces as a test failure rather than a hung CI. + let tx2 = tokio::time::timeout( + DROP_TEST_TIMEOUT, + db.begin_interruptible_transaction().execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Bob")], + )]), + ) + .await + .expect("second transaction should not be blocked by leaked writer") + .expect("second transaction should start on a clean connection"); + + tx2.commit().await.unwrap(); + + // Only Bob should be present — Alice's insert was rolled back on drop. + let rows = db + .fetch_all("SELECT name FROM users ORDER BY id".into(), vec![]) + .await + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("Bob")); + + db.remove().await.unwrap(); +} + +/// Dropping an interruptible transaction that attached a secondary database +/// must also release the attached-database lock, so the next transaction can +/// attach the same database cleanly. Exercises the `detach_if_attached` branch +/// of the drop path; a regression there would silently hang the next attach. +#[tokio::test] +async fn test_dropped_attached_transaction_releases_writer_and_detaches() { + let (main_db, _temp_main) = create_test_db("main.db").await; + let (attached_db, _temp_attached) = create_test_db("attached.db").await; + + main_db + .execute( + "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)".into(), + vec![], + ) + .await + .unwrap(); + + attached_db + .execute( + "CREATE TABLE archive (id INTEGER PRIMARY KEY, user_name TEXT)".into(), + vec![], + ) + .await + .unwrap(); + + let make_spec = || sqlx_sqlite_conn_mgr::AttachedSpec { + database: std::sync::Arc::clone(attached_db.inner_for_testing()), + schema_name: "archive".to_string(), + mode: sqlx_sqlite_conn_mgr::AttachedMode::ReadOnly, + }; + + // Start an attached transaction, then drop it without commit/rollback. + { + let _tx = main_db + .begin_interruptible_transaction() + .attach(vec![make_spec()]) + .execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Alice")], + )]) + .await + .unwrap(); + } + + // A second attached transaction must succeed: the drop path must have + // rolled back the tx, detached the attached db, and released the writer. + let tx2 = tokio::time::timeout( + DROP_TEST_TIMEOUT, + main_db + .begin_interruptible_transaction() + .attach(vec![make_spec()]) + .execute(vec![( + "INSERT INTO users (name) VALUES (?)", + vec![json!("Bob")], + )]), + ) + .await + .expect("second attached transaction should not be blocked by leaked writer or attach") + .expect("second attached transaction should start on a clean connection"); + + tx2.commit().await.unwrap(); + + let rows = main_db + .fetch_all("SELECT name FROM users ORDER BY id".into(), vec![]) + .await + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("Bob")); + + main_db.remove().await.unwrap(); + attached_db.remove().await.unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs index 7f3f51b..f1ff209 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,17 +29,23 @@ const DEFAULT_MAX_DATABASES: usize = 50; /// Tracks cleanup progress during app exit: 0 = not started, 1 = running, 2 = complete. static CLEANUP_STATE: AtomicU8 = AtomicU8::new(0); -/// Guarantees `CLEANUP_STATE` reaches `2` and `app_handle.exit(0)` fires even if the +/// Guarantees `CLEANUP_STATE` reaches `2` and `app_handle.exit(..)` fires even if the /// cleanup task panics. Without this, a panic would leave the state at `1` and subsequent /// user exit attempts would call `prevent_exit()` indefinitely. +/// +/// The exit code carried through is whatever the triggering `ExitRequested` carried — +/// `None` (user-initiated close) becomes `0`, `Some(n)` (programmatic +/// `app_handle.exit(n)`) is preserved so application-level exit codes survive the +/// cleanup detour. struct ExitGuard { app_handle: tauri::AppHandle, + exit_code: i32, } impl Drop for ExitGuard { fn drop(&mut self) { CLEANUP_STATE.store(2, Ordering::SeqCst); - self.app_handle.exit(0); + self.app_handle.exit(self.exit_code); } } @@ -321,27 +327,40 @@ impl Builder { .on_event(|app, event| { match event { RunEvent::ExitRequested { api, code, .. } => { - // Only intercept user-initiated exits (code is None). Programmatic - // exits via app_handle.exit() have Some(code) — let those through - // to avoid an infinite ExitRequested loop. - if code.is_some() { - return; - } - - // Claim cleanup ownership once. If another handler invocation won - // the race, keep exit prevented while its cleanup finishes. - if CLEANUP_STATE - .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - if CLEANUP_STATE.load(Ordering::SeqCst) == 1 { + // Claim cleanup ownership once. Three possible CLEANUP_STATE values: + // 0 → claim it, run cleanup + // 1 → cleanup already in progress (another invocation won the + // race). Keep exit prevented while it finishes. + // 2 → cleanup already complete; this ExitRequested is the + // re-exit fired by ExitGuard. Let it through unchanged. + // + // We deliberately do not skip programmatic exits (code.is_some()). + // A user-space app_handle.exit(N) — fatal-error handler, updater, + // Ctrl+C handler — would otherwise tear down plugin state with + // interruptible transactions still live in the map, and the + // captured-runtime Drop path on the toolkit side still relies on + // the runtime being up when it spawns the rollback. Running + // cleanup here is the clean path. + match CLEANUP_STATE.compare_exchange( + 0, + 1, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => {} + Err(2) => return, + Err(_) => { api.prevent_exit(); debug!("Exit requested while database cleanup is in progress"); + return; } - return; } - info!("App exit requested - cleaning up transactions and databases"); + let exit_code = code.unwrap_or(0); + info!( + "App exit requested (code={}) - cleaning up transactions and databases", + exit_code + ); // Prevent immediate exit so we can close connections and checkpoint WAL api.prevent_exit(); @@ -357,7 +376,7 @@ impl Builder { // then trigger a programmatic exit when done. ExitGuard ensures // CLEANUP_STATE reaches 2 and exit() fires even on panic. tauri::async_runtime::spawn(async move { - let _guard = ExitGuard { app_handle }; + let _guard = ExitGuard { app_handle, exit_code }; // Scope block: drops the RwLock write guard (from instances_clone) // before _guard fires exit(), whose RunEvent::Exit handler calls