diff --git a/blob-func/.funcignore b/blob-func/.funcignore new file mode 100644 index 0000000..6d090e7 --- /dev/null +++ b/blob-func/.funcignore @@ -0,0 +1,2 @@ + +venv \ No newline at end of file diff --git a/blob-func/function_app.py b/blob-func/function_app.py new file mode 100644 index 0000000..3667b8b --- /dev/null +++ b/blob-func/function_app.py @@ -0,0 +1,98 @@ +import azure.functions as func +import logging +import os +import json +import requests +from typing import List, Any + +app = func.FunctionApp() + +# Simple health check +@app.route(route="ping", auth_level=func.AuthLevel.ANONYMOUS) +def Ping(req: func.HttpRequest) -> func.HttpResponse: + return func.HttpResponse("pong", status_code=200) + + +@app.blob_trigger( + arg_name="blob", + path="bank-logs/intesa/{name}.json", # container/path pattern + connection="BLOB_CONN" # App Setting name (NOT the raw connection string) +) +def BlobIngest(blob: func.InputStream): + """ + Reads JSON, NDJSON, or JSON array files from Blob Storage. + Handles UTF-8 BOM safely. Posts parsed items to AGENT_API_URL/ingest. + """ + logging.info("BlobIngest fired: name=%s length=%s", blob.name, blob.length) + + # Read whole blob and decode using utf-8-sig to strip a BOM if present + raw: bytes = blob.read() + # strict = raise if decoding is invalid; change to "ignore" only if you truly want to skip bad bytes + text: str = raw.decode("utf-8-sig", errors="strict") + + items: List[Any] = [] + + # Try whole-document JSON first (object or array). If that fails, fall back to NDJSON. + stripped = text.lstrip("\ufeff").strip() # extra safety if BOM sneaks through + try: + if stripped: + first = stripped[0] + if first in "[{": + parsed = json.loads(stripped) + items = parsed if isinstance(parsed, list) else [parsed] + else: + # Not an array or object at top level -> treat as NDJSON + raise ValueError("Top-level not array/object; using NDJSON mode.") + else: + logging.warning("Blob %s is empty.", blob.name) + items = [] + except Exception: + # NDJSON fallback (one JSON object per line) + ndjson_items: List[Any] = [] + for i, line in enumerate(text.splitlines(), start=1): + s = line.lstrip("\ufeff").strip() # strip BOM at line-start + trim whitespace + if not s: + continue + try: + ndjson_items.append(json.loads(s)) + except json.JSONDecodeError as e: + logging.error("Invalid JSON at line %d in %s: %s | line=%r", + i, blob.name, e, s[:200]) + # Re-raise to allow retry/poison for truly bad content + raise + items = ndjson_items + + logging.info("Parsed %d item(s) from blob '%s'.", len(items), blob.name) + + # If no agent URL configured, just log and exit gracefully + api = os.environ.get("AGENT_API_URL") + if not api: + logging.warning("AGENT_API_URL not set; skipping POST. (Parsed %d items).", len(items)) + return + + # Prepare POST + url = api.rstrip('/') + "/ingest" + batch_size = int(os.environ.get("INGEST_BATCH_SIZE", "500")) + timeout = float(os.environ.get("HTTP_TIMEOUT_SECONDS", "25")) + + def chunks(seq: List[Any], n: int): + for i in range(0, len(seq), n): + yield seq[i:i + n] + + # Send in batches to avoid oversized payloads + sent = 0 + for batch in chunks(items, batch_size): + try: + r = requests.post(url, json=batch, timeout=timeout) + logging.info("POST %s [%d items] -> %d", url, len(batch), r.status_code) + if r.status_code >= 400: + logging.error("Agent API error %d: %s", r.status_code, r.text[:500]) + # Raise to trigger retry/poison if desired + raise RuntimeError(f"Agent API returned {r.status_code}") + sent += len(batch) + except Exception: + logging.exception("HTTP call to Agent API failed after sending %d/%d items.", sent, len(items)) + # Re-raise so the runtime can retry and dead-letter if needed + raise + + logging.info("Completed BlobIngest for %s: posted %d item(s).", blob.name, sent) diff --git a/blob-func/host.json b/blob-func/host.json new file mode 100644 index 0000000..10f2809 --- /dev/null +++ b/blob-func/host.json @@ -0,0 +1,10 @@ +{ + "version": "2.0", + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + }, + "logging": { + "applicationInsights": { "samplingSettings": { "isEnabled": true, "excludedTypes": "Request" } } + } +} diff --git a/blob-func/local.settings.json b/blob-func/local.settings.json new file mode 100644 index 0000000..09c5658 --- /dev/null +++ b/blob-func/local.settings.json @@ -0,0 +1,12 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;BlobEndpoint=https://tfindevst.blob.core.windows.net/;FileEndpoint=https://tfindevst.file.core.windows.net/;QueueEndpoint=https://tfindevst.queue.core.windows.net/;TableEndpoint=https://tfindevst.table.core.windows.net/", + "BLOB_CONN": "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=tfindevst;AccountKey=j6JLrPHysajPGGZfAlPGdJsg3GuhCCnI23bERObVOuOTBeuePiMHvjTJ9vABSbujiryK4qQdRrJC+AStIGOT4A==;BlobEndpoint=https://tfindevst.blob.core.windows.net/;FileEndpoint=https://tfindevst.file.core.windows.net/;QueueEndpoint=https://tfindevst.queue.core.windows.net/;TableEndpoint=https://tfindevst.table.core.windows.net/", + "AGENT_API_URL": "https://agent-api-app.azurewebsites.net", + "AzureWebJobsSecretStorageType": "Files" + }, + "ConnectionStrings": {}, + "Host": {} +} \ No newline at end of file diff --git a/blob-func/requirements.txt b/blob-func/requirements.txt new file mode 100644 index 0000000..a0d4912 --- /dev/null +++ b/blob-func/requirements.txt @@ -0,0 +1,2 @@ +azure-functions==1.20.0 +requests==2.32.3