How do I create custom pipelines in Scrapy?
Scrapy pipelines are components that process items after your spider extracts them. They let you clean, validate, filter, and store scraped items in a structured workflow. This guide walks through creating custom pipelines in Scrapy, from basic validation and cleaning to storage, configuration, error handling, and testing.
Understanding Scrapy Pipelines
Pipelines in Scrapy are Python classes that process items yielded by spiders. Each pipeline receives an item, performs some action on it, and either returns the item for further processing or drops it entirely. Items flow through pipelines sequentially based on their priority order.
Basic Pipeline Structure
Every Scrapy pipeline must implement the process_item method:
class BasicPipeline:
    def process_item(self, item, spider):
        # Process the item here.
        # Return the item so the next pipeline receives it,
        # or raise DropItem() to drop the item.
        return item
Creating Your First Custom Pipeline
Let's start with a simple validation pipeline that ensures required fields are present:
# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ValidationPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # Check that required fields are present
        required_fields = ['title', 'price', 'url']
        for field in required_fields:
            if not adapter.get(field):
                raise DropItem(f"Missing {field} in {item}")
        return item
Data Cleaning Pipeline
Here's a pipeline that cleans and normalizes data:
import re
from itemadapter import ItemAdapter

class CleaningPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Clean title field: collapse extra whitespace and newlines
        if adapter.get('title'):
            adapter['title'] = re.sub(r'\s+', ' ', adapter['title']).strip()

        # Clean price field: extract a numeric price from the string
        if adapter.get('price'):
            price_match = re.search(r'[\d,]+\.?\d*', adapter['price'])
            if price_match:
                adapter['price'] = float(price_match.group().replace(',', ''))
            else:
                adapter['price'] = 0.0

        # Normalize URL
        if adapter.get('url'):
            adapter['url'] = adapter['url'].strip().lower()

        return item
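You can sanity-check a pipeline like this directly in a Python shell, since ItemAdapter also accepts plain dicts and the spider argument isn't used here. The values below are made-up examples:

pipeline = CleaningPipeline()
item = {'title': '  Fancy   Widget\n', 'price': '$1,299.99', 'url': ' HTTPS://Example.com/Widget '}
pipeline.process_item(item, spider=None)
# -> {'title': 'Fancy Widget', 'price': 1299.99, 'url': 'https://example.com/widget'}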
Advanced Pipeline Features
Pipeline with Open and Close Methods
Pipelines can also implement open_spider and close_spider for setup and cleanup. The example below writes items to a JSON file; for simple exports, Scrapy's built-in feed exports (scrapy crawl myspider -o items.json) already cover this, so a custom writer is mainly useful when you need full control over the output format:
import json
from itemadapter import ItemAdapter

class JsonWriterPipeline:
    def open_spider(self, spider):
        self.file = open('items.json', 'w')
        self.file.write('[\n')
        self.first_item = True

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if not self.first_item:
            self.file.write(',\n')
        else:
            self.first_item = False
        line = json.dumps(dict(adapter), indent=2)
        self.file.write(line)
        return item
Database Storage Pipeline
Here's a pipeline that stores items in a database:
import sqlite3
from itemadapter import ItemAdapter

class DatabasePipeline:
    def __init__(self, sqlite_db):
        self.sqlite_db = sqlite_db

    @classmethod
    def from_crawler(cls, crawler):
        db_settings = crawler.settings.getdict("DATABASE")
        return cls(
            sqlite_db=db_settings['database']
        )

    def open_spider(self, spider):
        self.connection = sqlite3.connect(self.sqlite_db)
        self.cursor = self.connection.cursor()
        # Create the table if it doesn't exist
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                price REAL,
                url TEXT UNIQUE,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.connection.commit()

    def close_spider(self, spider):
        self.connection.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        try:
            self.cursor.execute('''
                INSERT OR REPLACE INTO items (title, price, url)
                VALUES (?, ?, ?)
            ''', (
                adapter.get('title'),
                adapter.get('price'),
                adapter.get('url')
            ))
            self.connection.commit()
        except sqlite3.Error as e:
            spider.logger.error(f"Error inserting item: {e}")
        return item
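After a crawl, you can verify what was stored with a few lines of Python (assuming the DATABASE setting shown later points at scraped_data.db):

import sqlite3

conn = sqlite3.connect('scraped_data.db')
# Count the rows the pipeline has written so far
print(conn.execute("SELECT COUNT(*) FROM items").fetchone()[0])
conn.close()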
Conditional Processing Pipelines
Create pipelines that process items differently based on conditions, for example dropping duplicates, filtering by price, or branching on the spider. Note that the seen_urls set below lives in memory, so deduplication only applies within a single crawl:
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ConditionalPipeline:
    def __init__(self):
        self.seen_urls = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Skip duplicate URLs
        url = adapter.get('url')
        if url in self.seen_urls:
            raise DropItem(f"Duplicate item found: {url}")
        self.seen_urls.add(url)

        # Only keep items above a certain price
        if adapter.get('price', 0) < 10.0:
            raise DropItem(f"Price too low: {adapter.get('price')}")

        # Spider-specific processing
        if spider.name == 'premium_spider':
            adapter['is_premium'] = True

        return item
Image Download Pipeline
For downloading and processing images, extend Scrapy's built-in ImagesPipeline (it requires Pillow and, like any pipeline, must be added to ITEM_PIPELINES; the image storage settings are shown in the configuration section below):
import scrapy
from scrapy.pipelines.images import ImagesPipeline
from itemadapter import ItemAdapter

class CustomImagesPipeline(ImagesPipeline):
    def get_media_requests(self, item, info):
        adapter = ItemAdapter(item)
        # Request every URL listed in the image_urls field
        for image_url in adapter.get('image_urls', []):
            yield scrapy.Request(image_url)

    def item_completed(self, results, item, info):
        adapter = ItemAdapter(item)
        # Keep only the paths of successful downloads
        image_paths = [x['path'] for ok, x in results if ok]
        adapter['image_paths'] = image_paths
        adapter['has_images'] = bool(image_paths)
        return item
Configuration and Settings
Pipeline Priority Configuration
Configure pipelines in your settings.py. The number assigned to each class is its priority: items pass through pipelines from lower to higher values, customarily in the 0-1000 range:
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,
    'myproject.pipelines.CleaningPipeline': 400,
    'myproject.pipelines.ConditionalPipeline': 500,
    'myproject.pipelines.DatabasePipeline': 600,
    'myproject.pipelines.JsonWriterPipeline': 700,
}

# Database configuration
DATABASE = {
    'database': 'scraped_data.db'
}

# Images pipeline configuration
IMAGES_STORE = 'images'
IMAGES_URLS_FIELD = 'image_urls'
IMAGES_RESULT_FIELD = 'images'
Environment-Specific Pipelines
Configure different pipelines for different environments:
# settings.py
import os

if os.getenv('ENVIRONMENT') == 'production':
    ITEM_PIPELINES = {
        'myproject.pipelines.ValidationPipeline': 300,
        'myproject.pipelines.CleaningPipeline': 400,
        'myproject.pipelines.DatabasePipeline': 500,
    }
else:
    ITEM_PIPELINES = {
        'myproject.pipelines.ValidationPipeline': 300,
        'myproject.pipelines.CleaningPipeline': 400,
        'myproject.pipelines.JsonWriterPipeline': 500,
    }
Error Handling and Logging
Implement robust error handling in your pipelines:
import logging
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class RobustPipeline:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        try:
            # Your processing logic here
            self.validate_item(adapter)
            self.transform_item(adapter)
        except ValueError as e:
            self.logger.warning(f"Validation error for item {adapter.get('url')}: {e}")
            raise DropItem(f"Invalid item: {e}")
        except Exception as e:
            self.logger.error(f"Unexpected error processing item: {e}")
            # Unexpected errors are logged and the item is passed through;
            # re-raise here if you would rather surface the failure
        return item

    def validate_item(self, adapter):
        if not adapter.get('title'):
            raise ValueError("Title is required")

    def transform_item(self, adapter):
        # Transform data here
        pass
Testing Custom Pipelines
Create unit tests for your pipelines:
# test_pipelines.py
import unittest
from scrapy.exceptions import DropItem
from myproject.pipelines import ValidationPipeline
from myproject.items import MyItem

class TestValidationPipeline(unittest.TestCase):
    def setUp(self):
        self.pipeline = ValidationPipeline()

    def test_valid_item_passes(self):
        item = MyItem(title="Test", price=10.0, url="http://example.com")
        result = self.pipeline.process_item(item, None)
        self.assertEqual(result, item)

    def test_missing_title_drops_item(self):
        item = MyItem(price=10.0, url="http://example.com")
        with self.assertRaises(DropItem):
            self.pipeline.process_item(item, None)

if __name__ == '__main__':
    unittest.main()
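The tests import MyItem, which isn't shown above. A minimal item definition consistent with the field names used throughout this guide might look like the following sketch (adjust names to your project):

# items.py -- a hypothetical definition matching the field names used above
import scrapy

class MyItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    url = scrapy.Field()
    is_premium = scrapy.Field()
    image_urls = scrapy.Field()   # input for CustomImagesPipeline
    images = scrapy.Field()       # filled in by ImagesPipeline (IMAGES_RESULT_FIELD)
    image_paths = scrapy.Field()  # set in CustomImagesPipeline.item_completed
    has_images = scrapy.Field()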
Running Your Pipeline
Test your pipeline with a simple spider:
# Run spider with custom pipelines
scrapy crawl myspider
# Run with specific settings
scrapy crawl myspider -s ITEM_PIPELINES='{"myproject.pipelines.ValidationPipeline": 300}'
# Debug pipeline processing
scrapy crawl myspider -L DEBUG
Best Practices
- Keep pipelines focused: Each pipeline should have a single responsibility
- Handle errors gracefully: Use proper exception handling and logging
- Use ItemAdapter: Always use ItemAdapter for item manipulation
- Configure priority wisely: Order pipelines from validation to storage
- Test thoroughly: Write unit tests for complex pipeline logic
- Monitor performance: Profile pipelines that process large volumes of data (one way to count kept and dropped items via the stats collector is sketched below)
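As a sketch of the monitoring point above, a pipeline can pull the stats collector (and custom settings) from the crawler and count what it keeps or drops. The PRICE_FLOOR setting and the stats keys here are illustrative assumptions, not Scrapy defaults:

# pipelines.py -- illustrative monitoring sketch
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class MonitoredPricePipeline:
    def __init__(self, price_floor, stats):
        self.price_floor = price_floor
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        # Read a custom setting and grab the crawler's stats collector
        return cls(
            price_floor=crawler.settings.getfloat('PRICE_FLOOR', 10.0),
            stats=crawler.stats,
        )

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter.get('price', 0) < self.price_floor:
            self.stats.inc_value('monitored_price/dropped')
            raise DropItem(f"Price below floor: {adapter.get('price')}")
        self.stats.inc_value('monitored_price/passed')
        return item

The counters appear alongside Scrapy's own stats in the stats dump at the end of the crawl.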
Custom Scrapy pipelines provide powerful data processing capabilities that complement other web scraping approaches. For complex JavaScript-rendered content that might require additional processing, you might also consider handling JavaScript-heavy websites with Selenium or implementing rate limiting in Scrapy for large-scale operations.
Whether you're building simple data validation or complex multi-stage processing workflows, understanding pipeline patterns will help you build more robust and maintainable scraping solutions.