Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions engine/packages/depot-client/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;

use anyhow::{Result, anyhow};
use async_trait::async_trait;
use rivet_envoy_protocol as protocol;
use tokio::runtime::Handle;

use crate::{
Expand All @@ -23,6 +25,30 @@ pub fn vfs_name_for_actor_database(actor_id: &str, generation: u64) -> String {
format!("envoy-sqlite-{actor_id}-g{generation}")
}

struct GenerationFencedTransport {
inner: SqliteTransportHandle,
generation: u64,
}

#[async_trait]
impl crate::vfs::SqliteTransport for GenerationFencedTransport {
async fn get_pages(
&self,
mut request: protocol::SqliteGetPagesRequest,
) -> Result<protocol::SqliteGetPagesResponse> {
request.expected_generation.get_or_insert(self.generation);
self.inner.get_pages(request).await
}

async fn commit(
&self,
mut request: protocol::SqliteCommitRequest,
) -> Result<protocol::SqliteCommitResponse> {
request.expected_generation.get_or_insert(self.generation);
self.inner.commit(request).await
}
}

pub async fn open_database_from_transport(
transport: SqliteTransportHandle,
actor_id: String,
Expand All @@ -32,6 +58,10 @@ pub async fn open_database_from_transport(
) -> Result<NativeDatabaseHandle> {
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
let config = VfsConfig::default();
let transport: SqliteTransportHandle = Arc::new(GenerationFencedTransport {
inner: transport,
generation,
});
let initial_pages = fetch_initial_pages_for_registration(transport.clone(), &actor_id, &config)
.await
.map_err(|e| anyhow!("failed to preload sqlite pages: {e}"))?;
Expand Down
1 change: 1 addition & 0 deletions engine/packages/depot-client/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,7 @@ async fn fetch_initial_pages(
fn is_initial_main_page_missing(message: &str) -> bool {
message.contains("sqlite database was not found in this bucket branch")
|| message.contains("sqlite meta missing for get_pages")
|| message == "actor does not exist"
}

fn next_temp_aux_path() -> String {
Expand Down
24 changes: 19 additions & 5 deletions engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,8 @@ async fn handle_sqlite_get_pages(
conn: &Conn,
request: protocol::SqliteGetPagesRequest,
) -> Result<protocol::SqliteGetPagesResponse> {
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation)
.await?;

let actor_db = actor_db(ctx, conn, request.actor_id.clone()).await?;
let result = actor_db
Expand Down Expand Up @@ -733,7 +734,8 @@ async fn handle_sqlite_commit(
request: protocol::SqliteCommitRequest,
) -> Result<protocol::SqliteCommitResponse> {
let decode_request_start = Instant::now();
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation)
.await?;
let decode_request_duration = decode_request_start.elapsed();
crate::metrics::SQLITE_COMMIT_ENVOY_DISPATCH_DURATION
.observe(decode_request_duration.as_secs_f64());
Expand Down Expand Up @@ -862,6 +864,19 @@ async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str)
Ok(())
}

async fn validate_sqlite_actor_for_request(
ctx: &StandaloneCtx,
conn: &Conn,
actor_id: &str,
expected_generation: Option<u64>,
) -> Result<()> {
if let Some(generation) = expected_generation {
validate_remote_sqlite_generation(ctx, conn, actor_id, generation).await
} else {
validate_sqlite_actor(ctx, conn, actor_id).await
}
}

async fn validate_remote_sqlite_generation(
ctx: &StandaloneCtx,
conn: &Conn,
Expand Down Expand Up @@ -1125,11 +1140,10 @@ fn depot_error(err: &anyhow::Error) -> Option<&SqliteStorageError> {

fn is_startup_database_miss(
err: &anyhow::Error,
expected_generation: Option<u64>,
_expected_generation: Option<u64>,
expected_head_txid: Option<u64>,
) -> bool {
expected_generation.is_none()
&& expected_head_txid.is_none()
expected_head_txid.is_none()
&& matches!(depot_error(err), Some(SqliteStorageError::DatabaseNotFound))
}

Expand Down
Loading