Skip to content

Commit c1c9479

Browse files
Address PR #2746 review comments for AWS SQS support
Changes made: - Fix typo: `chaning` → `changing` in sqs.rs comment - Replace concrete AWS account ID/queue URL with placeholders in example - Weaken `source_sqs_fifo` guarantees from `TotalOrder`/`ExactlyOnce` to `NoOrder`/`AtLeastOnce` since SQS FIFO only guarantees ordering within a single message group; removed `NonDet` parameter; added detailed doc comment - Add doc comment to `dest_sqs` noting it's for standard queues only (no `message_group_id` set) - Add `features = ["alloc"]` to `futures-util` dependency for stream module - Remove `aws_sqs` from `.vscode/settings.json` cargo features - Improve `nondet!` justification comments in example - Remove unused `TotalOrder` import from sqs.rs Comments dismissed as incorrect/not actionable: - EC2 hardcoded region/AMI: acceptable for an example - Delete-before-return: already has TODO, AtLeastOnce marker is accurate - `?` terminates stream on empty poll: intentional design (same as resolved) - flat_map yields Vec not Message: wrong (Option iterates over inner Vec) Co-authored-by: Infinity 🤖 <infinity@hydro.run>
1 parent 67c7bb3 commit c1c9479

4 files changed

Lines changed: 23 additions & 14 deletions

File tree

.vscode/settings.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
"rust-analyzer.rustfmt.extraArgs": [
1212
"+nightly"
1313
],
14-
"rust-analyzer.cargo.features": [
15-
"aws_sqs"
16-
],
14+
"rust-analyzer.cargo.features": [],
1715
"editor.semanticTokenColorCustomizations": {
1816
"enabled": true,
1917
"rules": {

hydro_test/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ aws-config = { version = "1", features = [
4545
"credentials-login",
4646
], optional = true }
4747
aws-sdk-sqs = { version = "1", optional = true }
48-
futures-util = { version = "0.3.0", default-features = false, optional = true }
48+
futures-util = { version = "0.3.0", default-features = false, features = ["alloc"], optional = true }
4949

5050
[build-dependencies]
5151
stageleft_tool.workspace = true

hydro_test/examples/aws_sqs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use stageleft::q;
1313

1414
type HostCreator = Box<dyn Fn(&mut Deployment) -> Arc<dyn Host>>;
1515

16-
// aws sqs send-message --queue-url 'https://sqs.us-west-1.amazonaws.com/557701608578/hydro_test_queue_standard' --message-body 'foobar'
17-
// AWS_PROFILE='test-profile' cargo run -p hydro_test --example aws_sqs --all-features -- --queue-url 'https://sqs.us-west-1.amazonaws.com/557701608578/hydro_test_queue_standard'
16+
// aws sqs send-message --queue-url 'https://sqs.<REGION>.amazonaws.com/<ACCOUNT_ID>/<QUEUE_NAME>' --message-body 'foobar'
17+
// AWS_PROFILE='<AWS_PROFILE>' cargo run -p hydro_test --example aws_sqs --all-features -- --queue-url 'https://sqs.<REGION>.amazonaws.com/<ACCOUNT_ID>/<QUEUE_NAME>'
1818
#[derive(Parser, Debug)]
1919
struct Args {
2020
#[clap(flatten)]
@@ -62,10 +62,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6262
let sqs_client = sqs_client(sdk_config);
6363
source_sqs_standard(sqs_client, &args.queue_url)
6464
.assume_ordering::<TotalOrder>(
65-
nondet!(/** Print all messages at least once in arbitrary order */),
65+
nondet!(/** Safe to assume total order because the side effect is only printing messages for observation/debugging. */),
6666
)
6767
.assume_retries::<ExactlyOnce>(
68-
nondet!(/** Print all messages at least once in arbitrary order */),
68+
nondet!(/** Safe to assume exactly-once because the side effect is only printing messages for observation/debugging. */),
6969
)
7070
.for_each(q!(|msg| {
7171
println!("MSG: {:?} {:?}", msg.message_id(), msg.body());

hydro_test/src/aws/sqs.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use aws_sdk_sqs::types::{DeleteMessageBatchRequestEntry, Message};
66
use futures_util::stream::StreamExt as _;
77
use hydro_lang::live_collections::boundedness::Boundedness;
88
use hydro_lang::live_collections::stream::{
9-
AtLeastOnce, ExactlyOnce, NoOrder, Ordering, TotalOrder,
9+
AtLeastOnce, ExactlyOnce, NoOrder, Ordering,
1010
};
1111
use hydro_lang::location::Location;
1212
use hydro_lang::prelude::*;
@@ -23,7 +23,7 @@ fn init_rewrites() {
2323
pub fn sqs_client<'a, Loc: Location<'a>>(
2424
sdk_config: Singleton<SdkConfig, Loc, Bounded>,
2525
) -> Singleton<Client, Loc, Bounded> {
26-
// TODO(mingwei): Should this be unbounded since it is a ball of chaning state?
26+
// TODO(mingwei): Should this be unbounded since it is a ball of changing state?
2727
sdk_config.map(q!(|c| Client::new(&c)))
2828
}
2929

@@ -48,15 +48,21 @@ pub fn source_sqs_standard<'a, Loc: Location<'a>>(
4848
.weaken_ordering()
4949
}
5050

51-
/// Exactly-once in-order delivery from a FIFO SQS queue.
51+
/// Delivery from a FIFO SQS queue.
52+
///
53+
/// SQS FIFO only guarantees ordering within a single message group, and
54+
/// duplicates can still occur around visibility-timeout boundaries. Therefore
55+
/// this source conservatively returns `NoOrder` / `AtLeastOnce`. Callers who
56+
/// know their FIFO configuration enforces a single message group and
57+
/// deduplication can `assume_ordering` / `assume_retries` with a [`NonDet`]
58+
/// justification.
5259
///
5360
/// # Non-Determinism
5461
/// The user must ensure `queue_url` is a FIFO queue. If it is not, the output order will be non-deterministic.
5562
pub fn source_sqs_fifo<'a, Loc: Location<'a>>(
5663
client: Singleton<Client, Loc, Bounded>,
5764
queue_url: &'a str,
58-
_nondet: NonDet,
59-
) -> Stream<Message, Loc, Bounded, TotalOrder, ExactlyOnce> {
65+
) -> Stream<Message, Loc, Bounded, NoOrder, AtLeastOnce> {
6066
client
6167
.into_stream()
6268
.flat_map_stream_blocking(q!(move |client| {
@@ -69,9 +75,14 @@ pub fn source_sqs_fifo<'a, Loc: Location<'a>>(
6975
})
7076
.flat_map(futures_util::stream::iter)
7177
}))
78+
.weaken_retries()
79+
.weaken_ordering()
7280
}
7381

74-
/// Writes messages to SQS.
82+
/// Writes messages to a _standard_ SQS queue.
83+
///
84+
/// Does not set `message_group_id`, so sending to a FIFO queue will fail at
85+
/// runtime with `MissingParameter`.
7586
pub fn dest_sqs<'a, Loc: Location<'a>, Bound: Boundedness, Order: Ordering>(
7687
client: Singleton<Client, Loc, Bounded>,
7788
input: Stream<String, Loc, Bound, Order, ExactlyOnce>,

0 commit comments

Comments
 (0)