Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 152 additions & 21 deletions src/authentication/k8s.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Manage authentication flow for FastAPI endpoints with K8S/OCP."""

import os
from http import HTTPStatus
from typing import Optional, Self, cast

import kubernetes.client
Expand Down Expand Up @@ -29,8 +30,45 @@
)


class ClusterIDUnavailableError(Exception):
"""Cluster ID is not available."""
class K8sAuthenticationError(Exception):
    """Base exception for Kubernetes authentication errors.

    Root of this module's Kubernetes error hierarchy:
    K8sAPIConnectionError and K8sConfigurationError both derive from it, so
    a caller can catch every Kubernetes authentication failure with a single
    ``except K8sAuthenticationError`` clause.
    """


class K8sAPIConnectionError(K8sAuthenticationError):
    """Cannot connect to Kubernetes API server.

    Indicates transient failures that may be resolved by retrying.
    Maps to HTTP 503 Service Unavailable.

    Raised while fetching the ClusterVersion when the ``ApiException``
    carries no status code, a 5xx status, or 429 (Too Many Requests).
    """


class K8sConfigurationError(K8sAuthenticationError):
    """Kubernetes cluster configuration issue.

    Indicates persistent configuration problems requiring admin intervention.
    Maps to HTTP 500 Internal Server Error.

    Raised directly for Kubernetes API client errors that have no more
    specific subclass; ClusterVersionNotFoundError,
    ClusterVersionPermissionError and InvalidClusterVersionError derive from
    it, so catching this class handles all of them.
    """


class ClusterVersionNotFoundError(K8sConfigurationError):
    """ClusterVersion resource not found in OpenShift cluster.

    Raised when the ClusterVersion custom resource does not exist, i.e. the
    Kubernetes API answers the lookup with HTTP 404.
    """


class ClusterVersionPermissionError(K8sConfigurationError):
    """No permission to access ClusterVersion resource.

    Raised when RBAC denies access to the ClusterVersion resource, i.e. the
    Kubernetes API answers the lookup with HTTP 403.
    """


class InvalidClusterVersionError(K8sConfigurationError):
    """ClusterVersion resource has invalid structure or missing required fields.

    Raised when the ClusterVersion exists but its ``spec`` is not a mapping,
    or ``spec.clusterID`` is missing, is not a string, or is blank.
    """


class K8sClientSingleton:
Expand Down Expand Up @@ -156,8 +194,10 @@ def _get_cluster_id(cls) -> str:
str: The cluster's `clusterID`.

Raises:
ClusterIDUnavailableError: If the cluster ID cannot be obtained due
to missing keys, an API error, or any unexpected error.
K8sAPIConnectionError: If the Kubernetes API is unreachable or returns 5xx errors.
ClusterVersionNotFoundError: If the ClusterVersion resource does not exist (404).
ClusterVersionPermissionError: If access to ClusterVersion is denied (403).
InvalidClusterVersionError: If ClusterVersion has invalid structure or missing fields.
"""
try:
custom_objects_api = cls.get_custom_objects_api()
Expand All @@ -170,27 +210,64 @@ def _get_cluster_id(cls) -> str:
)
spec = version_data.get("spec")
if not isinstance(spec, dict):
raise ClusterIDUnavailableError(
raise InvalidClusterVersionError(
"Missing or invalid 'spec' in ClusterVersion"
)
cluster_id = spec.get("clusterID")
if not isinstance(cluster_id, str) or not cluster_id.strip():
raise ClusterIDUnavailableError(
raise InvalidClusterVersionError(
"Missing or invalid 'clusterID' in ClusterVersion"
)
cls._cluster_id = cluster_id
return cluster_id
except KeyError as e:
except ApiException as e:
# Handle specific HTTP status codes from Kubernetes API
if e.status is None:
# No status code indicates a connection/network issue
logger.error("Kubernetes API error with no status code: %s", e.reason)
raise K8sAPIConnectionError(
f"Failed to connect to Kubernetes API: {e.reason}"
) from e

if e.status == HTTPStatus.NOT_FOUND:
logger.error(
"ClusterVersion resource 'version' not found in cluster: %s",
e.reason,
)
raise ClusterVersionNotFoundError(
"ClusterVersion 'version' resource not found in OpenShift cluster"
) from e
if e.status == HTTPStatus.FORBIDDEN:
logger.error(
"Permission denied to access ClusterVersion resource: %s", e.reason
)
raise ClusterVersionPermissionError(
"Insufficient permissions to read ClusterVersion resource"
) from e
# Classify errors by status code range
# 5xx errors and 429 (rate limit) are transient - map to 503
if (
e.status >= HTTPStatus.INTERNAL_SERVER_ERROR
or e.status == HTTPStatus.TOO_MANY_REQUESTS
):
logger.error(
"Kubernetes API unavailable while fetching ClusterVersion (status %s): %s",
e.status,
e.reason,
)
raise K8sAPIConnectionError(
f"Failed to connect to Kubernetes API: {e.reason} (status {e.status})"
) from e
# All other errors (4xx client errors) are configuration issues - map to 500
logger.error(
"Failed to get cluster_id from cluster, missing keys in version object"
"Kubernetes API returned client error while fetching "
"ClusterVersion (status %s): %s",
e.status,
e.reason,
)
raise ClusterIDUnavailableError("Failed to get cluster ID") from e
except ApiException as e:
logger.error("API exception during ClusterInfo: %s", e)
raise ClusterIDUnavailableError("Failed to get cluster ID") from e
except Exception as e:
logger.error("Unexpected error during getting cluster ID: %s", e)
raise ClusterIDUnavailableError("Failed to get cluster ID") from e
raise K8sConfigurationError(
f"Kubernetes API request failed: {e.reason} (status {e.status})"
) from e

@classmethod
def get_cluster_id(cls) -> str:
Expand All @@ -207,7 +284,10 @@ def get_cluster_id(cls) -> str:
str: The cluster identifier.

Raises:
ClusterIDUnavailableError: If running in-cluster and fetching the cluster ID fails.
K8sAPIConnectionError: If the Kubernetes API is unreachable.
ClusterVersionNotFoundError: If the ClusterVersion resource does not exist.
ClusterVersionPermissionError: If access to ClusterVersion is denied.
InvalidClusterVersionError: If ClusterVersion has invalid structure.
"""
if cls._instance is None:
cls()
Expand All @@ -230,7 +310,10 @@ def get_user_info(token: str) -> Optional[kubernetes.client.V1TokenReviewStatus]
The V1TokenReviewStatus if the token is valid, None otherwise.

Raises:
HTTPException: If unable to connect to Kubernetes API or unexpected error occurs.
HTTPException:
503 if Kubernetes API is unavailable (5xx errors, 429 rate limit).
503 if unable to initialize Kubernetes client.
500 if Kubernetes API configuration issue (4xx errors).
"""
try:
auth_api = K8sClientSingleton.get_authn_api()
Expand All @@ -254,8 +337,47 @@ def get_user_info(token: str) -> Optional[kubernetes.client.V1TokenReviewStatus]
if status is not None and status.authenticated:
return status
return None
except ApiException as e:
if e.status is None:
logger.error(
"Kubernetes API error during TokenReview with no status code: %s",
e.reason,
)
response = ServiceUnavailableResponse(
backend_name="Kubernetes API",
cause=f"Failed to connect to Kubernetes API: {e.reason}",
)
raise HTTPException(**response.model_dump()) from e

# 5xx errors and 429 (rate limit) are transient - map to 503
if (
e.status >= HTTPStatus.INTERNAL_SERVER_ERROR
or e.status == HTTPStatus.TOO_MANY_REQUESTS
):
logger.error(
"Kubernetes API unavailable during TokenReview (status %s): %s",
e.status,
e.reason,
)
response = ServiceUnavailableResponse(
backend_name="Kubernetes API",
cause=f"Kubernetes API unavailable: {e.reason} (status {e.status})",
)
raise HTTPException(**response.model_dump()) from e

# All other errors (4xx client errors) are configuration issues - map to 500
logger.error(
"Kubernetes API returned client error during TokenReview (status %s): %s",
e.status,
e.reason,
)
response_obj = InternalServerErrorResponse(
response="Internal server error",
cause=f"Kubernetes API request failed: {e.reason} (status {e.status})",
)
raise HTTPException(**response_obj.model_dump()) from e
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("API exception during TokenReview: %s", e)
logger.error("Unexpected error during TokenReview: %s", e)
return None


Expand Down Expand Up @@ -325,11 +447,20 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
if user.username == "kube:admin":
try:
user.uid = K8sClientSingleton.get_cluster_id()
except ClusterIDUnavailableError as e:
logger.error("Failed to get cluster ID: %s", e)
except K8sAPIConnectionError as e:
# Kubernetes API is unreachable - return 503
logger.error("Cannot connect to Kubernetes API: %s", e)
response = ServiceUnavailableResponse(
backend_name="Kubernetes API",
cause=str(e),
)
raise HTTPException(**response.model_dump()) from e
except K8sConfigurationError as e:
# Cluster misconfiguration or client error - return 500
logger.error("Cluster configuration error: %s", e)
response = InternalServerErrorResponse(
response="Internal server error",
cause="Unable to retrieve cluster ID",
cause=str(e),
)
Comment on lines 461 to 464
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail
nl -ba src/models/responses.py | sed -n '1689,1715p'
nl -ba src/models/responses.py | sed -n '2367,2415p'
nl -ba src/authentication/k8s.py | sed -n '395,401p'

Repository: lightspeed-core/lightspeed-stack

Length of output: 118


🏁 Script executed:

# View the line ranges from responses.py and k8s.py using sed
sed -n '1689,1715p' src/models/responses.py
echo "---"
sed -n '2367,2415p' src/models/responses.py
echo "---"
sed -n '395,401p' src/authentication/k8s.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 3289


🏁 Script executed:

# Check HTTPStatus import in k8s.py
head -50 src/authentication/k8s.py | grep -i "httpstatus\|from http"

# Find other InternalServerErrorResponse usages in the codebase
rg "InternalServerErrorResponse" -A 2 -B 1

# Check if HTTPStatus is imported
rg "HTTPStatus" src/authentication/k8s.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 50388


Add missing status_code parameter to InternalServerErrorResponse constructor.

Lines 397-400 construct `InternalServerErrorResponse` without the required `status_code` argument. Since `AbstractErrorResponse.__init__` requires `status_code: int` as a keyword-only argument, this will raise a `TypeError` at runtime.

💡 Proposed fix
                 response = InternalServerErrorResponse(
                     response="Internal server error",
                     cause=str(e),
+                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/authentication/k8s.py` around lines 397 - 400, The constructor call
creating InternalServerErrorResponse is missing the required keyword-only
status_code argument (AbstractErrorResponse.__init__ requires status_code: int),
causing a TypeError at runtime; update the instantiation of
InternalServerErrorResponse (where it's created) to pass status_code=500 (or the
appropriate HTTP status constant) along with response and cause so the object
initializes correctly.

raise HTTPException(**response.model_dump()) from e

Expand Down
33 changes: 32 additions & 1 deletion src/models/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,27 @@ class InternalServerErrorResponse(AbstractErrorResponse):
"cause": "Failed to query the database",
},
},
{
"label": "cluster version not found",
"detail": {
"response": "Internal server error",
"cause": "ClusterVersion 'version' resource not found in OpenShift cluster",
},
},
{
"label": "cluster version permission denied",
"detail": {
"response": "Internal server error",
"cause": "Insufficient permissions to read ClusterVersion resource",
},
},
{
"label": "invalid cluster version",
"detail": {
"response": "Internal server error",
"cause": "ClusterVersion missing required field: 'clusterID'",
},
},
]
}
}
Expand Down Expand Up @@ -2537,7 +2558,17 @@ class ServiceUnavailableResponse(AbstractErrorResponse):
"response": "Unable to connect to Llama Stack",
"cause": "Connection error while trying to reach backend service.",
},
}
},
{
"label": "kubernetes api",
"detail": {
"response": "Unable to connect to Kubernetes API",
"cause": (
"Failed to connect to Kubernetes API: "
"Service Unavailable (status 503)"
),
},
},
]
}
}
Expand Down
Loading
Loading