Skip to content

Commit

Permalink
A0-3576: Remove substrate sync (#1721)
Browse files Browse the repository at this point in the history
# Description

We are doing this, lets go~~!

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)

# Checklist:

- I have made corresponding changes to the existing documentation
- I have created new documentation
  • Loading branch information
timorleph authored May 14, 2024
1 parent a67cba5 commit 63578b9
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 150 deletions.
77 changes: 36 additions & 41 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::{
};

use finality_aleph::{
run_validator_node, AlephBlockImport, AlephConfig, AllBlockMetrics, BlockImporter,
ChannelProvider, Justification, JustificationTranslator, MillisecsPerBlock, NetConfig,
RateLimiterConfig, RedirectingBlockImport, SessionPeriod, SubstrateChainStatus,
SyncNetworkService, SyncOracle, TracingBlockImport, ValidatorAddressCache,
build_network, run_validator_node, AlephBlockImport, AlephConfig, AllBlockMetrics,
BlockImporter, BuildNetworkOutput, ChannelProvider, Justification, JustificationTranslator,
MillisecsPerBlock, RateLimiterConfig, RedirectingBlockImport, SessionPeriod,
SubstrateChainStatus, SyncOracle, TracingBlockImport, ValidatorAddressCache,
};
use log::warn;
use primitives::{
Expand All @@ -18,10 +18,9 @@ use primitives::{
};
use sc_basic_authorship::ProposerFactory;
use sc_client_api::HeaderBackend;
use sc_consensus::ImportQueue;
use sc_consensus::{ImportQueue, Link};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_slots::BackoffAuthoringBlocksStrategy;
use sc_network::config::FullNetworkConfiguration;
use sc_service::{error::Error as ServiceError, Configuration, TFullClient, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -246,6 +245,10 @@ fn get_rate_limit_config(aleph_config: &AlephCli) -> RateLimiterConfig {
}
}

struct NoopLink;

impl Link<Block> for NoopLink {}

/// Builds a new service for a full client.
pub fn new_authority(
config: Configuration,
Expand All @@ -261,7 +264,7 @@ pub fn new_authority(

let backoff_authoring_blocks = Some(LimitNonfinalized(aleph_config.max_nonfinalized_blocks()));
let prometheus_registry = config.prometheus_registry().cloned();
let (sync_oracle, _) = SyncOracle::new();
let (sync_oracle, major_sync) = SyncOracle::new();
let proposer_factory = get_proposer_factory(&service_components, &config);
let slot_duration = sc_consensus_aura::slot_duration(&*service_components.client)?;
let (block_import, block_rx) = RedirectingBlockImport::new(service_components.client.clone());
Expand Down Expand Up @@ -298,29 +301,25 @@ pub fn new_authority(

let import_queue_handle = BlockImporter::new(service_components.import_queue.service());

let mut net_config = FullNetworkConfiguration::new(&config.network);
let genesis_hash = service_components
.client
.hash(0)
.ok()
.flatten()
.expect("Genesis block exists.");
let NetConfig {
let BuildNetworkOutput {
network,
authentication_network,
block_sync_network,
} = NetConfig::new(&mut net_config, &genesis_hash);
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
client: service_components.client.clone(),
transaction_pool: service_components.transaction_pool.clone(),
spawn_handle: service_components.task_manager.spawn_handle(),
import_queue: service_components.import_queue,
block_announce_validator_builder: None,
warp_sync_params: None,
block_relay: None,
})?;
sync_service,
tx_handler_controller,
system_rpc_tx,
} = build_network(
&config.network,
config.protocol_id(),
service_components.client.clone(),
major_sync,
service_components.transaction_pool.clone(),
&service_components.task_manager.spawn_handle(),
config
.prometheus_config
.as_ref()
.map(|config| config.registry.clone()),
)?;

let chain_status = SubstrateChainStatus::new(service_components.backend.clone())
.map_err(|e| ServiceError::Other(format!("failed to set up chain status: {e}")))?;
Expand All @@ -347,9 +346,15 @@ pub fn new_authority(
})
};

let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network: network.clone(),
sync_service: sync_network.clone(),
service_components.task_manager.spawn_handle().spawn(
"import-queue",
None,
service_components.import_queue.run(Box::new(NoopLink)),
);

sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network,
sync_service,
client: service_components.client.clone(),
keystore: service_components.keystore_container.local_keystore(),
task_manager: &mut service_components.task_manager,
Expand All @@ -369,14 +374,6 @@ pub fn new_authority(

let rate_limiter_config = get_rate_limit_config(&aleph_config);

// Network event stream needs to be created before starting the network,
// otherwise some events might be missed.
let sync_network_service = SyncNetworkService::new(
network,
sync_network,
vec![authentication_network.name(), block_sync_network.name()],
);

let AlephRuntimeVars {
millisecs_per_block,
session_period,
Expand All @@ -385,7 +382,6 @@ pub fn new_authority(
let aleph_config = AlephConfig {
authentication_network,
block_sync_network,
sync_network_service,
client: service_components.client,
chain_status,
import_queue_handle,
Expand Down Expand Up @@ -413,6 +409,5 @@ pub fn new_authority(
.spawn_essential_handle()
.spawn_blocking("aleph", None, run_validator_node(aleph_config));

network_starter.start_network();
Ok(service_components.task_manager)
}
3 changes: 1 addition & 2 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub use crate::{
metrics::{AllBlockMetrics, DefaultClock, FinalityRateMetrics, TimingBlockMetrics},
network::{
address_cache::{ValidatorAddressCache, ValidatorAddressingInfo},
NetConfig, ProtocolNetwork, SubstratePeerId, SyncNetworkService,
build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId,
},
nodes::run_validator_node,
session::SessionPeriod,
Expand Down Expand Up @@ -262,7 +262,6 @@ pub struct RateLimiterConfig {
pub struct AlephConfig<C, SC, T> {
pub authentication_network: ProtocolNetwork,
pub block_sync_network: ProtocolNetwork,
pub sync_network_service: SyncNetworkService<AlephBlock>,
pub client: Arc<C>,
pub chain_status: SubstrateChainStatus,
pub import_queue_handle: BlockImporter,
Expand Down
17 changes: 11 additions & 6 deletions finality-aleph/src/network/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use sc_client_api::Backend;
use sc_network::{
config::{NetworkConfiguration, ProtocolId},
error::Error as NetworkError,
NetworkService,
};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
Expand All @@ -28,16 +29,16 @@ mod own_protocols;
mod rpc;
mod transactions;

// TODO(A0-3576): should no longer be needed outside of here
use base::network as base_network;
pub use own_protocols::Networks;
use own_protocols::Networks;
use rpc::spawn_rpc_service;
use transactions::spawn_transaction_handler;

const SPAWN_CATEGORY: Option<&str> = Some("networking");

/// Components created when spawning the network.
pub struct NetworkOutput<TP: TransactionPool + 'static> {
pub network: Arc<NetworkService<TP::Block, TP::Hash>>,
pub authentication_network: ProtocolNetwork,
pub block_sync_network: ProtocolNetwork,
// names chosen for compatibility with SpawnTaskParams, get better ones if we ever stop using that
Expand All @@ -48,8 +49,6 @@ pub struct NetworkOutput<TP: TransactionPool + 'static> {

/// Start everything necessary to run the inter-node network and return the interfaces for it.
/// This includes everything in the base network, the base protocol service, and services for handling transactions and RPCs.
// TODO(A0-3576): This code should be used.
#[allow(dead_code)]
pub fn network<TP, BE, C>(
network_config: &NetworkConfiguration,
protocol_id: ProtocolId,
Expand All @@ -60,7 +59,7 @@ pub fn network<TP, BE, C>(
metrics_registry: Option<Registry>,
) -> Result<NetworkOutput<TP>, NetworkError>
where
TP: TransactionPool + 'static,
TP: TransactionPool<Hash = BlockHash> + 'static,
TP::Block: Block<Hash = BlockHash>,
<TP::Block as Block>::Header: Header<Number = BlockNumber>,
BE: Backend<TP::Block>,
Expand Down Expand Up @@ -111,8 +110,14 @@ where
metrics_registry.as_ref(),
spawn_handle,
)?;
let rpc_interface = spawn_rpc_service(network, syncing_service.clone(), client, spawn_handle);
let rpc_interface = spawn_rpc_service(
network.clone(),
syncing_service.clone(),
client,
spawn_handle,
);
Ok(NetworkOutput {
network,
block_sync_network,
authentication_network,
sync_service: syncing_service,
Expand Down
5 changes: 2 additions & 3 deletions finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::{
hash::Hash,
};

pub use build::{network as build_network, NetworkOutput as BuildNetworkOutput};
use network_clique::{AddressingInformation, NetworkIdentity, PeerId};
use parity_scale_codec::Codec;
pub use substrate::{PeerId as SubstratePeerId, ProtocolNetwork, SyncNetworkService};
pub use substrate::{PeerId as SubstratePeerId, ProtocolNetwork};

pub mod address_cache;
mod base_protocol;
Expand All @@ -18,8 +19,6 @@ pub mod session;
mod substrate;
pub mod tcp;

pub use build::Networks as NetConfig;

const LOG_TARGET: &str = "aleph-network";

/// A basic alias for properties we expect basic data to satisfy.
Expand Down
88 changes: 2 additions & 86 deletions finality-aleph/src/network/substrate.rs
Original file line number Diff line number Diff line change
@@ -1,107 +1,23 @@
use std::{
collections::HashSet,
fmt::{Debug, Display, Error as FmtError, Formatter},
iter,
pin::Pin,
sync::Arc,
};

use futures::stream::{Fuse, Stream, StreamExt};
use log::{debug, error, info, trace, warn};
use log::{debug, info, trace, warn};
use parity_scale_codec::DecodeAll;
use rand::{seq::IteratorRandom, thread_rng};
pub use sc_network::PeerId;
use sc_network::{
multiaddr::Protocol as MultiaddressProtocol,
service::traits::{NotificationEvent as SubstrateEvent, ValidationResult},
Multiaddr, NetworkPeers, NetworkService, ProtocolName,
ProtocolName,
};
use sc_network_sync::{SyncEvent, SyncEventStream, SyncingService};
use sp_runtime::traits::Block;
use tokio::time;

use crate::{
network::{Data, GossipNetwork, LOG_TARGET},
STATUS_REPORT_INTERVAL,
};

#[derive(Debug)]
pub enum SyncNetworkServiceError {
NetworkStreamTerminated,
}

impl Display for SyncNetworkServiceError {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
match self {
Self::NetworkStreamTerminated => write!(f, "Network event stream ended."),
}
}
}

// TODO(A0-3576): should no longer be needed, as it's purpose happens in base_protocol
/// Service responsible for handling network events emitted by the base sync protocol.
pub struct SyncNetworkService<B: Block> {
sync_stream: Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
network: Arc<NetworkService<B, B::Hash>>,
protocol_names: Vec<ProtocolName>,
}

impl<B: Block> SyncNetworkService<B> {
pub fn new(
network: Arc<NetworkService<B, B::Hash>>,
sync_network: Arc<SyncingService<B>>,
protocol_names: Vec<ProtocolName>,
) -> Self {
Self {
sync_stream: sync_network.event_stream("aleph-syncing-network").fuse(),
network,
protocol_names,
}
}

fn peer_connected(&mut self, remote: PeerId) {
let multiaddress: Multiaddr =
iter::once(MultiaddressProtocol::P2p(remote.into())).collect();
trace!(target: LOG_TARGET, "Connected event from address {:?}", multiaddress);

for name in &self.protocol_names {
if let Err(e) = self
.network
.add_peers_to_reserved_set(name.clone(), iter::once(multiaddress.clone()).collect())
{
error!(target: LOG_TARGET, "add_peers_to_reserved_set failed for {}: {}", name, e);
}
}
}

fn peer_disconnected(&mut self, remote: PeerId) {
trace!(target: LOG_TARGET, "Disconnected event for peer {:?}", remote);
let addresses: Vec<_> = iter::once(remote).collect();

for name in &self.protocol_names {
if let Err(e) = self
.network
.remove_peers_from_reserved_set(name.clone(), addresses.clone())
{
error!(target: LOG_TARGET, "remove_peers_from_reserved_set failed for {}: {}", name, e)
}
}
}

pub async fn run(mut self) -> Result<(), SyncNetworkServiceError> {
use SyncEvent::*;
loop {
match self.sync_stream.next().await {
Some(event) => match event {
PeerConnected(remote) => self.peer_connected(remote),
PeerDisconnected(remote) => self.peer_disconnected(remote),
},
None => return Err(SyncNetworkServiceError::NetworkStreamTerminated),
}
}
}
}

/// A thin wrapper around sc_network::config::NotificationService that stores a list
/// of all currently connected peers, and introduces a few convenience methods to
/// allow broadcasting messages and sending data to random peers.
Expand Down
12 changes: 0 additions & 12 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ where
let AlephConfig {
authentication_network,
block_sync_network,
sync_network_service,
client,
chain_status,
mut import_queue_handle,
Expand Down Expand Up @@ -132,16 +131,6 @@ where
}
});

let sync_network_task = async move {
match sync_network_service.run().await {
Ok(_) => error!(target: LOG_TARGET, "SyncNetworkService finished."),
Err(err) => error!(
target: LOG_TARGET,
"SyncNetworkService finished with error: {err}."
),
}
};

let map_updater = SessionMapUpdater::new(
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())),
FinalityNotifierImpl::new(client.clone()),
Expand Down Expand Up @@ -245,7 +234,6 @@ where
debug!(target: LOG_TARGET, "Sync has started.");

spawn_handle.spawn("aleph/connection_manager", connection_manager_task);
spawn_handle.spawn("aleph/sync_network", sync_network_task);
debug!(target: LOG_TARGET, "Sync network has started.");

let party = ConsensusParty::new(ConsensusPartyParams {
Expand Down

0 comments on commit 63578b9

Please sign in to comment.