""" Secure Credentials Storage Encrypts and stores SharePoint OAuth credentials to disk for persistence across restarts. Uses Fernet symmetric encryption to protect sensitive data. """ import json import os from typing import Dict, Optional from cryptography.fernet import Fernet from pathlib import Path import logging logger = logging.getLogger(__name__) class CredentialsStorage: """ Secure storage for SharePoint OAuth credentials. Stores credentials encrypted on disk at ~/.sharepoint_credentials/credentials.enc Each user's credentials are stored separately and encrypted. """ def __init__(self, storage_dir: str = None): """ Initialize credentials storage. Args: storage_dir: Directory to store credentials (default: ~/.sharepoint_credentials) """ if storage_dir is None: storage_dir = os.path.expanduser("~/.sharepoint_credentials") self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(parents=True, exist_ok=True) self.credentials_file = self.storage_dir / "credentials.enc" self.key_file = self.storage_dir / "key.key" # Initialize encryption key self.cipher = self._get_or_create_cipher() # Load existing credentials self.credentials: Dict[str, Dict] = self._load_credentials() def _get_or_create_cipher(self) -> Fernet: """Get or create encryption key.""" if self.key_file.exists(): with open(self.key_file, 'rb') as f: key = f.read() else: key = Fernet.generate_key() with open(self.key_file, 'wb') as f: f.write(key) # Set restrictive permissions on key file os.chmod(self.key_file, 0o600) return Fernet(key) def _load_credentials(self) -> Dict[str, Dict]: """Load credentials from disk.""" if not self.credentials_file.exists(): return {} try: with open(self.credentials_file, 'rb') as f: encrypted_data = f.read() if not encrypted_data: return {} decrypted_data = self.cipher.decrypt(encrypted_data) return json.loads(decrypted_data.decode('utf-8')) except Exception as e: logger.error(f"Failed to load credentials: {e}") return {} def _save_credentials(self): """Save credentials to disk.""" try: data = json.dumps(self.credentials).encode('utf-8') encrypted_data = self.cipher.encrypt(data) with open(self.credentials_file, 'wb') as f: f.write(encrypted_data) # Set restrictive permissions on credentials file os.chmod(self.credentials_file, 0o600) logger.info("Credentials saved to disk") except Exception as e: logger.error(f"Failed to save credentials: {e}") raise def save_config(self, user_id: str, config: Dict) -> None: """ Save user configuration. Args: user_id: User identifier config: Configuration dictionary containing: - tenant_id: Azure tenant ID - client_id: Azure client ID - client_secret: Azure client secret """ self.credentials[user_id] = config self._save_credentials() logger.info(f"Saved credentials for user {user_id}") def get_config(self, user_id: str) -> Optional[Dict]: """ Get user configuration. Args: user_id: User identifier Returns: Configuration dictionary or None if not found """ return self.credentials.get(user_id) def delete_config(self, user_id: str) -> bool: """ Delete user configuration. Args: user_id: User identifier Returns: True if deleted, False if not found """ if user_id in self.credentials: del self.credentials[user_id] self._save_credentials() logger.info(f"Deleted credentials for user {user_id}") return True return False def list_users(self) -> list: """Get list of users with stored credentials.""" return list(self.credentials.keys()) # Global credentials storage instance _credentials_storage = None def get_credentials_storage() -> CredentialsStorage: """Get the global credentials storage instance.""" global _credentials_storage if _credentials_storage is None: _credentials_storage = CredentialsStorage() return _credentials_storage