Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ def _get_current_time() -> datetime.datetime:
from .errors import ItemNotFoundError


@dataclasses.dataclass(kw_only=True)
class ExecutionStatusSummary:
total_executions: int = 0
ended_executions: int = 0
has_ended: bool = False

def count_execution_status(
self, *, status: bts.ContainerExecutionStatus, count: int
) -> None:
self.total_executions += count
if status in bts.CONTAINER_STATUSES_ENDED:
self.ended_executions += count

self.has_ended = self.ended_executions == self.total_executions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pattern I would typically try to avoid in other programming languages. Here we are creating a an instance of the class with some initial state, then calling a function on that instance to later on to hydrate the instance with some values, to then return the instance to a caller.

What I would strive to due instead is have a pure / read only model that doesn't write to itself, or get written to after instatiation, but instead gets instantiated with its final values in the constructor. In this case, you could have a function build_execution_status_summar_from_stats that calculates the model attributes, instantiates the model, and returns the model in a read only state.

I could give examples of how this would look in other languages but that wouldn't be helpful.

Here is what Gemini says about implementing that in Python in a "pythonic" way:

@dataclasses.dataclass(frozen=True) # frozen=True makes instances immutable
class ExecutionStatusSummary:
    """A read-only summary of execution statuses."""
    total_executions: int
    ended_executions: int
    has_ended: bool

    @classmethod
    def from_stats(
        cls, stats: Dict[bts.ContainerExecutionStatus, int]
    ) -> "ExecutionStatusSummary":
        """
        Calculates summary attributes from a stats dictionary and returns a new
        immutable instance.
        """
        total_executions = sum(stats.values())

        ended_executions = sum(
            count
            for status, count in stats.items()
            if status in bts.CONTAINER_STATUSES_ENDED
        )

        # This logic is more robust: an empty set of executions hasn't "ended".
        has_ended = total_executions > 0 and ended_executions == total_executions

        return cls(
            total_executions=total_executions,
            ended_executions=ended_executions,
            has_ended=has_ended,
        )

and consuming it as:

summary = ExecutionStatusSummary.from_stats(stats)

Copy link
Collaborator

@morgan-wowk morgan-wowk Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not based on your exact changes and just an example ☝️ . In other words, not a copy/paste replacement and just a suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer less mutation too.
We've discussed this case with Yue Chao and the main reason for mutating implementation is because the instance will be constructed from multiple status stats objects.
Although, what we could do in that case is to first combine multiple dicts into one (via a new function or collection.Counter) and then initialize the structure.

full_status_stats = collections.Counter()
for status_stats in list_of_status_stats:
    full_status_stats.update(status_stats)
ExecutionStatusSummary.from_status_stats(full_status_stats)



# ==== PipelineJobService
@dataclasses.dataclass(kw_only=True)
class PipelineRunResponse:
Expand All @@ -43,6 +59,7 @@ class PipelineRunResponse:
created_at: datetime.datetime | None = None
pipeline_name: str | None = None
execution_status_stats: dict[str, int] | None = None
execution_summary: ExecutionStatusSummary | None = None

@classmethod
def from_db(cls, pipeline_run: bts.PipelineRun) -> "PipelineRunResponse":
Expand Down Expand Up @@ -241,10 +258,12 @@ def create_pipeline_run_response(
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
response.execution_status_stats = self._get_execution_status_stats(
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=pipeline_run.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
return response

return ListPipelineJobsResponse(
Expand All @@ -255,15 +274,20 @@ def create_pipeline_run_response(
next_page_token=next_page_token,
)

def _get_execution_status_stats(
def _get_execution_stats_and_summary(
self,
session: orm.Session,
root_execution_id: bts.IdType,
) -> dict[str, int]:
) -> tuple[dict[str, int], ExecutionStatusSummary]:
stats = self._calculate_execution_status_stats(
session=session, root_execution_id=root_execution_id
)
return {status.value: count for status, count in stats.items()}
summary = ExecutionStatusSummary()
status_stats: dict[str, int] = {}
for status, count in stats.items():
summary.count_execution_status(status=status, count=count)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

status_stats[status.value] = count
return status_stats, summary

def _calculate_execution_status_stats(
self, session: orm.Session, root_execution_id: bts.IdType
Expand Down Expand Up @@ -482,22 +506,6 @@ class ArtifactNodeIdResponse:
id: bts.IdType


@dataclasses.dataclass(kw_only=True)
class ExecutionStatusSummary:
total_executions: int = 0
ended_executions: int = 0
has_ended: bool = False

def count_execution_status(
self, *, status: bts.ContainerExecutionStatus, count: int
) -> None:
self.total_executions += count
if status in bts.CONTAINER_STATUSES_ENDED:
self.ended_executions += count

self.has_ended = self.ended_executions == self.total_executions


@dataclasses.dataclass
class GetGraphExecutionStateResponse:
child_execution_status_stats: dict[bts.IdType, dict[str, int]]
Expand Down
10 changes: 8 additions & 2 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,14 @@ def test_list_with_execution_stats(self):
with session_factory() as session:
result = service.list(session=session, include_execution_stats=True)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].root_execution_id == root_id
stats = result.pipeline_runs[0].execution_status_stats
run = result.pipeline_runs[0]
assert run.root_execution_id == root_id
stats = run.execution_status_stats
assert stats is not None
assert stats["SUCCEEDED"] == 1
assert stats["RUNNING"] == 1
summary = run.execution_summary
assert summary is not None
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False