How to Implement API Request Queuing for High-Volume Scraping

When scraping at scale, managing API requests efficiently becomes crucial for maintaining performance, respecting rate limits, and ensuring system stability. Request queuing is a fundamental pattern that helps organize and control the flow of HTTP requests in high-volume scraping operations.

Understanding Request Queuing Fundamentals

Request queuing involves creating a buffer system between your scraping logic and the actual HTTP requests. This pattern allows you to:

  • Control request rate and timing
  • Handle failures gracefully with retry mechanisms
  • Prioritize certain requests over others
  • Monitor and limit concurrent connections
  • Implement backpressure when systems are overwhelmed

Basic Queue Implementation in Python

Here's a basic implementation using Python's asyncio and aiohttp:

import asyncio
import aiohttp
import time
from typing import Any, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class Priority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3

@dataclass
class QueuedRequest:
    url: str
    method: str = 'GET'
    headers: Optional[Dict[str, str]] = None
    data: Any = None
    priority: Priority = Priority.NORMAL
    max_retries: int = 3
    retry_count: int = 0
    created_at: Optional[float] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()
        if self.headers is None:
            self.headers = {}

class RequestQueue:
    def __init__(self, max_concurrent=10, rate_limit=1.0):
        self.queue = asyncio.PriorityQueue()
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit  # seconds between requests
        self.last_request_time = 0
        self.active_requests = 0
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def add_request(self, request: QueuedRequest):
        """Add a request to the queue with priority"""
        priority_value = -request.priority.value  # Negative for max heap behavior
        await self.queue.put((priority_value, time.time(), request))

    async def process_queue(self):
        """Start workers, drain the queue, then shut the workers down"""
        workers = [
            asyncio.create_task(self._worker(f"worker-{i}"))
            for i in range(self.max_concurrent)
        ]

        # Workers loop forever, so wait until every queued item is marked done,
        # then cancel them; otherwise gathering the workers would never return.
        await self.queue.join()
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    async def _worker(self, worker_name: str):
        """Individual worker processing requests"""
        while True:
            try:
                # Get next request from queue
                _, _, request = await self.queue.get()

                # Rate limiting
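                # Note: last_request_time is shared by all workers without a lock,
                # so request spacing is approximate under concurrency; a token
                # bucket (sketched later in this article) gives stricter control.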
                current_time = time.time()
                time_since_last = current_time - self.last_request_time
                if time_since_last < self.rate_limit:
                    await asyncio.sleep(self.rate_limit - time_since_last)

                self.last_request_time = time.time()
                self.active_requests += 1

                # Execute request
                success = await self._execute_request(request, worker_name)

                if not success and request.retry_count < request.max_retries:
                    # Retry with exponential backoff
                    request.retry_count += 1
                    await asyncio.sleep(2 ** request.retry_count)
                    await self.add_request(request)

                self.active_requests -= 1
                self.queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Worker {worker_name} error: {e}")
                self.active_requests = max(0, self.active_requests - 1)

    async def _execute_request(self, request: QueuedRequest, worker_name: str) -> bool:
        """Execute individual HTTP request"""
        try:
            async with self.session.request(
                request.method,
                request.url,
                headers=request.headers,
                data=request.data,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 200:
                    content = await response.text()
                    print(f"{worker_name}: Successfully processed {request.url}")
                    # Process your scraped content here
                    return True
                elif response.status == 429:  # Rate limited
                    print(f"{worker_name}: Rate limited for {request.url}")
                    return False
                else:
                    print(f"{worker_name}: HTTP {response.status} for {request.url}")
                    return False
        except Exception as e:
            print(f"{worker_name}: Request failed for {request.url}: {e}")
            return False

# Usage example
async def main():
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3",
    ]

    async with RequestQueue(max_concurrent=5, rate_limit=0.5) as queue:
        # Add requests to queue
        for url in urls:
            request = QueuedRequest(
                url=url,
                headers={"User-Agent": "MyBot/1.0"},
                priority=Priority.NORMAL
            )
            await queue.add_request(request)

        # Process all requests
        await queue.process_queue()

if __name__ == "__main__":
    asyncio.run(main())

JavaScript Implementation with Node.js

For JavaScript environments, here's an equivalent implementation:

const axios = require('axios');
const { EventEmitter } = require('events');

class Priority {
    static LOW = 1;
    static NORMAL = 2;
    static HIGH = 3;
}

class QueuedRequest {
    constructor(url, options = {}) {
        this.url = url;
        this.method = options.method || 'GET';
        this.headers = options.headers || {};
        this.data = options.data || null;
        this.priority = options.priority || Priority.NORMAL;
        this.maxRetries = options.maxRetries || 3;
        this.retryCount = 0;
        this.createdAt = Date.now();
    }
}

class RequestQueue extends EventEmitter {
    constructor(options = {}) {
        super();
        this.maxConcurrent = options.maxConcurrent || 10;
        this.rateLimit = options.rateLimit || 1000; // milliseconds
        this.queue = [];
        this.activeRequests = 0;
        this.pendingRetries = 0; // retries scheduled via setTimeout but not yet re-queued
        this.lastRequestTime = 0;
        this.processing = false;
    }

    addRequest(request) {
        this.queue.push(request);
        this.queue.sort((a, b) => {
            // Sort by priority (higher first), then by creation time
            if (a.priority !== b.priority) {
                return b.priority - a.priority;
            }
            return a.createdAt - b.createdAt;
        });

        if (!this.processing) {
            this.processQueue();
        }
    }

    async processQueue() {
        this.processing = true;

        while (this.queue.length > 0 || this.activeRequests > 0 || this.pendingRetries > 0) {
            // Wait if we've hit the concurrency limit
            if (this.activeRequests >= this.maxConcurrent) {
                await this.sleep(100);
                continue;
            }

            // Get next request
            const request = this.queue.shift();
            if (!request) {
                await this.sleep(100);
                continue;
            }

            // Rate limiting
            const now = Date.now();
            const timeSinceLastRequest = now - this.lastRequestTime;
            if (timeSinceLastRequest < this.rateLimit) {
                await this.sleep(this.rateLimit - timeSinceLastRequest);
            }

            this.lastRequestTime = Date.now();
            this.executeRequest(request);
        }

        this.processing = false;
        this.emit('queueComplete');
    }

    async executeRequest(request) {
        this.activeRequests++;

        try {
            const response = await axios({
                method: request.method,
                url: request.url,
                headers: request.headers,
                data: request.data,
                timeout: 30000
            });

            if (response.status === 200) {
                this.emit('requestSuccess', {
                    request,
                    response: response.data
                });
            } else {
                throw new Error(`HTTP ${response.status}`);
            }

        } catch (error) {
            console.error(`Request failed for ${request.url}:`, error.message);

            // Retry logic
            if (request.retryCount < request.maxRetries) {
                request.retryCount++;
                const backoffDelay = Math.pow(2, request.retryCount) * 1000;

                // Track the scheduled retry so processQueue doesn't report completion early
                this.pendingRetries++;
                setTimeout(() => {
                    this.pendingRetries--;
                    this.addRequest(request);
                }, backoffDelay);

                this.emit('requestRetry', { request, error });
            } else {
                this.emit('requestFailed', { request, error });
            }
        } finally {
            this.activeRequests--;
        }
    }

    sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }

    getStats() {
        return {
            queueLength: this.queue.length,
            activeRequests: this.activeRequests,
            processing: this.processing
        };
    }
}

// Usage example
const queue = new RequestQueue({
    maxConcurrent: 5,
    rateLimit: 500 // 500ms between requests
});

queue.on('requestSuccess', ({ request, response }) => {
    console.log(`✓ Success: ${request.url}`);
    // Process your scraped data here
});

queue.on('requestFailed', ({ request, error }) => {
    console.log(`✗ Failed: ${request.url} - ${error.message}`);
});

queue.on('queueComplete', () => {
    console.log('All requests processed!');
});

// Add requests
const urls = [
    'https://api.example.com/data/1',
    'https://api.example.com/data/2',
    'https://api.example.com/data/3'
];

urls.forEach(url => {
    const request = new QueuedRequest(url, {
        headers: { 'User-Agent': 'MyBot/1.0' },
        priority: Priority.NORMAL
    });
    queue.addRequest(request);
});

Advanced Features and Optimizations

Circuit Breaker Pattern

Implement circuit breakers to handle failing services gracefully:

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
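
One way to wire the breaker into the queue is to wrap the request execution step and keep a separate breaker per target host, so one slow or failing API doesn't trip the breaker for everything else. The sketch below is an assumption about how you might combine the two classes, not part of the implementation above; note that it converts the parent method's False return into an exception so the breaker can count failures:

from urllib.parse import urlparse

class CircuitBreakingQueue(RequestQueue):
    """RequestQueue variant that keeps one CircuitBreaker per target host (sketch)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.breakers = {}  # hostname -> CircuitBreaker

    async def _execute_request(self, request, worker_name):
        host = urlparse(request.url).netloc
        breaker = self.breakers.setdefault(host, CircuitBreaker())

        async def attempt():
            # The parent method returns False instead of raising, so convert
            # failures into exceptions the breaker can count
            ok = await super(CircuitBreakingQueue, self)._execute_request(request, worker_name)
            if not ok:
                raise RuntimeError(f"request to {host} failed")
            return True

        try:
            return await breaker.call(attempt)
        except Exception:
            # Breaker open or attempt failed; returning False lets the normal retry path run
            return False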

Database-Backed Queue System

For production environments, consider using database-backed queues:

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class QueuedJob(Base):
    __tablename__ = 'queued_jobs'

    id = sa.Column(sa.Integer, primary_key=True)
    url = sa.Column(sa.String(2048), nullable=False)
    method = sa.Column(sa.String(10), default='GET')
    headers = sa.Column(sa.JSON)
    data = sa.Column(sa.JSON)
    priority = sa.Column(sa.Integer, default=2)
    max_retries = sa.Column(sa.Integer, default=3)
    retry_count = sa.Column(sa.Integer, default=0)
    status = sa.Column(sa.String(20), default='pending')
    created_at = sa.Column(sa.DateTime, default=sa.func.now())
    processed_at = sa.Column(sa.DateTime)
    error_message = sa.Column(sa.Text)

class DatabaseQueue:
    def __init__(self, database_url):
        self.engine = sa.create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def add_job(self, url, **kwargs):
        job = QueuedJob(url=url, **kwargs)
        self.session.add(job)
        self.session.commit()
        return job.id

    def get_next_job(self):
        return self.session.query(QueuedJob)\
            .filter(QueuedJob.status == 'pending')\
            .order_by(QueuedJob.priority.desc(), QueuedJob.created_at)\
            .first()

    def mark_completed(self, job_id):
        job = self.session.query(QueuedJob).get(job_id)
        if job:
            job.status = 'completed'
            job.processed_at = sa.func.now()
            self.session.commit()

    def mark_failed(self, job_id, error_message):
        job = self.session.query(QueuedJob).get(job_id)
        if job:
            job.status = 'failed'
            job.error_message = error_message
            job.processed_at = sa.func.now()
            self.session.commit()
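
A simple polling worker on top of this class might look like the sketch below. The fetch logic and five-second poll interval are assumptions for illustration; a production setup would also lock rows (for example with SELECT ... FOR UPDATE SKIP LOCKED) so concurrent workers don't grab the same job, and would bump retry_count instead of failing immediately:

import time
import requests

def run_worker(queue: DatabaseQueue, poll_interval: int = 5):
    """Poll the database for pending jobs and process them one at a time (sketch)."""
    while True:
        job = queue.get_next_job()
        if job is None:
            time.sleep(poll_interval)  # nothing pending, back off briefly
            continue
        try:
            response = requests.request(
                job.method, job.url, headers=job.headers or {}, timeout=30
            )
            response.raise_for_status()
            # Parse or persist response.text here
            queue.mark_completed(job.id)
        except Exception as exc:
            queue.mark_failed(job.id, str(exc))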

Integration with Web Scraping Tools

When implementing request queuing for web scraping, consider how the queue will interact with tools that handle dynamic content. For complex scenarios involving JavaScript-heavy sites, you may need to coordinate your queuing system with browser automation so that browser sessions are throttled and reused as carefully as plain HTTP connections.
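
One way to coordinate the two is to route JavaScript-heavy URLs through a headless browser while everything else stays on the plain HTTP queue. The sketch below assumes Playwright is installed (pip install playwright && playwright install chromium); the needs_js flag is a hypothetical attribute you would add to your own request objects:

from playwright.async_api import async_playwright

async def fetch_with_browser(url: str) -> str:
    """Render a JavaScript-heavy page in headless Chromium and return its HTML (sketch)."""
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            page = await browser.new_page()
            await page.goto(url, wait_until="networkidle")
            return await page.content()
        finally:
            await browser.close()

# Inside a queue worker you might branch on a per-request flag:
#     html = await fetch_with_browser(request.url) if request.needs_js else ...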

Monitoring and Observability

Implement comprehensive monitoring for your queue system:

import logging
import time
from dataclasses import dataclass
from typing import Dict

@dataclass
class QueueMetrics:
    requests_processed: int = 0
    requests_failed: int = 0
    requests_retried: int = 0
    average_response_time: float = 0.0
    queue_length: int = 0
    active_workers: int = 0

class MonitoredRequestQueue(RequestQueue):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = QueueMetrics()
        self.response_times = []

    async def _execute_request(self, request: QueuedRequest, worker_name: str) -> bool:
        start_time = time.time()

        try:
            success = await super()._execute_request(request, worker_name)

            # Update metrics
            response_time = time.time() - start_time
            self.response_times.append(response_time)

            if success:
                self.metrics.requests_processed += 1
            else:
                self.metrics.requests_failed += 1
                if request.retry_count > 0:
                    self.metrics.requests_retried += 1

            # Calculate rolling average
            if len(self.response_times) > 100:
                self.response_times = self.response_times[-100:]

            self.metrics.average_response_time = sum(self.response_times) / len(self.response_times)
            self.metrics.queue_length = self.queue.qsize()
            self.metrics.active_workers = self.active_requests

            return success

        except Exception as e:
            logging.error(f"Request execution failed: {e}")
            self.metrics.requests_failed += 1
            return False

    def get_metrics(self) -> Dict:
        return {
            'requests_processed': self.metrics.requests_processed,
            'requests_failed': self.metrics.requests_failed,
            'requests_retried': self.metrics.requests_retried,
            'success_rate': self.metrics.requests_processed / max(1, self.metrics.requests_processed + self.metrics.requests_failed),
            'average_response_time': self.metrics.average_response_time,
            'queue_length': self.metrics.queue_length,
            'active_workers': self.metrics.active_workers
        }
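
To make these metrics visible while the queue is running, a small background task can log them periodically; the ten-second interval below is an arbitrary choice:

async def report_metrics(queue: MonitoredRequestQueue, interval: float = 10.0):
    """Periodically log queue metrics until the task is cancelled (sketch)."""
    while True:
        logging.info("queue metrics: %s", queue.get_metrics())
        await asyncio.sleep(interval)

# In main(): start the reporter alongside processing and cancel it when done
#     reporter = asyncio.create_task(report_metrics(queue))
#     await queue.process_queue()
#     reporter.cancel()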

Best Practices and Considerations

Rate Limiting Strategies

  • Fixed Window: Simple but can cause traffic bursts
  • Sliding Window: More sophisticated, smoother traffic distribution
  • Token Bucket: Allows bursts while maintaining average rate (see the token-bucket sketch after this list)
  • Adaptive Rate Limiting: Adjusts based on server responses
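
Of these, the token bucket is a common choice for scraping because it absorbs short bursts without exceeding a long-run average. A minimal asyncio sketch (the rate and capacity values are illustrative, and this is not part of the RequestQueue shown earlier):

import asyncio
import time

class TokenBucket:
    """Async token bucket: refills at `rate` tokens per second, holds at most `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    async def acquire(self):
        while True:
            now = time.monotonic()
            # Refill based on elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens yet; sleep roughly until one becomes available
            await asyncio.sleep((1 - self.tokens) / self.rate)

# Usage inside a worker, replacing the fixed-delay rate limiting shown earlier:
#     await bucket.acquire()
#     ... issue the HTTP request ...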

Error Handling

  • Implement exponential backoff for retries
  • Use different retry strategies for different error types (see the retry sketch after this list)
  • Set maximum retry limits to prevent infinite loops
  • Log errors with sufficient context for debugging
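
As a concrete illustration of error-type-specific retries, a helper like the one below can decide whether and how long to wait; the status-code groupings and the 30-second Retry-After fallback are assumptions, not universal rules:

from typing import Optional

def retry_delay(status: int, attempt: int, retry_after: Optional[str] = None) -> Optional[float]:
    """Return seconds to wait before retrying, or None if the request should not be retried (sketch)."""
    if status == 429:
        # Honor a numeric Retry-After header when present; 30s is an arbitrary fallback
        return float(retry_after) if retry_after else 30.0
    if status == 408 or 500 <= status < 600:
        # Timeouts and server errors are usually transient: exponential backoff, capped at 60s
        return float(min(2 ** attempt, 60))
    return None  # treat other 4xx responses as permanent failures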

Resource Management

  • Monitor memory usage, especially for large queues
  • Implement queue size limits to prevent memory exhaustion (see the bounded-queue sketch after this list)
  • Use connection pooling to reduce overhead
  • Consider disk-based queues for very large datasets
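
For queue size limits, asyncio gives you backpressure almost for free: a bounded queue makes producers wait whenever the buffer is full. A small sketch of how the earlier RequestQueue could be capped (the max_pending value is illustrative):

class BoundedRequestQueue(RequestQueue):
    """RequestQueue with a hard cap on pending requests (sketch)."""

    def __init__(self, *args, max_pending: int = 10_000, **kwargs):
        super().__init__(*args, **kwargs)
        # With maxsize set, queue.put() in add_request waits until space frees up,
        # which naturally slows producers that outpace the workers.
        self.queue = asyncio.PriorityQueue(maxsize=max_pending)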

Security Considerations

  • Validate and sanitize all URLs before processing (see the validation sketch after this list)
  • Implement proper authentication and authorization
  • Use HTTPS for sensitive data transmission
  • Rate limit per API key or user to prevent abuse
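
URL validation can be as simple as checking the scheme and an allowlist of hosts before anything enters the queue; a minimal sketch (the allowed hosts are placeholders):

from urllib.parse import urlparse

ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_HOSTS = {"api.example.com"}  # placeholder allowlist

def is_safe_url(url: str) -> bool:
    """Reject URLs with unexpected schemes or hosts before they are queued (sketch)."""
    parsed = urlparse(url)
    return parsed.scheme in ALLOWED_SCHEMES and parsed.netloc in ALLOWED_HOSTS

# In add_request, drop or log anything that fails the check:
#     if not is_safe_url(request.url):
#         raise ValueError(f"Refusing to queue unsafe URL: {request.url}")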

Conclusion

Implementing effective API request queuing is essential for high-volume web scraping operations. The patterns and implementations shown here provide a solid foundation that can be adapted to specific requirements. Remember to always respect website terms of service, implement proper rate limiting, and monitor your systems for optimal performance.

For scenarios requiring more complex browser interactions, you might also need to consider how your queuing system integrates with parallel page processing to maximize efficiency while maintaining stability.

The key to successful high-volume scraping lies in balancing speed, reliability, and respectful resource usage. Start with simple implementations and gradually add complexity as your requirements grow.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
