Skip to content

Commit 0d9b8b8

Browse files
committed
fix: rollback on drop so write conn returns to pool clean (#46)
Dropping an ActiveInterruptibleTransaction without commit/rollback left the write connection in the pool with an open transaction. sqlx pools reuse connections rather than closing them, so SQLite's close-time auto-rollback never fired, and the next acquire_writer() got a connection where BEGIN IMMEDIATE failed with "cannot start a transaction within a transaction". Drop now takes the writer and spawns a tokio task that issues ROLLBACK (and detach_if_attached) before the connection is released to the pool. The insert()/remove()/abort_all() paths that previously relied on Drop now roll back explicitly. As defense-in-depth against any other writer path that might leak a transaction (e.g. user code that does BEGIN and returns early), the write pool gets an after_release hook that unconditionally runs ROLLBACK, ignoring the benign "no transaction is active" error. Adds a regression test that mirrors the reporter's MCVE: start an interruptible transaction, drop it, start a second one, and assert the second succeeds and only its data commits. Updates the README to document the auto-rollback-on-drop guarantee (including early returns via ?).
1 parent 240eb77 commit 0d9b8b8

4 files changed

Lines changed: 170 additions & 47 deletions

File tree

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,10 @@ await tx.commit();
381381
* Only one interruptible transaction can be active per database at a time
382382
* The write lock is held for the entire duration - keep transactions short
383383
* Uncommitted writes are visible only within the transaction's `read()` method
384-
* Always commit or rollback - abandoned transactions will rollback automatically
385-
on app exit
384+
* If the transaction handle is dropped without calling `commit()` or
385+
`rollback()`, the transaction is automatically rolled back and the write
386+
connection is released back to the pool. This also happens on app exit
387+
and on transaction timeout.
386388

387389
To rollback instead of committing:
388390

@@ -784,6 +786,10 @@ println!("Transaction completed: {} statements executed", results.len());
784786

785787
For transactions that need to read data mid-transaction:
786788

789+
If `tx` is dropped without calling `commit()` or `rollback()` — including via
790+
an early return from a `?` operator — the transaction is automatically rolled
791+
back and the write connection is released back to the pool.
792+
787793
```rust
788794
// Assuming user_id, product_id, item_total are defined in your application context
789795
let user_id = 123;

crates/sqlx-sqlite-conn-mgr/src/database.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,26 @@ impl SqliteDatabase {
177177
.read_only(false)
178178
.optimize_on_close(true, OPTIMIZE_ANALYSIS_LIMIT);
179179

180+
// Defense-in-depth: when any writer is returned to the pool, issue
181+
// ROLLBACK to discard any transaction that a caller may have left open
182+
// (e.g., a writer dropped after BEGIN without COMMIT/ROLLBACK). SQLite
183+
// only auto-rollbacks on connection close, not on pool return, so
184+
// without this the next acquire_writer() sees "cannot start a
185+
// transaction within a transaction". The "cannot rollback - no
186+
// transaction is active" error on a clean connection is benign and
187+
// ignored.
180188
let write_conn = SqlitePoolOptions::new()
181189
.max_connections(1)
182190
.min_connections(0)
183191
.idle_timeout(Some(std::time::Duration::from_secs(
184192
config.idle_timeout_secs,
185193
)))
194+
.after_release(|conn, _meta| {
195+
Box::pin(async move {
196+
let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
197+
Ok(true)
198+
})
199+
})
186200
.connect_with(write_options)
187201
.await?;
188202

crates/sqlx-sqlite-toolkit/src/transactions.rs

Lines changed: 93 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -232,14 +232,47 @@ impl From<(String, Vec<JsonValue>)> for Statement {
232232

233233
impl Drop for ActiveInterruptibleTransaction {
234234
fn drop(&mut self) {
235-
// If writer is still present, it means commit/rollback wasn't called.
236-
// SQLite will automatically ROLLBACK the transaction when the connection
237-
// is returned to the pool if no explicit COMMIT was issued.
238-
if self.writer.is_some() {
239-
debug!(
240-
"Dropping transaction for db: {}, tx_id: {} (will auto-rollback)",
241-
self.db_path, self.transaction_id
242-
);
235+
// If writer is still present, commit/rollback was not called. The connection
236+
// is about to return to the pool — we must issue ROLLBACK explicitly because
237+
// sqlx pools reuse the connection (SQLite only auto-rollbacks on close, not
238+
// on pool return). Without this, the next acquire_writer() gets a connection
239+
// with an open transaction and "BEGIN IMMEDIATE" fails.
240+
let Some(mut writer) = self.writer.take() else {
241+
return;
242+
};
243+
let db_path = std::mem::take(&mut self.db_path);
244+
let tx_id = std::mem::take(&mut self.transaction_id);
245+
246+
match tokio::runtime::Handle::try_current() {
247+
Ok(handle) => {
248+
debug!(
249+
"Dropping transaction for db: {}, tx_id: {} (auto-rollback scheduled)",
250+
db_path, tx_id
251+
);
252+
handle.spawn(async move {
253+
if let Err(e) = writer.rollback().await {
254+
warn!(
255+
"auto-rollback on drop failed (db: {}, tx: {}): {}",
256+
db_path, tx_id, e
257+
);
258+
}
259+
if let Err(e) = writer.detach_if_attached().await {
260+
warn!(
261+
"detach_all after auto-rollback failed (db: {}, tx: {}): {}",
262+
db_path, tx_id, e
263+
);
264+
}
265+
// writer drops here — connection returns to pool clean
266+
});
267+
}
268+
Err(_) => {
269+
warn!(
270+
"no tokio runtime at drop for db: {}, tx: {} — write pool after_release hook will clean up",
271+
db_path, tx_id
272+
);
273+
// Writer drops synchronously; the write pool's after_release hook will
274+
// issue ROLLBACK before the connection is handed out again.
275+
}
243276
}
244277
}
245278
}
@@ -288,17 +321,21 @@ impl ActiveInterruptibleTransactions {
288321
Ok(())
289322
}
290323
Entry::Occupied(mut e) => {
291-
// If the existing transaction has expired, drop it (auto-rollback) and
292-
// replace with the new one.
324+
// If the existing transaction has expired, roll it back and replace
325+
// with the new one. We rollback explicitly (rather than relying on
326+
// Drop) so the writer is guaranteed to return to the pool clean
327+
// before the caller tries to start a new transaction on it.
293328
if e.get().created_at.elapsed() >= self.timeout {
294329
warn!(
295330
"Evicting expired transaction for db: {} (age: {:?}, timeout: {:?})",
296331
db_path,
297332
e.get().created_at.elapsed(),
298333
self.timeout,
299334
);
300-
// Drop the expired transaction (auto-rollback) before inserting the new one
301-
let _expired = e.insert(tx);
335+
let expired = e.insert(tx);
336+
if let Err(err) = expired.rollback().await {
337+
warn!("rollback of expired transaction failed (db: {db_path}): {err}");
338+
}
302339
Ok(())
303340
} else {
304341
Err(Error::TransactionAlreadyActive(db_path))
@@ -308,57 +345,68 @@ impl ActiveInterruptibleTransactions {
308345
}
309346

310347
pub async fn abort_all(&self) {
311-
let mut txs = self.inner.lock().await;
312-
debug!("Aborting {} active interruptible transaction(s)", txs.len());
313-
314-
for db_path in txs.keys() {
348+
// Drain under the lock, then release it before awaiting rollbacks so we
349+
// don't hold the mutex across network of awaits.
350+
let drained: Vec<(String, ActiveInterruptibleTransaction)> = {
351+
let mut txs = self.inner.lock().await;
352+
debug!("Aborting {} active interruptible transaction(s)", txs.len());
353+
txs.drain().collect()
354+
};
355+
356+
for (db_path, tx) in drained {
315357
debug!(
316-
"Dropping interruptible transaction for database: {}",
358+
"Rolling back interruptible transaction for database: {}",
317359
db_path
318360
);
361+
if let Err(err) = tx.rollback().await {
362+
warn!("rollback during abort_all failed (db: {db_path}): {err}");
363+
}
319364
}
320-
321-
// Clear all transactions to drop WriteGuards and release locks
322-
// Dropping triggers auto-rollback via Drop trait
323-
txs.clear();
324365
}
325366

326367
/// Remove and return transaction for commit/rollback.
327368
///
328369
/// Returns `Err(Error::TransactionTimedOut)` if the transaction has exceeded the
329-
/// configured timeout. The expired transaction is dropped (auto-rolled-back) in
330-
/// that case.
370+
/// configured timeout. The expired transaction is rolled back before the error
371+
/// is returned.
331372
pub async fn remove(
332373
&self,
333374
db_path: &str,
334375
token_id: &str,
335376
) -> Result<ActiveInterruptibleTransaction> {
336-
let mut txs = self.inner.lock().await;
377+
// Take the expired transaction out under the lock, then rollback without
378+
// holding it so other callers don't block on an unrelated cleanup.
379+
let expired: Option<ActiveInterruptibleTransaction> = {
380+
let mut txs = self.inner.lock().await;
337381

338-
// Validate token before removal
339-
let tx = txs
340-
.get(db_path)
341-
.ok_or_else(|| Error::NoActiveTransaction(db_path.to_string()))?;
382+
let tx = txs
383+
.get(db_path)
384+
.ok_or_else(|| Error::NoActiveTransaction(db_path.to_string()))?;
342385

343-
if tx.transaction_id() != token_id {
344-
return Err(Error::InvalidTransactionToken);
345-
}
386+
if tx.transaction_id() != token_id {
387+
return Err(Error::InvalidTransactionToken);
388+
}
346389

347-
// Check if the transaction has expired
348-
if tx.created_at.elapsed() >= self.timeout {
349-
warn!(
350-
"Transaction timed out for db: {} (age: {:?}, timeout: {:?})",
351-
db_path,
352-
tx.created_at.elapsed(),
353-
self.timeout,
354-
);
355-
// Drop the expired transaction (auto-rollback via Drop)
356-
txs.remove(db_path);
357-
return Err(Error::TransactionTimedOut(db_path.to_string()));
358-
}
390+
if tx.created_at.elapsed() >= self.timeout {
391+
warn!(
392+
"Transaction timed out for db: {} (age: {:?}, timeout: {:?})",
393+
db_path,
394+
tx.created_at.elapsed(),
395+
self.timeout,
396+
);
397+
txs.remove(db_path)
398+
} else {
399+
// Safe unwrap: we just confirmed the key exists above
400+
return Ok(txs.remove(db_path).unwrap());
401+
}
402+
};
359403

360-
// Safe unwrap: we just confirmed the key exists above
361-
Ok(txs.remove(db_path).unwrap())
404+
if let Some(tx) = expired
405+
&& let Err(err) = tx.rollback().await
406+
{
407+
warn!("rollback of timed-out transaction failed (db: {db_path}): {err}");
408+
}
409+
Err(Error::TransactionTimedOut(db_path.to_string()))
362410
}
363411
}
364412

crates/sqlx-sqlite-toolkit/tests/interruptible_transaction_tests.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,3 +377,58 @@ async fn test_execute_transaction_rollback_on_failure() {
377377

378378
db.remove().await.unwrap();
379379
}
380+
381+
/// Regression test for issue #46:
382+
/// https://github.com/silvermine/tauri-plugin-sqlite/issues/46
383+
///
384+
/// Dropping an interruptible transaction without calling commit/rollback must
385+
/// leave the shared write connection in a clean state so the next transaction
386+
/// can start. Previously the writer was returned to the pool with an open
387+
/// transaction, causing "cannot start a transaction within a transaction" on
388+
/// the next BEGIN IMMEDIATE.
389+
#[tokio::test]
390+
async fn test_dropped_transaction_releases_write_connection() {
391+
let (db, _temp) = create_test_db("test.db").await;
392+
393+
db.execute(
394+
"CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)".into(),
395+
vec![],
396+
)
397+
.await
398+
.unwrap();
399+
400+
// Start a transaction and drop it without commit/rollback.
401+
{
402+
let _tx = db
403+
.begin_interruptible_transaction()
404+
.execute(vec![(
405+
"INSERT INTO users (name) VALUES (?)",
406+
vec![json!("Alice")],
407+
)])
408+
.await
409+
.unwrap();
410+
}
411+
412+
// Starting a second interruptible transaction must succeed — the previous
413+
// writer should have been rolled back and returned to the pool clean.
414+
let tx2 = db
415+
.begin_interruptible_transaction()
416+
.execute(vec![(
417+
"INSERT INTO users (name) VALUES (?)",
418+
vec![json!("Bob")],
419+
)])
420+
.await
421+
.expect("second transaction should start on a clean connection");
422+
423+
tx2.commit().await.unwrap();
424+
425+
// Only Bob should be present — Alice's insert was rolled back on drop.
426+
let rows = db
427+
.fetch_all("SELECT name FROM users ORDER BY id".into(), vec![])
428+
.await
429+
.unwrap();
430+
assert_eq!(rows.len(), 1);
431+
assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("Bob"));
432+
433+
db.remove().await.unwrap();
434+
}

0 commit comments

Comments
 (0)