# intesa_splunk/flask_app.py (117 lines, 4.2 KiB, Python)

# flask_app.py
import os, tempfile, time, gzip, json, pathlib
import shutil
import threading
from typing import Optional

from flask import Flask, request, jsonify
from dotenv import load_dotenv

# load .env for local dev; kept before the local imports below in case they
# read environment variables at import time
load_dotenv(os.getenv("ENV_FILE", ".env"))

# reuse your agent code
from agent_runner import build_agent
from notify import send_email

# optional Azure Blob (only if you want the API to pull blobs directly)
try:
    from azure.storage.blob import BlobServiceClient
except Exception:
    BlobServiceClient = None
# The Flask application object; the @app.get / @app.post route handlers in
# this module register themselves on it.
app = Flask(import_name=__name__)
def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
if not BlobServiceClient:
raise RuntimeError("azure-storage-blob not installed. pip install azure-storage-blob")
conn = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not conn:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
svc = BlobServiceClient.from_connection_string(conn)
blob = svc.get_blob_client(container=container, blob=blob_name)
data = blob.download_blob().readall()
# keep original extension; agent reads .jsonl or .jsonl.gz
fname = os.path.basename(blob_name)
path = os.path.join(outdir, fname)
with open(path, "wb") as f:
f.write(data)
return path
def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
# supports full SAS URL; simple requests-less impl via azure sdk if available
if not BlobServiceClient:
# fallback: use urllib
import urllib.request
data = urllib.request.urlopen(sas_url, timeout=30).read()
name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
path = os.path.join(outdir, name)
open(path, "wb").write(data)
return path
# with sdk: parse URL
from azure.storage.blob import BlobClient
blob = BlobClient.from_blob_url(sas_url)
data = blob.download_blob().readall()
name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
path = os.path.join(outdir, name)
open(path, "wb").write(data)
return path
@app.get("/health")
def health():
    """Liveness probe: always answer HTTP 200 with a ``{"status": "ok"}`` body."""
    body = dict(status="ok")
    return body, 200
# Guards the process-wide CHUNK_DIR environment variable. analyze() points it
# at a per-request temp directory while the agent runs. If the app is served
# with threads (Flask's dev server is threaded by default), two concurrent
# requests could otherwise read each other's directory, or restore a stale
# value afterwards. Trade-off: agent runs are serialized within one process.
_CHUNK_DIR_LOCK = threading.Lock()


@app.post("/analyze")
def analyze():
    """Run the log-analysis agent and return its report as JSON.

    JSON body options:
    {
      "question": "custom question (optional)",
      "email": {"send": true, "to": "override@example.com"},
      "blob": {
        "container": "bank-logs", "blob_name": "intesa/2025/09/25/..chunk.jsonl.gz"
        # OR
        "sas_url": "https://...blob.core.windows.net/.../chunk.jsonl.gz?sig=..."
      }
    }

    When "blob" is given, the blob is downloaded into a fresh temp directory.
    CHUNK_DIR points at that directory while the agent runs, and the directory
    is deleted when the request finishes. CHUNK_DIR is then restored to its
    previous value, or unset if it was unset.

    Responses:
        200 {"ok": true, "duration_sec": float, "result": str}
        400 {"error": str}           malformed body / "blob" / "email" options
        500 {"ok": false, "error": str}  download, agent or email failure
    """
    t0 = time.perf_counter()
    payload = request.get_json(force=True, silent=True) or {}
    if not isinstance(payload, dict):
        return jsonify({"error": "JSON body must be an object"}), 400
    question = payload.get("question") or (
        "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
        "Give a brief summary and next steps."
    )
    email_cfg = payload.get("email") or {}
    if not isinstance(email_cfg, dict):
        return jsonify({"error": "email must be an object"}), 400
    # Validate the blob spec up front, before anything is created on disk.
    blob_req = payload.get("blob")
    if blob_req and not (
        isinstance(blob_req, dict)
        and (blob_req.get("sas_url") or (blob_req.get("container") and blob_req.get("blob_name")))
    ):
        return jsonify({"error": "blob requires either sas_url OR (container+blob_name)"}), 400

    temp_dir = None
    try:
        # If a blob is provided, download it to a temp directory for the agent.
        if blob_req:
            temp_dir = tempfile.mkdtemp(prefix="agent_blob_")
            if blob_req.get("sas_url"):
                _download_sas_to_dir(blob_req["sas_url"], temp_dir)
            else:
                _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], temp_dir)

        with _CHUNK_DIR_LOCK:
            prev_chunk_dir = os.environ.get("CHUNK_DIR")
            try:
                if temp_dir:
                    os.environ["CHUNK_DIR"] = temp_dir  # agent reads from here
                agent = build_agent()
                out = agent.invoke({"input": question, "chat_history": []})
            finally:
                # Restore exactly what was there before, including "unset".
                if prev_chunk_dir is None:
                    os.environ.pop("CHUNK_DIR", None)
                else:
                    os.environ["CHUNK_DIR"] = prev_chunk_dir
        result = out.get("output", "")

        # Optional email
        if email_cfg.get("send"):
            send_email(
                subject="[Intesa Logs] Agent Report",
                body_text=result,
                to_addr=email_cfg.get("to"),
            )

        dt = round(time.perf_counter() - t0, 3)
        return jsonify({"ok": True, "duration_sec": dt, "result": result}), 200
    except Exception as e:
        # Top-level request boundary: log the traceback server-side, report to caller.
        app.logger.exception("analyze failed")
        return jsonify({"ok": False, "error": str(e)}), 500
    finally:
        # mkdtemp() directories are never removed automatically.
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)