Merge with new hydro operator api

davidchuyaya committed Feb 7, 2025
2 parents 5cfd098 + 41e5bb9 commit 268c085

Showing 123 changed files with 10,662 additions and 2,066 deletions.
20 changes: 11 additions & 9 deletions .github/workflows/build-cli.yml
@@ -49,9 +49,9 @@ jobs:
run: |
pip install hydro_deploy/hydro_cli/dist/${{ env.PACKAGE_NAME }}-*.whl --force-reinstall
- name: "Upload wheels"
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: wheels
name: wheels-${{ matrix.target }}
path: hydro_deploy/hydro_cli/dist

linux:
@@ -92,9 +92,9 @@ jobs:
run: |
pip install hydro_deploy/hydro_cli/dist/${{ env.PACKAGE_NAME }}-*.whl --force-reinstall
- name: "Upload wheels"
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: wheels
name: wheels-${{ matrix.target }}
path: hydro_deploy/hydro_cli/dist

linux-cross:
@@ -121,9 +121,9 @@
manylinux: auto
args: --release --out dist
- name: "Upload wheels"
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: wheels
name: wheels-${{ matrix.platform.target }}
path: hydro_deploy/hydro_cli/dist

windows:
@@ -158,9 +158,9 @@ jobs:
run: |
python -m pip install hydro_deploy/hydro_cli/dist/${{ env.PACKAGE_NAME }}-*.whl --force-reinstall
- name: "Upload wheels"
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: wheels
name: wheels-${{ matrix.platform.target }}
path: hydro_deploy/hydro_cli/dist

release:
@@ -175,7 +175,9 @@
steps:
- uses: actions/download-artifact@v3
with:
name: wheels
pattern: wheels-*
path: wheels
merge-multiple: true
- uses: actions/setup-python@v5
- name: "Publish to PyPi"
env:
25 changes: 12 additions & 13 deletions .vscode/settings.json
@@ -1,17 +1,16 @@
{
"rust-analyzer.runnableEnv": [
{
"env": {
// Stack backtraces.
"RUST_BACKTRACE": "full",
// Set output levels for `tracing` logging.
"RUST_LOG": "debug,hydroflow=trace",
// Make sure all snapshots are written instead of just the first failure.
"INSTA_FORCE_PASS": "1",
"INSTA_UPDATE": "always",
"TRYBUILD": "overwrite",
}
}
"rust-analyzer.runnables.extraEnv": {
// Stack backtraces.
"RUST_BACKTRACE": "full",
// Set output levels for `tracing` logging.
"RUST_LOG": "debug,dfir_rs=trace",
// Make sure all snapshots are written instead of just the first failure.
"INSTA_FORCE_PASS": "1",
"INSTA_UPDATE": "always",
"TRYBUILD": "overwrite",
},
"rust-analyzer.runnables.extraTestBinaryArgs": [
"--nocapture"
],
"rust-analyzer.cargo.features": ["deploy"],
"editor.semanticTokenColorCustomizations": {
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 8 additions & 8 deletions README.md
@@ -2,13 +2,13 @@
<img src="https://raw.githubusercontent.com/hydro-project/hydro/main/docs/static/img/hydro-logo.svg" width="200" alt='"hf"'>
</h1>
<p align="center">
<a href="https://crates.io/crates/hydroflow"><img src="https://img.shields.io/crates/v/hydroflow?style=flat-square&logo=rust" alt="Crates.io"></a>
<a href="https://docs.rs/hydroflow/"><img src="https://img.shields.io/badge/docs.rs-Hydroflow-blue?style=flat-square&logo=read-the-docs&logoColor=white" alt="Docs.rs"></a>
<a href="https://crates.io/crates/hydro_lang"><img src="https://img.shields.io/crates/v/hydro_lang?style=flat-square&logo=rust" alt="Crates.io"></a>
<a href="https://docs.rs/hydro_lang/"><img src="https://img.shields.io/badge/docs.rs-Hydro-blue?style=flat-square&logo=read-the-docs&logoColor=white" alt="Docs.rs"></a>
</p>

Hydro is a novel distributed programming library for standard Rust. Hydro allows developers to build distributed systems that are efficient, scalable, and correct.
Hydro is a high-level distributed programming framework for Rust. Hydro can help you quickly write scalable distributed services that are correct by construction. Much like Rust helps with memory safety, Hydro helps with [**distributed safety**](https://hydro.run/docs/hydro/correctness.md).

Hydro integrates naturally into standard Rust constructs and IDEs, providing types and programming constructs for ensuring distributed correctness. Under the covers it provides a metaprogrammed compiler that optimizes for cross-node issues of scaling and data movement while leveraging Rust and LLVM for per-node performance.
Hydro integrates naturally into standard Rust constructs and IDEs, providing types and programming constructs for ensuring distributed safety. Under the covers it provides a metaprogrammed compiler that optimizes for cross-node issues of scaling and data movement while leveraging Rust and LLVM for per-node performance.

We often describe Hydro via a metaphor: *LLVM for the cloud*. Like LLVM, Hydro is a layered compilation framework with a low-level Internal Representation language. In contrast to LLVM, Hydro focuses on distributed aspects of modern software.

@@ -17,10 +17,10 @@ We often describe Hydro via a metaphor: *LLVM for the cloud*. Like LLVM, Hydro i
</div>


## The Language (and the Low-Level IR)
Hydro provides a [high-level language](https://hydro.run/docs/hydro) that allows you to program an entire fleet of processes from a single program, and then launch your fleet locally or in the cloud via [Hydro Deploy](https://hydro.run/docs/deploy). Get started with Hydro via the language [documentation](https://hydro.run/docs/hydro) and [examples](https://github.com/hydro-project/hydro/tree/main/hydro_test/examples).
## The Hydro API (and the Low-Level IR)
Hydro provides a [high-level API](https://hydro.run/docs/hydro) that allows you to program an entire fleet of processes from a single program, and then launch your fleet locally or in the cloud via [Hydro Deploy](https://hydro.run/docs/deploy). Get started with Hydro via the API [documentation](https://hydro.run/docs/hydro) and [examples](https://github.com/hydro-project/hydro/tree/main/hydro_test/examples).

> Internally, the Hydro stack compiles Hydro programs into a low-level Dataflow Internal Representation (IR) language called [DFIR](https://hydro.run/docs/dfir); each process corresponds to a separate DFIR program. In rare cases you may want to compose one or more processes in DFIR by hand; see the DFIR [documentation](https://hydro.run/docs/dfir) or [examples](https://github.com/hydro-project/hydro/tree/main/dfir_rs/examples) for details.
> Internally, the Hydro stack compiles Hydro programs into a low-level single-threaded DataFlow Internal Representation (IR) language called [DFIR](https://hydro.run/docs/dfir); each Hydro process corresponds to a separate DFIR program. In rare cases you may want to hand-author one or more processes in DFIR; see the DFIR [documentation](https://hydro.run/docs/dfir) or [examples](https://github.com/hydro-project/hydro/tree/main/dfir_rs/examples) for details.
## Development Setup

@@ -32,7 +32,7 @@ There have been many frameworks and platforms for distributed programming over t
**Higher level frameworks** have been designed to serve specialized distributed use cases. These include *Client-Server (Monolith)* frameworks (e.g. Ruby on Rails + DBMS), parallel *Bulk Dataflow* frameworks (e.g. Spark, Flink, etc.), and step-wise *Workflows / Pipelines / Serverless / μservice Orchestration* frameworks (e.g. Kafka, Airflow). All of these frameworks offer limited expressibility and are inefficient outside their sweet spot. Each one ties developers' hands in different ways.

**Lower level asynchronous APIs** provide general-purpose distributed interfaces for sequential programming, including
*RPCs*, *Async/Await* frameworks and *Actor* frameworks (e.g. Akka, Ray, Unison, Orleans, gRPC). These interfaces allow developers to build distributed systems *one async sequential process* at a time. While they offer low-level control of individual processes, they provide minimal help for global correctness of the fleet.
*RPCs*, *Async/Await* frameworks and *Actor* frameworks (e.g. Akka, Ray, Unison, Orleans, gRPC). These interfaces allow developers to build distributed systems *one async sequential process* at a time. While they offer low-level control of individual processes, they provide minimal help with ensuring the global correctness of the fleet.

## Towards a more comprehensive approach
What's wanted, we believe, is a proper language stack addressing distributed concerns:
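For readers skimming this diff, here is a minimal sketch of the high-level API the README describes, modeled on Hydro's "first ten" getting-started example; the `P1` marker struct and the `first_ten` name are illustrative, and exact `hydro_lang` signatures may vary across versions:

```rust
use hydro_lang::*;

// Marker type naming one logical process in the fleet (illustrative name).
pub struct P1 {}

// One Rust function describes the dataflow for a process; Hydro stages the
// `q!`-quoted closures and compiles them into that process's DFIR program.
pub fn first_ten(process: &Process<P1>) {
    let numbers = process.source_iter(q!(0..10)); // stream materialized on P1
    numbers.for_each(q!(|n| println!("{}", n))); // runs on P1 when deployed
}
```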
4 changes: 2 additions & 2 deletions datastores/gossip_kv/kv/membership.rs
@@ -7,8 +7,8 @@ pub type MemberId = String;

/// Information about a member in the cluster.
///
/// A member is a transducer that is part of the cluster. Leaving or failing is a terminal
/// state for a member. When a transducer restarts and rejoins the cluster, it is considered a
/// A member is a process that is part of the cluster. Leaving or failing is a terminal
/// state for a member. When a process restarts and rejoins the cluster, it is considered a
/// new member.
///
/// # Generic Parameters
4 changes: 2 additions & 2 deletions datastores/gossip_kv/kv/server.rs
@@ -105,7 +105,7 @@ where
dfir_syntax! {

on_start = initialize() -> tee();
on_start -> for_each(|_| info!("{:?}: Transducer {} started.", context.current_tick(), member_id_6));
on_start -> for_each(|_| info!("{:?}: Process {} started.", context.current_tick(), member_id_6));

seed_nodes = source_stream(seed_node_stream)
-> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec<SeedNode<Addr>>| {
@@ -274,7 +274,7 @@ where
new_writes -> for_each(|x| trace!("NEW WRITE: {:?}", x));

// Step 1: Put the new writes in a map, with the write as the key and a SetBoundedLattice as the value.
infecting_writes = union() -> state::<'static, MapUnionHashMap<MessageId, InfectingWrite>>();
infecting_writes = union() -> state_by::<'static, MapUnionHashMap<MessageId, InfectingWrite>>(std::convert::identity, std::default::Default::default);

new_writes -> map(|write| {
// Ideally, the write itself is the key, but writes are a hashmap and hashmaps don't
2 changes: 1 addition & 1 deletion datastores/gossip_kv/load_test_server/server.rs
@@ -27,7 +27,7 @@ const UNKNOWN_ADDRESS: LoadTestAddress = 9999999999;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)]
struct Opts {
/// Number of threads to run. Each thread will run an instance of the gossip-kv server transducer.
/// Number of threads to run. Each thread will run an instance of the gossip-kv server process.
#[clap(short, long, default_value = "5")]
thread_count: usize,

2 changes: 1 addition & 1 deletion dfir_datalog/src/lib.rs
@@ -7,7 +7,7 @@ use quote::{quote, ToTokens};
///
/// This uses a variant of Datalog that is similar to [Dedalus](https://www2.eecs.berkeley.edu/Pubs/TechRpts/2009/EECS-2009-173.pdf).
///
/// For examples, see [the datalog tests in the Hydroflow repo](https://github.com/hydro-project/hydro/blob/main/hydroflow/tests/datalog_frontend.rs).
/// For examples, see [the datalog tests in the Hydroflow repo](https://github.com/hydro-project/hydro/blob/main/dfir_rs/tests/datalog_frontend.rs).
// TODO(mingwei): rustdoc examples inline.
#[proc_macro]
pub fn datalog(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
2 changes: 1 addition & 1 deletion dfir_lang/src/graph/ops/join.rs
@@ -7,7 +7,7 @@ use super::{
};
use crate::diagnostic::{Diagnostic, Level};

/// > 2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
/// > 2 input streams of type `<(K, V1)>` and `<(K, V2)>`, 1 output stream of type `<(K, (V1, V2))>`
///
/// Forms the equijoin of the tuples in the input streams by their first (key) attribute. Note that the result nests the 2nd input field (values) into a tuple in the 2nd output field.
///
2 changes: 1 addition & 1 deletion dfir_lang/src/graph/ops/join_fused.rs
@@ -9,7 +9,7 @@ use super::{
};
use crate::diagnostic::{Diagnostic, Level};

/// > 2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
/// > 2 input streams of type `<(K, V1)>` and `<(K, V2)>`, 1 output stream of type `<(K, (V1, V2))>`
///
/// `join_fused` takes two arguments: the configuration options for the left hand side and right hand side inputs respectively.
/// There are three available configuration options: `Reduce`: if the input type is the same as the accumulator type,
2 changes: 1 addition & 1 deletion dfir_lang/src/graph/ops/join_multiset.rs
@@ -5,7 +5,7 @@ use super::{
WriteContextArgs, RANGE_0, RANGE_1,
};

/// > 2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
/// > 2 input streams of type `<(K, V1)>` and `<(K, V2)>`, 1 output stream of type `<(K, (V1, V2))>`
///
/// This operator is equivalent to `join` except that the LHS and RHS are collected into multisets rather than sets before joining.
///
2 changes: 1 addition & 1 deletion dfir_lang/src/graph/ops/lattice_bimorphism.rs
@@ -6,7 +6,7 @@ use super::{
RANGE_0, RANGE_1,
};

/// An operator representing a [lattice bimorphism](https://hydro.run/docs/hydroflow/lattices_crate/lattice_math#lattice-bimorphism).
/// An operator representing a [lattice bimorphism](https://hydro.run/docs/dfir/lattices_crate/lattice_math#lattice-bimorphism).
///
/// > 2 input streams, of type `LhsItem` and `RhsItem`.
///
2 changes: 1 addition & 1 deletion dfir_lang/src/graph/ops/reduce_keyed.rs
@@ -10,7 +10,7 @@ use crate::diagnostic::{Diagnostic, Level};
/// > The output will have one tuple for each distinct `K`, with an accumulated (reduced) value of
/// > type `V`.
///
/// If you need the accumulated value to have a different type than the input, use [`fold_keyed`](#keyed_fold).
/// If you need the accumulated value to have a different type than the input, use [`fold_keyed`](#fold_keyed).
///
/// > Arguments: one Rust closure. The closure takes two arguments: an `&mut` 'accumulator', and
/// > an element. Accumulator should be updated based on the element.
4 changes: 3 additions & 1 deletion dfir_lang/src/graph/ops/state.rs
@@ -19,6 +19,8 @@ use super::{
/// -> map(SetUnionSingletonSet::new_from)
/// -> state::<SetUnionHashSet<usize>>();
/// ```
/// The `state` operator is equivalent to `state_by` used with an identity mapping operator with
/// `Default::default` providing the factory function.
pub const STATE: OperatorConstraints = OperatorConstraints {
name: "state",
categories: &[OperatorCategory::Persistence],
@@ -39,7 +41,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints {
diagnostics| {

let wc = WriteContextArgs {
arguments: &parse_quote_spanned!(op_span => ::std::convert::identity),
arguments: &parse_quote_spanned!(op_span => ::std::convert::identity, ::std::default::Default::default),
..wc.clone()
};

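A quick sketch of the equivalence stated in the doc addition above, written in the same `dfir` surface syntax as the operator's doc examples; it assumes a `dfir_rs` toolchain where `state` desugars to the two-argument `state_by` shown in this prologue:

```rust
use dfir_rs::dfir_syntax;
use lattices::set_union::{SetUnionHashSet, SetUnionSingletonSet};

let mut flow = dfir_syntax! {
    // `state`: the identity mapping and `Default::default` factory are implicit.
    source_iter(0..3)
        -> map(SetUnionSingletonSet::new_from)
        -> state::<SetUnionHashSet<usize>>();

    // `state_by`: the same two arguments spelled out, mirroring the
    // `parse_quote_spanned!` desugaring in the diff above.
    source_iter(0..3)
        -> map(SetUnionSingletonSet::new_from)
        -> state_by::<SetUnionHashSet<usize>>(
            ::std::convert::identity,
            ::std::default::Default::default,
        );
};
flow.run_available();
```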
46 changes: 35 additions & 11 deletions dfir_lang/src/graph/ops/state_by.rs
@@ -6,7 +6,8 @@ use super::{
};
use crate::diagnostic::{Diagnostic, Level};

/// List state operator, but with a closure to map the input to the state lattice.
/// List state operator, but with a closure to map the input to the state lattice and a factory
/// function to initialize the internal data structure.
///
/// The emitted outputs (both the referencable singleton and the optional pass-through stream) are
/// of the same type as the inputs to the state_by operator and are not required to be a lattice
@@ -15,19 +16,37 @@ use crate::diagnostic::{Diagnostic, Level};
/// ```dfir
/// use std::collections::HashSet;
///
///
/// use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
///
/// my_state = source_iter(0..3)
/// -> state_by::<SetUnionHashSet<usize>>(SetUnionSingletonSet::new_from);
/// -> state_by::<SetUnionHashSet<usize>>(SetUnionSingletonSet::new_from, std::default::Default::default);
/// ```
/// The 2nd argument to `state_by` is a factory function that can be used to supply a custom
/// initial value for the backing state. The initial value is still expected to be bottom (and will
/// be checked). This is useful for doing things like pre-allocating buffers, etc. In the above
/// example, it is just using `Default::default()`.
///
/// An example of preallocating the capacity in a hashmap:
///
///```dfir
/// use std::collections::HashSet;
/// use lattices::set_union::{SetUnion, CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
///
/// my_state = source_iter(0..3)
/// -> state_by::<SetUnionHashSet<usize>>(SetUnionSingletonSet::new_from, {|| SetUnion::new(HashSet::<usize>::with_capacity(1_000)) });
///```
///
/// The `state` operator is equivalent to `state_by` used with an identity mapping operator with
/// `Default::default` providing the factory function.
pub const STATE_BY: OperatorConstraints = OperatorConstraints {
name: "state_by",
categories: &[OperatorCategory::Persistence],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: &(0..=1),
soft_range_out: &(0..=1),
num_args: 1,
num_args: 2,
persistence_args: &(0..=1),
type_args: &(0..=1),
is_external_input: false,
@@ -80,19 +99,24 @@ pub const STATE_BY: OperatorConstraints = OperatorConstraints {
_ => unreachable!(),
};


let state_ident = singleton_output_ident;
let mut write_prologue = quote_spanned! {op_span=>
let #state_ident = #hydroflow.add_state(::std::cell::RefCell::new(
<#lattice_type as ::std::default::Default>::default()
));
let factory_fn = &arguments[1];

let mut write_prologue = quote_spanned! { op_span=>
let #state_ident = {
let data_struct : #lattice_type = (#factory_fn)();
::std::debug_assert!(::lattices::IsBot::is_bot(&data_struct));
#hydroflow.add_state(::std::cell::RefCell::new(data_struct))
};
};
if Persistence::Tick == persistence {
write_prologue.extend(quote_spanned! {op_span=>
#hydroflow.set_state_tick_hook(#state_ident, |rcell| { rcell.take(); }); // Resets state to `Default::default()`.
});
}

let func = &arguments[0];
let by_fn = &arguments[0];

// TODO(mingwei): deduplicate codegen
let write_iterator = if is_pull {
@@ -117,7 +141,7 @@ pub const STATE_BY: OperatorConstraints = OperatorConstraints {
#root::lattices::Merge::merge(&mut *state, (mapfn)(::std::clone::Clone::clone(item)))
})
}
check_input::<_, _, _, _, #lattice_type>(#input, #func, #state_ident, #context)
check_input::<_, _, _, _, #lattice_type>(#input, #by_fn, #state_ident, #context)
};
}
} else if let Some(output) = outputs.first() {
@@ -141,7 +165,7 @@ pub const STATE_BY: OperatorConstraints = OperatorConstraints {
#root::lattices::Merge::merge(&mut *state, (mapfn)(::std::clone::Clone::clone(item)))
}, push)
}
check_output::<_, _, _, _, #lattice_type>(#output, #func, #state_ident, #context)
check_output::<_, _, _, _, #lattice_type>(#output, #by_fn, #state_ident, #context)
};
}
} else {
@@ -164,7 +188,7 @@ pub const STATE_BY: OperatorConstraints = OperatorConstraints {
#root::lattices::Merge::merge(&mut *state, (mapfn)(item));
})
}
check_output::<_, _, _, #lattice_type>(#state_ident, #func, #context)
check_output::<_, _, _, #lattice_type>(#state_ident, #by_fn, #context)
};
}
};
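As a standalone illustration of the bottom-check the new `write_prologue` performs on the factory's output, here is a sketch using the `lattices` types already referenced in this diff (`IsBot`, `SetUnion`); the capacity and values are arbitrary:

```rust
use std::collections::HashSet;

use lattices::set_union::SetUnionHashSet;
use lattices::IsBot;

// Like the preallocation example above: an empty set with reserved capacity.
// Capacity does not change the lattice value, so this is still bottom and the
// generated `debug_assert!(IsBot::is_bot(..))` passes.
let preallocated = SetUnionHashSet::<usize>::new(HashSet::with_capacity(1_000));
debug_assert!(IsBot::is_bot(&preallocated));

// A non-empty initial value is not bottom; a factory returning this would
// trip the check in debug builds.
let nonempty = SetUnionHashSet::new(HashSet::from([42usize]));
assert!(!IsBot::is_bot(&nonempty));
```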