From 91426c38194b100d2e44f23379c1c62465927aa1 Mon Sep 17 00:00:00 2001 From: Torped Date: Sun, 28 Sep 2025 11:49:44 +0200 Subject: [PATCH] Initial commit of all project files related to splunk (5 containers total) --- .env.example | 20 +++ README.md | 44 ++++++ REPOLAYOUT.md | 11 ++ agent_runner.py | 331 ++++++++++++++++++++++++++++++++++++++++ api/Dockerfile | 15 ++ api/flask_app.py | 185 ++++++++++++++++++++++ api/requirements.txt | 13 ++ compose.yaml | 103 +++++++++++++ flask_app.py | 185 ++++++++++++++++++++++ notify.py | 38 +++++ poller/Dockerfile | 18 +++ poller/requirements.txt | 6 + requirements.txt | 14 ++ sampleLogs.txt | 81 ++++++++++ splunk_poller.py | 260 +++++++++++++++++++++++++++++++ worker/Dockerfile | 16 ++ worker/queue_worker.py | 103 +++++++++++++ worker/requirements.txt | 2 + 18 files changed, 1445 insertions(+) create mode 100644 .env.example create mode 100644 README.md create mode 100644 REPOLAYOUT.md create mode 100644 agent_runner.py create mode 100644 api/Dockerfile create mode 100644 api/flask_app.py create mode 100644 api/requirements.txt create mode 100644 compose.yaml create mode 100644 flask_app.py create mode 100644 notify.py create mode 100644 poller/Dockerfile create mode 100644 poller/requirements.txt create mode 100644 requirements.txt create mode 100644 sampleLogs.txt create mode 100644 splunk_poller.py create mode 100644 worker/Dockerfile create mode 100644 worker/queue_worker.py create mode 100644 worker/requirements.txt diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..4d9ffa1 --- /dev/null +++ b/.env.example @@ -0,0 +1,20 @@ +# ====== LLM / Analyzer (works locally; fill with your own if needed) ====== +# Leave these blank if you want the built-in “dummy” behavior or your code handles missing keys gracefully. +AZURE_OPENAI_ENDPOINT= +AZURE_OPENAI_API_KEY= +AZURE_OPENAI_API_VERSION=2025-01-01-preview +AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini + +# ====== Email (Mailtrap sandbox) — optional ====== +MAIL_ENABLED=true +MAIL_FROM=alerts@intesa-pipeline.local +MAIL_TO=you@company.com +SMTP_HOST=sandbox.smtp.mailtrap.io +SMTP_PORT=2525 +SMTP_USER=YOUR_MAILTRAP_USER +SMTP_PASS=YOUR_MAILTRAP_PASS + +# ====== Azure placeholders (intentionally empty for now) ====== +AZURE_STORAGE_CONNECTION_STRING= +AZURE_STORAGE_CONTAINER= +AZURE_STORAGE_QUEUE_NAME= diff --git a/README.md b/README.md new file mode 100644 index 0000000..661b791 --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# Intesa Logs – Local Docker Setup (Azure bits left empty) + +This repo runs a local pipeline that mimics production **end-to-end**, but **without any active Azure dependencies**. +All “Azure things” are left as **placeholders** so this same repo can later be deployed to Azure. + +## What runs locally + +1. **Splunk** (container) – receives events via HEC. +2. **Poller** (`splunk_poller.py`) – queries Splunk and writes newline-delimited JSON **chunks** to a shared volume. +3. **Agent API** (`flask_app.py`) – reads chunks and produces a concise compliance/ops report (optionally emails it via Mailtrap). + +> Local mode uses `SINK=file` and a shared Docker volume. **No Azure Storage or Queues** are used in this mode. 
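+
+To prepare a local `.env`, copying the checked-in example is usually enough: the Azure values are intentionally blank, and only the Mailtrap credentials need filling in if `MAIL_ENABLED=true` (set it to `false` to skip email entirely).
+
+```bash
+cp .env.example .env   # then edit SMTP_USER / SMTP_PASS, or set MAIL_ENABLED=false
+```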
+ +--- + +## Quick start (TL;DR) + +```bash +# 1) Create a .env (see sample below) +# 2) Make sure compose.yaml has SINK=file for the poller +# 3) Start the stack +docker compose up -d + +# 4) Check health +curl -sS http://localhost:8080/health + +# 5) Send test events to Splunk HEC +for i in {1..200}; do + curl -k https://localhost:8088/services/collector/event \ + -H "Authorization: Splunk dev-0123456789abcdef" \ + -H "Content-Type: application/json" \ + -d '{"event":{"event_type":"bonifico","step":"esito","status":"accepted","importo": '"$((RANDOM%5000+50))"',"divisa":"EUR","transaction_id":"TX-'$RANDOM'"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' >/dev/null 2>&1 +done + +# 6) Add a couple of anomalies to exercise the analyzer +curl -k https://localhost:8088/services/collector/event \ + -H "Authorization: Splunk dev-0123456789abcdef" \ + -H "Content-Type: application/json" \ + -d '{"event":{"event_type":"bonifico","step":"esito","status":"rejected","importo":12500,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT1998*2*4*6*8*10*12*14*16*9375","iban_dest_masked":"IT1171*2*4*6*8*10*12*14*16*0000","bic_swift":"TESTBICX"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' + +# 7) Ask the Agent API to analyze the latest local chunks +curl -sS -X POST http://localhost:8080/analyze \ + -H 'Content-Type: application/json' \ + -d '{"question":"Scan latest chunks. Flag rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC.","email":{"send":false}}' | jq . diff --git a/REPOLAYOUT.md b/REPOLAYOUT.md new file mode 100644 index 0000000..082a9de --- /dev/null +++ b/REPOLAYOUT.md @@ -0,0 +1,11 @@ +. +├─ api/ # Dockerfile for agent-api +├─ poller/ # Dockerfile for Splunk poller +├─ worker/ # Dockerfile for queue-worker (Azure mode only; not used locally) +├─ agent_runner.py # Analyzer orchestration +├─ flask_app.py # Flask API: /health, /analyze +├─ notify.py # SMTP (Mailtrap) email helper +├─ compose.yaml # Docker Compose stack +├─ requirements.txt +├─ sampleLogs.txt # misc sample content +└─ splunk_poller.py # Polls Splunk & writes chunk files diff --git a/agent_runner.py b/agent_runner.py new file mode 100644 index 0000000..cb45054 --- /dev/null +++ b/agent_runner.py @@ -0,0 +1,331 @@ +import os, sys, glob, json, ujson, gzip, pathlib, re +from typing import List, Dict, Any + +from dotenv import load_dotenv +from notify import send_email +from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings +from langchain_community.vectorstores import FAISS +from langchain_core.documents import Document +from langchain.tools import Tool +from langchain.agents import AgentExecutor, create_tool_calling_agent +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder + +# ----- load .env (defaults to ./.env; override with ENV_FILE=/path/to/.env) ----- +load_dotenv(os.getenv("ENV_FILE", ".env")) + +# ----- read env (supports both AZURE_* and AOAI_*) ----- +def _norm_endpoint(ep: str | None) -> str: + if not ep: return "" + ep = ep.strip().rstrip("/") + # strip any trailing /openai[/v...] 
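+    # e.g. a hypothetical endpoint "https://myres.openai.azure.com/openai/v1"
+    # (or the same URL ending in just "/openai") normalizes to
+    # "https://myres.openai.azure.com/", the bare resource URL that
+    # AzureChatOpenAI / AzureOpenAIEmbeddings expect as azure_endpoint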
+ ep = re.sub(r"/openai(?:/v\d+(?:\.\d+)?(?:-\w+)?)?$", "", ep) + return ep + "/" + +AZ_ENDPOINT = _norm_endpoint( + os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT") +) +AZ_API_KEY = ( + os.getenv("AZURE_OPENAI_API_KEY") + or os.getenv("AOAI_API_KEY") + or os.getenv("OPENAI_API_KEY") +) +AZ_API_VERSION = ( + os.getenv("AZURE_OPENAI_API_VERSION") + or os.getenv("AOAI_API_VERSION") + or "2025-01-01-preview" +) +AZ_CHAT_DEPLOY = ( + os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") + or os.getenv("AOAI_CHAT_DEPLOYMENT") + or "gpt-4o-mini" +) +AZ_EMBED_DEPLOY = ( + os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT") + or os.getenv("AOAI_EMBED_DEPLOYMENT") + or "" +) + +# ----- local data config ----- +CHUNK_DIR = os.getenv("CHUNK_DIR", "./out") +BLOB_DIR = os.getenv("BLOB_DIR", "") +TOP_K = int(os.getenv("TOP_K", "12")) + +# ---------- Helpers to build LLM/Embeddings for Azure OpenAI ---------- +def make_llm(temperature: float = 0.2) -> AzureChatOpenAI: + if not AZ_ENDPOINT or not AZ_API_KEY: + raise RuntimeError("Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents).") + return AzureChatOpenAI( + azure_endpoint=AZ_ENDPOINT, + api_key=AZ_API_KEY, + api_version=AZ_API_VERSION, + azure_deployment=AZ_CHAT_DEPLOY, + temperature=temperature, + ) + +def make_embeddings() -> AzureOpenAIEmbeddings | None: + if not AZ_EMBED_DEPLOY: + return None + return AzureOpenAIEmbeddings( + azure_endpoint=AZ_ENDPOINT, + api_key=AZ_API_KEY, + api_version=AZ_API_VERSION, + azure_deployment=AZ_EMBED_DEPLOY, + ) + +# ---------- Load JSONL chunk files ---------- +def _iter_chunk_files() -> List[pathlib.Path]: + paths: List[pathlib.Path] = [] + if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists(): + paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")] + if BLOB_DIR and pathlib.Path(BLOB_DIR).exists(): + paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)] + return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True) + +def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]: + data = path.read_bytes() + if path.suffix == ".gz": + data = gzip.decompress(data) + out: List[Dict[str, Any]] = [] + for ln in data.splitlines(): + if not ln.strip(): continue + try: + out.append(ujson.loads(ln)) + except Exception: + continue + return out + +# Accept either raw events or HEC-shaped {"event": {...}} +def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]: + return rec.get("event", rec) + +def _evt_to_text(evt: Dict[str, Any]) -> str: + keys = ["event_type","transaction_id","step","status","importo","divisa","istantaneo", + "spese_commissioni","causale","data_pagamento","iban_origin_masked","iban_dest_masked", + "vop_check","vop_score","bic_swift","latency_ms","device","os","browser","geo"] + parts = [f"{k}={evt[k]}" for k in keys if evt.get(k) is not None] + return "bonifico | " + " | ".join(parts) + +# ---------- Build vector store (only if embeddings deployment exists) ---------- +def build_vectorstore(limit_files: int = 20): + embs = make_embeddings() + if embs is None: + raise RuntimeError("No embeddings deployment set. 
Export AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT.") + files = _iter_chunk_files()[:limit_files] + if not files: + raise RuntimeError("No chunk files found; set CHUNK_DIR or BLOB_DIR.") + docs, meta_index = [], [] + for fp in files: + rows = _read_jsonl(fp) + for rec in rows: + evt = _normalize_event(rec) + docs.append(Document( + page_content=_evt_to_text(evt), + metadata={"file": fp.name, **{k: evt.get(k) for k in ("transaction_id","step","status")}} + )) + meta_index.append(evt) + vs = FAISS.from_documents(docs, embs) + return vs, meta_index + +# ---------- Tools ---------- +def stats_tool_impl(query: str = "") -> str: + """ + Filters supported in `query` (space-separated): + status: + step: + divisa: + instant: + vop: + min_amount: + iban_country:<2-letter e.g., IT> + Examples: + 'status:rejected min_amount:10000' + 'vop:no_match step:esito' + 'divisa:EUR instant:true' + """ + # load recent events into memory + files = _iter_chunk_files()[:20] + events = [] + for fp in files: + for rec in _read_jsonl(fp): + events.append(_normalize_event(rec)) + + # parse filters + q = query.lower() + def _kv(key, pat=r"([^\s]+)"): + m = re.search(fr"{key}:{pat}", q) + return m.group(1) if m else None + + status_f = _kv("status") + step_f = _kv("step") + div_f = _kv("divisa") + vop_f = _kv("vop") + country = _kv("iban_country") + instant_s = _kv("instant") + min_amt_s = _kv("min_amount") + min_amt = float(min_amt_s) if min_amt_s else 0.0 + inst_f = None + if instant_s in {"true","false"}: + inst_f = (instant_s == "true") + + def _boolish(x): + if isinstance(x, bool): return x + if isinstance(x, str): return x.lower() in {"true","1","yes"} + return False + + def keep(e): + try: amt = float(e.get("importo", 0) or 0) + except: amt = 0.0 + if amt < min_amt: return False + if status_f and (str(e.get("status","")).lower() != status_f): return False + if step_f and (str(e.get("step","")).lower() != step_f): return False + if div_f and (str(e.get("divisa","")).upper() != div_f.upper()): return False + if vop_f: + v = str(e.get("vop_check","")).lower() + if v != vop_f: return False + if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f: + return False + if country: + # heuristic from IBAN (dest or origin) + iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper() + if not iban.startswith(country.upper()): + return False + return True + + filtered = [e for e in events if keep(e)] + + total = len(filtered) + rej = sum(1 for e in filtered if str(e.get("status","")).lower()=="rejected") + amt_sum = 0.0; hi = 0.0; hi_tx = None + for e in filtered: + try: amt = float(e.get("importo", 0) or 0) + except: amt = 0.0 + amt_sum += amt + if amt > hi: + hi, hi_tx = amt, e.get("transaction_id") + return f"events={total}, rejected={rej}, rejection_rate={round(rej/max(total,1),3)}, total_amount={round(amt_sum,2)}, max_amount={hi} (tx={hi_tx})" + +def retrieve_tool_impl(question: str) -> str: + vs, _ = build_vectorstore() + docs = vs.similarity_search(question, k=TOP_K) + return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs)) + +def raw_sample_tool_impl(arg: str = "") -> str: + """ + Return a few raw JSON events from the newest chunks. + Accepts the same filters as get_stats PLUS optional 'n:' to control how many. 
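+    If 'n:' is omitted, 5 events are returned.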
+ Examples: + 'n:5 status:rejected min_amount:10000' + 'divisa:EUR instant:true step:esito n:3' + """ + q = (arg or "").lower() + + # helpers (same parsing as get_stats) + def _kv(key, pat=r"([^\s]+)"): + m = re.search(fr"{key}:{pat}", q) + return m.group(1) if m else None + + n_s = _kv("n", r"(\d+)") + n = int(n_s) if n_s else 5 + status_f = _kv("status") + step_f = _kv("step") + div_f = _kv("divisa") + vop_f = _kv("vop") + country = _kv("iban_country") + instant_s = _kv("instant") + min_amt_s = _kv("min_amount") + min_amt = float(min_amt_s) if min_amt_s else 0.0 + + inst_f = None + if instant_s in {"true","false"}: + inst_f = (instant_s == "true") + + def _boolish(x): + if isinstance(x, bool): return x + if isinstance(x, str): return x.lower() in {"true","1","yes"} + return False + + def keep(e): + try: amt = float(e.get("importo", 0) or 0) + except: amt = 0.0 + if amt < min_amt: return False + if status_f and (str(e.get("status","")).lower() != status_f): return False + if step_f and (str(e.get("step","")).lower() != step_f): return False + if div_f and (str(e.get("divisa","")).upper() != div_f.upper()): return False + if vop_f: + v = str(e.get("vop_check","")).lower() + if v != vop_f: return False + if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f: + return False + if country: + iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper() + if not iban.startswith(country.upper()): + return False + return True + + # load newest events and filter + files = _iter_chunk_files() + out = [] + for fp in files: + for rec in _read_jsonl(fp): + evt = _normalize_event(rec) + if keep(evt): + out.append(json.dumps(evt, ensure_ascii=False)) + if len(out) >= n: + break + if len(out) >= n: + break + + if not out: + return "(no matching events)" + return "\n".join(out) + + +# ---------- Build the agent ---------- +def build_agent(): + llm = make_llm(temperature=0.2) + tools = [ + Tool(name="get_stats", func=stats_tool_impl, + description="Quick stats over recent events. Example: 'status:rejected min_amount:10000 step:esito'."), + Tool(name="raw_samples", func=raw_sample_tool_impl, + description="Return a few raw JSON events. Accepts filters like get_stats and 'n:'. Example: 'n:5 status:rejected min_amount:10000'.") + ] + if AZ_EMBED_DEPLOY: + tools.append(Tool(name="retrieve_similar", func=retrieve_tool_impl, + description="Semantic search over logs. Ask a question about bonifico logs.")) + + system = """You are a payments log analyst. Use the tools to inspect recent Splunk-derived logs for 'bonifico' events. +- Prefer 'get_stats' for quick metrics (rejection rate, totals). +- Use 'retrieve_similar' (if available) to pull relevant examples before concluding. +- When asked for anomalies, treat as suspicious: rejected EUR >= 10,000, 'vop_no_match', invalid IBAN/BIC, unusual spikes. +Return a short, structured report with: Findings, Evidence, and Recommended actions.""" + + prompt = ChatPromptTemplate.from_messages([ + ("system", system), + MessagesPlaceholder("chat_history"), + ("human", "{input}"), + MessagesPlaceholder("agent_scratchpad"), + ]) + agent = create_tool_calling_agent(llm, tools, prompt) + return AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True) + +def run_default_question(question_override: str | None = None): + agent = build_agent() + question = question_override or ( + "Scan the latest chunks. List any anomalies " + "(rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). 
" + "Give a brief summary and next steps." + ) + out = agent.invoke({"input": question, "chat_history": []}) + result = out.get("output", "") + print("\n=== AGENT OUTPUT ===\n", result) + + # Email the result if MAIL_ENABLED=true (handled inside notify.py) + try: + send_email(subject="[Intesa Logs] Agent Report", body_text=result) + except Exception as e: + print("[notify] email failed:", e) + +if __name__ == "__main__": + # optional CLI: allow a custom question + custom = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None + run_default_question(custom if custom else None) diff --git a/api/Dockerfile b/api/Dockerfile new file mode 100644 index 0000000..3ec79b9 --- /dev/null +++ b/api/Dockerfile @@ -0,0 +1,15 @@ +# api/Dockerfile +FROM python:3.12-slim +WORKDIR /app + +COPY api/requirements.txt . +RUN python -m pip install --upgrade pip setuptools wheel \ + && pip install --no-cache-dir -r requirements.txt + +# Bring in your app files from repo root +COPY agent_runner.py flask_app.py notify.py . + +# The agent loads .env if present; we'll mount it via env_file in compose +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 +CMD ["gunicorn", "-w", "2", "-b", "0.0.0.0:8080", "flask_app:app"] diff --git a/api/flask_app.py b/api/flask_app.py new file mode 100644 index 0000000..b1833cd --- /dev/null +++ b/api/flask_app.py @@ -0,0 +1,185 @@ +# flask_app.py +import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt +from typing import Optional +from flask import Flask, request, jsonify +from dotenv import load_dotenv + +# Load .env locally (App Service uses App Settings instead) +load_dotenv(os.getenv("ENV_FILE", ".env")) + +# Agent + email +from agent_runner import build_agent +from notify import send_email + +# Azure SDKs (guarded imports so we don't crash at boot) +try: + from azure.storage.blob import BlobServiceClient, ContentSettings +except Exception: + BlobServiceClient = None + ContentSettings = None + +try: + from azure.storage.queue import QueueClient +except Exception: + QueueClient = None + +app = Flask(__name__) + +# -------- Helpers -------- +def _blob_client() -> BlobServiceClient: + if not BlobServiceClient: + raise RuntimeError("azure-storage-blob not installed") + cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + if not cs: + raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") + return BlobServiceClient.from_connection_string(cs) + +def _queue_client() -> QueueClient: + if not QueueClient: + raise RuntimeError("azure-storage-queue not installed") + cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + if not cs: + raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") + qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") + qc = QueueClient.from_connection_string(cs, qname) + try: + qc.create_queue() + except Exception: + pass + return qc + +def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str: + svc = _blob_client() + cc = svc.get_container_client(container) + try: + cc.create_container() + except Exception: + pass + ext = "jsonl.gz" if compressed else "jsonl" + # folder scheme matches poller + prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}" + blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}" + data = gzip.compress(raw_bytes) if compressed else raw_bytes + settings = ContentSettings( + content_type="application/json", + content_encoding=("gzip" if compressed else None), + ) + bc = cc.get_blob_client(blob_name) + bc.upload_blob(data, overwrite=True, content_settings=settings) + return 
blob_name + +def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str: + svc = _blob_client() + blob = svc.get_blob_client(container=container, blob=blob_name) + data = blob.download_blob().readall() + fname = os.path.basename(blob_name) + path = os.path.join(outdir, fname) + with open(path, "wb") as f: + f.write(data) + return path + +def _download_sas_to_dir(sas_url: str, outdir: str) -> str: + if not BlobServiceClient: + # ultra-light fallback + import urllib.request + data = urllib.request.urlopen(sas_url, timeout=30).read() + else: + from azure.storage.blob import BlobClient + blob = BlobClient.from_blob_url(sas_url) + data = blob.download_blob().readall() + name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl" + path = os.path.join(outdir, name) + open(path, "wb").write(data) + return path + +# -------- Routes -------- +@app.get("/health") +def health(): + return {"status": "ok"}, 200 + +@app.post("/analyze") +def analyze(): + """ + POST JSON: + { + "question": "...optional custom question...", + "email": {"send": true, "to": "override@example.com"}, + "blob": { + "container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]" + // OR + "sas_url": "https://.../chunk.jsonl.gz?sig=..." + } + } + """ + t0 = time.time() + payload = request.get_json(force=True, silent=True) or {} + question = payload.get("question") or ( + "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). " + "Give a brief summary and next steps." + ) + + prev_chunk_dir = os.getenv("CHUNK_DIR", "./out") + tmp_dir = None + try: + blob_req = payload.get("blob") + if blob_req: + tmp_dir = tempfile.mkdtemp(prefix="agent_blob_") + if blob_req.get("sas_url"): + _download_sas_to_dir(blob_req["sas_url"], tmp_dir) + elif blob_req.get("container") and blob_req.get("blob_name"): + _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir) + else: + return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400 + os.environ["CHUNK_DIR"] = tmp_dir + + agent = build_agent() + out = agent.invoke({"input": question, "chat_history": []}) + result = out.get("output", "") + + email_cfg = payload.get("email") or {} + if email_cfg.get("send"): + to_addr = email_cfg.get("to") + send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr) + + return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200 + + except Exception as e: + return jsonify({"ok": False, "error": str(e)}), 500 + finally: + os.environ["CHUNK_DIR"] = prev_chunk_dir + +# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC) +@app.post("/collect") +@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility +def collect_hec(): + try: + container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") + # Accept either single JSON object or a list; we will write one line per event + body = request.get_json(force=True, silent=True) + if body is None: + return jsonify({"ok": False, "error": "invalid JSON"}), 400 + + lines = [] + if isinstance(body, list): + for item in body: + lines.append(json.dumps(item, separators=(",", ":"))) + else: + lines.append(json.dumps(body, separators=(",", ":"))) + raw = ("\n".join(lines) + "\n").encode("utf-8") + + blob_name = _upload_chunk_blob(container, raw, compressed=True) + + # Enqueue a message your queue-worker understands + msg = { + 
"blob": {"container": container, "blob_name": blob_name}, + # flip to true if you want emails by default + "email": {"send": False} + } + + qc = _queue_client() + qc.send_message(json.dumps(msg, separators=(",", ":"))) + + return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200 + + except Exception as e: + return jsonify({"ok": False, "error": str(e)}), 500 \ No newline at end of file diff --git a/api/requirements.txt b/api/requirements.txt new file mode 100644 index 0000000..aea7bb5 --- /dev/null +++ b/api/requirements.txt @@ -0,0 +1,13 @@ +langchain>=0.3,<0.4 +langchain-core>=0.3.27,<0.4 +langchain-community>=0.3,<0.4 +langchain-openai>=0.2.12,<0.3 +openai>=1.40 +faiss-cpu==1.8.* +ujson>=5 +pydantic>=2 +python-dotenv>=1 +flask>=3 +gunicorn>=21 +azure-storage-blob>=12 # only needed if /analyze pulls blobs +requests \ No newline at end of file diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..8aaaf59 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,103 @@ +services: + splunk: + image: splunk/splunk:9.4.2 + container_name: splunk + restart: unless-stopped + ports: + - "8000:8000" # Splunk Web + - "8088:8088" # HEC + - "8089:8089" # Management API + environment: + SPLUNK_START_ARGS: --accept-license + SPLUNK_PASSWORD: ${SPLUNK_PASSWORD:-Str0ngP@ss!9} + SPLUNK_HEC_TOKEN: ${SPLUNK_HEC_TOKEN:-dev-0123456789abcdef} + volumes: + - splunk-etc:/opt/splunk/etc + - splunk-var:/opt/splunk/var + healthcheck: + test: ["CMD-SHELL", "curl -sk https://localhost:8089/services/server/info | grep -q version"] + interval: 10s + timeout: 5s + retries: 30 + + poller: + build: + context: . + dockerfile: poller/Dockerfile + container_name: splunk-poller + restart: unless-stopped + depends_on: + splunk: + condition: service_healthy + environment: + # --- Splunk connection (to containerized Splunk) --- + SPLUNK_HOST: splunk + SPLUNK_PORT: "8089" + SPLUNK_USER: admin + SPLUNK_PW: ${SPLUNK_PASSWORD:-Str0ngP@ss!9} + SPLUNK_VERIFY_SSL: "false" + # --- What to read --- + SPLUNK_INDEX: intesa_payments + SPLUNK_SOURCETYPE: intesa:bonifico + INITIAL_LOOKBACK: -24h@h + CREATE_INDEX_IF_MISSING: "true" + # --- Polling / chunking --- + SLEEP_SECONDS: "60" + MAX_CHUNK_BYTES: "1800000" + # --- Sink selection: file (local) | blob (azure) | blob+queue (azure) --- + SINK: blob+queue + OUTDIR: /app/out + # --- Azure Storage (Blob + Queue) --- + AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-} + AZURE_STORAGE_CONTAINER: ${AZURE_STORAGE_CONTAINER:-bank-logs} + AZURE_STORAGE_QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks} + AZURE_COMPRESS: "true" + # --- Email default for enqueued messages --- + POLLER_EMAIL_SEND_DEFAULT: "true" + volumes: + - chunks:/app/out + + agent-api: + build: + context: . + dockerfile: api/Dockerfile + container_name: agent-api + restart: unless-stopped + depends_on: + - poller + ports: + - "8080:8080" + env_file: + - .env # AOAI + Mailtrap, etc. + environment: + CHUNK_DIR: /app/out + TOP_K: "12" + # If the API should read blobs directly, ensure these also exist in .env: + # AZURE_STORAGE_CONNECTION_STRING=... + # AZURE_STORAGE_CONTAINER=bank-logs + volumes: + - chunks:/app/out + + queue-worker: + build: + context: . 
+ dockerfile: worker/Dockerfile + container_name: queue-worker + restart: unless-stopped + depends_on: + - agent-api + env_file: + - .env # to pick up AZURE_STORAGE_CONNECTION_STRING if you keep it here + environment: + AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-} + QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks} + ANALYZER_URL: http://agent-api:8080/analyze # inside compose network + POLL_INTERVAL_SEC: "60" + MAX_DEQUEUE: "1" + VISIBILITY_TIMEOUT: "120" + HTTP_TIMEOUT: "120" + +volumes: + splunk-etc: + splunk-var: + chunks: diff --git a/flask_app.py b/flask_app.py new file mode 100644 index 0000000..b1833cd --- /dev/null +++ b/flask_app.py @@ -0,0 +1,185 @@ +# flask_app.py +import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt +from typing import Optional +from flask import Flask, request, jsonify +from dotenv import load_dotenv + +# Load .env locally (App Service uses App Settings instead) +load_dotenv(os.getenv("ENV_FILE", ".env")) + +# Agent + email +from agent_runner import build_agent +from notify import send_email + +# Azure SDKs (guarded imports so we don't crash at boot) +try: + from azure.storage.blob import BlobServiceClient, ContentSettings +except Exception: + BlobServiceClient = None + ContentSettings = None + +try: + from azure.storage.queue import QueueClient +except Exception: + QueueClient = None + +app = Flask(__name__) + +# -------- Helpers -------- +def _blob_client() -> BlobServiceClient: + if not BlobServiceClient: + raise RuntimeError("azure-storage-blob not installed") + cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + if not cs: + raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") + return BlobServiceClient.from_connection_string(cs) + +def _queue_client() -> QueueClient: + if not QueueClient: + raise RuntimeError("azure-storage-queue not installed") + cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + if not cs: + raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set") + qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") + qc = QueueClient.from_connection_string(cs, qname) + try: + qc.create_queue() + except Exception: + pass + return qc + +def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str: + svc = _blob_client() + cc = svc.get_container_client(container) + try: + cc.create_container() + except Exception: + pass + ext = "jsonl.gz" if compressed else "jsonl" + # folder scheme matches poller + prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}" + blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}" + data = gzip.compress(raw_bytes) if compressed else raw_bytes + settings = ContentSettings( + content_type="application/json", + content_encoding=("gzip" if compressed else None), + ) + bc = cc.get_blob_client(blob_name) + bc.upload_blob(data, overwrite=True, content_settings=settings) + return blob_name + +def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str: + svc = _blob_client() + blob = svc.get_blob_client(container=container, blob=blob_name) + data = blob.download_blob().readall() + fname = os.path.basename(blob_name) + path = os.path.join(outdir, fname) + with open(path, "wb") as f: + f.write(data) + return path + +def _download_sas_to_dir(sas_url: str, outdir: str) -> str: + if not BlobServiceClient: + # ultra-light fallback + import urllib.request + data = urllib.request.urlopen(sas_url, timeout=30).read() + else: + from azure.storage.blob import BlobClient + blob = 
BlobClient.from_blob_url(sas_url) + data = blob.download_blob().readall() + name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl" + path = os.path.join(outdir, name) + open(path, "wb").write(data) + return path + +# -------- Routes -------- +@app.get("/health") +def health(): + return {"status": "ok"}, 200 + +@app.post("/analyze") +def analyze(): + """ + POST JSON: + { + "question": "...optional custom question...", + "email": {"send": true, "to": "override@example.com"}, + "blob": { + "container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]" + // OR + "sas_url": "https://.../chunk.jsonl.gz?sig=..." + } + } + """ + t0 = time.time() + payload = request.get_json(force=True, silent=True) or {} + question = payload.get("question") or ( + "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). " + "Give a brief summary and next steps." + ) + + prev_chunk_dir = os.getenv("CHUNK_DIR", "./out") + tmp_dir = None + try: + blob_req = payload.get("blob") + if blob_req: + tmp_dir = tempfile.mkdtemp(prefix="agent_blob_") + if blob_req.get("sas_url"): + _download_sas_to_dir(blob_req["sas_url"], tmp_dir) + elif blob_req.get("container") and blob_req.get("blob_name"): + _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir) + else: + return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400 + os.environ["CHUNK_DIR"] = tmp_dir + + agent = build_agent() + out = agent.invoke({"input": question, "chat_history": []}) + result = out.get("output", "") + + email_cfg = payload.get("email") or {} + if email_cfg.get("send"): + to_addr = email_cfg.get("to") + send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr) + + return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200 + + except Exception as e: + return jsonify({"ok": False, "error": str(e)}), 500 + finally: + os.environ["CHUNK_DIR"] = prev_chunk_dir + +# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC) +@app.post("/collect") +@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility +def collect_hec(): + try: + container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") + # Accept either single JSON object or a list; we will write one line per event + body = request.get_json(force=True, silent=True) + if body is None: + return jsonify({"ok": False, "error": "invalid JSON"}), 400 + + lines = [] + if isinstance(body, list): + for item in body: + lines.append(json.dumps(item, separators=(",", ":"))) + else: + lines.append(json.dumps(body, separators=(",", ":"))) + raw = ("\n".join(lines) + "\n").encode("utf-8") + + blob_name = _upload_chunk_blob(container, raw, compressed=True) + + # Enqueue a message your queue-worker understands + msg = { + "blob": {"container": container, "blob_name": blob_name}, + # flip to true if you want emails by default + "email": {"send": False} + } + + qc = _queue_client() + qc.send_message(json.dumps(msg, separators=(",", ":"))) + + return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200 + + except Exception as e: + return jsonify({"ok": False, "error": str(e)}), 500 \ No newline at end of file diff --git a/notify.py b/notify.py new file mode 100644 index 0000000..c44a0a3 --- /dev/null +++ b/notify.py @@ -0,0 +1,38 @@ +# notify.py +import os, smtplib +from email.mime.text import MIMEText +from 
email.utils import formataddr +from dotenv import load_dotenv + +# load .env automatically (ENV_FILE can override path) +load_dotenv(os.getenv("ENV_FILE", ".env")) + +def send_email(subject: str, body_text: str, to_addr: str | None = None): + if os.getenv("MAIL_ENABLED", "false").lower() != "true": + print("[notify] MAIL_ENABLED != true; skipping email") + return + + smtp_host = os.getenv("SMTP_HOST") + smtp_port = int(os.getenv("SMTP_PORT", "587")) + smtp_user = os.getenv("SMTP_USER") + smtp_pass = os.getenv("SMTP_PASS") + mail_from = os.getenv("MAIL_FROM") or smtp_user + mail_to = to_addr or os.getenv("MAIL_TO") + + if not (smtp_host and smtp_user and smtp_pass and mail_to): + print("[notify] missing SMTP config; skipping email") + return + + msg = MIMEText(body_text, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = formataddr(("Intesa Logs Agent", mail_from)) + msg["To"] = mail_to + + with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as s: + try: + s.starttls() + except smtplib.SMTPException: + pass + s.login(smtp_user, smtp_pass) + s.send_message(msg) + print(f"[notify] sent email to {mail_to}") diff --git a/poller/Dockerfile b/poller/Dockerfile new file mode 100644 index 0000000..85921ad --- /dev/null +++ b/poller/Dockerfile @@ -0,0 +1,18 @@ +# poller/Dockerfile +FROM python:3.12-slim +WORKDIR /app + +# Helpful system deps +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl \ + && rm -rf /var/lib/apt/lists/* + +COPY poller/requirements.txt . +RUN python -m pip install --upgrade pip setuptools wheel \ + && pip install --no-cache-dir -r requirements.txt + +# Copy the poller script from repo root +COPY splunk_poller.py . + +# default to root to avoid permission issues on named volumes +ENV PYTHONUNBUFFERED=1 +CMD ["python", "-u", "splunk_poller.py"] diff --git a/poller/requirements.txt b/poller/requirements.txt new file mode 100644 index 0000000..b3497a8 --- /dev/null +++ b/poller/requirements.txt @@ -0,0 +1,6 @@ +splunk-sdk==2.0.2 +langchain-core==0.2.* +azure-storage-blob>=12.19.0 +azure-storage-queue>=12.9.0 +ujson +requests diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2dfe408 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +langchain>=0.3,<0.4 +langchain-core>=0.3.27,<0.4 +langchain-community>=0.3,<0.4 +langchain-openai>=0.2.12,<0.3 +openai>=1.40 +faiss-cpu==1.8.* +ujson>=5 +pydantic>=2 +python-dotenv>=1 +flask>=3 +gunicorn>=21 +azure-storage-blob>=12 +requests +azure-storage-queue==12.9.0 diff --git a/sampleLogs.txt b/sampleLogs.txt new file mode 100644 index 0000000..7710a39 --- /dev/null +++ b/sampleLogs.txt @@ -0,0 +1,81 @@ +#Cli preset parameters +#source .venv/bin/activate +HEC_URL="https://localhost:8088/services/collector/event" +HEC_TOKEN="dev-0123456789abcdef" +INDEX="intesa_payments" +SOURCETYPE="intesa:bonifico" + +#Cli script for generating logs +gen_iban(){ local d=""; for _ in $(seq 1 25); do d="${d}$((RANDOM%10))"; done; echo "IT${d}"; } +mask_iban(){ local i="$1"; local pre="${i:0:6}"; local suf="${i: -4}"; local n=$(( ${#i}-10 )); printf "%s%0.s*" "$pre" $(seq 1 $n); echo -n "$suf"; } +rand_amount(){ awk 'BEGIN{srand(); printf "%.2f", 5+rand()*14995}'; } +rand_bool_str(){ if ((RANDOM%2)); then echo "true"; else echo "false"; fi; } +pick(){ local a=("$@"); echo "${a[$RANDOM%${#a[@]}]}"; } + +spese=(SHA OUR BEN) +divise=(EUR EUR EUR EUR USD GBP) +statuses=(accepted pending rejected) + +for tx in {1..20}; do + txid=$(cat /proc/sys/kernel/random/uuid 2>/dev/null || uuidgen 
2>/dev/null || openssl rand -hex 16) + t0=$(date -u +%s); t1=$((t0+1)); t2=$((t1+2)) + iso0=$(date -u -d @$t0 +%FT%T.%6NZ) + iso1=$(date -u -d @$t1 +%FT%T.%6NZ) + iso2=$(date -u -d @$t2 +%FT%T.%6NZ) + + src=$(gen_iban); dst=$(gen_iban) + srcm=$(mask_iban "$src"); dstm=$(mask_iban "$dst") + amt=$(rand_amount) + dv=$(pick "${divise[@]}") + inst=$(rand_bool_str) + sp=$(pick "${spese[@]}") + final=$(pick "${statuses[@]}") + + send() { + local when="$1" iso="$2" step="$3" status="$4" + curl -sk "$HEC_URL" \ + -H "Authorization: Splunk $HEC_TOKEN" -H "Content-Type: application/json" \ + -d @- </dev/null 2>&1 +done + diff --git a/splunk_poller.py b/splunk_poller.py new file mode 100644 index 0000000..86f63ad --- /dev/null +++ b/splunk_poller.py @@ -0,0 +1,260 @@ +# splunk_poller.py +import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys +import splunklib.client as client +from splunklib.results import JSONResultsReader + +try: + from langchain_core.documents import Document +except ImportError: + from langchain.schema import Document + +STOP = False +def _handle_stop(signum, frame): + global STOP + STOP = True +signal.signal(signal.SIGINT, _handle_stop) +signal.signal(signal.SIGTERM, _handle_stop) + +# ---------- Splunk config ---------- +SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost") +SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089")) +SPLUNK_USER = os.getenv("SPLUNK_USER", "admin") +SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9") +SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1","true","yes"} +INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments") +SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico") +INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h") +CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1","true","yes"} + +# ---------- Polling / chunking ---------- +SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60")) +MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000))) + +# ---------- Sinks ---------- +# Supported: file | blob | blob+queue +SINK = os.getenv("SINK", "file").lower() +OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out")) +CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt")) +AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1","true","yes"} + +# Azure Blob +AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING") +AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") + +# Azure Storage Queue +AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") + +# Email toggle for messages produced by the poller (default: True) +EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1","true","yes"} + +if SINK.startswith("file"): + OUTDIR.mkdir(parents=True, exist_ok=True) + +# ---------- Azure clients (lazy) ---------- +_blob_service = None +_container_client = None +_queue_client = None + +def _init_blob(): + global _blob_service, _container_client + if _blob_service: + return + from azure.storage.blob import BlobServiceClient + _blob_service = BlobServiceClient.from_connection_string(AZ_CS) + _container_client = _blob_service.get_container_client(AZ_CONTAINER) + try: + _container_client.create_container() + except Exception: + pass + +def _init_queue(): + global _queue_client + if _queue_client: + return + from azure.storage.queue import QueueClient + _queue_client = QueueClient.from_connection_string( + conn_str=AZ_CS, queue_name=AZ_QUEUE + ) + try: + _queue_client.create_queue() # idempotent + except 
Exception: + pass + +# ---------- Checkpoint helpers ---------- +def read_ckpt() -> str | None: + if not CKPT_FILE.exists(): return None + val = CKPT_FILE.read_text().strip() + return val or None + +def write_ckpt(val: str) -> None: + CKPT_FILE.write_text(val) + +def to_epoch_seconds(v) -> int | None: + if v is None: return None + try: + return int(float(v)) + except Exception: + pass + try: + s = str(v).replace("Z", "+00:00") + return int(dt.datetime.fromisoformat(s).timestamp()) + except Exception: + return None + +# ---------- Splunk helpers ---------- +def ensure_index(service, name: str): + # idempotent: create if missing + for idx in service.indexes: + if idx.name == name: + return + service.indexes.create(name) + +def build_search(ckpt_epoch: int | None) -> str: + q = f''' +search index={INDEX} sourcetype="{SOURCETYPE}" +| fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status +'''.strip() + if ckpt_epoch is not None: + q += f"\n| where _indextime > {ckpt_epoch}" + q += "\n| sort + _indextime" + return q + +def fetch(service, ckpt_epoch: int | None): + job = service.jobs.create( + build_search(ckpt_epoch), + exec_mode="normal", + earliest_time=INITIAL_LOOKBACK, + latest_time="now", + output_mode="json", + ) + while not job.is_done(): + if STOP: break + time.sleep(0.5) + rr = JSONResultsReader(job.results(output_mode="json")) + rows = [dict(r) for r in rr if isinstance(r, dict)] + job.cancel() + return rows + +# ---------- Chunking ---------- +def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES): + buf, size = [], 0 + for item in items: + b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8") + if size + len(b) > max_bytes and buf: + yield b"".join(buf) + buf, size = [b], len(b) + else: + buf.append(b); size += len(b) + if buf: yield b"".join(buf) + +# ---------- Sinks ---------- +def write_chunk_file(blob: bytes) -> pathlib.Path: + ts = int(time.time()) + name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl" + name.write_bytes(blob) + return name + +def upload_chunk_blob(blob: bytes): + _init_blob() + from azure.storage.blob import ContentSettings + ts = int(time.time()) + ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl" + # timezone-aware UTC + now_utc = dt.datetime.now(dt.timezone.utc) + blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}" + data = gzip.compress(blob) if AZURE_COMPRESS else blob + content_settings = ContentSettings( + content_type="application/json", + content_encoding=("gzip" if AZURE_COMPRESS else None) + ) + bc = _container_client.get_blob_client(blob_name) + bc.upload_blob(data, overwrite=True, content_settings=content_settings) + return { + "blob_name": blob_name, + "url": bc.url, + "size_bytes": len(data), + "compressed": AZURE_COMPRESS, + } + +def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True): + _init_queue() + payload = { + "blob": {"container": container, "blob_name": blob_name}, + "email": {"send": bool(send_email)} + } + _queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False)) + print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True) + +# ---------- Main ---------- +def main(): + print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})") + service = client.connect( + host=SPLUNK_HOST, + port=SPLUNK_PORT, + 
scheme="https", + username=SPLUNK_USER, + password=SPLUNK_PW, + verify=SPLUNK_VERIFY_SSL, + ) + + if CREATE_INDEX_IF_MISSING: + try: + ensure_index(service, INDEX) + print(f"[poller] ensured index exists: {INDEX}") + except Exception as e: + print(f"[poller] warn: ensure_index failed: {e}", flush=True) + + ckpt_val = read_ckpt() + ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None + + while not STOP: + rows = fetch(service, ckpt_epoch) + if not rows: + print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True) + for _ in range(SLEEP_SECONDS): + if STOP: break + time.sleep(1) + continue + + max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0 + if max_index_time: + ckpt_epoch = max(ckpt_epoch or 0, max_index_time) + write_ckpt(str(ckpt_epoch)) + + for _, blob in enumerate(chunks_by_bytes(rows)): + # (Document kept for potential future LC usage) + _ = Document( + page_content=blob.decode("utf-8", errors="ignore"), + metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)}, + ) + + if SINK == "file": + fpath = write_chunk_file(blob) + print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True) + + elif SINK == "blob": + if not AZ_CS: + raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads") + meta = upload_chunk_blob(blob) + print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True) + + elif SINK == "blob+queue": + if not AZ_CS: + raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue") + meta = upload_chunk_blob(blob) + print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True) + enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT) + + else: + raise ValueError(f"Unknown SINK={SINK}") + + # brief pause + for _ in range(5): + if STOP: break + time.sleep(1) + + print("[poller] stopping gracefully") + sys.exit(0) + +if __name__ == "__main__": + main() diff --git a/worker/Dockerfile b/worker/Dockerfile new file mode 100644 index 0000000..d74a1a6 --- /dev/null +++ b/worker/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app +ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 + +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl \ + && rm -rf /var/lib/apt/lists/* + +COPY worker/requirements.txt . +RUN python -m pip install --upgrade pip setuptools wheel \ + && pip install --no-cache-dir -r requirements.txt + +COPY worker/queue_worker.py . 
+ +USER 1000 +CMD ["python", "queue_worker.py"] diff --git a/worker/queue_worker.py b/worker/queue_worker.py new file mode 100644 index 0000000..f3de3b1 --- /dev/null +++ b/worker/queue_worker.py @@ -0,0 +1,103 @@ +import os, sys, time, json, signal, logging, traceback +from typing import List +import requests +from azure.storage.queue import QueueClient + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(levelname)s | %(message)s", +) + +STOP = False +def _handle_stop(*_): + global STOP + STOP = True +signal.signal(signal.SIGTERM, _handle_stop) +signal.signal(signal.SIGINT, _handle_stop) + +# --- config via env --- +AZURE_STORAGE_CONNECTION_STRING = os.getenv("AZURE_STORAGE_CONNECTION_STRING") +QUEUE_NAME = os.getenv("QUEUE_NAME", "log-chunks") +ANALYZER_URL = os.getenv("ANALYZER_URL", "http://agent-api:8080/analyze") # local compose default +POLL_INTERVAL_SEC = int(os.getenv("POLL_INTERVAL_SEC", "5")) +MAX_DEQUEUE = int(os.getenv("MAX_DEQUEUE", "16")) # up to 32 +VISIBILITY_TIMEOUT = int(os.getenv("VISIBILITY_TIMEOUT", "120")) # seconds +HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "120")) # seconds + +if not AZURE_STORAGE_CONNECTION_STRING: + logging.error("AZURE_STORAGE_CONNECTION_STRING missing") + sys.exit(1) + +def process_message(text: str) -> bool: + """ + Returns True if handled successfully (and message should be deleted), + False otherwise (let it reappear for retry). + """ + try: + payload = json.loads(text) + except Exception: + logging.warning("Message is not valid JSON; ignoring: %s", text[:500]) + return True # delete bad messages to avoid poison + + try: + r = requests.post(ANALYZER_URL, json=payload, timeout=HTTP_TIMEOUT) + if r.status_code // 100 == 2: + logging.info("Analyzer OK: %s", r.text[:500]) + return True + else: + logging.warning("Analyzer HTTP %s: %s", r.status_code, r.text[:500]) + return False + except Exception as e: + logging.error("Analyzer call failed: %s", e) + return False + +def main(): + logging.info("queue-worker starting; queue=%s analyzer=%s", QUEUE_NAME, ANALYZER_URL) + q = QueueClient.from_connection_string( + conn_str=AZURE_STORAGE_CONNECTION_STRING, + queue_name=QUEUE_NAME, + ) + # create queue if missing + try: + q.create_queue() + except Exception: + pass + + while not STOP: + try: + msgs = list(q.receive_messages( + messages_per_page=MAX_DEQUEUE, + visibility_timeout=VISIBILITY_TIMEOUT + )) + + if not msgs: + time.sleep(POLL_INTERVAL_SEC) + continue + + for m in msgs: + ok = False + try: + # In SDK v12, m.content is already base64-decoded text + ok = process_message(m.content) + except Exception as ex: + logging.error("Error processing message: %s\n%s", ex, traceback.format_exc()) + ok = False + + if ok: + try: + q.delete_message(m) + logging.info("Deleted message id=%s", m.id) + except Exception as de: + logging.warning("Delete failed (will reappear later): %s", de) + else: + # Don’t delete; it will become visible again after VISIBILITY_TIMEOUT + logging.info("Kept message for retry id=%s", m.id) + + except Exception as loop_ex: + logging.error("Receive loop error: %s", loop_ex) + time.sleep(POLL_INTERVAL_SEC) + + logging.info("queue-worker stopping gracefully") + +if __name__ == "__main__": + main() diff --git a/worker/requirements.txt b/worker/requirements.txt new file mode 100644 index 0000000..21089df --- /dev/null +++ b/worker/requirements.txt @@ -0,0 +1,2 @@ +azure-storage-queue==12.9.0 +requests>=2.32.0
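
In Azure mode, each message the poller enqueues (and that the queue-worker forwards unchanged to `ANALYZER_URL`) uses the same JSON shape that `/analyze` accepts directly, so the path can be exercised by hand once the Azure Storage and Azure OpenAI variables are configured and the stack is running. A sketch; the blob name is illustrative:

```bash
curl -sS -X POST http://localhost:8080/analyze \
  -H 'Content-Type: application/json' \
  -d '{"blob":{"container":"bank-logs","blob_name":"intesa/2025/09/28/10/chunk_1759050000_ab12cd34.jsonl.gz"},"email":{"send":false}}'
```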