Skip to content

Commit c2d032f

Browse files
committed
quic: eliminate per-received datagram allocation
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode:Opus 4.6
1 parent e6c276f commit c2d032f

5 files changed

Lines changed: 68 additions & 68 deletions

File tree

src/quic/application.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,9 +448,13 @@ void Session::Application::SendPendingData() {
448448

449449
// Awesome, let's write our packet!
450450
PacketInfo pi;
451-
ssize_t nwrite = WriteVStream(
452-
&path, &pi, packet->data(), &ndatalen, packet->length(),
453-
stream_data, ts);
451+
ssize_t nwrite = WriteVStream(&path,
452+
&pi,
453+
packet->data(),
454+
&ndatalen,
455+
packet->length(),
456+
stream_data,
457+
ts);
454458

455459
// When ndatalen is > 0, that's our indication that stream data was accepted
456460
// in to the packet. Yay!

src/quic/endpoint.cc

Lines changed: 43 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -312,10 +312,18 @@ class Endpoint::UDP::Impl final : public HandleWrap {
312312
SET_SELF_SIZE(Impl)
313313

314314
private:
315+
// Pre-allocated receive buffer. Reused across all datagrams because
316+
// ngtcp2_conn_read_pkt is synchronous — it copies what it needs and
317+
// does not retain a reference to the buffer after returning. This
318+
// eliminates a malloc(64KB)/free(64KB) cycle per received datagram.
319+
static constexpr size_t kRecvBufferSize = 65536; // UV__UDP_DGRAM_MAXSIZE
320+
char recv_buf_[kRecvBufferSize];
321+
315322
static void OnAlloc(uv_handle_t* handle,
316323
size_t suggested_size,
317324
uv_buf_t* buf) {
318-
*buf = From(handle)->env()->allocate_managed_buffer(suggested_size);
325+
auto* impl = From(handle);
326+
*buf = uv_buf_init(impl->recv_buf_, kRecvBufferSize);
319327
}
320328

321329
static void OnReceive(uv_udp_t* handle,
@@ -327,26 +335,22 @@ class Endpoint::UDP::Impl final : public HandleWrap {
327335
DCHECK_NOT_NULL(impl);
328336
DCHECK_NOT_NULL(impl->endpoint_);
329337

330-
auto release_buf = [&]() {
331-
if (buf->base != nullptr) impl->env()->release_managed_buffer(*buf);
332-
};
333-
334338
// Nothing to do in these cases. Specifically, if the nread
335339
// is zero or we have received a partial packet, we are just
336-
// going to ignore it.
340+
// going to ignore it. No buffer release needed — recv_buf_
341+
// is pre-allocated and reused.
337342
if (nread == 0 || flags & UV_UDP_PARTIAL) {
338-
release_buf();
339343
return;
340344
}
341345

342346
if (nread < 0) {
343-
release_buf();
344347
impl->endpoint_->Destroy(CloseContext::RECEIVE_FAILURE,
345348
static_cast<int>(nread));
346349
return;
347350
}
348351

349-
impl->endpoint_->Receive(uv_buf_init(buf->base, static_cast<size_t>(nread)),
352+
impl->endpoint_->Receive(reinterpret_cast<const uint8_t*>(buf->base),
353+
static_cast<size_t>(nread),
350354
SocketAddress(addr));
351355
}
352356

@@ -1240,24 +1244,25 @@ void Endpoint::CloseGracefully() {
12401244
MaybeDestroy();
12411245
}
12421246

1243-
void Endpoint::Receive(const uv_buf_t& buf,
1247+
void Endpoint::Receive(const uint8_t* data,
1248+
size_t len,
12441249
const SocketAddress& remote_address) {
12451250
const auto receive = [&](Session* session,
1246-
Store&& store,
1251+
const uint8_t* pkt_data,
1252+
size_t pkt_len,
12471253
const SocketAddress& local_address,
12481254
const SocketAddress& remote_address,
12491255
const CID& dcid,
12501256
const CID& scid) {
12511257
DCHECK_NOT_NULL(session);
12521258
if (session->is_destroyed()) return;
1253-
size_t len = store.length();
12541259
// Use ReadPacket (no SendPendingDataScope) so that multiple packets
12551260
// received in the same I/O burst are processed before any responses
12561261
// are generated. The deferred flush via BindingData's uv_check
12571262
// callback calls SendPendingData once per dirty session after all
12581263
// packets in the burst have been read.
1259-
if (session->ReadPacket(std::move(store), local_address, remote_address)) {
1260-
STAT_INCREMENT_N(Stats, bytes_received, len);
1264+
if (session->ReadPacket(pkt_data, pkt_len, local_address, remote_address)) {
1265+
STAT_INCREMENT_N(Stats, bytes_received, pkt_len);
12611266
STAT_INCREMENT(Stats, packets_received);
12621267
}
12631268
// Schedule the session for deferred SendPendingData if it hasn't
@@ -1269,7 +1274,9 @@ void Endpoint::Receive(const uv_buf_t& buf,
12691274
}
12701275
};
12711276

1272-
const auto accept = [&](const Session::Config& config, Store&& store) {
1277+
const auto accept = [&](const Session::Config& config,
1278+
const uint8_t* pkt_data,
1279+
size_t pkt_len) {
12731280
// One final check. If the endpoint is closed, closing, or is not listening
12741281
// as a server, then we cannot accept the initial packet.
12751282
if (is_closed() || is_closing() || !is_listening()) return;
@@ -1299,7 +1306,8 @@ void Endpoint::Receive(const uv_buf_t& buf,
12991306
return;
13001307

13011308
receive(session.get(),
1302-
std::move(store),
1309+
pkt_data,
1310+
pkt_len,
13031311
config.local_address,
13041312
config.remote_address,
13051313
config.dcid,
@@ -1309,7 +1317,8 @@ void Endpoint::Receive(const uv_buf_t& buf,
13091317
const auto acceptInitialPacket = [&](const uint32_t version,
13101318
const CID& dcid,
13111319
const CID& scid,
1312-
Store&& store,
1320+
const uint8_t* pkt_data,
1321+
size_t pkt_len,
13131322
const SocketAddress& local_address,
13141323
const SocketAddress& remote_address) {
13151324
// If we're not listening as a server, do not accept an initial packet.
@@ -1319,8 +1328,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
13191328

13201329
// This is our first condition check... A minimal check to see if ngtcp2 can
13211330
// even recognize this packet as a quic packet.
1322-
ngtcp2_vec vec = store;
1323-
if (ngtcp2_accept(&hd, vec.base, vec.len) != NGTCP2_SUCCESS) {
1331+
if (ngtcp2_accept(&hd, pkt_data, pkt_len) != NGTCP2_SUCCESS) {
13241332
// Per the ngtcp2 docs, ngtcp2_accept returns 0 if the check was
13251333
// successful, or an error code if it was not. Currently there's only one
13261334
// documented error code (NGTCP2_ERR_INVALID_ARGUMENT) but we'll handle
@@ -1558,7 +1566,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
15581566
}
15591567
}
15601568

1561-
accept(config, std::move(store));
1569+
accept(config, pkt_data, pkt_len);
15621570
};
15631571

15641572
// When a received packet contains a QUIC short header but cannot be matched
@@ -1574,35 +1582,37 @@ void Endpoint::Receive(const uv_buf_t& buf,
15741582
// possible to avoid a DOS vector.
15751583
const auto maybeStatelessReset = [&](const CID& dcid,
15761584
const CID& scid,
1577-
Store& store,
1585+
const uint8_t* pkt_data,
1586+
size_t pkt_len,
15781587
const SocketAddress& local_address,
15791588
const SocketAddress& remote_address) {
15801589
// Support for stateless resets can be disabled by the application. If that
15811590
// case, or if the packet is too short to contain a reset token, then we
15821591
// skip the remaining checks.
15831592
if (options_.disable_stateless_reset ||
1584-
store.length() < NGTCP2_STATELESS_RESET_TOKENLEN) {
1593+
pkt_len < NGTCP2_STATELESS_RESET_TOKENLEN) {
15851594
return false;
15861595
}
15871596

15881597
// The stateless reset token itself is the *final*
15891598
// NGTCP2_STATELESS_RESET_TOKENLEN bytes in the received packet. If it is a
15901599
// stateless reset then then rest of the bytes in the packet are garbage
15911600
// that we'll ignore.
1592-
ngtcp2_vec vec = store;
1593-
vec.base += (vec.len - NGTCP2_STATELESS_RESET_TOKENLEN);
1601+
const uint8_t* token_pos =
1602+
pkt_data + (pkt_len - NGTCP2_STATELESS_RESET_TOKENLEN);
15941603

15951604
// If a Session has been associated with the token, then it is a valid
15961605
// stateless reset token. We need to dispatch it to the session to be
15971606
// processed.
15981607
auto* session = session_manager().FindSessionByStatelessResetToken(
1599-
StatelessResetToken(vec.base));
1608+
StatelessResetToken(token_pos));
16001609
if (session != nullptr) {
16011610
// If the session happens to have been destroyed already, we'll
16021611
// just ignore the packet.
16031612
if (!session->is_destroyed()) [[likely]] {
16041613
receive(session,
1605-
std::move(store),
1614+
pkt_data,
1615+
pkt_len,
16061616
local_address,
16071617
remote_address,
16081618
dcid,
@@ -1630,22 +1640,8 @@ void Endpoint::Receive(const uv_buf_t& buf,
16301640
// return;
16311641
// }
16321642

1633-
Debug(this, "Received %zu-byte packet from %s", buf.len, remote_address);
1634-
1635-
// The managed buffer here contains the received packet. We do not yet know
1636-
// at this point if it is a valid QUIC packet. We need to do some basic
1637-
// checks. It is critical at this point that we do as little work as possible
1638-
// to avoid a DOS vector.
1639-
std::shared_ptr<BackingStore> backing = env()->release_managed_buffer(buf);
1640-
if (!backing) [[unlikely]] {
1641-
// At this point something bad happened and we need to treat this as a fatal
1642-
// case. There's likely no way to test this specific condition reliably.
1643-
return Destroy(CloseContext::RECEIVE_FAILURE, UV_ENOMEM);
1644-
}
1645-
1646-
Store store(std::move(backing), buf.len, 0);
1643+
Debug(this, "Received %zu-byte packet from %s", len, remote_address);
16471644

1648-
ngtcp2_vec vec = store;
16491645
ngtcp2_version_cid pversion_cid;
16501646

16511647
// This is our first check to see if the received data can be processed as a
@@ -1654,7 +1650,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
16541650
// valid QUIC header but there is still no guarantee that the packet can be
16551651
// successfully processed.
16561652
switch (ngtcp2_pkt_decode_version_cid(
1657-
&pversion_cid, vec.base, vec.len, NGTCP2_MAX_CIDLEN)) {
1653+
&pversion_cid, data, len, NGTCP2_MAX_CIDLEN)) {
16581654
case 0:
16591655
break; // Supported version, continue processing.
16601656
case NGTCP2_ERR_VERSION_NEGOTIATION: {
@@ -1732,7 +1728,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
17321728
// necessary here. We want to return immediately without committing any
17331729
// further resources.
17341730
if (pversion_cid.version == 0 &&
1735-
maybeStatelessReset(dcid, scid, store, addr, remote_address)) {
1731+
maybeStatelessReset(dcid, scid, data, len, addr, remote_address)) {
17361732
Debug(this, "Packet was a stateless reset");
17371733
return; // Stateless reset! Don't do any further processing.
17381734
}
@@ -1747,17 +1743,13 @@ void Endpoint::Receive(const uv_buf_t& buf,
17471743
SendStatelessReset(
17481744
PathDescriptor{
17491745
pversion_cid.version, dcid, scid, addr, remote_address},
1750-
store.length());
1746+
len);
17511747
return;
17521748
}
17531749

17541750
// Process the packet as an initial packet...
1755-
return acceptInitialPacket(pversion_cid.version,
1756-
dcid,
1757-
scid,
1758-
std::move(store),
1759-
addr,
1760-
remote_address);
1751+
return acceptInitialPacket(
1752+
pversion_cid.version, dcid, scid, data, len, addr, remote_address);
17611753
}
17621754

17631755
if (session->is_destroyed()) [[unlikely]] {
@@ -1769,7 +1761,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
17691761
// If we got here, the dcid matched the scid of a known local session. Yay!
17701762
// The session will take over any further processing of the packet.
17711763
Debug(this, "Dispatching packet to known session");
1772-
receive(session.get(), std::move(store), addr, remote_address, dcid, scid);
1764+
receive(session.get(), data, len, addr, remote_address, dcid, scid);
17731765

17741766
// It is important to note that the session may have been destroyed during
17751767
// the call to receive(...). If that's the case, the session object still

src/quic/endpoint.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ class Endpoint final : public AsyncWrap, public Packet::Listener {
409409
// Ref() causes a listening Endpoint to keep the event loop active.
410410
JS_METHOD(Ref);
411411

412-
void Receive(const uv_buf_t& buf, const SocketAddress& from);
412+
void Receive(const uint8_t* data, size_t len, const SocketAddress& from);
413413

414414
AliasedStruct<Stats> stats_;
415415
AliasedStruct<State> state_;

src/quic/session.cc

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,7 +2101,8 @@ void Session::SetLastError(QuicError&& error) {
21012101
impl_->last_error_ = std::move(error);
21022102
}
21032103

2104-
bool Session::Receive(Store&& store,
2104+
bool Session::Receive(const uint8_t* data,
2105+
size_t len,
21052106
const SocketAddress& local_address,
21062107
const SocketAddress& remote_address,
21072108
const PacketInfo& pkt_info,
@@ -2112,24 +2113,23 @@ bool Session::Receive(Store&& store,
21122113
// The hot receive path uses ReadPacket() directly with deferred
21132114
// flush via BindingData's uv_check callback.
21142115
SendPendingDataScope send_scope(this);
2115-
return ReadPacket(
2116-
std::move(store), local_address, remote_address, pkt_info, ts);
2116+
return ReadPacket(data, len, local_address, remote_address, pkt_info, ts);
21172117
}
21182118

2119-
bool Session::ReadPacket(Store&& store,
2119+
bool Session::ReadPacket(const uint8_t* data,
2120+
size_t len,
21202121
const SocketAddress& local_address,
21212122
const SocketAddress& remote_address,
21222123
const PacketInfo& pkt_info,
21232124
uint64_t ts) {
21242125
DCHECK(!is_destroyed());
21252126
impl_->remote_address_ = remote_address;
21262127

2127-
ngtcp2_vec vec = store;
21282128
Path path(local_address, remote_address);
21292129

21302130
Debug(this,
21312131
"Session is receiving %zu-byte packet received along path %s",
2132-
vec.len,
2132+
len,
21332133
path);
21342134

21352135
// It is important to understand that reading the packet will cause
@@ -2150,19 +2150,18 @@ bool Session::ReadPacket(Store&& store,
21502150
// receive path caches a timestamp and passes it to all ReadPacket()
21512151
// calls in the same I/O burst.
21522152
if (ts == 0) ts = uv_hrtime();
2153-
err = ngtcp2_conn_read_pkt(
2154-
*this, &path, pkt_info, vec.base, vec.len, ts);
2153+
err = ngtcp2_conn_read_pkt(*this, &path, pkt_info, data, len, ts);
21552154
}
21562155
if (is_destroyed()) return false;
21572156

2158-
Debug(this, "Session receiving %zu-byte packet with result %d", vec.len, err);
2157+
Debug(this, "Session receiving %zu-byte packet with result %d", len, err);
21592158

21602159
switch (err) {
21612160
case 0: {
2162-
Debug(this, "Session successfully received %zu-byte packet", vec.len);
2161+
Debug(this, "Session successfully received %zu-byte packet", len);
21632162
if (!is_destroyed()) [[likely]] {
21642163
auto& stats_ = impl_->stats_;
2165-
STAT_INCREMENT_N(Stats, bytes_received, vec.len);
2164+
STAT_INCREMENT_N(Stats, bytes_received, len);
21662165
// Process deferred operations that couldn't run inside callback
21672166
// scopes (e.g., HTTP/3 GOAWAY handling that calls into JS).
21682167
application().PostReceive();

src/quic/session.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,8 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
353353
bool early = false;
354354
};
355355

356-
bool Receive(Store&& store,
356+
bool Receive(const uint8_t* data,
357+
size_t len,
357358
const SocketAddress& local_address,
358359
const SocketAddress& remote_address,
359360
const PacketInfo& pkt_info = PacketInfo(),
@@ -367,10 +368,14 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
367368
// Receive() is kept as a convenience wrapper that calls ReadPacket()
368369
// then triggers SendPendingData (for paths like Connect that need
369370
// immediate response).
371+
// The data pointer is used synchronously — ngtcp2_conn_read_pkt does
372+
// not retain a reference after returning, so the caller's buffer can
373+
// be reused immediately.
370374
// When ts is 0 (the default), uv_hrtime() is called internally.
371375
// The batched receive path caches a timestamp and passes it to all
372376
// ReadPacket() calls in the same I/O burst.
373-
bool ReadPacket(Store&& store,
377+
bool ReadPacket(const uint8_t* data,
378+
size_t len,
374379
const SocketAddress& local_address,
375380
const SocketAddress& remote_address,
376381
const PacketInfo& pkt_info = PacketInfo(),

0 commit comments

Comments
 (0)