Boundary System¶
A boundary is a named safety rule in your Stackfile. It tells DAM: "run this check, with these parameters, on this guard layer." You define boundaries once, attach them to tasks, and DAM enforces them every cycle.
boundaries:
joint_limits:
layer: L1
type: single
nodes:
- callback: joint_position_limits
params:
upper: [1.57, 1.57, 1.57, 1.57, 1.57, 0.08]
lower: [-1.57, -1.57, -1.57, -1.57, -1.57, 0.0]
tasks:
demo:
boundaries: [joint_limits]
This activates joint_position_limits on the L1 guard whenever the demo task runs. If the policy proposes a joint position outside the limits, L1 clamps it.
Anatomy of a Boundary Node¶
Each node has four fields:
| Field | Required | What it does |
|---|---|---|
callback |
yes | Name of a registered check (dam callbacks lists all 18) |
params |
yes | Parameters passed to the callback |
fallback |
no | What to do on rejection: hold_position, retreat, or emergency_stop (default: emergency_stop) |
timeout_sec |
no | Reject if this node stays active longer than N seconds |
The callback does the actual work. The boundary just names it and feeds it parameters.
18 Built-in Callbacks¶
Run make callbacks (or .venv/bin/dam callbacks) to see all of them. Grouped by layer:
L0 -- Out-of-Distribution
| Callback | Behavior |
|---|---|
ood_detector |
Scores observation against trained distribution. Rejects on high anomaly score. |
L1 -- Physical Kinematics
| Callback | Behavior |
|---|---|
joint_position_limits |
Clamps joints to [lower, upper] |
joint_velocity_limit |
Clamps velocities to ±max_velocities (scales all joints proportionally) |
workspace |
Rejects if end-effector leaves the workspace box |
cartesian_velocity_limit |
Rejects if end-effector linear/angular speed exceeds limit |
keep_out_zone |
Rejects if end-effector enters a forbidden box or sphere |
orientation_limit |
Rejects if end-effector tilt from reference axis exceeds limit |
base_geofence |
Rejects if mobile base leaves a geofence polygon |
check_joints_not_moving |
Rejects if any joint velocity exceeds a near-zero threshold |
check_velocity_smooth |
Rejects if velocity jerk exceeds threshold |
L2 -- Task Execution
| Callback | Behavior |
|---|---|
task_gripper_command_guard |
Clamps gripper commands incompatible with active task node |
L3 -- Hardware Monitoring
| Callback | Behavior |
|---|---|
temperature_limit |
Rejects if motor temperature exceeds threshold |
current_limit |
Rejects if motor current exceeds threshold |
voltage_limit |
Rejects if supply voltage outside safe band |
force_limit |
Rejects if force magnitude exceeds threshold |
check_force_torque_safe |
Rejects if force or torque exceeds thresholds |
hardware_watchdog |
Rejects if observations go stale |
host_health_limit |
Rejects if host CPU/GPU/memory/temperature crosses limits |
Key distinction: L1 callbacks that have a well-defined correction (position, velocity) clamp the action. Callbacks where no clean correction exists (workspace, keep-out, geofence) reject.
Container Types¶
A boundary has a type that controls how nodes are activated:
single -- One node, always active¶
boundaries:
workspace:
layer: L1
type: single
nodes:
- callback: workspace
params:
bounds: [[-0.35, 0.35], [-0.05, 0.45], [0.01, 0.40]]
fallback: hold_position
Use for: always-on limits (joint bounds, workspace, hardware watchdog).
list -- Sequential phases¶
Multiple nodes, advanced explicitly. Use loop: true to wrap back to node 0.
boundaries:
pick_and_place:
layer: L2
type: list
loop: false
nodes:
- callback: task_workspace_bounds
params:
bounds: [[-0.35, 0.35], [-0.05, 0.45], [0.01, 0.40]]
fallback: hold_position
timeout_sec: 15.0
- callback: task_workspace_bounds
params:
bounds: [[-0.10, 0.10], [0.10, 0.30], [0.02, 0.20]]
fallback: hold_position
timeout_sec: 8.0
Advance in code:
runtime.start_task("pick_and_place") # starts at node 0
runtime.advance_container("pick_and_place") # move to node 1
Use for: multi-phase tasks (reach, grasp, lift, place).
graph -- Arbitrary transitions (Python only)¶
Nodes connected by conditional edges with priorities. Cannot be fully declared in YAML -- requires Python setup.
from dam.boundary.graph import GraphContainer, Transition
graph = GraphContainer(
nodes={"normal": node_a, "recovery": node_b, "shutdown": node_c},
transitions={
"normal": [Transition(to_node="recovery", condition=fault_detected)],
"recovery": [Transition(to_node="shutdown", condition=unrecoverable)],
},
initial_node_id="normal",
)
Use for: error recovery, state-machine tasks.
Tasks Activate Boundaries¶
A task references one or more boundaries by name. All referenced boundaries are checked every cycle. If any rejects, the action is rejected.
boundaries:
global_workspace:
layer: L1
type: single
nodes:
- callback: workspace
params:
bounds: [[-0.5, 0.5], [-0.2, 0.6], [-0.1, 1.5]]
fallback: emergency_stop
task_speed:
layer: L2
type: single
nodes:
- callback: task_joint_speed_limit
params:
max_speed: 0.3
fallback: hold_position
tasks:
pick_and_place:
boundaries: [global_workspace, task_speed]
Pattern: global L1 boundaries for hard physical limits + task-scoped L2 boundaries for task-specific constraints.
Writing Custom Callbacks¶
When the 18 built-ins don't cover your check, register a custom callback:
from dam.boundary.callbacks._registry import boundary_callback
from dam.guard.pipeline import CallbackResult
from dam.types.observation import Observation
@boundary_callback(
name="force_limited_grasp",
layer="L2",
description="Rejects when contact force exceeds the configured limit.",
params={"max_force_n": "Maximum allowed contact force in newtons."},
)
def force_limited_grasp(*, obs: Observation, max_force_n: float = 30.0) -> CallbackResult:
force_norm = obs.metadata.get("force_norm")
if force_norm is None or float(force_norm) <= max_force_n:
return CallbackResult.ok("force_limited_grasp")
return CallbackResult.violate("force_limited_grasp", "force limit exceeded")
Then use it in a Stackfile like any built-in:
boundaries:
grasp_check:
layer: L2
type: single
nodes:
- callback: force_limited_grasp
params:
max_force_n: 30.0
fallback: hold_position
Rules for custom callbacks:
- All user params come via
**kwargs; runtime injectsobs,action,dtautomatically - Return
CallbackResult.ok(name)orCallbackResult.violate(name, reason) - For clamp-style corrections, return
CallbackResult.clamp(name, validated_action) - Keep execution under 5 ms -- complex logic belongs in the policy
Seeing Boundaries in Action¶
You don't need to read logs to understand what your boundaries are doing. The DAM Console (http://localhost:3000) visualizes boundary activity in real time:
- Per-boundary pass/clamp/reject counts and rates
- Which guard layer triggered and by how much the action was adjusted
- Latency breakdown per cycle
- Risk state timeline
Open the console alongside your run -- it's the fastest way to tell whether your boundaries are too tight, too loose, or just right.
For post-session analysis, MCAP logs capture +/-30 seconds of context around every violation. See Console Walkthrough for a guided tour.
Debugging from CLI¶
.venv/bin/dam validate mystack.yaml # catch schema errors early
make callbacks # verify your callback is registered
In code:
result = runtime.step()
if result.was_rejected:
print(result.rejecting_guard, result.decision_reason)
Tips¶
| Do | Why |
|---|---|
| Start tight, loosen incrementally | Easier to relax than debug a crash |
| Combine L1 global + L2 task boundaries | Defense in depth |
| Test fallbacks in sim before hardware | Verify hold/retreat/e-stop actually work |
| Watch clamp rates, not just rejections | Clamps tell you where the policy pushes limits |
| Version your Stackfiles | Track which boundaries worked for which tasks |
Next Steps¶
- Build your own callback -- Boundary Callbacks walks through the full callback API, parameter reference, and step-by-step guide to implementing and registering a custom check
- Common Stackfile Edits -- quick config recipes
- Guard Stack Explained -- how guards evaluate boundaries
- Console Walkthrough -- read boundary activity in the dashboard