How Do You Optimize API Request Performance for Web Scraping?

Optimizing API request performance is crucial for efficient web scraping operations. Whether you're building a custom scraper or working with third-party APIs, implementing the right performance optimization strategies can dramatically improve throughput, reduce latency, and minimize resource consumption.

Understanding API Performance Bottlenecks

Before diving into optimization techniques, it's important to identify common performance bottlenecks in web scraping:

  • Network latency: Time spent waiting for server responses
  • Connection overhead: Time to establish new connections (see the timing sketch after this list)
  • Sequential processing: Making requests one after another
  • Rate limiting: API throttling mechanisms
  • Memory consumption: Inefficient data handling
  • CPU utilization: Poor request scheduling and processing
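
To get a feel for how much connection overhead and sequential processing cost, time the same batch of requests with and without connection reuse; the gap is roughly what the techniques below recover. A minimal sketch, assuming a placeholder endpoint:

import time
import requests

URL = 'https://api.example.com/data/1'  # placeholder endpoint

# Fresh TCP/TLS connection for every request
start = time.time()
for _ in range(10):
    requests.get(URL, timeout=10)
print(f"Without connection reuse: {time.time() - start:.2f}s")

# A single Session keeps the connection alive and reuses it
start = time.time()
with requests.Session() as session:
    for _ in range(10):
        session.get(URL, timeout=10)
print(f"With connection reuse: {time.time() - start:.2f}s")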

Connection Pooling and Keep-Alive

Connection pooling is one of the most effective ways to improve API performance by reusing existing connections instead of creating new ones for each request.

Python Implementation with Requests

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedScraper:
    def __init__(self):
        self.session = requests.Session()

        # Configure connection pooling
        adapter = HTTPAdapter(
            pool_connections=20,  # Number of connection pools
            pool_maxsize=20,      # Max connections per pool
            max_retries=Retry(
                total=3,
                backoff_factor=0.3,
                status_forcelist=[500, 502, 503, 504]
            )
        )

        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

        # Set common headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; WebScraper/1.0)',
            'Connection': 'keep-alive'
        })

    def fetch_data(self, url):
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def close(self):
        self.session.close()

# Usage
scraper = OptimizedScraper()
urls = ['https://api.example.com/data/1', 'https://api.example.com/data/2']

for url in urls:
    data = scraper.fetch_data(url)
    if data:
        print(f"Fetched {len(data)} items from {url}")

scraper.close()

JavaScript Implementation with Axios

const axios = require('axios');
const https = require('https');

class OptimizedScraper {
    constructor() {
        // Configure connection pooling
        const httpsAgent = new https.Agent({
            keepAlive: true,      // reuse sockets across requests
            maxSockets: 20,       // max concurrent sockets per host
            maxFreeSockets: 10,   // idle sockets kept open for reuse
            timeout: 60000        // socket inactivity timeout in ms
        });

        this.client = axios.create({
            httpsAgent,
            timeout: 10000,
            headers: {
                'User-Agent': 'Mozilla/5.0 (compatible; WebScraper/1.0)',
                'Connection': 'keep-alive'
            }
        });

        // Add response interceptor for simple retries on 5xx responses
        this.client.interceptors.response.use(
            response => response,
            error => {
                const config = error.config || {};
                config.retryCount = config.retryCount || 0;

                // Retry up to 3 times on server errors
                if (error.response?.status >= 500 && config.retryCount < 3) {
                    config.retryCount += 1;
                    return new Promise(resolve => {
                        setTimeout(() => resolve(this.client(config)), 1000);
                    });
                }
                return Promise.reject(error);
            }
        );
    }

    async fetchData(url) {
        try {
            const response = await this.client.get(url);
            return response.data;
        } catch (error) {
            console.error(`Error fetching ${url}:`, error.message);
            return null;
        }
    }
}

// Usage
const scraper = new OptimizedScraper();
const urls = ['https://api.example.com/data/1', 'https://api.example.com/data/2'];

async function scrapeData() {
    const promises = urls.map(url => scraper.fetchData(url));
    const results = await Promise.all(promises);

    results.forEach((data, index) => {
        if (data) {
            console.log(`Fetched data from ${urls[index]}`);
        }
    });
}

scrapeData();

Asynchronous and Concurrent Processing

Implementing asynchronous request patterns allows you to make multiple API calls simultaneously, significantly reducing total execution time.

Python Async Implementation

import asyncio
import aiohttp
import time

class AsyncScraper:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_data(self, session, url):
        async with self.semaphore:  # Limit concurrent requests
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        print(f"HTTP {response.status} for {url}")
                        return None
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None

    async def scrape_urls(self, urls):
        connector = aiohttp.TCPConnector(
            limit=100,           # Total connection pool size
            limit_per_host=30,   # Per-host connection limit
            ttl_dns_cache=300,   # DNS cache TTL
            use_dns_cache=True,
        )

        timeout = aiohttp.ClientTimeout(total=30, connect=10)

        async with aiohttp.ClientSession(
            connector=connector, 
            timeout=timeout,
            headers={'User-Agent': 'AsyncScraper/1.0'}
        ) as session:

            tasks = [self.fetch_data(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            return [r for r in results if r is not None and not isinstance(r, Exception)]

# Usage
async def main():
    urls = [f'https://api.example.com/data/{i}' for i in range(1, 101)]

    scraper = AsyncScraper(max_concurrent=20)

    start_time = time.time()
    results = await scraper.scrape_urls(urls)
    end_time = time.time()

    print(f"Scraped {len(results)} URLs in {end_time - start_time:.2f} seconds")

# Run the async scraper
asyncio.run(main())

Request Batching and Pagination

For APIs that support batch operations, grouping multiple requests can significantly reduce overhead and improve performance.

Batch Request Implementation

import time
import requests

class BatchScraper:
    def __init__(self, batch_size=50):
        self.batch_size = batch_size
        self.session = requests.Session()

    def batch_requests(self, items):
        """Group items into batches for efficient processing"""
        for i in range(0, len(items), self.batch_size):
            yield items[i:i + self.batch_size]

    def fetch_batch_data(self, api_endpoint, item_ids):
        """Fetch data for multiple items in a single request"""
        payload = {
            'ids': item_ids,
            'batch': True
        }

        try:
            response = self.session.post(api_endpoint, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Batch request failed: {e}")
            return {}

    def scrape_with_batching(self, api_endpoint, all_item_ids):
        all_results = {}

        for batch_ids in self.batch_requests(all_item_ids):
            print(f"Processing batch of {len(batch_ids)} items...")

            batch_results = self.fetch_batch_data(api_endpoint, batch_ids)
            all_results.update(batch_results)

            # Add delay between batches to respect rate limits
            time.sleep(0.5)

        return all_results

# Usage
scraper = BatchScraper(batch_size=25)
item_ids = list(range(1, 1001))  # 1000 items

results = scraper.scrape_with_batching(
    'https://api.example.com/batch',
    item_ids
)

print(f"Retrieved data for {len(results)} items")

Intelligent Rate Limiting and Throttling

Implementing smart rate limiting prevents API blocking while maximizing throughput. This is particularly important when handling AJAX requests using Puppeteer or working with dynamic content.

Adaptive Rate Limiter

import time
import requests
from collections import deque
from threading import Lock

class AdaptiveRateLimiter:
    def __init__(self, initial_rate=10, window_size=60):
        self.rate = initial_rate  # requests per window
        self.window_size = window_size  # seconds
        self.requests = deque()
        self.lock = Lock()
        self.consecutive_errors = 0

    def wait_if_needed(self):
        with self.lock:
            now = time.time()

            # Remove old requests outside the window
            while self.requests and self.requests[0] <= now - self.window_size:
                self.requests.popleft()

            # Check if we need to wait
            if len(self.requests) >= self.rate:
                sleep_time = self.window_size - (now - self.requests[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)

            # Record this request (re-read the clock in case we slept)
            self.requests.append(time.time())

    def on_success(self):
        """Called when a request succeeds"""
        self.consecutive_errors = 0
        # Gradually increase rate on success
        if self.rate < 50:  # Max rate limit
            self.rate = min(50, self.rate * 1.1)

    def on_error(self, status_code):
        """Called when a request fails"""
        self.consecutive_errors += 1

        # Reduce rate on rate limiting or server errors
        if status_code == 429 or status_code >= 500:
            self.rate = max(1, self.rate * 0.5)

        # Back off more aggressively on repeated errors
        if self.consecutive_errors > 3:
            self.rate = max(1, self.rate * 0.3)

class ThrottledScraper:
    def __init__(self):
        self.rate_limiter = AdaptiveRateLimiter()
        self.session = requests.Session()

    def fetch_with_throttling(self, url):
        self.rate_limiter.wait_if_needed()

        try:
            response = self.session.get(url)

            if response.status_code == 200:
                self.rate_limiter.on_success()
                return response.json()
            else:
                self.rate_limiter.on_error(response.status_code)
                return None

        except requests.exceptions.RequestException as e:
            self.rate_limiter.on_error(500)  # Treat as server error
            print(f"Request failed: {e}")
            return None

# Usage
scraper = ThrottledScraper()
urls = [f'https://api.example.com/item/{i}' for i in range(1, 100)]

for url in urls:
    data = scraper.fetch_with_throttling(url)
    if data:
        print(f"Successfully fetched: {url}")

Caching and Data Deduplication

Implementing intelligent caching reduces redundant requests and improves overall performance.

Redis-Based Caching

import redis
import requests
import json
import hashlib

class CachedScraper:
    def __init__(self, redis_host='localhost', redis_port=6379, cache_ttl=3600):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.cache_ttl = cache_ttl
        self.session = requests.Session()

    def _get_cache_key(self, url, params=None):
        """Generate a unique cache key for the request"""
        key_data = f"{url}:{json.dumps(params, sort_keys=True) if params else ''}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def fetch_with_cache(self, url, params=None, force_refresh=False):
        cache_key = self._get_cache_key(url, params)

        # Try to get from cache first
        if not force_refresh:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                print(f"Cache hit for {url}")
                return json.loads(cached_data)

        # Fetch from API
        try:
            response = self.session.get(url, params=params)
            response.raise_for_status()
            data = response.json()

            # Cache the response
            self.redis_client.setex(
                cache_key, 
                self.cache_ttl, 
                json.dumps(data)
            )

            print(f"Fetched and cached: {url}")
            return data

        except requests.exceptions.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def invalidate_cache(self, url, params=None):
        """Manually invalidate cache for a specific request"""
        cache_key = self._get_cache_key(url, params)
        self.redis_client.delete(cache_key)

# Usage
scraper = CachedScraper(cache_ttl=1800)  # 30-minute cache

# First request will hit the API
data1 = scraper.fetch_with_cache('https://api.example.com/data/1')

# Second request will use cache
data2 = scraper.fetch_with_cache('https://api.example.com/data/1')

# Force refresh ignores cache
data3 = scraper.fetch_with_cache('https://api.example.com/data/1', force_refresh=True)
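
Deduplication complements caching: fingerprint each record as it arrives and skip anything already seen, so overlapping API responses don't produce duplicate rows downstream. A minimal in-memory sketch; for distributed scrapers the same fingerprints could be stored in a Redis set instead:

import hashlib
import json

class Deduplicator:
    def __init__(self):
        self.seen_hashes = set()

    def _fingerprint(self, record):
        # Stable hash of the record's contents
        serialized = json.dumps(record, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def add_if_new(self, record):
        """Return True if the record has not been seen before."""
        fingerprint = self._fingerprint(record)
        if fingerprint in self.seen_hashes:
            return False
        self.seen_hashes.add(fingerprint)
        return True

# Usage
dedup = Deduplicator()
records = [{'id': 1, 'name': 'a'}, {'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
unique = [r for r in records if dedup.add_if_new(r)]
print(f"Kept {len(unique)} of {len(records)} records")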

Monitoring and Performance Metrics

Tracking performance metrics helps identify bottlenecks and optimize your scraping operations effectively.

Performance Monitor

import time
import statistics
import requests
from collections import defaultdict, deque

class PerformanceMonitor:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.response_times = deque(maxlen=window_size)
        self.status_codes = defaultdict(int)
        self.error_count = 0
        self.total_requests = 0
        self.start_time = time.time()

    def record_request(self, response_time, status_code, success=True):
        self.response_times.append(response_time)
        self.status_codes[status_code] += 1
        self.total_requests += 1

        if not success:
            self.error_count += 1

    def get_stats(self):
        if not self.response_times:
            return {}

        total_time = time.time() - self.start_time

        return {
            'total_requests': self.total_requests,
            'requests_per_second': self.total_requests / total_time,
            'avg_response_time': statistics.mean(self.response_times),
            'median_response_time': statistics.median(self.response_times),
            'p95_response_time': statistics.quantiles(self.response_times, n=20)[18],  # 95th percentile
            'error_rate': (self.error_count / self.total_requests) * 100,
            'status_code_distribution': dict(self.status_codes),
            'uptime': total_time
        }

    def print_stats(self):
        stats = self.get_stats()
        if not stats:
            print("No requests recorded yet")
            return

        print("\n=== Performance Statistics ===")
        print(f"Total requests: {stats['total_requests']}")
        print(f"Requests/second: {stats['requests_per_second']:.2f}")
        print(f"Average response time: {stats['avg_response_time']:.3f}s")
        print(f"Median response time: {stats['median_response_time']:.3f}s")
        print(f"95th percentile: {stats['p95_response_time']:.3f}s")
        print(f"Error rate: {stats['error_rate']:.1f}%")
        print(f"Status codes: {stats['status_code_distribution']}")

class MonitoredScraper:
    def __init__(self):
        self.monitor = PerformanceMonitor()
        self.session = requests.Session()

    def fetch_with_monitoring(self, url):
        start_time = time.time()

        try:
            response = self.session.get(url)
            response_time = time.time() - start_time

            self.monitor.record_request(
                response_time, 
                response.status_code, 
                success=response.status_code < 400
            )

            if response.status_code == 200:
                return response.json()
            else:
                return None

        except requests.exceptions.RequestException as e:
            response_time = time.time() - start_time
            self.monitor.record_request(response_time, 0, success=False)
            print(f"Request failed: {e}")
            return None

# Usage
scraper = MonitoredScraper()
urls = [f'https://api.example.com/data/{i}' for i in range(1, 51)]

for i, url in enumerate(urls):
    data = scraper.fetch_with_monitoring(url)

    # Print stats every 10 requests
    if (i + 1) % 10 == 0:
        scraper.monitor.print_stats()

# Final statistics
scraper.monitor.print_stats()

Advanced Optimization Techniques

HTTP/2 Support

When available, HTTP/2 can provide significant performance improvements through multiplexing:

import asyncio
import httpx

class HTTP2Scraper:
    def __init__(self):
        # HTTP/2 support requires the optional extra: pip install "httpx[http2]"
        self.client = httpx.AsyncClient(http2=True)

    async def fetch_multiple(self, urls):
        tasks = [self.client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        results = []
        for response in responses:
            if isinstance(response, httpx.Response) and response.status_code == 200:
                results.append(response.json())

        return results

    async def close(self):
        await self.client.aclose()
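
A minimal usage sketch for the class above (the URLs are placeholders):

# Usage
async def main():
    scraper = HTTP2Scraper()
    urls = [f'https://api.example.com/data/{i}' for i in range(1, 11)]

    try:
        results = await scraper.fetch_multiple(urls)
        print(f"Fetched {len(results)} responses over HTTP/2")
    finally:
        await scraper.close()

asyncio.run(main())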

DNS Optimization

DNS lookups can add noticeable overhead at high request volumes. The aiohttp connector shown earlier already caches DNS results; for more explicit control you can tune socket defaults and point dnspython at specific resolvers:

import socket

import dns.resolver  # dnspython

# Default timeout for new sockets (applies when no explicit timeout is passed)
socket.setdefaulttimeout(10)

# Point dnspython at specific resolvers. Note: this only affects lookups made
# through dnspython (explicit dns.resolver queries or libraries that use it);
# the standard-library resolution that requests relies on is unaffected.
dns.resolver.default_resolver = dns.resolver.Resolver(configure=False)
dns.resolver.default_resolver.nameservers = ['8.8.8.8', '8.8.4.4']

Best Practices Summary

  1. Use connection pooling to minimize connection overhead
  2. Implement asynchronous processing for concurrent requests
  3. Batch requests when APIs support it
  4. Implement intelligent rate limiting to avoid blocks
  5. Cache responses to reduce redundant requests
  6. Monitor performance metrics to identify bottlenecks
  7. Handle errors gracefully with exponential backoff (see the sketch after this list)
  8. Use HTTP/2 when available for multiplexing benefits
  9. Optimize DNS resolution for faster lookups
  10. Configure timeouts appropriately to avoid hanging requests
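
Exponential backoff (item 7) is worth spelling out: after each failed attempt, wait roughly twice as long as before and add random jitter so parallel workers don't retry in lockstep. A minimal sketch built on requests; the URL in the usage line is a placeholder:

import random
import time
import requests

def fetch_with_backoff(url, max_attempts=5, base_delay=0.5, max_delay=30):
    """Retry transient failures (5xx, 429, network errors) with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            response = requests.get(url, timeout=10)
            # Return immediately on anything that isn't a retryable status
            if response.status_code < 500 and response.status_code != 429:
                return response
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")

        # Exponential delay: base * 2^attempt, capped, with random jitter
        delay = min(max_delay, base_delay * (2 ** attempt))
        time.sleep(delay + random.uniform(0, delay / 2))

    return None

# Usage
# response = fetch_with_backoff('https://api.example.com/data/1')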

When working with browser-based scraping tools, similar optimization principles apply. For instance, when running multiple pages in parallel with Puppeteer, you can apply concurrent processing patterns and connection management strategies.

By implementing these optimization techniques, you can achieve significant performance improvements in your web scraping operations, reducing execution time while maintaining reliability and respecting API rate limits. Remember to always test your optimizations with realistic workloads and monitor the impact on both performance and server response patterns.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
