Demonstrations

We provide a command-line tool that downloads demonstrations by task ID directly from our Hugging Face 🤗 dataset. The tool saves the demonstration files to a folder, along with a few videos that visualize what the demonstrations look like. See Demos for a list of all supported tasks that have demonstrations.

# Download the demonstration dataset for a specific task
python -m mani_skill.utils.download_demo ${ENV_ID}
python -m mani_skill.utils.download_demo # with no args this prints all available datasets
# Download the full datasets (can be very slow)
python -m mani_skill.utils.download_demo all

Demo datasets are typically stored in a minimal, compressed format: they omit observation data and store environment states instead. We provide a flexible tool to replay demonstration datasets and modify them, e.g. to add visual observation data or record videos; see the trajectory replay documentation. If you want to generate the original compressed datasets yourself, all scripts used for dataset generation are saved in the data_generation folder. For users looking to benchmark imitation learning, we strongly recommend following the instructions on the imitation learning setup page, which details how to replay the compressed datasets into training datasets for benchmarking.

Format

All demonstrations for a task are saved in the HDF5 format, which can be opened with h5py. Each HDF5 dataset is named trajectory.{obs_mode}.{control_mode}.{sim_backend}.h5 and is associated with a JSON metadata file with the same base name. Unless otherwise specified, trajectory.h5 is short for trajectory.none.pd_joint_pos.physx_cpu.h5, which contains the original demonstrations generated by the pd_joint_pos controller with the none observation mode (empty observations) in the CPU-based simulation. However, demonstrations generated by other controllers may also exist, so please check the associated JSON file to confirm which controller was used.
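The naming scheme above can be unpacked programmatically. The following is a minimal sketch, not part of ManiSkill: `parse_trajectory_filename` is a hypothetical helper, the example folder path is made up, and the sketch assumes that none of the name components contains a period.

```python
from pathlib import Path


def parse_trajectory_filename(path):
    """Split a trajectory file name into its parts and locate the JSON metadata file.

    Hypothetical helper for illustration only; assumes no component contains a period.
    """
    p = Path(path)
    parts = p.name.split(".")
    if p.name == "trajectory.h5":
        # The short name stands for the default combination described in the text.
        parts = ["trajectory", "none", "pd_joint_pos", "physx_cpu", "h5"]
    if len(parts) != 5 or parts[0] != "trajectory" or parts[-1] != "h5":
        raise ValueError(f"unexpected trajectory file name: {p.name}")
    _, obs_mode, control_mode, sim_backend, _ = parts
    return {
        "obs_mode": obs_mode,
        "control_mode": control_mode,
        "sim_backend": sim_backend,
        # The metadata file shares the base name and only swaps the extension.
        "json_path": p.with_suffix(".json"),
    }


# The folder below is a made-up example path.
info = parse_trajectory_filename("demos/PickCube-v1/trajectory.rgbd.pd_joint_pos.physx_cpu.h5")
short = parse_trajectory_filename("trajectory.h5")
print(info["obs_mode"], info["control_mode"], info["sim_backend"])
print(info["json_path"].name)
```

Even with the file name parsed this way, the JSON metadata remains the authoritative record of which controller was used.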

Meta Information (JSON)

Each JSON file contains:

  • env_info (Dict): task (also known as environment) information, which can be used to initialize the task

    • env_id (str): task id

    • max_episode_steps (int)

    • env_kwargs (Dict): keyword arguments to initialize the task. Essential to recreate the environment.

  • episodes (List[Dict]): episode information

  • source_type (Optional[str]): a simple category string describing what process generated the trajectory data.

  • source_desc (Optional[str]): a longer explanation of how the data was generated.

The episode information (the element of episodes) includes:

  • episode_id (int): a unique id to index the episode

  • reset_kwargs (Dict): keyword arguments to reset the task. Essential to reproduce the trajectory.

  • control_mode (str): control mode used for the episode.

  • elapsed_steps (int): trajectory length

  • info (Dict): information at the end of the episode.
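As a rough illustration of this layout, the sketch below parses a small metadata document with the standard json module and summarizes it. All values in the document are invented for the example (including the env_id and the contents of reset_kwargs), and `summarize_metadata` is a hypothetical helper rather than a ManiSkill function.

```python
import json

# Made-up metadata that follows the field layout described above.
metadata_text = """
{
  "env_info": {
    "env_id": "PickCube-v1",
    "max_episode_steps": 50,
    "env_kwargs": {"obs_mode": "none", "control_mode": "pd_joint_pos"}
  },
  "episodes": [
    {"episode_id": 0, "reset_kwargs": {"seed": 0}, "control_mode": "pd_joint_pos",
     "elapsed_steps": 42, "info": {}},
    {"episode_id": 1, "reset_kwargs": {"seed": 1}, "control_mode": "pd_joint_pos",
     "elapsed_steps": 50, "info": {}}
  ]
}
"""


def summarize_metadata(meta):
    """Collect a few headline numbers from a metadata dictionary (hypothetical helper)."""
    episodes = meta["episodes"]
    return {
        "env_id": meta["env_info"]["env_id"],
        "num_episodes": len(episodes),
        "total_steps": sum(ep["elapsed_steps"] for ep in episodes),
        # Control modes are recorded per episode, so gather the distinct values.
        "control_modes": sorted({ep["control_mode"] for ep in episodes}),
    }


meta = json.loads(metadata_text)
summary = summarize_metadata(meta)
print(summary)
```

Note that episodes sits at the top level of the JSON, next to env_info, rather than inside it.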

With just the metadata, you can usually recreate the task exactly as it was configured when the trajectories were collected:

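import gymnasium as gym
import mani_skill.envs  # registers ManiSkill tasks with gymnasium
env_info = json_data["env_info"]  # json_data: the loaded JSON metadata file
env = gym.make(env_info["env_id"], **env_info["env_kwargs"])
episode = json_data["episodes"][0]  # picks the first episode
env.reset(**episode["reset_kwargs"])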

Sometimes trajectory data is collected in GPU simulation, in which case the randomization depends on the number of parallel environments in addition to the seed. To ensure the same start state, you can take the first environment state stored in the trajectory and set the environment state accordingly.

Trajectory Data (HDF5)

Each HDF5 demonstration dataset consists of multiple trajectories. The key of each trajectory is traj_{episode_id}, e.g., traj_0.

Each trajectory is an h5py.Group, which contains:

  • actions: [T, A], np.float32. T is the number of transitions and A is the action dimension.

  • terminated: [T], np.bool_. It indicates whether the task is terminated or not at each time step.

  • truncated: [T], np.bool_. It indicates whether the task is truncated or not at each time step.

  • env_states: [T+1, D], np.float32. Environment states. It can be used to set the environment to a certain state via env.set_state_dict. However, it may not be enough to reproduce the trajectory.

  • success (optional): [T], np.bool_. It indicates whether the task is successful at each time step. Included if task defines success.

  • fail (optional): [T], np.bool_. It indicates whether the task is in a failure state at each time step. Included if task defines failure.

  • obs (optional): [T+1, D] observations.
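To see how these fields appear when opened with h5py, the sketch below writes a tiny stand-in file with the same layout and then inspects it. The file contents are synthetic, the actor name "cube" and all array sizes are invented, and `summarize_trajectories` is a hypothetical helper, not a ManiSkill function.

```python
import os
import tempfile

import h5py
import numpy as np


def summarize_trajectories(h5_path):
    """Report T, A and the stored fields for every trajectory group (hypothetical helper)."""
    summary = {}
    with h5py.File(h5_path, "r") as f:
        for key in f.keys():  # keys look like traj_0, traj_1, ...
            traj = f[key]
            num_steps, action_dim = traj["actions"].shape
            summary[key] = {"T": num_steps, "A": action_dim, "fields": sorted(traj.keys())}
    return summary


with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "trajectory.h5")
    # Write a synthetic file with T = 5 transitions and A = 8 action dimensions.
    with h5py.File(demo_path, "w") as f:
        traj = f.create_group("traj_0")
        traj.create_dataset("actions", data=np.zeros((5, 8), dtype=np.float32))
        traj.create_dataset("terminated", data=np.zeros(5, dtype=bool))
        traj.create_dataset("truncated", data=np.zeros(5, dtype=bool))
        # env_states is a nested group; each leaf holds T + 1 rows.
        actors = traj.create_group("env_states/actors")
        actors.create_dataset("cube", data=np.zeros((6, 13), dtype=np.float32))
    summary = summarize_trajectories(demo_path)

print(summary)
```

On a real dataset, the same function can be pointed at a downloaded .h5 file to check which optional fields (success, fail, obs) are present.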

Note that env_states is stored in dictionary form (observations may be as well, depending on obs_mode), formatted as a dictionary of lists. For example, a typical environment state looks like this:

env_state = env.get_state_dict()
"""
env_state = {
  "actors": {
    "actor_id": [...numpy_actor_state...],
    ...
  },
  "articulations": {
    "articulation_id": [...numpy_articulation_state...],
    ...
  }
}
"""

In the trajectory file env_states will be the same structure but each value/leaf in the dictionary will be a sequence of states representing the state of that particular entity in the simulation over time.
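The relationship between the stacked, per-entity sequences and a single-timestep state can be sketched with plain NumPy. This is an illustration of the idea only, not the library's implementation: `index_nested` is a hypothetical helper, and the entity names and state sizes are invented.

```python
import numpy as np


def index_nested(data, i):
    """Recursively index every leaf array of a nested dictionary at position i."""
    if isinstance(data, dict):
        return {key: index_nested(value, i) for key, value in data.items()}
    return data[i]


T = 4  # number of transitions, so each leaf stores T + 1 states
env_states = {
    "actors": {
        "cube": np.arange((T + 1) * 13, dtype=np.float32).reshape(T + 1, 13),
    },
    "articulations": {
        "panda": np.zeros((T + 1, 31), dtype=np.float32),
    },
}

# Same nested structure as env_states, but each leaf is now the state at timestep 2.
state_2 = index_nested(env_states, 2)
print(state_2["actors"]["cube"].shape)
print(state_2["articulations"]["panda"].shape)
```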

In practice it may be more useful to work with per-timestep slices of the env_states data (or the observations data), which can be done as follows:

import mani_skill.trajectory.utils as trajectory_utils
# env_states is the dictionary of sequences loaded from the trajectory file
env_states_list = trajectory_utils.dict_to_list_of_dicts(env_states)
# now env_states_list[i] is the same as the data env.get_state_dict() returned at timestep i
i = 10
# alternatively, index the original dictionary of sequences directly
env_state_i = trajectory_utils.index_dict(env_states, i)
# now env_state_i is the same as the data env.get_state_dict() returned at timestep i

These tools are also used in the PyTorch Dataset implementation we provide, which is explained in the next section.

Loading Trajectory Datasets

PyTorch

We provide an example of how to build a PyTorch Dataset that loads the trajectory .h5 data at haosulab/ManiSkill. It is not heavily optimized, but it shows how to work with our data format flexibly. A copy of the code is reproduced below.

dataset.py
from typing import Union
import h5py
import numpy as np
from torch.utils.data import Dataset
from tqdm import tqdm

from mani_skill.utils.io_utils import load_json
from mani_skill.utils import sapien_utils
from mani_skill.utils import common

# loads h5 data into memory for faster access
def load_h5_data(data):
    out = dict()
    for k in data.keys():
        if isinstance(data[k], h5py.Dataset):
            out[k] = data[k][:]
        else:
            out[k] = load_h5_data(data[k])
    return out

class ManiSkillTrajectoryDataset(Dataset):
    """
    A general torch Dataset you can drop in and use immediately with just about any trajectory .h5 data generated from ManiSkill.
    This class is simple starter code for loading trajectory data easily; it does not do any data transformation or anything
    advanced. We recommend copying this code directly and modifying it for more advanced use cases.

    Args:
        dataset_file (str): path to the .h5 file containing the data you want to load
        load_count (int): the number of trajectories from the dataset to load into memory. If -1, will load all into memory
        success_only (bool): whether to skip trajectories that are not successful in the end. Default is false
        device: The location to save data to. If None will store as numpy (the default), otherwise will move data to that device
    """

    def __init__(self, dataset_file: str, load_count=-1, success_only: bool = False, device = None) -> None:
        self.dataset_file = dataset_file
        self.device = device
        self.data = h5py.File(dataset_file, "r")
        json_path = dataset_file.replace(".h5", ".json")
        self.json_data = load_json(json_path)
        self.episodes = self.json_data["episodes"]
        self.env_info = self.json_data["env_info"]
        self.env_id = self.env_info["env_id"]
        self.env_kwargs = self.env_info["env_kwargs"]

        self.obs = None
        self.actions = []
        self.terminated = []
        self.truncated = []
        self.success, self.fail, self.rewards = None, None, None
        if load_count == -1:
            load_count = len(self.episodes)
        for eps_id in tqdm(range(load_count)):
            eps = self.episodes[eps_id]
            if success_only: 
                assert "success" in eps, "episodes in this dataset do not have the success attribute, cannot load dataset with success_only=True"
                if not eps["success"]:
                    continue
            trajectory = self.data[f"traj_{eps['episode_id']}"]
            trajectory = load_h5_data(trajectory)
            eps_len = len(trajectory["actions"])
            
            # exclude the final observation as most learning workflows do not use it
            obs = common.index_dict_array(trajectory["obs"], slice(eps_len))
            if self.obs is None:  # first loaded episode; episode 0 may be skipped when success_only=True
                self.obs = obs
            else:
                self.obs = common.append_dict_array(self.obs, obs)

            self.actions.append(trajectory["actions"])
            self.terminated.append(trajectory["terminated"])
            self.truncated.append(trajectory["truncated"])

            # handle data that might optionally be in the trajectory
            if "rewards" in trajectory:
                if self.rewards is None:
                    self.rewards = [trajectory["rewards"]]
                else:
                    self.rewards.append(trajectory["rewards"])
            if "success" in trajectory:
                if self.success is None:
                    self.success = [trajectory["success"]]
                else:
                    self.success.append(trajectory["success"])
            if "fail" in trajectory:
                if self.fail is None:
                    self.fail = [trajectory["fail"]]
                else:
                    self.fail.append(trajectory["fail"])

        self.actions = np.vstack(self.actions)
        self.terminated = np.concatenate(self.terminated)
        self.truncated = np.concatenate(self.truncated)
        
        if self.rewards is not None:
            self.rewards = np.concatenate(self.rewards)
        if self.success is not None:
            self.success = np.concatenate(self.success)
        if self.fail is not None:
            self.fail = np.concatenate(self.fail)

        def remove_np_uint16(x: Union[np.ndarray, dict]):
            if isinstance(x, dict):
                for k in x.keys():
                    x[k] = remove_np_uint16(x[k])
                return x
            else:
                if x.dtype == np.uint16:
                    return x.astype(np.int32)
                return x
        
        # uint16 dtype is used to conserve disk space and memory
        # you can optimize this dataset code to keep it as uint16 and process that
        # dtype of data yourself. for simplicity we simply cast to a int32 so
        # it can automatically be converted to torch tensors without complaint
        self.obs = remove_np_uint16(self.obs)

        if device is not None:
            self.actions = sapien_utils.to_tensor(self.actions, device=device)
            self.obs = sapien_utils.to_tensor(self.obs, device=device)
            self.terminated = sapien_utils.to_tensor(self.terminated, device=device)
            self.truncated = sapien_utils.to_tensor(self.truncated, device=device)
            if self.rewards is not None:
                self.rewards = sapien_utils.to_tensor(self.rewards, device=device)
            if self.success is not None:
                self.success = sapien_utils.to_tensor(self.success, device=device)
            if self.fail is not None:
                self.fail = sapien_utils.to_tensor(self.fail, device=device)

    def __len__(self):
        return len(self.actions)

    def __getitem__(self, idx):
        action = sapien_utils.to_tensor(self.actions[idx], device=self.device)
        obs = common.index_dict_array(self.obs, idx, inplace=False)

        res = dict(
            obs=obs,
            action=action,
            terminated=self.terminated[idx],
            truncated=self.truncated[idx],
        )
        if self.rewards is not None:
            res.update(reward=self.rewards[idx])
        if self.success is not None:
            res.update(success=self.success[idx])
        if self.fail is not None:
            res.update(fail=self.fail[idx])
        return res