commit 6a85b7d3f01fcd8d87bcdea801f1cdaa6a77d6f8
Author: daniel.g
Date:   Wed Sep 24 16:37:32 2025 +0000

    Upload files to "out"

diff --git a/out/.env.example b/out/.env.example
new file mode 100644
index 0000000..7012c59
--- /dev/null
+++ b/out/.env.example
@@ -0,0 +1,9 @@
+# Splunk admin password used on FIRST boot (persists in splunk-etc/var volumes)
+SPLUNK_PASSWORD=Str0ngP@ss!9
+
+# HEC token the seeder/curl will use (already accepted by the Splunk container)
+SPLUNK_HEC_TOKEN=dev-0123456789abcdef
+
+# Azure (optional; only needed if SINK=blob or blob+sb)
+AZURE_STORAGE_CONNECTION_STRING=
+AZURE_SERVICEBUS_CONNECTION_STRING=
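The two Splunk values above are enough for a quick end-to-end check of the HEC endpoint once the compose stack further down is running. A minimal smoke-test sketch, assuming the Python requests package and the default ports published by compose.yaml; the event fields are illustrative, not the seeder's exact payload:

# HEC smoke test (sketch, not part of the commit); assumes requests is installed
# and the stack below is up with the defaults from .env.example.
import requests

HEC_URL = "https://localhost:8088/services/collector/event"
HEC_TOKEN = "dev-0123456789abcdef"   # SPLUNK_HEC_TOKEN from .env

payload = {
    "index": "intesa_payments",
    "sourcetype": "intesa:bonifico",
    "event": {"event_type": "bonifico", "step": "esito", "status": "accepted",
              "importo": 1250.00, "divisa": "EUR"},
}
# verify=False because the container uses a self-signed certificate
r = requests.post(HEC_URL, json=payload,
                  headers={"Authorization": f"Splunk {HEC_TOKEN}"}, verify=False)
r.raise_for_status()
print(r.json())   # expect {"text": "Success", "code": 0}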
metadata={"file": fp.name, **{k: evt.get(k) for k in ("transaction_id","step","status")}})) + meta_index.append(evt) + embeddings = OpenAIEmbeddings(model=EMB_MODEL) + vs = FAISS.from_documents(docs, embeddings) + return vs, meta_index + +# ---------- Handy utilities (tools) ---------- +def stats_tool_impl(query: str = "") -> str: + """ + Return quick stats from latest chunks. Query supports simple filters like: + 'status:rejected min_amount:10000 step:esito' + """ + import math + vs, meta_index = build_vectorstore() + # simple filter pass + min_amount, step, status = 0.0, None, None + m = re.search(r"min_amount:(\d+(\.\d+)?)", query); min_amount = float(m.group(1)) if m else 0.0 + m = re.search(r"step:(\w+)", query); step = m.group(1) if m else None + m = re.search(r"status:(\w+)", query); status = m.group(1) if m else None + + total = 0; rej = 0; amt_sum = 0.0; hi = 0.0; hi_tx = None + for evt in meta_index: + try: + amt = float(evt.get("importo", 0)) + except Exception: + amt = 0.0 + if amt < min_amount: continue + if step and evt.get("step") != step: continue + if status and evt.get("status") != status: continue + total += 1 + amt_sum += amt + if evt.get("status") == "rejected": rej += 1 + if amt > hi: hi, hi_tx = amt, evt.get("transaction_id") + rr = f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})" + return rr + +def retrieve_tool_impl(question: str) -> str: + """Semantic retrieve top-K log snippets related to the question.""" + vs, _ = build_vectorstore() + docs = vs.similarity_search(question, k=TOP_K) + lines = [f"[{i+1}] {d.page_content}" for i,d in enumerate(docs)] + return "\n".join(lines) + +def raw_sample_tool_impl(n: int = 5) -> str: + """Return n raw events (JSON) from the newest chunks.""" + files = _iter_chunk_files() + out = [] + for fp in files: + for rec in _read_jsonl(fp): + out.append(json.dumps(_normalize_event(rec), ensure_ascii=False)) + if len(out) >= n: break + if len(out) >= n: break + return "\n".join(out) + +# ---------- Build the agent ---------- +def build_agent(): + llm = ChatOpenAI(model=MODEL, temperature=0.2) + + tools = [ + Tool(name="get_stats", + func=stats_tool_impl, + description="Quick stats over recent events. Usage: pass a filter string like 'status:rejected min_amount:10000 step:esito'."), + Tool(name="retrieve_similar", + func=retrieve_tool_impl, + description="Semantic search over logs. Pass a natural-language question about bonifico logs."), + Tool(name="raw_samples", + func=raw_sample_tool_impl, + description="Return a few raw JSON events to inspect fields.") + ] + + system = """You are a payments log analyst. Use the tools to inspect recent Splunk-derived logs for 'bonifico' events. +- Prefer 'get_stats' for quick metrics (rejection rate, totals). +- Use 'retrieve_similar' to pull relevant examples before concluding. +- When asked for anomalies, treat as suspicious: rejected EUR transfers >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes. 
diff --git a/out/compose.yaml b/out/compose.yaml
new file mode 100644
index 0000000..01ad97f
--- /dev/null
+++ b/out/compose.yaml
@@ -0,0 +1,62 @@
+version: "3.9"
+
+services:
+  splunk:
+    image: splunk/splunk:9.4.2
+    container_name: splunk
+    restart: unless-stopped
+    ports:
+      - "8000:8000"   # Splunk Web
+      - "8088:8088"   # HEC
+      - "8089:8089"   # Management API
+    environment:
+      SPLUNK_START_ARGS: --accept-license
+      SPLUNK_PASSWORD: ${SPLUNK_PASSWORD}
+      SPLUNK_HEC_TOKEN: ${SPLUNK_HEC_TOKEN}
+    volumes:
+      - splunk-etc:/opt/splunk/etc
+      - splunk-var:/opt/splunk/var
+    healthcheck:
+      test: ["CMD-SHELL", "curl -sk https://localhost:8089/services/server/info | grep -q version"]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+
+  poller:
+    build:
+      context: ./poller
+    container_name: splunk-poller
+    restart: unless-stopped
+    depends_on:
+      splunk:
+        condition: service_healthy
+    environment:
+      # --- Splunk connection ---
+      SPLUNK_HOST: splunk
+      SPLUNK_PORT: "8089"
+      SPLUNK_USER: admin
+      SPLUNK_PW: ${SPLUNK_PASSWORD}
+      SPLUNK_VERIFY_SSL: "false"   # self-signed in container
+      # --- What to read ---
+      SPLUNK_INDEX: intesa_payments
+      SPLUNK_SOURCETYPE: intesa:bonifico
+      INITIAL_LOOKBACK: -24h@h
+      # --- Polling / chunking ---
+      SLEEP_SECONDS: "60"
+      MAX_CHUNK_BYTES: "1800000"
+      CREATE_INDEX_IF_MISSING: "true"
+      # --- Sink selection: file | blob | blob+sb ---
+      SINK: file
+      OUTDIR: /app/out
+      # --- Azure (only if using blob / blob+sb) ---
+      AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-}
+      AZURE_STORAGE_CONTAINER: bank-logs
+      AZURE_SERVICEBUS_CONNECTION_STRING: ${AZURE_SERVICEBUS_CONNECTION_STRING:-}
+      AZURE_SERVICEBUS_QUEUE: log-chunks
+      AZURE_COMPRESS: "true"
+    volumes:
+      - ./out:/app/out
+
+volumes:
+  splunk-etc:
+  splunk-var:
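With SINK=file the poller writes chunk_*.jsonl files into the ./out bind mount on the host, which is exactly what agent_runner.py and offline_analyzer.py read. A stdlib-only readiness sketch that waits for the first chunk before starting any analysis; the 10-minute timeout is an arbitrary choice:

# Wait for the poller's file sink to produce its first chunk (sketch).
import glob, sys, time

deadline = time.time() + 600          # arbitrary 10-minute timeout
while time.time() < deadline:
    chunks = sorted(glob.glob("./out/chunk_*.jsonl*"))
    if chunks:
        print(f"poller is writing: {chunks[-1]}")
        sys.exit(0)
    time.sleep(5)
sys.exit("no chunk files appeared in ./out")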
diff --git a/out/offline_analyzer.py b/out/offline_analyzer.py
new file mode 100644
index 0000000..4eb810b
--- /dev/null
+++ b/out/offline_analyzer.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python3
+import os, glob, json, gzip, time, pathlib, math, statistics as stats
+from datetime import datetime, timezone
+
+CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
+REPORT_DIR = pathlib.Path(os.getenv("REPORT_DIR", "./reports"))
+REPORT_DIR.mkdir(parents=True, exist_ok=True)
+
+def _iter_files():
+    paths = sorted(glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*"))
+    for p in paths:
+        yield pathlib.Path(p)
+
+def _read_jsonl(p: pathlib.Path):
+    data = p.read_bytes()
+    if p.suffix == ".gz":
+        data = gzip.decompress(data)
+    for line in data.splitlines():
+        if not line.strip(): continue
+        try:
+            rec = json.loads(line)
+            yield rec.get("event", rec)  # accept HEC shape or plain
+        except Exception:
+            continue
+
+def _to_float(x, default=0.0):
+    try:
+        if isinstance(x, (int, float)): return float(x)
+        if isinstance(x, str): return float(x.strip().replace(",", ""))
+    except Exception:
+        pass
+    return default
+
+def _boolish(x):
+    if isinstance(x, bool): return x
+    if isinstance(x, str): return x.lower() in {"true","1","yes"}
+    return False
+
+def analyze(events):
+    total = 0
+    by_status = {}
+    by_step = {}
+    total_amt = 0.0
+    amounts = []
+    inst_count = 0
+    latencies = {"compila": [], "conferma": [], "esito": []}
+    rejections = []
+    vop_flags = []
+    by_minute = {}
+
+    anomalies = []  # collected dicts
+
+    for e in events:
+        total += 1
+        st = str(e.get("status","")).lower() or "unknown"
+        step = str(e.get("step","")).lower() or "unknown"
+        by_status[st] = by_status.get(st,0)+1
+        by_step[step] = by_step.get(step,0)+1
+
+        amt = _to_float(e.get("importo"))
+        total_amt += amt
+        amounts.append(amt)
+
+        if _boolish(e.get("istantaneo")): inst_count += 1
+
+        lat = _to_float(e.get("latency_ms"), default=None)
+        if lat is not None and step in latencies: latencies[step].append(lat)
+
+        # time bucket (minute)
+        ts = e.get("data_pagamento") or e.get("_time")
+        if isinstance(ts, str) and len(ts)>=16:
+            key = ts[:16]  # 'YYYY-MM-DDTHH:MM'
+            by_minute[key] = by_minute.get(key, 0) + 1
+
+        # collect rejection info
+        if st == "rejected":
+            rejections.append(e)
+
+        # vop flags
+        vop = (e.get("vop_check") or "").lower()
+        if vop in {"no_match","close_match"}:
+            vop_flags.append({"transaction_id": e.get("transaction_id"),
+                              "vop_check": vop, "vop_score": e.get("vop_score"),
+                              "importo": amt, "divisa": e.get("divisa")})
+
+        # --- anomaly rules ---
+        # A1: rejected EUR >= 10k
+        if st=="rejected" and (e.get("divisa")=="EUR") and amt >= 10000:
+            anomalies.append({"rule":"A1_rejected_high_value_eur",
+                              "transaction_id": e.get("transaction_id"),
+                              "amount": amt,
+                              "divisa": e.get("divisa"),
+                              "iban_dest_masked": e.get("iban_dest_masked"),
+                              "causale": e.get("causale")})
+
+        # A2: VOP no_match or low score & amount >= 5k
+        vop_score = _to_float(e.get("vop_score"), default=None)
+        if (vop=="no_match") or (vop=="close_match" and vop_score is not None and vop_score < 0.75 and amt>=5000):
+            anomalies.append({"rule":"A2_vop_flagged",
+                              "transaction_id": e.get("transaction_id"),
+                              "vop_check": vop,
+                              "vop_score": vop_score,
+                              "amount": amt})
+
+        # A3: high latency per step
+        thr = {"compila":600, "conferma":800, "esito":900}.get(step, 900)
+        if lat is not None and lat > thr:
+            anomalies.append({"rule":"A3_high_latency",
+                              "transaction_id": e.get("transaction_id"),
+                              "step": step, "latency_ms": lat})
+
+        # A4: instant transfer but pending/rejected
+        if _boolish(e.get("istantaneo")) and st in {"pending","rejected"} and step=="esito":
+            anomalies.append({"rule":"A4_instant_not_accepted",
+                              "transaction_id": e.get("transaction_id"),
+                              "status": st, "amount": amt})
+
+    # spike detection (very simple): minute counts > mean+3*std
+    if by_minute:
+        counts = list(by_minute.values())
+        mu = stats.mean(counts)
+        sd = stats.pstdev(counts) if len(counts)>1 else 0.0
+        for minute, c in by_minute.items():
+            if sd>0 and c > mu + 3*sd:
+                anomalies.append({"rule":"A5_volume_spike", "minute": minute, "count": c, "mu": round(mu,2), "sd": round(sd,2)})
+
+    summary = {
+        "events": total,
+        "accepted": by_status.get("accepted",0),
+        "pending": by_status.get("pending",0),
+        "rejected": by_status.get("rejected",0),
+        "rejection_rate": round(by_status.get("rejected",0)/max(total,1), 4),
+        "total_amount": round(total_amt,2),
+        "avg_amount": round((sum(amounts)/len(amounts)) if amounts else 0.0, 2),
+        "instant_share": round(inst_count/max(total,1), 4),
+        "by_step": by_step,
+        "latency_avg_ms": {k:(round(sum(v)/len(v),1) if v else None) for k,v in latencies.items()},
+        "vop_flags": len(vop_flags),
+        "spike_minutes": len([a for a in anomalies if a["rule"]=="A5_volume_spike"]),
+        "anomaly_count": len(anomalies),
+    }
+    return summary, anomalies
+
+def load_all_events():
+    files = list(_iter_files())
+    if not files:
+        raise SystemExit(f"No chunk files in {CHUNK_DIR}.")
+    events = []
+    for p in files:
+        events.extend(_read_jsonl(p))
+    return events
+
+def write_reports(summary, anomalies):
+    ts = int(time.time())
+    md_path = REPORT_DIR / f"report_{ts}.md"
+    js_path = REPORT_DIR / f"anomalies_{ts}.json"
+    # Markdown
+    md = []
+    md.append(f"# Bonifico Log Analysis — {datetime.now(timezone.utc).isoformat()}")
+    md.append("")
+    md.append("## Summary")
+    for k,v in summary.items():
+        if isinstance(v, dict):
+            md.append(f"- **{k}**: `{json.dumps(v, ensure_ascii=False)}`")
+        else:
+            md.append(f"- **{k}**: `{v}`")
+    md.append("")
+    md.append("## Anomalies")
+    if not anomalies:
+        md.append("_None detected by rules._")
+    else:
+        for a in anomalies[:200]:
+            md.append(f"- `{a['rule']}` — `{json.dumps(a, ensure_ascii=False)}`")
+        if len(anomalies) > 200:
+            md.append(f"... and {len(anomalies)-200} more.")
+    md_path.write_text("\n".join(md), encoding="utf-8")
+    # JSON
+    js_path.write_text(json.dumps({"summary":summary,"anomalies":anomalies}, ensure_ascii=False, indent=2), encoding="utf-8")
+    print(f"Wrote {md_path}")
+    print(f"Wrote {js_path}")
+
+if __name__ == "__main__":
+    evts = load_all_events()
+    summary, anomalies = analyze(evts)
+    write_reports(summary, anomalies)
+    # also print a short console digest
+    print("\nDigest:", json.dumps({
+        "events": summary["events"],
+        "rejection_rate": summary["rejection_rate"],
+        "anomaly_count": summary["anomaly_count"]
+    }))
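offline_analyzer.py is meant to be run as a script, but the same functions can be driven from a REPL or notebook. A sketch, assuming chunk files already sit in ./out; note that CHUNK_DIR and REPORT_DIR are read at import time, so set them before importing:

# Drive the analyzer as a module instead of a script (sketch).
import os
os.environ["CHUNK_DIR"] = "./out"       # where the poller drops chunk_*.jsonl
os.environ["REPORT_DIR"] = "./reports"  # where report_<ts>.md / anomalies_<ts>.json land

import offline_analyzer as oa

events = oa.load_all_events()
summary, anomalies = oa.analyze(events)
oa.write_reports(summary, anomalies)
print(summary["rejection_rate"], summary["anomaly_count"])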
diff --git a/out/sampleLogs.txt b/out/sampleLogs.txt
new file mode 100644
index 0000000..12d5678
--- /dev/null
+++ b/out/sampleLogs.txt
@@ -0,0 +1,65 @@
+# CLI preset parameters
+HEC_URL="https://localhost:8088/services/collector/event"
+HEC_TOKEN="dev-0123456789abcdef"
+INDEX="intesa_payments"
+SOURCETYPE="intesa:bonifico"
+
+# CLI script for generating logs
+gen_iban(){ local d=""; for _ in $(seq 1 25); do d="${d}$((RANDOM%10))"; done; echo "IT${d}"; }
+mask_iban(){ local i="$1"; local pre="${i:0:6}"; local suf="${i: -4}"; local n=$(( ${#i}-10 )); printf "%s%0.s*" "$pre" $(seq 1 $n); echo -n "$suf"; }
+rand_amount(){ awk 'BEGIN{srand(); printf "%.2f", 5+rand()*14995}'; }
+rand_bool_str(){ if ((RANDOM%2)); then echo "true"; else echo "false"; fi; }
+pick(){ local a=("$@"); echo "${a[$RANDOM%${#a[@]}]}"; }
+
+spese=(SHA OUR BEN)
+divise=(EUR EUR EUR EUR USD GBP)
+statuses=(accepted pending rejected)
+
+for tx in {1..20}; do
+  txid=$(cat /proc/sys/kernel/random/uuid 2>/dev/null || uuidgen 2>/dev/null || openssl rand -hex 16)
+  t0=$(date -u +%s); t1=$((t0+1)); t2=$((t1+2))
+  iso0=$(date -u -d @$t0 +%FT%T.%6NZ)
+  iso1=$(date -u -d @$t1 +%FT%T.%6NZ)
+  iso2=$(date -u -d @$t2 +%FT%T.%6NZ)
+
+  src=$(gen_iban); dst=$(gen_iban)
+  srcm=$(mask_iban "$src"); dstm=$(mask_iban "$dst")
+  amt=$(rand_amount)
+  dv=$(pick "${divise[@]}")
+  inst=$(rand_bool_str)
+  sp=$(pick "${spese[@]}")
+  final=$(pick "${statuses[@]}")
+
+  send() {
+    local when="$1" iso="$2" step="$3" status="$4"
+    curl -sk "$HEC_URL" \
+      -H "Authorization: Splunk $HEC_TOKEN" -H "Content-Type: application/json" \
+      -d @- <