import os import sys import argparse from pathlib import Path sys .path .append (os .path .join (os .path .dirname (__file__ ),'..')) from dotenv import load_dotenv load_dotenv () from src .rag .ingest import process_file from src .rag .store import ingest_documents from src .mongo .metadata import is_file_processed ,log_processed_file def populate_from_dataset (dataset_dir ,category =None ,force =False ): dataset_path =Path (dataset_dir ) if not dataset_path .exists (): print (f"Dataset directory not found: {dataset_dir }") return print (f"Scanning {dataset_dir }...") if category : print (f"Category: {category }") total_chunks =0 files_processed =0 for file_path in dataset_path .glob ('*'): if file_path .is_file ()and file_path .suffix .lower ()in ['.csv','.pdf','.txt','.xlsx']: if not force and is_file_processed (file_path .name ): print (f"Skipping {file_path .name } (already processed)") continue print (f"Processing {file_path .name }...") try : chunks =process_file (str (file_path )) if chunks : count =ingest_documents (chunks ,source_file =file_path .name ,category =category ) print (f" Ingested {count } chunks.") if count >0 : log_processed_file (file_path .name ,category =category ,chunk_count =count ) total_chunks +=count files_processed +=1 else : print (" No text found/extracted.") except Exception as e : print (f" Error processing file: {e }") print (f"\nFinished! Processed {files_processed } files. Total chunks ingested: {total_chunks }") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Populate vector database from dataset files") parser.add_argument("--category", "-c", type=str, help="Category to assign to ingested documents") parser.add_argument("--dir", "-d", type=str, default=None, help="Dataset directory path") parser.add_argument("--force", "-f", action="store_true", help="Force re-processing of files even if marked as processed") args = parser.parse_args() # Check vector store mode use_atlas = os.environ.get("ATLAS_VECTORS", "false").lower() == "true" store_name = "MongoDB Atlas Vector Search" if use_atlas else "ChromaDB" print(f"--- Vector Store Mode: {store_name} ---") if args.dir: dataset_dir = args.dir else: dataset_dir = os.path.join(os.path.dirname(__file__), '../dataset') # Note: We need to pass force flag to populate_from_dataset ideally, # but the function signature doesn't have it. I'll modify the function signature too. populate_from_dataset(dataset_dir, category=args.category, force=args.force)