
How do I implement request batching for efficient web scraping?

Request batching groups web scraping requests and runs each group concurrently, which raises throughput while keeping the load on target servers bounded. It matters most for large-scale data extraction projects, where fetching pages one at a time would be prohibitively slow.

Understanding Request Batching

Request batching involves grouping multiple HTTP requests together and executing them concurrently rather than sequentially. This technique leverages the idle time during network I/O operations to process other requests, resulting in significant performance improvements. However, effective batching requires careful consideration of rate limits, server capacity, and error handling.
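
To make the idea concrete, here is a minimal sketch that compares sequential and batched execution. It uses only the standard library, and `fake_fetch` is a hypothetical stand-in that sleeps instead of making a real HTTP call, so the timings only illustrate how waiting time overlaps within a batch.

```python
import asyncio
import time

async def fake_fetch(url: str, latency: float = 0.05) -> str:
    """Stand-in for an HTTP call: waits `latency` seconds, then returns a body."""
    await asyncio.sleep(latency)
    return f"body of {url}"

async def fetch_sequential(urls):
    # Each request waits for the previous one to finish
    return [await fake_fetch(u) for u in urls]

async def fetch_batched(urls, batch_size: int = 5):
    results = []
    for start in range(0, len(urls), batch_size):
        batch = urls[start:start + batch_size]
        # Requests in the same batch overlap their waiting time
        results.extend(await asyncio.gather(*(fake_fetch(u) for u in batch)))
    return results

async def compare(n: int = 10):
    urls = [f"https://example.com/page/{i}" for i in range(n)]
    t0 = time.perf_counter()
    seq = await fetch_sequential(urls)
    t_seq = time.perf_counter() - t0
    t0 = time.perf_counter()
    bat = await fetch_batched(urls)
    t_bat = time.perf_counter() - t0
    return seq, bat, t_seq, t_bat

if __name__ == "__main__":
    seq, bat, t_seq, t_bat = asyncio.run(compare())
    print(f"sequential: {t_seq:.2f}s, batched: {t_bat:.2f}s")
```

Both functions return the same results in the same order; only the elapsed time differs, because `asyncio.gather` lets the requests of one batch wait on the network at the same time.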

Python Implementation with asyncio and aiohttp

Python's asyncio library provides excellent support for concurrent request processing. Here's a comprehensive implementation using aiohttp:

import asyncio
import aiohttp
from typing import List, Dict, Optional
import time
from dataclasses import dataclass

@dataclass
class BatchRequest:
    url: str
    method: str = 'GET'
    headers: Optional[Dict] = None
    data: Optional[Dict] = None

class RequestBatcher:
    def __init__(self, batch_size: int = 10, delay: float = 1.0):
        self.batch_size = batch_size
        self.delay = delay
        self.session = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def execute_request(self, request: BatchRequest) -> Dict:
        """Execute a single request with error handling"""
        try:
            async with self.session.request(
                request.method,
                request.url,
                headers=request.headers,
                json=request.data
            ) as response:
                return {
                    'url': request.url,
                    'status': response.status,
                    'data': await response.text(),
                    'headers': dict(response.headers),
                    # Treat 4xx/5xx responses (including 429) as failures
                    'success': response.status < 400
                }
        except Exception as e:
            return {
                'url': request.url,
                'error': str(e),
                'success': False
            }

    async def execute_batch(self, requests: List[BatchRequest]) -> List[Dict]:
        """Execute a batch of requests concurrently"""
        semaphore = asyncio.Semaphore(self.batch_size)

        async def limited_request(request):
            async with semaphore:
                result = await self.execute_request(request)
                await asyncio.sleep(self.delay / self.batch_size)
                return result

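        tasks = [limited_request(req) for req in requests]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        # Convert any exception objects into the same dict shape as other failures
        return [
            {'url': req.url, 'error': str(out), 'success': False}
            if isinstance(out, BaseException) else out
            for req, out in zip(requests, outcomes)
        ]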

    async def process_requests(self, all_requests: List[BatchRequest]) -> List[Dict]:
        """Process all requests in batches"""
        results = []

        for i in range(0, len(all_requests), self.batch_size):
            batch = all_requests[i:i + self.batch_size]
            print(f"Processing batch {i // self.batch_size + 1}")

            batch_results = await self.execute_batch(batch)
            results.extend(batch_results)

            # Rate limiting between batches
            if i + self.batch_size < len(all_requests):
                await asyncio.sleep(self.delay)

        return results

# Usage example
async def main():
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3",
        # Add more URLs as needed
    ]

    requests = [BatchRequest(url) for url in urls]

    async with RequestBatcher(batch_size=5, delay=1.0) as batcher:
        results = await batcher.process_requests(requests)

        # Process results
        successful = [r for r in results if r.get('success')]
        failed = [r for r in results if not r.get('success')]

        print(f"Successful requests: {len(successful)}")
        print(f"Failed requests: {len(failed)}")

# Run the async function
if __name__ == "__main__":
    asyncio.run(main())

JavaScript Implementation with Promise-based Batching

JavaScript's Promise API and async/await syntax make it ideal for request batching. Here's a robust implementation:

class RequestBatcher {
    constructor(batchSize = 10, delayMs = 1000) {
        this.batchSize = batchSize;
        this.delayMs = delayMs;
    }

    async executeRequest(url, options = {}) {
        const controller = new AbortController();
        const timeoutId = setTimeout(() => controller.abort(), 30000);

        try {
            const response = await fetch(url, {
                ...options,
                signal: controller.signal
            });

            clearTimeout(timeoutId);

            return {
                url,
                status: response.status,
                data: await response.text(),
                headers: Object.fromEntries(response.headers.entries()),
                success: response.ok
            };
        } catch (error) {
            clearTimeout(timeoutId);
            return {
                url,
                error: error.message,
                success: false
            };
        }
    }

    async executeBatch(requests) {
        const promises = requests.map(async (request, index) => {
            // Stagger requests within the batch
            if (index > 0) {
                await this.delay(this.delayMs / this.batchSize * index);
            }
            return this.executeRequest(request.url, request.options);
        });

        return Promise.allSettled(promises);
    }

    async processRequests(allRequests) {
        const results = [];
        const batches = this.createBatches(allRequests);

        for (let i = 0; i < batches.length; i++) {
            console.log(`Processing batch ${i + 1} of ${batches.length}`);

            const batchResults = await this.executeBatch(batches[i]);
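            const processedResults = batchResults.map((result, index) =>
                result.status === 'fulfilled' ? result.value : {
                    // Keep the same shape as the failures returned by executeRequest
                    url: batches[i][index].url,
                    error: String(result.reason),
                    success: false
                }
            );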

            results.push(...processedResults);

            // Delay between batches
            if (i < batches.length - 1) {
                await this.delay(this.delayMs);
            }
        }

        return results;
    }

    createBatches(requests) {
        const batches = [];
        for (let i = 0; i < requests.length; i += this.batchSize) {
            batches.push(requests.slice(i, i + this.batchSize));
        }
        return batches;
    }

    delay(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}

// Usage example
async function main() {
    const urls = [
        'https://api.example.com/data/1',
        'https://api.example.com/data/2',
        'https://api.example.com/data/3',
        // Add more URLs as needed
    ];

    const requests = urls.map(url => ({
        url,
        options: {
            method: 'GET',
            headers: {
                'User-Agent': 'Mozilla/5.0 (compatible; BatchScraper/1.0)'
            }
        }
    }));

    const batcher = new RequestBatcher(5, 1000);
    const results = await batcher.processRequests(requests);

    const successful = results.filter(r => r.success);
    const failed = results.filter(r => !r.success);

    console.log(`Successful requests: ${successful.length}`);
    console.log(`Failed requests: ${failed.length}`);
}

main().catch(console.error);

Swift Implementation with Alamofire

For iOS and macOS applications, Alamofire provides excellent support for concurrent request processing:

import Alamofire
import Foundation

class RequestBatcher {
    private let batchSize: Int
    private let delayBetweenBatches: TimeInterval
    private let session: Session
    private let queue: DispatchQueue

    init(batchSize: Int = 10, delayBetweenBatches: TimeInterval = 1.0) {
        self.batchSize = batchSize
        self.delayBetweenBatches = delayBetweenBatches

        let configuration = URLSessionConfiguration.default
        configuration.httpMaximumConnectionsPerHost = batchSize
        configuration.timeoutIntervalForRequest = 30

        self.session = Session(configuration: configuration)
        self.queue = DispatchQueue(label: "batch.scraper", qos: .utility)
    }

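    func processRequests(urls: [String], completion: @escaping ([RequestResult]) -> Void) {
        let batches = urls.chunked(into: batchSize)
        // One slot per batch: originalIndex is only unique within a batch,
        // so results are ordered per batch and then concatenated.
        var resultsByBatch = [[RequestResult]](repeating: [], count: batches.count)

        let group = DispatchGroup()

        for (index, batch) in batches.enumerated() {
            group.enter()

            // Batches start on a fixed schedule, whether or not the previous one has finished
            queue.asyncAfter(deadline: .now() + TimeInterval(index) * delayBetweenBatches) {
                self.executeBatch(urls: batch) { batchResults in
                    // Called on the serial `queue`, so these writes do not race
                    resultsByBatch[index] = batchResults.sorted { $0.originalIndex < $1.originalIndex }
                    group.leave()
                }
            }
        }

        group.notify(queue: .main) {
            completion(resultsByBatch.flatMap { $0 })
        }
    }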

    private func executeBatch(urls: [String], completion: @escaping ([RequestResult]) -> Void) {
        let group = DispatchGroup()
        var results: [RequestResult] = []
        let resultsLock = NSLock()

        for (index, url) in urls.enumerated() {
            group.enter()

            session.request(url)
                .validate()
                .responseString(queue: queue) { response in
                    defer { group.leave() }

                    let result = RequestResult(
                        url: url,
                        originalIndex: index,
                        success: response.error == nil,
                        data: response.value,
                        error: response.error?.localizedDescription,
                        statusCode: response.response?.statusCode
                    )

                    resultsLock.lock()
                    results.append(result)
                    resultsLock.unlock()
                }
        }

        group.notify(queue: queue) {
            completion(results)
        }
    }
}

struct RequestResult {
    let url: String
    let originalIndex: Int
    let success: Bool
    let data: String?
    let error: String?
    let statusCode: Int?
}

extension Array {
    func chunked(into size: Int) -> [[Element]] {
        return stride(from: 0, to: count, by: size).map {
            Array(self[$0..<Swift.min($0 + size, count)])
        }
    }
}

// Usage example
let urls = [
    "https://api.example.com/data/1",
    "https://api.example.com/data/2",
    "https://api.example.com/data/3"
]

let batcher = RequestBatcher(batchSize: 5, delayBetweenBatches: 1.0)
batcher.processRequests(urls: urls) { results in
    let successful = results.filter { $0.success }
    let failed = results.filter { !$0.success }

    print("Successful requests: \(successful.count)")
    print("Failed requests: \(failed.count)")
}

Advanced Batching Strategies

Adaptive Batch Sizing

Implement dynamic batch size adjustment based on server response times and error rates:

class AdaptiveBatcher:
    def __init__(self, initial_batch_size: int = 10):
        self.batch_size = initial_batch_size
        self.success_rate_threshold = 0.8
        self.response_time_threshold = 5.0

    def adjust_batch_size(self, results: List[Dict], avg_response_time: float):
        if not results:
            # Nothing to measure, so keep the current size
            return self.batch_size
        success_rate = sum(1 for r in results if r.get('success', False)) / len(results)

        if success_rate < self.success_rate_threshold or avg_response_time > self.response_time_threshold:
            # Reduce batch size if performance degrades
            self.batch_size = max(1, int(self.batch_size * 0.8))
        elif success_rate > 0.95 and avg_response_time < 2.0:
            # Increase batch size if performance is excellent
            self.batch_size = min(50, int(self.batch_size * 1.2))

        return self.batch_size
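
The `avg_response_time` argument has to be measured somewhere. One possible way to obtain it, sketched with the standard library only, is to time each request coroutine and average the results. The names `timed_call`, `run_timed_batch`, and `fake_fetch` are assumptions for illustration; in practice `fetch` would wrap something like `RequestBatcher.execute_request`.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, List, Tuple

async def timed_call(fetch: Callable[[str], Awaitable[Dict]], url: str) -> Tuple[Dict, float]:
    """Run one request coroutine and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = await fetch(url)
    return result, time.perf_counter() - start

async def run_timed_batch(fetch, urls: List[str]) -> Tuple[List[Dict], float]:
    """Run a batch concurrently; return the results and the mean response time."""
    pairs = await asyncio.gather(*(timed_call(fetch, u) for u in urls))
    results = [r for r, _ in pairs]
    avg = sum(t for _, t in pairs) / len(pairs) if pairs else 0.0
    return results, avg

async def fake_fetch(url: str) -> Dict:
    # Hypothetical stand-in for a real request; sleeps instead of hitting the network
    await asyncio.sleep(0.01)
    return {"url": url, "success": True}
```

The returned pair can be passed straight to `adjust_batch_size(results, avg)` after each batch.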

Error Recovery and Retry Logic

Implement sophisticated retry mechanisms for failed requests:

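async def retry_failed_requests(failed_results: List[Dict], max_retries: int = 3, attempt: int = 1) -> List[Dict]:
    """Retry failed requests with exponential backoff.

    Returns the recovered results plus any results that still fail
    after max_retries attempts.
    """
    if not failed_results or attempt > max_retries:
        return failed_results

    # BatchRequest has no retry counter, so the attempt number is tracked here.
    # Only the URL is kept, so each retry is issued as a plain GET.
    retry_requests = [BatchRequest(url=result['url']) for result in failed_results]

    # Exponential backoff: 2s, 4s, 8s, ...
    await asyncio.sleep(2 ** attempt)

    async with RequestBatcher(batch_size=5, delay=2.0) as batcher:
        results = await batcher.process_requests(retry_requests)

    recovered = [r for r in results if r.get('success')]
    still_failed = [r for r in results if not r.get('success')]
    return recovered + await retry_failed_requests(still_failed, max_retries, attempt + 1)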

Best Practices and Considerations

Rate Limiting and Server Respect

Always implement proper rate limiting to avoid overwhelming target servers:

  • Respect robots.txt: Check and follow the website's robots.txt file
  • Implement exponential backoff: Increase delays after receiving rate limit responses
  • Monitor server response: Adjust batch size based on server performance indicators
  • Use appropriate delays: Allow sufficient time between requests to avoid being blocked
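
Two of the points above, the robots.txt check and exponential backoff, can be sketched with the standard library alone. This is a rough illustration under stated assumptions: the function names, the 60-second cap, and the choice of full jitter are not taken from any particular library, and the robots.txt content is expected to have been downloaded already.

```python
import random
from typing import Optional
from urllib.robotparser import RobotFileParser

def allowed_by_robots(robots_txt: str, user_agent: str, url: str) -> bool:
    """Check a URL against already-downloaded robots.txt content."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  retry_after: Optional[float] = None, jitter: bool = True) -> float:
    """Seconds to wait before retry number `attempt` (1-based)."""
    if retry_after is not None:
        # A server-provided Retry-After value takes priority over our own schedule
        return max(0.0, retry_after)
    delay = min(cap, base * 2 ** (attempt - 1))
    # Full jitter spreads retries out so clients do not retry in lockstep
    return random.uniform(0, delay) if jitter else delay
```

A scraper could call `allowed_by_robots` before queuing a URL and `backoff_delay` after a 429 or 503 response, passing the `Retry-After` header value (converted to seconds) when the server sends one.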

Memory Management

For large-scale scraping operations, implement streaming and memory-efficient processing:

async def stream_process_urls(url_generator, batch_size: int = 10):
    """Yield results batch by batch from an async iterable of URLs, so the full URL list never sits in memory"""
    batch = []

    async with RequestBatcher(batch_size=batch_size) as batcher:
        async for url in url_generator:
            batch.append(BatchRequest(url))

            if len(batch) >= batch_size:
                results = await batcher.execute_batch(batch)
                yield results
                batch = []

        # Process remaining URLs
        if batch:
            results = await batcher.execute_batch(batch)
            yield results
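
Because `stream_process_urls` iterates with `async for`, its input must be an async iterable. The sketch below shows one way to produce such a stream from lines of text (for example an open file) and how the chunking step works in isolation. The helper names `urls_from_lines`, `chunked`, and `collect_batches` are hypothetical and do not appear in the code above.

```python
import asyncio
from typing import AsyncIterator, Iterable, List

async def urls_from_lines(lines: Iterable[str]) -> AsyncIterator[str]:
    """Yield non-empty, stripped URLs one at a time (e.g. from an open file)."""
    for line in lines:
        url = line.strip()
        if url:
            yield url
        # Give other tasks a chance to run between items
        await asyncio.sleep(0)

async def chunked(source: AsyncIterator[str], size: int) -> AsyncIterator[List[str]]:
    """Group an async stream into lists of at most `size` items."""
    batch: List[str] = []
    async for item in source:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    # Emit the final, possibly shorter, batch
    if batch:
        yield batch

async def collect_batches(lines: Iterable[str], size: int) -> List[List[str]]:
    return [b async for b in chunked(urls_from_lines(lines), size)]
```

An object returned by `urls_from_lines(open_file)` could be passed as the `url_generator` argument, so only one batch of URLs is held in memory at a time.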

When working with complex single-page applications, consider using tools that can handle browser sessions effectively for more sophisticated scraping scenarios. Additionally, for applications requiring multiple concurrent browser instances, you might want to explore how to run multiple pages in parallel.

Performance Monitoring and Optimization

Implement comprehensive monitoring to track batch performance:

import time
from collections import defaultdict

class BatchMetrics:
    def __init__(self):
        self.request_times = []
        self.success_rates = defaultdict(list)
        self.batch_sizes = []

    def record_batch(self, batch_size: int, results: List[Dict], execution_time: float):
        self.batch_sizes.append(batch_size)
        self.request_times.append(execution_time)

        success_count = sum(1 for r in results if r.get('success', False))
        success_rate = success_count / len(results) if results else 0
        self.success_rates[batch_size].append(success_rate)

    def get_optimal_batch_size(self) -> int:
        """Calculate optimal batch size based on historical performance"""
        if not self.success_rates:
            return 10

        best_size = 10
        best_score = 0

        for size, rates in self.success_rates.items():
            avg_success_rate = sum(rates) / len(rates)
            # Score combines success rate with batch size efficiency
            score = avg_success_rate * min(size / 10, 1.0)

            if score > best_score:
                best_score = score
                best_size = size

        return best_size
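
The score above weighs success rate and batch size but does not use the recorded execution times. As an alternative sketch, not part of the class above, batch sizes could be ranked by successful requests per second. The function name and the record layout `(batch_size, success_count, execution_time_seconds)` are assumptions made for this example.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def best_batch_size_by_throughput(
    records: List[Tuple[int, int, float]], default: int = 10
) -> int:
    """Pick the batch size with the most successful requests per second.

    Each record is (batch_size, success_count, execution_time_seconds).
    """
    # Per batch size: [total successes, total seconds]
    totals: Dict[int, List[float]] = defaultdict(lambda: [0.0, 0.0])
    for size, successes, seconds in records:
        totals[size][0] += successes
        totals[size][1] += seconds

    best_size, best_rate = default, 0.0
    for size, (successes, seconds) in totals.items():
        rate = successes / seconds if seconds > 0 else 0.0
        if rate > best_rate:
            best_size, best_rate = size, rate
    return best_size
```

With records such as `(5, 5, 1.0)`, `(10, 10, 1.5)`, `(10, 9, 1.5)` and `(20, 12, 4.0)`, size 10 wins: it averages about 6.3 successful requests per second, against 5.0 for size 5 and 3.0 for size 20.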

Conclusion

Request batching is a powerful technique for optimizing web scraping performance. The key to successful implementation lies in balancing speed with server respect, implementing robust error handling, and continuously monitoring performance metrics. By using the patterns and code examples provided above, you can build scalable scraping solutions that efficiently process large volumes of data while maintaining good relationships with target websites.

Remember to always follow ethical scraping practices, respect rate limits, and consider the load your scraper places on target servers. Well-tuned request batching can improve scraping throughput several times over (5-10x is a plausible range for I/O-bound workloads), though the actual gain depends on network latency, batch size, and the delays you configure.

