# intesa_splunk_main/poller/file_poller.py
# 98 lines | 3.3 KiB | Python

# poller/file_poller.py
"""Watch INDIR for *.jsonl files, stage them into OUTDIR as chunks, and
trigger the agent API's /analyze endpoint for each batch."""
import logging
import os
import pathlib
import shutil
import time
import uuid
from typing import List

import json  # NOTE(review): unused in this file's visible code — kept in case callers import it from here
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")

# Directory watched for incoming *.jsonl inputs.
INDIR = pathlib.Path(os.getenv("INDIR", "/app/in"))
# Shared with agent-api: staged chunks are written here for the agent to read.
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "/app/out"))
# Successfully processed inputs are archived under INDIR/_done.
DONE = INDIR / "_done"
# Idle poll interval (seconds) when no inputs are present.
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))
# Where the agent API lives (container name in compose or localhost outside)
API_BASE = os.getenv("AGENT_API_URL", "http://agent-api:8080").rstrip("/")
# Make the poller ask for emails automatically
EMAIL_SEND_DEFAULT = os.getenv("EMAIL_SEND_DEFAULT", "false").lower() in {"1", "true", "yes"}
# Default analysis prompt sent to the agent's /analyze endpoint.
QUESTION = os.getenv(
    "QUESTION",
    "Scan the latest chunks. List anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
    "Give a brief summary and next steps."
)
def ensure_dirs():
    """Create the input, done, and output directories if they are missing."""
    for directory in (INDIR, DONE, OUTDIR):
        directory.mkdir(parents=True, exist_ok=True)
def _copy_to_out(src: pathlib.Path) -> pathlib.Path:
    """Copy *src* into OUTDIR under a unique ``chunk_<ts>_<rand>.jsonl`` name.

    Returns the destination path of the staged chunk.
    """
    stamp = int(time.time())
    token = uuid.uuid4().hex[:8]
    target = OUTDIR / f"chunk_{stamp}_{token}.jsonl"
    shutil.copy2(src, target)
    return target
def _call_analyze(email_send: bool) -> bool:
    """Tell the agent to analyze latest chunks (it reads CHUNK_DIR=/app/out).

    Posts QUESTION to the agent's /analyze endpoint with a generous timeout.
    Returns True on any 2xx response, False on HTTP errors or network
    failures (so the caller can retry the input later).
    """
    url = f"{API_BASE}/analyze"
    payload = {
        "question": QUESTION,
        "email": {"send": bool(email_send)}
    }
    try:
        # Keep the try body minimal: only the request itself can raise here.
        # Narrow to RequestException so programming errors are not swallowed.
        r = requests.post(url, json=payload, timeout=180)
    except requests.RequestException as e:
        logging.error("analyze failed: %s", e)
        return False
    if r.status_code // 100 == 2:
        logging.info("analyze OK: %s", r.text[:400])
        return True
    logging.error("analyze HTTP %s: %s", r.status_code, r.text[:400])
    return False
def _process_file(f: pathlib.Path) -> bool:
    """
    Copy the file into OUTDIR as a chunk, call /analyze, move file to _done.
    Return True if fully handled (so we move to _done), False to retry later.
    """
    logging.info("processing %s", f.name)
    try:
        # Stage the input where the agent expects chunk_*.jsonl files.
        chunk = _copy_to_out(f)
        logging.info("placed chunk %s", chunk.name)
        # Kick off the analysis (and email, when enabled); bail out to retry
        # on failure without touching the original input.
        if not _call_analyze(EMAIL_SEND_DEFAULT):
            logging.info("will retry %s later", f.name)
            return False
        # Archive the original so it is not picked up again.
        shutil.move(str(f), str(DONE / f.name))
        logging.info("done %s", f.name)
        return True
    except Exception as e:
        logging.error("error processing %s: %s", f.name, e)
        logging.info("will retry %s later", f.name)
        return False
def list_inputs() -> List[pathlib.Path]:
    """Return the pending *.jsonl input files, sorted by path."""
    pending = (entry for entry in INDIR.glob("*.jsonl") if entry.is_file())
    return sorted(pending)
def main():
    """Poll INDIR forever, processing each batch of input files in turn."""
    ensure_dirs()
    logging.info("file-poller watching %s -> %s target=analyze email_default=%s",
                 INDIR, f"{API_BASE}/analyze", EMAIL_SEND_DEFAULT)
    while True:
        batch = list_inputs()
        if not batch:
            # Nothing to do: wait out the idle interval before polling again.
            time.sleep(SLEEP_SECONDS)
            continue
        for item in batch:
            _process_file(item)
        # brief backoff between batches
        time.sleep(2)
if __name__ == "__main__":
    main()