From 0972be47280d73976464edb1095dcb6b97221b69 Mon Sep 17 00:00:00 2001 From: Bai Chuan Date: Fri, 14 Jun 2024 01:41:30 +0800 Subject: [PATCH] remove filed state for rebuild indexer cli (#1878) --- .../rooch-indexer/src/actor/reader_indexer.rs | 2 +- crates/rooch-indexer/src/indexer_reader.rs | 29 +++++++++++++++++-- .../rooch-indexer/src/store/sqlite_store.rs | 8 ++--- .../rooch-indexer/src/tests/test_indexer.rs | 2 +- .../src/commands/indexer/commands/rebuild.rs | 19 ++++++++---- crates/rooch/src/commands/statedb/README.md | 2 +- .../src/commands/statedb/commands/export.rs | 13 ++++++--- 7 files changed, 57 insertions(+), 18 deletions(-) diff --git a/crates/rooch-indexer/src/actor/reader_indexer.rs b/crates/rooch-indexer/src/actor/reader_indexer.rs index 5b54a0d880..b404311f8b 100644 --- a/crates/rooch-indexer/src/actor/reader_indexer.rs +++ b/crates/rooch-indexer/src/actor/reader_indexer.rs @@ -77,6 +77,6 @@ impl Handler for IndexerReaderActor { } = msg; self.indexer_reader .query_object_states_with_filter(filter, cursor, limit, descending_order) - .map_err(|e| anyhow!(format!("Failed to query indexer global states: {:?}", e))) + .map_err(|e| anyhow!(format!("Failed to query indexer object states: {:?}", e))) } } diff --git a/crates/rooch-indexer/src/indexer_reader.rs b/crates/rooch-indexer/src/indexer_reader.rs index 0b7ce80317..8b604bfe19 100644 --- a/crates/rooch-indexer/src/indexer_reader.rs +++ b/crates/rooch-indexer/src/indexer_reader.rs @@ -405,7 +405,7 @@ impl IndexerReader { main_where_clause, cursor_clause, order_clause, limit, ); - tracing::debug!("query global states: {}", query); + tracing::debug!("query object states: {}", query); let stored_states = self .get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)? .run_query(|conn| diesel::sql_query(query).load::(conn))?; @@ -415,11 +415,36 @@ impl IndexerReader { .map(|v| v.try_into_indexer_global_state()) .collect::>>() .map_err(|e| { - IndexerError::SQLiteReadError(format!("Cast indexer global states failed: {:?}", e)) + IndexerError::SQLiteReadError(format!("Cast indexer object states failed: {:?}", e)) })?; Ok(result) } + + pub fn query_last_state_index_by_tx_order(&self, tx_order: u64) -> IndexerResult { + let where_clause = format!("{TX_ORDER_STR} = \"{}\"", tx_order as i64); + let order_clause = format!("{TX_ORDER_STR} DESC, {STATE_INDEX_STR} DESC"); + let query = format!( + " + SELECT * FROM object_states \ + WHERE {} \ + ORDER BY {} \ + LIMIT 1 + ", + where_clause, order_clause, + ); + + tracing::debug!("query last state index by tx order: {}", query); + let stored_states = self + .get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)? + .run_query(|conn| diesel::sql_query(query).load::(conn))?; + let last_state_index = if stored_states.is_empty() { + 0 + } else { + stored_states[0].state_index as u64 + 1 + }; + Ok(last_state_index) + } } fn object_type_query(object_type: &StructTag) -> String { diff --git a/crates/rooch-indexer/src/store/sqlite_store.rs b/crates/rooch-indexer/src/store/sqlite_store.rs index a3b397ac26..2c9c08f2fe 100644 --- a/crates/rooch-indexer/src/store/sqlite_store.rs +++ b/crates/rooch-indexer/src/store/sqlite_store.rs @@ -91,16 +91,16 @@ impl SqliteIndexerStore { // )) // .execute(&mut connection) // .map_err(|e| IndexerError::SQLiteWriteError(e.to_string())) - // .context("Failed to write or update global states to SQLiteDB"); + // .context("Failed to write or update object states to SQLiteDB"); // Execute the raw SQL query diesel::sql_query(query.clone()) .execute(&mut connection) .map_err(|e| { - log::error!("Upsert global states Executing Query error: {}", query); + log::error!("Upsert object states Executing Query error: {}", query); IndexerError::SQLiteWriteError(e.to_string()) }) - .context("Failed to write or update global states to SQLiteDB")?; + .context("Failed to write or update object states to SQLiteDB")?; Ok(()) } @@ -117,7 +117,7 @@ impl SqliteIndexerStore { ) .execute(&mut connection) .map_err(|e| IndexerError::SQLiteWriteError(e.to_string())) - .context("Failed to delete global states to SQLiteDB")?; + .context("Failed to delete object states to SQLiteDB")?; Ok(()) } diff --git a/crates/rooch-indexer/src/tests/test_indexer.rs b/crates/rooch-indexer/src/tests/test_indexer.rs index 2ba8bce7e1..811266679a 100644 --- a/crates/rooch-indexer/src/tests/test_indexer.rs +++ b/crates/rooch-indexer/src/tests/test_indexer.rs @@ -166,7 +166,7 @@ fn test_state_store() -> Result<()> { let mut update_object_states = random_update_object_states(new_object_states.clone()); let remove_object_states = random_remove_object_states(); - //Merge new global states and update global states + //Merge new object states and update object states new_object_states.append(&mut update_object_states); indexer_store.persist_or_update_object_states(new_object_states.clone())?; indexer_store.delete_object_states(remove_object_states)?; diff --git a/crates/rooch/src/commands/indexer/commands/rebuild.rs b/crates/rooch/src/commands/indexer/commands/rebuild.rs index 6e302a2d1a..18039ee4ec 100644 --- a/crates/rooch/src/commands/indexer/commands/rebuild.rs +++ b/crates/rooch/src/commands/indexer/commands/rebuild.rs @@ -57,11 +57,11 @@ impl RebuildCommand { pub async fn execute(self) -> RoochResult<()> { let input_path = self.input.clone(); let batch_size = self.batch_size.unwrap(); - let (indexer_store, _indexer_reader, start_time) = self.init(); + let (indexer_store, indexer_reader, start_time) = self.init(); let (tx, rx) = mpsc::sync_channel(2); let produce_updates_thread = - thread::spawn(move || produce_updates(tx, input_path, batch_size)); + thread::spawn(move || produce_updates(tx, indexer_reader, input_path, batch_size)); let apply_updates_thread = thread::spawn(move || apply_updates(rx, indexer_store, start_time)); let _ = produce_updates_thread @@ -94,13 +94,22 @@ struct BatchUpdates { object_states: Vec, } -fn produce_updates(tx: SyncSender, input: PathBuf, batch_size: usize) -> Result<()> { +fn produce_updates( + tx: SyncSender, + indexer_reader: IndexerReader, + input: PathBuf, + batch_size: usize, +) -> Result<()> { let mut csv_reader = BufReader::new(File::open(input).unwrap()); let mut last_state_type = None; - // set genesis tx_order and state_index_generator + // set genesis tx_order and state_index_generator for indexer rebuild let tx_order: u64 = 0; - let mut state_index_generator: u64 = 0; + let mut state_index_generator = indexer_reader.query_last_state_index_by_tx_order(tx_order)?; + println!( + "Indexer rebuild produce_updates state_index_generator start from: {}", + state_index_generator + ); loop { let mut updates = BatchUpdates { diff --git a/crates/rooch/src/commands/statedb/README.md b/crates/rooch/src/commands/statedb/README.md index fa02dc6ab1..67d917dd68 100644 --- a/crates/rooch/src/commands/statedb/README.md +++ b/crates/rooch/src/commands/statedb/README.md @@ -43,7 +43,7 @@ rooch statedb export --output {your file} -d {your rooch data dir} -n main -m {e 3. rooch statedb import ```shell -rooch statedb statedb --input {your file} -d {your rooch data dir} -n main +rooch statedb import --input {your file} -d {your rooch data dir} -n main ``` ### Config diff --git a/crates/rooch/src/commands/statedb/commands/export.rs b/crates/rooch/src/commands/statedb/commands/export.rs index cd16c5ef64..5fd3e4da78 100644 --- a/crates/rooch/src/commands/statedb/commands/export.rs +++ b/crates/rooch/src/commands/statedb/commands/export.rs @@ -248,6 +248,7 @@ impl ExportCommand { root_state_root, obj.id, false, + true, writer, )?; } @@ -295,6 +296,7 @@ impl ExportCommand { root_state_root, object_id.clone(), false, + true, writer, )?; @@ -322,8 +324,9 @@ impl ExportCommand { state_root: H256, parent_state_root: H256, object_id: ObjectID, - // export child object as object state under indexer mode - is_child_object_as_object_state: bool, + // export child field as object state under indexer mode + is_child_field_as_object_state: bool, + is_recursive_export_child_field: bool, writer: &mut Writer, ) -> Result<()> { let starting_key = None; @@ -333,7 +336,7 @@ impl ExportCommand { .get_state_store() .iter(state_root, starting_key.clone())?; - if object_id.has_child() { + if is_recursive_export_child_field && object_id.has_child() { for item in iter { let (_k, v) = item?; if v.is_object() { @@ -345,6 +348,7 @@ impl ExportCommand { state_root, object.id, false, + false, writer, )?; } @@ -359,7 +363,7 @@ impl ExportCommand { // write csv header. { - let state_type = if is_child_object_as_object_state { + let state_type = if is_child_field_as_object_state { GLOBAL_STATE_TYPE_OBJECT } else { GLOBAL_STATE_TYPE_FIELD @@ -420,6 +424,7 @@ impl ExportCommand { root_state_root, obj.id, true, + false, writer, )?; }