Overview

Secure MCP Gateway supports OAuth 2.0 and OAuth 2.1 for authenticating with remote MCP servers. The OAuth service handles token acquisition, caching, refresh, and revocation, following the relevant RFCs: RFC 6749 (core framework), RFC 8705 (mutual TLS), RFC 8707 (resource indicators), and RFC 7009 (token revocation).

Features

OAuth 2.0 & 2.1

Full spec compliance with security best practices

Client Credentials

Server-to-server authentication flow

Mutual TLS (mTLS)

Client certificate authentication (RFC 8705)

Token Caching

Automatic caching with expiration tracking

Proactive Refresh

Tokens are refreshed 5 minutes before expiry by default (configurable via token_expiry_buffer)

Scope Validation

Verifies that issued tokens carry the requested scopes

Architecture

The OAuth service consists of three components:
┌─────────────────────────────────────────────────────────┐
│                    OAuthService                          │
│  - Token acquisition with retry logic                   │
│  - Grant type routing (client_credentials, auth_code)   │
│  - Error handling and correlation IDs                   │
└──────────────────┬──────────────────────────────────────┘

        ┌──────────┴──────────┐
        │                     │
┌───────▼──────────┐  ┌──────▼────────────┐
│  TokenManager    │  │  OAuthMetrics     │
│  - Token cache   │  │  - Acquisitions   │
│  - Expiration    │  │  - Cache hits     │
│  - Refresh       │  │  - Latency        │
└──────────────────┘  └───────────────────┘

OAuth Configuration

OAuthConfig Data Model

src/secure_mcp_gateway/services/oauth/models.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Dict, Any

class OAuthVersion(Enum):
    OAUTH_2_0 = "2.0"
    OAUTH_2_1 = "2.1"

class OAuthGrantType(Enum):
    CLIENT_CREDENTIALS = "client_credentials"
    AUTHORIZATION_CODE = "authorization_code"
    REFRESH_TOKEN = "refresh_token"

@dataclass
class OAuthConfig:
    # Core configuration
    enabled: bool = False
    version: OAuthVersion = OAuthVersion.OAUTH_2_0
    grant_type: OAuthGrantType = OAuthGrantType.CLIENT_CREDENTIALS
    token_url: str = ""

    # Client credentials
    client_id: Optional[str] = None
    client_secret: Optional[str] = None

    # Optional parameters
    audience: Optional[str] = None
    organization: Optional[str] = None
    scope: Optional[str] = None

    # OAuth 2.1 specific
    resource: Optional[str] = None  # Resource Indicator (RFC 8707)
    use_pkce: bool = False  # PKCE for Authorization Code flow

    # Authorization Code Grant
    authorization_url: Optional[str] = None
    redirect_uri: Optional[str] = None
    state: Optional[str] = None
    code_verifier: Optional[str] = None
    code_challenge: Optional[str] = None
    code_challenge_method: str = "S256"

    # Token management
    token_expiry_buffer: int = 300  # 5 minutes

    # Security settings
    use_basic_auth: bool = True
    enforce_https: bool = True
    token_in_header_only: bool = True

    # mTLS settings (RFC 8705)
    use_mtls: bool = False
    client_cert_path: Optional[str] = None
    client_key_path: Optional[str] = None
    ca_bundle_path: Optional[str] = None

    # Token revocation (RFC 7009)
    revocation_url: Optional[str] = None

    # Scope validation
    validate_scopes: bool = True

    # Advanced
    additional_params: Dict[str, Any] = field(default_factory=dict)
    custom_headers: Dict[str, str] = field(default_factory=dict)
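
The service calls `oauth_config.validate()` before requesting a token, but the method is not shown on this page. Below is a minimal sketch of the kind of checks it might perform, assuming it returns an `(is_valid, error_message)` tuple as the call site suggests. The trimmed `OAuthConfigSketch` class and the specific rules are illustrative assumptions, not the gateway's actual implementation.

```python
from dataclasses import dataclass
from typing import Optional, Tuple
from urllib.parse import urlparse


@dataclass
class OAuthConfigSketch:
    """Trimmed, hypothetical stand-in for OAuthConfig (illustration only)."""
    token_url: str = ""
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    enforce_https: bool = True
    use_mtls: bool = False
    client_cert_path: Optional[str] = None
    client_key_path: Optional[str] = None

    def validate(self) -> Tuple[bool, Optional[str]]:
        """Return (is_valid, error_message); the rules here are assumptions."""
        if not self.token_url:
            return False, "token_url is required"
        if self.enforce_https and urlparse(self.token_url).scheme != "https":
            return False, "token_url must use HTTPS"
        if not self.client_id:
            return False, "client_id is required"
        if self.use_mtls:
            # Assumed rule: mTLS needs both a certificate and its key.
            if not (self.client_cert_path and self.client_key_path):
                return False, "mTLS requires client_cert_path and client_key_path"
        elif not self.client_secret:
            # Assumed rule: without mTLS, a client secret must be present.
            return False, "client_secret is required"
        return True, None
```

Returning a tuple instead of raising keeps the caller's early-return path (`return None, error_msg`) simple.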

Server Configuration Example

{
  "server_name": "remote_mcp_server",
  "config": {
    "url": "https://api.example.com/mcp",
    "transport": "http"
  },
  "oauth": {
    "enabled": true,
    "version": "2.1",
    "grant_type": "client_credentials",
    "token_url": "https://auth.example.com/oauth/token",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "audience": "https://api.example.com",
    "scope": "mcp:read mcp:write",
    "enforce_https": true
  }
}
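
The integration code builds the typed config from this JSON block with `OAuthConfig.from_dict(...)`, which is not shown on this page. One way such a conversion could work is sketched below. The enum lookup by value and the decision to ignore unknown keys are assumptions, and `ConfigSketch` is a cut-down hypothetical class.

```python
from dataclasses import dataclass, fields
from enum import Enum
from typing import Any, Dict, Optional


class OAuthVersion(Enum):
    OAUTH_2_0 = "2.0"
    OAUTH_2_1 = "2.1"


class OAuthGrantType(Enum):
    CLIENT_CREDENTIALS = "client_credentials"
    AUTHORIZATION_CODE = "authorization_code"


@dataclass
class ConfigSketch:
    enabled: bool = False
    version: OAuthVersion = OAuthVersion.OAUTH_2_0
    grant_type: OAuthGrantType = OAuthGrantType.CLIENT_CREDENTIALS
    token_url: str = ""
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    audience: Optional[str] = None
    scope: Optional[str] = None
    enforce_https: bool = True

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "ConfigSketch":
        known = {f.name for f in fields(cls)}
        # Assumption: unknown keys are ignored rather than rejected.
        values = {k: v for k, v in raw.items() if k in known}
        # JSON carries plain strings; convert them to enum members.
        if "version" in values:
            values["version"] = OAuthVersion(values["version"])
        if "grant_type" in values:
            values["grant_type"] = OAuthGrantType(values["grant_type"])
        return cls(**values)


oauth_block = {
    "enabled": True,
    "version": "2.1",
    "grant_type": "client_credentials",
    "token_url": "https://auth.example.com/oauth/token",
    "scope": "mcp:read mcp:write",
}
config = ConfigSketch.from_dict(oauth_block)
```

An unrecognized `version` or `grant_type` string raises `ValueError` from the enum constructor, which surfaces configuration typos early.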

OAuthService Implementation

Token Acquisition

src/secure_mcp_gateway/services/oauth/oauth_service.py
from typing import Optional, Tuple

class OAuthService:
    async def get_access_token(
        self,
        server_name: str,
        oauth_config: OAuthConfig,
        config_id: str,
        project_id: str,
        force_refresh: bool = False
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Get access token with caching and retry logic.

        Returns:
            Tuple of (access_token, error_message)
        """
        # Validate configuration
        is_valid, error_msg = oauth_config.validate()
        if not is_valid:
            return None, error_msg

        # Check cache unless force refresh
        if not force_refresh:
            cached_token = await self.token_manager.get_token(
                server_name, oauth_config, config_id, project_id
            )
            if cached_token:
                self.metrics.record_cache_hit()
                return cached_token.access_token, None
            else:
                self.metrics.record_cache_miss()

        # Obtain new token
        if oauth_config.grant_type == OAuthGrantType.CLIENT_CREDENTIALS:
            token = await self._client_credentials_flow_with_retry(
                server_name, oauth_config
            )
        else:
            return None, f"Unsupported grant type: {oauth_config.grant_type.value}"

        if token:
            # Store in cache
            await self.token_manager.store_token(
                server_name, token, config_id, project_id
            )
            self.metrics.record_token_acquisition(True)
            return token.access_token, None

        return None, "Failed to obtain token"

Client Credentials Flow with Retry

src/secure_mcp_gateway/services/oauth/oauth_service.py
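import asyncio
import uuid

import aiohttp
from tenacity import (
    AsyncRetrying,
    RetryError,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

async def _client_credentials_flow_with_retry(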
    self, server_name: str, oauth_config: OAuthConfig
) -> Optional[OAuthToken]:
    """
    Execute client credentials flow with exponential backoff retry.

    Retries: up to 3 attempts, with exponentially increasing delays
    (bounded to 2-8s) between attempts. Only network errors and
    timeouts are retried.
    """
    correlation_id = str(uuid.uuid4())
    logger.info(
        "[OAuthService] Client credentials flow started",
        extra={"server_name": server_name, "correlation_id": correlation_id}
    )

    try:
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=2, min=2, max=8),
            retry=retry_if_exception_type(
                (aiohttp.ClientError, asyncio.TimeoutError)
            ),
        ):
            with attempt:
                return await self._execute_client_credentials_flow(
                    server_name, oauth_config, correlation_id
                )
    except RetryError as e:
        logger.error(
            f"[OAuthService] All retry attempts failed for {server_name}",
            extra={"correlation_id": correlation_id, "error": str(e)}
        )
        return None
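
To make the retry behaviour concrete, here is a dependency-free sketch of the same pattern: at most three attempts, with an exponentially growing delay clamped to the 2-8 second range. The `retry_with_backoff` helper and its injectable `sleep` parameter are hypothetical names for illustration. The gateway itself relies on tenacity's `AsyncRetrying`, whose exact delay formula may differ between versions.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    multiplier: float = 2.0,
    min_delay: float = 2.0,
    max_delay: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Optional[T]:
    """Run operation up to `attempts` times; return None if all fail."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                return None  # retries exhausted
            # Assumed formula: multiplier * 2^(attempt - 1), clamped.
            delay = multiplier * 2 ** (attempt - 1)
            await sleep(max(min_delay, min(delay, max_delay)))
    return None


async def _demo() -> Tuple[Optional[str], List[float]]:
    delays: List[float] = []
    calls = {"n": 0}

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the delay instead of waiting

    async def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise asyncio.TimeoutError()
        return "token"

    result = await retry_with_backoff(
        flaky, (asyncio.TimeoutError,), sleep=fake_sleep
    )
    return result, delays


result, delays = asyncio.run(_demo())
```

Three attempts leave room for only two waits, so with these settings the upper bound of 8 seconds is never reached. Exceptions outside `retry_on` propagate immediately, just as non-network errors do in the tenacity version.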

Token Execution with mTLS Support

src/secure_mcp_gateway/services/oauth/oauth_service.py
import base64
import ssl

import aiohttp

async def _execute_client_credentials_flow(
    self, server_name: str, oauth_config: OAuthConfig, correlation_id: str
) -> OAuthToken:
    # Build request body
    data = {
        "grant_type": "client_credentials",
        "client_id": oauth_config.client_id,
    }

    if not oauth_config.use_basic_auth:
        data["client_secret"] = oauth_config.client_secret

    if oauth_config.scope:
        data["scope"] = oauth_config.scope
    if oauth_config.audience:
        data["audience"] = oauth_config.audience
    if oauth_config.resource:
        data["resource"] = oauth_config.resource

    # Merge additional params
    data.update(oauth_config.additional_params)

    # Build headers
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    if oauth_config.use_basic_auth:
        # RFC 6749: client_secret_basic authentication
        credentials = f"{oauth_config.client_id}:{oauth_config.client_secret}"
        encoded = base64.b64encode(credentials.encode()).decode()
        headers["Authorization"] = f"Basic {encoded}"

    headers.update(oauth_config.custom_headers)

    # Configure mTLS if enabled
    ssl_context = None
    if oauth_config.use_mtls:
        ssl_context = ssl.create_default_context(
            cafile=oauth_config.ca_bundle_path
        )
        ssl_context.load_cert_chain(
            certfile=oauth_config.client_cert_path,
            keyfile=oauth_config.client_key_path
        )

    # Make request
    timeout = aiohttp.ClientTimeout(total=self.timeout_manager.get_timeout("auth"))

    async with aiohttp.ClientSession(
        timeout=timeout, connector=aiohttp.TCPConnector(ssl=ssl_context)
    ) as session:
        async with session.post(
            oauth_config.token_url, data=data, headers=headers
        ) as response:
            response_data = await response.json()

            if response.status != 200:
                raise AuthenticationError(
                    f"Token request failed: {response_data.get('error')}"
                )

            return OAuthToken.from_dict(response_data)
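
The `client_secret_basic` header built above can be isolated into a small helper. This sketch additionally form-encodes the client ID and secret before Base64-encoding them, which RFC 6749 (section 2.3.1) calls for and which matters when credentials contain characters such as `:` or `%`. The helper name `build_basic_auth_header` is hypothetical, and whether a given authorization server expects the extra encoding should be verified against that server.

```python
import base64
from urllib.parse import quote_plus


def build_basic_auth_header(client_id: str, client_secret: str) -> str:
    """Build an HTTP Basic Authorization value for client_secret_basic."""
    # Form-encode each part so a ':' inside a value cannot be mistaken
    # for the separator between the ID and the secret.
    credentials = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
    encoded = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
    return f"Basic {encoded}"


header = build_basic_auth_header("your_client_id", "s3cr:et")
```

For credentials made only of letters, digits, `_`, `.`, `-`, and `~`, the result is identical to the unencoded version shown in the service code.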

Token Management

TokenManager

src/secure_mcp_gateway/services/oauth/token_manager.py
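import asyncio
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

class TokenManager: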
    def __init__(self):
        self._token_cache: Dict[str, OAuthToken] = {}
        self._lock = asyncio.Lock()

    async def get_token(
        self,
        server_name: str,
        oauth_config: OAuthConfig,
        config_id: str,
        project_id: str
    ) -> Optional[OAuthToken]:
        """Get cached token if valid."""
        cache_key = self._build_cache_key(server_name, config_id, project_id)

        async with self._lock:
            token = self._token_cache.get(cache_key)

            if token and not self.is_token_expired(
                token, oauth_config.token_expiry_buffer
            ):
                return token

            # Token expired or not found
            if token:
                logger.debug(f"[TokenManager] Token expired for {server_name}")

            return None

    async def store_token(
        self,
        server_name: str,
        token: OAuthToken,
        config_id: str,
        project_id: str
    ) -> None:
        """Store token in cache."""
        cache_key = self._build_cache_key(server_name, config_id, project_id)

        async with self._lock:
            self._token_cache[cache_key] = token
            logger.info(
                f"[TokenManager] Token cached for {server_name}",
                extra={"expires_in": token.expires_in}
            )

    def is_token_expired(
        self, token: OAuthToken, buffer_seconds: int = 300
    ) -> bool:
        """Check if token is expired or will expire soon."""
        if not token.created_at or not token.expires_in:
            return True

        expiry_time = token.created_at + timedelta(seconds=token.expires_in)
        buffer_time = expiry_time - timedelta(seconds=buffer_seconds)

        return datetime.now(timezone.utc) >= buffer_time
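
As a worked example of the buffer arithmetic: a token issued with `expires_in=3600` is treated as expired once 3300 seconds have passed when the buffer is 300 seconds. The sketch below reproduces the check as a standalone function with an injectable clock. `TokenSketch` is a hypothetical stand-in for `OAuthToken`, whose full definition is not shown here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class TokenSketch:
    access_token: str
    expires_in: Optional[int] = None
    created_at: Optional[datetime] = None


def is_token_expired(
    token: TokenSketch,
    buffer_seconds: int = 300,
    now: Optional[datetime] = None,
) -> bool:
    """Buffered expiry check, with `now` injectable for testing."""
    if not token.created_at or not token.expires_in:
        return True  # missing lifetime data: treat as expired
    now = now or datetime.now(timezone.utc)
    refresh_at = token.created_at + timedelta(
        seconds=token.expires_in - buffer_seconds
    )
    return now >= refresh_at


issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
token = TokenSketch("abc", expires_in=3600, created_at=issued)

# 3240 seconds elapsed: still inside the usable window.
fresh = is_token_expired(token, now=issued + timedelta(minutes=54))
# 3300 seconds elapsed: the 300-second buffer has been reached.
stale = is_token_expired(token, now=issued + timedelta(minutes=55))
```

Note that a token whose lifetime is shorter than the buffer is considered expired as soon as it is stored, so the buffer should be set below the shortest lifetime the authorization server issues.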

Client Integration

The gateway automatically injects OAuth tokens into MCP server connections:
src/secure_mcp_gateway/services/oauth/integration.py
async def inject_oauth_token(
    server_entry: Dict[str, Any], config_id: str, project_id: str
) -> Dict[str, Any]:
    """
    Inject OAuth token into server connection.

    For remote servers: Adds Authorization header
    For local servers: Adds ENKRYPT_ACCESS_TOKEN env var
    """
    oauth_config = server_entry.get("oauth", {})

    if not oauth_config.get("enabled"):
        return server_entry

    # Get or acquire token
    oauth_service = OAuthService()
    access_token, error = await oauth_service.get_access_token(
        server_entry["server_name"],
        OAuthConfig.from_dict(oauth_config),
        config_id,
        project_id
    )

    if not access_token:
        raise AuthenticationError(f"OAuth token acquisition failed: {error}")

    # Inject based on transport type
    if server_entry["config"].get("transport") == "http":
        # Remote server - add Authorization header
        if "headers" not in server_entry["config"]:
            server_entry["config"]["headers"] = {}
        server_entry["config"]["headers"]["Authorization"] = f"Bearer {access_token}"
    else:
        # Local server - add environment variable
        if "env" not in server_entry["config"]:
            server_entry["config"]["env"] = {}
        server_entry["config"]["env"]["ENKRYPT_ACCESS_TOKEN"] = access_token

    return server_entry
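
The injection step itself is a small dictionary transformation. Here is a sketch of just that part, separated from token acquisition so it can be exercised without a network call. `apply_token` is a hypothetical helper, and unlike the function above it returns a copy rather than mutating its input, which is a design choice of this sketch.

```python
import copy
from typing import Any, Dict


def apply_token(server_entry: Dict[str, Any], access_token: str) -> Dict[str, Any]:
    """Return a copy of server_entry with the token attached."""
    entry = copy.deepcopy(server_entry)
    config = entry.setdefault("config", {})
    if config.get("transport") == "http":
        # Remote server: send the token as a Bearer credential.
        config.setdefault("headers", {})["Authorization"] = f"Bearer {access_token}"
    else:
        # Local server: expose the token through the environment.
        config.setdefault("env", {})["ENKRYPT_ACCESS_TOKEN"] = access_token
    return entry


remote_entry = {
    "server_name": "remote_mcp_server",
    "config": {"url": "https://api.example.com/mcp", "transport": "http"},
}
remote = apply_token(remote_entry, "example-token")
local = apply_token(
    {"server_name": "local", "config": {"command": "python"}}, "example-token"
)
```

Working on a copy keeps the bearer token out of the shared server configuration, so a stale token is not left behind in a long-lived object.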

OAuth Metrics

The OAuth service tracks token acquisition, cache, and latency metrics:
src/secure_mcp_gateway/services/oauth/metrics.py
class OAuthMetrics:
    def get_metrics(self) -> Dict[str, Any]:
        return {
            "token_acquisitions_total": self._acquisitions_total,
            "token_acquisitions_success": self._acquisitions_success,
            "token_acquisitions_failure": self._acquisitions_failure,
            "token_cache_hits": self._cache_hits,
            "token_cache_misses": self._cache_misses,
            "token_refreshes": self._refreshes,
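            # Ratios and latency stats fall back to 0.0 before any data is recorded
            "cache_hit_ratio": (
                self._cache_hits / (self._cache_hits + self._cache_misses)
                if (self._cache_hits + self._cache_misses) else 0.0
            ),
            "success_rate": (
                self._acquisitions_success / self._acquisitions_total
                if self._acquisitions_total else 0.0
            ),
            "avg_latency_ms": (
                sum(self._latencies) / len(self._latencies)
                if self._latencies else 0.0
            ),
            "max_latency_ms": max(self._latencies, default=0.0),
            "min_latency_ms": min(self._latencies, default=0.0),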
            "active_tokens": len(self.token_manager._token_cache)
        }
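
The counters behind `get_metrics` are not shown on this page. Below is a minimal sketch of how the recording methods used by `OAuthService` (`record_cache_hit`, `record_cache_miss`, `record_token_acquisition`) might update them, with ratios guarded against empty counters. The class name `MetricsSketch`, the `latency_ms` parameter, and the internals are assumptions.

```python
from typing import Any, Dict, List, Optional


class MetricsSketch:
    def __init__(self) -> None:
        self._cache_hits = 0
        self._cache_misses = 0
        self._success = 0
        self._failure = 0
        self._latencies: List[float] = []

    def record_cache_hit(self) -> None:
        self._cache_hits += 1

    def record_cache_miss(self) -> None:
        self._cache_misses += 1

    def record_token_acquisition(
        self, success: bool, latency_ms: Optional[float] = None
    ) -> None:
        if success:
            self._success += 1
        else:
            self._failure += 1
        if latency_ms is not None:
            self._latencies.append(latency_ms)

    def get_metrics(self) -> Dict[str, Any]:
        lookups = self._cache_hits + self._cache_misses
        total = self._success + self._failure
        count = len(self._latencies)
        return {
            "cache_hit_ratio": self._cache_hits / lookups if lookups else 0.0,
            "success_rate": self._success / total if total else 0.0,
            "avg_latency_ms": sum(self._latencies) / count if count else 0.0,
        }


metrics = MetricsSketch()
empty = metrics.get_metrics()  # safe on a fresh instance: no division by zero

metrics.record_cache_miss()
metrics.record_token_acquisition(True, latency_ms=120.0)
for _ in range(3):
    metrics.record_cache_hit()
snapshot = metrics.get_metrics()
```

One miss followed by three hits gives a hit ratio of 3/4, which is the pattern expected when a token is acquired once and then served from cache.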

Security Best Practices

HTTPS Only: OAuth 2.1 requires HTTPS for all token endpoints. Set enforce_https: true to have the gateway validate this.
Token Storage: Tokens are stored in memory only. For multi-instance deployments, consider an external cache with encryption.
mTLS Certificates: Store client certificates securely and rotate regularly. Never commit certificates to source control.

Testing OAuth Configuration

Use the echo OAuth test server to verify configuration:
bad_mcps/echo_oauth_mcp.py
# This test server echoes back the token it receives. When it runs as a
# local server, the gateway passes the token in ENKRYPT_ACCESS_TOKEN.
import os
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Echo OAuth Server")

@mcp.tool()
def echo_auth_header() -> str:
    """Returns a truncated copy of the injected access token for testing."""
    token = os.environ.get("ENKRYPT_ACCESS_TOKEN", "No token found")
    return f"Received token: {token[:20]}..."

if __name__ == "__main__":
    mcp.run()