Compare commits: 2 commits (main ... feature/de)

| Author | SHA1 | Date |
|---|---|---|
|  | b89563fe61 |  |
|  | a35633c96c |  |

.env (new file, 32 lines)
@@ -0,0 +1,32 @@
```
# Azure (optional; only needed if SINK=blob or blob+sb)
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;EndpointSuffix=core.windows.net"
AZURE_STORAGE_CONTAINER=bank-logs
AZURE_STORAGE_QUEUE_NAME=log-chunks
AZURE_SERVICEBUS_CONNECTION_STRING=

AZURE_OPENAI_ENDPOINT=https://tf-in-dev-clz-core-aif.cognitiveservices.azure.com/
AZURE_OPENAI_API_KEY=BwPkZje8Ifr51ob4gYKIj37L0OlvBGoQo4dqeebwdpz72cmhvJ0pJQQJ99BIACgEuAYXJ3w3AAAAACOGOfEc
AZURE_OPENAI_API_VERSION=2025-01-01-preview
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini

#ANALYZER_URL=https://tf-in-dev-chatapp-app.azurewebsites.net/analyze

# Optional embeddings:
# AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT=text-embedding-3-small

# --- Mail (Mailtrap SMTP Sandbox) ---
MAIL_ENABLED=true
SMTP_HOST=sandbox.smtp.mailtrap.io   # smtp.office365.com
SMTP_PORT=2525                       # 587 + MAIL_TLS=true if Outlook
SMTP_USER=14ea96c3766614
SMTP_PASS=48b19e33a6290e
MAIL_FROM=alerts@intesa-pipeline.local
MAIL_TO=you@company.com

RG=intesadev-rg
SA=tfindevst
QUEUE=log-chunks

# (optional) where your chunks are:
CHUNK_DIR="./out"
```
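Both the API and the agent load this file with python-dotenv (the diffs below use `load_dotenv(os.getenv("ENV_FILE", ".env"))`). A minimal, hedged sketch of how the values above are read; the variable names match the file, the fallback message is illustrative only:

```python
import os
from dotenv import load_dotenv  # pip install python-dotenv

# Load ./.env, or the path given in ENV_FILE, matching the pattern used in this repo.
load_dotenv(os.getenv("ENV_FILE", ".env"))

# Storage settings used when SINK=blob or blob+queue.
conn_str  = os.getenv("AZURE_STORAGE_CONNECTION_STRING", "")
container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
queue     = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")

# Azure OpenAI settings used by the agent.
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "")
api_key  = os.getenv("AZURE_OPENAI_API_KEY", "")

if not (endpoint and api_key):
    # Without these, the API below falls back to its rules-only report.
    print("Azure OpenAI not configured; analysis falls back to rules only.")
```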
README.md (deleted, 82 lines)
@@ -1,82 +0,0 @@
# Intesa Logs – Project Documentation

This repo implements a small, production-style pipeline that inspects bank transfer (“**bonifico**”) logs, looks for anomalies (e.g., **rejected EUR ≥ 10,000**, **`vop_no_match`**, **invalid IBAN/BIC**), and produces a concise report (optionally emailed).

It runs **locally via Docker** and is designed to be **deployable to Azure** using the same containers.

---

## High-level flow

**Splunk (HEC)** → **Poller** → *(Chunks: file or Azure Blob)* → *(Optional: Azure Queue message)* → **Analyzer API** → *(Optional: Email via Mailtrap)*

- **Local mode:** Poller writes chunk **files** to a shared volume. Analyzer reads those files directly.
- **Azure mode (final target):** Poller uploads **blobs** to Storage (`bank-logs`) and enqueues messages to Storage Queue (`log-chunks`). A **Queue Worker** consumes queue messages and calls the Analyzer API (see the sketch after this list).
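To make the two modes concrete, here is a hedged sketch of the sink switch the poller performs. The `SINK` values, container, queue, chunk naming, and queue-message shape come from the `.env` and the API code in this compare; the helper itself and its exact form in `splunk_poller.py` are assumptions, not copied from the repo:

```python
import os, json, gzip, time, uuid, pathlib
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueClient

def emit_chunk(jsonl_bytes: bytes) -> None:
    """Illustrative only: route one gzipped chunk according to SINK."""
    sink = os.getenv("SINK", "file")
    name = f"chunk_{int(time.time())}_{uuid.uuid4().hex[:8]}.jsonl.gz"
    data = gzip.compress(jsonl_bytes)

    if sink == "file":
        out = pathlib.Path(os.getenv("CHUNK_DIR", "./out"))
        out.mkdir(parents=True, exist_ok=True)
        (out / name).write_bytes(data)
        return

    # blob / blob+queue: upload the chunk, then optionally tell the worker about it.
    cs = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
    blob_name = f"intesa/{time.strftime('%Y/%m/%d/%H')}/{name}"
    BlobServiceClient.from_connection_string(cs) \
        .get_blob_client(container, blob_name) \
        .upload_blob(data, overwrite=True)

    if sink == "blob+queue":
        qc = QueueClient.from_connection_string(cs, os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks"))
        qc.send_message(json.dumps({"blob": {"container": container, "blob_name": blob_name}}))
```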
---

## Current state snapshot (what's running now)

### ✅ Running in Azure

- **App Service (Agent API)**
  - Name: `tf-in-dev-chatapp-app`
  - Image: `tfindevacr.azurecr.io/agent-api:prod` (pulled from ACR via Managed Identity)
  - Public endpoint: `https://tf-in-dev-chatapp-app.azurewebsites.net`
  - Health: `GET /health` → `{"status":"ok"}`
  - API: `POST /analyze` (see examples below)

- **Azure Container Registry (ACR)**
  - Name: `tfindevacr`
  - Repos/tags present:
    - `agent-api:prod` ✅
    - `queue-worker:prod` ✅ *(built & pushed; not yet deployed)*

- **Azure Storage (data plane in use)**
  - Storage account: `tfindevst`
  - **Blob container:** `bank-logs` (holds `.jsonl` or `.jsonl.gz` chunks)
  - **Queue:** `log-chunks` (messages the worker consumes)

> The API is live in Azure. The **worker** and **Splunk** are still local right now.

### ✅ Running locally (Docker Compose)

- **Splunk** container (HEC exposed)
- **Poller** (`splunk_poller.py`)
  - You can run it in either:
    - `SINK=file` → write chunks to a local volume (simple local dev), or
    - `SINK=blob+queue` → upload to Azure Blob + enqueue Azure Queue (production-like)
- **Queue Worker** (`worker/`)
  - Currently running **locally**, reading the Azure Storage Queue and calling the Analyzer (either the local API or the Azure API, based on `ANALYZER_URL`).

---

## Repo structure

```bash
# 1) Create a .env (see sample below)
# 2) Make sure compose.yaml has SINK=file (local) or SINK=blob / blob+queue (Azure) for the poller
# 3) Start the stack
docker compose up -d

# 4) Check health
curl -sS http://localhost:8080/health

# 5) Send test events to Splunk HEC
for i in {1..5}; do
  curl -k https://localhost:8088/services/collector/event \
    -H "Authorization: Splunk dev-0123456789abcdef" \
    -H "Content-Type: application/json" \
    -d '{"event":{"event_type":"bonifico","step":"esito","status":"accepted","importo": '"$((RANDOM%5000+50))"',"divisa":"EUR","transaction_id":"TX-'$RANDOM'"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' >/dev/null 2>&1
done

# 6) Add a couple of anomalies to exercise the analyzer
curl -k https://localhost:8088/services/collector/event \
  -H "Authorization: Splunk dev-0123456789abcdef" \
  -H "Content-Type: application/json" \
  -d '{"event":{"event_type":"bonifico","step":"esito","status":"rejected","importo":12500,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT1998*2*4*6*8*10*12*14*16*9375","iban_dest_masked":"IT1171*2*4*6*8*10*12*14*16*0000","bic_swift":"TESTBICX"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}'

# 7) Ask the Agent API to analyze the latest local chunks
curl -sS -X POST http://localhost:8080/analyze \
  -H 'Content-Type: application/json' \
  -d '{"question":"Scan latest chunks. Flag rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC.","email":{"send":false}}' | jq .
```
@@ -1,11 +0,0 @@
```
.
├─ api/                 # Dockerfile for agent-api
├─ poller/              # Dockerfile for Splunk poller
├─ worker/              # Dockerfile for queue-worker (Azure mode only; not used locally)
├─ agent_runner.py      # Analyzer orchestration
├─ flask_app.py         # Flask API: /health, /analyze
├─ notify.py            # SMTP (Mailtrap) email helper
├─ compose.yaml         # Docker Compose stack
├─ requirements.txt
├─ sampleLogs.txt       # misc sample content
└─ splunk_poller.py     # Polls Splunk & writes chunk files
```
```diff
@@ -2,14 +2,16 @@
 FROM python:3.12-slim
 WORKDIR /app
 
+# deps
 COPY api/requirements.txt .
 RUN python -m pip install --upgrade pip setuptools wheel \
  && pip install --no-cache-dir -r requirements.txt
 
-# Bring in your app files from repo root
-COPY agent_runner.py flask_app.py notify.py .
+# app code (put everything at /app so imports stay "from notify import send_email")
+COPY api/agent_runner.py ./agent_runner.py
+COPY api/flask_app.py ./flask_app.py
+COPY api/notify.py ./notify.py
 
-# The agent loads .env if present; we'll mount it via env_file in compose
 ENV PYTHONUNBUFFERED=1
 EXPOSE 8080
-CMD ["gunicorn", "-w", "2", "-b", "0.0.0.0:8080", "flask_app:app"]
+CMD ["gunicorn","-w","2","-b","0.0.0.0:8080","flask_app:app"]
```
```diff
@@ -10,47 +10,28 @@ from langchain.tools import Tool
 from langchain.agents import AgentExecutor, create_tool_calling_agent
 from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
 
-# ----- load .env (defaults to ./.env; override with ENV_FILE=/path/to/.env) -----
+# ----- load .env -----
 load_dotenv(os.getenv("ENV_FILE", ".env"))
 
-# ----- read env (supports both AZURE_* and AOAI_*) -----
+# ----- normalize endpoint -----
 def _norm_endpoint(ep: str | None) -> str:
     if not ep: return ""
     ep = ep.strip().rstrip("/")
-    # strip any trailing /openai[/v...]
     ep = re.sub(r"/openai(?:/v\d+(?:\.\d+)?(?:-\w+)?)?$", "", ep)
     return ep + "/"
 
-AZ_ENDPOINT = _norm_endpoint(
-    os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT")
-)
-AZ_API_KEY = (
-    os.getenv("AZURE_OPENAI_API_KEY")
-    or os.getenv("AOAI_API_KEY")
-    or os.getenv("OPENAI_API_KEY")
-)
-AZ_API_VERSION = (
-    os.getenv("AZURE_OPENAI_API_VERSION")
-    or os.getenv("AOAI_API_VERSION")
-    or "2025-01-01-preview"
-)
-AZ_CHAT_DEPLOY = (
-    os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT")
-    or os.getenv("AOAI_CHAT_DEPLOYMENT")
-    or "gpt-4o-mini"
-)
-AZ_EMBED_DEPLOY = (
-    os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT")
-    or os.getenv("AOAI_EMBED_DEPLOYMENT")
-    or ""
-)
+AZ_ENDPOINT = _norm_endpoint(os.getenv("AZURE_OPENAI_ENDPOINT") or os.getenv("AOAI_ENDPOINT"))
+AZ_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY")
+AZ_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION") or os.getenv("AOAI_API_VERSION") or "2025-01-01-preview"
+AZ_CHAT_DEPLOY = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") or os.getenv("AOAI_CHAT_DEPLOYMENT") or "gpt-4o-mini"
+AZ_EMBED_DEPLOY = os.getenv("AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT") or os.getenv("AOAI_EMBED_DEPLOYMENT") or ""
 
 # ----- local data config -----
 CHUNK_DIR = os.getenv("CHUNK_DIR", "./out")
 BLOB_DIR = os.getenv("BLOB_DIR", "")
 TOP_K = int(os.getenv("TOP_K", "12"))
 
-# ---------- Helpers to build LLM/Embeddings for Azure OpenAI ----------
+# ---------- LLM and embeddings ----------
 def make_llm(temperature: float = 0.2) -> AzureChatOpenAI:
     if not AZ_ENDPOINT or not AZ_API_KEY:
         raise RuntimeError("Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY (or AOAI_* equivalents).")
@@ -77,14 +58,19 @@ def _iter_chunk_files() -> List[pathlib.Path]:
     paths: List[pathlib.Path] = []
     if CHUNK_DIR and pathlib.Path(CHUNK_DIR).exists():
         paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/chunk_*.jsonl*")]
+        paths += [pathlib.Path(p) for p in glob.glob(f"{CHUNK_DIR}/hec_*.jsonl*")]
     if BLOB_DIR and pathlib.Path(BLOB_DIR).exists():
         paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/chunk_*.jsonl*", recursive=True)]
+        paths += [pathlib.Path(p) for p in glob.glob(f"{BLOB_DIR}/**/hec_*.jsonl*", recursive=True)]
     return sorted(paths, key=lambda p: p.stat().st_mtime, reverse=True)
 
 def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]:
     data = path.read_bytes()
     if path.suffix == ".gz":
-        data = gzip.decompress(data)
+        try:
+            data = gzip.decompress(data)
+        except Exception:
+            pass
     out: List[Dict[str, Any]] = []
     for ln in data.splitlines():
         if not ln.strip(): continue
@@ -94,7 +80,6 @@ def _read_jsonl(path: pathlib.Path) -> List[Dict[str, Any]]:
             continue
     return out
 
-# Accept either raw events or HEC-shaped {"event": {...}}
 def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
     return rec.get("event", rec)
 
@@ -105,7 +90,7 @@ def _evt_to_text(evt: Dict[str, Any]) -> str:
     parts = [f"{k}={evt[k]}" for k in keys if evt.get(k) is not None]
     return "bonifico | " + " | ".join(parts)
 
-# ---------- Build vector store (only if embeddings deployment exists) ----------
+# ---------- Vector store ----------
 def build_vectorstore(limit_files: int = 20):
     embs = make_embeddings()
     if embs is None:
@@ -149,7 +134,6 @@ def stats_tool_impl(query: str = "") -> str:
         for rec in _read_jsonl(fp):
             events.append(_normalize_event(rec))
 
-    # parse filters
    q = query.lower()
    def _kv(key, pat=r"([^\s]+)"):
        m = re.search(fr"{key}:{pat}", q)
@@ -163,6 +147,7 @@ def stats_tool_impl(query: str = "") -> str:
    instant_s = _kv("instant")
    min_amt_s = _kv("min_amount")
    min_amt = float(min_amt_s) if min_amt_s else 0.0
+
    inst_f = None
    if instant_s in {"true","false"}:
        inst_f = (instant_s == "true")
@@ -185,7 +170,6 @@ def stats_tool_impl(query: str = "") -> str:
        if inst_f is not None and _boolish(e.get("instantaneo") or e.get("istantaneo")) != inst_f:
            return False
        if country:
-            # heuristic from IBAN (dest or origin)
            iban = (e.get("iban_dest_masked") or e.get("iban_origin_masked") or "").upper()
            if not iban.startswith(country.upper()):
                return False
@@ -207,24 +191,20 @@ def stats_tool_impl(query: str = "") -> str:
 def retrieve_tool_impl(question: str) -> str:
     vs, _ = build_vectorstore()
     docs = vs.similarity_search(question, k=TOP_K)
     return "\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs))
 
 def raw_sample_tool_impl(arg: str = "") -> str:
     """
     Return a few raw JSON events from the newest chunks.
-    Accepts the same filters as get_stats PLUS optional 'n:<int>' to control how many.
-    Examples:
-      'n:5 status:rejected min_amount:10000'
-      'divisa:EUR instant:true step:esito n:3'
+    Accepts the same filters as get_stats PLUS optional 'n:<int>'.
     """
     q = (arg or "").lower()
 
-    # helpers (same parsing as get_stats)
     def _kv(key, pat=r"([^\s]+)"):
         m = re.search(fr"{key}:{pat}", q)
         return m.group(1) if m else None
 
     n_s = _kv("n", r"(\d+)")
     n = int(n_s) if n_s else 5
     status_f = _kv("status")
     step_f = _kv("step")
@@ -262,7 +242,6 @@ def raw_sample_tool_impl(arg: str = "") -> str:
             return False
         return True
 
-    # load newest events and filter
     files = _iter_chunk_files()
     out = []
     for fp in files:
@@ -277,8 +256,7 @@ def raw_sample_tool_impl(arg: str = "") -> str:
 
     if not out:
         return "(no matching events)"
     return "\n".join(out)
 
-
 # ---------- Build the agent ----------
 def build_agent():
@@ -287,7 +265,7 @@ def build_agent():
         Tool(name="get_stats", func=stats_tool_impl,
              description="Quick stats over recent events. Example: 'status:rejected min_amount:10000 step:esito'."),
         Tool(name="raw_samples", func=raw_sample_tool_impl,
-             description="Return a few raw JSON events. Accepts filters like get_stats and 'n:<int>'. Example: 'n:5 status:rejected min_amount:10000'.")
+             description="Return a few raw JSON events. Accepts filters like get_stats and 'n:<int>'."),
     ]
     if AZ_EMBED_DEPLOY:
         tools.append(Tool(name="retrieve_similar", func=retrieve_tool_impl,
@@ -317,15 +295,12 @@ def run_default_question(question_override: str | None = None):
     )
     out = agent.invoke({"input": question, "chat_history": []})
     result = out.get("output", "")
     print("\n=== AGENT OUTPUT ===\n", result)
 
-    # Email the result if MAIL_ENABLED=true (handled inside notify.py)
     try:
         send_email(subject="[Intesa Logs] Agent Report", body_text=result)
     except Exception as e:
         print("[notify] email failed:", e)
 
 if __name__ == "__main__":
-    # optional CLI: allow a custom question
     custom = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else None
     run_default_question(custom if custom else None)
```
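The filter mini-language used by `get_stats` and `raw_samples` (colon-separated `key:value` tokens parsed by `_kv`) can be exercised without the LLM. A hedged sketch, assuming `agent_runner.py` is importable and `CHUNK_DIR` already contains `chunk_*`/`hec_*` files; the filter strings mirror the tool descriptions above:

```python
import agent_runner

# These tool implementations only need local chunk files, not Azure OpenAI credentials.
print(agent_runner.stats_tool_impl("status:rejected min_amount:10000 step:esito"))
print(agent_runner.raw_sample_tool_impl("n:3 status:rejected min_amount:10000"))

# The full agent run (and the optional email) additionally needs AZURE_OPENAI_* from the .env above:
# agent_runner.run_default_question("Scan latest chunks. Flag rejected EUR >= 10000 and vop_no_match.")
```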
api/flask_app.py (528 changed lines)

```diff
@@ -1,32 +1,24 @@
-# flask_app.py
-import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt
-from typing import Optional
+# api/flask_app.py
+import os, io, glob, gzip, json, time, uuid, tempfile, pathlib, datetime as dt
+from typing import List, Dict, Any, Tuple
+from urllib.parse import urlparse
 from flask import Flask, request, jsonify
 from dotenv import load_dotenv
 
 # Load .env locally (App Service uses App Settings instead)
 load_dotenv(os.getenv("ENV_FILE", ".env"))
 
-# Agent + email
-from agent_runner import build_agent
+# Optional email
 from notify import send_email
 
-# Azure SDKs (guarded imports so we don't crash at boot)
+# ---------------- Azure SDKs (guarded) ----------------
 try:
-    from azure.storage.blob import BlobServiceClient, ContentSettings
+    from azure.storage.blob import BlobServiceClient, BlobClient
 except Exception:
     BlobServiceClient = None
-    ContentSettings = None
-
-try:
-    from azure.storage.queue import QueueClient
-except Exception:
-    QueueClient = None
-
-app = Flask(__name__)
-
-# -------- Helpers --------
-def _blob_client() -> BlobServiceClient:
+    BlobClient = None
+
+def _blob_client() -> "BlobServiceClient":
     if not BlobServiceClient:
         raise RuntimeError("azure-storage-blob not installed")
     cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
@@ -34,152 +26,448 @@ def _blob_client() -> BlobServiceClient:
         raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
     return BlobServiceClient.from_connection_string(cs)
 
-def _queue_client() -> QueueClient:
-    if not QueueClient:
-        raise RuntimeError("azure-storage-queue not installed")
-    cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
-    if not cs:
-        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
-    qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
-    qc = QueueClient.from_connection_string(cs, qname)
-    try:
-        qc.create_queue()
-    except Exception:
-        pass
-    return qc
-
-def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str:
-    svc = _blob_client()
-    cc = svc.get_container_client(container)
-    try:
-        cc.create_container()
-    except Exception:
-        pass
-    ext = "jsonl.gz" if compressed else "jsonl"
-    # folder scheme matches poller
-    prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}"
-    blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}"
-    data = gzip.compress(raw_bytes) if compressed else raw_bytes
-    settings = ContentSettings(
-        content_type="application/json",
-        content_encoding=("gzip" if compressed else None),
-    )
-    bc = cc.get_blob_client(blob_name)
-    bc.upload_blob(data, overwrite=True, content_settings=settings)
-    return blob_name
-
-def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
+def _download_blob_bytes(container: str, blob_name: str) -> bytes:
     svc = _blob_client()
     blob = svc.get_blob_client(container=container, blob=blob_name)
-    data = blob.download_blob().readall()
-    fname = os.path.basename(blob_name)
-    path = os.path.join(outdir, fname)
-    with open(path, "wb") as f:
-        f.write(data)
-    return path
-
-def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
-    if not BlobServiceClient:
-        # ultra-light fallback
-        import urllib.request
-        data = urllib.request.urlopen(sas_url, timeout=30).read()
-    else:
-        from azure.storage.blob import BlobClient
-        blob = BlobClient.from_blob_url(sas_url)
-        data = blob.download_blob().readall()
-    name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
-    path = os.path.join(outdir, name)
-    open(path, "wb").write(data)
-    return path
-
-# -------- Routes --------
+    return blob.download_blob().readall()
+
+def _download_sas_bytes(sas_url: str) -> bytes:
+    if BlobClient:
+        blob = BlobClient.from_blob_url(sas_url)
+        return blob.download_blob().readall()
+    else:
+        import urllib.request
+        return urllib.request.urlopen(sas_url, timeout=30).read()
+
+# ---------------- Agent (guarded) ----------------
+_CAN_LLM = True
+try:
+    import agent_runner  # we'll temporarily set agent_runner.CHUNK_DIR when a blob is provided
+    build_agent = agent_runner.build_agent
+except Exception:
+    _CAN_LLM = False
+    agent_runner = None
+    build_agent = None
+
+app = Flask(__name__)
+
+# ---------------- Config ----------------
+CHUNK_DIR = pathlib.Path(os.getenv("CHUNK_DIR", "/app/out"))
+CHUNK_DIR.mkdir(parents=True, exist_ok=True)
+
+AUTO_ANALYZE = os.getenv("AUTO_ANALYZE", "true").lower() in {"1","true","yes"}
+DEFAULT_QUESTION = os.getenv("DEFAULT_QUESTION",
+    "Scan the latest chunks. List anomalies (rejected EUR >= 10000 and vop_no_match). "
+    "Provide brief evidence and next steps."
+)
+EMAIL_SEND_BY_DEFAULT = os.getenv("MAIL_ENABLED", "false").lower() in {"1","true","yes"}
+
+# ---------------- Helpers ----------------
+def _normalize_event(rec: Dict[str, Any]) -> Dict[str, Any]:
+    """Accept raw or HEC-shaped records (`{'event': {...}}`)."""
+    return rec.get("event", rec)
+
+def _write_chunk_from_lines(jsonl_lines: List[bytes], dest_dir: pathlib.Path) -> pathlib.Path:
+    name = f"chunk_{int(time.time())}_{uuid.uuid4().hex[:8]}.jsonl.gz"
+    path = dest_dir / name
+    buf = io.BytesIO()
+    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
+        for ln in jsonl_lines:
+            gz.write(ln if ln.endswith(b"\n") else ln + b"\n")
+    path.write_bytes(buf.getvalue())
     return path
 
+def _bytes_is_gzip(b: bytes) -> bool:
+    return len(b) >= 2 and b[0] == 0x1F and b[1] == 0x8B
+
+def _decode_utf8(b: bytes) -> str:
+    # be permissive; some exports contain odd characters / BOM
+    return b.decode("utf-8", errors="replace").lstrip("\ufeff")
+
+def _normalize_bytes_to_jsonl_lines(data: bytes) -> Tuple[List[bytes], int]:
+    """
+    Accepts: JSONL, JSONL.GZ, JSON array, or single JSON object.
+    Returns: (list of JSONL lines as bytes, number_of_events)
+    Raises: ValueError if format is not recognized.
+    """
+    # decompress if gz
+    if _bytes_is_gzip(data):
+        try:
+            data = gzip.decompress(data)
+        except Exception:
+            raise ValueError("Invalid gzip stream")
+
+    text = _decode_utf8(data).strip()
+    if not text:
+        raise ValueError("Empty blob")
+
+    # Try JSON first (object or array)
+    try:
+        parsed = json.loads(text)
+        if isinstance(parsed, dict):
+            ev = _normalize_event(parsed)
+            return [json.dumps(ev, separators=(",", ":")).encode("utf-8")], 1
+        if isinstance(parsed, list):
+            lines = []
+            count = 0
+            for item in parsed:
+                if isinstance(item, (dict,)):
+                    ev = _normalize_event(item)
+                    lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
+                    count += 1
+            if count == 0:
+                raise ValueError("JSON array has no objects")
+            return lines, count
+        # If JSON but not dict/list, fall through to JSONL attempt
+    except Exception:
+        pass
+
+    # Try JSONL (one JSON object per line)
+    lines = []
+    count = 0
+    for raw in text.splitlines():
+        s = raw.strip()
+        if not s:
+            continue
+        try:
+            obj = json.loads(s)
+            ev = _normalize_event(obj)
+            lines.append(json.dumps(ev, separators=(",", ":")).encode("utf-8"))
+            count += 1
+        except Exception:
+            # Not valid JSON per line -> not JSONL
+            raise ValueError("Unrecognized blob format. Expected JSONL, JSONL.GZ, JSON array, or single JSON object.")
+    if count == 0:
+        raise ValueError("No JSON objects found")
+    return lines, count
+
+def _iter_recent_events(limit_files: int = 10, limit_events: int = 5000) -> List[Dict[str, Any]]:
+    """Load recent events from newest chunk_*.jsonl(.gz) and hec_*.jsonl(.gz) in CHUNK_DIR."""
+    patterns = ["chunk_*.jsonl*", "hec_*.jsonl*"]
+    files: List[pathlib.Path] = []
+    for pat in patterns:
+        files += [pathlib.Path(p) for p in glob.glob(str(CHUNK_DIR / pat))]
+    files = sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)[:limit_files]
+
+    out: List[Dict[str, Any]] = []
+    for fp in files:
+        data = fp.read_bytes()
+        if fp.suffix == ".gz":
+            data = gzip.decompress(data)
+        for ln in data.splitlines():
+            if not ln.strip():
+                continue
+            try:
+                rec = json.loads(ln)
+                out.append(_normalize_event(rec))
+            except Exception:
+                continue
+            if len(out) >= limit_events:
+                return out
+    return out
+
+def _as_float(x) -> float:
+    try:
+        return float(x)
+    except Exception:
+        return 0.0
+
+def _rules_only_report() -> str:
+    """Simple rules when AOAI creds are not set."""
+    evts = _iter_recent_events(limit_files=20, limit_events=20000)
+    total = len(evts)
+    rej_hi, vop_no = [], []
+
+    for e in evts:
+        status = (e.get("status") or "").lower()
+        divisa = (e.get("divisa") or "").upper()
+        amt = _as_float(e.get("importo"))
+        vop = (e.get("vop_check") or "").lower()
+        if status == "rejected" and divisa == "EUR" and amt >= 10000:
+            rej_hi.append({"transaction_id": e.get("transaction_id"), "importo": amt, "step": e.get("step")})
+        if vop in {"no_match", "vop_no_match"}:
+            vop_no.append({"transaction_id": e.get("transaction_id"), "step": e.get("step")})
+
+    lines = []
+    lines.append("### Findings")
+    lines.append(f"- Total events scanned: {total}")
+    lines.append(f"- High-value rejected (EUR≥10000): {len(rej_hi)}")
+    lines.append(f"- VOP no_match: {len(vop_no)}")
+    if rej_hi[:5]:
+        lines.append("  - Examples (rejected): " + ", ".join(
+            f"{i.get('transaction_id')}({i.get('importo')})" for i in rej_hi[:5] if i.get('transaction_id')
+        ))
+    if vop_no[:5]:
+        lines.append("  - Examples (vop_no_match): " + ", ".join(
+            f"{i.get('transaction_id')}" for i in vop_no[:5] if i.get('transaction_id')
+        ))
+    lines.append("")
+    lines.append("### Recommended actions")
+    for r in (
+        "Validate VOP mismatches; confirm beneficiary details.",
+        "Investigate rejection reasons for all EUR≥10k transactions.",
+        "Check spike trends by hour/day and counterparties.",
+    ):
+        lines.append(f"- {r}")
+    return "\n".join(lines)
+
+def _try_llm_report(question: str) -> str:
+    if not _CAN_LLM or not build_agent:
+        raise RuntimeError("LLM agent not available")
+    agent = build_agent()
+    out = agent.invoke({"input": question, "chat_history": []})
+    return out.get("output", "") or "(no output from agent)"
+
+def _run_analysis(question: str) -> Dict[str, Any]:
+    t0 = time.time()
+    used = "rules"
+    try:
+        if _CAN_LLM and os.getenv("AZURE_OPENAI_ENDPOINT") and (
+            os.getenv("AZURE_OPENAI_API_KEY") or os.getenv("AOAI_API_KEY") or os.getenv("OPENAI_API_KEY")
+        ):
+            txt = _try_llm_report(question)
+            used = "llm"
+        else:
+            txt = _rules_only_report()
+    except Exception as e:
+        txt = _rules_only_report()
+        used = f"rules (fallback from llm error: {str(e)[:120]})"
+    return {"ok": True, "analyzer": used, "duration_sec": round(time.time() - t0, 3), "result": txt}
+
+# ---------------- Routes ----------------
 @app.get("/health")
 def health():
-    return {"status": "ok"}, 200
+    return {"status": "ok", "auto_analyze": AUTO_ANALYZE, "chunk_dir": str(CHUNK_DIR)}, 200
 
+@app.post("/ingest")
+def ingest():
+    """
+    POST JSON:
+      { "events": [ {..}, {..} ] }  OR a single object {..}
+    Writes a chunk and (optionally) auto-analyzes.
+    """
+    body = request.get_json(force=True, silent=True)
+    if body is None:
+        return jsonify({"ok": False, "error": "invalid JSON"}), 400
+
+    # normalize to list (support HEC-style {"event": {...}})
+    events: List[Dict[str, Any]] = []
+    if isinstance(body, dict) and "events" in body and isinstance(body["events"], list):
+        events = [_normalize_event(e) for e in body["events"] if isinstance(e, dict)]
+    elif isinstance(body, list):
+        events = [_normalize_event(e) for e in body if isinstance(e, dict)]
+    elif isinstance(body, dict):
+        events = [_normalize_event(body)]
+    else:
+        return jsonify({"ok": False, "error": "payload must be an object or list"}), 400
+
+    if not events:
+        return jsonify({"ok": False, "error": "no events to ingest"}), 400
+
+    lines = [json.dumps(e, separators=(",", ":")).encode("utf-8") for e in events]
+    path = _write_chunk_from_lines(lines, CHUNK_DIR)
+
+    report = None
+    if AUTO_ANALYZE:
+        report = _run_analysis(DEFAULT_QUESTION)
+        if EMAIL_SEND_BY_DEFAULT:
+            try:
+                send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"])
+            except Exception:
+                pass
+
+    return jsonify({
+        "ok": True,
+        "written": len(events),
+        "chunk_file": path.name,
+        "auto_analyzed": bool(AUTO_ANALYZE),
+        "report": report,
+    }), 200
+
+@app.post("/ingest_blob")
+def ingest_blob():
+    """
+    Accept a blob, normalize it to chunk_*.jsonl.gz in CHUNK_DIR, optionally analyze.
+
+    Body:
+    {
+      "container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
+      // OR
+      "sas_url": "https://.../file.jsonl[.gz]?sig=..."
+      "analyze": true,
+      "email": {"send": true, "to": "x@x"}
+    }
+    """
+    payload = request.get_json(force=True, silent=True) or {}
+    analyze = bool(payload.get("analyze"))
+    data: bytes = b""
+
+    try:
+        if payload.get("sas_url"):
+            data = _download_sas_bytes(payload["sas_url"])
+        elif payload.get("container") and payload.get("blob_name"):
+            data = _download_blob_bytes(payload["container"], payload["blob_name"])
+        else:
+            return jsonify({"ok": False, "error": "Provide 'sas_url' OR ('container' + 'blob_name')"}), 400
+
+        lines, count = _normalize_bytes_to_jsonl_lines(data)
+        path = _write_chunk_from_lines(lines, CHUNK_DIR)
+
+        report = None
+        if analyze:
+            report = _run_analysis(DEFAULT_QUESTION)
+            email_cfg = payload.get("email") or {}
+            if email_cfg.get("send"):
+                try:
+                    send_email(subject="[Intesa Logs] Auto Analysis", body_text=report["result"], to_addr=email_cfg.get("to"))
+                except Exception:
+                    pass
+
+        return jsonify({"ok": True, "written": count, "chunk_file": path.name, "report": report}), 200
+
+    except ValueError as ve:
+        return jsonify({"ok": False, "error": str(ve)}), 400
+    except Exception as e:
+        return jsonify({"ok": False, "error": str(e)}), 500
+
 @app.post("/analyze")
 def analyze():
     """
-    POST JSON:
+    Manually trigger analysis over the newest chunks, OR over a specific blob.
+
+    Body options:
     {
-      "question": "...optional custom question...",
-      "email": {"send": true, "to": "override@example.com"},
+      "question": "...",
+      "email": {"send": true, "to": "x@x"},
       "blob": {
-        "container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]"
+        "container": "bank-logs", "blob_name": "intesa/.../file.json[.gz]"
         // OR
-        "sas_url": "https://.../chunk.jsonl.gz?sig=..."
+        "sas_url": "https://.../file.jsonl[.gz]?sig=..."
       }
     }
     """
-    t0 = time.time()
+    global CHUNK_DIR  # declare BEFORE any use in this function
     payload = request.get_json(force=True, silent=True) or {}
-    question = payload.get("question") or (
-        "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
-        "Give a brief summary and next steps."
-    )
-
-    prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
+    question = payload.get("question") or DEFAULT_QUESTION
+
     tmp_dir = None
+    prev_env_chunk = os.getenv("CHUNK_DIR")
+    prev_agent_chunk = (getattr(agent_runner, "CHUNK_DIR", str(CHUNK_DIR))
+                        if agent_runner else str(CHUNK_DIR))
+    prev_local_chunk = CHUNK_DIR
+
     try:
         blob_req = payload.get("blob")
         if blob_req:
             tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
+            # download to bytes
             if blob_req.get("sas_url"):
-                _download_sas_to_dir(blob_req["sas_url"], tmp_dir)
+                data = _download_sas_bytes(blob_req["sas_url"])
             elif blob_req.get("container") and blob_req.get("blob_name"):
-                _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir)
+                data = _download_blob_bytes(blob_req["container"], blob_req["blob_name"])
             else:
                 return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400
 
+            # normalize to a real chunk_*.jsonl.gz in tmp_dir
+            lines, _ = _normalize_bytes_to_jsonl_lines(data)
+            _write_chunk_from_lines(lines, pathlib.Path(tmp_dir))
+
+            # re-point CHUNK_DIR for this request and agent_runner
+            CHUNK_DIR = pathlib.Path(tmp_dir)
             os.environ["CHUNK_DIR"] = tmp_dir
+            if agent_runner:
+                agent_runner.CHUNK_DIR = tmp_dir
+                if hasattr(agent_runner, "BLOB_DIR"):
+                    agent_runner.BLOB_DIR = ""
 
-        agent = build_agent()
-        out = agent.invoke({"input": question, "chat_history": []})
-        result = out.get("output", "")
+        # run analysis
+        res = _run_analysis(question)
 
+        # optional email
         email_cfg = payload.get("email") or {}
         if email_cfg.get("send"):
-            to_addr = email_cfg.get("to")
-            send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)
+            try:
+                send_email(subject="[Intesa Logs] Report", body_text=res["result"], to_addr=email_cfg.get("to"))
+            except Exception:
+                pass
 
-        return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200
+        return jsonify(res), 200
 
+    except ValueError as ve:
+        return jsonify({"ok": False, "error": str(ve)}), 400
     except Exception as e:
         return jsonify({"ok": False, "error": str(e)}), 500
     finally:
-        os.environ["CHUNK_DIR"] = prev_chunk_dir
-
-# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC)
-@app.post("/collect")
-@app.post("/services/collector/event")  # alias for Splunk HEC curl compatibility
-def collect_hec():
-    try:
-        container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
-        # Accept either single JSON object or a list; we will write one line per event
-        body = request.get_json(force=True, silent=True)
-        if body is None:
-            return jsonify({"ok": False, "error": "invalid JSON"}), 400
-
-        lines = []
-        if isinstance(body, list):
-            for item in body:
-                lines.append(json.dumps(item, separators=(",", ":")))
-        else:
-            lines.append(json.dumps(body, separators=(",", ":")))
-        raw = ("\n".join(lines) + "\n").encode("utf-8")
-
-        blob_name = _upload_chunk_blob(container, raw, compressed=True)
-
-        # Enqueue a message your queue-worker understands
-        msg = {
-            "blob": {"container": container, "blob_name": blob_name},
-            # flip to true if you want emails by default
-            "email": {"send": False}
-        }
-        qc = _queue_client()
-        qc.send_message(json.dumps(msg, separators=(",", ":")))
-
-        return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200
-    except Exception as e:
-        return jsonify({"ok": False, "error": str(e)}), 500
+        # restore CHUNK_DIR context
+        if prev_env_chunk is None:
+            os.environ.pop("CHUNK_DIR", None)
+        else:
+            os.environ["CHUNK_DIR"] = prev_env_chunk
+        if agent_runner:
+            agent_runner.CHUNK_DIR = prev_agent_chunk
+        CHUNK_DIR = pathlib.Path(prev_local_chunk)
+        CHUNK_DIR.mkdir(parents=True, exist_ok=True)
+
+# ---------------- Event Grid webhook ----------------
+@app.post("/eventgrid")
+def eventgrid():
+    """
+    Webhook for Azure Event Grid (Storage -> BlobCreated).
+    - Handles SubscriptionValidation by echoing validationResponse.
+    - For each BlobCreated event, extracts container/blob and ingests it.
+    - Optional auto-analyze + email is controlled by:
+        EVENTGRID_ANALYZE_DEFAULT (true/false)  [default: true]
+        EVENTGRID_EMAIL_DEFAULT   (true/false)  [default: MAIL_ENABLED]
+    """
+    try:
+        events = request.get_json(force=True, silent=True)
+        if not events:
+            return jsonify({"ok": False, "error": "No events"}), 400
+        if isinstance(events, dict):
+            events = [events]
+
+        analyze_default = os.getenv("EVENTGRID_ANALYZE_DEFAULT", "true").lower() in {"1","true","yes"}
+        email_default = os.getenv("EVENTGRID_EMAIL_DEFAULT", os.getenv("MAIL_ENABLED", "false")).lower() in {"1","true","yes"}
+        target_container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
+
+        for ev in events:
+            et = ev.get("eventType")
+            if et == "Microsoft.EventGrid.SubscriptionValidationEvent":
+                validation_code = ev.get("data", {}).get("validationCode")
+                return jsonify({"validationResponse": validation_code})
+
+            if et in ("Microsoft.Storage.BlobCreated", "Microsoft.Storage.BlobRenamed"):
+                url = (ev.get("data", {}) or {}).get("url")
+                if not url:
+                    continue
+                u = urlparse(url)
+                # path like: /container/dir1/dir2/file.json
+                parts = [p for p in u.path.split("/") if p]
+                if len(parts) < 2:
+                    continue
+                container = parts[0]
+                blob_name = "/".join(parts[1:])
+
+                # Only act on the configured container
+                if container != target_container:
+                    continue
+
+                # Download the blob, normalize, write chunk, analyze/email if enabled
+                data = _download_blob_bytes(container, blob_name)
+                lines, _ = _normalize_bytes_to_jsonl_lines(data)
+                _write_chunk_from_lines(lines, CHUNK_DIR)
+
+                if analyze_default:
+                    rep = _run_analysis(DEFAULT_QUESTION)
+                    if email_default:
+                        try:
+                            send_email(subject="[Intesa Logs] Auto Analysis", body_text=rep["result"])
+                        except Exception:
+                            pass
+
+        return jsonify({"ok": True}), 200
+
+    except ValueError as ve:
+        return jsonify({"ok": False, "error": str(ve)}), 400
+    except Exception as e:
+        return jsonify({"ok": False, "error": str(e)}), 500
```
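The new routes can be exercised with plain HTTP. A hedged sketch using `requests`; the base URL, blob name, and validation code are placeholders, and the JSON bodies follow the docstrings above:

```python
import requests

BASE = "http://localhost:8080"  # or https://tf-in-dev-chatapp-app.azurewebsites.net

# 1) Ingest a couple of events directly (writes a chunk; auto-analyzes when AUTO_ANALYZE=true).
events = [{"event_type": "bonifico", "step": "esito", "status": "rejected",
           "importo": 12500, "divisa": "EUR", "vop_check": "no_match"}]
print(requests.post(f"{BASE}/ingest", json={"events": events}, timeout=60).json())

# 2) Analyze the newest chunks, or point the analysis at a specific blob (optional "blob" key).
body = {
    "question": "Scan latest chunks. Flag rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC.",
    "email": {"send": False},
    # "blob": {"container": "bank-logs", "blob_name": "intesa/2025/.../hec_example.jsonl.gz"},  # hypothetical name
}
print(requests.post(f"{BASE}/analyze", json=body, timeout=120).json())

# 3) Event Grid subscription-validation handshake (what /eventgrid echoes back).
validation = [{"eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
               "data": {"validationCode": "placeholder-code"}}]
print(requests.post(f"{BASE}/eventgrid", json=validation, timeout=30).json())
```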
api/notify.py (new file, 67 lines)
@@ -0,0 +1,67 @@
```python
# api/notify.py
import os, smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from dotenv import load_dotenv

# Load .env before reading anything
load_dotenv(os.getenv("ENV_FILE", ".env"))

_TRUES = {"1", "true", "yes", "on"}

def _b(s: str | None, default=False) -> bool:
    if s is None:
        return default
    return str(s).strip().lower() in _TRUES

def _first(*names: str, default: str | None = None) -> str | None:
    for n in names:
        v = os.getenv(n)
        if v is not None and str(v).strip() != "":
            return v
    return default

def _cfg():
    """Merge MAIL_* and SMTP_* envs; MAIL_* wins if both present."""
    enabled = _b(_first("MAIL_ENABLED"), default=False)
    host = _first("MAIL_HOST", "SMTP_HOST")
    port = int(_first("MAIL_PORT", "SMTP_PORT", default="0") or "0")
    user = _first("MAIL_USER", "SMTP_USER")
    pwd = _first("MAIL_PASSWORD", "SMTP_PASS")
    mail_from = _first("MAIL_FROM", default=user)
    mail_to = _first("MAIL_TO_DEFAULT", "MAIL_TO")
    tls = _b(_first("MAIL_TLS"), default=False)
    return {
        "enabled": enabled,
        "host": host, "port": port, "user": user, "pwd": pwd,
        "from": mail_from, "to": mail_to, "tls": tls
    }

def send_email(subject: str, body_text: str, to_addr: str | None = None):
    cfg = _cfg()
    if not cfg["enabled"]:
        print("[notify] MAIL_ENABLED is not true; skipping email")
        return

    host, port, user, pwd = cfg["host"], cfg["port"], cfg["user"], cfg["pwd"]
    mail_from = cfg["from"]
    mail_to = to_addr or cfg["to"]

    if not (host and port and user and pwd and mail_to):
        print("[notify] missing SMTP config; skipping email")
        return

    msg = MIMEText(body_text, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = formataddr(("Intesa Logs Agent", mail_from))
    msg["To"] = mail_to

    with smtplib.SMTP(host, port, timeout=20) as s:
        if cfg["tls"]:
            try:
                s.starttls()
            except smtplib.SMTPException:
                pass
        s.login(user, pwd)
        s.send_message(msg)
    print(f"[notify] sent email to {mail_to}")
```
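A quick, hedged usage check; it assumes the Mailtrap sandbox values from the `.env` above are loaded, and the report text is invented:

```python
from notify import send_email

# With MAIL_ENABLED=true and the SMTP_* settings above, this lands in the Mailtrap sandbox inbox.
send_email(
    subject="[Intesa Logs] Test",
    body_text="Hypothetical report body: 2 rejected EUR >= 10000, 1 vop_no_match.",
    to_addr="you@company.com",  # optional override; falls back to MAIL_TO
)
```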
```diff
@@ -3,7 +3,6 @@ langchain-core>=0.3.27,<0.4
 langchain-community>=0.3,<0.4
 langchain-openai>=0.2.12,<0.3
 openai>=1.40
-faiss-cpu==1.8.*
 ujson>=5
 pydantic>=2
 python-dotenv>=1
```
blob-func/.funcignore (new file, 2 lines)
@@ -0,0 +1,2 @@
```
venv
```
blob-func/function_app.py (new file, 98 lines)
@@ -0,0 +1,98 @@
```python
import azure.functions as func
import logging
import os
import json
import requests
from typing import List, Any

app = func.FunctionApp()

# Simple health check
@app.route(route="ping", auth_level=func.AuthLevel.ANONYMOUS)
def Ping(req: func.HttpRequest) -> func.HttpResponse:
    return func.HttpResponse("pong", status_code=200)


@app.blob_trigger(
    arg_name="blob",
    path="bank-logs/intesa/{name}.json",  # container/path pattern
    connection="BLOB_CONN"                # App Setting name (NOT the raw connection string)
)
def BlobIngest(blob: func.InputStream):
    """
    Reads JSON, NDJSON, or JSON array files from Blob Storage.
    Handles UTF-8 BOM safely. Posts parsed items to AGENT_API_URL/ingest.
    """
    logging.info("BlobIngest fired: name=%s length=%s", blob.name, blob.length)

    # Read whole blob and decode using utf-8-sig to strip a BOM if present
    raw: bytes = blob.read()
    # strict = raise if decoding is invalid; change to "ignore" only if you truly want to skip bad bytes
    text: str = raw.decode("utf-8-sig", errors="strict")

    items: List[Any] = []

    # Try whole-document JSON first (object or array). If that fails, fall back to NDJSON.
    stripped = text.lstrip("\ufeff").strip()  # extra safety if BOM sneaks through
    try:
        if stripped:
            first = stripped[0]
            if first in "[{":
                parsed = json.loads(stripped)
                items = parsed if isinstance(parsed, list) else [parsed]
            else:
                # Not an array or object at top level -> treat as NDJSON
                raise ValueError("Top-level not array/object; using NDJSON mode.")
        else:
            logging.warning("Blob %s is empty.", blob.name)
            items = []
    except Exception:
        # NDJSON fallback (one JSON object per line)
        ndjson_items: List[Any] = []
        for i, line in enumerate(text.splitlines(), start=1):
            s = line.lstrip("\ufeff").strip()  # strip BOM at line-start + trim whitespace
            if not s:
                continue
            try:
                ndjson_items.append(json.loads(s))
            except json.JSONDecodeError as e:
                logging.error("Invalid JSON at line %d in %s: %s | line=%r",
                              i, blob.name, e, s[:200])
                # Re-raise to allow retry/poison for truly bad content
                raise
        items = ndjson_items

    logging.info("Parsed %d item(s) from blob '%s'.", len(items), blob.name)

    # If no agent URL configured, just log and exit gracefully
    api = os.environ.get("AGENT_API_URL")
    if not api:
        logging.warning("AGENT_API_URL not set; skipping POST. (Parsed %d items).", len(items))
        return

    # Prepare POST
    url = api.rstrip('/') + "/ingest"
    batch_size = int(os.environ.get("INGEST_BATCH_SIZE", "500"))
    timeout = float(os.environ.get("HTTP_TIMEOUT_SECONDS", "25"))

    def chunks(seq: List[Any], n: int):
        for i in range(0, len(seq), n):
            yield seq[i:i + n]

    # Send in batches to avoid oversized payloads
    sent = 0
    for batch in chunks(items, batch_size):
        try:
            r = requests.post(url, json=batch, timeout=timeout)
            logging.info("POST %s [%d items] -> %d", url, len(batch), r.status_code)
            if r.status_code >= 400:
                logging.error("Agent API error %d: %s", r.status_code, r.text[:500])
                # Raise to trigger retry/poison if desired
                raise RuntimeError(f"Agent API returned {r.status_code}")
            sent += len(batch)
        except Exception:
            logging.exception("HTTP call to Agent API failed after sending %d/%d items.", sent, len(items))
            # Re-raise so the runtime can retry and dead-letter if needed
            raise

    logging.info("Completed BlobIngest for %s: posted %d item(s).", blob.name, sent)
```
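To exercise the trigger, one hedged option is to drop a small JSON file under the watched prefix. The container and path pattern come from the `blob_trigger` decorator above; the blob name and sample events are made up:

```python
import json
from azure.storage.blob import BlobServiceClient

conn_str = "<AZURE_STORAGE_CONNECTION_STRING>"  # same account the BLOB_CONN app setting points at
events = [{"event_type": "bonifico", "step": "esito", "status": "rejected",
           "importo": 12500, "divisa": "EUR", "vop_check": "no_match"}]

# Must match the trigger path "bank-logs/intesa/{name}.json" for BlobIngest to fire.
BlobServiceClient.from_connection_string(conn_str) \
    .get_blob_client("bank-logs", "intesa/test-upload.json") \
    .upload_blob(json.dumps(events).encode("utf-8"), overwrite=True)
```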
blob-func/host.json (new file, 10 lines)
@@ -0,0 +1,10 @@
```json
{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "logging": {
    "applicationInsights": { "samplingSettings": { "isEnabled": true, "excludedTypes": "Request" } }
  }
}
```
blob-func/local.settings.json (new file, 12 lines)
@@ -0,0 +1,12 @@
```json
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;BlobEndpoint=https://tfindevst.blob.core.windows.net/;FileEndpoint=https://tfindevst.file.core.windows.net/;QueueEndpoint=https://tfindevst.queue.core.windows.net/;TableEndpoint=https://tfindevst.table.core.windows.net/",
    "BLOB_CONN": "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;BlobEndpoint=https://tfindevst.blob.core.windows.net/;FileEndpoint=https://tfindevst.file.core.windows.net/;QueueEndpoint=https://tfindevst.queue.core.windows.net/;TableEndpoint=https://tfindevst.table.core.windows.net/",
    "AGENT_API_URL": "https://agent-api-app.azurewebsites.net",
    "AzureWebJobsSecretStorageType": "Files"
  },
  "ConnectionStrings": {},
  "Host": {}
}
```
blob-func/requirements.txt (new file, 2 lines)
@@ -0,0 +1,2 @@
```
azure-functions==1.20.0
requests==2.32.3
```
blob-ingestor/BlobIngest/__init__.py (new file, 24 lines)
@@ -0,0 +1,24 @@
```python
import os, json, logging, datetime as dt
import azure.functions as func
from azure.storage.blob import BlobServiceClient

# Source storage (same one your trigger points to)
BLOB_CONN = os.environ["BLOB_CONN"]  # set as App Setting
CONTAINER = "bank-logs"              # container your trigger uses

def main(blob: func.InputStream):
    # 1) Log to App Insights (if configured)
    logging.info("BlobIngest fired: name=%s len=%d", blob.name, blob.length)

    # 2) Write a marker blob so we can *see* it fired from Storage itself
    svc = BlobServiceClient.from_connection_string(BLOB_CONN)
    marker_name = f"intesa/_diag/processed-{dt.datetime.utcnow().strftime('%Y%m%dT%H%M%S')}.txt"
    payload = {
        "source": blob.name,
        "length": blob.length,
        "seen_at_utc": dt.datetime.utcnow().isoformat() + "Z"
    }
    svc.get_blob_client(CONTAINER, marker_name).upload_blob(
        json.dumps(payload).encode("utf-8"),
        overwrite=True
    )
```
51
blob-ingestor/BlobIngest/__init__prod.py
Normal file
@ -0,0 +1,51 @@
# THIS IS WHAT IS USED IN PROD; THE CURRENT MAIN __init__.py IS FOR TESTING ONLY
import os, json, gzip, logging, requests
import azure.functions as func

AGENT_API_URL = os.getenv("AGENT_API_URL")  # e.g. https://agent-api-app.azurewebsites.net

def _to_events(data: bytes):
    # gunzip if needed
    if len(data) >= 2 and data[0] == 0x1F and data[1] == 0x8B:
        data = gzip.decompress(data)
    text = data.decode("utf-8", errors="replace").lstrip("\ufeff").strip()
    if not text:
        return []
    # try JSON (object or array)
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return [parsed]
        if isinstance(parsed, list):
            return [x for x in parsed if isinstance(x, dict)]
    except Exception:
        pass
    # try JSONL
    out = []
    for ln in text.splitlines():
        s = ln.strip()
        if not s:
            continue
        try:
            out.append(json.loads(s))
        except Exception:
            pass
    return out

def main(inputblob: func.InputStream):
    if not AGENT_API_URL:
        logging.warning("AGENT_API_URL not set; skipping.")
        return

    events = _to_events(inputblob.read())
    if not events:
        logging.info("No JSON events parsed from blob %s", inputblob.name)
        return

    try:
        url = AGENT_API_URL.rstrip("/") + "/ingest"
        r = requests.post(url, json={"events": events}, timeout=30)
        logging.info("Agent ingest -> %s %s", r.status_code, r.text[:300])
        r.raise_for_status()
    except Exception as e:
        logging.exception("Ingest failed: %s", e)
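A quick local sanity check for the `_to_events` parser above (a sketch, not part of the repo; run it in the same module so `_to_events` is in scope, and note the inputs are made up):

import gzip, json

# Hypothetical test data: two JSONL events, also tried gzipped and as a single JSON object.
jsonl = b'{"transaction_id":"T1","status":"rejected"}\n{"transaction_id":"T2","status":"accepted"}\n'
assert len(_to_events(jsonl)) == 2                              # plain JSONL
assert len(_to_events(gzip.compress(jsonl))) == 2               # gzipped JSONL
assert _to_events(json.dumps({"a": 1}).encode()) == [{"a": 1}]  # single JSON object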
11
blob-ingestor/BlobIngest/function.json
Normal file
@ -0,0 +1,11 @@
{
  "bindings": [
    {
      "type": "blobTrigger",
      "direction": "in",
      "name": "blob",
      "path": "bank-logs/intesa/test-exact.json",
      "connection": "BLOB_CONN"
    }
  ]
}
4
blob-ingestor/Ping/__init__.py
Normal file
@ -0,0 +1,4 @@
import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    return func.HttpResponse("pong", status_code=200)
17
blob-ingestor/Ping/function.json
Normal file
@ -0,0 +1,17 @@
{
  "bindings": [
    {
      "type": "httpTrigger",
      "direction": "in",
      "authLevel": "anonymous",
      "name": "req",
      "methods": [ "get", "post" ],
      "route": "ping"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}
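Once the Function App is deployed, the anonymous ping route can be hit directly to confirm it is up (a sketch; the host name is a placeholder and the default /api route prefix is assumed):

import requests

FUNC_HOST = "https://<your-function-app>.azurewebsites.net"  # placeholder, not a real host
r = requests.get(f"{FUNC_HOST}/api/ping", timeout=10)
print(r.status_code, r.text)  # expected: 200 pong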
8
blob-ingestor/host.json
Normal file
@ -0,0 +1,8 @@
{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "logging": { "applicationInsights": { "samplingSettings": { "isEnabled": true } } }
}
2
blob-ingestor/requirements.txt
Normal file
@ -0,0 +1,2 @@
azure-functions>=1.20.0
requests>=2.31.0
99
compose.yaml
@ -1,103 +1,36 @@
 services:
-  splunk:
-    image: splunk/splunk:9.4.2
-    container_name: splunk
-    restart: unless-stopped
-    ports:
-      - "8000:8000" # Splunk Web
-      - "8088:8088" # HEC
-      - "8089:8089" # Management API
-    environment:
-      SPLUNK_START_ARGS: --accept-license
-      SPLUNK_PASSWORD: ${SPLUNK_PASSWORD:-Str0ngP@ss!9}
-      SPLUNK_HEC_TOKEN: ${SPLUNK_HEC_TOKEN:-dev-0123456789abcdef}
-    volumes:
-      - splunk-etc:/opt/splunk/etc
-      - splunk-var:/opt/splunk/var
-    healthcheck:
-      test: ["CMD-SHELL", "curl -sk https://localhost:8089/services/server/info | grep -q version"]
-      interval: 10s
-      timeout: 5s
-      retries: 30
-
-  poller:
-    build:
-      context: .
-      dockerfile: poller/Dockerfile
-    container_name: splunk-poller
-    restart: unless-stopped
-    depends_on:
-      splunk:
-        condition: service_healthy
-    environment:
-      # --- Splunk connection (to containerized Splunk) ---
-      SPLUNK_HOST: splunk
-      SPLUNK_PORT: "8089"
-      SPLUNK_USER: admin
-      SPLUNK_PW: ${SPLUNK_PASSWORD:-Str0ngP@ss!9}
-      SPLUNK_VERIFY_SSL: "false"
-      # --- What to read ---
-      SPLUNK_INDEX: intesa_payments
-      SPLUNK_SOURCETYPE: intesa:bonifico
-      INITIAL_LOOKBACK: -24h@h
-      CREATE_INDEX_IF_MISSING: "true"
-      # --- Polling / chunking ---
-      SLEEP_SECONDS: "60"
-      MAX_CHUNK_BYTES: "1800000"
-      # --- Sink selection: file (local) | blob (azure) | blob+queue (azure) ---
-      SINK: blob+queue
-      OUTDIR: /app/out
-      # --- Azure Storage (Blob + Queue) ---
-      AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-}
-      AZURE_STORAGE_CONTAINER: ${AZURE_STORAGE_CONTAINER:-bank-logs}
-      AZURE_STORAGE_QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks}
-      AZURE_COMPRESS: "true"
-      # --- Email default for enqueued messages ---
-      POLLER_EMAIL_SEND_DEFAULT: "true"
-    volumes:
-      - chunks:/app/out
-
   agent-api:
     build:
       context: .
       dockerfile: api/Dockerfile
     container_name: agent-api
-    restart: unless-stopped
-    depends_on:
-      - poller
     ports:
       - "8080:8080"
     env_file:
-      - .env # AOAI + Mailtrap, etc.
+      - .env # pulls your AZURE_OPENAI_*, SMTP_*, MAIL_*, etc.
     environment:
-      CHUNK_DIR: /app/out
+      CHUNK_DIR: /app/out # where the agent reads chunk files
-      TOP_K: "12"
-      # If the API should read blobs directly, ensure these also exist in .env:
-      # AZURE_STORAGE_CONNECTION_STRING=...
-      # AZURE_STORAGE_CONTAINER=bank-logs
     volumes:
       - chunks:/app/out
+    restart: unless-stopped

-  queue-worker:
+  file-poller:
     build:
       context: .
-      dockerfile: worker/Dockerfile
-    container_name: queue-worker
-    restart: unless-stopped
+      dockerfile: poller/Dockerfile
+    container_name: file-poller
+    environment:
+      INDIR: /app/in # folder the poller watches
+      OUTDIR: /app/out # folder it writes chunk_*.jsonl
+      SLEEP_SECONDS: 60 # scan every minute
+      EMAIL_SEND_DEFAULT: "true" # tell agent-api to email results
+      AGENT_API_URL: http://agent-api:8080
+    volumes:
+      - ./data:/app/in:rw # drop your .jsonl files here (on Windows)
+      - chunks:/app/out
     depends_on:
       - agent-api
-    env_file:
-      - .env # to pick up AZURE_STORAGE_CONNECTION_STRING if you keep it here
-    environment:
-      AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING:-}
-      QUEUE_NAME: ${AZURE_STORAGE_QUEUE_NAME:-log-chunks}
-      ANALYZER_URL: http://agent-api:8080/analyze # inside compose network
-      POLL_INTERVAL_SEC: "60"
-      MAX_DEQUEUE: "1"
-      VISIBILITY_TIMEOUT: "120"
-      HTTP_TIMEOUT: "120"
-
+    restart: unless-stopped

 volumes:
-  splunk-etc:
-  splunk-var:
   chunks:
1
data/_done/batch1.jsonl
Normal file
@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T1","step":"esito","status":"rejected","importo":12000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12A1234512345123451234512"}
1
data/_done/batch2.jsonl
Normal file
@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T2","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match"}
BIN
data/_out/chunk_1759477104_538344ce.jsonl.gz
Normal file
Binary file not shown.
1
data/local/_sanity.jsonl
Normal file
@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T900","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12ZZ000000000000000000000","bic_swift":"TESTGB2L","instantaneo":true}
2
data/local/sampl3.jsonl
Normal file
@ -0,0 +1,2 @@
{"event_type":"bonifico","transaction_id":"W1","step":"esito","status":"accepted","importo":9500,"divisa":"EUR","vop_check":"match","data_pagamento":"2025-09-26T12:01:00Z"}
{"event_type":"bonifico","transaction_id":"W2","step":"esito","status":"rejected","importo":150000,"divisa":"EUR","vop_check":"no_match","data_pagamento":"2025-09-26T12:05:00Z"}
2
data/local/sampl33.json
Normal file
@ -0,0 +1,2 @@
{"event_type":"bonifico","transaction_id":"W1","step":"esito","status":"accepted","importo":9500,"divisa":"EUR","vop_check":"match","data_pagamento":"2025-09-26T12:01:00Z"}
{"event_type":"bonifico","transaction_id":"W2","step":"esito","status":"rejected","importo":150000,"divisa":"EUR","vop_check":"no_match","data_pagamento":"2025-09-26T12:05:00Z"}
2
data/local/sampl_anom.jsonl
Normal file
@ -0,0 +1,2 @@
{"event_type":"bonifico","transaction_id":"T100","step":"esito","status":"rejected","importo":12000,"divisa":"EUR","vop_check":"no_match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"IT12ZZ000000000000000000000","bic_swift":"TESTGB2L","instantaneo":true}
{"event_type":"bonifico","transaction_id":"T101","step":"esito","status":"accepted","importo":4000,"divisa":"EUR","vop_check":"match","iban_origin_masked":"IT60X0542811101000000123456","iban_dest_masked":"DE89370400440532013000","bic_swift":"DEUTDEFF","instantaneo":false}
1
export.json
Normal file
@ -0,0 +1 @@
{"event_type":"bonifico","transaction_id":"T777","step":"esito","status":"rejected","importo":15000,"divisa":"EUR","vop_check":"no_match","instantaneo":true}
185
flask_app.py
@ -1,185 +0,0 @@
# flask_app.py
import os, tempfile, time, gzip, json, pathlib, uuid, datetime as dt
from typing import Optional
from flask import Flask, request, jsonify
from dotenv import load_dotenv

# Load .env locally (App Service uses App Settings instead)
load_dotenv(os.getenv("ENV_FILE", ".env"))

# Agent + email
from agent_runner import build_agent
from notify import send_email

# Azure SDKs (guarded imports so we don't crash at boot)
try:
    from azure.storage.blob import BlobServiceClient, ContentSettings
except Exception:
    BlobServiceClient = None
    ContentSettings = None

try:
    from azure.storage.queue import QueueClient
except Exception:
    QueueClient = None

app = Flask(__name__)

# -------- Helpers --------
def _blob_client() -> BlobServiceClient:
    if not BlobServiceClient:
        raise RuntimeError("azure-storage-blob not installed")
    cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if not cs:
        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
    return BlobServiceClient.from_connection_string(cs)

def _queue_client() -> QueueClient:
    if not QueueClient:
        raise RuntimeError("azure-storage-queue not installed")
    cs = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    if not cs:
        raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING not set")
    qname = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
    qc = QueueClient.from_connection_string(cs, qname)
    try:
        qc.create_queue()
    except Exception:
        pass
    return qc

def _upload_chunk_blob(container: str, raw_bytes: bytes, compressed: bool = True) -> str:
    svc = _blob_client()
    cc = svc.get_container_client(container)
    try:
        cc.create_container()
    except Exception:
        pass
    ext = "jsonl.gz" if compressed else "jsonl"
    # folder scheme matches poller
    prefix = f"intesa/{dt.datetime.now(dt.timezone.utc).strftime('%Y/%m/%d/%H')}"
    blob_name = f"{prefix}/hec_{uuid.uuid4().hex[:8]}.{ext}"
    data = gzip.compress(raw_bytes) if compressed else raw_bytes
    settings = ContentSettings(
        content_type="application/json",
        content_encoding=("gzip" if compressed else None),
    )
    bc = cc.get_blob_client(blob_name)
    bc.upload_blob(data, overwrite=True, content_settings=settings)
    return blob_name

def _download_blob_to_dir(container: str, blob_name: str, outdir: str) -> str:
    svc = _blob_client()
    blob = svc.get_blob_client(container=container, blob=blob_name)
    data = blob.download_blob().readall()
    fname = os.path.basename(blob_name)
    path = os.path.join(outdir, fname)
    with open(path, "wb") as f:
        f.write(data)
    return path

def _download_sas_to_dir(sas_url: str, outdir: str) -> str:
    if not BlobServiceClient:
        # ultra-light fallback
        import urllib.request
        data = urllib.request.urlopen(sas_url, timeout=30).read()
    else:
        from azure.storage.blob import BlobClient
        blob = BlobClient.from_blob_url(sas_url)
        data = blob.download_blob().readall()
    name = "chunk_from_sas.jsonl.gz" if sas_url.endswith(".gz") else "chunk_from_sas.jsonl"
    path = os.path.join(outdir, name)
    open(path, "wb").write(data)
    return path

# -------- Routes --------
@app.get("/health")
def health():
    return {"status": "ok"}, 200

@app.post("/analyze")
def analyze():
    """
    POST JSON:
    {
      "question": "...optional custom question...",
      "email": {"send": true, "to": "override@example.com"},
      "blob": {
        "container": "bank-logs", "blob_name": "intesa/2025/09/26/..chunk.jsonl[.gz]"
        // OR
        "sas_url": "https://.../chunk.jsonl.gz?sig=..."
      }
    }
    """
    t0 = time.time()
    payload = request.get_json(force=True, silent=True) or {}
    question = payload.get("question") or (
        "Scan the latest chunks. List any anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
        "Give a brief summary and next steps."
    )

    prev_chunk_dir = os.getenv("CHUNK_DIR", "./out")
    tmp_dir = None
    try:
        blob_req = payload.get("blob")
        if blob_req:
            tmp_dir = tempfile.mkdtemp(prefix="agent_blob_")
            if blob_req.get("sas_url"):
                _download_sas_to_dir(blob_req["sas_url"], tmp_dir)
            elif blob_req.get("container") and blob_req.get("blob_name"):
                _download_blob_to_dir(blob_req["container"], blob_req["blob_name"], tmp_dir)
            else:
                return jsonify({"ok": False, "error": "blob requires sas_url OR (container + blob_name)"}), 400
            os.environ["CHUNK_DIR"] = tmp_dir

        agent = build_agent()
        out = agent.invoke({"input": question, "chat_history": []})
        result = out.get("output", "")

        email_cfg = payload.get("email") or {}
        if email_cfg.get("send"):
            to_addr = email_cfg.get("to")
            send_email(subject="[Intesa Logs] Agent Report", body_text=result, to_addr=to_addr)

        return jsonify({"ok": True, "duration_sec": round(time.time() - t0, 3), "result": result}), 200

    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    finally:
        os.environ["CHUNK_DIR"] = prev_chunk_dir

# HEC-style collector -> write one-line JSONL blob to Storage, enqueue message for worker, return 200 OK (like Splunk HEC)
@app.post("/collect")
@app.post("/services/collector/event") # alias for Splunk HEC curl compatibility
def collect_hec():
    try:
        container = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
        # Accept either single JSON object or a list; we will write one line per event
        body = request.get_json(force=True, silent=True)
        if body is None:
            return jsonify({"ok": False, "error": "invalid JSON"}), 400

        lines = []
        if isinstance(body, list):
            for item in body:
                lines.append(json.dumps(item, separators=(",", ":")))
        else:
            lines.append(json.dumps(body, separators=(",", ":")))
        raw = ("\n".join(lines) + "\n").encode("utf-8")

        blob_name = _upload_chunk_blob(container, raw, compressed=True)

        # Enqueue a message your queue-worker understands
        msg = {
            "blob": {"container": container, "blob_name": blob_name},
            # flip to true if you want emails by default
            "email": {"send": False}
        }

        qc = _queue_client()
        qc.send_message(json.dumps(msg, separators=(",", ":")))

        return jsonify({"ok": True, "queued": True, "blob_name": blob_name}), 200

    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
38
notify.py
@ -1,38 +0,0 @@
# notify.py
import os, smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from dotenv import load_dotenv

# load .env automatically (ENV_FILE can override path)
load_dotenv(os.getenv("ENV_FILE", ".env"))

def send_email(subject: str, body_text: str, to_addr: str | None = None):
    if os.getenv("MAIL_ENABLED", "false").lower() != "true":
        print("[notify] MAIL_ENABLED != true; skipping email")
        return

    smtp_host = os.getenv("SMTP_HOST")
    smtp_port = int(os.getenv("SMTP_PORT", "587"))
    smtp_user = os.getenv("SMTP_USER")
    smtp_pass = os.getenv("SMTP_PASS")
    mail_from = os.getenv("MAIL_FROM") or smtp_user
    mail_to = to_addr or os.getenv("MAIL_TO")

    if not (smtp_host and smtp_user and smtp_pass and mail_to):
        print("[notify] missing SMTP config; skipping email")
        return

    msg = MIMEText(body_text, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = formataddr(("Intesa Logs Agent", mail_from))
    msg["To"] = mail_to

    with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as s:
        try:
            s.starttls()
        except smtplib.SMTPException:
            pass
        s.login(smtp_user, smtp_pass)
        s.send_message(msg)
    print(f"[notify] sent email to {mail_to}")
@ -1,18 +1,21 @@
-# poller/Dockerfile
+# poller/Dockerfile (local file poller, no Splunk)
 FROM python:3.12-slim
 WORKDIR /app

-# Helpful system deps
 RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl \
  && rm -rf /var/lib/apt/lists/*

-COPY poller/requirements.txt .
+# minimal deps
+COPY poller/requirements.txt ./requirements.txt
 RUN python -m pip install --upgrade pip setuptools wheel \
  && pip install --no-cache-dir -r requirements.txt

-# Copy the poller script from repo root
-COPY splunk_poller.py .
-
-# default to root to avoid permission issues on named volumes
+COPY poller/file_poller.py .
 ENV PYTHONUNBUFFERED=1
-CMD ["python", "-u", "splunk_poller.py"]
+# default inputs folder (mounted by compose) and local API endpoint
+ENV IN_DIR=/app/in
+ENV TARGET_URL=http://agent-api:8080/ingest
+ENV BATCH_MAX=1000
+ENV SLEEP_SEC=2.0
+CMD ["python", "-u", "file_poller.py"]
97
poller/file_poller.py
Normal file
@ -0,0 +1,97 @@
# poller/file_poller.py
import os, time, json, uuid, shutil, logging, pathlib, requests
from typing import List

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")

INDIR = pathlib.Path(os.getenv("INDIR", "/app/in"))
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "/app/out")) # shared with agent-api
DONE = INDIR / "_done"
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))

# Where the agent API lives (container name in compose or localhost outside)
API_BASE = os.getenv("AGENT_API_URL", "http://agent-api:8080").rstrip("/")

# Make the poller ask for emails automatically
EMAIL_SEND_DEFAULT = os.getenv("EMAIL_SEND_DEFAULT", "false").lower() in {"1", "true", "yes"}

QUESTION = os.getenv(
    "QUESTION",
    "Scan the latest chunks. List anomalies (rejected EUR >= 10000, vop_no_match, invalid IBAN/BIC). "
    "Give a brief summary and next steps."
)

def ensure_dirs():
    INDIR.mkdir(parents=True, exist_ok=True)
    DONE.mkdir(parents=True, exist_ok=True)
    OUTDIR.mkdir(parents=True, exist_ok=True)

def _copy_to_out(src: pathlib.Path) -> pathlib.Path:
    ts = int(time.time())
    dst = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
    shutil.copy2(src, dst)
    return dst

def _call_analyze(email_send: bool) -> bool:
    """Tell the agent to analyze latest chunks (it reads CHUNK_DIR=/app/out)."""
    url = f"{API_BASE}/analyze"
    payload = {
        "question": QUESTION,
        "email": {"send": bool(email_send)}
    }
    try:
        r = requests.post(url, json=payload, timeout=180)
        if r.status_code // 100 == 2:
            logging.info("analyze OK: %s", r.text[:400])
            return True
        logging.error("analyze HTTP %s: %s", r.status_code, r.text[:400])
        return False
    except Exception as e:
        logging.error("analyze failed: %s", e)
        return False

def _process_file(f: pathlib.Path) -> bool:
    """
    Copy the file into OUTDIR as a chunk, call /analyze, move file to _done.
    Return True if fully handled (so we move to _done), False to retry later.
    """
    logging.info("processing %s", f.name)
    try:
        # 1) copy to /app/out where agent looks for chunk_*.jsonl
        dst = _copy_to_out(f)
        logging.info("placed chunk %s", dst.name)

        # 2) trigger analysis (and email if enabled)
        ok = _call_analyze(EMAIL_SEND_DEFAULT)
        if not ok:
            logging.info("will retry %s later", f.name)
            return False

        # 3) move original to _done
        shutil.move(str(f), str(DONE / f.name))
        logging.info("done %s", f.name)
        return True
    except Exception as e:
        logging.error("error processing %s: %s", f.name, e)
        logging.info("will retry %s later", f.name)
        return False

def list_inputs() -> List[pathlib.Path]:
    return sorted([p for p in INDIR.glob("*.jsonl") if p.is_file()])

def main():
    ensure_dirs()
    logging.info("file-poller watching %s -> %s target=analyze email_default=%s",
                 INDIR, f"{API_BASE}/analyze", EMAIL_SEND_DEFAULT)
    while True:
        files = list_inputs()
        if not files:
            time.sleep(SLEEP_SECONDS)
            continue
        for f in files:
            _process_file(f)
        # brief backoff between batches
        time.sleep(2)

if __name__ == "__main__":
    main()
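To exercise the watcher end to end (a sketch; assumes the compose stack is running, ./data is the host folder mounted to /app/in as in compose.yaml above, and the demo event is made up):

import json, pathlib

# Hypothetical demo event: a rejected EUR transfer over 10k with a VOP no_match.
event = {"event_type": "bonifico", "transaction_id": "T-DEMO", "step": "esito",
         "status": "rejected", "importo": 15000, "divisa": "EUR", "vop_check": "no_match"}
pathlib.Path("data/demo.jsonl").write_text(json.dumps(event) + "\n", encoding="utf-8")
# Within one SLEEP_SECONDS cycle the poller should copy it to /app/out as a chunk,
# call /analyze on the agent-api, and move the original to data/_done/.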
@ -1,6 +1 @@
-splunk-sdk==2.0.2
-langchain-core==0.2.*
-azure-storage-blob>=12.19.0
-azure-storage-queue>=12.9.0
-ujson
 requests
@ -1,14 +0,0 @@
langchain>=0.3,<0.4
langchain-core>=0.3.27,<0.4
langchain-community>=0.3,<0.4
langchain-openai>=0.2.12,<0.3
openai>=1.40
faiss-cpu==1.8.*
ujson>=5
pydantic>=2
python-dotenv>=1
flask>=3
gunicorn>=21
azure-storage-blob>=12
requests
azure-storage-queue==12.9.0
@ -1,81 +0,0 @@
# CLI preset parameters
#source .venv/bin/activate
HEC_URL="https://localhost:8088/services/collector/event"
HEC_TOKEN="dev-0123456789abcdef"
INDEX="intesa_payments"
SOURCETYPE="intesa:bonifico"

# CLI script for generating logs
gen_iban(){ local d=""; for _ in $(seq 1 25); do d="${d}$((RANDOM%10))"; done; echo "IT${d}"; }
mask_iban(){ local i="$1"; local pre="${i:0:6}"; local suf="${i: -4}"; local n=$(( ${#i}-10 )); printf "%s%0.s*" "$pre" $(seq 1 $n); echo -n "$suf"; }
rand_amount(){ awk 'BEGIN{srand(); printf "%.2f", 5+rand()*14995}'; }
rand_bool_str(){ if ((RANDOM%2)); then echo "true"; else echo "false"; fi; }
pick(){ local a=("$@"); echo "${a[$RANDOM%${#a[@]}]}"; }

spese=(SHA OUR BEN)
divise=(EUR EUR EUR EUR USD GBP)
statuses=(accepted pending rejected)

for tx in {1..20}; do
  txid=$(cat /proc/sys/kernel/random/uuid 2>/dev/null || uuidgen 2>/dev/null || openssl rand -hex 16)
  t0=$(date -u +%s); t1=$((t0+1)); t2=$((t1+2))
  iso0=$(date -u -d @$t0 +%FT%T.%6NZ)
  iso1=$(date -u -d @$t1 +%FT%T.%6NZ)
  iso2=$(date -u -d @$t2 +%FT%T.%6NZ)

  src=$(gen_iban); dst=$(gen_iban)
  srcm=$(mask_iban "$src"); dstm=$(mask_iban "$dst")
  amt=$(rand_amount)
  dv=$(pick "${divise[@]}")
  inst=$(rand_bool_str)
  sp=$(pick "${spese[@]}")
  final=$(pick "${statuses[@]}")

  send() {
    local when="$1" iso="$2" step="$3" status="$4"
    curl -sk "$HEC_URL" \
      -H "Authorization: Splunk $HEC_TOKEN" -H "Content-Type: application/json" \
      -d @- <<JSON
{
  "time": $when,
  "host": "seed.cli",
  "source": "cli_for_loop",
  "sourcetype": "$SOURCETYPE",
  "index": "$INDEX",
  "event": {
    "event_type": "bonifico",
    "transaction_id": "$txid",
    "step": "$step",
    "iban_origin_masked": "$srcm",
    "iban_dest_masked": "$dstm",
    "importo": "$amt",
    "divisa": "$dv",
    "istantaneo": "$inst",
    "data_pagamento": "$iso",
    "spese_commissioni": "$sp",
    "causale": "TEST SEED",
    "status": "$status"
  }
}
JSON
  }

  send "$t0" "$iso0" "compila" "in_progress"
  send "$t1" "$iso1" "conferma" "in_progress"
  send "$t2" "$iso2" "esito" "$final"
done


### FAST
HEC_URL="https://localhost:8088/services/collector/event"
HEC_TOKEN="dev-0123456789abcdef"
INDEX="intesa_payments"
SOURCETYPE="intesa:bonifico"

for i in {1..200}; do
  curl -k https://localhost:8088/services/collector/event \
    -H "Authorization: Splunk dev-0123456789abcdef" \
    -H "Content-Type: application/json" \
    -d '{"event":{"event_type":"bonifico","step":"esito","status":"accepted","importo": '"$((RANDOM%5000+50))"',"divisa":"EUR","transaction_id":"TX-'$RANDOM'"},"sourcetype":"intesa:bonifico","index":"intesa_payments"}' >/dev/null 2>&1
done
260
splunk_poller.py
@ -1,260 +0,0 @@
# splunk_poller.py
import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys
import splunklib.client as client
from splunklib.results import JSONResultsReader

try:
    from langchain_core.documents import Document
except ImportError:
    from langchain.schema import Document

STOP = False
def _handle_stop(signum, frame):
    global STOP
    STOP = True
signal.signal(signal.SIGINT, _handle_stop)
signal.signal(signal.SIGTERM, _handle_stop)

# ---------- Splunk config ----------
SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost")
SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089"))
SPLUNK_USER = os.getenv("SPLUNK_USER", "admin")
SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9")
SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1","true","yes"}
INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments")
SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico")
INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h")
CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1","true","yes"}

# ---------- Polling / chunking ----------
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))
MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000)))

# ---------- Sinks ----------
# Supported: file | blob | blob+queue
SINK = os.getenv("SINK", "file").lower()
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out"))
CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt"))
AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1","true","yes"}

# Azure Blob
AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")

# Azure Storage Queue
AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")

# Email toggle for messages produced by the poller (default: True)
EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1","true","yes"}

if SINK.startswith("file"):
    OUTDIR.mkdir(parents=True, exist_ok=True)

# ---------- Azure clients (lazy) ----------
_blob_service = None
_container_client = None
_queue_client = None

def _init_blob():
    global _blob_service, _container_client
    if _blob_service:
        return
    from azure.storage.blob import BlobServiceClient
    _blob_service = BlobServiceClient.from_connection_string(AZ_CS)
    _container_client = _blob_service.get_container_client(AZ_CONTAINER)
    try:
        _container_client.create_container()
    except Exception:
        pass

def _init_queue():
    global _queue_client
    if _queue_client:
        return
    from azure.storage.queue import QueueClient
    _queue_client = QueueClient.from_connection_string(
        conn_str=AZ_CS, queue_name=AZ_QUEUE
    )
    try:
        _queue_client.create_queue() # idempotent
    except Exception:
        pass

# ---------- Checkpoint helpers ----------
def read_ckpt() -> str | None:
    if not CKPT_FILE.exists(): return None
    val = CKPT_FILE.read_text().strip()
    return val or None

def write_ckpt(val: str) -> None:
    CKPT_FILE.write_text(val)

def to_epoch_seconds(v) -> int | None:
    if v is None: return None
    try:
        return int(float(v))
    except Exception:
        pass
    try:
        s = str(v).replace("Z", "+00:00")
        return int(dt.datetime.fromisoformat(s).timestamp())
    except Exception:
        return None

# ---------- Splunk helpers ----------
def ensure_index(service, name: str):
    # idempotent: create if missing
    for idx in service.indexes:
        if idx.name == name:
            return
    service.indexes.create(name)

def build_search(ckpt_epoch: int | None) -> str:
    q = f'''
search index={INDEX} sourcetype="{SOURCETYPE}"
| fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status
'''.strip()
    if ckpt_epoch is not None:
        q += f"\n| where _indextime > {ckpt_epoch}"
    q += "\n| sort + _indextime"
    return q

def fetch(service, ckpt_epoch: int | None):
    job = service.jobs.create(
        build_search(ckpt_epoch),
        exec_mode="normal",
        earliest_time=INITIAL_LOOKBACK,
        latest_time="now",
        output_mode="json",
    )
    while not job.is_done():
        if STOP: break
        time.sleep(0.5)
    rr = JSONResultsReader(job.results(output_mode="json"))
    rows = [dict(r) for r in rr if isinstance(r, dict)]
    job.cancel()
    return rows

# ---------- Chunking ----------
def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES):
    buf, size = [], 0
    for item in items:
        b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8")
        if size + len(b) > max_bytes and buf:
            yield b"".join(buf)
            buf, size = [b], len(b)
        else:
            buf.append(b); size += len(b)
    if buf: yield b"".join(buf)

# ---------- Sinks ----------
def write_chunk_file(blob: bytes) -> pathlib.Path:
    ts = int(time.time())
    name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
    name.write_bytes(blob)
    return name

def upload_chunk_blob(blob: bytes):
    _init_blob()
    from azure.storage.blob import ContentSettings
    ts = int(time.time())
    ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl"
    # timezone-aware UTC
    now_utc = dt.datetime.now(dt.timezone.utc)
    blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}"
    data = gzip.compress(blob) if AZURE_COMPRESS else blob
    content_settings = ContentSettings(
        content_type="application/json",
        content_encoding=("gzip" if AZURE_COMPRESS else None)
    )
    bc = _container_client.get_blob_client(blob_name)
    bc.upload_blob(data, overwrite=True, content_settings=content_settings)
    return {
        "blob_name": blob_name,
        "url": bc.url,
        "size_bytes": len(data),
        "compressed": AZURE_COMPRESS,
    }

def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True):
    _init_queue()
    payload = {
        "blob": {"container": container, "blob_name": blob_name},
        "email": {"send": bool(send_email)}
    }
    _queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False))
    print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True)

# ---------- Main ----------
def main():
    print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})")
    service = client.connect(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        scheme="https",
        username=SPLUNK_USER,
        password=SPLUNK_PW,
        verify=SPLUNK_VERIFY_SSL,
    )

    if CREATE_INDEX_IF_MISSING:
        try:
            ensure_index(service, INDEX)
            print(f"[poller] ensured index exists: {INDEX}")
        except Exception as e:
            print(f"[poller] warn: ensure_index failed: {e}", flush=True)

    ckpt_val = read_ckpt()
    ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None

    while not STOP:
        rows = fetch(service, ckpt_epoch)
        if not rows:
            print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True)
            for _ in range(SLEEP_SECONDS):
                if STOP: break
                time.sleep(1)
            continue

        max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0
        if max_index_time:
            ckpt_epoch = max(ckpt_epoch or 0, max_index_time)
            write_ckpt(str(ckpt_epoch))

        for _, blob in enumerate(chunks_by_bytes(rows)):
            # (Document kept for potential future LC usage)
            _ = Document(
                page_content=blob.decode("utf-8", errors="ignore"),
                metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)},
            )

            if SINK == "file":
                fpath = write_chunk_file(blob)
                print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True)

            elif SINK == "blob":
                if not AZ_CS:
                    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads")
                meta = upload_chunk_blob(blob)
                print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)

            elif SINK == "blob+queue":
                if not AZ_CS:
                    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue")
                meta = upload_chunk_blob(blob)
                print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)
                enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT)

            else:
                raise ValueError(f"Unknown SINK={SINK}")

        # brief pause
        for _ in range(5):
            if STOP: break
            time.sleep(1)

    print("[poller] stopping gracefully")
    sys.exit(0)

if __name__ == "__main__":
    main()