# flask_app.py
import datetime as dt
import gzip
import json
import os
import pathlib
import shutil
import tempfile
import time
import uuid
from typing import Optional

from dotenv import load_dotenv
from flask import Flask, jsonify, request

# Load .env locally (App Service uses App Settings instead)
load_dotenv(os.getenv("ENV_FILE", ".env"))

# Agent + email (imported after load_dotenv so their import-time env reads see .env values)
from agent_runner import build_agent
from notify import send_email
# Azure SDKs (guarded imports so we don't crash at boot)
# When a SDK is absent the name is bound to None; the helper functions below
# check for None and raise RuntimeError at call time instead of import time.
try:
    from azure.storage.blob import BlobServiceClient, ContentSettings
except Exception:
    BlobServiceClient = None
    ContentSettings = None

try:
    from azure.storage.queue import QueueClient
except Exception:
    QueueClient = None

# Module-level Flask application instance.
app = Flask(__name__)
# -------- Helpers --------
def _blob_client() -> BlobServiceClient:
    """Build a BlobServiceClient from AZURE_STORAGE_CONNECTION_STRING.

    Raises:
        RuntimeError: if azure-storage-blob is not installed or the
            connection-string environment variable is missing.
    """
    if BlobServiceClient is None:
        raise RuntimeError("azure-storage-blob not installed")
    conn_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if conn_str:
        return BlobServiceClient.from_connection_string(conn_str)
    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
def _queue_client() -> QueueClient:
    """Return a QueueClient for the configured storage queue.

    The queue name comes from AZURE_STORAGE_QUEUE_NAME (default "log-chunks").
    Queue creation is best-effort: an already-existing queue raises, and that
    error is deliberately swallowed.

    Raises:
        RuntimeError: if azure-storage-queue is not installed or the
            connection-string environment variable is missing.
    """
    if QueueClient is None:
        raise RuntimeError("azure-storage-queue not installed")
    conn_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if not conn_str:
        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
    queue_name = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
    client = QueueClient.from_connection_string(conn_str, queue_name)
    try:
        client.create_queue()
    except Exception:
        pass  # queue probably exists already; creation is best-effort
    return client
def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str:
    """Upload one JSONL chunk to blob storage and return its blob name.

    Blobs land under intesa/YYYY/MM/DD/HH/ (folder scheme matches the poller)
    and are gzip-compressed unless ``compressed`` is False. Container creation
    is best-effort; "already exists" errors are ignored.
    """
    service = _blob_client()
    container_client = service.get_container_client(container)
    try:
        container_client.create_container()
    except Exception:
        pass  # container usually exists already

    suffix = "jsonl.gz" if compressed else "jsonl"
    now_utc = dt.datetime.now(dt.timezone.utc)
    blob_name = (
        f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}"
        f"/hec_{uuid.uuid4().hex[:8]}.{suffix}"
    )

    payload = gzip.compress(raw_bytes) if compressed else raw_bytes
    content = ContentSettings(
        content_type="application/json",
        content_encoding="gzip" if compressed else None,
    )
    container_client.get_blob_client(blob_name).upload_blob(
        payload, overwrite=True, content_settings=content
    )
    return blob_name
def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
    """Download one blob into ``outdir``; return the local file path.

    The local filename is the last path component of ``blob_name``.
    """
    blob = _blob_client().get_blob_client(container=container, blob=blob_name)
    payload = blob.download_blob().readall()
    local_path = os.path.join(outdir, os.path.basename(blob_name))
    with open(local_path, "wb") as fh:
        fh.write(payload)
    return local_path
def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
    """Download a blob via a SAS URL into ``outdir``; return the local path.

    Uses the azure SDK when available, otherwise falls back to urllib.

    Fixes vs. previous version:
    - urlopen response and output file are closed via ``with`` (no fd leak).
    - The ``.gz`` check now looks at the URL *path*: SAS URLs end in a query
      string (``...?sig=...``), so ``sas_url.endswith(".gz")`` was always
      False and gzipped chunks were misnamed ``.jsonl``.
    """
    if not BlobServiceClient:
        # ultra-light fallback when azure-storage-blob is not installed
        import urllib.request

        with urllib.request.urlopen(sas_url, timeout=30) as resp:
            data = resp.read()
    else:
        from azure.storage.blob import BlobClient

        blob = BlobClient.from_blob_url(sas_url)
        data = blob.download_blob().readall()

    from urllib.parse import urlparse

    # Decide the extension from the path component, ignoring the SAS token.
    is_gzip = urlparse(sas_url).path.endswith(".gz")
    name = "chunk_from_sas.jsonl.gz" if is_gzip else "chunk_from_sas.jsonl"
    path = os.path.join(outdir, name)
    with open(path, "wb") as fh:
        fh.write(data)
    return path
# -------- Routes --------
@app.get("/health")
def health():
    """Liveness probe: always reports OK with HTTP 200."""
    body = {"status": "ok"}
    return body, 200
@app.post("/analyze")
|
|
def analyze():
|
|
"""
|
|
POST JSON:
|
|
{
|
|
"question": "...optional custom question...",
|
|
"email": {"send": true, "to": "override@example.com"},
|
|
"blob": {
|
|
"container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]"
|
|
// OR
|
|
"sas_url": "https://.../chunk.jsonl.gz?sig=..."
|
|
}
|
|
}
|
|
"""
|
|
t0 = time.time()
|
|
payload = request.get_json(force=True, silent=True) or {}
|
|
question = payload.get("question") or (
|
|
"Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
|
|
"Give a brief summary and next steps."
|
|
)
|
|
|
|
prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
|
|
tmp_dir = None
|
|
try:
|
|
blob_req = payload.get("blob")
|
|
if blob_req:
|
|
tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
|
|
if blob_req.get("sas_url"):
|
|
_download_sas_to_dir(blob_req["sas_url"], tmp_dir)
|
|
elif blob_req.get("container") and blob_req.get("blob_name"):
|
|
_download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir)
|
|
else:
|
|
return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400
|
|
os.environ["CHUNK_DIR"] = tmp_dir
|
|
|
|
agent = build_agent()
|
|
out = agent.invoke({"input": question, "chat_history": []})
|
|
result = out.get("output", "")
|
|
|
|
email_cfg = payload.get("email") or {}
|
|
if email_cfg.get("send"):
|
|
to_addr = email_cfg.get("to")
|
|
send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)
|
|
|
|
return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({"ok": False, "error": str(e)}), 500
|
|
finally:
|
|
os.environ["CHUNK_DIR"] = prev_chunk_dir
|
|
|
|
# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC)
@app.post("/collect")
@app.post("/services/collector/event")  # alias for Splunk HEC curl compatibility
def collect_hec():
    """Accept a JSON event (or list of events), persist them as one JSONL
    blob, and enqueue a pointer message for the queue worker."""
    try:
        container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
        # Accept either single JSON object or a list; we will write one line per event
        body = request.get_json(force=True, silent=True)
        if body is None:
            return jsonify({"ok": False, "error": "invalid JSON"}), 400

        events = body if isinstance(body, list) else [body]
        lines = [json.dumps(event, separators=(",", ":")) for event in events]
        raw = ("\n".join(lines) + "\n").encode("utf-8")

        blob_name = _upload_chunk_blob(container, raw, compressed=True)

        # Enqueue a message your queue-worker understands
        message = {
            "blob": {"container": container, "blob_name": blob_name},
            # flip to true if you want emails by default
            "email": {"send": False},
        }
        _queue_client().send_message(json.dumps(message, separators=(",", ":")))

        return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200

    except Exception as e:
        # Route boundary: surface the failure as a JSON 500.
        return jsonify({"ok": False, "error": str(e)}), 500