Added Weather API

Pranav Malladi
2025-09-27 18:13:53 -04:00
parent 2471610d80
commit 629444c382
22 changed files with 629 additions and 308 deletions

roadcast/.gitignore vendored Normal file
View File

@@ -0,0 +1,29 @@
# Generated model checkpoints and metadata
model.pth
*.pt
*.pth
preprocess_meta.npz
tmp_infer.csv
label_info.json
label_assignments.npz
# Saved label stores (kmeans etc.)
*.npz
kmeans_centers*.npz
# Ignore training output directories
output/
checkpoints/
# Python caches and virtualenvs
__pycache__/
*.py[cod]
.env/
.venv/
# Test and notebook caches
.pytest_cache/
.ipynb_checkpoints/
# OS files
.DS_Store
Thumbs.db

roadcast/KEEP_FILES.txt Normal file
View File

View File

@@ -80,14 +80,26 @@ def predict_roadrisk():
dt = payload.get('datetime')
street = payload.get('street', '')
roadrisk_url = payload.get('roadrisk_url')
api_key = payload.get('api_key') or os.environ.get('OPENWEATHER_KEY')
# prefer explicit api_key in request, otherwise read from OPENWEATHER_API_KEY env var
api_key = payload.get('api_key') or os.environ.get('OPENWEATHER_API_KEY')
if lat is None or lon is None:
return jsonify({"error": "lat and lon are required fields"}), 400
try:
from openweather_inference import predict_from_openweather
res = predict_from_openweather(lat, lon, dt_iso=dt, street=street, api_key=api_key, train_csv=os.path.join(os.getcwd(), 'data.csv'), preprocess_meta=None, model_path=os.path.join(os.getcwd(), 'model.pth'), centers_path=os.path.join(os.getcwd(), 'kmeans_centers_all.npz'), roadrisk_url=roadrisk_url)
# pass api_key (may be None) to the inference helper; helper will raise if a key is required
res = predict_from_openweather(
lat, lon,
dt_iso=dt,
street=street,
api_key=api_key,
train_csv=os.path.join(os.getcwd(), 'data.csv'),
preprocess_meta=None,
model_path=os.path.join(os.getcwd(), 'model.pth'),
centers_path=os.path.join(os.getcwd(), 'kmeans_centers_all.npz'),
roadrisk_url=roadrisk_url
)
return jsonify(res)
except Exception as e:
return jsonify({"error": str(e)}), 500

View File

@@ -1,32 +0,0 @@
import sys
import importlib
def safe_import(name):
try:
return importlib.import_module(name)
except Exception as e:
return e
print('Python:', sys.version.replace('\n',' '))
torch = safe_import('torch')
if isinstance(torch, Exception):
print('torch import error:', torch)
else:
print('torch:', torch.__version__)
print('CUDA available:', torch.cuda.is_available())
if torch.cuda.is_available():
print('CUDA device count:', torch.cuda.device_count())
print('Current device name:', torch.cuda.get_device_name(0))
pandas = safe_import('pandas')
if isinstance(pandas, Exception):
print('pandas import error:', pandas)
else:
print('pandas:', pandas.__version__)
try:
import sklearn
print('sklearn:', sklearn.__version__)
except Exception:
pass

View File

@@ -1,4 +1,26 @@
train the model:
python train.py data.csv --model-type mlp --generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 512,256 --epochs 8 --batch-size 256 --feature-engineer --weight-decay 1e-5 --seed 42
python train.py data.csv --model-type mlp --generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 1024,512 --epochs 12 --batch-size 256 --lr 1e-3 --lr-step-size 4 --lr-gamma 0.5 --feature-engineer --weight-decay 1e-5 --seed 42
# train with outputs saved to output/
python train.py data.csv --model-type mlp --generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 512,256 --epochs 8 --batch-size 256 --feature-engineer --weight-decay 1e-5 --seed 42 --output-dir output/
# evaluate and visualize:
python evaluate_and_visualize.py \
--checkpoint path/to/checkpoint.pt \
--data data.csv \
--label-col original_label_column_name \
--batch-size 256 \
--sample-index 5 \
--plot
# evaluate
python evaluate_and_visualize.py --checkpoint output/model.pth --data data.csv --label-col label --plot --sample-index 5
# If you used generated labels during training and train.py saved metadata,
# the evaluator will prefer generated labels saved in label_info.json inside the checkpoint dir.
# fetch weather (placeholder)
# from openweather_client import fetch_road_risk
# print(fetch_road_risk(37.7749, -122.4194))
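# Inspect the generated-label metadata that train.py writes (a sketch; assumes you trained with --output-dir output/):
python -c "import json; info = json.load(open('output/label_info.json')); print(info['label_method'], info['n_buckets'])"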

View File

@@ -1,76 +0,0 @@
import pandas as pd
import hashlib
from data import _normalize_str, _normalize_date
p='data.csv'
df=pd.read_csv(p, nrows=50, low_memory=False)
print('Columns:', list(df.columns))
colmap = {c.lower(): c for c in df.columns}
def get_col(*candidates):
for cand in candidates:
key = cand.lower()
if key in colmap:
return colmap[key]
return None
report_col = get_col('report_dat', 'reportdate', 'fromdate', 'lastupdatedate')
lat_col = get_col('latitude', 'mpdlatitude', 'lat')
lon_col = get_col('longitude', 'mpdlongitude', 'lon')
street1_col = get_col('street1', 'address', 'mar_address', 'nearestintstreetname')
street2_col = get_col('street2', 'nearestintstreetname')
ward_col = get_col('ward')
inj_cols = [c for c in df.columns if 'INJUR' in c.upper()]
fat_cols = [c for c in df.columns if 'FATAL' in c.upper()]
uid = get_col('crimeid', 'eventid', 'objectid', 'ccn')
print('Resolved columns:')
print('report_col=', report_col)
print('lat_col=', lat_col)
print('lon_col=', lon_col)
print('street1_col=', street1_col)
print('street2_col=', street2_col)
print('ward_col=', ward_col)
print('inj_cols=', inj_cols[:10])
print('fat_cols=', fat_cols[:10])
print('uid=', uid)
for i, row in df.iterrows():
parts = []
parts.append(_normalize_date(row.get(report_col, '') if report_col else ''))
lat = row.get(lat_col, '') if lat_col else ''
lon = row.get(lon_col, '') if lon_col else ''
try:
parts.append(str(round(float(lat), 5)) if pd.notna(lat) and lat != '' else '')
except Exception:
parts.append('')
try:
parts.append(str(round(float(lon), 5)) if pd.notna(lon) and lon != '' else '')
except Exception:
parts.append('')
parts.append(_normalize_str(row.get(street1_col, '') if street1_col else ''))
parts.append(_normalize_str(row.get(street2_col, '') if street2_col else ''))
parts.append(_normalize_str(row.get(ward_col, '') if ward_col else ''))
inj_sum = 0
for c in inj_cols:
try:
v = row.get(c, 0)
inj_sum += int(v) if pd.notna(v) and v != '' else 0
except Exception:
pass
parts.append(str(inj_sum))
fat_sum = 0
for c in fat_cols:
try:
v = row.get(c, 0)
fat_sum += int(v) if pd.notna(v) and v != '' else 0
except Exception:
pass
parts.append(str(fat_sum))
if uid:
parts.append(str(row.get(uid, '')))
s='|'.join(parts)
h=hashlib.md5(s.encode('utf-8')).hexdigest()
val=int(h,16)%100
print(i, 'label=', val, 's="'+s+'"')

View File

@@ -0,0 +1,193 @@
import argparse
import json
import os
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
# Minimal helper: try to reconstruct the model if checkpoint stores config, else attempt full-model load.
def load_checkpoint(checkpoint_path, model_builder=None, device="cpu"):
ckpt = torch.load(checkpoint_path, map_location=device)
# if checkpoint contains state_dict + model_config, try to rebuild using models.create_model
if isinstance(ckpt, dict) and "model_state_dict" in ckpt:
builder = model_builder
if builder is None:
try:
from models import create_model as _create_model
builder = lambda cfg: _create_model(device=device, model_type=cfg.get("model_type", "mlp"), input_dim=cfg.get("input_dim"), num_classes=cfg.get("num_classes"), hidden_dims=cfg.get("hidden_dims"))
except Exception:
builder = None
if builder is not None and "model_config" in ckpt:
model = builder(ckpt.get("model_config", {}))
model.load_state_dict(ckpt["model_state_dict"])
model.to(device).eval()
meta = {k: v for k, v in ckpt.items() if k not in ("model_state_dict",)}
return model, meta
else:
# try to load full model object or raise
try:
model = ckpt
model.to(device).eval()
return model, {}
except Exception:
raise RuntimeError("Checkpoint contains model_state_dict but cannot reconstruct model; provide model_builder.")
else:
# maybe the full model object was saved
try:
model = ckpt
model.to(device).eval()
return model, {}
except Exception as e:
raise RuntimeError(f"Can't load checkpoint automatically: {e}")
def prepare_features(df, feature_cols=None):
if feature_cols is None:
# assume all columns except label are features
return df.drop(columns=[c for c in df.columns if c.endswith("label")], errors='ignore').values.astype(np.float32)
return df[feature_cols].values.astype(np.float32)
def plot_sample(x, true_label, pred_label):
x = np.asarray(x)
title = f"true: {true_label} pred: {pred_label}"
if x.ndim == 1:
n = x.size
sq = int(np.sqrt(n))
if sq * sq == n:
plt.imshow(x.reshape(sq, sq), cmap="gray")
plt.title(title)
plt.axis("off")
plt.show()
return
if x.size <= 3:
plt.bar(range(x.size), x)
plt.title(title)
plt.show()
return
# fallback: plot first 200 dims as line
plt.plot(x[:200])
plt.title(title + " (first 200 dims)")
plt.show()
return
elif x.ndim == 2:
plt.imshow(x, aspect='auto')
plt.title(title)
plt.show()
return
else:
print("Sample too high-dim to plot, printing summary:")
print("mean", x.mean(), "std", x.std())
def main():
p = argparse.ArgumentParser()
p.add_argument("--checkpoint", required=True, help="Path to saved checkpoint (.pt)")
p.add_argument("--data", required=True, help="CSV with features and optional label column")
p.add_argument("--label-col", default=None, help="Original label column name in CSV (if present)")
p.add_argument("--batch-size", type=int, default=256)
p.add_argument("--sample-index", type=int, default=0, help="Index of a sample to plot")
p.add_argument("--plot", action="store_true")
p.add_argument("--device", default="cpu")
args = p.parse_args()
device = args.device
# If your project has a known model class, replace model_builder with a lambda that instantiates it.
model_builder = None
# load checkpoint
model, meta = load_checkpoint(args.checkpoint, model_builder=model_builder, device=device)
# try to discover preprocess_meta and label_info
ckpt_dir = os.path.dirname(args.checkpoint)
preprocess_meta = None
meta_preprocess_path = os.path.join(ckpt_dir, meta["preprocess_meta"]) if isinstance(meta, dict) and meta.get("preprocess_meta") else None
if meta_preprocess_path and os.path.exists(meta_preprocess_path):
try:
import numpy as _np
d = _np.load(meta_preprocess_path, allow_pickle=True)
preprocess_meta = {
"feature_columns": [str(x) for x in d["feature_columns"].tolist()],
"means": d["means"].astype(np.float32),
"stds": d["stds"].astype(np.float32),
}
print(f"Loaded preprocess meta from {meta_preprocess_path}")
except Exception:
preprocess_meta = None
# load the evaluation CSV once; labels and features below are derived from it
df = pd.read_csv(args.data, low_memory=False)
# prefer label_col from CSV, otherwise load saved assignments if present
y_true = None
if args.label_col and args.label_col in df.columns:
y_true = df[args.label_col].values
else:
# check label_info from checkpoint dir
label_info_path = os.path.join(ckpt_dir, "label_info.json")
label_info = {}
if os.path.exists(label_info_path):
with open(label_info_path, "r") as f:
label_info = json.load(f)
elif isinstance(meta, dict) and "label_info" in meta:
label_info = meta["label_info"]
if "assignments" in label_info:
y_true = np.array(label_info["assignments"])
elif "assignments_file" in label_info:
try:
import numpy as _np
arr = _np.load(os.path.join(ckpt_dir, label_info["assignments_file"]))
y_true = arr["assignments"]
except Exception:
pass
# prepare features: if preprocess_meta is present use its feature_columns and scaling
if preprocess_meta is not None:
feature_cols = preprocess_meta["feature_columns"]
feature_df = df[feature_cols]
X = feature_df.values.astype(np.float32)
# apply scaling
means = preprocess_meta["means"]
stds = preprocess_meta["stds"]
stds[stds == 0] = 1.0
X = (X - means) / stds
else:
if args.label_col and args.label_col in df.columns:
feature_df = df.drop(columns=[args.label_col])
else:
feature_df = df.select_dtypes(include=[np.number])
X = feature_df.values.astype(np.float32)
# create DataLoader-like batching for inference
model.to(device)
model.eval()
preds = []
with torch.no_grad():
for i in range(0, X.shape[0], args.batch_size):
batch = torch.from_numpy(X[i:i+args.batch_size]).to(device)
out = model(batch) # adapt if your model returns (logits, ...)
if isinstance(out, (tuple, list)):
out = out[0]
probs = F.softmax(out, dim=1) if out.dim() == 2 else out
pred = probs.argmax(dim=1).cpu().numpy()
preds.append(pred)
preds = np.concatenate(preds, axis=0)
if y_true is not None:
acc = accuracy_score(y_true, preds)
print(f"Accuracy: {acc:.4f}")
print("Classification report:")
print(classification_report(y_true, preds, zero_division=0))
else:
print("Predictions computed but no true labels available to compute accuracy.")
print("First 20 predictions:", preds[:20])
if args.plot:
idx = args.sample_index
if idx < 0 or idx >= X.shape[0]:
print("sample-index out of range")
return
sample_x = X[idx]
true_label = y_true[idx] if y_true is not None else None
pred_label = preds[idx]
plot_sample(sample_x, true_label, pred_label)
if __name__ == "__main__":
main()

View File

@@ -1,65 +0,0 @@
"""Fit k-means centers on CSV numeric features (optionally PCA) and save centers to .npz
Usage: python fit_kmeans.py data.csv --n-buckets 10 --out kmeans_centers_final.npz --sample 50000 --pca 50
"""
import argparse
import numpy as np
import pandas as pd
from data import generate_kmeans_labels
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('csv')
parser.add_argument('--n-buckets', type=int, default=10)
parser.add_argument('--out', default='kmeans_centers_final.npz')
parser.add_argument('--sample', type=int, default=50000, help='max rows to sample for fitting')
parser.add_argument('--pca', type=int, default=0, help='Apply PCA to reduce dims before kmeans (0=none)')
args = parser.parse_args()
# read numeric columns only to avoid huge memory usage
df = pd.read_csv(args.csv, low_memory=False)
num_df = df.select_dtypes(include=['number']).fillna(0.0)
data = num_df.values.astype(float)
if data.shape[0] == 0 or data.shape[1] == 0:
raise SystemExit('No numeric data found in CSV')
# sample rows if requested
if args.sample and args.sample < data.shape[0]:
rng = np.random.default_rng(42)
idx = rng.choice(data.shape[0], size=args.sample, replace=False)
sample_data = data[idx]
else:
sample_data = data
# Use the kmeans implementation via generate_kmeans_labels for fitting centers.
# We'll call the internal function by adapting it here: import numpy locally.
import numpy as _np
# initialize centers by random sampling
rng = _np.random.default_rng(42)
k = min(args.n_buckets, sample_data.shape[0])
centers_idx = rng.choice(sample_data.shape[0], size=k, replace=False)
centers = sample_data[centers_idx].astype(float)
max_iters = 50
for _ in range(max_iters):
dists = np.linalg.norm(sample_data[:, None, :] - centers[None, :, :], axis=2)
labels = np.argmin(dists, axis=1)
new_centers = np.zeros_like(centers)
counts = np.zeros((centers.shape[0],), dtype=int)
for i, lab in enumerate(labels):
new_centers[lab] += sample_data[i]
counts[lab] += 1
for kk in range(centers.shape[0]):
if counts[kk] > 0:
new_centers[kk] = new_centers[kk] / counts[kk]
else:
new_centers[kk] = sample_data[rng.integers(0, sample_data.shape[0])]
shift = np.linalg.norm(new_centers - centers, axis=1).max()
centers = new_centers
if shift < 1e-4:
break
np.savez_compressed(args.out, centers=centers)
print('Saved centers to', args.out)

6 binary files changed (contents not shown).

View File

@@ -1,5 +1,60 @@
import torch
import torch.nn as nn
import math
from typing import Union, Iterable
import numpy as np
import torch as _torch
def accidents_to_bucket(count: Union[int, float, Iterable],
max_count: int = 20000,
num_bins: int = 10) -> Union[int, list, _torch.Tensor, np.ndarray]:
"""
Map accident counts to simple buckets 1..num_bins (equal-width).
Example: max_count=20000, num_bins=10 -> bin width = 2000
0-1999 -> 1, 2000-3999 -> 2, ..., 18000-20000 -> 10
Args:
count: single value or iterable (list/numpy/torch). Values <=0 map to 1, values >= max_count map to num_bins.
max_count: expected maximum count (top of highest bin).
num_bins: number of buckets (default 10).
Returns:
Same type as input (int for scalar, list/numpy/torch for iterables) with values in 1..num_bins.
"""
width = max_count / float(num_bins)
def _bucket_scalar(x):
# clamp
x = 0.0 if x is None else float(x)
if x <= 0:
return 1
if x >= max_count:
return num_bins
return int(x // width) + 1
# scalar int/float
if isinstance(count, (int, float)):
return _bucket_scalar(count)
# torch tensor
if isinstance(count, _torch.Tensor):
x = count.clone().float()
x = _torch.clamp(x, min=0.0, max=float(max_count))
buckets = (x // width).to(_torch.long) + 1
buckets = _torch.clamp(buckets, min=1, max=num_bins)
return buckets
# numpy array
if isinstance(count, np.ndarray):
x = np.clip(count.astype(float), 0.0, float(max_count))
buckets = (x // width).astype(int) + 1
return np.clip(buckets, 1, num_bins)
# generic iterable -> list
if isinstance(count, Iterable):
return [ _bucket_scalar(float(x)) for x in count ]
# fallback
return _bucket_scalar(float(count))
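# Illustrative usage of accidents_to_bucket (editor's sketch, not part of the original
# module): with the defaults max_count=20000 and num_bins=10 the bin width is 2000, so
#     accidents_to_bucket(1999)                        -> 1
#     accidents_to_bucket(2000)                        -> 2
#     accidents_to_bucket(25000)                       -> 10  (clamped to the top bin)
#     accidents_to_bucket(np.array([0, 5000, 20000]))  -> array([ 1,  3, 10])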
class SimpleCNN(nn.Module):
@@ -18,7 +73,16 @@ class SimpleCNN(nn.Module):
with torch.no_grad():
dummy = torch.zeros(1, *input_size)
feat = self.features(dummy)
flat_features = int(feat.numel() / feat.shape[0])
# flat_features was previously computed as:
# int(feat.numel() / feat.shape[0])
# Explanation:
# feat.shape == (N, C, H, W) (for image inputs)
# feat.numel() == N * C * H * W
# dividing by N (feat.shape[0]) yields C * H * W, i.e. flattened size per sample
# Clearer alternative using tensor shape:
flat_features = int(torch.prod(torch.tensor(feat.shape[1:])).item())
# If you need the linear index mapping for coordinates (c, h, w):
# idx = c * (H * W) + h * W + w
self.classifier = nn.Sequential(
nn.Flatten(),

View File

@@ -0,0 +1,117 @@
"""OpenWeather / Road Risk client.
Provides:
- fetch_weather(lat, lon, api_key=None)
- fetch_road_risk(lat, lon, api_key=None, roadrisk_url=None, extra_params=None)
Never hardcode API keys in source. Provide via api_key argument or set OPENWEATHER_API_KEY / OPENWEATHER_KEY env var.
"""
import os
from typing import Tuple, Dict, Any, Optional
import requests
def _get_api_key(explicit_key: Optional[str] = None) -> Optional[str]:
if explicit_key:
return explicit_key
return os.environ.get("OPENWEATHER_API_KEY") or os.environ.get("OPENWEATHER_KEY")
BASE_URL = "https://api.openweathermap.org/data/2.5"
def fetch_weather(lat: float, lon: float, params: Optional[dict] = None, api_key: Optional[str] = None) -> dict:
"""Call standard OpenWeather /weather endpoint and return parsed JSON."""
key = _get_api_key(api_key)
if key is None:
raise RuntimeError("Set OPENWEATHER_API_KEY or OPENWEATHER_KEY or pass api_key")
q = {"lat": lat, "lon": lon, "appid": key, "units": "metric"}
if params:
q.update(params)
resp = requests.get(f"{BASE_URL}/weather", params=q, timeout=10)
resp.raise_for_status()
return resp.json()
def fetch_road_risk(lat: float, lon: float, extra_params: Optional[dict] = None, api_key: Optional[str] = None, roadrisk_url: Optional[str] = None) -> Tuple[dict, Dict[str, Any]]:
"""
Call OpenWeather /roadrisk endpoint (or provided roadrisk_url) and return (raw_json, features).
features will always include 'road_risk_score' (float). Other numeric fields are included when present.
The implementation:
- prefers explicit numeric keys (road_risk_score, risk_score, score, risk)
- if absent, collects top-level numeric fields and averages common contributors
- if still absent, falls back to a simple weather-derived heuristic using /weather
Note: Do not commit API keys. Pass api_key or set env var.
"""
key = _get_api_key(api_key)
if key is None:
raise RuntimeError("Set OPENWEATHER_API_KEY or OPENWEATHER_KEY or pass api_key")
params = {"lat": lat, "lon": lon, "appid": key}
if extra_params:
params.update(extra_params)
url = roadrisk_url or f"{BASE_URL}/roadrisk"
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
features: Dict[str, Any] = {}
risk: Optional[float] = None
# direct candidates
for candidate in ("road_risk_score", "risk_score", "risk", "score"):
if isinstance(data, dict) and candidate in data:
try:
risk = float(data[candidate])
features[candidate] = risk
break
except Exception:
pass
# if no direct candidate, collect numeric top-level fields
if risk is None and isinstance(data, dict):
numeric_fields = {}
for k, v in data.items():
if isinstance(v, (int, float)):
numeric_fields[k] = float(v)
features.update(numeric_fields)
# try averaging common contributors if present
contributors = []
for name in ("precipitation", "rain", "snow", "visibility", "wind_speed"):
if name in data and isinstance(data[name], (int, float)):
contributors.append(float(data[name]))
if contributors:
# average contributors -> risk proxy
risk = float(sum(contributors) / len(contributors))
# fallback: derive crude risk from /weather
if risk is None:
try:
w = fetch_weather(lat, lon, api_key=key)
main = w.get("main", {})
wind = w.get("wind", {})
weather = w.get("weather", [{}])[0]
# heuristic: rain + high wind + low visibility
derived = 0.0
if isinstance(weather.get("main", ""), str) and "rain" in weather.get("main", "").lower():
derived += 1.0
if (wind.get("speed") or 0) > 6.0:
derived += 0.5
if (w.get("visibility") or 10000) < 5000:
derived += 1.0
risk = float(derived)
features.update({
"temp": main.get("temp"),
"humidity": main.get("humidity"),
"wind_speed": wind.get("speed"),
"visibility": w.get("visibility"),
"weather_main": weather.get("main"),
"weather_id": weather.get("id"),
})
except Exception:
# cannot derive anything; set neutral 0.0
risk = 0.0
features["road_risk_score"] = float(risk)
return data, features
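
A short usage sketch for the client above (not part of this commit); it assumes OPENWEATHER_API_KEY is set in the environment and that the Road Risk endpoint is reachable:

from openweather_client import fetch_road_risk

# api_key is omitted, so the key is read from OPENWEATHER_API_KEY / OPENWEATHER_KEY
raw, features = fetch_road_risk(38.9, -77.0)
print("road_risk_score:", features["road_risk_score"])
print("other features:", {k: v for k, v in features.items() if k != "road_risk_score"})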

View File

@@ -1,113 +0,0 @@
import os
import argparse
import pandas as pd
import numpy as np
import time
import openweather_inference as owi
def find_column(df_cols, candidates):
cmap = {c.lower(): c for c in df_cols}
for cand in candidates:
if cand.lower() in cmap:
return cmap[cand.lower()]
return None
def main():
parser = argparse.ArgumentParser()
parser.add_argument('csv', help='Path to data CSV (e.g., data.csv)')
parser.add_argument('--out', default='inference_results.csv')
parser.add_argument('--lat-col', default=None)
parser.add_argument('--lon-col', default=None)
parser.add_argument('--date-col', default=None)
parser.add_argument('--model', default='model.pth')
parser.add_argument('--centers', default='kmeans_centers_all.npz')
parser.add_argument('--preprocess-meta', default=None)
parser.add_argument('--api-key', default=None)
parser.add_argument('--live', action='store_true', help='If set, call external RoadRisk/OpenWeather per row')
parser.add_argument('--roadrisk-url', default=None, help='Optional per-request RoadRisk URL to use when --live')
parser.add_argument('--subset', type=int, default=0, help='Process only first N rows for testing')
args = parser.parse_args()
df = pd.read_csv(args.csv, low_memory=False)
nrows = args.subset if args.subset and args.subset > 0 else len(df)
df = df.iloc[:nrows].copy()
# find sensible columns
lat_col = args.lat_col or find_column(df.columns, ['latitude', 'lat', 'mpdlatitude'])
lon_col = args.lon_col or find_column(df.columns, ['longitude', 'lon', 'mpdlongitude'])
date_col = args.date_col or find_column(df.columns, ['report_dat', 'reportdate', 'fromdate', 'lastupdatedate', 'date', 'occur_date'])
if lat_col is None or lon_col is None:
raise SystemExit('Could not find latitude/longitude columns automatically. Pass --lat-col and --lon-col.')
print(f'Using lat column: {lat_col}, lon column: {lon_col}, date column: {date_col}')
# eager init caches
status = owi.init_inference(model_path=args.model, centers_path=args.centers, preprocess_meta=args.preprocess_meta)
print('init status:', status)
results = []
t0 = time.time()
for i, row in df.iterrows():
lat = row.get(lat_col)
lon = row.get(lon_col)
dt = row.get(date_col) if date_col else None
try:
if args.live:
# call the full pipeline which may hit remote API
out = owi.predict_from_openweather(lat, lon, dt_iso=dt, street=None, api_key=args.api_key, train_csv=None, preprocess_meta=args.preprocess_meta, model_path=args.model, centers_path=args.centers, roadrisk_url=args.roadrisk_url)
else:
# local-only path: build row, prepare features using preprocess_meta, and run cached model
df_row = owi.build_row(lat, lon, dt_iso=dt, street=None, extra_weather=None)
x_tensor, feature_columns = owi.prepare_features(df_row, train_csv=None, preprocess_meta=args.preprocess_meta)
# ensure model cached
if owi._CACHED_MODEL is None:
owi.init_inference(model_path=args.model, centers_path=args.centers, preprocess_meta=args.preprocess_meta)
model = owi._CACHED_MODEL
centers = owi._CACHED_CENTERS
device = 'cuda' if __import__('torch').cuda.is_available() else 'cpu'
model.to(device)
xt = x_tensor.to(device)
import torch
import torch.nn.functional as F
with torch.no_grad():
logits = model(xt)
probs = F.softmax(logits, dim=1).cpu().numpy()[0]
pred_idx = int(probs.argmax())
confidence = float(probs.max())
out = {'pred_cluster': pred_idx, 'confidence': confidence, 'probabilities': probs.tolist(), 'centroid': centers[pred_idx].tolist() if centers is not None else None, 'feature_columns': feature_columns}
except Exception as e:
out = {'error': str(e)}
# combine row and output into flat result
res = {
'orig_index': i,
'lat': lat,
'lon': lon,
'datetime': str(dt),
}
if 'error' in out:
res.update({'error': out['error']})
else:
res.update({
'pred_cluster': int(out.get('pred_cluster')),
'confidence': float(out.get('confidence')),
})
results.append(res)
if (len(results) % 50) == 0:
print(f'Processed {len(results)}/{nrows} rows...')
elapsed = time.time() - t0
print(f'Finished {len(results)} rows in {elapsed:.2f}s')
out_df = pd.DataFrame(results)
out_df.to_csv(args.out, index=False)
print('Wrote', args.out)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Run from repo root. Make sure your Flask app is running (python app.py) first.
# Export your OpenWeather key (do NOT commit it into code):
# export OPENWEATHER_API_KEY="your_real_key_here"
HOST=${HOST:-http://127.0.0.1:5000}
echo "Test 1: env var key (no explicit api_key in payload)"
curl -s -X POST ${HOST}/predict-roadrisk -H "Content-Type: application/json" -d '{"lat":38.9,"lon":-77.0}' | jq
echo "Test 2: explicit api_key in payload (overrides env var)"
curl -s -X POST ${HOST}/predict-roadrisk -H "Content-Type: application/json" -d '{"lat":38.9,"lon":-77.0,"api_key":"MY_OVERRIDE_KEY"}' | jq
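# Not part of the original script: a third, hypothetical test showing that an optional
# roadrisk_url in the payload is forwarded to the inference helper (the URL is a placeholder).
echo "Test 3: custom roadrisk_url in payload"
curl -s -X POST ${HOST}/predict-roadrisk -H "Content-Type: application/json" -d '{"lat":38.9,"lon":-77.0,"roadrisk_url":"https://example.com/roadrisk"}' | jq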

View File

@@ -0,0 +1,70 @@
import os
import builtins
import pytest
from unittest.mock import patch, MagicMock
from openweather_client import fetch_road_risk, fetch_weather, _get_api_key
class DummyResp:
def __init__(self, json_data, status=200):
self._json = json_data
self.status = status
def raise_for_status(self):
if self.status >= 400:
raise Exception("HTTP error")
def json(self):
return self._json
def make_get(mock_json, status=200):
return MagicMock(return_value=DummyResp(mock_json, status=status))
def test_get_api_key_prefers_explicit(monkeypatch):
# explicit key should be returned
assert _get_api_key("EXPLICIT") == "EXPLICIT"
monkeypatch.delenv("OPENWEATHER_API_KEY", raising=False)
monkeypatch.delenv("OPENWEATHER_KEY", raising=False)
assert _get_api_key(None) is None
def test_fetch_road_risk_direct_score(monkeypatch):
# roadrisk returns direct numeric field
resp_json = {"road_risk_score": 2.5, "detail": "ok"}
with patch("openweather_client.requests.get", make_get(resp_json)):
data, features = fetch_road_risk(1.0, 2.0, api_key="TESTKEY")
assert data["road_risk_score"] == 2.5
assert features["road_risk_score"] == 2.5
def test_fetch_road_risk_numeric_fields(monkeypatch):
# roadrisk returns top-level numeric fields, expect average of contributors
resp_json = {"precipitation": 4.0, "visibility": 2000, "other": 3}
with patch("openweather_client.requests.get", make_get(resp_json)):
data, features = fetch_road_risk(1.0, 2.0, api_key="TESTKEY")
# contributors list contains precipitation and visibility and maybe others; check road_risk_score numeric
assert "road_risk_score" in features
assert features["road_risk_score"] > 0
def test_fetch_road_risk_fallback_to_weather(monkeypatch):
# roadrisk returns empty dict; requests.get called first for roadrisk then for weather.
seq = [
DummyResp({}, status=200), # roadrisk empty
DummyResp({
"main": {"temp": 10, "humidity": 80},
"wind": {"speed": 7.5},
"visibility": 3000,
"weather": [{"main": "Rain"}]
}, status=200)
]
# iterate returns next DummyResp
def side_effect(url, params=None, timeout=None):
return seq.pop(0)
with patch("openweather_client.requests.get", side_effect=side_effect):
data, features = fetch_road_risk(1.0, 2.0, api_key="TESTKEY")
# derived heuristic: rain=1.0 + wind>6 => 0.5 + visibility<5000 =>1.0 => total 2.5
assert abs(features["road_risk_score"] - 2.5) < 1e-6
assert features["weather_main"] == "Rain" or features.get("weather_main") == "Rain"
def test_fetch_road_risk_missing_api_key(monkeypatch):
# ensure no env var present
monkeypatch.delenv("OPENWEATHER_API_KEY", raising=False)
monkeypatch.delenv("OPENWEATHER_KEY", raising=False)
with pytest.raises(RuntimeError):
fetch_road_risk(1.0, 2.0)

View File

@@ -0,0 +1,45 @@
import json
import pytest
from unittest.mock import patch
def test_predict_roadrisk_forwards_api_key(monkeypatch):
# import app after monkeypatching env to avoid side effects
from app import app
client = app.test_client()
fake_response = {"road_risk": "ok", "risk_index": 5}
# patch the inference function to capture args and return a dummy response
with patch("openweather_inference.predict_from_openweather") as mock_predict:
mock_predict.return_value = fake_response
payload = {"lat": 38.9, "lon": -77.0, "api_key": "EXPLICIT_TEST_KEY"}
rv = client.post("/predict-roadrisk", data=json.dumps(payload), content_type="application/json")
assert rv.status_code == 200
data = rv.get_json()
assert data == fake_response
# assert that our mocked predict_from_openweather was called and api_key forwarded
assert mock_predict.called
_, called_kwargs = mock_predict.call_args
assert called_kwargs.get("api_key") == "EXPLICIT_TEST_KEY"
def test_predict_roadrisk_uses_env_key_when_not_provided(monkeypatch):
from app import app
client = app.test_client()
fake_response = {"road_risk": "ok", "risk_index": 3}
monkeypatch.setenv("OPENWEATHER_API_KEY", "ENV_TEST_KEY")
with patch("openweather_inference.predict_from_openweather") as mock_predict:
mock_predict.return_value = fake_response
payload = {"lat": 38.9, "lon": -77.0} # no api_key in payload
rv = client.post("/predict-roadrisk", data=json.dumps(payload), content_type="application/json")
assert rv.status_code == 200
data = rv.get_json()
assert data == fake_response
assert mock_predict.called
_, called_kwargs = mock_predict.call_args
assert called_kwargs.get("api_key") == "ENV_TEST_KEY"

View File

@@ -1,11 +0,0 @@
orig_index,lat,lon,datetime,error
0,38.91557,-77.031697,2011/03/06 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
1,38.875558,-77.017556,2011/03/06 08:45:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
2,38.872976,-77.016987,2011/03/05 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
3,38.929433,-77.003943,2011/03/08 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
4,38.89674,-77.027034,2011/03/08 17:18:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
5,38.89093,-76.993494,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
6,38.908478,-77.040086,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
7,38.846563,-76.976504,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
8,38.894783,-77.01292,2011/03/12 18:30:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats
9,38.934204,-77.034567,2011/03/14 04:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats

View File

@@ -9,8 +9,10 @@ from data import ImageFolderDataset, CSVDataset
from models import create_model
def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_classes=10, model_type='cnn', csv_label='label', generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None, seed=42, hidden_dims=None, weight_decay=0.0):
def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_classes=10, model_type='mlp', csv_label='label', generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None, seed=42, hidden_dims=None, weight_decay=0.0, output_dir=None):
device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
output_dir = output_dir or os.getcwd()
os.makedirs(output_dir, exist_ok=True)
# Detect CSV vs folder dataset
if os.path.isfile(dataset_root) and dataset_root.lower().endswith('.csv'):
dataset = CSVDataset(dataset_root,
@@ -45,7 +47,7 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class
# persist preprocessing metadata so inference can reuse identical stats
try:
import numpy as _np
meta_path = os.path.join(os.getcwd(), 'preprocess_meta.npz')
meta_path = os.path.join(output_dir, 'preprocess_meta.npz')
_np.savez_compressed(meta_path, feature_columns=_np.array(dataset.feature_columns, dtype=object), means=dataset.feature_means, stds=dataset.feature_stds)
print(f'Saved preprocess meta to {meta_path}')
except Exception:
@@ -60,6 +62,35 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class
model_num_classes = n_buckets
else:
model_num_classes = n_buckets if generate_labels else num_classes
# If labels were generated, save label metadata + assignments (if not huge)
if generate_labels:
try:
label_info = {
"generated": True,
"label_method": label_method,
"n_buckets": n_buckets,
}
# save per-sample assignments if dataset exposes them
if hasattr(dataset, "labels"):
try:
# convert to list (JSON serializable)
assignments = dataset.labels.cpu().numpy().tolist() if hasattr(dataset.labels, "cpu") else dataset.labels.tolist()
# if too large, save as .npz instead
if len(assignments) <= 100000:
label_info["assignments"] = assignments
else:
import numpy as _np
arr_path = os.path.join(output_dir, "label_assignments.npz")
_np.savez_compressed(arr_path, assignments=_np.array(assignments))
label_info["assignments_file"] = os.path.basename(arr_path)
except Exception:
pass
import json
with open(os.path.join(output_dir, "label_info.json"), "w") as f:
json.dump(label_info, f)
print(f"Saved label_info to {os.path.join(output_dir, 'label_info.json')}")
except Exception:
pass
# parse hidden_dims if provided by caller (tuple or list)
model = create_model(device=device, model_type='mlp', input_dim=input_dim, num_classes=model_num_classes, hidden_dims=hidden_dims)
else:
@@ -112,11 +143,23 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class
# save best
if val_acc > best_val_acc:
out_path = os.path.join(os.getcwd(), 'model.pth')
out_path = os.path.join(output_dir, 'model.pth')
# include useful metadata so evaluator can reconstruct
meta = {
'model_state_dict': model.state_dict(),
'model_type': model_type,
'model_config': {
'input_dim': input_dim if model_type == 'mlp' else None,
'num_classes': model_num_classes,
'hidden_dims': hidden_dims,
}
}
if hasattr(dataset, 'class_to_idx'):
meta = {'model_state_dict': model.state_dict(), 'class_to_idx': dataset.class_to_idx}
else:
meta = {'model_state_dict': model.state_dict()}
meta['class_to_idx'] = dataset.class_to_idx
# also record paths to saved preprocess and label info (if present)
meta['preprocess_meta'] = os.path.basename(os.path.join(output_dir, 'preprocess_meta.npz'))
if os.path.exists(os.path.join(output_dir, 'label_info.json')):
meta['label_info'] = json.load(open(os.path.join(output_dir, 'label_info.json'), 'r'))
torch.save(meta, out_path)
best_val_acc = val_acc
best_path = out_path
@@ -127,6 +170,7 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class
if __name__ == '__main__':
import argparse
import json
parser = argparse.ArgumentParser()
parser.add_argument('data_root')
parser.add_argument('--epochs', type=int, default=3)
@@ -144,6 +188,7 @@ if __name__ == '__main__':
parser.add_argument('--seed', type=int, default=42, help='Random seed for experiments')
parser.add_argument('--hidden-dims', type=str, default='', help='Comma-separated hidden dims for MLP, e.g. "256,128"')
parser.add_argument('--weight-decay', type=float, default=0.0, help='Weight decay (L2) for optimizer')
parser.add_argument('--output-dir', default='.', help='Directory to save output files')
args = parser.parse_args()
data_root = args.data_root
nrows = args.subset if args.subset > 0 else None
@@ -154,5 +199,14 @@ if __name__ == '__main__':
hidden_dims = tuple(int(x) for x in args.hidden_dims.split(',') if x.strip())
except Exception:
hidden_dims = None
train(data_root, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, model_type=args.model_type, csv_label=args.csv_label, generate_labels=args.generate_labels, n_buckets=args.n_buckets, label_method=args.label_method, label_store=args.label_store, feature_engineer=args.feature_engineer, lat_lon_bins=args.lat_lon_bins, nrows=nrows, seed=args.seed, hidden_dims=hidden_dims, weight_decay=args.weight_decay)
if args.generate_labels:
os.makedirs(args.output_dir, exist_ok=True)
label_info = {
"generated": True,
"label_method": args.label_method,
"n_buckets": args.n_buckets,
}
with open(os.path.join(args.output_dir, "label_info.json"), "w") as f:
json.dump(label_info, f)
train(data_root, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, model_type=args.model_type, csv_label=args.csv_label, generate_labels=args.generate_labels, n_buckets=args.n_buckets, label_method=args.label_method, label_store=args.label_store, feature_engineer=args.feature_engineer, lat_lon_bins=args.lat_lon_bins, nrows=nrows, seed=args.seed, hidden_dims=hidden_dims, weight_decay=args.weight_decay, output_dir=args.output_dir)
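
For reference, a short sketch (not part of the diff) of reading back the checkpoint metadata saved above, assuming training ran with --output-dir output/ and that models.create_model accepts the keyword arguments used elsewhere in this commit:

import torch
from models import create_model

ckpt = torch.load("output/model.pth", map_location="cpu")
cfg = ckpt["model_config"]  # {'input_dim': ..., 'num_classes': ..., 'hidden_dims': ...}
model = create_model(device="cpu", model_type="mlp", input_dim=cfg["input_dim"], num_classes=cfg["num_classes"], hidden_dims=cfg["hidden_dims"])
model.load_state_dict(ckpt["model_state_dict"])
model.eval()
print("label_info:", ckpt.get("label_info"))
print("preprocess meta file:", ckpt.get("preprocess_meta"))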