Skip to content

Commit 369b860

Browse files
committed
feat(hydro_lang): track cross-cluster-member consistency in the type system
1 parent 6803fcd commit 369b860

45 files changed

Lines changed: 1912 additions & 1437 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/docs/hydro/learn/quickstart/partitioned-counter.mdx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Before partitioning the counter, you need to make the `keyed_counter_service` fu
2727
<CodeBlock language="rust" title={
2828
<Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/keyed_counter.rs">src/keyed_counter.rs</Link>
2929
} showLineNumbers={8}>{getLines(keyedCounterSrc, 8, 14, `
30-
pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
30+
pub fn keyed_counter_service<'a, L: Location<'a>>(
3131
increment_requests: KeyedStream<u32, String, L, Unbounded>,
3232
get_requests: KeyedStream<u32, String, L, Unbounded>,
3333
) -> (
@@ -36,7 +36,7 @@ pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
3636
) {
3737
`)}</CodeBlock>
3838

39-
The `L: Location<'a> + NoTick` bound means the function works with any location that that is outside a `Tick` (like `Process` or `Cluster`). This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type.
39+
The `L: Location<'a>` bound means the function works with any location. This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type.
4040

4141
## Introducing Clusters
4242

hydro_lang/src/compile/built.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ impl<'a> BuiltFlow<'a> {
153153
cluster_max_sizes: SparseSecondaryMap::new(),
154154
externals_port_registry: Default::default(),
155155
test_safety_only: false,
156+
skip_consistency_assertions: false,
156157
_phantom: PhantomData,
157158
}
158159
}

hydro_lang/src/compile/deploy.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
448448
}
449449

450450
pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
451-
let LocationId::Cluster(location_key) = c.id() else {
451+
let LocationId::Cluster(location_key, _) = c.id() else {
452452
panic!("Cluster ID expected")
453453
};
454454
self.clusters.get(location_key).unwrap()
@@ -472,9 +472,16 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
472472
self.location_names
473473
.iter()
474474
.filter_map(|(location_key, location_name)| {
475-
self.clusters
476-
.get(location_key)
477-
.map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
475+
self.clusters.get(location_key).map(|cluster| {
476+
(
477+
LocationId::Cluster(
478+
location_key,
479+
crate::location::dynamic::ClusterConsistency::EventuallyConsistent,
480+
),
481+
&**location_name,
482+
cluster,
483+
)
484+
})
478485
})
479486
}
480487

hydro_lang/src/compile/embedded.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,11 @@ impl<'a> Deploy<'a> for EmbeddedDeploy {
410410
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
411411
{
412412
let at_key = match at_location {
413-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
413+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
414414
_ => panic!("cluster_membership_stream must be called from a process or cluster"),
415415
};
416416
let cluster_key = match location_id {
417-
LocationId::Cluster(key) => *key,
417+
LocationId::Cluster(key, _) => *key,
418418
_ => panic!("cluster_membership_stream target must be a cluster"),
419419
};
420420
let vec = env.membership_streams.entry(at_key).unwrap().or_default();

hydro_lang/src/compile/ir/mod.rs

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,13 @@ pub trait DfirBuilder {
427427
serialize: Option<&DebugExpr>,
428428
tag_id: usize,
429429
);
430+
431+
fn assert_is_consistent(
432+
&mut self,
433+
location: &LocationId,
434+
in_ident: syn::Ident,
435+
out_ident: &syn::Ident,
436+
);
430437
}
431438

432439
#[cfg(feature = "build")]
@@ -661,6 +668,22 @@ impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
661668
);
662669
}
663670
}
671+
672+
fn assert_is_consistent(
673+
&mut self,
674+
location: &LocationId,
675+
in_ident: syn::Ident,
676+
out_ident: &syn::Ident,
677+
) {
678+
let builder = self.get_dfir_mut(location);
679+
builder.add_dfir(
680+
parse_quote! {
681+
#out_ident = #in_ident;
682+
},
683+
None,
684+
None,
685+
);
686+
}
664687
}
665688

666689
#[cfg(feature = "build")]
@@ -721,7 +744,7 @@ impl HydroRoot {
721744
&mut self,
722745
extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723746
seen_tees: &mut SeenSharedNodes,
724-
seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
747+
seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
725748
processes: &SparseSecondaryMap<LocationKey, D::Process>,
726749
clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727750
externals: &SparseSecondaryMap<LocationKey, D::External>,
@@ -815,7 +838,7 @@ impl HydroRoot {
815838
)
816839
}
817840
}
818-
LocationId::Cluster(cluster_key) => {
841+
LocationId::Cluster(cluster_key, _) => {
819842
let from_node = clusters
820843
.get(*cluster_key)
821844
.unwrap_or_else(|| {
@@ -863,7 +886,7 @@ impl HydroRoot {
863886
_ => panic!("Embedded output must have Stream collection kind"),
864887
};
865888
let location_key = match input.metadata().location_id.root() {
866-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
889+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
867890
_ => panic!("Embedded output must be on a process or cluster"),
868891
};
869892
D::register_embedded_output(
@@ -964,7 +987,7 @@ impl HydroRoot {
964987
D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965988
)
966989
}
967-
LocationId::Cluster(cluster_key) => {
990+
LocationId::Cluster(cluster_key, _) => {
968991
let to_node = clusters
969992
.get(*cluster_key)
970993
.unwrap_or_else(|| {
@@ -1010,7 +1033,7 @@ impl HydroRoot {
10101033
_ => panic!("Embedded source must have Stream collection kind"),
10111034
};
10121035
let location_key = match metadata.location_id.root() {
1013-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
1036+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
10141037
_ => panic!("Embedded source must be on a process or cluster"),
10151038
};
10161039
D::register_embedded_stream_input(
@@ -1025,7 +1048,7 @@ impl HydroRoot {
10251048
_ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
10261049
};
10271050
let location_key = match metadata.location_id.root() {
1028-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
1051+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
10291052
_ => panic!("EmbeddedSingleton source must be on a process or cluster"),
10301053
};
10311054
D::register_embedded_singleton_input(
@@ -1038,7 +1061,7 @@ impl HydroRoot {
10381061
match state {
10391062
ClusterMembersState::Uninit => {
10401063
let at_location = metadata.location_id.root().clone();
1041-
let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1064+
let key = (at_location.clone(), location_id.key());
10421065
if refcell_seen_cluster_members.borrow_mut().insert(key) {
10431066
// First occurrence: call cluster_membership_stream and mark as Stream.
10441067
let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
@@ -1482,7 +1505,7 @@ fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
14821505
LocationId::Atomic(inner) => {
14831506
remap_location(inner, uf);
14841507
}
1485-
LocationId::Process(_) | LocationId::Cluster(_) => {}
1508+
LocationId::Process(_) | LocationId::Cluster(_, _) => {}
14861509
}
14871510
}
14881511

@@ -2077,6 +2100,11 @@ pub enum HydroNode {
20772100
input: Box<HydroNode>,
20782101
metadata: HydroIrMetadata,
20792102
},
2103+
2104+
AssertIsConsistent {
2105+
inner: Box<HydroNode>,
2106+
metadata: HydroIrMetadata,
2107+
},
20802108
}
20812109

20822110
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
@@ -2165,7 +2193,8 @@ impl HydroNode {
21652193
| HydroNode::BeginAtomic { inner, .. }
21662194
| HydroNode::EndAtomic { inner, .. }
21672195
| HydroNode::Batch { inner, .. }
2168-
| HydroNode::YieldConcat { inner, .. } => {
2196+
| HydroNode::YieldConcat { inner, .. }
2197+
| HydroNode::AssertIsConsistent { inner, .. } => {
21692198
transform(inner.as_mut(), seen_tees);
21702199
}
21712200

@@ -2240,6 +2269,10 @@ impl HydroNode {
22402269
trusted: *trusted,
22412270
metadata: metadata.clone(),
22422271
},
2272+
HydroNode::AssertIsConsistent { inner, metadata } => HydroNode::AssertIsConsistent {
2273+
inner: Box::new(inner.deep_clone(seen_tees)),
2274+
metadata: metadata.clone(),
2275+
},
22432276
HydroNode::Source { source, metadata } => HydroNode::Source {
22442277
source: source.clone(),
22452278
metadata: metadata.clone(),
@@ -2587,6 +2620,30 @@ impl HydroNode {
25872620
// input_ident stays on stack as output
25882621
}
25892622

2623+
HydroNode::AssertIsConsistent { inner, .. } => {
2624+
let inner_ident = ident_stack.pop().unwrap();
2625+
2626+
let out_ident =
2627+
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2628+
2629+
match builders_or_callback {
2630+
BuildersOrCallback::Builders(graph_builders) => {
2631+
graph_builders.assert_is_consistent(
2632+
&inner.metadata().location_id,
2633+
inner_ident,
2634+
&out_ident,
2635+
);
2636+
}
2637+
BuildersOrCallback::Callback(_, node_callback) => {
2638+
node_callback(node, next_stmt_id);
2639+
}
2640+
}
2641+
2642+
*next_stmt_id += 1;
2643+
2644+
ident_stack.push(out_ident);
2645+
}
2646+
25902647
HydroNode::ObserveNonDet {
25912648
inner,
25922649
trusted,
@@ -4034,7 +4091,9 @@ impl HydroNode {
40344091
HydroNode::Placeholder => {
40354092
panic!()
40364093
}
4037-
HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4094+
HydroNode::Cast { .. }
4095+
| HydroNode::ObserveNonDet { .. }
4096+
| HydroNode::AssertIsConsistent { .. } => {}
40384097
HydroNode::Source { source, .. } => match source {
40394098
HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
40404099
HydroSource::ExternalNetwork()
@@ -4119,6 +4178,7 @@ impl HydroNode {
41194178
}
41204179
HydroNode::Cast { metadata, .. } => metadata,
41214180
HydroNode::ObserveNonDet { metadata, .. } => metadata,
4181+
HydroNode::AssertIsConsistent { metadata, .. } => metadata,
41224182
HydroNode::Source { metadata, .. } => metadata,
41234183
HydroNode::SingletonSource { metadata, .. } => metadata,
41244184
HydroNode::CycleSource { metadata, .. } => metadata,
@@ -4172,6 +4232,7 @@ impl HydroNode {
41724232
}
41734233
HydroNode::Cast { metadata, .. } => metadata,
41744234
HydroNode::ObserveNonDet { metadata, .. } => metadata,
4235+
HydroNode::AssertIsConsistent { metadata, .. } => metadata,
41754236
HydroNode::Source { metadata, .. } => metadata,
41764237
HydroNode::SingletonSource { metadata, .. } => metadata,
41774238
HydroNode::CycleSource { metadata, .. } => metadata,
@@ -4233,7 +4294,8 @@ impl HydroNode {
42334294
| HydroNode::YieldConcat { inner, .. }
42344295
| HydroNode::BeginAtomic { inner, .. }
42354296
| HydroNode::EndAtomic { inner, .. }
4236-
| HydroNode::Batch { inner, .. } => {
4297+
| HydroNode::Batch { inner, .. }
4298+
| HydroNode::AssertIsConsistent { inner, .. } => {
42374299
vec![inner]
42384300
}
42394301
HydroNode::Chain { first, second, .. } => {
@@ -4307,6 +4369,7 @@ impl HydroNode {
43074369
}
43084370
HydroNode::Cast { .. } => "Cast()".to_owned(),
43094371
HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4372+
HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
43104373
HydroNode::Source { source, .. } => format!("Source({:?})", source),
43114374
HydroNode::SingletonSource {
43124375
value,
@@ -4433,7 +4496,7 @@ where
44334496
D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
44344497
)
44354498
}
4436-
(&LocationId::Process(from), &LocationId::Cluster(to)) => {
4499+
(&LocationId::Process(from), &LocationId::Cluster(to, _)) => {
44374500
let from_node = processes
44384501
.get(from)
44394502
.unwrap_or_else(|| {
@@ -4463,7 +4526,7 @@ where
44634526
D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
44644527
)
44654528
}
4466-
(&LocationId::Cluster(from), &LocationId::Process(to)) => {
4529+
(&LocationId::Cluster(from, _), &LocationId::Process(to)) => {
44674530
let from_node = clusters
44684531
.get(from)
44694532
.unwrap_or_else(|| {
@@ -4493,7 +4556,7 @@ where
44934556
D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
44944557
)
44954558
}
4496-
(&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4559+
(&LocationId::Cluster(from, _), &LocationId::Cluster(to, _)) => {
44974560
let from_node = clusters
44984561
.get(from)
44994562
.unwrap_or_else(|| {

hydro_lang/src/deploy/deploy_graph_containerized_ecs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ impl EcsDeploy {
376376
}
377377

378378
for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
379-
let LocationId::Cluster(raw_id) = location_id else {
379+
let LocationId::Cluster(raw_id, _) = location_id else {
380380
unreachable!()
381381
};
382382
let task_family_prefix = cluster.name.clone();

hydro_lang/src/deploy/maelstrom/deploy_runtime_maelstrom.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::live_collections::keyed_stream::KeyedStream;
2121
use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
2222
use crate::location::dynamic::LocationId;
2323
use crate::location::member_id::TaglessMemberId;
24-
use crate::location::{Cluster, LocationKey, MembershipEvent, NoTick};
24+
use crate::location::{Cluster, LocationKey, MembershipEvent};
2525
use crate::nondet::nondet;
2626

2727
/// Maelstrom message envelope structure.

hydro_lang/src/deploy/maelstrom/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::de::DeserializeOwned;
66
use crate::forward_handle::ForwardHandle;
77
use crate::live_collections::KeyedStream;
88
use crate::live_collections::stream::TotalOrder;
9-
use crate::location::{Cluster, NoTick};
9+
use crate::location::Cluster;
1010
use crate::nondet::nondet;
1111

1212
#[cfg(stageleft_runtime)]
@@ -39,10 +39,7 @@ pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
3939
) -> (
4040
KeyedStream<String, In, Cluster<'a, C>>,
4141
ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42-
)
43-
where
44-
Cluster<'a, C>: NoTick,
45-
{
42+
) {
4643
use stageleft::q;
4744

4845
use crate::location::Location;

hydro_lang/src/forward_handle.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ where
5050
{
5151
type Location: Location<'a>;
5252

53+
fn location(&self) -> &Self::Location;
54+
5355
fn create_source_with_initial(
5456
cycle_id: CycleId,
5557
initial: Self,

hydro_lang/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub mod prelude {
5858
pub use crate::location::{Cluster, External, Location as _, Process, Tick};
5959
pub use crate::networking::TCP;
6060
pub use crate::nondet::{NonDet, nondet};
61-
pub use crate::properties::{ManualProof, manual_proof};
61+
pub use crate::properties::{ConsistencyProof, ManualProof, manual_proof};
6262

6363
/// A macro to set up a Hydro crate.
6464
#[macro_export]

0 commit comments

Comments
 (0)