"""Contains the traffic light grid scenario class."""
from flow.networks.base import Network
from flow.core.params import InitialConfig
from flow.core.params import TrafficLightParams
from collections import defaultdict
import numpy as np
ADDITIONAL_NET_PARAMS = {
# dictionary of traffic light grid array data
"grid_array": {
# number of horizontal rows of edges
"row_num": 3,
# number of vertical columns of edges
"col_num": 2,
# length of inner edges in the traffic light grid network
"inner_length": None,
# length of edges where vehicles enter the network
"short_length": None,
# length of edges where vehicles exit the network
"long_length": None,
# number of cars starting at the edges heading to the top
"cars_top": 20,
# number of cars starting at the edges heading to the bottom
"cars_bot": 20,
# number of cars starting at the edges heading to the left
"cars_left": 20,
# number of cars starting at the edges heading to the right
"cars_right": 20,
},
# number of lanes in the horizontal edges
"horizontal_lanes": 1,
# number of lanes in the vertical edges
"vertical_lanes": 1,
# speed limit for all edges, may be represented as a float value, or a
# dictionary with separate values for vertical and horizontal lanes
"speed_limit": {
"horizontal": 35,
"vertical": 35
}
}
[docs]class TrafficLightGridNetwork(Network):
"""Traffic Light Grid network class.
The traffic light grid network consists of m vertical lanes and n
horizontal lanes, with a total of nxm intersections where the vertical
and horizontal edges meet.
Requires from net_params:
* **grid_array** : dictionary of grid array data, with the following keys
* **row_num** : number of horizontal rows of edges
* **col_num** : number of vertical columns of edges
* **inner_length** : length of inner edges in traffic light grid network
* **short_length** : length of edges that vehicles start on
* **long_length** : length of final edge in route
* **cars_top** : number of cars starting at the edges heading to the top
* **cars_bot** : number of cars starting at the edges heading to the
bottom
* **cars_left** : number of cars starting at the edges heading to the
left
* **cars_right** : number of cars starting at the edges heading to the
right
* **horizontal_lanes** : number of lanes in the horizontal edges
* **vertical_lanes** : number of lanes in the vertical edges
* **speed_limit** : speed limit for all edges. This may be represented as a
float value, or a dictionary with separate values for vertical and
horizontal lanes.
Usage
-----
>>> from flow.core.params import NetParams
>>> from flow.core.params import VehicleParams
>>> from flow.core.params import InitialConfig
>>> from flow.networks import TrafficLightGridNetwork
>>>
>>> network = TrafficLightGridNetwork(
>>> name='grid',
>>> vehicles=VehicleParams(),
>>> net_params=NetParams(
>>> additional_params={
>>> 'grid_array': {
>>> 'row_num': 3,
>>> 'col_num': 2,
>>> 'inner_length': 500,
>>> 'short_length': 500,
>>> 'long_length': 500,
>>> 'cars_top': 20,
>>> 'cars_bot': 20,
>>> 'cars_left': 20,
>>> 'cars_right': 20,
>>> },
>>> 'horizontal_lanes': 1,
>>> 'vertical_lanes': 1,
>>> 'speed_limit': {
>>> 'vertical': 35,
>>> 'horizontal': 35
>>> }
>>> },
>>> )
>>> )
"""
def __init__(self,
name,
vehicles,
net_params,
initial_config=InitialConfig(),
traffic_lights=TrafficLightParams()):
"""Initialize an n*m traffic light grid network."""
optional = ["tl_logic"]
for p in ADDITIONAL_NET_PARAMS.keys():
if p not in net_params.additional_params and p not in optional:
raise KeyError('Network parameter "{}" not supplied'.format(p))
for p in ADDITIONAL_NET_PARAMS["grid_array"].keys():
if p not in net_params.additional_params["grid_array"]:
raise KeyError(
'Grid array parameter "{}" not supplied'.format(p))
# retrieve all additional parameters
# refer to the ADDITIONAL_NET_PARAMS dict for more documentation
self.vertical_lanes = net_params.additional_params["vertical_lanes"]
self.horizontal_lanes = net_params.additional_params[
"horizontal_lanes"]
self.speed_limit = net_params.additional_params["speed_limit"]
if not isinstance(self.speed_limit, dict):
self.speed_limit = {
"horizontal": self.speed_limit,
"vertical": self.speed_limit
}
self.grid_array = net_params.additional_params["grid_array"]
self.row_num = self.grid_array["row_num"]
self.col_num = self.grid_array["col_num"]
self.inner_length = self.grid_array["inner_length"]
self.short_length = self.grid_array["short_length"]
self.long_length = self.grid_array["long_length"]
self.cars_heading_top = self.grid_array["cars_top"]
self.cars_heading_bot = self.grid_array["cars_bot"]
self.cars_heading_left = self.grid_array["cars_left"]
self.cars_heading_right = self.grid_array["cars_right"]
# specifies whether or not there will be traffic lights at the
# intersections (True by default)
self.use_traffic_lights = net_params.additional_params.get(
"traffic_lights", True)
# radius of the inner nodes (ie of the intersections)
self.inner_nodes_radius = 2.9 + 3.3 * max(self.vertical_lanes,
self.horizontal_lanes)
# total number of edges in the network
self.num_edges = 4 * ((self.col_num + 1) * self.row_num + self.col_num)
# name of the network (DO NOT CHANGE)
self.name = "BobLoblawsLawBlog"
super().__init__(name, vehicles, net_params, initial_config,
traffic_lights)
[docs] def specify_nodes(self, net_params):
"""See parent class."""
return self._inner_nodes + self._outer_nodes
[docs] def specify_edges(self, net_params):
"""See parent class."""
return self._inner_edges + self._outer_edges
[docs] def specify_routes(self, net_params):
"""See parent class."""
routes = defaultdict(list)
# build row routes (vehicles go from left to right and vice versa)
for i in range(self.row_num):
bot_id = "bot{}_0".format(i)
top_id = "top{}_{}".format(i, self.col_num)
for j in range(self.col_num + 1):
routes[bot_id] += ["bot{}_{}".format(i, j)]
routes[top_id] += ["top{}_{}".format(i, self.col_num - j)]
# build column routes (vehicles go from top to bottom and vice versa)
for j in range(self.col_num):
left_id = "left{}_{}".format(self.row_num, j)
right_id = "right0_{}".format(j)
for i in range(self.row_num + 1):
routes[left_id] += ["left{}_{}".format(self.row_num - i, j)]
routes[right_id] += ["right{}_{}".format(i, j)]
return routes
[docs] def specify_types(self, net_params):
"""See parent class."""
types = [{
"id": "horizontal",
"numLanes": self.horizontal_lanes,
"speed": self.speed_limit["horizontal"]
}, {
"id": "vertical",
"numLanes": self.vertical_lanes,
"speed": self.speed_limit["vertical"]
}]
return types
# ===============================
# ============ UTILS ============
# ===============================
@property
def _inner_nodes(self):
"""Build out the inner nodes of the network.
The inner nodes correspond to the intersections between the roads. They
are numbered from bottom left, increasing first across the columns and
then across the rows.
For example, the nodes in a traffic light grid with 2 rows and 3 columns
would be indexed as follows:
| | |
--- 3 --- 4 --- 5 ---
| | |
--- 0 --- 1 --- 2 ---
| | |
The id of a node is then "center{index}", for instance "center0" for
node 0, "center1" for node 1 etc.
Returns
-------
list <dict>
List of inner nodes
"""
node_type = "traffic_light" if self.use_traffic_lights else "priority"
nodes = []
for row in range(self.row_num):
for col in range(self.col_num):
nodes.append({
"id": "center{}".format(row * self.col_num + col),
"x": col * self.inner_length,
"y": row * self.inner_length,
"type": node_type,
"radius": self.inner_nodes_radius
})
return nodes
@property
def _outer_nodes(self):
"""Build out the outer nodes of the network.
The outer nodes correspond to the extremities of the roads. There are
two at each extremity, one where the vehicles enter the network
(inflow) and one where the vehicles exit the network (outflow).
Consider the following network with 2 rows and 3 columns, where the
extremities are marked by 'x', the rows are labeled from 0 to 1 and the
columns are labeled from 0 to 2:
x x x
| | |
(1) x----|-----|-----|----x (*)
| | |
(0) x----|-----|-----|----x
| | |
x x x
(0) (1) (2)
On row i, there are two nodes at the left extremity of the row, labeled
"left_row_short{i}" and "left_row_long{i}", as well as two nodes at the
right extremity labeled "right_row_short{i}" and "right_row_long{i}".
On column j, there are two nodes at the bottom extremity of the column,
labeled "bot_col_short{j}" and "bot_col_long{j}", as well as two nodes
at the top extremity labeled "top_col_short{j}" and "top_col_long{j}".
The "short" nodes correspond to where vehicles enter the network while
the "long" nodes correspond to where vehicles exit the network.
For example, at extremity (*) on row (1):
- the id of the input node is "right_row_short1"
- the id of the output node is "right_row_long1"
Returns
-------
list <dict>
List of outer nodes
"""
nodes = []
def new_node(x, y, name, i):
return [{"id": name + str(i), "x": x, "y": y, "type": "priority"}]
# build nodes at the extremities of columns
for col in range(self.col_num):
x = col * self.inner_length
y = (self.row_num - 1) * self.inner_length
nodes += new_node(x, - self.short_length, "bot_col_short", col)
nodes += new_node(x, - self.long_length, "bot_col_long", col)
nodes += new_node(x, y + self.short_length, "top_col_short", col)
nodes += new_node(x, y + self.long_length, "top_col_long", col)
# build nodes at the extremities of rows
for row in range(self.row_num):
x = (self.col_num - 1) * self.inner_length
y = row * self.inner_length
nodes += new_node(- self.short_length, y, "left_row_short", row)
nodes += new_node(- self.long_length, y, "left_row_long", row)
nodes += new_node(x + self.short_length, y, "right_row_short", row)
nodes += new_node(x + self.long_length, y, "right_row_long", row)
return nodes
@property
def _inner_edges(self):
"""Build out the inner edges of the network.
The inner edges are the edges joining the inner nodes to each other.
Consider the following network with n = 2 rows and m = 3 columns,
where the rows are indexed from 0 to 1 and the columns from 0 to 2, and
the inner nodes are marked by 'x':
| | |
(1) ----x-----x-----x----
| | |
(0) ----x-----x-(*)-x----
| | |
(0) (1) (2)
There are n * (m - 1) = 4 horizontal inner edges and (n - 1) * m = 3
vertical inner edges, all that multiplied by two because each edge
consists of two roads going in opposite directions traffic-wise.
On an horizontal edge, the id of the top road is "top{i}_{j}" and the
id of the bottom road is "bot{i}_{j}", where i is the index of the row
where the edge is and j is the index of the column to the right of it.
On a vertical edge, the id of the right road is "right{i}_{j}" and the
id of the left road is "left{i}_{j}", where i is the index of the row
above the edge and j is the index of the column where the edge is.
For example, on edge (*) on row (0): the id of the bottom road (traffic
going from left to right) is "bot0_2" and the id of the top road
(traffic going from right to left) is "top0_2".
Returns
-------
list <dict>
List of inner edges
"""
edges = []
def new_edge(index, from_node, to_node, orientation, lane):
return [{
"id": lane + index,
"type": orientation,
"priority": 78,
"from": "center" + str(from_node),
"to": "center" + str(to_node),
"length": self.inner_length
}]
# Build the horizontal inner edges
for i in range(self.row_num):
for j in range(self.col_num - 1):
node_index = i * self.col_num + j
index = "{}_{}".format(i, j + 1)
edges += new_edge(index, node_index + 1, node_index,
"horizontal", "top")
edges += new_edge(index, node_index, node_index + 1,
"horizontal", "bot")
# Build the vertical inner edges
for i in range(self.row_num - 1):
for j in range(self.col_num):
node_index = i * self.col_num + j
index = "{}_{}".format(i + 1, j)
edges += new_edge(index, node_index, node_index + self.col_num,
"vertical", "right")
edges += new_edge(index, node_index + self.col_num, node_index,
"vertical", "left")
return edges
@property
def _outer_edges(self):
"""Build out the outer edges of the network.
The outer edges are the edges joining the inner nodes to the outer
nodes.
Consider the following network with n = 2 rows and m = 3 columns,
where the rows are indexed from 0 to 1 and the columns from 0 to 2, the
inner nodes are marked by 'x' and the outer nodes by 'o':
o o o
| | |
(1) o---x----x----x-(*)-o
| | |
(0) o---x----x----x-----o
| | |
o o o
(0) (1) (2)
There are n * 2 = 4 horizontal outer edges and m * 2 = 6 vertical outer
edges, all that multiplied by two because each edge consists of two
roads going in opposite directions traffic-wise.
On row i, there are four horizontal edges: the left ones labeled
"bot{i}_0" (in) and "top{i}_0" (out) and the right ones labeled
"bot{i}_{m}" (out) and "top{i}_{m}" (in).
On column j, there are four vertical edges: the bottom ones labeled
"left0_{j}" (out) and "right0_{j}" (in) and the top ones labeled
"left{n}_{j}" (in) and "right{n}_{j}" (out).
For example, on edge (*) on row (1): the id of the bottom road (out)
is "bot1_3" and the id of the top road is "top1_3".
Edges labeled by "in" are edges where vehicles enter the network while
edges labeled by "out" are edges where vehicles exit the network.
Returns
-------
list <dict>
List of outer edges
"""
edges = []
def new_edge(index, from_node, to_node, orientation, length):
return [{
"id": index,
"type": {"v": "vertical", "h": "horizontal"}[orientation],
"priority": 78,
"from": from_node,
"to": to_node,
"length": length
}]
for i in range(self.col_num):
# bottom edges
id1 = "right0_{}".format(i)
id2 = "left0_{}".format(i)
node1 = "bot_col_short{}".format(i)
node2 = "center{}".format(i)
node3 = "bot_col_long{}".format(i)
edges += new_edge(id1, node1, node2, "v", self.short_length)
edges += new_edge(id2, node2, node3, "v", self.long_length)
# top edges
id1 = "left{}_{}".format(self.row_num, i)
id2 = "right{}_{}".format(self.row_num, i)
node1 = "top_col_short{}".format(i)
node2 = "center{}".format((self.row_num - 1) * self.col_num + i)
node3 = "top_col_long{}".format(i)
edges += new_edge(id1, node1, node2, "v", self.short_length)
edges += new_edge(id2, node2, node3, "v", self.long_length)
for j in range(self.row_num):
# left edges
id1 = "bot{}_0".format(j)
id2 = "top{}_0".format(j)
node1 = "left_row_short{}".format(j)
node2 = "center{}".format(j * self.col_num)
node3 = "left_row_long{}".format(j)
edges += new_edge(id1, node1, node2, "h", self.short_length)
edges += new_edge(id2, node2, node3, "h", self.long_length)
# right edges
id1 = "top{}_{}".format(j, self.col_num)
id2 = "bot{}_{}".format(j, self.col_num)
node1 = "right_row_short{}".format(j)
node2 = "center{}".format((j + 1) * self.col_num - 1)
node3 = "right_row_long{}".format(j)
edges += new_edge(id1, node1, node2, "h", self.short_length)
edges += new_edge(id2, node2, node3, "h", self.long_length)
return edges
[docs] def specify_connections(self, net_params):
"""Build out connections at each inner node.
Connections describe what happens at the intersections. Here we link
lanes in straight lines, which means vehicles cannot turn at
intersections, they can only continue in a straight line.
"""
con_dict = {}
def new_con(side, from_id, to_id, lane, signal_group):
return [{
"from": side + from_id,
"to": side + to_id,
"fromLane": str(lane),
"toLane": str(lane),
"signal_group": signal_group
}]
# build connections at each inner node
for i in range(self.row_num):
for j in range(self.col_num):
node_id = "{}_{}".format(i, j)
right_node_id = "{}_{}".format(i, j + 1)
top_node_id = "{}_{}".format(i + 1, j)
conn = []
for lane in range(self.vertical_lanes):
conn += new_con("bot", node_id, right_node_id, lane, 1)
conn += new_con("top", right_node_id, node_id, lane, 1)
for lane in range(self.horizontal_lanes):
conn += new_con("right", node_id, top_node_id, lane, 2)
conn += new_con("left", top_node_id, node_id, lane, 2)
node_id = "center{}".format(i * self.col_num + j)
con_dict[node_id] = conn
return con_dict
# TODO necessary?
[docs] def specify_edge_starts(self):
"""See parent class."""
edgestarts = []
for i in range(self.col_num + 1):
for j in range(self.row_num + 1):
index = "{}_{}".format(j, i)
if i != self.col_num:
edgestarts += [("left" + index, 0 + i * 50 + j * 5000),
("right" + index, 10 + i * 50 + j * 5000)]
if j != self.row_num:
edgestarts += [("top" + index, 15 + i * 50 + j * 5000),
("bot" + index, 20 + i * 50 + j * 5000)]
return edgestarts
# TODO necessary?
[docs] @staticmethod
def gen_custom_start_pos(cls, net_params, initial_config, num_vehicles):
"""See parent class."""
grid_array = net_params.additional_params["grid_array"]
row_num = grid_array["row_num"]
col_num = grid_array["col_num"]
cars_heading_left = grid_array["cars_left"]
cars_heading_right = grid_array["cars_right"]
cars_heading_top = grid_array["cars_top"]
cars_heading_bot = grid_array["cars_bot"]
start_pos = []
x0 = 6 # position of the first car
dx = 10 # distance between each car
start_lanes = []
for i in range(col_num):
start_pos += [("right0_{}".format(i), x0 + k * dx)
for k in range(cars_heading_right)]
start_pos += [("left{}_{}".format(row_num, i), x0 + k * dx)
for k in range(cars_heading_left)]
horz_lanes = np.random.randint(low=0, high=net_params.additional_params["horizontal_lanes"],
size=cars_heading_left + cars_heading_right).tolist()
start_lanes += horz_lanes
for i in range(row_num):
start_pos += [("top{}_{}".format(i, col_num), x0 + k * dx)
for k in range(cars_heading_top)]
start_pos += [("bot{}_0".format(i), x0 + k * dx)
for k in range(cars_heading_bot)]
vert_lanes = np.random.randint(low=0, high=net_params.additional_params["vertical_lanes"],
size=cars_heading_left + cars_heading_right).tolist()
start_lanes += vert_lanes
return start_pos, start_lanes
@property
def node_mapping(self):
"""Map nodes to edges.
Returns a list of pairs (node, connected edges) of all inner nodes
and for each of them, the 4 edges that leave this node.
The nodes are listed in alphabetical order, and within that, edges are
listed in order: [bot, right, top, left].
"""
mapping = {}
for row in range(self.row_num):
for col in range(self.col_num):
node_id = "center{}".format(row * self.col_num + col)
top_edge_id = "left{}_{}".format(row + 1, col)
bot_edge_id = "right{}_{}".format(row, col)
right_edge_id = "top{}_{}".format(row, col + 1)
left_edge_id = "bot{}_{}".format(row, col)
mapping[node_id] = [left_edge_id, bot_edge_id,
right_edge_id, top_edge_id]
return sorted(mapping.items(), key=lambda x: x[0])