diff --git a/src/src_py_lib/__init__.py b/src/src_py_lib/__init__.py index be840d9..342c3dd 100644 --- a/src/src_py_lib/__init__.py +++ b/src/src_py_lib/__init__.py @@ -59,6 +59,15 @@ from src_py_lib.utils.config import ( config_parse_args as parse_args, ) +from src_py_lib.utils.events import ( + CallbackEventSink, + CompositeEventSink, + EventRuntime, + EventSink, + InMemoryEventSink, + JSONLEventSink, + NullEventSink, +) from src_py_lib.utils.http import HTTPClient, HTTPClientError, HTTPResponse from src_py_lib.utils.json_cache import load_json_cache, load_json_subset, save_json_cache from src_py_lib.utils.json_types import ( @@ -71,17 +80,19 @@ json_strs, ) from src_py_lib.utils.logging import ( + EventBridgeHandler, LoggingConfig, LoggingSettings, - configure_logging, + cli_logging_handlers, + cli_run_context, critical, debug, error, info, log_context, log_event, - logging_context, logging_settings_from_config, + observability_context, resolve_log_level_name, span, stage, @@ -94,6 +105,7 @@ OpenTelemetryRuntime, OpenTelemetrySettings, OpenTelemetrySetupError, + OtelLogsSink, configure_open_telemetry, current_traceparent_header, open_telemetry_settings_from_config, @@ -111,13 +123,20 @@ def logging( open_telemetry: OpenTelemetrySettings | None = None, run_fields: Mapping[str, Any] | None = None, run_summary: Callable[[], Mapping[str, Any]] | None = None, + resource: Mapping[str, Any] | None = None, ) -> AbstractContextManager[Path | None]: - """Configure standard CLI logging and emit startup metadata.""" + """Configure standard CLI-mode logging for one run and emit startup metadata. + + CLI mode: installs terminal and event-bridge handlers on the configured + package loggers (never the root logger) and writes the JSONL event log. + Importable-module callers should use `observability_context()` instead, + which never touches stdlib logging handlers. + """ resolved_logging_config = logging_config if open_telemetry is not None: resolved_logging_config = logging_config or logging_settings_from_config(config) resolved_logging_config = LoggingSettings( - logger_name=resolved_logging_config.logger_name, + logger_names=resolved_logging_config.logger_names, terminal_level=resolved_logging_config.terminal_level, log_file_level=resolved_logging_config.log_file_level, log_file=resolved_logging_config.log_file, @@ -130,13 +149,14 @@ def logging( ), open_telemetry=open_telemetry, ) - return logging_context( + return cli_run_context( command or _script_name(), config, git_cwd=git_cwd, logging_config=resolved_logging_config, run_fields=run_fields, run_summary=run_summary, + resource=resource, ) @@ -145,8 +165,13 @@ def _script_name() -> str: __all__ = [ + "CallbackEventSink", + "CompositeEventSink", "Config", "ConfigError", + "EventBridgeHandler", + "EventRuntime", + "EventSink", "GraphQLError", "GraphQLClient", "GitHubClient", @@ -155,15 +180,19 @@ def _script_name() -> str: "HTTPClient", "HTTPClientError", "HTTPResponse", + "InMemoryEventSink", "JSONDict", + "JSONLEventSink", "LinearClient", "LinearClientConfig", "LoggingConfig", "LoggingSettings", + "NullEventSink", "OpenTelemetryConfig", "OpenTelemetryRuntime", "OpenTelemetrySettings", "OpenTelemetrySetupError", + "OtelLogsSink", "PullRequest", "SlackClient", "SlackClientConfig", @@ -178,9 +207,10 @@ def _script_name() -> str: "config_field", "config_field_names", "config_help_formatter", + "cli_logging_handlers", + "cli_run_context", "config_snapshot", "configure_open_telemetry", - "configure_logging", "critical", "current_traceparent_header", "debug", @@ -205,11 +235,11 @@ def _script_name() -> str: "load_json_cache", "load_json_subset", "logging", - "logging_context", "logging_settings_from_config", "log_event", "log_context", "normalize_sourcegraph_endpoint", + "observability_context", "open_telemetry_settings_from_config", "parse_args", "pr_ref_from_url", diff --git a/src/src_py_lib/utils/events.py b/src/src_py_lib/utils/events.py new file mode 100644 index 0000000..c808eea --- /dev/null +++ b/src/src_py_lib/utils/events.py @@ -0,0 +1,211 @@ +"""Structured wide events with explicit sinks, decoupled from stdlib logging. + +Events are plain dicts shaped after the OpenTelemetry Logs Data Model: +`time_unix_nano`, `severity_text`, `severity_number`, `event_name`, `body`, +`trace_id` / `span_id` / `parent_span_id`, and `attributes`. Run-level +metadata (`resource`) is stamped once per run on the run-start event, not +repeated on every line. + +Sinks receive every event as a Mapping. The active sink is ambient run +context, set only by `observability_context()` in `logging.py`; the default +is `NullEventSink`, so importing this library never writes anywhere. +""" + +from __future__ import annotations + +import contextlib +import json +import logging +import threading +from collections.abc import Callable, Mapping +from contextvars import ContextVar +from dataclasses import dataclass, field +from pathlib import Path +from types import TracebackType +from typing import Any, Final, Protocol, Self, cast + +# OpenTelemetry Logs Data Model field names, pinned here so an upstream +# rename is a one-file change. +TIME_UNIX_NANO: Final[str] = "time_unix_nano" +SEVERITY_TEXT: Final[str] = "severity_text" +SEVERITY_NUMBER: Final[str] = "severity_number" +EVENT_NAME: Final[str] = "event_name" +BODY: Final[str] = "body" +TRACE_ID: Final[str] = "trace_id" +SPAN_ID: Final[str] = "span_id" +PARENT_SPAN_ID: Final[str] = "parent_span_id" +ATTRIBUTES: Final[str] = "attributes" +RESOURCE: Final[str] = "resource" + +# OpenTelemetry semantic-convention attribute names used by this library. +ERROR_TYPE: Final[str] = "error.type" +SERVICE_NAME: Final[str] = "service.name" +SERVICE_VERSION: Final[str] = "service.version" +PROCESS_PID: Final[str] = "process.pid" +PROCESS_RUNTIME_NAME: Final[str] = "process.runtime.name" +PROCESS_RUNTIME_VERSION: Final[str] = "process.runtime.version" + +EVENT_FIELD_ORDER: Final[tuple[str, ...]] = ( + TIME_UNIX_NANO, + SEVERITY_TEXT, + SEVERITY_NUMBER, + EVENT_NAME, + TRACE_ID, + SPAN_ID, + PARENT_SPAN_ID, + BODY, + RESOURCE, + ATTRIBUTES, +) + +# Python logging level -> OTel severity number for the level's lower bound. +_SEVERITY_NUMBER_BY_LEVEL: Final[tuple[tuple[int, str, int], ...]] = ( + (logging.CRITICAL, "FATAL", 21), + (logging.ERROR, "ERROR", 17), + (logging.WARNING, "WARN", 13), + (logging.INFO, "INFO", 9), + (logging.DEBUG, "DEBUG", 5), + (0, "TRACE", 1), +) + + +def severity_fields(level: int) -> tuple[str, int]: + """Return OTel `(severity_text, severity_number)` for a Python log level.""" + for threshold, severity_text, severity_number in _SEVERITY_NUMBER_BY_LEVEL: + if level >= threshold: + return severity_text, severity_number + return "TRACE", 1 + + +def ordered_event_payload(payload: Mapping[str, Any]) -> dict[str, Any]: + """Return the payload with model fields first and attributes sorted.""" + ordered: dict[str, Any] = {} + for key in EVENT_FIELD_ORDER: + if key in payload: + ordered[key] = payload[key] + for key in sorted(key for key in payload if key not in ordered): + ordered[key] = payload[key] + attributes = ordered.get(ATTRIBUTES) + if isinstance(attributes, Mapping): + typed_attributes = cast(Mapping[str, Any], attributes) + ordered[ATTRIBUTES] = {key: typed_attributes[key] for key in sorted(typed_attributes)} + return ordered + + +class EventSink(Protocol): + """Receives structured wide events as plain mappings.""" + + def emit(self, event: Mapping[str, Any]) -> None: ... + + +class NullEventSink: + """Discard every event; the default sink outside any run context.""" + + def emit(self, event: Mapping[str, Any]) -> None: + return + + +class InMemoryEventSink: + """Collect events in memory for tests and module callers.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self.events: list[dict[str, Any]] = [] + + def emit(self, event: Mapping[str, Any]) -> None: + with self._lock: + self.events.append(dict(event)) + + +class CallbackEventSink: + """Pass a defensive copy of every event to a caller-supplied function.""" + + def __init__(self, callback: Callable[[dict[str, Any]], None]) -> None: + self._callback = callback + + def emit(self, event: Mapping[str, Any]) -> None: + self._callback(dict(event)) + + +class CompositeEventSink: + """Fan one event out to several sinks.""" + + def __init__(self, sinks: tuple[EventSink, ...]) -> None: + self.sinks = sinks + + def emit(self, event: Mapping[str, Any]) -> None: + for sink in self.sinks: + sink.emit(event) + + +class JSONLEventSink: + """Write each event as one JSON line; thread-safe; usable as a context manager.""" + + def __init__(self, path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + self.path = path + self._lock = threading.Lock() + self._file = path.open("w", encoding="utf-8", buffering=1) + + def emit(self, event: Mapping[str, Any]) -> None: + line = json.dumps(ordered_event_payload(event), default=str) + with self._lock, contextlib.suppress(ValueError): + self._file.write(line + "\n") + + def flush(self) -> None: + with self._lock, contextlib.suppress(ValueError): + self._file.flush() + + def close(self) -> None: + with self._lock, contextlib.suppress(Exception): + self._file.flush() + self._file.close() + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exception_type: type[BaseException] | None, + exception: BaseException | None, + traceback: TracebackType | None, + ) -> None: + self.close() + + +def flush_sink(sink: EventSink) -> None: + """Flush a sink if it supports flushing.""" + flush = getattr(sink, "flush", None) + if callable(flush): + flush() + + +@dataclass(frozen=True) +class EventRuntime: + """Ambient per-run observability scope: where events go and what rides along.""" + + run: str = "" + sink: EventSink = field(default_factory=NullEventSink) + min_level: int = logging.DEBUG + + +_EVENT_RUNTIME: ContextVar[EventRuntime | None] = ContextVar( + "src_py_lib_event_runtime", + default=None, +) +_DEFAULT_RUNTIME: Final[EventRuntime] = EventRuntime() + + +def current_event_runtime() -> EventRuntime: + """Return the active event runtime; outside a run this is a null runtime.""" + return _EVENT_RUNTIME.get() or _DEFAULT_RUNTIME + + +def set_event_runtime(runtime: EventRuntime) -> Any: + """Install an event runtime; returns the reset token.""" + return _EVENT_RUNTIME.set(runtime) + + +def reset_event_runtime(token: Any) -> None: + """Restore the previous event runtime.""" + _EVENT_RUNTIME.reset(token) diff --git a/src/src_py_lib/utils/logging.py b/src/src_py_lib/utils/logging.py index 453b520..323ab56 100644 --- a/src/src_py_lib/utils/logging.py +++ b/src/src_py_lib/utils/logging.py @@ -1,8 +1,17 @@ -"""Central structured logging for small CLIs and scripts. +"""Central structured logging and observability for CLIs and importable libraries. -Use `configure_logging()` once near process startup. Other modules should use -`logging.getLogger(__name__)` for human-readable operator messages and -`span()` / `log_event()` for structured JSONL events. +Three decoupled channels: + +- Human messages: plain `logging.getLogger(__name__)` on module loggers. + Library code never installs handlers; CLI entrypoints opt in via + `cli_logging_handlers()` (or the composed `cli_run_context()`). +- Structured wide events: `span()` / `log_event()` emit OTel-shaped dicts to + the active `EventSink` (see `events.py`). Outside a run context the sink is + null, so importing this library never writes anywhere. +- OpenTelemetry traces: opened by `span()` whenever a provider is configured. + +`observability_context()` owns one run: event runtime, optional OTel setup, +run start/end events, resource sampling, and flush ordering. """ from __future__ import annotations @@ -11,7 +20,6 @@ import contextlib import contextvars import datetime as _datetime -import json import logging import os import secrets @@ -19,7 +27,7 @@ import sys import threading import time -from collections.abc import Callable, Generator, Iterable, Mapping +from collections.abc import Callable, Generator, Iterable, Mapping, Sequence from concurrent.futures import Executor, Future from dataclasses import dataclass, field from pathlib import Path @@ -30,7 +38,7 @@ from pydantic import model_validator -from src_py_lib.utils import telemetry +from src_py_lib.utils import events, telemetry from src_py_lib.utils.config import Config, config_field, config_snapshot RUN: Final[str] = secrets.token_hex(4) @@ -52,26 +60,11 @@ "secret", "token", ) -LOG_FIELD_ORDER: Final[tuple[str, ...]] = ( - "ts", - "command", - "level", - "run", - "trace", - "span", - "parent_span", - "logger", - "event", - "phase", - "stage", - "message", -) +DEFAULT_LOGGER_NAMES: Final[tuple[str, ...]] = ("src_py_lib",) -_STRUCTURED_EVENT_ATTR: Final[str] = "_src_py_lib_structured_event" -_STRUCTURED_FIELDS_ATTR: Final[str] = "_src_py_lib_structured_fields" _HTTPCORE_RESPONSE_HEADERS_PREFIX: Final[str] = "receive_response_headers.complete return_value=" _HTTPX_REQUEST_PREFIX: Final[str] = "HTTP Request: " -_HTTP_DEPENDENCY_LOGGER_PREFIXES: Final[tuple[str, ...]] = ("httpx", "httpcore") +_HTTP_DEPENDENCY_LOGGER_NAMES: Final[tuple[str, ...]] = ("httpx", "httpcore") _CONTEXT: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar("src_py_lib_log_context") _PARENT_SPAN_CONTEXT: contextvars.ContextVar[str | None] = contextvars.ContextVar( "src_py_lib_parent_span_id", default=None @@ -80,9 +73,9 @@ @dataclass(frozen=True) class LoggingSettings: - """Logging destinations and levels.""" + """Logging destinations and levels for one CLI run.""" - logger_name: str = "" + logger_names: tuple[str, ...] = DEFAULT_LOGGER_NAMES terminal_level: str = "info" log_file_level: str | None = None log_file: Path | None = None @@ -174,7 +167,7 @@ def logging_settings_from_config( *, terminal_default: str = "INFO", log_file_default: str | None = DEFAULT_LOG_FILE_LEVEL, - logger_name: str = "", + logger_names: tuple[str, ...] = DEFAULT_LOGGER_NAMES, log_file: Path | None = None, logs_dir: Path | None = DEFAULT_LOGS_DIR, run: str = RUN, @@ -186,7 +179,7 @@ def logging_settings_from_config( """Return `LoggingSettings` using common CLI log-level alias.""" explicit_level = resolve_log_level_name(config) return LoggingSettings( - logger_name=logger_name, + logger_names=logger_names, terminal_level=explicit_level or terminal_default, log_file_level=explicit_level or log_file_default, log_file=log_file, @@ -201,16 +194,16 @@ def logging_settings_from_config( _HTTP_METRICS_LOCK: Final[threading.Lock] = threading.Lock() _HTTP_METRICS: dict[str, int] = { - "http_request_attempt_count": 0, - "http_request_bytes_total": 0, - "http_response_bytes_total": 0, - "http_retry_count": 0, - "http_2xx_count": 0, - "http_3xx_count": 0, - "http_4xx_count": 0, - "http_429_count": 0, - "http_5xx_count": 0, - "http_transport_error_count": 0, + "http.client.request.count": 0, + "http.client.request.body.size.total": 0, + "http.client.response.body.size.total": 0, + "http.client.retry.count": 0, + "http.client.response.2xx.count": 0, + "http.client.response.3xx.count": 0, + "http.client.response.4xx.count": 0, + "http.client.response.429.count": 0, + "http.client.response.5xx.count": 0, + "http.client.transport_error.count": 0, } @@ -312,116 +305,121 @@ def _sample_fields(self) -> dict[str, Any]: return fields -class _DropStructuredEvents(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - return not hasattr(record, _STRUCTURED_EVENT_ATTR) - +def _event_payload( + numeric_level: int, + event_name: str, + *, + attributes: dict[str, Any], + body: str | None = None, +) -> dict[str, Any]: + """Build one OTel-shaped wide event payload.""" + severity_text, severity_number = events.severity_fields(numeric_level) + runtime = events.current_event_runtime() + if runtime.run: + attributes.setdefault("run", runtime.run) + resource = attributes.pop(events.RESOURCE, None) + payload: dict[str, Any] = { + events.TIME_UNIX_NANO: time.time_ns(), + events.SEVERITY_TEXT: severity_text, + events.SEVERITY_NUMBER: severity_number, + events.EVENT_NAME: event_name, + **telemetry.current_trace_fields(_PARENT_SPAN_CONTEXT.get()), + events.ATTRIBUTES: attributes, + } + if resource is not None: + payload[events.RESOURCE] = resource + if body is not None: + payload[events.BODY] = body + return payload -class _DropHTTPDependencyLogs(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - return not any( - record.name == prefix or record.name.startswith(f"{prefix}.") - for prefix in _HTTP_DEPENDENCY_LOGGER_PREFIXES - ) +class EventBridgeHandler(logging.Handler): + """Forward human log records into the event sink as `event_name="log"` events. -class JSONLogFileHandler(logging.Handler): - """Write every log record as one JSON object line.""" + Carries the httpcore wire-debug mining and secret redaction from + `_structured_log_fields()`, plus exception tracebacks via `exc_info`. + Installed only by CLI entrypoints; never by library code. + """ - def __init__(self, path: Path, *, run: str, level: int) -> None: - super().__init__(level=level) - self.path = path - self._run = run - self._lock = threading.Lock() - self._file = path.open("w", encoding="utf-8", buffering=1) + def __init__(self, sink: events.EventSink, *, level: int | str = DEFAULT_LOG_FILE_LEVEL): + super().__init__(level=_log_level(level)) + self.sink = sink def emit(self, record: logging.LogRecord) -> None: try: - timestamp = _datetime.datetime.now(_datetime.UTC).isoformat(timespec="milliseconds") - structured_event = getattr(record, _STRUCTURED_EVENT_ATTR, None) - if isinstance(structured_event, str): - fields = getattr(record, _STRUCTURED_FIELDS_ATTR, {}) - structured_fields: dict[str, Any] = ( - dict(cast(Mapping[str, Any], fields)) if isinstance(fields, Mapping) else {} - ) - payload = { - "ts": timestamp, - "run": self._run, - "level": record.levelname, - "event": structured_event, - **structured_fields, - } - else: - message, log_fields = _structured_log_fields(record) - payload = { - "ts": timestamp, - "run": self._run, - "level": record.levelname, - "event": "log", - "logger": record.name, - "message": message, - } - payload.update(log_fields) - payload.update(_current_log_fields(payload)) - if record.exc_info: - payload["exc_info"] = self.format(record) - with self._lock: - self._file.write(json.dumps(_ordered_payload(payload), default=str) + "\n") + message, mined_fields = _structured_log_fields(record) + numeric_level = record.levelno + mined_level = mined_fields.pop("level", None) + if isinstance(mined_level, str): + numeric_level = _log_level(mined_level) + attributes: dict[str, Any] = { + **_CONTEXT.get({}), + "logger": record.name, + **mined_fields, + } + if record.exc_info: + attributes["exc_info"] = self.format(record) + self.sink.emit( + _event_payload(numeric_level, "log", attributes=attributes, body=message) + ) except Exception: self.handleError(record) - def close(self) -> None: - with contextlib.suppress(Exception), self._lock: - self._file.flush() - self._file.close() - super().close() +@contextlib.contextmanager +def cli_logging_handlers( + *, + sink: events.EventSink | None = None, + logger_names: Sequence[str] = DEFAULT_LOGGER_NAMES, + terminal_level: int | str = "info", + bridge_level: int | str = DEFAULT_LOG_FILE_LEVEL, + suppress_http_dependency_logs: bool = True, +) -> Generator[None]: + """Attach terminal (and optional bridge) handlers to the named loggers. -def configure_logging(config: LoggingSettings | None = None) -> Path | None: - """Configure terminal logging and optional JSON log-file logging. + Adds and removes only its own handlers, restores prior logger levels on + exit, and never touches the root logger or other handlers — safe to + compose with a host application's logging configuration. - Returns the JSON log-file path when file logging is enabled. + With `suppress_http_dependency_logs=False`, httpx/httpcore loggers are + bridged too, restoring wire-level debugging in the event stream. """ - config = config or LoggingSettings() - reset_observability_metrics() - terminal_level = _log_level(config.terminal_level) - log_file_level = _log_file_level(config.log_file_level) - log_file = config.log_file - if log_file is None and config.logs_dir is not None: - log_file = default_log_file(config.logs_dir, run=config.run) - root_or_package_logger = logging.getLogger(config.logger_name) - root_or_package_logger.handlers.clear() - root_or_package_logger.setLevel( - min( - terminal_level, - log_file_level if log_file else terminal_level, - ) + resolved_terminal_level = _log_level(terminal_level) + resolved_bridge_level = _log_level(bridge_level) + handler_level = ( + min(resolved_terminal_level, resolved_bridge_level) + if sink is not None + else resolved_terminal_level ) - root_or_package_logger.propagate = False terminal_handler = logging.StreamHandler() - terminal_handler.setLevel(terminal_level) + terminal_handler.setLevel(resolved_terminal_level) terminal_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) - terminal_handler.addFilter(_DropStructuredEvents()) - if config.suppress_http_dependency_logs and config.logger_name == "": - terminal_handler.addFilter(_DropHTTPDependencyLogs()) - root_or_package_logger.addHandler(terminal_handler) - - if log_file is None: - return None + bridge_handler = EventBridgeHandler(sink, level=resolved_bridge_level) if sink else None - log_file.parent.mkdir(parents=True, exist_ok=True) - _prune_old_log_files(log_file.parent, config.retain_log_files) - log_file_handler = JSONLogFileHandler( - log_file, - run=config.run, - level=log_file_level, - ) - if config.suppress_http_dependency_logs and config.logger_name == "": - log_file_handler.addFilter(_DropHTTPDependencyLogs()) - root_or_package_logger.addHandler(log_file_handler) - root_or_package_logger.info("Writing log events to %s.", log_file) - return log_file + names = tuple(logger_names) + if not suppress_http_dependency_logs: + names += _HTTP_DEPENDENCY_LOGGER_NAMES + previous_levels: dict[str, int] = {} + attached: list[tuple[logging.Logger, logging.Handler]] = [] + try: + for name in names: + logger = logging.getLogger(name) + previous_levels[name] = logger.level + if logger.level == logging.NOTSET or logger.level > handler_level: + logger.setLevel(handler_level) + if name not in _HTTP_DEPENDENCY_LOGGER_NAMES: + logger.addHandler(terminal_handler) + attached.append((logger, terminal_handler)) + if bridge_handler is not None: + logger.addHandler(bridge_handler) + attached.append((logger, bridge_handler)) + yield + finally: + for logger, handler in attached: + logger.removeHandler(handler) + for name, level in previous_levels.items(): + logging.getLogger(name).setLevel(level) def reset_observability_metrics() -> None: @@ -440,30 +438,30 @@ def record_http_attempt( ) -> None: """Record one HTTP attempt for the current run summary.""" with _HTTP_METRICS_LOCK: - _HTTP_METRICS["http_request_attempt_count"] += 1 - _HTTP_METRICS["http_request_bytes_total"] += request_bytes - _HTTP_METRICS["http_response_bytes_total"] += response_bytes + _HTTP_METRICS["http.client.request.count"] += 1 + _HTTP_METRICS["http.client.request.body.size.total"] += request_bytes + _HTTP_METRICS["http.client.response.body.size.total"] += response_bytes if transport_error: - _HTTP_METRICS["http_transport_error_count"] += 1 + _HTTP_METRICS["http.client.transport_error.count"] += 1 if status_code is None: return status_group = 5 if status_code >= 500 else status_code // 100 metric_name = { - 2: "http_2xx_count", - 3: "http_3xx_count", - 4: "http_4xx_count", - 5: "http_5xx_count", + 2: "http.client.response.2xx.count", + 3: "http.client.response.3xx.count", + 4: "http.client.response.4xx.count", + 5: "http.client.response.5xx.count", }.get(status_group) if metric_name is not None: _HTTP_METRICS[metric_name] += 1 if status_code == 429: - _HTTP_METRICS["http_429_count"] += 1 + _HTTP_METRICS["http.client.response.429.count"] += 1 def record_http_retry() -> None: """Record that an HTTP attempt will be retried.""" with _HTTP_METRICS_LOCK: - _HTTP_METRICS["http_retry_count"] += 1 + _HTTP_METRICS["http.client.retry.count"] += 1 def observability_summary() -> dict[str, Any]: @@ -472,25 +470,56 @@ def observability_summary() -> dict[str, Any]: return dict(_HTTP_METRICS) +def _run_resource_fields(extra: Mapping[str, Any] | None = None) -> dict[str, Any]: + """Return OTel resource attributes stamped once on the run-start event.""" + fields: dict[str, Any] = { + events.PROCESS_PID: os.getpid(), + events.PROCESS_RUNTIME_NAME: sys.implementation.name, + events.PROCESS_RUNTIME_VERSION: sys.version.split()[0], + } + fields.update(dict(extra or {})) + return fields + + @contextlib.contextmanager -def logging_context( +def observability_context( name: str, config: object | None = None, *, + sink: events.EventSink | None = None, + run: str = RUN, + min_level: int | str = DEFAULT_LOG_FILE_LEVEL, git_cwd: Path | str | None = None, - logging_config: LoggingSettings | None = None, run_fields: Mapping[str, Any] | None = None, run_summary: Callable[[], Mapping[str, Any]] | None = None, -) -> Generator[Path | None]: - """Configure logging, install command context, and emit startup metadata.""" - resolved_logging_config = logging_config or LoggingSettings( - log_file_level=_src_log_level_from_config(config) - ) + resource: Mapping[str, Any] | None = None, + open_telemetry: telemetry.OpenTelemetrySettings | None = None, + resource_sample_interval_seconds: float | None = None, + log_file: Path | None = None, +) -> Generator[None]: + """Own one run's observability without touching stdlib logging handlers. + + Installs the event runtime (sink, run id, level floor), configures + OpenTelemetry only when explicitly requested, emits run start/end and + startup events, runs the resource sampler, and tears down in order: + sampler -> run-end event -> sink flush -> OTel flush (owned providers + only). The run-end event is emitted even on exceptions and `SystemExit`. + """ + reset_observability_metrics() open_telemetry_runtime = telemetry.configure_open_telemetry( - resolved_logging_config.open_telemetry or telemetry.OpenTelemetrySettings() + open_telemetry or telemetry.OpenTelemetrySettings() + ) + runtime = events.EventRuntime( + run=run, + sink=sink or events.NullEventSink(), + min_level=_log_level(min_level), + ) + runtime_token = events.set_event_runtime(runtime) + sampler = ( + ResourceSampler(resource_sample_interval_seconds) + if resource_sample_interval_seconds is not None + else None ) - log_file = configure_logging(resolved_logging_config) - sampler = _resource_sampler(resolved_logging_config) started = time.perf_counter() error: BaseException | None = None try: @@ -500,17 +529,17 @@ def logging_context( ): if sampler is not None: sampler.start() - start_fields = {"phase": "start", **dict(run_fields or {})} - info("run", logger_name=resolved_logging_config.logger_name, **start_fields) + start_fields: dict[str, Any] = {"phase": "start", **dict(run_fields or {})} + start_fields[events.RESOURCE] = _run_resource_fields(resource) + debug("run", **start_fields) try: startup_event( command=name, config=config, log_file=log_file, git_cwd=_git_cwd_path(git_cwd), - logger_name=resolved_logging_config.logger_name, ) - yield log_file + yield except BaseException as exception: error = exception raise @@ -523,27 +552,82 @@ def logging_context( summary["exit_code"] = _run_exit_code(error) if run_summary is not None: summary.update(dict(run_summary())) - end_fields = { + end_fields: dict[str, Any] = { "phase": "end", "duration_ms": round((time.perf_counter() - started) * 1000.0), "status": "error" if error_type else "ok", - "error_type": error_type, + events.ERROR_TYPE: error_type, **dict(run_fields or {}), **summary, } telemetry.set_current_span_attributes(end_fields) if error_type: telemetry.mark_current_span_error(error_type) - log_event( - "error" if error_type else "info", - "run", - logger_name=resolved_logging_config.logger_name, - **end_fields, - ) + log_event("error" if error_type else "info", "run", **end_fields) finally: + if sink is not None: + events.flush_sink(sink) + events.reset_event_runtime(runtime_token) open_telemetry_runtime.force_flush() +@contextlib.contextmanager +def cli_run_context( + name: str, + config: object | None = None, + *, + git_cwd: Path | str | None = None, + logging_config: LoggingSettings | None = None, + run_fields: Mapping[str, Any] | None = None, + run_summary: Callable[[], Mapping[str, Any]] | None = None, + resource: Mapping[str, Any] | None = None, +) -> Generator[Path | None]: + """Compose CLI-mode logging for one run: JSONL sink + handlers + observability. + + Yields the JSON event-log path (or None when file logging is disabled). + Teardown order: run-end event, sink flush, OTel flush, handler removal, + sink close. + """ + settings = logging_config or LoggingSettings(log_file_level=_src_log_level_from_config(config)) + log_file = settings.log_file + if log_file is None and settings.logs_dir is not None: + log_file = default_log_file(settings.logs_dir, run=settings.run) + with contextlib.ExitStack() as stack: + sink: events.JSONLEventSink | None = None + if log_file is not None: + log_file.parent.mkdir(parents=True, exist_ok=True) + _prune_old_log_files(log_file.parent, settings.retain_log_files) + sink = stack.enter_context(events.JSONLEventSink(log_file)) + stack.enter_context( + cli_logging_handlers( + sink=sink, + logger_names=settings.logger_names, + terminal_level=settings.terminal_level, + bridge_level=_log_file_level(settings.log_file_level), + suppress_http_dependency_logs=settings.suppress_http_dependency_logs, + ) + ) + if log_file is not None: + logging.getLogger(__name__).info("Writing log events to %s.", log_file) + stack.enter_context( + observability_context( + name, + config, + sink=sink, + run=settings.run, + min_level=_log_file_level(settings.log_file_level), + git_cwd=git_cwd, + run_fields=run_fields, + run_summary=run_summary, + resource=resource, + open_telemetry=settings.open_telemetry, + resource_sample_interval_seconds=settings.resource_sample_interval_seconds, + log_file=log_file, + ) + ) + yield log_file + + def default_log_file(logs_dir: Path = DEFAULT_LOGS_DIR, *, run: str = RUN) -> Path: """Return a timestamped log-file path under `logs_dir`.""" timestamp = _datetime.datetime.now(_datetime.UTC).strftime("%Y-%m-%d-%H-%M-%S-%z") @@ -552,21 +636,19 @@ def default_log_file(logs_dir: Path = DEFAULT_LOGS_DIR, *, run: str = RUN) -> Pa def log_event(level: str, key: str, *, logger_name: str = "", **fields: Any) -> None: - """Log one structured event through the configured logger.""" + """Emit one structured wide event to the active sink and the current span. + + `logger_name` is accepted for signature compatibility; events no longer + ride stdlib logging, so it is ignored. + """ + del logger_name numeric_level = _log_level(level) - logger = logging.getLogger(logger_name) - if not logger.isEnabledFor(numeric_level): - return telemetry.add_span_event(key, {"level": logging.getLevelName(numeric_level), **fields}) - logger.log( - numeric_level, - "event=%s", - key, - extra={ - _STRUCTURED_EVENT_ATTR: key, - _STRUCTURED_FIELDS_ATTR: {**_current_log_fields(), **fields}, - }, - ) + runtime = events.current_event_runtime() + if numeric_level < runtime.min_level: + return + attributes = {**_CONTEXT.get({}), **fields} + runtime.sink.emit(_event_payload(numeric_level, key, attributes=attributes)) def debug(key: str, *, logger_name: str = "", **fields: Any) -> None: @@ -621,12 +703,17 @@ def span( logger_name: str = "", **fields: Any, ) -> Generator[dict[str, Any]]: - """Open an observed span and emit start/end structured log events.""" + """Open an observed span; the span-end event is the canonical wide event. + + Attributes accumulated onto the yielded dict during the work ride the end + event alongside duration and status. The start event is demoted to debug + (wide-event discipline: one informative event per unit of work). + """ parent_span_id = telemetry.current_span_id() parent_reset_token = _PARENT_SPAN_CONTEXT.set(parent_span_id) try: with telemetry.open_telemetry_span(key, fields): - log_event(start_level or level, key, logger_name=logger_name, phase="start", **fields) + log_event(start_level or "debug", key, logger_name=logger_name, phase="start", **fields) started = time.perf_counter() extra: dict[str, Any] = {} error: BaseException | None = None @@ -644,11 +731,11 @@ def span( } if error: end_fields["status"] = "error" - end_fields["error_type"] = type(error).__name__ + end_fields[events.ERROR_TYPE] = type(error).__name__ telemetry.mark_current_span_error(type(error).__name__) elif not omit_success_status: end_fields["status"] = "ok" - end_fields["error_type"] = None + end_fields[events.ERROR_TYPE] = None telemetry.set_current_span_attributes(end_fields) log_event( "error" if error else level, @@ -698,16 +785,6 @@ def sanitized_config_snapshot(config: object) -> dict[str, Any]: return snapshot -def _current_log_fields(protected: Mapping[str, Any] | None = None) -> dict[str, Any]: - protected_keys = set(protected or {}) - fields = {key: value for key, value in _CONTEXT.get({}).items() if key not in protected_keys} - trace_fields = telemetry.current_trace_fields(_PARENT_SPAN_CONTEXT.get()) - for key, value in trace_fields.items(): - if key not in protected_keys: - fields[key] = value - return fields - - def startup_event( *, command: str, @@ -750,16 +827,6 @@ def git_short_hash(cwd: Path | None = None) -> str | None: return commit if result.returncode == 0 and commit else None -def _ordered_payload(payload: Mapping[str, Any]) -> dict[str, Any]: - ordered: dict[str, Any] = {} - for key in LOG_FIELD_ORDER: - if key in payload: - ordered[key] = payload[key] - for key in sorted(key for key in payload if key not in ordered): - ordered[key] = payload[key] - return ordered - - def _log_file_level(configured_level: str | None) -> int: if configured_level is not None: return _log_level(configured_level) @@ -881,11 +948,6 @@ def _secret_state(value: object) -> str: return "reference" if isinstance(value, str) and value.startswith("op://") else "provided" -def _resource_sampler(config: LoggingSettings) -> ResourceSampler | None: - interval_seconds = config.resource_sample_interval_seconds - return ResourceSampler(interval_seconds) if interval_seconds is not None else None - - def _run_error_type(exception: BaseException | None) -> str | None: if exception is None: return None diff --git a/src/src_py_lib/utils/telemetry.py b/src/src_py_lib/utils/telemetry.py index 40f2092..470a527 100644 --- a/src/src_py_lib/utils/telemetry.py +++ b/src/src_py_lib/utils/telemetry.py @@ -117,10 +117,11 @@ class OpenTelemetryRuntime: exporting: bool tracer_provider: object | None = None meter_provider: object | None = None + logger_provider: object | None = None def force_flush(self, timeout_millis: int = 30_000) -> None: """Flush configured providers if they expose a force_flush method.""" - for provider in (self.tracer_provider, self.meter_provider): + for provider in (self.tracer_provider, self.meter_provider, self.logger_provider): force_flush = getattr(provider, "force_flush", None) if callable(force_flush): force_flush(timeout_millis=timeout_millis) @@ -170,11 +171,13 @@ def configure_open_telemetry(settings: OpenTelemetrySettings) -> OpenTelemetryRu resource = _resource(settings) tracer_provider = _configure_traces(settings, resource) meter_provider = _configure_metrics(settings, resource) + logger_provider = _configure_logs(settings, resource) if settings.enabled else None return OpenTelemetryRuntime( configured=True, exporting=settings.enabled, tracer_provider=tracer_provider, meter_provider=meter_provider, + logger_provider=logger_provider, ) @@ -217,16 +220,16 @@ def open_telemetry_span(name: str, fields: Mapping[str, object] | None = None) - def current_trace_fields(parent_span_id: str | None = None) -> dict[str, str]: - """Return log-friendly trace/span identifiers for the active span.""" + """Return OTel-named trace/span identifiers for the active span.""" span_context = trace.get_current_span().get_span_context() if not span_context.is_valid: return {} fields = { - "trace": format_trace_id(span_context.trace_id), - "span": format_span_id(span_context.span_id), + "trace_id": format_trace_id(span_context.trace_id), + "span_id": format_span_id(span_context.span_id), } if parent_span_id: - fields["parent_span"] = parent_span_id + fields["parent_span_id"] = parent_span_id return fields @@ -331,6 +334,65 @@ def _configure_traces(settings: OpenTelemetrySettings, resource: object) -> obje return tracer_provider +def _configure_logs(settings: OpenTelemetrySettings, resource: object) -> object: + """Configure an OTLP logs provider; respects a host-configured provider.""" + api_logs = importlib.import_module("opentelemetry._logs") + provider = api_logs.get_logger_provider() + if not _is_default_provider(provider): + return provider + + logger_provider_class = _required_symbol("opentelemetry.sdk._logs", "LoggerProvider") + exporter_class = _required_symbol( + "opentelemetry.exporter.otlp.proto.http._log_exporter", + "OTLPLogExporter", + ) + processor_class = _required_symbol( + "opentelemetry.sdk._logs.export", + "BatchLogRecordProcessor", + ) + exporter = exporter_class( + endpoint=settings.exporter_otlp_endpoint, + headers=_headers(settings.exporter_otlp_headers), + ) + logger_provider = logger_provider_class(resource=resource) + logger_provider.add_log_record_processor(processor_class(exporter)) + api_logs.set_logger_provider(logger_provider) + return logger_provider + + +class OtelLogsSink: + """Emit wide events through the OpenTelemetry Logs API as standard log records. + + Pairs with an OTLP logs provider configured by `configure_open_telemetry` + (or by the host application). Event payloads keep their OTel Logs Data + Model field names, so the mapping is one-to-one. + """ + + def __init__(self, scope_name: str = _TRACER_NAME) -> None: + api_logs = importlib.import_module("opentelemetry._logs") + self._severity_number_class = api_logs.SeverityNumber + self._logger = api_logs.get_logger(scope_name) + + def emit(self, event: Mapping[str, Any]) -> None: + attributes = event.get("attributes") + attribute_fields: Mapping[str, object] = ( + cast(Mapping[str, object], attributes) if isinstance(attributes, Mapping) else {} + ) + severity_number = event.get("severity_number") + self._logger.emit( + timestamp=cast(int | None, event.get("time_unix_nano")), + severity_number=( + self._severity_number_class(severity_number) + if isinstance(severity_number, int) + else None + ), + severity_text=cast(str | None, event.get("severity_text")), + body=cast(str | None, event.get("body")), + attributes=span_attributes(attribute_fields), + event_name=cast(str | None, event.get("event_name")), + ) + + def _configure_metrics(settings: OpenTelemetrySettings, resource: object) -> object: provider = metrics.get_meter_provider() if not _is_default_provider(provider): diff --git a/tests/test_import.py b/tests/test_import.py index 0fd1ebd..4d4684b 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -14,15 +14,27 @@ def test_package_imports(self) -> None: self.assertIsNotNone(src_py_lib) def test_root_public_api_exports_common_entrypoints(self) -> None: + self.assertIsNotNone(src_py_lib.CallbackEventSink) + self.assertIsNotNone(src_py_lib.CompositeEventSink) + self.assertIsNotNone(src_py_lib.EventBridgeHandler) + self.assertIsNotNone(src_py_lib.EventRuntime) + self.assertIsNotNone(src_py_lib.EventSink) self.assertIsNotNone(src_py_lib.GitHubClient) self.assertIsNotNone(src_py_lib.GraphQLClient) self.assertIsNotNone(src_py_lib.HTTPClient) + self.assertIsNotNone(src_py_lib.InMemoryEventSink) self.assertIsNotNone(src_py_lib.JSONDict) + self.assertIsNotNone(src_py_lib.JSONLEventSink) self.assertIsNotNone(src_py_lib.LinearClientConfig) self.assertIsNotNone(src_py_lib.LoggingConfig) self.assertIsNotNone(src_py_lib.LoggingSettings) + self.assertIsNotNone(src_py_lib.NullEventSink) self.assertIsNotNone(src_py_lib.OpenTelemetryConfig) self.assertIsNotNone(src_py_lib.OpenTelemetrySettings) + self.assertIsNotNone(src_py_lib.OtelLogsSink) + self.assertIsNotNone(src_py_lib.cli_logging_handlers) + self.assertIsNotNone(src_py_lib.cli_run_context) + self.assertIsNotNone(src_py_lib.observability_context) self.assertIsNotNone(src_py_lib.resolve_log_level_name) self.assertIsNotNone(src_py_lib.SlackClient) self.assertIsNotNone(src_py_lib.SlackPacer) diff --git a/tests/test_logging_http_clients.py b/tests/test_logging_http_clients.py index f7eee2b..2d26b91 100644 --- a/tests/test_logging_http_clients.py +++ b/tests/test_logging_http_clients.py @@ -6,17 +6,25 @@ import io import json import logging +import os import subprocess +import sys import tempfile import threading import unittest from collections.abc import Mapping -from contextlib import redirect_stderr, redirect_stdout +from contextlib import chdir, redirect_stderr, redirect_stdout from pathlib import Path from typing import Any from unittest.mock import patch import httpx +from opentelemetry import trace +from opentelemetry._logs import set_logger_provider +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import InMemoryLogRecordExporter, SimpleLogRecordProcessor +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter import src_py_lib as src from src_py_lib.clients.github import GitHubClient, graphql_api_url, pr_ref_from_url @@ -59,25 +67,39 @@ load_config_from_args, resolve_config_refs, ) +from src_py_lib.utils.events import ( + EVENT_FIELD_ORDER, + CallbackEventSink, + CompositeEventSink, + InMemoryEventSink, + JSONLEventSink, + NullEventSink, + current_event_runtime, + ordered_event_payload, + severity_fields, +) from src_py_lib.utils.http import HTTPClient, HTTPClientError, HTTPResponse from src_py_lib.utils.json_types import JSONDict, json_dict, json_list from src_py_lib.utils.logging import ( + EventBridgeHandler, LoggingConfig, LoggingSettings, - configure_logging, + cli_logging_handlers, critical, debug, default_log_file, error, info, - log_context, log_event, logging_settings_from_config, + observability_context, resolve_log_level_name, span, + stage, startup_event, warning, ) +from src_py_lib.utils.telemetry import OtelLogsSink class RecordingHTTP(HTTPClient): @@ -111,6 +133,29 @@ def json( return {"data": {"viewer": {"username": "alice"}}} +class RecordingLogHandler(logging.Handler): + """Stdlib logging handler that collects records for isolation assertions.""" + + def __init__(self) -> None: + super().__init__() + self.records: list[logging.LogRecord] = [] + + def emit(self, record: logging.LogRecord) -> None: + self.records.append(record) + + +def events_named(events: list[dict[str, Any]], event_name: str) -> list[dict[str, Any]]: + """Return the structured events with the given event name.""" + return [event for event in events if event["event_name"] == event_name] + + +def phase_event(events: list[dict[str, Any]], event_name: str, phase: str) -> dict[str, Any]: + """Return the first event with the given name and `phase` attribute.""" + return next( + event for event in events_named(events, event_name) if event["attributes"]["phase"] == phase + ) + + class FakeOnePasswordClient(OnePasswordClient): """1Password test double that avoids shelling out.""" @@ -849,6 +894,88 @@ def execute(query: str) -> JSONDict: self.assertEqual(json.loads(output_file.read_text(encoding="utf-8")), schema) +class EventSinkTest(unittest.TestCase): + def test_severity_fields_maps_python_levels_to_otel_pairs(self) -> None: + self.assertEqual(severity_fields(logging.DEBUG), ("DEBUG", 5)) + self.assertEqual(severity_fields(logging.INFO), ("INFO", 9)) + self.assertEqual(severity_fields(logging.WARNING), ("WARN", 13)) + self.assertEqual(severity_fields(logging.ERROR), ("ERROR", 17)) + self.assertEqual(severity_fields(logging.CRITICAL), ("FATAL", 21)) + self.assertEqual(severity_fields(35), ("WARN", 13)) + self.assertEqual(severity_fields(15), ("DEBUG", 5)) + self.assertEqual(severity_fields(1), ("TRACE", 1)) + + def test_ordered_event_payload_puts_model_fields_first_and_sorts_attributes(self) -> None: + payload: dict[str, Any] = { + "attributes": {"zulu": 1, "alpha": 2}, + "custom_extra": True, + "event_name": "example", + "severity_number": 9, + "severity_text": "INFO", + "time_unix_nano": 123, + } + + ordered = ordered_event_payload(payload) + + self.assertEqual( + list(ordered), + [ + "time_unix_nano", + "severity_text", + "severity_number", + "event_name", + "attributes", + "custom_extra", + ], + ) + self.assertEqual(list(ordered["attributes"]), ["alpha", "zulu"]) + + def test_jsonl_event_sink_is_thread_safe_and_ignores_emit_after_close(self) -> None: + with tempfile.TemporaryDirectory() as directory: + path = Path(directory) / "nested" / "events.json" + sink = JSONLEventSink(path) + + def emit_batch(worker: int) -> None: + for index in range(50): + sink.emit( + { + "event_name": f"event_{worker}_{index}", + "attributes": {"worker": worker}, + } + ) + + workers = [threading.Thread(target=emit_batch, args=(worker,)) for worker in range(8)] + for worker_thread in workers: + worker_thread.start() + for worker_thread in workers: + worker_thread.join() + sink.close() + sink.emit({"event_name": "after_close", "attributes": {}}) + + rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()] + self.assertEqual(len(rows), 400) + self.assertEqual(events_named(rows, "after_close"), []) + + def test_composite_sink_fans_out_and_sinks_receive_copies(self) -> None: + received: list[dict[str, Any]] = [] + + def mutating_callback(event: dict[str, Any]) -> None: + received.append(event) + event["mutated"] = True + + memory = InMemoryEventSink() + composite = CompositeEventSink((CallbackEventSink(mutating_callback), memory)) + original: dict[str, Any] = {"event_name": "example", "attributes": {"answer": 42}} + + composite.emit(original) + + self.assertEqual(received[0]["event_name"], "example") + self.assertNotIn("mutated", original) + self.assertNotIn("mutated", memory.events[0]) + memory.events[0]["stored_mutation"] = True + self.assertNotIn("stored_mutation", original) + + class LoggingTest(unittest.TestCase): def test_default_log_file_uses_dashed_timestamp_offset_and_run(self) -> None: path = default_log_file(Path("logs"), run="1ea51330") @@ -859,276 +986,305 @@ def test_default_log_file_uses_dashed_timestamp_offset_and_run(self) -> None: r"^\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-\d{4}-1ea51330\.json$", ) - def test_configure_logging_defaults_log_file_under_logs_dir(self) -> None: - with tempfile.TemporaryDirectory() as directory: - logs_dir = Path(directory) / "logs" - logger_name = "src_py_lib_test_default_logs_dir" - log_file = configure_logging( - LoggingSettings( - logger_name=logger_name, - terminal_level="critical", - logs_dir=logs_dir, - run="test-run", - ) - ) - try: - info("default_log_path", logger_name=logger_name) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() + def test_span_and_log_event_default_to_null_sink_outside_runs(self) -> None: + self.assertIsInstance(current_event_runtime().sink, NullEventSink) + + with span("no_context_span") as extra: + extra["answer"] = 42 + log_event("info", "no_context_event", logger_name="ignored", answer=42) + + def test_observability_context_leaves_root_logger_and_files_untouched(self) -> None: + root_logger = logging.getLogger() + handlers_before = list(root_logger.handlers) + level_before = root_logger.level + sink = InMemoryEventSink() + + with tempfile.TemporaryDirectory() as directory, chdir(directory): + with observability_context("guest-test", sink=sink, run="test-run"): + self.assertEqual(list(root_logger.handlers), handlers_before) + self.assertEqual(root_logger.level, level_before) + with span("guest_span"): + info("guest_event", answer=42) + self.assertEqual(list(Path(directory).iterdir()), []) + + self.assertEqual(list(root_logger.handlers), handlers_before) + self.assertEqual(root_logger.level, level_before) + self.assertEqual(len(events_named(sink.events, "guest_event")), 1) + + def test_observability_context_emits_run_startup_and_run_end_events(self) -> None: + sink = InMemoryEventSink() + config = ExampleConfig(token="secret-token") + + with observability_context( + "unit-test", + config, + sink=sink, + run="test-run", + run_fields={"endpoint": "https://example.com"}, + resource={"service.name": "unit-test-service"}, + ): + info("inside_command", answer=42) + + run_start = phase_event(sink.events, "run", "start") + self.assertEqual(run_start["severity_text"], "DEBUG") + self.assertEqual(run_start["severity_number"], 5) + self.assertEqual(run_start["resource"]["process.pid"], os.getpid()) + self.assertEqual(run_start["resource"]["process.runtime.name"], sys.implementation.name) + self.assertEqual(run_start["resource"]["process.runtime.version"], sys.version.split()[0]) + self.assertEqual(run_start["resource"]["service.name"], "unit-test-service") + + startup = events_named(sink.events, "startup")[0] + self.assertEqual(startup["severity_text"], "INFO") + self.assertNotIn("resource", startup) + self.assertEqual(startup["attributes"]["command"], "unit-test") + self.assertEqual(startup["attributes"]["run"], "test-run") + self.assertEqual(startup["attributes"]["config"]["EXAMPLE_TOKEN"], "provided") + + inside = events_named(sink.events, "inside_command")[0] + self.assertEqual(inside["attributes"]["command"], "unit-test") + self.assertEqual(inside["attributes"]["answer"], 42) + self.assertIsInstance(inside["time_unix_nano"], int) + + run_end = phase_event(sink.events, "run", "end") + self.assertEqual(run_end["severity_text"], "INFO") + end_attributes = run_end["attributes"] + self.assertEqual(end_attributes["status"], "ok") + self.assertIsNone(end_attributes["error.type"]) + self.assertEqual(end_attributes["exit_code"], 0) + self.assertEqual(end_attributes["endpoint"], "https://example.com") + self.assertGreaterEqual(end_attributes["duration_ms"], 0) + self.assertEqual(end_attributes["http.client.request.count"], 0) + self.assertEqual(end_attributes["http.client.retry.count"], 0) + self.assertEqual(end_attributes["http.client.transport_error.count"], 0) + + def test_observability_context_records_system_exit_semantics(self) -> None: + clean_sink = InMemoryEventSink() + with ( + self.assertRaises(SystemExit), + observability_context("unit-test", sink=clean_sink, run="test-run"), + ): + raise SystemExit(0) + + clean_end = phase_event(clean_sink.events, "run", "end") + self.assertEqual(clean_end["severity_text"], "INFO") + self.assertEqual(clean_end["attributes"]["status"], "ok") + self.assertIsNone(clean_end["attributes"]["error.type"]) + self.assertEqual(clean_end["attributes"]["exit_code"], 0) + + failing_sink = InMemoryEventSink() + with ( + self.assertRaises(SystemExit), + observability_context("unit-test", sink=failing_sink, run="test-run"), + ): + raise SystemExit(3) + + failing_end = phase_event(failing_sink.events, "run", "end") + self.assertEqual(failing_end["severity_text"], "ERROR") + self.assertEqual(failing_end["attributes"]["status"], "error") + self.assertEqual(failing_end["attributes"]["error.type"], "SystemExit") + self.assertEqual(failing_end["attributes"]["exit_code"], 3) + + def test_observability_context_min_level_suppresses_debug_events(self) -> None: + sink = InMemoryEventSink() + + with observability_context("unit-test", sink=sink, run="test-run", min_level="info"): + debug("hidden_event") + info("visible_event") + + names = [event["event_name"] for event in sink.events] + self.assertNotIn("hidden_event", names) + self.assertIn("visible_event", names) + self.assertIn("startup", names) + run_phases = [event["attributes"]["phase"] for event in events_named(sink.events, "run")] + self.assertEqual(run_phases, ["end"]) + + def test_observability_context_resource_sampler_emits_samples_and_summary(self) -> None: + sink = InMemoryEventSink() + + with observability_context( + "unit-test", sink=sink, run="test-run", resource_sample_interval_seconds=3600 + ): + pass + + samples = events_named(sink.events, "resource_sample") + self.assertGreaterEqual(len(samples), 2) + for sample in samples: + self.assertEqual(sample["severity_text"], "DEBUG") + self.assertIn("num_threads", sample["attributes"]) + self.assertIn("rss_mb", sample["attributes"]) + + run_end = phase_event(sink.events, "run", "end") + self.assertIn("peak_rss_mb", run_end["attributes"]) + self.assertIn("cpu_user_seconds", run_end["attributes"]) + self.assertIn("cpu_system_seconds", run_end["attributes"]) + self.assertIn("cpu_count_logical", run_end["attributes"]) + + def test_observability_context_resource_sampler_interval_zero_summarizes_only(self) -> None: + sink = InMemoryEventSink() + + with observability_context( + "unit-test", sink=sink, run="test-run", resource_sample_interval_seconds=0 + ): + pass + + self.assertEqual(events_named(sink.events, "resource_sample"), []) + run_end = phase_event(sink.events, "run", "end") + self.assertIn("peak_rss_mb", run_end["attributes"]) + self.assertIn("cpu_count_logical", run_end["attributes"]) + + def test_log_event_helpers_map_string_levels_to_otel_severity(self) -> None: + sink = InMemoryEventSink() + + with observability_context("unit-test", sink=sink, run="test-run"): + log_event("bogus", "fallback_info", logger_name="ignored") + warning("warning_event") + error("error_event") + critical("critical_event") + + severities = { + event["event_name"]: (event["severity_text"], event["severity_number"]) + for event in sink.events + } + self.assertEqual(severities["fallback_info"], ("INFO", 9)) + self.assertEqual(severities["warning_event"], ("WARN", 13)) + self.assertEqual(severities["error_event"], ("ERROR", 17)) + self.assertEqual(severities["critical_event"], ("FATAL", 21)) + + def test_stage_adds_attributes_to_nested_events(self) -> None: + sink = InMemoryEventSink() + + with ( + observability_context("unit-test", sink=sink, run="test-run"), + stage("sync", tenant="acme"), + ): + info("staged_event") + + staged = events_named(sink.events, "staged_event")[0] + self.assertEqual(staged["attributes"]["stage"], "sync") + self.assertEqual(staged["attributes"]["tenant"], "acme") + self.assertEqual(staged["attributes"]["command"], "unit-test") + + def test_startup_event_uses_explicit_git_commit(self) -> None: + sink = InMemoryEventSink() + + with observability_context("unit-test", sink=sink, run="test-run"): + startup_event(command="manual-startup", git_commit="abc1234") + + manual = next( + event + for event in events_named(sink.events, "startup") + if event["attributes"]["command"] == "manual-startup" + ) + self.assertEqual(manual["attributes"]["git_commit"], "abc1234") + self.assertIsNone(manual["attributes"]["log_file"]) + + def test_span_emits_debug_start_and_wide_end_events(self) -> None: + sink = InMemoryEventSink() + + with ( + observability_context("unit-test", sink=sink, run="test-run"), + span("work_unit", items=3) as extra, + ): + extra["processed"] = 2 + + start = phase_event(sink.events, "work_unit", "start") + self.assertEqual(start["severity_text"], "DEBUG") + self.assertEqual(start["attributes"]["items"], 3) + + end = phase_event(sink.events, "work_unit", "end") + self.assertEqual(end["severity_text"], "INFO") + end_attributes = end["attributes"] + self.assertEqual(end_attributes["items"], 3) + self.assertEqual(end_attributes["processed"], 2) + self.assertEqual(end_attributes["status"], "ok") + self.assertIsNone(end_attributes["error.type"]) + self.assertGreaterEqual(end_attributes["duration_ms"], 0) + + def test_span_records_error_status_and_error_type(self) -> None: + sink = InMemoryEventSink() + + with ( + observability_context("unit-test", sink=sink, run="test-run"), + self.assertRaisesRegex(ValueError, "boom"), + span("failing_unit"), + ): + raise ValueError("boom") + + end = phase_event(sink.events, "failing_unit", "end") + self.assertEqual(end["severity_text"], "ERROR") + self.assertEqual(end["attributes"]["status"], "error") + self.assertEqual(end["attributes"]["error.type"], "ValueError") - if log_file is None: - self.fail("configure_logging did not return a default log file") - self.assertEqual(log_file.parent, logs_dir) - self.assertRegex( - log_file.name, - r"^\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-\d{4}-test-run\.json$", - ) - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - self.assertTrue(any(row.get("event") == "default_log_path" for row in rows)) + def test_span_can_lower_start_level_and_omit_success_status(self) -> None: + sink = InMemoryEventSink() + + with ( + observability_context("unit-test", sink=sink, run="test-run", min_level="info"), + span( + "quiet_start", + level="info", + start_level="debug", + omit_success_status=True, + ), + ): + pass - def test_src_log_level_env_controls_log_file_level(self) -> None: - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "src_py_lib_test_log_level" - with patch.dict("os.environ", {"SRC_LOG_LEVEL": "INFO"}): - configure_logging( - LoggingSettings( - logger_name=logger_name, - terminal_level="critical", - log_file=log_file, - run="test-run", - ) - ) - try: - debug("debug_event", logger_name=logger_name) - info("info_event", logger_name=logger_name) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - events = [row["event"] for row in rows] - self.assertNotIn("debug_event", events) - self.assertIn("info_event", events) - - def test_log_and_level_helpers_use_string_levels(self) -> None: - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "src_py_lib_test_string_levels" - configure_logging( - LoggingSettings( - logger_name=logger_name, - terminal_level="critical", - log_file_level="debug", - log_file=log_file, - run="test-run", - ) - ) - try: - log_event("bogus", "fallback_info", logger_name=logger_name) - warning("warning_event", logger_name=logger_name) - error("error_event", logger_name=logger_name) - critical("critical_event", logger_name=logger_name) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - levels = {row["event"]: row["level"] for row in rows} - self.assertEqual(levels["fallback_info"], "INFO") - self.assertEqual(levels["warning_event"], "WARNING") - self.assertEqual(levels["error_event"], "ERROR") - self.assertEqual(levels["critical_event"], "CRITICAL") - - def test_logging_configures_logging_context_and_startup(self) -> None: - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "src_py_lib_test_logging_context" - config = ExampleConfig(token="secret-token") - try: - with src.logging( - config, - command="unit-test", - git_cwd=__file__, - logging_config=LoggingSettings( - logger_name=logger_name, - terminal_level="critical", - log_file=log_file, - run="test-run", - ), - ) as context_log_file: - self.assertEqual(context_log_file, log_file) - info("inside_command", logger_name=logger_name) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - startup = next(row for row in rows if row["event"] == "startup") - inside = next(row for row in rows if row["event"] == "inside_command") - self.assertEqual(startup["command"], "unit-test") - self.assertEqual(startup["config"]["EXAMPLE_TOKEN"], "provided") - self.assertEqual(inside["command"], "unit-test") - - def test_structured_log_file_includes_context_and_sanitized_terminal_omits_event( - self, - ) -> None: - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "src_py_lib_test_logging" - configure_logging( - LoggingSettings( - logger_name=logger_name, - terminal_level="info", - log_file=log_file, - run="test-run", - ) - ) - try: - startup_event( - command="unit-test", - logger_name=logger_name, - git_commit="abc1234", - ) - with log_context(command="unit-test"): - info("example", logger_name=logger_name, answer=42) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - startup = next(row for row in rows if row["event"] == "startup") - self.assertEqual(startup["git_commit"], "abc1234") - self.assertFalse(any("git_commit" in row for row in rows if row["event"] != "startup")) - self.assertEqual( - list(rows[0]), - ["ts", "level", "run", "logger", "event", "message"], - ) - self.assertEqual( - rows[0]["message"], - f"Writing log events to {log_file}.", - ) - self.assertEqual( - list(startup), - [ - "ts", - "command", - "level", - "run", - "event", - "git_commit", - "log_file", - ], - ) - self.assertEqual( - list(rows[-1]), - ["ts", "command", "level", "run", "event", "answer"], - ) - self.assertEqual(rows[-1]["event"], "example") - self.assertEqual(rows[-1]["run"], "test-run") - self.assertEqual(rows[-1]["command"], "unit-test") - self.assertEqual(rows[-1]["answer"], 42) + quiet_events = events_named(sink.events, "quiet_start") + self.assertEqual(len(quiet_events), 1) + attributes = quiet_events[0]["attributes"] + self.assertEqual(attributes["phase"], "end") + self.assertNotIn("status", attributes) + self.assertNotIn("error.type", attributes) def test_span_context_adds_trace_and_span_fields(self) -> None: src.configure_open_telemetry(src.OpenTelemetrySettings(force_traces=True)) - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "src_py_lib_test_traces" - configure_logging( - LoggingSettings( - logger_name=logger_name, - terminal_level="info", - log_file=log_file, - run="test-run", - ) - ) - try: - with span("outer", logger_name=logger_name): - info("inside", logger_name=logger_name, answer=42) - with span("inner", logger_name=logger_name): - logging.getLogger(logger_name).info("inside nested span") - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - outer_start = next( - row for row in rows if row["event"] == "outer" and row["phase"] == "start" - ) - outer_end = next( - row for row in rows if row["event"] == "outer" and row["phase"] == "end" - ) - inside = next(row for row in rows if row["event"] == "inside") - inner_start = next( - row for row in rows if row["event"] == "inner" and row["phase"] == "start" - ) - inner_end = next( - row for row in rows if row["event"] == "inner" and row["phase"] == "end" - ) - inner_log = next(row for row in rows if row.get("message") == "inside nested span") - - self.assertEqual( - list(outer_start), - ["ts", "level", "run", "trace", "span", "event", "phase"], - ) - self.assertEqual(outer_start["trace"], outer_end["trace"]) - self.assertEqual(outer_start["span"], outer_end["span"]) - self.assertEqual(len(outer_start["trace"]), 32) - self.assertEqual(len(outer_start["span"]), 16) - self.assertNotIn("parent_span", outer_start) - - self.assertEqual(inside["trace"], outer_start["trace"]) - self.assertEqual(inside["span"], outer_start["span"]) - - self.assertEqual( - list(inner_start), - [ - "ts", - "level", - "run", - "trace", - "span", - "parent_span", - "event", - "phase", - ], - ) - self.assertEqual(inner_start["trace"], outer_start["trace"]) - self.assertEqual(inner_start["span"], inner_end["span"]) - self.assertEqual(len(inner_start["span"]), 16) - self.assertEqual(inner_start["parent_span"], outer_start["span"]) - self.assertNotEqual(inner_start["span"], outer_start["span"]) - - self.assertEqual( - list(inner_log), - [ - "ts", - "level", - "run", - "trace", - "span", - "parent_span", - "logger", - "event", - "message", - ], - ) - self.assertEqual(inner_log["trace"], outer_start["trace"]) - self.assertEqual(inner_log["span"], inner_start["span"]) - self.assertEqual(inner_log["parent_span"], outer_start["span"]) + sink = InMemoryEventSink() + + with observability_context("trace-test", sink=sink, run="test-run"), span("outer"): + info("inside", answer=42) + with span("inner"): + pass + + run_start = phase_event(sink.events, "run", "start") + outer_start = phase_event(sink.events, "outer", "start") + outer_end = phase_event(sink.events, "outer", "end") + inner_start = phase_event(sink.events, "inner", "start") + inner_end = phase_event(sink.events, "inner", "end") + inside = events_named(sink.events, "inside")[0] + + self.assertEqual(len(outer_start["trace_id"]), 32) + self.assertEqual(len(outer_start["span_id"]), 16) + self.assertEqual(outer_start["trace_id"], outer_end["trace_id"]) + self.assertEqual(outer_start["span_id"], outer_end["span_id"]) + self.assertEqual(outer_start["parent_span_id"], run_start["span_id"]) + + self.assertEqual(inside["trace_id"], outer_start["trace_id"]) + self.assertEqual(inside["span_id"], outer_start["span_id"]) + + self.assertEqual(inner_start["trace_id"], outer_start["trace_id"]) + self.assertEqual(inner_start["span_id"], inner_end["span_id"]) + self.assertEqual(len(inner_start["span_id"]), 16) + self.assertNotEqual(inner_start["span_id"], outer_start["span_id"]) + self.assertEqual(inner_start["parent_span_id"], outer_start["span_id"]) + + def test_log_event_adds_event_to_recording_otel_span(self) -> None: + src.configure_open_telemetry(src.OpenTelemetrySettings(force_traces=True)) + provider = trace.get_tracer_provider() + add_span_processor = getattr(provider, "add_span_processor", None) + if not callable(add_span_processor): + self.skipTest("global tracer provider does not accept span processors") + exporter = InMemorySpanExporter() + add_span_processor(SimpleSpanProcessor(exporter)) + + with span("span_event_holder"): + log_event("info", "observed_event", answer=42) + + holder = next( + exported + for exported in exporter.get_finished_spans() + if exported.name == "span_event_holder" + ) + self.assertIn("observed_event", [event.name for event in holder.events]) def test_otel_helpers_return_current_w3c_traceparent_fields(self) -> None: src.configure_open_telemetry(src.OpenTelemetrySettings(force_traces=True)) @@ -1145,42 +1301,216 @@ def test_otel_helpers_return_current_w3c_traceparent_fields(self) -> None: {"trace_id": traceparent_parts[1], "span_id": traceparent_parts[2]}, ) - def test_span_can_lower_start_level_and_omit_success_status(self) -> None: + +class CliLoggingHandlersTest(unittest.TestCase): + def test_attaches_only_own_handlers_and_restores_prior_state(self) -> None: + logger_name = "src_py_lib_test_handler_isolation" + named_logger = logging.getLogger(logger_name) + named_logger.setLevel(logging.WARNING) + root_logger = logging.getLogger() + customer_named_handler = RecordingLogHandler() + customer_root_handler = RecordingLogHandler() + named_logger.addHandler(customer_named_handler) + root_logger.addHandler(customer_root_handler) + root_handlers_before = list(root_logger.handlers) + sink = InMemoryEventSink() + + try: + with cli_logging_handlers( + sink=sink, logger_names=(logger_name,), terminal_level="critical" + ): + added = [ + handler + for handler in named_logger.handlers + if handler is not customer_named_handler + ] + self.assertEqual( + {type(handler) for handler in added}, + {logging.StreamHandler, EventBridgeHandler}, + ) + self.assertEqual(list(root_logger.handlers), root_handlers_before) + self.assertEqual(named_logger.level, logging.DEBUG) + named_logger.info("customer handlers still see records") + + self.assertEqual(named_logger.handlers, [customer_named_handler]) + self.assertEqual(named_logger.level, logging.WARNING) + self.assertEqual(list(root_logger.handlers), root_handlers_before) + finally: + named_logger.removeHandler(customer_named_handler) + root_logger.removeHandler(customer_root_handler) + named_logger.setLevel(logging.NOTSET) + + self.assertEqual( + [record.getMessage() for record in customer_named_handler.records], + ["customer handlers still see records"], + ) + self.assertEqual( + [record.getMessage() for record in customer_root_handler.records], + ["customer handlers still see records"], + ) + self.assertEqual([event["event_name"] for event in sink.events], ["log"]) + + def test_event_bridge_forwards_human_log_records(self) -> None: + sink = InMemoryEventSink() + + with cli_logging_handlers( + sink=sink, logger_names=("src_py_lib_test_bridge",), terminal_level="critical" + ): + logging.getLogger("src_py_lib_test_bridge.module").info("Wrote %s", "x") + + event = events_named(sink.events, "log")[0] + self.assertEqual(event["body"], "Wrote x") + self.assertEqual(event["severity_text"], "INFO") + self.assertEqual(event["attributes"]["logger"], "src_py_lib_test_bridge.module") + + def test_event_bridge_includes_exception_tracebacks(self) -> None: + sink = InMemoryEventSink() + logger_name = "src_py_lib_test_exception" + + with cli_logging_handlers( + sink=sink, logger_names=(logger_name,), terminal_level="critical" + ): + try: + raise ValueError("kaboom") + except ValueError: + logging.getLogger(logger_name).exception("operation failed") + + event = events_named(sink.events, "log")[0] + self.assertEqual(event["body"], "operation failed") + self.assertEqual(event["severity_text"], "ERROR") + exception_text = event["attributes"]["exc_info"] + self.assertIn("Traceback (most recent call last)", exception_text) + self.assertIn("ValueError: kaboom", exception_text) + + def test_httpcore_response_headers_are_mined_and_redacted(self) -> None: + sink = InMemoryEventSink() + + with cli_logging_handlers( + sink=sink, + logger_names=("src_py_lib_test_httpcore",), + terminal_level="critical", + suppress_http_dependency_logs=False, + ): + logging.getLogger("httpcore.http11").debug( + "receive_response_headers.complete " + "return_value=(b'HTTP/1.1', 200, b'OK', " + "[(b'Zed', b'last'), (b'Content-Type', b'application/json'), " + "(b'Set-Cookie', b'session=secret'), " + "(b'X-Api-Key', b'secret'), (b'Alpha', b'first')])" + ) + + event = events_named(sink.events, "log")[0] + self.assertEqual(event["body"], "receive_response_headers.complete") + attributes = event["attributes"] + self.assertEqual(attributes["logger"], "httpcore.http11") + self.assertEqual(attributes["http_version"], "HTTP/1.1") + self.assertEqual(attributes["status_code"], 200) + self.assertEqual(attributes["reason_phrase"], "OK") + self.assertEqual( + list(attributes["headers"]), + ["alpha", "content-type", "set-cookie", "x-api-key", "zed"], + ) + self.assertEqual( + attributes["headers"], + { + "alpha": "first", + "content-type": "application/json", + "set-cookie": "[redacted]", + "x-api-key": "[redacted]", + "zed": "last", + }, + ) + + def test_httpx_request_logs_are_demoted_to_debug_severity(self) -> None: + sink = InMemoryEventSink() + + with cli_logging_handlers( + sink=sink, + logger_names=("src_py_lib_test_httpx",), + terminal_level="critical", + suppress_http_dependency_logs=False, + ): + logging.getLogger("httpx").info( + 'HTTP Request: POST https://api.linear.app/graphql "HTTP/1.1 200 OK"' + ) + + event = next( + event + for event in events_named(sink.events, "log") + if str(event["body"]).startswith("HTTP Request:") + ) + self.assertEqual(event["severity_text"], "DEBUG") + self.assertEqual(event["severity_number"], 5) + + def test_http_dependency_logs_are_suppressed_by_default(self) -> None: + sink = InMemoryEventSink() + httpx_logger = logging.getLogger("httpx") + httpx_handlers_before = list(httpx_logger.handlers) + + with cli_logging_handlers( + sink=sink, logger_names=("src_py_lib_test_suppressed",), terminal_level="critical" + ): + self.assertEqual(list(httpx_logger.handlers), httpx_handlers_before) + httpx_logger.info('HTTP Request: GET https://example.com "HTTP/1.1 200 OK"') + logging.getLogger("httpcore.http11").debug( + "receive_response_headers.complete return_value=()" + ) + + self.assertEqual(sink.events, []) + + +class CliRunContextTest(unittest.TestCase): + def test_cli_run_context_writes_jsonl_run_lifecycle(self) -> None: with tempfile.TemporaryDirectory() as directory: log_file = Path(directory) / "events.json" - logger_name = "src_py_lib_test_quiet_event" - configure_logging( - LoggingSettings( - logger_name=logger_name, + config = ExampleConfig(token="secret-token") + + with src.logging( + config, + command="unit-test", + git_cwd=__file__, + logging_config=LoggingSettings( terminal_level="critical", - log_file_level="info", + log_file_level="debug", log_file=log_file, run="test-run", - ) - ) - try: - with span( - "quiet_start", - logger_name=logger_name, - level="info", - start_level="debug", - omit_success_status=True, - ): - pass - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - quiet_rows = [row for row in rows if row["event"] == "quiet_start"] - self.assertEqual(len(quiet_rows), 1) - self.assertEqual(quiet_rows[0]["phase"], "end") - self.assertNotIn("status", quiet_rows[0]) - self.assertNotIn("error_type", quiet_rows[0]) - - def test_logging_context_emits_run_summary_resource_and_http_metrics(self) -> None: + ), + ) as context_log_file: + self.assertEqual(context_log_file, log_file) + info("inside_command", answer=42) + + rows = [json.loads(line) for line in log_file.read_text(encoding="utf-8").splitlines()] + + writing = events_named(rows, "log")[0] + self.assertEqual(writing["body"], f"Writing log events to {log_file}.") + + run_start = phase_event(rows, "run", "start") + self.assertEqual(run_start["severity_text"], "DEBUG") + self.assertIn("process.pid", run_start["resource"]) + + startup = events_named(rows, "startup")[0] + self.assertEqual(startup["severity_text"], "INFO") + self.assertEqual(startup["attributes"]["command"], "unit-test") + self.assertEqual(startup["attributes"]["log_file"], str(log_file)) + self.assertEqual(startup["attributes"]["config"]["EXAMPLE_TOKEN"], "provided") + self.assertIn("git_commit", startup["attributes"]) + + inside = events_named(rows, "inside_command")[0] + self.assertEqual(inside["attributes"]["command"], "unit-test") + self.assertEqual(inside["attributes"]["run"], "test-run") + self.assertEqual(inside["attributes"]["answer"], 42) + + run_end = phase_event(rows, "run", "end") + self.assertEqual(run_end["severity_text"], "INFO") + self.assertEqual(run_end["attributes"]["status"], "ok") + + for row in rows: + self.assertLessEqual(set(row), set(EVENT_FIELD_ORDER)) + model_keys = [key for key in row if key in EVENT_FIELD_ORDER] + self.assertEqual(model_keys, [key for key in EVENT_FIELD_ORDER if key in row]) + self.assertEqual(list(row["attributes"]), sorted(row["attributes"])) + + def test_cli_run_context_emits_run_summary_and_resets_http_metrics(self) -> None: attempts = 0 def handler(_request: httpx.Request) -> httpx.Response: @@ -1190,163 +1520,179 @@ def handler(_request: httpx.Request) -> httpx.Response: return httpx.Response(429, json={"retry": True}, headers={"Retry-After": "0"}) return httpx.Response(200, json={"ok": True}) + with tempfile.TemporaryDirectory() as directory: + first_log_file = Path(directory) / "first.json" + second_log_file = Path(directory) / "second.json" + + with src.logging( + command="unit-test", + logging_config=LoggingSettings( + terminal_level="critical", + log_file_level="debug", + log_file=first_log_file, + run="test-run", + resource_sample_interval_seconds=0, + ), + run_fields={"endpoint": "https://example.com"}, + run_summary=lambda: {"custom_count": 7}, + ): + client = HTTPClient( + max_attempts=2, + retry_base_delay_seconds=0, + retry_max_delay_seconds=0, + transport=httpx.MockTransport(handler), + ) + self.assertEqual( + client.json( + "POST", + "https://example.com/api", + json_body={"hello": "world"}, + ), + {"ok": True}, + ) + + rows = [ + json.loads(line) for line in first_log_file.read_text(encoding="utf-8").splitlines() + ] + run_end = phase_event(rows, "run", "end") + attributes = run_end["attributes"] + self.assertEqual(attributes["status"], "ok") + self.assertEqual(attributes["exit_code"], 0) + self.assertEqual(attributes["endpoint"], "https://example.com") + self.assertEqual(attributes["custom_count"], 7) + self.assertEqual(attributes["http.client.request.count"], 2) + self.assertEqual(attributes["http.client.retry.count"], 1) + self.assertEqual(attributes["http.client.response.2xx.count"], 1) + self.assertEqual(attributes["http.client.response.4xx.count"], 1) + self.assertEqual(attributes["http.client.response.429.count"], 1) + self.assertEqual(attributes["http.client.transport_error.count"], 0) + self.assertGreater(attributes["http.client.request.body.size.total"], 0) + self.assertGreater(attributes["http.client.response.body.size.total"], 0) + self.assertIn("cpu_count_logical", attributes) + self.assertIn("peak_rss_mb", attributes) + + with src.logging( + command="unit-test", + logging_config=LoggingSettings( + terminal_level="critical", + log_file_level="debug", + log_file=second_log_file, + run="test-run-2", + ), + ): + pass + + second_rows = [ + json.loads(line) + for line in second_log_file.read_text(encoding="utf-8").splitlines() + ] + second_run_end = phase_event(second_rows, "run", "end") + self.assertEqual(second_run_end["attributes"]["http.client.request.count"], 0) + self.assertEqual(second_run_end["attributes"]["http.client.retry.count"], 0) + + def test_cli_run_context_records_system_exit_code(self) -> None: with tempfile.TemporaryDirectory() as directory: log_file = Path(directory) / "events.json" - try: - with src.logging( + + with ( + self.assertRaises(SystemExit), + src.logging( command="unit-test", logging_config=LoggingSettings( + logger_names=("src_py_lib_test_exit_code",), terminal_level="critical", log_file_level="debug", log_file=log_file, run="test-run", - resource_sample_interval_seconds=0, ), - run_fields={"endpoint": "https://example.com"}, - run_summary=lambda: {"custom_count": 7}, - ): - client = HTTPClient( - max_attempts=2, - retry_base_delay_seconds=0, - retry_max_delay_seconds=0, - transport=httpx.MockTransport(handler), - ) - self.assertEqual( - client.json( - "POST", - "https://example.com/api", - json_body={"hello": "world"}, - ), - {"ok": True}, - ) - finally: - logger = logging.getLogger("") - for handler_ in list(logger.handlers): - logger.removeHandler(handler_) - handler_.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - run_end = next(row for row in rows if row["event"] == "run" and row["phase"] == "end") - self.assertEqual(run_end["status"], "ok") - self.assertEqual(run_end["exit_code"], 0) - self.assertEqual(run_end["endpoint"], "https://example.com") - self.assertEqual(run_end["custom_count"], 7) - self.assertEqual(run_end["http_request_attempt_count"], 2) - self.assertEqual(run_end["http_retry_count"], 1) - self.assertEqual(run_end["http_2xx_count"], 1) - self.assertEqual(run_end["http_429_count"], 1) - self.assertGreater(run_end["http_request_bytes_total"], 0) - self.assertGreater(run_end["http_response_bytes_total"], 0) - self.assertIn("cpu_count_logical", run_end) - - def test_logging_context_records_system_exit_code(self) -> None: + ), + ): + raise SystemExit(3) + + rows = [json.loads(line) for line in log_file.read_text(encoding="utf-8").splitlines()] + run_end = phase_event(rows, "run", "end") + self.assertEqual(run_end["severity_text"], "ERROR") + self.assertEqual(run_end["attributes"]["status"], "error") + self.assertEqual(run_end["attributes"]["error.type"], "SystemExit") + self.assertEqual(run_end["attributes"]["exit_code"], 3) + + def test_src_log_level_env_controls_log_file_level(self) -> None: with tempfile.TemporaryDirectory() as directory: log_file = Path(directory) / "events.json" - try: - with ( - self.assertRaises(SystemExit), - src.logging( - command="unit-test", - logging_config=LoggingSettings( - terminal_level="critical", - log_file_level="debug", - log_file=log_file, - run="test-run", - ), + + with ( + patch.dict("os.environ", {"SRC_LOG_LEVEL": "INFO"}), + src.logging( + command="unit-test", + logging_config=LoggingSettings( + logger_names=("src_py_lib_test_log_level",), + terminal_level="critical", + log_file=log_file, + run="test-run", ), - ): - raise SystemExit(3) - finally: - logger = logging.getLogger("") - for handler_ in list(logger.handlers): - logger.removeHandler(handler_) - handler_.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - run_end = next(row for row in rows if row["event"] == "run" and row["phase"] == "end") - self.assertEqual(run_end["status"], "error") - self.assertEqual(run_end["error_type"], "SystemExit") - self.assertEqual(run_end["exit_code"], 3) - - def test_httpx_request_logs_are_debug_events(self) -> None: + ), + ): + debug("debug_event") + info("info_event") + + rows = [json.loads(line) for line in log_file.read_text(encoding="utf-8").splitlines()] + names = [row["event_name"] for row in rows] + self.assertNotIn("debug_event", names) + self.assertIn("info_event", names) + + def test_cli_run_context_defaults_log_file_under_logs_dir(self) -> None: with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "httpx" - configure_logging( - LoggingSettings( - logger_name=logger_name, + logs_dir = Path(directory) / "logs" + + with src.logging( + command="unit-test", + logging_config=LoggingSettings( + logger_names=("src_py_lib_test_default_logs_dir",), terminal_level="critical", log_file_level="debug", - log_file=log_file, + logs_dir=logs_dir, run="test-run", - ) - ) - try: - logging.getLogger(logger_name).info( - 'HTTP Request: POST https://api.linear.app/graphql "HTTP/1.1 200 OK"' - ) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - request_log = next( - row for row in rows if row.get("message", "").startswith("HTTP Request:") - ) - self.assertEqual(request_log["level"], "DEBUG") + ), + ) as log_file: + info("default_log_path") - def test_httpcore_response_headers_are_structured(self) -> None: - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - logger_name = "httpcore" - configure_logging( - LoggingSettings( - logger_name=logger_name, - terminal_level="info", - log_file_level="debug", - log_file=log_file, - run="test-run", - ) - ) - try: - logging.getLogger("httpcore.http11").debug( - "receive_response_headers.complete " - "return_value=(b'HTTP/1.1', 200, b'OK', " - "[(b'Zed', b'last'), (b'Content-Type', b'application/json'), " - "(b'Set-Cookie', b'session=secret'), " - "(b'X-Api-Key', b'secret'), (b'Alpha', b'first')])" - ) - finally: - logger = logging.getLogger(logger_name) - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - response_headers = next( - row for row in rows if row.get("message") == "receive_response_headers.complete" + if log_file is None: + self.fail("cli_run_context did not yield a default log file") + self.assertEqual(log_file.parent, logs_dir) + self.assertRegex( + log_file.name, + r"^\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-\d{4}-test-run\.json$", ) + rows = [json.loads(line) for line in log_file.read_text(encoding="utf-8").splitlines()] + self.assertEqual(len(events_named(rows, "default_log_path")), 1) - self.assertEqual(response_headers["logger"], "httpcore.http11") - self.assertEqual(response_headers["http_version"], "HTTP/1.1") - self.assertEqual(response_headers["status_code"], 200) - self.assertEqual(response_headers["reason_phrase"], "OK") - self.assertEqual( - list(response_headers["headers"]), - ["alpha", "content-type", "set-cookie", "x-api-key", "zed"], - ) - self.assertEqual( - response_headers["headers"], - { - "alpha": "first", - "content-type": "application/json", - "set-cookie": "[redacted]", - "x-api-key": "[redacted]", - "zed": "last", - }, - ) + +class OtelLogsSinkTest(unittest.TestCase): + def test_otel_logs_sink_round_trips_events_through_logs_api(self) -> None: + exporter = InMemoryLogRecordExporter() + provider = LoggerProvider() + provider.add_log_record_processor(SimpleLogRecordProcessor(exporter)) + set_logger_provider(provider) + sink = OtelLogsSink() + + sink.emit( + { + "time_unix_nano": 123, + "severity_text": "INFO", + "severity_number": 9, + "event_name": "unit_test_event", + "body": "hello", + "attributes": {"answer": 42}, + } + ) + + records = exporter.get_finished_logs() + self.assertEqual(len(records), 1) + record = records[0].log_record + self.assertEqual(record.severity_text, "INFO") + self.assertEqual(record.body, "hello") + self.assertEqual(record.event_name, "unit_test_event") + self.assertEqual(dict(record.attributes or {}), {"answer": 42}) class HTTPClientTest(unittest.TestCase): @@ -1409,55 +1755,38 @@ def handler(_request: httpx.Request) -> httpx.Response: }, ) - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - configure_logging( - LoggingSettings( - terminal_level="critical", - log_file_level="debug", - log_file=log_file, - run="test-run", - ) - ) - try: - client = HTTPClient(max_attempts=1, transport=httpx.MockTransport(handler)) - payload = client.json( - "POST", - "https://user:pass@example.com/api?code=oauth", - headers={"Authorization": "Bearer token"}, - query={"limit": 10, "access_token": "secret", "signature": "signed"}, - json_body={"hello": "world"}, - ) - finally: - logger = logging.getLogger("") - for handler_ in list(logger.handlers): - logger.removeHandler(handler_) - handler_.close() - - self.assertEqual(payload, {"ok": True}) - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - http_request = next( - row - for row in rows - if row.get("event") == "http_request" and row.get("phase") == "end" + sink = InMemoryEventSink() + with observability_context("unit-test", sink=sink, run="test-run", min_level="debug"): + client = HTTPClient(max_attempts=1, transport=httpx.MockTransport(handler)) + payload = client.json( + "POST", + "https://user:pass@example.com/api?code=oauth", + headers={"Authorization": "Bearer token"}, + query={"limit": 10, "access_token": "secret", "signature": "signed"}, + json_body={"hello": "world"}, ) - self.assertFalse(any(row.get("logger") in {"httpx", "httpcore"} for row in rows)) - self.assertEqual(http_request["status_code"], 200) - self.assertEqual(http_request["reason_phrase"], "OK") - self.assertEqual( - http_request["url"], - "https://example.com/api?code=[redacted]&limit=10" - "&access_token=[redacted]&signature=[redacted]", - ) - self.assertEqual(http_request["request_bytes"], len(b'{"hello": "world"}')) - self.assertEqual(http_request["request_headers"]["authorization"], "[redacted]") - self.assertEqual( - list(http_request["response_headers"]), sorted(http_request["response_headers"]) - ) - self.assertEqual(http_request["response_headers"]["content-type"], "application/json") - self.assertEqual(http_request["response_headers"]["set-cookie"], "[redacted]") - self.assertEqual(http_request["response_headers"]["zed"], "last") + self.assertEqual(payload, {"ok": True}) + http_request = phase_event(sink.events, "http_request", "end") + attributes = http_request["attributes"] + + self.assertEqual(http_request["severity_text"], "DEBUG") + self.assertEqual(events_named(sink.events, "log"), []) + self.assertEqual(attributes["status_code"], 200) + self.assertEqual(attributes["reason_phrase"], "OK") + self.assertEqual( + attributes["url"], + "https://example.com/api?code=[redacted]&limit=10" + "&access_token=[redacted]&signature=[redacted]", + ) + self.assertEqual(attributes["request_bytes"], len(b'{"hello": "world"}')) + self.assertEqual(attributes["request_headers"]["authorization"], "[redacted]") + self.assertEqual( + list(attributes["response_headers"]), sorted(attributes["response_headers"]) + ) + self.assertEqual(attributes["response_headers"]["content-type"], "application/json") + self.assertEqual(attributes["response_headers"]["set-cookie"], "[redacted]") + self.assertEqual(attributes["response_headers"]["zed"], "last") def test_json_request_wraps_timeouts(self) -> None: def handler(_request: httpx.Request) -> httpx.Response: @@ -1948,47 +2277,30 @@ def test_graphql_client_emits_query_debug_events(self) -> None: } """ - with tempfile.TemporaryDirectory() as directory: - log_file = Path(directory) / "events.json" - configure_logging( - LoggingSettings( - terminal_level="critical", - log_file_level="debug", - log_file=log_file, - run="test-run", - ) - ) - try: - client.execute(query, variables={"userId": "u1"}, page_size=2) - finally: - logger = logging.getLogger("") - for handler in list(logger.handlers): - logger.removeHandler(handler) - handler.close() - - rows = [json.loads(line) for line in log_file.read_text().splitlines()] - starts = [ - row - for row in rows - if row.get("event") == "graphql_query" and row.get("phase") == "start" - ] - ends = [ - row - for row in rows - if row.get("event") == "graphql_query" and row.get("phase") == "end" - ] + sink = InMemoryEventSink() + with observability_context("unit-test", sink=sink, run="test-run", min_level="debug"): + client.execute(query, variables={"userId": "u1"}, page_size=2) - self.assertEqual([row["query_name"] for row in starts], ["Items", "Items"]) - self.assertEqual([row["page_number"] for row in starts], [1, 2]) - self.assertEqual([row["page_size"] for row in starts], [2, 2]) - self.assertEqual([row["cursor_present"] for row in starts], [False, True]) - self.assertEqual(starts[0]["graphql_client"], "Example") - self.assertEqual( - starts[0]["url"], - "https://example.com/graphql?access_token=[redacted]&query=ok", - ) - self.assertEqual(starts[0]["variable_names"], ["after", "first", "userId"]) - self.assertEqual(ends[0]["response_fields"], ["viewer"]) + query_events = events_named(sink.events, "graphql_query") + starts = [ + event["attributes"] for event in query_events if event["attributes"]["phase"] == "start" + ] + ends = [ + event["attributes"] for event in query_events if event["attributes"]["phase"] == "end" + ] + + self.assertTrue(all(event["severity_text"] == "DEBUG" for event in query_events)) + self.assertEqual([attributes["query_name"] for attributes in starts], ["Items", "Items"]) + self.assertEqual([attributes["page_number"] for attributes in starts], [1, 2]) + self.assertEqual([attributes["page_size"] for attributes in starts], [2, 2]) + self.assertEqual([attributes["cursor_present"] for attributes in starts], [False, True]) + self.assertEqual(starts[0]["graphql_client"], "Example") + self.assertEqual( + starts[0]["url"], + "https://example.com/graphql?access_token=[redacted]&query=ok", + ) + self.assertEqual(starts[0]["variable_names"], ["after", "first", "userId"]) + self.assertEqual(ends[0]["response_fields"], ["viewer"]) def test_graphql_client_requires_end_cursor_for_next_page(self) -> None: http = RecordingHTTP(