# api/flask_app.py
import os, io, glob, gzip, json, time, uuid, tempfile, pathlib, datetime as dt
from typing import List, Dict, Any, Tuple
from urllib.parse import urlparse
from flask import Flask, request, jsonify
from dotenv import load_dotenv
# Load .env locally (App Service uses App Settings instead)
load_dotenv(os.getenv("ENV_FILE", ".env"))
# Optional email
from notify import send_email
# ---------------- Azure SDKs (guarded) ----------------
try:
    from azure.storage.blob import BlobServiceClient, BlobClient
except Exception:
    BlobServiceClient = None
    BlobClient = None


def _blob_client() -> "BlobServiceClient":
    if not BlobServiceClient:
        raise RuntimeError("azure-storage-blob not installed")
    cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if not cs:
        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
    return BlobServiceClient.from_connection_string(cs)

def _download_blob_bytes(container: str, blob_name: str) -> bytes:
    svc = _blob_client()
    blob = svc.get_blob_client(container=container, blob=blob_name)
    return blob.download_blob().readall()


def _download_sas_bytes(sas_url: str) -> bytes:
    if BlobClient:
        blob = BlobClient.from_blob_url(sas_url)
        return blob.download_blob().readall()
    else:
        import urllib.request
        return urllib.request.urlopen(sas_url, timeout=30).read()

# ---------------- Agent (guarded) ----------------
_CAN_LLM = True
try:
    import agent_runner  # we'll temporarily set agent_runner.CHUNK_DIR when a blob is provided
    build_agent = agent_runner.build_agent
except Exception:
    _CAN_LLM = False
    agent_runner = None
    build_agent = None

app = Flask(__name__)
# ---------------- Config ----------------
CHUNK_DIR = pathlib.Path(os.getenv("CHUNK_DIR", "/app/out"))
CHUNK_DIR.mkdir(parents=True, exist_ok=True)
AUTO_ANALYZE = os.getenv("AUTO_ANALYZE", "true").lower() in {"1","true","yes"}
DEFAULT_QUESTION = os.getenv(
    "DEFAULT_QUESTION",
    "Scan the latest chunks. List anomalies (rejected EUR >= 10000 and vop_no_match). "
    "Provide brief evidence and next steps."
)
EMAIL_SEND_BY_DEFAULT = os.getenv("MAIL_ENABLED", "false").lower() in {"1","true","yes"}
# ---------------- Helpers ----------------
def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Accept raw or HEC-shaped records (`{'event': {...}}`)."""
    return rec.get("event", rec)


def _write_chunk_from_lines(jsonl_lines: List[bytes], dest_dir: pathlib.Path) -> pathlib.Path:
    name = f"chunk_{int(time.time())}_{uuid.uuid4().hex[:8]}.jsonl.gz"
    path = dest_dir / name
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for ln in jsonl_lines:
            gz.write(ln if ln.endswith(b"\n") else ln + b"\n")
    path.write_bytes(buf.getvalue())
    return path


def _bytes_is_gzip(b: bytes) -> bool:
    return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B


def _decode_utf8(b: bytes) -> str:
    # be permissive; some exports contain odd characters / BOM
    return b.decode("utf-8", errors="replace").lstrip("\ufeff")

def _normalize_bytes_to_jsonl_lines(data: bytes) -> Tuple[List[bytes], int]:
    """
    Accepts: JSONL, JSONL.GZ, JSON array, or single JSON object.
    Returns: (list of JSONL lines as bytes, number_of_events)
    Raises: ValueError if the format is not recognized.
    """
    # decompress if gz
    if _bytes_is_gzip(data):
        try:
            data = gzip.decompress(data)
        except Exception:
            raise ValueError("Invalid gzip stream")
    text = _decode_utf8(data).strip()
    if not text:
        raise ValueError("Empty blob")
    # Try JSON first (object or array)
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            ev = _normalize_event(parsed)
            return [json.dumps(ev, separators=(",", ":")).encode("utf-8")], 1
        if isinstance(parsed, list):
            lines = []
            count = 0
            for item in parsed:
                if isinstance(item, dict):
                    ev = _normalize_event(item)
                    lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
                    count += 1
            if count == 0:
                raise ValueError("JSON array has no objects")
            return lines, count
        # If JSON but not dict/list, fall through to JSONL attempt
    except Exception:
        pass
    # Try JSONL (one JSON object per line)
    lines = []
    count = 0
    for raw in text.splitlines():
        s = raw.strip()
        if not s:
            continue
        try:
            obj = json.loads(s)
            ev = _normalize_event(obj)
            lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
            count += 1
        except Exception:
            # Not valid JSON per line -> not JSONL
            raise ValueError("Unrecognized blob format. Expected JSONL, JSONL.GZ, JSON array, or single JSON object.")
    if count == 0:
        raise ValueError("No JSON objects found")
    return lines, count

def _iter_recent_events(limit_files: int = 10, limit_events: int = 5000) -> List[Dict[str, Any]]:
    """Load recent events from the newest chunk_*.jsonl(.gz) and hec_*.jsonl(.gz) files in CHUNK_DIR."""
    patterns = ["chunk_*.jsonl*", "hec_*.jsonl*"]
    files: List[pathlib.Path] = []
    for pat in patterns:
        files += [pathlib.Path(p) for p in glob.glob(str(CHUNK_DIR / pat))]
    files = sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)[:limit_files]
    out: List[Dict[str, Any]] = []
    for fp in files:
        data = fp.read_bytes()
        if fp.suffix == ".gz":
            data = gzip.decompress(data)
        for ln in data.splitlines():
            if not ln.strip():
                continue
            try:
                rec = json.loads(ln)
                out.append(_normalize_event(rec))
            except Exception:
                continue
            if len(out) >= limit_events:
                return out
    return out

def _as_float(x) -> float:
    try:
        return float(x)
    except Exception:
        return 0.0

def _rules_only_report() -> str:
    """Simple rules when AOAI creds are not set."""
    evts = _iter_recent_events(limit_files=20, limit_events=20000)
    total = len(evts)
    rej_hi, vop_no = [], []
    for e in evts:
        status = (e.get("status") or "").lower()
        divisa = (e.get("divisa") or "").upper()
        amt = _as_float(e.get("importo"))
        vop = (e.get("vop_check") or "").lower()
        if status == "rejected" and divisa == "EUR" and amt >= 10000:
            rej_hi.append({"transaction_id": e.get("transaction_id"), "importo": amt, "step": e.get("step")})
        if vop in {"no_match", "vop_no_match"}:
            vop_no.append({"transaction_id": e.get("transaction_id"), "step": e.get("step")})
    lines = []
    lines.append("### Findings")
    lines.append(f"- Total events scanned: {total}")
    lines.append(f"- High-value rejected (EUR≥10000): {len(rej_hi)}")
    lines.append(f"- VOP no_match: {len(vop_no)}")
    if rej_hi[:5]:
        lines.append(" - Examples (rejected): " + ", ".join(
            f"{i.get('transaction_id')}({i.get('importo')})" for i in rej_hi[:5] if i.get('transaction_id')
        ))
    if vop_no[:5]:
        lines.append(" - Examples (vop_no_match): " + ", ".join(
            f"{i.get('transaction_id')}" for i in vop_no[:5] if i.get('transaction_id')
        ))
    lines.append("")
    lines.append("### Recommended actions")
    for r in (
        "Validate VOP mismatches; confirm beneficiary details.",
        "Investigate rejection reasons for all EUR≥10k transactions.",
        "Check spike trends by hour/day and counterparties.",
    ):
        lines.append(f"- {r}")
    return "\n".join(lines)

def _try_llm_report(question: str) -> str:
    if not _CAN_LLM or not build_agent:
        raise RuntimeError("LLM agent not available")
    agent = build_agent()
    out = agent.invoke({"input": question, "chat_history": []})
    return out.get("output", "") or "(no output from agent)"

def _run_analysis(question: str) -> Dict[str, Any]:
    t0 = time.time()
    used = "rules"
    try:
        if _CAN_LLM and os.getenv("AZURE_OPENAI_ENDPOINT") and (
            os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY")
        ):
            txt = _try_llm_report(question)
            used = "llm"
        else:
            txt = _rules_only_report()
    except Exception as e:
        txt = _rules_only_report()
        used = f"rules (fallback from llm error: {str(e)[:120]})"
    return {"ok": True, "analyzer": used, "duration_sec": round(time.time() - t0, 3), "result": txt}

# ---------------- Routes ----------------
@app.get("/health")
def health():
    return {"status": "ok", "auto_analyze": AUTO_ANALYZE, "chunk_dir": str(CHUNK_DIR)}, 200
@app.post("/ingest")
def ingest():
"""
POST JSON:
{ "events": [ {..}, {..} ] } OR a single object {..}
Writes a chunk and (optionally) auto-analyzes.
"""
body = request.get_json(force=True, silent=True)
if body is None:
return jsonify({"ok": False, "error": "invalid JSON"}), 400
# normalize to list (support HEC-style {"event": {...}})
events: List[Dict[str, Any]] = []
if isinstance(body, dict) and "events" in body and isinstance(body["events"], list):
events = [_normalize_event(e) for e in body["events"] if isinstance(e, dict)]
elif isinstance(body, list):
events = [_normalize_event(e) for e in body if isinstance(e, dict)]
elif isinstance(body, dict):
events = [_normalize_event(body)]
else:
return jsonify({"ok": False, "error": "payload must be an object or list"}), 400
if not events:
return jsonify({"ok": False, "error": "no events to ingest"}), 400
lines = [json.dumps(e, separators=(",", ":")).encode("utf-8") for e in events]
path = _write_chunk_from_lines(lines, CHUNK_DIR)
report = None
if AUTO_ANALYZE:
report = _run_analysis(DEFAULT_QUESTION)
if EMAIL_SEND_BY_DEFAULT:
try:
send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"])
except Exception:
pass
return jsonify({
"ok": True,
"written": len(events),
"chunk_file": path.name,
"auto_analyzed": bool(AUTO_ANALYZE),
"report": report,
}), 200
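
# Example request to /ingest (a sketch for local testing only; host/port are assumptions,
# and the field values are illustrative -- field names mirror the checks in _rules_only_report):
#   curl -X POST http://localhost:8000/ingest \
#     -H "Content-Type: application/json" \
#     -d '{"events": [{"transaction_id": "TX-1", "status": "rejected", "divisa": "EUR", "importo": 15000}]}'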
@app.post("/ingest_blob")
def ingest_blob():
"""
Accept a blob, normalize it to chunk_*.jsonl.gz in CHUNK_DIR, optionally analyze.
Body:
{
"container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
// OR
"sas_url": "https://.../file.jsonl[.gz]?sig=..."
"analyze": true,
"email": {"send": true, "to": "x@x"}
}
"""
payload = request.get_json(force=True, silent=True) or {}
analyze = bool(payload.get("analyze"))
data: bytes = b""
try:
if payload.get("sas_url"):
data = _download_sas_bytes(payload["sas_url"])
elif payload.get("container") and payload.get("blob_name"):
data = _download_blob_bytes(payload["container"], payload["blob_name"])
else:
return jsonify({"ok": False, "error": "Provide 'sas_url' OR ('container' + 'blob_name')"}), 400
lines, count = _normalize_bytes_to_jsonl_lines(data)
path = _write_chunk_from_lines(lines, CHUNK_DIR)
report = None
if analyze:
report = _run_analysis(DEFAULT_QUESTION)
email_cfg = payload.get("email") or {}
if email_cfg.get("send"):
try:
send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"], to_addr=email_cfg.get("to"))
except Exception:
pass
return jsonify({"ok": True, "written": count, "chunk_file": path.name, "report": report}), 200
except ValueError as ve:
return jsonify({"ok": False, "error": str(ve)}), 400
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.post("/analyze")
def analyze():
"""
Manually trigger analysis over the newest chunks, OR over a specific blob.
Body options:
{
"question": "...",
"email": {"send": true, "to": "x@x"},
"blob": {
"container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
// OR
"sas_url": "https://.../file.jsonl[.gz]?sig=..."
}
}
"""
global CHUNK_DIR # declare BEFORE any use in this function
payload = request.get_json(force=True, silent=True) or {}
question = payload.get("question") or DEFAULT_QUESTION
tmp_dir = None
prev_env_chunk = os.getenv("CHUNK_DIR")
prev_agent_chunk = (getattr(agent_runner, "CHUNK_DIR", str(CHUNK_DIR))
if agent_runner else str(CHUNK_DIR))
prev_local_chunk = CHUNK_DIR
try:
blob_req = payload.get("blob")
if blob_req:
tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
# download to bytes
if blob_req.get("sas_url"):
data = _download_sas_bytes(blob_req["sas_url"])
elif blob_req.get("container") and blob_req.get("blob_name"):
data = _download_blob_bytes(blob_req["container"], blob_req["blob_name"])
else:
return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400
# normalize to a real chunk_*.jsonl.gz in tmp_dir
lines, _ = _normalize_bytes_to_jsonl_lines(data)
_write_chunk_from_lines(lines, pathlib.Path(tmp_dir))
# re-point CHUNK_DIR for this request and agent_runner
CHUNK_DIR = pathlib.Path(tmp_dir)
os.environ["CHUNK_DIR"] = tmp_dir
if agent_runner:
agent_runner.CHUNK_DIR = tmp_dir
if hasattr(agent_runner, "BLOB_DIR"):
agent_runner.BLOB_DIR = ""
# run analysis
res = _run_analysis(question)
# optional email
email_cfg = payload.get("email") or {}
if email_cfg.get("send"):
try:
send_email(subject="[Intesa Logs] Report", body_text=res["result"], to_addr=email_cfg.get("to"))
except Exception:
pass
return jsonify(res), 200
except ValueError as ve:
return jsonify({"ok": False, "error": str(ve)}), 400
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
finally:
# restore CHUNK_DIR context
if prev_env_chunk is None:
os.environ.pop("CHUNK_DIR", None)
else:
os.environ["CHUNK_DIR"] = prev_env_chunk
if agent_runner:
agent_runner.CHUNK_DIR = prev_agent_chunk
CHUNK_DIR = pathlib.Path(prev_local_chunk)
CHUNK_DIR.mkdir(parents=True, exist_ok=True)
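
# Example request to /analyze targeting a specific blob (a sketch; host/port, question text,
# and blob path are placeholders for illustration):
#   curl -X POST http://localhost:8000/analyze \
#     -H "Content-Type: application/json" \
#     -d '{"question": "Summarize anomalies", "blob": {"container": "bank-logs", "blob_name": "intesa/<path>/file.jsonl.gz"}}'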
# ---------------- Event Grid webhook ----------------
@app.post("/eventgrid")
def eventgrid():
"""
Webhook for Azure Event Grid (Storage -> BlobCreated).
- Handles SubscriptionValidation by echoing validationResponse.
- For each BlobCreated event, extracts container/blob and ingests it.
- Optional auto-analyze + email is controlled by:
EVENTGRID_ANALYZE_DEFAULT (true/false) [default: true]
EVENTGRID_EMAIL_DEFAULT (true/false) [default: MAIL_ENABLED]
"""
try:
events = request.get_json(force=True, silent=True)
if not events:
return jsonify({"ok": False, "error": "No events"}), 400
if isinstance(events, dict):
events = [events]
analyze_default = os.getenv("EVENTGRID_ANALYZE_DEFAULT", "true").lower() in {"1","true","yes"}
email_default = os.getenv("EVENTGRID_EMAIL_DEFAULT", os.getenv("MAIL_ENABLED", "false")).lower() in {"1","true","yes"}
target_container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
for ev in events:
et = ev.get("eventType")
if et == "Microsoft.EventGrid.SubscriptionValidationEvent":
validation_code = ev.get("data", {}).get("validationCode")
return jsonify({"validationResponse": validation_code})
if et in ("Microsoft.Storage.BlobCreated", "Microsoft.Storage.BlobRenamed"):
url = (ev.get("data", {}) or {}).get("url")
if not url:
continue
u = urlparse(url)
# path like: /container/dir1/dir2/file.json
parts = [p for p in u.path.split("/") if p]
if len(parts) < 2:
continue
container = parts[0]
blob_name = "/".join(parts[1:])
# Only act on the configured container
if container != target_container:
continue
# Download the blob, normalize, write chunk, analyze/email if enabled
data = _download_blob_bytes(container, blob_name)
lines, _ = _normalize_bytes_to_jsonl_lines(data)
_write_chunk_from_lines(lines, CHUNK_DIR)
if analyze_default:
rep = _run_analysis(DEFAULT_QUESTION)
if email_default:
try:
send_email(subject="[Intesa Logs] Auto Analysis", body_text=rep["result"])
except Exception:
pass
return jsonify({"ok": True}), 200
except ValueError as ve:
return jsonify({"ok": False, "error": str(ve)}), 400
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
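
# Local development entrypoint -- a minimal sketch, not confirmed by the original file.
# On Azure App Service the app would typically be served by a WSGI server (e.g. gunicorn
# pointed at api.flask_app:app); the host/port below are assumptions for local testing only.
if __name__ == "__main__":
    app.run(host=os.getenv("HOST", "0.0.0.0"), port=int(os.getenv("PORT", "8000")))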