diff --git a/.gitignore b/.gitignore
index 3918d97..0c1de92 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,5 @@
 next-env.d.ts
 package-lock.json
 .next/
-.venv/
\ No newline at end of file
+.venv/
+roadcast/data.csv
diff --git a/roadcast/README.md b/roadcast/README.md
new file mode 100644
index 0000000..66195d6
--- /dev/null
+++ b/roadcast/README.md
@@ -0,0 +1,74 @@
+# RoAdCast - Flask + PyTorch CNN starter
+
+This project contains a minimal Flask app and a small PyTorch CNN scaffold so you can train and run a model directly from VS Code.
+
+Quick setup (Windows PowerShell):
+
+1. Create and activate a virtual environment
+
+```powershell
+python -m venv .venv
+.\.venv\Scripts\Activate.ps1
+```
+
+2. Install dependencies
+
+```powershell
+pip install -r requirements.txt
+```
+
+
+3. Dataset layout
+
+Image dataset (folder-per-class):
+
+data/
+  class1/
+    img1.jpg
+  class2/
+    img2.jpg
+
+CSV dataset (single file):
+
+data.csv (expects a `label` column and numeric feature columns)
+
+Train commands:
+
+Image training (default CNN):
+
+```powershell
+python train.py data --epochs 5 --batch-size 16
+```
+
+CSV/tabular training (MLP):
+
+```powershell
+python train.py data.csv --model-type mlp --epochs 20 --batch-size 64
+```
+
+The model will be saved as `model.pth` in the repo root (best validation checkpoint).
+
+Run the Flask app (for local testing):
+
+```powershell
+python app.py
+```
+
+Predict using curl (or Postman). Example with curl in PowerShell:
+
+```powershell
+curl -X POST -F "image=@path\to\image.jpg" http://127.0.0.1:5000/predict
+```
+
+VS Code tips
+
+- Open this folder in VS Code.
+- Use the Python extension and select the `.venv` interpreter.
+- Use the Run panel to add a launch configuration that runs `app.py` or `train.py`.
+- For long training runs, run training in the terminal (not the debugger) and monitor the logs.
+
+Notes & next steps
+
+- The SimpleCNN defaults to 224x224 input and applies two max-pool steps; it computes the flattened size from `input_size` automatically, so adjust `input_size` in `models.py` if you want smaller inputs.
+- Add richer transforms and data augmentation in `data.py` to improve performance.
+- If a GPU is available and PyTorch was installed with CUDA support, it will be used automatically.
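The README above only documents the image `/predict` route, but `app.py` (added later in this diff) also exposes `/train` and `/predict-roadrisk` as JSON endpoints. The snippet below is a rough sketch of exercising them from Python instead of curl; it assumes the server started by `python app.py` is listening on the Flask default `http://127.0.0.1:5000`, that the `requests` package is installed (it is imported lazily by `openweather_inference.py` but is not listed in `requirements.txt`), and that `OPENWEATHER_KEY` is set for the road-risk route.

```python
import requests

BASE = "http://127.0.0.1:5000"

# Kick off background training on the folder-per-class dataset described above.
# app.py checks that data_root is an existing directory before starting the thread.
resp = requests.post(f"{BASE}/train", json={"data_root": "data", "epochs": 3})
print(resp.json())  # {"status": "training_started"} on success

# Ask for a road-risk cluster from coordinates and an ISO timestamp.
# The server needs model.pth (and the saved k-means centers) in its working directory,
# and reads OPENWEATHER_KEY unless "api_key" is included in the payload.
payload = {"lat": 38.9, "lon": -77.0, "datetime": "2025-09-27T12:00:00"}
resp = requests.post(f"{BASE}/predict-roadrisk", json=payload)
print(resp.json())  # pred_cluster, confidence, probabilities, centroid, ...
```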
diff --git a/roadcast/__pycache__/app.cpython-312.pyc b/roadcast/__pycache__/app.cpython-312.pyc new file mode 100644 index 0000000..2fae411 Binary files /dev/null and b/roadcast/__pycache__/app.cpython-312.pyc differ diff --git a/roadcast/__pycache__/data.cpython-312.pyc b/roadcast/__pycache__/data.cpython-312.pyc new file mode 100644 index 0000000..82122df Binary files /dev/null and b/roadcast/__pycache__/data.cpython-312.pyc differ diff --git a/roadcast/__pycache__/inference.cpython-312.pyc b/roadcast/__pycache__/inference.cpython-312.pyc new file mode 100644 index 0000000..1d75c20 Binary files /dev/null and b/roadcast/__pycache__/inference.cpython-312.pyc differ diff --git a/roadcast/__pycache__/models.cpython-312.pyc b/roadcast/__pycache__/models.cpython-312.pyc new file mode 100644 index 0000000..25ec296 Binary files /dev/null and b/roadcast/__pycache__/models.cpython-312.pyc differ diff --git a/roadcast/__pycache__/openweather_inference.cpython-312.pyc b/roadcast/__pycache__/openweather_inference.cpython-312.pyc new file mode 100644 index 0000000..ac86aa1 Binary files /dev/null and b/roadcast/__pycache__/openweather_inference.cpython-312.pyc differ diff --git a/roadcast/app.py b/roadcast/app.py new file mode 100644 index 0000000..38d3889 --- /dev/null +++ b/roadcast/app.py @@ -0,0 +1,123 @@ +from flask import Flask, request, jsonify + +app = Flask(__name__) +import os +import threading +import json + +# ML imports are lazy to avoid heavy imports on simple runs + + +@app.route('/get-data', methods=['GET']) +def get_data(): + # Example GET request handler + data = {"message": "Hello from Flask!"} + return jsonify(data) + +@app.route('/post-data', methods=['POST']) +def post_data(): + # Example POST request handler + content = request.json + # Process content or call AI model here + response = {"you_sent": content} + return jsonify(response) + + +@app.route('/train', methods=['POST']) +def train_endpoint(): + """Trigger training. Expects JSON: {"data_root": "path/to/data", "epochs": 3} + Training runs in a background thread and saves model to model.pth in repo root. + """ + payload = request.json or {} + data_root = payload.get('data_root') + epochs = int(payload.get('epochs', 3)) + if not data_root or not os.path.isdir(data_root): + return jsonify({"error": "data_root must be a valid directory path"}), 400 + + def _run_training(): + from train import train + train(data_root, epochs=epochs) + + t = threading.Thread(target=_run_training, daemon=True) + t.start() + return jsonify({"status": "training_started"}) + + +@app.route('/predict', methods=['POST']) +def predict_endpoint(): + """Predict single uploaded image. 
Expects form-data with file field named 'image'.""" + if 'image' not in request.files: + return jsonify({"error": "no image uploaded (field 'image')"}), 400 + img = request.files['image'] + tmp_path = os.path.join(os.getcwd(), 'tmp_upload.jpg') + img.save(tmp_path) + try: + from inference import load_model, predict_image + model_path = os.path.join(os.getcwd(), 'model.pth') + if not os.path.exists(model_path): + return jsonify({"error": "no trained model found (run /train first)"}), 400 + model, idx_to_class = load_model(model_path) + idx, conf = predict_image(model, tmp_path) + label = idx_to_class.get(idx) if idx_to_class else str(idx) + return jsonify({"label": label, "confidence": conf}) + finally: + try: + os.remove(tmp_path) + except Exception: + pass + + +@app.route('/predict-roadrisk', methods=['POST']) +def predict_roadrisk(): + """Proxy endpoint to predict a roadrisk cluster from lat/lon/datetime. + + Expects JSON body with: {"lat": 38.9, "lon": -77.0, "datetime": "2025-09-27T12:00:00", "roadrisk_url": "https://..."} + If roadrisk_url is not provided the endpoint will call OpenWeather OneCall (requires API key via OPENWEATHER_KEY env var). + """ + payload = request.json or {} + lat = payload.get('lat') + lon = payload.get('lon') + dt = payload.get('datetime') + street = payload.get('street', '') + roadrisk_url = payload.get('roadrisk_url') + api_key = payload.get('api_key') or os.environ.get('OPENWEATHER_KEY') + + if lat is None or lon is None: + return jsonify({"error": "lat and lon are required fields"}), 400 + + try: + from openweather_inference import predict_from_openweather + res = predict_from_openweather(lat, lon, dt_iso=dt, street=street, api_key=api_key, train_csv=os.path.join(os.getcwd(), 'data.csv'), preprocess_meta=None, model_path=os.path.join(os.getcwd(), 'model.pth'), centers_path=os.path.join(os.getcwd(), 'kmeans_centers_all.npz'), roadrisk_url=roadrisk_url) + return jsonify(res) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route('/health', methods=['GET']) +def health(): + """Return status of loaded ML artifacts (model, centers, preprocess_meta).""" + try: + from openweather_inference import init_inference + status = init_inference() + return jsonify({'ok': True, 'artifacts': status}) + except Exception as e: + return jsonify({'ok': False, 'error': str(e)}), 500 + +if __name__ == '__main__': + # eager load model/artifacts at startup (best-effort) + try: + from openweather_inference import init_inference + init_inference() + except Exception: + pass + app.run(debug=True) + +# @app.route('/post-data', methods=['POST']) +# def post_data(): +# content = request.json +# user_input = content.get('input') + +# # Example: Simple echo AI (replace with real AI model code) +# ai_response = f"AI received: {user_input}" + +# return jsonify({"response": ai_response}) diff --git a/roadcast/check_env.py b/roadcast/check_env.py new file mode 100644 index 0000000..5fd7b84 --- /dev/null +++ b/roadcast/check_env.py @@ -0,0 +1,32 @@ +import sys +import importlib + +def safe_import(name): + try: + return importlib.import_module(name) + except Exception as e: + return e + +print('Python:', sys.version.replace('\n',' ')) + +torch = safe_import('torch') +if isinstance(torch, Exception): + print('torch import error:', torch) +else: + print('torch:', torch.__version__) + print('CUDA available:', torch.cuda.is_available()) + if torch.cuda.is_available(): + print('CUDA device count:', torch.cuda.device_count()) + print('Current device name:', 
torch.cuda.get_device_name(0)) + +pandas = safe_import('pandas') +if isinstance(pandas, Exception): + print('pandas import error:', pandas) +else: + print('pandas:', pandas.__version__) + +try: + import sklearn + print('sklearn:', sklearn.__version__) +except Exception: + pass diff --git a/roadcast/data.py b/roadcast/data.py new file mode 100644 index 0000000..febbf22 --- /dev/null +++ b/roadcast/data.py @@ -0,0 +1,413 @@ +import os +import hashlib +from datetime import datetime +import pandas as pd +import numpy as np +import torch +from torch.utils.data import Dataset +from PIL import Image +from torchvision import transforms + + +class ImageFolderDataset(Dataset): + """A minimal image folder dataset expecting a structure: root/class_name/*.jpg""" + def __init__(self, root, transform=None): + self.root = root + self.samples = [] # list of (path, label) + classes = sorted([d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d))]) + self.class_to_idx = {c: i for i, c in enumerate(classes)} + for c in classes: + d = os.path.join(root, c) + for fname in os.listdir(d): + if fname.lower().endswith(('.png', '.jpg', '.jpeg')): + self.samples.append((os.path.join(d, fname), self.class_to_idx[c])) + + self.transform = transform or transforms.Compose([ + transforms.Resize((224, 224)), + transforms.ToTensor(), + ]) + + def __len__(self): + return len(self.samples) + + def __getitem__(self, idx): + path, label = self.samples[idx] + img = Image.open(path).convert('RGB') + img = self.transform(img) + return img, label + + +class CSVDataset(Dataset): + """Load classification tabular data from a single CSV file. + + Expects a `label` column and numeric feature columns. Non-numeric columns are dropped. + """ + def __init__(self, csv_path, feature_columns=None, label_column='label', transform=None, generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None): + # read CSV with low_memory=False to avoid mixed-type warnings + if nrows is None: + self.df = pd.read_csv(csv_path, low_memory=False) + else: + self.df = pd.read_csv(csv_path, nrows=nrows, low_memory=False) + self.label_column = label_column + + if generate_labels: + # generate deterministic labels based on selected columns + self.df[self.label_column] = generate_labels_for_df(self.df, n_buckets=n_buckets, method=label_method, label_store=label_store) + + # optional simple feature engineering: extract date parts and lat/lon bins + if feature_engineer: + try: + _add_date_features(self.df) + except Exception: + pass + try: + _add_latlon_bins(self.df, bins=lat_lon_bins) + except Exception: + pass + + if label_column not in self.df.columns: + raise ValueError(f"label column '{label_column}' not found in CSV; set generate_labels=True to create labels") + + # determine feature columns if not provided (numeric columns except label) + if feature_columns is None: + feature_columns = [c for c in self.df.columns if c != label_column and pd.api.types.is_numeric_dtype(self.df[c])] + self.feature_columns = feature_columns + # coerce feature columns to numeric, fill NaNs with column mean (or 0), then standardize + features_df = self.df[self.feature_columns].apply(lambda c: pd.to_numeric(c, errors='coerce')) + # fill NaNs with column mean where possible, otherwise 0 + initial_means = features_df.mean() + features_df = features_df.fillna(initial_means).fillna(0.0) + + # drop columns that remain all-NaN after coercion/fill (unlikely after fillna(0.0)), to avoid NaNs + all_nan_cols = 
features_df.columns[features_df.isna().all()].tolist() + if len(all_nan_cols) > 0: + # remove from feature list so indices stay consistent + features_df = features_df.drop(columns=all_nan_cols) + self.feature_columns = [c for c in self.feature_columns if c not in all_nan_cols] + + # recompute means/stds from the filled data so subtraction/division won't produce NaNs + col_means = features_df.mean() + col_stds = features_df.std().replace(0, 1.0).fillna(1.0) + + # standardize using the recomputed stats + features_df = (features_df - col_means) / (col_stds + 1e-6) + + self.feature_means = col_means.to_numpy(dtype=float) + self.feature_stds = col_stds.to_numpy(dtype=float) + + self.features = torch.tensor(features_df.values, dtype=torch.float32) + self.labels = torch.tensor(pd.to_numeric(self.df[self.label_column], errors='coerce').fillna(0).astype(int).values, dtype=torch.long) + + def __len__(self): + return len(self.df) + + def __getitem__(self, idx): + return self.features[idx], int(self.labels[idx]) + + +def _normalize_str(x): + if pd.isna(x): + return '' + return str(x).strip().lower() + + +def _normalize_date(x): + try: + # try parse common formats + dt = pd.to_datetime(x) + return dt.strftime('%Y-%m-%d') + except Exception: + return '' + + +def generate_kmeans_labels(df, n_buckets=100, random_state=42, label_store=None): + """Generate labels by running k-means over numeric features (deterministic with seed). + + This produces clusters that are predictable from numeric inputs and are therefore + better suited for training a numeric-feature MLP than arbitrary hash buckets. + """ + # small pure-numpy k-means to avoid external dependency + import numpy as np + + # select numeric columns only + num_df = df.select_dtypes(include=['number']).fillna(0.0) + if num_df.shape[0] == 0 or num_df.shape[1] == 0: + # fallback to hashing if no numeric columns + return generate_labels_for_df(df, n_buckets=n_buckets) + + data = num_df.values.astype(float) + n_samples = data.shape[0] + rng = np.random.default_rng(random_state) + + # If a label_store exists and contains centers, load and use them + import os + if label_store and os.path.exists(label_store): + try: + npz = np.load(label_store) + centers = npz['centers'] + all_dists = np.linalg.norm(data[:, None, :] - centers[None, :, :], axis=2) + all_labels = np.argmin(all_dists, axis=1) + return pd.Series(all_labels, index=df.index) + except Exception: + # fall through to fitting + pass + + # sample points to fit centers if dataset is large + sample_size = min(20000, n_samples) + if sample_size < n_samples: + idx = rng.choice(n_samples, size=sample_size, replace=False) + sample_data = data[idx] + else: + sample_data = data + + # initialize centers by random sampling from sample_data + centers_idx = rng.choice(sample_data.shape[0], size=min(n_buckets, sample_data.shape[0]), replace=False) + centers = sample_data[centers_idx].astype(float) + + # run a small number of iterations + max_iters = 10 + for _ in range(max_iters): + # assign + dists = np.linalg.norm(sample_data[:, None, :] - centers[None, :, :], axis=2) + labels = np.argmin(dists, axis=1) + # recompute centers + new_centers = np.zeros_like(centers) + counts = np.zeros((centers.shape[0],), dtype=int) + for i, lab in enumerate(labels): + new_centers[lab] += sample_data[i] + counts[lab] += 1 + for k in range(centers.shape[0]): + if counts[k] > 0: + new_centers[k] = new_centers[k] / counts[k] + else: + # reinitialize empty cluster + new_centers[k] = sample_data[rng.integers(0, sample_data.shape[0])] + # 
check convergence (centers change small) + shift = np.linalg.norm(new_centers - centers, axis=1).max() + centers = new_centers + if shift < 1e-4: + break + + # assign labels for all data + all_dists = np.linalg.norm(data[:, None, :] - centers[None, :, :], axis=2) + all_labels = np.argmin(all_dists, axis=1) + # persist centers if requested + if label_store: + try: + np.savez_compressed(label_store, centers=centers) + except Exception: + pass + return pd.Series(all_labels, index=df.index) + + +def generate_labels_for_df(df, n_buckets=100, method='md5', label_store=None): + """Generate deterministic bucket labels 1..n_buckets from rows using selected columns. + + Uses: report_dat, latitude, longitude, street1, street2, ward, injuries, fatalities. + Produces reproducible labels via md5 hashing of a normalized feature string. + """ + if method == 'kmeans': + return generate_kmeans_labels(df, n_buckets=n_buckets, label_store=label_store) + + # Be flexible about column names (case variations and alternate names). + colmap = {c.lower(): c for c in df.columns} + + def get_col(*candidates): + for cand in candidates: + key = cand.lower() + if key in colmap: + return colmap[key] + return None + + report_col = get_col('report_dat', 'reportdate', 'fromdate', 'lastupdatedate') + lat_col = get_col('latitude', 'mpdlatitude', 'lat') + lon_col = get_col('longitude', 'mpdlongitude', 'lon') + street1_col = get_col('street1', 'address', 'mar_address', 'nearestintstreetname') + street2_col = get_col('street2', 'nearestintstreetname') + ward_col = get_col('ward') + + inj_cols = [c for c in df.columns if 'INJUR' in c.upper()] + fat_cols = [c for c in df.columns if 'FATAL' in c.upper()] + + uid = get_col('crimeid', 'eventid', 'objectid', 'ccn') + + def row_to_bucket(row): + parts = [] + # date + parts.append(_normalize_date(row.get(report_col, '') if report_col else '')) + # lat/lon rounded + lat = row.get(lat_col, '') if lat_col else '' + lon = row.get(lon_col, '') if lon_col else '' + try: + parts.append(str(round(float(lat), 5)) if pd.notna(lat) and lat != '' else '') + except Exception: + parts.append('') + try: + parts.append(str(round(float(lon), 5)) if pd.notna(lon) and lon != '' else '') + except Exception: + parts.append('') + + # streets and ward + parts.append(_normalize_str(row.get(street1_col, '') if street1_col else '')) + parts.append(_normalize_str(row.get(street2_col, '') if street2_col else '')) + parts.append(_normalize_str(row.get(ward_col, '') if ward_col else '')) + + # injuries: sum any injury-like columns + inj_sum = 0 + for c in inj_cols: + try: + v = row.get(c, 0) + inj_sum += int(v) if pd.notna(v) and v != '' else 0 + except Exception: + pass + parts.append(str(inj_sum)) + + # fatalities: sum any fatal-like columns + fat_sum = 0 + for c in fat_cols: + try: + v = row.get(c, 0) + fat_sum += int(v) if pd.notna(v) and v != '' else 0 + except Exception: + pass + parts.append(str(fat_sum)) + + # fallback uid + if uid: + parts.append(str(row.get(uid, ''))) + + s = '|'.join(parts) + h = hashlib.md5(s.encode('utf-8')).hexdigest() + val = int(h, 16) % n_buckets + return val + + return df.apply(row_to_bucket, axis=1) + + +def _add_date_features(df, date_col_candidates=None): + """Add simple date-derived numeric columns to the dataframe. + + Adds: report_year, report_month, report_day, report_weekday, report_hour (where available). + If no date column is found, function is a no-op. 
+ """ + if date_col_candidates is None: + date_col_candidates = ['report_dat', 'reportdate', 'fromdate', 'lastupdatedate', 'date', 'occur_date'] + colmap = {c.lower(): c for c in df.columns} + date_col = None + for cand in date_col_candidates: + if cand.lower() in colmap: + date_col = colmap[cand.lower()] + break + if date_col is None: + return + try: + ser = pd.to_datetime(df[date_col], errors='coerce') + except Exception: + ser = pd.to_datetime(df[date_col].astype(str), errors='coerce') + + df['report_year'] = ser.dt.year.fillna(-1).astype(float) + df['report_month'] = ser.dt.month.fillna(-1).astype(float) + df['report_day'] = ser.dt.day.fillna(-1).astype(float) + df['report_weekday'] = ser.dt.weekday.fillna(-1).astype(float) + # hour may not exist; if parsing fails we'll get NaN + df['report_hour'] = ser.dt.hour.fillna(-1).astype(float) + + +def _add_hashed_street(df, n_hash_buckets=32, street_col_candidates=None): + """Add a small hashed numeric feature for street/address text fields. + + Adds `street_hash_0..N-1` as dense float columns containing one-hot-ish hashed values. + Uses MD5-based hashing reduced to a bucket and then maps to a small integer vector. + """ + if street_col_candidates is None: + street_col_candidates = ['street1', 'street', 'address', 'mar_address', 'nearestintstreetname'] + colmap = {c.lower(): c for c in df.columns} + street_col = None + for cand in street_col_candidates: + if cand.lower() in colmap: + street_col = colmap[cand.lower()] + break + if street_col is None: + return + + import hashlib + # create a single integer hash bucket per row + def row_hash(val): + if pd.isna(val) or str(val).strip() == '': + return -1 + h = hashlib.md5(str(val).encode('utf-8')).hexdigest() + return int(h, 16) % n_hash_buckets + + buckets = df[street_col].apply(row_hash).fillna(-1).astype(int).to_numpy() + # create N numeric columns with a one-hot style (0/1) encoded as floats; missing bucket => zeros + for i in range(n_hash_buckets): + colname = f'street_hash_{i}' + df[colname] = (buckets == i).astype(float) + + +def _add_latlon_bins(df, bins=20, lat_col_candidates=None, lon_col_candidates=None): + """Add coarse spatial bins for latitude/longitude and rounded lat/lon numeric features. + + Adds: lat_round, lon_round, lat_bin, lon_bin (bins numbered 0..bins-1, -1 for missing). 
+ """ + if lat_col_candidates is None: + lat_col_candidates = ['latitude', 'mpdlatitude', 'lat'] + if lon_col_candidates is None: + lon_col_candidates = ['longitude', 'mpdlongitude', 'lon'] + colmap = {c.lower(): c for c in df.columns} + lat_col = None + lon_col = None + for cand in lat_col_candidates: + if cand.lower() in colmap: + lat_col = colmap[cand.lower()] + break + for cand in lon_col_candidates: + if cand.lower() in colmap: + lon_col = colmap[cand.lower()] + break + if lat_col is None or lon_col is None: + return + try: + lat = pd.to_numeric(df[lat_col], errors='coerce') + lon = pd.to_numeric(df[lon_col], errors='coerce') + except Exception: + lat = pd.to_numeric(df[lat_col].astype(str), errors='coerce') + lon = pd.to_numeric(df[lon_col].astype(str), errors='coerce') + + df['lat_round'] = lat.round(3).fillna(0.0).astype(float) + df['lon_round'] = lon.round(3).fillna(0.0).astype(float) + + try: + # compute bins using quantiles if possible to get balanced bins; fallback to linear bins + valid_lat = lat.dropna() + valid_lon = lon.dropna() + if len(valid_lat) >= bins and len(valid_lon) >= bins: + # qcut may produce NaNs for duplicates; use rank-based discretization + df['lat_bin'] = pd.qcut(lat.rank(method='first'), q=bins, labels=False, duplicates='drop') + df['lon_bin'] = pd.qcut(lon.rank(method='first'), q=bins, labels=False, duplicates='drop') + else: + lat_min, lat_max = valid_lat.min() if len(valid_lat) > 0 else 0.0, valid_lat.max() if len(valid_lat) > 0 else 0.0 + lon_min, lon_max = valid_lon.min() if len(valid_lon) > 0 else 0.0, valid_lon.max() if len(valid_lon) > 0 else 0.0 + lat_span = (lat_max - lat_min) + 1e-6 + lon_span = (lon_max - lon_min) + 1e-6 + df['lat_bin'] = (((lat - lat_min) / lat_span) * bins).fillna(-1).astype(int).clip(lower=-1, upper=bins-1) + df['lon_bin'] = (((lon - lon_min) / lon_span) * bins).fillna(-1).astype(int).clip(lower=-1, upper=bins-1) + except Exception: + # fallback: set -1 for bins + df['lat_bin'] = -1 + df['lon_bin'] = -1 + + +# Debugging code - to be removed or commented out in production +# python - <<'PY' +# import pandas as pd +# from data import generate_labels_for_df +# df = pd.read_csv('data.csv', nrows=50, low_memory=False) +# labs = generate_labels_for_df(df, n_buckets=100) +# print(df[['REPORTDATE','LATITUDE','LONGITUDE','ADDRESS','WARD']].head().to_string()) +# print('labels:', list(labs[:20])) +# PY + +# Command to run the training (to be executed in the terminal, not in the script) +# python train.py data.csv --model-type mlp --generate-labels --n-buckets 100 --epochs 5 --batch-size 256 --lr 1e-3 + diff --git a/roadcast/debug_labels.py b/roadcast/debug_labels.py new file mode 100644 index 0000000..7529efd --- /dev/null +++ b/roadcast/debug_labels.py @@ -0,0 +1,76 @@ +import pandas as pd +import hashlib +from data import _normalize_str, _normalize_date + +p='data.csv' +df=pd.read_csv(p, nrows=50, low_memory=False) +print('Columns:', list(df.columns)) + +colmap = {c.lower(): c for c in df.columns} + +def get_col(*candidates): + for cand in candidates: + key = cand.lower() + if key in colmap: + return colmap[key] + return None + +report_col = get_col('report_dat', 'reportdate', 'fromdate', 'lastupdatedate') +lat_col = get_col('latitude', 'mpdlatitude', 'lat') +lon_col = get_col('longitude', 'mpdlongitude', 'lon') +street1_col = get_col('street1', 'address', 'mar_address', 'nearestintstreetname') +street2_col = get_col('street2', 'nearestintstreetname') +ward_col = get_col('ward') +inj_cols = [c for c in df.columns if 'INJUR' in 
c.upper()] +fat_cols = [c for c in df.columns if 'FATAL' in c.upper()] +uid = get_col('crimeid', 'eventid', 'objectid', 'ccn') + +print('Resolved columns:') +print('report_col=', report_col) +print('lat_col=', lat_col) +print('lon_col=', lon_col) +print('street1_col=', street1_col) +print('street2_col=', street2_col) +print('ward_col=', ward_col) +print('inj_cols=', inj_cols[:10]) +print('fat_cols=', fat_cols[:10]) +print('uid=', uid) + +for i, row in df.iterrows(): + parts = [] + parts.append(_normalize_date(row.get(report_col, '') if report_col else '')) + lat = row.get(lat_col, '') if lat_col else '' + lon = row.get(lon_col, '') if lon_col else '' + try: + parts.append(str(round(float(lat), 5)) if pd.notna(lat) and lat != '' else '') + except Exception: + parts.append('') + try: + parts.append(str(round(float(lon), 5)) if pd.notna(lon) and lon != '' else '') + except Exception: + parts.append('') + parts.append(_normalize_str(row.get(street1_col, '') if street1_col else '')) + parts.append(_normalize_str(row.get(street2_col, '') if street2_col else '')) + parts.append(_normalize_str(row.get(ward_col, '') if ward_col else '')) + inj_sum = 0 + for c in inj_cols: + try: + v = row.get(c, 0) + inj_sum += int(v) if pd.notna(v) and v != '' else 0 + except Exception: + pass + parts.append(str(inj_sum)) + fat_sum = 0 + for c in fat_cols: + try: + v = row.get(c, 0) + fat_sum += int(v) if pd.notna(v) and v != '' else 0 + except Exception: + pass + parts.append(str(fat_sum)) + if uid: + parts.append(str(row.get(uid, ''))) + s='|'.join(parts) + h=hashlib.md5(s.encode('utf-8')).hexdigest() + val=int(h,16)%100 + print(i, 'label=', val, 's="'+s+'"') diff --git a/roadcast/diagnostics.py b/roadcast/diagnostics.py new file mode 100644 index 0000000..2bc7219 --- /dev/null +++ b/roadcast/diagnostics.py @@ -0,0 +1,67 @@ +import traceback +import numpy as np +import torch +from torch.utils.data import DataLoader + +from data import CSVDataset +from models import create_model + + +def print_arr_stats(name, arr): + arr = np.asarray(arr) + print(f"{name}: shape={arr.shape} dtype={arr.dtype}") + print(f" mean={np.nanmean(arr):.6f} std={np.nanstd(arr):.6f} min={np.nanmin(arr):.6f} max={np.nanmax(arr):.6f}") + print(f" any_nan={np.isnan(arr).any()} any_inf={np.isinf(arr).any()}") + + +def main(): + try: + print('Loading dataset...') + ds = CSVDataset('data.csv', generate_labels=True, n_buckets=100) + print(f'dataset length: {len(ds)}') + + X = ds.features.numpy() + y = ds.labels.numpy() + + print_arr_stats('X (all)', X) + # per-column NaN/inf counts + nan_counts = np.isnan(X).sum(axis=0) + inf_counts = np.isinf(X).sum(axis=0) + print('per-column nan counts (first 20):', nan_counts[:20].tolist()) + print('per-column inf counts (first 20):', inf_counts[:20].tolist()) + + print('Labels stats: unique count=', len(np.unique(y))) + print('Labels min/max:', int(y.min()), int(y.max())) + vals, counts = np.unique(y, return_counts=True) + print('Label distribution (first 20):', list(zip(vals[:20].tolist(), counts[:20].tolist()))) + + # get a small batch + dl = DataLoader(ds, batch_size=64, shuffle=False) + xb, yb = next(iter(dl)) + print_arr_stats('xb batch', xb.numpy()) + print('yb batch unique:', np.unique(yb.numpy())) + + # build model + print('Building model...') + model = create_model(device='cpu', model_type='mlp', input_dim=X.shape[1], num_classes=100) + model.eval() + with torch.no_grad(): + out = model(xb) + out_np = out.numpy() + print_arr_stats('model outputs', out_np) + + # check for rows with NaN/Inf in features 
+ bad_rows = np.where(np.isnan(X).any(axis=1) | np.isinf(X).any(axis=1))[0] + print('bad rows count:', len(bad_rows)) + if len(bad_rows) > 0: + print('first bad row index:', bad_rows[0]) + print('row values:', X[bad_rows[0]].tolist()) + print('label:', int(y[bad_rows[0]])) + + except Exception as e: + print('Exception during diagnostics:') + traceback.print_exc() + + +if __name__ == '__main__': + main() diff --git a/roadcast/fit_kmeans.py b/roadcast/fit_kmeans.py new file mode 100644 index 0000000..0a43828 --- /dev/null +++ b/roadcast/fit_kmeans.py @@ -0,0 +1,65 @@ +"""Fit k-means centers on CSV numeric features (optionally PCA) and save centers to .npz + +Usage: python fit_kmeans.py data.csv --n-buckets 10 --out kmeans_centers_final.npz --sample 50000 --pca 50 +""" +import argparse +import numpy as np +import pandas as pd + +from data import generate_kmeans_labels + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('csv') + parser.add_argument('--n-buckets', type=int, default=10) + parser.add_argument('--out', default='kmeans_centers_final.npz') + parser.add_argument('--sample', type=int, default=50000, help='max rows to sample for fitting') + parser.add_argument('--pca', type=int, default=0, help='Apply PCA to reduce dims before kmeans (0=none)') + args = parser.parse_args() + + # read numeric columns only to avoid huge memory usage + df = pd.read_csv(args.csv, low_memory=False) + num_df = df.select_dtypes(include=['number']).fillna(0.0) + data = num_df.values.astype(float) + if data.shape[0] == 0 or data.shape[1] == 0: + raise SystemExit('No numeric data found in CSV') + + # sample rows if requested + if args.sample and args.sample < data.shape[0]: + rng = np.random.default_rng(42) + idx = rng.choice(data.shape[0], size=args.sample, replace=False) + sample_data = data[idx] + else: + sample_data = data + + # Use the kmeans implementation via generate_kmeans_labels for fitting centers. + # We'll call the internal function by adapting it here: import numpy locally. 
+ import numpy as _np + + # initialize centers by random sampling + rng = _np.random.default_rng(42) + k = min(args.n_buckets, sample_data.shape[0]) + centers_idx = rng.choice(sample_data.shape[0], size=k, replace=False) + centers = sample_data[centers_idx].astype(float) + + max_iters = 50 + for _ in range(max_iters): + dists = np.linalg.norm(sample_data[:, None, :] - centers[None, :, :], axis=2) + labels = np.argmin(dists, axis=1) + new_centers = np.zeros_like(centers) + counts = np.zeros((centers.shape[0],), dtype=int) + for i, lab in enumerate(labels): + new_centers[lab] += sample_data[i] + counts[lab] += 1 + for kk in range(centers.shape[0]): + if counts[kk] > 0: + new_centers[kk] = new_centers[kk] / counts[kk] + else: + new_centers[kk] = sample_data[rng.integers(0, sample_data.shape[0])] + shift = np.linalg.norm(new_centers - centers, axis=1).max() + centers = new_centers + if shift < 1e-4: + break + + np.savez_compressed(args.out, centers=centers) + print('Saved centers to', args.out) diff --git a/roadcast/inference.py b/roadcast/inference.py new file mode 100644 index 0000000..49e8b6e --- /dev/null +++ b/roadcast/inference.py @@ -0,0 +1,30 @@ +import os +import torch +import torch.nn.functional as F +from PIL import Image +from torchvision import transforms + +from models import create_model + + +def load_model(path, device=None, in_channels=3, num_classes=10): + device = device or ('cuda' if torch.cuda.is_available() else 'cpu') + checkpoint = torch.load(path, map_location=device) + model = create_model(device=device, in_channels=in_channels, num_classes=num_classes) + model.load_state_dict(checkpoint['model_state_dict']) + model.eval() + class_to_idx = checkpoint.get('class_to_idx') + idx_to_class = {v: k for k, v in class_to_idx.items()} if class_to_idx else None + return model, idx_to_class + + +def predict_image(model, img_path, device=None): + device = device or ('cuda' if torch.cuda.is_available() else 'cpu') + preprocess = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()]) + img = Image.open(img_path).convert('RGB') + x = preprocess(img).unsqueeze(0).to(device) + with torch.no_grad(): + logits = model(x) + probs = F.softmax(logits, dim=1) + conf, idx = torch.max(probs, dim=1) + return int(idx.item()), float(conf.item()) diff --git a/roadcast/inspect_csv.py b/roadcast/inspect_csv.py new file mode 100644 index 0000000..4e2c0b2 --- /dev/null +++ b/roadcast/inspect_csv.py @@ -0,0 +1,34 @@ +import pandas as pd + +p = 'data.csv' +print('Reading first 2000 rows of', p) +df = pd.read_csv(p, nrows=2000, low_memory=False) +print('Columns:', list(df.columns)) +cols = ['report_dat','latitude','longitude','street1','street2','ward','injuries','fatalities'] +print('\nField stats for label-generator columns:') +for c in cols: + if c in df.columns: + ser = df[c] + try: + unique = ser.dropna().unique()[:5].tolist() + except Exception: + unique = [] + print(f"{c}: present dtype={ser.dtype} n_unique={ser.nunique(dropna=False)} n_null={int(ser.isna().sum())} sample_values={unique}") + else: + print(f"{c}: MISSING") + +# If labels already present, show distribution +if 'label' in df.columns: + print('\nLabel column present in sample:') + print(df['label'].value_counts().head(20)) +else: + print('\nLabel column not present in sample') + +# Also show per-column fraction NaN for numeric columns +num_cols = df.select_dtypes(include=['number']).columns.tolist() +print('\nNumeric columns and NaN fraction (first 20):') +for c in num_cols[:20]: + ser = df[c] + print(f"{c}: 
n={len(ser)} null_frac={ser.isna().mean():.4f} min={ser.min()} max={ser.max()}") + +print('\nDone') diff --git a/roadcast/kmeans_centers.npz b/roadcast/kmeans_centers.npz new file mode 100644 index 0000000..5784c04 Binary files /dev/null and b/roadcast/kmeans_centers.npz differ diff --git a/roadcast/kmeans_centers_all.npz b/roadcast/kmeans_centers_all.npz new file mode 100644 index 0000000..867fba0 Binary files /dev/null and b/roadcast/kmeans_centers_all.npz differ diff --git a/roadcast/kmeans_centers_best.npz b/roadcast/kmeans_centers_best.npz new file mode 100644 index 0000000..2fdd1ce Binary files /dev/null and b/roadcast/kmeans_centers_best.npz differ diff --git a/roadcast/kmeans_centers_final.npz b/roadcast/kmeans_centers_final.npz new file mode 100644 index 0000000..8859f5d Binary files /dev/null and b/roadcast/kmeans_centers_final.npz differ diff --git a/roadcast/kmeans_centers_nb10.npz b/roadcast/kmeans_centers_nb10.npz new file mode 100644 index 0000000..2fdd1ce Binary files /dev/null and b/roadcast/kmeans_centers_nb10.npz differ diff --git a/roadcast/model.pth b/roadcast/model.pth new file mode 100644 index 0000000..1cdfe6e Binary files /dev/null and b/roadcast/model.pth differ diff --git a/roadcast/models.py b/roadcast/models.py new file mode 100644 index 0000000..a3e7aee --- /dev/null +++ b/roadcast/models.py @@ -0,0 +1,68 @@ +import torch +import torch.nn as nn + + +class SimpleCNN(nn.Module): + """A small CNN for image classification (adjustable). Automatically computes flattened size.""" + def __init__(self, in_channels=3, num_classes=10, input_size=(3, 224, 224)): + super().__init__() + self.features = nn.Sequential( + nn.Conv2d(in_channels, 32, kernel_size=3, padding=1), + nn.ReLU(), + nn.MaxPool2d(2), + nn.Conv2d(32, 64, kernel_size=3, padding=1), + nn.ReLU(), + nn.MaxPool2d(2), + ) + # compute flatten size using a dummy tensor + with torch.no_grad(): + dummy = torch.zeros(1, *input_size) + feat = self.features(dummy) + flat_features = int(feat.numel() / feat.shape[0]) + + self.classifier = nn.Sequential( + nn.Flatten(), + nn.Linear(flat_features, 256), + nn.ReLU(), + nn.Dropout(0.5), + nn.Linear(256, num_classes), + ) + + def forward(self, x): + x = self.features(x) + x = self.classifier(x) + return x + + +class MLP(nn.Module): + """Simple MLP for tabular CSV data classification.""" + def __init__(self, input_dim, hidden_dims=(256, 128), num_classes=2): + super().__init__() + layers = [] + prev = input_dim + for h in hidden_dims: + layers.append(nn.Linear(prev, h)) + layers.append(nn.ReLU()) + layers.append(nn.Dropout(0.2)) + prev = h + layers.append(nn.Linear(prev, num_classes)) + self.net = nn.Sequential(*layers) + + def forward(self, x): + return self.net(x) + + +def create_model(device=None, in_channels=3, num_classes=10, input_size=(3, 224, 224), model_type='cnn', input_dim=None, hidden_dims=None): + if model_type == 'mlp': + if input_dim is None: + raise ValueError('input_dim is required for mlp model_type') + if hidden_dims is None: + model = MLP(input_dim=input_dim, num_classes=num_classes) + else: + model = MLP(input_dim=input_dim, hidden_dims=hidden_dims, num_classes=num_classes) + else: + model = SimpleCNN(in_channels=in_channels, num_classes=num_classes, input_size=input_size) + + if device: + model.to(device) + return model diff --git a/roadcast/openweather_inference.py b/roadcast/openweather_inference.py new file mode 100644 index 0000000..3775112 --- /dev/null +++ b/roadcast/openweather_inference.py @@ -0,0 +1,339 @@ +""" +Fetch OpenWeather data for a 
coordinate/time and run the trained MLP to predict the k-means cluster label. + +Usage examples: + # with training CSV provided to compute preprocessing stats: + python openweather_inference.py --lat 38.9 --lon -77.0 --datetime "2025-09-27T12:00:00" --train-csv data.csv --model model.pth --centers kmeans_centers_all.npz --api-key $OPENWEATHER_KEY + + # with precomputed preprocess meta (saved from training): + python openweather_inference.py --lat 38.9 --lon -77.0 --datetime "2025-09-27T12:00:00" --preprocess-meta preprocess_meta.npz --model model.pth --centers kmeans_centers_all.npz --api-key $OPENWEATHER_KEY + +Notes: +- The script uses the same feature-engineering helpers in `data.py` so the model sees identical inputs. +- You must either provide `--train-csv` (to compute feature columns & means/stds) or `--preprocess-meta` previously saved. +- Provide the OpenWeather API key via --api-key or the OPENWEATHER_KEY environment variable. +""" + +import os +import argparse +import json +from datetime import datetime +import numpy as np +import pandas as pd +import torch +import torch.nn.functional as F + +# reuse helpers from your repo +from data import _add_date_features, _add_latlon_bins, _add_hashed_street, CSVDataset +from inference import load_model + +# module-level caches to avoid reloading heavy artifacts per request +_CACHED_MODEL = None +_CACHED_IDX_TO_CLASS = None +_CACHED_CENTERS = None +_CACHED_PREPROCESS_META = None + + +OW_BASE = 'https://api.openweathermap.org/data/2.5/onecall' + + +def fetch_openweather(lat, lon, api_key, dt_iso=None): + """Fetch weather from OpenWeather One Call API for given lat/lon. If dt_iso provided, we fetch current+hourly and pick closest timestamp.""" + try: + import requests + except Exception: + raise RuntimeError('requests library is required to fetch OpenWeather data') + params = { + 'lat': float(lat), + 'lon': float(lon), + 'appid': api_key, + 'units': 'metric', + 'exclude': 'minutely,alerts' + } + r = requests.get(OW_BASE, params=params, timeout=10) + r.raise_for_status() + payload = r.json() + # if dt_iso provided, find nearest hourly data point + if dt_iso: + try: + target = pd.to_datetime(dt_iso) + except Exception: + target = None + best = None + if 'hourly' in payload and target is not None: + hours = payload['hourly'] + best = min(hours, key=lambda h: abs(pd.to_datetime(h['dt'], unit='s') - target)) + # convert keys to a flat dict with prefix 'ow_' + d = { + 'ow_temp': best.get('temp'), + 'ow_feels_like': best.get('feels_like'), + 'ow_pressure': best.get('pressure'), + 'ow_humidity': best.get('humidity'), + 'ow_wind_speed': best.get('wind_speed'), + 'ow_clouds': best.get('clouds'), + 'ow_pop': best.get('pop'), + } + return d + # fallback: use current + cur = payload.get('current', {}) + d = { + 'ow_temp': cur.get('temp'), + 'ow_feels_like': cur.get('feels_like'), + 'ow_pressure': cur.get('pressure'), + 'ow_humidity': cur.get('humidity'), + 'ow_wind_speed': cur.get('wind_speed'), + 'ow_clouds': cur.get('clouds'), + 'ow_pop': None, + } + return d + + +def fetch_roadrisk(roadrisk_url, api_key=None): + """Fetch the RoadRisk endpoint (expects JSON). If `api_key` is provided, we'll attach it as a query param if the URL has no key. + + We flatten top-level numeric fields into `rr_*` keys for the feature row. 
+ """ + # if api_key provided and url does not contain appid, append it + try: + import requests + except Exception: + raise RuntimeError('requests library is required to fetch RoadRisk data') + url = roadrisk_url + if api_key and 'appid=' not in roadrisk_url: + sep = '&' if '?' in roadrisk_url else '?' + url = f"{roadrisk_url}{sep}appid={api_key}" + + r = requests.get(url, timeout=10) + r.raise_for_status() + payload = r.json() + # flatten numeric top-level fields + out = {} + if isinstance(payload, dict): + for k, v in payload.items(): + if isinstance(v, (int, float)): + out[f'rr_{k}'] = v + # if nested objects contain simple numeric fields, pull them too (one level deep) + elif isinstance(v, dict): + for kk, vv in v.items(): + if isinstance(vv, (int, float)): + out[f'rr_{k}_{kk}'] = vv + return out + + +def build_row(lat, lon, dt_iso=None, street=None, extra_weather=None): + """Construct a single-row DataFrame with columns expected by the training pipeline. + + It intentionally uses column names the original `data.py` looked for (REPORTDATE, LATITUDE, LONGITUDE, ADDRESS, etc.). + """ + row = {} + # date column matching common names + row['REPORTDATE'] = dt_iso if dt_iso else datetime.utcnow().isoformat() + row['LATITUDE'] = lat + row['LONGITUDE'] = lon + row['ADDRESS'] = street if street else '' + # include some injury/fatality placeholders that the label generator expects + row['INJURIES'] = 0 + row['FATALITIES'] = 0 + # include weather features returned by OpenWeather (prefixed 'ow_') + if extra_weather: + for k, v in extra_weather.items(): + row[k] = v + return pd.DataFrame([row]) + + +def prepare_features(df_row, train_csv=None, preprocess_meta=None, feature_engineer=True, lat_lon_bins=20): + """Given a one-row DataFrame, apply same feature engineering and standardization as training. + + If preprocess_meta is provided (npz), use it. Otherwise train_csv must be provided to compute stats. + Returns a torch.FloatTensor of shape (1, input_dim) and the feature_columns list. 
+ """ + # apply feature engineering helpers + if feature_engineer: + try: + _add_date_features(df_row) + except Exception: + pass + try: + _add_latlon_bins(df_row, bins=lat_lon_bins) + except Exception: + pass + try: + _add_hashed_street(df_row) + except Exception: + pass + + # if meta provided, load feature_columns, means, stds + if preprocess_meta and os.path.exists(preprocess_meta): + meta = np.load(preprocess_meta, allow_pickle=True) + feature_columns = meta['feature_columns'].tolist() + means = meta['means'] + stds = meta['stds'] + else: + if not train_csv: + raise ValueError('Either preprocess_meta or train_csv must be provided to derive feature stats') + # instantiate a CSVDataset on train_csv (feature_engineer True) to reuse its preprocessing + ds = CSVDataset(train_csv, feature_columns=None, label_column='label', generate_labels=False, n_buckets=10, label_method='kmeans', label_store=None, feature_engineer=feature_engineer, lat_lon_bins=lat_lon_bins, nrows=None) + feature_columns = ds.feature_columns + means = ds.feature_means + stds = ds.feature_stds + # save meta for reuse + np.savez_compressed('preprocess_meta.npz', feature_columns=np.array(feature_columns, dtype=object), means=means, stds=stds) + print('Saved preprocess_meta.npz') + + # ensure all feature columns exist in df_row + for c in feature_columns: + if c not in df_row.columns: + df_row[c] = 0 + + # coerce and fill using means + features_df = df_row[feature_columns].apply(lambda c: pd.to_numeric(c, errors='coerce')) + features_df = features_df.fillna(pd.Series(means, index=feature_columns)).fillna(0.0) + # standardize + features_np = (features_df.values - means) / (stds + 1e-6) + import torch + return torch.tensor(features_np, dtype=torch.float32), feature_columns + + +def predict_from_openweather(lat, lon, dt_iso=None, street=None, api_key=None, train_csv=None, preprocess_meta=None, model_path='model.pth', centers_path='kmeans_centers_all.npz', roadrisk_url=None): + api_key = api_key or os.environ.get('OPENWEATHER_KEY') + if api_key is None: + raise ValueError('OpenWeather API key required via --api-key or OPENWEATHER_KEY env var') + + # gather weather/road-risk features + weather = {} + if roadrisk_url: + try: + rr = fetch_roadrisk(roadrisk_url, api_key=api_key) + weather.update(rr) + except Exception as e: + print('Warning: failed to fetch roadrisk URL:', e) + else: + try: + ow = fetch_openweather(lat, lon, api_key, dt_iso=dt_iso) + weather.update(ow) + except Exception as e: + print('Warning: failed to fetch openweather:', e) + + df_row = build_row(lat, lon, dt_iso=dt_iso, street=street, extra_weather=weather) + x_tensor, feature_columns = prepare_features(df_row, train_csv=train_csv, preprocess_meta=preprocess_meta) + + # load model (infer num_classes from centers file if possible) + global _CACHED_MODEL, _CACHED_IDX_TO_CLASS, _CACHED_CENTERS, _CACHED_PREPROCESS_META + + # ensure we have preprocess_meta available (prefer supplied path, otherwise fallback to saved file) + if preprocess_meta is None: + candidate = os.path.join(os.getcwd(), 'preprocess_meta.npz') + if os.path.exists(candidate): + preprocess_meta = candidate + + # load centers (cache across requests) + if _CACHED_CENTERS is None: + if centers_path and os.path.exists(centers_path): + try: + npz = np.load(centers_path) + _CACHED_CENTERS = npz['centers'] + except Exception: + _CACHED_CENTERS = None + else: + _CACHED_CENTERS = None + + num_classes = _CACHED_CENTERS.shape[0] if _CACHED_CENTERS is not None else 10 + + # load model once and cache it + if 
_CACHED_MODEL is None: + try: + _CACHED_MODEL, _CACHED_IDX_TO_CLASS = load_model(model_path, device=None, in_channels=3, num_classes=num_classes) + device = 'cuda' if torch.cuda.is_available() else 'cpu' + _CACHED_MODEL.to(device) + except Exception as e: + raise + model = _CACHED_MODEL + idx_to_class = _CACHED_IDX_TO_CLASS + device = 'cuda' if torch.cuda.is_available() else 'cpu' + x_tensor = x_tensor.to(device) + with torch.no_grad(): + logits = model(x_tensor) + probs = F.softmax(logits, dim=1).cpu().numpy()[0] + pred_idx = int(probs.argmax()) + confidence = float(probs.max()) + + # optionally provide cluster centroid info + centroid = _CACHED_CENTERS[pred_idx] if _CACHED_CENTERS is not None else None + + return { + 'pred_cluster': int(pred_idx), + 'confidence': confidence, + 'probabilities': probs.tolist(), + 'centroid': centroid.tolist() if centroid is not None else None, + 'feature_columns': feature_columns, + 'used_preprocess_meta': preprocess_meta + } + + +def init_inference(model_path='model.pth', centers_path='kmeans_centers_all.npz', preprocess_meta=None): + """Eagerly load model, centers, and preprocess_meta into module-level caches. + + This is intended to be called at app startup to surface load errors early and avoid + per-request disk IO. The function is best-effort and will print warnings if artifacts + are missing. + """ + global _CACHED_MODEL, _CACHED_IDX_TO_CLASS, _CACHED_CENTERS, _CACHED_PREPROCESS_META + + # prefer existing saved preprocess_meta if not explicitly provided + if preprocess_meta is None: + candidate = os.path.join(os.getcwd(), 'preprocess_meta.npz') + if os.path.exists(candidate): + preprocess_meta = candidate + + _CACHED_PREPROCESS_META = preprocess_meta + + # load centers + if _CACHED_CENTERS is None: + if centers_path and os.path.exists(centers_path): + try: + npz = np.load(centers_path) + _CACHED_CENTERS = npz['centers'] + print(f'Loaded centers from {centers_path}') + except Exception as e: + print('Warning: failed to load centers:', e) + _CACHED_CENTERS = None + else: + print('No centers file found at', centers_path) + _CACHED_CENTERS = None + + num_classes = _CACHED_CENTERS.shape[0] if _CACHED_CENTERS is not None else 10 + + # load model + if _CACHED_MODEL is None: + try: + _CACHED_MODEL, _CACHED_IDX_TO_CLASS = load_model(model_path, device=None, in_channels=3, num_classes=num_classes) + device = 'cuda' if torch.cuda.is_available() else 'cpu' + _CACHED_MODEL.to(device) + print(f'Loaded model from {model_path}') + except Exception as e: + print('Warning: failed to load model:', e) + _CACHED_MODEL = None + + return { + 'model_loaded': _CACHED_MODEL is not None, + 'centers_loaded': _CACHED_CENTERS is not None, + 'preprocess_meta': _CACHED_PREPROCESS_META + } + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--lat', type=float, required=True) + parser.add_argument('--lon', type=float, required=True) + parser.add_argument('--datetime', default=None, help='ISO datetime string to query hourly weather (optional)') + parser.add_argument('--street', default='') + parser.add_argument('--api-key', default=None, help='OpenWeather API key or use OPENWEATHER_KEY env var') + parser.add_argument('--train-csv', default=None, help='Path to training CSV to compute preprocessing stats (optional if --preprocess-meta provided)') + parser.add_argument('--preprocess-meta', default=None, help='Path to precomputed preprocess_meta.npz (optional)') + parser.add_argument('--model', default='model.pth') + 
parser.add_argument('--centers', default='kmeans_centers_all.npz') + parser.add_argument('--roadrisk-url', default=None, help='Optional custom RoadRisk API URL (if provided, will be queried instead of OneCall)') + args = parser.parse_args() + + out = predict_from_openweather(args.lat, args.lon, dt_iso=args.datetime, street=args.street, api_key=args.api_key, train_csv=args.train_csv, preprocess_meta=args.preprocess_meta, model_path=args.model, centers_path=args.centers, roadrisk_url=args.roadrisk_url) + print(json.dumps(out, indent=2)) diff --git a/roadcast/requirements.txt b/roadcast/requirements.txt new file mode 100644 index 0000000..ee9c857 --- /dev/null +++ b/roadcast/requirements.txt @@ -0,0 +1,5 @@ +flask>=2.0 +torch>=1.13 +torchvision>=0.14 +Pillow>=9.0 +tqdm>=4.60 diff --git a/roadcast/run_batch_inference.py b/roadcast/run_batch_inference.py new file mode 100644 index 0000000..119610d --- /dev/null +++ b/roadcast/run_batch_inference.py @@ -0,0 +1,113 @@ +import os +import argparse +import pandas as pd +import numpy as np +import time + +import openweather_inference as owi + + +def find_column(df_cols, candidates): + cmap = {c.lower(): c for c in df_cols} + for cand in candidates: + if cand.lower() in cmap: + return cmap[cand.lower()] + return None + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('csv', help='Path to data CSV (e.g., data.csv)') + parser.add_argument('--out', default='inference_results.csv') + parser.add_argument('--lat-col', default=None) + parser.add_argument('--lon-col', default=None) + parser.add_argument('--date-col', default=None) + parser.add_argument('--model', default='model.pth') + parser.add_argument('--centers', default='kmeans_centers_all.npz') + parser.add_argument('--preprocess-meta', default=None) + parser.add_argument('--api-key', default=None) + parser.add_argument('--live', action='store_true', help='If set, call external RoadRisk/OpenWeather per row') + parser.add_argument('--roadrisk-url', default=None, help='Optional per-request RoadRisk URL to use when --live') + parser.add_argument('--subset', type=int, default=0, help='Process only first N rows for testing') + args = parser.parse_args() + + df = pd.read_csv(args.csv, low_memory=False) + nrows = args.subset if args.subset and args.subset > 0 else len(df) + df = df.iloc[:nrows].copy() + + # find sensible columns + lat_col = args.lat_col or find_column(df.columns, ['latitude', 'lat', 'mpdlatitude']) + lon_col = args.lon_col or find_column(df.columns, ['longitude', 'lon', 'mpdlongitude']) + date_col = args.date_col or find_column(df.columns, ['report_dat', 'reportdate', 'fromdate', 'lastupdatedate', 'date', 'occur_date']) + + if lat_col is None or lon_col is None: + raise SystemExit('Could not find latitude/longitude columns automatically. 
Pass --lat-col and --lon-col.') + + print(f'Using lat column: {lat_col}, lon column: {lon_col}, date column: {date_col}') + + # eager init caches + status = owi.init_inference(model_path=args.model, centers_path=args.centers, preprocess_meta=args.preprocess_meta) + print('init status:', status) + + results = [] + t0 = time.time() + for i, row in df.iterrows(): + lat = row.get(lat_col) + lon = row.get(lon_col) + dt = row.get(date_col) if date_col else None + + try: + if args.live: + # call the full pipeline which may hit remote API + out = owi.predict_from_openweather(lat, lon, dt_iso=dt, street=None, api_key=args.api_key, train_csv=None, preprocess_meta=args.preprocess_meta, model_path=args.model, centers_path=args.centers, roadrisk_url=args.roadrisk_url) + else: + # local-only path: build row, prepare features using preprocess_meta, and run cached model + df_row = owi.build_row(lat, lon, dt_iso=dt, street=None, extra_weather=None) + x_tensor, feature_columns = owi.prepare_features(df_row, train_csv=None, preprocess_meta=args.preprocess_meta) + # ensure model cached + if owi._CACHED_MODEL is None: + owi.init_inference(model_path=args.model, centers_path=args.centers, preprocess_meta=args.preprocess_meta) + model = owi._CACHED_MODEL + centers = owi._CACHED_CENTERS + device = 'cuda' if __import__('torch').cuda.is_available() else 'cpu' + model.to(device) + xt = x_tensor.to(device) + import torch + import torch.nn.functional as F + with torch.no_grad(): + logits = model(xt) + probs = F.softmax(logits, dim=1).cpu().numpy()[0] + pred_idx = int(probs.argmax()) + confidence = float(probs.max()) + out = {'pred_cluster': pred_idx, 'confidence': confidence, 'probabilities': probs.tolist(), 'centroid': centers[pred_idx].tolist() if centers is not None else None, 'feature_columns': feature_columns} + except Exception as e: + out = {'error': str(e)} + + # combine row and output into flat result + res = { + 'orig_index': i, + 'lat': lat, + 'lon': lon, + 'datetime': str(dt), + } + if 'error' in out: + res.update({'error': out['error']}) + else: + res.update({ + 'pred_cluster': int(out.get('pred_cluster')), + 'confidence': float(out.get('confidence')), + }) + results.append(res) + + if (len(results) % 50) == 0: + print(f'Processed {len(results)}/{nrows} rows...') + + elapsed = time.time() - t0 + print(f'Finished {len(results)} rows in {elapsed:.2f}s') + out_df = pd.DataFrame(results) + out_df.to_csv(args.out, index=False) + print('Wrote', args.out) + + +if __name__ == '__main__': + main() diff --git a/roadcast/tests/smoke_predict.py b/roadcast/tests/smoke_predict.py new file mode 100644 index 0000000..b215b0d --- /dev/null +++ b/roadcast/tests/smoke_predict.py @@ -0,0 +1,23 @@ +import sys +import types +import os + +# Ensure repo root on path +sys.path.insert(0, r"C:\Users\Samarth Jain\Documents\roadcast") + +# Create a fake openweather_inference module with a predictable function +mod = types.ModuleType("openweather_inference") + +def predict_from_openweather(lat, lon, dt_iso=None, street='', api_key=None, train_csv=None, preprocess_meta=None, model_path=None, centers_path=None, roadrisk_url=None): + return {"label": 5, "confidence": 0.87, "lat": lat, "lon": lon, "dt": dt_iso} + +mod.predict_from_openweather = predict_from_openweather +sys.modules["openweather_inference"] = mod + +# Import the Flask app and use its test client +from app import app + +c = app.test_client() +res = c.post("/predict-roadrisk", json={"lat": 38.9, "lon": -77.0}) +print("STATUS:", res.status_code) +print("JSON:", 
res.get_json()) diff --git a/roadcast/tmp_infer.csv b/roadcast/tmp_infer.csv new file mode 100644 index 0000000..1af6b82 --- /dev/null +++ b/roadcast/tmp_infer.csv @@ -0,0 +1,11 @@ +orig_index,lat,lon,datetime,error +0,38.91557,-77.031697,2011/03/06 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +1,38.875558,-77.017556,2011/03/06 08:45:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +2,38.872976,-77.016987,2011/03/05 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +3,38.929433,-77.003943,2011/03/08 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +4,38.89674,-77.027034,2011/03/08 17:18:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +5,38.89093,-76.993494,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +6,38.908478,-77.040086,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +7,38.846563,-76.976504,2011/03/12 05:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +8,38.894783,-77.01292,2011/03/12 18:30:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats +9,38.934204,-77.034567,2011/03/14 04:00:00+00,Either preprocess_meta or train_csv must be provided to derive feature stats diff --git a/roadcast/train.py b/roadcast/train.py new file mode 100644 index 0000000..c646093 --- /dev/null +++ b/roadcast/train.py @@ -0,0 +1,158 @@ +import os +import time +import torch +from torch import nn, optim +from torch.utils.data import DataLoader, random_split +from tqdm import tqdm + +from data import ImageFolderDataset, CSVDataset +from models import create_model + + +def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_classes=10, model_type='cnn', csv_label='label', generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None, seed=42, hidden_dims=None, weight_decay=0.0): + device = device or ('cuda' if torch.cuda.is_available() else 'cpu') + # Detect CSV vs folder dataset + if os.path.isfile(dataset_root) and dataset_root.lower().endswith('.csv'): + dataset = CSVDataset(dataset_root, + label_column=csv_label, + generate_labels=generate_labels, + n_buckets=n_buckets, + label_method=label_method, + label_store=label_store, + feature_engineer=feature_engineer, + lat_lon_bins=lat_lon_bins, + nrows=nrows) + # seed numpy/torch RNGs for reproducibility in experiments + try: + import numpy as _np + _np.random.seed(seed) + except Exception: + pass + try: + import random as _py_random + _py_random.seed(seed) + except Exception: + pass + try: + import torch as _torch + _torch.manual_seed(seed) + if _torch.cuda.is_available(): + _torch.cuda.manual_seed_all(seed) + except Exception: + pass + # determine input dim for MLP + input_dim = dataset.features.shape[1] + # persist preprocessing metadata so inference can reuse identical stats + try: + import numpy as _np + meta_path = os.path.join(os.getcwd(), 'preprocess_meta.npz') + _np.savez_compressed(meta_path, feature_columns=_np.array(dataset.feature_columns, dtype=object), means=dataset.feature_means, stds=dataset.feature_stds) + print(f'Saved preprocess meta to {meta_path}') + except Exception: + pass + if model_type == 'cnn': + raise ValueError('CSV dataset should use model_type="mlp"') + # if we generated labels, infer the 
diff --git a/roadcast/train.py b/roadcast/train.py
new file mode 100644
index 0000000..c646093
--- /dev/null
+++ b/roadcast/train.py
@@ -0,0 +1,158 @@
+import os
+import time
+import torch
+from torch import nn, optim
+from torch.utils.data import DataLoader, random_split
+from tqdm import tqdm
+
+from data import ImageFolderDataset, CSVDataset
+from models import create_model
+
+
+def train(dataset_root, epochs=3, batch_size=16, lr=1e-3, device=None, num_classes=10, model_type='cnn', csv_label='label', generate_labels=False, n_buckets=100, label_method='md5', label_store=None, feature_engineer=False, lat_lon_bins=20, nrows=None, seed=42, hidden_dims=None, weight_decay=0.0):
+    device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
+    # Detect CSV vs folder dataset
+    if os.path.isfile(dataset_root) and dataset_root.lower().endswith('.csv'):
+        dataset = CSVDataset(dataset_root,
+                             label_column=csv_label,
+                             generate_labels=generate_labels,
+                             n_buckets=n_buckets,
+                             label_method=label_method,
+                             label_store=label_store,
+                             feature_engineer=feature_engineer,
+                             lat_lon_bins=lat_lon_bins,
+                             nrows=nrows)
+        # seed numpy/torch RNGs for reproducibility in experiments
+        try:
+            import numpy as _np
+            _np.random.seed(seed)
+        except Exception:
+            pass
+        try:
+            import random as _py_random
+            _py_random.seed(seed)
+        except Exception:
+            pass
+        try:
+            import torch as _torch
+            _torch.manual_seed(seed)
+            if _torch.cuda.is_available():
+                _torch.cuda.manual_seed_all(seed)
+        except Exception:
+            pass
+        # determine input dim for MLP
+        input_dim = dataset.features.shape[1]
+        # persist preprocessing metadata so inference can reuse identical stats
+        try:
+            import numpy as _np
+            meta_path = os.path.join(os.getcwd(), 'preprocess_meta.npz')
+            _np.savez_compressed(meta_path, feature_columns=_np.array(dataset.feature_columns, dtype=object), means=dataset.feature_means, stds=dataset.feature_stds)
+            print(f'Saved preprocess meta to {meta_path}')
+        except Exception:
+            pass
+        if model_type == 'cnn':
+            raise ValueError('CSV dataset should use model_type="mlp"')
+        # if we generated labels, infer the actual number of classes from the dataset labels
+        if generate_labels and hasattr(dataset, 'labels'):
+            try:
+                model_num_classes = int(dataset.labels.max().item()) + 1
+            except Exception:
+                model_num_classes = n_buckets
+        else:
+            model_num_classes = n_buckets if generate_labels else num_classes
+        # parse hidden_dims if provided by caller (tuple or list)
+        model = create_model(device=device, model_type='mlp', input_dim=input_dim, num_classes=model_num_classes, hidden_dims=hidden_dims)
+    else:
+        # assume folder of images
+        dataset = ImageFolderDataset(dataset_root)
+        model = create_model(device=device, model_type='cnn', input_size=(3, 224, 224), num_classes=num_classes)
+
+    # simple train/val split
+    val_size = max(1, int(0.1 * len(dataset)))
+    train_size = len(dataset) - val_size
+    train_set, val_set = random_split(dataset, [train_size, val_size])
+    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
+    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
+
+    criterion = nn.CrossEntropyLoss()
+    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
+
+    best_val_acc = 0.0
+    best_path = None
+
+    for epoch in range(epochs):
+        model.train()
+        running_loss = 0.0
+        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")
+        for xb, yb in pbar:
+            xb = xb.to(device)
+            yb = yb.to(device)
+            optimizer.zero_grad()
+            outputs = model(xb)
+            loss = criterion(outputs, yb)
+            loss.backward()
+            optimizer.step()
+            running_loss += loss.item()
+            pbar.set_postfix(loss=running_loss / (pbar.n + 1))
+
+        # validation
+        model.eval()
+        correct = 0
+        total = 0
+        with torch.no_grad():
+            for xb, yb in val_loader:
+                xb = xb.to(device)
+                yb = yb.to(device)
+                outputs = model(xb)
+                preds = outputs.argmax(dim=1)
+                correct += (preds == yb).sum().item()
+                total += yb.size(0)
+        val_acc = correct / total if total > 0 else 0.0
+        print(f"Epoch {epoch+1} val_acc={val_acc:.4f}")
+
+        # save best
+        if val_acc > best_val_acc:
+            out_path = os.path.join(os.getcwd(), 'model.pth')
+            if hasattr(dataset, 'class_to_idx'):
+                meta = {'model_state_dict': model.state_dict(), 'class_to_idx': dataset.class_to_idx}
+            else:
+                meta = {'model_state_dict': model.state_dict()}
+            torch.save(meta, out_path)
+            best_val_acc = val_acc
+            best_path = out_path
+            print(f"Saved best model to {out_path} (val_acc={val_acc:.4f})")
+
+    return best_path
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('data_root')
+    parser.add_argument('--epochs', type=int, default=3)
+    parser.add_argument('--batch-size', type=int, default=16)
+    parser.add_argument('--lr', type=float, default=1e-3)
+    parser.add_argument('--model-type', choices=['cnn', 'mlp'], default='cnn')
+    parser.add_argument('--csv-label', default='label')
+    parser.add_argument('--generate-labels', action='store_true', help='If set, generate labels from columns instead of expecting label column')
+    parser.add_argument('--n-buckets', type=int, default=100, help='Number of label buckets when generating labels')
+    parser.add_argument('--label-method', choices=['md5', 'kmeans'], default='md5', help='Method to generate labels when --generate-labels is set')
+    parser.add_argument('--label-store', default=None, help='Path to save/load label metadata (e.g., kmeans centers .npz)')
+    parser.add_argument('--subset', type=int, default=0, help='If set (>0), load only first N rows from CSV for fast experiments')
+    parser.add_argument('--feature-engineer', action='store_true', help='If set, add simple date and lat/lon engineered features')
+    parser.add_argument('--lat-lon-bins', type=int, default=20, help='Number of bins for lat/lon coarse spatial features')
+    parser.add_argument('--seed', type=int, default=42, help='Random seed for experiments')
+    parser.add_argument('--hidden-dims', type=str, default='', help='Comma-separated hidden dims for MLP, e.g. "256,128"')
+    parser.add_argument('--weight-decay', type=float, default=0.0, help='Weight decay (L2) for optimizer')
+    args = parser.parse_args()
+    data_root = args.data_root
+    nrows = args.subset if args.subset > 0 else None
+    # parse hidden dims
+    hidden_dims = None
+    if args.hidden_dims:
+        try:
+            hidden_dims = tuple(int(x) for x in args.hidden_dims.split(',') if x.strip())
+        except Exception:
+            hidden_dims = None
+    train(data_root, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, model_type=args.model_type, csv_label=args.csv_label, generate_labels=args.generate_labels, n_buckets=args.n_buckets, label_method=args.label_method, label_store=args.label_store, feature_engineer=args.feature_engineer, lat_lon_bins=args.lat_lon_bins, nrows=nrows, seed=args.seed, hidden_dims=hidden_dims, weight_decay=args.weight_decay)
+
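Because `train()` is importable, the same run can also be started programmatically instead of through the CLI. A minimal sketch using only keyword arguments from the signature above (the specific values are illustrative, not recommendations):

```python
from train import train

# Roughly equivalent to:
#   python train.py data.csv --model-type mlp --epochs 20 --batch-size 64 --generate-labels \
#       --label-method kmeans --n-buckets 50 --feature-engineer --hidden-dims "256,128" \
#       --weight-decay 1e-4 --subset 5000
best_checkpoint = train(
    'data.csv',             # tabular dataset; a folder path would select the CNN branch instead
    model_type='mlp',
    epochs=20,
    batch_size=64,
    generate_labels=True,   # derive bucket labels instead of expecting a `label` column
    label_method='kmeans',
    n_buckets=50,
    feature_engineer=True,  # add the date and lat/lon engineered features
    hidden_dims=(256, 128),
    weight_decay=1e-4,
    nrows=5000,             # subset of rows for a quick experiment
)
print(best_checkpoint)  # path to model.pth, or None if validation accuracy never improved
```

This is also the path a background caller (e.g. a web endpoint importing `train`) would take, since hyperparameters map directly onto keyword arguments rather than CLI flags.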