diff --git a/Cargo.lock b/Cargo.lock index 6211f37ef07..9f78990128c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,6 +535,21 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + [[package]] name = "async-io" version = "2.4.0" @@ -613,6 +628,55 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-std" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io", + "async-lock", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers 0.3.0", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "async-task" version = "4.7.1" @@ -694,6 +758,53 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.2", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -1895,6 +2006,34 @@ dependencies = [ "utilities", ] +[[package]] +name = "chainflip-state-observer" +version = "0.1.0" +dependencies = [ + "bitvec", + "chainflip-engine", + "custom-rpc", + "futures", + "futures-core", + "futures-util", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-stdout", + "opentelemetry_sdk", + "pallet-cf-elections", + "pallet-cf-validator", + "parity-scale-codec", + "serde", + "state-chain-runtime", + "static_str_ops", + "tokio", + "tracing", + "tracing-core", + "tracing-opentelemetry", + "tracing-subscriber 0.3.19", + "utilities", +] + [[package]] name = "chrono" version = "0.4.39" @@ -2789,7 +2928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1145d32e826a7748b69ee8fc62d3e6355ff7f1051df53141e7048162fc90481b" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.96", ] [[package]] @@ -3445,7 +3584,7 @@ dependencies = [ "sha2 0.10.8", "sha3", "thiserror 1.0.69", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -4460,7 +4599,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ - "gloo-timers", + "gloo-timers 0.2.6", "send_wrapper 0.4.0", ] @@ -4541,6 +4680,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "gensym" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913dce4c5f06c2ea40fc178c06f777ac89fc6b1383e90c254fafb1abe4ba3c82" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", + "uuid 1.13.1", +] + [[package]] name = "gethostname" version = "0.2.3" @@ -4573,6 +4724,18 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "getrandom" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.13.3+wasi-0.2.2", + "windows-targets 0.52.6", +] + [[package]] name = "getrandom_or_panic" version = "0.0.3" @@ -4659,6 +4822,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "gloo-utils" version = "0.2.0" @@ -5150,6 +5325,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.5.2", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -5856,7 +6044,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -5881,7 +6069,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -5923,7 +6111,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tracing", ] @@ -6021,6 +6209,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c33070833c9ee02266356de0c43f723152bd38bd96ddf52c82b3af10c9138b28" +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "kvdb" version = "0.13.0" @@ -6778,6 +6975,9 @@ name = "log" version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -6903,6 +7103,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matrixmultiply" version = "0.3.9" @@ -7820,12 +8026,105 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 1.0.69", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" +dependencies = [ + "async-trait", + "futures-core", + "http 1.2.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.13.4", + "thiserror 1.0.69", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.13.4", + "tonic", +] + +[[package]] +name = "opentelemetry-stdout" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc8a298402aa5c260be90d10dc54b5a7d4e1025c354848f8e2c976d761351049" +dependencies = [ + "async-trait", + "chrono", + "futures-util", + "opentelemetry", + "opentelemetry_sdk", + "ordered-float", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +dependencies = [ + "async-std", + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -9259,6 +9558,16 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" +dependencies = [ + "bytes", + "prost-derive 0.13.4", +] + [[package]] name = "prost-build" version = "0.11.9" @@ -9328,6 +9637,19 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "prost-derive" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -9865,7 +10187,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration 0.5.1", "tokio", "tokio-native-tls", @@ -11049,7 +11371,7 @@ dependencies = [ "serde_json", "substrate-prometheus-endpoint", "tokio", - "tower", + "tower 0.4.13", "tower-http", ] @@ -13377,6 +13699,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "static_str_ops" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eef2389280bc4d03836302f127e5e784c63cd183e24fe35cf73dc7f3b13a47d4" +dependencies = [ + "gensym", + "lazy_static", +] + [[package]] name = "str0m" version = "0.5.1" @@ -13752,6 +14084,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.12.6" @@ -14211,6 +14549,36 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.7", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.2", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.4", + "socket2 0.5.8", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -14219,13 +14587,32 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.5.2" @@ -14321,6 +14708,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log 0.2.0", + "tracing-subscriber 0.3.19", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -14813,12 +15218,27 @@ dependencies = [ "serde", ] +[[package]] +name = "uuid" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced87ca4be083373936a67f8de945faa23b6b42384bd5b64434850802c6dccd0" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "valuable" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "value-bag" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" + [[package]] name = "vcpkg" version = "0.2.15" @@ -14930,6 +15350,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -15320,6 +15749,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web3" version = "0.19.0" @@ -15738,6 +16177,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 8c0f9f18e6f..bf8398131ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ members = [ "api/bin/chainflip-broker-api", "api/bin/chainflip-cli", "api/bin/chainflip-ingress-egress-tracker", - "api/bin/chainflip-lp-api", + "api/bin/chainflip-lp-api", "api/bin/chainflip-state-observer", "api/lib", "engine", "engine-proc-macros", diff --git a/api/bin/chainflip-state-observer/Cargo.toml b/api/bin/chainflip-state-observer/Cargo.toml new file mode 100644 index 00000000000..17b880dec1e --- /dev/null +++ b/api/bin/chainflip-state-observer/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "chainflip-state-observer" +version = "0.1.0" +edition = "2024" + +[dependencies] +# chainflip-state-observer-framework = {git = "https://github.com/chainflip-io/chainflip-state-observer-framework.git", branch = "feat/PRO-2014/create-observable-trait"} +tracing = "0.1.41" +tracing-core = "0.1.33" +tracing-opentelemetry = "0.28.0" +tracing-subscriber = "0.3.19" +opentelemetry = "0.27.1" +opentelemetry_sdk = { version = "0.27.1", features = ["async-std", "rt-tokio", "trace"] } +opentelemetry-otlp = { version = "0.27.0", features = ["grpc-tonic"] } +opentelemetry-stdout = "0.27.0" +tokio = { version = "*", features = ["rt-multi-thread"]} +static_str_ops = "0.1.2" + +# workspaced deps +bitvec = { workspace = true, default-features = false } +codec = { workspace = true, default-features = false } +futures-util = { workspace = true } +futures = { workspace = true } +futures-core = { workspace = true } +serde = { workspace = true, features = ["derive"] } + +# local dependencies +pallet-cf-elections = { workspace = true, default-features = true } +pallet-cf-validator = { workspace = true, default-features = true } +chainflip-engine = { workspace = true } +custom-rpc = { workspace = true } +state-chain-runtime = { workspace = true, default-features = true } +cf-utilities = { workspace = true, default-features = true } + +[lints] +workspace = true diff --git a/api/bin/chainflip-state-observer/src/elections.rs b/api/bin/chainflip-state-observer/src/elections.rs new file mode 100644 index 00000000000..92184f783b2 --- /dev/null +++ b/api/bin/chainflip-state-observer/src/elections.rs @@ -0,0 +1,197 @@ + +use std::{collections::BTreeMap, fmt::{format, Display}, hash::{DefaultHasher, Hash, Hasher}}; + +use crate::{trace::Trace, ElectionData}; +use codec::{Decode, Encode}; +use pallet_cf_elections::{bitmap_components::ElectionBitmapComponents, electoral_system::BitmapComponentOf, vote_storage::VoteStorage, ElectionIdentifierOf, ElectoralSystemTypes, IndividualComponentOf, SharedDataHash, UniqueMonotonicIdentifier}; +use bitvec::prelude::*; + + +// NOTE! the order is important for ordering the traces! +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub enum Category { + Properties, + NoVote, + Vote(String), +} +use self::Category::*; + +impl Display for Category { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NoVote => write!(f, "Computing vote"), + Vote(s) => write!(f, "Election unchanged: {s}"), + Properties => write!(f, "New properties"), + // IndividualVote(s) => write!(f, "Individual: {s}"), + // PartialVote(s) => write!(f, "Partial: {s}"), + } + } +} + + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub enum Key { + RootBlockHeight(u32), + ElectoralSystem(String), + Election(String), + Category(String, Category), + Validator(u32), + State{summary: String}, +} + +impl Display for Key { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RootBlockHeight(h) => write!(f, "blocks {h}-..."), + Election(e) => write!(f, "{e}"), + Key::Category(extra, category) => write!(f, "[{extra}] {category}"), + Validator(x) => write!(f, "Validator {x}"), + ElectoralSystem(name) => write!(f, "ES {name}"), + State { summary } => write!(f, "{summary}"), + } + } +} + +use Key::*; + +pub fn cloned_vec<'a, XS: IntoIterator, X>(xs: XS) -> Vec +where X : Clone + 'a +{ + xs.into_iter().cloned().collect() +} + +/// Initial value from which the trace state will be created +#[derive(Clone)] +pub struct TraceInit { + pub end_immediately: bool, + pub attributes: Vec<(String, String)> +} + +impl TraceInit { + pub fn with_attribute(&self, key: String, value: String) -> Self { + let mut result = self.clone(); + result.attributes.push((key, value)); + result + } +} + + +struct AsHex(Vec); + +impl Display for AsHex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for b in &self.0 { + write!(f, "{b:x}")?; + } + Ok(()) + } +} + +struct ElectoralSystemStats { + ongoing_count: u32 +} + +pub fn add_ES_stats(mut trace: TraceInit, stats: &BTreeMap) -> TraceInit { + for (name, stat) in stats { + trace = trace.with_attribute(format!("z_count_{name}"), format!("{}", stat.ongoing_count)); + } + trace +} + +pub fn make_traces(data: ElectionData) -> Trace +where IndividualComponentOf: Encode +{ + + let mut votes: BTreeMap<(ElectionIdentifierOf, u32), (String, String)> = BTreeMap::new(); + + for (identifier, (name, properties)) in &data.elections { + + if let Some(bitmaps) = data.bitmaps.get(identifier.unique_monotonic()) { + + for (component, bitmap) in bitmaps { + for (id, bit) in bitmap.iter().enumerate() { + let key3 = Validator(id as u32); + if *bit { + votes.insert((*identifier, id as u32), (AsHex(component.encode()).to_string(), format!("{component:?}"))); + } + } + } + } + + if let Some(individual_components) = data.individual_components.get(identifier.unique_monotonic()) { + for (authority_index, component) in individual_components { + votes.insert((*identifier, *authority_index as u32), (AsHex(component.encode()).to_string(), format!("{component:?}"))); + } + } + } + + let end = TraceInit { + end_immediately: true, + attributes: Vec::new() + }; + let start = TraceInit { + end_immediately: false, + attributes: Vec::new() + }; + + let mut trace = Trace::new(); + + let root_height = data.height - (data.height % 50); + let key0 = RootBlockHeight(root_height); + trace.insert(vec![key0.clone()], end.with_attribute("height".into(), format!("{root_height}"))); + + let mut electoral_system_stats : BTreeMap = BTreeMap::new(); + + for name in data.electoral_system_names { + trace.insert(vec![key0.clone(), ElectoralSystem(name.clone())], end.with_attribute("height".into(), format!("{root_height}"))); + + let current_elections_for_name : Vec<_> = data.elections.iter().filter(|(_, (cur_name, _))| *cur_name == name).collect(); + electoral_system_stats.insert(name, ElectoralSystemStats { ongoing_count: current_elections_for_name.len() as u32 }); + } + + for (identifier, (name, properties)) in &data.elections { + + let input = identifier.encode(); + let mut other: &[u8] = &input; + let id: u64 = Decode::decode(&mut other).unwrap(); + let extra = format!("{:?}", identifier.extra()); + + let key1 = ElectoralSystem(name.clone()); + let key2 = Election(format!("{name} ({id})")); + + // election id + trace.insert(cloned_vec([&key0, &key1, &key2]), end.clone()); + + // properties + let key3 = Category(extra.clone(), Properties); + trace.insert( + cloned_vec([&key0, &key1, &key2, &key3]), + { + let init = end.with_attribute("Properties".into(), format!("{properties:#?}")); + // add_ES_stats(init, &electoral_system_stats) + init + } + ); + + // no votes + for authority_id in 0..data.validators { + if votes.get(&(*identifier, authority_id)).is_none() { + trace.insert(cloned_vec([&key0, &key1, &key2, &Category(extra.clone(), NoVote)]), start.clone()); + trace.insert(cloned_vec([&key0, &key1, &key2, &Category(extra.clone(), NoVote), &Validator(authority_id)]), start.clone()); + } + } + + // votes + for authority_id in 0..data.validators { + if let Some(s) = votes.get(&(*identifier, authority_id)) { + trace.insert(cloned_vec([&key0, &key1, &key2, &Category(extra.clone(), Vote(s.0.clone()))]), start.with_attribute("vote".into(), s.1.clone())); + trace.insert(cloned_vec([&key0, &key1, &key2, &Category(extra.clone(), Vote(s.0.clone())), &Validator(authority_id)]), start.with_attribute("vote".into(), s.1.clone())); + } + } + + } + + trace + +} + diff --git a/api/bin/chainflip-state-observer/src/main.rs b/api/bin/chainflip-state-observer/src/main.rs new file mode 100644 index 00000000000..ae611048917 --- /dev/null +++ b/api/bin/chainflip-state-observer/src/main.rs @@ -0,0 +1,367 @@ +#![feature(btree_extract_if)] + +pub mod elections; +pub mod trace; + +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::{Duration}; +// use std::task::ContextBuilder; + +use bitvec::vec::BitVec; +use chainflip_engine::state_chain_observer::client::chain_api::ChainApi; +use chainflip_engine::witness::dot::polkadot::runtime_apis::metadata::Metadata; +use chainflip_engine::witness::dot::polkadot::storage; +use codec::{Decode, Encode}; +use chainflip_engine::state_chain_observer::client::{ + base_rpc_api::RawRpcApi, extrinsic_api::signed::SignedExtrinsicApi, BlockInfo, + StateChainClient, +}; +// use opentelemetry::global::ObjectSafeSpan; +use opentelemetry::trace::{mark_span_as_active, Span, TraceContextExt as _, Tracer, TracerProvider as _}; +use chainflip_engine::state_chain_observer::client::base_rpc_api::BaseRpcClient; +use custom_rpc::CustomApiClient; +use elections::{make_traces, Key, TraceInit}; +use pallet_cf_validator::AuthorityIndex; +use state_chain_runtime::chainflip::solana_elections::SolanaElectoralSystemRunner; +use tokio::time::sleep; +use tracing::instrument::WithSubscriber; +use tracing::{event, span, Instrument, Level}; +use tracing_core::Callsite; +use tracing_opentelemetry::PreSampledTracer; +use tracing_subscriber::layer::SubscriberExt as _; +use tracing_subscriber::Registry; +// use tracing_subscriber::layer::{Context, SubscriberExt}; +use opentelemetry::{global, Context, KeyValue}; +use opentelemetry_sdk::trace::{RandomIdGenerator, TracerProvider}; +use opentelemetry_sdk::Resource; +use pallet_cf_elections::electoral_system::{BitmapComponentOf}; +use pallet_cf_elections::{ElectionDataFor, ElectionIdentifierOf, ElectionProperties, ElectoralSystemTypes, IndividualComponentOf, UniqueMonotonicIdentifier}; +use state_chain_runtime::{Runtime, SolanaInstance}; +use cf_utilities::task_scope::{self, Scope}; +use futures_util::FutureExt; +use chainflip_engine::state_chain_observer::client::storage_api::StorageApi; +use futures::{stream, StreamExt, TryStreamExt}; +use pallet_cf_elections::{ + electoral_systems::composite::tuple_6_impls::*, +}; +use trace::{diff, get_key_name, map_with_parent, NodeDiff, Trace}; +use std::env; + + +#[derive(Debug, Eq, PartialEq, Clone, Encode, Decode)] +pub struct ElectionData { + pub height: u32, + + pub bitmaps: BTreeMap< + UniqueMonotonicIdentifier, + Vec<(BitmapComponentOf, BitVec)> + >, + + pub individual_components: BTreeMap< + UniqueMonotonicIdentifier, + BTreeMap> + >, + + pub elections: BTreeMap, (String, ES::ElectionProperties)>, + + pub electoral_system_names: Vec, + + pub validators: u32, + + pub _phantom: std::marker::PhantomData +} + + +#[tokio::main(flavor = "multi_thread", worker_threads = 3)] +async fn main() { + println!("Hello, world!"); + + let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder() + .with_batch_exporter( + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build() + .unwrap(), + opentelemetry_sdk::runtime::Tokio, + ) + // .with_sampler(Sampler::AlwaysOn) + .with_id_generator(RandomIdGenerator::default()) + // .with_max_events_per_span(64) + // .with_max_attributes_per_span(16) + // .with_max_events_per_span(16) + .with_resource(Resource::new(vec![KeyValue::new("service.name", "es-overview")])) + .build(); + + global::set_tracer_provider(tracer_provider.clone()); + let tracer = tracer_provider.tracer("tracer-name-new"); + + // Create a tracing layer with the configured tracer + // let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + // Use the tracing subscriber `Registry`, or any other subscriber that impls `LookupSpan` + // let subscriber = Registry::default().with(telemetry); + + // let _guard = tracing::subscriber::set_default(subscriber); + + // event!(Level::INFO, "in hello!"); + + // new_watch(tracer).await; + + + { + + let ctx = Context::new().with_value(KeyValue::new("key", "value")); + // let _guard = ctx.attach(); + + let builder = tracer.span_builder("test_proc") + .with_start_time(std::time::SystemTime::now()) + .with_span_id(tracer.new_span_id()); + + let span = builder.start_with_context(&tracer, &ctx); + + // .start_with_context("test_proc", &ctx); + let ctx1 = ctx.with_span(span); + + sleep(Duration::from_secs(1)).await; + + // mark_span_as_active(ctx1.span()); + + ctx1.span().end(); + + let _results = tracer_provider.force_flush(); + + + + // let span2 = tracer.start_with_context("test_proc_child", &ctx1); + // let ctx2 = ctx1.with_span(span2); + + // sleep(Duration::from_secs(1)).await; + + // let span3 = tracer.start_with_context("test_proc_child2", &ctx2); + // let ctx3 = ctx1.with_span(span3); + + // ctx3.span().add_event("starting??", Vec::new()); + + // sleep(Duration::from_secs(1)).await; + // ctx3.span().end(); + + + // ctx1.span().end(); + // let x = ctx2.span().with_current_subscriber(); + // x.inner().set_attributes([KeyValue::new("mykey", "myvalue")]); + + // sleep(Duration::from_secs(1)).await; + + // ctx2.span().end(); + // sleep(Duration::from_secs(1)).await; + + } + + // let results = tracer_provider.force_flush(); + // for result in results { + // println!("result: {result:?}"); + // } + + // let result = tracer_provider.shutdown(); + // println!("{result:?}"); + + new_watch(tracer, tracer_provider).await; +} + + +fn push_traces(tracer: &T, current: Trace, new: Trace) -> Trace + where T::Span : Span + Send + Sync + 'static +{ + let δ = diff(current, new); + let traces = map_with_parent(δ, |k, p: Option<&Option>, d: NodeDiff| match d { + trace::NodeDiff::Left(context) => { + println!("closing trace {k:?}"); + context.span().end(); + None + }, + trace::NodeDiff::Right(TraceInit { end_immediately, attributes: values }) => { + let context = + if let Some(Some(context)) = p { + + + let key = get_key_name(k); + + let mut span = tracer.start_with_context(key, &context); + for (key, value) in values { + span.set_attribute(KeyValue::new(key, value)); + } + let context = context.with_span(span); + + + println!("open trace {k:?}"); + + context + + } else { + + let key = get_key_name(k); + + let context = Context::new().with_value(KeyValue::new("key", format!("{key:?}"))); + let mut span = tracer.start_with_context(key, &context); + for (key, value) in values { + span.set_attribute(KeyValue::new(key, value)); + } + let context = context.with_span(span); + println!("open trace {k:?} [NO PARENT]"); + context + }; + if end_immediately { + context.span().end(); + } + Some(context) + }, + trace::NodeDiff::Both(x, _) => { + Some(x) + }, + } + ) + .into_iter().filter_map(|(k, v)| match v {Some(v) => Some((k,v)), None => None}).collect(); + traces +} + +async fn new_watch(tracer: T, tracer_provider: TracerProvider) + where T::Span : Span + Send + Sync + 'static + +{ + + + task_scope::task_scope_local(|scope| async move { + + let rpc_url = env::var("CF_RPC_NODE").expect("CF_RPC_NODE required"); + + let (finalized_stream, unfinalized_stream, client) = StateChainClient::connect_without_account(&scope, &rpc_url).await.unwrap(); + + unfinalized_stream.fold((client, (BTreeMap::new(), BTreeMap::new()), tracer), async |(client, (overview_trace, detailed_traces), tracer), block| { + + let _results = tracer_provider.force_flush(); + + // let block_hash = client.latest_finalized_block().hash; + let block_hash = block.hash; + + let bitmaps : BTreeMap = client + .storage_map::, BTreeMap<_,_>>(block_hash) + .await + .expect("could not get storage") + ; + + let all_properties : BTreeMap<_,_> = client + .storage_map::, BTreeMap<_,_>>(block_hash) + .await + .expect("could not get storage"); + + let validators : Vec<_> = client + .storage_value::>(block_hash) + .await + .expect("could not get storage"); + + let mut individual_components = BTreeMap::new(); + for (key, prop) in &all_properties { + for (validator_index, validator) in validators.iter().enumerate() { + + if let Some((_, comp)) = client.storage_double_map_entry::>(block_hash, key.unique_monotonic(), validator) + .await.expect("could not get storage") { + println!("got individual component for election {key:?} for vld {validator_index}: {comp:?}"); + individual_components.entry(*key.unique_monotonic()).or_insert(BTreeMap::new()).insert(validator_index, comp); + // individual_components.insert(key.unique_monotonic(), comp); + } + } + } + + let bitmaps = bitmaps.into_iter() + .map(|(k,v)| (k, v.bitmaps)) + .collect(); + + let elections = all_properties.iter() + .map(|(key, val)| { + let name = match val { + CompositeElectionProperties::A(_) => "Blockheight", + CompositeElectionProperties::B(_) => "Ingress", + CompositeElectionProperties::C(_) => "Nonce", + CompositeElectionProperties::D(_) => "Egress", + CompositeElectionProperties::EE(_) => "Liveness", + CompositeElectionProperties::FF(_) => "Vaultswap", + }; + (key.clone(), (name.into(), val.clone())) + }) + .collect(); + + let result : ElectionData = ElectionData { + height: block.number, + bitmaps, + elections, + individual_components, + validators: validators.len() as u32, + _phantom: Default::default(), + electoral_system_names: vec![ + "Blockheight".into(), + "Ingress".into(), + "Nonce".into(), + "Egress".into(), + "Liveness".into(), + "Vaultswap".into(), + ], + }; + + let new_full_trace = make_traces(result); + let new_overview_trace = new_full_trace.iter().map(|(k,v)| (k.clone(), v.clone())).filter(|(key, _)| key.len() <= 4).collect::>(); + let new_detailed_traces = new_full_trace.iter().map(|(k,v)| (k.clone(), v.clone())).filter(|(key, _)| key.len() >= 2).collect::>(); + + let overview_trace = push_traces(&tracer, overview_trace, new_overview_trace); + let detailed_traces = push_traces(&tracer, detailed_traces, new_detailed_traces); + + + // δ.into_iter().filter_map(|(k,d)| match d ).collect(); + + // let all_properties : BTreeMap<_,_> = client + // .storage_map::, BTreeMap<_,_>>(block_hash) + // .await + // .expect("could not get storage"); + + // let delta_properties : Vec<_> = + // all_properties.iter().map(|(_, value)| match value { + // pallet_cf_elections::electoral_systems::composite::tuple_6_impls::CompositeElectionProperties::C(props) => Some(props), + // _ => None + // }) + // .collect(); + + // let all_state_map : BTreeMap<_,_> = client + // .storage_map::, BTreeMap<_,_>>(block_hash) + // .await + // .expect("could not get storage"); + + // let delta_state : BTreeMap<_,_> = + // all_state_map.iter().filter_map(|(key, value)| match (key,value) { + // (CompositeElectoralUnsynchronisedStateMapKey::C(key), CompositeElectoralUnsynchronisedStateMapValue::C(val)) + // => Some((key,val)), + // _ => None + // }) + // .collect(); + + // let block_height_state = client + // .storage_value::>(block_hash) + // .await + // .expect("could not get storage") + // .map(|(value, ..)| value) + // .expect("could not get block height"); + + (client, (overview_trace, detailed_traces), tracer) + + }).await; + + Ok(()) + + }.boxed_local()).await.unwrap() +} + + + + + diff --git a/api/bin/chainflip-state-observer/src/trace.rs b/api/bin/chainflip-state-observer/src/trace.rs new file mode 100644 index 00000000000..efea14fe408 --- /dev/null +++ b/api/bin/chainflip-state-observer/src/trace.rs @@ -0,0 +1,134 @@ + +use std::{collections::BTreeMap, os::unix::process}; + +// #[derive(Debug)] +// pub enum Trace { +// Composite(V, BTreeMap>), +// // Single(V) +// } +// use Trace::*; + +pub type Trace = BTreeMap,V>; + +pub enum NodeDiff { + Left(V), + Right(W), + Both(V,W) +} + +impl NodeDiff { + + pub fn get_left(&self) -> Option<&W> { + match self { + Left(_) => None, + Right(a) => Some(a), + Both(_, a) => Some(a), + } + } + + pub fn get_right(&self) -> Option<&W> { + match self { + Left(_) => None, + Right(a) => Some(a), + Both(_, a) => Some(a), + } + } +} + +use static_str_ops::staticize; +use NodeDiff::*; + +pub fn diff(a: Trace, b: Trace) -> Trace> { + zip_with(a, b, |v,w| match (v,w) { + (None, None) => None, + (None, Some(w)) => Some(Right(w)), + (Some(v), None) => Some(Left(v)), + (Some(v), Some(w)) => Some(Both(v,w)), + }) +} +pub fn fmap(this: BTreeMap, f: &impl Fn(V) -> W) -> BTreeMap { + this.into_iter().map(|(k,v)| (k, f(v))).collect() +} + +// TODO! This has currently a hardcoded 10! +pub fn map_with_parent(mut this: Trace, f: impl Fn(&Vec, Option<&W>, V) -> W) -> Trace { + let mut processed = BTreeMap::new(); + for length in (0..10) { + for (key, value) in this.extract_if(|k,_| k.len() == length) { + let p; + if key.len() > 0 { + let parent_key = &key[0..key.len() - 1]; + p = processed.get(parent_key); + } else { + p = None; + } + let v = f(&key, p, value); + processed.insert(key, v); + } + } + processed +} + + +pub fn get_key_name(key: &Vec) -> &'static str{ + let name = key.last().map(|x| format!("{x}")).unwrap_or("root".into()); + staticize(&name) +} + + + +// impl Trace { +// pub fn fmap(self, f: &impl Fn(V) -> W) -> Trace { +// match self { +// Composite(x, xs) => Composite(f(x), xs.into_iter().map(|(k,x)| (k, x.fmap(f))).collect()), +// } +// } + +// pub fn filter_some(this: Trace>) -> Self { +// match this { +// Composite(None, btree_map) => Composite(None, BTreeMap::new()), +// Composite(Some(a), btree_map) => , +// } +// } + +// } + +// pub fn diff(a: Option>, b: Option>) -> Option>> { +// match (a, b) { +// (None, None) => None, +// (None, Some(Composite(y, ys))) => Some(Composite(Right(y), ys.into_iter().map(|(k,v)| (k, v.fmap(&Right))).collect())), +// (Some(Composite(x, xs)), None) => Some(Composite(Left(x), xs.into_iter().map(|(k,v)| (k, v.fmap(&Left))).collect())), +// (Some(Composite(x, xs)), Some(Composite(y,ys))) => Some(Composite(Both(x,y), { +// zip_with(xs, ys, |v,w| match (v,w) { +// (None, None) => None, +// (None, Some(w)) => Some(w.fmap(&Right)), +// (Some(v), None) => Some(v.fmap(&Left)), +// (Some(v), Some(w)) => diff(Some(v),Some(w)), +// }) +// })), +// } +// } + +// pub struct TraceFn { +// create: Box) -> W>, +// update: Box W>, +// destroy: Box, +// } + + +// ------ helpers ------- + +fn zip_with(x: BTreeMap, mut y: BTreeMap, f: impl Fn(Option, Option) -> Option) -> BTreeMap { + let mut result = BTreeMap::new(); + for (k, v) in x.into_iter() { + if let Some(x) = f(Some(v), y.remove(&k)) { + result.insert(k, x); + } + } + for (k,w) in y.into_iter() { + if let Some(x) = f(None, Some(w)) { + result.insert(k, x); + } + } + result +} diff --git a/engine/src/state_chain_observer/client.rs b/engine/src/state_chain_observer/client.rs index 4bcdeddfdee..357be128a9b 100644 --- a/engine/src/state_chain_observer/client.rs +++ b/engine/src/state_chain_observer/client.rs @@ -281,13 +281,13 @@ impl block_stream, - _ => { - return Err(CreateStateChainClientError::CompatibilityError( - block_compatibility, - ) - .into()); - }, + _ => block_stream, + // _ => { + // return Err(CreateStateChainClientError::CompatibilityError( + // block_compatibility, + // ) + // .into()); + // }, } }; @@ -296,10 +296,10 @@ impl(latest_block); @@ -320,23 +320,23 @@ impl { + _ => { latest_block = block; block_sender.send(block).await; let _result = latest_block_sender.send(block); }, - CfeCompatibility::NoLongerCompatible => { - if error_on_incompatible_block { - break Err(CreateStateChainClientError::CompatibilityError(block_compatibility).into()); - } else { - tracing::warn!("StateChain block number {} is no longer compatible.", block.number); - } - } - CfeCompatibility::NotYetCompatible => { - // We've either already returned a NotYetCompatible error, or we've waited until we're compatible. So we - // don't expect this case to happen. - break Err(CreateStateChainClientError::CompatibilityError(block_compatibility).into()); - }, + // CfeCompatibility::NoLongerCompatible => { + // if error_on_incompatible_block { + // break Err(CreateStateChainClientError::CompatibilityError(block_compatibility).into()); + // } else { + // tracing::warn!("StateChain block number {} is no longer compatible.", block.number); + // } + // } + // CfeCompatibility::NotYetCompatible => { + // // We've either already returned a NotYetCompatible error, or we've waited until we're compatible. So we + // // don't expect this case to happen. + // break Err(CreateStateChainClientError::CompatibilityError(block_compatibility).into()); + // }, } }, if let Some(block_stream_request) = block_stream_request_receiver.next() => { diff --git a/engine/src/state_chain_observer/client/storage_api.rs b/engine/src/state_chain_observer/client/storage_api.rs index 4fd4abd8c34..1159f7d6e62 100644 --- a/engine/src/state_chain_observer/client/storage_api.rs +++ b/engine/src/state_chain_observer/client/storage_api.rs @@ -180,6 +180,14 @@ pub trait StorageApi { .map(|(_k, v)| v) .collect()) } + + // async fn storage_double_map< + // StorageDoubleMap: StorageDoubleMapAssociatedTypes + 'static, + // ReturnedIter: FromIterator<((::Key1, ::Key2), StorageDoubleMap::Value)> + 'static, + // >( + // &self, + // block_hash: state_chain_runtime::Hash, + // ) -> RpcResult; } #[async_trait] @@ -276,6 +284,7 @@ impl Storag }) .collect()) } + } #[async_trait] @@ -352,6 +361,17 @@ impl< ) -> RpcResult { self.base_rpc_client.storage_map::(block_hash).await } + + // #[track_caller] + // async fn storage_double_map< + // StorageDoubleMap: StorageDoubleMapAssociatedTypes + 'static, + // ReturnedIter: FromIterator<((::Key1, ::Key2), StorageDoubleMap::Value)> + 'static, + // >( + // &self, + // block_hash: state_chain_runtime::Hash, + // ) -> RpcResult { + // self.base_rpc_client.storage_double_map::(block_hash).await + // } } #[derive(Debug)] diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 58ecf09b6c9..e13dde5d85c 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "nightly-2025-01-10" +channel = "nightly-2025-01-09" components = [ "cargo", "clippy", diff --git a/state-chain/custom-rpc/src/lib.rs b/state-chain/custom-rpc/src/lib.rs index a7c4d266711..d9c2f68cafe 100644 --- a/state-chain/custom-rpc/src/lib.rs +++ b/state-chain/custom-rpc/src/lib.rs @@ -970,6 +970,12 @@ pub trait CustomApi { at: Option, ) -> RpcResult>; + #[method(name = "solana_election_data")] + fn cf_solana_election_data( + &self, + at: Option, + ) -> RpcResult< Vec >; + #[method(name = "solana_filter_votes")] fn cf_solana_filter_votes( &self, @@ -1861,6 +1867,13 @@ where self.with_runtime_api(at, |api, hash| api.cf_electoral_data(hash, validator)) } + fn cf_solana_election_data( + &self, + at: Option, + ) -> RpcResult< Vec > { + self.with_runtime_api(at, |api, hash| api.cf_election_data(hash)) + } + fn cf_solana_filter_votes( &self, validator: state_chain_runtime::AccountId, diff --git a/state-chain/pallets/cf-elections/src/electoral_system.rs b/state-chain/pallets/cf-elections/src/electoral_system.rs index bb7b767bdb0..9aec6a7e1a8 100644 --- a/state-chain/pallets/cf-elections/src/electoral_system.rs +++ b/state-chain/pallets/cf-elections/src/electoral_system.rs @@ -36,6 +36,24 @@ impl ConsensusVotes { } } +use sp_std::collections::btree_map::BTreeMap; +use bitvec::prelude::BitVec; +use sp_core::Encode; +use sp_core::Decode; +use crate::UniqueMonotonicIdentifier; +#[derive(Debug, Eq, PartialEq, Clone, Encode, Decode)] +pub struct ElectionData { + // properties: ES::ElectionProperties, + // validators: Vec, + // shared_votes: BTreeMap::SharedData>, + pub bitmaps: BTreeMap< + UniqueMonotonicIdentifier, + Vec<(BitmapComponentOf, BitVec)> + >, + + pub _phantom: sp_std::marker::PhantomData +} + /// A trait for defining all relevant types of an electoral system. pub trait ElectoralSystemTypes: 'static + Sized { type ValidatorId: Parameter + Member + MaybeSerializeDeserialize; diff --git a/state-chain/pallets/cf-elections/src/lib.rs b/state-chain/pallets/cf-elections/src/lib.rs index 1e51050241a..f3248d75a24 100644 --- a/state-chain/pallets/cf-elections/src/lib.rs +++ b/state-chain/pallets/cf-elections/src/lib.rs @@ -195,6 +195,12 @@ pub mod pallet { BlockNumberFor, >; + use crate::electoral_system::ElectionData; + + /// TEMP. + #[allow(type_alias_bounds)] + pub type ElectionDataFor, I: 'static> = ElectionData; + /// A unique identifier for an election. #[derive( PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Encode, Decode, TypeInfo, Default, @@ -680,6 +686,16 @@ pub mod pallet { }); } ElectionConsensusHistoryUpToDate::::remove(unique_monotonic_identifier); + + let mut individual_components = + IndividualComponents::::iter_prefix(unique_monotonic_identifier) + .collect::>(); + + log::info!("mylog: clearing votes for {unique_monotonic_identifier:?}"); + for (validator, component) in &individual_components { + log::info!("mylog: - validator {validator:?} has {component:?}"); + } + } fn delete_election( composite_election_identifier: ElectionIdentifierOf, @@ -747,6 +763,11 @@ pub mod pallet { IndividualComponents::::iter_prefix(unique_monotonic_identifier) .collect::>(); + log::info!("mylog: checking consensus for {election_identifier:?}"); + for (validator, component) in &individual_components { + log::info!("mylog: - validator {validator:?} has {component:?}"); + } + let votes = current_authorities .into_iter() .map(|validator_id| { @@ -930,7 +951,7 @@ pub mod pallet { pub struct ElectionBitmapComponents, I: 'static> { epoch: EpochIndex, #[allow(clippy::type_complexity)] - bitmaps: + pub bitmaps: Vec<(BitmapComponentOf, BitVec)>, #[codec(skip)] _phantom: core::marker::PhantomData<(T, I)>, @@ -1229,6 +1250,18 @@ pub mod pallet { ); for (election_identifier, authority_vote) in authority_votes { + + // - check individual components + let mut individual_components = + IndividualComponents::::iter_prefix(*election_identifier.unique_monotonic()) + .collect::>(); + + log::info!("mylog: voting for {election_identifier:?}"); + for (validator, component) in &individual_components { + log::info!("mylog: - validator {validator:?} has {component:?}"); + } + + // if an identifier refers to a non existent election, skip this vote, // but continue processing others. let unique_monotonic_identifier = if let Ok(unique_monotonic_identifier) = diff --git a/state-chain/runtime/src/lib.rs b/state-chain/runtime/src/lib.rs index 5f10989cbd4..9d663709acc 100644 --- a/state-chain/runtime/src/lib.rs +++ b/state-chain/runtime/src/lib.rs @@ -1439,6 +1439,9 @@ mod benches { ); } +use pallet_cf_elections::ElectionDataFor; +use pallet_cf_elections::electoral_system::ElectionData; + impl_runtime_apis! { impl runtime_apis::ElectoralRuntimeApi for Runtime { fn cf_electoral_data(account_id: AccountId) -> Vec { @@ -1452,6 +1455,20 @@ impl_runtime_apis! { // START custom runtime APIs impl runtime_apis::CustomRuntimeApi for Runtime { + + fn cf_election_data() -> Vec { + let bitmaps = pallet_cf_elections::BitmapComponents::::iter() + .map(|(k,v)| (k, v.bitmaps)) + .collect(); + + let result : ElectionDataFor = ElectionData { + bitmaps, + _phantom: Default::default() + }; + + result.encode() + } + fn cf_is_auction_phase() -> bool { Validator::is_auction_phase() } diff --git a/state-chain/runtime/src/runtime_apis.rs b/state-chain/runtime/src/runtime_apis.rs index dc0817d5b87..1c4f29baea2 100644 --- a/state-chain/runtime/src/runtime_apis.rs +++ b/state-chain/runtime/src/runtime_apis.rs @@ -467,6 +467,9 @@ decl_runtime_apis!( broker: AccountId32, affiliate: Option, ) -> Vec<(AccountId32, AffiliateDetails)>; + + fn cf_election_data() -> Vec; + // ElectionDataFor; } ); diff --git a/utilities/src/with_std/task_scope.rs b/utilities/src/with_std/task_scope.rs index dcd80bcacd5..e341c5dc1e6 100644 --- a/utilities/src/with_std/task_scope.rs +++ b/utilities/src/with_std/task_scope.rs @@ -293,6 +293,120 @@ pub fn task_scope< } } + +#[track_caller] +pub fn task_scope_local< + 'a, + T, + Error: Debug + Send + 'static, + C: for<'b> FnOnce(&'b Scope<'a, Error>) -> futures::future::LocalBoxFuture<'b, Result>, +>( + top_level_task: C, +) -> impl Future> { + let location = core::panic::Location::caller(); + + async move { + tracing::info!( + target: "task_scope", + "scope opened: '{location}'", + ); + let guard = scopeguard::guard((), move |_| { + if std::thread::panicking() { + tracing::error!( + target: "task_scope", + "scope closed by panic: '{location}'" + ); + } else { + tracing::error!( + target: "task_scope", + "scope closed by cancellation: '{location}'" + ); + } + }); + + let (scope, mut task_result_stream) = Scope::new(); + + // try_join ensures if the top level task returns an error we immediately drop + // `task_result_stream`, which in turn cancels all the tasks + let result = tokio::try_join!( + async move { + while let Some(task_result) = task_result_stream.next().await { + match task_result { + Err(error) => { + // Note we drop the task_result_stream on unwind causing all tasks to + // be cancelled/aborted + if let Ok(panic) = error.try_into_panic() { + std::panic::resume_unwind(panic); + } /* else: Can only occur if tokio's runtime is dropped during task + * scope's lifetime, in this case we are about to be cancelled + * ourselves */ + }, + Ok(future_result) => future_result?, + } + } + // task_result_stream has ended meaning scope has been dropped and all tasks + // (excluding the top-level task) have finished running + Ok(()) + }, + // This async move scope ensures scope is dropped when top_level_task and its returned + // future finish (Instead of when this function exits) + async move { + tracing::info!( + target: "task_scope", + "parent task started '{location}'" + ); + let guard = scopeguard::guard((), move |_| { + if std::thread::panicking() { + tracing::error!( + target: "task_scope", + "parent task ended by panic: '{location}'" + ); + } else { + tracing::error!( + target: "task_scope", + "parent task ended by cancellation: '{location}'" + ); + } + }); + let result = top_level_task(&scope).await; + scopeguard::ScopeGuard::into_inner(guard); + match &result { + Ok(_) => tracing::info!( + target: "task_scope", + "parent task ended: '{location}'" + ), + Err(error) => tracing::error!( + target: "task_scope", + "parent task ended by error '{error:?}': '{location}'" + ), + } + result + } + ); + + scopeguard::ScopeGuard::into_inner(guard); + + match result { + Ok((_, t)) => { + tracing::info!( + target: "task_scope", + "scope closed: '{location}'" + ); + Ok(t) + }, + Err(error) => { + tracing::info!( + target: "task_scope", + "scope closed by error: {error:?} '{location}'" + ); + Err(error) + }, + } + } +} + + + type TaskFuture = Pin> + Send>>; #[derive(Clone, Copy)]