How do I handle rate limiting when using Requests?

Rate limiting is something every scraper and API client has to plan for. Servers impose limits to prevent abuse, so applications built on Python Requests need strategies for slowing down, backing off, and retrying when those limits are hit. This guide covers several techniques, from fixed delays to token buckets and circuit breakers.

Understanding Rate Limiting

Rate limiting occurs when a server restricts the number of requests a client can make within a specific time period. Common scenarios include:

  • HTTP 429 "Too Many Requests" status codes
  • API quotas (e.g., 1000 requests per hour)
  • Concurrent connection limits
  • Burst rate limits (e.g., 10 requests per second)
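
As a rough illustration of how these signals show up in a response, the sketch below summarizes a status code and a header mapping. The function name is hypothetical, and the X-RateLimit-Remaining header is a common convention rather than a standard, so check which headers your API actually sends.

```python
def describe_rate_limit(status_code, headers):
    """Summarize rate-limit signals found in a response.

    `headers` is any mapping of header names to values. X-RateLimit-Remaining
    is an assumed, non-standard header name used here for illustration.
    """
    # Header names are case-insensitive, so normalize before looking them up.
    lowered = {name.lower(): value for name, value in headers.items()}
    remaining = lowered.get("x-ratelimit-remaining")
    return {
        "limited": status_code == 429,
        "retry_after": lowered.get("retry-after"),
        "remaining": int(remaining) if remaining and remaining.isdigit() else None,
    }


print(describe_rate_limit(429, {"Retry-After": "30"}))
print(describe_rate_limit(200, {"X-RateLimit-Remaining": "12"}))
```

With Requests, the same function could be called as describe_rate_limit(response.status_code, response.headers).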

Basic Rate Limiting with Sleep

The simplest approach is adding delays between requests:

import requests
import time

def fetch_with_delay(urls, delay=1.0):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response)
        time.sleep(delay)  # Wait between requests
    return results

# Example usage
urls = ['https://api.example.com/data/1', 'https://api.example.com/data/2']
responses = fetch_with_delay(urls, delay=2.0)
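
If the API documents a quota instead of a delay, the pause can be derived from it. The helper below is a minimal sketch; its name and the safety_margin parameter are assumptions made for illustration.

```python
def delay_for_quota(max_requests, period_seconds, safety_margin=0.1):
    """Return the pause in seconds that spreads requests evenly over a quota window."""
    if max_requests <= 0:
        raise ValueError("max_requests must be positive")
    # Even spacing, padded by a margin so small timing errors do not exceed the quota.
    return (period_seconds / max_requests) * (1 + safety_margin)


# 1000 requests per hour works out to one request every 3.6 seconds before the margin.
hourly_delay = delay_for_quota(1000, 3600, safety_margin=0)
print(hourly_delay)
```

The result can be passed straight to fetch_with_delay as its delay argument.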

Implementing Exponential Backoff

Exponential backoff doubles the delay after each failed attempt, and a little random jitter keeps multiple clients from retrying in lockstep:

import requests
import time
import random

def exponential_backoff_request(url, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        # Exponential delay for this attempt, plus jitter to avoid synchronized retries
        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
        try:
            response = requests.get(url, timeout=10)
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise  # Out of retries: re-raise with the original traceback
            time.sleep(delay)
            continue

        if response.status_code != 429:
            return response

        # Rate limited: wait before the next attempt (no point sleeping after the last one)
        if attempt < max_retries - 1:
            print(f"Rate limited. Waiting {delay:.2f} seconds...")
            time.sleep(delay)

    raise Exception(f"Max retries exceeded for {url}")

# Example usage
try:
    response = exponential_backoff_request('https://api.example.com/data')
    print(f"Success: {response.status_code}")
except Exception as e:
    print(f"Failed: {e}")
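
One common variant is to cap the delay and draw the whole wait at random between zero and the exponential ceiling, often called "full jitter". This differs from the additive jitter used above. The sketch below only computes delays; the function name and the max_delay parameter are illustrative assumptions.

```python
import random


def backoff_delay(attempt, base_delay=1.0, max_delay=60.0, rng=random):
    """Return a full-jitter delay for a zero-based attempt number."""
    # The ceiling doubles per attempt but never exceeds max_delay.
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    # Full jitter: pick uniformly between 0 and the ceiling.
    return rng.uniform(0, ceiling)


# Ceilings for the first eight attempts with the default settings.
ceilings = [min(60.0, 1.0 * 2 ** n) for n in range(8)]
print(ceilings)
```

The cap matters for long retry loops: without it, the tenth attempt alone would wait over eight minutes.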

Using urllib3 Retry Strategy

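The urllib3 library, which Requests uses under the hood, provides a built-in Retry class. Mounted on a session through HTTPAdapter, it retries the listed status codes automatically and, by default, honors Retry-After headers. The allowed_methods argument requires urllib3 1.26 or later; older releases call it method_whitelist: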

import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

def create_session_with_retries():
    session = requests.Session()

    # Define retry strategy
    retry_strategy = Retry(
        total=5,
        status_forcelist=[429, 500, 502, 503, 504],
        backoff_factor=1,
        allowed_methods=["HEAD", "GET", "OPTIONS"]
    )

    # Mount adapter with retry strategy
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    return session

# Example usage
session = create_session_with_retries()
response = session.get('https://api.example.com/data', timeout=10)
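
To get a feel for what backoff_factor means in practice, the sketch below approximates the pauses between retries. It assumes the formula given in urllib3's Retry documentation (backoff_factor * 2 ** (errors - 1), no pause before the first retry, capped at 120 seconds by default); the exact behavior can differ between urllib3 versions, and a Retry-After header takes precedence when present.

```python
def retry_sleep_schedule(backoff_factor, retries, backoff_max=120.0):
    """Approximate the pauses urllib3 takes before each retry.

    Assumption: follows the formula documented for urllib3's Retry class;
    this is a standalone illustration, not urllib3 code.
    """
    schedule = []
    for errors in range(1, retries + 1):
        if errors <= 1:
            schedule.append(0.0)  # The first retry happens immediately
        else:
            schedule.append(float(min(backoff_max, backoff_factor * 2 ** (errors - 1))))
    return schedule


print(retry_sleep_schedule(backoff_factor=1, retries=5))
```

Under these assumptions, the configuration above (total=5, backoff_factor=1) would wait roughly 0, 2, 4, 8, and 16 seconds between attempts.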

Advanced Rate Limiting with Token Bucket

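A token bucket allows short bursts while capping the average rate: each request spends a token, and tokens are refilled at a fixed rate up to the bucket's capacity: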

import requests
import time
import threading

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.monotonic()  # Monotonic clock ignores wall-clock adjustments
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        with self.lock:
            now = time.monotonic()
            # Add tokens based on time elapsed
            tokens_to_add = (now - self.last_refill) * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens=1):
        while not self.consume(tokens):
            time.sleep(0.1)

class RateLimitedSession:
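    def __init__(self, requests_per_second=10, burst=None):
        # Burst capacity defaults to one second's worth of requests
        capacity = burst if burst is not None else requests_per_second
        self.bucket = TokenBucket(capacity=capacity, refill_rate=requests_per_second)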
        self.session = requests.Session()

    def get(self, url, **kwargs):
        self.bucket.wait_for_tokens()
        return self.session.get(url, **kwargs)

    def post(self, url, **kwargs):
        self.bucket.wait_for_tokens()
        return self.session.post(url, **kwargs)

# Example usage
rate_limited_session = RateLimitedSession(requests_per_second=5)
response = rate_limited_session.get('https://api.example.com/data')
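
Instead of polling every 0.1 seconds, a bucket can report exactly how long the caller needs to wait. The sketch below is a simplified, single-threaded variant with an injectable clock so its behavior can be checked without real sleeping; the class and method names are hypothetical.

```python
import time


class SimpleTokenBucket:
    """Token bucket with an injectable clock (not thread-safe; illustration only)."""

    def __init__(self, capacity, refill_rate, clock=time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = float(capacity)
        self.last_refill = clock()

    def try_consume(self, tokens=1):
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def seconds_until_available(self, tokens=1):
        """Return how long to wait before `tokens` can be consumed (0 if available now)."""
        missing = tokens - self.tokens
        return max(0.0, missing / self.refill_rate)


# Drive the bucket with a fake clock: two requests burst through, the third must wait.
fake_now = [0.0]
bucket = SimpleTokenBucket(capacity=2, refill_rate=1.0, clock=lambda: fake_now[0])
burst = [bucket.try_consume() for _ in range(3)]
wait = bucket.seconds_until_available()
fake_now[0] += wait  # Stand-in for time.sleep(wait)
after_wait = bucket.try_consume()
print(burst, wait, after_wait)
```

In real code the fake clock would be dropped and the caller would sleep for the reported time before retrying.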

Handling Retry-After Headers

Many APIs provide Retry-After headers indicating when to retry:

import requests
import time

def respect_retry_after(url, max_retries=3, default_delay=60):
    for attempt in range(max_retries):
        response = requests.get(url, timeout=10)

        if response.status_code != 429:
            return response

        retry_after = response.headers.get('Retry-After')
        try:
            # Retry-After is usually a number of seconds
            delay = int(retry_after)
        except (TypeError, ValueError):
            # Header missing, or given as an HTTP date: use the fallback
            delay = default_delay

        print(f"Rate limited. Retrying after {delay} seconds...")
        time.sleep(delay)

    raise Exception("Max retries exceeded")

# Example usage
response = respect_retry_after('https://api.example.com/data')
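
Retry-After may also carry an HTTP date instead of a number of seconds. The sketch below handles both forms using only the standard library; the function name and the default of 60 seconds are assumptions chosen to match the fallback used above.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, now=None, default=60.0):
    """Convert a Retry-After header value into a non-negative delay in seconds."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # Delay given directly in seconds
    try:
        retry_at = parsedate_to_datetime(value)  # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    except (TypeError, ValueError):
        return default  # Unparseable value: fall back
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the client may retry immediately.
    return max(0.0, (retry_at - now).total_seconds())


reference = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=reference))
```

The now parameter exists only so the date branch can be exercised with a fixed reference time.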

Concurrent Requests with Rate Limiting

Combine threading with rate limiting for efficient concurrent processing:

import requests
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

class ConcurrentRateLimiter:
    def __init__(self, max_workers=5, requests_per_second=10):
        self.max_workers = max_workers
        self.rate_limit_delay = 1.0 / requests_per_second
        self.last_request_time = 0
        self.lock = threading.Lock()

    def rate_limited_request(self, url):
        # The lock is held while sleeping on purpose: waiting threads queue up
        # behind it, so request start times stay at least rate_limit_delay apart.
        with self.lock:
            now = time.time()
            time_since_last = now - self.last_request_time

            if time_since_last < self.rate_limit_delay:
                sleep_time = self.rate_limit_delay - time_since_last
                time.sleep(sleep_time)

            self.last_request_time = time.time()

        # The request itself runs outside the lock, so downloads can overlap
        return requests.get(url, timeout=10)

    def fetch_urls(self, urls):
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {
                executor.submit(self.rate_limited_request, url): url 
                for url in urls
            }

            # Collect results
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    response = future.result()
                    results.append((url, response))
                except Exception as e:
                    results.append((url, e))

        return results

# Example usage
limiter = ConcurrentRateLimiter(max_workers=3, requests_per_second=2)
urls = [f'https://api.example.com/data/{i}' for i in range(10)]
results = limiter.fetch_urls(urls)
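
An alternative to sleeping while holding the lock is to reserve a start time under the lock and sleep outside it. The sketch below shows only the reservation logic, with an injectable clock so it can be checked without waiting; the class name and its parameters are hypothetical.

```python
import threading
import time


class SlotReserver:
    """Hands out evenly spaced start times (a sketch, not a drop-in replacement)."""

    def __init__(self, requests_per_second, clock=time.monotonic):
        self.interval = 1.0 / requests_per_second
        self.clock = clock
        self.next_slot = clock()
        self.lock = threading.Lock()

    def reserve(self):
        """Return how many seconds the caller should sleep before sending."""
        with self.lock:
            now = self.clock()
            # Take the next free slot, or "now" if the schedule has fallen behind.
            slot = max(now, self.next_slot)
            self.next_slot = slot + self.interval
        return slot - now


# Three callers arriving at the same instant at 2 requests per second.
fake_now = [100.0]
reserver = SlotReserver(requests_per_second=2, clock=lambda: fake_now[0])
waits = [reserver.reserve() for _ in range(3)]
print(waits)
```

A worker thread would call time.sleep(reserver.reserve()) and then issue its request, so the lock is held only for the bookkeeping.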

Monitoring and Logging Rate Limits

Implement comprehensive logging to track rate limiting behavior:

import requests
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RateLimitMonitor:
    def __init__(self):
        self.request_count = 0
        self.rate_limit_count = 0
        self.start_time = time.time()

    def make_request(self, url, max_retries=3, **kwargs):
        for attempt in range(max_retries + 1):
            self.request_count += 1
            start_time = time.time()

            try:
                response = requests.get(url, **kwargs)
            except requests.RequestException as e:
                logger.error(f"Request failed: {e}")
                raise

            duration = time.time() - start_time

            # Log request details
            logger.info(
                f"Request #{self.request_count}: {response.status_code} "
                f"({duration:.2f}s) - {url}"
            )

            if response.status_code != 429:
                return response

            # Handle rate limiting
            self.rate_limit_count += 1
            retry_after = response.headers.get('Retry-After', '60')

            logger.warning(
                f"Rate limited! Count: {self.rate_limit_count}, "
                f"Retry-After: {retry_after}"
            )

            if attempt == max_retries:
                return response  # Retry budget spent: hand the 429 back to the caller

            # Retry-After may be an HTTP date; fall back to 60 seconds in that case
            delay = int(retry_after) if retry_after.isdigit() else 60
            time.sleep(delay)

    def get_stats(self):
        elapsed = time.time() - self.start_time
        return {
            'total_requests': self.request_count,
            'rate_limits_hit': self.rate_limit_count,
            'elapsed_time': elapsed,
            'requests_per_second': self.request_count / elapsed if elapsed > 0 else 0
        }

# Example usage
monitor = RateLimitMonitor()
response = monitor.make_request('https://api.example.com/data')
print(monitor.get_stats())
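
The requests_per_second figure above is averaged over the monitor's whole lifetime, which hides recent bursts. A rolling window gives a more current picture. The sketch below is one way to compute it; the class name is hypothetical and the clock is injectable so the example runs without waiting.

```python
import time
from collections import deque


class RollingRate:
    """Counts events inside a sliding time window."""

    def __init__(self, window_seconds=60.0, clock=time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self.events = deque()

    def record(self):
        self.events.append(self.clock())

    def per_second(self):
        cutoff = self.clock() - self.window
        # Drop timestamps that have fallen out of the window.
        while self.events and self.events[0] < cutoff:
            self.events.popleft()
        return len(self.events) / self.window


# Simulate 20 requests spaced 0.5 seconds apart, then a long idle period.
fake_now = [0.0]
rate = RollingRate(window_seconds=10.0, clock=lambda: fake_now[0])
for _ in range(20):
    rate.record()
    fake_now[0] += 0.5
recent = rate.per_second()
fake_now[0] += 60
idle = rate.per_second()
print(recent, idle)
```

Calling record() next to the request_count increment in make_request would keep both figures in step.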

Best Practices for Rate Limiting

1. Respect Server Policies

Always check the API documentation for rate limiting policies and implement appropriate delays.

2. Use Session Objects

Reuse session objects to maintain connection pools and cookies:

session = requests.Session()
session.headers.update({'User-Agent': 'MyApp/1.0'})

# Use session for all requests
response = session.get('https://api.example.com/data')

3. Implement Circuit Breakers

Stop making requests temporarily when encountering persistent failures:

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self.failure_count = 0
            self.state = 'CLOSED'
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                self.last_failure_time = time.time()
            raise e
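
To see the state transitions in action, the sketch below runs a compact breaker against a function that always fails. It is a simplified variant of the class above, written with an injectable clock so the timeout can be simulated; MiniBreaker and always_fails are illustrative names.

```python
import time


class MiniBreaker:
    """Compact circuit breaker with an injectable clock (illustration only)."""

    def __init__(self, failure_threshold=2, timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    @property
    def state(self):
        if self.opened_at is None:
            return "CLOSED"
        if self.clock() - self.opened_at >= self.timeout:
            return "HALF_OPEN"  # Timeout elapsed: allow one trial call
        return "OPEN"

    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            raise RuntimeError("circuit open; skipping call")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # Open (or re-open) the circuit
            raise
        # Success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result


def always_fails():
    raise ConnectionError("simulated failure")


fake_now = [0.0]
breaker = MiniBreaker(failure_threshold=2, timeout=30.0, clock=lambda: fake_now[0])
states = []
for _ in range(2):
    try:
        breaker.call(always_fails)
    except ConnectionError:
        pass
    states.append(breaker.state)
fake_now[0] += 31  # Simulate waiting past the timeout
states.append(breaker.state)
states.append(breaker.call(lambda: "ok"))
states.append(breaker.state)
print(states)
```

In a scraper, func would typically be a function that performs the request and raises on a 429 or 5xx response, so repeated rate-limit errors trip the breaker.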

Integration with Web Scraping Workflows

When building comprehensive scraping solutions, consider integrating rate limiting with other tools. For example, when handling timeouts in browser automation, you can apply similar backoff strategies to manage both network delays and rate limits effectively.

For scenarios involving parallel processing across several processes or machines, the rate limit has to be shared: the combined request rate of all workers, not just each worker's own rate, must stay within the server's limits.

Conclusion

Effective rate limiting is essential for building robust, respectful web scraping applications. By implementing exponential backoff, respecting Retry-After headers, and using appropriate delay mechanisms, you can handle rate limits gracefully while maintaining good relationships with target servers.

Remember to always:

  • Monitor your request patterns and adjust rates accordingly
  • Implement proper error handling and logging
  • Respect robots.txt and API terms of service
  • Use sessions and connection pooling for efficiency
  • Test your rate limiting logic under various scenarios

These strategies will help you build reliable applications that can handle rate limiting challenges while maintaining optimal performance.
