feat(infra): concurrent materializer tests #1243
base: main
Conversation
```rust
}

pub async fn record(&mut self, label: String) {
    let duration = self.start_time.unwrap().elapsed();
```
I feel `expect` is better here, if you assume the caller knows that `start` must be called first.
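A minimal sketch of the suggestion, with a hypothetical `Bencher` type standing in for the one in the diff (the `start`/`record` names come from the thread; everything else, including making `record` synchronous and returning the duration, is assumed for illustration):

```rust
use std::time::{Duration, Instant};

struct Bencher {
    start_time: Option<Instant>,
}

impl Bencher {
    fn start(&mut self) {
        self.start_time = Some(Instant::now());
    }

    // `expect` makes the contract explicit: `start` must be called first,
    // and the panic message tells the caller exactly what went wrong.
    fn record(&mut self, label: String) -> Duration {
        let duration = self
            .start_time
            .expect("`start` must be called before `record`")
            .elapsed();
        println!("{label}: {duration:?}");
        duration
    }
}
```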
I'll revise this API once the reporting summary is more solid.
```rust
#[derive(Default)]
pub struct NonceManager {
    nonces: Arc<Mutex<HashMap<H160, U256>>>,
```
I think this is a bottleneck as well: every address is waiting on the same lock. Maybe this might help: https://github.com/xacrimon/dashmap
Yes, this is just a temporary solution, I was hoping to remove it entirely. If not, I'll optimize it.
I think it would be good to try the built-in `NonceManager` from Ethers. Was there any problem with that?
Please do not use dashmap. Last time I checked, ~6 months ago, it still had soundness issues for async code.
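If the shared map does turn out to be a contention point, one dashmap-free option is a per-address lock: the outer map lock is held only long enough to fetch or insert the entry, so different senders no longer serialize on one global mutex. A std-only sketch, with string keys and `u64` standing in for `H160`/`U256` (ethers-rs also ships a `NonceManagerMiddleware`, as suggested above):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct NonceManager {
    // One inner mutex per address; the outer lock is held only briefly.
    nonces: Mutex<HashMap<String, Arc<Mutex<u64>>>>,
}

impl NonceManager {
    fn next_nonce(&self, addr: &str) -> u64 {
        // Grab (or create) this address's entry, then release the map lock.
        let entry = {
            let mut map = self.nonces.lock().unwrap();
            Arc::clone(map.entry(addr.to_string()).or_default())
        };
        // Contention is now per-address only.
        let mut nonce = entry.lock().unwrap();
        let current = *nonce;
        *nonce += 1;
        current
    }
}
```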
I have a suggestion about how to improve the framework design to make it cleaner and more intuitive. While the current implementation works, there are some areas where terminology and structure can be refined to improve clarity and usability. Consider the following approach:
**BenchmarkRunner**: orchestrates the entire benchmarking process, executing each `BenchmarkStep` sequentially within the specified duration limit.

```rust
struct BenchmarkRunner {
    steps: Vec<BenchmarkStep>,
    max_duration: Duration,
}

impl BenchmarkRunner {
    fn new(steps: Vec<BenchmarkStep>, max_duration: Duration) -> Self;
    fn run(&self) -> BenchmarkResult;
}
```
**BenchmarkStep**: a single step of the benchmark, holding its own test function.

```rust
struct BenchmarkStep<F>
where
    F: Fn(TestInput) -> TestResult + Send + Sync + 'static,
{
    concurrency: usize,     // number of concurrent test executions (N)
    run_duration: Duration, // execution time duration (in seconds)
    test_fn: Arc<F>,
}

impl<F> BenchmarkStep<F>
where
    F: Fn(TestInput) -> TestResult + Send + Sync + 'static,
{
    fn execute(&self, stop_flag: Arc<AtomicBool>) -> StepResult;
}
```
**TestResult**: the `TestInput` structure can remain as it is; the result becomes:

```rust
struct TestResult {
    pub test_id: usize,
    pub step_id: usize,
    pub tx_hash: Option<H256>,
    pub tx_tracker: TransactionTracker,
    pub err: Option<anyhow::Error>,
}
```
**TransactionTracker**: replaces the existing tracking. The `new` method should automatically set the submission time to ensure correct tracking without requiring manual intervention.

```rust
struct TransactionTracker {
    submission_time: Instant,
    mempool_time: Option<Instant>,
    block_time: Option<Instant>,
}

impl TransactionTracker {
    fn new() -> Self;
    fn mark_mempool(&mut self);
    fn mark_block(&mut self);
    fn get_mempool_latency(&self) -> Option<Duration>;
    fn get_block_latency(&self) -> Option<Duration>;
}
```
**StepResult**: aggregated results for one step.

```rust
struct StepResult {
    step_id: usize,
    tests: Vec<TestResult>,
    avg_mempool_latency: Duration,
    avg_block_latency: Duration,
    // additional useful statistics for the step
}

impl StepResult {
    fn new(results: Vec<TestResult>) -> Self;
}
```
**Execution engine**: supports concurrent execution of the test function for a specified duration, allowing precise control over execution time.

```rust
fn run_concurrent<F>(concurrency: usize, run_duration: Duration, test_fn: F, stop_flag: Arc<AtomicBool>)
where
    F: Fn(TestInput) -> TestResult + Send + Sync + 'static;
```

The `stop_flag` ensures the execution stops when the total benchmark duration is reached or when other termination conditions occur.
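A std-thread sketch of what such a `run_concurrent` could look like, with `usize -> u64` standing in for `TestInput -> TestResult` (all names and types here are illustrative, not taken from the PR; the real framework would use tokio tasks):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

fn run_concurrent<F>(
    concurrency: usize,
    run_duration: Duration,
    test_fn: F,
    stop_flag: Arc<AtomicBool>,
) -> Vec<u64>
where
    F: Fn(usize) -> u64 + Send + Sync + 'static,
{
    let test_fn = Arc::new(test_fn);
    let results = Arc::new(Mutex::new(Vec::new()));
    let start = Instant::now();
    let handles: Vec<_> = (0..concurrency)
        .map(|worker_id| {
            let test_fn = Arc::clone(&test_fn);
            let results = Arc::clone(&results);
            let stop_flag = Arc::clone(&stop_flag);
            thread::spawn(move || {
                // Run until the step duration elapses or the global stop flag fires.
                while start.elapsed() < run_duration && !stop_flag.load(Ordering::Relaxed) {
                    let r = test_fn(worker_id);
                    results.lock().unwrap().push(r);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let out = results.lock().unwrap().clone();
    out
}
```

The shape of the worker loop and the `stop_flag` check is the point here, not the concrete types.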
**BenchmarkResult**: the overall result.

```rust
struct BenchmarkResult {
    steps: Vec<StepResult>,
}
```

**Conclusion**

This revised design primarily improves terminology and clarity, making the framework more cohesive and intuitive. The key benefits of the proposed approach:

- Encapsulation: each `BenchmarkStep` holds its own test function, making it easier to run varied tests within a single benchmark.
- Clarity: clearer, more consistent naming throughout.
- Intuitive structure: the separation of responsibilities across `BenchmarkRunner`, `BenchmarkStep`, and `TransactionTracker` makes the design easier to understand and maintain.

Overall, this proposal aligns closely with the current design but improves cohesion, intuitiveness, and robustness.
This is the first major review batch (1/2). Tomorrow, a smaller set of reviews will follow.
Outstanding reviews:
- The tests in `benches.rs`
- A thorough review of `summary.rs`
```rust
let step_results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let execution_start = Instant::now();
loop {
    if execution_start.elapsed() > step.duration {
```
Maybe:

```rust
while execution_start.elapsed() < step.duration {
```
Resolved review threads (outdated):
- fendermint/testing/materializer/src/concurrency/reporting/dataset.rs (4 threads)
- fendermint/testing/materializer/src/concurrency/reporting/summary.rs
```rust
    .await
    .unwrap();
tx = tx.gas(gas_estimation);
assert!(gas_estimation <= max_tx_gas_limit);
```
Should this fail the whole test run?
I don't see a reason to allow this to pass and then have to deal with partial failures. However, this shouldn't normally fail, and if it does, it should fail consistently.
```rust
}
input.bencher.mempool();

let receipt = pending
```
Maybe we should add a timeout here in case it isn't included?
Yes, will be added.
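A sketch of the idea: bound the wait for a receipt so a transaction that is never included fails the test run instead of hanging forever. An mpsc channel stands in here for the pending-tx future; with ethers/tokio the same shape would be `tokio::time::timeout` wrapped around `pending.await` (assumed usage, not this PR's code):

```rust
use std::sync::mpsc;
use std::time::Duration;

// Wait for a receipt with an upper bound; an error here should fail the
// test run rather than block it indefinitely.
fn wait_for_receipt(rx: &mpsc::Receiver<String>, limit: Duration) -> Result<String, String> {
    rx.recv_timeout(limit)
        .map_err(|_| "transaction not included within timeout".to_string())
}
```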
Resolved review threads (outdated):
- fendermint/testing/materializer/src/concurrency/reporting/summary.rs
- fendermint/testing/materializer/src/concurrency/reporting/tps.rs (2 threads)
Both reviews (2/2) are now complete. That should be all for now :)
```rust
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub struct Signal(tokio::sync::Semaphore);
```
Uses a `loom::sync::Mutex` internally and is hence not signal safe, since `pthread_mutex_lock` is not signal safe. Please correct me if I am wrong.
I was just browsing the code base, but I was curious what your concern is here, i.e. what signal safety means in this context.

It is a generally working pattern to hold blocking locks that guard short sections executed in an async context. Blocking primitives are used consistently in the tokio codebase, one example: https://github.com/tokio-rs/tokio/blob/4b3da20c9847b202cf110f7b7772fd4674edaecf/tokio/src/sync/barrier.rs#L142-L148, and there is some info here: https://tokio.rs/tokio/tutorial/shared-state under the "Tasks, threads, and contention" paragraph.

Specifically, in the semaphore the lock guards a section that doesn't yield by itself and should be very fast to complete (I expect sub-1µs), so preemption by the OS is very unlikely. That said, there is actually no waiting in this wrapper.
I guess you meant that if this is used directly in a signal interrupt handler, then it doesn't protect against re-entrancy.
I think `Signal` needs some more context. My assumption, from a single pass over the code, was that it is about handling UNIX signals.
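For context, the pattern under discussion is a one-shot broadcast: once the signal fires, every current and future waiter proceeds. A std-only sketch of that behaviour (the actual `Signal` wraps a `tokio::sync::Semaphore`; this illustration is assumed, not the PR's code):

```rust
use std::sync::{Condvar, Mutex};

// One-shot broadcast "signal": `send` fires it, after which every
// current and future `wait` call returns immediately.
struct Signal {
    fired: Mutex<bool>,
    cv: Condvar,
}

impl Signal {
    fn new() -> Self {
        Signal { fired: Mutex::new(false), cv: Condvar::new() }
    }

    fn send(&self) {
        *self.fired.lock().unwrap() = true;
        self.cv.notify_all();
    }

    fn wait(&self) {
        let mut fired = self.fired.lock().unwrap();
        // Guard against spurious wakeups by re-checking the flag.
        while !*fired {
            fired = self.cv.wait(fired).unwrap();
        }
    }
}
```

Note this has nothing to do with UNIX signal handlers; holding a `pthread` mutex inside one would indeed be unsafe, which is where the naming confusion above comes from.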
Reviewable status: 0 of 27 files reviewed, 34 unresolved discussions (waiting on @cryptoAtwill, @dshulyak, @karlem, @LePremierHomme, and @raulk)
fendermint/testing/materializer/src/concurrency/reporting/dataset.rs line 43 at r6 (raw file):

```rust
let median = if count % 2 == 0 {
    (sorted_data[count / 2 - 1] + sorted_data[count / 2]) / 2.0
```

Nit: that's not a median; pick one element. I'd argue the `else` branch is all we need.
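For reference, the textbook median does branch on parity (mean of the two middle elements for an even count), while taking just the `else` branch picks the upper middle element, which is simpler and usually fine for latency stats. A sketch of the full version for comparison:

```rust
// Median of an already-sorted slice: average the two middle elements
// when the count is even, otherwise take the middle one.
fn median(sorted: &[f64]) -> f64 {
    let n = sorted.len();
    assert!(n > 0, "median of empty slice");
    if n % 2 == 0 {
        (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
    } else {
        sorted[n / 2]
    }
}
```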
fendermint/testing/materializer/src/concurrency/signal.rs line 4 at r6 (raw file):

```rust
// SPDX-License-Identifier: Apache-2.0, MIT

pub struct Signal(tokio::sync::Semaphore);
```

A better name and some documentation would be great.
fendermint/testing/materializer/tests/docker_tests/benches.rs line 248 at r6 (raw file):

```rust
deploy_tx.set_gas(gas_estimation);
assert!(gas_estimation <= max_tx_gas_limit);
```

The setup code deserves a few comments.
fendermint/testing/materializer/src/concurrency/reporting/summary.rs line 59 at r6 (raw file):

```rust
}

pub fn print(&self) {
```

Nit: move to a `std::fmt::Display` implementation.
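A sketch of the suggested refactor: implementing `std::fmt::Display` instead of a `print` method lets callers choose the sink (`println!`, logging, test assertions). The `ExecutionSummary` name and field here are hypothetical:

```rust
use std::fmt;

struct ExecutionSummary {
    avg_tps: f64,
}

impl fmt::Display for ExecutionSummary {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Callers can now write `println!("{summary}")` or log it instead
        // of being forced into stdout by a `print` method.
        write!(f, "avg TPS: {:.2}", self.avg_tps)
    }
}
```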
Reviewable status: 0 of 28 files reviewed, 34 unresolved discussions (waiting on @cryptoAtwill, @karlem, @LePremierHomme, and @raulk)
Introducing concurrent tests in `materializer`, enabling the generation of traceable workloads without significantly altering how test scenarios are written. Existing tests were refactored to use `make_testnet` instead of `with_testnet`.

Progress:
- `docker_tests::benches::test_native_coin_transfer`
- `docker_tests::benches::test_contract_deployment`
- `docker_tests::benches::test_contract_call`
- `NonceManager` was introduced because `get_transaction_count` was not reliable (need to double-check that)

For follow-up PRs