Skip to content

Commit

Permalink
feat(Gossip KV): Cleanup code duplication in main.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitkulshreshtha committed Nov 5, 2024
1 parent 3143bf5 commit 3940ea5
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 65 deletions.
1 change: 1 addition & 0 deletions .idea/hydroflow.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

140 changes: 75 additions & 65 deletions datastores/gossip_kv/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ use std::future::ready;
use std::hash::Hash;
use std::io::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::num::ParseFloatError;
use std::time::Duration;

use clap::Parser;
use gossip_kv::membership::{MemberDataBuilder, Protocol};
use gossip_kv::server::{server, SeedNode};
use gossip_kv::{ClientRequest, GossipMessage};
use hydroflow::futures::{SinkExt, StreamExt};
use hydroflow::futures::{Sink, SinkExt, StreamExt};
use hydroflow::tokio_stream::wrappers::IntervalStream;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use hydroflow::{bincode, tokio};
use hydroflow::{bincode, bytes, tokio};
use prometheus::{gather, Encoder, TextEncoder};
use serde::Serialize;
use tracing::{error, info, trace};
use warp::Filter;

Expand All @@ -33,15 +35,27 @@ struct Opts {
/// Port to listen for client requests.
#[clap(short, long, default_value = "3001")]
client_port: u16,

/// The duration (in seconds) between gossip rounds.
#[clap(short, long, default_value = "5", value_parser = clap_duration_from_secs)]
gossip_frequency: Duration,
}

/// Parse duration from float string for clap args.
fn clap_duration_from_secs(arg: &str) -> Result<Duration, ParseFloatError> {
arg.parse().map(Duration::from_secs_f32)
}

/// Create a SeedNode from a SeedNodeSettings.
/// Performs a DNS lookup on the address.
fn make_seed_node(settings: &SeedNodeSettings) -> SeedNode<SocketAddr> {
SeedNode {
id: settings.id.clone(),
address: ipv4_resolve(&settings.address).unwrap(),
}
}

/// Handler for the /metrics route. Used to expose prometheus metrics for the server.
async fn metrics_handler() -> Result<impl warp::Reply, Infallible> {
let encoder = TextEncoder::new();
let metric_families = gather();
Expand All @@ -55,19 +69,66 @@ async fn metrics_handler() -> Result<impl warp::Reply, Infallible> {
))
}

/// Setup serialization for outbound networking messages.
fn setup_outbound_serialization<OUTBOUND, MESSAGE>(
outbound: OUTBOUND,
) -> impl Sink<(MESSAGE, SocketAddr), Error = Error>
where
OUTBOUND: Sink<(bytes::Bytes, SocketAddr), Error = Error>,
MESSAGE: Serialize + Debug + Send + 'static,
{
outbound.with(|(msg, addr): (MESSAGE, SocketAddr)| {
ready(Ok::<(bytes::Bytes, SocketAddr), Error>((
hydroflow::util::serialize_to_bytes(msg),
addr,
)))
})
}

/// Setup deserialization for inbound networking messages.
fn setup_inbound_deserialization<INBOUND, MESSAGE>(
inbound: INBOUND,
) -> impl hydroflow::futures::Stream<Item = (MESSAGE, SocketAddr)>
where
INBOUND: hydroflow::futures::Stream<Item = Result<(bytes::BytesMut, SocketAddr), Error>>,
MESSAGE: for<'de> serde::Deserialize<'de> + Debug + Send + 'static,
{
inbound.filter_map(|input| {
let mapped = match input {
Ok((bytes, addr)) => {
let msg: bincode::Result<MESSAGE> = hydroflow::util::deserialize_from_bytes(&bytes);
match msg {
Ok(msg) => Some((msg, addr)),
Err(e) => {
error!("Error deserializing message: {:?}", e);
None
}
}
}
Err(e) => {
error!("Error receiving message: {:?}", e);
None
}
};
ready(mapped)
})
}

#[hydroflow::main]
async fn main() {
tracing_subscriber::fmt::init();

let opts: Opts = Opts::parse();

// Setup metrics server
let metrics_route = warp::path("metrics").and_then(metrics_handler);
tokio::spawn(async move {
info!("Starting metrics server on port 4003");
warp::serve(metrics_route).run(([0, 0, 0, 0], 4003)).await;
});

// Setup protocol information in the member metadata.
// Setup protocol information for this member
let client_protocol_address =
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), opts.client_port);
let gossip_protocol_address =
Expand All @@ -78,6 +139,7 @@ async fn main() {
.add_protocol(Protocol::new("client".into(), client_protocol_address))
.build();

// Bind to the UDP ports
let (client_outbound, client_inbound, _) = bind_udp_bytes(client_protocol_address).await;
let (gossip_outbound, gossip_inbound, _) = bind_udp_bytes(gossip_protocol_address).await;

Expand All @@ -86,70 +148,18 @@ async fn main() {
member_data.id, client_protocol_address
);

// TODO: Remove code duplication here.
// Setup message serialization for outbound client responses.
let client_ob = client_outbound.with(|(msg, addr)| {
ready(Ok::<(hydroflow::bytes::Bytes, SocketAddr), Error>((
hydroflow::util::serialize_to_bytes(msg),
addr,
)))
});
// Setup serde for client requests
let client_ob = setup_outbound_serialization(client_outbound);
let client_ib = setup_inbound_deserialization(client_inbound);

// Setup message deserialization for inbound client requests.
let client_ib = client_inbound.filter_map(|input| {
let mapped = match input {
Ok((bytes, addr)) => {
let msg: bincode::Result<ClientRequest> =
hydroflow::util::deserialize_from_bytes(&bytes);
match msg {
Ok(msg) => Some((msg, addr)),
Err(e) => {
error!("Error deserializing message: {:?}", e);
None
}
}
}
Err(e) => {
error!("Error receiving message: {:?}", e);
None
}
};
ready(mapped)
});

// Setup message serialization for outbound client responses.
let gossip_ob = gossip_outbound.with(|(msg, addr)| {
ready(Ok::<(hydroflow::bytes::Bytes, SocketAddr), Error>((
hydroflow::util::serialize_to_bytes(msg),
addr,
)))
});

// Setup message deserialization for inbound client requests.
let gossip_ib = gossip_inbound.filter_map(|input| {
let mapped = match input {
Ok((bytes, addr)) => {
let msg: bincode::Result<GossipMessage> =
hydroflow::util::deserialize_from_bytes(&bytes);
match msg {
Ok(msg) => Some((msg, addr)),
Err(e) => {
error!("Error deserializing message: {:?}", e);
None
}
}
}
Err(e) => {
error!("Error receiving message: {:?}", e);
None
}
};
ready(mapped)
});
// Setup serde for gossip messages
let gossip_ob = setup_outbound_serialization(gossip_outbound);
let gossip_ib = setup_inbound_deserialization(gossip_inbound);

let gossip_rx =
IntervalStream::new(tokio::time::interval(tokio::time::Duration::from_secs(5))).map(|_| ());
// Setup regular gossip triggers
let gossip_rx = IntervalStream::new(tokio::time::interval(opts.gossip_frequency)).map(|_| ());

// Setup watcher for setting changes
let (_watcher, server_settings, settings_stream) = setup_settings_watch();

let seed_nodes = server_settings
Expand Down

0 comments on commit 3940ea5

Please sign in to comment.