Populate DB

This commit is contained in:
2026-01-24 05:08:01 +00:00
parent aad7e6e08d
commit d145f7e94c
4 changed files with 113 additions and 44 deletions

View File

@@ -0,0 +1,52 @@
import os
import sys
from pathlib import Path
# Add backend directory to path so we can import src
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from dotenv import load_dotenv
load_dotenv()
from src.rag.ingest import process_file
from src.rag.store import ingest_documents
from src.mongo.vector_store import is_file_processed, log_processed_file
def populate_from_dataset(dataset_dir):
    """Ingest every not-yet-processed CSV/PDF file found directly in *dataset_dir*.

    Each candidate file is skipped when ``is_file_processed`` reports its
    file name as already handled. Otherwise the file is passed to
    ``process_file`` and the resulting chunks are handed to
    ``ingest_documents``. A file that yields at least one ingested chunk is
    recorded with ``log_processed_file``. Progress and a final summary are
    printed to stdout.

    Args:
        dataset_dir: Path of the directory to scan. Only its direct children
            are considered; sub-directories are not walked.

    Returns:
        None. When *dataset_dir* is missing or is not a directory, a message
        is printed and the function returns without scanning.
    """
    dataset_path = Path(dataset_dir)
    # is_dir() rather than exists(): a path that points at a regular file
    # would otherwise be "scanned", find nothing and report success.
    if not dataset_path.is_dir():
        print(f"Dataset directory not found: {dataset_dir}")
        return

    print(f"Scanning {dataset_dir}...")

    supported_suffixes = {'.csv', '.pdf'}
    total_chunks = 0
    files_processed = 0

    # Sorted so that repeated runs visit (and log) files in the same order.
    for file_path in sorted(dataset_path.glob('*')):
        if not file_path.is_file():
            continue
        if file_path.suffix.lower() not in supported_suffixes:
            continue

        # The processed-file log is keyed by bare file name, not full path.
        if is_file_processed(file_path.name):
            print(f"Skipping {file_path.name} (already processed)")
            continue

        print(f"Processing {file_path.name}...")
        try:
            chunks = process_file(str(file_path))
            if not chunks:
                print(" No text found/extracted.")
                continue

            count = ingest_documents(chunks)
            print(f" Ingested {count} chunks.")
            # A file only counts once chunks were actually stored, so the
            # summary agrees with what gets logged as processed.
            if count > 0:
                log_processed_file(file_path.name)
                total_chunks += count
                files_processed += 1
        except Exception as e:
            # Deliberately broad: this is a best-effort batch job, and one bad
            # file must not abort the remaining files.
            print(f" Error processing file: {e}")

    print(f"\nFinished! Processed {files_processed} files. Total chunks ingested: {total_chunks}")
if __name__ == "__main__":
    # The dataset folder is resolved relative to this script's own directory
    # (its parent's "dataset" folder), not the current working directory.
    script_dir = os.path.dirname(__file__)
    populate_from_dataset(os.path.join(script_dir, '../dataset'))

View File

@@ -0,0 +1,8 @@
import os
from pymongo import MongoClient
# Clients already created by get_mongo_client(), keyed by connection URI.
_MONGO_CLIENTS = {}


def get_mongo_client():
    """Return a ``MongoClient`` for the URI in the ``MONGO_URI`` environment variable.

    The client is created on first use and then reused for every later call
    with the same URI. Creating a new client per call would open a new client
    (with its own connections) for every single database operation and never
    close it. Because the instance is shared, callers should not close it.

    Returns:
        The ``MongoClient`` connected to ``MONGO_URI``.

    Raises:
        ValueError: If ``MONGO_URI`` is unset or empty.
    """
    uri = os.environ.get("MONGO_URI")
    if not uri:
        raise ValueError("MONGO_URI environment variable not set")

    client = _MONGO_CLIENTS.get(uri)
    if client is None:
        # Keyed by URI so a changed MONGO_URI still gets a matching client.
        client = _MONGO_CLIENTS[uri] = MongoClient(uri)
    return client

View File

@@ -0,0 +1,49 @@
from .connection import get_mongo_client
def insert_rag_documents(documents, collection_name="rag_documents", db_name="vectors_db"):
    """Bulk-insert *documents* into a collection and report how many were stored.

    Args:
        documents: Documents to pass to ``insert_many``; a falsy value
            (e.g. an empty list) results in no insert.
        collection_name: Name of the target collection.
        db_name: Name of the database holding the collection.

    Returns:
        The number of inserted ids reported by the insert, or 0 when
        *documents* is falsy.
    """
    # The client is obtained before the emptiness check, so a missing
    # connection setting surfaces even when there is nothing to insert.
    target = get_mongo_client().get_database(db_name)[collection_name]
    if not documents:
        return 0
    outcome = target.insert_many(documents)
    return len(outcome.inserted_ids)
def search_rag_documents(query_embedding, collection_name="rag_documents", db_name="vectors_db", num_results=5):
    """Run a ``$vectorSearch`` aggregation and return the matches as a list.

    Args:
        query_embedding: Query vector passed as ``queryVector``.
        collection_name: Name of the collection to search.
        db_name: Name of the database holding the collection.
        num_results: Value used as the search ``limit``; ten times this value
            is used as ``numCandidates``.

    Returns:
        A list of result documents, each projected down to ``text`` and
        ``score`` (the ``vectorSearchScore`` metadata), with ``_id`` removed.
    """
    target = get_mongo_client().get_database(db_name)[collection_name]

    # Search stage: uses the "vector_index" index over the "embedding" field.
    search_stage = {
        "$vectorSearch": {
            "index": "vector_index",
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": num_results * 10,
            "limit": num_results,
        }
    }
    # Projection stage: keep only the text and the similarity score.
    projection_stage = {
        "$project": {
            "_id": 0,
            "text": 1,
            "score": {"$meta": "vectorSearchScore"},
        }
    }

    cursor = target.aggregate([search_stage, projection_stage])
    return list(cursor)
def is_file_processed(filename, log_collection="ingested_files", db_name="vectors_db"):
    """Tell whether *filename* already has an entry in the ingestion log.

    Args:
        filename: Value matched against the ``filename`` field.
        log_collection: Name of the collection holding the log entries.
        db_name: Name of the database holding the collection.

    Returns:
        True when a document with that ``filename`` exists, otherwise False.
    """
    log = get_mongo_client().get_database(db_name)[log_collection]
    entry = log.find_one({"filename": filename})
    return entry is not None
def log_processed_file(filename, log_collection="ingested_files", db_name="vectors_db"):
    """Record *filename* in the ingestion log together with the time it was logged.

    The entry is written as an upsert keyed on ``filename``, so logging the
    same file again refreshes ``processed_at`` instead of adding a duplicate
    entry.

    Args:
        filename: Value stored in (and matched on) the ``filename`` field.
        log_collection: Name of the collection holding the log entries.
        db_name: Name of the database holding the collection.

    Returns:
        None.
    """
    # Imported locally so this function brings its own dependency with it.
    from datetime import datetime, timezone

    client = get_mongo_client()
    collection = client.get_database(db_name)[log_collection]
    collection.update_one(
        {"filename": filename},
        # A real, timezone-aware timestamp instead of a constant placeholder.
        {"$set": {"processed_at": datetime.now(timezone.utc)}},
        upsert=True,
    )

View File

@@ -1,18 +1,7 @@
import os from .embeddings import get_embeddings_batch, get_embedding
from pymongo import MongoClient from ..mongo.vector_store import insert_rag_documents, search_rag_documents
from .embeddings import get_embeddings_batch
def get_mongo_client():
uri = os.environ.get("MONGO_URI")
if not uri:
raise ValueError("MONGO_URI environment variable not set")
return MongoClient(uri)
def ingest_documents(text_chunks, collection_name="rag_documents"): def ingest_documents(text_chunks, collection_name="rag_documents"):
client = get_mongo_client()
db = client.get_database("vectors_db")
collection = db[collection_name]
embeddings = get_embeddings_batch(text_chunks) embeddings = get_embeddings_batch(text_chunks)
documents = [] documents = []
@@ -22,37 +11,8 @@ def ingest_documents(text_chunks, collection_name="rag_documents"):
"embedding": embedding "embedding": embedding
}) })
if documents: return insert_rag_documents(documents, collection_name=collection_name)
collection.insert_many(documents)
return len(documents)
return 0
def vector_search(query_text, collection_name="rag_documents", num_results=5): def vector_search(query_text, collection_name="rag_documents", num_results=5):
from .embeddings import get_embedding
query_embedding = get_embedding(query_text) query_embedding = get_embedding(query_text)
return search_rag_documents(query_embedding, collection_name=collection_name, num_results=num_results)
client = get_mongo_client()
db = client.get_database("vectors_db")
collection = db[collection_name]
pipeline = [
{
"$vectorSearch": {
"index": "vector_index",
"path": "embedding",
"queryVector": query_embedding,
"numCandidates": num_results * 10,
"limit": num_results
}
},
{
"$project": {
"_id": 0,
"text": 1,
"score": { "$meta": "vectorSearchScore" }
}
}
]
results = list(collection.aggregate(pipeline))
return results