tf_sharepoint_integration/storage/credentials_storage.py
Daniel Grozdanovic bcd0f8a227
Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
Initial commit: SharePoint connector and ToothFairyAI integration
2026-02-22 17:58:45 +02:00

156 lines
4.6 KiB
Python

"""
Secure Credentials Storage
Encrypts and stores SharePoint OAuth credentials to disk for persistence across restarts.
Uses Fernet symmetric encryption to protect sensitive data.
"""
import json
import os
from typing import Dict, Optional
from cryptography.fernet import Fernet
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class CredentialsStorage:
"""
Secure storage for SharePoint OAuth credentials.
Stores credentials encrypted on disk at ~/.sharepoint_credentials/credentials.enc
Each user's credentials are stored separately and encrypted.
"""
def __init__(self, storage_dir: str = None):
"""
Initialize credentials storage.
Args:
storage_dir: Directory to store credentials (default: ~/.sharepoint_credentials)
"""
if storage_dir is None:
storage_dir = os.path.expanduser("~/.sharepoint_credentials")
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.credentials_file = self.storage_dir / "credentials.enc"
self.key_file = self.storage_dir / "key.key"
# Initialize encryption key
self.cipher = self._get_or_create_cipher()
# Load existing credentials
self.credentials: Dict[str, Dict] = self._load_credentials()
def _get_or_create_cipher(self) -> Fernet:
"""Get or create encryption key."""
if self.key_file.exists():
with open(self.key_file, 'rb') as f:
key = f.read()
else:
key = Fernet.generate_key()
with open(self.key_file, 'wb') as f:
f.write(key)
# Set restrictive permissions on key file
os.chmod(self.key_file, 0o600)
return Fernet(key)
def _load_credentials(self) -> Dict[str, Dict]:
"""Load credentials from disk."""
if not self.credentials_file.exists():
return {}
try:
with open(self.credentials_file, 'rb') as f:
encrypted_data = f.read()
if not encrypted_data:
return {}
decrypted_data = self.cipher.decrypt(encrypted_data)
return json.loads(decrypted_data.decode('utf-8'))
except Exception as e:
logger.error(f"Failed to load credentials: {e}")
return {}
def _save_credentials(self):
"""Save credentials to disk."""
try:
data = json.dumps(self.credentials).encode('utf-8')
encrypted_data = self.cipher.encrypt(data)
with open(self.credentials_file, 'wb') as f:
f.write(encrypted_data)
# Set restrictive permissions on credentials file
os.chmod(self.credentials_file, 0o600)
logger.info("Credentials saved to disk")
except Exception as e:
logger.error(f"Failed to save credentials: {e}")
raise
def save_config(self, user_id: str, config: Dict) -> None:
"""
Save user configuration.
Args:
user_id: User identifier
config: Configuration dictionary containing:
- tenant_id: Azure tenant ID
- client_id: Azure client ID
- client_secret: Azure client secret
"""
self.credentials[user_id] = config
self._save_credentials()
logger.info(f"Saved credentials for user {user_id}")
def get_config(self, user_id: str) -> Optional[Dict]:
"""
Get user configuration.
Args:
user_id: User identifier
Returns:
Configuration dictionary or None if not found
"""
return self.credentials.get(user_id)
def delete_config(self, user_id: str) -> bool:
"""
Delete user configuration.
Args:
user_id: User identifier
Returns:
True if deleted, False if not found
"""
if user_id in self.credentials:
del self.credentials[user_id]
self._save_credentials()
logger.info(f"Deleted credentials for user {user_id}")
return True
return False
def list_users(self) -> list:
"""Get list of users with stored credentials."""
return list(self.credentials.keys())
# Global credentials storage instance
_credentials_storage = None
def get_credentials_storage() -> CredentialsStorage:
"""Get the global credentials storage instance."""
global _credentials_storage
if _credentials_storage is None:
_credentials_storage = CredentialsStorage()
return _credentials_storage