Source code for flow.utils.rllib

"""
Utility functions for Flow compatibility with RLlib.

This includes: environment generation, serialization, and visualization.
"""
import json
from copy import deepcopy
import os

from flow.core.params import SumoLaneChangeParams, SumoCarFollowingParams, \
    SumoParams, InitialConfig, EnvParams, NetParams, InFlows
from flow.core.params import TrafficLightParams
from flow.core.params import VehicleParams

from ray.cloudpickle import cloudpickle


[docs]class FlowParamsEncoder(json.JSONEncoder): """ Custom encoder used to generate ``flow_params.json``. Extends ``json.JSONEncoder``. """
[docs] def default(self, obj): """See parent class. Extended to support the VehicleParams object in flow/core/params.py. """ allowed_types = [dict, list, tuple, str, int, float, bool, type(None)] if obj not in allowed_types: if isinstance(obj, VehicleParams): res = deepcopy(obj.initial) for res_i in res: res_i["acceleration_controller"] = \ (res_i["acceleration_controller"][0].__name__, res_i["acceleration_controller"][1]) res_i["lane_change_controller"] = \ (res_i["lane_change_controller"][0].__name__, res_i["lane_change_controller"][1]) if res_i["routing_controller"] is not None: res_i["routing_controller"] = \ (res_i["routing_controller"][0].__name__, res_i["routing_controller"][1]) return res if hasattr(obj, '__name__'): return obj.__name__ else: return obj.__dict__ return json.JSONEncoder.default(self, obj)
[docs]def get_flow_params(config): """Return Flow experiment parameters, given an experiment result folder. Parameters ---------- config : dict < dict > or str May be one of two things: * If it is a dict, then it is the stored RLlib configuration dict. * If it is a string, then it is the path to a flow_params json file. Returns ------- dict flow-related parameters, consisting of the following keys: * exp_tag: name of the experiment * env_name: name of the flow environment the experiment is running on * scenario: name of the scenario class the experiment uses * simulator: simulator that is used by the experiment (e.g. aimsun) * sim: simulation-related parameters (see flow.core.params.SimParams) * env: environment related parameters (see flow.core.params.EnvParams) * net: network-related parameters (see flow.core.params.NetParams and the scenario's documentation or ADDITIONAL_NET_PARAMS component) * veh: vehicles to be placed in the network at the start of a rollout (see flow.core.params.VehicleParams) * initial: parameters affecting the positioning of vehicles upon initialization/reset (see flow.core.params.InitialConfig) * tls: traffic lights to be introduced to specific nodes (see flow.core.params.TrafficLightParams) """ # collect all data from the json file if type(config) == dict: flow_params = json.loads(config['env_config']['flow_params']) else: flow_params = json.load(open(config, 'r')) # reinitialize the vehicles class from stored data veh = VehicleParams() for veh_params in flow_params["veh"]: module = __import__( "flow.controllers", fromlist=[veh_params['acceleration_controller'][0]]) acc_class = getattr(module, veh_params['acceleration_controller'][0]) lc_class = getattr(module, veh_params['lane_change_controller'][0]) acc_controller = (acc_class, veh_params['acceleration_controller'][1]) lc_controller = (lc_class, veh_params['lane_change_controller'][1]) rt_controller = None if veh_params['routing_controller'] is not None: rt_class = getattr(module, veh_params['routing_controller'][0]) rt_controller = (rt_class, veh_params['routing_controller'][1]) # TODO: make ambiguous car_following_params = SumoCarFollowingParams() car_following_params.__dict__ = veh_params["car_following_params"] # TODO: make ambiguous lane_change_params = SumoLaneChangeParams() lane_change_params.__dict__ = veh_params["lane_change_params"] del veh_params["car_following_params"], \ veh_params["lane_change_params"], \ veh_params["acceleration_controller"], \ veh_params["lane_change_controller"], \ veh_params["routing_controller"] veh.add( acceleration_controller=acc_controller, lane_change_controller=lc_controller, routing_controller=rt_controller, car_following_params=car_following_params, lane_change_params=lane_change_params, **veh_params) # convert all parameters from dict to their object form sim = SumoParams() # TODO: add check for simulation type sim.__dict__ = flow_params["sim"].copy() net = NetParams() net.__dict__ = flow_params["net"].copy() net.inflows = InFlows() if flow_params["net"]["inflows"]: net.inflows.__dict__ = flow_params["net"]["inflows"].copy() env = EnvParams() env.__dict__ = flow_params["env"].copy() initial = InitialConfig() if "initial" in flow_params: initial.__dict__ = flow_params["initial"].copy() tls = TrafficLightParams() if "tls" in flow_params: tls.__dict__ = flow_params["tls"].copy() flow_params["sim"] = sim flow_params["env"] = env flow_params["initial"] = initial flow_params["net"] = net flow_params["veh"] = veh flow_params["tls"] = tls return flow_params
[docs]def get_rllib_config(path): """Return the data from the specified rllib configuration file.""" config_path = os.path.join(path, "params.json") if not os.path.exists(config_path): config_path = os.path.join(path, "../params.json") if not os.path.exists(config_path): raise ValueError( "Could not find params.json in either the checkpoint dir or " "its parent directory.") with open(config_path) as f: config = json.load(f) return config
[docs]def get_rllib_pkl(path): """Return the data from the specified rllib configuration file.""" config_path = os.path.join(path, "params.pkl") if not os.path.exists(config_path): config_path = os.path.join(path, "../params.pkl") if not os.path.exists(config_path): raise ValueError( "Could not find params.pkl in either the checkpoint dir or " "its parent directory.") with open(config_path, 'rb') as f: config = cloudpickle.load(f) return config