@@ -737,6 +737,130 @@ async def test():
737737 with s1 , s2 :
738738 loop .run_until_complete (test ())
739739
740+ def test_create_connection_sock_cancel_detaches (self ):
741+ async def client (addr ):
742+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
743+ sock .setblocking (False )
744+ try :
745+ sock .connect (addr )
746+ except BlockingIOError :
747+ pass
748+ await asyncio .sleep (0.01 )
749+
750+ task = asyncio .ensure_future (
751+ self .loop .create_connection (asyncio .Protocol , sock = sock ))
752+ await asyncio .sleep (0 )
753+ task .cancel ()
754+ with self .assertRaises (asyncio .CancelledError ):
755+ await task
756+
757+ # After cancellation the socket must be detached (fd == -1)
758+ # so that its __del__ won't close a recycled fd.
759+ self .assertEqual (sock .fileno (), - 1 )
760+
761+ def _recv_or_abort (sock ):
762+ try :
763+ sock .recv_all (1 )
764+ except ConnectionAbortedError :
765+ pass
766+
767+ with self .tcp_server (_recv_or_abort ,
768+ max_clients = 1 ,
769+ backlog = 1 ) as srv :
770+ self .loop .run_until_complete (client (srv .addr ))
771+
772+ def test_create_connection_sock_cancel_fd_leak (self ):
773+ # Regression test for https://github.com/MagicStack/uvloop/issues/645
774+ # and https://github.com/aio-libs/aiohttp/issues/10506
775+ #
776+ # When create_connection(sock=sock) is cancelled, the socket must
777+ # be detached so its close()/`__del__` won't double-close the fd.
778+ # Without the fix, libuv closes the fd but the socket object still
779+ # references it, enabling a chain of fd corruption and data leak:
780+ #
781+ # 1. cancel → libuv closes fd N
782+ # 2. New connection (victim) reuses fd N
783+ # 3. Stale sock.close() closes fd N → breaks the victim
784+ # 4. Another fd N is opened (new connection)
785+ # 5. Victim writev(N) → data goes to the wrong connection
786+
787+ async def test ():
788+ srv = await asyncio .start_server (
789+ lambda r , w : None ,
790+ '127.0.0.1' , 0 ,
791+ family = socket .AF_INET )
792+ addr = srv .sockets [0 ].getsockname ()
793+
794+ # --- Step 1: create_connection with sock= and cancel it ---
795+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
796+ sock .setblocking (False )
797+ await self .loop .sock_connect (sock , addr )
798+ stale_fd = sock .fileno ()
799+
800+ task = self .loop .create_task (
801+ self .loop .create_connection (asyncio .Protocol , sock = sock )
802+ )
803+ await asyncio .sleep (0 )
804+ task .cancel ()
805+ with self .assertRaises (asyncio .CancelledError ):
806+ await task
807+
808+ # --- Step 2: a victim connection reuses the fd ---
809+ victim_tr , _ = await self .loop .create_connection (
810+ asyncio .Protocol , * addr )
811+ victim_fd = victim_tr .get_extra_info ('socket' ).fileno ()
812+ if victim_fd != stale_fd :
813+ victim_tr .close ()
814+ sock .close ()
815+ srv .close ()
816+ await srv .wait_closed ()
817+ raise unittest .SkipTest (
818+ f'fd not reused (got { victim_fd } , need { stale_fd } )' )
819+
820+ # --- Step 3: stale sock.close() must NOT kill the victim ---
821+ # Allocate the socketpair BEFORE sock.close() so the pair
822+ # fds don't collide with stale_fd.
823+ spy_a , spy_b = socket .socketpair ()
824+ spy_b .setblocking (False )
825+
826+ sock .close ()
827+
828+ # Check whether sock.close() broke the victim's fd.
829+ victim_broken = False
830+ try :
831+ os .fstat (victim_fd )
832+ except OSError :
833+ victim_broken = True
834+
835+ if victim_broken :
836+ # The victim's fd was killed — place a spy socket on
837+ # the freed fd (in production this would be a new
838+ # incoming connection).
839+ os .dup2 (spy_a .fileno (), stale_fd )
840+ spy_a .close ()
841+
842+ # Victim writes. If victim_broken, writev(stale_fd) goes
843+ # to the spy; otherwise it goes to the real connection.
844+ victim_tr .write (b'LEAKED' )
845+
846+ try :
847+ leaked = spy_b .recv (4096 )
848+ except BlockingIOError :
849+ leaked = b''
850+
851+ if victim_broken :
852+ os .close (stale_fd )
853+ spy_b .close ()
854+ victim_tr .close ()
855+ srv .close ()
856+ await srv .wait_closed ()
857+
858+ self .assertEqual (leaked , b'' ,
859+ f"Data leaked to an unrelated socket: "
860+ f"got { leaked !r} " )
861+
862+ self .loop .run_until_complete (test ())
863+
740864
741865class Test_UV_TCP (_TestTCP , tb .UVTestCase ):
742866
@@ -1181,32 +1305,6 @@ def client():
11811305 # let it close
11821306 self .loop .run_until_complete (asyncio .sleep (0.1 ))
11831307
1184- def test_create_connection_sock_cancel_detaches (self ):
1185- async def client (addr ):
1186- sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
1187- sock .setblocking (False )
1188- try :
1189- sock .connect (addr )
1190- except BlockingIOError :
1191- pass
1192- await asyncio .sleep (0.01 )
1193-
1194- task = asyncio .ensure_future (
1195- self .loop .create_connection (asyncio .Protocol , sock = sock ))
1196- await asyncio .sleep (0 )
1197- task .cancel ()
1198- with self .assertRaises (asyncio .CancelledError ):
1199- await task
1200-
1201- # After cancellation the socket must be detached (fd == -1)
1202- # so that its __del__ won't close a recycled fd.
1203- self .assertEqual (sock .fileno (), - 1 )
1204-
1205- with self .tcp_server (lambda sock : sock .recv_all (1 ),
1206- max_clients = 1 ,
1207- backlog = 1 ) as srv :
1208- self .loop .run_until_complete (client (srv .addr ))
1209-
12101308 @unittest .skipUnless (hasattr (socket , 'AF_UNIX' ), 'no Unix sockets' )
12111309 def test_create_connection_wrong_sock (self ):
12121310 sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
0 commit comments