Source code for flow.networks.highway_ramps

"""Contains the highway with ramps network class."""

from flow.networks.base import Network
from flow.core.params import InitialConfig, TrafficLightParams
from collections import defaultdict
from numpy import pi, sin, cos


ADDITIONAL_NET_PARAMS = {
    # lengths of highway, on-ramps and off-ramps respectively
    "highway_length": 300,
    "on_ramps_length": 100,
    "off_ramps_length": 100,
    # number of lanes on highway, on-ramps and off-ramps respectively
    "highway_lanes": 1,
    "on_ramps_lanes": 1,
    "off_ramps_lanes": 1,
    # speed limit on highway, on-ramps and off-ramps respectively
    "highway_speed": 10,
    "on_ramps_speed": 10,
    "off_ramps_speed": 10,
    # positions of the on-ramps
    "on_ramps_pos": [],
    # positions of the off-ramps
    "off_ramps_pos": [],
    # probability for a vehicle to exit the highway at the next off-ramp
    "next_off_ramp_proba": 0.2,
    # ramps angles
    "angle_on_ramps": - 3 * pi / 4,
    "angle_off_ramps": - pi / 4
}


[docs]class HighwayRampsNetwork(Network): """Network class for a highway section with on and off ramps. This network consists of a single or multi-lane highway network with a variable number of on-ramps and off-ramps at arbitrary positions, with arbitrary numbers of lanes. It can be used to generate periodic perturbation on a more realistic highway. Parameters in net_params: * **highway_length** : total length of the highway * **on_ramps_length** : length of each on-ramp * **off_ramps_length** : length of each off-ramp * **highway_lanes** : number of lanes on the highway * **on_ramps_lanes** : number of lanes on each on-ramp * **off_ramps_lanes** : number of lanes on each off-ramp * **highway_speed** : speed limit on the highway * **on_ramps_speed** : speed limit on each on-ramp * **off_ramps_speed** : speed limit on each off-ramp * **on_ramps_pos** : positions of the in-ramps on the highway (int list) * **off_ramps_pos** : positions of the off-ramps on the highway (int list) * **next_off_ramp_proba** : probability for a vehicle to exit the highway at the next off-ramp """ def __init__(self, name, vehicles, net_params, initial_config=InitialConfig(), traffic_lights=TrafficLightParams()): """Initialize a highway with on and off ramps network.""" for p in ADDITIONAL_NET_PARAMS.keys(): if p not in net_params.additional_params: raise KeyError('Network parameter "{}" not supplied'.format(p)) # load parameters into class params = net_params.additional_params self.highway_length = params['highway_length'] self.on_ramps_length = params['on_ramps_length'] self.off_ramps_length = params['off_ramps_length'] self.highway_lanes = params['highway_lanes'] self.on_ramps_lanes = params['on_ramps_lanes'] self.off_ramps_lanes = params['off_ramps_lanes'] self.highway_speed = params['highway_speed'] self.on_ramps_speed = params['on_ramps_speed'] self.off_ramps_speed = params['off_ramps_speed'] self.on_ramps_pos = params['on_ramps_pos'] self.off_ramps_pos = params['off_ramps_pos'] self.p = params['next_off_ramp_proba'] self.angle_on_ramps = params['angle_on_ramps'] self.angle_off_ramps = params['angle_off_ramps'] # generate position of all network nodes self.ramps_pos = sorted(self.on_ramps_pos + self.off_ramps_pos) self.nodes_pos = sorted(list(set([0] + self.ramps_pos + [self.highway_length]))) # highway_pos[x] = id of the highway node whose starting position is x self.highway_pos = {x: i for i, x in enumerate(self.nodes_pos)} # ramp_pos[x] = id of the ramp node whose intersection with the highway # is at position x self.ramp_pos = {x: "on_ramp_{}".format(i) for i, x in enumerate(self.on_ramps_pos)} self.ramp_pos.update({x: "off_ramp_{}".format(i) for i, x in enumerate(self.off_ramps_pos)}) # make sure network is constructable if (len(self.ramps_pos) > 0 and (min(self.ramps_pos) <= 0 or max(self.ramps_pos) >= self.highway_length)): raise ValueError('All ramps positions should be positive and less ' 'than highway length. Current ramps positions: {}' '. Current highway length: {}.'.format( self.ramps_pos, self.highway_length)) if len(self.ramps_pos) != len(list(set(self.ramps_pos))): raise ValueError('Two ramps positions cannot be equal.') super().__init__(name, vehicles, net_params, initial_config, traffic_lights)
[docs] def specify_nodes(self, net_params): """See parent class.""" nodes_highway = [{ "id": "highway_{}".format(i), "x": self.nodes_pos[i], "y": 0, "radius": 10 } for i in range(len(self.nodes_pos))] nodes_on_ramps = [{ "id": "on_ramp_{}".format(i), "x": x + self.on_ramps_length * cos(self.angle_on_ramps), "y": self.on_ramps_length * sin(self.angle_on_ramps) } for i, x in enumerate(self.on_ramps_pos)] nodes_off_ramps = [{ "id": "off_ramp_{}".format(i), "x": x + self.off_ramps_length * cos(self.angle_off_ramps), "y": self.off_ramps_length * sin(self.angle_off_ramps) } for i, x in enumerate(self.off_ramps_pos)] return nodes_highway + nodes_on_ramps + nodes_off_ramps
[docs] def specify_edges(self, net_params): """See parent class.""" highway_edges = [{ "id": "highway_{}".format(i), "type": "highway", "from": "highway_{}".format(i), "to": "highway_{}".format(i + 1), "length": self.nodes_pos[i + 1] - self.nodes_pos[i] } for i in range(len(self.nodes_pos) - 1)] on_ramps_edges = [{ "id": "on_ramp_{}".format(i), "type": "on_ramp", "from": "on_ramp_{}".format(i), "to": "highway_{}".format(self.highway_pos[x]), "length": self.on_ramps_length } for i, x in enumerate(self.on_ramps_pos)] off_ramps_edges = [{ "id": "off_ramp_{}".format(i), "type": "off_ramp", "from": "highway_{}".format(self.highway_pos[x]), "to": "off_ramp_{}".format(i), "length": self.off_ramps_length } for i, x in enumerate(self.off_ramps_pos)] return highway_edges + on_ramps_edges + off_ramps_edges
[docs] def specify_routes(self, net_params): """See parent class.""" def get_routes(start_node_pos): """Compute the routes recursively.""" if start_node_pos not in self.nodes_pos: raise ValueError('{} is not a node position.'.format( start_node_pos)) id_highway_node = self.highway_pos[start_node_pos] if id_highway_node + 1 >= len(self.nodes_pos): return [(["highway_{}".format(id_highway_node - 1)], 1)] id_ramp_node = self.ramp_pos[start_node_pos] routes = get_routes(self.nodes_pos[id_highway_node + 1]) if id_ramp_node.startswith("on"): return ([ (["highway_{}".format(id_highway_node - 1)] + route, prob) for route, prob in routes if not route[0].startswith("on") ] + [ ([id_ramp_node] + route, prob) for route, prob in routes if not route[0].startswith("on") ] + [ (route, prob) for route, prob in routes if route[0].startswith("on") ]) else: return ([ (["highway_{}".format(id_highway_node - 1)] + route, (1 - self.p) * prob) for route, prob in routes if not route[0].startswith("on") ] + [ (["highway_{}".format(id_highway_node - 1), id_ramp_node], self.p * prob) for route, prob in routes if not route[0].startswith("on") ] + [ (route, prob) for route, prob in routes if route[0].startswith("on") ]) routes = get_routes(self.nodes_pos[1]) rts = defaultdict(list) for route, prob in routes: rts[route[0]].append((route, prob)) return rts
[docs] def specify_types(self, net_params): """See parent class.""" types = [{ "id": "highway", "numLanes": self.highway_lanes, "speed": self.highway_speed }, { "id": "on_ramp", "numLanes": self.on_ramps_lanes, "speed": self.on_ramps_speed }, { "id": "off_ramp", "numLanes": self.off_ramps_lanes, "speed": self.off_ramps_speed }] return types