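"""Populate the vector database from files in a dataset directory.

Scans a directory for .csv, .pdf, .txt, and .xlsx files, chunks each one
with process_file, ingests the chunks into the configured vector store,
and logs processed files in MongoDB so repeat runs can skip them.
"""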
import os
import sys
import argparse
from pathlib import Path

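# Make the project root importable so the src.* imports below resolve
# when this script is run directly.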
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

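# Load settings from a local .env file (e.g. ATLAS_VECTORS, read below)
# before importing the project modules that may depend on them.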
from dotenv import load_dotenv

load_dotenv()

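# Project imports: file chunking, vector-store ingestion, and the
# MongoDB-backed record of which files have already been processed.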
from src.rag.ingest import process_file
from src.rag.store import ingest_documents
from src.mongo.metadata import is_file_processed, log_processed_file


def populate_from_dataset(dataset_dir, category=None, force=False):
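    """Ingest every supported file in dataset_dir into the vector store.

    Files already logged as processed in MongoDB are skipped unless
    force is True.
    """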
    dataset_path = Path(dataset_dir)
    if not dataset_path.exists():
        print(f"Dataset directory not found: {dataset_dir}")
        return

    print(f"Scanning {dataset_dir}...")
    if category:
        print(f"Category: {category}")

    total_chunks = 0
    files_processed = 0

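    # Consider only top-level files with a supported extension; the
    # already-processed check is bypassed when force is set.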
    for file_path in dataset_path.glob('*'):
        if file_path.is_file() and file_path.suffix.lower() in ['.csv', '.pdf', '.txt', '.xlsx']:
            if not force and is_file_processed(file_path.name):
                print(f"Skipping {file_path.name} (already processed)")
                continue

            print(f"Processing {file_path.name}...")
            try:
                chunks = process_file(str(file_path))
                if chunks:
                    count = ingest_documents(chunks, source_file=file_path.name, category=category)
                    print(f" Ingested {count} chunks.")
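                    # Record the file in MongoDB so later runs can skip
                    # it, then update the running totals.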
                    if count > 0:
                        log_processed_file(file_path.name, category=category, chunk_count=count)
                    total_chunks += count
                    files_processed += 1
                else:
                    print(" No text found/extracted.")
            except Exception as e:
                print(f" Error processing file: {e}")

    print(f"\nFinished! Processed {files_processed} files. Total chunks ingested: {total_chunks}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Populate vector database from dataset files")
    parser.add_argument("--category", "-c", type=str, help="Category to assign to ingested documents")
    parser.add_argument("--dir", "-d", type=str, default=None, help="Dataset directory path")
    parser.add_argument("--force", "-f", action="store_true", help="Force re-processing of files even if marked as processed")
    args = parser.parse_args()

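    # Example usage (script filename and category value are illustrative):
    #   python populate.py
    #   python populate.py --dir ./dataset --category reports --force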
    # Check vector store mode
    use_atlas = os.environ.get("ATLAS_VECTORS", "false").lower() == "true"
    store_name = "MongoDB Atlas Vector Search" if use_atlas else "ChromaDB"
    print(f"--- Vector Store Mode: {store_name} ---")

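    # Use the directory given on the command line, else fall back to the
    # dataset/ directory one level up from this script.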
    if args.dir:
        dataset_dir = args.dir
    else:
        dataset_dir = os.path.join(os.path.dirname(__file__), '../dataset')

    populate_from_dataset(dataset_dir, category=args.category, force=args.force)