Skip to content

Commit c052afc

Browse files
committed
feat(hydro_lang): remove NoTick, track consistency in the type system
1 parent 33dfda8 commit c052afc

48 files changed

Lines changed: 2292 additions & 1633 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/docs/hydro/learn/quickstart/partitioned-counter.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ Before partitioning the counter, you need to make the `keyed_counter_service` fu
2727
<CodeBlock language="rust" title={
2828
<Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/keyed_counter.rs">src/keyed_counter.rs</Link>
2929
} showLineNumbers={8}>{getLines(keyedCounterSrc, 8, 14, `
30-
pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
30+
pub fn keyed_counter_service<'a, L: Location<'a>>(
3131
increment_requests: KeyedStream<u32, String, L, Unbounded>,
3232
get_requests: KeyedStream<u32, String, L, Unbounded>,
3333
) -> (
3434
KeyedStream<u32, String, L, Unbounded>,
35-
KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
35+
KeyedStream<u32, (String, usize), L::NoConsistency, Unbounded, NoOrder>,
3636
) {
3737
`)}</CodeBlock>
3838

39-
The `L: Location<'a> + NoTick` bound means the function works with any location that that is outside a `Tick` (like `Process` or `Cluster`). This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type.
39+
The `L: Location<'a>` bound means the function works with any location. This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type. You'll notice that we also had to modify the location type parameter for the second output stream (the lookup responses), to be `L::NoConsistency`. In addition to tracking determinism, Hydro also tracks whether _eventual consistency_ is guaranteed across several replicas. Now that we will be scaling up the service, Hydro points out that even if the requests are the same across several replicas, the responses may be different because the service uses an asynchronous snapshot of the counter state. Hence, the responses stream is tagged with `L::NoConsistency`.
4040

4141
## Introducing Clusters
4242

docs/docs/hydro/reference/locations/clusters.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ on_worker.send(&p2, TCP.fail_stop().bincode())
124124
# }));
125125
```
126126

127-
This API requires a [non-determinism guard](../live-collections/determinism.md#unsafe-operations-in-hydro), because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the `broadcast` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `source_cluster_members` method to get a stream of membership events (cluster members joining or leaving):
127+
This API requires a [non-determinism guard](../live-collections/determinism.md#unsafe-operations-in-hydro), because the set of cluster members may asynchronously change over time. Depending on when we are notified of membership changes, we will broadcast to different members. Under the hood, the `broadcast` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `source_cluster_membership_stream` method to get a stream of membership events (cluster members joining or leaving):
128128

129129
```rust
130130
# use hydro_lang::prelude::*;
@@ -134,7 +134,7 @@ let p1 = flow.process::<()>();
134134
let workers: Cluster<()> = flow.cluster::<()>();
135135
# // do nothing on each worker
136136
# workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
137-
let cluster_members = p1.source_cluster_members(&workers);
137+
let cluster_members = p1.source_cluster_membership_stream(&workers, nondet!(/** late joiners may miss events */));
138138
# cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
139139
// if there are 4 members in the cluster, we would see a join event for each
140140
// { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }

hydro_lang/src/compile/built.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ impl<'a> BuiltFlow<'a> {
153153
cluster_max_sizes: SparseSecondaryMap::new(),
154154
externals_port_registry: Default::default(),
155155
test_safety_only: false,
156+
skip_consistency_assertions: false,
156157
_phantom: PhantomData,
157158
}
158159
}

hydro_lang/src/compile/deploy.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
448448
}
449449

450450
pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
451-
let LocationId::Cluster(location_key) = c.id() else {
451+
let LocationId::Cluster(location_key, _) = c.id() else {
452452
panic!("Cluster ID expected")
453453
};
454454
self.clusters.get(location_key).unwrap()
@@ -472,9 +472,16 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
472472
self.location_names
473473
.iter()
474474
.filter_map(|(location_key, location_name)| {
475-
self.clusters
476-
.get(location_key)
477-
.map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
475+
self.clusters.get(location_key).map(|cluster| {
476+
(
477+
LocationId::Cluster(
478+
location_key,
479+
crate::location::dynamic::ClusterConsistency::EventuallyConsistent,
480+
),
481+
&**location_name,
482+
cluster,
483+
)
484+
})
478485
})
479486
}
480487

hydro_lang/src/compile/embedded.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,11 @@ impl<'a> Deploy<'a> for EmbeddedDeploy {
410410
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
411411
{
412412
let at_key = match at_location {
413-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
413+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
414414
_ => panic!("cluster_membership_stream must be called from a process or cluster"),
415415
};
416416
let cluster_key = match location_id {
417-
LocationId::Cluster(key) => *key,
417+
LocationId::Cluster(key, _) => *key,
418418
_ => panic!("cluster_membership_stream target must be a cluster"),
419419
};
420420
let vec = env.membership_streams.entry(at_key).unwrap().or_default();

hydro_lang/src/compile/ir/mod.rs

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,13 @@ pub trait DfirBuilder {
427427
serialize: Option<&DebugExpr>,
428428
tag_id: usize,
429429
);
430+
431+
fn assert_is_consistent(
432+
&mut self,
433+
location: &LocationId,
434+
in_ident: syn::Ident,
435+
out_ident: &syn::Ident,
436+
);
430437
}
431438

432439
#[cfg(feature = "build")]
@@ -661,6 +668,22 @@ impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
661668
);
662669
}
663670
}
671+
672+
fn assert_is_consistent(
673+
&mut self,
674+
location: &LocationId,
675+
in_ident: syn::Ident,
676+
out_ident: &syn::Ident,
677+
) {
678+
let builder = self.get_dfir_mut(location);
679+
builder.add_dfir(
680+
parse_quote! {
681+
#out_ident = #in_ident;
682+
},
683+
None,
684+
None,
685+
);
686+
}
664687
}
665688

666689
#[cfg(feature = "build")]
@@ -721,7 +744,7 @@ impl HydroRoot {
721744
&mut self,
722745
extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723746
seen_tees: &mut SeenSharedNodes,
724-
seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
747+
seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
725748
processes: &SparseSecondaryMap<LocationKey, D::Process>,
726749
clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727750
externals: &SparseSecondaryMap<LocationKey, D::External>,
@@ -815,7 +838,7 @@ impl HydroRoot {
815838
)
816839
}
817840
}
818-
LocationId::Cluster(cluster_key) => {
841+
LocationId::Cluster(cluster_key, _) => {
819842
let from_node = clusters
820843
.get(*cluster_key)
821844
.unwrap_or_else(|| {
@@ -863,7 +886,7 @@ impl HydroRoot {
863886
_ => panic!("Embedded output must have Stream collection kind"),
864887
};
865888
let location_key = match input.metadata().location_id.root() {
866-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
889+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
867890
_ => panic!("Embedded output must be on a process or cluster"),
868891
};
869892
D::register_embedded_output(
@@ -964,7 +987,7 @@ impl HydroRoot {
964987
D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965988
)
966989
}
967-
LocationId::Cluster(cluster_key) => {
990+
LocationId::Cluster(cluster_key, _) => {
968991
let to_node = clusters
969992
.get(*cluster_key)
970993
.unwrap_or_else(|| {
@@ -1010,7 +1033,7 @@ impl HydroRoot {
10101033
_ => panic!("Embedded source must have Stream collection kind"),
10111034
};
10121035
let location_key = match metadata.location_id.root() {
1013-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
1036+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
10141037
_ => panic!("Embedded source must be on a process or cluster"),
10151038
};
10161039
D::register_embedded_stream_input(
@@ -1025,7 +1048,7 @@ impl HydroRoot {
10251048
_ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
10261049
};
10271050
let location_key = match metadata.location_id.root() {
1028-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
1051+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
10291052
_ => panic!("EmbeddedSingleton source must be on a process or cluster"),
10301053
};
10311054
D::register_embedded_singleton_input(
@@ -1038,7 +1061,7 @@ impl HydroRoot {
10381061
match state {
10391062
ClusterMembersState::Uninit => {
10401063
let at_location = metadata.location_id.root().clone();
1041-
let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1064+
let key = (at_location.clone(), location_id.key());
10421065
if refcell_seen_cluster_members.borrow_mut().insert(key) {
10431066
// First occurrence: call cluster_membership_stream and mark as Stream.
10441067
let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
@@ -1482,7 +1505,7 @@ fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
14821505
LocationId::Atomic(inner) => {
14831506
remap_location(inner, uf);
14841507
}
1485-
LocationId::Process(_) | LocationId::Cluster(_) => {}
1508+
LocationId::Process(_) | LocationId::Cluster(_, _) => {}
14861509
}
14871510
}
14881511

@@ -2086,6 +2109,11 @@ pub enum HydroNode {
20862109
input: Box<HydroNode>,
20872110
metadata: HydroIrMetadata,
20882111
},
2112+
2113+
AssertIsConsistent {
2114+
inner: Box<HydroNode>,
2115+
metadata: HydroIrMetadata,
2116+
},
20892117
}
20902118

20912119
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
@@ -2174,7 +2202,8 @@ impl HydroNode {
21742202
| HydroNode::BeginAtomic { inner, .. }
21752203
| HydroNode::EndAtomic { inner, .. }
21762204
| HydroNode::Batch { inner, .. }
2177-
| HydroNode::YieldConcat { inner, .. } => {
2205+
| HydroNode::YieldConcat { inner, .. }
2206+
| HydroNode::AssertIsConsistent { inner, .. } => {
21782207
transform(inner.as_mut(), seen_tees);
21792208
}
21802209

@@ -2250,6 +2279,10 @@ impl HydroNode {
22502279
trusted: *trusted,
22512280
metadata: metadata.clone(),
22522281
},
2282+
HydroNode::AssertIsConsistent { inner, metadata } => HydroNode::AssertIsConsistent {
2283+
inner: Box::new(inner.deep_clone(seen_tees)),
2284+
metadata: metadata.clone(),
2285+
},
22532286
HydroNode::Source { source, metadata } => HydroNode::Source {
22542287
source: source.clone(),
22552288
metadata: metadata.clone(),
@@ -2606,6 +2639,30 @@ impl HydroNode {
26062639
// input_ident stays on stack as output
26072640
}
26082641

2642+
HydroNode::AssertIsConsistent { inner, .. } => {
2643+
let inner_ident = ident_stack.pop().unwrap();
2644+
2645+
let out_ident =
2646+
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2647+
2648+
match builders_or_callback {
2649+
BuildersOrCallback::Builders(graph_builders) => {
2650+
graph_builders.assert_is_consistent(
2651+
&inner.metadata().location_id,
2652+
inner_ident,
2653+
&out_ident,
2654+
);
2655+
}
2656+
BuildersOrCallback::Callback(_, node_callback) => {
2657+
node_callback(node, next_stmt_id);
2658+
}
2659+
}
2660+
2661+
*next_stmt_id += 1;
2662+
2663+
ident_stack.push(out_ident);
2664+
}
2665+
26092666
HydroNode::ObserveNonDet {
26102667
inner,
26112668
trusted,
@@ -4099,7 +4156,9 @@ impl HydroNode {
40994156
HydroNode::Placeholder => {
41004157
panic!()
41014158
}
4102-
HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4159+
HydroNode::Cast { .. }
4160+
| HydroNode::ObserveNonDet { .. }
4161+
| HydroNode::AssertIsConsistent { .. } => {}
41034162
HydroNode::Source { source, .. } => match source {
41044163
HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
41054164
HydroSource::ExternalNetwork()
@@ -4185,6 +4244,7 @@ impl HydroNode {
41854244
}
41864245
HydroNode::Cast { metadata, .. } => metadata,
41874246
HydroNode::ObserveNonDet { metadata, .. } => metadata,
4247+
HydroNode::AssertIsConsistent { metadata, .. } => metadata,
41884248
HydroNode::Source { metadata, .. } => metadata,
41894249
HydroNode::SingletonSource { metadata, .. } => metadata,
41904250
HydroNode::CycleSource { metadata, .. } => metadata,
@@ -4239,6 +4299,7 @@ impl HydroNode {
42394299
}
42404300
HydroNode::Cast { metadata, .. } => metadata,
42414301
HydroNode::ObserveNonDet { metadata, .. } => metadata,
4302+
HydroNode::AssertIsConsistent { metadata, .. } => metadata,
42424303
HydroNode::Source { metadata, .. } => metadata,
42434304
HydroNode::SingletonSource { metadata, .. } => metadata,
42444305
HydroNode::CycleSource { metadata, .. } => metadata,
@@ -4301,7 +4362,8 @@ impl HydroNode {
43014362
| HydroNode::YieldConcat { inner, .. }
43024363
| HydroNode::BeginAtomic { inner, .. }
43034364
| HydroNode::EndAtomic { inner, .. }
4304-
| HydroNode::Batch { inner, .. } => {
4365+
| HydroNode::Batch { inner, .. }
4366+
| HydroNode::AssertIsConsistent { inner, .. } => {
43054367
vec![inner]
43064368
}
43074369
HydroNode::Chain { first, second, .. } => {
@@ -4376,6 +4438,7 @@ impl HydroNode {
43764438
}
43774439
HydroNode::Cast { .. } => "Cast()".to_owned(),
43784440
HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4441+
HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
43794442
HydroNode::Source { source, .. } => format!("Source({:?})", source),
43804443
HydroNode::SingletonSource {
43814444
value,
@@ -4505,7 +4568,7 @@ where
45054568
D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
45064569
)
45074570
}
4508-
(&LocationId::Process(from), &LocationId::Cluster(to)) => {
4571+
(&LocationId::Process(from), &LocationId::Cluster(to, _)) => {
45094572
let from_node = processes
45104573
.get(from)
45114574
.unwrap_or_else(|| {
@@ -4535,7 +4598,7 @@ where
45354598
D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
45364599
)
45374600
}
4538-
(&LocationId::Cluster(from), &LocationId::Process(to)) => {
4601+
(&LocationId::Cluster(from, _), &LocationId::Process(to)) => {
45394602
let from_node = clusters
45404603
.get(from)
45414604
.unwrap_or_else(|| {
@@ -4565,7 +4628,7 @@ where
45654628
D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
45664629
)
45674630
}
4568-
(&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4631+
(&LocationId::Cluster(from, _), &LocationId::Cluster(to, _)) => {
45694632
let from_node = clusters
45704633
.get(from)
45714634
.unwrap_or_else(|| {

hydro_lang/src/deploy/deploy_graph_containerized_ecs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ impl EcsDeploy {
376376
}
377377

378378
for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
379-
let LocationId::Cluster(raw_id) = location_id else {
379+
let LocationId::Cluster(raw_id, _) = location_id else {
380380
unreachable!()
381381
};
382382
let task_family_prefix = cluster.name.clone();

hydro_lang/src/deploy/maelstrom/deploy_runtime_maelstrom.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::live_collections::keyed_stream::KeyedStream;
2121
use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
2222
use crate::location::dynamic::LocationId;
2323
use crate::location::member_id::TaglessMemberId;
24-
use crate::location::{Cluster, LocationKey, MembershipEvent, NoTick};
24+
use crate::location::{Cluster, LocationKey, MembershipEvent};
2525
use crate::nondet::nondet;
2626

2727
/// Maelstrom message envelope structure.

hydro_lang/src/deploy/maelstrom/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::de::DeserializeOwned;
66
use crate::forward_handle::ForwardHandle;
77
use crate::live_collections::KeyedStream;
88
use crate::live_collections::stream::TotalOrder;
9-
use crate::location::{Cluster, NoTick};
9+
use crate::location::Cluster;
1010
use crate::nondet::nondet;
1111

1212
#[cfg(stageleft_runtime)]
@@ -39,10 +39,7 @@ pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
3939
) -> (
4040
KeyedStream<String, In, Cluster<'a, C>>,
4141
ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42-
)
43-
where
44-
Cluster<'a, C>: NoTick,
45-
{
42+
) {
4643
use stageleft::q;
4744

4845
use crate::location::Location;

hydro_lang/src/forward_handle.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ where
5050
{
5151
type Location: Location<'a>;
5252

53+
fn location(&self) -> &Self::Location;
54+
5355
fn create_source_with_initial(
5456
cycle_id: CycleId,
5557
initial: Self,

0 commit comments

Comments
 (0)