Reactive change notifications for SQLite databases using sqlx.
This crate provides transaction-safe change notifications for SQLite databases
using SQLite's native hooks (preupdate_hook, commit_hook, rollback_hook).
- Transaction-safe notifications: Changes only notify after successful commit
- Typed column values: Access old/new values with native SQLite types
- Stream support: Use
tokio_stream::Streamfor async iteration - Multiple subscribers: Broadcast channel supports multiple listeners
- Optional SQLx SQLite Connection Manager integration: Works with
sqlx-sqlite-conn-mgrfor single-writer/multi-reader patterns
Requires SQLite compiled with SQLITE_ENABLE_PREUPDATE_HOOK.
Important: Most system SQLite libraries do NOT have this option enabled by default. You have two options:
-
Use the
bundledfeature (recommended for most users):sqlx-sqlite-observer = { version = "0.8", features = ["bundled"] }
This compiles SQLite from source with preupdate hook support (~1MB binary size increase).
-
Provide your own SQLite with
SQLITE_ENABLE_PREUPDATE_HOOKcompiled in. Useis_preupdate_hook_enabled()to verify at runtime.
If preupdate hooks are not available, SqliteObserver::acquire() will return an
error with a descriptive message.
Add to your Cargo.toml:
[dependencies]
sqlx-sqlite-observer = "0.8"For integration with sqlx-sqlite-conn-mgr:
[dependencies]
sqlx-sqlite-observer = { version = "0.8", features = ["conn-mgr"] }The library uses SQLite's native hooks for transaction-safe change tracking:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ preupdate_hook │────►│ broker.buffer │ │ subscribers │
│ (captures data) │ │ (Vec<Event>) │ │ │
└─────────────────┘ └────────┬────────┘ └─────────────────┘
│ ▲
┌────────────| │
│ │ │
┌─────▼────┐ ┌────▼─────┐ │
│ COMMIT │ │ ROLLBACK │ │
└─────┬────┘ └────┬─────┘ │
│ │ │
▼ ▼ │
on_commit() on_rollback() │
│ │ │
│ buffer.clear() │
│ (discard) │
│ │
└────────────────────────────────────┘
change_tx.send()
(publish)
- When you acquire a connection, observation hooks are registered on the raw SQLite handle
preupdate_hookcaptures changes (table, operation, old/new values) and buffers themcommit_hookfires when a transaction commits, publishing buffered changes to subscribersrollback_hookfires when a transaction rolls back, discarding buffered changes
This ensures subscribers only receive notifications for committed changes.
TableChange: Notification of a change to a database tableTableChangeEvent: Event yielded byTableChangeStream— eitherChange(TableChange)orLagged(u64)ChangeOperation: Insert, Update, or DeleteColumnValue: Typed column value (Null, Integer, Real, Text, Blob)ObserverConfig: Configuration for table filtering and channel capacity
SqliteObserver: Main observer forSqlitePoolconnectionsObservableConnection: Connection wrapper with hooks registered
TableChangeStream: Async stream of table changesTableChangeStreamExt: Extension trait for converting receivers to streams
ObservableSqliteDatabase: Wrapper forSqliteDatabasewith observationObservableWriteGuard: Write guard with hooks registered
Schema information for observed tables (used internally, also exported).
pk_columns: Vec<usize>- Column indices forming the primary keywithout_rowid: bool- Whether the table uses WITHOUT ROWID
The primary_key field on TableChange always contains the actual primary key
value(s) for the affected row:
let change = rx.recv().await?;
// Single-column PK (e.g., INTEGER PRIMARY KEY)
if let Some(ColumnValue::Integer(id)) = change.primary_key.first() {
println!("Changed row id: {}", id);
}
// Composite PK - values are in declaration order
for (i, pk_value) in change.primary_key.iter().enumerate() {
println!("PK column {}: {:?}", i, pk_value);
}Why primary_key instead of just rowid?
SQLite's internal rowid works well for tables with INTEGER PRIMARY KEY, but
has limitations:
- Text or UUID primary keys: The
rowidis an internal integer, not your actual key - Composite primary keys: The
rowiddoesn't represent your multi-column key - WITHOUT ROWID tables: The
rowidfrom the preupdate hook is unreliable
The primary_key field extracts the actual primary key values from the captured
column data, giving you meaningful identifiers regardless of table structure.
For tables created with WITHOUT ROWID, the rowid field in TableChange will
be None:
let change = rx.recv().await?;
if change.rowid.is_none() {
// This is a WITHOUT ROWID table
// Use primary_key instead
println!("PK: {:?}", change.primary_key);
}This is because SQLite's preupdate hook provides the first PRIMARY KEY column (coerced to i64) as the "rowid" for WITHOUT ROWID tables, which may not be meaningful/correct for non-integer or composite primary keys.
use sqlx::SqlitePool;
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:mydb.db").await?;
let observer = SqliteObserver::new(pool, ObserverConfig::default());
// Subscribe to changes on specific tables
let mut rx = observer.subscribe(["users"]);
// Spawn a task to handle notifications
tokio::spawn(async move {
while let Ok(change) = rx.recv().await {
println!(
"Table {} row {} was {:?}",
change.table,
change.rowid.unwrap_or(-1),
change.operation
);
if let Some(ColumnValue::Integer(id)) = change.primary_key.first() {
println!(" PK: {}", id);
}
}
});
// Use the observer to execute queries
let mut conn = observer.acquire().await?;
sqlx::query("INSERT INTO users (name) VALUES (?)")
.bind("Alice")
.execute(&mut **conn)
.await?;
Ok(())
}use futures::StreamExt;
use sqlx::SqlitePool;
use sqlx_sqlite_observer::{
SqliteObserver, ObserverConfig, TableChangeEvent,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:mydb.db").await?;
let config = ObserverConfig::new().with_tables(["users", "posts"]);
let observer = SqliteObserver::new(pool, config);
let mut stream = observer.subscribe_stream(["users"]);
while let Some(event) = stream.next().await {
match event {
TableChangeEvent::Change(change) => {
println!(
"Table {} row {} was {:?}",
change.table,
change.rowid.unwrap_or(-1),
change.operation
);
}
TableChangeEvent::Lagged(n) => {
eprintln!("Missed {} notifications", n);
}
}
}
Ok(())
}use sqlx::SqlitePool;
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:mydb.db").await?;
let config = ObserverConfig::new().with_tables(["users"]);
let observer = SqliteObserver::new(pool, config);
let mut rx = observer.subscribe(["users"]);
let change = rx.recv().await?;
// Access old/new column values
if let Some(old) = &change.old_values {
println!("Old values: {:?}", old);
}
if let Some(new) = &change.new_values {
println!("New values: {:?}", new);
}
// Disable value capture for lower memory usage
let config = ObserverConfig::new()
.with_tables(["users"])
.with_capture_values(false);
let observer = SqliteObserver::new(
SqlitePool::connect("sqlite:mydb.db").await?,
config,
);
// old_values and new_values will be None
Ok(())
}use std::sync::Arc;
use sqlx_sqlite_conn_mgr::SqliteDatabase;
use sqlx_sqlite_observer::{
ObservableSqliteDatabase, ObserverConfig,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db = SqliteDatabase::connect("mydb.db", None).await?;
let config = ObserverConfig::new().with_tables(["users"]);
let observable = ObservableSqliteDatabase::new(db, config);
let mut rx = observable.subscribe(["users"]);
// Write through the observable writer
let mut writer = observable.acquire_writer().await?;
sqlx::query("BEGIN").execute(&mut *writer).await?;
sqlx::query("INSERT INTO users (name) VALUES (?)")
.bind("Alice")
.execute(&mut *writer)
.await?;
sqlx::query("COMMIT").execute(&mut *writer).await?;
// Notification arrives after commit
let change = rx.recv().await?;
println!("Changed: {}", change.table);
Ok(())
}The channel_capacity in ObserverConfig determines how many changes can be
buffered. All changes in a transaction are delivered at once on commit. If your
transaction contains more mutating statements than this capacity, messages
will be dropped.
let config = ObserverConfig::new()
.with_tables(["users", "posts"])
.with_channel_capacity(1000); // Handle large transactionsWhen using the Stream API, the stream yields TableChangeEvent values.
Most events are Change variants, but if a consumer falls behind, the
stream yields a Lagged(n) event indicating how many notifications
were missed.
use futures::StreamExt;
use sqlx_sqlite_observer::TableChangeEvent;
# use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
# async fn example(observer: SqliteObserver) {
let mut stream = observer.subscribe_stream(["users"]);
while let Some(event) = stream.next().await {
match event {
TableChangeEvent::Change(change) => {
// Process the change normally
}
TableChangeEvent::Lagged(n) => {
// n notifications were missed — local state may be stale.
// Re-query the database for current state.
tracing::warn!("Missed {} change notifications", n);
}
}
}
# }When does lag happen? The broadcast channel has a fixed capacity (default 256). Lag occurs when the oldest unread messages are overwritten. This can happen in two ways:
- A subscriber processes changes slower than they arrive
- A single transaction contains more mutating statements than the channel capacity, causing messages to be overwritten before the consumer reads them
This is rare under normal conditions but can occur during bulk writes or large transactions.
How to prevent it:
- Increase
channel_capacityviaObserverConfig::with_channel_capacity - Process changes faster (avoid blocking in the stream consumer)
- Use a dedicated task for stream consumption
Note: The broadcast::Receiver API (from subscribe()) surfaces
lag as RecvError::Lagged(n) — the same information, just through
the raw tokio broadcast channel interface rather than the stream.
By default, TableChange includes old_values and new_values with the actual
column data. Disable this for lower memory usage if you only need row IDs:
let config = ObserverConfig::new()
.with_tables(["users"])
.with_capture_values(false); // Only track table + rowidMIT License - see LICENSE for details.