Skip to content

Commit 2a4e7ab

Browse files
committed
cleanup sqs one-write code
1 parent b9b257b commit 2a4e7ab

1 file changed

Lines changed: 64 additions & 129 deletions

File tree

hydro_test/src/aws/sqs.rs

Lines changed: 64 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use aws_config::SdkConfig;
22
use aws_sdk_sqs::Client;
3+
use aws_sdk_sqs::error::SdkError;
4+
use aws_sdk_sqs::operation::receive_message::ReceiveMessageError;
35
use aws_sdk_sqs::types::{DeleteMessageBatchRequestEntry, Message};
46
use futures_util::stream::StreamExt as _;
57
use hydro_lang::live_collections::boundedness::Boundedness;
6-
use hydro_lang::live_collections::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Ordering};
8+
use hydro_lang::live_collections::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Ordering, TotalOrder};
79
use hydro_lang::location::Location;
810
use hydro_lang::prelude::*;
911

@@ -22,73 +24,46 @@ pub fn sqs_client<'a, Loc: Location<'a>>(
2224
sdk_config.map(q!(|c| Client::new(&c)))
2325
}
2426

25-
/// At-least-once delivery, message ordering isn't preserved.
27+
/// At-least-once unordered delivery from a _standard_ SQS queue.
2628
pub fn source_sqs_standard<'a, Loc: Location<'a>>(
2729
client: Singleton<Client, Loc, Bounded>,
2830
queue_url: &'a str,
2931
) -> Stream<Message, Loc, Bounded, NoOrder, AtLeastOnce> {
3032
client
3133
.flat_map_stream_unordered(q!(move |client| {
32-
let recv_msg = client
33-
.receive_message()
34-
.queue_url(queue_url)
35-
.wait_time_seconds(10)
36-
.max_number_of_messages(10);
37-
let delete_msg = client.delete_message_batch().queue_url(queue_url);
3834
futures_util::stream::unfold((), move |()| {
39-
let recv_msg = recv_msg.clone();
40-
let delete_msg = delete_msg.clone();
35+
let fut = self::sqs_recv(&client, queue_url);
4136
async move {
42-
let result = recv_msg.send().await;
43-
let output = match result {
44-
Ok(output) => output,
45-
Err(e) => {
46-
eprintln!("error receiving message: {e}");
47-
return None;
48-
}
49-
};
50-
51-
let messages = output.messages.unwrap_or_default();
52-
if messages.is_empty() {
53-
return None;
54-
}
55-
56-
// TODO(mingwei): Should we give the user control over this?
57-
delete_msg
58-
.set_entries(Some(
59-
messages
60-
.iter()
61-
.enumerate()
62-
.map(|(i, msg)| {
63-
DeleteMessageBatchRequestEntry::builder()
64-
.id(i.to_string())
65-
.receipt_handle(msg.receipt_handle().unwrap())
66-
.build()
67-
.unwrap()
68-
})
69-
.collect(),
70-
))
71-
.send()
72-
.await
73-
.unwrap();
74-
75-
Some((messages, ()))
37+
let vec = fut.await.expect("Failed to receive from SQS.")?;
38+
Some((vec, ()))
7639
}
7740
})
7841
.flat_map(|vec| futures_util::stream::iter(vec))
7942
}))
8043
.weaken_retries()
8144
}
8245

83-
// /// Exactly-once in-order delivery.
84-
// ///
85-
// /// The user must ensure `queue_url` has is a FIFO queue.
86-
// pub fn source_sqs_fifo<'a, Loc: Location<'a>>(
87-
// client: Singleton<Client, Loc, Bounded>,
88-
// queue_url: &'a str,
89-
// ) -> Stream<Message, Loc, Bounded, TotalOrder, ExactlyOnce> {
90-
// client.flat_map_stream_ordered(q!(|client| self::sqs_stream(client, queue_url)))
91-
// }
46+
/// Exactly-once in-order delivery from a FIFO SQS queue.
47+
///
48+
/// # Non-Determinism
49+
/// The user must ensure `queue_url` is a FIFO queue. If it is not, the output order will be non-deterministic.
50+
pub fn source_sqs_fifo<'a, Loc: Location<'a>>(
51+
client: Singleton<Client, Loc, Bounded>,
52+
queue_url: &'a str,
53+
_nondet: NonDet,
54+
) -> Stream<Message, Loc, Bounded, TotalOrder, ExactlyOnce> {
55+
client
56+
.flat_map_stream_ordered(q!(move |client| {
57+
futures_util::stream::unfold((), move |()| {
58+
let fut = self::sqs_recv(&client, queue_url);
59+
async move {
60+
let vec = fut.await.expect("Failed to receive from SQS.")?;
61+
Some((vec, ()))
62+
}
63+
})
64+
.flat_map(|vec| futures_util::stream::iter(vec))
65+
}))
66+
}
9267

9368
/// Writes messages to SQS.
9469
pub fn dest_sqs<'a, Loc: Location<'a>, Bound: Boundedness, Order: Ordering>(
@@ -104,6 +79,7 @@ pub fn dest_sqs<'a, Loc: Location<'a>, Bound: Boundedness, Order: Ordering>(
10479
.resolve_futures_blocking();
10580
}
10681

82+
// Sends one message to the `queue_url` SQS queue.
10783
fn sqs_send(client: &Client, queue_url: &str, message: String) -> impl use<> + Future<Output = ()> {
10884
let message = client
10985
.send_message()
@@ -114,82 +90,41 @@ fn sqs_send(client: &Client, queue_url: &str, message: String) -> impl use<> + F
11490
}
11591
}
11692

117-
// // TODO(mingwei): return meaningful error type (once DFIR supports it).
118-
// fn sqs_sink(
119-
// client: &Client,
120-
// queue_url: &str,
121-
// ) -> impl use<> + futures::sink::Sink<SendMessageBatchRequestEntry, Error = Infallible> {
122-
// futures::sink::unfold(
123-
// (Vec::new(), client.send_message_batch().queue_url(queue_url)),
124-
// move |(mut buf, send_msg), message| async move {
125-
// debug_assert!(buf.len() < 10);
126-
// buf.push(message);
127-
// if 10 == buf.len() {
128-
// let output = (&send_msg)
129-
// .clone()
130-
// .set_entries(Some(std::mem::take(&mut buf)))
131-
// .send()
132-
// .await
133-
// .expect("Failed to send batch to SQS");
134-
// if !output.failed().is_empty() {
135-
// let err_msg = output
136-
// .failed()
137-
// .iter()
138-
// .fold(String::new(), |acc, failed| format!("{acc}\n{failed:?}"));
139-
// panic!("{}", err_msg);
140-
// }
141-
// }
142-
// Ok((buf, send_msg))
143-
// },
144-
// )
145-
// }
146-
147-
// /// Returns a stream of all available SQS messages. Ends when no more messages are immediately available.
148-
// fn sqs_stream(
149-
// client: Client,
150-
// queue_url: &str,
151-
// ) -> impl use<> + futures_util::stream::Stream<Item = Message> {
152-
// let recv_msg = client
153-
// .receive_message()
154-
// .queue_url(queue_url)
155-
// .wait_time_seconds(10)
156-
// .max_number_of_messages(10);
157-
// let delete_msg = client.delete_message_batch().queue_url(queue_url);
158-
// futures_util::stream::unfold((), move |()| {
159-
// let recv_msg = recv_msg.clone();
160-
// let delete_msg = delete_msg.clone();
161-
// async move {
162-
// let result = recv_msg.send().await;
163-
// let output = match result {
164-
// Ok(output) => output,
165-
// Err(e) => {
166-
// eprintln!("error receiving message: {e}");
167-
// return None;
168-
// }
169-
// };
93+
/// Receives and deletes up to 10 messages from the `queue_url` SQS queue.
94+
fn sqs_recv(client: &Client, queue_url: &str) -> impl use<> + Future<Output = Result<Option<Vec<Message>>, SdkError<ReceiveMessageError>>> {
95+
let recv_msg = client
96+
.receive_message()
97+
.queue_url(queue_url)
98+
.wait_time_seconds(10)
99+
.max_number_of_messages(10);
100+
let delete_msg = client.delete_message_batch().queue_url(queue_url);
101+
async move {
102+
let output = recv_msg.send().await?;
170103

171-
// let messages = output.messages.unwrap_or_default();
104+
let messages = output.messages.unwrap_or_default();
105+
if messages.is_empty() {
106+
return Ok(None);
107+
}
172108

173-
// delete_msg
174-
// .set_entries(Some(
175-
// messages
176-
// .iter()
177-
// .enumerate()
178-
// .map(|(i, msg)| {
179-
// DeleteMessageBatchRequestEntry::builder()
180-
// .id(i.to_string())
181-
// .receipt_handle(msg.receipt_handle().unwrap())
182-
// .build()
183-
// .unwrap()
184-
// })
185-
// .collect(),
186-
// ))
187-
// .send()
188-
// .await
189-
// .unwrap();
109+
// TODO(mingwei): Should we give the user control over deletion?
110+
delete_msg
111+
.set_entries(Some(
112+
messages
113+
.iter()
114+
.enumerate()
115+
.map(|(i, msg)| {
116+
DeleteMessageBatchRequestEntry::builder()
117+
.id(i.to_string())
118+
.receipt_handle(msg.receipt_handle().unwrap())
119+
.build()
120+
.unwrap()
121+
})
122+
.collect(),
123+
))
124+
.send()
125+
.await
126+
.unwrap();
190127

191-
// Some((messages, ()))
192-
// }
193-
// })
194-
// .flat_map(|vec| futures_util::stream::iter(vec))
195-
// }
128+
Ok(Some(messages))
129+
}
130+
}

0 commit comments

Comments
 (0)