What are the best methods for storing scraped data in databases using Python?

Storing scraped data efficiently in databases is crucial for building scalable web scraping applications. Python offers numerous libraries and approaches for database integration, each with specific advantages depending on your use case. This comprehensive guide covers the most effective methods for storing scraped data in various database systems.

Popular Database Options for Python Web Scraping

1. SQLite - Lightweight and File-Based

SQLite is perfect for small to medium-scale scraping projects due to its simplicity and zero-configuration setup.

import sqlite3
import requests
from bs4 import BeautifulSoup

# Create database connection
conn = sqlite3.connect('scraped_data.db')
cursor = conn.cursor()

# Create table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS products (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        price REAL,
        description TEXT,
        url TEXT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
''')

# Function to store scraped data
def store_product_data(name, price, description, url):
    cursor.execute('''
        INSERT INTO products (name, price, description, url)
        VALUES (?, ?, ?, ?)
    ''', (name, price, description, url))
    conn.commit()

# Example scraping and storage
response = requests.get('https://example-ecommerce.com/products')
soup = BeautifulSoup(response.content, 'html.parser')

for product in soup.find_all('div', class_='product-item'):
    name = product.find('h3').text.strip()
    price = float(product.find('span', class_='price').text.replace('$', ''))
    description = product.find('p', class_='description').text.strip()
    url = product.find('a')['href']

    store_product_data(name, price, description, url)

conn.close()

2. PostgreSQL with psycopg2

PostgreSQL is ideal for production applications requiring advanced features, ACID compliance, and concurrent access.

import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
import requests
from bs4 import BeautifulSoup
from datetime import datetime

# Database connection
conn = psycopg2.connect(
    host="localhost",
    database="scraping_db",
    user="your_username",
    password="your_password"
)

cursor = conn.cursor(cursor_factory=RealDictCursor)

# Create table with proper indexing
cursor.execute('''
    CREATE TABLE IF NOT EXISTS articles (
        id SERIAL PRIMARY KEY,
        title VARCHAR(500) NOT NULL,
        content TEXT,
        author VARCHAR(200),
        published_date DATE,
        url VARCHAR(1000) UNIQUE,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        tags TEXT[]
    );

    CREATE INDEX IF NOT EXISTS idx_articles_url ON articles(url);
    CREATE INDEX IF NOT EXISTS idx_articles_published_date ON articles(published_date);
''')
conn.commit()

# Batch insert function for better performance
def batch_insert_articles(articles_data):
    insert_query = '''
        INSERT INTO articles (title, content, author, published_date, url, tags)
        VALUES %s
        ON CONFLICT (url) DO UPDATE SET
            title = EXCLUDED.title,
            content = EXCLUDED.content,
            scraped_at = CURRENT_TIMESTAMP
    '''

    execute_values(cursor, insert_query, articles_data, template=None, page_size=100)
    conn.commit()

# Example usage
articles_batch = []
for page in range(1, 6):  # Scrape 5 pages
    response = requests.get(f'https://example-news.com/page/{page}')
    soup = BeautifulSoup(response.content, 'html.parser')

    for article in soup.find_all('article'):
        title = article.find('h2').text.strip()
        content = article.find('div', class_='content').text.strip()
        author = article.find('span', class_='author').text.strip()
        url = article.find('a')['href']

        articles_batch.append((title, content, author, datetime.now().date(), url, []))

    # Batch insert every 100 records
    if len(articles_batch) >= 100:
        batch_insert_articles(articles_batch)
        articles_batch = []

# Insert remaining records
if articles_batch:
    batch_insert_articles(articles_batch)

conn.close()

3. MongoDB with PyMongo

MongoDB excels at storing unstructured or semi-structured scraped data with flexible schemas.

from pymongo import MongoClient
import requests
from bs4 import BeautifulSoup
from datetime import datetime

# MongoDB connection
client = MongoClient('mongodb://localhost:27017/')
db = client['scraping_database']
collection = db['products']

# Create indexes for better query performance
collection.create_index([("url", 1)], unique=True)
collection.create_index([("category", 1), ("price", 1)])
collection.create_index([("scraped_at", -1)])

def store_product_mongodb(product_data):
    """Store product data with upsert to handle duplicates"""
    try:
        result = collection.update_one(
            {"url": product_data["url"]},
            {
                "$set": {
                    **product_data,
                    "last_updated": datetime.now()
                },
                "$setOnInsert": {
                    "created_at": datetime.now()
                }
            },
            upsert=True
        )
        return result.upserted_id or result.matched_count
    except Exception as e:
        print(f"Error storing product: {e}")
        return None

# Example scraping with flexible data structure
def scrape_ecommerce_site():
    response = requests.get('https://example-shop.com/products')
    soup = BeautifulSoup(response.content, 'html.parser')

    for product in soup.find_all('div', class_='product-card'):
        product_data = {
            "name": product.find('h3').text.strip(),
            "price": float(product.find('span', class_='price').text.replace('$', '')),
            "url": product.find('a')['href'],
            "scraped_at": datetime.now(),
            "category": product.get('data-category', 'unknown'),
            "availability": product.find('span', class_='stock').text.strip(),
            "images": [img['src'] for img in product.find_all('img')],
            "specifications": {}
        }

        # Extract specifications if available
        specs_section = product.find('div', class_='specifications')
        if specs_section:
            for spec in specs_section.find_all('li'):
                if ':' not in spec.text:
                    continue  # skip malformed spec entries
                key, value = spec.text.split(':', 1)
                product_data["specifications"][key.strip()] = value.strip()

        store_product_mongodb(product_data)

scrape_ecommerce_site()
client.close()
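
If you are upserting many documents, calling update_one once per product adds a network round trip for each item. As a rough sketch (reusing the connection details and the unique url index from the example above), PyMongo's bulk_write with UpdateOne operations applies the same upsert logic in a single batch:

from pymongo import MongoClient, UpdateOne
from datetime import datetime

client = MongoClient('mongodb://localhost:27017/')
collection = client['scraping_database']['products']

def bulk_upsert_products(products):
    """Upsert a batch of scraped product dicts in one round trip."""
    operations = [
        UpdateOne(
            {"url": item["url"]},  # match on the unique url index
            {
                "$set": {**item, "last_updated": datetime.now()},
                "$setOnInsert": {"created_at": datetime.now()}
            },
            upsert=True
        )
        for item in products
    ]
    if not operations:
        return 0, 0
    # ordered=False lets the remaining operations proceed if one fails
    result = collection.bulk_write(operations, ordered=False)
    return result.upserted_count, result.modified_count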

Advanced Database Patterns for Web Scraping

1. Using SQLAlchemy ORM for Complex Relationships

SQLAlchemy provides a powerful ORM that simplifies complex database operations and relationships.

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime
import requests
from bs4 import BeautifulSoup

Base = declarative_base()

class Website(Base):
    __tablename__ = 'websites'

    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    base_url = Column(String(500), nullable=False)
    created_at = Column(DateTime, default=datetime.now)

    # Relationship to products
    products = relationship("Product", back_populates="website")

class Product(Base):
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String(500), nullable=False)
    price = Column(Float)
    description = Column(Text)
    url = Column(String(1000), unique=True)
    website_id = Column(Integer, ForeignKey('websites.id'))
    scraped_at = Column(DateTime, default=datetime.now)

    # Relationship to website
    website = relationship("Website", back_populates="products")

# Database setup
engine = create_engine('postgresql://user:password@localhost/scraping_db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def scrape_and_store_with_orm():
    session = Session()

    # Get or create website
    website = session.query(Website).filter_by(name="Example Store").first()
    if not website:
        website = Website(name="Example Store", base_url="https://example-store.com")
        session.add(website)
        session.commit()

    response = requests.get(f"{website.base_url}/products")
    soup = BeautifulSoup(response.content, 'html.parser')

    products_to_add = []
    for product_elem in soup.find_all('div', class_='product'):
        # Check if product already exists
        product_url = product_elem.find('a')['href']
        existing_product = session.query(Product).filter_by(url=product_url).first()

        if not existing_product:
            product = Product(
                name=product_elem.find('h3').text.strip(),
                price=float(product_elem.find('span', class_='price').text.replace('$', '')),
                description=product_elem.find('p').text.strip(),
                url=product_url,
                website_id=website.id
            )
            products_to_add.append(product)

    # Bulk insert for better performance
    session.bulk_save_objects(products_to_add)
    session.commit()
    session.close()

scrape_and_store_with_orm()
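
The existence check above issues one SELECT per product, which becomes slow on large pages. On PostgreSQL, SQLAlchemy's dialect-specific insert() construct can express the same insert-or-update logic as a single statement. A minimal sketch, assuming the Product model and its unique url column defined above (the helper name and row format are illustrative):

from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_products(session, rows):
    """Insert-or-update a list of product dicts keyed on the unique url column."""
    if not rows:
        return
    stmt = pg_insert(Product).values(rows)
    stmt = stmt.on_conflict_do_update(
        index_elements=["url"],  # relies on the unique constraint on Product.url
        set_={
            "name": stmt.excluded.name,
            "price": stmt.excluded.price,
            "description": stmt.excluded.description,
            "scraped_at": datetime.now()
        }
    )
    session.execute(stmt)
    session.commit()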

2. Implementing Data Pipelines with Error Handling

When scraping data from websites that require complex navigation, robust error handling and data validation become essential.

import logging
from sqlalchemy.exc import IntegrityError
from typing import Dict, List, Optional
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ScrapingDataPipeline:
    def __init__(self, db_session):
        self.session = db_session
        self.failed_items = []
        self.processed_count = 0

    def validate_data(self, data: Dict) -> bool:
        """Validate scraped data before storage"""
        required_fields = ['name', 'price', 'url']

        for field in required_fields:
            if field not in data or not data[field]:
                logger.warning(f"Missing required field: {field}")
                return False

        # Validate price is numeric
        try:
            float(data['price'])
        except (ValueError, TypeError):
            logger.warning(f"Invalid price format: {data.get('price')}")
            return False

        return True

    def clean_data(self, data: Dict) -> Dict:
        """Clean and normalize scraped data"""
        cleaned_data = {}

        # Clean string fields
        for key, value in data.items():
            if isinstance(value, str):
                cleaned_data[key] = value.strip().replace('\n', ' ').replace('\t', ' ')
            else:
                cleaned_data[key] = value

        # Normalize price
        if 'price' in cleaned_data:
            price_str = str(cleaned_data['price']).replace('$', '').replace(',', '')
            try:
                cleaned_data['price'] = float(price_str)
            except ValueError:
                cleaned_data['price'] = 0.0

        return cleaned_data

    def store_item(self, item_data: Dict) -> bool:
        """Store single item with error handling"""
        try:
            # Validate and clean data
            if not self.validate_data(item_data):
                self.failed_items.append({'data': item_data, 'reason': 'validation_failed'})
                return False

            cleaned_data = self.clean_data(item_data)

            # Create product instance
            product = Product(**cleaned_data)
            self.session.add(product)
            self.session.commit()

            self.processed_count += 1
            logger.info(f"Successfully stored item: {cleaned_data.get('name', 'Unknown')}")
            return True

        except IntegrityError as e:
            self.session.rollback()
            logger.warning(f"Duplicate item detected: {item_data.get('url', 'Unknown URL')}")
            self.failed_items.append({'data': item_data, 'reason': 'duplicate'})
            return False

        except Exception as e:
            self.session.rollback()
            logger.error(f"Error storing item: {str(e)}")
            self.failed_items.append({'data': item_data, 'reason': str(e)})
            return False

    def bulk_store_items(self, items: List[Dict], batch_size: int = 100) -> Dict:
        """Store items in batches for better performance"""
        results = {'success': 0, 'failed': 0}

        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]

            for item in batch:
                if self.store_item(item):
                    results['success'] += 1
                else:
                    results['failed'] += 1

            # Add delay between batches to be respectful
            time.sleep(1)

        return results

# Usage example
def scrape_with_pipeline():
    session = Session()
    pipeline = ScrapingDataPipeline(session)

    # Scrape data
    scraped_items = []
    response = requests.get('https://example-store.com/products')
    soup = BeautifulSoup(response.content, 'html.parser')

    for product in soup.find_all('div', class_='product'):
        item_data = {
            'name': product.find('h3').text if product.find('h3') else '',
            'price': product.find('span', class_='price').text if product.find('span', class_='price') else '0',
            'url': product.find('a')['href'] if product.find('a') else '',
            'description': product.find('p').text if product.find('p') else ''
        }
        scraped_items.append(item_data)

    # Store data using pipeline
    results = pipeline.bulk_store_items(scraped_items)

    logger.info(f"Processing complete. Success: {results['success']}, Failed: {results['failed']}")

    session.close()

Performance Optimization Strategies

1. Connection Pooling and Batch Operations

from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from contextlib import contextmanager

# Create engine with connection pooling
engine = create_engine(
    'postgresql://user:password@localhost/scraping_db',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    pool_recycle=3600
)

# Bind sessions to the pooled engine created above
Session = sessionmaker(bind=engine)

@contextmanager
def get_db_session():
    """Context manager for database sessions"""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def batch_upsert_products(products_data: List[Dict]):
    """Efficient batch upsert using raw SQL"""
    with get_db_session() as session:
        # Use PostgreSQL's ON CONFLICT for efficient upserts
        query = '''
            INSERT INTO products (name, price, description, url, scraped_at)
            VALUES %s
            ON CONFLICT (url) DO UPDATE SET
                name = EXCLUDED.name,
                price = EXCLUDED.price,
                description = EXCLUDED.description,
                scraped_at = EXCLUDED.scraped_at
        '''

        from psycopg2.extras import execute_values
        execute_values(
            session.connection().connection.cursor(),
            query,
            [(p['name'], p['price'], p['description'], p['url'], datetime.now()) 
             for p in products_data],
            template=None,
            page_size=1000
        )

2. Asynchronous Database Operations

For high-volume scraping operations, asynchronous database operations can significantly improve performance.

import asyncio
import asyncpg
from aiohttp import ClientSession

async def create_connection_pool():
    """Create async database connection pool"""
    return await asyncpg.create_pool(
        "postgresql://user:password@localhost/scraping_db",
        min_size=5,
        max_size=20
    )

async def store_product_async(pool, product_data):
    """Store product data asynchronously"""
    async with pool.acquire() as connection:
        try:
            await connection.execute('''
                INSERT INTO products (name, price, description, url, scraped_at)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (url) DO UPDATE SET
                    name = EXCLUDED.name,
                    price = EXCLUDED.price,
                    scraped_at = EXCLUDED.scraped_at
            ''', product_data['name'], product_data['price'], 
                 product_data['description'], product_data['url'], datetime.now())
        except Exception as e:
            print(f"Error storing product: {e}")

async def scrape_and_store_async():
    """Asynchronous scraping and storage"""
    pool = await create_connection_pool()

    async with ClientSession() as session:
        # Scrape multiple pages concurrently
        tasks = []
        for page in range(1, 11):
            task = scrape_page_async(session, pool, page)
            tasks.append(task)

        await asyncio.gather(*tasks)

    await pool.close()

async def scrape_page_async(session, pool, page_num):
    """Scrape single page asynchronously"""
    async with session.get(f'https://example-store.com/page/{page_num}') as response:
        content = await response.text()
        soup = BeautifulSoup(content, 'html.parser')

        # Extract products and store concurrently
        storage_tasks = []
        for product in soup.find_all('div', class_='product'):
            product_data = {
                'name': product.find('h3').text.strip(),
                'price': float(product.find('span', class_='price').text.replace('$', '')),
                'description': product.find('p').text.strip(),
                'url': product.find('a')['href']
            }

            task = store_product_async(pool, product_data)
            storage_tasks.append(task)

        await asyncio.gather(*storage_tasks)

# Run the async scraper
# asyncio.run(scrape_and_store_async())

Best Practices and Recommendations

1. Choose the Right Database

  • SQLite: Perfect for prototyping, small-scale projects, or when you need portability
  • PostgreSQL: Best for production applications requiring ACID compliance, complex queries, and scalability
  • MongoDB: Ideal for unstructured data, rapid prototyping, and when schema flexibility is important

2. Data Quality and Validation

Always implement data validation and cleaning before storage. When implementing retry logic for failed requests in Python, ensure your database operations are also resilient to failures.
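As an illustration, a simple retry wrapper around a database write might look like the sketch below (using SQLAlchemy's OperationalError for transient connection failures; the attempt count and backoff values are arbitrary):

import time
import logging
from sqlalchemy.exc import OperationalError

logger = logging.getLogger(__name__)

def store_with_retry(session, item, max_attempts=3, delay=2.0):
    """Try to persist one ORM object, retrying on transient connection errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            session.add(item)
            session.commit()
            return True
        except OperationalError as exc:  # e.g. dropped connection, timeout
            session.rollback()
            logger.warning("Attempt %d/%d failed: %s", attempt, max_attempts, exc)
            time.sleep(delay * attempt)  # simple linear backoff
    return False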

3. Performance Considerations

  • Use batch operations for inserting large amounts of data (see the batching sketch after this list)
  • Implement proper indexing strategies
  • Consider connection pooling for high-volume applications
  • Use async operations when scraping at scale
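
To illustrate the batching point with the SQLite example from section 1, executemany inserts a whole batch of rows inside a single transaction instead of committing after every row (a minimal sketch; the table layout matches the products table created earlier):

import sqlite3

def batch_insert_sqlite(db_path, rows):
    """Insert many (name, price, description, url) tuples in a single transaction."""
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(
            'INSERT INTO products (name, price, description, url) VALUES (?, ?, ?, ?)',
            rows
        )
        conn.commit()  # one commit for the whole batch
    finally:
        conn.close()

# Example usage with a small batch of scraped tuples
# batch_insert_sqlite('scraped_data.db', [('Widget', 9.99, 'A widget', 'https://example.com/widget')])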

4. Monitoring and Maintenance

import psutil
from sqlalchemy import text
from datetime import datetime

class DatabaseMonitor:
    def __init__(self, session):
        self.session = session

    def log_performance_metrics(self):
        """Log database and system performance metrics"""
        # Database metrics
        result = self.session.execute(text("SELECT COUNT(*) FROM products")).scalar()

        # System metrics
        cpu_percent = psutil.cpu_percent()
        memory_percent = psutil.virtual_memory().percent

        metrics = {
            'timestamp': datetime.now(),
            'total_products': result,
            'cpu_usage': cpu_percent,
            'memory_usage': memory_percent
        }

        print(f"Performance Metrics: {metrics}")
        return metrics

Database Schema Design Tips

When designing your database schema for scraped data, consider these best practices:

-- Example optimized schema for e-commerce scraping
CREATE TABLE websites (
    id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    base_url VARCHAR(500) NOT NULL,
    last_scraped TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE categories (
    id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    website_id INTEGER REFERENCES websites(id)
);

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(500) NOT NULL,
    price DECIMAL(10,2),
    original_price DECIMAL(10,2),
    description TEXT,
    url VARCHAR(1000) UNIQUE,
    image_urls TEXT[],
    in_stock BOOLEAN DEFAULT true,
    website_id INTEGER REFERENCES websites(id),
    category_id INTEGER REFERENCES categories(id),
    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create indexes for better query performance
CREATE INDEX idx_products_website_category ON products(website_id, category_id);
CREATE INDEX idx_products_price ON products(price) WHERE price IS NOT NULL;
CREATE INDEX idx_products_scraped_at ON products(scraped_at);
CREATE INDEX idx_products_url ON products(url);

-- Create trigger for updating timestamp
CREATE OR REPLACE FUNCTION update_modified_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_products_modtime 
    BEFORE UPDATE ON products 
    FOR EACH ROW 
    EXECUTE FUNCTION update_modified_column();

Storing scraped data effectively in databases is fundamental to building robust web scraping applications. By choosing the appropriate database system, implementing proper error handling, and following performance best practices, you can create scalable solutions that handle large volumes of scraped data efficiently. Remember to always validate your data, implement proper indexing, and monitor your system's performance to ensure optimal operation.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
