Skip to content

Commit

Permalink
feat cut (Hyle-org#211)
Browse files Browse the repository at this point in the history
* avoid re-sending sync requests

keep track of previous sent sync requests
in order to avoid resending one.

* improve data_proposal handling

do not make empty data proposals
ignore empty data proposals
do not make a batch with already used Cars

* rename DataProposal.inner to txs

* storage refactoring

* implement data dissemination cut

* implement data dissemination cut

* rename Cut to CutWithTxs. prepare Cut without txs

* remove previous_sync_requests logic

* reduce mempool log details
  • Loading branch information
Alex Boussinet authored Oct 16, 2024
1 parent 848dc3d commit d320bfa
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 308 deletions.
42 changes: 19 additions & 23 deletions src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::{debug, info, warn};
use crate::{
bus::{bus_client, command_response::Query, BusMessage, SharedMessageBus},
handle_messages,
mempool::{Batch, BatchInfo, MempoolCommand, MempoolEvent, MempoolResponse},
mempool::{CutWithTxs, Cut, MempoolCommand, MempoolEvent, MempoolResponse},
model::{
get_current_timestamp, Block, BlockHash, BlockHeight, Hashable, Transaction,
TransactionData, ValidatorPublicKey,
Expand Down Expand Up @@ -71,7 +71,7 @@ pub enum ConsensusCommand {
pub enum ConsensusEvent {
CommitBlock {
validators: Vec<ValidatorPublicKey>,
batch_info: BatchInfo,
cut_lanes: Cut,
block: Block,
},
}
Expand Down Expand Up @@ -117,7 +117,7 @@ pub struct ConsensusProposal {
slot: Slot,
view: u64,
next_leader: u64,
batch_info: BatchInfo,
cut_lanes: Cut,
previous_consensus_proposal_hash: ConsensusProposalHash,
previous_commit_quorum_certificate: QuorumCertificate,
block: Block, // FIXME: Block ou cut ?
Expand Down Expand Up @@ -149,7 +149,7 @@ pub struct ConsensusStore {
buffered_invalid_proposals: HashMap<ConsensusProposalHash, ConsensusProposal>,
// FIXME: pub is here for testing
pub blocks: Vec<Block>,
pending_batches: Vec<Batch>,
pending_cuts: Vec<CutWithTxs>,
}

pub struct Consensus {
Expand Down Expand Up @@ -201,17 +201,17 @@ impl Consensus {
.collect();

// Create block to-be-proposed
let batch = if self.pending_batches.is_empty() {
Batch::default()
let cut = if self.pending_cuts.is_empty() {
CutWithTxs::default()
} else {
self.pending_batches.remove(0)
self.pending_cuts.remove(0)
};
let block = Block {
parent_hash,
height: parent_height + 1,
timestamp: get_current_timestamp(),
new_bonded_validators,
txs: batch.txs,
txs: cut.txs,
};

let validators = self.bft_round_state.staking.bonded();
Expand All @@ -221,7 +221,7 @@ impl Consensus {
slot: self.bft_round_state.slot,
view: self.bft_round_state.view,
next_leader: (self.bft_round_state.leader_index + 1) % validators.len() as u64,
batch_info: batch.info,
cut_lanes: cut.tips,
previous_consensus_proposal_hash,
previous_commit_quorum_certificate,
validators,
Expand Down Expand Up @@ -271,7 +271,7 @@ impl Consensus {
slot: self.bft_round_state.slot,
view: self.bft_round_state.view,
next_leader: 1,
batch_info: BatchInfo::new(self.crypto.validator_pubkey().clone()),
cut_lanes: Cut::default(),
previous_consensus_proposal_hash: ConsensusProposalHash(vec![]),
previous_commit_quorum_certificate: QuorumCertificate::default(),
validators,
Expand Down Expand Up @@ -300,7 +300,7 @@ impl Consensus {
.bus
.send(ConsensusEvent::CommitBlock {
validators: self.bft_round_state.consensus_proposal.validators.clone(),
batch_info: self.bft_round_state.consensus_proposal.batch_info.clone(),
cut_lanes: self.bft_round_state.consensus_proposal.cut_lanes.clone(),
block: self.bft_round_state.consensus_proposal.block.clone(),
})
.context("Failed to send ConsensusEvent::CommitBlock msg on the bus")?;
Expand Down Expand Up @@ -1223,10 +1223,10 @@ impl Consensus {
fn handle_command(&mut self, msg: ConsensusCommand) -> Result<()> {
match msg {
ConsensusCommand::SingleNodeBlockGeneration(block_number) => {
let batch = if self.pending_batches.is_empty() {
Batch::default()
let cut = if self.pending_cuts.is_empty() {
CutWithTxs::default()
} else {
self.pending_batches.remove(0)
self.pending_cuts.remove(0)
};
let parent_hash: String =
rand::Rng::sample_iter(rand::thread_rng(), &rand::distributions::Alphanumeric)
Expand All @@ -1238,13 +1238,13 @@ impl Consensus {
height: BlockHeight(block_number),
timestamp: get_current_timestamp(),
new_bonded_validators: vec![],
txs: batch.txs,
txs: cut.txs,
};
_ = self
.bus
.send(ConsensusEvent::CommitBlock {
validators: self.bft_round_state.consensus_proposal.validators.clone(),
batch_info: batch.info,
cut_lanes: cut.tips,
block: block.clone(),
})
.context("Failed to send ConsensusEvent::CommitBlock msg on the bus")?;
Expand All @@ -1270,13 +1270,9 @@ impl Consensus {

async fn handle_mempool_event(&mut self, msg: MempoolEvent) -> Result<()> {
match msg {
MempoolEvent::LatestBatch(batch) => {
debug!(
"Received batch from {} with txs: {:?} pos {} parent {:?}",
batch.info.validator, batch.txs, batch.info.tip.pos, batch.info.tip.parent,
);
self.pending_batches.push(batch);

MempoolEvent::NewCut(cut) => {
debug!("Received a new cut");
self.pending_cuts.push(cut);
Ok(())
}
}
Expand Down
Loading

0 comments on commit d320bfa

Please sign in to comment.