Task graphs
A task graph is a YAML chain of skill calls plus per-stage success predicates. New long-horizon tasks become 20-line YAML files — and per-stage success rates fall out for free.
What is a task graph?
A task graph is a declarative description of a multi-step manipulation task. Each stage names one atomic skill plus its target part; an optional success field gives the predicate that decides whether the stage achieved its goal. The list of stages forms the chain.
Task graphs replace one-off env classes for composite tasks. Need "grasp a cap, lift it, rotate it 90°, place it back"? That's a 20-line YAML — not 200 lines of Python.
YAML schema
| Field | Type | Required | Meaning |
|---|---|---|---|
| `name` | string | yes | Graph identifier (used in `results.json`). |
| `description` | string | no | Human-readable summary. |
| `stages` | list | yes | Ordered list of stage entries. |
| `stages[].skill` | string | yes | Skill name as registered in `SKILL_REGISTRY`. |
| `stages[].target` | object | yes* | `{ object: <asset_id>, part: <part_name> }`. May be omitted for continuation skills that inherit from the prior stage. |
| `stages[].params` | object | no | Override the skill's default parameters. |
| `stages[].success` | predicate | no | Predicate DSL expression. Defaults to "skill returned no error". |
| `stages[].stage_name` | string | no | Label for `results.json` (defaults to skill name). |
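One way to picture the schema in code is as a pair of dataclasses. The sketch below is an illustration only, not the loader's actual implementation: `Stage`, `TaskGraph`, and `parse_task_graph` are hypothetical names, and the input is assumed to be a mapping that has already been parsed from YAML.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Stage:
    skill: str                                   # required
    target: Optional[dict[str, Any]] = None      # {"object": ..., "part": ...}
    params: dict[str, Any] = field(default_factory=dict)
    success: Optional[str] = None                # None: "skill returned no error"
    stage_name: Optional[str] = None

    def __post_init__(self) -> None:
        # stage_name falls back to the skill name, as the table specifies.
        if self.stage_name is None:
            self.stage_name = self.skill


@dataclass
class TaskGraph:
    name: str
    stages: list[Stage]
    description: str = ""


def parse_task_graph(raw: dict[str, Any]) -> TaskGraph:
    """Build a TaskGraph from an already-parsed YAML mapping (hypothetical helper)."""
    for key in ("name", "stages"):
        if key not in raw:
            raise ValueError(f"task graph is missing required field {key!r}")
    stages = []
    for i, entry in enumerate(raw["stages"]):
        if "skill" not in entry:
            raise ValueError(f"stage {i} is missing required field 'skill'")
        stages.append(Stage(**entry))
    return TaskGraph(name=raw["name"], stages=stages,
                     description=raw.get("description", ""))


raw = {
    "name": "grasp_and_lift_cap",
    "stages": [
        {"stage_name": "engage", "skill": "grasp_part",
         "target": {"object": 100221, "part": "cap"}},
        {"skill": "pure_lift", "params": {"height_m": 0.05}},  # no stage_name, no target
    ],
}
graph = parse_task_graph(raw)
print([s.stage_name for s in graph.stages])  # ['engage', 'pure_lift']
```

The second stage omits both `stage_name` and `target`, so its label defaults to the skill name and its target is left for the runner to inherit from the prior stage.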
Worked example
Grasp the cap of asset 100221 and lift it 5 cm:
    name: grasp_and_lift_cap
    description: Engage the cap of a small jar and lift it clear.
    stages:
      - stage_name: engage
        skill: grasp_part
        target: { object: 100221, part: cap }
        success: grasped("cap")
      - stage_name: lift
        skill: pure_lift
        params: { height_m: 0.05 }
        success: and(grasped("cap"), lifted("cap", height_m=0.05))
At evaluation time the stage names become the keys of the per_stage_success dictionary in results.json.
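Because every trial yields a dictionary keyed by stage name, per-stage success rates reduce to a simple count. The following is a minimal sketch that assumes each trial's `per_stage_success` is a flat `{stage_name: bool}` mapping. `per_stage_success_rate` is a hypothetical helper, not part of the toolkit, and it only counts stages that actually appear in a trial's dictionary.

```python
from collections import Counter


def per_stage_success_rate(trials: list[dict[str, bool]]) -> dict[str, float]:
    """Fraction of recorded trials in which each stage succeeded."""
    successes = Counter()
    attempts = Counter()
    for per_stage in trials:
        for stage, ok in per_stage.items():
            attempts[stage] += 1
            successes[stage] += int(ok)
    return {stage: successes[stage] / attempts[stage] for stage in attempts}


# Illustrative data, not real results.
trials = [
    {"engage": True, "lift": True},
    {"engage": True, "lift": False},
    {"engage": False, "lift": False},
    {"engage": True, "lift": True},
]
rates = per_stage_success_rate(trials)
print(rates)  # {'engage': 0.75, 'lift': 0.5}
```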
Stages & success predicates
Predicates are written in a small DSL — and / or / not combinators plus six atomic predicates:
- `grasped(part)`: gripper closed on the named part.
- `joint_value(part, op, value)`: joint position satisfies the comparator.
- `lifted(part, height_m=X)`: part raised by at least X metres relative to its initial pose.
- `pose_near(part, target_pose, tol)`: part's pose within tolerance of a target.
- `placed_in(a, b)`: object a is supported by container b.
- `stacked_on(a, b)`: object a is resting on object b.
See Predicate DSL for the full grammar and the underlying compiler.
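To make the combinator semantics concrete, predicates can be modelled as functions from a scene snapshot to a boolean. This is an illustrative sketch, not the DSL compiler: the `State` layout (`grasped_parts`, `initial_z`, `z`) is assumed for the example, and `and_`, `or_`, and `not_` are hypothetical Python stand-ins for the DSL's `and` / `or` / `not`.

```python
from typing import Callable

State = dict  # hypothetical scene snapshot
Predicate = Callable[[State], bool]


def grasped(part: str) -> Predicate:
    # Assumes the snapshot lists the parts currently held by the gripper.
    return lambda s: part in s["grasped_parts"]


def lifted(part: str, height_m: float) -> Predicate:
    # Height gained relative to the part's initial pose.
    return lambda s: s["z"][part] - s["initial_z"][part] >= height_m


def and_(*preds: Predicate) -> Predicate:
    return lambda s: all(p(s) for p in preds)


def or_(*preds: Predicate) -> Predicate:
    return lambda s: any(p(s) for p in preds)


def not_(pred: Predicate) -> Predicate:
    return lambda s: not pred(s)


# Mirrors the 'lift' stage of the worked example.
lift_ok = and_(grasped("cap"), lifted("cap", height_m=0.05))

raised = {"grasped_parts": {"cap"}, "initial_z": {"cap": 0.10}, "z": {"cap": 0.16}}
barely = {"grasped_parts": {"cap"}, "initial_z": {"cap": 0.10}, "z": {"cap": 0.12}}
print(lift_ok(raised), lift_ok(barely))  # True False
```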
Validation
Before a graph runs, validate_task_graph(graph) walks the stage list once and checks four things:
- Every `stages[].skill` exists in `SKILL_REGISTRY`.
- Every target part exists in the target asset's `capabilities.json`.
- Every target part offers all the affordances the stage's skill requires.
- Phase ordering is sensible: a continuation skill is preceded by an interaction (or another continuation) on the same target.
Validation errors point at the offending stage by name and surface the specific affordance or registry miss. Most graph authoring mistakes are caught here, before the simulator boots.
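The four checks can be sketched as a single pass over the stage list. Everything here is an assumption made for illustration: `check_graph` is a hypothetical stand-in rather than the real `validate_task_graph`, the registry is modelled as a dict carrying a `phase` and a `requires` set per skill, capabilities are a nested dict of affordance sets, and the skill `twist` and the affordance names `graspable` and `liftable` are invented.

```python
def check_graph(stages, registry, capabilities):
    """Return a list of error strings; an empty list means the graph passed."""
    errors = []
    current_target = None  # target that continuation skills would inherit
    for stage in stages:
        label = stage.get("stage_name", stage["skill"])
        spec = registry.get(stage["skill"])
        if spec is None:  # check 1: skill is registered
            errors.append(f"{label}: unknown skill {stage['skill']!r}")
            current_target = None
            continue
        target = stage.get("target")
        if spec["phase"] == "continuation":  # check 4: phase ordering
            if current_target is None:
                errors.append(f"{label}: continuation skill has no preceding interaction")
                continue
            if target is not None and target != current_target:
                errors.append(f"{label}: continuation skill switches target")
                current_target = None
                continue
            target = current_target
        elif target is None:
            errors.append(f"{label}: interaction skill needs a target")
            current_target = None
            continue
        parts = capabilities.get(target["object"], {})
        if target["part"] not in parts:  # check 2: part exists
            errors.append(f"{label}: part {target['part']!r} not in capabilities")
        else:  # check 3: affordances
            missing = spec["requires"] - parts[target["part"]]
            if missing:
                errors.append(f"{label}: part lacks affordances {sorted(missing)}")
        current_target = target
    return errors


registry = {
    "grasp_part": {"phase": "interaction", "requires": {"graspable"}},
    "pure_lift": {"phase": "continuation", "requires": {"liftable"}},
}
capabilities = {100221: {"cap": {"graspable", "liftable"}}}

good = [
    {"stage_name": "engage", "skill": "grasp_part",
     "target": {"object": 100221, "part": "cap"}},
    {"stage_name": "lift", "skill": "pure_lift"},
]
bad = [{"skill": "pure_lift"}, {"skill": "twist"}]

print(check_graph(good, registry, capabilities))  # []
for err in check_graph(bad, registry, capabilities):
    print(err)
```

Collecting every error in one pass, instead of raising on the first, lets an author fix several mistakes per edit cycle.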
Running a graph
From Python:
    from utils.task_graph import load_task_graph, run_task_graph
    from utils.eval_setup import make_eval_env

    graph = load_task_graph("configs/grasp_and_lift_cap.yaml")
    env = make_eval_env(graph)
    result = run_task_graph(env, graph)
    print(result.per_stage_success)  # {'engage': True, 'lift': False}
From the CLI, the same graph drives recording and evaluation alike:
    $ python record.py --task-graph configs/grasp_and_lift_cap.yaml --trials 30
    $ python core/policies/pi05/evaluate.py --task-graph configs/grasp_and_lift_cap.yaml --checkpoint ...