How do I implement rate limiting when using urllib3?
Rate limiting is a crucial technique when working with urllib3: it keeps you from overwhelming target servers and helps you avoid getting blocked or throttled. This guide covers several approaches to implementing rate limiting effectively in urllib3-based applications.
Understanding Rate Limiting in Web Scraping
Rate limiting controls the frequency of HTTP requests sent to a server within a specific time window. It's essential for:
- Respecting server resources and preventing overload
- Avoiding IP bans and blocking mechanisms
- Maintaining ethical scraping practices
- Ensuring consistent data collection over time
Basic Rate Limiting with time.sleep()
The simplest approach uses Python's time.sleep() function to introduce delays between requests:
import urllib3
import time
# Create a PoolManager instance
http = urllib3.PoolManager()
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]

# Basic rate limiting with a fixed delay
for url in urls:
    response = http.request('GET', url)
    print(f"Status: {response.status}, URL: {url}")
    # Wait 1 second between requests
    time.sleep(1)
While simple, this approach lacks flexibility and doesn't account for varying response times.
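One way to compensate, sketched below as a continuation of the snippet above, is to measure how long each request took and sleep only for the remainder of a target interval (the 1-second interval here is an illustrative choice, not a recommendation):

target_interval = 1.0  # desired minimum spacing between request starts

for url in urls:
    start = time.time()
    response = http.request('GET', url)
    print(f"Status: {response.status}, URL: {url}")
    elapsed = time.time() - start
    # Sleep only for whatever is left of the interval, if anything
    if elapsed < target_interval:
        time.sleep(target_interval - elapsed)

This keeps the spacing between request starts roughly constant even when response times vary.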
Advanced Rate Limiting with Token Bucket Algorithm
The token bucket algorithm provides more sophisticated rate limiting by allowing burst requests while maintaining an average rate:
import urllib3
import time
import threading

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        """
        Token bucket rate limiter.

        Args:
            capacity: Maximum number of tokens in the bucket
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """Attempt to consume tokens from the bucket."""
        with self.lock:
            now = time.time()
            # Add tokens based on elapsed time
            tokens_to_add = (now - self.last_refill) * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_token(self, tokens=1):
        """Block until enough tokens are available."""
        while not self.consume(tokens):
            time.sleep(0.01)  # Small sleep to prevent busy waiting

# Example usage with urllib3
http = urllib3.PoolManager()
rate_limiter = TokenBucket(capacity=10, refill_rate=2.0)  # 2 requests per second on average

urls = ['https://httpbin.org/delay/1'] * 20

for i, url in enumerate(urls):
    rate_limiter.wait_for_token()
    start_time = time.time()
    response = http.request('GET', url)
    end_time = time.time()
    print(f"Request {i+1}: Status {response.status}, "
          f"Time: {end_time - start_time:.2f}s")
Rate Limiting with Threading
For concurrent requests, combine a thread pool with a shared rate limiter so that all worker threads respect the same request interval:
import urllib3
import time
import threading
from concurrent.futures import ThreadPoolExecutor

class RateLimitedRequester:
    def __init__(self, max_workers=5, requests_per_second=2):
        self.http = urllib3.PoolManager()
        self.max_workers = max_workers
        self.requests_per_second = requests_per_second
        self.last_request_time = 0
        self.lock = threading.Lock()

    def _rate_limited_request(self, method, url, **kwargs):
        """Make a rate-limited HTTP request."""
        with self.lock:
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            min_interval = 1.0 / self.requests_per_second
            if time_since_last < min_interval:
                time.sleep(min_interval - time_since_last)
            self.last_request_time = time.time()
        # The request itself runs outside the lock so workers can overlap
        return self.http.request(method, url, **kwargs)

    def get(self, url, **kwargs):
        """Make a GET request with rate limiting."""
        return self._rate_limited_request('GET', url, **kwargs)

    def post(self, url, **kwargs):
        """Make a POST request with rate limiting."""
        return self._rate_limited_request('POST', url, **kwargs)

    def bulk_request(self, urls):
        """Make multiple requests with rate limiting and threading."""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Keep track of which URL each future belongs to
            future_to_url = {executor.submit(self.get, url): url for url in urls}
            for future, url in future_to_url.items():
                try:
                    response = future.result()
                    results.append({
                        'status': response.status,
                        'url': url,
                        'success': True
                    })
                except Exception as e:
                    results.append({
                        'url': url,
                        'error': str(e),
                        'success': False
                    })
        return results

# Usage example
requester = RateLimitedRequester(max_workers=3, requests_per_second=1.5)
urls = [
    'https://httpbin.org/json',
    'https://httpbin.org/headers',
    'https://httpbin.org/user-agent',
    'https://httpbin.org/ip'
]

results = requester.bulk_request(urls)
for result in results:
    if result['success']:
        print(f"Success: {result['url']} - Status: {result['status']}")
    else:
        print(f"Error: {result['error']}")
Adaptive Rate Limiting Based on Response Status
Implement intelligent rate limiting that adjusts based on server responses:
import urllib3
import time
import random

class AdaptiveRateLimiter:
    def __init__(self, initial_delay=1.0, max_delay=60.0):
        self.current_delay = initial_delay
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.success_count = 0
        self.http = urllib3.PoolManager()

    def adjust_delay(self, status_code):
        """Adjust the delay based on the response status."""
        if status_code == 429:  # Too Many Requests
            self.current_delay = min(self.current_delay * 2, self.max_delay)
            self.success_count = 0
            print(f"Rate limited! Increasing delay to {self.current_delay}s")
        elif 500 <= status_code < 600:  # Server errors
            self.current_delay = min(self.current_delay * 1.5, self.max_delay)
            self.success_count = 0
            print(f"Server error! Increasing delay to {self.current_delay}s")
        elif status_code == 200:
            self.success_count += 1
            # Decrease the delay after a run of successful requests
            if self.success_count >= 5:
                self.current_delay = max(
                    self.current_delay * 0.9,
                    self.initial_delay
                )
                self.success_count = 0

    def request_with_backoff(self, method, url, max_retries=3, **kwargs):
        """Make a request with adaptive rate limiting and backoff."""
        for attempt in range(max_retries + 1):
            try:
                # Apply the current delay with some jitter
                jitter = random.uniform(0.8, 1.2)
                time.sleep(self.current_delay * jitter)

                response = self.http.request(method, url, **kwargs)
                self.adjust_delay(response.status)

                if response.status == 429 and attempt < max_retries:
                    print(f"Retrying in {self.current_delay}s...")
                    continue

                return response
            except urllib3.exceptions.RequestError:
                if attempt < max_retries:
                    self.current_delay = min(self.current_delay * 2, self.max_delay)
                    print(f"Request error, retrying in {self.current_delay}s...")
                    time.sleep(self.current_delay)
                else:
                    raise
        return None

# Usage example
adaptive_limiter = AdaptiveRateLimiter(initial_delay=0.5, max_delay=30.0)

# Simulate requests that might trigger rate limiting
test_urls = [
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/429',  # Simulates rate limiting
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/500',  # Simulates a server error
    'https://httpbin.org/status/200'
]

for url in test_urls:
    response = adaptive_limiter.request_with_backoff('GET', url)
    if response:
        print(f"Response: {response.status} from {url}")
Rate Limiting with External Libraries
For production applications, consider using dedicated rate limiting libraries:
Using the ratelimit library
pip install ratelimit
import urllib3
from ratelimit import limits, sleep_and_retry

# Rate limit: 30 calls per minute
@sleep_and_retry
@limits(calls=30, period=60)
def make_request(http, url):
    """Rate-limited request function."""
    return http.request('GET', url)

http = urllib3.PoolManager()
urls = ['https://httpbin.org/json'] * 50

for i, url in enumerate(urls):
    response = make_request(http, url)
    print(f"Request {i+1}: Status {response.status}")
Using pyrate-limiter (2.x API)
pip install pyrate-limiter
import urllib3
from pyrate_limiter import Duration, RequestRate, Limiter

# Create a limiter: 10 requests per 5 seconds
limiter = Limiter(RequestRate(10, Duration.SECOND * 5))
http = urllib3.PoolManager()

@limiter.ratelimit("api_requests", delay=True)
def fetch_data(url):
    """Rate-limited data fetching."""
    response = http.request('GET', url)
    return {
        'status': response.status,
        'data': response.data.decode('utf-8')[:100] + '...'
    }

# Example usage
urls = ['https://httpbin.org/json'] * 25

for i, url in enumerate(urls):
    result = fetch_data(url)
    print(f"Request {i+1}: Status {result['status']}")
Best Practices for Rate Limiting
1. Respect robots.txt and Crawl-Delay
import urllib3
import urllib.robotparser
import urllib.parse
def get_crawl_delay(url):
    """Extract the crawl delay from robots.txt."""
    try:
        rp = urllib.robotparser.RobotFileParser()
        robots_url = urllib.parse.urljoin(url, '/robots.txt')
        rp.set_url(robots_url)
        rp.read()
        # Get the crawl delay for all user agents
        delay = rp.crawl_delay('*')
        return delay if delay else 1.0
    except Exception:
        return 1.0  # Default delay

# Usage in rate-limited requests
http = urllib3.PoolManager()
base_url = 'https://example.com'
crawl_delay = get_crawl_delay(base_url)
print(f"Using crawl delay: {crawl_delay} seconds")
# Apply this delay in your rate limiting logic
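A minimal sketch of applying that delay in a fetch loop (the page URLs below are hypothetical, built from the base_url above):

import time

pages = [f"{base_url}/page{i}" for i in range(1, 4)]  # hypothetical page URLs
for page in pages:
    response = http.request('GET', page)
    print(f"Status: {response.status}, URL: {page}")
    time.sleep(crawl_delay)  # honor the delay advertised in robots.txt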
2. Monitor Response Headers
import time

def check_rate_limit_headers(response):
    """Monitor rate limiting headers from the server."""
    headers = response.headers
    rate_limit_info = {
        'limit': headers.get('X-RateLimit-Limit'),
        'remaining': headers.get('X-RateLimit-Remaining'),
        'reset': headers.get('X-RateLimit-Reset'),
        'retry_after': headers.get('Retry-After')
    }

    # Calculate the wait time based on the headers
    if rate_limit_info['retry_after']:
        return int(rate_limit_info['retry_after'])
    elif rate_limit_info['remaining'] == '0' and rate_limit_info['reset']:
        reset_time = int(rate_limit_info['reset'])
        wait_time = reset_time - int(time.time())
        return max(wait_time, 0)
    return 0

# Example usage
response = http.request('GET', 'https://api.example.com/data')
wait_time = check_rate_limit_headers(response)
if wait_time > 0:
    print(f"Rate limit hit, waiting {wait_time} seconds")
    time.sleep(wait_time)
Common Pitfalls and Solutions
Avoiding Thundering Herd
When running multiple processes, add randomization to prevent simultaneous requests:
import random

def jittered_delay(base_delay, jitter_factor=0.1):
    """Add jitter to prevent a thundering herd."""
    jitter = random.uniform(-jitter_factor, jitter_factor)
    return base_delay * (1 + jitter)

# Apply jittered delays
for url in urls:
    delay = jittered_delay(1.0, 0.2)  # 1 second ± 20%
    time.sleep(delay)
    response = http.request('GET', url)
Implementing Circuit Breaker Pattern
Protect against repeated failures with a circuit breaker:
import time
import urllib3
from enum import Enum

class CircuitState(Enum):
    CLOSED = 1
    OPEN = 2
    HALF_OPEN = 3

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def can_request(self):
        """Check if a request is allowed."""
        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        else:  # HALF_OPEN
            return True

    def record_success(self):
        """Record a successful request."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Record a failed request."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage with urllib3
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
http = urllib3.PoolManager()

def protected_request(url):
    """Make a request with circuit breaker protection."""
    if not circuit_breaker.can_request():
        raise Exception("Circuit breaker is open")
    try:
        response = http.request('GET', url, timeout=10)
        if response.status < 500:
            circuit_breaker.record_success()
        else:
            circuit_breaker.record_failure()
        return response
    except Exception:
        circuit_breaker.record_failure()
        raise
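A short usage sketch under the definitions above: the httpbin status/500 endpoint is chosen deliberately so that repeated server errors trip the breaker, after which further calls fail fast until the recovery timeout elapses.

test_urls = ['https://httpbin.org/status/500'] * 5  # failures on purpose

for url in test_urls:
    try:
        response = protected_request(url)
        print(f"Status: {response.status}")
    except Exception as e:
        print(f"Request blocked or failed: {e}")
    time.sleep(1)  # keep some spacing between attempts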
Integration with Connection Pooling
Combine rate limiting with urllib3's connection pooling for optimal performance:
import urllib3
import time
from threading import Lock

class PooledRateLimiter:
    def __init__(self, pool_size=10, requests_per_second=5):
        # Configure the connection pool: num_pools is the number of per-host
        # pools, maxsize is the number of connections kept per host
        self.http = urllib3.PoolManager(
            num_pools=pool_size,
            maxsize=pool_size,
            block=True
        )
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0
        self.lock = Lock()

    def request(self, method, url, **kwargs):
        """Make a rate-limited request using the connection pool."""
        with self.lock:
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            if time_since_last < self.min_interval:
                time.sleep(self.min_interval - time_since_last)
            self.last_request_time = time.time()
        # Make the request using a pooled connection
        return self.http.request(method, url, **kwargs)

# Usage example
limiter = PooledRateLimiter(pool_size=20, requests_per_second=3)
urls = ['https://httpbin.org/json'] * 50

for i, url in enumerate(urls):
    response = limiter.request('GET', url)
    print(f"Request {i+1}: Status {response.status}")
Conclusion
Implementing effective rate limiting with urllib3 requires choosing the right strategy for your use case. Start with simple time-based delays for basic scenarios, then progress to token bucket algorithms or adaptive rate limiting for more sophisticated applications. Always monitor server responses and respect rate limiting headers to maintain ethical scraping practices.
For complex scraping projects requiring JavaScript execution or advanced browser automation, consider complementary techniques such as handling authentication in Puppeteer or monitoring network requests in Puppeteer for a more comprehensive web scraping setup.
Remember that proper rate limiting not only prevents technical issues but also demonstrates respect for the websites you're accessing, leading to more sustainable and reliable data collection over time.