
Commit 4983720

feat(hydro_test): add Kafka example with producer/consumer module
- Add hydro_test/src/kafka/mod.rs with kafka_producer, kafka_consumer, dest_kafka, and setup_topic helpers following the SQS PR #2746 pattern
- Complete hydro_test/examples/kafka.rs: leader produces 1M financial transactions, consumer cluster computes per-account balances
- Add 'kafka' feature flag gating rdkafka and futures-util as optional deps
- Add llvm-tools component to rust-toolchain.toml for rust-lld
1 parent 79e9d3e commit 4983720

6 files changed

Lines changed: 342 additions & 10 deletions
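The new hydro_test::kafka module introduced by this commit exposes four helpers: setup_topic, kafka_producer, kafka_consumer, and dest_kafka. As a minimal, hypothetical usage sketch (not part of this commit), the snippet below calls setup_topic against a local broker; the broker address is an assumed placeholder. The full example further down wires the remaining helpers into a Hydro flow.

// Hypothetical standalone sketch (not in this commit): recreate the example's topic
// using the new setup_topic helper. Assumes a broker at localhost:9092 and the
// hydro_test "kafka" feature enabled.
use hydro_test::kafka::setup_topic;

#[tokio::main]
async fn main() {
    // Deletes "financial_transactions" if it already exists, then recreates it
    // with 10 partitions (mirrors the leader's setup step in examples/kafka.rs).
    setup_topic("localhost:9092", "financial_transactions", 10).await;
}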


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

hydro_test/Cargo.toml

Lines changed: 7 additions & 1 deletion
@@ -12,6 +12,11 @@ docker = ["hydro_lang/docker_deploy"]
 ecs = ["hydro_lang/ecs_deploy"]
 maelstrom = ["hydro_lang/maelstrom"]
 stageleft_macro_entrypoint = ["hydro_lang/stageleft_macro_entrypoint"]
+kafka = ["dep:rdkafka", "dep:futures-util"]
+
+[[example]]
+name = "kafka"
+required-features = ["kafka"]
 
 [dependencies]
 hydro_lang = { path = "../hydro_lang", version = "^0.15.0" }
@@ -33,7 +38,8 @@ bytes = "1.1.0"
 # https://github.com/GitoxideLabs/cargo-smart-release/issues/36
 example_test = { path = "../example_test", version = "^0.0.0", optional = true }
 hydro_build_utils = { path = "../hydro_build_utils", version = "^0.0.1", optional = true }
-rdkafka = "0.39.0"
+rdkafka = { version = "0.39.0", optional = true }
+futures-util = { version = "0.3.0", default-features = false, features = ["alloc"], optional = true }
 
 [build-dependencies]
 stageleft_tool.workspace = true

hydro_test/examples/kafka.rs

Lines changed: 178 additions & 9 deletions
@@ -1,12 +1,181 @@
-fn main() {
-    // [Leader]
-    // If it already exists, delete the "financial_transactions" topic
-    // Create a new topic named "financial_transactions" with 10 partitions.
+use std::collections::HashMap;
+use std::sync::Arc;
 
-    // Produce 1 million transactions into the financial_transactions topic, spread evenly
-    // across the 10 partitions
+use clap::{ArgAction, Parser};
+use hydro_deploy::gcp::GcpNetwork;
+use hydro_deploy::{AwsNetwork, Deployment, Host};
+use hydro_lang::deploy::TrybuildHost;
+use hydro_lang::live_collections::stream::{ExactlyOnce, TotalOrder};
+use hydro_lang::location::Location;
+use hydro_lang::nondet::nondet;
+use hydro_lang::viz::config::GraphConfig;
+use hydro_test::kafka::{dest_kafka, kafka_consumer, kafka_producer, setup_topic};
+use stageleft::q;
 
-    // [Consumers]
-    // Once the cluster of consumers receives the go-ahead from the leader stream, it may then
-    // begin to consume the financial transactions and compute the balance of each bank account
+type HostCreator = Box<dyn Fn(&mut Deployment) -> Arc<dyn Host>>;
+
+const TOPIC: &str = "financial_transactions";
+const NUM_PARTITIONS: i32 = 10;
+const NUM_TRANSACTIONS: usize = 1_000_000;
+const NUM_CONSUMERS: usize = 3;
+
+// cargo run -p hydro_test --example kafka --features kafka -- --brokers 'localhost:9092'
+#[derive(Parser, Debug)]
+#[command(group(
+    clap::ArgGroup::new("cloud")
+        .args(&["gcp", "aws"])
+        .multiple(false)
+))]
+struct Args {
+    #[clap(flatten)]
+    graph: GraphConfig,
+
+    /// Use GCP for deployment (provide project name)
+    #[arg(long)]
+    gcp: Option<String>,
+
+    /// Use AWS, make sure credentials are set up
+    #[arg(long, action = ArgAction::SetTrue)]
+    aws: bool,
+
+    /// Kafka bootstrap servers
+    #[arg(long, default_value = "localhost:9092")]
+    brokers: String,
+}
+
+enum Leader {}
+enum Consumer {}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args = Args::parse();
+    let mut deployment = Deployment::new();
+
+    let create_host: HostCreator = if let Some(project) = &args.gcp {
+        let network = GcpNetwork::new(project, None);
+        let project = project.clone();
+
+        Box::new(move |deployment| -> Arc<dyn Host> {
+            deployment
+                .GcpComputeEngineHost()
+                .project(&project)
+                .machine_type("e2-micro")
+                .image("debian-cloud/debian-11")
+                .region("us-west1-a")
+                .network(network.clone())
+                .add()
+        })
+    } else if args.aws {
+        let region = "us-east-1";
+        let network = AwsNetwork::new(region, None);
+
+        Box::new(move |deployment| -> Arc<dyn Host> {
+            deployment
+                .AwsEc2Host()
+                .region(region)
+                .instance_type("t3.micro")
+                .ami("ami-0e95a5e2743ec9ec9") // Amazon Linux 2
+                .network(network.clone())
+                .add()
+        })
+    } else {
+        let localhost = deployment.Localhost();
+        Box::new(move |_| -> Arc<dyn Host> { localhost.clone() })
+    };
+
+    // [Leader] Setup topic and produce transactions
+    setup_topic(&args.brokers, TOPIC, NUM_PARTITIONS).await;
+    println!("Topic '{}' created with {} partitions", TOPIC, NUM_PARTITIONS);
+
+    let mut flow = hydro_lang::compile::builder::FlowBuilder::new();
+    let leader = flow.process::<Leader>();
+    let consumers = flow.cluster::<Consumer>();
+
+    // Leader: produce 1M transactions spread across 10 partitions.
+    // Each transaction is (account_id, amount) serialized as key=account, value=amount.
+    {
+        let producer = kafka_producer(&leader, &args.brokers);
+        let transactions = leader.source_iter(q!({
+            (0..NUM_TRANSACTIONS).map(|i| {
+                let account = format!("account_{}", i % 100);
+                let amount = format!("{}", (i % 201) as i64 - 100); // range [-100, 100]
+                (account, amount)
+            })
+        }));
+        dest_kafka(producer, transactions, TOPIC);
+    }
+
+    // Consumers: read from topic and compute per-account balances.
+    {
+        let messages = kafka_consumer(&consumers, &args.brokers, "kafka_example_consumers", TOPIC)
+            .assume_ordering::<TotalOrder>(
+                nondet!(/** Safe: side effect is only printing final balances. */),
+            )
+            .assume_retries::<ExactlyOnce>(
+                nondet!(/** Safe: side effect is only printing final balances. */),
+            )
+            .filter_map(q!(|msg| {
+                let key = rdkafka::Message::key(&msg)
+                    .map(|k| String::from_utf8_lossy(k).to_string())?;
+                let value = rdkafka::Message::payload(&msg)
+                    .map(|v| String::from_utf8_lossy(v).to_string())?;
+                let amount: i64 = value.parse().ok()?;
+                Some((key, amount))
+            }))
+            .fold(
+                q!(|| HashMap::<String, i64>::new()),
+                q!(|balances, (account, amount)| {
+                    *balances.entry(account).or_insert(0) += amount;
+                }),
+            );
+
+        messages.into_stream().for_each(q!(|balances: HashMap<String, i64>| {
+            println!("Final balances ({} accounts):", balances.len());
+            let mut sorted: Vec<_> = balances.into_iter().collect();
+            sorted.sort_by(|a, b| a.0.cmp(&b.0));
+            for (account, balance) in sorted.iter().take(10) {
+                println!(" {}: {}", account, balance);
+            }
+            if sorted.len() > 10 {
+                println!(" ... and {} more accounts", sorted.len() - 10);
+            }
+        }));
+    }
+
+    // Extract the IR BEFORE the builder is consumed by deployment methods
+    let built = flow.finalize();
+
+    // Generate graph visualizations based on command line arguments
+    built.generate_graph_with_config(&args.graph, None)?;
+
+    // If we're just generating a graph file, exit early
+    if args.graph.should_exit_after_graph_generation() {
+        return Ok(());
+    }
+
+    // Now use the built flow for deployment with optimization
+    let _nodes = built
+        .with_default_optimize()
+        .with_process(
+            &leader,
+            TrybuildHost::new(create_host(&mut deployment))
+                .features(vec!["kafka".to_owned()]),
+        )
+        .with_cluster(
+            &consumers,
+            (0..NUM_CONSUMERS).map(|_| {
+                TrybuildHost::new(create_host(&mut deployment))
+                    .features(vec!["kafka".to_owned()])
+            }),
+        )
+        .deploy(&mut deployment);
+
+    deployment.deploy().await.unwrap();
+    deployment.start().await.unwrap();
+
+    println!("Running Kafka financial transactions example...");
+    println!("Press Ctrl+C to stop.");
+
+    tokio::signal::ctrl_c().await.unwrap();
+    Ok(())
 }
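Because the transaction generation and the fold in the example above are deterministic, the expected aggregate balances can be reproduced outside Kafka and Hydro. The plain-Rust sketch below is hypothetical and not part of this commit; it folds the same 1,000,000 (account, amount) pairs locally, which gives the totals across the whole consumer cluster (each deployed consumer only sees its assigned partitions).

// Hypothetical local check (not in this commit): fold the same generated
// transactions that the leader produces and print the global per-account totals.
use std::collections::HashMap;

fn main() {
    let mut balances = HashMap::<String, i64>::new();
    for i in 0..1_000_000usize {
        let account = format!("account_{}", i % 100);
        let amount = (i % 201) as i64 - 100; // same range [-100, 100] as the example
        *balances.entry(account).or_insert(0) += amount;
    }
    let mut sorted: Vec<_> = balances.into_iter().collect();
    sorted.sort_by(|a, b| a.0.cmp(&b.0));
    for (account, balance) in sorted.iter().take(10) {
        println!("{}: {}", account, balance);
    }
}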

hydro_test/src/kafka/mod.rs

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
+use std::future::Future;
+
+use hydro_lang::live_collections::boundedness::Boundedness;
+use hydro_lang::live_collections::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Ordering};
+use hydro_lang::location::tick::{NoAtomic, NoTick};
+use hydro_lang::location::Location;
+use hydro_lang::prelude::*;
+use rdkafka::message::OwnedMessage;
+use rdkafka::producer::FutureProducer;
+
+#[ctor::ctor]
+fn init_rewrites() {
+    stageleft::add_private_reexport(
+        vec!["rdkafka", "producer", "future_producer"],
+        vec!["rdkafka", "producer"],
+    );
+    stageleft::add_private_reexport(
+        vec!["rdkafka", "consumer", "stream_consumer"],
+        vec!["rdkafka", "consumer"],
+    );
+    stageleft::add_private_reexport(
+        vec!["rdkafka", "message", "owned_message"],
+        vec!["rdkafka", "message"],
+    );
+    stageleft::add_private_reexport(
+        vec!["futures_util", "stream", "stream"],
+        vec!["futures_util", "stream"],
+    );
+    stageleft::add_private_reexport(
+        vec!["futures_util", "stream", "unfold"],
+        vec!["futures_util", "stream"],
+    );
+}
+
+/// Creates a Kafka `FutureProducer` singleton.
+pub fn kafka_producer<'a, Loc>(
+    location: &Loc,
+    brokers: &'a str,
+) -> Singleton<FutureProducer, Loc, Bounded>
+where
+    Loc: Location<'a> + NoTick + NoAtomic,
+{
+    location.singleton(q!({
+        rdkafka::config::ClientConfig::new()
+            .set("bootstrap.servers", brokers)
+            .create::<rdkafka::producer::FutureProducer>()
+            .expect("Failed to create Kafka producer")
+    }))
+}
+
+/// Consumes messages from a Kafka topic. Returns at-least-once, unordered delivery.
+pub fn kafka_consumer<'a, Loc>(
+    location: &Loc,
+    brokers: &'a str,
+    group_id: &'a str,
+    topic: &'a str,
+) -> Stream<OwnedMessage, Loc, Bounded, NoOrder, AtLeastOnce>
+where
+    Loc: Location<'a> + NoTick + NoAtomic,
+{
+    location
+        .singleton(q!({
+            let consumer: rdkafka::consumer::StreamConsumer =
+                rdkafka::config::ClientConfig::new()
+                    .set("bootstrap.servers", brokers)
+                    .set("group.id", group_id)
+                    .set("auto.offset.reset", "earliest")
+                    .create()
+                    .expect("Failed to create Kafka consumer");
+            rdkafka::consumer::Consumer::subscribe(&consumer, &[topic])
+                .expect("Failed to subscribe to topic");
+            std::sync::Arc::new(consumer)
+        }))
+        .into_stream()
+        .flat_map_stream_blocking(q!(
+            |consumer: std::sync::Arc<rdkafka::consumer::StreamConsumer>| {
+                futures_util::stream::unfold(consumer, |consumer| async move {
+                    loop {
+                        match rdkafka::consumer::StreamConsumer::recv(&*consumer).await {
+                            Ok(msg) => {
+                                return Some((rdkafka::message::BorrowedMessage::detach(&msg), consumer));
+                            }
+                            Err(e) => {
+                                eprintln!("Kafka consumer error: {}", e);
+                                continue;
+                            }
+                        }
+                    }
+                })
+            }
+        ))
+        .weaken_retries()
+        .weaken_ordering()
+}
+
+/// Sends `(key, payload)` pairs to a Kafka topic.
+pub fn dest_kafka<'a, Loc, Bound: Boundedness, Order: Ordering>(
+    producer: Singleton<FutureProducer, Loc, Bounded>,
+    input: Stream<(String, String), Loc, Bound, Order, ExactlyOnce>,
+    topic: &'a str,
+) where
+    Loc: Location<'a>,
+{
+    input
+        .cross_singleton(producer)
+        .map(q!(
+            |((key, payload), producer)| self::kafka_send(producer, topic, key, payload)
+        ))
+        .resolve_futures_blocking();
+}
+
+fn kafka_send(
+    producer: FutureProducer,
+    topic: &str,
+    key: String,
+    payload: String,
+) -> impl Future<Output = ()> {
+    let topic = topic.to_owned();
+    async move {
+        let record = rdkafka::producer::FutureRecord::to(&topic)
+            .key(&key)
+            .payload(&payload);
+        producer
+            .send(record, rdkafka::util::Timeout::Never)
+            .await
+            .expect("Failed to send message to Kafka");
+    }
+}
+
+/// Admin helper: delete topic if it exists, then create it with the given number of partitions.
+pub async fn setup_topic(brokers: &str, topic: &str, num_partitions: i32) {
+    use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
+    use rdkafka::config::ClientConfig;
+
+    let admin: AdminClient<rdkafka::client::DefaultClientContext> = ClientConfig::new()
+        .set("bootstrap.servers", brokers)
+        .create()
+        .expect("Failed to create Kafka admin client");
+
+    let opts = AdminOptions::new();
+
+    // Delete topic if it exists (ignore errors if it doesn't exist)
+    let _ = admin.delete_topics(&[topic], &opts).await;
+    // Brief pause to let deletion propagate
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+
+    let new_topic = NewTopic::new(topic, num_partitions, TopicReplication::Fixed(1));
+    admin
+        .create_topics(&[new_topic], &opts)
+        .await
+        .expect("Failed to create Kafka topic");
+}
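For reference, the raw rdkafka calls wrapped by kafka_producer, kafka_send, and kafka_consumer above can be exercised directly, outside Hydro. The sketch below is hypothetical and not part of this commit; the broker address, topic, and group id are placeholder assumptions. It sends one record and reads one message back using the same ClientConfig, FutureRecord, StreamConsumer, subscribe, and recv APIs used in the module.

// Hypothetical standalone sketch (not in this commit): the rdkafka calls wrapped by
// kafka_producer/kafka_send and kafka_consumer, used directly against a local broker.
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use rdkafka::Message;

#[tokio::main]
async fn main() {
    // Producer side: same config and send path as kafka_producer + kafka_send.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("Failed to create Kafka producer");
    producer
        .send(
            FutureRecord::to("financial_transactions")
                .key("account_0")
                .payload("42"),
            Timeout::Never,
        )
        .await
        .expect("Failed to send message to Kafka");

    // Consumer side: same config, subscribe, and recv loop as kafka_consumer.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "sketch_group")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Failed to create Kafka consumer");
    consumer
        .subscribe(&["financial_transactions"])
        .expect("Failed to subscribe to topic");
    let msg = consumer.recv().await.expect("Kafka consumer error");
    println!(
        "key={:?} payload={:?}",
        msg.key().map(String::from_utf8_lossy),
        msg.payload().map(String::from_utf8_lossy)
    );
}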

hydro_test/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -9,6 +9,9 @@ pub mod local;
 pub mod maelstrom;
 pub mod tutorials;
 
+#[cfg(feature = "kafka")]
+pub mod kafka;
+
 #[doc(hidden)]
 #[cfg(doctest)]
 mod docs {

rust-toolchain.toml

Lines changed: 1 addition & 0 deletions
@@ -5,5 +5,6 @@ components = [
     "clippy",
     # https://github.com/dtolnay/trybuild?tab=readme-ov-file#troubleshooting
     "rust-src",
+    "llvm-tools",
 ]
 targets = ["wasm32-unknown-unknown", "x86_64-unknown-linux-musl"]
