Skip to content

Architecture Overview

ML policies always output the next action. They have no awareness of joint limits, workspace boundaries, or hardware faults. The gap between what a policy wants and what is actually safe is invisible — unless something sits in the middle.

DAM is that layer. It intercepts every action, validates it against a configurable guard stack, and lets both the policy and the hardware driver stay unchanged.


High-Level Data Flow

┌─────────────────────┐
│  Observations       │  Sensor streams from hardware
└──────────┬──────────┘
┌─────────────────────────────────────┐
│  ML Policy / Controller             │  Generates proposed actions
│  (PyTorch, Diffusion, ACT, etc.)    │
└──────────┬──────────────────────────┘
           ▼ Proposed Action
┌─────────────────────────────────────┐
│  DAM Guard Stack (L0–L3)            │  Multi-layer safety filter
│  • L0: OOD Detection                │
│  • L1: Physical Kinematics          │  Decision:
│  • L2: Task Execution               │  PASS / CLAMP / REJECT
│  • L3: Hardware Health Monitor      │
└──────────┬──────────────────────────┘
           ▼ Validated Action
┌─────────────────────┐
│  Fallback Engine    │  Hold / Retreat / E-Stop
│  (if rejected)      │
└──────────┬──────────┘
┌─────────────────────────────────┐
│  Hardware Sinks                 │
│  • Motor controllers            │
│  • Gripper / end-effector       │
│  • Emergency stop circuits      │
└─────────────────────────────────┘

In practice, this means your policy code and hardware drivers don't change — DAM intercepts between them.


Design Principles

  1. Fail-to-Reject — any guard timeout, exception, or unexpected behavior results in immediate rejection
  2. Defense-in-Depth — safety is not a single check; it's four independent layers
  3. Configuration over Code — use YAML for 99% of deployments; Python for advanced tier-2/3 setups
  4. Modularity — swap hardware, policies, and safety rules independently
  5. Observability — every decision is auditable; violations are captured for post-incident analysis
  6. Predictable Runtime Path — keep logging, messaging, and guard work observable and bounded

System Components

1. Control Plane (Python)

The control plane coordinates lifecycle, Stackfile parsing, and hot-reload logic.

# High-level entry point
from dam.runtime.guard_runtime import GuardRuntime

runtime = GuardRuntime.from_stackfile("mystack.yaml")
runtime.register_source("main", my_hardware_source)
runtime.register_policy(my_policy)
runtime.register_sink(my_hardware_sink)

runtime.start_task("mytask")
for _ in range(n_cycles):
    result = runtime.step()

Responsibilities: - Parse and validate Stackfiles - Manage component lifecycle (sources, sinks, guards, policies) - Hot-reload boundary constraints without stopping the loop - Collect telemetry and dispatch to logging

The data plane runs the real-time critical path: observation assembly, guard evaluation, and action dispatch.

Rust Layer (runtime path)
├── ObservationBus       — Zero-copy observation multiplexing
├── ActionBus            — Proposed → validated action pipeline
├── WatchdogTimer        — Per-cycle timeout enforcement
├── RiskController       — Windowed risk aggregation (Phase 2)
└── Guard Evaluators     — Vectorized constraint checking

Why keep this path separate? - High-volume logging and messaging stay away from policy code. - The control loop has fewer Python runtime pauses to account for. - Hardware-oriented deployments get a clearer boundary between orchestration and hot-path work.

Falls back to pure-Python if Rust extension is not compiled.

3. Guard Stack (Python + optional Rust acceleration)

Four independent layers evaluate the proposed action in sequence.

Layer Responsibility Implementation
L0 Detect out-of-distribution observations Memory bank NN or Welford z-score
L1 Physical kinematics: joint limits, workspace, velocity & dynamics Boundary callbacks with clamp/reject aggregation
L2 Task execution: phase, semantic, and command compatibility Active task boundary callbacks
L3 Motor status, temperature, watchdogs Hardware sink health queries

Each guard is completely independent. You can enable/disable any layer in your Stackfile.

4. Boundary System (Configuration-driven)

Boundaries define the safety envelope active during a task. They are pure data — no code required.

boundaries:
  pick_and_place:
    layer: L2
    type: list
    nodes:
      - callback: task_workspace_bounds
        params:
          bounds: [[-0.35, 0.35], [-0.05, 0.45], [0.01, 0.40]]
        fallback: hold_position
        timeout_sec: 15.0
      - callback: task_joint_speed_limit
        params:
          max_speed: 0.08

Adapters & Integrations

DAM plugs into your hardware and policy via duck-typed adapters. If it quacks like a source/policy/sink, DAM will work with it.

Sources (Observations)

class MySource:
    def read(self) -> Observation:
        """Return current sensor state."""
        return Observation(...)

Policies (Action Proposals)

class MyPolicy:
    def step(self, obs: Observation) -> Action:
        """Propose an action given the observation."""
        return Action(...)

Sinks (Hardware)

class MySink:
    def write(self, action: Action) -> None:
        """Execute the action on hardware."""
        ...

    def health_check(self) -> HealthStatus:
        """Report hardware health for L3 guard."""
        return HealthStatus(...)

Built-in adapters: - LeRobot (SO-ARM101 / Koch v1.1) - ROS 2 (joint states, trajectories, TF) - Simulation (MuJoCo, PyBullet, etc.)


Stackfile: The Configuration Format

A Stackfile is a YAML file that wires together all components. No Python code required for tier-1 deployments.

version: "1"

hardware:
  preset: so101_follower
  sources:
    follower_arm:
      type: motor
      port: /dev/tty.usbmodem5AA90244141
  sinks:
    follower_command:
      ref: sources.follower_arm

policy:
  type: act
  pretrained_path: MikeChenYZ/act-soarm-fmb-v2

guards:
  - L1: motion
    phase: 0
  - L2: execution
    phase: 1
  - L3: hardware
    always: true

boundaries:
  joint_position_limits:
    layer: L1
    type: single
    nodes:
      - callback: joint_position_limits
        params:
          upper: [1.57, 1.57, 1.57, ...]
          lower: [-1.57, -1.57, -1.57, ...]

tasks:
  pick_and_place:
    boundaries: [joint_position_limits]

Runtime Modes

Managed Mode

DAM runs its own control loop at a fixed frequency.

runtime = GuardRuntime.from_stackfile("mystack.yaml")
runtime.start_task("mytask")
runtime.run()  # Blocks; runs at ~50 Hz until KeyboardInterrupt

Passive Mode (Default)

Your code drives the loop. DAM executes one cycle per step() call.

runtime = GuardRuntime.from_stackfile("mystack.yaml")
runtime.start_task("mytask")

for _ in range(1000):
    result = runtime.step()
    print(f"Risk: {result.risk_level}, Clamped: {result.was_clamped}")

Telemetry & Observability

Ring Buffer

Every cycle writes a CycleResult to an in-memory ring buffer. Latest N events are queryable via REST API.

MCAP Loopback

When a guard rejects or clamps, DAM captures: - ±30 seconds of observations - All intermediate guard decisions - Risk level timeline

Data is written in MCAP format — a standardized record format for robotics.

Services API

Real-time REST + WebSocket endpoints: - GET /api/telemetry/history — last N cycles - WS /ws/telemetry — live stream - GET /api/risk-log — historical queries - GET /api/risk-log/export/json — data export


Hot-Reload

Modify your Stackfile on disk → DAM reloads within ~500ms, no control loop interruption.

from dam.config.hot_reload import StackfileWatcher

watcher = StackfileWatcher(
    path="mystack.yaml",
    on_change=runtime.apply_pending_reload,
    poll_interval_s=0.5,
)
watcher.start()
# Edit mystack.yaml on disk → changes take effect mid-run

Only static configuration (guard limits, boundary constraints) reloads. Guard class structure and task definitions remain fixed during a run.


Next Steps