# splunk_poller.py
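"""
Poll Splunk for Intesa payment ("bonifico") events and forward them downstream.

Flow: run a Splunk search filtered by index/sourcetype, keep a checkpoint of the
highest _indextime seen, pack new events into newline-delimited JSON chunks of
at most MAX_CHUNK_BYTES, and hand each chunk to the configured sink:

  * file        - write chunk_<ts>_<id>.jsonl files under OUTDIR
  * blob        - upload to Azure Blob Storage (optionally gzip-compressed)
  * blob+queue  - upload to Blob Storage, then enqueue a pointer message on an
                  Azure Storage Queue for downstream processing

Dependencies (PyPI names): splunk-sdk, azure-storage-blob, azure-storage-queue,
and langchain-core (or the older langchain) for the Document class.
"""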

import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys

import splunklib.client as client
from splunklib.results import JSONResultsReader

# Document moved to langchain_core in newer LangChain releases; fall back to the
# legacy location for older installs.
try:
    from langchain_core.documents import Document
except ImportError:
    from langchain.schema import Document

# Cooperative shutdown: SIGINT/SIGTERM flip this flag and the loops below exit cleanly.
STOP = False


def _handle_stop(signum, frame):
    global STOP
    STOP = True


signal.signal(signal.SIGINT, _handle_stop)
signal.signal(signal.SIGTERM, _handle_stop)

# ---------- Splunk config ----------
# NOTE: the defaults below are local-development fallbacks; in any real deployment
# set SPLUNK_USER / SPLUNK_PW via the environment rather than relying on them.
SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost")
SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089"))
SPLUNK_USER = os.getenv("SPLUNK_USER", "admin")
SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9")
SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1", "true", "yes"}
INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments")
SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico")
INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h")
CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1", "true", "yes"}

# ---------- Polling / chunking ----------
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60"))
MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000)))

# ---------- Sinks ----------
# Supported: file | blob | blob+queue
SINK = os.getenv("SINK", "file").lower()
OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out"))
CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt"))
AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1", "true", "yes"}

# Azure Blob
AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs")

# Azure Storage Queue
AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks")

# Email toggle for messages produced by the poller (default: True)
EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1", "true", "yes"}

if SINK.startswith("file"):
    OUTDIR.mkdir(parents=True, exist_ok=True)
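
# Illustrative configuration for the blob+queue sink (values are examples only;
# the connection string below is a placeholder, not a real credential):
#
#   export SPLUNK_HOST=splunk.internal.example
#   export SPLUNK_INDEX=intesa_payments
#   export SINK=blob+queue
#   export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net"
#   export AZURE_STORAGE_CONTAINER=bank-logs
#   export AZURE_STORAGE_QUEUE_NAME=log-chunks
#   export AZURE_COMPRESS=true
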
# ---------- Azure clients (lazy) ----------
_blob_service = None
_container_client = None
_queue_client = None


def _init_blob():
    global _blob_service, _container_client
    if _blob_service:
        return
    from azure.storage.blob import BlobServiceClient
    _blob_service = BlobServiceClient.from_connection_string(AZ_CS)
    _container_client = _blob_service.get_container_client(AZ_CONTAINER)
    try:
        _container_client.create_container()
    except Exception:
        pass  # container may already exist


def _init_queue():
    global _queue_client
    if _queue_client:
        return
    from azure.storage.queue import QueueClient
    _queue_client = QueueClient.from_connection_string(
        conn_str=AZ_CS, queue_name=AZ_QUEUE
    )
    try:
        _queue_client.create_queue()  # idempotent thanks to the except below
    except Exception:
        pass  # queue may already exist

# ---------- Checkpoint helpers ----------
def read_ckpt() -> str | None:
    """Return the stored checkpoint (highest _indextime seen, as a string), or None."""
    if not CKPT_FILE.exists():
        return None
    val = CKPT_FILE.read_text().strip()
    return val or None


def write_ckpt(val: str) -> None:
    CKPT_FILE.write_text(val)


def to_epoch_seconds(v) -> int | None:
    """Coerce an epoch-seconds string/number or an ISO-8601 timestamp to int seconds."""
    if v is None:
        return None
    try:
        return int(float(v))
    except Exception:
        pass
    try:
        s = str(v).replace("Z", "+00:00")
        return int(dt.datetime.fromisoformat(s).timestamp())
    except Exception:
        return None
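
# Examples of what to_epoch_seconds accepts (Splunk typically reports _indextime
# as an epoch-seconds string and _time as ISO-8601):
#   to_epoch_seconds("1714064400.532")        -> 1714064400
#   to_epoch_seconds("2024-04-25T17:00:00Z")  -> 1714064400
#   to_epoch_seconds("not a timestamp")       -> None
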
# ---------- Splunk helpers ----------
def ensure_index(service, name: str):
    # idempotent: create if missing
    for idx in service.indexes:
        if idx.name == name:
            return
    service.indexes.create(name)

def build_search(ckpt_epoch: int | None) -> str:
    q = f'''
search index={INDEX} sourcetype="{SOURCETYPE}"
| fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status
'''.strip()
    if ckpt_epoch is not None:
        q += f"\n| where _indextime > {ckpt_epoch}"
    # note: a bare `sort` keeps at most 10,000 results; use `sort 0 + _indextime`
    # if a single poll may return more events than that
    q += "\n| sort + _indextime"
    return q
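
# With the default index/sourcetype and a checkpoint of 1714064400, build_search
# produces roughly the following SPL (field list abbreviated here):
#
#   search index=intesa_payments sourcetype="intesa:bonifico"
#   | fields _time, _indextime, event_type, step, ..., vop_check, status
#   | where _indextime > 1714064400
#   | sort + _indextime
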
def fetch(service, ckpt_epoch: int | None):
    job = service.jobs.create(
        build_search(ckpt_epoch),
        exec_mode="normal",
        earliest_time=INITIAL_LOOKBACK,
        latest_time="now",
        output_mode="json",
    )
    while not job.is_done():
        if STOP:
            # shutting down: don't try to read results from an unfinished job
            job.cancel()
            return []
        time.sleep(0.5)
    rr = JSONResultsReader(job.results(output_mode="json"))
    rows = [dict(r) for r in rr if isinstance(r, dict)]
    job.cancel()
    return rows

# ---------- Chunking ----------
def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES):
    """Greedily pack JSON-serialized items into newline-delimited chunks of <= max_bytes.

    A single item larger than max_bytes is still emitted as its own (oversized) chunk.
    """
    buf, size = [], 0
    for item in items:
        b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8")
        if size + len(b) > max_bytes and buf:
            yield b"".join(buf)
            buf, size = [b], len(b)
        else:
            buf.append(b)
            size += len(b)
    if buf:
        yield b"".join(buf)
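
# Chunking example: with max_bytes=100 and serialized records of 60, 30 and 50
# bytes, chunks_by_bytes yields two chunks: the first holds records 1 and 2
# (90 bytes), the second holds record 3, since adding record 3 to the first
# chunk would exceed the limit.
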
# ---------- Sinks ----------
def write_chunk_file(blob: bytes) -> pathlib.Path:
    ts = int(time.time())
    name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl"
    name.write_bytes(blob)
    return name

def upload_chunk_blob(blob: bytes):
    _init_blob()
    from azure.storage.blob import ContentSettings
    ts = int(time.time())
    ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl"
    # timezone-aware UTC
    now_utc = dt.datetime.now(dt.timezone.utc)
    blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}"
    data = gzip.compress(blob) if AZURE_COMPRESS else blob
    content_settings = ContentSettings(
        content_type="application/json",
        content_encoding=("gzip" if AZURE_COMPRESS else None),
    )
    bc = _container_client.get_blob_client(blob_name)
    bc.upload_blob(data, overwrite=True, content_settings=content_settings)
    return {
        "blob_name": blob_name,
        "url": bc.url,
        "size_bytes": len(data),
        "compressed": AZURE_COMPRESS,
    }
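
# Blobs are laid out by UTC upload hour, e.g. (illustrative name):
#   intesa/2024/04/25/17/chunk_1714064400_ab12cd34.jsonl      (uncompressed)
#   intesa/2024/04/25/17/chunk_1714064400_ab12cd34.jsonl.gz   (AZURE_COMPRESS=true)
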
def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True):
    _init_queue()
    payload = {
        "blob": {"container": container, "blob_name": blob_name},
        "email": {"send": bool(send_email)},
    }
    _queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False))
    print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True)
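
# Example of the JSON message placed on the queue (blob_name shown is illustrative):
#   {"blob":{"container":"bank-logs","blob_name":"intesa/2024/04/25/17/chunk_1714064400_ab12cd34.jsonl.gz"},"email":{"send":true}}
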
# ---------- Main ----------
def main():
    print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})")
    service = client.connect(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        scheme="https",
        username=SPLUNK_USER,
        password=SPLUNK_PW,
        verify=SPLUNK_VERIFY_SSL,
    )

    if CREATE_INDEX_IF_MISSING:
        try:
            ensure_index(service, INDEX)
            print(f"[poller] ensured index exists: {INDEX}")
        except Exception as e:
            print(f"[poller] warn: ensure_index failed: {e}", flush=True)

    ckpt_val = read_ckpt()
    ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None

    while not STOP:
        rows = fetch(service, ckpt_epoch)
        if not rows:
            print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True)
            for _ in range(SLEEP_SECONDS):
                if STOP:
                    break
                time.sleep(1)
            continue

        # advance the checkpoint to the newest _indextime seen in this batch
        max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0
        if max_index_time:
            ckpt_epoch = max(ckpt_epoch or 0, max_index_time)
            write_ckpt(str(ckpt_epoch))

        for blob in chunks_by_bytes(rows):
            # (Document kept for potential future LangChain usage)
            _ = Document(
                page_content=blob.decode("utf-8", errors="ignore"),
                metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)},
            )

            if SINK == "file":
                fpath = write_chunk_file(blob)
                print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True)

            elif SINK == "blob":
                if not AZ_CS:
                    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads")
                meta = upload_chunk_blob(blob)
                print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)

            elif SINK == "blob+queue":
                if not AZ_CS:
                    raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue")
                meta = upload_chunk_blob(blob)
                print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True)
                enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT)

            else:
                raise ValueError(f"Unknown SINK={SINK}")

        # brief pause between polls, still responsive to shutdown
        for _ in range(5):
            if STOP:
                break
            time.sleep(1)

    print("[poller] stopping gracefully")
    sys.exit(0)

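# Example local run with the default file sink (values are illustrative):
#   SPLUNK_HOST=localhost SPLUNK_USER=admin SPLUNK_PW='***' SINK=file OUTDIR=./out \
#       python splunk_poller.py
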
if __name__ == "__main__":
    main()