Mirror of https://github.com/SirBlobby/Hoya26.git (synced 2026-02-03)

Commit: Ollama CV Logo Detection
@@ -1,474 +1,47 @@
import cv2
import numpy as np
import os
import json
from pathlib import Path
from typing import List, Dict, Optional, Tuple

from .config import (
    CV_DIR,
    DATA_DIR,
    MODELS_DIR,
    ULTRALYTICS_AVAILABLE,
    YOLO26_MODELS,
    SUPER_CATEGORIES,
    COMMON_BRANDS,
    COLORS,
    DEFAULT_CONF_THRESHOLD,
    DEFAULT_IOU_THRESHOLD,
    DEFAULT_IMG_SIZE,
)
from .detectors import (
    YOLO26Detector,
    HybridLogoDetector,
)
from .yolo_scanner import (
    start_scanner as start_yolo_scanner,
    detect_objects as detect_yolo_objects,
)
from .scanner import (
    start_interactive_capture as start_ollama_scanner,
    capture_and_analyze as capture_ollama_once,
)

CV_DIR = Path(__file__).parent
DATA_DIR = CV_DIR / "data"
MODELS_DIR = CV_DIR / "models"

SUPER_CATEGORIES = {
    "Food": 932,
    "Clothes": 604,
    "Necessities": 432,
    "Others": 371,
    "Electronic": 224,
    "Transportation": 213,
    "Leisure": 111,
    "Sports": 66,
    "Medical": 47
}

COMMON_BRANDS = [
    "McDonalds", "Starbucks", "CocaCola", "Pepsi", "KFC", "BurgerKing",
    "Subway", "DunkinDonuts", "PizzaHut", "Dominos", "Nestle", "Heineken",
    "Nike", "Adidas", "Puma", "UnderArmour", "Levis", "HM", "Zara", "Gap",
    "Gucci", "LouisVuitton", "Chanel", "Versace", "Prada", "Armani",
    "Apple", "Samsung", "HP", "Dell", "Intel", "AMD", "Nvidia", "Microsoft",
    "Sony", "LG", "Huawei", "Xiaomi", "Lenovo", "Asus", "Acer",
    "BMW", "Mercedes", "Audi", "Toyota", "Honda", "Ford", "Chevrolet",
    "Volkswagen", "Tesla", "Porsche", "Ferrari", "Lamborghini", "Nissan",
    "Google", "Facebook", "Twitter", "Instagram", "YouTube", "Amazon",
    "Netflix", "Spotify", "Uber", "Airbnb", "PayPal", "Visa", "Mastercard"
]

__all__ = [
    "CV_DIR",
    "DATA_DIR",
    "MODELS_DIR",
    "ULTRALYTICS_AVAILABLE",
    "YOLO26_MODELS",
    "SUPER_CATEGORIES",
    "COMMON_BRANDS",
    "COLORS",
    "DEFAULT_CONF_THRESHOLD",
    "DEFAULT_IOU_THRESHOLD",
    "DEFAULT_IMG_SIZE",
    "YOLO26Detector",
    "HybridLogoDetector",
    "start_yolo_scanner",
    "detect_yolo_objects",
    "start_ollama_scanner",
    "capture_ollama_once",
]

class LogoDet3KDataset:
    def __init__(self, dataset_path: Optional[str] = None):
        self.dataset_path = None
        self.categories = {}
        self.brand_templates = {}

        if dataset_path and os.path.exists(dataset_path):
            self.dataset_path = Path(dataset_path)
        else:
            default_paths = [
                DATA_DIR / "LogoDet-3K",
                DATA_DIR / "logodet3k",
                Path.home() / "Downloads" / "LogoDet-3K",
                Path.home() / ".kaggle" / "datasets" / "lyly99" / "logodet3k",
            ]
            for path in default_paths:
                if path.exists():
                    self.dataset_path = path
                    break

        if self.dataset_path:
            self._load_categories()
            print(f"LogoDet-3K dataset loaded from: {self.dataset_path}")
            print(f"Found {len(self.categories)} brand categories")
        else:
            print("LogoDet-3K dataset not found locally.")
            print("\nTo download the dataset:")
            print("1. Install kaggle CLI: pip install kaggle")
            print("2. Download: kaggle datasets download -d lyly99/logodet3k")
            print("3. Extract to:", DATA_DIR / "LogoDet-3K")

    def _load_categories(self):
        if not self.dataset_path:
            return

        for super_cat in self.dataset_path.iterdir():
            if super_cat.is_dir() and not super_cat.name.startswith('.'):
                for brand_dir in super_cat.iterdir():
                    if brand_dir.is_dir():
                        brand_name = brand_dir.name
                        self.categories[brand_name] = {
                            "super_category": super_cat.name,
                            "path": brand_dir,
                            "images": list(brand_dir.glob("*.jpg")) + list(brand_dir.glob("*.png"))
                        }

    def get_brand_templates(self, brand_name: str, max_templates: int = 5) -> List[np.ndarray]:
        if brand_name not in self.categories:
            return []

        templates = []
        images = self.categories[brand_name]["images"][:max_templates]

        for img_path in images:
            img = cv2.imread(str(img_path))
            if img is not None:
                templates.append(img)

        return templates

    def get_all_brands(self) -> List[str]:
        return list(self.categories.keys())

    def get_brands_by_category(self, super_category: str) -> List[str]:
        return [
            name for name, info in self.categories.items()
            if info["super_category"].lower() == super_category.lower()
        ]

class LogoDetector:
    def __init__(self,
                 model_path: Optional[str] = None,
                 dataset_path: Optional[str] = None,
                 use_gpu: bool = True):
        self.model_path = model_path
        self.use_gpu = use_gpu
        self.net = None
        self.dataset = LogoDet3KDataset(dataset_path)

        self.conf_threshold = 0.3
        self.nms_threshold = 0.4

        self.orb = cv2.ORB_create(nfeatures=1000)
        self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

        try:
            self.sift = cv2.SIFT_create()
            self.flann_matcher = cv2.FlannBasedMatcher(
                {"algorithm": 1, "trees": 5},
                {"checks": 50}
            )
        except Exception:
            self.sift = None
            self.flann_matcher = None

        self.brand_features = {}
        self._load_model()
        self._cache_brand_features()

    def _load_model(self):
        if not self.model_path or not os.path.exists(self.model_path):
            return

        try:
            print(f"Loading model: {self.model_path}")

            if self.model_path.endswith('.onnx'):
                self.net = cv2.dnn.readNetFromONNX(self.model_path)
            else:
                self.net = cv2.dnn.readNet(self.model_path)

            if self.use_gpu:
                try:
                    self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_DEFAULT)
                    self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_OPENCL)
                    print("✅ Using OpenCL GPU acceleration")
                except Exception:
                    print("⚠️ GPU not available, using CPU")

            print("Model loaded successfully!")

        except Exception as e:
            print(f"Failed to load model: {e}")
            self.net = None

    def _cache_brand_features(self):
        if not self.dataset.categories:
            return

        print("Caching brand features (this may take a moment)...")

        brands_to_cache = [b for b in COMMON_BRANDS if b in self.dataset.categories][:50]

        for brand in brands_to_cache:
            templates = self.dataset.get_brand_templates(brand, max_templates=3)
            if templates:
                features = []
                for tmpl in templates:
                    gray = cv2.cvtColor(tmpl, cv2.COLOR_BGR2GRAY)
                    kp, des = self.orb.detectAndCompute(gray, None)
                    if des is not None:
                        features.append((kp, des))

                if features:
                    self.brand_features[brand] = features

        print(f"Cached features for {len(self.brand_features)} brands")

    def detect(self, frame: np.ndarray, conf_threshold: Optional[float] = None) -> List[Dict]:
        if conf_threshold is None:
            conf_threshold = self.conf_threshold

        detections = []

        if self.net is not None:
            detections = self._detect_with_model(frame, conf_threshold)

        if not detections and self.brand_features:
            detections = self._detect_with_features(frame, conf_threshold)

        if not detections:
            detections = self._detect_logo_regions(frame)

        return detections

    def _detect_with_model(self, frame: np.ndarray, conf_threshold: float) -> List[Dict]:
        height, width = frame.shape[:2]

        blob = cv2.dnn.blobFromImage(
            frame,
            scalefactor=1/255.0,
            size=(640, 640),
            swapRB=True,
            crop=False
        )

        self.net.setInput(blob)

        try:
            output_names = self.net.getUnconnectedOutLayersNames()
            outputs = self.net.forward(output_names)
        except Exception:
            outputs = [self.net.forward()]

        detections = []
        boxes = []
        confidences = []
        class_ids = []

        for output in outputs:
            if len(output.shape) == 3:
                output = output[0]

            for detection in output:
                if len(detection) < 5:
                    continue

                scores = detection[4:] if len(detection) > 5 else [detection[4]]
                class_id = np.argmax(scores) if len(scores) > 1 else 0
                confidence = float(scores[class_id]) if len(scores) > 1 else float(scores[0])

                if confidence > conf_threshold:
                    cx, cy, w, h = detection[:4]
                    scale_x = width / 640
                    scale_y = height / 640

                    x1 = int((cx - w/2) * scale_x)
                    y1 = int((cy - h/2) * scale_y)
                    x2 = int((cx + w/2) * scale_x)
                    y2 = int((cy + h/2) * scale_y)

                    boxes.append([x1, y1, x2-x1, y2-y1])
                    confidences.append(confidence)
                    class_ids.append(class_id)

        if boxes:
            indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, self.nms_threshold)
            for i in indices:
                idx = i[0] if isinstance(i, (list, tuple, np.ndarray)) else i
                x, y, w, h = boxes[idx]
                detections.append({
                    "bbox": (x, y, x + w, y + h),
                    "label": f"Logo-{class_ids[idx]}",
                    "confidence": confidences[idx],
                    "class_id": class_ids[idx]
                })

        return detections

    def _detect_with_features(self, frame: np.ndarray, conf_threshold: float) -> List[Dict]:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        kp_frame, des_frame = self.orb.detectAndCompute(gray, None)

        if des_frame is None or len(kp_frame) < 10:
            return []

        detections = []
        best_matches = []

        for brand, feature_list in self.brand_features.items():
            for kp_tmpl, des_tmpl in feature_list:
                try:
                    matches = self.bf_matcher.match(des_tmpl, des_frame)
                    matches = sorted(matches, key=lambda x: x.distance)
                    good_matches = [m for m in matches[:50] if m.distance < 60]

                    if len(good_matches) >= 8:
                        pts = np.float32([kp_frame[m.trainIdx].pt for m in good_matches])
                        if len(pts) > 0:
                            x_min, y_min = pts.min(axis=0).astype(int)
                            x_max, y_max = pts.max(axis=0).astype(int)
                            avg_dist = np.mean([m.distance for m in good_matches])
                            confidence = max(0.3, 1.0 - (avg_dist / 100))

                            if confidence >= conf_threshold:
                                best_matches.append({
                                    "bbox": (x_min, y_min, x_max, y_max),
                                    "label": brand,
                                    "confidence": confidence,
                                    "match_count": len(good_matches)
                                })
                except Exception:
                    continue

        if best_matches:
            best_matches.sort(key=lambda x: x["confidence"], reverse=True)
            detections = best_matches[:5]

        return detections

    def _detect_logo_regions(self, frame: np.ndarray) -> List[Dict]:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        edges = cv2.Canny(blurred, 80, 200)

        kernel = np.ones((3, 3), np.uint8)
        edges = cv2.dilate(edges, kernel, iterations=1)
        edges = cv2.erode(edges, kernel, iterations=1)

        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        detections = []
        height, width = frame.shape[:2]
        min_area = (width * height) * 0.01
        max_area = (width * height) * 0.15

        for contour in contours:
            area = cv2.contourArea(contour)
            if area < min_area or area > max_area:
                continue

            x, y, w, h = cv2.boundingRect(contour)
            aspect_ratio = w / h if h > 0 else 0

            if aspect_ratio < 0.5 or aspect_ratio > 2.0:
                continue

            hull = cv2.convexHull(contour)
            hull_area = cv2.contourArea(hull)
            solidity = area / hull_area if hull_area > 0 else 0

            if solidity < 0.3:
                continue

            roi = gray[y:y+h, x:x+w]
            if roi.size == 0:
                continue

            corners = cv2.goodFeaturesToTrack(roi, 50, 0.01, 5)
            if corners is None or len(corners) < 15:
                continue

            roi_edges = edges[y:y+h, x:x+w]
            edge_density = np.sum(roi_edges > 0) / (w * h) if (w * h) > 0 else 0

            if edge_density < 0.05 or edge_density > 0.5:
                continue

            corner_score = min(1.0, len(corners) / 40)
            solidity_score = solidity
            aspect_score = 1.0 - abs(1.0 - aspect_ratio) / 2

            confidence = (corner_score * 0.4 + solidity_score * 0.3 + aspect_score * 0.3)

            if confidence >= 0.6:
                detections.append({
                    "bbox": (x, y, x + w, y + h),
                    "label": "Potential Logo",
                    "confidence": confidence,
                    "class_id": -1
                })

        detections.sort(key=lambda x: x["confidence"], reverse=True)
        return detections[:3]

    def draw_detections(self, frame: np.ndarray, detections: List[Dict]) -> np.ndarray:
        result = frame.copy()

        for det in detections:
            x1, y1, x2, y2 = det["bbox"]
            label = det["label"]
            conf = det["confidence"]

            if conf > 0.7:
                color = (0, 255, 0)
            elif conf > 0.5:
                color = (0, 255, 255)
            else:
                color = (0, 165, 255)

            cv2.rectangle(result, (x1, y1), (x2, y2), color, 2)
            label_text = f"{label}: {conf:.2f}"
            (text_w, text_h), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(result, (x1, y1 - text_h - 6), (x1 + text_w + 4, y1), color, -1)
            cv2.putText(result, label_text, (x1 + 2, y1 - 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

        return result

def start_scanner(model_path: Optional[str] = None,
                  dataset_path: Optional[str] = None,
                  use_gui: bool = True):
    print("=" * 60)
    print("LogoDet-3K Logo Scanner")
    print("3,000 logo categories | 9 super-categories | 200K+ objects")
    print("=" * 60)

    detector = LogoDetector(
        model_path=model_path,
        dataset_path=dataset_path,
        use_gpu=True
    )

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("\nError: Could not access camera.")
        return

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

    writer = None
    output_path = CV_DIR / "output.mp4"

    print(f"\n📷 Camera: {width}x{height} @ {fps:.1f}fps")
    print("Press 'q' to quit\n")

    frame_count = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            detections = detector.detect(frame)
            result_frame = detector.draw_detections(frame, detections)

            info_text = f"Logos: {len(detections)} | Frame: {frame_count}"
            cv2.putText(result_frame, info_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            if use_gui:
                try:
                    cv2.imshow('LogoDet-3K Scanner', result_frame)
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                    elif key == ord('s'):
                        cv2.imwrite(str(CV_DIR / f"screenshot_{frame_count}.jpg"), result_frame)
                except cv2.error:
                    use_gui = False
                    writer = cv2.VideoWriter(
                        str(output_path),
                        cv2.VideoWriter_fourcc(*'mp4v'),
                        fps,
                        (width, height)
                    )

            if not use_gui and writer:
                writer.write(result_frame)
    except KeyboardInterrupt:
        pass
    finally:
        cap.release()
        if writer:
            writer.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", "-m", type=str)
    parser.add_argument("--dataset", "-d", type=str)
    parser.add_argument("--no-gui", action="store_true")
    args = parser.parse_args()
    start_scanner(model_path=args.model, dataset_path=args.dataset, use_gui=not args.no_gui)

__version__ = "2.0.0"
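
The module above now mostly re-exports the real implementations. A minimal usage sketch of the slimmed-down API follows; the import root "cv" is an assumption that depends on how backend/src lands on sys.path:

import cv2
from cv import YOLO26Detector, DEFAULT_CONF_THRESHOLD  # assumed import root

# Build the nano detector and run a single-image detection pass.
detector = YOLO26Detector(model_size="nano", conf_threshold=DEFAULT_CONF_THRESHOLD)
frame = cv2.imread("sample.jpg")  # hypothetical test image
for det in detector.detect(frame):
    x1, y1, x2, y2 = det["bbox"]
    print(det["label"], f"{det['confidence']:.2f}", (x1, y1, x2, y2))
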

backend/src/cv/__main__.py (new file, 4 lines)
@@ -0,0 +1,4 @@
from .cli import main

if __name__ == "__main__":
    main()

backend/src/cv/cli.py (new file, 47 lines)
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
import argparse

from .config import YOLO26_MODELS
from .yolo_scanner import start_scanner as start_yolo, detect_objects
from .scanner import start_interactive_capture as start_ollama

def main():
    parser = argparse.ArgumentParser(
        description="Ollama and YOLO Logo Detection Scanner"
    )

    parser.add_argument("--model", "-m", type=str)
    parser.add_argument("--size", "-s", type=str, default="nano",
                        choices=["nano", "small", "medium", "large", "xlarge"])
    parser.add_argument("--logo-model", type=str)
    parser.add_argument("--yolo", action="store_true")
    parser.add_argument("--no-gui", action="store_true")
    parser.add_argument("--track", "-t", action="store_true")
    parser.add_argument("--hybrid", action="store_true")
    parser.add_argument("--image", "-i", type=str)

    args = parser.parse_args()

    if args.image:
        detections = detect_objects(
            args.image, model_size=args.size, hybrid_mode=args.hybrid
        )
        print(f"Found {len(detections)} detections:")
        for det in detections:
            print(f"  {det['label']}: {det['confidence']:.2%}")

    elif args.yolo:
        start_yolo(
            model_path=args.model,
            model_size=args.size,
            logo_model_path=args.logo_model,
            use_gui=not args.no_gui,
            use_tracking=args.track,
            hybrid_mode=args.hybrid
        )

    else:
        start_ollama()

if __name__ == "__main__":
    main()
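
For reference, a sketch of driving this entry point from Python rather than the shell; the argv values are hypothetical and the "cv.cli" import root is an assumption:

import sys
from cv.cli import main  # assumed import root

# argparse reads sys.argv[1:], so patching argv is enough for a quick test.
sys.argv = ["cv", "--image", "sample.jpg", "--size", "small", "--hybrid"]
main()
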

backend/src/cv/config.py (new file, 61 lines)
@@ -0,0 +1,61 @@
import os
from pathlib import Path
from typing import Dict

CV_DIR = Path(__file__).parent
DATA_DIR = CV_DIR / "data"
MODELS_DIR = CV_DIR / "models"

DATA_DIR.mkdir(parents=True, exist_ok=True)
MODELS_DIR.mkdir(parents=True, exist_ok=True)

try:
    from ultralytics import YOLO
    ULTRALYTICS_AVAILABLE = True
except ImportError:
    ULTRALYTICS_AVAILABLE = False
    YOLO = None

YOLO26_MODELS: Dict[str, str] = {
    "nano": "yolo26n.pt",
    "small": "yolo26s.pt",
    "medium": "yolo26m.pt",
    "large": "yolo26l.pt",
    "xlarge": "yolo26x.pt",
}

SUPER_CATEGORIES: Dict[str, int] = {
    "Food": 932,
    "Clothes": 604,
    "Necessities": 432,
    "Others": 371,
    "Electronic": 224,
    "Transportation": 213,
    "Leisure": 111,
    "Sports": 66,
    "Medical": 47
}

COMMON_BRANDS = [
    "McDonalds", "Starbucks", "CocaCola", "Pepsi", "KFC", "BurgerKing",
    "Subway", "DunkinDonuts", "PizzaHut", "Dominos", "Nestle", "Heineken",
    "Nike", "Adidas", "Puma", "UnderArmour", "Levis", "HM", "Zara", "Gap",
    "Gucci", "LouisVuitton", "Chanel", "Versace", "Prada", "Armani",
    "Apple", "Samsung", "HP", "Dell", "Intel", "AMD", "Nvidia", "Microsoft",
    "Sony", "LG", "Huawei", "Xiaomi", "Lenovo", "Asus", "Acer",
    "BMW", "Mercedes", "Audi", "Toyota", "Honda", "Ford", "Chevrolet",
    "Volkswagen", "Tesla", "Porsche", "Ferrari", "Lamborghini", "Nissan",
    "Google", "Facebook", "Twitter", "Instagram", "YouTube", "Amazon",
    "Netflix", "Spotify", "Uber", "Airbnb", "PayPal", "Visa", "Mastercard"
]

COLORS = {
    "high_conf": (0, 255, 0),
    "medium_conf": (0, 255, 255),
    "low_conf": (0, 165, 255),
    "logo": (255, 0, 255),
}

DEFAULT_CONF_THRESHOLD = 0.25
DEFAULT_IOU_THRESHOLD = 0.45
DEFAULT_IMG_SIZE = 640
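
The try/except import above is the availability guard the rest of the package keys off. A minimal consumer sketch, assuming the same "cv" import root as before:

from cv.config import ULTRALYTICS_AVAILABLE, YOLO26_MODELS  # assumed import root

if ULTRALYTICS_AVAILABLE:
    from ultralytics import YOLO
    # Ultralytics resolves the weight name; it may download on first use.
    model = YOLO(YOLO26_MODELS["nano"])
else:
    raise SystemExit("Ultralytics not installed. Run: pip install ultralytics")
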

backend/src/cv/detectors/__init__.py (new file, 7 lines)
@@ -0,0 +1,7 @@
from .yolo26 import YOLO26Detector
from .hybrid import HybridLogoDetector

__all__ = [
    "YOLO26Detector",
    "HybridLogoDetector",
]

backend/src/cv/detectors/hybrid.py (new file, 154 lines)
@@ -0,0 +1,154 @@
import cv2
import numpy as np
import os
from typing import List, Dict, Optional

from ..config import (
    ULTRALYTICS_AVAILABLE,
    MODELS_DIR,
    COLORS,
    DEFAULT_CONF_THRESHOLD,
)
from .yolo26 import YOLO26Detector

if ULTRALYTICS_AVAILABLE:
    from ultralytics import YOLO

class HybridLogoDetector:
    def __init__(self,
                 coco_model_size: str = "nano",
                 logo_model_path: Optional[str] = None,
                 conf_threshold: float = DEFAULT_CONF_THRESHOLD,
                 device: str = "auto"):
        self.conf_threshold = conf_threshold
        self.device = device
        self.coco_detector = None
        self.logo_model = None

        if not ULTRALYTICS_AVAILABLE:
            raise RuntimeError("Ultralytics not installed. Run: pip install ultralytics")

        print("Loading YOLO26 COCO base model...")
        self.coco_detector = YOLO26Detector(
            model_size=coco_model_size,
            conf_threshold=conf_threshold,
            device=device
        )

        if logo_model_path and os.path.exists(logo_model_path):
            print(f"Loading logo model: {logo_model_path}")
            self.logo_model = YOLO(logo_model_path)
            print("Logo model loaded!")
        else:
            default_logo_model = MODELS_DIR / "logo_detector.pt"
            if default_logo_model.exists():
                print(f"Loading default logo model: {default_logo_model}")
                self.logo_model = YOLO(str(default_logo_model))
                print("Logo model loaded!")
            else:
                print("No logo model found.")

        print("Hybrid detector ready!")

    def detect(self,
               frame: np.ndarray,
               detect_objects: bool = True,
               detect_logos: bool = True,
               conf_threshold: Optional[float] = None) -> List[Dict]:
        conf = conf_threshold if conf_threshold is not None else self.conf_threshold
        all_detections = []

        if detect_objects and self.coco_detector:
            object_detections = self.coco_detector.detect(frame, conf_threshold=conf)
            for det in object_detections:
                det["type"] = "object"
            all_detections.extend(object_detections)

        if detect_logos and self.logo_model:
            logo_detections = self._detect_logos(frame, conf)
            for det in logo_detections:
                det["type"] = "logo"
            all_detections.extend(logo_detections)

        return all_detections

    def _detect_logos(self, frame: np.ndarray, conf_threshold: float) -> List[Dict]:
        if self.logo_model is None:
            return []

        results = self.logo_model(
            frame,
            conf=conf_threshold,
            device=self.device if self.device != "auto" else None,
            verbose=False
        )

        detections = []
        for result in results:
            boxes = result.boxes
            if boxes is None:
                continue

            for i in range(len(boxes)):
                xyxy = boxes.xyxy[i].cpu().numpy()
                x1, y1, x2, y2 = map(int, xyxy)
                conf_val = float(boxes.conf[i].cpu().numpy())
                class_id = int(boxes.cls[i].cpu().numpy())
                label = self.logo_model.names[class_id]

                detections.append({
                    "bbox": (x1, y1, x2, y2),
                    "label": label,
                    "confidence": conf_val,
                    "class_id": class_id,
                    "brand": label
                })

        return detections

    def draw_detections(self,
                        frame: np.ndarray,
                        detections: List[Dict],
                        show_labels: bool = True) -> np.ndarray:
        result = frame.copy()

        for det in detections:
            x1, y1, x2, y2 = det["bbox"]
            label = det["label"]
            conf = det["confidence"]
            det_type = det.get("type", "object")

            if det_type == "logo":
                color = COLORS["logo"]
            elif conf > 0.7:
                color = COLORS["high_conf"]
            elif conf > 0.5:
                color = COLORS["medium_conf"]
            else:
                color = COLORS["low_conf"]

            cv2.rectangle(result, (x1, y1), (x2, y2), color, 2)

            if show_labels:
                label_text = f"{label}: {conf:.2f}"
                (text_w, text_h), _ = cv2.getTextSize(
                    label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
                )
                cv2.rectangle(
                    result,
                    (x1, y1 - text_h - 8),
                    (x1 + text_w + 4, y1),
                    color,
                    -1
                )
                cv2.putText(
                    result,
                    label_text,
                    (x1 + 2, y1 - 4),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    (255, 255, 255) if det_type == "logo" else (0, 0, 0),
                    1
                )

        return result
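
A sketch of running the hybrid detector on a single frame and splitting results by the "type" field it attaches; the file name and import root are assumptions:

import cv2
from cv.detectors import HybridLogoDetector  # assumed import root

detector = HybridLogoDetector(coco_model_size="nano")  # logo model stays optional
frame = cv2.imread("storefront.jpg")  # hypothetical input
dets = detector.detect(frame, detect_objects=True, detect_logos=True)
logos = [d for d in dets if d["type"] == "logo"]
objects = [d for d in dets if d["type"] == "object"]
print(f"{len(objects)} objects, {len(logos)} logos")
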

backend/src/cv/detectors/yolo26.py (new file, 186 lines)
@@ -0,0 +1,186 @@
import cv2
import numpy as np
import os
from typing import List, Dict, Optional

from ..config import (
    ULTRALYTICS_AVAILABLE,
    YOLO26_MODELS,
    COLORS,
    DEFAULT_CONF_THRESHOLD,
    DEFAULT_IOU_THRESHOLD,
)

if ULTRALYTICS_AVAILABLE:
    from ultralytics import YOLO

class YOLO26Detector:
    def __init__(self,
                 model_size: str = "nano",
                 model_path: Optional[str] = None,
                 conf_threshold: float = DEFAULT_CONF_THRESHOLD,
                 iou_threshold: float = DEFAULT_IOU_THRESHOLD,
                 device: str = "auto"):
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.device = device
        self.model = None

        if not ULTRALYTICS_AVAILABLE:
            raise RuntimeError("Ultralytics not installed. Run: pip install ultralytics")

        if model_path and os.path.exists(model_path):
            model_name = model_path
        elif model_size in YOLO26_MODELS:
            model_name = YOLO26_MODELS[model_size]
        else:
            print(f"Unknown model size '{model_size}', defaulting to 'nano'")
            model_name = YOLO26_MODELS["nano"]

        print(f"Loading YOLO26 model: {model_name}")
        self.model = YOLO(model_name)
        print("YOLO26 model loaded successfully!")
        print(f"Classes: {len(self.model.names)} | Device: {device}")

    def detect(self,
               frame: np.ndarray,
               conf_threshold: Optional[float] = None,
               classes: Optional[List[int]] = None) -> List[Dict]:
        if self.model is None:
            return []

        conf = conf_threshold if conf_threshold is not None else self.conf_threshold

        results = self.model(
            frame,
            conf=conf,
            iou=self.iou_threshold,
            device=self.device if self.device != "auto" else None,
            classes=classes,
            verbose=False
        )

        detections = []
        for result in results:
            boxes = result.boxes
            if boxes is None:
                continue

            for i in range(len(boxes)):
                xyxy = boxes.xyxy[i].cpu().numpy()
                x1, y1, x2, y2 = map(int, xyxy)

                conf_val = float(boxes.conf[i].cpu().numpy())
                class_id = int(boxes.cls[i].cpu().numpy())
                label = self.model.names[class_id]

                detections.append({
                    "bbox": (x1, y1, x2, y2),
                    "label": label,
                    "confidence": conf_val,
                    "class_id": class_id
                })

        return detections

    def detect_and_track(self,
                         frame: np.ndarray,
                         conf_threshold: Optional[float] = None,
                         tracker: str = "bytetrack.yaml") -> List[Dict]:
        if self.model is None:
            return []

        conf = conf_threshold if conf_threshold is not None else self.conf_threshold

        results = self.model.track(
            frame,
            conf=conf,
            iou=self.iou_threshold,
            device=self.device if self.device != "auto" else None,
            tracker=tracker,
            persist=True,
            verbose=False
        )

        detections = []
        for result in results:
            boxes = result.boxes
            if boxes is None:
                continue

            for i in range(len(boxes)):
                xyxy = boxes.xyxy[i].cpu().numpy()
                x1, y1, x2, y2 = map(int, xyxy)

                conf_val = float(boxes.conf[i].cpu().numpy())
                class_id = int(boxes.cls[i].cpu().numpy())
                label = self.model.names[class_id]

                track_id = None
                if boxes.id is not None:
                    track_id = int(boxes.id[i].cpu().numpy())

                detections.append({
                    "bbox": (x1, y1, x2, y2),
                    "label": label,
                    "confidence": conf_val,
                    "class_id": class_id,
                    "track_id": track_id
                })

        return detections

    def draw_detections(self,
                        frame: np.ndarray,
                        detections: List[Dict],
                        show_labels: bool = True,
                        show_conf: bool = True) -> np.ndarray:
        result = frame.copy()

        for det in detections:
            x1, y1, x2, y2 = det["bbox"]
            label = det["label"]
            conf = det["confidence"]
            track_id = det.get("track_id")

            if conf > 0.7:
                color = COLORS["high_conf"]
            elif conf > 0.5:
                color = COLORS["medium_conf"]
            else:
                color = COLORS["low_conf"]

            cv2.rectangle(result, (x1, y1), (x2, y2), color, 2)

            if show_labels:
                label_parts = [label]
                if track_id is not None:
                    label_parts.append(f"ID:{track_id}")
                if show_conf:
                    label_parts.append(f"{conf:.2f}")
                label_text = " | ".join(label_parts)

                (text_w, text_h), baseline = cv2.getTextSize(
                    label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
                )
                cv2.rectangle(
                    result,
                    (x1, y1 - text_h - 8),
                    (x1 + text_w + 4, y1),
                    color,
                    -1
                )
                cv2.putText(
                    result,
                    label_text,
                    (x1 + 2, y1 - 4),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    (0, 0, 0),
                    1
                )

        return result

    def get_class_names(self) -> Dict[int, str]:
        return self.model.names if self.model else {}
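
A sketch of the tracking path: detect_and_track passes persist=True so ByteTrack IDs survive across frames. The "cv" import root is assumed:

import cv2
from cv.detectors import YOLO26Detector  # assumed import root

detector = YOLO26Detector(model_size="small")
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    dets = detector.detect_and_track(frame)  # each det may carry a "track_id"
    annotated = detector.draw_detections(frame, dets)
    cv2.imshow("tracked", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
cap.release()
cv2.destroyAllWindows()
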

backend/src/cv/scanner.py (new file, 197 lines)
@@ -0,0 +1,197 @@
import cv2
import json
import numpy as np
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
from ..ollama.detector import OllamaLogoDetector

def capture_and_analyze(model: str = "ministral-3:latest",
                        save_image: bool = True,
                        output_dir: Optional[str] = None) -> Dict:
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        raise RuntimeError("Could not access camera")

    print("Camera ready. Press SPACE to capture, Q to quit.")

    result = None

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        display = frame.copy()
        cv2.putText(display, "Press SPACE to capture | Q to quit",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.imshow("Capture", display)

        key = cv2.waitKey(1) & 0xFF

        if key == ord(' '):
            print("Analyzing image...")

            if save_image:
                if output_dir is None:
                    output_dir = "./captures"
                Path(output_dir).mkdir(parents=True, exist_ok=True)

                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                img_path = Path(output_dir) / f"capture_{timestamp}.jpg"
                cv2.imwrite(str(img_path), frame)
                print(f"Saved: {img_path}")

            detector = OllamaLogoDetector(model=model)
            result = detector.detect_from_numpy(frame)

            _display_results(result)
            break

        elif key == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    return result if result else {"logos_detected": [], "total_count": 0}

def start_interactive_capture(model: str = "ministral-3:latest",
                              save_images: bool = True,
                              output_dir: Optional[str] = None):
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        raise RuntimeError("Could not access camera")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    print("=" * 60)
    print("Ollama Logo Detection - Interactive Mode")
    print("=" * 60)
    print(f"Camera: {width}x{height}")
    print(f"Model: {model}")
    print("\nControls:")
    print("  SPACE - Capture and analyze")
    print("  S - Save frame only")
    print("  R - Show last results")
    print("  Q - Quit")
    print("=" * 60)

    detector = OllamaLogoDetector(model=model)
    last_result = None
    analyzing = False
    status_message = "Ready - Press SPACE to capture"

    if output_dir is None:
        output_dir = "./captures"
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        display = frame.copy()

        cv2.rectangle(display, (0, 0), (width, 40), (40, 40, 40), -1)
        cv2.putText(display, status_message, (10, 28),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        if last_result and last_result.get("logos_detected"):
            brands = [l.get("brand", "?") for l in last_result["logos_detected"]]
            brand_text = f"Detected: {', '.join(brands[:3])}"
            if len(brands) > 3:
                brand_text += f" +{len(brands)-3} more"
            cv2.rectangle(display, (0, height-35), (width, height), (40, 40, 40), -1)
            cv2.putText(display, brand_text, (10, height-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

        cv2.imshow("Ollama Logo Detection", display)

        key = cv2.waitKey(1) & 0xFF

        if key == ord(' ') and not analyzing:
            analyzing = True
            status_message = "Analyzing with Ollama..."
            cv2.imshow("Ollama Logo Detection", display)
            cv2.waitKey(1)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            img_path = Path(output_dir) / f"capture_{timestamp}.jpg"

            if save_images:
                cv2.imwrite(str(img_path), frame)

            last_result = detector.detect_from_numpy(frame)

            json_path = Path(output_dir) / f"result_{timestamp}.json"
            with open(json_path, 'w') as f:
                json.dump(last_result, f, indent=2)

            count = last_result.get("total_count", 0)
            if count > 0:
                status_message = f"Found {count} logo(s)! Press R for details"
            else:
                status_message = "No logos detected. Try again!"

            print(f"\nCaptured: {img_path}")
            print(f"Results: {json_path}")
            _display_results(last_result)

            analyzing = False

        elif key == ord('s'):
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            img_path = Path(output_dir) / f"capture_{timestamp}.jpg"
            cv2.imwrite(str(img_path), frame)
            status_message = f"Saved: {img_path.name}"
            print(f"Saved: {img_path}")

        elif key == ord('r') and last_result:
            print("\n" + "=" * 40)
            print("Last Detection Results:")
            print("=" * 40)
            _display_results(last_result)

        elif key == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    print("\nGoodbye!")
    return last_result

def _display_results(result: Dict):
    print("\n" + "-" * 40)

    logos = result.get("logos_detected", [])
    count = result.get("total_count", len(logos))

    if count == 0:
        print("No logos or brands detected")
        if "description" in result:
            print(f"Description: {result['description']}")
    else:
        print(f"Detected {count} logo(s)/brand(s):\n")
        for i, logo in enumerate(logos, 1):
            brand = logo.get("brand", "Unknown")
            conf = logo.get("confidence", "unknown")
            loc = logo.get("location", "unknown")
            cat = logo.get("category", "")

            print(f"  {i}. {brand}")
            print(f"     Confidence: {conf}")
            print(f"     Location: {loc}")
            if cat:
                print(f"     Category: {cat}")
            print()

    if "error" in result:
        print(f"Error: {result['error']}")

    print("-" * 40)

    print("\nJSON Output:")
    print(json.dumps(result, indent=2))
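
A sketch of the one-shot path, which blocks until SPACE is pressed in the preview window and returns the parsed Ollama result; the "cv.scanner" import root is an assumption:

import json
from cv.scanner import capture_and_analyze  # assumed import root

result = capture_and_analyze(model="ministral-3:latest", output_dir="./captures")
print(json.dumps(result, indent=2))
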

backend/src/cv/vision.py (new file, 28 lines)
@@ -0,0 +1,28 @@
from .config import (
    CV_DIR,
    DATA_DIR,
    MODELS_DIR,
    ULTRALYTICS_AVAILABLE,
    YOLO26_MODELS,
    SUPER_CATEGORIES,
    COMMON_BRANDS,
    COLORS,
    DEFAULT_CONF_THRESHOLD,
    DEFAULT_IOU_THRESHOLD,
    DEFAULT_IMG_SIZE,
)
from .detectors import (
    YOLO26Detector,
    HybridLogoDetector,
)
from .yolo_scanner import (
    start_scanner as start_yolo_scanner,
    detect_objects as detect_yolo_objects,
)
from .scanner import (
    start_interactive_capture as start_ollama_scanner,
)

if __name__ == "__main__":
    from .cli import main
    main()

backend/src/cv/yolo_scanner.py (new file, 166 lines)
@@ -0,0 +1,166 @@
import cv2
from pathlib import Path
from typing import List, Dict, Optional

from .config import (
    CV_DIR,
    ULTRALYTICS_AVAILABLE,
)
from .detectors import YOLO26Detector, HybridLogoDetector

def start_scanner(model_path: Optional[str] = None,
                  model_size: str = "nano",
                  logo_model_path: Optional[str] = None,
                  use_gui: bool = True,
                  use_tracking: bool = False,
                  hybrid_mode: bool = False):
    print("=" * 60)
    if hybrid_mode:
        print("YOLO26 Hybrid Scanner (COCO + Logos)")
    else:
        print("YOLO26 Object Detection Scanner")
    print("=" * 60)

    detector = None

    if hybrid_mode and ULTRALYTICS_AVAILABLE:
        try:
            detector = HybridLogoDetector(
                coco_model_size=model_size,
                logo_model_path=logo_model_path,
                conf_threshold=0.25,
                device="auto"
            )
        except Exception as e:
            print(f"Hybrid detector failed: {e}")
            hybrid_mode = False

    if detector is None and ULTRALYTICS_AVAILABLE:
        try:
            detector = YOLO26Detector(
                model_size=model_size,
                model_path=model_path,
                conf_threshold=0.25,
                device="auto"
            )
        except Exception as e:
            print(f"YOLO26 failed: {e}")

    if detector is None:
        print("Error: No detector available.")
        return

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not access camera.")
        return

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

    writer = None
    output_path = CV_DIR / "output.mp4"

    print(f"Camera: {width}x{height} @ {fps:.1f}fps")
    print("Controls: q=quit | s=screenshot | t=tracking")
    if hybrid_mode:
        print("          o=objects | l=logos | b=both")

    frame_count = 0
    detect_objects_flag = True
    detect_logos_flag = True

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            if hybrid_mode and isinstance(detector, HybridLogoDetector):
                detections = detector.detect(
                    frame,
                    detect_objects=detect_objects_flag,
                    detect_logos=detect_logos_flag
                )
            elif use_tracking and isinstance(detector, YOLO26Detector):
                detections = detector.detect_and_track(frame)
            else:
                detections = detector.detect(frame)

            result_frame = detector.draw_detections(frame, detections)

            mode_str = "HYBRID" if hybrid_mode else ("TRACK" if use_tracking else "DETECT")
            cv2.putText(result_frame, f"{mode_str} | {len(detections)} objects",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

            if use_gui:
                try:
                    cv2.imshow('YOLO26 Scanner', result_frame)
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                    elif key == ord('s'):
                        path = CV_DIR / f"screenshot_{frame_count}.jpg"
                        cv2.imwrite(str(path), result_frame)
                        print(f"Saved: {path}")
                    elif key == ord('t') and isinstance(detector, YOLO26Detector):
                        use_tracking = not use_tracking
                    elif key == ord('o') and hybrid_mode:
                        detect_objects_flag, detect_logos_flag = True, False
                    elif key == ord('l') and hybrid_mode:
                        detect_objects_flag, detect_logos_flag = False, True
                    elif key == ord('b') and hybrid_mode:
                        detect_objects_flag, detect_logos_flag = True, True
                except cv2.error:
                    use_gui = False
                    writer = cv2.VideoWriter(
                        str(output_path), cv2.VideoWriter_fourcc(*'mp4v'),
                        fps, (width, height)
                    )

            if not use_gui and writer:
                writer.write(result_frame)

    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        cap.release()
        if writer:
            writer.release()
        cv2.destroyAllWindows()

def detect_objects(image_path: str,
                   model_size: str = "nano",
                   conf_threshold: float = 0.25,
                   save_output: bool = True,
                   hybrid_mode: bool = False) -> List[Dict]:
    if not ULTRALYTICS_AVAILABLE:
        raise RuntimeError("Ultralytics not installed")

    if hybrid_mode:
        detector = HybridLogoDetector(
            coco_model_size=model_size,
            conf_threshold=conf_threshold
        )
    else:
        detector = YOLO26Detector(
            model_size=model_size,
            conf_threshold=conf_threshold
        )

    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"Could not load: {image_path}")

    detections = detector.detect(image)

    if save_output:
        result = detector.draw_detections(image, detections)
        output = Path(image_path).stem + "_detected.jpg"
        cv2.imwrite(output, result)
        print(f"Saved: {output}")

    return detections
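
A sketch of the single-image helper above, which also writes "<stem>_detected.jpg" to the current directory when save_output is set; the file name and import root are assumptions:

from cv.yolo_scanner import detect_objects  # assumed import root

dets = detect_objects("shelf.jpg", model_size="medium", conf_threshold=0.3,
                      save_output=True, hybrid_mode=False)  # "shelf.jpg" is hypothetical
for d in dets:
    print(f"{d['label']}: {d['confidence']:.2%}")
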

backend/src/ollama/__init__.py (new file, 5 lines)
@@ -0,0 +1,5 @@
from .detector import OllamaLogoDetector

__all__ = [
    "OllamaLogoDetector",
]

backend/src/ollama/__main__.py (new file, 4 lines)
@@ -0,0 +1,4 @@
from .cli import main

if __name__ == "__main__":
    main()

backend/src/ollama/cli.py (new file, 103 lines)
@@ -0,0 +1,103 @@
#!/usr/bin/env python3
import argparse
import json
import sys

from .detector import OllamaLogoDetector
from .camera import capture_and_analyze, start_interactive_capture

def main():
    parser = argparse.ArgumentParser(
        description="Detect logos and companies using Ollama vision models"
    )

    parser.add_argument("--image", "-i", type=str)
    parser.add_argument("--model", "-m", type=str, default="ministral-3:latest")
    parser.add_argument("--output", "-o", type=str)
    parser.add_argument("--host", type=str)
    parser.add_argument("--single", "-s", action="store_true")
    parser.add_argument("--no-save", action="store_true")
    parser.add_argument("--output-dir", type=str, default="./captures")

    args = parser.parse_args()

    try:
        if args.image:
            print(f"Analyzing: {args.image}")
            print(f"Model: {args.model}")

            detector = OllamaLogoDetector(model=args.model, host=args.host)
            result = detector.detect_from_file(args.image)

            _print_results(result)

            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(result, f, indent=2)
                print(f"Results saved to: {args.output}")

        elif args.single:
            result = capture_and_analyze(
                model=args.model,
                save_image=not args.no_save,
                output_dir=args.output_dir
            )

            if args.output and result:
                with open(args.output, 'w') as f:
                    json.dump(result, f, indent=2)
                print(f"Results saved to: {args.output}")

        else:
            start_interactive_capture(
                model=args.model,
                save_images=not args.no_save,
                output_dir=args.output_dir
            )

    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

def _print_results(result: dict):
    print("\n" + "=" * 50)
    print("DETECTION RESULTS")
    print("=" * 50)

    logos = result.get("logos_detected", [])
    count = result.get("total_count", len(logos))

    if count == 0:
        print("\nNo logos or companies detected")
        if desc := result.get("description"):
            print(f"\nImage description: {desc}")
    else:
        print(f"\nFound {count} logo(s)/company(s):\n")

        for i, logo in enumerate(logos, 1):
            brand = logo.get("brand", "Unknown")
            conf = logo.get("confidence", "unknown")
            loc = logo.get("location", "unknown")
            cat = logo.get("category", "N/A")

            print(f"  {i}. {brand}")
            print(f"     Confidence: {conf}")
            print(f"     Location: {loc}")
            print(f"     Category: {cat}")
            print()

    if "error" in result:
        print(f"\nError occurred: {result['error']}")

    if "raw_response" in result and result.get("parse_error"):
        print(f"\nParse error: {result['parse_error']}")
        print(f"Raw response:\n{result['raw_response'][:500]}...")

    print("=" * 50)
    print("\nRaw JSON:")
    print(json.dumps(result, indent=2))

if __name__ == "__main__":
    main()

backend/src/ollama/detector.py (new file, 164 lines)
@@ -0,0 +1,164 @@
import base64
import json
import re
from pathlib import Path
from typing import Dict, List, Optional, Union

try:
    import ollama
    OLLAMA_AVAILABLE = True
except ImportError:
    OLLAMA_AVAILABLE = False
    print("Ollama not installed. Run: pip install ollama")

DEFAULT_HOST = "https://ollama.sirblob.co"
DEFAULT_MODEL = "ministral-3:latest"

DEFAULT_PROMPT = """Analyze this image and identify ALL logos, brand names, and company names visible.

For each logo or brand you detect, provide:
1. The company/brand name
2. Confidence level (high, medium, low)
3. Location in image (top-left, center, bottom-right, etc.)
4. Product category if identifiable

Return your response as a valid JSON object with this exact structure:
{
    "logos_detected": [
        {
            "brand": "Company Name",
            "confidence": "high",
            "location": "center",
            "category": "technology"
        }
    ],
    "total_count": 1,
    "description": "Brief description of what's in the image"
}

If no logos are found, return:
{
    "logos_detected": [],
    "total_count": 0,
    "description": "Description of image with no visible logos"
}

IMPORTANT: Return ONLY the JSON object, no other text."""

class OllamaLogoDetector:
    def __init__(self,
                 model: str = DEFAULT_MODEL,
                 host: str = DEFAULT_HOST):
        if not OLLAMA_AVAILABLE:
            raise RuntimeError("Ollama not installed. Run: pip install ollama")

        self.model = model
        self.host = host
        self.client = ollama.Client(host=host)

        try:
            models = self.client.list()
            model_names = [m['name'] for m in models.get('models', [])]

            model_base = model.split(':')[0]
            if not any(model_base in name for name in model_names):
                print(f"Model '{model}' not found. Available models: {model_names}")
                print(f"Pulling {model}...")
                self.client.pull(model)
                print(f"Model {model} ready!")
            else:
                print(f"Using Ollama model: {model}")
        except Exception as e:
            print(f"Could not verify model: {e}")
            print("Make sure Ollama is running: ollama serve")

    def detect_from_file(self,
                         image_path: str,
                         prompt: Optional[str] = None) -> Dict:
        path = Path(image_path)
        if not path.exists():
            raise FileNotFoundError(f"Image not found: {image_path}")

        with open(path, 'rb') as f:
            image_data = base64.b64encode(f.read()).decode('utf-8')

        return self._analyze_image(image_data, prompt)

    def detect_from_bytes(self,
                          image_bytes: bytes,
                          prompt: Optional[str] = None) -> Dict:
        image_data = base64.b64encode(image_bytes).decode('utf-8')
        return self._analyze_image(image_data, prompt)

    def detect_from_numpy(self,
                          image_array,
                          prompt: Optional[str] = None) -> Dict:
        import cv2

        success, buffer = cv2.imencode('.jpg', image_array)
        if not success:
            raise ValueError("Failed to encode image")

        return self.detect_from_bytes(buffer.tobytes(), prompt)

    def _analyze_image(self,
                       image_base64: str,
                       prompt: Optional[str] = None) -> Dict:
        if prompt is None:
            prompt = DEFAULT_PROMPT

        try:
            response = self.client.chat(
                model=self.model,
                messages=[{
                    'role': 'user',
                    'content': prompt,
                    'images': [image_base64]
                }],
                options={
                    'temperature': 0.1,
                }
            )

            content = response['message']['content']
            return self._parse_response(content)

        except Exception as e:
            return {
                "logos_detected": [],
                "total_count": 0,
                "error": str(e),
                "raw_response": None
            }

    def _parse_response(self, content: str) -> Dict:
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            pass

        json_patterns = [
            r'```json\s*([\s\S]*?)\s*```',
            r'```\s*([\s\S]*?)\s*```',
            r'\{[\s\S]*\}'
        ]

        for pattern in json_patterns:
            match = re.search(pattern, content)
            if match:
                try:
                    json_str = match.group(1) if '```' in pattern else match.group(0)
                    return json.loads(json_str)
                except json.JSONDecodeError:
                    continue

        return {
            "logos_detected": [],
            "total_count": 0,
            "raw_response": content,
            "parse_error": "Could not extract valid JSON from response"
        }

    def get_brands_list(self, result: Dict) -> List[str]:
        logos = result.get("logos_detected", [])
        return [logo.get("brand", "Unknown") for logo in logos]
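
Finally, a sketch of the detector in isolation. DEFAULT_HOST above points at the author's remote instance, so pass your own host for a local daemon; the import path and image name are assumptions:

from backend.src.ollama.detector import OllamaLogoDetector  # adjust to your layout

detector = OllamaLogoDetector(model="ministral-3:latest", host="http://localhost:11434")
result = detector.detect_from_file("billboard.jpg")  # hypothetical image
for brand in detector.get_brands_list(result):
    print(brand)
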