diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index d10c1a41d97..7880976b918 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -211,6 +211,16 @@ impl EnvVars { let mapping_handlers = InnerMappingHandlers::init_from_env()?.into(); let store = InnerStore::init_from_env()?.into(); + // The default reorganization (reorg) threshold is set to 250. + // For testing purposes, we need to set this threshold to 0 because: + // 1. Many tests involve reverting blocks. + // 2. Blocks cannot be reverted below the reorg threshold. + // Therefore, during tests, we want to set the reorg threshold to 0. + let reorg_threshold = + inner + .reorg_threshold + .unwrap_or_else(|| if cfg!(debug_assertions) { 0 } else { 250 }); + Ok(Self { graphql, mappings: mapping_handlers, @@ -262,15 +272,13 @@ impl EnvVars { external_http_base_url: inner.external_http_base_url, external_ws_base_url: inner.external_ws_base_url, static_filters_threshold: inner.static_filters_threshold, - reorg_threshold: inner.reorg_threshold, + reorg_threshold: reorg_threshold, ingestor_polling_interval: Duration::from_millis(inner.ingestor_polling_interval), subgraph_settings: inner.subgraph_settings, prefer_substreams_block_streams: inner.prefer_substreams_block_streams, enable_dips_metrics: inner.enable_dips_metrics.0, history_blocks_override: inner.history_blocks_override, - min_history_blocks: inner - .min_history_blocks - .unwrap_or(2 * inner.reorg_threshold), + min_history_blocks: inner.min_history_blocks.unwrap_or(2 * reorg_threshold), dips_metrics_object_store_url: inner.dips_metrics_object_store_url, }) } @@ -392,8 +400,8 @@ struct Inner { #[envconfig(from = "GRAPH_STATIC_FILTERS_THRESHOLD", default = "10000")] static_filters_threshold: usize, // JSON-RPC specific. - #[envconfig(from = "ETHEREUM_REORG_THRESHOLD", default = "250")] - reorg_threshold: BlockNumber, + #[envconfig(from = "ETHEREUM_REORG_THRESHOLD")] + reorg_threshold: Option, #[envconfig(from = "ETHEREUM_POLLING_INTERVAL", default = "1000")] ingestor_polling_interval: u64, #[envconfig(from = "GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS")] diff --git a/node/src/manager/commands/rewind.rs b/node/src/manager/commands/rewind.rs index 429cf39c594..0eae7c67a50 100644 --- a/node/src/manager/commands/rewind.rs +++ b/node/src/manager/commands/rewind.rs @@ -3,15 +3,16 @@ use std::thread; use std::time::Duration; use std::{collections::HashSet, convert::TryFrom}; +use crate::manager::commands::assign::pause_or_resume; +use crate::manager::deployment::{Deployment, DeploymentSearch}; use graph::anyhow::bail; use graph::components::store::{BlockStore as _, ChainStore as _}; +use graph::env::ENV_VARS; use graph::prelude::{anyhow, BlockNumber, BlockPtr}; +use graph_store_postgres::command_support::catalog::{self as store_catalog}; use graph_store_postgres::{connection_pool::ConnectionPool, Store}; use graph_store_postgres::{BlockStore, NotificationSender}; -use crate::manager::commands::assign::pause_or_resume; -use crate::manager::deployment::{Deployment, DeploymentSearch}; - async fn block_ptr( store: Arc, searches: &[DeploymentSearch], @@ -71,6 +72,8 @@ pub async fn run( if !start_block && (block_hash.is_none() || block_number.is_none()) { bail!("--block-hash and --block-number must be specified when --start-block is not set"); } + let pconn = primary.get()?; + let mut conn = store_catalog::Connection::new(pconn); let subgraph_store = store.subgraph_store(); let block_store = store.block_store(); @@ -108,6 +111,27 @@ pub async fn run( ) }; + println!("Checking if its safe to rewind deployments"); + for deployment in &deployments { + let locator = &deployment.locator(); + let site = conn + .locate_site(locator.clone())? + .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; + let deployment_store = subgraph_store.for_site(&site)?; + let deployment_details = deployment_store.deployment_details_for_id(locator)?; + let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0); + + if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold { + bail!( + "The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}", + block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0), + locator, + deployment_details.earliest_block_number, + deployment_details.earliest_block_number + ENV_VARS.reorg_threshold + ); + } + } + println!("Pausing deployments"); for deployment in &deployments { pause_or_resume(primary.clone(), &sender, &deployment.locator(), true)?; diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 741e8387152..180aa00953d 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -14,7 +14,7 @@ use diesel::{ sql_types::{Nullable, Text}, }; use graph::{ - blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError, + blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError, env::ENV_VARS, schema::EntityType, }; use graph::{ @@ -539,18 +539,31 @@ pub fn revert_block_ptr( // Work around a Diesel issue with serializing BigDecimals to numeric let number = format!("{}::numeric", ptr.number); - update(d::table.filter(d::deployment.eq(id.as_str()))) - .set(( - d::latest_ethereum_block_number.eq(sql(&number)), - d::latest_ethereum_block_hash.eq(ptr.hash_slice()), - d::firehose_cursor.eq(firehose_cursor.as_ref()), - d::reorg_count.eq(d::reorg_count + 1), - d::current_reorg_depth.eq(d::current_reorg_depth + 1), - d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")), - )) - .execute(conn) - .map(|_| ()) - .map_err(|e| e.into()) + let affected_rows = update( + d::table + .filter(d::deployment.eq(id.as_str())) + .filter(d::earliest_block_number.le(ptr.number - ENV_VARS.reorg_threshold)), + ) + .set(( + d::latest_ethereum_block_number.eq(sql(&number)), + d::latest_ethereum_block_hash.eq(ptr.hash_slice()), + d::firehose_cursor.eq(firehose_cursor.as_ref()), + d::reorg_count.eq(d::reorg_count + 1), + d::current_reorg_depth.eq(d::current_reorg_depth + 1), + d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")), + )) + .execute(conn)?; + + match affected_rows { + 1 => Ok(()), + 0 => Err(StoreError::Unknown(anyhow!( + "No rows affected. This could be due to an attempt to revert beyond earliest_block + reorg_threshold", + ))), + _ => Err(StoreError::Unknown(anyhow!( + "Expected to update 1 row, but {} rows were affected", + affected_rows + ))), + } } pub fn block_ptr( diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index b3215697377..f9ada4149e1 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -8,8 +8,8 @@ use graph::blockchain::block_stream::FirehoseCursor; use graph::blockchain::BlockTime; use graph::components::store::write::RowGroup; use graph::components::store::{ - Batch, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest, PruningStrategy, - QueryPermit, StoredDynamicDataSource, VersionStats, + Batch, DeploymentLocator, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest, + PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats, }; use graph::components::versions::VERSIONS; use graph::data::query::Trace; @@ -527,6 +527,17 @@ impl DeploymentStore { conn.transaction(|conn| -> Result<_, StoreError> { detail::deployment_details(conn, ids) }) } + pub fn deployment_details_for_id( + &self, + locator: &DeploymentLocator, + ) -> Result { + let id = DeploymentId::from(locator.clone()); + let conn = &mut *self.get_conn()?; + conn.transaction(|conn| -> Result<_, StoreError> { + detail::deployment_details_for_id(conn, &id) + }) + } + pub(crate) fn deployment_statuses( &self, sites: &[Arc], diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index ff7eba291cd..994bae3a4aa 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -50,7 +50,7 @@ pub struct DeploymentDetail { fatal_error: Option, non_fatal_errors: Vec, /// The earliest block for which we have history - earliest_block_number: i32, + pub earliest_block_number: i32, pub latest_ethereum_block_hash: Option, pub latest_ethereum_block_number: Option, last_healthy_ethereum_block_hash: Option, @@ -268,6 +268,18 @@ pub(crate) fn deployment_details( Ok(details) } +/// Return the details for `deployment` +pub(crate) fn deployment_details_for_id( + conn: &mut PgConnection, + deployment: &DeploymentId, +) -> Result { + use subgraph_deployment as d; + d::table + .filter(d::id.eq(&deployment)) + .first::(conn) + .map_err(StoreError::from) +} + pub(crate) fn deployment_statuses( conn: &mut PgConnection, sites: &[Arc], diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index ab5b3cea605..528079071a4 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -262,6 +262,10 @@ impl SubgraphStore { pub fn notification_sender(&self) -> Arc { self.sender.clone() } + + pub fn for_site(&self, site: &Site) -> Result<&Arc, StoreError> { + self.inner.for_site(site) + } } impl std::ops::Deref for SubgraphStore {