tf_sharepoint_integration/background_indexer.py
Daniel Grozdanovic bcd0f8a227
Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
Initial commit: SharePoint connector and ToothFairyAI integration
2026-02-22 17:58:45 +02:00

309 lines
10 KiB
Python

"""
Background Document Indexer
Indexes SharePoint files in the background without blocking the UI.
Uses threading for simple deployment (no Celery/Redis required).
"""
import threading
import time
from typing import Dict, List, Optional
from datetime import datetime
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IndexingJob:
    """Mutable state record for one background indexing job.

    A job moves through: pending -> running -> completed | failed | cancelled.
    Counters and `current_file` are updated by the worker thread while the job
    runs; `error` is set only when the job fails.
    """

    def __init__(self, job_id: str, site_id: str, site_name: str, connection_id: str, user_id: str):
        # Identity: which job, which SharePoint site/connection, which user.
        self.job_id = job_id
        self.site_id = site_id
        self.site_name = site_name
        self.connection_id = connection_id
        self.user_id = user_id
        # Lifecycle state: pending, running, completed, failed (or cancelled).
        self.status = "pending"
        # Percentage complete, 0-100.
        self.progress = 0
        # File counters, all starting from zero.
        self.total_files = self.processed_files = 0
        self.successful_files = self.failed_files = 0
        # Timestamps, failure detail and the file currently being processed
        # are unknown until the worker fills them in.
        self.started_at = self.completed_at = None
        self.error = self.current_file = None
class BackgroundIndexer:
    """
    Background indexer for SharePoint documents.

    Runs indexing jobs in daemon threads to avoid blocking the main
    application. `self.lock` guards the `jobs` and `active_threads`
    registries; individual job fields are written by the worker thread
    without the lock, so status readers get an eventually-consistent
    snapshot (acceptable for progress reporting).
    """

    def __init__(self):
        # job_id -> job state object (see IndexingJob for the field contract).
        self.jobs: Dict[str, "IndexingJob"] = {}
        # job_id -> worker thread; entries are removed when the worker exits.
        self.active_threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()

    def start_indexing(
        self,
        job_id: str,
        site_id: str,
        site_name: str,
        connection_id: str,
        user_id: str,
        connector,
        vector_store,
        document_parser,
        path: str = "",
        tags: Optional[List[str]] = None
    ) -> "IndexingJob":
        """
        Start a background indexing job for a SharePoint site.

        Args:
            job_id: Unique job identifier
            site_id: SharePoint site ID
            site_name: Site display name
            connection_id: SharePoint connection ID
            user_id: User ID
            connector: SharePoint connector instance
            vector_store: Vector store instance
            document_parser: Document parser instance
            path: Optional path to start indexing from
            tags: Optional tags applied to every document indexed by this job

        Returns:
            The newly created IndexingJob (its worker thread is already running).
        """
        with self.lock:
            job = IndexingJob(job_id, site_id, site_name, connection_id, user_id)
            self.jobs[job_id] = job
            # Daemon thread: indexing must not keep the process alive on shutdown.
            thread = threading.Thread(
                target=self._index_site,
                args=(job, connector, vector_store, document_parser, path, tags or []),
                daemon=True
            )
            self.active_threads[job_id] = thread
            thread.start()
        logger.info(f"Started indexing job {job_id} for site {site_name}")
        return job

    def get_job_status(self, job_id: str) -> Optional[Dict]:
        """Return a JSON-serializable snapshot of a job, or None if unknown."""
        with self.lock:
            job = self.jobs.get(job_id)
            if not job:
                return None
            return {
                "job_id": job.job_id,
                "site_id": job.site_id,
                "site_name": job.site_name,
                "status": job.status,
                "progress": job.progress,
                "total_files": job.total_files,
                "processed_files": job.processed_files,
                "successful_files": job.successful_files,
                "failed_files": job.failed_files,
                "current_file": job.current_file,
                # Timestamps serialized as ISO-8601 strings (None until set).
                "started_at": job.started_at.isoformat() if job.started_at else None,
                "completed_at": job.completed_at.isoformat() if job.completed_at else None,
                "error": job.error
            }

    def cancel_job(self, job_id: str) -> bool:
        """Request cancellation of a running job.

        Cancellation is cooperative: the worker checks `job.status` between
        files and stops at the next opportunity.

        Returns:
            True if the job existed and was running, False otherwise.
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job and job.status == "running":
                job.status = "cancelled"
                job.completed_at = datetime.utcnow()
                logger.info(f"Cancelled indexing job {job_id}")
                return True
            return False

    def _index_site(self, job, connector, vector_store, document_parser, path="", tags=None):
        """
        Index all files in a SharePoint site (runs in background thread).

        This method:
        1. Recursively lists all files in the site
        2. Downloads and parses each file
        3. Generates embeddings
        4. Stores in vector store with specified tags
        """
        if tags is None:
            tags = []
        try:
            # Imported inside the try so that an ImportError marks the job as
            # failed instead of silently killing the worker thread and leaving
            # the job stuck in "pending" forever.
            from saas_connector_dynamodb import SecureSharePointClient

            job.status = "running"
            job.started_at = datetime.utcnow()
            # Create SharePoint client
            client = SecureSharePointClient(connector, job.connection_id, job.user_id)
            # First, count total files so progress can be reported as a percentage.
            logger.info(f"[Job {job.job_id}] Counting files in {job.site_name}...")
            all_files = self._list_all_files_recursive(client, job, job.site_id, path)
            if job.status == "cancelled":
                return
            job.total_files = len(all_files)
            logger.info(f"[Job {job.job_id}] Found {job.total_files} files to index")
            # Process each file, honoring cooperative cancellation between files.
            for file_info in all_files:
                if job.status == "cancelled":
                    logger.info(f"[Job {job.job_id}] Job cancelled by user")
                    break
                try:
                    self._process_file(
                        job,
                        client,
                        vector_store,
                        document_parser,
                        file_info,
                        tags
                    )
                    job.successful_files += 1
                except Exception as e:
                    # One bad file must not abort the whole job.
                    logger.error(f"[Job {job.job_id}] Failed to process {file_info['path']}: {e}")
                    job.failed_files += 1
                job.processed_files += 1
                job.progress = int((job.processed_files / job.total_files) * 100) if job.total_files > 0 else 0
            # Mark as completed (unless the user cancelled mid-run).
            if job.status != "cancelled":
                job.status = "completed"
                job.completed_at = datetime.utcnow()
                logger.info(
                    f"[Job {job.job_id}] Completed: {job.successful_files} successful, "
                    f"{job.failed_files} failed out of {job.total_files} total"
                )
        except Exception as e:
            logger.error(f"[Job {job.job_id}] Job failed with error: {e}")
            job.status = "failed"
            job.error = str(e)
            job.completed_at = datetime.utcnow()
        finally:
            # Drop the finished thread from the registry so it does not grow
            # without bound across many indexing runs.
            with self.lock:
                self.active_threads.pop(job.job_id, None)

    def _list_all_files_recursive(self, client, job, site_id, path=""):
        """Recursively list all files in a site.

        Returns a flat list of {'name', 'path', 'size'} dicts. Listing errors
        are logged and the subtree is skipped (best-effort traversal).
        """
        files_to_process = []
        try:
            items = client.list_files(site_id, path)
            for item in items:
                if job.status == "cancelled":
                    break
                # Graph-style items carry a 'folder' facet when they are folders.
                if 'folder' in item:
                    folder_name = item['name']
                    new_path = f"{path}/{folder_name}" if path else folder_name
                    files_to_process.extend(
                        self._list_all_files_recursive(client, job, site_id, new_path)
                    )
                else:
                    # It's a file
                    file_path = f"{path}/{item['name']}" if path else item['name']
                    files_to_process.append({
                        'name': item['name'],
                        'path': file_path,
                        'size': item.get('size', 0)
                    })
        except Exception as e:
            logger.error(f"[Job {job.job_id}] Error listing files in {path}: {e}")
        return files_to_process

    def _process_file(self, job, client, vector_store, document_parser, file_info, tags=None):
        """Process a single file: download, parse, embed, store.

        Unsupported, undecodable, error-marker and near-empty files are
        skipped quietly; vector-store failures are re-raised so the caller
        counts the file as failed.
        """
        if not vector_store:
            logger.warning(f"[Job {job.job_id}] Vector store not available, skipping {file_info['name']}")
            return
        if tags is None:
            tags = []
        filename = file_info['name']
        file_path = file_info['path']
        job.current_file = filename
        logger.info(f"[Job {job.job_id}] Processing {file_path}...")
        # Check if file can be parsed
        if not document_parser.can_parse(filename):
            logger.info(f"[Job {job.job_id}] Skipping unsupported file type: {filename}")
            return
        # Download file
        binary_content = client.read_file(job.site_id, file_path, as_text=False)
        # Parse content
        try:
            content = document_parser.parse(binary_content, filename)
        except Exception as parse_err:
            logger.warning(f"[Job {job.job_id}] Failed to parse {filename}: {parse_err}")
            # Fall back to a lossy text decode before giving up on the file.
            try:
                content = binary_content.decode('utf-8', errors='ignore')
            except Exception:
                # Narrowed from a bare `except:` — same skip behavior.
                logger.error(f"[Job {job.job_id}] Could not decode {filename}, skipping")
                return
        # The parser signals errors as "[...]"-wrapped strings; don't index those.
        if content.startswith("[") and content.endswith("]"):
            logger.info(f"[Job {job.job_id}] Skipping error content for {filename}")
            return
        # Skip very small files (likely empty)
        if len(content.strip()) < 50:
            logger.info(f"[Job {job.job_id}] Skipping small file {filename} ({len(content)} chars)")
            return
        # Add to vector store with specified tags
        try:
            document_id = vector_store.add_document(
                user_id=job.user_id,
                site_id=job.site_id,
                file_path=file_path,
                filename=filename,
                content=content,
                tags=tags,  # Apply tags from indexing request
                chunk_size=1000,
                chunk_overlap=200
            )
            logger.info(f"[Job {job.job_id}] Indexed {filename} with document_id {document_id} and tags {tags}")
        except Exception as e:
            logger.error(f"[Job {job.job_id}] Failed to add {filename} to vector store: {e}")
            raise
# Global indexer instance (lazily created; guarded for concurrent first access)
_indexer: Optional["BackgroundIndexer"] = None
_indexer_lock = threading.Lock()


def get_indexer() -> BackgroundIndexer:
    """Return the process-wide BackgroundIndexer, creating it on first use.

    Uses double-checked locking so concurrent first callers (this module is
    inherently multithreaded) cannot create two indexer instances, while the
    common already-initialized path stays lock-free.
    """
    global _indexer
    if _indexer is None:
        with _indexer_lock:
            if _indexer is None:
                _indexer = BackgroundIndexer()
    return _indexer