"""
Background Document Indexer

Indexes SharePoint files in the background without blocking the UI.
Uses threading for simple deployment (no Celery/Redis required).
"""
import threading
import time
from typing import Dict, List, Optional
from datetime import datetime
import logging

# Setup logging
# NOTE(review): basicConfig configures the *root* logger as an import-time side
# effect. Kept for backward compatibility; an application entry point is
# usually a better home for it.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Job statuses from which a job may still be cancelled.
_CANCELLABLE_STATUSES = ("pending", "running")


class IndexingJob:
    """Represents a background indexing job.

    The worker thread mutates an instance in place while
    ``BackgroundIndexer.get_job_status`` reads it to report progress.
    """

    def __init__(self, job_id: str, site_id: str, site_name: str,
                 connection_id: str, user_id: str):
        self.job_id = job_id
        self.site_id = site_id
        self.site_name = site_name
        self.connection_id = connection_id
        self.user_id = user_id
        self.status = "pending"  # pending, running, completed, failed, cancelled
        self.progress = 0  # 0-100
        self.total_files = 0
        self.processed_files = 0  # successful + skipped + failed
        self.successful_files = 0  # files actually added to the vector store
        self.skipped_files = 0  # files deliberately not indexed (unsupported, empty, ...)
        self.failed_files = 0
        # Naive UTC timestamps (datetime.utcnow) to keep the ISO output format stable.
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.error: Optional[str] = None
        self.current_file: Optional[str] = None


class BackgroundIndexer:
    """
    Background indexer for SharePoint documents.

    Runs indexing jobs in separate daemon threads to avoid blocking the main
    application. ``self.lock`` guards the ``jobs`` / ``active_threads`` maps
    and every job status transition (pending -> running -> completed/failed,
    or -> cancelled).
    """

    def __init__(self):
        self.jobs: Dict[str, IndexingJob] = {}
        # Only threads that are still running; each worker removes its own entry.
        self.active_threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()

    def start_indexing(
        self,
        job_id: str,
        site_id: str,
        site_name: str,
        connection_id: str,
        user_id: str,
        connector,
        vector_store,
        document_parser,
        path: str = "",
        tags: Optional[List[str]] = None,
    ) -> IndexingJob:
        """
        Start a background indexing job for a SharePoint site.

        Args:
            job_id: Unique job identifier
            site_id: SharePoint site ID
            site_name: Site display name
            connection_id: SharePoint connection ID
            user_id: User ID
            connector: SharePoint connector instance
            vector_store: Vector store instance
            document_parser: Document parser instance
            path: Optional path to start indexing from
            tags: Optional tags applied to every indexed document. The list is
                copied, so later changes by the caller do not affect the job.

        Returns:
            IndexingJob instance (status "pending" until the worker starts).
        """
        with self.lock:
            previous = self.jobs.get(job_id)
            if previous is not None and previous.status in _CANCELLABLE_STATUSES:
                # The old worker keeps running but can no longer be queried or cancelled.
                logger.warning(
                    f"Job {job_id} is still {previous.status}; replacing it with a new job"
                )

            job = IndexingJob(job_id, site_id, site_name, connection_id, user_id)
            self.jobs[job_id] = job

            thread = threading.Thread(
                target=self._index_site,
                args=(job, connector, vector_store, document_parser, path, list(tags or [])),
                daemon=True,
            )
            # Registered before start() so the worker's cleanup always finds it.
            self.active_threads[job_id] = thread
            thread.start()

        logger.info(f"Started indexing job {job_id} for site {site_name}")
        return job

    def get_job_status(self, job_id: str) -> Optional[Dict]:
        """Return a JSON-friendly snapshot of a job, or None if the id is unknown."""
        with self.lock:
            job = self.jobs.get(job_id)
            if not job:
                return None

            return {
                "job_id": job.job_id,
                "site_id": job.site_id,
                "site_name": job.site_name,
                "status": job.status,
                "progress": job.progress,
                "total_files": job.total_files,
                "processed_files": job.processed_files,
                "successful_files": job.successful_files,
                "skipped_files": job.skipped_files,
                "failed_files": job.failed_files,
                "current_file": job.current_file,
                "started_at": job.started_at.isoformat() if job.started_at else None,
                "completed_at": job.completed_at.isoformat() if job.completed_at else None,
                "error": job.error,
            }

    def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending or running indexing job.

        Cancellation is cooperative: the worker checks the status between
        files and stops at the next check.

        Returns:
            True if the job was cancelled, False if it is unknown or already finished.
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job is None or job.status not in _CANCELLABLE_STATUSES:
                return False
            job.status = "cancelled"
            job.completed_at = datetime.utcnow()

        logger.info(f"Cancelled indexing job {job_id}")
        return True

    def _index_site(self, job, connector, vector_store, document_parser, path="", tags=None):
        """
        Index all files in a SharePoint site (runs in background thread).

        This method:
        1. Recursively lists all files in the site
        2. Downloads and parses each file
        3. Hands the text to the vector store (which chunks/embeds it)
        4. Stores in vector store with specified tags

        Any unexpected error marks the job "failed"; per-file errors only
        increment ``failed_files``.
        """
        if tags is None:
            tags = []

        try:
            # Imported inside the try so that a missing/broken connector module
            # marks the job failed instead of killing the thread and leaving the
            # job stuck in "pending".
            from saas_connector_dynamodb import SecureSharePointClient

            with self.lock:
                if job.status == "cancelled":
                    # Cancelled before this worker got scheduled.
                    logger.info(f"[Job {job.job_id}] Job cancelled by user")
                    return
                job.status = "running"
                job.started_at = datetime.utcnow()

            # Create SharePoint client
            client = SecureSharePointClient(connector, job.connection_id, job.user_id)

            # First, count total files
            logger.info(f"[Job {job.job_id}] Counting files in {job.site_name}...")
            all_files = self._list_all_files_recursive(client, job, job.site_id, path)

            if job.status == "cancelled":
                return

            job.total_files = len(all_files)
            logger.info(f"[Job {job.job_id}] Found {job.total_files} files to index")

            # Process each file
            for file_info in all_files:
                if job.status == "cancelled":
                    logger.info(f"[Job {job.job_id}] Job cancelled by user")
                    break

                try:
                    indexed = self._process_file(
                        job, client, vector_store, document_parser, file_info, tags
                    )
                except Exception as e:
                    logger.error(f"[Job {job.job_id}] Failed to process {file_info['path']}: {e}")
                    job.failed_files += 1
                else:
                    # `is False` keeps an override that returns None (the old
                    # contract) counted as a success.
                    if indexed is False:
                        job.skipped_files += 1
                    else:
                        job.successful_files += 1

                job.processed_files += 1
                # total_files > 0 here because we are iterating a non-empty list;
                # integer math avoids float round-down (e.g. 29/100 -> 28).
                job.progress = job.processed_files * 100 // job.total_files

            # Mark as completed (under the lock so a concurrent cancel is not overwritten)
            with self.lock:
                if job.status != "cancelled":
                    job.status = "completed"
                    job.progress = 100  # also covers a site with zero files
                job.completed_at = datetime.utcnow()

            logger.info(
                f"[Job {job.job_id}] Completed: {job.successful_files} successful, "
                f"{job.failed_files} failed out of {job.total_files} total"
            )

        except Exception as e:
            logger.error(f"[Job {job.job_id}] Job failed with error: {e}", exc_info=True)
            with self.lock:
                job.status = "failed"
                job.error = str(e) or type(e).__name__
                job.completed_at = datetime.utcnow()

        finally:
            job.current_file = None
            with self.lock:
                # Identity check: if the job id was reused, a newer thread owns the slot.
                if self.active_threads.get(job.job_id) is threading.current_thread():
                    del self.active_threads[job.job_id]

    def _list_all_files_recursive(self, client, job, site_id, path=""):
        """Recursively list all files under ``path`` in a site.

        Best-effort: an error while listing a folder is logged and that folder
        contributes whatever was collected before the error. Stops early once
        the job is cancelled.

        Returns:
            List of dicts with 'name', 'path' and 'size' keys.
        """
        files_to_process = []

        try:
            items = client.list_files(site_id, path)

            for item in items:
                if job.status == "cancelled":
                    break

                item_path = f"{path}/{item['name']}" if path else item['name']

                # If it's a folder, recurse
                if 'folder' in item:
                    files_to_process.extend(
                        self._list_all_files_recursive(client, job, site_id, item_path)
                    )
                else:
                    # It's a file
                    files_to_process.append({
                        'name': item['name'],
                        'path': item_path,
                        'size': item.get('size', 0),
                    })

        except Exception as e:
            logger.error(f"[Job {job.job_id}] Error listing files in {path}: {e}")

        return files_to_process

    def _process_file(self, job, client, vector_store, document_parser, file_info, tags=None) -> bool:
        """Process a single file: download, parse, and store it in the vector store.

        Args:
            job: The owning IndexingJob (``current_file`` is updated).
            client: SharePoint client providing ``read_file``.
            vector_store: Store providing ``add_document``; falsy means "unavailable".
            document_parser: Parser providing ``can_parse`` and ``parse``.
            file_info: Dict with 'name' and 'path' keys.
            tags: Tags applied to the stored document.

        Returns:
            True if the file was added to the vector store, False if it was skipped.

        Raises:
            Whatever ``client.read_file`` or ``vector_store.add_document`` raise;
            the caller counts those as failed files.
        """
        if not vector_store:
            logger.warning(f"[Job {job.job_id}] Vector store not available, skipping {file_info['name']}")
            return False

        if tags is None:
            tags = []

        filename = file_info['name']
        file_path = file_info['path']
        job.current_file = filename

        logger.info(f"[Job {job.job_id}] Processing {filename}...")

        # Check if file can be parsed
        if not document_parser.can_parse(filename):
            logger.info(f"[Job {job.job_id}] Skipping unsupported file type: {filename}")
            return False

        # Download file
        binary_content = client.read_file(job.site_id, file_path, as_text=False)

        # Parse content
        try:
            content = document_parser.parse(binary_content, filename)
        except Exception as parse_err:
            logger.warning(f"[Job {job.job_id}] Failed to parse {filename}: {parse_err}")
            # Try fallback to text
            try:
                content = binary_content.decode('utf-8', errors='ignore')
            except (AttributeError, UnicodeError):
                # AttributeError: content was not bytes (e.g. None or already str).
                logger.error(f"[Job {job.job_id}] Could not decode {filename}, skipping")
                return False

        if not isinstance(content, str):
            logger.info(f"[Job {job.job_id}] Skipping non-text content for {filename}")
            return False

        # Skip error messages
        # NOTE(review): assumes the parser reports errors as bracketed text; this
        # also skips documents whose whole text is bracketed (e.g. a JSON array).
        if content.startswith("[") and content.endswith("]"):
            logger.info(f"[Job {job.job_id}] Skipping error content for {filename}")
            return False

        # Skip very small files (likely empty)
        if len(content.strip()) < 50:
            logger.info(f"[Job {job.job_id}] Skipping small file {filename} ({len(content)} chars)")
            return False

        # Add to vector store with specified tags
        try:
            document_id = vector_store.add_document(
                user_id=job.user_id,
                site_id=job.site_id,
                file_path=file_path,
                filename=filename,
                content=content,
                tags=tags,  # Apply tags from indexing request
                chunk_size=1000,
                chunk_overlap=200,
            )
            logger.info(f"[Job {job.job_id}] Indexed {filename} with document_id {document_id} and tags {tags}")
        except Exception as e:
            logger.error(f"[Job {job.job_id}] Failed to add {filename} to vector store: {e}")
            raise

        return True


# Global indexer instance
_indexer = None
_indexer_lock = threading.Lock()


def get_indexer() -> BackgroundIndexer:
    """Get the global indexer instance, creating it on first use (thread-safe)."""
    global _indexer
    if _indexer is None:
        with _indexer_lock:
            # Re-check: another thread may have created it while we waited.
            if _indexer is None:
                _indexer = BackgroundIndexer()
    return _indexer