Source code for lagom.envs.wrappers.vec_standardize_reward

import numpy as np

from lagom.transform import RunningMeanVar
from lagom.envs import VecEnvWrapper


class VecStandardizeReward(VecEnvWrapper):
    r"""Standardize the reward by a running estimate of its variance.

    .. warning::

        We do not subtract the running mean from the reward; we only divide it by the
        running standard deviation, because subtracting the mean would alter the reward
        shape and might degrade performance. Note that this transformation is applied
        from the second incoming reward onwards, keeping the first reward unchanged;
        otherwise the first reward would have too large a magnitude (and simply be
        clipped), since no mean is subtracted from it.

    .. note::

        :meth:`reset` clears the ``self.all_returns`` buffer but keeps the running
        variance statistics. Because the discount factor is :math:`< 1`, the discounted
        returns stay bounded and the running statistics converge after some iterations.
        For this reason a discount factor of :math:`1.0` is not allowed, as it would
        lead to unbounded growth of the discounted returns and hence of the running
        statistics.

    Args:
        env (VecEnv): a vectorized environment
        clip (float): clipping range of the standardized reward, i.e. [-clip, clip]
        gamma (float): discount factor. Note that the value 1.0 must not be used.
        constant_var (ndarray): constant variance used to standardize the reward. When
            it is provided, the running estimate is ignored.
    """
    def __init__(self, env, clip=10., gamma=0.99, constant_var=None):
        super().__init__(env)
        self.clip = clip
        assert gamma > 0.0 and gamma < 1.0, 'a discount factor of 1.0 is not allowed. See docstring for details.'
        self.gamma = gamma
        self.constant_var = constant_var
        self.eps = 1e-8

        if constant_var is None:
            self.online = True
            self.running_moments = RunningMeanVar(shape=())
        else:
            self.online = False

        # Buffer to save discounted returns from each environment
        self.all_returns = np.zeros(len(env), dtype=np.float64)
    def step(self, actions):
        observations, rewards, dones, infos = self.env.step(actions)
        rewards = self.process_reward(rewards)
        # Set the discounted-return buffer to zero for those episodes which terminate
        self.all_returns[dones] = 0.0
        return observations, rewards, dones, infos
    def reset(self):
        # Reset returns buffer, because all environments are also reset
        self.all_returns.fill(0.0)
        return super().reset()
    def process_reward(self, rewards):
        # Do NOT subtract the mean; only divide by the standard deviation
        if self.online:
            self.all_returns = rewards + self.gamma*self.all_returns
            self.running_moments(self.all_returns)
            # Keep the first reward unchanged: without mean subtraction it would
            # otherwise have too large a magnitude and just be clipped
            if self.running_moments.n >= 2:
                std = np.sqrt(self.running_moments.var + self.eps)
                rewards = rewards/std
        else:
            std = np.sqrt(self.constant_var + self.eps)
            rewards = rewards/std
        rewards = np.clip(rewards, -self.clip, self.clip)
        return rewards

    @property
    def var(self):
        return self.running_moments.var
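
A minimal usage sketch (an assumption about how the wrapper is applied, not taken from the original page): ``my_vec_env`` stands in for any already-constructed ``VecEnv``, and the ``action_space.sample()`` call assumes a Gym-style action space on the wrapped environment.

venv = VecStandardizeReward(my_vec_env, clip=10., gamma=0.99)  # online running variance
observations = venv.reset()
actions = [venv.action_space.sample() for _ in range(len(venv))]
observations, rewards, dones, infos = venv.step(actions)  # rewards divided by running std and clipped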
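
For intuition, here is a pure-NumPy sketch of the same scheme, independent of lagom: keep one discounted return per environment, track the variance of all returns seen so far, and divide each new reward batch by that running standard deviation while leaving the very first batch unchanged. The ``np.var`` over a concatenated history stands in for ``RunningMeanVar``, and the choice of 3 environments and the reward values are arbitrary.

import numpy as np

gamma, eps, clip = 0.99, 1e-8, 10.0
all_returns = np.zeros(3)                    # one discounted return per environment
history = []                                 # all discounted returns seen so far

for rewards in [np.array([1.0, 0.5, -0.2]),  # first batch: returned unchanged
                np.array([0.3, 1.2, 0.0])]:  # later batches: divided by running std
    all_returns = rewards + gamma * all_returns
    history.append(all_returns.copy())
    if len(history) >= 2:
        std = np.sqrt(np.var(np.concatenate(history)) + eps)
        rewards = rewards / std
    print(np.clip(rewards, -clip, clip))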