diff --git a/src/sentry/issues/escalating/escalating.py b/src/sentry/issues/escalating/escalating.py
index 4eb305fc31bada..e8808e769848fe 100644
--- a/src/sentry/issues/escalating/escalating.py
+++ b/src/sentry/issues/escalating/escalating.py
@@ -33,6 +33,12 @@
 from sentry.models.group import Group, GroupStatus
 from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
 from sentry.models.groupinbox import GroupInboxReason, InboxReasonDetails, add_group_to_inbox
+from sentry.search.eap.occurrences.common_queries import count_occurrences
+from sentry.search.eap.occurrences.rollout_utils import (
+    should_callsite_use_eap_data_in_read,
+    should_double_read_from_eap,
+    validate_read,
+)
 from sentry.services.eventstore.models import GroupEvent
 from sentry.signals import issue_escalating
 from sentry.snuba.dataset import Dataset, EntityKey
@@ -246,7 +252,7 @@ def _extract_organization_and_project_and_group_ids(
     return group_ids_by_organization
 
 
-def get_group_hourly_count(group: Group) -> int:
+def get_group_hourly_count_snuba(group: Group) -> int:
     """Return the number of events a group has had today in the last hour"""
     key = f"hourly-group-count:{group.project.id}:{group.id}"
     hourly_count = cache.get(key)
@@ -281,6 +287,28 @@ def get_group_hourly_count(group: Group) -> int:
             ]
         )
         cache.set(key, hourly_count, GROUP_HOURLY_COUNT_TTL)
+
+    return int(hourly_count)
+
+
+def get_group_hourly_count_eap(group: Group) -> int:
+    """Return the number of events a group has had today in the last hour"""
+    key = f"hourly-group-count-eap:{group.project.id}:{group.id}"
+    hourly_count = cache.get(key)
+
+    if hourly_count is None:
+        now = datetime.now()
+        current_hour = now.replace(minute=0, second=0, microsecond=0)
+        hourly_count = count_occurrences(
+            organization=group.project.organization,
+            projects=[group.project],
+            start=current_hour,
+            end=now,
+            referrer=Referrer.IS_ESCALATING_GROUP.value,
+            group_id=group.id,
+        )
+        cache.set(key, hourly_count, GROUP_HOURLY_COUNT_TTL)
+
     return int(hourly_count)
 
 
@@ -288,7 +316,16 @@ def is_escalating(group: Group) -> tuple[bool, int | None]:
     """
     Return whether the group is escalating and the daily forecast if it exists.
     """
-    group_hourly_count = get_group_hourly_count(group)
+    snuba_count = get_group_hourly_count_snuba(group)
+    group_hourly_count = snuba_count
+
+    if should_double_read_from_eap():
+        eap_count = get_group_hourly_count_eap(group)
+        validate_read(snuba_count, eap_count, "issues.escalating.is_escalating")
+
+        if should_callsite_use_eap_data_in_read("issues.escalating.is_escalating"):
+            group_hourly_count = eap_count
+
     forecast_today = EscalatingGroupForecast.fetch_todays_forecast(group.project.id, group.id)
     # Check if current event occurrence is greater than forecast for today's date
     if forecast_today and group_hourly_count > forecast_today:
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index f49c13d3a906bc..e6d3d731f9cb74 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -3690,9 +3690,11 @@
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
 
-# Controls whether an org should read data both from Snuba and EAP.
-# Will not use or display the EAP data to the user; rather, will just compare the
-# data from each source and log whether they match.
+# Controls whether occurrence data should be read from both Snuba and EAP.
+# Will not use or display the EAP data to the user; rather, will just (1) issue
+# the queries to ensure that reads are functional and (2) compare the data from
+# each source and log whether they match.
+# This option should be controlled on a region-by-region basis.
 register(
     "eap.occurrences.should_double_read",
     type=Bool,
@@ -3700,8 +3702,9 @@
     flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
 )
 
-# Controls whether a callsite should use EAP data instead of Snuba data.
-# Callsites should only be added after they're known to be safe.
+# Controls whether a given callsite should use occurrence data from EAP instead
+# of Snuba. Callsites should only be added here after they're known to be safe.
+# This option should be controlled on a region-by-region basis.
 register(
     "eap.occurrences.callsites_using_eap_data_allowlist",
     type=Sequence,
@@ -3709,7 +3712,6 @@
     flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
 )
 
-
 # Killswich for LLM issue detection
 register(
     "issue-detection.llm-detection.enabled",
diff --git a/src/sentry/search/eap/occurrences/common_queries.py b/src/sentry/search/eap/occurrences/common_queries.py
new file mode 100644
index 00000000000000..d7fda782c4e11b
--- /dev/null
+++ b/src/sentry/search/eap/occurrences/common_queries.py
@@ -0,0 +1,74 @@
+import logging
+from collections.abc import Sequence
+from datetime import datetime
+
+from sentry.models.environment import Environment
+from sentry.models.organization import Organization
+from sentry.models.project import Project
+from sentry.search.eap.types import SearchResolverConfig
+from sentry.search.events.types import SnubaParams
+from sentry.snuba.occurrences_rpc import Occurrences
+
+logger = logging.getLogger(__name__)
+
+
+def count_occurrences(
+    organization: Organization,
+    projects: Sequence[Project],
+    start: datetime,
+    end: datetime,
+    referrer: str,
+    group_id: int | None = None,
+    environments: Sequence[Environment] | None = None,
+) -> int:
+    """
+    Count the number of occurrences in EAP matching the given filters.
+
+    Args:
+        organization: The organization to query
+        projects: List of projects to query
+        start: Start timestamp
+        end: End timestamp
+        referrer: Referrer string for the query
+        group_id: Optional group ID to filter by
+        environments: Optional list of environments to filter by
+
+    Returns:
+        The count of matching occurrences, or 0 if the query fails
+    """
+    query_string = f"group_id:{group_id}" if group_id is not None else ""
+
+    snuba_params = SnubaParams(
+        start=start,
+        end=end,
+        organization=organization,
+        projects=projects,
+        environments=environments if environments else [],
+    )
+
+    try:
+        result = Occurrences.run_table_query(
+            params=snuba_params,
+            query_string=query_string,
+            selected_columns=["count()"],
+            orderby=None,
+            offset=0,
+            limit=1,
+            referrer=referrer,
+            config=SearchResolverConfig(),
+        )
+
+        if result["data"]:
+            return int(result["data"][0].get("count()", 0))
+        return 0
+    except Exception:
+        logger.exception(
+            "Fetching occurrence count from EAP failed",
+            extra={
+                "organization_id": organization.id,
+                "project_ids": [p.id for p in projects],
+                "group_id": group_id,
+                "referrer": referrer,
+            },
+        )
+        return 0
diff --git a/src/sentry/search/eap/occurrences/definitions.py b/src/sentry/search/eap/occurrences/definitions.py
index 0f44b7baa4ee17..e617523f76b70e 100644
--- a/src/sentry/search/eap/occurrences/definitions.py
+++ b/src/sentry/search/eap/occurrences/definitions.py
@@ -7,6 +7,7 @@
     AttributeArgumentDefinition,
     ColumnDefinitions,
     ResolvedAttribute,
+    count_argument_resolver_optimized,
 )
 from sentry.search.eap.common_columns import COMMON_COLUMNS
 
@@ -15,6 +16,12 @@
 ]
 
 
+def count_processor(count_value: int | None) -> int:
+    if count_value is None:
+        return 0
+    return count_value
+
+
 OCCURRENCE_COLUMNS = {
     column.public_alias: column
     for column in (
@@ -53,6 +60,29 @@
                 )
             ],
         ),
+        "count": AggregateDefinition(
+            internal_function=Function.FUNCTION_COUNT,
+            infer_search_type_from_arguments=False,
+            default_search_type="integer",
+            processor=count_processor,
+            arguments=[
+                AttributeArgumentDefinition(
+                    attribute_types={
+                        "duration",
+                        "number",
+                        "integer",
+                        "percentage",
+                        "currency",
+                        *constants.SIZE_TYPE,
+                        *constants.DURATION_TYPE,
+                    },
+                    default_arg="group_id",
+                )
+            ],
+            attribute_resolver=count_argument_resolver_optimized(
+                OCCURRENCES_ALWAYS_PRESENT_ATTRIBUTES
+            ),
+        ),
     },
     # c.f. SPAN_AGGREGATE_DEFINITIONS when we're ready.
     formulas={},
     columns=OCCURRENCE_COLUMNS,
diff --git a/tests/sentry/issues/escalating/test_escalating.py b/tests/sentry/issues/escalating/test_escalating.py
index 0526c6753a5557..7063f5774af4b6 100644
--- a/tests/sentry/issues/escalating/test_escalating.py
+++ b/tests/sentry/issues/escalating/test_escalating.py
@@ -7,11 +7,17 @@
 from uuid import uuid4
 
 import pytest
+from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
+    TraceItemColumnValues,
+    TraceItemTableResponse,
+)
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeValue
 
 from sentry.issues.escalating.escalating import (
     GroupsCountResponse,
     _start_and_end_dates,
-    get_group_hourly_count,
+    get_group_hourly_count_eap,
+    get_group_hourly_count_snuba,
     is_escalating,
     query_groups_past_counts,
 )
@@ -306,7 +312,7 @@ def test_hourly_count_query(self) -> None:
         assert group is not None
 
         # Events are aggregated in the hourly count query by date rather than the last 24hrs
-        assert get_group_hourly_count(group) == 1
+        assert get_group_hourly_count_snuba(group) == 1
 
     @freeze_time(TIME_YESTERDAY)
     def test_is_forecast_out_of_range(self) -> None:
@@ -354,3 +360,91 @@ def test_is_escalating_two_weeks(self) -> None:
 
         # Test cache
         assert cache.get(f"hourly-group-count:{archived_group.project.id}:{archived_group.id}") == 6
+
+
+class TestGetGroupHourlyCountEAP(TestCase):
+    @patch("sentry.snuba.rpc_dataset_common.snuba_rpc.table_rpc")
+    def test_returns_count_from_eap(self, mock_table_rpc: mock.MagicMock) -> None:
+        group = self.create_group()
+
+        mock_response = TraceItemTableResponse(
+            column_values=[
+                TraceItemColumnValues(
+                    attribute_name="count()",
+                    results=[AttributeValue(val_double=42.0)],
+                )
+            ]
+        )
+        mock_table_rpc.return_value = [mock_response]
+
+        result = get_group_hourly_count_eap(group)
+
+        assert result == 42
+        mock_table_rpc.assert_called_once()
+
+    @patch("sentry.snuba.rpc_dataset_common.snuba_rpc.table_rpc")
+    def test_returns_zero_on_empty_column_values(self, mock_table_rpc: mock.MagicMock) -> None:
+        group = self.create_group()
+
+        mock_response = TraceItemTableResponse(column_values=[])
+        mock_table_rpc.return_value = [mock_response]
+
+        result = get_group_hourly_count_eap(group)
+
+        assert result == 0
+
+    @patch("sentry.snuba.rpc_dataset_common.snuba_rpc.table_rpc")
+    def test_returns_zero_on_exception(self, mock_table_rpc: mock.MagicMock) -> None:
+        group = self.create_group()
+        mock_table_rpc.side_effect = Exception("RPC failed")
+
+        result = get_group_hourly_count_eap(group)
+
+        assert result == 0
+
+    @patch("sentry.issues.escalating.escalating.EscalatingGroupForecast.fetch_todays_forecast")
+    @patch("sentry.issues.escalating.escalating.get_group_hourly_count_eap")
+    @patch("sentry.issues.escalating.escalating.get_group_hourly_count_snuba")
+    def test_uses_snuba_count_as_source_of_truth(
+        self, mock_snuba: mock.MagicMock, mock_eap: mock.MagicMock, mock_forecast: mock.MagicMock
+    ) -> None:
+        group = self.create_group()
+
+        mock_snuba.return_value = 100
+        mock_eap.return_value = 5
+        mock_forecast.return_value = 50
+
+        with self.options({"eap.occurrences.should_double_read": True}):
+            result = is_escalating(group)
+
+        # Should escalate because Snuba count (100) > forecast (50)
+        assert result == (True, 50)
+        mock_snuba.assert_called_once_with(group)
+        mock_eap.assert_called_once_with(group)
+
+    @patch("sentry.issues.escalating.escalating.EscalatingGroupForecast.fetch_todays_forecast")
+    @patch("sentry.issues.escalating.escalating.get_group_hourly_count_eap")
+    @patch("sentry.issues.escalating.escalating.get_group_hourly_count_snuba")
+    def test_uses_eap_count_as_source_of_truth(
+        self, mock_snuba: mock.MagicMock, mock_eap: mock.MagicMock, mock_forecast: mock.MagicMock
+    ) -> None:
+        group = self.create_group()
+
+        mock_snuba.return_value = 100
+        mock_eap.return_value = 5
+        mock_forecast.return_value = 50
+
+        with self.options(
+            {
+                "eap.occurrences.should_double_read": True,
+                "eap.occurrences.callsites_using_eap_data_allowlist": [
+                    "issues.escalating.is_escalating"
+                ],
+            }
+        ):
+            result = is_escalating(group)
+
+        # Shouldn't escalate because EAP count (5) < forecast (50)
+        assert result == (False, None)
+        mock_snuba.assert_called_once_with(group)
+        mock_eap.assert_called_once_with(group)
diff --git a/tests/sentry/snuba/test_occurrences_rpc.py b/tests/sentry/snuba/test_occurrences_rpc.py
index 4e766b9d898d6a..5ddc5e813b9d72 100644
--- a/tests/sentry/snuba/test_occurrences_rpc.py
+++ b/tests/sentry/snuba/test_occurrences_rpc.py
@@ -1,4 +1,11 @@
-from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue, IntArray
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
+    AttributeAggregation,
+    AttributeKey,
+    AttributeValue,
+    ExtrapolationMode,
+    Function,
+    IntArray,
+)
 from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter
 
 from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
@@ -56,3 +63,15 @@ def test_group_id_field(self) -> None:
             name="group_id", type=AttributeKey.Type.TYPE_INT
         )
         assert virtual_context is None
+
+    def test_count_aggregate(self) -> None:
+        resolved_column, virtual_context = self.resolver.resolve_column("count()")
+        assert resolved_column.proto_definition == AttributeAggregation(
+            aggregate=Function.FUNCTION_COUNT,
+            key=AttributeKey(name="sentry.project_id", type=AttributeKey.Type.TYPE_INT),
+            label="count()",
+            extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+        )
+        assert virtual_context is None
+        assert resolved_column.public_alias == "count()"
+        assert resolved_column.search_type == "integer"