How do I create custom pipelines in Scrapy?

Scrapy item pipelines process the data your spiders extract. They let you clean, validate, filter, and store scraped items in a structured workflow. This guide walks through creating custom pipelines in Scrapy, from basic validation and cleaning to database storage, image handling, and testing.

Understanding Scrapy Pipelines

Pipelines in Scrapy are Python classes that process items yielded by spiders. Each pipeline receives an item, performs some action on it, and either returns the item for further processing or drops it entirely. Items flow through the enabled pipelines sequentially, in ascending order of the priority numbers configured in ITEM_PIPELINES (lower values run first).
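
For context, pipelines only ever see what a spider yields. A minimal spider that produces the kind of items used throughout this guide might look like this (the spider name, URL, and selectors are illustrative):

# spiders/products.py
import scrapy

class ProductsSpider(scrapy.Spider):
    name = 'products'
    start_urls = ['https://example.com/products']  # illustrative URL

    def parse(self, response):
        for product in response.css('.product'):  # illustrative selectors
            # Each yielded dict (or Item) is handed to the enabled
            # pipelines in priority order.
            yield {
                'title': product.css('h2::text').get(),
                'price': product.css('.price::text').get(),
                'url': response.urljoin(product.css('a::attr(href)').get()),
            }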

Basic Pipeline Structure

Every Scrapy pipeline must implement the process_item method:

class BasicPipeline:
    def process_item(self, item, spider):
        # Process the item here, then either return it so the next pipeline
        # receives it, or raise scrapy.exceptions.DropItem to discard it
        return item

Creating Your First Custom Pipeline

Let's start with a simple validation pipeline that ensures required fields are present:

# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ValidationPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Check if required fields are present
        required_fields = ['title', 'price', 'url']
        for field in required_fields:
            if not adapter.get(field):
                raise DropItem(f"Missing {field} in {item}")

        return item
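
The pipelines in this guide assume items with title, price, and url fields, plus a few optional fields used later. If you prefer declared items over plain dicts, a matching definition might look like this (field names are illustrative and chosen to match the later examples):

# items.py
import scrapy

class MyItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    url = scrapy.Field()
    is_premium = scrapy.Field()   # set by the conditional pipeline below
    image_urls = scrapy.Field()   # input for the images pipeline below
    images = scrapy.Field()       # default result field of the images pipeline
    image_paths = scrapy.Field()  # custom result field set in item_completed
    has_images = scrapy.Field()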

Data Cleaning Pipeline

Here's a pipeline that cleans and normalizes data:

import re
from itemadapter import ItemAdapter

class CleaningPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Clean title field
        if adapter.get('title'):
            # Remove extra whitespace and newlines
            adapter['title'] = re.sub(r'\s+', ' ', adapter['title']).strip()

        # Clean price field
        if adapter.get('price'):
            # Extract numeric price from string
            price_match = re.search(r'[\d,]+\.?\d*', adapter['price'])
            if price_match:
                adapter['price'] = float(price_match.group().replace(',', ''))
            else:
                adapter['price'] = 0.0

        # Normalize URL
        if adapter.get('url'):
            adapter['url'] = adapter['url'].strip().lower()

        return item
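
To see the effect in isolation, note that ItemAdapter also works on plain dicts, so the pipeline can be exercised directly with a sample raw item (values are illustrative):

pipeline = CleaningPipeline()
raw = {'title': '  Blue   Widget \n', 'price': '$1,299.99', 'url': 'HTTP://Example.com/p/1 '}
cleaned = pipeline.process_item(raw, spider=None)
# cleaned == {'title': 'Blue Widget', 'price': 1299.99, 'url': 'http://example.com/p/1'}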

Advanced Pipeline Features

Pipeline with Open and Close Methods

Pipelines can implement additional methods for setup and cleanup:

import json
from itemadapter import ItemAdapter

class JsonWriterPipeline:
    def open_spider(self, spider):
        self.file = open('items.json', 'w')
        self.file.write('[\n')
        self.first_item = True

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        if not self.first_item:
            self.file.write(',\n')
        else:
            self.first_item = False

        line = json.dumps(dict(adapter), indent=2)
        self.file.write(line)
        return item
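
If you don't need a single JSON array, a simpler variation writes one object per line (JSON Lines), which avoids the comma and bracket bookkeeping entirely. This is a minimal sketch that assumes a hard-coded items.jl filename:

import json
from itemadapter import ItemAdapter

class JsonLinesWriterPipeline:
    def open_spider(self, spider):
        self.file = open('items.jl', 'w', encoding='utf-8')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        # One JSON object per line; no surrounding brackets or commas needed
        self.file.write(json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False) + '\n')
        return item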

Database Storage Pipeline

Here's a pipeline that stores items in a database:

import sqlite3
from itemadapter import ItemAdapter

class DatabasePipeline:
    def __init__(self, sqlite_db):
        self.sqlite_db = sqlite_db

    @classmethod
    def from_crawler(cls, crawler):
        db_settings = crawler.settings.getdict("DATABASE")
        return cls(
            sqlite_db=db_settings['database']
        )

    def open_spider(self, spider):
        self.connection = sqlite3.connect(self.sqlite_db)
        self.cursor = self.connection.cursor()

        # Create table if it doesn't exist
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                price REAL,
                url TEXT UNIQUE,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.connection.commit()

    def close_spider(self, spider):
        self.connection.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        try:
            self.cursor.execute('''
                INSERT OR REPLACE INTO items (title, price, url)
                VALUES (?, ?, ?)
            ''', (
                adapter.get('title'),
                adapter.get('price'),
                adapter.get('url')
            ))
            self.connection.commit()
        except sqlite3.Error as e:
            spider.logger.error(f"Error inserting item: {e}")

        return item
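
After a crawl you can verify the stored rows directly. This quick check assumes the scraped_data.db filename configured in the DATABASE setting shown later in this guide:

import sqlite3

conn = sqlite3.connect('scraped_data.db')
for row in conn.execute('SELECT title, price, url FROM items ORDER BY scraped_at DESC LIMIT 5'):
    print(row)
conn.close()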

Conditional Processing Pipelines

Create pipelines that process items differently based on conditions:

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ConditionalPipeline:
    def __init__(self):
        self.seen_urls = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Drop duplicate URLs
        url = adapter.get('url')
        if url in self.seen_urls:
            raise DropItem(f"Duplicate item found: {url}")
        self.seen_urls.add(url)

        # Drop items below a minimum price
        if adapter.get('price', 0) < 10.0:
            raise DropItem(f"Price too low: {adapter.get('price')}")

        # Spider-specific processing
        if spider.name == 'premium_spider':
            adapter['is_premium'] = True

        return item

Image Download Pipeline

To download and process images, extend Scrapy's built-in ImagesPipeline:

import scrapy
from scrapy.pipelines.images import ImagesPipeline
from itemadapter import ItemAdapter

class CustomImagesPipeline(ImagesPipeline):
    def get_media_requests(self, item, info):
        adapter = ItemAdapter(item)

        # Download images from image_urls field
        for image_url in adapter.get('image_urls', []):
            yield scrapy.Request(image_url)

    def item_completed(self, results, item, info):
        adapter = ItemAdapter(item)

        # Keep only the paths of successfully downloaded images
        image_paths = [x['path'] for ok, x in results if ok]
        adapter['image_paths'] = image_paths
        adapter['has_images'] = bool(image_paths)

        return item
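
Note that Scrapy's images pipeline requires the Pillow library, and this subclass only downloads anything when the spider populates the image_urls field. Building on the earlier spider sketch, a yield like this inside parse() would do (selectors are illustrative):

# inside the spider's parse() callback
yield {
    'title': product.css('h2::text').get(),
    'price': product.css('.price::text').get(),
    'url': response.urljoin(product.css('a::attr(href)').get()),
    'image_urls': [
        response.urljoin(src)
        for src in product.css('img::attr(src)').getall()
    ],
}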

Configuration and Settings

Pipeline Priority Configuration

Configure pipelines in your settings.py:

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,
    'myproject.pipelines.CleaningPipeline': 400,
    'myproject.pipelines.ConditionalPipeline': 500,
    'myproject.pipelines.DatabasePipeline': 600,
    'myproject.pipelines.JsonWriterPipeline': 700,
}

# Database configuration
DATABASE = {
    'database': 'scraped_data.db'
}

# Images pipeline configuration
IMAGES_STORE = 'images'
IMAGES_URLS_FIELD = 'image_urls'
IMAGES_RESULT_FIELD = 'images'

Environment-Specific Pipelines

Configure different pipelines for different environments:

# settings.py
import os

if os.getenv('ENVIRONMENT') == 'production':
    ITEM_PIPELINES = {
        'myproject.pipelines.ValidationPipeline': 300,
        'myproject.pipelines.CleaningPipeline': 400,
        'myproject.pipelines.DatabasePipeline': 500,
    }
else:
    ITEM_PIPELINES = {
        'myproject.pipelines.ValidationPipeline': 300,
        'myproject.pipelines.CleaningPipeline': 400,
        'myproject.pipelines.JsonWriterPipeline': 500,
    }

Error Handling and Logging

Implement robust error handling in your pipelines:

import logging
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class RobustPipeline:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        try:
            # Your processing logic here
            self.validate_item(adapter)
            self.transform_item(adapter)

        except ValueError as e:
            self.logger.warning(f"Validation error for item {adapter.get('url')}: {e}")
            raise DropItem(f"Invalid item: {e}")

        except Exception as e:
            self.logger.error(f"Unexpected error processing item: {e}")
            # Optionally re-raise or handle gracefully
            return item

        return item

    def validate_item(self, adapter):
        if not adapter.get('title'):
            raise ValueError("Title is required")

    def transform_item(self, adapter):
        # Transform data here
        pass

Testing Custom Pipelines

Create unit tests for your pipelines:

# test_pipelines.py
import unittest
from scrapy.exceptions import DropItem
from myproject.pipelines import ValidationPipeline
from myproject.items import MyItem

class TestValidationPipeline(unittest.TestCase):
    def setUp(self):
        self.pipeline = ValidationPipeline()

    def test_valid_item_passes(self):
        item = MyItem(title="Test", price=10.0, url="http://example.com")
        result = self.pipeline.process_item(item, None)
        self.assertEqual(result, item)

    def test_missing_title_drops_item(self):
        item = MyItem(price=10.0, url="http://example.com")
        with self.assertRaises(DropItem):
            self.pipeline.process_item(item, None)

if __name__ == '__main__':
    unittest.main()

Running Your Pipeline

Test your pipeline with a simple spider:

# Run spider with custom pipelines
scrapy crawl myspider

# Run with specific settings
scrapy crawl myspider -s ITEM_PIPELINES='{"myproject.pipelines.ValidationPipeline": 300}'

# Debug pipeline processing
scrapy crawl myspider -L DEBUG

Best Practices

  1. Keep pipelines focused: Each pipeline should have a single responsibility
  2. Handle errors gracefully: Use proper exception handling and logging
  3. Use ItemAdapter: Always manipulate items through ItemAdapter so the same pipeline works with dicts, Item objects, dataclass and attrs items alike
  4. Configure priority wisely: Order pipelines from validation to storage
  5. Test thoroughly: Write unit tests for complex pipeline logic
  6. Monitor performance: Profile pipelines that process large volumes of data (a minimal timing sketch follows this list)
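
For the last point, a lightweight way to spot slow pipelines is to time process_item and report an average when the spider closes. This is a minimal sketch; the class name and log format are illustrative:

import time

class TimedPipeline:
    def open_spider(self, spider):
        self.total_time = 0.0
        self.item_count = 0

    def process_item(self, item, spider):
        start = time.perf_counter()
        # ... actual processing logic goes here ...
        self.total_time += time.perf_counter() - start
        self.item_count += 1
        return item

    def close_spider(self, spider):
        if self.item_count:
            spider.logger.info(
                "TimedPipeline: %d items, %.2f ms average per item",
                self.item_count,
                1000 * self.total_time / self.item_count,
            )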

Custom Scrapy pipelines provide powerful data processing capabilities that complement other web scraping approaches. For complex JavaScript-rendered content that might require additional processing, you might also consider handling JavaScript-heavy websites with Selenium or implementing rate limiting in Scrapy for large-scale operations.

Whether you're building simple data validation or complex multi-stage processing workflows, understanding pipeline patterns will help you build more robust and maintainable scraping solutions.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
