# splunk_poller.py import os, time, json, pathlib, datetime as dt, gzip, uuid, signal, sys import splunklib.client as client from splunklib.results import JSONResultsReader try: from langchain_core.documents import Document except ImportError: from langchain.schema import Document STOP = False def _handle_stop(signum, frame): global STOP STOP = True signal.signal(signal.SIGINT, _handle_stop) signal.signal(signal.SIGTERM, _handle_stop) # ---------- Splunk config ---------- SPLUNK_HOST = os.getenv("SPLUNK_HOST", "localhost") SPLUNK_PORT = int(os.getenv("SPLUNK_PORT", "8089")) SPLUNK_USER = os.getenv("SPLUNK_USER", "admin") SPLUNK_PW = os.getenv("SPLUNK_PW", "Str0ngP@ss!9") SPLUNK_VERIFY_SSL = os.getenv("SPLUNK_VERIFY_SSL", "false").lower() in {"1","true","yes"} INDEX = os.getenv("SPLUNK_INDEX", "intesa_payments") SOURCETYPE = os.getenv("SPLUNK_SOURCETYPE", "intesa:bonifico") INITIAL_LOOKBACK = os.getenv("INITIAL_LOOKBACK", "-24h@h") CREATE_INDEX_IF_MISSING = os.getenv("CREATE_INDEX_IF_MISSING", "true").lower() in {"1","true","yes"} # ---------- Polling / chunking ---------- SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS", "60")) MAX_CHUNK_BYTES = int(os.getenv("MAX_CHUNK_BYTES", str(1_800_000))) # ---------- Sinks ---------- # Supported: file | blob | blob+queue SINK = os.getenv("SINK", "file").lower() OUTDIR = pathlib.Path(os.getenv("OUTDIR", "./out")) CKPT_FILE = pathlib.Path(os.getenv("CKPT_FILE", "./.ckpt")) AZURE_COMPRESS = os.getenv("AZURE_COMPRESS", "false").lower() in {"1","true","yes"} # Azure Blob AZ_CS = os.getenv("AZURE_STORAGE_CONNECTION_STRING") AZ_CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "bank-logs") # Azure Storage Queue AZ_QUEUE = os.getenv("AZURE_STORAGE_QUEUE_NAME", "log-chunks") # Email toggle for messages produced by the poller (default: True) EMAIL_SEND_DEFAULT = os.getenv("POLLER_EMAIL_SEND_DEFAULT", "true").lower() in {"1","true","yes"} if SINK.startswith("file"): OUTDIR.mkdir(parents=True, exist_ok=True) # ---------- Azure clients (lazy) ---------- _blob_service = None _container_client = None _queue_client = None def _init_blob(): global _blob_service, _container_client if _blob_service: return from azure.storage.blob import BlobServiceClient _blob_service = BlobServiceClient.from_connection_string(AZ_CS) _container_client = _blob_service.get_container_client(AZ_CONTAINER) try: _container_client.create_container() except Exception: pass def _init_queue(): global _queue_client if _queue_client: return from azure.storage.queue import QueueClient _queue_client = QueueClient.from_connection_string( conn_str=AZ_CS, queue_name=AZ_QUEUE ) try: _queue_client.create_queue() # idempotent except Exception: pass # ---------- Checkpoint helpers ---------- def read_ckpt() -> str | None: if not CKPT_FILE.exists(): return None val = CKPT_FILE.read_text().strip() return val or None def write_ckpt(val: str) -> None: CKPT_FILE.write_text(val) def to_epoch_seconds(v) -> int | None: if v is None: return None try: return int(float(v)) except Exception: pass try: s = str(v).replace("Z", "+00:00") return int(dt.datetime.fromisoformat(s).timestamp()) except Exception: return None # ---------- Splunk helpers ---------- def ensure_index(service, name: str): # idempotent: create if missing for idx in service.indexes: if idx.name == name: return service.indexes.create(name) def build_search(ckpt_epoch: int | None) -> str: q = f''' search index={INDEX} sourcetype="{SOURCETYPE}" | fields _time, _indextime, event_type, step, iban_origin_masked, iban_dest_masked, bic_swift, importo, divisa, istantaneo, data_pagamento, spese_commissioni, causale, vop_check, status '''.strip() if ckpt_epoch is not None: q += f"\n| where _indextime > {ckpt_epoch}" q += "\n| sort + _indextime" return q def fetch(service, ckpt_epoch: int | None): job = service.jobs.create( build_search(ckpt_epoch), exec_mode="normal", earliest_time=INITIAL_LOOKBACK, latest_time="now", output_mode="json", ) while not job.is_done(): if STOP: break time.sleep(0.5) rr = JSONResultsReader(job.results(output_mode="json")) rows = [dict(r) for r in rr if isinstance(r, dict)] job.cancel() return rows # ---------- Chunking ---------- def chunks_by_bytes(items, max_bytes=MAX_CHUNK_BYTES): buf, size = [], 0 for item in items: b = (json.dumps(item, separators=(",", ":")) + "\n").encode("utf-8") if size + len(b) > max_bytes and buf: yield b"".join(buf) buf, size = [b], len(b) else: buf.append(b); size += len(b) if buf: yield b"".join(buf) # ---------- Sinks ---------- def write_chunk_file(blob: bytes) -> pathlib.Path: ts = int(time.time()) name = OUTDIR / f"chunk_{ts}_{uuid.uuid4().hex[:8]}.jsonl" name.write_bytes(blob) return name def upload_chunk_blob(blob: bytes): _init_blob() from azure.storage.blob import ContentSettings ts = int(time.time()) ext = "jsonl.gz" if AZURE_COMPRESS else "jsonl" # timezone-aware UTC now_utc = dt.datetime.now(dt.timezone.utc) blob_name = f"intesa/{now_utc.strftime('%Y/%m/%d/%H')}/chunk_{ts}_{uuid.uuid4().hex[:8]}.{ext}" data = gzip.compress(blob) if AZURE_COMPRESS else blob content_settings = ContentSettings( content_type="application/json", content_encoding=("gzip" if AZURE_COMPRESS else None) ) bc = _container_client.get_blob_client(blob_name) bc.upload_blob(data, overwrite=True, content_settings=content_settings) return { "blob_name": blob_name, "url": bc.url, "size_bytes": len(data), "compressed": AZURE_COMPRESS, } def enqueue_blob_msg(container: str, blob_name: str, send_email: bool = True): _init_queue() payload = { "blob": {"container": container, "blob_name": blob_name}, "email": {"send": bool(send_email)} } _queue_client.send_message(json.dumps(payload, separators=(",", ":"), ensure_ascii=False)) print(f"[poller] enqueued to storage queue: {AZ_QUEUE} -> {container}/{blob_name}", flush=True) # ---------- Main ---------- def main(): print(f"[poller] connecting to Splunk https://{SPLUNK_HOST}:{SPLUNK_PORT} (verify_ssl={SPLUNK_VERIFY_SSL})") service = client.connect( host=SPLUNK_HOST, port=SPLUNK_PORT, scheme="https", username=SPLUNK_USER, password=SPLUNK_PW, verify=SPLUNK_VERIFY_SSL, ) if CREATE_INDEX_IF_MISSING: try: ensure_index(service, INDEX) print(f"[poller] ensured index exists: {INDEX}") except Exception as e: print(f"[poller] warn: ensure_index failed: {e}", flush=True) ckpt_val = read_ckpt() ckpt_epoch = int(ckpt_val) if (ckpt_val and ckpt_val.isdigit()) else None while not STOP: rows = fetch(service, ckpt_epoch) if not rows: print(f"[poller] no logs — sleeping {SLEEP_SECONDS}s", flush=True) for _ in range(SLEEP_SECONDS): if STOP: break time.sleep(1) continue max_index_time = max((to_epoch_seconds(r.get("_indextime")) or 0) for r in rows) or 0 if max_index_time: ckpt_epoch = max(ckpt_epoch or 0, max_index_time) write_ckpt(str(ckpt_epoch)) for _, blob in enumerate(chunks_by_bytes(rows)): # (Document kept for potential future LC usage) _ = Document( page_content=blob.decode("utf-8", errors="ignore"), metadata={"source": "splunk", "index": INDEX, "bytes": len(blob)}, ) if SINK == "file": fpath = write_chunk_file(blob) print(f"[poller] wrote {fpath} ({len(blob)} bytes)", flush=True) elif SINK == "blob": if not AZ_CS: raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads") meta = upload_chunk_blob(blob) print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True) elif SINK == "blob+queue": if not AZ_CS: raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING is required for blob uploads/queue") meta = upload_chunk_blob(blob) print(f"[poller] uploaded blob {AZ_CONTAINER}/{meta['blob_name']} ({meta['size_bytes']} bytes, compressed={meta['compressed']})", flush=True) enqueue_blob_msg(AZ_CONTAINER, meta["blob_name"], send_email=EMAIL_SEND_DEFAULT) else: raise ValueError(f"Unknown SINK={SINK}") # brief pause for _ in range(5): if STOP: break time.sleep(1) print("[poller] stopping gracefully") sys.exit(0) if __name__ == "__main__": main()