# api/flask_app.py
import os, io, glob, gzip, json, time, uuid, shutil, tempfile, pathlib, datetime as dt
from typing import List, Dict, Any, Tuple
from urllib.parse import urlparse

from flask import Flask, request, jsonify
from dotenv import load_dotenv

# Load .env locally (App Service uses App Settings instead)
load_dotenv(os.getenv("ENV_FILE", ".env"))

# Optional email
from notify import send_email


# ---------------- Azure SDKs (guarded) ----------------
try:
    from azure.storage.blob import BlobServiceClient, BlobClient
except Exception:
    BlobServiceClient = None
    BlobClient = None


def _blob_client() -> "BlobServiceClient":
    if not BlobServiceClient:
        raise RuntimeError("azure-storage-blob not installed")
    cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if not cs:
        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
    return BlobServiceClient.from_connection_string(cs)


def _download_blob_bytes(container: str, blob_name: str) -> bytes:
    svc = _blob_client()
    blob = svc.get_blob_client(container=container, blob=blob_name)
    return blob.download_blob().readall()


def _download_sas_bytes(sas_url: str) -> bytes:
    if BlobClient:
        blob = BlobClient.from_blob_url(sas_url)
        return blob.download_blob().readall()
    else:
        # Fallback when azure-storage-blob is not installed: plain HTTPS GET on the SAS URL.
        import urllib.request
        return urllib.request.urlopen(sas_url, timeout=30).read()


# ---------------- Agent (guarded) ----------------
_CAN_LLM = True
try:
    import agent_runner  # we'll temporarily set agent_runner.CHUNK_DIR when a blob is provided
    build_agent = agent_runner.build_agent
except Exception:
    _CAN_LLM = False
    agent_runner = None
    build_agent = None

app = Flask(__name__)


# ---------------- Config ----------------
CHUNK_DIR = pathlib.Path(os.getenv("CHUNK_DIR", "/app/out"))
CHUNK_DIR.mkdir(parents=True, exist_ok=True)

AUTO_ANALYZE = os.getenv("AUTO_ANALYZE", "true").lower() in {"1", "true", "yes"}
DEFAULT_QUESTION = os.getenv(
    "DEFAULT_QUESTION",
    "Scan the latest chunks. List anomalies (rejected EUR >= 10000 and vop_no_match). "
    "Provide brief evidence and next steps."
)
EMAIL_SEND_BY_DEFAULT = os.getenv("MAIL_ENABLED", "false").lower() in {"1", "true", "yes"}

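# For reference, a minimal .env / App Settings sketch covering the variables this module
# reads (illustrative placeholder values only, not real endpoints or credentials):
#
#   CHUNK_DIR=/app/out
#   AUTO_ANALYZE=true
#   MAIL_ENABLED=false
#   AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...
#   AZURE_STORAGE_CONTAINER=bank-logs
#   AZURE_OPENAI_ENDPOINT=https://<resource>.openai.azure.com/
#   AZURE_OPENAI_API_KEY=<key>
#   EVENTGRID_ANALYZE_DEFAULT=true
#   EVENTGRID_EMAIL_DEFAULT=false
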
# ---------------- Helpers ----------------
def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Accept raw or HEC-shaped records (`{'event': {...}}`)."""
    return rec.get("event", rec)


def _write_chunk_from_lines(jsonl_lines: List[bytes], dest_dir: pathlib.Path) -> pathlib.Path:
    """Gzip the given JSONL lines into a new chunk_*.jsonl.gz under dest_dir and return its path."""
    name = f"chunk_{int(time.time())}_{uuid.uuid4().hex[:8]}.jsonl.gz"
    path = dest_dir / name
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for ln in jsonl_lines:
            gz.write(ln if ln.endswith(b"\n") else ln + b"\n")
    path.write_bytes(buf.getvalue())
    return path


def _bytes_is_gzip(b: bytes) -> bool:
    return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B


def _decode_utf8(b: bytes) -> str:
    # be permissive; some exports contain odd characters / BOM
    return b.decode("utf-8", errors="replace").lstrip("\ufeff")


def _normalize_bytes_to_jsonl_lines(data: bytes) -> Tuple[List[bytes], int]:
    """
    Accepts: JSONL, JSONL.GZ, JSON array, or single JSON object.
    Returns: (list of JSONL lines as bytes, number_of_events)
    Raises: ValueError if the format is not recognized.
    """
    # decompress if gz
    if _bytes_is_gzip(data):
        try:
            data = gzip.decompress(data)
        except Exception:
            raise ValueError("Invalid gzip stream")

    text = _decode_utf8(data).strip()
    if not text:
        raise ValueError("Empty blob")

    # Try JSON first (object or array). Parse separately from handling so the
    # intentional ValueErrors below are not swallowed by the except.
    try:
        parsed = json.loads(text)
    except Exception:
        parsed = None  # not a single JSON document -> try JSONL below

    if isinstance(parsed, dict):
        ev = _normalize_event(parsed)
        return [json.dumps(ev, separators=(",", ":")).encode("utf-8")], 1
    if isinstance(parsed, list):
        lines = []
        count = 0
        for item in parsed:
            if isinstance(item, dict):
                ev = _normalize_event(item)
                lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
                count += 1
        if count == 0:
            raise ValueError("JSON array has no objects")
        return lines, count
    # If JSON parsed but is not a dict/list (or parsing failed), fall through to the JSONL attempt.

    # Try JSONL (one JSON object per line)
    lines = []
    count = 0
    for raw in text.splitlines():
        s = raw.strip()
        if not s:
            continue
        try:
            obj = json.loads(s)
            ev = _normalize_event(obj)
            lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
            count += 1
        except Exception:
            # Not valid JSON per line -> not JSONL
            raise ValueError("Unrecognized blob format. Expected JSONL, JSONL.GZ, JSON array, or single JSON object.")
    if count == 0:
        raise ValueError("No JSON objects found")
    return lines, count

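# A quick illustration of the inputs _normalize_bytes_to_jsonl_lines accepts (hypothetical
# sample payloads, shown only as a sketch):
#   _normalize_bytes_to_jsonl_lines(b'{"event": {"transaction_id": "T1"}}')                   # -> 1 event
#   _normalize_bytes_to_jsonl_lines(b'[{"transaction_id": "T1"}, {"transaction_id": "T2"}]')  # -> 2 events
#   _normalize_bytes_to_jsonl_lines(gzip.compress(b'{"transaction_id": "T1"}\n'))             # -> 1 event (JSONL.GZ)
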
def _iter_recent_events(limit_files: int = 10, limit_events: int = 5000) -> List[Dict[str, Any]]:
    """Load recent events from the newest chunk_*.jsonl(.gz) and hec_*.jsonl(.gz) files in CHUNK_DIR."""
    patterns = ["chunk_*.jsonl*", "hec_*.jsonl*"]
    files: List[pathlib.Path] = []
    for pat in patterns:
        files += [pathlib.Path(p) for p in glob.glob(str(CHUNK_DIR / pat))]
    files = sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)[:limit_files]

    out: List[Dict[str, Any]] = []
    for fp in files:
        data = fp.read_bytes()
        if fp.suffix == ".gz":
            data = gzip.decompress(data)
        for ln in data.splitlines():
            if not ln.strip():
                continue
            try:
                rec = json.loads(ln)
                out.append(_normalize_event(rec))
            except Exception:
                continue
            if len(out) >= limit_events:
                return out
    return out


def _as_float(x) -> float:
    try:
        return float(x)
    except Exception:
        return 0.0


def _rules_only_report() -> str:
    """Simple rule-based report used when Azure OpenAI credentials are not set."""
    evts = _iter_recent_events(limit_files=20, limit_events=20000)
    total = len(evts)
    rej_hi, vop_no = [], []

    for e in evts:
        status = (e.get("status") or "").lower()
        divisa = (e.get("divisa") or "").upper()
        amt = _as_float(e.get("importo"))
        vop = (e.get("vop_check") or "").lower()
        if status == "rejected" and divisa == "EUR" and amt >= 10000:
            rej_hi.append({"transaction_id": e.get("transaction_id"), "importo": amt, "step": e.get("step")})
        if vop in {"no_match", "vop_no_match"}:
            vop_no.append({"transaction_id": e.get("transaction_id"), "step": e.get("step")})

    lines = []
    lines.append("### Findings")
    lines.append(f"- Total events scanned: {total}")
    lines.append(f"- High-value rejected (EUR≥10000): {len(rej_hi)}")
    lines.append(f"- VOP no_match: {len(vop_no)}")
    if rej_hi:
        lines.append(" - Examples (rejected): " + ", ".join(
            f"{i.get('transaction_id')}({i.get('importo')})" for i in rej_hi[:5] if i.get('transaction_id')
        ))
    if vop_no:
        lines.append(" - Examples (vop_no_match): " + ", ".join(
            f"{i.get('transaction_id')}" for i in vop_no[:5] if i.get('transaction_id')
        ))
    lines.append("")
    lines.append("### Recommended actions")
    for r in (
        "Validate VOP mismatches; confirm beneficiary details.",
        "Investigate rejection reasons for all EUR≥10k transactions.",
        "Check spike trends by hour/day and counterparties.",
    ):
        lines.append(f"- {r}")
    return "\n".join(lines)


def _try_llm_report(question: str) -> str:
    if not _CAN_LLM or not build_agent:
        raise RuntimeError("LLM agent not available")
    agent = build_agent()
    out = agent.invoke({"input": question, "chat_history": []})
    return out.get("output", "") or "(no output from agent)"


def _run_analysis(question: str) -> Dict[str, Any]:
    t0 = time.time()
    used = "rules"
    try:
        if _CAN_LLM and os.getenv("AZURE_OPENAI_ENDPOINT") and (
            os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY")
        ):
            txt = _try_llm_report(question)
            used = "llm"
        else:
            txt = _rules_only_report()
    except Exception as e:
        txt = _rules_only_report()
        used = f"rules (fallback from llm error: {str(e)[:120]})"
    return {"ok": True, "analyzer": used, "duration_sec": round(time.time() - t0, 3), "result": txt}


# ---------------- Routes ----------------
@app.get("/health")
def health():
    return {"status": "ok", "auto_analyze": AUTO_ANALYZE, "chunk_dir": str(CHUNK_DIR)}, 200


@app.post("/ingest")
def ingest():
    """
    POST JSON:
      { "events": [ {..}, {..} ] }  OR a single object {..}
    Writes a chunk and (optionally) auto-analyzes.
    """
    body = request.get_json(force=True, silent=True)
    if body is None:
        return jsonify({"ok": False, "error": "invalid JSON"}), 400

    # normalize to list (support HEC-style {"event": {...}})
    events: List[Dict[str, Any]] = []
    if isinstance(body, dict) and "events" in body and isinstance(body["events"], list):
        events = [_normalize_event(e) for e in body["events"] if isinstance(e, dict)]
    elif isinstance(body, list):
        events = [_normalize_event(e) for e in body if isinstance(e, dict)]
    elif isinstance(body, dict):
        events = [_normalize_event(body)]
    else:
        return jsonify({"ok": False, "error": "payload must be an object or list"}), 400

    if not events:
        return jsonify({"ok": False, "error": "no events to ingest"}), 400

    lines = [json.dumps(e, separators=(",", ":")).encode("utf-8") for e in events]
    path = _write_chunk_from_lines(lines, CHUNK_DIR)

    report = None
    if AUTO_ANALYZE:
        report = _run_analysis(DEFAULT_QUESTION)
        if EMAIL_SEND_BY_DEFAULT:
            try:
                send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"])
            except Exception:
                pass

    return jsonify({
        "ok": True,
        "written": len(events),
        "chunk_file": path.name,
        "auto_analyzed": bool(AUTO_ANALYZE),
        "report": report,
    }), 200

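# Example call to /ingest (a sketch; assumes the API is reachable locally on port 8000 and
# uses made-up sample values for the fields the anomaly rules look at):
#   curl -X POST http://localhost:8000/ingest \
#        -H "Content-Type: application/json" \
#        -d '{"events": [{"transaction_id": "T1", "status": "rejected", "divisa": "EUR", "importo": 15000, "vop_check": "no_match"}]}'
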
@app.post("/ingest_blob")
def ingest_blob():
    """
    Accept a blob, normalize it to chunk_*.jsonl.gz in CHUNK_DIR, optionally analyze.

    Body:
      {
        "container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
        // OR
        "sas_url": "https://.../file.jsonl[.gz]?sig=...",
        "analyze": true,
        "email": {"send": true, "to": "x@x"}
      }
    """
    payload = request.get_json(force=True, silent=True) or {}
    analyze = bool(payload.get("analyze"))
    data: bytes = b""

    try:
        if payload.get("sas_url"):
            data = _download_sas_bytes(payload["sas_url"])
        elif payload.get("container") and payload.get("blob_name"):
            data = _download_blob_bytes(payload["container"], payload["blob_name"])
        else:
            return jsonify({"ok": False, "error": "Provide 'sas_url' OR ('container' + 'blob_name')"}), 400

        lines, count = _normalize_bytes_to_jsonl_lines(data)
        path = _write_chunk_from_lines(lines, CHUNK_DIR)

        report = None
        if analyze:
            report = _run_analysis(DEFAULT_QUESTION)
            email_cfg = payload.get("email") or {}
            if email_cfg.get("send"):
                try:
                    send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"], to_addr=email_cfg.get("to"))
                except Exception:
                    pass

        return jsonify({"ok": True, "written": count, "chunk_file": path.name, "report": report}), 200

    except ValueError as ve:
        return jsonify({"ok": False, "error": str(ve)}), 400
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500

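# Example call to /ingest_blob (a sketch; the container and blob path are placeholders):
#   curl -X POST http://localhost:8000/ingest_blob \
#        -H "Content-Type: application/json" \
#        -d '{"container": "bank-logs", "blob_name": "intesa/<path>/file.jsonl.gz", "analyze": true}'
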
@app.post("/analyze")
def analyze():
    """
    Manually trigger analysis over the newest chunks, OR over a specific blob.

    Body options:
      {
        "question": "...",
        "email": {"send": true, "to": "x@x"},
        "blob": {
          "container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
          // OR
          "sas_url": "https://.../file.jsonl[.gz]?sig=..."
        }
      }
    """
    global CHUNK_DIR  # declare BEFORE any use in this function
    payload = request.get_json(force=True, silent=True) or {}
    question = payload.get("question") or DEFAULT_QUESTION

    tmp_dir = None
    prev_env_chunk = os.getenv("CHUNK_DIR")
    prev_agent_chunk = (getattr(agent_runner, "CHUNK_DIR", str(CHUNK_DIR))
                        if agent_runner else str(CHUNK_DIR))
    prev_local_chunk = CHUNK_DIR

    try:
        blob_req = payload.get("blob")
        if blob_req:
            tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
            # download to bytes
            if blob_req.get("sas_url"):
                data = _download_sas_bytes(blob_req["sas_url"])
            elif blob_req.get("container") and blob_req.get("blob_name"):
                data = _download_blob_bytes(blob_req["container"], blob_req["blob_name"])
            else:
                return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400

            # normalize to a real chunk_*.jsonl.gz in tmp_dir
            lines, _ = _normalize_bytes_to_jsonl_lines(data)
            _write_chunk_from_lines(lines, pathlib.Path(tmp_dir))

            # re-point CHUNK_DIR for this request and agent_runner
            CHUNK_DIR = pathlib.Path(tmp_dir)
            os.environ["CHUNK_DIR"] = tmp_dir
            if agent_runner:
                agent_runner.CHUNK_DIR = tmp_dir
                if hasattr(agent_runner, "BLOB_DIR"):
                    agent_runner.BLOB_DIR = ""

        # run analysis
        res = _run_analysis(question)

        # optional email
        email_cfg = payload.get("email") or {}
        if email_cfg.get("send"):
            try:
                send_email(subject="[Intesa Logs] Report", body_text=res["result"], to_addr=email_cfg.get("to"))
            except Exception:
                pass

        return jsonify(res), 200

    except ValueError as ve:
        return jsonify({"ok": False, "error": str(ve)}), 400
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    finally:
        # restore CHUNK_DIR context
        if prev_env_chunk is None:
            os.environ.pop("CHUNK_DIR", None)
        else:
            os.environ["CHUNK_DIR"] = prev_env_chunk
        if agent_runner:
            agent_runner.CHUNK_DIR = prev_agent_chunk
        CHUNK_DIR = pathlib.Path(prev_local_chunk)
        CHUNK_DIR.mkdir(parents=True, exist_ok=True)
        # Best-effort cleanup of the per-request temp directory holding the downloaded blob.
        if tmp_dir:
            shutil.rmtree(tmp_dir, ignore_errors=True)

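# Example call to /analyze (a sketch; the question text is illustrative):
#   curl -X POST http://localhost:8000/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"question": "Summarize anomalies in the latest chunks", "email": {"send": false}}'
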
# ---------------- Event Grid webhook ----------------
@app.post("/eventgrid")
def eventgrid():
    """
    Webhook for Azure Event Grid (Storage -> BlobCreated).
    - Handles SubscriptionValidation by echoing validationResponse.
    - For each BlobCreated event, extracts container/blob and ingests it.
    - Optional auto-analyze + email is controlled by:
        EVENTGRID_ANALYZE_DEFAULT (true/false)  [default: true]
        EVENTGRID_EMAIL_DEFAULT   (true/false)  [default: MAIL_ENABLED]
    """
    try:
        events = request.get_json(force=True, silent=True)
        if not events:
            return jsonify({"ok": False, "error": "No events"}), 400
        if isinstance(events, dict):
            events = [events]

        analyze_default = os.getenv("EVENTGRID_ANALYZE_DEFAULT", "true").lower() in {"1", "true", "yes"}
        email_default = os.getenv("EVENTGRID_EMAIL_DEFAULT", os.getenv("MAIL_ENABLED", "false")).lower() in {"1", "true", "yes"}
        target_container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")

        for ev in events:
            et = ev.get("eventType")
            if et == "Microsoft.EventGrid.SubscriptionValidationEvent":
                validation_code = ev.get("data", {}).get("validationCode")
                return jsonify({"validationResponse": validation_code})

            if et in ("Microsoft.Storage.BlobCreated", "Microsoft.Storage.BlobRenamed"):
                url = (ev.get("data", {}) or {}).get("url")
                if not url:
                    continue
                u = urlparse(url)
                # path like: /container/dir1/dir2/file.json
                parts = [p for p in u.path.split("/") if p]
                if len(parts) < 2:
                    continue
                container = parts[0]
                blob_name = "/".join(parts[1:])

                # Only act on the configured container
                if container != target_container:
                    continue

                # Download the blob, normalize, write chunk, analyze/email if enabled
                data = _download_blob_bytes(container, blob_name)
                lines, _ = _normalize_bytes_to_jsonl_lines(data)
                _write_chunk_from_lines(lines, CHUNK_DIR)

                if analyze_default:
                    rep = _run_analysis(DEFAULT_QUESTION)
                    if email_default:
                        try:
                            send_email(subject="[Intesa Logs] Auto Analysis", body_text=rep["result"])
                        except Exception:
                            pass

        return jsonify({"ok": True}), 200

    except ValueError as ve:
        return jsonify({"ok": False, "error": str(ve)}), 400
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
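
# For reference, an abridged BlobCreated payload as this webhook expects it (illustrative
# account/path values; only eventType and data.url are actually read above):
#   [{
#     "eventType": "Microsoft.Storage.BlobCreated",
#     "subject": "/blobServices/default/containers/bank-logs/blobs/intesa/file.jsonl.gz",
#     "data": {"url": "https://<account>.blob.core.windows.net/bank-logs/intesa/file.jsonl.gz"}
#   }]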
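
# Local development entry point. This is a sketch: on App Service the app is typically served
# by a WSGI server (e.g. gunicorn) pointing at flask_app:app, so this block only applies when
# the module is run directly; the PORT default of 8000 is an assumption.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "8000")))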