shortest path for rerouting

This commit is contained in:
Pranav Malladi
2025-09-27 19:12:09 -04:00
parent 85e30a70a7
commit c939e0a2aa

View File

@@ -7,8 +7,10 @@ Provides:
Never hardcode API keys in source. Provide via api_key argument or set OPENWEATHER_API_KEY / OPENWEATHER_KEY env var. Never hardcode API keys in source. Provide via api_key argument or set OPENWEATHER_API_KEY / OPENWEATHER_KEY env var.
""" """
import os import os
from typing import Tuple, Dict, Any, Optional from typing import Tuple, Dict, Any, Optional, Callable, List
import requests import requests
import heapq
import math
def _get_api_key(explicit_key: Optional[str] = None) -> Optional[str]: def _get_api_key(explicit_key: Optional[str] = None) -> Optional[str]:
if explicit_key: if explicit_key:
@@ -115,3 +117,226 @@ def fetch_road_risk(lat: float, lon: float, extra_params: Optional[dict] = None,
features["road_risk_score"] = float(risk) features["road_risk_score"] = float(risk)
return data, features return data, features
def _haversine_km(a_lat: float, a_lon: float, b_lat: float, b_lon: float) -> float:
# returns distance in kilometers
R = 6371.0
lat1, lon1, lat2, lon2 = map(math.radians, (a_lat, a_lon, b_lat, b_lon))
dlat = lat2 - lat1
dlon = lon2 - lon1
h = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
return 2 * R * math.asin(min(1.0, math.sqrt(h)))
def risk_to_index(risk_score: float, max_risk: float = 10.0, num_bins: int = 10) -> int:
"""
Map a numeric risk_score to an integer index 1..num_bins (higher => more risky).
Uses equal-width bins: 0..(max_risk/num_bins) -> 1, ..., >=max_risk -> num_bins.
"""
if risk_score is None:
return 1
r = float(risk_score)
if r <= 0:
return 1
if r >= max_risk:
return num_bins
bin_width = max_risk / float(num_bins)
return int(r // bin_width) + 1
def get_risk_score(lat: float, lon: float, **fetch_kwargs) -> float:
"""
Wrapper: calls fetch_road_risk and returns features['road_risk_score'] (float).
Pass api_key/roadrisk_url via fetch_kwargs as needed.
"""
_, features = fetch_road_risk(lat, lon, **fetch_kwargs)
return float(features.get("road_risk_score", 0.0))
def compute_reroute(start_lat: float,
start_lon: float,
risk_provider: Callable[[float, float], float] = None,
lat_range: float = 0.005,
lon_range: float = 0.01,
n_lat: int = 7,
n_lon: int = 7,
max_calls: Optional[int] = None,
distance_weight: float = 0.1) -> Dict[str, Any]:
"""
Sample a grid around (start_lat, start_lon), get risk at each grid node via risk_provider,
find the node with minimum risk, and run Dijkstra on the grid (4-neighbors) where edge cost =
average node risk + distance_weight * distance_km. Returns path and stats.
Defaults: n_lat/n_lon small to limit API calls. max_calls optionally caps number of risk_provider calls.
"""
if risk_provider is None:
# default risk provider that calls fetch_road_risk (may require API key in env or fetch_kwargs)
def _rp(lat, lon): return get_risk_score(lat, lon)
risk_provider = _rp
# build grid coordinates
lat_steps = n_lat
lon_steps = n_lon
if lat_steps < 2 or lon_steps < 2:
raise ValueError("n_lat and n_lon must be >= 2")
lat0 = start_lat - lat_range
lon0 = start_lon - lon_range
lat_step = (2 * lat_range) / (lat_steps - 1)
lon_step = (2 * lon_range) / (lon_steps - 1)
coords: List[Tuple[float, float]] = []
for i in range(lat_steps):
for j in range(lon_steps):
coords.append((lat0 + i * lat_step, lon0 + j * lon_step))
# sample risks with caching and optional call limit
risks: List[float] = []
calls = 0
for (lat, lon) in coords:
if max_calls is not None and calls >= max_calls:
# conservative fallback: assume same as start risk if call limit reached
risks.append(float('inf'))
continue
try:
r = float(risk_provider(lat, lon))
except Exception:
r = float('inf')
risks.append(r)
calls += 1
# convert to grid indexed by (i,j)
def idx(i, j): return i * lon_steps + j
# find start index (closest grid node to start)
start_i = round((start_lat - lat0) / lat_step)
start_j = round((start_lon - lon0) / lon_step)
start_i = max(0, min(lat_steps - 1, start_i))
start_j = max(0, min(lon_steps - 1, start_j))
start_index = idx(start_i, start_j)
# find target node = min risk node (ignore inf)
min_risk = min(risks)
if math.isinf(min_risk) or min_risk >= risks[start_index]:
# no better location found or sampling failed
return {
"reroute_needed": False,
"reason": "no_lower_risk_found",
"start_coord": (start_lat, start_lon),
"start_risk": None if math.isinf(risks[start_index]) else risks[start_index],
}
target_index = int(risks.index(min_risk))
# Dijkstra from start_index to target_index
N = len(coords)
dist = [math.inf] * N
prev = [None] * N
dist[start_index] = 0.0
pq = [(0.0, start_index)]
while pq:
d, u = heapq.heappop(pq)
if d > dist[u]:
continue
if u == target_index:
break
ui = u // lon_steps
uj = u % lon_steps
for di, dj in ((1,0),(-1,0),(0,1),(0,-1)):
vi, vj = ui + di, uj + dj
if 0 <= vi < lat_steps and 0 <= vj < lon_steps:
v = idx(vi, vj)
# cost: average node risk + small distance penalty
ru = risks[u]
rv = risks[v]
if math.isinf(ru) or math.isinf(rv):
continue
lat_u, lon_u = coords[u]
lat_v, lon_v = coords[v]
d_km = _haversine_km(lat_u, lon_u, lat_v, lon_v)
w = (ru + rv) / 2.0 + distance_weight * d_km
nd = d + w
if nd < dist[v]:
dist[v] = nd
prev[v] = u
heapq.heappush(pq, (nd, v))
if math.isinf(dist[target_index]):
return {
"reroute_needed": False,
"reason": "no_path_found",
"start_coord": (start_lat, start_lon),
"start_risk": risks[start_index],
"target_risk": risks[target_index],
}
# reconstruct path
path_indices = []
cur = target_index
while cur is not None:
path_indices.append(cur)
cur = prev[cur]
path_indices.reverse()
path_coords = [coords[k] for k in path_indices]
start_risk = risks[start_index]
end_risk = risks[target_index]
improvement = (start_risk - end_risk) if start_risk not in (None, float('inf')) else None
return {
"reroute_needed": True,
"start_coord": (start_lat, start_lon),
"start_risk": start_risk,
"target_coord": coords[target_index],
"target_risk": end_risk,
"path": path_coords,
"path_cost": dist[target_index],
"risk_improvement": improvement,
"grid_shape": (lat_steps, lon_steps),
"calls_made": calls,
}
def compute_index_and_reroute(lat: float,
lon: float,
api_key: Optional[str] = None,
roadrisk_url: Optional[str] = None,
max_risk: float = 10.0,
num_bins: int = 10,
reroute_kwargs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
High-level convenience: get road risk, map to index (1..num_bins), and attempt reroute.
reroute_kwargs are forwarded to compute_reroute (risk_provider will call fetch_road_risk
using provided api_key/roadrisk_url).
"""
if reroute_kwargs is None:
reroute_kwargs = {}
# obtain base risk
data, features = fetch_road_risk(lat, lon, api_key=api_key, roadrisk_url=roadrisk_url)
road_risk = float(features.get("road_risk_score", 0.0))
# compute index: if 'accidents' present in features, prefer that mapping
accidents = features.get("accidents") or features.get("accident_count")
try:
if accidents is not None:
# map raw accident count to index 1..num_bins
from .models import accidents_to_bucket
idx = accidents_to_bucket(int(accidents), max_count=20000, num_bins=num_bins)
else:
idx = risk_to_index(road_risk, max_risk=max_risk, num_bins=num_bins)
except Exception:
idx = risk_to_index(road_risk, max_risk=max_risk, num_bins=num_bins)
# prepare risk_provider that passes api_key/roadrisk_url through
def _rp(lat_, lon_):
return get_risk_score(lat_, lon_, api_key=api_key, roadrisk_url=roadrisk_url)
reroute_info = compute_reroute(lat, lon, risk_provider=_rp, **reroute_kwargs)
return {
"lat": lat,
"lon": lon,
"index": int(idx),
"road_risk_score": road_risk,
"features": features,
"reroute": reroute_info,
"raw_roadrisk_response": data,
}