
How do I implement distributed scraping in Go?

Distributed scraping in Go allows you to scale web scraping operations across multiple processes, machines, or containers to handle large-scale data extraction efficiently. Go's excellent concurrency features, including goroutines and channels, make it an ideal language for building distributed scraping systems.

Understanding Distributed Scraping Architecture

Distributed scraping involves splitting scraping tasks across multiple workers that can run on different machines. The key components include:

  • Task Queue: Central repository for URLs or scraping tasks
  • Workers: Individual processes that consume and execute tasks
  • Load Balancer: Distributes tasks evenly across workers
  • Result Aggregator: Collects and processes scraped data
  • Monitoring System: Tracks progress and handles failures
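
It can help to think of these pieces as plain Go interfaces before committing to an implementation. The sketch below is purely illustrative (the interface names are not from any library, and Task and Result are the types defined in the next section); the rest of this guide fills them in with channels, Redis, and RabbitMQ.

// Illustrative component boundaries; not part of any library.

// TaskQueue is the central repository workers pull tasks from
// (an in-memory channel, a Redis list, a RabbitMQ queue, ...).
type TaskQueue interface {
    Push(ctx context.Context, t Task) error
    Pop(ctx context.Context) (*Task, error)
}

// ResultAggregator collects the data produced by the workers.
type ResultAggregator interface {
    Store(ctx context.Context, r Result) error
}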

Basic Worker Pool Implementation

Start with a simple worker pool that can be distributed across multiple processes:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"

    "github.com/PuerkitoBio/goquery"
)

type Task struct {
    URL    string
    ID     int
    Params map[string]interface{}
}

type Result struct {
    TaskID int
    Data   map[string]interface{}
    Error  error
}

type Worker struct {
    ID         int
    TaskChan   chan Task
    ResultChan chan Result
    Client     *http.Client
    WaitGroup  *sync.WaitGroup
}

func NewWorker(id int, taskChan chan Task, resultChan chan Result, wg *sync.WaitGroup) *Worker {
    return &Worker{
        ID:         id,
        TaskChan:   taskChan,
        ResultChan: resultChan,
        Client: &http.Client{
            Timeout: 30 * time.Second,
        },
        WaitGroup: wg,
    }
}

func (w *Worker) Start(ctx context.Context) {
    defer w.WaitGroup.Done()

    for {
        select {
        case task, ok := <-w.TaskChan:
            if !ok {
                // Task channel closed and drained: no more work to do.
                log.Printf("Worker %d finished", w.ID)
                return
            }
            result := w.ProcessTask(task)
            w.ResultChan <- result
        case <-ctx.Done():
            log.Printf("Worker %d stopping", w.ID)
            return
        }
    }
}

func (w *Worker) ProcessTask(task Task) Result {
    resp, err := w.Client.Get(task.URL)
    if err != nil {
        return Result{TaskID: task.ID, Error: err}
    }
    defer resp.Body.Close()

    doc, err := goquery.NewDocumentFromReader(resp.Body)
    if err != nil {
        return Result{TaskID: task.ID, Error: err}
    }

    // Extract data based on task parameters
    data := make(map[string]interface{})

    // Example: Extract title
    title := doc.Find("title").Text()
    data["title"] = title

    // Example: Extract meta description
    description, _ := doc.Find("meta[name='description']").Attr("content")
    data["description"] = description

    return Result{
        TaskID: task.ID,
        Data:   data,
        Error:  nil,
    }
}

func main() {
    const numWorkers = 5
    const numTasks = 100

    taskChan := make(chan Task, numTasks)
    resultChan := make(chan Result, numTasks)

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        worker := NewWorker(i, taskChan, resultChan, &wg)
        go worker.Start(ctx)
    }

    // Add tasks
    go func() {
        for i := 0; i < numTasks; i++ {
            task := Task{
                URL: fmt.Sprintf("https://example.com/page/%d", i),
                ID:  i,
                Params: map[string]interface{}{
                    "extract": []string{"title", "description"},
                },
            }
            taskChan <- task
        }
        close(taskChan)
    }()

    // Close the result channel once every worker has exited
    go func() {
        wg.Wait()
        close(resultChan)
    }()

    // Collect results in the main goroutine so main does not exit early
    for result := range resultChan {
        if result.Error != nil {
            log.Printf("Task %d failed: %v", result.TaskID, result.Error)
        } else {
            log.Printf("Task %d completed: %+v", result.TaskID, result.Data)
        }
    }
}

Redis-Based Distributed Queue

For true distributed scraping across multiple machines, use Redis as a central task queue:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

// DistributedScraper pushes and pops Task values (the same Task type used in
// the worker-pool example) through a shared Redis list.
type DistributedScraper struct {
    RedisClient *redis.Client
    WorkerID    string
    QueueName   string
}

func NewDistributedScraper(redisAddr, workerID, queueName string) *DistributedScraper {
    rdb := redis.NewClient(&redis.Options{
        Addr: redisAddr,
    })

    return &DistributedScraper{
        RedisClient: rdb,
        WorkerID:    workerID,
        QueueName:   queueName,
    }
}

func (ds *DistributedScraper) AddTask(ctx context.Context, task Task) error {
    taskJSON, err := json.Marshal(task)
    if err != nil {
        return err
    }

    return ds.RedisClient.LPush(ctx, ds.QueueName, taskJSON).Err()
}

func (ds *DistributedScraper) GetTask(ctx context.Context) (*Task, error) {
    result, err := ds.RedisClient.BRPop(ctx, 5*time.Second, ds.QueueName).Result()
    if err != nil {
        if err == redis.Nil {
            return nil, nil // No tasks available
        }
        return nil, err
    }

    var task Task
    err = json.Unmarshal([]byte(result[1]), &task)
    return &task, err
}

func (ds *DistributedScraper) ProcessTasks(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            task, err := ds.GetTask(ctx)
            if err != nil {
                log.Printf("Error getting task: %v", err)
                time.Sleep(1 * time.Second) // avoid a hot loop if Redis is unreachable
                continue
            }

            if task == nil {
                // No tasks available, wait a bit
                time.Sleep(1 * time.Second)
                continue
            }

            // Process the task
            result := ds.scrapeURL(task.URL)

            // Store result in Redis
            ds.storeResult(ctx, task.ID, result)
        }
    }
}

func (ds *DistributedScraper) scrapeURL(url string) map[string]interface{} {
    // Implement your scraping logic here
    // This is similar to the ProcessTask method from the previous example
    return map[string]interface{}{
        "url":       url,
        "timestamp": time.Now(),
        "data":      "scraped content",
    }
}

func (ds *DistributedScraper) storeResult(ctx context.Context, taskID int, result map[string]interface{}) {
    resultJSON, err := json.Marshal(result)
    if err != nil {
        log.Printf("Error marshaling result for task %d: %v", taskID, err)
        return
    }
    resultKey := fmt.Sprintf("result:%d", taskID)
    // Keep results for 24 hours; adjust the TTL to your retention needs.
    if err := ds.RedisClient.Set(ctx, resultKey, resultJSON, 24*time.Hour).Err(); err != nil {
        log.Printf("Error storing result for task %d: %v", taskID, err)
    }
}
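
A worker binary might wire these pieces together roughly as follows. This is a minimal sketch: the Redis address, worker ID, and queue name are placeholders, and it assumes context, log, os, os/signal, and syscall are imported.

func main() {
    scraper := NewDistributedScraper("localhost:6379", "worker-1", "scraping-tasks")

    // Shut down cleanly on Ctrl+C or SIGTERM; the current task finishes first.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    scraper.ProcessTasks(ctx)
    log.Println("worker shut down")
}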

Advanced Distributed Architecture with Message Queues

For production-scale distributed scraping, implement a more robust architecture on top of a message queue such as RabbitMQ or Apache Kafka. The example below uses the github.com/streadway/amqp client; note that this package is in maintenance mode, and the API-compatible github.com/rabbitmq/amqp091-go fork is the recommended replacement:

package main

import (
    "context"
    "encoding/json"
    "log"
    "sync"
    "time"

    "github.com/streadway/amqp"
)

type RabbitMQScraper struct {
    Connection *amqp.Connection
    Channel    *amqp.Channel
    QueueName  string
    WorkerID   string
}

func NewRabbitMQScraper(amqpURL, queueName, workerID string) (*RabbitMQScraper, error) {
    conn, err := amqp.Dial(amqpURL)
    if err != nil {
        return nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, err
    }

    // Declare queue
    _, err = ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        ch.Close()
        conn.Close()
        return nil, err
    }

    return &RabbitMQScraper{
        Connection: conn,
        Channel:    ch,
        QueueName:  queueName,
        WorkerID:   workerID,
    }, nil
}

func (rms *RabbitMQScraper) PublishTask(task Task) error {
    body, err := json.Marshal(task)
    if err != nil {
        return err
    }

    return rms.Channel.Publish(
        "",             // exchange
        rms.QueueName,  // routing key
        false,          // mandatory
        false,          // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent, // Make message persistent
        },
    )
}

func (rms *RabbitMQScraper) StartConsumer(ctx context.Context, numWorkers int) error {
    // Set QoS to control the number of unacknowledged messages
    err := rms.Channel.Qos(
        numWorkers, // prefetch count
        0,          // prefetch size
        false,      // global
    )
    if err != nil {
        return err
    }

    msgs, err := rms.Channel.Consume(
        rms.QueueName, // queue
        "",            // consumer
        false,         // auto-ack
        false,         // exclusive
        false,         // no-local
        false,         // no-wait
        nil,           // args
    )
    if err != nil {
        return err
    }

    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            rms.worker(ctx, workerID, msgs)
        }(i)
    }

    wg.Wait()
    return nil
}

func (rms *RabbitMQScraper) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
    for {
        select {
        case <-ctx.Done():
            log.Printf("Worker %d stopping", workerID)
            return
        case msg, ok := <-msgs:
            if !ok {
                return
            }

            var task Task
            err := json.Unmarshal(msg.Body, &task)
            if err != nil {
                log.Printf("Worker %d: Error unmarshaling task: %v", workerID, err)
                msg.Nack(false, false) // Don't requeue invalid messages
                continue
            }

            // Process the task
            log.Printf("Worker %d processing task %d: %s", workerID, task.ID, task.URL)

            result := rms.processTask(task)
            if result.Error != nil {
                log.Printf("Worker %d: Task %d failed: %v", workerID, task.ID, result.Error)
                // Decide whether to requeue based on error type
                msg.Nack(false, true) // Requeue for retry
            } else {
                log.Printf("Worker %d: Task %d completed successfully", workerID, task.ID)
                msg.Ack(false)
            }
        }
    }
}

func (rms *RabbitMQScraper) processTask(task Task) Result {
    // Implement your scraping logic here
    // Add rate limiting, retry logic, etc.

    // Simulate processing time
    time.Sleep(time.Duration(100+task.ID%1000) * time.Millisecond)

    return Result{
        TaskID: task.ID,
        Data: map[string]interface{}{
            "url":        task.URL,
            "worker_id":  rms.WorkerID,
            "processed_at": time.Now(),
        },
        Error: nil,
    }
}

func (rms *RabbitMQScraper) Close() {
    rms.Channel.Close()
    rms.Connection.Close()
}
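
Wiring the publisher and consumer together might look roughly like this; the AMQP URL, queue name, and worker count are placeholder values, and the snippet reuses context and log from the imports above.

func main() {
    scraper, err := NewRabbitMQScraper("amqp://guest:guest@localhost:5672/", "scraping-tasks", "worker-1")
    if err != nil {
        log.Fatalf("failed to connect to RabbitMQ: %v", err)
    }
    defer scraper.Close()

    // Publish a sample task; in practice a separate producer process does this.
    if err := scraper.PublishTask(Task{ID: 1, URL: "https://example.com"}); err != nil {
        log.Fatalf("failed to publish task: %v", err)
    }

    // Consume with 4 concurrent workers until the context is cancelled.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    if err := scraper.StartConsumer(ctx, 4); err != nil {
        log.Fatalf("consumer error: %v", err)
    }
}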

Kubernetes-Based Distributed Scraping

Deploy your distributed scraper on Kubernetes for automatic scaling and fault tolerance:

# scraper-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: web-scraper
spec:
  replicas: 10
  selector:
    matchLabels:
      app: web-scraper
  template:
    metadata:
      labels:
        app: web-scraper
    spec:
      containers:
      - name: scraper
        image: your-registry/web-scraper:latest
        env:
        - name: REDIS_URL
          value: "redis://redis-service:6379"
        - name: WORKER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: QUEUE_NAME
          value: "scraping-tasks"
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"

Best Practices for Distributed Scraping

1. Rate Limiting and Respect

Implement distributed rate limiting to avoid overwhelming target servers:

// DistributedRateLimiter implements a simple fixed-window counter shared by
// every worker through Redis, so the limit applies across the whole cluster.
type DistributedRateLimiter struct {
    RedisClient *redis.Client
    Key         string        // e.g. one key per target domain
    Limit       int           // maximum requests allowed per window
    Window      time.Duration // window length, e.g. time.Second
}

func (drl *DistributedRateLimiter) Allow(ctx context.Context) bool {
    now := time.Now()
    windowStart := now.Truncate(drl.Window)

    pipe := drl.RedisClient.Pipeline()

    // Increment counter for current window
    pipe.Incr(ctx, fmt.Sprintf("%s:%d", drl.Key, windowStart.Unix()))
    pipe.Expire(ctx, fmt.Sprintf("%s:%d", drl.Key, windowStart.Unix()), drl.Window)

    results, err := pipe.Exec(ctx)
    if err != nil {
        return false
    }

    count := results[0].(*redis.IntCmd).Val()
    return count <= int64(drl.Limit)
}
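
In a worker, the limiter gates each outgoing request. The per-domain key, limit, and back-off interval below are illustrative, and rdb, ctx, httpClient, and task are assumed to be in scope:

limiter := &DistributedRateLimiter{
    RedisClient: rdb,                     // shared Redis client
    Key:         "ratelimit:example.com", // one key per target domain
    Limit:       10,                      // at most 10 requests...
    Window:      time.Second,             // ...per second, across all workers
}

// Block until the shared counter has room, then issue the request.
for !limiter.Allow(ctx) {
    time.Sleep(100 * time.Millisecond)
}
resp, err := httpClient.Get(task.URL)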

2. Fault Tolerance and Recovery

Implement retries and circuit breakers so that a flaky target site or a crashed worker does not stall the whole pipeline. For pages that require JavaScript execution, route those tasks to workers backed by a headless browser or a rendering API rather than the plain HTTP client used above.
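
A simple retry wrapper with exponential backoff is a reasonable starting point before adding a full circuit breaker; the attempt count and base delay are arbitrary defaults, and the helper name is just for illustration:

// retryWithBackoff retries fn up to maxAttempts times, doubling the delay after
// each failure. Production code would also distinguish retryable errors
// (timeouts, HTTP 429/5xx) from permanent ones before retrying.
func retryWithBackoff(ctx context.Context, maxAttempts int, baseDelay time.Duration, fn func() error) error {
    var err error
    delay := baseDelay
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = fn(); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            delay *= 2
        }
    }
    return fmt.Errorf("all %d attempts failed, last error: %w", maxAttempts, err)
}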

3. Monitoring and Observability

Add comprehensive monitoring to track scraper performance:

// ScraperMetrics is a periodic snapshot of a single worker's activity.
type ScraperMetrics struct {
    TasksProcessed int64
    TasksFailed    int64
    AverageLatency time.Duration
    LastHeartbeat  time.Time
}

// ReportMetrics assumes the RabbitMQScraper also carries tasksProcessed and
// tasksFailed int64 counter fields (updated with sync/atomic in the worker)
// and a sendMetrics method that forwards snapshots to your monitoring backend
// (Prometheus, StatsD, etc.).
func (rms *RabbitMQScraper) ReportMetrics(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            metrics := ScraperMetrics{
                TasksProcessed: atomic.LoadInt64(&rms.tasksProcessed),
                TasksFailed:    atomic.LoadInt64(&rms.tasksFailed),
                LastHeartbeat:  time.Now(),
            }

            // Send metrics to monitoring system
            rms.sendMetrics(metrics)
        }
    }
}
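
The counters themselves would be bumped from the consumer loop, for example (assuming the tasksProcessed and tasksFailed fields mentioned above and a sync/atomic import):

// Inside the worker method, after processing each message:
if result.Error != nil {
    atomic.AddInt64(&rms.tasksFailed, 1)
} else {
    atomic.AddInt64(&rms.tasksProcessed, 1)
}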

Running the Distributed Scraper

Console Commands

Start multiple worker instances:

# Start Redis server
redis-server

# Start multiple scraper workers
go run main.go -worker-id=worker-1 -queue=scraping-tasks &
go run main.go -worker-id=worker-2 -queue=scraping-tasks &
go run main.go -worker-id=worker-3 -queue=scraping-tasks &

# Add tasks to the queue
go run task-producer.go -urls-file=urls.txt -queue=scraping-tasks
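
The earlier examples do not actually parse these flags; a possible shape for the task-producer.go referenced above, assuming the Task and DistributedScraper types from the Redis example plus bufio, context, flag, log, and os imports:

func main() {
    urlsFile := flag.String("urls-file", "urls.txt", "file with one URL per line")
    queue := flag.String("queue", "scraping-tasks", "Redis list used as the task queue")
    flag.Parse()

    scraper := NewDistributedScraper("localhost:6379", "producer", *queue)
    ctx := context.Background()

    f, err := os.Open(*urlsFile)
    if err != nil {
        log.Fatalf("cannot open %s: %v", *urlsFile, err)
    }
    defer f.Close()

    // Enqueue one task per line.
    id := 0
    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        url := scanner.Text()
        if err := scraper.AddTask(ctx, Task{ID: id, URL: url}); err != nil {
            log.Printf("failed to enqueue %s: %v", url, err)
        }
        id++
    }
}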

Docker Compose Setup

version: '3.8'
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  scraper:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
      - QUEUE_NAME=scraping-tasks
    deploy:
      replicas: 5

Performance Optimization

For maximum throughput, reuse a single http.Client per worker and tune its Transport: connection-pool sizes, idle timeouts, and keep-alives matter far more than raw goroutine counts once you are network-bound. For sites that require logins or other multi-step navigation, you will also need a way to share session state (cookies, tokens) across your distributed workers.
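
A sketch of a tuned client, with pool sizes chosen arbitrarily for illustration:

// Shared by all goroutines in a worker process; *http.Client is safe for
// concurrent use.
client := &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        MaxIdleConns:        100, // total idle connections kept in the pool
        MaxIdleConnsPerHost: 20,  // idle connections kept per target host
        MaxConnsPerHost:     50,  // hard cap on concurrent connections per host
        IdleConnTimeout:     90 * time.Second,
    },
}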

Conclusion

Implementing distributed scraping in Go provides excellent scalability and performance for large-scale web scraping operations. By leveraging Go's concurrency features, message queues, and container orchestration platforms, you can build robust systems that can handle millions of pages efficiently while respecting rate limits and maintaining fault tolerance.

The key to successful distributed scraping is proper architecture design, comprehensive monitoring, and implementing best practices for rate limiting and error handling. Start with a simple worker pool implementation and gradually scale to more sophisticated distributed architectures as your requirements grow.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
