Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 103 additions & 17 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
HostTargetingStatement)
from cassandra.marshal import int64_pack
from cassandra.tablets import Tablet, Tablets
from cassandra.tablets import Tablet, Tablets, choose_tablet_version_block, random_tablet_version_block
from cassandra.timestamps import MonotonicTimestampGenerator
from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query

Expand Down Expand Up @@ -3059,6 +3059,8 @@
continuous_paging_options, statement_keyspace)
elif isinstance(query, BoundStatement):
prepared_statement = query.prepared_statement
# The tablet_version_block is filled in per-target-host at send time
# (see ResponseFuture._query), because V2 is negotiated per connection.
message = ExecuteMessage(
prepared_statement.query_id, query.values, cl,
serial_cl, fetch_size, paging_state, timestamp,
Expand Down Expand Up @@ -3093,6 +3095,37 @@
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
continuous_paging_state=None, host=host)

def _compute_tablet_version_block(self, query):
"""
Compute the tablet_version_block byte for a BoundStatement.
Returns an int in [0, 255] or None if V2 should not be used.
"""
Comment on lines +3099 to +3102
routing_key = query.routing_key
if routing_key is None:
return random_tablet_version_block()

keyspace = query.keyspace or self.keyspace
table = query.table
if not keyspace or not table:
return random_tablet_version_block()

# Skip the Murmur3 token hash + tablet lookup when we have no cached
# tablets for this table (vnode tables, or tablet tables on cold start);
# both correctly fall back to a random block below.
if not self.cluster.metadata._tablets.table_has_tablets(keyspace, table):
return random_tablet_version_block()

token_map = self.cluster.metadata.token_map
if token_map is None:
return random_tablet_version_block()

t = query.routing_token(token_map.token_class)
tablet = self.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t)
if tablet is None or tablet.tablet_version is None:
return random_tablet_version_block()

return choose_tablet_version_block(tablet.tablet_version)

def get_execution_profile(self, name):
"""
Returns the execution profile associated with the provided ``name``.
Expand Down Expand Up @@ -3765,6 +3798,7 @@

_uses_peers_v2 = True
_tablets_routing_v1 = False
_tablets_routing_v2 = False

# for testing purposes
_time = time
Expand Down Expand Up @@ -3899,6 +3933,7 @@
else datetime.timedelta(seconds=self._cluster.metadata_request_timeout)

self._tablets_routing_v1 = connection.features.tablets_routing_v1
self._tablets_routing_v2 = connection.features.tablets_routing_v2

# use weak references in both directions
# _clear_watcher will be called when this ControlConnection is about to be finalized
Expand Down Expand Up @@ -4612,7 +4647,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4650 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.11)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down Expand Up @@ -4713,6 +4748,7 @@
_host = None
_control_connection_query_attempted = False
_TABLET_ROUTING_CTYPE = None
_TABLET_ROUTING_V2_CTYPE = None

_warned_timeout = False

Expand Down Expand Up @@ -5002,7 +5038,12 @@
try:
# TODO get connectTimeout from cluster settings
if self.query:
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
# Pass the statement so the pool can reuse the ring token it
# memoized for this request instead of re-hashing the routing key.
connection, request_id = pool.borrow_connection(
timeout=2.0, routing_key=self.query.routing_key,
keyspace=self.query.keyspace, table=self.query.table,
query=self.query)
else:
connection, request_id = pool.borrow_connection(timeout=2.0)
self._connection = connection
Expand All @@ -5011,6 +5052,28 @@
if cb is None:
cb = partial(self._set_result, host, connection, pool)

if isinstance(message, ExecuteMessage):
# Attach the tablet_version_block only when the *specific connection*
# we are about to send on negotiated TABLETS_ROUTING_V2. The server
# reads the trailing tablet_version_block byte only on connections
# that negotiated V2 (gated on the cluster-wide feature), so keying
# off the borrowed connection -- which is already in hand here, since
# borrow_connection() ran above -- is both necessary and sufficient:
# * a V2 connection always gets the block, even if this pool was
# created (and any cached flag latched) before the cluster
# feature was enabled, e.g. mid rolling-upgrade;
# * a non-V2 connection never gets it, even if a sibling shard
# connection in the same pool already negotiated V2 -- which can
# happen transiently while connections opened before and after
# the feature flip coexist. Attaching the block to a non-V2
# connection would leave an unread trailing byte and desync the
# frame, so a pool-level flag cannot get this right regardless of
# how it is latched.
if connection.features.tablets_routing_v2:
message.tablet_version_block = self.session._compute_tablet_version_block(self.query)
else:
message.tablet_version_block = None
Comment on lines +5072 to +5075
Comment on lines +5055 to +5075

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Attach the V2 block on control-connection fallback too.

Line 5072 handles pooled connections, but _query_control_connection() can also send an ExecuteMessage; on a V2 control connection that path omits the mandatory trailing byte and can trigger a protocol error.

Proposed direction
+    def _set_tablet_version_block_for_connection(self, message, connection):
+        if not isinstance(message, ExecuteMessage):
+            return
+        if connection.features.tablets_routing_v2:
+            message.tablet_version_block = self.session._compute_tablet_version_block(self.query)
+        else:
+            message.tablet_version_block = None
+
     def _query(self, host, message=None, cb=None):
         ...
-            if isinstance(message, ExecuteMessage):
-                ...
-                if connection.features.tablets_routing_v2:
-                    message.tablet_version_block = self.session._compute_tablet_version_block(self.query)
-                else:
-                    message.tablet_version_block = None
+            self._set_tablet_version_block_for_connection(message, connection)

Then call the same helper in _query_control_connection() after self._connection = connection and before connection.send_msg(...).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cassandra/cluster.py` around lines 5055 - 5075, The ExecuteMessage
tablet_version_block is only being set in the pooled connection path, so
control-connection fallback can miss the mandatory V2 trailing byte. Update
_query_control_connection() in cassandra/cluster.py to use the same
tablets_routing_v2 check and _compute_tablet_version_block(self.query) logic as
the ExecuteMessage handling in the main send path, placing it after
self._connection = connection and before connection.send_msg(...) so V2 control
connections always include the block and non-V2 ones do not.


Comment on lines +5055 to +5076
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
encoder=self._protocol_handler.encode_message,
decoder=self._protocol_handler.decode_message,
Expand Down Expand Up @@ -5128,21 +5191,44 @@
self._warnings = getattr(response, 'warnings', None)
self._custom_payload = getattr(response, 'custom_payload', None)

if self._custom_payload and self.session.cluster.control_connection._tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
protocol = self.session.cluster.protocol_version
info = self._custom_payload.get('tablets-routing-v1')
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
tablet_routing_info = ctype.from_binary(info, protocol)
first_token = tablet_routing_info[0]
last_token = tablet_routing_info[1]
tablet_replicas = tablet_routing_info[2]
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
keyspace = self.query.keyspace
table = self.query.table
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
if self._custom_payload and connection is not None:
# Parse the routing payload according to what the connection that
# *served this request* negotiated, not the control connection:
# during a rolling upgrade connections may differ, and each
# payload key matches the extension its own connection negotiated.
if connection.features.tablets_routing_v2 and 'tablets-routing-v2' in self._custom_payload:
protocol = self.session.cluster.protocol_version
info = self._custom_payload.get('tablets-routing-v2')
ctype = ResponseFuture._TABLET_ROUTING_V2_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)')
ResponseFuture._TABLET_ROUTING_V2_CTYPE = ctype
tablet_routing_info = ctype.from_binary(info, protocol)
first_token = tablet_routing_info[0]
last_token = tablet_routing_info[1]
tablet_replicas = tablet_routing_info[2]
tablet_version = tablet_routing_info[3]
tablet = Tablet.from_row(first_token, last_token, tablet_replicas, tablet_version)
keyspace = self.query.keyspace
table = self.query.table
if tablet:
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
protocol = self.session.cluster.protocol_version
info = self._custom_payload.get('tablets-routing-v1')
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
tablet_routing_info = ctype.from_binary(info, protocol)
first_token = tablet_routing_info[0]
last_token = tablet_routing_info[1]
tablet_replicas = tablet_routing_info[2]
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
keyspace = self.query.keyspace
table = self.query.table
if tablet:
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
Comment on lines +5212 to +5231

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Cache routing payloads under the effective keyspace.

Line 5212 and Line 5228 use only self.query.keyspace, but _compute_tablet_version_block() looks up tablets with query.keyspace or self.keyspace. Prepared statements executed in a session keyspace can therefore cache under (None, table) and miss the cache on every later request.

Proposed fix
-                    keyspace = self.query.keyspace
+                    keyspace = self.query.keyspace or self.session.keyspace
                     table = self.query.table
-                    if tablet:
+                    if tablet and keyspace and table:
                         self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
...
-                    keyspace = self.query.keyspace
+                    keyspace = self.query.keyspace or self.session.keyspace
                     table = self.query.table
-                    if tablet:
+                    if tablet and keyspace and table:
                         self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
keyspace = self.query.keyspace
table = self.query.table
if tablet:
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
protocol = self.session.cluster.protocol_version
info = self._custom_payload.get('tablets-routing-v1')
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
tablet_routing_info = ctype.from_binary(info, protocol)
first_token = tablet_routing_info[0]
last_token = tablet_routing_info[1]
tablet_replicas = tablet_routing_info[2]
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
keyspace = self.query.keyspace
table = self.query.table
if tablet:
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
keyspace = self.query.keyspace or self.session.keyspace
table = self.query.table
if tablet and keyspace and table:
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
protocol = self.session.cluster.protocol_version
info = self._custom_payload.get('tablets-routing-v1')
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
if ctype is None:
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
tablet_routing_info = ctype.from_binary(info, protocol)
first_token = tablet_routing_info[0]
last_token = tablet_routing_info[1]
tablet_replicas = tablet_routing_info[2]
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
keyspace = self.query.keyspace or self.session.keyspace
table = self.query.table
if tablet and keyspace and table:
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cassandra/cluster.py` around lines 5212 - 5231, The tablet-routing cache is
using only self.query.keyspace when adding tablets, which can store entries
under a None key and miss cache hits for session-level keyspaces. Update the
tablet handling in ResponseFuture to use the effective keyspace consistently,
matching _compute_tablet_version_block() by falling back to self.keyspace when
self.query.keyspace is unset, and then pass that resolved keyspace into
metadata._tablets.add_tablet().


if isinstance(response, ResultMessage):
if response.kind == RESULT_KIND_SET_KEYSPACE:
Expand Down
63 changes: 63 additions & 0 deletions cassandra/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,14 @@ class KeyspaceMetadata(object):
A string indicating whether a graph engine is enabled for this keyspace (Core/Classic).
"""

strongly_consistent = False
"""
A boolean indicating whether this keyspace uses strongly-consistent (Raft-based)
tablets. ``True`` only for ScyllaDB keyspaces whose ``consistency`` option is
not eventual (i.e. ``local`` or ``global``). ``False`` for eventually-consistent
keyspaces and for non-ScyllaDB clusters.
"""

_exc_info = None
""" set if metadata parsing failed """

Expand All @@ -815,6 +823,7 @@ def __init__(self, name, durable_writes, strategy_class, strategy_options, graph
self.aggregates = {}
self.views = {}
self.graph_engine = graph_engine
self.strongly_consistent = False

@property
def is_graph_enabled(self):
Expand Down Expand Up @@ -2577,6 +2586,11 @@ class SchemaParserV3(SchemaParserV22):
_SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates"
_SELECT_VIEWS = "SELECT * FROM system_schema.views"

# ScyllaDB-only: per-keyspace consistency option. The column is null for
# eventually-consistent keyspaces (and the whole table is absent on Cassandra
# and on Scylla versions without strongly-consistent tablets).
_SELECT_SCYLLA_KEYSPACES = "SELECT keyspace_name, consistency FROM system_schema.scylla_keyspaces"

def _is_not_scylla(self):
"""Check if NOT connected to ScyllaDB by checking for shard awareness."""
return getattr(getattr(self.connection, 'features', None), 'shard_id', None) is None
Expand Down Expand Up @@ -2610,12 +2624,61 @@ def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
self.indexes_result = []
self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list))
self.keyspace_view_rows = defaultdict(list)
self._scylla_consistency_cache = None

@staticmethod
def _is_strongly_consistent(consistency):
# The server stores the keyspace consistency option as 'eventual',
# 'local', or 'global' (null == eventual). Only non-eventual keyspaces
# have a leader, so treat those as strongly consistent.
return consistency not in (None, "eventual")

def _get_scylla_keyspaces_consistency(self):
"""
Return a ``{keyspace_name: consistency}`` map read from
``system_schema.scylla_keyspaces``.

Only ScyllaDB has this table, and only for keyspaces with a non-default
consistency option. Returns ``{}`` on non-Scylla clusters or when the
table/column is unavailable (older Scylla), in which case every keyspace
is treated as eventually consistent. The result is cached per parser
instance so it is fetched at most once per schema refresh.
"""
if self._scylla_consistency_cache is not None:
return self._scylla_consistency_cache

if self._is_not_scylla():
self._scylla_consistency_cache = {}
return self._scylla_consistency_cache

consistency_by_ks = {}
try:
rows = self._query_build_rows(self._SELECT_SCYLLA_KEYSPACES, lambda row: row)
consistency_by_ks = {row["keyspace_name"]: row.get("consistency") for row in rows}
except Exception:
log.debug("Could not read system_schema.scylla_keyspaces; treating all "
"keyspaces as eventually consistent", exc_info=True)
Comment on lines +2658 to +2660

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win

Narrow the Scylla-keyspaces fallback exception.

Line 2658 should tolerate the expected “table/column unavailable” schema error, but catching every Exception also hides timeouts, connection failures, and parser bugs; Ruff flags this as BLE001. Please catch the specific expected driver/server exception(s) and let unexpected refresh failures surface.

🧰 Tools
🪛 Ruff (0.15.18)

[warning] 2658-2658: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cassandra/metadata.py` around lines 2658 - 2660, The fallback in metadata
schema refresh is too broad because the except block in the
system_schema.scylla_keyspaces read path catches every Exception, which masks
unexpected failures. Narrow the handler around the code in metadata.py that logs
“Could not read system_schema.scylla_keyspaces” so it only catches the specific
expected driver/server schema-unavailable error(s), and let other refresh errors
propagate. Keep the existing debug log and exc_info behavior for the expected
case, but do not use a blanket Exception catch in this branch.

Source: Linters/SAST tools


self._scylla_consistency_cache = consistency_by_ks
return consistency_by_ks

def _set_strong_consistency(self, keyspace_meta):
consistency = self._get_scylla_keyspaces_consistency().get(keyspace_meta.name)
keyspace_meta.strongly_consistent = self._is_strongly_consistent(consistency)
return keyspace_meta

def get_keyspace(self, keyspaces, keyspace):
keyspace_meta = super(SchemaParserV3, self).get_keyspace(keyspaces, keyspace)
if keyspace_meta is not None:
self._set_strong_consistency(keyspace_meta)
return keyspace_meta

def get_all_keyspaces(self):
for keyspace_meta in super(SchemaParserV3, self).get_all_keyspaces():
for row in self.keyspace_view_rows[keyspace_meta.name]:
view_meta = self._build_view_metadata(row)
keyspace_meta._add_view_metadata(view_meta)
self._set_strong_consistency(keyspace_meta)
yield keyspace_meta

def get_table(self, keyspaces, keyspace, table):
Expand Down
34 changes: 29 additions & 5 deletions cassandra/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,30 @@ def make_query_plan(self, working_keyspace=None, query=None):
return

replicas = []
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
leader_host = None
token = query.routing_token(self._cluster_metadata.token_map.token_class)
tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, token)

if tablet is not None:
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
child_plan = child.make_query_plan(keyspace, query)

replicas = [host for host in child_plan if host.host_id in replicas_mapped]

# The leader concept only exists for strongly-consistent keyspaces.
# TABLETS_ROUTING_V2 assigns a tablet_version to *every* tablet table
# (eventually- and strongly-consistent alike), so the version alone
# must not be used to infer a leader. For strongly-consistent
# keyspaces the first replica is the leader; yield it first for
# leader-aware routing. Eventually-consistent keyspaces keep normal
# token-aware/shuffled ordering.
ks_meta = self._cluster_metadata.keyspaces.get(keyspace)
if ks_meta is not None and ks_meta.strongly_consistent and tablet.replicas:
leader_host_id = tablet.replicas[0][0]
for host in replicas:
if host.host_id == leader_host_id:
leader_host = host
break
else:
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)

Expand All @@ -523,10 +539,18 @@ def yield_in_order(hosts):
if replica.is_up and child.distance(replica) == distance:
yield replica

# yield replicas: local_rack, local, remote
yield from yield_in_order(replicas)
# If we have a leader hint, yield it first unconditionally.
if leader_host is not None and leader_host.is_up:
yield leader_host

# yield replicas: local_rack, local, remote (skipping leader already yielded)
for host in yield_in_order(replicas):
if host is not leader_host:
yield host
# yield rest of the cluster: local_rack, local, remote
yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas])
for host in yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]):
if host is not leader_host:
yield host

def on_up(self, *args, **kwargs):
return self._child_policy.on_up(*args, **kwargs)
Expand Down
Loading
Loading