#!/usr/bin/env python3
"""Analyze bonifico (wire transfer) JSONL chunk files and write anomaly reports."""
import glob
import gzip
import json
import os
import pathlib
import statistics as stats
import time
from datetime import datetime, timezone

CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
REPORT_DIR = pathlib.Path(os.getenv("REPORT_DIR", "./reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)


def _iter_files():
    """Yield chunk files (plain .jsonl or gzipped .jsonl.gz) in sorted order."""
    paths = sorted(glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*"))
    for p in paths:
        yield pathlib.Path(p)


def _read_jsonl(p: pathlib.Path):
    """Yield one event dict per line, decompressing .gz files transparently."""
    data = p.read_bytes()
    if p.suffix == ".gz":
        data = gzip.decompress(data)
    for line in data.splitlines():
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
            yield rec.get("event", rec)  # accept HEC shape or plain
        except Exception:
            continue  # skip malformed lines rather than abort the run


def _to_float(x, default=0.0):
    """Best-effort float conversion; strips thousands separators from strings."""
    try:
        if isinstance(x, (int, float)):
            return float(x)
        if isinstance(x, str):
            return float(x.strip().replace(",", ""))
    except Exception:
        pass
    return default


def _boolish(x):
    """Interpret common truthy encodings ("true", "1", "yes") as True."""
    if isinstance(x, bool):
        return x
    if isinstance(x, str):
        return x.lower() in {"true", "1", "yes"}
    return False


def analyze(events):
    total = 0
    by_status = {}
    by_step = {}
    total_amt = 0.0
    amounts = []
    inst_count = 0
    latencies = {"compila": [], "conferma": [], "esito": []}
    vop_flags = []
    by_minute = {}
    anomalies = []  # collected dicts, one per rule hit

    for e in events:
        total += 1
        st = str(e.get("status", "")).lower() or "unknown"
        step = str(e.get("step", "")).lower() or "unknown"
        by_status[st] = by_status.get(st, 0) + 1
        by_step[step] = by_step.get(step, 0) + 1

        amt = _to_float(e.get("importo"))
        total_amt += amt
        amounts.append(amt)

        if _boolish(e.get("istantaneo")):
            inst_count += 1

        lat = _to_float(e.get("latency_ms"), default=None)
        if lat is not None and step in latencies:
            latencies[step].append(lat)

        # time bucket (minute granularity)
        ts = e.get("data_pagamento") or e.get("_time")
        if isinstance(ts, str) and len(ts) >= 16:
            key = ts[:16]  # 'YYYY-MM-DDTHH:MM'
            by_minute[key] = by_minute.get(key, 0) + 1

        # VOP flags
        vop = (e.get("vop_check") or "").lower()
        if vop in {"no_match", "close_match"}:
            vop_flags.append({
                "transaction_id": e.get("transaction_id"),
                "vop_check": vop,
                "vop_score": e.get("vop_score"),
                "importo": amt,
                "divisa": e.get("divisa"),
            })

        # --- anomaly rules ---
        # A1: rejected EUR transfer of 10k or more
        if st == "rejected" and e.get("divisa") == "EUR" and amt >= 10000:
            anomalies.append({
                "rule": "A1_rejected_high_value_eur",
                "transaction_id": e.get("transaction_id"),
                "amount": amt,
                "divisa": e.get("divisa"),
                "iban_dest_masked": e.get("iban_dest_masked"),
                "causale": e.get("causale"),
            })

        # A2: VOP no_match, or close_match with a low score on an amount >= 5k
        vop_score = _to_float(e.get("vop_score"), default=None)
        if vop == "no_match" or (
            vop == "close_match" and vop_score is not None and vop_score < 0.75 and amt >= 5000
        ):
            anomalies.append({
                "rule": "A2_vop_flagged",
                "transaction_id": e.get("transaction_id"),
                "vop_check": vop,
                "vop_score": vop_score,
                "amount": amt,
            })

        # A3: latency above the per-step threshold (ms)
        thr = {"compila": 600, "conferma": 800, "esito": 900}.get(step, 900)
        if lat is not None and lat > thr:
            anomalies.append({
                "rule": "A3_high_latency",
                "transaction_id": e.get("transaction_id"),
                "step": step,
                "latency_ms": lat,
            })

        # A4: instant transfer that ends pending/rejected at the esito step
        if _boolish(e.get("istantaneo")) and st in {"pending", "rejected"} and step == "esito":
            anomalies.append({
                "rule": "A4_instant_not_accepted",
                "transaction_id": e.get("transaction_id"),
                "status": st,
                "amount": amt,
            })

    # A5: spike detection (very simple): flag minutes whose event count
    # exceeds mean + 3 * population std dev of the per-minute counts
    if by_minute:
        counts = list(by_minute.values())
        mu = stats.mean(counts)
        sd = stats.pstdev(counts) if len(counts) > 1 else 0.0
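        # Worked example (illustrative numbers, not from real data): with 59
        # minutes at 10 events and one minute at 100, mu ~= 11.5 and
        # sd ~= 11.5, so the threshold mu + 3*sd ~= 46 and the 100-event
        # minute is flagged. Caveat: pstdev includes the outlier itself, so
        # with only a handful of minutes a single spike can inflate sd enough
        # to escape the 3-sigma cut; lower the multiplier if spikes slip by.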
        for minute, c in by_minute.items():
            if sd > 0 and c > mu + 3 * sd:
                anomalies.append({
                    "rule": "A5_volume_spike",
                    "minute": minute,
                    "count": c,
                    "mu": round(mu, 2),
                    "sd": round(sd, 2),
                })

    summary = {
        "events": total,
        "accepted": by_status.get("accepted", 0),
        "pending": by_status.get("pending", 0),
        "rejected": by_status.get("rejected", 0),
        "rejection_rate": round(by_status.get("rejected", 0) / max(total, 1), 4),
        "total_amount": round(total_amt, 2),
        "avg_amount": round((sum(amounts) / len(amounts)) if amounts else 0.0, 2),
        "instant_share": round(inst_count / max(total, 1), 4),
        "by_step": by_step,
        "latency_avg_ms": {k: (round(sum(v) / len(v), 1) if v else None) for k, v in latencies.items()},
        "vop_flags": len(vop_flags),
        "spike_minutes": len([a for a in anomalies if a["rule"] == "A5_volume_spike"]),
        "anomaly_count": len(anomalies),
    }
    return summary, anomalies


def load_all_events():
    files = list(_iter_files())
    if not files:
        raise SystemExit(f"No chunk files in {CHUNK_DIR}.")
    events = []
    for p in files:
        events.extend(_read_jsonl(p))
    return events


def write_reports(summary, anomalies):
    ts = int(time.time())
    md_path = REPORT_DIR / f"report_{ts}.md"
    js_path = REPORT_DIR / f"anomalies_{ts}.json"

    # Markdown report
    md = []
    md.append(f"# Bonifico Log Analysis — {datetime.now(timezone.utc).isoformat()}")
    md.append("")
    md.append("## Summary")
    for k, v in summary.items():
        if isinstance(v, dict):
            md.append(f"- **{k}**: `{json.dumps(v, ensure_ascii=False)}`")
        else:
            md.append(f"- **{k}**: `{v}`")
    md.append("")
    md.append("## Anomalies")
    if not anomalies:
        md.append("_None detected by rules._")
    else:
        for a in anomalies[:200]:
            md.append(f"- `{a['rule']}` — `{json.dumps(a, ensure_ascii=False)}`")
        if len(anomalies) > 200:
            md.append(f"... and {len(anomalies) - 200} more.")
    md_path.write_text("\n".join(md), encoding="utf-8")

    # JSON report
    js_path.write_text(
        json.dumps({"summary": summary, "anomalies": anomalies}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"Wrote {md_path}")
    print(f"Wrote {js_path}")


if __name__ == "__main__":
    evts = load_all_events()
    summary, anomalies = analyze(evts)
    write_reports(summary, anomalies)
    # also print a short console digest
    print("\nDigest:", json.dumps({
        "events": summary["events"],
        "rejection_rate": summary["rejection_rate"],
        "anomaly_count": summary["anomaly_count"],
    }))
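# ---------------------------------------------------------------------------
# Illustrative input record (a sketch, not real data): the analyzer expects
# one JSON object per line, either plain or wrapped in an HEC-style
# {"event": {...}} envelope. The field names are the ones the rules above
# read; all values below are invented for demonstration.
#
#   {"transaction_id": "TX-0001", "status": "rejected", "step": "esito",
#    "importo": "12,500.00", "divisa": "EUR", "istantaneo": "true",
#    "latency_ms": 950, "data_pagamento": "2024-05-01T10:15:00Z",
#    "vop_check": "close_match", "vop_score": 0.61,
#    "iban_dest_masked": "IT60X0542811*********", "causale": "Fattura 42"}
#
# A record like this would trip A1 (rejected EUR >= 10k), A2 (close_match
# with score < 0.75 on an amount >= 5k), A3 (esito latency above 900 ms),
# and A4 (instant transfer not accepted at esito).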