runners

1. amp_policy_runner.py

Class: AmpPolicyRunner

Definition

class AmpPolicyRunner(OnPolicyRunner):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
  • Inheritance: AmpPolicyRunner extends OnPolicyRunner.
  • Purpose: It implements a full training loop runner for reinforcement learning.
  • Specialty: The prefix AMP (Adversarial Motion Priors) indicates that this runner is designed for imitation + RL hybrid training.

Core Method: learn

def learn(self, num_learning_iterations, init_at_random_ep_len=False, trial=None):

Parameters

  • num_learning_iterations: number of training iterations.
  • init_at_random_ep_len: whether to initialize each environment's episode-length counter at a random value, which desynchronizes resets across environments.
  • trial: optional handle for hyperparameter tuning frameworks (Optuna, Ray Tune, etc.), used for reporting metrics.

Execution Flow

1. Writer & Observation Initialization

self.init_writter(init_at_random_ep_len)
obs, extras = self.env.get_observations()
critic_obs = extras["observations"].get("critic", obs)
  • Initializes log writer (TensorBoard, wandb, etc.).
  • Gets initial observations:
    • obs: actor observations.
    • critic_obs: critic privileged observations (may include more state info).

2. Switch to Training Mode

self.train_mode()
  • Ensures models run in training mode (dropout, batch norm, etc.).

3. Buffers Setup

ep_infos = []
rframebuffer = deque(maxlen=2000)
rewbuffer = deque(maxlen=100)
lenbuffer = deque(maxlen=100)
cur_reward_sum = torch.zeros(self.env.num_envs, ...)
cur_episode_length = torch.zeros(self.env.num_envs, ...)
  • ep_infos: stores episode statistics.
  • rframebuffer: reward frame buffer.
  • rewbuffer: episode rewards.
  • lenbuffer: episode lengths.
  • cur_reward_sum: cumulative reward per environment.
  • cur_episode_length: cumulative episode length.
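The bookkeeping these buffers perform can be sketched as follows; num_envs and the track helper are illustrative names, not part of the runner:

```python
from collections import deque

import torch

num_envs = 4
rewbuffer = deque(maxlen=100)   # returns of completed episodes
lenbuffer = deque(maxlen=100)   # lengths of completed episodes
cur_reward_sum = torch.zeros(num_envs)
cur_episode_length = torch.zeros(num_envs)

def track(rewards, dones):
    """Accumulate per-env sums, then flush finished episodes into the deques."""
    global cur_reward_sum, cur_episode_length
    cur_reward_sum += rewards
    cur_episode_length += 1
    new_ids = (dones > 0).nonzero(as_tuple=False)
    rewbuffer.extend(cur_reward_sum[new_ids][:, 0].tolist())
    lenbuffer.extend(cur_episode_length[new_ids][:, 0].tolist())
    cur_reward_sum[new_ids] = 0       # restart the sums for finished envs
    cur_episode_length[new_ids] = 0

track(torch.tensor([1.0, 2.0, 0.5, 0.0]), torch.tensor([0, 1, 0, 0]))
track(torch.tensor([1.0, 1.0, 1.0, 1.0]), torch.tensor([1, 0, 0, 0]))
```

Because the deques have a fixed maxlen, the logged reward and length statistics are moving averages over the most recent episodes.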

4. Main Training Loop

for it in range(start_iter, tot_iter):
(1) Rollout Phase
with torch.inference_mode(self.cfg.get("inference_mode_rollout", True)):
    for i in range(self.num_steps_per_env):
        obs, critic_obs, rewards, dones, infos = self.rollout_step(obs, critic_obs)
  • Executes steps in the environment.
  • Collects (obs, critic_obs, rewards, dones, infos).
  • Updates cumulative rewards, lengths, and episode statistics.
(2) Learning Phase
self.alg.compute_returns(critic_obs)
losses, stats = self.alg.update(self.current_learning_iteration)
  • compute_returns: calculates returns/GAE.
  • update: updates policy and critic networks, returns losses and stats.
(3) Evaluation
self.evaluation()
if trial is not None:
    trial.report(self.metrics_velrmsd, self.current_learning_iteration)
  • Runs evaluation episodes.
  • Reports evaluation metrics (e.g., metrics_velrmsd, metrics_CoT) to the hyperparameter-search trial if one is provided.
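A minimal stand-in for the reporting pattern — the real trial object comes from Optuna or a similar framework; StubTrial below is purely illustrative:

```python
# StubTrial mimics the report/should_prune interface that pruning-capable
# search frameworks expose; the threshold logic here is made up.
class StubTrial:
    def __init__(self):
        self.reports = []

    def report(self, value, step):
        self.reports.append((step, value))

    def should_prune(self):
        # e.g. prune if the latest metric exceeds some threshold
        return bool(self.reports) and self.reports[-1][1] > 1.0

trial = StubTrial()
for it in range(3):
    metric = 0.5 * it          # pretend this is metrics_velrmsd
    trial.report(metric, it)   # mirrors trial.report(..., iteration)
    if trial.should_prune():
        break                  # a real framework would raise a pruned exception
```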
(4) Logging & Saving
if self.log_dir is not None and self.current_learning_iteration % self.log_interval == 0:
    self.log(locals())
if self.current_learning_iteration % self.save_interval == 0:
    self.save(...)
  • Periodically logs training data.
  • Periodically saves checkpoints.
(5) Code State Archiving
if it == start_iter:
    git_file_paths = store_code_state(self.log_dir, self.git_status_repos)
  • Saves current git diff for reproducibility.
  • Can upload code snapshot to wandb/neptune.

5. Final Save

self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(self.current_learning_iteration)))
  • Saves the final model after training ends.

2. dagger_saver.py

Class: DaggerSaver

Definition

class DaggerSaver(DemonstrationSaver):
    """This demonstration saver will rollout the trajectory by running the student policy
    (with a probability) and label the trajectory by running the teacher policy."""
  • Inheritance: Extends DemonstrationSaver.
  • Purpose: Implements the DAgger (Dataset Aggregation) algorithm:
    • Rollouts may use the student policy (current trained policy).
    • Labels are always generated by the teacher policy (expert / ground truth).
  • Key Idea: Correct student actions with expert labels to improve imitation.

Constructor (__init__)

def __init__(..., 
             training_policy_logdir, 
             teacher_act_prob="exp", 
             update_times_scale=5000, 
             action_sample_std=0.0, 
             log_to_tensorboard=False, 
             **kwargs):

Parameters

  • training_policy_logdir: directory where the student policy is saved/loaded.
  • teacher_act_prob: probability of using the teacher’s action instead of student’s (can be function or string like "exp").
  • update_times_scale: scale factor for probability schedule.
  • action_sample_std: adds Gaussian noise to student actions (exploration).
  • log_to_tensorboard: whether to log rollout statistics.

Special Initialization

  • If log_to_tensorboard=True, creates a TensorBoard writer.
  • Wraps teacher_act_prob into a function using GET_PROB_FUNC if it’s a string.
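A hedged sketch of what an "exp" schedule could look like; the actual GET_PROB_FUNC mapping and its constants may differ:

```python
import math

def exp_teacher_prob(update_times, update_times_scale=5000):
    """Probability of taking the teacher action, decaying as training proceeds.

    Illustrative only: the real schedule returned by GET_PROB_FUNC("exp")
    may use a different functional form.
    """
    return math.exp(-update_times / update_times_scale)

p_start = exp_teacher_prob(0)     # teacher dominates early in training
p_later = exp_teacher_prob(5000)  # student takes over as updates accumulate
```

update_times_scale stretches the decay, which matches its documented role as a scale factor for the probability schedule.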

Core Methods

1. init_traj_handlers

def init_traj_handlers(self):
    self.metadata["training_policy_logdir"] = ...
    self.build_training_policy()
  • Extends base trajectory handler.
  • Stores metadata and builds initial training (student) policy.

2. init_storage_buffer

def init_storage_buffer(self):
    self.rollout_episode_infos = []
  • Extends base buffer.
  • Adds storage for rollout episode info.

3. build_training_policy

def build_training_policy(self):
    with open(.../config.json) as f:
        config = json.load(f)
    training_policy = build_actor_critic(...)
    self.training_policy = training_policy
    self.training_policy_iteration = 0
  • Loads model config.
  • Builds a fresh student policy network.

4. load_latest_training_policy

def load_latest_training_policy(self):
    models = [file for file in os.listdir(self.training_policy_logdir) if 'model' in file]
    ...
    self.training_policy.load_state_dict(loaded_dict["model_state_dict"])
  • Finds the newest saved model checkpoint.
  • Loads weights into self.training_policy.
  • Updates internal iteration counter.
  • Randomly samples mask use_teacher_act_mask → determines which envs use teacher vs student actions.

5. get_transition

def get_transition(self):
    teacher_actions = self.get_policy_actions()
    actions = self.training_policy.act(self.obs)
    actions[self.use_teacher_act_mask] = teacher_actions[self.use_teacher_act_mask]
    n_obs, n_critic_obs, rewards, dones, infos = self.env.step(actions)
    return teacher_actions, rewards, dones, infos, n_obs, n_critic_obs
  • Mixes actions:
    • Student actions by default.
    • Replaced with teacher actions based on mask.
  • Teacher actions always label the trajectory.
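The action mixing can be sketched in isolation; the tensor shapes, fixed seed, and 0.5 probability below are illustrative:

```python
import torch

torch.manual_seed(0)
num_envs, action_dim = 4, 2
teacher_actions = torch.ones(num_envs, action_dim)    # stand-in expert actions
student_actions = torch.zeros(num_envs, action_dim)   # stand-in student actions

# Per-environment Bernoulli mask: True -> execute the teacher's action.
teacher_act_prob = 0.5
use_teacher_act_mask = torch.rand(num_envs) < teacher_act_prob

# Default to the student, then overwrite the masked rows with teacher actions.
actions = student_actions.clone()
actions[use_teacher_act_mask] = teacher_actions[use_teacher_act_mask]
```

Regardless of which action is executed, the teacher's action is what gets stored as the label, which is exactly the DAgger recipe.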

6. add_transition

def add_transition(self, step_i, infos):
    if "episode" in infos:
        self.rollout_episode_infos.append(infos["episode"])
  • Saves episode info during rollouts.

7. policy_reset

def policy_reset(self, dones):
    if dones.any():
        self.training_policy.reset(dones)
  • Resets the student policy's state when episodes end (the teacher policy is reset by the base implementation).

8. check_stop

def check_stop(self):
    self.load_latest_training_policy()
    return super().check_stop()
  • Extends base stopping condition.
  • Ensures the latest student model is always loaded.

9. print_log

def print_log(self):
    for key in self.rollout_episode_infos[0].keys():
        ...
        self.tb_writer.add_scalar('Episode/' + key, value, self.training_policy_iteration)
  • Collects statistics from all rollout episodes.
  • Computes mean/max/min values.
  • Optionally logs them to TensorBoard.
  • Prints results in a table.
  • Increments student training iteration counter.

3. demonstration.py

General Overview

The DemonstrationSaver is a utility class designed to collect demonstration data from an environment using a given policy and save it into structured trajectory files. It handles trajectory segmentation, storage management, metadata tracking, and optional compression of observation segments.

This tool is particularly useful in imitation learning and offline reinforcement learning, where high-quality datasets of agent–environment interactions are required.

Key Components

Initialization

  • env → The simulation environment (must provide step, reset, get_observations, etc.).
  • policy → Any policy object supporting:
    • act(obs, critic_obs)
    • act_inference(obs, critic_obs)
    • reset(dones)
    • (optional) get_hidden_states() if recurrent.
  • save_dir → Directory to store demonstration data.
  • rollout_storage_length → Number of steps per rollout buffer.
  • min_timesteps & min_episodes → Stopping conditions.
  • success_traj_only → If True, only saves trajectories that terminate successfully (i.e., discards those ended by timeout).
  • use_critic_obs → Determines whether to use privileged critic observations for action selection.
  • obs_disassemble_mapping → Mapping for compressing observation segments.
  • demo_by_sample → If True, sample actions stochastically; otherwise, use deterministic inference.

Trajectory Management

  • init_traj_handlers() Handles checkpointing of previously collected data. Ensures new runs can continue from existing trajectories.
  • update_traj_handler(env_i, step_slice) Updates trajectory index after an episode ends. Removes failed/timeout trajectories if required.
  • dump_to_file(env_i, step_slice) Saves a slice of trajectory into a .pickle file.
  • dump_metadata() Saves global metadata (timesteps, trajectories, configs).

Storage & Transition Handling

  • init_storage_buffer() Initializes a rollout buffer for storing transitions (RolloutStorage).
  • collect_step(step_i) Runs one step: queries policy for actions → steps environment → builds + adds transition.
  • save_steps() Dumps rollouts into files whenever the buffer fills. Splits by done signals.
  • wrap_up_trajectory(env_i, step_slice) Prepares trajectory dictionary for saving, including compression of observation components if specified.
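Splitting a full buffer at the done signals can be illustrated with toy data (split_by_dones is a hypothetical helper, not the source function):

```python
def split_by_dones(dones):
    """Return [start, stop) slices, one per (possibly unfinished) trajectory."""
    slices, start = [], 0
    for i, d in enumerate(dones):
        if d:
            slices.append(slice(start, i + 1))
            start = i + 1
    if start < len(dones):
        # trailing steps with no terminal yet form a partial trajectory
        slices.append(slice(start, len(dones)))
    return slices

# One environment's done flags across 6 buffered steps.
dones = [0, 0, 1, 0, 1, 0]
segments = split_by_dones(dones)
```

Each resulting slice corresponds to one dump_to_file(env_i, step_slice) call in the saver.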

Policy Interaction

  • get_policy_actions() Chooses actions based on:
    • critic vs actor obs
    • sampling vs deterministic inference
  • policy_reset(dones) Resets hidden states when environments finish.

Control Flow

  • check_stop() → Determines whether collection should stop based on thresholds.
  • collect_and_save(config=None) → Main loop to collect rollouts, save them, and log progress.
  • print_log() → Prints progress (timesteps, throughput).
  • close() & __del__() → Cleanup, removing empty directories and finalizing metadata.

Usage Notes

  • Use obs_disassemble_mapping if your observations contain large image-like tensors (e.g., {"forward_rgb": "normalized_image"}).
  • Set success_traj_only=True when generating expert datasets for imitation learning to avoid failed attempts.
  • Compatible with vectorized environments (env.num_envs > 1). Each environment gets its own trajectory folder.
  • Stores data in pickle format for easy reloading.
  • Metadata is always updated in metadata.json.
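A sketch of reading the saved data back; the exact dictionary keys inside each .pickle file and the file naming depend on the saver's configuration, so those below are assumptions:

```python
import json
import pickle
import tempfile
from pathlib import Path

save_dir = Path(tempfile.mkdtemp())

# Pretend this is one dumped trajectory slice plus the global metadata.
traj = {"observations": [0.1, 0.2], "actions": [1, 0], "rewards": [0.0, 1.0]}
(save_dir / "trajectory_0").mkdir()
with open(save_dir / "trajectory_0" / "traj_000.pickle", "wb") as f:
    pickle.dump(traj, f)
with open(save_dir / "metadata.json", "w") as f:
    json.dump({"total_timesteps": 2, "total_trajectories": 1}, f)

# Reloading for imitation-learning training is a plain pickle/json read.
with open(save_dir / "trajectory_0" / "traj_000.pickle", "rb") as f:
    loaded = pickle.load(f)
with open(save_dir / "metadata.json") as f:
    metadata = json.load(f)
```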

4. on_policy_runner.py

General Overview

OnPolicyRunner is a training manager for on-policy reinforcement learning algorithms (e.g., PPO, APPO, TPPO). It handles:

  • Environment interaction (rollouts, resets).
  • Algorithm initialization and storage setup.
  • Training loop execution (rollout → update → evaluation → logging).
  • Logging with multiple backends (TensorBoard, WandB, Neptune).
  • Model saving/loading and checkpoint management.

The class provides a full pipeline for training policies in simulated environments, from initialization to evaluation.

Class Breakdown

4.1 class OnPolicyRunner

Main class that orchestrates on-policy RL training.

Constructor

def __init__(self, env: VecEnv, train_cfg, log_dir=None, device="cpu")
  • env (VecEnv) → Vectorized environment wrapper.
  • train_cfg (dict) → Training configuration, containing algorithm, policy, and general hyperparameters.
  • log_dir (str, optional) → Path for logs and checkpoints.
  • device (str) → "cpu" or "cuda".

Responsibilities:

  • Initialize environment, algorithm, actor-critic model.
  • Configure rollout storage and observation normalization.
  • Prepare logging directories and git state tracking.

4.2 init_writter

def init_writter(self, init_at_random_ep_len: bool)
  • Initializes the logging backend (tensorboard, wandb, or neptune).
  • Optionally randomizes starting episode lengths for training diversity.

4.3 learn

def learn(self, num_learning_iterations, init_at_random_ep_len=False, trial=None)
  • Core training loop.
  • Steps:
    • Collect rollouts (rollout_step).
    • Compute returns.
    • Update policy (self.alg.update).
    • Evaluate and log metrics.
    • Save checkpoints periodically.

Returns:

  • Evaluation metrics (velrmsd, CoT).

4.4 evaluation

def evaluation(self)
  • Computes evaluation metrics from the environment:
    • tracking_error → RMSD of velocity.
    • CoT → Cost of Transport.
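Cost of Transport is commonly defined as power divided by weight times speed; how this runner computes the power term is not shown here, so treat the following as the standard definition rather than the implementation:

```python
def cost_of_transport(power_w, mass_kg, speed_mps, g=9.81):
    """Dimensionless CoT = P / (m * g * v); lower means more efficient locomotion."""
    return power_w / (mass_kg * g * speed_mps)

# Toy numbers: a 12 kg robot drawing 150 W while walking at 1 m/s.
cot = cost_of_transport(power_w=150.0, mass_kg=12.0, speed_mps=1.0)
```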

4.5 rollout_step

def rollout_step(self, obs, critic_obs, **kwargs)
  • Executes one environment step:
    • Selects actions from policy.
    • Advances environment.
    • Normalizes observations.
    • Passes step results to algorithm storage.

4.6 log

def log(self, locs, width=80, pad=35)
  • Logs training statistics to console and configured writer.
  • Includes:
    • Loss values, reward statistics, episode length.
    • FPS, memory usage, learning rate.
    • Evaluation metrics (velrmsd, CoT).

4.7 save

def save(self, path: str, infos=None)
  • Saves model checkpoint:
    • Algorithm weights.
    • RND (Random Network Distillation) state if applicable.
    • Training iteration number.
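The checkpoint layout can be sketched as below; "model_state_dict" matches the key seen in load_latest_training_policy, while "iter" and "infos" are assumed names:

```python
import io

import torch

model = torch.nn.Linear(4, 2)  # stand-in for the actor-critic network
checkpoint = {
    "model_state_dict": model.state_dict(),
    "iter": 100,               # training iteration number
    "infos": None,             # optional extra info passed to save()
}

# Serialize to an in-memory buffer here; the runner writes to a .pt path.
buf = io.BytesIO()
torch.save(checkpoint, buf)
buf.seek(0)
loaded = torch.load(buf)
```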

4.8 load

def load(self, path, load_optimizer=True, map_location=None)
  • Loads model checkpoint.
  • Supports checkpoint manipulation (via ckpt_manipulator).
  • Restores iteration count and optional optimizer states.

4.9 get_inference_policy

def get_inference_policy(self, device=None)
  • Returns a policy function for inference.
  • Ensures normalization and device placement.
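A minimal sketch of what such an inference wrapper does; the normalization statistics and network are stand-ins:

```python
import torch

actor = torch.nn.Linear(3, 2)                    # stand-in for the trained actor
obs_mean, obs_std = torch.zeros(3), torch.ones(3)  # stand-in running statistics

def get_inference_policy():
    """Return a closure that normalizes observations and runs the actor eval-mode."""
    actor.eval()
    def policy(obs):
        with torch.no_grad():
            normed = (obs - obs_mean) / (obs_std + 1e-8)
            return actor(normed)
    return policy

policy = get_inference_policy()
action = policy(torch.zeros(1, 3))
```

Returning a closure keeps deployment code decoupled from the runner: callers only need a function from observations to actions.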

4.10 Training/Eval Mode Helpers

  • train_mode → Sets networks to training mode.
  • eval_mode → Sets networks to evaluation mode.

4.11 Git Tracking

def add_git_repo_to_log(self, repo_file_path)
  • Adds external repositories to the logging snapshot for reproducibility.

5. two_stage_runner.py

General Overview

TwoStageRunner extends OnPolicyRunner to add a two-stage training process:

  1. Pretraining Stage — the agent collects transitions from a fixed demonstration dataset (RolloutDataset).
  2. RL Stage — the agent switches back to the normal on-policy rollout with environment interaction.

This is typically used in imitation learning + reinforcement learning hybrid training, where demonstration data helps bootstrap learning before RL fine-tuning.

Class & Methods

5.1 __init__(...)

  • Calls the parent (OnPolicyRunner) constructor.
  • Loads configs related to pretraining:
    • pretrain_iterations → how many iterations should use demonstration data before switching to RL.
    • log_interval → logging frequency (default 50).
    • Requires pretrain_dataset in the config (assert check).
  • Initializes a RolloutDataset with:
    • Dataset config (**self.cfg["pretrain_dataset"])
    • Number of environments (self.env.num_envs)
    • Device (self.alg.device)

Key role: sets up dataset-driven pretraining.

5.2 rollout_step(obs, critic_obs)

Overrides the OnPolicyRunner.rollout_step.

Behavior:

  • If still in pretraining (current_learning_iteration < pretrain_iterations):

    • Fetch a batch from the demonstration dataset:

    transition, infos = self.rollout_dataset.get_transition_batch()

    • Log performance stats from infos (except time_outs) under Perf/dataset_*.
    • If a transition exists:
      • Feed it into the algorithm: self.alg.collect_transition_from_dataset(transition, infos)
      • Return the transition fields: next obs, next privileged obs, reward, done, infos.
    • If no transition is available:
      • Refresh observations from the environment (env.get_observations() / env.get_privileged_observations()).
  • If outside pretraining, fall back to the standard RL rollout:

return super().rollout_step(obs, critic_obs)

Key role: switches between dataset-driven rollouts (pretrain) and environment rollouts (RL).
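The stage switch can be condensed into a runnable sketch; the classes below are toy stand-ins for the runner, RolloutDataset, and the algorithm, mirroring only the control flow:

```python
class StubDataset:
    """Stand-in for RolloutDataset: serves pre-collected batches, then None."""
    def __init__(self, batches):
        self.batches = list(batches)

    def get_transition_batch(self):
        return (self.batches.pop(0), {}) if self.batches else (None, {})

class StubRunner:
    def __init__(self, pretrain_iterations, dataset):
        self.current_learning_iteration = 0
        self.pretrain_iterations = pretrain_iterations
        self.rollout_dataset = dataset
        self.collected, self.env_steps = [], 0

    def rollout_step(self):
        if self.current_learning_iteration < self.pretrain_iterations:
            transition, infos = self.rollout_dataset.get_transition_batch()
            if transition is not None:
                self.collected.append(transition)  # alg.collect_transition_from_dataset
                return "dataset"
            return "refresh_obs"                   # env.get_observations() fallback
        self.env_steps += 1                        # super().rollout_step(...)
        return "env"

runner = StubRunner(pretrain_iterations=1, dataset=StubDataset(["t0"]))
first = runner.rollout_step()        # pretraining: served from the dataset
runner.current_learning_iteration = 1
second = runner.rollout_step()       # RL stage: normal environment rollout
```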

Key Points

  • TwoStageRunner is compatible with OnPolicyRunner, only overriding rollout behavior.
  • Useful for demonstration-guided RL (AMP, DAgger, behavior cloning + RL).
  • RolloutDataset supplies pre-collected transitions instead of calling env.step().
  • Logging ensures dataset metrics are tracked alongside RL metrics.