
How Do I Handle API Rate Limits When Using MCP Servers?

API rate limiting is a critical consideration when building Model Context Protocol (MCP) servers for web scraping and API interactions. Without proper rate limiting mechanisms, your MCP server can overwhelm target APIs, trigger IP blocks, consume credits too quickly, or violate terms of service. This guide covers token bucket and sliding window limiters, per-domain limits, exponential backoff with retries, adaptive limiting driven by response headers, distributed limiting with Redis, and monitoring.

Understanding Rate Limits in MCP Contexts

Rate limits can occur at multiple levels when working with MCP servers:

  1. Target API Limits: The external APIs or websites you're scraping have their own rate limits
  2. MCP Server Limits: Your own server may need to throttle requests to manage resources
  3. Client Limits: The AI model or client application may have usage quotas
  4. Infrastructure Limits: Network bandwidth, database connections, or proxy limits

A robust MCP server implementation must handle all these layers to ensure reliable, sustainable operation.

Basic Rate Limiting Implementation

Token Bucket Algorithm (Python)

The token bucket algorithm allows short bursts while enforcing an average rate. The bucket holds up to capacity tokens, refills at refill_rate tokens per second, and each request spends one token:

import asyncio
import time

import httpx
from mcp.server import Server
from mcp.types import Tool, TextContent

class TokenBucket:
    """Token bucket rate limiter for controlling request rates."""

    def __init__(self, capacity: int, refill_rate: float):
        """
        Initialize token bucket.

        Args:
            capacity: Maximum number of tokens (requests)
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        """
        Attempt to acquire tokens, waiting if necessary.

        Args:
            tokens: Number of tokens to acquire

        Returns:
            True when tokens are acquired
        """
        async with self.lock:
            while self.tokens < tokens:
                await self._refill()
                if self.tokens < tokens:
                    # Calculate wait time
                    wait_time = (tokens - self.tokens) / self.refill_rate
                    await asyncio.sleep(wait_time)

            self.tokens -= tokens
            return True

    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate

        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

# Initialize MCP server with rate limiter
app = Server("rate-limited-scraper")
rate_limiter = TokenBucket(capacity=10, refill_rate=2.0)  # 10 burst, 2/sec sustained

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool with rate limiting."""
    if name == "fetch_url":
        # Acquire token before making request
        await rate_limiter.acquire()

        url = arguments["url"]
        # Perform the actual HTTP request
        async with httpx.AsyncClient() as client:
            response = await client.get(url)

        return [TextContent(
            type="text",
            text=response.text
        )]
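
To see the burst and refill behaviour without waiting on real time, the same refill arithmetic can be driven by an explicit clock. The sketch below is illustrative only: SimTokenBucket, try_acquire, and simulate are hypothetical names, and the bucket is a simplified synchronous variant that refuses a request instead of sleeping.

```python
class SimTokenBucket:
    """Synchronous token bucket driven by an explicit clock (illustration only)."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = 0.0

    def try_acquire(self, now: float) -> bool:
        # Refill from simulated elapsed time, capped at capacity
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


def simulate(requests_at):
    """Replay requests at the given timestamps against a 10-burst, 2/sec bucket."""
    bucket = SimTokenBucket(capacity=10, refill_rate=2.0)
    return [bucket.try_acquire(t) for t in requests_at]


# 12 requests at t=0: the first 10 use the burst allowance, the last 2 are refused
burst = simulate([0.0] * 12)

# One second later 2 tokens have been refilled, so 2 of 3 new requests pass
later = simulate([0.0] * 12 + [1.0, 1.0, 1.0])
```

With capacity=10 and refill_rate=2.0, a client that drains the bucket can then sustain about two requests per second, which matches the configuration used in the server above.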

Sliding Window Rate Limiter (JavaScript)

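For a hard cap on the number of requests in any rolling window, implement a sliding window limiter that logs the timestamp of each request and counts those still inside the window: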

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import axios from "axios";

class SlidingWindowRateLimiter {
  constructor(maxRequests, windowMs) {
    this.maxRequests = maxRequests;
    this.windowMs = windowMs;
    this.requests = [];
  }

  async acquire() {
    const now = Date.now();

    // Remove requests outside the current window
    this.requests = this.requests.filter(
      timestamp => now - timestamp < this.windowMs
    );

    // Check if we're at the limit
    if (this.requests.length >= this.maxRequests) {
      const oldestRequest = this.requests[0];
      const waitTime = this.windowMs - (now - oldestRequest);

      if (waitTime > 0) {
        await new Promise(resolve => setTimeout(resolve, waitTime));
        return this.acquire(); // Retry after waiting
      }
    }

    // Add current request timestamp
    this.requests.push(now);
    return true;
  }

  getRemaining() {
    const now = Date.now();
    this.requests = this.requests.filter(
      timestamp => now - timestamp < this.windowMs
    );
    return this.maxRequests - this.requests.length;
  }
}

const server = new Server(
  { name: "rate-limited-scraper", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// Create rate limiter: 100 requests per minute
const rateLimiter = new SlidingWindowRateLimiter(100, 60000);

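// setRequestHandler takes a request schema, not a method string.
// ES module imports are hoisted, so this works here; in a real file, group it with the imports at the top.
import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";

server.setRequestHandler(CallToolRequestSchema, async (request) => {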
  const { name, arguments: args } = request.params;

  if (name === "fetch_data") {
    // Wait for rate limiter
    await rateLimiter.acquire();

    try {
      const response = await axios.get(args.url, {
        timeout: 10000,
        headers: {
          'User-Agent': 'MCP-Scraper/1.0'
        }
      });

      return {
        content: [{
          type: "text",
          text: JSON.stringify({
            status: response.status,
            data: response.data,
            remainingRequests: rateLimiter.getRemaining()
          })
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `Error: ${error.message}`
        }],
        isError: true
      };
    }
  }
});
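
The wait-time calculation in the limiter above can be checked in isolation. The following is a minimal Python sketch of the same timestamp-log idea with the clock passed in explicitly; SlidingWindowLog, wait_time, and record are hypothetical names chosen for this example, not part of any SDK.

```python
from collections import deque


class SlidingWindowLog:
    """Sliding window limiter that keeps one timestamp per request."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.timestamps = deque()

    def wait_time(self, now: float) -> float:
        """Seconds until a request at `now` would be allowed (0.0 if allowed now)."""
        # Evict timestamps that have left the window
        while self.timestamps and now - self.timestamps[0] >= self.window_seconds:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_requests:
            return 0.0
        # The window frees a slot when the oldest request expires
        return float(self.window_seconds - (now - self.timestamps[0]))

    def record(self, now: float) -> None:
        self.timestamps.append(now)


log = SlidingWindowLog(max_requests=3, window_seconds=60)
for t in (0, 10, 20):
    log.record(t)

blocked_for = log.wait_time(30)   # window is full; oldest request was at t=0
after_expiry = log.wait_time(60)  # the t=0 entry has left the window
```

The trade-off of this design is memory: it stores one timestamp per request in the window, so a limit of 100 requests per minute keeps at most 100 entries.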

Advanced Rate Limiting Strategies

Per-Domain Rate Limiting

Different APIs have different rate limits. Implement per-domain rate limiting to optimize throughput:

import asyncio
import json
from collections import defaultdict
from urllib.parse import urlparse

import httpx

class MultiDomainRateLimiter:
    """Manage separate rate limits for different domains."""

    def __init__(self):
        self.limiters = defaultdict(lambda: TokenBucket(
            capacity=5,
            refill_rate=1.0
        ))
        self.domain_configs = {
            'api.github.com': {'capacity': 60, 'refill_rate': 1.0},  # 60/min
            'api.twitter.com': {'capacity': 15, 'refill_rate': 0.25},  # 15/min
            'example.com': {'capacity': 10, 'refill_rate': 0.5},  # 30/min
        }

    def get_limiter(self, url: str) -> TokenBucket:
        """Get or create rate limiter for domain."""
        domain = urlparse(url).netloc

        # Create configured limiter if not exists
        if domain not in self.limiters and domain in self.domain_configs:
            config = self.domain_configs[domain]
            self.limiters[domain] = TokenBucket(
                capacity=config['capacity'],
                refill_rate=config['refill_rate']
            )

        return self.limiters[domain]

    async def acquire(self, url: str, tokens: int = 1):
        """Acquire tokens for specific domain."""
        limiter = self.get_limiter(url)
        await limiter.acquire(tokens)

# Usage in MCP server
domain_limiter = MultiDomainRateLimiter()

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "multi_domain_fetch":
        urls = arguments["urls"]
        results = []

        for url in urls:
            await domain_limiter.acquire(url)

            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                results.append({
                    'url': url,
                    'status': response.status_code,
                    'length': len(response.text)
                })

        return [TextContent(
            type="text",
            text=json.dumps(results, indent=2)
        )]
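
One detail worth checking when keying limiters by domain: urlparse(url).netloc keeps the port, any credentials, and the original letter case, so the same host can end up with several buckets or miss its configured entry. A small sketch of a normalizing helper follows; domain_key is a hypothetical name, and whether ports should share a bucket is an assumption you may want to revisit for your targets.

```python
from urllib.parse import urlparse


def domain_key(url: str) -> str:
    """Return the lowercase hostname, without port or credentials."""
    host = urlparse(url).hostname  # already lowercased by urllib
    if host is None:
        raise ValueError(f"URL has no hostname: {url!r}")
    return host


with_port = "https://API.GitHub.com:443/users/octocat"
raw_netloc = urlparse(with_port).netloc   # keeps case and port
normalized = domain_key(with_port)        # suitable as a limiter key
```

Using the normalized key in get_limiter would let 'api.github.com' match regardless of how the caller wrote the URL.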

Exponential Backoff with Retry

Combine rate limiting with exponential backoff for handling rate limit errors, similar to handling errors in Puppeteer:

class ExponentialBackoff {
  constructor(baseDelay = 1000, maxDelay = 60000, maxRetries = 5) {
    this.baseDelay = baseDelay;
    this.maxDelay = maxDelay;
    this.maxRetries = maxRetries;
  }

  async execute(fn, context = 'operation') {
    let lastError;

    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        return await fn();
      } catch (error) {
        lastError = error;

        // Check if it's a rate limit error
        if (this.isRateLimitError(error)) {
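          // Prefer the server's Retry-After hint; otherwise back off exponentially
          const hinted = this.getRetryAfter(error);
          const delay = Math.min(
            hinted > 0 ? hinted : this.baseDelay * Math.pow(2, attempt),
            this.maxDelay
          );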

          console.error(
            `Rate limit hit for ${context}. ` +
            `Retry ${attempt + 1}/${this.maxRetries} after ${delay}ms`
          );

          // Add jitter to prevent thundering herd
          const jitter = Math.random() * 0.3 * delay;
          await new Promise(resolve =>
            setTimeout(resolve, delay + jitter)
          );

          continue;
        }

        // Non-rate-limit error, throw immediately
        throw error;
      }
    }

    throw new Error(
      `Max retries (${this.maxRetries}) exceeded for ${context}: ${lastError.message}`
    );
  }

  isRateLimitError(error) {
    // Check for common rate limit indicators
    if (error.response) {
      const status = error.response.status;
      const retryAfter = error.response.headers['retry-after'];

      return status === 429 || // Too Many Requests
             status === 503 ||  // Service Unavailable
             retryAfter !== undefined;
    }

    return false;
  }

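  getRetryAfter(error) {
    const retryAfter = error.response?.headers?.['retry-after'];
    if (retryAfter === undefined) return null;

    // Retry-After is either a number of seconds or an HTTP date
    const seconds = Number(retryAfter);
    if (Number.isFinite(seconds)) return Math.max(0, seconds * 1000);

    const dateMs = Date.parse(retryAfter);
    return Number.isNaN(dateMs) ? null : Math.max(0, dateMs - Date.now());
  }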
}

// Usage with rate limiter
const backoff = new ExponentialBackoff();

async function fetchWithRateLimit(url, rateLimiter) {
  return await backoff.execute(async () => {
    await rateLimiter.acquire();

    const response = await axios.get(url, {
      timeout: 10000,
      validateStatus: (status) => status < 500 // Don't throw on 4xx
    });

    if (response.status === 429) {
      const error = new Error('Rate limit exceeded');
      error.response = response;
      throw error;
    }

    return response.data;
  }, `fetch ${url}`);
}
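
It can help to write out the delay schedule that the defaults above produce. The sketch below recomputes the same formula in Python (base delay doubled per attempt, capped at the maximum, plus up to 30% jitter); backoff_delay_ms and its rng parameter are hypothetical names introduced so the jitter can be pinned for inspection.

```python
import random


def backoff_delay_ms(attempt, base_ms=1000, max_ms=60000,
                     jitter_fraction=0.3, rng=random.random):
    """Delay before the next retry: capped exponential plus proportional jitter."""
    capped = min(base_ms * 2 ** attempt, max_ms)
    return capped + rng() * jitter_fraction * capped


# Jitter pinned to zero to expose the underlying schedule
schedule = [backoff_delay_ms(a, rng=lambda: 0.0) for a in range(8)]

# Upper bound of the first delay when jitter is at its maximum
worst_first = backoff_delay_ms(0, rng=lambda: 1.0)
```

With the default of five retries, the un-jittered waits are 1, 2, 4, 8, and 16 seconds, so a request that keeps hitting the limit gives up after roughly 31 seconds plus jitter. The 60-second cap only comes into play from the seventh attempt onward.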

Respecting Server-Provided Rate Limits

Many APIs return rate limit information in response headers. Parse and respect these:

import asyncio
from datetime import datetime, timedelta
from urllib.parse import urlparse

import httpx

class AdaptiveRateLimiter:
    """Rate limiter that adapts based on server responses."""

    def __init__(self):
        self.limits = {}
        self.reset_times = {}

    async def make_request(self, url: str, **kwargs) -> httpx.Response:
        """Make HTTP request with adaptive rate limiting."""
        domain = urlparse(url).netloc

        # Wait if we've hit the limit
        if domain in self.reset_times:
            now = datetime.now()
            if now < self.reset_times[domain]:
                wait_seconds = (self.reset_times[domain] - now).total_seconds()
                print(f"Rate limited. Waiting {wait_seconds:.1f}s for {domain}")
                await asyncio.sleep(wait_seconds)
                del self.reset_times[domain]

        async with httpx.AsyncClient() as client:
            response = await client.get(url, **kwargs)

            # Parse rate limit headers
            self._update_limits(domain, response.headers)

            # Handle 429 specifically
            if response.status_code == 429:
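                retry_after = response.headers.get('retry-after', '60')
                # Retry-After may also be an HTTP date; fall back to 60s if it is not an integer
                wait_seconds = int(retry_after) if retry_after.isdigit() else 60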
                self.reset_times[domain] = datetime.now() + timedelta(seconds=wait_seconds)

                # Recursive retry after waiting
                await asyncio.sleep(wait_seconds)
                return await self.make_request(url, **kwargs)

            return response

    def _update_limits(self, domain: str, headers: dict):
        """Update rate limit info from response headers."""
        # GitHub-style headers
        if 'x-ratelimit-remaining' in headers:
            remaining = int(headers['x-ratelimit-remaining'])
            reset_timestamp = int(headers.get('x-ratelimit-reset', 0))

            if remaining == 0 and reset_timestamp:
                self.reset_times[domain] = datetime.fromtimestamp(reset_timestamp)

        # Twitter-style headers
        elif 'x-rate-limit-remaining' in headers:
            remaining = int(headers['x-rate-limit-remaining'])
            reset_timestamp = int(headers.get('x-rate-limit-reset', 0))

            if remaining == 0 and reset_timestamp:
                self.reset_times[domain] = datetime.fromtimestamp(reset_timestamp)

# Usage in MCP server
adaptive_limiter = AdaptiveRateLimiter()

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "adaptive_fetch":
        url = arguments["url"]

        try:
            response = await adaptive_limiter.make_request(
                url,
                headers={'User-Agent': 'MCP-Scraper/1.0'}
            )

            return [TextContent(
                type="text",
                text=f"Status: {response.status_code}\n\n{response.text}"
            )]
        except Exception as e:
            return [TextContent(
                type="text",
                text=f"Error: {str(e)}"
            )]
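
The Retry-After header can carry either a number of seconds or an HTTP date, so a parser that handles both forms is useful alongside the limiter above. This is a minimal sketch using only the standard library; retry_after_seconds is a hypothetical helper name, and returning None for unparseable values is an assumption about how the caller wants to fall back.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: str, now: Optional[datetime] = None) -> Optional[float]:
    """Convert a Retry-After header value to seconds, or None if unparseable."""
    value = value.strip()
    if value.isdigit():
        return float(value)  # delay-seconds form
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        # Treat dates without an offset as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the caller may retry immediately
    return max(0.0, (when - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
from_seconds = retry_after_seconds("120")
from_date = retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
from_garbage = retry_after_seconds("soon")
```

A caller could use the returned value when it is not None and otherwise fall back to a fixed default such as the 60 seconds used above.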

Distributed Rate Limiting with Redis

For production MCP servers running across multiple instances, use Redis for shared rate limiting:

import json
import time
from urllib.parse import urlparse

import httpx
import redis.asyncio as redis

class RedisRateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def acquire(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, dict]:
        """
        Attempt to acquire rate limit token.

        Returns:
            (success, info) where info contains limit details
        """
        current_time = int(time.time())
        window_start = current_time - window_seconds

        pipe = self.redis.pipeline()

        # Remove old entries outside the window
        pipe.zremrangebyscore(key, 0, window_start)

        # Count requests in current window
        pipe.zcard(key)

        # Add current request; a nanosecond member keeps same-second requests distinct
        member = str(time.time_ns())
        pipe.zadd(key, {member: current_time})

        # Set expiry on the key
        pipe.expire(key, window_seconds)

        results = await pipe.execute()
        request_count = results[1]

        if request_count < max_requests:
            return True, {
                'allowed': True,
                'remaining': max_requests - request_count - 1,
                'limit': max_requests,
                'reset': current_time + window_seconds
            }
        else:
            # Denied: drop the entry we just added so rejected calls don't consume quota
            await self.redis.zrem(key, member)

            # Get oldest request time to calculate reset
            oldest = await self.redis.zrange(key, 0, 0, withscores=True)
            reset_time = int(oldest[0][1]) + window_seconds if oldest else current_time

            return False, {
                'allowed': False,
                'remaining': 0,
                'limit': max_requests,
                'reset': reset_time,
                'retry_after': reset_time - current_time
            }

    async def check_limit(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> dict:
        """Check rate limit status without consuming a token."""
        current_time = int(time.time())
        window_start = current_time - window_seconds

        await self.redis.zremrangebyscore(key, 0, window_start)
        count = await self.redis.zcard(key)

        return {
            'remaining': max_requests - count,
            'limit': max_requests,
            'used': count
        }

# Usage
redis_limiter = RedisRateLimiter('redis://localhost:6379')

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "distributed_fetch":
        url = arguments["url"]
        domain = urlparse(url).netloc

        # Use domain as rate limit key
        key = f"ratelimit:{domain}"
        allowed, info = await redis_limiter.acquire(
            key,
            max_requests=100,
            window_seconds=60
        )

        if not allowed:
            return [TextContent(
                type="text",
                text=f"Rate limit exceeded. Retry after {info['retry_after']}s"
            )]

        # Make the request
        async with httpx.AsyncClient() as client:
            response = await client.get(url)

        return [TextContent(
            type="text",
            text=f"Success! Remaining: {info['remaining']}/{info['limit']}\n\n{response.text}"
        )]
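
The allow/deny arithmetic of the sorted-set approach can be reasoned about without a Redis instance. The sketch below mirrors it on a plain list of request times; window_decision is a hypothetical function written for illustration, and it is not a substitute for the shared state that Redis provides across instances.

```python
def window_decision(timestamps, now, max_requests, window_seconds):
    """Mirror of the sorted-set logic on an in-memory list of request times."""
    window_start = now - window_seconds
    # Same effect as removing scores in [0, window_start]: keep only newer entries
    live = sorted(t for t in timestamps if t > window_start)

    if len(live) < max_requests:
        return {
            "allowed": True,
            "remaining": max_requests - len(live) - 1,
            "reset": now + window_seconds,
        }

    # The oldest live request determines when a slot frees up
    reset = live[0] + window_seconds if live else now
    return {
        "allowed": False,
        "remaining": 0,
        "reset": reset,
        "retry_after": reset - now,
    }


history = [100, 101, 102]
denied = window_decision(history, now=130, max_requests=3, window_seconds=60)
allowed = window_decision(history, now=160, max_requests=3, window_seconds=60)
```

In the denied case the caller learns that the earliest slot opens 30 seconds later, which is the value the tool handler above reports as retry_after.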

Combining Strategies for Robust Rate Limiting

A production-ready MCP server should combine multiple strategies:

class ComprehensiveRateLimiter {
  constructor(config) {
    this.config = config;
    this.slidingWindow = new SlidingWindowRateLimiter(
      config.maxRequests,
      config.windowMs
    );
    this.backoff = new ExponentialBackoff(
      config.baseDelay,
      config.maxDelay,
      config.maxRetries
    );
    this.domainLimits = new Map();
  }

  async executeRequest(url, requestFn) {
    const domain = new URL(url).hostname;

    // Apply global rate limit
    await this.slidingWindow.acquire();

    // Apply per-domain limit if configured
    if (this.config.perDomainLimits?.[domain]) {
      const domainLimit = this.getDomainLimiter(domain);
      await domainLimit.acquire();
    }

    // Execute with exponential backoff
    return await this.backoff.execute(requestFn, `request to ${domain}`);
  }

  getDomainLimiter(domain) {
    if (!this.domainLimits.has(domain)) {
      const config = this.config.perDomainLimits[domain];
      this.domainLimits.set(
        domain,
        new SlidingWindowRateLimiter(config.maxRequests, config.windowMs)
      );
    }
    return this.domainLimits.get(domain);
  }

  async getStatus() {
    return {
      global: {
        remaining: this.slidingWindow.getRemaining(),
        limit: this.config.maxRequests
      },
      domains: Array.from(this.domainLimits.entries()).map(([domain, limiter]) => ({
        domain,
        remaining: limiter.getRemaining(),
        limit: this.config.perDomainLimits[domain].maxRequests
      }))
    };
  }
}

// Configuration
const limiter = new ComprehensiveRateLimiter({
  maxRequests: 1000,
  windowMs: 60000,
  baseDelay: 1000,
  maxDelay: 60000,
  maxRetries: 5,
  perDomainLimits: {
    'api.github.com': { maxRequests: 60, windowMs: 60000 },
    'api.example.com': { maxRequests: 100, windowMs: 60000 }
  }
});

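// MCP tool implementation
// setRequestHandler expects a request schema rather than a method string;
// CallToolRequestSchema comes from "@modelcontextprotocol/sdk/types.js"
server.setRequestHandler(CallToolRequestSchema, async (request) => {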
  const { name, arguments: args } = request.params;

  if (name === "smart_fetch") {
    try {
      const result = await limiter.executeRequest(
        args.url,
        async () => {
          const response = await axios.get(args.url, {
            timeout: 10000,
            headers: { 'User-Agent': 'MCP-Scraper/1.0' }
          });
          return response.data;
        }
      );

      const status = await limiter.getStatus();

      return {
        content: [{
          type: "text",
          text: JSON.stringify({
            data: result,
            rateLimitStatus: status
          }, null, 2)
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `Error: ${error.message}`
        }],
        isError: true
      };
    }
  }
});

Monitoring and Observability

Implement monitoring to track rate limit usage and adjust strategies:

import json
import logging
import time
from dataclasses import dataclass

@dataclass
class RateLimitMetrics:
    """Track rate limiting metrics."""
    total_requests: int = 0
    throttled_requests: int = 0
    retry_count: int = 0
    total_wait_time: float = 0.0
    errors: int = 0

class MonitoredRateLimiter:
    """Rate limiter with metrics tracking."""

    def __init__(self, limiter: TokenBucket):
        self.limiter = limiter
        self.metrics = RateLimitMetrics()
        self.logger = logging.getLogger(__name__)

    async def acquire(self, tokens: int = 1) -> bool:
        """Acquire with metrics tracking."""
        start_time = time.time()
        self.metrics.total_requests += 1

        # Check if we need to wait
        if self.limiter.tokens < tokens:
            self.metrics.throttled_requests += 1
            self.logger.info(
                f"Rate limit throttling request. "
                f"Available: {self.limiter.tokens}, Needed: {tokens}"
            )

        result = await self.limiter.acquire(tokens)

        wait_time = time.time() - start_time
        if wait_time > 0.01:  # More than 10ms wait
            self.metrics.total_wait_time += wait_time
            self.logger.debug(f"Waited {wait_time:.3f}s for rate limit")

        return result

    def get_metrics(self) -> dict:
        """Get current metrics."""
        throttle_rate = (
            self.metrics.throttled_requests / self.metrics.total_requests
            if self.metrics.total_requests > 0
            else 0
        )

        return {
            'total_requests': self.metrics.total_requests,
            'throttled_requests': self.metrics.throttled_requests,
            'throttle_rate': f"{throttle_rate:.2%}",
            'total_wait_time': f"{self.metrics.total_wait_time:.2f}s",
            'avg_wait_time': (
                f"{self.metrics.total_wait_time / self.metrics.throttled_requests:.3f}s"
                if self.metrics.throttled_requests > 0
                else "0s"
            ),
            'errors': self.metrics.errors
        }

    def reset_metrics(self):
        """Reset metrics counters."""
        self.metrics = RateLimitMetrics()

# Add metrics endpoint to MCP server
@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="get_rate_limit_metrics",
            description="Get rate limiting statistics and metrics",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]

monitored_limiter = MonitoredRateLimiter(TokenBucket(10, 2.0))

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "get_rate_limit_metrics":
        metrics = monitored_limiter.get_metrics()
        return [TextContent(
            type="text",
            text=json.dumps(metrics, indent=2)
        )]

Best Practices

  1. Start Conservative: Begin with lower rate limits and increase based on monitoring
  2. Implement Multiple Layers: Combine global, per-domain, and adaptive limiting
  3. Respect Server Signals: Always honor Retry-After and rate limit headers
  4. Add Jitter: Randomize retry delays to prevent thundering herd problems
  5. Monitor Metrics: Track throttling rates and adjust limits accordingly
  6. Fail Gracefully: Return informative errors when rate limits are exceeded
  7. Cache When Possible: Reduce API calls by caching responses appropriately
  8. Document Limits: Clearly communicate rate limits to MCP clients
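
For the caching practice in the list above, a small time-to-live cache in front of the fetch call is often enough to avoid spending quota on repeated URLs. The following is a minimal in-memory sketch; TTLCache and its injectable clock parameter are hypothetical, and a real server may prefer a size-bounded or shared cache.

```python
import time
from typing import Callable, Optional


class TTLCache:
    """Cache response bodies per URL for a fixed number of seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries = {}

    def get(self, url: str) -> Optional[str]:
        entry = self._entries.get(url)
        if entry is None:
            return None
        stored_at, body = entry
        if self.clock() - stored_at >= self.ttl_seconds:
            # Expired: drop the entry so the caller fetches again
            del self._entries[url]
            return None
        return body

    def put(self, url: str, body: str) -> None:
        self._entries[url] = (self.clock(), body)


# Drive the cache with a fake clock to show expiry
fake_now = [0.0]
cache = TTLCache(ttl_seconds=30, clock=lambda: fake_now[0])
cache.put("https://example.com", "<html>...</html>")

fresh = cache.get("https://example.com")
fake_now[0] = 30.0
expired = cache.get("https://example.com")
```

In a tool handler, checking the cache before calling the rate limiter means a cache hit costs no token at all.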

Similar to handling timeouts in Puppeteer, proper timeout configuration complements rate limiting by preventing hanging requests that can exhaust your rate limit quota without completing.

Conclusion

Effective rate limiting is essential for building reliable MCP servers that interact with external APIs and websites. By implementing token bucket algorithms, sliding windows, exponential backoff, and adaptive strategies, you can create robust systems that respect API limits while maximizing throughput. Combined with proper monitoring and observability, these techniques ensure your MCP server operates sustainably and avoids service disruptions from rate limit violations.

Remember to test your rate limiting implementation thoroughly, monitor real-world usage patterns, and adjust your strategies based on actual API behavior and requirements. A well-implemented rate limiting system protects both your infrastructure and the services you're accessing.
