How do I monitor MCP server health and performance?

Monitoring MCP (Model Context Protocol) server health and performance is crucial for maintaining reliable web scraping operations and ensuring optimal resource utilization. This guide covers comprehensive monitoring strategies, performance metrics, logging techniques, and optimization approaches for MCP servers.

Understanding MCP Server Monitoring Fundamentals

MCP servers require continuous monitoring to ensure they're operating efficiently and handling requests properly. Effective monitoring helps you identify bottlenecks, prevent downtime, and optimize resource allocation for your web scraping workflows.

Key Performance Indicators (KPIs)

When monitoring MCP servers, focus on these critical metrics:

  • Response Time: Time taken to process requests
  • Request Throughput: Number of requests processed per second
  • Error Rate: Percentage of failed requests
  • Resource Utilization: CPU, memory, and network usage
  • Connection Pool Status: Active and idle connections
  • Queue Length: Number of pending requests
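
As a minimal sketch of how these KPIs derive from raw counters (all names and sample numbers here are illustrative, not part of any MCP SDK):

```python
from dataclasses import dataclass

@dataclass
class KpiSnapshot:
    """Illustrative KPI roll-up computed from raw request counters."""
    total_requests: int = 0
    failed_requests: int = 0
    total_duration_s: float = 0.0   # summed request processing time
    window_s: float = 60.0          # measurement window length

    @property
    def error_rate_percent(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.failed_requests / self.total_requests * 100

    @property
    def throughput_rps(self) -> float:
        # Requests per second over the measurement window
        return self.total_requests / self.window_s

    @property
    def avg_response_time_s(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.total_duration_s / self.total_requests

snap = KpiSnapshot(total_requests=1200, failed_requests=24, total_duration_s=180.0)
print(snap.error_rate_percent)   # 2.0
print(snap.throughput_rps)       # 20.0
print(snap.avg_response_time_s)  # 0.15
```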

Implementing Health Check Endpoints

Health check endpoints provide real-time status information about your MCP server's operational state.

Basic Health Check Implementation (Python)

from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from datetime import datetime
import psutil

app = FastAPI()

class HealthMonitor:
    def __init__(self):
        self.start_time = datetime.now()
        self.request_count = 0
        self.error_count = 0

    async def get_health_status(self):
        """Comprehensive health check"""
        uptime = (datetime.now() - self.start_time).total_seconds()
        # Note: interval=1 blocks for one second per call; prefer interval=None
        # (after one warm-up call) if this runs on a hot request path
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()

        return {
            "status": "healthy" if cpu_percent < 80 and memory.percent < 85 else "degraded",
            "uptime_seconds": uptime,
            "cpu_usage_percent": cpu_percent,
            "memory_usage_percent": memory.percent,
            "memory_available_mb": memory.available / (1024 * 1024),
            "total_requests": self.request_count,
            "total_errors": self.error_count,
            "error_rate": (self.error_count / self.request_count * 100) if self.request_count > 0 else 0,
            "timestamp": datetime.now().isoformat()
        }

monitor = HealthMonitor()

@app.get("/health")
async def health_check():
    """Simple health check endpoint"""
    return {"status": "ok", "timestamp": datetime.now().isoformat()}

@app.get("/health/detailed")
async def detailed_health_check():
    """Detailed health status with metrics"""
    return await monitor.get_health_status()

@app.get("/health/ready")
async def readiness_check():
    """Check if server is ready to accept requests"""
    health = await monitor.get_health_status()
    if health["status"] == "healthy":
        return {"ready": True}
    return JSONResponse(content={"ready": False}, status_code=status.HTTP_503_SERVICE_UNAVAILABLE)

Health Check Implementation (Node.js)

const express = require('express');
const os = require('os');
const app = express();

class HealthMonitor {
    constructor() {
        this.startTime = Date.now();
        this.requestCount = 0;
        this.errorCount = 0;
    }

    getHealthStatus() {
        const uptime = (Date.now() - this.startTime) / 1000;
        const totalMemory = os.totalmem();
        const freeMemory = os.freemem();
        const usedMemory = totalMemory - freeMemory;
        const memoryUsagePercent = (usedMemory / totalMemory) * 100;

        // Approximate CPU usage from the 1-minute load average
        // (os.loadavg() reports zeros on Windows)
        const cpuUsage = (os.loadavg()[0] / os.cpus().length) * 100;

        return {
            status: cpuUsage < 80 && memoryUsagePercent < 85 ? 'healthy' : 'degraded',
            uptime_seconds: uptime,
            cpu_usage_percent: cpuUsage.toFixed(2),
            memory_usage_percent: memoryUsagePercent.toFixed(2),
            memory_available_mb: (freeMemory / 1024 / 1024).toFixed(2),
            total_requests: this.requestCount,
            total_errors: this.errorCount,
            error_rate: this.requestCount > 0 ?
                (this.errorCount / this.requestCount * 100).toFixed(2) : 0,
            timestamp: new Date().toISOString()
        };
    }

    incrementRequests() {
        this.requestCount++;
    }

    incrementErrors() {
        this.errorCount++;
    }
}

const monitor = new HealthMonitor();

app.get('/health', (req, res) => {
    res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

app.get('/health/detailed', (req, res) => {
    res.json(monitor.getHealthStatus());
});

app.get('/health/ready', (req, res) => {
    const health = monitor.getHealthStatus();
    if (health.status === 'healthy') {
        res.json({ ready: true });
    } else {
        res.status(503).json({ ready: false });
    }
});

// Request-tracking middleware.
// Note: app.use() only applies to routes registered after it, so in a real
// server register this middleware before the route handlers above.
app.use((req, res, next) => {
    monitor.incrementRequests();
    const originalSend = res.send;
    res.send = function (data) {
        if (res.statusCode >= 400) {
            monitor.incrementErrors();
        }
        return originalSend.call(this, data);
    };
    next();
});

app.listen(3000); // choose the port your MCP server exposes

Performance Metrics Collection

Implementing Prometheus Metrics

Prometheus is an industry-standard monitoring system perfect for tracking MCP server metrics.

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response
import time

# Define metrics
request_counter = Counter('mcp_requests_total', 'Total number of requests', ['method', 'endpoint'])
request_duration = Histogram('mcp_request_duration_seconds', 'Request duration in seconds', ['endpoint'])
active_connections = Gauge('mcp_active_connections', 'Number of active connections')
error_counter = Counter('mcp_errors_total', 'Total number of errors', ['error_type'])
scraping_success = Counter('mcp_scraping_success_total', 'Successful scraping operations')
scraping_failures = Counter('mcp_scraping_failures_total', 'Failed scraping operations', ['reason'])

@app.middleware("http")
async def track_metrics(request, call_next):
    """Middleware to track request metrics"""
    start_time = time.time()
    active_connections.inc()

    try:
        response = await call_next(request)
        duration = time.time() - start_time

        request_counter.labels(method=request.method, endpoint=request.url.path).inc()
        request_duration.labels(endpoint=request.url.path).observe(duration)

        if response.status_code >= 400:
            error_counter.labels(error_type=f"http_{response.status_code}").inc()

        return response
    finally:
        active_connections.dec()

@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the standard exposition format"""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Custom Performance Tracking

class PerformanceTracker {
    constructor() {
        this.metrics = {
            requests: new Map(),
            responseTime: [],
            throughput: {
                current: 0,
                peak: 0,
                average: 0
            }
        };
        this.windowSize = 60000; // 1 minute window
        this.startTracking();
    }

    recordRequest(endpoint, duration, success) {
        const timestamp = Date.now();

        // Record response time
        this.metrics.responseTime.push({
            timestamp,
            endpoint,
            duration,
            success
        });

        // Clean old entries
        this.metrics.responseTime = this.metrics.responseTime.filter(
            entry => timestamp - entry.timestamp < this.windowSize
        );

        // Update throughput: the window is one minute long, so the entry
        // count is already requests per minute
        this.metrics.throughput.current = this.metrics.responseTime.length;
        this.metrics.throughput.peak = Math.max(
            this.metrics.throughput.peak,
            this.metrics.throughput.current
        );
    }

    getMetrics() {
        const recentMetrics = this.metrics.responseTime;
        const successfulRequests = recentMetrics.filter(m => m.success);

        return {
            total_requests: recentMetrics.length,
            successful_requests: successfulRequests.length,
            failed_requests: recentMetrics.length - successfulRequests.length,
            success_rate: recentMetrics.length > 0 ?
                (successfulRequests.length / recentMetrics.length * 100).toFixed(2) : 0,
            avg_response_time: recentMetrics.length > 0 ?
                (recentMetrics.reduce((sum, m) => sum + m.duration, 0) / recentMetrics.length).toFixed(2) : 0,
            p95_response_time: this.calculatePercentile(recentMetrics, 95),
            p99_response_time: this.calculatePercentile(recentMetrics, 99),
            throughput_per_minute: this.metrics.throughput.current.toFixed(2),
            peak_throughput: this.metrics.throughput.peak.toFixed(2)
        };
    }

    calculatePercentile(metrics, percentile) {
        if (metrics.length === 0) return 0;
        const sorted = metrics.map(m => m.duration).sort((a, b) => a - b);
        const index = Math.ceil(sorted.length * percentile / 100) - 1;
        return sorted[index].toFixed(2);
    }

    startTracking() {
        setInterval(() => {
            console.log('Current metrics:', this.getMetrics());
        }, 10000); // Log every 10 seconds
    }
}

const performanceTracker = new PerformanceTracker();

Logging and Error Tracking

Comprehensive logging helps diagnose issues and track server behavior over time.

Structured Logging Implementation

import logging
import json
from datetime import datetime
from typing import Any, Dict

class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.INFO)

        # Plain stream handler; each message below is pre-serialized as JSON
        handler = logging.StreamHandler()
        self.logger.addHandler(handler)

    def log(self, level: str, message: str, **kwargs: Any):
        """Log structured message"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "service": self.service_name,
            "level": level,
            "message": message,
            **kwargs
        }

        log_line = json.dumps(log_entry)

        if level == "ERROR":
            self.logger.error(log_line)
        elif level == "WARNING":
            self.logger.warning(log_line)
        elif level == "INFO":
            self.logger.info(log_line)
        else:
            self.logger.debug(log_line)

    def log_request(self, method: str, endpoint: str, duration: float, status_code: int):
        """Log HTTP request"""
        self.log(
            "INFO",
            f"{method} {endpoint}",
            method=method,
            endpoint=endpoint,
            duration_ms=duration * 1000,
            status_code=status_code,
            type="http_request"
        )

    def log_scraping_operation(self, url: str, success: bool, duration: float, error: str = None):
        """Log scraping operation"""
        self.log(
            "INFO" if success else "ERROR",
            f"Scraping {'succeeded' if success else 'failed'}: {url}",
            url=url,
            success=success,
            duration_ms=duration * 1000,
            error=error,
            type="scraping_operation"
        )

# Usage
logger = StructuredLogger("mcp-server")

@app.middleware("http")
async def log_requests(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    logger.log_request(
        request.method,
        str(request.url.path),
        duration,
        response.status_code
    )

    return response

Monitoring with External Services

Integration with Cloud Monitoring

# Install monitoring agents
# For AWS CloudWatch
pip install boto3 watchtower

# For Google Cloud Monitoring
pip install google-cloud-monitoring

# For Datadog
pip install datadog

import boto3
from datetime import datetime

class CloudWatchMonitor:
    def __init__(self, namespace='MCPServer'):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace

    def send_metric(self, metric_name: str, value: float, unit: str = 'None'):
        """Send custom metric to CloudWatch"""
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )

    def send_scraping_metrics(self, success_count: int, failure_count: int, avg_duration: float):
        """Send scraping-specific metrics"""
        self.send_metric('ScrapingSuccessCount', success_count, 'Count')
        self.send_metric('ScrapingFailureCount', failure_count, 'Count')
        self.send_metric('AverageScrapingDuration', avg_duration, 'Milliseconds')

Performance Optimization Strategies

Connection Pool Monitoring

When working with browser automation for web scraping, monitoring connection pools is essential. Similar to how you monitor network requests in Puppeteer, you need to track active browser instances and their resource consumption.

import asyncio

class BrowserPoolMonitor:
    def __init__(self):
        self.active_browsers = 0
        self.max_browsers = 10
        self.browser_queue = []

    def get_pool_status(self):
        """Get current pool status"""
        return {
            "active_browsers": self.active_browsers,
            "max_browsers": self.max_browsers,
            "queue_length": len(self.browser_queue),
            "utilization_percent": (self.active_browsers / self.max_browsers) * 100,
            "available_slots": self.max_browsers - self.active_browsers
        }

    async def acquire_browser(self):
        """Acquire a browser slot, waiting while the pool is at capacity"""
        while self.active_browsers >= self.max_browsers:
            logger.log("WARNING", "Browser pool at capacity", **self.get_pool_status())
            await asyncio.sleep(1)  # back off until a slot frees up

        self.active_browsers += 1
        logger.log("INFO", "Browser acquired", **self.get_pool_status())

    async def release_browser(self):
        """Release browser back to pool"""
        self.active_browsers -= 1
        logger.log("INFO", "Browser released", **self.get_pool_status())

Alert Configuration

Set up alerts for critical thresholds:

class AlertManager:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
        self.thresholds = {
            'cpu_percent': 80,
            'memory_percent': 85,
            'error_rate': 5,
            'response_time_p95': 5000  # milliseconds
        }

    async def check_and_alert(self, metrics: Dict[str, float]):
        """Check metrics against thresholds and send alerts"""
        alerts = []

        for metric, value in metrics.items():
            if metric in self.thresholds and value > self.thresholds[metric]:
                alerts.append({
                    'metric': metric,
                    'value': value,
                    'threshold': self.thresholds[metric],
                    'severity': 'critical' if value > self.thresholds[metric] * 1.2 else 'warning'
                })

        if alerts:
            await self.send_alert(alerts)

    async def send_alert(self, alerts: list):
        """Send alert notification"""
        # Send to Slack, PagerDuty, email, etc.
        payload = {
            'text': f'MCP Server Alert: {len(alerts)} threshold(s) exceeded',
            'alerts': alerts,
            'timestamp': datetime.utcnow().isoformat()
        }
        # Implementation for your notification system
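
As one hedged sketch of that delivery step (the webhook URL and payload shape are assumptions for a generic JSON webhook, not any specific provider's API), the payload can be assembled and POSTed with the standard library:

```python
import json
import urllib.request
from datetime import datetime, timezone

def build_alert_payload(alerts: list) -> dict:
    """Assemble the notification body for a generic JSON webhook."""
    return {
        "text": f"MCP Server Alert: {len(alerts)} threshold(s) exceeded",
        "alerts": alerts,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

def post_alert(webhook_url: str, alerts: list) -> int:
    """POST the payload to the webhook; returns the HTTP status code."""
    body = json.dumps(build_alert_payload(alerts)).encode("utf-8")
    req = urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status
```

Slack-style incoming webhooks and most paging systems accept a body of roughly this shape; adapt the fields to whatever your notification system expects.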

Dashboard and Visualization

Creating a Monitoring Dashboard

// Express endpoint for dashboard data
app.get('/dashboard/metrics', async (req, res) => {
    const metrics = {
        health: monitor.getHealthStatus(), // synchronous in the Node monitor above
        performance: performanceTracker.getMetrics(),
        pool: browserPool.getPoolStatus(),
        timeline: {
            labels: generateTimeLabels(60), // Last 60 minutes
            requests: getRequestTimeline(60),
            errors: getErrorTimeline(60),
            responseTime: getResponseTimeTimeline(60)
        }
    };

    res.json(metrics);
});

function generateTimeLabels(minutes) {
    const labels = [];
    const now = new Date();
    for (let i = minutes; i >= 0; i--) {
        const time = new Date(now - i * 60000);
        labels.push(time.toLocaleTimeString());
    }
    return labels;
}
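
The timeline helpers referenced above (`getRequestTimeline` and friends) are left undefined in the snippet. The underlying bucketing logic is language-agnostic; a hedged Python sketch of one way to back them is to group recorded event timestamps into per-minute buckets aligned with the generated labels:

```python
from collections import Counter

def bucket_per_minute(timestamps_ms: list, minutes: int, now_ms: int) -> list:
    """Count events per minute for the last `minutes` minutes.

    Returns a list of length minutes + 1, oldest bucket first, matching
    the label count produced by a helper like generateTimeLabels(minutes).
    """
    counts = Counter()
    for ts in timestamps_ms:
        age_min = (now_ms - ts) // 60000  # whole minutes ago
        if 0 <= age_min <= minutes:
            counts[age_min] += 1
    # index 0 = oldest bucket, last index = the current minute
    return [counts.get(minutes - i, 0) for i in range(minutes + 1)]

now = 10 * 60000
events = [now - 30000, now - 90000, now - 95000]  # one this minute, two a minute ago
print(bucket_per_minute(events, 5, now))  # [0, 0, 0, 0, 2, 1]
```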

Best Practices for MCP Server Monitoring

  1. Implement Multiple Health Check Levels: Basic, detailed, and readiness checks serve different purposes
  2. Use Structured Logging: JSON-formatted logs enable better parsing and analysis
  3. Track Business Metrics: Monitor scraping success rates, not just technical metrics
  4. Set Appropriate Alerts: Avoid alert fatigue by setting meaningful thresholds
  5. Monitor Resource Trends: Track patterns over time to predict capacity needs
  6. Implement Graceful Degradation: When handling errors in Puppeteer or other browser automation tools, ensure your monitoring captures degraded states
  7. Regular Performance Reviews: Analyze metrics weekly to identify optimization opportunities
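
Practice 5 above (monitoring resource trends) can start as simply as a rolling-average comparison. This sketch flags a sustained upward trend in a metric such as memory usage; the window size and ratio threshold are illustrative defaults, not recommendations:

```python
def trending_up(samples, window: int = 5, ratio: float = 1.2) -> bool:
    """True if the mean of the most recent `window` samples exceeds the
    mean of the preceding `window` samples by at least `ratio`."""
    if len(samples) < 2 * window:
        return False  # not enough history yet
    recent = samples[-window:]
    previous = samples[-2 * window:-window]
    baseline = sum(previous) / window
    if baseline == 0:
        return False
    return (sum(recent) / window) / baseline >= ratio

flat = [50.0] * 10
rising = [50, 50, 50, 50, 50, 70, 72, 74, 76, 78]
print(trending_up(flat))    # False
print(trending_up(rising))  # True
```

Feeding this with periodic samples from the health endpoint gives an early capacity-planning signal before a hard threshold alert fires.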

Monitoring Checklist

  • [ ] Health check endpoints implemented
  • [ ] Performance metrics collection active
  • [ ] Structured logging configured
  • [ ] Error tracking and alerting set up
  • [ ] Resource utilization monitoring enabled
  • [ ] Dashboard for visualization created
  • [ ] Alert thresholds defined and tested
  • [ ] Log aggregation system configured
  • [ ] Backup monitoring systems in place
  • [ ] Documentation for monitoring procedures updated

Conclusion

Effective monitoring of MCP server health and performance is essential for maintaining reliable web scraping operations. By implementing comprehensive health checks, collecting detailed metrics, using structured logging, and setting up appropriate alerts, you can ensure your MCP servers operate efficiently and handle issues proactively. Regular review of monitoring data helps optimize performance and plan for scaling needs.

Remember that monitoring is not a one-time setup but an ongoing process that should evolve with your infrastructure and requirements. Start with basic health checks and gradually add more sophisticated monitoring as your needs grow.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
