Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,10 @@ await tx.commit();
* Only one interruptible transaction can be active per database at a time
* The write lock is held for the entire duration - keep transactions short
* Uncommitted writes are visible only within the transaction's `read()` method
* Always commit or rollback - abandoned transactions will rollback automatically
on app exit
* If the transaction handle is dropped without calling `commit()` or
`rollback()`, the transaction is automatically rolled back and the write
connection is released back to the pool. This also happens on app exit
and on transaction timeout.

To rollback instead of committing:

Expand Down Expand Up @@ -784,6 +786,10 @@ println!("Transaction completed: {} statements executed", results.len());

For transactions that need to read data mid-transaction:

If `tx` is dropped without calling `commit()` or `rollback()` — including via
an early return from a `?` operator — the transaction is automatically rolled
back and the write connection is released back to the pool.

```rust
// Assuming user_id, product_id, item_total are defined in your application context
let user_id = 123;
Expand Down
30 changes: 29 additions & 1 deletion crates/sqlx-sqlite-conn-mgr/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sqlx::{ConnectOptions, Pool, Sqlite};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tracing::error;
use tracing::{error, warn};

/// Analysis limit for PRAGMA optimize on close.
/// SQLite recommends 100-1000 for older versions; 3.46.0+ handles automatically.
Expand Down Expand Up @@ -177,12 +177,40 @@ impl SqliteDatabase {
.read_only(false)
.optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT);

// Defense-in-depth: when any writer is returned to the pool, issue
// ROLLBACK to discard any transaction that a caller may have left open
// (e.g., a writer dropped after BEGIN without COMMIT/ROLLBACK). SQLite
// only auto-rollbacks on connection close, not on pool return, so
// without this the next acquire_writer() sees "cannot start a
// transaction within a transaction".
//
// Error handling: the expected benign case on a clean connection is
// "cannot rollback - no transaction is active" — recycle normally.
// Anything else means ROLLBACK itself failed or the connection is
// wedged; tell the pool not to recycle so a broken connection isn't
// handed to the next caller.
let write_conn = SqlitePoolOptions::new()
.max_connections(1)
.min_connections(0)
.idle_timeout(Some(std::time::Duration::from_secs(
config.idle_timeout_secs,
)))
.after_release(|conn, _meta| {
Box::pin(async move {
match sqlx::query("ROLLBACK").execute(&mut *conn).await {
Ok(_) => Ok(true),
Err(sqlx::Error::Database(e))
if e.message().contains("no transaction is active") =>
{
Ok(true)
}
Err(err) => {
warn!("after_release ROLLBACK failed, discarding connection: {err}");
Ok(false)
}
}
})
})
.connect_with(write_options)
.await?;

Expand Down
147 changes: 109 additions & 38 deletions crates/sqlx-sqlite-toolkit/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,28 @@ pub struct ActiveInterruptibleTransaction {
transaction_id: String,
writer: Option<TransactionWriter>,
created_at: Instant,
// Captured at construction so Drop can always spawn the rollback task on a
// valid runtime, even when the struct is dropped from a thread that has no
// tokio thread-local (e.g., Tauri teardown on the main thread). Without a
// stored handle, Drop's synchronous path through PoolConnection::Drop would
// call sqlx's rt::spawn and panic with "this functionality requires a Tokio
// context".
runtime_handle: tokio::runtime::Handle,
}

impl ActiveInterruptibleTransaction {
/// # Panics
///
/// Panics if called outside a tokio runtime context. Both production call
/// sites (the plugin command handler and the direct Rust API) run inside
/// async functions, so this is a programming error, not a runtime risk.
pub fn new(db_path: String, transaction_id: String, writer: TransactionWriter) -> Self {
Self {
db_path,
transaction_id,
writer: Some(writer),
created_at: Instant::now(),
runtime_handle: tokio::runtime::Handle::current(),
}
}

Expand Down Expand Up @@ -230,17 +243,62 @@ impl From<(String, Vec<JsonValue>)> for Statement {
}
}

/// Upper bound on how long the auto-rollback task may hold the writer permit
/// before it is considered hung and the connection is abandoned.
const DROP_ROLLBACK_TIMEOUT: Duration = Duration::from_secs(5);

impl Drop for ActiveInterruptibleTransaction {
fn drop(&mut self) {
// If writer is still present, it means commit/rollback wasn't called.
// SQLite will automatically ROLLBACK the transaction when the connection
// is returned to the pool if no explicit COMMIT was issued.
if self.writer.is_some() {
debug!(
"Dropping transaction for db: {}, tx_id: {} (will auto-rollback)",
self.db_path, self.transaction_id
);
}
// If writer is still present, commit/rollback was not called. The connection
// is about to return to the pool — we must issue ROLLBACK explicitly because
// sqlx pools reuse the connection (SQLite only auto-rollbacks on close, not
// on pool return). Without this, the next acquire_writer() gets a connection
// with an open transaction and "BEGIN IMMEDIATE" fails.
let Some(mut writer) = self.writer.take() else {
return;
};
let db_path = std::mem::take(&mut self.db_path);
let tx_id = std::mem::take(&mut self.transaction_id);

debug!(
"Dropping transaction for db: {}, tx_id: {} (auto-rollback scheduled)",
db_path, tx_id
);

// No race with the next acquire_writer(): `writer` owns the PoolConnection
// (via WriteGuard / AttachedWriteGuard), which holds the single-writer
// permit. The permit is not released until `writer` drops at the end of
// this task — after ROLLBACK completes. The next acquire_writer() blocks
// on that permit, so it cannot see a connection with a still-open tx.
//
// The timeout bounds how long a pathological ROLLBACK (stuck I/O, a
// rogue busy lock) can keep the single-writer pool stalled. On timeout
// we drop `writer` inside the runtime; after_release then cleans up.
self.runtime_handle.spawn(async move {
let result = tokio::time::timeout(DROP_ROLLBACK_TIMEOUT, async {
if let Err(e) = writer.rollback().await {
warn!(
"auto-rollback on drop failed (db: {}, tx: {}): {}",
db_path, tx_id, e
);
}
if let Err(e) = writer.detach_if_attached().await {
warn!(
"detach_all after auto-rollback failed (db: {}, tx: {}): {}",
db_path, tx_id, e
);
}
// writer drops here — connection returns to pool clean
})
.await;

if result.is_err() {
warn!(
"auto-rollback on drop timed out after {:?} (db: {}, tx: {}) — pool's after_release hook will reconcile",
DROP_ROLLBACK_TIMEOUT, db_path, tx_id
);
Comment thread
pmorris-dev marked this conversation as resolved.
}
});
}
}

Expand Down Expand Up @@ -288,17 +346,21 @@ impl ActiveInterruptibleTransactions {
Ok(())
}
Entry::Occupied(mut e) => {
// If the existing transaction has expired, drop it (auto-rollback) and
// replace with the new one.
// If the existing transaction has expired, roll it back and replace
// with the new one. We rollback explicitly (rather than relying on
// Drop) so the writer is guaranteed to return to the pool clean
// before the caller tries to start a new transaction on it.
if e.get().created_at.elapsed() >= self.timeout {
warn!(
"Evicting expired transaction for db: {} (age: {:?}, timeout: {:?})",
db_path,
e.get().created_at.elapsed(),
self.timeout,
);
// Drop the expired transaction (auto-rollback) before inserting the new one
let _expired = e.insert(tx);
let expired = e.insert(tx);
if let Err(err) = expired.rollback().await {
warn!("rollback of expired transaction failed (db: {db_path}): {err}");
}
Ok(())
} else {
Err(Error::TransactionAlreadyActive(db_path))
Expand All @@ -308,34 +370,37 @@ impl ActiveInterruptibleTransactions {
}

pub async fn abort_all(&self) {
let mut txs = self.inner.lock().await;
debug!("Aborting {} active interruptible transaction(s)", txs.len());

for db_path in txs.keys() {
// Drain under the lock, then release it before awaiting rollbacks so we
// don't hold the mutex across a chain of awaits.
let drained: Vec<(String, ActiveInterruptibleTransaction)> = {
let mut txs = self.inner.lock().await;
debug!("Aborting {} active interruptible transaction(s)", txs.len());
txs.drain().collect()
};

for (db_path, tx) in drained {
debug!(
"Dropping interruptible transaction for database: {}",
"Rolling back interruptible transaction for database: {}",
db_path
);
if let Err(err) = tx.rollback().await {
warn!("rollback during abort_all failed (db: {db_path}): {err}");
}
}

// Clear all transactions to drop WriteGuards and release locks
// Dropping triggers auto-rollback via Drop trait
txs.clear();
}

/// Remove and return transaction for commit/rollback.
///
/// Returns `Err(Error::TransactionTimedOut)` if the transaction has exceeded the
/// configured timeout. The expired transaction is dropped (auto-rolled-back) in
/// that case.
/// configured timeout. The expired transaction is rolled back before the error
/// is returned.
pub async fn remove(
&self,
db_path: &str,
token_id: &str,
) -> Result<ActiveInterruptibleTransaction> {
let mut txs = self.inner.lock().await;

// Validate token before removal
let tx = txs
.get(db_path)
.ok_or_else(|| Error::NoActiveTransaction(db_path.to_string()))?;
Expand All @@ -344,21 +409,27 @@ impl ActiveInterruptibleTransactions {
return Err(Error::InvalidTransactionToken);
}

// Check if the transaction has expired
if tx.created_at.elapsed() >= self.timeout {
warn!(
"Transaction timed out for db: {} (age: {:?}, timeout: {:?})",
db_path,
tx.created_at.elapsed(),
self.timeout,
);
// Drop the expired transaction (auto-rollback via Drop)
txs.remove(db_path);
return Err(Error::TransactionTimedOut(db_path.to_string()));
// Happy path: not expired, hand it back to the caller.
if tx.created_at.elapsed() < self.timeout {
// Safe unwrap: we just confirmed the key exists above.
return Ok(txs.remove(db_path).unwrap());
}

// Safe unwrap: we just confirmed the key exists above
Ok(txs.remove(db_path).unwrap())
// Expired: take it out, release the lock, then rollback without holding
// it so other callers aren't blocked on an unrelated cleanup.
warn!(
"Transaction timed out for db: {} (age: {:?}, timeout: {:?})",
db_path,
tx.created_at.elapsed(),
self.timeout,
);
let expired = txs.remove(db_path).unwrap();
drop(txs);

if let Err(err) = expired.rollback().await {
warn!("rollback of timed-out transaction failed (db: {db_path}): {err}");
}
Err(Error::TransactionTimedOut(db_path.to_string()))
}
}

Expand Down
Loading
Loading