Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
309 lines
10 KiB
Python
309 lines
10 KiB
Python
"""
Background Document Indexer

Indexes SharePoint files in the background without blocking the UI.
Uses threading for simple deployment (no Celery/Redis required).
"""
|
|
|
|
import threading
|
|
import time
|
|
from typing import Dict, List, Optional
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): this configures the *root* logger at import time (INFO level).
# Per the stdlib docs, basicConfig() does nothing if the root logger already
# has handlers, so an application that configures logging first wins.
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
class IndexingJob:
    """Represents a background indexing job.

    A plain mutable record: the worker thread updates the counters and
    timestamps while the job runs, and status queries read them back.
    """

    def __init__(self, job_id: str, site_id: str, site_name: str, connection_id: str, user_id: str):
        self.job_id = job_id
        self.site_id = site_id
        self.site_name = site_name
        self.connection_id = connection_id
        self.user_id = user_id

        # One of: "pending", "running", "completed", "failed", "cancelled".
        self.status: str = "pending"
        # Percentage of listed files processed so far, 0-100.
        self.progress: int = 0
        self.total_files: int = 0
        self.processed_files: int = 0
        self.successful_files: int = 0
        self.failed_files: int = 0

        # Set when the job starts / reaches a terminal state (None until then).
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        # Error message if the job as a whole failed.
        self.error: Optional[str] = None
        # Name of the file currently being processed, if any.
        self.current_file: Optional[str] = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(job_id={self.job_id!r}, site_name={self.site_name!r}, "
            f"status={self.status!r}, progress={self.progress})"
        )
|
|
|
|
|
|
class BackgroundIndexer:
    """
    Background indexer for SharePoint documents.

    Runs each indexing job in its own daemon thread to avoid blocking the main
    application. Job records are kept in ``self.jobs`` so callers can poll
    ``get_job_status``; ``self.active_threads`` holds the threads of jobs whose
    worker has not finished yet.

    Job lifecycle transitions (running / completed / failed / cancelled) are
    made while holding ``self.lock`` so that a cancellation issued through
    ``cancel_job`` is never overwritten by the worker thread. Progress counters
    are written by the worker thread without the lock.
    """

    # Parsed files whose stripped text is shorter than this are skipped as empty.
    MIN_CONTENT_CHARS = 50
    # Chunking parameters forwarded to ``vector_store.add_document``.
    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200

    def __init__(self):
        self.jobs: Dict[str, "IndexingJob"] = {}
        self.active_threads: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()

    def start_indexing(
        self,
        job_id: str,
        site_id: str,
        site_name: str,
        connection_id: str,
        user_id: str,
        connector,
        vector_store,
        document_parser,
        path: str = "",
        tags: Optional[List[str]] = None,
    ) -> "IndexingJob":
        """
        Start a background indexing job for a SharePoint site.

        Args:
            job_id: Unique job identifier. Re-using an id replaces the stored
                job record; an earlier thread for that id is not stopped.
            site_id: SharePoint site ID
            site_name: Site display name
            connection_id: SharePoint connection ID
            user_id: User ID
            connector: SharePoint connector instance
            vector_store: Vector store instance (falsy -> every file is skipped)
            document_parser: Document parser instance
            path: Optional folder path to start indexing from ("" = site root)
            tags: Optional tags attached to every indexed document

        Returns:
            The newly created IndexingJob; its status stays "pending" until the
            worker thread marks it "running".
        """
        with self.lock:
            job = IndexingJob(job_id, site_id, site_name, connection_id, user_id)
            self.jobs[job_id] = job

            # Copy the tags so later mutation of the caller's list cannot
            # affect a job that is already running.
            thread = threading.Thread(
                target=self._index_site,
                args=(job, connector, vector_store, document_parser, path, list(tags or [])),
                daemon=True,
            )
            self.active_threads[job_id] = thread
            # Started while holding the lock: the worker's cleanup in
            # _index_site also takes the lock, so it cannot deregister the
            # thread before it has been registered here.
            thread.start()

        logger.info(f"Started indexing job {job_id} for site {site_name}")
        return job

    def get_job_status(self, job_id: str) -> Optional[Dict]:
        """
        Return a JSON-serializable snapshot of a job, or None if the id is unknown.

        ``skipped_files`` counts files that were processed but deliberately not
        indexed (unsupported type, empty/unreadable content, no vector store).
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job is None:
                return None

            return {
                "job_id": job.job_id,
                "site_id": job.site_id,
                "site_name": job.site_name,
                "status": job.status,
                "progress": job.progress,
                "total_files": job.total_files,
                "processed_files": job.processed_files,
                "successful_files": job.successful_files,
                "failed_files": job.failed_files,
                "skipped_files": job.processed_files - job.successful_files - job.failed_files,
                "current_file": job.current_file,
                "started_at": job.started_at.isoformat() if job.started_at else None,
                "completed_at": job.completed_at.isoformat() if job.completed_at else None,
                "error": job.error,
            }

    def cancel_job(self, job_id: str) -> bool:
        """
        Request cancellation of a running job.

        Only jobs whose status is "running" can be cancelled. The worker thread
        notices the new status between files and stops.

        Returns:
            True if the job was marked "cancelled", False otherwise.
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job is not None and job.status == "running":
                job.status = "cancelled"
                # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
                # switching to aware datetimes would change the isoformat()
                # strings returned by get_job_status.
                job.completed_at = datetime.utcnow()
                logger.info(f"Cancelled indexing job {job_id}")
                return True
            return False

    def _index_site(self, job, connector, vector_store, document_parser, path="", tags=None):
        """
        Index all files in a SharePoint site (runs in background thread).

        This method:
        1. Recursively lists all files under ``path``
        2. Downloads and parses each file
        3. Stores the parsed text in the vector store with the given tags

        Per-file errors are counted in ``job.failed_files`` and do not stop the
        job; any other error marks the job "failed" unless it was cancelled.
        """
        if tags is None:
            tags = []

        try:
            # Imported inside the try so that an import failure marks the job
            # "failed" instead of killing the thread and leaving it "pending".
            from saas_connector_dynamodb import SecureSharePointClient

            with self.lock:
                job.status = "running"
                job.started_at = datetime.utcnow()

            client = SecureSharePointClient(connector, job.connection_id, job.user_id)

            # First, collect every file so progress can be reported as a percentage.
            logger.info(f"[Job {job.job_id}] Counting files in {job.site_name}...")
            all_files = self._list_all_files_recursive(client, job, job.site_id, path)

            if job.status == "cancelled":
                return

            job.total_files = len(all_files)
            logger.info(f"[Job {job.job_id}] Found {job.total_files} files to index")

            for file_info in all_files:
                if job.status == "cancelled":
                    logger.info(f"[Job {job.job_id}] Job cancelled by user")
                    break

                try:
                    indexed = self._process_file(
                        job, client, vector_store, document_parser, file_info, tags
                    )
                except Exception as e:
                    logger.error(f"[Job {job.job_id}] Failed to process {file_info['path']}: {e}")
                    job.failed_files += 1
                else:
                    # Skipped files are neither successful nor failed.
                    if indexed:
                        job.successful_files += 1

                job.processed_files += 1
                # Integer arithmetic: int(29 / 100 * 100) would give 28.
                # total_files > 0 here because we are iterating all_files.
                job.progress = job.processed_files * 100 // job.total_files

            with self.lock:
                # A concurrent cancel_job() must not be overwritten.
                if job.status == "cancelled":
                    return
                job.status = "completed"
                job.completed_at = datetime.utcnow()

            skipped = job.processed_files - job.successful_files - job.failed_files
            logger.info(
                f"[Job {job.job_id}] Completed: {job.successful_files} successful, "
                f"{job.failed_files} failed, {skipped} skipped out of {job.total_files} total"
            )

        except Exception as e:
            logger.exception(f"[Job {job.job_id}] Job failed with error: {e}")
            with self.lock:
                job.error = str(e)
                if job.status != "cancelled":
                    job.status = "failed"
                    job.completed_at = datetime.utcnow()

        finally:
            # Drop the finished thread so active_threads does not grow forever.
            # Only remove our own entry: the id may have been reused by a newer job.
            with self.lock:
                if self.active_threads.get(job.job_id) is threading.current_thread():
                    del self.active_threads[job.job_id]

    def _list_all_files_recursive(self, client, job, site_id, path=""):
        """
        Depth-first listing of every file under ``path``.

        Returns a list of ``{'name', 'path', 'size'}`` dicts in listing order,
        with each folder's contents inserted at the folder's position. Items
        containing a ``'folder'`` key are treated as folders. Listing errors are
        logged and whatever was gathered at that level is returned
        (best-effort), so one unreadable folder does not abort the whole job.
        Stops early once the job is cancelled.
        """
        files_to_process = []

        try:
            items = client.list_files(site_id, path)

            for item in items:
                if job.status == "cancelled":
                    break

                name = item['name']
                item_path = f"{path}/{name}" if path else name

                if 'folder' in item:
                    files_to_process.extend(
                        self._list_all_files_recursive(client, job, site_id, item_path)
                    )
                else:
                    files_to_process.append({
                        'name': name,
                        'path': item_path,
                        'size': item.get('size', 0),
                    })

        except Exception as e:
            logger.error(f"[Job {job.job_id}] Error listing files in {path}: {e}")

        return files_to_process

    @staticmethod
    def _decode_fallback(data) -> Optional[str]:
        """Best-effort UTF-8 decode used when the document parser fails.

        Returns the text, or None if ``data`` is neither text nor bytes.
        """
        if isinstance(data, str):
            return data
        if isinstance(data, (bytes, bytearray)):
            # errors='ignore' drops undecodable bytes instead of raising.
            return data.decode('utf-8', errors='ignore')
        return None

    def _process_file(self, job, client, vector_store, document_parser, file_info, tags=None) -> bool:
        """
        Process a single file: download, parse, and store in the vector store.

        Args:
            file_info: dict with at least ``'name'`` and ``'path'`` keys.
            tags: tags attached to the stored document (default: none).

        Returns:
            True if the file was added to the vector store, False if it was
            deliberately skipped.

        Raises:
            Whatever ``client.read_file`` or ``vector_store.add_document`` raise;
            the caller counts these as failed files.
        """
        if not vector_store:
            logger.warning(f"[Job {job.job_id}] Vector store not available, skipping {file_info['name']}")
            return False

        if tags is None:
            tags = []

        filename = file_info['name']
        file_path = file_info['path']

        job.current_file = filename
        logger.info(f"[Job {job.job_id}] Processing {filename}...")

        if not document_parser.can_parse(filename):
            logger.info(f"[Job {job.job_id}] Skipping unsupported file type: {filename}")
            return False

        binary_content = client.read_file(job.site_id, file_path, as_text=False)

        try:
            content = document_parser.parse(binary_content, filename)
        except Exception as parse_err:
            logger.warning(f"[Job {job.job_id}] Failed to parse {filename}: {parse_err}")
            content = self._decode_fallback(binary_content)
            if content is None:
                logger.error(f"[Job {job.job_id}] Could not decode {filename}, skipping")
                return False

        if not isinstance(content, str):
            logger.warning(f"[Job {job.job_id}] Parser returned no text for {filename}, skipping")
            return False

        # NOTE(review): presumably the parser reports problems as a bracketed
        # message such as "[Error ...]" -- confirm against document_parser.
        if content.startswith("[") and content.endswith("]"):
            logger.info(f"[Job {job.job_id}] Skipping error content for {filename}")
            return False

        # Skip very small files (likely empty).
        if len(content.strip()) < self.MIN_CONTENT_CHARS:
            logger.info(f"[Job {job.job_id}] Skipping small file {filename} ({len(content)} chars)")
            return False

        try:
            document_id = vector_store.add_document(
                user_id=job.user_id,
                site_id=job.site_id,
                file_path=file_path,
                filename=filename,
                content=content,
                tags=tags,  # Apply tags from indexing request
                chunk_size=self.CHUNK_SIZE,
                chunk_overlap=self.CHUNK_OVERLAP,
            )
        except Exception as e:
            logger.error(f"[Job {job.job_id}] Failed to add {filename} to vector store: {e}")
            raise

        logger.info(f"[Job {job.job_id}] Indexed {filename} with document_id {document_id} and tags {tags}")
        return True
|
|
|
|
|
|
# Global indexer instance, created lazily by get_indexer().
_indexer = None

# Guards the lazy initialisation of _indexer so that concurrent first calls
# (e.g. from several request-handling threads) cannot create two indexers,
# which would split job records between them.
_indexer_lock = threading.Lock()


def get_indexer() -> "BackgroundIndexer":
    """Return the process-wide BackgroundIndexer, creating it on first use.

    Double-checked locking: the unlocked check keeps the common path cheap,
    and the re-check under the lock ensures only one instance is ever created.
    """
    global _indexer
    if _indexer is None:
        with _indexer_lock:
            if _indexer is None:
                _indexer = BackgroundIndexer()
    return _indexer
|