How to Implement API Request Batching for Efficient Scraping
API request batching is a crucial technique for optimizing web scraping performance by grouping multiple requests and processing them as a unit. This approach reduces overhead, improves throughput, and helps manage rate limits while maintaining system stability.
What is API Request Batching?
Request batching involves collecting multiple API requests and processing them as a group rather than handling each request individually (a minimal sketch follows the list below). This technique offers several advantages:
- Reduced overhead: Fewer connection establishments and teardowns
- Better resource utilization: More efficient use of system resources
- Improved throughput: Higher requests per second with proper implementation
- Rate limit management: Better control over request frequency
- Memory optimization: Processing in fixed-size batches keeps memory usage bounded and predictable
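The core idea can be sketched in a few lines with asyncio and aiohttp (placeholder URLs, no error handling); the rest of this article builds the same pattern into a complete batcher:
import asyncio
import aiohttp
async def fetch_batch(urls):
    """Fire one group of requests concurrently and wait for them all"""
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as resp:
                return await resp.text()
        return await asyncio.gather(*(fetch(u) for u in urls))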
Basic Batching Implementation in Python
Here's a fundamental implementation using Python's asyncio and aiohttp:
import asyncio
import aiohttp
from typing import List, Dict, Any
import time
class APIBatcher:
def __init__(self, batch_size: int = 10, delay: float = 1.0):
self.batch_size = batch_size
self.delay = delay
self.request_queue = []
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
def add_request(self, url: str, method: str = 'GET', **kwargs):
"""Add a request to the batch queue"""
self.request_queue.append({
'url': url,
'method': method,
'kwargs': kwargs
})
async def process_batch(self, requests: List[Dict]) -> List[Dict]:
"""Process a batch of requests concurrently"""
tasks = []
for req in requests:
task = self._make_request(
req['url'],
req['method'],
**req['kwargs']
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def _make_request(self, url: str, method: str, **kwargs) -> Dict[str, Any]:
"""Make individual HTTP request"""
try:
async with self.session.request(method, url, **kwargs) as response:
return {
'url': url,
'status': response.status,
'data': await response.text(),
'headers': dict(response.headers)
}
except Exception as e:
return {
'url': url,
'error': str(e),
'status': None
}
async def execute_batches(self) -> List[Dict]:
"""Execute all queued requests in batches"""
all_results = []
while self.request_queue:
# Get next batch
batch = self.request_queue[:self.batch_size]
self.request_queue = self.request_queue[self.batch_size:]
# Process batch
batch_results = await self.process_batch(batch)
all_results.extend(batch_results)
# Delay between batches if more requests remain
if self.request_queue:
await asyncio.sleep(self.delay)
return all_results
# Usage example
async def main():
urls = [
'https://api.example.com/users/1',
'https://api.example.com/users/2',
'https://api.example.com/posts/1',
'https://api.example.com/posts/2'
]
async with APIBatcher(batch_size=2, delay=0.5) as batcher:
# Add requests to batch
for url in urls:
batcher.add_request(url, headers={'User-Agent': 'Scraper/1.0'})
# Execute all batches
results = await batcher.execute_batches()
for result in results:
if 'error' in result:
print(f"Error for {result['url']}: {result['error']}")
else:
print(f"Success for {result['url']}: Status {result['status']}")
# Run the example
# asyncio.run(main())
JavaScript Implementation with Node.js
Here's an equivalent implementation using Node.js and the axios library:
const axios = require('axios');
class APIBatcher {
constructor(batchSize = 10, delay = 1000) {
this.batchSize = batchSize;
this.delay = delay;
this.requestQueue = [];
}
addRequest(url, options = {}) {
this.requestQueue.push({
url,
options: {
method: 'GET',
timeout: 5000,
...options
}
});
}
async processBatch(requests) {
const promises = requests.map(req => this.makeRequest(req));
return Promise.allSettled(promises);
}
async makeRequest(request) {
try {
const response = await axios({
url: request.url,
...request.options
});
return {
url: request.url,
status: response.status,
data: response.data,
headers: response.headers
};
} catch (error) {
return {
url: request.url,
error: error.message,
status: error.response?.status || null
};
}
}
async executeBatches() {
const allResults = [];
while (this.requestQueue.length > 0) {
// Extract next batch
const batch = this.requestQueue.splice(0, this.batchSize);
// Process batch
const batchResults = await this.processBatch(batch);
allResults.push(...batchResults);
// Delay between batches
if (this.requestQueue.length > 0) {
await new Promise(resolve => setTimeout(resolve, this.delay));
}
}
return allResults;
}
}
// Usage example
async function main() {
const batcher = new APIBatcher(3, 500);
const urls = [
'https://jsonplaceholder.typicode.com/posts/1',
'https://jsonplaceholder.typicode.com/posts/2',
'https://jsonplaceholder.typicode.com/users/1',
'https://jsonplaceholder.typicode.com/users/2'
];
// Add requests to batch
urls.forEach(url => {
batcher.addRequest(url, {
headers: { 'User-Agent': 'Scraper/1.0' }
});
});
// Execute batches
const results = await batcher.executeBatches();
  results.forEach(result => {
    // makeRequest catches its own errors, so allSettled always reports
    // 'fulfilled'; inspect the returned object's error field instead
    const data = result.value;
    if (data.error) {
      console.log(`Error: ${data.url} - ${data.error}`);
    } else {
      console.log(`Success: ${data.url} - Status: ${data.status}`);
    }
  });
}
// main().catch(console.error);
Advanced Batching Strategies
1. Priority-Based Batching
Implement priority queues to handle urgent requests first:
import heapq
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class PriorityRequest:
priority: int
url: str
method: str = 'GET'
    kwargs: Optional[Dict[str, Any]] = None
def __lt__(self, other):
return self.priority < other.priority
class PriorityBatcher(APIBatcher):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.priority_queue = []
def add_priority_request(self, url: str, priority: int = 5, **kwargs):
"""Add request with priority (lower number = higher priority)"""
request = PriorityRequest(
priority=priority,
url=url,
kwargs=kwargs
)
heapq.heappush(self.priority_queue, request)
def get_next_batch(self) -> List[Dict]:
"""Get next batch based on priority"""
batch = []
for _ in range(min(self.batch_size, len(self.priority_queue))):
if self.priority_queue:
req = heapq.heappop(self.priority_queue)
batch.append({
'url': req.url,
'method': req.method,
'kwargs': req.kwargs or {}
})
return batch
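As written, the batcher above only assembles priority-ordered batches; the inherited execute_batches still drains the plain request_queue. A sketch of one way to wire it up, using a hypothetical subclass (the override could equally live on PriorityBatcher itself):
class QueueAwarePriorityBatcher(PriorityBatcher):
    async def execute_batches(self) -> List[Dict]:
        """Drain the priority queue, highest priority first, batch by batch"""
        all_results = []
        while self.priority_queue:
            batch = self.get_next_batch()
            all_results.extend(await self.process_batch(batch))
            if self.priority_queue:
                await asyncio.sleep(self.delay)
        return all_results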
2. Adaptive Batch Sizing
Dynamically adjust batch sizes based on response times:
class AdaptiveBatcher(APIBatcher):
def __init__(self, initial_batch_size: int = 10, **kwargs):
super().__init__(batch_size=initial_batch_size, **kwargs)
self.min_batch_size = 1
self.max_batch_size = 50
self.response_times = []
self.adjustment_threshold = 5
def adjust_batch_size(self, batch_time: float, batch_size: int):
"""Adjust batch size based on performance"""
avg_time_per_request = batch_time / batch_size
self.response_times.append(avg_time_per_request)
        if len(self.response_times) >= self.adjustment_threshold:
            # Average over the configured window instead of a hardcoded 5
            window = self.response_times[-self.adjustment_threshold:]
            recent_avg = sum(window) / len(window)
if recent_avg > 2.0 and self.batch_size > self.min_batch_size:
# Slow responses, reduce batch size
self.batch_size = max(self.min_batch_size, self.batch_size - 2)
elif recent_avg < 0.5 and self.batch_size < self.max_batch_size:
# Fast responses, increase batch size
self.batch_size = min(self.max_batch_size, self.batch_size + 2)
async def process_batch_with_timing(self, requests: List[Dict]) -> List[Dict]:
"""Process batch and measure timing"""
start_time = time.time()
results = await self.process_batch(requests)
batch_time = time.time() - start_time
self.adjust_batch_size(batch_time, len(requests))
return results
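Nothing calls process_batch_with_timing automatically. One way to close the feedback loop, sketched here as a hypothetical subclass, is to time every batch inside process_batch itself:
class TimedAdaptiveBatcher(AdaptiveBatcher):
    async def process_batch(self, requests: List[Dict]) -> List[Dict]:
        """Time the inherited batch processing and adapt the batch size"""
        start_time = time.time()
        results = await super().process_batch(requests)
        self.adjust_batch_size(time.time() - start_time, len(requests))
        return results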
Error Handling and Retry Logic
Implement robust error handling with exponential backoff:
import random
class ResilientBatcher(APIBatcher):
def __init__(self, max_retries: int = 3, base_delay: float = 1.0, **kwargs):
super().__init__(**kwargs)
self.max_retries = max_retries
self.base_delay = base_delay
self.failed_requests = []
async def process_batch_with_retry(self, requests: List[Dict]) -> List[Dict]:
"""Process batch with retry logic"""
results = []
for request in requests:
result = await self._make_request_with_retry(request)
results.append(result)
return results
async def _make_request_with_retry(self, request: Dict, attempt: int = 0) -> Dict:
"""Make request with exponential backoff retry"""
try:
result = await self._make_request(
request['url'],
request['method'],
**request['kwargs']
)
            # _make_request returns status None on network errors; treat
            # those and 5xx responses as retryable
            status = result.get('status')
            if (status is None or status >= 500) and attempt < self.max_retries:
                delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)
                return await self._make_request_with_retry(request, attempt + 1)
            return result
except Exception as e:
if attempt < self.max_retries:
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(delay)
return await self._make_request_with_retry(request, attempt + 1)
else:
return {
'url': request['url'],
'error': str(e),
'status': None,
'attempts': attempt + 1
}
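One caveat: process_batch_with_retry above issues requests one at a time, giving up the concurrency that batching is meant to provide. A sketch of a variant (a hypothetical subclass) that retries each request independently while keeping the batch concurrent:
class ConcurrentResilientBatcher(ResilientBatcher):
    async def process_batch_with_retry(self, requests: List[Dict]) -> List[Dict]:
        """Run the per-request retry coroutines concurrently"""
        tasks = [self._make_request_with_retry(req) for req in requests]
        return await asyncio.gather(*tasks)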
Rate Limit Management
Implement sophisticated rate limiting for different API endpoints:
from collections import defaultdict
import time
class RateLimitedBatcher(APIBatcher):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.rate_limits = defaultdict(lambda: {'requests': 0, 'window_start': time.time()})
self.endpoint_limits = {
'api.example.com': {'requests_per_minute': 60, 'concurrent': 5},
'api.other.com': {'requests_per_minute': 100, 'concurrent': 10}
}
def get_domain(self, url: str) -> str:
"""Extract domain from URL"""
from urllib.parse import urlparse
return urlparse(url).netloc
async def check_rate_limit(self, url: str) -> bool:
"""Check if request can be made without violating rate limits"""
domain = self.get_domain(url)
now = time.time()
if domain not in self.endpoint_limits:
return True
limit_info = self.rate_limits[domain]
limits = self.endpoint_limits[domain]
# Reset window if minute has passed
if now - limit_info['window_start'] >= 60:
limit_info['requests'] = 0
limit_info['window_start'] = now
return limit_info['requests'] < limits['requests_per_minute']
async def process_batch(self, requests: List[Dict]) -> List[Dict]:
"""Process batch respecting rate limits"""
# Group requests by domain
domain_groups = defaultdict(list)
for req in requests:
domain = self.get_domain(req['url'])
domain_groups[domain].append(req)
all_results = []
# Process each domain group separately
for domain, domain_requests in domain_groups.items():
domain_results = []
for request in domain_requests:
if await self.check_rate_limit(request['url']):
# Update rate limit counter
self.rate_limits[domain]['requests'] += 1
result = await self._make_request(
request['url'],
request['method'],
**request['kwargs']
)
domain_results.append(result)
                else:
                    # Rate limit exceeded: the request is not delayed here;
                    # record a synthetic 429 so the caller can requeue it
                    domain_results.append({
                        'url': request['url'],
                        'error': 'Rate limit exceeded',
                        'status': 429
                    })
all_results.extend(domain_results)
return all_results
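Note that the 'concurrent' values in endpoint_limits above are never enforced. A sketch of one way to enforce them, using a hypothetical subclass with one asyncio.Semaphore per configured domain:
class ConcurrencyLimitedBatcher(RateLimitedBatcher):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # One semaphore per domain, sized from its 'concurrent' limit
        self.semaphores = {
            domain: asyncio.Semaphore(limits['concurrent'])
            for domain, limits in self.endpoint_limits.items()
        }
    async def _make_request(self, url: str, method: str, **kwargs):
        sem = self.semaphores.get(self.get_domain(url))
        if sem is None:
            return await super()._make_request(url, method, **kwargs)
        async with sem:
            return await super()._make_request(url, method, **kwargs)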
Memory-Efficient Processing for Large Datasets
When handling large-scale web scraping operations, implement streaming batch processing:
class StreamingBatcher:
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
async def process_stream(self, request_generator, callback):
"""Process requests as a stream with callback for results"""
batch = []
async for request in request_generator:
batch.append(request)
if len(batch) >= self.batch_size:
results = await self.process_batch(batch)
await callback(results)
batch = []
# Process remaining requests
if batch:
results = await self.process_batch(batch)
await callback(results)
async def process_batch(self, requests):
# Implementation similar to previous examples
pass
# Usage with generator
async def url_generator():
"""Generate URLs from a large dataset"""
with open('large_url_list.txt', 'r') as f:
for line in f:
yield {'url': line.strip(), 'method': 'GET', 'kwargs': {}}
async def result_callback(results):
"""Process batch results"""
for result in results:
# Save to database, file, etc.
print(f"Processed: {result['url']}")
# Process a large dataset efficiently (await is only valid inside a coroutine)
async def run_stream():
    batcher = StreamingBatcher(batch_size=50)
    await batcher.process_stream(url_generator(), result_callback)
# asyncio.run(run_stream())
Monitoring and Analytics
Implement comprehensive monitoring for your batching system:
import logging
from dataclasses import dataclass
from typing import Dict
import json
@dataclass
class BatchMetrics:
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
average_response_time: float = 0.0
batches_processed: int = 0
def to_dict(self) -> Dict:
return {
'total_requests': self.total_requests,
'successful_requests': self.successful_requests,
'failed_requests': self.failed_requests,
'success_rate': self.successful_requests / max(self.total_requests, 1),
'average_response_time': self.average_response_time,
'batches_processed': self.batches_processed
}
class MonitoredBatcher(APIBatcher):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.metrics = BatchMetrics()
self.logger = logging.getLogger(__name__)
async def process_batch(self, requests: List[Dict]) -> List[Dict]:
"""Process batch with monitoring"""
start_time = time.time()
results = await super().process_batch(requests)
batch_time = time.time() - start_time
# Update metrics
self.metrics.total_requests += len(results)
self.metrics.batches_processed += 1
        # status is None for failed requests, so guard before comparing
        successful = sum(
            1 for r in results
            if r.get('status') is not None and r['status'] < 400
        )
self.metrics.successful_requests += successful
self.metrics.failed_requests += len(results) - successful
        # Update the running average of batch duration
old_avg = self.metrics.average_response_time
old_total = self.metrics.batches_processed - 1
self.metrics.average_response_time = (
(old_avg * old_total + batch_time) / self.metrics.batches_processed
)
# Log batch completion
self.logger.info(f"Batch completed: {len(results)} requests in {batch_time:.2f}s")
return results
def get_metrics(self) -> Dict:
"""Get current metrics"""
return self.metrics.to_dict()
def log_final_metrics(self):
"""Log final performance metrics"""
metrics = self.get_metrics()
self.logger.info(f"Final metrics: {json.dumps(metrics, indent=2)}")
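A short usage sketch for the monitored batcher (the URLs are placeholders):
async def monitored_run():
    logging.basicConfig(level=logging.INFO)
    async with MonitoredBatcher(batch_size=5, delay=0.5) as batcher:
        for i in range(1, 21):
            batcher.add_request(f'https://api.example.com/items/{i}')
        await batcher.execute_batches()
        batcher.log_final_metrics()
# asyncio.run(monitored_run())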
Best Practices and Considerations
1. Optimal Batch Sizing
- Start with 10-20 requests per batch and adjust based on performance
- Consider memory constraints and API response sizes
- Monitor response times and error rates to find the sweet spot
2. Connection Management
- Use connection pooling to reuse HTTP connections
- Set appropriate timeouts for requests
- Implement proper session management, as sketched below
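A minimal sketch of these settings with aiohttp (the limit and timeout values are illustrative, not recommendations):
import aiohttp
async def make_session() -> aiohttp.ClientSession:
    # Pool and reuse connections: at most 20 overall, 5 per host
    connector = aiohttp.TCPConnector(limit=20, limit_per_host=5)
    # Fail any request that takes longer than 10 seconds end to end
    timeout = aiohttp.ClientTimeout(total=10)
    return aiohttp.ClientSession(connector=connector, timeout=timeout)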
3. Error Recovery
- Implement exponential backoff for retries
- Handle different error types appropriately (4xx vs 5xx), as in the sketch after this list
- Log errors for monitoring and debugging
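For example, a simple retry policy that separates transient failures from permanent ones might look like this (a sketch; status None denotes a network error, as in the batchers above):
from typing import Optional
def should_retry(status: Optional[int]) -> bool:
    """Retry transient failures; give up on requests that are simply wrong"""
    if status is None:
        return True   # network failure: worth retrying
    if status == 429:
        return True   # rate limited: retry after backing off
    if 500 <= status < 600:
        return True   # server error: usually transient
    return False      # other 4xx: fix the request instead of retrying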
4. Resource Management
- Monitor memory usage, especially with large responses
- Implement backpressure mechanisms, as sketched below
- Use streaming when processing large datasets
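One simple form of backpressure is a bounded asyncio.Queue between the code that produces batches and the code that executes them: put() blocks once the queue is full, so producers slow down automatically when processing falls behind. A sketch, assuming a batcher object with a process_batch coroutine:
import asyncio
async def produce(queue: asyncio.Queue, batches):
    for batch in batches:
        await queue.put(batch)   # blocks while the queue is full
    await queue.put(None)        # sentinel: no more work
async def consume(queue: asyncio.Queue, batcher):
    while (batch := await queue.get()) is not None:
        results = await batcher.process_batch(batch)
        # persist results incrementally here (database, file, ...)
# Wire-up: create asyncio.Queue(maxsize=4), then run produce() and
# consume() concurrently with asyncio.gather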
Conclusion
API request batching is essential for efficient web scraping at scale. By implementing proper batching strategies, you can significantly improve throughput while respecting rate limits and maintaining system stability. The key is to balance batch size, timing, error handling, and resource management based on your specific use case and the APIs you're working with.
Remember to always respect the terms of service of the APIs you're scraping and implement appropriate rate limiting to avoid being blocked. When working with complex web applications that require JavaScript execution, consider combining batching techniques with browser automation tools for optimal results.