Initial commit: SharePoint connector and ToothFairyAI integration
Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
Some checks failed
CI - SharePoint Plugin with SonarQube / Test and SonarQube Analysis (push) Has been cancelled
This commit is contained in:
155
storage/credentials_storage.py
Normal file
155
storage/credentials_storage.py
Normal file
@@ -0,0 +1,155 @@
|
||||
"""
|
||||
Secure Credentials Storage
|
||||
|
||||
Encrypts and stores SharePoint OAuth credentials to disk for persistence across restarts.
|
||||
Uses Fernet symmetric encryption to protect sensitive data.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, Optional
|
||||
from cryptography.fernet import Fernet
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CredentialsStorage:
|
||||
"""
|
||||
Secure storage for SharePoint OAuth credentials.
|
||||
|
||||
Stores credentials encrypted on disk at ~/.sharepoint_credentials/credentials.enc
|
||||
Each user's credentials are stored separately and encrypted.
|
||||
"""
|
||||
|
||||
def __init__(self, storage_dir: str = None):
|
||||
"""
|
||||
Initialize credentials storage.
|
||||
|
||||
Args:
|
||||
storage_dir: Directory to store credentials (default: ~/.sharepoint_credentials)
|
||||
"""
|
||||
if storage_dir is None:
|
||||
storage_dir = os.path.expanduser("~/.sharepoint_credentials")
|
||||
|
||||
self.storage_dir = Path(storage_dir)
|
||||
self.storage_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.credentials_file = self.storage_dir / "credentials.enc"
|
||||
self.key_file = self.storage_dir / "key.key"
|
||||
|
||||
# Initialize encryption key
|
||||
self.cipher = self._get_or_create_cipher()
|
||||
|
||||
# Load existing credentials
|
||||
self.credentials: Dict[str, Dict] = self._load_credentials()
|
||||
|
||||
def _get_or_create_cipher(self) -> Fernet:
|
||||
"""Get or create encryption key."""
|
||||
if self.key_file.exists():
|
||||
with open(self.key_file, 'rb') as f:
|
||||
key = f.read()
|
||||
else:
|
||||
key = Fernet.generate_key()
|
||||
with open(self.key_file, 'wb') as f:
|
||||
f.write(key)
|
||||
# Set restrictive permissions on key file
|
||||
os.chmod(self.key_file, 0o600)
|
||||
|
||||
return Fernet(key)
|
||||
|
||||
def _load_credentials(self) -> Dict[str, Dict]:
|
||||
"""Load credentials from disk."""
|
||||
if not self.credentials_file.exists():
|
||||
return {}
|
||||
|
||||
try:
|
||||
with open(self.credentials_file, 'rb') as f:
|
||||
encrypted_data = f.read()
|
||||
|
||||
if not encrypted_data:
|
||||
return {}
|
||||
|
||||
decrypted_data = self.cipher.decrypt(encrypted_data)
|
||||
return json.loads(decrypted_data.decode('utf-8'))
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load credentials: {e}")
|
||||
return {}
|
||||
|
||||
def _save_credentials(self):
|
||||
"""Save credentials to disk."""
|
||||
try:
|
||||
data = json.dumps(self.credentials).encode('utf-8')
|
||||
encrypted_data = self.cipher.encrypt(data)
|
||||
|
||||
with open(self.credentials_file, 'wb') as f:
|
||||
f.write(encrypted_data)
|
||||
|
||||
# Set restrictive permissions on credentials file
|
||||
os.chmod(self.credentials_file, 0o600)
|
||||
|
||||
logger.info("Credentials saved to disk")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save credentials: {e}")
|
||||
raise
|
||||
|
||||
def save_config(self, user_id: str, config: Dict) -> None:
|
||||
"""
|
||||
Save user configuration.
|
||||
|
||||
Args:
|
||||
user_id: User identifier
|
||||
config: Configuration dictionary containing:
|
||||
- tenant_id: Azure tenant ID
|
||||
- client_id: Azure client ID
|
||||
- client_secret: Azure client secret
|
||||
"""
|
||||
self.credentials[user_id] = config
|
||||
self._save_credentials()
|
||||
logger.info(f"Saved credentials for user {user_id}")
|
||||
|
||||
def get_config(self, user_id: str) -> Optional[Dict]:
|
||||
"""
|
||||
Get user configuration.
|
||||
|
||||
Args:
|
||||
user_id: User identifier
|
||||
|
||||
Returns:
|
||||
Configuration dictionary or None if not found
|
||||
"""
|
||||
return self.credentials.get(user_id)
|
||||
|
||||
def delete_config(self, user_id: str) -> bool:
|
||||
"""
|
||||
Delete user configuration.
|
||||
|
||||
Args:
|
||||
user_id: User identifier
|
||||
|
||||
Returns:
|
||||
True if deleted, False if not found
|
||||
"""
|
||||
if user_id in self.credentials:
|
||||
del self.credentials[user_id]
|
||||
self._save_credentials()
|
||||
logger.info(f"Deleted credentials for user {user_id}")
|
||||
return True
|
||||
return False
|
||||
|
||||
def list_users(self) -> list:
|
||||
"""Get list of users with stored credentials."""
|
||||
return list(self.credentials.keys())
|
||||
|
||||
|
||||
# Global credentials storage instance
|
||||
_credentials_storage = None
|
||||
|
||||
|
||||
def get_credentials_storage() -> CredentialsStorage:
|
||||
"""Get the global credentials storage instance."""
|
||||
global _credentials_storage
|
||||
if _credentials_storage is None:
|
||||
_credentials_storage = CredentialsStorage()
|
||||
return _credentials_storage
|
||||
Reference in New Issue
Block a user