Source code for lagom.runner.trajectory

import numpy as np


class Trajectory:
    """Container for one trajectory of observations, actions, rewards and step infos.

    Items are appended one at a time with the ``add_*`` methods. The
    trajectory counts as completed once the most recently added step info
    has a truthy ``last`` attribute; after that every ``add_*`` call raises
    ``AssertionError``.

    The step-info objects are only accessed through the attributes ``last``,
    ``done``, ``time_limit``, ``terminal`` and ``info`` (and through ``[]``
    in :meth:`get_all_info`); their type is defined elsewhere.
    """

    def __init__(self):
        self.observations = []
        self.actions = []
        self.rewards = []
        self.step_infos = []

    def _assert_not_completed(self):
        """Raise ``AssertionError`` if the trajectory is already completed.

        Raised explicitly rather than via an ``assert`` statement so that the
        guard is still enforced when Python runs with ``-O``.
        """
        if self.completed:
            raise AssertionError('cannot add to a completed trajectory')

    @property
    def completed(self):
        """Whether at least one step info exists and the latest one is ``last``."""
        return len(self.step_infos) > 0 and self.step_infos[-1].last

    def add_observation(self, observation):
        """Append an observation.

        Raises:
            AssertionError: if the trajectory is already completed.
        """
        self._assert_not_completed()
        self.observations.append(observation)

    @property
    def numpy_observations(self):
        """All observations concatenated along axis 0.

        NOTE(review): presumably each observation carries a leading batch
        axis so that concatenation stacks time steps - confirm with callers.
        """
        return np.concatenate(self.observations, axis=0)

    @property
    def last_observation(self):
        """The most recently added observation (``IndexError`` if none)."""
        return self.observations[-1]

    @property
    def reach_time_limit(self):
        """The ``time_limit`` attribute of the latest step info."""
        return self.step_infos[-1].time_limit

    @property
    def reach_terminal(self):
        """The ``terminal`` attribute of the latest step info."""
        return self.step_infos[-1].terminal

    def add_action(self, action):
        """Append an action.

        Raises:
            AssertionError: if the trajectory is already completed.
        """
        self._assert_not_completed()
        self.actions.append(action)

    @property
    def numpy_actions(self):
        """All actions concatenated along axis 0."""
        return np.concatenate(self.actions, axis=0)

    def add_reward(self, reward):
        """Append a reward.

        Raises:
            AssertionError: if the trajectory is already completed.
        """
        self._assert_not_completed()
        self.rewards.append(reward)

    @property
    def numpy_rewards(self):
        """The rewards as a NumPy array."""
        return np.asarray(self.rewards)

    def add_step_info(self, step_info):
        """Append a step info; a truthy ``step_info.last`` completes the trajectory.

        Raises:
            AssertionError: if the trajectory is already completed.
        """
        self._assert_not_completed()
        self.step_infos.append(step_info)
        if step_info.last:
            # Internal sanity check: the entry just appended must flip `completed`.
            assert self.completed

    @property
    def numpy_dones(self):
        """NumPy array of the ``done`` attribute of every step info."""
        return np.asarray([step_info.done for step_info in self.step_infos])

    @property
    def numpy_masks(self):
        """``1. - numpy_dones``, i.e. the complement of the done flags as floats."""
        return 1. - self.numpy_dones

    @property
    def infos(self):
        """List of the ``info`` attribute of every step info."""
        return [step_info.info for step_info in self.step_infos]

    def get_all_info(self, key):
        """Return ``step_info[key]`` for every step info, in order.

        NOTE(review): this indexes the step-info object itself rather than
        its ``info`` attribute, so the step-info type must support ``[]``;
        presumably it forwards to ``info`` - confirm against its definition.
        """
        return [step_info[key] for step_info in self.step_infos]

    def __len__(self):
        """Number of step infos recorded so far."""
        return len(self.step_infos)

    def __repr__(self):
        return f'Trajectory({len(self)})'