Skip to content

Commit 2f1cd7c

Browse files
committed
perf: Add MergeOrdered IR node with proper sim interleaving hooks
Addresses #2768 Replace the sliced!-based merge_ordered implementation with a dedicated HydroNode::MergeOrdered IR node that has distinct codegen for production and simulation. Production codegen: - Emits union() DFIR operator (both inputs pulled in same stratum) Simulation codegen (two modes): - Bounded/tick: MergeOrderedHook (inline) buffers both batches and generates a valid interleaving via boolean coin flips, preserving per-input order. Explores C(a+b, a) states instead of (a+b)!. - Unbounded/top-level: TopLevelMergeOrderedHook releases one element at a time from the front of either input queue, allowing feedback cycles to deliver elements between releases. API changes: - Removed NoTick bound from merge_ordered — works on any Location and Boundedness now. When bounded, the nondet is still explored in sim. - State space reduced from 26 to 6 instances in the existing test (2 elements per input: C(4,2) = 6 valid interleavings). Tests added: - sim_merge_ordered: ordering preservation assertion (invalid interleavings like [2,1,3,4] are never produced) - sim_merge_ordered_one_empty: pass-through when one input is empty - sim_merge_ordered_cycle_back: feedback cycle where a cycled-back element arrives before elements on the other input - sim_merge_ordered_delayed: delayed element on one input - deploy_merge_ordered_delayed: production localhost test with timing
1 parent 5aa4324 commit 2f1cd7c

5 files changed

Lines changed: 793 additions & 19 deletions

File tree

hydro_lang/src/compile/ir/mod.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,17 @@ pub trait DfirBuilder {
439439
op_meta: &HydroIrOpMetadata,
440440
);
441441

442+
fn merge_ordered(
443+
&mut self,
444+
location: &LocationId,
445+
first_ident: syn::Ident,
446+
second_ident: syn::Ident,
447+
out_ident: &syn::Ident,
448+
in_kind: &CollectionKind,
449+
op_meta: &HydroIrOpMetadata,
450+
operator_tag: Option<&str>,
451+
);
452+
442453
#[expect(clippy::too_many_arguments, reason = "TODO")]
443454
fn create_network(
444455
&mut self,
@@ -596,6 +607,28 @@ impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
596607
);
597608
}
598609

610+
fn merge_ordered(
611+
&mut self,
612+
location: &LocationId,
613+
first_ident: syn::Ident,
614+
second_ident: syn::Ident,
615+
out_ident: &syn::Ident,
616+
_in_kind: &CollectionKind,
617+
_op_meta: &HydroIrOpMetadata,
618+
operator_tag: Option<&str>,
619+
) {
620+
let builder = self.get_dfir_mut(location);
621+
builder.add_dfir(
622+
parse_quote! {
623+
#out_ident = union();
624+
#first_ident -> [0]#out_ident;
625+
#second_ident -> [1]#out_ident;
626+
},
627+
None,
628+
operator_tag,
629+
);
630+
}
631+
599632
fn create_network(
600633
&mut self,
601634
from: &LocationId,
@@ -2036,6 +2069,12 @@ pub enum HydroNode {
20362069
metadata: HydroIrMetadata,
20372070
},
20382071

2072+
MergeOrdered {
2073+
first: Box<HydroNode>,
2074+
second: Box<HydroNode>,
2075+
metadata: HydroIrMetadata,
2076+
},
2077+
20392078
ChainFirst {
20402079
first: Box<HydroNode>,
20412080
second: Box<HydroNode>,
@@ -2312,6 +2351,11 @@ impl HydroNode {
23122351
transform(second.as_mut(), seen_tees);
23132352
}
23142353

2354+
HydroNode::MergeOrdered { first, second, .. } => {
2355+
transform(first.as_mut(), seen_tees);
2356+
transform(second.as_mut(), seen_tees);
2357+
}
2358+
23152359
HydroNode::ChainFirst { first, second, .. } => {
23162360
transform(first.as_mut(), seen_tees);
23172361
transform(second.as_mut(), seen_tees);
@@ -2464,6 +2508,15 @@ impl HydroNode {
24642508
second: Box::new(second.deep_clone(seen_tees)),
24652509
metadata: metadata.clone(),
24662510
},
2511+
HydroNode::MergeOrdered {
2512+
first,
2513+
second,
2514+
metadata,
2515+
} => HydroNode::MergeOrdered {
2516+
first: Box::new(first.deep_clone(seen_tees)),
2517+
second: Box::new(second.deep_clone(seen_tees)),
2518+
metadata: metadata.clone(),
2519+
},
24672520
HydroNode::ChainFirst {
24682521
first,
24692522
second,
@@ -3180,6 +3233,35 @@ impl HydroNode {
31803233
ident_stack.push(chain_ident);
31813234
}
31823235

3236+
HydroNode::MergeOrdered { first, metadata, .. } => {
3237+
let second_ident = ident_stack.pop().unwrap();
3238+
let first_ident = ident_stack.pop().unwrap();
3239+
3240+
let merge_ident =
3241+
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3242+
3243+
match builders_or_callback {
3244+
BuildersOrCallback::Builders(graph_builders) => {
3245+
graph_builders.merge_ordered(
3246+
&first.metadata().location_id,
3247+
first_ident,
3248+
second_ident,
3249+
&merge_ident,
3250+
&first.metadata().collection_kind,
3251+
&metadata.op,
3252+
Some(&next_stmt_id.to_string()),
3253+
);
3254+
}
3255+
BuildersOrCallback::Callback(_, node_callback) => {
3256+
node_callback(node, next_stmt_id);
3257+
}
3258+
}
3259+
3260+
*next_stmt_id += 1;
3261+
3262+
ident_stack.push(merge_ident);
3263+
}
3264+
31833265
HydroNode::ChainFirst { .. } => {
31843266
let second_ident = ident_stack.pop().unwrap();
31853267
let first_ident = ident_stack.pop().unwrap();
@@ -4247,6 +4329,7 @@ impl HydroNode {
42474329
| HydroNode::EndAtomic { .. }
42484330
| HydroNode::Batch { .. }
42494331
| HydroNode::Chain { .. }
4332+
| HydroNode::MergeOrdered { .. }
42504333
| HydroNode::ChainFirst { .. }
42514334
| HydroNode::CrossProduct { .. }
42524335
| HydroNode::CrossSingleton { .. }
@@ -4324,6 +4407,7 @@ impl HydroNode {
43244407
HydroNode::EndAtomic { metadata, .. } => metadata,
43254408
HydroNode::Batch { metadata, .. } => metadata,
43264409
HydroNode::Chain { metadata, .. } => metadata,
4410+
HydroNode::MergeOrdered { metadata, .. } => metadata,
43274411
HydroNode::ChainFirst { metadata, .. } => metadata,
43284412
HydroNode::CrossProduct { metadata, .. } => metadata,
43294413
HydroNode::CrossSingleton { metadata, .. } => metadata,
@@ -4378,6 +4462,7 @@ impl HydroNode {
43784462
HydroNode::EndAtomic { metadata, .. } => metadata,
43794463
HydroNode::Batch { metadata, .. } => metadata,
43804464
HydroNode::Chain { metadata, .. } => metadata,
4465+
HydroNode::MergeOrdered { metadata, .. } => metadata,
43814466
HydroNode::ChainFirst { metadata, .. } => metadata,
43824467
HydroNode::CrossProduct { metadata, .. } => metadata,
43834468
HydroNode::CrossSingleton { metadata, .. } => metadata,
@@ -4436,6 +4521,9 @@ impl HydroNode {
44364521
HydroNode::Chain { first, second, .. } => {
44374522
vec![first, second]
44384523
}
4524+
HydroNode::MergeOrdered { first, second, .. } => {
4525+
vec![first, second]
4526+
}
44394527
HydroNode::ChainFirst { first, second, .. } => {
44404528
vec![first, second]
44414529
}
@@ -4526,6 +4614,13 @@ impl HydroNode {
45264614
HydroNode::Chain { first, second, .. } => {
45274615
format!("Chain({}, {})", first.print_root(), second.print_root())
45284616
}
4617+
HydroNode::MergeOrdered { first, second, .. } => {
4618+
format!(
4619+
"MergeOrdered({}, {})",
4620+
first.print_root(),
4621+
second.print_root()
4622+
)
4623+
}
45294624
HydroNode::ChainFirst { first, second, .. } => {
45304625
format!(
45314626
"ChainFirst({}, {})",

0 commit comments

Comments
 (0)