feat: Initial commit of blob-ingestor and file restructuring

This commit is contained in:
Torped 2025-10-04 18:09:16 +02:00
parent f805a51f5d
commit a35633c96c
32 changed files with 785 additions and 938 deletions

32
.env Normal file
View File

@ -0,0 +1,32 @@
# Azure (optional; only needed if SINK=blob or blob+sb)
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;EndpointSuffix=core.windows.net"
AZURE_STORAGE_CONTAINER=bank-logs
AZURE_STORAGE_QUEUE_NAME=log-chunks
AZURE_SERVICEBUS_CONNECTION_STRING=
AZURE_OPENAI_ENDPOINT=https://tf-in-dev-clz-core-aif.cognitiveservices.azure.com/
AZURE_OPENAI_API_KEY=BwPkZje8Ifr51ob4gYKIj37L0OlvBGoQo4dqeebwdpz72cmhvJ0pJQQJ99BIACgEuAYXJ3w3AAAAACOGOfEc
AZURE_OPENAI_API_VERSION=2025-01-01-preview
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini
#ANALYZER_URL=https://tf-in-dev-chatapp-app.azurewebsites.net/analyze
# Optional embeddings:
# AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT=text-embedding-3-small
# --- Mail (Mailtrap SMTP Sandbox) ---
MAIL_ENABLED=true
SMTP_HOST=sandbox.smtp.mailtrap.io  # use smtp.office365.com for Outlook
SMTP_PORT=2525                      # use 587 and set MAIL_TLS=true for Outlook
SMTP_USER=14ea96c3766614
SMTP_PASS=48b19e33a6290e
MAIL_FROM=alerts@intesa-pipeline.local
MAIL_TO=you@company.com
RG=intesadev-rg
SA=tfindevst
QUEUE=log-chunks
# (optional) where your chunks are:
CHUNK_DIR="./out"

View File

@ -1,82 +0,0 @@
# Intesa Logs Project Documentation
This repo implements a small, production-style pipeline that inspects bank transfer (“**bonifico**”) logs, looks for anomalies (e.g., **rejected EUR ≥ 10,000**, **`vop_no_match`**, **invalid IBAN/BIC**), and produces a concise report (optionally emailed).
It runs **locally via Docker** and is designed to be **deployable to Azure** using the same containers.
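A rough sketch of those anomaly rules as a predicate (thresholds and field names mirror `_rules_only_report` in `api/flask_app.py` and the sample events under `data/`; IBAN/BIC validity is only covered by the agent prompt, so it is omitted here):
```python
# Hedged sketch of the anomaly rules described above (not the pipeline's actual code path).
def is_anomalous(evt: dict) -> bool:
    high_value_reject = (
        (evt.get("status") or "").lower() == "rejected"
        and (evt.get("divisa") or "").upper() == "EUR"
        and float(evt.get("importo") or 0) >= 10_000
    )
    vop_no_match = (evt.get("vop_check") or "").lower() in {"no_match", "vop_no_match"}
    return high_value_reject or vop_no_match

# is_anomalous({"status": "rejected", "divisa": "EUR", "importo": 12500})                        -> True
# is_anomalous({"status": "accepted", "divisa": "EUR", "importo": 9500, "vop_check": "match"})   -> False
```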
---
## High-level flow
**Splunk (HEC)** → **Poller** → *(Chunks: file or Azure Blob)* → *(Optional: Azure Queue message)* → **Analyzer API** → *(Optional: Email via Mailtrap)*
- **Local mode:** Poller writes chunk **files** to a shared volume. Analyzer reads those files directly.
- **Azure mode (final target):** Poller uploads **blobs** to Storage (`bank-logs`) and enqueues messages to Storage Queue (`log-chunks`). A **Queue Worker** consumes queue messages and calls the Analyzer API.
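The queue message itself is tiny. A sketch of its shape, as built by the HEC-style `/collect` handler further down (the blob name is illustrative, following the `intesa/YYYY/MM/DD/HH/hec_*.jsonl.gz` scheme used by `_upload_chunk_blob`):
```python
# Hypothetical Storage Queue message consumed by the queue worker.
message = {
    "blob": {
        "container": "bank-logs",
        "blob_name": "intesa/2025/10/04/18/hec_ab12cd34.jsonl.gz",  # illustrative name
    },
    "email": {"send": False},  # set True to have the analyzer email its report
}
```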
---
## Current state snapshot (what's running now)
### ✅ Running in Azure
- **App Service (Agent API)**
- Name: `tf-in-dev-chatapp-app`
- Image: `tfindevacr.azurecr.io/agent-api:prod` (pulled from ACR via Managed Identity)
- Public endpoint: `https://tf-in-dev-chatapp-app.azurewebsites.net`
- Health: `GET /health` → `{"status":"ok"}`
- API: `POST /analyze` (see examples below)
- **Azure Container Registry (ACR)**
- Name: `tfindevacr`
- Repos/tags present:
- `agent-api:prod`
- `queue-worker:prod` ✅ *(built & pushed; not yet deployed)*
- **Azure Storage (data plane in use)**
- Storage account: `tfindevst`
- **Blob container:** `bank-logs` (holds `.jsonl` or `.jsonl.gz` chunks)
- **Queue:** `log-chunks` (messages the worker consumes)
> The API is live in Azure. The **worker** and **Splunk** are still local right now.
### ✅ Running locally (Docker Compose)
- **Splunk** container (HEC exposed)
- **Poller** (`splunk_poller.py`)
- You can run it in either:
- `SINK=file` → write chunks to local volume (simple local dev), or
- `SINK=blob+queue` → upload to Azure Blob + enqueue Azure Queue (production-like)
- **Queue Worker** (`worker/`)
- Currently running **locally**, reading Azure Storage Queue and calling the Analyzer (either local API or Azure API based on `ANALYZER_URL`).
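A minimal sketch of that loop, assuming the `queue-worker` compose settings shown later (`QUEUE_NAME`, `ANALYZER_URL`, `VISIBILITY_TIMEOUT`, `POLL_INTERVAL_SEC`); the actual `worker/` code is not part of this diff:
```python
# Hedged sketch: dequeue a message, forward it to /analyze, delete it on success.
import json, os, time

import requests
from azure.storage.queue import QueueClient

qc = QueueClient.from_connection_string(
    os.environ["AZURE_STORAGE_CONNECTION_STRING"],
    os.getenv("QUEUE_NAME", "log-chunks"),
)

while True:
    for msg in qc.receive_messages(messages_per_page=1, visibility_timeout=120):
        payload = json.loads(msg.content)  # {"blob": {...}, "email": {...}}
        # /analyze accepts the same blob/email keys (plus an optional "question")
        r = requests.post(
            os.getenv("ANALYZER_URL", "http://agent-api:8080/analyze"),
            json=payload,
            timeout=120,
        )
        if r.ok:
            qc.delete_message(msg)  # otherwise leave it for retry after the visibility timeout
    time.sleep(int(os.getenv("POLL_INTERVAL_SEC", "60")))
```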
---
## Repo structure
```bash
# 1) Create a .env (see sample below)
# 2) Make sure compose.yaml has SINK=file (if local) or SINK=blob/blob+queue (if azure) for the poller
# 3) Start the stack
docker compose up -d
# 4) Check health
curl -sS http://localhost:8080/health
# 5) Send test events to Splunk HEC
for i in {1..5}; do
curl -k https://localhost:8088/services/collector/event \
-H "Authorization: Splunk dev-0123456789abcdef" \
-H "Content-Type: application/json" \
-d '{"event":{"event_type":"bonifico","step":"esito","status":"accepted","importo": '"$((RANDOM%5000+50))"',"divisa":"EUR","transaction_id":"TX-'$RANDOM'"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' >/dev/null 2>&1
done
# 6) Add a couple of anomalies to exercise the analyzer
curl -k https://localhost:8088/services/collector/event \
-H "Authorization: Splunk dev-0123456789abcdef" \
-H "Content-Type: application/json" \
-d '{"event":{"event_type":"bonifico","step":"esito","status":"rejected","importo":12500,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT1998*2*4*6*8*10*12*14*16*9375","iban_dest_masked":"IT1171*2*4*6*8*10*12*14*16*0000","bic_swift":"TESTBICX"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}'
# 7) Ask the Agent API to analyze the latest local chunks
curl -sS -X POST http://localhost:8080/analyze \
-H 'Content-Type: application/json' \
-d '{"question":"Scan latest chunks. Flag rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC.","email":{"send":false}}' | jq .

View File

@ -1,11 +0,0 @@
.
├─ api/ # Dockerfile for agent-api
├─ poller/ # Dockerfile for Splunk poller
├─ worker/ # Dockerfile for queue-worker (Azure mode only; not used locally)
├─ agent_runner.py # Analyzer orchestration
├─ flask_app.py # Flask API: /health, /analyze
├─ notify.py # SMTP (Mailtrap) email helper
├─ compose.yaml # Docker Compose stack
├─ requirements.txt
├─ sampleLogs.txt # misc sample content
└─ splunk_poller.py # Polls Splunk & writes chunk files

View File

@ -2,14 +2,16 @@
FROM python:3.12-slim
WORKDIR /app
# deps
COPY api/requirements.txt .
RUN python -m pip install --upgrade pip setuptools wheel \
&& pip install --no-cache-dir -r requirements.txt
# Bring in your app files from repo root
COPY agent_runner.py flask_app.py notify.py .
# app code (put everything at /app so imports stay "from notify import send_email")
COPY api/agent_runner.py ./agent_runner.py
COPY api/flask_app.py ./flask_app.py
COPY api/notify.py ./notify.py
# The agent loads .env if present; we'll mount it via env_file in compose
ENV PYTHONUNBUFFERED=1
EXPOSE 8080
CMD ["gunicorn", "-w", "2", "-b", "0.0.0.0:8080", "flask_app:app"]
CMD ["gunicorn","-w","2","-b","0.0.0.0:8080","flask_app:app"]

View File

@ -10,47 +10,28 @@ from langchain.tools import Tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# ----- load .env (defaults to ./.env; override with ENV_FILE=/path/to/.env) -----
# ----- load .env -----
load_dotenv(os.getenv("ENV_FILE", ".env"))
# ----- read env (supports both AZURE_* and AOAI_*) -----
# ----- normalize endpoint -----
def _norm_endpoint(ep: str | None) -> str:
if not ep: return ""
ep = ep.strip().rstrip("/")
# strip any trailing /openai[/v...]
ep = re.sub(r"/openai(?:/v\d+(?:\.\d+)?(?:-\w+)?)?$", "", ep)
return ep + "/"
AZ_ENDPOINT = _norm_endpoint(
os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT")
)
AZ_API_KEY = (
os.getenv("AZURE_OPENAI_API_KEY")
or os.getenv("AOAI_API_KEY")
or os.getenv("OPENAI_API_KEY")
)
AZ_API_VERSION = (
os.getenv("AZURE_OPENAI_API_VERSION")
or os.getenv("AOAI_API_VERSION")
or "2025-01-01-preview"
)
AZ_CHAT_DEPLOY = (
os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")
or os.getenv("AOAI_CHAT_DEPLOYMENT")
or "gpt-4o-mini"
)
AZ_EMBED_DEPLOY = (
os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT")
or os.getenv("AOAI_EMBED_DEPLOYMENT")
or ""
)
AZ_ENDPOINT = _norm_endpoint(os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT"))
AZ_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY")
AZ_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION") or os.getenv("AOAI_API_VERSION") or "2025-01-01-preview"
AZ_CHAT_DEPLOY = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") or os.getenv("AOAI_CHAT_DEPLOYMENT") or "gpt-4o-mini"
AZ_EMBED_DEPLOY = os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT") or os.getenv("AOAI_EMBED_DEPLOYMENT") or ""
# ----- local data config -----
CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
BLOB_DIR = os.getenv("BLOB_DIR", "")
TOP_K = int(os.getenv("TOP_K", "12"))
# ---------- Helpers to build LLM/Embeddings for Azure OpenAI ----------
# ---------- LLM and embeddings ----------
def make_llm(temperature: float = 0.2) -> AzureChatOpenAI:
if not AZ_ENDPOINT or not AZ_API_KEY:
raise RuntimeError("Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents).")
@ -77,14 +58,19 @@ def _iter_chunk_files() -> List[pathlib.Path]:
paths: List[pathlib.Path] = []
if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists():
paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")]
paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/hec_*.jsonl*")]
if BLOB_DIR and pathlib.Path(BLOB_DIR).exists():
paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)]
paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/hec_*.jsonl*", recursive=True)]
return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True)
def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]:
data = path.read_bytes()
if path.suffix == ".gz":
data = gzip.decompress(data)
try:
data = gzip.decompress(data)
except Exception:
pass
out: List[Dict[str, Any]] = []
for ln in data.splitlines():
if not ln.strip(): continue
@ -94,7 +80,6 @@ def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]:
continue
return out
# Accept either raw events or HEC-shaped {"event": {...}}
def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
return rec.get("event", rec)
@ -105,7 +90,7 @@ def _evt_to_text(evt: Dict[str, Any]) -> str:
parts = [f"{k}={evt[k]}" for k in keys if evt.get(k) is not None]
return "bonifico | " + " | ".join(parts)
# ---------- Build vector store (only if embeddings deployment exists) ----------
# ---------- Vector store ----------
def build_vectorstore(limit_files: int = 20):
embs = make_embeddings()
if embs is None:
@ -149,7 +134,6 @@ def stats_tool_impl(query: str = "") -> str:
for rec in _read_jsonl(fp):
events.append(_normalize_event(rec))
# parse filters
q = query.lower()
def _kv(key, pat=r"([^\s]+)"):
m = re.search(fr"{key}:{pat}", q)
@ -163,6 +147,7 @@ def stats_tool_impl(query: str = "") -> str:
instant_s = _kv("instant")
min_amt_s = _kv("min_amount")
min_amt = float(min_amt_s) if min_amt_s else 0.0
inst_f = None
if instant_s in {"true","false"}:
inst_f = (instant_s == "true")
@ -185,7 +170,6 @@ def stats_tool_impl(query: str = "") -> str:
if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f:
return False
if country:
# heuristic from IBAN (dest or origin)
iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper()
if not iban.startswith(country.upper()):
return False
@ -207,24 +191,20 @@ def stats_tool_impl(query: str = "") -> str:
def retrieve_tool_impl(question: str) -> str:
vs, _ = build_vectorstore()
docs = vs.similarity_search(question, k=TOP_K)
return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs))
return "\n".join(f\"[{i+1}] {d.page_content}\" for i, d in enumerate(docs))
def raw_sample_tool_impl(arg: str = "") -> str:
"""
Return a few raw JSON events from the newest chunks.
Accepts the same filters as get_stats PLUS optional 'n:<int>' to control how many.
Examples:
'n:5 status:rejected min_amount:10000'
'divisa:EUR instant:true step:esito n:3'
Accepts the same filters as get_stats PLUS optional 'n:<int>'.
"""
q = (arg or "").lower()
# helpers (same parsing as get_stats)
def _kv(key, pat=r"([^\s]+)"):
m = re.search(fr"{key}:{pat}", q)
return m.group(1) if m else None
n_s = _kv("n", r"(\d+)")
n_s = _kv("n", r"(\\d+)")
n = int(n_s) if n_s else 5
status_f = _kv("status")
step_f = _kv("step")
@ -262,7 +242,6 @@ def raw_sample_tool_impl(arg: str = "") -> str:
return False
return True
# load newest events and filter
files = _iter_chunk_files()
out = []
for fp in files:
@ -277,8 +256,7 @@ def raw_sample_tool_impl(arg: str = "") -> str:
if not out:
return "(no matching events)"
return "\n".join(out)
return "\\n".join(out)
# ---------- Build the agent ----------
def build_agent():
@ -287,7 +265,7 @@ def build_agent():
Tool(name="get_stats", func=stats_tool_impl,
description="Quick stats over recent events. Example: 'status:rejected min_amount:10000 step:esito'."),
Tool(name="raw_samples", func=raw_sample_tool_impl,
description="Return a few raw JSON events. Accepts filters like get_stats and 'n:<int>'. Example: 'n:5 status:rejected min_amount:10000'.")
description="Return a few raw JSON events. Accepts filters like get_stats and 'n:<int>'."),
]
if AZ_EMBED_DEPLOY:
tools.append(Tool(name="retrieve_similar", func=retrieve_tool_impl,
@ -317,15 +295,12 @@ def run_default_question(question_override: str | None = None):
)
out = agent.invoke({"input": question, "chat_history": []})
result = out.get("output", "")
print("\n=== AGENT OUTPUT ===\n", result)
# Email the result if MAIL_ENABLED=true (handled inside notify.py)
print("\\n=== AGENT OUTPUT ===\\n", result)
try:
send_email(subject="[Intesa Logs] Agent Report", body_text=result)
except Exception as e:
print("[notify] email failed:", e)
if __name__ == "__main__":
# optional CLI: allow a custom question
custom = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None
run_default_question(custom if custom else None)

View File

@ -1,32 +1,24 @@
# flask_app.py
import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt
from typing import Optional
# api/flask_app.py
import os, io, glob, gzip, json, time, uuid, tempfile, pathlib, datetime as dt
from typing import List, Dict, Any, Tuple
from urllib.parse import urlparse
from flask import Flask, request, jsonify
from dotenv import load_dotenv
# Load .env locally (App Service uses App Settings instead)
load_dotenv(os.getenv("ENV_FILE", ".env"))
# Agent + email
from agent_runner import build_agent
# Optional email
from notify import send_email
# Azure SDKs (guarded imports so we don't crash at boot)
# ---------------- Azure SDKs (guarded) ----------------
try:
from azure.storage.blob import BlobServiceClient, ContentSettings
from azure.storage.blob import BlobServiceClient, BlobClient
except Exception:
BlobServiceClient = None
ContentSettings = None
BlobClient = None
try:
from azure.storage.queue import QueueClient
except Exception:
QueueClient = None
app = Flask(__name__)
# -------- Helpers --------
def _blob_client() -> BlobServiceClient:
def _blob_client() -> "BlobServiceClient":
if not BlobServiceClient:
raise RuntimeError("azure-storage-blob not installed")
cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
@ -34,152 +26,448 @@ def _blob_client() -> BlobServiceClient:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
return BlobServiceClient.from_connection_string(cs)
def _queue_client() -> QueueClient:
if not QueueClient:
raise RuntimeError("azure-storage-queue not installed")
cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not cs:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
qc = QueueClient.from_connection_string(cs, qname)
try:
qc.create_queue()
except Exception:
pass
return qc
def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str:
svc = _blob_client()
cc = svc.get_container_client(container)
try:
cc.create_container()
except Exception:
pass
ext = "jsonl.gz" if compressed else "jsonl"
# folder scheme matches poller
prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}"
blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}"
data = gzip.compress(raw_bytes) if compressed else raw_bytes
settings = ContentSettings(
content_type="application/json",
content_encoding=("gzip" if compressed else None),
)
bc = cc.get_blob_client(blob_name)
bc.upload_blob(data, overwrite=True, content_settings=settings)
return blob_name
def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
def _download_blob_bytes(container: str, blob_name: str) -> bytes:
svc = _blob_client()
blob = svc.get_blob_client(container=container, blob=blob_name)
data = blob.download_blob().readall()
fname = os.path.basename(blob_name)
path = os.path.join(outdir, fname)
with open(path, "wb") as f:
f.write(data)
return path
return blob.download_blob().readall()
def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
if not BlobServiceClient:
# ultra-light fallback
import urllib.request
data = urllib.request.urlopen(sas_url, timeout=30).read()
else:
from azure.storage.blob import BlobClient
def _download_sas_bytes(sas_url: str) -> bytes:
if BlobClient:
blob = BlobClient.from_blob_url(sas_url)
data = blob.download_blob().readall()
name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
path = os.path.join(outdir, name)
open(path, "wb").write(data)
return blob.download_blob().readall()
else:
import urllib.request
return urllib.request.urlopen(sas_url, timeout=30).read()
# ---------------- Agent (guarded) ----------------
_CAN_LLM = True
try:
import agent_runner # we'll temporarily set agent_runner.CHUNK_DIR when a blob is provided
build_agent = agent_runner.build_agent
except Exception:
_CAN_LLM = False
agent_runner = None
build_agent = None
app = Flask(__name__)
# ---------------- Config ----------------
CHUNK_DIR = pathlib.Path(os.getenv("CHUNK_DIR", "/app/out"))
CHUNK_DIR.mkdir(parents=True, exist_ok=True)
AUTO_ANALYZE = os.getenv("AUTO_ANALYZE", "true").lower() in {"1","true","yes"}
DEFAULT_QUESTION = os.getenv("DEFAULT_QUESTION",
"Scan the latest chunks. List anomalies (rejected EUR >= 10000 and vop_no_match). "
"Provide brief evidence and next steps."
)
EMAIL_SEND_BY_DEFAULT = os.getenv("MAIL_ENABLED", "false").lower() in {"1","true","yes"}
# ---------------- Helpers ----------------
def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
"""Accept raw or HEC-shaped records (`{'event': {...}}`)."""
return rec.get("event", rec)
def _write_chunk_from_lines(jsonl_lines: List[bytes], dest_dir: pathlib.Path) -> pathlib.Path:
name = f"chunk_{int(time.time())}_{uuid.uuid4().hex[:8]}.jsonl.gz"
path = dest_dir / name
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
for ln in jsonl_lines:
gz.write(ln if ln.endswith(b"\n") else ln + b"\n")
path.write_bytes(buf.getvalue())
return path
# -------- Routes --------
def _bytes_is_gzip(b: bytes) -> bool:
return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B
def _decode_utf8(b: bytes) -> str:
# be permissive; some exports contain odd characters / BOM
return b.decode("utf-8", errors="replace").lstrip("\ufeff")
def _normalize_bytes_to_jsonl_lines(data: bytes) -> Tuple[List[bytes], int]:
"""
Accepts: JSONL, JSONL.GZ, JSON array, or single JSON object.
Returns: (list of JSONL lines as bytes, number_of_events)
Raises: ValueError if format is not recognized.
"""
# decompress if gz
if _bytes_is_gzip(data):
try:
data = gzip.decompress(data)
except Exception:
raise ValueError("Invalid gzip stream")
text = _decode_utf8(data).strip()
if not text:
raise ValueError("Empty blob")
# Try JSON first (object or array)
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
ev = _normalize_event(parsed)
return [json.dumps(ev, separators=(",", ":")).encode("utf-8")], 1
if isinstance(parsed, list):
lines = []
count = 0
for item in parsed:
if isinstance(item, (dict,)):
ev = _normalize_event(item)
lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
count += 1
if count == 0:
raise ValueError("JSON array has no objects")
return lines, count
# If JSON but not dict/list, fall through to JSONL attempt
except Exception:
pass
# Try JSONL (one JSON object per line)
lines = []
count = 0
for raw in text.splitlines():
s = raw.strip()
if not s:
continue
try:
obj = json.loads(s)
ev = _normalize_event(obj)
lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
count += 1
except Exception:
# Not valid JSON per line -> not JSONL
raise ValueError("Unrecognized blob format. Expected JSONL, JSONL.GZ, JSON array, or single JSON object.")
if count == 0:
raise ValueError("No JSON objects found")
return lines, count
def _iter_recent_events(limit_files: int = 10, limit_events: int = 5000) -> List[Dict[str, Any]]:
"""Load recent events from newest chunk_*.jsonl(.gz) and hec_*.jsonl(.gz) in CHUNK_DIR."""
patterns = ["chunk_*.jsonl*", "hec_*.jsonl*"]
files: List[pathlib.Path] = []
for pat in patterns:
files += [pathlib.Path(p) for p in glob.glob(str(CHUNK_DIR / pat))]
files = sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)[:limit_files]
out: List[Dict[str, Any]] = []
for fp in files:
data = fp.read_bytes()
if fp.suffix == ".gz":
data = gzip.decompress(data)
for ln in data.splitlines():
if not ln.strip():
continue
try:
rec = json.loads(ln)
out.append(_normalize_event(rec))
except Exception:
continue
if len(out) >= limit_events:
return out
return out
def _as_float(x) -> float:
try:
return float(x)
except Exception:
return 0.0
def _rules_only_report() -> str:
"""Simple rules when AOAI creds are not set."""
evts = _iter_recent_events(limit_files=20, limit_events=20000)
total = len(evts)
rej_hi, vop_no = [], []
for e in evts:
status = (e.get("status") or "").lower()
divisa = (e.get("divisa") or "").upper()
amt = _as_float(e.get("importo"))
vop = (e.get("vop_check") or "").lower()
if status == "rejected" and divisa == "EUR" and amt >= 10000:
rej_hi.append({"transaction_id": e.get("transaction_id"), "importo": amt, "step": e.get("step")})
if vop in {"no_match", "vop_no_match"}:
vop_no.append({"transaction_id": e.get("transaction_id"), "step": e.get("step")})
lines = []
lines.append("### Findings")
lines.append(f"- Total events scanned: {total}")
lines.append(f"- High-value rejected (EUR≥10000): {len(rej_hi)}")
lines.append(f"- VOP no_match: {len(vop_no)}")
if rej_hi[:5]:
lines.append(" - Examples (rejected): " + ", ".join(
f"{i.get('transaction_id')}({i.get('importo')})" for i in rej_hi[:5] if i.get('transaction_id')
))
if vop_no[:5]:
lines.append(" - Examples (vop_no_match): " + ", ".join(
f"{i.get('transaction_id')}" for i in vop_no[:5] if i.get('transaction_id')
))
lines.append("")
lines.append("### Recommended actions")
for r in (
"Validate VOP mismatches; confirm beneficiary details.",
"Investigate rejection reasons for all EUR≥10k transactions.",
"Check spike trends by hour/day and counterparties.",
):
lines.append(f"- {r}")
return "\n".join(lines)
def _try_llm_report(question: str) -> str:
if not _CAN_LLM or not build_agent:
raise RuntimeError("LLM agent not available")
agent = build_agent()
out = agent.invoke({"input": question, "chat_history": []})
return out.get("output", "") or "(no output from agent)"
def _run_analysis(question: str) -> Dict[str, Any]:
t0 = time.time()
used = "rules"
try:
if _CAN_LLM and os.getenv("AZURE_OPENAI_ENDPOINT") and (
os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY")
):
txt = _try_llm_report(question)
used = "llm"
else:
txt = _rules_only_report()
except Exception as e:
txt = _rules_only_report()
used = f"rules (fallback from llm error: {str(e)[:120]})"
return {"ok": True, "analyzer": used, "duration_sec": round(time.time() - t0, 3), "result": txt}
# ---------------- Routes ----------------
@app.get("/health")
def health():
return {"status": "ok"}, 200
return {"status": "ok", "auto_analyze": AUTO_ANALYZE, "chunk_dir": str(CHUNK_DIR)}, 200
@app.post("/ingest")
def ingest():
"""
POST JSON:
{ "events": [ {..}, {..} ] } OR a single object {..}
Writes a chunk and (optionally) auto-analyzes.
"""
body = request.get_json(force=True, silent=True)
if body is None:
return jsonify({"ok": False, "error": "invalid JSON"}), 400
# normalize to list (support HEC-style {"event": {...}})
events: List[Dict[str, Any]] = []
if isinstance(body, dict) and "events" in body and isinstance(body["events"], list):
events = [_normalize_event(e) for e in body["events"] if isinstance(e, dict)]
elif isinstance(body, list):
events = [_normalize_event(e) for e in body if isinstance(e, dict)]
elif isinstance(body, dict):
events = [_normalize_event(body)]
else:
return jsonify({"ok": False, "error": "payload must be an object or list"}), 400
if not events:
return jsonify({"ok": False, "error": "no events to ingest"}), 400
lines = [json.dumps(e, separators=(",", ":")).encode("utf-8") for e in events]
path = _write_chunk_from_lines(lines, CHUNK_DIR)
report = None
if AUTO_ANALYZE:
report = _run_analysis(DEFAULT_QUESTION)
if EMAIL_SEND_BY_DEFAULT:
try:
send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"])
except Exception:
pass
return jsonify({
"ok": True,
"written": len(events),
"chunk_file": path.name,
"auto_analyzed": bool(AUTO_ANALYZE),
"report": report,
}), 200
@app.post("/ingest_blob")
def ingest_blob():
"""
Accept a blob, normalize it to chunk_*.jsonl.gz in CHUNK_DIR, optionally analyze.
Body:
{
"container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
// OR
"sas_url": "https://.../file.jsonl[.gz]?sig=..."
"analyze": true,
"email": {"send": true, "to": "x@x"}
}
"""
payload = request.get_json(force=True, silent=True) or {}
analyze = bool(payload.get("analyze"))
data: bytes = b""
try:
if payload.get("sas_url"):
data = _download_sas_bytes(payload["sas_url"])
elif payload.get("container") and payload.get("blob_name"):
data = _download_blob_bytes(payload["container"], payload["blob_name"])
else:
return jsonify({"ok": False, "error": "Provide 'sas_url' OR ('container' + 'blob_name')"}), 400
lines, count = _normalize_bytes_to_jsonl_lines(data)
path = _write_chunk_from_lines(lines, CHUNK_DIR)
report = None
if analyze:
report = _run_analysis(DEFAULT_QUESTION)
email_cfg = payload.get("email") or {}
if email_cfg.get("send"):
try:
send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"], to_addr=email_cfg.get("to"))
except Exception:
pass
return jsonify({"ok": True, "written": count, "chunk_file": path.name, "report": report}), 200
except ValueError as ve:
return jsonify({"ok": False, "error": str(ve)}), 400
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.post("/analyze")
def analyze():
"""
POST JSON:
Manually trigger analysis over the newest chunks, OR over a specific blob.
Body options:
{
"question": "...optional custom question...",
"email": {"send": true, "to": "override@example.com"},
"question": "...",
"email": {"send": true, "to": "x@x"},
"blob": {
"container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]"
"container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
// OR
"sas_url": "https://.../chunk.jsonl.gz?sig=..."
"sas_url": "https://.../file.jsonl[.gz]?sig=..."
}
}
"""
t0 = time.time()
global CHUNK_DIR # declare BEFORE any use in this function
payload = request.get_json(force=True, silent=True) or {}
question = payload.get("question") or (
"Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
"Give a brief summary and next steps."
)
question = payload.get("question") or DEFAULT_QUESTION
prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
tmp_dir = None
prev_env_chunk = os.getenv("CHUNK_DIR")
prev_agent_chunk = (getattr(agent_runner, "CHUNK_DIR", str(CHUNK_DIR))
if agent_runner else str(CHUNK_DIR))
prev_local_chunk = CHUNK_DIR
try:
blob_req = payload.get("blob")
if blob_req:
tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
# download to bytes
if blob_req.get("sas_url"):
_download_sas_to_dir(blob_req["sas_url"], tmp_dir)
data = _download_sas_bytes(blob_req["sas_url"])
elif blob_req.get("container") and blob_req.get("blob_name"):
_download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir)
data = _download_blob_bytes(blob_req["container"], blob_req["blob_name"])
else:
return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400
# normalize to a real chunk_*.jsonl.gz in tmp_dir
lines, _ = _normalize_bytes_to_jsonl_lines(data)
_write_chunk_from_lines(lines, pathlib.Path(tmp_dir))
# re-point CHUNK_DIR for this request and agent_runner
CHUNK_DIR = pathlib.Path(tmp_dir)
os.environ["CHUNK_DIR"] = tmp_dir
if agent_runner:
agent_runner.CHUNK_DIR = tmp_dir
if hasattr(agent_runner, "BLOB_DIR"):
agent_runner.BLOB_DIR = ""
agent = build_agent()
out = agent.invoke({"input": question, "chat_history": []})
result = out.get("output", "")
# run analysis
res = _run_analysis(question)
# optional email
email_cfg = payload.get("email") or {}
if email_cfg.get("send"):
to_addr = email_cfg.get("to")
send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)
try:
send_email(subject="[Intesa Logs] Report", body_text=res["result"], to_addr=email_cfg.get("to"))
except Exception:
pass
return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200
return jsonify(res), 200
except ValueError as ve:
return jsonify({"ok": False, "error": str(ve)}), 400
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
finally:
os.environ["CHUNK_DIR"] = prev_chunk_dir
# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC)
@app.post("/collect")
@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility
def collect_hec():
try:
container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
# Accept either single JSON object or a list; we will write one line per event
body = request.get_json(force=True, silent=True)
if body is None:
return jsonify({"ok": False, "error": "invalid JSON"}), 400
lines = []
if isinstance(body, list):
for item in body:
lines.append(json.dumps(item, separators=(",", ":")))
# restore CHUNK_DIR context
if prev_env_chunk is None:
os.environ.pop("CHUNK_DIR", None)
else:
lines.append(json.dumps(body, separators=(",", ":")))
raw = ("\n".join(lines) + "\n").encode("utf-8")
os.environ["CHUNK_DIR"] = prev_env_chunk
if agent_runner:
agent_runner.CHUNK_DIR = prev_agent_chunk
CHUNK_DIR = pathlib.Path(prev_local_chunk)
CHUNK_DIR.mkdir(parents=True, exist_ok=True)
blob_name = _upload_chunk_blob(container, raw, compressed=True)
# ---------------- Event Grid webhook ----------------
@app.post("/eventgrid")
def eventgrid():
"""
Webhook for Azure Event Grid (Storage -> BlobCreated).
- Handles SubscriptionValidation by echoing validationResponse.
- For each BlobCreated event, extracts container/blob and ingests it.
- Optional auto-analyze + email is controlled by:
EVENTGRID_ANALYZE_DEFAULT (true/false) [default: true]
EVENTGRID_EMAIL_DEFAULT (true/false) [default: MAIL_ENABLED]
"""
try:
events = request.get_json(force=True, silent=True)
if not events:
return jsonify({"ok": False, "error": "No events"}), 400
if isinstance(events, dict):
events = [events]
# Enqueue a message your queue-worker understands
msg = {
"blob": {"container": container, "blob_name": blob_name},
# flip to true if you want emails by default
"email": {"send": False}
}
analyze_default = os.getenv("EVENTGRID_ANALYZE_DEFAULT", "true").lower() in {"1","true","yes"}
email_default = os.getenv("EVENTGRID_EMAIL_DEFAULT", os.getenv("MAIL_ENABLED", "false")).lower() in {"1","true","yes"}
target_container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
qc = _queue_client()
qc.send_message(json.dumps(msg, separators=(",", ":")))
for ev in events:
et = ev.get("eventType")
if et == "Microsoft.EventGrid.SubscriptionValidationEvent":
validation_code = ev.get("data", {}).get("validationCode")
return jsonify({"validationResponse": validation_code})
return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200
if et in ("Microsoft.Storage.BlobCreated", "Microsoft.Storage.BlobRenamed"):
url = (ev.get("data", {}) or {}).get("url")
if not url:
continue
u = urlparse(url)
# path like: /container/dir1/dir2/file.json
parts = [p for p in u.path.split("/") if p]
if len(parts) < 2:
continue
container = parts[0]
blob_name = "/".join(parts[1:])
# Only act on the configured container
if container != target_container:
continue
# Download the blob, normalize, write chunk, analyze/email if enabled
data = _download_blob_bytes(container, blob_name)
lines, _ = _normalize_bytes_to_jsonl_lines(data)
_write_chunk_from_lines(lines, CHUNK_DIR)
if analyze_default:
rep = _run_analysis(DEFAULT_QUESTION)
if email_default:
try:
send_email(subject="[Intesa Logs] Auto Analysis", body_text=rep["result"])
except Exception:
pass
return jsonify({"ok": True}), 200
except ValueError as ve:
return jsonify({"ok": False, "error": str(ve)}), 400
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
return jsonify({"ok": False, "error": str(e)}), 500

67
api/notify.py Normal file
View File

@ -0,0 +1,67 @@
# api/notify.py
import os, smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from dotenv import load_dotenv
# Load .env before reading anything
load_dotenv(os.getenv("ENV_FILE", ".env"))
_TRUES = {"1", "true", "yes", "on"}
def _b(s: str | None, default=False) -> bool:
if s is None:
return default
return str(s).strip().lower() in _TRUES
def _first(*names: str, default: str | None = None) -> str | None:
for n in names:
v = os.getenv(n)
if v is not None and str(v).strip() != "":
return v
return default
def _cfg():
"""Merge MAIL_* and SMTP_* envs; MAIL_* wins if both present."""
enabled = _b(_first("MAIL_ENABLED"), default=False)
host = _first("MAIL_HOST", "SMTP_HOST")
port = int(_first("MAIL_PORT", "SMTP_PORT", default="0") or "0")
user = _first("MAIL_USER", "SMTP_USER")
pwd = _first("MAIL_PASSWORD", "SMTP_PASS")
mail_from = _first("MAIL_FROM", default=user)
mail_to = _first("MAIL_TO_DEFAULT", "MAIL_TO")
tls = _b(_first("MAIL_TLS"), default=False)
return {
"enabled": enabled,
"host": host, "port": port, "user": user, "pwd": pwd,
"from": mail_from, "to": mail_to, "tls": tls
}
def send_email(subject: str, body_text: str, to_addr: str | None = None):
cfg = _cfg()
if not cfg["enabled"]:
print("[notify] MAIL_ENABLED is not true; skipping email")
return
host, port, user, pwd = cfg["host"], cfg["port"], cfg["user"], cfg["pwd"]
mail_from = cfg["from"]
mail_to = to_addr or cfg["to"]
if not (host and port and user and pwd and mail_to):
print("[notify] missing SMTP config; skipping email")
return
msg = MIMEText(body_text, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = formataddr(("Intesa Logs Agent", mail_from))
msg["To"] = mail_to
with smtplib.SMTP(host, port, timeout=20) as s:
if cfg["tls"]:
try:
s.starttls()
except smtplib.SMTPException:
pass
s.login(user, pwd)
s.send_message(msg)
print(f"[notify] sent email to {mail_to}")

View File

@ -3,7 +3,6 @@ langchain-core>=0.3.27,<0.4
langchain-community>=0.3,<0.4
langchain-openai>=0.2.12,<0.3
openai>=1.40
faiss-cpu==1.8.*
ujson>=5
pydantic>=2
python-dotenv>=1

View File

@ -0,0 +1,24 @@
import os, json, logging, datetime as dt
import azure.functions as func
from azure.storage.blob import BlobServiceClient
# Source storage (same one your trigger points to)
BLOB_CONN = os.environ["BLOB_CONN"] # set as App Setting
CONTAINER = "bank-logs" # container your trigger uses
def main(blob: func.InputStream):
# 1) Log to App Insights (if configured)
logging.info("BlobIngest fired: name=%s len=%d", blob.name, blob.length)
# 2) Write a marker blob so we can *see* it fired from Storage itself
svc = BlobServiceClient.from_connection_string(BLOB_CONN)
marker_name = f"intesa/_diag/processed-{dt.datetime.utcnow().strftime('%Y%m%dT%H%M%S')}.txt"
payload = {
"source": blob.name,
"length": blob.length,
"seen_at_utc": dt.datetime.utcnow().isoformat() + "Z"
}
svc.get_blob_client(CONTAINER, marker_name).upload_blob(
json.dumps(payload).encode("utf-8"),
overwrite=True
)

View File

@ -0,0 +1,51 @@
# This is the production handler; the current main __init__ (the diagnostic marker writer above) is for testing only.
import os, json, gzip, logging, requests
import azure.functions as func
AGENT_API_URL = os.getenv("AGENT_API_URL") # e.g. https://agent-api-app.azurewebsites.net
def _to_events(data: bytes):
# gunzip if needed
if len(data) >= 2 and data[0] == 0x1F and data[1] == 0x8B:
data = gzip.decompress(data)
text = data.decode("utf-8", errors="replace").lstrip("\ufeff").strip()
if not text:
return []
# try JSON (object or array)
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return [parsed]
if isinstance(parsed, list):
return [x for x in parsed if isinstance(x, dict)]
except Exception:
pass
# try JSONL
out = []
for ln in text.splitlines():
s = ln.strip()
if not s:
continue
try:
out.append(json.loads(s))
except Exception:
pass
return out
def main(inputblob: func.InputStream):
if not AGENT_API_URL:
logging.warning("AGENT_API_URL not set; skipping.")
return
events = _to_events(inputblob.read())
if not events:
logging.info("No JSON events parsed from blob %s", inputblob.name)
return
try:
url = AGENT_API_URL.rstrip("/") + "/ingest"
r = requests.post(url, json={"events": events}, timeout=30)
logging.info("Agent ingest -> %s %s", r.status_code, r.text[:300])
r.raise_for_status()
except Exception as e:
logging.exception("Ingest failed: %s", e)

View File

@ -0,0 +1,11 @@
{
"bindings": [
{
"type": "blobTrigger",
"direction": "in",
"name": "blob",
"path": "bank-logs/intesa/test-exact.json",
"connection": "BLOB_CONN"
}
]
}

View File

@ -0,0 +1,4 @@
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
return func.HttpResponse("pong", status_code=200)

View File

@ -0,0 +1,17 @@
{
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"authLevel": "anonymous",
"name": "req",
"methods": [ "get", "post" ],
"route": "ping"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}

8
blob-ingestor/host.json Normal file
View File

@ -0,0 +1,8 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"logging": { "applicationInsights": { "samplingSettings": { "isEnabled": true } } }
}

View File

@ -0,0 +1,2 @@
azure-functions>=1.20.0
requests>=2.31.0

View File

@ -1,103 +1,36 @@
services:
splunk:
image: splunk/splunk:9.4.2
container_name: splunk
restart: unless-stopped
ports:
- "8000:8000" # Splunk Web
- "8088:8088" # HEC
- "8089:8089" # Management API
environment:
SPLUNK_START_ARGS: --accept-license
SPLUNK_PASSWORD: ${SPLUNK_PASSWORD:-Str0ngP@ss!9}
SPLUNK_HEC_TOKEN: ${SPLUNK_HEC_TOKEN:-dev-0123456789abcdef}
volumes:
- splunk-etc:/opt/splunk/etc
- splunk-var:/opt/splunk/var
healthcheck:
test: ["CMD-SHELL", "curl -sk https://localhost:8089/services/server/info | grep -q version"]
interval: 10s
timeout: 5s
retries: 30
poller:
build:
context: .
dockerfile: poller/Dockerfile
container_name: splunk-poller
restart: unless-stopped
depends_on:
splunk:
condition: service_healthy
environment:
# --- Splunk connection (to containerized Splunk) ---
SPLUNK_HOST: splunk
SPLUNK_PORT: "8089"
SPLUNK_USER: admin
SPLUNK_PW: ${SPLUNK_PASSWORD:-Str0ngP@ss!9}
SPLUNK_VERIFY_SSL: "false"
# --- What to read ---
SPLUNK_INDEX: intesa_payments
SPLUNK_SOURCETYPE: intesa:bonifico
INITIAL_LOOKBACK: -24h@h
CREATE_INDEX_IF_MISSING: "true"
# --- Polling / chunking ---
SLEEP_SECONDS: "60"
MAX_CHUNK_BYTES: "1800000"
# --- Sink selection: file (local) | blob (azure) | blob+queue (azure) ---
SINK: blob+queue
OUTDIR: /app/out
# --- Azure Storage (Blob + Queue) ---
AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-}
AZURE_STORAGE_CONTAINER: ${AZURE_STORAGE_CONTAINER:-bank-logs}
AZURE_STORAGE_QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks}
AZURE_COMPRESS: "true"
# --- Email default for enqueued messages ---
POLLER_EMAIL_SEND_DEFAULT: "true"
volumes:
- chunks:/app/out
agent-api:
build:
context: .
dockerfile: api/Dockerfile
container_name: agent-api
restart: unless-stopped
depends_on:
- poller
ports:
- "8080:8080"
env_file:
- .env # AOAI + Mailtrap, etc.
- .env # pulls your AZURE_OPENAI_*, SMTP_*, MAIL_*, etc.
environment:
CHUNK_DIR: /app/out
TOP_K: "12"
# If the API should read blobs directly, ensure these also exist in .env:
# AZURE_STORAGE_CONNECTION_STRING=...
# AZURE_STORAGE_CONTAINER=bank-logs
CHUNK_DIR: /app/out # where the agent reads chunk files
volumes:
- chunks:/app/out
restart: unless-stopped
queue-worker:
file-poller:
build:
context: .
dockerfile: worker/Dockerfile
container_name: queue-worker
restart: unless-stopped
dockerfile: poller/Dockerfile
container_name: file-poller
environment:
INDIR: /app/in # folder the poller watches
OUTDIR: /app/out # folder it writes chunk_*.jsonl
SLEEP_SECONDS: 60 # scan every minute
EMAIL_SEND_DEFAULT: "true" # tell agent-api to email results
AGENT_API_URL: http://agent-api:8080
volumes:
- ./data:/app/in:rw # drop your .jsonl files here (on Windows)
- chunks:/app/out
depends_on:
- agent-api
env_file:
- .env # to pick up AZURE_STORAGE_CONNECTION_STRING if you keep it here
environment:
AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-}
QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks}
ANALYZER_URL: http://agent-api:8080/analyze # inside compose network
POLL_INTERVAL_SEC: "60"
MAX_DEQUEUE: "1"
VISIBILITY_TIMEOUT: "120"
HTTP_TIMEOUT: "120"
restart: unless-stopped
volumes:
splunk-etc:
splunk-var:
chunks:

1
data/_done/batch1.jsonl Normal file
View File

@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T1","step":"esito","status":"rejected","importo":12000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12A1234512345123451234512"}

1
data/_done/batch2.jsonl Normal file
View File

@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T2","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match"}

Binary file not shown.

1
data/local/_sanity.jsonl Normal file
View File

@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T900","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12ZZ000000000000000000000","bic_swift":"TESTGB2L","instantaneo":true}

2
data/local/sampl3.jsonl Normal file
View File

@ -0,0 +1,2 @@
{"event_type":"bonifico","transaction_id":"W1","step":"esito","status":"accepted","importo":9500,"divisa":"EUR","vop_check":"match","data_pagamento":"2025-09-26T12:01:00Z"}
{"event_type":"bonifico","transaction_id":"W2","step":"esito","status":"rejected","importo":150000,"divisa":"EUR","vop_check":"no_match","data_pagamento":"2025-09-26T12:05:00Z"}

2
data/local/sampl33.json Normal file
View File

@ -0,0 +1,2 @@
{"event_type":"bonifico","transaction_id":"W1","step":"esito","status":"accepted","importo":9500,"divisa":"EUR","vop_check":"match","data_pagamento":"2025-09-26T12:01:00Z"}
{"event_type":"bonifico","transaction_id":"W2","step":"esito","status":"rejected","importo":150000,"divisa":"EUR","vop_check":"no_match","data_pagamento":"2025-09-26T12:05:00Z"}

View File

@ -0,0 +1,2 @@
{"event_type":"bonifico","transaction_id":"T100","step":"esito","status":"rejected","importo":12000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12ZZ000000000000000000000","bic_swift":"TESTGB2L","instantaneo":true}
{"event_type":"bonifico","transaction_id":"T101","step":"esito","status":"accepted","importo":4000,"divisa":"EUR","vop_check":"match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"DE89370400440532013000","bic_swift":"DEUTDEFF","instantaneo":false}

1
export.json Normal file
View File

@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T777","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match","instantaneo":true}

View File

@ -1,185 +0,0 @@
# flask_app.py
import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt
from typing import Optional
from flask import Flask, request, jsonify
from dotenv import load_dotenv
# Load .env locally (App Service uses App Settings instead)
load_dotenv(os.getenv("ENV_FILE", ".env"))
# Agent + email
from agent_runner import build_agent
from notify import send_email
# Azure SDKs (guarded imports so we don't crash at boot)
try:
from azure.storage.blob import BlobServiceClient, ContentSettings
except Exception:
BlobServiceClient = None
ContentSettings = None
try:
from azure.storage.queue import QueueClient
except Exception:
QueueClient = None
app = Flask(__name__)
# -------- Helpers --------
def _blob_client() -> BlobServiceClient:
if not BlobServiceClient:
raise RuntimeError("azure-storage-blob not installed")
cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not cs:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
return BlobServiceClient.from_connection_string(cs)
def _queue_client() -> QueueClient:
if not QueueClient:
raise RuntimeError("azure-storage-queue not installed")
cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not cs:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
qc = QueueClient.from_connection_string(cs, qname)
try:
qc.create_queue()
except Exception:
pass
return qc
def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str:
svc = _blob_client()
cc = svc.get_container_client(container)
try:
cc.create_container()
except Exception:
pass
ext = "jsonl.gz" if compressed else "jsonl"
# folder scheme matches poller
prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}"
blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}"
data = gzip.compress(raw_bytes) if compressed else raw_bytes
settings = ContentSettings(
content_type="application/json",
content_encoding=("gzip" if compressed else None),
)
bc = cc.get_blob_client(blob_name)
bc.upload_blob(data, overwrite=True, content_settings=settings)
return blob_name
def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
svc = _blob_client()
blob = svc.get_blob_client(container=container, blob=blob_name)
data = blob.download_blob().readall()
fname = os.path.basename(blob_name)
path = os.path.join(outdir, fname)
with open(path, "wb") as f:
f.write(data)
return path
def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
if not BlobServiceClient:
# ultra-light fallback
import urllib.request
data = urllib.request.urlopen(sas_url, timeout=30).read()
else:
from azure.storage.blob import BlobClient
blob = BlobClient.from_blob_url(sas_url)
data = blob.download_blob().readall()
name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
path = os.path.join(outdir, name)
open(path, "wb").write(data)
return path
# -------- Routes --------
@app.get("/health")
def health():
return {"status": "ok"}, 200
@app.post("/analyze")
def analyze():
"""
POST JSON:
{
"question": "...optional custom question...",
"email": {"send": true, "to": "override@example.com"},
"blob": {
"container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]"
// OR
"sas_url": "https://.../chunk.jsonl.gz?sig=..."
}
}
"""
t0 = time.time()
payload = request.get_json(force=True, silent=True) or {}
question = payload.get("question") or (
"Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
"Give a brief summary and next steps."
)
prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
tmp_dir = None
try:
blob_req = payload.get("blob")
if blob_req:
tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
if blob_req.get("sas_url"):
_download_sas_to_dir(blob_req["sas_url"], tmp_dir)
elif blob_req.get("container") and blob_req.get("blob_name"):
_download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir)
else:
return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400
os.environ["CHUNK_DIR"] = tmp_dir
agent = build_agent()
out = agent.invoke({"input": question, "chat_history": []})
result = out.get("output", "")
email_cfg = payload.get("email") or {}
if email_cfg.get("send"):
to_addr = email_cfg.get("to")
send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)
return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
finally:
os.environ["CHUNK_DIR"] = prev_chunk_dir
# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC)
@app.post("/collect")
@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility
def collect_hec():
try:
container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
# Accept either single JSON object or a list; we will write one line per event
body = request.get_json(force=True, silent=True)
if body is None:
return jsonify({"ok": False, "error": "invalid JSON"}), 400
lines = []
if isinstance(body, list):
for item in body:
lines.append(json.dumps(item, separators=(",", ":")))
else:
lines.append(json.dumps(body, separators=(",", ":")))
raw = ("\n".join(lines) + "\n").encode("utf-8")
blob_name = _upload_chunk_blob(container, raw, compressed=True)
# Enqueue a message your queue-worker understands
msg = {
"blob": {"container": container, "blob_name": blob_name},
# flip to true if you want emails by default
"email": {"send": False}
}
qc = _queue_client()
qc.send_message(json.dumps(msg, separators=(",", ":")))
return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500

View File

@ -1,38 +0,0 @@
# notify.py
import os, smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from dotenv import load_dotenv
# load .env automatically (ENV_FILE can override path)
load_dotenv(os.getenv("ENV_FILE", ".env"))
def send_email(subject: str, body_text: str, to_addr: str | None = None):
if os.getenv("MAIL_ENABLED", "false").lower() != "true":
print("[notify] MAIL_ENABLED != true; skipping email")
return
smtp_host = os.getenv("SMTP_HOST")
smtp_port = int(os.getenv("SMTP_PORT", "587"))
smtp_user = os.getenv("SMTP_USER")
smtp_pass = os.getenv("SMTP_PASS")
mail_from = os.getenv("MAIL_FROM") or smtp_user
mail_to = to_addr or os.getenv("MAIL_TO")
if not (smtp_host and smtp_user and smtp_pass and mail_to):
print("[notify] missing SMTP config; skipping email")
return
msg = MIMEText(body_text, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = formataddr(("Intesa Logs Agent", mail_from))
msg["To"] = mail_to
with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as s:
try:
s.starttls()
except smtplib.SMTPException:
pass
s.login(smtp_user, smtp_pass)
s.send_message(msg)
print(f"[notify] sent email to {mail_to}")

View File

@ -1,18 +1,21 @@
# poller/Dockerfile
# poller/Dockerfile (local file poller, no Splunk)
FROM python:3.12-slim
WORKDIR /app
# Helpful system deps
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
COPY poller/requirements.txt .
# minimal deps
COPY poller/requirements.txt ./requirements.txt
RUN python -m pip install --upgrade pip setuptools wheel \
&& pip install --no-cache-dir -r requirements.txt
# Copy the poller script from repo root
COPY splunk_poller.py .
COPY poller/file_poller.py .
# default to root to avoid permission issues on named volumes
ENV PYTHONUNBUFFERED=1
CMD ["python", "-u", "splunk_poller.py"]
# defaults matching file_poller.py (compose overrides these); the inputs folder is mounted by compose
ENV INDIR=/app/in
ENV OUTDIR=/app/out
ENV AGENT_API_URL=http://agent-api:8080
ENV SLEEP_SECONDS=60
CMD ["python", "-u", "file_poller.py"]

97
poller/file_poller.py Normal file
View File

@ -0,0 +1,97 @@
# poller/file_poller.py
import os, time, json, uuid, shutil, logging, pathlib, requests
from typing import List
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
INDIR = pathlib.Path(os.getenv("INDIR", "/app/in"))
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "/app/out")) # shared with agent-api
DONE = INDIR / "_done"
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))
# Where the agent API lives (container name in compose or localhost outside)
API_BASE = os.getenv("AGENT_API_URL", "http://agent-api:8080").rstrip("/")
# Make the poller ask for emails automatically
EMAIL_SEND_DEFAULT = os.getenv("EMAIL_SEND_DEFAULT", "false").lower() in {"1", "true", "yes"}
QUESTION = os.getenv(
"QUESTION",
"Scan the latest chunks. List anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
"Give a brief summary and next steps."
)
def ensure_dirs():
INDIR.mkdir(parents=True, exist_ok=True)
DONE.mkdir(parents=True, exist_ok=True)
OUTDIR.mkdir(parents=True, exist_ok=True)
def _copy_to_out(src: pathlib.Path) -> pathlib.Path:
ts = int(time.time())
dst = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
shutil.copy2(src, dst)
return dst
def _call_analyze(email_send: bool) -> bool:
"""Tell the agent to analyze latest chunks (it reads CHUNK_DIR=/app/out)."""
url = f"{API_BASE}/analyze"
payload = {
"question": QUESTION,
"email": {"send": bool(email_send)}
}
try:
r = requests.post(url, json=payload, timeout=180)
if r.status_code // 100 == 2:
logging.info("analyze OK: %s", r.text[:400])
return True
logging.error("analyze HTTP %s: %s", r.status_code, r.text[:400])
return False
except Exception as e:
logging.error("analyze failed: %s", e)
return False
def _process_file(f: pathlib.Path) -> bool:
"""
Copy the file into OUTDIR as a chunk, call /analyze, move file to _done.
Return True if fully handled (so we move to _done), False to retry later.
"""
logging.info("processing %s", f.name)
try:
# 1) copy to /app/out where agent looks for chunk_*.jsonl
dst = _copy_to_out(f)
logging.info("placed chunk %s", dst.name)
# 2) trigger analysis (and email if enabled)
ok = _call_analyze(EMAIL_SEND_DEFAULT)
if not ok:
logging.info("will retry %s later", f.name)
return False
# 3) move original to _done
shutil.move(str(f), str(DONE / f.name))
logging.info("done %s", f.name)
return True
except Exception as e:
logging.error("error processing %s: %s", f.name, e)
logging.info("will retry %s later", f.name)
return False
def list_inputs() -> List[pathlib.Path]:
return sorted([p for p in INDIR.glob("*.jsonl") if p.is_file()])
def main():
ensure_dirs()
logging.info("file-poller watching %s -> %s target=analyze email_default=%s",
INDIR, f"{API_BASE}/analyze", EMAIL_SEND_DEFAULT)
while True:
files = list_inputs()
if not files:
time.sleep(SLEEP_SECONDS)
continue
for f in files:
_process_file(f)
# brief backoff between batches
time.sleep(2)
if __name__ == "__main__":
main()

View File

@ -1,6 +1 @@
splunk-sdk==2.0.2
langchain-core==0.2.*
azure-storage-blob>=12.19.0
azure-storage-queue>=12.9.0
ujson
requests

View File

@ -1,14 +0,0 @@
langchain>=0.3,<0.4
langchain-core>=0.3.27,<0.4
langchain-community>=0.3,<0.4
langchain-openai>=0.2.12,<0.3
openai>=1.40
faiss-cpu==1.8.*
ujson>=5
pydantic>=2
python-dotenv>=1
flask>=3
gunicorn>=21
azure-storage-blob>=12
requests
azure-storage-queue==12.9.0

View File

@ -1,81 +0,0 @@
# CLI preset parameters
#source .venv/bin/activate
HEC_URL="https://localhost:8088/services/collector/event"
HEC_TOKEN="dev-0123456789abcdef"
INDEX="intesa_payments"
SOURCETYPE="intesa:bonifico"
# CLI script for generating logs
gen_iban(){ local d=""; for _ in $(seq 1 25); do d="${d}$((RANDOM%10))"; done; echo "IT${d}"; }
mask_iban(){ local i="$1"; local pre="${i:0:6}"; local suf="${i: -4}"; local n=$(( ${#i}-10 )); printf "%s%0.s*" "$pre" $(seq 1 $n); echo -n "$suf"; }
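# note: printf recycles its format string here, so the mask renders as
# "<first 6 chars>*2*4*6*...*<last 4 chars>" (the same pattern seen in the masked IBANs elsewhere in this commit)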
rand_amount(){ awk 'BEGIN{srand(); printf "%.2f", 5+rand()*14995}'; }
rand_bool_str(){ if ((RANDOM%2)); then echo "true"; else echo "false"; fi; }
pick(){ local a=("$@"); echo "${a[$RANDOM%${#a[@]}]}"; }
spese=(SHA OUR BEN)
divise=(EUR EUR EUR EUR USD GBP)
statuses=(accepted pending rejected)
for tx in {1..20}; do
txid=$(cat /proc/sys/kernel/random/uuid 2>/dev/null || uuidgen 2>/dev/null || openssl rand -hex 16)
t0=$(date -u +%s); t1=$((t0+1)); t2=$((t1+2))
iso0=$(date -u -d @$t0 +%FT%T.%6NZ)
iso1=$(date -u -d @$t1 +%FT%T.%6NZ)
iso2=$(date -u -d @$t2 +%FT%T.%6NZ)
src=$(gen_iban); dst=$(gen_iban)
srcm=$(mask_iban "$src"); dstm=$(mask_iban "$dst")
amt=$(rand_amount)
dv=$(pick "${divise[@]}")
inst=$(rand_bool_str)
sp=$(pick "${spese[@]}")
final=$(pick "${statuses[@]}")
send() {
local when="$1" iso="$2" step="$3" status="$4"
curl -sk "$HEC_URL" \
-H "Authorization: Splunk $HEC_TOKEN" -H "Content-Type: application/json" \
-d @- <<JSON
{
"time": $when,
"host": "seed.cli",
"source": "cli_for_loop",
"sourcetype": "$SOURCETYPE",
"index": "$INDEX",
"event": {
"event_type": "bonifico",
"transaction_id": "$txid",
"step": "$step",
"iban_origin_masked": "$srcm",
"iban_dest_masked": "$dstm",
"importo": "$amt",
"divisa": "$dv",
"istantaneo": "$inst",
"data_pagamento": "$iso",
"spese_commissioni": "$sp",
"causale": "TEST SEED",
"status": "$status"
}
}
JSON
}
send "$t0" "$iso0" "compila" "in_progress"
send "$t1" "$iso1" "conferma" "in_progress"
send "$t2" "$iso2" "esito" "$final"
done
### FAST: bulk-seed 200 single-step "esito/accepted" EUR events (random amounts 50-5049)
HEC_URL="https://localhost:8088/services/collector/event"
HEC_TOKEN="dev-0123456789abcdef"
INDEX="intesa_payments"
SOURCETYPE="intesa:bonifico"
for i in {1..200}; do
curl -k https://localhost:8088/services/collector/event \
-H "Authorization: Splunk dev-0123456789abcdef" \
-H "Content-Type: application/json" \
-d '{"event":{"event_type":"bonifico","step":"esito","status":"accepted","importo": '"$((RANDOM%5000+50))"',"divisa":"EUR","transaction_id":"TX-'$RANDOM'"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' >/dev/null 2>&1
done

View File

@ -1,260 +0,0 @@
# splunk_poller.py
import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys
import splunklib.client as client
from splunklib.results import JSONResultsReader
try:
from langchain_core.documents import Document
except ImportError:
from langchain.schema import Document
STOP = False
def _handle_stop(signum, frame):
global STOP
STOP = True
signal.signal(signal.SIGINT, _handle_stop)
signal.signal(signal.SIGTERM, _handle_stop)
# ---------- Splunk config ----------
SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost")
SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089"))
SPLUNK_USER = os.getenv("SPLUNK_USER", "admin")
SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9")
SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1","true","yes"}
INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments")
SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico")
INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h")
CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1","true","yes"}
# ---------- Polling / chunking ----------
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))
MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000)))
# ---------- Sinks ----------
# Supported: file | blob | blob+queue
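#   file       -> write chunk_*.jsonl files to OUTDIR (local dev)
#   blob       -> upload chunks to the Azure Blob container
#   blob+queue -> upload to Blob and enqueue a message on the Storage Queue for the worker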
SINK = os.getenv("SINK", "file").lower()
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out"))
CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt"))
AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1","true","yes"}
# Azure Blob
AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
# Azure Storage Queue
AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
# Email toggle for messages produced by the poller (default: True)
EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1","true","yes"}
if SINK.startswith("file"):
OUTDIR.mkdir(parents=True, exist_ok=True)
# ---------- Azure clients (lazy) ----------
_blob_service = None
_container_client = None
_queue_client = None
def _init_blob():
global _blob_service, _container_client
if _blob_service:
return
from azure.storage.blob import BlobServiceClient
_blob_service = BlobServiceClient.from_connection_string(AZ_CS)
_container_client = _blob_service.get_container_client(AZ_CONTAINER)
try:
_container_client.create_container()
except Exception:
pass
def _init_queue():
global _queue_client
if _queue_client:
return
from azure.storage.queue import QueueClient
_queue_client = QueueClient.from_connection_string(
conn_str=AZ_CS, queue_name=AZ_QUEUE
)
try:
_queue_client.create_queue() # idempotent
except Exception:
pass
# ---------- Checkpoint helpers ----------
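# The checkpoint is the highest _indextime already processed; it is persisted to
# CKPT_FILE so a restarted poller resumes where it left off instead of re-reading history.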
def read_ckpt() -> str | None:
if not CKPT_FILE.exists(): return None
val = CKPT_FILE.read_text().strip()
return val or None
def write_ckpt(val: str) -> None:
CKPT_FILE.write_text(val)
def to_epoch_seconds(v) -> int | None:
if v is None: return None
try:
return int(float(v))
except Exception:
pass
try:
s = str(v).replace("Z", "+00:00")
return int(dt.datetime.fromisoformat(s).timestamp())
except Exception:
return None
# ---------- Splunk helpers ----------
def ensure_index(service, name: str):
# idempotent: create if missing
for idx in service.indexes:
if idx.name == name:
return
service.indexes.create(name)
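# Incremental polling: only events with _indextime newer than the checkpoint are
# returned, sorted ascending so the checkpoint can advance monotonically per batch.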
def build_search(ckpt_epoch: int | None) -> str:
q = f'''
search index={INDEX} sourcetype="{SOURCETYPE}"
| fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status
'''.strip()
if ckpt_epoch is not None:
q += f"\n| where _indextime > {ckpt_epoch}"
q += "\n| sort + _indextime"
return q
def fetch(service, ckpt_epoch: int | None):
job = service.jobs.create(
build_search(ckpt_epoch),
exec_mode="normal",
earliest_time=INITIAL_LOOKBACK,
latest_time="now",
output_mode="json",
)
while not job.is_done():
if STOP: break
time.sleep(0.5)
rr = JSONResultsReader(job.results(output_mode="json"))
rows = [dict(r) for r in rr if isinstance(r, dict)]
job.cancel()
return rows
# ---------- Chunking ----------
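# chunks_by_bytes packs rows into newline-delimited JSON blobs of at most
# MAX_CHUNK_BYTES each; a single record larger than the cap is still emitted as its own chunk.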
def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES):
buf, size = [], 0
for item in items:
b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8")
if size + len(b) > max_bytes and buf:
yield b"".join(buf)
buf, size = [b], len(b)
else:
buf.append(b); size += len(b)
if buf: yield b"".join(buf)
# ---------- Sinks ----------
def write_chunk_file(blob: bytes) -> pathlib.Path:
ts = int(time.time())
name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
name.write_bytes(blob)
return name
def upload_chunk_blob(blob: bytes):
_init_blob()
from azure.storage.blob import ContentSettings
ts = int(time.time())
ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl"
# timezone-aware UTC
now_utc = dt.datetime.now(dt.timezone.utc)
blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}"
data = gzip.compress(blob) if AZURE_COMPRESS else blob
content_settings = ContentSettings(
content_type="application/json",
content_encoding=("gzip" if AZURE_COMPRESS else None)
)
bc = _container_client.get_blob_client(blob_name)
bc.upload_blob(data, overwrite=True, content_settings=content_settings)
return {
"blob_name": blob_name,
"url": bc.url,
"size_bytes": len(data),
"compressed": AZURE_COMPRESS,
}
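# Queue message contract consumed by the worker, e.g.:
#   {"blob": {"container": "bank-logs", "blob_name": "intesa/.../chunk_<ts>_<id>.jsonl"}, "email": {"send": true}}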
def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True):
_init_queue()
payload = {
"blob": {"container": container, "blob_name": blob_name},
"email": {"send": bool(send_email)}
}
_queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False))
print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True)
# ---------- Main ----------
def main():
print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})")
service = client.connect(
host=SPLUNK_HOST,
port=SPLUNK_PORT,
scheme="https",
username=SPLUNK_USER,
password=SPLUNK_PW,
verify=SPLUNK_VERIFY_SSL,
)
if CREATE_INDEX_IF_MISSING:
try:
ensure_index(service, INDEX)
print(f"[poller] ensured index exists: {INDEX}")
except Exception as e:
print(f"[poller] warn: ensure_index failed: {e}", flush=True)
ckpt_val = read_ckpt()
ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None
while not STOP:
rows = fetch(service, ckpt_epoch)
if not rows:
print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True)
for _ in range(SLEEP_SECONDS):
if STOP: break
time.sleep(1)
continue
max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0
if max_index_time:
ckpt_epoch = max(ckpt_epoch or 0, max_index_time)
write_ckpt(str(ckpt_epoch))
for _, blob in enumerate(chunks_by_bytes(rows)):
# (Document kept for potential future LC usage)
_ = Document(
page_content=blob.decode("utf-8", errors="ignore"),
metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)},
)
if SINK == "file":
fpath = write_chunk_file(blob)
print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True)
elif SINK == "blob":
if not AZ_CS:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads")
meta = upload_chunk_blob(blob)
print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)
elif SINK == "blob+queue":
if not AZ_CS:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue")
meta = upload_chunk_blob(blob)
print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)
enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT)
else:
raise ValueError(f"Unknown SINK={SINK}")
# brief pause
for _ in range(5):
if STOP: break
time.sleep(1)
print("[poller] stopping gracefully")
sys.exit(0)
if __name__ == "__main__":
main()