Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
"""

from cloud_pipelines_backend.instrumentation.opentelemetry import auto_instrumentation
from cloud_pipelines_backend.instrumentation.opentelemetry import metrics
from cloud_pipelines_backend.instrumentation.opentelemetry import providers
from cloud_pipelines_backend.instrumentation.opentelemetry import tracing

# Re-export the setup entry points of the instrumentation submodules under
# flat names so callers can use them straight from this package.
instrument_fastapi = auto_instrumentation.instrument_fastapi
setup_metrics = metrics.setup
setup_providers = providers.setup
setup_tracing = tracing.setup

# Public names of this package.
__all__ = ["instrument_fastapi", "setup_metrics", "setup_providers", "setup_tracing"]
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,74 @@
import os


class EnvVar(str, enum.Enum):
    """Names of the environment variables that configure OpenTelemetry.

    Every member's value is the literal variable name. The enum mixes in
    ``str``, so a member can be used directly as an ``os.environ`` key.

    Trace exporter:
        TRACE_EXPORTER_ENDPOINT: OTLP collector URL for trace data.
        TRACE_EXPORTER_PROTOCOL: Transport protocol ("grpc" or "http").

    Metric exporter:
        METRIC_EXPORTER_ENDPOINT: OTLP collector URL for metric data.
        METRIC_EXPORTER_PROTOCOL: Transport protocol ("grpc" or "http").

    Metric temporality (per-instrument overrides):
        Not listed as members. The variable names are built at runtime from
        the MetricsTemporalityConfig field names with the prefix
        TANGLE_OTEL_METRICS_TEMPORALITY_ (e.g.
        TANGLE_OTEL_METRICS_TEMPORALITY_COUNTER). MetricsTemporalityConfig
        lists the available fields and their defaults.

    Service identity:
        ENV: Application environment, used to build the default service
            name (e.g. "tangle-production").
        SERVICE_VERSION: Deployed revision or version tag.
    """

    # Trace exporter
    TRACE_EXPORTER_ENDPOINT = "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT"
    TRACE_EXPORTER_PROTOCOL = "TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL"

    # Metric exporter
    METRIC_EXPORTER_ENDPOINT = "TANGLE_OTEL_METRIC_EXPORTER_ENDPOINT"
    METRIC_EXPORTER_PROTOCOL = "TANGLE_OTEL_METRIC_EXPORTER_PROTOCOL"

    # Service identity
    ENV = "TANGLE_ENV"
    SERVICE_VERSION = "TANGLE_SERVICE_VERSION"


class ExporterProtocol(str, enum.Enum):
    """Transport protocol names accepted for an exporter.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    GRPC = "grpc"
    HTTP = "http"


class AggregationTemporality(str, enum.Enum):
    """String names of the metric aggregation temporalities that can be configured.

    Members are also ``str`` instances, so they compare equal to their
    plain-string values.
    """

    DELTA = "delta"
    CUMULATIVE = "cumulative"


@dataclasses.dataclass(frozen=True, kw_only=True)
class ExporterConfig:
endpoint: str
protocol: str


@dataclasses.dataclass(kw_only=True, frozen=True)
class MetricsTemporalityConfig:
    """Aggregation temporality configured for each kind of metric instrument.

    Each field holds an AggregationTemporality value. The field names also
    determine the override environment variables: each name is upper-cased
    and appended to the TANGLE_OTEL_METRICS_TEMPORALITY_ prefix.

    Instances are immutable and every field must be passed by keyword.
    """

    # Counters default to delta.
    counter: str = AggregationTemporality.DELTA
    observable_counter: str = AggregationTemporality.DELTA

    # Up/down counters default to cumulative.
    up_down_counter: str = AggregationTemporality.CUMULATIVE
    observable_up_down_counter: str = AggregationTemporality.CUMULATIVE

    # Histograms default to delta.
    histogram: str = AggregationTemporality.DELTA


@dataclasses.dataclass(kw_only=True, frozen=True)
class MetricsConfig:
    """Everything needed to configure metrics export.

    Instances are immutable and every field must be passed by keyword.
    """

    # Endpoint and protocol of the metric exporter.
    exporter: ExporterConfig
    # Aggregation temporality for each instrument kind.
    temporality: MetricsTemporalityConfig


@dataclasses.dataclass(kw_only=True, frozen=True)
class OtelConfig:
    """Top-level OpenTelemetry configuration.

    Instances are immutable and every field must be passed by keyword.
    """

    # Service identity.
    service_name: str
    service_version: str

    # Signal-specific settings; each defaults to None when not supplied.
    trace_exporter: ExporterConfig | None = None
    metrics: MetricsConfig | None = None


def _resolve_exporter(
Expand All @@ -47,12 +99,38 @@ def _resolve_exporter(
except ValueError:
raise ValueError(
f"Invalid OTel protocol: {protocol}. "
f"Expected values: {', '.join(e.value for e in ExporterProtocol)}"
f"Expected values: {', '.join(ExporterProtocol)}"
)

return ExporterConfig(endpoint=endpoint, protocol=protocol)


def _resolve_temporality(env_var: str, default: str) -> str:
    """Read one metrics temporality setting from the environment and validate it.

    Args:
        env_var: Name of the environment variable to read.
        default: Value used when the variable is not set.

    Returns:
        The setting with surrounding whitespace removed and lower-cased; it
        is always a valid AggregationTemporality value.

    Raises:
        ValueError: If the resulting value is not an AggregationTemporality
            value.
    """
    # Normalise case and surrounding whitespace so values such as " Delta"
    # are accepted rather than rejected over stray spacing.
    value = os.environ.get(env_var, default).strip().lower()
    try:
        AggregationTemporality(value)
    except ValueError:
        # `from None`: the enum's own lookup error adds nothing to the
        # message below, so keep it out of the traceback.
        raise ValueError(
            f"Invalid OTel metrics temporality: {value} (from {env_var}). "
            f"Expected values: {', '.join(AggregationTemporality)}"
        ) from None
    return value


# Prefix of every per-instrument temporality override variable; the
# upper-cased MetricsTemporalityConfig field name is appended to it.
_TEMPORALITY_ENV_PREFIX = "TANGLE_OTEL_METRICS_TEMPORALITY_"


def _resolve_metrics_temporality() -> MetricsTemporalityConfig:
    """Build the temporality config from per-instrument environment overrides.

    For every MetricsTemporalityConfig field, the matching environment
    variable is resolved with the field's declared default as the fallback.

    Returns:
        A MetricsTemporalityConfig holding the resolved value of each field.

    Raises:
        ValueError: Propagated from _resolve_temporality for an invalid value.
    """
    settings = {}
    for field in dataclasses.fields(MetricsTemporalityConfig):
        variable_name = _TEMPORALITY_ENV_PREFIX + field.name.upper()
        settings[field.name] = _resolve_temporality(variable_name, field.default)
    return MetricsTemporalityConfig(**settings)


def resolve(
service_name: str | None = None,
service_version: str | None = None,
Expand All @@ -63,22 +141,35 @@ def resolve(
Raises ValueError if any configured exporter has invalid settings.
"""
trace_exporter = _resolve_exporter(
endpoint_var="TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT",
protocol_var="TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL",
endpoint_var=EnvVar.TRACE_EXPORTER_ENDPOINT,
protocol_var=EnvVar.TRACE_EXPORTER_PROTOCOL,
)

metrics_exporter = _resolve_exporter(
endpoint_var=EnvVar.METRIC_EXPORTER_ENDPOINT,
protocol_var=EnvVar.METRIC_EXPORTER_PROTOCOL,
)

if trace_exporter is None:
if trace_exporter is None and metrics_exporter is None:
return None

if service_name is None:
app_env = os.environ.get("TANGLE_ENV", "unknown")
app_env = os.environ.get(EnvVar.ENV, "unknown")
service_name = f"tangle-{app_env}"

if service_version is None:
service_version = os.environ.get("TANGLE_SERVICE_VERSION", "unknown")
service_version = os.environ.get(EnvVar.SERVICE_VERSION, "unknown")

metrics = None
if metrics_exporter is not None:
metrics = MetricsConfig(
exporter=metrics_exporter,
temporality=_resolve_metrics_temporality(),
)

return OtelConfig(
service_name=service_name,
service_version=service_version,
trace_exporter=trace_exporter,
metrics=metrics,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Maps string-based temporality configuration to OTel SDK types.
"""

from opentelemetry import metrics as otel_metrics
from opentelemetry.sdk import metrics as otel_sdk_metrics
from opentelemetry.sdk.metrics import export as otel_metrics_export

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config

# Translates each member of the string-valued config enum into the SDK enum
# member with the same name (DELTA, CUMULATIVE).
_TEMPORALITY_MAP = {
    member: otel_metrics_export.AggregationTemporality[member.name]
    for member in config.AggregationTemporality
}


def build_preferred_temporality(
    temporality: config.MetricsTemporalityConfig,
) -> dict[type[otel_metrics.Instrument], otel_metrics_export.AggregationTemporality]:
    """Translate the per-instrument temporality config into SDK enum values.

    Args:
        temporality: Temporality configured for each instrument kind.

    Returns:
        A dict keyed by SDK instrument class whose values are the SDK
        AggregationTemporality members looked up in _TEMPORALITY_MAP.

    Raises:
        KeyError: If a configured value is not a key of _TEMPORALITY_MAP.
    """
    # Pair each SDK instrument class with the setting configured for it.
    configured = (
        (otel_sdk_metrics.Counter, temporality.counter),
        (otel_sdk_metrics.ObservableCounter, temporality.observable_counter),
        (otel_sdk_metrics.UpDownCounter, temporality.up_down_counter),
        (
            otel_sdk_metrics.ObservableUpDownCounter,
            temporality.observable_up_down_counter,
        ),
        (otel_sdk_metrics.Histogram, temporality.histogram),
    )
    return {
        instrument: _TEMPORALITY_MAP[setting] for instrument, setting in configured
    }
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import pytest
from opentelemetry import metrics as otel_metrics
from opentelemetry import trace


@pytest.fixture(autouse=True)
def reset_otel_providers():
    """Reset global OTel providers between tests.

    OTel only allows set_tracer_provider / set_meter_provider to be called
    once per process. We reset the internal guards so each test gets a
    clean slate.

    Applied automatically to every test (autouse); the reset runs as
    teardown, after the test body has finished.
    """
    yield

    # Tracing: re-arm the set-once guard and drop the installed provider.
    trace._TRACER_PROVIDER_SET_ONCE._done = False
    # Reset to None, matching the meter reset below, rather than installing
    # a ProxyTracerProvider here.
    # NOTE(review): assumes None is the "no provider configured" state and
    # that the proxy provider delegates to whatever _TRACER_PROVIDER holds
    # (so a proxy stored here would delegate to itself) — confirm against
    # the installed opentelemetry-api version.
    trace._TRACER_PROVIDER = None

    # Metrics: same treatment for the meter provider.
    otel_metrics._internal._METER_PROVIDER_SET_ONCE._done = False
    otel_metrics._internal._METER_PROVIDER = None
Loading