How can I scrape real-time data from a website using Python?

Real-time web scraping means continuously extracting data from a website as it updates. This guide walks through several techniques for doing that in Python.

Understanding Real-time Data Sources

Before scraping, identify how the website delivers real-time data (a quick probe for distinguishing the first two cases is sketched after this list):

  1. Static HTML Updates: Server-side rendering with page refreshes
  2. AJAX/XHR Requests: JavaScript making API calls to fetch data
  3. WebSockets: Persistent connections for live data streaming
  4. Server-Sent Events (SSE): One-way server-to-client data streaming
  5. Polling: Client repeatedly requesting data at intervals
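
A quick way to tell the first two cases apart is to fetch the raw HTML with requests and check whether the value you want is already there; if it is not, the data is almost certainly loaded client-side. A minimal sketch, assuming a hypothetical URL and selector:

import requests
from bs4 import BeautifulSoup

def is_server_rendered(url: str, selector: str) -> bool:
    """Return True if the target element exists in the raw HTML (no JavaScript executed)."""
    response = requests.get(url, timeout=10)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'html.parser')
    return soup.select_one(selector) is not None

# Hypothetical values - replace with the page you are inspecting
if is_server_rendered('https://example.com/live-data', '#live-price'):
    print("Data is in the initial HTML - polling the page (Method 1) will work")
else:
    print("Data is loaded client-side - look for AJAX/WebSocket traffic in DevTools")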

Method 1: Polling Static Content

For websites that update content through page refreshes:

import requests
from bs4 import BeautifulSoup
import time
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_static_content(url, selector, interval=10):
    """Scrape real-time data from static HTML content"""
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })

    previous_data = None

    while True:
        try:
            response = session.get(url, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')
            element = soup.select_one(selector)

            if element:
                current_data = element.get_text(strip=True)
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Only log if data changed
                if current_data != previous_data:
                    logger.info(f"[{timestamp}] Data updated: {current_data}")
                    previous_data = current_data

                    # Process the new data here
                    process_data(current_data)

        except requests.RequestException as e:
            logger.error(f"Request failed: {e}")
        except Exception as e:
            logger.error(f"Parsing error: {e}")

        time.sleep(interval)

def process_data(data):
    """Process the scraped data"""
    # Add your data processing logic here
    print(f"Processing: {data}")

# Usage
scrape_static_content(
    url='https://example.com/live-data',
    selector='#live-price',
    interval=5
)

Method 2: Scraping AJAX/API Endpoints

When data is loaded via JavaScript API calls:

import requests
import json
import time
from typing import Dict, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RealTimeAPIScraper:
    def __init__(self, base_url: str, headers: Dict[str, str] = None):
        self.session = requests.Session()
        self.base_url = base_url

        # Set default headers
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        if headers:
            default_headers.update(headers)

        self.session.headers.update(default_headers)

    def fetch_data(self, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Fetch data from API endpoint"""
        try:
            url = f"{self.base_url}{endpoint}"
            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()

            return response.json()

        except requests.RequestException as e:
            logger.error(f"API request failed: {e}")
            return {}
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing failed: {e}")
            return {}

    def monitor_endpoint(self, endpoint: str, interval: int = 10, 
                        data_key: str = None):
        """Monitor API endpoint for changes"""
        previous_data = None

        while True:
            current_data = self.fetch_data(endpoint)

            if current_data:
                # Extract specific data if key provided
                relevant_data = current_data.get(data_key) if data_key else current_data

                if relevant_data != previous_data:
                    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
                    logger.info(f"[{timestamp}] New data received")

                    self.process_update(relevant_data)
                    previous_data = relevant_data

            time.sleep(interval)

    def process_update(self, data: Any):
        """Process data updates"""
        print(f"Data update: {data}")

# Usage
scraper = RealTimeAPIScraper('https://api.example.com')
scraper.monitor_endpoint('/live-prices', interval=5, data_key='price')
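
The endpoint itself is typically discovered in the browser's DevTools Network tab (filter by XHR/Fetch and watch which requests fire as the page updates). The class is meant to be extended through process_update; as a sketch (the endpoint and file name are hypothetical), you could subclass it to append every change to a JSON Lines file:

import json
import time

class PriceLogger(RealTimeAPIScraper):
    """Example subclass that appends every update to a JSON Lines file."""

    def process_update(self, data):
        record = {'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'data': data}
        with open('price_updates.jsonl', 'a') as f:
            f.write(json.dumps(record) + '\n')

# Hypothetical endpoint - substitute the API you found in DevTools
logger_scraper = PriceLogger('https://api.example.com')
logger_scraper.monitor_endpoint('/live-prices', interval=5, data_key='price')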

Method 3: WebSocket Connections

For real-time data streaming via WebSockets:

import websocket  # from the websocket-client package (pip install websocket-client)
import json
import threading
import time
from typing import Callable, Dict, Any

class WebSocketScraper:
    def __init__(self, url: str, on_data: Callable = None):
        self.url = url
        self.ws = None
        self.is_connected = False
        self.on_data_callback = on_data or self.default_data_handler
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    def on_message(self, ws, message: str):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(message)
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            print(f"[{timestamp}] Received: {data}")

            self.on_data_callback(data)

        except json.JSONDecodeError as e:
            print(f"Failed to parse message: {e}")

    def on_error(self, ws, error):
        """Handle WebSocket errors"""
        print(f"WebSocket error: {error}")
        self.is_connected = False

    def on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket connection close"""
        print(f"Connection closed: {close_status_code} - {close_msg}")
        self.is_connected = False

        # Attempt reconnection
        if self.reconnect_attempts < self.max_reconnect_attempts:
            self.reconnect_attempts += 1
            print(f"Reconnecting... (attempt {self.reconnect_attempts})")
            time.sleep(5)
            self.connect()

    def on_open(self, ws):
        """Handle WebSocket connection open"""
        print("WebSocket connection established")
        self.is_connected = True
        self.reconnect_attempts = 0

        # Send initial subscription message if needed
        subscribe_message = {
            "action": "subscribe",
            "channel": "live-data"
        }
        ws.send(json.dumps(subscribe_message))

    def default_data_handler(self, data: Dict[str, Any]):
        """Default data processing function"""
        print(f"Processing data: {data}")

    def connect(self):
        """Establish WebSocket connection"""
        self.ws = websocket.WebSocketApp(
            self.url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # Run in separate thread to avoid blocking
        ws_thread = threading.Thread(target=self.ws.run_forever)
        ws_thread.daemon = True
        ws_thread.start()

        return ws_thread

    def disconnect(self):
        """Close WebSocket connection"""
        if self.ws:
            self.ws.close()

# Custom data handler function
def handle_price_data(data):
    if 'price' in data:
        print(f"Price update: ${data['price']}")

# Usage
scraper = WebSocketScraper(
    url='wss://api.example.com/live-feed',
    on_data=handle_price_data
)

# Connect and keep running
thread = scraper.connect()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping scraper...")
    scraper.disconnect()
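
Server-Sent Events (source type 4 in the list above) need no WebSocket library: requests can hold the HTTP response open in streaming mode while you parse the lines that start with "data:". A minimal sketch, assuming a hypothetical endpoint that emits JSON payloads:

import json
import requests

def stream_sse(url: str):
    """Consume a Server-Sent Events stream using requests' streaming mode."""
    headers = {'Accept': 'text/event-stream'}

    # connect timeout of 10s, no read timeout (the stream stays open)
    with requests.get(url, headers=headers, stream=True, timeout=(10, None)) as response:
        response.raise_for_status()

        for line in response.iter_lines(decode_unicode=True):
            # SSE data lines start with "data:"; blank lines separate events
            if line and line.startswith('data:'):
                payload = line[len('data:'):].strip()
                try:
                    print(f"Event: {json.loads(payload)}")
                except json.JSONDecodeError:
                    print(f"Event (raw): {payload}")

# Hypothetical endpoint - replace with the stream you found in DevTools
stream_sse('https://example.com/live-events')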

Method 4: Advanced Selenium-based Real-time Scraping

For complex JavaScript-heavy sites:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
import time
import logging

class SeleniumRealTimeScraper:
    def __init__(self, headless: bool = True):
        self.driver = self.setup_driver(headless)
        self.wait = WebDriverWait(self.driver, 10)

    def setup_driver(self, headless: bool = True):
        """Setup Chrome WebDriver with optimal settings"""
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')

        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
        options.add_argument('--window-size=1920,1080')

        return webdriver.Chrome(options=options)

    def monitor_element(self, url: str, selector: str, interval: int = 5):
        """Monitor specific element for changes"""
        self.driver.get(url)
        previous_text = None

        while True:
            try:
                element = self.wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, selector))
                )

                current_text = element.text.strip()

                if current_text != previous_text:
                    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
                    print(f"[{timestamp}] Element changed: {current_text}")
                    previous_text = current_text

                    # Process the change
                    self.process_change(current_text)

            except TimeoutException:
                print("Element not found or page load timeout")
            except WebDriverException as e:
                print(f"WebDriver error: {e}")
                break

            time.sleep(interval)

    def process_change(self, text: str):
        """Process detected changes"""
        print(f"Processing change: {text}")

    def close(self):
        """Clean up resources"""
        if self.driver:
            self.driver.quit()

# Usage (construct before the try so cleanup in finally is always safe)
scraper = SeleniumRealTimeScraper(headless=True)
try:
    scraper.monitor_element(
        url='https://example.com/live-dashboard',
        selector='.live-counter',
        interval=3
    )
except KeyboardInterrupt:
    print("Stopping scraper...")
finally:
    scraper.close()
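
Rather than sleeping and re-reading the element, Selenium can also block until the text actually changes if you pass a custom condition (any callable that takes the driver) to WebDriverWait. A sketch of such a helper, with an arbitrary 30-second timeout:

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException

def wait_for_change(driver, selector: str, previous_text: str, timeout: int = 30) -> str:
    """Block until the element's text differs from previous_text, then return the new text."""
    def text_changed(d):
        current = d.find_element(By.CSS_SELECTOR, selector).text.strip()
        # WebDriverWait treats a falsy return value as "keep waiting"
        return current if current != previous_text else False

    try:
        return WebDriverWait(driver, timeout).until(text_changed)
    except TimeoutException:
        return previous_text  # no change within the timeout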

Best Practices and Rate Limiting

import time
import random
import requests
from functools import wraps

def rate_limit(calls_per_second: float = 1.0):
    """Decorator to limit function call rate"""
    min_interval = 1.0 / calls_per_second
    last_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = min_interval - elapsed

            if left_to_wait > 0:
                time.sleep(left_to_wait)

            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        return wrapper
    return decorator

def exponential_backoff(max_retries: int = 3, base_delay: float = 1.0):
    """Decorator for exponential backoff retry logic"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        raise e

                    delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
                    time.sleep(delay)
        return wrapper
    return decorator

# Example usage with decorators
@rate_limit(calls_per_second=0.5)  # Max 1 call every 2 seconds
@exponential_backoff(max_retries=3)
def fetch_data_safely(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
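
Because @rate_limit wraps @exponential_backoff here, each outer call waits out the rate limit once and the retries happen inside it. A minimal polling loop using the decorated function (the URL is a placeholder), remembering that the last failure is re-raised once the retries are exhausted:

# Hypothetical endpoint - substitute a real JSON API
URL = 'https://api.example.com/live-prices'

while True:
    try:
        data = fetch_data_safely(URL)
        print(f"Latest data: {data}")
    except Exception as e:
        # Raised only after all retries have failed
        print(f"Giving up on this cycle: {e}")

    time.sleep(5)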

Error Handling and Monitoring

import logging
import smtplib
import requests
from email.mime.text import MIMEText
from typing import Optional

class ScrapingMonitor:
    def __init__(self, log_file: str = 'scraping.log'):
        self.setup_logging(log_file)
        self.error_count = 0
        self.success_count = 0

    def setup_logging(self, log_file: str):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_success(self, message: str):
        """Log successful operations"""
        self.success_count += 1
        self.logger.info(f"SUCCESS: {message}")

    def log_error(self, message: str, exception: Optional[Exception] = None):
        """Log errors with optional exception details"""
        self.error_count += 1
        error_msg = f"ERROR: {message}"

        if exception:
            error_msg += f" - {str(exception)}"

        self.logger.error(error_msg)

        # Send alert if error rate is high
        if self.error_count > 5 and self.error_count > self.success_count:
            self.send_alert(f"High error rate detected: {self.error_count} errors")

    def send_alert(self, message: str):
        """Send email alert for critical issues"""
        # Implement email alerting logic here
        print(f"ALERT: {message}")

    def get_stats(self) -> dict:
        """Get scraping statistics"""
        total = self.success_count + self.error_count
        success_rate = (self.success_count / total * 100) if total > 0 else 0

        return {
            'total_requests': total,
            'successful': self.success_count,
            'errors': self.error_count,
            'success_rate': f"{success_rate:.2f}%"
        }

# Usage
monitor = ScrapingMonitor()

def monitored_scrape(url):
    try:
        # Your scraping logic here
        response = requests.get(url)
        response.raise_for_status()

        monitor.log_success(f"Successfully scraped {url}")
        return response.json()

    except Exception as e:
        monitor.log_error(f"Failed to scrape {url}", e)
        return None
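
The counters that ScrapingMonitor keeps can be surfaced periodically, for example every few polling cycles (the URL is a placeholder):

# Hypothetical polling loop that prints statistics every 10 cycles
import time

for cycle in range(1, 101):
    monitored_scrape('https://api.example.com/live-prices')

    if cycle % 10 == 0:
        print(monitor.get_stats())

    time.sleep(5)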

Key Considerations

  1. Respect Rate Limits: Implement delays between requests to avoid overwhelming servers
  2. Handle Failures Gracefully: Use try/except blocks and retry logic with exponential backoff
  3. Monitor Performance: Track success rates, response times, and error patterns
  4. Legal Compliance: Check robots.txt, terms of service, and applicable laws
  5. Resource Management: Properly close connections and clean up resources
  6. Data Validation: Verify data integrity and handle unexpected formats (a sketch follows this list)
  7. Scalability: Design for handling large volumes of real-time data
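
For point 6, validation can be as simple as rejecting records that lack required fields or contain values that will not parse. A minimal sketch, assuming a hypothetical price-record schema:

def validate_price_record(record: dict) -> bool:
    """Return True if the scraped record looks usable (hypothetical schema)."""
    required_fields = ('symbol', 'price', 'timestamp')

    if not all(field in record for field in required_fields):
        return False

    try:
        price = float(record['price'])
    except (TypeError, ValueError):
        return False

    # Reject obviously bogus values
    return price > 0

# Examples
print(validate_price_record({'symbol': 'BTC', 'price': '42000.5', 'timestamp': '2024-01-01 00:00:00'}))  # True
print(validate_price_record({'symbol': 'BTC', 'price': 'n/a'}))  # False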

Combining these techniques helps you build robust, efficient, and ethical real-time scrapers in Python.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
