#!/usr/bin/env python3
"""Offline analyzer for bonifico (bank transfer) event logs.

Reads chunked JSONL files (optionally gzipped) from CHUNK_DIR, computes summary
statistics, applies simple anomaly rules, and writes Markdown and JSON reports
to REPORT_DIR.
"""

import os, glob, json, gzip, time, pathlib, statistics as stats
from datetime import datetime, timezone

CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
REPORT_DIR = pathlib.Path(os.getenv("REPORT_DIR", "./reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)

def _iter_files():
    """Yield chunk files (chunk_*.jsonl or chunk_*.jsonl.gz) in sorted order."""
    paths = sorted(glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*"))
    for p in paths:
        yield pathlib.Path(p)

def _read_jsonl(p: pathlib.Path):
    """Yield one parsed record per JSONL line, transparently decompressing .gz files."""
    data = p.read_bytes()
    if p.suffix == ".gz":
        data = gzip.decompress(data)
    for line in data.splitlines():
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
        except Exception:
            continue  # tolerate malformed lines instead of aborting the run
        if isinstance(rec, dict):
            yield rec.get("event", rec)  # accept HEC envelope or plain record
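
# Both of the following line shapes yield the same event dict: the HEC-style
# envelope is unwrapped via rec.get("event", rec), and a plain record passes
# through unchanged. Field values here are invented for illustration:
#
#   {"time": 1718000000, "event": {"transaction_id": "TX1", "status": "accepted"}}
#   {"transaction_id": "TX1", "status": "accepted"}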

def _to_float(x, default=0.0):
    """Best-effort numeric coercion; strips thousands separators from strings."""
    try:
        if isinstance(x, (int, float)):
            return float(x)
        if isinstance(x, str):
            return float(x.strip().replace(",", ""))
    except Exception:
        pass
    return default
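
# A few examples of the coercion above (this assumes "," is only ever a
# thousands separator, as the replace() implies; "1.234,56"-style locales
# would need different handling):
#
#   _to_float(42)          -> 42.0
#   _to_float("10,500")    -> 10500.0
#   _to_float("n/a")       -> 0.0   (falls back to the default)
#   _to_float(None, None)  -> None  (used below to mean "field absent")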

def _boolish(x):
    """Interpret real bools plus common truthy strings ("true", "1", "yes")."""
    if isinstance(x, bool):
        return x
    if isinstance(x, str):
        return x.lower() in {"true", "1", "yes"}
    return False

def analyze(events):
    """Aggregate counters over the event stream and apply anomaly rules A1-A5."""
    total = 0
    by_status = {}
    by_step = {}
    total_amt = 0.0
    amounts = []
    inst_count = 0
    latencies = {"compila": [], "conferma": [], "esito": []}  # per-step samples
    rejections = []  # collected for drill-down; not yet surfaced in the report
    vop_flags = []
    by_minute = {}  # 'YYYY-MM-DDTHH:MM' bucket -> event count, for spike detection

    anomalies = []  # collected rule-hit dicts

    for e in events:
        total += 1
        st = str(e.get("status", "")).lower() or "unknown"
        step = str(e.get("step", "")).lower() or "unknown"
        by_status[st] = by_status.get(st, 0) + 1
        by_step[step] = by_step.get(step, 0) + 1

        amt = _to_float(e.get("importo"))
        total_amt += amt
        amounts.append(amt)

        if _boolish(e.get("istantaneo")):
            inst_count += 1

        lat = _to_float(e.get("latency_ms"), default=None)
        if lat is not None and step in latencies:
            latencies[step].append(lat)

        # time bucket (minute granularity)
        ts = e.get("data_pagamento") or e.get("_time")
        if isinstance(ts, str) and len(ts) >= 16:
            key = ts[:16]  # 'YYYY-MM-DDTHH:MM'
            by_minute[key] = by_minute.get(key, 0) + 1

        # collect rejection info
        if st == "rejected":
            rejections.append(e)

        # VOP (verification-of-payee) flags
        vop = str(e.get("vop_check") or "").lower()
        if vop in {"no_match", "close_match"}:
            vop_flags.append({"transaction_id": e.get("transaction_id"),
                              "vop_check": vop, "vop_score": e.get("vop_score"),
                              "importo": amt, "divisa": e.get("divisa")})
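
        # For reference, a representative event touching every field read above
        # and in the rules below (all values invented for illustration):
        #
        #   {"transaction_id": "TX1", "status": "rejected", "step": "esito",
        #    "importo": "12,500", "divisa": "EUR", "istantaneo": "true",
        #    "latency_ms": 950, "data_pagamento": "2024-06-10T09:15:00",
        #    "vop_check": "close_match", "vop_score": 0.6,
        #    "iban_dest_masked": "IT60X0542811***", "causale": "Fattura 42"}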

        # --- anomaly rules ---
        # A1: rejected EUR transfer of 10k or more
        if st == "rejected" and e.get("divisa") == "EUR" and amt >= 10000:
            anomalies.append({"rule": "A1_rejected_high_value_eur",
                              "transaction_id": e.get("transaction_id"),
                              "amount": amt,
                              "divisa": e.get("divisa"),
                              "iban_dest_masked": e.get("iban_dest_masked"),
                              "causale": e.get("causale")})

        # A2: VOP no_match flags at any amount; close_match flags only with
        # score < 0.75 on amounts >= 5k
        vop_score = _to_float(e.get("vop_score"), default=None)
        if vop == "no_match" or (vop == "close_match" and vop_score is not None
                                 and vop_score < 0.75 and amt >= 5000):
            anomalies.append({"rule": "A2_vop_flagged",
                              "transaction_id": e.get("transaction_id"),
                              "vop_check": vop,
                              "vop_score": vop_score,
                              "amount": amt})
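
        # A2 precedence, spelled out with invented values:
        #   vop="no_match",    amt=100,  score=None -> flagged (no_match alone suffices)
        #   vop="close_match", score=0.60, amt=9000 -> flagged (low score AND amount >= 5k)
        #   vop="close_match", score=0.80, amt=9000 -> not flagged (score not below 0.75)
        #   vop="close_match", score=0.60, amt=1000 -> not flagged (amount below 5k)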

        # A3: latency above the per-step threshold (900 ms for unknown steps)
        thr = {"compila": 600, "conferma": 800, "esito": 900}.get(step, 900)
        if lat is not None and lat > thr:
            anomalies.append({"rule": "A3_high_latency",
                              "transaction_id": e.get("transaction_id"),
                              "step": step, "latency_ms": lat})

        # A4: instant transfer whose outcome step ("esito") is still pending or rejected
        if _boolish(e.get("istantaneo")) and st in {"pending", "rejected"} and step == "esito":
            anomalies.append({"rule": "A4_instant_not_accepted",
                              "transaction_id": e.get("transaction_id"),
                              "status": st, "amount": amt})

    # spike detection (deliberately simple): flag any minute whose event count
    # exceeds the mean by more than three population standard deviations
    if by_minute:
        counts = list(by_minute.values())
        mu = stats.mean(counts)
        sd = stats.pstdev(counts) if len(counts) > 1 else 0.0
        for minute, c in by_minute.items():
            if sd > 0 and c > mu + 3 * sd:
                anomalies.append({"rule": "A5_volume_spike", "minute": minute,
                                  "count": c, "mu": round(mu, 2), "sd": round(sd, 2)})
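
    # Worked example with invented counts: minute buckets [10, 12, 11, 9, 60]
    # give mu = 20.4 and sd = pstdev ~= 19.83, so the cutoff mu + 3*sd ~= 79.9
    # and even the 60-event minute is NOT flagged -- with few buckets a single
    # outlier inflates the deviation, a known limitation of this rule.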

    summary = {
        "events": total,
        "accepted": by_status.get("accepted", 0),
        "pending": by_status.get("pending", 0),
        "rejected": by_status.get("rejected", 0),
        "rejection_rate": round(by_status.get("rejected", 0) / max(total, 1), 4),
        "total_amount": round(total_amt, 2),
        "avg_amount": round((sum(amounts) / len(amounts)) if amounts else 0.0, 2),
        "instant_share": round(inst_count / max(total, 1), 4),
        "by_step": by_step,
        "latency_avg_ms": {k: (round(sum(v) / len(v), 1) if v else None) for k, v in latencies.items()},
        "vop_flags": len(vop_flags),
        "spike_minutes": len([a for a in anomalies if a["rule"] == "A5_volume_spike"]),
        "anomaly_count": len(anomalies),
    }
    return summary, anomalies

def load_all_events():
    """Read every chunk file into a single in-memory list of events."""
    files = list(_iter_files())
    if not files:
        raise SystemExit(f"No chunk files in {CHUNK_DIR}.")
    events = []
    for p in files:
        events.extend(_read_jsonl(p))
    return events

def write_reports(summary, anomalies):
    """Write a Markdown digest and a full JSON dump, timestamped to avoid clobbering."""
    ts = int(time.time())
    md_path = REPORT_DIR / f"report_{ts}.md"
    js_path = REPORT_DIR / f"anomalies_{ts}.json"

    # Markdown report
    md = []
    md.append(f"# Bonifico Log Analysis — {datetime.now(timezone.utc).isoformat()}")
    md.append("")
    md.append("## Summary")
    for k, v in summary.items():
        if isinstance(v, dict):
            md.append(f"- **{k}**: `{json.dumps(v, ensure_ascii=False)}`")
        else:
            md.append(f"- **{k}**: `{v}`")
    md.append("")
    md.append("## Anomalies")
    if not anomalies:
        md.append("_None detected by rules._")
    else:
        for a in anomalies[:200]:  # cap the Markdown listing at 200 entries
            md.append(f"- `{a['rule']}` — `{json.dumps(a, ensure_ascii=False)}`")
        if len(anomalies) > 200:
            md.append(f"... and {len(anomalies) - 200} more.")
    md_path.write_text("\n".join(md), encoding="utf-8")

    # JSON report (complete, no truncation)
    js_path.write_text(json.dumps({"summary": summary, "anomalies": anomalies},
                                  ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Wrote {md_path}")
    print(f"Wrote {js_path}")
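
# Typical invocation, as a sketch (the script name below is illustrative; both
# environment variables are optional and default to ./out and ./reports):
#
#   CHUNK_DIR=./out REPORT_DIR=./reports python3 analyze_chunks.py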

if __name__ == "__main__":
    evts = load_all_events()
    summary, anomalies = analyze(evts)
    write_reports(summary, anomalies)
    # also print a short console digest
    print("\nDigest:", json.dumps({
        "events": summary["events"],
        "rejection_rate": summary["rejection_rate"],
        "anomaly_count": summary["anomaly_count"],
    }))