Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kafka/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ def connection_made(self, transport):
To receive data, wait for data_received() calls.
When the connection is closed, connection_lost() is called.
"""
if self.closed:
# A concurrent close() may have torn the connection down while the
# transport was still being built. Setting initializing=True below
# would resurrect an already-closed connection mid-teardown and
# break the fail_in_flight_requests invariant; refuse instead. The
# caller (manager._connect) closes the orphaned transport.
raise Errors.KafkaConnectionError('Connection closed during connect')
self.transport = transport
if self.transport.get_protocol() != self:
self.transport.set_protocol(self)
Expand Down
16 changes: 16 additions & 0 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,22 @@ async def _build_transport(self, node, timeout_at=None):
return transport

async def _connect(self, node, conn, reset_backoff_on_connect=True, timeout_at=None):
# Tracks ownership of the freshly built transport: while non-None it is
# ours to clean up (the connection hasn't taken it over yet), so the
# finally clause closes it. Cleared once connection_made() succeeds.
transport = None
try:
transport = await self._build_transport(node, timeout_at=timeout_at)
# The connection (or the whole manager) may have been closed while
# we were building the transport. Handing it to connection_made()
# would flip the conn back to `initializing` and resurrect a
# connection that is already being torn down. Discard
# the new transport instead of reviving a dead connection.
if conn.closed or self.closed:
log.debug('%s: closed during connect; discarding new transport', conn)
return
conn.connection_made(transport)
transport = None # conn owns cleanup now; skip finally: transport.close()
await conn.initialize(timeout_at=timeout_at)
except Exception as exc:
log.error('Connection failed: %s', exc)
Expand All @@ -252,6 +265,9 @@ async def _connect(self, node, conn, reset_backoff_on_connect=True, timeout_at=N
Errors.AuthorizationError)):
self._auth_failures[node.node_id] = exc
return
finally:
if transport is not None:
transport.close()

if self._sensors:
self._sensors.connection_created.record()
Expand Down
37 changes: 37 additions & 0 deletions test/net/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,43 @@ def test_close_no_connections(self, manager):
manager.close()


class TestKafkaConnectionManagerConnectRace:
    """Regression tests for a close() that races with an in-flight connect.

    A connection can be closed (by manager.close() / bootstrap teardown)
    while its _connect() coroutine is still awaiting _build_transport. Once
    the transport is finally available, _connect must not pass it to
    connection_made(), because that would flip the closed connection back to
    `initializing`.
    """

    def test_connect_discards_transport_when_closed_during_build(self, net):
        """_connect() drops and closes a transport built for a closed conn."""
        mgr = KafkaConnectionManager(net)
        broker = MagicMock(host='broker', port=9092, node_id='bootstrap-0')
        conn = KafkaConnection(net, node_id='bootstrap-0', **mgr.config)
        orphan_transport = MagicMock()

        async def close_then_hand_back_transport(n, timeout_at=None):
            # Stand-in for _build_transport: the connection is closed while
            # the transport is "being built", then the transport is returned
            # as if the build had succeeded.
            conn.close()
            return orphan_transport

        build_patch = patch.object(
            mgr,
            '_build_transport',
            side_effect=close_then_hand_back_transport,
        )
        with build_patch:
            net.run(mgr._connect(broker, conn))

        # The connection must remain closed and must not have been flipped
        # back to `initializing` or handed the new transport.
        assert conn.closed
        assert conn.initializing is False
        assert conn.transport is None
        # The transport nobody adopted must be closed exactly once.
        orphan_transport.close.assert_called_once()

    def test_connection_made_refuses_closed_connection(self, net):
        """connection_made() raises KafkaConnectionError on a closed conn."""
        conn = KafkaConnection(net, node_id='bootstrap-0')
        conn.close()
        assert conn.closed

        late_transport = MagicMock()
        with pytest.raises(Errors.KafkaConnectionError):
            conn.connection_made(late_transport)


class TestKafkaConnectionManagerRun:
def test_run_function(self, manager):
def test_coro():
Expand Down
Loading