Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from . import backend_types_sql as bts
from . import component_structures as structures
from . import database_ops
from . import errors
from . import filter_query_sql

Expand Down Expand Up @@ -113,19 +114,15 @@ def create(
},
)
session.add(pipeline_run)
# Mirror created_by into the annotations table so it's searchable
# via filter_query like any other annotation.
if created_by is not None:
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
session.flush()
session.add(
bts.PipelineRunAnnotation(
pipeline_run_id=pipeline_run.id,
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
value=created_by,
)
)
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
session.flush()
_mirror_system_annotations(
session=session,
pipeline_run_id=pipeline_run.id,
created_by=created_by,
pipeline_name=pipeline_name,
)
session.commit()

session.refresh(pipeline_run)
Expand Down Expand Up @@ -244,12 +241,9 @@ def _create_pipeline_run_response(
bts.ExecutionNode, pipeline_run.root_execution_id
)
if execution_node:
task_spec = structures.TaskSpec.from_json_dict(
execution_node.task_spec
pipeline_name = database_ops.get_pipeline_name_from_task_spec(
task_spec_dict=execution_node.task_spec
)
component_spec = task_spec.component_ref.spec
if component_spec:
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
execution_status_stats = self._calculate_execution_status_stats(
Expand Down Expand Up @@ -1153,6 +1147,32 @@ def list_secrets(
]


def _mirror_system_annotations(
    *,
    session: orm.Session,
    pipeline_run_id: bts.IdType,
    created_by: str | None,
    pipeline_name: str | None,
) -> None:
    """Copy selected pipeline run fields into the annotations table.

    Mirrored values become system annotations, which makes them searchable
    through filter_query just like user-provided annotations. Values that
    are None or empty are not mirrored.
    """
    mirrored_fields = (
        (filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, created_by),
        (filter_query_sql.PipelineRunAnnotationSystemKey.NAME, pipeline_name),
    )
    for annotation_key, annotation_value in mirrored_fields:
        if not annotation_value:
            continue
        session.add(
            bts.PipelineRunAnnotation(
                pipeline_run_id=pipeline_run_id,
                key=annotation_key,
                value=annotation_value,
            )
        )


def _recursively_create_all_executions_and_artifacts_root(
session: orm.Session,
root_task_spec: structures.TaskSpec,
Expand Down
180 changes: 180 additions & 0 deletions cloud_pipelines_backend/database_ops.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import logging
from typing import Any

import sqlalchemy
from sqlalchemy import orm

from . import backend_types_sql as bts
from . import component_structures as structures
from . import filter_query_sql

logger = logging.getLogger(__name__)


def create_db_engine_and_migrate_db(
database_uri: str,
Expand Down Expand Up @@ -87,6 +93,7 @@ def migrate_db(db_engine: sqlalchemy.Engine):
break

_backfill_pipeline_run_created_by_annotations(db_engine=db_engine)
_backfill_pipeline_run_name_annotations(db_engine=db_engine)


def _is_pipeline_run_annotation_key_already_backfilled(
Expand All @@ -106,6 +113,27 @@ def _is_pipeline_run_annotation_key_already_backfilled(
).scalar()


def get_pipeline_name_from_task_spec(
    *,
    task_spec_dict: dict[str, Any],
) -> str | None:
    """Extract a pipeline name from a raw task_spec dict.

    Follows task_spec_dict -> TaskSpec -> component_ref -> spec -> name and
    returns None whenever parsing fails or any link in that chain is missing
    or empty.
    """
    try:
        parsed_task_spec = structures.TaskSpec.from_json_dict(task_spec_dict)
    except Exception:
        # Malformed task_spec payloads are treated as "no name available".
        return None
    component_spec = parsed_task_spec.component_ref.spec
    if component_spec is not None and component_spec.name:
        return component_spec.name
    return None


def _backfill_pipeline_run_created_by_annotations(
*,
db_engine: sqlalchemy.Engine,
Expand Down Expand Up @@ -142,3 +170,155 @@ def _backfill_pipeline_run_created_by_annotations(
)
session.execute(stmt)
session.commit()


def _backfill_pipeline_names_from_extra_data(
    *,
    db_engine: sqlalchemy.Engine,
) -> None:
    """Phase 1 backfill: copy extra_data['pipeline_name'] into annotations.

    Issues a single INSERT ... FROM SELECT that picks every pipeline run
    whose extra_data carries a non-empty 'pipeline_name' and inserts a
    matching system annotation row.

    SQLAlchemy's JSON path extraction is NULL-safe: it yields SQL NULL (not a
    Python error) when extra_data is NULL or the key is absent, so such rows
    are simply filtered out by the WHERE clause.
    """
    name_value = bts.PipelineRun.extra_data["pipeline_name"].as_string()
    annotation_key = sqlalchemy.literal(
        filter_query_sql.PipelineRunAnnotationSystemKey.NAME
    )
    source_rows = sqlalchemy.select(
        bts.PipelineRun.id,
        annotation_key,
        name_value,
    ).where(
        name_value.isnot(None),
        name_value != "",
    )
    insert_stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
        ["pipeline_run_id", "key", "value"],
        source_rows,
    )
    with orm.Session(db_engine) as session:
        session.execute(insert_stmt)
        session.commit()


def _backfill_pipeline_names_from_component_spec(
    *,
    db_engine: sqlalchemy.Engine,
) -> None:
    """Phase 2 backfill: resolve names from root execution task_specs.

    Finds the "delta" -- runs that still have no name annotation after
    Phase 1 -- with a LEFT JOIN anti-join: the join against
    pipeline_run_annotation (on run id and the system name key) produces
    NULL annotation columns for unmatched runs, and the
    ``WHERE ann.pipeline_run_id IS NULL`` filter keeps exactly those rows.

    For each delta run, the root execution node's task_spec is parsed and
    the name is taken from component_ref.spec.name. Runs with no execution
    node or no resolvable name are logged and skipped.
    """
    name_key = filter_query_sql.PipelineRunAnnotationSystemKey.NAME
    annotation = bts.PipelineRunAnnotation
    with orm.Session(db_engine) as session:
        missing_name_query = (
            sqlalchemy.select(
                bts.PipelineRun.id,
                bts.PipelineRun.root_execution_id,
            )
            .outerjoin(
                annotation,
                sqlalchemy.and_(
                    annotation.pipeline_run_id == bts.PipelineRun.id,
                    annotation.key == name_key,
                ),
            )
            .where(annotation.pipeline_run_id.is_(None))
        )

        for run_id, root_execution_id in session.execute(missing_name_query).all():
            execution_node = session.get(bts.ExecutionNode, root_execution_id)
            if execution_node is None:
                logger.warning(
                    f"Backfill pipeline run name: run {run_id} has no "
                    f"execution node (root_execution_id={root_execution_id}), "
                    "skipping. TODO: consider inserting 'UNKNOWN'?"
                )
                continue
            pipeline_name = get_pipeline_name_from_task_spec(
                task_spec_dict=execution_node.task_spec
            )
            if not pipeline_name:
                logger.warning(
                    f"Backfill pipeline run name: run {run_id} has no "
                    "resolvable pipeline name from task_spec "
                    f"(root_execution_id={root_execution_id}), "
                    "skipping. TODO: consider inserting 'UNKNOWN'?"
                )
                continue
            session.add(
                annotation(
                    pipeline_run_id=run_id,
                    key=name_key,
                    value=pipeline_name,
                )
            )
        session.commit()


def _backfill_pipeline_run_name_annotations(
    *,
    db_engine: sqlalchemy.Engine,
) -> None:
    """Backfill pipeline_run_annotation rows with pipeline names.

    Does nothing when any name annotation already exists: that means the
    write path is populating them, so the backfill has already run or is no
    longer needed.

    Otherwise runs two phases:
      1. _backfill_pipeline_names_from_extra_data -- bulk SQL insert from
         extra_data['pipeline_name'].
      2. _backfill_pipeline_names_from_component_spec -- Python fallback for
         runs Phase 1 missed (extra_data NULL or missing the key), resolving
         the name via component_ref.spec.name.
    """
    with orm.Session(db_engine) as session:
        already_backfilled = _is_pipeline_run_annotation_key_already_backfilled(
            session=session,
            key=filter_query_sql.PipelineRunAnnotationSystemKey.NAME,
        )
    if already_backfilled:
        return

    _backfill_pipeline_names_from_extra_data(db_engine=db_engine)
    _backfill_pipeline_names_from_component_spec(db_engine=db_engine)
Comment on lines +322 to +324
Copy link
Collaborator Author

@yuechao-qin yuechao-qin Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these in a single transaction.

Also, look into removing phase 1 (extra_data) and see if we can do everything for phase 2 in a single query.

https://dev.mysql.com/doc/refman/8.4/en/json-search-functions.html

SELECT pipeline_run.id, "system/pipeline_run.name", JSON_EXTRACT(execution_node.task_spec, ".componentRef.spec.name")
FROM pipeline_run
JOIN
execution_node

INSERT from SELECT

17 changes: 15 additions & 2 deletions cloud_pipelines_backend/filter_query_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from . import filter_query_models

SYSTEM_KEY_PREFIX: Final[str] = "system/"
_PIPELINE_RUN_KEY_PREFIX: Final[str] = f"{SYSTEM_KEY_PREFIX}pipeline_run."


class PipelineRunAnnotationSystemKey(enum.StrEnum):
CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by"
CREATED_BY = f"{_PIPELINE_RUN_KEY_PREFIX}created_by"
NAME = f"{_PIPELINE_RUN_KEY_PREFIX}name"


SYSTEM_KEY_SUPPORTED_PREDICATES: dict[PipelineRunAnnotationSystemKey, set[type]] = {
Expand All @@ -22,6 +24,12 @@ class PipelineRunAnnotationSystemKey(enum.StrEnum):
filter_query_models.ValueEqualsPredicate,
filter_query_models.ValueInPredicate,
},
PipelineRunAnnotationSystemKey.NAME: {
filter_query_models.KeyExistsPredicate,
filter_query_models.ValueEqualsPredicate,
filter_query_models.ValueContainsPredicate,
filter_query_models.ValueInPredicate,
},
}

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -237,7 +245,12 @@ def _convert_legacy_filter_to_filter_query(
"Legacy filter 'created_by' requires a non-empty value."
)
predicates.append(
{"value_equals": {"key": SystemKey.CREATED_BY, "value": value}}
{
"value_equals": {
"key": PipelineRunAnnotationSystemKey.CREATED_BY,
"value": value,
}
}
)
else:
raise NotImplementedError(f"Unsupported filter {filter_value}.")
Expand Down
Loading