Email sending functionality

Env vars are set in the .env file
This commit is contained in:
daniel.g 2025-09-25 13:14:30 +00:00
parent ee1526cfd9
commit 2c0f9704b3

View File

@ -1,154 +1,303 @@
import os, glob, json, ujson, gzip, pathlib, re import os, sys, glob, json, ujson, gzip, pathlib, re
from typing import List, Dict, Any from typing import List, Dict, Any
from dataclasses import dataclass
from langchain_openai import ChatOpenAI, OpenAIEmbeddings from dotenv import load_dotenv
from notify import send_email
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain_community.vectorstores import FAISS from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document from langchain_core.documents import Document
from langchain.tools import Tool from langchain.tools import Tool
from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
#export OPENAI_API_KEY="sk-..." # ----- load .env (defaults to ./.env; override with ENV_FILE=/path/to/.env) -----
#SET API KEY^ load_dotenv(os.getenv("ENV_FILE", ".env"))
# ---------- Config ---------- # ----- read env (supports both AZURE_* and AOAI_*) -----
MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini") def _norm_endpoint(ep: str | None) -> str:
EMB_MODEL = os.getenv("EMB_MODEL", "text-embedding-3-small") if not ep: return ""
CHUNK_DIR = os.getenv("CHUNK_DIR", "./out") # poller file sink ep = ep.strip().rstrip("/")
BLOB_DIR = os.getenv("BLOB_DIR", "") # optional local mirror of blobs # strip any trailing /openai[/v...]
ep = re.sub(r"/openai(?:/v\d+(?:\.\d+)?(?:-\w+)?)?$", "", ep)
return ep + "/"
# Azure OpenAI settings. Each value is read from the AZURE_OPENAI_* variable
# first; the AOAI_* spelling is used when the former is unset or empty.

# Resource endpoint, passed through _norm_endpoint.
AZ_ENDPOINT = _norm_endpoint(
os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT")
)
# API key; the plain OPENAI_API_KEY variable is accepted as a last resort.
AZ_API_KEY = (
os.getenv("AZURE_OPENAI_API_KEY")
or os.getenv("AOAI_API_KEY")
or os.getenv("OPENAI_API_KEY")
)
# REST API version; falls back to "2025-01-01-preview".
AZ_API_VERSION = (
os.getenv("AZURE_OPENAI_API_VERSION")
or os.getenv("AOAI_API_VERSION")
or "2025-01-01-preview"
)
# Chat deployment name; falls back to "gpt-4o-mini".
AZ_CHAT_DEPLOY = (
os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")
or os.getenv("AOAI_CHAT_DEPLOYMENT")
or "gpt-4o-mini"
)
# Embeddings deployment name; "" when not configured (no fallback model).
AZ_EMBED_DEPLOY = (
os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT")
or os.getenv("AOAI_EMBED_DEPLOYMENT")
or ""
)
# ----- local data config -----
# Directory scanned (non-recursively) for chunk_*.jsonl* files; the poller's file sink.
CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
# Optional local mirror of blobs, scanned recursively; "" disables it.
BLOB_DIR = os.getenv("BLOB_DIR", "")
# Number of snippets requested from the vector store per semantic search.
TOP_K = int(os.getenv("TOP_K", "12"))
# ---------- Helpers to build LLM/Embeddings for Azure OpenAI ----------
def make_llm(temperature: float = 0.2) -> AzureChatOpenAI:
    """Build the Azure OpenAI chat client from the module-level AZ_* settings.

    Args:
        temperature: Sampling temperature forwarded to the client.

    Raises:
        RuntimeError: If the endpoint or the API key is not configured. The
            message names the missing setting(s).
    """
    missing = [
        label
        for label, value in (
            ("AZURE_OPENAI_ENDPOINT", AZ_ENDPOINT),
            ("AZURE_OPENAI_API_KEY", AZ_API_KEY),
        )
        if not value
    ]
    if missing:
        raise RuntimeError(
            "Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents); "
            f"missing: {', '.join(missing)}."
        )
    return AzureChatOpenAI(
        azure_endpoint=AZ_ENDPOINT,
        api_key=AZ_API_KEY,
        api_version=AZ_API_VERSION,
        azure_deployment=AZ_CHAT_DEPLOY,
        temperature=temperature,
    )
def make_embeddings() -> AzureOpenAIEmbeddings | None:
    """Build the Azure OpenAI embeddings client, if one is configured.

    Returns:
        None when no embeddings deployment name is set (AZ_EMBED_DEPLOY is
        empty); otherwise a client bound to that deployment.

    Raises:
        RuntimeError: If a deployment is set but the endpoint or API key is
            missing. This mirrors the check in make_llm so that the failure
            is reported up front instead of surfacing from the client.
    """
    if not AZ_EMBED_DEPLOY:
        return None
    if not AZ_ENDPOINT or not AZ_API_KEY:
        raise RuntimeError(
            "Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents)."
        )
    return AzureOpenAIEmbeddings(
        azure_endpoint=AZ_ENDPOINT,
        api_key=AZ_API_KEY,
        api_version=AZ_API_VERSION,
        azure_deployment=AZ_EMBED_DEPLOY,
    )
# ---------- Load JSONL chunk files ---------- # ---------- Load JSONL chunk files ----------
def _iter_chunk_files() -> List[pathlib.Path]: def _iter_chunk_files() -> List[pathlib.Path]:
paths = [] paths: List[pathlib.Path] = []
if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists(): if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists():
paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")] paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")]
if BLOB_DIR and pathlib.Path(BLOB_DIR).exists(): if BLOB_DIR and pathlib.Path(BLOB_DIR).exists():
paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)] paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)]
# newest first
return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True) return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True)
def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]: def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]:
data = path.read_bytes() data = path.read_bytes()
if path.suffix == ".gz": if path.suffix == ".gz":
data = gzip.decompress(data) data = gzip.decompress(data)
lines = data.splitlines() out: List[Dict[str, Any]] = []
out = [] for ln in data.splitlines():
for ln in lines: if not ln.strip(): continue
if not ln.strip():
continue
try: try:
out.append(ujson.loads(ln)) out.append(ujson.loads(ln))
except Exception: except Exception:
# tolerate partial/corrupt lines
continue continue
return out return out
# Accept either raw events or HEC-shaped {"event": {...}} # Accept either raw events or HEC-shaped {"event": {...}}
def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]: def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
evt = rec.get("event", rec) return rec.get("event", rec)
# Ensure strings for some fields if needed
return evt
def _evt_to_text(evt: Dict[str, Any]) -> str: def _evt_to_text(evt: Dict[str, Any]) -> str:
# Compact text for embedding/RAG keys = ["event_type","transaction_id","step","status","importo","divisa","istantaneo",
parts = [] "spese_commissioni","causale","data_pagamento","iban_origin_masked","iban_dest_masked",
for k in ["event_type","transaction_id","step","status","importo","divisa","istantaneo", "vop_check","vop_score","bic_swift","latency_ms","device","os","browser","geo"]
"spese_commissioni","causale","data_pagamento","iban_origin_masked","iban_dest_masked", parts = [f"{k}={evt[k]}" for k in keys if evt.get(k) is not None]
"vop_check","vop_score","bic_swift","latency_ms","device","os","browser","geo"]:
v = evt.get(k)
if v is not None:
parts.append(f"{k}={v}")
return "bonifico | " + " | ".join(parts) return "bonifico | " + " | ".join(parts)
# ---------- Build vector store ---------- # ---------- Build vector store (only if embeddings deployment exists) ----------
def build_vectorstore(limit_files: int = 20):
    """Embed the newest chunk files into an in-memory FAISS index.

    Args:
        limit_files: Maximum number of chunk files (newest first) to index.

    Returns:
        A ``(vectorstore, meta_index)`` tuple, where ``meta_index`` holds the
        normalised events in the same order as the indexed documents.

    Raises:
        RuntimeError: If no embeddings deployment is configured, no chunk
            files are found, or the files contain no usable events.
    """
    embs = make_embeddings()
    if embs is None:
        raise RuntimeError("No embeddings deployment set. Export AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT.")

    files = _iter_chunk_files()[:limit_files]
    if not files:
        raise RuntimeError("No chunk files found; set CHUNK_DIR or BLOB_DIR.")

    docs, meta_index = [], []
    for fp in files:
        for rec in _read_jsonl(fp):
            evt = _normalize_event(rec)
            docs.append(Document(
                page_content=_evt_to_text(evt),
                metadata={"file": fp.name, **{k: evt.get(k) for k in ("transaction_id", "step", "status")}},
            ))
            meta_index.append(evt)

    # FAISS cannot build an index from zero documents; fail with a clear message.
    if not docs:
        raise RuntimeError("Chunk files contain no readable events; nothing to index.")

    vs = FAISS.from_documents(docs, embs)
    return vs, meta_index
# ---------- Handy utilities (tools) ---------- # ---------- Tools ----------
def stats_tool_impl(query: str = "") -> str: def stats_tool_impl(query: str = "") -> str:
""" """
Return quick stats from latest chunks. Query supports simple filters like: Filters supported in `query` (space-separated):
'status:rejected min_amount:10000 step:esito' status:<accepted|pending|rejected>
step:<compila|conferma|esito>
divisa:<EUR|USD|GBP>
instant:<true|false>
vop:<no_match|close_match|match>
min_amount:<float>
iban_country:<2-letter e.g., IT>
Examples:
'status:rejected min_amount:10000'
'vop:no_match step:esito'
'divisa:EUR instant:true'
""" """
import math # load recent events into memory
vs, meta_index = build_vectorstore() files = _iter_chunk_files()[:20]
# simple filter pass events = []
min_amount, step, status = 0.0, None, None for fp in files:
m = re.search(r"min_amount:(\d+(\.\d+)?)", query); min_amount = float(m.group(1)) if m else 0.0 for rec in _read_jsonl(fp):
m = re.search(r"step:(\w+)", query); step = m.group(1) if m else None events.append(_normalize_event(rec))
m = re.search(r"status:(\w+)", query); status = m.group(1) if m else None
total = 0; rej = 0; amt_sum = 0.0; hi = 0.0; hi_tx = None # parse filters
for evt in meta_index: q = query.lower()
try: def _kv(key, pat=r"([^\s]+)"):
amt = float(evt.get("importo", 0)) m = re.search(fr"{key}:{pat}", q)
except Exception: return m.group(1) if m else None
amt = 0.0
if amt < min_amount: continue status_f = _kv("status")
if step and evt.get("step") != step: continue step_f = _kv("step")
if status and evt.get("status") != status: continue div_f = _kv("divisa")
total += 1 vop_f = _kv("vop")
country = _kv("iban_country")
instant_s = _kv("instant")
min_amt_s = _kv("min_amount")
min_amt = float(min_amt_s) if min_amt_s else 0.0
inst_f = None
if instant_s in {"true","false"}:
inst_f = (instant_s == "true")
def _boolish(x):
if isinstance(x, bool): return x
if isinstance(x, str): return x.lower() in {"true","1","yes"}
return False
def keep(e):
try: amt = float(e.get("importo", 0) or 0)
except: amt = 0.0
if amt < min_amt: return False
if status_f and (str(e.get("status","")).lower() != status_f): return False
if step_f and (str(e.get("step","")).lower() != step_f): return False
if div_f and (str(e.get("divisa","")).upper() != div_f.upper()): return False
if vop_f:
v = str(e.get("vop_check","")).lower()
if v != vop_f: return False
if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f:
return False
if country:
# heuristic from IBAN (dest or origin)
iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper()
if not iban.startswith(country.upper()):
return False
return True
filtered = [e for e in events if keep(e)]
total = len(filtered)
rej = sum(1 for e in filtered if str(e.get("status","")).lower()=="rejected")
amt_sum = 0.0; hi = 0.0; hi_tx = None
for e in filtered:
try: amt = float(e.get("importo", 0) or 0)
except: amt = 0.0
amt_sum += amt amt_sum += amt
if evt.get("status") == "rejected": rej += 1 if amt > hi:
if amt > hi: hi, hi_tx = amt, evt.get("transaction_id") hi, hi_tx = amt, e.get("transaction_id")
rr = f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})" return f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})"
return rr
def retrieve_tool_impl(question: str) -> str: def retrieve_tool_impl(question: str) -> str:
"""Semantic retrieve top-K log snippets related to the question."""
vs, _ = build_vectorstore() vs, _ = build_vectorstore()
docs = vs.similarity_search(question, k=TOP_K) docs = vs.similarity_search(question, k=TOP_K)
lines = [f"[{i+1}] {d.page_content}" for i,d in enumerate(docs)] return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs))
return "\n".join(lines)
def raw_sample_tool_impl(n: int = 5) -> str: def raw_sample_tool_impl(arg: str = "") -> str:
"""Return n raw events (JSON) from the newest chunks.""" """
Return a few raw JSON events from the newest chunks.
Accepts the same filters as get_stats PLUS optional 'n:<int>' to control how many.
Examples:
'n:5 status:rejected min_amount:10000'
'divisa:EUR instant:true step:esito n:3'
"""
q = (arg or "").lower()
# helpers (same parsing as get_stats)
def _kv(key, pat=r"([^\s]+)"):
m = re.search(fr"{key}:{pat}", q)
return m.group(1) if m else None
n_s = _kv("n", r"(\d+)")
n = int(n_s) if n_s else 5
status_f = _kv("status")
step_f = _kv("step")
div_f = _kv("divisa")
vop_f = _kv("vop")
country = _kv("iban_country")
instant_s = _kv("instant")
min_amt_s = _kv("min_amount")
min_amt = float(min_amt_s) if min_amt_s else 0.0
inst_f = None
if instant_s in {"true","false"}:
inst_f = (instant_s == "true")
def _boolish(x):
if isinstance(x, bool): return x
if isinstance(x, str): return x.lower() in {"true","1","yes"}
return False
def keep(e):
try: amt = float(e.get("importo", 0) or 0)
except: amt = 0.0
if amt < min_amt: return False
if status_f and (str(e.get("status","")).lower() != status_f): return False
if step_f and (str(e.get("step","")).lower() != step_f): return False
if div_f and (str(e.get("divisa","")).upper() != div_f.upper()): return False
if vop_f:
v = str(e.get("vop_check","")).lower()
if v != vop_f: return False
if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f:
return False
if country:
iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper()
if not iban.startswith(country.upper()):
return False
return True
# load newest events and filter
files = _iter_chunk_files() files = _iter_chunk_files()
out = [] out = []
for fp in files: for fp in files:
for rec in _read_jsonl(fp): for rec in _read_jsonl(fp):
out.append(json.dumps(_normalize_event(rec), ensure_ascii=False)) evt = _normalize_event(rec)
if len(out) >= n: break if keep(evt):
if len(out) >= n: break out.append(json.dumps(evt, ensure_ascii=False))
if len(out) >= n:
break
if len(out) >= n:
break
if not out:
return "(no matching events)"
return "\n".join(out) return "\n".join(out)
# ---------- Build the agent ---------- # ---------- Build the agent ----------
def build_agent(): def build_agent():
llm = ChatOpenAI(model=MODEL, temperature=0.2) llm = make_llm(temperature=0.2)
tools = [ tools = [
Tool(name="get_stats", Tool(name="get_stats", func=stats_tool_impl,
func=stats_tool_impl, description="Quick stats over recent events. Example: 'status:rejected min_amount:10000 step:esito'."),
description="Quick stats over recent events. Usage: pass a filter string like 'status:rejected min_amount:10000 step:esito'."), Tool(name="raw_samples", func=raw_sample_tool_impl,
Tool(name="retrieve_similar", description="Return a few raw JSON events. Accepts filters like get_stats and 'n:<int>'. Example: 'n:5 status:rejected min_amount:10000'.")
func=retrieve_tool_impl,
description="Semantic search over logs. Pass a natural-language question about bonifico logs."),
Tool(name="raw_samples",
func=raw_sample_tool_impl,
description="Return a few raw JSON events to inspect fields.")
] ]
if AZ_EMBED_DEPLOY:
tools.append(Tool(name="retrieve_similar", func=retrieve_tool_impl,
description="Semantic search over logs. Ask a question about bonifico logs."))
system = """You are a payments log analyst. Use the tools to inspect recent Splunk-derived logs for 'bonifico' events. system = """You are a payments log analyst. Use the tools to inspect recent Splunk-derived logs for 'bonifico' events.
- Prefer 'get_stats' for quick metrics (rejection rate, totals). - Prefer 'get_stats' for quick metrics (rejection rate, totals).
- Use 'retrieve_similar' to pull relevant examples before concluding. - Use 'retrieve_similar' (if available) to pull relevant examples before concluding.
- When asked for anomalies, treat as suspicious: rejected EUR transfers >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes. - When asked for anomalies, treat as suspicious: rejected EUR >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes.
Return a short, structured report with: Findings, Evidence (IDs/fields), and Recommended actions.""" Return a short, structured report with: Findings, Evidence, and Recommended actions."""
prompt = ChatPromptTemplate.from_messages([ prompt = ChatPromptTemplate.from_messages([
("system", system), ("system", system),
@ -156,19 +305,27 @@ Return a short, structured report with: Findings, Evidence (IDs/fields), and Rec
("human", "{input}"), ("human", "{input}"),
MessagesPlaceholder("agent_scratchpad"), MessagesPlaceholder("agent_scratchpad"),
]) ])
agent = create_tool_calling_agent(llm, tools, prompt) agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True) return AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
return executor
def run_default_question(question_override: str | None = None):
    """Run the agent once, print its report, and email it.

    Args:
        question_override: Question to ask instead of the default anomaly
            scan. None, empty, or whitespace-only values select the default.

    Returns:
        The agent's report text ("" if the agent produced no output), so
        callers can reuse it.

    A failure while sending the email is reported on stderr and does not
    abort the run.
    """
    agent = build_agent()
    question = (question_override or "").strip() or (
        "Scan the latest chunks. List any anomalies "
        "(rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
        "Give a brief summary and next steps."
    )
    out = agent.invoke({"input": question, "chat_history": []})
    result = out.get("output", "")
    print("\n=== AGENT OUTPUT ===\n", result)

    # Email the result if MAIL_ENABLED=true (handled inside notify.py)
    try:
        send_email(subject="[Intesa Logs] Agent Report", body_text=result)
    except Exception as e:
        # best-effort notification: the report was already printed above
        print("[notify] email failed:", e, file=sys.stderr)
    return result
if __name__ == "__main__":
    # optional CLI: all arguments are joined into one custom question;
    # no arguments (or only blanks) selects the default question
    run_default_question(" ".join(sys.argv[1:]).strip() or None)