Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dfir_rs)!: switch transducer -> process (fix #1572) #1684

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datastores/gossip_kv/kv/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub type MemberId = String;

/// Information about a member in the cluster.
///
/// A member is a transducer that is part of the cluster. Leaving or failing is a terminal
/// state for a member. When a transducer restarts and rejoins the cluster, it is considered a
/// A member is a process that is part of the cluster. Leaving or failing is a terminal
/// state for a member. When a process restarts and rejoins the cluster, it is considered a
/// new member.
///
/// # Generic Parameters
Expand Down
2 changes: 1 addition & 1 deletion datastores/gossip_kv/kv/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
dfir_syntax! {

on_start = initialize() -> tee();
on_start -> for_each(|_| info!("{:?}: Transducer {} started.", context.current_tick(), member_id_6));
on_start -> for_each(|_| info!("{:?}: Process {} started.", context.current_tick(), member_id_6));

seed_nodes = source_stream(seed_node_stream)
-> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec<SeedNode<Addr>>| {
Expand Down
2 changes: 1 addition & 1 deletion datastores/gossip_kv/load_test_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const UNKNOWN_ADDRESS: LoadTestAddress = 9999999999;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)]
struct Opts {
/// Number of threads to run. Each thread will run an instance of the gossip-kv server transducer.
/// Number of threads to run. Each thread will run an instance of the gossip-kv server process.
#[clap(short, long, default_value = "5")]
thread_count: usize,

Expand Down
16 changes: 8 additions & 8 deletions dfir_rs/examples/kvs_bench/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ pub fn run_server<RX>(

let buffer_pool = BufferPool::<BUFFER_SIZE>::create_buffer_pool();

let (transducer_to_peers_tx, mut transducer_to_peers_rx) =
let (process_to_peers_tx, mut process_to_peers_rx) =
dfir_rs::util::unsync_channel::<(Bytes, NodeId)>(None);

let (client_to_transducer_tx, client_to_transducer_rx) =
let (client_to_process_tx, client_to_process_rx) =
dfir_rs::util::unsync_channel::<(KvsRequest<BUFFER_SIZE>, NodeId)>(None);
let (transducer_to_client_tx, mut _transducer_to_client_rx) =
let (process_to_client_tx, mut _process_to_client_rx) =
dfir_rs::util::unsync_channel::<(KvsResponse<BUFFER_SIZE>, NodeId)>(None);

let localset = task::LocalSet::new();
Expand All @@ -71,7 +71,7 @@ pub fn run_server<RX>(
collector: Rc::clone(&buffer_pool),
}.deserialize(&mut deserializer).unwrap();

client_to_transducer_tx.try_send((req, node_id)).unwrap();
client_to_process_tx.try_send((req, node_id)).unwrap();
}
}
})
Expand All @@ -86,7 +86,7 @@ pub fn run_server<RX>(
// TODO: Eventually this would get moved into a dfir operator that would return a Bytes struct and be efficient and zero copy and etc.
async move {
loop {
while let Some((serialized_req, node_id)) = transducer_to_peers_rx.next().await {
while let Some((serialized_req, node_id)) = process_to_peers_rx.next().await {
let index = lookup.binary_search(&node_id).unwrap();
topology.tx[index].send(serialized_req).unwrap();
}
Expand Down Expand Up @@ -170,7 +170,7 @@ pub fn run_server<RX>(
union_puts_and_gossip_requests = union();

simulated_put_requests -> union_puts_and_gossip_requests;
source_stream(client_to_transducer_rx)
source_stream(client_to_process_rx)
// -> inspect(|x| println!("{server_id}:{:5}: from peers: {x:?}", context.current_tick()))
-> union_puts_and_gossip_requests;

Expand Down Expand Up @@ -244,7 +244,7 @@ pub fn run_server<RX>(

peers
// -> inspect(|x| println!("{server_id}:{:5}: sending to peers: {x:?}", context.current_tick()))
-> for_each(|(node_id, serialized_req)| transducer_to_peers_tx.try_send((serialized_req, node_id)).unwrap());
-> for_each(|(node_id, serialized_req)| process_to_peers_tx.try_send((serialized_req, node_id)).unwrap());

// join for lookups
lookup = _lattice_join_fused_join::<'static, 'tick, MyLastWriteWins<BUFFER_SIZE>, MySetUnion>();
Expand Down Expand Up @@ -275,7 +275,7 @@ pub fn run_server<RX>(
})
-> flatten()
// -> inspect(|x| println!("{gossip_addr}:{:5}: Response to client: {x:?}", context.current_tick()))
-> for_each(|x| transducer_to_client_tx.try_send(x).unwrap());
-> for_each(|x| process_to_client_tx.try_send(x).unwrap());

};

Expand Down
8 changes: 4 additions & 4 deletions dfir_rs/examples/shopping/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ cargo run -p hydroflow --example shopping -- --opt 5

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).

For options 1-4, the driver runs a single Hydro transducer (thread) that handles client requests.
For options 1-4, the driver runs a single Hydro process (thread) that handles client requests.

For options 5-6, the driver runs two Hydro transducers, one for each side of the network communication.
For options 5-6, the driver runs two Hydro processes, one for each side of the network communication.

For option 7, the driver runs four Hydro transducers: one client proxy and 3 server replicas.
For option 7, the driver runs four Hydro processes: one client proxy and 3 server replicas.

Under all options, the driver runs an additional independent Hydro transducer (thread) to receive the output of the flow and print it to the console. The code for this transducer is in `flows/listener_flow.rs`.
Under all options, the driver runs an additional independent Hydro process (thread) to receive the output of the flow and print it to the console. The code for this process is in `flows/listener_flow.rs`.
8 changes: 4 additions & 4 deletions dfir_rs/examples/shopping/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ pub(crate) async fn run_driver(opts: Opts) {
let client_out_addr = ipv4_resolve("localhost:23460").unwrap();
let (client_out, _, _) = dfir_rs::util::bind_udp_bytes(client_out_addr).await;

// shopping input is handled by the client proxy transducer
// so the server transducer should get an empty iterator as its first argument
// shopping input is handled by the client proxy process
// so the server process should get an empty iterator as its first argument
let empty_ssiv = std::iter::empty();

// Spawn server
Expand Down Expand Up @@ -180,8 +180,8 @@ pub(crate) async fn run_driver(opts: Opts) {
let (out, _, _) = dfir_rs::util::bind_udp_bytes(out_addr).await;
let gossip_addrs = gossip_addrs.clone();

// shopping input is handled by the client proxy transducer
// so the server transducers should get an empty iterator as first argument
// shopping input is handled by the client proxy process
// so the server processes should get an empty iterator as first argument
let empty_ssiv = std::iter::empty();

// Spawn server
Expand Down
6 changes: 3 additions & 3 deletions dfir_rs/examples/shopping/flows/client_state_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub(crate) async fn client_state_flow(
// Set up the Udp socket for proxy-server communication
let (carts_out, carts_in, _) = dfir_rs::util::bind_udp_bytes(local_addr).await;

// This is like server_state_flow, but we split it into two transducers at a different spot.
// Here, the first transducer takes in shopping_ssiv requests and runs a stateful fold_keyed, persisting all the shopping requests in ssiv's.
// The second transducer listens on reqs_in and runs the lookup join.
// This is like server_state_flow, but we split it into two processes at a different spot.
// Here, the first process takes in shopping_ssiv requests and runs a stateful fold_keyed, persisting all the shopping requests in ssiv's.
// The second process listens on reqs_in and runs the lookup join.
dfir_syntax! {
source_iter(shopping_ssiv)
-> fold_keyed::<'static>(SSIV_BOT, ssiv_merge)
Expand Down
2 changes: 1 addition & 1 deletion dfir_rs/examples/shopping/flows/orig_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) async fn orig_flow(
) -> Dfir<'static> {
let client_class = client_class_iter();

// This is the straightforward single-transducer sequential case.
// This is the straightforward single-process sequential case.
// For each Request in "shopping" we look up its "client_class" (basic or prime)
// via a join operator, then we group by (client, class), and for each such pair
// we grow a separate vector of ClLineItems. No seal is needed in the sequential case.
Expand Down
6 changes: 3 additions & 3 deletions dfir_rs/examples/shopping/flows/server_state_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub(crate) async fn server_state_flow(
// Set up the Udp socket for proxy-server communication
let (reqs_out, reqs_in, _) = dfir_rs::util::bind_udp_bytes(local_addr).await;

// This is like push_group_flow, but we split it into two transducers that communicate via reqs_out and reqs_in.
// The first transducer takes in shopping_ssiv requests, and forwards them via reqs_out to the second transducer.
// The second transducer listens on reqs_in and runs the stateful logic of fold_keyed and join.
// This is like push_group_flow, but we split it into two processes that communicate via reqs_out and reqs_in.
// The first process takes in shopping_ssiv requests, and forwards them via reqs_out to the second process.
// The second process listens on reqs_in and runs the stateful logic of fold_keyed and join.
dfir_syntax! {
// Networked: Server-Side State
source_iter(shopping_ssiv)
Expand Down
12 changes: 6 additions & 6 deletions dfir_rs/src/scheduled/ticks.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
//! This module contains types to work with ticks.
//!
//! Each iteration of a Hydroflow transducer loop is called a tick. Associated with the transducer
//! is a clock value, which tells you how many ticks were executed by this transducer prior to the
//! current tick. Each transducer produces totally ordered, sequentially increasing clock values,
//! which you can think of as the "local logical time" at the transducer.
//! Each iteration of a Hydroflow process loop is called a tick. Associated with the process
//! is a clock value, which tells you how many ticks were executed by this process prior to the
//! current tick. Each process produces totally ordered, sequentially increasing clock values,
//! which you can think of as the "local logical time" at the process.

use std::fmt::{Display, Formatter};
use std::ops::{Add, AddAssign, Neg, Sub, SubAssign};

use serde::{Deserialize, Serialize};

/// A point in time during execution on transducer.
/// A point in time during execution on process.
///
/// `TickInstant` instances can be subtracted to calculate the `TickDuration` between them.
///
Expand Down Expand Up @@ -66,7 +66,7 @@ pub struct TickDuration {
impl TickInstant {
/// Create a new TickInstant
///
/// The specified parameter indicates the number of ticks that have elapsed on the transducer,
/// The specified parameter indicates the number of ticks that have elapsed on the process,
/// prior to this one.
pub fn new(ticks: u64) -> Self {
TickInstant(ticks)
Expand Down
68 changes: 34 additions & 34 deletions dfir_rs/src/util/simulation.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! # Hydroflow Deterministic Simulation Testing Framework
//!
//! This module provides a deterministic simulation testing framework for testing Hydroflow
//! transducers.
//! processes.
//!
//! It can be used to test complex interactions between multiple Hydroflow transducers in a
//! It can be used to test complex interactions between multiple Hydroflow processes in a
//! deterministic manner by running them in a single-threaded environment. The framework also
//! provides a "virtual network" implementation that allows production transducers to exchange
//! provides a "virtual network" implementation that allows production processes to exchange
//! messages within the simulation. More importantly, the network is fully under control of the
//! unit test and the test can introduce faults such as message delays, message drops and
//! network partitions.
//!
//! ## Overview
//!
//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
//! aren't real hosts, but rather a collection of individual Hydroflow transducers (one per host)
//! aren't real hosts, but rather a collection of individual Hydroflow processes (one per host)
//! that can communicate with each other over a virtual network. Every host has a "hostname"
//! which uniquely identifies it within the fleet.
//!
Expand Down Expand Up @@ -47,21 +47,21 @@
//! ## Network Processing
//!
//! ### Outboxes & Inboxes
//! When a transducer wishes to send a message to another transducer, it sends the message to an
//! When a process wishes to send a message to another process, it sends the message to an
//! "outbox" on its host. The unit test invokes the simulation's network message processing logic
//! at some desired cadence to pick up all messages from all outboxes and deliver them to the
//! corresponding inboxes on the destination hosts. The network message processing logic is the
//! point at which failures can be injected to change the behavior of the network.
//!
//! ### Interface Names
//! Every inbox and outbox is associated with an "interface name". This is a string that uniquely
//! identifies the interface on the host. When a transducer sends a message, it specifies the
//! identifies the interface on the host. When a process sends a message, it specifies the
//! destination hostname and the interface name on that host to which the message should be
//! delivered.
//!
//! ## Progress of Time in the Simulation
//! The single-threaded unit test can drive time forward on every host by invoking the `run_tick`
//! method on the host. This ultimately runs a single tick on the transducer. The unit test is
//! method on the host. This ultimately runs a single tick on the process. The unit test is
//! also responsible for invoking the network message processing at the time of its choosing and
//! can interleave the progress of time on various hosts and network processing as it sees fit.
//!
Expand All @@ -85,7 +85,7 @@ use crate::scheduled::graph::Dfir;
use crate::util::{collect_ready_async, unbounded_channel};

/// A hostname is a unique identifier for a host in the simulation. It is used to address messages
/// to a specific host (and thus a specific Hydroflow transducer).
/// to a specific host (and thus a specific Hydroflow process).
pub type Hostname = String;

/// An interface name is a unique identifier for an inbox or an outbox on host.
Expand Down Expand Up @@ -127,45 +127,45 @@ impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
/// A message with an delivery address.
pub type MessageWithAddress = (Box<dyn Any>, Address);

/// An inbox is used by a host to receive messages for the transducer.
/// An inbox is used by a host to receive messages for the process.
pub struct Inbox {
sender: Box<dyn MessageSender>,
}

/// Transducers can send messages to other transducers by putting those messages in an outbox
/// Processes can send messages to other processes by putting those messages in an outbox
/// on their host.
pub struct Outbox {
receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
}

/// A host is a single Hydroflow transducer running in the simulation. It has a unique hostname
/// A host is a single Hydroflow process running in the simulation. It has a unique hostname
/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
/// and outboxes.
pub struct Host {
name: Hostname,
transducer: Dfir<'static>,
process: Dfir<'static>,
inputs: HashMap<InterfaceName, Inbox>,
output: HashMap<InterfaceName, Outbox>,
}

impl Host {
/// Run a single tick on the host's transducer. Returns true if any work was done by the
/// transducer. This effectively "advances" time on the transducer.
/// Run a single tick on the host's process. Returns true if any work was done by the
/// process. This effectively "advances" time on the process.
pub fn run_tick(&mut self) -> bool {
self.transducer.run_tick()
self.process.run_tick()
}
}

/// A builder for constructing a host in the simulation.
pub struct HostBuilder {
name: Hostname,
transducer: Option<Dfir<'static>>,
process: Option<Dfir<'static>>,
inboxes: HashMap<InterfaceName, Inbox>,
outboxes: HashMap<InterfaceName, Outbox>,
}

/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
pub struct TransducerBuilderContext<'context> {
pub struct ProcessBuilderContext<'context> {
inboxes: &'context mut HashMap<InterfaceName, Inbox>,
outboxes: &'context mut HashMap<InterfaceName, Outbox>,
}
Expand All @@ -177,9 +177,9 @@ fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
})
}

impl TransducerBuilderContext<'_> {
impl ProcessBuilderContext<'_> {
/// Create a new inbox on the host with the given interface name. Returns a stream that can
/// be read by the transducer using the source_stream dfir operator.
/// be read by the process using the source_stream dfir operator.
pub fn new_inbox<T: 'static>(
&mut self,
interface: InterfaceName,
Expand All @@ -195,7 +195,7 @@ impl TransducerBuilderContext<'_> {
}

/// Creates a new outbox on the host with the given interface name. Returns a sink that can
/// be written to by the transducer using the dest_sink dfir operator.
/// be written to by the process using the dest_sink dfir operator.
pub fn new_outbox<T: 'static>(
&mut self,
interface: InterfaceName,
Expand All @@ -220,35 +220,35 @@ impl HostBuilder {
pub fn new(name: Hostname) -> Self {
HostBuilder {
name,
transducer: None,
process: None,
inboxes: Default::default(),
outboxes: Default::default(),
}
}

/// Supplies the (mandatory) transducer that runs on this host.
pub fn with_transducer<F>(mut self, builder: F) -> Self
/// Supplies the (mandatory) process that runs on this host.
pub fn with_process<F>(mut self, builder: F) -> Self
where
F: FnOnce(&mut TransducerBuilderContext) -> Dfir<'static>,
F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
{
let mut context = TransducerBuilderContext {
let mut context = ProcessBuilderContext {
inboxes: &mut self.inboxes,
outboxes: &mut self.outboxes,
};
let transducer = builder(&mut context);
self.transducer = Some(transducer);
let process = builder(&mut context);
self.process = Some(process);
self
}

/// Builds the host with the supplied configuration.
pub fn build(self) -> Host {
if self.transducer.is_none() {
panic!("Transducer is required to build a host");
if self.process.is_none() {
panic!("Process is required to build a host");
}

Host {
name: self.name,
transducer: self.transducer.unwrap(),
process: self.process.unwrap(),
inputs: self.inboxes,
output: self.outboxes,
}
Expand All @@ -269,13 +269,13 @@ impl Fleet {
}
}

/// Adds a new host to the fleet with the given name and transducer.
pub fn add_host<F>(&mut self, name: String, transducer_builder: F) -> &Host
/// Adds a new host to the fleet with the given name and process.
pub fn add_host<F>(&mut self, name: String, process_builder: F) -> &Host
where
F: FnOnce(&mut TransducerBuilderContext) -> Dfir<'static>,
F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
{
let host = HostBuilder::new(name.clone())
.with_transducer(transducer_builder)
.with_process(process_builder)
.build();
assert!(
self.hosts.insert(host.name.clone(), host).is_none(),
Expand Down
Loading
Loading