# intesa_splunk_main/splunk_poller.py
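"""Poll Splunk for payment events and ship them to a configurable sink.

The poller runs a checkpointed search on a loop (tracking the highest
_indextime seen), splits the results into size-bounded JSONL chunks, and
writes each chunk to the configured SINK: a local file, an Azure Blob
container, or a Blob container plus an Azure Storage Queue notification for
downstream processing. SIGINT/SIGTERM trigger a graceful shutdown.
"""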
import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys

import splunklib.client as client
from splunklib.results import JSONResultsReader

try:
    from langchain_core.documents import Document
except ImportError:
    from langchain.schema import Document

STOP = False


def _handle_stop(signum, frame):
    global STOP
    STOP = True


signal.signal(signal.SIGINT, _handle_stop)
signal.signal(signal.SIGTERM, _handle_stop)

# ---------- Splunk config ----------
SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost")
SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089"))
SPLUNK_USER = os.getenv("SPLUNK_USER", "admin")
SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9")
SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1","true","yes"}
INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments")
SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico")
INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h")
CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1","true","yes"}
# ---------- Polling / chunking ----------
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))
MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000)))
# ---------- Sinks ----------
# Supported: file | blob | blob+queue
SINK = os.getenv("SINK", "file").lower()
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out"))
CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt"))
AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1","true","yes"}
# Azure Blob
AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")
# Azure Storage Queue
AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")
# Email toggle for messages produced by the poller (default: True)
EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1","true","yes"}
if SINK.startswith("file"):
    OUTDIR.mkdir(parents=True, exist_ok=True)

# ---------- Azure clients (lazy) ----------
_blob_service = None
_container_client = None
_queue_client = None
def _init_blob():
    global _blob_service, _container_client
    if _blob_service:
        return
    from azure.storage.blob import BlobServiceClient
    _blob_service = BlobServiceClient.from_connection_string(AZ_CS)
    _container_client = _blob_service.get_container_client(AZ_CONTAINER)
    try:
        _container_client.create_container()
    except Exception:
        pass


def _init_queue():
    global _queue_client
    if _queue_client:
        return
    from azure.storage.queue import QueueClient
    _queue_client = QueueClient.from_connection_string(
        conn_str=AZ_CS, queue_name=AZ_QUEUE
    )
    try:
        _queue_client.create_queue()  # idempotent
    except Exception:
        pass

# ---------- Checkpoint helpers ----------
def read_ckpt() -> str | None:
    if not CKPT_FILE.exists():
        return None
    val = CKPT_FILE.read_text().strip()
    return val or None


def write_ckpt(val: str) -> None:
    CKPT_FILE.write_text(val)


def to_epoch_seconds(v) -> int | None:
    """Best-effort conversion of a Splunk time value (epoch or ISO-8601) to int seconds."""
    if v is None:
        return None
    try:
        return int(float(v))
    except Exception:
        pass
    try:
        s = str(v).replace("Z", "+00:00")
        return int(dt.datetime.fromisoformat(s).timestamp())
    except Exception:
        return None

# ---------- Splunk helpers ----------
def ensure_index(service, name: str):
    # idempotent: create if missing
    for idx in service.indexes:
        if idx.name == name:
            return
    service.indexes.create(name)


def build_search(ckpt_epoch: int | None) -> str:
    q = f'''
search index={INDEX} sourcetype="{SOURCETYPE}"
| fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status
'''.strip()
    if ckpt_epoch is not None:
        q += f"\n| where _indextime > {ckpt_epoch}"
    q += "\n| sort + _indextime"
    return q
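
# For reference, with the default INDEX/SOURCETYPE and an illustrative
# ckpt_epoch of 1700000000, build_search() returns (fields abbreviated):
#   search index=intesa_payments sourcetype="intesa:bonifico"
#   | fields _time, _indextime, event_type, step, ...
#   | where _indextime > 1700000000
#   | sort + _indextime
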
def fetch(service, ckpt_epoch: int | None):
    job = service.jobs.create(
        build_search(ckpt_epoch),
        exec_mode="normal",
        earliest_time=INITIAL_LOOKBACK,
        latest_time="now",
    )
    while not job.is_done():
        if STOP:
            break
        time.sleep(0.5)
    # count=0 asks the results endpoint for all rows (the default caps at 100)
    rr = JSONResultsReader(job.results(output_mode="json", count=0))
    rows = [dict(r) for r in rr if isinstance(r, dict)]
    job.cancel()
    return rows
# ---------- Chunking ----------
def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES):
    buf, size = [], 0
    for item in items:
        b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8")
        if size + len(b) > max_bytes and buf:
            yield b"".join(buf)
            buf, size = [b], len(b)
        else:
            buf.append(b)
            size += len(b)
    if buf:
        yield b"".join(buf)
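
# Worked example of the splitting rule above (illustrative numbers): with
# max_bytes=15 and three records that each serialize to 6 bytes including the
# newline, the first chunk holds two records (12 bytes) and the second holds
# the remaining record, since adding a third would exceed the limit.
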
# ---------- Sinks ----------
def write_chunk_file(blob: bytes) -> pathlib.Path:
    ts = int(time.time())
    name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
    name.write_bytes(blob)
    return name


def upload_chunk_blob(blob: bytes):
    _init_blob()
    from azure.storage.blob import ContentSettings
    ts = int(time.time())
    ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl"
    # timezone-aware UTC
    now_utc = dt.datetime.now(dt.timezone.utc)
    blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}"
    data = gzip.compress(blob) if AZURE_COMPRESS else blob
    content_settings = ContentSettings(
        content_type="application/json",
        content_encoding=("gzip" if AZURE_COMPRESS else None),
    )
    bc = _container_client.get_blob_client(blob_name)
    bc.upload_blob(data, overwrite=True, content_settings=content_settings)
    return {
        "blob_name": blob_name,
        "url": bc.url,
        "size_bytes": len(data),
        "compressed": AZURE_COMPRESS,
    }

def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True):
    _init_queue()
    payload = {
        "blob": {"container": container, "blob_name": blob_name},
        "email": {"send": bool(send_email)},
    }
    _queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False))
    print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True)
# ---------- Main ----------
def main():
    print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})")
    service = client.connect(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        scheme="https",
        username=SPLUNK_USER,
        password=SPLUNK_PW,
        verify=SPLUNK_VERIFY_SSL,
    )
    if CREATE_INDEX_IF_MISSING:
        try:
            ensure_index(service, INDEX)
            print(f"[poller] ensured index exists: {INDEX}")
        except Exception as e:
            print(f"[poller] warn: ensure_index failed: {e}", flush=True)

    ckpt_val = read_ckpt()
    ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None

    while not STOP:
        rows = fetch(service, ckpt_epoch)
        if not rows:
            print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True)
            for _ in range(SLEEP_SECONDS):
                if STOP:
                    break
                time.sleep(1)
            continue

        # advance the checkpoint to the newest _indextime seen in this batch
        max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0
        if max_index_time:
            ckpt_epoch = max(ckpt_epoch or 0, max_index_time)
            write_ckpt(str(ckpt_epoch))

        for blob in chunks_by_bytes(rows):
            # (Document kept for potential future LC usage)
            _ = Document(
                page_content=blob.decode("utf-8", errors="ignore"),
                metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)},
            )
            if SINK == "file":
                fpath = write_chunk_file(blob)
                print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True)
            elif SINK == "blob":
                if not AZ_CS:
                    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads")
                meta = upload_chunk_blob(blob)
                print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)
            elif SINK == "blob+queue":
                if not AZ_CS:
                    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue")
                meta = upload_chunk_blob(blob)
                print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)
                enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT)
            else:
                raise ValueError(f"Unknown SINK={SINK}")

        # brief pause between polling cycles
        for _ in range(5):
            if STOP:
                break
            time.sleep(1)

    print("[poller] stopping gracefully")
    sys.exit(0)


if __name__ == "__main__":
    main()
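
# Example invocation for the blob+queue sink (illustrative: host, credentials
# and the connection string are placeholders, and running the script directly
# is an assumption about how it is deployed):
#   export SPLUNK_HOST=splunk.example.internal SPLUNK_USER=svc_poller SPLUNK_PW='<secret>'
#   export SINK=blob+queue AZURE_STORAGE_CONNECTION_STRING='<connection string>'
#   python splunk_poller.py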