Skip to content

Commit

Permalink
create index
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Feb 22, 2024
1 parent b078ef3 commit 2c87457
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
14 changes: 14 additions & 0 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ impl crate::BaseDbManager for ScyllaDBManager {

#[async_trait::async_trait]
impl ScyllaStorageManager for ScyllaDBManager {
async fn create_tables(scylla_db_session: &scylla::Session) -> anyhow::Result<()> {
// Creating index in the tx_indexer_cache.transactions
// for faster search by transaction_hash to avoid ALLOW FILTERING
scylla_db_session
.query(
"
CREATE INDEX IF NOT EXISTS transaction_hash_key ON tx_indexer_cache.transactions (transaction_hash);
",
&[],
)
.await?;
Ok(())
}

async fn prepare(
scylla_db_session: std::sync::Arc<scylla::Session>,
) -> anyhow::Result<Box<Self>> {
Expand Down
6 changes: 3 additions & 3 deletions readnode-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl CollectingTransactionDetails {
TransactionKey::new(self.transaction.hash.clone().to_string(), self.block_height)
}

pub fn finale_status(&self) -> Option<views::FinalExecutionStatus> {
pub fn final_status(&self) -> Option<views::FinalExecutionStatus> {
let mut looking_for_id = self.transaction.hash;
let num_outcomes = self.execution_outcomes.len();
self.execution_outcomes.iter().find_map(|outcome_with_id| {
Expand Down Expand Up @@ -77,7 +77,7 @@ impl CollectingTransactionDetails {

pub fn to_final_transaction_result(&self) -> anyhow::Result<TransactionDetails> {
let mut outcomes = self.execution_outcomes.clone();
match self.finale_status() {
match self.final_status() {
Some(status) => {
let receipts_outcome = outcomes.split_off(1);
let transaction_outcome = outcomes.pop().unwrap();
Expand Down Expand Up @@ -105,7 +105,7 @@ impl From<CollectingTransactionDetails> for TransactionDetails {
// FinalExecutionStatus::Failure - the result of the first leaf receipt_id
// FinalExecutionStatus::SuccessValue - the result of the first leaf receipt_id
let status = tx
.finale_status()
.final_status()
.unwrap_or(views::FinalExecutionStatus::NotStarted);
Self {
receipts: tx.receipts,
Expand Down
22 changes: 8 additions & 14 deletions rpc-server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,19 @@ async fn handle_streamer_message(
blocks_cache: std::sync::Arc<
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
finale_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
final_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
near_rpc_client: &JsonRpcClient,
) -> anyhow::Result<()> {
let block: CacheBlock = streamer_message.block.into();

if finale_block_info.read().await.final_block_cache.epoch_id != block.epoch_id {
if final_block_info.read().await.final_block_cache.epoch_id != block.epoch_id {
tracing::info!("New epoch started: {:?}", block.epoch_id);
finale_block_info.write().await.current_protocol_config =
final_block_info.write().await.current_protocol_config =
get_current_protocol_config(near_rpc_client).await?;
finale_block_info.write().await.current_validators =
final_block_info.write().await.current_validators =
get_current_validators(near_rpc_client).await?;
}
finale_block_info.write().await.final_block_cache = block;
final_block_info.write().await.final_block_cache = block;
blocks_cache.write().await.put(block.block_height, block);
crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?);
Ok(())
Expand All @@ -154,28 +154,22 @@ pub async fn update_final_block_height_regularly(
blocks_cache: std::sync::Arc<
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
finale_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
final_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
rpc_server_config: configuration::RpcServerConfig,
near_rpc_client: JsonRpcClient,
) -> anyhow::Result<()> {
tracing::info!("Task to get and store final block in the cache started");
let lake_config = rpc_server_config
.lake_config
.lake_config(
finale_block_info
.read()
.await
.final_block_cache
.block_height,
)
.lake_config(final_block_info.read().await.final_block_cache.block_height)
.await?;
let (sender, stream) = near_lake_framework::streamer(lake_config);
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| {
handle_streamer_message(
streamer_message,
std::sync::Arc::clone(&blocks_cache),
std::sync::Arc::clone(&finale_block_info),
std::sync::Arc::clone(&final_block_info),
&near_rpc_client,
)
})
Expand Down

0 comments on commit 2c87457

Please sign in to comment.