HTTP Request Queuing Strategies for Large-Scale Scraping
When scaling web scraping operations to thousands or millions of requests, effective HTTP request queuing becomes crucial for maintaining performance, reliability, and respectful resource usage. This guide covers proven strategies for managing large-scale scraping operations efficiently.
Why Request Queuing Matters
Large-scale web scraping presents unique challenges:
- Rate limiting: Websites implement rate limits to prevent abuse
- Server overload: Too many concurrent requests can overwhelm target servers
- IP blocking: Aggressive scraping patterns can trigger anti-bot measures
- Resource management: Uncontrolled requests consume excessive bandwidth and memory
- Data integrity: Proper queuing ensures reliable data collection
Core Queuing Strategies
1. FIFO (First In, First Out) Queue
The simplest approach where requests are processed in the order they're added:
```python
import asyncio
from asyncio import Queue

import aiohttp


class FIFOScraper:
    def __init__(self, max_concurrent=10):
        self.queue = Queue()
        self.max_concurrent = max_concurrent
        self.session = None

    async def add_url(self, url):
        await self.queue.put(url)

    async def worker(self):
        while True:
            url = await self.queue.get()
            try:
                async with self.session.get(url) as response:
                    content = await response.text()
                    await self.process_content(url, content)
            except Exception as e:
                print(f"Error processing {url}: {e}")
            finally:
                self.queue.task_done()

    async def start_workers(self):
        self.session = aiohttp.ClientSession()
        workers = [asyncio.create_task(self.worker())
                   for _ in range(self.max_concurrent)]
        return workers

    async def process_content(self, url, content):
        # Process scraped content
        print(f"Processed {url}: {len(content)} characters")
```
2. Priority Queue System
Implement priority-based processing for more important requests:
```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class PriorityRequest:
    priority: int
    url: str
    metadata: Optional[dict] = None

    def __lt__(self, other):
        # Lower numbers are retrieved first by asyncio.PriorityQueue
        return self.priority < other.priority


class PriorityQueueScraper:
    def __init__(self, max_concurrent=10):
        self.queue = asyncio.PriorityQueue()
        self.max_concurrent = max_concurrent
        self.session = None

    async def add_priority_request(self, url, priority=5, metadata=None):
        request = PriorityRequest(priority, url, metadata)
        await self.queue.put(request)

    async def worker(self):
        while True:
            request = await self.queue.get()
            try:
                async with self.session.get(request.url) as response:
                    content = await response.text()
                    await self.process_priority_content(request, content)
            except Exception as e:
                print(f"Error processing {request.url}: {e}")
            finally:
                self.queue.task_done()

    async def process_priority_content(self, request, content):
        print(f"Priority {request.priority}: {request.url}")
```
3. Rate-Limited Queue with Token Bucket
Control request rate using the token bucket algorithm:
```javascript
class RateLimitedQueue {
  constructor(maxTokens = 10, refillRate = 1) {
    this.maxTokens = maxTokens;
    this.tokens = maxTokens;
    this.refillRate = refillRate;
    this.queue = [];
    this.processing = false;

    // Refill tokens periodically
    setInterval(() => {
      this.tokens = Math.min(this.maxTokens, this.tokens + this.refillRate);
      this.processQueue();
    }, 1000);
  }

  async addRequest(url, options = {}) {
    return new Promise((resolve, reject) => {
      this.queue.push({ url, options, resolve, reject });
      this.processQueue();
    });
  }

  async processQueue() {
    if (this.processing || this.queue.length === 0 || this.tokens <= 0) {
      return;
    }
    this.processing = true;
    while (this.queue.length > 0 && this.tokens > 0) {
      const request = this.queue.shift();
      this.tokens--;
      try {
        const response = await fetch(request.url, request.options);
        const content = await response.text();
        request.resolve({ url: request.url, content, status: response.status });
      } catch (error) {
        request.reject(error);
      }
    }
    this.processing = false;
  }
}

// Usage example
const rateLimitedScraper = new RateLimitedQueue(5, 2); // 5 tokens max, refill 2/second

async function scrapeUrls(urls) {
  const promises = urls.map(url => rateLimitedScraper.addRequest(url));
  const results = await Promise.allSettled(promises);
  return results;
}
```
4. Distributed Queue with Redis
Scale across multiple workers using Redis:
```python
import asyncio
import json
from typing import List, Optional

import aiohttp
import redis


class RedisDistributedQueue:
    def __init__(self, redis_url="redis://localhost:6379", queue_name="scrape_queue"):
        self.redis_client = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.processing_set = f"{queue_name}:processing"
        self.failed_queue = f"{queue_name}:failed"

    def add_urls(self, urls: List[str], priority: int = 0):
        """Add URLs to the queue with optional priority"""
        pipeline = self.redis_client.pipeline()
        for url in urls:
            request_data = {"url": url, "priority": priority, "attempts": 0}
            pipeline.lpush(self.queue_name, json.dumps(request_data))
        pipeline.execute()

    def get_next_request(self, timeout: int = 10) -> Optional[dict]:
        """Get next request from queue with timeout"""
        result = self.redis_client.brpop(self.queue_name, timeout=timeout)
        if result:
            request_data = json.loads(result[1])
            # Move to processing set for failure recovery
            self.redis_client.sadd(self.processing_set, json.dumps(request_data))
            return request_data
        return None

    def complete_request(self, request_data: dict):
        """Mark request as completed"""
        self.redis_client.srem(self.processing_set, json.dumps(request_data))

    def fail_request(self, request_data: dict, max_retries: int = 3):
        """Handle failed request with retry logic"""
        # Remove from the processing set *before* mutating attempts, so the
        # serialized JSON still matches the member that was stored there
        self.redis_client.srem(self.processing_set, json.dumps(request_data))
        request_data["attempts"] += 1
        if request_data["attempts"] < max_retries:
            # Retry the request
            self.redis_client.lpush(self.queue_name, json.dumps(request_data))
        else:
            # Move to failed queue
            self.redis_client.lpush(self.failed_queue, json.dumps(request_data))


class DistributedWorker:
    def __init__(self, queue: RedisDistributedQueue, worker_id: str):
        self.queue = queue
        self.worker_id = worker_id
        self.session = None
        self.running = False

    async def start(self):
        """Start the worker"""
        self.session = aiohttp.ClientSession()
        self.running = True
        loop = asyncio.get_running_loop()
        while self.running:
            # brpop blocks, so run it in a thread pool to keep the event loop free
            request_data = await loop.run_in_executor(None, self.queue.get_next_request)
            if request_data:
                await self.process_request(request_data)
            await asyncio.sleep(0.1)  # Prevent busy waiting

    async def process_request(self, request_data: dict):
        """Process a single request"""
        url = request_data["url"]
        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with self.session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    content = await response.text()
                    await self.handle_success(url, content)
                    self.queue.complete_request(request_data)
                else:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status
                    )
        except Exception as e:
            print(f"Worker {self.worker_id} failed to process {url}: {e}")
            self.queue.fail_request(request_data)

    async def handle_success(self, url: str, content: str):
        """Handle successful scraping"""
        print(f"Worker {self.worker_id} processed {url}: {len(content)} chars")
        # Process and store content as needed

    async def stop(self):
        """Stop the worker"""
        self.running = False
        if self.session:
            await self.session.close()
```
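One way to wire this together, assuming a Redis instance at the default address (the worker IDs, URLs, and 60-second run window are illustrative; a production shutdown would wait for in-flight requests before closing sessions):

```python
async def run_workers():
    queue = RedisDistributedQueue()
    queue.add_urls(["https://example.com/a", "https://example.com/b"])

    workers = [DistributedWorker(queue, f"worker-{i}") for i in range(3)]
    tasks = [asyncio.create_task(w.start()) for w in workers]

    await asyncio.sleep(60)  # Let the workers drain the queue for a while
    for w in workers:
        await w.stop()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(run_workers())
```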
Advanced Queuing Patterns
1. Circuit Breaker Pattern
Implement circuit breakers to handle failing endpoints:
```python
import asyncio
import time
from enum import Enum
from typing import Callable, Any
from urllib.parse import urlparse

import requests


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class CircuitBreakerQueue:
    def __init__(self):
        self.circuit_breakers = {}
        self.queue = asyncio.Queue()

    def get_circuit_breaker(self, domain: str) -> CircuitBreaker:
        # One breaker per domain, so a single failing site doesn't block the rest
        if domain not in self.circuit_breakers:
            self.circuit_breakers[domain] = CircuitBreaker()
        return self.circuit_breakers[domain]

    async def process_request(self, url: str):
        domain = urlparse(url).netloc
        circuit_breaker = self.get_circuit_breaker(domain)
        try:
            return circuit_breaker.call(self.make_request, url)
        except Exception as e:
            print(f"Circuit breaker blocked request to {domain}: {e}")
            # Optionally re-queue for later
            await self.queue.put(url)

    def make_request(self, url: str):
        # Synchronous request for illustration; note this blocks the event loop
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text
```
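Driving the queue is then a matter of calling `process_request`: once a domain accumulates five failures, further requests to it fail fast until the recovery timeout elapses. A brief, illustrative sketch:

```python
async def demo():
    cb_queue = CircuitBreakerQueue()
    for url in ["https://example.com/a", "https://example.com/b"]:
        text = await cb_queue.process_request(url)
        if text:
            print(f"Fetched {len(text)} characters from {url}")

asyncio.run(demo())
```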
2. Adaptive Queue with Backpressure
Handle varying server loads with adaptive queuing:
```javascript
class AdaptiveQueue {
  constructor(initialConcurrency = 5) {
    this.concurrency = initialConcurrency;
    this.maxConcurrency = 50;
    this.minConcurrency = 1;
    this.queue = [];
    this.activeRequests = 0;
    this.successCount = 0;
    this.errorCount = 0;
    this.responseTimes = [];
    this.lastAdjustment = Date.now();

    // Adjust concurrency every 10 seconds
    setInterval(() => this.adjustConcurrency(), 10000);
  }

  async addRequest(url, options = {}) {
    return new Promise((resolve, reject) => {
      this.queue.push({ url, options, resolve, reject });
      this.processQueue();
    });
  }

  processQueue() {
    while (this.queue.length > 0 && this.activeRequests < this.concurrency) {
      const request = this.queue.shift();
      this.activeRequests++;
      this.makeRequest(request)
        .finally(() => {
          this.activeRequests--;
          this.processQueue();
        });
    }
  }

  async makeRequest(request) {
    const startTime = Date.now();
    try {
      const response = await fetch(request.url, {
        ...request.options,
        signal: AbortSignal.timeout(30000)  // fetch has no timeout option; abort instead
      });
      if (response.ok) {
        this.successCount++;
        const content = await response.text();
        request.resolve({ url: request.url, content, status: response.status });
      } else {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }
    } catch (error) {
      this.errorCount++;
      request.reject(error);
    } finally {
      this.responseTimes.push(Date.now() - startTime);
    }
  }

  adjustConcurrency() {
    const totalRequests = this.successCount + this.errorCount;
    if (totalRequests === 0) return;

    const successRate = this.successCount / totalRequests;
    const avgResponseTime = this.calculateAverageResponseTime();

    if (successRate > 0.95 && avgResponseTime < 2000) {
      // High success rate and fast responses - increase concurrency
      this.concurrency = Math.min(this.maxConcurrency, this.concurrency + 2);
    } else if (successRate < 0.8 || avgResponseTime > 5000) {
      // Low success rate or slow responses - decrease concurrency
      this.concurrency = Math.max(this.minConcurrency, this.concurrency - 1);
    }

    console.log(`Adjusted concurrency to ${this.concurrency} (success rate: ${(successRate * 100).toFixed(1)}%)`);

    // Reset counters for the next window
    this.successCount = 0;
    this.errorCount = 0;
    this.responseTimes = [];
  }

  calculateAverageResponseTime() {
    if (this.responseTimes.length === 0) return 0;
    return this.responseTimes.reduce((sum, t) => sum + t, 0) / this.responseTimes.length;
  }
}
```
Queue Monitoring and Metrics
Implement comprehensive monitoring for queue performance:
```python
import time
import threading
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Dict


@dataclass
class QueueMetrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    avg_response_time: float = 0.0
    queue_size: int = 0
    active_workers: int = 0
    requests_per_second: float = 0.0


class MetricsCollector:
    def __init__(self, window_size=300):  # Intended retention window in seconds (5 minutes)
        self.window_size = window_size
        self.response_times = deque(maxlen=1000)
        self.request_timestamps = deque(maxlen=1000)
        self.domain_stats = defaultdict(lambda: {'success': 0, 'failure': 0})
        self.lock = threading.Lock()

    def record_request(self, domain: str, response_time: float, success: bool):
        with self.lock:
            current_time = time.time()
            self.response_times.append(response_time)
            self.request_timestamps.append(current_time)
            if success:
                self.domain_stats[domain]['success'] += 1
            else:
                self.domain_stats[domain]['failure'] += 1

    def get_metrics(self, queue_size: int, active_workers: int) -> QueueMetrics:
        with self.lock:
            current_time = time.time()
            # Calculate requests per second over the last minute
            recent_requests = [ts for ts in self.request_timestamps
                               if current_time - ts <= 60]
            rps = len(recent_requests) / 60 if recent_requests else 0
            # Calculate average response time
            avg_response_time = (sum(self.response_times) / len(self.response_times)
                                 if self.response_times else 0)
            # Calculate success/failure counts
            total_success = sum(stats['success'] for stats in self.domain_stats.values())
            total_failure = sum(stats['failure'] for stats in self.domain_stats.values())
            return QueueMetrics(
                total_requests=total_success + total_failure,
                successful_requests=total_success,
                failed_requests=total_failure,
                avg_response_time=avg_response_time,
                queue_size=queue_size,
                active_workers=active_workers,
                requests_per_second=rps
            )

    def get_domain_stats(self) -> Dict[str, Dict[str, int]]:
        with self.lock:
            return dict(self.domain_stats)
```
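Workers call `record_request` after each fetch, and a monitoring loop can poll `get_metrics`; the numbers below are illustrative:

```python
collector = MetricsCollector()
collector.record_request("example.com", response_time=0.42, success=True)
collector.record_request("example.com", response_time=1.10, success=False)

metrics = collector.get_metrics(queue_size=120, active_workers=8)
print(f"{metrics.requests_per_second:.2f} req/s, "
      f"{metrics.avg_response_time:.2f}s avg, "
      f"{metrics.failed_requests} failures")
```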
Best Practices for Queue Implementation
1. Graceful Degradation
Implement fallback mechanisms when queues become overwhelmed:
```python
import asyncio


class RobustQueue:
    def __init__(self, max_queue_size=10000):
        self.primary_queue = asyncio.Queue(maxsize=max_queue_size)
        self.overflow_queue = []
        self.metrics = MetricsCollector()

    async def add_request(self, url: str, priority: int = 0):
        try:
            # Try primary queue first
            self.primary_queue.put_nowait((priority, url))
        except asyncio.QueueFull:
            # Fall back to overflow handling; here larger numbers mean higher priority
            if len(self.overflow_queue) < 5000:  # Limit overflow
                self.overflow_queue.append((priority, url))
            else:
                # Evict the lowest-priority request to make room
                self.overflow_queue.sort(key=lambda x: x[0])
                if priority > self.overflow_queue[0][0]:
                    self.overflow_queue[0] = (priority, url)
```
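The class above only covers the producer side. A consumer also needs to refill the primary queue from the overflow list as space frees up; the `get_request` method below is a hypothetical extension sketching one way to do that:

```python
class RobustQueueWithConsumer(RobustQueue):
    async def get_request(self):
        """Fetch the next request, topping the primary queue up from overflow."""
        priority, url = await self.primary_queue.get()
        if self.overflow_queue:
            # Promote the highest-priority overflow item (largest number)
            self.overflow_queue.sort(key=lambda x: x[0])
            try:
                self.primary_queue.put_nowait(self.overflow_queue[-1])
            except asyncio.QueueFull:
                pass  # Still full; leave the item in overflow
            else:
                self.overflow_queue.pop()
        return priority, url
```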
2. Request Deduplication
Prevent duplicate requests in large-scale operations:
```python
import asyncio
import hashlib
from urllib.parse import urlparse, parse_qsl, urlencode


class DeduplicatedQueue:
    def __init__(self):
        self.queue = asyncio.Queue()
        # Note: both sets grow without bound; production systems often use a
        # Bloom filter or an external store instead
        self.seen_urls = set()
        self.url_fingerprints = set()

    def normalize_url(self, url: str) -> str:
        """Normalize URL for deduplication"""
        parsed = urlparse(url)
        # Lowercase only the scheme and host; paths and queries are case-sensitive
        scheme, netloc = parsed.scheme.lower(), parsed.netloc.lower()
        if parsed.query:
            # Sort query parameters so ?a=1&b=2 and ?b=2&a=1 match
            sorted_params = urlencode(sorted(parse_qsl(parsed.query)))
            return f"{scheme}://{netloc}{parsed.path}?{sorted_params}"
        return f"{scheme}://{netloc}{parsed.path}"

    def get_url_fingerprint(self, url: str) -> str:
        """Generate fingerprint for similar URLs"""
        normalized = self.normalize_url(url)
        return hashlib.md5(normalized.encode()).hexdigest()

    async def add_request(self, url: str, allow_duplicates: bool = False):
        if not allow_duplicates:
            fingerprint = self.get_url_fingerprint(url)
            if fingerprint in self.url_fingerprints:
                return False  # Duplicate detected
            self.url_fingerprints.add(fingerprint)
            self.seen_urls.add(url)
        await self.queue.put(url)
        return True
```
Integration with Browser Automation
When combining queuing strategies with browser automation tools, consider the resource implications. For example, when running multiple pages in parallel with Puppeteer, you'll need to balance queue throughput with browser resource consumption.
For JavaScript-heavy sites that require full browser rendering, implementing proper timeout handling in Puppeteer becomes crucial for maintaining queue performance and preventing resource leaks.
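As an illustration of that balance, here is a minimal sketch that caps concurrent browser pages with a semaphore. It uses Playwright's Python API as a stand-in for Puppeteer, and the page cap and 30-second timeout are assumptions to tune for your workload:

```python
import asyncio
from playwright.async_api import async_playwright


async def scrape_with_browser(urls, max_pages=3):
    # The semaphore caps how many pages render at once, trading queue
    # throughput against browser memory and CPU
    semaphore = asyncio.Semaphore(max_pages)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        async def fetch(url):
            async with semaphore:
                page = await browser.new_page()
                try:
                    # A hard timeout prevents a stuck page from stalling the queue
                    await page.goto(url, timeout=30_000)
                    return await page.content()
                finally:
                    await page.close()  # Always release the page, even on timeout

        results = await asyncio.gather(*(fetch(u) for u in urls),
                                       return_exceptions=True)
        await browser.close()
        return results
```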
Conclusion
Effective HTTP request queuing is fundamental to successful large-scale web scraping. The strategies outlined above provide a foundation for building robust, scalable scraping systems that respect server resources while maximizing data collection efficiency.
Key takeaways:
- Choose the right strategy: FIFO for simplicity, priority queues for importance-based processing, rate limiting for respectful scraping
- Monitor performance: Track metrics to optimize queue parameters and detect issues early
- Handle failures gracefully: Implement retry logic, circuit breakers, and overflow handling
- Scale horizontally: Use distributed queues with Redis or similar systems for multi-worker setups
- Adapt dynamically: Adjust concurrency and rate limits based on server responses and performance metrics
By implementing these queuing strategies, you'll build scraping systems that can handle massive scale while maintaining reliability and performance.