How do I Secure My MCP Server Connections?
Securing Model Context Protocol (MCP) server connections is critical when building web scraping automation systems. Since MCP servers act as bridges between AI assistants and external tools—including sensitive scraping APIs, databases, and file systems—implementing robust security measures protects your infrastructure, API keys, and data from unauthorized access and potential exploits.
This comprehensive guide covers authentication, encryption, input validation, access control, and security best practices for production MCP deployments.
Understanding MCP Security Architecture
MCP servers typically communicate via stdio (standard input/output) or HTTP/SSE (Server-Sent Events). The security model varies based on your deployment:
- Local stdio servers: Run as processes on the same machine as the MCP host (e.g., Claude Desktop)
- Remote HTTP servers: Exposed over networks, requiring authentication and encryption
- Containerized servers: Isolated environments with additional security layers
Each deployment model requires different security considerations.
1. Authentication and Authorization
API Key Management
Never hardcode API keys or credentials in your MCP server code. Instead, use environment variables and secure secret management:
Python Example:
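import os
from urllib.parse import urlparse

import httpx
from mcp.server import Server
from mcp.types import TextContent

app = Server("secure-webscraping-mcp")

# Load credentials from the environment
WEBSCRAPING_API_KEY = os.environ.get("WEBSCRAPING_AI_API_KEY")
# Drop empty entries so an unset variable does not match every URL.
# An empty allowlist therefore denies all requests (fail closed).
ALLOWED_DOMAINS = [
    d.strip().lower()
    for d in os.environ.get("ALLOWED_DOMAINS", "").split(",")
    if d.strip()
]

if not WEBSCRAPING_API_KEY:
    raise ValueError("WEBSCRAPING_AI_API_KEY environment variable not set")


def is_allowed_host(url: str) -> bool:
    """Allow only exact matches or subdomains of an allowed domain."""
    host = (urlparse(url).hostname or "").lower()
    return any(host == d or host.endswith("." + d) for d in ALLOWED_DOMAINS)


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name != "scrape_url":
        raise ValueError(f"Unknown tool: {name}")

    url = arguments["url"]

    # Validate against the domain allowlist. Compare hostnames, not substrings:
    # "example.com" is a substring of "example.com.attacker.net".
    if not is_allowed_host(url):
        return [TextContent(type="text", text="Error: Domain not in allowed list")]

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.get(
                "https://api.webscraping.ai/html",
                params={"url": url, "api_key": WEBSCRAPING_API_KEY, "js": "true"},
            )
            response.raise_for_status()
            return [TextContent(type="text", text=response.text)]
        except httpx.HTTPError as e:
            # str(e) can include the request URL, which carries the API key
            return [TextContent(type="text", text=f"Scraping error: {type(e).__name__}")]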
TypeScript Example:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";
import * as dotenv from "dotenv";
// Load environment variables
dotenv.config();
const API_KEY = process.env.WEBSCRAPING_AI_API_KEY;
const ALLOWED_ORIGINS = process.env.ALLOWED_ORIGINS?.split(",") || [];
if (!API_KEY) {
throw new Error("WEBSCRAPING_AI_API_KEY not configured");
}
const server = new Server(
{
name: "secure-webscraping-mcp",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "scrape_html") {
// args may be undefined, so read the URL defensively
const url = String(args?.url ?? "");
// Validate origin
const urlObj = new URL(url);
if (ALLOWED_ORIGINS.length > 0 && !ALLOWED_ORIGINS.includes(urlObj.origin)) {
throw new Error(`Origin ${urlObj.origin} not allowed`);
}
try {
const response = await axios.get("https://api.webscraping.ai/html", {
params: {
url: url,
api_key: API_KEY,
js: true,
},
timeout: 30000,
});
return {
content: [
{
type: "text",
text: response.data,
},
],
};
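} catch (error) {
// `error` is typed as unknown, and Axios errors can include the request URL
// (and therefore the API key), so report only a coarse reason
const reason = axios.isAxiosError(error)
? `HTTP ${error.response?.status ?? "error"}`
: "unexpected error";
throw new Error(`Scraping failed: ${reason}`);
}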
}
throw new Error(`Unknown tool: ${name}`);
});
Configuration Security
When configuring MCP servers in Claude Desktop or other hosts, store sensitive credentials securely:
macOS Configuration (~/Library/Application Support/Claude/claude_desktop_config.json):
{
"mcpServers": {
"webscraping": {
"command": "python",
"args": ["/path/to/secure_mcp_server.py"],
"env": {
"WEBSCRAPING_AI_API_KEY": "${WEBSCRAPING_AI_API_KEY}",
"ALLOWED_DOMAINS": "example.com,api.example.com"
}
}
}
}
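Best Practice: Keep real keys out of the config file itself. The env block is passed to the server process as written, and whether a placeholder such as ${WEBSCRAPING_AI_API_KEY} is expanded depends on the host, so confirm that behavior before relying on it. Load secrets from a dedicated source such as: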
- macOS Keychain: Store API keys in Keychain and retrieve programmatically
- AWS Secrets Manager: For cloud-deployed MCP servers
- HashiCorp Vault: For enterprise environments
- Environment variables: Loaded from .env files (never committed to version control)
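The lookup order implied by this list can be sketched as a small helper: read the environment first, then fall back to a secrets backend. The names `load_secret`, `mask`, and the `fallback` hook are hypothetical; the hook stands in for whatever Keychain, AWS Secrets Manager, or Vault client you actually use.

```python
import os
from typing import Callable, Mapping, Optional


class MissingSecretError(RuntimeError):
    """Raised when a secret cannot be found in any configured source."""


def load_secret(
    name: str,
    env: Mapping[str, str] = os.environ,
    fallback: Optional[Callable[[str], Optional[str]]] = None,
) -> str:
    """Return the secret called `name`, checking env first, then `fallback`.

    `fallback` is a hypothetical hook: wrap your secrets-manager client in a
    function that takes the secret name and returns its value or None.
    """
    value = env.get(name)
    if not value and fallback is not None:
        value = fallback(name)
    if not value:
        # Report only the secret's name, never a value
        raise MissingSecretError(f"Secret {name!r} is not configured")
    return value


def mask(secret: str, visible: int = 4) -> str:
    """Return a log-safe form that shows only the last few characters."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]
```

Failing at startup when a secret is missing keeps the error close to its cause, and `mask` gives you something safe to print when confirming which key was loaded.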
2. Input Validation and Sanitization
Always validate and sanitize tool inputs before acting on them. For a scraping server the main risks are server-side request forgery (URLs that point at internal hosts) and injection through selectors or other parameters:
import ipaddress
from urllib.parse import urlparse


def validate_url(url: str) -> tuple[bool, str]:
    """
    Validate URL format and scheme.
    Returns (is_valid, error_message).
    """
    try:
        parsed = urlparse(url)
    except ValueError as e:
        return False, f"Invalid URL: {e}"

    # Only allow HTTP/HTTPS
    if parsed.scheme not in ("http", "https"):
        return False, f"Invalid scheme: {parsed.scheme}"

    # Use hostname rather than netloc: netloc also carries userinfo and port,
    # so "user@127.0.0.1:8080" would slip past a pattern anchored on netloc
    host = (parsed.hostname or "").lower()
    if not host:
        return False, "Missing domain"

    # Prevent localhost/private IP access
    if host == "localhost" or host.endswith(".localhost"):
        return False, "Private IP addresses not allowed"
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal. A hostname can still resolve to a private
        # address, so this check alone does not rule out SSRF.
        return True, ""
    if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
        return False, "Private IP addresses not allowed"
    return True, ""
def validate_css_selector(selector: str) -> tuple[bool, str]:
    """Validate CSS selector to prevent injection"""
    # Basic validation - adjust based on needs
    if len(selector) > 500:
        return False, "Selector too long"

    # Check for potentially dangerous characters
    dangerous_chars = ["<", ">", "{", "}", ";"]
    if any(char in selector for char in dangerous_chars):
        return False, "Invalid characters in selector"

    return True, ""


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "scrape_with_selector":
        url = arguments.get("url", "")
        selector = arguments.get("selector", "")

        # Validate URL
        url_valid, url_error = validate_url(url)
        if not url_valid:
            return [TextContent(type="text", text=f"Error: {url_error}")]

        # Validate selector
        selector_valid, selector_error = validate_css_selector(selector)
        if not selector_valid:
            return [TextContent(type="text", text=f"Error: {selector_error}")]

        # Proceed with validated inputs
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.webscraping.ai/selected",
                params={
                    "url": url,
                    "selector": selector,
                    "api_key": WEBSCRAPING_API_KEY
                }
            )
            return [TextContent(type="text", text=response.text)]
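Checks on the URL string cannot see where a hostname actually resolves, so a public-looking name that points at an internal address would still pass. One way to narrow that gap is to resolve the host and reject it unless every address is public. This is a minimal sketch, not a complete SSRF defense: `resolves_to_public_ip` and its injectable `resolver` parameter are hypothetical names, and a hostile DNS server can still change its answer between this check and the real request.

```python
import ipaddress
import socket
from typing import Callable, Iterable


def _system_resolver(host: str) -> list[str]:
    """Resolve `host` with the OS resolver and return its IP addresses."""
    infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
    return [info[4][0] for info in infos]


def resolves_to_public_ip(
    host: str,
    resolver: Callable[[str], Iterable[str]] = _system_resolver,
) -> bool:
    """Return True only if every address `host` resolves to is public."""
    try:
        addresses = list(resolver(host))
    except OSError:
        return False  # resolution failure: fail closed
    if not addresses:
        return False
    for raw in addresses:
        # Drop an IPv6 zone suffix such as "%eth0" before parsing
        ip = ipaddress.ip_address(raw.split("%", 1)[0])
        if not ip.is_global:
            return False
    return True
```

Requiring all addresses to be public, rather than any one of them, avoids the case where a name returns both a public and a private record. The resolver is a parameter so the function can be exercised without network access.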
3. Rate Limiting and Resource Protection
Implement rate limiting to prevent abuse and protect your API quotas:
from datetime import datetime, timedelta
from collections import defaultdict
import asyncio


class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        """
        max_requests: Maximum requests allowed
        time_window: Time window in seconds
        """
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = defaultdict(list)
        self.lock = asyncio.Lock()

    async def check_rate_limit(self, identifier: str) -> tuple[bool, str]:
        """
        Check if request is allowed
        Returns (allowed, message)
        """
        async with self.lock:
            now = datetime.now()
            cutoff = now - timedelta(seconds=self.time_window)

            # Clean old requests
            self.requests[identifier] = [
                req_time for req_time in self.requests[identifier]
                if req_time > cutoff
            ]

            # Check limit
            if len(self.requests[identifier]) >= self.max_requests:
                return False, f"Rate limit exceeded: {self.max_requests} requests per {self.time_window}s"

            # Record new request
            self.requests[identifier].append(now)
            return True, ""


# Initialize rate limiter: 10 requests per minute
rate_limiter = RateLimiter(max_requests=10, time_window=60)


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    # Check rate limit (use tool name as identifier)
    allowed, message = await rate_limiter.check_rate_limit(name)
    if not allowed:
        return [TextContent(type="text", text=f"Error: {message}")]

    # Process request...
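The limiter above uses a sliding window of timestamps. A token bucket is a common alternative when you want to permit short bursts while capping the sustained rate. The sketch below is not the article's implementation; `TokenBucket` and its injectable `clock` are hypothetical names introduced for illustration.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(
        self,
        capacity: int,
        rate: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.capacity = capacity
        self.rate = rate
        self.clock = clock
        self.tokens = float(capacity)
        self.updated = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Consume `cost` tokens if available; return whether it succeeded."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Because `try_acquire` never awaits, it needs no lock inside a single asyncio event loop. It uses a monotonic clock by default so wall-clock adjustments do not grant or remove tokens.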
4. Secure Transport for Remote MCP Servers
When deploying MCP servers over HTTP (not stdio), use TLS/SSL encryption:
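import uvicorn
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route

app_server = Server("secure-remote-mcp")

# Create the SSE transport once; clients POST their messages to /messages/
# (check this usage against the SDK version you have installed)
sse = SseServerTransport("/messages/")


async def handle_sse(request):
    # connect_sse yields the (read, write) stream pair for this client session
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as (read_stream, write_stream):
        await app_server.run(
            read_stream,
            write_stream,
            app_server.create_initialization_options(),
        )
    # Starlette expects the endpoint to return a response object
    return Response()


starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=sse.handle_post_message),
    ]
)

if __name__ == "__main__":
    # Run with SSL/TLS
    uvicorn.run(
        starlette_app,
        host="0.0.0.0",
        port=8443,
        ssl_keyfile="/path/to/private.key",
        ssl_certfile="/path/to/certificate.crt",
        ssl_ca_certs="/path/to/ca-bundle.crt",
    )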
Token-Based Authentication for HTTP Servers
import hashlib
import os

from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response

# Keep only SHA-256 hashes of the configured tokens in memory. Skip empty
# entries so an unset MCP_AUTH_TOKENS does not make the empty token valid.
VALID_TOKENS = {
    hashlib.sha256(token.strip().encode()).hexdigest()
    for token in os.environ.get("MCP_AUTH_TOKENS", "").split(",")
    if token.strip()
}


class AuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return Response("Unauthorized", status_code=401)

        token = auth_header[7:]
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        if token_hash not in VALID_TOKENS:
            return Response("Invalid token", status_code=401)

        return await call_next(request)


starlette_app = Starlette(
    # Include every route your transport needs; all of them pass through the middleware
    routes=[Route("/sse", endpoint=handle_sse)],
    middleware=[Middleware(AuthMiddleware)],
)
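The middleware above checks the token hash with a set lookup. If you would rather compare in constant time, the standard library's `hmac.compare_digest` can be used instead. This is a minimal sketch under that assumption; `hash_token` and `token_is_valid` are hypothetical helper names.

```python
import hashlib
import hmac
from typing import Iterable


def hash_token(token: str) -> str:
    """Return the hex SHA-256 digest of a bearer token."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def token_is_valid(presented: str, valid_hashes: Iterable[str]) -> bool:
    """Check the presented token against each stored hash in constant time."""
    if not presented:
        return False
    presented_hash = hash_token(presented)
    ok = False
    for stored in valid_hashes:
        # compare_digest avoids leaking how many leading characters matched
        if hmac.compare_digest(presented_hash, stored):
            ok = True  # keep looping so timing does not depend on position
    return ok
```

Rejecting the empty string up front guards against a misconfigured token list that happens to contain the hash of an empty value.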
5. Error Handling and Information Disclosure
Avoid exposing sensitive information in error messages, just as you would when handling errors in Puppeteer:
import logging
from typing import Optional

# Configure secure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/mcp/server.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)


def safe_error_message(error: Exception, user_message: str) -> str:
    """
    Log full error details but return sanitized message to user
    """
    # Log full error for debugging
    logger.error(f"Error occurred: {type(error).__name__}: {str(error)}", exc_info=True)

    # Return generic message to user
    return user_message


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        if name == "scrape_url":
            url = arguments["url"]

            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.webscraping.ai/html",
                    params={
                        "url": url,
                        "api_key": WEBSCRAPING_API_KEY
                    }
                )
                response.raise_for_status()
                return [TextContent(type="text", text=response.text)]

    except httpx.HTTPStatusError as e:
        # Don't expose API keys or internal details
        return [TextContent(
            type="text",
            text=safe_error_message(e, "Failed to fetch content from URL")
        )]
    except Exception as e:
        return [TextContent(
            type="text",
            text=safe_error_message(e, "An unexpected error occurred")
        )]
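Errors raised by HTTP clients often carry the request URL, and in these examples that URL includes the api_key query parameter, so the full error written to the log can itself leak the key. One option is a logging filter that scrubs known secret parameters before a record is emitted. This is a sketch under assumptions: `redact`, `RedactSecretsFilter`, and the parameter names in the pattern are hypothetical and should be adapted to your own secrets.

```python
import logging
import re

# Matches api_key=..., token=..., password=... in query strings and messages
_SECRET_PARAM = re.compile(r"(?i)\b(api_key|token|password)=([^&\s'\"]+)")


def redact(text: str) -> str:
    """Replace the value of known secret parameters with a placeholder."""
    return _SECRET_PARAM.sub(r"\1=***REDACTED***", text)


class RedactSecretsFilter(logging.Filter):
    """Logging filter that scrubs secret parameters from each record."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Format the message first so secrets passed as args are covered too
        record.msg = redact(record.getMessage())
        record.args = None  # the message is already fully formatted
        return True
```

Attach it with `logger.addFilter(RedactSecretsFilter())`. Note that it only rewrites the message: traceback text rendered later by the formatter from `exc_info` is not covered, so logging tracebacks for HTTP errors may still need separate handling.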
6. Container Security for MCP Servers
When deploying MCP servers in containers, use security best practices:
Dockerfile Example:
FROM python:3.11-slim
# Run as non-root user
RUN useradd -m -u 1000 mcpuser
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY --chown=mcpuser:mcpuser mcp_server.py .
# Switch to non-root user
USER mcpuser
# Run server
CMD ["python", "mcp_server.py"]
Docker Compose with security:
version: '3.8'
services:
  mcp-server:
    build: .
    environment:
      - WEBSCRAPING_AI_API_KEY=${WEBSCRAPING_AI_API_KEY}
    security_opt:
      - no-new-privileges:true
    cap_drop:
      - ALL
    cap_add:
      - NET_BIND_SERVICE
    read_only: true
    tmpfs:
      - /tmp
    networks:
      - mcp_internal
networks:
  mcp_internal:
    driver: bridge
7. Monitoring and Audit Logging
Implement comprehensive logging for security monitoring:
import json
from datetime import datetime, timezone
from typing import Optional


class AuditLogger:
    def __init__(self, log_file: str):
        self.log_file = log_file

    def log_request(
        self,
        tool: str,
        arguments: dict,
        result: str,
        success: bool = True,
        user_id: Optional[str] = None,
    ):
        """Log all MCP tool requests for audit purposes"""
        audit_entry = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tool": tool,
            "user_id": user_id or "anonymous",
            "arguments": {
                # Sanitize sensitive data
                k: v if k not in ["api_key", "password", "token"] else "***REDACTED***"
                for k, v in arguments.items()
            },
            "result_length": len(result),
            # Passed in explicitly: scraped pages may contain the word "Error"
            "success": success,
        }

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(audit_entry) + "\n")


audit_logger = AuditLogger("/var/log/mcp/audit.log")


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        # Process request (process_tool_request stands for your own tool dispatch)
        result = await process_tool_request(name, arguments)

        # Log successful request
        audit_logger.log_request(name, arguments, result.text)

        return [result]
    except Exception as e:
        error_msg = str(e)
        audit_logger.log_request(name, arguments, f"Error: {error_msg}", success=False)
        raise
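The comprehension in `log_request` masks only top-level keys, so a credential nested inside another argument (for example a `headers` dict) would still reach the audit log. A recursive variant could look like the sketch below; `redact_arguments` and the `SENSITIVE_KEYS` set are hypothetical and should be extended to match your tools' parameters.

```python
from typing import Any

# Key names treated as secrets, compared case-insensitively
SENSITIVE_KEYS = {"api_key", "password", "token", "authorization"}


def redact_arguments(value: Any) -> Any:
    """Return a copy of `value` with sensitive keys masked at any depth."""
    if isinstance(value, dict):
        return {
            k: "***REDACTED***"
            if str(k).lower() in SENSITIVE_KEYS
            else redact_arguments(v)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Sequences are rebuilt as lists, which is how JSON serializes them
        return [redact_arguments(item) for item in value]
    return value
```

The function builds new containers rather than editing in place, so the arguments passed on to the tool itself are left untouched.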
8. Dependency Security
Keep dependencies updated and scan for vulnerabilities:
# Python - Check for vulnerabilities
pip install safety
safety check  # newer Safety releases replace this command with `safety scan`
# Node.js - Audit dependencies
npm audit
npm audit fix
# Update dependencies regularly
pip install --upgrade mcp httpx
npm update @modelcontextprotocol/sdk
requirements.txt with pinned versions:
mcp==0.9.0
httpx==0.26.0
python-dotenv==1.0.0
pydantic==2.5.0
Security Checklist for Production MCP Servers
✅ Authentication & Authorization
- [ ] API keys stored in environment variables
- [ ] Domain/origin whitelisting implemented
- [ ] Token-based auth for remote servers
- [ ] Secret rotation policy in place
✅ Input Validation
- [ ] URL validation and sanitization
- [ ] CSS selector validation
- [ ] Parameter type checking
- [ ] Maximum input length limits
✅ Network Security
- [ ] TLS/SSL for remote connections
- [ ] Private IP blocking
- [ ] Rate limiting implemented
- [ ] Timeout configuration
✅ Error Handling
- [ ] Sanitized error messages
- [ ] Comprehensive audit logging
- [ ] No sensitive data in logs
- [ ] Error monitoring alerts
✅ Container Security (if applicable)
- [ ] Non-root user
- [ ] Read-only filesystem
- [ ] Minimal capabilities
- [ ] Network isolation
✅ Monitoring & Updates
- [ ] Dependency scanning
- [ ] Regular security updates
- [ ] Audit log review process
- [ ] Incident response plan
Conclusion
Securing MCP server connections requires a multi-layered approach encompassing authentication, input validation, network security, and operational monitoring. By implementing these security best practices, you can safely expose web scraping capabilities through MCP while protecting your infrastructure, API keys, and sensitive data.
Remember that security is an ongoing process—regularly review your security posture, update dependencies, monitor audit logs, and stay informed about emerging threats in the MCP ecosystem. With proper security measures in place, you can confidently build powerful, AI-driven web scraping automation systems on the Model Context Protocol.