How do I monitor MCP server health and performance?
Monitoring MCP (Model Context Protocol) server health and performance is crucial for maintaining reliable web scraping operations and ensuring optimal resource utilization. This guide covers comprehensive monitoring strategies, performance metrics, logging techniques, and optimization approaches for MCP servers.
Understanding MCP Server Monitoring Fundamentals
MCP servers require continuous monitoring to ensure they're operating efficiently and handling requests properly. Effective monitoring helps you identify bottlenecks, prevent downtime, and optimize resource allocation for your web scraping workflows.
Key Performance Indicators (KPIs)
When monitoring MCP servers, focus on these critical metrics (a minimal snapshot structure is sketched just after the list):
- Response Time: Time taken to process requests
- Request Throughput: Number of requests processed per second
- Error Rate: Percentage of failed requests
- Resource Utilization: CPU, memory, and network usage
- Connection Pool Status: Active and idle connections
- Queue Length: Number of pending requests
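To make these KPIs concrete, they can be captured in a simple snapshot structure that your monitoring code fills in on each collection cycle. This is only an illustrative sketch; the field names below are examples, not part of any MCP specification.

# Illustrative KPI snapshot; field names are examples, not an MCP standard.
from dataclasses import dataclass

@dataclass
class KpiSnapshot:
    p95_response_time_ms: float   # 95th-percentile request latency
    requests_per_second: float    # request throughput
    error_rate_percent: float     # failed requests / total requests * 100
    cpu_percent: float            # resource utilization
    memory_percent: float
    active_connections: int       # connection pool status
    idle_connections: int
    queue_length: int             # pending requests waiting to be served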
Implementing Health Check Endpoints
Health check endpoints provide real-time status information about your MCP server's operational state.
Basic Health Check Implementation (Python)
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from datetime import datetime
import psutil

app = FastAPI()

class HealthMonitor:
    def __init__(self):
        self.start_time = datetime.now()
        self.request_count = 0
        self.error_count = 0

    async def get_health_status(self):
        """Comprehensive health check"""
        uptime = (datetime.now() - self.start_time).total_seconds()
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        return {
            "status": "healthy" if cpu_percent < 80 and memory.percent < 85 else "degraded",
            "uptime_seconds": uptime,
            "cpu_usage_percent": cpu_percent,
            "memory_usage_percent": memory.percent,
            "memory_available_mb": memory.available / (1024 * 1024),
            "total_requests": self.request_count,
            "total_errors": self.error_count,
            "error_rate": (self.error_count / self.request_count * 100) if self.request_count > 0 else 0,
            "timestamp": datetime.now().isoformat()
        }

monitor = HealthMonitor()

@app.get("/health")
async def health_check():
    """Simple health check endpoint"""
    return {"status": "ok", "timestamp": datetime.now().isoformat()}

@app.get("/health/detailed")
async def detailed_health_check():
    """Detailed health status with metrics"""
    return await monitor.get_health_status()

@app.get("/health/ready")
async def readiness_check():
    """Check if server is ready to accept requests"""
    health = await monitor.get_health_status()
    if health["status"] == "healthy":
        return {"ready": True}
    # Returning a bare tuple does not set the status code in FastAPI;
    # use an explicit JSONResponse instead
    return JSONResponse(content={"ready": False}, status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
Health Check Implementation (Node.js)
const express = require('express');
const os = require('os');

const app = express();

class HealthMonitor {
  constructor() {
    this.startTime = Date.now();
    this.requestCount = 0;
    this.errorCount = 0;
  }

  getHealthStatus() {
    const uptime = (Date.now() - this.startTime) / 1000;
    const totalMemory = os.totalmem();
    const freeMemory = os.freemem();
    const usedMemory = totalMemory - freeMemory;
    const memoryUsagePercent = (usedMemory / totalMemory) * 100;
    // 1-minute load average as a rough CPU utilization proxy
    const cpuUsage = os.loadavg()[0] / os.cpus().length * 100;

    return {
      status: cpuUsage < 80 && memoryUsagePercent < 85 ? 'healthy' : 'degraded',
      uptime_seconds: uptime,
      cpu_usage_percent: cpuUsage.toFixed(2),
      memory_usage_percent: memoryUsagePercent.toFixed(2),
      memory_available_mb: (freeMemory / 1024 / 1024).toFixed(2),
      total_requests: this.requestCount,
      total_errors: this.errorCount,
      error_rate: this.requestCount > 0 ?
        (this.errorCount / this.requestCount * 100).toFixed(2) : 0,
      timestamp: new Date().toISOString()
    };
  }

  incrementRequests() {
    this.requestCount++;
  }

  incrementErrors() {
    this.errorCount++;
  }
}

const monitor = new HealthMonitor();

// Middleware to track requests; register it before the route handlers
// so requests to the endpoints below are actually counted
app.use((req, res, next) => {
  monitor.incrementRequests();
  const originalSend = res.send;
  res.send = function(data) {
    if (res.statusCode >= 400) {
      monitor.incrementErrors();
    }
    return originalSend.call(this, data);
  };
  next();
});

app.get('/health', (req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

app.get('/health/detailed', (req, res) => {
  res.json(monitor.getHealthStatus());
});

app.get('/health/ready', (req, res) => {
  const health = monitor.getHealthStatus();
  if (health.status === 'healthy') {
    res.json({ ready: true });
  } else {
    res.status(503).json({ ready: false });
  }
});
Performance Metrics Collection
Implementing Prometheus Metrics
Prometheus is an industry-standard monitoring system perfect for tracking MCP server metrics.
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
import time

# Define metrics
request_counter = Counter('mcp_requests_total', 'Total number of requests', ['method', 'endpoint'])
request_duration = Histogram('mcp_request_duration_seconds', 'Request duration in seconds', ['endpoint'])
active_connections = Gauge('mcp_active_connections', 'Number of active connections')
error_counter = Counter('mcp_errors_total', 'Total number of errors', ['error_type'])
scraping_success = Counter('mcp_scraping_success_total', 'Successful scraping operations')
scraping_failures = Counter('mcp_scraping_failures_total', 'Failed scraping operations', ['reason'])

@app.middleware("http")
async def track_metrics(request, call_next):
    """Middleware to track request metrics"""
    start_time = time.time()
    active_connections.inc()
    try:
        response = await call_next(request)
        duration = time.time() - start_time
        request_counter.labels(method=request.method, endpoint=request.url.path).inc()
        request_duration.labels(endpoint=request.url.path).observe(duration)
        if response.status_code >= 400:
            error_counter.labels(error_type=f"http_{response.status_code}").inc()
        return response
    finally:
        active_connections.dec()

@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics"""
    return Response(content=generate_latest(), media_type="text/plain")
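The scraping-specific counters defined above are not touched by the HTTP middleware; they need to be incremented around your scraping calls. Here is a hedged sketch of how that might look, where scrape_page is a placeholder for whatever scraping function your server actually exposes and the extra duration histogram is an addition, not part of the original metric set.

# Hypothetical wrapper around a scraping call; scrape_page is a placeholder.
scraping_duration = Histogram('mcp_scraping_duration_seconds', 'Scraping duration in seconds')

async def monitored_scrape(url: str):
    """Run a scrape and record success/failure metrics."""
    start = time.time()
    try:
        result = await scrape_page(url)  # placeholder for your scraping logic
        scraping_success.inc()
        return result
    except Exception as exc:
        scraping_failures.labels(reason=type(exc).__name__).inc()
        raise
    finally:
        scraping_duration.observe(time.time() - start)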
Custom Performance Tracking
class PerformanceTracker {
  constructor() {
    this.metrics = {
      requests: new Map(),
      responseTime: [],
      throughput: {
        current: 0,
        peak: 0,
        average: 0
      }
    };
    this.windowSize = 60000; // 1 minute window
    this.startTracking();
  }

  recordRequest(endpoint, duration, success) {
    const timestamp = Date.now();

    // Record response time
    this.metrics.responseTime.push({
      timestamp,
      endpoint,
      duration,
      success
    });

    // Clean old entries
    this.metrics.responseTime = this.metrics.responseTime.filter(
      entry => timestamp - entry.timestamp < this.windowSize
    );

    // Update throughput (requests observed in the current 1-minute window)
    this.metrics.throughput.current = this.metrics.responseTime.length;
    this.metrics.throughput.peak = Math.max(
      this.metrics.throughput.peak,
      this.metrics.throughput.current
    );
  }

  getMetrics() {
    const recentMetrics = this.metrics.responseTime;
    const successfulRequests = recentMetrics.filter(m => m.success);

    return {
      total_requests: recentMetrics.length,
      successful_requests: successfulRequests.length,
      failed_requests: recentMetrics.length - successfulRequests.length,
      success_rate: recentMetrics.length > 0 ?
        (successfulRequests.length / recentMetrics.length * 100).toFixed(2) : 0,
      avg_response_time: recentMetrics.length > 0 ?
        (recentMetrics.reduce((sum, m) => sum + m.duration, 0) / recentMetrics.length).toFixed(2) : 0,
      p95_response_time: this.calculatePercentile(recentMetrics, 95),
      p99_response_time: this.calculatePercentile(recentMetrics, 99),
      throughput_per_minute: this.metrics.throughput.current.toFixed(2),
      peak_throughput: this.metrics.throughput.peak.toFixed(2)
    };
  }

  calculatePercentile(metrics, percentile) {
    if (metrics.length === 0) return 0;
    const sorted = metrics.map(m => m.duration).sort((a, b) => a - b);
    const index = Math.ceil(sorted.length * percentile / 100) - 1;
    return sorted[index].toFixed(2);
  }

  startTracking() {
    setInterval(() => {
      console.log('Current metrics:', this.getMetrics());
    }, 10000); // Log every 10 seconds
  }
}

const performanceTracker = new PerformanceTracker();
Logging and Error Tracking
Comprehensive logging helps diagnose issues and track server behavior over time.
Structured Logging Implementation
import logging
import json
import time
from datetime import datetime
from typing import Any, Dict

class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.INFO)
        # Plain stream handler; each message is serialized to JSON in log()
        handler = logging.StreamHandler()
        self.logger.addHandler(handler)

    def log(self, level: str, message: str, **kwargs: Any):
        """Log structured message"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "service": self.service_name,
            "level": level,
            "message": message,
            **kwargs
        }
        log_line = json.dumps(log_entry)
        if level == "ERROR":
            self.logger.error(log_line)
        elif level == "WARNING":
            self.logger.warning(log_line)
        elif level == "INFO":
            self.logger.info(log_line)
        else:
            self.logger.debug(log_line)

    def log_request(self, method: str, endpoint: str, duration: float, status_code: int):
        """Log HTTP request"""
        self.log(
            "INFO",
            f"{method} {endpoint}",
            method=method,
            endpoint=endpoint,
            duration_ms=duration * 1000,
            status_code=status_code,
            type="http_request"
        )

    def log_scraping_operation(self, url: str, success: bool, duration: float, error: str = None):
        """Log scraping operation"""
        self.log(
            "INFO" if success else "ERROR",
            f"Scraping {'succeeded' if success else 'failed'}: {url}",
            url=url,
            success=success,
            duration_ms=duration * 1000,
            error=error,
            type="scraping_operation"
        )

# Usage
logger = StructuredLogger("mcp-server")

@app.middleware("http")
async def log_requests(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    logger.log_request(
        request.method,
        str(request.url.path),
        duration,
        response.status_code
    )
    return response
Monitoring with External Services
Integration with Cloud Monitoring
# Install monitoring client libraries
# For AWS CloudWatch
pip install boto3 watchtower
# For Google Cloud Monitoring
pip install google-cloud-monitoring
# For Datadog
pip install datadog
import boto3
from datetime import datetime

class CloudWatchMonitor:
    def __init__(self, namespace='MCPServer'):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace

    def send_metric(self, metric_name: str, value: float, unit: str = 'None'):
        """Send custom metric to CloudWatch"""
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )

    def send_scraping_metrics(self, success_count: int, failure_count: int, avg_duration: float):
        """Send scraping-specific metrics"""
        self.send_metric('ScrapingSuccessCount', success_count, 'Count')
        self.send_metric('ScrapingFailureCount', failure_count, 'Count')
        self.send_metric('AverageScrapingDuration', avg_duration, 'Milliseconds')
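One way to use this class is a background task that periodically pushes aggregate scraping numbers from the FastAPI app shown earlier. The sketch below assumes you already collect success/failure counts and an average duration somewhere in your server; collect_scraping_stats is a hypothetical helper, not part of the class above.

import asyncio

cloudwatch_monitor = CloudWatchMonitor()

async def report_metrics_periodically(interval_seconds: int = 60):
    """Push scraping metrics to CloudWatch on a fixed interval."""
    while True:
        # collect_scraping_stats() is a hypothetical helper returning
        # (success_count, failure_count, avg_duration_ms) from your own counters
        success_count, failure_count, avg_duration_ms = collect_scraping_stats()
        cloudwatch_monitor.send_scraping_metrics(success_count, failure_count, avg_duration_ms)
        await asyncio.sleep(interval_seconds)

# Schedule the reporter when the FastAPI app starts
@app.on_event("startup")
async def start_metric_reporting():
    asyncio.create_task(report_metrics_periodically())

Note that boto3 calls are blocking; for frequent reporting you may prefer to run them in a thread pool rather than directly inside the event loop.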
Performance Optimization Strategies
Connection Pool Monitoring
When working with browser automation for web scraping, monitoring connection pools is essential. Similar to how you monitor network requests in Puppeteer, you need to track active browser instances and their resource consumption.
class BrowserPoolMonitor:
    def __init__(self):
        self.active_browsers = 0
        self.max_browsers = 10
        self.browser_queue = []

    def get_pool_status(self):
        """Get current pool status"""
        return {
            "active_browsers": self.active_browsers,
            "max_browsers": self.max_browsers,
            "queue_length": len(self.browser_queue),
            "utilization_percent": (self.active_browsers / self.max_browsers) * 100,
            "available_slots": self.max_browsers - self.active_browsers
        }

    async def acquire_browser(self):
        """Acquire browser from pool with monitoring"""
        if self.active_browsers >= self.max_browsers:
            logger.log("WARNING", "Browser pool at capacity", **self.get_pool_status())
            # Wait or queue request
        self.active_browsers += 1
        logger.log("INFO", "Browser acquired", **self.get_pool_status())

    async def release_browser(self):
        """Release browser back to pool"""
        self.active_browsers -= 1
        logger.log("INFO", "Browser released", **self.get_pool_status())
Alert Configuration
Set up alerts for critical thresholds:
class AlertManager:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
        self.thresholds = {
            'cpu_percent': 80,
            'memory_percent': 85,
            'error_rate': 5,
            'response_time_p95': 5000  # milliseconds
        }

    async def check_and_alert(self, metrics: Dict[str, float]):
        """Check metrics against thresholds and send alerts"""
        alerts = []
        for metric, value in metrics.items():
            if metric in self.thresholds and value > self.thresholds[metric]:
                alerts.append({
                    'metric': metric,
                    'value': value,
                    'threshold': self.thresholds[metric],
                    'severity': 'critical' if value > self.thresholds[metric] * 1.2 else 'warning'
                })
        if alerts:
            await self.send_alert(alerts)

    async def send_alert(self, alerts: list):
        """Send alert notification"""
        # Send to Slack, PagerDuty, email, etc.
        payload = {
            'text': f'MCP Server Alert: {len(alerts)} threshold(s) exceeded',
            'alerts': alerts,
            'timestamp': datetime.utcnow().isoformat()
        }
        # Implementation for your notification system
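For a concrete notification path, the payload can be posted to the configured webhook URL. This is a minimal sketch using the httpx library against a generic JSON webhook (Slack-style incoming webhooks accept this shape), not a definitive integration for any particular provider.

# Minimal webhook delivery sketch, assuming the webhook accepts JSON POSTs.
import httpx

async def post_to_webhook(webhook_url: str, payload: dict):
    """POST the alert payload to a JSON webhook endpoint."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(webhook_url, json=payload)
        response.raise_for_status()

# Inside AlertManager.send_alert you could then call:
#     await post_to_webhook(self.webhook_url, payload)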
Dashboard and Visualization
Creating a Monitoring Dashboard
// Express endpoint for dashboard data.
// Assumes the HealthMonitor, PerformanceTracker, and browser pool instances
// defined earlier, plus timeline helpers (getRequestTimeline, getErrorTimeline,
// getResponseTimeTimeline) that read from your own metrics store.
app.get('/dashboard/metrics', (req, res) => {
  const metrics = {
    health: monitor.getHealthStatus(),
    performance: performanceTracker.getMetrics(),
    pool: browserPool.getPoolStatus(),
    timeline: {
      labels: generateTimeLabels(60), // Last 60 minutes
      requests: getRequestTimeline(60),
      errors: getErrorTimeline(60),
      responseTime: getResponseTimeTimeline(60)
    }
  };
  res.json(metrics);
});

function generateTimeLabels(minutes) {
  const labels = [];
  const now = Date.now();
  for (let i = minutes; i >= 0; i--) {
    const time = new Date(now - i * 60000);
    labels.push(time.toLocaleTimeString());
  }
  return labels;
}
Best Practices for MCP Server Monitoring
- Implement Multiple Health Check Levels: Basic, detailed, and readiness checks serve different purposes
- Use Structured Logging: JSON-formatted logs enable better parsing and analysis
- Track Business Metrics: Monitor scraping success rates, not just technical metrics
- Set Appropriate Alerts: Avoid alert fatigue by setting meaningful thresholds
- Monitor Resource Trends: Track patterns over time to predict capacity needs
- Implement Graceful Degradation: When handling errors in Puppeteer or other browser automation tools, ensure your monitoring captures degraded states
- Regular Performance Reviews: Analyze metrics weekly to identify optimization opportunities
Monitoring Checklist
- [ ] Health check endpoints implemented
- [ ] Performance metrics collection active
- [ ] Structured logging configured
- [ ] Error tracking and alerting set up
- [ ] Resource utilization monitoring enabled
- [ ] Dashboard for visualization created
- [ ] Alert thresholds defined and tested
- [ ] Log aggregation system configured
- [ ] Backup monitoring systems in place
- [ ] Documentation for monitoring procedures updated
Conclusion
Effective monitoring of MCP server health and performance is essential for maintaining reliable web scraping operations. By implementing comprehensive health checks, collecting detailed metrics, using structured logging, and setting up appropriate alerts, you can ensure your MCP servers operate efficiently and handle issues proactively. Regular review of monitoring data helps optimize performance and plan for scaling needs.
Remember that monitoring is not a one-time setup but an ongoing process that should evolve with your infrastructure and requirements. Start with basic health checks and gradually add more sophisticated monitoring as your needs grow.