feat(hydro_lang)!: remove NoTick, track consistency in the type system#2796
feat(hydro_lang)!: remove NoTick, track consistency in the type system#2796
NoTick, track consistency in the type system#2796Conversation
Deploying hydro with
|
| Latest commit: |
644d810
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://1d3bbf67.hydroflow.pages.dev |
| Branch Preview URL: | https://type-system-consistency.hydroflow.pages.dev |
369b860 to
ec60d0a
Compare
NoTick, track consistency in the type system
fdeca67 to
86bae0b
Compare
033fef1 to
9b55c6d
Compare
c052afc to
675bfcd
Compare
675bfcd to
644d810
Compare
|
Does this make the error ux for nested ticks (not currently supported) worse? |
MingweiSamuel
left a comment
There was a problem hiding this comment.
I think we should have a trait for top level (slightly different from NoTick) to prevent accidental source stream calls (anything else?) from being called within a tick
| @@ -400,6 +434,11 @@ pub trait Location<'a>: dynamic::DynLocation { | |||
| /// This is useful for implementing protocols that need to track cluster membership, | |||
| /// such as broadcasting to all members or detecting failures. | |||
| /// | |||
| /// # Non-Determinism | |||
| /// This stream is non-deterministic because the timing of membership events, for example | |||
| /// if a node leaves, the membership event may not be received if the node left before the | |||
| /// stream was created. | |||
| /// | |||
| /// # Example | |||
| /// ```rust | |||
| /// # #[cfg(feature = "deploy")] { | |||
| @@ -410,7 +449,7 @@ pub trait Location<'a>: dynamic::DynLocation { | |||
| /// let workers: Cluster<()> = flow.cluster::<()>(); | |||
| /// # // do nothing on each worker | |||
| /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {})); | |||
| /// let cluster_members = p1.source_cluster_members(&workers); | |||
| /// let cluster_members = p1.source_cluster_members(&workers, nondet!(/** late joiners may miss events */)); | |||
| /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode()) | |||
| /// // if there are 4 members in the cluster, we would see a join event for each | |||
| /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... } | |||
| @@ -427,21 +466,77 @@ pub trait Location<'a>: dynamic::DynLocation { | |||
| fn source_cluster_members<C: 'a>( | |||
| &self, | |||
| cluster: &Cluster<'a, C>, | |||
| ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded> | |||
| nondet_start: NonDet, | |||
| ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::NoConsistency, Unbounded> | |||
| where | |||
| Self: Sized + NoTick, | |||
| Self: Sized, | |||
| { | |||
| self.source_cluster_membership_stream(cluster, nondet_start) | |||
| } | |||
There was a problem hiding this comment.
Not much point in marking this #[deprecated] when it has a immediately breaking change, recommend just removing it, or maybe just make it the body panic if you're worried about discoverability?
|
|
||
| fn assert_is_consistent( | ||
| &mut self, | ||
| location: &LocationId, | ||
| in_ident: syn::Ident, | ||
| out_ident: &syn::Ident, | ||
| ); |
There was a problem hiding this comment.
What is this for? Add doc comment
| /// For nested locations like [`Tick`], this returns the root location that contains it. | ||
| fn root(&self) -> Self::Root; | ||
|
|
||
| /// This location with consistency guarantees dropped for the live collection |
There was a problem hiding this comment.
| /// This location with consistency guarantees dropped for the live collection | |
| /// This location but with consistency guarantees dropped for the live collection |
There was a problem hiding this comment.
Pull request overview
This PR removes the NoTick marker trait and replaces its compile-time restrictions with staging-time checks, while also introducing a new “consistency” dimension in the type system for cluster locations (tracked both as a Cluster type parameter and in LocationId::Cluster). It updates Hydro APIs and tests/docs accordingly, and adds an IR node (AssertIsConsistent) plus simulator handling hooks for the new consistency assertions.
Changes:
- Remove
NoTickfrom location APIs and enforce “no nested ticks / no top-level sources inside ticks” via staging-time panics. - Add consistency tracking to cluster locations (
Cluster<..., Con>,LocationId::Cluster(key, consistency)) and propagate it through live collection APIs viaL::NoConsistency,drop_consistency(), andassert_has_consistency_of(...). - Update membership APIs (
source_cluster_membership_stream) to requirenondet!and update tests, snapshots, and docs.
Reviewed changes
Copilot reviewed 46 out of 48 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| template/hydro/.prompts/challenge-keyed-counter.md | Updates tutorial prompt for new Location bounds and L::NoConsistency output typing. |
| hydro_test/src/tutorials/keyed_counter_non_atomic.rs | Removes NoTick bound and adjusts return location typing to L::NoConsistency. |
| hydro_test/src/tutorials/keyed_counter.rs | Removes NoTick bound and adjusts return location typing to L::NoConsistency. |
| hydro_test/src/embedded/m2m_broadcast.rs | Migrates to source_cluster_membership_stream(..., nondet!(...)). |
| hydro_test/src/cluster/snapshots/two_pc_ir.snap | Updates expected IR LocationId::Cluster formatting to include consistency. |
| hydro_test/src/cluster/snapshots/simple_cluster_ir.snap | Updates expected IR LocationId::Cluster formatting to include consistency. |
| hydro_test/src/cluster/snapshots/map_reduce_ir.snap | Updates expected IR LocationId::Cluster formatting to include consistency. |
| hydro_test/src/cluster/snapshots/many_to_many_ir.snap | Updates expected IR LocationId::Cluster formatting to include consistency. |
| hydro_test/src/cluster/snapshots/compute_pi_ir.snap | Updates expected IR LocationId::Cluster formatting to include consistency. |
| hydro_test/src/cluster/simple_cluster.rs | Migrates to source_cluster_membership_stream(..., nondet!(...)). |
| hydro_test/src/cluster/paxos.rs | Removes NoTick bound and adjusts tick stream location typing to Tick<L::NoConsistency>. |
| hydro_test/src/cluster/kv_replica/sequence_payloads.rs | Uses drop_consistency() to satisfy new tick/cycle typing and consistency weakening. |
| hydro_std/src/request_response.rs | Removes NoTick bound and returns L::NoConsistency stream. |
| hydro_std/src/quorum.rs | Removes NoTick bound and adds consistency assertions via assert_has_consistency_of(...). |
| hydro_std/src/compartmentalize.rs | Removes NoTick from trait bounds. |
| hydro_lang/src/viz/render.rs | Adds rendering support for HydroNode::AssertIsConsistent. |
| hydro_lang/src/sim/tests/trophies/snapshots/trace_snapshot.snap | Updates snapshot line numbers after code changes. |
| hydro_lang/src/sim/tests/trophies/sequence_payloads.rs | Removes NoTick and adjusts cycles via drop_consistency(). |
| hydro_lang/src/sim/flow.rs | Adds skip_consistency_assertions option and builder method. |
| hydro_lang/src/sim/builder.rs | Implements assert_is_consistent DFIR emission (no-op or panic depending on sim config). |
| hydro_lang/src/properties/mod.rs | Introduces sealed ConsistencyProof trait and implements it for ManualProof. |
| hydro_lang/src/location/tick.rs | Removes NoTick, adds consistency plumbing to Tick/Atomic, and updates DynLocation method name to dyn_id. |
| hydro_lang/src/location/process.rs | Implements new consistency-related Location trait requirements for Process. |
| hydro_lang/src/location/mod.rs | Adds NoConsistency associated type + consistency APIs; replaces NoTick enforcement with staging-time panics; updates source APIs to return Self::NoConsistency. |
| hydro_lang/src/location/dynamic.rs | Adds ClusterConsistency and extends LocationId::Cluster to include consistency; renames DynLocation::id to dyn_id. |
| hydro_lang/src/location/cluster.rs | Adds Consistency marker trait and consistency type parameter to Cluster; updates Location impl accordingly. |
| hydro_lang/src/live_collections/stream/networking.rs | Switches to source_cluster_membership_stream and adjusts demux return location typing; imports nondet. |
| hydro_lang/src/live_collections/stream/mod.rs | Adds consistency weakening/strengthening APIs; propagates L::NoConsistency through various ops; updates batching signatures. |
| hydro_lang/src/live_collections/sliced/style.rs | Updates slicing/state helpers to operate with dropped-consistency tick locations. |
| hydro_lang/src/live_collections/sliced/mod.rs | Changes Slicable::get_location to return an owned L and updates related bounds. |
| hydro_lang/src/live_collections/singleton.rs | Adds consistency weaken/assert APIs; updates batching/snapshot signatures to Tick<L::NoConsistency>. |
| hydro_lang/src/live_collections/optional.rs | Adds consistency weaken/assert APIs; updates batching/snapshot signatures to Tick<L::NoConsistency>. |
| hydro_lang/src/live_collections/keyed_stream/networking.rs | Adjusts demux return location typing to Cluster<..., NoConsistency>. |
| hydro_lang/src/live_collections/keyed_stream/mod.rs | Adds consistency weaken/assert APIs; propagates L::NoConsistency through assume/batch APIs. |
| hydro_lang/src/live_collections/keyed_singleton.rs | Adds consistency weaken/assert APIs; updates snapshot/batch signatures to Tick<L::NoConsistency>. |
| hydro_lang/src/live_collections/batch_atomic.rs | Makes BatchAtomic lifetime-parametric and returns batched collections in Tick<L::NoConsistency>. |
| hydro_lang/src/lib.rs | Re-exports ConsistencyProof in the prelude. |
| hydro_lang/src/forward_handle.rs | Extends CycleCollectionWithInitial with a required location() accessor. |
| hydro_lang/src/deploy/maelstrom/mod.rs | Removes NoTick constraint from maelstrom_bidi_clients signature. |
| hydro_lang/src/deploy/maelstrom/deploy_runtime_maelstrom.rs | Removes NoTick import/usage. |
| hydro_lang/src/deploy/deploy_graph_containerized_ecs.rs | Updates LocationId::Cluster pattern match to include consistency. |
| hydro_lang/src/compile/ir/mod.rs | Adds HydroNode::AssertIsConsistent and DFIR builder plumbing; updates cluster-member tracking keying and LocationId matches. |
| hydro_lang/src/compile/embedded.rs | Updates LocationId::Cluster matches for embedded membership stream sourcing. |
| hydro_lang/src/compile/deploy.rs | Updates cluster ID extraction and (currently) fabricates LocationId::Cluster in get_all_clusters(). |
| hydro_lang/src/compile/built.rs | Initializes skip_consistency_assertions default in SimFlow construction. |
| docs/docs/hydro/reference/locations/clusters.md | Updates docs to source_cluster_membership_stream(..., nondet!(...)). |
| docs/docs/hydro/learn/quickstart/partitioned-counter.mdx | Updates quickstart to remove NoTick and explain L::NoConsistency in scaled settings. |
Comments suppressed due to low confidence (10)
hydro_lang/src/live_collections/optional.rs:1324
snapshot()returns an optional located attick.drop_consistency(), but builds theHydroNode::Batchmetadata fromtick(not the dropped-consistency tick). If the outer location’s consistency is stronger thanNoConsistency, this can desynchronizemetadata.location_idfrom the returned optional’s location. Generate the metadata from the dropped-consistency tick location.
Optional::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
},
hydro_lang/src/live_collections/keyed_singleton.rs:1436
KeyedSingleton::snapshot_atomic()returns a keyed singleton located attick.drop_consistency(), but theHydroNode::Batchmetadata is still generated fromtick. When the tick’s outer location has a stronger consistency thanNoConsistency, this makesmetadata.location_iddiverge from the returned collection’s location. Generate metadata from the dropped-consistency tick location used for the output.
KeyedSingleton::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
},
hydro_lang/src/live_collections/stream/mod.rs:1959
batch()returns a stream located attick.drop_consistency(), but theHydroNode::Batchmetadata is generated fromtick(not the dropped-consistency tick). This can makemetadata.location_iddisagree with the returned collection’s location, which can misplace nodes in downstream builders/snapshots when consistency differs. Generate the metadata from the same (dropped-consistency) tick location used for the output stream.
Stream::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
},
hydro_lang/src/live_collections/stream/mod.rs:2908
batch_atomic()constructs the output stream attick.drop_consistency(), but buildsHydroNode::Batchmetadata fromtickinstead of the dropped-consistency tick. If the tick’s outer location has a stronger consistency thanNoConsistency, the node metadata’slocation_idcan diverge from the returned collection location. Use the dropped-consistency tick location when generating the node metadata.
Stream::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
},
hydro_lang/src/live_collections/keyed_stream/mod.rs:2619
KeyedStream::batch()returns a keyed stream ontick.drop_consistency(), but theHydroNode::Batchmetadata is generated fromtick(with original consistency). This can causemetadata.location_idto disagree with the returned stream’s location when consistency differs, which may break graph partitioning/building. Generate the metadata from the same dropped-consistency tick used for the output location.
KeyedStream::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick.new_node_metadata(
KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
),
hydro_lang/src/live_collections/keyed_stream/mod.rs:2755
KeyedStream::batch_atomic()returns a keyed stream located attick.drop_consistency(), but theHydroNode::Batchmetadata is still generated fromtick. If the input tick location carries stronger consistency thanNoConsistency, this will makemetadata.location_iddisagree with the returned keyed stream’s location. Use the dropped-consistency tick for metadata generation as well.
KeyedStream::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick.new_node_metadata(
KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
),
},
hydro_lang/src/live_collections/singleton.rs:1092
snapshot_atomic()constructs the output singleton attick.drop_consistency(), but builds theHydroNode::Batchmetadata fromtick(original consistency). This can make the node metadata’slocation_idinconsistent with the returned singleton’s location when consistency differs. Generate metadata from the dropped-consistency tick location used for the output.
Singleton::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
},
hydro_lang/src/live_collections/keyed_singleton.rs:1410
KeyedSingleton::snapshot()returns a keyed singleton located attick.drop_consistency(), but theHydroNode::Batchmetadata is generated fromtick(original consistency). This can lead tometadata.location_iddisagreeing with the returned collection’s location when consistency differs. Generate metadata from the dropped-consistency tick location used for the output.
KeyedSingleton::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
},
hydro_lang/src/live_collections/singleton.rs:1162
snapshot()returns a singleton located attick.drop_consistency(), but theHydroNode::Batchmetadata is generated fromtickrather than the dropped-consistency tick. If the tick’s outer location has non-NoConsistencyconsistency, this can cause a mismatch between the returned singleton location andmetadata.location_id. Generate metadata from the dropped-consistency tick location.
Singleton::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
},
hydro_lang/src/live_collections/optional.rs:1254
snapshot_atomic()returns an optional located attick.drop_consistency(), but constructsHydroNode::Batchmetadata fromtick(original consistency). This can makemetadata.location_iddisagree with the returned optional’s location when consistency differs, which can break graph partitioning/building. Generate metadata from the dropped-consistency tick location instead.
Optional::new(
tick.drop_consistency(),
HydroNode::Batch {
inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
metadata: tick
.new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
},
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> { | ||
| self.location_names | ||
| .iter() | ||
| .filter_map(|(location_key, location_name)| { | ||
| self.clusters | ||
| .get(location_key) | ||
| .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster)) | ||
| self.clusters.get(location_key).map(|cluster| { | ||
| ( | ||
| LocationId::Cluster( | ||
| location_key, | ||
| crate::location::dynamic::ClusterConsistency::EventuallyConsistent, | ||
| ), | ||
| &**location_name, | ||
| cluster, | ||
| ) | ||
| }) |
There was a problem hiding this comment.
get_all_clusters() fabricates a LocationId::Cluster(.., EventuallyConsistent) for every cluster, which can misrepresent the cluster’s actual consistency parameter and can break equality/keying if callers compare LocationIds. Prefer returning a LocationId that matches the underlying cluster location ID (or use NoConsistency if this iterator is meant to return a raw cluster root ID).
| let ids = track_membership(self.location.source_cluster_members(to)); | ||
| let ids = track_membership(self.location.source_cluster_membership_stream( | ||
| to, | ||
| nondet!(/** droppped prefixes don't affect broadcast */), |
There was a problem hiding this comment.
Typo in the non-determinism rationale: "droppped prefixes" has an extra "p".
| nondet!(/** droppped prefixes don't affect broadcast */), | |
| nondet!(/** dropped prefixes don't affect broadcast */), |
| /// Location type with consistency guarantees dropped for the live collection on it. | ||
| type NoConsistency: Location<'a, NoConsistency = Self::NoConsistency>; |
There was a problem hiding this comment.
nit: Rename to DropConsistency to match method / clearer verbiage :)
| .clone() | ||
| .drop_consistency() |
There was a problem hiding this comment.
Doesn't .drop_consistency() do the cloning itself?
| let (r_highest_seq_complete_cycle, r_highest_seq) = replica_tick | ||
| .clone() | ||
| .drop_consistency() | ||
| .cycle::<Optional<usize, Tick<L::NoConsistency>, _>>( |
| /// A marker trait for levels of consistency that can be guaranteed for a live collection placed | ||
| /// across members of a cluster. | ||
| pub trait Consistency { | ||
| /// Gets the runtime enum variant associated with this consistency level. | ||
| fn consistency() -> ClusterConsistency; | ||
| } |
There was a problem hiding this comment.
Should we later do a big refactor to get some consistent (lol) naming conventions for these?
ConsistencyMarker
ConsistencyEnum
ConsistencyType
Or something
| /// Helper trait for live collections which can be batched back into a tick from a matching | ||
| /// atomic region. Used in [`super::Stream::across_ticks`] | ||
| pub trait BatchAtomic { | ||
| pub trait BatchAtomic<'a> { | ||
| /// The type of the stream when returned to the tick. |
There was a problem hiding this comment.
Whats the 'a for? To ensure early binding? (<- Idk what that means)
| parse_quote! { | ||
| #out_ident = #in_ident; | ||
| }, | ||
| None, | ||
| None, |
There was a problem hiding this comment.
How does this magical dfir assignment result in consistency being asserted?
|
Have skimmed the entire diff now |
There was a problem hiding this comment.
This is awesome.
High level comments on polish:
- to avoid increasing the steepness of the learning curve, what can we do to make consistency checking completely ignorable for the beginner?
- I'm concerned that we can't track consistency across network boundaries.
- some of the diagnostics we're providing are ambiguous (use of Cast in the IR for weakening, identifying unnecessary batching as a loss of consistency)
| } else { | ||
| KeyedSingleton::new( | ||
| self.location.drop_consistency(), | ||
| HydroNode::Cast { |
There was a problem hiding this comment.
This Cast is less helpful than the explicit AssertIsConsistent we introduce in the other direction. It'd be nicer for IR analysis etc to have an explicit WeakenConsistency, or for Casts to have an extra field to explain their nature.
| } | ||
|
|
||
| #[doc(hidden)] | ||
| fn make_from_nondet(l2: Self::NoConsistency) -> Self; |
There was a problem hiding this comment.
maybe rename to from_no_consistency
| let ids = track_membership(self.location.source_cluster_members(to)); | ||
| let ids = track_membership(self.location.source_cluster_membership_stream( | ||
| to, | ||
| nondet!(/** droppped prefixes don't affect broadcast */), |
There was a problem hiding this comment.
nit: type droppped -> dropped.
| impl Consistency for EventualConsistency { | ||
| fn consistency() -> ClusterConsistency { | ||
| ClusterConsistency::EventuallyConsistent | ||
| } |
There was a problem hiding this comment.
Arguably annoying that we switch terminology from noun EventualConsistency to adjective EventuallyConsistent -- have to use the right form in the right place. It parses as English better this way but slightly annoying for autocomplete etc to have 2 different words for the same thing.
There was a problem hiding this comment.
Also as discussed this is subtly different than common uses of Eventual Consistency -- more general (also covers Sequential Consistency for TotalOrder), and also in a context without practical quiescence. Might benefit us to use a term that isn't overloaded, like FutureConsistent.
| `)}</CodeBlock> | ||
|
|
||
| The `L: Location<'a> + NoTick` bound means the function works with any location that that is outside a `Tick` (like `Process` or `Cluster`). This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type. | ||
| The `L: Location<'a>` bound means the function works with any location. This lets you reuse the same counter logic whether it runs on a single process or several cluster members. The rest of the function body remains unchanged, it already works with any location type. You'll notice that we also had to modify the location type parameter for the second output stream (the lookup responses), to be `L::NoConsistency`. In addition to tracking determinism, Hydro also tracks whether _eventual consistency_ is guaranteed across several replicas. Now that we will be scaling up the service, Hydro points out that even if the requests are the same across several replicas, the responses may be different because the service uses an asynchronous snapshot of the counter state. Hence, the responses stream is tagged with `L::NoConsistency`. |
There was a problem hiding this comment.
We should have a pointer here to a more involved doc on consistency in Hydro, and some stub documentation page for that content. I.e. acknowledge that this is an insufficient comment for real understanding, and more help is coming. We should also file an issue to write that doc.
As a subtlety -- we're doing something a bit different than what eventual consistency usually implies: in the TotalOrder case we're doing a version of sequential consistency, and there's per-key sequential consistency we can show as well.
There was a problem hiding this comment.
Final question is whether we should use some blunt instrument (e.g. a feature flag, a DontCare setting) for turning off/on consistency analysis, so that beginners can avoid thinking about it. My understanding is that Rust can't do defaults for these properties on our behalf, but maybe there's a clean way to make it opt-in complexity?
| responses: Stream<(K, V), L, Unbounded, NoOrder>, | ||
| metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>, | ||
| ) -> JoinResponses<K, M, V, L> { | ||
| ) -> JoinResponses<K, M, V, L::NoConsistency> { |
There was a problem hiding this comment.
Is NoConsistency necessary? I'd guess that it should be generic and inherited from elsewhere in the flow. I can definitely imagine that req/resp patterns should be eventually consistent in the set domain.
| replica_tick.cycle::<Stream<SequencedKv<K, V>, Tick<L>, Bounded>>(); | ||
| let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick | ||
| .clone() | ||
| .drop_consistency() |
There was a problem hiding this comment.
similar question to enumerate. why is payload sequencing inherently inconsistent if the input is TotalOrder? Are we explicitly shuffling through the buffers? If so why?
| p_max_slot: Optional<usize, Tick<L>, Bounded>, | ||
| c_to_proposers: Stream<P, Tick<L>, Bounded>, | ||
| ) -> Stream<(usize, P), Tick<L>, Bounded> { | ||
| ) -> Stream<(usize, P), Tick<L::NoConsistency>, Bounded> { |
There was a problem hiding this comment.
ideally I'd love to see as few constraints in the implementation as possible, just the consistency constraints at the edges, and see how the consistency freedom/requirements/inheritance works out. I wonder if this label could be made generic in that spirit.
| .source_cluster_members(&cluster) | ||
| .source_cluster_membership_stream( | ||
| &cluster, | ||
| nondet!(/** example designed for a static cluster */), |
There was a problem hiding this comment.
consider using closed_broadcast now that it's merged in.
| input | ||
| .location() | ||
| .source_cluster_members(dst) | ||
| .source_cluster_membership_stream(dst, nondet!(/** test */)) |
There was a problem hiding this comment.
closed_broadcast instead?
Breaking Changes:
NoTick, enforcement of no nested ticks is now handled at staging timeClusterand a field toLocationId::Clusterthat tracks the consistency guarantee of the live collection at that locationLocation::source_cluster_membership_stream(renamed + deprecated fromLocation::source_cluster_members) now requires anondet!since late-joiners will see a non-deterministic suffix of the stream. In a follow up PR, we will replace the now-deprecatedsource_cluster_membersto instead offer an API that aggregates into aKeyedSingleton, which will be guaranteed deterministic and consistent