
How do I implement rate limiting when using urllib3?

Rate limiting is a crucial technique when working with urllib3 to prevent overwhelming target servers and avoid getting blocked or throttled. This guide covers several approaches to implementing rate limiting effectively in your urllib3-based applications.

Understanding Rate Limiting in Web Scraping

Rate limiting controls the frequency of HTTP requests sent to a server within a specific time window. It's essential for:

  • Respecting server resources and preventing overload
  • Avoiding IP bans and blocking mechanisms
  • Maintaining ethical scraping practices
  • Ensuring consistent data collection over time

Basic Rate Limiting with time.sleep()

The simplest approach uses Python's time.sleep() function to introduce delays between requests:

import urllib3
import time

# Create a PoolManager instance
http = urllib3.PoolManager()

urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]

# Basic rate limiting with fixed delay
for url in urls:
    response = http.request('GET', url)
    print(f"Status: {response.status}, URL: {url}")

    # Wait 1 second between requests
    time.sleep(1)

While simple, this approach lacks flexibility and doesn't account for varying response times.
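A slightly better variant paces requests by when they start, sleeping only for whatever remains of the target interval after each response arrives. A minimal sketch, reusing the http pool and urls from the example above:

MIN_INTERVAL = 1.0  # target: at most one request per second

for url in urls:
    start = time.time()
    response = http.request('GET', url)
    print(f"Status: {response.status}, URL: {url}")

    # Sleep only for the remainder of the interval, if any
    elapsed = time.time() - start
    if elapsed < MIN_INTERVAL:
        time.sleep(MIN_INTERVAL - elapsed)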

Advanced Rate Limiting with Token Bucket Algorithm

The token bucket algorithm provides more sophisticated rate limiting by allowing burst requests while maintaining an average rate:

import urllib3
import time
import threading

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        """
        Token bucket rate limiter

        Args:
            capacity: Maximum number of tokens in bucket
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """Attempt to consume tokens from bucket"""
        with self.lock:
            now = time.time()
            # Add tokens based on elapsed time
            tokens_to_add = (now - self.last_refill) * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_token(self, tokens=1):
        """Wait until enough tokens are available"""
        while not self.consume(tokens):
            time.sleep(0.01)  # Small sleep to prevent busy waiting

# Example usage with urllib3
http = urllib3.PoolManager()
rate_limiter = TokenBucket(capacity=10, refill_rate=2.0)  # avg 2 requests/sec, bursts of up to 10

urls = ['https://httpbin.org/delay/1'] * 20

for i, url in enumerate(urls):
    rate_limiter.wait_for_token()

    start_time = time.time()
    response = http.request('GET', url)
    end_time = time.time()

    print(f"Request {i+1}: Status {response.status}, "
          f"Time: {end_time - start_time:.2f}s")

Rate Limiting with Threading and Queue

For concurrent requests with rate limiting, combine a thread pool with a shared lock that enforces a minimum interval between requests:

import urllib3
import time
import threading
from concurrent.futures import ThreadPoolExecutor

class RateLimitedRequester:
    def __init__(self, max_workers=5, requests_per_second=2):
        self.http = urllib3.PoolManager()
        self.max_workers = max_workers
        self.requests_per_second = requests_per_second
        self.last_request_time = 0
        self.lock = threading.Lock()

    def _rate_limited_request(self, method, url, **kwargs):
        """Make a rate-limited HTTP request"""
        with self.lock:
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            min_interval = 1.0 / self.requests_per_second

            if time_since_last < min_interval:
                sleep_time = min_interval - time_since_last
                time.sleep(sleep_time)

            self.last_request_time = time.time()

        return self.http.request(method, url, **kwargs)

    def get(self, url, **kwargs):
        """Make a GET request with rate limiting"""
        return self._rate_limited_request('GET', url, **kwargs)

    def post(self, url, **kwargs):
        """Make a POST request with rate limiting"""
        return self._rate_limited_request('POST', url, **kwargs)

    def bulk_request(self, urls):
        """Make multiple requests with rate limiting and threading"""
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Map each future back to its URL so results can be reported per URL
            future_to_url = {executor.submit(self.get, url): url for url in urls}

            for future, url in future_to_url.items():
                try:
                    response = future.result()
                    results.append({
                        'status': response.status,
                        'url': url,
                        'success': True
                    })
                except Exception as e:
                    results.append({
                        'url': url,
                        'error': str(e),
                        'success': False
                    })

        return results

# Usage example
requester = RateLimitedRequester(max_workers=3, requests_per_second=1.5)

urls = [
    'https://httpbin.org/json',
    'https://httpbin.org/headers',
    'https://httpbin.org/user-agent',
    'https://httpbin.org/ip'
]

results = requester.bulk_request(urls)
for result in results:
    if result['success']:
        print(f"Success: {result['url']} - Status: {result['status']}")
    else:
        print(f"Error: {result['error']}")

Adaptive Rate Limiting Based on Response Status

Implement intelligent rate limiting that adjusts based on server responses:

import urllib3
import time
import random

class AdaptiveRateLimiter:
    def __init__(self, initial_delay=1.0, max_delay=60.0):
        self.current_delay = initial_delay
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.success_count = 0
        self.http = urllib3.PoolManager()

    def adjust_delay(self, status_code):
        """Adjust delay based on response status"""
        if status_code == 429:  # Too Many Requests
            self.current_delay = min(self.current_delay * 2, self.max_delay)
            self.success_count = 0
            print(f"Rate limited! Increasing delay to {self.current_delay}s")
        elif 500 <= status_code < 600:  # Server errors
            self.current_delay = min(self.current_delay * 1.5, self.max_delay)
            self.success_count = 0
            print(f"Server error! Increasing delay to {self.current_delay}s")
        elif status_code == 200:
            self.success_count += 1
            # Decrease delay after successful requests
            if self.success_count >= 5:
                self.current_delay = max(
                    self.current_delay * 0.9, 
                    self.initial_delay
                )
                self.success_count = 0

    def request_with_backoff(self, method, url, max_retries=3, **kwargs):
        """Make request with adaptive rate limiting and backoff"""
        for attempt in range(max_retries + 1):
            try:
                # Apply current delay with some jitter
                jitter = random.uniform(0.8, 1.2)
                time.sleep(self.current_delay * jitter)

                response = self.http.request(method, url, **kwargs)
                self.adjust_delay(response.status)

                if response.status == 429 and attempt < max_retries:
                    print(f"Retrying in {self.current_delay}s...")
                    continue

                return response

            except urllib3.exceptions.HTTPError:
                if attempt < max_retries:
                    # Double the delay; the sleep at the top of the loop applies it
                    self.current_delay = min(self.current_delay * 2, self.max_delay)
                    print(f"Request error, retrying in {self.current_delay}s...")
                else:
                    raise

        return None

# Usage example
adaptive_limiter = AdaptiveRateLimiter(initial_delay=0.5, max_delay=30.0)

# Simulate requests that might trigger rate limiting
test_urls = [
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/429',  # Simulates rate limiting
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/500',  # Simulates server error
    'https://httpbin.org/status/200'
]

for url in test_urls:
    response = adaptive_limiter.request_with_backoff('GET', url)
    if response:
        print(f"Response: {response.status} from {url}")

Rate Limiting with External Libraries

For production applications, consider using dedicated rate limiting libraries:

Using ratelimit library

pip install ratelimit

import urllib3
from ratelimit import limits, sleep_and_retry

# Rate limit: 30 calls per minute
@sleep_and_retry
@limits(calls=30, period=60)
def make_request(http, url):
    """Rate-limited request function"""
    return http.request('GET', url)

http = urllib3.PoolManager()
urls = ['https://httpbin.org/json'] * 50

for i, url in enumerate(urls):
    response = make_request(http, url)
    print(f"Request {i+1}: Status {response.status}")

Using pyrate-limiter

pip install pyrate-limiter

# Note: this example uses the pyrate-limiter v2 API (RequestRate/Duration)
import urllib3
from pyrate_limiter import Duration, RequestRate, Limiter

# Create a limiter: 10 requests per 5 seconds
limiter = Limiter(RequestRate(10, Duration.SECOND * 5))

http = urllib3.PoolManager()

@limiter.ratelimit("api_requests", delay=True)
def fetch_data(url):
    """Rate-limited data fetching"""
    response = http.request('GET', url)
    return {
        'status': response.status,
        'data': response.data.decode('utf-8')[:100] + '...'
    }

# Example usage
urls = ['https://httpbin.org/json'] * 25

for i, url in enumerate(urls):
    result = fetch_data(url)
    print(f"Request {i+1}: Status {result['status']}")

Best Practices for Rate Limiting

1. Respect robots.txt and Crawl-Delay

import urllib3
import urllib.robotparser
import urllib.parse

def get_crawl_delay(url):
    """Extract crawl delay from robots.txt"""
    try:
        rp = urllib.robotparser.RobotFileParser()
        robots_url = urllib.parse.urljoin(url, '/robots.txt')
        rp.set_url(robots_url)
        rp.read()

        # Get crawl delay for all user agents
        delay = rp.crawl_delay('*')
        return delay if delay is not None else 1.0
    except Exception:
        return 1.0  # Default delay if robots.txt is missing or unreadable

# Usage in rate-limited requests
http = urllib3.PoolManager()
base_url = 'https://example.com'
crawl_delay = get_crawl_delay(base_url)

print(f"Using crawl delay: {crawl_delay} seconds")
# Apply this delay in your rate limiting logic
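
As one possible follow-up, the fetched delay can drive the same fixed-delay loop used earlier (the page paths below are placeholders for illustration):

import time

pages = ['/page1', '/page2', '/page3']  # hypothetical paths

for path in pages:
    response = http.request('GET', base_url + path)
    print(f"Status: {response.status} for {path}")
    time.sleep(crawl_delay)  # honor the advertised crawl delay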

2. Monitor Response Headers

import time

def check_rate_limit_headers(response):
    """Monitor rate limiting headers from server"""
    headers = response.headers

    rate_limit_info = {
        'limit': headers.get('X-RateLimit-Limit'),
        'remaining': headers.get('X-RateLimit-Remaining'),
        'reset': headers.get('X-RateLimit-Reset'),
        'retry_after': headers.get('Retry-After')
    }

    # Calculate wait time based on headers
    # (assumes Retry-After is given in seconds; it can also be an HTTP date)
    if rate_limit_info['retry_after']:
        return int(rate_limit_info['retry_after'])
    elif rate_limit_info['remaining'] == '0' and rate_limit_info['reset']:
        reset_time = int(rate_limit_info['reset'])
        wait_time = reset_time - int(time.time())
        return max(wait_time, 0)

    return 0

# Example usage
response = http.request('GET', 'https://api.example.com/data')
wait_time = check_rate_limit_headers(response)
if wait_time > 0:
    print(f"Rate limit hit, waiting {wait_time} seconds")
    time.sleep(wait_time)

Common Pitfalls and Solutions

Avoiding Thundering Herd

When running multiple processes, add randomization to prevent simultaneous requests:

import random

def jittered_delay(base_delay, jitter_factor=0.1):
    """Add jitter to prevent thundering herd"""
    jitter = random.uniform(-jitter_factor, jitter_factor)
    return base_delay * (1 + jitter)

# Apply jittered delays
for url in urls:
    delay = jittered_delay(1.0, 0.2)  # 1 second ± 20%
    time.sleep(delay)
    response = http.request('GET', url)

Implementing Circuit Breaker Pattern

Protect against repeated failures with a circuit breaker:

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = 1
    OPEN = 2
    HALF_OPEN = 3

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def can_request(self):
        """Check if request is allowed"""
        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        else:  # HALF_OPEN
            return True

    def record_success(self):
        """Record successful request"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Record failed request"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage with urllib3
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
http = urllib3.PoolManager()

def protected_request(url):
    """Make request with circuit breaker protection"""
    if not circuit_breaker.can_request():
        raise Exception("Circuit breaker is open")

    try:
        response = http.request('GET', url, timeout=10)
        if response.status < 500:
            circuit_breaker.record_success()
        else:
            circuit_breaker.record_failure()
        return response
    except Exception as e:
        circuit_breaker.record_failure()
        raise e
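
One way to use it, continuing the example above with a placeholder URL list and a short cool-down whenever the breaker rejects a request:

urls = ['https://httpbin.org/status/200'] * 5  # placeholder URLs

for url in urls:
    try:
        response = protected_request(url)
        print(f"Status: {response.status}")
    except Exception as e:
        print(f"Skipped: {e}")
        time.sleep(5)  # brief cool-down before trying the next URL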

Integration with Connection Pooling

Combine rate limiting with urllib3's connection pooling for optimal performance:

import urllib3
import time
from threading import Lock

class PooledRateLimiter:
    def __init__(self, pool_size=10, requests_per_second=5):
        # Configure connection pooling: num_pools is the number of per-host
        # pools; maxsize is the number of connections kept open per host
        self.http = urllib3.PoolManager(
            num_pools=pool_size,
            maxsize=pool_size,
            block=True
        )

        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0
        self.lock = Lock()

    def request(self, method, url, **kwargs):
        """Make rate-limited request using connection pool"""
        with self.lock:
            current_time = time.time()
            time_since_last = current_time - self.last_request_time

            if time_since_last < self.min_interval:
                sleep_time = self.min_interval - time_since_last
                time.sleep(sleep_time)

            self.last_request_time = time.time()

        # Make request using pooled connection
        return self.http.request(method, url, **kwargs)

# Usage example
limiter = PooledRateLimiter(pool_size=20, requests_per_second=3)

urls = ['https://httpbin.org/json'] * 50

for i, url in enumerate(urls):
    response = limiter.request('GET', url)
    print(f"Request {i+1}: Status {response.status}")

Conclusion

Implementing effective rate limiting with urllib3 requires choosing the right strategy for your use case. Start with simple time-based delays for basic scenarios, then progress to token bucket algorithms or adaptive rate limiting for more sophisticated applications. Always monitor server responses and respect rate limiting headers to maintain ethical scraping practices.

For complex scraping projects requiring JavaScript execution or advanced browser automation, see complementary guides such as handling authentication in Puppeteer or monitoring network requests in Puppeteer for a more complete web scraping toolkit.

Remember that proper rate limiting not only prevents technical issues but also demonstrates respect for the websites you're accessing, leading to more sustainable and reliable data collection over time.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
