How Do I Handle API Rate Limits When Using MCP Servers?
API rate limiting is a critical consideration when building Model Context Protocol (MCP) servers for web scraping and API interactions. Without rate limiting, your MCP server can overwhelm target APIs, trigger IP blocks, consume credits too quickly, or violate terms of service. This guide covers token buckets, sliding windows, per-domain limits, exponential backoff, header-driven adaptive limiting, Redis-backed distributed limiting, and monitoring.
Understanding Rate Limits in MCP Contexts
Rate limits can occur at multiple levels when working with MCP servers:
- Target API Limits: The external APIs or websites you're scraping have their own rate limits
- MCP Server Limits: Your own server may need to throttle requests to manage resources
- Client Limits: The AI model or client application may have usage quotas
- Infrastructure Limits: Network bandwidth, database connections, or proxy limits
A robust MCP server implementation must handle all these layers to ensure reliable, sustainable operation.
Basic Rate Limiting Implementation
Token Bucket Algorithm (Python)
The token bucket algorithm permits short bursts while enforcing an average rate. The bucket holds at most capacity tokens, each request spends one token, and tokens are replenished at refill_rate per second:
import asyncio
import time

import httpx
from mcp.server import Server
from mcp.types import Tool, TextContent


class TokenBucket:
    """Token bucket rate limiter for controlling request rates."""

    def __init__(self, capacity: int, refill_rate: float):
        """
        Initialize token bucket.

        Args:
            capacity: Maximum number of tokens (burst size)
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        # Monotonic clock: not affected by system clock adjustments
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        """
        Attempt to acquire tokens, waiting if necessary.

        Args:
            tokens: Number of tokens to acquire

        Returns:
            True when tokens are acquired
        """
        if tokens > self.capacity:
            # The bucket can never hold this many tokens, so waiting would never end
            raise ValueError("requested tokens exceed bucket capacity")
        # The lock is held while sleeping, so waiting callers are served one at a time
        async with self.lock:
            await self._refill()
            while self.tokens < tokens:
                # Calculate wait time until enough tokens have been refilled
                wait_time = (tokens - self.tokens) / self.refill_rate
                await asyncio.sleep(wait_time)
                await self._refill()
            self.tokens -= tokens
            return True

    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now


# Initialize MCP server with rate limiter
app = Server("rate-limited-scraper")
rate_limiter = TokenBucket(capacity=10, refill_rate=2.0)  # 10 burst, 2/sec sustained


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool with rate limiting."""
    if name == "fetch_url":
        # Acquire token before making request
        await rate_limiter.acquire()
        url = arguments["url"]
        # Perform the actual HTTP request
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
        return [TextContent(
            type="text",
            text=response.text
        )]
    raise ValueError(f"Unknown tool: {name}")
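To make the burst-versus-sustained behavior concrete, here is a minimal synchronous sketch that replays the same refill arithmetic against an explicit clock value instead of real time. The class name SimulatedBucket and its try_acquire method are hypothetical helpers for illustration only, not part of the MCP SDK or of the limiter above.

```python
class SimulatedBucket:
    """Synchronous token bucket driven by an explicit clock value (illustrative only)."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = 0.0

    def try_acquire(self, now: float) -> bool:
        """Refill for the time elapsed up to `now`, then take one token if available."""
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = SimulatedBucket(capacity=10, refill_rate=2.0)

# 15 requests arrive at t=0: only the burst capacity (10) is admitted.
burst = [bucket.try_acquire(0.0) for _ in range(15)]

# One second later, 2 tokens have been refilled, so 2 of 5 requests pass.
later = [bucket.try_acquire(1.0) for _ in range(5)]

print(sum(burst), sum(later))  # 10 2
```

With capacity=10 and refill_rate=2.0, a client that has been idle can send 10 requests at once, after which throughput settles at 2 requests per second.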
Sliding Window Rate Limiter (JavaScript)
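For stricter limiting, use a sliding window log: record the timestamp of every request and admit a new one only when fewer than maxRequests timestamps fall within the last windowMs milliseconds: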
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import axios from "axios";
class SlidingWindowRateLimiter {
constructor(maxRequests, windowMs) {
this.maxRequests = maxRequests;
this.windowMs = windowMs;
this.requests = [];
}
async acquire() {
const now = Date.now();
// Remove requests outside the current window
this.requests = this.requests.filter(
timestamp => now - timestamp < this.windowMs
);
// Check if we're at the limit
if (this.requests.length >= this.maxRequests) {
const oldestRequest = this.requests[0];
const waitTime = this.windowMs - (now - oldestRequest);
if (waitTime > 0) {
await new Promise(resolve => setTimeout(resolve, waitTime));
return this.acquire(); // Retry after waiting
}
}
// Add current request timestamp
this.requests.push(now);
return true;
}
getRemaining() {
const now = Date.now();
this.requests = this.requests.filter(
timestamp => now - timestamp < this.windowMs
);
return this.maxRequests - this.requests.length;
}
}
const server = new Server(
{ name: "rate-limited-scraper", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
// Create rate limiter: 100 requests per minute
const rateLimiter = new SlidingWindowRateLimiter(100, 60000);
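import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";
// setRequestHandler expects a request schema object, not a method-name string
server.setRequestHandler(CallToolRequestSchema, async (request) => {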
const { name, arguments: args } = request.params;
if (name === "fetch_data") {
// Wait for rate limiter
await rateLimiter.acquire();
try {
const response = await axios.get(args.url, {
timeout: 10000,
headers: {
'User-Agent': 'MCP-Scraper/1.0'
}
});
return {
content: [{
type: "text",
text: JSON.stringify({
status: response.status,
data: response.data,
remainingRequests: rateLimiter.getRemaining()
})
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `Error: ${error.message}`
}],
isError: true
};
}
}
});
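For readers following the Python examples, the same sliding window idea can be sketched with a deque of timestamps. This is an illustrative, non-async version that takes the current time as an argument so its behavior is easy to trace; the names SlidingWindowLog, try_acquire, and wait_time are assumptions made for this sketch.

```python
from collections import deque


class SlidingWindowLog:
    """Admit a request only if fewer than max_requests fall in the last window."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.timestamps = deque()

    def _evict(self, now: float) -> None:
        # Drop timestamps that have left the window (age >= window_seconds)
        while self.timestamps and now - self.timestamps[0] >= self.window_seconds:
            self.timestamps.popleft()

    def try_acquire(self, now: float) -> bool:
        """Record the request and return True if a slot is free, else return False."""
        self._evict(now)
        if len(self.timestamps) >= self.max_requests:
            return False
        self.timestamps.append(now)
        return True

    def wait_time(self, now: float) -> float:
        """Seconds until the oldest request leaves the window (0 if a slot is free)."""
        self._evict(now)
        if len(self.timestamps) < self.max_requests:
            return 0.0
        return self.window_seconds - (now - self.timestamps[0])


limiter = SlidingWindowLog(max_requests=3, window_seconds=10.0)
results = [limiter.try_acquire(t) for t in (0.0, 1.0, 2.0, 3.0, 10.0, 10.5)]
print(results)                  # [True, True, True, False, True, False]
print(limiter.wait_time(10.5))  # 0.5
```

The request at t=3.0 is rejected because three requests are already inside the window; the one at t=10.0 passes because the request from t=0.0 has just expired.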
Advanced Rate Limiting Strategies
Per-Domain Rate Limiting
Different APIs have different rate limits. Implement per-domain rate limiting to optimize throughput:
import asyncio
import json
from collections import defaultdict
from urllib.parse import urlparse
import httpx
class MultiDomainRateLimiter:
"""Manage separate rate limits for different domains."""
def __init__(self):
self.limiters = defaultdict(lambda: TokenBucket(
capacity=5,
refill_rate=1.0
))
self.domain_configs = {
'api.github.com': {'capacity': 60, 'refill_rate': 1.0}, # 60/min
'api.twitter.com': {'capacity': 15, 'refill_rate': 0.25}, # 15/min
'example.com': {'capacity': 10, 'refill_rate': 0.5}, # 30/min
}
def get_limiter(self, url: str) -> TokenBucket:
"""Get or create rate limiter for domain."""
domain = urlparse(url).netloc
# Create configured limiter if not exists
if domain not in self.limiters and domain in self.domain_configs:
config = self.domain_configs[domain]
self.limiters[domain] = TokenBucket(
capacity=config['capacity'],
refill_rate=config['refill_rate']
)
return self.limiters[domain]
async def acquire(self, url: str, tokens: int = 1):
"""Acquire tokens for specific domain."""
limiter = self.get_limiter(url)
await limiter.acquire(tokens)
# Usage in MCP server
domain_limiter = MultiDomainRateLimiter()
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "multi_domain_fetch":
urls = arguments["urls"]
results = []
for url in urls:
await domain_limiter.acquire(url)
async with httpx.AsyncClient() as client:
response = await client.get(url)
results.append({
'url': url,
'status': response.status_code,
'length': len(response.text)
})
return [TextContent(
type="text",
text=json.dumps(results, indent=2)
)]
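When filling in per-domain settings, it can help to derive the bucket parameters from a published quota rather than typing them by hand. The helper below is a sketch under that assumption; bucket_params and burst_fraction are hypothetical names introduced here, and the quotas in the usage lines are placeholders rather than real API limits.

```python
def bucket_params(requests: int, per_seconds: float, burst_fraction: float = 1.0) -> dict:
    """Translate a quota of `requests` per `per_seconds` into token bucket settings.

    burst_fraction scales the bucket capacity; values below 1.0 give a smaller
    burst allowance than the full quota.
    """
    if requests <= 0 or per_seconds <= 0:
        raise ValueError("requests and per_seconds must be positive")
    capacity = max(1, int(requests * burst_fraction))
    refill_rate = requests / per_seconds
    return {"capacity": capacity, "refill_rate": refill_rate}


print(bucket_params(60, 60))                      # {'capacity': 60, 'refill_rate': 1.0}
print(bucket_params(15, 60))                      # {'capacity': 15, 'refill_rate': 0.25}
print(bucket_params(30, 60, burst_fraction=0.5))  # {'capacity': 15, 'refill_rate': 0.5}
```

One design point worth keeping in mind: a bucket whose capacity equals the full quota can admit a full burst and then keep refilling, so close to twice the quota may pass within a single window. If the target API enforces a hard per-window cap, a smaller burst_fraction is the more conservative choice.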
Exponential Backoff with Retry
Combine rate limiting with exponential backoff for handling rate limit errors, similar to handling errors in Puppeteer:
class ExponentialBackoff {
constructor(baseDelay = 1000, maxDelay = 60000, maxRetries = 5) {
this.baseDelay = baseDelay;
this.maxDelay = maxDelay;
this.maxRetries = maxRetries;
}
async execute(fn, context = 'operation') {
let lastError;
for (let attempt = 0; attempt < this.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
// Check if it's a rate limit error
if (this.isRateLimitError(error)) {
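// Prefer the server's Retry-After hint; otherwise back off exponentially
const delay = Math.min(
this.getRetryAfter(error) ?? this.baseDelay * Math.pow(2, attempt),
this.maxDelay
);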
console.error(
`Rate limit hit for ${context}. ` +
`Retry ${attempt + 1}/${this.maxRetries} after ${delay}ms`
);
// Add jitter to prevent thundering herd
const jitter = Math.random() * 0.3 * delay;
await new Promise(resolve =>
setTimeout(resolve, delay + jitter)
);
continue;
}
// Non-rate-limit error, throw immediately
throw error;
}
}
throw new Error(
`Max retries (${this.maxRetries}) exceeded for ${context}: ${lastError.message}`
);
}
isRateLimitError(error) {
// Check for common rate limit indicators
if (error.response) {
const status = error.response.status;
const retryAfter = error.response.headers['retry-after'];
return status === 429 || // Too Many Requests
status === 503 || // Service Unavailable
retryAfter !== undefined;
}
return false;
}
getRetryAfter(error) {
if (error.response?.headers['retry-after']) {
const retryAfter = error.response.headers['retry-after'];
// Can be seconds or HTTP date
return parseInt(retryAfter) * 1000 ||
new Date(retryAfter).getTime() - Date.now();
}
return null;
}
}
// Usage with rate limiter
const backoff = new ExponentialBackoff();
async function fetchWithRateLimit(url, rateLimiter) {
return await backoff.execute(async () => {
await rateLimiter.acquire();
const response = await axios.get(url, {
timeout: 10000,
validateStatus: (status) => status < 500 // Don't throw on 4xx
});
if (response.status === 429) {
const error = new Error('Rate limit exceeded');
error.response = response;
throw error;
}
return response.data;
}, `fetch ${url}`);
}
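The delay calculation used by the backoff class can be sketched in Python as a small pure function. The defaults mirror the values above (1 second base, 60 second cap, up to 30% jitter); the function name backoff_delay and the injectable rng parameter are assumptions made for this sketch so the schedule can be inspected deterministically.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter_fraction: float = 0.3, rng=random.random) -> float:
    """Return the wait in seconds before retry number `attempt` (0-based).

    The exponential delay is capped first; jitter of up to jitter_fraction
    of that delay is then added, so the result can exceed `cap` slightly.
    """
    delay = min(base * 2 ** attempt, cap)
    return delay + rng() * jitter_fraction * delay


# With jitter disabled, the schedule doubles until it reaches the cap.
schedule = [backoff_delay(a, rng=lambda: 0.0) for a in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]

# With real jitter, each delay falls between 1.0x and 1.3x of the base schedule.
jittered = [backoff_delay(a) for a in range(8)]
```

Because jitter is added after capping, the longest possible wait is 1.3 times the cap (78 seconds with these defaults).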
Respecting Server-Provided Rate Limits
Many APIs return rate limit information in response headers. Parse and respect these:
import asyncio
from datetime import datetime, timedelta
from urllib.parse import urlparse
import httpx
class AdaptiveRateLimiter:
"""Rate limiter that adapts based on server responses."""
def __init__(self):
self.limits = {}
self.reset_times = {}
async def make_request(self, url: str, **kwargs) -> httpx.Response:
"""Make HTTP request with adaptive rate limiting."""
domain = urlparse(url).netloc
# Wait if we've hit the limit
if domain in self.reset_times:
now = datetime.now()
if now < self.reset_times[domain]:
wait_seconds = (self.reset_times[domain] - now).total_seconds()
print(f"Rate limited. Waiting {wait_seconds:.1f}s for {domain}")
await asyncio.sleep(wait_seconds)
del self.reset_times[domain]
async with httpx.AsyncClient() as client:
response = await client.get(url, **kwargs)
# Parse rate limit headers
self._update_limits(domain, response.headers)
# Handle 429 specifically
if response.status_code == 429:
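retry_after = response.headers.get('retry-after', '60')
# Retry-After may also be an HTTP date; fall back to 60s if it is not an integer
wait_seconds = int(retry_after) if retry_after.isdigit() else 60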
self.reset_times[domain] = datetime.now() + timedelta(seconds=wait_seconds)
# Recursive retry after waiting
await asyncio.sleep(wait_seconds)
return await self.make_request(url, **kwargs)
return response
def _update_limits(self, domain: str, headers: dict):
"""Update rate limit info from response headers."""
# GitHub-style headers
if 'x-ratelimit-remaining' in headers:
remaining = int(headers['x-ratelimit-remaining'])
reset_timestamp = int(headers.get('x-ratelimit-reset', 0))
if remaining == 0 and reset_timestamp:
self.reset_times[domain] = datetime.fromtimestamp(reset_timestamp)
# Twitter-style headers
elif 'x-rate-limit-remaining' in headers:
remaining = int(headers['x-rate-limit-remaining'])
reset_timestamp = int(headers.get('x-rate-limit-reset', 0))
if remaining == 0 and reset_timestamp:
self.reset_times[domain] = datetime.fromtimestamp(reset_timestamp)
# Usage in MCP server
adaptive_limiter = AdaptiveRateLimiter()
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "adaptive_fetch":
url = arguments["url"]
try:
response = await adaptive_limiter.make_request(
url,
headers={'User-Agent': 'MCP-Scraper/1.0'}
)
return [TextContent(
type="text",
text=f"Status: {response.status_code}\n\n{response.text}"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"Error: {str(e)}"
)]
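The Retry-After header can carry either a number of seconds or an HTTP date, so a parser that handles both forms is useful alongside the adaptive limiter. The sketch below uses only the standard library; the function name parse_retry_after and the 60 second default are assumptions chosen for illustration.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str, now: Optional[datetime] = None,
                      default: float = 60.0) -> float:
    """Convert a Retry-After header value into seconds to wait.

    Accepts delay-seconds ("120") or an HTTP date
    ("Wed, 21 Oct 2015 07:28:00 GMT"). Returns `default` if unparseable.
    """
    now = now or datetime.now(timezone.utc)
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError
        return default
    if when.tzinfo is None:
        # Treat a date without zone information as UTC
        when = when.replace(tzinfo=timezone.utc)
    # A date in the past means no further waiting is needed
    return max(0.0, (when - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))                                           # 120.0
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))  # 60.0
print(parse_retry_after("not-a-date"))                                    # 60.0
```

The returned value can be passed straight to asyncio.sleep or added to the current time to compute a reset timestamp.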
Distributed Rate Limiting with Redis
For production MCP servers running across multiple instances, use Redis for shared rate limiting:
import json
import math
import time
import uuid
from urllib.parse import urlparse

import httpx
import redis.asyncio as redis


class RedisRateLimiter:
    """Distributed sliding-window rate limiter backed by a Redis sorted set."""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def acquire(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, dict]:
        """
        Attempt to acquire rate limit token.

        Returns:
            (success, info) where info contains limit details
        """
        now = time.time()
        window_start = now - window_seconds
        # Unique member: requests in the same second must not overwrite each other
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        # Remove old entries outside the window
        pipe.zremrangebyscore(key, 0, window_start)
        # Count requests in current window (before adding this one)
        pipe.zcard(key)
        # Add current request
        pipe.zadd(key, {member: now})
        # Set expiry on the key
        pipe.expire(key, window_seconds)
        results = await pipe.execute()
        request_count = results[1]

        if request_count < max_requests:
            return True, {
                'allowed': True,
                'remaining': max_requests - request_count - 1,
                'limit': max_requests,
                'reset': int(now) + window_seconds
            }

        # Rejected: remove our entry so denied requests do not use up quota
        await self.redis.zrem(key, member)
        # Get oldest request time to calculate reset
        oldest = await self.redis.zrange(key, 0, 0, withscores=True)
        reset_time = oldest[0][1] + window_seconds if oldest else now
        return False, {
            'allowed': False,
            'remaining': 0,
            'limit': max_requests,
            'reset': math.ceil(reset_time),
            'retry_after': max(1, math.ceil(reset_time - now))
        }

    async def check_limit(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> dict:
        """Check rate limit status without consuming a token."""
        window_start = time.time() - window_seconds
        await self.redis.zremrangebyscore(key, 0, window_start)
        count = await self.redis.zcard(key)
        return {
            'remaining': max(0, max_requests - count),
            'limit': max_requests,
            'used': count
        }


# Usage
redis_limiter = RedisRateLimiter('redis://localhost:6379')


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "distributed_fetch":
        url = arguments["url"]
        domain = urlparse(url).netloc
        # Use domain as rate limit key
        key = f"ratelimit:{domain}"
        allowed, info = await redis_limiter.acquire(
            key,
            max_requests=100,
            window_seconds=60
        )
        if not allowed:
            return [TextContent(
                type="text",
                text=f"Rate limit exceeded. Retry after {info['retry_after']}s"
            )]
        # Make the request
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
        return [TextContent(
            type="text",
            text=f"Success! Remaining: {info['remaining']}/{info['limit']}\n\n{response.text}"
        )]
    raise ValueError(f"Unknown tool: {name}")
Combining Strategies for Robust Rate Limiting
A production-ready MCP server should combine multiple strategies:
class ComprehensiveRateLimiter {
constructor(config) {
this.config = config;
this.slidingWindow = new SlidingWindowRateLimiter(
config.maxRequests,
config.windowMs
);
this.backoff = new ExponentialBackoff(
config.baseDelay,
config.maxDelay,
config.maxRetries
);
this.domainLimits = new Map();
}
async executeRequest(url, requestFn) {
const domain = new URL(url).hostname;
// Apply global rate limit
await this.slidingWindow.acquire();
// Apply per-domain limit if configured
if (this.config.perDomainLimits?.[domain]) {
const domainLimit = this.getDomainLimiter(domain);
await domainLimit.acquire();
}
// Execute with exponential backoff
return await this.backoff.execute(requestFn, `request to ${domain}`);
}
getDomainLimiter(domain) {
if (!this.domainLimits.has(domain)) {
const config = this.config.perDomainLimits[domain];
this.domainLimits.set(
domain,
new SlidingWindowRateLimiter(config.maxRequests, config.windowMs)
);
}
return this.domainLimits.get(domain);
}
async getStatus() {
return {
global: {
remaining: this.slidingWindow.getRemaining(),
limit: this.config.maxRequests
},
domains: Array.from(this.domainLimits.entries()).map(([domain, limiter]) => ({
domain,
remaining: limiter.getRemaining(),
limit: this.config.perDomainLimits[domain].maxRequests
}))
};
}
}
// Configuration
const limiter = new ComprehensiveRateLimiter({
maxRequests: 1000,
windowMs: 60000,
baseDelay: 1000,
maxDelay: 60000,
maxRetries: 5,
perDomainLimits: {
'api.github.com': { maxRequests: 60, windowMs: 60000 },
'api.example.com': { maxRequests: 100, windowMs: 60000 }
}
});
// MCP tool implementation
// CallToolRequestSchema is exported by "@modelcontextprotocol/sdk/types.js"
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "smart_fetch") {
try {
const result = await limiter.executeRequest(
args.url,
async () => {
const response = await axios.get(args.url, {
timeout: 10000,
headers: { 'User-Agent': 'MCP-Scraper/1.0' }
});
return response.data;
}
);
const status = await limiter.getStatus();
return {
content: [{
type: "text",
text: JSON.stringify({
data: result,
rateLimitStatus: status
}, null, 2)
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `Error: ${error.message}`
}],
isError: true
};
}
}
});
Monitoring and Observability
Implement monitoring to track rate limit usage and adjust strategies:
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime
@dataclass
class RateLimitMetrics:
"""Track rate limiting metrics."""
total_requests: int = 0
throttled_requests: int = 0
retry_count: int = 0
total_wait_time: float = 0.0
errors: int = 0
class MonitoredRateLimiter:
"""Rate limiter with metrics tracking."""
def __init__(self, limiter: TokenBucket):
self.limiter = limiter
self.metrics = RateLimitMetrics()
self.logger = logging.getLogger(__name__)
async def acquire(self, tokens: int = 1) -> bool:
"""Acquire with metrics tracking."""
start_time = time.time()
self.metrics.total_requests += 1
# Check if we need to wait
if self.limiter.tokens < tokens:
self.metrics.throttled_requests += 1
self.logger.info(
f"Rate limit throttling request. "
f"Available: {self.limiter.tokens}, Needed: {tokens}"
)
result = await self.limiter.acquire(tokens)
wait_time = time.time() - start_time
if wait_time > 0.01: # More than 10ms wait
self.metrics.total_wait_time += wait_time
self.logger.debug(f"Waited {wait_time:.3f}s for rate limit")
return result
def get_metrics(self) -> dict:
"""Get current metrics."""
throttle_rate = (
self.metrics.throttled_requests / self.metrics.total_requests
if self.metrics.total_requests > 0
else 0
)
return {
'total_requests': self.metrics.total_requests,
'throttled_requests': self.metrics.throttled_requests,
'throttle_rate': f"{throttle_rate:.2%}",
'total_wait_time': f"{self.metrics.total_wait_time:.2f}s",
'avg_wait_time': (
f"{self.metrics.total_wait_time / self.metrics.throttled_requests:.3f}s"
if self.metrics.throttled_requests > 0
else "0s"
),
'errors': self.metrics.errors
}
def reset_metrics(self):
"""Reset metrics counters."""
self.metrics = RateLimitMetrics()
# Add metrics endpoint to MCP server
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="get_rate_limit_metrics",
description="Get rate limiting statistics and metrics",
inputSchema={
"type": "object",
"properties": {}
}
)
]
monitored_limiter = MonitoredRateLimiter(TokenBucket(10, 2.0))
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "get_rate_limit_metrics":
metrics = monitored_limiter.get_metrics()
return [TextContent(
type="text",
text=json.dumps(metrics, indent=2)
)]
Best Practices
- Start Conservative: Begin with lower rate limits and increase based on monitoring
- Implement Multiple Layers: Combine global, per-domain, and adaptive limiting
- Respect Server Signals: Always honor Retry-After and rate limit headers
- Add Jitter: Randomize retry delays to prevent thundering herd problems
- Monitor Metrics: Track throttling rates and adjust limits accordingly
- Fail Gracefully: Return informative errors when rate limits are exceeded
- Cache When Possible: Reduce API calls by caching responses appropriately
- Document Limits: Clearly communicate rate limits to MCP clients
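As one way to act on the caching recommendation, a small time-to-live cache can sit in front of the rate limiter so repeated requests for the same URL do not spend tokens. This is a minimal in-memory sketch; TTLCache and its injectable clock parameter are hypothetical names, and a production server might prefer an existing caching library or a shared store.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """In-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None if missing or expired."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Expired: drop the entry so the caller fetches a fresh copy
            del self._entries[key]
            return None
        return value

    def put(self, key: str, value: Any) -> None:
        """Store a value that stays valid for ttl_seconds from now."""
        self._entries[key] = (self._clock() + self._ttl, value)


# Demonstration with a controllable clock instead of real time
fake_now = [0.0]
cache = TTLCache(ttl_seconds=30, clock=lambda: fake_now[0])

cache.put("https://example.com/data", "cached body")
fresh = cache.get("https://example.com/data")

fake_now[0] = 31.0
stale = cache.get("https://example.com/data")

print(fresh, stale)  # cached body None
```

In a tool handler, the cache would be checked before calling the limiter's acquire method, and the response stored with put after a successful fetch.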
Similar to handling timeouts in Puppeteer, proper timeout configuration complements rate limiting by preventing hanging requests that can exhaust your rate limit quota without completing.
Conclusion
Effective rate limiting is essential for building reliable MCP servers that interact with external APIs and websites. By implementing token bucket algorithms, sliding windows, exponential backoff, and adaptive strategies, you can create robust systems that respect API limits while maximizing throughput. Combined with proper monitoring and observability, these techniques ensure your MCP server operates sustainably and avoids service disruptions from rate limit violations.
Remember to test your rate limiting implementation thoroughly, monitor real-world usage patterns, and adjust your strategies based on actual API behavior and requirements. A well-implemented rate limiting system protects both your infrastructure and the services you're accessing.