What are the Best Practices for Using MCP Servers in Production?
Deploying Model Context Protocol (MCP) servers to production requires careful planning and adherence to best practices around reliability, security, and scalability. For web scraping applications, a production MCP server must handle high request volumes, recover gracefully from failures, and enforce security controls while still giving AI clients dependable data extraction tools.
This comprehensive guide covers essential production best practices for MCP servers, from architecture design to deployment strategies, helping you build enterprise-grade web scraping infrastructure.
Security Best Practices
1. Environment Variable Management
Never hardcode API keys, credentials, or sensitive configuration in your MCP server code. Use environment variables and secret management systems:
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Access credentials securely
API_KEY = os.environ.get("WEBSCRAPING_AI_API_KEY")
DATABASE_URL = os.environ.get("DATABASE_URL")

if not API_KEY:
    raise ValueError("WEBSCRAPING_AI_API_KEY environment variable is required")

# MCP server configuration
from mcp.server import Server
from mcp.types import Tool, TextContent
import httpx

app = Server("secure-webscraping-server")

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.webscraping.ai/html",
            params={
                "url": arguments["url"],
                "api_key": API_KEY,  # Use environment variable
                "js": "true"
            }
        )
        return [TextContent(type="text", text=response.text)]
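If you run in an environment with a managed secrets store, you can resolve the key at startup instead of shipping it in the process environment. The sketch below uses AWS Secrets Manager through boto3 as one example; the secret name webscraping-ai/api-key and the API_KEY_SECRET_NAME variable are hypothetical placeholders, and the plain environment variable remains the fallback.
# Sketch: resolving the API key from AWS Secrets Manager at startup.
# Assumes boto3 is installed and the runtime has IAM access to the secret;
# the secret name "webscraping-ai/api-key" is a hypothetical placeholder.
import os
import boto3

def load_api_key() -> str:
    """Prefer a secrets manager, fall back to the environment variable."""
    secret_name = os.environ.get("API_KEY_SECRET_NAME", "webscraping-ai/api-key")
    try:
        client = boto3.client("secretsmanager")
        response = client.get_secret_value(SecretId=secret_name)
        return response["SecretString"]
    except Exception:
        # Fall back to the plain environment variable (e.g. local development)
        return os.environ["WEBSCRAPING_AI_API_KEY"]

API_KEY = load_api_key()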
2. Input Validation and Sanitization
Implement strict validation for all user inputs to prevent injection attacks and malicious URLs:
from urllib.parse import urlparse
from typing import Optional
import re

class InputValidator:
    """Validate and sanitize MCP tool inputs"""

    ALLOWED_SCHEMES = ["http", "https"]
    BLOCKED_DOMAINS = ["localhost", "127.0.0.1", "0.0.0.0"]
    MAX_URL_LENGTH = 2048

    @staticmethod
    def validate_url(url: str) -> tuple[bool, Optional[str]]:
        """
        Validate URL for security and format
        Returns: (is_valid, error_message)
        """
        if len(url) > InputValidator.MAX_URL_LENGTH:
            return False, f"URL exceeds maximum length of {InputValidator.MAX_URL_LENGTH}"

        try:
            parsed = urlparse(url)

            # Check scheme
            if parsed.scheme not in InputValidator.ALLOWED_SCHEMES:
                return False, f"Unsupported URL scheme: {parsed.scheme}"

            # Check for blocked domains
            if any(blocked in parsed.netloc.lower()
                   for blocked in InputValidator.BLOCKED_DOMAINS):
                return False, "Access to local/internal URLs is forbidden"

            # Verify domain format
            if not parsed.netloc or "." not in parsed.netloc:
                return False, "Invalid domain format"

            return True, None
        except Exception as e:
            return False, f"Invalid URL format: {str(e)}"

    @staticmethod
    def sanitize_selector(selector: str) -> tuple[bool, str]:
        """Validate CSS selector for safety"""
        # Reject overly long selectors
        if len(selector) > 500:
            return False, "Selector too long"

        # Basic CSS selector validation: reject potentially dangerous characters
        if re.search(r'[<>{}();]', selector):
            return False, "Invalid characters in selector"

        return True, selector

# Usage in MCP server
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "scrape_url":
        url = arguments.get("url")

        # Validate URL
        is_valid, error = InputValidator.validate_url(url)
        if not is_valid:
            return [TextContent(
                type="text",
                text=f"Error: {error}"
            )]

        # Proceed with scraping
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.webscraping.ai/html",
                params={"url": url, "api_key": API_KEY}
            )
            return [TextContent(type="text", text=response.text)]
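Note that the blocklist above only catches obvious loopback names; it does not stop URLs that resolve to private ranges such as 10.0.0.0/8 or 192.168.0.0/16. If server-side request forgery is a concern, a stricter check can resolve the hostname and reject private, loopback, link-local, and reserved addresses. A minimal sketch using only the standard library follows; adapt the policy to your own network layout.
# Sketch: rejecting URLs that resolve to private or loopback addresses (SSRF hardening).
# Uses only the standard library; resolution failures are treated as invalid input.
import ipaddress
import socket
from urllib.parse import urlparse

def is_public_url(url: str) -> bool:
    """Return True only if every resolved address is globally routable."""
    hostname = urlparse(url).hostname
    if not hostname:
        return False
    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror:
        return False
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
            return False
    return True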
3. Rate Limiting and Throttling
Implement rate limiting to prevent abuse and protect your infrastructure, much as handling timeouts in Puppeteer keeps runaway requests from exhausting resources:
import asyncio
from datetime import datetime, timedelta
from collections import defaultdict

class RateLimiter:
    """Sliding-window rate limiter for MCP tools"""

    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # seconds
        self.requests = defaultdict(list)

    async def check_rate_limit(self, client_id: str) -> tuple[bool, str]:
        """
        Check if request is within rate limits
        Returns: (is_allowed, message)
        """
        now = datetime.now()
        cutoff = now - timedelta(seconds=self.time_window)

        # Remove old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > cutoff
        ]

        # Check limit
        if len(self.requests[client_id]) >= self.max_requests:
            return False, f"Rate limit exceeded. Max {self.max_requests} requests per {self.time_window}s"

        # Add current request
        self.requests[client_id].append(now)
        return True, "OK"

# Global rate limiter (100 requests per minute)
rate_limiter = RateLimiter(max_requests=100, time_window=60)

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    client_id = arguments.get("client_id", "default")

    # Check rate limit
    allowed, message = await rate_limiter.check_rate_limit(client_id)
    if not allowed:
        return [TextContent(type="text", text=f"Error: {message}")]

    # Process request
    # ... rest of the code
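Request-rate limits cap how often each client may call your tools, but it is also worth bounding how many upstream scraping requests run concurrently so a burst of allowed calls cannot exhaust connections. A minimal sketch using asyncio.Semaphore follows; the limit of 10 concurrent requests is an illustrative value, not a recommendation.
# Sketch: bounding concurrent upstream requests with a semaphore.
# The limit of 10 is illustrative; tune it to your API plan and infrastructure.
import asyncio
import httpx

MAX_CONCURRENT_REQUESTS = 10
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

async def fetch_with_limit(url: str, api_key: str) -> str:
    """Fetch a page while holding a concurrency slot."""
    async with semaphore:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                "https://api.webscraping.ai/html",
                params={"url": url, "api_key": api_key, "js": "true"},
            )
            response.raise_for_status()
            return response.text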
Error Handling and Resilience
1. Comprehensive Exception Handling
Implement robust error handling to gracefully manage failures:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";
import axios, { AxiosError } from "axios";

const server = new Server(
  { name: "production-mcp-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// Custom error types
class ScrapingError extends Error {
  constructor(message: string, public statusCode?: number) {
    super(message);
    this.name = "ScrapingError";
  }
}

// Error handler with logging
async function handleScrapingError(error: unknown, url: string): Promise<string> {
  console.error(`[ERROR] Scraping failed for ${url}:`, error);

  if (error instanceof AxiosError) {
    if (error.response) {
      // HTTP error response
      const status = error.response.status;
      switch (status) {
        case 401:
          return "Authentication failed. Check your API key.";
        case 403:
          return "Access forbidden. The target site may be blocking requests.";
        case 404:
          return "URL not found. Please verify the URL is correct.";
        case 429:
          return "Rate limit exceeded. Please try again later.";
        case 500:
        case 502:
        case 503:
          return "Server error. The service may be temporarily unavailable.";
        default:
          return `HTTP error ${status}: ${error.response.statusText}`;
      }
    } else if (error.request) {
      // Network error
      return "Network error. Please check your internet connection.";
    }
  }

  if (error instanceof Error) {
    return `Unexpected error: ${error.message}`;
  }

  return "An unknown error occurred.";
}

// Tool handler with comprehensive error handling
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    if (name === "scrape_url") {
      const url = args.url as string;

      // Input validation
      if (!url || typeof url !== "string") {
        throw new ScrapingError("Invalid URL parameter");
      }

      // Timeout configuration
      const timeout = 30000; // 30 seconds

      const response = await axios.get("https://api.webscraping.ai/html", {
        params: {
          url: url,
          api_key: process.env.WEBSCRAPING_AI_API_KEY,
          js: true,
        },
        timeout: timeout,
      });

      return {
        content: [
          {
            type: "text",
            text: response.data,
          },
        ],
      };
    }

    throw new ScrapingError(`Unknown tool: ${name}`);
  } catch (error) {
    const errorMessage = await handleScrapingError(error, args.url as string);
    return {
      content: [
        {
          type: "text",
          text: `Error: ${errorMessage}`,
        },
      ],
      isError: true,
    };
  }
});
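Beyond per-request error handling, a circuit breaker keeps the server from hammering an upstream that is failing consistently. The sketch below is written in Python to match the other server examples in this guide; the CircuitBreaker class, its thresholds, and its cool-down period are illustrative assumptions rather than part of any MCP SDK.
# Sketch: a simple circuit breaker that stops calling a failing upstream for a cool-down period.
# Thresholds and timings below are illustrative, not tuned recommendations.
import time
from typing import Optional

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        """Reject calls while the breaker is open and the cool-down has not elapsed."""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_timeout:
            # Half-open: allow one probe request through after the cool-down
            return True
        return False

    def record_success(self) -> None:
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.monotonic()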
2. Retry Logic with Exponential Backoff
Implement retry mechanisms for transient failures:
import asyncio
from typing import Optional, Callable, Any
import httpx

class RetryConfig:
    """Configuration for retry logic"""

    def __init__(
        self,
        max_attempts: int = 3,
        initial_delay: float = 1.0,
        max_delay: float = 30.0,
        exponential_base: float = 2.0
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

async def retry_with_backoff(
    func: Callable,
    config: RetryConfig = RetryConfig(),
    *args,
    **kwargs
) -> Any:
    """Execute function with exponential backoff retry"""
    last_exception = None

    for attempt in range(config.max_attempts):
        try:
            return await func(*args, **kwargs)
        except httpx.TimeoutException as e:
            last_exception = e
            if attempt < config.max_attempts - 1:
                delay = min(
                    config.initial_delay * (config.exponential_base ** attempt),
                    config.max_delay
                )
                print(f"Timeout on attempt {attempt + 1}, retrying in {delay}s...")
                await asyncio.sleep(delay)
            else:
                raise
        except httpx.HTTPStatusError as e:
            # Don't retry on client errors (4xx)
            if 400 <= e.response.status_code < 500:
                raise
            last_exception = e
            if attempt < config.max_attempts - 1:
                delay = min(
                    config.initial_delay * (config.exponential_base ** attempt),
                    config.max_delay
                )
                print(f"Server error on attempt {attempt + 1}, retrying in {delay}s...")
                await asyncio.sleep(delay)
            else:
                raise

    raise last_exception

# Usage in MCP tool
async def scrape_with_retry(url: str, api_key: str) -> str:
    """Scrape URL with automatic retry on failures"""
    async def scrape():
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                "https://api.webscraping.ai/html",
                params={"url": url, "api_key": api_key, "js": "true"}
            )
            response.raise_for_status()
            return response.text

    return await retry_with_backoff(scrape, RetryConfig(max_attempts=3))
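If you prefer not to maintain retry plumbing yourself, the tenacity library expresses the same policy declaratively. The sketch below assumes tenacity is added as a dependency and mirrors the timeout-only retry case; extend the retry predicate if you also want to retry 5xx responses.
# Sketch: the same retry policy expressed with the tenacity library (extra dependency).
import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type(httpx.TimeoutException),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=30),
)
async def scrape_once(url: str, api_key: str) -> str:
    """Single attempt; tenacity re-invokes it on timeout."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            "https://api.webscraping.ai/html",
            params={"url": url, "api_key": api_key, "js": "true"},
        )
        response.raise_for_status()
        return response.text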
Monitoring and Logging
1. Structured Logging
Implement comprehensive logging for debugging and monitoring:
import logging
import json
from datetime import datetime
from typing import Any, Dict

class StructuredLogger:
    """Structured JSON logger for production environments"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Console handler with JSON formatter
        handler = logging.StreamHandler()
        handler.setFormatter(self._get_json_formatter())
        self.logger.addHandler(handler)

    def _get_json_formatter(self):
        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "level": record.levelname,
                    "logger": record.name,
                    "message": record.getMessage(),
                }
                if hasattr(record, "extra"):
                    log_data.update(record.extra)
                if record.exc_info:
                    log_data["exception"] = self.formatException(record.exc_info)
                return json.dumps(log_data)

        return JSONFormatter()

    def info(self, message: str, **kwargs):
        extra = {"extra": kwargs} if kwargs else {}
        self.logger.info(message, extra=extra)

    def error(self, message: str, **kwargs):
        extra = {"extra": kwargs} if kwargs else {}
        self.logger.error(message, extra=extra)

    def warning(self, message: str, **kwargs):
        extra = {"extra": kwargs} if kwargs else {}
        self.logger.warning(message, extra=extra)

# Initialize logger
logger = StructuredLogger("mcp-server")

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    request_id = arguments.get("request_id", "unknown")
    start_time = datetime.now()

    logger.info(
        "Tool call started",
        tool=name,
        request_id=request_id,
        url=arguments.get("url")
    )

    try:
        # Process request
        result = await process_request(name, arguments)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(
            "Tool call completed",
            tool=name,
            request_id=request_id,
            duration_seconds=duration,
            success=True
        )
        return result
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.error(
            "Tool call failed",
            tool=name,
            request_id=request_id,
            duration_seconds=duration,
            error=str(e),
            error_type=type(e).__name__
        )
        raise
2. Metrics and Performance Monitoring
Track key performance indicators for your MCP server:
from dataclasses import dataclass
from typing import Any, Dict
import time

@dataclass
class Metrics:
    """Performance metrics for MCP server"""

    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_duration: float = 0.0

    def record_success(self, duration: float):
        self.total_requests += 1
        self.successful_requests += 1
        self.total_duration += duration

    def record_failure(self, duration: float):
        self.total_requests += 1
        self.failed_requests += 1
        self.total_duration += duration

    def get_stats(self) -> Dict[str, Any]:
        return {
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": self.successful_requests / self.total_requests if self.total_requests > 0 else 0,
            "average_duration": self.total_duration / self.total_requests if self.total_requests > 0 else 0
        }

# Global metrics
metrics = Metrics()

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    start_time = time.time()
    try:
        result = await process_request(name, arguments)
        duration = time.time() - start_time
        metrics.record_success(duration)
        return result
    except Exception:
        duration = time.time() - start_time
        metrics.record_failure(duration)
        raise

# Expose metrics as an MCP resource so clients can inspect server stats
@app.list_resources()
async def list_resources():
    return [{
        "uri": "metrics://server/stats",
        "name": "Server Metrics",
        "mimeType": "application/json"
    }]

@app.read_resource()
async def read_resource(uri: str):
    if uri == "metrics://server/stats":
        return json.dumps(metrics.get_stats(), indent=2)
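If you already operate Prometheus, the in-process counters above can be supplemented with the prometheus_client library, which serves a /metrics endpoint from a background thread. The sketch below is an assumption-laden example: port 9100 is arbitrary, and process_request stands in for your actual tool dispatch, as in the earlier snippets.
# Sketch: exposing request metrics to Prometheus (extra dependency: prometheus_client).
# Port 9100 is an arbitrary example.
import time
from prometheus_client import Counter, Histogram, start_http_server

REQUESTS = Counter("mcp_requests_total", "Tool calls processed", ["tool", "status"])
DURATION = Histogram("mcp_request_duration_seconds", "Tool call duration", ["tool"])

start_http_server(9100)  # Serves /metrics in a background thread

async def instrumented_call(name: str, arguments: dict):
    start = time.time()
    try:
        result = await process_request(name, arguments)  # placeholder dispatch, as above
        REQUESTS.labels(tool=name, status="success").inc()
        return result
    except Exception:
        REQUESTS.labels(tool=name, status="error").inc()
        raise
    finally:
        DURATION.labels(tool=name).observe(time.time() - start)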
Scalability and Performance
1. Connection Pooling
Use connection pooling to optimize performance when making multiple requests:
import os
import httpx
from typing import Optional

class ScrapingClient:
    """Singleton scraping client with connection pooling"""

    _instance: Optional['ScrapingClient'] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return

        # Create client with connection pooling
        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(
                max_keepalive_connections=20,
                max_connections=100,
                keepalive_expiry=30.0
            )
        )
        self._initialized = True
        self.api_key = os.environ.get("WEBSCRAPING_AI_API_KEY")

    async def scrape_html(self, url: str, **params) -> str:
        """Scrape HTML with connection pooling"""
        response = await self.client.get(
            "https://api.webscraping.ai/html",
            params={
                "url": url,
                "api_key": self.api_key,
                **params
            }
        )
        response.raise_for_status()
        return response.text

    async def close(self):
        """Close client connections"""
        await self.client.aclose()

# Use singleton client
scraping_client = ScrapingClient()

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "scrape_url":
        html = await scraping_client.scrape_html(
            arguments["url"],
            js="true"
        )
        return [TextContent(type="text", text=html)]
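Because the pooled client lives for the lifetime of the process, close it when the server shuts down so keep-alive connections are released cleanly. A minimal sketch follows; the run() coroutine is a placeholder for however your MCP server's transport loop is actually started.
# Sketch: closing the pooled client on shutdown. run() is a placeholder for
# whatever starts your MCP server's stdio or HTTP transport.
import asyncio

async def main():
    try:
        await run()  # placeholder for the MCP server's transport loop
    finally:
        await scraping_client.close()

if __name__ == "__main__":
    asyncio.run(main())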
2. Caching Strategies
Implement caching to reduce redundant requests and improve performance:
import hashlib
import json
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

class CacheManager:
    """Simple in-memory cache with TTL"""

    def __init__(self, ttl_seconds: int = 300):
        self.cache: Dict[str, Tuple[Any, datetime]] = {}
        self.ttl = timedelta(seconds=ttl_seconds)

    def _generate_key(self, url: str, params: Dict) -> str:
        """Generate cache key from URL and parameters"""
        data = f"{url}:{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(data.encode()).hexdigest()

    def get(self, url: str, params: Dict) -> Optional[str]:
        """Get cached response if available and not expired"""
        key = self._generate_key(url, params)
        if key in self.cache:
            content, timestamp = self.cache[key]
            if datetime.now() - timestamp < self.ttl:
                return content
            else:
                # Remove expired entry
                del self.cache[key]
        return None

    def set(self, url: str, params: Dict, content: str):
        """Cache response with timestamp"""
        key = self._generate_key(url, params)
        self.cache[key] = (content, datetime.now())

    def clear_expired(self):
        """Remove all expired cache entries"""
        now = datetime.now()
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if now - timestamp >= self.ttl
        ]
        for key in expired_keys:
            del self.cache[key]

# Global cache (5-minute TTL)
cache = CacheManager(ttl_seconds=300)

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "scrape_url":
        url = arguments["url"]
        params = {"js": "true"}

        # Check cache first
        cached_content = cache.get(url, params)
        if cached_content:
            logger.info("Cache hit", url=url)
            return [TextContent(type="text", text=cached_content)]

        # Scrape if not cached
        logger.info("Cache miss, scraping", url=url)
        html = await scraping_client.scrape_html(url, **params)

        # Store in cache
        cache.set(url, params, html)
        return [TextContent(type="text", text=html)]
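The in-memory cache above is per-process, so hit rates drop as soon as you scale to several server replicas. A shared cache such as Redis avoids that. The sketch below uses redis-py's asyncio client and assumes a redis dependency; the connection URL is an example and should come from configuration.
# Sketch: a shared cache backed by Redis (extra dependency: redis>=4.2 for redis.asyncio).
# The connection URL is an example; take it from configuration in production.
import hashlib
import json
from typing import Optional

import redis.asyncio as redis

class RedisCache:
    def __init__(self, url: str = "redis://localhost:6379", ttl_seconds: int = 300):
        self.client = redis.from_url(url, decode_responses=True)
        self.ttl = ttl_seconds

    def _key(self, url: str, params: dict) -> str:
        data = f"{url}:{json.dumps(params, sort_keys=True)}"
        return "scrape:" + hashlib.md5(data.encode()).hexdigest()

    async def get(self, url: str, params: dict) -> Optional[str]:
        return await self.client.get(self._key(url, params))

    async def set(self, url: str, params: dict, content: str) -> None:
        await self.client.set(self._key(url, params), content, ex=self.ttl)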
Deployment Best Practices
1. Docker Containerization
Package your MCP server in a Docker container for consistent deployment, similar to using Puppeteer with Docker:
# Dockerfile for production MCP server
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user for security
RUN useradd -m -u 1000 mcpuser && \
chown -R mcpuser:mcpuser /app
USER mcpuser
# Environment variables (override in production)
ENV PYTHONUNBUFFERED=1
ENV LOG_LEVEL=INFO
# Health check (placeholder that always passes; replace with a real liveness probe for your transport)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import sys; sys.exit(0)"
# Run the server
CMD ["python", "mcp_server.py"]
# docker-compose.yml for local development
version: '3.8'

services:
  mcp-server:
    build: .
    environment:
      - WEBSCRAPING_AI_API_KEY=${WEBSCRAPING_AI_API_KEY}
      - LOG_LEVEL=DEBUG
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 1G
2. Environment Configuration
Use configuration management for different environments:
# Pydantic v1 style shown here; with Pydantic v2, BaseSettings lives in the pydantic-settings package
from pydantic import BaseSettings, Field
from typing import Optional
import logging

class Settings(BaseSettings):
    """Production-ready configuration"""

    # API Configuration
    webscraping_api_key: str = Field(..., env="WEBSCRAPING_AI_API_KEY")
    webscraping_api_url: str = "https://api.webscraping.ai"

    # Server Configuration
    server_name: str = "production-mcp-server"
    server_version: str = "1.0.0"

    # Performance Settings
    max_connections: int = 100
    connection_timeout: int = 30
    request_timeout: int = 60

    # Rate Limiting
    rate_limit_requests: int = 100
    rate_limit_window: int = 60

    # Caching
    cache_enabled: bool = True
    cache_ttl: int = 300

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"

    # Security
    allowed_domains: Optional[str] = None
    blocked_domains: str = "localhost,127.0.0.1"

    class Config:
        env_file = ".env"
        case_sensitive = False

# Load settings
settings = Settings()

# Use in application
logging.getLogger("mcp-server").setLevel(settings.log_level)
3. Process Management
Use process managers for production deployment:
# supervisord.conf
[program:mcp-server]
command=/usr/local/bin/python /app/mcp_server.py
directory=/app
user=mcpuser
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/mcp-server.log
environment=WEBSCRAPING_AI_API_KEY="%(ENV_WEBSCRAPING_AI_API_KEY)s"
Testing and Quality Assurance
1. Unit Tests
Write comprehensive tests for your MCP server:
import httpx
import pytest
from unittest.mock import AsyncMock, Mock, patch
from mcp_server import InputValidator, scrape_with_retry

@pytest.mark.asyncio
async def test_scrape_with_retry_success():
    """Test successful scraping with retry logic"""
    with patch('httpx.AsyncClient') as mock_client:
        mock_response = Mock()
        mock_response.text = "<html>Test</html>"
        mock_response.raise_for_status = Mock()
        # client.get is awaited, so it must be an AsyncMock
        mock_client.return_value.__aenter__.return_value.get = AsyncMock(return_value=mock_response)

        result = await scrape_with_retry("https://example.com", "test_key")
        assert result == "<html>Test</html>"

@pytest.mark.asyncio
async def test_scrape_with_retry_failure():
    """Test retry logic on failures"""
    with patch('httpx.AsyncClient') as mock_client:
        mock_client.return_value.__aenter__.return_value.get.side_effect = \
            httpx.TimeoutException("Timeout")

        with pytest.raises(httpx.TimeoutException):
            await scrape_with_retry("https://example.com", "test_key")

def test_url_validation():
    """Test URL validation logic"""
    # Valid URL
    is_valid, error = InputValidator.validate_url("https://example.com")
    assert is_valid is True

    # Invalid scheme
    is_valid, error = InputValidator.validate_url("ftp://example.com")
    assert is_valid is False
    assert "scheme" in error.lower()

    # Blocked domain
    is_valid, error = InputValidator.validate_url("https://localhost/test")
    assert is_valid is False
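The rate limiter and cache are just as easy to unit-test because they have no external dependencies. The examples below assume RateLimiter and CacheManager are importable from mcp_server, matching the snippets earlier in this guide.
# Sketch: unit tests for the rate limiter and cache, assuming both classes
# are importable from mcp_server.
import pytest
from mcp_server import CacheManager, RateLimiter

@pytest.mark.asyncio
async def test_rate_limiter_blocks_after_limit():
    limiter = RateLimiter(max_requests=2, time_window=60)
    assert (await limiter.check_rate_limit("client-a"))[0] is True
    assert (await limiter.check_rate_limit("client-a"))[0] is True
    allowed, message = await limiter.check_rate_limit("client-a")
    assert allowed is False
    assert "Rate limit exceeded" in message

def test_cache_returns_stored_content():
    cache = CacheManager(ttl_seconds=300)
    cache.set("https://example.com", {"js": "true"}, "<html>cached</html>")
    assert cache.get("https://example.com", {"js": "true"}) == "<html>cached</html>"
    assert cache.get("https://example.com", {"js": "false"}) is None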
2. Integration Tests
Test the complete MCP server workflow:
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

@pytest.mark.asyncio
async def test_mcp_server_integration():
    """Test complete MCP server interaction"""
    server_params = StdioServerParameters(command="python", args=["mcp_server.py"])
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize
            await session.initialize()

            # List tools
            tools = await session.list_tools()
            assert len(tools.tools) > 0
            assert any(tool.name == "scrape_url" for tool in tools.tools)

            # Call tool
            result = await session.call_tool("scrape_url", {
                "url": "https://example.com"
            })
            assert result is not None
Production Checklist
Before deploying your MCP server to production, ensure you have:
- [ ] Security: Environment variables for all secrets
- [ ] Security: Input validation and sanitization
- [ ] Security: Rate limiting implemented
- [ ] Security: HTTPS/TLS for all external communications
- [ ] Reliability: Comprehensive error handling
- [ ] Reliability: Retry logic with exponential backoff
- [ ] Reliability: Timeouts configured appropriately
- [ ] Monitoring: Structured logging implemented
- [ ] Monitoring: Metrics collection and exposure
- [ ] Monitoring: Health check endpoints
- [ ] Performance: Connection pooling configured
- [ ] Performance: Caching strategy implemented
- [ ] Performance: Resource limits defined
- [ ] Testing: Unit tests with >80% coverage
- [ ] Testing: Integration tests for critical paths
- [ ] Testing: Load testing completed
- [ ] Deployment: Docker containerization
- [ ] Deployment: CI/CD pipeline configured
- [ ] Deployment: Rollback strategy defined
- [ ] Documentation: API documentation complete
- [ ] Documentation: Runbooks for common issues
Conclusion
Deploying MCP servers to production requires careful attention to security, reliability, performance, and monitoring. By following these best practices—from implementing robust error handling and rate limiting to containerization and comprehensive testing—you can build enterprise-grade web scraping infrastructure that scales reliably.
Remember that production systems require ongoing maintenance, monitoring, and optimization. Continuously review your logs, metrics, and performance indicators to identify areas for improvement and ensure your MCP server remains secure, efficient, and reliable as your web scraping needs evolve.