Real-time web scraping means continuously extracting data from websites as it changes. This guide covers several Python techniques for doing so, from simple polling to WebSocket streaming and browser automation.
Understanding Real-time Data Sources
Before scraping, identify how the website delivers real-time data:
- Static HTML Updates: Server-side rendering with page refreshes
- AJAX/XHR Requests: JavaScript making API calls to fetch data
- WebSockets: Persistent connections for live data streaming
- Server-Sent Events (SSE): One-way server-to-client data streaming (a minimal client sketch follows this list)
- Polling: Client repeatedly requesting data at intervals
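Of these, SSE is not covered by the methods below, but a stream can be consumed with plain requests by keeping the response open and parsing the `data:` lines. This is a minimal sketch assuming a hypothetical endpoint that pushes JSON payloads; a dedicated client such as sseclient-py is more robust in practice.

import json
import requests

def stream_sse(url):
    """Yield parsed events from a Server-Sent Events endpoint."""
    headers = {'Accept': 'text/event-stream'}
    # stream=True keeps the connection open; the read timeout is disabled
    # because the server may stay silent between events
    with requests.get(url, headers=headers, stream=True, timeout=(10, None)) as response:
        response.raise_for_status()
        for raw_line in response.iter_lines():
            if not raw_line:
                continue  # blank lines separate events
            line = raw_line.decode('utf-8')
            # Event payloads are prefixed with "data:"
            if line.startswith('data:'):
                payload = line[len('data:'):].strip()
                try:
                    yield json.loads(payload)
                except json.JSONDecodeError:
                    yield payload  # fall back to the raw string

# Hypothetical usage:
# for event in stream_sse('https://example.com/stream'):
#     print(event)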
Method 1: Polling Static Content
For websites that update content through page refreshes:
import requests
from bs4 import BeautifulSoup
import time
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_static_content(url, selector, interval=10):
"""Scrape real-time data from static HTML content"""
session = requests.Session()
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
previous_data = None
while True:
try:
response = session.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
element = soup.select_one(selector)
if element:
current_data = element.get_text(strip=True)
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Only log if data changed
if current_data != previous_data:
logger.info(f"[{timestamp}] Data updated: {current_data}")
previous_data = current_data
# Process the new data here
process_data(current_data)
except requests.RequestException as e:
logger.error(f"Request failed: {e}")
except Exception as e:
logger.error(f"Parsing error: {e}")
time.sleep(interval)
def process_data(data):
"""Process the scraped data"""
# Add your data processing logic here
print(f"Processing: {data}")
# Usage
scrape_static_content(
url='https://example.com/live-data',
selector='#live-price',
interval=5
)
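If the server supports HTTP caching headers, a conditional request avoids re-downloading and re-parsing pages that have not changed: resend the previous ETag and skip work when the server replies 304 Not Modified. A rough sketch reusing the imports and process_data() function above; whether the target actually honors ETags is an assumption you should verify.

def poll_with_conditional_get(url, selector, interval=10):
    """Poll a page, but only re-parse it when the server reports a change."""
    session = requests.Session()
    etag = None
    while True:
        headers = {'If-None-Match': etag} if etag else {}
        response = session.get(url, headers=headers, timeout=10)
        if response.status_code == 304:
            # Unchanged since the last fetch; nothing to parse
            pass
        else:
            response.raise_for_status()
            etag = response.headers.get('ETag')
            soup = BeautifulSoup(response.text, 'html.parser')
            element = soup.select_one(selector)
            if element:
                process_data(element.get_text(strip=True))
        time.sleep(interval)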
Method 2: Scraping AJAX/API Endpoints
When data is loaded via JavaScript API calls:
import requests
import json
import time
from typing import Dict, Any
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RealTimeAPIScraper:
def __init__(self, base_url: str, headers: Dict[str, str] = None):
self.session = requests.Session()
self.base_url = base_url
# Set default headers
default_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
if headers:
default_headers.update(headers)
self.session.headers.update(default_headers)
def fetch_data(self, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Fetch data from API endpoint"""
try:
url = f"{self.base_url}{endpoint}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"API request failed: {e}")
return {}
except json.JSONDecodeError as e:
logger.error(f"JSON parsing failed: {e}")
return {}
def monitor_endpoint(self, endpoint: str, interval: int = 10,
data_key: str = None):
"""Monitor API endpoint for changes"""
previous_data = None
while True:
current_data = self.fetch_data(endpoint)
if current_data:
# Extract specific data if key provided
relevant_data = current_data.get(data_key) if data_key else current_data
if relevant_data != previous_data:
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"[{timestamp}] New data received")
self.process_update(relevant_data)
previous_data = relevant_data
time.sleep(interval)
def process_update(self, data: Any):
"""Process data updates"""
print(f"Data update: {data}")
# Usage
scraper = RealTimeAPIScraper('https://api.example.com')
scraper.monitor_endpoint('/live-prices', interval=5, data_key='price')
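Many real-time APIs also accept a timestamp or cursor parameter so each poll returns only records newer than the last one seen. Instead of monitor_endpoint, you can track the cursor yourself; the parameter name (since) and response keys (items, timestamp) below are assumptions, so check the requests the page itself makes in your browser's network tab.

# Hypothetical delta polling: fetch only records newer than the last cursor seen
api_scraper = RealTimeAPIScraper('https://api.example.com')
last_seen = None
while True:
    params = {'since': last_seen} if last_seen else None
    data = api_scraper.fetch_data('/live-prices', params=params)
    for record in data.get('items', []):  # 'items' is an assumed response key
        api_scraper.process_update(record)
        last_seen = record.get('timestamp', last_seen)  # 'timestamp' is assumed too
    time.sleep(5)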
Method 3: WebSocket Connections
For real-time data streaming via WebSockets:
import websocket
import json
import threading
import time
from typing import Callable, Dict, Any
class WebSocketScraper:
def __init__(self, url: str, on_data: Callable = None):
self.url = url
self.ws = None
self.is_connected = False
self.on_data_callback = on_data or self.default_data_handler
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
def on_message(self, ws, message: str):
"""Handle incoming WebSocket messages"""
try:
data = json.loads(message)
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
print(f"[{timestamp}] Received: {data}")
self.on_data_callback(data)
except json.JSONDecodeError as e:
print(f"Failed to parse message: {e}")
def on_error(self, ws, error):
"""Handle WebSocket errors"""
print(f"WebSocket error: {error}")
self.is_connected = False
def on_close(self, ws, close_status_code, close_msg):
"""Handle WebSocket connection close"""
print(f"Connection closed: {close_status_code} - {close_msg}")
self.is_connected = False
# Attempt reconnection
if self.reconnect_attempts < self.max_reconnect_attempts:
self.reconnect_attempts += 1
print(f"Reconnecting... (attempt {self.reconnect_attempts})")
time.sleep(5)
self.connect()
def on_open(self, ws):
"""Handle WebSocket connection open"""
print("WebSocket connection established")
self.is_connected = True
self.reconnect_attempts = 0
# Send initial subscription message if needed
subscribe_message = {
"action": "subscribe",
"channel": "live-data"
}
ws.send(json.dumps(subscribe_message))
def default_data_handler(self, data: Dict[str, Any]):
"""Default data processing function"""
print(f"Processing data: {data}")
def connect(self):
"""Establish WebSocket connection"""
self.ws = websocket.WebSocketApp(
self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# Run in separate thread to avoid blocking
ws_thread = threading.Thread(target=self.ws.run_forever)
ws_thread.daemon = True
ws_thread.start()
return ws_thread
def disconnect(self):
"""Close WebSocket connection"""
if self.ws:
self.ws.close()
# Custom data handler function
def handle_price_data(data):
if 'price' in data:
print(f"Price update: ${data['price']}")
# Usage
scraper = WebSocketScraper(
url='wss://api.example.com/live-feed',
on_data=handle_price_data
)
# Connect and keep running
thread = scraper.connect()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping scraper...")
scraper.disconnect()
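The example above uses the synchronous websocket-client package. If your project is already asyncio-based, the websockets library offers a more compact equivalent; a minimal sketch under the same assumed URL and subscription message:

import asyncio
import json

import websockets  # pip install websockets

async def stream_feed(url):
    """Subscribe to a feed and print each JSON message as it arrives."""
    # websockets.connect() is an async context manager; the socket closes on exit
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"action": "subscribe", "channel": "live-data"}))
        async for message in ws:
            try:
                print(f"Received: {json.loads(message)}")
            except json.JSONDecodeError:
                print(f"Received non-JSON message: {message!r}")

# asyncio.run(stream_feed('wss://api.example.com/live-feed'))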
Method 4: Advanced Selenium-based Real-time Scraping
For complex JavaScript-heavy sites:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
import time
import logging
class SeleniumRealTimeScraper:
def __init__(self, headless: bool = True):
self.driver = self.setup_driver(headless)
self.wait = WebDriverWait(self.driver, 10)
def setup_driver(self, headless: bool = True):
"""Setup Chrome WebDriver with optimal settings"""
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1920,1080')
return webdriver.Chrome(options=options)
def monitor_element(self, url: str, selector: str, interval: int = 5):
"""Monitor specific element for changes"""
self.driver.get(url)
previous_text = None
while True:
try:
element = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, selector))
)
current_text = element.text.strip()
if current_text != previous_text:
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
print(f"[{timestamp}] Element changed: {current_text}")
previous_text = current_text
# Process the change
self.process_change(current_text)
except TimeoutException:
print("Element not found or page load timeout")
except WebDriverException as e:
print(f"WebDriver error: {e}")
break
time.sleep(interval)
def process_change(self, text: str):
"""Process detected changes"""
print(f"Processing change: {text}")
def close(self):
"""Clean up resources"""
if self.driver:
self.driver.quit()
# Usage with explicit cleanup
scraper = SeleniumRealTimeScraper(headless=True)
try:
scraper.monitor_element(
url='https://example.com/live-dashboard',
selector='.live-counter',
interval=3
)
except KeyboardInterrupt:
print("Stopping scraper...")
finally:
scraper.close()
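If you prefer a with block over explicit try/finally cleanup, the class can be turned into a context manager with two small methods; a sketch:

class ManagedSeleniumScraper(SeleniumRealTimeScraper):
    """Context-manager variant so the browser is always closed."""
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not suppress exceptions

# with ManagedSeleniumScraper(headless=True) as scraper:
#     scraper.monitor_element('https://example.com/live-dashboard', '.live-counter', interval=3)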
Best Practices and Rate Limiting
import time
import random
import requests
from functools import wraps
def rate_limit(calls_per_second: float = 1.0):
"""Decorator to limit function call rate"""
min_interval = 1.0 / calls_per_second
last_called = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
ret = func(*args, **kwargs)
last_called[0] = time.time()
return ret
return wrapper
return decorator
def exponential_backoff(max_retries: int = 3, base_delay: float = 1.0):
"""Decorator for exponential backoff retry logic"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries:
raise e
delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
time.sleep(delay)
return wrapper
return decorator
# Example usage with decorators
@rate_limit(calls_per_second=0.5) # Max 1 call every 2 seconds
@exponential_backoff(max_retries=3)
def fetch_data_safely(url):
response = requests.get(url)
response.raise_for_status()
return response.json()
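Note the decorator order: rate_limit is the outer wrapper, so the throttle applies once per top-level call, while retries inside exponential_backoff are spaced only by the backoff delays. Swap the two decorators if each retry should also respect the rate limit.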
Error Handling and Monitoring
import logging
import requests
import smtplib
from email.mime.text import MIMEText
from typing import Optional
class ScrapingMonitor:
def __init__(self, log_file: str = 'scraping.log'):
self.setup_logging(log_file)
self.error_count = 0
self.success_count = 0
def setup_logging(self, log_file: str):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def log_success(self, message: str):
"""Log successful operations"""
self.success_count += 1
self.logger.info(f"SUCCESS: {message}")
def log_error(self, message: str, exception: Optional[Exception] = None):
"""Log errors with optional exception details"""
self.error_count += 1
error_msg = f"ERROR: {message}"
if exception:
error_msg += f" - {str(exception)}"
self.logger.error(error_msg)
# Send alert if error rate is high
if self.error_count > 5 and self.error_count > self.success_count:
self.send_alert(f"High error rate detected: {self.error_count} errors")
def send_alert(self, message: str):
"""Send email alert for critical issues"""
# Implement email alerting logic here
print(f"ALERT: {message}")
def get_stats(self) -> dict:
"""Get scraping statistics"""
total = self.success_count + self.error_count
success_rate = (self.success_count / total * 100) if total > 0 else 0
return {
'total_requests': total,
'successful': self.success_count,
'errors': self.error_count,
'success_rate': f"{success_rate:.2f}%"
}
# Usage
monitor = ScrapingMonitor()
def monitored_scrape(url):
try:
# Your scraping logic here
response = requests.get(url)
response.raise_for_status()
monitor.log_success(f"Successfully scraped {url}")
return response.json()
except Exception as e:
monitor.log_error(f"Failed to scrape {url}", e)
return None
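Call monitor.get_stats() periodically, for example after each polling cycle, to log the running success rate and spot degradation early.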
Key Considerations
- Respect Rate Limits: Implement delays between requests to avoid overwhelming servers
- Handle Failures Gracefully: Use try-catch blocks and retry logic with exponential backoff
- Monitor Performance: Track success rates, response times, and error patterns
- Legal Compliance: Check robots.txt, terms of service, and applicable laws
- Resource Management: Properly close connections and clean up resources
- Data Validation: Verify data integrity and handle unexpected formats (see the sketch after this list)
- Scalability: Design for handling large volumes of real-time data
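As a concrete example of the data-validation point, a small guard can reject malformed records before they reach downstream processing. The field names and bounds here are assumptions to adapt to your own data.

def validate_price_record(record) -> bool:
    """Reject records that are missing fields or carry implausible values."""
    if not isinstance(record, dict):
        return False
    symbol = record.get('symbol')  # assumed field name
    if not isinstance(symbol, str) or not symbol:
        return False
    try:
        price = float(record.get('price'))  # assumed field name
    except (TypeError, ValueError):
        return False
    return 0 < price < 1_000_000  # sanity bound; adjust for your data

# Usage:
# if validate_price_record(record):
#     process_data(record)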
Combined with the rate-limiting, error-handling, and monitoring patterns above, these techniques provide a solid foundation for robust, efficient, and ethical real-time web scraping with Python.