Source code for bulletarm_baselines.logger.logger

'''
.. moduleauthor:: Colin Kohler <github.com/ColinKohler>
'''

import os
import json
import time
import pickle
import torch
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.tensorboard import SummaryWriter

class Logger(object):
  '''
  Logger class. Writes log data to tensorboard.

  Args:
    results_path (str): Path to save log files to
    checkpoint_interval (int): Number of training steps between log data exports. Defaults to 500
    num_eval_eps (int): Number of episodes in an evaluation iteration. Defaults to 100
    hyperparameters (dict): Hyperparameters to log. Defaults to None
  '''
  def __init__(self, results_path, checkpoint_interval=500, num_eval_eps=100, hyperparameters=None):
    self.results_path = results_path
    self.writer = SummaryWriter(results_path)
    self.checkpoint_interval = checkpoint_interval
    self.log_counter = 0
    self.scalar_logs = dict()

    # Training
    self.num_steps = 0
    self.num_eps = 0
    self.num_training_steps = 0
    self.training_eps_rewards = list()
    self.loss = dict()
    self.current_episode_rewards = None

    # Evaluation
    self.num_eval_episodes = num_eval_eps # TODO: Dunno if I want this here
    self.num_eval_intervals = 0
    self.eval_eps_rewards = [[]]
    self.eval_eps_dis_rewards = [[]]
    self.eval_mean_values = [[]]
    self.eval_eps_lens = [[]]

    # Sub-folders for saving the models and checkpoints
    self.models_dir = os.path.join(self.results_path, 'models')
    self.checkpoint_dir = os.path.join(self.results_path, 'checkpoint')
    os.makedirs(self.models_dir)
    os.makedirs(self.checkpoint_dir)

    if hyperparameters:
      hp_table = [
        f'| {k} | {v} |' for k, v in hyperparameters.items()
      ]
      self.writer.add_text(
        'Hyperparameters',
        '| Parameter | Value |\n|-------|-------|\n' + '\n'.join(hp_table)
      )
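  # A minimal construction sketch (the path and hyperparameter values here are made up,
  # not part of this module):
  #
  #   logger = Logger('/tmp/run_0', checkpoint_interval=500, num_eval_eps=100,
  #                   hyperparameters={'lr': 1e-4, 'batch_size': 32})
  #
  # Note that os.makedirs above raises FileExistsError if 'models' or 'checkpoint'
  # already exist under results_path, so each run needs a fresh results_path.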
  # TODO: I don't use this at the moment, so this is untested.
  def logStep(self, rewards, done_masks):
    '''
    Log an episode step.

    Args:
      rewards (list[float]): List of rewards, one per parallel environment
      done_masks (list[int]): Done flags, one per parallel environment
    '''
    if self.current_episode_rewards is None:
      self.current_episode_rewards = [0 for _ in rewards]
    if len(rewards) != len(self.current_episode_rewards):
      raise ValueError('Length of rewards differs from what was previously logged.')

    self.num_steps += len(rewards)
    self.num_eps += np.sum(done_masks)
    for i, (reward, done) in enumerate(zip(rewards, done_masks)):
      if done:
        self.training_eps_rewards.append(self.current_episode_rewards[i] + reward)
        self.current_episode_rewards[i] = 0  # Reset the accumulator for the next episode
      else:
        self.current_episode_rewards[i] += reward
  def logTrainingEpisode(self, rewards):
    '''
    Log an episode.

    Args:
      rewards (list[float]): Rewards for the entire episode
    '''
    self.num_steps += int(len(rewards))
    self.num_eps += 1
    self.training_eps_rewards.append(np.sum(rewards))
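  # Example with made-up values: after an episode with per-step rewards [0.0, 0.0, 1.0],
  # logger.logTrainingEpisode([0.0, 0.0, 1.0]) adds 3 to num_steps, increments num_eps,
  # and appends the return 1.0 to training_eps_rewards.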
  def logEvalInterval(self):
    ''' Start a new evaluation interval. '''
    self.num_eval_intervals += 1
    self.eval_eps_rewards.append([])
    self.eval_eps_dis_rewards.append([])
    self.eval_mean_values.append([])
    self.eval_eps_lens.append([])
  def logEvalEpisode(self, rewards, values=None, discounted_return=None):
    '''
    Log an evaluation episode.

    Args:
      rewards (list[float]): Rewards for the episode
      values (list[float]): Values for the episode
      discounted_return (list[float]): Discounted return of the episode
    '''
    self.eval_eps_rewards[self.num_eval_intervals].append(np.sum(rewards))
    self.eval_eps_lens[self.num_eval_intervals].append(int(len(rewards)))
    if values is not None:
      self.eval_mean_values[self.num_eval_intervals].append(np.mean(values))
    if discounted_return is not None:
      self.eval_eps_dis_rewards[self.num_eval_intervals].append(discounted_return)
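  # Sketch of an evaluation loop using only this class's API; runEvalEpisode and the
  # agent/env objects are hypothetical stand-ins:
  #
  #   logger.logEvalInterval()
  #   for _ in range(logger.num_eval_episodes):
  #     rewards, values = runEvalEpisode(agent, env)
  #     logger.logEvalEpisode(rewards, values=values)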
  def logTrainingStep(self, loss):
    '''
    Log a training step.

    Args:
      loss (float | list[float] | tuple[float] | dict): Loss(es) from the training step
    '''
    self.num_training_steps += 1

    if isinstance(loss, (list, tuple)):
      loss = {'loss{}'.format(i): loss[i] for i in range(len(loss))}
    elif isinstance(loss, float):
      loss = {'loss': loss}

    for k, v in loss.items():
      if k in self.loss.keys():
        self.loss[k].append(v)
      else:
        self.loss[k] = [v]
  def writeLog(self):
    '''
    Write the log data to the tensorboard summary writer. Calling this too often can slow down
    training.
    '''
    # The eval lists indexed at self.num_eval_intervals might still be being updated, so log the
    # eval lists indexed at self.num_eval_intervals - 1
    if self.num_eval_intervals > 1:
      self.writer.add_scalar('1.Evaluate/1.Reward',
                             self.getAvg(self.eval_eps_rewards[self.num_eval_intervals-1], n=self.num_eval_episodes),
                             self.num_eval_intervals-1)
      self.writer.add_scalar('1.Evaluate/2.Mean_value',
                             self.getAvg(self.eval_mean_values[self.num_eval_intervals-1], n=self.num_eval_episodes),
                             self.num_eval_intervals-1)
      self.writer.add_scalar('1.Evaluate/3.Eps_len',
                             self.getAvg(self.eval_eps_lens[self.num_eval_intervals-1], n=self.num_eval_episodes),
                             self.num_eval_intervals-1)

    # TODO: Do we want to allow custom windows here?
    self.writer.add_scalar('1.Evaluate/4.Learning_curve',
                           self.getAvg(self.training_eps_rewards, n=100),
                           len(self.training_eps_rewards))

    self.writer.add_scalar('2.Data/1.Num_eps', self.num_eps, self.log_counter)
    self.writer.add_scalar('2.Data/2.Num_steps', self.num_steps, self.log_counter)
    self.writer.add_scalar('2.Data/3.Training_steps', self.num_training_steps, self.log_counter)
    self.writer.add_scalar('2.Data/4.Training_steps_per_eps_step_ratio',
                           self.num_training_steps / max(1, self.num_steps),
                           self.log_counter)

    if self.loss:
      for i, (k, v) in enumerate(self.loss.items()):
        self.writer.add_scalar('3.Loss/{}.{}_loss'.format(i+1, k), v[-1], self.log_counter)

    # TODO: I have not needed to test this yet. Can't imagine it doesn't work but still...
    for k, v in self.scalar_logs.items():
      self.writer.add_scalar(k, v, self.log_counter)

    if self.num_training_steps > 0 and self.num_training_steps % self.checkpoint_interval == 0:
      self.exportData()

    self.log_counter += 1
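  # writeLog is typically called once per training iteration rather than per environment
  # step. A sketch with a hypothetical agent and log_interval setting:
  #
  #   for step in range(max_training_steps):
  #     loss = agent.update(batch)
  #     logger.logTrainingStep(loss)
  #     if step % log_interval == 0:
  #       logger.writeLog()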
  def getSaveState(self):
    '''
    Get the logger's internal state for checkpointing.

    Returns:
      dict: The logger's internal state
    '''
    state = {
      'num_steps': self.num_steps,
      'num_eps': self.num_eps,
      'num_training_steps': self.num_training_steps,
      'training_eps_rewards': self.training_eps_rewards,
      'loss': self.loss,
      'num_eval_intervals': self.num_eval_intervals,
      'eval_eps_rewards': self.eval_eps_rewards,
      'eval_eps_dis_rewards': self.eval_eps_dis_rewards,
      'eval_mean_values': self.eval_mean_values,
      'eval_eps_lens': self.eval_eps_lens,
    }
    return state
  def exportData(self):
    '''
    Export the log data as a pickle to 'log_data.pkl' in the results path.
    '''
    with open(os.path.join(self.results_path, 'log_data.pkl'), 'wb') as f:
      pickle.dump(self.getSaveState(), f)
  def getScalars(self, keys):
    '''
    Get data from the scalar log dict.

    Args:
      keys (str | list[str]): Key or list of keys to get from the scalar log dict

    Returns:
      Object | dict: Single object when a single key is passed, or a dict containing the objects
        for all keys
    '''
    if isinstance(keys, str):
      return self.scalar_logs[keys]
    elif isinstance(keys, list):
      return {key: self.scalar_logs[key] for key in keys}
    else:
      raise TypeError('keys must be a str or a list of str')
  def updateScalars(self, keys, values=None):
    '''
    Update the scalar log dict with new values.

    Args:
      keys (str | dict): Either the key to update to the given value or a collection of
        key-value pairs to update
      values (Object): Value to update the key to in the scalar log dict. Defaults to None
    '''
    if isinstance(keys, str) and values is not None:
      self.scalar_logs[keys] = values
    elif isinstance(keys, dict):
      self.scalar_logs.update(keys)
    else:
      raise TypeError('keys must be a str (with values given) or a dict')
  def getAvg(self, l, n=0):
    '''
    Numpy mean wrapper that handles empty lists.

    Args:
      l (list[float]): The list
      n (int): Number of trailing elements to average over. Defaults to the entire list

    Returns:
      float: List average
    '''
    avg = np.mean(l[-n:]) if l else 0
    return avg
  def saveParameters(self, parameters):
    '''
    Save the parameters as a json file.

    Args:
      parameters (dict): Parameter dict to save
    '''
    class NumpyEncoder(json.JSONEncoder):
      ''' JSON encoder that converts numpy arrays to lists. '''
      def default(self, obj):
        if isinstance(obj, np.ndarray):
          return obj.tolist()
        return json.JSONEncoder.default(self, obj)

    with open(os.path.join(self.results_path, 'parameters.json'), 'w') as f:
      json.dump(parameters, f, cls=NumpyEncoder)
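  # Example with made-up values: saveParameters({'gamma': 0.99, 'workspace': np.zeros(3)})
  # writes {"gamma": 0.99, "workspace": [0.0, 0.0, 0.0]} to parameters.json.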
  def saveCheckPoint(self, agent_save_state, buffer_save_state):
    '''
    Save the checkpoint.

    Args:
      agent_save_state (dict): The agent's save state for checkpointing
      buffer_save_state (dict): The buffer's save state for checkpointing
    '''
    checkpoint = {
      'agent': agent_save_state,
      'buffer_state': buffer_save_state,
      'logger': self.getSaveState(),
      'torch_rng_state': torch.get_rng_state(),
      'torch_cuda_rng_state': torch.cuda.get_rng_state(),
      'np_rng_state': np.random.get_state()
    }
    torch.save(checkpoint, os.path.join(self.checkpoint_dir, 'checkpoint.pt'))
  def loadCheckPoint(self, checkpoint_dir, agent_load_func, buffer_load_func):
    '''
    Load the checkpoint.

    Args:
      checkpoint_dir (str): The directory of the checkpoint to load
      agent_load_func (func): The agent's checkpoint loading function. Must take a dict as input
        to load the agent's checkpoint
      buffer_load_func (func): The buffer's checkpoint loading function. Must take a dict as input
        to load the buffer's checkpoint
    '''
    print('loading checkpoint')
    checkpoint = torch.load(os.path.join(checkpoint_dir, 'checkpoint.pt'))

    agent_load_func(checkpoint['agent'])
    buffer_load_func(checkpoint['buffer_state'])

    self.num_steps = checkpoint['logger']['num_steps']
    self.num_eps = checkpoint['logger']['num_eps']
    self.num_training_steps = checkpoint['logger']['num_training_steps']
    self.training_eps_rewards = checkpoint['logger']['training_eps_rewards']
    self.loss = checkpoint['logger']['loss']
    self.num_eval_intervals = checkpoint['logger']['num_eval_intervals']
    self.eval_eps_rewards = checkpoint['logger']['eval_eps_rewards']
    self.eval_eps_dis_rewards = checkpoint['logger']['eval_eps_dis_rewards']
    self.eval_mean_values = checkpoint['logger']['eval_mean_values']
    self.eval_eps_lens = checkpoint['logger']['eval_eps_lens']

    torch.set_rng_state(checkpoint['torch_rng_state'])
    torch.cuda.set_rng_state(checkpoint['torch_cuda_rng_state'])
    np.random.set_state(checkpoint['np_rng_state'])
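  # saveCheckPoint and loadCheckPoint are symmetric, so a run can be resumed from disk.
  # A sketch with hypothetical agent/buffer objects and method names:
  #
  #   logger.saveCheckPoint(agent.getSaveState(), buffer.getSaveState())
  #   ...
  #   logger.loadCheckPoint(logger.checkpoint_dir, agent.loadFromState, buffer.loadFromState)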
  def getCurrentLoss(self, n=100):
    '''
    Calculate the average loss over the previous n training steps.

    Args:
      n (int): The number of previous training steps to average over. Defaults to 100

    Returns:
      float: The average loss value
    '''
    avg_losses = []
    for k, v in self.loss.items():
      avg_losses.append(self.getAvg(v, n))
    return np.mean(avg_losses)
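# A minimal end-to-end sketch (illustrative only, not part of the original module). It uses
# only the Logger API defined above; the reward and loss values are made up.
if __name__ == '__main__':
  import tempfile

  logger = Logger(tempfile.mkdtemp(), checkpoint_interval=500, num_eval_eps=2)

  # Log one training episode and one training step
  logger.logTrainingEpisode([0.0, 0.5, 1.0])
  logger.logTrainingStep({'actor': 0.3, 'critic': 0.1})

  # Log one evaluation interval containing a single episode
  logger.logEvalInterval()
  logger.logEvalEpisode([0.0, 1.0], values=[0.4, 0.9])

  # Flush scalars to tensorboard and pickle the raw log data
  logger.writeLog()
  logger.exportData()
  print(logger.getCurrentLoss())  # (0.3 + 0.1) / 2 = 0.2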