What is the Most Efficient Way to Scrape Multiple Pages Concurrently in Python?

When scraping multiple web pages, processing them sequentially can be extremely slow and inefficient. Concurrent scraping allows you to process multiple pages simultaneously, dramatically reducing the total time required. Python offers several approaches for concurrent web scraping, each with its own advantages and use cases.

Understanding Concurrency vs Parallelism

Before diving into implementation details, it's important to understand the difference:

  • Concurrency: Multiple tasks make progress by switching between them (ideal for I/O-bound operations like web scraping)
  • Parallelism: Multiple tasks run simultaneously on different CPU cores (ideal for CPU-bound operations)

Web scraping is primarily I/O-bound, as most time is spent waiting for network responses, making concurrency the preferred approach.
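
For comparison, here is a minimal sequential baseline (placeholder URLs, using the requests library); each request blocks until the previous one completes, so total runtime is roughly the sum of every response time. The concurrent approaches below shrink that to roughly the time of the slowest few responses.

import time
import requests

def scrape_sequentially(urls):
    """Fetch pages one at a time; runtime grows linearly with the number of URLs."""
    results = []
    for url in urls:
        response = requests.get(url, timeout=30)
        results.append({'url': url, 'status': response.status_code})
    return results

if __name__ == "__main__":
    urls = ['https://example.com/page1', 'https://example.com/page2']
    start = time.time()
    scrape_sequentially(urls)
    print(f"Sequential run took {time.time() - start:.2f} seconds")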

Method 1: Asyncio with aiohttp (Recommended)

Asyncio is the most efficient approach for concurrent web scraping in Python. It uses a single thread with an event loop to keep many requests in flight at once, switching between them while each one waits on the network. The examples below require aiohttp and beautifulsoup4 (pip install aiohttp beautifulsoup4).

Basic Implementation

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_page(session, url):
    """Fetch a single page asynchronously"""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                html = await response.text()
                return {'url': url, 'content': html, 'status': response.status}
            else:
                return {'url': url, 'content': None, 'status': response.status}
    except Exception as e:
        return {'url': url, 'content': None, 'error': str(e)}

async def scrape_multiple_pages(urls, max_concurrent=10):
    """Scrape multiple pages concurrently with rate limiting"""
    # Create a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_fetch(session, url):
        async with semaphore:
            return await fetch_page(session, url)

    # Configure session with connection pooling
    connector = aiohttp.TCPConnector(
        limit=100,  # Total connection pool size
        limit_per_host=30,  # Per-host connection limit
        ttl_dns_cache=300,  # DNS cache TTL
        use_dns_cache=True,
    )

    timeout = aiohttp.ClientTimeout(total=30, connect=10)

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        headers={'User-Agent': 'Mozilla/5.0 (compatible; bot)'}
    ) as session:
        # Create tasks for all URLs
        tasks = [bounded_fetch(session, url) for url in urls]

        # Execute all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return results

# Usage example
async def main():
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
        # Add more URLs...
    ]

    start_time = time.time()
    results = await scrape_multiple_pages(urls, max_concurrent=5)
    end_time = time.time()

    print(f"Scraped {len(urls)} pages in {end_time - start_time:.2f} seconds")

    # Process results
    for result in results:
        if isinstance(result, dict) and result.get('content'):
            soup = BeautifulSoup(result['content'], 'html.parser')
            # Extract data from soup
            print(f"Successfully scraped: {result['url']}")

# Run the async function
if __name__ == "__main__":
    asyncio.run(main())

Advanced Asyncio Implementation with Data Processing

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import json
import csv
from typing import List, Dict, Any

class AsyncWebScraper:
    def __init__(self, max_concurrent=10, delay=0.1):
        self.max_concurrent = max_concurrent
        self.delay = delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def fetch_and_parse(self, session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
        """Fetch and parse a single page"""
        async with self.semaphore:
            try:
                # Add delay to respect rate limits
                await asyncio.sleep(self.delay)

                async with session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        return await self.parse_page(url, html)
                    else:
                        return {'url': url, 'error': f'HTTP {response.status}'}
            except Exception as e:
                return {'url': url, 'error': str(e)}

    async def parse_page(self, url: str, html: str) -> Dict[str, Any]:
        """Parse HTML content and extract data"""
        soup = BeautifulSoup(html, 'html.parser')

        # Example data extraction
        data = {
            'url': url,
            'title': soup.find('title').text.strip() if soup.find('title') else '',
            'meta_description': '',
            'headings': [],
            'links': [],
            'images': []
        }

        # Extract meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc:
            data['meta_description'] = meta_desc.get('content', '')

        # Extract headings
        for heading in soup.find_all(['h1', 'h2', 'h3']):
            data['headings'].append({
                'tag': heading.name,
                'text': heading.text.strip()
            })

        # Extract links
        for link in soup.find_all('a', href=True):
            data['links'].append({
                'text': link.text.strip(),
                'href': link['href']
            })

        # Extract images
        for img in soup.find_all('img', src=True):
            data['images'].append({
                'src': img['src'],
                'alt': img.get('alt', '')
            })

        return data

    async def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Scrape multiple URLs concurrently"""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=30,
            ttl_dns_cache=300
        )

        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        ) as session:
            tasks = [self.fetch_and_parse(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        return [r for r in results if isinstance(r, dict)]

    def save_to_json(self, data: List[Dict[str, Any]], filename: str):
        """Save results to JSON file"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def save_to_csv(self, data: List[Dict[str, Any]], filename: str):
        """Save results to CSV file"""
        if not data:
            return

        fieldnames = ['url', 'title', 'meta_description', 'heading_count', 'link_count', 'image_count']

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for item in data:
                if 'error' not in item:
                    writer.writerow({
                        'url': item['url'],
                        'title': item['title'],
                        'meta_description': item['meta_description'],
                        'heading_count': len(item['headings']),
                        'link_count': len(item['links']),
                        'image_count': len(item['images'])
                    })

# Usage
async def main():
    scraper = AsyncWebScraper(max_concurrent=5, delay=0.2)

    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        # Add your URLs here
    ]

    results = await scraper.scrape_urls(urls)

    # Save results
    scraper.save_to_json(results, 'scraped_data.json')
    scraper.save_to_csv(results, 'scraped_data.csv')

    print(f"Successfully scraped {len(results)} pages")

if __name__ == "__main__":
    asyncio.run(main())

Method 2: Threading with requests

For simpler use cases or when working with synchronous libraries, threading can be effective:

import threading
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class ThreadedScraper:
    def __init__(self, max_workers=10):
        self.max_workers = max_workers
        # Note: requests.Session is not strictly thread-safe; for full safety,
        # give each worker thread its own Session instead of sharing this one.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; scraper)'
        })

    def fetch_page(self, url):
        """Fetch a single page using requests"""
        try:
            response = self.session.get(url, timeout=30)
            if response.status_code == 200:
                return {'url': url, 'content': response.text, 'status': response.status_code}
            else:
                return {'url': url, 'content': None, 'status': response.status_code}
        except Exception as e:
            return {'url': url, 'content': None, 'error': str(e)}

    def scrape_urls(self, urls):
        """Scrape URLs using ThreadPoolExecutor"""
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {executor.submit(self.fetch_page, url): url for url in urls}

            # Process completed tasks
            for future in as_completed(future_to_url):
                result = future.result()
                results.append(result)

                # Optional: process result immediately
                if result.get('content'):
                    soup = BeautifulSoup(result['content'], 'html.parser')
                    title = soup.find('title')
                    print(f"Scraped: {result['url']} - Title: {title.text if title else 'No title'}")

        return results

# Usage
scraper = ThreadedScraper(max_workers=5)
urls = ['https://example.com/page1', 'https://example.com/page2']
results = scraper.scrape_urls(urls)

Method 3: Multiprocessing for CPU-Intensive Tasks

When you need to perform heavy data processing on scraped content, multiprocessing can be beneficial:

import multiprocessing
import requests
from bs4 import BeautifulSoup
from multiprocessing import Pool
import time

def fetch_and_process(url):
    """Fetch and process a single URL"""
    try:
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')

            # CPU-intensive processing here
            data = {
                'url': url,
                'title': soup.find('title').text if soup.find('title') else '',
                'word_count': len(soup.get_text().split()),
                'link_count': len(soup.find_all('a')),
                'image_count': len(soup.find_all('img'))
            }
            return data
        else:
            return {'url': url, 'error': f'HTTP {response.status_code}'}
    except Exception as e:
        return {'url': url, 'error': str(e)}

def scrape_with_multiprocessing(urls, processes=4):
    """Scrape URLs using multiprocessing"""
    with Pool(processes=processes) as pool:
        results = pool.map(fetch_and_process, urls)
    return results

# Usage
if __name__ == "__main__":
    urls = ['https://example.com/page1', 'https://example.com/page2']
    results = scrape_with_multiprocessing(urls, processes=4)
    print(f"Processed {len(results)} pages")

Performance Optimization Tips

1. Connection Pooling and Reuse

# For aiohttp
connector = aiohttp.TCPConnector(
    limit=100,          # Total connections
    limit_per_host=30,  # Per-host limit
    keepalive_timeout=300,
    enable_cleanup_closed=True
)

# For requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=20,
    pool_maxsize=20,
    max_retries=3
)
session.mount('http://', adapter)
session.mount('https://', adapter)

2. Rate Limiting and Respect

import asyncio
import time

class RateLimitedScraper:
    def __init__(self, rate_limit=5, time_window=1):
        self.rate_limit = rate_limit
        self.time_window = time_window
        self.requests = []

    async def wait_if_needed(self):
        now = time.time()
        # Remove old requests outside the time window
        self.requests = [req_time for req_time in self.requests if now - req_time < self.time_window]

        if len(self.requests) >= self.rate_limit:
            sleep_time = self.time_window - (now - self.requests[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.requests.append(now)
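
A usage sketch for this limiter might look like the following (the function names are illustrative, and the limiter is only approximate when many coroutines hit it at the same instant):

import asyncio
import aiohttp

async def fetch_rate_limited(limiter, session, url):
    # Wait until a request slot opens in the current time window
    await limiter.wait_if_needed()
    async with session.get(url) as response:
        return await response.text()

async def scrape_politely(urls):
    limiter = RateLimitedScraper(rate_limit=5, time_window=1)  # roughly 5 requests per second
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_rate_limited(limiter, session, url) for url in urls))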

3. Error Handling and Retries

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def fetch_with_retry(session, url):
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.text()
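
tenacity is a third-party package (pip install tenacity). If you prefer to stay in the standard library plus aiohttp, a rough hand-rolled equivalent with exponential backoff could look like this sketch (the attempt count and delays are illustrative):

import asyncio
import aiohttp

async def fetch_with_backoff(session, url, attempts=3):
    """Retry failed requests with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(attempts):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == attempts - 1:
                raise  # out of retries: let the caller handle the final error
            await asyncio.sleep(2 ** attempt)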

Choosing the Right Method

  • Use asyncio/aiohttp for maximum efficiency with I/O-bound scraping tasks
  • Use threading for simpler implementations or when working with synchronous libraries
  • Use multiprocessing when heavy CPU processing is required on scraped data
  • Combine approaches for complex scenarios (e.g., asyncio for fetching, multiprocessing for data processing); a sketch of this pattern follows this list
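
A minimal sketch of that combined pattern (the function names and the example parsing logic are illustrative): pages are fetched with aiohttp on the event loop, and the CPU-heavy parsing is handed off to a process pool via run_in_executor so it does not block the loop.

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
from bs4 import BeautifulSoup

def parse_page(html):
    """CPU-heavy parsing runs inside a worker process."""
    soup = BeautifulSoup(html, 'html.parser')
    return {
        'title': soup.title.text.strip() if soup.title else '',
        'link_count': len(soup.find_all('a')),
    }

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def scrape_and_process(urls, workers=4):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=workers) as pool:
        async with aiohttp.ClientSession() as session:
            # Fetch all pages concurrently on the event loop
            pages = await asyncio.gather(*(fetch(session, url) for url in urls))
        # Hand the CPU-bound parsing off to the process pool
        return await asyncio.gather(*(loop.run_in_executor(pool, parse_page, html) for html in pages))

if __name__ == "__main__":
    urls = ['https://example.com/page1', 'https://example.com/page2']
    print(asyncio.run(scrape_and_process(urls)))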

Best Practices for Concurrent Scraping

  1. Respect robots.txt and rate limits (see the robotparser sketch after this list)
  2. Use appropriate delays between requests
  3. Implement proper error handling and retries
  4. Monitor resource usage (memory, CPU, network)
  5. Use connection pooling to reduce overhead
  6. Set reasonable timeouts to avoid hanging requests
  7. Consider using proxies for large-scale scraping
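
For the first point, the standard library's urllib.robotparser can check whether a URL is allowed before you queue it. A minimal sketch (the user-agent string is a placeholder for whatever identifier your scraper sends):

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def allowed_by_robots(url, user_agent='my-scraper'):
    """Return True if the site's robots.txt permits fetching this URL."""
    parts = urlparse(url)
    parser = RobotFileParser()
    parser.set_url(f"{parts.scheme}://{parts.netloc}/robots.txt")
    parser.read()  # downloads and parses robots.txt
    return parser.can_fetch(user_agent, url)

urls = ['https://example.com/page1', 'https://example.com/page2']
urls_to_scrape = [url for url in urls if allowed_by_robots(url)]

In production you would typically cache one parser per host rather than re-downloading robots.txt for every URL.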

For browser-based scraping scenarios requiring JavaScript execution, you might want to explore how to run multiple pages in parallel with Puppeteer as an alternative approach.

Concurrent scraping dramatically improves performance, but remember to be respectful of target websites and their resources. Always test your implementation with a small number of URLs first and gradually scale up while monitoring performance and error rates.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
