Skip to content

Commit

Permalink
improvement according pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Jan 19, 2024
1 parent 157e13b commit ff51683
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rpc-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ clap = { version = "3.2.22", features = ["color", "derive", "env"] }
dotenv = "0.15.0"
erased-serde = "0.3.23"
futures = "0.3.24"
futures-locks = "0.7.1"
hex = "0.4.3"
http = "0.2.8"
jsonrpc-v2 = { git = "https://github.com/kobayurii/jsonrpc-v2", rev = "95e7b1d2567ae841163af212a3f25abb6862becb" }
Expand Down
25 changes: 14 additions & 11 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::modules::blocks::{CacheBlock, FinaleBlockInfo};
use crate::modules::blocks::{CacheBlock, FinalBlockInfo};
use clap::Parser;
use futures::executor::block_on;

#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
Expand Down Expand Up @@ -153,11 +154,13 @@ pub struct ServerContext {
pub s3_bucket_name: String,
pub genesis_config: near_chain_configs::GenesisConfig,
pub blocks_cache:
std::sync::Arc<std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>>,
pub final_block_info: std::sync::Arc<std::sync::RwLock<FinaleBlockInfo>>,
std::sync::Arc<futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>>,
pub final_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
pub compiled_contract_code_cache: std::sync::Arc<CompiledCodeCache>,
pub contract_code_cache: std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<near_primitives::hash::CryptoHash, Vec<u8>>>,
futures_locks::RwLock<
crate::cache::LruMemoryCache<near_primitives::hash::CryptoHash, Vec<u8>>,
>,
>,
pub max_gas_burnt: near_primitives::types::Gas,
}
Expand All @@ -171,12 +174,12 @@ impl ServerContext {
s3_bucket_name: String,
genesis_config: near_chain_configs::GenesisConfig,
blocks_cache: std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
final_block_info: std::sync::Arc<std::sync::RwLock<FinaleBlockInfo>>,
final_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
compiled_contract_code_cache: std::sync::Arc<CompiledCodeCache>,
contract_code_cache: std::sync::Arc<
std::sync::RwLock<
futures_locks::RwLock<
crate::cache::LruMemoryCache<near_primitives::hash::CryptoHash, Vec<u8>>,
>,
>,
Expand All @@ -198,7 +201,7 @@ impl ServerContext {
}
pub struct CompiledCodeCache {
pub local_cache: std::sync::Arc<
std::sync::RwLock<
futures_locks::RwLock<
crate::cache::LruMemoryCache<
near_primitives::hash::CryptoHash,
near_vm_runner::logic::CompiledContract,
Expand All @@ -213,18 +216,18 @@ impl near_vm_runner::logic::CompiledContractCache for CompiledCodeCache {
key: &near_primitives::hash::CryptoHash,
value: near_vm_runner::logic::CompiledContract,
) -> std::io::Result<()> {
self.local_cache.write().unwrap().put(*key, value);
block_on(self.local_cache.write()).put(*key, value);
Ok(())
}

fn get(
&self,
key: &near_primitives::hash::CryptoHash,
) -> std::io::Result<Option<near_vm_runner::logic::CompiledContract>> {
Ok(self.local_cache.write().unwrap().get(key).cloned())
Ok(block_on(self.local_cache.write()).get(key).cloned())
}

fn has(&self, key: &near_primitives::hash::CryptoHash) -> std::io::Result<bool> {
Ok(self.local_cache.write().unwrap().contains(key))
Ok(block_on(self.local_cache.write()).contains(key))
}
}
12 changes: 6 additions & 6 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::modules::blocks::FinaleBlockInfo;
use crate::modules::blocks::FinalBlockInfo;
use crate::utils::{
get_final_cache_block, gigabytes_to_bytes, update_final_block_height_regularly,
};
Expand Down Expand Up @@ -100,20 +100,20 @@ async fn main() -> anyhow::Result<()> {
)
.await;

let blocks_cache = std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new(
let blocks_cache = std::sync::Arc::new(futures_locks::RwLock::new(cache::LruMemoryCache::new(
block_cache_size_in_bytes,
)));

let finale_block_info = std::sync::Arc::new(std::sync::RwLock::new(
FinaleBlockInfo::new(&near_rpc_client, &blocks_cache).await,
let finale_block_info = std::sync::Arc::new(futures_locks::RwLock::new(
FinalBlockInfo::new(&near_rpc_client, &blocks_cache).await,
));

let compiled_contract_code_cache = std::sync::Arc::new(config::CompiledCodeCache {
local_cache: std::sync::Arc::new(std::sync::RwLock::new(cache::LruMemoryCache::new(
local_cache: std::sync::Arc::new(futures_locks::RwLock::new(cache::LruMemoryCache::new(
contract_code_cache_size,
))),
});
let contract_code_cache = std::sync::Arc::new(std::sync::RwLock::new(
let contract_code_cache = std::sync::Arc::new(futures_locks::RwLock::new(
cache::LruMemoryCache::new(contract_code_cache_size),
));

Expand Down
2 changes: 1 addition & 1 deletion rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ pub async fn fetch_block(
near_primitives::types::Finality::Final => Ok(data
.final_block_info
.read()
.unwrap()
.await
.final_block_cache
.block_height),
_ => Err(
Expand Down
8 changes: 4 additions & 4 deletions rpc-server/src/modules/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ pub struct CacheBlock {
}

#[derive(Debug)]
pub struct FinaleBlockInfo {
pub struct FinalBlockInfo {
pub final_block_cache: CacheBlock,
pub current_protocol_config: near_chain_configs::ProtocolConfigView,
}

impl FinaleBlockInfo {
impl FinalBlockInfo {
pub async fn new(
near_rpc_client: &crate::utils::JsonRpcClient,
blocks_cache: &std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
) -> Self {
let final_block = crate::utils::get_final_cache_block(near_rpc_client)
Expand All @@ -35,7 +35,7 @@ impl FinaleBlockInfo {

blocks_cache
.write()
.unwrap()
.await
.put(final_block.block_height, final_block);

Self {
Expand Down
11 changes: 3 additions & 8 deletions rpc-server/src/modules/blocks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub async fn fetch_block_from_cache_or_get(
Ok(data
.final_block_info
.read()
.unwrap()
.await
.final_block_cache
.block_height)
}
Expand All @@ -110,12 +110,7 @@ pub async fn fetch_block_from_cache_or_get(
},
),
};
let block = data
.blocks_cache
.write()
.unwrap()
.get(&block_height?)
.cloned();
let block = data.blocks_cache.write().await.get(&block_height?).cloned();
match block {
Some(block) => Ok(block),
None => {
Expand All @@ -133,7 +128,7 @@ pub async fn fetch_block_from_cache_or_get(

data.blocks_cache
.write()
.unwrap()
.await
.put(block_from_s3.block_view.header.height, block);
Ok(block)
}
Expand Down
22 changes: 7 additions & 15 deletions rpc-server/src/modules/network/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ pub async fn status(
let sys = System::new_all();
let total_memory = sys.total_memory();
let used_memory = sys.used_memory();
let blocks_cache = data.blocks_cache.read().unwrap();
let contract_code_cache = data.contract_code_cache.read().unwrap();
let compiled_contract_code_cache = data
.compiled_contract_code_cache
.local_cache
.read()
.unwrap();
let blocks_cache = data.blocks_cache.read().await;
let contract_code_cache = data.contract_code_cache.read().await;
let compiled_contract_code_cache = data.compiled_contract_code_cache.local_cache.read().await;
let status = StatusResponse {
total_memory: friendly_memory_size_format(total_memory as usize),
used_memory: friendly_memory_size_format(used_memory as usize),
Expand All @@ -49,7 +45,7 @@ pub async fn status(
final_block_height: data
.final_block_info
.read()
.unwrap()
.await
.final_block_cache
.block_height,
};
Expand Down Expand Up @@ -87,7 +83,7 @@ pub async fn validators(
if data
.final_block_info
.read()
.unwrap()
.await
.final_block_cache
.epoch_id
== epoch_id.0
Expand Down Expand Up @@ -254,16 +250,12 @@ async fn protocol_config_call(
let protocol_config = if data
.final_block_info
.read()
.unwrap()
.await
.final_block_cache
.epoch_id
== block.epoch_id
{
let protocol_config = &data
.final_block_info
.read()
.unwrap()
.current_protocol_config;
let protocol_config = &data.final_block_info.read().await.current_protocol_config;
clone_protocol_config(protocol_config)
} else {
data.db_manager
Expand Down
2 changes: 1 addition & 1 deletion rpc-server/src/modules/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub fn friendly_memory_size_format(memory_size_bytes: usize) -> String {
}
}

/// cannot move out of dereference of `std::sync::RwLockReadGuard<FinaleBlockInfo>`
/// cannot move out of dereference of `futures_locks::RwLockReadGuard<FinalBlockInfo>`
/// move occurs because value `current_protocol_config` has type `ProtocolConfigView`,
/// which does not implement the `Copy` trait
pub fn clone_protocol_config(
Expand Down
8 changes: 5 additions & 3 deletions rpc-server/src/modules/queries/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ pub async fn run_contract(
db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>,
compiled_contract_code_cache: &std::sync::Arc<CompiledCodeCache>,
contract_code_cache: &std::sync::Arc<
std::sync::RwLock<crate::cache::LruMemoryCache<near_primitives::hash::CryptoHash, Vec<u8>>>,
futures_locks::RwLock<
crate::cache::LruMemoryCache<near_primitives::hash::CryptoHash, Vec<u8>>,
>,
>,
block: crate::modules::blocks::CacheBlock,
max_gas_burnt: near_primitives::types::Gas,
Expand All @@ -238,7 +240,7 @@ pub async fn run_contract(

let code: Option<Vec<u8>> = contract_code_cache
.write()
.unwrap()
.await
.get(&contract.data.code_hash())
.cloned();

Expand All @@ -255,7 +257,7 @@ pub async fn run_contract(
})?;
contract_code_cache
.write()
.unwrap()
.await
.put(contract.data.code_hash(), code.data.clone());
near_primitives::contract::ContractCode::new(code.data, Some(contract.data.code_hash()))
}
Expand Down
27 changes: 14 additions & 13 deletions rpc-server/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::modules::blocks::{CacheBlock, FinaleBlockInfo};
use crate::modules::blocks::{CacheBlock, FinalBlockInfo};
#[cfg(feature = "shadow_data_consistency")]
use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config, NumericMode};
use futures::StreamExt;
Expand Down Expand Up @@ -128,8 +128,10 @@ pub async fn get_current_protocol_config(

async fn handle_streamer_message(
streamer_message: near_indexer_primitives::StreamerMessage,
blocks_cache: std::sync::Arc<std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>>,
finale_block_info: std::sync::Arc<std::sync::RwLock<FinaleBlockInfo>>,
blocks_cache: std::sync::Arc<
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
finale_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
near_rpc_client: &JsonRpcClient,
) -> anyhow::Result<()> {
let block = CacheBlock {
Expand All @@ -143,28 +145,27 @@ async fn handle_streamer_message(
epoch_id: streamer_message.block.header.epoch_id,
};

// let mut finale_block_info_lock = finale_block_info.write().await;
// let current_epoch_id = finale_block_info_lock.final_block_cache.epoch_id;
if finale_block_info.read().unwrap().final_block_cache.epoch_id
if finale_block_info.read().await.final_block_cache.epoch_id
!= streamer_message.block.header.epoch_id
{
tracing::info!(
"New epoch started: {:?}",
streamer_message.block.header.epoch_id
);
// let protocol_config = ;
finale_block_info.write().unwrap().current_protocol_config =
get_current_protocol_config(near_rpc_client).await.unwrap();
finale_block_info.write().await.current_protocol_config =
get_current_protocol_config(near_rpc_client).await?;
}
finale_block_info.write().unwrap().final_block_cache = block;
blocks_cache.write().unwrap().put(block.block_height, block);
finale_block_info.write().await.final_block_cache = block;
blocks_cache.write().await.put(block.block_height, block);
crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?);
Ok(())
}

pub async fn update_final_block_height_regularly(
blocks_cache: std::sync::Arc<std::sync::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>>,
finale_block_info: std::sync::Arc<std::sync::RwLock<FinaleBlockInfo>>,
blocks_cache: std::sync::Arc<
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
finale_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
lake_config: near_lake_framework::LakeConfig,
near_rpc_client: JsonRpcClient,
) -> anyhow::Result<()> {
Expand Down

0 comments on commit ff51683

Please sign in to comment.