Source code for flow.core.experiment

"""Contains an experiment class for running simulations."""

import logging
import datetime
import numpy as np
import time
import os

from flow.core.util import emission_to_csv


[docs]class Experiment: """ Class for systematically running simulations in any supported simulator. This class acts as a runner for a scenario and environment. In order to use it to run an scenario and environment in the absence of a method specifying the actions of RL agents in the network, type the following: >>> from flow.envs import Env >>> env = Env(...) >>> exp = Experiment(env) # for some env and scenario >>> exp.run(num_runs=1, num_steps=1000) If you wish to specify the actions of RL agents in the network, this may be done as follows: >>> rl_actions = lambda state: 0 # replace with something appropriate >>> exp.run(num_runs=1, num_steps=1000, rl_actions=rl_actions) Finally, if you would like to like to plot and visualize your results, this class can generate csv files from emission files produced by sumo. These files will contain the speeds, positions, edges, etc... of every vehicle in the network at every time step. In order to ensure that the simulator constructs an emission file, set the ``emission_path`` attribute in ``SimParams`` to some path. >>> from flow.core.params import SimParams >>> sim_params = SimParams(emission_path="./data") Once you have included this in your environment, run your Experiment object as follows: >>> exp.run(num_runs=1, num_steps=1000, convert_to_csv=True) After the experiment is complete, look at the "./data" directory. There will be two files, one with the suffix .xml and another with the suffix .csv. The latter should be easily interpretable from any csv reader (e.g. Excel), and can be parsed using tools such as numpy and pandas. Attributes ---------- env : flow.envs.Env the environment object the simulator will run """ def __init__(self, env): """Instantiate Experiment.""" self.env = env logging.info(" Starting experiment {} at {}".format( env.scenario.name, str(datetime.datetime.utcnow()))) logging.info("Initializing environment.")
[docs] def run(self, num_runs, num_steps, rl_actions=None, convert_to_csv=False): """Run the given scenario for a set number of runs and steps per run. Parameters ---------- num_runs : int number of runs the experiment should perform num_steps : int number of steps to be performs in each run of the experiment rl_actions : method, optional maps states to actions to be performed by the RL agents (if there are any) convert_to_csv : bool Specifies whether to convert the emission file created by sumo into a csv file Returns ------- info_dict : dict contains returns, average speed per step """ # raise an error if convert_to_csv is set to True but no emission # file will be generated, to avoid getting an error at the end of the # simulation if convert_to_csv and self.env.sim_params.emission_path is None: raise ValueError( 'The experiment was run with convert_to_csv set ' 'to True, but no emission file will be generated. If you wish ' 'to generate an emission file, you should set the parameter ' 'emission_path in the simulation parameters (SumoParams or ' 'AimsunParams) to the path of the folder where emissions ' 'output should be generated. If you do not wish to generate ' 'emissions, set the convert_to_csv parameter to False.') info_dict = {} if rl_actions is None: def rl_actions(*_): return None rets = [] mean_rets = [] ret_lists = [] vels = [] mean_vels = [] std_vels = [] outflows = [] for i in range(num_runs): vel = np.zeros(num_steps) logging.info("Iter #" + str(i)) ret = 0 ret_list = [] state = self.env.reset() for j in range(num_steps): state, reward, done, _ = self.env.step(rl_actions(state)) vel[j] = np.mean( self.env.k.vehicle.get_speed(self.env.k.vehicle.get_ids())) ret += reward ret_list.append(reward) if done: break rets.append(ret) vels.append(vel) mean_rets.append(np.mean(ret_list)) ret_lists.append(ret_list) mean_vels.append(np.mean(vel)) std_vels.append(np.std(vel)) outflows.append(self.env.k.vehicle.get_outflow_rate(int(500))) print("Round {0}, return: {1}".format(i, ret)) info_dict["returns"] = rets info_dict["velocities"] = vels info_dict["mean_returns"] = mean_rets info_dict["per_step_returns"] = ret_lists info_dict["mean_outflows"] = np.mean(outflows) print("Average, std return: {}, {}".format( np.mean(rets), np.std(rets))) print("Average, std speed: {}, {}".format( np.mean(mean_vels), np.std(mean_vels))) self.env.terminate() if convert_to_csv: # wait a short period of time to ensure the xml file is readable time.sleep(0.1) # collect the location of the emission file dir_path = self.env.sim_params.emission_path emission_filename = \ "{0}-emission.xml".format(self.env.scenario.name) emission_path = os.path.join(dir_path, emission_filename) # convert the emission file into a csv emission_to_csv(emission_path) # Delete the .xml version of the emission file. os.remove(emission_path) return info_dict