Source code for flow.controllers.velocity_controllers

"""Contains a list of custom velocity controllers."""

from flow.controllers.base_controller import BaseController
import numpy as np


[docs]class FollowerStopper(BaseController): """Inspired by Dan Work's... work. Dissipation of stop-and-go waves via control of autonomous vehicles: Field experiments https://arxiv.org/abs/1705.01693 Usage ----- See base class for example. Parameters ---------- veh_id : str unique vehicle identifier v_des : float, optional desired speed of the vehicles (m/s) """ def __init__(self, veh_id, car_following_params, v_des=15, danger_edges=None): """Instantiate FollowerStopper.""" BaseController.__init__( self, veh_id, car_following_params, delay=1.0, fail_safe='safe_velocity') # desired speed of the vehicle self.v_des = v_des # maximum achievable acceleration by the vehicle self.max_accel = car_following_params.controller_params['accel'] # other parameters self.dx_1_0 = 4.5 self.dx_2_0 = 5.25 self.dx_3_0 = 6.0 self.d_1 = 1.5 self.d_2 = 1.0 self.d_3 = 0.5 self.danger_edges = danger_edges if danger_edges else {}
[docs] def find_intersection_dist(self, env): """Find distance to intersection. Parameters ---------- env : flow.envs.Env see flow/envs/base.py Returns ------- float distance from the vehicle's current position to the position of the node it is heading toward. """ edge_id = env.k.vehicle.get_edge(self.veh_id) # FIXME this might not be the best way of handling this if edge_id == "": return -10 if 'center' in edge_id: return 0 edge_len = env.k.network.edge_length(edge_id) relative_pos = env.k.vehicle.get_position(self.veh_id) dist = edge_len - relative_pos return dist
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) this_vel = env.k.vehicle.get_speed(self.veh_id) lead_vel = env.k.vehicle.get_speed(lead_id) if self.v_des is None: return None if lead_id is None: v_cmd = self.v_des else: dx = env.k.vehicle.get_headway(self.veh_id) dv_minus = min(lead_vel - this_vel, 0) dx_1 = self.dx_1_0 + 1 / (2 * self.d_1) * dv_minus**2 dx_2 = self.dx_2_0 + 1 / (2 * self.d_2) * dv_minus**2 dx_3 = self.dx_3_0 + 1 / (2 * self.d_3) * dv_minus**2 v = min(max(lead_vel, 0), self.v_des) # compute the desired velocity if dx <= dx_1: v_cmd = 0 elif dx <= dx_2: v_cmd = v * (dx - dx_1) / (dx_2 - dx_1) elif dx <= dx_3: v_cmd = v + (self.v_des - this_vel) * (dx - dx_2) \ / (dx_3 - dx_2) else: v_cmd = self.v_des edge = env.k.vehicle.get_edge(self.veh_id) if edge == "": return None if self.find_intersection_dist(env) <= 10 and \ env.k.vehicle.get_edge(self.veh_id) in self.danger_edges or \ env.k.vehicle.get_edge(self.veh_id)[0] == ":": return None else: # compute the acceleration from the desired velocity return (v_cmd - this_vel) / env.sim_step
[docs]class NonLocalFollowerStopper(FollowerStopper): """Follower stopper that uses the average system speed to compute its acceleration."""
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) this_vel = env.k.vehicle.get_speed(self.veh_id) lead_vel = env.k.vehicle.get_speed(lead_id) self.v_des = np.mean(env.k.vehicle.get_speed(env.k.vehicle.get_ids())) if self.v_des is None: return None if lead_id is None: v_cmd = self.v_des else: dx = env.k.vehicle.get_headway(self.veh_id) dv_minus = min(lead_vel - this_vel, 0) dx_1 = self.dx_1_0 + 1 / (2 * self.d_1) * dv_minus ** 2 dx_2 = self.dx_2_0 + 1 / (2 * self.d_2) * dv_minus ** 2 dx_3 = self.dx_3_0 + 1 / (2 * self.d_3) * dv_minus ** 2 v = min(max(lead_vel, 0), self.v_des) # compute the desired velocity if dx <= dx_1: v_cmd = 0 elif dx <= dx_2: v_cmd = v * (dx - dx_1) / (dx_2 - dx_1) elif dx <= dx_3: v_cmd = v + (self.v_des - this_vel) * (dx - dx_2) \ / (dx_3 - dx_2) else: v_cmd = self.v_des edge = env.k.vehicle.get_edge(self.veh_id) if edge == "": return None if self.find_intersection_dist(env) <= 10 and \ env.k.vehicle.get_edge(self.veh_id) in self.danger_edges or \ env.k.vehicle.get_edge(self.veh_id)[0] == ":": return None else: # compute the acceleration from the desired velocity return (v_cmd - this_vel) / env.sim_step
[docs]class PISaturation(BaseController): """Inspired by Dan Work's... work. Dissipation of stop-and-go waves via control of autonomous vehicles: Field experiments https://arxiv.org/abs/1705.01693 Usage ----- See base class for example. Parameters ---------- veh_id : str unique vehicle identifier car_following_params : flow.core.params.SumoCarFollowingParams object defining sumo-specific car-following parameters """ def __init__(self, veh_id, car_following_params): """Instantiate PISaturation.""" BaseController.__init__(self, veh_id, car_following_params, delay=1.0) # maximum achievable acceleration by the vehicle self.max_accel = car_following_params.controller_params['accel'] # history used to determine AV desired velocity self.v_history = [] # other parameters self.gamma = 2 self.g_l = 7 self.g_u = 30 self.v_catch = 1 # values that are updated by using their old information self.alpha = 0 self.beta = 1 - 0.5 * self.alpha self.U = 0 self.v_target = 0 self.v_cmd = 0
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) lead_vel = env.k.vehicle.get_speed(lead_id) this_vel = env.k.vehicle.get_speed(self.veh_id) dx = env.k.vehicle.get_headway(self.veh_id) dv = lead_vel - this_vel dx_s = max(2 * dv, 4) # update the AV's velocity history self.v_history.append(this_vel) if len(self.v_history) == int(38 / env.sim_step): del self.v_history[0] # update desired velocity values v_des = np.mean(self.v_history) v_target = v_des + self.v_catch \ * min(max((dx - self.g_l) / (self.g_u - self.g_l), 0), 1) # update the alpha and beta values alpha = min(max((dx - dx_s) / self.gamma, 0), 1) beta = 1 - 0.5 * alpha # compute desired velocity self.v_cmd = beta * (alpha * v_target + (1 - alpha) * lead_vel) \ + (1 - beta) * self.v_cmd # compute the acceleration accel = (self.v_cmd - this_vel) / env.sim_step return min(accel, self.max_accel)