What is HTTP Request Prioritization and How Can I Implement It?
HTTP request prioritization is a crucial technique in web scraping and API consumption that allows you to control the order and timing of HTTP requests based on their importance, urgency, or resource requirements. This approach helps optimize performance, manage server load, respect rate limits, and ensure critical requests are processed first.
Understanding HTTP Request Prioritization
Request prioritization involves assigning different priority levels to HTTP requests and processing them accordingly. This is particularly important in scenarios where you're making hundreds or thousands of requests and need to:
- Ensure critical data is fetched first
- Respect API rate limits
- Manage server resources efficiently
- Handle failures gracefully with retry mechanisms
- Balance load across multiple endpoints
Core Concepts of Request Prioritization
Priority Levels
Most prioritization systems use a numeric or categorical approach:
from enum import Enum

class RequestPriority(Enum):
    CRITICAL = 1    # Must be processed immediately
    HIGH = 2        # Process before normal requests
    NORMAL = 3      # Standard priority
    LOW = 4         # Process when resources are available
    BACKGROUND = 5  # Process during idle time
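If the levels need to be compared or sorted directly, one option is an `IntEnum`, whose members behave like plain integers. The sketch below is a hypothetical variant of the enum above (the name `Priority` is an assumption, not part of the original code), and it keeps the convention that a lower number means a more urgent request.

```python
from enum import IntEnum

class Priority(IntEnum):
    """Hypothetical IntEnum variant: lower value = more urgent."""
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3
    LOW = 4
    BACKGROUND = 5

# IntEnum members compare like integers, so they can be sorted
# or pushed onto a heap without calling .value first.
pending = [Priority.LOW, Priority.CRITICAL, Priority.NORMAL]
ordered = sorted(pending)
most_urgent = min(pending)
```

With a plain `Enum`, the same comparisons raise `TypeError`, which is why the rest of the examples pass `.value` around instead.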
Queue Management
Requests are typically managed using priority queues where higher priority items are processed first:
import heapq
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class PriorityRequest:
    priority: int
    timestamp: float
    url: str
    method: str = "GET"
    headers: Optional[Dict[str, str]] = None
    data: Optional[Any] = None

    def __lt__(self, other):
        # Lower priority number = higher priority
        if self.priority != other.priority:
            return self.priority < other.priority
        # If same priority, older requests first
        return self.timestamp < other.timestamp

class RequestQueue:
    def __init__(self):
        self._queue = []
        self._index = 0

    def add_request(self, request: PriorityRequest):
        heapq.heappush(self._queue, request)

    def get_next_request(self) -> Optional[PriorityRequest]:
        if self._queue:
            return heapq.heappop(self._queue)
        return None

    def size(self) -> int:
        return len(self._queue)
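The ordering rule can be checked in isolation. The following is a minimal sketch using only `heapq` and plain tuples rather than the classes above; the `push` and `pop` helpers and the sequence counter are illustrative assumptions. It shows that the most urgent entry comes out first and that entries with equal priority leave in insertion order.

```python
import heapq
import itertools

_counter = itertools.count()  # monotonically increasing tie-breaker
heap = []

def push(priority: int, url: str) -> None:
    # Tuples compare element by element: priority first, then sequence number,
    # so requests with the same priority are served first-in, first-out.
    heapq.heappush(heap, (priority, next(_counter), url))

def pop() -> str:
    return heapq.heappop(heap)[2]

push(3, "https://example.com/normal-a")
push(1, "https://example.com/critical")
push(3, "https://example.com/normal-b")
push(2, "https://example.com/high")

order = [pop() for _ in range(len(heap))]
# order: critical, high, normal-a, normal-b
```

A sequence counter is a common alternative to a wall-clock timestamp as the tie-breaker, since two requests created in quick succession can share the same timestamp.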
Implementation Strategies
1. Basic Priority Queue Implementation
Here's a comprehensive Python implementation using asyncio and aiohttp:
import asyncio
import logging
import time
from typing import Callable, Dict, Optional

import aiohttp

class PriorityHTTPClient:
    def __init__(self, max_concurrent_requests: int = 10,
                 requests_per_second: float = 5.0):
        self.queue = RequestQueue()
        self.max_concurrent = max_concurrent_requests
        self.rate_limit = requests_per_second
        self.last_request_time = 0.0
        self.session: Optional[aiohttp.ClientSession] = None
        self.running = False
        # Created lazily so the lock belongs to the running event loop
        self._rate_lock: Optional[asyncio.Lock] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def add_request(self, url: str, priority: int = RequestPriority.NORMAL.value,
                    method: str = "GET", **kwargs):
        request = PriorityRequest(
            priority=priority,
            timestamp=time.time(),
            url=url,
            method=method,
            headers=kwargs.get('headers'),
            data=kwargs.get('data')
        )
        self.queue.add_request(request)

    async def _respect_rate_limit(self):
        """Ensure we don't exceed the configured rate limit"""
        if self.rate_limit <= 0:
            return
        if self._rate_lock is None:
            self._rate_lock = asyncio.Lock()
        # Serialize the check so concurrent workers cannot all read the
        # same timestamp and fire at once
        async with self._rate_lock:
            min_interval = 1.0 / self.rate_limit
            elapsed = time.time() - self.last_request_time
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            self.last_request_time = time.time()

    async def _execute_request(self, request: PriorityRequest) -> Dict:
        """Execute a single HTTP request"""
        await self._respect_rate_limit()
        try:
            kwargs = {
                'method': request.method,
                'url': request.url,
                'headers': request.headers or {}
            }
            if request.data is not None:
                # Dicts are sent as JSON; anything else as the raw body
                kwargs['json' if isinstance(request.data, dict) else 'data'] = request.data
            async with self.session.request(**kwargs) as response:
                content = await response.text()
                return {
                    'url': request.url,
                    'status': response.status,
                    'content': content,
                    'headers': dict(response.headers),
                    'priority': request.priority
                }
        except Exception as e:
            logging.error(f"Request failed for {request.url}: {e}")
            return {
                'url': request.url,
                'error': str(e),
                'priority': request.priority
            }

    async def process_requests(self, callback: Optional[Callable] = None):
        """Process all requests in the queue with proper prioritization"""
        self.running = True

        async def worker():
            while self.running:
                request = self.queue.get_next_request()
                if request is None:
                    break  # Queue drained: let this worker exit
                result = await self._execute_request(request)
                if callback:
                    await callback(result)

        try:
            # The number of workers bounds concurrency, so no semaphore is needed
            workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)]
            await asyncio.gather(*workers)
        finally:
            self.running = False
2. JavaScript Implementation with Node.js
const axios = require('axios');
const PriorityQueue = require('js-priority-queue');

class PriorityHTTPClient {
  constructor(options = {}) {
    this.maxConcurrent = options.maxConcurrent || 10;
    this.rateLimit = options.rateLimit ?? 5; // requests per second (0 disables limiting)
    this.queue = new PriorityQueue({
      // Lower priority number first; ties go to the older request
      comparator: (a, b) => {
        if (a.priority !== b.priority) {
          return a.priority - b.priority;
        }
        return a.timestamp - b.timestamp;
      }
    });
    this.activeRequests = 0;
    this.lastRequestTime = 0;
  }

  addRequest(url, priority = 3, options = {}) {
    const request = {
      url,
      priority,
      timestamp: Date.now(),
      options: {
        method: 'GET',
        timeout: 30000,
        ...options
      }
    };
    this.queue.queue(request);
    this.processQueue();
  }

  async respectRateLimit() {
    if (this.rateLimit <= 0) return;
    const minInterval = 1000 / this.rateLimit;
    const now = Date.now();
    // Reserve the next free slot synchronously so concurrent callers
    // line up behind each other instead of all waiting the same amount
    const scheduled = Math.max(now, this.lastRequestTime + minInterval);
    this.lastRequestTime = scheduled;
    if (scheduled > now) {
      await new Promise(resolve => setTimeout(resolve, scheduled - now));
    }
  }

  async executeRequest(request) {
    // Count the request before the first await; otherwise processQueue()
    // would dequeue everything before the counter is updated
    this.activeRequests++;
    try {
      await this.respectRateLimit();
      const response = await axios({
        url: request.url,
        ...request.options
      });
      return {
        url: request.url,
        status: response.status,
        data: response.data,
        headers: response.headers,
        priority: request.priority
      };
    } catch (error) {
      return {
        url: request.url,
        error: error.message,
        priority: request.priority
      };
    } finally {
      this.activeRequests--;
      // Continue processing queue
      setImmediate(() => this.processQueue());
    }
  }

  processQueue() {
    while (this.queue.length > 0 && this.activeRequests < this.maxConcurrent) {
      const request = this.queue.dequeue();
      this.executeRequest(request).then(result => {
        this.onResult(result);
      });
    }
  }

  onResult(result) {
    // Override this method to handle results
    if (result.error) {
      console.error('Request failed:', result.url, 'Error:', result.error);
    } else {
      console.log('Request completed:', result.url, 'Status:', result.status);
    }
  }
}

// Usage example
const client = new PriorityHTTPClient({
  maxConcurrent: 5,
  rateLimit: 2 // 2 requests per second
});

// Add requests with different priorities
client.addRequest('https://api.example.com/critical', 1); // Critical
client.addRequest('https://api.example.com/normal', 3);   // Normal
client.addRequest('https://api.example.com/low', 4);      // Low priority
Advanced Prioritization Techniques
1. Dynamic Priority Adjustment
class AdaptivePriorityClient(PriorityHTTPClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.retry_counts = {}
        self.response_times = {}

    def adjust_priority_on_failure(self, request: PriorityRequest):
        """Increase priority for failed requests (lower number = higher priority)"""
        retry_count = self.retry_counts.get(request.url, 0)
        if retry_count > 0:
            # Increase priority for retries
            request.priority = max(1, request.priority - retry_count)
        self.retry_counts[request.url] = retry_count + 1

    def adjust_priority_on_response_time(self, request: PriorityRequest, response_time: float):
        """Adjust priority based on server response time"""
        if response_time > 5.0:  # Slow response
            # Lower priority for slow endpoints
            request.priority = min(5, request.priority + 1)
        elif response_time < 1.0:  # Fast response
            # Maintain or increase priority for fast endpoints
            request.priority = max(1, request.priority - 1)
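One caveat when adjusting priorities dynamically: `heapq` does not notice when an item that is already in the heap changes its priority. The sketch below illustrates one way to handle that, by mutating the queued items and then rebuilding the heap. The `Job` class and the `boost` helper are hypothetical names introduced for illustration, not part of the client above.

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class Job:
    priority: int
    seq: int                        # insertion order, used as tie-breaker
    url: str = field(compare=False)

def boost(heap: list, url: str, steps: int = 1) -> bool:
    """Raise the urgency of every queued job for `url`, then restore heap order."""
    changed = False
    for job in heap:
        if job.url == url:
            job.priority = max(1, job.priority - steps)
            changed = True
    if changed:
        # O(n) rebuild; required because priorities were mutated in place
        heapq.heapify(heap)
    return changed

heap = []
for seq, (prio, url) in enumerate([(2, "a"), (4, "b"), (3, "c")]):
    heapq.heappush(heap, Job(prio, seq, url))

boost(heap, "b", steps=3)           # "b" moves from priority 4 to 1
first = heapq.heappop(heap)
```

Rebuilding costs O(n) per adjustment, so for very large queues it may be cheaper to adjust the priority before the request is (re-)enqueued, as the methods above do.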
2. Content-Based Prioritization
from typing import Optional

def assign_content_priority(url: str, content_type: Optional[str] = None) -> int:
    """Assign priority based on content type and URL patterns"""
    # Critical resources are checked first so that a URL such as
    # /api/auth is not classified as a generic API endpoint
    if any(pattern in url for pattern in ['/auth', '/login', '/config']):
        return RequestPriority.CRITICAL.value
    # API endpoints get higher priority
    if '/api/' in url:
        return RequestPriority.HIGH.value
    # Media files get lower priority
    if content_type and content_type.startswith(('image/', 'video/', 'audio/')):
        return RequestPriority.LOW.value
    # Default priority
    return RequestPriority.NORMAL.value
Integration with Web Scraping
When implementing request prioritization in web scraping scenarios, you can combine it with tools like Puppeteer for enhanced control. For instance, when monitoring network requests in Puppeteer, you can apply prioritization to determine which resources to load first.
import asyncio

class WebScrapingPriorityManager:
    def __init__(self):
        self.client = PriorityHTTPClient(max_concurrent_requests=8, requests_per_second=3)

    async def scrape_with_priorities(self, urls_with_priorities):
        """Scrape multiple URLs with different priorities"""
        async with self.client:
            # Add all requests to queue
            for url, priority in urls_with_priorities:
                self.client.add_request(url, priority)

            # Process with callback
            results = []

            async def collect_results(result):
                results.append(result)

            await self.client.process_requests(callback=collect_results)
            return results

# Usage: `await` is only valid inside a coroutine, so wrap the call in main()
async def main():
    scraper = WebScrapingPriorityManager()
    urls_to_scrape = [
        ('https://example.com/important-data', RequestPriority.HIGH.value),
        ('https://example.com/normal-page', RequestPriority.NORMAL.value),
        ('https://example.com/background-info', RequestPriority.LOW.value)
    ]
    results = await scraper.scrape_with_priorities(urls_to_scrape)
    for result in results:
        print(result['url'], result.get('status', result.get('error')))

asyncio.run(main())
Best Practices and Considerations
1. Rate Limiting Integration
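Always combine prioritization with proper rate limiting to respect server capabilities and avoid being blocked. Measuring how long a single request takes, for example with curl, helps you choose a sensible rate:
# Measure the timing breakdown of a single request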
curl -w "@curl-format.txt" -s -o /dev/null https://api.example.com/endpoint
# Where curl-format.txt contains:
# time_namelookup: %{time_namelookup}\n
# time_connect: %{time_connect}\n
# time_appconnect: %{time_appconnect}\n
# time_pretransfer: %{time_pretransfer}\n
# time_redirect: %{time_redirect}\n
# time_starttransfer: %{time_starttransfer}\n
# ----------\n
# time_total: %{time_total}\n
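On the client side, a rate limiter has to stay correct when several workers ask for permission at the same moment. The following is a minimal sketch of one approach, in which each caller reserves the next free time slot before sleeping. `IntervalLimiter`, `reserve`, and `wait` are hypothetical names, and the injectable `clock` parameter exists only so the behaviour can be demonstrated without real waiting.

```python
import asyncio
import time

class IntervalLimiter:
    """Spaces calls at least 1/rate seconds apart by reserving slots up front."""

    def __init__(self, rate_per_second: float, clock=time.monotonic):
        self.min_interval = 1.0 / rate_per_second
        self._clock = clock
        self._next_slot = clock()

    def reserve(self) -> float:
        """Claim the next free slot and return how long the caller must wait."""
        now = self._clock()
        slot = max(now, self._next_slot)
        self._next_slot = slot + self.min_interval
        return slot - now

    async def wait(self) -> None:
        # reserve() contains no await, so under asyncio it cannot be
        # interleaved with another caller and needs no lock
        delay = self.reserve()
        if delay > 0:
            await asyncio.sleep(delay)

# Frozen clock: three simultaneous callers receive staggered delays.
limiter = IntervalLimiter(rate_per_second=4, clock=lambda: 100.0)
delays = [limiter.reserve() for _ in range(3)]
# delays == [0.0, 0.25, 0.5]
```

In a worker, `await limiter.wait()` would be called immediately before sending each request.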
2. Error Handling and Retries
Implement exponential backoff for failed high-priority requests:
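# Intended as a method of PriorityHTTPClient (it calls self._execute_request)
async def execute_with_retry(self, request: PriorityRequest, max_retries: int = 3):
    """Execute request with exponential backoff retry logic"""
    result = None
    for attempt in range(max_retries + 1):
        # _execute_request catches exceptions itself and reports
        # failures under the 'error' key, so no try/except is needed here
        result = await self._execute_request(request)
        if 'error' not in result:
            return result
        if attempt == max_retries:
            break  # Out of retries: do not sleep again
        # Exponential backoff: 1s, 2s, 4s, ...
        wait_time = 2 ** attempt
        await asyncio.sleep(wait_time)
        # Increase priority for retries
        request.priority = max(1, request.priority - 1)
    # All attempts failed: return the last error result
    return result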
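Plain doubling can make many clients retry at the same instants, so backoff is often combined with a cap and random jitter. The helper below is an illustrative sketch of that idea; `backoff_delays` and its parameters (`base`, `cap`, `jitter`, `rng`) are assumptions made for the example, not part of the client above.

```python
import random
from typing import List, Optional

def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   jitter: bool = True,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the wait (in seconds) to apply before each retry."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential growth, limited by the cap
        ceiling = min(cap, base * 2 ** attempt)
        # With jitter, pick a random wait between 0 and the ceiling
        delays.append(rng.uniform(0, ceiling) if jitter else ceiling)
    return delays

plain = backoff_delays(5, base=1.0, cap=10.0, jitter=False)
# plain == [1.0, 2.0, 4.0, 8.0, 10.0]
jittered = backoff_delays(5, base=1.0, cap=10.0, rng=random.Random(0))
```

Each jittered delay stays between zero and the corresponding un-jittered value, so the cap still bounds the worst-case wait.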
3. Memory Management
For large-scale scraping operations, implement queue size limits and request cleanup:
class MemoryEfficientQueue(RequestQueue):
    def __init__(self, max_size: int = 10000):
        super().__init__()
        self.max_size = max_size

    def add_request(self, request: PriorityRequest):
        if len(self._queue) >= self.max_size:
            # Remove lowest priority items
            self._queue.sort()
            self._queue = self._queue[:self.max_size // 2]
            heapq.heapify(self._queue)
        heapq.heappush(self._queue, request)
Performance Monitoring and Optimization
Monitor your prioritization system's effectiveness:
import time
from collections import defaultdict

class PriorityMetrics:
    def __init__(self):
        self.priority_stats = defaultdict(lambda: {
            'count': 0,
            'total_time': 0,
            'success_rate': 0,
            'avg_response_time': 0
        })

    def record_request(self, priority: int, success: bool, response_time: float):
        stats = self.priority_stats[priority]
        stats['count'] += 1
        stats['total_time'] += response_time
        stats['success_rate'] = (stats['success_rate'] * (stats['count'] - 1) +
                                 (1 if success else 0)) / stats['count']
        stats['avg_response_time'] = stats['total_time'] / stats['count']

    def get_performance_report(self):
        return dict(self.priority_stats)
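Response time alone does not show whether prioritization is working; a complementary signal is how long requests of each priority sit in the queue before being dispatched. The sketch below assumes you can capture an enqueue time and a start time for each request. `WaitTimeTracker` and its methods are hypothetical names used only for illustration.

```python
from collections import defaultdict
from statistics import mean

class WaitTimeTracker:
    """Tracks queue wait time (start time minus enqueue time) per priority."""

    def __init__(self):
        self._waits = defaultdict(list)

    def record(self, priority: int, enqueued_at: float, started_at: float) -> None:
        self._waits[priority].append(started_at - enqueued_at)

    def average_waits(self) -> dict:
        # Sorted by priority so the most urgent level is listed first
        return {p: mean(w) for p, w in sorted(self._waits.items())}

tracker = WaitTimeTracker()
tracker.record(1, enqueued_at=0.0, started_at=0.5)
tracker.record(1, enqueued_at=1.0, started_at=1.5)
tracker.record(4, enqueued_at=0.0, started_at=6.0)
report = tracker.average_waits()
# report == {1: 0.5, 4: 6.0}
```

If high-priority requests do not show clearly shorter waits than low-priority ones, the queue is probably not the bottleneck, or the priorities are not being applied where the waiting happens.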
Conclusion
HTTP request prioritization is essential for building efficient, scalable web scraping and API consumption systems. By implementing proper priority queues, rate limiting, and adaptive algorithms, you can significantly improve your application's performance while respecting server resources and avoiding common pitfalls such as being blocked for exceeding rate limits.
The key to successful implementation lies in understanding your specific use case requirements, properly categorizing your requests by importance, and continuously monitoring and adjusting your prioritization strategy based on real-world performance data. Whether you're building a simple web scraper or a complex distributed crawling system, request prioritization will help you achieve better results with optimal resource utilization.
Remember to always respect robots.txt files, implement proper error handling, and consider the ethical implications of your scraping activities. When working with complex scenarios involving JavaScript-heavy sites, you might also want to explore how to handle AJAX requests using Puppeteer to ensure all dynamic content is properly prioritized and captured.