How do I handle rate limiting when using LLM APIs for web scraping?
When using Large Language Model (LLM) APIs like OpenAI's GPT, Anthropic's Claude, or Google's Gemini for web scraping and data extraction, you'll inevitably encounter rate limiting. LLM providers impose limits on requests per minute (RPM), tokens per minute (TPM), and requests per day (RPD) to ensure fair usage and prevent system overload.
Rate limiting is particularly challenging when scraping large datasets because each page needs to be processed through the LLM API. Without proper handling, your scraper will fail with 429 (Too Many Requests) errors, wasting time and credits.
Understanding LLM API Rate Limits
Different LLM providers have varying rate limit structures. The figures below are typical ranges; exact limits depend on your tier and change over time, so check each provider's current documentation:
OpenAI (GPT-4, GPT-3.5)
- Requests per minute: 3-10,000 (tier-dependent)
- Tokens per minute: 40,000-2,000,000
- Requests per day: varies by tier

Anthropic (Claude)
- Requests per minute: 50-1,000
- Tokens per minute: 40,000-400,000
- Monthly token limits based on tier

Google (Gemini)
- Requests per minute: 60-1,000
- Tokens per minute: 32,000-4,000,000
Rate limits are typically enforced at the API key level and depend on your subscription tier. Higher-tier accounts get more generous limits.
Core Strategies for Handling Rate Limits
1. Implement Exponential Backoff with Retry Logic
The most fundamental approach is to catch rate limit errors and retry with increasing delays:
Python Example:
import openai
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

client = OpenAI()  # reads OPENAI_API_KEY from the environment

@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
def call_llm_with_retry(prompt):
    """Call LLM API with automatic retry on rate limits"""
    try:
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            timeout=30
        )
        return response.choices[0].message.content
    except openai.RateLimitError as e:
        print(f"Rate limit hit, retrying... {e}")
        raise  # tenacity will handle the retry
    except Exception as e:
        print(f"Error: {e}")
        raise
# Usage
html_content = "<div>Product price: $99.99</div>"
prompt = f"Extract the price from this HTML: {html_content}"
result = call_llm_with_retry(prompt)
JavaScript Example:
async function callLLMWithRetry(prompt, maxRetries = 5) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await fetch('https://api.openai.com/v1/chat/completions', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`
        },
        body: JSON.stringify({
          model: 'gpt-4',
          messages: [{ role: 'user', content: prompt }]
        })
      });

      if (response.status === 429) {
        // Prefer the server's Retry-After hint; fall back to exponential backoff
        const retryAfter = Number(response.headers.get('Retry-After')) || Math.pow(2, attempt);
        console.log(`Rate limited. Waiting ${retryAfter} seconds...`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }

      if (!response.ok) {
        throw new Error(`API error: ${response.status}`);
      }

      const data = await response.json();
      return data.choices[0].message.content;
    } catch (error) {
      if (attempt === maxRetries - 1) throw error;
      const delay = Math.min(1000 * Math.pow(2, attempt), 60000);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  throw new Error('Rate limited on every attempt');
}
2. Rate Limiting with a Token Bucket Algorithm
Implement a token bucket to control request rate proactively:
Python Example:
import time
from threading import Lock

class RateLimiter:
    def __init__(self, requests_per_minute=50, tokens_per_minute=40000):
        self.rpm = requests_per_minute
        self.tpm = tokens_per_minute
        self.request_tokens = requests_per_minute
        self.token_budget = tokens_per_minute
        self.last_update = time.time()
        self.lock = Lock()

    def wait_if_needed(self, estimated_tokens=1000):
        """Wait if the rate limit would be exceeded, then consume budget"""
        with self.lock:
            now = time.time()
            time_passed = now - self.last_update
            # Replenish request and token budgets based on elapsed time
            self.request_tokens = min(
                self.rpm,
                self.request_tokens + (time_passed * self.rpm / 60)
            )
            self.token_budget = min(
                self.tpm,
                self.token_budget + (time_passed * self.tpm / 60)
            )
            self.last_update = now

            # Check if we need to wait
            if self.request_tokens < 1 or self.token_budget < estimated_tokens:
                wait_time = max(
                    60 / self.rpm if self.request_tokens < 1 else 0,
                    (estimated_tokens - self.token_budget) * 60 / self.tpm
                )
                print(f"Rate limit reached. Waiting {wait_time:.2f} seconds...")
                time.sleep(wait_time)
                self.last_update = time.time()
                # The wait earned just enough budget for this call; consume it
                # (slightly conservative: any leftover fraction is discarded)
                self.request_tokens = 0
                self.token_budget = 0
            else:
                self.request_tokens -= 1
                self.token_budget -= estimated_tokens

# Usage
limiter = RateLimiter(requests_per_minute=50, tokens_per_minute=40000)

def scrape_with_llm(urls):
    results = []
    for url in urls:
        # Estimate tokens needed (prompt + response)
        estimated_tokens = 1500
        limiter.wait_if_needed(estimated_tokens)
        # Make the API call (call_llm_api is your own extraction helper)
        result = call_llm_api(url)
        results.append(result)
    return results
3. Batch Processing and Concurrency Control
Process URLs in batches to stay within rate limits:
Python Example with asyncio and aiohttp:
import asyncio
from asyncio import Semaphore
import aiohttp

class AsyncRateLimiter:
    """Space out calls so that at most rate_per_minute are made per minute"""
    def __init__(self, rate_per_minute):
        self.rate = rate_per_minute
        self.interval = 60.0 / rate_per_minute
        self.last_called = 0.0
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = asyncio.get_running_loop().time()
            time_since_last = now - self.last_called
            if time_since_last < self.interval:
                await asyncio.sleep(self.interval - time_since_last)
            self.last_called = asyncio.get_running_loop().time()

async def process_url_with_llm(session, url, semaphore, rate_limiter):
    """Process a single URL with rate limiting"""
    async with semaphore:  # Limit concurrent requests
        await rate_limiter.acquire()
        try:
            # Fetch page content
            async with session.get(url) as response:
                html = await response.text()
            # Extract data with the LLM (call_llm_api_async is your async API wrapper)
            prompt = f"Extract product info from: {html[:2000]}"
            result = await call_llm_api_async(prompt)
            return {'url': url, 'data': result}
        except Exception as e:
            print(f"Error processing {url}: {e}")
            return {'url': url, 'error': str(e)}

async def scrape_batch(urls, max_concurrent=5, rpm=50):
    """Scrape URLs in batches with rate limiting"""
    semaphore = Semaphore(max_concurrent)
    rate_limiter = AsyncRateLimiter(rpm)
    async with aiohttp.ClientSession() as session:
        tasks = [
            process_url_with_llm(session, url, semaphore, rate_limiter)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
4. Monitor and Respect Rate Limit Headers
LLM APIs return rate limit information in response headers:
Python Example:
import os
import time
import requests

API_KEY = os.getenv("OPENAI_API_KEY")

def call_llm_and_track_limits(prompt):
    """Call LLM and track rate limit headers"""
    response = requests.post(
        'https://api.openai.com/v1/chat/completions',
        headers={'Authorization': f'Bearer {API_KEY}'},
        json={'model': 'gpt-4', 'messages': [{'role': 'user', 'content': prompt}]}
    )
    # Extract rate limit info from headers
    remaining_requests = int(response.headers.get('x-ratelimit-remaining-requests', 0))
    remaining_tokens = int(response.headers.get('x-ratelimit-remaining-tokens', 0))
    reset_time = response.headers.get('x-ratelimit-reset-requests', '')
    print(f"Remaining: {remaining_requests} requests, {remaining_tokens} tokens")
    print(f"Reset time: {reset_time}")

    if response.status_code == 429:
        retry_after = int(response.headers.get('retry-after', 60))
        print(f"Rate limited! Retry after {retry_after} seconds")
        time.sleep(retry_after)
        return call_llm_and_track_limits(prompt)  # Retry (consider capping the recursion depth)
    return response.json()
5. Use Multiple API Keys or Providers
Distribute load across multiple API keys or switch between different LLM providers:
Python Example:
from itertools import cycle
from openai import OpenAI, RateLimitError

class MultiKeyLLMClient:
    def __init__(self, api_keys):
        self.keys = cycle(api_keys)  # Round-robin through keys
        self.clients = {key: OpenAI(api_key=key) for key in api_keys}
        self.key_limiters = {
            key: RateLimiter(requests_per_minute=50)
            for key in api_keys
        }
        self.current_key = next(self.keys)

    def call_llm(self, prompt):
        """Call LLM using round-robin API keys"""
        max_attempts = len(self.key_limiters)
        for _ in range(max_attempts):
            try:
                limiter = self.key_limiters[self.current_key]
                limiter.wait_if_needed()
                response = self.clients[self.current_key].chat.completions.create(
                    model="gpt-4",
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
            except RateLimitError:
                print(f"Key {self.current_key[:8]}... rate limited, switching")
                self.current_key = next(self.keys)
        raise Exception("All API keys are rate limited")

# Usage
api_keys = ['sk-key1...', 'sk-key2...', 'sk-key3...']
client = MultiKeyLLMClient(api_keys)
result = client.call_llm("Extract data from this page...")
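The class above rotates keys for a single provider. For the provider-switching variant mentioned in this section's intro, a minimal sketch might fall back to Claude whenever OpenAI reports a rate limit. It assumes the official openai and anthropic Python SDKs (which read OPENAI_API_KEY and ANTHROPIC_API_KEY from the environment) and uses placeholder model names:

import openai
import anthropic

openai_client = openai.OpenAI()        # reads OPENAI_API_KEY
claude_client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY

def call_llm_with_fallback(prompt):
    """Try OpenAI first; fall back to Anthropic if rate limited"""
    try:
        response = openai_client.chat.completions.create(
            model="gpt-4",  # placeholder model name
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except openai.RateLimitError:
        # Primary provider is throttled; try the fallback provider instead
        message = claude_client.messages.create(
            model="claude-3-5-sonnet-latest",  # placeholder model name
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return message.content[0].text

Because the two providers meter usage independently, a fallback like this keeps the scraper moving while the primary key's window resets.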
6. Optimize Token Usage
Reduce tokens per request to fit more requests within token limits:
from bs4 import BeautifulSoup

def optimize_html_for_llm(html, max_chars=8000):
    """Reduce HTML size before sending to LLM"""
    soup = BeautifulSoup(html, 'html.parser')
    # Remove unnecessary elements
    for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
        tag.decompose()
    # Get text with minimal whitespace
    text = ' '.join(soup.get_text().split())
    # Truncate if still too long
    return text[:max_chars]

# Usage in scraping (fetch_page is your own page-fetching helper)
html_content = fetch_page(url)
optimized = optimize_html_for_llm(html_content)
prompt = f"Extract product data from: {optimized}"
result = call_llm_with_retry(prompt)
Best Practices
- Start Conservative: Begin with lower request rates and gradually increase
- Log Rate Limit Events: Track when limits are hit to optimize your strategy
- Use Caching: Cache LLM responses to avoid redundant API calls (a minimal sketch follows this list)
- Implement Circuit Breakers: Stop requests temporarily if repeated failures occur
- Monitor Costs: Rate limiting often correlates with API costs; track spending
- Handle Errors Gracefully: Don't lose scraped data when rate limits are hit
- Consider Alternatives: For simple extraction, traditional parsing methods may be more efficient
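To make the caching point concrete, here is a minimal sketch that keys responses on a hash of the prompt. The on-disk cache directory is an assumption, and it reuses the call_llm_with_retry helper from earlier; swap the storage layer (Redis, SQLite, etc.) for whatever fits your pipeline:

import hashlib
import json
from pathlib import Path

CACHE_DIR = Path("llm_cache")  # assumed local cache directory
CACHE_DIR.mkdir(exist_ok=True)

def cached_llm_call(prompt):
    """Return a cached response when the same prompt has been seen before"""
    key = hashlib.sha256(prompt.encode("utf-8")).hexdigest()
    cache_file = CACHE_DIR / f"{key}.json"
    if cache_file.exists():
        return json.loads(cache_file.read_text())["response"]
    # Cache miss: call the API (reusing the retry helper defined earlier)
    response = call_llm_with_retry(prompt)
    cache_file.write_text(json.dumps({"prompt": prompt, "response": response}))
    return response

Re-scraping the same pages then costs nothing in tokens, which also makes rate limit budgets last longer.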
Combining with Web Scraping Workflows
When using LLMs with browser automation tools, implement proper timeout handling to prevent cascading delays:
import asyncio
import aiohttp

async def scrape_with_timeout_and_rate_limit(url, limiter, timeout=30):
    """Combine timeout handling with rate limiting (limiter is the AsyncRateLimiter above)"""
    try:
        # Wait for rate limit
        await limiter.acquire()
        # Fetch with timeout
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                html = await response.text()
        # Process with LLM (also with timeout)
        result = await asyncio.wait_for(
            call_llm_api_async(html),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        print(f"Timeout processing {url}")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None
Conclusion
Handling rate limiting when using LLM APIs for web scraping requires a multi-layered approach combining retry logic, proactive rate limiting, batch processing, and intelligent error handling. By implementing these strategies, you can build robust scrapers that efficiently process large datasets while respecting API limits and optimizing costs.
The key is to balance throughput with reliability—scraping faster isn't valuable if you're constantly hitting rate limits and failing requests. Start with conservative limits, monitor performance, and adjust based on your specific use case and API tier.