Source code for flow.core.rewards

"""A series of reward functions."""

import numpy as np


[docs]def desired_velocity(env, fail=False, edge_list=None):
    r"""Encourage proximity to a desired velocity.

    This function measures the deviation of a system of vehicles from a
    user-specified desired velocity, peaking when all vehicles in the ring
    travel at this desired velocity. Moreover, in order to ensure that the
    reward function naturally punishes the early termination of rollouts due
    to collisions or other failures, the function is formulated as a mapping
    :math:`r: \mathcal{S} \times \mathcal{A} \rightarrow \mathbb{R}_{\geq 0}`.
    This is done by subtracting the deviation of the system from the desired
    velocity from the peak allowable deviation from the desired velocity.
    Additionally, since vehicle velocities are unbounded above, the reward is
    bounded below by zero to ensure nonnegativity.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    fail : bool, optional
        specifies if any crash or other failure occurred in the system
    edge_list : list of str, optional
        list of edges the reward is computed over. If no edge_list is
        defined, the reward is computed over all edges

    Returns
    -------
    float
        reward value
    """
    if edge_list is None:
        veh_ids = env.k.vehicle.get_ids()
    else:
        veh_ids = env.k.vehicle.get_ids_by_edge(edge_list)

    vel = np.array(env.k.vehicle.get_speed(veh_ids))
    num_vehicles = len(veh_ids)

    if any(vel < -100) or fail or num_vehicles == 0:
        return 0.

    target_vel = env.env_params.additional_params['target_velocity']
    max_cost = np.array([target_vel] * num_vehicles)
    max_cost = np.linalg.norm(max_cost)

    cost = vel - target_vel
    cost = np.linalg.norm(cost)

    # epsilon term (to deal with ZeroDivisionError exceptions)
    eps = np.finfo(np.float32).eps

    return max(max_cost - cost, 0) / (max_cost + eps)
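A minimal standalone sketch of the normalization above, using made-up speeds
and a hypothetical target velocity of 10 m/s (none of this is part of the
original module):

    import numpy as np

    vel = np.array([8.0, 9.5, 10.0])  # current speeds (m/s)
    target_vel = 10.0

    max_cost = np.linalg.norm([target_vel] * len(vel))  # peak deviation
    cost = np.linalg.norm(vel - target_vel)             # actual deviation
    eps = np.finfo(np.float32).eps

    reward = max(max_cost - cost, 0) / (max_cost + eps)  # ~0.88, in [0, 1]

The reward reaches 1 only when every vehicle travels at exactly the target
velocity, and decays toward 0 as the speed profile deviates from it.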
[docs]def average_velocity(env, fail=False):
    """Encourage a high average velocity.

    This reward function returns the average velocity of all vehicles in the
    system.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    fail : bool, optional
        specifies if any crash or other failure occurred in the system

    Returns
    -------
    float
        reward value
    """
    vel = np.array(env.k.vehicle.get_speed(env.k.vehicle.get_ids()))

    if any(vel < -100) or fail:
        return 0.
    if len(vel) == 0:
        return 0.

    return np.mean(vel)
[docs]def rl_forward_progress(env, gain=0.1):
    """Reward function used to reward the RL vehicles for travelling forward.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    gain : float
        specifies how much to reward the RL vehicles

    Returns
    -------
    float
        reward value
    """
    rl_velocity = env.k.vehicle.get_speed(env.k.vehicle.get_rl_ids())
    rl_norm_vel = np.linalg.norm(rl_velocity, 1)
    return rl_norm_vel * gain
[docs]def boolean_action_penalty(discrete_actions, gain=1.0):
    """Penalize boolean actions that indicate a switch."""
    return gain * np.sum(discrete_actions)
[docs]def min_delay(env):
    """Reward function used to encourage minimization of total delay.

    This function measures the deviation of a system of vehicles from all the
    vehicles smoothly travelling at a fixed speed to their destinations.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.

    Returns
    -------
    float
        reward value
    """
    vel = np.array(env.k.vehicle.get_speed(env.k.vehicle.get_ids()))

    vel = vel[vel >= -1e-6]
    v_top = max(
        env.k.network.speed_limit(edge)
        for edge in env.k.network.get_edge_list())
    time_step = env.sim_step

    max_cost = time_step * sum(vel.shape)

    # epsilon term (to deal with ZeroDivisionError exceptions)
    eps = np.finfo(np.float32).eps

    cost = time_step * sum((v_top - vel) / v_top)
    return max((max_cost - cost) / (max_cost + eps), 0)
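A worked example of the cost terms above, with two vehicles, a hypothetical
speed limit of 10 m/s and a 0.1 s step (illustrative values only):

    import numpy as np

    vel = np.array([5.0, 10.0])  # current speeds (m/s)
    v_top = 10.0                 # fastest speed limit in the network
    time_step = 0.1              # simulation step (s)

    max_cost = time_step * sum(vel.shape)          # 0.2: every vehicle stopped
    cost = time_step * sum((v_top - vel) / v_top)  # 0.05: delay this step
    eps = np.finfo(np.float32).eps

    reward = max((max_cost - cost) / (max_cost + eps), 0)  # 0.75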
[docs]def avg_delay_specified_vehicles(env, veh_ids):
    """Calculate the average delay for a set of vehicles in the system.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    veh_ids : list of str
        list of the ids of the vehicles for which we are calculating the
        average delay

    Returns
    -------
    float
        average delay
    """
    total = 0
    for edge in env.k.network.get_edge_list():
        for veh_id in env.k.vehicle.get_ids_by_edge(edge):
            v_top = env.k.network.speed_limit(edge)
            total += (v_top - env.k.vehicle.get_speed(veh_id)) / v_top
    time_step = env.sim_step
    try:
        cost = time_step * total
        return cost / len(veh_ids)
    except ZeroDivisionError:
        return 0
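The per-vehicle delay term is (v_top - v) / v_top, i.e. the fraction of the
edge speed limit a vehicle falls short by. A standalone sketch with made-up
speeds on a single edge (hypothetical values, not part of the module):

    speeds = {'veh_0': 4.0, 'veh_1': 8.0}  # m/s
    v_top = 10.0                           # edge speed limit (m/s)
    time_step = 0.1                        # s

    total = sum((v_top - v) / v_top for v in speeds.values())  # 0.6 + 0.2
    avg_delay = time_step * total / len(speeds)                # 0.04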
[docs]def min_delay_unscaled(env):
    """Return the average delay for all vehicles in the system.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.

    Returns
    -------
    float
        reward value
    """
    vel = np.array(env.k.vehicle.get_speed(env.k.vehicle.get_ids()))

    vel = vel[vel >= -1e-6]
    v_top = max(
        env.k.network.speed_limit(edge)
        for edge in env.k.network.get_edge_list())
    time_step = env.sim_step

    # epsilon term (to deal with ZeroDivisionError exceptions)
    eps = np.finfo(np.float32).eps

    cost = time_step * sum((v_top - vel) / v_top)
    return cost / (env.k.vehicle.num_vehicles + eps)
[docs]def penalize_standstill(env, gain=1):
    """Reward function that penalizes vehicle standstill.

    Note: it is an open design question whether it is better to (a) penalize
    standstill in general, or (b) apply a multiplicative penalty based on the
    time for which vel = 0.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    gain : float
        multiplicative factor on the action penalty

    Returns
    -------
    float
        reward value
    """
    veh_ids = env.k.vehicle.get_ids()
    vel = np.array(env.k.vehicle.get_speed(veh_ids))
    num_standstill = len(vel[vel == 0])
    penalty = gain * num_standstill
    return -penalty
[docs]def penalize_near_standstill(env, thresh=0.3, gain=1):
    """Reward function which penalizes vehicles at a low velocity.

    This reward function is used to penalize vehicles below a specified
    velocity threshold. This assists with discouraging RL from gaming the
    network, which can result in standstill behavior or similarly bad,
    near-zero velocities.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    thresh : float
        the velocity threshold below which penalties are applied
    gain : float
        multiplicative factor on the action penalty
    """
    veh_ids = env.k.vehicle.get_ids()
    vel = np.array(env.k.vehicle.get_speed(veh_ids))
    penalize = len(vel[vel < thresh])
    penalty = gain * penalize
    return -penalty
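The penalty simply counts vehicles below the threshold. A quick standalone
check with made-up speeds (illustrative only):

    import numpy as np

    vel = np.array([0.0, 0.1, 0.5, 3.0])  # m/s
    thresh, gain = 0.3, 1

    penalty = gain * len(vel[vel < thresh])  # 2 vehicles below 0.3 m/s
    reward = -penalty                        # -2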
[docs]def penalize_headway_variance(vehicles, vids, normalization=1,
                                   penalty_gain=1, penalty_exponent=1):
    """Reward function used to train RL vehicles to encourage large headways.

    Parameters
    ----------
    vehicles : flow.core.kernel.vehicle.KernelVehicle
        contains the state of all vehicles in the network (generally
        self.vehicles)
    vids : list of str
        list of ids for vehicles
    normalization : float, optional
        constant for scaling (down) the headways
    penalty_gain : float, optional
        sets the penalty for each vehicle between 0 and this value
    penalty_exponent : float, optional
        used to allow exponential punishing of smaller headways
    """
    headways = penalty_gain * np.power(
        np.array(
            [vehicles.get_headway(veh_id) / normalization
             for veh_id in vids]),
        penalty_exponent)
    return -np.var(headways)
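Since the reward is the negative variance of the (scaled) headways, evenly
spaced vehicles score 0 and uneven spacing is punished. A standalone sketch
with made-up headways (not part of the module):

    import numpy as np

    headways = np.array([10.0, 30.0])  # headways of two RL vehicles (m)
    normalization, gain, exponent = 1, 1, 1

    scaled = gain * np.power(headways / normalization, exponent)
    reward = -np.var(scaled)           # -100.0; equal headways would give 0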
[docs]def punish_rl_lane_changes(env, penalty=1):
    """Penalize an RL vehicle performing lane changes.

    This reward function is meant to minimize the number of lane changes an
    RL vehicle performs.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    penalty : float, optional
        penalty imposed on the reward function for any rl lane change action
    """
    total_lane_change_penalty = 0
    for veh_id in env.k.vehicle.get_rl_ids():
        if env.k.vehicle.get_last_lc(veh_id) == env.timer:
            total_lane_change_penalty -= penalty

    return total_lane_change_penalty
[docs]def energy_consumption(env, gain=.001):
    """Calculate the total power consumption of all vehicles in the system.

    Assumes each vehicle is an average sized vehicle. The power calculated
    here is the lower bound of the actual power consumed by a vehicle.
    """
    power = 0

    M = 1200  # mass of average sized vehicle (kg)
    g = 9.81  # gravitational acceleration (m/s^2)
    Cr = 0.005  # rolling resistance coefficient
    Ca = 0.3  # aerodynamic drag coefficient
    rho = 1.225  # air density (kg/m^3)
    A = 2.6  # vehicle cross sectional area (m^2)

    for veh_id in env.k.vehicle.get_ids():
        speed = env.k.vehicle.get_speed(veh_id)
        prev_speed = env.k.vehicle.get_previous_speed(veh_id)

        accel = abs(speed - prev_speed) / env.sim_step

        power += M * speed * accel + M * g * Cr * speed \
            + 0.5 * rho * A * Ca * speed ** 3

    return -gain * power
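The summand is a simple longitudinal power model,
P = M*v*a + M*g*Cr*v + 0.5*rho*A*Ca*v^3. A worked single-vehicle example with
made-up speeds (illustrative values only):

    M, g, Cr, Ca, rho, A = 1200, 9.81, 0.005, 0.3, 1.225, 2.6

    speed, prev_speed, sim_step = 10.0, 9.5, 0.1  # m/s, m/s, s
    accel = abs(speed - prev_speed) / sim_step    # 5 m/s^2

    power = (M * speed * accel                    # inertial term: 60000 W
             + M * g * Cr * speed                 # rolling resistance: ~589 W
             + 0.5 * rho * A * Ca * speed ** 3)   # aerodynamic drag: ~478 W

    reward = -0.001 * power                       # ~-61.07 with gain=.001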
[docs]def veh_energy_consumption(env, veh_id, gain=.001):
    """Calculate the power consumption of a single vehicle.

    Assumes the vehicle is an average sized vehicle. The power calculated
    here is the lower bound of the actual power consumed by the vehicle.
    """
    power = 0

    M = 1200  # mass of average sized vehicle (kg)
    g = 9.81  # gravitational acceleration (m/s^2)
    Cr = 0.005  # rolling resistance coefficient
    Ca = 0.3  # aerodynamic drag coefficient
    rho = 1.225  # air density (kg/m^3)
    A = 2.6  # vehicle cross sectional area (m^2)

    speed = env.k.vehicle.get_speed(veh_id)
    prev_speed = env.k.vehicle.get_previous_speed(veh_id)

    accel = abs(speed - prev_speed) / env.sim_step

    power += M * speed * accel + M * g * Cr * speed \
        + 0.5 * rho * A * Ca * speed ** 3

    return -gain * power
[docs]def miles_per_megajoule(env, veh_ids=None, gain=.001):
    """Calculate miles per megajoule of either a single vehicle or the
    average over a set of vehicles.

    Assumes each vehicle is an average sized vehicle. The power calculated
    here is the lower bound of the actual power consumed by a vehicle.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    veh_ids : list of str, optional
        list of veh_ids to compute the reward over. If None, the reward is
        computed over all vehicles.
    gain : float
        scaling factor for the reward
    """
    mpj = 0
    counter = 0
    if veh_ids is None:
        veh_ids = env.k.vehicle.get_ids()
    elif not isinstance(veh_ids, list):
        veh_ids = [veh_ids]
    for veh_id in veh_ids:
        speed = env.k.vehicle.get_speed(veh_id)
        # convert to be positive since the function called is a penalty
        power = -veh_energy_consumption(env, veh_id, gain=1.0)
        if power > 0 and speed >= 0.0:
            counter += 1
            # meters / joule is (v * \delta t) / (power * \delta t)
            mpj += speed / power
    if counter > 0:
        mpj /= counter

    # convert from meters per joule to miles per joule
    mpj /= 1609.0
    # convert from miles per joule to miles per megajoule
    mpj *= 10**6

    return mpj * gain
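The unit conversion at the end can be sanity-checked by hand at a
hypothetical operating point (made-up numbers, not part of the module):

    speed = 15.0     # m/s
    power = 60000.0  # W, i.e. -veh_energy_consumption(...) with gain=1.0

    mpj = speed / power          # meters per joule: 2.5e-4
    mpj = mpj / 1609.0 * 10**6   # miles per megajoule: ~0.155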
[docs]def miles_per_gallon(env, veh_ids=None, gain=.001):
    """Calculate miles per gallon of either a single vehicle or the average
    over a set of vehicles.

    The per-vehicle fuel consumption rate comes from
    ``env.k.vehicle.get_fuel_consumption``.

    Parameters
    ----------
    env : flow.envs.Env
        the environment variable, which contains information on the current
        state of the system.
    veh_ids : list of str, optional
        list of veh_ids to compute the reward over. If None, the reward is
        computed over all vehicles.
    gain : float
        scaling factor for the reward
    """
    mpg = 0
    counter = 0
    if veh_ids is None:
        veh_ids = env.k.vehicle.get_ids()
    elif not isinstance(veh_ids, list):
        veh_ids = [veh_ids]
    for veh_id in veh_ids:
        speed = env.k.vehicle.get_speed(veh_id)
        gallons_per_s = env.k.vehicle.get_fuel_consumption(veh_id)
        if gallons_per_s > 0 and speed >= 0.0:
            counter += 1
            # meters / gallon is (v * \delta t) / (gallons_per_s * \delta t)
            mpg += speed / gallons_per_s
    if counter > 0:
        mpg /= counter

    # convert from meters per gallon to miles per gallon
    mpg /= 1609.0

    return mpg * gain
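The same conversion in isolation, with a made-up fuel rate (illustrative
only):

    speed = 15.0            # m/s
    gallons_per_s = 0.001   # fuel rate for one vehicle (gallons/s)

    meters_per_gallon = speed / gallons_per_s  # 15000 m/gal
    mpg = meters_per_gallon / 1609.0           # ~9.32 miles per gallon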