How do you implement API health checks for scraping services?
API health checks are essential for maintaining reliable web scraping services. They provide real-time monitoring of your scraping infrastructure, enabling proactive detection of issues before they impact your users. This guide covers comprehensive implementation strategies for building robust health check systems.
Understanding API Health Checks
Health checks are automated monitoring endpoints that verify the operational status of your scraping services. They test critical components including database connectivity, external API availability, browser automation services, and resource utilization. A well-designed health check system provides immediate feedback on service health and enables rapid incident response.
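The idea above, several component checks rolled up into one service status, can be sketched in a few lines. This is a minimal illustration, not a definitive implementation: the names `run_checks` and `SEVERITY` and the three-level status vocabulary are assumptions chosen to match the examples later in this guide.

```python
from typing import Callable, Dict

# Assumed severity order: the overall status is the worst one reported.
SEVERITY = {"healthy": 0, "degraded": 1, "unhealthy": 2}


def normalize(status) -> str:
    """Treat a missing or unknown status as unhealthy."""
    return status if status in SEVERITY else "unhealthy"


def run_checks(checks: Dict[str, Callable[[], dict]]) -> dict:
    """Run every check (hypothetical callables) and report the worst status."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = check()
        except Exception as exc:
            # A check that crashes counts as an unhealthy component.
            results[name] = {"status": "unhealthy", "error": str(exc)}

    overall = "healthy"
    for result in results.values():
        status = normalize(result.get("status"))
        if SEVERITY[status] > SEVERITY[overall]:
            overall = status
    return {"status": overall, "checks": results}
```

Ranking statuses by severity keeps a later "degraded" result from masking an earlier "unhealthy" one, which is an easy mistake to make when the overall status is updated inside a loop.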
Basic Health Check Implementation
Python Implementation with Flask
Here's a comprehensive health check endpoint using Flask:
from flask import Flask, jsonify
import requests
import psutil
import redis
from datetime import datetime, timezone
import time

app = Flask(__name__)


class HealthChecker:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        # Placeholder credentials; load the real URL from configuration
        self.database_url = "postgresql://user:pass@localhost/scraping_db"

    def check_database(self):
        try:
            import psycopg2
            start_time = time.time()
            conn = psycopg2.connect(self.database_url)
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            cursor.close()
            conn.close()
            # Report the measured round trip, not a fixed value
            response_time = time.time() - start_time
            return {"status": "healthy", "response_time": response_time}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    def check_redis(self):
        try:
            start_time = time.time()
            self.redis_client.ping()
            response_time = time.time() - start_time
            return {"status": "healthy", "response_time": response_time}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    def check_external_apis(self):
        apis = [
            {"name": "target_api", "url": "https://api.example.com/health"},
            {"name": "proxy_service", "url": "https://proxy.example.com/status"}
        ]
        results = {}
        for api in apis:
            try:
                start_time = time.time()
                response = requests.get(api["url"], timeout=5)
                response_time = time.time() - start_time
                results[api["name"]] = {
                    "status": "healthy" if response.status_code == 200 else "unhealthy",
                    "status_code": response.status_code,
                    "response_time": response_time
                }
            except Exception as e:
                results[api["name"]] = {
                    "status": "unhealthy",
                    "error": str(e)
                }
        return results

    def check_system_resources(self):
        # interval=1 blocks for one second while CPU usage is sampled
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return {
            "cpu_usage": cpu_percent,
            "memory_usage": memory.percent,
            "disk_usage": disk.percent,
            "status": "healthy" if cpu_percent < 80 and memory.percent < 85 else "degraded"
        }


health_checker = HealthChecker()


@app.route('/health')
def health_check():
    start_time = time.time()
    checks = {
        "database": health_checker.check_database(),
        "redis": health_checker.check_redis(),
        "external_apis": health_checker.check_external_apis(),
        "system_resources": health_checker.check_system_resources()
    }

    # Collect every status; external_apis holds one result per API
    statuses = []
    for check_name, check_result in checks.items():
        if check_name == "external_apis":
            statuses.extend(r.get("status") for r in check_result.values())
        else:
            statuses.append(check_result.get("status"))

    # Determine overall health status: "unhealthy" always outranks "degraded"
    if "unhealthy" in statuses:
        overall_status = "unhealthy"
    elif "degraded" in statuses:
        overall_status = "degraded"
    else:
        overall_status = "healthy"

    total_time = time.time() - start_time
    return jsonify({
        "status": overall_status,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "checks": checks,
        "response_time": total_time,
        "version": "1.0.0"
    }), 200 if overall_status == "healthy" else 503
Node.js Implementation with Express
const express = require('express');
const redis = require('redis');
const { Pool } = require('pg');
const axios = require('axios');
const os = require('os');

const app = express();
const redisClient = redis.createClient();
// node-redis v4 and later needs an explicit connect() before commands are sent
redisClient.on('error', (err) => console.error('Redis error:', err.message));
redisClient.connect().catch((err) => console.error('Redis connect failed:', err.message));

const pgPool = new Pool({
  connectionString: 'postgresql://user:pass@localhost/scraping_db'
});

class HealthChecker {
  async checkDatabase() {
    try {
      const start = Date.now();
      const client = await pgPool.connect();
      try {
        await client.query('SELECT 1');
      } finally {
        // Always return the client to the pool, even if the query fails
        client.release();
      }
      const responseTime = Date.now() - start;
      return { status: 'healthy', response_time: responseTime };
    } catch (error) {
      return { status: 'unhealthy', error: error.message };
    }
  }

  async checkRedis() {
    try {
      const start = Date.now();
      await redisClient.ping();
      const responseTime = Date.now() - start;
      return { status: 'healthy', response_time: responseTime };
    } catch (error) {
      return { status: 'unhealthy', error: error.message };
    }
  }

  async checkExternalAPIs() {
    const apis = [
      { name: 'scraping_target', url: 'https://target-api.com/status' },
      { name: 'browser_service', url: 'http://localhost:3000/health' }
    ];
    const results = {};
    for (const api of apis) {
      try {
        const start = Date.now();
        const response = await axios.get(api.url, { timeout: 5000 });
        const responseTime = Date.now() - start;
        results[api.name] = {
          status: response.status === 200 ? 'healthy' : 'unhealthy',
          status_code: response.status,
          response_time: responseTime
        };
      } catch (error) {
        results[api.name] = {
          status: 'unhealthy',
          error: error.message
        };
      }
    }
    return results;
  }

  checkSystemResources() {
    const totalMem = os.totalmem();
    const freeMem = os.freemem();
    const memoryUsage = ((totalMem - freeMem) / totalMem) * 100;
    // The 1-minute load average per core is only a rough CPU estimate;
    // os.loadavg() always returns zeros on Windows
    const loadAverage = os.loadavg()[0];
    const cpuCount = os.cpus().length;
    const cpuUsage = (loadAverage / cpuCount) * 100;
    return {
      memory_usage: Math.round(memoryUsage),
      cpu_usage: Math.round(cpuUsage),
      uptime: os.uptime(),
      status: memoryUsage < 85 && cpuUsage < 80 ? 'healthy' : 'degraded'
    };
  }
}

const healthChecker = new HealthChecker();

app.get('/health', async (req, res) => {
  const start = Date.now();
  try {
    const checks = {
      database: await healthChecker.checkDatabase(),
      redis: await healthChecker.checkRedis(),
      external_apis: await healthChecker.checkExternalAPIs(),
      system_resources: healthChecker.checkSystemResources()
    };

    // Collect every status; external_apis holds one result per API
    const statuses = [
      checks.database.status,
      checks.redis.status,
      checks.system_resources.status,
      ...Object.values(checks.external_apis).map((api) => api.status)
    ];

    // Determine overall status: 'unhealthy' outranks 'degraded'
    let overallStatus = 'healthy';
    if (statuses.includes('unhealthy')) {
      overallStatus = 'unhealthy';
    } else if (statuses.includes('degraded')) {
      overallStatus = 'degraded';
    }

    const totalTime = Date.now() - start;
    const statusCode = overallStatus === 'healthy' ? 200 : 503;
    res.status(statusCode).json({
      status: overallStatus,
      timestamp: new Date().toISOString(),
      checks: checks,
      response_time: totalTime,
      version: '1.0.0'
    });
  } catch (error) {
    res.status(503).json({
      status: 'unhealthy',
      error: error.message,
      timestamp: new Date().toISOString()
    });
  }
});
Advanced Health Check Patterns
Circuit Breaker Pattern
Implement circuit breakers to prevent cascading failures:
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            # After the timeout, let one trial call through (half-open)
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise  # re-raise with the original traceback

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


# Usage with health checks
database_circuit = CircuitBreaker(failure_threshold=3, timeout=30)


def check_database_with_circuit():
    def probe():
        # check_database() reports failures as a dict instead of raising,
        # so turn an unhealthy result into an exception the breaker can count
        result = health_checker.check_database()
        if result["status"] != "healthy":
            raise RuntimeError(result.get("error", "database check failed"))
        return result

    try:
        return database_circuit.call(probe)
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
Browser Automation Health Checks
For scraping services that rely on browser automation, add checks for the browser itself. This matters most when browser sessions are long-lived, as they often are with Puppeteer. The example uses pyppeteer, a Python port of Puppeteer:
import asyncio
import time

from pyppeteer import launch


class BrowserHealthChecker:
    def __init__(self):
        self.browser = None

    async def check_browser_health(self):
        try:
            if not self.browser:
                self.browser = await launch(headless=True)
            page = await self.browser.newPage()
            try:
                start_time = time.time()
                # Test basic navigation; the timeout is in milliseconds
                await page.goto('https://httpbin.org/get', {'timeout': 10000})
                content = await page.content()
                response_time = time.time() - start_time
            finally:
                # Close the page even when navigation fails, to avoid leaking tabs
                await page.close()
            return {
                "status": "healthy" if "httpbin" in content else "unhealthy",
                "response_time": response_time,
                "browser_version": await self.browser.version()
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    async def cleanup(self):
        if self.browser:
            await self.browser.close()


browser_checker = BrowserHealthChecker()
Monitoring and Alerting Integration
Prometheus Metrics Integration
from flask import Response
from prometheus_client import (
    CONTENT_TYPE_LATEST, Counter, Histogram, Gauge, generate_latest
)

# Metrics
health_check_duration = Histogram('health_check_duration_seconds', 'Health check duration')
health_check_status = Gauge('health_check_status', 'Health check status', ['service'])
health_check_total = Counter('health_check_total', 'Total health checks', ['status'])


@app.route('/metrics')
def metrics():
    # generate_latest() returns bytes; serve them with the Prometheus content type
    return Response(generate_latest(), content_type=CONTENT_TYPE_LATEST)


@health_check_duration.time()
def enhanced_health_check():
    # perform_health_checks() stands in for your own health check logic; it is
    # expected to return {"status": ..., "checks": {name: {"status": ...}}}
    result = perform_health_checks()

    # Update metrics (a check without a top-level "status" key is recorded as 0)
    for service, status in result['checks'].items():
        health_check_status.labels(service=service).set(
            1 if status.get('status') == 'healthy' else 0
        )
    health_check_total.labels(status=result['status']).inc()
    return result
Webhook Notifications
import json
import time

import requests


class AlertManager:
    def __init__(self, slack_webhook=None, discord_webhook=None):
        self.slack_webhook = slack_webhook
        # Stored for a Discord sender; only Slack delivery is shown here
        self.discord_webhook = discord_webhook

    def send_alert(self, service_name, status, details):
        message = {
            "text": f"🚨 Health Check Alert: {service_name}",
            "attachments": [{
                "color": "danger" if status == "unhealthy" else "warning",
                "fields": [
                    {"title": "Service", "value": service_name, "short": True},
                    {"title": "Status", "value": status, "short": True},
                    {"title": "Details", "value": json.dumps(details, indent=2), "short": False}
                ],
                "ts": int(time.time())
            }]
        }
        if self.slack_webhook:
            # A timeout keeps a slow webhook from stalling the health check
            requests.post(self.slack_webhook, json=message, timeout=5)


alert_manager = AlertManager(slack_webhook="https://hooks.slack.com/your-webhook")
Database Health Check Queries
PostgreSQL Specific Checks
-- Check active connection count
SELECT count(*) AS active_connections
FROM pg_stat_activity
WHERE state = 'active';

-- Check database size
SELECT pg_size_pretty(pg_database_size('your_database')) AS db_size;

-- Check for long-running queries (idle sessions are excluded, because their
-- query_start refers to a statement that has already finished)
SELECT pid, now() - pg_stat_activity.query_start AS duration, query
FROM pg_stat_activity
WHERE state = 'active'
  AND (now() - pg_stat_activity.query_start) > interval '5 minutes';

-- Check replication lag (if using replicas)
SELECT CASE
    WHEN pg_is_in_recovery() THEN
        EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
    ELSE 0
END AS replication_lag_seconds;
Container Health Checks
Docker Health Check
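FROM python:3.9-slim
# Your application setup...

# The slim image does not ship curl; install it if you use the curl-based check
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

# Or using a Python script. Only the last HEALTHCHECK in a Dockerfile takes
# effect, so keep just one of the two:
# HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
#     CMD python /app/health_check.py || exit 1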
Kubernetes Liveness and Readiness Probes
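apiVersion: apps/v1
kind: Deployment
metadata:
  name: scraping-service
spec:
  # apps/v1 requires a selector that matches the pod template labels;
  # the label value here is a placeholder
  selector:
    matchLabels:
      app: scraping-service
  template:
    metadata:
      labels:
        app: scraping-service
    spec:
      containers:
        - name: scraping-service
          image: your-scraping-service:latest
          ports:
            - containerPort: 5000
          livenessProbe:
            httpGet:
              path: /health
              port: 5000
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              # The application must serve /ready in addition to /health
              path: /ready
              port: 5000
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3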
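The manifest probes two different paths, and the two usually answer different questions: liveness asks whether the process can respond at all, while readiness asks whether it should receive traffic right now. A framework-free sketch of that split follows. The function names and the `dependencies` mapping are hypothetical, and each function returns a `(body, status_code)` pair that a Flask route could return as is.

```python
from typing import Callable, Dict, List, Tuple


def liveness() -> Tuple[dict, int]:
    """Liveness: succeeds whenever the process is able to answer."""
    return {"status": "alive"}, 200


def readiness(dependencies: Dict[str, Callable[[], bool]]) -> Tuple[dict, int]:
    """Readiness: 200 only when every required dependency check passes."""
    failed: List[str] = []
    for name, is_ok in dependencies.items():
        try:
            if not is_ok():
                failed.append(name)
        except Exception:
            # A dependency check that raises is treated as not ready.
            failed.append(name)
    if failed:
        return {"status": "not_ready", "failed": failed}, 503
    return {"status": "ready"}, 200
```

Keeping dependency checks out of the liveness path is a deliberate choice in this sketch: a failed liveness probe restarts the container, which rarely helps when the real problem is an unreachable database.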
Testing Health Check Endpoints
Automated Testing
import time
import unittest

import requests


class HealthCheckTests(unittest.TestCase):
    def setUp(self):
        self.base_url = "http://localhost:5000"

    def test_health_endpoint_returns_200(self):
        response = requests.get(f"{self.base_url}/health", timeout=10)
        self.assertEqual(response.status_code, 200)

    def test_health_response_structure(self):
        response = requests.get(f"{self.base_url}/health", timeout=10)
        data = response.json()
        required_fields = ['status', 'timestamp', 'checks', 'response_time']
        for field in required_fields:
            self.assertIn(field, data)

    def test_health_check_performance(self):
        start_time = time.time()
        requests.get(f"{self.base_url}/health", timeout=10)
        duration = time.time() - start_time
        self.assertLess(duration, 5.0)  # Should complete within 5 seconds


if __name__ == "__main__":
    unittest.main()
Best Practices and Considerations
Security Considerations
- Implement authentication for detailed health endpoints
- Sanitize error messages to avoid information disclosure
- Use separate endpoints for internal vs external monitoring
- Rate limit health check endpoints to prevent abuse
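The first two points can be combined: callers without a valid token get a redacted report, and authenticated callers get the full one. The sketch below assumes a shared-secret token and a report shaped like the `/health` responses earlier in this guide. `render_health`, `scrub`, and `REDACTED` are illustrative names, not part of any library.

```python
import hmac
from typing import Optional

REDACTED = "check failed (details in server logs)"


def scrub(value):
    """Recursively replace every 'error' value with a generic message."""
    if isinstance(value, dict):
        return {k: (REDACTED if k == "error" else scrub(v)) for k, v in value.items()}
    return value


def is_authorized(presented: Optional[str], expected: str) -> bool:
    """Constant-time token comparison to avoid leaking timing information."""
    if presented is None:
        return False
    return hmac.compare_digest(presented.encode(), expected.encode())


def render_health(report: dict, presented_token: Optional[str], expected_token: str) -> dict:
    """Return the full report to authorized callers, a scrubbed one otherwise."""
    if is_authorized(presented_token, expected_token):
        return report
    return scrub(report)
```

Raw exception text often contains hostnames, usernames, or connection strings, which is why the unauthenticated view keeps the status fields but drops the error strings.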
Performance Optimization
- Cache health check results for frequently accessed endpoints
- Implement timeout controls for all external dependencies
- Use connection pooling for database health checks
- Consider async health checks for improved performance
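Caching, the first item in the list, can be as small as a wrapper that remembers the last result for a few seconds. The following is a sketch under stated assumptions: `CachedCheck` is a hypothetical helper, the 10-second default TTL is arbitrary, and the injectable `clock` parameter exists only to make the behaviour easy to test.

```python
import time
from typing import Callable, Optional


class CachedCheck:
    """Wrap a check function and reuse its result for ttl_seconds."""

    def __init__(self, check: Callable[[], dict], ttl_seconds: float = 10.0,
                 clock: Callable[[], float] = time.monotonic):
        self._check = check
        self._ttl = ttl_seconds
        self._clock = clock
        self._result: Optional[dict] = None
        self._stamp = 0.0

    def __call__(self) -> dict:
        now = self._clock()
        # Re-run the underlying check only when there is no cached result
        # or the cached one is older than the TTL.
        if self._result is None or now - self._stamp >= self._ttl:
            self._result = self._check()
            self._stamp = now
        return self._result
```

With a wrapper like this, a load balancer polling every second triggers at most one real database round trip per TTL window. The trade-off is that a failure can go unreported for up to `ttl_seconds`.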
When implementing health checks for browser-based scraping services, ensure proper handling of timeouts in Puppeteer to prevent health check failures due to slow page loads.
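One way to apply the timeout and async advice together is to give every check its own deadline and run the checks concurrently, so a slow page load produces an "unhealthy" result instead of a hung endpoint. The sketch uses only `asyncio`; `with_timeout`, `run_all`, and the 5-second default are assumptions for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def with_timeout(check: Callable[[], Awaitable[dict]], seconds: float) -> dict:
    """Run one async check, converting a timeout or crash into a result dict."""
    try:
        return await asyncio.wait_for(check(), timeout=seconds)
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": f"timed out after {seconds}s"}
    except Exception as exc:
        return {"status": "unhealthy", "error": str(exc)}


async def run_all(checks: Dict[str, Callable[[], Awaitable[dict]]],
                  seconds: float = 5.0) -> Dict[str, dict]:
    """Run all checks concurrently; total time is bounded by the slowest deadline."""
    names = list(checks)
    results = await asyncio.gather(*(with_timeout(checks[n], seconds) for n in names))
    return dict(zip(names, results))
```

Because the checks run concurrently, the endpoint's worst-case latency is roughly the per-check deadline rather than the sum of all deadlines.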
Command Line Health Check Tools
cURL Commands
# Basic health check
curl -f http://localhost:5000/health
# Health check with pretty-printed JSON output (-s hides the progress meter)
curl -s http://localhost:5000/health | jq '.'
# Health check with timeout
curl --max-time 10 http://localhost:5000/health
Custom CLI Tool
#!/usr/bin/env python3
import argparse
import sys

import requests


def iter_failed(checks, prefix=""):
    """Yield (name, result) for every check that is not healthy."""
    for name, result in checks.items():
        if "status" not in result:
            # Grouped results such as external_apis hold one entry per API
            yield from iter_failed(result, f"{prefix}{name}.")
        elif result["status"] != "healthy":
            yield f"{prefix}{name}", result


def check_service_health(url, timeout=10):
    try:
        response = requests.get(f"{url}/health", timeout=timeout)
        data = response.json()
        print(f"Service Status: {data['status']}")
        print(f"Response Time: {data['response_time']:.3f}s")
        if data['status'] != 'healthy':
            print("Failed Checks:")
            for check_name, check_result in iter_failed(data['checks']):
                detail = check_result.get('error', check_result['status'])
                print(f"  - {check_name}: {detail}")
        return data['status'] == 'healthy'
    except Exception as e:
        print(f"Health check failed: {e}")
        return False


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Check service health')
    parser.add_argument('url', help='Service URL')
    parser.add_argument('--timeout', type=int, default=10, help='Request timeout')
    args = parser.parse_args()
    # Exit code 0 when healthy, 1 otherwise, so the tool works in scripts
    sys.exit(0 if check_service_health(args.url, args.timeout) else 1)
Conclusion
Implementing comprehensive API health checks for scraping services requires careful consideration of all system components, from database connectivity to browser automation health. The examples provided offer a foundation for building robust monitoring systems that can detect issues early and maintain service reliability.
Remember to tailor your health checks to your specific architecture and requirements, implementing appropriate alerting mechanisms and ensuring your monitoring doesn't become a performance bottleneck itself. Regular testing and maintenance of your health check systems are essential for maintaining their effectiveness over time.