diff --git a/engine/packages/depot-client/src/database.rs b/engine/packages/depot-client/src/database.rs index 996784b6c6..91cd2f2010 100644 --- a/engine/packages/depot-client/src/database.rs +++ b/engine/packages/depot-client/src/database.rs @@ -1,6 +1,8 @@ use std::sync::Arc; use anyhow::{Result, anyhow}; +use async_trait::async_trait; +use rivet_envoy_protocol as protocol; use tokio::runtime::Handle; use crate::{ @@ -23,6 +25,30 @@ pub fn vfs_name_for_actor_database(actor_id: &str, generation: u64) -> String { format!("envoy-sqlite-{actor_id}-g{generation}") } +struct GenerationFencedTransport { + inner: SqliteTransportHandle, + generation: u64, +} + +#[async_trait] +impl crate::vfs::SqliteTransport for GenerationFencedTransport { + async fn get_pages( + &self, + mut request: protocol::SqliteGetPagesRequest, + ) -> Result { + request.expected_generation.get_or_insert(self.generation); + self.inner.get_pages(request).await + } + + async fn commit( + &self, + mut request: protocol::SqliteCommitRequest, + ) -> Result { + request.expected_generation.get_or_insert(self.generation); + self.inner.commit(request).await + } +} + pub async fn open_database_from_transport( transport: SqliteTransportHandle, actor_id: String, @@ -32,6 +58,10 @@ pub async fn open_database_from_transport( ) -> Result { let vfs_name = vfs_name_for_actor_database(&actor_id, generation); let config = VfsConfig::default(); + let transport: SqliteTransportHandle = Arc::new(GenerationFencedTransport { + inner: transport, + generation, + }); let initial_pages = fetch_initial_pages_for_registration(transport.clone(), &actor_id, &config) .await .map_err(|e| anyhow!("failed to preload sqlite pages: {e}"))?; diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index f9ac4dcd41..e447c4d2a6 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -1854,6 +1854,7 @@ async fn fetch_initial_pages( fn is_initial_main_page_missing(message: &str) -> bool { message.contains("sqlite database was not found in this bucket branch") || message.contains("sqlite meta missing for get_pages") + || message == "actor does not exist" } fn next_temp_aux_path() -> String { diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index b9a33dfd80..ef161a38c8 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -698,7 +698,8 @@ async fn handle_sqlite_get_pages( conn: &Conn, request: protocol::SqliteGetPagesRequest, ) -> Result { - validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation) + .await?; let actor_db = actor_db(ctx, conn, request.actor_id.clone()).await?; let result = actor_db @@ -733,7 +734,8 @@ async fn handle_sqlite_commit( request: protocol::SqliteCommitRequest, ) -> Result { let decode_request_start = Instant::now(); - validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation) + .await?; let decode_request_duration = decode_request_start.elapsed(); crate::metrics::SQLITE_COMMIT_ENVOY_DISPATCH_DURATION .observe(decode_request_duration.as_secs_f64()); @@ -862,6 +864,19 @@ async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str) Ok(()) } +async fn validate_sqlite_actor_for_request( + ctx: &StandaloneCtx, + conn: &Conn, + actor_id: &str, + expected_generation: Option, +) -> Result<()> { + if let Some(generation) = expected_generation { + validate_remote_sqlite_generation(ctx, conn, actor_id, generation).await + } else { + validate_sqlite_actor(ctx, conn, actor_id).await + } +} + async fn validate_remote_sqlite_generation( ctx: &StandaloneCtx, conn: &Conn, @@ -1125,11 +1140,10 @@ fn depot_error(err: &anyhow::Error) -> Option<&SqliteStorageError> { fn is_startup_database_miss( err: &anyhow::Error, - expected_generation: Option, + _expected_generation: Option, expected_head_txid: Option, ) -> bool { - expected_generation.is_none() - && expected_head_txid.is_none() + expected_head_txid.is_none() && matches!(depot_error(err), Some(SqliteStorageError::DatabaseNotFound)) }