From ed1f4b4fc4701234137016fb404ee636202ec5dd Mon Sep 17 00:00:00 2001 From: Alexander Nicholson <4584443+DragonStuff@users.noreply.github.com> Date: Sat, 27 Dec 2025 13:08:03 +0700 Subject: [PATCH 1/6] feat(logging): add fluent-bit log shipping Implements #3430. This PR is partially implemented using Cursor. --- docs/docs/guides/server-deployment.md | 80 ++- docs/docs/reference/environment-variables.md | 7 + pyproject.toml | 8 +- .../server/services/logs/__init__.py | 24 + .../server/services/logs/fluentbit.py | 339 +++++++++++ src/dstack/_internal/server/settings.py | 9 + .../server/services/test_fluentbit_logs.py | 572 ++++++++++++++++++ 7 files changed, 1035 insertions(+), 4 deletions(-) create mode 100644 src/dstack/_internal/server/services/logs/fluentbit.py create mode 100644 src/tests/_internal/server/services/test_fluentbit_logs.py diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md index f1d7546d77..98b3009bc7 100644 --- a/docs/docs/guides/server-deployment.md +++ b/docs/docs/guides/server-deployment.md @@ -159,7 +159,7 @@ $ DSTACK_DATABASE_URL=postgresql+asyncpg://user:password@db-host:5432/dstack dst By default, `dstack` stores workload logs locally in `~/.dstack/server/projects//logs`. For multi-replica server deployments, it's required to store logs externally. -`dstack` supports storing logs using AWS CloudWatch or GCP Logging. +`dstack` supports storing logs using AWS CloudWatch, GCP Logging, or Fluent-bit with Elasticsearch / Opensearch. ### AWS CloudWatch @@ -222,6 +222,78 @@ To store logs using GCP Logging, set the `DSTACK_SERVER_GCP_LOGGING_PROJECT` env +### Fluent-bit + +To store logs using Fluent-bit, set the `DSTACK_SERVER_FLUENTBIT_HOST` environment variable. +Fluent-bit supports two modes depending on how you want to access logs. + +=== "Full mode" + + Logs are shipped to Fluent-bit and can be read back through the dstack UI and CLI via Elasticsearch or OpenSearch. + Use this mode when you want a complete integration with log viewing in dstack: + + ```shell + $ DSTACK_SERVER_FLUENTBIT_HOST=fluentbit.example.com \ + DSTACK_SERVER_ELASTICSEARCH_HOST=https://elasticsearch.example.com:9200 \ + dstack server + ``` + +=== "Ship-only mode" + + Logs are forwarded to Fluent-bit but cannot be read through dstack. + The dstack UI/CLI will show empty logs. Use this mode when: + + - You have an existing logging infrastructure (Kibana, Grafana, Datadog, etc.) + - You only need to forward logs without reading them back through dstack + - You want to reduce operational complexity by not running Elasticsearch/OpenSearch + + ```shell + $ DSTACK_SERVER_FLUENTBIT_HOST=fluentbit.example.com \ + dstack server + ``` + +??? info "Additional configuration" + The following optional environment variables can be used to customize the Fluent-bit integration: + + **Fluent-bit settings:** + + - `DSTACK_SERVER_FLUENTBIT_PORT` – The Fluent-bit port. Defaults to `24224`. + - `DSTACK_SERVER_FLUENTBIT_PROTOCOL` – The protocol to use: `forward` or `http`. Defaults to `forward`. + - `DSTACK_SERVER_FLUENTBIT_TAG_PREFIX` – The tag prefix for logs. Defaults to `dstack`. + + **Elasticsearch/OpenSearch settings (for full mode only):** + + - `DSTACK_SERVER_ELASTICSEARCH_HOST` – The Elasticsearch/OpenSearch host for reading logs. If not set, runs in ship-only mode. + - `DSTACK_SERVER_ELASTICSEARCH_INDEX` – The Elasticsearch/OpenSearch index pattern. Defaults to `dstack-logs`. 
+ - `DSTACK_SERVER_ELASTICSEARCH_API_KEY` – The Elasticsearch/OpenSearch API key for authentication. + +??? info "Fluent-bit configuration" + Configure Fluent-bit to receive logs and forward them to Elasticsearch or OpenSearch. Example configuration: + + ```ini + [INPUT] + Name forward + Listen 0.0.0.0 + Port 24224 + + [OUTPUT] + Name es + Match dstack.* + Host elasticsearch.example.com + Port 9200 + Index dstack-logs + Suppress_Type_Name On + ``` + +??? info "Required dependencies" + To use Fluent-bit log storage, install the `fluentbit` extras: + + ```shell + $ pip install "dstack[all]" -U + # or + $ pip install "dstack[fluentbit]" -U + ``` + ## File storage When using [files](../concepts/dev-environments.md#files) or [repos](../concepts/dev-environments.md#repos), `dstack` uploads local files and diffs to the server so that you can have access to them within runs. By default, the files are stored in the DB and each upload is limited to 2MB. You can configure an object storage to be used for uploads and increase the default limit by setting the `DSTACK_SERVER_CODE_UPLOAD_LIMIT` environment variable @@ -426,8 +498,10 @@ If a deployment is stuck due to a deadlock when applying DB migrations, try scal ??? info "Can I run multiple replicas of dstack server?" - Yes, you can if you configure `dstack` to use [PostgreSQL](#postgresql) and [AWS CloudWatch](#aws-cloudwatch). + Yes, you can if you configure `dstack` to use [PostgreSQL](#postgresql) and an external log storage + such as [AWS CloudWatch](#aws-cloudwatch), [GCP Logging](#gcp-logging), or [Fluent-bit](#fluent-bit). ??? info "Does dstack server support blue-green or rolling deployments?" - Yes, it does if you configure `dstack` to use [PostgreSQL](#postgresql) and [AWS CloudWatch](#aws-cloudwatch). + Yes, it does if you configure `dstack` to use [PostgreSQL](#postgresql) and an external log storage + such as [AWS CloudWatch](#aws-cloudwatch), [GCP Logging](#gcp-logging), or [Fluent-bit](#fluent-bit). diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index 4575f1b8f8..62ce97cd12 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -113,6 +113,13 @@ For more details on the options below, refer to the [server deployment](../guide - `DSTACK_SERVER_CLOUDWATCH_LOG_GROUP`{ #DSTACK_SERVER_CLOUDWATCH_LOG_GROUP } – The CloudWatch Logs group for storing workloads logs. If not set, the default file-based log storage is used. - `DSTACK_SERVER_CLOUDWATCH_LOG_REGION`{ #DSTACK_SERVER_CLOUDWATCH_LOG_REGION } – The CloudWatch Logs region. Defaults to `None`. - `DSTACK_SERVER_GCP_LOGGING_PROJECT`{ #DSTACK_SERVER_GCP_LOGGING_PROJECT } – The GCP Logging project for storing workloads logs. If not set, the default file-based log storage is used. +- `DSTACK_SERVER_FLUENTBIT_HOST`{ #DSTACK_SERVER_FLUENTBIT_HOST } – The Fluent-bit host for log forwarding. If set, enables Fluent-bit log storage. +- `DSTACK_SERVER_FLUENTBIT_PORT`{ #DSTACK_SERVER_FLUENTBIT_PORT } – The Fluent-bit port. Defaults to `24224`. +- `DSTACK_SERVER_FLUENTBIT_PROTOCOL`{ #DSTACK_SERVER_FLUENTBIT_PROTOCOL } – The protocol to use: `forward` or `http`. Defaults to `forward`. +- `DSTACK_SERVER_FLUENTBIT_TAG_PREFIX`{ #DSTACK_SERVER_FLUENTBIT_TAG_PREFIX } – The tag prefix for logs. Defaults to `dstack`. +- `DSTACK_SERVER_ELASTICSEARCH_HOST`{ #DSTACK_SERVER_ELASTICSEARCH_HOST } – The Elasticsearch/OpenSearch host for reading logs back through dstack. 
Optional; if not set, Fluent-bit runs in ship-only mode (logs are forwarded but not readable through dstack UI/CLI). +- `DSTACK_SERVER_ELASTICSEARCH_INDEX`{ #DSTACK_SERVER_ELASTICSEARCH_INDEX } – The Elasticsearch/OpenSearch index pattern. Defaults to `dstack-logs`. +- `DSTACK_SERVER_ELASTICSEARCH_API_KEY`{ #DSTACK_SERVER_ELASTICSEARCH_API_KEY } – The Elasticsearch/OpenSearch API key for authentication. - `DSTACK_ENABLE_PROMETHEUS_METRICS`{ #DSTACK_ENABLE_PROMETHEUS_METRICS } — Enables Prometheus metrics collection and export. - `DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE`{ #DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE } – Request body size limit for services running with a gateway, in bytes. Defaults to 64 MiB. - `DSTACK_SERVICE_CLIENT_TIMEOUT`{ #DSTACK_SERVICE_CLIENT_TIMEOUT } – Timeout in seconds for HTTP requests sent from the in-server proxy and gateways to service replicas. Defaults to 60. diff --git a/pyproject.toml b/pyproject.toml index c0036ff7be..7d4c3a39f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -216,6 +216,12 @@ nebius = [ "nebius>=0.3.4,<0.4; python_version >= '3.10'", "dstack[server]", ] +fluentbit = [ + "fluent-logger>=0.10.0", + "elasticsearch>=8.0.0", + "httpx", + "dstack[server]", +] all = [ - "dstack[gateway,server,aws,azure,gcp,verda,kubernetes,lambda,nebius,oci]", + "dstack[gateway,server,aws,azure,gcp,verda,kubernetes,lambda,nebius,oci,fluentbit]", ] diff --git a/src/dstack/_internal/server/services/logs/__init__.py b/src/dstack/_internal/server/services/logs/__init__.py index 5b06ff4ad2..1f8565d49c 100644 --- a/src/dstack/_internal/server/services/logs/__init__.py +++ b/src/dstack/_internal/server/services/logs/__init__.py @@ -8,6 +8,7 @@ from dstack._internal.server.schemas.logs import PollLogsRequest from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent from dstack._internal.server.services.logs import aws as aws_logs +from dstack._internal.server.services.logs import fluentbit as fluentbit_logs from dstack._internal.server.services.logs import gcp as gcp_logs from dstack._internal.server.services.logs.base import ( LogStorage, @@ -57,6 +58,29 @@ def get_log_storage() -> LogStorage: logger.debug("Using GCP Logs storage") else: logger.error("Cannot use GCP Logs storage: GCP deps are not installed") + elif settings.SERVER_FLUENTBIT_HOST: + if fluentbit_logs.FLUENTBIT_AVAILABLE: + try: + _log_storage = fluentbit_logs.FluentBitLogStorage( + host=settings.SERVER_FLUENTBIT_HOST, + port=settings.SERVER_FLUENTBIT_PORT, + protocol=settings.SERVER_FLUENTBIT_PROTOCOL, + tag_prefix=settings.SERVER_FLUENTBIT_TAG_PREFIX, + es_host=settings.SERVER_ELASTICSEARCH_HOST, + es_index=settings.SERVER_ELASTICSEARCH_INDEX, + es_api_key=settings.SERVER_ELASTICSEARCH_API_KEY, + ) + except LogStorageError as e: + logger.error("Failed to initialize Fluent-bit Logs storage: %s", e) + except Exception: + logger.exception("Got exception when initializing Fluent-bit Logs storage") + else: + if settings.SERVER_ELASTICSEARCH_HOST: + logger.debug("Using Fluent-bit Logs storage with Elasticsearch/OpenSearch") + else: + logger.debug("Using Fluent-bit Logs storage in ship-only mode") + else: + logger.error("Cannot use Fluent-bit Logs storage: fluent-logger is not installed") if _log_storage is None: _log_storage = FileLogStorage() logger.debug("Using file-based storage") diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py new file mode 100644 index 0000000000..064178e3ad --- /dev/null +++ 
b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -0,0 +1,339 @@ +from typing import List, Optional, Protocol, runtime_checkable +from uuid import UUID + +from dstack._internal.core.models.logs import ( + JobSubmissionLogs, + LogEvent, + LogEventSource, + LogProducer, +) +from dstack._internal.server.models import ProjectModel +from dstack._internal.server.schemas.logs import PollLogsRequest +from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent +from dstack._internal.server.services.logs.base import ( + LogStorage, + LogStorageError, + unix_time_ms_to_datetime, +) +from dstack._internal.utils.common import batched +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +FLUENTBIT_AVAILABLE = True +try: + import httpx + from fluent import sender as fluent_sender +except ImportError: + FLUENTBIT_AVAILABLE = False + +# Check if elasticsearch is available (optional for ship-only mode) +ELASTICSEARCH_AVAILABLE = True +try: + from elasticsearch import Elasticsearch + from elasticsearch.exceptions import ApiError, TransportError + + # Catch both API errors and transport/connection errors + ElasticsearchError: tuple = (ApiError, TransportError) # type: ignore[misc] +except ImportError: + ELASTICSEARCH_AVAILABLE = False + ElasticsearchError = (Exception,) # type: ignore[misc,assignment] + +if FLUENTBIT_AVAILABLE: + + @runtime_checkable + class FluentBitWriter(Protocol): + """Protocol for Fluent-bit log writers.""" + + def write(self, tag: str, records: List[dict]) -> None: + """Write log records to Fluent-bit.""" + ... + + def close(self) -> None: + """Close any resources.""" + ... + + @runtime_checkable + class LogReader(Protocol): + """Protocol for log readers (Interface Segregation Principle).""" + + def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: + """Read logs from the storage backend.""" + ... + + def close(self) -> None: + """Close any resources.""" + ... + + class HTTPFluentBitWriter: + """Writes logs to Fluent-bit via HTTP POST.""" + + def __init__(self, host: str, port: int) -> None: + self._endpoint = f"http://{host}:{port}" + self._client = httpx.Client(timeout=30.0) + + def write(self, tag: str, records: List[dict]) -> None: + for record in records: + try: + self._client.post( + f"{self._endpoint}/{tag}", + json=record, + headers={"Content-Type": "application/json"}, + ) + except httpx.HTTPError as e: + logger.error("Failed to write log to Fluent-bit via HTTP: %s", e) + raise LogStorageError(f"Fluent-bit HTTP error: {e}") from e + + def close(self) -> None: + self._client.close() + + class ForwardFluentBitWriter: + """Writes logs to Fluent-bit using Forward protocol.""" + + def __init__(self, host: str, port: int, tag_prefix: str) -> None: + self._sender = fluent_sender.FluentSender(tag_prefix, host=host, port=port) + self._tag_prefix = tag_prefix + + def write(self, tag: str, records: List[dict]) -> None: + for record in records: + if not self._sender.emit(tag, record): + error = self._sender.last_error + logger.error("Failed to write log to Fluent-bit via Forward: %s", error) + self._sender.clear_last_error() + raise LogStorageError(f"Fluent-bit Forward error: {error}") + + def close(self) -> None: + self._sender.close() + + class NullLogReader: + """ + Null reader for ship-only mode (no Elasticsearch/OpenSearch configured). + + Returns empty logs. Useful when logs are shipped to an external system + that is accessed directly rather than through dstack. 
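+        In this mode read() simply returns JobSubmissionLogs(logs=[], next_token=None).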
+ """ + + def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: + return JobSubmissionLogs(logs=[], next_token=None) + + def close(self) -> None: + pass + + if ELASTICSEARCH_AVAILABLE: + + class ElasticsearchReader: + """Reads logs from Elasticsearch or OpenSearch.""" + + def __init__( + self, + host: str, + index: str, + api_key: Optional[str] = None, + ) -> None: + if api_key: + self._client = Elasticsearch(hosts=[host], api_key=api_key) + else: + self._client = Elasticsearch(hosts=[host]) + self._index = index + # Verify connection + try: + self._client.info() + except ElasticsearchError as e: + raise LogStorageError( + f"Failed to connect to Elasticsearch/OpenSearch: {e}" + ) from e + + def read( + self, + stream_name: str, + request: PollLogsRequest, + ) -> JobSubmissionLogs: + sort_order = "desc" if request.descending else "asc" + + query: dict = { + "bool": { + "must": [ + {"term": {"stream.keyword": stream_name}}, + ] + } + } + + if request.start_time: + query["bool"].setdefault("filter", []).append( + {"range": {"@timestamp": {"gt": request.start_time.isoformat()}}} + ) + if request.end_time: + query["bool"].setdefault("filter", []).append( + {"range": {"@timestamp": {"lt": request.end_time.isoformat()}}} + ) + + search_params: dict = { + "index": self._index, + "query": query, + "sort": [{"@timestamp": {"order": sort_order}}], + "size": request.limit, + } + + if request.next_token: + search_params["search_after"] = [request.next_token] + + try: + response = self._client.search(**search_params) + except ElasticsearchError as e: + logger.error("Elasticsearch/OpenSearch search error: %s", e) + raise LogStorageError(f"Elasticsearch/OpenSearch error: {e}") from e + + hits = response.get("hits", {}).get("hits", []) + logs = [] + last_sort_value = None + + for hit in hits: + source = hit.get("_source", {}) + timestamp_str = source.get("@timestamp") + message = source.get("message", "") + + if timestamp_str: + from datetime import datetime + + try: + timestamp = datetime.fromisoformat( + timestamp_str.replace("Z", "+00:00") + ) + except ValueError: + continue + else: + continue + + logs.append( + LogEvent( + timestamp=timestamp, + log_source=LogEventSource.STDOUT, + message=message, + ) + ) + + sort_values = hit.get("sort") + if sort_values: + last_sort_value = sort_values[0] + + next_token = None + if len(logs) == request.limit and last_sort_value is not None: + next_token = str(last_sort_value) + + return JobSubmissionLogs( + logs=logs, + next_token=next_token, + ) + + def close(self) -> None: + self._client.close() + + class FluentBitLogStorage(LogStorage): + """ + Log storage using Fluent-bit for writing and optionally Elasticsearch/OpenSearch for reading. 
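+        Writes go to Fluent-bit over the forward or http protocol; reads are
+        delegated to the configured LogReader (ElasticsearchReader or NullLogReader).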
+ + Supports two modes: + - Full mode: Writes to Fluent-bit and reads from Elasticsearch/OpenSearch + - Ship-only mode: Writes to Fluent-bit only (no reading, returns empty logs) + """ + + MAX_BATCH_SIZE = 100 + + def __init__( + self, + host: str, + port: int, + protocol: str, + tag_prefix: str, + es_host: Optional[str] = None, + es_index: str = "dstack-logs", + es_api_key: Optional[str] = None, + ) -> None: + self._tag_prefix = tag_prefix + + if protocol == "http": + self._writer: FluentBitWriter = HTTPFluentBitWriter(host=host, port=port) + elif protocol == "forward": + self._writer = ForwardFluentBitWriter(host=host, port=port, tag_prefix=tag_prefix) + else: + raise LogStorageError(f"Unsupported Fluent-bit protocol: {protocol}") + + # Initialize reader based on configuration (Dependency Inversion Principle) + self._reader: LogReader + if es_host: + if not ELASTICSEARCH_AVAILABLE: + raise LogStorageError( + "Elasticsearch/OpenSearch host configured but elasticsearch package " + "is not installed. Install with: pip install elasticsearch" + ) + self._reader = ElasticsearchReader( + host=es_host, + index=es_index, + api_key=es_api_key, + ) + logger.debug( + "Fluent-bit log storage initialized with Elasticsearch/OpenSearch reader" + ) + else: + self._reader = NullLogReader() + logger.info( + "Fluent-bit log storage initialized in ship-only mode " + "(no Elasticsearch/OpenSearch configured for reading)" + ) + + def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmissionLogs: + producer = LogProducer.RUNNER if request.diagnose else LogProducer.JOB + stream_name = self._get_stream_name( + project_name=project.name, + run_name=request.run_name, + job_submission_id=request.job_submission_id, + producer=producer, + ) + return self._reader.read(stream_name=stream_name, request=request) + + def write_logs( + self, + project: ProjectModel, + run_name: str, + job_submission_id: UUID, + runner_logs: List[RunnerLogEvent], + job_logs: List[RunnerLogEvent], + ) -> None: + producers_with_logs = [(LogProducer.RUNNER, runner_logs), (LogProducer.JOB, job_logs)] + for producer, producer_logs in producers_with_logs: + if not producer_logs: + continue + stream_name = self._get_stream_name( + project_name=project.name, + run_name=run_name, + job_submission_id=job_submission_id, + producer=producer, + ) + self._write_logs_to_stream(stream_name=stream_name, logs=producer_logs) + + def _write_logs_to_stream(self, stream_name: str, logs: List[RunnerLogEvent]) -> None: + for batch in batched(logs, self.MAX_BATCH_SIZE): + records = [] + for log in batch: + message = log.message.decode(errors="replace") + timestamp = unix_time_ms_to_datetime(log.timestamp) + records.append( + { + "message": message, + "@timestamp": timestamp.isoformat(), + "stream": stream_name, + } + ) + self._writer.write(tag=stream_name, records=records) + + def close(self) -> None: + try: + self._writer.close() + finally: + self._reader.close() + + def _get_stream_name( + self, project_name: str, run_name: str, job_submission_id: UUID, producer: LogProducer + ) -> str: + return f"{project_name}/{run_name}/{job_submission_id}/{producer.value}" diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 74d1d7b8d5..6e5c8e4bc1 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -78,6 +78,15 @@ SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT") +SERVER_FLUENTBIT_HOST = os.getenv("DSTACK_SERVER_FLUENTBIT_HOST") 
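+# Fluent-bit log storage is enabled only when the host is set; the settings
+# below are optional and fall back to their defaults.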
+SERVER_FLUENTBIT_PORT = int(os.getenv("DSTACK_SERVER_FLUENTBIT_PORT", "24224")) +SERVER_FLUENTBIT_PROTOCOL = os.getenv("DSTACK_SERVER_FLUENTBIT_PROTOCOL", "forward") +SERVER_FLUENTBIT_TAG_PREFIX = os.getenv("DSTACK_SERVER_FLUENTBIT_TAG_PREFIX", "dstack") + +SERVER_ELASTICSEARCH_HOST = os.getenv("DSTACK_SERVER_ELASTICSEARCH_HOST") +SERVER_ELASTICSEARCH_INDEX = os.getenv("DSTACK_SERVER_ELASTICSEARCH_INDEX", "dstack-logs") +SERVER_ELASTICSEARCH_API_KEY = os.getenv("DSTACK_SERVER_ELASTICSEARCH_API_KEY") + SERVER_METRICS_RUNNING_TTL_SECONDS = environ.get_int( "DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS", default=3600 ) diff --git a/src/tests/_internal/server/services/test_fluentbit_logs.py b/src/tests/_internal/server/services/test_fluentbit_logs.py new file mode 100644 index 0000000000..ef67bb0423 --- /dev/null +++ b/src/tests/_internal/server/services/test_fluentbit_logs.py @@ -0,0 +1,572 @@ +from datetime import datetime, timezone +from unittest.mock import Mock, patch +from uuid import UUID + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.server.models import ProjectModel +from dstack._internal.server.schemas.logs import PollLogsRequest +from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent +from dstack._internal.server.services.logs.base import LogStorageError +from dstack._internal.server.services.logs.fluentbit import ( + ELASTICSEARCH_AVAILABLE, + FLUENTBIT_AVAILABLE, +) +from dstack._internal.server.testing.common import create_project + +pytestmark = pytest.mark.skipif(not FLUENTBIT_AVAILABLE, reason="fluent-logger not installed") + +# Conditionally import classes that are only defined when FLUENTBIT_AVAILABLE is True +if FLUENTBIT_AVAILABLE: + from dstack._internal.server.services.logs.fluentbit import ( + FluentBitLogStorage, + ForwardFluentBitWriter, + HTTPFluentBitWriter, + NullLogReader, + ) + + if ELASTICSEARCH_AVAILABLE: + from dstack._internal.server.services.logs.fluentbit import ElasticsearchReader + + +class TestNullLogReader: + """Tests for the NullLogReader (ship-only mode).""" + + def test_read_returns_empty_logs(self): + reader = NullLogReader() + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=100, + ) + result = reader.read("test-stream", request) + + assert result.logs == [] + assert result.next_token is None + + def test_close_does_nothing(self): + reader = NullLogReader() + reader.close() # Should not raise + + +class TestHTTPFluentBitWriter: + """Tests for the HTTPFluentBitWriter.""" + + @pytest.fixture + def mock_httpx_client(self): + with patch("dstack._internal.server.services.logs.fluentbit.httpx.Client") as mock: + yield mock.return_value + + def test_init_creates_client(self, mock_httpx_client): + writer = HTTPFluentBitWriter(host="localhost", port=8080) + assert writer._endpoint == "http://localhost:8080" + + def test_write_posts_records(self, mock_httpx_client): + writer = HTTPFluentBitWriter(host="localhost", port=8080) + records = [ + {"message": "Hello", "@timestamp": "2023-10-06T10:00:00+00:00"}, + {"message": "World", "@timestamp": "2023-10-06T10:00:01+00:00"}, + ] + writer.write(tag="test-tag", records=records) + + assert mock_httpx_client.post.call_count == 2 + mock_httpx_client.post.assert_any_call( + "http://localhost:8080/test-tag", + json=records[0], + headers={"Content-Type": "application/json"}, + ) + mock_httpx_client.post.assert_any_call( + "http://localhost:8080/test-tag", + 
json=records[1], + headers={"Content-Type": "application/json"}, + ) + + def test_write_raises_on_http_error(self, mock_httpx_client): + import httpx + + mock_httpx_client.post.side_effect = httpx.HTTPError("Connection failed") + writer = HTTPFluentBitWriter(host="localhost", port=8080) + + with pytest.raises(LogStorageError, match="Fluent-bit HTTP error"): + writer.write(tag="test-tag", records=[{"message": "test"}]) + + def test_close_closes_client(self, mock_httpx_client): + writer = HTTPFluentBitWriter(host="localhost", port=8080) + writer.close() + mock_httpx_client.close.assert_called_once() + + +class TestForwardFluentBitWriter: + """Tests for the ForwardFluentBitWriter.""" + + @pytest.fixture + def mock_fluent_sender(self): + with patch( + "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender" + ) as mock: + mock_instance = Mock() + mock_instance.emit.return_value = True + mock.return_value = mock_instance + yield mock_instance + + def test_init_creates_sender(self, mock_fluent_sender): + with patch( + "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender" + ) as mock: + mock.return_value = mock_fluent_sender + ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack") + mock.assert_called_once_with("dstack", host="localhost", port=24224) + + def test_write_emits_records(self, mock_fluent_sender): + with patch( + "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender" + ) as mock: + mock.return_value = mock_fluent_sender + writer = ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack") + + records = [ + {"message": "Hello"}, + {"message": "World"}, + ] + writer.write(tag="test-tag", records=records) + + assert mock_fluent_sender.emit.call_count == 2 + + def test_write_raises_on_emit_failure(self, mock_fluent_sender): + mock_fluent_sender.emit.return_value = False + mock_fluent_sender.last_error = Exception("Connection refused") + + with patch( + "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender" + ) as mock: + mock.return_value = mock_fluent_sender + writer = ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack") + + with pytest.raises(LogStorageError, match="Fluent-bit Forward error"): + writer.write(tag="test-tag", records=[{"message": "test"}]) + + mock_fluent_sender.clear_last_error.assert_called_once() + + def test_close_closes_sender(self, mock_fluent_sender): + with patch( + "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender" + ) as mock: + mock.return_value = mock_fluent_sender + writer = ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack") + writer.close() + mock_fluent_sender.close.assert_called_once() + + +class TestFluentBitLogStorage: + """Tests for the FluentBitLogStorage.""" + + @pytest_asyncio.fixture + async def project(self, test_db, session: AsyncSession) -> ProjectModel: + project = await create_project(session=session, name="test-proj") + return project + + @pytest.fixture + def mock_forward_writer(self): + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock_instance = Mock() + mock.return_value = mock_instance + yield mock_instance + + @pytest.fixture + def mock_http_writer(self): + with patch("dstack._internal.server.services.logs.fluentbit.HTTPFluentBitWriter") as mock: + mock_instance = Mock() + mock.return_value = mock_instance + yield mock_instance + + @pytest.fixture + def mock_es_reader(self): + with 
patch("dstack._internal.server.services.logs.fluentbit.ElasticsearchReader") as mock: + mock_instance = Mock() + mock.return_value = mock_instance + yield mock_instance + + def test_init_with_forward_protocol(self, mock_forward_writer): + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + mock.assert_called_once_with(host="localhost", port=24224, tag_prefix="dstack") + assert isinstance(storage._reader, NullLogReader) + + def test_init_with_http_protocol(self, mock_http_writer): + with patch("dstack._internal.server.services.logs.fluentbit.HTTPFluentBitWriter") as mock: + mock.return_value = mock_http_writer + FluentBitLogStorage( + host="localhost", + port=8080, + protocol="http", + tag_prefix="dstack", + ) + mock.assert_called_once_with(host="localhost", port=8080) + + def test_init_with_unsupported_protocol_raises(self): + with pytest.raises(LogStorageError, match="Unsupported Fluent-bit protocol"): + FluentBitLogStorage( + host="localhost", + port=24224, + protocol="grpc", + tag_prefix="dstack", + ) + + def test_init_ship_only_mode(self, mock_forward_writer): + """Test initialization without Elasticsearch (ship-only mode).""" + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + es_host=None, + ) + assert isinstance(storage._reader, NullLogReader) + + @pytest.mark.skipif(not ELASTICSEARCH_AVAILABLE, reason="elasticsearch not installed") + def test_init_with_elasticsearch(self, mock_forward_writer, mock_es_reader): + """Test initialization with Elasticsearch configured.""" + with ( + patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as writer_mock, + patch( + "dstack._internal.server.services.logs.fluentbit.ElasticsearchReader" + ) as reader_mock, + ): + writer_mock.return_value = mock_forward_writer + reader_mock.return_value = mock_es_reader + + FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + es_host="http://elasticsearch:9200", + es_index="dstack-logs", + es_api_key="test-key", + ) + reader_mock.assert_called_once_with( + host="http://elasticsearch:9200", + index="dstack-logs", + api_key="test-key", + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_write_logs(self, test_db, project: ProjectModel, mock_forward_writer): + """Test writing logs to Fluent-bit.""" + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + + storage.write_logs( + project=project, + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + runner_logs=[ + RunnerLogEvent(timestamp=1696586513234, message=b"Runner log"), + ], + job_logs=[ + RunnerLogEvent(timestamp=1696586513235, message=b"Job log"), + ], + ) + + assert mock_forward_writer.write.call_count == 2 + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_write_logs_empty_logs_not_written( + 
self, test_db, project: ProjectModel, mock_forward_writer + ): + """Test that empty log lists are not written.""" + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + + storage.write_logs( + project=project, + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + runner_logs=[], + job_logs=[], + ) + + mock_forward_writer.write.assert_not_called() + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_ship_only_mode(self, test_db, project: ProjectModel): + """Test that ship-only mode returns empty logs.""" + with patch("dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"): + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=100, + ) + result = storage.poll_logs(project, request) + + assert result.logs == [] + assert result.next_token is None + + def test_close_closes_writer_and_reader(self, mock_forward_writer): + """Test that close() closes both writer and reader.""" + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + + storage.close() + + mock_forward_writer.close.assert_called_once() + + def test_close_closes_reader_even_if_writer_fails(self, mock_forward_writer): + """Test that reader is closed even if writer.close() raises an exception.""" + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock_forward_writer.close.side_effect = Exception("Writer close failed") + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + mock_reader = Mock() + storage._reader = mock_reader + + with pytest.raises(Exception, match="Writer close failed"): + storage.close() + + mock_reader.close.assert_called_once() + + def test_get_stream_name(self, mock_forward_writer): + """Test stream name generation.""" + from dstack._internal.core.models.logs import LogProducer + + with patch( + "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter" + ) as mock: + mock.return_value = mock_forward_writer + storage = FluentBitLogStorage( + host="localhost", + port=24224, + protocol="forward", + tag_prefix="dstack", + ) + + stream_name = storage._get_stream_name( + project_name="my-project", + run_name="my-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + producer=LogProducer.JOB, + ) + + assert stream_name == "my-project/my-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job" + + +@pytest.mark.skipif( + not FLUENTBIT_AVAILABLE or not ELASTICSEARCH_AVAILABLE, + reason="fluent-logger or elasticsearch not installed", +) +class TestElasticsearchReader: + """Tests for the ElasticsearchReader.""" + + @pytest.fixture + def mock_es_client(self): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock_instance = Mock() + mock_instance.info.return_value = {"version": {"number": 
"8.0.0"}} + mock_instance.search.return_value = {"hits": {"hits": []}} + mock.return_value = mock_instance + yield mock_instance + + def test_init_verifies_connection(self, mock_es_client): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + mock_es_client.info.assert_called_once() + + def test_init_with_api_key(self, mock_es_client): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + api_key="test-api-key", + ) + mock.assert_called_once_with(hosts=["http://localhost:9200"], api_key="test-api-key") + + def test_init_connection_error_raises(self): + from elasticsearch.exceptions import ConnectionError as ESConnectionError + + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock_instance = Mock() + mock_instance.info.side_effect = ESConnectionError("Connection refused") + mock.return_value = mock_instance + + with pytest.raises(LogStorageError, match="Failed to connect"): + ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + + def test_read_returns_logs(self, mock_es_client): + mock_es_client.search.return_value = { + "hits": { + "hits": [ + { + "_source": { + "@timestamp": "2023-10-06T10:01:53.234000+00:00", + "message": "Hello", + "stream": "test-stream", + }, + "sort": [1696586513234], + }, + { + "_source": { + "@timestamp": "2023-10-06T10:01:53.235000+00:00", + "message": "World", + "stream": "test-stream", + }, + "sort": [1696586513235], + }, + ] + } + } + + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + reader = ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=2, + ) + result = reader.read("test-stream", request) + + assert len(result.logs) == 2 + assert result.logs[0].message == "Hello" + assert result.logs[1].message == "World" + assert result.next_token == "1696586513235" + + def test_read_with_time_filtering(self, mock_es_client): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + reader = ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + start_time=datetime(2023, 10, 6, 10, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2023, 10, 6, 11, 0, 0, tzinfo=timezone.utc), + limit=100, + ) + reader.read("test-stream", request) + + call_args = mock_es_client.search.call_args + query = call_args.kwargs["query"] + assert "filter" in query["bool"] + assert len(query["bool"]["filter"]) == 2 + + def test_read_descending_order(self, mock_es_client): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + reader = ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=100, + descending=True, + ) + reader.read("test-stream", request) + + call_args = 
mock_es_client.search.call_args + assert call_args.kwargs["sort"] == [{"@timestamp": {"order": "desc"}}] + + def test_read_with_next_token(self, mock_es_client): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + reader = ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + next_token="1696586513234", + limit=100, + ) + reader.read("test-stream", request) + + call_args = mock_es_client.search.call_args + assert call_args.kwargs["search_after"] == ["1696586513234"] + + def test_close_closes_client(self, mock_es_client): + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + reader = ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + reader.close() + mock_es_client.close.assert_called_once() From 517659b5a6f2e7000371636f30261b1dc54d4a41 Mon Sep 17 00:00:00 2001 From: Alexander Nicholson <4584443+DragonStuff@users.noreply.github.com> Date: Sat, 27 Dec 2025 13:17:02 +0700 Subject: [PATCH 2/6] Fix pyright errors by using try/except/else pattern for optional imports --- .../server/services/logs/fluentbit.py | 238 +++++++++--------- 1 file changed, 118 insertions(+), 120 deletions(-) diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py index 064178e3ad..7428d2054d 100644 --- a/src/dstack/_internal/server/services/logs/fluentbit.py +++ b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -21,26 +21,133 @@ logger = get_logger(__name__) -FLUENTBIT_AVAILABLE = True -try: - import httpx - from fluent import sender as fluent_sender -except ImportError: - FLUENTBIT_AVAILABLE = False - # Check if elasticsearch is available (optional for ship-only mode) ELASTICSEARCH_AVAILABLE = True try: from elasticsearch import Elasticsearch from elasticsearch.exceptions import ApiError, TransportError +except ImportError: + ELASTICSEARCH_AVAILABLE = False +else: # Catch both API errors and transport/connection errors ElasticsearchError: tuple = (ApiError, TransportError) # type: ignore[misc] -except ImportError: - ELASTICSEARCH_AVAILABLE = False - ElasticsearchError = (Exception,) # type: ignore[misc,assignment] -if FLUENTBIT_AVAILABLE: + class ElasticsearchReader: + """Reads logs from Elasticsearch or OpenSearch.""" + + def __init__( + self, + host: str, + index: str, + api_key: Optional[str] = None, + ) -> None: + if api_key: + self._client = Elasticsearch(hosts=[host], api_key=api_key) + else: + self._client = Elasticsearch(hosts=[host]) + self._index = index + # Verify connection + try: + self._client.info() + except ElasticsearchError as e: + raise LogStorageError( + f"Failed to connect to Elasticsearch/OpenSearch: {e}" + ) from e + + def read( + self, + stream_name: str, + request: PollLogsRequest, + ) -> JobSubmissionLogs: + sort_order = "desc" if request.descending else "asc" + + query: dict = { + "bool": { + "must": [ + {"term": {"stream.keyword": stream_name}}, + ] + } + } + + if request.start_time: + query["bool"].setdefault("filter", []).append( + {"range": {"@timestamp": {"gt": request.start_time.isoformat()}}} + ) + if request.end_time: + query["bool"].setdefault("filter", []).append( + {"range": {"@timestamp": {"lt": request.end_time.isoformat()}}} + ) + + search_params: dict = { + "index": self._index, + 
"query": query, + "sort": [{"@timestamp": {"order": sort_order}}], + "size": request.limit, + } + + if request.next_token: + search_params["search_after"] = [request.next_token] + + try: + response = self._client.search(**search_params) + except ElasticsearchError as e: + logger.error("Elasticsearch/OpenSearch search error: %s", e) + raise LogStorageError(f"Elasticsearch/OpenSearch error: {e}") from e + + hits = response.get("hits", {}).get("hits", []) + logs = [] + last_sort_value = None + + for hit in hits: + source = hit.get("_source", {}) + timestamp_str = source.get("@timestamp") + message = source.get("message", "") + + if timestamp_str: + from datetime import datetime + + try: + timestamp = datetime.fromisoformat( + timestamp_str.replace("Z", "+00:00") + ) + except ValueError: + continue + else: + continue + + logs.append( + LogEvent( + timestamp=timestamp, + log_source=LogEventSource.STDOUT, + message=message, + ) + ) + + sort_values = hit.get("sort") + if sort_values: + last_sort_value = sort_values[0] + + next_token = None + if len(logs) == request.limit and last_sort_value is not None: + next_token = str(last_sort_value) + + return JobSubmissionLogs( + logs=logs, + next_token=next_token, + ) + + def close(self) -> None: + self._client.close() + + +FLUENTBIT_AVAILABLE = True +try: + import httpx + from fluent import sender as fluent_sender +except ImportError: + FLUENTBIT_AVAILABLE = False +else: @runtime_checkable class FluentBitWriter(Protocol): @@ -120,115 +227,6 @@ def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: def close(self) -> None: pass - if ELASTICSEARCH_AVAILABLE: - - class ElasticsearchReader: - """Reads logs from Elasticsearch or OpenSearch.""" - - def __init__( - self, - host: str, - index: str, - api_key: Optional[str] = None, - ) -> None: - if api_key: - self._client = Elasticsearch(hosts=[host], api_key=api_key) - else: - self._client = Elasticsearch(hosts=[host]) - self._index = index - # Verify connection - try: - self._client.info() - except ElasticsearchError as e: - raise LogStorageError( - f"Failed to connect to Elasticsearch/OpenSearch: {e}" - ) from e - - def read( - self, - stream_name: str, - request: PollLogsRequest, - ) -> JobSubmissionLogs: - sort_order = "desc" if request.descending else "asc" - - query: dict = { - "bool": { - "must": [ - {"term": {"stream.keyword": stream_name}}, - ] - } - } - - if request.start_time: - query["bool"].setdefault("filter", []).append( - {"range": {"@timestamp": {"gt": request.start_time.isoformat()}}} - ) - if request.end_time: - query["bool"].setdefault("filter", []).append( - {"range": {"@timestamp": {"lt": request.end_time.isoformat()}}} - ) - - search_params: dict = { - "index": self._index, - "query": query, - "sort": [{"@timestamp": {"order": sort_order}}], - "size": request.limit, - } - - if request.next_token: - search_params["search_after"] = [request.next_token] - - try: - response = self._client.search(**search_params) - except ElasticsearchError as e: - logger.error("Elasticsearch/OpenSearch search error: %s", e) - raise LogStorageError(f"Elasticsearch/OpenSearch error: {e}") from e - - hits = response.get("hits", {}).get("hits", []) - logs = [] - last_sort_value = None - - for hit in hits: - source = hit.get("_source", {}) - timestamp_str = source.get("@timestamp") - message = source.get("message", "") - - if timestamp_str: - from datetime import datetime - - try: - timestamp = datetime.fromisoformat( - timestamp_str.replace("Z", "+00:00") - ) - except ValueError: - 
continue - else: - continue - - logs.append( - LogEvent( - timestamp=timestamp, - log_source=LogEventSource.STDOUT, - message=message, - ) - ) - - sort_values = hit.get("sort") - if sort_values: - last_sort_value = sort_values[0] - - next_token = None - if len(logs) == request.limit and last_sort_value is not None: - next_token = str(last_sort_value) - - return JobSubmissionLogs( - logs=logs, - next_token=next_token, - ) - - def close(self) -> None: - self._client.close() - class FluentBitLogStorage(LogStorage): """ Log storage using Fluent-bit for writing and optionally Elasticsearch/OpenSearch for reading. From 1fb637540c24b0376604c27099ee065710ebe8ad Mon Sep 17 00:00:00 2001 From: Alexander Nicholson <4584443+DragonStuff@users.noreply.github.com> Date: Sat, 27 Dec 2025 13:29:42 +0700 Subject: [PATCH 3/6] refactor(fluentbit): cleanup protocol lambdas and address codex comments --- .../server/services/logs/fluentbit.py | 68 ++++++++----------- .../server/services/test_fluentbit_logs.py | 43 ++++++++++-- 2 files changed, 65 insertions(+), 46 deletions(-) diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py index 7428d2054d..dc1a1c721f 100644 --- a/src/dstack/_internal/server/services/logs/fluentbit.py +++ b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Protocol, runtime_checkable +from typing import List, Optional, Protocol from uuid import UUID from dstack._internal.core.models.logs import ( @@ -21,7 +21,6 @@ logger = get_logger(__name__) -# Check if elasticsearch is available (optional for ship-only mode) ELASTICSEARCH_AVAILABLE = True try: from elasticsearch import Elasticsearch @@ -29,8 +28,6 @@ except ImportError: ELASTICSEARCH_AVAILABLE = False else: - - # Catch both API errors and transport/connection errors ElasticsearchError: tuple = (ApiError, TransportError) # type: ignore[misc] class ElasticsearchReader: @@ -51,9 +48,7 @@ def __init__( try: self._client.info() except ElasticsearchError as e: - raise LogStorageError( - f"Failed to connect to Elasticsearch/OpenSearch: {e}" - ) from e + raise LogStorageError(f"Failed to connect to Elasticsearch/OpenSearch: {e}") from e def read( self, @@ -82,12 +77,16 @@ def read( search_params: dict = { "index": self._index, "query": query, - "sort": [{"@timestamp": {"order": sort_order}}], + "sort": [ + {"@timestamp": {"order": sort_order}}, + {"_id": {"order": sort_order}}, + ], "size": request.limit, } if request.next_token: - search_params["search_after"] = [request.next_token] + parts = request.next_token.split(":", 1) + search_params["search_after"] = [parts[0], parts[1]] try: response = self._client.search(**search_params) @@ -97,7 +96,7 @@ def read( hits = response.get("hits", {}).get("hits", []) logs = [] - last_sort_value = None + last_sort_values = None for hit in hits: source = hit.get("_source", {}) @@ -108,9 +107,7 @@ def read( from datetime import datetime try: - timestamp = datetime.fromisoformat( - timestamp_str.replace("Z", "+00:00") - ) + timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) except ValueError: continue else: @@ -125,12 +122,12 @@ def read( ) sort_values = hit.get("sort") - if sort_values: - last_sort_value = sort_values[0] + if sort_values and len(sort_values) >= 2: + last_sort_values = sort_values next_token = None - if len(logs) == request.limit and last_sort_value is not None: - next_token = str(last_sort_value) + if len(logs) == request.limit and 
last_sort_values is not None: + next_token = f"{last_sort_values[0]}:{last_sort_values[1]}" return JobSubmissionLogs( logs=logs, @@ -149,29 +146,13 @@ def close(self) -> None: FLUENTBIT_AVAILABLE = False else: - @runtime_checkable class FluentBitWriter(Protocol): - """Protocol for Fluent-bit log writers.""" - - def write(self, tag: str, records: List[dict]) -> None: - """Write log records to Fluent-bit.""" - ... + def write(self, tag: str, records: List[dict]) -> None: ... + def close(self) -> None: ... - def close(self) -> None: - """Close any resources.""" - ... - - @runtime_checkable class LogReader(Protocol): - """Protocol for log readers (Interface Segregation Principle).""" - - def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: - """Read logs from the storage backend.""" - ... - - def close(self) -> None: - """Close any resources.""" - ... + def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: ... + def close(self) -> None: ... class HTTPFluentBitWriter: """Writes logs to Fluent-bit via HTTP POST.""" @@ -183,11 +164,21 @@ def __init__(self, host: str, port: int) -> None: def write(self, tag: str, records: List[dict]) -> None: for record in records: try: - self._client.post( + response = self._client.post( f"{self._endpoint}/{tag}", json=record, headers={"Content-Type": "application/json"}, ) + response.raise_for_status() + except httpx.HTTPStatusError as e: + logger.error( + "Fluent-bit HTTP request failed with status %d: %s", + e.response.status_code, + e.response.text, + ) + raise LogStorageError( + f"Fluent-bit HTTP error: status {e.response.status_code}" + ) from e except httpx.HTTPError as e: logger.error("Failed to write log to Fluent-bit via HTTP: %s", e) raise LogStorageError(f"Fluent-bit HTTP error: {e}") from e @@ -257,7 +248,6 @@ def __init__( else: raise LogStorageError(f"Unsupported Fluent-bit protocol: {protocol}") - # Initialize reader based on configuration (Dependency Inversion Principle) self._reader: LogReader if es_host: if not ELASTICSEARCH_AVAILABLE: diff --git a/src/tests/_internal/server/services/test_fluentbit_logs.py b/src/tests/_internal/server/services/test_fluentbit_logs.py index ef67bb0423..239fb6b079 100644 --- a/src/tests/_internal/server/services/test_fluentbit_logs.py +++ b/src/tests/_internal/server/services/test_fluentbit_logs.py @@ -83,7 +83,33 @@ def test_write_posts_records(self, mock_httpx_client): headers={"Content-Type": "application/json"}, ) - def test_write_raises_on_http_error(self, mock_httpx_client): + def test_write_calls_raise_for_status(self, mock_httpx_client): + """Test that response.raise_for_status() is called to detect non-2xx responses.""" + mock_response = Mock() + mock_httpx_client.post.return_value = mock_response + writer = HTTPFluentBitWriter(host="localhost", port=8080) + + writer.write(tag="test-tag", records=[{"message": "test"}]) + + mock_response.raise_for_status.assert_called_once() + + def test_write_raises_on_http_status_error(self, mock_httpx_client): + """Test that 4xx/5xx responses are properly detected and raise LogStorageError.""" + import httpx + + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + mock_httpx_client.post.return_value = mock_response + mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( + "Server Error", request=Mock(), response=mock_response + ) + writer = HTTPFluentBitWriter(host="localhost", port=8080) + + with pytest.raises(LogStorageError, match="Fluent-bit HTTP 
error: status 500"): + writer.write(tag="test-tag", records=[{"message": "test"}]) + + def test_write_raises_on_transport_error(self, mock_httpx_client): import httpx mock_httpx_client.post.side_effect = httpx.HTTPError("Connection failed") @@ -468,7 +494,7 @@ def test_read_returns_logs(self, mock_es_client): "message": "Hello", "stream": "test-stream", }, - "sort": [1696586513234], + "sort": [1696586513234, "doc1"], }, { "_source": { @@ -476,7 +502,7 @@ def test_read_returns_logs(self, mock_es_client): "message": "World", "stream": "test-stream", }, - "sort": [1696586513235], + "sort": [1696586513235, "doc2"], }, ] } @@ -499,7 +525,7 @@ def test_read_returns_logs(self, mock_es_client): assert len(result.logs) == 2 assert result.logs[0].message == "Hello" assert result.logs[1].message == "World" - assert result.next_token == "1696586513235" + assert result.next_token == "1696586513235:doc2" def test_read_with_time_filtering(self, mock_es_client): with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: @@ -540,7 +566,10 @@ def test_read_descending_order(self, mock_es_client): reader.read("test-stream", request) call_args = mock_es_client.search.call_args - assert call_args.kwargs["sort"] == [{"@timestamp": {"order": "desc"}}] + assert call_args.kwargs["sort"] == [ + {"@timestamp": {"order": "desc"}}, + {"_id": {"order": "desc"}}, + ] def test_read_with_next_token(self, mock_es_client): with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: @@ -553,13 +582,13 @@ def test_read_with_next_token(self, mock_es_client): request = PollLogsRequest( run_name="test-run", job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), - next_token="1696586513234", + next_token="1696586513234:doc1", limit=100, ) reader.read("test-stream", request) call_args = mock_es_client.search.call_args - assert call_args.kwargs["search_after"] == ["1696586513234"] + assert call_args.kwargs["search_after"] == ["1696586513234", "doc1"] def test_close_closes_client(self, mock_es_client): with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: From 4210e986c220ce6c8b20da6ebe93e5e1895e489b Mon Sep 17 00:00:00 2001 From: Alexander Nicholson <4584443+DragonStuff@users.noreply.github.com> Date: Sun, 28 Dec 2025 14:30:51 +0700 Subject: [PATCH 4/6] feat(fluentbit): validate next_token format and raise ServerClientError for malformed tokens --- .../server/services/logs/fluentbit.py | 6 ++++ .../server/services/test_fluentbit_logs.py | 33 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py index dc1a1c721f..d32426831c 100644 --- a/src/dstack/_internal/server/services/logs/fluentbit.py +++ b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -1,6 +1,7 @@ from typing import List, Optional, Protocol from uuid import UUID +from dstack._internal.core.errors import ServerClientError from dstack._internal.core.models.logs import ( JobSubmissionLogs, LogEvent, @@ -86,6 +87,11 @@ def read( if request.next_token: parts = request.next_token.split(":", 1) + if len(parts) != 2 or not parts[0] or not parts[1]: + raise ServerClientError( + f"Invalid next_token: {request.next_token}. " + "Must be in format 'timestamp:document_id'." 
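+                    # a valid token looks like "1696586513234:doc1", matching
+                    # the next_token values emitted by read()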
+ ) search_params["search_after"] = [parts[0], parts[1]] try: diff --git a/src/tests/_internal/server/services/test_fluentbit_logs.py b/src/tests/_internal/server/services/test_fluentbit_logs.py index 239fb6b079..56325167f1 100644 --- a/src/tests/_internal/server/services/test_fluentbit_logs.py +++ b/src/tests/_internal/server/services/test_fluentbit_logs.py @@ -6,6 +6,7 @@ import pytest_asyncio from sqlalchemy.ext.asyncio import AsyncSession +from dstack._internal.core.errors import ServerClientError from dstack._internal.server.models import ProjectModel from dstack._internal.server.schemas.logs import PollLogsRequest from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent @@ -590,6 +591,38 @@ def test_read_with_next_token(self, mock_es_client): call_args = mock_es_client.search.call_args assert call_args.kwargs["search_after"] == ["1696586513234", "doc1"] + def test_read_with_malformed_next_token_raises_client_error(self, mock_es_client): + """Test that malformed next_token raises ServerClientError (400) instead of IndexError (500).""" + with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: + mock.return_value = mock_es_client + reader = ElasticsearchReader( + host="http://localhost:9200", + index="dstack-logs", + ) + + request = PollLogsRequest( + run_name="test-run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + next_token="invalid_token_no_colon", + limit=100, + ) + with pytest.raises(ServerClientError, match="Invalid next_token"): + reader.read("test-stream", request) + + request.next_token = ":" + with pytest.raises(ServerClientError, match="Invalid next_token"): + reader.read("test-stream", request) + + request.next_token = ":doc1" + with pytest.raises(ServerClientError, match="Invalid next_token"): + reader.read("test-stream", request) + + request.next_token = "1696586513234:" + with pytest.raises(ServerClientError, match="Invalid next_token"): + reader.read("test-stream", request) + + mock_es_client.search.assert_not_called() + def test_close_closes_client(self, mock_es_client): with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock: mock.return_value = mock_es_client From da8ad277d64ad28fd3add0ce7c882fe6b4f9a448 Mon Sep 17 00:00:00 2001 From: "Alexander Nicholson 4584443+DragonStuff@users.noreply.github.com" <4584443+DragonStuff@users.noreply.github.com> Date: Tue, 30 Dec 2025 12:14:07 +0900 Subject: [PATCH 5/6] chore(fluentbit): address quick comments --- docs/docs/guides/server-deployment.md | 2 +- pyproject.toml | 1 - src/dstack/_internal/server/services/logs/fluentbit.py | 3 ++- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md index 98b3009bc7..dc5093f2f2 100644 --- a/docs/docs/guides/server-deployment.md +++ b/docs/docs/guides/server-deployment.md @@ -240,7 +240,7 @@ Fluent-bit supports two modes depending on how you want to access logs. === "Ship-only mode" - Logs are forwarded to Fluent-bit but cannot be read through dstack. + Logs are forwarded to Fluent-bit but cannot be read through `dstack`. The dstack UI/CLI will show empty logs. Use this mode when: - You have an existing logging infrastructure (Kibana, Grafana, Datadog, etc.) 
diff --git a/pyproject.toml b/pyproject.toml index 7d4c3a39f0..fc8a7d70a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -219,7 +219,6 @@ nebius = [ fluentbit = [ "fluent-logger>=0.10.0", "elasticsearch>=8.0.0", - "httpx", "dstack[server]", ] all = [ diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py index d32426831c..6e9edc3cf9 100644 --- a/src/dstack/_internal/server/services/logs/fluentbit.py +++ b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -1,6 +1,8 @@ from typing import List, Optional, Protocol from uuid import UUID +import httpx + from dstack._internal.core.errors import ServerClientError from dstack._internal.core.models.logs import ( JobSubmissionLogs, @@ -146,7 +148,6 @@ def close(self) -> None: FLUENTBIT_AVAILABLE = True try: - import httpx from fluent import sender as fluent_sender except ImportError: FLUENTBIT_AVAILABLE = False From e16169a66dcf5281c5e108ce3ff00d12780b7616 Mon Sep 17 00:00:00 2001 From: "Alexander Nicholson 4584443+DragonStuff@users.noreply.github.com" <4584443+DragonStuff@users.noreply.github.com> Date: Tue, 30 Dec 2025 12:30:36 +0900 Subject: [PATCH 6/6] feat(fluentbit): add tag prefix support to HTTPFluentBitWriter --- .../server/services/logs/fluentbit.py | 10 +++-- .../server/services/test_fluentbit_logs.py | 43 +++++++++++++++---- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py index 6e9edc3cf9..b45b2988d5 100644 --- a/src/dstack/_internal/server/services/logs/fluentbit.py +++ b/src/dstack/_internal/server/services/logs/fluentbit.py @@ -164,15 +164,17 @@ def close(self) -> None: ... class HTTPFluentBitWriter: """Writes logs to Fluent-bit via HTTP POST.""" - def __init__(self, host: str, port: int) -> None: + def __init__(self, host: str, port: int, tag_prefix: str) -> None: self._endpoint = f"http://{host}:{port}" self._client = httpx.Client(timeout=30.0) + self._tag_prefix = tag_prefix def write(self, tag: str, records: List[dict]) -> None: + prefixed_tag = f"{self._tag_prefix}.{tag}" if self._tag_prefix else tag for record in records: try: response = self._client.post( - f"{self._endpoint}/{tag}", + f"{self._endpoint}/{prefixed_tag}", json=record, headers={"Content-Type": "application/json"}, ) @@ -249,7 +251,9 @@ def __init__( self._tag_prefix = tag_prefix if protocol == "http": - self._writer: FluentBitWriter = HTTPFluentBitWriter(host=host, port=port) + self._writer: FluentBitWriter = HTTPFluentBitWriter( + host=host, port=port, tag_prefix=tag_prefix + ) elif protocol == "forward": self._writer = ForwardFluentBitWriter(host=host, port=port, tag_prefix=tag_prefix) else: diff --git a/src/tests/_internal/server/services/test_fluentbit_logs.py b/src/tests/_internal/server/services/test_fluentbit_logs.py index 56325167f1..937838e016 100644 --- a/src/tests/_internal/server/services/test_fluentbit_logs.py +++ b/src/tests/_internal/server/services/test_fluentbit_logs.py @@ -61,11 +61,12 @@ def mock_httpx_client(self): yield mock.return_value def test_init_creates_client(self, mock_httpx_client): - writer = HTTPFluentBitWriter(host="localhost", port=8080) + writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack") assert writer._endpoint == "http://localhost:8080" + assert writer._tag_prefix == "dstack" def test_write_posts_records(self, mock_httpx_client): - writer = HTTPFluentBitWriter(host="localhost", port=8080) + 
writer = HTTPFluentBitWriter(host="localhost", port=8080)
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
         records = [
             {"message": "Hello", "@timestamp": "2023-10-06T10:00:00+00:00"},
             {"message": "World", "@timestamp": "2023-10-06T10:00:01+00:00"},
@@ -74,12 +75,12 @@ def test_write_posts_records(self, mock_httpx_client):
 
         assert mock_httpx_client.post.call_count == 2
         mock_httpx_client.post.assert_any_call(
-            "http://localhost:8080/test-tag",
+            "http://localhost:8080/dstack.test-tag",
             json=records[0],
             headers={"Content-Type": "application/json"},
         )
         mock_httpx_client.post.assert_any_call(
-            "http://localhost:8080/test-tag",
+            "http://localhost:8080/dstack.test-tag",
             json=records[1],
             headers={"Content-Type": "application/json"},
         )
@@ -88,7 +89,7 @@ def test_write_calls_raise_for_status(self, mock_httpx_client):
         """Test that response.raise_for_status() is called to detect non-2xx responses."""
         mock_response = Mock()
         mock_httpx_client.post.return_value = mock_response
-        writer = HTTPFluentBitWriter(host="localhost", port=8080)
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
 
         writer.write(tag="test-tag", records=[{"message": "test"}])
 
@@ -105,7 +106,7 @@ def test_write_raises_on_http_status_error(self, mock_httpx_client):
         mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
             "Server Error", request=Mock(), response=mock_response
         )
-        writer = HTTPFluentBitWriter(host="localhost", port=8080)
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
 
         with pytest.raises(LogStorageError, match="Fluent-bit HTTP error: status 500"):
             writer.write(tag="test-tag", records=[{"message": "test"}])
@@ -114,16 +115,40 @@ def test_write_raises_on_transport_error(self, mock_httpx_client):
         import httpx
 
         mock_httpx_client.post.side_effect = httpx.HTTPError("Connection failed")
-        writer = HTTPFluentBitWriter(host="localhost", port=8080)
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
 
         with pytest.raises(LogStorageError, match="Fluent-bit HTTP error"):
             writer.write(tag="test-tag", records=[{"message": "test"}])
 
     def test_close_closes_client(self, mock_httpx_client):
-        writer = HTTPFluentBitWriter(host="localhost", port=8080)
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
         writer.close()
 
         mock_httpx_client.close.assert_called_once()
 
+    def test_write_applies_tag_prefix(self, mock_httpx_client):
+        """Test that the tag prefix is applied to tags in HTTP requests."""
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+        records = [{"message": "test"}]
+        writer.write(tag="project/run/job", records=records)
+
+        mock_httpx_client.post.assert_called_once_with(
+            "http://localhost:8080/dstack.project/run/job",
+            json=records[0],
+            headers={"Content-Type": "application/json"},
+        )
+
+    def test_write_with_empty_tag_prefix(self, mock_httpx_client):
+        """Test that an empty tag prefix leaves the tag unchanged."""
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="")
+        records = [{"message": "test"}]
+        writer.write(tag="test-tag", records=records)
+
+        mock_httpx_client.post.assert_called_once_with(
+            "http://localhost:8080/test-tag",
+            json=records[0],
+            headers={"Content-Type": "application/json"},
+        )
+
 
 class TestForwardFluentBitWriter:
     """Tests for the ForwardFluentBitWriter."""
 
@@ -240,7 +265,7 @@ def test_init_with_http_protocol(self, mock_http_writer):
             protocol="http",
             tag_prefix="dstack",
         )
-        mock.assert_called_once_with(host="localhost", port=8080)
+        mock.assert_called_once_with(host="localhost", port=8080, tag_prefix="dstack")
 
     def test_init_with_unsupported_protocol_raises(self):
         with pytest.raises(LogStorageError, match="Unsupported Fluent-bit protocol"):