11use hydro_lang:: live_collections:: boundedness:: Boundedness ;
22use hydro_lang:: live_collections:: stream:: { AtLeastOnce , ExactlyOnce , NoOrder , Ordering } ;
3- use hydro_lang:: location:: tick:: { NoAtomic , NoTick } ;
43use hydro_lang:: location:: Location ;
4+ use hydro_lang:: location:: tick:: { NoAtomic , NoTick } ;
55use hydro_lang:: prelude:: * ;
66use rdkafka:: message:: OwnedMessage ;
77use rdkafka:: producer:: BaseProducer ;
@@ -72,36 +72,38 @@ where
7272{
7373 location
7474 . singleton ( q ! ( {
75- let consumer: rdkafka:: consumer:: StreamConsumer =
76- rdkafka:: config:: ClientConfig :: new( )
77- . set( "bootstrap.servers" , brokers)
78- . set( "group.id" , group_id)
79- . set( "auto.offset.reset" , "earliest" )
80- . set( "security.protocol" , security_protocol)
81- . create( )
82- . expect( "Failed to create Kafka consumer" ) ;
75+ let consumer: rdkafka:: consumer:: StreamConsumer = rdkafka:: config:: ClientConfig :: new( )
76+ . set( "bootstrap.servers" , brokers)
77+ . set( "group.id" , group_id)
78+ . set( "auto.offset.reset" , "earliest" )
79+ . set( "security.protocol" , security_protocol)
80+ . create( )
81+ . expect( "Failed to create Kafka consumer" ) ;
8382 rdkafka:: consumer:: Consumer :: subscribe( & consumer, & [ topic] )
8483 . expect( "Failed to subscribe to topic" ) ;
8584 std:: sync:: Arc :: new( consumer)
8685 } ) )
8786 . into_stream ( )
88- . flat_map_stream_blocking ( q ! (
89- |consumer: std:: sync:: Arc <rdkafka:: consumer:: StreamConsumer >| {
90- futures_util:: stream:: unfold( consumer, |consumer| async move {
91- loop {
92- match rdkafka:: consumer:: StreamConsumer :: recv( & * consumer) . await {
93- Ok ( msg) => {
94- return Some ( ( rdkafka:: message:: BorrowedMessage :: detach( & msg) , consumer) ) ;
95- }
96- Err ( e) => {
97- eprintln!( "Kafka consumer error: {}" , e) ;
98- continue ;
99- }
87+ . flat_map_stream_blocking ( q ! ( |consumer: std:: sync:: Arc <
88+ rdkafka:: consumer:: StreamConsumer ,
89+ >| {
90+ futures_util:: stream:: unfold( consumer, |consumer| async move {
91+ loop {
92+ match rdkafka:: consumer:: StreamConsumer :: recv( & * consumer) . await {
93+ Ok ( msg) => {
94+ return Some ( (
95+ rdkafka:: message:: BorrowedMessage :: detach( & msg) ,
96+ consumer,
97+ ) ) ;
98+ }
99+ Err ( e) => {
100+ eprintln!( "Kafka consumer error: {}" , e) ;
101+ continue ;
100102 }
101103 }
102- } )
103- }
104- ) )
104+ }
105+ } )
106+ } ) )
105107 . weaken_retries ( )
106108 . weaken_ordering ( )
107109}
@@ -126,7 +128,12 @@ where
126128 . payload( & payload) ;
127129 match producer. send( record) {
128130 Ok ( ( ) ) => break ,
129- Err ( ( rdkafka:: error:: KafkaError :: MessageProduction ( rdkafka:: types:: RDKafkaErrorCode :: QueueFull ) , _) ) => {
131+ Err ( (
132+ rdkafka:: error:: KafkaError :: MessageProduction (
133+ rdkafka:: types:: RDKafkaErrorCode :: QueueFull ,
134+ ) ,
135+ _,
136+ ) ) => {
130137 producer. poll( std:: time:: Duration :: from_millis( 100 ) ) ;
131138 }
132139 Err ( ( e, _) ) => panic!( "Failed to send message to Kafka: {}" , e) ,
@@ -157,14 +164,24 @@ pub async fn setup_topic(brokers: &str, topic: &str, num_partitions: i32, securi
157164
158165/// Blocking version of [`setup_topic`] for use inside synchronous `q!()` blocks.
159166/// Uses the existing tokio runtime handle from the Hydro process.
160- pub fn setup_topic_blocking ( brokers : & str , topic : & str , num_partitions : i32 , security_protocol : & str ) {
167+ pub fn setup_topic_blocking (
168+ brokers : & str ,
169+ topic : & str ,
170+ num_partitions : i32 ,
171+ security_protocol : & str ,
172+ ) {
161173 let handle = tokio:: runtime:: Handle :: current ( ) ;
162174 let brokers = brokers. to_owned ( ) ;
163175 let topic = topic. to_owned ( ) ;
164176 let security_protocol = security_protocol. to_owned ( ) ;
165177 // Spawn a separate thread to avoid calling block_on from within an async context.
166178 std:: thread:: spawn ( move || {
167- handle. block_on ( setup_topic ( & brokers, & topic, num_partitions, & security_protocol) ) ;
179+ handle. block_on ( setup_topic (
180+ & brokers,
181+ & topic,
182+ num_partitions,
183+ & security_protocol,
184+ ) ) ;
168185 } )
169186 . join ( )
170187 . expect ( "Topic setup thread panicked" ) ;
0 commit comments