
Retriever: A Programming Framework for Closed-Loop Robot Agents

A programming framework for building closed-loop robot systems with explicit timing and data handoff, so perception, reasoning, and control can run together at mismatched rates.

1 Stanford University
2 MIT
3 Northeastern University
*Equal advising

Retriever is a programming framework for general-purpose robot agents. It supports closed-loop systems that combine slow VLM reasoning, medium-rate skill execution, and high-rate control in a single program — each module running on its own independent clock, from seconds down to milliseconds. Timing and data handoff are explicit parts of the program, not buried inside callbacks or middleware behavior.

To show what this looks like in practice, we begin with Retriever-0 — a concrete closed-loop agent we built and deployed with the framework.

Retriever-0: a closed-loop manipulation agent

Retriever-0 is a bimanual manipulation agent built entirely inside a single Retriever pipeline. It runs a VLM planner, an execution monitor, a VLA skill policy, and a joint controller in one coordinated closed loop — each module on its own clock, from seconds down to milliseconds.

Retriever-0 is evaluated on two families of long-horizon tasks that combine partial observability, conditional replanning, and dexterous manipulation:

Retrieval from a deformable bag. The agent reaches into a cloth bag, locates a target object under uncertainty, and extracts it while continuously adapting its grasp strategy as the bag deforms under manipulation.

Spice search and seasoning. The agent searches a set of drawers for a target spice under partial observability, updates its belief based on what it finds, and completes a multi-step seasoning task using retrieved tools. The planner branches conditionally on inspection outcome, so the execution trace varies depending on which drawer holds the target.

A representative trace from the spice search task:

OpenDrawer(TL) → InspectDrawer(TL) → CloseDrawer(TL) → OpenDrawer(TR) → InspectDrawer(TR) → Pick(spice) → Season(food) → CloseDrawer(TR)

These tasks require a VLM planner reasoning over seconds, a VLA skill policy running at ~2Hz, and a joint controller running at ~200Hz — all exchanging information inside one loop. The execution monitor watches skill progress and triggers replanning when needed; the VLA receives active skill commands and emits action chunks for the controller to execute. No module is isolated; the loop is closed.

Canonical multi-rate closed-loop robot agent pipeline with planning, skill, and control modules.

The Retriever-0 pipeline: slow VLM planning (with belief and memory), execution monitoring, medium-rate VLA skill execution, and high-rate control — all running simultaneously at mismatched clocks inside one program.

# Retriever-0 pipeline — adapted from the actual Python code (names simplified; port mappings omitted)
wrist_cams = WristCameraFlow() @ Rate(hz=30)  # 4× wrist cameras (left/right, top/bottom)
head_cam = HeadCameraFlow() @ Rate(hz=30)     # stereo head camera
goal = GoalFlow() @ Rate(hz=1)
belief = BeliefMemoryFlow() @ Trigger("inspection_done")
planner = VLMPlanFlow("gemini") @ Trigger("replan_request")
monitor = ExecMonitorFlow() @ Rate(hz=10)
vla = VLASkillFlow("pi05") @ Rate(hz=2)
ctrl = ControllerFlow() @ Rate(hz=200)
teleop = TeleopFlow() @ Rate(hz=100)

with Pipeline("Retriever-0") as pipe:
    # goal → planning and monitoring
    goal.then(planner, sync=Latest())
    goal.then(monitor, sync=Latest())
    # perception → belief → planning
    wrist_cams.then(belief, sync=Latest())
    belief.then(planner, sync=Latest())
    head_cam.then(planner, sync=Latest())
    # planning ↔ execution monitoring (replanning loop)
    planner.then(monitor, sync=Latest())  # plan chunks
    monitor.then(planner, sync=Latest())  # replan requests
    # monitoring ↔ skill execution
    monitor.then(vla, sync=Latest())  # active skill command
    vla.then(monitor, sync=Latest())  # progress p ∈ [0, 1]
    # operator override
    teleop.then(monitor, sync=Latest())
    # skill → control (action chunking handled as sync policy)
    wrist_cams.then(vla, sync=Latest())
    vla.then(ctrl, sync=Latest())

# Message passing backend supports in-process, Python multiprocessing, and Rust-based dora-rs
retriever.init(backend="dora")
pipe.run(duration=300.0)

Each module is a Flow — a Python class with a step() method — attached to a clock that says when it runs: @ Rate(hz=2) fires on a fixed schedule, @ Trigger(...) fires on an event. The pipe.then(a, b, sync=Latest()) calls wire the graph and declare how each edge samples upstream data at runtime. The rest of this page explains the design behind Retriever that makes this kind of pipeline possible.

Problem: Building general-purpose robot agents

General-purpose robot agents are rarely a single policy call. Long-horizon tasks under partial observability usually turn into a larger system: perception updates what the robot sees, memory tracks what matters, planning proposes what to do next, monitoring checks progress, and control keeps the robot moving safely in the loop.

Those pieces also do not run at the same speed. Fast control loops may run at hundreds of hertz, cameras and state estimation at tens of hertz, and large-model reasoning on the scale of seconds. That is not an edge case. It is the normal shape of a serious robot system.

The difficulty is that today’s stacks usually leave the timing contract implicit. Data arrives asynchronously, modules wake on different schedules, and the logic for buffering, handoff, and coordination gets buried in callback code, queue behavior, and runtime conventions. As these systems get more capable, that hidden coordination becomes a real source of engineering complexity.

Design choices

Retriever is built around a small set of design choices: each module runs on its own explicit clock, each edge carries an explicit synchronization policy, and the whole graph composes out of one kind of object. Flow is the concrete programming object that implements those choices.

Concrete design: Flow

Flow is the concrete programming object that implements Retriever’s design choices. It is analogous to nn.Module in PyTorch: a stateful Python class with typed inputs and outputs that you can instantiate, call directly, and compose with others. The key difference is that PyTorch has no notion of when a module should run or how long computation takes. Retriever adds exactly that — each Flow is attached to a clock that determines its schedule, and each edge carries a sync policy that determines how data is sampled across time boundaries.

Theoretical property: closure and the Asynchronous MDP

The overall Retriever design has a key theoretical property: closure. The pipeline as a whole is a Causal Stream Function (CSF) — a well-defined causal map from input history to output — just like each individual Flow. This holds regardless of how many modules the pipeline contains, how many different rates they run at, or whether the graph has feedback cycles.

Formally: a CSF maps an input stream to an output stream such that the output at any time t depends only on inputs up to t. A directed graph of CSFs is itself a CSF — including cyclic graphs and multi-rate pipelines. The entire Retriever-0 pipeline is one CSF with the same guarantees as each individual module.
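The causality condition in the paragraph above can be stated compactly (notation ours, matching the prose definition):

```latex
% F is a causal stream function (CSF) if outputs at time t depend
% only on inputs up to t:
\[
  y(t) = F(x)(t), \qquad
  x\big|_{[0,t]} = x'\big|_{[0,t]} \;\Longrightarrow\; F(x)(t) = F(x')(t).
\]
% Closure: if F_1, \dots, F_n are CSFs and G is a directed graph over them
% (possibly cyclic, possibly multi-rate), the induced stream map
% G(F_1, \dots, F_n) again satisfies the implication above.
```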

Retriever also extends the standard discrete-step MDP to an Asynchronous MDP: the environment evolves continuously, and the agent reacts as a CSF with no global “next step” tick.

Four pieces work together to turn a collection of Flows into a running pipeline: the Flow's step() logic, the clock that says when it fires, the sync policies on its incoming edges, and the runtime that schedules and executes the graph.

The runtime handles scheduling and alignment; the Flow only sees one aligned input per call:

from dataclasses import dataclass
from retriever import Flow, io

@io
@dataclass
class FlowInput:
    observation: dict
    memory: dict
    goal: str

@io
@dataclass
class FlowOutput:
    action: list[float]
    score: float

class ExampleFlow(Flow[FlowInput, FlowOutput]):
    def __init__(self, model):
        self.model = model

    def step(self, inp: FlowInput) -> FlowOutput:
        action, score = self.model(inp.observation, inp.memory, inp.goal)
        return FlowOutput(action=action, score=score)

flow = ExampleFlow(model)
out = flow(FlowInput(observation, memory, goal))  # direct Python call

A Flow is an ordinary Python object. The step() method runs synchronously over one aligned input; flow(inp) works without any runtime infrastructure. When deployed in a graph, the runtime decides when to call each Flow and which upstream snapshot to pass in — no async API required.

Why this should feel familiar

This is intentionally close to ordinary Python and familiar ML/control patterns: a stateful object, a typed input, a typed output, and one synchronous call. Retriever’s addition is not a new async authoring API; it is that schedules, synchronization, and feedback loops live in the graph around the object.

Same pattern, but as a VLA skill flow
@io
@dataclass
class VLAInput:
    camera: CameraData
    state: RobotState
    skill_cmd: SkillCommand

class VLASkillFlow(Flow[VLAInput, ActionChunk]):
    def __init__(self, processor, policy):
        # simplified — the reference code loads processor/policy from model_name: str
        self.processor = processor
        self.policy = policy
        self.last_chunk = None

    def step(self, inp: VLAInput) -> ActionChunk:
        model_inputs = self.processor.prepare(
            image=inp.camera.rgb,
            proprio=self._encode_state(inp.state),
            instruction=inp.skill_cmd.text,
            previous_chunk=self.last_chunk,
        )
        raw_actions = self.policy.sample_actions(model_inputs, horizon=8)
        chunk = self.processor.decode_chunk(raw_actions, reference_state=inp.state)
        self.last_chunk = chunk
        return chunk

Sync policy

Synchronization policies over edges in a temporal dataflow graph.

Each edge declares how data is sampled before a Flow runs, replacing implicit “latest message” behavior with explicit rules such as Latest() and Window().

When a Flow wakes, the runtime samples each incoming stream with its declared policy, builds one aligned local input, and then calls the same class:

from retriever import Latest
from retriever.flow import Window
from retriever.flow.types import EventStream

camera_events: EventStream[CameraData]
state_events: EventStream[RobotState]
skill_events: EventStream[SkillCommand]

vla_inp = VLAInput(
    camera=camera_events.sample(Latest(), now=t),
    state=state_events.sample(Latest(), now=t),
    skill_cmd=skill_events.sample(Latest(), now=t),
)
recent_frames = camera_events.sample(
    Window(buffer_size=16, duration=0.5),
    now=t,
)
next_chunk = vla_flow(vla_inp)

Common primitive policies include Latest(), which samples the most recent upstream value, and Window(), which samples a bounded buffer of recent values.

How sync policies align inputs

For each edge u → v, the policy σ looks at the upstream history available before that firing time and returns one snapshot value for Flow v. The runtime applies σ independently on every incoming edge, assembles the aligned bundle, then calls step() exactly once.
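A minimal sketch of this per-edge sampling in plain Python (not the Retriever implementation — Event, latest, and window are our illustrative stand-ins for Latest() and Window()):

```python
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class Event:
    t: float      # timestamp of the upstream emission
    value: Any

def latest(history: List[Event], now: float) -> Optional[Any]:
    """Latest(): the most recent value emitted at or before `now`."""
    past = [e for e in history if e.t <= now]
    return past[-1].value if past else None

def window(history: List[Event], now: float,
           duration: float, buffer_size: int) -> List[Any]:
    """Window(): up to `buffer_size` values within `duration` seconds of `now`."""
    recent = [e.value for e in history if now - duration <= e.t <= now]
    return recent[-buffer_size:]

cam = [Event(0.0, "f0"), Event(0.1, "f1"), Event(0.2, "f2")]
latest(cam, now=0.15)                                  # "f1"
window(cam, now=0.2, duration=0.15, buffer_size=16)    # ["f1", "f2"]
```

The point of the sketch is that each policy is a pure function of (upstream history, firing time) — which is exactly what makes the aligned input reproducible on replay.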

Operational detail: lag, deadlines, and safe fallback

When compute cannot keep up, Retriever makes the fallback rule explicit instead of leaving it to queue timing.

This is the same design logic as sync policies: slow-to-fast and fast-to-slow boundaries need explicit contracts rather than middleware luck.
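One shape such an explicit contract can take is a staleness deadline (this sketch is ours, not the Retriever API):

```python
def select_action(chunk_time: float, now: float, deadline: float,
                  chunk_action, safe_action):
    """Explicit staleness rule: if the newest chunk is older than
    `deadline` seconds, fall back to a declared safe action rather
    than silently executing stale data."""
    if now - chunk_time > deadline:
        return safe_action
    return chunk_action

select_action(chunk_time=1.0, now=1.2, deadline=0.5,
              chunk_action="track", safe_action="hold")   # "track"
select_action(chunk_time=1.0, now=2.0, deadline=0.5,
              chunk_action="track", safe_action="hold")   # "hold"
```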

Temporal chunking

Temporal chunking showing slow producer outputs reused by fast consumers.

Slow producers emit time-extended chunks; fast consumers reuse the latest valid chunk on their own clock. Horizon > ~2× compute time enables full overlap.

Slow producers emit reusable chunks so faster consumers can stay on their own clock. In the Retriever-0 pipeline this appears in two ways:

  1. Action chunking (VLA → controller) — the VLA (2Hz) emits an ActionChunk so the controller (200Hz) can run without waiting for each policy inference.
  2. Plan chunking (planner → execution monitor) — the VLM planner proposes bounded-horizon plan chunks asynchronously; the monitor commits them only when skill progress p ≥ 0.9 is sustained for ~0.5s, preventing mid-skill plan rewrites.
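To make item 1 concrete, here is one way a fast consumer can index into a time-extended chunk (a sketch with assumed shapes, not the Retriever controller):

```python
def action_at(chunk, chunk_start: float, dt: float, now: float):
    """Select the action whose time slot covers `now`; clamp so a stale
    chunk keeps replaying its final action rather than going silent."""
    i = int((now - chunk_start) / dt)
    return chunk[max(0, min(i, len(chunk) - 1))]

# A 2 Hz producer emits 8 actions covering 0.5 s (dt = 0.0625 s);
# a 200 Hz consumer samples the chunk on its own clock.
chunk = list(range(8))
action_at(chunk, chunk_start=0.0, dt=0.0625, now=0.26)   # slot 4
action_at(chunk, chunk_start=0.0, dt=0.0625, now=1.00)   # clamped to slot 7
```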
Plan chunking and progress-based handoff

On top of temporal chunking, the pipeline uses two additional mechanisms: plan chunking with bounded horizons, and progress-based handoff between the monitor and the skill policy.
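The commit rule described above (progress p ≥ 0.9 sustained for ~0.5 s) can be sketched as a small gate (the class is ours; thresholds are the illustrative values from the text):

```python
class ProgressGate:
    """Commit a pending plan chunk only once progress has stayed above
    a threshold for a sustained hold interval."""

    def __init__(self, threshold: float = 0.9, hold: float = 0.5):
        self.threshold = threshold
        self.hold = hold
        self.above_since = None  # wall time when progress first crossed threshold

    def update(self, p: float, now: float) -> bool:
        if p >= self.threshold:
            if self.above_since is None:
                self.above_since = now
            return now - self.above_since >= self.hold
        self.above_since = None  # progress dipped: restart the hold timer
        return False
```

A drop below the threshold resets the timer, which is what prevents a momentary progress spike from triggering a mid-skill plan rewrite.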

Retriever vs. ROS2

ROS2 is the most familiar baseline for this kind of system. The difference becomes easiest to see once a slow VLM planner, a medium-rate skill policy, and a fast controller all have to live in the same loop.

Retriever still runs asynchronous, multi-rate systems. The difference is where that complexity lives: authors write ordinary step() logic over one aligned input, while the runtime handles clocks, buffering, and scheduling underneath. In ROS2, much more of that coordination leaks into user code through callbacks, worker threads, topic buffers, locks, and executor policy.

The practical differences:

|                      | Retriever                                           | ROS2                                                                      |
|----------------------|-----------------------------------------------------|---------------------------------------------------------------------------|
| Authoring model      | Ordinary step() code over one aligned input         | Async coordination logic is spread across callbacks, timers, and topic state |
| Variable latency     | Declared via clocks, triggers, and buffer policies  | Manual worker threads, queues, drop logic, and stale-data handling        |
| Replay and debugging | In-process runs can record the same aligned inputs the graph consumed | Behavior depends on callback ordering, topic buffering, and executor timing |

Illustrative code: Retriever vs. ROS2

The snippets below omit message definitions and model-loading boilerplate. They are here to show where timing, buffering, and launch logic end up.

from dataclasses import dataclass

from retriever import Flow, Pipeline, Rate, Latest, io
from retriever.flow import Trigger

# Omitted: message/model definitions and flow internals.
# The important point is where timing and alignment are declared.

@io
@dataclass
class VLAInput:
  camera: "CameraFrame"
  state: "RobotState"
  active_skill: "SkillCommand"

class PlannerFlow(Flow["BeliefState", "PlanChunk"]):
  def step(self, belief):
      return self.vlm_plan(belief)

class VLASkillFlow(Flow[VLAInput, "ActionChunk"]):
  def step(self, inp):
      return self.vla(inp.camera, inp.state, inp.active_skill)

with Pipeline("Agent") as pipe:
  cam = CameraSourceFlow() @ Rate(hz=30)
  belief = BeliefUpdaterFlow() @ Trigger("observation")
  planner = PlannerFlow("gemini") @ Trigger("state")
  monitor = ExecutionMonitorFlow() @ Trigger("executor_status")
  vla = VLASkillFlow("pi05") @ Rate(hz=2)
  ctrl = ControllerFlow() @ Rate(hz=200)

  # Alignment policy is in the graph, not hidden in callback order.
  pipe.connect(cam, belief, sync=Latest())
  pipe.connect(belief, planner, sync=Latest())
  pipe.connect(planner, monitor, sync=Latest())
  pipe.connect(cam, vla, sync=Latest())
  pipe.connect(monitor, vla, sync=Latest())
  pipe.connect(vla, ctrl, sync=Latest())

if __name__ == "__main__":
  pipe.run(backend="dora", duration=30.0)
Reference code: concrete VLA flow
@io
@dataclass
class VLAInput:
    camera: CameraData
    state: RobotState
    skill_cmd: SkillCommand

class VLASkillFlow(Flow[VLAInput, ActionChunk]):
    def __init__(self, model_name: str):
        self.processor = load_vla_processor(model_name)
        self.policy = load_vla_policy(model_name)
        self.last_chunk = None

    def reset(self):
        self.last_chunk = None

    def step(self, inp: VLAInput) -> ActionChunk:
        model_inputs = self.processor.prepare(
            image=inp.camera.rgb,
            proprio=self._encode_state(inp.state),
            instruction=inp.skill_cmd.text,
            previous_chunk=self.last_chunk,
        )
        raw_actions = self.policy.sample_actions(model_inputs, horizon=8)
        chunk = self.processor.decode_chunk(raw_actions, reference_state=inp.state)
        self.last_chunk = chunk
        return chunk

# Full pipeline with port mappings — showing how output fields route to input fields
wrist_cams = WristCameraFlow() @ Rate(hz=30)
head_cam = HeadCameraFlow() @ Rate(hz=30)
goal = GoalFlow() @ Rate(hz=1)
belief = BeliefMemoryFlow() @ Trigger("inspection_done")
planner = VLMPlanFlow("gemini") @ Trigger("replan_request")
monitor = ExecMonitorFlow() @ Rate(hz=10)
vla = VLASkillFlow("pi05") @ Rate(hz=2)
ctrl = ControllerFlow() @ Rate(hz=200)
teleop = TeleopFlow() @ Rate(hz=100)

with Pipeline("Retriever-0") as pipe:
    goal.then(planner, map={"task": "task"}, sync=Latest())
    goal.then(monitor, map={"task": "task"}, sync=Latest())
    wrist_cams.then(belief, map={"frame": "frames"}, sync=Latest())
    belief.then(planner, map={"belief": "state"}, sync=Latest())
    head_cam.then(planner, map={"frame": "frame"}, sync=Latest())
    planner.then(monitor, map={"result": "planner_result"}, sync=Latest())
    monitor.then(planner, map={"replan_trigger": "context"}, sync=Latest())
    monitor.then(vla, map={"current_skill": "prompt"}, sync=Latest())
    vla.then(monitor, map={"feedback": "policy_feedback"}, sync=Latest())
    wrist_cams.then(vla, map={"frame": "obs_dict"}, sync=Latest())
    vla.then(ctrl, sync=Latest())
    teleop.then(monitor, sync=Latest())

retriever.init(backend="dora")
pipe.run(duration=300.0)

One graph, many runtimes

Retriever separates graph authoring from execution:

  1. Definition (Python) — users write Flow classes, attach clocks and triggers, and declare how edges should sample upstream data.
  2. Compilation (IR) — the authored graph is validated, port mappings and adapters are resolved, and the result becomes a static representation of topology, clocks, and edge policies.
  3. Execution — a backend lowers that IR into workers, channels, buffers, and scheduling logic without changing the authored graph.

The authored graph stays stable while the execution mapping changes. In practice that means the same pipeline can be stepped in-process for debugging, run on local multiprocessing, or lowered onto Dora without rewriting Flow code.

Concretely, those three stages separate three different concerns: what the program means (definition), what the graph is (IR), and how it executes (the backend's execution mapping).

The compiled graph becomes explicit runtime data: nodes, clocks, adapters, queue sizes, and topology all move out of callback order and into something inspectable.

Illustrative IR snippet
{
  "version": "1.0.0",
  "metadata": {
    "name": "Agent",
    "validated": true,
    "optimized": false
  },
  "nodes": [
    {
      "id": "camera",
      "type": "CameraSourceFlow",
      "config": {"clock": {"Rate": {"hz": 30}}}
    },
    {
      "id": "vla",
      "type": "VLASkillFlow",
      "config": {"clock": {"Rate": {"hz": 2}}}
    }
  ],
  "edges": [
    {
      "source": {"node": "camera", "port": "rgb"},
      "destination": {"node": "vla", "port": "camera"},
      "adapter": {"latest": {"buffer_size": 1}},
      "qsize": 10
    }
  ],
  "topology": {
    "node_count": 2,
    "edge_count": 1,
    "has_cycle": false
  }
}
More runtime detail

The same authored graph can be lowered into several execution modes: in-process stepping for debugging, Python multiprocessing, and the Rust-based dora-rs backend.

What changes between those modes is the execution mapping underneath the graph, not the authored graph itself. The Python Flow classes, the clocks, and the edgewise sync policies stay fixed; the runtime decides how to place and schedule them.

That is the point of the IR layer: it gives the runtime one explicit object to lower. Backends do not have to rediscover timing intent from callback order or infer data handoff conventions from user code; they receive node definitions, clock declarations, adapters, queue settings, and topology directly.
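Because the IR is plain data, a backend's first pass can be an ordinary traversal. A toy lowering over the snippet's shape (field names taken from the IR above; the worker and queue specs are our illustration, not a real backend):

```python
ir = {
    "nodes": [
        {"id": "camera", "type": "CameraSourceFlow",
         "config": {"clock": {"Rate": {"hz": 30}}}},
        {"id": "vla", "type": "VLASkillFlow",
         "config": {"clock": {"Rate": {"hz": 2}}}},
    ],
    "edges": [
        {"source": {"node": "camera", "port": "rgb"},
         "destination": {"node": "vla", "port": "camera"},
         "adapter": {"latest": {"buffer_size": 1}},
         "qsize": 10},
    ],
}

def lower(ir):
    """Map IR nodes to (id, period_seconds) worker specs and IR edges to
    (source, destination, queue_size) channel specs — no callback-order
    inference required, everything is read directly from the data."""
    workers = [(n["id"], 1.0 / n["config"]["clock"]["Rate"]["hz"])
               for n in ir["nodes"]]
    queues = [(e["source"]["node"], e["destination"]["node"], e["qsize"])
              for e in ir["edges"]]
    return workers, queues

workers, queues = lower(ir)
```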

That separation is what lets Retriever run the same pipeline in-process for debugging, on local multiprocessing, or on Dora, without changes to Flow code.

Historical note: lineage and portability goals

Retriever sits in a long systems lineage of trying to separate what a program means from where and how it runs.

The portability goals behind Retriever are similar: the same authored pipeline should keep its structure, its timing contract, and its meaning as the execution substrate changes.

The hardest one is portability of meaning. If a slow planner, a medium-rate skill policy, and a fast controller all run together, then clocks, buffering, and snapshot rules have to be part of the semantic contract rather than left implicit.

What we tested

We show three kinds of evidence below: runtime overhead, replay behavior under timing stress, and path-sensitive dynamics where small timing changes can alter the executed computation itself.

Overhead

Message latency benchmark

Question: does explicit clocks-and-sync structure add much cost? Result: Retriever with a Dora backend stays close to native transport baselines across payload sizes.

Latency benchmark results across payload sizes.

End-to-end message latency versus payload size (log-log). Retriever with Dora backend stays close to native dora-rs.

Stress Test

Hybrid differentiable physics

Question: what if the executed path itself changes under tiny timing differences? This example makes replay sensitive by mixing continuous dynamics with discrete contact events.

Hybrid physics stress test for asynchronous determinism.

A bouncing-ball system with discrete impact events, used to stress trace sensitivity.

Replay

Path-gradient consistency

Question: does replay converge to one execution trace or drift across many? Event-time semantics collapse repeated runs onto one trace; arrival-time pub/sub spreads them apart.

Gradient histogram from asynchronous replay runs.

Event-time semantics collapse path gradients to a single trace; arrival-time semantics spread across multiple.

Conclusion

Retriever is a programming model and runtime for asynchronous, compositional robot agents. Clocks and edgewise synchronization policies are explicit parts of the program, so closed-loop pipelines stay modular across mismatched rates and runtimes. The experiments show that overhead stays low and replay is substantially more stable than arrival-time pub/sub.