Skip to content

Commit

Permalink
reffactoring and improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Oct 24, 2024
1 parent e10feb8 commit 83fe3c4
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 272 deletions.
212 changes: 107 additions & 105 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions configuration/src/configs/lake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ impl LakeConfig {
.start_block_height(start_block_height)
.build()?)
}

pub async fn lake_client(
&self,
chain_id: crate::ChainId,
) -> anyhow::Result<near_lake_framework::FastNearClient> {
let fast_near_endpoint = match chain_id {
crate::ChainId::Mainnet => String::from("https://mainnet.neardata.xyz"),
// Testnet is the default chain for other chain_id
_ => String::from("https://testnet.neardata.xyz"),
};
Ok(near_lake_framework::FastNearClient::new(fast_near_endpoint))
}
}

#[derive(Deserialize, Debug, Clone, Default)]
Expand Down
45 changes: 22 additions & 23 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::executor::block_on;
use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig};

use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock};
use crate::utils;

static NEARD_VERSION: &str = env!("CARGO_PKG_VERSION");
static NEARD_BUILD: &str = env!("BUILD_VERSION");
Expand All @@ -20,7 +21,7 @@ pub struct GenesisInfo {
impl GenesisInfo {
pub async fn get(
near_rpc_client: &crate::utils::JsonRpcClient,
fastnear_client: &near_lake_framework::fastnear_client::FastNearClient,
fastnear_client: &near_lake_framework::FastNearClient,
) -> Self {
tracing::info!("Get genesis config...");
let genesis_config = near_rpc_client
Expand All @@ -32,8 +33,7 @@ impl GenesisInfo {
.expect("Error to get genesis config");

let genesis_block =
near_lake_framework::providers::fastnear::fetchers::fetch_first_block(fastnear_client)
.await;
near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client).await;

Self {
genesis_config,
Expand All @@ -45,7 +45,7 @@ impl GenesisInfo {
#[derive(Clone)]
pub struct ServerContext {
/// Fastnear client
pub fastnear_client: near_lake_framework::fastnear_client::FastNearClient,
pub fastnear_client: near_lake_framework::FastNearClient,
/// Database manager
pub db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>,
/// TransactionDetails storage
Expand Down Expand Up @@ -82,37 +82,36 @@ pub struct ServerContext {
}

impl ServerContext {
pub async fn init(
rpc_server_config: configuration::RpcServerConfig,
near_rpc_client: crate::utils::JsonRpcClient,
) -> anyhow::Result<Self> {
pub async fn init(rpc_server_config: configuration::RpcServerConfig) -> anyhow::Result<Self> {
let contract_code_cache_size_in_bytes =
crate::utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size)
.await;
utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size).await;
let contract_code_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
contract_code_cache_size_in_bytes,
));

let block_cache_size_in_bytes =
crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
block_cache_size_in_bytes,
));

let blocks_info_by_finality =
std::sync::Arc::new(BlocksInfoByFinality::new(&near_rpc_client, &blocks_cache).await);
let near_rpc_client = utils::JsonRpcClient::new(
rpc_server_config.general.near_rpc_url.clone(),
rpc_server_config.general.near_archival_rpc_url.clone(),
);
// We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance
let near_rpc_client = near_rpc_client.header(
"Referer".to_string(),
rpc_server_config.general.referer_header_value.clone(),
)?;

let fastnear_client = rpc_server_config
.lake_config
.lake_config(
blocks_info_by_finality
.optimistic_cache_block()
.await
.block_height,
rpc_server_config.general.chain_id.clone(),
)
.await?
.client();
.lake_client(rpc_server_config.general.chain_id.clone())
.await?;

let blocks_info_by_finality = std::sync::Arc::new(
BlocksInfoByFinality::new(&near_rpc_client, &fastnear_client).await,
);

let tx_details_storage = tx_details_storage::TxDetailsStorage::new(
rpc_server_config.tx_details_storage.storage_client().await,
Expand Down
50 changes: 13 additions & 37 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,43 +322,16 @@ async fn main() -> anyhow::Result<()> {
let rpc_server_config =
configuration::read_configuration::<configuration::RpcServerConfig>().await?;

let near_rpc_client = utils::JsonRpcClient::new(
rpc_server_config.general.near_rpc_url.clone(),
rpc_server_config.general.near_archival_rpc_url.clone(),
);
// We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance
let near_rpc_client = near_rpc_client.header(
"Referer".to_string(),
rpc_server_config.general.referer_header_value.clone(),
)?;

let server_port = rpc_server_config.general.server_port;

let server_context = actix_web::web::Data::new(
config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?,
);

// Update final block from fastnear
let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache);
let blocks_info_by_finality_clone =
std::sync::Arc::clone(&server_context.blocks_info_by_finality);
let fastnear_client = server_context.fastnear_client.clone();
tokio::spawn(async move {
utils::update_final_block_regularly(
blocks_cache_clone,
blocks_info_by_finality_clone,
fastnear_client,
near_rpc_client,
)
.await
});
let server_context =
actix_web::web::Data::new(config::ServerContext::init(rpc_server_config.clone()).await?);

// Update optimistic block from fastnear
let blocks_info_by_finality = std::sync::Arc::clone(&server_context.blocks_info_by_finality);
let fastnear_client = server_context.fastnear_client.clone();
tokio::spawn(async move {
utils::update_optimistic_block_regularly(blocks_info_by_finality, fastnear_client).await
});
utils::task_regularly_update_blocks_by_finality(
std::sync::Arc::clone(&server_context.blocks_info_by_finality),
std::sync::Arc::clone(&server_context.blocks_cache),
server_context.fastnear_client.clone(),
server_context.near_rpc_client.clone(),
)
.await;

actix_web::HttpServer::new(move || {
let cors = actix_cors::Cors::permissive();
Expand All @@ -371,7 +344,10 @@ async fn main() -> anyhow::Result<()> {
.service(metrics::get_metrics)
.service(health::get_health_status)
})
.bind(format!("0.0.0.0:{:0>5}", server_port))?
.bind(format!(
"0.0.0.0:{:0>5}",
rpc_server_config.general.server_port
))?
.run()
.await?;

Expand Down
4 changes: 2 additions & 2 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ pub async fn fetch_block(
{
data.blocks_info_by_finality.optimistic_block_view().await
} else {
near_lake_framework::providers::fastnear::fetchers::fetch_block_or_retry(
near_lake_framework::fastnear::fetchers::fetch_block_or_retry(
&data.fastnear_client,
block_height,
)
Expand Down Expand Up @@ -606,7 +606,7 @@ async fn fetch_shards_by_cache_block(
data: &Data<ServerContext>,
cache_block: crate::modules::blocks::CacheBlock,
) -> anyhow::Result<Vec<near_indexer_primitives::IndexerShard>> {
match near_lake_framework::providers::fastnear::fetchers::fetch_streamer_message(
match near_lake_framework::fastnear::fetchers::fetch_streamer_message(
&data.fastnear_client,
cache_block.block_height,
)
Expand Down
45 changes: 13 additions & 32 deletions rpc-server/src/modules/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,6 @@ pub struct BlockInfo {
}

impl BlockInfo {
// Create new BlockInfo from BlockView. this method is useful only for start rpc-server.
pub async fn new_from_block_view(block_view: near_primitives::views::BlockView) -> Self {
Self {
block_cache: CacheBlock::from(&block_view),
block_view,
changes: vec![], // We left changes empty because block_view doesn't contain state changes.
}
}

// Create new BlockInfo from StreamerMessage.
// This is using to update final and optimistic blocks regularly.
pub async fn new_from_streamer_message(
Expand Down Expand Up @@ -294,35 +285,25 @@ pub struct BlocksInfoByFinality {
impl BlocksInfoByFinality {
pub async fn new(
near_rpc_client: &crate::utils::JsonRpcClient,
blocks_cache: &std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>,
fast_near_client: &near_lake_framework::FastNearClient,
) -> Self {
let final_block_future = crate::utils::get_final_block(near_rpc_client, false);
let optimistic_block_future = crate::utils::get_final_block(near_rpc_client, true);
let validators_future = crate::utils::get_current_validators(near_rpc_client);
let protocol_version_future = crate::utils::get_current_protocol_version(near_rpc_client);

let (final_block, optimistic_block, validators, protocol_version) = futures::try_join!(
final_block_future,
optimistic_block_future,
validators_future,
protocol_version_future
)
.map_err(|err| {
tracing::error!("Error to fetch final block info: {:?}", err);
err
})
.expect("Error to get final block info");

blocks_cache
.put(final_block.header.height, CacheBlock::from(&final_block))
.await;
let final_block =
near_lake_framework::fastnear::fetchers::fetch_last_block(fast_near_client).await;
let optimistic_block =
near_lake_framework::fastnear::fetchers::fetch_optimistic_block(fast_near_client).await;
let validators = crate::utils::get_current_validators(near_rpc_client)
.await
.expect("Failed to get current validators");
let protocol_version = crate::utils::get_current_protocol_version(near_rpc_client)
.await
.expect("Failed to get current protocol version");

Self {
final_block: futures_locks::RwLock::new(
BlockInfo::new_from_block_view(final_block).await,
BlockInfo::new_from_streamer_message(final_block).await,
),
optimistic_block: futures_locks::RwLock::new(
BlockInfo::new_from_block_view(optimistic_block).await,
BlockInfo::new_from_streamer_message(optimistic_block).await,
),
optimistic_changes: futures_locks::RwLock::new(OptimisticChanges::new()),
current_validators: futures_locks::RwLock::new(CurrentValidatorInfo { validators }),
Expand Down
4 changes: 2 additions & 2 deletions rpc-server/src/modules/blocks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn check_block_height(
tracing::instrument(skip(fastnear_client))
)]
pub async fn fetch_chunk_from_fastnear(
fastnear_client: &near_lake_framework::fastnear_client::FastNearClient,
fastnear_client: &near_lake_framework::FastNearClient,
block_height: near_primitives::types::BlockHeight,
shard_id: near_primitives::types::ShardId,
) -> Result<near_primitives::views::ChunkView, near_jsonrpc::primitives::types::chunks::RpcChunkError>
Expand All @@ -56,7 +56,7 @@ pub async fn fetch_chunk_from_fastnear(
block_height,
shard_id
);
match near_lake_framework::providers::fastnear::fetchers::fetch_shard_or_retry(
match near_lake_framework::fastnear::fetchers::fetch_shard_or_retry(
fastnear_client,
block_height,
shard_id,
Expand Down
Loading

0 comments on commit 83fe3c4

Please sign in to comment.