How do I handle rate limiting when using Requests?
Rate limiting is a crucial aspect of responsible web scraping and API consumption. When servers implement rate limits to prevent abuse, your Python Requests applications need robust strategies to handle these restrictions gracefully. This comprehensive guide covers various techniques to manage rate limiting effectively.
Understanding Rate Limiting
Rate limiting occurs when a server restricts the number of requests a client can make within a specific time period. Common scenarios include the following (a minimal detection sketch follows the list):
- HTTP 429 "Too Many Requests" status codes
- API quotas (e.g., 1000 requests per hour)
- Concurrent connection limits
- Burst rate limits (e.g., 10 requests per second)
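Many APIs also advertise their current quota in response headers. The exact header names vary by provider (the X-RateLimit-* names below are common but not universal), so treat this as a sketch of what to look for rather than a guaranteed contract:

```python
import requests

response = requests.get('https://api.example.com/data')  # illustrative endpoint

if response.status_code == 429:
    # The server explicitly told us to slow down
    print("Rate limited; Retry-After:", response.headers.get('Retry-After'))
else:
    # Common (but provider-specific) quota headers
    print("Limit:    ", response.headers.get('X-RateLimit-Limit'))
    print("Remaining:", response.headers.get('X-RateLimit-Remaining'))
    print("Reset:    ", response.headers.get('X-RateLimit-Reset'))
```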
Basic Rate Limiting with Sleep
The simplest approach is adding delays between requests:
```python
import requests
import time

def fetch_with_delay(urls, delay=1.0):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response)
        time.sleep(delay)  # Wait between requests
    return results

# Example usage
urls = ['https://api.example.com/data/1', 'https://api.example.com/data/2']
responses = fetch_with_delay(urls, delay=2.0)
```
Implementing Exponential Backoff
Exponential backoff increases the delay after each failed attempt; adding a small amount of random jitter helps avoid many clients retrying in lockstep:
```python
import requests
import time
import random

def exponential_backoff_request(url, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        try:
            response = requests.get(url)

            if response.status_code == 429:
                # Calculate exponential backoff delay with random jitter
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited. Waiting {delay:.2f} seconds...")
                time.sleep(delay)
                continue

            return response
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)

    raise Exception(f"Max retries exceeded for {url}")

# Example usage
try:
    response = exponential_backoff_request('https://api.example.com/data')
    print(f"Success: {response.status_code}")
except Exception as e:
    print(f"Failed: {e}")
```
Using urllib3 Retry Strategy
The urllib3 library (used under the hood by Requests) provides a built-in retry mechanism:
```python
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

def create_session_with_retries():
    session = requests.Session()

    # Define retry strategy: retry on rate-limit and transient server errors.
    # Note: allowed_methods requires urllib3 >= 1.26; older releases call this
    # parameter method_whitelist.
    retry_strategy = Retry(
        total=5,
        status_forcelist=[429, 500, 502, 503, 504],
        backoff_factor=1,
        allowed_methods=["HEAD", "GET", "OPTIONS"]
    )

    # Mount an adapter with the retry strategy for both schemes
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# Example usage
session = create_session_with_retries()
response = session.get('https://api.example.com/data', timeout=10)
```
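If every retry is consumed by 429 or 5xx responses, Requests raises requests.exceptions.RetryError instead of returning the last response. A minimal sketch of handling that, reusing create_session_with_retries from above (the endpoint is illustrative):

```python
import requests

session = create_session_with_retries()
try:
    response = session.get('https://api.example.com/data', timeout=10)
    response.raise_for_status()
except requests.exceptions.RetryError:
    # All retries were consumed by 429/5xx responses
    print("Gave up after exhausting the retry budget")
except requests.exceptions.ConnectionError as e:
    # Network-level failures can also exhaust the retry budget
    print(f"Connection failed: {e}")
```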
Advanced Rate Limiting with Token Bucket
Implement a token bucket algorithm for precise rate control:
```python
import requests
import time
import threading

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity          # maximum burst size
        self.tokens = capacity
        self.refill_rate = refill_rate    # tokens added per second
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        with self.lock:
            now = time.time()
            # Add tokens based on time elapsed, capped at capacity
            tokens_to_add = (now - self.last_refill) * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens=1):
        while not self.consume(tokens):
            time.sleep(0.1)

class RateLimitedSession:
    def __init__(self, requests_per_second=10):
        # capacity controls burst size; refill_rate sets the sustained rate
        self.bucket = TokenBucket(capacity=10, refill_rate=requests_per_second)
        self.session = requests.Session()

    def get(self, url, **kwargs):
        self.bucket.wait_for_tokens()
        return self.session.get(url, **kwargs)

    def post(self, url, **kwargs):
        self.bucket.wait_for_tokens()
        return self.session.post(url, **kwargs)

# Example usage
rate_limited_session = RateLimitedSession(requests_per_second=5)
response = rate_limited_session.get('https://api.example.com/data')
```
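To see the throttling in action, the short sketch below (the URLs are placeholders) times a run of calls: the first requests pass immediately up to the bucket's capacity, and the remainder are paced at roughly the configured rate.

```python
import time

rate_limited_session = RateLimitedSession(requests_per_second=5)
urls = [f'https://api.example.com/data/{i}' for i in range(15)]  # placeholder URLs

start = time.time()
for url in urls:
    rate_limited_session.get(url, timeout=10)

# With capacity=10 and 5 tokens/s, expect roughly 1s of pacing on top of network time
print(f"15 requests took {time.time() - start:.1f}s")
```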
Handling Retry-After Headers
Many APIs include a Retry-After header indicating exactly how long to wait before retrying:
```python
import requests
import time

def respect_retry_after(url, max_retries=3):
    for attempt in range(max_retries):
        response = requests.get(url)

        if response.status_code == 429:
            retry_after = response.headers.get('Retry-After')
            if retry_after:
                # Retry-After can be in seconds or an HTTP date
                try:
                    delay = int(retry_after)
                except ValueError:
                    # Parse HTTP date format if needed (see helper below)
                    delay = 60  # Default fallback
                print(f"Rate limited. Retrying after {delay} seconds...")
                time.sleep(delay)
                continue

        return response

    raise Exception("Max retries exceeded")

# Example usage
response = respect_retry_after('https://api.example.com/data')
```
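The Retry-After value may also arrive as an HTTP date rather than a number of seconds. A small standard-library helper for that case, shown here as a sketch, could replace the int()/fallback branch in respect_retry_after above:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value, default=60):
    """Convert a Retry-After header (seconds or HTTP date) into seconds to wait."""
    if value is None:
        return default
    try:
        return int(value)
    except ValueError:
        try:
            # e.g. 'Wed, 21 Oct 2025 07:28:00 GMT'
            retry_at = parsedate_to_datetime(value)
            return max(0, (retry_at - datetime.now(timezone.utc)).total_seconds())
        except (TypeError, ValueError):
            return default
```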
Concurrent Requests with Rate Limiting
Combine threading with rate limiting for efficient concurrent processing:
```python
import requests
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

class ConcurrentRateLimiter:
    def __init__(self, max_workers=5, requests_per_second=10):
        self.max_workers = max_workers
        self.rate_limit_delay = 1.0 / requests_per_second
        self.last_request_time = 0
        self.lock = threading.Lock()

    def rate_limited_request(self, url):
        # The lock spaces out request *starts*; responses can still overlap across threads
        with self.lock:
            now = time.time()
            time_since_last = now - self.last_request_time

            if time_since_last < self.rate_limit_delay:
                time.sleep(self.rate_limit_delay - time_since_last)

            self.last_request_time = time.time()

        return requests.get(url)

    def fetch_urls(self, urls):
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {
                executor.submit(self.rate_limited_request, url): url
                for url in urls
            }

            # Collect results as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    response = future.result()
                    results.append((url, response))
                except Exception as e:
                    results.append((url, e))

        return results

# Example usage
limiter = ConcurrentRateLimiter(max_workers=3, requests_per_second=2)
urls = [f'https://api.example.com/data/{i}' for i in range(10)]
results = limiter.fetch_urls(urls)
```
Monitoring and Logging Rate Limits
Implement comprehensive logging to track rate limiting behavior:
```python
import requests
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RateLimitMonitor:
    def __init__(self):
        self.request_count = 0
        self.rate_limit_count = 0
        self.start_time = time.time()

    def make_request(self, url, **kwargs):
        self.request_count += 1
        start_time = time.time()

        try:
            response = requests.get(url, **kwargs)
            duration = time.time() - start_time

            # Log request details
            logger.info(
                f"Request #{self.request_count}: {response.status_code} "
                f"({duration:.2f}s) - {url}"
            )

            # Handle rate limiting
            if response.status_code == 429:
                self.rate_limit_count += 1
                retry_after = response.headers.get('Retry-After', '60')
                logger.warning(
                    f"Rate limited! Count: {self.rate_limit_count}, "
                    f"Retry-After: {retry_after}s"
                )
                time.sleep(int(retry_after))
                # Retry recursively; note this keeps retrying until a non-429 response
                return self.make_request(url, **kwargs)

            return response
        except requests.RequestException as e:
            logger.error(f"Request failed: {e}")
            raise

    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            'total_requests': self.request_count,
            'rate_limits_hit': self.rate_limit_count,
            'elapsed_time': elapsed,
            'requests_per_second': self.request_count / elapsed if elapsed > 0 else 0
        }

# Example usage
monitor = RateLimitMonitor()
response = monitor.make_request('https://api.example.com/data')
print(monitor.get_stats())
```
Best Practices for Rate Limiting
1. Respect Server Policies
Always check the API documentation for rate limiting policies and implement appropriate delays.
2. Use Session Objects
Reuse session objects to maintain connection pools and cookies:
```python
session = requests.Session()
session.headers.update({'User-Agent': 'MyApp/1.0'})

# Use the session for all requests
response = session.get('https://api.example.com/data')
```
3. Implement Circuit Breakers
Stop making requests temporarily when encountering persistent failures:
```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            # After the timeout, allow a single trial call through
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            # Success resets the failure count and closes the circuit
            self.failure_count = 0
            self.state = 'CLOSED'
            return result
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                self.last_failure_time = time.time()
            raise
```
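A short usage sketch (the endpoint is illustrative) wraps requests.get with the breaker so that repeated 429/5xx responses or connection errors eventually trip the circuit:

```python
import requests

breaker = CircuitBreaker(failure_threshold=3, timeout=30)

def guarded_get(url):
    def _do_request():
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # 429/5xx count as failures for the breaker
        return response
    return breaker.call(_do_request)

try:
    response = guarded_get('https://api.example.com/data')
except Exception as e:
    print(f"Request blocked or failed: {e}")
```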
Integration with Web Scraping Workflows
When building comprehensive scraping solutions, consider integrating rate limiting with other tools. For example, when handling timeouts in browser automation, you can apply similar backoff strategies to manage both network delays and rate limits effectively.
For complex scenarios involving parallel processing, implement distributed rate limiting across multiple workers so the fleet as a whole stays within the server's limits while maximizing throughput; one common approach, sketched below, tracks request counts in a shared store.
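As one illustration of that idea, the sketch below assumes a Redis server and the redis Python package (both are assumptions, not requirements stated above) and uses a simple fixed-window counter shared by all workers:

```python
import time
import redis  # assumed dependency: pip install redis

r = redis.Redis(host='localhost', port=6379)  # assumed local Redis server

def acquire_slot(key='api.example.com', limit=10, window=1):
    """Block until this worker may send a request without exceeding
    `limit` requests per `window` seconds across all workers."""
    while True:
        window_key = f"ratelimit:{key}:{int(time.time() // window)}"
        count = r.incr(window_key)
        if count == 1:
            # First request in this window: expire the key with the window
            r.expire(window_key, window * 2)
        if count <= limit:
            return
        time.sleep(0.05)  # window is full; wait and try again

# Each worker calls acquire_slot() before requests.get(...)
```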
Conclusion
Effective rate limiting is essential for building robust, respectful web scraping applications. By implementing exponential backoff, respecting Retry-After headers, and using appropriate delay mechanisms, you can handle rate limits gracefully while maintaining good relationships with target servers.
Remember to always:
- Monitor your request patterns and adjust rates accordingly
- Implement proper error handling and logging
- Respect robots.txt and API terms of service
- Use sessions and connection pooling for efficiency
- Test your rate limiting logic under various scenarios
These strategies will help you build reliable applications that can handle rate limiting challenges while maintaining optimal performance.