diff --git a/backend/src/cv/__init__.py b/backend/src/cv/__init__.py index fcdbf9c..f563a57 100644 --- a/backend/src/cv/__init__.py +++ b/backend/src/cv/__init__.py @@ -1,474 +1,47 @@ -import cv2 -import numpy as np -import os -import json -from pathlib import Path -from typing import List, Dict, Optional, Tuple +from .config import ( + CV_DIR, + DATA_DIR, + MODELS_DIR, + ULTRALYTICS_AVAILABLE, + YOLO26_MODELS, + SUPER_CATEGORIES, + COMMON_BRANDS, + COLORS, + DEFAULT_CONF_THRESHOLD, + DEFAULT_IOU_THRESHOLD, + DEFAULT_IMG_SIZE, +) +from .detectors import ( + YOLO26Detector, + HybridLogoDetector, +) +from .yolo_scanner import ( + start_scanner as start_yolo_scanner, + detect_objects as detect_yolo_objects, +) +from .scanner import ( + start_interactive_capture as start_ollama_scanner, + capture_and_analyze as capture_ollama_once, +) -CV_DIR = Path(__file__).parent -DATA_DIR = CV_DIR / "data" -MODELS_DIR = CV_DIR / "models" - -SUPER_CATEGORIES = { - "Food": 932, - "Clothes": 604, - "Necessities": 432, - "Others": 371, - "Electronic": 224, - "Transportation": 213, - "Leisure": 111, - "Sports": 66, - "Medical": 47 -} - -COMMON_BRANDS = [ - "McDonalds", "Starbucks", "CocaCola", "Pepsi", "KFC", "BurgerKing", - "Subway", "DunkinDonuts", "PizzaHut", "Dominos", "Nestle", "Heineken", - "Nike", "Adidas", "Puma", "UnderArmour", "Levis", "HM", "Zara", "Gap", - "Gucci", "LouisVuitton", "Chanel", "Versace", "Prada", "Armani", - "Apple", "Samsung", "HP", "Dell", "Intel", "AMD", "Nvidia", "Microsoft", - "Sony", "LG", "Huawei", "Xiaomi", "Lenovo", "Asus", "Acer", - "BMW", "Mercedes", "Audi", "Toyota", "Honda", "Ford", "Chevrolet", - "Volkswagen", "Tesla", "Porsche", "Ferrari", "Lamborghini", "Nissan", - "Google", "Facebook", "Twitter", "Instagram", "YouTube", "Amazon", - "Netflix", "Spotify", "Uber", "Airbnb", "PayPal", "Visa", "Mastercard" +__all__ = [ + "CV_DIR", + "DATA_DIR", + "MODELS_DIR", + "ULTRALYTICS_AVAILABLE", + "YOLO26_MODELS", + "SUPER_CATEGORIES", + "COMMON_BRANDS", + "COLORS", + "DEFAULT_CONF_THRESHOLD", + "DEFAULT_IOU_THRESHOLD", + "DEFAULT_IMG_SIZE", + "YOLO26Detector", + "HybridLogoDetector", + "start_yolo_scanner", + "detect_yolo_objects", + "start_ollama_scanner", + "capture_ollama_once", ] -class LogoDet3KDataset: - def __init__(self, dataset_path: Optional[str] = None): - self.dataset_path = None - self.categories = {} - self.brand_templates = {} - - if dataset_path and os.path.exists(dataset_path): - self.dataset_path = Path(dataset_path) - else: - default_paths = [ - DATA_DIR / "LogoDet-3K", - DATA_DIR / "logodet3k", - Path.home() / "Downloads" / "LogoDet-3K", - Path.home() / ".kaggle" / "datasets" / "lyly99" / "logodet3k", - ] - for path in default_paths: - if path.exists(): - self.dataset_path = path - break - - if self.dataset_path: - self._load_categories() - print(f"LogoDet-3K dataset loaded from: {self.dataset_path}") - print(f"Found {len(self.categories)} brand categories") - else: - print("LogoDet-3K dataset not found locally.") - print("\nTo download the dataset:") - print("1. Install kaggle CLI: pip install kaggle") - print("2. Download: kaggle datasets download -d lyly99/logodet3k") - print("3. 
Extract to:", DATA_DIR / "LogoDet-3K") - - def _load_categories(self): - if not self.dataset_path: - return - - for super_cat in self.dataset_path.iterdir(): - if super_cat.is_dir() and not super_cat.name.startswith('.'): - for brand_dir in super_cat.iterdir(): - if brand_dir.is_dir(): - brand_name = brand_dir.name - self.categories[brand_name] = { - "super_category": super_cat.name, - "path": brand_dir, - "images": list(brand_dir.glob("*.jpg")) + list(brand_dir.glob("*.png")) - } - - def get_brand_templates(self, brand_name: str, max_templates: int = 5) -> List[np.ndarray]: - if brand_name not in self.categories: - return [] - - templates = [] - images = self.categories[brand_name]["images"][:max_templates] - - for img_path in images: - img = cv2.imread(str(img_path)) - if img is not None: - templates.append(img) - - return templates - - def get_all_brands(self) -> List[str]: - return list(self.categories.keys()) - - def get_brands_by_category(self, super_category: str) -> List[str]: - return [ - name for name, info in self.categories.items() - if info["super_category"].lower() == super_category.lower() - ] - -class LogoDetector: - def __init__(self, - model_path: Optional[str] = None, - dataset_path: Optional[str] = None, - use_gpu: bool = True): - self.model_path = model_path - self.use_gpu = use_gpu - self.net = None - self.dataset = LogoDet3KDataset(dataset_path) - - self.conf_threshold = 0.3 - self.nms_threshold = 0.4 - - self.orb = cv2.ORB_create(nfeatures=1000) - self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) - - try: - self.sift = cv2.SIFT_create() - self.flann_matcher = cv2.FlannBasedMatcher( - {"algorithm": 1, "trees": 5}, - {"checks": 50} - ) - except: - self.sift = None - self.flann_matcher = None - - self.brand_features = {} - self._load_model() - self._cache_brand_features() - - def _load_model(self): - if not self.model_path or not os.path.exists(self.model_path): - return - - try: - print(f"Loading model: {self.model_path}") - - if self.model_path.endswith('.onnx'): - self.net = cv2.dnn.readNetFromONNX(self.model_path) - else: - self.net = cv2.dnn.readNet(self.model_path) - - if self.use_gpu: - try: - self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_DEFAULT) - self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_OPENCL) - print("✅ Using OpenCL GPU acceleration") - except: - print("⚠️ GPU not available, using CPU") - - print("Model loaded successfully!") - - except Exception as e: - print(f"Failed to load model: {e}") - self.net = None - - def _cache_brand_features(self): - if not self.dataset.categories: - return - - print("Caching brand features (this may take a moment)...") - - brands_to_cache = [b for b in COMMON_BRANDS if b in self.dataset.categories][:50] - - for brand in brands_to_cache: - templates = self.dataset.get_brand_templates(brand, max_templates=3) - if templates: - features = [] - for tmpl in templates: - gray = cv2.cvtColor(tmpl, cv2.COLOR_BGR2GRAY) - kp, des = self.orb.detectAndCompute(gray, None) - if des is not None: - features.append((kp, des)) - - if features: - self.brand_features[brand] = features - - print(f"Cached features for {len(self.brand_features)} brands") - - def detect(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]: - if conf_threshold is None: - conf_threshold = self.conf_threshold - - detections = [] - - if self.net is not None: - detections = self._detect_with_model(frame, conf_threshold) - - if not detections and self.brand_features: - detections = self._detect_with_features(frame, conf_threshold) 
- - if not detections: - detections = self._detect_logo_regions(frame) - - return detections - - def _detect_with_model(self, frame: np.ndarray, conf_threshold: float) -> List[Dict]: - height, width = frame.shape[:2] - - blob = cv2.dnn.blobFromImage( - frame, - scalefactor=1/255.0, - size=(640, 640), - swapRB=True, - crop=False - ) - - self.net.setInput(blob) - - try: - output_names = self.net.getUnconnectedOutLayersNames() - outputs = self.net.forward(output_names) - except: - outputs = [self.net.forward()] - - detections = [] - boxes = [] - confidences = [] - class_ids = [] - - for output in outputs: - if len(output.shape) == 3: - output = output[0] - - for detection in output: - if len(detection) < 5: - continue - - scores = detection[4:] if len(detection) > 5 else [detection[4]] - class_id = np.argmax(scores) if len(scores) > 1 else 0 - confidence = float(scores[class_id]) if len(scores) > 1 else float(scores[0]) - - if confidence > conf_threshold: - cx, cy, w, h = detection[:4] - scale_x = width / 640 - scale_y = height / 640 - - x1 = int((cx - w/2) * scale_x) - y1 = int((cy - h/2) * scale_y) - x2 = int((cx + w/2) * scale_x) - y2 = int((cy + h/2) * scale_y) - - boxes.append([x1, y1, x2-x1, y2-y1]) - confidences.append(confidence) - class_ids.append(class_id) - - if boxes: - indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, self.nms_threshold) - for i in indices: - idx = i[0] if isinstance(i, (list, tuple, np.ndarray)) else i - x, y, w, h = boxes[idx] - detections.append({ - "bbox": (x, y, x + w, y + h), - "label": f"Logo-{class_ids[idx]}", - "confidence": confidences[idx], - "class_id": class_ids[idx] - }) - - return detections - - def _detect_with_features(self, frame: np.ndarray, conf_threshold: float) -> List[Dict]: - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - kp_frame, des_frame = self.orb.detectAndCompute(gray, None) - - if des_frame is None or len(kp_frame) < 10: - return [] - - detections = [] - best_matches = [] - - for brand, feature_list in self.brand_features.items(): - for kp_tmpl, des_tmpl in feature_list: - try: - matches = self.bf_matcher.match(des_tmpl, des_frame) - matches = sorted(matches, key=lambda x: x.distance) - good_matches = [m for m in matches[:50] if m.distance < 60] - - if len(good_matches) >= 8: - pts = np.float32([kp_frame[m.trainIdx].pt for m in good_matches]) - if len(pts) > 0: - x_min, y_min = pts.min(axis=0).astype(int) - x_max, y_max = pts.max(axis=0).astype(int) - avg_dist = np.mean([m.distance for m in good_matches]) - confidence = max(0.3, 1.0 - (avg_dist / 100)) - - if confidence >= conf_threshold: - best_matches.append({ - "bbox": (x_min, y_min, x_max, y_max), - "label": brand, - "confidence": confidence, - "match_count": len(good_matches) - }) - except Exception: - continue - - if best_matches: - best_matches.sort(key=lambda x: x["confidence"], reverse=True) - detections = best_matches[:5] - - return detections - - def _detect_logo_regions(self, frame: np.ndarray) -> List[Dict]: - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - blurred = cv2.GaussianBlur(gray, (5, 5), 0) - edges = cv2.Canny(blurred, 80, 200) - - kernel = np.ones((3, 3), np.uint8) - edges = cv2.dilate(edges, kernel, iterations=1) - edges = cv2.erode(edges, kernel, iterations=1) - - contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - detections = [] - height, width = frame.shape[:2] - min_area = (width * height) * 0.01 - max_area = (width * height) * 0.15 - - for contour in contours: - area = cv2.contourArea(contour) - if area 
< min_area or area > max_area: - continue - - x, y, w, h = cv2.boundingRect(contour) - aspect_ratio = w / h if h > 0 else 0 - - if aspect_ratio < 0.5 or aspect_ratio > 2.0: - continue - - hull = cv2.convexHull(contour) - hull_area = cv2.contourArea(hull) - solidity = area / hull_area if hull_area > 0 else 0 - - if solidity < 0.3: - continue - - roi = gray[y:y+h, x:x+w] - if roi.size == 0: - continue - - corners = cv2.goodFeaturesToTrack(roi, 50, 0.01, 5) - if corners is None or len(corners) < 15: - continue - - roi_edges = edges[y:y+h, x:x+w] - edge_density = np.sum(roi_edges > 0) / (w * h) if (w * h) > 0 else 0 - - if edge_density < 0.05 or edge_density > 0.5: - continue - - corner_score = min(1.0, len(corners) / 40) - solidity_score = solidity - aspect_score = 1.0 - abs(1.0 - aspect_ratio) / 2 - - confidence = (corner_score * 0.4 + solidity_score * 0.3 + aspect_score * 0.3) - - if confidence >= 0.6: - detections.append({ - "bbox": (x, y, x + w, y + h), - "label": "Potential Logo", - "confidence": confidence, - "class_id": -1 - }) - - detections.sort(key=lambda x: x["confidence"], reverse=True) - return detections[:3] - - def draw_detections(self, frame: np.ndarray, detections: List[Dict]) -> np.ndarray: - result = frame.copy() - - for det in detections: - x1, y1, x2, y2 = det["bbox"] - label = det["label"] - conf = det["confidence"] - - if conf > 0.7: - color = (0, 255, 0) - elif conf > 0.5: - color = (0, 255, 255) - else: - color = (0, 165, 255) - - cv2.rectangle(result, (x1, y1), (x2, y2), color, 2) - label_text = f"{label}: {conf:.2f}" - (text_w, text_h), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) - cv2.rectangle(result, (x1, y1 - text_h - 6), (x1 + text_w + 4, y1), color, -1) - cv2.putText(result, label_text, (x1 + 2, y1 - 4), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) - - return result - -def start_scanner(model_path: Optional[str] = None, - dataset_path: Optional[str] = None, - use_gui: bool = True): - print("=" * 60) - print("LogoDet-3K Logo Scanner") - print("3,000 logo categories | 9 super-categories | 200K+ objects") - print("=" * 60) - - detector = LogoDetector( - model_path=model_path, - dataset_path=dataset_path, - use_gpu=True - ) - - cap = cv2.VideoCapture(0) - if not cap.isOpened(): - print("\nError: Could not access camera.") - return - - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 - - writer = None - output_path = CV_DIR / "output.mp4" - - print(f"\n📷 Camera: {width}x{height} @ {fps:.1f}fps") - print("Press 'q' to quit\n") - - frame_count = 0 - try: - while True: - ret, frame = cap.read() - if not ret: - break - - frame_count += 1 - detections = detector.detect(frame) - result_frame = detector.draw_detections(frame, detections) - - info_text = f"Logos: {len(detections)} | Frame: {frame_count}" - cv2.putText(result_frame, info_text, (10, 30), - cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) - - if use_gui: - try: - cv2.imshow('LogoDet-3K Scanner', result_frame) - key = cv2.waitKey(1) & 0xFF - if key == ord('q'): - break - elif key == ord('s'): - cv2.imwrite(str(CV_DIR / f"screenshot_{frame_count}.jpg"), result_frame) - except cv2.error: - use_gui = False - writer = cv2.VideoWriter( - str(output_path), - cv2.VideoWriter_fourcc(*'mp4v'), - fps, - (width, height) - ) - - if not use_gui and writer: - writer.write(result_frame) - except KeyboardInterrupt: - pass - finally: - cap.release() - if writer: - writer.release() - cv2.destroyAllWindows() - -if 
__name__ == "__main__": - import argparse - parser = argparse.ArgumentParser() - parser.add_argument("--model", "-m", type=str) - parser.add_argument("--dataset", "-d", type=str) - parser.add_argument("--no-gui", action="store_true") - args = parser.parse_args() - start_scanner(model_path=args.model, dataset_path=args.dataset, use_gui=not args.no_gui) \ No newline at end of file +__version__ = "2.0.0" diff --git a/backend/src/cv/__main__.py b/backend/src/cv/__main__.py new file mode 100644 index 0000000..9ae637f --- /dev/null +++ b/backend/src/cv/__main__.py @@ -0,0 +1,4 @@ +from .cli import main + +if __name__ == "__main__": + main() diff --git a/backend/src/cv/cli.py b/backend/src/cv/cli.py new file mode 100644 index 0000000..e2c7ec1 --- /dev/null +++ b/backend/src/cv/cli.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +import argparse + +from .config import YOLO26_MODELS +from .yolo_scanner import start_scanner as start_yolo, detect_objects +from .scanner import start_interactive_capture as start_ollama + +def main(): + parser = argparse.ArgumentParser( + description="Ollama and YOLO Logo Detection Scanner" + ) + + parser.add_argument("--model", "-m", type=str) + parser.add_argument("--size", "-s", type=str, default="nano", + choices=["nano", "small", "medium", "large", "xlarge"]) + parser.add_argument("--logo-model", type=str) + parser.add_argument("--yolo", action="store_true") + parser.add_argument("--no-gui", action="store_true") + parser.add_argument("--track", "-t", action="store_true") + parser.add_argument("--hybrid", action="store_true") + parser.add_argument("--image", "-i", type=str) + + args = parser.parse_args() + + if args.image: + detections = detect_objects( + args.image, model_size=args.size, hybrid_mode=args.hybrid + ) + print(f"Found {len(detections)} detections:") + for det in detections: + print(f" {det['label']}: {det['confidence']:.2%}") + + elif args.yolo: + start_yolo( + model_path=args.model, + model_size=args.size, + logo_model_path=args.logo_model, + use_gui=not args.no_gui, + use_tracking=args.track, + hybrid_mode=args.hybrid + ) + + else: + start_ollama() + +if __name__ == "__main__": + main() diff --git a/backend/src/cv/config.py b/backend/src/cv/config.py new file mode 100644 index 0000000..e167391 --- /dev/null +++ b/backend/src/cv/config.py @@ -0,0 +1,61 @@ +import os +from pathlib import Path +from typing import Dict + +CV_DIR = Path(__file__).parent +DATA_DIR = CV_DIR / "data" +MODELS_DIR = CV_DIR / "models" + +DATA_DIR.mkdir(parents=True, exist_ok=True) +MODELS_DIR.mkdir(parents=True, exist_ok=True) + +try: + from ultralytics import YOLO + ULTRALYTICS_AVAILABLE = True +except ImportError: + ULTRALYTICS_AVAILABLE = False + YOLO = None + +YOLO26_MODELS: Dict[str, str] = { + "nano": "yolo26n.pt", + "small": "yolo26s.pt", + "medium": "yolo26m.pt", + "large": "yolo26l.pt", + "xlarge": "yolo26x.pt", +} + +SUPER_CATEGORIES: Dict[str, int] = { + "Food": 932, + "Clothes": 604, + "Necessities": 432, + "Others": 371, + "Electronic": 224, + "Transportation": 213, + "Leisure": 111, + "Sports": 66, + "Medical": 47 +} + +COMMON_BRANDS = [ + "McDonalds", "Starbucks", "CocaCola", "Pepsi", "KFC", "BurgerKing", + "Subway", "DunkinDonuts", "PizzaHut", "Dominos", "Nestle", "Heineken", + "Nike", "Adidas", "Puma", "UnderArmour", "Levis", "HM", "Zara", "Gap", + "Gucci", "LouisVuitton", "Chanel", "Versace", "Prada", "Armani", + "Apple", "Samsung", "HP", "Dell", "Intel", "AMD", "Nvidia", "Microsoft", + "Sony", "LG", "Huawei", "Xiaomi", "Lenovo", "Asus", "Acer", + "BMW", 
"Mercedes", "Audi", "Toyota", "Honda", "Ford", "Chevrolet", + "Volkswagen", "Tesla", "Porsche", "Ferrari", "Lamborghini", "Nissan", + "Google", "Facebook", "Twitter", "Instagram", "YouTube", "Amazon", + "Netflix", "Spotify", "Uber", "Airbnb", "PayPal", "Visa", "Mastercard" +] + +COLORS = { + "high_conf": (0, 255, 0), + "medium_conf": (0, 255, 255), + "low_conf": (0, 165, 255), + "logo": (255, 0, 255), +} + +DEFAULT_CONF_THRESHOLD = 0.25 +DEFAULT_IOU_THRESHOLD = 0.45 +DEFAULT_IMG_SIZE = 640 diff --git a/backend/src/cv/detectors/__init__.py b/backend/src/cv/detectors/__init__.py new file mode 100644 index 0000000..6681f8f --- /dev/null +++ b/backend/src/cv/detectors/__init__.py @@ -0,0 +1,7 @@ +from .yolo26 import YOLO26Detector +from .hybrid import HybridLogoDetector + +__all__ = [ + "YOLO26Detector", + "HybridLogoDetector", +] diff --git a/backend/src/cv/detectors/hybrid.py b/backend/src/cv/detectors/hybrid.py new file mode 100644 index 0000000..e4beed0 --- /dev/null +++ b/backend/src/cv/detectors/hybrid.py @@ -0,0 +1,154 @@ +import cv2 +import numpy as np +import os +from typing import List, Dict, Optional + +from ..config import ( + ULTRALYTICS_AVAILABLE, + MODELS_DIR, + COLORS, + DEFAULT_CONF_THRESHOLD, +) +from .yolo26 import YOLO26Detector + +if ULTRALYTICS_AVAILABLE: + from ultralytics import YOLO + +class HybridLogoDetector: + def __init__(self, + coco_model_size: str = "nano", + logo_model_path: Optional[str] = None, + conf_threshold: float = DEFAULT_CONF_THRESHOLD, + device: str = "auto"): + self.conf_threshold = conf_threshold + self.device = device + self.coco_detector = None + self.logo_model = None + + if not ULTRALYTICS_AVAILABLE: + raise RuntimeError("Ultralytics not installed. Run: pip install ultralytics") + + print("Loading YOLO26 COCO base model...") + self.coco_detector = YOLO26Detector( + model_size=coco_model_size, + conf_threshold=conf_threshold, + device=device + ) + + if logo_model_path and os.path.exists(logo_model_path): + print(f"Loading logo model: {logo_model_path}") + self.logo_model = YOLO(logo_model_path) + print("Logo model loaded!") + else: + default_logo_model = MODELS_DIR / "logo_detector.pt" + if default_logo_model.exists(): + print(f"Loading default logo model: {default_logo_model}") + self.logo_model = YOLO(str(default_logo_model)) + print("Logo model loaded!") + else: + print("No logo model found.") + + print("Hybrid detector ready!") + + def detect(self, + frame: np.ndarray, + detect_objects: bool = True, + detect_logos: bool = True, + conf_threshold: Optional[float] = None) -> List[Dict]: + conf = conf_threshold if conf_threshold is not None else self.conf_threshold + all_detections = [] + + if detect_objects and self.coco_detector: + object_detections = self.coco_detector.detect(frame, conf_threshold=conf) + for det in object_detections: + det["type"] = "object" + all_detections.extend(object_detections) + + if detect_logos and self.logo_model: + logo_detections = self._detect_logos(frame, conf) + for det in logo_detections: + det["type"] = "logo" + all_detections.extend(logo_detections) + + return all_detections + + def _detect_logos(self, frame: np.ndarray, conf_threshold: float) -> List[Dict]: + if self.logo_model is None: + return [] + + results = self.logo_model( + frame, + conf=conf_threshold, + device=self.device if self.device != "auto" else None, + verbose=False + ) + + detections = [] + for result in results: + boxes = result.boxes + if boxes is None: + continue + + for i in range(len(boxes)): + xyxy = boxes.xyxy[i].cpu().numpy() + x1, 
y1, x2, y2 = map(int, xyxy) + conf_val = float(boxes.conf[i].cpu().numpy()) + class_id = int(boxes.cls[i].cpu().numpy()) + label = self.logo_model.names[class_id] + + detections.append({ + "bbox": (x1, y1, x2, y2), + "label": label, + "confidence": conf_val, + "class_id": class_id, + "brand": label + }) + + return detections + + def draw_detections(self, + frame: np.ndarray, + detections: List[Dict], + show_labels: bool = True) -> np.ndarray: + result = frame.copy() + + for det in detections: + x1, y1, x2, y2 = det["bbox"] + label = det["label"] + conf = det["confidence"] + det_type = det.get("type", "object") + + if det_type == "logo": + color = COLORS["logo"] + elif conf > 0.7: + color = COLORS["high_conf"] + elif conf > 0.5: + color = COLORS["medium_conf"] + else: + color = COLORS["low_conf"] + + cv2.rectangle(result, (x1, y1), (x2, y2), color, 2) + + if show_labels: + label_text = f"{label}: {conf:.2f}" + (text_w, text_h), _ = cv2.getTextSize( + label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1 + ) + cv2.rectangle( + result, + (x1, y1 - text_h - 8), + (x1 + text_w + 4, y1), + color, + -1 + ) + cv2.putText( + result, + label_text, + (x1 + 2, y1 - 4), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (255, 255, 255) if det_type == "logo" else (0, 0, 0), + 1 + ) + + return result diff --git a/backend/src/cv/detectors/yolo26.py b/backend/src/cv/detectors/yolo26.py new file mode 100644 index 0000000..3d57700 --- /dev/null +++ b/backend/src/cv/detectors/yolo26.py @@ -0,0 +1,186 @@ +import cv2 +import numpy as np +import os +from typing import List, Dict, Optional + +from ..config import ( + ULTRALYTICS_AVAILABLE, + YOLO26_MODELS, + COLORS, + DEFAULT_CONF_THRESHOLD, + DEFAULT_IOU_THRESHOLD, +) + +if ULTRALYTICS_AVAILABLE: + from ultralytics import YOLO + +class YOLO26Detector: + def __init__(self, + model_size: str = "nano", + model_path: Optional[str] = None, + conf_threshold: float = DEFAULT_CONF_THRESHOLD, + iou_threshold: float = DEFAULT_IOU_THRESHOLD, + device: str = "auto"): + self.conf_threshold = conf_threshold + self.iou_threshold = iou_threshold + self.device = device + self.model = None + + if not ULTRALYTICS_AVAILABLE: + raise RuntimeError("Ultralytics not installed. 
Run: pip install ultralytics") + + if model_path and os.path.exists(model_path): + model_name = model_path + elif model_size in YOLO26_MODELS: + model_name = YOLO26_MODELS[model_size] + else: + print(f"Unknown model size '{model_size}', defaulting to 'nano'") + model_name = YOLO26_MODELS["nano"] + + print(f"Loading YOLO26 model: {model_name}") + self.model = YOLO(model_name) + print(f"YOLO26 model loaded successfully!") + print(f"Classes: {len(self.model.names)} | Device: {device}") + + def detect(self, + frame: np.ndarray, + conf_threshold: Optional[float] = None, + classes: Optional[List[int]] = None) -> List[Dict]: + if self.model is None: + return [] + + conf = conf_threshold if conf_threshold is not None else self.conf_threshold + + results = self.model( + frame, + conf=conf, + iou=self.iou_threshold, + device=self.device if self.device != "auto" else None, + classes=classes, + verbose=False + ) + + detections = [] + for result in results: + boxes = result.boxes + if boxes is None: + continue + + for i in range(len(boxes)): + xyxy = boxes.xyxy[i].cpu().numpy() + x1, y1, x2, y2 = map(int, xyxy) + + conf_val = float(boxes.conf[i].cpu().numpy()) + class_id = int(boxes.cls[i].cpu().numpy()) + label = self.model.names[class_id] + + detections.append({ + "bbox": (x1, y1, x2, y2), + "label": label, + "confidence": conf_val, + "class_id": class_id + }) + + return detections + + def detect_and_track(self, + frame: np.ndarray, + conf_threshold: Optional[float] = None, + tracker: str = "bytetrack.yaml") -> List[Dict]: + if self.model is None: + return [] + + conf = conf_threshold if conf_threshold is not None else self.conf_threshold + + results = self.model.track( + frame, + conf=conf, + iou=self.iou_threshold, + device=self.device if self.device != "auto" else None, + tracker=tracker, + persist=True, + verbose=False + ) + + detections = [] + for result in results: + boxes = result.boxes + if boxes is None: + continue + + for i in range(len(boxes)): + xyxy = boxes.xyxy[i].cpu().numpy() + x1, y1, x2, y2 = map(int, xyxy) + + conf_val = float(boxes.conf[i].cpu().numpy()) + class_id = int(boxes.cls[i].cpu().numpy()) + label = self.model.names[class_id] + + track_id = None + if boxes.id is not None: + track_id = int(boxes.id[i].cpu().numpy()) + + detections.append({ + "bbox": (x1, y1, x2, y2), + "label": label, + "confidence": conf_val, + "class_id": class_id, + "track_id": track_id + }) + + return detections + + def draw_detections(self, + frame: np.ndarray, + detections: List[Dict], + show_labels: bool = True, + show_conf: bool = True) -> np.ndarray: + result = frame.copy() + + for det in detections: + x1, y1, x2, y2 = det["bbox"] + label = det["label"] + conf = det["confidence"] + track_id = det.get("track_id") + + if conf > 0.7: + color = COLORS["high_conf"] + elif conf > 0.5: + color = COLORS["medium_conf"] + else: + color = COLORS["low_conf"] + + cv2.rectangle(result, (x1, y1), (x2, y2), color, 2) + + if show_labels: + label_parts = [label] + if track_id is not None: + label_parts.append(f"ID:{track_id}") + if show_conf: + label_parts.append(f"{conf:.2f}") + label_text = " | ".join(label_parts) + + (text_w, text_h), baseline = cv2.getTextSize( + label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1 + ) + cv2.rectangle( + result, + (x1, y1 - text_h - 8), + (x1 + text_w + 4, y1), + color, + -1 + ) + cv2.putText( + result, + label_text, + (x1 + 2, y1 - 4), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (0, 0, 0), + 1 + ) + + return result + + def get_class_names(self) -> Dict[int, str]: + return self.model.names if 
self.model else {} diff --git a/backend/src/cv/scanner.py b/backend/src/cv/scanner.py new file mode 100644 index 0000000..7078da8 --- /dev/null +++ b/backend/src/cv/scanner.py @@ -0,0 +1,197 @@ +import cv2 +import json +import numpy as np +from datetime import datetime +from pathlib import Path +from typing import Dict, Optional +from ..ollama.detector import OllamaLogoDetector + +def capture_and_analyze(model: str = "ministral-3:latest", + save_image: bool = True, + output_dir: Optional[str] = None) -> Dict: + cap = cv2.VideoCapture(0) + if not cap.isOpened(): + raise RuntimeError("Could not access camera") + + print("Camera ready. Press SPACE to capture, Q to quit.") + + result = None + + while True: + ret, frame = cap.read() + if not ret: + break + + display = frame.copy() + cv2.putText(display, "Press SPACE to capture | Q to quit", + (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) + cv2.imshow("Capture", display) + + key = cv2.waitKey(1) & 0xFF + + if key == ord(' '): + print("Analyzing image...") + + if save_image: + if output_dir is None: + output_dir = "./captures" + Path(output_dir).mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + img_path = Path(output_dir) / f"capture_{timestamp}.jpg" + cv2.imwrite(str(img_path), frame) + print(f"Saved: {img_path}") + + detector = OllamaLogoDetector(model=model) + result = detector.detect_from_numpy(frame) + + _display_results(result) + break + + elif key == ord('q'): + break + + cap.release() + cv2.destroyAllWindows() + + return result if result else {"logos_detected": [], "total_count": 0} + +def start_interactive_capture(model: str = "ministral-3:latest", + save_images: bool = True, + output_dir: Optional[str] = None): + cap = cv2.VideoCapture(0) + if not cap.isOpened(): + raise RuntimeError("Could not access camera") + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + print("=" * 60) + print("Ollama Logo Detection - Interactive Mode") + print("=" * 60) + print(f"Camera: {width}x{height}") + print(f"Model: {model}") + print("\nControls:") + print(" SPACE - Capture and analyze") + print(" S - Save frame only") + print(" R - Show last results") + print(" Q - Quit") + print("=" * 60) + + detector = OllamaLogoDetector(model=model) + last_result = None + analyzing = False + status_message = "Ready - Press SPACE to capture" + + if output_dir is None: + output_dir = "./captures" + Path(output_dir).mkdir(parents=True, exist_ok=True) + + while True: + ret, frame = cap.read() + if not ret: + break + + display = frame.copy() + + cv2.rectangle(display, (0, 0), (width, 40), (40, 40, 40), -1) + cv2.putText(display, status_message, (10, 28), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) + + if last_result and last_result.get("logos_detected"): + brands = [l.get("brand", "?") for l in last_result["logos_detected"]] + brand_text = f"Detected: {', '.join(brands[:3])}" + if len(brands) > 3: + brand_text += f" +{len(brands)-3} more" + cv2.rectangle(display, (0, height-35), (width, height), (40, 40, 40), -1) + cv2.putText(display, brand_text, (10, height-10), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) + + cv2.imshow("Ollama Logo Detection", display) + + key = cv2.waitKey(1) & 0xFF + + if key == ord(' ') and not analyzing: + analyzing = True + status_message = "Analyzing with Ollama..." 
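+            # Flush one GUI frame before the blocking Ollama request so the
+            # window repaints; waitKey(1) gives HighGUI time to process it.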
+ cv2.imshow("Ollama Logo Detection", display) + cv2.waitKey(1) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + img_path = Path(output_dir) / f"capture_{timestamp}.jpg" + + if save_images: + cv2.imwrite(str(img_path), frame) + + last_result = detector.detect_from_numpy(frame) + + json_path = Path(output_dir) / f"result_{timestamp}.json" + with open(json_path, 'w') as f: + json.dump(last_result, f, indent=2) + + count = last_result.get("total_count", 0) + if count > 0: + status_message = f"Found {count} logo(s)! Press R for details" + else: + status_message = "No logos detected. Try again!" + + print(f"\nCaptured: {img_path}") + print(f"Results: {json_path}") + _display_results(last_result) + + analyzing = False + + elif key == ord('s'): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + img_path = Path(output_dir) / f"capture_{timestamp}.jpg" + cv2.imwrite(str(img_path), frame) + status_message = f"Saved: {img_path.name}" + print(f"Saved: {img_path}") + + elif key == ord('r') and last_result: + print("\n" + "=" * 40) + print("Last Detection Results:") + print("=" * 40) + _display_results(last_result) + + elif key == ord('q'): + break + + cap.release() + cv2.destroyAllWindows() + + print("\nGoodbye!") + return last_result + +def _display_results(result: Dict): + print("\n" + "-" * 40) + + logos = result.get("logos_detected", []) + count = result.get("total_count", len(logos)) + + if count == 0: + print("No logos or brands detected") + if "description" in result: + print(f"Description: {result['description']}") + else: + print(f"Detected {count} logo(s)/brand(s):\n") + for i, logo in enumerate(logos, 1): + brand = logo.get("brand", "Unknown") + conf = logo.get("confidence", "unknown") + loc = logo.get("location", "unknown") + cat = logo.get("category", "") + + print(f" {i}. 
{brand}") + print(f" Confidence: {conf}") + print(f" Location: {loc}") + if cat: + print(f" Category: {cat}") + print() + + if "error" in result: + print(f"Error: {result['error']}") + + print("-" * 40) + + print("\nJSON Output:") + print(json.dumps(result, indent=2)) diff --git a/backend/src/cv/vision.py b/backend/src/cv/vision.py new file mode 100644 index 0000000..4bbedd3 --- /dev/null +++ b/backend/src/cv/vision.py @@ -0,0 +1,28 @@ +from .config import ( + CV_DIR, + DATA_DIR, + MODELS_DIR, + ULTRALYTICS_AVAILABLE, + YOLO26_MODELS, + SUPER_CATEGORIES, + COMMON_BRANDS, + COLORS, + DEFAULT_CONF_THRESHOLD, + DEFAULT_IOU_THRESHOLD, + DEFAULT_IMG_SIZE, +) +from .detectors import ( + YOLO26Detector, + HybridLogoDetector, +) +from .yolo_scanner import ( + start_scanner as start_yolo_scanner, + detect_objects as detect_yolo_objects, +) +from .scanner import ( + start_interactive_capture as start_ollama_scanner, +) + +if __name__ == "__main__": + from .cli import main + main() \ No newline at end of file diff --git a/backend/src/cv/yolo_scanner.py b/backend/src/cv/yolo_scanner.py new file mode 100644 index 0000000..56ad225 --- /dev/null +++ b/backend/src/cv/yolo_scanner.py @@ -0,0 +1,166 @@ +import cv2 +from pathlib import Path +from typing import List, Dict, Optional + +from .config import ( + CV_DIR, + ULTRALYTICS_AVAILABLE, +) +from .detectors import YOLO26Detector, HybridLogoDetector + +def start_scanner(model_path: Optional[str] = None, + model_size: str = "nano", + logo_model_path: Optional[str] = None, + use_gui: bool = True, + use_tracking: bool = False, + hybrid_mode: bool = False): + print("=" * 60) + if hybrid_mode: + print("YOLO26 Hybrid Scanner (COCO + Logos)") + else: + print("YOLO26 Object Detection Scanner") + print("=" * 60) + + detector = None + + if hybrid_mode and ULTRALYTICS_AVAILABLE: + try: + detector = HybridLogoDetector( + coco_model_size=model_size, + logo_model_path=logo_model_path, + conf_threshold=0.25, + device="auto" + ) + except Exception as e: + print(f"Hybrid detector failed: {e}") + hybrid_mode = False + + if detector is None and ULTRALYTICS_AVAILABLE: + try: + detector = YOLO26Detector( + model_size=model_size, + model_path=model_path, + conf_threshold=0.25, + device="auto" + ) + except Exception as e: + print(f"YOLO26 failed: {e}") + + if detector is None: + print("Error: No detector available.") + return + + cap = cv2.VideoCapture(0) + if not cap.isOpened(): + print("Error: Could not access camera.") + return + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 + + writer = None + output_path = CV_DIR / "output.mp4" + + print(f"Camera: {width}x{height} @ {fps:.1f}fps") + print("Controls: q=quit | s=screenshot | t=tracking") + if hybrid_mode: + print(" o=objects | l=logos | b=both") + + frame_count = 0 + detect_objects_flag = True + detect_logos_flag = True + + try: + while True: + ret, frame = cap.read() + if not ret: + break + + frame_count += 1 + + if hybrid_mode and isinstance(detector, HybridLogoDetector): + detections = detector.detect( + frame, + detect_objects=detect_objects_flag, + detect_logos=detect_logos_flag + ) + elif use_tracking and isinstance(detector, YOLO26Detector): + detections = detector.detect_and_track(frame) + else: + detections = detector.detect(frame) + + result_frame = detector.draw_detections(frame, detections) + + mode_str = "HYBRID" if hybrid_mode else ("TRACK" if use_tracking else "DETECT") + cv2.putText(result_frame, f"{mode_str} | 
{len(detections)} objects", + (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) + + if use_gui: + try: + cv2.imshow('YOLO26 Scanner', result_frame) + key = cv2.waitKey(1) & 0xFF + if key == ord('q'): + break + elif key == ord('s'): + path = CV_DIR / f"screenshot_{frame_count}.jpg" + cv2.imwrite(str(path), result_frame) + print(f"Saved: {path}") + elif key == ord('t') and isinstance(detector, YOLO26Detector): + use_tracking = not use_tracking + elif key == ord('o') and hybrid_mode: + detect_objects_flag, detect_logos_flag = True, False + elif key == ord('l') and hybrid_mode: + detect_objects_flag, detect_logos_flag = False, True + elif key == ord('b') and hybrid_mode: + detect_objects_flag, detect_logos_flag = True, True + except cv2.error: + use_gui = False + writer = cv2.VideoWriter( + str(output_path), cv2.VideoWriter_fourcc(*'mp4v'), + fps, (width, height) + ) + + if not use_gui and writer: + writer.write(result_frame) + + except KeyboardInterrupt: + print("Stopping...") + finally: + cap.release() + if writer: + writer.release() + cv2.destroyAllWindows() + +def detect_objects(image_path: str, + model_size: str = "nano", + conf_threshold: float = 0.25, + save_output: bool = True, + hybrid_mode: bool = False) -> List[Dict]: + if not ULTRALYTICS_AVAILABLE: + raise RuntimeError("Ultralytics not installed") + + if hybrid_mode: + detector = HybridLogoDetector( + coco_model_size=model_size, + conf_threshold=conf_threshold + ) + else: + detector = YOLO26Detector( + model_size=model_size, + conf_threshold=conf_threshold + ) + + image = cv2.imread(image_path) + if image is None: + raise ValueError(f"Could not load: {image_path}") + + detections = detector.detect(image) + + if save_output: + result = detector.draw_detections(image, detections) + output = Path(image_path).stem + "_detected.jpg" + cv2.imwrite(output, result) + print(f"Saved: {output}") + + return detections diff --git a/backend/src/ollama/__init__.py b/backend/src/ollama/__init__.py new file mode 100644 index 0000000..14c1c76 --- /dev/null +++ b/backend/src/ollama/__init__.py @@ -0,0 +1,5 @@ +from .detector import OllamaLogoDetector + +__all__ = [ + "OllamaLogoDetector", +] diff --git a/backend/src/ollama/__main__.py b/backend/src/ollama/__main__.py new file mode 100644 index 0000000..9ae637f --- /dev/null +++ b/backend/src/ollama/__main__.py @@ -0,0 +1,4 @@ +from .cli import main + +if __name__ == "__main__": + main() diff --git a/backend/src/ollama/cli.py b/backend/src/ollama/cli.py new file mode 100644 index 0000000..0fbe4eb --- /dev/null +++ b/backend/src/ollama/cli.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +import argparse +import json +import sys + +from .detector import OllamaLogoDetector +from .camera import capture_and_analyze, start_interactive_capture + +def main(): + parser = argparse.ArgumentParser( + description="Detect logos and companies using Ollama vision models" + ) + + parser.add_argument("--image", "-i", type=str) + parser.add_argument("--model", "-m", type=str, default="ministral-3:latest") + parser.add_argument("--output", "-o", type=str) + parser.add_argument("--host", type=str) + parser.add_argument("--single", "-s", action="store_true") + parser.add_argument("--no-save", action="store_true") + parser.add_argument("--output-dir", type=str, default="./captures") + + args = parser.parse_args() + + try: + if args.image: + print(f"Analyzing: {args.image}") + print(f"Model: {args.model}") + + detector = OllamaLogoDetector(model=args.model, host=args.host) + result = detector.detect_from_file(args.image) 
+ + _print_results(result) + + if args.output: + with open(args.output, 'w') as f: + json.dump(result, f, indent=2) + print(f"Results saved to: {args.output}") + + elif args.single: + result = capture_and_analyze( + model=args.model, + save_image=not args.no_save, + output_dir=args.output_dir + ) + + if args.output and result: + with open(args.output, 'w') as f: + json.dump(result, f, indent=2) + print(f"Results saved to: {args.output}") + + else: + start_interactive_capture( + model=args.model, + save_images=not args.no_save, + output_dir=args.output_dir + ) + + except KeyboardInterrupt: + sys.exit(0) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + +def _print_results(result: dict): + print("\n" + "=" * 50) + print("DETECTION RESULTS") + print("=" * 50) + + logos = result.get("logos_detected", []) + count = result.get("total_count", len(logos)) + + if count == 0: + print("\nNo logos or companies detected") + if desc := result.get("description"): + print(f"\nImage description: {desc}") + else: + print(f"\nFound {count} logo(s)/company(s):\n") + + for i, logo in enumerate(logos, 1): + brand = logo.get("brand", "Unknown") + conf = logo.get("confidence", "unknown") + loc = logo.get("location", "unknown") + cat = logo.get("category", "N/A") + + print(f" {i}. {brand}") + print(f" Confidence: {conf}") + print(f" Location: {loc}") + print(f" Category: {cat}") + print() + + if "error" in result: + print(f"\nError occurred: {result['error']}") + + if "raw_response" in result and result.get("parse_error"): + print(f"\nParse error: {result['parse_error']}") + print(f"Raw response:\n{result['raw_response'][:500]}...") + + print("=" * 50) + print("\nRaw JSON:") + print(json.dumps(result, indent=2)) + +if __name__ == "__main__": + main() diff --git a/backend/src/ollama/detector.py b/backend/src/ollama/detector.py new file mode 100644 index 0000000..045b9f0 --- /dev/null +++ b/backend/src/ollama/detector.py @@ -0,0 +1,164 @@ +import base64 +import json +import re +from pathlib import Path +from typing import Dict, List, Optional, Union + +try: + import ollama + OLLAMA_AVAILABLE = True +except ImportError: + OLLAMA_AVAILABLE = False + print("Ollama not installed. Run: pip install ollama") + +DEFAULT_HOST = "https://ollama.sirblob.co" +DEFAULT_MODEL = "ministral-3:latest" + +DEFAULT_PROMPT = """Analyze this image and identify ALL logos, brand names, and company names visible. + +For each logo or brand you detect, provide: +1. The company/brand name +2. Confidence level (high, medium, low) +3. Location in image (top-left, center, bottom-right, etc.) +4. Product category if identifiable + +Return your response as a valid JSON object with this exact structure: +{ + "logos_detected": [ + { + "brand": "Company Name", + "confidence": "high", + "location": "center", + "category": "technology" + } + ], + "total_count": 1, + "description": "Brief description of what's in the image" +} + +If no logos are found, return: +{ + "logos_detected": [], + "total_count": 0, + "description": "Description of image with no visible logos" +} + +IMPORTANT: Return ONLY the JSON object, no other text.""" + +class OllamaLogoDetector: + def __init__(self, + model: str = DEFAULT_MODEL, + host: str = DEFAULT_HOST): + if not OLLAMA_AVAILABLE: + raise RuntimeError("Ollama not installed. 
Run: pip install ollama") + + self.model = model + self.host = host + self.client = ollama.Client(host=host) + + try: + models = self.client.list() + model_names = [m['name'] for m in models.get('models', [])] + + model_base = model.split(':')[0] + if not any(model_base in name for name in model_names): + print(f"Model '{model}' not found. Available models: {model_names}") + print(f"Pulling {model}...") + self.client.pull(model) + print(f"Model {model} ready!") + else: + print(f"Using Ollama model: {model}") + except Exception as e: + print(f"Could not verify model: {e}") + print("Make sure Ollama is running: ollama serve") + + def detect_from_file(self, + image_path: str, + prompt: Optional[str] = None) -> Dict: + path = Path(image_path) + if not path.exists(): + raise FileNotFoundError(f"Image not found: {image_path}") + + with open(path, 'rb') as f: + image_data = base64.b64encode(f.read()).decode('utf-8') + + return self._analyze_image(image_data, prompt) + + def detect_from_bytes(self, + image_bytes: bytes, + prompt: Optional[str] = None) -> Dict: + image_data = base64.b64encode(image_bytes).decode('utf-8') + return self._analyze_image(image_data, prompt) + + def detect_from_numpy(self, + image_array, + prompt: Optional[str] = None) -> Dict: + import cv2 + + success, buffer = cv2.imencode('.jpg', image_array) + if not success: + raise ValueError("Failed to encode image") + + return self.detect_from_bytes(buffer.tobytes(), prompt) + + def _analyze_image(self, + image_base64: str, + prompt: Optional[str] = None) -> Dict: + if prompt is None: + prompt = DEFAULT_PROMPT + + try: + response = self.client.chat( + model=self.model, + messages=[{ + 'role': 'user', + 'content': prompt, + 'images': [image_base64] + }], + options={ + 'temperature': 0.1, + } + ) + + content = response['message']['content'] + return self._parse_response(content) + + except Exception as e: + return { + "logos_detected": [], + "total_count": 0, + "error": str(e), + "raw_response": None + } + + def _parse_response(self, content: str) -> Dict: + try: + return json.loads(content) + except json.JSONDecodeError: + pass + + json_patterns = [ + r'```json\s*([\s\S]*?)\s*```', + r'```\s*([\s\S]*?)\s*```', + r'\{[\s\S]*\}' + ] + + for pattern in json_patterns: + match = re.search(pattern, content) + if match: + try: + json_str = match.group(1) if '```' in pattern else match.group(0) + return json.loads(json_str) + except json.JSONDecodeError: + continue + + return { + "logos_detected": [], + "total_count": 0, + "raw_response": content, + "parse_error": "Could not extract valid JSON from response" + } + + def get_brands_list(self, result: Dict) -> List[str]: + logos = result.get("logos_detected", []) + return [logo.get("brand", "Unknown") for logo in logos]