# Datasets
Turn real agent traces into test data. Run your agent against it. Score every output. Your eval harness lives here.
Datasets are scoped to your workspace. Every call uses the workspace from `st.init()`.
## Create a Dataset
```python
import staso as st

st.init(api_key="ak_...")

dataset = st.dataset.create(
    name="Checkout flow tests",
    description="Edge cases from production traces",
    columns=[
        {"name": "input", "type": "input"},
        {"name": "expected_output", "type": "expected_output"},
    ],
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Human-readable name |
| `description` | `str` | `""` | Optional description |
| `columns` | `list[dict]` | `None` | Column schema. Each dict needs `name` and `type`. |
| `folder_id` | `str \| None` | `None` | Place the dataset inside a folder |
## Column Types

| Type | Purpose |
|---|---|
| `input` | Agent input — prompt, user message, payload |
| `output` | Agent output — the actual answer |
| `expected_output` | Ground truth used for scoring |
| `expected_tool_calls` | Expected tool invocations |
| `expected_steps` | Expected reasoning steps |
| `scenario` | Scenario description for agentic tests |
| `conversation_history` | Prior turns for multi-turn tests |
| `files` | File attachments |
| `variable` | Arbitrary variable |
| `custom` | Anything else |
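Column types are plain strings on the schema dicts, so it is easy to sanity-check a schema before sending it. A minimal sketch: the `ALLOWED_TYPES` set mirrors the table above, and `validate_columns` is an illustrative helper, not part of the SDK.

```python
# The column types from the table above. Checking a schema locally
# before calling st.dataset.create() catches typos early; this helper
# is illustrative only and not part of the staso SDK.
ALLOWED_TYPES = {
    "input", "output", "expected_output", "expected_tool_calls",
    "expected_steps", "scenario", "conversation_history",
    "files", "variable", "custom",
}

columns = [
    {"name": "input", "type": "input"},
    {"name": "expected_output", "type": "expected_output"},
    {"name": "coupon_code", "type": "variable"},  # hypothetical extra column
]

def validate_columns(cols: list[dict]) -> None:
    """Raise ValueError if a column lacks a name or uses an unknown type."""
    for col in cols:
        if not col.get("name"):
            raise ValueError(f"column missing name: {col!r}")
        if col.get("type") not in ALLOWED_TYPES:
            raise ValueError(f"unknown column type: {col.get('type')!r}")

validate_columns(columns)  # passes silently for a well-formed schema
```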
## Get, List, Update, Delete

```python
dataset = st.dataset.get(dataset.id)
datasets = st.dataset.list(folder_id=None, limit=100, offset=0)
dataset = st.dataset.update(dataset.id, name="Checkout v2")
st.dataset.delete(dataset.id)
```

## Add Entries
Single entry:
```python
entry = st.dataset.add_entry(
    dataset_id=dataset.id,
    data={
        "input": "Cancel order #1234",
        "expected_output": "Canceled. Refund in 3-5 days.",
    },
    split="test",
)
```

Bulk:
```python
entries = st.dataset.add_entries(
    dataset_id=dataset.id,
    entries=[
        {"input": "...", "expected_output": "..."},
        {"input": "...", "expected_output": "..."},
    ],
    split="train",
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dataset_id` | `str` | required | Target dataset |
| `data` | `dict[str, Any]` | required | Column name → value |
| `split` | `"train" \| "test" \| "validation" \| None` | `None` | Split assignment |
| `source_trace_id` | `str \| None` | `None` | Trace this entry was curated from |
| `source_span_id` | `str \| None` | `None` | Span this entry was curated from |
## Splits

Every entry can be tagged with a split — `train`, `test`, or `validation`. Pass `split=...` to any method that takes it. Use `SplitType.TEST` if you prefer the enum:
```python
from staso.dataset import SplitType

st.dataset.add_entry(
    dataset_id=dataset.id,
    data={"input": "..."},
    split=SplitType.TEST,
)
```

## List, Update, Delete Entries
```python
entries = st.dataset.list_entries(dataset.id, split="test", limit=100)

st.dataset.update_entry(
    dataset.id,
    entry.id,
    data={"expected_output": "Refund in 2-3 days."},
)

st.dataset.delete_entry(dataset.id, entry.id)
```

`update_entry` merges the partial `data` dict into the existing row — you only send the columns that changed.
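The merge behaves like a shallow dict update: keys you send overwrite, keys you omit are untouched. A minimal sketch of the semantics (plain Python, not SDK code):

```python
# Illustrative only: how a partial update merges into an existing row.
existing = {
    "input": "Cancel order #1234",
    "expected_output": "Canceled. Refund in 3-5 days.",
}
partial = {"expected_output": "Refund in 2-3 days."}

# Shallow merge: partial keys win, everything else is preserved.
merged = {**existing, **partial}
# merged["input"] is unchanged; only "expected_output" was replaced.
```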
## CSV Import and Export
```python
count = st.dataset.upload_csv(dataset.id, "./tests.csv")
print(f"Imported {count} entries")

path = st.dataset.download_csv(dataset.id, "./exported.csv", split="test")
```

Headers become column names. Columns that don't exist yet are added to the dataset automatically. Use `split=...` to round-trip a single split.
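Since headers map straight to column names, a CSV whose header row matches your schema imports cleanly. A sketch of building such a file with the standard library `csv` module (the row contents are made-up examples):

```python
import csv
import io

# Build a CSV in memory whose headers match the dataset's column names.
# upload_csv would map each header to a column, adding any new ones.
rows = [
    {"input": "Cancel order #1234", "expected_output": "Canceled."},
    {"input": "Apply coupon SAVE10", "expected_output": "10% off applied."},
]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["input", "expected_output"])
writer.writeheader()
writer.writerows(rows)

csv_text = buf.getvalue()  # header line + one line per entry
```

Write `csv_text` to a file and pass its path to `st.dataset.upload_csv`.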
## Curate From Traces
Turn real production conversations into a dataset:
```python
dataset = st.dataset.from_traces(
    name="Production edge cases",
    trace_ids=["trace_abc", "trace_def"],
    mapping={"Input": "input", "Output": "output"},
    description="Spans that blew up in prod last week",
)
```

Every span in the listed traces becomes a dataset entry. Staso extracts readable text from provider payloads — Anthropic content blocks, OpenAI messages, Claude Code prompts, tool calls — and drops it into the mapped columns. Thinking blocks are skipped.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Dataset name |
| `trace_ids` | `list[str]` | required | Traces to pull spans from |
| `mapping` | `dict[str, str] \| None` | `None` | Column name → extractor key (see below) |
| `description` | `str` | `""` | Optional description |

Supported extractor keys: `input`, `output`, `tool_calls`, `model`, `kind`, `span_name`, `status`, `trace_id`, `session_id`, `error_message`, `duration_ms`, `input_tokens`, `output_tokens`, `total_tokens`.

If you pass no `mapping`, the dataset is created with the default template and columns are auto-mapped by name.
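A `mapping` is just a dict from dataset column name to extractor key, so it can be validated locally before calling `from_traces`. A sketch, where the column names and the check are illustrative and only the extractor keys come from the list above:

```python
# The documented extractor keys from the section above.
EXTRACTOR_KEYS = {
    "input", "output", "tool_calls", "model", "kind", "span_name",
    "status", "trace_id", "session_id", "error_message", "duration_ms",
    "input_tokens", "output_tokens", "total_tokens",
}

# {dataset column -> extractor key}; column names here are made up.
mapping = {
    "Input": "input",
    "Output": "output",
    "Model": "model",
    "Latency (ms)": "duration_ms",
}

# Catch typos in extractor keys before making the API call.
unknown = {key for key in mapping.values() if key not in EXTRACTOR_KEYS}
assert not unknown, f"unknown extractor keys: {unknown}"
```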
## Folders
Organize datasets into folders. Folders are scoped to your workspace.
```python
folder = st.dataset.create_folder(name="Checkout", parent_id=None)
nested = st.dataset.create_folder(name="Refunds", parent_id=folder.id)

folders = st.dataset.list_folders(parent_id=None)
st.dataset.delete_folder(folder.id)
```

Pass `folder_id=folder.id` to `st.dataset.create()` to place a new dataset inside a folder.
## Evaluate
Run your agent against a dataset. Every row gets wrapped in a trace, scored by your scorers, and the results are returned.
```python
def my_agent(row: dict) -> str:
    return call_claude(row["input"])

def exact_match(row: dict, output: str) -> float:
    return 1.0 if output == row["expected_output"] else 0.0

summary = st.dataset.evaluate(
    dataset_id=dataset.id,
    fn=my_agent,
    scorers=[exact_match],
    split="test",
    max_concurrency=4,
)

print(f"{summary.passed}/{summary.total} passed")
print(f"Avg exact_match: {summary.scores['exact_match']:.2%}")

for result in summary.results:
    if not result.passed:
        print(f"FAIL {result.entry_id}: {result.actual}")
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dataset_id` | `str` | required | Target dataset |
| `fn` | `Callable[[dict], Any]` | required | Your agent function. Receives the row data, returns the output. |
| `scorers` | `list[Callable[[dict, Any], float]]` | `None` | Scoring functions. Each returns a float 0-1. |
| `split` | `"train" \| "test" \| "validation" \| None` | `None` | Run on a specific split |
| `max_concurrency` | `int` | `1` | Parallel workers |
| `trace` | `bool` | `True` | Wrap every call in `st.span` for tracing |
The returned `EvalSummary` has:

- `.total`, `.passed`, `.failed`, `.error_count`
- `.avg_duration_ms`
- `.scores` — per-scorer averages
- `.results` — list of `EvalResult`: `entry_id`, `input_data`, `expected`, `actual`, `scores`, `passed`, `error`, `trace_id`, `duration_ms`
Scorers are optional. Omit them if you just want to run your agent across the dataset and capture traces.
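Because scorers are plain callables with the signature `(row, output) -> float`, you can go beyond exact matching with nothing but the standard library. A sketch of a fuzzier scorer using `difflib` (illustrative, not part of the SDK):

```python
from difflib import SequenceMatcher

def fuzzy_match(row: dict, output: str) -> float:
    """Similarity ratio in [0, 1] between the output and expected_output."""
    expected = row.get("expected_output", "")
    return SequenceMatcher(None, output, expected).ratio()

row = {"expected_output": "Refund in 2-3 days."}
fuzzy_match(row, "Refund in 2-3 days.")  # 1.0 for an exact match
```

Pass it alongside other scorers, e.g. `scorers=[exact_match, fuzzy_match]`; each scorer's average appears under its function name in `summary.scores`.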
## Generate Synthetic Entries
Seed a dataset from a prompt or existing rows:
```python
new_entries = st.dataset.generate(
    dataset_id=dataset.id,
    count=20,
    prompt="Generate checkout edge cases that stress test coupon logic",
)

# Or seed from existing rows
new_entries = st.dataset.generate(
    dataset_id=dataset.id,
    count=10,
    seed_entry_ids=[entry_1.id, entry_2.id],
)
```

Staso uses a high-reasoning model to produce entries that match your column schema. Generation counts against your monthly synthetic generation limit.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dataset_id` | `str` | required | Target dataset |
| `count` | `int` | `10` | How many entries to generate |
| `prompt` | `str \| None` | `None` | Generation prompt |
| `seed_entry_ids` | `list[str] \| None` | `None` | Existing entries to use as seeds |
## Errors

Every dataset method raises `StasoDatasetError` on failure. The exception carries `status_code` and `response_body` for debugging:
```python
from staso.dataset import StasoDatasetError

try:
    st.dataset.add_entry(dataset_id="bad", data={"input": "hi"})
except StasoDatasetError as e:
    if e.status_code == 402:
        print("Plan limit reached")
    elif e.status_code == 404:
        print("Dataset not found")
    else:
        raise
```

## Next
- Tracing — group spans before curating them into datasets
- Conversations — curate whole conversations at once
- Configuration — API keys, workspace setup