3030//! ```
3131//!
3232//! Changes captured by the preupdate hook are buffered until the transaction
33- //! completes. On commit, buffered changes are published to subscribers. On
34- //! rollback (or implicit rollback due to an error), they are discarded without
35- //! notification.
33+ //! (explicit or implicit) completes. On commit, buffered changes are published
34+ //! to subscribers. On rollback, they are discarded without notification.
3635
37- use std:: collections:: HashSet ;
36+ use std:: collections:: { HashMap , HashSet } ;
3837use std:: sync:: Arc ;
3938use std:: time:: Instant ;
4039
4140use parking_lot:: { Mutex , RwLock } ;
4241use tokio:: sync:: broadcast;
43- use tracing:: { debug, trace} ;
42+ use tracing:: { debug, error , trace} ;
4443
45- use crate :: change:: { ChangeOperation , TableChange } ;
44+ use crate :: change:: { ChangeOperation , ColumnValue , TableChange , TableInfo } ;
4645use crate :: hooks:: { PreUpdateEvent , SqliteValue } ;
4746
4847/// Transaction-aware observation broker.
@@ -54,6 +53,7 @@ pub struct ObservationBroker {
5453 buffer : Mutex < Vec < PreUpdateEvent > > ,
5554 change_tx : broadcast:: Sender < TableChange > ,
5655 observed_tables : RwLock < HashSet < String > > ,
56+ table_info : RwLock < HashMap < String , TableInfo > > ,
5757 capture_values : bool ,
5858}
5959
@@ -65,6 +65,7 @@ impl ObservationBroker {
6565 buffer : Mutex :: new ( Vec :: new ( ) ) ,
6666 change_tx,
6767 observed_tables : RwLock :: new ( HashSet :: new ( ) ) ,
68+ table_info : RwLock :: new ( HashMap :: new ( ) ) ,
6869 capture_values,
6970 } )
7071 }
@@ -74,20 +75,25 @@ impl ObservationBroker {
7475 self . observed_tables . read ( ) . contains ( table)
7576 }
7677
77- /// Registers tables for observation.
78+ /// Registers a table for observation with its schema information .
7879 ///
7980 /// Only changes to observed tables will be buffered and published.
80- pub fn observe_tables < I , S > ( & self , tables : I )
81- where
82- I : IntoIterator < Item = S > ,
83- S : AsRef < str > ,
84- {
85- let mut observed = self . observed_tables . write ( ) ;
86- for table in tables {
87- let table_name = table. as_ref ( ) . to_string ( ) ;
88- trace ! ( table = %table_name, "Observing table" ) ;
89- observed. insert ( table_name) ;
90- }
81+ /// The `TableInfo` is required to correctly extract primary key values
82+ /// and determine whether the rowid is meaningful for the table.
83+ pub fn observe_table ( & self , table : & str , info : TableInfo ) {
84+ trace ! (
85+ table = %table,
86+ pk_columns = ?info. pk_columns,
87+ without_rowid = info. without_rowid,
88+ "Observing table with schema info"
89+ ) ;
90+ self . observed_tables . write ( ) . insert ( table. to_string ( ) ) ;
91+ self . table_info . write ( ) . insert ( table. to_string ( ) , info) ;
92+ }
93+
94+ /// Gets the schema information for an observed table.
95+ pub fn get_table_info ( & self , table : & str ) -> Option < TableInfo > {
96+ self . table_info . read ( ) . get ( table) . cloned ( )
9197 }
9298
9399 /// Returns a list of all observed tables.
@@ -125,8 +131,14 @@ impl ObservationBroker {
125131 debug ! ( count = events. len( ) , "Flushing buffered changes on commit" ) ;
126132
127133 for event in events {
128- let table_change = self . event_to_change ( event) ;
129- let _ = self . change_tx . send ( table_change) ;
134+ match self . event_to_change ( event) {
135+ Ok ( table_change) => {
136+ let _ = self . change_tx . send ( table_change) ;
137+ }
138+ Err ( e) => {
139+ error ! ( error = %e, "Failed to convert event to change" ) ;
140+ }
141+ }
130142 }
131143 }
132144
@@ -155,13 +167,22 @@ impl ObservationBroker {
155167 }
156168
157169 /// Converts a PreUpdateEvent to a TableChange for broadcast.
158- fn event_to_change ( & self , event : PreUpdateEvent ) -> TableChange {
159- let rowid = match event. operation {
160- ChangeOperation :: Insert => Some ( event. new_rowid ) ,
161- ChangeOperation :: Delete => Some ( event. old_rowid ) ,
162- ChangeOperation :: Update => Some ( event. new_rowid ) ,
170+ fn event_to_change ( & self , event : PreUpdateEvent ) -> crate :: Result < TableChange > {
171+ let table_info = self . table_info . read ( ) . get ( & event. table ) . cloned ( ) ;
172+
173+ // For WITHOUT ROWID tables, the rowid from preupdate hook is not meaningful
174+ let rowid = match & table_info {
175+ Some ( info) if info. without_rowid => None ,
176+ _ => match event. operation {
177+ ChangeOperation :: Insert => Some ( event. new_rowid ) ,
178+ ChangeOperation :: Delete => Some ( event. old_rowid ) ,
179+ ChangeOperation :: Update => Some ( event. new_rowid ) ,
180+ } ,
163181 } ;
164182
183+ // Extract primary key values from the appropriate column values
184+ let primary_key = self . extract_primary_key ( & event, table_info. as_ref ( ) ) ?;
185+
165186 let ( old_values, new_values) = if self . capture_values {
166187 (
167188 event. old_values . map ( Self :: values_to_vec) ,
@@ -171,14 +192,59 @@ impl ObservationBroker {
171192 ( None , None )
172193 } ;
173194
174- TableChange {
195+ Ok ( TableChange {
175196 table : event. table ,
176197 operation : Some ( event. operation ) ,
177198 rowid,
199+ primary_key,
178200 old_values,
179201 new_values,
180202 timestamp : Instant :: now ( ) ,
203+ } )
204+ }
205+
206+ /// Extracts primary key values from the event based on table schema.
207+ ///
208+ /// Returns an error if the schema has drifted (e.g., table was altered)
209+ /// and PK column indices are out of bounds.
210+ fn extract_primary_key (
211+ & self ,
212+ event : & PreUpdateEvent ,
213+ table_info : Option < & TableInfo > ,
214+ ) -> crate :: Result < Vec < ColumnValue > > {
215+ let Some ( info) = table_info else {
216+ return Ok ( Vec :: new ( ) ) ;
217+ } ;
218+
219+ if info. pk_columns . is_empty ( ) {
220+ return Ok ( Vec :: new ( ) ) ;
221+ }
222+
223+ // For DELETE, use old values; for INSERT/UPDATE, use new values
224+ let values = match event. operation {
225+ ChangeOperation :: Delete => event. old_values . as_ref ( ) ,
226+ ChangeOperation :: Insert | ChangeOperation :: Update => event. new_values . as_ref ( ) ,
227+ } ;
228+
229+ let Some ( values) = values else {
230+ return Ok ( Vec :: new ( ) ) ;
231+ } ;
232+
233+ // Extract values at the PK column indices, erroring if any index is out of bounds
234+ let mut pk_values = Vec :: with_capacity ( info. pk_columns . len ( ) ) ;
235+ for & idx in & info. pk_columns {
236+ match values. get ( idx) {
237+ Some ( v) => pk_values. push ( v. clone ( ) . into ( ) ) ,
238+ None => {
239+ return Err ( crate :: Error :: SchemaMismatch {
240+ table : event. table . clone ( ) ,
241+ expected : info. pk_columns . len ( ) ,
242+ actual : values. len ( ) ,
243+ } ) ;
244+ }
245+ }
181246 }
247+ Ok ( pk_values)
182248 }
183249
184250 /// Converts SqliteValue vec to ColumnValue vec for TableChange.
0 commit comments