Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(state-indexer): Bug with indexing epoch validators info #160

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,20 @@ pub trait ReaderDbManager {
block_height: near_primitives::types::BlockHeight,
shard_id: near_primitives::types::ShardId,
) -> anyhow::Result<readnode_primitives::BlockHeightShardId>;

/// Returns epoch validators info by the given epoch id
async fn get_validators_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo>;

/// Return epoch validators info by the given epoch end block height
async fn get_validators_by_end_block_height(
&self,
block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo>;

/// Return protocol config by the given epoch id
async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
Expand Down
11 changes: 11 additions & 0 deletions database/src/base/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ pub trait StateIndexerDbManager {
)>,
) -> anyhow::Result<()>;

async fn get_block_by_hash(
&self,
block_hash: near_primitives::hash::CryptoHash,
) -> anyhow::Result<u64>;

async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()>;
async fn get_last_processed_block_height(&self, indexer_id: &str) -> anyhow::Result<u64>;
async fn add_validators(
Expand All @@ -121,4 +126,10 @@ pub trait StateIndexerDbManager {
epoch_start_height: u64,
protocol_config: &near_chain_configs::ProtocolConfigView,
) -> anyhow::Result<()>;

async fn update_epoch_end_height(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_end_block_hash: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<()>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS validators (
epoch_id text NOT NULL,
epoch_height numeric(20,0) NOT NULL,
epoch_start_height numeric(20,0) NOT NULL,
epoch_end_height numeric(20,0) NULL,
validators_info jsonb NOT NULL
);

Expand All @@ -13,6 +14,7 @@ CREATE TABLE IF NOT EXISTS protocol_configs (
epoch_id text NOT NULL,
epoch_height numeric(20,0) NOT NULL,
epoch_start_height numeric(20,0) NOT NULL,
epoch_end_height numeric(20,0) NULL,
protocol_config jsonb NOT NULL
);

Expand Down
40 changes: 40 additions & 0 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ pub struct Validators {
pub epoch_id: String,
pub epoch_height: bigdecimal::BigDecimal,
pub epoch_start_height: bigdecimal::BigDecimal,
pub epoch_end_height: Option<bigdecimal::BigDecimal>,
pub validators_info: serde_json::Value,
}

Expand All @@ -683,6 +684,19 @@ impl Validators {
Ok(())
}

pub async fn update_epoch_end_height(
mut conn: crate::postgres::PgAsyncConn,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_end_height: bigdecimal::BigDecimal,
) -> anyhow::Result<()> {
diesel::update(validators::table)
.filter(validators::epoch_id.eq(epoch_id.to_string()))
.set(validators::epoch_end_height.eq(epoch_end_height))
.execute(&mut conn)
.await?;
Ok(())
}

pub async fn get_validators(
mut conn: crate::postgres::PgAsyncConn,
epoch_id: near_indexer_primitives::CryptoHash,
Expand All @@ -695,6 +709,18 @@ impl Validators {

Ok(response)
}
pub async fn get_validators_epoch_end_height(
mut conn: crate::postgres::PgAsyncConn,
epoch_end_height: bigdecimal::BigDecimal,
) -> anyhow::Result<Self> {
let response = validators::table
.filter(validators::epoch_end_height.eq(epoch_end_height))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response)
}
}

#[derive(Insertable, Queryable, Selectable)]
Expand All @@ -703,6 +729,7 @@ pub struct ProtocolConfig {
pub epoch_id: String,
pub epoch_height: bigdecimal::BigDecimal,
pub epoch_start_height: bigdecimal::BigDecimal,
pub epoch_end_height: Option<bigdecimal::BigDecimal>,
pub protocol_config: serde_json::Value,
}

Expand All @@ -719,6 +746,19 @@ impl ProtocolConfig {
Ok(())
}

pub async fn update_epoch_end_height(
mut conn: crate::postgres::PgAsyncConn,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_end_height: bigdecimal::BigDecimal,
) -> anyhow::Result<()> {
diesel::update(protocol_configs::table)
.filter(protocol_configs::epoch_id.eq(epoch_id.to_string()))
.set(protocol_configs::epoch_end_height.eq(epoch_end_height))
.execute(&mut conn)
.await?;
Ok(())
}

pub async fn get_protocol_config(
mut conn: crate::postgres::PgAsyncConn,
epoch_id: near_indexer_primitives::CryptoHash,
Expand Down
37 changes: 35 additions & 2 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::postgres::PostgresStorageManager;
use crate::AdditionalDatabaseOptions;
use std::str::FromStr;

use bigdecimal::ToPrimitive;
use borsh::{BorshDeserialize, BorshSerialize};

use crate::postgres::PostgresStorageManager;
use crate::AdditionalDatabaseOptions;

pub struct PostgresDBManager {
pg_pool: crate::postgres::PgAsyncPool,
}
Expand Down Expand Up @@ -341,6 +344,36 @@ impl crate::ReaderDbManager for PostgresDBManager {
})
}

async fn get_validators_by_end_block_height(
&self,
block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo> {
let epoch = crate::models::Validators::get_validators_epoch_end_height(
Self::get_connection(&self.pg_pool).await?,
bigdecimal::BigDecimal::from(block_height),
)
.await?;
let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch.epoch_id)
.map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?;
let epoch_height = epoch
.epoch_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?;
let epoch_start_height = epoch
.epoch_start_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_start_height` to u64"))?;
let (validators_info,) = serde_json::from_value::<(
near_indexer_primitives::views::EpochValidatorInfo,
)>(epoch.validators_info)?;
Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height,
epoch_start_height,
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
Expand Down
2 changes: 2 additions & 0 deletions database/src/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ diesel::table! {
epoch_id -> Text,
epoch_height -> Numeric,
epoch_start_height -> Numeric,
epoch_end_height -> Nullable<Numeric>,
protocol_config -> Jsonb,
}
}
Expand Down Expand Up @@ -127,6 +128,7 @@ diesel::table! {
epoch_id -> Text,
epoch_height -> Numeric,
epoch_start_height -> Numeric,
epoch_end_height -> Nullable<Numeric>,
validators_info -> Jsonb,
}
}
Expand Down
37 changes: 37 additions & 0 deletions database/src/postgres/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,20 @@ impl crate::StateIndexerDbManager for PostgresDBManager {
.await
}

async fn get_block_by_hash(
&self,
block_hash: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<u64> {
let block_height = crate::models::Block::get_block_height_by_hash(
Self::get_connection(&self.pg_pool).await?,
block_hash,
)
.await?;
block_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `block_height` to u64"))
}

async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()> {
crate::models::Meta {
indexer_id: indexer_id.to_string(),
Expand Down Expand Up @@ -309,6 +323,7 @@ impl crate::StateIndexerDbManager for PostgresDBManager {
epoch_id: epoch_id.to_string(),
epoch_height: bigdecimal::BigDecimal::from(epoch_height),
epoch_start_height: bigdecimal::BigDecimal::from(epoch_start_height),
epoch_end_height: None,
validators_info: serde_json::to_value(validators_info)?,
}
.insert_or_ignore(Self::get_connection(&self.pg_pool).await?)
Expand All @@ -327,10 +342,32 @@ impl crate::StateIndexerDbManager for PostgresDBManager {
epoch_id: epoch_id.to_string(),
epoch_height: bigdecimal::BigDecimal::from(epoch_height),
epoch_start_height: bigdecimal::BigDecimal::from(epoch_start_height),
epoch_end_height: None,
protocol_config: serde_json::to_value(protocol_config)?,
}
.insert_or_ignore(Self::get_connection(&self.pg_pool).await?)
.await?;
Ok(())
}

async fn update_epoch_end_height(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_end_block_hash: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<()> {
let epoch_end_height = self.get_block_by_hash(epoch_end_block_hash).await?;

let validators_future = crate::models::Validators::update_epoch_end_height(
Self::get_connection(&self.pg_pool).await?,
epoch_id,
bigdecimal::BigDecimal::from(epoch_end_height),
);
let protocol_config_future = crate::models::ProtocolConfig::update_epoch_end_height(
Self::get_connection(&self.pg_pool).await?,
epoch_id,
bigdecimal::BigDecimal::from(epoch_end_height),
);
futures::future::try_join(validators_future, protocol_config_future).await?;
Ok(())
}
}
41 changes: 39 additions & 2 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::scylladb::ScyllaStorageManager;
use std::convert::TryFrom;
use std::str::FromStr;

use borsh::{BorshDeserialize, BorshSerialize};
use futures::StreamExt;
use num_traits::ToPrimitive;
use scylla::{prepared_statement::PreparedStatement, IntoTypedRows};
use std::convert::TryFrom;

use crate::scylladb::ScyllaStorageManager;

pub struct ScyllaDBManager {
scylla_session: std::sync::Arc<scylla::Session>,
Expand All @@ -21,6 +24,7 @@ pub struct ScyllaDBManager {
get_transaction_by_hash: PreparedStatement,
get_stored_at_block_height_and_shard_id_by_block_height: PreparedStatement,
get_validators_by_epoch_id: PreparedStatement,
get_validators_by_end_block_height: PreparedStatement,
get_protocol_config_by_epoch_id: PreparedStatement,
}

Expand Down Expand Up @@ -121,6 +125,10 @@ impl ScyllaStorageManager for ScyllaDBManager {
&scylla_db_session,
"SELECT epoch_height, validators_info FROM state_indexer.validators WHERE epoch_id = ?",
).await?,
get_validators_by_end_block_height: Self::prepare_read_query(
&scylla_db_session,
"SELECT epoch_id, epoch_height, validators_info FROM state_indexer.validators WHERE epoch_end_height = ?",
).await?,
get_protocol_config_by_epoch_id: Self::prepare_read_query(
&scylla_db_session,
"SELECT protocol_config FROM state_indexer.protocol_configs WHERE epoch_id = ?",
Expand Down Expand Up @@ -486,6 +494,35 @@ impl crate::ReaderDbManager for ScyllaDBManager {
})
}

async fn get_validators_by_end_block_height(
&self,
block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo> {
let (epoch_id, epoch_height, validators_info) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_validators_by_end_block_height,
(num_bigint::BigInt::from(block_height),),
)
.await?
.single_row()?
.into_typed::<(String, num_bigint::BigInt, String)>()?;

let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch_id)
.map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?;

let validators_info: near_primitives::views::EpochValidatorInfo =
serde_json::from_str(&validators_info)?;

Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height: epoch_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?,
epoch_start_height: validators_info.epoch_start_height,
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
Expand Down
Loading
Loading