From c88830441de6e07d4ee8f65ddc090a7144734bcf Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 17 Jan 2024 18:20:02 +0200 Subject: [PATCH 1/4] (state-indexer): Bug with indexing epoch validators info --- database/src/base/state_indexer.rs | 11 +++ .../2023-12-07-144820_epochs/up.sql | 2 + database/src/postgres/models.rs | 28 +++++++ database/src/postgres/schema.rs | 2 + database/src/postgres/state_indexer.rs | 37 +++++++++ database/src/scylladb/state_indexer.rs | 83 +++++++++++++++++-- epoch-indexer/src/config.rs | 4 +- epoch-indexer/src/lib.rs | 82 +++++++++++++----- epoch-indexer/src/main.rs | 43 +++++++--- readnode-primitives/src/lib.rs | 8 ++ 10 files changed, 257 insertions(+), 43 deletions(-) diff --git a/database/src/base/state_indexer.rs b/database/src/base/state_indexer.rs index 0fa72bdb..844d38a5 100644 --- a/database/src/base/state_indexer.rs +++ b/database/src/base/state_indexer.rs @@ -104,6 +104,11 @@ pub trait StateIndexerDbManager { )>, ) -> anyhow::Result<()>; + async fn get_block_by_hash( + &self, + block_hash: near_primitives::hash::CryptoHash, + ) -> anyhow::Result; + async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()>; async fn get_last_processed_block_height(&self, indexer_id: &str) -> anyhow::Result; async fn add_validators( @@ -121,4 +126,10 @@ pub trait StateIndexerDbManager { epoch_start_height: u64, protocol_config: &near_chain_configs::ProtocolConfigView, ) -> anyhow::Result<()>; + + async fn update_epoch_end_height( + &self, + epoch_id: near_indexer_primitives::CryptoHash, + epoch_end_block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result<()>; } diff --git a/database/src/postgres/migrations/2023-12-07-144820_epochs/up.sql b/database/src/postgres/migrations/2023-12-07-144820_epochs/up.sql index 065efc7c..59793081 100644 --- a/database/src/postgres/migrations/2023-12-07-144820_epochs/up.sql +++ b/database/src/postgres/migrations/2023-12-07-144820_epochs/up.sql @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS validators ( epoch_id text NOT NULL, epoch_height numeric(20,0) NOT NULL, epoch_start_height numeric(20,0) NOT NULL, + epoch_end_height numeric(20,0) NULL, validators_info jsonb NOT NULL ); @@ -13,6 +14,7 @@ CREATE TABLE IF NOT EXISTS protocol_configs ( epoch_id text NOT NULL, epoch_height numeric(20,0) NOT NULL, epoch_start_height numeric(20,0) NOT NULL, + epoch_end_height numeric(20,0) NULL, protocol_config jsonb NOT NULL ); diff --git a/database/src/postgres/models.rs b/database/src/postgres/models.rs index d49eb5ff..33c43e48 100644 --- a/database/src/postgres/models.rs +++ b/database/src/postgres/models.rs @@ -667,6 +667,7 @@ pub struct Validators { pub epoch_id: String, pub epoch_height: bigdecimal::BigDecimal, pub epoch_start_height: bigdecimal::BigDecimal, + pub epoch_end_height: Option, pub validators_info: serde_json::Value, } @@ -683,6 +684,19 @@ impl Validators { Ok(()) } + pub async fn update_epoch_end_height( + mut conn: crate::postgres::PgAsyncConn, + epoch_id: near_indexer_primitives::CryptoHash, + epoch_end_height: bigdecimal::BigDecimal, + ) -> anyhow::Result<()> { + diesel::update(validators::table) + .filter(validators::epoch_id.eq(epoch_id.to_string())) + .set(validators::epoch_end_height.eq(epoch_end_height)) + .execute(&mut conn) + .await?; + Ok(()) + } + pub async fn get_validators( mut conn: crate::postgres::PgAsyncConn, epoch_id: near_indexer_primitives::CryptoHash, @@ -703,6 +717,7 @@ pub struct ProtocolConfig { pub epoch_id: String, pub epoch_height: bigdecimal::BigDecimal, pub epoch_start_height: bigdecimal::BigDecimal, + pub epoch_end_height: Option, pub protocol_config: serde_json::Value, } @@ -719,6 +734,19 @@ impl ProtocolConfig { Ok(()) } + pub async fn update_epoch_end_height( + mut conn: crate::postgres::PgAsyncConn, + epoch_id: near_indexer_primitives::CryptoHash, + epoch_end_height: bigdecimal::BigDecimal, + ) -> anyhow::Result<()> { + diesel::update(protocol_configs::table) + .filter(protocol_configs::epoch_id.eq(epoch_id.to_string())) + .set(protocol_configs::epoch_end_height.eq(epoch_end_height)) + .execute(&mut conn) + .await?; + Ok(()) + } + pub async fn get_protocol_config( mut conn: crate::postgres::PgAsyncConn, epoch_id: near_indexer_primitives::CryptoHash, diff --git a/database/src/postgres/schema.rs b/database/src/postgres/schema.rs index f71855cc..2d7f1f2b 100644 --- a/database/src/postgres/schema.rs +++ b/database/src/postgres/schema.rs @@ -35,6 +35,7 @@ diesel::table! { epoch_id -> Text, epoch_height -> Numeric, epoch_start_height -> Numeric, + epoch_end_height -> Nullable, protocol_config -> Jsonb, } } @@ -127,6 +128,7 @@ diesel::table! { epoch_id -> Text, epoch_height -> Numeric, epoch_start_height -> Numeric, + epoch_end_height -> Nullable, validators_info -> Jsonb, } } diff --git a/database/src/postgres/state_indexer.rs b/database/src/postgres/state_indexer.rs index c2f088d7..d5a4b7bd 100644 --- a/database/src/postgres/state_indexer.rs +++ b/database/src/postgres/state_indexer.rs @@ -279,6 +279,20 @@ impl crate::StateIndexerDbManager for PostgresDBManager { .await } + async fn get_block_by_hash( + &self, + block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result { + let block_height = crate::models::Block::get_block_height_by_hash( + Self::get_connection(&self.pg_pool).await?, + block_hash, + ) + .await?; + block_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `block_height` to u64")) + } + async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()> { crate::models::Meta { indexer_id: indexer_id.to_string(), @@ -309,6 +323,7 @@ impl crate::StateIndexerDbManager for PostgresDBManager { epoch_id: epoch_id.to_string(), epoch_height: bigdecimal::BigDecimal::from(epoch_height), epoch_start_height: bigdecimal::BigDecimal::from(epoch_start_height), + epoch_end_height: None, validators_info: serde_json::to_value(validators_info)?, } .insert_or_ignore(Self::get_connection(&self.pg_pool).await?) @@ -327,10 +342,32 @@ impl crate::StateIndexerDbManager for PostgresDBManager { epoch_id: epoch_id.to_string(), epoch_height: bigdecimal::BigDecimal::from(epoch_height), epoch_start_height: bigdecimal::BigDecimal::from(epoch_start_height), + epoch_end_height: None, protocol_config: serde_json::to_value(protocol_config)?, } .insert_or_ignore(Self::get_connection(&self.pg_pool).await?) .await?; Ok(()) } + + async fn update_epoch_end_height( + &self, + epoch_id: near_indexer_primitives::CryptoHash, + epoch_end_block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result<()> { + let epoch_end_height = self.get_block_by_hash(epoch_end_block_hash).await?; + + let validators_future = crate::models::Validators::update_epoch_end_height( + Self::get_connection(&self.pg_pool).await?, + epoch_id, + bigdecimal::BigDecimal::from(epoch_end_height), + ); + let protocol_config_future = crate::models::ProtocolConfig::update_epoch_end_height( + Self::get_connection(&self.pg_pool).await?, + epoch_id, + bigdecimal::BigDecimal::from(epoch_end_height), + ); + futures::future::try_join(validators_future, protocol_config_future).await?; + Ok(()) + } } diff --git a/database/src/scylladb/state_indexer.rs b/database/src/scylladb/state_indexer.rs index 466001ce..7e357bec 100644 --- a/database/src/scylladb/state_indexer.rs +++ b/database/src/scylladb/state_indexer.rs @@ -23,10 +23,16 @@ pub struct ScyllaDBManager { delete_account: PreparedStatement, add_block: PreparedStatement, + get_block_by_hash: PreparedStatement, add_chunk: PreparedStatement, + add_validators: PreparedStatement, add_protocol_config: PreparedStatement, + update_validators_epoch_end_height: PreparedStatement, + update_protocol_config_epoch_end_height: PreparedStatement, + add_account_state: PreparedStatement, + update_meta: PreparedStatement, last_processed_block_height: PreparedStatement, } @@ -158,6 +164,7 @@ impl ScyllaStorageManager for ScyllaDBManager { epoch_id varchar, epoch_height varint, epoch_start_height varint, + epoch_end_height varint, validators_info text, PRIMARY KEY (epoch_id) ) @@ -173,6 +180,7 @@ impl ScyllaStorageManager for ScyllaDBManager { epoch_id varchar, epoch_height varint, epoch_start_height varint, + epoch_end_height varint, protocol_config text, PRIMARY KEY (epoch_id) ) @@ -344,6 +352,10 @@ impl ScyllaStorageManager for ScyllaDBManager { VALUES (?, ?)", ) .await?, + get_block_by_hash: Self::prepare_read_query( + &scylla_db_session, + "SELECT block_height FROM state_indexer.blocks WHERE block_hash = ?", + ).await?, add_chunk: Self::prepare_write_query( &scylla_db_session, "INSERT INTO state_indexer.chunks @@ -354,15 +366,25 @@ impl ScyllaStorageManager for ScyllaDBManager { add_validators: Self::prepare_write_query( &scylla_db_session, "INSERT INTO state_indexer.validators - (epoch_id, epoch_height, epoch_start_height, validators_info) - VALUES (?, ?, ?, ?)", + (epoch_id, epoch_height, epoch_start_height, epoch_end_height, validators_info) + VALUES (?, ?, ?, NULL, ?)", ) .await?, add_protocol_config: Self::prepare_write_query( &scylla_db_session, "INSERT INTO state_indexer.protocol_configs - (epoch_id, epoch_height, epoch_start_height, protocol_config) - VALUES (?, ?, ?, ?)", + (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config) + VALUES (?, ?, ?, NULL, ?)", + ) + .await?, + update_validators_epoch_end_height: Self::prepare_write_query( + &scylla_db_session, + "UPDATE state_indexer.validators SET epoch_end_height = ? WHERE epoch_id = ?", + ) + .await?, + update_protocol_config_epoch_end_height: Self::prepare_write_query( + &scylla_db_session, + "UPDATE state_indexer.protocol_configs SET epoch_end_height = ? WHERE epoch_id = ?", ) .await?, add_account_state: Self::prepare_write_query( @@ -514,10 +536,10 @@ impl crate::StateIndexerDbManager for ScyllaDBManager { ) -> anyhow::Result<()> { let public_key_hex = hex::encode(public_key).to_string(); - let mut account_keys = match self.get_access_keys(account_id.clone(), block_height).await { - Ok(account_keys) => account_keys, - Err(_) => std::collections::HashMap::new(), - }; + let mut account_keys = self + .get_access_keys(account_id.clone(), block_height) + .await + .unwrap_or_default(); match access_key { Some(access_key) => { @@ -676,6 +698,24 @@ impl crate::StateIndexerDbManager for ScyllaDBManager { Ok(()) } + async fn get_block_by_hash( + &self, + block_hash: near_primitives::hash::CryptoHash, + ) -> anyhow::Result { + let (result,) = Self::execute_prepared_query( + &self.scylla_session, + &self.get_block_by_hash, + (block_hash.to_string(),), + ) + .await? + .single_row()? + .into_typed::<(num_bigint::BigInt,)>()?; + + result + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `block_height` to u64")) + } + async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()> { Self::execute_prepared_query( &self.scylla_session, @@ -741,4 +781,31 @@ impl crate::StateIndexerDbManager for ScyllaDBManager { .await?; Ok(()) } + + async fn update_epoch_end_height( + &self, + epoch_id: near_indexer_primitives::CryptoHash, + epoch_end_block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result<()> { + let epoch_end_height = self.get_block_by_hash(epoch_end_block_hash).await?; + + let validators_future = Self::execute_prepared_query( + &self.scylla_session, + &self.update_validators_epoch_end_height, + ( + num_bigint::BigInt::from(epoch_end_height), + epoch_id.to_string(), + ), + ); + let protocol_config_future = Self::execute_prepared_query( + &self.scylla_session, + &self.update_protocol_config_epoch_end_height, + ( + num_bigint::BigInt::from(epoch_end_height), + epoch_id.to_string(), + ), + ); + futures::future::try_join(validators_future, protocol_config_future).await?; + Ok(()) + } } diff --git a/epoch-indexer/src/config.rs b/epoch-indexer/src/config.rs index aaf836a5..f7cbe1c9 100644 --- a/epoch-indexer/src/config.rs +++ b/epoch-indexer/src/config.rs @@ -103,8 +103,8 @@ impl Opts { pub fn rpc_url(&self) -> &str { match &self.chain_id { - ChainId::Mainnet(_) => "https://rpc.mainnet.near.org", - ChainId::Testnet(_) => "https://rpc.testnet.near.org", + ChainId::Mainnet(_) => "https://archival-rpc.mainnet.near.org", + ChainId::Testnet(_) => "https://archival-rpc.testnet.near.org", } } pub async fn to_s3_client(&self) -> near_lake_framework::s3_fetchers::LakeS3Client { diff --git a/epoch-indexer/src/lib.rs b/epoch-indexer/src/lib.rs index 2683d1fa..eb196e06 100644 --- a/epoch-indexer/src/lib.rs +++ b/epoch-indexer/src/lib.rs @@ -92,6 +92,7 @@ pub async fn get_epoch_info_by_id( epoch_id, epoch_height: validators_info.epoch_height, epoch_start_height: validators_info.epoch_start_height, + epoch_end_height: None, validators_info, protocol_config, }) @@ -102,7 +103,7 @@ pub async fn get_epoch_info_by_block_height( s3_client: &near_lake_framework::s3_fetchers::LakeS3Client, s3_bucket_name: &str, rpc_client: &near_jsonrpc_client::JsonRpcClient, -) -> anyhow::Result { +) -> anyhow::Result { let block_heights = near_lake_framework::s3_fetchers::list_block_heights( s3_client, s3_bucket_name, @@ -115,41 +116,82 @@ pub async fn get_epoch_info_by_block_height( block_heights[0], ) .await?; - get_epoch_info_by_id(block.header.epoch_id, rpc_client).await + let epoch_info = get_epoch_info_by_id(block.header.epoch_id, rpc_client).await?; + + Ok( + readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId { + previous_epoch_id: None, + epoch_info, + next_epoch_id: block.header.next_epoch_id, + }, + ) + } pub async fn first_epoch( + s3_client: &near_lake_framework::s3_fetchers::LakeS3Client, + s3_bucket_name: &str, rpc_client: &near_jsonrpc_client::JsonRpcClient, -) -> anyhow::Result { - get_epoch_info_by_id(CryptoHash::default(), rpc_client).await +) -> anyhow::Result { + let epoch_info = get_epoch_info_by_id(CryptoHash::default(), rpc_client).await?; + let first_epoch_block = near_lake_framework::s3_fetchers::fetch_block_or_retry( + s3_client, + s3_bucket_name, + epoch_info.epoch_start_height, + ) + .await?; + Ok(readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId { + previous_epoch_id: None, + epoch_info, + next_epoch_id: first_epoch_block.header.next_epoch_id, + }) } pub async fn get_next_epoch( - current_epoch: &readnode_primitives::IndexedEpochInfo, + current_epoch: &readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId, s3_client: &near_lake_framework::s3_fetchers::LakeS3Client, s3_bucket_name: &str, rpc_client: &near_jsonrpc_client::JsonRpcClient, -) -> anyhow::Result { - let block_heights = near_lake_framework::s3_fetchers::list_block_heights( - s3_client, - s3_bucket_name, - current_epoch.epoch_start_height, - ) - .await?; - let epoch_first_block = near_lake_framework::s3_fetchers::fetch_block_or_retry( +) -> anyhow::Result { + + let mut epoch_info = get_epoch_info_by_id(current_epoch.next_epoch_id, rpc_client).await?; + + let epoch_info_first_block = near_lake_framework::s3_fetchers::fetch_block_or_retry( s3_client, s3_bucket_name, - block_heights[0], + epoch_info.epoch_start_height, ) - .await?; - let next_epoch_id = epoch_first_block.header.next_epoch_id; - let mut epoch_info = get_epoch_info_by_id(next_epoch_id, rpc_client).await?; - if current_epoch.epoch_id == CryptoHash::default() { + .await?; + if current_epoch.epoch_info.epoch_id == CryptoHash::default() { epoch_info.epoch_height = 1; } else { - epoch_info.epoch_height = current_epoch.epoch_height + 1 + epoch_info.epoch_height = current_epoch.epoch_info.epoch_height + 1 }; - Ok(epoch_info) + Ok( + readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId { + previous_epoch_id: Some(current_epoch.epoch_info.epoch_id), + epoch_info, + next_epoch_id: epoch_info_first_block.header.next_epoch_id, + }, + ) +} + +pub async fn update_epoch_end_height( + db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static), + epoch_id: Option, + epoch_end_block_hash: CryptoHash, +) -> anyhow::Result<()> { + if let Some(epoch_id) = epoch_id { + tracing::info!( + "Update epoch_end_height: epoch_id: {:?}, epoch_end_height: {}", + epoch_id, + epoch_end_block_hash + ); + db_manager + .update_epoch_end_height(epoch_id, epoch_end_block_hash) + .await?; + } + Ok(()) } pub async fn save_epoch_info( diff --git a/epoch-indexer/src/main.rs b/epoch-indexer/src/main.rs index 08da2f5c..391e3d07 100644 --- a/epoch-indexer/src/main.rs +++ b/epoch-indexer/src/main.rs @@ -12,25 +12,38 @@ async fn index_epochs( db_manager: impl StateIndexerDbManager + Sync + Send + 'static, rpc_client: near_jsonrpc_client::JsonRpcClient, indexer_id: &str, - start_epoch: readnode_primitives::IndexedEpochInfo, + start_epoch: readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId, ) -> anyhow::Result<()> { let mut epoch = start_epoch; loop { - epoch = match epoch_indexer::get_next_epoch(&epoch, s3_client, s3_bucket_name, &rpc_client) - .await - { - Ok(epoch) => epoch, - Err(e) => { - anyhow::bail!("Error fetching next epoch: {:?}", e); - } - }; + let epoch_info = + match epoch_indexer::get_next_epoch(&epoch, s3_client, s3_bucket_name, &rpc_client) + .await + { + Ok(next_epoch) => next_epoch, + Err(e) => { + anyhow::bail!("Error fetching next epoch: {:?}", e); + } + }; - if let Err(e) = epoch_indexer::save_epoch_info(&epoch, &db_manager, None).await { + if let Err(e) = + epoch_indexer::save_epoch_info(&epoch_info.epoch_info, &db_manager, None).await + { tracing::warn!("Error saving epoch info: {:?}", e); } + if let Err(e) = epoch_indexer::update_epoch_end_height( + &db_manager, + epoch_info.previous_epoch_id, + epoch_info.next_epoch_id, + ) + .await + { + tracing::warn!("Error update epoch_end_height: {:?}", e); + } db_manager - .update_meta(indexer_id, epoch.epoch_start_height) + .update_meta(indexer_id, epoch.epoch_info.epoch_start_height) .await?; + epoch = epoch_info; } } @@ -97,7 +110,11 @@ async fn main() -> anyhow::Result<()> { let rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url()); let epoch = match opts.start_options() { - StartOptions::FromGenesis => epoch_indexer::first_epoch(&rpc_client).await?, + StartOptions::FromGenesis => epoch_indexer::first_epoch( + &s3_client, + &opts.s3_bucket_name, + &rpc_client, + ).await?, StartOptions::FromInterruption => { let block_height = db_manager .get_last_processed_block_height(opts.indexer_id.as_str()) @@ -111,7 +128,7 @@ async fn main() -> anyhow::Result<()> { .await? } }; - epoch_indexer::save_epoch_info(&epoch, &db_manager, None).await?; + epoch_indexer::save_epoch_info(&epoch.epoch_info, &db_manager, None).await?; index_epochs( &s3_client, diff --git a/readnode-primitives/src/lib.rs b/readnode-primitives/src/lib.rs index 65b5e53e..69a47bf3 100644 --- a/readnode-primitives/src/lib.rs +++ b/readnode-primitives/src/lib.rs @@ -180,10 +180,18 @@ pub struct IndexedEpochInfo { pub epoch_id: CryptoHash, pub epoch_height: u64, pub epoch_start_height: u64, + pub epoch_end_height: Option, pub validators_info: views::EpochValidatorInfo, pub protocol_config: near_chain_configs::ProtocolConfigView, } +#[derive(Debug)] +pub struct IndexedEpochInfoWithPreviousAndNextEpochId { + pub previous_epoch_id: Option, + pub epoch_info: IndexedEpochInfo, + pub next_epoch_id: CryptoHash, +} + // TryFrom impls for defined types impl TryFrom<(T, T)> for BlockHeightShardId From 5c3c004c6e46988be72cba8ec6032fd3d637c218 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 18 Jan 2024 16:20:03 +0200 Subject: [PATCH 2/4] improvement state-indexer handle epochs --- database/src/postgres/state_indexer.rs | 2 +- database/src/scylladb/state_indexer.rs | 9 +-- epoch-indexer/src/config.rs | 4 +- epoch-indexer/src/lib.rs | 22 +++---- epoch-indexer/src/main.rs | 8 +-- state-indexer/src/main.rs | 84 ++++++++++++-------------- 6 files changed, 60 insertions(+), 69 deletions(-) diff --git a/database/src/postgres/state_indexer.rs b/database/src/postgres/state_indexer.rs index d5a4b7bd..ba1c8a1e 100644 --- a/database/src/postgres/state_indexer.rs +++ b/database/src/postgres/state_indexer.rs @@ -287,7 +287,7 @@ impl crate::StateIndexerDbManager for PostgresDBManager { Self::get_connection(&self.pg_pool).await?, block_hash, ) - .await?; + .await?; block_height .to_u64() .ok_or_else(|| anyhow::anyhow!("Failed to parse `block_height` to u64")) diff --git a/database/src/scylladb/state_indexer.rs b/database/src/scylladb/state_indexer.rs index 7e357bec..9cffaa82 100644 --- a/database/src/scylladb/state_indexer.rs +++ b/database/src/scylladb/state_indexer.rs @@ -355,7 +355,8 @@ impl ScyllaStorageManager for ScyllaDBManager { get_block_by_hash: Self::prepare_read_query( &scylla_db_session, "SELECT block_height FROM state_indexer.blocks WHERE block_hash = ?", - ).await?, + ) + .await?, add_chunk: Self::prepare_write_query( &scylla_db_session, "INSERT INTO state_indexer.chunks @@ -707,9 +708,9 @@ impl crate::StateIndexerDbManager for ScyllaDBManager { &self.get_block_by_hash, (block_hash.to_string(),), ) - .await? - .single_row()? - .into_typed::<(num_bigint::BigInt,)>()?; + .await? + .single_row()? + .into_typed::<(num_bigint::BigInt,)>()?; result .to_u64() diff --git a/epoch-indexer/src/config.rs b/epoch-indexer/src/config.rs index f7cbe1c9..aaf836a5 100644 --- a/epoch-indexer/src/config.rs +++ b/epoch-indexer/src/config.rs @@ -103,8 +103,8 @@ impl Opts { pub fn rpc_url(&self) -> &str { match &self.chain_id { - ChainId::Mainnet(_) => "https://archival-rpc.mainnet.near.org", - ChainId::Testnet(_) => "https://archival-rpc.testnet.near.org", + ChainId::Mainnet(_) => "https://rpc.mainnet.near.org", + ChainId::Testnet(_) => "https://rpc.testnet.near.org", } } pub async fn to_s3_client(&self) -> near_lake_framework::s3_fetchers::LakeS3Client { diff --git a/epoch-indexer/src/lib.rs b/epoch-indexer/src/lib.rs index eb196e06..e0a9d709 100644 --- a/epoch-indexer/src/lib.rs +++ b/epoch-indexer/src/lib.rs @@ -125,7 +125,6 @@ pub async fn get_epoch_info_by_block_height( next_epoch_id: block.header.next_epoch_id, }, ) - } pub async fn first_epoch( @@ -139,12 +138,14 @@ pub async fn first_epoch( s3_bucket_name, epoch_info.epoch_start_height, ) - .await?; - Ok(readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId { - previous_epoch_id: None, - epoch_info, - next_epoch_id: first_epoch_block.header.next_epoch_id, - }) + .await?; + Ok( + readnode_primitives::IndexedEpochInfoWithPreviousAndNextEpochId { + previous_epoch_id: None, + epoch_info, + next_epoch_id: first_epoch_block.header.next_epoch_id, + }, + ) } pub async fn get_next_epoch( @@ -153,7 +154,6 @@ pub async fn get_next_epoch( s3_bucket_name: &str, rpc_client: &near_jsonrpc_client::JsonRpcClient, ) -> anyhow::Result { - let mut epoch_info = get_epoch_info_by_id(current_epoch.next_epoch_id, rpc_client).await?; let epoch_info_first_block = near_lake_framework::s3_fetchers::fetch_block_or_retry( @@ -161,7 +161,7 @@ pub async fn get_next_epoch( s3_bucket_name, epoch_info.epoch_start_height, ) - .await?; + .await?; if current_epoch.epoch_info.epoch_id == CryptoHash::default() { epoch_info.epoch_height = 1; } else { @@ -183,7 +183,7 @@ pub async fn update_epoch_end_height( ) -> anyhow::Result<()> { if let Some(epoch_id) = epoch_id { tracing::info!( - "Update epoch_end_height: epoch_id: {:?}, epoch_end_height: {}", + "Update epoch_end_height: epoch_id: {:?}, epoch_end_height_hash: {}", epoch_id, epoch_end_block_hash ); @@ -223,7 +223,7 @@ pub async fn save_epoch_info( tracing::info!( "Save epoch info: epoch_id: {:?}, epoch_height: {:?}, epoch_start_height: {}", epoch.epoch_id, - epoch.epoch_height, + epoch_height, epoch.epoch_start_height, ); Ok(()) diff --git a/epoch-indexer/src/main.rs b/epoch-indexer/src/main.rs index 391e3d07..fd842f39 100644 --- a/epoch-indexer/src/main.rs +++ b/epoch-indexer/src/main.rs @@ -110,11 +110,9 @@ async fn main() -> anyhow::Result<()> { let rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url()); let epoch = match opts.start_options() { - StartOptions::FromGenesis => epoch_indexer::first_epoch( - &s3_client, - &opts.s3_bucket_name, - &rpc_client, - ).await?, + StartOptions::FromGenesis => { + epoch_indexer::first_epoch(&s3_client, &opts.s3_bucket_name, &rpc_client).await? + } StartOptions::FromInterruption => { let block_height = db_manager .get_last_processed_block_height(opts.indexer_id.as_str()) diff --git a/state-indexer/src/main.rs b/state-indexer/src/main.rs index eea5dbb3..16b403cc 100644 --- a/state-indexer/src/main.rs +++ b/state-indexer/src/main.rs @@ -29,28 +29,19 @@ async fn handle_streamer_message( ) -> anyhow::Result<()> { let block_height = streamer_message.block.header.height; let block_hash = streamer_message.block.header.hash; - let new_epoch_id = streamer_message.block.header.epoch_id; - tracing::debug!(target: INDEXER, "Block height {}", block_height,); + let current_epoch_id = streamer_message.block.header.epoch_id; + let next_epoch_id = streamer_message.block.header.next_epoch_id; - // handle first indexing epoch - let mut stats_lock = stats.write().await; - let current_epoch_id = if let Some(current_epoch_id) = stats_lock.current_epoch_id { - current_epoch_id - } else { - let (epoch_id, height) = - handle_epoch(new_epoch_id, None, stats_lock.current_epoch_height, rpc_client, db_manager).await?; - stats_lock.current_epoch_id = Some(epoch_id); - stats_lock.current_epoch_height = height; - epoch_id - }; - drop(stats_lock); + tracing::debug!(target: INDEXER, "Block height {}", block_height,); stats.write().await.block_heights_processing.insert(block_height); + let handle_epoch_future = handle_epoch( - new_epoch_id, - Some(current_epoch_id), + stats.read().await.current_epoch_id, stats.read().await.current_epoch_height, + current_epoch_id, + next_epoch_id, rpc_client, db_manager, ); @@ -80,9 +71,20 @@ async fn handle_streamer_message( stats_lock.block_heights_processing.remove(&block_height); stats_lock.blocks_processed_count += 1; stats_lock.last_processed_block_height = block_height; - if current_epoch_id != new_epoch_id { - stats_lock.current_epoch_id = Some(new_epoch_id); - stats_lock.current_epoch_height += 1; + if let Some(stats_epoch_id) = stats_lock.current_epoch_id { + if current_epoch_id != stats_epoch_id { + stats_lock.current_epoch_id = Some(current_epoch_id); + if stats_epoch_id == CryptoHash::default() { + stats_lock.current_epoch_height = 1; + } else { + stats_lock.current_epoch_height += 1; + } + } + } else { + // handle first indexing epoch + let epoch_info = epoch_indexer::get_epoch_info_by_id(current_epoch_id, rpc_client).await?; + stats_lock.current_epoch_id = Some(current_epoch_id); + stats_lock.current_epoch_height = epoch_info.epoch_height; } Ok(()) } @@ -103,37 +105,27 @@ async fn handle_block( #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(db_manager)))] async fn handle_epoch( - new_epoch_id: CryptoHash, - current_epoch_id: Option, - current_epoch_height: u64, + stats_current_epoch_id: Option, + stats_current_epoch_height: u64, + current_epoch_id: CryptoHash, + next_epoch_id: CryptoHash, rpc_client: &near_jsonrpc_client::JsonRpcClient, db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static), -) -> anyhow::Result<(CryptoHash, u64)> { - let epoch_info = match current_epoch_id { - Some(current_epoch_id) => { - if new_epoch_id != current_epoch_id { - Some(epoch_indexer::get_epoch_info_by_id(new_epoch_id, rpc_client).await?) - } else { - None - } - } - None => Some(epoch_indexer::get_epoch_info_by_id(new_epoch_id, rpc_client).await?), - }; - if let Some(epoch_info) = epoch_info { - let epoch_height = if let Some(current_epoch_id) = current_epoch_id { - if current_epoch_id == CryptoHash::default() { - 1 - } else { - current_epoch_height + 1 - } +) -> anyhow::Result<()> { + if let Some(stats_epoch_id) = stats_current_epoch_id { + if stats_epoch_id == current_epoch_id { + // If epoch didn't change, we don't need handle it + Ok(()) } else { - epoch_info.validators_info.epoch_height - }; - epoch_indexer::save_epoch_info(&epoch_info, db_manager, Some(epoch_height)).await?; - - Ok((new_epoch_id, epoch_height)) + // If epoch changed, we need to save epoch info and update epoch_end_height + let epoch_info = epoch_indexer::get_epoch_info_by_id(stats_epoch_id, rpc_client).await?; + epoch_indexer::save_epoch_info(&epoch_info, db_manager, Some(stats_current_epoch_height)).await?; + epoch_indexer::update_epoch_end_height(db_manager, Some(stats_epoch_id), next_epoch_id).await?; + Ok(()) + } } else { - Ok((new_epoch_id, current_epoch_height)) + // If stats_current_epoch_id is None, we don't need handle it + Ok(()) } } From 157e13be32a1c55ab9f9542c262642690927d14e Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 18 Jan 2024 22:40:56 +0200 Subject: [PATCH 3/4] improvement rpc-server to get validators and protocol config --- database/src/base/rpc_server.rs | 9 ++ database/src/postgres/models.rs | 12 +++ database/src/postgres/rpc_server.rs | 31 +++++++ database/src/scylladb/rpc_server.rs | 36 ++++++++ database/src/scylladb/state_indexer.rs | 9 ++ rpc-server/src/config.rs | 8 +- rpc-server/src/main.rs | 23 +++-- rpc-server/src/modules/blocks/methods.rs | 7 +- rpc-server/src/modules/blocks/mod.rs | 32 +++++++ rpc-server/src/modules/blocks/utils.rs | 7 +- rpc-server/src/modules/network/methods.rs | 107 +++++++++++++++------- rpc-server/src/modules/network/mod.rs | 40 ++++++++ rpc-server/src/utils.rs | 39 +++++++- 13 files changed, 302 insertions(+), 58 deletions(-) diff --git a/database/src/base/rpc_server.rs b/database/src/base/rpc_server.rs index c918404a..f441ffa2 100644 --- a/database/src/base/rpc_server.rs +++ b/database/src/base/rpc_server.rs @@ -90,11 +90,20 @@ pub trait ReaderDbManager { block_height: near_primitives::types::BlockHeight, shard_id: near_primitives::types::ShardId, ) -> anyhow::Result; + + /// Returns epoch validators info by the given epoch id async fn get_validators_by_epoch_id( &self, epoch_id: near_primitives::hash::CryptoHash, ) -> anyhow::Result; + /// Return epoch validators info by the given epoch end block height + async fn get_validators_by_end_block_height( + &self, + block_height: near_primitives::types::BlockHeight, + ) -> anyhow::Result; + + /// Return protocol config by the given epoch id async fn get_protocol_config_by_epoch_id( &self, epoch_id: near_primitives::hash::CryptoHash, diff --git a/database/src/postgres/models.rs b/database/src/postgres/models.rs index 33c43e48..aaaa0b08 100644 --- a/database/src/postgres/models.rs +++ b/database/src/postgres/models.rs @@ -709,6 +709,18 @@ impl Validators { Ok(response) } + pub async fn get_validators_epoch_end_height( + mut conn: crate::postgres::PgAsyncConn, + epoch_end_height: bigdecimal::BigDecimal, + ) -> anyhow::Result { + let response = validators::table + .filter(validators::epoch_end_height.eq(epoch_end_height)) + .select(Self::as_select()) + .first(&mut conn) + .await?; + + Ok(response) + } } #[derive(Insertable, Queryable, Selectable)] diff --git a/database/src/postgres/rpc_server.rs b/database/src/postgres/rpc_server.rs index 1e3b4321..5034c7a5 100644 --- a/database/src/postgres/rpc_server.rs +++ b/database/src/postgres/rpc_server.rs @@ -2,6 +2,7 @@ use crate::postgres::PostgresStorageManager; use crate::AdditionalDatabaseOptions; use bigdecimal::ToPrimitive; use borsh::{BorshDeserialize, BorshSerialize}; +use std::str::FromStr; pub struct PostgresDBManager { pg_pool: crate::postgres::PgAsyncPool, @@ -341,6 +342,36 @@ impl crate::ReaderDbManager for PostgresDBManager { }) } + async fn get_validators_by_end_block_height( + &self, + block_height: near_primitives::types::BlockHeight, + ) -> anyhow::Result { + let epoch = crate::models::Validators::get_validators_epoch_end_height( + Self::get_connection(&self.pg_pool).await?, + bigdecimal::BigDecimal::from(block_height), + ) + .await?; + let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch.epoch_id) + .map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?; + let epoch_height = epoch + .epoch_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?; + let epoch_start_height = epoch + .epoch_start_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_start_height` to u64"))?; + let (validators_info,) = serde_json::from_value::<( + near_indexer_primitives::views::EpochValidatorInfo, + )>(epoch.validators_info)?; + Ok(readnode_primitives::EpochValidatorsInfo { + epoch_id, + epoch_height, + epoch_start_height, + validators_info, + }) + } + async fn get_protocol_config_by_epoch_id( &self, epoch_id: near_indexer_primitives::CryptoHash, diff --git a/database/src/scylladb/rpc_server.rs b/database/src/scylladb/rpc_server.rs index 08f2b60f..49fe2c44 100644 --- a/database/src/scylladb/rpc_server.rs +++ b/database/src/scylladb/rpc_server.rs @@ -1,9 +1,11 @@ use crate::scylladb::ScyllaStorageManager; use borsh::{BorshDeserialize, BorshSerialize}; use futures::StreamExt; +use near_indexer_primitives::CryptoHash; use num_traits::ToPrimitive; use scylla::{prepared_statement::PreparedStatement, IntoTypedRows}; use std::convert::TryFrom; +use std::str::FromStr; pub struct ScyllaDBManager { scylla_session: std::sync::Arc, @@ -21,6 +23,7 @@ pub struct ScyllaDBManager { get_transaction_by_hash: PreparedStatement, get_stored_at_block_height_and_shard_id_by_block_height: PreparedStatement, get_validators_by_epoch_id: PreparedStatement, + get_validators_by_end_block_height: PreparedStatement, get_protocol_config_by_epoch_id: PreparedStatement, } @@ -121,6 +124,10 @@ impl ScyllaStorageManager for ScyllaDBManager { &scylla_db_session, "SELECT epoch_height, validators_info FROM state_indexer.validators WHERE epoch_id = ?", ).await?, + get_validators_by_end_block_height: Self::prepare_read_query( + &scylla_db_session, + "SELECT epoch_id, epoch_height, validators_info FROM state_indexer.validators WHERE epoch_end_height = ?", + ).await?, get_protocol_config_by_epoch_id: Self::prepare_read_query( &scylla_db_session, "SELECT protocol_config FROM state_indexer.protocol_configs WHERE epoch_id = ?", @@ -486,6 +493,35 @@ impl crate::ReaderDbManager for ScyllaDBManager { }) } + async fn get_validators_by_end_block_height( + &self, + block_height: near_primitives::types::BlockHeight, + ) -> anyhow::Result { + let (epoch_id, epoch_height, validators_info) = Self::execute_prepared_query( + &self.scylla_session, + &self.get_validators_by_end_block_height, + (num_bigint::BigInt::from(block_height),), + ) + .await? + .single_row()? + .into_typed::<(String, num_bigint::BigInt, String)>()?; + + let epoch_id = CryptoHash::from_str(&epoch_id) + .map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?; + + let validators_info: near_primitives::views::EpochValidatorInfo = + serde_json::from_str(&validators_info)?; + + Ok(readnode_primitives::EpochValidatorsInfo { + epoch_id, + epoch_height: epoch_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?, + epoch_start_height: validators_info.epoch_start_height, + validators_info, + }) + } + async fn get_protocol_config_by_epoch_id( &self, epoch_id: near_primitives::hash::CryptoHash, diff --git a/database/src/scylladb/state_indexer.rs b/database/src/scylladb/state_indexer.rs index 9cffaa82..ae3da910 100644 --- a/database/src/scylladb/state_indexer.rs +++ b/database/src/scylladb/state_indexer.rs @@ -173,6 +173,15 @@ impl ScyllaStorageManager for ScyllaDBManager { ) .await?; + scylla_db_session + .query( + " + CREATE INDEX IF NOT EXISTS validators_epoch_end_height ON validators (epoch_end_height); + ", + &[], + ) + .await?; + scylla_db_session .query( " diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 19ac5f10..86137c35 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::CacheBlock; +use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; use clap::Parser; #[derive(Parser)] @@ -154,7 +154,7 @@ pub struct ServerContext { pub genesis_config: near_chain_configs::GenesisConfig, pub blocks_cache: std::sync::Arc>>, - pub final_block_height: std::sync::Arc, + pub final_block_info: std::sync::Arc>, pub compiled_contract_code_cache: std::sync::Arc, pub contract_code_cache: std::sync::Arc< std::sync::RwLock>>, @@ -173,7 +173,7 @@ impl ServerContext { blocks_cache: std::sync::Arc< std::sync::RwLock>, >, - final_block_height: std::sync::Arc, + final_block_info: std::sync::Arc>, compiled_contract_code_cache: std::sync::Arc, contract_code_cache: std::sync::Arc< std::sync::RwLock< @@ -189,7 +189,7 @@ impl ServerContext { s3_bucket_name, genesis_config, blocks_cache, - final_block_height, + final_block_info, compiled_contract_code_cache, contract_code_cache, max_gas_burnt, diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 92de26d2..723acf2e 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -1,3 +1,4 @@ +use crate::modules::blocks::FinaleBlockInfo; use crate::utils::{ get_final_cache_block, gigabytes_to_bytes, update_final_block_height_regularly, }; @@ -103,12 +104,9 @@ async fn main() -> anyhow::Result<()> { block_cache_size_in_bytes, ))); - let final_block_height = - std::sync::Arc::new(std::sync::atomic::AtomicU64::new(final_block.block_height)); - blocks_cache - .write() - .unwrap() - .put(final_block.block_height, final_block); + let finale_block_info = std::sync::Arc::new(std::sync::RwLock::new( + FinaleBlockInfo::new(&near_rpc_client, &blocks_cache).await, + )); let compiled_contract_code_cache = std::sync::Arc::new(config::CompiledCodeCache { local_cache: std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new( @@ -151,19 +149,24 @@ async fn main() -> anyhow::Result<()> { s3_config, )), db_manager, - near_rpc_client, + near_rpc_client.clone(), opts.s3_bucket_name.clone(), genesis_config, std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&final_block_height), + std::sync::Arc::clone(&finale_block_info), compiled_contract_code_cache, contract_code_cache, opts.max_gas_burnt, ); tokio::spawn(async move { - update_final_block_height_regularly(final_block_height.clone(), blocks_cache, lake_config) - .await + update_final_block_height_regularly( + blocks_cache, + finale_block_info, + lake_config, + near_rpc_client, + ) + .await }); let rpc = Server::new() diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 5ce6f126..f2659853 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -328,8 +328,11 @@ pub async fn fetch_block( }, near_primitives::types::BlockReference::Finality(finality) => match finality { near_primitives::types::Finality::Final => Ok(data - .final_block_height - .load(std::sync::atomic::Ordering::SeqCst)), + .final_block_info + .read() + .unwrap() + .final_block_cache + .block_height), _ => Err( near_jsonrpc_primitives::types::blocks::RpcBlockError::InternalError { error_message: "Finality other than final is not supported".to_string(), diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index 010a167a..256aa71d 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -12,3 +12,35 @@ pub struct CacheBlock { pub state_root: near_primitives::hash::CryptoHash, pub epoch_id: near_primitives::hash::CryptoHash, } + +#[derive(Debug)] +pub struct FinaleBlockInfo { + pub final_block_cache: CacheBlock, + pub current_protocol_config: near_chain_configs::ProtocolConfigView, +} + +impl FinaleBlockInfo { + pub async fn new( + near_rpc_client: &crate::utils::JsonRpcClient, + blocks_cache: &std::sync::Arc< + std::sync::RwLock>, + >, + ) -> Self { + let final_block = crate::utils::get_final_cache_block(near_rpc_client) + .await + .expect("Error to get final block"); + let protocol_config = crate::utils::get_current_protocol_config(near_rpc_client) + .await + .expect("Error to get protocol_config"); + + blocks_cache + .write() + .unwrap() + .put(final_block.block_height, final_block); + + Self { + final_block_cache: final_block, + current_protocol_config: protocol_config, + } + } +} diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index 8786d7bd..d5109f8c 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -97,8 +97,11 @@ pub async fn fetch_block_from_cache_or_get( near_primitives::types::BlockReference::Finality(_) => { // Returns the final_block_height for all the finalities. Ok(data - .final_block_height - .load(std::sync::atomic::Ordering::SeqCst)) + .final_block_info + .read() + .unwrap() + .final_block_cache + .block_height) } // TODO: return the height of the first block height from S3 (cache it once on the start) near_primitives::types::BlockReference::SyncCheckpoint(_) => Err( diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index c99078cd..bee6a881 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -2,12 +2,11 @@ use crate::config::ServerContext; use crate::errors::RPCError; use crate::modules::blocks::utils::fetch_block_from_cache_or_get; use crate::modules::network::{ - friendly_memory_size_format, parse_validator_request, StatusResponse, + clone_protocol_config, friendly_memory_size_format, parse_validator_request, StatusResponse, }; #[cfg(feature = "shadow_data_consistency")] use crate::utils::shadow_compare_results; use jsonrpc_v2::{Data, Params}; -use near_primitives::types::EpochReference; use sysinfo::{System, SystemExt}; pub async fn status( @@ -48,8 +47,11 @@ pub async fn status( ), final_block_height: data - .final_block_height - .load(std::sync::atomic::Ordering::SeqCst), + .final_block_info + .read() + .unwrap() + .final_block_cache + .block_height, }; Ok(status) } @@ -71,6 +73,32 @@ pub async fn validators( .map_err(|err| RPCError::parse_error(&err.to_string()))?; tracing::debug!("`validators` called with parameters: {:?}", request); crate::metrics::VALIDATORS_REQUESTS_TOTAL.inc(); + // Latest epoch validators fetches from the Near RPC node + if let near_primitives::types::EpochReference::Latest = &request.epoch_reference { + crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); + let validator_info = data.near_rpc_client.call(request).await?; + return Ok( + near_jsonrpc_primitives::types::validator::RpcValidatorResponse { validator_info }, + ); + }; + + // Current epoch validators fetches from the Near RPC node + if let near_primitives::types::EpochReference::EpochId(epoch_id) = &request.epoch_reference { + if data + .final_block_info + .read() + .unwrap() + .final_block_cache + .epoch_id + == epoch_id.0 + { + let validator_info = data.near_rpc_client.call(request).await?; + return Ok( + near_jsonrpc_primitives::types::validator::RpcValidatorResponse { validator_info }, + ); + } + }; + let validator_info = validators_call(&data, &request).await; #[cfg(feature = "shadow_data_consistency")] @@ -181,37 +209,31 @@ async fn validators_call( near_primitives::views::EpochValidatorInfo, near_jsonrpc_primitives::types::validator::RpcValidatorError, > { - let epoch_id = match &validator_request.epoch_reference { - EpochReference::EpochId(epoch_id) => epoch_id.0, - EpochReference::BlockId(block_id) => { + let validators = match &validator_request.epoch_reference { + near_primitives::types::EpochReference::EpochId(epoch_id) => data + .db_manager + .get_validators_by_epoch_id(epoch_id.0) + .await + .map_err(|_err| { + near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch + })?, + near_primitives::types::EpochReference::BlockId(block_id) => { let block_reference = near_primitives::types::BlockReference::BlockId(block_id.clone()); let block = fetch_block_from_cache_or_get(data, block_reference) .await .map_err(|_err| { near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch })?; - block.epoch_id + data.db_manager + .get_validators_by_end_block_height(block.block_height) + .await.map_err(|_err| { + near_jsonrpc_primitives::types::validator::RpcValidatorError::ValidatorInfoUnavailable + })? } - EpochReference::Latest => { - crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); - let block_reference = near_primitives::types::BlockReference::Finality( - near_primitives::types::Finality::Final, - ); - let block = fetch_block_from_cache_or_get(data, block_reference) - .await - .map_err(|_err| { - near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch - })?; - block.epoch_id + _ => { + return Err(near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch) } }; - let validators = data - .db_manager - .get_validators_by_epoch_id(epoch_id) - .await - .map_err(|_err| { - near_jsonrpc_primitives::types::validator::RpcValidatorError::ValidatorInfoUnavailable - })?; Ok(validators.validators_info) } @@ -229,14 +251,29 @@ async fn protocol_config_call( error_message: err.to_string(), } })?; - let protocol_config = data - .db_manager - .get_protocol_config_by_epoch_id(block.epoch_id) - .await - .map_err(|err| { - near_jsonrpc_primitives::types::config::RpcProtocolConfigError::InternalError { - error_message: err.to_string(), - } - })?; + let protocol_config = if data + .final_block_info + .read() + .unwrap() + .final_block_cache + .epoch_id + == block.epoch_id + { + let protocol_config = &data + .final_block_info + .read() + .unwrap() + .current_protocol_config; + clone_protocol_config(protocol_config) + } else { + data.db_manager + .get_protocol_config_by_epoch_id(block.epoch_id) + .await + .map_err(|err| { + near_jsonrpc_primitives::types::config::RpcProtocolConfigError::InternalError { + error_message: err.to_string(), + } + })? + }; Ok(protocol_config) } diff --git a/rpc-server/src/modules/network/mod.rs b/rpc-server/src/modules/network/mod.rs index 438ab7f2..bb403bdc 100644 --- a/rpc-server/src/modules/network/mod.rs +++ b/rpc-server/src/modules/network/mod.rs @@ -56,3 +56,43 @@ pub fn friendly_memory_size_format(memory_size_bytes: usize) -> String { ) } } + +/// cannot move out of dereference of `std::sync::RwLockReadGuard` +/// move occurs because value `current_protocol_config` has type `ProtocolConfigView`, +/// which does not implement the `Copy` trait +pub fn clone_protocol_config( + protocol_config: &near_chain_configs::ProtocolConfigView, +) -> near_chain_configs::ProtocolConfigView { + near_chain_configs::ProtocolConfigView { + protocol_version: protocol_config.protocol_version, + genesis_time: protocol_config.genesis_time, + chain_id: protocol_config.chain_id.clone(), + genesis_height: protocol_config.genesis_height, + num_block_producer_seats: protocol_config.num_block_producer_seats, + num_block_producer_seats_per_shard: protocol_config + .num_block_producer_seats_per_shard + .clone(), + avg_hidden_validator_seats_per_shard: protocol_config + .avg_hidden_validator_seats_per_shard + .clone(), + dynamic_resharding: protocol_config.dynamic_resharding, + protocol_upgrade_stake_threshold: protocol_config.protocol_upgrade_stake_threshold, + epoch_length: protocol_config.epoch_length, + gas_limit: protocol_config.gas_limit, + min_gas_price: protocol_config.min_gas_price, + max_gas_price: protocol_config.max_gas_price, + block_producer_kickout_threshold: protocol_config.block_producer_kickout_threshold, + chunk_producer_kickout_threshold: protocol_config.chunk_producer_kickout_threshold, + online_min_threshold: protocol_config.online_min_threshold, + online_max_threshold: protocol_config.online_max_threshold, + gas_price_adjustment_rate: protocol_config.gas_price_adjustment_rate, + runtime_config: protocol_config.runtime_config.clone(), + transaction_validity_period: protocol_config.transaction_validity_period, + protocol_reward_rate: protocol_config.protocol_reward_rate, + max_inflation_rate: protocol_config.max_inflation_rate, + num_blocks_per_year: protocol_config.num_blocks_per_year, + protocol_treasury_account: protocol_config.protocol_treasury_account.clone(), + fishermen_threshold: protocol_config.fishermen_threshold, + minimum_stake_divisor: protocol_config.minimum_stake_divisor, + } +} diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index 0cafc191..8bb60c23 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::CacheBlock; +use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; #[cfg(feature = "shadow_data_consistency")] use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode}; use futures::StreamExt; @@ -114,10 +114,23 @@ pub async fn get_final_cache_block(near_rpc_client: &JsonRpcClient) -> Option anyhow::Result { + let params = + near_jsonrpc_client::methods::EXPERIMENTAL_protocol_config::RpcProtocolConfigRequest { + block_reference: near_primitives::types::BlockReference::Finality( + near_primitives::types::Finality::Final, + ), + }; + Ok(near_rpc_client.call(params).await?) +} + async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, blocks_cache: std::sync::Arc>>, - final_block_height: std::sync::Arc, + finale_block_info: std::sync::Arc>, + near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<()> { let block = CacheBlock { block_hash: streamer_message.block.header.hash, @@ -129,16 +142,31 @@ async fn handle_streamer_message( state_root: streamer_message.block.header.prev_state_root, epoch_id: streamer_message.block.header.epoch_id, }; - final_block_height.store(block.block_height, std::sync::atomic::Ordering::SeqCst); + + // let mut finale_block_info_lock = finale_block_info.write().await; + // let current_epoch_id = finale_block_info_lock.final_block_cache.epoch_id; + if finale_block_info.read().unwrap().final_block_cache.epoch_id + != streamer_message.block.header.epoch_id + { + tracing::info!( + "New epoch started: {:?}", + streamer_message.block.header.epoch_id + ); + // let protocol_config = ; + finale_block_info.write().unwrap().current_protocol_config = + get_current_protocol_config(near_rpc_client).await.unwrap(); + } + finale_block_info.write().unwrap().final_block_cache = block; blocks_cache.write().unwrap().put(block.block_height, block); crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?); Ok(()) } pub async fn update_final_block_height_regularly( - final_block_height: std::sync::Arc, blocks_cache: std::sync::Arc>>, + finale_block_info: std::sync::Arc>, lake_config: near_lake_framework::LakeConfig, + near_rpc_client: JsonRpcClient, ) -> anyhow::Result<()> { tracing::info!("Task to get and store final block in the cache started"); let (sender, stream) = near_lake_framework::streamer(lake_config); @@ -147,7 +175,8 @@ pub async fn update_final_block_height_regularly( handle_streamer_message( streamer_message, std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&final_block_height), + std::sync::Arc::clone(&finale_block_info), + &near_rpc_client, ) }) .buffer_unordered(1usize); From 44d0c488a414b72fad8ee82b2b5d0276f82814ce Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Fri, 19 Jan 2024 13:15:08 +0200 Subject: [PATCH 4/4] improvement according pr comments --- Cargo.lock | 1 + database/src/postgres/rpc_server.rs | 8 ++++--- database/src/scylladb/rpc_server.rs | 11 ++++----- perf-testing/src/chunks.rs | 6 +++-- readnode-primitives/src/lib.rs | 5 +++-- rpc-server/Cargo.toml | 1 + rpc-server/src/config.rs | 25 ++++++++++++--------- rpc-server/src/errors.rs | 4 +++- rpc-server/src/main.rs | 12 +++++----- rpc-server/src/modules/blocks/methods.rs | 2 +- rpc-server/src/modules/blocks/mod.rs | 8 +++---- rpc-server/src/modules/blocks/utils.rs | 11 +++------ rpc-server/src/modules/network/methods.rs | 22 ++++++------------ rpc-server/src/modules/network/mod.rs | 2 +- rpc-server/src/modules/queries/mod.rs | 3 ++- rpc-server/src/modules/queries/utils.rs | 8 ++++--- rpc-server/src/modules/state/utils.rs | 3 ++- rpc-server/src/utils.rs | 27 ++++++++++++----------- 18 files changed, 82 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c5aab71b..3242d60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4550,6 +4550,7 @@ dependencies = [ "dotenv", "erased-serde", "futures", + "futures-locks", "hex", "http", "jsonrpc-v2", diff --git a/database/src/postgres/rpc_server.rs b/database/src/postgres/rpc_server.rs index 5034c7a5..7c8d4429 100644 --- a/database/src/postgres/rpc_server.rs +++ b/database/src/postgres/rpc_server.rs @@ -1,8 +1,10 @@ -use crate::postgres::PostgresStorageManager; -use crate::AdditionalDatabaseOptions; +use std::str::FromStr; + use bigdecimal::ToPrimitive; use borsh::{BorshDeserialize, BorshSerialize}; -use std::str::FromStr; + +use crate::postgres::PostgresStorageManager; +use crate::AdditionalDatabaseOptions; pub struct PostgresDBManager { pg_pool: crate::postgres::PgAsyncPool, diff --git a/database/src/scylladb/rpc_server.rs b/database/src/scylladb/rpc_server.rs index 49fe2c44..f0672327 100644 --- a/database/src/scylladb/rpc_server.rs +++ b/database/src/scylladb/rpc_server.rs @@ -1,11 +1,12 @@ -use crate::scylladb::ScyllaStorageManager; +use std::convert::TryFrom; +use std::str::FromStr; + use borsh::{BorshDeserialize, BorshSerialize}; use futures::StreamExt; -use near_indexer_primitives::CryptoHash; use num_traits::ToPrimitive; use scylla::{prepared_statement::PreparedStatement, IntoTypedRows}; -use std::convert::TryFrom; -use std::str::FromStr; + +use crate::scylladb::ScyllaStorageManager; pub struct ScyllaDBManager { scylla_session: std::sync::Arc, @@ -506,7 +507,7 @@ impl crate::ReaderDbManager for ScyllaDBManager { .single_row()? .into_typed::<(String, num_bigint::BigInt, String)>()?; - let epoch_id = CryptoHash::from_str(&epoch_id) + let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch_id) .map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?; let validators_info: near_primitives::views::EpochValidatorInfo = diff --git a/perf-testing/src/chunks.rs b/perf-testing/src/chunks.rs index ef9a419a..723817b9 100644 --- a/perf-testing/src/chunks.rs +++ b/perf-testing/src/chunks.rs @@ -1,9 +1,11 @@ -use crate::{TestResult, TxInfo}; +use std::time::{Duration, Instant}; + use futures::future::join_all; use near_jsonrpc_client::{methods, JsonRpcClient}; use near_jsonrpc_primitives::types::chunks::ChunkReference; use near_primitives::types::{BlockHeight, BlockId}; -use std::time::{Duration, Instant}; + +use crate::{TestResult, TxInfo}; // While testing chunks, it's important to collect accounts and transactions for further tests async fn get_random_chunk( diff --git a/readnode-primitives/src/lib.rs b/readnode-primitives/src/lib.rs index 69a47bf3..d24d1921 100644 --- a/readnode-primitives/src/lib.rs +++ b/readnode-primitives/src/lib.rs @@ -1,9 +1,10 @@ +use std::convert::TryFrom; +use std::str::FromStr; + use borsh::{BorshDeserialize, BorshSerialize}; use near_indexer_primitives::{views, CryptoHash, IndexerTransactionWithOutcome}; use num_traits::ToPrimitive; use serde::{Deserialize, Serialize}; -use std::convert::TryFrom; -use std::str::FromStr; #[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)] pub struct TransactionKey { diff --git a/rpc-server/Cargo.toml b/rpc-server/Cargo.toml index b8450e81..574ec5d6 100644 --- a/rpc-server/Cargo.toml +++ b/rpc-server/Cargo.toml @@ -16,6 +16,7 @@ clap = { version = "3.2.22", features = ["color", "derive", "env"] } dotenv = "0.15.0" erased-serde = "0.3.23" futures = "0.3.24" +futures-locks = "0.7.1" hex = "0.4.3" http = "0.2.8" jsonrpc-v2 = { git = "https://github.com/kobayurii/jsonrpc-v2", rev = "95e7b1d2567ae841163af212a3f25abb6862becb" } diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 86137c35..f379f9bb 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -1,5 +1,6 @@ -use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; +use crate::modules::blocks::{CacheBlock, FinalBlockInfo}; use clap::Parser; +use futures::executor::block_on; #[derive(Parser)] #[clap(author, version, about, long_about = None)] @@ -153,11 +154,13 @@ pub struct ServerContext { pub s3_bucket_name: String, pub genesis_config: near_chain_configs::GenesisConfig, pub blocks_cache: - std::sync::Arc>>, - pub final_block_info: std::sync::Arc>, + std::sync::Arc>>, + pub final_block_info: std::sync::Arc>, pub compiled_contract_code_cache: std::sync::Arc, pub contract_code_cache: std::sync::Arc< - std::sync::RwLock>>, + futures_locks::RwLock< + crate::cache::LruMemoryCache>, + >, >, pub max_gas_burnt: near_primitives::types::Gas, } @@ -171,12 +174,12 @@ impl ServerContext { s3_bucket_name: String, genesis_config: near_chain_configs::GenesisConfig, blocks_cache: std::sync::Arc< - std::sync::RwLock>, + futures_locks::RwLock>, >, - final_block_info: std::sync::Arc>, + final_block_info: std::sync::Arc>, compiled_contract_code_cache: std::sync::Arc, contract_code_cache: std::sync::Arc< - std::sync::RwLock< + futures_locks::RwLock< crate::cache::LruMemoryCache>, >, >, @@ -198,7 +201,7 @@ impl ServerContext { } pub struct CompiledCodeCache { pub local_cache: std::sync::Arc< - std::sync::RwLock< + futures_locks::RwLock< crate::cache::LruMemoryCache< near_primitives::hash::CryptoHash, near_vm_runner::logic::CompiledContract, @@ -213,7 +216,7 @@ impl near_vm_runner::logic::CompiledContractCache for CompiledCodeCache { key: &near_primitives::hash::CryptoHash, value: near_vm_runner::logic::CompiledContract, ) -> std::io::Result<()> { - self.local_cache.write().unwrap().put(*key, value); + block_on(self.local_cache.write()).put(*key, value); Ok(()) } @@ -221,10 +224,10 @@ impl near_vm_runner::logic::CompiledContractCache for CompiledCodeCache { &self, key: &near_primitives::hash::CryptoHash, ) -> std::io::Result> { - Ok(self.local_cache.write().unwrap().get(key).cloned()) + Ok(block_on(self.local_cache.write()).get(key).cloned()) } fn has(&self, key: &near_primitives::hash::CryptoHash) -> std::io::Result { - Ok(self.local_cache.write().unwrap().contains(key)) + Ok(block_on(self.local_cache.write()).contains(key)) } } diff --git a/rpc-server/src/errors.rs b/rpc-server/src/errors.rs index bdfedc16..e0922b4f 100644 --- a/rpc-server/src/errors.rs +++ b/rpc-server/src/errors.rs @@ -1,5 +1,7 @@ -use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError}; use std::ops::{Deref, DerefMut}; + +use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError}; + type BoxedSerialize = Box; #[derive(Debug, serde::Serialize)] diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 723acf2e..0ec0f829 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::FinaleBlockInfo; +use crate::modules::blocks::FinalBlockInfo; use crate::utils::{ get_final_cache_block, gigabytes_to_bytes, update_final_block_height_regularly, }; @@ -100,20 +100,20 @@ async fn main() -> anyhow::Result<()> { ) .await; - let blocks_cache = std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new( + let blocks_cache = std::sync::Arc::new(futures_locks::RwLock::new(cache::LruMemoryCache::new( block_cache_size_in_bytes, ))); - let finale_block_info = std::sync::Arc::new(std::sync::RwLock::new( - FinaleBlockInfo::new(&near_rpc_client, &blocks_cache).await, + let finale_block_info = std::sync::Arc::new(futures_locks::RwLock::new( + FinalBlockInfo::new(&near_rpc_client, &blocks_cache).await, )); let compiled_contract_code_cache = std::sync::Arc::new(config::CompiledCodeCache { - local_cache: std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new( + local_cache: std::sync::Arc::new(futures_locks::RwLock::new(cache::LruMemoryCache::new( contract_code_cache_size, ))), }); - let contract_code_cache = std::sync::Arc::new(std::sync::RwLock::new( + let contract_code_cache = std::sync::Arc::new(futures_locks::RwLock::new( cache::LruMemoryCache::new(contract_code_cache_size), )); diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index f2659853..d188ce66 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -330,7 +330,7 @@ pub async fn fetch_block( near_primitives::types::Finality::Final => Ok(data .final_block_info .read() - .unwrap() + .await .final_block_cache .block_height), _ => Err( diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index 256aa71d..9e13b820 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -14,16 +14,16 @@ pub struct CacheBlock { } #[derive(Debug)] -pub struct FinaleBlockInfo { +pub struct FinalBlockInfo { pub final_block_cache: CacheBlock, pub current_protocol_config: near_chain_configs::ProtocolConfigView, } -impl FinaleBlockInfo { +impl FinalBlockInfo { pub async fn new( near_rpc_client: &crate::utils::JsonRpcClient, blocks_cache: &std::sync::Arc< - std::sync::RwLock>, + futures_locks::RwLock>, >, ) -> Self { let final_block = crate::utils::get_final_cache_block(near_rpc_client) @@ -35,7 +35,7 @@ impl FinaleBlockInfo { blocks_cache .write() - .unwrap() + .await .put(final_block.block_height, final_block); Self { diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index d5109f8c..b23ab2ec 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -99,7 +99,7 @@ pub async fn fetch_block_from_cache_or_get( Ok(data .final_block_info .read() - .unwrap() + .await .final_block_cache .block_height) } @@ -110,12 +110,7 @@ pub async fn fetch_block_from_cache_or_get( }, ), }; - let block = data - .blocks_cache - .write() - .unwrap() - .get(&block_height?) - .cloned(); + let block = data.blocks_cache.write().await.get(&block_height?).cloned(); match block { Some(block) => Ok(block), None => { @@ -133,7 +128,7 @@ pub async fn fetch_block_from_cache_or_get( data.blocks_cache .write() - .unwrap() + .await .put(block_from_s3.block_view.header.height, block); Ok(block) } diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index bee6a881..0cfb2812 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -16,13 +16,9 @@ pub async fn status( let sys = System::new_all(); let total_memory = sys.total_memory(); let used_memory = sys.used_memory(); - let blocks_cache = data.blocks_cache.read().unwrap(); - let contract_code_cache = data.contract_code_cache.read().unwrap(); - let compiled_contract_code_cache = data - .compiled_contract_code_cache - .local_cache - .read() - .unwrap(); + let blocks_cache = data.blocks_cache.read().await; + let contract_code_cache = data.contract_code_cache.read().await; + let compiled_contract_code_cache = data.compiled_contract_code_cache.local_cache.read().await; let status = StatusResponse { total_memory: friendly_memory_size_format(total_memory as usize), used_memory: friendly_memory_size_format(used_memory as usize), @@ -49,7 +45,7 @@ pub async fn status( final_block_height: data .final_block_info .read() - .unwrap() + .await .final_block_cache .block_height, }; @@ -87,7 +83,7 @@ pub async fn validators( if data .final_block_info .read() - .unwrap() + .await .final_block_cache .epoch_id == epoch_id.0 @@ -254,16 +250,12 @@ async fn protocol_config_call( let protocol_config = if data .final_block_info .read() - .unwrap() + .await .final_block_cache .epoch_id == block.epoch_id { - let protocol_config = &data - .final_block_info - .read() - .unwrap() - .current_protocol_config; + let protocol_config = &data.final_block_info.read().await.current_protocol_config; clone_protocol_config(protocol_config) } else { data.db_manager diff --git a/rpc-server/src/modules/network/mod.rs b/rpc-server/src/modules/network/mod.rs index bb403bdc..908df986 100644 --- a/rpc-server/src/modules/network/mod.rs +++ b/rpc-server/src/modules/network/mod.rs @@ -57,7 +57,7 @@ pub fn friendly_memory_size_format(memory_size_bytes: usize) -> String { } } -/// cannot move out of dereference of `std::sync::RwLockReadGuard` +/// cannot move out of dereference of `futures_locks::RwLockReadGuard` /// move occurs because value `current_protocol_config` has type `ProtocolConfigView`, /// which does not implement the `Copy` trait pub fn clone_protocol_config( diff --git a/rpc-server/src/modules/queries/mod.rs b/rpc-server/src/modules/queries/mod.rs index f371cc6a..3b192147 100644 --- a/rpc-server/src/modules/queries/mod.rs +++ b/rpc-server/src/modules/queries/mod.rs @@ -1,6 +1,7 @@ +use std::collections::HashMap; + use database::ReaderDbManager; use futures::executor::block_on; -use std::collections::HashMap; pub mod methods; pub mod utils; diff --git a/rpc-server/src/modules/queries/utils.rs b/rpc-server/src/modules/queries/utils.rs index 73d9d9bd..f9c6a970 100644 --- a/rpc-server/src/modules/queries/utils.rs +++ b/rpc-server/src/modules/queries/utils.rs @@ -224,7 +224,9 @@ pub async fn run_contract( db_manager: std::sync::Arc>, compiled_contract_code_cache: &std::sync::Arc, contract_code_cache: &std::sync::Arc< - std::sync::RwLock>>, + futures_locks::RwLock< + crate::cache::LruMemoryCache>, + >, >, block: crate::modules::blocks::CacheBlock, max_gas_burnt: near_primitives::types::Gas, @@ -238,7 +240,7 @@ pub async fn run_contract( let code: Option> = contract_code_cache .write() - .unwrap() + .await .get(&contract.data.code_hash()) .cloned(); @@ -255,7 +257,7 @@ pub async fn run_contract( })?; contract_code_cache .write() - .unwrap() + .await .put(contract.data.code_hash(), code.data.clone()); near_primitives::contract::ContractCode::new(code.data, Some(contract.data.code_hash())) } diff --git a/rpc-server/src/modules/state/utils.rs b/rpc-server/src/modules/state/utils.rs index 801a8309..5ae28ed2 100644 --- a/rpc-server/src/modules/state/utils.rs +++ b/rpc-server/src/modules/state/utils.rs @@ -1,6 +1,7 @@ -use futures::StreamExt; use std::collections::HashMap; +use futures::StreamExt; + #[cfg_attr( feature = "tracing-instrumentation", tracing::instrument(skip(db_manager)) diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index 8bb60c23..41663557 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; +use crate::modules::blocks::{CacheBlock, FinalBlockInfo}; #[cfg(feature = "shadow_data_consistency")] use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode}; use futures::StreamExt; @@ -128,8 +128,10 @@ pub async fn get_current_protocol_config( async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, - blocks_cache: std::sync::Arc>>, - finale_block_info: std::sync::Arc>, + blocks_cache: std::sync::Arc< + futures_locks::RwLock>, + >, + finale_block_info: std::sync::Arc>, near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<()> { let block = CacheBlock { @@ -143,28 +145,27 @@ async fn handle_streamer_message( epoch_id: streamer_message.block.header.epoch_id, }; - // let mut finale_block_info_lock = finale_block_info.write().await; - // let current_epoch_id = finale_block_info_lock.final_block_cache.epoch_id; - if finale_block_info.read().unwrap().final_block_cache.epoch_id + if finale_block_info.read().await.final_block_cache.epoch_id != streamer_message.block.header.epoch_id { tracing::info!( "New epoch started: {:?}", streamer_message.block.header.epoch_id ); - // let protocol_config = ; - finale_block_info.write().unwrap().current_protocol_config = - get_current_protocol_config(near_rpc_client).await.unwrap(); + finale_block_info.write().await.current_protocol_config = + get_current_protocol_config(near_rpc_client).await?; } - finale_block_info.write().unwrap().final_block_cache = block; - blocks_cache.write().unwrap().put(block.block_height, block); + finale_block_info.write().await.final_block_cache = block; + blocks_cache.write().await.put(block.block_height, block); crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?); Ok(()) } pub async fn update_final_block_height_regularly( - blocks_cache: std::sync::Arc>>, - finale_block_info: std::sync::Arc>, + blocks_cache: std::sync::Arc< + futures_locks::RwLock>, + >, + finale_block_info: std::sync::Arc>, lake_config: near_lake_framework::LakeConfig, near_rpc_client: JsonRpcClient, ) -> anyhow::Result<()> {