From 06ecffc434b9153c8ba5e31d1240bfb42078cf72 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Wed, 25 Feb 2026 03:54:11 -0800 Subject: [PATCH] feat: Setup metric provider for manual and auto-instrumented measurements Made-with: Cursor --- .../instrumentation/opentelemetry/__init__.py | 3 + .../opentelemetry/_internal/config.py | 103 ++++++++- .../opentelemetry/_internal/temporality.py | 30 +++ .../opentelemetry/_internal/tests/conftest.py | 12 +- .../_internal/tests/test_config.py | 215 ++++++++++++++++-- .../_internal/tests/test_temporality.py | 63 +++++ .../instrumentation/opentelemetry/metrics.py | 83 +++++++ .../opentelemetry/providers.py | 10 + .../opentelemetry/tests/conftest.py | 12 +- .../opentelemetry/tests/test_metrics.py | 79 +++++++ .../opentelemetry/tests/test_providers.py | 30 +++ .../opentelemetry/tests/test_tracing.py | 4 +- 12 files changed, 605 insertions(+), 39 deletions(-) create mode 100644 cloud_pipelines_backend/instrumentation/opentelemetry/_internal/temporality.py create mode 100644 cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_temporality.py create mode 100644 cloud_pipelines_backend/instrumentation/opentelemetry/metrics.py create mode 100644 cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_metrics.py diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/__init__.py b/cloud_pipelines_backend/instrumentation/opentelemetry/__init__.py index fad5a3a..b44a7a1 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/__init__.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/__init__.py @@ -10,15 +10,18 @@ """ from cloud_pipelines_backend.instrumentation.opentelemetry import auto_instrumentation +from cloud_pipelines_backend.instrumentation.opentelemetry import metrics from cloud_pipelines_backend.instrumentation.opentelemetry import providers from cloud_pipelines_backend.instrumentation.opentelemetry import tracing instrument_fastapi = auto_instrumentation.instrument_fastapi +setup_metrics = metrics.setup setup_providers = providers.setup setup_tracing = tracing.setup __all__ = [ "instrument_fastapi", + "setup_metrics", "setup_providers", "setup_tracing", ] diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/config.py b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/config.py index f02edd0..a6834db 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/config.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/config.py @@ -9,22 +9,74 @@ import os +class EnvVar(str, enum.Enum): + """Environment variables that control OpenTelemetry configuration. + + Trace exporter: + TRACE_EXPORTER_ENDPOINT: OTLP collector URL for trace data. + TRACE_EXPORTER_PROTOCOL: Transport protocol ("grpc" or "http"). + + Metric exporter: + METRIC_EXPORTER_ENDPOINT: OTLP collector URL for metric data. + METRIC_EXPORTER_PROTOCOL: Transport protocol ("grpc" or "http"). + + Metric temporality (per-instrument overrides): + Derived dynamically from MetricsTemporalityConfig field names using + the prefix TANGLE_OTEL_METRICS_TEMPORALITY_ (e.g. + TANGLE_OTEL_METRICS_TEMPORALITY_COUNTER). See MetricsTemporalityConfig + for available fields and their defaults. + + Service identity: + ENV: Application environment, used to build the default service name + (e.g. "tangle-production"). + SERVICE_VERSION: Deployed revision or version tag. + """ + + TRACE_EXPORTER_ENDPOINT = "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT" + TRACE_EXPORTER_PROTOCOL = "TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL" + METRIC_EXPORTER_ENDPOINT = "TANGLE_OTEL_METRIC_EXPORTER_ENDPOINT" + METRIC_EXPORTER_PROTOCOL = "TANGLE_OTEL_METRIC_EXPORTER_PROTOCOL" + ENV = "TANGLE_ENV" + SERVICE_VERSION = "TANGLE_SERVICE_VERSION" + + class ExporterProtocol(str, enum.Enum): GRPC = "grpc" HTTP = "http" +class AggregationTemporality(str, enum.Enum): + DELTA = "delta" + CUMULATIVE = "cumulative" + + @dataclasses.dataclass(frozen=True, kw_only=True) class ExporterConfig: endpoint: str protocol: str +@dataclasses.dataclass(frozen=True, kw_only=True) +class MetricsTemporalityConfig: + counter: str = AggregationTemporality.DELTA + observable_counter: str = AggregationTemporality.DELTA + up_down_counter: str = AggregationTemporality.CUMULATIVE + observable_up_down_counter: str = AggregationTemporality.CUMULATIVE + histogram: str = AggregationTemporality.DELTA + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class MetricsConfig: + exporter: ExporterConfig + temporality: MetricsTemporalityConfig + + @dataclasses.dataclass(frozen=True, kw_only=True) class OtelConfig: service_name: str service_version: str trace_exporter: ExporterConfig | None = None + metrics: MetricsConfig | None = None def _resolve_exporter( @@ -47,12 +99,38 @@ def _resolve_exporter( except ValueError: raise ValueError( f"Invalid OTel protocol: {protocol}. " - f"Expected values: {', '.join(e.value for e in ExporterProtocol)}" + f"Expected values: {', '.join(ExporterProtocol)}" ) return ExporterConfig(endpoint=endpoint, protocol=protocol) +def _resolve_temporality(env_var: str, default: str) -> str: + value = os.environ.get(env_var, default).lower() + try: + AggregationTemporality(value) + except ValueError: + raise ValueError( + f"Invalid OTel metrics temporality: {value} (from {env_var}). " + f"Expected values: {', '.join(AggregationTemporality)}" + ) + return value + + +_TEMPORALITY_ENV_PREFIX = "TANGLE_OTEL_METRICS_TEMPORALITY_" + + +def _resolve_metrics_temporality() -> MetricsTemporalityConfig: + resolved = { + f.name: _resolve_temporality( + f"{_TEMPORALITY_ENV_PREFIX}{f.name.upper()}", + f.default, + ) + for f in dataclasses.fields(MetricsTemporalityConfig) + } + return MetricsTemporalityConfig(**resolved) + + def resolve( service_name: str | None = None, service_version: str | None = None, @@ -63,22 +141,35 @@ def resolve( Raises ValueError if any configured exporter has invalid settings. """ trace_exporter = _resolve_exporter( - endpoint_var="TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", - protocol_var="TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", + endpoint_var=EnvVar.TRACE_EXPORTER_ENDPOINT, + protocol_var=EnvVar.TRACE_EXPORTER_PROTOCOL, + ) + + metrics_exporter = _resolve_exporter( + endpoint_var=EnvVar.METRIC_EXPORTER_ENDPOINT, + protocol_var=EnvVar.METRIC_EXPORTER_PROTOCOL, ) - if trace_exporter is None: + if trace_exporter is None and metrics_exporter is None: return None if service_name is None: - app_env = os.environ.get("TANGLE_ENV", "unknown") + app_env = os.environ.get(EnvVar.ENV, "unknown") service_name = f"tangle-{app_env}" if service_version is None: - service_version = os.environ.get("TANGLE_SERVICE_VERSION", "unknown") + service_version = os.environ.get(EnvVar.SERVICE_VERSION, "unknown") + + metrics = None + if metrics_exporter is not None: + metrics = MetricsConfig( + exporter=metrics_exporter, + temporality=_resolve_metrics_temporality(), + ) return OtelConfig( service_name=service_name, service_version=service_version, trace_exporter=trace_exporter, + metrics=metrics, ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/temporality.py b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/temporality.py new file mode 100644 index 0000000..14d5498 --- /dev/null +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/temporality.py @@ -0,0 +1,30 @@ +""" +Maps string-based temporality configuration to OTel SDK types. +""" + +from opentelemetry import metrics as otel_metrics +from opentelemetry.sdk import metrics as otel_sdk_metrics +from opentelemetry.sdk.metrics import export as otel_metrics_export + +from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config + +_TEMPORALITY_MAP = { + config.AggregationTemporality.DELTA: otel_metrics_export.AggregationTemporality.DELTA, + config.AggregationTemporality.CUMULATIVE: otel_metrics_export.AggregationTemporality.CUMULATIVE, +} + + +def build_preferred_temporality( + temporality: config.MetricsTemporalityConfig, +) -> dict[type[otel_metrics.Instrument], otel_metrics_export.AggregationTemporality]: + return { + otel_sdk_metrics.Counter: _TEMPORALITY_MAP[temporality.counter], + otel_sdk_metrics.ObservableCounter: _TEMPORALITY_MAP[ + temporality.observable_counter + ], + otel_sdk_metrics.UpDownCounter: _TEMPORALITY_MAP[temporality.up_down_counter], + otel_sdk_metrics.ObservableUpDownCounter: _TEMPORALITY_MAP[ + temporality.observable_up_down_counter + ], + otel_sdk_metrics.Histogram: _TEMPORALITY_MAP[temporality.histogram], + } diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/conftest.py b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/conftest.py index 8576f23..9154179 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/conftest.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/conftest.py @@ -1,14 +1,18 @@ import pytest +from opentelemetry import metrics as otel_metrics from opentelemetry import trace @pytest.fixture(autouse=True) -def reset_otel_tracer_provider(): - """Reset the global OTel tracer provider between tests. +def reset_otel_providers(): + """Reset global OTel providers between tests. - OTel only allows set_tracer_provider to be called once per process. - We reset the internal guard so each test gets a clean slate. + OTel only allows set_tracer_provider / set_meter_provider to be called + once per process. We reset the internal guards so each test gets a + clean slate. """ yield trace._TRACER_PROVIDER_SET_ONCE._done = False trace._TRACER_PROVIDER = trace.ProxyTracerProvider() + otel_metrics._internal._METER_PROVIDER_SET_ONCE._done = False + otel_metrics._internal._METER_PROVIDER = None diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_config.py b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_config.py index 0493e2c..4bc70b5 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_config.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_config.py @@ -19,11 +19,26 @@ def test_invalid_value_raises(self): config.ExporterProtocol("websocket") +class TestAggregationTemporality: + """Tests for AggregationTemporality enum.""" + + def test_delta_value(self): + assert config.AggregationTemporality.DELTA == "delta" + + def test_cumulative_value(self): + assert config.AggregationTemporality.CUMULATIVE == "cumulative" + + def test_invalid_value_raises(self): + with pytest.raises(ValueError): + config.AggregationTemporality("invalid") + + class TestResolve: """Tests for config.resolve().""" def test_returns_none_when_no_exporters_configured(self, monkeypatch): - monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", raising=False) + monkeypatch.delenv(config.EnvVar.TRACE_EXPORTER_ENDPOINT, raising=False) + monkeypatch.delenv(config.EnvVar.METRIC_EXPORTER_ENDPOINT, raising=False) result = config.resolve() @@ -31,11 +46,11 @@ def test_returns_none_when_no_exporters_configured(self, monkeypatch): def test_returns_config_with_defaults(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) - monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", raising=False) - monkeypatch.delenv("TANGLE_ENV", raising=False) - monkeypatch.delenv("TANGLE_SERVICE_VERSION", raising=False) + monkeypatch.delenv(config.EnvVar.TRACE_EXPORTER_PROTOCOL, raising=False) + monkeypatch.delenv(config.EnvVar.ENV, raising=False) + monkeypatch.delenv(config.EnvVar.SERVICE_VERSION, raising=False) result = config.resolve() @@ -47,7 +62,7 @@ def test_returns_config_with_defaults(self, monkeypatch): def test_uses_custom_service_name(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) result = config.resolve(service_name="oasis-api") @@ -56,9 +71,9 @@ def test_uses_custom_service_name(self, monkeypatch): def test_service_name_includes_tangle_env(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) - monkeypatch.setenv("TANGLE_ENV", "production") + monkeypatch.setenv(config.EnvVar.ENV, "production") result = config.resolve() @@ -66,9 +81,9 @@ def test_service_name_includes_tangle_env(self, monkeypatch): def test_respects_http_protocol(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4318" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4318" ) - monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", "http") + monkeypatch.setenv(config.EnvVar.TRACE_EXPORTER_PROTOCOL, "http") result = config.resolve() @@ -76,32 +91,32 @@ def test_respects_http_protocol(self, monkeypatch): def test_respects_grpc_protocol(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) - monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", "grpc") + monkeypatch.setenv(config.EnvVar.TRACE_EXPORTER_PROTOCOL, "grpc") result = config.resolve() assert result.trace_exporter.protocol == config.ExporterProtocol.GRPC def test_raises_on_invalid_endpoint_format(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "localhost:4317") + monkeypatch.setenv(config.EnvVar.TRACE_EXPORTER_ENDPOINT, "localhost:4317") with pytest.raises(ValueError, match="Invalid OTel endpoint format"): config.resolve() def test_raises_on_invalid_protocol(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) - monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", "websocket") + monkeypatch.setenv(config.EnvVar.TRACE_EXPORTER_PROTOCOL, "websocket") with pytest.raises(ValueError, match="Invalid OTel protocol"): config.resolve() def test_accepts_https_endpoint(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "https://collector.example.com:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "https://collector.example.com:4317" ) result = config.resolve() @@ -110,7 +125,7 @@ def test_accepts_https_endpoint(self, monkeypatch): def test_uses_custom_service_version(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) result = config.resolve(service_version="abc123") @@ -119,9 +134,9 @@ def test_uses_custom_service_version(self, monkeypatch): def test_service_version_from_env(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) - monkeypatch.setenv("TANGLE_SERVICE_VERSION", "def456") + monkeypatch.setenv(config.EnvVar.SERVICE_VERSION, "def456") result = config.resolve() @@ -129,9 +144,9 @@ def test_service_version_from_env(self, monkeypatch): def test_service_version_defaults_to_unknown(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) - monkeypatch.delenv("TANGLE_SERVICE_VERSION", raising=False) + monkeypatch.delenv(config.EnvVar.SERVICE_VERSION, raising=False) result = config.resolve() @@ -139,7 +154,7 @@ def test_service_version_defaults_to_unknown(self, monkeypatch): def test_config_is_frozen(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) result = config.resolve() @@ -149,7 +164,7 @@ def test_config_is_frozen(self, monkeypatch): def test_exporter_config_is_frozen(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" ) result = config.resolve() @@ -164,3 +179,157 @@ def test_config_requires_keyword_arguments(self): def test_exporter_config_requires_keyword_arguments(self): with pytest.raises(TypeError): config.ExporterConfig("http://localhost:4317", "grpc") + + +class TestMetricsExporterResolve: + """Tests for metrics exporter resolution in config.resolve().""" + + def test_metrics_none_when_metric_endpoint_not_set(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.TRACE_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.delenv(config.EnvVar.METRIC_EXPORTER_ENDPOINT, raising=False) + + result = config.resolve() + + assert result is not None + assert result.metrics is None + + def test_resolves_metrics_exporter(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + + result = config.resolve() + + assert result.metrics is not None + assert result.metrics.exporter.endpoint == "http://localhost:4317" + assert result.metrics.exporter.protocol == config.ExporterProtocol.GRPC + + def test_resolves_metrics_http_protocol(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4318" + ) + monkeypatch.setenv(config.EnvVar.METRIC_EXPORTER_PROTOCOL, "http") + + result = config.resolve() + + assert result.metrics.exporter.protocol == config.ExporterProtocol.HTTP + + def test_returns_config_with_only_metrics(self, monkeypatch): + monkeypatch.delenv(config.EnvVar.TRACE_EXPORTER_ENDPOINT, raising=False) + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + + result = config.resolve() + + assert result is not None + assert result.trace_exporter is None + assert result.metrics is not None + + def test_raises_on_invalid_metrics_endpoint(self, monkeypatch): + monkeypatch.setenv(config.EnvVar.METRIC_EXPORTER_ENDPOINT, "bad-endpoint") + + with pytest.raises(ValueError, match="Invalid OTel endpoint format"): + config.resolve() + + def test_raises_on_invalid_metrics_protocol(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.setenv(config.EnvVar.METRIC_EXPORTER_PROTOCOL, "websocket") + + with pytest.raises(ValueError, match="Invalid OTel protocol"): + config.resolve() + + +class TestMetricsTemporalityResolve: + """Tests for metrics temporality env var resolution.""" + + def test_defaults(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + + result = config.resolve() + t = result.metrics.temporality + + assert t.counter == config.AggregationTemporality.DELTA + assert t.observable_counter == config.AggregationTemporality.DELTA + assert t.up_down_counter == config.AggregationTemporality.CUMULATIVE + assert t.observable_up_down_counter == config.AggregationTemporality.CUMULATIVE + assert t.histogram == config.AggregationTemporality.DELTA + + def test_override_counter_temporality(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_COUNTER", "cumulative") + + result = config.resolve() + + assert ( + result.metrics.temporality.counter + == config.AggregationTemporality.CUMULATIVE + ) + + def test_override_histogram_temporality(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_HISTOGRAM", "cumulative") + + result = config.resolve() + + assert ( + result.metrics.temporality.histogram + == config.AggregationTemporality.CUMULATIVE + ) + + def test_override_all_temporalities(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_COUNTER", "cumulative") + monkeypatch.setenv( + "TANGLE_OTEL_METRICS_TEMPORALITY_OBSERVABLE_COUNTER", "cumulative" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_UP_DOWN_COUNTER", "delta") + monkeypatch.setenv( + "TANGLE_OTEL_METRICS_TEMPORALITY_OBSERVABLE_UP_DOWN_COUNTER", "delta" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_HISTOGRAM", "cumulative") + + result = config.resolve() + t = result.metrics.temporality + + assert t.counter == config.AggregationTemporality.CUMULATIVE + assert t.observable_counter == config.AggregationTemporality.CUMULATIVE + assert t.up_down_counter == config.AggregationTemporality.DELTA + assert t.observable_up_down_counter == config.AggregationTemporality.DELTA + assert t.histogram == config.AggregationTemporality.CUMULATIVE + + def test_raises_on_invalid_temporality(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_COUNTER", "invalid") + + with pytest.raises(ValueError, match="Invalid OTel metrics temporality"): + config.resolve() + + def test_temporality_case_insensitive(self, monkeypatch): + monkeypatch.setenv( + config.EnvVar.METRIC_EXPORTER_ENDPOINT, "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_COUNTER", "DELTA") + monkeypatch.setenv("TANGLE_OTEL_METRICS_TEMPORALITY_HISTOGRAM", "Cumulative") + + result = config.resolve() + + assert result.metrics.temporality.counter == config.AggregationTemporality.DELTA + assert ( + result.metrics.temporality.histogram + == config.AggregationTemporality.CUMULATIVE + ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_temporality.py b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_temporality.py new file mode 100644 index 0000000..857a7bb --- /dev/null +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/tests/test_temporality.py @@ -0,0 +1,63 @@ +"""Tests for the OpenTelemetry temporality mapping module.""" + +from opentelemetry.sdk import metrics as otel_sdk_metrics +from opentelemetry.sdk.metrics import export as otel_metrics_export + +from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config +from cloud_pipelines_backend.instrumentation.opentelemetry._internal import temporality + + +class TestBuildPreferredTemporality: + """Tests for temporality.build_preferred_temporality().""" + + def test_default_temporality_config_values(self): + t = config.MetricsTemporalityConfig() + + assert t.counter == "delta" + assert t.observable_counter == "delta" + assert t.up_down_counter == "cumulative" + assert t.observable_up_down_counter == "cumulative" + assert t.histogram == "delta" + + def test_maps_defaults_to_sdk_types(self): + result = temporality.build_preferred_temporality( + config.MetricsTemporalityConfig() + ) + + assert ( + result[otel_sdk_metrics.Counter] + == otel_metrics_export.AggregationTemporality.DELTA + ) + assert ( + result[otel_sdk_metrics.ObservableCounter] + == otel_metrics_export.AggregationTemporality.DELTA + ) + assert ( + result[otel_sdk_metrics.UpDownCounter] + == otel_metrics_export.AggregationTemporality.CUMULATIVE + ) + assert ( + result[otel_sdk_metrics.ObservableUpDownCounter] + == otel_metrics_export.AggregationTemporality.CUMULATIVE + ) + assert ( + result[otel_sdk_metrics.Histogram] + == otel_metrics_export.AggregationTemporality.DELTA + ) + + def test_maps_overrides(self): + result = temporality.build_preferred_temporality( + config.MetricsTemporalityConfig( + counter="cumulative", + histogram="cumulative", + ) + ) + + assert ( + result[otel_sdk_metrics.Counter] + == otel_metrics_export.AggregationTemporality.CUMULATIVE + ) + assert ( + result[otel_sdk_metrics.Histogram] + == otel_metrics_export.AggregationTemporality.CUMULATIVE + ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/metrics.py b/cloud_pipelines_backend/instrumentation/opentelemetry/metrics.py new file mode 100644 index 0000000..a645a03 --- /dev/null +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/metrics.py @@ -0,0 +1,83 @@ +""" +OpenTelemetry metrics configuration. + +This module sets up the global meter provider with an OTLP exporter. +""" + +import logging + +from opentelemetry import metrics as otel_metrics +from opentelemetry.exporter.otlp.proto.grpc import ( + metric_exporter as otel_grpc_metric_exporter, +) +from opentelemetry.exporter.otlp.proto.http import ( + metric_exporter as otel_http_metric_exporter, +) +from opentelemetry.sdk import metrics as otel_sdk_metrics +from opentelemetry.sdk import resources as otel_resources +from opentelemetry.sdk.metrics import export as otel_metrics_export + +from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config +from cloud_pipelines_backend.instrumentation.opentelemetry._internal import ( + temporality as temporality_mod, +) + +_logger = logging.getLogger(__name__) + + +def setup( + endpoint: str, + protocol: str, + service_name: str, + service_version: str | None = None, + temporality: config.MetricsTemporalityConfig | None = None, +) -> None: + """ + Configure the global OpenTelemetry meter provider. + + Args: + endpoint: The OTLP collector endpoint URL. + protocol: The exporter protocol ("grpc" or "http"). + service_name: The service name reported to the collector. + service_version: The service version (e.g. git revision) reported to the collector. + temporality: Per-instrument aggregation temporality preferences. + """ + try: + _logger.info( + f"Configuring OpenTelemetry metrics, endpoint={endpoint}, " + f"protocol={protocol}, service_name={service_name}, " + f"service_version={service_version}" + ) + + preferred_temporality = ( + temporality_mod.build_preferred_temporality(temporality) + if temporality + else None + ) + + if protocol == config.ExporterProtocol.GRPC: + otel_exporter = otel_grpc_metric_exporter.OTLPMetricExporter( + endpoint=endpoint, + preferred_temporality=preferred_temporality, + ) + else: + otel_exporter = otel_http_metric_exporter.OTLPMetricExporter( + endpoint=endpoint, + preferred_temporality=preferred_temporality, + ) + + attributes = {otel_resources.SERVICE_NAME: service_name} + if service_version: + attributes[otel_resources.SERVICE_VERSION] = service_version + resource = otel_resources.Resource.create(attributes) + + reader = otel_metrics_export.PeriodicExportingMetricReader(otel_exporter) + meter_provider = otel_sdk_metrics.MeterProvider( + resource=resource, + metric_readers=[reader], + ) + otel_metrics.set_meter_provider(meter_provider) + + _logger.info("OpenTelemetry metrics configured successfully.") + except Exception as e: + _logger.exception("Failed to configure OpenTelemetry metrics") diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py b/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py index a4b2e02..1e73cf5 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py @@ -7,6 +7,7 @@ import logging from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config +from cloud_pipelines_backend.instrumentation.opentelemetry import metrics from cloud_pipelines_backend.instrumentation.opentelemetry import tracing _logger = logging.getLogger(__name__) @@ -47,3 +48,12 @@ def setup( service_name=otel_config.service_name, service_version=otel_config.service_version, ) + + if otel_config.metrics: + metrics.setup( + endpoint=otel_config.metrics.exporter.endpoint, + protocol=otel_config.metrics.exporter.protocol, + service_name=otel_config.service_name, + service_version=otel_config.service_version, + temporality=otel_config.metrics.temporality, + ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/conftest.py b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/conftest.py index 8576f23..9154179 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/conftest.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/conftest.py @@ -1,14 +1,18 @@ import pytest +from opentelemetry import metrics as otel_metrics from opentelemetry import trace @pytest.fixture(autouse=True) -def reset_otel_tracer_provider(): - """Reset the global OTel tracer provider between tests. +def reset_otel_providers(): + """Reset global OTel providers between tests. - OTel only allows set_tracer_provider to be called once per process. - We reset the internal guard so each test gets a clean slate. + OTel only allows set_tracer_provider / set_meter_provider to be called + once per process. We reset the internal guards so each test gets a + clean slate. """ yield trace._TRACER_PROVIDER_SET_ONCE._done = False trace._TRACER_PROVIDER = trace.ProxyTracerProvider() + otel_metrics._internal._METER_PROVIDER_SET_ONCE._done = False + otel_metrics._internal._METER_PROVIDER = None diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_metrics.py b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_metrics.py new file mode 100644 index 0000000..883cf03 --- /dev/null +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_metrics.py @@ -0,0 +1,79 @@ +"""Tests for the OpenTelemetry metrics module.""" + +from unittest import mock + +from opentelemetry import metrics as otel_metrics +from opentelemetry.sdk import metrics as otel_sdk_metrics + +from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config +from cloud_pipelines_backend.instrumentation.opentelemetry import metrics + + +class TestMetricsSetup: + """Tests for metrics.setup().""" + + def test_sets_global_meter_provider_with_grpc(self): + metrics.setup( + endpoint="http://localhost:4317", + protocol=config.ExporterProtocol.GRPC, + service_name="test-service", + ) + + provider = otel_metrics.get_meter_provider() + assert isinstance(provider, otel_sdk_metrics.MeterProvider) + + def test_sets_global_meter_provider_with_http(self): + metrics.setup( + endpoint="http://localhost:4318", + protocol=config.ExporterProtocol.HTTP, + service_name="test-service", + ) + + provider = otel_metrics.get_meter_provider() + assert isinstance(provider, otel_sdk_metrics.MeterProvider) + + def test_service_name_is_set_on_resource(self): + metrics.setup( + endpoint="http://localhost:4317", + protocol=config.ExporterProtocol.GRPC, + service_name="my-service", + ) + + provider = otel_metrics.get_meter_provider() + assert provider._sdk_config.resource.attributes["service.name"] == "my-service" + + def test_service_version_is_set_on_resource(self): + metrics.setup( + endpoint="http://localhost:4317", + protocol=config.ExporterProtocol.GRPC, + service_name="my-service", + service_version="abc123", + ) + + provider = otel_metrics.get_meter_provider() + assert provider._sdk_config.resource.attributes["service.version"] == "abc123" + + def test_service_version_omitted_when_none(self): + metrics.setup( + endpoint="http://localhost:4317", + protocol=config.ExporterProtocol.GRPC, + service_name="my-service", + ) + + provider = otel_metrics.get_meter_provider() + assert "service.version" not in provider._sdk_config.resource.attributes + + def test_catches_exporter_exception(self): + with mock.patch( + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter.OTLPMetricExporter", + side_effect=RuntimeError("connection failed"), + ): + metrics.setup( + endpoint="http://localhost:4317", + protocol=config.ExporterProtocol.GRPC, + service_name="test-service", + ) + + assert not isinstance( + otel_metrics.get_meter_provider(), otel_sdk_metrics.MeterProvider + ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_providers.py b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_providers.py index bd77460..dc83ffa 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_providers.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_providers.py @@ -2,7 +2,9 @@ from unittest import mock +from opentelemetry import metrics as otel_metrics from opentelemetry import trace +from opentelemetry.sdk import metrics as otel_sdk_metrics from opentelemetry.sdk import trace as otel_sdk_trace from cloud_pipelines_backend.instrumentation.opentelemetry import providers @@ -13,6 +15,7 @@ class TestProvidersSetup: def test_noop_when_no_exporters_configured(self, monkeypatch): monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", raising=False) + monkeypatch.delenv("TANGLE_OTEL_METRIC_EXPORTER_ENDPOINT", raising=False) providers.setup() @@ -69,3 +72,30 @@ def test_catches_config_resolution_errors(self): assert not isinstance( trace.get_tracer_provider(), otel_sdk_trace.TracerProvider ) + + def test_configures_meter_provider(self, monkeypatch): + monkeypatch.setenv( + "TANGLE_OTEL_METRIC_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", raising=False) + + providers.setup() + + assert isinstance( + otel_metrics.get_meter_provider(), otel_sdk_metrics.MeterProvider + ) + + def test_configures_both_providers(self, monkeypatch): + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + monkeypatch.setenv( + "TANGLE_OTEL_METRIC_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + + providers.setup() + + assert isinstance(trace.get_tracer_provider(), otel_sdk_trace.TracerProvider) + assert isinstance( + otel_metrics.get_meter_provider(), otel_sdk_metrics.MeterProvider + ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_tracing.py b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_tracing.py index f69c36a..e09914a 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_tracing.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/tests/test_tracing.py @@ -45,7 +45,7 @@ def test_service_name_is_set_on_resource(self): def test_service_version_is_set_on_resource(self): tracing.setup( endpoint="http://localhost:4317", - protocol="grpc", + protocol=config.ExporterProtocol.GRPC, service_name="my-service", service_version="abc123", ) @@ -56,7 +56,7 @@ def test_service_version_is_set_on_resource(self): def test_service_version_omitted_when_none(self): tracing.setup( endpoint="http://localhost:4317", - protocol="grpc", + protocol=config.ExporterProtocol.GRPC, service_name="my-service", )