How to Implement API Request Batching for Efficient Scraping

API request batching is a crucial technique for optimizing web scraping performance by grouping multiple requests together and processing them efficiently. This approach reduces overhead, improves throughput, and helps manage rate limits while maintaining system stability.

What is API Request Batching?

Request batching involves collecting multiple API requests and processing them as a group rather than handling each request individually. This technique offers several advantages:

  • Reduced overhead: Fewer connection establishments and teardowns
  • Better resource utilization: More efficient use of system resources
  • Improved throughput: Higher requests per second with proper implementation
  • Rate limit management: Better control over request frequency
  • Bounded memory use: Processing a fixed number of requests at a time caps how many responses are held in memory at once
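
The grouping step itself can be very small. As a minimal sketch (the helper name `chunked` and the URLs are illustrative assumptions, not part of any library), a list of requests can be split into fixed-size batches like this:

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


# Hypothetical URLs, used only for illustration
urls = [f"https://api.example.com/items/{i}" for i in range(5)]
batches = list(chunked(urls, 2))
print([len(batch) for batch in batches])  # [2, 2, 1]
```

The last batch is simply shorter when the total is not a multiple of the batch size.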

Basic Batching Implementation in Python

Here's a fundamental implementation using Python's asyncio and aiohttp:

import asyncio
import aiohttp
from typing import List, Dict, Any
import time

class APIBatcher:
    def __init__(self, batch_size: int = 10, delay: float = 1.0):
        self.batch_size = batch_size
        self.delay = delay
        self.request_queue = []
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def add_request(self, url: str, method: str = 'GET', **kwargs):
        """Add a request to the batch queue"""
        self.request_queue.append({
            'url': url,
            'method': method,
            'kwargs': kwargs
        })

    async def process_batch(self, requests: List[Dict]) -> List[Dict]:
        """Process a batch of requests concurrently"""
        tasks = []
        for req in requests:
            task = self._make_request(
                req['url'], 
                req['method'], 
                **req['kwargs']
            )
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def _make_request(self, url: str, method: str, **kwargs) -> Dict[str, Any]:
        """Make individual HTTP request"""
        try:
            async with self.session.request(method, url, **kwargs) as response:
                return {
                    'url': url,
                    'status': response.status,
                    'data': await response.text(),
                    'headers': dict(response.headers)
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'status': None
            }

    async def execute_batches(self) -> List[Dict]:
        """Execute all queued requests in batches"""
        all_results = []

        while self.request_queue:
            # Get next batch
            batch = self.request_queue[:self.batch_size]
            self.request_queue = self.request_queue[self.batch_size:]

            # Process batch
            batch_results = await self.process_batch(batch)
            all_results.extend(batch_results)

            # Delay between batches if more requests remain
            if self.request_queue:
                await asyncio.sleep(self.delay)

        return all_results

# Usage example
async def main():
    urls = [
        'https://api.example.com/users/1',
        'https://api.example.com/users/2',
        'https://api.example.com/posts/1',
        'https://api.example.com/posts/2'
    ]

    async with APIBatcher(batch_size=2, delay=0.5) as batcher:
        # Add requests to batch
        for url in urls:
            batcher.add_request(url, headers={'User-Agent': 'Scraper/1.0'})

        # Execute all batches
        results = await batcher.execute_batches()

        for result in results:
            if 'error' in result:
                print(f"Error for {result['url']}: {result['error']}")
            else:
                print(f"Success for {result['url']}: Status {result['status']}")

# Run the example
# asyncio.run(main())

JavaScript Implementation with Node.js

Here's an equivalent implementation using Node.js and the axios library:

const axios = require('axios');

class APIBatcher {
    constructor(batchSize = 10, delay = 1000) {
        this.batchSize = batchSize;
        this.delay = delay;
        this.requestQueue = [];
    }

    addRequest(url, options = {}) {
        this.requestQueue.push({
            url,
            options: {
                method: 'GET',
                timeout: 5000,
                ...options
            }
        });
    }

    async processBatch(requests) {
        const promises = requests.map(req => this.makeRequest(req));
        return Promise.allSettled(promises);
    }

    async makeRequest(request) {
        try {
            const response = await axios({
                url: request.url,
                ...request.options
            });

            return {
                url: request.url,
                status: response.status,
                data: response.data,
                headers: response.headers
            };
        } catch (error) {
            return {
                url: request.url,
                error: error.message,
                status: error.response?.status || null
            };
        }
    }

    async executeBatches() {
        const allResults = [];

        while (this.requestQueue.length > 0) {
            // Extract next batch
            const batch = this.requestQueue.splice(0, this.batchSize);

            // Process batch
            const batchResults = await this.processBatch(batch);
            allResults.push(...batchResults);

            // Delay between batches
            if (this.requestQueue.length > 0) {
                await new Promise(resolve => setTimeout(resolve, this.delay));
            }
        }

        return allResults;
    }
}

// Usage example
async function main() {
    const batcher = new APIBatcher(3, 500);

    const urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/users/1',
        'https://jsonplaceholder.typicode.com/users/2'
    ];

    // Add requests to batch
    urls.forEach(url => {
        batcher.addRequest(url, {
            headers: { 'User-Agent': 'Scraper/1.0' }
        });
    });

    // Execute batches
    const results = await batcher.executeBatches();

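    results.forEach((result) => {
        if (result.status === 'rejected') {
            console.log(`Error: ${result.reason}`);
            return;
        }

        // makeRequest catches its own errors, so failed requests arrive
        // as fulfilled values that carry an `error` field
        const data = result.value;
        if (data.error) {
            console.log(`Error: ${data.url} - ${data.error}`);
        } else {
            console.log(`Success: ${data.url} - Status: ${data.status}`);
        }
    });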
}

// main().catch(console.error);

Advanced Batching Strategies

1. Priority-Based Batching

Implement priority queues to handle urgent requests first:

import heapq
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class PriorityRequest:
    priority: int
    url: str
    method: str = 'GET'
    kwargs: Optional[Dict[str, Any]] = None

    def __lt__(self, other):
        return self.priority < other.priority

class PriorityBatcher(APIBatcher):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.priority_queue = []

    def add_priority_request(self, url: str, priority: int = 5, **kwargs):
        """Add request with priority (lower number = higher priority)"""
        request = PriorityRequest(
            priority=priority,
            url=url,
            kwargs=kwargs
        )
        heapq.heappush(self.priority_queue, request)

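    def get_next_batch(self) -> List[Dict]:
        """Get next batch based on priority"""
        batch = []
        while self.priority_queue and len(batch) < self.batch_size:
            req = heapq.heappop(self.priority_queue)
            batch.append({
                'url': req.url,
                'method': req.method,
                'kwargs': req.kwargs or {}
            })
        return batch

    async def execute_batches(self) -> List[Dict]:
        """Execute queued priority requests, most urgent batch first"""
        all_results = []
        while self.priority_queue:
            batch = self.get_next_batch()
            all_results.extend(await self.process_batch(batch))
            if self.priority_queue:
                await asyncio.sleep(self.delay)
        return all_results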
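
One detail worth knowing about heap-based queues: `heapq` does not preserve insertion order among items that compare equal, so requests with the same priority may come out in any order. A minimal sketch, assuming first-in-first-out order is wanted within a priority level, pairs each priority with an increasing counter. The class name `FifoPriorityQueue` is hypothetical.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class FifoPriorityQueue:
    """Priority queue that keeps insertion order among equal priorities."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._counter = itertools.count()  # unique tie-breaker per item

    def push(self, item: Any, priority: int = 5) -> None:
        # Lower number = higher priority; the counter breaks ties,
        # so the item itself is never compared
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def pop_batch(self, batch_size: int) -> List[Any]:
        batch = []
        while self._heap and len(batch) < batch_size:
            _, _, item = heapq.heappop(self._heap)
            batch.append(item)
        return batch

    def __len__(self) -> int:
        return len(self._heap)


queue = FifoPriorityQueue()
for name, priority in [("a", 5), ("b", 1), ("c", 5), ("d", 1)]:
    queue.push(name, priority)
print(queue.pop_batch(3))  # ['b', 'd', 'a']
```

Because the counter is unique, the queued item never needs to support comparison, which also removes the need for a custom `__lt__`.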

2. Adaptive Batch Sizing

Dynamically adjust batch sizes based on response times:

class AdaptiveBatcher(APIBatcher):
    def __init__(self, initial_batch_size: int = 10, **kwargs):
        super().__init__(batch_size=initial_batch_size, **kwargs)
        self.min_batch_size = 1
        self.max_batch_size = 50
        self.response_times = []
        self.adjustment_threshold = 5

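    def adjust_batch_size(self, batch_time: float, batch_size: int):
        """Adjust batch size based on performance"""
        # Requests in a batch run concurrently, so this is the amortized
        # wall-clock time per request, not the latency of a single request
        avg_time_per_request = batch_time / batch_size
        self.response_times.append(avg_time_per_request)

        if len(self.response_times) >= self.adjustment_threshold:
            # Average over the most recent window only
            recent = self.response_times[-self.adjustment_threshold:]
            recent_avg = sum(recent) / len(recent)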

            if recent_avg > 2.0 and self.batch_size > self.min_batch_size:
                # Slow responses, reduce batch size
                self.batch_size = max(self.min_batch_size, self.batch_size - 2)
            elif recent_avg < 0.5 and self.batch_size < self.max_batch_size:
                # Fast responses, increase batch size
                self.batch_size = min(self.max_batch_size, self.batch_size + 2)

    async def process_batch(self, requests: List[Dict]) -> List[Dict]:
        """Process batch, measure timing, and adapt the batch size"""
        # Overriding process_batch means execute_batches picks up the
        # adjusted batch size automatically on its next iteration
        start_time = time.monotonic()
        results = await super().process_batch(requests)
        batch_time = time.monotonic() - start_time

        self.adjust_batch_size(batch_time, len(requests))
        return results
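
To see how the sizing rule behaves, it helps to isolate it as a pure function. The sketch below reuses the thresholds from the class above (slow above 2.0 s, fast below 0.5 s, a step of 2, bounds of 1 and 50); the function name `next_batch_size` and the sample averages are assumptions made for illustration.

```python
from typing import List


def next_batch_size(
    current: int,
    recent_avg: float,
    *,
    slow: float = 2.0,
    fast: float = 0.5,
    step: int = 2,
    min_size: int = 1,
    max_size: int = 50,
) -> int:
    """Return the batch size to use after observing recent_avg seconds."""
    if recent_avg > slow:
        return max(min_size, current - step)  # slow responses: shrink
    if recent_avg < fast:
        return min(max_size, current + step)  # fast responses: grow
    return current  # in between: leave unchanged


# Made-up averages: two fast windows, one neutral, two slow
size = 10
history: List[int] = []
for avg in [0.3, 0.3, 1.0, 2.5, 2.5]:
    size = next_batch_size(size, avg)
    history.append(size)
print(history)  # [12, 14, 14, 12, 10]
```

Keeping the rule separate from the I/O code makes it easy to unit test and to swap for a different policy later.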

Error Handling and Retry Logic

Implement robust error handling with exponential backoff:

import random
from typing import Optional

class ResilientBatcher(APIBatcher):
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0, **kwargs):
        super().__init__(**kwargs)
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.failed_requests = []

    async def process_batch(self, requests: List[Dict]) -> List[Dict]:
        """Process batch concurrently, retrying failed requests"""
        # Overrides process_batch so execute_batches uses the retry logic;
        # each request retries independently while the others proceed
        tasks = [self._make_request_with_retry(request) for request in requests]
        return await asyncio.gather(*tasks)

    async def _make_request_with_retry(self, request: Dict, attempt: int = 0) -> Dict:
        """Make request with exponential backoff retry"""
        try:
            result = await self._make_request(
                request['url'],
                request['method'],
                **request['kwargs']
            )

            # Retry on server errors (5xx) and on connection failures,
            # which _make_request reports as status None
            status = result.get('status')
            if (status is None or status >= 500) and attempt < self.max_retries:
                delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)
                return await self._make_request_with_retry(request, attempt + 1)

            return result

        except Exception as e:
            if attempt < self.max_retries:
                delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)
                return await self._make_request_with_retry(request, attempt + 1)
            else:
                return {
                    'url': request['url'],
                    'error': str(e),
                    'status': None,
                    'attempts': attempt + 1
                }
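
To make the delay formula concrete: with a base delay of 1.0, the deterministic part `base_delay * (2 ** attempt)` doubles on every attempt, and up to one second of random jitter is added so that failed requests do not all retry at the same moment. The sketch below uses a hypothetical helper `backoff_delay`; the `max_delay` cap is an addition that the code above does not have.

```python
import random


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,  # assumed cap, not part of the original code
    jitter: float = 1.0,
) -> float:
    """Return the seconds to wait before retry number `attempt` (0-based)."""
    exponential = base_delay * (2 ** attempt)
    return min(max_delay, exponential) + random.uniform(0, jitter)


# With jitter disabled the schedule is deterministic
print([backoff_delay(a, jitter=0.0) for a in range(4)])  # [1.0, 2.0, 4.0, 8.0]
```

Without a cap, the delay for attempt 10 would already exceed 17 minutes, which is rarely what a scraper wants.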

Rate Limit Management

Implement sophisticated rate limiting for different API endpoints:

from collections import defaultdict
import time

class RateLimitedBatcher(APIBatcher):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.rate_limits = defaultdict(lambda: {'requests': 0, 'window_start': time.time()})
        self.endpoint_limits = {
            'api.example.com': {'requests_per_minute': 60, 'concurrent': 5},
            'api.other.com': {'requests_per_minute': 100, 'concurrent': 10}
        }

    def get_domain(self, url: str) -> str:
        """Extract domain from URL"""
        from urllib.parse import urlparse
        return urlparse(url).netloc

    async def check_rate_limit(self, url: str) -> bool:
        """Check if request can be made without violating rate limits"""
        domain = self.get_domain(url)
        now = time.time()

        if domain not in self.endpoint_limits:
            return True

        limit_info = self.rate_limits[domain]
        limits = self.endpoint_limits[domain]

        # Reset window if minute has passed
        if now - limit_info['window_start'] >= 60:
            limit_info['requests'] = 0
            limit_info['window_start'] = now

        return limit_info['requests'] < limits['requests_per_minute']

    async def process_batch(self, requests: List[Dict]) -> List[Dict]:
        """Process batch respecting rate limits"""
        # Group requests by domain
        domain_groups = defaultdict(list)
        for req in requests:
            domain = self.get_domain(req['url'])
            domain_groups[domain].append(req)

        all_results = []

        # Process each domain group separately
        for domain, domain_requests in domain_groups.items():
            domain_results = []

            for request in domain_requests:
                if await self.check_rate_limit(request['url']):
                    # Update rate limit counter
                    self.rate_limits[domain]['requests'] += 1
                    result = await self._make_request(
                        request['url'],
                        request['method'],
                        **request['kwargs']
                    )
                    domain_results.append(result)
                else:
                    # Rate limit exceeded: skip the request and report a
                    # synthetic 429 so the caller can requeue it later
                    domain_results.append({
                        'url': request['url'],
                        'error': 'Rate limit exceeded',
                        'status': 429
                    })

            all_results.extend(domain_results)

        return all_results
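
The batcher above answers with a synthetic 429 once the fixed one-minute window is full. An alternative, sketched here under the assumption that waiting is preferable to failing, is a sliding-window limiter that reports how long the caller should sleep. `SlidingWindowLimiter`, `wait_for_slot`, and the injectable `clock` parameter are hypothetical names.

```python
import asyncio
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_requests in any window_seconds interval."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._timestamps: Deque[float] = deque()

    def try_acquire(self) -> float:
        """Record a request and return 0.0, or return the seconds to wait."""
        now = self.clock()
        # Drop timestamps that have left the window
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) < self.max_requests:
            self._timestamps.append(now)
            return 0.0
        # Wait until the oldest recorded request leaves the window
        return self.window_seconds - (now - self._timestamps[0])


async def wait_for_slot(limiter: SlidingWindowLimiter) -> None:
    """Sleep until the limiter grants a slot."""
    while (wait := limiter.try_acquire()) > 0:
        await asyncio.sleep(wait)
```

Calling `await wait_for_slot(limiter)` before each request delays it instead of dropping it, and one limiter per domain would mirror the `endpoint_limits` table.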

Memory-Efficient Processing for Large Datasets

For large-scale web scraping operations, implement streaming batch processing so that only one batch of requests and results is held in memory at a time:

class StreamingBatcher:
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size

    async def process_stream(self, request_generator, callback):
        """Process requests as a stream with callback for results"""
        batch = []

        async for request in request_generator:
            batch.append(request)

            if len(batch) >= self.batch_size:
                results = await self.process_batch(batch)
                await callback(results)
                batch = []

        # Process remaining requests
        if batch:
            results = await self.process_batch(batch)
            await callback(results)

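    async def process_batch(self, requests):
        # Placeholder: fetch the batch here, e.g. by delegating to
        # APIBatcher.process_batch from the earlier example. It must return
        # a list of result dicts, because the callback iterates over it.
        raise NotImplementedError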

# Usage with generator
async def url_generator():
    """Generate URLs from a large dataset"""
    with open('large_url_list.txt', 'r') as f:
        for line in f:
            yield {'url': line.strip(), 'method': 'GET', 'kwargs': {}}

async def result_callback(results):
    """Process batch results"""
    for result in results:
        # Save to database, file, etc.
        print(f"Processed: {result['url']}")

# Process large dataset efficiently
async def main():
    batcher = StreamingBatcher(batch_size=50)
    await batcher.process_stream(url_generator(), result_callback)

# asyncio.run(main())

Monitoring and Analytics

Implement comprehensive monitoring for your batching system:

import json
import logging
import time
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class BatchMetrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    average_response_time: float = 0.0
    batches_processed: int = 0

    def to_dict(self) -> Dict:
        return {
            'total_requests': self.total_requests,
            'successful_requests': self.successful_requests,
            'failed_requests': self.failed_requests,
            'success_rate': self.successful_requests / max(self.total_requests, 1),
            'average_response_time': self.average_response_time,
            'batches_processed': self.batches_processed
        }

class MonitoredBatcher(APIBatcher):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.metrics = BatchMetrics()
        self.logger = logging.getLogger(__name__)

    async def process_batch(self, requests: List[Dict]) -> List[Dict]:
        """Process batch with monitoring"""
        start_time = time.time()
        results = await super().process_batch(requests)
        batch_time = time.time() - start_time

        # Update metrics
        self.metrics.total_requests += len(results)
        self.metrics.batches_processed += 1

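        # status is None when a request failed before any response arrived
        successful = sum(
            1 for r in results
            if r.get('status') is not None and r['status'] < 400
        )
        self.metrics.successful_requests += successful
        self.metrics.failed_requests += len(results) - successful

        # Update running average of batch duration (not per-request latency)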
        old_avg = self.metrics.average_response_time
        old_total = self.metrics.batches_processed - 1
        self.metrics.average_response_time = (
            (old_avg * old_total + batch_time) / self.metrics.batches_processed
        )

        # Log batch completion
        self.logger.info(f"Batch completed: {len(results)} requests in {batch_time:.2f}s")

        return results

    def get_metrics(self) -> Dict:
        """Get current metrics"""
        return self.metrics.to_dict()

    def log_final_metrics(self):
        """Log final performance metrics"""
        metrics = self.get_metrics()
        self.logger.info(f"Final metrics: {json.dumps(metrics, indent=2)}")

Best Practices and Considerations

1. Optimal Batch Sizing

  • Start with 10-20 requests per batch and adjust based on performance
  • Consider memory constraints and API response sizes
  • Monitor response times and error rates to find the sweet spot

2. Connection Management

  • Use connection pooling to reuse HTTP connections
  • Set appropriate timeouts for requests
  • Implement proper session management

3. Error Recovery

  • Implement exponential backoff for retries
  • Handle different error types appropriately (4xx vs 5xx)
  • Log errors for monitoring and debugging
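
As a sketch of treating 4xx and 5xx differently: which statuses are worth retrying depends on the API, so the choice below (retry 408, 429, and any 5xx, plus connection failures reported as a status of None, matching the result dicts used earlier) is an assumption rather than a rule. The name `should_retry` is hypothetical.

```python
from typing import Optional

# Client errors that are usually transient (assumed; adjust per API)
RETRYABLE_CLIENT_STATUSES = {408, 429}


def should_retry(status: Optional[int]) -> bool:
    """Decide whether a request with this outcome is worth retrying."""
    if status is None:
        return True  # no response at all: network error or timeout
    if status in RETRYABLE_CLIENT_STATUSES:
        return True  # request timeout or rate limited
    return 500 <= status < 600  # server-side failure


for status in (200, 404, 429, 503, None):
    print(status, should_retry(status))
```

Other 4xx responses such as 400, 401, or 404 usually indicate a problem with the request itself, so repeating it unchanged is unlikely to help.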

4. Resource Management

  • Monitor memory usage, especially with large responses
  • Implement backpressure mechanisms
  • Use streaming when processing large datasets
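
Backpressure can be sketched with a bounded `asyncio.Queue`: the producer blocks on `put` whenever the workers fall behind, so the number of pending requests never exceeds the queue size. The `fetch` coroutine below is a stand-in that only sleeps; in real use it would perform the HTTP request. All function names and the URLs are illustrative assumptions.

```python
import asyncio
from typing import Dict, List, Optional


async def fetch(url: str) -> Dict:
    """Stand-in for a real HTTP call (assumption: only simulates latency)."""
    await asyncio.sleep(0.01)
    return {"url": url, "status": 200}


async def worker(queue: "asyncio.Queue[Optional[str]]", results: List[Dict]) -> None:
    while True:
        url = await queue.get()
        if url is None:  # sentinel: no more work
            break
        results.append(await fetch(url))


async def run(urls: List[str], n_workers: int = 3, max_pending: int = 5) -> List[Dict]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=max_pending)
    results: List[Dict] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]

    for url in urls:
        await queue.put(url)  # blocks while the queue is full
    for _ in range(n_workers):
        await queue.put(None)  # one sentinel per worker

    await asyncio.gather(*workers)
    return results


urls = [f"https://api.example.com/items/{i}" for i in range(12)]
results = asyncio.run(run(urls))
print(len(results))  # 12
```

Results arrive in completion order rather than input order, so each result carries its URL.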

Conclusion

API request batching is essential for efficient web scraping at scale. By implementing proper batching strategies, you can significantly improve throughput while respecting rate limits and maintaining system stability. The key is to balance batch size, timing, error handling, and resource management based on your specific use case and the APIs you're working with.

Remember to always respect the terms of service of the APIs you're scraping and implement appropriate rate limiting to avoid being blocked. When working with complex web applications that require JavaScript execution, consider combining batching techniques with browser automation tools for optimal results.
