Skip to content

Commit 6aa9a89

Browse files
authored
graphman: Check if its safe to rewind by comparing block to earliest block (#5423)
* graphman: Check if its safe to rewind by comparing block to earliest block * store/postgres: revert_block_ptr checks if its safe to revert based on earliest block and reorg threshold * graph: change default reorg threshold for tests
1 parent b4605ef commit 6aa9a89

File tree

6 files changed

+97
-25
lines changed

6 files changed

+97
-25
lines changed

graph/src/env/mod.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,16 @@ impl EnvVars {
214214
let mapping_handlers = InnerMappingHandlers::init_from_env()?.into();
215215
let store = InnerStore::init_from_env()?.into();
216216

217+
// The default reorganization (reorg) threshold is set to 250.
218+
// For testing purposes, we need to set this threshold to 0 because:
219+
// 1. Many tests involve reverting blocks.
220+
// 2. Blocks cannot be reverted below the reorg threshold.
221+
// Therefore, during tests, we want to set the reorg threshold to 0.
222+
let reorg_threshold =
223+
inner
224+
.reorg_threshold
225+
.unwrap_or_else(|| if cfg!(debug_assertions) { 0 } else { 250 });
226+
217227
Ok(Self {
218228
graphql,
219229
mappings: mapping_handlers,
@@ -265,15 +275,13 @@ impl EnvVars {
265275
external_http_base_url: inner.external_http_base_url,
266276
external_ws_base_url: inner.external_ws_base_url,
267277
static_filters_threshold: inner.static_filters_threshold,
268-
reorg_threshold: inner.reorg_threshold,
278+
reorg_threshold: reorg_threshold,
269279
ingestor_polling_interval: Duration::from_millis(inner.ingestor_polling_interval),
270280
subgraph_settings: inner.subgraph_settings,
271281
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
272282
enable_dips_metrics: inner.enable_dips_metrics.0,
273283
history_blocks_override: inner.history_blocks_override,
274-
min_history_blocks: inner
275-
.min_history_blocks
276-
.unwrap_or(2 * inner.reorg_threshold),
284+
min_history_blocks: inner.min_history_blocks.unwrap_or(2 * reorg_threshold),
277285
dips_metrics_object_store_url: inner.dips_metrics_object_store_url,
278286
section_map: inner.section_map,
279287
})
@@ -396,8 +404,8 @@ struct Inner {
396404
#[envconfig(from = "GRAPH_STATIC_FILTERS_THRESHOLD", default = "10000")]
397405
static_filters_threshold: usize,
398406
// JSON-RPC specific.
399-
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD", default = "250")]
400-
reorg_threshold: BlockNumber,
407+
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD")]
408+
reorg_threshold: Option<BlockNumber>,
401409
#[envconfig(from = "ETHEREUM_POLLING_INTERVAL", default = "1000")]
402410
ingestor_polling_interval: u64,
403411
#[envconfig(from = "GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS")]

node/src/manager/commands/rewind.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ use std::thread;
33
use std::time::Duration;
44
use std::{collections::HashSet, convert::TryFrom};
55

6+
use crate::manager::commands::assign::pause_or_resume;
7+
use crate::manager::deployment::{Deployment, DeploymentSearch};
68
use graph::anyhow::bail;
79
use graph::components::store::{BlockStore as _, ChainStore as _};
10+
use graph::env::ENV_VARS;
811
use graph::prelude::{anyhow, BlockNumber, BlockPtr};
12+
use graph_store_postgres::command_support::catalog::{self as store_catalog};
913
use graph_store_postgres::{connection_pool::ConnectionPool, Store};
1014
use graph_store_postgres::{BlockStore, NotificationSender};
1115

12-
use crate::manager::commands::assign::pause_or_resume;
13-
use crate::manager::deployment::{Deployment, DeploymentSearch};
14-
1516
async fn block_ptr(
1617
store: Arc<BlockStore>,
1718
searches: &[DeploymentSearch],
@@ -71,6 +72,8 @@ pub async fn run(
7172
if !start_block && (block_hash.is_none() || block_number.is_none()) {
7273
bail!("--block-hash and --block-number must be specified when --start-block is not set");
7374
}
75+
let pconn = primary.get()?;
76+
let mut conn = store_catalog::Connection::new(pconn);
7477

7578
let subgraph_store = store.subgraph_store();
7679
let block_store = store.block_store();
@@ -108,6 +111,27 @@ pub async fn run(
108111
)
109112
};
110113

114+
println!("Checking if its safe to rewind deployments");
115+
for deployment in &deployments {
116+
let locator = &deployment.locator();
117+
let site = conn
118+
.locate_site(locator.clone())?
119+
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
120+
let deployment_store = subgraph_store.for_site(&site)?;
121+
let deployment_details = deployment_store.deployment_details_for_id(locator)?;
122+
let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);
123+
124+
if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold {
125+
bail!(
126+
"The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}",
127+
block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0),
128+
locator,
129+
deployment_details.earliest_block_number,
130+
deployment_details.earliest_block_number + ENV_VARS.reorg_threshold
131+
);
132+
}
133+
}
134+
111135
println!("Pausing deployments");
112136
for deployment in &deployments {
113137
pause_or_resume(primary.clone(), &sender, &deployment.locator(), true)?;

store/postgres/src/deployment.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use diesel::{
1414
sql_types::{Nullable, Text},
1515
};
1616
use graph::{
17-
blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError,
17+
blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError, env::ENV_VARS,
1818
schema::EntityType,
1919
};
2020
use graph::{
@@ -539,18 +539,31 @@ pub fn revert_block_ptr(
539539
// Work around a Diesel issue with serializing BigDecimals to numeric
540540
let number = format!("{}::numeric", ptr.number);
541541

542-
update(d::table.filter(d::deployment.eq(id.as_str())))
543-
.set((
544-
d::latest_ethereum_block_number.eq(sql(&number)),
545-
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
546-
d::firehose_cursor.eq(firehose_cursor.as_ref()),
547-
d::reorg_count.eq(d::reorg_count + 1),
548-
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
549-
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
550-
))
551-
.execute(conn)
552-
.map(|_| ())
553-
.map_err(|e| e.into())
542+
let affected_rows = update(
543+
d::table
544+
.filter(d::deployment.eq(id.as_str()))
545+
.filter(d::earliest_block_number.le(ptr.number - ENV_VARS.reorg_threshold)),
546+
)
547+
.set((
548+
d::latest_ethereum_block_number.eq(sql(&number)),
549+
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
550+
d::firehose_cursor.eq(firehose_cursor.as_ref()),
551+
d::reorg_count.eq(d::reorg_count + 1),
552+
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
553+
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
554+
))
555+
.execute(conn)?;
556+
557+
match affected_rows {
558+
1 => Ok(()),
559+
0 => Err(StoreError::Unknown(anyhow!(
560+
"No rows affected. This could be due to an attempt to revert beyond earliest_block + reorg_threshold",
561+
))),
562+
_ => Err(StoreError::Unknown(anyhow!(
563+
"Expected to update 1 row, but {} rows were affected",
564+
affected_rows
565+
))),
566+
}
554567
}
555568

556569
pub fn block_ptr(

store/postgres/src/deployment_store.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use graph::blockchain::block_stream::FirehoseCursor;
88
use graph::blockchain::BlockTime;
99
use graph::components::store::write::RowGroup;
1010
use graph::components::store::{
11-
Batch, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest, PruningStrategy,
12-
QueryPermit, StoredDynamicDataSource, VersionStats,
11+
Batch, DeploymentLocator, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest,
12+
PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats,
1313
};
1414
use graph::components::versions::VERSIONS;
1515
use graph::data::query::Trace;
@@ -527,6 +527,17 @@ impl DeploymentStore {
527527
conn.transaction(|conn| -> Result<_, StoreError> { detail::deployment_details(conn, ids) })
528528
}
529529

530+
pub fn deployment_details_for_id(
531+
&self,
532+
locator: &DeploymentLocator,
533+
) -> Result<DeploymentDetail, StoreError> {
534+
let id = DeploymentId::from(locator.clone());
535+
let conn = &mut *self.get_conn()?;
536+
conn.transaction(|conn| -> Result<_, StoreError> {
537+
detail::deployment_details_for_id(conn, &id)
538+
})
539+
}
540+
530541
pub(crate) fn deployment_statuses(
531542
&self,
532543
sites: &[Arc<Site>],

store/postgres/src/detail.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub struct DeploymentDetail {
5050
fatal_error: Option<String>,
5151
non_fatal_errors: Vec<String>,
5252
/// The earliest block for which we have history
53-
earliest_block_number: i32,
53+
pub earliest_block_number: i32,
5454
pub latest_ethereum_block_hash: Option<Bytes>,
5555
pub latest_ethereum_block_number: Option<BigDecimal>,
5656
last_healthy_ethereum_block_hash: Option<Bytes>,
@@ -268,6 +268,18 @@ pub(crate) fn deployment_details(
268268
Ok(details)
269269
}
270270

271+
/// Return the details for `deployment`
272+
pub(crate) fn deployment_details_for_id(
273+
conn: &mut PgConnection,
274+
deployment: &DeploymentId,
275+
) -> Result<DeploymentDetail, StoreError> {
276+
use subgraph_deployment as d;
277+
d::table
278+
.filter(d::id.eq(&deployment))
279+
.first::<DeploymentDetail>(conn)
280+
.map_err(StoreError::from)
281+
}
282+
271283
pub(crate) fn deployment_statuses(
272284
conn: &mut PgConnection,
273285
sites: &[Arc<Site>],

store/postgres/src/subgraph_store.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,10 @@ impl SubgraphStore {
262262
pub fn notification_sender(&self) -> Arc<NotificationSender> {
263263
self.sender.clone()
264264
}
265+
266+
pub fn for_site(&self, site: &Site) -> Result<&Arc<DeploymentStore>, StoreError> {
267+
self.inner.for_site(site)
268+
}
265269
}
266270

267271
impl std::ops::Deref for SubgraphStore {

0 commit comments

Comments
 (0)