From 5752274ccdb63c8541dd3841570eaa4cb8f92d09 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 18 Jan 2024 22:40:56 +0200 Subject: [PATCH] improvement rpc-server to get validators and protocol config --- database/src/base/rpc_server.rs | 9 ++ database/src/postgres/models.rs | 12 ++ database/src/postgres/rpc_server.rs | 31 ++++++ database/src/scylladb/rpc_server.rs | 36 ++++++ database/src/scylladb/state_indexer.rs | 9 ++ rpc-server/src/config.rs | 8 +- rpc-server/src/main.rs | 23 ++-- rpc-server/src/modules/blocks/methods.rs | 7 +- rpc-server/src/modules/blocks/mod.rs | 32 ++++++ rpc-server/src/modules/blocks/utils.rs | 7 +- rpc-server/src/modules/network/methods.rs | 130 ++++++++++++++-------- rpc-server/src/modules/network/mod.rs | 40 +++++++ rpc-server/src/utils.rs | 39 ++++++- 13 files changed, 314 insertions(+), 69 deletions(-) diff --git a/database/src/base/rpc_server.rs b/database/src/base/rpc_server.rs index c918404a..f441ffa2 100644 --- a/database/src/base/rpc_server.rs +++ b/database/src/base/rpc_server.rs @@ -90,11 +90,20 @@ pub trait ReaderDbManager { block_height: near_primitives::types::BlockHeight, shard_id: near_primitives::types::ShardId, ) -> anyhow::Result; + + /// Returns epoch validators info by the given epoch id async fn get_validators_by_epoch_id( &self, epoch_id: near_primitives::hash::CryptoHash, ) -> anyhow::Result; + /// Return epoch validators info by the given epoch end block height + async fn get_validators_by_end_block_height( + &self, + block_height: near_primitives::types::BlockHeight, + ) -> anyhow::Result; + + /// Return protocol config by the given epoch id async fn get_protocol_config_by_epoch_id( &self, epoch_id: near_primitives::hash::CryptoHash, diff --git a/database/src/postgres/models.rs b/database/src/postgres/models.rs index 33c43e48..aaaa0b08 100644 --- a/database/src/postgres/models.rs +++ b/database/src/postgres/models.rs @@ -709,6 +709,18 @@ impl Validators { Ok(response) } + pub async fn get_validators_epoch_end_height( + mut conn: crate::postgres::PgAsyncConn, + epoch_end_height: bigdecimal::BigDecimal, + ) -> anyhow::Result { + let response = validators::table + .filter(validators::epoch_end_height.eq(epoch_end_height)) + .select(Self::as_select()) + .first(&mut conn) + .await?; + + Ok(response) + } } #[derive(Insertable, Queryable, Selectable)] diff --git a/database/src/postgres/rpc_server.rs b/database/src/postgres/rpc_server.rs index 1e3b4321..5034c7a5 100644 --- a/database/src/postgres/rpc_server.rs +++ b/database/src/postgres/rpc_server.rs @@ -2,6 +2,7 @@ use crate::postgres::PostgresStorageManager; use crate::AdditionalDatabaseOptions; use bigdecimal::ToPrimitive; use borsh::{BorshDeserialize, BorshSerialize}; +use std::str::FromStr; pub struct PostgresDBManager { pg_pool: crate::postgres::PgAsyncPool, @@ -341,6 +342,36 @@ impl crate::ReaderDbManager for PostgresDBManager { }) } + async fn get_validators_by_end_block_height( + &self, + block_height: near_primitives::types::BlockHeight, + ) -> anyhow::Result { + let epoch = crate::models::Validators::get_validators_epoch_end_height( + Self::get_connection(&self.pg_pool).await?, + bigdecimal::BigDecimal::from(block_height), + ) + .await?; + let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch.epoch_id) + .map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?; + let epoch_height = epoch + .epoch_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?; + let epoch_start_height = epoch + .epoch_start_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_start_height` to u64"))?; + let (validators_info,) = serde_json::from_value::<( + near_indexer_primitives::views::EpochValidatorInfo, + )>(epoch.validators_info)?; + Ok(readnode_primitives::EpochValidatorsInfo { + epoch_id, + epoch_height, + epoch_start_height, + validators_info, + }) + } + async fn get_protocol_config_by_epoch_id( &self, epoch_id: near_indexer_primitives::CryptoHash, diff --git a/database/src/scylladb/rpc_server.rs b/database/src/scylladb/rpc_server.rs index 08f2b60f..49fe2c44 100644 --- a/database/src/scylladb/rpc_server.rs +++ b/database/src/scylladb/rpc_server.rs @@ -1,9 +1,11 @@ use crate::scylladb::ScyllaStorageManager; use borsh::{BorshDeserialize, BorshSerialize}; use futures::StreamExt; +use near_indexer_primitives::CryptoHash; use num_traits::ToPrimitive; use scylla::{prepared_statement::PreparedStatement, IntoTypedRows}; use std::convert::TryFrom; +use std::str::FromStr; pub struct ScyllaDBManager { scylla_session: std::sync::Arc, @@ -21,6 +23,7 @@ pub struct ScyllaDBManager { get_transaction_by_hash: PreparedStatement, get_stored_at_block_height_and_shard_id_by_block_height: PreparedStatement, get_validators_by_epoch_id: PreparedStatement, + get_validators_by_end_block_height: PreparedStatement, get_protocol_config_by_epoch_id: PreparedStatement, } @@ -121,6 +124,10 @@ impl ScyllaStorageManager for ScyllaDBManager { &scylla_db_session, "SELECT epoch_height, validators_info FROM state_indexer.validators WHERE epoch_id = ?", ).await?, + get_validators_by_end_block_height: Self::prepare_read_query( + &scylla_db_session, + "SELECT epoch_id, epoch_height, validators_info FROM state_indexer.validators WHERE epoch_end_height = ?", + ).await?, get_protocol_config_by_epoch_id: Self::prepare_read_query( &scylla_db_session, "SELECT protocol_config FROM state_indexer.protocol_configs WHERE epoch_id = ?", @@ -486,6 +493,35 @@ impl crate::ReaderDbManager for ScyllaDBManager { }) } + async fn get_validators_by_end_block_height( + &self, + block_height: near_primitives::types::BlockHeight, + ) -> anyhow::Result { + let (epoch_id, epoch_height, validators_info) = Self::execute_prepared_query( + &self.scylla_session, + &self.get_validators_by_end_block_height, + (num_bigint::BigInt::from(block_height),), + ) + .await? + .single_row()? + .into_typed::<(String, num_bigint::BigInt, String)>()?; + + let epoch_id = CryptoHash::from_str(&epoch_id) + .map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?; + + let validators_info: near_primitives::views::EpochValidatorInfo = + serde_json::from_str(&validators_info)?; + + Ok(readnode_primitives::EpochValidatorsInfo { + epoch_id, + epoch_height: epoch_height + .to_u64() + .ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?, + epoch_start_height: validators_info.epoch_start_height, + validators_info, + }) + } + async fn get_protocol_config_by_epoch_id( &self, epoch_id: near_primitives::hash::CryptoHash, diff --git a/database/src/scylladb/state_indexer.rs b/database/src/scylladb/state_indexer.rs index 9cffaa82..ae3da910 100644 --- a/database/src/scylladb/state_indexer.rs +++ b/database/src/scylladb/state_indexer.rs @@ -173,6 +173,15 @@ impl ScyllaStorageManager for ScyllaDBManager { ) .await?; + scylla_db_session + .query( + " + CREATE INDEX IF NOT EXISTS validators_epoch_end_height ON validators (epoch_end_height); + ", + &[], + ) + .await?; + scylla_db_session .query( " diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 19ac5f10..86137c35 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::CacheBlock; +use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; use clap::Parser; #[derive(Parser)] @@ -154,7 +154,7 @@ pub struct ServerContext { pub genesis_config: near_chain_configs::GenesisConfig, pub blocks_cache: std::sync::Arc>>, - pub final_block_height: std::sync::Arc, + pub final_block_info: std::sync::Arc>, pub compiled_contract_code_cache: std::sync::Arc, pub contract_code_cache: std::sync::Arc< std::sync::RwLock>>, @@ -173,7 +173,7 @@ impl ServerContext { blocks_cache: std::sync::Arc< std::sync::RwLock>, >, - final_block_height: std::sync::Arc, + final_block_info: std::sync::Arc>, compiled_contract_code_cache: std::sync::Arc, contract_code_cache: std::sync::Arc< std::sync::RwLock< @@ -189,7 +189,7 @@ impl ServerContext { s3_bucket_name, genesis_config, blocks_cache, - final_block_height, + final_block_info, compiled_contract_code_cache, contract_code_cache, max_gas_burnt, diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 92de26d2..723acf2e 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -1,3 +1,4 @@ +use crate::modules::blocks::FinaleBlockInfo; use crate::utils::{ get_final_cache_block, gigabytes_to_bytes, update_final_block_height_regularly, }; @@ -103,12 +104,9 @@ async fn main() -> anyhow::Result<()> { block_cache_size_in_bytes, ))); - let final_block_height = - std::sync::Arc::new(std::sync::atomic::AtomicU64::new(final_block.block_height)); - blocks_cache - .write() - .unwrap() - .put(final_block.block_height, final_block); + let finale_block_info = std::sync::Arc::new(std::sync::RwLock::new( + FinaleBlockInfo::new(&near_rpc_client, &blocks_cache).await, + )); let compiled_contract_code_cache = std::sync::Arc::new(config::CompiledCodeCache { local_cache: std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new( @@ -151,19 +149,24 @@ async fn main() -> anyhow::Result<()> { s3_config, )), db_manager, - near_rpc_client, + near_rpc_client.clone(), opts.s3_bucket_name.clone(), genesis_config, std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&final_block_height), + std::sync::Arc::clone(&finale_block_info), compiled_contract_code_cache, contract_code_cache, opts.max_gas_burnt, ); tokio::spawn(async move { - update_final_block_height_regularly(final_block_height.clone(), blocks_cache, lake_config) - .await + update_final_block_height_regularly( + blocks_cache, + finale_block_info, + lake_config, + near_rpc_client, + ) + .await }); let rpc = Server::new() diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 5ce6f126..f2659853 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -328,8 +328,11 @@ pub async fn fetch_block( }, near_primitives::types::BlockReference::Finality(finality) => match finality { near_primitives::types::Finality::Final => Ok(data - .final_block_height - .load(std::sync::atomic::Ordering::SeqCst)), + .final_block_info + .read() + .unwrap() + .final_block_cache + .block_height), _ => Err( near_jsonrpc_primitives::types::blocks::RpcBlockError::InternalError { error_message: "Finality other than final is not supported".to_string(), diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index 010a167a..256aa71d 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -12,3 +12,35 @@ pub struct CacheBlock { pub state_root: near_primitives::hash::CryptoHash, pub epoch_id: near_primitives::hash::CryptoHash, } + +#[derive(Debug)] +pub struct FinaleBlockInfo { + pub final_block_cache: CacheBlock, + pub current_protocol_config: near_chain_configs::ProtocolConfigView, +} + +impl FinaleBlockInfo { + pub async fn new( + near_rpc_client: &crate::utils::JsonRpcClient, + blocks_cache: &std::sync::Arc< + std::sync::RwLock>, + >, + ) -> Self { + let final_block = crate::utils::get_final_cache_block(near_rpc_client) + .await + .expect("Error to get final block"); + let protocol_config = crate::utils::get_current_protocol_config(near_rpc_client) + .await + .expect("Error to get protocol_config"); + + blocks_cache + .write() + .unwrap() + .put(final_block.block_height, final_block); + + Self { + final_block_cache: final_block, + current_protocol_config: protocol_config, + } + } +} diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index 8786d7bd..d5109f8c 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -97,8 +97,11 @@ pub async fn fetch_block_from_cache_or_get( near_primitives::types::BlockReference::Finality(_) => { // Returns the final_block_height for all the finalities. Ok(data - .final_block_height - .load(std::sync::atomic::Ordering::SeqCst)) + .final_block_info + .read() + .unwrap() + .final_block_cache + .block_height) } // TODO: return the height of the first block height from S3 (cache it once on the start) near_primitives::types::BlockReference::SyncCheckpoint(_) => Err( diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index c99078cd..22374da7 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -2,12 +2,12 @@ use crate::config::ServerContext; use crate::errors::RPCError; use crate::modules::blocks::utils::fetch_block_from_cache_or_get; use crate::modules::network::{ - friendly_memory_size_format, parse_validator_request, StatusResponse, + clone_protocol_config, friendly_memory_size_format, parse_validator_request, StatusResponse, }; #[cfg(feature = "shadow_data_consistency")] use crate::utils::shadow_compare_results; use jsonrpc_v2::{Data, Params}; -use near_primitives::types::EpochReference; +use near_primitives::types::{BlockId, EpochReference}; use sysinfo::{System, SystemExt}; pub async fn status( @@ -48,8 +48,11 @@ pub async fn status( ), final_block_height: data - .final_block_height - .load(std::sync::atomic::Ordering::SeqCst), + .final_block_info + .read() + .unwrap() + .final_block_cache + .block_height, }; Ok(status) } @@ -71,7 +74,35 @@ pub async fn validators( .map_err(|err| RPCError::parse_error(&err.to_string()))?; tracing::debug!("`validators` called with parameters: {:?}", request); crate::metrics::VALIDATORS_REQUESTS_TOTAL.inc(); - let validator_info = validators_call(&data, &request).await; + // Latest epoch validators fetches from the Near RPC node + if let EpochReference::Latest = &request.epoch_reference { + crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); + let validator_info = data.near_rpc_client.call(request).await?; + return Ok( + near_jsonrpc_primitives::types::validator::RpcValidatorResponse { validator_info }, + ); + }; + + // Current epoch validators fetches from the Near RPC node + if let EpochReference::EpochId(epoch_id) = &request.epoch_reference { + if data + .final_block_info + .read() + .unwrap() + .final_block_cache + .epoch_id + == epoch_id.0 + { + let validator_info = data.near_rpc_client.call(request).await?; + return Ok( + near_jsonrpc_primitives::types::validator::RpcValidatorResponse { validator_info }, + ); + } + }; + + let validator_info = validators_call(&data, &request) + .await + .map_err(|_err| near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch); #[cfg(feature = "shadow_data_consistency")] { @@ -177,41 +208,33 @@ pub async fn protocol_config( async fn validators_call( data: &Data, validator_request: &near_jsonrpc_primitives::types::validator::RpcValidatorRequest, -) -> Result< - near_primitives::views::EpochValidatorInfo, - near_jsonrpc_primitives::types::validator::RpcValidatorError, -> { - let epoch_id = match &validator_request.epoch_reference { - EpochReference::EpochId(epoch_id) => epoch_id.0, - EpochReference::BlockId(block_id) => { - let block_reference = near_primitives::types::BlockReference::BlockId(block_id.clone()); - let block = fetch_block_from_cache_or_get(data, block_reference) - .await - .map_err(|_err| { - near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch - })?; - block.epoch_id +) -> anyhow::Result { + let validators = match &validator_request.epoch_reference { + EpochReference::EpochId(epoch_id) => { + data.db_manager + .get_validators_by_epoch_id(epoch_id.0) + .await? } - EpochReference::Latest => { - crate::metrics::OPTIMISTIC_REQUESTS_TOTAL.inc(); - let block_reference = near_primitives::types::BlockReference::Finality( - near_primitives::types::Finality::Final, - ); - let block = fetch_block_from_cache_or_get(data, block_reference) - .await - .map_err(|_err| { - near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch - })?; - block.epoch_id + EpochReference::BlockId(block_id) => { + let block_height = match block_id { + BlockId::Height(block_height) => *block_height, + BlockId::Hash(_) => { + let block_reference = + near_primitives::types::BlockReference::BlockId(block_id.clone()); + let block = fetch_block_from_cache_or_get(data, block_reference) + .await + .map_err(|_err| { + near_jsonrpc_primitives::types::validator::RpcValidatorError::UnknownEpoch + })?; + block.block_height + } + }; + data.db_manager + .get_validators_by_end_block_height(block_height) + .await? } + _ => anyhow::bail!("Epoch reference is not supported"), }; - let validators = data - .db_manager - .get_validators_by_epoch_id(epoch_id) - .await - .map_err(|_err| { - near_jsonrpc_primitives::types::validator::RpcValidatorError::ValidatorInfoUnavailable - })?; Ok(validators.validators_info) } @@ -229,14 +252,29 @@ async fn protocol_config_call( error_message: err.to_string(), } })?; - let protocol_config = data - .db_manager - .get_protocol_config_by_epoch_id(block.epoch_id) - .await - .map_err(|err| { - near_jsonrpc_primitives::types::config::RpcProtocolConfigError::InternalError { - error_message: err.to_string(), - } - })?; + let protocol_config = if data + .final_block_info + .read() + .unwrap() + .final_block_cache + .epoch_id + == block.epoch_id + { + let protocol_config = &data + .final_block_info + .read() + .unwrap() + .current_protocol_config; + clone_protocol_config(protocol_config) + } else { + data.db_manager + .get_protocol_config_by_epoch_id(block.epoch_id) + .await + .map_err(|err| { + near_jsonrpc_primitives::types::config::RpcProtocolConfigError::InternalError { + error_message: err.to_string(), + } + })? + }; Ok(protocol_config) } diff --git a/rpc-server/src/modules/network/mod.rs b/rpc-server/src/modules/network/mod.rs index 438ab7f2..bb403bdc 100644 --- a/rpc-server/src/modules/network/mod.rs +++ b/rpc-server/src/modules/network/mod.rs @@ -56,3 +56,43 @@ pub fn friendly_memory_size_format(memory_size_bytes: usize) -> String { ) } } + +/// cannot move out of dereference of `std::sync::RwLockReadGuard` +/// move occurs because value `current_protocol_config` has type `ProtocolConfigView`, +/// which does not implement the `Copy` trait +pub fn clone_protocol_config( + protocol_config: &near_chain_configs::ProtocolConfigView, +) -> near_chain_configs::ProtocolConfigView { + near_chain_configs::ProtocolConfigView { + protocol_version: protocol_config.protocol_version, + genesis_time: protocol_config.genesis_time, + chain_id: protocol_config.chain_id.clone(), + genesis_height: protocol_config.genesis_height, + num_block_producer_seats: protocol_config.num_block_producer_seats, + num_block_producer_seats_per_shard: protocol_config + .num_block_producer_seats_per_shard + .clone(), + avg_hidden_validator_seats_per_shard: protocol_config + .avg_hidden_validator_seats_per_shard + .clone(), + dynamic_resharding: protocol_config.dynamic_resharding, + protocol_upgrade_stake_threshold: protocol_config.protocol_upgrade_stake_threshold, + epoch_length: protocol_config.epoch_length, + gas_limit: protocol_config.gas_limit, + min_gas_price: protocol_config.min_gas_price, + max_gas_price: protocol_config.max_gas_price, + block_producer_kickout_threshold: protocol_config.block_producer_kickout_threshold, + chunk_producer_kickout_threshold: protocol_config.chunk_producer_kickout_threshold, + online_min_threshold: protocol_config.online_min_threshold, + online_max_threshold: protocol_config.online_max_threshold, + gas_price_adjustment_rate: protocol_config.gas_price_adjustment_rate, + runtime_config: protocol_config.runtime_config.clone(), + transaction_validity_period: protocol_config.transaction_validity_period, + protocol_reward_rate: protocol_config.protocol_reward_rate, + max_inflation_rate: protocol_config.max_inflation_rate, + num_blocks_per_year: protocol_config.num_blocks_per_year, + protocol_treasury_account: protocol_config.protocol_treasury_account.clone(), + fishermen_threshold: protocol_config.fishermen_threshold, + minimum_stake_divisor: protocol_config.minimum_stake_divisor, + } +} diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index 0cafc191..8bb60c23 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::CacheBlock; +use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; #[cfg(feature = "shadow_data_consistency")] use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode}; use futures::StreamExt; @@ -114,10 +114,23 @@ pub async fn get_final_cache_block(near_rpc_client: &JsonRpcClient) -> Option anyhow::Result { + let params = + near_jsonrpc_client::methods::EXPERIMENTAL_protocol_config::RpcProtocolConfigRequest { + block_reference: near_primitives::types::BlockReference::Finality( + near_primitives::types::Finality::Final, + ), + }; + Ok(near_rpc_client.call(params).await?) +} + async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, blocks_cache: std::sync::Arc>>, - final_block_height: std::sync::Arc, + finale_block_info: std::sync::Arc>, + near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<()> { let block = CacheBlock { block_hash: streamer_message.block.header.hash, @@ -129,16 +142,31 @@ async fn handle_streamer_message( state_root: streamer_message.block.header.prev_state_root, epoch_id: streamer_message.block.header.epoch_id, }; - final_block_height.store(block.block_height, std::sync::atomic::Ordering::SeqCst); + + // let mut finale_block_info_lock = finale_block_info.write().await; + // let current_epoch_id = finale_block_info_lock.final_block_cache.epoch_id; + if finale_block_info.read().unwrap().final_block_cache.epoch_id + != streamer_message.block.header.epoch_id + { + tracing::info!( + "New epoch started: {:?}", + streamer_message.block.header.epoch_id + ); + // let protocol_config = ; + finale_block_info.write().unwrap().current_protocol_config = + get_current_protocol_config(near_rpc_client).await.unwrap(); + } + finale_block_info.write().unwrap().final_block_cache = block; blocks_cache.write().unwrap().put(block.block_height, block); crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?); Ok(()) } pub async fn update_final_block_height_regularly( - final_block_height: std::sync::Arc, blocks_cache: std::sync::Arc>>, + finale_block_info: std::sync::Arc>, lake_config: near_lake_framework::LakeConfig, + near_rpc_client: JsonRpcClient, ) -> anyhow::Result<()> { tracing::info!("Task to get and store final block in the cache started"); let (sender, stream) = near_lake_framework::streamer(lake_config); @@ -147,7 +175,8 @@ pub async fn update_final_block_height_regularly( handle_streamer_message( streamer_message, std::sync::Arc::clone(&blocks_cache), - std::sync::Arc::clone(&final_block_height), + std::sync::Arc::clone(&finale_block_info), + &near_rpc_client, ) }) .buffer_unordered(1usize);