How do you implement API request validation and sanitization?
API request validation and sanitization are critical security measures that protect your web scraping applications from malicious input, data corruption, and security vulnerabilities. Proper implementation ensures data integrity, prevents injection attacks, and maintains system stability when handling user-provided data.
Understanding Validation vs Sanitization
Validation verifies that incoming data meets specific criteria and formats, rejecting invalid requests before processing. Sanitization cleans and transforms input data to remove or neutralize potentially harmful content while preserving legitimate data.
Both processes work together to create a robust defense against common security threats like SQL injection, XSS attacks, and data corruption.
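To make the distinction concrete, here is a minimal standalone sketch (the helper names `validate_timeout` and `sanitize_comment` are illustrative): validation rejects out-of-contract input outright, while sanitization transforms the input and keeps going.

```python
import html

def validate_timeout(value):
    """Validation: reject anything outside the contract."""
    if not isinstance(value, int) or not (1 <= value <= 300):
        raise ValueError("timeout must be an integer between 1 and 300")
    return value

def sanitize_comment(text):
    """Sanitization: neutralize dangerous content, keep the rest."""
    return html.escape(text).strip()

print(validate_timeout(30))  # 30, passes through unchanged
print(sanitize_comment("<script>alert(1)</script> hi"))  # escaped, not rejected
```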
Input Validation Strategies
Schema-Based Validation
Schema validation defines the expected structure, data types, and constraints for API requests:
```python
# Python with Marshmallow
from marshmallow import Schema, fields, ValidationError, validate

class ScrapingRequestSchema(Schema):
    url = fields.Url(required=True)
    # load_default replaces the `missing` argument deprecated in marshmallow 3.13
    timeout = fields.Integer(load_default=30, validate=validate.Range(min=1, max=300))
    user_agent = fields.String(load_default="WebScraper/1.0", validate=validate.Length(max=200))
    headers = fields.Dict(keys=fields.String(), values=fields.String(), load_default=dict)
    proxy = fields.Url(allow_none=True)

def validate_request(data):
    schema = ScrapingRequestSchema()
    try:
        result = schema.load(data)
        return result, None
    except ValidationError as err:
        return None, err.messages
```
```javascript
// JavaScript with Joi
const Joi = require('joi');

const scrapingRequestSchema = Joi.object({
  url: Joi.string().uri().required(),
  timeout: Joi.number().integer().min(1).max(300).default(30),
  userAgent: Joi.string().max(200).default('WebScraper/1.0'),
  headers: Joi.object().pattern(Joi.string(), Joi.string()).default({}),
  proxy: Joi.string().uri().allow(null)
});

function validateRequest(data) {
  const { error, value } = scrapingRequestSchema.validate(data);
  return { isValid: !error, data: value, errors: error?.details };
}
```
Type and Format Validation
Implement strict type checking and format validation for different data types:
```python
import re
from urllib.parse import urlparse

def validate_url(url):
    """Validate URL format and scheme"""
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return False, "Only HTTP and HTTPS schemes are allowed"
        if not parsed.netloc:
            return False, "URL must contain a valid domain"
        return True, None
    except Exception as e:
        return False, f"Invalid URL format: {str(e)}"

def validate_css_selector(selector):
    """Basic CSS selector validation"""
    if not selector or len(selector.strip()) == 0:
        return False, "CSS selector cannot be empty"
    # Check for potentially dangerous patterns
    dangerous_patterns = ['javascript:', 'data:', 'vbscript:']
    if any(pattern in selector.lower() for pattern in dangerous_patterns):
        return False, "CSS selector contains dangerous patterns"
    return True, None

def validate_extraction_fields(fields):
    """Validate field extraction configuration"""
    if not isinstance(fields, dict):
        return False, "Fields must be a dictionary"
    for field_name, field_config in fields.items():
        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', field_name):
            return False, f"Invalid field name: {field_name}"
        if not isinstance(field_config, str):
            return False, f"Field configuration must be a string: {field_name}"
    return True, None
```
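A quick usage check for the URL helper above; `validate_url` is redeclared in condensed form here so the snippet runs on its own:

```python
from urllib.parse import urlparse

def validate_url(url):
    # Condensed version of the validator defined above
    parsed = urlparse(url)
    if parsed.scheme not in ('http', 'https'):
        return False, "Only HTTP and HTTPS schemes are allowed"
    if not parsed.netloc:
        return False, "URL must contain a valid domain"
    return True, None

print(validate_url("https://example.com/page"))  # (True, None)
print(validate_url("ftp://example.com"))         # scheme rejected
print(validate_url("not a url"))                 # no scheme, rejected
```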
Rate Limiting and Quota Validation
Implement request rate limiting and quota validation to prevent abuse:
```python
from functools import wraps
from flask import request, jsonify
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def rate_limit(max_requests=100, window=3600):
    """Rate limiting decorator"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            client_ip = request.remote_addr
            key = f"rate_limit:{client_ip}"
            current_requests = redis_client.get(key)
            if current_requests is None:
                redis_client.setex(key, window, 1)
                return f(*args, **kwargs)
            if int(current_requests) >= max_requests:
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': redis_client.ttl(key)
                }), 429
            # Note: the separate GET/INCR leaves a small race under concurrency;
            # an atomic INCR plus EXPIRE (or a Lua script) closes it.
            redis_client.incr(key)
            return f(*args, **kwargs)
        return wrapper
    return decorator
```
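The same fixed-window counting can be sketched without Redis or Flask using an in-memory dictionary. This only works within a single process, which is why the Redis-backed version above is what you would use across workers; the class name here is illustrative:

```python
import time

class FixedWindowLimiter:
    def __init__(self, max_requests=100, window=3600):
        self.max_requests = max_requests
        self.window = window
        self.counters = {}  # client_id -> (window_start, count)

    def allow(self, client_id, now=None):
        now = time.time() if now is None else now
        start, count = self.counters.get(client_id, (now, 0))
        if now - start >= self.window:  # window expired: start a new one
            start, count = now, 0
        if count >= self.max_requests:
            return False
        self.counters[client_id] = (start, count + 1)
        return True

limiter = FixedWindowLimiter(max_requests=3, window=60)
results = [limiter.allow("1.2.3.4", now=0) for _ in range(5)]
print(results)  # [True, True, True, False, False]
```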
Input Sanitization Techniques
HTML and Script Sanitization
Remove or escape potentially dangerous HTML and JavaScript content:
```python
import html
import re
from bleach import clean

def sanitize_html_content(content):
    """Sanitize HTML content by removing dangerous tags and attributes"""
    # bleach 6 expects the allowed tags as a set
    allowed_tags = {'p', 'br', 'strong', 'em', 'u', 'ol', 'ul', 'li', 'h1', 'h2', 'h3'}
    allowed_attributes = {}
    # Remove script tags and dangerous content
    content = re.sub(r'<script[^>]*>.*?</script>', '', content, flags=re.IGNORECASE | re.DOTALL)
    content = re.sub(r'javascript:', '', content, flags=re.IGNORECASE)
    content = re.sub(r'on\w+\s*=', '', content, flags=re.IGNORECASE)
    # Clean with bleach
    sanitized = clean(content, tags=allowed_tags, attributes=allowed_attributes, strip=True)
    return sanitized

def sanitize_user_input(user_input):
    """Basic input sanitization"""
    if not isinstance(user_input, str):
        return str(user_input)
    # HTML escape
    sanitized = html.escape(user_input)
    # Remove null bytes
    sanitized = sanitized.replace('\x00', '')
    # Limit length
    if len(sanitized) > 10000:
        sanitized = sanitized[:10000]
    return sanitized.strip()
```
SQL Injection Prevention
Always use parameterized queries and input sanitization for database operations:
```python
import json
import re
import sqlite3
from typing import Dict, Any

# Uses sanitize_user_input and sanitize_html_content from the previous section

class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path

    def sanitize_column_name(self, column_name):
        """Sanitize column names to prevent SQL injection"""
        # Only allow alphanumeric characters and underscores
        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', column_name):
            raise ValueError(f"Invalid column name: {column_name}")
        return column_name

    def insert_scraping_result(self, url: str, content: str, metadata: Dict[str, Any]):
        """Safely insert scraping results using parameterized queries"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Sanitize inputs
            url = sanitize_user_input(url)[:2048]  # Limit URL length
            content = sanitize_html_content(content)
            # Use parameterized query
            cursor.execute("""
                INSERT INTO scraping_results (url, content, metadata, created_at)
                VALUES (?, ?, ?, datetime('now'))
            """, (url, content, json.dumps(metadata)))
            conn.commit()
            return cursor.lastrowid
```
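To see why the parameterized query matters, here is a standalone in-memory SQLite demo: a classic injection payload passed through a `?` placeholder is stored as inert text, and the table survives:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scraping_results (url TEXT, content TEXT)")

payload = "'); DROP TABLE scraping_results; --"
# Placeholders pass the payload as data, never as SQL
conn.execute("INSERT INTO scraping_results (url, content) VALUES (?, ?)",
             ("https://example.com", payload))

stored = conn.execute("SELECT content FROM scraping_results").fetchone()[0]
print(stored == payload)  # True: stored verbatim, nothing was executed
print(conn.execute("SELECT COUNT(*) FROM scraping_results").fetchone()[0])  # 1
```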
File Upload Sanitization
Implement secure file upload handling with proper validation:
```python
import os
import mimetypes
from werkzeug.utils import secure_filename

ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'csv', 'json'}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

def validate_file_upload(file):
    """Validate and sanitize file uploads"""
    if not file or not file.filename:
        return False, "No file provided"
    # Check file extension
    filename = secure_filename(file.filename)
    if '.' not in filename:
        return False, "File must have an extension"
    extension = filename.rsplit('.', 1)[1].lower()
    if extension not in ALLOWED_EXTENSIONS:
        return False, f"File type not allowed: {extension}"
    # Check file size
    file.seek(0, os.SEEK_END)
    file_size = file.tell()
    file.seek(0)
    if file_size > MAX_FILE_SIZE:
        return False, "File size exceeds maximum limit"
    # Validate MIME type
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type and not mime_type.startswith(('text/', 'image/', 'application/json', 'application/pdf')):
        return False, f"MIME type not allowed: {mime_type}"
    return True, filename
```
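A werkzeug-free sketch of the same extension and size checks using only the standard library, with an in-memory `io.BytesIO` standing in for the uploaded stream (`check_upload` is an illustrative name):

```python
import io
import os

ALLOWED_EXTENSIONS = {"txt", "csv", "json"}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB

def check_upload(filename, stream):
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in ALLOWED_EXTENSIONS:
        return False, f"File type not allowed: {ext or '(none)'}"
    stream.seek(0, os.SEEK_END)  # measure without reading the file into memory
    size = stream.tell()
    stream.seek(0)
    if size > MAX_FILE_SIZE:
        return False, "File size exceeds maximum limit"
    return True, None

print(check_upload("data.csv", io.BytesIO(b"a,b\n1,2\n")))  # accepted
print(check_upload("shell.php", io.BytesIO(b"<?php ?>")))   # extension rejected
```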
Advanced Validation Patterns
Custom Validation Rules
Create reusable validation rules for common web scraping scenarios:
```javascript
// JavaScript validation utilities
class ApiValidator {
  static validateScrapingParameters(params) {
    const errors = [];
    // URL validation with additional security checks
    if (!this.isValidUrl(params.url)) {
      errors.push('Invalid URL format');
    } else if (this.isPrivateNetwork(params.url)) {
      errors.push('URLs pointing to private networks are not allowed');
    }
    // CSS selector validation
    if (params.selector && !this.isValidCssSelector(params.selector)) {
      errors.push('Invalid CSS selector format');
    }
    // Timeout validation
    if (params.timeout && (params.timeout < 1 || params.timeout > 300)) {
      errors.push('Timeout must be between 1 and 300 seconds');
    }
    return errors;
  }

  static isValidUrl(url) {
    try {
      const parsed = new URL(url);
      return ['http:', 'https:'].includes(parsed.protocol);
    } catch {
      return false;
    }
  }

  static isPrivateNetwork(url) {
    try {
      const hostname = new URL(url).hostname;
      // Check for localhost, private IPs, etc.
      // Note: URL.hostname keeps the square brackets around IPv6 literals
      const privatePatterns = [
        /^localhost$/i,
        /^127\./,
        /^10\./,
        /^172\.(1[6-9]|2[0-9]|3[0-1])\./,
        /^192\.168\./,
        /^\[::1\]$/,
        /^\[fe80:/i
      ];
      return privatePatterns.some(pattern => pattern.test(hostname));
    } catch {
      return false;
    }
  }

  static isValidCssSelector(selector) {
    // Requires a DOM (a browser, or jsdom under Node)
    try {
      document.querySelector(selector);
      return true;
    } catch {
      return false;
    }
  }
}
```
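The private-network guard in `isPrivateNetwork` can be written more robustly in Python with the stdlib `ipaddress` module. Note this sketch only classifies literal IPs and `localhost`; a complete SSRF defense must also resolve DNS names and re-check the target after redirects:

```python
import ipaddress
from urllib.parse import urlparse

def points_at_private_network(url):
    """Return True if the URL targets a loopback, private, or link-local address."""
    hostname = urlparse(url).hostname or ""
    if hostname.lower() == "localhost":
        return True
    try:
        addr = ipaddress.ip_address(hostname)
    except ValueError:
        return False  # a DNS name: must be resolved and re-checked before fetching
    return addr.is_private or addr.is_loopback or addr.is_link_local

print(points_at_private_network("http://127.0.0.1/admin"))  # True
print(points_at_private_network("http://192.168.1.5/"))     # True
print(points_at_private_network("https://93.184.216.34/"))  # False
```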
Error Handling and Logging
Implement comprehensive error handling with proper logging:
```python
import logging
from typing import Tuple, Optional, Dict, Any

logger = logging.getLogger(__name__)

# validate_url and sanitize_user_input are the helpers defined in earlier sections

class ValidationError(Exception):
    def __init__(self, message: str, field: Optional[str] = None, code: Optional[str] = None):
        self.message = message
        self.field = field
        self.code = code
        super().__init__(message)

class ApiRequestValidator:
    def __init__(self):
        self.validation_rules = {}

    def validate_request(self, data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]], Optional[Dict[str, str]]]:
        """
        Comprehensive request validation with detailed error reporting
        Returns: (is_valid, sanitized_data, errors)
        """
        errors = {}
        sanitized_data = {}
        try:
            # Log validation attempt
            logger.info(f"Validating API request with keys: {list(data.keys())}")
            # Validate and sanitize each field
            for field_name, field_value in data.items():
                try:
                    sanitized_value = self._validate_field(field_name, field_value)
                    sanitized_data[field_name] = sanitized_value
                except ValidationError as e:
                    errors[field_name] = e.message
                    logger.warning(f"Validation error for field {field_name}: {e.message}")
            # Check for required fields
            required_fields = ['url']
            for field in required_fields:
                if field not in sanitized_data:
                    errors[field] = f"Field '{field}' is required"
            if not errors:
                logger.info("Request validation successful")
                return True, sanitized_data, None
            logger.warning(f"Request validation failed with errors: {errors}")
            return False, None, errors
        except Exception as e:
            logger.error(f"Unexpected error during validation: {str(e)}")
            return False, None, {"general": "Internal validation error"}

    def _validate_field(self, field_name: str, field_value: Any) -> Any:
        """Validate and sanitize individual fields"""
        if field_name == 'url':
            return self._validate_url(field_value)
        elif field_name == 'timeout':
            return self._validate_timeout(field_value)
        elif field_name == 'headers':
            return self._validate_headers(field_value)
        else:
            # Default sanitization for unknown fields
            return sanitize_user_input(str(field_value))

    def _validate_url(self, url: str) -> str:
        if not isinstance(url, str):
            raise ValidationError("URL must be a string", "url", "invalid_type")
        is_valid, error_message = validate_url(url)
        if not is_valid:
            raise ValidationError(error_message, "url", "invalid_format")
        return url.strip()

    def _validate_timeout(self, timeout: Any) -> int:
        try:
            timeout_int = int(timeout)
        except (ValueError, TypeError):
            raise ValidationError("Timeout must be a valid integer", "timeout", "invalid_type")
        if timeout_int < 1 or timeout_int > 300:
            raise ValidationError("Timeout must be between 1 and 300 seconds", "timeout", "out_of_range")
        return timeout_int

    def _validate_headers(self, headers: Any) -> Dict[str, str]:
        if not isinstance(headers, dict):
            raise ValidationError("Headers must be a dictionary", "headers", "invalid_type")
        sanitized_headers = {}
        for key, value in headers.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise ValidationError("Header keys and values must be strings", "headers", "invalid_type")
            # Sanitize header keys and values, with conservative length caps
            sanitized_key = sanitize_user_input(key)[:100]
            sanitized_value = sanitize_user_input(value)[:500]
            sanitized_headers[sanitized_key] = sanitized_value
        return sanitized_headers
```
Integration with Web Scraping APIs
When working with web scraping APIs, validation becomes even more critical. Consider implementing validation middleware that works seamlessly with your scraping infrastructure. For complex scenarios requiring browser automation and dynamic content handling, ensure that validation rules account for JavaScript-heavy applications and AJAX requests.
For applications that need to monitor network requests during scraping, implement validation for network monitoring parameters and ensure that collected data is properly sanitized before storage.
Best Practices and Security Considerations
Defense in Depth
Implement multiple layers of validation:
- Client-side validation for immediate user feedback
- API gateway validation for request routing and basic filtering
- Application-level validation for business logic
- Database-level constraints as the final safety net
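The last of these layers can be sketched with SQLite CHECK constraints: even if every application-level check is bypassed, the database itself rejects out-of-contract rows (the table name and constraints here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE scrape_jobs (
        url     TEXT NOT NULL CHECK (url LIKE 'http%'),
        timeout INTEGER NOT NULL CHECK (timeout BETWEEN 1 AND 300)
    )
""")

# A valid row passes all constraints
conn.execute("INSERT INTO scrape_jobs VALUES (?, ?)", ("https://example.com", 30))

# An out-of-range timeout is rejected at the database layer
try:
    conn.execute("INSERT INTO scrape_jobs VALUES (?, ?)", ("https://example.com", 9999))
except sqlite3.IntegrityError as exc:
    print("rejected by the database:", exc)
```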
Regular Security Audits
Regularly review and update your validation rules:
```python
# Example security audit checklist
SECURITY_AUDIT_CHECKLIST = {
    'input_validation': [
        'All user inputs are validated against strict schemas',
        'File uploads are properly validated and sandboxed',
        'SQL injection prevention is implemented',
        'XSS protection is in place'
    ],
    'rate_limiting': [
        'Rate limits are enforced per user/IP',
        'Quota limits prevent abuse',
        'Circuit breakers prevent cascading failures'
    ],
    'logging_monitoring': [
        'All validation failures are logged',
        'Suspicious patterns are detected and alerted',
        'Performance metrics are tracked'
    ]
}
```
Performance Optimization
Balance security with performance by implementing efficient validation:
- Use compiled regex patterns for repeated validations
- Implement caching for expensive validation operations
- Consider asynchronous validation for non-critical checks
- Profile validation performance regularly
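The first two points in a runnable sketch: compile hot-path patterns once at import time, and memoize a pure validation function with `functools.lru_cache` (the function name is illustrative):

```python
import re
from functools import lru_cache

# Compiled once at module load, reused on every request
FIELD_NAME_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')

@lru_cache(maxsize=4096)
def is_valid_field_name(name: str) -> bool:
    """Pure function of its argument, so caching the result is safe."""
    return bool(FIELD_NAME_RE.match(name))

print(is_valid_field_name("page_title"))  # True
print(is_valid_field_name("1bad-name"))   # False
is_valid_field_name("page_title")         # served from the cache this time
print(is_valid_field_name.cache_info().hits)  # 1
```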
Conclusion
Implementing robust API request validation and sanitization is essential for building secure, reliable web scraping applications. By combining schema-based validation, proper input sanitization, comprehensive error handling, and regular security audits, you can protect your applications from common security threats while maintaining good performance and user experience.
Remember to keep validation rules updated as your API evolves, regularly audit your security measures, and always follow the principle of least privilege when processing user input. Proper validation and sanitization form the foundation of a secure web scraping infrastructure that can handle diverse data sources and user requirements safely.