Skip to content

Commit 9bfd3d3

Browse files
Fix resolve_futures poll_ready blocking on pending futures (#2662)
In `ResolveFutures::poll_ready`, the call to `empty_ready` would return `Pending` when the queue contained unresolved futures in blocking mode (no subgraph_waker). This prevented callers from ever calling `start_send` to add new futures, effectively serializing the queue to one future at a time. The fix makes `poll_ready` always return `Done` after opportunistically draining any ready futures. The `Pending` behavior is preserved in `poll_flush`, which correctly blocks until all futures resolve before flushing downstream. Changes: - `dfir_pipes/src/push/resolve_futures.rs`: - `poll_ready` now discards the `empty_ready` result and always returns `Done`, allowing callers to keep adding futures to the queue. - Added regression test `poll_ready_allows_send_while_futures_pending` that uses a two-poll future to verify `poll_ready` doesn't block. - Updated comment on the `Pending` branch in `empty_ready` to clarify it's only relevant for `poll_flush`. Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent 3e6e26c commit 9bfd3d3

1 file changed

Lines changed: 73 additions & 3 deletions

File tree

dfir_pipes/src/push/resolve_futures.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@ impl<Psh, Queue, QueueInner> ResolveFutures<Psh, Queue, QueueInner> {
8787
if this.subgraph_waker.is_some() {
8888
return PushStep::Done; // We will be re-woken on a future tick
8989
} else {
90-
// We will pend until the queue is emptied.
91-
// TODO(mingwei): Does this mean only one item may be sent at a time?
90+
// Pend until the queue is emptied. This is used by
91+
// poll_flush to block until all futures resolve.
92+
// poll_ready discards this result so callers can keep
93+
// adding futures (see #2662).
9294
return PushStep::Pending(Yes);
9395
}
9496
}
@@ -110,7 +112,10 @@ where
110112
type CanPend = Yes;
111113

112114
fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
113-
self.as_mut().empty_ready(ctx) // TODO(mingwei): see above
115+
// Opportunistically drain any ready futures from the queue, but don't
116+
// block on pending ones. The queue can always accept new futures.
117+
let _ = self.as_mut().empty_ready(ctx);
118+
PushStep::Done
114119
}
115120

116121
fn start_send(self: Pin<&mut Self>, item: Fut, _meta: ()) {
@@ -160,6 +165,71 @@ mod tests {
160165

161166
type Queue = FuturesUnordered<core::future::Ready<i32>>;
162167

168+
/// Regression test for https://github.com/hydro-project/hydro/issues/2662
169+
///
170+
/// `poll_ready` in blocking mode (no subgraph_waker) must return `Done` even
171+
/// when the queue contains pending futures, so that the caller can keep adding
172+
/// new futures via `start_send`. Otherwise the queue effectively serialises to
173+
/// one future at a time.
174+
#[test]
175+
fn poll_ready_allows_send_while_futures_pending() {
176+
use core::task::Poll;
177+
178+
/// A future that is pending on the first poll and ready on the second.
179+
struct TwoPollFuture(bool);
180+
impl Future for TwoPollFuture {
181+
type Output = i32;
182+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
183+
if self.0 {
184+
Poll::Ready(42)
185+
} else {
186+
self.0 = true;
187+
cx.waker().wake_by_ref();
188+
Poll::Pending
189+
}
190+
}
191+
}
192+
193+
type PendQueue = FuturesUnordered<TwoPollFuture>;
194+
195+
let waker = Waker::noop();
196+
let mut cx = Context::from_waker(waker);
197+
198+
let mock = AsyncMockPush::default();
199+
let mut queue: PendQueue = FuturesUnordered::new();
200+
queue.extend(core::iter::once(TwoPollFuture(false)));
201+
202+
let mut rf = ResolveFutures::<_, _, PendQueue>::new(&mut queue, None, mock);
203+
204+
// poll_ready should return Done so we can send more futures, even though
205+
// the queue has a pending future.
206+
let result = Push::<TwoPollFuture, ()>::poll_ready(Pin::new(&mut rf), &mut cx);
207+
assert!(
208+
result.is_done(),
209+
"poll_ready must not block on pending futures in the queue"
210+
);
211+
212+
// We should be able to add another future.
213+
Push::<TwoPollFuture, ()>::start_send(Pin::new(&mut rf), TwoPollFuture(false), ());
214+
215+
// Now flush should eventually resolve everything.
216+
// (May need multiple calls since futures need two polls each.)
217+
let mut saw_done = false;
218+
for _ in 0..4 {
219+
let r = Push::<TwoPollFuture, ()>::poll_flush(Pin::new(&mut rf), &mut cx);
220+
if r.is_done() {
221+
saw_done = true;
222+
break;
223+
}
224+
}
225+
assert!(
226+
saw_done,
227+
"poll_flush did not complete within the expected number of polls"
228+
);
229+
230+
assert_eq!(rf.push.items.len(), 2, "both futures should have resolved");
231+
}
232+
163233
#[test]
164234
fn test_poll_ready_readies_downstream() {
165235
let waker = Waker::noop();

0 commit comments

Comments
 (0)