Added new files for agent and mail processing.
This commit is contained in:
parent
6c372ac6cb
commit
ee1526cfd9
116
flask_app.py
Normal file
116
flask_app.py
Normal file
@ -0,0 +1,116 @@
|
||||
# flask_app.py
|
||||
import os, tempfile, time, gzip, json, pathlib
|
||||
from typing import Optional
|
||||
from flask import Flask, request, jsonify
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(os.getenv("ENV_FILE", ".env")) # load .env for local dev
|
||||
|
||||
# reuse your agent code
|
||||
from agent_runner import build_agent
|
||||
from notify import send_email
|
||||
|
||||
# optional Azure Blob (only if you want the API to pull blobs directly)
|
||||
try:
|
||||
from azure.storage.blob import BlobServiceClient
|
||||
except Exception:
|
||||
BlobServiceClient = None
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
|
||||
if not BlobServiceClient:
|
||||
raise RuntimeError("azure-storage-blob not installed. pip install azure-storage-blob")
|
||||
conn = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
|
||||
if not conn:
|
||||
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
|
||||
svc = BlobServiceClient.from_connection_string(conn)
|
||||
blob = svc.get_blob_client(container=container, blob=blob_name)
|
||||
data = blob.download_blob().readall()
|
||||
# keep original extension; agent reads .jsonl or .jsonl.gz
|
||||
fname = os.path.basename(blob_name)
|
||||
path = os.path.join(outdir, fname)
|
||||
with open(path, "wb") as f:
|
||||
f.write(data)
|
||||
return path
|
||||
|
||||
def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
|
||||
# supports full SAS URL; simple requests-less impl via azure sdk if available
|
||||
if not BlobServiceClient:
|
||||
# fallback: use urllib
|
||||
import urllib.request
|
||||
data = urllib.request.urlopen(sas_url, timeout=30).read()
|
||||
name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
|
||||
path = os.path.join(outdir, name)
|
||||
open(path, "wb").write(data)
|
||||
return path
|
||||
# with sdk: parse URL
|
||||
from azure.storage.blob import BlobClient
|
||||
blob = BlobClient.from_blob_url(sas_url)
|
||||
data = blob.download_blob().readall()
|
||||
name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
|
||||
path = os.path.join(outdir, name)
|
||||
open(path, "wb").write(data)
|
||||
return path
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
return {"status": "ok"}, 200
|
||||
|
||||
@app.post("/analyze")
|
||||
def analyze():
|
||||
"""
|
||||
JSON body options:
|
||||
{
|
||||
"question": "custom question (optional)",
|
||||
"email": {"send": true, "to": "override@example.com"},
|
||||
"blob": {
|
||||
"container": "bank-logs", "blob_name": "intesa/2025/09/25/..chunk.jsonl.gz"
|
||||
# OR
|
||||
"sas_url": "https://...blob.core.windows.net/.../chunk.jsonl.gz?sig=..."
|
||||
}
|
||||
}
|
||||
"""
|
||||
t0 = time.time()
|
||||
payload = request.get_json(force=True, silent=True) or {}
|
||||
question = payload.get("question") or (
|
||||
"Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
|
||||
"Give a brief summary and next steps."
|
||||
)
|
||||
|
||||
temp_dir = None
|
||||
prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
|
||||
try:
|
||||
# If a blob is provided, download to a temp directory and point agent to it.
|
||||
blob_req = payload.get("blob")
|
||||
if blob_req:
|
||||
temp_dir = tempfile.mkdtemp(prefix="agent_blob_")
|
||||
if blob_req.get("sas_url"):
|
||||
_download_sas_to_dir(blob_req["sas_url"], temp_dir)
|
||||
elif blob_req.get("container") and blob_req.get("blob_name"):
|
||||
_download_blob_to_dir(
|
||||
blob_req["container"], blob_req["blob_name"], temp_dir
|
||||
)
|
||||
else:
|
||||
return jsonify({"error": "blob requires either sas_url OR (container+blob_name)"}), 400
|
||||
os.environ["CHUNK_DIR"] = temp_dir # agent reads from here
|
||||
|
||||
agent = build_agent()
|
||||
out = agent.invoke({"input": question, "chat_history": []})
|
||||
result = out.get("output", "")
|
||||
|
||||
# Optional email
|
||||
email_cfg = payload.get("email") or {}
|
||||
if email_cfg.get("send"):
|
||||
to_addr = email_cfg.get("to")
|
||||
send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)
|
||||
|
||||
dt = round(time.time() - t0, 3)
|
||||
return jsonify({"ok": True, "duration_sec": dt, "result": result}), 200
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({"ok": False, "error": str(e)}), 500
|
||||
|
||||
finally:
|
||||
# restore CHUNK_DIR if we changed it
|
||||
os.environ["CHUNK_DIR"] = prev_chunk_dir
|
||||
38
notify.py
Normal file
38
notify.py
Normal file
@ -0,0 +1,38 @@
|
||||
# notify.py
|
||||
import os, smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import formataddr
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# load .env automatically (ENV_FILE can override path)
|
||||
load_dotenv(os.getenv("ENV_FILE", ".env"))
|
||||
|
||||
def send_email(subject: str, body_text: str, to_addr: str | None = None):
|
||||
if os.getenv("MAIL_ENABLED", "false").lower() != "true":
|
||||
print("[notify] MAIL_ENABLED != true; skipping email")
|
||||
return
|
||||
|
||||
smtp_host = os.getenv("SMTP_HOST")
|
||||
smtp_port = int(os.getenv("SMTP_PORT", "587"))
|
||||
smtp_user = os.getenv("SMTP_USER")
|
||||
smtp_pass = os.getenv("SMTP_PASS")
|
||||
mail_from = os.getenv("MAIL_FROM") or smtp_user
|
||||
mail_to = to_addr or os.getenv("MAIL_TO")
|
||||
|
||||
if not (smtp_host and smtp_user and smtp_pass and mail_to):
|
||||
print("[notify] missing SMTP config; skipping email")
|
||||
return
|
||||
|
||||
msg = MIMEText(body_text, "plain", "utf-8")
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = formataddr(("Intesa Logs Agent", mail_from))
|
||||
msg["To"] = mail_to
|
||||
|
||||
with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as s:
|
||||
try:
|
||||
s.starttls()
|
||||
except smtplib.SMTPException:
|
||||
pass
|
||||
s.login(smtp_user, smtp_pass)
|
||||
s.send_message(msg)
|
||||
print(f"[notify] sent email to {mail_to}")
|
||||
12
requirements.txt
Normal file
12
requirements.txt
Normal file
@ -0,0 +1,12 @@
|
||||
langchain>=0.3,<0.4
|
||||
langchain-core>=0.3.27,<0.4
|
||||
langchain-community>=0.3,<0.4
|
||||
langchain-openai>=0.2.12,<0.3
|
||||
openai>=1.40
|
||||
faiss-cpu==1.8.*
|
||||
ujson>=5
|
||||
pydantic>=2
|
||||
python-dotenv>=1
|
||||
flask>=3
|
||||
gunicorn>=21
|
||||
azure-storage-blob>=12
|
||||
Loading…
x
Reference in New Issue
Block a user