HTTP Request Queuing Strategies for Large-Scale Scraping

When scaling web scraping operations to handle thousands or millions of requests, implementing effective HTTP request queuing strategies becomes crucial for maintaining performance, reliability, and respectful resource usage. This guide covers proven queuing strategies that help manage large-scale scraping operations efficiently.

Why Request Queuing Matters

Large-scale web scraping presents unique challenges:

  • Rate limiting: Websites implement rate limits to prevent abuse
  • Server overload: Too many concurrent requests can overwhelm target servers
  • IP blocking: Aggressive scraping patterns can trigger anti-bot measures
  • Resource management: Uncontrolled requests consume excessive bandwidth and memory
  • Data integrity: Proper queuing ensures reliable data collection

Core Queuing Strategies

1. FIFO (First In, First Out) Queue

The simplest approach where requests are processed in the order they're added:

import asyncio
from asyncio import Queue
import aiohttp

class FIFOScraper:
    def __init__(self, max_concurrent=10):
        self.queue = Queue()
        self.max_concurrent = max_concurrent
        self.session = None

    async def add_url(self, url):
        await self.queue.put(url)

    async def worker(self):
        while True:
            url = await self.queue.get()
            try:
                async with self.session.get(url) as response:
                    content = await response.text()
                    await self.process_content(url, content)
            except Exception as e:
                print(f"Error processing {url}: {e}")
            finally:
                self.queue.task_done()

    async def start_workers(self):
        self.session = aiohttp.ClientSession()
        workers = [asyncio.create_task(self.worker()) 
                  for _ in range(self.max_concurrent)]
        return workers

    async def process_content(self, url, content):
        # Process scraped content
        print(f"Processed {url}: {len(content)} characters")

2. Priority Queue System

Implement priority-based processing for more important requests:

import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class PriorityRequest:
    priority: int
    url: str
    metadata: Optional[dict] = None

    def __lt__(self, other):
        # asyncio.PriorityQueue returns the smallest item first,
        # so lower numbers mean higher priority
        return self.priority < other.priority

class PriorityQueueScraper:
    def __init__(self, max_concurrent=10):
        self.queue = asyncio.PriorityQueue()
        self.max_concurrent = max_concurrent
        self.session = None

    async def add_priority_request(self, url, priority=5, metadata=None):
        request = PriorityRequest(priority, url, metadata)
        await self.queue.put(request)

    async def worker(self):
        while True:
            request = await self.queue.get()
            try:
                async with self.session.get(request.url) as response:
                    content = await response.text()
                    await self.process_priority_content(request, content)
            except Exception as e:
                print(f"Error processing {request.url}: {e}")
            finally:
                self.queue.task_done()

    async def process_priority_content(self, request, content):
        print(f"Priority {request.priority}: {request.url}")

3. Rate-Limited Queue with Token Bucket

Control request rate using the token bucket algorithm:

class RateLimitedQueue {
    constructor(maxTokens = 10, refillRate = 1) {
        this.maxTokens = maxTokens;
        this.tokens = maxTokens;
        this.refillRate = refillRate;
        this.queue = [];
        this.processing = false;

        // Refill tokens periodically
        setInterval(() => {
            this.tokens = Math.min(this.maxTokens, this.tokens + this.refillRate);
            this.processQueue();
        }, 1000);
    }

    async addRequest(url, options = {}) {
        return new Promise((resolve, reject) => {
            this.queue.push({ url, options, resolve, reject });
            this.processQueue();
        });
    }

    async processQueue() {
        if (this.processing || this.queue.length === 0 || this.tokens <= 0) {
            return;
        }

        this.processing = true;

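        // Note: each request below is awaited before the next starts, so this
        // loop serializes work in addition to rate-limiting it; dropping the
        // await would allow concurrent in-flight requests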
        while (this.queue.length > 0 && this.tokens > 0) {
            const request = this.queue.shift();
            this.tokens--;

            try {
                const response = await fetch(request.url, request.options);
                const content = await response.text();
                request.resolve({ url: request.url, content, status: response.status });
            } catch (error) {
                request.reject(error);
            }
        }

        this.processing = false;
    }
}

// Usage example
const rateLimitedScraper = new RateLimitedQueue(5, 2); // 5 tokens max, refill 2/second

async function scrapeUrls(urls) {
    const promises = urls.map(url => rateLimitedScraper.addRequest(url));
    const results = await Promise.allSettled(promises);
    return results;
}

4. Distributed Queue with Redis

Scale across multiple workers using Redis:

import redis
import json
import asyncio
import aiohttp
from typing import List, Optional

class RedisDistributedQueue:
    def __init__(self, redis_url="redis://localhost:6379", queue_name="scrape_queue"):
        self.redis_client = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.processing_set = f"{queue_name}:processing"
        self.failed_queue = f"{queue_name}:failed"

    def add_urls(self, urls: List[str], priority: int = 0):
        """Add URLs to the queue with optional priority"""
        pipeline = self.redis_client.pipeline()
        for url in urls:
            request_data = {"url": url, "priority": priority, "attempts": 0}
            pipeline.lpush(self.queue_name, json.dumps(request_data))
        pipeline.execute()

    def get_next_request(self, timeout: int = 10) -> Optional[dict]:
        """Get next request from queue with timeout"""
        result = self.redis_client.brpop(self.queue_name, timeout=timeout)
        if result:
            request_data = json.loads(result[1])
            # Move to processing set for failure recovery
            self.redis_client.sadd(self.processing_set, json.dumps(request_data))
            return request_data
        return None

    def complete_request(self, request_data: dict):
        """Mark request as completed"""
        self.redis_client.srem(self.processing_set, json.dumps(request_data))

    def fail_request(self, request_data: dict, max_retries: int = 3):
        """Handle failed request with retry logic"""
        request_data["attempts"] += 1

        if request_data["attempts"] < max_retries:
            # Retry the request
            self.redis_client.lpush(self.queue_name, json.dumps(request_data))
        else:
            # Move to failed queue
            self.redis_client.lpush(self.failed_queue, json.dumps(request_data))

        self.redis_client.srem(self.processing_set, json.dumps(request_data))

class DistributedWorker:
    def __init__(self, queue: RedisDistributedQueue, worker_id: str):
        self.queue = queue
        self.worker_id = worker_id
        self.session = None
        self.running = False

    async def start(self):
        """Start the worker"""
        self.session = aiohttp.ClientSession()
        self.running = True

        loop = asyncio.get_running_loop()
        while self.running:
            # brpop blocks, so run it in a thread pool to keep the event loop free
            request_data = await loop.run_in_executor(
                None, self.queue.get_next_request
            )
            if request_data:
                await self.process_request(request_data)

    async def process_request(self, request_data: dict):
        """Process a single request"""
        url = request_data["url"]

        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with self.session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    content = await response.text()
                    await self.handle_success(url, content)
                    self.queue.complete_request(request_data)
                else:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                    )
        except Exception as e:
            print(f"Worker {self.worker_id} failed to process {url}: {e}")
            self.queue.fail_request(request_data)

    async def handle_success(self, url: str, content: str):
        """Handle successful scraping"""
        print(f"Worker {self.worker_id} processed {url}: {len(content)} chars")
        # Process and store content as needed

    async def stop(self):
        """Stop the worker"""
        self.running = False
        if self.session:
            await self.session.close()
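
A sketch of wiring producers and workers together (the Redis URL, worker count, and page URLs are placeholders; running the same script on several machines pointed at one Redis instance scales the queue horizontally):

async def main():
    queue = RedisDistributedQueue(redis_url="redis://localhost:6379")
    queue.add_urls([f"https://example.com/page/{i}" for i in range(100)])

    workers = [DistributedWorker(queue, worker_id=f"worker-{i}") for i in range(4)]
    tasks = [asyncio.create_task(w.start()) for w in workers]

    await asyncio.sleep(60)  # let the workers run for a while
    for w in workers:
        await w.stop()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())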

Advanced Queuing Patterns

1. Circuit Breaker Pattern

Implement circuit breakers to handle failing endpoints:

import asyncio
import time
from enum import Enum
from typing import Callable, Any

import requests

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e

    def _should_attempt_reset(self) -> bool:
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

class CircuitBreakerQueue:
    def __init__(self):
        self.circuit_breakers = {}
        self.queue = asyncio.Queue()

    def get_circuit_breaker(self, domain: str) -> CircuitBreaker:
        if domain not in self.circuit_breakers:
            self.circuit_breakers[domain] = CircuitBreaker()
        return self.circuit_breakers[domain]

    async def process_request(self, url: str):
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        circuit_breaker = self.get_circuit_breaker(domain)

        try:
            result = circuit_breaker.call(self.make_request, url)
            return result
        except Exception as e:
            print(f"Circuit breaker blocked request to {domain}: {e}")
            # Optionally re-queue for later
            await self.queue.put(url)

    def make_request(self, url: str):
        # Synchronous request for illustration; a production version would
        # use an async client or run this in an executor to avoid blocking
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text
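
A short usage sketch (the URLs are placeholders; note that make_request above is synchronous, so in a real async pipeline it would run in an executor):

async def main():
    cb_queue = CircuitBreakerQueue()

    for url in ["https://example.com/a", "https://example.com/b"]:
        # After repeated failures on one domain, its breaker opens and
        # requests are re-queued instead of hammering the endpoint
        result = await cb_queue.process_request(url)
        if result is not None:
            print(f"Fetched {len(result)} characters from {url}")

asyncio.run(main())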

2. Adaptive Queue with Backpressure

Handle varying server loads with adaptive queuing:

class AdaptiveQueue {
    constructor(initialConcurrency = 5) {
        this.concurrency = initialConcurrency;
        this.maxConcurrency = 50;
        this.minConcurrency = 1;
        this.queue = [];
        this.activeRequests = 0;
        this.successCount = 0;
        this.errorCount = 0;
        this.responseTimes = [];
        this.lastAdjustment = Date.now();

        // Adjust concurrency every 10 seconds
        setInterval(() => this.adjustConcurrency(), 10000);
    }

    async addRequest(url, options = {}) {
        return new Promise((resolve, reject) => {
            this.queue.push({ url, options, resolve, reject });
            this.processQueue();
        });
    }

    async processQueue() {
        while (this.queue.length > 0 && this.activeRequests < this.concurrency) {
            const request = this.queue.shift();
            this.activeRequests++;

            this.makeRequest(request)
                .finally(() => {
                    this.activeRequests--;
                    this.processQueue();
                });
        }
    }

    async makeRequest(request) {
        const startTime = Date.now();

        try {
            const response = await fetch(request.url, {
                ...request.options,
                // fetch has no timeout option; abort via an AbortSignal instead
                signal: AbortSignal.timeout(30000)
            });

            if (response.ok) {
                this.successCount++;
                const content = await response.text();
                this.responseTimes.push(Date.now() - startTime);
                request.resolve({ url: request.url, content, status: response.status });
            } else {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
            }
        } catch (error) {
            this.errorCount++;
            request.reject(error);
        }
    }

    adjustConcurrency() {
        const totalRequests = this.successCount + this.errorCount;

        if (totalRequests === 0) return;

        const successRate = this.successCount / totalRequests;
        const avgResponseTime = this.calculateAverageResponseTime();

        if (successRate > 0.95 && avgResponseTime < 2000) {
            // High success rate and fast responses - increase concurrency
            this.concurrency = Math.min(this.maxConcurrency, this.concurrency + 2);
        } else if (successRate < 0.8 || avgResponseTime > 5000) {
            // Low success rate or slow responses - decrease concurrency
            this.concurrency = Math.max(this.minConcurrency, this.concurrency - 1);
        }

        console.log(`Adjusted concurrency to ${this.concurrency} (success rate: ${(successRate * 100).toFixed(1)}%)`);

        // Reset counters for the next adjustment window
        this.successCount = 0;
        this.errorCount = 0;
        this.responseTimes = [];
    }

    calculateAverageResponseTime() {
        // Average the samples recorded since the last adjustment
        if (this.responseTimes.length === 0) return 0;
        const total = this.responseTimes.reduce((sum, t) => sum + t, 0);
        return total / this.responseTimes.length;
    }
}

Queue Monitoring and Metrics

Implement comprehensive monitoring for queue performance:

import time
import threading
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class QueueMetrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    avg_response_time: float = 0.0
    queue_size: int = 0
    active_workers: int = 0
    requests_per_second: float = 0.0

class MetricsCollector:
    def __init__(self, window_size=300):  # 5-minute window
        self.window_size = window_size
        self.response_times = deque(maxlen=1000)
        self.request_timestamps = deque(maxlen=1000)
        self.domain_stats = defaultdict(lambda: {'success': 0, 'failure': 0})
        self.lock = threading.Lock()

    def record_request(self, domain: str, response_time: float, success: bool):
        with self.lock:
            current_time = time.time()
            self.response_times.append(response_time)
            self.request_timestamps.append(current_time)

            if success:
                self.domain_stats[domain]['success'] += 1
            else:
                self.domain_stats[domain]['failure'] += 1

    def get_metrics(self, queue_size: int, active_workers: int) -> QueueMetrics:
        with self.lock:
            current_time = time.time()

            # Calculate requests per second
            recent_requests = [ts for ts in self.request_timestamps 
                             if current_time - ts <= 60]  # Last minute
            rps = len(recent_requests) / 60 if recent_requests else 0

            # Calculate average response time
            avg_response_time = (sum(self.response_times) / len(self.response_times) 
                               if self.response_times else 0)

            # Calculate success/failure counts
            total_success = sum(stats['success'] for stats in self.domain_stats.values())
            total_failure = sum(stats['failure'] for stats in self.domain_stats.values())

            return QueueMetrics(
                total_requests=total_success + total_failure,
                successful_requests=total_success,
                failed_requests=total_failure,
                avg_response_time=avg_response_time,
                queue_size=queue_size,
                active_workers=active_workers,
                requests_per_second=rps
            )

    def get_domain_stats(self) -> Dict[str, Dict[str, int]]:
        with self.lock:
            return dict(self.domain_stats)
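
A quick sketch of feeding the collector and reading back a snapshot (the domain, timings, and counts are simulated):

import random

collector = MetricsCollector()

for _ in range(20):
    collector.record_request(
        domain="example.com",
        response_time=random.uniform(0.1, 2.0),  # seconds, simulated
        success=random.random() > 0.1,
    )

metrics = collector.get_metrics(queue_size=42, active_workers=8)
print(f"{metrics.requests_per_second:.2f} req/s, "
      f"avg {metrics.avg_response_time:.2f}s, "
      f"{metrics.failed_requests} failures")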

Best Practices for Queue Implementation

1. Graceful Degradation

Implement fallback mechanisms when queues become overwhelmed:

class RobustQueue:
    def __init__(self, max_queue_size=10000):
        self.primary_queue = asyncio.Queue(maxsize=max_queue_size)
        self.overflow_queue = []
        self.metrics = MetricsCollector()

    async def add_request(self, url: str, priority: int = 0):
        # Note: unlike the priority queue example above, higher numbers
        # mean higher priority here
        try:
            # Try the bounded primary queue first
            self.primary_queue.put_nowait((priority, url))
        except asyncio.QueueFull:
            # Fall back to overflow handling
            if len(self.overflow_queue) < 5000:  # Limit overflow
                self.overflow_queue.append((priority, url))
            else:
                # Evict the lowest-priority overflow entry if the new request
                # outranks it; otherwise the new request is dropped
                self.overflow_queue.sort(key=lambda x: x[0])
                if priority > self.overflow_queue[0][0]:
                    self.overflow_queue[0] = (priority, url)

    async def get_request(self):
        # Drain overflow back into the primary queue as capacity frees up,
        # highest-priority entries first
        if self.overflow_queue:
            self.overflow_queue.sort(key=lambda x: x[0])
        while self.overflow_queue and not self.primary_queue.full():
            self.primary_queue.put_nowait(self.overflow_queue.pop())
        return await self.primary_queue.get()

2. Request Deduplication

Prevent duplicate requests in large-scale operations:

import hashlib
from urllib.parse import urlparse, parse_qs, urlencode

class DeduplicatedQueue:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.seen_urls = set()
        self.url_fingerprints = set()

    def normalize_url(self, url: str) -> str:
        """Normalize URL for deduplication"""
        # Only the scheme and host are case-insensitive; paths and query values are not
        parsed = urlparse(url)
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()

        # Sort query parameters so ?a=1&b=2 and ?b=2&a=1 match
        if parsed.query:
            params = parse_qs(parsed.query)
            sorted_params = urlencode(sorted(params.items()), doseq=True)
            normalized = f"{scheme}://{netloc}{parsed.path}?{sorted_params}"
        else:
            normalized = f"{scheme}://{netloc}{parsed.path}"

        return normalized

    def get_url_fingerprint(self, url: str) -> str:
        """Generate fingerprint for similar URLs"""
        normalized = self.normalize_url(url)
        return hashlib.md5(normalized.encode()).hexdigest()

    async def add_request(self, url: str, allow_duplicates: bool = False):
        if not allow_duplicates:
            fingerprint = self.get_url_fingerprint(url)
            if fingerprint in self.url_fingerprints:
                return False  # Duplicate detected

            self.url_fingerprints.add(fingerprint)
            self.seen_urls.add(url)

        await self.queue.put(url)
        return True
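
A quick check of the deduplication behavior (the URLs are illustrative):

async def main():
    queue = DeduplicatedQueue()

    # The same page with reordered query parameters and different host casing
    print(await queue.add_request("https://Example.com/item?b=2&a=1"))  # True
    print(await queue.add_request("https://example.com/item?a=1&b=2"))  # False, duplicate

asyncio.run(main())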

Integration with Browser Automation

When combining queuing strategies with browser automation tools, consider the resource implications. For example, when running multiple pages in parallel with Puppeteer, you'll need to balance queue throughput with browser resource consumption.

For JavaScript-heavy sites that require full browser rendering, implementing proper timeout handling in Puppeteer becomes crucial for maintaining queue performance and preventing resource leaks.

Conclusion

Effective HTTP request queuing is fundamental to successful large-scale web scraping. The strategies outlined above provide a foundation for building robust, scalable scraping systems that respect server resources while maximizing data collection efficiency.

Key takeaways:

  • Choose the right strategy: FIFO for simplicity, priority queues for importance-based processing, rate limiting for respectful scraping
  • Monitor performance: Track metrics to optimize queue parameters and detect issues early
  • Handle failures gracefully: Implement retry logic, circuit breakers, and overflow handling
  • Scale horizontally: Use distributed queues with Redis or similar systems for multi-worker setups
  • Adapt dynamically: Adjust concurrency and rate limits based on server responses and performance metrics

By implementing these queuing strategies, you'll build scraping systems that can handle massive scale while maintaining reliability and performance.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
