Source code for flow.networks.bottleneck

"""Contains the bottleneck network class."""

from flow.core.params import InitialConfig
from flow.core.params import TrafficLightParams
from flow.networks.base import Network
import numpy as np

ADDITIONAL_NET_PARAMS = {
    # the factor multiplying number of lanes.
    "scaling": 1,
    # edge speed limit
    'speed_limit': 23
}


[docs]class BottleneckNetwork(Network): """Network class for bottleneck simulations. This network acts as a scalable representation of the Bay Bridge. It consists of a two-stage lane-drop bottleneck where 4n lanes reduce to 2n and then to n, where n is the scaling value. The length of the bottleneck is fixed. Requires from net_params: * **scaling** : the factor multiplying number of lanes * **speed_limit** : edge speed limit Usage ----- >>> from flow.core.params import NetParams >>> from flow.core.params import VehicleParams >>> from flow.core.params import InitialConfig >>> from flow.networks import BottleneckNetwork >>> >>> network = BottleneckNetwork( >>> name='bottleneck', >>> vehicles=VehicleParams(), >>> net_params=NetParams( >>> additional_params={ >>> 'scaling': 1, >>> 'speed_limit': 1, >>> }, >>> ) >>> ) """ def __init__(self, name, vehicles, net_params, initial_config=InitialConfig(), traffic_lights=TrafficLightParams()): """Instantiate the network class.""" for p in ADDITIONAL_NET_PARAMS.keys(): if p not in net_params.additional_params: raise KeyError('Network parameter "{}" not supplied'.format(p)) super().__init__(name, vehicles, net_params, initial_config, traffic_lights)
[docs] def specify_nodes(self, net_params): """See parent class.""" nodes = [ { "id": "1", "x": 0, "y": 0 }, # pre-toll { "id": "2", "x": 100, "y": 0 }, # toll { "id": "3", "x": 410, "y": 0 }, # light { "id": "4", "x": 550, "y": 0, "type": "zipper", "radius": 20 }, # merge1 { "id": "5", "x": 830, "y": 0, "type": "zipper", "radius": 20 }, # merge2 { "id": "6", "x": 985, "y": 0 }, # fake nodes used for visualization { "id": "fake1", "x": 0, "y": 1 }, { "id": "fake2", "x": 0, "y": 2 } ] # post-merge2 return nodes
[docs] def specify_edges(self, net_params): """See parent class.""" scaling = net_params.additional_params.get("scaling", 1) speed = net_params.additional_params['speed_limit'] assert (isinstance(scaling, int)), "Scaling must be an int" edges = [ { "id": "1", "from": "1", "to": "2", "length": 100, "spreadType": "center", "numLanes": 4 * scaling, "speed": speed }, { "id": "2", "from": "2", "to": "3", "length": 310, "spreadType": "center", "numLanes": 4 * scaling, "speed": speed }, { "id": "3", "from": "3", "to": "4", "length": 140, "spreadType": "center", "numLanes": 4 * scaling, "speed": speed }, { "id": "4", "from": "4", "to": "5", "length": 280, "spreadType": "center", "numLanes": 2 * scaling, "speed": speed }, { "id": "5", "from": "5", "to": "6", "length": 155, "spreadType": "center", "numLanes": scaling, "speed": speed }, # fake edge used for visualization { "id": "fake_edge", "from": "fake1", "to": "fake2", "length": 1, "spreadType": "center", "numLanes": scaling, "speed": speed } ] return edges
[docs] def specify_connections(self, net_params): """See parent class.""" scaling = net_params.additional_params.get("scaling", 1) conn_dic = {} conn = [] for i in range(4 * scaling): conn += [{ "from": "3", "to": "4", "fromLane": i, "toLane": int(np.floor(i / 2)) }] conn_dic["4"] = conn conn = [] for i in range(2 * scaling): conn += [{ "from": "4", "to": "5", "fromLane": i, "toLane": int(np.floor(i / 2)) }] conn_dic["5"] = conn return conn_dic
[docs] def specify_centroids(self, net_params): """See parent class.""" centroids = [] centroids += [{ "id": "1", "from": None, "to": "1", "x": -30, "y": 0, }] centroids += [{ "id": "1", "from": "5", "to": None, "x": 985 + 30, "y": 0, }] return centroids
[docs] def specify_routes(self, net_params): """See parent class.""" rts = { "1": ["1", "2", "3", "4", "5"], "2": ["2", "3", "4", "5"], "3": ["3", "4", "5"], "4": ["4", "5"], "5": ["5"] } return rts
[docs] def specify_edge_starts(self): """See parent class.""" return [("1", 0), ("2", 100), ("3", 405), ("4", 425), ("5", 580)]
[docs] def get_bottleneck_lanes(self, lane): """Return the reduced number of lanes.""" return [int(lane / 2), int(lane / 4)]