Added LLM and MongoDB Agent Scripts

This commit is contained in:
Shiva Pochampally
2025-09-28 00:03:37 -04:00
parent f1073ef3df
commit 016b95a564
2 changed files with 898 additions and 0 deletions

351
llm/gemini_mongo.py Normal file
View File

@@ -0,0 +1,351 @@
#MONGO_URI=mongodb+srv://Admin:HelloKitty420@geobase.tyxsoir.mongodb.net/crashes
import os
import requests
import time
from datetime import datetime
from pymongo import MongoClient
from langchain_google_genai import ChatGoogleGenerativeAI
from math import radians, sin, cos, sqrt, atan2
# Configuration
GEMINI_API_KEY = "AIzaSyBCbEOo4aK72507hqvpYkE9zXUe-z5aSXA"
# OPENWEATHER_API_KEY = "19c6d988b89b040b603f4b3b1b1b304f"
OPENWEATHER_API_KEY = "8754b3f387fc0f1d96a81f73e303e181"
MONGO_URI = "mongodb+srv://Admin:HelloKitty420@geobase.tyxsoir.mongodb.net/crashes"
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash-lite", api_key=GEMINI_API_KEY)
def connect_to_mongodb():
"""
Connect to MongoDB database and return the collection.
"""
try:
print("Connecting to MongoDB...")
client = MongoClient(MONGO_URI)
# Test the connection
client.admin.command('ping')
print("✅ Successfully connected to MongoDB!")
db = client.crashes # Database name
collection = db.crashes # Collection name - corrected to 'crashes'
# Get collection stats
total_count = collection.estimated_document_count()
print(f"📊 Found {total_count:,} total crash records in database")
# Check specifically for 2020+ data
filter_2020_plus = {"reportDate": {"$gte": datetime(2020, 1, 1)}}
count_2020_plus = collection.count_documents(filter_2020_plus)
print(f"📅 Found {count_2020_plus:,} crash records from 2020 onward")
return collection
except Exception as e:
print(f"❌ Failed to connect to MongoDB: {e}")
return None
def get_crashes_within_radius_mongodb(collection, center_lat, center_lon, radius_km):
"""
Query MongoDB for crashes within specified radius using geospatial query.
Filters for crashes from 2020 onward only.
Args:
collection: MongoDB collection object
center_lat: Latitude of center point
center_lon: Longitude of center point
radius_km: Radius in kilometers
Returns:
List of crash documents within radius from 2020 onward
"""
try:
print(f"🔍 Querying crashes within {radius_km}km of ({center_lat:.6f}, {center_lon:.6f}) from 2020 onward...")
# MongoDB geospatial query using $geoWithin and $centerSphere
# $centerSphere uses radians, so convert km to radians (divide by Earth's radius in km)
radius_radians = radius_km / 6371 # Earth's radius in km
# Combined query: geospatial AND date filter for 2020+
query = {
"location": {
"$geoWithin": {
"$centerSphere": [[center_lon, center_lat], radius_radians]
}
},
"reportDate": {
"$gte": datetime(2020, 1, 1) # Only crashes from 2020 onward
}
}
# Execute the query
cursor = collection.find(query)
crashes = list(cursor)
print(f"📍 Found {len(crashes)} crashes within {radius_km}km radius (from 2020 onward)")
# Add distance calculation to each crash for sorting
for crash in crashes:
if crash.get('location', {}).get('coordinates'):
crash_lon, crash_lat = crash['location']['coordinates']
distance = haversine_distance(center_lat, center_lon, crash_lat, crash_lon)
crash['distance_km'] = distance
# Sort by distance
crashes.sort(key=lambda x: x.get('distance_km', float('inf')))
return crashes
except Exception as e:
print(f"❌ Error querying MongoDB: {e}")
return []
def haversine_distance(lat1, lon1, lat2, lon2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
Returns distance in kilometers
"""
# Convert decimal degrees to radians
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
# Radius of earth in kilometers
r = 6371
return c * r
def get_current_weather(lat, lon, api_key):
"""
Get basic weather data from OpenWeatherMap API.
"""
try:
url = "https://api.openweathermap.org/data/2.5/weather"
response = requests.get(
url,
params={
"lat": lat,
"lon": lon,
"appid": api_key,
"units": "metric"
},
timeout=10
)
response.raise_for_status()
data = response.json()
# Create summary from weather API
main = data.get("main", {})
weather = data.get("weather", [{}])[0]
wind = data.get("wind", {})
summary_parts = []
if main.get("temp") is not None:
summary_parts.append(f"Temperature: {main['temp']:.1f}°C")
if weather.get("description"):
summary_parts.append(f"Conditions: {weather['description']}")
if wind.get("speed") is not None:
summary_parts.append(f"Wind: {wind['speed']:.1f} m/s")
if main.get("humidity") is not None:
summary_parts.append(f"Humidity: {main['humidity']}%")
summary = " | ".join(summary_parts) if summary_parts else "Weather data available"
return data, summary
except Exception as e:
return None, f"Weather API failed: {str(e)}"
def analyze_mongodb_crash_patterns(crashes, center_lat, center_lon, radius_km, weather_summary=None):
"""
Analyze crash patterns from MongoDB data and generate safety assessment.
"""
if not crashes:
return "No crash data available for the specified location and radius."
total_crashes = len(crashes)
avg_distance = sum(crash.get('distance_km', 0) for crash in crashes) / total_crashes if crashes else 0
# Analyze crash patterns from MongoDB structure
crash_analysis = {
'severity_counts': {},
'total_fatalities': 0,
'total_major_injuries': 0,
'total_minor_injuries': 0,
'speeding_involved': 0,
'impaired_involved': 0,
'pedestrian_crashes': 0,
'bicyclist_crashes': 0,
'vehicle_counts': {}
}
# Analyze each crash
for crash in crashes:
# Severity analysis
severity = crash.get('severity', 'Unknown')
crash_analysis['severity_counts'][severity] = crash_analysis['severity_counts'].get(severity, 0) + 1
# Casualty analysis
casualties = crash.get('casualties', {})
# Count fatalities and injuries across all categories
for category in ['bicyclists', 'drivers', 'pedestrians', 'passengers']:
if category in casualties:
crash_analysis['total_fatalities'] += casualties[category].get('fatal', 0)
crash_analysis['total_major_injuries'] += casualties[category].get('major_injuries', 0)
crash_analysis['total_minor_injuries'] += casualties[category].get('minor_injuries', 0)
# Count vulnerable road user involvement
if casualties.get('pedestrians', {}).get('total', 0) > 0:
crash_analysis['pedestrian_crashes'] += 1
if casualties.get('bicyclists', {}).get('total', 0) > 0:
crash_analysis['bicyclist_crashes'] += 1
# Circumstances analysis
circumstances = crash.get('circumstances', {})
if circumstances.get('speeding_involved', False):
crash_analysis['speeding_involved'] += 1
# Check for impairment
if (circumstances.get('pedestrians_impaired', False) or
circumstances.get('bicyclists_impaired', False) or
circumstances.get('drivers_impaired', False)):
crash_analysis['impaired_involved'] += 1
# Vehicle analysis
vehicles = crash.get('vehicles', {})
total_vehicles = vehicles.get('total', 0)
crash_analysis['vehicle_counts'][str(total_vehicles)] = crash_analysis['vehicle_counts'].get(str(total_vehicles), 0) + 1
# Create comprehensive summary for LLM
crash_summary = f"""
SEVERITY BREAKDOWN: {dict(crash_analysis['severity_counts'])}
CASUALTIES:
- Fatal injuries: {crash_analysis['total_fatalities']}
- Major injuries: {crash_analysis['total_major_injuries']}
- Minor injuries: {crash_analysis['total_minor_injuries']}
VULNERABLE ROAD USERS:
- Crashes involving pedestrians: {crash_analysis['pedestrian_crashes']}
- Crashes involving bicyclists: {crash_analysis['bicyclist_crashes']}
RISK FACTORS:
- Crashes involving speeding: {crash_analysis['speeding_involved']}
- Crashes with impairment: {crash_analysis['impaired_involved']}
VEHICLE INVOLVEMENT: {dict(crash_analysis['vehicle_counts'])}"""
# Add current weather information if available
weather_info = ""
if weather_summary:
weather_info = f"""
CURRENT WEATHER CONDITIONS:
{weather_summary}"""
# Create prompt for LLM
prompt = f"""You are a traffic safety expert analyzing recent crash data (2020 onward) and current conditions for location ({center_lat:.6f}, {center_lon:.6f}) within a {radius_km}km radius.
CRASH STATISTICS (2020-Present):
- Total crashes in area: {total_crashes}
- Average distance from center: {avg_distance:.2f} km
- Search area: {radius_km}km radius (approximately {3.14159 * radius_km**2:.1f} km²)
DETAILED CRASH ANALYSIS:{crash_summary}{weather_info}
Based on this comprehensive recent MongoDB crash data (2020 onward), provide:
1. A danger level assessment (Low, Moderate, High, Very High)
2. Key safety concerns based on recent crash patterns AND current weather conditions
3. Specific recommendations for someone traveling to this location RIGHT NOW
4. Notable patterns in recent crash data (severity, vulnerable users, risk factors)
5. How current weather conditions may affect driving safety
Focus on practical, actionable safety advice based on recent trends. Be specific about identified risks and provide clear recommendations."""
try:
response = llm.invoke(prompt)
return response.content
except Exception as e:
return f"Error analyzing crash data with LLM: {e}"
def main():
"""
Main function to analyze crash danger using MongoDB geospatial queries.
"""
print("🚗 MongoDB Traffic Crash Danger Analysis Tool (2020+ Data)")
print("=" * 65)
# Connect to MongoDB
collection = connect_to_mongodb()
if collection is None:
print("❌ Could not connect to MongoDB. Exiting...")
return
# Get user input for location and radius
try:
center_lat = float(input("Enter latitude: "))
center_lon = float(input("Enter longitude: "))
radius_km = float(input("Enter search radius in kilometers (default: 1.0): ") or "1.0")
print(f"\n🔍 Analyzing recent crashes (2020+) within {radius_km}km of ({center_lat:.6f}, {center_lon:.6f})...")
# Query MongoDB for nearby crashes using geospatial indexing
nearby_crashes = get_crashes_within_radius_mongodb(collection, center_lat, center_lon, radius_km)
if len(nearby_crashes) > 0:
print(f"🔴 Closest crash: {nearby_crashes[0]['distance_km']:.3f}km away")
print(f"🔴 Furthest crash: {nearby_crashes[-1]['distance_km']:.3f}km away")
# Display sample crash details from MongoDB structure
print("📊 Sample crash details from MongoDB:")
sample = nearby_crashes[0]
print(f" - ID: {sample.get('crashId', 'N/A')}")
print(f" - Severity: {sample.get('severity', 'N/A')}")
print(f" - Address: {sample.get('address', 'N/A')}")
print(f" - Ward: {sample.get('ward', 'N/A')}")
casualties = sample.get('casualties', {})
total_casualties = 0
for cat in ['bicyclists', 'drivers', 'pedestrians', 'passengers']:
cat_data = casualties.get(cat, {})
total_casualties += (cat_data.get('fatal', 0) +
cat_data.get('major_injuries', 0) +
cat_data.get('minor_injuries', 0))
print(f" - Total casualties: {total_casualties}")
else:
print(" No crashes found within the specified radius.")
# Get current weather conditions
print("\n🌤️ Fetching current weather conditions...")
weather_data, weather_summary = get_current_weather(center_lat, center_lon, OPENWEATHER_API_KEY)
if weather_data is None:
print(f"⚠️ Weather data unavailable: {weather_summary}")
weather_summary = None
else:
print(f"🌤️ Current conditions: {weather_summary}")
# Generate comprehensive safety analysis using LLM
print("\n🤖 Generating comprehensive safety assessment...")
analysis = analyze_mongodb_crash_patterns(nearby_crashes, center_lat, center_lon, radius_km, weather_summary)
print("\n" + "="*65)
print("🚨 RECENT CRASH SAFETY ASSESSMENT REPORT (2020-Present)")
print("="*65)
print(analysis)
except ValueError:
print("❌ Please enter valid numerical values for coordinates and radius.")
except KeyboardInterrupt:
print("\n⚠️ Analysis cancelled by user.")
except Exception as e:
print(f"❌ An error occurred: {e}")
if __name__ == "__main__":
main()

547
llm/gemini_reroute.py Normal file
View File

@@ -0,0 +1,547 @@
import os
import requests
import json
from datetime import datetime
from pymongo import MongoClient
from langchain_google_genai import ChatGoogleGenerativeAI
from math import radians, sin, cos, sqrt, atan2, degrees, atan2
from typing import List, Tuple, Dict, Optional
# Configuration
GEMINI_API_KEY = "AIzaSyBCbEOo4aK72507hqvpYkE9zXUe-z5aSXA"
OPENWEATHER_API_KEY = "8754b3f387fc0f1d96a81f73e303e181"
MONGO_URI = "mongodb+srv://Admin:HelloKitty420@geobase.tyxsoir.mongodb.net/crashes"
MAPBOX_API_KEY = "pk.eyJ1IjoicGllbG9yZDc1NyIsImEiOiJjbWcxdTd6c3AwMXU1MmtxMDh6b2l5amVrIn0.5Es0azrah23GX1e9tmbjGw"
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash-lite", api_key=GEMINI_API_KEY)
class SafeRouteAnalyzer:
def __init__(self, mongo_uri: str):
"""Initialize the safe route analyzer with MongoDB connection."""
try:
self.client = MongoClient(mongo_uri)
self.client.admin.command('ping')
self.db = self.client.crashes
self.collection = self.db.crashes
print("✅ Connected to MongoDB for route safety analysis")
except Exception as e:
print(f"❌ Failed to connect to MongoDB: {e}")
self.collection = None
def haversine_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two points in kilometers."""
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return 6371 * c # Earth's radius in km
def get_route_from_mapbox(self, start_lat: float, start_lon: float,
end_lat: float, end_lon: float, profile: str = "driving") -> Dict:
"""
Get route from Mapbox Directions API.
Args:
start_lat, start_lon: Starting coordinates
end_lat, end_lon: Destination coordinates
profile: 'driving', 'walking', or 'cycling'
Returns:
Route data with coordinates, distance, duration
"""
try:
url = f"https://api.mapbox.com/directions/v5/mapbox/{profile}/{start_lon},{start_lat};{end_lon},{end_lat}"
params = {
'access_token': MAPBOX_API_KEY,
'overview': 'full',
'geometries': 'geojson',
'steps': 'true'
}
response = requests.get(url, params=params, timeout=15)
response.raise_for_status()
data = response.json()
if data.get('code') == 'Ok' and data.get('routes'):
route = data['routes'][0]
geometry = route['geometry']
# Extract coordinates from GeoJSON format
coordinates = [[coord[1], coord[0]] for coord in geometry['coordinates']] # Convert [lon,lat] to [lat,lon]
return {
'success': True,
'coordinates': coordinates, # List of [lat, lon] pairs
'distance_km': route['distance'] / 1000,
'duration_min': route['duration'] / 60,
'geometry': geometry
}
else:
error_msg = data.get('message', 'No route found')
return {'success': False, 'error': error_msg}
except Exception as e:
return {'success': False, 'error': str(e)}
def get_alternative_routes_mapbox(self, start_lat: float, start_lon: float,
end_lat: float, end_lon: float, num_alternatives: int = 3) -> List[Dict]:
"""
Get multiple alternative routes using Mapbox Directions API.
"""
try:
url = f"https://api.mapbox.com/directions/v5/mapbox/driving/{start_lon},{start_lat};{end_lon},{end_lat}"
params = {
'access_token': MAPBOX_API_KEY,
'alternatives': 'true', # Request alternatives
'overview': 'full',
'geometries': 'geojson',
'steps': 'false'
}
response = requests.get(url, params=params, timeout=15)
response.raise_for_status()
data = response.json()
routes = []
if data.get('code') == 'Ok' and data.get('routes'):
for i, route in enumerate(data['routes'][:num_alternatives]):
geometry = route['geometry']
coordinates = [[coord[1], coord[0]] for coord in geometry['coordinates']] # Convert [lon,lat] to [lat,lon]
routes.append({
'route_id': i,
'coordinates': coordinates,
'distance_km': route['distance'] / 1000,
'duration_min': route['duration'] / 60,
'geometry': geometry
})
return routes
except Exception as e:
print(f"Error getting alternative routes: {e}")
return []
def analyze_route_safety(self, route_coordinates: List[Tuple[float, float]],
buffer_km: float = 0.2) -> Dict:
"""
Analyze safety along a route by checking for crashes near route points.
Args:
route_coordinates: List of (lat, lon) tuples along the route
buffer_km: How far to look for crashes around each route point
Returns:
Safety analysis data
"""
if self.collection is None:
return {'error': 'No database connection'}
try:
all_nearby_crashes = []
safety_scores = []
# Sample every Nth point to avoid too many queries (adjust based on route length)
sample_interval = max(1, len(route_coordinates) // 20) # Max 20 sample points
sample_points = route_coordinates[::sample_interval]
print(f"🔍 Analyzing safety at {len(sample_points)} points along route...")
for i, (lat, lon) in enumerate(sample_points):
# Query crashes within buffer distance of this route point
radius_radians = buffer_km / 6371
query = {
"location": {
"$geoWithin": {
"$centerSphere": [[lon, lat], radius_radians]
}
},
"reportDate": {
"$gte": datetime(2020, 1, 1)
}
}
crashes_near_point = list(self.collection.find(query))
# Calculate safety score for this point (lower = safer)
point_safety_score = self.calculate_point_safety_score(crashes_near_point)
safety_scores.append({
'point_index': i * sample_interval,
'coordinates': [lat, lon],
'crashes_count': len(crashes_near_point),
'safety_score': point_safety_score
})
all_nearby_crashes.extend(crashes_near_point)
# Remove duplicate crashes
unique_crashes = {}
for crash in all_nearby_crashes:
crash_id = crash.get('crashId', str(crash.get('_id')))
if crash_id not in unique_crashes:
unique_crashes[crash_id] = crash
unique_crashes_list = list(unique_crashes.values())
# Calculate overall route safety metrics
total_crashes = len(unique_crashes_list)
avg_safety_score = sum(point['safety_score'] for point in safety_scores) / len(safety_scores) if safety_scores else 0
max_danger_score = max((point['safety_score'] for point in safety_scores), default=0)
return {
'total_crashes_near_route': total_crashes,
'average_safety_score': avg_safety_score,
'max_danger_score': max_danger_score,
'safety_points': safety_scores,
'crashes_data': unique_crashes_list,
'route_length_points': len(route_coordinates)
}
except Exception as e:
return {'error': str(e)}
def calculate_point_safety_score(self, crashes: List[Dict]) -> float:
"""
Calculate a safety score for a point based on nearby crashes.
Higher score = more dangerous
"""
if not crashes:
return 0.0
score = 0.0
for crash in crashes:
# Base score for any crash
base_score = 1.0
# Weight by severity
severity = crash.get('severity', '').lower()
if 'fatal' in severity or 'major' in severity:
base_score *= 3.0
elif 'minor' in severity:
base_score *= 1.5
# Weight by casualty count
casualties = crash.get('casualties', {})
total_casualties = 0
for category in ['bicyclists', 'drivers', 'pedestrians', 'passengers']:
if category in casualties:
cat_data = casualties[category]
total_casualties += (cat_data.get('fatal', 0) * 5 +
cat_data.get('major_injuries', 0) * 2 +
cat_data.get('minor_injuries', 0) * 1)
base_score += total_casualties * 0.5
# Weight by circumstances
circumstances = crash.get('circumstances', {})
if circumstances.get('speeding_involved', False):
base_score *= 1.3
if any([circumstances.get('pedestrians_impaired', False),
circumstances.get('bicyclists_impaired', False),
circumstances.get('drivers_impaired', False)]):
base_score *= 1.4
score += base_score
return score
def generate_safety_report_with_llm(self, route_safety_data: Dict,
route_info: Dict, weather_summary: str = None) -> str:
"""
Use LLM to generate comprehensive safety report and route recommendations.
"""
if 'error' in route_safety_data:
return f"Error analyzing route safety: {route_safety_data['error']}"
crashes = route_safety_data.get('crashes_data', [])
safety_points = route_safety_data.get('safety_points', [])
# Find most dangerous sections
dangerous_points = sorted(safety_points, key=lambda x: x['safety_score'], reverse=True)[:3]
# Analyze crash patterns
severity_counts = {}
casualty_summary = {'fatal': 0, 'major': 0, 'minor': 0}
risk_factors = {'speeding': 0, 'impairment': 0, 'pedestrian': 0, 'bicyclist': 0}
for crash in crashes:
severity = crash.get('severity', 'Unknown')
severity_counts[severity] = severity_counts.get(severity, 0) + 1
# Count casualties
casualties = crash.get('casualties', {})
for category in ['bicyclists', 'drivers', 'pedestrians', 'passengers']:
if category in casualties:
cat_data = casualties[category]
casualty_summary['fatal'] += cat_data.get('fatal', 0)
casualty_summary['major'] += cat_data.get('major_injuries', 0)
casualty_summary['minor'] += cat_data.get('minor_injuries', 0)
# Count risk factors
circumstances = crash.get('circumstances', {})
if circumstances.get('speeding_involved', False):
risk_factors['speeding'] += 1
if any([circumstances.get(f'{cat}_impaired', False) for cat in ['pedestrians', 'bicyclists', 'drivers']]):
risk_factors['impairment'] += 1
if casualties.get('pedestrians', {}).get('total', 0) > 0:
risk_factors['pedestrian'] += 1
if casualties.get('bicyclists', {}).get('total', 0) > 0:
risk_factors['bicyclist'] += 1
weather_info = f"\n\nCURRENT WEATHER CONDITIONS:\n{weather_summary}" if weather_summary else ""
prompt = f"""You are an expert traffic safety analyst and route planning specialist. Analyze this route's safety profile and provide recommendations.
ROUTE INFORMATION:
- Distance: {route_info.get('distance_km', 0):.1f} km
- Estimated duration: {route_info.get('duration_min', 0):.0f} minutes
- Analysis points along route: {len(safety_points)}
SAFETY ANALYSIS (2020+ crash data):
- Total crashes near route: {route_safety_data.get('total_crashes_near_route', 0)}
- Average safety score: {route_safety_data.get('average_safety_score', 0):.2f}
- Maximum danger score: {route_safety_data.get('max_danger_score', 0):.2f}
CRASH BREAKDOWN:
- Severity distribution: {severity_counts}
- Casualties: {casualty_summary['fatal']} fatal, {casualty_summary['major']} major injuries, {casualty_summary['minor']} minor injuries
- Risk factors: {risk_factors['speeding']} speeding-related, {risk_factors['impairment']} impairment-related
- Vulnerable users: {risk_factors['pedestrian']} pedestrian crashes, {risk_factors['bicyclist']} bicyclist crashes
MOST DANGEROUS SECTIONS:
{chr(10).join([f"Point {p['point_index']}: {p['crashes_count']} crashes nearby, safety score {p['safety_score']:.1f}" for p in dangerous_points[:3]])}
{weather_info}
Please provide:
1. Overall route safety assessment (SAFE/MODERATE RISK/HIGH RISK/DANGEROUS)
2. Specific dangerous sections to watch out for
3. Driving recommendations for this route considering current conditions
4. Whether an alternative route should be recommended
5. Time-of-day considerations if applicable
6. Weather-specific precautions based on crash patterns
Be specific and actionable in your recommendations."""
try:
response = llm.invoke(prompt)
return response.content
except Exception as e:
return f"Error generating safety analysis: {e}"
def find_safer_route(self, start_lat: float, start_lon: float,
end_lat: float, end_lon: float) -> Dict:
"""
Find the safest route among alternatives by analyzing crash data.
"""
print("🗺️ Getting alternative routes...")
# Get multiple route options
alternative_routes = self.get_alternative_routes_mapbox(start_lat, start_lon, end_lat, end_lon)
if not alternative_routes:
print("❌ No routes found")
return {'error': 'No routes available'}
print(f"📍 Analyzing {len(alternative_routes)} route options for safety...")
# Analyze safety for each route
route_analyses = []
for i, route in enumerate(alternative_routes):
print(f"🔍 Analyzing route {i+1}/{len(alternative_routes)}...")
safety_analysis = self.analyze_route_safety(route['coordinates'])
if 'error' not in safety_analysis:
route_analyses.append({
'route_id': i,
'route_data': route,
'safety_analysis': safety_analysis,
'safety_score': safety_analysis.get('average_safety_score', float('inf'))
})
if not route_analyses:
return {'error': 'Could not analyze any routes for safety'}
# Sort routes by safety (lower score = safer)
route_analyses.sort(key=lambda x: x['safety_score'])
# Get weather for additional context
weather_data, weather_summary = self.get_current_weather(start_lat, start_lon)
# Generate safety reports for top routes
results = {
'recommended_route': route_analyses[0],
'alternative_routes': route_analyses[1:],
'weather_summary': weather_summary
}
# Generate LLM analysis for the safest route
safest_route = route_analyses[0]
safety_report = self.generate_safety_report_with_llm(
safest_route['safety_analysis'],
safest_route['route_data'],
weather_summary
)
results['safety_report'] = safety_report
results['route_comparison'] = self.compare_routes_with_llm(route_analyses, weather_summary)
return results
def compare_routes_with_llm(self, route_analyses: List[Dict], weather_summary: str = None) -> str:
"""
Use LLM to compare multiple routes and explain why one is safer.
"""
if len(route_analyses) < 2:
return "Only one route available for analysis."
comparison_data = []
for i, analysis in enumerate(route_analyses):
route_data = analysis['route_data']
safety_data = analysis['safety_analysis']
comparison_data.append({
'route_num': i + 1,
'distance_km': route_data.get('distance_km', 0),
'duration_min': route_data.get('duration_min', 0),
'crashes_near_route': safety_data.get('total_crashes_near_route', 0),
'safety_score': safety_data.get('average_safety_score', 0),
'max_danger_score': safety_data.get('max_danger_score', 0)
})
weather_info = f"\nCurrent weather: {weather_summary}" if weather_summary else ""
prompt = f"""Compare these route options for safety and provide a recommendation:
ROUTE OPTIONS:
{chr(10).join([f"Route {r['route_num']}: {r['distance_km']:.1f}km, {r['duration_min']:.0f}min, {r['crashes_near_route']} nearby crashes, safety score {r['safety_score']:.2f}" for r in comparison_data])}{weather_info}
Provide:
1. Which route is safest and why
2. Trade-offs between routes (safety vs. time/distance)
3. Clear recommendation with reasoning
4. Any weather-related considerations
Keep it concise and actionable."""
try:
response = llm.invoke(prompt)
return response.content
except Exception as e:
return f"Error comparing routes: {e}"
def get_current_weather(self, lat: float, lon: float) -> Tuple[Optional[Dict], str]:
"""Get current weather conditions."""
try:
url = "https://api.openweathermap.org/data/2.5/weather"
response = requests.get(
url,
params={"lat": lat, "lon": lon, "appid": OPENWEATHER_API_KEY, "units": "metric"},
timeout=10
)
response.raise_for_status()
data = response.json()
main = data.get("main", {})
weather = data.get("weather", [{}])[0]
wind = data.get("wind", {})
summary = f"{main.get('temp', 'N/A')}°C, {weather.get('description', 'N/A')}, wind {wind.get('speed', 'N/A')} m/s"
return data, summary
except Exception as e:
return None, f"Weather unavailable: {e}"
def main():
"""
Demo function showing how to use the SafeRouteAnalyzer.
"""
print("🛣️ Safe Route Planning System")
print("=" * 50)
analyzer = SafeRouteAnalyzer(MONGO_URI)
if analyzer.collection is None:
print("❌ Cannot proceed without database connection")
return
# Get input
try:
print("\n📍 Enter route details:")
start_lat = float(input("Starting latitude: "))
start_lon = float(input("Starting longitude: "))
end_lat = float(input("Destination latitude: "))
end_lon = float(input("Destination longitude: "))
print(f"\n🚗 Planning safe route from ({start_lat:.4f}, {start_lon:.4f}) to ({end_lat:.4f}, {end_lon:.4f})")
# Find the safest route
results = analyzer.find_safer_route(start_lat, start_lon, end_lat, end_lon)
if 'error' in results:
print(f"❌ Error: {results['error']}")
return
# Display results
recommended = results['recommended_route']
route_data = recommended['route_data']
safety_data = recommended['safety_analysis']
print("\n" + "="*50)
print("🏆 RECOMMENDED SAFE ROUTE")
print("="*50)
print(f"📏 Distance: {route_data['distance_km']:.1f} km")
print(f"⏱️ Duration: {route_data['duration_min']:.0f} minutes")
print(f"🚨 Crashes nearby: {safety_data['total_crashes_near_route']}")
print(f"📊 Safety score: {safety_data['average_safety_score']:.2f} (lower is safer)")
print(f"\n🌤️ Weather: {results.get('weather_summary', 'N/A')}")
print("\n📋 SAFETY ANALYSIS:")
print("-" * 30)
print(results['safety_report'])
if len(results['alternative_routes']) > 0:
print("\n🔄 ROUTE COMPARISON:")
print("-" * 30)
print(results['route_comparison'])
# Output for Mapbox visualization
coordinates = recommended['route_data']['coordinates']
print(f"\n🗺️ Route coordinates for Mapbox ({len(coordinates)} points):")
print("First 5 points:", coordinates[:5])
print("Last 5 points:", coordinates[-5:])
# You can save these coordinates to pass to your Mapbox visualization
route_data_to_save = {
'recommended_route': coordinates,
'route_info': route_data,
'safety_summary': {
'total_crashes': safety_data['total_crashes_near_route'],
'average_safety_score': safety_data['average_safety_score'],
'max_danger_score': safety_data['max_danger_score']
}
}
with open('safe_route_coordinates.json', 'w') as f:
json.dump(route_data_to_save, f, indent=2)
print("📁 Route data saved to 'safe_route_coordinates.json'")
except ValueError:
print("❌ Please enter valid numerical coordinates")
except KeyboardInterrupt:
print("\n⚠️ Route planning cancelled")
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
main()