How to Implement API Request Queuing for High-Volume Scraping
When scraping at scale, managing API requests efficiently becomes crucial for maintaining performance, respecting rate limits, and ensuring system stability. Request queuing is a fundamental pattern that helps organize and control the flow of HTTP requests in high-volume scraping operations.
Understanding Request Queuing Fundamentals
Request queuing involves creating a buffer system between your scraping logic and the actual HTTP requests. This pattern allows you to:
- Control request rate and timing
- Handle failures gracefully with retry mechanisms
- Prioritize certain requests over others
- Monitor and limit concurrent connections
- Implement backpressure when systems are overwhelmed
Basic Queue Implementation in Python
Here's a fundamental implementation using Python's `asyncio` and `aiohttp`:
```python
import asyncio
import itertools
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional

import aiohttp


class Priority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3


@dataclass
class QueuedRequest:
    url: str
    method: str = 'GET'
    headers: Dict[str, str] = field(default_factory=dict)
    data: Any = None
    priority: Priority = Priority.NORMAL
    max_retries: int = 3
    retry_count: int = 0
    created_at: float = field(default_factory=time.time)


class RequestQueue:
    def __init__(self, max_concurrent: int = 10, rate_limit: float = 1.0):
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit  # minimum seconds between requests
        self.last_request_time = 0.0
        self.active_requests = 0
        self.session: Optional[aiohttp.ClientSession] = None
        self._seq = itertools.count()  # tie-breaker so requests never compare

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def add_request(self, request: QueuedRequest):
        """Add a request to the queue with priority."""
        # Negative value gives max-heap behavior: higher priority first
        await self.queue.put((-request.priority.value, next(self._seq), request))

    async def process_queue(self):
        """Run workers until every queued request (including retries) is done."""
        workers = [
            asyncio.create_task(self._worker(f"worker-{i}"))
            for i in range(self.max_concurrent)
        ]
        await self.queue.join()  # returns once task_done() balances every put()
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    async def _worker(self, worker_name: str):
        """Individual worker processing requests."""
        while True:
            _, _, request = await self.queue.get()
            self.active_requests += 1
            try:
                # Rate limiting shared across workers
                wait = self.rate_limit - (time.time() - self.last_request_time)
                if wait > 0:
                    await asyncio.sleep(wait)
                self.last_request_time = time.time()

                success = await self._execute_request(request, worker_name)

                if not success and request.retry_count < request.max_retries:
                    # Retry with exponential backoff; re-queue *before*
                    # task_done() so queue.join() keeps waiting for it
                    request.retry_count += 1
                    await asyncio.sleep(2 ** request.retry_count)
                    await self.add_request(request)
            except Exception as e:
                print(f"Worker {worker_name} error: {e}")
            finally:
                self.active_requests = max(0, self.active_requests - 1)
                self.queue.task_done()

    async def _execute_request(self, request: QueuedRequest, worker_name: str) -> bool:
        """Execute a single HTTP request; return True on success."""
        try:
            async with self.session.request(
                request.method,
                request.url,
                headers=request.headers,
                data=request.data,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                if response.status == 200:
                    content = await response.text()
                    print(f"{worker_name}: successfully processed {request.url}")
                    # Process your scraped content here
                    return True
                elif response.status == 429:  # rate limited by the server
                    print(f"{worker_name}: rate limited for {request.url}")
                    return False
                else:
                    print(f"{worker_name}: HTTP {response.status} for {request.url}")
                    return False
        except Exception as e:
            print(f"{worker_name}: request failed for {request.url}: {e}")
            return False


# Usage example
async def main():
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3",
    ]

    async with RequestQueue(max_concurrent=5, rate_limit=0.5) as queue:
        for url in urls:
            await queue.add_request(QueuedRequest(
                url=url,
                headers={"User-Agent": "MyBot/1.0"},
                priority=Priority.NORMAL,
            ))
        await queue.process_queue()


if __name__ == "__main__":
    asyncio.run(main())
```
JavaScript Implementation with Node.js
For JavaScript environments, here's an equivalent implementation built on `axios` and Node's `EventEmitter`:
```javascript
const axios = require('axios');
const { EventEmitter } = require('events');

class Priority {
  static LOW = 1;
  static NORMAL = 2;
  static HIGH = 3;
}

class QueuedRequest {
  constructor(url, options = {}) {
    this.url = url;
    this.method = options.method || 'GET';
    this.headers = options.headers || {};
    this.data = options.data || null;
    this.priority = options.priority || Priority.NORMAL;
    this.maxRetries = options.maxRetries || 3;
    this.retryCount = 0;
    this.createdAt = Date.now();
  }
}

class RequestQueue extends EventEmitter {
  constructor(options = {}) {
    super();
    this.maxConcurrent = options.maxConcurrent || 10;
    this.rateLimit = options.rateLimit || 1000; // milliseconds between requests
    this.queue = [];
    this.activeRequests = 0;
    this.pendingRetries = 0; // retries waiting on their backoff timer
    this.lastRequestTime = 0;
    this.processing = false;
  }

  addRequest(request) {
    this.queue.push(request);
    // Sort by priority (higher first), then by creation time (FIFO)
    this.queue.sort((a, b) => {
      if (a.priority !== b.priority) {
        return b.priority - a.priority;
      }
      return a.createdAt - b.createdAt;
    });

    if (!this.processing) {
      this.processQueue();
    }
  }

  async processQueue() {
    this.processing = true;

    // Keep running while anything is queued, in flight, or backing off
    while (this.queue.length > 0 || this.activeRequests > 0 || this.pendingRetries > 0) {
      // Wait if we've hit the concurrency limit
      if (this.activeRequests >= this.maxConcurrent) {
        await this.sleep(100);
        continue;
      }

      const request = this.queue.shift();
      if (!request) {
        await this.sleep(100);
        continue;
      }

      // Rate limiting
      const now = Date.now();
      const timeSinceLastRequest = now - this.lastRequestTime;
      if (timeSinceLastRequest < this.rateLimit) {
        await this.sleep(this.rateLimit - timeSinceLastRequest);
      }
      this.lastRequestTime = Date.now();

      // Fire and forget: concurrency is bounded by the check above
      this.executeRequest(request);
    }

    this.processing = false;
    this.emit('queueComplete');
  }

  async executeRequest(request) {
    this.activeRequests++;
    try {
      const response = await axios({
        method: request.method,
        url: request.url,
        headers: request.headers,
        data: request.data,
        timeout: 30000
      });
      // axios rejects on non-2xx by default, so reaching here means success
      this.emit('requestSuccess', { request, response: response.data });
    } catch (error) {
      console.error(`Request failed for ${request.url}:`, error.message);

      // Retry logic with exponential backoff
      if (request.retryCount < request.maxRetries) {
        request.retryCount++;
        const backoffDelay = Math.pow(2, request.retryCount) * 1000;
        this.pendingRetries++; // keeps processQueue alive during the backoff
        setTimeout(() => {
          this.pendingRetries--;
          this.addRequest(request);
        }, backoffDelay);
        this.emit('requestRetry', { request, error });
      } else {
        this.emit('requestFailed', { request, error });
      }
    } finally {
      this.activeRequests--;
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  getStats() {
    return {
      queueLength: this.queue.length,
      activeRequests: this.activeRequests,
      pendingRetries: this.pendingRetries,
      processing: this.processing
    };
  }
}

// Usage example
const queue = new RequestQueue({
  maxConcurrent: 5,
  rateLimit: 500 // 500ms between requests
});

queue.on('requestSuccess', ({ request, response }) => {
  console.log(`✓ Success: ${request.url}`);
  // Process your scraped data here
});

queue.on('requestFailed', ({ request, error }) => {
  console.log(`✗ Failed: ${request.url} - ${error.message}`);
});

queue.on('queueComplete', () => {
  console.log('All requests processed!');
});

// Add requests
const urls = [
  'https://api.example.com/data/1',
  'https://api.example.com/data/2',
  'https://api.example.com/data/3'
];

urls.forEach(url => {
  const request = new QueuedRequest(url, {
    headers: { 'User-Agent': 'MyBot/1.0' },
    priority: Priority.NORMAL
  });
  queue.addRequest(request);
});
```
Advanced Features and Optimizations
Circuit Breaker Pattern
Implement circuit breakers to handle failing services gracefully:
```python
from enum import Enum
import time


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds to wait before probing again
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                # Timeout elapsed: allow a single probe request through
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
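A minimal sketch of wiring this breaker into the queue from earlier, assuming one breaker per target host so a failing API doesn't block requests to unrelated domains (`breaker_for` and `_host_breakers` are illustrative helpers, not part of the classes shown above):

```python
from typing import Dict
from urllib.parse import urlparse

# One breaker per host (illustrative)
_host_breakers: Dict[str, CircuitBreaker] = {}


def breaker_for(url: str) -> CircuitBreaker:
    """Lazily create a CircuitBreaker for the URL's host."""
    host = urlparse(url).netloc
    if host not in _host_breakers:
        _host_breakers[host] = CircuitBreaker(failure_threshold=5, timeout=60)
    return _host_breakers[host]

# Inside a worker, wrap the request execution; an open breaker raises,
# which the worker can treat as an ordinary failure and retry later:
#
#     success = await breaker_for(request.url).call(
#         self._execute_request, request, worker_name
#     )
```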
Database-Backed Queue System
For production environments, consider database-backed queues so that jobs survive process restarts and can be shared across workers:
```python
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class QueuedJob(Base):
    __tablename__ = 'queued_jobs'

    id = sa.Column(sa.Integer, primary_key=True)
    url = sa.Column(sa.String(2048), nullable=False)
    method = sa.Column(sa.String(10), default='GET')
    headers = sa.Column(sa.JSON)
    data = sa.Column(sa.JSON)
    priority = sa.Column(sa.Integer, default=2)
    max_retries = sa.Column(sa.Integer, default=3)
    retry_count = sa.Column(sa.Integer, default=0)
    status = sa.Column(sa.String(20), default='pending')
    created_at = sa.Column(sa.DateTime, default=sa.func.now())
    processed_at = sa.Column(sa.DateTime)
    error_message = sa.Column(sa.Text)


class DatabaseQueue:
    def __init__(self, database_url):
        self.engine = sa.create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def add_job(self, url, **kwargs):
        job = QueuedJob(url=url, **kwargs)
        self.session.add(job)
        self.session.commit()
        return job.id

    def get_next_job(self):
        # Note: with multiple workers, add .with_for_update(skip_locked=True)
        # so two workers can't claim the same row.
        return (
            self.session.query(QueuedJob)
            .filter(QueuedJob.status == 'pending')
            .order_by(QueuedJob.priority.desc(), QueuedJob.created_at)
            .first()
        )

    def mark_completed(self, job_id):
        job = self.session.get(QueuedJob, job_id)
        if job:
            job.status = 'completed'
            job.processed_at = sa.func.now()
            self.session.commit()

    def mark_failed(self, job_id, error_message):
        job = self.session.get(QueuedJob, job_id)
        if job:
            job.status = 'failed'
            job.error_message = error_message
            job.processed_at = sa.func.now()
            self.session.commit()
```
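As a sketch of how a worker might consume this table, the loop below polls for pending jobs and processes them one at a time. The `requests` client, the poll interval, and the absence of an "in_progress" status are simplifications; in production you would claim jobs atomically so concurrent workers never grab the same row:

```python
import time

import requests  # any HTTP client would do here


def run_worker(queue: DatabaseQueue, poll_interval: float = 2.0):
    """Poll for pending jobs and process them sequentially."""
    while True:
        job = queue.get_next_job()
        if job is None:
            time.sleep(poll_interval)  # nothing pending; back off briefly
            continue
        try:
            response = requests.request(
                job.method,
                job.url,
                headers=job.headers or {},
                json=job.data,
                timeout=30,
            )
            response.raise_for_status()  # treat non-2xx as failure
            queue.mark_completed(job.id)
        except Exception as exc:
            queue.mark_failed(job.id, str(exc))
```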
Integration with Web Scraping Tools
When implementing request queuing for web scraping, consider integrating with tools that can handle dynamic content. For JavaScript-heavy sites, you may need to coordinate your queuing system with browser automation tools that manage long-lived browser sessions.
Monitoring and Observability
Implement comprehensive monitoring for your queue system:
```python
import logging
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class QueueMetrics:
    requests_processed: int = 0
    requests_failed: int = 0
    requests_retried: int = 0
    average_response_time: float = 0.0
    queue_length: int = 0
    active_workers: int = 0


class MonitoredRequestQueue(RequestQueue):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = QueueMetrics()
        self.response_times: List[float] = []

    async def _execute_request(self, request: QueuedRequest, worker_name: str) -> bool:
        start_time = time.time()
        try:
            success = await super()._execute_request(request, worker_name)

            # Update counters
            response_time = time.time() - start_time
            self.response_times.append(response_time)

            if success:
                self.metrics.requests_processed += 1
            else:
                self.metrics.requests_failed += 1

            if request.retry_count > 0:
                self.metrics.requests_retried += 1

            # Rolling average over the last 100 requests
            if len(self.response_times) > 100:
                self.response_times = self.response_times[-100:]
            self.metrics.average_response_time = (
                sum(self.response_times) / len(self.response_times)
            )

            self.metrics.queue_length = self.queue.qsize()
            self.metrics.active_workers = self.active_requests

            return success
        except Exception as e:
            logging.error(f"Request execution failed: {e}")
            self.metrics.requests_failed += 1
            return False

    def get_metrics(self) -> Dict:
        total = self.metrics.requests_processed + self.metrics.requests_failed
        return {
            'requests_processed': self.metrics.requests_processed,
            'requests_failed': self.metrics.requests_failed,
            'requests_retried': self.metrics.requests_retried,
            'success_rate': self.metrics.requests_processed / max(1, total),
            'average_response_time': self.metrics.average_response_time,
            'queue_length': self.metrics.queue_length,
            'active_workers': self.metrics.active_workers,
        }
```
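To make these numbers visible, a small reporting task can run alongside the workers. This sketch simply logs a snapshot at a fixed interval; in practice you would likely export to Prometheus, StatsD, or a similar backend:

```python
import asyncio
import logging


async def report_metrics(queue: MonitoredRequestQueue, interval: float = 10.0):
    """Periodically log a snapshot of queue metrics."""
    while True:
        await asyncio.sleep(interval)
        m = queue.get_metrics()
        logging.info(
            "processed=%d failed=%d success_rate=%.2f avg_rt=%.3fs queue=%d",
            m['requests_processed'],
            m['requests_failed'],
            m['success_rate'],
            m['average_response_time'],
            m['queue_length'],
        )

# Run it next to the workers, then cancel once the queue drains:
#     reporter = asyncio.create_task(report_metrics(queue))
#     await queue.process_queue()
#     reporter.cancel()
```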
Best Practices and Considerations
Rate Limiting Strategies
- Fixed Window: Simple but can cause traffic bursts
- Sliding Window: More sophisticated, smoother traffic distribution
- Token Bucket: Allows bursts while maintaining average rate (see the sketch after this list)
- Adaptive Rate Limiting: Adjusts based on server responses
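As a concrete example of the token bucket strategy, here's a minimal asyncio sketch: tokens refill continuously at `rate` per second up to `capacity`, and each request consumes one, so short bursts pass immediately while the long-run average stays bounded:

```python
import asyncio
import time


class TokenBucket:
    """Token-bucket limiter: bursts up to `capacity`, sustained throughput
    of `rate` requests per second."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    async def acquire(self):
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(
                self.capacity,
                self.tokens + (now - self.updated) * self.rate,
            )
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens; sleep roughly until one becomes available
            await asyncio.sleep((1 - self.tokens) / self.rate)

# Workers would call `await bucket.acquire()` before each request
# in place of the fixed-interval sleep used in RequestQueue above.
```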
Error Handling
- Implement exponential backoff for retries
- Use different retry strategies for different error types (see the policy-table sketch after this list)
- Set maximum retry limits to prevent infinite loops
- Log errors with sufficient context for debugging
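One way to vary retry behavior by error type is a small policy table keyed on HTTP status. The statuses and delays below are illustrative defaults, not universal rules:

```python
import random
from typing import Optional

# Illustrative policy: which statuses to retry and with what base delay
RETRY_POLICY = {
    429: (True, 30.0),  # rate limited: back off hard
    500: (True, 5.0),   # transient server errors: retry with backoff
    502: (True, 5.0),
    503: (True, 5.0),
    403: (False, 0.0),  # permanent: retrying won't help
    404: (False, 0.0),
}


def retry_delay(status: int, attempt: int) -> Optional[float]:
    """Seconds to wait before retrying, or None to give up immediately."""
    should_retry, base = RETRY_POLICY.get(status, (True, 2.0))
    if not should_retry:
        return None
    # Exponential backoff with jitter so retries don't synchronize
    return base * (2 ** attempt) * random.uniform(0.5, 1.5)
```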
Resource Management
- Monitor memory usage, especially for large queues
- Implement queue size limits to prevent memory exhaustion (see the asyncio sketch after this list)
- Use connection pooling to reduce overhead
- Consider disk-based queues for very large datasets
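Queue size limits come almost for free in asyncio: constructing the queue with `maxsize` makes `put()` suspend producers once the cap is reached, which is exactly the backpressure behavior mentioned earlier. A one-line variant of the earlier `RequestQueue` (the cap of 10,000 is an arbitrary example):

```python
class BoundedRequestQueue(RequestQueue):
    """RequestQueue with a hard cap on pending requests."""

    def __init__(self, *args, max_queue_size: int = 10_000, **kwargs):
        super().__init__(*args, **kwargs)
        # put() now blocks producers when full instead of growing unbounded
        self.queue = asyncio.PriorityQueue(maxsize=max_queue_size)
```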
Security Considerations
- Validate and sanitize all URLs before processing (see the sketch after this list)
- Implement proper authentication and authorization
- Use HTTPS for sensitive data transmission
- Rate limit per API key or user to prevent abuse
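A basic URL sanity check before enqueueing might look like the following; the scheme and host allowlists are hypothetical and should reflect the APIs you actually intend to call:

```python
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_HOSTS = {"api.example.com"}  # hypothetical allowlist


def is_safe_url(url: str) -> bool:
    """Reject URLs with unexpected schemes or hosts before they reach the queue."""
    parsed = urlparse(url)
    return parsed.scheme in ALLOWED_SCHEMES and parsed.hostname in ALLOWED_HOSTS
```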
Conclusion
Implementing effective API request queuing is essential for high-volume web scraping operations. The patterns and implementations shown here provide a solid foundation that can be adapted to specific requirements. Remember to always respect website terms of service, implement proper rate limiting, and monitor your systems for optimal performance.
For scenarios requiring more complex browser interactions, you might also need to consider how your queuing system integrates with parallel page processing to maximize efficiency while maintaining stability.
The key to successful high-volume scraping lies in balancing speed, reliability, and respectful resource usage. Start with simple implementations and gradually add complexity as your requirements grow.