Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get transaction and receipts from tx-indexer process cache #174

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ pub trait ReaderDbManager {
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
async fn get_indexed_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;

/// Returns the block height and shard id by the given block height
async fn get_block_by_height_and_shard_id(
Expand Down
13 changes: 13 additions & 0 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,19 @@ impl TransactionCache {

Ok(response.transaction_details)
}

pub async fn get_transaction_by_hash(
mut conn: crate::postgres::PgAsyncConn,
transaction_hash: &str,
) -> anyhow::Result<Vec<u8>> {
let response = transaction_cache::table
.filter(transaction_cache::transaction_hash.eq(transaction_hash))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response.transaction_details)
}
pub async fn get_transactions(
mut conn: crate::postgres::PgAsyncConn,
start_block_height: u64,
Expand Down
48 changes: 48 additions & 0 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,18 @@ impl crate::ReaderDbManager for PostgresDBManager {
async fn get_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
if let Ok(transaction) = self.get_indexed_transaction_by_hash(transaction_hash).await {
Ok(transaction)
} else {
self.get_indexing_transaction_by_hash(transaction_hash)
.await
}
}

async fn get_indexed_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
let transaction_data = crate::models::TransactionDetail::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
Expand All @@ -286,6 +298,42 @@ impl crate::ReaderDbManager for PostgresDBManager {
)?)
}

async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
let data_value = crate::models::TransactionCache::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
)
.await?;
let mut transaction_details =
readnode_primitives::CollectingTransactionDetails::try_from_slice(&data_value)?;

let result = crate::models::ReceiptOutcome::get_receipt_outcome(
Self::get_connection(&self.pg_pool).await?,
transaction_details.block_height,
transaction_hash,
)
.await?;
for receipt_outcome in result {
let receipt =
near_primitives::views::ReceiptView::try_from_slice(&receipt_outcome.receipt)
.expect("Failed to deserialize receipt");
let execution_outcome =
near_primitives::views::ExecutionOutcomeWithIdView::try_from_slice(
&receipt_outcome.outcome,
)
.expect("Failed to deserialize execution outcome");
transaction_details.receipts.push(receipt);
transaction_details
.execution_outcomes
.push(execution_outcome)
}

Ok(transaction_details.into())
}

async fn get_block_by_height_and_shard_id(
&self,
block_height: near_primitives::types::BlockHeight,
Expand Down
81 changes: 80 additions & 1 deletion database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct ScyllaDBManager {
get_account_access_keys: PreparedStatement,
get_receipt: PreparedStatement,
get_transaction_by_hash: PreparedStatement,
get_indexing_transaction_by_hash: PreparedStatement,
get_indexing_transaction_receipts: PreparedStatement,
get_stored_at_block_height_and_shard_id_by_block_height: PreparedStatement,
get_validators_by_epoch_id: PreparedStatement,
get_validators_by_end_block_height: PreparedStatement,
Expand Down Expand Up @@ -49,6 +51,20 @@ impl crate::BaseDbManager for ScyllaDBManager {

#[async_trait::async_trait]
impl ScyllaStorageManager for ScyllaDBManager {
async fn create_tables(scylla_db_session: &scylla::Session) -> anyhow::Result<()> {
// Creating index in the tx_indexer_cache.transactions
// for faster search by transaction_hash to avoid ALLOW FILTERING
scylla_db_session
.query(
"
CREATE INDEX IF NOT EXISTS transaction_hash_key ON tx_indexer_cache.transactions (transaction_hash);
",
&[],
)
.await?;
Ok(())
}

async fn prepare(
scylla_db_session: std::sync::Arc<scylla::Session>,
) -> anyhow::Result<Box<Self>> {
Expand Down Expand Up @@ -111,7 +127,14 @@ impl ScyllaStorageManager for ScyllaDBManager {
&scylla_db_session,
"SELECT transaction_details FROM tx_indexer.transactions_details WHERE transaction_hash = ? LIMIT 1",
).await?,

get_indexing_transaction_by_hash: Self::prepare_read_query(
&scylla_db_session,
"SELECT transaction_details FROM tx_indexer_cache.transactions WHERE transaction_hash = ? LIMIT 1",
).await?,
get_indexing_transaction_receipts: Self::prepare_read_query(
&scylla_db_session,
"SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?"
).await?,
get_stored_at_block_height_and_shard_id_by_block_height: Self::prepare_read_query(
&scylla_db_session,
"SELECT stored_at_block_height, shard_id FROM state_indexer.chunks WHERE block_height = ?",
Expand Down Expand Up @@ -418,6 +441,20 @@ impl crate::ReaderDbManager for ScyllaDBManager {
async fn get_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
if let Ok(transaction) = self.get_indexed_transaction_by_hash(transaction_hash).await {
Ok(transaction)
} else {
self.get_indexing_transaction_by_hash(transaction_hash)
.await
}
}

/// Returns the readnode_primitives::TransactionDetails
/// from tx_indexer.transactions_details at the given transaction hash
async fn get_indexed_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
let (data_value,) = Self::execute_prepared_query(
&self.scylla_session,
Expand All @@ -433,6 +470,48 @@ impl crate::ReaderDbManager for ScyllaDBManager {
)?)
}

/// Returns the readnode_primitives::TransactionDetails
/// from tx_indexer_cache.transactions at the given transaction hash
async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
let (data_value,) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_indexing_transaction_by_hash,
(transaction_hash.to_string(),),
)
.await?
.single_row()?
.into_typed::<(Vec<u8>,)>()?;
let mut transaction_details =
readnode_primitives::CollectingTransactionDetails::try_from_slice(&data_value)?;

let mut rows_stream = self
.scylla_session
.execute_iter(
self.get_indexing_transaction_receipts.clone(),
(
num_bigint::BigInt::from(transaction_details.block_height),
transaction_hash.to_string(),
),
)
.await?
.into_typed::<(Vec<u8>, Vec<u8>)>();
while let Some(next_row_res) = rows_stream.next().await {
let (receipt, outcome) = next_row_res?;
let receipt = near_primitives::views::ReceiptView::try_from_slice(&receipt)?;
let execution_outcome =
near_primitives::views::ExecutionOutcomeWithIdView::try_from_slice(&outcome)?;
transaction_details.receipts.push(receipt);
transaction_details
.execution_outcomes
.push(execution_outcome);
}

Ok(transaction_details.into())
}

/// Returns the block height and shard id by the given block height
async fn get_block_by_height_and_shard_id(
&self,
Expand Down
38 changes: 32 additions & 6 deletions readnode-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ impl CollectingTransactionDetails {
TransactionKey::new(self.transaction.hash.clone().to_string(), self.block_height)
}

pub fn to_final_transaction_result(&self) -> anyhow::Result<TransactionDetails> {
let mut outcomes = self.execution_outcomes.clone();
pub fn final_status(&self) -> Option<views::FinalExecutionStatus> {
let mut looking_for_id = self.transaction.hash;
let num_outcomes = outcomes.len();
let finale_status = outcomes.iter().find_map(|outcome_with_id| {
let num_outcomes = self.execution_outcomes.len();
self.execution_outcomes.iter().find_map(|outcome_with_id| {
if outcome_with_id.id == looking_for_id {
match &outcome_with_id.outcome.status {
views::ExecutionStatusView::Unknown if num_outcomes == 1 => {
Expand All @@ -73,8 +72,12 @@ impl CollectingTransactionDetails {
} else {
None
}
});
match finale_status {
})
}

pub fn to_final_transaction_result(&self) -> anyhow::Result<TransactionDetails> {
let mut outcomes = self.execution_outcomes.clone();
match self.final_status() {
Some(status) => {
let receipts_outcome = outcomes.split_off(1);
let transaction_outcome = outcomes.pop().unwrap();
Expand All @@ -91,6 +94,29 @@ impl CollectingTransactionDetails {
}
}

impl From<CollectingTransactionDetails> for TransactionDetails {
fn from(tx: CollectingTransactionDetails) -> Self {
let mut outcomes = tx.execution_outcomes.clone();
let receipts_outcome = outcomes.split_off(1);
let transaction_outcome = outcomes.pop().unwrap();
// Execution status defined by nearcore/chain.rs:get_final_transaction_result
// FinalExecutionStatus::NotStarted - the tx is not converted to the receipt yet
// FinalExecutionStatus::Started - we have at least 1 receipt, but the first leaf receipt_id (using dfs) hasn't finished the execution
// FinalExecutionStatus::Failure - the result of the first leaf receipt_id
// FinalExecutionStatus::SuccessValue - the result of the first leaf receipt_id
let status = tx
.final_status()
.unwrap_or(views::FinalExecutionStatus::NotStarted);
Self {
receipts: tx.receipts,
receipts_outcome,
status,
transaction: tx.transaction,
transaction_outcome,
}
}
}

#[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Debug, Clone)]
pub struct TransactionDetails {
pub receipts: Vec<views::ReceiptView>,
Expand Down
22 changes: 8 additions & 14 deletions rpc-server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,19 @@ async fn handle_streamer_message(
blocks_cache: std::sync::Arc<
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
finale_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
final_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
near_rpc_client: &JsonRpcClient,
) -> anyhow::Result<()> {
let block: CacheBlock = streamer_message.block.into();

if finale_block_info.read().await.final_block_cache.epoch_id != block.epoch_id {
if final_block_info.read().await.final_block_cache.epoch_id != block.epoch_id {
tracing::info!("New epoch started: {:?}", block.epoch_id);
finale_block_info.write().await.current_protocol_config =
final_block_info.write().await.current_protocol_config =
get_current_protocol_config(near_rpc_client).await?;
finale_block_info.write().await.current_validators =
final_block_info.write().await.current_validators =
get_current_validators(near_rpc_client).await?;
}
finale_block_info.write().await.final_block_cache = block;
final_block_info.write().await.final_block_cache = block;
blocks_cache.write().await.put(block.block_height, block);
crate::metrics::FINAL_BLOCK_HEIGHT.set(i64::try_from(block.block_height)?);
Ok(())
Expand All @@ -154,28 +154,22 @@ pub async fn update_final_block_height_regularly(
blocks_cache: std::sync::Arc<
futures_locks::RwLock<crate::cache::LruMemoryCache<u64, CacheBlock>>,
>,
finale_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
final_block_info: std::sync::Arc<futures_locks::RwLock<FinalBlockInfo>>,
rpc_server_config: configuration::RpcServerConfig,
near_rpc_client: JsonRpcClient,
) -> anyhow::Result<()> {
tracing::info!("Task to get and store final block in the cache started");
let lake_config = rpc_server_config
.lake_config
.lake_config(
finale_block_info
.read()
.await
.final_block_cache
.block_height,
)
.lake_config(final_block_info.read().await.final_block_cache.block_height)
.await?;
let (sender, stream) = near_lake_framework::streamer(lake_config);
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| {
handle_streamer_message(
streamer_message,
std::sync::Arc::clone(&blocks_cache),
std::sync::Arc::clone(&finale_block_info),
std::sync::Arc::clone(&final_block_info),
&near_rpc_client,
)
})
Expand Down
1 change: 0 additions & 1 deletion tx-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ license.workspace = true
[dependencies]
actix-web = "4.2.1"
anyhow = "1.0.70"
async-trait = "0.1.66"
clap = { version = "3.2.22", features = ["color", "derive", "env"] }
futures = "0.3.5"
futures-locks = "0.7.1"
Expand Down
Loading
Loading