diff --git a/roadcast/.gitignore b/roadcast/.gitignore new file mode 100644 index 0000000..7b4b416 --- /dev/null +++ b/roadcast/.gitignore @@ -0,0 +1,29 @@ +# Generated model checkpoints and metadata +model.pth +*.pt +*.pth +preprocess_meta.npz +tmp_infer.csv +label_info.json +label_assignments.npz +# Saved label stores (kmeans etc.) +*.npz +kmeans_centers*.npz + +# Ignore training output directories +output/ +checkpoints/ + +# Python caches and virtualenvs +__pycache__/ +*.py[cod] +.env/ +.venv/ + +# Test and notebook caches +.pytest_cache/ +.ipynb_checkpoints/ + +# OS files +.DS_Store +Thumbs.db diff --git a/roadcast/KEEP_FILES.txt b/roadcast/KEEP_FILES.txt new file mode 100644 index 0000000..e69de29 diff --git a/roadcast/app.py b/roadcast/app.py index 38d3889..9b1be8a 100644 --- a/roadcast/app.py +++ b/roadcast/app.py @@ -80,14 +80,26 @@ def predict_roadrisk(): dt = payload.get('datetime') street = payload.get('street', '') roadrisk_url = payload.get('roadrisk_url') - api_key = payload.get('api_key') or os.environ.get('OPENWEATHER_KEY') + # prefer explicit api_key in request, otherwise read from OPENWEATHER_API_KEY env var + api_key = payload.get('api_key') or os.environ.get('OPENWEATHER_API_KEY') if lat is None or lon is None: return jsonify({"error": "lat and lon are required fields"}), 400 try: from openweather_inference import predict_from_openweather - res = predict_from_openweather(lat, lon, dt_iso=dt, street=street, api_key=api_key, train_csv=os.path.join(os.getcwd(), 'data.csv'), preprocess_meta=None, model_path=os.path.join(os.getcwd(), 'model.pth'), centers_path=os.path.join(os.getcwd(), 'kmeans_centers_all.npz'), roadrisk_url=roadrisk_url) + # pass api_key (may be None) to the inference helper; helper will raise if a key is required + res = predict_from_openweather( + lat, lon, + dt_iso=dt, + street=street, + api_key=api_key, + train_csv=os.path.join(os.getcwd(), 'data.csv'), + preprocess_meta=None, + model_path=os.path.join(os.getcwd(), 'model.pth'), + centers_path=os.path.join(os.getcwd(), 'kmeans_centers_all.npz'), + roadrisk_url=roadrisk_url + ) return jsonify(res) except Exception as e: return jsonify({"error": str(e)}), 500 diff --git a/roadcast/check_env.py b/roadcast/check_env.py deleted file mode 100644 index 5fd7b84..0000000 --- a/roadcast/check_env.py +++ /dev/null @@ -1,32 +0,0 @@ -import sys -import importlib - -def safe_import(name): - try: - return importlib.import_module(name) - except Exception as e: - return e - -print('Python:', sys.version.replace('\n',' ')) - -torch = safe_import('torch') -if isinstance(torch, Exception): - print('torch import error:', torch) -else: - print('torch:', torch.__version__) - print('CUDA available:', torch.cuda.is_available()) - if torch.cuda.is_available(): - print('CUDA device count:', torch.cuda.device_count()) - print('Current device name:', torch.cuda.get_device_name(0)) - -pandas = safe_import('pandas') -if isinstance(pandas, Exception): - print('pandas import error:', pandas) -else: - print('pandas:', pandas.__version__) - -try: - import sklearn - print('sklearn:', sklearn.__version__) -except Exception: - pass diff --git a/roadcast/command.txt b/roadcast/command.txt index b36db8e..a6e534c 100644 --- a/roadcast/command.txt +++ b/roadcast/command.txt @@ -1,4 +1,26 @@ train the model: python train.py data.csv --model-type mlp --generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 512,256 --epochs 8 --batch-size 256 --feature-engineer --weight-decay 1e-5 --seed 42 -python train.py data.csv --model-type mlp 
--generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 1024,512 --epochs 12 --batch-size 256 --lr 1e-3 --lr-step-size 4 --lr-gamma 0.5 --feature-engineer --weight-decay 1e-5 --seed 42 \ No newline at end of file +python train.py data.csv --model-type mlp --generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 1024,512 --epochs 12 --batch-size 256 --lr 1e-3 --lr-step-size 4 --lr-gamma 0.5 --feature-engineer --weight-decay 1e-5 --seed 42 + +# train with outputs saved to output/ +python train.py data.csv --model-type mlp --generate-labels --label-method kmeans --n-buckets 50 --hidden-dims 512,256 --epochs 8 --batch-size 256 --feature-engineer --weight-decay 1e-5 --seed 42 --output-dir output/ + +# evaluate and visualize: +python evaluate_and_visualize.py \ + --checkpoint path/to/checkpoint.pt \ + --data data.csv \ + --label-col original_label_column_name \ + --batch-size 256 \ + --sample-index 5 \ + --plot + +# evaluate +python evaluate_and_visualize.py --checkpoint output/model.pth --data data.csv --label-col label --plot --sample-index 5 + +# If you used generated labels during training and train.py saved metadata, +# the evaluator will prefer generated labels saved in label_info.json inside the checkpoint dir. + +# fetch weather (placeholder) +# from openweather_client import fetch_road_risk +# print(fetch_road_risk(37.7749, -122.4194)) \ No newline at end of file diff --git a/roadcast/debug_labels.py b/roadcast/debug_labels.py deleted file mode 100644 index 7529efd..0000000 --- a/roadcast/debug_labels.py +++ /dev/null @@ -1,76 +0,0 @@ -import pandas as pd -import hashlib -from data import _normalize_str, _normalize_date - -p='data.csv' -df=pd.read_csv(p, nrows=50, low_memory=False) -print('Columns:', list(df.columns)) - -colmap = {c.lower(): c for c in df.columns} - -def get_col(*candidates): - for cand in candidates: - key = cand.lower() - if key in colmap: - return colmap[key] - return None - -report_col = get_col('report_dat', 'reportdate', 'fromdate', 'lastupdatedate') -lat_col = get_col('latitude', 'mpdlatitude', 'lat') -lon_col = get_col('longitude', 'mpdlongitude', 'lon') -street1_col = get_col('street1', 'address', 'mar_address', 'nearestintstreetname') -street2_col = get_col('street2', 'nearestintstreetname') -ward_col = get_col('ward') -inj_cols = [c for c in df.columns if 'INJUR' in c.upper()] -fat_cols = [c for c in df.columns if 'FATAL' in c.upper()] -uid = get_col('crimeid', 'eventid', 'objectid', 'ccn') - -print('Resolved columns:') -print('report_col=', report_col) -print('lat_col=', lat_col) -print('lon_col=', lon_col) -print('street1_col=', street1_col) -print('street2_col=', street2_col) -print('ward_col=', ward_col) -print('inj_cols=', inj_cols[:10]) -print('fat_cols=', fat_cols[:10]) -print('uid=', uid) - -for i, row in df.iterrows(): - parts = [] - parts.append(_normalize_date(row.get(report_col, '') if report_col else '')) - lat = row.get(lat_col, '') if lat_col else '' - lon = row.get(lon_col, '') if lon_col else '' - try: - parts.append(str(round(float(lat), 5)) if pd.notna(lat) and lat != '' else '') - except Exception: - parts.append('') - try: - parts.append(str(round(float(lon), 5)) if pd.notna(lon) and lon != '' else '') - except Exception: - parts.append('') - parts.append(_normalize_str(row.get(street1_col, '') if street1_col else '')) - parts.append(_normalize_str(row.get(street2_col, '') if street2_col else '')) - parts.append(_normalize_str(row.get(ward_col, '') if ward_col else '')) - inj_sum = 0 - for c in inj_cols: - try: - 
v = row.get(c, 0) - inj_sum += int(v) if pd.notna(v) and v != '' else 0 - except Exception: - pass - parts.append(str(inj_sum)) - fat_sum = 0 - for c in fat_cols: - try: - v = row.get(c, 0) - fat_sum += int(v) if pd.notna(v) and v != '' else 0 - except Exception: - pass - parts.append(str(fat_sum)) - if uid: - parts.append(str(row.get(uid, ''))) - s='|'.join(parts) - h=hashlib.md5(s.encode('utf-8')).hexdigest() - val=int(h,16)%100 - print(i, 'label=', val, 's="'+s+'"') diff --git a/roadcast/evaluate_and_visualize.py b/roadcast/evaluate_and_visualize.py new file mode 100644 index 0000000..efbae0e --- /dev/null +++ b/roadcast/evaluate_and_visualize.py @@ -0,0 +1,193 @@ +import argparse +import json +import os +import numpy as np +import pandas as pd +import torch +import torch.nn.functional as F +from sklearn.metrics import accuracy_score, classification_report +import matplotlib.pyplot as plt + +# Minimal helper: try to reconstruct the model if checkpoint stores config, else attempt full-model load. +def load_checkpoint(checkpoint_path, model_builder=None, device="cpu"): + ckpt = torch.load(checkpoint_path, map_location=device) + # if checkpoint contains state_dict + model_config, try to rebuild using models.create_model + if isinstance(ckpt, dict) and "model_state_dict" in ckpt: + builder = model_builder + if builder is None: + try: + from models import create_model as _create_model + builder = lambda cfg: _create_model(device=device, model_type=cfg.get("model_type", "mlp") if "model_type" in cfg else "mlp", input_dim=cfg.get("input_dim"), num_classes=cfg.get("num_classes"), hidden_dims=cfg.get("hidden_dims")) + except Exception: + builder = None + if builder is not None and "model_config" in ckpt: + model = builder(ckpt.get("model_config", {})) + model.load_state_dict(ckpt["model_state_dict"]) + model.to(device).eval() + meta = {k: v for k, v in ckpt.items() if k not in ("model_state_dict",)} + return model, meta + else: + # try to load full model object or raise + try: + model = ckpt + model.to(device).eval() + return model, {} + except Exception: + raise RuntimeError("Checkpoint contains model_state_dict but cannot reconstruct model; provide model_builder.") + else: + # maybe the full model object was saved + try: + model = ckpt + model.to(device).eval() + return model, {} + except Exception as e: + raise RuntimeError(f"Can't load checkpoint automatically: {e}") + +def prepare_features(df, feature_cols=None): + if feature_cols is None: + # assume all columns except label are features + return df.drop(columns=[c for c in df.columns if c.endswith("label")], errors='ignore').values.astype(np.float32) + return df[feature_cols].values.astype(np.float32) + +def plot_sample(x, true_label, pred_label): + x = np.asarray(x) + title = f"true: {true_label} pred: {pred_label}" + if x.ndim == 1: + n = x.size + sq = int(np.sqrt(n)) + if sq * sq == n: + plt.imshow(x.reshape(sq, sq), cmap="gray") + plt.title(title) + plt.axis("off") + plt.show() + return + if x.size <= 3: + plt.bar(range(x.size), x) + plt.title(title) + plt.show() + return + # fallback: plot first 200 dims as line + plt.plot(x[:200]) + plt.title(title + " (first 200 dims)") + plt.show() + return + elif x.ndim == 2: + plt.imshow(x, aspect='auto') + plt.title(title) + plt.show() + return + else: + print("Sample too high-dim to plot, printing summary:") + print("mean", x.mean(), "std", x.std()) + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--checkpoint", required=True, help="Path to saved checkpoint (.pt)") + 
    p.add_argument("--data", required=True, help="CSV with features and optional label column")
+    p.add_argument("--label-col", default=None, help="Original label column name in CSV (if present)")
+    p.add_argument("--batch-size", type=int, default=256)
+    p.add_argument("--sample-index", type=int, default=0, help="Index of a sample to plot")
+    p.add_argument("--plot", action="store_true")
+    p.add_argument("--device", default="cpu")
+    args = p.parse_args()
+
+    device = args.device
+    # If your project has a known model class, replace model_builder with a lambda that instantiates it.
+    model_builder = None
+
+    # load checkpoint
+    model, meta = load_checkpoint(args.checkpoint, model_builder=model_builder, device=device)
+
+    # load the evaluation CSV (features plus optional label column)
+    df = pd.read_csv(args.data, low_memory=False)
+
+    # try to discover preprocess_meta and label_info
+    ckpt_dir = os.path.dirname(args.checkpoint)
+    preprocess_meta = None
+    meta_preprocess_path = os.path.join(ckpt_dir, meta.get("preprocess_meta", "")) if isinstance(meta, dict) else None
+    if meta_preprocess_path and os.path.exists(meta_preprocess_path):
+        try:
+            import numpy as _np
+            d = _np.load(meta_preprocess_path, allow_pickle=True)
+            preprocess_meta = {
+                "feature_columns": [str(x) for x in d["feature_columns"].tolist()],
+                "means": d["means"].astype(np.float32),
+                "stds": d["stds"].astype(np.float32),
+            }
+            print(f"Loaded preprocess meta from {meta_preprocess_path}")
+        except Exception:
+            preprocess_meta = None
+
+    # prefer label_col from CSV, otherwise load saved assignments if present
+    y_true = None
+    if args.label_col and args.label_col in df.columns:
+        y_true = df[args.label_col].values
+    else:
+        # check label_info from checkpoint dir
+        label_info_path = os.path.join(ckpt_dir, "label_info.json")
+        label_info = {}
+        if os.path.exists(label_info_path):
+            with open(label_info_path, "r") as f:
+                label_info = json.load(f)
+        elif isinstance(meta, dict) and "label_info" in meta:
+            label_info = meta["label_info"]
+        if "assignments" in label_info:
+            y_true = np.array(label_info["assignments"])
+        elif "assignments_file" in label_info:
+            try:
+                import numpy as _np
+                arr = _np.load(os.path.join(ckpt_dir, label_info["assignments_file"]))
+                y_true = arr["assignments"]
+            except Exception:
+                pass
+
+    # prepare features: if preprocess_meta is present use its feature_columns and scaling
+    if preprocess_meta is not None:
+        feature_cols = preprocess_meta["feature_columns"]
+        feature_df = df[feature_cols]
+        X = feature_df.values.astype(np.float32)
+        # apply scaling
+        means = preprocess_meta["means"]
+        stds = preprocess_meta["stds"]
+        stds[stds == 0] = 1.0
+        X = (X - means) / stds
+    else:
+        if args.label_col and args.label_col in df.columns:
+            feature_df = df.drop(columns=[args.label_col])
+        else:
+            feature_df = df.select_dtypes(include=[np.number])
+        X = feature_df.values.astype(np.float32)
+
+    # create DataLoader-like batching for inference
+    model.to(device)
+    model.eval()
+    preds = []
+    with torch.no_grad():
+        for i in range(0, X.shape[0], args.batch_size):
+            batch = torch.from_numpy(X[i:i+args.batch_size]).to(device)
+            out = model(batch)  # adapt if your model returns (logits, ...)
+ if isinstance(out, (tuple, list)): + out = out[0] + probs = F.softmax(out, dim=1) if out.dim() == 2 else out + pred = probs.argmax(dim=1).cpu().numpy() + preds.append(pred) + preds = np.concatenate(preds, axis=0) + + if y_true is not None: + acc = accuracy_score(y_true, preds) + print(f"Accuracy: {acc:.4f}") + print("Classification report:") + print(classification_report(y_true, preds, zero_division=0)) + else: + print("Predictions computed but no true labels available to compute accuracy.") + print("First 20 predictions:", preds[:20]) + + if args.plot: + idx = args.sample_index + if idx < 0 or idx >= X.shape[0]: + print("sample-index out of range") + return + sample_x = X[idx] + true_label = y_true[idx] if y_true is not None else None + pred_label = preds[idx] + plot_sample(sample_x, true_label, pred_label) + +if __name__ == "__main__": + main() diff --git a/roadcast/fit_kmeans.py b/roadcast/fit_kmeans.py deleted file mode 100644 index 0a43828..0000000 --- a/roadcast/fit_kmeans.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Fit k-means centers on CSV numeric features (optionally PCA) and save centers to .npz - -Usage: python fit_kmeans.py data.csv --n-buckets 10 --out kmeans_centers_final.npz --sample 50000 --pca 50 -""" -import argparse -import numpy as np -import pandas as pd - -from data import generate_kmeans_labels - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('csv') - parser.add_argument('--n-buckets', type=int, default=10) - parser.add_argument('--out', default='kmeans_centers_final.npz') - parser.add_argument('--sample', type=int, default=50000, help='max rows to sample for fitting') - parser.add_argument('--pca', type=int, default=0, help='Apply PCA to reduce dims before kmeans (0=none)') - args = parser.parse_args() - - # read numeric columns only to avoid huge memory usage - df = pd.read_csv(args.csv, low_memory=False) - num_df = df.select_dtypes(include=['number']).fillna(0.0) - data = num_df.values.astype(float) - if data.shape[0] == 0 or data.shape[1] == 0: - raise SystemExit('No numeric data found in CSV') - - # sample rows if requested - if args.sample and args.sample < data.shape[0]: - rng = np.random.default_rng(42) - idx = rng.choice(data.shape[0], size=args.sample, replace=False) - sample_data = data[idx] - else: - sample_data = data - - # Use the kmeans implementation via generate_kmeans_labels for fitting centers. - # We'll call the internal function by adapting it here: import numpy locally. 
- import numpy as _np - - # initialize centers by random sampling - rng = _np.random.default_rng(42) - k = min(args.n_buckets, sample_data.shape[0]) - centers_idx = rng.choice(sample_data.shape[0], size=k, replace=False) - centers = sample_data[centers_idx].astype(float) - - max_iters = 50 - for _ in range(max_iters): - dists = np.linalg.norm(sample_data[:, None, :] - centers[None, :, :], axis=2) - labels = np.argmin(dists, axis=1) - new_centers = np.zeros_like(centers) - counts = np.zeros((centers.shape[0],), dtype=int) - for i, lab in enumerate(labels): - new_centers[lab] += sample_data[i] - counts[lab] += 1 - for kk in range(centers.shape[0]): - if counts[kk] > 0: - new_centers[kk] = new_centers[kk] / counts[kk] - else: - new_centers[kk] = sample_data[rng.integers(0, sample_data.shape[0])] - shift = np.linalg.norm(new_centers - centers, axis=1).max() - centers = new_centers - if shift < 1e-4: - break - - np.savez_compressed(args.out, centers=centers) - print('Saved centers to', args.out) diff --git a/roadcast/kmeans_centers.npz b/roadcast/kmeans_centers.npz deleted file mode 100644 index 5784c04..0000000 Binary files a/roadcast/kmeans_centers.npz and /dev/null differ diff --git a/roadcast/kmeans_centers_all.npz b/roadcast/kmeans_centers_all.npz deleted file mode 100644 index 867fba0..0000000 Binary files a/roadcast/kmeans_centers_all.npz and /dev/null differ diff --git a/roadcast/kmeans_centers_best.npz b/roadcast/kmeans_centers_best.npz deleted file mode 100644 index 2fdd1ce..0000000 Binary files a/roadcast/kmeans_centers_best.npz and /dev/null differ diff --git a/roadcast/kmeans_centers_final.npz b/roadcast/kmeans_centers_final.npz deleted file mode 100644 index 8859f5d..0000000 Binary files a/roadcast/kmeans_centers_final.npz and /dev/null differ diff --git a/roadcast/kmeans_centers_nb10.npz b/roadcast/kmeans_centers_nb10.npz deleted file mode 100644 index 2fdd1ce..0000000 Binary files a/roadcast/kmeans_centers_nb10.npz and /dev/null differ diff --git a/roadcast/model.pth b/roadcast/model.pth index 1cdfe6e..4179705 100644 Binary files a/roadcast/model.pth and b/roadcast/model.pth differ diff --git a/roadcast/models.py b/roadcast/models.py index a3e7aee..a415906 100644 --- a/roadcast/models.py +++ b/roadcast/models.py @@ -1,5 +1,60 @@ import torch import torch.nn as nn +import math +from typing import Union, Iterable +import numpy as np +import torch as _torch + +def accidents_to_bucket(count: Union[int, float, Iterable], + max_count: int = 20000, + num_bins: int = 10) -> Union[int, list, _torch.Tensor, np.ndarray]: + """ + Map accident counts to simple buckets 1..num_bins (equal-width). + Example: max_count=20000, num_bins=10 -> bin width = 2000 + 0-1999 -> 1, 2000-3999 -> 2, ..., 18000-20000 -> 10 + + Args: + count: single value or iterable (list/numpy/torch). Values <=0 map to 1, values >= max_count map to num_bins. + max_count: expected maximum count (top of highest bin). + num_bins: number of buckets (default 10). + + Returns: + Same type as input (int for scalar, list/numpy/torch for iterables) with values in 1..num_bins. 
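+
+    Example (illustrative, with the defaults max_count=20000, num_bins=10, i.e. bin width 2000):
+        >>> accidents_to_bucket(0)
+        1
+        >>> accidents_to_bucket(1999)
+        1
+        >>> accidents_to_bucket(2000)
+        2
+        >>> accidents_to_bucket(25000)
+        10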
+ """ + width = max_count / float(num_bins) + def _bucket_scalar(x): + # clamp + x = 0.0 if x is None else float(x) + if x <= 0: + return 1 + if x >= max_count: + return num_bins + return int(x // width) + 1 + + # scalar int/float + if isinstance(count, (int, float)): + return _bucket_scalar(count) + + # torch tensor + if isinstance(count, _torch.Tensor): + x = count.clone().float() + x = _torch.clamp(x, min=0.0, max=float(max_count)) + buckets = (x // width).to(_torch.long) + 1 + buckets = _torch.clamp(buckets, min=1, max=num_bins) + return buckets + + # numpy array + if isinstance(count, np.ndarray): + x = np.clip(count.astype(float), 0.0, float(max_count)) + buckets = (x // width).astype(int) + 1 + return np.clip(buckets, 1, num_bins) + + # generic iterable -> list + if isinstance(count, Iterable): + return [ _bucket_scalar(float(x)) for x in count ] + + # fallback + return _bucket_scalar(float(count)) class SimpleCNN(nn.Module): @@ -18,7 +73,16 @@ class SimpleCNN(nn.Module): with torch.no_grad(): dummy = torch.zeros(1, *input_size) feat = self.features(dummy) - flat_features = int(feat.numel() / feat.shape[0]) + # flat_features was previously computed as: + # int(feat.numel() / feat.shape[0]) + # Explanation: + # feat.shape == (N, C, H, W) (for image inputs) + # feat.numel() == N * C * H * W + # dividing by N (feat.shape[0]) yields C * H * W, i.e. flattened size per sample + # Clearer alternative using tensor shape: + flat_features = int(torch.prod(torch.tensor(feat.shape[1:])).item()) + # If you need the linear index mapping for coordinates (c, h, w): + # idx = c * (H * W) + h * W + w self.classifier = nn.Sequential( nn.Flatten(), diff --git a/roadcast/openweather_client.py b/roadcast/openweather_client.py new file mode 100644 index 0000000..a4036b9 --- /dev/null +++ b/roadcast/openweather_client.py @@ -0,0 +1,117 @@ +"""OpenWeather / Road Risk client. + +Provides: +- fetch_weather(lat, lon, api_key=None) +- fetch_road_risk(lat, lon, api_key=None, roadrisk_url=None, extra_params=None) + +Never hardcode API keys in source. Provide via api_key argument or set OPENWEATHER_API_KEY / OPENWEATHER_KEY env var. +""" +import os +from typing import Tuple, Dict, Any, Optional +import requests + +def _get_api_key(explicit_key: Optional[str] = None) -> Optional[str]: + if explicit_key: + return explicit_key + return os.environ.get("OPENWEATHER_API_KEY") or os.environ.get("OPENWEATHER_KEY") + +BASE_URL = "https://api.openweathermap.org/data/2.5" + + +def fetch_weather(lat: float, lon: float, params: Optional[dict] = None, api_key: Optional[str] = None) -> dict: + """Call standard OpenWeather /weather endpoint and return parsed JSON.""" + key = _get_api_key(api_key) + if key is None: + raise RuntimeError("Set OPENWEATHER_API_KEY or OPENWEATHER_KEY or pass api_key") + q = {"lat": lat, "lon": lon, "appid": key, "units": "metric"} + if params: + q.update(params) + resp = requests.get(f"{BASE_URL}/weather", params=q, timeout=10) + resp.raise_for_status() + return resp.json() + + +def fetch_road_risk(lat: float, lon: float, extra_params: Optional[dict] = None, api_key: Optional[str] = None, roadrisk_url: Optional[str] = None) -> Tuple[dict, Dict[str, Any]]: + """ + Call OpenWeather /roadrisk endpoint (or provided roadrisk_url) and return (raw_json, features). + + features will always include 'road_risk_score' (float). Other numeric fields are included when present. 
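+
+    Example (illustrative sketch; the exact extra fields depend on the API response):
+        data, features = fetch_road_risk(38.9, -77.0, api_key="...")
+        # features always carries "road_risk_score"; other numeric top-level
+        # fields from the response are included when present.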
+ The implementation: + - prefers explicit numeric keys (road_risk_score, risk_score, score, risk) + - if absent, collects top-level numeric fields and averages common contributors + - if still absent, falls back to a simple weather-derived heuristic using /weather + + Note: Do not commit API keys. Pass api_key or set env var. + """ + key = _get_api_key(api_key) + if key is None: + raise RuntimeError("Set OPENWEATHER_API_KEY or OPENWEATHER_KEY or pass api_key") + + params = {"lat": lat, "lon": lon, "appid": key} + if extra_params: + params.update(extra_params) + + url = roadrisk_url or f"{BASE_URL}/roadrisk" + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + data = resp.json() + + features: Dict[str, Any] = {} + risk: Optional[float] = None + + # direct candidates + for candidate in ("road_risk_score", "risk_score", "risk", "score"): + if isinstance(data, dict) and candidate in data: + try: + risk = float(data[candidate]) + features[candidate] = risk + break + except Exception: + pass + + # if no direct candidate, collect numeric top-level fields + if risk is None and isinstance(data, dict): + numeric_fields = {} + for k, v in data.items(): + if isinstance(v, (int, float)): + numeric_fields[k] = float(v) + features.update(numeric_fields) + # try averaging common contributors if present + contributors = [] + for name in ("precipitation", "rain", "snow", "visibility", "wind_speed"): + if name in data and isinstance(data[name], (int, float)): + contributors.append(float(data[name])) + if contributors: + # average contributors -> risk proxy + risk = float(sum(contributors) / len(contributors)) + + # fallback: derive crude risk from /weather + if risk is None: + try: + w = fetch_weather(lat, lon, api_key=key) + main = w.get("main", {}) + wind = w.get("wind", {}) + weather = w.get("weather", [{}])[0] + # heuristic: rain + high wind + low visibility + derived = 0.0 + if isinstance(weather.get("main", ""), str) and "rain" in weather.get("main", "").lower(): + derived += 1.0 + if (wind.get("speed") or 0) > 6.0: + derived += 0.5 + if (w.get("visibility") or 10000) < 5000: + derived += 1.0 + risk = float(derived) + features.update({ + "temp": main.get("temp"), + "humidity": main.get("humidity"), + "wind_speed": wind.get("speed"), + "visibility": w.get("visibility"), + "weather_main": weather.get("main"), + "weather_id": weather.get("id"), + }) + except Exception: + # cannot derive anything; set neutral 0.0 + risk = 0.0 + + features["road_risk_score"] = float(risk) + return data, features diff --git a/roadcast/run_batch_inference.py b/roadcast/run_batch_inference.py deleted file mode 100644 index 119610d..0000000 --- a/roadcast/run_batch_inference.py +++ /dev/null @@ -1,113 +0,0 @@ -import os -import argparse -import pandas as pd -import numpy as np -import time - -import openweather_inference as owi - - -def find_column(df_cols, candidates): - cmap = {c.lower(): c for c in df_cols} - for cand in candidates: - if cand.lower() in cmap: - return cmap[cand.lower()] - return None - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('csv', help='Path to data CSV (e.g., data.csv)') - parser.add_argument('--out', default='inference_results.csv') - parser.add_argument('--lat-col', default=None) - parser.add_argument('--lon-col', default=None) - parser.add_argument('--date-col', default=None) - parser.add_argument('--model', default='model.pth') - parser.add_argument('--centers', default='kmeans_centers_all.npz') - parser.add_argument('--preprocess-meta', 
default=None) - parser.add_argument('--api-key', default=None) - parser.add_argument('--live', action='store_true', help='If set, call external RoadRisk/OpenWeather per row') - parser.add_argument('--roadrisk-url', default=None, help='Optional per-request RoadRisk URL to use when --live') - parser.add_argument('--subset', type=int, default=0, help='Process only first N rows for testing') - args = parser.parse_args() - - df = pd.read_csv(args.csv, low_memory=False) - nrows = args.subset if args.subset and args.subset > 0 else len(df) - df = df.iloc[:nrows].copy() - - # find sensible columns - lat_col = args.lat_col or find_column(df.columns, ['latitude', 'lat', 'mpdlatitude']) - lon_col = args.lon_col or find_column(df.columns, ['longitude', 'lon', 'mpdlongitude']) - date_col = args.date_col or find_column(df.columns, ['report_dat', 'reportdate', 'fromdate', 'lastupdatedate', 'date', 'occur_date']) - - if lat_col is None or lon_col is None: - raise SystemExit('Could not find latitude/longitude columns automatically. Pass --lat-col and --lon-col.') - - print(f'Using lat column: {lat_col}, lon column: {lon_col}, date column: {date_col}') - - # eager init caches - status = owi.init_inference(model_path=args.model, centers_path=args.centers, preprocess_meta=args.preprocess_meta) - print('init status:', status) - - results = [] - t0 = time.time() - for i, row in df.iterrows(): - lat = row.get(lat_col) - lon = row.get(lon_col) - dt = row.get(date_col) if date_col else None - - try: - if args.live: - # call the full pipeline which may hit remote API - out = owi.predict_from_openweather(lat, lon, dt_iso=dt, street=None, api_key=args.api_key, train_csv=None, preprocess_meta=args.preprocess_meta, model_path=args.model, centers_path=args.centers, roadrisk_url=args.roadrisk_url) - else: - # local-only path: build row, prepare features using preprocess_meta, and run cached model - df_row = owi.build_row(lat, lon, dt_iso=dt, street=None, extra_weather=None) - x_tensor, feature_columns = owi.prepare_features(df_row, train_csv=None, preprocess_meta=args.preprocess_meta) - # ensure model cached - if owi._CACHED_MODEL is None: - owi.init_inference(model_path=args.model, centers_path=args.centers, preprocess_meta=args.preprocess_meta) - model = owi._CACHED_MODEL - centers = owi._CACHED_CENTERS - device = 'cuda' if __import__('torch').cuda.is_available() else 'cpu' - model.to(device) - xt = x_tensor.to(device) - import torch - import torch.nn.functional as F - with torch.no_grad(): - logits = model(xt) - probs = F.softmax(logits, dim=1).cpu().numpy()[0] - pred_idx = int(probs.argmax()) - confidence = float(probs.max()) - out = {'pred_cluster': pred_idx, 'confidence': confidence, 'probabilities': probs.tolist(), 'centroid': centers[pred_idx].tolist() if centers is not None else None, 'feature_columns': feature_columns} - except Exception as e: - out = {'error': str(e)} - - # combine row and output into flat result - res = { - 'orig_index': i, - 'lat': lat, - 'lon': lon, - 'datetime': str(dt), - } - if 'error' in out: - res.update({'error': out['error']}) - else: - res.update({ - 'pred_cluster': int(out.get('pred_cluster')), - 'confidence': float(out.get('confidence')), - }) - results.append(res) - - if (len(results) % 50) == 0: - print(f'Processed {len(results)}/{nrows} rows...') - - elapsed = time.time() - t0 - print(f'Finished {len(results)} rows in {elapsed:.2f}s') - out_df = pd.DataFrame(results) - out_df.to_csv(args.out, index=False) - print('Wrote', args.out) - - -if __name__ == '__main__': - main() diff 
--git a/roadcast/scripts/test_roadrisk_local.sh b/roadcast/scripts/test_roadrisk_local.sh new file mode 100644 index 0000000..a29da9c --- /dev/null +++ b/roadcast/scripts/test_roadrisk_local.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +# Run from repo root. Make sure your Flask app is running (python app.py) first. +# Export your OpenWeather key (do NOT commit it into code): +# export OPENWEATHER_API_KEY="your_real_key_here" + +HOST=${HOST:-http://127.0.0.1:5000} + +echo "Test 1: env var key (no explicit api_key in payload)" +curl -s -X POST ${HOST}/predict-roadrisk -H "Content-Type: application/json" -d '{"lat":38.9,"lon":-77.0}' | jq + +echo "Test 2: explicit api_key in payload (overrides env var)" +curl -s -X POST ${HOST}/predict-roadrisk -H "Content-Type: application/json" -d '{"lat":38.9,"lon":-77.0,"api_key":"MY_OVERRIDE_KEY"}' | jq diff --git a/roadcast/tests/test_openweather_client.py b/roadcast/tests/test_openweather_client.py new file mode 100644 index 0000000..c1ad426 --- /dev/null +++ b/roadcast/tests/test_openweather_client.py @@ -0,0 +1,70 @@ +import os +import builtins +import pytest +from unittest.mock import patch, MagicMock + +from openweather_client import fetch_road_risk, fetch_weather, _get_api_key + +class DummyResp: + def __init__(self, json_data, status=200): + self._json = json_data + self.status = status + def raise_for_status(self): + if self.status >= 400: + raise Exception("HTTP error") + def json(self): + return self._json + +def make_get(mock_json, status=200): + return MagicMock(return_value=DummyResp(mock_json, status=status)) + +def test_get_api_key_prefers_explicit(monkeypatch): + # explicit key should be returned + assert _get_api_key("EXPLICIT") == "EXPLICIT" + monkeypatch.delenv("OPENWEATHER_API_KEY", raising=False) + monkeypatch.delenv("OPENWEATHER_KEY", raising=False) + assert _get_api_key(None) is None + +def test_fetch_road_risk_direct_score(monkeypatch): + # roadrisk returns direct numeric field + resp_json = {"road_risk_score": 2.5, "detail": "ok"} + with patch("openweather_client.requests.get", make_get(resp_json)): + data, features = fetch_road_risk(1.0, 2.0, api_key="TESTKEY") + assert data["road_risk_score"] == 2.5 + assert features["road_risk_score"] == 2.5 + +def test_fetch_road_risk_numeric_fields(monkeypatch): + # roadrisk returns top-level numeric fields, expect average of contributors + resp_json = {"precipitation": 4.0, "visibility": 2000, "other": 3} + with patch("openweather_client.requests.get", make_get(resp_json)): + data, features = fetch_road_risk(1.0, 2.0, api_key="TESTKEY") + # contributors list contains precipitation and visibility and maybe others; check road_risk_score numeric + assert "road_risk_score" in features + assert features["road_risk_score"] > 0 + +def test_fetch_road_risk_fallback_to_weather(monkeypatch): + # roadrisk returns empty dict; requests.get called first for roadrisk then for weather. 
+ seq = [ + DummyResp({}, status=200), # roadrisk empty + DummyResp({ + "main": {"temp": 10, "humidity": 80}, + "wind": {"speed": 7.5}, + "visibility": 3000, + "weather": [{"main": "Rain"}] + }, status=200) + ] + # iterate returns next DummyResp + def side_effect(url, params=None, timeout=None): + return seq.pop(0) + with patch("openweather_client.requests.get", side_effect=side_effect): + data, features = fetch_road_risk(1.0, 2.0, api_key="TESTKEY") + # derived heuristic: rain=1.0 + wind>6 => 0.5 + visibility<5000 =>1.0 => total 2.5 + assert abs(features["road_risk_score"] - 2.5) < 1e-6 + assert features["weather_main"] == "Rain" or features.get("weather_main") == "Rain" + +def test_fetch_road_risk_missing_api_key(monkeypatch): + # ensure no env var present + monkeypatch.delenv("OPENWEATHER_API_KEY", raising=False) + monkeypatch.delenv("OPENWEATHER_KEY", raising=False) + with pytest.raises(RuntimeError): + fetch_road_risk(1.0, 2.0) diff --git a/roadcast/tests/test_predict_roadrisk_endpoint.py b/roadcast/tests/test_predict_roadrisk_endpoint.py new file mode 100644 index 0000000..56a7f0f --- /dev/null +++ b/roadcast/tests/test_predict_roadrisk_endpoint.py @@ -0,0 +1,45 @@ +import json +import pytest +from unittest.mock import patch + +def test_predict_roadrisk_forwards_api_key(monkeypatch): + # import app after monkeypatching env to avoid side effects + from app import app + client = app.test_client() + + fake_response = {"road_risk": "ok", "risk_index": 5} + + # patch the inference function to capture args and return a dummy response + with patch("openweather_inference.predict_from_openweather") as mock_predict: + mock_predict.return_value = fake_response + + payload = {"lat": 38.9, "lon": -77.0, "api_key": "EXPLICIT_TEST_KEY"} + rv = client.post("/predict-roadrisk", data=json.dumps(payload), content_type="application/json") + assert rv.status_code == 200 + data = rv.get_json() + assert data == fake_response + + # assert that our mocked predict_from_openweather was called and api_key forwarded + assert mock_predict.called + _, called_kwargs = mock_predict.call_args + assert called_kwargs.get("api_key") == "EXPLICIT_TEST_KEY" + +def test_predict_roadrisk_uses_env_key_when_not_provided(monkeypatch): + from app import app + client = app.test_client() + + fake_response = {"road_risk": "ok", "risk_index": 3} + monkeypatch.setenv("OPENWEATHER_API_KEY", "ENV_TEST_KEY") + + with patch("openweather_inference.predict_from_openweather") as mock_predict: + mock_predict.return_value = fake_response + + payload = {"lat": 38.9, "lon": -77.0} # no api_key in payload + rv = client.post("/predict-roadrisk", data=json.dumps(payload), content_type="application/json") + assert rv.status_code == 200 + data = rv.get_json() + assert data == fake_response + + assert mock_predict.called + _, called_kwargs = mock_predict.call_args + assert called_kwargs.get("api_key") == "ENV_TEST_KEY" diff --git a/roadcast/tmp_infer.csv b/roadcast/tmp_infer.csv deleted file mode 100644 index 1af6b82..0000000 --- a/roadcast/tmp_infer.csv +++ /dev/null @@ -1,11 +0,0 @@ -orig_index,lat,lon,datetime,error -0,38.91557,-77.031697,2011/03/06 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -1,38.875558,-77.017556,2011/03/06 08:45:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -2,38.872976,-77.016987,2011/03/05 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -3,38.929433,-77.003943,2011/03/08 05:00:00+00,Either 
preprocess_meta or train_csv must be provided to derive feature stats -4,38.89674,-77.027034,2011/03/08 17:18:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -5,38.89093,-76.993494,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -6,38.908478,-77.040086,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -7,38.846563,-76.976504,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -8,38.894783,-77.01292,2011/03/12 18:30:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats -9,38.934204,-77.034567,2011/03/14 04:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats diff --git a/roadcast/train.py b/roadcast/train.py index c646093..6646594 100644 --- a/roadcast/train.py +++ b/roadcast/train.py @@ -9,8 +9,10 @@ from data import ImageFolderDataset, CSVDataset from models import create_model -def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_classes=10, model_type='cnn', csv_label='label', generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None, seed=42, hidden_dims=None, weight_decay=0.0): +def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_classes=10, model_type='mlp', csv_label='label', generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None, seed=42, hidden_dims=None, weight_decay=0.0, output_dir=None): device = device or ('cuda' if torch.cuda.is_available() else 'cpu') + output_dir = output_dir or os.getcwd() + os.makedirs(output_dir, exist_ok=True) # Detect CSV vs folder dataset if os.path.isfile(dataset_root) and dataset_root.lower().endswith('.csv'): dataset = CSVDataset(dataset_root, @@ -45,7 +47,7 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class # persist preprocessing metadata so inference can reuse identical stats try: import numpy as _np - meta_path = os.path.join(os.getcwd(), 'preprocess_meta.npz') + meta_path = os.path.join(output_dir, 'preprocess_meta.npz') _np.savez_compressed(meta_path, feature_columns=_np.array(dataset.feature_columns, dtype=object), means=dataset.feature_means, stds=dataset.feature_stds) print(f'Saved preprocess meta to {meta_path}') except Exception: @@ -60,6 +62,35 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class model_num_classes = n_buckets else: model_num_classes = n_buckets if generate_labels else num_classes + # If labels were generated, save label metadata + assignments (if not huge) + if generate_labels: + try: + label_info = { + "generated": True, + "label_method": label_method, + "n_buckets": n_buckets, + } + # save per-sample assignments if dataset exposes them + if hasattr(dataset, "labels"): + try: + # convert to list (JSON serializable) + assignments = dataset.labels.cpu().numpy().tolist() if hasattr(dataset.labels, "cpu") else dataset.labels.tolist() + # if too large, save as .npz instead + if len(assignments) <= 100000: + label_info["assignments"] = assignments + else: + import numpy as _np + arr_path = os.path.join(output_dir, "label_assignments.npz") + _np.savez_compressed(arr_path, assignments=_np.array(assignments)) + label_info["assignments_file"] = os.path.basename(arr_path) + except Exception: + pass + with open(os.path.join(output_dir, 
"label_info.json"), "w") as f: + import json + json.dump(label_info, f) + print(f"Saved label_info to {os.path.join(output_dir, 'label_info.json')}") + except Exception: + pass # parse hidden_dims if provided by caller (tuple or list) model = create_model(device=device, model_type='mlp', input_dim=input_dim, num_classes=model_num_classes, hidden_dims=hidden_dims) else: @@ -112,11 +143,23 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class # save best if val_acc > best_val_acc: - out_path = os.path.join(os.getcwd(), 'model.pth') + out_path = os.path.join(output_dir, 'model.pth') + # include useful metadata so evaluator can reconstruct + meta = { + 'model_state_dict': model.state_dict(), + 'model_type': model_type, + 'model_config': { + 'input_dim': input_dim if model_type == 'mlp' else None, + 'num_classes': model_num_classes, + 'hidden_dims': hidden_dims, + } + } if hasattr(dataset, 'class_to_idx'): - meta = {'model_state_dict': model.state_dict(), 'class_to_idx': dataset.class_to_idx} - else: - meta = {'model_state_dict': model.state_dict()} + meta['class_to_idx'] = dataset.class_to_idx + # also record paths to saved preprocess and label info (if present) + meta['preprocess_meta'] = os.path.basename(os.path.join(output_dir, 'preprocess_meta.npz')) + if os.path.exists(os.path.join(output_dir, 'label_info.json')): + meta['label_info'] = json.load(open(os.path.join(output_dir, 'label_info.json'), 'r')) torch.save(meta, out_path) best_val_acc = val_acc best_path = out_path @@ -127,6 +170,7 @@ def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_class if __name__ == '__main__': import argparse + import json parser = argparse.ArgumentParser() parser.add_argument('data_root') parser.add_argument('--epochs', type=int, default=3) @@ -144,6 +188,7 @@ if __name__ == '__main__': parser.add_argument('--seed', type=int, default=42, help='Random seed for experiments') parser.add_argument('--hidden-dims', type=str, default='', help='Comma-separated hidden dims for MLP, e.g. 
"256,128"') parser.add_argument('--weight-decay', type=float, default=0.0, help='Weight decay (L2) for optimizer') + parser.add_argument('--output-dir', default='.', help='Directory to save output files') args = parser.parse_args() data_root = args.data_root nrows = args.subset if args.subset > 0 else None @@ -154,5 +199,14 @@ if __name__ == '__main__': hidden_dims = tuple(int(x) for x in args.hidden_dims.split(',') if x.strip()) except Exception: hidden_dims = None - train(data_root, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, model_type=args.model_type, csv_label=args.csv_label, generate_labels=args.generate_labels, n_buckets=args.n_buckets, label_method=args.label_method, label_store=args.label_store, feature_engineer=args.feature_engineer, lat_lon_bins=args.lat_lon_bins, nrows=nrows, seed=args.seed, hidden_dims=hidden_dims, weight_decay=args.weight_decay) + if args.generate_labels: + os.makedirs(args.output_dir, exist_ok=True) + label_info = { + "generated": True, + "label_method": args.label_method, + "n_buckets": args.n_buckets, + } + with open(os.path.join(args.output_dir, "label_info.json"), "w") as f: + json.dump(label_info, f) + train(data_root, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, model_type=args.model_type, csv_label=args.csv_label, generate_labels=args.generate_labels, n_buckets=args.n_buckets, label_method=args.label_method, label_store=args.label_store, feature_engineer=args.feature_engineer, lat_lon_bins=args.lat_lon_bins, nrows=nrows, seed=args.seed, hidden_dims=hidden_dims, weight_decay=args.weight_decay, output_dir=args.output_dir)