diff --git a/docs/docs/guides/server-deployment.md b/docs/docs/guides/server-deployment.md
index f1d7546d7..dc5093f2f 100644
--- a/docs/docs/guides/server-deployment.md
+++ b/docs/docs/guides/server-deployment.md
@@ -159,7 +159,7 @@ $ DSTACK_DATABASE_URL=postgresql+asyncpg://user:password@db-host:5432/dstack dst
 By default, `dstack` stores workload logs locally in `~/.dstack/server/projects//logs`.
 For multi-replica server deployments, it's required to store logs externally.
 
-`dstack` supports storing logs using AWS CloudWatch or GCP Logging.
+`dstack` supports storing logs using AWS CloudWatch, GCP Logging, or Fluent-bit with Elasticsearch/OpenSearch.
 
 ### AWS CloudWatch
 
@@ -222,6 +222,78 @@ To store logs using GCP Logging, set the `DSTACK_SERVER_GCP_LOGGING_PROJECT` env
 
+### Fluent-bit
+
+To store logs using Fluent-bit, set the `DSTACK_SERVER_FLUENTBIT_HOST` environment variable.
+The Fluent-bit integration supports two modes, depending on how you want to access logs.
+
+=== "Full mode"
+
+    Logs are shipped to Fluent-bit and can be read back through the dstack UI and CLI via Elasticsearch or OpenSearch.
+    Use this mode when you want a complete integration with log viewing in dstack:
+
+    ```shell
+    $ DSTACK_SERVER_FLUENTBIT_HOST=fluentbit.example.com \
+      DSTACK_SERVER_ELASTICSEARCH_HOST=https://elasticsearch.example.com:9200 \
+      dstack server
+    ```
+
+=== "Ship-only mode"
+
+    Logs are forwarded to Fluent-bit but cannot be read back through `dstack`.
+    The dstack UI/CLI will show empty logs. Use this mode when:
+
+    - You have an existing logging infrastructure (Kibana, Grafana, Datadog, etc.)
+    - You only need to forward logs without reading them back through dstack
+    - You want to reduce operational complexity by not running Elasticsearch/OpenSearch
+
+    ```shell
+    $ DSTACK_SERVER_FLUENTBIT_HOST=fluentbit.example.com \
+      dstack server
+    ```
+
+??? info "Additional configuration"
+    The following optional environment variables can be used to customize the Fluent-bit integration:
+
+    **Fluent-bit settings:**
+
+    - `DSTACK_SERVER_FLUENTBIT_PORT` – The Fluent-bit port. Defaults to `24224`.
+    - `DSTACK_SERVER_FLUENTBIT_PROTOCOL` – The protocol to use: `forward` or `http`. Defaults to `forward`.
+    - `DSTACK_SERVER_FLUENTBIT_TAG_PREFIX` – The tag prefix for logs. Defaults to `dstack`.
+
+    **Elasticsearch/OpenSearch settings (full mode only):**
+
+    - `DSTACK_SERVER_ELASTICSEARCH_HOST` – The Elasticsearch/OpenSearch host for reading logs. If not set, `dstack` runs in ship-only mode.
+    - `DSTACK_SERVER_ELASTICSEARCH_INDEX` – The Elasticsearch/OpenSearch index pattern. Defaults to `dstack-logs`.
+    - `DSTACK_SERVER_ELASTICSEARCH_API_KEY` – The Elasticsearch/OpenSearch API key for authentication.
+
+??? info "Fluent-bit configuration"
+    Configure Fluent-bit to receive logs and forward them to Elasticsearch or OpenSearch. Example configuration:
+
+    ```ini
+    [INPUT]
+        Name forward
+        Listen 0.0.0.0
+        Port 24224
+
+    [OUTPUT]
+        Name es
+        Match dstack.*
+        Host elasticsearch.example.com
+        Port 9200
+        Index dstack-logs
+        Suppress_Type_Name On
+    ```
+
+??? info "Required dependencies"
+    To use Fluent-bit log storage, install the `fluentbit` extras:
+
+    ```shell
+    $ pip install "dstack[all]" -U
+    # or
+    $ pip install "dstack[fluentbit]" -U
+    ```
+
 ## File storage
 
 When using [files](../concepts/dev-environments.md#files) or [repos](../concepts/dev-environments.md#repos), `dstack` uploads local files and diffs to the server so that you can have access to them within runs.
 By default, the files are stored in the DB and each upload is limited to 2MB.
 You can configure an object storage to be used for uploads and increase the default limit by setting the `DSTACK_SERVER_CODE_UPLOAD_LIMIT` environment variable
@@ -426,8 +498,10 @@ If a deployment is stuck due to a deadlock when applying DB migrations, try scal
 
 ??? info "Can I run multiple replicas of dstack server?"
 
-    Yes, you can if you configure `dstack` to use [PostgreSQL](#postgresql) and [AWS CloudWatch](#aws-cloudwatch).
+    Yes, you can if you configure `dstack` to use [PostgreSQL](#postgresql) and external log storage
+    such as [AWS CloudWatch](#aws-cloudwatch), [GCP Logging](#gcp-logging), or [Fluent-bit](#fluent-bit).
 
 ??? info "Does dstack server support blue-green or rolling deployments?"
 
-    Yes, it does if you configure `dstack` to use [PostgreSQL](#postgresql) and [AWS CloudWatch](#aws-cloudwatch).
+    Yes, it does if you configure `dstack` to use [PostgreSQL](#postgresql) and external log storage
+    such as [AWS CloudWatch](#aws-cloudwatch), [GCP Logging](#gcp-logging), or [Fluent-bit](#fluent-bit).
diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md
index 4575f1b8f..62ce97cd1 100644
--- a/docs/docs/reference/environment-variables.md
+++ b/docs/docs/reference/environment-variables.md
@@ -113,6 +113,13 @@ For more details on the options below, refer to the [server deployment](../guide
 - `DSTACK_SERVER_CLOUDWATCH_LOG_GROUP`{ #DSTACK_SERVER_CLOUDWATCH_LOG_GROUP } – The CloudWatch Logs group for storing workloads logs. If not set, the default file-based log storage is used.
 - `DSTACK_SERVER_CLOUDWATCH_LOG_REGION`{ #DSTACK_SERVER_CLOUDWATCH_LOG_REGION } – The CloudWatch Logs region. Defaults to `None`.
 - `DSTACK_SERVER_GCP_LOGGING_PROJECT`{ #DSTACK_SERVER_GCP_LOGGING_PROJECT } – The GCP Logging project for storing workloads logs. If not set, the default file-based log storage is used.
+- `DSTACK_SERVER_FLUENTBIT_HOST`{ #DSTACK_SERVER_FLUENTBIT_HOST } – The Fluent-bit host for log forwarding. If set, enables Fluent-bit log storage.
+- `DSTACK_SERVER_FLUENTBIT_PORT`{ #DSTACK_SERVER_FLUENTBIT_PORT } – The Fluent-bit port. Defaults to `24224`.
+- `DSTACK_SERVER_FLUENTBIT_PROTOCOL`{ #DSTACK_SERVER_FLUENTBIT_PROTOCOL } – The protocol to use: `forward` or `http`. Defaults to `forward`.
+- `DSTACK_SERVER_FLUENTBIT_TAG_PREFIX`{ #DSTACK_SERVER_FLUENTBIT_TAG_PREFIX } – The tag prefix for logs. Defaults to `dstack`.
+- `DSTACK_SERVER_ELASTICSEARCH_HOST`{ #DSTACK_SERVER_ELASTICSEARCH_HOST } – The Elasticsearch/OpenSearch host for reading logs back through dstack. Optional; if not set, Fluent-bit runs in ship-only mode (logs are forwarded but not readable through the dstack UI/CLI).
+- `DSTACK_SERVER_ELASTICSEARCH_INDEX`{ #DSTACK_SERVER_ELASTICSEARCH_INDEX } – The Elasticsearch/OpenSearch index pattern. Defaults to `dstack-logs`.
+- `DSTACK_SERVER_ELASTICSEARCH_API_KEY`{ #DSTACK_SERVER_ELASTICSEARCH_API_KEY } – The Elasticsearch/OpenSearch API key for authentication.
 - `DSTACK_ENABLE_PROMETHEUS_METRICS`{ #DSTACK_ENABLE_PROMETHEUS_METRICS } — Enables Prometheus metrics collection and export.
 - `DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE`{ #DSTACK_DEFAULT_SERVICE_CLIENT_MAX_BODY_SIZE } – Request body size limit for services running with a gateway, in bytes. Defaults to 64 MiB.
 - `DSTACK_SERVICE_CLIENT_TIMEOUT`{ #DSTACK_SERVICE_CLIENT_TIMEOUT } – Timeout in seconds for HTTP requests sent from the in-server proxy and gateways to service replicas. Defaults to 60.
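For reference, records shipped in full mode can also be queried directly, bypassing `dstack`. The following is a minimal sketch, not part of the diff: the host, API key, and stream value are placeholders, while the record fields (`message`, `@timestamp`, `stream`) and the `<project>/<run name>/<job submission id>/<producer>` stream layout match what `FluentBitLogStorage` writes in `fluentbit.py` below.

```python
# Sketch: read dstack logs straight from Elasticsearch/OpenSearch.
# Host, API key, and the stream value are placeholders; the field names
# and the stream layout come from FluentBitLogStorage._write_logs_to_stream
# and _get_stream_name in the diff.
from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=["https://elasticsearch.example.com:9200"], api_key="<api-key>")
response = client.search(
    index="dstack-logs",
    query={"term": {"stream.keyword": "main/my-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job"}},
    sort=[{"@timestamp": {"order": "asc"}}],
    size=100,
)
for hit in response["hits"]["hits"]:
    print(hit["_source"]["@timestamp"], hit["_source"]["message"])
client.close()
```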
diff --git a/pyproject.toml b/pyproject.toml
index c0036ff7b..fc8a7d70a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -216,6 +216,11 @@ nebius = [
     "nebius>=0.3.4,<0.4; python_version >= '3.10'",
     "dstack[server]",
 ]
+fluentbit = [
+    "fluent-logger>=0.10.0",
+    "elasticsearch>=8.0.0",
+    "dstack[server]",
+]
 all = [
-    "dstack[gateway,server,aws,azure,gcp,verda,kubernetes,lambda,nebius,oci]",
+    "dstack[gateway,server,aws,azure,gcp,verda,kubernetes,lambda,nebius,oci,fluentbit]",
 ]
diff --git a/src/dstack/_internal/server/services/logs/__init__.py b/src/dstack/_internal/server/services/logs/__init__.py
index 5b06ff4ad..1f8565d49 100644
--- a/src/dstack/_internal/server/services/logs/__init__.py
+++ b/src/dstack/_internal/server/services/logs/__init__.py
@@ -8,6 +8,7 @@ from dstack._internal.server.schemas.logs import PollLogsRequest
 from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent
 from dstack._internal.server.services.logs import aws as aws_logs
+from dstack._internal.server.services.logs import fluentbit as fluentbit_logs
 from dstack._internal.server.services.logs import gcp as gcp_logs
 from dstack._internal.server.services.logs.base import (
     LogStorage,
     LogStorageError,
@@ -57,6 +58,29 @@ def get_log_storage() -> LogStorage:
             logger.debug("Using GCP Logs storage")
         else:
             logger.error("Cannot use GCP Logs storage: GCP deps are not installed")
+    elif settings.SERVER_FLUENTBIT_HOST:
+        if fluentbit_logs.FLUENTBIT_AVAILABLE:
+            try:
+                _log_storage = fluentbit_logs.FluentBitLogStorage(
+                    host=settings.SERVER_FLUENTBIT_HOST,
+                    port=settings.SERVER_FLUENTBIT_PORT,
+                    protocol=settings.SERVER_FLUENTBIT_PROTOCOL,
+                    tag_prefix=settings.SERVER_FLUENTBIT_TAG_PREFIX,
+                    es_host=settings.SERVER_ELASTICSEARCH_HOST,
+                    es_index=settings.SERVER_ELASTICSEARCH_INDEX,
+                    es_api_key=settings.SERVER_ELASTICSEARCH_API_KEY,
+                )
+            except LogStorageError as e:
+                logger.error("Failed to initialize Fluent-bit Logs storage: %s", e)
+            except Exception:
+                logger.exception("Got exception when initializing Fluent-bit Logs storage")
+            else:
+                if settings.SERVER_ELASTICSEARCH_HOST:
+                    logger.debug("Using Fluent-bit Logs storage with Elasticsearch/OpenSearch")
+                else:
+                    logger.debug("Using Fluent-bit Logs storage in ship-only mode")
+        else:
+            logger.error("Cannot use Fluent-bit Logs storage: fluent-logger is not installed")
     if _log_storage is None:
         _log_storage = FileLogStorage()
         logger.debug("Using file-based storage")
diff --git a/src/dstack/_internal/server/services/logs/fluentbit.py b/src/dstack/_internal/server/services/logs/fluentbit.py
new file mode 100644
index 000000000..b45b2988d
--- /dev/null
+++ b/src/dstack/_internal/server/services/logs/fluentbit.py
@@ -0,0 +1,338 @@
+from typing import List, Optional, Protocol
+from uuid import UUID
+
+import httpx
+
+from dstack._internal.core.errors import ServerClientError
+from dstack._internal.core.models.logs import (
+    JobSubmissionLogs,
+    LogEvent,
+    LogEventSource,
+    LogProducer,
+)
+from dstack._internal.server.models import ProjectModel
+from dstack._internal.server.schemas.logs import PollLogsRequest
+from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent
+from dstack._internal.server.services.logs.base import (
+    LogStorage,
+    LogStorageError,
+    unix_time_ms_to_datetime,
+)
+from dstack._internal.utils.common import batched
+from dstack._internal.utils.logging import get_logger
+
+logger = get_logger(__name__)
+
+
+ELASTICSEARCH_AVAILABLE = True
+try:
+    from elasticsearch import Elasticsearch
+    from elasticsearch.exceptions import ApiError, TransportError
+except ImportError:
+    ELASTICSEARCH_AVAILABLE = False
+else:
+    ElasticsearchError: tuple = (ApiError, TransportError)  # type: ignore[misc]
+
+    class ElasticsearchReader:
+        """Reads logs from Elasticsearch or OpenSearch."""
+
+        def __init__(
+            self,
+            host: str,
+            index: str,
+            api_key: Optional[str] = None,
+        ) -> None:
+            if api_key:
+                self._client = Elasticsearch(hosts=[host], api_key=api_key)
+            else:
+                self._client = Elasticsearch(hosts=[host])
+            self._index = index
+            # Verify connection
+            try:
+                self._client.info()
+            except ElasticsearchError as e:
+                raise LogStorageError(f"Failed to connect to Elasticsearch/OpenSearch: {e}") from e
+
+        def read(
+            self,
+            stream_name: str,
+            request: PollLogsRequest,
+        ) -> JobSubmissionLogs:
+            sort_order = "desc" if request.descending else "asc"
+
+            query: dict = {
+                "bool": {
+                    "must": [
+                        {"term": {"stream.keyword": stream_name}},
+                    ]
+                }
+            }
+
+            if request.start_time:
+                query["bool"].setdefault("filter", []).append(
+                    {"range": {"@timestamp": {"gt": request.start_time.isoformat()}}}
+                )
+            if request.end_time:
+                query["bool"].setdefault("filter", []).append(
+                    {"range": {"@timestamp": {"lt": request.end_time.isoformat()}}}
+                )
+
+            search_params: dict = {
+                "index": self._index,
+                "query": query,
+                "sort": [
+                    {"@timestamp": {"order": sort_order}},
+                    {"_id": {"order": sort_order}},
+                ],
+                "size": request.limit,
+            }
+
+            if request.next_token:
+                parts = request.next_token.split(":", 1)
+                if len(parts) != 2 or not parts[0] or not parts[1]:
+                    raise ServerClientError(
+                        f"Invalid next_token: {request.next_token}. "
+                        "Must be in format 'timestamp:document_id'."
+                    )
+                search_params["search_after"] = [parts[0], parts[1]]
+
+            try:
+                response = self._client.search(**search_params)
+            except ElasticsearchError as e:
+                logger.error("Elasticsearch/OpenSearch search error: %s", e)
+                raise LogStorageError(f"Elasticsearch/OpenSearch error: {e}") from e
+
+            hits = response.get("hits", {}).get("hits", [])
+            logs = []
+            last_sort_values = None
+
+            for hit in hits:
+                source = hit.get("_source", {})
+                timestamp_str = source.get("@timestamp")
+                message = source.get("message", "")
+
+                if timestamp_str:
+                    from datetime import datetime
+
+                    try:
+                        timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
+                    except ValueError:
+                        continue
+                else:
+                    continue
+
+                logs.append(
+                    LogEvent(
+                        timestamp=timestamp,
+                        log_source=LogEventSource.STDOUT,
+                        message=message,
+                    )
+                )
+
+                sort_values = hit.get("sort")
+                if sort_values and len(sort_values) >= 2:
+                    last_sort_values = sort_values
+
+            next_token = None
+            if len(logs) == request.limit and last_sort_values is not None:
+                next_token = f"{last_sort_values[0]}:{last_sort_values[1]}"
+
+            return JobSubmissionLogs(
+                logs=logs,
+                next_token=next_token,
+            )
+
+        def close(self) -> None:
+            self._client.close()
+
+
+FLUENTBIT_AVAILABLE = True
+try:
+    from fluent import sender as fluent_sender
+except ImportError:
+    FLUENTBIT_AVAILABLE = False
+else:
+
+    class FluentBitWriter(Protocol):
+        def write(self, tag: str, records: List[dict]) -> None: ...
+        def close(self) -> None: ...
+
+    class LogReader(Protocol):
+        def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: ...
+        def close(self) -> None: ...
+
+    class HTTPFluentBitWriter:
+        """Writes logs to Fluent-bit via HTTP POST."""
+
+        def __init__(self, host: str, port: int, tag_prefix: str) -> None:
+            self._endpoint = f"http://{host}:{port}"
+            self._client = httpx.Client(timeout=30.0)
+            self._tag_prefix = tag_prefix
+
+        def write(self, tag: str, records: List[dict]) -> None:
+            prefixed_tag = f"{self._tag_prefix}.{tag}" if self._tag_prefix else tag
+            for record in records:
+                try:
+                    response = self._client.post(
+                        f"{self._endpoint}/{prefixed_tag}",
+                        json=record,
+                        headers={"Content-Type": "application/json"},
+                    )
+                    response.raise_for_status()
+                except httpx.HTTPStatusError as e:
+                    logger.error(
+                        "Fluent-bit HTTP request failed with status %d: %s",
+                        e.response.status_code,
+                        e.response.text,
+                    )
+                    raise LogStorageError(
+                        f"Fluent-bit HTTP error: status {e.response.status_code}"
+                    ) from e
+                except httpx.HTTPError as e:
+                    logger.error("Failed to write log to Fluent-bit via HTTP: %s", e)
+                    raise LogStorageError(f"Fluent-bit HTTP error: {e}") from e
+
+        def close(self) -> None:
+            self._client.close()
+
+    class ForwardFluentBitWriter:
+        """Writes logs to Fluent-bit using Forward protocol."""
+
+        def __init__(self, host: str, port: int, tag_prefix: str) -> None:
+            self._sender = fluent_sender.FluentSender(tag_prefix, host=host, port=port)
+            self._tag_prefix = tag_prefix
+
+        def write(self, tag: str, records: List[dict]) -> None:
+            for record in records:
+                if not self._sender.emit(tag, record):
+                    error = self._sender.last_error
+                    logger.error("Failed to write log to Fluent-bit via Forward: %s", error)
+                    self._sender.clear_last_error()
+                    raise LogStorageError(f"Fluent-bit Forward error: {error}")
+
+        def close(self) -> None:
+            self._sender.close()
+
+    class NullLogReader:
+        """
+        Null reader for ship-only mode (no Elasticsearch/OpenSearch configured).
+
+        Returns empty logs. Useful when logs are shipped to an external system
+        that is accessed directly rather than through dstack.
+        """
+
+        def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs:
+            return JobSubmissionLogs(logs=[], next_token=None)
+
+        def close(self) -> None:
+            pass
+
+    class FluentBitLogStorage(LogStorage):
+        """
+        Log storage using Fluent-bit for writing and optionally Elasticsearch/OpenSearch for reading.
+
+        Supports two modes:
+        - Full mode: Writes to Fluent-bit and reads from Elasticsearch/OpenSearch
+        - Ship-only mode: Writes to Fluent-bit only (no reading, returns empty logs)
+        """
+
+        MAX_BATCH_SIZE = 100
+
+        def __init__(
+            self,
+            host: str,
+            port: int,
+            protocol: str,
+            tag_prefix: str,
+            es_host: Optional[str] = None,
+            es_index: str = "dstack-logs",
+            es_api_key: Optional[str] = None,
+        ) -> None:
+            self._tag_prefix = tag_prefix
+
+            if protocol == "http":
+                self._writer: FluentBitWriter = HTTPFluentBitWriter(
+                    host=host, port=port, tag_prefix=tag_prefix
+                )
+            elif protocol == "forward":
+                self._writer = ForwardFluentBitWriter(host=host, port=port, tag_prefix=tag_prefix)
+            else:
+                raise LogStorageError(f"Unsupported Fluent-bit protocol: {protocol}")
+
+            self._reader: LogReader
+            if es_host:
+                if not ELASTICSEARCH_AVAILABLE:
+                    raise LogStorageError(
+                        "Elasticsearch/OpenSearch host configured but elasticsearch package "
+                        "is not installed. Install with: pip install elasticsearch"
+                    )
+                self._reader = ElasticsearchReader(
+                    host=es_host,
+                    index=es_index,
+                    api_key=es_api_key,
+                )
+                logger.debug(
+                    "Fluent-bit log storage initialized with Elasticsearch/OpenSearch reader"
+                )
+            else:
+                self._reader = NullLogReader()
+                logger.info(
+                    "Fluent-bit log storage initialized in ship-only mode "
+                    "(no Elasticsearch/OpenSearch configured for reading)"
+                )
+
+        def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmissionLogs:
+            producer = LogProducer.RUNNER if request.diagnose else LogProducer.JOB
+            stream_name = self._get_stream_name(
+                project_name=project.name,
+                run_name=request.run_name,
+                job_submission_id=request.job_submission_id,
+                producer=producer,
+            )
+            return self._reader.read(stream_name=stream_name, request=request)
+
+        def write_logs(
+            self,
+            project: ProjectModel,
+            run_name: str,
+            job_submission_id: UUID,
+            runner_logs: List[RunnerLogEvent],
+            job_logs: List[RunnerLogEvent],
+        ) -> None:
+            producers_with_logs = [(LogProducer.RUNNER, runner_logs), (LogProducer.JOB, job_logs)]
+            for producer, producer_logs in producers_with_logs:
+                if not producer_logs:
+                    continue
+                stream_name = self._get_stream_name(
+                    project_name=project.name,
+                    run_name=run_name,
+                    job_submission_id=job_submission_id,
+                    producer=producer,
+                )
+                self._write_logs_to_stream(stream_name=stream_name, logs=producer_logs)
+
+        def _write_logs_to_stream(self, stream_name: str, logs: List[RunnerLogEvent]) -> None:
+            for batch in batched(logs, self.MAX_BATCH_SIZE):
+                records = []
+                for log in batch:
+                    message = log.message.decode(errors="replace")
+                    timestamp = unix_time_ms_to_datetime(log.timestamp)
+                    records.append(
+                        {
+                            "message": message,
+                            "@timestamp": timestamp.isoformat(),
+                            "stream": stream_name,
+                        }
+                    )
+                self._writer.write(tag=stream_name, records=records)
+
+        def close(self) -> None:
+            try:
+                self._writer.close()
+            finally:
+                self._reader.close()
+
+        def _get_stream_name(
+            self, project_name: str, run_name: str, job_submission_id: UUID, producer: LogProducer
+        ) -> str:
+            return f"{project_name}/{run_name}/{job_submission_id}/{producer.value}"
diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py
index 74d1d7b8d..6e5c8e4bc 100644
--- a/src/dstack/_internal/server/settings.py
+++ b/src/dstack/_internal/server/settings.py
@@ -78,6 +78,15 @@
 
 SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT")
 
+SERVER_FLUENTBIT_HOST = os.getenv("DSTACK_SERVER_FLUENTBIT_HOST")
+SERVER_FLUENTBIT_PORT = int(os.getenv("DSTACK_SERVER_FLUENTBIT_PORT", "24224"))
+SERVER_FLUENTBIT_PROTOCOL = os.getenv("DSTACK_SERVER_FLUENTBIT_PROTOCOL", "forward")
+SERVER_FLUENTBIT_TAG_PREFIX = os.getenv("DSTACK_SERVER_FLUENTBIT_TAG_PREFIX", "dstack")
+
+SERVER_ELASTICSEARCH_HOST = os.getenv("DSTACK_SERVER_ELASTICSEARCH_HOST")
+SERVER_ELASTICSEARCH_INDEX = os.getenv("DSTACK_SERVER_ELASTICSEARCH_INDEX", "dstack-logs")
+SERVER_ELASTICSEARCH_API_KEY = os.getenv("DSTACK_SERVER_ELASTICSEARCH_API_KEY")
+
 SERVER_METRICS_RUNNING_TTL_SECONDS = environ.get_int(
     "DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS", default=3600
 )
diff --git a/src/tests/_internal/server/services/test_fluentbit_logs.py b/src/tests/_internal/server/services/test_fluentbit_logs.py
new file mode 100644
index 000000000..937838e01
--- /dev/null
+++ b/src/tests/_internal/server/services/test_fluentbit_logs.py
@@ -0,0 +1,659 @@
+from datetime import datetime, timezone
+from unittest.mock import Mock, patch
+from uuid import UUID
+
+import pytest
+import pytest_asyncio
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from dstack._internal.core.errors import ServerClientError
+from dstack._internal.server.models import ProjectModel
+from dstack._internal.server.schemas.logs import PollLogsRequest
+from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent
+from dstack._internal.server.services.logs.base import LogStorageError
+from dstack._internal.server.services.logs.fluentbit import (
+    ELASTICSEARCH_AVAILABLE,
+    FLUENTBIT_AVAILABLE,
+)
+from dstack._internal.server.testing.common import create_project
+
+pytestmark = pytest.mark.skipif(not FLUENTBIT_AVAILABLE, reason="fluent-logger not installed")
+
+# Conditionally import classes that are only defined when FLUENTBIT_AVAILABLE is True
+if FLUENTBIT_AVAILABLE:
+    from dstack._internal.server.services.logs.fluentbit import (
+        FluentBitLogStorage,
+        ForwardFluentBitWriter,
+        HTTPFluentBitWriter,
+        NullLogReader,
+    )
+
+    if ELASTICSEARCH_AVAILABLE:
+        from dstack._internal.server.services.logs.fluentbit import ElasticsearchReader
+
+
+class TestNullLogReader:
+    """Tests for the NullLogReader (ship-only mode)."""
+
+    def test_read_returns_empty_logs(self):
+        reader = NullLogReader()
+        request = PollLogsRequest(
+            run_name="test-run",
+            job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+            limit=100,
+        )
+        result = reader.read("test-stream", request)
+
+        assert result.logs == []
+        assert result.next_token is None
+
+    def test_close_does_nothing(self):
+        reader = NullLogReader()
+        reader.close()  # Should not raise
+
+
+class TestHTTPFluentBitWriter:
+    """Tests for the HTTPFluentBitWriter."""
+
+    @pytest.fixture
+    def mock_httpx_client(self):
+        with patch("dstack._internal.server.services.logs.fluentbit.httpx.Client") as mock:
+            yield mock.return_value
+
+    def test_init_creates_client(self, mock_httpx_client):
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+        assert writer._endpoint == "http://localhost:8080"
+        assert writer._tag_prefix == "dstack"
+
+    def test_write_posts_records(self, mock_httpx_client):
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+        records = [
+            {"message": "Hello", "@timestamp": "2023-10-06T10:00:00+00:00"},
+            {"message": "World", "@timestamp": "2023-10-06T10:00:01+00:00"},
+        ]
+        writer.write(tag="test-tag", records=records)
+
+        assert mock_httpx_client.post.call_count == 2
+        mock_httpx_client.post.assert_any_call(
+            "http://localhost:8080/dstack.test-tag",
+            json=records[0],
+            headers={"Content-Type": "application/json"},
+        )
+        mock_httpx_client.post.assert_any_call(
+            "http://localhost:8080/dstack.test-tag",
+            json=records[1],
+            headers={"Content-Type": "application/json"},
+        )
+
+    def test_write_calls_raise_for_status(self, mock_httpx_client):
+        """Test that response.raise_for_status() is called to detect non-2xx responses."""
+        mock_response = Mock()
+        mock_httpx_client.post.return_value = mock_response
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+
+        writer.write(tag="test-tag", records=[{"message": "test"}])
+
+        mock_response.raise_for_status.assert_called_once()
+
+    def test_write_raises_on_http_status_error(self, mock_httpx_client):
+        """Test that 4xx/5xx responses are properly detected and raise LogStorageError."""
+        import httpx
+
+        mock_response = Mock()
+        mock_response.status_code = 500
+        mock_response.text = "Internal Server Error"
+        mock_httpx_client.post.return_value = mock_response
+        mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
+            "Server Error", request=Mock(), response=mock_response
+        )
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+
+        with pytest.raises(LogStorageError, match="Fluent-bit HTTP error: status 500"):
+            writer.write(tag="test-tag", records=[{"message": "test"}])
+
+    def test_write_raises_on_transport_error(self, mock_httpx_client):
+        import httpx
+
+        mock_httpx_client.post.side_effect = httpx.HTTPError("Connection failed")
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+
+        with pytest.raises(LogStorageError, match="Fluent-bit HTTP error"):
+            writer.write(tag="test-tag", records=[{"message": "test"}])
+
+    def test_close_closes_client(self, mock_httpx_client):
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+        writer.close()
+        mock_httpx_client.close.assert_called_once()
+
+    def test_write_applies_tag_prefix(self, mock_httpx_client):
+        """Test that tag prefix is applied to tags in HTTP requests."""
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
+        records = [{"message": "test"}]
+        writer.write(tag="project/run/job", records=records)
+
+        mock_httpx_client.post.assert_called_once_with(
+            "http://localhost:8080/dstack.project/run/job",
+            json=records[0],
+            headers={"Content-Type": "application/json"},
+        )
+
+    def test_write_with_empty_tag_prefix(self, mock_httpx_client):
+        """Test that empty tag prefix doesn't break the tag."""
+        writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="")
+        records = [{"message": "test"}]
+        writer.write(tag="test-tag", records=records)
+
+        mock_httpx_client.post.assert_called_once_with(
+            "http://localhost:8080/test-tag",
+            json=records[0],
+            headers={"Content-Type": "application/json"},
+        )
+
+
+class TestForwardFluentBitWriter:
+    """Tests for the ForwardFluentBitWriter."""
+
+    @pytest.fixture
+    def mock_fluent_sender(self):
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender"
+        ) as mock:
+            mock_instance = Mock()
+            mock_instance.emit.return_value = True
+            mock.return_value = mock_instance
+            yield mock_instance
+
+    def test_init_creates_sender(self, mock_fluent_sender):
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender"
+        ) as mock:
+            mock.return_value = mock_fluent_sender
+            ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack")
+            mock.assert_called_once_with("dstack", host="localhost", port=24224)
+
+    def test_write_emits_records(self, mock_fluent_sender):
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender"
+        ) as mock:
+            mock.return_value = mock_fluent_sender
+            writer = ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack")
+
+            records = [
+                {"message": "Hello"},
+                {"message": "World"},
+            ]
+            writer.write(tag="test-tag", records=records)
+
+            assert mock_fluent_sender.emit.call_count == 2
+
+    def test_write_raises_on_emit_failure(self, mock_fluent_sender):
+        mock_fluent_sender.emit.return_value = False
+        mock_fluent_sender.last_error = Exception("Connection refused")
+
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender"
+        ) as mock:
+            mock.return_value = mock_fluent_sender
+            writer = ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack")
+
+            with pytest.raises(LogStorageError, match="Fluent-bit Forward error"):
+                writer.write(tag="test-tag", records=[{"message": "test"}])
+
+            mock_fluent_sender.clear_last_error.assert_called_once()
+
+    def test_close_closes_sender(self, mock_fluent_sender):
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.fluent_sender.FluentSender"
+        ) as mock:
+            mock.return_value = mock_fluent_sender
+            writer = ForwardFluentBitWriter(host="localhost", port=24224, tag_prefix="dstack")
+            writer.close()
+            mock_fluent_sender.close.assert_called_once()
+
+
+class TestFluentBitLogStorage:
+    """Tests for the FluentBitLogStorage."""
+
+    @pytest_asyncio.fixture
+    async def project(self, test_db, session: AsyncSession) -> ProjectModel:
+        project = await create_project(session=session, name="test-proj")
+        return project
+
+    @pytest.fixture
+    def mock_forward_writer(self):
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock_instance = Mock()
+            mock.return_value = mock_instance
+            yield mock_instance
+
+    @pytest.fixture
+    def mock_http_writer(self):
+        with patch("dstack._internal.server.services.logs.fluentbit.HTTPFluentBitWriter") as mock:
+            mock_instance = Mock()
+            mock.return_value = mock_instance
+            yield mock_instance
+
+    @pytest.fixture
+    def mock_es_reader(self):
+        with patch("dstack._internal.server.services.logs.fluentbit.ElasticsearchReader") as mock:
+            mock_instance = Mock()
+            mock.return_value = mock_instance
+            yield mock_instance
+
+    def test_init_with_forward_protocol(self, mock_forward_writer):
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+            mock.assert_called_once_with(host="localhost", port=24224, tag_prefix="dstack")
+            assert isinstance(storage._reader, NullLogReader)
+
+    def test_init_with_http_protocol(self, mock_http_writer):
+        with patch("dstack._internal.server.services.logs.fluentbit.HTTPFluentBitWriter") as mock:
+            mock.return_value = mock_http_writer
+            FluentBitLogStorage(
+                host="localhost",
+                port=8080,
+                protocol="http",
+                tag_prefix="dstack",
+            )
+            mock.assert_called_once_with(host="localhost", port=8080, tag_prefix="dstack")
+
+    def test_init_with_unsupported_protocol_raises(self):
+        with pytest.raises(LogStorageError, match="Unsupported Fluent-bit protocol"):
+            FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="grpc",
+                tag_prefix="dstack",
+            )
+
+    def test_init_ship_only_mode(self, mock_forward_writer):
+        """Test initialization without Elasticsearch (ship-only mode)."""
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+                es_host=None,
+            )
+            assert isinstance(storage._reader, NullLogReader)
+
+    @pytest.mark.skipif(not ELASTICSEARCH_AVAILABLE, reason="elasticsearch not installed")
+    def test_init_with_elasticsearch(self, mock_forward_writer, mock_es_reader):
+        """Test initialization with Elasticsearch configured."""
+        with (
+            patch(
+                "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+            ) as writer_mock,
+            patch(
+                "dstack._internal.server.services.logs.fluentbit.ElasticsearchReader"
+            ) as reader_mock,
+        ):
+            writer_mock.return_value = mock_forward_writer
+            reader_mock.return_value = mock_es_reader
+
+            FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+                es_host="http://elasticsearch:9200",
+                es_index="dstack-logs",
+                es_api_key="test-key",
+            )
+            reader_mock.assert_called_once_with(
+                host="http://elasticsearch:9200",
+                index="dstack-logs",
+                api_key="test-key",
+            )
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
+    async def test_write_logs(self, test_db, project: ProjectModel, mock_forward_writer):
+        """Test writing logs to Fluent-bit."""
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+
+            storage.write_logs(
+                project=project,
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                runner_logs=[
+                    RunnerLogEvent(timestamp=1696586513234, message=b"Runner log"),
+                ],
+                job_logs=[
+                    RunnerLogEvent(timestamp=1696586513235, message=b"Job log"),
+                ],
+            )
+
+            assert mock_forward_writer.write.call_count == 2
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
+    async def test_write_logs_empty_logs_not_written(
+        self, test_db, project: ProjectModel, mock_forward_writer
+    ):
+        """Test that empty log lists are not written."""
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+
+            storage.write_logs(
+                project=project,
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                runner_logs=[],
+                job_logs=[],
+            )
+
+            mock_forward_writer.write.assert_not_called()
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
+    async def test_poll_logs_ship_only_mode(self, test_db, project: ProjectModel):
+        """Test that ship-only mode returns empty logs."""
+        with patch("dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"):
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+
+            request = PollLogsRequest(
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                limit=100,
+            )
+            result = storage.poll_logs(project, request)
+
+            assert result.logs == []
+            assert result.next_token is None
+
+    def test_close_closes_writer_and_reader(self, mock_forward_writer):
+        """Test that close() closes both writer and reader."""
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+
+            storage.close()
+
+            mock_forward_writer.close.assert_called_once()
+
+    def test_close_closes_reader_even_if_writer_fails(self, mock_forward_writer):
+        """Test that reader is closed even if writer.close() raises an exception."""
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock_forward_writer.close.side_effect = Exception("Writer close failed")
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+            mock_reader = Mock()
+            storage._reader = mock_reader
+
+            with pytest.raises(Exception, match="Writer close failed"):
+                storage.close()
+
+            mock_reader.close.assert_called_once()
+
+    def test_get_stream_name(self, mock_forward_writer):
+        """Test stream name generation."""
+        from dstack._internal.core.models.logs import LogProducer
+
+        with patch(
+            "dstack._internal.server.services.logs.fluentbit.ForwardFluentBitWriter"
+        ) as mock:
+            mock.return_value = mock_forward_writer
+            storage = FluentBitLogStorage(
+                host="localhost",
+                port=24224,
+                protocol="forward",
+                tag_prefix="dstack",
+            )
+
+            stream_name = storage._get_stream_name(
+                project_name="my-project",
+                run_name="my-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                producer=LogProducer.JOB,
+            )
+
+            assert stream_name == "my-project/my-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job"
+
+
+@pytest.mark.skipif(
+    not FLUENTBIT_AVAILABLE or not ELASTICSEARCH_AVAILABLE,
+    reason="fluent-logger or elasticsearch not installed",
+)
+class TestElasticsearchReader:
+    """Tests for the ElasticsearchReader."""
+
+    @pytest.fixture
+    def mock_es_client(self):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock_instance = Mock()
+            mock_instance.info.return_value = {"version": {"number": "8.0.0"}}
+            mock_instance.search.return_value = {"hits": {"hits": []}}
+            mock.return_value = mock_instance
+            yield mock_instance
+
+    def test_init_verifies_connection(self, mock_es_client):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+            mock_es_client.info.assert_called_once()
+
+    def test_init_with_api_key(self, mock_es_client):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+                api_key="test-api-key",
+            )
+            mock.assert_called_once_with(hosts=["http://localhost:9200"], api_key="test-api-key")
+
+    def test_init_connection_error_raises(self):
+        from elasticsearch.exceptions import ConnectionError as ESConnectionError
+
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock_instance = Mock()
+            mock_instance.info.side_effect = ESConnectionError("Connection refused")
+            mock.return_value = mock_instance
+
+            with pytest.raises(LogStorageError, match="Failed to connect"):
+                ElasticsearchReader(
+                    host="http://localhost:9200",
+                    index="dstack-logs",
+                )
+
+    def test_read_returns_logs(self, mock_es_client):
+        mock_es_client.search.return_value = {
+            "hits": {
+                "hits": [
+                    {
+                        "_source": {
+                            "@timestamp": "2023-10-06T10:01:53.234000+00:00",
+                            "message": "Hello",
+                            "stream": "test-stream",
+                        },
+                        "sort": [1696586513234, "doc1"],
+                    },
+                    {
+                        "_source": {
+                            "@timestamp": "2023-10-06T10:01:53.235000+00:00",
+                            "message": "World",
+                            "stream": "test-stream",
+                        },
+                        "sort": [1696586513235, "doc2"],
+                    },
+                ]
+            }
+        }
+
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            reader = ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+
+            request = PollLogsRequest(
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                limit=2,
+            )
+            result = reader.read("test-stream", request)
+
+            assert len(result.logs) == 2
+            assert result.logs[0].message == "Hello"
+            assert result.logs[1].message == "World"
+            assert result.next_token == "1696586513235:doc2"
+
+    def test_read_with_time_filtering(self, mock_es_client):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            reader = ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+
+            request = PollLogsRequest(
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                start_time=datetime(2023, 10, 6, 10, 0, 0, tzinfo=timezone.utc),
+                end_time=datetime(2023, 10, 6, 11, 0, 0, tzinfo=timezone.utc),
+                limit=100,
+            )
+            reader.read("test-stream", request)
+
+            call_args = mock_es_client.search.call_args
+            query = call_args.kwargs["query"]
+            assert "filter" in query["bool"]
+            assert len(query["bool"]["filter"]) == 2
+
+    def test_read_descending_order(self, mock_es_client):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            reader = ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+
+            request = PollLogsRequest(
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                limit=100,
+                descending=True,
+            )
+            reader.read("test-stream", request)
+
+            call_args = mock_es_client.search.call_args
+            assert call_args.kwargs["sort"] == [
+                {"@timestamp": {"order": "desc"}},
+                {"_id": {"order": "desc"}},
+            ]
+
+    def test_read_with_next_token(self, mock_es_client):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            reader = ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+
+            request = PollLogsRequest(
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                next_token="1696586513234:doc1",
+                limit=100,
+            )
+            reader.read("test-stream", request)
+
+            call_args = mock_es_client.search.call_args
+            assert call_args.kwargs["search_after"] == ["1696586513234", "doc1"]
+
+    def test_read_with_malformed_next_token_raises_client_error(self, mock_es_client):
+        """Test that malformed next_token raises ServerClientError (400) instead of IndexError (500)."""
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            reader = ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+
+            request = PollLogsRequest(
+                run_name="test-run",
+                job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
+                next_token="invalid_token_no_colon",
+                limit=100,
+            )
+            with pytest.raises(ServerClientError, match="Invalid next_token"):
+                reader.read("test-stream", request)
+
+            request.next_token = ":"
+            with pytest.raises(ServerClientError, match="Invalid next_token"):
+                reader.read("test-stream", request)
+
+            request.next_token = ":doc1"
+            with pytest.raises(ServerClientError, match="Invalid next_token"):
+                reader.read("test-stream", request)
+
+            request.next_token = "1696586513234:"
+            with pytest.raises(ServerClientError, match="Invalid next_token"):
+                reader.read("test-stream", request)
+
+            mock_es_client.search.assert_not_called()
+
+    def test_close_closes_client(self, mock_es_client):
+        with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
+            mock.return_value = mock_es_client
+            reader = ElasticsearchReader(
+                host="http://localhost:9200",
+                index="dstack-logs",
+            )
+            reader.close()
+            mock_es_client.close.assert_called_once()