Skip to content

Commit 0ebfc70

Browse files
committed
Fix NetSocket where the end signal is notified before all messages have been dispatched.
Motivation: The NetSocket implementation directly write the end sentinel message to the pending message queue, ignoging the potentially buffered messages in the connection. Changes: When we consider the message stream ends, we should instead model the last message as a connection message to ensure no reorder happens.
1 parent 5e5f8ee commit 0ebfc70

3 files changed

Lines changed: 58 additions & 4 deletions

File tree

vertx-core/src/main/java/io/vertx/core/net/impl/NetSocketImpl.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public synchronized NetSocket handler(Handler<Buffer> dataHandler) {
180180
return this;
181181
}
182182

183-
private synchronized Handler<Object> messageHandler() {
183+
private synchronized MessageHandler messageHandler() {
184184
return messageHandler;
185185
}
186186

@@ -190,6 +190,10 @@ public synchronized NetSocketInternal messageHandler(Handler<Object> handler) {
190190
messageHandler = new DataMessageHandler();
191191
} else {
192192
messageHandler = new MessageHandler() {
193+
@Override
194+
public void handleEnd() {
195+
// Noop
196+
}
193197
@Override
194198
public void pause() {
195199
doPause();
@@ -390,16 +394,25 @@ public Future<Void> end() {
390394
protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
391395
}
392396

397+
private void handleEnded() {
398+
read(InboundBuffer.END_SENTINEL);
399+
endRead();
400+
}
401+
393402
@Override
394403
protected void handleClosed() {
395-
pending.write(InboundBuffer.END_SENTINEL);
404+
handleEnded();
396405
super.handleClosed();
397406
}
398407

399408
@Override
400409
public void handleMessage(Object msg) {
401-
Handler<Object> handler = messageHandler();
402-
handler.handle(msg);
410+
MessageHandler handler = messageHandler();
411+
if (msg == InboundBuffer.END_SENTINEL) {
412+
handler.handleEnd();
413+
} else {
414+
handler.handle(msg);
415+
}
403416
}
404417

405418
@Override
@@ -432,6 +445,7 @@ public NetSocketImpl shutdownHandler(@Nullable Handler<Void> handler) {
432445
interface MessageHandler extends Handler<Object> {
433446
void pause();
434447
void fetch(long amount);
448+
void handleEnd();
435449
}
436450

437451
private class DataMessageHandler implements MessageHandler {
@@ -446,6 +460,11 @@ public void handle(Object msg) {
446460
}
447461
}
448462

463+
@Override
464+
public void handleEnd() {
465+
pending.write(InboundBuffer.END_SENTINEL);
466+
}
467+
449468
@Override
450469
public void pause() {
451470
pending.pause();

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ final void read(Object msg) {
255255
}
256256
}
257257

258+
final void endRead() {
259+
read = false;
260+
}
261+
258262
private void addPending(Object msg) {
259263
if (pending == null) {
260264
pending = new ArrayDeque<>();

vertx-core/src/test/java/io/vertx/tests/net/NetTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,37 @@ protected void tearDown() throws Exception {
131131
super.tearDown();
132132
}
133133

134+
@Test
135+
public void testEndHandlerCalledAfterAllEmissions() {
136+
Buffer buffer = TestUtils.randomBuffer(1024 * 1024);
137+
server = vertx.createNetServer().connectHandler(so -> {
138+
so.end(buffer);
139+
so.close();
140+
});
141+
server.listen(1234).await();
142+
NetClient client = vertx.createNetClient();
143+
AtomicInteger received = new AtomicInteger();
144+
AtomicInteger ended = new AtomicInteger();
145+
client.connect(1234, "localhost").onComplete(ar -> {
146+
if (ar.succeeded()) {
147+
NetSocket socket = ar.result();
148+
socket.handler(buf -> {
149+
int amount = received.addAndGet(buf.length());
150+
assertEquals(0, ended.get());
151+
socket.pause();
152+
vertx.setTimer(50, t -> {
153+
socket.resume();
154+
});
155+
});
156+
socket.endHandler(v -> {
157+
assertEquals(0, ended.getAndIncrement());
158+
});
159+
}
160+
});
161+
assertWaitUntil(() -> received.get() == buffer.length());
162+
assertWaitUntil(() -> ended.get() > 0);
163+
}
164+
134165
@Test
135166
public void testClientOptions() {
136167
NetClientOptions options = new NetClientOptions();

0 commit comments

Comments
 (0)