""" Enterprise SaaS SharePoint Connector - DynamoDB Edition Secure multi-tenant SharePoint connector using AWS DynamoDB for storage. Implements OAuth 2.0 with encrypted token storage, DynamoDB persistence, and enterprise security best practices. """ import os import secrets import hashlib from typing import Optional, Dict, Any, List from datetime import datetime, timedelta from dataclasses import dataclass import json from decimal import Decimal import requests from msal import ConfidentialClientApplication from cryptography.fernet import Fernet import boto3 from boto3.dynamodb.conditions import Key, Attr from botocore.exceptions import ClientError from flask import Flask, request, redirect, session, jsonify, url_for from functools import wraps # ============================================================================ # DYNAMODB HELPER FUNCTIONS # ============================================================================ def decimal_to_float(obj): """Convert DynamoDB Decimal types to float/int for JSON serialization.""" if isinstance(obj, list): return [decimal_to_float(i) for i in obj] elif isinstance(obj, dict): return {k: decimal_to_float(v) for k, v in obj.items()} elif isinstance(obj, Decimal): return int(obj) if obj % 1 == 0 else float(obj) return obj def datetime_to_iso(dt: datetime) -> str: """Convert datetime to ISO 8601 string.""" return dt.isoformat() if dt else None def iso_to_datetime(iso_string: str) -> Optional[datetime]: """Convert ISO 8601 string to datetime.""" try: return datetime.fromisoformat(iso_string) if iso_string else None except (ValueError, AttributeError): return None # ============================================================================ # ENCRYPTION SERVICE # ============================================================================ class TokenEncryption: """ Handles encryption/decryption of OAuth tokens. SECURITY: Never store tokens in plain text! 
""" def __init__(self, encryption_key: Optional[str] = None): """ Initialize encryption service. Args: encryption_key: Base64-encoded Fernet key. If not provided, uses ENCRYPTION_KEY env var. """ if encryption_key: self.key = encryption_key.encode() else: key_str = os.getenv('ENCRYPTION_KEY') if not key_str: raise ValueError("ENCRYPTION_KEY environment variable must be set") self.key = key_str.encode() self.cipher = Fernet(self.key) @staticmethod def generate_key() -> str: """Generate a new encryption key. Store this securely!""" return Fernet.generate_key().decode() def encrypt(self, plaintext: str) -> str: """Encrypt a string.""" if not plaintext: return "" return self.cipher.encrypt(plaintext.encode()).decode() def decrypt(self, ciphertext: str) -> str: """Decrypt a string.""" if not ciphertext: return "" return self.cipher.decrypt(ciphertext.encode()).decode() # ============================================================================ # DYNAMODB DATA MODELS # ============================================================================ @dataclass class SharePointConnectionInfo: """Connection information for a SharePoint connection.""" id: str user_id: str organization_id: Optional[str] connection_name: Optional[str] tenant_id: str microsoft_user_id: str is_active: bool created_at: datetime last_used_at: Optional[datetime] # ============================================================================ # DYNAMODB CONNECTOR # ============================================================================ class DynamoDBSharePointConnector: """ Enterprise-grade SharePoint connector using DynamoDB for storage. DynamoDB Tables: 1. sharepoint_connections - Stores encrypted OAuth tokens 2. sharepoint_oauth_states - Temporary OAuth state for CSRF protection 3. 
# ============================================================================
# DYNAMODB CONNECTOR
# ============================================================================

class DynamoDBSharePointConnector:
    """
    Enterprise-grade SharePoint connector using DynamoDB for storage.

    DynamoDB Tables:
    1. sharepoint_connections - Stores encrypted OAuth tokens
    2. sharepoint_oauth_states - Temporary OAuth state for CSRF protection
    3. sharepoint_audit_logs - Audit trail for compliance

    Features:
    - Multi-tenant OAuth 2.0
    - Encrypted token storage
    - Automatic token refresh
    - Audit logging
    - CSRF protection
    - Serverless-friendly (no connection pooling)

    Timestamps are written as naive UTC ISO 8601 strings
    (``datetime.utcnow().isoformat()``); all comparisons in this class use
    the same naive-UTC convention.
    """

    # Upper bound (seconds) for outbound Microsoft Graph calls, so a stalled
    # connection cannot hang the calling request indefinitely.
    GRAPH_TIMEOUT_SECONDS = 30

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        encryption_key: str,
        aws_region: str = "us-east-1",
        dynamodb_endpoint: Optional[str] = None,  # For local development
        table_prefix: str = "",
        tenant_id: str = "common"
    ):
        """
        Initialize secure DynamoDB connector.

        Creates any missing DynamoDB tables as a side effect.

        Args:
            client_id: Azure AD application ID
            client_secret: Azure AD client secret
            redirect_uri: OAuth callback URL (must be HTTPS in production!)
            encryption_key: Fernet encryption key for token storage
            aws_region: AWS region for DynamoDB
            dynamodb_endpoint: Custom endpoint (e.g., http://localhost:8000 for local)
            table_prefix: Prefix for table names (e.g., "prod_" or "dev_")
            tenant_id: Azure AD tenant or "common" for multi-tenant
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.tenant_id = tenant_id
        self.table_prefix = table_prefix

        # Required scopes for SharePoint access
        # Note: offline_access is handled automatically by MSAL and should NOT be included
        self.scopes = [
            "https://graph.microsoft.com/Sites.Read.All",
            "https://graph.microsoft.com/Files.Read.All",
            "https://graph.microsoft.com/User.Read"
        ]

        # Initialize encryption
        self.encryption = TokenEncryption(encryption_key)

        # Initialize DynamoDB; a custom endpoint is only passed for local development
        resource_kwargs = {'region_name': aws_region}
        if dynamodb_endpoint:
            resource_kwargs['endpoint_url'] = dynamodb_endpoint
        self.dynamodb = boto3.resource('dynamodb', **resource_kwargs)

        # Table names
        self.connections_table_name = f"{table_prefix}sharepoint_connections"
        self.oauth_states_table_name = f"{table_prefix}sharepoint_oauth_states"
        self.audit_logs_table_name = f"{table_prefix}sharepoint_audit_logs"

        # Initialize tables
        self._ensure_tables_exist()

        # Initialize MSAL
        self.authority = f"https://login.microsoftonline.com/{tenant_id}"
        self.msal_app = ConfidentialClientApplication(
            client_id=self.client_id,
            client_credential=self.client_secret,
            authority=self.authority
        )

    def _ensure_tables_exist(self):
        """Create DynamoDB tables if they don't exist."""
        self.connections_table = self._load_or_create_table(
            self.connections_table_name, self._create_connections_table
        )
        self.oauth_states_table = self._load_or_create_table(
            self.oauth_states_table_name, self._create_oauth_states_table
        )
        self.audit_logs_table = self._load_or_create_table(
            self.audit_logs_table_name, self._create_audit_logs_table
        )

    def _load_or_create_table(self, table_name: str, create_table):
        """Return the named table, creating it only when it does not exist.

        Any ClientError other than ResourceNotFoundException (for example
        missing credentials, access denied, throttling) is re-raised instead
        of being misread as "table missing".
        """
        table = self.dynamodb.Table(table_name)
        try:
            table.load()
        except ClientError as exc:
            error_code = exc.response.get('Error', {}).get('Code')
            if error_code != 'ResourceNotFoundException':
                raise
            return create_table()
        return table

    @staticmethod
    def _on_demand_index(index_name: str, key_schema: List[Dict[str, str]]) -> Dict[str, Any]:
        """Build a GSI definition for a PAY_PER_REQUEST table.

        No ProvisionedThroughput is included: the tables are created with
        BillingMode='PAY_PER_REQUEST', and capacity units must not be
        specified for indexes of an on-demand table.
        """
        return {
            'IndexName': index_name,
            'KeySchema': key_schema,
            'Projection': {'ProjectionType': 'ALL'}
        }

    def _create_connections_table(self):
        """Create SharePoint connections table."""
        table = self.dynamodb.create_table(
            TableName=self.connections_table_name,
            KeySchema=[
                {'AttributeName': 'id', 'KeyType': 'HASH'}  # Partition key
            ],
            AttributeDefinitions=[
                {'AttributeName': 'id', 'AttributeType': 'S'},
                {'AttributeName': 'user_id', 'AttributeType': 'S'},
                {'AttributeName': 'organization_id', 'AttributeType': 'S'}
            ],
            GlobalSecondaryIndexes=[
                self._on_demand_index(
                    'user_id-index',
                    [{'AttributeName': 'user_id', 'KeyType': 'HASH'}]
                ),
                self._on_demand_index(
                    'organization_id-index',
                    [{'AttributeName': 'organization_id', 'KeyType': 'HASH'}]
                )
            ],
            BillingMode='PAY_PER_REQUEST'  # On-demand pricing
        )
        table.wait_until_exists()
        return table

    def _create_oauth_states_table(self):
        """Create OAuth states table with TTL for automatic cleanup."""
        table = self.dynamodb.create_table(
            TableName=self.oauth_states_table_name,
            KeySchema=[
                {'AttributeName': 'state', 'KeyType': 'HASH'}
            ],
            AttributeDefinitions=[
                {'AttributeName': 'state', 'AttributeType': 'S'}
            ],
            BillingMode='PAY_PER_REQUEST'
        )
        table.wait_until_exists()

        # Enable TTL for automatic cleanup. This stays best-effort (expiry is
        # also checked explicitly in complete_connection), but a failure is
        # logged rather than silently discarded.
        try:
            self.dynamodb.meta.client.update_time_to_live(
                TableName=self.oauth_states_table_name,
                TimeToLiveSpecification={
                    'Enabled': True,
                    'AttributeName': 'ttl'
                }
            )
        except ClientError as exc:
            import logging
            logging.getLogger(__name__).warning(
                "Could not enable TTL on %s: %s", self.oauth_states_table_name, exc
            )

        return table

    def _create_audit_logs_table(self):
        """Create audit logs table."""
        table = self.dynamodb.create_table(
            TableName=self.audit_logs_table_name,
            KeySchema=[
                {'AttributeName': 'connection_id', 'KeyType': 'HASH'},
                {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}  # Sort key
            ],
            AttributeDefinitions=[
                {'AttributeName': 'connection_id', 'AttributeType': 'S'},
                {'AttributeName': 'timestamp', 'AttributeType': 'S'},
                {'AttributeName': 'user_id', 'AttributeType': 'S'}
            ],
            GlobalSecondaryIndexes=[
                self._on_demand_index(
                    'user_id-timestamp-index',
                    [
                        {'AttributeName': 'user_id', 'KeyType': 'HASH'},
                        {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}
                    ]
                )
            ],
            BillingMode='PAY_PER_REQUEST'
        )
        table.wait_until_exists()
        return table

    def initiate_connection(
        self,
        user_id: str,
        organization_id: Optional[str] = None,
        return_url: Optional[str] = None
    ) -> str:
        """
        Initiate OAuth flow for a user to connect their SharePoint.

        Args:
            user_id: Your SaaS user ID
            organization_id: Your SaaS organization ID (if applicable)
            return_url: URL to redirect to after successful connection

        Returns:
            Authorization URL to redirect user to
        """
        # Generate secure state token
        state = secrets.token_urlsafe(32)

        # Store state in DynamoDB for CSRF protection
        expires_at = datetime.utcnow() + timedelta(minutes=10)
        ttl = int(expires_at.timestamp())  # Unix timestamp for DynamoDB TTL

        self.oauth_states_table.put_item(
            Item={
                'state': state,
                'user_id': user_id,
                'organization_id': organization_id or '',
                'return_url': return_url or '',
                'expires_at': datetime_to_iso(expires_at),
                'ttl': ttl,  # DynamoDB will auto-delete after this time
                'used': False
            }
        )

        # Generate authorization URL
        return self.msal_app.get_authorization_request_url(
            scopes=self.scopes,
            state=state,
            redirect_uri=self.redirect_uri
        )

    def complete_connection(
        self,
        auth_code: str,
        state: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> "SharePointConnectionInfo":
        """
        Complete OAuth flow and store connection.

        Args:
            auth_code: Authorization code from OAuth callback
            state: State parameter from OAuth callback
            ip_address: User's IP address (for audit log)
            user_agent: User's user agent (for audit log)

        Returns:
            Connection information

        Raises:
            ValueError: If state is invalid, expired or already used
            Exception: If token acquisition fails
        """
        # Validate state (CSRF protection)
        try:
            oauth_state = self.oauth_states_table.get_item(Key={'state': state}).get('Item')
        except ClientError:
            oauth_state = None

        if not oauth_state:
            raise ValueError("Invalid OAuth state")

        # A missing or unparseable expiry is treated as expired rather than
        # letting the comparison below fail with a TypeError.
        state_expires_at = iso_to_datetime(oauth_state.get('expires_at'))
        if (
            oauth_state.get('used')
            or state_expires_at is None
            or datetime.utcnow() > state_expires_at
        ):
            raise ValueError("OAuth state expired or already used")

        # Claim the state atomically. The condition makes the write succeed
        # for exactly one caller, so two concurrent callbacks carrying the
        # same state cannot both pass the read-then-check above.
        try:
            self.oauth_states_table.update_item(
                Key={'state': state},
                UpdateExpression='SET #used = :used',
                ConditionExpression='attribute_exists(#state) AND #used = :unused',
                ExpressionAttributeNames={'#state': 'state', '#used': 'used'},
                ExpressionAttributeValues={':used': True, ':unused': False}
            )
        except ClientError as exc:
            if exc.response.get('Error', {}).get('Code') == 'ConditionalCheckFailedException':
                raise ValueError("OAuth state expired or already used") from exc
            raise

        # Exchange code for tokens
        token_response = self.msal_app.acquire_token_by_authorization_code(
            code=auth_code,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

        if "error" in token_response:
            raise Exception(
                f"Token acquisition failed: {token_response.get('error_description', token_response['error'])}"
            )

        # Get user info from Microsoft
        user_info = self._get_user_info(token_response["access_token"])

        # Calculate token expiry
        expires_in = token_response.get("expires_in", 3600)
        token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in)

        # Encrypt tokens
        encrypted_access_token = self.encryption.encrypt(token_response["access_token"])
        encrypted_refresh_token = self.encryption.encrypt(token_response.get("refresh_token", ""))

        # The /me request in _get_user_info does not select a tenant id, so
        # prefer the "tid" claim of the ID token when MSAL returned one.
        id_token_claims = token_response.get("id_token_claims") or {}
        tenant_id = id_token_claims.get("tid") or user_info.get('tenantId', self.tenant_id)

        # Create connection record
        user_id = oauth_state['user_id']
        organization_id = oauth_state.get('organization_id') or None
        connection_id = self._generate_connection_id(user_id, user_info["id"])
        now = datetime.utcnow()

        connection_item = {
            'id': connection_id,
            'user_id': user_id,
            'tenant_id': tenant_id,
            'microsoft_user_id': user_info["id"],
            'connection_name': f"{user_info.get('displayName', 'SharePoint')} - {user_info.get('userPrincipalName', '')}",
            'encrypted_access_token': encrypted_access_token,
            'encrypted_refresh_token': encrypted_refresh_token,
            'token_expires_at': datetime_to_iso(token_expires_at),
            'scopes': json.dumps(self.scopes),
            'is_active': True,
            'created_at': datetime_to_iso(now),
            'updated_at': datetime_to_iso(now),
            'last_used_at': datetime_to_iso(now)
        }
        # organization_id is the key of organization_id-index, and DynamoDB
        # rejects an empty string for a key attribute. Leave the attribute
        # out when there is no organization so the item is simply absent
        # from that index.
        if organization_id:
            connection_item['organization_id'] = organization_id

        # Store connection
        self.connections_table.put_item(Item=connection_item)

        # Audit log
        self._log_activity(
            connection_id=connection_id,
            user_id=user_id,
            action="connection_created",
            status="success",
            ip_address=ip_address,
            user_agent=user_agent
        )

        return SharePointConnectionInfo(
            id=connection_id,
            user_id=user_id,
            organization_id=organization_id,
            connection_name=connection_item['connection_name'],
            tenant_id=connection_item['tenant_id'],
            microsoft_user_id=user_info["id"],
            is_active=True,
            created_at=now,
            last_used_at=now
        )

    def get_valid_token(
        self,
        connection_id: str,
        user_id: str
    ) -> str:
        """
        Get a valid access token, refreshing if necessary.

        Args:
            connection_id: SharePoint connection ID
            user_id: Your SaaS user ID (for authorization check)

        Returns:
            Valid access token

        Raises:
            ValueError: If connection not found or doesn't belong to user
            Exception: If token refresh fails
        """
        # Get connection (with authorization check!)
        try:
            connection = self.connections_table.get_item(Key={'id': connection_id}).get('Item')
        except ClientError:
            connection = None

        if not connection or connection.get('user_id') != user_id or not connection.get('is_active'):
            raise ValueError("Connection not found or access denied")

        # Refresh when the token expires within five minutes. A missing or
        # unparseable expiry counts as expired so a refresh is attempted.
        token_expires_at = iso_to_datetime(connection.get('token_expires_at'))
        needs_refresh = (
            token_expires_at is None
            or datetime.utcnow() >= token_expires_at - timedelta(minutes=5)
        )

        if not needs_refresh:
            # Update last used timestamp
            self.connections_table.update_item(
                Key={'id': connection_id},
                UpdateExpression='SET last_used_at = :val',
                ExpressionAttributeValues={':val': datetime_to_iso(datetime.utcnow())}
            )
            return self.encryption.decrypt(connection['encrypted_access_token'])

        refresh_token = self.encryption.decrypt(connection.get('encrypted_refresh_token', ''))
        if not refresh_token:
            # complete_connection stores "" when Microsoft returned no
            # refresh token; fail clearly instead of sending an empty token.
            self._log_activity(
                connection_id=connection_id,
                user_id=user_id,
                action="token_refresh",
                status="failure",
                details=json.dumps({"error": "missing_refresh_token"})
            )
            raise Exception("Token refresh failed: no refresh token stored; the user must reconnect")

        token_response = self.msal_app.acquire_token_by_refresh_token(
            refresh_token=refresh_token,
            scopes=self.scopes
        )

        if "error" in token_response:
            self._log_activity(
                connection_id=connection_id,
                user_id=user_id,
                action="token_refresh",
                status="failure",
                details=json.dumps({"error": token_response.get("error")})
            )
            raise Exception(
                f"Token refresh failed: {token_response.get('error_description', token_response['error'])}"
            )

        # Update stored tokens
        new_access_token = self.encryption.encrypt(token_response["access_token"])
        new_refresh_token = token_response.get("refresh_token")
        expires_in = token_response.get("expires_in", 3600)
        now = datetime.utcnow()
        new_expires_at = now + timedelta(seconds=expires_in)

        update_expression = (
            "SET encrypted_access_token = :access, token_expires_at = :expires, "
            "updated_at = :updated, last_used_at = :used"
        )
        expression_values = {
            ':access': new_access_token,
            ':expires': datetime_to_iso(new_expires_at),
            ':updated': datetime_to_iso(now),
            ':used': datetime_to_iso(now)
        }

        # The refresh token is only rewritten when Microsoft rotated it.
        if new_refresh_token:
            update_expression += ", encrypted_refresh_token = :refresh"
            expression_values[':refresh'] = self.encryption.encrypt(new_refresh_token)

        self.connections_table.update_item(
            Key={'id': connection_id},
            UpdateExpression=update_expression,
            ExpressionAttributeValues=expression_values
        )

        self._log_activity(
            connection_id=connection_id,
            user_id=user_id,
            action="token_refresh",
            status="success"
        )

        return token_response["access_token"]

    def list_connections(
        self,
        user_id: str,
        organization_id: Optional[str] = None
    ) -> List["SharePointConnectionInfo"]:
        """
        List all active SharePoint connections for a user.

        Args:
            user_id: Your SaaS user ID
            organization_id: Your SaaS organization ID (optional); when
                given, only connections stored with that ID are returned

        Returns:
            List of connections
        """
        # Query by user_id using GSI
        query_kwargs = {
            'IndexName': 'user_id-index',
            'KeyConditionExpression': Key('user_id').eq(user_id),
            'FilterExpression': Attr('is_active').eq(True)
        }

        # A single Query call returns at most one page, and the filter is
        # applied after the page is read, so follow LastEvaluatedKey until
        # the result set is exhausted.
        items = []
        while True:
            response = self.connections_table.query(**query_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        # Filter by organization if specified
        if organization_id:
            items = [item for item in items if item.get('organization_id') == organization_id]

        return [
            SharePointConnectionInfo(
                id=item['id'],
                user_id=item['user_id'],
                organization_id=item.get('organization_id') or None,
                connection_name=item.get('connection_name'),
                tenant_id=item.get('tenant_id'),
                microsoft_user_id=item.get('microsoft_user_id'),
                is_active=item.get('is_active', False),
                created_at=iso_to_datetime(item.get('created_at')),
                last_used_at=iso_to_datetime(item.get('last_used_at'))
            )
            for item in items
        ]

    def disconnect(
        self,
        connection_id: str,
        user_id: str,
        ip_address: Optional[str] = None
    ):
        """
        Disconnect (deactivate) a SharePoint connection.

        The record is kept and flagged inactive; stored tokens are not
        removed by this method.

        Args:
            connection_id: SharePoint connection ID
            user_id: Your SaaS user ID (for authorization)
            ip_address: User's IP for audit log

        Raises:
            ValueError: If connection not found or doesn't belong to user
        """
        # Get connection to verify ownership
        try:
            connection = self.connections_table.get_item(Key={'id': connection_id}).get('Item')
        except ClientError:
            connection = None

        if not connection or connection.get('user_id') != user_id:
            raise ValueError("Connection not found or access denied")

        # Deactivate connection
        self.connections_table.update_item(
            Key={'id': connection_id},
            UpdateExpression='SET is_active = :val, updated_at = :updated',
            ExpressionAttributeValues={
                ':val': False,
                ':updated': datetime_to_iso(datetime.utcnow())
            }
        )

        self._log_activity(
            connection_id=connection_id,
            user_id=user_id,
            action="connection_disconnected",
            status="success",
            ip_address=ip_address
        )

    def _get_user_info(self, access_token: str) -> Dict[str, Any]:
        """Get Microsoft user information.

        Raises:
            requests.HTTPError: If Microsoft Graph answers with an error status.
        """
        response = requests.get(
            "https://graph.microsoft.com/v1.0/me?$select=id,displayName,userPrincipalName,mail",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=self.GRAPH_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return response.json()

    def _generate_connection_id(self, user_id: str, microsoft_user_id: str) -> str:
        """Generate deterministic connection ID.

        The same (user_id, microsoft_user_id) pair always maps to the same
        ID, so reconnecting the same Microsoft account overwrites the
        existing record instead of adding a second one.
        """
        combined = f"{user_id}:{microsoft_user_id}"
        return hashlib.sha256(combined.encode()).hexdigest()[:32]

    def _log_activity(
        self,
        connection_id: str,
        user_id: str,
        action: str,
        status: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
        details: Optional[str] = None
    ):
        """Create audit log entry."""
        self.audit_logs_table.put_item(
            Item={
                'connection_id': connection_id,
                'timestamp': datetime_to_iso(datetime.utcnow()),
                'user_id': user_id,
                'action': action,
                'status': status,
                'ip_address': ip_address or '',
                'user_agent': user_agent or '',
                'details': details or ''
            }
        )
# ============================================================================
# SHAREPOINT API CLIENT
# ============================================================================

class SecureSharePointClient:
    """
    SharePoint API client with automatic token management.

    Use this to make SharePoint API calls on behalf of connected users.
    Every call fetches a valid token through the connector, which also
    performs the ownership check for (connection_id, user_id).
    """

    GRAPH_BASE_URL = "https://graph.microsoft.com/v1.0"

    # Upper bound (seconds) for each Microsoft Graph request, so a stalled
    # connection cannot hang the caller indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, connector: "DynamoDBSharePointConnector", connection_id: str, user_id: str):
        """
        Initialize client.

        Args:
            connector: DynamoDBSharePointConnector instance
            connection_id: SharePoint connection ID
            user_id: Your SaaS user ID
        """
        self.connector = connector
        self.connection_id = connection_id
        self.user_id = user_id

    def _get_headers(self) -> Dict[str, str]:
        """Get headers with valid access token."""
        access_token = self.connector.get_valid_token(self.connection_id, self.user_id)
        return {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json"
        }

    def _get(self, url: str):
        """Issue an authenticated GET and raise requests.HTTPError on an error status."""
        response = requests.get(
            url,
            headers=self._get_headers(),
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return response

    @staticmethod
    def _search_url(site_id: str, query: str) -> str:
        """Build the drive search URL for a user-supplied query.

        The query sits inside a single-quoted literal, so an embedded single
        quote is doubled and the result is percent-encoded. Without this, a
        query containing a quote, slash or parenthesis ends the literal
        early or changes the request path.
        """
        from urllib.parse import quote

        literal = quote(query.replace("'", "''"), safe="")
        return f"{SecureSharePointClient.GRAPH_BASE_URL}/sites/{site_id}/drive/root/search(q='{literal}')"

    def list_sites(self) -> List[Dict[str, Any]]:
        """List SharePoint sites the user has access to.

        Only the first page of results is returned.
        """
        response = self._get(f"{self.GRAPH_BASE_URL}/sites?search=*")
        return response.json().get("value", [])

    def read_file(self, site_id: str, file_path: str, as_text: bool = True) -> Any:
        """Read file content.

        Args:
            site_id: Microsoft Graph site ID
            file_path: Path of the file relative to the drive root
            as_text: Return decoded text when True, raw bytes otherwise
        """
        from urllib.parse import quote

        # quote() keeps "/" by default, so path separators are preserved.
        encoded_path = quote(file_path)
        url = f"{self.GRAPH_BASE_URL}/sites/{site_id}/drive/root:/{encoded_path}:/content"
        response = self._get(url)
        return response.text if as_text else response.content

    def list_files(self, site_id: str, folder_path: str = "") -> List[Dict[str, Any]]:
        """List files in a folder; an empty folder_path lists the drive root.

        Only the first page of results is returned.
        """
        from urllib.parse import quote

        if folder_path:
            encoded_path = quote(folder_path)
            url = f"{self.GRAPH_BASE_URL}/sites/{site_id}/drive/root:/{encoded_path}:/children"
        else:
            url = f"{self.GRAPH_BASE_URL}/sites/{site_id}/drive/root/children"

        return self._get(url).json().get("value", [])

    def search_files(self, site_id: str, query: str) -> List[Dict[str, Any]]:
        """Search for files.

        Only the first page of results is returned.
        """
        return self._get(self._search_url(site_id, query)).json().get("value", [])
# ============================================================================
# FLASK INTEGRATION EXAMPLE
# ============================================================================

def create_app(connector: DynamoDBSharePointConnector) -> Flask:
    """Create Flask app with DynamoDB SharePoint connector integration.

    Args:
        connector: Initialized connector used by every route.

    Returns:
        Configured Flask application.
    """
    import logging

    logger = logging.getLogger(__name__)

    app = Flask(__name__)

    secret_key = os.getenv("FLASK_SECRET_KEY")
    if not secret_key:
        # A random key differs per process, so sessions do not survive a
        # restart and are not shared between workers.
        logger.warning("FLASK_SECRET_KEY is not set; using a random per-process secret key")
        secret_key = secrets.token_urlsafe(32)
    app.secret_key = secret_key

    def require_auth(f):
        """Decorator to require authentication."""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if "user_id" not in session:
                return jsonify({"error": "Authentication required"}), 401
            return f(*args, **kwargs)
        return decorated_function

    @app.route("/sharepoint/connect")
    @require_auth
    def connect_sharepoint():
        """Initiate SharePoint connection."""
        auth_url = connector.initiate_connection(
            user_id=session["user_id"],
            organization_id=session.get("organization_id"),
            return_url=request.args.get("return_url", "/dashboard")
        )
        return redirect(auth_url)

    @app.route("/sharepoint/callback")
    @require_auth
    def sharepoint_callback():
        """OAuth callback endpoint.

        Requires the session of the user who started the flow. The stored
        state carries that user's ID, so a callback completed in another
        user's browser is detected below and the connection is rolled back.
        """
        if "error" in request.args:
            return jsonify({
                "error": request.args.get("error_description", request.args["error"])
            }), 400

        auth_code = request.args.get("code")
        state = request.args.get("state")

        if not auth_code or not state:
            return jsonify({"error": "Invalid callback"}), 400

        try:
            connection_info = connector.complete_connection(
                auth_code=auth_code,
                state=state,
                ip_address=request.remote_addr,
                user_agent=request.headers.get("User-Agent")
            )
        except ValueError as exc:
            # Invalid, expired or reused state: a client error.
            return jsonify({"error": str(exc)}), 400
        except Exception:
            # Details go to the log only; exception text may contain
            # provider or infrastructure information.
            logger.exception("SharePoint OAuth callback failed")
            return jsonify({"error": "Failed to complete SharePoint connection"}), 500

        if connection_info.user_id != session["user_id"]:
            # The flow was started by a different account. Deactivate the
            # connection so this browser's Microsoft account is not linked
            # to someone else's user.
            try:
                connector.disconnect(
                    connection_id=connection_info.id,
                    user_id=connection_info.user_id,
                    ip_address=request.remote_addr
                )
            except Exception:
                logger.exception("Failed to roll back mismatched SharePoint connection")
            return jsonify({"error": "OAuth flow was started by a different user"}), 403

        session["sharepoint_connection_id"] = connection_info.id

        return jsonify({
            "success": True,
            "connection": {
                "id": connection_info.id,
                "name": connection_info.connection_name
            }
        })

    @app.route("/api/sharepoint/connections")
    @require_auth
    def list_connections():
        """List user's SharePoint connections."""
        connections = connector.list_connections(session["user_id"])

        return jsonify({
            "connections": [
                {
                    "id": conn.id,
                    "name": conn.connection_name,
                    # Either timestamp may be None when the stored value is
                    # missing or malformed.
                    "created_at": conn.created_at.isoformat() if conn.created_at else None,
                    "last_used_at": conn.last_used_at.isoformat() if conn.last_used_at else None
                }
                for conn in connections
            ]
        })

    @app.route("/api/sharepoint/connections/<connection_id>/disconnect", methods=["POST"])
    @require_auth
    def disconnect_sharepoint(connection_id):
        """Disconnect SharePoint."""
        try:
            connector.disconnect(
                connection_id=connection_id,
                user_id=session["user_id"],
                ip_address=request.remote_addr
            )
        except ValueError as exc:
            return jsonify({"error": str(exc)}), 400
        except Exception:
            logger.exception("Failed to disconnect SharePoint connection %s", connection_id)
            return jsonify({"error": "Failed to disconnect SharePoint connection"}), 500
        return jsonify({"success": True})

    @app.route("/api/sharepoint/<connection_id>/sites")
    @require_auth
    def get_sites(connection_id):
        """Get SharePoint sites."""
        try:
            client = SecureSharePointClient(connector, connection_id, session["user_id"])
            sites = client.list_sites()
        except ValueError as exc:
            # Unknown connection, or one that belongs to another user.
            return jsonify({"error": str(exc)}), 404
        except Exception:
            logger.exception("Failed to list SharePoint sites for connection %s", connection_id)
            return jsonify({"error": "Failed to list SharePoint sites"}), 500
        return jsonify({"sites": decimal_to_float(sites)})

    return app


if __name__ == "__main__":
    # Fail fast with a clear message instead of passing None into the
    # connector and failing later inside MSAL or Fernet.
    required_env = (
        "SHAREPOINT_CLIENT_ID",
        "SHAREPOINT_CLIENT_SECRET",
        "REDIRECT_URI",
        "ENCRYPTION_KEY",
    )
    missing_env = [name for name in required_env if not os.getenv(name)]
    if missing_env:
        raise SystemExit(f"Missing required environment variables: {', '.join(missing_env)}")

    # Initialize connector
    connector = DynamoDBSharePointConnector(
        client_id=os.getenv("SHAREPOINT_CLIENT_ID"),
        client_secret=os.getenv("SHAREPOINT_CLIENT_SECRET"),
        redirect_uri=os.getenv("REDIRECT_URI"),
        encryption_key=os.getenv("ENCRYPTION_KEY"),
        aws_region=os.getenv("AWS_REGION", "us-east-1"),
        table_prefix=os.getenv("TABLE_PREFIX", ""),
        tenant_id=os.getenv("SHAREPOINT_TENANT_ID", "common")
    )

    # Create and run Flask app
    app = create_app(connector)
    app.run(debug=False, port=5000)