"""Contains a list of custom routing controllers."""
import random
import numpy as np
from flow.controllers.base_routing_controller import BaseRouter
[docs]class ContinuousRouter(BaseRouter):
"""A router used to continuously re-route of the vehicle in a closed ring.
This class is useful if vehicles are expected to continuously follow the
same route, and repeat said route once it reaches its end.
Usage
-----
See base class for usage example.
"""
[docs] def choose_route(self, env):
"""See parent class.
Adopt one of the current edge's routes if about to leave the network.
"""
edge = env.k.vehicle.get_edge(self.veh_id)
current_route = env.k.vehicle.get_route(self.veh_id)
if len(current_route) == 0:
# this occurs to inflowing vehicles, whose information is not added
# to the subscriptions in the first step that they departed
return None
elif edge == current_route[-1]:
# choose one of the available routes based on the fraction of times
# the given route can be chosen
num_routes = len(env.available_routes[edge])
frac = [val[1] for val in env.available_routes[edge]]
route_id = np.random.choice(
[i for i in range(num_routes)], size=1, p=frac)[0]
# pass the chosen route
return env.available_routes[edge][route_id][0]
else:
return None
[docs]class MinicityRouter(BaseRouter):
"""A router used to continuously re-route vehicles in minicity network.
This class allows the vehicle to pick a random route at junctions.
Usage
-----
See base class for usage example.
"""
[docs] def choose_route(self, env):
"""See parent class."""
vehicles = env.k.vehicle
veh_id = self.veh_id
veh_edge = vehicles.get_edge(veh_id)
veh_route = vehicles.get_route(veh_id)
veh_next_edge = env.k.network.next_edge(veh_edge,
vehicles.get_lane(veh_id))
not_an_edge = ":"
no_next = 0
if len(veh_next_edge) == no_next:
next_route = None
elif veh_route[-1] == veh_edge:
random_route = random.randint(0, len(veh_next_edge) - 1)
while veh_next_edge[0][0][0] == not_an_edge:
veh_next_edge = env.k.network.next_edge(
veh_next_edge[random_route][0],
veh_next_edge[random_route][1])
next_route = [veh_edge, veh_next_edge[0][0]]
else:
next_route = None
if veh_edge in ['e_37', 'e_51']:
next_route = [veh_edge, 'e_29_u', 'e_21']
return next_route
[docs]class GridRouter(BaseRouter):
"""A router used to re-route a vehicle in a traffic light grid environment.
Usage
-----
See base class for usage example.
"""
[docs] def choose_route(self, env):
"""See parent class."""
if len(env.k.vehicle.get_route(self.veh_id)) == 0:
# this occurs to inflowing vehicles, whose information is not added
# to the subscriptions in the first step that they departed
return None
elif env.k.vehicle.get_edge(self.veh_id) == \
env.k.vehicle.get_route(self.veh_id)[-1]:
return [env.k.vehicle.get_edge(self.veh_id)]
else:
return None
[docs]class BayBridgeRouter(ContinuousRouter):
"""Assists in choosing routes in select cases for the Bay Bridge network.
Extension to the Continuous Router.
Usage
-----
See base class for usage example.
"""
[docs] def choose_route(self, env):
"""See parent class."""
edge = env.k.vehicle.get_edge(self.veh_id)
lane = env.k.vehicle.get_lane(self.veh_id)
if edge == "183343422" and lane in [2] \
or edge == "124952179" and lane in [1, 2]:
new_route = env.available_routes[edge + "_1"][0][0]
else:
new_route = super().choose_route(env)
return new_route
[docs]class I210Router(ContinuousRouter):
"""Assists in choosing routes in select cases for the I-210 sub-network.
Extension to the Continuous Router.
Usage
-----
See base class for usage example.
"""
[docs] def choose_route(self, env):
"""See parent class."""
edge = env.k.vehicle.get_edge(self.veh_id)
lane = env.k.vehicle.get_lane(self.veh_id)
# vehicles on these edges in lanes 4 and 5 are not going to be able to
# make it out in time
if edge == "119257908#1-AddedOffRampEdge" and lane in [5, 4, 3]:
new_route = env.available_routes[
"119257908#1-AddedOffRampEdge"][0][0]
else:
new_route = super().choose_route(env)
return new_route