Commit 6e66c41 ("feat(kafka): Example of consuming data from Kafka")

File: .agents/summary/architecture.md
# Architecture

## Layered Architecture

Hydro follows a layered compiler architecture in which high-level distributed programs are progressively lowered through intermediate representations to executable dataflow graphs.

```mermaid
graph TB
    subgraph "User Code"
        A["Hydro DSL<br/>(hydro_lang)"]
    end

    subgraph "High-Level Layer"
        B["HydroNode IR<br/>(compile/ir)"]
        C["Network Resolution<br/>(compile_network)"]
    end

    subgraph "Compilation"
        D["FlatGraphBuilder<br/>(dfir_lang)"]
        E["DfirGraph<br/>(partitioned)"]
        F["Rust TokenStream<br/>(codegen)"]
    end

    subgraph "Runtime"
        G["Dfir Scheduler<br/>(dfir_rs)"]
        H["Subgraphs + Handoffs"]
        I["Pull/Push Operators<br/>(dfir_pipes)"]
    end

    subgraph "Deployment"
        J["Deploy Backends<br/>(hydro_deploy)"]
    end

    A -->|"staged compilation<br/>(stageleft q!)"| B
    B -->|"compile_network()"| C
    C -->|"emit()"| D
    D -->|"partition_graph()"| E
    E -->|"as_code()"| F
    F -->|"compile + deploy"| G
    G --> H
    H --> I
    J -.->|"provisions hosts<br/>wires network"| G
```
## Core Design Principles

### 1. Staged Programming (stageleft)

User code is written using the `q!(...)` quoting macro from `stageleft`. Expressions inside `q!()` are captured as an AST at compile time and emitted into the generated DFIR code for each deployment location. This enables:

- Type-safe code generation without string templates
- IDE support (autocomplete, type checking) for distributed programs
- Zero-cost abstractions — generated code is monomorphized Rust
### 2. Type-Level Distributed Safety

The type system encodes distributed properties as phantom type parameters:

```mermaid
classDiagram
    class Stream~T, Loc, Bound, Order, Retry~ {
        +map(f) Stream
        +filter(f) Stream
        +fold(init, f) Singleton
        +send_bincode(dest) Stream
    }

    class Boundedness {
        <<trait>>
    }
    class Bounded
    class Unbounded

    class Ordering {
        <<trait>>
    }
    class TotalOrder
    class NoOrder

    class Retries {
        <<trait>>
    }
    class ExactlyOnce
    class AtLeastOnce

    Boundedness <|-- Bounded
    Boundedness <|-- Unbounded
    Ordering <|-- TotalOrder
    Ordering <|-- NoOrder
    Retries <|-- ExactlyOnce
    Retries <|-- AtLeastOnce

    Stream --> Boundedness
    Stream --> Ordering
    Stream --> Retries
```

When data crosses network boundaries, the type parameters are weakened based on transport properties (e.g., a lossy transport weakens a stream to `NoOrder`, `AtLeastOnce`). Operations like `fold` on unordered streams require algebraic property proofs (commutativity, idempotence).
### 3. Location-Typed Computation

Every computation is associated with a `Location` — a typed representation of where code runs:

```mermaid
graph TB
    subgraph "Top-Level Locations"
        P["Process&lt;'a, Tag&gt;<br/>Single machine"]
        C["Cluster&lt;'a, Tag&gt;<br/>Group of machines"]
        E["External&lt;'a, Tag&gt;<br/>External I/O"]
    end

    subgraph "Nested Locations"
        T["Tick&lt;L&gt;<br/>Clock domain"]
        AT["Atomic&lt;L&gt;<br/>Atomicity wrapper"]
    end

    P --> T
    C --> T
    T --> AT
```

Phantom tag types (e.g., `Process<'a, Leader>` vs `Process<'a, Follower>`) distinguish locations at the type level, preventing accidental cross-location data access.
123+
### 4. Lattice-Based CRDTs
124+
125+
The `lattices` crate provides algebraic types where merge is associative, commutative, and idempotent — the foundation for conflict-free replicated data types:
126+
127+
```mermaid
128+
graph TB
129+
L["Lattice trait<br/>(Merge + LatticeOrd + IsBot + IsTop)"]
130+
L --> MN["Min&lt;T&gt;"]
131+
L --> MX["Max&lt;T&gt;"]
132+
L --> SU["SetUnion&lt;S&gt;"]
133+
L --> MU["MapUnion&lt;M&gt;"]
134+
L --> P["Pair&lt;A, B&gt;"]
135+
L --> DP["DomPair&lt;K, V&gt;"]
136+
L --> WB["WithBot&lt;L&gt;"]
137+
L --> WT["WithTop&lt;L&gt;"]
138+
L --> CF["Conflict&lt;T&gt;"]
139+
L --> UF["UnionFind"]
140+
```
141+
142+
### 5. Push-Pull Dataflow Execution
143+
144+
The DFIR runtime uses a hybrid push-pull execution model within subgraphs:
145+
146+
```mermaid
147+
graph LR
148+
subgraph "Pull Region"
149+
S1["source_iter"] --> M1["map"] --> F1["filter"]
150+
end
151+
152+
subgraph "Handoff"
153+
H["VecHandoff<br/>(double-buffered)"]
154+
end
155+
156+
subgraph "Push Region"
157+
FE["for_each"]
158+
end
159+
160+
F1 --> H --> FE
161+
```
162+
163+
- **Pull operators** are composed as Rust iterators (lazy evaluation)
164+
- **Push operators** are composed as closure chains (eager evaluation)
165+
- **Handoffs** (double-buffered `Vec<T>`) connect subgraphs across push-pull boundaries
166+
167+
### 6. Stratified Scheduling
168+
169+
The runtime executes subgraphs in strata to handle non-monotonic operations correctly:
170+
171+
```mermaid
172+
sequenceDiagram
173+
participant S0 as Stratum 0<br/>(sources)
174+
participant S1 as Stratum 1<br/>(monotone ops)
175+
participant S2 as Stratum 2<br/>(non-monotone ops)
176+
participant Loop as Loop Block
177+
178+
Note over S0,S2: Tick N
179+
S0->>S1: data via handoffs
180+
S1->>S1: run to fixpoint
181+
S1->>S2: barrier (Stratum delay)
182+
S2->>S2: run to fixpoint
183+
184+
Note over Loop: Loop iteration
185+
Loop->>Loop: repeat until no new data
186+
187+
Note over S0,S2: Tick N+1
188+
S0->>S1: new external events
189+
```
190+
191+
## Compilation Pipeline Detail
192+
193+
```mermaid
194+
flowchart LR
195+
subgraph "Stage 1: Build"
196+
FB["FlowBuilder<br/>create locations<br/>build IR trees"]
197+
end
198+
199+
subgraph "Stage 2: Finalize"
200+
BF["BuiltFlow<br/>collect IR roots<br/>optimize"]
201+
end
202+
203+
subgraph "Stage 3: Deploy Spec"
204+
DF["DeployFlow<br/>assign hosts<br/>resolve network"]
205+
end
206+
207+
subgraph "Stage 4: Compile"
208+
CF["CompiledFlow<br/>emit DFIR graphs<br/>generate binaries"]
209+
end
210+
211+
subgraph "Stage 5: Run"
212+
DR["DeployResult<br/>provision + launch"]
213+
end
214+
215+
FB -->|"finalize()"| BF
216+
BF -->|"with_process/cluster()"| DF
217+
DF -->|"compile()"| CF
218+
CF -->|"deploy()"| DR
219+
```
220+
221+
Each stage is a distinct type (`FlowBuilder``BuiltFlow``DeployFlow``CompiledFlow``DeployResult`), enforcing correct ordering via Rust's type system.
222+
223+
## Deployment Architecture
224+
225+
```mermaid
226+
graph TB
227+
subgraph "Deploy Trait"
228+
DT["Deploy&lt;'a&gt;<br/>o2o/o2m/m2o/m2m sink_source<br/>cluster_ids, cluster_self_id"]
229+
end
230+
231+
subgraph "Backends"
232+
LH["LocalhostHost"]
233+
GCP["GcpComputeEngineHost"]
234+
AZ["AzureHost"]
235+
AWS["AwsEc2Host"]
236+
DK["Docker"]
237+
ECS["ECS"]
238+
EM["Embedded"]
239+
SIM["Simulator"]
240+
ML["Maelstrom"]
241+
end
242+
243+
DT --> LH
244+
DT --> GCP
245+
DT --> AZ
246+
DT --> AWS
247+
DT --> DK
248+
DT --> ECS
249+
DT --> EM
250+
DT --> SIM
251+
DT --> ML
252+
```
253+
254+
The `Deploy<'a>` trait abstracts over deployment targets. Each backend implements host provisioning, binary compilation/copying, network wiring, and service lifecycle management.
255+
256+
## Non-Determinism Tracking
257+
258+
Hydro requires explicit documentation of every non-determinism source via the `NonDet` type and `nondet!` macro:
259+
260+
```rust
261+
// Must explain WHY this is non-deterministic
262+
let timer = process.source_interval(nondet!(
263+
/// Timer for heartbeat — ordering of heartbeats relative to
264+
/// other messages is non-deterministic
265+
Duration::from_secs(1)
266+
));
267+
```
268+
269+
APIs like `source_interval()`, `batch()`, `sample_every()` require a `NonDet` parameter. The `nondet!` macro enforces a doc-comment explanation.
270+
271+
## Algebraic Property System
272+
273+
For operations on unordered/at-least-once streams, the type system requires proofs:
274+
275+
```mermaid
276+
graph LR
277+
subgraph "Properties"
278+
CP["CommutativeProof"]
279+
IP["IdempotentProof"]
280+
MP["MonotoneProof"]
281+
OP["OrderPreservingProof"]
282+
end
283+
284+
subgraph "Validation"
285+
VO["ValidCommutativityFor&lt;Order&gt;"]
286+
VI["ValidIdempotenceFor&lt;Retry&gt;"]
287+
end
288+
289+
CP --> VO
290+
IP --> VI
291+
292+
subgraph "Usage"
293+
F["stream.fold(init, f, algebra)"]
294+
end
295+
296+
VO --> F
297+
VI --> F
298+
```
299+
300+
- `NotProved` commutativity is valid for `TotalOrder` streams (order guaranteed)
301+
- `NotProved` commutativity requires `Proved` for `NoOrder` streams
302+
- `ManualProof` + `manual_proof!` macro allows human-written justifications
