Skip to content

Commit 5aa4324

Browse files
jhellersteinJoe HellersteinKiro
authored
feat(hydro_lang): make IR types serializable via serde (#2808)
Closes #2802 Adds `serde::Serialize` to the Hydro IR types so the IR can be emitted as JSON for external analysis. ## Changes ### Serialization support (`hydro_lang/src/compile/ir/mod.rs`) - Derive `serde::Serialize` on `HydroNode`, `HydroRoot`, `HydroIrMetadata`, `HydroIrOpMetadata`, `CollectionKind`, `BoundKind`, `StreamOrder`, `StreamRetry`, `SingletonBoundKind`, `KeyedSingletonBoundKind`, `ClusterMembersState` - Manual `Serialize` impls for types wrapping `syn` AST nodes (`DebugExpr`, `DebugType`, `DebugInstantiate`, `HydroSource`) — serialized as their string representation - Cycle-safe `SharedNode` serialization using a thread-local dedup tracker (mirrors the existing `PRINTED_TEES` pattern used by `Debug`). First occurrence emits `{"$shared": id, "node": ...}`; subsequent occurrences emit `"<shared N>"` - `#[serde(skip)]` on `Backtrace`, `syn::Ident` (in `EmbeddedOutput`), and `NetworkHint` (in `ExternalInput`) ### Public API (`hydro_lang/src/compile/built.rs`) - `BuiltFlow::ir_json()` — serializes the IR as pretty-printed JSON (gated behind `runtime_support` feature), using `serialize_dedup_shared` for clean scoping ### Networking types (`hydro_lang/src/networking/mod.rs`) - Derive `serde::Serialize` on `TcpFault` and `NetworkingInfo` --------- Co-authored-by: Joe Hellerstein <jmhwork@amazon.com> Co-authored-by: Kiro <kiro@amazon.com>
1 parent 35c2d95 commit 5aa4324

7 files changed

Lines changed: 459 additions & 18 deletions

File tree

hydro_lang/src/compile/built.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ impl<'a> BuiltFlow<'a> {
4949
&self.ir
5050
}
5151

52+
/// Serialize the IR as JSON.
53+
#[cfg(feature = "runtime_support")]
54+
pub fn ir_json(&self) -> Result<String, serde_json::Error> {
55+
super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
56+
}
57+
5258
/// Returns all raw location ID -> location name mappings.
5359
pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
5460
&self.location_names

hydro_lang/src/compile/ir/backtrace.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,22 @@ impl Backtrace {
139139
element
140140
})
141141
}
142+
143+
#[cfg(feature = "build")]
144+
/// Format the first user-code frame as `"file:line:col"`, or `None` if unavailable.
145+
pub fn format_span(&self) -> Option<String> {
146+
let elem = self.elements().next()?;
147+
let file = elem.filename.as_ref()?;
148+
let line = elem.lineno?;
149+
let col = elem.colno.unwrap_or(0);
150+
Some(format!("{file}:{line}:{col}"))
151+
}
152+
153+
#[cfg(not(feature = "build"))]
154+
/// Format the first user-code frame as `"file:line:col"`, or `None` if unavailable.
155+
pub fn format_span(&self) -> Option<String> {
156+
None
157+
}
142158
}
143159

144160
#[cfg(feature = "build")]

hydro_lang/src/compile/ir/mod.rs

Lines changed: 146 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ use backtrace::Backtrace;
4040
#[derive(Clone, Hash)]
4141
pub struct DebugExpr(pub Box<syn::Expr>);
4242

43+
impl serde::Serialize for DebugExpr {
44+
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45+
serializer.serialize_str(&self.to_string())
46+
}
47+
}
48+
4349
impl From<syn::Expr> for DebugExpr {
4450
fn from(expr: syn::Expr) -> Self {
4551
Self(Box::new(expr))
@@ -255,11 +261,49 @@ impl Debug for DebugType {
255261
}
256262
}
257263

264+
impl serde::Serialize for DebugType {
265+
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266+
serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267+
}
268+
}
269+
270+
fn serialize_backtrace_as_span<S: serde::Serializer>(
271+
backtrace: &Backtrace,
272+
serializer: S,
273+
) -> Result<S::Ok, S::Error> {
274+
match backtrace.format_span() {
275+
Some(span) => serializer.serialize_some(&span),
276+
None => serializer.serialize_none(),
277+
}
278+
}
279+
280+
fn serialize_ident<S: serde::Serializer>(
281+
ident: &syn::Ident,
282+
serializer: S,
283+
) -> Result<S::Ok, S::Error> {
284+
serializer.serialize_str(&ident.to_string())
285+
}
286+
258287
pub enum DebugInstantiate {
259288
Building,
260289
Finalized(Box<DebugInstantiateFinalized>),
261290
}
262291

292+
impl serde::Serialize for DebugInstantiate {
293+
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294+
match self {
295+
DebugInstantiate::Building => {
296+
serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297+
}
298+
DebugInstantiate::Finalized(_) => {
299+
panic!(
300+
"cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301+
)
302+
}
303+
}
304+
}
305+
}
306+
263307
#[cfg_attr(
264308
not(feature = "build"),
265309
expect(
@@ -310,7 +354,7 @@ impl Clone for DebugInstantiate {
310354
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
311355
/// during code-gen they simply reference the tee output of the first node
312356
/// instead of creating a redundant `source_stream`.
313-
#[derive(Debug, Hash, Clone)]
357+
#[derive(Debug, Hash, Clone, serde::Serialize)]
314358
pub enum ClusterMembersState {
315359
/// Not yet instantiated.
316360
Uninit,
@@ -324,15 +368,15 @@ pub enum ClusterMembersState {
324368
}
325369

326370
/// A source in a Hydro graph, where data enters the graph.
327-
#[derive(Debug, Hash, Clone)]
371+
#[derive(Debug, Hash, Clone, serde::Serialize)]
328372
pub enum HydroSource {
329373
Stream(DebugExpr),
330374
ExternalNetwork(),
331375
Iter(DebugExpr),
332376
Spin(),
333377
ClusterMembers(LocationId, ClusterMembersState),
334-
Embedded(syn::Ident),
335-
EmbeddedSingleton(syn::Ident),
378+
Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
379+
EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
336380
}
337381

338382
#[cfg(feature = "build")]
@@ -676,7 +720,7 @@ where
676720
/// An root in a Hydro graph, which is an pipeline that doesn't emit
677721
/// any downstream values. Traversals over the dataflow graph and
678722
/// generating DFIR IR start from roots.
679-
#[derive(Debug, Hash)]
723+
#[derive(Debug, Hash, serde::Serialize)]
680724
pub enum HydroRoot {
681725
ForEach {
682726
f: DebugExpr,
@@ -704,6 +748,7 @@ pub enum HydroRoot {
704748
op_metadata: HydroIrOpMetadata,
705749
},
706750
EmbeddedOutput {
751+
#[serde(serialize_with = "serialize_ident")]
707752
ident: syn::Ident,
708753
input: Box<HydroNode>,
709754
op_metadata: HydroIrOpMetadata,
@@ -1606,6 +1651,11 @@ pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
16061651
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
16071652
thread_local! {
16081653
static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1654+
/// Tracks shared nodes already serialized so that `SharedNode::serialize`
1655+
/// emits the full subtree only once and uses a `"<shared N>"` back-reference
1656+
/// on subsequent encounters, preventing infinite loops.
1657+
static SERIALIZED_SHARED: PrintedTees
1658+
= const { RefCell::new(None) };
16091659
}
16101660

16111661
pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
@@ -1623,8 +1673,85 @@ pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
16231673
})
16241674
}
16251675

1676+
/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1677+
/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1678+
/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1679+
/// back-reference. The tracking state is restored when `f` returns or panics.
1680+
pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1681+
let _guard = SerializedSharedGuard::enter();
1682+
f()
1683+
}
1684+
1685+
/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1686+
/// making `serialize_dedup_shared` re-entrant and panic-safe.
1687+
struct SerializedSharedGuard {
1688+
previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1689+
}
1690+
1691+
impl SerializedSharedGuard {
1692+
fn enter() -> Self {
1693+
let previous = SERIALIZED_SHARED.with(|cell| {
1694+
let mut guard = cell.borrow_mut();
1695+
guard.replace((0, HashMap::new()))
1696+
});
1697+
Self { previous }
1698+
}
1699+
}
1700+
1701+
impl Drop for SerializedSharedGuard {
1702+
fn drop(&mut self) {
1703+
SERIALIZED_SHARED.with(|cell| {
1704+
*cell.borrow_mut() = self.previous.take();
1705+
});
1706+
}
1707+
}
1708+
16261709
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
16271710

1711+
impl serde::Serialize for SharedNode {
1712+
/// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1713+
/// `Tee` / `Partition`). A naïve recursive serialization would revisit the
1714+
/// same subtree every time and, if the graph ever contains a cycle, loop
1715+
/// forever.
1716+
///
1717+
/// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1718+
/// integer id. The first time we see a pointer we assign it the next id and
1719+
/// emit the full subtree as `{"$shared": <id>, "node": …}`. Every later
1720+
/// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1721+
/// recursion. Requires an active `serialize_dedup_shared` scope.
1722+
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1723+
SERIALIZED_SHARED.with(|cell| {
1724+
let mut guard = cell.borrow_mut();
1725+
// (next_id, pointer → assigned_id)
1726+
let state = guard.as_mut().ok_or_else(|| {
1727+
serde::ser::Error::custom(
1728+
"SharedNode serialization requires an active serialize_dedup_shared scope",
1729+
)
1730+
})?;
1731+
let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1732+
1733+
if let Some(&id) = state.1.get(&ptr) {
1734+
drop(guard);
1735+
use serde::ser::SerializeMap;
1736+
let mut map = serializer.serialize_map(Some(1))?;
1737+
map.serialize_entry("$shared_ref", &id)?;
1738+
map.end()
1739+
} else {
1740+
let id = state.0;
1741+
state.0 += 1;
1742+
state.1.insert(ptr, id);
1743+
drop(guard);
1744+
1745+
use serde::ser::SerializeMap;
1746+
let mut map = serializer.serialize_map(Some(2))?;
1747+
map.serialize_entry("$shared", &id)?;
1748+
map.serialize_entry("node", &*self.0.borrow())?;
1749+
map.end()
1750+
}
1751+
})
1752+
}
1753+
}
1754+
16281755
impl SharedNode {
16291756
pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
16301757
Rc::as_ptr(&self.0)
@@ -1668,40 +1795,40 @@ impl Hash for SharedNode {
16681795
}
16691796
}
16701797

1671-
#[derive(Clone, PartialEq, Eq, Debug)]
1798+
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
16721799
pub enum BoundKind {
16731800
Unbounded,
16741801
Bounded,
16751802
}
16761803

1677-
#[derive(Clone, PartialEq, Eq, Debug)]
1804+
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
16781805
pub enum StreamOrder {
16791806
NoOrder,
16801807
TotalOrder,
16811808
}
16821809

1683-
#[derive(Clone, PartialEq, Eq, Debug)]
1810+
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
16841811
pub enum StreamRetry {
16851812
AtLeastOnce,
16861813
ExactlyOnce,
16871814
}
16881815

1689-
#[derive(Clone, PartialEq, Eq, Debug)]
1816+
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
16901817
pub enum KeyedSingletonBoundKind {
16911818
Unbounded,
16921819
MonotonicValue,
16931820
BoundedValue,
16941821
Bounded,
16951822
}
16961823

1697-
#[derive(Clone, PartialEq, Eq, Debug)]
1824+
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
16981825
pub enum SingletonBoundKind {
16991826
Unbounded,
17001827
Monotonic,
17011828
Bounded,
17021829
}
17031830

1704-
#[derive(Clone, PartialEq, Eq, Debug)]
1831+
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
17051832
pub enum CollectionKind {
17061833
Stream {
17071834
bound: BoundKind,
@@ -1755,7 +1882,7 @@ impl CollectionKind {
17551882
}
17561883
}
17571884

1758-
#[derive(Clone)]
1885+
#[derive(Clone, serde::Serialize)]
17591886
pub struct HydroIrMetadata {
17601887
pub location_id: LocationId,
17611888
pub collection_kind: CollectionKind,
@@ -1788,8 +1915,9 @@ impl Debug for HydroIrMetadata {
17881915

17891916
/// Metadata that is specific to the operator itself, rather than its outputs.
17901917
/// This is available on _both_ inner nodes and roots.
1791-
#[derive(Clone)]
1918+
#[derive(Clone, serde::Serialize)]
17921919
pub struct HydroIrOpMetadata {
1920+
#[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
17931921
pub backtrace: Backtrace,
17941922
pub cpu_usage: Option<f64>,
17951923
pub network_recv_cpu_usage: Option<f64>,
@@ -1827,7 +1955,7 @@ impl Hash for HydroIrOpMetadata {
18271955

18281956
/// An intermediate node in a Hydro graph, which consumes data
18291957
/// from upstream nodes and emits data to downstream nodes.
1830-
#[derive(Debug, Hash)]
1958+
#[derive(Debug, Hash, serde::Serialize)]
18311959
pub enum HydroNode {
18321960
Placeholder,
18331961

@@ -2073,6 +2201,7 @@ pub enum HydroNode {
20732201
from_port_id: ExternalPortId,
20742202
from_many: bool,
20752203
codec_type: DebugType,
2204+
#[serde(skip)]
20762205
port_hint: NetworkHint,
20772206
instantiate_fn: DebugInstantiate,
20782207
deserialize_fn: Option<DebugExpr>,
@@ -4603,6 +4732,9 @@ where
46034732
(sink, source, connect_fn)
46044733
}
46054734

4735+
#[cfg(test)]
4736+
mod serde_test;
4737+
46064738
#[cfg(test)]
46074739
mod test {
46084740
use std::mem::size_of;

0 commit comments

Comments
 (0)