Skip to content

Commit

Permalink
improvement rpc-server to get validators and protocol config
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Jan 18, 2024
1 parent 5c3c004 commit 5752274
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 69 deletions.
9 changes: 9 additions & 0 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,20 @@ pub trait ReaderDbManager {
block_height: near_primitives::types::BlockHeight,
shard_id: near_primitives::types::ShardId,
) -> anyhow::Result<readnode_primitives::BlockHeightShardId>;

/// Returns epoch validators info by the given epoch id
async fn get_validators_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo>;

/// Return epoch validators info by the given epoch end block height
async fn get_validators_by_end_block_height(
&self,
block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo>;

/// Return protocol config by the given epoch id
async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
Expand Down
12 changes: 12 additions & 0 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,18 @@ impl Validators {

Ok(response)
}
pub async fn get_validators_epoch_end_height(
mut conn: crate::postgres::PgAsyncConn,
epoch_end_height: bigdecimal::BigDecimal,
) -> anyhow::Result<Self> {
let response = validators::table
.filter(validators::epoch_end_height.eq(epoch_end_height))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response)
}
}

#[derive(Insertable, Queryable, Selectable)]
Expand Down
31 changes: 31 additions & 0 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::postgres::PostgresStorageManager;
use crate::AdditionalDatabaseOptions;
use bigdecimal::ToPrimitive;
use borsh::{BorshDeserialize, BorshSerialize};
use std::str::FromStr;

pub struct PostgresDBManager {
pg_pool: crate::postgres::PgAsyncPool,
Expand Down Expand Up @@ -341,6 +342,36 @@ impl crate::ReaderDbManager for PostgresDBManager {
})
}

async fn get_validators_by_end_block_height(
&self,
block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo> {
let epoch = crate::models::Validators::get_validators_epoch_end_height(
Self::get_connection(&self.pg_pool).await?,
bigdecimal::BigDecimal::from(block_height),
)
.await?;
let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch.epoch_id)
.map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?;
let epoch_height = epoch
.epoch_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?;
let epoch_start_height = epoch
.epoch_start_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_start_height` to u64"))?;
let (validators_info,) = serde_json::from_value::<(
near_indexer_primitives::views::EpochValidatorInfo,
)>(epoch.validators_info)?;
Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height,
epoch_start_height,
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
Expand Down
36 changes: 36 additions & 0 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::scylladb::ScyllaStorageManager;
use borsh::{BorshDeserialize, BorshSerialize};
use futures::StreamExt;
use near_indexer_primitives::CryptoHash;
use num_traits::ToPrimitive;
use scylla::{prepared_statement::PreparedStatement, IntoTypedRows};
use std::convert::TryFrom;
use std::str::FromStr;

pub struct ScyllaDBManager {
scylla_session: std::sync::Arc<scylla::Session>,
Expand All @@ -21,6 +23,7 @@ pub struct ScyllaDBManager {
get_transaction_by_hash: PreparedStatement,
get_stored_at_block_height_and_shard_id_by_block_height: PreparedStatement,
get_validators_by_epoch_id: PreparedStatement,
get_validators_by_end_block_height: PreparedStatement,
get_protocol_config_by_epoch_id: PreparedStatement,
}

Expand Down Expand Up @@ -121,6 +124,10 @@ impl ScyllaStorageManager for ScyllaDBManager {
&scylla_db_session,
"SELECT epoch_height, validators_info FROM state_indexer.validators WHERE epoch_id = ?",
).await?,
get_validators_by_end_block_height: Self::prepare_read_query(
&scylla_db_session,
"SELECT epoch_id, epoch_height, validators_info FROM state_indexer.validators WHERE epoch_end_height = ?",
).await?,
get_protocol_config_by_epoch_id: Self::prepare_read_query(
&scylla_db_session,
"SELECT protocol_config FROM state_indexer.protocol_configs WHERE epoch_id = ?",
Expand Down Expand Up @@ -486,6 +493,35 @@ impl crate::ReaderDbManager for ScyllaDBManager {
})
}

async fn get_validators_by_end_block_height(
&self,
block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo> {
let (epoch_id, epoch_height, validators_info) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_validators_by_end_block_height,
(num_bigint::BigInt::from(block_height),),
)
.await?
.single_row()?
.into_typed::<(String, num_bigint::BigInt, String)>()?;

let epoch_id = CryptoHash::from_str(&epoch_id)
.map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?;

let validators_info: near_primitives::views::EpochValidatorInfo =
serde_json::from_str(&validators_info)?;

Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height: epoch_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?,
epoch_start_height: validators_info.epoch_start_height,
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
Expand Down
9 changes: 9 additions & 0 deletions database/src/scylladb/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ impl ScyllaStorageManager for ScyllaDBManager {
)
.await?;

scylla_db_session
.query(
"
CREATE INDEX IF NOT EXISTS validators_epoch_end_height ON validators (epoch_end_height);
",
&[],
)
.await?;

scylla_db_session
.query(
"
Expand Down
8 changes: 4 additions & 4 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::modules::blocks::CacheBlock;
use crate::modules::blocks::{CacheBlock, FinaleBlockInfo};
use clap::Parser;

#[derive(Parser)]
Expand Down Expand Up @@ -154,7 +154,7 @@ pub struct ServerContext {
pub genesis_config: near_chain_configs::GenesisConfig,
pub blocks_cache:
std::sync::Arc<std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>>,
pub final_block_height: std::sync::Arc<std::sync::atomic::AtomicU64>,
pub final_block_info: std::sync::Arc<std::sync::RwLock<FinaleBlockInfo>>,
pub compiled_contract_code_cache: std::sync::Arc<CompiledCodeCache>,
pub contract_code_cache: std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<near_primitives::hash::CryptoHash, Vec<u8>>>,
Expand All @@ -173,7 +173,7 @@ impl ServerContext {
blocks_cache: std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
final_block_height: std::sync::Arc<std::sync::atomic::AtomicU64>,
final_block_info: std::sync::Arc<std::sync::RwLock<FinaleBlockInfo>>,
compiled_contract_code_cache: std::sync::Arc<CompiledCodeCache>,
contract_code_cache: std::sync::Arc<
std::sync::RwLock<
Expand All @@ -189,7 +189,7 @@ impl ServerContext {
s3_bucket_name,
genesis_config,
blocks_cache,
final_block_height,
final_block_info,
compiled_contract_code_cache,
contract_code_cache,
max_gas_burnt,
Expand Down
23 changes: 13 additions & 10 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::modules::blocks::FinaleBlockInfo;
use crate::utils::{
get_final_cache_block, gigabytes_to_bytes, update_final_block_height_regularly,
};
Expand Down Expand Up @@ -103,12 +104,9 @@ async fn main() -> anyhow::Result<()> {
block_cache_size_in_bytes,
)));

let final_block_height =
std::sync::Arc::new(std::sync::atomic::AtomicU64::new(final_block.block_height));
blocks_cache
.write()
.unwrap()
.put(final_block.block_height, final_block);
let finale_block_info = std::sync::Arc::new(std::sync::RwLock::new(
FinaleBlockInfo::new(&near_rpc_client, &blocks_cache).await,
));

let compiled_contract_code_cache = std::sync::Arc::new(config::CompiledCodeCache {
local_cache: std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new(
Expand Down Expand Up @@ -151,19 +149,24 @@ async fn main() -> anyhow::Result<()> {
s3_config,
)),
db_manager,
near_rpc_client,
near_rpc_client.clone(),
opts.s3_bucket_name.clone(),
genesis_config,
std::sync::Arc::clone(&blocks_cache),
std::sync::Arc::clone(&final_block_height),
std::sync::Arc::clone(&finale_block_info),
compiled_contract_code_cache,
contract_code_cache,
opts.max_gas_burnt,
);

tokio::spawn(async move {
update_final_block_height_regularly(final_block_height.clone(), blocks_cache, lake_config)
.await
update_final_block_height_regularly(
blocks_cache,
finale_block_info,
lake_config,
near_rpc_client,
)
.await
});

let rpc = Server::new()
Expand Down
7 changes: 5 additions & 2 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,11 @@ pub async fn fetch_block(
},
near_primitives::types::BlockReference::Finality(finality) => match finality {
near_primitives::types::Finality::Final => Ok(data
.final_block_height
.load(std::sync::atomic::Ordering::SeqCst)),
.final_block_info
.read()
.unwrap()
.final_block_cache
.block_height),
_ => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::InternalError {
error_message: "Finality other than final is not supported".to_string(),
Expand Down
32 changes: 32 additions & 0 deletions rpc-server/src/modules/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,35 @@ pub struct CacheBlock {
pub state_root: near_primitives::hash::CryptoHash,
pub epoch_id: near_primitives::hash::CryptoHash,
}

#[derive(Debug)]
pub struct FinaleBlockInfo {
pub final_block_cache: CacheBlock,
pub current_protocol_config: near_chain_configs::ProtocolConfigView,
}

impl FinaleBlockInfo {
pub async fn new(
near_rpc_client: &crate::utils::JsonRpcClient,
blocks_cache: &std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
) -> Self {
let final_block = crate::utils::get_final_cache_block(near_rpc_client)
.await
.expect("Error to get final block");
let protocol_config = crate::utils::get_current_protocol_config(near_rpc_client)
.await
.expect("Error to get protocol_config");

blocks_cache
.write()
.unwrap()
.put(final_block.block_height, final_block);

Self {
final_block_cache: final_block,
current_protocol_config: protocol_config,
}
}
}
7 changes: 5 additions & 2 deletions rpc-server/src/modules/blocks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ pub async fn fetch_block_from_cache_or_get(
near_primitives::types::BlockReference::Finality(_) => {
// Returns the final_block_height for all the finalities.
Ok(data
.final_block_height
.load(std::sync::atomic::Ordering::SeqCst))
.final_block_info
.read()
.unwrap()
.final_block_cache
.block_height)
}
// TODO: return the height of the first block height from S3 (cache it once on the start)
near_primitives::types::BlockReference::SyncCheckpoint(_) => Err(
Expand Down
Loading

0 comments on commit 5752274

Please sign in to comment.