How do I implement parallel processing in Scrapy?

Scrapy is designed with concurrency and parallel processing in mind, offering several approaches to implement parallel execution for faster and more efficient web scraping. This guide covers various methods to leverage parallel processing in Scrapy, from basic concurrent requests to advanced distributed crawling setups.

Built-in Concurrent Requests

Scrapy's most fundamental form of parallel processing is its built-in support for concurrent HTTP requests. By default, Scrapy can handle multiple requests simultaneously using Twisted's asynchronous framework.

Configuring Concurrent Requests

You can control the level of concurrency through Scrapy settings:

# settings.py
CONCURRENT_REQUESTS = 32  # Default is 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8  # Default is 8
DOWNLOAD_DELAY = 1  # Delay between requests (seconds)
RANDOMIZE_DOWNLOAD_DELAY = True  # Wait between 0.5 * and 1.5 * DOWNLOAD_DELAY

Example Spider with Concurrent Processing

import scrapy
from scrapy import Request

class ParallelSpider(scrapy.Spider):
    name = 'parallel_spider'

    custom_settings = {
        'CONCURRENT_REQUESTS': 64,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'DOWNLOAD_DELAY': 0.5,
    }

    def start_requests(self):
        urls = [
            'https://example.com/page1',
            'https://example.com/page2',
            'https://example.com/page3',
            # Add more URLs
        ]

        for url in urls:
            yield Request(url=url, callback=self.parse)

    def parse(self, response):
        # Extract data
        for item in response.css('div.item'):
            yield {
                'title': item.css('h2::text').get(),
                'url': response.url,
            }

        # Follow pagination links concurrently
        next_pages = response.css('a.next-page::attr(href)').getall()
        for next_page in next_pages:
            yield response.follow(next_page, callback=self.parse)

Multiple Spider Instances

Running multiple spider instances simultaneously can significantly improve throughput, especially when scraping different domains or handling different types of content.

Using Multiprocessing

import multiprocessing
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

def run_spider(spider_class, **kwargs):
    """Run a spider in a separate process"""
    process = CrawlerProcess(get_project_settings())
    process.crawl(spider_class, **kwargs)
    process.start()

class ParallelSpiderRunner:
    def __init__(self, spider_classes):
        self.spider_classes = spider_classes

    def run_parallel(self):
        processes = []

        for spider_class in self.spider_classes:
            p = multiprocessing.Process(
                target=run_spider,
                args=(spider_class,)
            )
            p.start()
            processes.append(p)

        # Wait for all processes to complete
        for p in processes:
            p.join()

# Usage
if __name__ == '__main__':
    from myproject.spiders import Spider1, Spider2, Spider3

    runner = ParallelSpiderRunner([Spider1, Spider2, Spider3])
    runner.run_parallel()

Running Multiple Spiders Concurrently with CrawlerRunner

Within a single process, spiders are best run concurrently on Twisted's reactor via CrawlerRunner rather than with Python threads:

from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
from scrapy.utils.project import get_project_settings
from twisted.internet import reactor, defer

class ConcurrentSpiderRunner:
    def __init__(self):
        configure_logging()
        self.runner = CrawlerRunner(get_project_settings())

    @defer.inlineCallbacks
    def run_spiders_parallel(self, spider_configs):
        """Run multiple spiders concurrently in one process using Twisted deferreds"""
        deferreds = []

        for spider_class, spider_kwargs in spider_configs:
            deferred = self.runner.crawl(spider_class, **spider_kwargs)
            deferreds.append(deferred)

        # Wait for all spiders to complete, then stop the reactor
        yield defer.DeferredList(deferreds)
        reactor.stop()

# Usage
def main():
    from myproject.spiders import ProductSpider  # Example spider from your project

    runner = ConcurrentSpiderRunner()

    spider_configs = [
        (ProductSpider, {'category': 'electronics'}),
        (ProductSpider, {'category': 'books'}),
        (ProductSpider, {'category': 'clothing'}),
    ]

    runner.run_spiders_parallel(spider_configs)
    reactor.run()

if __name__ == '__main__':
    main()

Distributed Crawling with Scrapy-Redis

For large-scale parallel processing, Scrapy-Redis enables distributed crawling across multiple machines or containers.

Installation and Setup

pip install scrapy-redis
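
Scrapy-Redis can also be configured project-wide instead of per spider. A minimal settings.py sketch, assuming a Redis instance on localhost:

# settings.py
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
SCHEDULER_PERSIST = True  # Keep the request queue in Redis between runs
REDIS_URL = 'redis://localhost:6379'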

Redis-Based Distributed Spider

import scrapy
from scrapy_redis.spiders import RedisSpider

class DistributedSpider(RedisSpider):
    name = 'distributed_spider'
    redis_key = 'distributed_spider:start_urls'

    custom_settings = {
        'SCHEDULER': 'scrapy_redis.scheduler.Scheduler',
        'DUPEFILTER_CLASS': 'scrapy_redis.dupefilter.RFPDupeFilter',
        'ITEM_PIPELINES': {
            'scrapy_redis.pipelines.RedisPipeline': 300,
        },
        'REDIS_URL': 'redis://localhost:6379',
        'CONCURRENT_REQUESTS': 100,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 20,
    }

    def parse(self, response):
        # Extract items
        for item in response.css('div.product'):
            yield {
                'name': item.css('h3::text').get(),
                'price': item.css('.price::text').get(),
                'url': response.url,
            }

        # Generate more URLs for the queue
        for link in response.css('a.category-link::attr(href)').getall():
            yield response.follow(link, callback=self.parse)

Redis Queue Management

import redis

class RedisQueueManager:
    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)

    def add_urls_to_queue(self, spider_name, urls):
        """Add URLs to Redis queue for distributed processing"""
        queue_key = f"{spider_name}:start_urls"

        for url in urls:
            self.redis_client.lpush(queue_key, url)

    def get_queue_size(self, spider_name):
        """Get current queue size"""
        queue_key = f"{spider_name}:start_urls"
        return self.redis_client.llen(queue_key)

    def clear_queue(self, spider_name):
        """Clear the queue"""
        queue_key = f"{spider_name}:start_urls"
        self.redis_client.delete(queue_key)

# Usage
manager = RedisQueueManager()
urls = ['https://example.com/page{}'.format(i) for i in range(1, 1000)]
manager.add_urls_to_queue('distributed_spider', urls)

Advanced Parallel Processing Techniques

Custom Scheduler for Load Balancing

from scrapy.core.scheduler import Scheduler
import random

class LoadBalancingScheduler(Scheduler):
    """Simplified in-memory scheduler that balances requests across domains.

    This example keeps its own per-domain queues instead of the base
    scheduler's memory/disk queues and applies only the duplicate filter.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.domain_queues = {}

    def enqueue_request(self, request):
        """Distribute requests across domain-specific queues"""
        if not request.dont_filter and self.df.request_seen(request):
            return False  # Drop duplicates, mirroring the base scheduler

        domain = request.url.split('/')[2]
        self.domain_queues.setdefault(domain, []).append(request)
        return True

    def next_request(self):
        """Return the next request, picking a random domain with pending work"""
        pending_domains = [d for d, q in self.domain_queues.items() if q]
        if not pending_domains:
            return None

        domain = random.choice(pending_domains)
        return self.domain_queues[domain].pop(0)

    def has_pending_requests(self):
        return any(self.domain_queues.values())

    def __len__(self):
        return sum(len(q) for q in self.domain_queues.values())
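
To activate a custom scheduler like this one, point the SCHEDULER setting at its import path (the module path below assumes the class lives in myproject/schedulers.py):

# settings.py
SCHEDULER = 'myproject.schedulers.LoadBalancingScheduler'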

Parallel Item Processing Pipeline

import time
from multiprocessing import Process, Queue
from queue import Empty  # multiprocessing queues raise queue.Empty when drained

class ParallelProcessingPipeline:
    """Offloads CPU-heavy item processing to worker processes.

    Note: this is a simplified example; results come back in whatever order
    the workers finish, so an item may pass through unprocessed if no result
    is ready yet.
    """

    def __init__(self, process_count=4):
        self.process_count = process_count
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.processes = []

    def open_spider(self, spider):
        """Start worker processes when the spider opens"""
        for _ in range(self.process_count):
            p = Process(target=self.worker_process)
            p.start()
            self.processes.append(p)

    def close_spider(self, spider):
        """Clean up processes when the spider closes"""
        # Send one termination signal per worker
        for _ in range(self.process_count):
            self.input_queue.put(None)

        # Wait for workers to finish
        for p in self.processes:
            p.join()

    def process_item(self, item, spider):
        """Send the item to a worker process for parallel processing"""
        self.input_queue.put(item)

        # Return a processed result if one is ready; otherwise pass the item through
        try:
            return self.output_queue.get_nowait()
        except Empty:
            return item

    def worker_process(self):
        """Worker process loop for heavy item processing"""
        while True:
            item = self.input_queue.get()

            if item is None:  # Termination signal
                break

            processed_item = self.heavy_processing(item)
            self.output_queue.put(processed_item)

    def heavy_processing(self, item):
        """Example heavy processing function"""
        # Simulate CPU-intensive work
        time.sleep(0.1)

        # Add processed data
        item['processed'] = True
        item['processing_time'] = time.time()

        return item
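
To use the pipeline, register it in ITEM_PIPELINES (the module path below assumes it lives in your project's pipelines.py):

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.ParallelProcessingPipeline': 300,
}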

Performance Optimization Tips

Memory Management

# settings.py
# Limit memory usage for large-scale parallel processing
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1024

# Optimize autothrottling for dynamic concurrency
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 8.0

Connection Pooling

Scrapy's default HTTP/1.1 download handler already pools persistent connections through Twisted's HTTPConnectionPool, with the per-host limit taken from CONCURRENT_REQUESTS_PER_DOMAIN, so no custom middleware is needed. Tune the related settings instead:

# settings.py
CONCURRENT_REQUESTS_PER_DOMAIN = 16  # Also caps persistent connections per host
REACTOR_THREADPOOL_MAXSIZE = 20      # Thread pool for DNS resolution and blocking I/O (default 10)
DNSCACHE_ENABLED = True              # Cache DNS lookups (enabled by default)
DOWNLOAD_TIMEOUT = 30                # Fail slow connections sooner to free concurrency slots

Monitoring and Debugging

Parallel Processing Monitor

import time
import threading

class ParallelProcessingMonitor:
    def __init__(self, spider):
        self.spider = spider
        self.stats = spider.crawler.stats
        self.monitoring = True

    def start_monitoring(self):
        """Start monitoring thread"""
        monitor_thread = threading.Thread(target=self.monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()

    def monitor_loop(self):
        """Monitor parallel processing metrics"""
        while self.monitoring:
            stats_data = {
                'requests_count': self.stats.get_value('downloader/request_count', 0),
                'response_count': self.stats.get_value('downloader/response_count', 0),
                'concurrent_requests': self.stats.get_value('downloader/request_count', 0) - 
                                     self.stats.get_value('downloader/response_count', 0),
                'items_scraped': self.stats.get_value('item_scraped_count', 0),
            }

            self.spider.logger.info(f"Parallel processing stats: {stats_data}")
            time.sleep(10)  # Log every 10 seconds

    def stop_monitoring(self):
        self.monitoring = False
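
One way to wire the monitor into a spider is to start it from start_requests, where the crawler and its stats collector are already bound to the spider. A minimal sketch, assuming ParallelProcessingMonitor is importable from your project:

import scrapy
from myproject.monitoring import ParallelProcessingMonitor  # Hypothetical module path

class MonitoredSpider(scrapy.Spider):
    name = 'monitored_spider'

    def start_requests(self):
        # self.crawler (and its stats) is available by the time start_requests runs
        self.monitor = ParallelProcessingMonitor(self)
        self.monitor.start_monitoring()
        yield scrapy.Request('https://example.com', callback=self.parse)

    def parse(self, response):
        yield {'url': response.url}

    def closed(self, reason):
        # Called when the spider finishes; stop the monitoring thread
        self.monitor.stop_monitoring()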

Command Line Usage

Running Multiple Spiders

# Run multiple spiders simultaneously
scrapy crawl spider1 &
scrapy crawl spider2 &
scrapy crawl spider3 &

# Wait for all to complete
wait

# Using GNU parallel for better control
parallel scrapy crawl {} ::: spider1 spider2 spider3

# Docker-based parallel processing
docker-compose up --scale spider=4

Monitoring Commands

# Log crawl-rate stats (pages/min, items/min) every 10 seconds instead of the default 60
scrapy crawl myspider -s LOGSTATS_INTERVAL=10

# Monitor with custom logging
scrapy crawl myspider -L INFO --set LOG_FILE=spider.log

# Monitor Redis queue status
redis-cli llen myspider:start_urls

Best Practices for Parallel Processing

  1. Respect Rate Limits: Always configure appropriate delays and respect robots.txt
  2. Monitor Resource Usage: Keep track of CPU, memory, and network usage
  3. Handle Errors Gracefully: Implement proper error handling and retry logic (see the sketch after this list)
  4. Use Appropriate Concurrency: Too many concurrent requests can overwhelm servers
  5. Test Incrementally: Start with low concurrency and gradually increase
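
As a minimal sketch of point 3, you can lean on Scrapy's built-in RetryMiddleware for transient failures and attach an errback so download errors are logged rather than silently dropped (the spider and values below are illustrative):

# settings.py -- built-in retry behaviour for transient failures
RETRY_ENABLED = True
RETRY_TIMES = 3  # Retry each failed request up to 3 times
RETRY_HTTP_CODES = [429, 500, 502, 503, 504]

# In a spider: attach an errback to every request
import scrapy

class RobustSpider(scrapy.Spider):
    name = 'robust_spider'

    def start_requests(self):
        yield scrapy.Request(
            'https://example.com',
            callback=self.parse,
            errback=self.handle_error,
        )

    def parse(self, response):
        yield {'url': response.url, 'status': response.status}

    def handle_error(self, failure):
        # failure is a twisted.python.failure.Failure describing what went wrong
        self.logger.error(f'Request failed: {failure!r}')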

As with managing multiple browser instances in Puppeteer, proper resource management and monitoring are crucial for maintaining performance and stability when implementing parallel processing in Scrapy.

Integration with External Tools

Scrapy's parallel processing capabilities can be enhanced by integrating with external tools and services. For complex scenarios involving JavaScript-heavy sites, you might consider combining Scrapy with Selenium WebDriver to handle dynamic content while maintaining parallel execution.
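
A minimal sketch of such an integration is a downloader middleware that renders selected requests in a headless browser (assumes selenium and a compatible chromedriver are installed; the class and meta key names are illustrative). Note that the blocking driver.get call reduces Scrapy's concurrency, so heavier setups typically use a browser pool or a library such as scrapy-selenium:

# middlewares.py -- illustrative Selenium downloader middleware
from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

class SeleniumMiddleware:
    def __init__(self):
        options = Options()
        options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=options)

    def process_request(self, request, spider):
        # Only render requests explicitly marked for Selenium
        if not request.meta.get('use_selenium'):
            return None  # Let Scrapy's normal downloader handle it

        self.driver.get(request.url)  # Blocking call: this limits concurrency
        return HtmlResponse(
            request.url,
            body=self.driver.page_source,
            encoding='utf-8',
            request=request,
        )

Enable it via DOWNLOADER_MIDDLEWARES and set request.meta['use_selenium'] = True on the requests that need JavaScript rendering.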

By leveraging these parallel processing techniques, you can significantly improve your Scrapy scraping performance while maintaining reliability and respecting target websites' resources.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
