How do I implement parallel processing in Scrapy?
Scrapy is built around concurrency and offers several ways to parallelize work for faster, more efficient web scraping. This guide walks through those approaches, from tuning the built-in concurrent requests to running multiple spiders at once and setting up distributed crawling.
Built-in Concurrent Requests
Scrapy's most fundamental form of parallelism is its built-in support for concurrent HTTP requests: Twisted's asynchronous networking keeps many requests in flight at once from a single process, without extra threads or processes.
Configuring Concurrent Requests
You can control the level of concurrency through Scrapy settings:
# settings.py
CONCURRENT_REQUESTS = 32 # Default is 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8 # Default is 8
DOWNLOAD_DELAY = 1 # Delay between requests (seconds)
RANDOMIZE_DOWNLOAD_DELAY = True # Randomize delay between 0.5 * and 1.5 * DOWNLOAD_DELAY (enabled by default)
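When requests go through proxies, or many target sites share the same host, a per-IP cap can be more meaningful than a per-domain one; the value below is only illustrative:
# settings.py (optional)
CONCURRENT_REQUESTS_PER_IP = 4 # Non-zero value overrides the per-domain limit and applies DOWNLOAD_DELAY per IP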
Example Spider with Concurrent Processing
import scrapy
from scrapy import Request


class ParallelSpider(scrapy.Spider):
    name = 'parallel_spider'
    custom_settings = {
        'CONCURRENT_REQUESTS': 64,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'DOWNLOAD_DELAY': 0.5,
    }

    def start_requests(self):
        urls = [
            'https://example.com/page1',
            'https://example.com/page2',
            'https://example.com/page3',
            # Add more URLs
        ]
        for url in urls:
            yield Request(url=url, callback=self.parse)

    def parse(self, response):
        # Extract data
        for item in response.css('div.item'):
            yield {
                'title': item.css('h2::text').get(),
                'url': response.url,
            }
        # Follow pagination links concurrently
        next_pages = response.css('a.next-page::attr(href)').getall()
        for next_page in next_pages:
            yield response.follow(next_page, callback=self.parse)
Multiple Spider Instances
Running multiple spider instances simultaneously can significantly improve throughput, especially when scraping different domains or handling different types of content.
Using Multiprocessing
import multiprocessing

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings


def run_spider(spider_class, **kwargs):
    """Run a spider in a separate process.

    Each process gets its own CrawlerProcess (and its own Twisted reactor),
    which is required because a reactor can only be started once per process.
    """
    process = CrawlerProcess(get_project_settings())
    process.crawl(spider_class, **kwargs)
    process.start()


class ParallelSpiderRunner:
    def __init__(self, spider_classes):
        self.spider_classes = spider_classes

    def run_parallel(self):
        processes = []
        for spider_class in self.spider_classes:
            p = multiprocessing.Process(
                target=run_spider,
                args=(spider_class,)
            )
            p.start()
            processes.append(p)
        # Wait for all processes to complete
        for p in processes:
            p.join()


# Usage
if __name__ == '__main__':
    from myproject.spiders import Spider1, Spider2, Spider3

    runner = ParallelSpiderRunner([Spider1, Spider2, Spider3])
    runner.run_parallel()
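If you want to spread one large URL list across several copies of the same spider rather than running different spiders, a small sketch like the hypothetical run_sharded helper below builds on run_spider; it assumes the spider uses the default start_requests, which reads self.start_urls:
def run_sharded(spider_class, urls, shards=4):
    """Split a URL list across several processes of the same spider."""
    processes = []
    for i in range(shards):
        p = multiprocessing.Process(
            target=run_spider,
            args=(spider_class,),
            kwargs={'start_urls': urls[i::shards]},  # every Nth URL goes to shard i
        )
        p.start()
        processes.append(p)
    for p in processes:
        p.join()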
Running Multiple Spiders in One Process with CrawlerRunner
Because crawling is I/O-bound, a single Twisted reactor can drive several spiders at once; CrawlerRunner schedules them all in one process without extra threads or processes.
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
from scrapy.utils.project import get_project_settings
from twisted.internet import reactor, defer


class ConcurrentSpiderRunner:
    def __init__(self):
        configure_logging()
        self.runner = CrawlerRunner(get_project_settings())

    @defer.inlineCallbacks
    def run_spiders_parallel(self, spider_configs):
        """Run multiple spiders concurrently on one Twisted reactor"""
        deferreds = []
        for spider_class, spider_kwargs in spider_configs:
            deferred = self.runner.crawl(spider_class, **spider_kwargs)
            deferreds.append(deferred)
        # Wait for all spiders to complete, then stop the reactor
        yield defer.DeferredList(deferreds)
        reactor.stop()


# Usage
def main():
    from myproject.spiders import ProductSpider  # your project's spider

    runner = ConcurrentSpiderRunner()
    spider_configs = [
        (ProductSpider, {'category': 'electronics'}),
        (ProductSpider, {'category': 'books'}),
        (ProductSpider, {'category': 'clothing'}),
    ]
    runner.run_spiders_parallel(spider_configs)
    reactor.run()


if __name__ == '__main__':
    main()
Distributed Crawling with Scrapy-Redis
For large-scale parallel processing, Scrapy-Redis enables distributed crawling across multiple machines or containers.
Installation and Setup
pip install scrapy-redis
Redis-Based Distributed Spider
import scrapy
from scrapy_redis.spiders import RedisSpider


class DistributedSpider(RedisSpider):
    name = 'distributed_spider'
    redis_key = 'distributed_spider:start_urls'

    custom_settings = {
        'SCHEDULER': 'scrapy_redis.scheduler.Scheduler',
        'DUPEFILTER_CLASS': 'scrapy_redis.dupefilter.RFPDupeFilter',
        'ITEM_PIPELINES': {
            'scrapy_redis.pipelines.RedisPipeline': 300,
        },
        'REDIS_URL': 'redis://localhost:6379',
        'CONCURRENT_REQUESTS': 100,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 20,
    }

    def parse(self, response):
        # Extract items
        for item in response.css('div.product'):
            yield {
                'name': item.css('h3::text').get(),
                'price': item.css('.price::text').get(),
                'url': response.url,
            }
        # Generate more URLs for the queue
        for link in response.css('a.category-link::attr(href)').getall():
            yield response.follow(link, callback=self.parse)
Redis Queue Management
import redis
import json


class RedisQueueManager:
    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)

    def add_urls_to_queue(self, spider_name, urls):
        """Add URLs to Redis queue for distributed processing"""
        queue_key = f"{spider_name}:start_urls"
        for url in urls:
            self.redis_client.lpush(queue_key, url)

    def get_queue_size(self, spider_name):
        """Get current queue size"""
        queue_key = f"{spider_name}:start_urls"
        return self.redis_client.llen(queue_key)

    def clear_queue(self, spider_name):
        """Clear the queue"""
        queue_key = f"{spider_name}:start_urls"
        self.redis_client.delete(queue_key)


# Usage
manager = RedisQueueManager()
urls = ['https://example.com/page{}'.format(i) for i in range(1, 1000)]
manager.add_urls_to_queue('distributed_spider', urls)
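On the consumer side, every machine or container runs the same spider; a RedisSpider idles and waits for new URLs on the shared queue instead of closing. A minimal worker entry point, assuming the project containing the DistributedSpider above, might look like this:
# worker.py - run one copy per machine or container
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

if __name__ == '__main__':
    process = CrawlerProcess(get_project_settings())
    process.crawl('distributed_spider')  # spider name registered in the Scrapy project
    process.start()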
Advanced Parallel Processing Techniques
Custom Scheduler for Load Balancing
The sketch below keeps one in-memory queue per domain and serves requests from a randomly chosen domain, so a single large or slow site cannot monopolize the downloader. It relies on Scrapy's internal Scheduler API and skips disk queues, so treat it as a starting point rather than a drop-in replacement.
import random
from urllib.parse import urlparse

from scrapy.core.scheduler import Scheduler


class LoadBalancingScheduler(Scheduler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.domain_queues = {}

    def enqueue_request(self, request):
        """Distribute requests across domain-specific in-memory queues"""
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        domain = urlparse(request.url).netloc
        self.domain_queues.setdefault(domain, []).append(request)
        return True

    def next_request(self):
        """Return the next request from a randomly chosen non-empty domain queue"""
        non_empty = [domain for domain, queue in self.domain_queues.items() if queue]
        if not non_empty:
            return None
        return self.domain_queues[random.choice(non_empty)].pop(0)

    def has_pending_requests(self):
        return any(self.domain_queues.values())

    def __len__(self):
        return sum(len(queue) for queue in self.domain_queues.values())
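To try the scheduler, point the SCHEDULER setting at it; the module path below is an example and should match wherever you save the class:
# settings.py
SCHEDULER = 'myproject.schedulers.LoadBalancingScheduler'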
Parallel Item Processing Pipeline
Because Scrapy's item pipeline API is synchronous, the sketch below hands each item to a pool of worker processes and returns it to the engine immediately; the heavy work happens in the background, and finished results are collected as they become available.
import time
from multiprocessing import Process, Queue
from queue import Empty


class ParallelProcessingPipeline:
    def __init__(self, process_count=4):
        self.process_count = process_count
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.processes = []
        self.results = []

    def open_spider(self, spider):
        """Start worker processes when spider opens"""
        for _ in range(self.process_count):
            # Pass the queues explicitly so the child process does not have to
            # pickle the whole pipeline object (keeps the 'spawn' start method happy)
            p = Process(target=self.worker_process,
                        args=(self.input_queue, self.output_queue))
            p.start()
            self.processes.append(p)

    def close_spider(self, spider):
        """Clean up processes when spider closes"""
        # Send one termination signal per worker
        for _ in range(self.process_count):
            self.input_queue.put(None)
        # Wait for workers to finish their remaining items
        for p in self.processes:
            p.join()
        self._drain_results()
        spider.logger.info(f"Parallel pipeline processed {len(self.results)} items")

    def process_item(self, item, spider):
        """Queue the item for background processing and return it unchanged"""
        self.input_queue.put(item)  # items must be picklable to cross processes
        self._drain_results()  # collect whatever the workers have finished so far
        return item

    def _drain_results(self):
        """Move any finished results from the output queue into self.results"""
        try:
            while True:
                self.results.append(self.output_queue.get_nowait())
        except Empty:
            pass

    @staticmethod
    def worker_process(input_queue, output_queue):
        """Worker process loop for heavy item processing"""
        while True:
            item = input_queue.get()
            if item is None:  # Termination signal
                break
            output_queue.put(ParallelProcessingPipeline.heavy_processing(item))

    @staticmethod
    def heavy_processing(item):
        """Example heavy processing function"""
        time.sleep(0.1)  # Simulate CPU-intensive work
        item['processed'] = True
        item['processing_time'] = time.time()
        return item
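Like any pipeline, it only runs once registered in ITEM_PIPELINES; the module path below is an example and should match where the class lives in your project:
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.ParallelProcessingPipeline': 300,
}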
Performance Optimization Tips
Memory Management
# settings.py
# Limit memory usage for large-scale parallel processing
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1024
# Optimize autothrottling for dynamic concurrency
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 8.0
Connection Pooling
Scrapy's HTTP/1.1 download handler already keeps a pool of persistent connections, with the per-host size following CONCURRENT_REQUESTS_PER_DOMAIN, so connection reuse is tuned through settings rather than custom middleware:
# settings.py
CONCURRENT_REQUESTS_PER_DOMAIN = 16 # Also sizes the persistent connection pool per host
REACTOR_THREADPOOL_MAXSIZE = 20 # Thread pool used for DNS resolution (default: 10)
DNSCACHE_ENABLED = True # Cache DNS lookups across requests
DOWNLOAD_TIMEOUT = 30 # Fail slow connections instead of holding them open
Monitoring and Debugging
Parallel Processing Monitor
import time
import threading


class ParallelProcessingMonitor:
    def __init__(self, spider):
        self.spider = spider
        self.stats = spider.crawler.stats
        self.monitoring = True

    def start_monitoring(self):
        """Start monitoring thread"""
        monitor_thread = threading.Thread(target=self.monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()

    def monitor_loop(self):
        """Monitor parallel processing metrics"""
        while self.monitoring:
            stats_data = {
                'requests_count': self.stats.get_value('downloader/request_count', 0),
                'response_count': self.stats.get_value('downloader/response_count', 0),
                # Approximation of how many requests are currently in flight
                'concurrent_requests': (
                    self.stats.get_value('downloader/request_count', 0)
                    - self.stats.get_value('downloader/response_count', 0)
                ),
                'items_scraped': self.stats.get_value('item_scraped_count', 0),
            }
            self.spider.logger.info(f"Parallel processing stats: {stats_data}")
            time.sleep(10)  # Log every 10 seconds

    def stop_monitoring(self):
        self.monitoring = False
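One way to wire the monitor into a spider is via from_crawler, once spider.crawler (and its stats collector) exists; the import path below is hypothetical and should point at wherever you keep the monitor class:
import scrapy

from myproject.monitoring import ParallelProcessingMonitor  # example path


class MonitoredSpider(scrapy.Spider):
    name = 'monitored_spider'
    start_urls = ['https://example.com']

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.monitor = ParallelProcessingMonitor(spider)
        spider.monitor.start_monitoring()
        return spider

    def closed(self, reason):
        # Called automatically when the spider closes; stop the logging thread
        self.monitor.stop_monitoring()

    def parse(self, response):
        yield {'url': response.url}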
Command Line Usage
Running Multiple Spiders
# Run multiple spiders simultaneously
scrapy crawl spider1 &
scrapy crawl spider2 &
scrapy crawl spider3 &
# Wait for all to complete
wait
# Using GNU parallel for better control
parallel scrapy crawl {} ::: spider1 spider2 spider3
# Docker-based parallel processing
docker-compose up --scale spider=4
Monitoring Commands
# Log crawl progress periodically (pages/min, items/min) via the LogStats extension
scrapy crawl myspider -s LOGSTATS_INTERVAL=10
# Monitor with custom logging
scrapy crawl myspider -L INFO --set LOG_FILE=spider.log
# Monitor Redis queue status
redis-cli llen myspider:start_urls
Best Practices for Parallel Processing
- Respect Rate Limits: Always configure appropriate delays and respect robots.txt
- Monitor Resource Usage: Keep track of CPU, memory, and network usage
- Handle Errors Gracefully: Implement proper error handling and retry logic (see the sketch after this list)
- Use Appropriate Concurrency: Too many concurrent requests can overwhelm servers
- Test Incrementally: Start with low concurrency and gradually increase
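For the error-handling point above, a reasonable starting point is Scrapy's built-in retry middleware combined with an errback on each request; the settings values and spider below are illustrative only:
# settings.py
RETRY_ENABLED = True
RETRY_TIMES = 3 # Retry each failed request up to 3 times
RETRY_HTTP_CODES = [429, 500, 502, 503, 504]

And a spider that logs failures instead of losing them silently:
import scrapy


class ResilientSpider(scrapy.Spider):
    name = 'resilient_spider'
    start_urls = ['https://example.com']

    def start_requests(self):
        for url in self.start_urls:
            yield scrapy.Request(url, callback=self.parse, errback=self.on_error)

    def parse(self, response):
        yield {'url': response.url, 'status': response.status}

    def on_error(self, failure):
        # Covers DNS errors, timeouts and requests whose retries were exhausted
        request = getattr(failure, 'request', None)
        url = request.url if request else 'unknown'
        self.logger.error(f"Request failed for {url}: {failure!r}")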
As with running multiple browser instances in Puppeteer, proper resource management and monitoring are crucial for keeping parallel Scrapy crawls fast and stable.
Integration with External Tools
Scrapy's parallel processing capabilities can be enhanced by integrating with external tools and services. For complex scenarios involving JavaScript-heavy sites, you might consider combining Scrapy with Selenium WebDriver to handle dynamic content while maintaining parallel execution.
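As an illustration, the sketch below is a minimal downloader middleware that renders only requests flagged with meta['selenium'] in headless Chrome; the class and option names are examples, a single shared driver serializes rendered pages, and production setups typically pool several drivers or use scrapy-selenium or scrapy-playwright instead:
# middlewares.py (enable via DOWNLOADER_MIDDLEWARES)
from scrapy import signals
from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.webdriver.chrome.options import Options


class SeleniumMiddleware:
    """Render JavaScript-heavy pages with headless Chrome when requested."""

    def __init__(self):
        options = Options()
        options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=options)

    @classmethod
    def from_crawler(cls, crawler):
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def process_request(self, request, spider):
        if not request.meta.get('selenium'):
            return None  # let Scrapy's normal downloader handle this request
        self.driver.get(request.url)
        return HtmlResponse(
            url=self.driver.current_url,
            body=self.driver.page_source,
            encoding='utf-8',
            request=request,
        )

    def spider_closed(self, spider):
        self.driver.quit()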
By leveraging these parallel processing techniques, you can significantly improve your Scrapy scraping performance while maintaining reliability and respecting target websites' resources.