Source code for bulletarm.env_factory

'''
.. moduleauthor: Colin Kohler <github.com/ColinKohler>
'''

import copy
import numpy as np
import numpy.random as npr

from bulletarm.envs import env_fn
from bulletarm.planners.planner_factory import getPlannerFn

from bulletarm.runner import MultiRunner, SingleRunner

[docs]def getEnvFn(env_type): ''' Get the env creation function of the given type. Args: env_type (str): The type of environment to create Returns: function: A function which creates the env when run ''' if env_type in env_fn.CREATE_ENV_FNS: return env_fn.CREATE_ENV_FNS[env_type] else: raise ValueError('Invalid environment type passed to factory. No env for {}'.format(env_type))
[docs]def createEnvs(num_processes, env_type, env_config={}, planner_config={}): ''' Wrapper function to create either a single env the the main process or some number of envs each in their own seperate process. Args: num_processes (int): Number of envs to create env_type (str): The type of environment to create env_config (dict): Intialization arguments for the env planner_config (dict): Intialization arguments for the planner Returns: EnvRunner: SingleRunner or MultiRunner containing the environment ''' if num_processes == 0: return createSingleProcessEnv(env_type, env_config, planner_config) else: return createMultiprocessEnvs(num_processes, env_type, env_config, planner_config)
[docs]def createSingleProcessEnv(env_type, env_config={}, planner_config={}): ''' Create a single environment Args: env_type (str): The type of environment to create env_config (dict): Intialization arguments for the env planner_config (dict): Intialization arguments for the planner Returns: SingleRunner: SingleRunner containing the environment ''' # Check to make sure a seed is given and generate a random one if it is not if env_type == 'multi_task': for config in env_config: config['seed'] = config['seed'] if 'seed' in config else npr.randint(1000) else: env_config['seed'] = env_config['seed'] if 'seed' in env_config else npr.randint(1000) # Create the environment and planner if planner_config exists env_func = getEnvFn(env_type) env = env_func(env_config) planner = getPlannerFn(env_type, planner_config)(env) return SingleRunner(env, planner)
[docs]def createMultiprocessEnvs(num_processes, env_type, env_config={}, planner_config={}): ''' Create a number of environments on different processes to run in parralel Args: num_processes (int): Number of envs to create env_type (str): The type of environment to create env_config (dict): Intialization arguments for the env planner_config (dict): Intialization arguments for the planner Returns: MultiRunner: MultiRunner containing all environments ''' # Clone env config and set seeds for the different processes env_configs = [copy.deepcopy(env_config) for _ in range(num_processes)] for i, env_config in enumerate(env_configs): if env_type == 'multi_task': for config in env_config: config['seed'] = config['seed'] + i if 'seed' in config else npr.randint(1000) else: env_config['seed'] = env_config['seed'] + i if 'seed' in env_config else npr.randint(1000) # Create the various environments env_func = getEnvFn(env_type) def getEnv(c): def _thunk(): return env_func(c) return _thunk envs = [getEnv(env_configs[i]) for i in range(num_processes)] planners = [getPlannerFn(env_type, planner_config) for i in range(num_processes)] return MultiRunner(envs, planners)