Skip to content

Commit 33dfda8

Browse files
jhellerstein (Joe Hellerstein) and shadaj
authored
feat(hydro_lang): smart join that preserves ordering when right side is bounded (#2778)
## Summary

When the right-hand side of a `join` is `Bounded`, the join now accumulates the right side into a hash table and streams the left side through, **preserving the left side's ordering**. When both sides are `Unbounded`, the existing symmetric hash join (`NoOrder` output) is used unchanged.

## Motivation

Closes #2761. This enables coordination-free programs that join an ordered stream with a bounded lookup table to retain their ordering guarantee through the join, which is critical for proving sequential consistency in the coordination analysis.

## Design

A new sealed `JoinBoundedness` trait on the `Boundedness` marker types determines the output ordering at the type level:

- `Bounded` right → output preserves the left side's ordering (`TotalOrder`/`NoOrder`)
- `Unbounded` right → output is `NoOrder` (backward compatible)

The `Stream::join` signature changes from requiring both sides to have the same boundedness `B` to accepting a right side with any `B2: Boundedness + JoinBoundedness<O>`. Existing code where both sides have the same boundedness continues to compile unchanged.
## Implementation layers

| Layer | Change |
|-------|--------|
| **DFIR operator** | New `join_multiset_half` — asymmetric hash join with stratum-delayed `build` port and streaming `probe` port |
| **IR** | New `JoinBounded` variant in `HydroNode` |
| **IR lowering** | `JoinBounded` lowers to `join_multiset_half` (like `AntiJoin` → `anti_join`) |
| **Stream API** | `Stream::join` and `KeyedStream::join_keyed_stream` accept different boundedness on right side |
| **KeyedStream** | `join_keyed_singleton` now uses `JoinBounded` since singleton is always bounded |

## Tests

- 6 DFIR-level runtime tests for `join_multiset_half` (basic, ordering, no-match, multi-value, persistence, regression for persistence arg mapping)
- 4 compile-time type tests verifying output ordering guarantees
- All existing snapshot tests updated (paxos, two_pc)

---------

Co-authored-by: Joe Hellerstein <jmhwork@amazon.com>
Co-authored-by: Shadaj Laddad <shadaj@users.noreply.github.com>
1 parent 34893c2 commit 33dfda8

16 files changed

Lines changed: 815 additions & 127 deletions

File tree

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
use quote::{ToTokens, quote_spanned};
2+
use syn::parse_quote;
3+
4+
use super::{
5+
DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, PortIndexValue, RANGE_0,
6+
RANGE_1, WriteContextArgs,
7+
};
8+
use crate::graph::ops::Persistence;
9+
10+
/// > 2 input streams of type `<(K, V1)>` (build) and `<(K, V2)>` (probe),
11+
/// > with output type `<(K, (V2, V1))>`
12+
///
13+
/// An asymmetric hash join where the `build` side is accumulated first
14+
/// (stratum-delayed) and then the `probe` side streams through, emitting
15+
/// matches. This preserves the probe side's arrival order.
16+
///
/// For each `probe` item, one output is emitted per `build` value stored under
/// the same key; `probe` items whose key has no `build` values emit nothing.
///
/// Up to two persistence arguments are accepted: the first is applied to the
/// `build` state and the second to the `probe` side.
///
17+
/// ```dfir
18+
/// source_iter(vec![("cat", 'x'), ("dog", 'y')]) -> [build]my_join;
19+
/// source_iter(vec![("cat", 1), ("dog", 2), ("cat", 3)]) -> [probe]my_join;
20+
/// my_join = join_multiset_half()
21+
/// -> assert_eq([("cat", (1, 'x')), ("dog", (2, 'y')), ("cat", (3, 'x'))]);
22+
/// ```
23+
pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints {
24+
name: "join_multiset_half",
25+
categories: &[OperatorCategory::MultiIn],
// Exactly two inputs are required (hard and soft ranges are both `2..=2`).
26+
hard_range_inn: &(2..=2),
27+
soft_range_inn: &(2..=2),
28+
hard_range_out: RANGE_1,
29+
soft_range_out: RANGE_1,
30+
num_args: 0,
// Zero to two persistence arguments may be supplied.
31+
persistence_args: &(0..=2),
32+
type_args: RANGE_0,
33+
is_external_input: false,
34+
has_singleton_output: false,
35+
flo_type: None,
// Input ports are named, in this order: `build` (index 0), `probe` (index 1).
36+
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { build, probe })),
37+
ports_out: None,
// Only the port whose path stringifies to "build" is stratum-delayed;
// every other port index gets no delay.
38+
input_delaytype_fn: |idx| match idx {
39+
PortIndexValue::Path(path) if "build" == path.to_token_stream().to_string() => {
40+
Some(DelayType::Stratum)
41+
}
42+
_else => None,
43+
},
// Generates the operator's code: the state-declaring prologue, the
// lifespan hooks that run after it, and the pull-side iterator.
44+
write_fn: |wc @ &WriteContextArgs {
45+
root,
46+
context,
47+
df_ident,
48+
op_span,
49+
work_fn_async,
50+
ident,
51+
is_pull,
52+
inputs,
53+
..
54+
},
55+
diagnostics| {
// The generated code below only handles the pull side.
56+
assert!(is_pull);
57+
58+
let persistences: [_; 2] = wc.persistence_args_disallow_mutable(diagnostics);
59+
// Identifiers for the two state handles and for their `RefCell` borrows
// inside the generated code.
60+
let probe_ident = wc.make_ident("probe");
61+
let build_ident = wc.make_ident("build");
62+
let probe_borrow = wc.make_ident("probe_borrow");
63+
let build_borrow = wc.make_ident("build_borrow");
64+
65+
// persistences[0] = build (first port), persistences[1] = probe (second port)
// The probe side only gets its own state when it persists (`Loop`/`Static`).
// NOTE(review): `Mutable` is presumably rejected earlier by
// `persistence_args_disallow_mutable`, hence `unreachable!()` — confirm.
66+
let probe_persist = match persistences[1] {
67+
Persistence::None | Persistence::Tick => false,
68+
Persistence::Loop | Persistence::Static => true,
69+
Persistence::Mutable => unreachable!(),
70+
};
71+
// Prologue for a persisted probe side: a `Vec` wrapped in a `RefCell`,
// registered as operator state. `None` when the probe does not persist.
72+
let write_prologue_probe = probe_persist.then(|| {
73+
quote_spanned! {op_span=>
74+
let #probe_ident = #df_ident.add_state(std::cell::RefCell::new(
75+
::std::vec::Vec::new()
76+
));
77+
}
78+
});
// If the probe persists and its persistence maps to a state lifespan,
// register a hook that clears the probe vec.
79+
let write_prologue_after_probe = probe_persist.then(|| wc
80+
.persistence_as_state_lifespan(persistences[1])
81+
.map(|lifespan| quote_spanned! {op_span=>
82+
#[allow(clippy::redundant_closure_call)]
83+
#df_ident.set_state_lifespan_hook(
84+
#probe_ident, #lifespan, move |rcell| { rcell.borrow_mut().clear(); },
85+
);
86+
})).flatten();
87+
// Build-side state is always created: an `FxHashMap` in a `RefCell`
// (filled with key -> `Vec` of build values by `accum_build` below).
88+
let write_prologue_build = quote_spanned! {op_span=>
89+
let #build_ident = #df_ident.add_state(std::cell::RefCell::new(
90+
#root::rustc_hash::FxHashMap::default()
91+
));
92+
};
// Register a hook clearing the build map when the build persistence maps to
// a state lifespan; otherwise emit nothing.
93+
let write_prologue_after_build = wc
94+
.persistence_as_state_lifespan(persistences[0])
95+
.map(|lifespan| quote_spanned! {op_span=>
96+
#[allow(clippy::redundant_closure_call)]
97+
#df_ident.set_state_lifespan_hook(
98+
#build_ident, #lifespan, move |rcell| { rcell.borrow_mut().clear(); },
99+
);
100+
}).unwrap_or_default();
101+
102+
let input_build = &inputs[0]; // build before probe (stratum-delayed comes first)
103+
let input_probe = &inputs[1];
104+
// Shared by both branches below: drain the whole build input into the hash
// map, appending each value to the `Vec` for its key, and await completion
// before any probing happens.
105+
let accum_build = quote_spanned! {op_span=>
106+
let fut = #root::dfir_pipes::pull::Pull::for_each(#input_build, |(k, v)| {
107+
#build_borrow.entry(k).or_insert_with(::std::vec::Vec::new).push(v);
108+
});
109+
let () = #work_fn_async(fut).await;
110+
};
111+
// Non-persisted probe: probe items are not stored; each one is looked up in
// the build map as it is pulled, via the local `probe_join` helper.
112+
let write_iterator = if !probe_persist {
113+
quote_spanned! {op_span=>
114+
let mut #build_borrow = unsafe {
115+
// SAFETY: handle from `#df_ident`.
116+
#context.state_ref_unchecked(#build_ident)
117+
}.borrow_mut();
118+
119+
let #ident = {
120+
#accum_build
121+
122+
// Bound K/V types explicitly to prevent inference failures across subgraph handoffs.
123+
#[allow(clippy::clone_on_copy, noop_method_call)]
124+
#[inline(always)]
125+
fn probe_join<'a, K, V1, V2, I>(
126+
probe: I,
127+
build_state: &'a #root::rustc_hash::FxHashMap<K, ::std::vec::Vec<V1>>,
128+
) -> impl 'a + #root::dfir_pipes::pull::Pull<Item = (K, (V2, V1)), Meta = ()>
129+
where
130+
K: ::std::cmp::Eq + ::std::hash::Hash + ::std::clone::Clone + 'a,
131+
V1: ::std::clone::Clone + 'a,
132+
V2: ::std::clone::Clone + 'a,
133+
I: 'a + #root::dfir_pipes::pull::Pull<Item = (K, V2), Meta = ()>,
134+
{
// One output per build value under the probe's key (cloning key and
// both values); a missing key yields an empty `Vec`, i.e. no output.
135+
#root::dfir_pipes::pull::Pull::flat_map(probe, move |(k, v_probe)| {
136+
build_state
137+
.get(&k)
138+
.map(|vals| vals.iter().map(|v_build| (k.clone(), (v_probe.clone(), v_build.clone()))).collect::<::std::vec::Vec<_>>())
139+
.unwrap_or_default()
140+
.into_iter()
141+
})
142+
}
143+
probe_join(#input_probe, &*#build_borrow)
144+
};
145+
}
// Persisted probe: incoming probe items are appended to the probe vec, then
// a suffix of that vec is replayed against the build map.
146+
} else {
147+
quote_spanned! {op_span =>
148+
let (mut #build_borrow, mut #probe_borrow) = unsafe {
149+
// SAFETY: handles from `#df_ident`.
150+
(
151+
#context.state_ref_unchecked(#build_ident).borrow_mut(),
152+
#context.state_ref_unchecked(#probe_ident).borrow_mut(),
153+
)
154+
};
155+
156+
let #ident = {
157+
#accum_build
158+
// Replay start, computed before new probe items are appended: the whole
// vec on the first run of a tick, otherwise only the items added below.
159+
let replay_idx = if #context.is_first_run_this_tick() {
160+
0
161+
} else {
162+
#probe_borrow.len()
163+
};
164+
165+
// Accum into probe vec
166+
let fut = #root::dfir_pipes::pull::Pull::for_each(#input_probe, |kv| {
167+
#probe_borrow.push(kv);
168+
});
169+
let () = #work_fn_async(fut).await;
170+
171+
// Replay out of probe vec
172+
#[allow(clippy::clone_on_copy, noop_method_call)]
173+
let iter = #probe_borrow[replay_idx..].iter().flat_map(|(k, v_probe)| {
174+
#build_borrow
175+
.get(k)
176+
.map(|vals: &::std::vec::Vec<_>| {
177+
vals.iter().map(|v_build| (k.clone(), (v_probe.clone(), v_build.clone()))).collect::<::std::vec::Vec<_>>()
178+
})
179+
.unwrap_or_default()
180+
});
181+
#root::dfir_pipes::pull::iter(iter)
182+
};
183+
}
184+
};
185+
// Probe pieces (empty when the probe does not persist) are emitted before
// the build pieces in both the prologue and the after-prologue.
186+
Ok(OperatorWriteOutput {
187+
write_prologue: quote_spanned! {op_span=>
188+
#write_prologue_probe
189+
#write_prologue_build
190+
},
191+
write_prologue_after: quote_spanned! {op_span=>
192+
#write_prologue_after_probe
193+
#write_prologue_after_build
194+
},
195+
write_iterator,
196+
..Default::default()
197+
})
198+
},
199+
};

dfir_lang/src/graph/ops/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ declare_ops![
310310
join_fused_lhs::JOIN_FUSED_LHS,
311311
join_fused_rhs::JOIN_FUSED_RHS,
312312
join_multiset::JOIN_MULTISET,
313+
join_multiset_half::JOIN_MULTISET_HALF,
313314
fold_keyed::FOLD_KEYED,
314315
reduce_keyed::REDUCE_KEYED,
315316
repeat_n::REPEAT_N,

0 commit comments

Comments
 (0)