Skip to content

Commit 9cdfaf8

Browse files
committed
feat(hydro_lang): add API for safely broadcasting a stream across several keys
1 parent e2abfae commit 9cdfaf8

2 files changed

Lines changed: 18 additions & 14 deletions

File tree

hydro_lang/src/live_collections/keyed_singleton.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,19 @@ where
759759
}
760760

761761
impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
762+
pub fn with_identical_values<T2, O2: Ordering, R: Retries>(
763+
self,
764+
other: Stream<T2, Tick<L>, Bounded, O2, R>,
765+
) -> KeyedStream<K, T2, Tick<L>, Bounded, O2, R>
766+
where
767+
K: Clone,
768+
T2: Clone,
769+
{
770+
self.keys().weaken_retries().cross_product_nested_loop(other).into_keyed().assume_ordering(
771+
nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
772+
)
773+
}
774+
762775
/// Asynchronously yields this keyed singleton outside the tick, which will
763776
/// be asynchronously updated with the latest set of entries inside the tick.
764777
///

hydro_lang/src/live_collections/stream/networking.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::location::dynamic::DynLocation;
1818
use crate::location::external_process::ExternalBincodeStream;
1919
use crate::location::tick::NoAtomic;
2020
use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21-
use crate::nondet::{NonDet, nondet};
21+
use crate::nondet::NonDet;
2222
use crate::staging_util::get_this_crate;
2323

2424
// same as the one in `hydro_std`, but internal use only
@@ -185,15 +185,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>
185185
{
186186
let ids = track_membership(self.location.source_cluster_members(other));
187187
let join_tick = self.location.tick();
188-
let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
188+
let current_members = ids.snapshot(&join_tick, nondet_membership);
189189

190190
current_members
191-
.weaker_retries()
192-
.assume_ordering::<TotalOrder>(
193-
nondet!(/** we send to each member independently, order does not matter */),
194-
)
195-
.cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
196-
.weaken_ordering::<O>()
191+
.with_identical_values(self.batch(&join_tick, nondet_membership))
197192
.all_ticks()
198193
.demux_bincode(other)
199194
}
@@ -511,14 +506,10 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>
511506
{
512507
let ids = track_membership(self.location.source_cluster_members(other));
513508
let join_tick = self.location.tick();
514-
let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
509+
let current_members = ids.snapshot(&join_tick, nondet_membership);
515510

516511
current_members
517-
.weaker_retries()
518-
.assume_ordering::<TotalOrder>(
519-
nondet!(/** we send to each member independently, order does not matter */),
520-
)
521-
.cross_product_nested_loop(self.batch(&join_tick, nondet_membership))
512+
.with_identical_values(self.batch(&join_tick, nondet_membership))
522513
.all_ticks()
523514
.demux_bincode(other)
524515
}

0 commit comments

Comments
 (0)