diff --git a/.env b/.env new file mode 100644 index 0000000..467cc15 --- /dev/null +++ b/.env @@ -0,0 +1,31 @@ +# Azure (optional; only needed if SINK=blob or blob+sb) +AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;EndpointSuffix=core.windows.net" +AZURE_STORAGE_CONTAINER=bank-logs +AZURE_STORAGE_QUEUE_NAME=log-chunks +AZURE_SERVICEBUS_CONNECTION_STRING= + +AZURE_OPENAI_ENDPOINT=https://tf-in-dev-clz-core-aif.cognitiveservices.azure.com/ +AZURE_OPENAI_API_KEY=BwPkZje8Ifr51ob4gYKIj37L0OlvBGoQo4dqeebwdpz72cmhvJ0pJQQJ99BIACgEuAYXJ3w3AAAAACOGOfEc +AZURE_OPENAI_API_VERSION=2025-01-01-preview +AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini + +#ANALYZER_URL=https://tf-in-dev-chatapp-app.azurewebsites.net/analyze + +# Optional embeddings: +# AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT=text-embedding-3-small + +# --- Mail (Mailtrap SMTP Sandbox) --- +MAIL_ENABLED=true +SMTP_HOST=sandbox.smtp.mailtrap.io #smtp.office365.com +SMTP_PORT=2525 #587 +# MAIL_TLS=true  # set to true when using Outlook/Office365 +SMTP_USER=14ea96c3766614 +SMTP_PASS=48b19e33a6290e +MAIL_FROM=alerts@intesa-pipeline.local +MAIL_TO=you@company.com + +RG=intesadev-rg +SA=tfindevst +QUEUE=log-chunks + +# (optional) where your chunks are: +CHUNK_DIR="./out" \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index d637b34..0000000 --- a/README.md +++ /dev/null @@ -1,82 +0,0 @@ -# Intesa Logs – Project Documentation - -This repo implements a small, production-style pipeline that inspects bank transfer (“**bonifico**”) logs, looks for anomalies (e.g., **rejected EUR ≥ 10,000**, **`vop_no_match`**, **invalid IBAN/BIC**), and produces a concise report (optionally emailed). - -It runs **locally via Docker** and is designed to be **deployable to Azure** using the same containers. - ---- - -## High-level flow - -**Splunk (HEC)** → **Poller** → *(Chunks: file or Azure Blob)* → *(Optional: Azure Queue message)* → **Analyzer API** → *(Optional: Email via Mailtrap)* - -- **Local mode:** Poller writes chunk **files** to a shared volume. Analyzer reads those files directly. -- **Azure mode (final target):** Poller uploads **blobs** to Storage (`bank-logs`) and enqueues messages to Storage Queue (`log-chunks`). A **Queue Worker** consumes queue messages and calls the Analyzer API. - ---- - -## Current state snapshot (what’s running now) - -### ✅ Running in Azure - -- **App Service (Agent API)** - - Name: `tf-in-dev-chatapp-app` - - Image: `tfindevacr.azurecr.io/agent-api:prod` (pulled from ACR via Managed Identity) - - Public endpoint: `https://tf-in-dev-chatapp-app.azurewebsites.net` - - Health: `GET /health` → `{"status":"ok"}` - - API: `POST /analyze` (see examples below) - -- **Azure Container Registry (ACR)** - - Name: `tfindevacr` - - Repos/tags present: - - `agent-api:prod` ✅ - - `queue-worker:prod` ✅ *(built & pushed; not yet deployed)* - -- **Azure Storage (data plane in use)** - - Storage account: `tfindevst` - - **Blob container:** `bank-logs` (holds `.jsonl` or `.jsonl.gz` chunks) - - **Queue:** `log-chunks` (messages the worker consumes) - -> The API is live in Azure. The **worker** and **Splunk** are still local right now.
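For orientation, the queue hand-off described above is small enough to sketch. The snippet below is an illustration only, not the repo's `worker/` code (which is not shown in this diff): it drains the `log-chunks` Storage Queue and forwards each message to the Analyzer's `/analyze` endpoint. The message shape mirrors what the `/collect` route enqueues elsewhere in this patch; `ANALYZER_URL`, the visibility timeout, and the HTTP timeout are assumptions taken from the old `compose.yaml` defaults.

```python
# Illustrative queue-worker loop (a sketch, not the repo's worker/ implementation).
# Assumes messages shaped like:
#   {"blob": {"container": "...", "blob_name": "..."}, "email": {"send": false}}
import os, json, time, requests
from azure.storage.queue import QueueClient

ANALYZER_URL = os.getenv("ANALYZER_URL", "http://agent-api:8080/analyze")
QUEUE_NAME = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")

qc = QueueClient.from_connection_string(
    os.environ["AZURE_STORAGE_CONNECTION_STRING"], QUEUE_NAME
)

while True:
    for msg in qc.receive_messages(messages_per_page=1, visibility_timeout=120):
        payload = json.loads(msg.content)          # {"blob": {...}, "email": {...}}
        resp = requests.post(ANALYZER_URL, json=payload, timeout=120)
        if resp.ok:
            qc.delete_message(msg)                 # delete only after a successful analysis
        # otherwise the message becomes visible again after the visibility timeout
    time.sleep(60)
```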
- -### ✅ Running locally (Docker Compose) - -- **Splunk** container (HEC exposed) -- **Poller** (`splunk_poller.py`) - - You can run it in either: - - `SINK=file` → write chunks to local volume (simple local dev), or - - `SINK=blob+queue` → upload to Azure Blob + enqueue Azure Queue (production-like) -- **Queue Worker** (`worker/`) - - Currently running **locally**, reading Azure Storage Queue and calling the Analyzer (either local API or Azure API based on `ANALYZER_URL`). - ---- - -## Repo structure - -```bash -# 1) Create a .env (see sample below) -# 2) Make sure compose.yaml has SINK=file (if local) or SINK=blob/blob+queue (if azure) for the poller -# 3) Start the stack -docker compose up -d - -# 4) Check health -curl -sS http://localhost:8080/health - -# 5) Send test events to Splunk HEC -for i in {1..5}; do - curl -k https://localhost:8088/services/collector/event \ - -H "Authorization: Splunk dev-0123456789abcdef" \ - -H "Content-Type: application/json" \ - -d '{"event":{"event_type":"bonifico","step":"esito","status":"accepted","importo": '"$((RANDOM%5000+50))"',"divisa":"EUR","transaction_id":"TX-'$RANDOM'"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' >/dev/null 2>&1 -done - -# 6) Add a couple of anomalies to exercise the analyzer -curl -k https://localhost:8088/services/collector/event \ - -H "Authorization: Splunk dev-0123456789abcdef" \ - -H "Content-Type: application/json" \ - -d '{"event":{"event_type":"bonifico","step":"esito","status":"rejected","importo":12500,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT1998*2*4*6*8*10*12*14*16*9375","iban_dest_masked":"IT1171*2*4*6*8*10*12*14*16*0000","bic_swift":"TESTBICX"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' - -# 7) Ask the Agent API to analyze the latest local chunks -curl -sS -X POST http://localhost:8080/analyze \ - -H 'Content-Type: application/json' \ - -d '{"question":"Scan latest chunks. Flag rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC.","email":{"send":false}}' | jq . diff --git a/REPOLAYOUT.md b/REPOLAYOUT.md deleted file mode 100644 index 082a9de..0000000 --- a/REPOLAYOUT.md +++ /dev/null @@ -1,11 +0,0 @@ -. -├─ api/ # Dockerfile for agent-api -├─ poller/ # Dockerfile for Splunk poller -├─ worker/ # Dockerfile for queue-worker (Azure mode only; not used locally) -├─ agent_runner.py # Analyzer orchestration -├─ flask_app.py # Flask API: /health, /analyze -├─ notify.py # SMTP (Mailtrap) email helper -├─ compose.yaml # Docker Compose stack -├─ requirements.txt -├─ sampleLogs.txt # misc sample content -└─ splunk_poller.py # Polls Splunk & writes chunk files diff --git a/api/Dockerfile b/api/Dockerfile index 3ec79b9..0272402 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -2,14 +2,16 @@ FROM python:3.12-slim WORKDIR /app +# deps COPY api/requirements.txt . RUN python -m pip install --upgrade pip setuptools wheel \ && pip install --no-cache-dir -r requirements.txt -# Bring in your app files from repo root -COPY agent_runner.py flask_app.py notify.py . 
+# app code (put everything at /app so imports stay "from notify import send_email") +COPY api/agent_runner.py ./agent_runner.py +COPY api/flask_app.py ./flask_app.py +COPY api/notify.py ./notify.py -# The agent loads .env if present; we'll mount it via env_file in compose ENV PYTHONUNBUFFERED=1 EXPOSE 8080 -CMD ["gunicorn", "-w", "2", "-b", "0.0.0.0:8080", "flask_app:app"] +CMD ["gunicorn","-w","2","-b","0.0.0.0:8080","flask_app:app"] diff --git a/agent_runner.py b/api/agent_runner.py similarity index 86% rename from agent_runner.py rename to api/agent_runner.py index cb45054..3b872bf 100644 --- a/agent_runner.py +++ b/api/agent_runner.py @@ -10,47 +10,28 @@ from langchain.tools import Tool from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -# ----- load .env (defaults to ./.env; override with ENV_FILE=/path/to/.env) ----- +# ----- load .env ----- load_dotenv(os.getenv("ENV_FILE", ".env")) -# ----- read env (supports both AZURE_* and AOAI_*) ----- +# ----- normalize endpoint ----- def _norm_endpoint(ep: str | None) -> str: if not ep: return "" ep = ep.strip().rstrip("/") - # strip any trailing /openai[/v...] ep = re.sub(r"/openai(?:/v\d+(?:\.\d+)?(?:-\w+)?)?$", "", ep) return ep + "/" -AZ_ENDPOINT = _norm_endpoint( - os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT") -) -AZ_API_KEY = ( - os.getenv("AZURE_OPENAI_API_KEY") - or os.getenv("AOAI_API_KEY") - or os.getenv("OPENAI_API_KEY") -) -AZ_API_VERSION = ( - os.getenv("AZURE_OPENAI_API_VERSION") - or os.getenv("AOAI_API_VERSION") - or "2025-01-01-preview" -) -AZ_CHAT_DEPLOY = ( - os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") - or os.getenv("AOAI_CHAT_DEPLOYMENT") - or "gpt-4o-mini" -) -AZ_EMBED_DEPLOY = ( - os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT") - or os.getenv("AOAI_EMBED_DEPLOYMENT") - or "" -) +AZ_ENDPOINT = _norm_endpoint(os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT")) +AZ_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY") +AZ_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION") or os.getenv("AOAI_API_VERSION") or "2025-01-01-preview" +AZ_CHAT_DEPLOY = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") or os.getenv("AOAI_CHAT_DEPLOYMENT") or "gpt-4o-mini" +AZ_EMBED_DEPLOY = os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT") or os.getenv("AOAI_EMBED_DEPLOYMENT") or "" # ----- local data config ----- CHUNK_DIR = os.getenv("CHUNK_DIR", "./out") BLOB_DIR = os.getenv("BLOB_DIR", "") TOP_K = int(os.getenv("TOP_K", "12")) -# ---------- Helpers to build LLM/Embeddings for Azure OpenAI ---------- +# ---------- LLM and embeddings ---------- def make_llm(temperature: float = 0.2) -> AzureChatOpenAI: if not AZ_ENDPOINT or not AZ_API_KEY: raise RuntimeError("Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents).") @@ -77,14 +58,19 @@ def _iter_chunk_files() -> List[pathlib.Path]: paths: List[pathlib.Path] = [] if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists(): paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")] + paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/hec_*.jsonl*")] if BLOB_DIR and pathlib.Path(BLOB_DIR).exists(): paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)] + paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/hec_*.jsonl*", recursive=True)] return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True) def _read_jsonl(path: pathlib.Path) -> 
List[Dict[str, Any]]: data = path.read_bytes() if path.suffix == ".gz": - data = gzip.decompress(data) + try: + data = gzip.decompress(data) + except Exception: + pass out: List[Dict[str, Any]] = [] for ln in data.splitlines(): if not ln.strip(): continue @@ -94,7 +80,6 @@ def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]: continue return out -# Accept either raw events or HEC-shaped {"event": {...}} def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]: return rec.get("event", rec) @@ -105,7 +90,7 @@ def _evt_to_text(evt: Dict[str, Any]) -> str: parts = [f"{k}={evt[k]}" for k in keys if evt.get(k) is not None] return "bonifico | " + " | ".join(parts) -# ---------- Build vector store (only if embeddings deployment exists) ---------- +# ---------- Vector store ---------- def build_vectorstore(limit_files: int = 20): embs = make_embeddings() if embs is None: @@ -149,7 +134,6 @@ def stats_tool_impl(query: str = "") -> str: for rec in _read_jsonl(fp): events.append(_normalize_event(rec)) - # parse filters q = query.lower() def _kv(key, pat=r"([^\s]+)"): m = re.search(fr"{key}:{pat}", q) @@ -163,6 +147,7 @@ def stats_tool_impl(query: str = "") -> str: instant_s = _kv("instant") min_amt_s = _kv("min_amount") min_amt = float(min_amt_s) if min_amt_s else 0.0 + inst_f = None if instant_s in {"true","false"}: inst_f = (instant_s == "true") @@ -185,7 +170,6 @@ def stats_tool_impl(query: str = "") -> str: if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f: return False if country: - # heuristic from IBAN (dest or origin) iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper() if not iban.startswith(country.upper()): return False @@ -207,24 +191,20 @@ def stats_tool_impl(query: str = "") -> str: def retrieve_tool_impl(question: str) -> str: vs, _ = build_vectorstore() docs = vs.similarity_search(question, k=TOP_K) - return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs)) + return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs)) def raw_sample_tool_impl(arg: str = "") -> str: """ Return a few raw JSON events from the newest chunks. - Accepts the same filters as get_stats PLUS optional 'n:' to control how many. - Examples: - 'n:5 status:rejected min_amount:10000' - 'divisa:EUR instant:true step:esito n:3' + Accepts the same filters as get_stats PLUS optional 'n:'. """ q = (arg or "").lower() - # helpers (same parsing as get_stats) def _kv(key, pat=r"([^\s]+)"): m = re.search(fr"{key}:{pat}", q) return m.group(1) if m else None - n_s = _kv("n", r"(\d+)") + n_s = _kv("n", r"(\d+)") n = int(n_s) if n_s else 5 status_f = _kv("status") step_f = _kv("step") @@ -262,7 +242,6 @@ def raw_sample_tool_impl(arg: str = "") -> str: return False return True - # load newest events and filter files = _iter_chunk_files() out = [] for fp in files: @@ -277,8 +256,7 @@ def raw_sample_tool_impl(arg: str = "") -> str: if not out: return "(no matching events)" - return "\n".join(out) - + return "\n".join(out) # ---------- Build the agent ---------- def build_agent(): @@ -287,7 +265,7 @@ def build_agent(): Tool(name="get_stats", func=stats_tool_impl, description="Quick stats over recent events. Example: 'status:rejected min_amount:10000 step:esito'."), Tool(name="raw_samples", func=raw_sample_tool_impl, - description="Return a few raw JSON events. Accepts filters like get_stats and 'n:'. Example: 'n:5 status:rejected min_amount:10000'.") + description="Return a few raw JSON events.
Accepts filters like get_stats and 'n:'."), ] if AZ_EMBED_DEPLOY: tools.append(Tool(name="retrieve_similar", func=retrieve_tool_impl, @@ -317,15 +295,12 @@ def run_default_question(question_override: str | None = None): ) out = agent.invoke({"input": question, "chat_history": []}) result = out.get("output", "") - print("\n=== AGENT OUTPUT ===\n", result) - - # Email the result if MAIL_ENABLED=true (handled inside notify.py) + print("\\n=== AGENT OUTPUT ===\\n", result) try: send_email(subject="[Intesa Logs] Agent Report", body_text=result) except Exception as e: print("[notify] email failed:", e) if __name__ == "__main__": - # optional CLI: allow a custom question custom = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None run_default_question(custom if custom else None) diff --git a/api/flask_app.py b/api/flask_app.py index b1833cd..4b26335 100644 --- a/api/flask_app.py +++ b/api/flask_app.py @@ -1,32 +1,24 @@ -# flask_app.py -import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt -from typing import Optional +# api/flask_app.py +import os, io, glob, gzip, json, time, uuid, tempfile, pathlib, datetime as dt +from typing import List, Dict, Any, Tuple +from urllib.parse import urlparse from flask import Flask, request, jsonify from dotenv import load_dotenv # Load .env locally (App Service uses App Settings instead) load_dotenv(os.getenv("ENV_FILE", ".env")) -# Agent + email -from agent_runner import build_agent +# Optional email from notify import send_email -# Azure SDKs (guarded imports so we don't crash at boot) +# ---------------- Azure SDKs (guarded) ---------------- try: - from azure.storage.blob import BlobServiceClient, ContentSettings + from azure.storage.blob import BlobServiceClient, BlobClient except Exception: BlobServiceClient = None - ContentSettings = None + BlobClient = None -try: - from azure.storage.queue import QueueClient -except Exception: - QueueClient = None - -app = Flask(__name__) - -# -------- Helpers -------- -def _blob_client() -> BlobServiceClient: +def _blob_client() -> "BlobServiceClient": if not BlobServiceClient: raise RuntimeError("azure-storage-blob not installed") cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") @@ -34,152 +26,448 @@ def _blob_client() -> BlobServiceClient: raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") return BlobServiceClient.from_connection_string(cs) -def _queue_client() -> QueueClient: - if not QueueClient: - raise RuntimeError("azure-storage-queue not installed") - cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") - if not cs: - raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") - qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") - qc = QueueClient.from_connection_string(cs, qname) - try: - qc.create_queue() - except Exception: - pass - return qc - -def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str: - svc = _blob_client() - cc = svc.get_container_client(container) - try: - cc.create_container() - except Exception: - pass - ext = "jsonl.gz" if compressed else "jsonl" - # folder scheme matches poller - prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}" - blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}" - data = gzip.compress(raw_bytes) if compressed else raw_bytes - settings = ContentSettings( - content_type="application/json", - content_encoding=("gzip" if compressed else None), - ) - bc = cc.get_blob_client(blob_name) - bc.upload_blob(data, overwrite=True, content_settings=settings) - return blob_name - 
-def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str: +def _download_blob_bytes(container: str, blob_name: str) -> bytes: svc = _blob_client() blob = svc.get_blob_client(container=container, blob=blob_name) - data = blob.download_blob().readall() - fname = os.path.basename(blob_name) - path = os.path.join(outdir, fname) - with open(path, "wb") as f: - f.write(data) - return path + return blob.download_blob().readall() -def _download_sas_to_dir(sas_url: str, outdir: str) -> str: - if not BlobServiceClient: - # ultra-light fallback - import urllib.request - data = urllib.request.urlopen(sas_url, timeout=30).read() - else: - from azure.storage.blob import BlobClient +def _download_sas_bytes(sas_url: str) -> bytes: + if BlobClient: blob = BlobClient.from_blob_url(sas_url) - data = blob.download_blob().readall() - name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl" - path = os.path.join(outdir, name) - open(path, "wb").write(data) + return blob.download_blob().readall() + else: + import urllib.request + return urllib.request.urlopen(sas_url, timeout=30).read() + +# ---------------- Agent (guarded) ---------------- +_CAN_LLM = True +try: + import agent_runner # we'll temporarily set agent_runner.CHUNK_DIR when a blob is provided + build_agent = agent_runner.build_agent +except Exception: + _CAN_LLM = False + agent_runner = None + build_agent = None + +app = Flask(__name__) + +# ---------------- Config ---------------- +CHUNK_DIR = pathlib.Path(os.getenv("CHUNK_DIR", "/app/out")) +CHUNK_DIR.mkdir(parents=True, exist_ok=True) + +AUTO_ANALYZE = os.getenv("AUTO_ANALYZE", "true").lower() in {"1","true","yes"} +DEFAULT_QUESTION = os.getenv("DEFAULT_QUESTION", + "Scan the latest chunks. List anomalies (rejected EUR >= 10000 and vop_no_match). " + "Provide brief evidence and next steps." +) +EMAIL_SEND_BY_DEFAULT = os.getenv("MAIL_ENABLED", "false").lower() in {"1","true","yes"} + +# ---------------- Helpers ---------------- +def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]: + """Accept raw or HEC-shaped records (`{'event': {...}}`).""" + return rec.get("event", rec) + +def _write_chunk_from_lines(jsonl_lines: List[bytes], dest_dir: pathlib.Path) -> pathlib.Path: + name = f"chunk_{int(time.time())}_{uuid.uuid4().hex[:8]}.jsonl.gz" + path = dest_dir / name + buf = io.BytesIO() + with gzip.GzipFile(fileobj=buf, mode="wb") as gz: + for ln in jsonl_lines: + gz.write(ln if ln.endswith(b"\n") else ln + b"\n") + path.write_bytes(buf.getvalue()) return path -# -------- Routes -------- +def _bytes_is_gzip(b: bytes) -> bool: + return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B + +def _decode_utf8(b: bytes) -> str: + # be permissive; some exports contain odd characters / BOM + return b.decode("utf-8", errors="replace").lstrip("\ufeff") + +def _normalize_bytes_to_jsonl_lines(data: bytes) -> Tuple[List[bytes], int]: + """ + Accepts: JSONL, JSONL.GZ, JSON array, or single JSON object. + Returns: (list of JSONL lines as bytes, number_of_events) + Raises: ValueError if format is not recognized. 
+ """ + # decompress if gz + if _bytes_is_gzip(data): + try: + data = gzip.decompress(data) + except Exception: + raise ValueError("Invalid gzip stream") + + text = _decode_utf8(data).strip() + if not text: + raise ValueError("Empty blob") + + # Try JSON first (object or array) + try: + parsed = json.loads(text) + if isinstance(parsed, dict): + ev = _normalize_event(parsed) + return [json.dumps(ev, separators=(",", ":")).encode("utf-8")], 1 + if isinstance(parsed, list): + lines = [] + count = 0 + for item in parsed: + if isinstance(item, (dict,)): + ev = _normalize_event(item) + lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8")) + count += 1 + if count == 0: + raise ValueError("JSON array has no objects") + return lines, count + # If JSON but not dict/list, fall through to JSONL attempt + except Exception: + pass + + # Try JSONL (one JSON object per line) + lines = [] + count = 0 + for raw in text.splitlines(): + s = raw.strip() + if not s: + continue + try: + obj = json.loads(s) + ev = _normalize_event(obj) + lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8")) + count += 1 + except Exception: + # Not valid JSON per line -> not JSONL + raise ValueError("Unrecognized blob format. Expected JSONL, JSONL.GZ, JSON array, or single JSON object.") + if count == 0: + raise ValueError("No JSON objects found") + return lines, count + +def _iter_recent_events(limit_files: int = 10, limit_events: int = 5000) -> List[Dict[str, Any]]: + """Load recent events from newest chunk_*.jsonl(.gz) and hec_*.jsonl(.gz) in CHUNK_DIR.""" + patterns = ["chunk_*.jsonl*", "hec_*.jsonl*"] + files: List[pathlib.Path] = [] + for pat in patterns: + files += [pathlib.Path(p) for p in glob.glob(str(CHUNK_DIR / pat))] + files = sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)[:limit_files] + + out: List[Dict[str, Any]] = [] + for fp in files: + data = fp.read_bytes() + if fp.suffix == ".gz": + data = gzip.decompress(data) + for ln in data.splitlines(): + if not ln.strip(): + continue + try: + rec = json.loads(ln) + out.append(_normalize_event(rec)) + except Exception: + continue + if len(out) >= limit_events: + return out + return out + +def _as_float(x) -> float: + try: + return float(x) + except Exception: + return 0.0 + +def _rules_only_report() -> str: + """Simple rules when AOAI creds are not set.""" + evts = _iter_recent_events(limit_files=20, limit_events=20000) + total = len(evts) + rej_hi, vop_no = [], [] + + for e in evts: + status = (e.get("status") or "").lower() + divisa = (e.get("divisa") or "").upper() + amt = _as_float(e.get("importo")) + vop = (e.get("vop_check") or "").lower() + if status == "rejected" and divisa == "EUR" and amt >= 10000: + rej_hi.append({"transaction_id": e.get("transaction_id"), "importo": amt, "step": e.get("step")}) + if vop in {"no_match", "vop_no_match"}: + vop_no.append({"transaction_id": e.get("transaction_id"), "step": e.get("step")}) + + lines = [] + lines.append("### Findings") + lines.append(f"- Total events scanned: {total}") + lines.append(f"- High-value rejected (EUR≥10000): {len(rej_hi)}") + lines.append(f"- VOP no_match: {len(vop_no)}") + if rej_hi[:5]: + lines.append(" - Examples (rejected): " + ", ".join( + f"{i.get('transaction_id')}({i.get('importo')})" for i in rej_hi[:5] if i.get('transaction_id') + )) + if vop_no[:5]: + lines.append(" - Examples (vop_no_match): " + ", ".join( + f"{i.get('transaction_id')}" for i in vop_no[:5] if i.get('transaction_id') + )) + lines.append("") + lines.append("### Recommended actions") + 
for r in ( + "Validate VOP mismatches; confirm beneficiary details.", + "Investigate rejection reasons for all EUR≥10k transactions.", + "Check spike trends by hour/day and counterparties.", + ): + lines.append(f"- {r}") + return "\n".join(lines) + +def _try_llm_report(question: str) -> str: + if not _CAN_LLM or not build_agent: + raise RuntimeError("LLM agent not available") + agent = build_agent() + out = agent.invoke({"input": question, "chat_history": []}) + return out.get("output", "") or "(no output from agent)" + +def _run_analysis(question: str) -> Dict[str, Any]: + t0 = time.time() + used = "rules" + try: + if _CAN_LLM and os.getenv("AZURE_OPENAI_ENDPOINT") and ( + os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY") + ): + txt = _try_llm_report(question) + used = "llm" + else: + txt = _rules_only_report() + except Exception as e: + txt = _rules_only_report() + used = f"rules (fallback from llm error: {str(e)[:120]})" + return {"ok": True, "analyzer": used, "duration_sec": round(time.time() - t0, 3), "result": txt} + +# ---------------- Routes ---------------- @app.get("/health") def health(): - return {"status": "ok"}, 200 + return {"status": "ok", "auto_analyze": AUTO_ANALYZE, "chunk_dir": str(CHUNK_DIR)}, 200 + +@app.post("/ingest") +def ingest(): + """ + POST JSON: + { "events": [ {..}, {..} ] } OR a single object {..} + Writes a chunk and (optionally) auto-analyzes. + """ + body = request.get_json(force=True, silent=True) + if body is None: + return jsonify({"ok": False, "error": "invalid JSON"}), 400 + + # normalize to list (support HEC-style {"event": {...}}) + events: List[Dict[str, Any]] = [] + if isinstance(body, dict) and "events" in body and isinstance(body["events"], list): + events = [_normalize_event(e) for e in body["events"] if isinstance(e, dict)] + elif isinstance(body, list): + events = [_normalize_event(e) for e in body if isinstance(e, dict)] + elif isinstance(body, dict): + events = [_normalize_event(body)] + else: + return jsonify({"ok": False, "error": "payload must be an object or list"}), 400 + + if not events: + return jsonify({"ok": False, "error": "no events to ingest"}), 400 + + lines = [json.dumps(e, separators=(",", ":")).encode("utf-8") for e in events] + path = _write_chunk_from_lines(lines, CHUNK_DIR) + + report = None + if AUTO_ANALYZE: + report = _run_analysis(DEFAULT_QUESTION) + if EMAIL_SEND_BY_DEFAULT: + try: + send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"]) + except Exception: + pass + + return jsonify({ + "ok": True, + "written": len(events), + "chunk_file": path.name, + "auto_analyzed": bool(AUTO_ANALYZE), + "report": report, + }), 200 + +@app.post("/ingest_blob") +def ingest_blob(): + """ + Accept a blob, normalize it to chunk_*.jsonl.gz in CHUNK_DIR, optionally analyze. + + Body: + { + "container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]" + // OR + "sas_url": "https://.../file.jsonl[.gz]?sig=..." 
+ "analyze": true, + "email": {"send": true, "to": "x@x"} + } + """ + payload = request.get_json(force=True, silent=True) or {} + analyze = bool(payload.get("analyze")) + data: bytes = b"" + + try: + if payload.get("sas_url"): + data = _download_sas_bytes(payload["sas_url"]) + elif payload.get("container") and payload.get("blob_name"): + data = _download_blob_bytes(payload["container"], payload["blob_name"]) + else: + return jsonify({"ok": False, "error": "Provide 'sas_url' OR ('container' + 'blob_name')"}), 400 + + lines, count = _normalize_bytes_to_jsonl_lines(data) + path = _write_chunk_from_lines(lines, CHUNK_DIR) + + report = None + if analyze: + report = _run_analysis(DEFAULT_QUESTION) + email_cfg = payload.get("email") or {} + if email_cfg.get("send"): + try: + send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"], to_addr=email_cfg.get("to")) + except Exception: + pass + + return jsonify({"ok": True, "written": count, "chunk_file": path.name, "report": report}), 200 + + except ValueError as ve: + return jsonify({"ok": False, "error": str(ve)}), 400 + except Exception as e: + return jsonify({"ok": False, "error": str(e)}), 500 @app.post("/analyze") def analyze(): """ - POST JSON: + Manually trigger analysis over the newest chunks, OR over a specific blob. + + Body options: { - "question": "...optional custom question...", - "email": {"send": true, "to": "override@example.com"}, + "question": "...", + "email": {"send": true, "to": "x@x"}, "blob": { - "container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]" + "container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]" // OR - "sas_url": "https://.../chunk.jsonl.gz?sig=..." + "sas_url": "https://.../file.jsonl[.gz]?sig=..." } } """ - t0 = time.time() + global CHUNK_DIR # declare BEFORE any use in this function payload = request.get_json(force=True, silent=True) or {} - question = payload.get("question") or ( - "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). " - "Give a brief summary and next steps." 
- ) + question = payload.get("question") or DEFAULT_QUESTION - prev_chunk_dir = os.getenv("CHUNK_DIR", "./out") tmp_dir = None + prev_env_chunk = os.getenv("CHUNK_DIR") + prev_agent_chunk = (getattr(agent_runner, "CHUNK_DIR", str(CHUNK_DIR)) + if agent_runner else str(CHUNK_DIR)) + prev_local_chunk = CHUNK_DIR + try: blob_req = payload.get("blob") if blob_req: tmp_dir = tempfile.mkdtemp(prefix="agent_blob_") + # download to bytes if blob_req.get("sas_url"): - _download_sas_to_dir(blob_req["sas_url"], tmp_dir) + data = _download_sas_bytes(blob_req["sas_url"]) elif blob_req.get("container") and blob_req.get("blob_name"): - _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir) + data = _download_blob_bytes(blob_req["container"], blob_req["blob_name"]) else: return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400 + + # normalize to a real chunk_*.jsonl.gz in tmp_dir + lines, _ = _normalize_bytes_to_jsonl_lines(data) + _write_chunk_from_lines(lines, pathlib.Path(tmp_dir)) + + # re-point CHUNK_DIR for this request and agent_runner + CHUNK_DIR = pathlib.Path(tmp_dir) os.environ["CHUNK_DIR"] = tmp_dir + if agent_runner: + agent_runner.CHUNK_DIR = tmp_dir + if hasattr(agent_runner, "BLOB_DIR"): + agent_runner.BLOB_DIR = "" - agent = build_agent() - out = agent.invoke({"input": question, "chat_history": []}) - result = out.get("output", "") + # run analysis + res = _run_analysis(question) + # optional email email_cfg = payload.get("email") or {} if email_cfg.get("send"): - to_addr = email_cfg.get("to") - send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr) + try: + send_email(subject="[Intesa Logs] Report", body_text=res["result"], to_addr=email_cfg.get("to")) + except Exception: + pass - return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200 + return jsonify(res), 200 + except ValueError as ve: + return jsonify({"ok": False, "error": str(ve)}), 400 except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 finally: - os.environ["CHUNK_DIR"] = prev_chunk_dir - -# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC) -@app.post("/collect") -@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility -def collect_hec(): - try: - container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") - # Accept either single JSON object or a list; we will write one line per event - body = request.get_json(force=True, silent=True) - if body is None: - return jsonify({"ok": False, "error": "invalid JSON"}), 400 - - lines = [] - if isinstance(body, list): - for item in body: - lines.append(json.dumps(item, separators=(",", ":"))) + # restore CHUNK_DIR context + if prev_env_chunk is None: + os.environ.pop("CHUNK_DIR", None) else: - lines.append(json.dumps(body, separators=(",", ":"))) - raw = ("\n".join(lines) + "\n").encode("utf-8") + os.environ["CHUNK_DIR"] = prev_env_chunk + if agent_runner: + agent_runner.CHUNK_DIR = prev_agent_chunk + CHUNK_DIR = pathlib.Path(prev_local_chunk) + CHUNK_DIR.mkdir(parents=True, exist_ok=True) - blob_name = _upload_chunk_blob(container, raw, compressed=True) +# ---------------- Event Grid webhook ---------------- +@app.post("/eventgrid") +def eventgrid(): + """ + Webhook for Azure Event Grid (Storage -> BlobCreated). + - Handles SubscriptionValidation by echoing validationResponse. 
+ - For each BlobCreated event, extracts container/blob and ingests it. + - Optional auto-analyze + email is controlled by: + EVENTGRID_ANALYZE_DEFAULT (true/false) [default: true] + EVENTGRID_EMAIL_DEFAULT (true/false) [default: MAIL_ENABLED] + """ + try: + events = request.get_json(force=True, silent=True) + if not events: + return jsonify({"ok": False, "error": "No events"}), 400 + if isinstance(events, dict): + events = [events] - # Enqueue a message your queue-worker understands - msg = { - "blob": {"container": container, "blob_name": blob_name}, - # flip to true if you want emails by default - "email": {"send": False} - } + analyze_default = os.getenv("EVENTGRID_ANALYZE_DEFAULT", "true").lower() in {"1","true","yes"} + email_default = os.getenv("EVENTGRID_EMAIL_DEFAULT", os.getenv("MAIL_ENABLED", "false")).lower() in {"1","true","yes"} + target_container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") - qc = _queue_client() - qc.send_message(json.dumps(msg, separators=(",", ":"))) + for ev in events: + et = ev.get("eventType") + if et == "Microsoft.EventGrid.SubscriptionValidationEvent": + validation_code = ev.get("data", {}).get("validationCode") + return jsonify({"validationResponse": validation_code}) - return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200 + if et in ("Microsoft.Storage.BlobCreated", "Microsoft.Storage.BlobRenamed"): + url = (ev.get("data", {}) or {}).get("url") + if not url: + continue + u = urlparse(url) + # path like: /container/dir1/dir2/file.json + parts = [p for p in u.path.split("/") if p] + if len(parts) < 2: + continue + container = parts[0] + blob_name = "/".join(parts[1:]) + # Only act on the configured container + if container != target_container: + continue + + # Download the blob, normalize, write chunk, analyze/email if enabled + data = _download_blob_bytes(container, blob_name) + lines, _ = _normalize_bytes_to_jsonl_lines(data) + _write_chunk_from_lines(lines, CHUNK_DIR) + + if analyze_default: + rep = _run_analysis(DEFAULT_QUESTION) + if email_default: + try: + send_email(subject="[Intesa Logs] Auto Analysis", body_text=rep["result"]) + except Exception: + pass + + return jsonify({"ok": True}), 200 + + except ValueError as ve: + return jsonify({"ok": False, "error": str(ve)}), 400 except Exception as e: - return jsonify({"ok": False, "error": str(e)}), 500 \ No newline at end of file + return jsonify({"ok": False, "error": str(e)}), 500 diff --git a/api/notify.py b/api/notify.py new file mode 100644 index 0000000..7c8576b --- /dev/null +++ b/api/notify.py @@ -0,0 +1,67 @@ +# api/notify.py +import os, smtplib +from email.mime.text import MIMEText +from email.utils import formataddr +from dotenv import load_dotenv + +# Load .env before reading anything +load_dotenv(os.getenv("ENV_FILE", ".env")) + +_TRUES = {"1", "true", "yes", "on"} + +def _b(s: str | None, default=False) -> bool: + if s is None: + return default + return str(s).strip().lower() in _TRUES + +def _first(*names: str, default: str | None = None) -> str | None: + for n in names: + v = os.getenv(n) + if v is not None and str(v).strip() != "": + return v + return default + +def _cfg(): + """Merge MAIL_* and SMTP_* envs; MAIL_* wins if both present.""" + enabled = _b(_first("MAIL_ENABLED"), default=False) + host = _first("MAIL_HOST", "SMTP_HOST") + port = int(_first("MAIL_PORT", "SMTP_PORT", default="0") or "0") + user = _first("MAIL_USER", "SMTP_USER") + pwd = _first("MAIL_PASSWORD", "SMTP_PASS") + mail_from = _first("MAIL_FROM", default=user) + mail_to = 
_first("MAIL_TO_DEFAULT", "MAIL_TO") + tls = _b(_first("MAIL_TLS"), default=False) + return { + "enabled": enabled, + "host": host, "port": port, "user": user, "pwd": pwd, + "from": mail_from, "to": mail_to, "tls": tls + } + +def send_email(subject: str, body_text: str, to_addr: str | None = None): + cfg = _cfg() + if not cfg["enabled"]: + print("[notify] MAIL_ENABLED is not true; skipping email") + return + + host, port, user, pwd = cfg["host"], cfg["port"], cfg["user"], cfg["pwd"] + mail_from = cfg["from"] + mail_to = to_addr or cfg["to"] + + if not (host and port and user and pwd and mail_to): + print("[notify] missing SMTP config; skipping email") + return + + msg = MIMEText(body_text, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = formataddr(("Intesa Logs Agent", mail_from)) + msg["To"] = mail_to + + with smtplib.SMTP(host, port, timeout=20) as s: + if cfg["tls"]: + try: + s.starttls() + except smtplib.SMTPException: + pass + s.login(user, pwd) + s.send_message(msg) + print(f"[notify] sent email to {mail_to}") diff --git a/api/requirements.txt b/api/requirements.txt index aea7bb5..b5407e3 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -3,7 +3,6 @@ langchain-core>=0.3.27,<0.4 langchain-community>=0.3,<0.4 langchain-openai>=0.2.12,<0.3 openai>=1.40 -faiss-cpu==1.8.* ujson>=5 pydantic>=2 python-dotenv>=1 diff --git a/blob-ingestor/BlobIngest/__init__.py b/blob-ingestor/BlobIngest/__init__.py new file mode 100644 index 0000000..e0058b5 --- /dev/null +++ b/blob-ingestor/BlobIngest/__init__.py @@ -0,0 +1,24 @@ +import os, json, logging, datetime as dt +import azure.functions as func +from azure.storage.blob import BlobServiceClient + +# Source storage (same one your trigger points to) +BLOB_CONN = os.environ["BLOB_CONN"] # set as App Setting +CONTAINER = "bank-logs" # container your trigger uses + +def main(blob: func.InputStream): + # 1) Log to App Insights (if configured) + logging.info("BlobIngest fired: name=%s len=%d", blob.name, blob.length) + + # 2) Write a marker blob so we can *see* it fired from Storage itself + svc = BlobServiceClient.from_connection_string(BLOB_CONN) + marker_name = f"intesa/_diag/processed-{dt.datetime.utcnow().strftime('%Y%m%dT%H%M%S')}.txt" + payload = { + "source": blob.name, + "length": blob.length, + "seen_at_utc": dt.datetime.utcnow().isoformat() + "Z" + } + svc.get_blob_client(CONTAINER, marker_name).upload_blob( + json.dumps(payload).encode("utf-8"), + overwrite=True + ) diff --git a/blob-ingestor/BlobIngest/__init__prod.py b/blob-ingestor/BlobIngest/__init__prod.py new file mode 100644 index 0000000..5c3656f --- /dev/null +++ b/blob-ingestor/BlobIngest/__init__prod.py @@ -0,0 +1,51 @@ +#THIS IS WHAT IS USED IN PROD, CURRENT MAIN INIT IS FOR TESTING ONLY +import os, json, gzip, logging, requests +import azure.functions as func + +AGENT_API_URL = os.getenv("AGENT_API_URL") # e.g. 
https://agent-api-app.azurewebsites.net + +def _to_events(data: bytes): + # gunzip if needed + if len(data) >= 2 and data[0] == 0x1F and data[1] == 0x8B: + data = gzip.decompress(data) + text = data.decode("utf-8", errors="replace").lstrip("\ufeff").strip() + if not text: + return [] + # try JSON (object or array) + try: + parsed = json.loads(text) + if isinstance(parsed, dict): + return [parsed] + if isinstance(parsed, list): + return [x for x in parsed if isinstance(x, dict)] + except Exception: + pass + # try JSONL + out = [] + for ln in text.splitlines(): + s = ln.strip() + if not s: + continue + try: + out.append(json.loads(s)) + except Exception: + pass + return out + +def main(inputblob: func.InputStream): + if not AGENT_API_URL: + logging.warning("AGENT_API_URL not set; skipping.") + return + + events = _to_events(inputblob.read()) + if not events: + logging.info("No JSON events parsed from blob %s", inputblob.name) + return + + try: + url = AGENT_API_URL.rstrip("/") + "/ingest" + r = requests.post(url, json={"events": events}, timeout=30) + logging.info("Agent ingest -> %s %s", r.status_code, r.text[:300]) + r.raise_for_status() + except Exception as e: + logging.exception("Ingest failed: %s", e) diff --git a/blob-ingestor/BlobIngest/function.json b/blob-ingestor/BlobIngest/function.json new file mode 100644 index 0000000..188a719 --- /dev/null +++ b/blob-ingestor/BlobIngest/function.json @@ -0,0 +1,11 @@ +{ + "bindings": [ + { + "type": "blobTrigger", + "direction": "in", + "name": "blob", + "path": "bank-logs/intesa/test-exact.json", + "connection": "BLOB_CONN" + } + ] +} diff --git a/blob-ingestor/Ping/__init__.py b/blob-ingestor/Ping/__init__.py new file mode 100644 index 0000000..e4212b7 --- /dev/null +++ b/blob-ingestor/Ping/__init__.py @@ -0,0 +1,4 @@ +import azure.functions as func + +def main(req: func.HttpRequest) -> func.HttpResponse: + return func.HttpResponse("pong", status_code=200) diff --git a/blob-ingestor/Ping/function.json b/blob-ingestor/Ping/function.json new file mode 100644 index 0000000..1a0866b --- /dev/null +++ b/blob-ingestor/Ping/function.json @@ -0,0 +1,17 @@ +{ + "bindings": [ + { + "type": "httpTrigger", + "direction": "in", + "authLevel": "anonymous", + "name": "req", + "methods": [ "get", "post" ], + "route": "ping" + }, + { + "type": "http", + "direction": "out", + "name": "$return" + } + ] +} diff --git a/blob-ingestor/host.json b/blob-ingestor/host.json new file mode 100644 index 0000000..2efe4df --- /dev/null +++ b/blob-ingestor/host.json @@ -0,0 +1,8 @@ +{ + "version": "2.0", + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + }, + "logging": { "applicationInsights": { "samplingSettings": { "isEnabled": true } } } +} diff --git a/blob-ingestor/requirements.txt b/blob-ingestor/requirements.txt new file mode 100644 index 0000000..52d35a2 --- /dev/null +++ b/blob-ingestor/requirements.txt @@ -0,0 +1,2 @@ +azure-functions>=1.20.0 +requests>=2.31.0 diff --git a/compose.yaml b/compose.yaml index 8aaaf59..d924d94 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,103 +1,36 @@ services: - splunk: - image: splunk/splunk:9.4.2 - container_name: splunk - restart: unless-stopped - ports: - - "8000:8000" # Splunk Web - - "8088:8088" # HEC - - "8089:8089" # Management API - environment: - SPLUNK_START_ARGS: --accept-license - SPLUNK_PASSWORD: ${SPLUNK_PASSWORD:-Str0ngP@ss!9} - SPLUNK_HEC_TOKEN: ${SPLUNK_HEC_TOKEN:-dev-0123456789abcdef} - volumes: - - splunk-etc:/opt/splunk/etc - - 
splunk-var:/opt/splunk/var - healthcheck: - test: ["CMD-SHELL", "curl -sk https://localhost:8089/services/server/info | grep -q version"] - interval: 10s - timeout: 5s - retries: 30 - - poller: - build: - context: . - dockerfile: poller/Dockerfile - container_name: splunk-poller - restart: unless-stopped - depends_on: - splunk: - condition: service_healthy - environment: - # --- Splunk connection (to containerized Splunk) --- - SPLUNK_HOST: splunk - SPLUNK_PORT: "8089" - SPLUNK_USER: admin - SPLUNK_PW: ${SPLUNK_PASSWORD:-Str0ngP@ss!9} - SPLUNK_VERIFY_SSL: "false" - # --- What to read --- - SPLUNK_INDEX: intesa_payments - SPLUNK_SOURCETYPE: intesa:bonifico - INITIAL_LOOKBACK: -24h@h - CREATE_INDEX_IF_MISSING: "true" - # --- Polling / chunking --- - SLEEP_SECONDS: "60" - MAX_CHUNK_BYTES: "1800000" - # --- Sink selection: file (local) | blob (azure) | blob+queue (azure) --- - SINK: blob+queue - OUTDIR: /app/out - # --- Azure Storage (Blob + Queue) --- - AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-} - AZURE_STORAGE_CONTAINER: ${AZURE_STORAGE_CONTAINER:-bank-logs} - AZURE_STORAGE_QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks} - AZURE_COMPRESS: "true" - # --- Email default for enqueued messages --- - POLLER_EMAIL_SEND_DEFAULT: "true" - volumes: - - chunks:/app/out - agent-api: build: context: . dockerfile: api/Dockerfile container_name: agent-api - restart: unless-stopped - depends_on: - - poller ports: - "8080:8080" env_file: - - .env # AOAI + Mailtrap, etc. + - .env # pulls your AZURE_OPENAI_*, SMTP_*, MAIL_*, etc. environment: - CHUNK_DIR: /app/out - TOP_K: "12" - # If the API should read blobs directly, ensure these also exist in .env: - # AZURE_STORAGE_CONNECTION_STRING=... - # AZURE_STORAGE_CONTAINER=bank-logs + CHUNK_DIR: /app/out # where the agent reads chunk files volumes: - chunks:/app/out + restart: unless-stopped - queue-worker: + file-poller: build: context: . 
- dockerfile: worker/Dockerfile - container_name: queue-worker - restart: unless-stopped + dockerfile: poller/Dockerfile + container_name: file-poller + environment: + INDIR: /app/in # folder the poller watches + OUTDIR: /app/out # folder it writes chunk_*.jsonl + SLEEP_SECONDS: 60 # scan every minute + EMAIL_SEND_DEFAULT: "true" # tell agent-api to email results + AGENT_API_URL: http://agent-api:8080 + volumes: + - ./data:/app/in:rw # drop your .jsonl files here (on Windows) + - chunks:/app/out depends_on: - agent-api - env_file: - - .env # to pick up AZURE_STORAGE_CONNECTION_STRING if you keep it here - environment: - AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-} - QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks} - ANALYZER_URL: http://agent-api:8080/analyze # inside compose network - POLL_INTERVAL_SEC: "60" - MAX_DEQUEUE: "1" - VISIBILITY_TIMEOUT: "120" - HTTP_TIMEOUT: "120" + restart: unless-stopped volumes: - splunk-etc: - splunk-var: chunks: diff --git a/data/_done/batch1.jsonl b/data/_done/batch1.jsonl new file mode 100644 index 0000000..25da5e5 --- /dev/null +++ b/data/_done/batch1.jsonl @@ -0,0 +1 @@ +{"event_type":"bonifico","transaction_id":"T1","step":"esito","status":"rejected","importo":12000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12A1234512345123451234512"} \ No newline at end of file diff --git a/data/_done/batch2.jsonl b/data/_done/batch2.jsonl new file mode 100644 index 0000000..77b9825 --- /dev/null +++ b/data/_done/batch2.jsonl @@ -0,0 +1 @@ +{"event_type":"bonifico","transaction_id":"T2","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match"} \ No newline at end of file diff --git a/data/_out/chunk_1759477104_538344ce.jsonl.gz b/data/_out/chunk_1759477104_538344ce.jsonl.gz new file mode 100644 index 0000000..7384f30 Binary files /dev/null and b/data/_out/chunk_1759477104_538344ce.jsonl.gz differ diff --git a/data/local/_sanity.jsonl b/data/local/_sanity.jsonl new file mode 100644 index 0000000..9ccf76c --- /dev/null +++ b/data/local/_sanity.jsonl @@ -0,0 +1 @@ +{"event_type":"bonifico","transaction_id":"T900","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12ZZ000000000000000000000","bic_swift":"TESTGB2L","instantaneo":true} diff --git a/data/local/sampl3.jsonl b/data/local/sampl3.jsonl new file mode 100644 index 0000000..513967a --- /dev/null +++ b/data/local/sampl3.jsonl @@ -0,0 +1,2 @@ +{"event_type":"bonifico","transaction_id":"W1","step":"esito","status":"accepted","importo":9500,"divisa":"EUR","vop_check":"match","data_pagamento":"2025-09-26T12:01:00Z"} +{"event_type":"bonifico","transaction_id":"W2","step":"esito","status":"rejected","importo":150000,"divisa":"EUR","vop_check":"no_match","data_pagamento":"2025-09-26T12:05:00Z"} diff --git a/data/local/sampl33.json b/data/local/sampl33.json new file mode 100644 index 0000000..513967a --- /dev/null +++ b/data/local/sampl33.json @@ -0,0 +1,2 @@ +{"event_type":"bonifico","transaction_id":"W1","step":"esito","status":"accepted","importo":9500,"divisa":"EUR","vop_check":"match","data_pagamento":"2025-09-26T12:01:00Z"} +{"event_type":"bonifico","transaction_id":"W2","step":"esito","status":"rejected","importo":150000,"divisa":"EUR","vop_check":"no_match","data_pagamento":"2025-09-26T12:05:00Z"} diff --git a/data/local/sampl_anom.jsonl b/data/local/sampl_anom.jsonl new file 
mode 100644 index 0000000..9c02451 --- /dev/null +++ b/data/local/sampl_anom.jsonl @@ -0,0 +1,2 @@ +{"event_type":"bonifico","transaction_id":"T100","step":"esito","status":"rejected","importo":12000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12ZZ000000000000000000000","bic_swift":"TESTGB2L","instantaneo":true} +{"event_type":"bonifico","transaction_id":"T101","step":"esito","status":"accepted","importo":4000,"divisa":"EUR","vop_check":"match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"DE89370400440532013000","bic_swift":"DEUTDEFF","instantaneo":false} diff --git a/export.json b/export.json new file mode 100644 index 0000000..7f81e3e --- /dev/null +++ b/export.json @@ -0,0 +1 @@ +{"event_type":"bonifico","transaction_id":"T777","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match","instantaneo":true} \ No newline at end of file diff --git a/flask_app.py b/flask_app.py deleted file mode 100644 index b1833cd..0000000 --- a/flask_app.py +++ /dev/null @@ -1,185 +0,0 @@ -# flask_app.py -import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt -from typing import Optional -from flask import Flask, request, jsonify -from dotenv import load_dotenv - -# Load .env locally (App Service uses App Settings instead) -load_dotenv(os.getenv("ENV_FILE", ".env")) - -# Agent + email -from agent_runner import build_agent -from notify import send_email - -# Azure SDKs (guarded imports so we don't crash at boot) -try: - from azure.storage.blob import BlobServiceClient, ContentSettings -except Exception: - BlobServiceClient = None - ContentSettings = None - -try: - from azure.storage.queue import QueueClient -except Exception: - QueueClient = None - -app = Flask(__name__) - -# -------- Helpers -------- -def _blob_client() -> BlobServiceClient: - if not BlobServiceClient: - raise RuntimeError("azure-storage-blob not installed") - cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") - if not cs: - raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") - return BlobServiceClient.from_connection_string(cs) - -def _queue_client() -> QueueClient: - if not QueueClient: - raise RuntimeError("azure-storage-queue not installed") - cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") - if not cs: - raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") - qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") - qc = QueueClient.from_connection_string(cs, qname) - try: - qc.create_queue() - except Exception: - pass - return qc - -def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str: - svc = _blob_client() - cc = svc.get_container_client(container) - try: - cc.create_container() - except Exception: - pass - ext = "jsonl.gz" if compressed else "jsonl" - # folder scheme matches poller - prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}" - blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}" - data = gzip.compress(raw_bytes) if compressed else raw_bytes - settings = ContentSettings( - content_type="application/json", - content_encoding=("gzip" if compressed else None), - ) - bc = cc.get_blob_client(blob_name) - bc.upload_blob(data, overwrite=True, content_settings=settings) - return blob_name - -def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str: - svc = _blob_client() - blob = svc.get_blob_client(container=container, blob=blob_name) - data = blob.download_blob().readall() - fname = 
os.path.basename(blob_name) - path = os.path.join(outdir, fname) - with open(path, "wb") as f: - f.write(data) - return path - -def _download_sas_to_dir(sas_url: str, outdir: str) -> str: - if not BlobServiceClient: - # ultra-light fallback - import urllib.request - data = urllib.request.urlopen(sas_url, timeout=30).read() - else: - from azure.storage.blob import BlobClient - blob = BlobClient.from_blob_url(sas_url) - data = blob.download_blob().readall() - name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl" - path = os.path.join(outdir, name) - open(path, "wb").write(data) - return path - -# -------- Routes -------- -@app.get("/health") -def health(): - return {"status": "ok"}, 200 - -@app.post("/analyze") -def analyze(): - """ - POST JSON: - { - "question": "...optional custom question...", - "email": {"send": true, "to": "override@example.com"}, - "blob": { - "container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]" - // OR - "sas_url": "https://.../chunk.jsonl.gz?sig=..." - } - } - """ - t0 = time.time() - payload = request.get_json(force=True, silent=True) or {} - question = payload.get("question") or ( - "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). " - "Give a brief summary and next steps." - ) - - prev_chunk_dir = os.getenv("CHUNK_DIR", "./out") - tmp_dir = None - try: - blob_req = payload.get("blob") - if blob_req: - tmp_dir = tempfile.mkdtemp(prefix="agent_blob_") - if blob_req.get("sas_url"): - _download_sas_to_dir(blob_req["sas_url"], tmp_dir) - elif blob_req.get("container") and blob_req.get("blob_name"): - _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir) - else: - return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400 - os.environ["CHUNK_DIR"] = tmp_dir - - agent = build_agent() - out = agent.invoke({"input": question, "chat_history": []}) - result = out.get("output", "") - - email_cfg = payload.get("email") or {} - if email_cfg.get("send"): - to_addr = email_cfg.get("to") - send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr) - - return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200 - - except Exception as e: - return jsonify({"ok": False, "error": str(e)}), 500 - finally: - os.environ["CHUNK_DIR"] = prev_chunk_dir - -# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC) -@app.post("/collect") -@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility -def collect_hec(): - try: - container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") - # Accept either single JSON object or a list; we will write one line per event - body = request.get_json(force=True, silent=True) - if body is None: - return jsonify({"ok": False, "error": "invalid JSON"}), 400 - - lines = [] - if isinstance(body, list): - for item in body: - lines.append(json.dumps(item, separators=(",", ":"))) - else: - lines.append(json.dumps(body, separators=(",", ":"))) - raw = ("\n".join(lines) + "\n").encode("utf-8") - - blob_name = _upload_chunk_blob(container, raw, compressed=True) - - # Enqueue a message your queue-worker understands - msg = { - "blob": {"container": container, "blob_name": blob_name}, - # flip to true if you want emails by default - "email": {"send": False} - } - - qc = _queue_client() - qc.send_message(json.dumps(msg, separators=(",", ":"))) - - return 
jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200 - - except Exception as e: - return jsonify({"ok": False, "error": str(e)}), 500 \ No newline at end of file diff --git a/notify.py b/notify.py deleted file mode 100644 index c44a0a3..0000000 --- a/notify.py +++ /dev/null @@ -1,38 +0,0 @@ -# notify.py -import os, smtplib -from email.mime.text import MIMEText -from email.utils import formataddr -from dotenv import load_dotenv - -# load .env automatically (ENV_FILE can override path) -load_dotenv(os.getenv("ENV_FILE", ".env")) - -def send_email(subject: str, body_text: str, to_addr: str | None = None): - if os.getenv("MAIL_ENABLED", "false").lower() != "true": - print("[notify] MAIL_ENABLED != true; skipping email") - return - - smtp_host = os.getenv("SMTP_HOST") - smtp_port = int(os.getenv("SMTP_PORT", "587")) - smtp_user = os.getenv("SMTP_USER") - smtp_pass = os.getenv("SMTP_PASS") - mail_from = os.getenv("MAIL_FROM") or smtp_user - mail_to = to_addr or os.getenv("MAIL_TO") - - if not (smtp_host and smtp_user and smtp_pass and mail_to): - print("[notify] missing SMTP config; skipping email") - return - - msg = MIMEText(body_text, "plain", "utf-8") - msg["Subject"] = subject - msg["From"] = formataddr(("Intesa Logs Agent", mail_from)) - msg["To"] = mail_to - - with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as s: - try: - s.starttls() - except smtplib.SMTPException: - pass - s.login(smtp_user, smtp_pass) - s.send_message(msg) - print(f"[notify] sent email to {mail_to}") diff --git a/poller/Dockerfile b/poller/Dockerfile index 85921ad..438ecc5 100644 --- a/poller/Dockerfile +++ b/poller/Dockerfile @@ -1,18 +1,21 @@ -# poller/Dockerfile +# poller/Dockerfile (local file poller, no Splunk) FROM python:3.12-slim WORKDIR /app -# Helpful system deps RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl \ && rm -rf /var/lib/apt/lists/* -COPY poller/requirements.txt . +# minimal deps +COPY poller/requirements.txt ./requirements.txt RUN python -m pip install --upgrade pip setuptools wheel \ && pip install --no-cache-dir -r requirements.txt -# Copy the poller script from repo root -COPY splunk_poller.py . +COPY poller/file_poller.py . -# default to root to avoid permission issues on named volumes ENV PYTHONUNBUFFERED=1 -CMD ["python", "-u", "splunk_poller.py"] +# default inputs folder (mounted by compose) and local API endpoint +ENV IN_DIR=/app/in +ENV TARGET_URL=http://agent-api:8080/ingest +ENV BATCH_MAX=1000 +ENV SLEEP_SEC=2.0 +CMD ["python", "-u", "file_poller.py"] diff --git a/poller/file_poller.py b/poller/file_poller.py new file mode 100644 index 0000000..66e251b --- /dev/null +++ b/poller/file_poller.py @@ -0,0 +1,97 @@ +# poller/file_poller.py +import os, time, json, uuid, shutil, logging, pathlib, requests +from typing import List + +logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s") + +INDIR = pathlib.Path(os.getenv("INDIR", "/app/in")) +OUTDIR = pathlib.Path(os.getenv("OUTDIR", "/app/out")) # shared with agent-api +DONE = INDIR / "_done" +SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60")) + +# Where the agent API lives (container name in compose or localhost outside) +API_BASE = os.getenv("AGENT_API_URL", "http://agent-api:8080").rstrip("/") + +# Make the poller ask for emails automatically +EMAIL_SEND_DEFAULT = os.getenv("EMAIL_SEND_DEFAULT", "false").lower() in {"1", "true", "yes"} + +QUESTION = os.getenv( + "QUESTION", + "Scan the latest chunks. 
+    "Give a brief summary and next steps."
+)
+
+def ensure_dirs():
+    INDIR.mkdir(parents=True, exist_ok=True)
+    DONE.mkdir(parents=True, exist_ok=True)
+    OUTDIR.mkdir(parents=True, exist_ok=True)
+
+def _copy_to_out(src: pathlib.Path) -> pathlib.Path:
+    ts = int(time.time())
+    dst = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
+    shutil.copy2(src, dst)
+    return dst
+
+def _call_analyze(email_send: bool) -> bool:
+    """Tell the agent to analyze latest chunks (it reads CHUNK_DIR=/app/out)."""
+    url = f"{API_BASE}/analyze"
+    payload = {
+        "question": QUESTION,
+        "email": {"send": bool(email_send)}
+    }
+    try:
+        r = requests.post(url, json=payload, timeout=180)
+        if r.status_code // 100 == 2:
+            logging.info("analyze OK: %s", r.text[:400])
+            return True
+        logging.error("analyze HTTP %s: %s", r.status_code, r.text[:400])
+        return False
+    except Exception as e:
+        logging.error("analyze failed: %s", e)
+        return False
+
+def _process_file(f: pathlib.Path) -> bool:
+    """
+    Copy the file into OUTDIR as a chunk, call /analyze, move file to _done.
+    Return True if fully handled (so we move to _done), False to retry later.
+    """
+    logging.info("processing %s", f.name)
+    try:
+        # 1) copy to /app/out where agent looks for chunk_*.jsonl
+        dst = _copy_to_out(f)
+        logging.info("placed chunk %s", dst.name)
+
+        # 2) trigger analysis (and email if enabled)
+        ok = _call_analyze(EMAIL_SEND_DEFAULT)
+        if not ok:
+            logging.info("will retry %s later", f.name)
+            return False
+
+        # 3) move original to _done
+        shutil.move(str(f), str(DONE / f.name))
+        logging.info("done %s", f.name)
+        return True
+    except Exception as e:
+        logging.error("error processing %s: %s", f.name, e)
+        logging.info("will retry %s later", f.name)
+        return False
+
+def list_inputs() -> List[pathlib.Path]:
+    return sorted([p for p in INDIR.glob("*.jsonl") if p.is_file()])
+
+def main():
+    ensure_dirs()
+    logging.info("file-poller watching %s -> %s target=analyze email_default=%s",
+                 INDIR, f"{API_BASE}/analyze", EMAIL_SEND_DEFAULT)
+    while True:
+        files = list_inputs()
+        if not files:
+            time.sleep(SLEEP_SECONDS)
+            continue
+        for f in files:
+            _process_file(f)
+        # brief backoff between batches
+        time.sleep(2)
+
+if __name__ == "__main__":
+    main()
diff --git a/poller/requirements.txt b/poller/requirements.txt
index b3497a8..f229360 100644
--- a/poller/requirements.txt
+++ b/poller/requirements.txt
@@ -1,6 +1 @@
-splunk-sdk==2.0.2
-langchain-core==0.2.*
-azure-storage-blob>=12.19.0
-azure-storage-queue>=12.9.0
-ujson
 requests
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 2dfe408..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-langchain>=0.3,<0.4
-langchain-core>=0.3.27,<0.4
-langchain-community>=0.3,<0.4
-langchain-openai>=0.2.12,<0.3
-openai>=1.40
-faiss-cpu==1.8.*
-ujson>=5
-pydantic>=2
-python-dotenv>=1
-flask>=3
-gunicorn>=21
-azure-storage-blob>=12
-requests
-azure-storage-queue==12.9.0
diff --git a/sampleLogs.txt b/sampleLogs.txt
deleted file mode 100644
index 7710a39..0000000
--- a/sampleLogs.txt
+++ /dev/null
@@ -1,81 +0,0 @@
-#Cli preset parameters
-#source .venv/bin/activate
-HEC_URL="https://localhost:8088/services/collector/event"
-HEC_TOKEN="dev-0123456789abcdef"
-INDEX="intesa_payments"
-SOURCETYPE="intesa:bonifico"
-
-#Cli script for generating logs
-gen_iban(){ local d=""; for _ in $(seq 1 25); do d="${d}$((RANDOM%10))"; done; echo "IT${d}"; }
-mask_iban(){ local i="$1"; local pre="${i:0:6}"; local suf="${i: -4}"; local n=$(( ${#i}-10 )); printf "%s%0.s*" "$pre" $(seq 1 $n); echo -n "$suf"; }
-4}"; local n=$(( ${#i}-10 )); printf "%s%0.s*" "$pre" $(seq 1 $n); echo -n "$suf"; } -rand_amount(){ awk 'BEGIN{srand(); printf "%.2f", 5+rand()*14995}'; } -rand_bool_str(){ if ((RANDOM%2)); then echo "true"; else echo "false"; fi; } -pick(){ local a=("$@"); echo "${a[$RANDOM%${#a[@]}]}"; } - -spese=(SHA OUR BEN) -divise=(EUR EUR EUR EUR USD GBP) -statuses=(accepted pending rejected) - -for tx in {1..20}; do - txid=$(cat /proc/sys/kernel/random/uuid 2>/dev/null || uuidgen 2>/dev/null || openssl rand -hex 16) - t0=$(date -u +%s); t1=$((t0+1)); t2=$((t1+2)) - iso0=$(date -u -d @$t0 +%FT%T.%6NZ) - iso1=$(date -u -d @$t1 +%FT%T.%6NZ) - iso2=$(date -u -d @$t2 +%FT%T.%6NZ) - - src=$(gen_iban); dst=$(gen_iban) - srcm=$(mask_iban "$src"); dstm=$(mask_iban "$dst") - amt=$(rand_amount) - dv=$(pick "${divise[@]}") - inst=$(rand_bool_str) - sp=$(pick "${spese[@]}") - final=$(pick "${statuses[@]}") - - send() { - local when="$1" iso="$2" step="$3" status="$4" - curl -sk "$HEC_URL" \ - -H "Authorization: Splunk $HEC_TOKEN" -H "Content-Type: application/json" \ - -d @- </dev/null 2>&1 -done - diff --git a/splunk_poller.py b/splunk_poller.py deleted file mode 100644 index 86f63ad..0000000 --- a/splunk_poller.py +++ /dev/null @@ -1,260 +0,0 @@ -# splunk_poller.py -import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys -import splunklib.client as client -from splunklib.results import JSONResultsReader - -try: - from langchain_core.documents import Document -except ImportError: - from langchain.schema import Document - -STOP = False -def _handle_stop(signum, frame): - global STOP - STOP = True -signal.signal(signal.SIGINT, _handle_stop) -signal.signal(signal.SIGTERM, _handle_stop) - -# ---------- Splunk config ---------- -SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost") -SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089")) -SPLUNK_USER = os.getenv("SPLUNK_USER", "admin") -SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9") -SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1","true","yes"} -INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments") -SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico") -INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h") -CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1","true","yes"} - -# ---------- Polling / chunking ---------- -SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60")) -MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000))) - -# ---------- Sinks ---------- -# Supported: file | blob | blob+queue -SINK = os.getenv("SINK", "file").lower() -OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out")) -CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt")) -AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1","true","yes"} - -# Azure Blob -AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING") -AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") - -# Azure Storage Queue -AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") - -# Email toggle for messages produced by the poller (default: True) -EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1","true","yes"} - -if SINK.startswith("file"): - OUTDIR.mkdir(parents=True, exist_ok=True) - -# ---------- Azure clients (lazy) ---------- -_blob_service = None -_container_client = None -_queue_client = None - -def _init_blob(): - global _blob_service, _container_client - if _blob_service: - return - from azure.storage.blob import BlobServiceClient - 
-    _container_client = _blob_service.get_container_client(AZ_CONTAINER)
-    try:
-        _container_client.create_container()
-    except Exception:
-        pass
-
-def _init_queue():
-    global _queue_client
-    if _queue_client:
-        return
-    from azure.storage.queue import QueueClient
-    _queue_client = QueueClient.from_connection_string(
-        conn_str=AZ_CS, queue_name=AZ_QUEUE
-    )
-    try:
-        _queue_client.create_queue() # idempotent
-    except Exception:
-        pass
-
-# ---------- Checkpoint helpers ----------
-def read_ckpt() -> str | None:
-    if not CKPT_FILE.exists(): return None
-    val = CKPT_FILE.read_text().strip()
-    return val or None
-
-def write_ckpt(val: str) -> None:
-    CKPT_FILE.write_text(val)
-
-def to_epoch_seconds(v) -> int | None:
-    if v is None: return None
-    try:
-        return int(float(v))
-    except Exception:
-        pass
-    try:
-        s = str(v).replace("Z", "+00:00")
-        return int(dt.datetime.fromisoformat(s).timestamp())
-    except Exception:
-        return None
-
-# ---------- Splunk helpers ----------
-def ensure_index(service, name: str):
-    # idempotent: create if missing
-    for idx in service.indexes:
-        if idx.name == name:
-            return
-    service.indexes.create(name)
-
-def build_search(ckpt_epoch: int | None) -> str:
-    q = f'''
-search index={INDEX} sourcetype="{SOURCETYPE}"
-| fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status
-'''.strip()
-    if ckpt_epoch is not None:
-        q += f"\n| where _indextime > {ckpt_epoch}"
-    q += "\n| sort + _indextime"
-    return q
-
-def fetch(service, ckpt_epoch: int | None):
-    job = service.jobs.create(
-        build_search(ckpt_epoch),
-        exec_mode="normal",
-        earliest_time=INITIAL_LOOKBACK,
-        latest_time="now",
-        output_mode="json",
-    )
-    while not job.is_done():
-        if STOP: break
-        time.sleep(0.5)
-    rr = JSONResultsReader(job.results(output_mode="json"))
-    rows = [dict(r) for r in rr if isinstance(r, dict)]
-    job.cancel()
-    return rows
-
-# ---------- Chunking ----------
-def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES):
-    buf, size = [], 0
-    for item in items:
-        b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8")
-        if size + len(b) > max_bytes and buf:
-            yield b"".join(buf)
-            buf, size = [b], len(b)
-        else:
-            buf.append(b); size += len(b)
-    if buf: yield b"".join(buf)
-
-# ---------- Sinks ----------
-def write_chunk_file(blob: bytes) -> pathlib.Path:
-    ts = int(time.time())
-    name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
-    name.write_bytes(blob)
-    return name
-
-def upload_chunk_blob(blob: bytes):
-    _init_blob()
-    from azure.storage.blob import ContentSettings
-    ts = int(time.time())
-    ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl"
-    # timezone-aware UTC
-    now_utc = dt.datetime.now(dt.timezone.utc)
-    blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}"
-    data = gzip.compress(blob) if AZURE_COMPRESS else blob
-    content_settings = ContentSettings(
-        content_type="application/json",
-        content_encoding=("gzip" if AZURE_COMPRESS else None)
-    )
-    bc = _container_client.get_blob_client(blob_name)
-    bc.upload_blob(data, overwrite=True, content_settings=content_settings)
-    return {
-        "blob_name": blob_name,
-        "url": bc.url,
-        "size_bytes": len(data),
-        "compressed": AZURE_COMPRESS,
-    }
-
-def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True):
-    _init_queue()
-    payload = {
-        "blob": {"container": container, "blob_name": blob_name},
"blob_name": blob_name}, - "email": {"send": bool(send_email)} - } - _queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False)) - print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True) - -# ---------- Main ---------- -def main(): - print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})") - service = client.connect( - host=SPLUNK_HOST, - port=SPLUNK_PORT, - scheme="https", - username=SPLUNK_USER, - password=SPLUNK_PW, - verify=SPLUNK_VERIFY_SSL, - ) - - if CREATE_INDEX_IF_MISSING: - try: - ensure_index(service, INDEX) - print(f"[poller] ensured index exists: {INDEX}") - except Exception as e: - print(f"[poller] warn: ensure_index failed: {e}", flush=True) - - ckpt_val = read_ckpt() - ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None - - while not STOP: - rows = fetch(service, ckpt_epoch) - if not rows: - print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True) - for _ in range(SLEEP_SECONDS): - if STOP: break - time.sleep(1) - continue - - max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0 - if max_index_time: - ckpt_epoch = max(ckpt_epoch or 0, max_index_time) - write_ckpt(str(ckpt_epoch)) - - for _, blob in enumerate(chunks_by_bytes(rows)): - # (Document kept for potential future LC usage) - _ = Document( - page_content=blob.decode("utf-8", errors="ignore"), - metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)}, - ) - - if SINK == "file": - fpath = write_chunk_file(blob) - print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True) - - elif SINK == "blob": - if not AZ_CS: - raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads") - meta = upload_chunk_blob(blob) - print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True) - - elif SINK == "blob+queue": - if not AZ_CS: - raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue") - meta = upload_chunk_blob(blob) - print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True) - enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT) - - else: - raise ValueError(f"Unknown SINK={SINK}") - - # brief pause - for _ in range(5): - if STOP: break - time.sleep(1) - - print("[poller] stopping gracefully") - sys.exit(0) - -if __name__ == "__main__": - main()