Architecture Overview
ML policies always output the next action. They have no awareness of joint limits, workspace boundaries, or hardware faults. The gap between what a policy wants and what is actually safe is invisible — unless something sits in the middle.
DAM is that layer. It intercepts every action, validates it against a configurable guard stack, and lets both the policy and the hardware driver stay unchanged.
High-Level Data Flow
┌─────────────────────┐
│    Observations     │  Sensor streams from hardware
└──────────┬──────────┘
           │
           ▼
┌─────────────────────────────────────┐
│  ML Policy / Controller             │  Generates proposed actions
│  (PyTorch, Diffusion, ACT, etc.)    │
└──────────┬──────────────────────────┘
           │
           ▼  Proposed Action
┌─────────────────────────────────────┐
│  DAM Guard Stack (L0–L3)            │  Multi-layer safety filter
│  • L0: OOD Detection                │
│  • L1: Physical Kinematics          │  Decision:
│  • L2: Task Execution               │  PASS / CLAMP / REJECT
│  • L3: Hardware Health Monitor      │
└──────────┬──────────────────────────┘
           │
           ▼  Validated Action
┌─────────────────────┐
│   Fallback Engine   │  Hold / Retreat / E-Stop
│    (if rejected)    │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────────────────┐
│  Hardware Sinks                 │
│  • Motor controllers            │
│  • Gripper / end-effector       │
│  • Emergency stop circuits      │
└─────────────────────────────────┘
In practice, this means your policy code and hardware drivers don't change — DAM intercepts between them.
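To make the flow above concrete, here is a minimal, self-contained sketch of an interception layer. It is not DAM's implementation: `Decision`, `GuardResult`, `finite_guard`, `joint_limit_guard`, and `run_stack` are hypothetical names chosen for illustration. They only show how a proposed action could be passed through guards in sequence and come out as PASS, CLAMP, or REJECT.

```python
import math
from dataclasses import dataclass
from enum import IntEnum
from typing import Callable, List, Optional, Sequence


class Decision(IntEnum):
    # Ordered by severity so that max() picks the strictest outcome.
    PASS = 0
    CLAMP = 1
    REJECT = 2


@dataclass
class GuardResult:
    decision: Decision
    action: Optional[List[float]]  # None when the action is rejected


# Hypothetical guard signature: proposed action in, result out.
Guard = Callable[[Sequence[float]], GuardResult]


def finite_guard(action: Sequence[float]) -> GuardResult:
    """Reject any action that contains NaN or infinity."""
    if all(math.isfinite(a) for a in action):
        return GuardResult(Decision.PASS, list(action))
    return GuardResult(Decision.REJECT, None)


def joint_limit_guard(lower: Sequence[float], upper: Sequence[float]) -> Guard:
    """Build a guard that clamps each joint target into [lower, upper]."""
    def check(action: Sequence[float]) -> GuardResult:
        clamped = [min(max(a, lo), hi) for a, lo, hi in zip(action, lower, upper)]
        decision = Decision.CLAMP if clamped != list(action) else Decision.PASS
        return GuardResult(decision, clamped)
    return check


def run_stack(guards: Sequence[Guard], proposed: Sequence[float]) -> GuardResult:
    """Run guards in order; a REJECT stops the stack, a CLAMP carries forward."""
    action = list(proposed)
    worst = Decision.PASS
    for guard in guards:
        result = guard(action)
        if result.decision is Decision.REJECT:
            return GuardResult(Decision.REJECT, None)
        worst = max(worst, result.decision)
        action = result.action
    return GuardResult(worst, action)


stack = [finite_guard, joint_limit_guard([-1.57, -1.57], [1.57, 1.57])]
result = run_stack(stack, [0.5, 2.0])  # second joint exceeds its upper limit
```

The policy that produced `[0.5, 2.0]` and the driver that would receive the clamped action never see each other's code, which is the point of placing the check between them.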
Design Principles
- Fail-to-Reject — any guard timeout, exception, or unexpected behavior results in immediate rejection
- Defense-in-Depth — safety is not a single check; it's four independent layers
- Configuration over Code — use YAML for 99% of deployments; Python for advanced tier-2/3 setups
- Modularity — swap hardware, policies, and safety rules independently
- Observability — every decision is auditable; violations are captured for post-incident analysis
- Predictable Runtime Path — keep logging, messaging, and guard work observable and bounded
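The Fail-to-Reject principle can be illustrated with a small wrapper that treats a timeout, an exception, or an unrecognized return value as a rejection. This is a sketch under stated assumptions, not DAM's code: `evaluate_fail_closed` and the string verdicts are hypothetical, and a thread pool is only one of several ways to bound evaluation time.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

VALID_VERDICTS = ("PASS", "CLAMP", "REJECT")


def evaluate_fail_closed(guard, action, timeout_s, executor):
    """Return the guard's verdict, or "REJECT" on timeout, error, or garbage."""
    future = executor.submit(guard, action)
    try:
        verdict = future.result(timeout=timeout_s)
    except FutureTimeout:
        # The worker thread keeps running; only the verdict is abandoned.
        return "REJECT"
    except Exception:
        # Any exception raised inside the guard is treated as unsafe.
        return "REJECT"
    if verdict not in VALID_VERDICTS:
        return "REJECT"
    return verdict


def slow_guard(action):
    time.sleep(0.2)  # simulates a guard that misses its deadline
    return "PASS"


pool = ThreadPoolExecutor(max_workers=4)
ok = evaluate_fail_closed(lambda a: "PASS", [0.0], 0.5, pool)
timed_out = evaluate_fail_closed(slow_guard, [0.0], 0.05, pool)
crashed = evaluate_fail_closed(lambda a: 1 / 0, [0.0], 0.5, pool)
garbage = evaluate_fail_closed(lambda a: None, [0.0], 0.5, pool)
pool.shutdown(wait=True)
```

The design choice worth noting is that the default branch is rejection: a verdict has to be positively recognized before an action is allowed through.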
System Components
1. Control Plane (Python)
The control plane coordinates lifecycle, Stackfile parsing, and hot-reload logic.
# High-level entry point
# my_hardware_source, my_policy, my_hardware_sink, and n_cycles
# are provided by your application.
from dam.runtime.guard_runtime import GuardRuntime

runtime = GuardRuntime.from_stackfile("mystack.yaml")
runtime.register_source("main", my_hardware_source)
runtime.register_policy(my_policy)
runtime.register_sink(my_hardware_sink)
runtime.start_task("mytask")
for _ in range(n_cycles):
    result = runtime.step()  # one observe / propose / validate / dispatch cycle
Responsibilities:
- Parse and validate Stackfiles
- Manage component lifecycle (sources, sinks, guards, policies)
- Hot-reload boundary constraints without stopping the loop
- Collect telemetry and dispatch it to logging
2. Data Plane (Rust – optional but recommended)
The data plane runs the real-time critical path: observation assembly, guard evaluation, and action dispatch.
Rust Layer (runtime path)
├── ObservationBus — Zero-copy observation multiplexing
├── ActionBus — Proposed → validated action pipeline
├── WatchdogTimer — Per-cycle timeout enforcement
├── RiskController — Windowed risk aggregation (Phase 2)
└── Guard Evaluators — Vectorized constraint checking
Why keep this path separate?
- High-volume logging and messaging stay away from policy code.
- The control loop has fewer Python runtime pauses to account for.
- Hardware-oriented deployments get a clearer boundary between orchestration and hot-path work.
DAM falls back to a pure-Python implementation if the Rust extension is not compiled.
3. Guard Stack (Python + optional Rust acceleration)
Four independent layers evaluate the proposed action in sequence.
| Layer | Responsibility | Implementation |
|---|---|---|
| L0 | Detect out-of-distribution observations | Memory bank NN or Welford z-score |
| L1 | Physical kinematics: joint limits, workspace, velocity & dynamics | Boundary callbacks with clamp/reject aggregation |
| L2 | Task execution: phase, semantic, and command compatibility | Active task boundary callbacks |
| L3 | Motor status, temperature, watchdogs | Hardware sink health queries |
Each guard is completely independent. You can enable/disable any layer in your Stackfile.
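The table names a Welford z-score as one option for L0. As a rough illustration of that technique (not DAM's detector), the sketch below keeps a running mean and variance for a single scalar feature using Welford's online update and flags values whose z-score exceeds a threshold. The class name, the `threshold` and `min_samples` parameters, and the scalar-only scope are assumptions made for this example.

```python
import math


class WelfordZScore:
    """Running mean/variance for one scalar feature, with a z-score test."""

    def __init__(self, threshold: float = 4.0, min_samples: int = 10) -> None:
        self.threshold = threshold
        self.min_samples = min_samples
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the running mean

    def update(self, x: float) -> None:
        """Welford's online update: one pass, no stored history."""
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def variance(self) -> float:
        """Sample variance; 0.0 until at least two samples are seen."""
        return self.m2 / (self.n - 1) if self.n > 1 else 0.0

    def zscore(self, x: float) -> float:
        var = self.variance()
        if var == 0.0:
            return 0.0 if x == self.mean else math.inf
        return abs(x - self.mean) / math.sqrt(var)

    def is_ood(self, x: float) -> bool:
        """Flag x only once enough in-distribution samples were observed."""
        return self.n >= self.min_samples and self.zscore(x) > self.threshold


detector = WelfordZScore(threshold=4.0, min_samples=5)
for value in [1.0, 2.0, 3.0, 4.0, 5.0]:
    detector.update(value)
```

A real observation is a vector, so a deployment would need one such tracker per feature or a multivariate statistic; that extension is left out here.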
4. Boundary System (Configuration-driven)
Boundaries define the safety envelope active during a task. They are pure data — no code required.
boundaries:
  pick_and_place:
    layer: L2
    type: list
    nodes:
      - callback: task_workspace_bounds
        params:
          bounds: [[-0.35, 0.35], [-0.05, 0.45], [0.01, 0.40]]
        fallback: hold_position
        timeout_sec: 15.0
      - callback: task_joint_speed_limit
        params:
          max_speed: 0.08
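To give a feel for what the two callbacks in this boundary might compute, here is a hedged sketch. The function names `within_workspace` and `limit_speed` are hypothetical, as are their signatures; DAM's actual callbacks may differ, and the source does not state the units of `bounds` or `max_speed`.

```python
from typing import List, Sequence, Tuple

# Per-axis (min, max) pairs, copied from the `bounds` parameter above.
BOUNDS: List[Tuple[float, float]] = [(-0.35, 0.35), (-0.05, 0.45), (0.01, 0.40)]
MAX_SPEED = 0.08  # copied from `max_speed` above


def within_workspace(position: Sequence[float],
                     bounds: Sequence[Tuple[float, float]]) -> bool:
    """True if every coordinate lies inside its (min, max) interval."""
    return all(lo <= p <= hi for p, (lo, hi) in zip(position, bounds))


def limit_speed(velocities: Sequence[float], max_speed: float) -> List[float]:
    """Clamp each velocity magnitude to max_speed, preserving its sign."""
    return [max(-max_speed, min(max_speed, v)) for v in velocities]


inside = within_workspace([0.0, 0.2, 0.1], BOUNDS)
outside = within_workspace([0.4, 0.2, 0.1], BOUNDS)  # first axis exceeds 0.35
limited = limit_speed([0.2, -0.5, 0.03], MAX_SPEED)
```

A bounds violation is a natural candidate for a reject-and-fallback response (here `hold_position`), while a speed violation can often be clamped, which matches the PASS / CLAMP / REJECT split shown earlier.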
Adapters & Integrations
DAM plugs into your hardware and policy via duck-typed adapters. If it quacks like a source/policy/sink, DAM will work with it.
Sources (Observations)
class MySource:
    def read(self) -> Observation:
        """Return current sensor state."""
        return Observation(...)
Policies (Action Proposals)
class MyPolicy:
    def step(self, obs: Observation) -> Action:
        """Propose an action given the observation."""
        return Action(...)
Sinks (Hardware)
class MySink:
    def write(self, action: Action) -> None:
        """Execute the action on hardware."""
        ...

    def health_check(self) -> HealthStatus:
        """Report hardware health for L3 guard."""
        return HealthStatus(...)
Built-in adapters:
- LeRobot (SO-ARM101 / Koch v1.1)
- ROS 2 (joint states, trajectories, TF)
- Simulation (MuJoCo, PyBullet, etc.)
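The duck-typed source, policy, and sink shapes described in this section can be written down as `typing.Protocol` classes if you want a static or runtime check on your own adapters. This is an illustrative sketch only: the `Source`, `Policy`, and `Sink` protocol names are hypothetical and are not claimed to exist in DAM, and `Any` stands in for the `Observation`, `Action`, and `HealthStatus` types.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Source(Protocol):
    def read(self) -> Any: ...


@runtime_checkable
class Policy(Protocol):
    def step(self, obs: Any) -> Any: ...


@runtime_checkable
class Sink(Protocol):
    def write(self, action: Any) -> None: ...
    def health_check(self) -> Any: ...


class LoggingSink:
    """Toy sink that records actions instead of driving hardware."""

    def __init__(self) -> None:
        self.written = []

    def write(self, action: Any) -> None:
        self.written.append(action)

    def health_check(self) -> Any:
        return {"ok": True}


class WriteOnlySink:
    """Missing health_check, so it does not satisfy the Sink shape."""

    def write(self, action: Any) -> None:
        pass


# isinstance() on a runtime_checkable Protocol only checks that the
# methods exist; it does not verify signatures or return types.
good = isinstance(LoggingSink(), Sink)
bad = isinstance(WriteOnlySink(), Sink)
```

No inheritance is involved: `LoggingSink` never mentions `Sink`, which is what "duck-typed" means in practice.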
Stackfile: The Configuration Format
A Stackfile is a YAML file that wires together all components. No Python code required for tier-1 deployments.
version: "1"
hardware:
  preset: so101_follower
sources:
  follower_arm:
    type: motor
    port: /dev/tty.usbmodem5AA90244141
sinks:
  follower_command:
    ref: sources.follower_arm
policy:
  type: act
  pretrained_path: MikeChenYZ/act-soarm-fmb-v2
guards:
  - L1: motion
    phase: 0
  - L2: execution
    phase: 1
  - L3: hardware
    always: true
boundaries:
  joint_position_limits:
    layer: L1
    type: single
    nodes:
      - callback: joint_position_limits
        params:
          upper: [1.57, 1.57, 1.57, ...]
          lower: [-1.57, -1.57, -1.57, ...]
tasks:
  pick_and_place:
    boundaries: [joint_position_limits]
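Because tasks refer to boundaries by name, one cheap sanity check on a parsed Stackfile is that every reference resolves. The sketch below works on a plain dictionary, as a YAML parser would produce, and is not DAM's validator: `check_stackfile` and the specific rules it enforces are assumptions for illustration.

```python
from typing import Any, Dict, List

VALID_LAYERS = {"L0", "L1", "L2", "L3"}

# A trimmed dictionary mirroring the boundaries/tasks part of the example.
EXAMPLE: Dict[str, Any] = {
    "boundaries": {
        "joint_position_limits": {
            "layer": "L1",
            "type": "single",
            "nodes": [{"callback": "joint_position_limits"}],
        }
    },
    "tasks": {"pick_and_place": {"boundaries": ["joint_position_limits"]}},
}


def check_stackfile(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; empty means none found."""
    errors: List[str] = []
    boundaries = cfg.get("boundaries", {})
    for name, spec in boundaries.items():
        if spec.get("layer") not in VALID_LAYERS:
            errors.append(f"boundary '{name}': unknown layer {spec.get('layer')!r}")
        if not spec.get("nodes"):
            errors.append(f"boundary '{name}': no nodes defined")
    for task, spec in cfg.get("tasks", {}).items():
        for ref in spec.get("boundaries", []):
            if ref not in boundaries:
                errors.append(f"task '{task}': undefined boundary '{ref}'")
    return errors


problems = check_stackfile(EXAMPLE)
broken = {"boundaries": {}, "tasks": {"demo": {"boundaries": ["missing"]}}}
broken_problems = check_stackfile(broken)
```

Collecting all problems in a list, rather than raising on the first one, lets a user fix a Stackfile in one pass.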
Runtime Modes
Managed Mode
DAM runs its own control loop at a fixed frequency.
runtime = GuardRuntime.from_stackfile("mystack.yaml")
runtime.start_task("mytask")
runtime.run() # Blocks; runs at ~50 Hz until KeyboardInterrupt
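For intuition about what a fixed-frequency loop involves, here is a generic sketch of deadline-based pacing. It does not describe how `runtime.run()` is implemented: `run_fixed_rate` is a hypothetical helper, and the injectable `clock` and `sleep` parameters exist only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


def run_fixed_rate(step: Callable[[], None], hz: float, n_cycles: int,
                   clock: Callable[[], float] = time.monotonic,
                   sleep: Callable[[float], None] = time.sleep) -> int:
    """Call step() n_cycles times at roughly hz; return the overrun count."""
    period = 1.0 / hz
    next_deadline = clock()
    overruns = 0
    for _ in range(n_cycles):
        step()
        next_deadline += period
        remaining = next_deadline - clock()
        if remaining > 0:
            sleep(remaining)
        else:
            # The cycle took longer than one period: count it and
            # resynchronize instead of running a burst to catch up.
            overruns += 1
            next_deadline = clock()
    return overruns


# Simulated time, so the example runs instantly and deterministically.
now = [0.0]
calls = []
overruns = run_fixed_rate(
    step=lambda: calls.append(now[0]),
    hz=50.0,
    n_cycles=5,
    clock=lambda: now[0],
    sleep=lambda d: now.__setitem__(0, now[0] + d),
)
```

Scheduling against absolute deadlines, rather than sleeping a fixed amount after each step, keeps the average rate stable even when individual cycles vary in duration.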
Passive Mode (Default)
Your code drives the loop. DAM executes one cycle per step() call.
runtime = GuardRuntime.from_stackfile("mystack.yaml")
runtime.start_task("mytask")
for _ in range(1000):
    result = runtime.step()
    print(f"Risk: {result.risk_level}, Clamped: {result.was_clamped}")
Telemetry & Observability
Ring Buffer
Every cycle writes a CycleResult to an in-memory ring buffer. The latest N events are queryable via the REST API.
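A bounded ring buffer of this kind is easy to sketch with `collections.deque`. The `CycleRing` class below is a hypothetical stand-in, not DAM's buffer, and it stores arbitrary objects in place of `CycleResult`.

```python
from collections import deque
from typing import Any, Deque, List


class CycleRing:
    """Keeps only the most recent `capacity` items; older ones are dropped."""

    def __init__(self, capacity: int) -> None:
        self._buf: Deque[Any] = deque(maxlen=capacity)

    def append(self, item: Any) -> None:
        # When full, deque(maxlen=...) discards the oldest item automatically.
        self._buf.append(item)

    def latest(self, n: int) -> List[Any]:
        """Return up to n most recent items, oldest first."""
        if n <= 0:
            return []
        return list(self._buf)[-n:]


ring = CycleRing(capacity=3)
for cycle_id in range(5):
    ring.append(cycle_id)
```

Memory use stays constant no matter how long the loop runs, which is why this structure suits per-cycle telemetry.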
MCAP Loopback
When a guard rejects or clamps an action, DAM captures:
- ±30 seconds of observations
- All intermediate guard decisions
- Risk level timeline
Data is written in MCAP, an open container format for timestamped log data that is widely used in robotics.
Services API
Real-time REST + WebSocket endpoints:
- GET /api/telemetry/history — last N cycles
- WS /ws/telemetry — live stream
- GET /api/risk-log — historical queries
- GET /api/risk-log/export/json — data export
Hot-Reload
Modify your Stackfile on disk and DAM reloads it within ~500 ms, without interrupting the control loop.
from dam.config.hot_reload import StackfileWatcher
watcher = StackfileWatcher(
    path="mystack.yaml",
    on_change=runtime.apply_pending_reload,
    poll_interval_s=0.5,
)
watcher.start()
# Edit mystack.yaml on disk → changes take effect mid-run
Only static configuration (guard limits, boundary constraints) reloads. Guard class structure and task definitions remain fixed during a run.
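Polling-based change detection, as suggested by the `poll_interval_s` parameter, can be sketched by comparing a file's modification time between polls. The `MtimePoller` class is a hypothetical illustration and makes no claim about how `StackfileWatcher` works internally.

```python
import os
from typing import Callable


class MtimePoller:
    """Detects file changes by comparing modification times between polls."""

    def __init__(self, path: str, on_change: Callable[[], None]) -> None:
        self.path = path
        self.on_change = on_change
        self._last_mtime_ns = os.stat(path).st_mtime_ns

    def poll_once(self) -> bool:
        """Invoke on_change and return True if the file changed since last poll."""
        current = os.stat(self.path).st_mtime_ns
        if current != self._last_mtime_ns:
            self._last_mtime_ns = current
            self.on_change()
            return True
        return False
```

In a real loop, `poll_once()` would be called from a background thread every `poll_interval_s` seconds. A callback named like `apply_pending_reload` suggests that the new configuration is staged and then swapped in between cycles, although the source does not spell that out.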
Next Steps
- Understand the guards → Guard Stack Explained
- Learn about safety → Safety Model
- Configure boundaries → Boundary System
- Read a Stackfile → Stackfile Walkthrough