Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging
from cloud_pipelines_backend.instrumentation import otel_tracing
from cloud_pipelines_backend.instrumentation import opentelemetry as otel

app = fastapi.FastAPI(
title="Cloud Pipelines API",
version="0.0.1",
separate_input_output_schemas=False,
)

# Configure OpenTelemetry tracing
otel_tracing.setup_api_tracing(app)
otel.setup_providers()
otel.instrument_fastapi(app)

# Add request context middleware for automatic request_id generation
app.add_middleware(api_tracing.RequestContextMiddleware)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
OpenTelemetry instrumentation public API.

Usage::

from cloud_pipelines_backend.instrumentation import opentelemetry as otel

otel.setup_providers()
otel.instrument_fastapi(app)
"""

from cloud_pipelines_backend.instrumentation.opentelemetry import auto_instrumentation
from cloud_pipelines_backend.instrumentation.opentelemetry import providers
from cloud_pipelines_backend.instrumentation.opentelemetry import tracing

instrument_fastapi = auto_instrumentation.instrument_fastapi
setup_providers = providers.setup
setup_tracing = tracing.setup

__all__ = [
"instrument_fastapi",
"setup_providers",
"setup_tracing",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Shared OpenTelemetry configuration resolution.

Reads and validates OTel settings from environment variables.
"""

import dataclasses
import enum
import os


class ExporterProtocol(str, enum.Enum):
GRPC = "grpc"
HTTP = "http"


@dataclasses.dataclass(frozen=True, kw_only=True)
class OtelConfig:
endpoint: str
protocol: str
service_name: str


def resolve(service_name: str | None = None) -> OtelConfig | None:
"""Read and validate shared OTel configuration from environment variables.

Returns None if OTel is not configured (no exporter endpoint set).
Raises ValueError if the configuration is invalid.
"""
otel_endpoint = os.environ.get("TANGLE_OTEL_EXPORTER_ENDPOINT")
if not otel_endpoint:
return None

otel_protocol = os.environ.get(
"TANGLE_OTEL_EXPORTER_PROTOCOL", ExporterProtocol.GRPC
)

if service_name is None:
app_env = os.environ.get("TANGLE_ENV", "unknown")
service_name = f"tangle-{app_env}"

if not otel_endpoint.startswith(("http://", "https://")):
raise ValueError(
f"Invalid OTel endpoint format: {otel_endpoint}. "
f"Expected format: http://<host>:<port> or https://<host>:<port>"
)
try:
ExporterProtocol(otel_protocol)
except ValueError:
raise ValueError(
f"Invalid OTel protocol: {otel_protocol}. "
f"Expected values: {', '.join(e.value for e in ExporterProtocol)}"
)

return OtelConfig(
endpoint=otel_endpoint,
protocol=otel_protocol,
service_name=service_name,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
OpenTelemetry provider state checks.

Queries the global OpenTelemetry SDK state to determine which
providers have been configured.
"""

from opentelemetry import metrics as otel_metrics
from opentelemetry import trace
from opentelemetry.sdk import metrics as otel_sdk_metrics
from opentelemetry.sdk import trace as otel_sdk_trace


def has_configured_providers() -> bool:
"""Check whether any OpenTelemetry SDK providers have been configured globally.

Logs provider is omitted while the OpenTelemetry Logs API remains experimental.
"""
return isinstance(
trace.get_tracer_provider(), otel_sdk_trace.TracerProvider
) or isinstance(otel_metrics.get_meter_provider(), otel_sdk_metrics.MeterProvider)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest
from opentelemetry import trace


@pytest.fixture(autouse=True)
def reset_otel_tracer_provider():
"""Reset the global OTel tracer provider between tests.

OTel only allows set_tracer_provider to be called once per process.
We reset the internal guard so each test gets a clean slate.
"""
yield
trace._TRACER_PROVIDER_SET_ONCE._done = False
trace._TRACER_PROVIDER = trace.ProxyTracerProvider()
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Tests for the OpenTelemetry config module."""

import pytest

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config


class TestExporterProtocol:
"""Tests for config.ExporterProtocol enum."""

def test_grpc_value(self):
assert config.ExporterProtocol.GRPC == "grpc"

def test_http_value(self):
assert config.ExporterProtocol.HTTP == "http"

def test_invalid_value_raises(self):
with pytest.raises(ValueError):
config.ExporterProtocol("websocket")


class TestResolve:
"""Tests for config.resolve()."""

def test_returns_none_when_endpoint_not_set(self, monkeypatch):
monkeypatch.delenv("TANGLE_OTEL_EXPORTER_ENDPOINT", raising=False)

result = config.resolve()

assert result is None

def test_returns_config_with_defaults(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")
monkeypatch.delenv("TANGLE_OTEL_EXPORTER_PROTOCOL", raising=False)
monkeypatch.delenv("TANGLE_ENV", raising=False)

result = config.resolve()

assert result is not None
assert result.endpoint == "http://localhost:4317"
assert result.protocol == config.ExporterProtocol.GRPC
assert result.service_name == "tangle-unknown"

def test_uses_custom_service_name(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")

result = config.resolve(service_name="oasis-api")

assert result.service_name == "oasis-api"

def test_service_name_includes_tangle_env(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")
monkeypatch.setenv("TANGLE_ENV", "production")

result = config.resolve()

assert result.service_name == "tangle-production"

def test_respects_http_protocol(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4318")
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_PROTOCOL", "http")

result = config.resolve()

assert result.protocol == config.ExporterProtocol.HTTP

def test_respects_grpc_protocol(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_PROTOCOL", "grpc")

result = config.resolve()

assert result.protocol == config.ExporterProtocol.GRPC

def test_raises_on_invalid_endpoint_format(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "localhost:4317")

with pytest.raises(ValueError, match="Invalid OTel endpoint format"):
config.resolve()

def test_raises_on_invalid_protocol(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_PROTOCOL", "websocket")

with pytest.raises(ValueError, match="Invalid OTel protocol"):
config.resolve()

def test_accepts_https_endpoint(self, monkeypatch):
monkeypatch.setenv(
"TANGLE_OTEL_EXPORTER_ENDPOINT", "https://collector.example.com:4317"
)

result = config.resolve()

assert result.endpoint == "https://collector.example.com:4317"

def test_config_is_frozen(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")

result = config.resolve()

with pytest.raises(AttributeError):
result.endpoint = "http://other:4317"

def test_config_requires_keyword_arguments(self, monkeypatch):
monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317")

result = config.resolve()

with pytest.raises(TypeError):
config.OtelConfig(result.endpoint, result.protocol, result.service_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Tests for the OpenTelemetry provider state checks."""

from opentelemetry import trace
from opentelemetry.sdk import trace as otel_sdk_trace

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import providers


class TestHasConfiguredProviders:
"""Tests for providers.has_configured_providers()."""

def test_returns_false_with_default_provider(self):
assert providers.has_configured_providers() is False

def test_returns_true_with_sdk_tracer_provider(self):
trace.set_tracer_provider(otel_sdk_trace.TracerProvider())

assert providers.has_configured_providers() is True
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
OpenTelemetry auto-instrumentation for FastAPI applications.

Instrumentation is only activated when at least one OpenTelemetry SDK
provider (traces, metrics, or logs) has been configured globally.
"""

import fastapi
import logging

from opentelemetry.instrumentation import fastapi as otel_fastapi

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import providers

_logger = logging.getLogger(__name__)


def instrument_fastapi(app: fastapi.FastAPI) -> None:
"""
Apply OpenTelemetry auto-instrumentation to a FastAPI application.

No-op if no OpenTelemetry SDK providers have been configured globally,
since there would be no backend to receive the telemetry data.

See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html

Args:
app: The FastAPI application instance to instrument.
"""
if not providers.has_configured_providers():
_logger.debug(
"Skipping FastAPI auto-instrumentation: no OpenTelemetry providers configured"
)
return

try:
otel_fastapi.FastAPIInstrumentor.instrument_app(app)
_logger.info("FastAPI auto-instrumentation enabled")
except Exception as e:
_logger.exception("Failed to apply FastAPI auto-instrumentation")
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
OpenTelemetry provider setup.

Provides entry points to configure OpenTelemetry providers.
"""

import logging

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import config
from cloud_pipelines_backend.instrumentation.opentelemetry import tracing

_logger = logging.getLogger(__name__)


def setup(service_name: str | None = None) -> None:
"""
Configure global OpenTelemetry providers (traces, metrics).

No-op if TANGLE_OTEL_EXPORTER_ENDPOINT is not set.

Use this for non-FastAPI entrypoints (e.g. orchestrators, workers) that
need telemetry but have no ASGI app to auto-instrument.

Args:
service_name: Override the default service name reported to the collector.
"""
try:
otel_config = config.resolve(service_name=service_name)
except Exception as e:
_logger.exception("Failed to resolve OpenTelemetry configuration")
return

if otel_config is None:
return

tracing.setup(
endpoint=otel_config.endpoint,
protocol=otel_config.protocol,
service_name=otel_config.service_name,
)

# TODO: Setup metrics provider once it's available
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest
from opentelemetry import trace


@pytest.fixture(autouse=True)
def reset_otel_tracer_provider():
"""Reset the global OTel tracer provider between tests.

OTel only allows set_tracer_provider to be called once per process.
We reset the internal guard so each test gets a clean slate.
"""
yield
trace._TRACER_PROVIDER_SET_ONCE._done = False
trace._TRACER_PROVIDER = trace.ProxyTracerProvider()
Loading