Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion cloud_pipelines_backend/filter_query_sql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import dataclasses
import datetime
import json
import enum
from typing import Final
Expand All @@ -17,6 +18,7 @@
class SystemKey(enum.StrEnum):
    """Reserved filter keys, each built from ``_PIPELINE_RUN_KEY_PREFIX``.

    Every member's value is the shared prefix followed by a key-specific
    suffix. Because this is a ``StrEnum``, members compare equal to their
    plain string values.
    """

    # NOTE(review): presumably the user that created the pipeline run -- confirm.
    CREATED_BY = f"{_PIPELINE_RUN_KEY_PREFIX}created_by"
    # NOTE(review): presumably the pipeline run's name -- confirm.
    NAME = f"{_PIPELINE_RUN_KEY_PREFIX}name"
    # The only key accepted by time_range predicates in this module.
    CREATED_AT = f"{_PIPELINE_RUN_KEY_PREFIX}date.created_at"


SYSTEM_KEY_SUPPORTED_PREDICATES: dict[SystemKey, set[type]] = {
Expand All @@ -31,6 +33,9 @@ class SystemKey(enum.StrEnum):
filter_query_models.ValueContainsPredicate,
filter_query_models.ValueInPredicate,
},
SystemKey.CREATED_AT: {
filter_query_models.TimeRangePredicate,
},
}

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -72,6 +77,8 @@ def _get_predicate_key(*, predicate: filter_query_models.Predicate) -> str | Non
return predicate.value_contains.key
case filter_query_models.ValueInPredicate():
return predicate.value_in.key
case filter_query_models.TimeRangePredicate():
return predicate.time_range.key
case _:
return None

Expand Down Expand Up @@ -299,8 +306,9 @@ def _predicate_to_clause(
return _value_contains_to_clause(predicate=predicate)
case filter_query_models.ValueInPredicate():
return _value_in_to_clause(predicate=predicate)
case filter_query_models.TimeRangePredicate():
return _time_range_to_clause(predicate=predicate)
case _:
# TODO: TimeRangePredicate -- not supported currently, will be supported in the future.
raise NotImplementedError(
f"Predicate type {type(predicate).__name__} is not yet implemented."
)
Expand Down Expand Up @@ -361,3 +369,56 @@ def _value_in_to_clause(
bts.PipelineRunAnnotation.value.in_(predicate.value_in.values),
],
)


# ---------------------------------------------------------------------------
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests for the case where `start_time` is None, following code-review feedback.

# Column-based predicates (bypass annotation table)
# ---------------------------------------------------------------------------


def _to_naive_utc(value: datetime.datetime) -> datetime.datetime:
    """Convert an aware datetime to a naive datetime expressed in UTC."""
    return value.astimezone(datetime.timezone.utc).replace(tzinfo=None)


def _time_range_to_clause(
    *, predicate: filter_query_models.TimeRangePredicate
) -> sql.ColumnElement:
    """Build a WHERE clause for pipeline_run.created_at from a time range.

    The range is half-open: ``start_time`` is inclusive (``>=``) and
    ``end_time`` is exclusive (``<``). Either bound may be ``None``, in which
    case that side of the range is left open. If both bounds are ``None`` the
    clause places no restriction on ``created_at`` and a SQL ``true`` constant
    is returned (an empty ``and_()`` is deprecated in SQLAlchemy).

    Pydantic's AwareDatetime preserves the original timezone offset, so we
    must normalize to naive UTC before comparing against the DB column.

    The DB stores "naive UTC" datetimes -- the values represent UTC but carry
    no timezone label. For example, the DB stores '2024-01-01 02:30:00', not
    '2024-01-01 02:30:00+00:00'. The UtcDateTime type decorator (in
    backend_types_sql.py) strips tzinfo on write and re-attaches UTC on read.

    Conversion pipeline for input '2024-01-01T08:00:00+05:30':

        API request (JSON string)
            '2024-01-01T08:00:00+05:30'
                |
                v
        Pydantic AwareDatetime (preserves offset)
            datetime(2024, 1, 1, 8, 0, 0, tzinfo=+05:30)
                |
                v  .astimezone(utc) -- converts 08:00 - 05:30 = 02:30
        UTC-aware datetime
            datetime(2024, 1, 1, 2, 30, 0, tzinfo=UTC)
                |
                v  .replace(tzinfo=None) -- strips timezone label
        Naive datetime
            datetime(2024, 1, 1, 2, 30, 0)
                |
                v  SQLAlchemy literal_binds -- adds microsecond precision
        SQL string
            '2024-01-01 02:30:00.000000'  <-- matches DB storage format

    Args:
        predicate: The time-range predicate; its ``time_range.key`` must be
            ``SystemKey.CREATED_AT``.

    Returns:
        A SQLAlchemy boolean expression constraining
        ``bts.PipelineRun.created_at``.

    Raises:
        errors.InvalidAnnotationKeyError: If the predicate's key is anything
            other than ``SystemKey.CREATED_AT``.
    """
    tr = predicate.time_range
    if tr.key != SystemKey.CREATED_AT:
        raise errors.InvalidAnnotationKeyError(
            f"time_range only supports key {SystemKey.CREATED_AT!r}, got {tr.key!r}"
        )

    # Convert aware datetimes to naive UTC to match DB storage format.
    # Each bound is optional, so only emit a comparison for bounds that exist.
    clauses: list[sql.ColumnElement] = []
    if tr.start_time is not None:
        clauses.append(bts.PipelineRun.created_at >= _to_naive_utc(tr.start_time))
    if tr.end_time is not None:
        clauses.append(bts.PipelineRun.created_at < _to_naive_utc(tr.end_time))

    if not clauses:
        # Fully open range: nothing to restrict.
        return sql.true()
    return sql.and_(*clauses)
Loading