From 9e54c09c6bb3f39e3886db6b71955614d6c671d3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 23 Jun 2026 07:59:25 -0700 Subject: [PATCH] Fix _build_transport / conn.close race --- kafka/net/connection.py | 7 +++++++ kafka/net/manager.py | 16 ++++++++++++++++ test/net/test_manager.py | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/kafka/net/connection.py b/kafka/net/connection.py index d81a09950..5cf333655 100644 --- a/kafka/net/connection.py +++ b/kafka/net/connection.py @@ -266,6 +266,13 @@ def connection_made(self, transport): To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called. """ + if self.closed: + # A concurrent close() may have torn the connection down while the + # transport was still being built. Setting initializing=True below + # would resurrect an already-closed connection mid-teardown and + # break the fail_in_flight_requests invariant; refuse instead. The + # caller (manager._connect) closes the orphaned transport. + raise Errors.KafkaConnectionError('Connection closed during connect') self.transport = transport if self.transport.get_protocol() != self: self.transport.set_protocol(self) diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 192ca4898..f3536b2e8 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -240,9 +240,22 @@ async def _build_transport(self, node, timeout_at=None): return transport async def _connect(self, node, conn, reset_backoff_on_connect=True, timeout_at=None): + # Tracks ownership of the freshly built transport: while non-None it is + # ours to clean up (the connection hasn't taken it over yet), so the + # finally clause closes it. Cleared once connection_made() succeeds. + transport = None try: transport = await self._build_transport(node, timeout_at=timeout_at) + # The connection (or the whole manager) may have been closed while + # we were building the transport. Handing it to connection_made() + # would flip the conn back to `initializing` and resurrect a + # connection that is already being torn down. Discard + # the new transport instead of reviving a dead connection. + if conn.closed or self.closed: + log.debug('%s: closed during connect; discarding new transport', conn) + return conn.connection_made(transport) + transport = None # conn owns cleanup now; skip finally: transport.close() await conn.initialize(timeout_at=timeout_at) except Exception as exc: log.error('Connection failed: %s', exc) @@ -252,6 +265,9 @@ async def _connect(self, node, conn, reset_backoff_on_connect=True, timeout_at=N Errors.AuthorizationError)): self._auth_failures[node.node_id] = exc return + finally: + if transport is not None: + transport.close() if self._sensors: self._sensors.connection_created.record() diff --git a/test/net/test_manager.py b/test/net/test_manager.py index 83c88f19b..bc8661d2f 100644 --- a/test/net/test_manager.py +++ b/test/net/test_manager.py @@ -356,6 +356,43 @@ def test_close_no_connections(self, manager): manager.close() +class TestKafkaConnectionManagerConnectRace: + """A connection can be closed (by manager.close() / bootstrap teardown) + while its _connect() coroutine is still awaiting _build_transport. When + the transport finally arrives, _connect must not resurrect the dead + connection via connection_made() -- doing so flips it back to + `initializing`.""" + + def test_connect_discards_transport_when_closed_during_build(self, net): + manager = KafkaConnectionManager(net) + node = MagicMock(host='broker', port=9092, node_id='bootstrap-0') + conn = KafkaConnection(net, node_id='bootstrap-0', **manager.config) + transport = MagicMock() + + async def fake_build_transport(n, timeout_at=None): + # Simulate a concurrent close landing mid-connect. + conn.close() + return transport + + with patch.object(manager, '_build_transport', + side_effect=fake_build_transport): + net.run(manager._connect(node, conn)) + + # Dead connection stays dead -- not resurrected to `initializing`. + assert conn.closed + assert conn.initializing is False + assert conn.transport is None + # And the orphaned transport is cleaned up rather than leaked. + transport.close.assert_called_once() + + def test_connection_made_refuses_closed_connection(self, net): + conn = KafkaConnection(net, node_id='bootstrap-0') + conn.close() + assert conn.closed + with pytest.raises(Errors.KafkaConnectionError): + conn.connection_made(MagicMock()) + + class TestKafkaConnectionManagerRun: def test_run_function(self, manager): def test_coro():