# flask_app.py
import os, shutil, tempfile, time, gzip, json, pathlib
from typing import Optional

from flask import Flask, request, jsonify
from dotenv import load_dotenv

load_dotenv(os.getenv("ENV_FILE", ".env"))  # load .env for local dev

# reuse your agent code
from agent_runner import build_agent
from notify import send_email

# optional Azure Blob (only if you want the API to pull blobs directly)
try:
    from azure.storage.blob import BlobServiceClient
except Exception:
    BlobServiceClient = None

app = Flask(__name__)


def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
    if not BlobServiceClient:
        raise RuntimeError("azure-storage-blob not installed. pip install azure-storage-blob")
    conn = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if not conn:
        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
    svc = BlobServiceClient.from_connection_string(conn)
    blob = svc.get_blob_client(container=container, blob=blob_name)
    data = blob.download_blob().readall()
    # keep original extension; agent reads .jsonl or .jsonl.gz
    fname = os.path.basename(blob_name)
    path = os.path.join(outdir, fname)
    with open(path, "wb") as f:
        f.write(data)
    return path


def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
    # Supports a full SAS URL. Check the URL *path* for the .gz suffix: a SAS URL
    # normally ends with its query string (?sv=...&sig=...), not the file name,
    # so sas_url.endswith(".gz") would never match.
    from urllib.parse import urlparse
    url_path = urlparse(sas_url).path
    name = "chunk_from_sas.jsonl.gz" if url_path.endswith(".gz") else "chunk_from_sas.jsonl"
    path = os.path.join(outdir, name)

    if not BlobServiceClient:
        # fallback: plain urllib download when the Azure SDK is not installed
        import urllib.request
        data = urllib.request.urlopen(sas_url, timeout=30).read()
    else:
        # with the SDK: BlobClient can be built directly from the SAS URL
        from azure.storage.blob import BlobClient
        blob = BlobClient.from_blob_url(sas_url)
        data = blob.download_blob().readall()

    with open(path, "wb") as f:
        f.write(data)
    return path


@app.get("/health")
def health():
    return {"status": "ok"}, 200


@app.post("/analyze")
def analyze():
    """
    JSON body options:
    {
      "question": "custom question (optional)",
      "email": {"send": true, "to": "override@example.com"},
      "blob": {
        "container": "bank-logs",
        "blob_name": "intesa/2025/09/25/..chunk.jsonl.gz"
        # OR
        "sas_url": "https://...blob.core.windows.net/.../chunk.jsonl.gz?sig=..."
      }
    }
    See the example request at the bottom of this file.
    """
    t0 = time.time()
    payload = request.get_json(force=True, silent=True) or {}
    question = payload.get("question") or (
        "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
        "Give a brief summary and next steps."
    )

    temp_dir = None
    prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
    try:
        # If a blob is provided, download it to a temp directory and point the agent at it.
        blob_req = payload.get("blob")
        if blob_req:
            temp_dir = tempfile.mkdtemp(prefix="agent_blob_")
            if blob_req.get("sas_url"):
                _download_sas_to_dir(blob_req["sas_url"], temp_dir)
            elif blob_req.get("container") and blob_req.get("blob_name"):
                _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], temp_dir)
            else:
                return jsonify({"error": "blob requires either sas_url OR (container+blob_name)"}), 400
            # Note: mutating os.environ is process-global, so concurrent requests
            # that each set CHUNK_DIR could race with one another.
            os.environ["CHUNK_DIR"] = temp_dir  # agent reads from here

        agent = build_agent()
        out = agent.invoke({"input": question, "chat_history": []})
        result = out.get("output", "")

        # Optional email
        email_cfg = payload.get("email") or {}
        if email_cfg.get("send"):
            to_addr = email_cfg.get("to")
            send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)

        dt = round(time.time() - t0, 3)
        return jsonify({"ok": True, "duration_sec": dt, "result": result}), 200
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    finally:
        # restore CHUNK_DIR if we changed it
        os.environ["CHUNK_DIR"] = prev_chunk_dir
        # clean up the temp download dir once the agent has finished with it
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
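

# ---------------------------------------------------------------------------
# Local entry point and example request.
# A minimal sketch for local development, assuming you run this file directly;
# in a real deployment you would more likely serve the app with something like
# gunicorn ("gunicorn flask_app:app"). The PORT default below is an assumption.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")))

# Example call against a locally running server (the blob name below is an
# illustrative placeholder, not a real path):
#
#   curl -X POST http://localhost:5000/analyze \
#     -H "Content-Type: application/json" \
#     -d '{
#           "question": "Summarize anomalies in the latest chunk.",
#           "email": {"send": false},
#           "blob": {"container": "bank-logs", "blob_name": "intesa/2025/09/25/chunk.jsonl.gz"}
#         }'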