diff --git a/Cargo.lock b/Cargo.lock index c5aab71b..3242d60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4550,6 +4550,7 @@ dependencies = [ "dotenv", "erased-serde", "futures", + "futures-locks", "hex", "http", "jsonrpc-v2", diff --git a/rpc-server/Cargo.toml b/rpc-server/Cargo.toml index b8450e81..574ec5d6 100644 --- a/rpc-server/Cargo.toml +++ b/rpc-server/Cargo.toml @@ -16,6 +16,7 @@ clap = { version = "3.2.22", features = ["color", "derive", "env"] } dotenv = "0.15.0" erased-serde = "0.3.23" futures = "0.3.24" +futures-locks = "0.7.1" hex = "0.4.3" http = "0.2.8" jsonrpc-v2 = { git = "https://github.com/kobayurii/jsonrpc-v2", rev = "95e7b1d2567ae841163af212a3f25abb6862becb" } diff --git a/rpc-server/src/config.rs b/rpc-server/src/config.rs index 86137c35..f379f9bb 100644 --- a/rpc-server/src/config.rs +++ b/rpc-server/src/config.rs @@ -1,5 +1,6 @@ -use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; +use crate::modules::blocks::{CacheBlock, FinalBlockInfo}; use clap::Parser; +use futures::executor::block_on; #[derive(Parser)] #[clap(author, version, about, long_about = None)] @@ -153,11 +154,13 @@ pub struct ServerContext { pub s3_bucket_name: String, pub genesis_config: near_chain_configs::GenesisConfig, pub blocks_cache: - std::sync::Arc>>, - pub final_block_info: std::sync::Arc>, + std::sync::Arc>>, + pub final_block_info: std::sync::Arc>, pub compiled_contract_code_cache: std::sync::Arc, pub contract_code_cache: std::sync::Arc< - std::sync::RwLock>>, + futures_locks::RwLock< + crate::cache::LruMemoryCache>, + >, >, pub max_gas_burnt: near_primitives::types::Gas, } @@ -171,12 +174,12 @@ impl ServerContext { s3_bucket_name: String, genesis_config: near_chain_configs::GenesisConfig, blocks_cache: std::sync::Arc< - std::sync::RwLock>, + futures_locks::RwLock>, >, - final_block_info: std::sync::Arc>, + final_block_info: std::sync::Arc>, compiled_contract_code_cache: std::sync::Arc, contract_code_cache: std::sync::Arc< - std::sync::RwLock< + futures_locks::RwLock< crate::cache::LruMemoryCache>, >, >, @@ -198,7 +201,7 @@ impl ServerContext { } pub struct CompiledCodeCache { pub local_cache: std::sync::Arc< - std::sync::RwLock< + futures_locks::RwLock< crate::cache::LruMemoryCache< near_primitives::hash::CryptoHash, near_vm_runner::logic::CompiledContract, @@ -213,7 +216,7 @@ impl near_vm_runner::logic::CompiledContractCache for CompiledCodeCache { key: &near_primitives::hash::CryptoHash, value: near_vm_runner::logic::CompiledContract, ) -> std::io::Result<()> { - self.local_cache.write().unwrap().put(*key, value); + block_on(self.local_cache.write()).put(*key, value); Ok(()) } @@ -221,10 +224,10 @@ impl near_vm_runner::logic::CompiledContractCache for CompiledCodeCache { &self, key: &near_primitives::hash::CryptoHash, ) -> std::io::Result> { - Ok(self.local_cache.write().unwrap().get(key).cloned()) + Ok(block_on(self.local_cache.write()).get(key).cloned()) } fn has(&self, key: &near_primitives::hash::CryptoHash) -> std::io::Result { - Ok(self.local_cache.write().unwrap().contains(key)) + Ok(block_on(self.local_cache.write()).contains(key)) } } diff --git a/rpc-server/src/main.rs b/rpc-server/src/main.rs index 723acf2e..0ec0f829 100644 --- a/rpc-server/src/main.rs +++ b/rpc-server/src/main.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::FinaleBlockInfo; +use crate::modules::blocks::FinalBlockInfo; use crate::utils::{ get_final_cache_block, gigabytes_to_bytes, update_final_block_height_regularly, }; @@ -100,20 +100,20 @@ async fn main() -> anyhow::Result<()> { ) .await; - let blocks_cache = std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new( + let blocks_cache = std::sync::Arc::new(futures_locks::RwLock::new(cache::LruMemoryCache::new( block_cache_size_in_bytes, ))); - let finale_block_info = std::sync::Arc::new(std::sync::RwLock::new( - FinaleBlockInfo::new(&near_rpc_client, &blocks_cache).await, + let finale_block_info = std::sync::Arc::new(futures_locks::RwLock::new( + FinalBlockInfo::new(&near_rpc_client, &blocks_cache).await, )); let compiled_contract_code_cache = std::sync::Arc::new(config::CompiledCodeCache { - local_cache: std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new( + local_cache: std::sync::Arc::new(futures_locks::RwLock::new(cache::LruMemoryCache::new( contract_code_cache_size, ))), }); - let contract_code_cache = std::sync::Arc::new(std::sync::RwLock::new( + let contract_code_cache = std::sync::Arc::new(futures_locks::RwLock::new( cache::LruMemoryCache::new(contract_code_cache_size), )); diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index f2659853..d188ce66 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -330,7 +330,7 @@ pub async fn fetch_block( near_primitives::types::Finality::Final => Ok(data .final_block_info .read() - .unwrap() + .await .final_block_cache .block_height), _ => Err( diff --git a/rpc-server/src/modules/blocks/mod.rs b/rpc-server/src/modules/blocks/mod.rs index 256aa71d..9e13b820 100644 --- a/rpc-server/src/modules/blocks/mod.rs +++ b/rpc-server/src/modules/blocks/mod.rs @@ -14,16 +14,16 @@ pub struct CacheBlock { } #[derive(Debug)] -pub struct FinaleBlockInfo { +pub struct FinalBlockInfo { pub final_block_cache: CacheBlock, pub current_protocol_config: near_chain_configs::ProtocolConfigView, } -impl FinaleBlockInfo { +impl FinalBlockInfo { pub async fn new( near_rpc_client: &crate::utils::JsonRpcClient, blocks_cache: &std::sync::Arc< - std::sync::RwLock>, + futures_locks::RwLock>, >, ) -> Self { let final_block = crate::utils::get_final_cache_block(near_rpc_client) @@ -35,7 +35,7 @@ impl FinaleBlockInfo { blocks_cache .write() - .unwrap() + .await .put(final_block.block_height, final_block); Self { diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index d5109f8c..b23ab2ec 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -99,7 +99,7 @@ pub async fn fetch_block_from_cache_or_get( Ok(data .final_block_info .read() - .unwrap() + .await .final_block_cache .block_height) } @@ -110,12 +110,7 @@ pub async fn fetch_block_from_cache_or_get( }, ), }; - let block = data - .blocks_cache - .write() - .unwrap() - .get(&block_height?) - .cloned(); + let block = data.blocks_cache.write().await.get(&block_height?).cloned(); match block { Some(block) => Ok(block), None => { @@ -133,7 +128,7 @@ pub async fn fetch_block_from_cache_or_get( data.blocks_cache .write() - .unwrap() + .await .put(block_from_s3.block_view.header.height, block); Ok(block) } diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index bee6a881..0cfb2812 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -16,13 +16,9 @@ pub async fn status( let sys = System::new_all(); let total_memory = sys.total_memory(); let used_memory = sys.used_memory(); - let blocks_cache = data.blocks_cache.read().unwrap(); - let contract_code_cache = data.contract_code_cache.read().unwrap(); - let compiled_contract_code_cache = data - .compiled_contract_code_cache - .local_cache - .read() - .unwrap(); + let blocks_cache = data.blocks_cache.read().await; + let contract_code_cache = data.contract_code_cache.read().await; + let compiled_contract_code_cache = data.compiled_contract_code_cache.local_cache.read().await; let status = StatusResponse { total_memory: friendly_memory_size_format(total_memory as usize), used_memory: friendly_memory_size_format(used_memory as usize), @@ -49,7 +45,7 @@ pub async fn status( final_block_height: data .final_block_info .read() - .unwrap() + .await .final_block_cache .block_height, }; @@ -87,7 +83,7 @@ pub async fn validators( if data .final_block_info .read() - .unwrap() + .await .final_block_cache .epoch_id == epoch_id.0 @@ -254,16 +250,12 @@ async fn protocol_config_call( let protocol_config = if data .final_block_info .read() - .unwrap() + .await .final_block_cache .epoch_id == block.epoch_id { - let protocol_config = &data - .final_block_info - .read() - .unwrap() - .current_protocol_config; + let protocol_config = &data.final_block_info.read().await.current_protocol_config; clone_protocol_config(protocol_config) } else { data.db_manager diff --git a/rpc-server/src/modules/network/mod.rs b/rpc-server/src/modules/network/mod.rs index bb403bdc..908df986 100644 --- a/rpc-server/src/modules/network/mod.rs +++ b/rpc-server/src/modules/network/mod.rs @@ -57,7 +57,7 @@ pub fn friendly_memory_size_format(memory_size_bytes: usize) -> String { } } -/// cannot move out of dereference of `std::sync::RwLockReadGuard` +/// cannot move out of dereference of `futures_locks::RwLockReadGuard` /// move occurs because value `current_protocol_config` has type `ProtocolConfigView`, /// which does not implement the `Copy` trait pub fn clone_protocol_config( diff --git a/rpc-server/src/modules/queries/utils.rs b/rpc-server/src/modules/queries/utils.rs index 73d9d9bd..f9c6a970 100644 --- a/rpc-server/src/modules/queries/utils.rs +++ b/rpc-server/src/modules/queries/utils.rs @@ -224,7 +224,9 @@ pub async fn run_contract( db_manager: std::sync::Arc>, compiled_contract_code_cache: &std::sync::Arc, contract_code_cache: &std::sync::Arc< - std::sync::RwLock>>, + futures_locks::RwLock< + crate::cache::LruMemoryCache>, + >, >, block: crate::modules::blocks::CacheBlock, max_gas_burnt: near_primitives::types::Gas, @@ -238,7 +240,7 @@ pub async fn run_contract( let code: Option> = contract_code_cache .write() - .unwrap() + .await .get(&contract.data.code_hash()) .cloned(); @@ -255,7 +257,7 @@ pub async fn run_contract( })?; contract_code_cache .write() - .unwrap() + .await .put(contract.data.code_hash(), code.data.clone()); near_primitives::contract::ContractCode::new(code.data, Some(contract.data.code_hash())) } diff --git a/rpc-server/src/utils.rs b/rpc-server/src/utils.rs index 8bb60c23..41663557 100644 --- a/rpc-server/src/utils.rs +++ b/rpc-server/src/utils.rs @@ -1,4 +1,4 @@ -use crate::modules::blocks::{CacheBlock, FinaleBlockInfo}; +use crate::modules::blocks::{CacheBlock, FinalBlockInfo}; #[cfg(feature = "shadow_data_consistency")] use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode}; use futures::StreamExt; @@ -128,8 +128,10 @@ pub async fn get_current_protocol_config( async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, - blocks_cache: std::sync::Arc>>, - finale_block_info: std::sync::Arc>, + blocks_cache: std::sync::Arc< + futures_locks::RwLock>, + >, + finale_block_info: std::sync::Arc>, near_rpc_client: &JsonRpcClient, ) -> anyhow::Result<()> { let block = CacheBlock { @@ -143,28 +145,27 @@ async fn handle_streamer_message( epoch_id: streamer_message.block.header.epoch_id, }; - // let mut finale_block_info_lock = finale_block_info.write().await; - // let current_epoch_id = finale_block_info_lock.final_block_cache.epoch_id; - if finale_block_info.read().unwrap().final_block_cache.epoch_id + if finale_block_info.read().await.final_block_cache.epoch_id != streamer_message.block.header.epoch_id { tracing::info!( "New epoch started: {:?}", streamer_message.block.header.epoch_id ); - // let protocol_config = ; - finale_block_info.write().unwrap().current_protocol_config = - get_current_protocol_config(near_rpc_client).await.unwrap(); + finale_block_info.write().await.current_protocol_config = + get_current_protocol_config(near_rpc_client).await?; } - finale_block_info.write().unwrap().final_block_cache = block; - blocks_cache.write().unwrap().put(block.block_height, block); + finale_block_info.write().await.final_block_cache = block; + blocks_cache.write().await.put(block.block_height, block); crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?); Ok(()) } pub async fn update_final_block_height_regularly( - blocks_cache: std::sync::Arc>>, - finale_block_info: std::sync::Arc>, + blocks_cache: std::sync::Arc< + futures_locks::RwLock>, + >, + finale_block_info: std::sync::Arc>, lake_config: near_lake_framework::LakeConfig, near_rpc_client: JsonRpcClient, ) -> anyhow::Result<()> {