How do I implement distributed scraping using multiple Colly instances?

Distributed scraping with multiple Colly instances lets you scale a crawl across multiple processes, machines, or containers. For large projects, this raises throughput by fetching pages in parallel and spreads the load across workers.

Core Concepts of Distributed Scraping

Distributed scraping involves breaking down a large scraping task into smaller, manageable units that can be processed independently by multiple Colly instances. The key components include:

  • Job Distribution: Mechanism to distribute URLs or tasks across multiple workers
  • Coordination: Ensuring no duplicate work and proper task completion tracking
  • Data Collection: Aggregating results from multiple sources
  • Error Handling: Managing failures and retries across the distributed system
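
As one illustration of job distribution, the sketch below assigns each URL to a worker by hashing its hostname, so every URL for a given domain lands on the same worker. The function name `workerFor` and the worker count of 4 are assumptions made for this example; they are not part of Colly.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"net/url"
	"strings"
)

// workerFor maps a URL to a worker index in [0, workers) by hashing the
// lowercased hostname. URLs that share a host always map to the same worker.
// (Hypothetical helper for illustration.)
func workerFor(rawURL string, workers int) (int, error) {
	if workers <= 0 {
		return 0, fmt.Errorf("workers must be positive, got %d", workers)
	}
	u, err := url.Parse(rawURL)
	if err != nil {
		return 0, err
	}
	host := strings.ToLower(u.Hostname())
	if host == "" {
		return 0, fmt.Errorf("no host in %q", rawURL)
	}
	h := fnv.New32a()
	h.Write([]byte(host))
	return int(h.Sum32() % uint32(workers)), nil
}

func main() {
	for _, u := range []string{
		"https://example.com/a",
		"https://example.com/b",
		"https://httpbin.org/get",
	} {
		idx, err := workerFor(u, 4)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		fmt.Printf("%s -> worker %d\n", u, idx)
	}
}
```

Keeping a domain on one worker means that worker's own rate limit governs the domain, at the cost of uneven load when a few domains dominate the crawl.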

Basic Distributed Architecture

1. Queue-Based Distribution

A common approach uses a queue to distribute work. On a single machine, a buffered Go channel can serve as that queue:

package main

import (
    "encoding/json"
    "log"
    "sync"
    "time"

    "github.com/gocolly/colly/v2"
    "github.com/gocolly/colly/v2/debug"
)

type ScrapeJob struct {
    URL      string            `json:"url"`
    Metadata map[string]string `json:"metadata"`
    Priority int               `json:"priority"`
}

type DistributedScraper struct {
    workers    int
    collector  *colly.Collector
    jobQueue   chan ScrapeJob
    resultsCh  chan interface{}
    wg         sync.WaitGroup
}

func NewDistributedScraper(workers int) *DistributedScraper {
    c := colly.NewCollector(
        colly.Debugger(&debug.LogDebugger{}),
        colly.Async(true),
    )

    // Configure rate limiting
    c.Limit(&colly.LimitRule{
        DomainGlob:  "*",
        Parallelism: 2,
        Delay:       1 * time.Second,
    })

    return &DistributedScraper{
        workers:   workers,
        collector: c,
        jobQueue:  make(chan ScrapeJob, workers*10),
        resultsCh: make(chan interface{}, workers*10),
    }
}

func (ds *DistributedScraper) Start() {
    for i := 0; i < ds.workers; i++ {
        ds.wg.Add(1)
        go ds.worker(i)
    }
}

func (ds *DistributedScraper) worker(id int) {
    defer ds.wg.Done()

    c := ds.collector.Clone()

    c.OnHTML("a[href]", func(e *colly.HTMLElement) {
        link := e.Attr("href")
        e.Request.Visit(link)
    })

    c.OnResponse(func(r *colly.Response) {
        result := map[string]interface{}{
            "worker_id": id,
            "url":       r.Request.URL.String(),
            "status":    r.StatusCode,
            "body_size": len(r.Body),
            "timestamp": time.Now(),
        }

        select {
        case ds.resultsCh <- result:
        default:
            log.Printf("Worker %d: results channel full", id)
        }
    })

    c.OnError(func(r *colly.Response, err error) {
        log.Printf("Worker %d error: %v", id, err)
    })

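    for job := range ds.jobQueue {
        log.Printf("Worker %d processing: %s", id, job.URL)
        // Visit returns an error for requests that cannot be started.
        if err := c.Visit(job.URL); err != nil {
            log.Printf("Worker %d: visit %s: %v", id, job.URL, err)
        }
        c.Wait()
    }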
}

func (ds *DistributedScraper) AddJob(job ScrapeJob) {
    select {
    case ds.jobQueue <- job:
    default:
        log.Println("Job queue full, dropping job:", job.URL)
    }
}

func (ds *DistributedScraper) Stop() {
    close(ds.jobQueue)
    ds.wg.Wait()
    close(ds.resultsCh)
}

func main() {
    scraper := NewDistributedScraper(5)
    scraper.Start()

    // Add jobs
    urls := []string{
        "https://example.com",
        "https://httpbin.org",
        "https://jsonplaceholder.typicode.com",
    }

    for _, url := range urls {
        scraper.AddJob(ScrapeJob{
            URL:      url,
            Priority: 1,
            Metadata: map[string]string{"source": "main"},
        })
    }

    // Collect results
    go func() {
        for result := range scraper.resultsCh {
            data, _ := json.MarshalIndent(result, "", "  ")
            log.Printf("Result: %s", data)
        }
    }()

    time.Sleep(30 * time.Second)
    scraper.Stop()
}

2. Redis-Based Coordination

For true distributed systems across multiple machines, use Redis for coordination:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
    "github.com/gocolly/colly/v2"
)

type RedisDistributedScraper struct {
    collector  *colly.Collector
    redisClient *redis.Client
    workerID    string
    queueKey    string
    resultsKey  string
}

func NewRedisDistributedScraper(redisAddr, workerID string) *RedisDistributedScraper {
    rdb := redis.NewClient(&redis.Options{
        Addr: redisAddr,
    })

    c := colly.NewCollector()
    c.Async = true

    c.Limit(&colly.LimitRule{
        DomainGlob:  "*",
        Parallelism: 3,
        Delay:       2 * time.Second,
    })

    return &RedisDistributedScraper{
        collector:   c,
        redisClient: rdb,
        workerID:    workerID,
        queueKey:    "scraping:jobs",
        resultsKey:  "scraping:results",
    }
}

func (rds *RedisDistributedScraper) SetupHandlers() {
    rds.collector.OnHTML("title", func(e *colly.HTMLElement) {
        result := map[string]interface{}{
            "worker_id": rds.workerID,
            "url":       e.Request.URL.String(),
            "title":     e.Text,
            "timestamp": time.Now().Unix(),
        }

        rds.saveResult(result)
    })

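    rds.collector.OnHTML("a[href]", func(e *colly.HTMLElement) {
        link := e.Request.AbsoluteURL(e.Attr("href"))
        if link != "" {
            // Note: links are queued without deduplication, so the same
            // URL can be enqueued many times by different pages.
            rds.addJob(link)
        }
    })

    rds.collector.OnError(func(r *colly.Response, err error) {
        log.Printf("Worker %s error on %s: %v", rds.workerID, r.Request.URL, err)

        // Re-queue the failed job for retry. addJob resets "retries" to 0,
        // so this retries without limit; a bounded variant is shown under
        // "Error Handling and Retry Logic".
        rds.addJob(r.Request.URL.String())
    })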
}

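func (rds *RedisDistributedScraper) addJob(url string) {
    ctx := context.Background()
    job := map[string]interface{}{
        "url":        url,
        "created_at": time.Now().Unix(),
        "retries":    0,
    }

    jobData, err := json.Marshal(job)
    if err != nil {
        log.Printf("Worker %s: cannot encode job for %s: %v", rds.workerID, url, err)
        return
    }
    // Report queue failures instead of silently losing the job.
    if err := rds.redisClient.LPush(ctx, rds.queueKey, jobData).Err(); err != nil {
        log.Printf("Worker %s: cannot enqueue %s: %v", rds.workerID, url, err)
    }
}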

func (rds *RedisDistributedScraper) saveResult(result map[string]interface{}) {
    ctx := context.Background()
    resultData, _ := json.Marshal(result)
    rds.redisClient.LPush(ctx, rds.resultsKey, resultData)
}

func (rds *RedisDistributedScraper) processJobs(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            // Pop job from queue with timeout
            result, err := rds.redisClient.BRPop(ctx, 5*time.Second, rds.queueKey).Result()
            if err == redis.Nil {
                continue // No jobs available
            } else if err != nil {
                log.Printf("Worker %s: Redis error: %v", rds.workerID, err)
                continue
            }

            var job map[string]interface{}
            if err := json.Unmarshal([]byte(result[1]), &job); err != nil {
                log.Printf("Worker %s: Invalid job data: %v", rds.workerID, err)
                continue
            }

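            // A checked assertion avoids a panic on malformed jobs.
            url, ok := job["url"].(string)
            if !ok || url == "" {
                log.Printf("Worker %s: job has no url field", rds.workerID)
                continue
            }
            log.Printf("Worker %s processing: %s", rds.workerID, url)

            if err := rds.collector.Visit(url); err != nil {
                log.Printf("Worker %s: visit %s: %v", rds.workerID, url, err)
            }
            rds.collector.Wait()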
        }
    }
}

func (rds *RedisDistributedScraper) Start(ctx context.Context) {
    rds.SetupHandlers()
    rds.processJobs(ctx)
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    scraper := NewRedisDistributedScraper("localhost:6379", "worker-1")

    // Add initial jobs
    scraper.addJob("https://example.com")
    scraper.addJob("https://httpbin.org")

    scraper.Start(ctx)
}
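
The Redis example passes jobs around as `map[string]interface{}`. A typed struct can make the payload easier to validate; the following sketch decodes the same three JSON fields that addJob writes. The `Job` type and `decodeJob` function are assumptions for illustration and do not appear in the code above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Job mirrors the JSON fields that addJob writes to the queue.
type Job struct {
	URL       string `json:"url"`
	CreatedAt int64  `json:"created_at"`
	Retries   int    `json:"retries"`
}

// decodeJob parses a queue payload and rejects jobs without a URL.
func decodeJob(payload []byte) (Job, error) {
	var job Job
	if err := json.Unmarshal(payload, &job); err != nil {
		return Job{}, fmt.Errorf("invalid job JSON: %w", err)
	}
	if job.URL == "" {
		return Job{}, errors.New("job has no url")
	}
	return job, nil
}

func main() {
	payload := []byte(`{"url":"https://example.com","created_at":1700000000,"retries":2}`)
	job, err := decodeJob(payload)
	fmt.Println(job.URL, job.Retries, err)

	// A payload without a url field is rejected.
	_, err = decodeJob([]byte(`{"retries":1}`))
	fmt.Println(err)
}
```

With a typed job, the retry count read from the queue can be carried through to the retry logic rather than being reset on every re-queue.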

Advanced Distributed Patterns

3. Master-Worker Architecture

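Implement a master coordinator that manages multiple workers. The example runs each worker as a goroutine; the same structure applies when workers are separate processes: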

package main

import (
    "context"
    "log"
    "sync"
    "time"

    "github.com/gocolly/colly/v2"
)

type Master struct {
    workers      []*Worker
    jobQueue     chan string
    resultsCh    chan ScrapingResult
    mu           sync.RWMutex
    processedURLs map[string]bool
}

type Worker struct {
    id        int
    collector *colly.Collector
    master    *Master
}

type ScrapingResult struct {
    WorkerID  int       `json:"worker_id"`
    URL       string    `json:"url"`
    Title     string    `json:"title"`
    Links     []string  `json:"links"`
    Timestamp time.Time `json:"timestamp"`
}

func NewMaster(numWorkers int) *Master {
    master := &Master{
        workers:       make([]*Worker, numWorkers),
        jobQueue:      make(chan string, numWorkers*100),
        resultsCh:     make(chan ScrapingResult, numWorkers*100),
        processedURLs: make(map[string]bool),
    }

    for i := 0; i < numWorkers; i++ {
        master.workers[i] = NewWorker(i, master)
    }

    return master
}

func NewWorker(id int, master *Master) *Worker {
    c := colly.NewCollector()
    c.Async = true

    c.Limit(&colly.LimitRule{
        DomainGlob:  "*",
        Parallelism: 2,
        Delay:       1 * time.Second,
    })

    worker := &Worker{
        id:        id,
        collector: c,
        master:    master,
    }

    worker.setupHandlers()
    return worker
}

func (w *Worker) setupHandlers() {
    // Match the root element. A "title" selector would scope ForEach to the
    // <title> element, which never contains links.
    w.collector.OnHTML("html", func(e *colly.HTMLElement) {
        result := ScrapingResult{
            WorkerID:  w.id,
            URL:       e.Request.URL.String(),
            Title:     e.ChildText("head > title"),
            Timestamp: time.Now(),
        }

        // Extract links
        e.ForEach("a[href]", func(i int, el *colly.HTMLElement) {
            link := el.Request.AbsoluteURL(el.Attr("href"))
            if link != "" {
                result.Links = append(result.Links, link)
                w.master.AddURL(link) // Add discovered URLs
            }
        })

        w.master.resultsCh <- result
    })

    w.collector.OnError(func(r *colly.Response, err error) {
        log.Printf("Worker %d error: %v", w.id, err)
    })
}

func (w *Worker) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case url := <-w.master.jobQueue:
                log.Printf("Worker %d processing: %s", w.id, url)
                w.collector.Visit(url)
                w.collector.Wait()
            }
        }
    }()
}

func (m *Master) AddURL(url string) {
    m.mu.Lock()
    defer m.mu.Unlock()

    if !m.processedURLs[url] {
        m.processedURLs[url] = true
        select {
        case m.jobQueue <- url:
        default:
            log.Printf("Job queue full, dropping URL: %s", url)
        }
    }
}

func (m *Master) Start(ctx context.Context, seedURLs []string) {
    // Start all workers
    for _, worker := range m.workers {
        worker.Start(ctx)
    }

    // Add seed URLs
    for _, url := range seedURLs {
        m.AddURL(url)
    }

    // Process results
    go func() {
        for result := range m.resultsCh {
            log.Printf("Result from worker %d: %s - %s", 
                result.WorkerID, result.URL, result.Title)
        }
    }()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    master := NewMaster(3)

    seedURLs := []string{
        "https://example.com",
        "https://httpbin.org",
    }

    master.Start(ctx, seedURLs)

    // Wait for completion
    <-ctx.Done()
    log.Println("Scraping completed")
}
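
Master.AddURL deduplicates on the raw URL string, so `https://example.com` and `https://example.com/#top` count as different pages. One way to tighten this, sketched below under the assumption that fragments and default ports never change the fetched page, is to normalize URLs before using them as map keys. The helper name `canonicalURL` is hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// canonicalURL returns a normalized form of rawURL for use as a dedup key:
// lowercased host, no fragment, no default port, and "/" for an empty path.
// (Hypothetical helper; the normalization rules are a simplifying assumption.)
func canonicalURL(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", fmt.Errorf("not an absolute URL: %q", rawURL)
	}
	u.Host = strings.ToLower(u.Host)
	switch u.Scheme {
	case "http":
		u.Host = strings.TrimSuffix(u.Host, ":80")
	case "https":
		u.Host = strings.TrimSuffix(u.Host, ":443")
	}
	u.Fragment = ""
	if u.Path == "" {
		u.Path = "/"
	}
	return u.String(), nil
}

func main() {
	seen := make(map[string]bool)
	for _, raw := range []string{
		"https://example.com",
		"https://EXAMPLE.com:443/#top",
		"https://example.com/docs",
	} {
		key, err := canonicalURL(raw)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		if seen[key] {
			fmt.Println("duplicate:", raw)
			continue
		}
		seen[key] = true
		fmt.Println("new:", key)
	}
}
```

Query strings are left untouched here, since reordering or dropping parameters can change the page a server returns.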

Deployment Strategies

Docker-Based Distribution

Create a Dockerfile for containerized deployment:

FROM golang:1.19-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go build -o scraper ./cmd/scraper

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/

COPY --from=builder /app/scraper .

CMD ["./scraper"]

Use Docker Compose for multi-instance deployment:

version: '3.8'
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  scraper-worker:
    build: .
    environment:
      - WORKER_ID=${WORKER_ID:-worker}
      - REDIS_URL=redis:6379
    depends_on:
      - redis
    deploy:
      replicas: 5

Kubernetes Deployment

For production-scale distributed scraping:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: colly-scraper
spec:
  replicas: 10
  selector:
    matchLabels:
      app: colly-scraper
  template:
    metadata:
      labels:
        app: colly-scraper
    spec:
      containers:
      - name: scraper
        image: your-registry/colly-scraper:latest
        env:
        - name: WORKER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: REDIS_URL
          value: "redis-service:6379"
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"

Best Practices and Considerations

1. Load Balancing and Rate Limiting

Implement intelligent load distribution to avoid overwhelming target servers:

func (ds *DistributedScraper) setupRateLimiting() {
    // Domain-specific rule, registered before the catch-all so that it is
    // matched first for *.example.com hosts
    ds.collector.Limit(&colly.LimitRule{
        DomainGlob:  "*.example.com",
        Parallelism: 1,
        Delay:       3 * time.Second,
    })

    ds.collector.Limit(&colly.LimitRule{
        DomainGlob:  "*",
        Parallelism: 2,
        Delay:       1 * time.Second,
    })
}

2. Error Handling and Retry Logic

Implement robust error handling across distributed workers:

func (rds *RedisDistributedScraper) handleRetry(url string, retryCount int) {
    maxRetries := 3
    if retryCount < maxRetries {
        job := map[string]interface{}{
            "url":        url,
            "retries":    retryCount + 1,
            "created_at": time.Now().Unix(),
        }

        jobData, _ := json.Marshal(job)
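        // Add back to queue with exponential backoff: 1s, 2s, 4s for
        // retryCount 0, 1, 2. The timer lives in this process, so a
        // pending retry is lost if the worker exits before it fires.
        delay := time.Duration(1<<retryCount) * time.Second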
        time.AfterFunc(delay, func() {
            rds.redisClient.LPush(context.Background(), rds.queueKey, jobData)
        })
    }
}

3. Monitoring and Metrics

Track performance across distributed instances:

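type Metrics struct {
    ProcessedURLs   int64         `json:"processed_urls"`
    SuccessfulReqs  int64         `json:"successful_requests"`
    FailedReqs      int64         `json:"failed_requests"`
    AverageRespTime time.Duration `json:"average_response_time"`
    LastActivity    time.Time     `json:"last_activity"`
}

// updateMetrics assumes Worker has two extra fields, `metrics Metrics` and
// `metricsMu sync.Mutex`, and that "sync" and "sync/atomic" are imported.
func (w *Worker) updateMetrics() {
    w.collector.OnResponse(func(r *colly.Response) {
        atomic.AddInt64(&w.metrics.ProcessedURLs, 1)
        atomic.AddInt64(&w.metrics.SuccessfulReqs, 1)
        // time.Time cannot be updated atomically, so guard it with a mutex.
        w.metricsMu.Lock()
        w.metrics.LastActivity = time.Now()
        w.metricsMu.Unlock()
    })

    // Count failed requests as well, so FailedReqs is not always zero.
    w.collector.OnError(func(r *colly.Response, err error) {
        atomic.AddInt64(&w.metrics.ProcessedURLs, 1)
        atomic.AddInt64(&w.metrics.FailedReqs, 1)
    })
}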

Related Topics

For handling complex scenarios that might arise during distributed scraping, you may also want to explore how to run multiple pages in parallel with Puppeteer when JavaScript execution is required, or learn about handling browser sessions in Puppeteer for maintaining state across distributed scraping operations.

Conclusion

Implementing distributed scraping with multiple Colly instances requires careful consideration of job distribution, coordination, and error handling. The patterns shown above provide a solid foundation for building scalable web scraping systems. Start with simple queue-based distribution for single-machine scenarios, then move to Redis-based coordination for true distributed systems across multiple machines.

Remember to always respect robots.txt files, implement appropriate rate limiting, and consider the legal and ethical implications of your scraping activities. Monitor your distributed system's performance and adjust worker counts and rate limits based on target server responses and your infrastructure capacity.
