How do I implement distributed scraping in Go?
Distributed scraping in Go allows you to scale web scraping operations across multiple processes, machines, or containers to handle large-scale data extraction efficiently. Go's excellent concurrency features, including goroutines and channels, make it an ideal language for building distributed scraping systems.
Understanding Distributed Scraping Architecture
Distributed scraping involves splitting scraping tasks across multiple workers that can run on different machines. The key components include (a minimal interface sketch follows the list):
- Task Queue: Central repository for URLs or scraping tasks
- Workers: Individual processes that consume and execute tasks
- Load Balancer: Distributes tasks evenly across workers
- Result Aggregator: Collects and processes scraped data
- Monitoring System: Tracks progress and handles failures
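Conceptually, these components reduce to a couple of small interfaces. The sketch below is illustrative only; the names TaskQueue and ResultSink are assumptions rather than a specific library, and the rest of this article fills them in with channel-, Redis-, and RabbitMQ-backed implementations:
package scraper

import "context"

// Task is one unit of scraping work (concrete versions appear in the examples below).
type Task struct {
	ID  int
	URL string
}

// Result is the outcome of a processed task.
type Result struct {
	TaskID int
	Data   map[string]interface{}
	Err    error
}

// TaskQueue abstracts the central task store: an in-memory channel,
// a Redis list, or a RabbitMQ queue.
type TaskQueue interface {
	Push(ctx context.Context, t Task) error
	Pop(ctx context.Context) (*Task, error)
}

// ResultSink collects scraped data from all workers.
type ResultSink interface {
	Store(ctx context.Context, r Result) error
}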
Basic Worker Pool Implementation
Start with a simple worker pool that can be distributed across multiple processes:
package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/PuerkitoBio/goquery"
)
type Task struct {
URL string
ID int
Params map[string]interface{}
}
type Result struct {
TaskID int
Data map[string]interface{}
Error error
}
type Worker struct {
ID int
TaskChan chan Task
ResultChan chan Result
Client *http.Client
WaitGroup *sync.WaitGroup
}
func NewWorker(id int, taskChan chan Task, resultChan chan Result, wg *sync.WaitGroup) *Worker {
return &Worker{
ID: id,
TaskChan: taskChan,
ResultChan: resultChan,
Client: &http.Client{
Timeout: 30 * time.Second,
},
WaitGroup: wg,
}
}
func (w *Worker) Start(ctx context.Context) {
	defer w.WaitGroup.Done()
	for {
		select {
		case task, ok := <-w.TaskChan:
			if !ok {
				// Task channel closed: no more work, so exit cleanly.
				// Without this check, a closed channel would yield
				// zero-value tasks forever.
				return
			}
			result := w.ProcessTask(task)
			w.ResultChan <- result
		case <-ctx.Done():
			log.Printf("Worker %d stopping", w.ID)
			return
		}
	}
}
func (w *Worker) ProcessTask(task Task) Result {
	resp, err := w.Client.Get(task.URL)
	if err != nil {
		return Result{TaskID: task.ID, Error: err}
	}
	defer resp.Body.Close()
	// Treat non-200 responses as failures rather than parsing error pages.
	if resp.StatusCode != http.StatusOK {
		return Result{TaskID: task.ID, Error: fmt.Errorf("unexpected status: %s", resp.Status)}
	}
	doc, err := goquery.NewDocumentFromReader(resp.Body)
	if err != nil {
		return Result{TaskID: task.ID, Error: err}
	}
// Extract data based on task parameters
data := make(map[string]interface{})
// Example: Extract title
title := doc.Find("title").Text()
data["title"] = title
// Example: Extract meta description
description, _ := doc.Find("meta[name='description']").Attr("content")
data["description"] = description
return Result{
TaskID: task.ID,
Data: data,
Error: nil,
}
}
func main() {
const numWorkers = 5
const numTasks = 100
taskChan := make(chan Task, numTasks)
resultChan := make(chan Result, numTasks)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
worker := NewWorker(i, taskChan, resultChan, &wg)
go worker.Start(ctx)
}
// Add tasks
go func() {
for i := 0; i < numTasks; i++ {
task := Task{
URL: fmt.Sprintf("https://example.com/page/%d", i),
ID: i,
Params: map[string]interface{}{
"extract": []string{"title", "description"},
},
}
taskChan <- task
}
close(taskChan)
}()
	// Close the result channel once all workers have finished,
	// so the collection loop below can terminate.
	go func() {
		wg.Wait()
		close(resultChan)
	}()
	// Collect results in the main goroutine so the program does not
	// exit before every result has been handled.
	for result := range resultChan {
		if result.Error != nil {
			log.Printf("Task %d failed: %v", result.TaskID, result.Error)
		} else {
			log.Printf("Task %d completed: %+v", result.TaskID, result.Data)
		}
	}
}
Redis-Based Distributed Queue
For true distributed scraping across multiple machines, use Redis as a central task queue:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
type DistributedScraper struct {
RedisClient *redis.Client
WorkerID string
QueueName string
}
func NewDistributedScraper(redisAddr, workerID, queueName string) *DistributedScraper {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
return &DistributedScraper{
RedisClient: rdb,
WorkerID: workerID,
QueueName: queueName,
}
}
func (ds *DistributedScraper) AddTask(ctx context.Context, task Task) error {
taskJSON, err := json.Marshal(task)
if err != nil {
return err
}
return ds.RedisClient.LPush(ctx, ds.QueueName, taskJSON).Err()
}
func (ds *DistributedScraper) GetTask(ctx context.Context) (*Task, error) {
result, err := ds.RedisClient.BRPop(ctx, 5*time.Second, ds.QueueName).Result()
if err != nil {
if err == redis.Nil {
return nil, nil // No tasks available
}
return nil, err
}
var task Task
err = json.Unmarshal([]byte(result[1]), &task)
return &task, err
}
func (ds *DistributedScraper) ProcessTasks(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
task, err := ds.GetTask(ctx)
if err != nil {
log.Printf("Error getting task: %v", err)
continue
}
if task == nil {
// No tasks available, wait a bit
time.Sleep(1 * time.Second)
continue
}
// Process the task
result := ds.scrapeURL(task.URL)
// Store result in Redis
ds.storeResult(ctx, task.ID, result)
}
}
}
func (ds *DistributedScraper) scrapeURL(url string) map[string]interface{} {
// Implement your scraping logic here
// This is similar to the ProcessTask method from the previous example
return map[string]interface{}{
"url": url,
"timestamp": time.Now(),
"data": "scraped content",
}
}
func (ds *DistributedScraper) storeResult(ctx context.Context, taskID int, result map[string]interface{}) {
	resultJSON, err := json.Marshal(result)
	if err != nil {
		log.Printf("Error marshaling result for task %d: %v", taskID, err)
		return
	}
	resultKey := fmt.Sprintf("result:%d", taskID)
	// Keep results for 24 hours so a separate aggregator can collect them.
	if err := ds.RedisClient.Set(ctx, resultKey, resultJSON, 24*time.Hour).Err(); err != nil {
		log.Printf("Error storing result for task %d: %v", taskID, err)
	}
}
Advanced Distributed Architecture with Message Queues
For production-scale distributed scraping, implement a robust architecture using a message queue such as RabbitMQ or Apache Kafka. The example below uses the streadway/amqp client; note that this package is no longer maintained, and its drop-in successor is github.com/rabbitmq/amqp091-go:
package main
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"github.com/streadway/amqp"
)
type RabbitMQScraper struct {
Connection *amqp.Connection
Channel *amqp.Channel
QueueName string
WorkerID string
}
func NewRabbitMQScraper(amqpURL, queueName, workerID string) (*RabbitMQScraper, error) {
conn, err := amqp.Dial(amqpURL)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// Declare queue
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
return &RabbitMQScraper{
Connection: conn,
Channel: ch,
QueueName: queueName,
WorkerID: workerID,
}, nil
}
func (rms *RabbitMQScraper) PublishTask(task Task) error {
body, err := json.Marshal(task)
if err != nil {
return err
}
return rms.Channel.Publish(
"", // exchange
rms.QueueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // Make message persistent
},
)
}
func (rms *RabbitMQScraper) StartConsumer(ctx context.Context, numWorkers int) error {
// Set QoS to control the number of unacknowledged messages
err := rms.Channel.Qos(
numWorkers, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
return err
}
msgs, err := rms.Channel.Consume(
rms.QueueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
rms.worker(ctx, workerID, msgs)
}(i)
}
wg.Wait()
return nil
}
func (rms *RabbitMQScraper) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
for {
select {
case <-ctx.Done():
log.Printf("Worker %d stopping", workerID)
return
case msg, ok := <-msgs:
if !ok {
return
}
var task Task
err := json.Unmarshal(msg.Body, &task)
if err != nil {
log.Printf("Worker %d: Error unmarshaling task: %v", workerID, err)
msg.Nack(false, false) // Don't requeue invalid messages
continue
}
// Process the task
log.Printf("Worker %d processing task %d: %s", workerID, task.ID, task.URL)
result := rms.processTask(task)
if result.Error != nil {
log.Printf("Worker %d: Task %d failed: %v", workerID, task.ID, result.Error)
// Decide whether to requeue based on error type
msg.Nack(false, true) // Requeue for retry
} else {
log.Printf("Worker %d: Task %d completed successfully", workerID, task.ID)
msg.Ack(false)
}
}
}
}
func (rms *RabbitMQScraper) processTask(task Task) Result {
// Implement your scraping logic here
// Add rate limiting, retry logic, etc.
// Simulate processing time
time.Sleep(time.Duration(100+task.ID%1000) * time.Millisecond)
return Result{
TaskID: task.ID,
Data: map[string]interface{}{
"url": task.URL,
"worker_id": rms.WorkerID,
"processed_at": time.Now(),
},
Error: nil,
}
}
func (rms *RabbitMQScraper) Close() {
rms.Channel.Close()
rms.Connection.Close()
}
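A sketch of how a single binary might both publish and consume with this type; the broker URL uses RabbitMQ's default guest credentials, and the worker count and timeout are arbitrary choices:
func main() {
	scraper, err := NewRabbitMQScraper("amqp://guest:guest@localhost:5672/", "scraping-tasks", "worker-1")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer scraper.Close()

	// Enqueue some work.
	for i := 0; i < 100; i++ {
		task := Task{ID: i, URL: fmt.Sprintf("https://example.com/page/%d", i)}
		if err := scraper.PublishTask(task); err != nil {
			log.Printf("Failed to publish task %d: %v", i, err)
		}
	}

	// Consume with four concurrent workers until the timeout expires.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	if err := scraper.StartConsumer(ctx, 4); err != nil {
		log.Printf("Consumer stopped: %v", err)
	}
}
In production you would normally split the producer and consumers into separate binaries so they can be deployed and scaled independently.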
Kubernetes-Based Distributed Scraping
Deploy your distributed scraper on Kubernetes for automatic scaling and fault tolerance:
# scraper-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: web-scraper
spec:
replicas: 10
selector:
matchLabels:
app: web-scraper
template:
metadata:
labels:
app: web-scraper
spec:
containers:
- name: scraper
image: your-registry/web-scraper:latest
env:
- name: REDIS_URL
value: "redis://redis-service:6379"
- name: WORKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: QUEUE_NAME
value: "scraping-tasks"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
Best Practices for Distributed Scraping
1. Rate Limiting and Respect
Implement distributed rate limiting to avoid overwhelming target servers:
type DistributedRateLimiter struct {
RedisClient *redis.Client
Key string
Limit int
Window time.Duration
}
// Allow implements a fixed-window counter shared across all workers via Redis.
// It returns true while the request count in the current window is at or
// below the limit.
func (drl *DistributedRateLimiter) Allow(ctx context.Context) bool {
	now := time.Now()
	windowStart := now.Truncate(drl.Window)
	windowKey := fmt.Sprintf("%s:%d", drl.Key, windowStart.Unix())
	pipe := drl.RedisClient.Pipeline()
	// Increment the counter for the current window and make sure it expires.
	pipe.Incr(ctx, windowKey)
	pipe.Expire(ctx, windowKey, drl.Window)
	results, err := pipe.Exec(ctx)
	if err != nil {
		// Fail closed: if Redis is unreachable, don't send the request.
		return false
	}
	// results[0] is the reply to the INCR command.
	count := results[0].(*redis.IntCmd).Val()
	return count <= int64(drl.Limit)
}
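A sketch of how a worker might consult the limiter before each request; the per-domain key and backoff interval are assumptions:
limiter := &DistributedRateLimiter{
	RedisClient: rdb,
	Key:         "ratelimit:example.com", // one counter per target domain
	Limit:       10,                      // at most 10 requests...
	Window:      time.Second,             // ...per second, across all workers
}
for !limiter.Allow(ctx) {
	time.Sleep(100 * time.Millisecond) // back off until the next window
}
resp, err := client.Get(task.URL)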
2. Fault Tolerance and Recovery
Implement circuit breakers and retry mechanisms for handling failures; a retry sketch follows below. For pages that require JavaScript execution, workers can drive a headless browser (for example via the chromedp package), at the cost of far higher per-task CPU and memory usage.
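A minimal retry helper with exponential backoff; the attempt count, base delay, and the choice to retry only on 5xx responses are assumptions to adapt to your targets:
func fetchWithRetry(ctx context.Context, client *http.Client, url string, maxAttempts int) (*http.Response, error) {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			// Exponential backoff: 1s, 2s, 4s, ...
			delay := time.Duration(1<<uint(attempt-1)) * time.Second
			select {
			case <-time.After(delay):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		resp, err := client.Get(url)
		if err != nil {
			lastErr = err
			continue
		}
		// Retry on server errors; treat everything else as final.
		if resp.StatusCode >= 500 {
			resp.Body.Close()
			lastErr = fmt.Errorf("server error: %s", resp.Status)
			continue
		}
		return resp, nil
	}
	return nil, fmt.Errorf("all %d attempts failed, last error: %w", maxAttempts, lastErr)
}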
3. Monitoring and Observability
Add comprehensive monitoring to track scraper performance:
type ScraperMetrics struct {
TasksProcessed int64
TasksFailed int64
AverageLatency time.Duration
LastHeartbeat time.Time
}
// ReportMetrics assumes two additional int64 counter fields (tasksProcessed,
// tasksFailed) on RabbitMQScraper that workers update via sync/atomic, and a
// sendMetrics method that forwards snapshots to your monitoring backend.
func (rms *RabbitMQScraper) ReportMetrics(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			metrics := ScraperMetrics{
				TasksProcessed: atomic.LoadInt64(&rms.tasksProcessed),
				TasksFailed:    atomic.LoadInt64(&rms.tasksFailed),
				LastHeartbeat:  time.Now(),
			}
			// Send metrics to the monitoring system (Prometheus, StatsD, etc.).
			rms.sendMetrics(metrics)
		}
	}
}
Running the Distributed Scraper
Console Commands
Start multiple worker instances:
# Start Redis server
redis-server
# Start multiple scraper workers
go run main.go -worker-id=worker-1 -queue=scraping-tasks &
go run main.go -worker-id=worker-2 -queue=scraping-tasks &
go run main.go -worker-id=worker-3 -queue=scraping-tasks &
# Add tasks to the queue
go run task-producer.go -urls-file=urls.txt -queue=scraping-tasks
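The task-producer.go referenced above is not shown elsewhere; here is a minimal sketch, assuming one URL per line in urls.txt and reusing the DistributedScraper and Task types from the Redis example:
package main

import (
	"bufio"
	"context"
	"flag"
	"log"
	"os"
)

func main() {
	urlsFile := flag.String("urls-file", "urls.txt", "file with one URL per line")
	queue := flag.String("queue", "scraping-tasks", "Redis list used as the task queue")
	flag.Parse()

	f, err := os.Open(*urlsFile)
	if err != nil {
		log.Fatalf("Failed to open %s: %v", *urlsFile, err)
	}
	defer f.Close()

	ctx := context.Background()
	producer := NewDistributedScraper("localhost:6379", "producer", *queue)

	scanner := bufio.NewScanner(f)
	id := 0
	for scanner.Scan() {
		url := scanner.Text()
		if url == "" {
			continue // skip blank lines
		}
		task := Task{ID: id, URL: url}
		if err := producer.AddTask(ctx, task); err != nil {
			log.Printf("Failed to enqueue %s: %v", url, err)
		}
		id++
	}
	if err := scanner.Err(); err != nil {
		log.Fatalf("Error reading %s: %v", *urlsFile, err)
	}
	log.Printf("Enqueued %d tasks", id)
}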
Docker Compose Setup
version: '3.8'
services:
redis:
image: redis:alpine
ports:
- "6379:6379"
scraper:
build: .
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
- QUEUE_NAME=scraping-tasks
deploy:
replicas: 5
Performance Optimization
Go's http.Transport pools connections by default, so for maximum throughput share a single tuned client across all workers instead of creating one per request, and raise its idle-connection limits to match your concurrency. When target sites require authentication flows, make sure session state such as cookies and tokens is shared or deliberately partitioned across your distributed workers.
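A sketch of a tuned shared client; the specific numbers are starting points, not benchmarked values:
var sharedClient = &http.Client{
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		MaxIdleConns:        200,              // total idle connections kept across all hosts
		MaxIdleConnsPerHost: 20,               // idle connections kept per target host
		MaxConnsPerHost:     50,               // hard cap on concurrent connections per host
		IdleConnTimeout:     90 * time.Second, // recycle idle connections after this long
	},
}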
Conclusion
Implementing distributed scraping in Go provides excellent scalability and performance for large-scale web scraping operations. By leveraging Go's concurrency features, message queues, and container orchestration platforms, you can build robust systems that can handle millions of pages efficiently while respecting rate limits and maintaining fault tolerance.
The key to successful distributed scraping is proper architecture design, comprehensive monitoring, and implementing best practices for rate limiting and error handling. Start with a simple worker pool implementation and gradually scale to more sophisticated distributed architectures as your requirements grow.