Skip to content

Commit 86bae0b

Browse files
committed
feat(hydro_lang): remove NoTick, track consistency in the type system
1 parent 6803fcd commit 86bae0b

46 files changed

Lines changed: 2243 additions & 1626 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/docs/hydro/learn/quickstart/partitioned-counter.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ Before partitioning the counter, you need to make the `keyed_counter_service` fu
2727
<CodeBlock language="rust" title={
2828
<Link href="https://github.com/hydro-project/hydro/tree/main/hydro_test/src/tutorials/keyed_counter.rs">src/keyed_counter.rs</Link>
2929
} showLineNumbers={8}>{getLines(keyedCounterSrc, 8, 14, `
30-
pub fn keyed_counter_service<'a, L: Location<'a> + NoTick>(
30+
pub fn keyed_counter_service<'a, L: Location<'a>>(
3131
increment_requests: KeyedStream<u32, String, L, Unbounded>,
3232
get_requests: KeyedStream<u32, String, L, Unbounded>,
3333
) -> (
3434
KeyedStream<u32, String, L, Unbounded>,
35-
KeyedStream<u32, (String, usize), L, Unbounded, NoOrder>,
35+
KeyedStream<u32, (String, usize), L::NoConsistency, Unbounded, NoOrder>,
3636
) {
3737
`)}</CodeBlock>
3838

39-
The `L: Location<'a> + NoTick` bound means the function works with any location that that is outside a `Tick` (like `Process` or `Cluster`). This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type.
39+
The `L: Location<'a>` bound means the function works with any location. This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type. You'll notice that we also had to modify the location type parameter for the second output stream (the lookup responses), to be `L::NoConsistency`. In addition to tracking determinism, Hydro also tracks whether _eventual consistency_ is guaranteed across several replicas. Now that we will be scaling up the service, Hydro points out that even if the requests are the same across several replicas, the responses may be different because the service uses an asynchronous snapshot of the counter state. Hence, the responses stream is tagged with `L::NoConsistency`.
4040

4141
## Introducing Clusters
4242

hydro_lang/src/compile/built.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ impl<'a> BuiltFlow<'a> {
153153
cluster_max_sizes: SparseSecondaryMap::new(),
154154
externals_port_registry: Default::default(),
155155
test_safety_only: false,
156+
skip_consistency_assertions: false,
156157
_phantom: PhantomData,
157158
}
158159
}

hydro_lang/src/compile/deploy.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
448448
}
449449

450450
pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
451-
let LocationId::Cluster(location_key) = c.id() else {
451+
let LocationId::Cluster(location_key, _) = c.id() else {
452452
panic!("Cluster ID expected")
453453
};
454454
self.clusters.get(location_key).unwrap()
@@ -472,9 +472,16 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
472472
self.location_names
473473
.iter()
474474
.filter_map(|(location_key, location_name)| {
475-
self.clusters
476-
.get(location_key)
477-
.map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
475+
self.clusters.get(location_key).map(|cluster| {
476+
(
477+
LocationId::Cluster(
478+
location_key,
479+
crate::location::dynamic::ClusterConsistency::EventuallyConsistent,
480+
),
481+
&**location_name,
482+
cluster,
483+
)
484+
})
478485
})
479486
}
480487

hydro_lang/src/compile/embedded.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,11 @@ impl<'a> Deploy<'a> for EmbeddedDeploy {
410410
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
411411
{
412412
let at_key = match at_location {
413-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
413+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
414414
_ => panic!("cluster_membership_stream must be called from a process or cluster"),
415415
};
416416
let cluster_key = match location_id {
417-
LocationId::Cluster(key) => *key,
417+
LocationId::Cluster(key, _) => *key,
418418
_ => panic!("cluster_membership_stream target must be a cluster"),
419419
};
420420
let vec = env.membership_streams.entry(at_key).unwrap().or_default();

hydro_lang/src/compile/ir/mod.rs

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,13 @@ pub trait DfirBuilder {
427427
serialize: Option<&DebugExpr>,
428428
tag_id: usize,
429429
);
430+
431+
fn assert_is_consistent(
432+
&mut self,
433+
location: &LocationId,
434+
in_ident: syn::Ident,
435+
out_ident: &syn::Ident,
436+
);
430437
}
431438

432439
#[cfg(feature = "build")]
@@ -661,6 +668,22 @@ impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
661668
);
662669
}
663670
}
671+
672+
fn assert_is_consistent(
673+
&mut self,
674+
location: &LocationId,
675+
in_ident: syn::Ident,
676+
out_ident: &syn::Ident,
677+
) {
678+
let builder = self.get_dfir_mut(location);
679+
builder.add_dfir(
680+
parse_quote! {
681+
#out_ident = #in_ident;
682+
},
683+
None,
684+
None,
685+
);
686+
}
664687
}
665688

666689
#[cfg(feature = "build")]
@@ -721,7 +744,7 @@ impl HydroRoot {
721744
&mut self,
722745
extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723746
seen_tees: &mut SeenSharedNodes,
724-
seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
747+
seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
725748
processes: &SparseSecondaryMap<LocationKey, D::Process>,
726749
clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727750
externals: &SparseSecondaryMap<LocationKey, D::External>,
@@ -815,7 +838,7 @@ impl HydroRoot {
815838
)
816839
}
817840
}
818-
LocationId::Cluster(cluster_key) => {
841+
LocationId::Cluster(cluster_key, _) => {
819842
let from_node = clusters
820843
.get(*cluster_key)
821844
.unwrap_or_else(|| {
@@ -863,7 +886,7 @@ impl HydroRoot {
863886
_ => panic!("Embedded output must have Stream collection kind"),
864887
};
865888
let location_key = match input.metadata().location_id.root() {
866-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
889+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
867890
_ => panic!("Embedded output must be on a process or cluster"),
868891
};
869892
D::register_embedded_output(
@@ -964,7 +987,7 @@ impl HydroRoot {
964987
D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965988
)
966989
}
967-
LocationId::Cluster(cluster_key) => {
990+
LocationId::Cluster(cluster_key, _) => {
968991
let to_node = clusters
969992
.get(*cluster_key)
970993
.unwrap_or_else(|| {
@@ -1010,7 +1033,7 @@ impl HydroRoot {
10101033
_ => panic!("Embedded source must have Stream collection kind"),
10111034
};
10121035
let location_key = match metadata.location_id.root() {
1013-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
1036+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
10141037
_ => panic!("Embedded source must be on a process or cluster"),
10151038
};
10161039
D::register_embedded_stream_input(
@@ -1025,7 +1048,7 @@ impl HydroRoot {
10251048
_ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
10261049
};
10271050
let location_key = match metadata.location_id.root() {
1028-
LocationId::Process(key) | LocationId::Cluster(key) => *key,
1051+
LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
10291052
_ => panic!("EmbeddedSingleton source must be on a process or cluster"),
10301053
};
10311054
D::register_embedded_singleton_input(
@@ -1038,7 +1061,7 @@ impl HydroRoot {
10381061
match state {
10391062
ClusterMembersState::Uninit => {
10401063
let at_location = metadata.location_id.root().clone();
1041-
let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1064+
let key = (at_location.clone(), location_id.key());
10421065
if refcell_seen_cluster_members.borrow_mut().insert(key) {
10431066
// First occurrence: call cluster_membership_stream and mark as Stream.
10441067
let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
@@ -1482,7 +1505,7 @@ fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
14821505
LocationId::Atomic(inner) => {
14831506
remap_location(inner, uf);
14841507
}
1485-
LocationId::Process(_) | LocationId::Cluster(_) => {}
1508+
LocationId::Process(_) | LocationId::Cluster(_, _) => {}
14861509
}
14871510
}
14881511

@@ -2077,6 +2100,11 @@ pub enum HydroNode {
20772100
input: Box<HydroNode>,
20782101
metadata: HydroIrMetadata,
20792102
},
2103+
2104+
AssertIsConsistent {
2105+
inner: Box<HydroNode>,
2106+
metadata: HydroIrMetadata,
2107+
},
20802108
}
20812109

20822110
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
@@ -2165,7 +2193,8 @@ impl HydroNode {
21652193
| HydroNode::BeginAtomic { inner, .. }
21662194
| HydroNode::EndAtomic { inner, .. }
21672195
| HydroNode::Batch { inner, .. }
2168-
| HydroNode::YieldConcat { inner, .. } => {
2196+
| HydroNode::YieldConcat { inner, .. }
2197+
| HydroNode::AssertIsConsistent { inner, .. } => {
21692198
transform(inner.as_mut(), seen_tees);
21702199
}
21712200

@@ -2240,6 +2269,10 @@ impl HydroNode {
22402269
trusted: *trusted,
22412270
metadata: metadata.clone(),
22422271
},
2272+
HydroNode::AssertIsConsistent { inner, metadata } => HydroNode::AssertIsConsistent {
2273+
inner: Box::new(inner.deep_clone(seen_tees)),
2274+
metadata: metadata.clone(),
2275+
},
22432276
HydroNode::Source { source, metadata } => HydroNode::Source {
22442277
source: source.clone(),
22452278
metadata: metadata.clone(),
@@ -2587,6 +2620,30 @@ impl HydroNode {
25872620
// input_ident stays on stack as output
25882621
}
25892622

2623+
HydroNode::AssertIsConsistent { inner, .. } => {
2624+
let inner_ident = ident_stack.pop().unwrap();
2625+
2626+
let out_ident =
2627+
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2628+
2629+
match builders_or_callback {
2630+
BuildersOrCallback::Builders(graph_builders) => {
2631+
graph_builders.assert_is_consistent(
2632+
&inner.metadata().location_id,
2633+
inner_ident,
2634+
&out_ident,
2635+
);
2636+
}
2637+
BuildersOrCallback::Callback(_, node_callback) => {
2638+
node_callback(node, next_stmt_id);
2639+
}
2640+
}
2641+
2642+
*next_stmt_id += 1;
2643+
2644+
ident_stack.push(out_ident);
2645+
}
2646+
25902647
HydroNode::ObserveNonDet {
25912648
inner,
25922649
trusted,
@@ -4034,7 +4091,9 @@ impl HydroNode {
40344091
HydroNode::Placeholder => {
40354092
panic!()
40364093
}
4037-
HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4094+
HydroNode::Cast { .. }
4095+
| HydroNode::ObserveNonDet { .. }
4096+
| HydroNode::AssertIsConsistent { .. } => {}
40384097
HydroNode::Source { source, .. } => match source {
40394098
HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
40404099
HydroSource::ExternalNetwork()
@@ -4119,6 +4178,7 @@ impl HydroNode {
41194178
}
41204179
HydroNode::Cast { metadata, .. } => metadata,
41214180
HydroNode::ObserveNonDet { metadata, .. } => metadata,
4181+
HydroNode::AssertIsConsistent { metadata, .. } => metadata,
41224182
HydroNode::Source { metadata, .. } => metadata,
41234183
HydroNode::SingletonSource { metadata, .. } => metadata,
41244184
HydroNode::CycleSource { metadata, .. } => metadata,
@@ -4172,6 +4232,7 @@ impl HydroNode {
41724232
}
41734233
HydroNode::Cast { metadata, .. } => metadata,
41744234
HydroNode::ObserveNonDet { metadata, .. } => metadata,
4235+
HydroNode::AssertIsConsistent { metadata, .. } => metadata,
41754236
HydroNode::Source { metadata, .. } => metadata,
41764237
HydroNode::SingletonSource { metadata, .. } => metadata,
41774238
HydroNode::CycleSource { metadata, .. } => metadata,
@@ -4233,7 +4294,8 @@ impl HydroNode {
42334294
| HydroNode::YieldConcat { inner, .. }
42344295
| HydroNode::BeginAtomic { inner, .. }
42354296
| HydroNode::EndAtomic { inner, .. }
4236-
| HydroNode::Batch { inner, .. } => {
4297+
| HydroNode::Batch { inner, .. }
4298+
| HydroNode::AssertIsConsistent { inner, .. } => {
42374299
vec![inner]
42384300
}
42394301
HydroNode::Chain { first, second, .. } => {
@@ -4307,6 +4369,7 @@ impl HydroNode {
43074369
}
43084370
HydroNode::Cast { .. } => "Cast()".to_owned(),
43094371
HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4372+
HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
43104373
HydroNode::Source { source, .. } => format!("Source({:?})", source),
43114374
HydroNode::SingletonSource {
43124375
value,
@@ -4433,7 +4496,7 @@ where
44334496
D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
44344497
)
44354498
}
4436-
(&LocationId::Process(from), &LocationId::Cluster(to)) => {
4499+
(&LocationId::Process(from), &LocationId::Cluster(to, _)) => {
44374500
let from_node = processes
44384501
.get(from)
44394502
.unwrap_or_else(|| {
@@ -4463,7 +4526,7 @@ where
44634526
D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
44644527
)
44654528
}
4466-
(&LocationId::Cluster(from), &LocationId::Process(to)) => {
4529+
(&LocationId::Cluster(from, _), &LocationId::Process(to)) => {
44674530
let from_node = clusters
44684531
.get(from)
44694532
.unwrap_or_else(|| {
@@ -4493,7 +4556,7 @@ where
44934556
D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
44944557
)
44954558
}
4496-
(&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4559+
(&LocationId::Cluster(from, _), &LocationId::Cluster(to, _)) => {
44974560
let from_node = clusters
44984561
.get(from)
44994562
.unwrap_or_else(|| {

hydro_lang/src/deploy/deploy_graph_containerized_ecs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ impl EcsDeploy {
376376
}
377377

378378
for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
379-
let LocationId::Cluster(raw_id) = location_id else {
379+
let LocationId::Cluster(raw_id, _) = location_id else {
380380
unreachable!()
381381
};
382382
let task_family_prefix = cluster.name.clone();

hydro_lang/src/deploy/maelstrom/deploy_runtime_maelstrom.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::live_collections::keyed_stream::KeyedStream;
2121
use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
2222
use crate::location::dynamic::LocationId;
2323
use crate::location::member_id::TaglessMemberId;
24-
use crate::location::{Cluster, LocationKey, MembershipEvent, NoTick};
24+
use crate::location::{Cluster, LocationKey, MembershipEvent};
2525
use crate::nondet::nondet;
2626

2727
/// Maelstrom message envelope structure.

hydro_lang/src/deploy/maelstrom/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::de::DeserializeOwned;
66
use crate::forward_handle::ForwardHandle;
77
use crate::live_collections::KeyedStream;
88
use crate::live_collections::stream::TotalOrder;
9-
use crate::location::{Cluster, NoTick};
9+
use crate::location::Cluster;
1010
use crate::nondet::nondet;
1111

1212
#[cfg(stageleft_runtime)]
@@ -39,10 +39,7 @@ pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
3939
) -> (
4040
KeyedStream<String, In, Cluster<'a, C>>,
4141
ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42-
)
43-
where
44-
Cluster<'a, C>: NoTick,
45-
{
42+
) {
4643
use stageleft::q;
4744

4845
use crate::location::Location;

hydro_lang/src/forward_handle.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ where
5050
{
5151
type Location: Location<'a>;
5252

53+
fn location(&self) -> &Self::Location;
54+
5355
fn create_source_with_initial(
5456
cycle_id: CycleId,
5557
initial: Self,

hydro_lang/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub mod prelude {
5858
pub use crate::location::{Cluster, External, Location as _, Process, Tick};
5959
pub use crate::networking::TCP;
6060
pub use crate::nondet::{NonDet, nondet};
61-
pub use crate::properties::{ManualProof, manual_proof};
61+
pub use crate::properties::{ConsistencyProof, ManualProof, manual_proof};
6262

6363
/// A macro to set up a Hydro crate.
6464
#[macro_export]

0 commit comments

Comments
 (0)