mirror of
https://github.com/SirBlobby/Hoya26.git
synced 2026-02-04 03:34:34 -05:00
Restore code and save recent updates
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
import os
|
||||
from google import genai
|
||||
from src.chroma.vector_store import search_documents
|
||||
from src.rag.embeddings import get_embedding
|
||||
import os
|
||||
from google import genai
|
||||
from src .chroma .vector_store import search_documents
|
||||
from src .rag .embeddings import get_embedding
|
||||
|
||||
GREENWASHING_ANALYSIS_PROMPT = """
|
||||
GREENWASHING_ANALYSIS_PROMPT ="""
|
||||
You are an expert Environmental, Social, and Governance (ESG) Analyst specialized in detecting 'Greenwashing'.
|
||||
Your task is to analyze the provided context from a company's data reports and determine if they are engaging in greenwashing.
|
||||
|
||||
@@ -21,123 +21,123 @@ Based on the context provided, give a final verdict:
|
||||
- EVIDENCE: [Quote specific parts of the context if possible]
|
||||
"""
|
||||
|
||||
def ask(prompt):
|
||||
client = genai.Client(api_key=os.environ.get("GOOGLE_API_KEY"))
|
||||
return client.models.generate_content(model="gemini-3-flash-preview", contents=prompt).text
|
||||
def ask (prompt ):
|
||||
client =genai .Client (api_key =os .environ .get ("GOOGLE_API_KEY"))
|
||||
return client .models .generate_content (model ="gemini-3-pro-preview",contents =prompt ).text
|
||||
|
||||
def ask_gemini_with_rag(prompt, category=None):
|
||||
def ask_gemini_with_rag (prompt ,category =None ):
|
||||
"""Ask Gemini with RAG context from the vector database."""
|
||||
# Get embedding for the prompt
|
||||
query_embedding = get_embedding(prompt)
|
||||
|
||||
# Search for relevant documents
|
||||
results = search_documents(query_embedding, num_results=5)
|
||||
|
||||
# Build context from results
|
||||
context = ""
|
||||
for res in results:
|
||||
context += f"--- Document ---\n{res['text']}\n\n"
|
||||
|
||||
# Create full prompt with context
|
||||
full_prompt = f"""You are a helpful sustainability assistant. Use the following context to answer the user's question.
|
||||
|
||||
query_embedding =get_embedding (prompt )
|
||||
|
||||
|
||||
results =search_documents (query_embedding ,num_results =5 )
|
||||
|
||||
|
||||
context =""
|
||||
for res in results :
|
||||
context +=f"--- Document ---\n{res ['text']}\n\n"
|
||||
|
||||
|
||||
full_prompt =f"""You are a helpful sustainability assistant. Use the following context to answer the user's question.
|
||||
If the context doesn't contain relevant information, you can use your general knowledge but mention that.
|
||||
|
||||
CONTEXT:
|
||||
{context}
|
||||
{context }
|
||||
|
||||
USER QUESTION: {prompt}
|
||||
USER QUESTION: {prompt }
|
||||
|
||||
Please provide a helpful and concise response."""
|
||||
|
||||
return ask(full_prompt)
|
||||
return ask (full_prompt )
|
||||
|
||||
def analyze(query, query_embedding, num_results=5, num_alternatives=3):
|
||||
try:
|
||||
results = search_documents(query_embedding, num_results=num_results + num_alternatives + 5)
|
||||
except Exception as e:
|
||||
print(f"Chroma error: {e}")
|
||||
results = []
|
||||
|
||||
if not results:
|
||||
context = "No data found in database for this brand."
|
||||
else:
|
||||
context = "--- START OF REPORT CONTEXT ---\n"
|
||||
for res in results[:num_results]:
|
||||
context += f"RELEVANT DATA CHUNK: {res['text']}\n\n"
|
||||
context += "--- END OF REPORT CONTEXT ---\n"
|
||||
|
||||
full_prompt = f"{GREENWASHING_ANALYSIS_PROMPT}\n\n{context}\n\nUSER QUERY/COMPANY FOCUS: {query}"
|
||||
analysis_text = ask(full_prompt)
|
||||
|
||||
alternatives = []
|
||||
seen_texts = set()
|
||||
for res in results[num_results:]:
|
||||
text_preview = res['text'][:200]
|
||||
if text_preview not in seen_texts:
|
||||
seen_texts.add(text_preview)
|
||||
alternatives.append({"text": res['text'], "score": res.get('score'), "summary": text_preview})
|
||||
if len(alternatives) >= num_alternatives:
|
||||
break
|
||||
|
||||
return {"analysis": analysis_text, "alternatives": alternatives}
|
||||
def analyze (query ,query_embedding ,num_results =5 ,num_alternatives =3 ):
|
||||
try :
|
||||
results =search_documents (query_embedding ,num_results =num_results +num_alternatives +5 )
|
||||
except Exception as e :
|
||||
print (f"Chroma error: {e }")
|
||||
results =[]
|
||||
|
||||
def ask_gemini_with_rag(query, category=None, num_results=5):
|
||||
embedding = get_embedding(query)
|
||||
result = analyze(query, embedding, num_results=num_results)
|
||||
return result["analysis"]
|
||||
if not results :
|
||||
context ="No data found in database for this brand."
|
||||
else :
|
||||
context ="--- START OF REPORT CONTEXT ---\n"
|
||||
for res in results [:num_results ]:
|
||||
context +=f"RELEVANT DATA CHUNK: {res ['text']}\n\n"
|
||||
context +="--- END OF REPORT CONTEXT ---\n"
|
||||
|
||||
def analyze_brand(brand_name):
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Analyzing brand: {brand_name}")
|
||||
print('='*60)
|
||||
|
||||
try:
|
||||
print("\n[1/3] Getting embedding for brand...")
|
||||
embedding = get_embedding(brand_name)
|
||||
|
||||
print("[2/3] Querying Chroma database...")
|
||||
result = analyze(brand_name, embedding)
|
||||
|
||||
print("[3/3] Gemini Analysis Complete!\n")
|
||||
print("-"*60)
|
||||
print("ANALYSIS:")
|
||||
print("-"*60)
|
||||
print(result["analysis"])
|
||||
|
||||
print("\n" + "-"*60)
|
||||
print("ALTERNATIVES FROM DATABASE:")
|
||||
print("-"*60)
|
||||
if result["alternatives"]:
|
||||
for i, alt in enumerate(result["alternatives"], 1):
|
||||
print(f"\n{i}. {alt['summary']}...")
|
||||
else:
|
||||
print("No alternatives found in database.")
|
||||
|
||||
print("\n" + "="*60)
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"\nError during analysis: {e}")
|
||||
return None
|
||||
full_prompt =f"{GREENWASHING_ANALYSIS_PROMPT }\n\n{context }\n\nUSER QUERY/COMPANY FOCUS: {query }"
|
||||
analysis_text =ask (full_prompt )
|
||||
|
||||
def scan_and_analyze():
|
||||
from src.cv.scanner import capture_and_analyze as cv_capture
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("CV + Gemini Greenwashing Scanner")
|
||||
print("="*60)
|
||||
print("Using camera to detect brands...")
|
||||
|
||||
cv_result = cv_capture()
|
||||
|
||||
logos = cv_result.get("logos_detected", [])
|
||||
if not logos:
|
||||
print("No brands detected. Try again!")
|
||||
return None
|
||||
|
||||
brand = logos[0].get("brand", "Unknown")
|
||||
print(f"\nDetected brand: {brand}")
|
||||
|
||||
return analyze_brand(brand)
|
||||
alternatives =[]
|
||||
seen_texts =set ()
|
||||
for res in results [num_results :]:
|
||||
text_preview =res ['text'][:200 ]
|
||||
if text_preview not in seen_texts :
|
||||
seen_texts .add (text_preview )
|
||||
alternatives .append ({"text":res ['text'],"score":res .get ('score'),"summary":text_preview })
|
||||
if len (alternatives )>=num_alternatives :
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
scan_and_analyze()
|
||||
return {"analysis":analysis_text ,"alternatives":alternatives }
|
||||
|
||||
def ask_gemini_with_rag (query ,category =None ,num_results =5 ):
|
||||
embedding =get_embedding (query )
|
||||
result =analyze (query ,embedding ,num_results =num_results )
|
||||
return result ["analysis"]
|
||||
|
||||
def analyze_brand (brand_name ):
|
||||
print (f"\n{'='*60 }")
|
||||
print (f"Analyzing brand: {brand_name }")
|
||||
print ('='*60 )
|
||||
|
||||
try :
|
||||
print ("\n[1/3] Getting embedding for brand...")
|
||||
embedding =get_embedding (brand_name )
|
||||
|
||||
print ("[2/3] Querying Chroma database...")
|
||||
result =analyze (brand_name ,embedding )
|
||||
|
||||
print ("[3/3] Gemini Analysis Complete!\n")
|
||||
print ("-"*60 )
|
||||
print ("ANALYSIS:")
|
||||
print ("-"*60 )
|
||||
print (result ["analysis"])
|
||||
|
||||
print ("\n"+"-"*60 )
|
||||
print ("ALTERNATIVES FROM DATABASE:")
|
||||
print ("-"*60 )
|
||||
if result ["alternatives"]:
|
||||
for i ,alt in enumerate (result ["alternatives"],1 ):
|
||||
print (f"\n{i }. {alt ['summary']}...")
|
||||
else :
|
||||
print ("No alternatives found in database.")
|
||||
|
||||
print ("\n"+"="*60 )
|
||||
return result
|
||||
except Exception as e :
|
||||
print (f"\nError during analysis: {e }")
|
||||
return None
|
||||
|
||||
def scan_and_analyze ():
|
||||
from src .cv .scanner import capture_and_analyze as cv_capture
|
||||
|
||||
print ("\n"+"="*60 )
|
||||
print ("CV + Gemini Greenwashing Scanner")
|
||||
print ("="*60 )
|
||||
print ("Using camera to detect brands...")
|
||||
|
||||
cv_result =cv_capture ()
|
||||
|
||||
logos =cv_result .get ("logos_detected",[])
|
||||
if not logos :
|
||||
print ("No brands detected. Try again!")
|
||||
return None
|
||||
|
||||
brand =logos [0 ].get ("brand","Unknown")
|
||||
print (f"\nDetected brand: {brand }")
|
||||
|
||||
return analyze_brand (brand )
|
||||
|
||||
if __name__ =="__main__":
|
||||
scan_and_analyze ()
|
||||
|
||||
Reference in New Issue
Block a user