experiment with chunk and block cache
kobayurii committed Dec 26, 2024
1 parent eadfef4 commit 5d5228f
Showing 8 changed files with 245 additions and 66 deletions.
23 changes: 18 additions & 5 deletions rpc-server/src/config.rs
@@ -1,10 +1,10 @@
use std::string::ToString;

use futures::executor::block_on;
use near_indexer_primitives::IndexerShard;
use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig};

use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock};
use crate::utils;

static NEARD_VERSION: &str = env!("CARGO_PKG_VERSION");
static NEARD_BUILD: &str = env!("BUILD_VERSION");
@@ -58,6 +58,8 @@ pub struct ServerContext {
pub near_rpc_client: crate::utils::JsonRpcClient,
/// Blocks cache
pub blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>,
/// Chunks cache
pub chunks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, Vec<IndexerShard>>>,
/// Final block info includes final_block_cache and current_validators_info
pub blocks_info_by_finality: std::sync::Arc<BlocksInfoByFinality>,
/// Cache to store compiled contract codes
@@ -84,17 +86,27 @@ pub struct ServerContext {
impl ServerContext {
pub async fn init(rpc_server_config: configuration::RpcServerConfig) -> anyhow::Result<Self> {
let contract_code_cache_size_in_bytes =
utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size).await;
crate::utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size)
.await;
let contract_code_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
contract_code_cache_size_in_bytes,
));

let block_cache_size_in_bytes =
utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
// let block_cache_size_in_bytes =
// crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;

// For the block cache we use 5GB. TODO: make it configurable; temporarily hardcoded
let block_cache_size_in_bytes = crate::utils::gigabytes_to_bytes(5.0).await;
let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
block_cache_size_in_bytes,
));
let near_rpc_client = utils::JsonRpcClient::new(

// For the chunk cache we use 5GB. TODO: make it configurable; temporarily hardcoded
let chunk_cache_size_in_bytes = crate::utils::gigabytes_to_bytes(5.0).await;
let chunks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
chunk_cache_size_in_bytes,
));
let near_rpc_client = crate::utils::JsonRpcClient::new(
rpc_server_config.general.near_rpc_url.clone(),
rpc_server_config.general.near_archival_rpc_url.clone(),
);
@@ -163,6 +175,7 @@ impl ServerContext {
genesis_info,
near_rpc_client,
blocks_cache,
chunks_cache,
blocks_info_by_finality,
compiled_contract_code_cache,
contract_code_cache,
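Both new cache sizes above are temporarily hardcoded to 5 GB, with an inline note to make them configurable later. Below is a minimal sketch of what that wiring could look like, assuming the existing `block_cache_size` option is reused and a new, hypothetical `chunk_cache_size` option (not part of this commit) is added to the general config section.

// Sketch only: replacing the temporary 5 GB constants with configuration.
// `block_cache_size` already exists in the config; `chunk_cache_size` is a
// hypothetical Option<f64> field shown purely for illustration.
let block_cache_size_in_bytes =
    crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
let chunk_cache_size_in_bytes = crate::utils::gigabytes_to_bytes(
    rpc_server_config.general.chunk_cache_size.unwrap_or(5.0), // hypothetical field
)
.await;
let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
    block_cache_size_in_bytes,
));
let chunks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
    chunk_cache_size_in_bytes,
));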
1 change: 1 addition & 0 deletions rpc-server/src/main.rs
@@ -334,6 +334,7 @@ async fn main() -> anyhow::Result<()> {
utils::task_regularly_update_blocks_by_finality(
std::sync::Arc::clone(&server_context.blocks_info_by_finality),
std::sync::Arc::clone(&server_context.blocks_cache),
std::sync::Arc::clone(&server_context.chunks_cache),
server_context.fastnear_client.clone(),
server_context.near_rpc_client.clone(),
)
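The update task now receives the chunks cache alongside the blocks cache. Its body is outside this diff; the sketch below only illustrates, under assumptions, how such a task might populate both caches from each streamed block. `put` is a hypothetical insertion method (this diff only shows `new` and `get` on `RwLockLruMemoryCache`), and the `CacheBlock::from` conversion is likewise assumed.

// Illustrative sketch, not the actual task body in rpc-server/src/utils.rs.
async fn cache_new_block(
    blocks_cache: &crate::cache::RwLockLruMemoryCache<u64, CacheBlock>,
    chunks_cache: &crate::cache::RwLockLruMemoryCache<u64, Vec<near_indexer_primitives::IndexerShard>>,
    streamer_message: near_indexer_primitives::StreamerMessage,
) {
    let block_height = streamer_message.block.header.height;
    // Hypothetical `put`, keyed by height so readers can look blocks and chunks up by height.
    blocks_cache
        .put(block_height, CacheBlock::from(&streamer_message.block)) // assumed conversion
        .await;
    chunks_cache.put(block_height, streamer_message.shards).await;
}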
177 changes: 150 additions & 27 deletions rpc-server/src/modules/blocks/methods.rs
@@ -263,7 +263,7 @@ async fn changes_in_block_call(

let result = fetch_changes_in_block(
&data,
cache_block,
cache_block.clone(),
&params.block_reference,
"EXPERIMENTAL_changes_in_block",
)
@@ -309,7 +309,7 @@ async fn changes_in_block_by_type_call(

let result = fetch_changes_in_block_by_type(
&data,
cache_block,
cache_block.clone(),
&params.state_changes_request,
&params.block_reference,
"EXPERIMENTAL_changes",
@@ -430,6 +430,11 @@ pub async fn fetch_block(
.with_label_values(&[method_name, "cache"])
.inc();
data.blocks_info_by_finality.optimistic_block_view().await
} else if let Some(block) = data.blocks_cache.get(&block_height).await {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
block.block_view.clone()
} else {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "lake"])
@@ -506,11 +511,88 @@ pub async fn fetch_chunk(
)
.map(|block_height_shard_id| (block_height_shard_id.0, block_height_shard_id.1))?,
};
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "lake"])
.inc();
let chunk_view =
fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id.into()).await?;

let indexed_shards = if block_height
== data
.blocks_info_by_finality
.final_cache_block()
.await
.block_height
{
Some(data.blocks_info_by_finality.final_block_shards().await)
} else if block_height
== data
.blocks_info_by_finality
.optimistic_cache_block()
.await
.block_height
{
Some(data.blocks_info_by_finality.optimistic_block_shards().await)
} else {
data.chunks_cache.get(&block_height).await
};

let chunk_view = if let Some(shards) = indexed_shards {
let indexed_chunk_view = shards
.iter()
.filter_map(|shard| {
if shard.shard_id == shard_id {
Some(shard.clone())
} else {
None
}
})
.next()
.map(|shard| shard.chunk.clone())
.unwrap_or_default();
if let Some(chunk) = indexed_chunk_view {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
// We collect a list of local receipt ids to filter out local receipts from the chunk
let local_receipt_ids: Vec<near_indexer_primitives::CryptoHash> = chunk
.transactions
.iter()
.filter(|indexer_tx| {
indexer_tx.transaction.signer_id == indexer_tx.transaction.receiver_id
})
.map(|indexer_tx| {
*indexer_tx
.outcome
.execution_outcome
.outcome
.receipt_ids
.first()
.expect("Conversion receipt_id must be present in transaction outcome")
})
.collect();
near_primitives::views::ChunkView {
author: chunk.author,
header: chunk.header,
transactions: chunk
.transactions
.into_iter()
.map(|indexer_transaction| indexer_transaction.transaction)
.collect(),
receipts: chunk
.receipts
.into_iter()
.filter(|receipt| !local_receipt_ids.contains(&receipt.receipt_id))
.collect(),
}
} else {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "lake"])
.inc();
fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id.into()).await?
}
} else {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "lake"])
.inc();
fetch_chunk_from_fastnear(&data.fastnear_client, block_height, shard_id.into()).await?
};

// increase block category metrics
crate::metrics::increase_request_category_metrics(
data,
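The cache branch above converts an indexed chunk into the RPC `ChunkView`, dropping "local" receipts: when a transaction's signer and receiver are the same account, its first receipt is executed in the same chunk, and the RPC chunk view does not list it. A self-contained restatement of that conversion is sketched below; the function name is hypothetical, and it skips missing receipt ids with `filter_map` instead of the `expect` used above.

fn indexer_chunk_to_chunk_view(
    chunk: near_indexer_primitives::IndexerChunkView,
) -> near_primitives::views::ChunkView {
    // Receipt ids produced by "local" transactions (signer == receiver).
    let local_receipt_ids: Vec<near_indexer_primitives::CryptoHash> = chunk
        .transactions
        .iter()
        .filter(|tx| tx.transaction.signer_id == tx.transaction.receiver_id)
        .filter_map(|tx| tx.outcome.execution_outcome.outcome.receipt_ids.first().copied())
        .collect();
    near_primitives::views::ChunkView {
        author: chunk.author,
        header: chunk.header,
        transactions: chunk
            .transactions
            .into_iter()
            .map(|tx| tx.transaction)
            .collect(),
        receipts: chunk
            .receipts
            .into_iter()
            .filter(|receipt| !local_receipt_ids.contains(&receipt.receipt_id))
            .collect(),
    }
}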
@@ -535,7 +617,7 @@ async fn fetch_changes_in_block(
near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockByTypeResponse,
near_jsonrpc::primitives::types::changes::RpcStateChangesError,
> {
let trie_keys = fetch_state_changes(data, cache_block, block_reference, method_name)
let trie_keys = fetch_state_changes(data, cache_block.clone(), block_reference, method_name)
.await
.map_err(|err| {
near_jsonrpc::primitives::types::changes::RpcStateChangesError::UnknownBlock {
@@ -624,7 +706,7 @@ async fn fetch_changes_in_block_by_type(
near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockResponse,
near_jsonrpc::primitives::types::changes::RpcStateChangesError,
> {
let changes = fetch_state_changes(data, cache_block, block_reference, method_name)
let changes = fetch_state_changes(data, cache_block.clone(), block_reference, method_name)
.await
.map_err(|err| {
near_jsonrpc::primitives::types::changes::RpcStateChangesError::UnknownBlock {
@@ -661,6 +743,9 @@ async fn fetch_state_changes(
"Failed to fetch shards! Finality::None is not supported by rpc_server",
))
} else {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
Ok(data
.blocks_info_by_finality
.optimistic_block_changes()
@@ -669,15 +754,14 @@ async fn fetch_state_changes(
}
near_primitives::types::Finality::DoomSlug
| near_primitives::types::Finality::Final => {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
Ok(data.blocks_info_by_finality.final_block_changes().await)
}
}
} else {
Ok(fetch_shards_by_cache_block(data, cache_block, method_name)
.await?
.into_iter()
.flat_map(|shard| shard.state_changes)
.collect())
Ok(fetch_shards_by_cache_block(data, cache_block, method_name).await?)
}
}

@@ -687,20 +771,59 @@ async fn fetch_shards_by_cache_block(
data: &Data<ServerContext>,
cache_block: crate::modules::blocks::CacheBlock,
method_name: &str,
) -> anyhow::Result<Vec<near_indexer_primitives::IndexerShard>> {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "lake"])
.inc();
match near_lake_framework::fastnear::fetchers::fetch_streamer_message(
&data.fastnear_client,
cache_block.block_height,
)
.await
) -> anyhow::Result<near_primitives::views::StateChangesView> {
if cache_block.block_height
== data
.blocks_info_by_finality
.optimistic_cache_block()
.await
.block_height
{
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
Ok(data
.blocks_info_by_finality
.optimistic_block_changes()
.await)
} else if cache_block.block_height
== data
.blocks_info_by_finality
.final_cache_block()
.await
.block_height
{
Some(streamer_message) => Ok(streamer_message.shards),
None => Err(anyhow::anyhow!(
"Failed to fetch shards for block {}",
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
Ok(data.blocks_info_by_finality.final_block_changes().await)
} else if let Some(shards) = data.chunks_cache.get(&cache_block.block_height).await {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "cache"])
.inc();
Ok(shards
.into_iter()
.flat_map(|shard| shard.state_changes)
.collect())
} else {
crate::metrics::REQUESTS_BLOCKS_COUNTERS
.with_label_values(&[method_name, "lake"])
.inc();
match near_lake_framework::fastnear::fetchers::fetch_streamer_message(
&data.fastnear_client,
cache_block.block_height,
)),
)
.await
{
Some(streamer_message) => Ok(streamer_message
.shards
.into_iter()
.flat_map(|shard| shard.state_changes)
.collect()),
None => Err(anyhow::anyhow!(
"Failed to fetch shards for block {}",
cache_block.block_height,
)),
}
}
}
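A note on the last hunk: `fetch_shards_by_cache_block` now returns `near_primitives::views::StateChangesView` instead of `Vec<IndexerShard>`, flattening the per-shard state changes itself, which is why `fetch_state_changes` dropped its own `flat_map`. The flattening amounts to the following (hypothetical helper, shown only to make the shape explicit):

fn shards_to_state_changes(
    shards: Vec<near_indexer_primitives::IndexerShard>,
) -> near_primitives::views::StateChangesView {
    // StateChangesView is a Vec of state changes, so flattening the per-shard
    // lists and collecting them is enough.
    shards
        .into_iter()
        .flat_map(|shard| shard.state_changes)
        .collect()
}

With this change, the chunks_cache branch and the lake fallback produce the same shape, so callers can treat cached and fetched results uniformly.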