intesa_splunk/out/agent_runner.py
2025-09-24 16:37:32 +00:00

175 lines
7.0 KiB
Python

import glob
import gzip
import json
import math
import os
import pathlib
import re
import zlib
from dataclasses import dataclass
from typing import List, Dict, Any

import ujson
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.tools import Tool
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# Set the OpenAI API key in the environment before running, e.g.:
#   export OPENAI_API_KEY="sk-..."
# ---------- Config ----------
# Each setting is read once at import time from the environment; the second
# argument is the fallback used when the variable is unset.
MODEL = os.environ.get("LLM_MODEL", "gpt-4o-mini")  # chat model passed to ChatOpenAI
EMB_MODEL = os.environ.get("EMB_MODEL", "text-embedding-3-small")  # model passed to OpenAIEmbeddings
CHUNK_DIR = os.environ.get("CHUNK_DIR", "./out")  # poller file sink
BLOB_DIR = os.environ.get("BLOB_DIR", "")  # optional local mirror of blobs
TOP_K = int(os.environ.get("TOP_K", "12"))  # k used for similarity search
# ---------- Load JSONL chunk files ----------
def _iter_chunk_files() -> List[pathlib.Path]:
    """Return the chunk files found under CHUNK_DIR and BLOB_DIR, newest first.

    CHUNK_DIR is searched at its top level only; BLOB_DIR is searched
    recursively. Any name matching ``chunk_*.jsonl*`` is accepted, so suffixed
    variants such as ``.jsonl.gz`` are included. A directory that is unset or
    does not exist is silently skipped.

    Returns:
        Paths sorted by modification time, most recent first. Each file appears
        once even when both directories reach it.
    """
    candidates: List[pathlib.Path] = []
    if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists():
        # Escape the directory so characters like '[' in its name are taken
        # literally instead of being parsed as glob syntax.
        pattern = f"{glob.escape(CHUNK_DIR)}/chunk_*.jsonl*"
        candidates += [pathlib.Path(p) for p in glob.glob(pattern)]
    if BLOB_DIR and pathlib.Path(BLOB_DIR).exists():
        pattern = f"{glob.escape(BLOB_DIR)}/**/chunk_*.jsonl*"
        candidates += [pathlib.Path(p) for p in glob.glob(pattern, recursive=True)]

    # Keyed by resolved path so overlapping directories do not yield the same
    # file twice (which would double-count its events downstream).
    dated = {}
    for path in candidates:
        try:
            mtime = path.stat().st_mtime
            key = path.resolve()
        except OSError:
            # The file disappeared (or became unreadable) after it was listed.
            continue
        dated.setdefault(key, (mtime, path))

    newest_first = sorted(dated.values(), key=lambda item: item[0], reverse=True)
    return [path for _, path in newest_first]
def _gunzip_lenient(blob: bytes) -> bytes:
    """Decompress gzip *blob*, keeping whatever can be recovered if it is damaged.

    A well-formed payload goes through ``gzip.decompress`` unchanged. If that
    fails (for example because the file was cut short), the members are
    inflated one at a time and the output produced before the failure is
    returned. Data that is not gzip at all yields ``b""``.
    """
    try:
        return gzip.decompress(blob)
    except (OSError, EOFError, zlib.error):
        pass

    recovered = []
    rest = blob
    while rest:
        # 16 + MAX_WBITS tells zlib to expect the gzip header and trailer.
        member = zlib.decompressobj(16 + zlib.MAX_WBITS)
        try:
            recovered.append(member.decompress(rest))
        except zlib.error:
            break
        if not member.eof:
            # The stream ended before this member's trailer: truncated input.
            break
        rest = member.unused_data
    return b"".join(recovered)


def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]:
    """Load the JSON objects stored one per line in *path*.

    Files whose suffix is ``.gz`` (any letter case) are decompressed first; a
    truncated archive contributes the lines that could still be recovered
    instead of aborting the whole load.

    Args:
        path: The JSONL file, optionally gzip-compressed.

    Returns:
        The decoded objects in file order. Blank lines, lines that fail to
        parse, and lines holding a non-object JSON value are skipped.
    """
    data = path.read_bytes()
    if path.suffix.lower() == ".gz":
        data = _gunzip_lenient(data)

    records: List[Dict[str, Any]] = []
    for line in data.splitlines():
        if not line.strip():
            continue
        try:
            rec = ujson.loads(line)
        except Exception:
            # Deliberately broad: tolerate partial/corrupt lines.
            continue
        # Callers use dict methods on every record, so drop bare scalars/arrays.
        if isinstance(rec, dict):
            records.append(rec)
    return records
# Accept either raw events or HEC-shaped {"event": {...}}
def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Return the event mapping carried by *rec*.

    A wrapped record (``{"event": {...}}``) is unwrapped to its inner mapping.
    Anything else is returned as-is: a record with no ``event`` key, or one
    whose ``event`` is not a mapping (for example a raw text line or ``None``),
    so callers always get something they can call ``.get`` on.

    Args:
        rec: One decoded JSONL record.

    Returns:
        The inner event dict when present, otherwise *rec* itself.
    """
    payload = rec.get("event")
    return payload if isinstance(payload, dict) else rec
def _evt_to_text(evt: Dict[str, Any]) -> str:
    """Render *evt* as one compact ``key=value`` line for embedding/RAG.

    Only the fields listed below are emitted, in this fixed order, and a field
    is left out when it is missing or ``None``. The line always starts with
    ``"bonifico | "``.
    """
    field_order = (
        "event_type", "transaction_id", "step", "status", "importo", "divisa",
        "istantaneo", "spese_commissioni", "causale", "data_pagamento",
        "iban_origin_masked", "iban_dest_masked", "vop_check", "vop_score",
        "bic_swift", "latency_ms", "device", "os", "browser", "geo",
    )
    pairs = ((name, evt.get(name)) for name in field_order)
    rendered = [f"{name}={value}" for name, value in pairs if value is not None]
    return "bonifico | " + " | ".join(rendered)
# ---------- Build vector store ----------
def build_vectorstore(limit_files: int = 20):
    """Embed the events from the newest chunk files into a FAISS index.

    Args:
        limit_files: Maximum number of chunk files to read, newest first.

    Returns:
        A ``(vectorstore, meta_index)`` tuple. ``meta_index`` holds the
        normalized event dicts in the same order as the indexed documents.

    Raises:
        RuntimeError: If no chunk files are found, or if the files that were
            found contain no usable events.
    """
    files = _iter_chunk_files()[:limit_files]
    if not files:
        raise RuntimeError("No chunk files found; set CHUNK_DIR or BLOB_DIR")

    docs = []
    meta_index = []
    for fp in files:
        for rec in _read_jsonl(fp):
            evt = _normalize_event(rec)
            if not isinstance(evt, dict):
                # e.g. a record whose "event" is a raw string; it has no fields.
                continue
            metadata = {"file": fp.name}
            metadata.update({k: evt.get(k) for k in ("transaction_id", "step", "status")})
            docs.append(Document(page_content=_evt_to_text(evt), metadata=metadata))
            meta_index.append(evt)

    if not docs:
        # Fail with a clear message rather than handing FAISS an empty list,
        # which it cannot build an index from.
        raise RuntimeError("Chunk files contain no parseable events; nothing to index")

    embeddings = OpenAIEmbeddings(model=EMB_MODEL)
    vs = FAISS.from_documents(docs, embeddings)
    return vs, meta_index
# ---------- Handy utilities (tools) ----------
# Number of newest chunk files scanned; same value as build_vectorstore()'s default.
_STATS_FILE_LIMIT = 20


def _parse_stats_filters(query):
    """Extract ``(min_amount, step, status)`` from a ``key:value`` filter string."""
    text = query or ""
    amount_match = re.search(r"min_amount:(\d+(\.\d+)?)", text)
    step_match = re.search(r"step:(\w+)", text)
    status_match = re.search(r"status:(\w+)", text)
    return (
        float(amount_match.group(1)) if amount_match else 0.0,
        step_match.group(1) if step_match else None,
        status_match.group(1) if status_match else None,
    )


def _event_amount(evt) -> float:
    """Return the event's ``importo`` as a finite float, or 0.0 if unusable."""
    try:
        amount = float(evt.get("importo", 0))
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # NaN/inf would otherwise poison the running total and the maximum.
    return amount if math.isfinite(amount) else 0.0


def stats_tool_impl(query: str = "") -> str:
    """
    Return quick stats from latest chunks. Query supports simple filters like:
    'status:rejected min_amount:10000 step:esito'

    The chunk files are read directly; no embeddings are computed, so this
    works without calling the embedding API.

    Args:
        query: Optional filter string. Recognized keys are ``min_amount``
            (events below it are skipped), ``step`` and ``status`` (exact
            match). Unknown text is ignored.

    Returns:
        A single line with the number of matching events, how many of them are
        rejected, the rejection rate, the summed amount, and the largest
        amount together with its transaction id.

    Raises:
        RuntimeError: If no chunk files are found.
    """
    files = _iter_chunk_files()[:_STATS_FILE_LIMIT]
    if not files:
        raise RuntimeError("No chunk files found; set CHUNK_DIR or BLOB_DIR")

    min_amount, step, status = _parse_stats_filters(query)
    total = 0
    rej = 0
    amt_sum = 0.0
    hi = 0.0
    hi_tx = None
    for fp in files:
        for rec in _read_jsonl(fp):
            evt = _normalize_event(rec)
            if not isinstance(evt, dict):
                continue
            amt = _event_amount(evt)
            if amt < min_amount:
                continue
            if step and evt.get("step") != step:
                continue
            if status and evt.get("status") != status:
                continue
            total += 1
            amt_sum += amt
            if evt.get("status") == "rejected":
                rej += 1
            if amt > hi:
                hi, hi_tx = amt, evt.get("transaction_id")

    # max(total, 1) keeps the rate at 0 instead of dividing by zero.
    return (
        f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, "
        f"total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})"
    )
def retrieve_tool_impl(question: str) -> str:
    """Return the TOP_K log snippets most similar to *question*.

    A fresh vector store is built on every call. Each hit is rendered on its
    own line as ``[rank] <snippet text>``, with ranks starting at 1.
    """
    store, _events = build_vectorstore()
    hits = store.similarity_search(question, k=TOP_K)
    return "\n".join(
        f"[{rank}] {hit.page_content}" for rank, hit in enumerate(hits, start=1)
    )
def _coerce_sample_count(n: Any, default: int = 5) -> int:
    """Turn a tool argument into an int, using *default* when none is found."""
    if isinstance(n, int):
        return n
    # The agent supplies free text ("3", "3 events", ""), so take the first
    # integer that appears in it.
    found = re.search(r"-?\d+", str(n))
    return int(found.group()) if found else default


def raw_sample_tool_impl(n: int = 5) -> str:
    """Return up to *n* raw events (JSON) from the newest chunks.

    Args:
        n: How many events to return. Because this function is registered as a
            single-input LangChain ``Tool``, the value normally arrives as a
            string; it is coerced to an int, and text without a number falls
            back to 5.

    Returns:
        One JSON document per line, newest files first, or an empty string
        when *n* is zero or negative or no events are available.
    """
    limit = _coerce_sample_count(n)
    if limit <= 0:
        return ""

    samples = []
    for fp in _iter_chunk_files():
        for rec in _read_jsonl(fp):
            samples.append(json.dumps(_normalize_event(rec), ensure_ascii=False))
            if len(samples) >= limit:
                return "\n".join(samples)
    return "\n".join(samples)
# ---------- Build the agent ----------
def build_agent():
    """Assemble the tool-calling agent and return its AgentExecutor.

    The executor exposes three tools (``get_stats``, ``retrieve_similar`` and
    ``raw_samples``) and expects ``input`` and ``chat_history`` keys when
    invoked.
    """
    llm = ChatOpenAI(model=MODEL, temperature=0.2)

    stats_tool = Tool(
        name="get_stats",
        func=stats_tool_impl,
        description="Quick stats over recent events. Usage: pass a filter string like 'status:rejected min_amount:10000 step:esito'.",
    )
    search_tool = Tool(
        name="retrieve_similar",
        func=retrieve_tool_impl,
        description="Semantic search over logs. Pass a natural-language question about bonifico logs.",
    )
    samples_tool = Tool(
        name="raw_samples",
        func=raw_sample_tool_impl,
        description="Return a few raw JSON events to inspect fields.",
    )
    tools = [stats_tool, search_tool, samples_tool]

    # Continuation lines stay at column 0 so the prompt text is unchanged.
    system = """You are a payments log analyst. Use the tools to inspect recent Splunk-derived logs for 'bonifico' events.
- Prefer 'get_stats' for quick metrics (rejection rate, totals).
- Use 'retrieve_similar' to pull relevant examples before concluding.
- When asked for anomalies, treat as suspicious: rejected EUR transfers >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes.
Return a short, structured report with: Findings, Evidence (IDs/fields), and Recommended actions."""
    messages = [
        ("system", system),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
    prompt = ChatPromptTemplate.from_messages(messages)

    agent = create_tool_calling_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
def run_default_question():
    """Build the agent, ask it the stock anomaly question, and print the answer."""
    executor = build_agent()
    question = (
        "Scan the latest chunks. List any anomalies (e.g., rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
        "Give a brief summary and next steps."
    )
    payload = {"input": question, "chat_history": []}
    result = executor.invoke(payload)
    print("\n=== AGENT OUTPUT ===\n", result["output"])


# Run the default scan only when executed as a script, not on import.
if __name__ == "__main__":
    run_default_question()