Source code for flow.controllers.car_following_models

"""
Contains several custom car-following control models.

These controllers can be used to modify the acceleration behavior of vehicles
in Flow to match various prominent car-following models that can be calibrated.

Each controller includes the function ``get_accel(self, env) -> acc`` which,
using the current state of the world and existing parameters, uses the control
model to return a vehicle acceleration.
"""
import math
import numpy as np

from flow.controllers.base_controller import BaseController


[docs]class CFMController(BaseController): """CFM controller. Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : SumoCarFollowingParams see parent class k_d : float headway gain (default: 1) k_v : float gain on difference between lead velocity and current (default: 1) k_c : float gain on difference from desired velocity to current (default: 1) d_des : float desired headway (default: 1) v_des : float desired velocity (default: 8) time_delay : float, optional time delay (default: 0.0) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params, k_d=1, k_v=1, k_c=1, d_des=1, v_des=8, time_delay=0.0, noise=0, fail_safe=None, display_warnings=True): """Instantiate a CFM controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.veh_id = veh_id self.k_d = k_d self.k_v = k_v self.k_c = k_c self.d_des = d_des self.v_des = v_des
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) if not lead_id: # no car ahead return self.max_accel lead_vel = env.k.vehicle.get_speed(lead_id) this_vel = env.k.vehicle.get_speed(self.veh_id) d_l = env.k.vehicle.get_headway(self.veh_id) return self.k_d*(d_l - self.d_des) + self.k_v*(lead_vel - this_vel) + \ self.k_c*(self.v_des - this_vel)
[docs]class BCMController(BaseController): """Bilateral car-following model controller. This model looks ahead and behind when computing its acceleration. Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.params.SumoCarFollowingParams see parent class k_d : float gain on distances to lead/following cars (default: 1) k_v : float gain on vehicle velocity differences (default: 1) k_c : float gain on difference from desired velocity to current (default: 1) d_des : float desired headway (default: 1) v_des : float desired velocity (default: 8) time_delay : float time delay (default: 0.5) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params, k_d=1, k_v=1, k_c=1, d_des=1, v_des=8, time_delay=0.0, noise=0, fail_safe=None, display_warnings=True): """Instantiate a Bilateral car-following model controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.veh_id = veh_id self.k_d = k_d self.k_v = k_v self.k_c = k_c self.d_des = d_des self.v_des = v_des
[docs] def get_accel(self, env): """See parent class. From the paper: There would also be additional control rules that take into account minimum safe separation, relative speeds, speed limits, weather and lighting conditions, traffic density and traffic advisories """ lead_id = env.k.vehicle.get_leader(self.veh_id) if not lead_id: # no car ahead return self.max_accel lead_vel = env.k.vehicle.get_speed(lead_id) this_vel = env.k.vehicle.get_speed(self.veh_id) trail_id = env.k.vehicle.get_follower(self.veh_id) trail_vel = env.k.vehicle.get_speed(trail_id) headway = env.k.vehicle.get_headway(self.veh_id) footway = env.k.vehicle.get_headway(trail_id) return self.k_d * (headway - footway) + \ self.k_v * ((lead_vel - this_vel) - (this_vel - trail_vel)) + \ self.k_c * (self.v_des - this_vel)
[docs]class LACController(BaseController): """Linear Adaptive Cruise Control. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.params.SumoCarFollowingParams see parent class k_1 : float design parameter (default: 0.8) k_2 : float design parameter (default: 0.9) h : float desired time gap (default: 1.0) tau : float lag time between control input u and real acceleration a (default:0.1) time_delay : float time delay (default: 0.5) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params, k_1=0.3, k_2=0.4, h=1, tau=0.1, a=0, time_delay=0.0, noise=0, fail_safe=None, display_warnings=True): """Instantiate a Linear Adaptive Cruise controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.veh_id = veh_id self.k_1 = k_1 self.k_2 = k_2 self.h = h self.tau = tau self.a = a
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) lead_vel = env.k.vehicle.get_speed(lead_id) this_vel = env.k.vehicle.get_speed(self.veh_id) headway = env.k.vehicle.get_headway(self.veh_id) L = env.k.vehicle.get_length(self.veh_id) ex = headway - L - self.h * this_vel ev = lead_vel - this_vel u = self.k_1*ex + self.k_2*ev a_dot = -(self.a/self.tau) + (u/self.tau) self.a = a_dot*env.sim_step + self.a return self.a
[docs]class OVMController(BaseController): """Optimal Vehicle Model controller. Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.params.SumoCarFollowingParams see parent class alpha : float gain on desired velocity to current velocity difference (default: 0.6) beta : float gain on lead car velocity and self velocity difference (default: 0.9) h_st : float headway for stopping (default: 5) h_go : float headway for full speed (default: 35) v_max : float max velocity (default: 30) time_delay : float time delay (default: 0.5) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params, alpha=1, beta=1, h_st=2, h_go=15, v_max=30, time_delay=0, noise=0, fail_safe=None, display_warnings=True): """Instantiate an Optimal Vehicle Model controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.veh_id = veh_id self.v_max = v_max self.alpha = alpha self.beta = beta self.h_st = h_st self.h_go = h_go
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) if not lead_id: # no car ahead return self.max_accel lead_vel = env.k.vehicle.get_speed(lead_id) this_vel = env.k.vehicle.get_speed(self.veh_id) h = env.k.vehicle.get_headway(self.veh_id) h_dot = lead_vel - this_vel # V function here - input: h, output : Vh if h <= self.h_st: v_h = 0 elif self.h_st < h < self.h_go: v_h = self.v_max / 2 * (1 - math.cos(math.pi * (h - self.h_st) / (self.h_go - self.h_st))) else: v_h = self.v_max return self.alpha * (v_h - this_vel) + self.beta * h_dot
[docs]class LinearOVM(BaseController): """Linear OVM controller. Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.params.SumoCarFollowingParams see parent class v_max : float max velocity (default: 30) adaptation : float adaptation constant (default: 0.65) h_st : float headway for stopping (default: 5) time_delay : float time delay (default: 0.5) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params, v_max=30, adaptation=0.65, h_st=5, time_delay=0.0, noise=0, fail_safe=None, display_warnings=True): """Instantiate a Linear OVM controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.veh_id = veh_id # 4.8*1.85 for case I, 3.8*1.85 for case II, per Nakayama self.v_max = v_max # TAU in Traffic Flow Dynamics textbook self.adaptation = adaptation self.h_st = h_st
[docs] def get_accel(self, env): """See parent class.""" this_vel = env.k.vehicle.get_speed(self.veh_id) h = env.k.vehicle.get_headway(self.veh_id) # V function here - input: h, output : Vh alpha = 1.689 # the average value from Nakayama paper if h < self.h_st: v_h = 0 elif self.h_st <= h <= self.h_st + self.v_max / alpha: v_h = alpha * (h - self.h_st) else: v_h = self.v_max return (v_h - this_vel) / self.adaptation
[docs]class IDMController(BaseController): """Intelligent Driver Model (IDM) controller. For more information on this controller, see: Treiber, Martin, Ansgar Hennecke, and Dirk Helbing. "Congested traffic states in empirical observations and microscopic simulations." Physical review E 62.2 (2000): 1805. Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.param.SumoCarFollowingParams see parent class v0 : float desirable velocity, in m/s (default: 30) T : float safe time headway, in s (default: 1) a : float max acceleration, in m/s2 (default: 1) b : float comfortable deceleration, in m/s2 (default: 1.5) delta : float acceleration exponent (default: 4) s0 : float linear jam distance, in m (default: 2) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, v0=30, T=1, a=1, b=1.5, delta=4, s0=2, time_delay=0.0, noise=0, fail_safe=None, display_warnings=True, car_following_params=None): """Instantiate an IDM controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.v0 = v0 self.T = T self.a = a self.b = b self.delta = delta self.s0 = s0
[docs] def get_accel(self, env): """See parent class.""" v = env.k.vehicle.get_speed(self.veh_id) lead_id = env.k.vehicle.get_leader(self.veh_id) h = env.k.vehicle.get_headway(self.veh_id) # in order to deal with ZeroDivisionError if abs(h) < 1e-3: h = 1e-3 if lead_id is None or lead_id == '': # no car ahead s_star = 0 else: lead_vel = env.k.vehicle.get_speed(lead_id) s_star = self.s0 + max( 0, v * self.T + v * (v - lead_vel) / (2 * np.sqrt(self.a * self.b))) return self.a * (1 - (v / self.v0)**self.delta - (s_star / h)**2)
[docs]class SimCarFollowingController(BaseController): """Controller whose actions are purely defined by the simulator. Note that methods for implementing noise and failsafes through BaseController, are not available here. However, similar methods are available through sumo when initializing the parameters of the vehicle. Usage: See BaseController for usage example. """
[docs] def get_accel(self, env): """See parent class.""" return None
[docs]class GippsController(BaseController): """Gipps' Model controller. For more information on this controller, see: Traffic Flow Dynamics written by M.Treiber and A.Kesting By courtesy of Springer publisher, http://www.springer.com http://www.traffic-flow-dynamics.org/res/SampleChapter11.pdf Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.param.SumoCarFollowingParams see parent class v0 : float desirable velocity, in m/s (default: 30) acc : float max acceleration, in m/s2 (default: 1.5) b : float comfortable deceleration, in m/s2 (default: -1) b_l : float comfortable deceleration for leading vehicle , in m/s2 (default: -1) s0 : float linear jam distance for saftey, in m (default: 2) tau : float reaction time in s (default: 1) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params=None, v0=30, acc=1.5, b=-1, b_l=-1, s0=2, tau=1, delay=0, noise=0, fail_safe=None, display_warnings=True): """Instantiate a Gipps' controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.v_desired = v0 self.acc = acc self.b = b self.b_l = b_l self.s0 = s0 self.tau = tau
[docs] def get_accel(self, env): """See parent class.""" v = env.k.vehicle.get_speed(self.veh_id) h = env.k.vehicle.get_headway(self.veh_id) v_l = env.k.vehicle.get_speed( env.k.vehicle.get_leader(self.veh_id)) # get velocity dynamics v_acc = v + (2.5 * self.acc * self.tau * ( 1 - (v / self.v_desired)) * np.sqrt(0.025 + (v / self.v_desired))) v_safe = (self.tau * self.b) + np.sqrt(((self.tau**2) * (self.b**2)) - ( self.b * ((2 * (h-self.s0)) - (self.tau * v) - ((v_l**2) / self.b_l)))) v_next = min(v_acc, v_safe, self.v_desired) return (v_next-v)/env.sim_step
[docs]class BandoFTLController(BaseController): """Bando follow-the-leader controller. Usage ----- See BaseController for usage example. Attributes ---------- veh_id : str Vehicle ID for SUMO identification car_following_params : flow.core.params.SumoCarFollowingParams see parent class alpha : float gain on desired velocity to current velocity difference (default: 0.6) beta : float gain on lead car velocity and self velocity difference (default: 0.9) h_st : float headway for stopping (default: 5) h_go : float headway for full speed (default: 35) v_max : float max velocity (default: 30) time_delay : float time delay (default: 0.5) noise : float std dev of normal perturbation to the acceleration (default: 0) fail_safe : str type of flow-imposed failsafe the vehicle should posses, defaults to no failsafe (None) """ def __init__(self, veh_id, car_following_params, alpha=.5, beta=20, h_st=2, h_go=10, v_max=32, want_max_accel=False, time_delay=0, noise=0, fail_safe=None, display_warnings=True): """Instantiate an Bando controller.""" BaseController.__init__( self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise, display_warnings=display_warnings, ) self.veh_id = veh_id self.v_max = v_max self.alpha = alpha self.beta = beta self.h_st = h_st self.h_go = h_go self.want_max_accel = want_max_accel
[docs] def get_accel(self, env): """See parent class.""" lead_id = env.k.vehicle.get_leader(self.veh_id) if not lead_id: # no car ahead if self.want_max_accel: return self.max_accel v_l = env.k.vehicle.get_speed(lead_id) v = env.k.vehicle.get_speed(self.veh_id) s = env.k.vehicle.get_headway(self.veh_id) return self.accel_func(v, v_l, s)
[docs] def accel_func(self, v, v_l, s): """Compute the acceleration function.""" v_h = self.v_max * ((np.tanh(s/self.h_st-2)+np.tanh(2))/(1+np.tanh(2))) s_dot = v_l - v u = self.alpha * (v_h - v) + self.beta * s_dot/(s**2) return u