Skip to content

Commit 0ced1e2

Browse files
committed
Add MergeOrdered IR node with proper sim interleaving hooks
Addresses #2768 Replace the sliced!-based merge_ordered implementation with a dedicated HydroNode::MergeOrdered IR node that has distinct codegen for production and simulation. Production codegen: - Emits union() DFIR operator (both inputs pulled in same stratum) Simulation codegen (two modes): - Bounded/tick: MergeOrderedHook (inline) buffers both batches and generates a valid interleaving via boolean coin flips, preserving per-input order. Explores C(a+b, a) states instead of (a+b)!. - Unbounded/top-level: TopLevelMergeOrderedHook releases one element at a time from the front of either input queue, allowing feedback cycles to deliver elements between releases. API changes: - Removed NoTick bound from merge_ordered — works on any Location and Boundedness now. When bounded, the nondet is still explored in sim. - State space reduced from 26 to 6 instances in the existing test (2 elements per input: C(4,2) = 6 valid interleavings). Tests added: - sim_merge_ordered: ordering preservation assertion (invalid interleavings like [2,1,3,4] are never produced) - sim_merge_ordered_one_empty: pass-through when one input is empty - sim_merge_ordered_cycle_back: feedback cycle where a cycled-back element arrives before elements on the other input - sim_merge_ordered_delayed: delayed element on one input - deploy_merge_ordered_delayed: production localhost test with timing
1 parent 5aa4324 commit 0ced1e2

5 files changed

Lines changed: 689 additions & 19 deletions

File tree

hydro_lang/src/compile/ir/mod.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,16 @@ pub trait DfirBuilder {
439439
op_meta: &HydroIrOpMetadata,
440440
);
441441

442+
fn merge_ordered(
443+
&mut self,
444+
location: &LocationId,
445+
first_ident: syn::Ident,
446+
second_ident: syn::Ident,
447+
out_ident: &syn::Ident,
448+
in_kind: &CollectionKind,
449+
op_meta: &HydroIrOpMetadata,
450+
);
451+
442452
#[expect(clippy::too_many_arguments, reason = "TODO")]
443453
fn create_network(
444454
&mut self,
@@ -596,6 +606,27 @@ impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
596606
);
597607
}
598608

609+
fn merge_ordered(
610+
&mut self,
611+
location: &LocationId,
612+
first_ident: syn::Ident,
613+
second_ident: syn::Ident,
614+
out_ident: &syn::Ident,
615+
_in_kind: &CollectionKind,
616+
_op_meta: &HydroIrOpMetadata,
617+
) {
618+
let builder = self.get_dfir_mut(location);
619+
builder.add_dfir(
620+
parse_quote! {
621+
#out_ident = union();
622+
#first_ident -> [0]#out_ident;
623+
#second_ident -> [1]#out_ident;
624+
},
625+
None,
626+
None,
627+
);
628+
}
629+
599630
fn create_network(
600631
&mut self,
601632
from: &LocationId,
@@ -2036,6 +2067,12 @@ pub enum HydroNode {
20362067
metadata: HydroIrMetadata,
20372068
},
20382069

2070+
MergeOrdered {
2071+
first: Box<HydroNode>,
2072+
second: Box<HydroNode>,
2073+
metadata: HydroIrMetadata,
2074+
},
2075+
20392076
ChainFirst {
20402077
first: Box<HydroNode>,
20412078
second: Box<HydroNode>,
@@ -2312,6 +2349,11 @@ impl HydroNode {
23122349
transform(second.as_mut(), seen_tees);
23132350
}
23142351

2352+
HydroNode::MergeOrdered { first, second, .. } => {
2353+
transform(first.as_mut(), seen_tees);
2354+
transform(second.as_mut(), seen_tees);
2355+
}
2356+
23152357
HydroNode::ChainFirst { first, second, .. } => {
23162358
transform(first.as_mut(), seen_tees);
23172359
transform(second.as_mut(), seen_tees);
@@ -2464,6 +2506,15 @@ impl HydroNode {
24642506
second: Box::new(second.deep_clone(seen_tees)),
24652507
metadata: metadata.clone(),
24662508
},
2509+
HydroNode::MergeOrdered {
2510+
first,
2511+
second,
2512+
metadata,
2513+
} => HydroNode::MergeOrdered {
2514+
first: Box::new(first.deep_clone(seen_tees)),
2515+
second: Box::new(second.deep_clone(seen_tees)),
2516+
metadata: metadata.clone(),
2517+
},
24672518
HydroNode::ChainFirst {
24682519
first,
24692520
second,
@@ -3180,6 +3231,34 @@ impl HydroNode {
31803231
ident_stack.push(chain_ident);
31813232
}
31823233

3234+
HydroNode::MergeOrdered { first, metadata, .. } => {
3235+
let second_ident = ident_stack.pop().unwrap();
3236+
let first_ident = ident_stack.pop().unwrap();
3237+
3238+
let merge_ident =
3239+
syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3240+
3241+
match builders_or_callback {
3242+
BuildersOrCallback::Builders(graph_builders) => {
3243+
graph_builders.merge_ordered(
3244+
&first.metadata().location_id,
3245+
first_ident,
3246+
second_ident,
3247+
&merge_ident,
3248+
&first.metadata().collection_kind,
3249+
&metadata.op,
3250+
);
3251+
}
3252+
BuildersOrCallback::Callback(_, node_callback) => {
3253+
node_callback(node, next_stmt_id);
3254+
}
3255+
}
3256+
3257+
*next_stmt_id += 1;
3258+
3259+
ident_stack.push(merge_ident);
3260+
}
3261+
31833262
HydroNode::ChainFirst { .. } => {
31843263
let second_ident = ident_stack.pop().unwrap();
31853264
let first_ident = ident_stack.pop().unwrap();
@@ -4247,6 +4326,7 @@ impl HydroNode {
42474326
| HydroNode::EndAtomic { .. }
42484327
| HydroNode::Batch { .. }
42494328
| HydroNode::Chain { .. }
4329+
| HydroNode::MergeOrdered { .. }
42504330
| HydroNode::ChainFirst { .. }
42514331
| HydroNode::CrossProduct { .. }
42524332
| HydroNode::CrossSingleton { .. }
@@ -4324,6 +4404,7 @@ impl HydroNode {
43244404
HydroNode::EndAtomic { metadata, .. } => metadata,
43254405
HydroNode::Batch { metadata, .. } => metadata,
43264406
HydroNode::Chain { metadata, .. } => metadata,
4407+
HydroNode::MergeOrdered { metadata, .. } => metadata,
43274408
HydroNode::ChainFirst { metadata, .. } => metadata,
43284409
HydroNode::CrossProduct { metadata, .. } => metadata,
43294410
HydroNode::CrossSingleton { metadata, .. } => metadata,
@@ -4378,6 +4459,7 @@ impl HydroNode {
43784459
HydroNode::EndAtomic { metadata, .. } => metadata,
43794460
HydroNode::Batch { metadata, .. } => metadata,
43804461
HydroNode::Chain { metadata, .. } => metadata,
4462+
HydroNode::MergeOrdered { metadata, .. } => metadata,
43814463
HydroNode::ChainFirst { metadata, .. } => metadata,
43824464
HydroNode::CrossProduct { metadata, .. } => metadata,
43834465
HydroNode::CrossSingleton { metadata, .. } => metadata,
@@ -4436,6 +4518,9 @@ impl HydroNode {
44364518
HydroNode::Chain { first, second, .. } => {
44374519
vec![first, second]
44384520
}
4521+
HydroNode::MergeOrdered { first, second, .. } => {
4522+
vec![first, second]
4523+
}
44394524
HydroNode::ChainFirst { first, second, .. } => {
44404525
vec![first, second]
44414526
}
@@ -4526,6 +4611,13 @@ impl HydroNode {
45264611
HydroNode::Chain { first, second, .. } => {
45274612
format!("Chain({}, {})", first.print_root(), second.print_root())
45284613
}
4614+
HydroNode::MergeOrdered { first, second, .. } => {
4615+
format!(
4616+
"MergeOrdered({}, {})",
4617+
first.print_root(),
4618+
second.print_root()
4619+
)
4620+
}
45294621
HydroNode::ChainFirst { first, second, .. } => {
45304622
format!(
45314623
"ChainFirst({}, {})",

0 commit comments

Comments
 (0)