Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
246 lines
8.2 KiB
Python
246 lines
8.2 KiB
Python
"""
|
|
Document Parser - Extract text from various file formats
|
|
|
|
Supports: PDF, Word (.docx), Excel (.xlsx), PowerPoint (.pptx),
|
|
CSV, text files, and more
|
|
"""
|
|
|
|
import io
|
|
import mimetypes
|
|
from typing import Optional
|
|
|
|
|
|
class DocumentParser:
    """Parse different document types and extract text content for LLM processing.

    Plain-text formats are decoded directly. PDF, Word, Excel and PowerPoint
    files are handed to optional third-party libraries (PyPDF2, python-docx,
    openpyxl, python-pptx) that are imported lazily inside each parser. When
    such a library is missing, or raises while reading the file, the parser
    returns a bracketed message string instead of raising.
    """

    # Extensions whose bytes are decoded as text without further processing.
    # NOTE: '.rtf' is handled as plain text, so RTF control words are kept.
    _TEXT_EXTENSIONS = frozenset({
        '.txt', '.md', '.csv', '.json', '.xml', '.html', '.css', '.js',
        '.py', '.java', '.cpp', '.c', '.h', '.sh', '.yaml', '.yml',
        '.log', '.rtf',
    })

    # Binary formats mapped to the name of the method that parses them.
    # Method names (not bound methods) so subclasses can override a parser.
    _BINARY_PARSERS = {
        '.pdf': '_parse_pdf',
        '.docx': '_parse_docx',
        '.xlsx': '_parse_xlsx',
        '.pptx': '_parse_pptx',
    }

    # Encodings tried in order before the latin-1 catch-all.
    # 'utf-8-sig' decodes plain UTF-8 and also strips a leading BOM.
    # 'cp1252' must come before latin-1: latin-1 accepts every byte
    # sequence, so anything listed after it would never be reached.
    _TEXT_ENCODINGS = ('utf-8-sig', 'cp1252')

    def __init__(self):
        """Initialize document parser."""
        # Kept as a plain, per-instance set so callers can inspect or
        # extend it as before.
        self.supported_extensions = (
            set(self._TEXT_EXTENSIONS) | set(self._BINARY_PARSERS)
        )

    def can_parse(self, filename: str) -> bool:
        """Check if file can be parsed."""
        return self._get_extension(filename) in self.supported_extensions

    def parse(self, content: bytes, filename: str) -> str:
        """
        Parse document and extract text content.

        Args:
            content: File content as bytes
            filename: Original filename (used to determine file type)

        Returns:
            Extracted text content. For PDF/Office formats this may be a
            bracketed message describing a missing dependency or a parse
            error rather than document text.

        Raises:
            ValueError: If file type is not supported, or if the extension
                was added to ``supported_extensions`` but has no parser.
        """
        ext = self._get_extension(filename)

        if ext not in self.supported_extensions:
            raise ValueError(f"Unsupported file type: {ext}")

        # Text files - direct decode
        if ext in self._TEXT_EXTENSIONS:
            return self._parse_text(content)

        # PDF / Microsoft Office
        parser_name = self._BINARY_PARSERS.get(ext)
        if parser_name is None:
            raise ValueError(f"Parser not implemented for: {ext}")
        return getattr(self, parser_name)(content)

    def _get_extension(self, filename: str) -> str:
        """Get file extension in lowercase, including the dot ('' if none)."""
        if '.' not in filename:
            return ''
        return '.' + filename.rsplit('.', 1)[-1].lower()

    def _parse_text(self, content: bytes) -> str:
        """Decode plain text, trying UTF-8 first and then Windows-1252.

        Input that is already a ``str`` is returned unchanged. Bytes that are
        neither valid UTF-8 nor valid Windows-1252 are decoded as latin-1,
        which maps every byte to a character and therefore cannot fail.
        """
        if isinstance(content, str):
            return content

        for encoding in self._TEXT_ENCODINGS:
            try:
                return content.decode(encoding)
            except UnicodeDecodeError:
                continue

        # Catch-all: latin-1 never raises UnicodeDecodeError.
        return content.decode('latin-1')

    def _parse_pdf(self, content: bytes) -> str:
        """Parse PDF files, emitting one labelled section per non-empty page."""
        try:
            import PyPDF2

            reader = PyPDF2.PdfReader(io.BytesIO(content))

            text_parts = []
            for page_num, page in enumerate(reader.pages, start=1):
                # Coerce a None/empty result to '' so one page without
                # extractable text does not abort the whole document.
                text = page.extract_text() or ""
                if text.strip():
                    text_parts.append(f"--- Page {page_num} ---\n{text}")

            return '\n\n'.join(text_parts) if text_parts else "(empty PDF)"

        except ImportError:
            return "[PDF parsing requires PyPDF2: pip install PyPDF2]"
        except Exception as e:
            # Best-effort: report the failure as text instead of raising.
            return f"[Error parsing PDF: {str(e)}]"

    def _parse_docx(self, content: bytes) -> str:
        """Parse Word documents (.docx): paragraphs first, then tables."""
        try:
            import docx

            doc = docx.Document(io.BytesIO(content))

            # Extract paragraphs
            text_parts = [
                para.text for para in doc.paragraphs if para.text.strip()
            ]

            # Extract tables: one line per row, cells joined with ' | '
            for table in doc.tables:
                table_text = [
                    ' | '.join(cell.text.strip() for cell in row.cells)
                    for row in table.rows
                ]
                if table_text:
                    text_parts.append('\n' + '\n'.join(table_text))

            return '\n\n'.join(text_parts) if text_parts else "(empty document)"

        except ImportError:
            return "[Word parsing requires python-docx: pip install python-docx]"
        except Exception as e:
            # Best-effort: report the failure as text instead of raising.
            return f"[Error parsing Word document: {str(e)}]"

    def _parse_xlsx(self, content: bytes) -> str:
        """Parse Excel spreadsheets (.xlsx), one labelled section per sheet."""
        try:
            import openpyxl

            workbook = openpyxl.load_workbook(io.BytesIO(content), data_only=True)

            text_parts = []
            for sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                sheet_text = [f"=== Sheet: {sheet_name} ==="]

                for row in sheet.iter_rows(values_only=True):
                    # Skip empty rows
                    if any(cell is not None for cell in row):
                        sheet_text.append(' | '.join(
                            str(cell) if cell is not None else ''
                            for cell in row
                        ))

                if len(sheet_text) > 1:  # Has content beyond header
                    text_parts.append('\n'.join(sheet_text))

            return '\n\n'.join(text_parts) if text_parts else "(empty spreadsheet)"

        except ImportError:
            return "[Excel parsing requires openpyxl: pip install openpyxl]"
        except Exception as e:
            # Best-effort: report the failure as text instead of raising.
            return f"[Error parsing Excel file: {str(e)}]"

    def _parse_pptx(self, content: bytes) -> str:
        """Parse PowerPoint presentations (.pptx), one section per slide."""
        try:
            from pptx import Presentation

            prs = Presentation(io.BytesIO(content))

            text_parts = []
            for slide_num, slide in enumerate(prs.slides, start=1):
                slide_text = [f"=== Slide {slide_num} ==="]

                for shape in slide.shapes:
                    # Only shapes exposing a non-blank `text` attribute
                    # contribute to the output.
                    if hasattr(shape, "text") and shape.text.strip():
                        slide_text.append(shape.text)

                if len(slide_text) > 1:  # Has content beyond header
                    text_parts.append('\n'.join(slide_text))

            return '\n\n'.join(text_parts) if text_parts else "(empty presentation)"

        except ImportError:
            return "[PowerPoint parsing requires python-pptx: pip install python-pptx]"
        except Exception as e:
            # Best-effort: report the failure as text instead of raising.
            return f"[Error parsing PowerPoint file: {str(e)}]"
|
|
|
|
|
|
def get_file_info(filename: str, file_size: int) -> dict:
    """Summarize a file as its extension, a coarse category and a readable size.

    Args:
        filename: File name; the text after the last '.' is taken as the
            extension and lowercased.
        file_size: Size in bytes.

    Returns:
        A dict with the keys 'extension' (leading dot included, '' when the
        name contains no dot), 'category' ('file' when the extension is not
        in any known group), 'size_formatted' and 'size_bytes'.
    """
    _, dot, suffix = filename.rpartition('.')
    ext = '.' + suffix.lower() if dot else ''

    # Extension groups, checked in declaration order.
    type_categories = {
        'document': {'.pdf', '.docx', '.doc', '.txt', '.rtf', '.odt'},
        'spreadsheet': {'.xlsx', '.xls', '.csv', '.ods'},
        'presentation': {'.pptx', '.ppt', '.odp'},
        'image': {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'},
        'video': {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv', '.webm'},
        'audio': {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.wma'},
        'archive': {'.zip', '.rar', '.7z', '.tar', '.gz'},
        'code': {'.py', '.js', '.java', '.cpp', '.c', '.h', '.cs', '.php', '.rb'},
        'web': {'.html', '.css', '.xml', '.json', '.yaml', '.yml'},
    }
    category = next(
        (label for label, members in type_categories.items() if ext in members),
        'file',
    )

    # Binary (1024-based) units; sizes below 1024 are shown as whole bytes.
    kib = 1024
    mib = kib * 1024
    gib = mib * 1024
    if file_size < kib:
        size_str = f"{file_size} B"
    else:
        for unit, scale in (("KB", kib), ("MB", mib)):
            if file_size < scale * 1024:
                size_str = f"{file_size / scale:.1f} {unit}"
                break
        else:
            # Everything from 1024**3 bytes upward is reported in GB.
            size_str = f"{file_size / gib:.1f} GB"

    return {
        'extension': ext,
        'category': category,
        'size_formatted': size_str,
        'size_bytes': file_size,
    }
|