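# poller/file_poller.py
#
# Polls INDIR for *.jsonl files, copies each one into OUTDIR as a chunk,
# asks the agent API to analyze it, then archives the original in INDIR/_done.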
|
|
import os, time, json, uuid, shutil, logging, pathlib, requests
|
|
from typing import List
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")


def _env_int(name: str, default: int, minimum: int = 1) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset, blank, not an
            integer, or below *minimum*.
        minimum: Smallest accepted value.

    Returns:
        The parsed integer, or *default* when the variable is unusable.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        # Unset or blank (e.g. `NAME=` in an env file): use the default quietly.
        return default
    try:
        value = int(raw)
    except ValueError:
        logging.warning("ignoring non-integer %s=%r; using %s", name, raw, default)
        return default
    if value < minimum:
        logging.warning("ignoring %s=%s (must be >= %s); using %s", name, value, minimum, default)
        return default
    return value


# Directory scanned for incoming *.jsonl files.
INDIR = pathlib.Path(os.getenv("INDIR", "/app/in"))
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "/app/out"))  # shared with agent-api
# Originals are moved here once they have been handled.
DONE = INDIR / "_done"
# Idle wait between scans. Validated so a bad value cannot crash the import
# (non-numeric), spin the loop (0), or make time.sleep() raise (negative).
SLEEP_SECONDS = _env_int("SLEEP_SECONDS", 60)

# Where the agent API lives (container name in compose or localhost outside)
API_BASE = os.getenv("AGENT_API_URL", "http://agent-api:8080").rstrip("/")

# Make the poller ask for emails automatically
EMAIL_SEND_DEFAULT = os.getenv("EMAIL_SEND_DEFAULT", "false").lower() in {"1", "true", "yes"}

# Prompt sent with every /analyze request; override with the QUESTION env var.
QUESTION = os.getenv(
    "QUESTION",
    "Scan the latest chunks. List anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
    "Give a brief summary and next steps."
)
|
|
|
|
def ensure_dirs():
    """Create the input, archive and output directories if they are missing."""
    for directory in (INDIR, DONE, OUTDIR):
        # parents/exist_ok make this safe to call on every start-up.
        directory.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _copy_to_out(src: pathlib.Path) -> pathlib.Path:
    """Copy *src* into OUTDIR under a fresh chunk name and return the new path.

    The name is ``chunk_<unix seconds>_<8 hex chars>.jsonl``; the random
    part keeps copies made within the same second apart.
    """
    stamp = int(time.time())
    token = uuid.uuid4().hex[:8]
    target = OUTDIR / f"chunk_{stamp}_{token}.jsonl"
    shutil.copy2(src, target)  # copy2 also carries over file metadata
    return target
|
|
|
|
def _call_analyze(email_send: bool) -> bool:
    """Ask the agent to analyze the latest chunks (it reads CHUNK_DIR=/app/out).

    Posts QUESTION and the email flag to ``{API_BASE}/analyze``.

    Args:
        email_send: Forwarded as ``email.send`` in the request body.

    Returns:
        True on a 2xx response; False on any other status or on any
        exception raised while making the request (the error is logged).
    """
    endpoint = f"{API_BASE}/analyze"
    body = {"question": QUESTION, "email": {"send": bool(email_send)}}
    try:
        resp = requests.post(endpoint, json=body, timeout=180)
        if not 200 <= resp.status_code < 300:
            logging.error("analyze HTTP %s: %s", resp.status_code, resp.text[:400])
            return False
        # Log only the first 400 characters of the reply.
        logging.info("analyze OK: %s", resp.text[:400])
        return True
    except Exception as err:
        logging.error("analyze failed: %s", err)
        return False
|
|
|
|
def _discard_chunk(chunk: pathlib.Path) -> None:
    """Best-effort removal of a chunk left behind by a failed attempt."""
    try:
        chunk.unlink()
    except FileNotFoundError:
        pass  # already gone; nothing to clean up
    except OSError as exc:
        logging.warning("could not remove chunk %s: %s", chunk.name, exc)


def _archive_target(name: str) -> pathlib.Path:
    """Return a path in DONE for *name* that does not clobber an earlier archive."""
    target = DONE / name
    if not target.exists():
        return target
    # Same file name was archived before: keep both by adding a unique infix.
    original = pathlib.Path(name)
    unique = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
    return DONE / f"{original.stem}.{unique}{original.suffix}"


def _process_file(f: pathlib.Path) -> bool:
    """
    Copy the file into OUTDIR as a chunk, call /analyze, move file to _done.

    If the analyze call fails or any step raises, the chunk created by this
    attempt is removed again. The retry makes a fresh copy, so leaving the
    old one behind would pile up duplicate chunks of the same input in
    OUTDIR on every retry.

    Args:
        f: Input file inside INDIR.

    Returns:
        True if fully handled (original moved to _done), False to retry later.
        Never raises; errors are logged.
    """
    logging.info("processing %s", f.name)
    chunk = None
    try:
        # 1) copy to /app/out where agent looks for chunk_*.jsonl
        chunk = _copy_to_out(f)
        logging.info("placed chunk %s", chunk.name)

        # 2) trigger analysis (and email if enabled)
        if not _call_analyze(EMAIL_SEND_DEFAULT):
            _discard_chunk(chunk)
            logging.info("will retry %s later", f.name)
            return False

        # 3) move original to _done without overwriting a same-named archive
        shutil.move(str(f), str(_archive_target(f.name)))
        logging.info("done %s", f.name)
        return True
    except Exception as e:
        # Broad on purpose: one bad file must not stop the polling loop.
        logging.error("error processing %s: %s", f.name, e)
        if chunk is not None:
            _discard_chunk(chunk)
        logging.info("will retry %s later", f.name)
        return False
|
|
|
|
def list_inputs() -> List[pathlib.Path]:
    """Return the ``*.jsonl`` files directly inside INDIR, sorted by path.

    The glob is not recursive, so files already moved to the ``_done``
    subdirectory are not listed. Non-file entries matching the pattern are
    skipped.
    """
    candidates = [entry for entry in INDIR.glob("*.jsonl") if entry.is_file()]
    candidates.sort()
    return candidates
|
|
|
|
def main():
    """Run the polling loop forever.

    Each pass lists the pending inputs and processes them in order. When
    nothing is pending, or when at least one file in the batch could not be
    handled, the loop waits SLEEP_SECONDS before scanning again; after a
    fully successful batch it pauses only briefly.
    """
    ensure_dirs()
    logging.info("file-poller watching %s -> %s target=analyze email_default=%s",
                 INDIR, f"{API_BASE}/analyze", EMAIL_SEND_DEFAULT)
    while True:
        files = list_inputs()
        if not files:
            time.sleep(SLEEP_SECONDS)
            continue

        failed = 0
        for f in files:
            if not _process_file(f):
                failed += 1

        if failed:
            # Failed files stay in INDIR and are picked up again next pass.
            # Wait the full interval so an unavailable agent is not retried
            # every couple of seconds.
            logging.warning("%d of %d file(s) not processed; retrying in %ss",
                            failed, len(files), SLEEP_SECONDS)
            time.sleep(SLEEP_SECONDS)
        else:
            # brief backoff between batches
            time.sleep(2)
|
|
|
|
# Start polling only when run as a script, so importing this module does not
# enter the endless loop.
if __name__ == "__main__":
    main()
|