13 changes: 13 additions & 0 deletions README.md
@@ -172,6 +172,19 @@ LLM-based nodes require a model configured in `models.yaml` and runtime parameters

Currently, LLM inference is supported for TGI, vLLM, OpenAI, Azure, Azure OpenAI, Ollama, and Triton-compatible servers. Model deployment is external and configured in `models.yaml`.

## SyGra as a Platform

SyGra can be used as a reusable platform to build different categories of tasks on top of the same graph execution engine, node types, processors, and metric infrastructure.

### Eval

Evaluation tasks live under `tasks/eval` and provide a standard pattern for:

- Computing **unit metrics** per record during graph execution
- Computing **aggregator metrics** after the run via graph post-processing

See: [`tasks/eval/README.md`](https://github.com/ServiceNow/SyGra/blob/main/tasks/eval/README.md)
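As an illustration of how the aggregator side of this pattern can be consumed, the sketch below looks up a registered metric and aggregates unit results; the import path and the example output are assumptions based on this change, not documented API.

```python
# Hypothetical usage sketch -- module path and outputs are assumptions, not documented API.
from sygra.core.eval.metrics.aggregator_metrics.aggregator_metric_registry import (
    AggregatorMetricRegistry,
)

# Registered aggregator metrics are auto-discovered on first use.
print(AggregatorMetricRegistry.list_metrics())  # e.g. ['f1_score', 'precision', 'recall', ...]

# Configure a metric by name; `results` would be the per-record unit metric results from a run.
precision = AggregatorMetricRegistry.get_metric("precision", predicted_key="tool")
# summary = precision.calculate(results)
# -> {"average_precision": 0.75, "precision_per_class": {"click": 0.5, "scroll": 1.0}}
```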

<!-- ![SygraComponents](https://raw.githubusercontent.com/ServiceNow/SyGra/refs/heads/main/docs/resources/images/sygra_usecase2framework.png) -->


27 changes: 18 additions & 9 deletions sygra/core/base_task_executor.py
@@ -544,7 +544,9 @@ def _repeat_to_merge_sequentially(

# merge the primary and secondary dataframes horizontally by randomly picking secondary rows and adding them to the primary
# primary: M rows (a columns), secondary: N rows (b columns), merged: M rows (a+b columns)
def _shuffle_and_extend(self, primary_df, secondary_df) -> pd.DataFrame:
def _shuffle_and_extend(
self, primary_df: pd.DataFrame, secondary_df: pd.DataFrame
) -> pd.DataFrame:
max_len = len(primary_df)
# Shuffle the secondary dataframe
shuffled_secondary = secondary_df.sample(frac=1).reset_index(drop=True)
@@ -560,7 +562,7 @@ def _shuffle_and_extend(self, primary_df, secondary_df) -> pd.DataFrame:
final_secondary = pd.concat([shuffled_secondary, extra_rows], ignore_index=True)

# now both datasets are the same length; merge and return
return pd.concat([primary_df, final_secondary], axis=1)
return cast(pd.DataFrame, pd.concat([primary_df, final_secondary], axis=1))

def _load_source_data(
self, data_config: dict
@@ -587,8 +589,8 @@ def _load_source_data(
full_data = self.apply_transforms(source_config_obj, full_data)
elif isinstance(source_config, list):
# if multiple datasets are configured as a list
dataset_list = []
primary_df = None
dataset_list: list[dict[str, Any]] = []
primary_df: Optional[pd.DataFrame] = None
primary_config = None
# if multiple datasets, verify that join_type and alias are defined in each config (@source and @sink)
if isinstance(source_config, list):
@@ -650,6 +652,9 @@
ds_conf: dict[str, Any] = ds.get("conf", {})
join_type = ds_conf.get(constants.DATASET_JOIN_TYPE)
current_df = ds.get("dataset")
if current_df is None or not isinstance(current_df, pd.DataFrame):
logger.error("Dataset is missing or not a dataframe")
continue
if join_type == constants.JOIN_TYPE_COLUMN:
sec_alias_name = ds_conf.get(constants.DATASET_ALIAS)
pri_alias_name = (
@@ -665,22 +670,26 @@
# where_clause = ds.get("conf").get("where_clause")
primary_df = pd.merge(
primary_df,
current_df,
cast(pd.DataFrame, current_df),
left_on=primary_column,
right_on=join_column,
how="left",
)
elif join_type == constants.JOIN_TYPE_SEQUENTIAL:
primary_df = self._repeat_to_merge_sequentially(primary_df, current_df)
primary_df = self._repeat_to_merge_sequentially(
primary_df, cast(pd.DataFrame, current_df)
)
elif join_type == constants.JOIN_TYPE_CROSS:
primary_df = primary_df.merge(current_df, how="cross")
primary_df = primary_df.merge(cast(pd.DataFrame, current_df), how="cross")
elif join_type == constants.JOIN_TYPE_RANDOM:
primary_df = self._shuffle_and_extend(primary_df, current_df)
primary_df = self._shuffle_and_extend(
primary_df, cast(pd.DataFrame, current_df)
)
else:
logger.error("Not implemented join_type")

# now convert the dataframe to a list of dicts (full_data)
full_data = primary_df.to_dict(orient="records")
full_data = cast(list[dict[str, Any]], primary_df.to_dict(orient="records"))
else:
logger.error("Unsupported source config type.")

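For context, here is a standalone pandas sketch of the merge behaviors that the `column`, `cross`, and `random` join types above map to; it uses toy frames rather than the SyGra source configuration, and the `sequential` variant is omitted because its repetition logic lives in `_repeat_to_merge_sequentially`.

```python
# Standalone pandas sketch of the join behaviors (toy data, not the SyGra config layer).
import pandas as pd

primary = pd.DataFrame({"id": [1, 2, 3], "question": ["q1", "q2", "q3"]})
secondary = pd.DataFrame({"ref_id": [1, 2], "context": ["c1", "c2"]})

# "column" join: left-join the secondary onto the primary via key columns
by_column = pd.merge(primary, secondary, left_on="id", right_on="ref_id", how="left")

# "cross" join: every primary row paired with every secondary row
by_cross = primary.merge(secondary, how="cross")

# "random" join: shuffle the secondary, pad it to the primary's length, then concat side by side
shuffled = secondary.sample(frac=1).reset_index(drop=True)
repeats = -(-len(primary) // len(shuffled))  # ceiling division
padded = pd.concat([shuffled] * repeats, ignore_index=True).iloc[: len(primary)]
by_random = pd.concat([primary, padded.reset_index(drop=True)], axis=1)
```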
sygra/core/eval/metrics/aggregator_metrics/aggregator_metric_registry.py
@@ -7,6 +7,8 @@
# Avoid circular imports
from __future__ import annotations

import importlib
import pkgutil
from typing import TYPE_CHECKING, Dict, List, Type

from sygra.logger.logger_config import logger
@@ -42,6 +44,33 @@ class AggregatorMetricRegistry:

# Class-level storage (create singleton to have central control)
_metrics: Dict[str, Type[BaseAggregatorMetric]] = {}
_discovered: bool = False

@classmethod
def _ensure_discovered(cls) -> None:
if cls._discovered:
return

try:
import sygra.core.eval.metrics.aggregator_metrics as aggregator_metrics_pkg

for module_info in pkgutil.iter_modules(
aggregator_metrics_pkg.__path__, aggregator_metrics_pkg.__name__ + "."
):
module_name = module_info.name
if module_name.endswith(
(
".base_aggregator_metric",
".aggregator_metric_registry",
)
):
continue
importlib.import_module(module_name)

cls._discovered = True
except Exception as e:
logger.error(f"Failed to auto-discover aggregator metrics: {e}")
cls._discovered = True

@classmethod
def register(cls, name: str, metric_class: Type[BaseAggregatorMetric]) -> None:
@@ -105,6 +134,8 @@ def get_metric(cls, name: str, **kwargs) -> BaseAggregatorMetric:
# Get metric with custom parameters
topk = AggregatorMetricRegistry.get_metric("top_k_accuracy", k=5)
"""
cls._ensure_discovered()

if name not in cls._metrics:
available = cls.list_metrics()
raise KeyError(
@@ -135,6 +166,7 @@ def list_metrics(cls) -> List[str]:
AggregatorMetricRegistry.list_metrics()
['accuracy', 'confusion_matrix', 'f1', 'precision', 'recall']
"""
cls._ensure_discovered()
return sorted(cls._metrics.keys())

@classmethod
@@ -149,6 +181,7 @@ def has_metric(cls, name: str) -> bool:
if AggregatorMetricRegistry.has_metric("f1"):
metric = AggregatorMetricRegistry.get_metric("f1")
"""
cls._ensure_discovered()
return name in cls._metrics

@classmethod
@@ -163,6 +196,8 @@ def get_metric_class(cls, name: str) -> Type[BaseAggregatorMetric]:
Raises:
KeyError: If metric name is not registered
"""
cls._ensure_discovered()

if name not in cls._metrics:
available = cls.list_metrics()
raise KeyError(
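To show how the auto-discovery above is typically fed, here is a hypothetical custom aggregator metric using the `@aggregator_metric` decorator from this module; the exact base-class contract beyond `calculate()` is an assumption inferred from this diff.

```python
# Hypothetical custom metric; the decorator and base class come from this change.
from typing import Any, Dict, List

from sygra.core.eval.metrics.aggregator_metrics.aggregator_metric_registry import aggregator_metric
from sygra.core.eval.metrics.aggregator_metrics.base_aggregator_metric import BaseAggregatorMetric


@aggregator_metric("error_rate")  # name under which the registry exposes the metric
class ErrorRateMetric(BaseAggregatorMetric):
    def calculate(self, results: List[Any]) -> Dict[str, Any]:
        # Each result is expected to carry a boolean `correct` flag, as in UnitMetricResult.
        if not results:
            return {"error_rate": 0.0}
        wrong = sum(1 for r in results if not r.correct)
        return {"error_rate": wrong / len(results)}
```

Note that `_ensure_discovered` only walks the built-in `aggregator_metrics` package, so a metric defined outside it would still need to be imported (or registered explicitly via `AggregatorMetricRegistry.register`) before lookup.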
43 changes: 20 additions & 23 deletions sygra/core/eval/metrics/aggregator_metrics/f1_score.py
@@ -7,7 +7,7 @@

from typing import Any, Dict, List

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field

from sygra.core.eval.metrics.aggregator_metrics.aggregator_metric_registry import aggregator_metric
from sygra.core.eval.metrics.aggregator_metrics.base_aggregator_metric import BaseAggregatorMetric
Expand All @@ -23,14 +23,6 @@ class F1ScoreMetricConfig(BaseModel):

predicted_key: str = Field(..., min_length=1, description="Key in predicted dict to check")
golden_key: str = Field(..., min_length=1, description="Key in golden dict to check")
positive_class: Any = Field(..., description="Value representing positive class")

@field_validator("positive_class")
@classmethod
def validate_positive_class(cls, v):
if v is None:
raise ValueError("positive_class is required (cannot be None)")
return v


@aggregator_metric("f1_score")
@@ -43,7 +35,6 @@ class F1ScoreMetric(BaseAggregatorMetric):
Required configuration:
predicted_key: Key in predicted dict to check (e.g., "tool")
golden_key: Key in golden dict to check (e.g., "event")
positive_class: Value representing the positive class (e.g., "click")
"""

def __init__(self, **config):
@@ -60,15 +51,10 @@ def validate_config(self):
# Store validated fields as instance attributes
self.predicted_key = config_obj.predicted_key
self.golden_key = config_obj.golden_key
self.positive_class = config_obj.positive_class

# Create precision and recall metrics (reuse implementations)
self.precision_metric = PrecisionMetric(
predicted_key=self.predicted_key, positive_class=self.positive_class
)
self.recall_metric = RecallMetric(
golden_key=self.golden_key, positive_class=self.positive_class
)
self.precision_metric = PrecisionMetric(predicted_key=self.predicted_key)
self.recall_metric = RecallMetric(golden_key=self.golden_key)

def get_metadata(self) -> BaseMetricMetadata:
"""Return metadata for F1 score metric"""
@@ -93,16 +79,27 @@ def calculate(self, results: List[UnitMetricResult]) -> Dict[str, Any]:
"""
if not results:
logger.warning(f"{self.__class__.__name__}: No results provided")
return {"f1_score": 0.0}
return {"average_f1_score": 0.0, "f1_score_per_class": {}}

f1_score = dict()
# Reuse existing metric implementations
precision_result = self.precision_metric.calculate(results)
recall_result = self.recall_metric.calculate(results)

precision = precision_result.get("precision", 0.0)
recall = recall_result.get("recall", 0.0)

# Calculate F1 as harmonic mean of precision and recall
f1_score = self._safe_divide(2 * precision * recall, precision + recall)
average_precision = precision_result.get("average_precision", 0.0)
average_recall = recall_result.get("average_recall", 0.0)
average_f1_score = self._safe_divide(
2 * average_precision * average_recall, average_precision + average_recall
)

precision_classes = set(precision_result.get("precision_per_class", {}).keys())
recall_classes = set(recall_result.get("recall_per_class", {}).keys())
all_classes = precision_classes.union(recall_classes)

for class_ in all_classes:
precision = precision_result.get("precision_per_class", {}).get(class_, 0.0)
recall = recall_result.get("recall_per_class", {}).get(class_, 0.0)
f1_score[class_] = self._safe_divide(2 * precision * recall, precision + recall)

return {"f1_score": f1_score}
return {"average_f1_score": average_f1_score, "f1_score_per_class": f1_score}
77 changes: 50 additions & 27 deletions sygra/core/eval/metrics/aggregator_metrics/precision.py
@@ -5,9 +5,10 @@
Measures: Of all predicted positives, how many were actually positive?
"""

from typing import Any, Dict, List
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field

from sygra.core.eval.metrics.aggregator_metrics.aggregator_metric_registry import aggregator_metric
from sygra.core.eval.metrics.aggregator_metrics.base_aggregator_metric import BaseAggregatorMetric
Expand All @@ -20,14 +21,6 @@ class PrecisionMetricConfig(BaseModel):
"""Configuration for Precision Metric"""

predicted_key: str = Field(..., min_length=1, description="Key in predicted dict to check")
positive_class: Any = Field(..., description="Value representing positive class")

@field_validator("positive_class")
@classmethod
def validate_positive_class(cls, v):
if v is None:
raise ValueError("positive_class is required (cannot be None)")
return v


@aggregator_metric("precision")
@@ -39,12 +32,12 @@ class PrecisionMetric(BaseAggregatorMetric):

Required configuration:
predicted_key: Key in predicted dict to check (e.g., "tool")
positive_class: Value representing the positive class (e.g., "click")
"""

def __init__(self, **config):
"""Initialize precision metric with two-phase initialization."""
super().__init__(**config)
self.predicted_key = None
self.validate_config()
self.metadata = self.get_metadata()

@@ -55,7 +48,6 @@ def validate_config(self):

# Store validated fields as instance attributes
self.predicted_key = config_obj.predicted_key
self.positive_class = config_obj.positive_class

def get_metadata(self) -> BaseMetricMetadata:
"""Return metadata for precision metric"""
@@ -76,23 +68,54 @@ def calculate(self, results: List[UnitMetricResult]) -> Dict[str, Any]:
results: List of UnitMetricResult

Returns:
dict: {"precision": float (0.0 to 1.0)}
dict: {
"average_precision": float (0.0 to 1.0)
"precision_per_class": {
"class_1": float (0.0 to 1.0),
"class_2": float (0.0 to 1.0),
...
"class_n": float (0.0 to 1.0)
}
}
"""
if not results:
logger.warning(f"{self.__class__.__name__}: No results provided")
return {"precision": 0.0}

# Calculate TP and FP
tp = sum(
1
for r in results
if r.predicted.get(self.predicted_key) == self.positive_class and r.correct
)
fp = sum(
1
for r in results
if r.predicted.get(self.predicted_key) == self.positive_class and not r.correct
return {"average_precision": 0.0, "precision_per_class": {}}

predicted_count: DefaultDict[str, int] = defaultdict(int)
true_positive: DefaultDict[str, int] = defaultdict(int)

for r in results:
try:
predicted_key = self.predicted_key
if predicted_key is None:
logger.warning(f"{self.__class__.__name__}: predicted_key is not configured")
continue
label = r.predicted[predicted_key]
except KeyError:
logger.warning(
f"{self.__class__.__name__}: Missing predicted_key '{self.predicted_key}' in result"
)
continue

if not isinstance(label, str):
label = str(label)

predicted_count[label] += 1
if r.correct:
true_positive[label] += 1

precision_per_class = {
label: self._safe_divide(true_positive[label], count)
for label, count in predicted_count.items()
}

average_precision = self._safe_divide(
sum(precision_per_class.values()),
len(precision_per_class),
)

precision = self._safe_divide(tp, tp + fp)
return {"precision": precision}
return {
"average_precision": average_precision,
"precision_per_class": precision_per_class,
}
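Read on its own, the per-class aggregation above reduces to the following standalone sketch, where `Result` is a stand-in for `UnitMetricResult` (only the `predicted` dict and `correct` flag are used):

```python
# Standalone illustration of the per-class precision aggregation; `Result` stands in for UnitMetricResult.
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Result:
    predicted: Dict[str, Any]
    correct: bool


results = [
    Result({"tool": "click"}, True),
    Result({"tool": "click"}, False),
    Result({"tool": "scroll"}, True),
]

predicted_count: Dict[str, int] = defaultdict(int)
true_positive: Dict[str, int] = defaultdict(int)
for r in results:
    label = str(r.predicted["tool"])
    predicted_count[label] += 1
    if r.correct:
        true_positive[label] += 1

precision_per_class = {c: true_positive[c] / n for c, n in predicted_count.items()}
average_precision = sum(precision_per_class.values()) / len(precision_per_class)
# precision_per_class == {"click": 0.5, "scroll": 1.0}; average_precision == 0.75
```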