From 2b1ff0b33bbb58641fdee5bd79e419a3abf0c9cd Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Mon, 20 Jan 2025 06:47:52 +0800 Subject: [PATCH] refactor(rooch-da): refactor indexing logic in DA commands (#3199) Refactored data indexing and management, improving modularity and simplifying the DA execution flow. --- Cargo.lock | 395 +++++++++++++-- Cargo.toml | 1 + crates/rooch/Cargo.toml | 1 + crates/rooch/src/commands/da/commands/exec.rs | 72 +-- .../rooch/src/commands/da/commands/index.rs | 110 ++-- crates/rooch/src/commands/da/commands/mod.rs | 468 +++++++++++++----- crates/rooch/src/commands/da/mod.rs | 7 +- .../src/commands/db/commands/dump_tx_root.rs | 70 --- .../db/commands/get_changeset_by_order.rs | 7 +- .../db/commands/get_execution_info_by_hash.rs | 41 ++ .../commands/get_execution_info_by_order.rs | 58 --- crates/rooch/src/commands/db/commands/mod.rs | 3 +- crates/rooch/src/commands/db/mod.rs | 13 +- 13 files changed, 871 insertions(+), 375 deletions(-) delete mode 100644 crates/rooch/src/commands/db/commands/dump_tx_root.rs create mode 100644 crates/rooch/src/commands/db/commands/get_execution_info_by_hash.rs delete mode 100644 crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs diff --git a/Cargo.lock b/Cargo.lock index d40e002f66..c73b9b6c33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1127,7 +1127,7 @@ version = "0.69.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "cexpr", "clang-sys", "itertools 0.12.1", @@ -1350,9 +1350,12 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.5.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" +dependencies = [ + "serde 1.0.216", +] [[package]] name = "bitmaps" @@ -1760,9 +1763,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.21" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" +checksum = "c8293772165d9345bdaaa39b45b2109591e63fe5e6fbc23c6ff930a048aa310b" dependencies = [ "jobserver", "libc", @@ -3359,6 +3362,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 2.0.87", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -3374,6 +3388,15 @@ dependencies = [ "litrs", ] +[[package]] +name = "doxygen-rs" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9" +dependencies = [ + "phf", +] + [[package]] name = "drain_filter_polyfill" version = "0.1.3" @@ -4892,7 +4915,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "libc", "libgit2-sys", "log", @@ -4935,7 +4958,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf760ebf69878d9fd8f110c89703d90ce35095324d1f1edcb595c63945ee757" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "ignore", "walkdir", ] @@ -5203,6 +5226,44 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "heed" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd54745cfacb7b97dee45e8fdb91814b62bccddb481debb7de0f9ee6b7bf5b43" +dependencies = [ + "bitflags 2.8.0", + "byteorder", + "heed-traits", + "heed-types", + "libc", + "lmdb-master-sys", + "once_cell", + "page_size", + "serde 1.0.216", + "synchronoise", + "url", +] + +[[package]] +name = "heed-traits" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff" + +[[package]] +name = "heed-types" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d" +dependencies = [ + "bincode", + "byteorder", + "heed-traits", + "serde 1.0.216", + "serde_json", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -5556,6 +5617,124 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 2.0.87", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -5564,12 +5743,23 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.5.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", ] [[package]] @@ -6504,7 +6694,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "libc", ] @@ -6569,12 +6759,29 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" + [[package]] name = "litrs" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" +[[package]] +name = "lmdb-master-sys" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472c3760e2a8d0f61f322fb36788021bb36d573c502b50fa3e2bcaac3ec326c9" +dependencies = [ + "cc", + "doxygen-rs", + "libc", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -8227,7 +8434,7 @@ version = "0.10.66" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "cfg-if", "foreign-types", "libc", @@ -8349,6 +8556,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "pairing" version = "0.23.0" @@ -9202,7 +9419,7 @@ checksum = "14cae93065090804185d3b75f0bf93b8eeda30c7a9b4a33d3bdb3988d6229e50" dependencies = [ "bit-set 0.8.0", "bit-vec 0.8.0", - "bitflags 2.5.0", + "bitflags 2.8.0", "lazy_static 1.5.0", "num-traits 0.2.19", "rand 0.8.5", @@ -9587,7 +9804,7 @@ version = "11.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", ] [[package]] @@ -9664,7 +9881,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", ] [[package]] @@ -9917,7 +10134,7 @@ checksum = "b9bf5d465e64b697da6a111cb19e798b5b2ebb18e5faf2ad48e9e8d47c64add2" dependencies = [ "alloy-primitives", "auto_impl", - "bitflags 2.5.0", + "bitflags 2.8.0", "bitvec 1.0.1", "c-kzg", "cfg-if", @@ -10081,6 +10298,7 @@ dependencies = [ "framework-release", "framework-types", "hdrhistogram", + "heed", "hex", "itertools 0.13.0", "lazy_static 1.5.0", @@ -11061,7 +11279,7 @@ version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "errno", "libc", "linux-raw-sys", @@ -11459,7 +11677,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "core-foundation", "core-foundation-sys", "libc", @@ -11819,7 +12037,7 @@ dependencies = [ "arrayvec 0.7.4", "async-trait", "base64 0.22.1", - "bitflags 2.5.0", + "bitflags 2.8.0", "bytes", "dashmap 5.5.3", "flate2", @@ -12426,6 +12644,26 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synchronoise" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2" +dependencies = [ + "crossbeam-queue", +] + +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 2.0.87", +] + [[package]] name = "synthez" version = "0.3.1" @@ -12476,7 +12714,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "core-foundation", "system-configuration-sys 0.6.0", ] @@ -12854,6 +13092,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -13159,7 +13407,7 @@ checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "async-compression", "base64 0.21.7", - "bitflags 2.5.0", + "bitflags 2.8.0", "bytes", "futures-core", "futures-util", @@ -13505,12 +13753,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "unicode-bidi" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" - [[package]] name = "unicode-ident" version = "1.0.12" @@ -13598,9 +13840,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.0" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", @@ -13614,6 +13856,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.1" @@ -14104,7 +14358,7 @@ version = "0.121.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dbe55c8f9d0dbd25d9447a5a889ff90c0cc3feaa7395310d3d826b2c703eaab" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.8.0", "indexmap 2.7.0", "semver 1.0.23", ] @@ -14490,6 +14744,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "ws_stream_wasm" version = "0.7.4" @@ -14572,6 +14838,30 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" +[[package]] +name = "yoke" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +dependencies = [ + "serde 1.0.216", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 2.0.87", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.34" @@ -14592,6 +14882,27 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "zerofrom" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 2.0.87", + "synstructure", +] + [[package]] name = "zeroize" version = "1.7.0" @@ -14612,6 +14923,28 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 2.0.87", +] + [[package]] name = "zip" version = "0.6.6" diff --git a/Cargo.toml b/Cargo.toml index ca8c1facb4..29b8882751 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -184,6 +184,7 @@ futures = "0.3.31" futures-util = "0.3.31" hdrhistogram = "7.5.4" hex = "0.4.3" +heed = "0.21.0" itertools = "0.13.0" jsonrpsee = { version = "0.23.2", features = ["full"] } jpst = "0.1.1" diff --git a/crates/rooch/Cargo.toml b/crates/rooch/Cargo.toml index df4e118e06..65c5c46212 100644 --- a/crates/rooch/Cargo.toml +++ b/crates/rooch/Cargo.toml @@ -35,6 +35,7 @@ codespan-reporting = { workspace = true } termcolor = { workspace = true } itertools = { workspace = true } hdrhistogram = { workspace = true } +heed = { workspace = true } hex = { workspace = true } regex = { workspace = true } parking_lot = { workspace = true } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index 31d76700e6..f410b9a521 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::commands::da::commands::{ - build_rooch_db, LedgerTxGetter, SequencedTxStore, TxDAIndexer, + build_rooch_db, LedgerTxGetter, SequencedTxStore, TxMetaStore, }; use anyhow::Context; use bitcoin::hashes::Hash; @@ -39,11 +39,7 @@ use rooch_types::transaction::{ L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo, }; use std::cmp::{max, min}; -use std::collections::HashMap; -use std::fs::File; -use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; -use std::str::FromStr; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Duration; @@ -65,15 +61,15 @@ pub struct ExecCommand { #[clap(long = "segment-dir")] pub segment_dir: PathBuf, #[clap( - long = "order-state-path", - help = "Path to tx_order:state_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests" + long = "tx-position", + help = "Path to tx_order:tx_hash:l2_block_number database directory" )] - pub order_state_path: PathBuf, + pub tx_position_path: PathBuf, #[clap( - long = "order-hash-path", - help = "Path to tx_order:tx_hash:l2_block_number file" + long = "exp-root", + help = "Path to tx_order:state_root:accumulator_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests" )] - pub order_hash_path: PathBuf, + pub exp_root_path: PathBuf, #[clap( long = "rollback", help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." @@ -197,10 +193,10 @@ impl ExecCommand { ) .await?; - let (order_state_pair, tx_order_end) = self.load_order_state_pair(); let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?; - let tx_da_indexer = TxDAIndexer::load_from_file( - self.order_hash_path.clone(), + let tx_da_indexer = TxMetaStore::new( + self.tx_position_path.clone(), + self.exp_root_path.clone(), moveos_store.transaction_store, rooch_db.rooch_store.clone(), )?; @@ -208,9 +204,7 @@ impl ExecCommand { mode: self.mode, force_align: self.force_align, ledger_tx_getter: ledger_tx_loader, - tx_da_indexer, - order_state_pair, - tx_order_end, + tx_meta_store: tx_da_indexer, sequenced_tx_store, bitcoin_client_proxy, executor, @@ -221,26 +215,6 @@ impl ExecCommand { rooch_db, }) } - - fn load_order_state_pair(&self) -> (HashMap, u64) { - let mut order_state_pair = HashMap::new(); - let mut tx_order_end = 0; - - let mut reader = BufReader::new(File::open(self.order_state_path.clone()).unwrap()); - // collect all `tx_order:state_root` pairs - for line in reader.by_ref().lines() { - let line = line.unwrap(); - let parts: Vec<&str> = line.split(':').collect(); - let tx_order = parts[0].parse::().unwrap(); - let state_root = H256::from_str(parts[1]).unwrap(); - let accumulator_root = H256::from_str(parts[2]).unwrap(); - order_state_pair.insert(tx_order, (state_root, accumulator_root)); - if tx_order > tx_order_end { - tx_order_end = tx_order; - } - } - (order_state_pair, tx_order_end) - } } struct ExecInner { @@ -248,9 +222,7 @@ struct ExecInner { force_align: bool, ledger_tx_getter: LedgerTxGetter, - tx_da_indexer: TxDAIndexer, - order_state_pair: HashMap, - tx_order_end: u64, + tx_meta_store: TxMetaStore, sequenced_tx_store: SequencedTxStore, @@ -389,8 +361,8 @@ impl ExecInner { } async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { - let last_executed_opt = self.tx_da_indexer.find_last_executed()?; - let last_executed_tx_order = match last_executed_opt.clone() { + let last_executed_opt = self.tx_meta_store.find_last_executed()?; + let last_executed_tx_order = match last_executed_opt { Some(v) => v.tx_order, None => 0, }; @@ -427,8 +399,8 @@ impl ExecInner { if let Some(rollback) = rollback_to { if rollback < last_partial_executed_tx_order { let new_last_and_rollback = self - .tx_da_indexer - .slice(rollback, last_partial_executed_tx_order)?; + .tx_meta_store + .get_tx_positions_in_range(rollback, last_partial_executed_tx_order)?; // split into two parts, the first get execution info for new startup, all others rollback let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); info!( @@ -448,9 +420,9 @@ impl ExecInner { })?; } let rollback_execution_info = - self.tx_da_indexer.get_execution_info(new_last.tx_hash)?; + self.tx_meta_store.get_execution_info(new_last.tx_hash)?; let rollback_sequencer_info = - self.tx_da_indexer.get_sequencer_info(new_last.tx_hash)?; + self.tx_meta_store.get_sequencer_info(new_last.tx_hash)?; self.update_startup_info_after_rollback( rollback_execution_info, rollback_sequencer_info, @@ -461,13 +433,12 @@ impl ExecInner { }; let mut next_block_number = last_executed_opt - .clone() .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block .unwrap_or(0); if !self.mode.need_exec() { next_tx_order = last_sequenced_tx + 1; - next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + next_block_number = self.tx_meta_store.find_tx_block(next_tx_order).unwrap(); } info!( "Start to produce transactions from tx_order: {}, check from block: {}", @@ -475,6 +446,7 @@ impl ExecInner { ); let mut produced_tx_order = 0; let mut reach_end = false; + let max_verified_tx_order = self.tx_meta_store.get_max_verified_tx_order(); loop { if reach_end { break; @@ -489,7 +461,7 @@ impl ExecInner { let tx_list = tx_list.unwrap(); for ledger_tx in tx_list { let tx_order = ledger_tx.sequence_info.tx_order; - if tx_order > self.tx_order_end { + if tx_order > max_verified_tx_order { reach_end = true; break; } @@ -583,7 +555,7 @@ impl ExecInner { let is_l2_tx = ledger_tx.data.is_l2_tx(); - let exp_root_opt = self.order_state_pair.get(&tx_order); + let exp_root_opt = self.tx_meta_store.get_exp_roots(tx_order); let exp_state_root = exp_root_opt.map(|v| v.0); let exp_accumulator_root = exp_root_opt.map(|v| v.1); diff --git a/crates/rooch/src/commands/da/commands/index.rs b/crates/rooch/src/commands/da/commands/index.rs index d787d43236..7e9c8d821e 100644 --- a/crates/rooch/src/commands/da/commands/index.rs +++ b/crates/rooch/src/commands/da/commands/index.rs @@ -1,59 +1,113 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::da::commands::{LedgerTxGetter, TxDAIndex}; -use rooch_types::error::{RoochError, RoochResult}; -use std::fs::File; -use std::io::BufWriter; -use std::io::Write; +use crate::commands::da::commands::{LedgerTxGetter, TxPosition, TxPositionIndexer}; +use anyhow::anyhow; +use std::cmp::max; use std::path::PathBuf; +use tracing::info; -/// Index tx_order:tx_hash:block_number to a file from segments +/// Index tx_order:tx_hash:block_number #[derive(Debug, clap::Parser)] pub struct IndexCommand { #[clap(long = "segment-dir", short = 's')] - pub segment_dir: PathBuf, + pub segment_dir: Option, #[clap(long = "index", short = 'i')] pub index_path: PathBuf, + #[clap( + long = "reset-from", + help = "Reset from tx order(inclusive), all tx orders after this will be re-indexed" + )] + pub reset_from: Option, + #[clap(long = "max-block-number", help = "Max block number to index")] + pub max_block_number: Option, + #[clap(long = "file", help = "Load/dump file-based index")] + pub index_file_path: Option, + #[clap(long = "dump", help = "Dump index to file")] + pub dump: bool, } impl IndexCommand { - pub fn execute(self) -> RoochResult<()> { - let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir)?; - let mut block_number = ledger_tx_loader.get_min_chunk_id(); - let mut expected_tx_order = 0; - let file = File::create(self.index_path.clone())?; - let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); + pub fn execute(self) -> anyhow::Result<()> { + if self.index_file_path.is_some() { + return TxPositionIndexer::load_or_dump( + self.index_path, + self.index_file_path.unwrap(), + self.dump, + ); + } + + let db_path = self.index_path.clone(); + let reset_from = self.reset_from; + let mut indexer = TxPositionIndexer::new(db_path, reset_from)?; + + info!("indexer stats after reset: {:?}", indexer.get_stats()?); + + let segment_dir = self.segment_dir.ok_or(anyhow!("segment-dir is required"))?; + let ledger_tx_loader = LedgerTxGetter::new(segment_dir)?; + let mut block_number = indexer.last_block_number; // avoiding partial indexing + let mut expected_tx_order = indexer.last_tx_order + 1; + let stop_at = if let Some(max_block_number) = self.max_block_number { + max(max_block_number, ledger_tx_loader.get_max_chunk_id()) + } else { + ledger_tx_loader.get_max_chunk_id() + }; + + let db = indexer.db; + let mut wtxn = indexer.db_env.write_txn()?; + + let mut done_block = 0; loop { - if block_number > ledger_tx_loader.get_max_chunk_id() { + if block_number > stop_at { break; } let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?; let tx_list = tx_list.unwrap(); for mut ledger_tx in tx_list { let tx_order = ledger_tx.sequence_info.tx_order; - let tx_hash = ledger_tx.tx_hash(); - if expected_tx_order == 0 { - expected_tx_order = tx_order; - } else if tx_order != expected_tx_order { - return Err(RoochError::from(anyhow::anyhow!( - "tx_order mismatch: expected {}, got {}", + if tx_order < expected_tx_order { + continue; + } + if tx_order == indexer.last_tx_order + 1 { + info!( + "begin to index block: {}, tx_order: {}", + block_number, tx_order + ); + } + if tx_order != expected_tx_order { + return Err(anyhow!( + "tx_order not continuous, expect: {}, got: {}", expected_tx_order, tx_order - ))); + )); } - writeln!( - writer, - "{}", - TxDAIndex::new(tx_order, tx_hash, block_number) - )?; + let tx_hash = ledger_tx.tx_hash(); + let tx_position = TxPosition { + tx_order, + tx_hash, + block_number, + }; + db.put(&mut wtxn, &tx_order, &tx_position)?; expected_tx_order += 1; } block_number += 1; + done_block += 1; + if done_block % 1000 == 0 { + wtxn.commit()?; + wtxn = indexer.db_env.write_txn()?; + info!( + "done: block_cnt: {}; next_block_number: {}", + done_block, block_number + ); + } } - writer.flush()?; - file.sync_data()?; + wtxn.commit()?; + + indexer.init_cursor()?; + info!("indexer stats after job: {:?}", indexer.get_stats()?); + indexer.close()?; + Ok(()) } } diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index 264767aa79..7e2a9ebdba 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -2,12 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 use accumulator::{Accumulator, MerkleAccumulator}; +use anyhow::anyhow; +use heed::byteorder::BigEndian; +use heed::types::{SerdeBincode, U64}; +use heed::{Database, Env, EnvOpenOptions}; use metrics::RegistryService; use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; use moveos_types::h256::H256; use moveos_types::moveos_std::object::ObjectMeta; use moveos_types::transaction::TransactionExecutionInfo; -use rooch_common::vec::find_last_true; use rooch_config::RoochOpt; use rooch_db::RoochDB; use rooch_store::RoochStore; @@ -16,11 +19,13 @@ use rooch_types::da::segment::{segment_from_bytes, SegmentID}; use rooch_types::rooch_network::RoochChainID; use rooch_types::sequencer::SequencerInfo; use rooch_types::transaction::{LedgerTransaction, TransactionSequenceInfo}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::fs::File; -use std::io::{BufRead, BufReader, Read}; +use std::io::{BufRead, BufReader, BufWriter, Read, Write}; use std::path::PathBuf; +use std::str::FromStr; use std::sync::atomic::AtomicU64; use tracing::info; @@ -247,149 +252,143 @@ impl LedgerTxGetter { } } -#[derive(Debug, Clone)] -pub struct TxDAIndex { - pub tx_order: u64, - pub tx_hash: H256, - pub block_number: u128, +pub(crate) struct TxMetaStore { + tx_position_indexer: TxPositionIndexer, + exp_roots: HashMap, // tx_order -> (state_root, accumulator_root) + max_verified_tx_order: u64, + transaction_store: TransactionDBStore, + rooch_store: RoochStore, } -impl TxDAIndex { - pub fn new(tx_order: u64, tx_hash: H256, block_number: u128) -> Self { - TxDAIndex { - tx_order, - tx_hash, - block_number, - } - } +struct ExpRootsMap { + exp_roots: HashMap, + max_verified_tx_order: u64, } -impl std::fmt::Display for TxDAIndex { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}:{:?}:{}", - self.tx_order, self.tx_hash, self.block_number - ) +impl TxMetaStore { + pub(crate) fn new( + tx_position_indexer_path: PathBuf, + exp_roots_path: PathBuf, + transaction_store: TransactionDBStore, + rooch_store: RoochStore, + ) -> anyhow::Result { + let tx_position_indexer = TxPositionIndexer::new(tx_position_indexer_path, None)?; + let exp_roots_map = Self::load_exp_roots(exp_roots_path)?; + Ok(TxMetaStore { + tx_position_indexer, + exp_roots: exp_roots_map.exp_roots, + max_verified_tx_order: exp_roots_map.max_verified_tx_order, + transaction_store, + rooch_store, + }) } -} -impl std::str::FromStr for TxDAIndex { - type Err = anyhow::Error; + fn load_exp_roots(exp_roots_path: PathBuf) -> anyhow::Result { + let mut exp_roots = HashMap::new(); + let mut max_verified_tx_order = 0; - fn from_str(s: &str) -> Result { - let parts: Vec<&str> = s.split(':').collect(); - if parts.len() != 3 { - return Err(anyhow::anyhow!("Invalid format")); + let mut reader = BufReader::new(File::open(exp_roots_path)?); + for line in reader.by_ref().lines() { + let line = line.unwrap(); + let parts: Vec<&str> = line.split(':').collect(); + let tx_order = parts[0].parse::()?; + let state_root = H256::from_str(parts[1])?; + let accumulator_root = H256::from_str(parts[2])?; + exp_roots.insert(tx_order, (state_root, accumulator_root)); + if tx_order > max_verified_tx_order { + max_verified_tx_order = tx_order; + } } - let tx_order = parts[0].parse::()?; - let tx_hash = H256::from_str(parts[1])?; - let block_number = parts[2].parse::()?; - Ok(TxDAIndex { - tx_order, - tx_hash, - block_number, + Ok(ExpRootsMap { + exp_roots, + max_verified_tx_order, }) } -} -/// TxOrderHashBlockGetter is used to get TxOrderHashBlock from a file -/// all tx_order_hash_blocks(start from tx_order 1) are stored in a file, -/// each line is a TxOrderHashBlock -pub struct TxDAIndexer { - tx_order_hash_blocks: Vec, - transaction_store: TransactionDBStore, - rooch_store: RoochStore, -} + pub(crate) fn get_exp_roots(&self, tx_order: u64) -> Option<(H256, H256)> { + self.exp_roots.get(&tx_order).cloned() + } -impl TxDAIndexer { - pub fn load_from_file( - file_path: PathBuf, - transaction_store: TransactionDBStore, - rooch_store: RoochStore, - ) -> anyhow::Result { - let mut tx_order_hashes = Vec::with_capacity(70000000); - let mut reader = BufReader::new(File::open(file_path)?); - for line in reader.by_ref().lines() { - let line = line?; - let item = line.parse::()?; - tx_order_hashes.push(item); - } - tx_order_hashes.sort_by(|a, b| a.tx_order.cmp(&b.tx_order)); // avoiding wrong order - info!( - "tx_order:tx_hash:block indexer loaded, tx cnt: {}", - tx_order_hashes.len() - ); - Ok(TxDAIndexer { - tx_order_hash_blocks: tx_order_hashes, - transaction_store, - rooch_store, - }) + pub(crate) fn get_max_verified_tx_order(&self) -> u64 { + self.max_verified_tx_order } - pub fn get_tx_hash(&self, tx_order: u64) -> Option { + pub(crate) fn get_tx_hash(&self, tx_order: u64) -> Option { let r = self - .tx_order_hash_blocks - .binary_search_by(|x| x.tx_order.cmp(&tx_order)); - let idx = match r { - Ok(i) => i, - Err(_) => { - return None; - } - }; - Some(self.tx_order_hash_blocks[idx].tx_hash) + .tx_position_indexer + .get_tx_position(tx_order) + .ok() + .flatten(); + r.map(|tx_position| tx_position.tx_hash) } - pub fn slice(&self, start_tx_order: u64, end_tx_order: u64) -> anyhow::Result> { - let r = self - .tx_order_hash_blocks - .binary_search_by(|x| x.tx_order.cmp(&start_tx_order)); - let start_idx = match r { - Ok(i) => i, - Err(_) => { - return Err(anyhow::anyhow!("start_tx_order not found")); - } - }; - let end_idx = start_idx + (end_tx_order - start_tx_order) as usize; - Ok(self.tx_order_hash_blocks[start_idx..end_idx + 1].to_vec()) + pub(crate) fn get_tx_positions_in_range( + &self, + start_tx_order: u64, + end_tx_order: u64, + ) -> anyhow::Result> { + self.tx_position_indexer + .get_tx_positions_in_range(start_tx_order, end_tx_order) } - pub fn find_last_executed(&self) -> anyhow::Result> { - let r = find_last_true(&self.tx_order_hash_blocks, |item| { - self.has_executed(item.tx_hash) - }); - Ok(r.cloned()) + pub(crate) fn find_last_executed(&self) -> anyhow::Result> { + let predicate = |tx_order: &u64| self.has_executed_by_tx_order(*tx_order); + let last_tx_order = self.tx_position_indexer.last_tx_order; + if last_tx_order == 0 { + // no tx indexed through DA segments + return Ok(None); + } + if !predicate(&1) { + return Ok(None); // first tx in DA segments is not executed + } + if predicate(&last_tx_order) { + return self.tx_position_indexer.get_tx_position(last_tx_order); // last tx is executed + } + + // binary search [1, self.tx_position_indexer.last_tx_order] + let mut left = 1; // first tx is executed, has checked + let mut right = last_tx_order; + + while left + 1 < right { + let mid = left + (right - left) / 2; + if predicate(&mid) { + left = mid; // mid is true, the final answer is mid or on the right + } else { + right = mid; // mid is false, the final answer is on the left + } + } + + // left is the last true position + self.tx_position_indexer.get_tx_position(left) } - pub fn find_tx_block(&self, tx_order: u64) -> Option { + pub(crate) fn find_tx_block(&self, tx_order: u64) -> Option { let r = self - .tx_order_hash_blocks - .binary_search_by(|x| x.tx_order.cmp(&tx_order)); - let idx = match r { - Ok(i) => i, - Err(_) => { - return None; - } - }; - Some(self.tx_order_hash_blocks[idx].block_number) + .tx_position_indexer + .get_tx_position(tx_order) + .ok() + .flatten(); + r.map(|tx_position| tx_position.block_number) + } + + fn has_executed_by_tx_order(&self, tx_order: u64) -> bool { + self.get_tx_hash(tx_order) + .map_or(false, |tx_hash| self.has_executed(tx_hash)) } fn has_executed(&self, tx_hash: H256) -> bool { - let execution_info = self - .transaction_store - .get_tx_execution_info(tx_hash) - .unwrap(); - execution_info.is_some() + self.get_execution_info(tx_hash) + .map_or(false, |info| info.is_some()) } - pub fn get_execution_info( + pub(crate) fn get_execution_info( &self, tx_hash: H256, ) -> anyhow::Result> { self.transaction_store.get_tx_execution_info(tx_hash) } - pub fn get_sequencer_info( + pub(crate) fn get_sequencer_info( &self, tx_hash: H256, ) -> anyhow::Result> { @@ -399,18 +398,245 @@ impl TxDAIndexer { .get_transaction_by_hash(tx_hash)? .map(|transaction| transaction.sequence_info)) } +} - pub fn get_execution_info_by_order( - &self, - tx_order: u64, - ) -> anyhow::Result> { - let tx_hash_option = self.get_tx_hash(tx_order); +const MAP_SIZE: usize = 1 << 34; // 16G +const MAX_DBS: u32 = 1; +const ORDER_DATABASE_NAME: &str = "order_db"; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +pub(crate) struct TxPosition { + pub(crate) tx_order: u64, + pub(crate) tx_hash: H256, + pub(crate) block_number: u128, +} + +pub(crate) struct TxPositionIndexer { + db_env: Env, + db: Database, SerdeBincode>, + last_tx_order: u64, + last_block_number: u128, +} + +#[derive(Debug, Serialize)] +pub(crate) struct TxPositionIndexerStats { + pub(crate) total_tx_count: u64, + pub(crate) last_tx_order: u64, + pub(crate) last_block_number: u128, +} - if let Some(tx_hash) = tx_hash_option { - let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; - Ok(execution_info) +impl TxPositionIndexer { + pub(crate) fn load_or_dump( + db_path: PathBuf, + file_path: PathBuf, + dump: bool, + ) -> anyhow::Result<()> { + if dump { + let indexer = TxPositionIndexer::new(db_path, None)?; + indexer.dump_to_file(file_path) } else { - Ok(None) + TxPositionIndexer::load_from_file(db_path, file_path) + } + } + + pub(crate) fn dump_to_file(&self, file_path: PathBuf) -> anyhow::Result<()> { + let db = self.db; + let file = std::fs::File::create(file_path)?; + let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); + let rtxn = self.db_env.read_txn()?; + let mut iter = db.iter(&rtxn)?; + while let Some((k, v)) = iter.next().transpose()? { + writeln!(writer, "{}:{:?}:{}", k, v.tx_hash, v.block_number)?; } + drop(iter); + rtxn.commit()?; + writer.flush().expect("Unable to flush writer"); + file.sync_data().expect("Unable to sync file"); + Ok(()) + } + + pub(crate) fn load_from_file(db_path: PathBuf, file_path: PathBuf) -> anyhow::Result<()> { + let mut last_tx_order = 0; + let mut last_tx_hash = H256::zero(); + let mut last_block_number = 0; + + let db_env = Self::create_env(db_path.clone())?; + let file = std::fs::File::open(file_path)?; + let reader = std::io::BufReader::new(file); + + let mut wtxn = db_env.write_txn()?; // Begin write_transaction early for create/put + + let mut is_verify = false; + let db: Database, SerdeBincode> = + match db_env.open_database(&wtxn, Some(ORDER_DATABASE_NAME)) { + Ok(Some(db)) => { + info!("Database already exists, verify mode"); + is_verify = true; + db + } + Ok(None) => db_env.create_database(&mut wtxn, Some(ORDER_DATABASE_NAME))?, + Err(e) => return Err(e.into()), // Proper error propagation + }; + wtxn.commit()?; + + let mut wtxn = db_env.write_txn()?; + + for line in reader.lines() { + let line = line?; + let parts: Vec<&str> = line.split(':').collect(); + if parts.len() != 3 { + return Err(anyhow!("invalid line: {}", line)); + } + let tx_order = parts[0].parse::()?; + let tx_hash = H256::from_str(parts[1])?; + let block_number = parts[2].parse::()?; + let tx_position = TxPosition { + tx_order, + tx_hash, + block_number, + }; + + if is_verify { + let rtxn = db_env.read_txn()?; + let ret = db.get(&rtxn, &tx_order)?; + let ret = ret.ok_or(anyhow!("tx_order not found: {}", tx_order))?; + rtxn.commit()?; + assert_eq!(ret, tx_position); + } else { + db.put(&mut wtxn, &tx_order, &tx_position)?; + } + + last_tx_order = tx_order; + last_tx_hash = tx_hash; + last_block_number = block_number; + } + + wtxn.commit()?; + + if last_tx_order != 0 { + let rtxn = db_env.read_txn()?; + let ret = db.last(&rtxn)?; + assert_eq!( + ret, + Some(( + last_tx_order, + TxPosition { + tx_order: last_tx_order, + tx_hash: last_tx_hash, + block_number: last_block_number, + } + )) + ); + } + + { + let rtxn = db_env.read_txn()?; + let final_count = db.iter(&rtxn)?.count(); + info!("Final record count: {}", final_count); + rtxn.commit()?; + } + + db_env.force_sync()?; + + Ok(()) + } + + pub(crate) fn new(db_path: PathBuf, reset_from: Option) -> anyhow::Result { + let db_env = Self::create_env(db_path)?; + let mut txn = db_env.write_txn()?; + let db: Database, SerdeBincode> = + db_env.create_database(&mut txn, Some(ORDER_DATABASE_NAME))?; + txn.commit()?; + + let mut indexer = TxPositionIndexer { + db_env, + db, + last_tx_order: 0, + last_block_number: 0, + }; + if let Some(from) = reset_from { + indexer.reset_from(from)?; + } + + indexer.init_cursor()?; + Ok(indexer) + } + + pub(crate) fn get_tx_position(&self, tx_order: u64) -> anyhow::Result> { + let rtxn = self.db_env.read_txn()?; + let db = self.db; + let ret = db.get(&rtxn, &tx_order)?; + rtxn.commit()?; + Ok(ret) + } + + pub(crate) fn get_tx_positions_in_range( + &self, + start: u64, + end: u64, + ) -> anyhow::Result> { + let rtxn = self.db_env.read_txn()?; + let db = self.db; + let mut tx_positions = Vec::new(); + let range = start..=end; + let mut iter = db.range(&rtxn, &range)?; + while let Some((_k, v)) = iter.next().transpose()? { + tx_positions.push(v); + } + drop(iter); + rtxn.commit()?; + Ok(tx_positions) + } + + fn create_env(db_path: PathBuf) -> anyhow::Result { + let env = unsafe { + EnvOpenOptions::new() + .map_size(MAP_SIZE) // 16G + .max_dbs(MAX_DBS) + .open(db_path)? + }; + Ok(env) + } + + // init cursor by search last tx_order + pub(crate) fn init_cursor(&mut self) -> anyhow::Result<()> { + let rtxn = self.db_env.read_txn()?; + let db = self.db; + if let Some((k, v)) = db.last(&rtxn)? { + self.last_tx_order = k; + self.last_block_number = v.block_number; + } + rtxn.commit()?; + Ok(()) + } + + fn reset_from(&self, from: u64) -> anyhow::Result<()> { + let mut wtxn = self.db_env.write_txn()?; + let db = self.db; + + let range = from..; + let deleted_count = db.delete_range(&mut wtxn, &range)?; + wtxn.commit()?; + info!("deleted {} records from tx_order: {}", deleted_count, from); + Ok(()) + } + + pub(crate) fn get_stats(&self) -> anyhow::Result { + let rtxn = self.db_env.read_txn()?; + let db = self.db; + let count = db.iter(&rtxn)?.count(); + rtxn.commit()?; + Ok(TxPositionIndexerStats { + total_tx_count: count as u64, + last_tx_order: self.last_tx_order, + last_block_number: self.last_block_number, + }) + } + + pub(crate) fn close(&self) -> anyhow::Result<()> { + let env = self.db_env.clone(); + env.force_sync()?; + drop(env); + Ok(()) } } diff --git a/crates/rooch/src/commands/da/mod.rs b/crates/rooch/src/commands/da/mod.rs index ef44bd9cbd..ff040ae6cd 100644 --- a/crates/rooch/src/commands/da/mod.rs +++ b/crates/rooch/src/commands/da/mod.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use clap::Parser; use rooch_types::error::RoochResult; -/// DB Commands +/// DA Commands #[derive(Parser)] pub struct DA { #[clap(subcommand)] @@ -26,7 +26,10 @@ impl CommandAction for DA { DACommand::Unpack(unpack) => unpack.execute().map(|_| "".to_owned()), DACommand::Namespace(namespace) => namespace.execute().map(|_| "".to_owned()), DACommand::Exec(exec) => exec.execute().await.map(|_| "".to_owned()), - DACommand::Index(index) => index.execute().map(|_| "".to_owned()), + DACommand::Index(index) => { + index.execute()?; + Ok("".to_owned()) + } } } } diff --git a/crates/rooch/src/commands/db/commands/dump_tx_root.rs b/crates/rooch/src/commands/db/commands/dump_tx_root.rs deleted file mode 100644 index c7c5794118..0000000000 --- a/crates/rooch/src/commands/db/commands/dump_tx_root.rs +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use crate::commands::da::commands::TxDAIndexer; -use crate::commands::db::commands::init; -use clap::Parser; -use rooch_config::R_OPT_NET_HELP; -use rooch_types::error::RoochResult; -use rooch_types::rooch_network::RoochChainID; -use std::fs::File; -use std::io::{BufWriter, Write}; -use std::path::PathBuf; - -/// Get changeset by order -#[derive(Debug, Parser)] -pub struct DumpTxRootCommand { - #[clap(long, help = "start tx order")] - pub start: u64, - #[clap(long, help = "total tx count, [start, start+limit)")] - pub limit: u64, - #[clap( - long = "order-hash-path", - help = "Path to tx_order:tx_hash:block_number file" - )] - pub order_hash_path: PathBuf, - #[clap(long, help = "tx_order:state_root output file path")] - pub output: PathBuf, - - #[clap(long = "data-dir", short = 'd')] - /// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name - pub base_data_dir: Option, - - /// If local chainid, start the service with a temporary data store. - /// All data will be deleted when the service is stopped. - #[clap(long, short = 'n', help = R_OPT_NET_HELP)] - pub chain_id: Option, -} - -impl DumpTxRootCommand { - pub async fn execute(self) -> RoochResult<()> { - let (_root, rooch_db, _start_time) = init(self.base_data_dir, self.chain_id); - let moveos_store = rooch_db.moveos_store.clone(); - let tx_da_indexer = TxDAIndexer::load_from_file( - self.order_hash_path.clone(), - moveos_store.transaction_store, - rooch_db.rooch_store.clone(), - )?; - - let file = File::create(self.output.clone())?; - let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); - - for tx_order in self.start..self.start + self.limit { - let execution_info = tx_da_indexer.get_execution_info_by_order(tx_order)?; - if execution_info.is_none() { - tracing::warn!("tx_order {} execution_info not found", tx_order); - continue; - } - writeln!( - writer, - "{}:{:?}", - tx_order, - execution_info.unwrap().state_root - )?; - } - writer.flush()?; - file.sync_data()?; - - Ok(()) - } -} diff --git a/crates/rooch/src/commands/db/commands/get_changeset_by_order.rs b/crates/rooch/src/commands/db/commands/get_changeset_by_order.rs index dd7e60fc1e..474aa10a7f 100644 --- a/crates/rooch/src/commands/db/commands/get_changeset_by_order.rs +++ b/crates/rooch/src/commands/db/commands/get_changeset_by_order.rs @@ -3,6 +3,7 @@ use crate::commands::db::commands::init; use clap::Parser; +use moveos_types::state::StateChangeSetExt; use rooch_config::R_OPT_NET_HELP; use rooch_store::state_store::StateStore; use rooch_types::error::RoochResult; @@ -26,13 +27,11 @@ pub struct GetChangesetByOrderCommand { } impl GetChangesetByOrderCommand { - pub async fn execute(self) -> RoochResult<()> { + pub async fn execute(self) -> RoochResult> { let (_root, rooch_db, _start_time) = init(self.base_data_dir, self.chain_id); let rooch_store = rooch_db.rooch_store; let tx_order = self.order; let state_change_set_ext_opt = rooch_store.get_state_change_set(tx_order)?; - println!("{:?}", state_change_set_ext_opt); - - Ok(()) + Ok(state_change_set_ext_opt) } } diff --git a/crates/rooch/src/commands/db/commands/get_execution_info_by_hash.rs b/crates/rooch/src/commands/db/commands/get_execution_info_by_hash.rs new file mode 100644 index 0000000000..feb623c908 --- /dev/null +++ b/crates/rooch/src/commands/db/commands/get_execution_info_by_hash.rs @@ -0,0 +1,41 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::commands::db::commands::init; +use clap::Parser; +use moveos_store::transaction_store::TransactionStore; +use moveos_types::h256::H256; +use moveos_types::transaction::TransactionExecutionInfo; +use rooch_config::R_OPT_NET_HELP; +use rooch_types::error::RoochResult; +use rooch_types::rooch_network::RoochChainID; +use std::path::PathBuf; + +/// Get ExecutionInfo by tx_hash +#[derive(Debug, Parser)] +pub struct GetExecutionInfoByHashCommand { + /// Transaction's hash + #[clap(long)] + pub hash: H256, + + #[clap(long = "data-dir", short = 'd')] + /// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name + pub base_data_dir: Option, + + /// If local chainid, start the service with a temporary data store. + /// All data will be deleted when the service is stopped. + #[clap(long, short = 'n', help = R_OPT_NET_HELP)] + pub chain_id: Option, +} + +impl GetExecutionInfoByHashCommand { + pub fn execute(self) -> RoochResult> { + let (_root, rooch_db, _start_time) = init(self.base_data_dir, self.chain_id); + let moveos_store = rooch_db.moveos_store.clone(); + + let execution_info = moveos_store + .get_transaction_store() + .get_tx_execution_info(self.hash)?; + Ok(execution_info) + } +} diff --git a/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs b/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs deleted file mode 100644 index ccea833bd9..0000000000 --- a/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use crate::commands::da::commands::TxDAIndexer; -use crate::commands::db::commands::init; -use clap::Parser; -use rooch_config::R_OPT_NET_HELP; -use rooch_types::error::RoochResult; -use rooch_types::rooch_network::RoochChainID; -use std::path::PathBuf; - -/// Get ExecutionInfo by order -#[derive(Debug, Parser)] -pub struct GetExecutionInfoByOrderCommand { - #[clap(long)] - pub order: u64, - - #[clap( - long = "order-hash-path", - help = "Path to tx_order:tx_hash:block_number file" - )] - pub order_hash_path: PathBuf, - - #[clap(long = "data-dir", short = 'd')] - /// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name - pub base_data_dir: Option, - - /// If local chainid, start the service with a temporary data store. - /// All data will be deleted when the service is stopped. - #[clap(long, short = 'n', help = R_OPT_NET_HELP)] - pub chain_id: Option, -} - -impl GetExecutionInfoByOrderCommand { - pub fn execute(self) -> RoochResult<()> { - let (_root, rooch_db, _start_time) = init(self.base_data_dir, self.chain_id); - let moveos_store = rooch_db.moveos_store.clone(); - let tx_da_indexer = TxDAIndexer::load_from_file( - self.order_hash_path.clone(), - moveos_store.transaction_store, - rooch_db.rooch_store.clone(), - )?; - - let tx_order = self.order; - - let execution_info = tx_da_indexer.get_execution_info_by_order(tx_order)?; - match execution_info { - Some(_) => { - println!("{}:{:?}", tx_order, execution_info.unwrap()); - } - None => { - tracing::warn!("tx_order {} execution_info not found", tx_order); - } - } - - Ok(()) - } -} diff --git a/crates/rooch/src/commands/db/commands/mod.rs b/crates/rooch/src/commands/db/commands/mod.rs index 93ba8b78b9..e7e97939c0 100644 --- a/crates/rooch/src/commands/db/commands/mod.rs +++ b/crates/rooch/src/commands/db/commands/mod.rs @@ -13,9 +13,8 @@ use std::time::SystemTime; pub mod best_rollback; pub mod drop; -pub mod dump_tx_root; pub mod get_changeset_by_order; -pub mod get_execution_info_by_order; +pub mod get_execution_info_by_hash; pub mod repair; pub mod revert; pub mod rollback; diff --git a/crates/rooch/src/commands/db/mod.rs b/crates/rooch/src/commands/db/mod.rs index efd6c68251..1052301e48 100644 --- a/crates/rooch/src/commands/db/mod.rs +++ b/crates/rooch/src/commands/db/mod.rs @@ -4,9 +4,8 @@ use crate::cli_types::CommandAction; use crate::commands::db::commands::best_rollback::BestRollbackCommand; use crate::commands::db::commands::drop::DropCommand; -use crate::commands::db::commands::dump_tx_root::DumpTxRootCommand; use crate::commands::db::commands::get_changeset_by_order::GetChangesetByOrderCommand; -use crate::commands::db::commands::get_execution_info_by_order::GetExecutionInfoByOrderCommand; +use crate::commands::db::commands::get_execution_info_by_hash::GetExecutionInfoByHashCommand; use crate::commands::db::commands::repair::RepairCommand; use crate::commands::db::commands::revert::RevertCommand; use async_trait::async_trait; @@ -44,11 +43,8 @@ impl CommandAction for DB { serde_json::to_string_pretty(&resp).expect("Failed to serialize response") }) } - DBCommand::DumpTxRoot(dump_tx_root) => dump_tx_root.execute().await.map(|resp| { - serde_json::to_string_pretty(&resp).expect("Failed to serialize response") - }), - DBCommand::GetExecutionInfoByOrder(get_execution_info_by_order) => { - get_execution_info_by_order.execute().map(|resp| { + DBCommand::GetExecutionInfoByHash(get_execution_info_by_hash) => { + get_execution_info_by_hash.execute().map(|resp| { serde_json::to_string_pretty(&resp).expect("Failed to serialize response") }) } @@ -67,7 +63,6 @@ pub enum DBCommand { Drop(DropCommand), Repair(RepairCommand), GetChangesetByOrder(GetChangesetByOrderCommand), - DumpTxRoot(DumpTxRootCommand), - GetExecutionInfoByOrder(GetExecutionInfoByOrderCommand), + GetExecutionInfoByHash(GetExecutionInfoByHashCommand), BestRollback(BestRollbackCommand), }