#!/usr/bin/env python3
# intesa_splunk/analyzer/offline_analyzer.py
import os, glob, json, gzip, time, pathlib, statistics as stats
from datetime import datetime, timezone
CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
REPORT_DIR = pathlib.Path(os.getenv("REPORT_DIR", "./reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)
def _iter_files():
    """Yield chunk files (plain or gzipped JSONL) from CHUNK_DIR, sorted by name."""
    paths = sorted(glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*"))
    for p in paths:
        yield pathlib.Path(p)
def _read_jsonl(p: pathlib.Path):
    """Yield event dicts from a JSONL file, transparently handling .gz compression."""
    data = p.read_bytes()
    if p.suffix == ".gz":
        data = gzip.decompress(data)
    for line in data.splitlines():
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
            yield rec.get("event", rec)  # accept HEC envelope or plain event
        except Exception:
            continue
def _to_float(x, default=0.0):
    """Best-effort float conversion; returns `default` when parsing fails."""
    try:
        if isinstance(x, (int, float)):
            return float(x)
        if isinstance(x, str):
            return float(x.strip().replace(",", ""))
    except Exception:
        pass
    return default
def _boolish(x):
    """Interpret common truthy string representations ("true", "1", "yes") as booleans."""
    if isinstance(x, bool):
        return x
    if isinstance(x, str):
        return x.lower() in {"true", "1", "yes"}
    return False
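# A quick illustration of how the parsing helpers behave; sample values are
# illustrative only, not taken from real chunk data:
#   >>> _to_float("1,250.00")
#   1250.0
#   >>> _to_float("n/a", default=None) is None
#   True
#   >>> _boolish("Yes")
#   True
#   >>> _boolish(0)
#   False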
def analyze(events):
    """Aggregate per-status/step counters and latencies, then apply anomaly rules A1-A5."""
    total = 0
    by_status = {}
    by_step = {}
    total_amt = 0.0
    amounts = []
    inst_count = 0
    latencies = {"compila": [], "conferma": [], "esito": []}
    rejections = []
    vop_flags = []
    by_minute = {}
    anomalies = []  # collected dicts

    for e in events:
        total += 1
        st = str(e.get("status", "")).lower() or "unknown"
        step = str(e.get("step", "")).lower() or "unknown"
        by_status[st] = by_status.get(st, 0) + 1
        by_step[step] = by_step.get(step, 0) + 1
        amt = _to_float(e.get("importo"))
        total_amt += amt
        amounts.append(amt)
        if _boolish(e.get("istantaneo")):
            inst_count += 1
        lat = _to_float(e.get("latency_ms"), default=None)
        if lat is not None and step in latencies:
            latencies[step].append(lat)
        # time bucket (minute)
        ts = e.get("data_pagamento") or e.get("_time")
        if isinstance(ts, str) and len(ts) >= 16:
            key = ts[:16]  # 'YYYY-MM-DDTHH:MM'
            by_minute[key] = by_minute.get(key, 0) + 1
        # collect rejection info (not used further in the summary itself)
        if st == "rejected":
            rejections.append(e)
        # vop flags
        vop = (e.get("vop_check") or "").lower()
        if vop in {"no_match", "close_match"}:
            vop_flags.append({"transaction_id": e.get("transaction_id"),
                              "vop_check": vop, "vop_score": e.get("vop_score"),
                              "importo": amt, "divisa": e.get("divisa")})

        # --- anomaly rules ---
        # A1: rejected EUR transfer of 10k or more
        if st == "rejected" and e.get("divisa") == "EUR" and amt >= 10000:
            anomalies.append({"rule": "A1_rejected_high_value_eur",
                              "transaction_id": e.get("transaction_id"),
                              "amount": amt,
                              "divisa": e.get("divisa"),
                              "iban_dest_masked": e.get("iban_dest_masked"),
                              "causale": e.get("causale")})
        # A2: VOP no_match, or close_match with low score on an amount >= 5k
        vop_score = _to_float(e.get("vop_score"), default=None)
        if vop == "no_match" or (vop == "close_match" and vop_score is not None
                                 and vop_score < 0.75 and amt >= 5000):
            anomalies.append({"rule": "A2_vop_flagged",
                              "transaction_id": e.get("transaction_id"),
                              "vop_check": vop,
                              "vop_score": vop_score,
                              "amount": amt})
        # A3: high latency per step
        thr = {"compila": 600, "conferma": 800, "esito": 900}.get(step, 900)
        if lat is not None and lat > thr:
            anomalies.append({"rule": "A3_high_latency",
                              "transaction_id": e.get("transaction_id"),
                              "step": step, "latency_ms": lat})
        # A4: instant transfer whose outcome is still pending or was rejected
        if _boolish(e.get("istantaneo")) and st in {"pending", "rejected"} and step == "esito":
            anomalies.append({"rule": "A4_instant_not_accepted",
                              "transaction_id": e.get("transaction_id"),
                              "status": st, "amount": amt})

    # A5: spike detection (very simple): flag minutes whose count exceeds mean + 3*std
    if by_minute:
        counts = list(by_minute.values())
        mu = stats.mean(counts)
        sd = stats.pstdev(counts) if len(counts) > 1 else 0.0
        for minute, c in by_minute.items():
            if sd > 0 and c > mu + 3 * sd:
                anomalies.append({"rule": "A5_volume_spike", "minute": minute, "count": c,
                                  "mu": round(mu, 2), "sd": round(sd, 2)})

    summary = {
        "events": total,
        "accepted": by_status.get("accepted", 0),
        "pending": by_status.get("pending", 0),
        "rejected": by_status.get("rejected", 0),
        "rejection_rate": round(by_status.get("rejected", 0) / max(total, 1), 4),
        "total_amount": round(total_amt, 2),
        "avg_amount": round((sum(amounts) / len(amounts)) if amounts else 0.0, 2),
        "instant_share": round(inst_count / max(total, 1), 4),
        "by_step": by_step,
        "latency_avg_ms": {k: (round(sum(v) / len(v), 1) if v else None) for k, v in latencies.items()},
        "vop_flags": len(vop_flags),
        "spike_minutes": len([a for a in anomalies if a["rule"] == "A5_volume_spike"]),
        "anomaly_count": len(anomalies),
    }
    return summary, anomalies
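# Sketch of the event shape the rules above expect. Field names are taken from the
# accesses in analyze(); the concrete values below are invented for illustration only:
#
# {
#   "transaction_id": "TX-0001",
#   "status": "rejected",                     # e.g. accepted | pending | rejected
#   "step": "esito",                          # compila | conferma | esito
#   "importo": "12,500.00",                   # amount, parsed with _to_float
#   "divisa": "EUR",
#   "istantaneo": "true",                     # parsed with _boolish
#   "latency_ms": 640,
#   "vop_check": "close_match",               # e.g. match | close_match | no_match
#   "vop_score": 0.62,
#   "data_pagamento": "2024-05-17T10:42:13",  # used for per-minute bucketing
#   "iban_dest_masked": "IT60X0542811101***",
#   "causale": "fattura 42"
# }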
def load_all_events():
    """Load and concatenate events from every chunk file; exit if none are found."""
    files = list(_iter_files())
    if not files:
        raise SystemExit(f"No chunk files in {CHUNK_DIR}.")
    events = []
    for p in files:
        events.extend(_read_jsonl(p))
    return events
def write_reports(summary, anomalies):
    """Write a Markdown digest and a JSON dump of summary + anomalies into REPORT_DIR."""
    ts = int(time.time())
    md_path = REPORT_DIR / f"report_{ts}.md"
    js_path = REPORT_DIR / f"anomalies_{ts}.json"
    # Markdown
    md = []
    md.append(f"# Bonifico Log Analysis — {datetime.now(timezone.utc).isoformat()}")
    md.append("")
    md.append("## Summary")
    for k, v in summary.items():
        if isinstance(v, dict):
            md.append(f"- **{k}**: `{json.dumps(v, ensure_ascii=False)}`")
        else:
            md.append(f"- **{k}**: `{v}`")
    md.append("")
    md.append("## Anomalies")
    if not anomalies:
        md.append("_None detected by rules._")
    else:
        for a in anomalies[:200]:
            md.append(f"- `{a['rule']}` — `{json.dumps(a, ensure_ascii=False)}`")
        if len(anomalies) > 200:
            md.append(f"... and {len(anomalies) - 200} more.")
    md_path.write_text("\n".join(md), encoding="utf-8")
    # JSON
    js_path.write_text(json.dumps({"summary": summary, "anomalies": anomalies},
                                  ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Wrote {md_path}")
    print(f"Wrote {js_path}")
if __name__ == "__main__":
    evts = load_all_events()
    summary, anomalies = analyze(evts)
    write_reports(summary, anomalies)
    # also print a short console digest
    print("\nDigest:", json.dumps({
        "events": summary["events"],
        "rejection_rate": summary["rejection_rate"],
        "anomaly_count": summary["anomaly_count"]
    }))
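# Example invocation (a sketch; the paths are placeholders, adjust to your layout):
#   CHUNK_DIR=./out REPORT_DIR=./reports python offline_analyzer.py
# This reads ./out/chunk_*.jsonl[.gz], writes reports/report_<ts>.md and
# reports/anomalies_<ts>.json, and prints a one-line digest to stdout.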