runners
1. amp_policy_runner.py
Class: AmpPolicyRunner
Definition
class AmpPolicyRunner(OnPolicyRunner):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
- Inheritance: AmpPolicyRunner extends OnPolicyRunner.
- Purpose: Implements a full training-loop runner for reinforcement learning.
- Specialty: The prefix AMP (Adversarial Motion Priors) indicates that this runner is designed for hybrid imitation + RL training.
Core Method: learn
def learn(self, num_learning_iterations, init_at_random_ep_len=False, trial=None):
Parameters
- num_learning_iterations: number of training iterations.
- init_at_random_ep_len: whether to initialize episodes at random lengths (data augmentation).
- trial: optional handle for hyperparameter-tuning frameworks (Optuna, Ray Tune, etc.), used for reporting metrics.
Execution Flow
1. Writer & Observation Initialization
self.init_writter(init_at_random_ep_len)
obs, extras = self.env.get_observations()
critic_obs = extras["observations"].get("critic", obs)
- Initializes log writer (TensorBoard, wandb, etc.).
- Gets initial observations:
- obs: actor observations.
- critic_obs: privileged critic observations (may include more state info).
2. Switch to Training Mode
self.train_mode()
- Ensures models run in training mode (dropout, batch norm, etc.).
3. Buffers Setup
ep_infos = []
rframebuffer = deque(maxlen=2000)
rewbuffer = deque(maxlen=100)
lenbuffer = deque(maxlen=100)
cur_reward_sum = torch.zeros(self.env.num_envs, ...)
cur_episode_length = torch.zeros(self.env.num_envs, ...)
- ep_infos: stores episode statistics.
- rframebuffer: reward frame buffer.
- rewbuffer: recent episode rewards.
- lenbuffer: recent episode lengths.
- cur_reward_sum: cumulative reward per environment.
- cur_episode_length: current episode length per environment.
4. Main Training Loop
for it in range(start_iter, tot_iter):
(1) Rollout Phase
with torch.inference_mode(self.cfg.get("inference_mode_rollout", True)):
    for i in range(self.num_steps_per_env):
        obs, critic_obs, rewards, dones, infos = self.rollout_step(obs, critic_obs)
- Executes steps in the environment.
- Collects (obs, critic_obs, rewards, dones, infos).
- Updates cumulative rewards, lengths, and episode statistics.
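The cumulative bookkeeping can be sketched with plain Python containers (the real runner uses torch tensors and the deques from the buffer setup; names mirror that snippet):

```python
from collections import deque

def update_episode_buffers(rewards, dones, cur_reward_sum, cur_episode_length,
                           rewbuffer, lenbuffer):
    """Accumulate per-env rewards/lengths; flush finished episodes to the deques."""
    for i, (r, done) in enumerate(zip(rewards, dones)):
        cur_reward_sum[i] += r
        cur_episode_length[i] += 1
        if done:
            rewbuffer.append(cur_reward_sum[i])
            lenbuffer.append(cur_episode_length[i])
            cur_reward_sum[i] = 0.0
            cur_episode_length[i] = 0

# two envs; env 1 finishes after the second step
cur_reward_sum, cur_episode_length = [0.0, 0.0], [0, 0]
rewbuffer, lenbuffer = deque(maxlen=100), deque(maxlen=100)
update_episode_buffers([1.0, 0.5], [False, False],
                       cur_reward_sum, cur_episode_length, rewbuffer, lenbuffer)
update_episode_buffers([1.0, 0.5], [False, True],
                       cur_reward_sum, cur_episode_length, rewbuffer, lenbuffer)
```

After the second call, env 1's total reward (1.0) and length (2) land in the deques and its accumulators reset, while env 0 keeps accumulating.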
(2) Learning Phase
self.alg.compute_returns(critic_obs)
losses, stats = self.alg.update(self.current_learning_iteration)
- compute_returns: calculates returns/GAE.
- update: updates the policy and critic networks; returns losses and stats.
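The body of compute_returns is not shown here; as a reference, a minimal single-environment GAE sketch (textbook formulation; the algorithm's actual torch implementation operates on batched tensors and may differ in details):

```python
def compute_gae(rewards, values, dones, last_value, gamma=0.99, lam=0.95):
    """Generalized Advantage Estimation over one rollout (lists of floats)."""
    T = len(rewards)
    advantages = [0.0] * T
    gae = 0.0
    for t in reversed(range(T)):
        next_value = last_value if t == T - 1 else values[t + 1]
        nonterminal = 0.0 if dones[t] else 1.0           # zero out bootstrap at episode end
        delta = rewards[t] + gamma * next_value * nonterminal - values[t]
        gae = delta + gamma * lam * nonterminal * gae    # exponentially weighted TD errors
        advantages[t] = gae
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns
```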
(3) Evaluation
self.evaluation()
if trial is not None:
    trial.report(self.metrics_velrmsd, self.current_learning_iteration)
- Runs evaluation episodes.
- Reports metrics (metrics_velrmsd, metrics_CoT) to the hyperparameter-search trial if provided.
(4) Logging & Saving
if self.log_dir is not None and self.current_learning_iteration % self.log_interval == 0:
    self.log(locals())
if self.current_learning_iteration % self.save_interval == 0:
    self.save(...)
- Periodically logs training data.
- Periodically saves checkpoints.
(5) Code State Archiving
if it == start_iter:
    git_file_paths = store_code_state(self.log_dir, self.git_status_repos)
- Saves current git diff for reproducibility.
- Can upload code snapshot to wandb/neptune.
5. Final Save
self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(self.current_learning_iteration)))
- Saves the final model after training ends.
2. dagger_saver.py
Class: DaggerSaver
Definition
class DaggerSaver(DemonstrationSaver):
    """This demonstration saver will rollout the trajectory by running the student policy
    (with a probability) and label the trajectory by running the teacher policy."""
- Inheritance: Extends DemonstrationSaver.
- Purpose: Implements the DAgger (Dataset Aggregation) algorithm:
  - Rollouts may use the student policy (the currently trained policy).
  - Labels are always generated by the teacher policy (expert / ground truth).
- Key Idea: Correct student actions with expert labels to improve imitation.
Constructor (__init__)
def __init__(...,
             training_policy_logdir,
             teacher_act_prob="exp",
             update_times_scale=5000,
             action_sample_std=0.0,
             log_to_tensorboard=False,
             **kwargs):
Parameters
- training_policy_logdir: directory where the student policy is saved/loaded.
- teacher_act_prob: probability of using the teacher's action instead of the student's (can be a function or a string like "exp").
- update_times_scale: scale factor for the probability schedule.
- action_sample_std: adds Gaussian noise to student actions (exploration).
- log_to_tensorboard: whether to log rollout statistics.
Special Initialization
- If log_to_tensorboard=True, creates a TensorBoard writer.
- Wraps teacher_act_prob into a function using GET_PROB_FUNC if it is a string.
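GET_PROB_FUNC itself is not shown; one plausible shape for the "exp" schedule, keyed on update_times_scale, is an exponential decay (an assumption for illustration; the real decay formula may differ):

```python
import math

def exp_teacher_prob(num_updates, update_times_scale=5000):
    """Probability of acting with the teacher, decaying as the student trains.

    Hypothetical sketch: starts at 1.0 and decays by 1/e every
    `update_times_scale` student updates.
    """
    return math.exp(-num_updates / update_times_scale)
```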
Core Methods
1. init_traj_handlers
def init_traj_handlers(self):
    self.metadata["training_policy_logdir"] = ...
    self.build_training_policy()
- Extends base trajectory handler.
- Stores metadata and builds initial training (student) policy.
2. init_storage_buffer
def init_storage_buffer(self):
    self.rollout_episode_infos = []
- Extends base buffer.
- Adds storage for rollout episode info.
3. build_training_policy
def build_training_policy(self):
    with open(.../config.json) as f:
        config = json.load(f)
    training_policy = build_actor_critic(...)
    self.training_policy = training_policy
    self.training_policy_iteration = 0
- Loads model config.
- Builds a fresh student policy network.
4. load_latest_training_policy
def load_latest_training_policy(self):
    models = [file for file in os.listdir(self.training_policy_logdir) if 'model' in file]
    ...
    self.training_policy.load_state_dict(loaded_dict["model_state_dict"])
- Finds the newest saved model checkpoint.
- Loads weights into self.training_policy.
- Updates the internal iteration counter.
- Randomly samples the mask use_teacher_act_mask, which determines which envs use teacher vs. student actions.
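A minimal sketch of that mask sampling, assuming each environment is independently assigned to the teacher with the scheduled probability (the actual sampling code may differ):

```python
import random

def sample_teacher_mask(num_envs, teacher_prob):
    """One boolean per env: True means that env executes the teacher's action."""
    return [random.random() < teacher_prob for _ in range(num_envs)]
```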
5. get_transition
def get_transition(self):
    teacher_actions = self.get_policy_actions()
    actions = self.training_policy.act(self.obs)
    actions[self.use_teacher_act_mask] = teacher_actions[self.use_teacher_act_mask]
    n_obs, n_critic_obs, rewards, dones, infos = self.env.step(actions)
    return teacher_actions, rewards, dones, infos, n_obs, n_critic_obs
- Mixes actions:
- Student actions by default.
- Replaced with teacher actions based on mask.
- Teacher actions always label the trajectory.
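The mixing rule reduces to a masked overwrite; a runnable list-based sketch (the real code does this on torch tensors, as in the snippet above):

```python
def mix_actions(student_actions, teacher_actions, use_teacher_mask):
    """Student actions by default; overwritten by the teacher where the mask is True."""
    return [t if use_teacher else s
            for s, t, use_teacher in zip(student_actions, teacher_actions, use_teacher_mask)]
```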
6. add_transition
def add_transition(self, step_i, infos):
    if "episode" in infos:
        self.rollout_episode_infos.append(infos["episode"])
- Saves episode info during rollouts.
7. policy_reset
def policy_reset(self, dones):
    if dones.any():
        self.training_policy.reset(dones)
- Resets both teacher and student policies when episodes end.
8. check_stop
def check_stop(self):
    self.load_latest_training_policy()
    return super().check_stop()
- Extends base stopping condition.
- Ensures the latest student model is always loaded.
9. print_log
def print_log(self):
    for key in self.rollout_episode_infos[0].keys():
        ...
        self.tb_writer.add_scalar('Episode/' + key, value, self.training_policy_iteration)
- Collects statistics from all rollout episodes.
- Computes mean/max/min values.
- Optionally logs them to TensorBoard.
- Prints results in a table.
- Increments student training iteration counter.
3. demonstration.py
General Overview
The DemonstrationSaver is a utility class designed to collect demonstration data from an environment using a given policy and save it into structured trajectory files. It handles trajectory segmentation, storage management, metadata tracking, and optional compression of observation segments.
This tool is particularly useful in imitation learning and offline reinforcement learning, where high-quality datasets of agent–environment interactions are required.
Key Components
Initialization
- env → The simulation environment (must provide step, reset, get_observations, etc.).
- policy → Any policy object supporting:
  - act(obs, critic_obs)
  - act_inference(obs, critic_obs)
  - reset(dones)
  - (optional) get_hidden_states() if recurrent.
- save_dir → Directory in which to store demonstration data.
- rollout_storage_length → Number of steps per rollout buffer.
- min_timesteps & min_episodes → Stopping conditions.
- success_traj_only → If True, only saves trajectories that did not terminate by timeout.
- use_critic_obs → Whether to use privileged critic observations for action selection.
- obs_disassemble_mapping → Mapping for compressing observation segments.
- demo_by_sample → If True, sample actions stochastically; otherwise use deterministic inference.
Trajectory Management
- init_traj_handlers() — handles checkpointing of previously collected data, ensuring new runs can continue from existing trajectories.
- update_traj_handler(env_i, step_slice) — updates the trajectory index after an episode ends; removes failed/timeout trajectories if required.
- dump_to_file(env_i, step_slice) — saves a slice of a trajectory into a .pickle file.
- dump_metadata() — saves global metadata (timesteps, trajectories, configs).
Storage & Transition Handling
- init_storage_buffer() — initializes a rollout buffer (RolloutStorage) for storing transitions.
- collect_step(step_i) — runs one step: queries the policy for actions → steps the environment → builds and adds a transition.
- save_steps() — dumps rollouts to files whenever the buffer fills; splits by done signals.
- wrap_up_trajectory(env_i, step_slice) — prepares the trajectory dictionary for saving, including compression of observation components if specified.
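The split-by-dones step inside save_steps can be sketched as follows (hypothetical helper; the real code slices RolloutStorage tensors per environment):

```python
def split_by_dones(dones):
    """Slice one env's rollout buffer into per-trajectory (start, stop) ranges."""
    slices, start = [], 0
    for i, done in enumerate(dones):
        if done:
            slices.append((start, i + 1))  # include the terminal step
            start = i + 1
    if start < len(dones):                 # trailing, unfinished trajectory
        slices.append((start, len(dones)))
    return slices
```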
Policy Interaction
- get_policy_actions() — chooses actions based on:
  - critic vs. actor observations,
  - sampling vs. deterministic inference.
- policy_reset(dones) — resets hidden states when environments finish.
Control Flow
- check_stop() → determines whether collection should stop based on thresholds.
- collect_and_save(config=None) → main loop to collect rollouts, save them, and log progress.
- print_log() → prints progress (timesteps, throughput).
- close() & __del__() → cleanup: removes empty directories and finalizes metadata.
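A sketch of the stopping rule, assuming collection stops once both the min_timesteps and min_episodes thresholds are met (the actual condition in check_stop may differ):

```python
def should_stop(total_timesteps, total_episodes, min_timesteps, min_episodes):
    """Hypothetical stopping rule: both thresholds must be reached."""
    return total_timesteps >= min_timesteps and total_episodes >= min_episodes
```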
Usage Notes
- Use obs_disassemble_mapping if your observations contain large image-like tensors (e.g., {"forward_rgb": "normalized_image"}).
- Set success_traj_only=True when generating expert datasets for imitation learning to avoid failed attempts.
- Compatible with vectorized environments (env.num_envs > 1); each environment gets its own trajectory folder.
- Stores data in pickle format for easy reloading.
- Metadata is always updated in metadata.json.
4. on_policy_runner.py
General Overview
OnPolicyRunner is a training manager for on-policy reinforcement learning algorithms (e.g., PPO, APPO, TPPO). It handles:
- Environment interaction (rollouts, resets).
- Algorithm initialization and storage setup.
- Training loop execution (rollout → update → evaluation → logging).
- Logging with multiple backends (TensorBoard, WandB, Neptune).
- Model saving/loading and checkpoint management.
The class provides a full pipeline for training policies in simulated environments, from initialization to evaluation.
Class Breakdown
4.1 class OnPolicyRunner
Main class that orchestrates on-policy RL training.
Constructor
def __init__(self, env: VecEnv, train_cfg, log_dir=None, device="cpu")
- env (VecEnv) → vectorized environment wrapper.
- train_cfg (dict) → training configuration, containing algorithm, policy, and general hyperparameters.
- log_dir (str, optional) → path for logs and checkpoints.
- device (str) → "cpu" or "cuda".
Responsibilities:
- Initialize environment, algorithm, actor-critic model.
- Configure rollout storage and observation normalization.
- Prepare logging directories and git state tracking.
4.2 init_writter
def init_writter(self, init_at_random_ep_len: bool)
- Initializes the logging backend (tensorboard, wandb, or neptune).
- Optionally randomizes starting episode lengths for training diversity.
4.3 learn
def learn(self, num_learning_iterations, init_at_random_ep_len=False, trial=None)
- Core training loop.
- Steps:
  - Collect rollouts (rollout_step).
  - Compute returns.
  - Update the policy (self.alg.update).
  - Evaluate and log metrics.
  - Save checkpoints periodically.
Returns:
- Evaluation metrics (velrmsd, CoT).
4.4 evaluation
def evaluation(self)
- Computes evaluation metrics from the environment:
- tracking_error → RMSD of velocity tracking.
- CoT → Cost of Transport.
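For reference, the standard definitions these metrics usually follow (the environment's exact computation is not shown and may differ):

```python
import math

def vel_rmsd(commanded, achieved):
    """Root-mean-square deviation between commanded and achieved velocities."""
    n = len(commanded)
    return math.sqrt(sum((c - a) ** 2 for c, a in zip(commanded, achieved)) / n)

def cost_of_transport(mean_power, mass, mean_speed, g=9.81):
    """Dimensionless CoT = P / (m * g * v); lower means more efficient locomotion."""
    return mean_power / (mass * g * mean_speed)
```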
4.5 rollout_step
def rollout_step(self, obs, critic_obs, **kwargs)
- Executes one environment step:
- Selects actions from policy.
- Advances environment.
- Normalizes observations.
- Passes step results to algorithm storage.
4.6 log
def log(self, locs, width=80, pad=35)
- Logs training statistics to console and configured writer.
- Includes:
- Loss values, reward statistics, episode length.
- FPS, memory usage, learning rate.
- Evaluation metrics (velrmsd, CoT).
4.7 save
def save(self, path: str, infos=None)
- Saves model checkpoint:
- Algorithm weights.
- RND (Random Network Distillation) state if applicable.
- Training iteration number.
4.8 load
def load(self, path, load_optimizer=True, map_location=None)
- Loads model checkpoint.
- Supports checkpoint manipulation (via ckpt_manipulator).
- Restores the iteration count and, optionally, optimizer states.
4.9 get_inference_policy
def get_inference_policy(self, device=None)
- Returns a policy function for inference.
- Ensures normalization and device placement.
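A minimal sketch of the kind of wrapper this returns, assuming it closes over the actor's deterministic inference and an optional observation normalizer (hypothetical structure; the real method also handles device placement):

```python
def make_inference_policy(actor_inference, normalizer=None):
    """Wrap deterministic actor inference behind optional obs normalization."""
    def policy(obs):
        if normalizer is not None:
            obs = normalizer(obs)       # apply running obs normalization first
        return actor_inference(obs)     # then query the actor deterministically
    return policy
```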
4.10 Training/Eval Mode Helpers
- train_mode → sets networks to training mode.
- eval_mode → sets networks to evaluation mode.
4.11 Git Tracking
def add_git_repo_to_log(self, repo_file_path)
- Adds external repositories to the logging snapshot for reproducibility.
5. two_stage_runner.py
General Overview
TwoStageRunner extends OnPolicyRunner to add a two-stage training process:
- Pretraining Stage — the agent collects transitions from a fixed demonstration dataset (RolloutDataset).
- RL Stage — the agent switches back to normal on-policy rollouts with environment interaction.
This is typically used in imitation learning + reinforcement learning hybrid training, where demonstration data helps bootstrap learning before RL fine-tuning.
Class & Methods
5.1 __init__(...)
- Calls the parent (OnPolicyRunner) constructor.
- Loads configs related to pretraining:
  - pretrain_iterations → how many iterations use demonstration data before switching to RL.
  - log_interval → logging frequency (default 50).
- Requires pretrain_dataset in the config (assert check).
- Initializes a RolloutDataset with:
  - the dataset config (**self.cfg["pretrain_dataset"]),
  - the number of environments (self.env.num_envs),
  - the device (self.alg.device).
Key role: sets up dataset-driven pretraining.
5.2 rollout_step(obs, critic_obs)
Overrides the OnPolicyRunner.rollout_step.
Behavior:
- If still in pretraining (current_learning_iteration < pretrain_iterations):
  - Fetch a batch from the demonstration dataset:
    transition, infos = self.rollout_dataset.get_transition_batch()
  - Log performance stats from infos (except time_outs) under Perf/dataset_*.
  - If a transition exists:
    - Feed it into the algorithm: self.alg.collect_transition_from_dataset(transition, infos)
    - Return the transition fields: next obs, next privileged obs, reward, done, infos.
  - If no transition is available:
    - Refresh observations from the environment (env.get_observations() / env.get_privileged_observations()).
- If outside pretraining, fall back to the standard RL rollout:
  return super().rollout_step(obs, critic_obs)
Key role: switches between dataset-driven rollouts (pretrain) and environment rollouts (RL).
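The switch itself boils down to a small decision; a hedged sketch with hypothetical helper names:

```python
def choose_rollout_source(current_iteration, pretrain_iterations, dataset_has_batch):
    """Where the next transition comes from in the two-stage scheme (sketch)."""
    if current_iteration < pretrain_iterations:
        # pretraining: consume demonstration data while batches remain
        return "dataset" if dataset_has_batch else "refresh_env_obs"
    # RL stage: ordinary on-policy environment rollout
    return "env"
```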
Key Points
- TwoStageRunner is compatible with OnPolicyRunner, overriding only the rollout behavior.
- Useful for demonstration-guided RL (AMP, DAgger, behavior cloning + RL).
- RolloutDataset supplies pre-collected transitions instead of calling env.step().
- Logging ensures dataset metrics are tracked alongside RL metrics.