66 changes: 56 additions & 10 deletions src/sentry/issues/escalating/escalating.py
@@ -4,6 +4,7 @@

from __future__ import annotations

import logging
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from datetime import datetime, timedelta
@@ -33,20 +34,27 @@
from sentry.models.group import Group, GroupStatus
from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
from sentry.models.groupinbox import GroupInboxReason, InboxReasonDetails, add_group_to_inbox
from sentry.search.eap.occurrences.query import count_occurrences
from sentry.search.eap.occurrences.rollout_utils import (
should_callsite_use_eap_data_in_read,
should_double_read_from_eap,
validate_read,
)
from sentry.services.eventstore.models import GroupEvent
from sentry.signals import issue_escalating
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.referrer import Referrer
from sentry.types.activity import ActivityType
from sentry.types.group import GroupSubStatus
from sentry.utils.cache import cache
from sentry.utils.snuba import raw_snql_query

__all__ = ["query_groups_past_counts", "parse_groups_past_counts"]

REFERRER = "sentry.issues.escalating"
logger = logging.getLogger(__name__)

# The number of hourly buckets (one week's worth) needed to generate a group forecast
BUCKETS_PER_GROUP = 7 * 24
IS_ESCALATING_REFERRER = "sentry.issues.escalating.is_escalating"
GROUP_HOURLY_COUNT_TTL = 60
HOUR = 3600  # seconds in an hour

@@ -165,11 +173,14 @@ def _query_with_pagination(
)
request = Request(
dataset=_issue_category_dataset(category),
app_id=REFERRER,
app_id=Referrer.ESCALATING_GROUPS.value,
query=query,
tenant_ids={"referrer": REFERRER, "organization_id": organization_id},
tenant_ids={
"referrer": Referrer.ESCALATING_GROUPS.value,
"organization_id": organization_id,
},
)
results = raw_snql_query(request, referrer=REFERRER)["data"]
results = raw_snql_query(request, referrer=Referrer.ESCALATING_GROUPS.value)["data"]

all_results += results
offset += ELEMENTS_PER_SNUBA_PAGE
@@ -244,7 +255,7 @@ def _extract_organization_and_project_and_group_ids(
return group_ids_by_organization


def get_group_hourly_count(group: Group) -> int:
def get_group_hourly_count_snuba(group: Group) -> int:
"""Return the number of events a group has had today in the last hour"""
key = f"hourly-group-count:{group.project.id}:{group.id}"
hourly_count = cache.get(key)
@@ -266,25 +277,60 @@ def get_group_hourly_count(group: Group) -> int:
)
request = Request(
dataset=_issue_category_dataset(group.issue_category),
app_id=IS_ESCALATING_REFERRER,
app_id=Referrer.IS_ESCALATING_GROUP.value,
query=query,
tenant_ids={
"referrer": IS_ESCALATING_REFERRER,
"referrer": Referrer.IS_ESCALATING_GROUP.value,
"organization_id": group.project.organization.id,
},
)
hourly_count = int(
raw_snql_query(request, referrer=IS_ESCALATING_REFERRER)["data"][0]["count()"]
raw_snql_query(request, referrer=Referrer.IS_ESCALATING_GROUP.value)["data"][0][
"count()"
]
)
cache.set(key, hourly_count, GROUP_HOURLY_COUNT_TTL)

return int(hourly_count)


def get_group_hourly_count_eap(group: Group) -> int:
"""Return the number of events a group has had today in the last hour"""
key = f"hourly-group-count-eap:{group.project.id}:{group.id}"
hourly_count = cache.get(key)

if hourly_count is None:
now = datetime.now()
current_hour = now.replace(minute=0, second=0, microsecond=0)
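        # The count_occurrences() call below tallies this group's occurrences from the
        # top of the current clock hour up to "now"; like the Snuba path, the result is
        # cached (under an EAP-specific key) for GROUP_HOURLY_COUNT_TTL seconds.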

hourly_count = count_occurrences(
organization_id=group.project.organization.id,
project_ids=[group.project.id],
start=current_hour,
end=now,
referrer=Referrer.IS_ESCALATING_GROUP.value,
group_id=group.id,
)

cache.set(key, hourly_count, GROUP_HOURLY_COUNT_TTL)

return int(hourly_count)


def is_escalating(group: Group) -> tuple[bool, int | None]:
"""
Return whether the group is escalating and the daily forecast if it exists.
"""
group_hourly_count = get_group_hourly_count(group)
snuba_count = get_group_hourly_count_snuba(group)
group_hourly_count = snuba_count

if should_double_read_from_eap():
eap_count = get_group_hourly_count_eap(group)
validate_read(snuba_count, eap_count, "issues.escalating.is_escalating")

if should_callsite_use_eap_data_in_read("issues.escalating.is_escalating"):
group_hourly_count = eap_count

forecast_today = EscalatingGroupForecast.fetch_todays_forecast(group.project.id, group.id)
    # Check whether the current hourly event count exceeds today's forecast
if forecast_today and group_hourly_count > forecast_today:
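The `is_escalating` change is a standard double-read migration: always read the hourly count from Snuba, optionally issue the same read against EAP, compare the two, and only serve the EAP value to callsites on an allowlist. The helpers imported from `sentry.search.eap.occurrences.rollout_utils` are not part of this diff; the sketch below is one plausible shape for them, assuming they simply consult the two options registered in `defaults.py` (next file) and log mismatches. The option names and callsite string are real; the function bodies are guesses.

```python
# Hypothetical sketch of the rollout helpers used by is_escalating() above.
# Assumes the sentry.options store; bodies are illustrative, not the actual code.
import logging

from sentry import options

logger = logging.getLogger(__name__)


def should_double_read_from_eap() -> bool:
    # Region-scoped switch: also issue the EAP read alongside the Snuba read.
    return options.get("eap.occurrences.should_double_read")


def should_callsite_use_eap_data_in_read(callsite: str) -> bool:
    # Serve EAP data only for callsites explicitly allowlisted as safe.
    return callsite in options.get("eap.occurrences.callsites_using_eap_data_allowlist")


def validate_read(snuba_count: int, eap_count: int, callsite: str) -> None:
    # Compare the two sources and log disagreements; never raise, since the
    # double read must not change behavior for the caller.
    if snuba_count != eap_count:
        logger.info(
            "eap.occurrences.read_mismatch",
            extra={"callsite": callsite, "snuba": snuba_count, "eap": eap_count},
        )
```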
14 changes: 8 additions & 6 deletions src/sentry/options/defaults.py
@@ -3681,26 +3681,28 @@
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Controls whether an org should read data both from Snuba and EAP.
# Will not use or display the EAP data to the user; rather, will just compare the
# data from each source and log whether they match.
# Controls whether occurrence data should be read from both Snuba and EAP.
# Will not use or display the EAP data to the user; rather, will just (1) issue
# the queries to ensure that reads are functional and (2) compare the data from
# each source and log whether they match.
# This option should be controlled on a region-by-region basis.
register(
"eap.occurrences.should_double_read",
type=Bool,
default=False,
flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
)

# Controls whether a callsite should use EAP data instead of Snuba data.
# Callsites should only be added after they're known to be safe.
# Controls whether a given callsite should use occurrence data from EAP instead
# of Snuba. Callsites should only be added here after they're known to be safe.
# This option should be controlled on a region-by-region basis.
register(
"eap.occurrences.callsites_using_eap_data_allowlist",
type=Sequence,
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
)


# Killswitch for LLM issue detection
register(
"issue-detection.llm-detection.enabled",
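Taken together, the two options give a two-stage, per-region rollout: enable the double read first so mismatches get logged, then add the callsite string used above (`"issues.escalating.is_escalating"`) to the allowlist once the comparisons look clean. A minimal sketch of that sequencing, assuming the standard `sentry.options` setter:

```python
# Hypothetical rollout sequencing via the options store (illustrative only).
from sentry import options

# Stage 1: start double-reading so validate_read() can compare Snuba and EAP.
options.set("eap.occurrences.should_double_read", True)

# Stage 2: once mismatch logs are quiet, let this callsite serve EAP data.
options.set(
    "eap.occurrences.callsites_using_eap_data_allowlist",
    ["issues.escalating.is_escalating"],
)
```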
40 changes: 40 additions & 0 deletions src/sentry/search/eap/occurrences/definitions.py
@@ -0,0 +1,40 @@
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey

from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute
from sentry.search.eap.common_columns import COMMON_COLUMNS

OCCURRENCES_ALWAYS_PRESENT_ATTRIBUTES = [
AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT),
]


OCCURRENCE_COLUMNS = {
column.public_alias: column
for column in (
COMMON_COLUMNS
+ [
ResolvedAttribute(
public_alias="id",
internal_name="sentry.item_id",
search_type="string",
),
ResolvedAttribute(
public_alias="group_id",
internal_name="group_id",
search_type="integer",
),
]
)
}

OCCURRENCE_DEFINITIONS = ColumnDefinitions(
    aggregates={},  # cf. SPAN_AGGREGATE_DEFINITIONS when we're ready.
formulas={},
columns=OCCURRENCE_COLUMNS,
contexts={},
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE,
filter_aliases={},
alias_to_column=None,
column_to_alias=None,
)
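`OCCURRENCE_COLUMNS` is keyed by public alias, so resolving a user-facing column name to its EAP attribute is a plain dict lookup. A quick illustration, using only names defined above:

```python
# Resolving public aliases to EAP attributes (lookup demo against this module).
from sentry.search.eap.occurrences.definitions import OCCURRENCE_COLUMNS

group_id_column = OCCURRENCE_COLUMNS["group_id"]
assert group_id_column.internal_name == "group_id"
assert group_id_column.search_type == "integer"

# "id" is aliased onto the EAP-internal item id rather than a same-named attribute.
assert OCCURRENCE_COLUMNS["id"].internal_name == "sentry.item_id"
```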
124 changes: 124 additions & 0 deletions src/sentry/search/eap/occurrences/query.py
@@ -0,0 +1,124 @@
import logging
from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column as EAPColumn
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeAggregation,
AttributeKey,
AttributeValue,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import Function as EAPFunction
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import StrArray
from sentry_protos.snuba.v1.trace_item_filter_pb2 import (
AndFilter,
ComparisonFilter,
TraceItemFilter,
)

from sentry.utils import snuba_rpc

logger = logging.getLogger(__name__)


def count_occurrences(
organization_id: int,
project_ids: list[int],
start: datetime,
end: datetime,
referrer: str,
group_id: int | None = None,
environments: list[str] | None = None,
) -> int:
"""
Count the number of occurrences in EAP matching the given filters.

Args:
organization_id: The organization ID
project_ids: List of project IDs to query
start: Start timestamp
end: End timestamp
referrer: Referrer string for the query
group_id: Optional group ID to filter by
environments: Optional list of environments to filter by

Returns:
The count of matching occurrences, or 0 if the query fails
"""
start_timestamp = Timestamp()
start_timestamp.FromDatetime(start)
end_timestamp = Timestamp()
end_timestamp.FromDatetime(end)

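    # COUNT() aggregates over an attribute key; "group_id" is declared always-present
    # for occurrences (see definitions.py), so counting it counts every matching row.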
count_column = EAPColumn(
aggregation=AttributeAggregation(
aggregate=EAPFunction.FUNCTION_COUNT,
key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT),
),
label="count",
)

filters: list[TraceItemFilter] = []

if group_id is not None:
group_id_filter = TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_int=group_id),
)
)
filters.append(group_id_filter)

if environments:
environment_filter = TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(name="environment", type=AttributeKey.TYPE_STRING),
op=ComparisonFilter.OP_IN,
value=AttributeValue(val_str_array=StrArray(values=environments)),
)
)
filters.append(environment_filter)

item_filter = None
if len(filters) == 1:
item_filter = filters[0]
elif len(filters) > 1:
item_filter = TraceItemFilter(and_filter=AndFilter(filters=filters))

request = TraceItemTableRequest(
meta=RequestMeta(
organization_id=organization_id,
project_ids=project_ids,
cogs_category="issues",
referrer=referrer,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE,
),
columns=[count_column],
filter=item_filter,
limit=1,
)

try:
count = 0
responses = snuba_rpc.table_rpc([request])
if responses and responses[0].column_values:
results = responses[0].column_values[0].results
if results:
count = int(results[0].val_double)
return count
except Exception:
logger.exception(
"Fetching occurrence count from EAP failed",
extra={
"organization_id": organization_id,
"project_ids": project_ids,
"group_id": group_id,
"referrer": referrer,
},
)
return 0
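As exercised by `get_group_hourly_count_eap` in `escalating.py` above, counting one group's occurrences over the current hour is a single call. A usage sketch (the IDs and environment are placeholders):

```python
# Usage sketch for count_occurrences(); placeholder IDs, window as in escalating.py.
from datetime import datetime

from sentry.search.eap.occurrences.query import count_occurrences
from sentry.snuba.referrer import Referrer

now = datetime.now()
count = count_occurrences(
    organization_id=1,  # placeholder
    project_ids=[1],  # placeholder
    start=now.replace(minute=0, second=0, microsecond=0),  # top of the current hour
    end=now,
    referrer=Referrer.IS_ESCALATING_GROUP.value,
    group_id=42,  # placeholder
    environments=["production"],  # optional; omit to count across all environments
)
```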