Datasets
Evaluate
def exact_match(entry, output):
return float(output == entry["expected"])
def run_agent(entry):
return my_agent(entry["input"])
summary = st.dataset.evaluate(
ds.id,
run_agent,
scorers=[exact_match],
max_concurrency=4,
)
print(summary.total, summary.passed, summary.failed)Signature
st.dataset.evaluate(
dataset_id: str,
fn: Callable[[dict], Any],
*,
scorers: list[Callable[[dict, Any], float]] | None = None,
split: str | SplitType | None = None,
max_concurrency: int = 1,
trace: bool = True,
) -> EvalSummary| Parameter | Description |
|---|---|
fn | Your agent wrapper. Takes the entry's data dict, returns whatever your agent produced. |
scorers | Functions of (entry_data, output) -> float. Return 1.0 for pass, 0.0 for fail, or anything in between. |
split | Restrict to one SplitType. |
max_concurrency | Entries to run in parallel. Default 1. |
trace | Emit each entry run as a nested trace. Default True. |
Scorer
def contains_keyword(entry: dict, output) -> float:
return 1.0 if "refund" in str(output).lower() else 0.0The runner aggregates the mean per scorer into EvalSummary.scores.
EvalSummary
| Field | Type |
|---|---|
dataset_id | str |
dataset_name | str |
total | int |
passed | int — every scorer >= 0.5 |
failed | int — any scorer < 0.5 |
error_count | int — fn raised |
avg_duration_ms | float |
scores | dict[str, float] — mean per scorer |
results | tuple[EvalResult, ...] |
Each EvalResult carries entry_id, input_data, expected, actual, scores, passed, error, trace_id, duration_ms. With trace=True, trace_id links to the trace in the dashboard.