Skip to content

Commit ff3c338

Browse files
committed
feat(kafka): Use separate flags for produce and consume for cleaner time benchmarks
1 parent 6e66c41 commit ff3c338

2 files changed

Lines changed: 142 additions & 114 deletions

File tree

hydro_test/examples/kafka.rs

Lines changed: 107 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type HostCreator = Box<dyn Fn(&mut Deployment) -> Arc<dyn Host>>;
1616

1717
const TOPIC_PREFIX: &str = "financial_transactions";
1818
const NUM_PARTITIONS: i32 = 10;
19-
const NUM_TRANSACTIONS: usize = 100_000;
19+
const NUM_TRANSACTIONS: usize = 10_000;
2020
const NUM_CONSUMERS: usize = 3;
2121

2222
// cargo run -p hydro_test --example kafka --features kafka -- --brokers 'localhost:9092'
@@ -45,6 +45,14 @@ struct Args {
4545
/// Kafka security protocol (plaintext or SSL for MSK)
4646
#[arg(long, default_value = "SSL")]
4747
security_protocol: String,
48+
49+
/// Run mode: "produce" (produce only, prints topic name), "consume" (consume only, requires --topic), or "both" (default)
50+
#[arg(long, default_value = "both")]
51+
mode: String,
52+
53+
/// Topic name for consume-only mode (use the topic printed by a produce run)
54+
#[arg(long)]
55+
topic: Option<String>,
4856
}
4957

5058
enum Leader {}
@@ -95,16 +103,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
95103
Box::new(move |_| -> Arc<dyn Host> { localhost.clone() })
96104
};
97105

98-
// Use a unique topic name per run to avoid stale messages from previous runs.
99-
let topic = format!("{}_{}", TOPIC_PREFIX, std::process::id());
106+
let produce = args.mode == "produce" || args.mode == "both";
107+
let consume = args.mode == "consume" || args.mode == "both";
108+
109+
// Use --topic if provided (consume-only runs should pass one, though this is not enforced); otherwise generate a unique per-process name.
110+
let topic = if let Some(t) = &args.topic {
111+
t.clone()
112+
} else {
113+
format!("{}_{}", TOPIC_PREFIX, std::process::id())
114+
};
100115

101116
let mut flow = hydro_lang::compile::builder::FlowBuilder::new();
102117
let leader = flow.process::<Leader>();
103118
let consumers = flow.cluster::<Consumer>();
104119

105120
// Leader: produce transactions spread across partitions.
106-
// Each transaction is (account_id, amount) serialized as key=account, value=amount.
107-
{
121+
if produce {
108122
let producer = kafka_producer(
109123
&leader,
110124
&args.brokers,
@@ -119,21 +133,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
119133
(account, amount)
120134
})
121135
}));
122-
dest_kafka(producer, transactions, &topic);
123-
// Sentinel so the runner knows when producing is done.
124-
leader
125-
.source_iter(q!(std::iter::once("PRODUCE_DONE".to_string())))
126-
.for_each(q!(|msg| println!("{}", msg)));
136+
let sent = dest_kafka(producer, transactions, &topic);
137+
sent.for_each(q!({
138+
let count = std::cell::Cell::new(0usize);
139+
move |producer| {
140+
let c = count.get() + 1;
141+
count.set(c);
142+
if c >= NUM_TRANSACTIONS {
143+
rdkafka::producer::Producer::flush(&*producer, std::time::Duration::from_secs(30))
144+
.expect("Failed to flush producer");
145+
println!("PRODUCE_DONE {}", c);
146+
}
147+
}
148+
}));
127149
}
128150

129-
// Consumers: read from topic and compute per-account balances.
130-
{
131-
let messages = kafka_consumer(&consumers, &args.brokers, "kafka_example_consumers", &topic, &args.security_protocol)
151+
// Consumers: read from topic and print each message.
152+
if consume {
153+
let _messages = kafka_consumer(&consumers, &args.brokers, "kafka_example_consumers", &topic, &args.security_protocol)
132154
.assume_ordering::<TotalOrder>(
133-
nondet!(/** Safe: side effect is only printing final balances. */),
155+
nondet!(/** Safe: side effect is only printing. */),
134156
)
135157
.assume_retries::<ExactlyOnce>(
136-
nondet!(/** Safe: side effect is only printing final balances. */),
158+
nondet!(/** Safe: side effect is only printing. */),
137159
)
138160
.filter_map(q!(|msg| {
139161
let key = rdkafka::Message::key(&msg)
@@ -156,90 +178,97 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
156178
return Ok(());
157179
}
158180

159-
// Now use the built flow for deployment with optimization
160-
let nodes = built
161-
.with_default_optimize()
162-
.with_process(
163-
&leader,
181+
// Deploy
182+
let mut hosts_builder = built.with_default_optimize();
183+
hosts_builder = hosts_builder.with_process(
184+
&leader,
185+
TrybuildHost::new(create_host(&mut deployment))
186+
.features(vec!["kafka".to_owned()]),
187+
);
188+
hosts_builder = hosts_builder.with_cluster(
189+
&consumers,
190+
(0..NUM_CONSUMERS).map(|_| {
164191
TrybuildHost::new(create_host(&mut deployment))
165-
.features(vec!["kafka".to_owned()]),
166-
)
167-
.with_cluster(
168-
&consumers,
169-
(0..NUM_CONSUMERS).map(|_| {
170-
TrybuildHost::new(create_host(&mut deployment))
171-
.features(vec!["kafka".to_owned()])
172-
}),
173-
)
174-
.deploy(&mut deployment);
192+
.features(vec!["kafka".to_owned()])
193+
}),
194+
);
195+
let nodes = hosts_builder.deploy(&mut deployment);
175196

176197
deployment.deploy().await.unwrap();
177198
deployment.start().await.unwrap();
178199

179-
// Subscribe to stdout from all deployed nodes and count messages.
200+
println!("Running Kafka example (mode={}, topic={topic}, {NUM_TRANSACTIONS} messages)...", args.mode);
201+
180202
let start = std::time::Instant::now();
181203
let total = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
182204
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
183205
let (produce_done_tx, produce_done_rx) = tokio::sync::oneshot::channel::<()>();
184206
{
185207
use hydro_lang::deploy::DeployCrateWrapper;
186208

187-
let leader_node = nodes.get_process(&leader);
188-
let mut leader_out = leader_node.stdout();
189-
let produce_done_tx = std::sync::Mutex::new(Some(produce_done_tx));
190-
tokio::spawn(async move {
191-
while let Some(line) = leader_out.recv().await {
192-
if line.trim() == "PRODUCE_DONE" {
193-
if let Some(tx) = produce_done_tx.lock().unwrap().take() {
194-
let _ = tx.send(());
195-
}
196-
} else {
197-
println!("[Leader] {line}");
198-
}
199-
}
200-
});
201-
202-
for (i, member) in nodes.get_cluster(&consumers).members().into_iter().enumerate() {
203-
let mut member_out = member.stdout();
204-
let total = total.clone();
205-
let done_tx = done_tx.clone();
209+
if produce {
210+
let leader_node = nodes.get_process(&leader);
211+
let mut leader_out = leader_node.stdout();
212+
let produce_done_tx = std::sync::Mutex::new(Some(produce_done_tx));
206213
tokio::spawn(async move {
207-
while let Some(_line) = member_out.recv().await {
208-
let t = total.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
209-
if t % 10_000 == 0 {
210-
println!("[Consumer {i}] ... {t} total messages consumed so far");
211-
}
212-
if t >= NUM_TRANSACTIONS {
213-
let _ = done_tx.send(()).await;
214-
return;
214+
while let Some(line) = leader_out.recv().await {
215+
if line.starts_with("PRODUCE_DONE") {
216+
if let Some(tx) = produce_done_tx.lock().unwrap().take() {
217+
let _ = tx.send(());
218+
}
219+
} else {
220+
println!("[Leader] {line}");
215221
}
216222
}
217223
});
218224
}
225+
226+
if consume {
227+
for (i, member) in nodes.get_cluster(&consumers).members().into_iter().enumerate() {
228+
let mut member_out = member.stdout();
229+
let total = total.clone();
230+
let done_tx = done_tx.clone();
231+
tokio::spawn(async move {
232+
while let Some(_line) = member_out.recv().await {
233+
let t = total.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
234+
if t % 1_000 == 0 {
235+
println!("[Consumer {i}] ... {t} total messages consumed so far");
236+
}
237+
if t >= NUM_TRANSACTIONS {
238+
let _ = done_tx.send(()).await;
239+
return;
240+
}
241+
}
242+
});
243+
}
244+
}
219245
}
220246
drop(done_tx);
221247

222-
println!("Running Kafka financial transactions example ({NUM_TRANSACTIONS} messages)...");
248+
if produce {
249+
let _ = produce_done_rx.await;
250+
let produce_elapsed = start.elapsed();
251+
println!(
252+
"Produce: {NUM_TRANSACTIONS} messages in {:.2?} ({:.0} msgs/sec)",
253+
produce_elapsed,
254+
NUM_TRANSACTIONS as f64 / produce_elapsed.as_secs_f64()
255+
);
256+
if !consume {
257+
println!("Topic: {topic}");
258+
println!("Run consume with: --mode consume --topic {topic}");
259+
return Ok(());
260+
}
261+
}
223262

224-
let _ = produce_done_rx.await;
225-
let produce_elapsed = start.elapsed();
226-
println!(
227-
"Produce: {NUM_TRANSACTIONS} messages in {:.2?} ({:.0} msgs/sec)",
228-
produce_elapsed,
229-
NUM_TRANSACTIONS as f64 / produce_elapsed.as_secs_f64()
230-
);
263+
if consume {
264+
done_rx.recv().await;
265+
let elapsed = start.elapsed();
266+
println!(
267+
"Consume: {NUM_TRANSACTIONS} messages in {:.2?} ({:.0} msgs/sec)",
268+
elapsed,
269+
NUM_TRANSACTIONS as f64 / elapsed.as_secs_f64()
270+
);
271+
}
231272

232-
done_rx.recv().await;
233-
let total_elapsed = start.elapsed();
234-
let consume_elapsed = total_elapsed - produce_elapsed;
235-
println!(
236-
"Consume: {NUM_TRANSACTIONS} messages in {:.2?} ({:.0} msgs/sec)",
237-
consume_elapsed,
238-
NUM_TRANSACTIONS as f64 / consume_elapsed.as_secs_f64()
239-
);
240-
println!(
241-
"Total: {:.2?}",
242-
total_elapsed,
243-
);
244273
Ok(())
245274
}

hydro_test/src/kafka/mod.rs

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
use std::future::Future;
2-
31
use hydro_lang::live_collections::boundedness::Boundedness;
42
use hydro_lang::live_collections::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Ordering};
53
use hydro_lang::location::tick::{NoAtomic, NoTick};
64
use hydro_lang::location::Location;
75
use hydro_lang::prelude::*;
86
use rdkafka::message::OwnedMessage;
9-
use rdkafka::producer::FutureProducer;
7+
use rdkafka::producer::BaseProducer;
8+
9+
type SharedProducer = std::sync::Arc<BaseProducer>;
1010

1111
#[ctor::ctor]
1212
fn init_rewrites() {
1313
stageleft::add_private_reexport(
14-
vec!["rdkafka", "producer", "future_producer"],
14+
vec!["rdkafka", "producer", "base_producer"],
1515
vec!["rdkafka", "producer"],
1616
);
1717
stageleft::add_private_reexport(
@@ -32,7 +32,7 @@ fn init_rewrites() {
3232
);
3333
}
3434

35-
/// Creates a Kafka `FutureProducer` singleton.
35+
/// Creates a Kafka `BaseProducer` singleton wrapped in `Arc` for sharing.
3636
///
3737
/// The topic will be created on the broker before the producer is returned.
3838
/// This runs on the deployed host, so it works even when brokers are in a
@@ -43,17 +43,19 @@ pub fn kafka_producer<'a, Loc>(
4343
security_protocol: &'a str,
4444
topic: &'a str,
4545
num_partitions: i32,
46-
) -> Singleton<FutureProducer, Loc, Bounded>
46+
) -> Singleton<SharedProducer, Loc, Bounded>
4747
where
4848
Loc: Location<'a> + NoTick + NoAtomic,
4949
{
5050
location.singleton(q!({
5151
self::setup_topic_blocking(brokers, topic, num_partitions, security_protocol);
52-
rdkafka::config::ClientConfig::new()
53-
.set("bootstrap.servers", brokers)
54-
.set("security.protocol", security_protocol)
55-
.create::<rdkafka::producer::FutureProducer>()
56-
.expect("Failed to create Kafka producer")
52+
std::sync::Arc::new(
53+
rdkafka::config::ClientConfig::new()
54+
.set("bootstrap.servers", brokers)
55+
.set("security.protocol", security_protocol)
56+
.create::<rdkafka::producer::BaseProducer>()
57+
.expect("Failed to create Kafka producer"),
58+
)
5759
}))
5860
}
5961

@@ -104,38 +106,35 @@ where
104106
.weaken_ordering()
105107
}
106108

107-
/// Sends `(key, payload)` pairs to a Kafka topic.
109+
/// Sends `(key, payload)` pairs to a Kafka topic using `BaseProducer`.
110+
/// Messages are queued without waiting for acks. `poll(Duration::ZERO)` is
111+
/// called after each send to drive delivery callbacks.
108112
pub fn dest_kafka<'a, Loc, Bound: Boundedness, Order: Ordering>(
109-
producer: Singleton<FutureProducer, Loc, Bounded>,
113+
producer: Singleton<SharedProducer, Loc, Bounded>,
110114
input: Stream<(String, String), Loc, Bound, Order, ExactlyOnce>,
111115
topic: &'a str,
112-
) where
116+
) -> Stream<SharedProducer, Loc, Bound, Order, ExactlyOnce>
117+
where
113118
Loc: Location<'a>,
114119
{
115120
input
116121
.cross_singleton(producer)
117-
.map(q!(
118-
|((key, payload), producer)| self::kafka_send(producer, topic, key, payload)
119-
))
120-
.resolve_futures_blocking();
121-
}
122-
123-
fn kafka_send(
124-
producer: FutureProducer,
125-
topic: &str,
126-
key: String,
127-
payload: String,
128-
) -> impl Future<Output = ()> {
129-
let topic = topic.to_owned();
130-
async move {
131-
let record = rdkafka::producer::FutureRecord::to(&topic)
132-
.key(&key)
133-
.payload(&payload);
134-
producer
135-
.send(record, rdkafka::util::Timeout::Never)
136-
.await
137-
.expect("Failed to send message to Kafka");
138-
}
122+
.map(q!(|((key, payload), producer)| {
123+
loop {
124+
let record = rdkafka::producer::BaseRecord::to(topic)
125+
.key(&key)
126+
.payload(&payload);
127+
match producer.send(record) {
128+
Ok(()) => break,
129+
Err((rdkafka::error::KafkaError::MessageProduction(rdkafka::types::RDKafkaErrorCode::QueueFull), _)) => {
130+
producer.poll(std::time::Duration::from_millis(100));
131+
}
132+
Err((e, _)) => panic!("Failed to send message to Kafka: {}", e),
133+
}
134+
}
135+
producer.poll(std::time::Duration::ZERO);
136+
producer
137+
}))
139138
}
140139

141140
/// Admin helper: create a topic with the given number of partitions.

0 commit comments

Comments
 (0)