Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
156 lines
4.6 KiB
Python
156 lines
4.6 KiB
Python
"""
|
|
Secure Credentials Storage
|
|
|
|
Encrypts and stores SharePoint OAuth credentials to disk for persistence across restarts.
|
|
Uses Fernet symmetric encryption to protect sensitive data.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from typing import Dict, Optional
|
|
from cryptography.fernet import Fernet
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CredentialsStorage:
    """
    Secure storage for SharePoint OAuth credentials.

    All users' configurations are held in one in-memory dictionary keyed by
    user ID. That dictionary is serialized to JSON, encrypted with Fernet and
    written as a single file, ``credentials.enc``, inside the storage
    directory (default: ``~/.sharepoint_credentials``). The Fernet key is
    kept next to it in ``key.key``.

    Both files are created with owner-only (0o600) permissions and are
    replaced atomically, so an interrupted write cannot leave a truncated
    file behind.
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """
        Initialize credentials storage.

        Creates the storage directory if needed, loads (or generates) the
        encryption key, and reads any previously saved credentials.

        Args:
            storage_dir: Directory to store credentials (default: ~/.sharepoint_credentials)
        """
        if storage_dir is None:
            storage_dir = os.path.expanduser("~/.sharepoint_credentials")

        self.storage_dir = Path(storage_dir)
        # A newly created directory is owner-only; an existing directory
        # keeps whatever permissions it already has.
        self.storage_dir.mkdir(mode=0o700, parents=True, exist_ok=True)

        self.credentials_file = self.storage_dir / "credentials.enc"
        self.key_file = self.storage_dir / "key.key"

        # Initialize encryption key
        self.cipher = self._get_or_create_cipher()

        # Load existing credentials
        self.credentials: Dict[str, Dict] = self._load_credentials()

    @staticmethod
    def _write_private(path, data: bytes) -> None:
        """
        Atomically write ``data`` to ``path`` as an owner-only (0o600) file.

        The bytes go to a sibling ``<name>.tmp`` file that is created with
        restrictive permissions, and that file is then renamed over the
        target. The secret is therefore never on disk with default
        permissions, and the target is either the old or the new content.

        Args:
            path: Destination file (str or Path).
            data: Raw bytes to write.

        Raises:
            OSError: If the file cannot be created, written or renamed.
        """
        target = Path(path)
        tmp_path = target.with_name(target.name + ".tmp")
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        # The mode is applied at creation time, before any byte is written.
        fd = os.open(tmp_path, flags, 0o600)
        try:
            with os.fdopen(fd, "wb") as f:
                # A stale temp file left by an interrupted run keeps its old
                # mode, so tighten it before writing the secret.
                os.chmod(tmp_path, 0o600)
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, target)
        except BaseException:
            # Do not leave a partially written temp file behind.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    def _get_or_create_cipher(self) -> "Fernet":
        """
        Get or create encryption key.

        Reads the key from ``key_file`` when it exists; otherwise generates a
        new Fernet key and persists it with owner-only permissions.

        Returns:
            A Fernet cipher built from the key.
        """
        if self.key_file.exists():
            key = self.key_file.read_bytes()
        else:
            key = Fernet.generate_key()
            self._write_private(self.key_file, key)

        return Fernet(key)

    def _load_credentials(self) -> Dict[str, Dict]:
        """
        Load credentials from disk.

        Loading is best-effort: a missing or empty file, a read or decryption
        failure, or a payload that is not a JSON object all yield an empty
        dictionary (failures are logged). Note that after a failed load the
        next save overwrites the file on disk.

        Returns:
            Mapping of user ID to configuration dictionary.
        """
        if not self.credentials_file.exists():
            return {}

        try:
            encrypted_data = self.credentials_file.read_bytes()

            if not encrypted_data:
                return {}

            decrypted_data = self.cipher.decrypt(encrypted_data)
            loaded = json.loads(decrypted_data.decode('utf-8'))
        except Exception as e:
            logger.error("Failed to load credentials: %s", e)
            return {}

        # Every other method treats self.credentials as a dict; reject any
        # other JSON type instead of failing later on .get() / item access.
        if not isinstance(loaded, dict):
            logger.error(
                "Failed to load credentials: expected a JSON object, got %s",
                type(loaded).__name__,
            )
            return {}

        return loaded

    def _save_credentials(self):
        """
        Save credentials to disk.

        Serializes the in-memory dictionary to JSON, encrypts it and replaces
        ``credentials_file`` atomically with owner-only permissions.

        Raises:
            Exception: Any serialization, encryption or I/O error is logged
                and re-raised.
        """
        try:
            data = json.dumps(self.credentials).encode('utf-8')
            encrypted_data = self.cipher.encrypt(data)

            self._write_private(self.credentials_file, encrypted_data)

            logger.info("Credentials saved to disk")
        except Exception as e:
            logger.error("Failed to save credentials: %s", e)
            raise

    def save_config(self, user_id: str, config: Dict) -> None:
        """
        Save user configuration.

        Stores ``config`` under ``user_id`` (replacing any existing entry)
        and persists the whole store to disk.

        Args:
            user_id: User identifier
            config: Configuration dictionary containing:
                - tenant_id: Azure tenant ID
                - client_id: Azure client ID
                - client_secret: Azure client secret
        """
        self.credentials[user_id] = config
        self._save_credentials()
        logger.info("Saved credentials for user %s", user_id)

    def get_config(self, user_id: str) -> Optional[Dict]:
        """
        Get user configuration.

        Args:
            user_id: User identifier

        Returns:
            Configuration dictionary or None if not found
        """
        return self.credentials.get(user_id)

    def delete_config(self, user_id: str) -> bool:
        """
        Delete user configuration.

        Removes the entry and persists the store when the user exists.

        Args:
            user_id: User identifier

        Returns:
            True if deleted, False if not found
        """
        if user_id in self.credentials:
            del self.credentials[user_id]
            self._save_credentials()
            logger.info("Deleted credentials for user %s", user_id)
            return True
        return False

    def list_users(self) -> list:
        """Get list of users with stored credentials."""
        return list(self.credentials)
|
|
|
|
|
|
# Process-wide CredentialsStorage instance, created lazily on first access.
_credentials_storage = None


def get_credentials_storage() -> CredentialsStorage:
    """
    Return the shared CredentialsStorage instance.

    The instance is built with default arguments on the first call and the
    same object is handed back on every later call.
    """
    global _credentials_storage

    storage = _credentials_storage
    if storage is None:
        storage = CredentialsStorage()
        _credentials_storage = storage
    return storage
|