# Source code for lagom.es

from abc import ABC
from abc import abstractmethod

from collections import namedtuple

import numpy as np

from lagom.transform import LinearSchedule


class BaseES(ABC):
    r"""Abstract interface that every evolution strategy must implement.

    .. note::

        Optimization is always treated as *minimization*: to maximize
        rewards, minimize the negated rewards instead.

    .. note::

        For painless parallelization we recommend
        `concurrent.futures.ProcessPoolExecutor` with a few practical tips:

        * Set ``max_workers`` to bound the parallelization capacity.
        * If execution gets stuck, wrap the objective function with
          :class:`CloudpickleWrapper` (notably for lambdas and class methods).
        * Open ``with ProcessPoolExecutor`` once around the entire iterative
          loop of ES generations; recreating the pool per generation adds
          large overheads.
        * To cut overheads further (e.g. PyTorch models, gym environments):

            * Recreating such objects every generation is very expensive.
            * Pass an ``initializer`` function to the executor and define the
              models/environments there as global variables — each worker gets
              its own independent copy.
            * Remember ``with torch.no_grad`` to speed up forward passes.
    """

    @abstractmethod
    def ask(self):
        r"""Sample a set of new candidate solutions.

        Returns
        -------
        solutions : list
            sampled candidate solutions
        """
        pass

    @abstractmethod
    def tell(self, solutions, function_values):
        r"""Update the population parameters for the next generation using
        the objective values of the sampled solutions.

        Args:
            solutions (list/ndarray): candidate solutions returned from :meth:`ask`
            function_values (list): objective function values evaluated for
                the sampled solutions.
        """
        pass

    @property
    @abstractmethod
    def result(self):
        r"""Return a namedtuple summarizing the optimization so far.

        It contains:

        * xbest: best solution evaluated
        * fbest: objective function value of the best solution
        * evals_best: evaluation count when xbest was evaluated
        * evaluations: evaluations overall done
        * iterations: number of iterations
        * xfavorite: distribution mean in "phenotype" space, the current
          best estimate of the optimum
        * stds: effective standard deviations
        """
        pass
class CMAES(BaseES):
    r"""Implements the CMA-ES algorithm.

    .. note::

        This is a thin wrapper around the `original CMA-ES implementation`_
        (:class:`cma.CMAEvolutionStrategy`); all work is delegated to it.

    Args:
        x0 (list): initial solution
        sigma0 (list): initial standard deviation
        opts (dict): a dictionary of options, e.g. ['popsize', 'seed']

    .. _original CMA-ES implementation:
        https://github.com/CMA-ES/pycma
    """
    def __init__(self, x0, sigma0, opts=None):
        # Lazy import: the third-party `cma` package is only needed when
        # this strategy is actually instantiated.
        import cma
        self.es = cma.CMAEvolutionStrategy(x0, sigma0, opts)
        # Mirror the canonical values computed by the underlying optimizer
        # (it may normalize/override what was passed in).
        self.x0 = self.es.x0
        self.sigma0 = self.es.sigma0
        self.popsize = self.es.popsize

    def ask(self):
        # Delegate sampling to pycma.
        return self.es.ask()

    def tell(self, solutions, function_values):
        # Delegate the distribution update to pycma.
        self.es.tell(solutions, function_values)

    @property
    def result(self):
        # pycma already returns the namedtuple described by BaseES.result.
        return self.es.result
class CEM(BaseES):
    r"""Implements the cross-entropy method (CEM).

    Each generation samples ``popsize`` candidates from a factorized Gaussian
    centered at the current mean; the elite fraction with the *smallest*
    objective values (minimization) re-fits the mean and standard deviation.

    Args:
        x0 (list): initial solution (initial Gaussian mean)
        sigma0 (list/float): initial standard deviation; a scalar is
            broadcast to the shape of ``x0``
        opts (dict): required options: 'popsize', 'elite_ratio',
            'noise_scheduler_args'; optional: 'seed'
    """
    def __init__(self, x0, sigma0, opts=None):
        self.x0 = x0
        self.sigma0 = sigma0
        self.popsize = opts['popsize']
        self.elite_ratio = opts['elite_ratio']
        # keep at least one elite, otherwise the refit would be undefined
        self.elite_size = max(1, int(self.elite_ratio*self.popsize))
        self.seed = opts['seed'] if 'seed' in opts else np.random.randint(1, 2**32)
        self.np_random = np.random.RandomState(self.seed)
        # decaying extra noise added to sigma to avoid premature convergence
        self.noise_scheduler = LinearSchedule(*opts['noise_scheduler_args'])
        self.iter = 0

        # initialize mean and std
        self.x = np.asarray(x0).astype(np.float32)
        self.shape = self.x.shape
        if np.isscalar(sigma0):
            self.sigma = np.full(self.shape, sigma0, dtype=np.float32)
        else:
            self.sigma = np.asarray(sigma0).astype(np.float32)

        # best solution/value seen over ALL generations (see tell())
        self.xbest = None
        self.fbest = None

    def ask(self):
        r"""Sample ``popsize`` candidate solutions from the current Gaussian."""
        extra_noise = self.noise_scheduler(self.iter)
        sigma = np.sqrt(self.sigma**2 + extra_noise)
        solutions = self.np_random.normal(self.x, sigma, size=(self.popsize,) + self.shape)
        return solutions

    def tell(self, solutions, function_values):
        r"""Refit the Gaussian to the elite set and track the overall best.

        Args:
            solutions (list/ndarray): candidate solutions returned from :meth:`ask`
            function_values (list): objective values for ``solutions``
        """
        solutions = np.asarray(solutions).astype(np.float32)
        # minimization: the smallest objective values are the elites
        elite_idx = np.argsort(function_values)[:self.elite_size]
        elite = solutions[elite_idx]
        self.x = elite.mean(axis=0)
        self.sigma = elite.std(axis=0)
        self.iter += 1
        # Bug fix: xbest/fbest must be the best solution evaluated over ALL
        # generations (the `result` contract declared in BaseES, matching
        # pycma's semantics), not overwritten with each generation's best.
        gen_fbest = function_values[elite_idx[0]]
        if self.fbest is None or gen_fbest < self.fbest:
            self.xbest = elite[0]
            self.fbest = gen_fbest

    @property
    def result(self):
        # Fields not tracked by CEM (evals_best, evaluations) default to None.
        CEMResult = namedtuple('CEMResult',
                               ['xbest', 'fbest', 'evals_best', 'evaluations',
                                'iterations', 'xfavorite', 'stds'],
                               defaults=[None]*7)
        result = CEMResult(xbest=self.xbest, fbest=self.fbest, iterations=self.iter,
                           xfavorite=self.x, stds=self.sigma)
        return result

    def __repr__(self):
        return f'CEM in dimension {len(self.x0)} (seed={self.seed})'