diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 11aefadf..3d089b81 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -94,6 +94,7 @@ from dapr.clients.grpc._state import StateItem, StateOptions from dapr.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy +from dapr.common.logging import silence_grpc_aio_poller_noise from dapr.common.pubsub.subscription import StreamInactiveError from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -191,6 +192,7 @@ def __init__( interceptors.append(api_token_interceptor) set_default_grpc_dns_resolver() + silence_grpc_aio_poller_noise() # Create gRPC channel if self._uri.tls: diff --git a/dapr/common/logging.py b/dapr/common/logging.py new file mode 100644 index 00000000..80023053 --- /dev/null +++ b/dapr/common/logging.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import logging + +_GRPC_AIO_POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events' + + +class GrpcAioPollerNoiseFilter(logging.Filter): + """Drops the harmless grpc.aio poller BlockingIOError (EAGAIN) records. + + The poller does a non-blocking read on its wake-up fd and can get EAGAIN, which + asyncio logs at ERROR even though the read is retried and nothing is lost. + See https://github.com/grpc/grpc/issues/42357. + """ + + def filter(self, record: logging.LogRecord) -> bool: + exc = record.exc_info[1] if record.exc_info else None + is_poller_noise = isinstance(exc, BlockingIOError) and ( + _GRPC_AIO_POLLER_NOISE_MARKER in record.getMessage() + ) + return not is_poller_noise + + +def silence_grpc_aio_poller_noise() -> None: + """Installs the poller-noise filter on the asyncio logger if not already present.""" + asyncio_logger = logging.getLogger('asyncio') + if not any(isinstance(f, GrpcAioPollerNoiseFilter) for f in asyncio_logger.filters): + asyncio_logger.addFilter(GrpcAioPollerNoiseFilter()) diff --git a/dapr/ext/workflow/AGENTS.md b/dapr/ext/workflow/AGENTS.md index 112c7a11..80ac07d7 100644 --- a/dapr/ext/workflow/AGENTS.md +++ b/dapr/ext/workflow/AGENTS.md @@ -124,7 +124,7 @@ Workflow (orchestrator) functions must remain generators (`def` with `yield`). T **Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`) and an async-vs-sync decision tree. `tests/ext/workflow/durabletask/test_async_dispatch_regression.py` (marked `perf`) guards the core invariant: a batch of async activities overlaps on the event loop instead of serializing through the thread pool. -**grpc.aio poller log noise.** The async client can emit benign `BlockingIOError: [Errno 11]` ERROR lines from `grpc.aio`'s `PollerCompletionQueue` under load. It is harmless and retried. `get_grpc_aio_channel` installs an internal `asyncio`-logger filter (`_silence_grpc_aio_poller_noise`) that drops only those records, so the SDK suppresses it automatically with no user action. +**grpc.aio poller log noise.** The async client can emit benign `BlockingIOError: [Errno 11]` ERROR lines from `grpc.aio`'s `PollerCompletionQueue` under load (upstream grpc/grpc#42357). It is harmless and retried. `get_grpc_aio_channel` installs an `asyncio`-logger filter (`silence_grpc_aio_poller_noise` from `dapr.common.logging`, shared with the core async `DaprClient`) that drops only those records, so the SDK suppresses it automatically with no user action. ### DaprWorkflowClient (`dapr_workflow_client.py`) diff --git a/dapr/ext/workflow/_durabletask/aio/internal/shared.py b/dapr/ext/workflow/_durabletask/aio/internal/shared.py index e4d39eb5..76f232e1 100644 --- a/dapr/ext/workflow/_durabletask/aio/internal/shared.py +++ b/dapr/ext/workflow/_durabletask/aio/internal/shared.py @@ -9,13 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from typing import Optional, Sequence, Union import grpc from grpc import aio as grpc_aio from grpc.aio import ChannelArgumentType +from dapr.common.logging import silence_grpc_aio_poller_noise from dapr.ext.workflow._durabletask.internal.shared import ( INSECURE_PROTOCOLS, SECURE_PROTOCOLS, @@ -29,30 +29,6 @@ grpc_aio.StreamStreamClientInterceptor, ] -_POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events' - - -class _GrpcAioPollerNoiseFilter(logging.Filter): - """Drops the harmless grpc.aio poller BlockingIOError (EAGAIN) records. - - The poller does a non-blocking read on its wake-up fd and can get EAGAIN, which - asyncio logs at ERROR even though the read is retried and nothing is lost. - """ - - def filter(self, record: logging.LogRecord) -> bool: - exc = record.exc_info[1] if record.exc_info else None - is_poller_noise = isinstance(exc, BlockingIOError) and ( - _POLLER_NOISE_MARKER in record.getMessage() - ) - return not is_poller_noise - - -def _silence_grpc_aio_poller_noise() -> None: - """Install the poller-noise filter on the asyncio logger if not already present.""" - asyncio_logger = logging.getLogger('asyncio') - if not any(isinstance(f, _GrpcAioPollerNoiseFilter) for f in asyncio_logger.filters): - asyncio_logger.addFilter(_GrpcAioPollerNoiseFilter()) - def get_grpc_aio_channel( host_address: Optional[str], @@ -68,7 +44,7 @@ def get_grpc_aio_channel( interceptors: Optional sequence of client interceptors to apply to the channel. options: Optional sequence of gRPC channel options as (key, value) tuples. Keys defined in https://grpc.github.io/grpc/core/group__grpc__arg__keys.html """ - _silence_grpc_aio_poller_noise() + silence_grpc_aio_poller_noise() if host_address is None: host_address = get_default_host_address() diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index 2c4972dc..ccec1084 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -14,6 +14,7 @@ """ import json +import logging import socket import tempfile import unittest @@ -38,6 +39,7 @@ UnlockResponseStatus, ) from dapr.clients.grpc._state import Concurrency, Consistency, StateItem, StateOptions +from dapr.common.logging import GrpcAioPollerNoiseFilter from dapr.common.pubsub.subscription import StreamInactiveError from dapr.conf import settings from dapr.proto import common_v1 @@ -62,6 +64,18 @@ def setUpClass(cls): def tearDownClass(cls): cls._fake_dapr_server.stop() + async def test_client_installs_grpc_aio_poller_noise_filter(self): + asyncio_logger = logging.getLogger('asyncio') + for existing in list(asyncio_logger.filters): + if isinstance(existing, GrpcAioPollerNoiseFilter): + asyncio_logger.removeFilter(existing) + + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') + await dapr.close() + + installed = [f for f in asyncio_logger.filters if isinstance(f, GrpcAioPollerNoiseFilter)] + self.assertEqual(len(installed), 1) + async def test_http_extension(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') diff --git a/tests/common/__init__.py b/tests/common/__init__.py new file mode 100644 index 00000000..8fb63c69 --- /dev/null +++ b/tests/common/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/tests/common/test_logging.py b/tests/common/test_logging.py new file mode 100644 index 00000000..fc625b9a --- /dev/null +++ b/tests/common/test_logging.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import logging +import sys +import unittest + +from dapr.common.logging import GrpcAioPollerNoiseFilter, silence_grpc_aio_poller_noise + + +def _error_record(msg: str, exc: BaseException | None) -> logging.LogRecord: + exc_info = None + if exc is not None: + try: + raise exc + except BaseException: + exc_info = sys.exc_info() + return logging.LogRecord('asyncio', logging.ERROR, __file__, 1, msg, (), exc_info) + + +class TestGrpcAioPollerNoiseFilter(unittest.TestCase): + def tearDown(self): + asyncio_logger = logging.getLogger('asyncio') + for existing in list(asyncio_logger.filters): + if isinstance(existing, GrpcAioPollerNoiseFilter): + asyncio_logger.removeFilter(existing) + + def test_filter_drops_poller_eagain_record(self): + record = _error_record( + 'Exception in callback PollerCompletionQueue._handle_events()', + BlockingIOError(11, 'Resource temporarily unavailable'), + ) + self.assertFalse(GrpcAioPollerNoiseFilter().filter(record)) + + def test_filter_keeps_record_without_exception(self): + record = _error_record('some other error', None) + self.assertTrue(GrpcAioPollerNoiseFilter().filter(record)) + + def test_filter_keeps_blockingioerror_without_marker(self): + record = _error_record('unrelated message', BlockingIOError(11, 'nope')) + self.assertTrue(GrpcAioPollerNoiseFilter().filter(record)) + + def test_install_is_idempotent(self): + silence_grpc_aio_poller_noise() + silence_grpc_aio_poller_noise() + asyncio_logger = logging.getLogger('asyncio') + installed = [f for f in asyncio_logger.filters if isinstance(f, GrpcAioPollerNoiseFilter)] + self.assertEqual(len(installed), 1) diff --git a/tests/ext/workflow/durabletask/test_grpc_aio_log_filter.py b/tests/ext/workflow/durabletask/test_grpc_aio_log_filter.py index fbc3fbfc..9242b670 100644 --- a/tests/ext/workflow/durabletask/test_grpc_aio_log_filter.py +++ b/tests/ext/workflow/durabletask/test_grpc_aio_log_filter.py @@ -10,62 +10,22 @@ # limitations under the License. import logging -import sys from unittest.mock import patch +from dapr.common.logging import GrpcAioPollerNoiseFilter from dapr.ext.workflow._durabletask.aio.internal import shared HOST_ADDRESS = 'localhost:50051' -def _record(msg: str, exc: BaseException | None) -> logging.LogRecord: - exc_info = None - if exc is not None: - try: - raise exc - except BaseException: - exc_info = sys.exc_info() - return logging.LogRecord('asyncio', logging.ERROR, __file__, 1, msg, (), exc_info) - - -def test_filter_drops_poller_eagain_record(): - record = _record( - 'Exception in callback PollerCompletionQueue._handle_events()', - BlockingIOError(11, 'Resource temporarily unavailable'), - ) - assert shared._GrpcAioPollerNoiseFilter().filter(record) is False - - -def test_filter_keeps_record_without_exception(): - assert shared._GrpcAioPollerNoiseFilter().filter(_record('some other error', None)) is True - - -def test_filter_keeps_blockingioerror_without_marker(): - record = _record('unrelated message', BlockingIOError(11, 'nope')) - assert shared._GrpcAioPollerNoiseFilter().filter(record) is True - - def test_get_grpc_aio_channel_installs_filter_on_asyncio_logger(): + """Filter behavior itself is covered in tests/common/test_logging.py.""" asyncio_logger = logging.getLogger('asyncio') - for existing in [ - f for f in asyncio_logger.filters if isinstance(f, shared._GrpcAioPollerNoiseFilter) - ]: + for existing in [f for f in asyncio_logger.filters if isinstance(f, GrpcAioPollerNoiseFilter)]: asyncio_logger.removeFilter(existing) with patch('dapr.ext.workflow._durabletask.aio.internal.shared.grpc_aio.insecure_channel'): shared.get_grpc_aio_channel(HOST_ADDRESS, False) - installed = [ - f for f in asyncio_logger.filters if isinstance(f, shared._GrpcAioPollerNoiseFilter) - ] + installed = [f for f in asyncio_logger.filters if isinstance(f, GrpcAioPollerNoiseFilter)] assert len(installed) == 1 # installed once, not duplicated - - -def test_install_is_idempotent(): - asyncio_logger = logging.getLogger('asyncio') - shared._silence_grpc_aio_poller_noise() - shared._silence_grpc_aio_poller_noise() - installed = [ - f for f in asyncio_logger.filters if isinstance(f, shared._GrpcAioPollerNoiseFilter) - ] - assert len(installed) == 1