diff --git a/agent_runner.py b/agent_runner.py index 58c38d0..cb45054 100644 --- a/agent_runner.py +++ b/agent_runner.py @@ -1,154 +1,303 @@ -import os, glob, json, ujson, gzip, pathlib, re +import os, sys, glob, json, ujson, gzip, pathlib, re from typing import List, Dict, Any -from dataclasses import dataclass -from langchain_openai import ChatOpenAI, OpenAIEmbeddings +from dotenv import load_dotenv +from notify import send_email +from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings from langchain_community.vectorstores import FAISS from langchain_core.documents import Document from langchain.tools import Tool from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -#export OPENAI_API_KEY="sk-..." -#SET API KEY^ +# ----- load .env (defaults to ./.env; override with ENV_FILE=/path/to/.env) ----- +load_dotenv(os.getenv("ENV_FILE", ".env")) -# ---------- Config ---------- -MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini") -EMB_MODEL = os.getenv("EMB_MODEL", "text-embedding-3-small") -CHUNK_DIR = os.getenv("CHUNK_DIR", "./out") # poller file sink -BLOB_DIR = os.getenv("BLOB_DIR", "") # optional local mirror of blobs +# ----- read env (supports both AZURE_* and AOAI_*) ----- +def _norm_endpoint(ep: str | None) -> str: + if not ep: return "" + ep = ep.strip().rstrip("/") + # strip any trailing /openai[/v...] + ep = re.sub(r"/openai(?:/v\d+(?:\.\d+)?(?:-\w+)?)?$", "", ep) + return ep + "/" + +AZ_ENDPOINT = _norm_endpoint( + os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT") +) +AZ_API_KEY = ( + os.getenv("AZURE_OPENAI_API_KEY") + or os.getenv("AOAI_API_KEY") + or os.getenv("OPENAI_API_KEY") +) +AZ_API_VERSION = ( + os.getenv("AZURE_OPENAI_API_VERSION") + or os.getenv("AOAI_API_VERSION") + or "2025-01-01-preview" +) +AZ_CHAT_DEPLOY = ( + os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") + or os.getenv("AOAI_CHAT_DEPLOYMENT") + or "gpt-4o-mini" +) +AZ_EMBED_DEPLOY = ( + os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT") + or os.getenv("AOAI_EMBED_DEPLOYMENT") + or "" +) + +# ----- local data config ----- +CHUNK_DIR = os.getenv("CHUNK_DIR", "./out") +BLOB_DIR = os.getenv("BLOB_DIR", "") TOP_K = int(os.getenv("TOP_K", "12")) +# ---------- Helpers to build LLM/Embeddings for Azure OpenAI ---------- +def make_llm(temperature: float = 0.2) -> AzureChatOpenAI: + if not AZ_ENDPOINT or not AZ_API_KEY: + raise RuntimeError("Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents).") + return AzureChatOpenAI( + azure_endpoint=AZ_ENDPOINT, + api_key=AZ_API_KEY, + api_version=AZ_API_VERSION, + azure_deployment=AZ_CHAT_DEPLOY, + temperature=temperature, + ) + +def make_embeddings() -> AzureOpenAIEmbeddings | None: + if not AZ_EMBED_DEPLOY: + return None + return AzureOpenAIEmbeddings( + azure_endpoint=AZ_ENDPOINT, + api_key=AZ_API_KEY, + api_version=AZ_API_VERSION, + azure_deployment=AZ_EMBED_DEPLOY, + ) + # ---------- Load JSONL chunk files ---------- def _iter_chunk_files() -> List[pathlib.Path]: - paths = [] + paths: List[pathlib.Path] = [] if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists(): paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")] if BLOB_DIR and pathlib.Path(BLOB_DIR).exists(): paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)] - # newest first return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True) def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]: data = path.read_bytes() if path.suffix == ".gz": data = gzip.decompress(data) - lines = data.splitlines() - out = [] - for ln in lines: - if not ln.strip(): - continue + out: List[Dict[str, Any]] = [] + for ln in data.splitlines(): + if not ln.strip(): continue try: out.append(ujson.loads(ln)) except Exception: - # tolerate partial/corrupt lines continue return out # Accept either raw events or HEC-shaped {"event": {...}} def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]: - evt = rec.get("event", rec) - # Ensure strings for some fields if needed - return evt + return rec.get("event", rec) def _evt_to_text(evt: Dict[str, Any]) -> str: - # Compact text for embedding/RAG - parts = [] - for k in ["event_type","transaction_id","step","status","importo","divisa","istantaneo", - "spese_commissioni","causale","data_pagamento","iban_origin_masked","iban_dest_masked", - "vop_check","vop_score","bic_swift","latency_ms","device","os","browser","geo"]: - v = evt.get(k) - if v is not None: - parts.append(f"{k}={v}") + keys = ["event_type","transaction_id","step","status","importo","divisa","istantaneo", + "spese_commissioni","causale","data_pagamento","iban_origin_masked","iban_dest_masked", + "vop_check","vop_score","bic_swift","latency_ms","device","os","browser","geo"] + parts = [f"{k}={evt[k]}" for k in keys if evt.get(k) is not None] return "bonifico | " + " | ".join(parts) -# ---------- Build vector store ---------- +# ---------- Build vector store (only if embeddings deployment exists) ---------- def build_vectorstore(limit_files: int = 20): + embs = make_embeddings() + if embs is None: + raise RuntimeError("No embeddings deployment set. Export AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT.") files = _iter_chunk_files()[:limit_files] if not files: - raise RuntimeError("No chunk files found; set CHUNK_DIR or BLOB_DIR") - docs = [] - meta_index = [] + raise RuntimeError("No chunk files found; set CHUNK_DIR or BLOB_DIR.") + docs, meta_index = [], [] for fp in files: rows = _read_jsonl(fp) for rec in rows: evt = _normalize_event(rec) - txt = _evt_to_text(evt) - docs.append(Document(page_content=txt, metadata={"file": fp.name, **{k: evt.get(k) for k in ("transaction_id","step","status")}})) + docs.append(Document( + page_content=_evt_to_text(evt), + metadata={"file": fp.name, **{k: evt.get(k) for k in ("transaction_id","step","status")}} + )) meta_index.append(evt) - embeddings = OpenAIEmbeddings(model=EMB_MODEL) - vs = FAISS.from_documents(docs, embeddings) + vs = FAISS.from_documents(docs, embs) return vs, meta_index -# ---------- Handy utilities (tools) ---------- +# ---------- Tools ---------- def stats_tool_impl(query: str = "") -> str: """ - Return quick stats from latest chunks. Query supports simple filters like: - 'status:rejected min_amount:10000 step:esito' + Filters supported in `query` (space-separated): + status: + step: + divisa: + instant: + vop: + min_amount: + iban_country:<2-letter e.g., IT> + Examples: + 'status:rejected min_amount:10000' + 'vop:no_match step:esito' + 'divisa:EUR instant:true' """ - import math - vs, meta_index = build_vectorstore() - # simple filter pass - min_amount, step, status = 0.0, None, None - m = re.search(r"min_amount:(\d+(\.\d+)?)", query); min_amount = float(m.group(1)) if m else 0.0 - m = re.search(r"step:(\w+)", query); step = m.group(1) if m else None - m = re.search(r"status:(\w+)", query); status = m.group(1) if m else None + # load recent events into memory + files = _iter_chunk_files()[:20] + events = [] + for fp in files: + for rec in _read_jsonl(fp): + events.append(_normalize_event(rec)) - total = 0; rej = 0; amt_sum = 0.0; hi = 0.0; hi_tx = None - for evt in meta_index: - try: - amt = float(evt.get("importo", 0)) - except Exception: - amt = 0.0 - if amt < min_amount: continue - if step and evt.get("step") != step: continue - if status and evt.get("status") != status: continue - total += 1 + # parse filters + q = query.lower() + def _kv(key, pat=r"([^\s]+)"): + m = re.search(fr"{key}:{pat}", q) + return m.group(1) if m else None + + status_f = _kv("status") + step_f = _kv("step") + div_f = _kv("divisa") + vop_f = _kv("vop") + country = _kv("iban_country") + instant_s = _kv("instant") + min_amt_s = _kv("min_amount") + min_amt = float(min_amt_s) if min_amt_s else 0.0 + inst_f = None + if instant_s in {"true","false"}: + inst_f = (instant_s == "true") + + def _boolish(x): + if isinstance(x, bool): return x + if isinstance(x, str): return x.lower() in {"true","1","yes"} + return False + + def keep(e): + try: amt = float(e.get("importo", 0) or 0) + except: amt = 0.0 + if amt < min_amt: return False + if status_f and (str(e.get("status","")).lower() != status_f): return False + if step_f and (str(e.get("step","")).lower() != step_f): return False + if div_f and (str(e.get("divisa","")).upper() != div_f.upper()): return False + if vop_f: + v = str(e.get("vop_check","")).lower() + if v != vop_f: return False + if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f: + return False + if country: + # heuristic from IBAN (dest or origin) + iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper() + if not iban.startswith(country.upper()): + return False + return True + + filtered = [e for e in events if keep(e)] + + total = len(filtered) + rej = sum(1 for e in filtered if str(e.get("status","")).lower()=="rejected") + amt_sum = 0.0; hi = 0.0; hi_tx = None + for e in filtered: + try: amt = float(e.get("importo", 0) or 0) + except: amt = 0.0 amt_sum += amt - if evt.get("status") == "rejected": rej += 1 - if amt > hi: hi, hi_tx = amt, evt.get("transaction_id") - rr = f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})" - return rr + if amt > hi: + hi, hi_tx = amt, e.get("transaction_id") + return f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})" def retrieve_tool_impl(question: str) -> str: - """Semantic retrieve top-K log snippets related to the question.""" vs, _ = build_vectorstore() docs = vs.similarity_search(question, k=TOP_K) - lines = [f"[{i+1}] {d.page_content}" for i,d in enumerate(docs)] - return "\n".join(lines) + return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs)) -def raw_sample_tool_impl(n: int = 5) -> str: - """Return n raw events (JSON) from the newest chunks.""" +def raw_sample_tool_impl(arg: str = "") -> str: + """ + Return a few raw JSON events from the newest chunks. + Accepts the same filters as get_stats PLUS optional 'n:' to control how many. + Examples: + 'n:5 status:rejected min_amount:10000' + 'divisa:EUR instant:true step:esito n:3' + """ + q = (arg or "").lower() + + # helpers (same parsing as get_stats) + def _kv(key, pat=r"([^\s]+)"): + m = re.search(fr"{key}:{pat}", q) + return m.group(1) if m else None + + n_s = _kv("n", r"(\d+)") + n = int(n_s) if n_s else 5 + status_f = _kv("status") + step_f = _kv("step") + div_f = _kv("divisa") + vop_f = _kv("vop") + country = _kv("iban_country") + instant_s = _kv("instant") + min_amt_s = _kv("min_amount") + min_amt = float(min_amt_s) if min_amt_s else 0.0 + + inst_f = None + if instant_s in {"true","false"}: + inst_f = (instant_s == "true") + + def _boolish(x): + if isinstance(x, bool): return x + if isinstance(x, str): return x.lower() in {"true","1","yes"} + return False + + def keep(e): + try: amt = float(e.get("importo", 0) or 0) + except: amt = 0.0 + if amt < min_amt: return False + if status_f and (str(e.get("status","")).lower() != status_f): return False + if step_f and (str(e.get("step","")).lower() != step_f): return False + if div_f and (str(e.get("divisa","")).upper() != div_f.upper()): return False + if vop_f: + v = str(e.get("vop_check","")).lower() + if v != vop_f: return False + if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f: + return False + if country: + iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper() + if not iban.startswith(country.upper()): + return False + return True + + # load newest events and filter files = _iter_chunk_files() out = [] for fp in files: for rec in _read_jsonl(fp): - out.append(json.dumps(_normalize_event(rec), ensure_ascii=False)) - if len(out) >= n: break - if len(out) >= n: break + evt = _normalize_event(rec) + if keep(evt): + out.append(json.dumps(evt, ensure_ascii=False)) + if len(out) >= n: + break + if len(out) >= n: + break + + if not out: + return "(no matching events)" return "\n".join(out) + # ---------- Build the agent ---------- def build_agent(): - llm = ChatOpenAI(model=MODEL, temperature=0.2) - + llm = make_llm(temperature=0.2) tools = [ - Tool(name="get_stats", - func=stats_tool_impl, - description="Quick stats over recent events. Usage: pass a filter string like 'status:rejected min_amount:10000 step:esito'."), - Tool(name="retrieve_similar", - func=retrieve_tool_impl, - description="Semantic search over logs. Pass a natural-language question about bonifico logs."), - Tool(name="raw_samples", - func=raw_sample_tool_impl, - description="Return a few raw JSON events to inspect fields.") + Tool(name="get_stats", func=stats_tool_impl, + description="Quick stats over recent events. Example: 'status:rejected min_amount:10000 step:esito'."), + Tool(name="raw_samples", func=raw_sample_tool_impl, + description="Return a few raw JSON events. Accepts filters like get_stats and 'n:'. Example: 'n:5 status:rejected min_amount:10000'.") ] + if AZ_EMBED_DEPLOY: + tools.append(Tool(name="retrieve_similar", func=retrieve_tool_impl, + description="Semantic search over logs. Ask a question about bonifico logs.")) system = """You are a payments log analyst. Use the tools to inspect recent Splunk-derived logs for 'bonifico' events. - Prefer 'get_stats' for quick metrics (rejection rate, totals). -- Use 'retrieve_similar' to pull relevant examples before concluding. -- When asked for anomalies, treat as suspicious: rejected EUR transfers >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes. -Return a short, structured report with: Findings, Evidence (IDs/fields), and Recommended actions.""" +- Use 'retrieve_similar' (if available) to pull relevant examples before concluding. +- When asked for anomalies, treat as suspicious: rejected EUR >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes. +Return a short, structured report with: Findings, Evidence, and Recommended actions.""" prompt = ChatPromptTemplate.from_messages([ ("system", system), @@ -156,19 +305,27 @@ Return a short, structured report with: Findings, Evidence (IDs/fields), and Rec ("human", "{input}"), MessagesPlaceholder("agent_scratchpad"), ]) - agent = create_tool_calling_agent(llm, tools, prompt) - executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True) - return executor + return AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True) -def run_default_question(): +def run_default_question(question_override: str | None = None): agent = build_agent() - question = ( - "Scan the latest chunks. List any anomalies (e.g., rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). " + question = question_override or ( + "Scan the latest chunks. List any anomalies " + "(rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). " "Give a brief summary and next steps." ) out = agent.invoke({"input": question, "chat_history": []}) - print("\n=== AGENT OUTPUT ===\n", out["output"]) + result = out.get("output", "") + print("\n=== AGENT OUTPUT ===\n", result) + + # Email the result if MAIL_ENABLED=true (handled inside notify.py) + try: + send_email(subject="[Intesa Logs] Agent Report", body_text=result) + except Exception as e: + print("[notify] email failed:", e) if __name__ == "__main__": - run_default_question() + # optional CLI: allow a custom question + custom = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None + run_default_question(custom if custom else None)