How do I implement distributed scraping using multiple Colly instances?
Implementing distributed scraping with multiple Colly instances allows you to scale your web scraping operations across multiple processes, machines, or containers. This approach significantly improves performance for large-scale scraping projects by parallelizing work and distributing load.
Core Concepts of Distributed Scraping
Distributed scraping breaks a large scraping task into smaller, manageable units that can be processed independently by multiple Colly instances. The key components, sketched in code right after this list, include:
- Job Distribution: Mechanism to distribute URLs or tasks across multiple workers
- Coordination: Ensuring no duplicate work and proper task completion tracking
- Data Collection: Aggregating results from multiple sources
- Error Handling: Managing failures and retries across the distributed system
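To make these responsibilities concrete, here is a minimal sketch of how they might map onto Go types. The interface and method names (JobQueue, Deduper, ResultSink, RetryPolicy) are illustrative only, not Colly APIs:

package scraper

import "time"

// JobQueue distributes work to workers (a buffered channel, a Redis list, a message broker, ...).
type JobQueue interface {
    Push(url string) error
    Pop(timeout time.Duration) (url string, err error)
}

// Deduper provides coordination: each URL should be fetched by exactly one worker.
type Deduper interface {
    MarkIfNew(url string) (bool, error)
}

// ResultSink aggregates data produced by all workers.
type ResultSink interface {
    Save(result map[string]interface{}) error
}

// RetryPolicy decides how failed jobs are re-queued.
type RetryPolicy interface {
    NextDelay(attempt int) (delay time.Duration, giveUp bool)
}

The concrete examples below fill these roles with Go channels first, then with Redis.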
Basic Distributed Architecture
1. Queue-Based Distribution
The most common approach uses a shared job queue to distribute work. The example below stays on a single machine: the queue is a buffered Go channel and each worker is a goroutine with its own cloned collector:
package main
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/gocolly/colly/v2"
"github.com/gocolly/colly/v2/debug"
"github.com/gocolly/colly/v2/queue"
)
type ScrapeJob struct {
URL string `json:"url"`
Metadata map[string]string `json:"metadata"`
Priority int `json:"priority"`
}
type DistributedScraper struct {
workers int
collector *colly.Collector
jobQueue chan ScrapeJob
resultsCh chan interface{}
wg sync.WaitGroup
}
func NewDistributedScraper(workers int) *DistributedScraper {
c := colly.NewCollector(
colly.Debugger(&debug.LogDebugger{}),
colly.Async(true),
)
// Configure rate limiting
c.Limit(&colly.LimitRule{
DomainGlob: "*",
Parallelism: 2,
Delay: 1 * time.Second,
})
return &DistributedScraper{
workers: workers,
collector: c,
jobQueue: make(chan ScrapeJob, workers*10),
resultsCh: make(chan interface{}, workers*10),
}
}
func (ds *DistributedScraper) Start() {
for i := 0; i < ds.workers; i++ {
ds.wg.Add(1)
go ds.worker(i)
}
}
func (ds *DistributedScraper) worker(id int) {
defer ds.wg.Done()
c := ds.collector.Clone()
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
e.Request.Visit(link)
})
c.OnResponse(func(r *colly.Response) {
result := map[string]interface{}{
"worker_id": id,
"url": r.Request.URL.String(),
"status": r.StatusCode,
"body_size": len(r.Body),
"timestamp": time.Now(),
}
select {
case ds.resultsCh <- result:
default:
log.Printf("Worker %d: results channel full", id)
}
})
c.OnError(func(r *colly.Response, err error) {
log.Printf("Worker %d error: %v", id, err)
})
for job := range ds.jobQueue {
log.Printf("Worker %d processing: %s", id, job.URL)
c.Visit(job.URL)
c.Wait()
}
}
func (ds *DistributedScraper) AddJob(job ScrapeJob) {
select {
case ds.jobQueue <- job:
default:
log.Println("Job queue full, dropping job:", job.URL)
}
}
func (ds *DistributedScraper) Stop() {
close(ds.jobQueue)
ds.wg.Wait()
close(ds.resultsCh)
}
func main() {
scraper := NewDistributedScraper(5)
scraper.Start()
// Add jobs
urls := []string{
"https://example.com",
"https://httpbin.org",
"https://jsonplaceholder.typicode.com",
}
for _, url := range urls {
scraper.AddJob(ScrapeJob{
URL: url,
Priority: 1,
Metadata: map[string]string{"source": "main"},
})
}
// Collect results
go func() {
for result := range scraper.resultsCh {
data, _ := json.MarshalIndent(result, "", " ")
log.Printf("Result: %s", data)
}
}()
time.Sleep(30 * time.Second)
scraper.Stop()
}
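As a side note, Colly itself ships a queue helper (the github.com/gocolly/colly/v2/queue package) that covers the simple in-process case without hand-rolled workers. A minimal sketch of how it is typically used:

package main

import (
    "log"

    "github.com/gocolly/colly/v2"
    "github.com/gocolly/colly/v2/queue"
)

func main() {
    c := colly.NewCollector()
    c.OnResponse(func(r *colly.Response) {
        log.Printf("visited %s (%d bytes)", r.Request.URL, len(r.Body))
    })

    // Two consumer threads pulling from an in-memory queue of up to 10000 URLs.
    q, err := queue.New(2, &queue.InMemoryQueueStorage{MaxSize: 10000})
    if err != nil {
        log.Fatal(err)
    }

    q.AddURL("https://example.com")
    q.AddURL("https://httpbin.org")

    // Run blocks until every queued URL has been processed.
    if err := q.Run(c); err != nil {
        log.Fatal(err)
    }
}

This keeps everything in one process; the Redis-based approach below is what you need once workers live on different machines.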
2. Redis-Based Coordination
For true distributed systems across multiple machines, use Redis for coordination:
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/go-redis/redis/v8"
"github.com/gocolly/colly/v2"
)
type RedisDistributedScraper struct {
collector *colly.Collector
redisClient *redis.Client
workerID string
queueKey string
resultsKey string
}
func NewRedisDistributedScraper(redisAddr, workerID string) *RedisDistributedScraper {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
c := colly.NewCollector()
c.Async = true
c.Limit(&colly.LimitRule{
DomainGlob: "*",
Parallelism: 3,
Delay: 2 * time.Second,
})
return &RedisDistributedScraper{
collector: c,
redisClient: rdb,
workerID: workerID,
queueKey: "scraping:jobs",
resultsKey: "scraping:results",
}
}
func (rds *RedisDistributedScraper) SetupHandlers() {
rds.collector.OnHTML("title", func(e *colly.HTMLElement) {
result := map[string]interface{}{
"worker_id": rds.workerID,
"url": e.Request.URL.String(),
"title": e.Text,
"timestamp": time.Now().Unix(),
}
rds.saveResult(result)
})
rds.collector.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Request.AbsoluteURL(e.Attr("href"))
if link != "" {
rds.addJob(link)
}
})
rds.collector.OnError(func(r *colly.Response, err error) {
log.Printf("Worker %s error on %s: %v", rds.workerID, r.Request.URL, err)
// Re-queue the failed URL for another attempt. This simple version retries
// forever; see handleRetry under Best Practices for a capped, backed-off variant.
rds.addJob(r.Request.URL.String())
})
}
func (rds *RedisDistributedScraper) addJob(url string) {
ctx := context.Background()
job := map[string]interface{}{
"url": url,
"created_at": time.Now().Unix(),
"retries": 0,
}
jobData, _ := json.Marshal(job)
rds.redisClient.LPush(ctx, rds.queueKey, jobData)
}
func (rds *RedisDistributedScraper) saveResult(result map[string]interface{}) {
ctx := context.Background()
resultData, _ := json.Marshal(result)
rds.redisClient.LPush(ctx, rds.resultsKey, resultData)
}
func (rds *RedisDistributedScraper) processJobs(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
// Pop job from queue with timeout
result, err := rds.redisClient.BRPop(ctx, 5*time.Second, rds.queueKey).Result()
if err == redis.Nil {
continue // No jobs available
} else if err != nil {
log.Printf("Worker %s: Redis error: %v", rds.workerID, err)
continue
}
var job map[string]interface{}
if err := json.Unmarshal([]byte(result[1]), &job); err != nil {
log.Printf("Worker %s: Invalid job data: %v", rds.workerID, err)
continue
}
url, ok := job["url"].(string)
if !ok || url == "" {
log.Printf("Worker %s: job has no url field", rds.workerID)
continue
}
log.Printf("Worker %s processing: %s", rds.workerID, url)
rds.collector.Visit(url)
rds.collector.Wait()
}
}
}
func (rds *RedisDistributedScraper) Start(ctx context.Context) {
rds.SetupHandlers()
rds.processJobs(ctx)
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
scraper := NewRedisDistributedScraper("localhost:6379", "worker-1")
// Add initial jobs
scraper.addJob("https://example.com")
scraper.addJob("https://httpbin.org")
scraper.Start(ctx)
}
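Note that addJob never checks whether a URL has already been queued, and the OnError handler re-queues failures unconditionally, so two workers can easily fetch the same page. One way to add cross-machine deduplication is a shared Redis set; the sketch below extends the scraper above, and the scraping:seen key name is an assumption:

// markIfNew reports whether this is the first time any worker has seen the URL.
// SADD returns the number of members actually added, so 1 means "new".
func (rds *RedisDistributedScraper) markIfNew(url string) bool {
    ctx := context.Background()
    added, err := rds.redisClient.SAdd(ctx, "scraping:seen", url).Result()
    if err != nil {
        log.Printf("Worker %s: dedup check failed for %s: %v", rds.workerID, url, err)
        return true // fail open: an occasional duplicate beats silently dropping work
    }
    return added == 1
}

With this in place, the link handler can call rds.addJob(link) only when rds.markIfNew(link) returns true.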
Advanced Distributed Patterns
3. Master-Worker Architecture
Implement a master coordinator that manages multiple worker processes:
package main
import (
"context"
"log"
"sync"
"time"
"github.com/gocolly/colly/v2"
)
type Master struct {
workers []*Worker
jobQueue chan string
resultsCh chan ScrapingResult
mu sync.RWMutex
processedURLs map[string]bool
}
type Worker struct {
id int
collector *colly.Collector
master *Master
}
type ScrapingResult struct {
WorkerID int `json:"worker_id"`
URL string `json:"url"`
Title string `json:"title"`
Links []string `json:"links"`
Timestamp time.Time `json:"timestamp"`
}
func NewMaster(numWorkers int) *Master {
master := &Master{
workers: make([]*Worker, numWorkers),
jobQueue: make(chan string, numWorkers*100),
resultsCh: make(chan ScrapingResult, numWorkers*100),
processedURLs: make(map[string]bool),
}
for i := 0; i < numWorkers; i++ {
master.workers[i] = NewWorker(i, master)
}
return master
}
func NewWorker(id int, master *Master) *Worker {
c := colly.NewCollector()
c.Async = true
c.Limit(&colly.LimitRule{
DomainGlob: "*",
Parallelism: 2,
Delay: 1 * time.Second,
})
worker := &Worker{
id: id,
collector: c,
master: master,
}
worker.setupHandlers()
return worker
}
func (w *Worker) setupHandlers() {
w.collector.OnHTML("title", func(e *colly.HTMLElement) {
result := ScrapingResult{
WorkerID: w.id,
URL: e.Request.URL.String(),
Title: e.Text,
Timestamp: time.Now(),
}
// Extract links
e.ForEach("a[href]", func(i int, el *colly.HTMLElement) {
link := el.Request.AbsoluteURL(el.Attr("href"))
if link != "" {
result.Links = append(result.Links, link)
w.master.AddURL(link) // Add discovered URLs
}
})
w.master.resultsCh <- result
})
w.collector.OnError(func(r *colly.Response, err error) {
log.Printf("Worker %d error: %v", w.id, err)
})
}
func (w *Worker) Start(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case url := <-w.master.jobQueue:
log.Printf("Worker %d processing: %s", w.id, url)
w.collector.Visit(url)
w.collector.Wait()
}
}
}()
}
func (m *Master) AddURL(url string) {
m.mu.Lock()
defer m.mu.Unlock()
if !m.processedURLs[url] {
m.processedURLs[url] = true
select {
case m.jobQueue <- url:
default:
log.Printf("Job queue full, dropping URL: %s", url)
}
}
}
func (m *Master) Start(ctx context.Context, seedURLs []string) {
// Start all workers
for _, worker := range m.workers {
worker.Start(ctx)
}
// Add seed URLs
for _, url := range seedURLs {
m.AddURL(url)
}
// Process results
go func() {
for result := range m.resultsCh {
log.Printf("Result from worker %d: %s - %s",
result.WorkerID, result.URL, result.Title)
}
}()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
master := NewMaster(3)
seedURLs := []string{
"https://example.com",
"https://httpbin.org",
}
master.Start(ctx, seedURLs)
// Wait for completion
<-ctx.Done()
log.Println("Scraping completed")
}
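The master above only logs results. To cover the data-collection role properly, drain resultsCh into durable storage instead; below is a minimal sketch that appends JSON Lines to a local file (the results.jsonl path is arbitrary, and it assumes the encoding/json and os imports are added):

// collectResults drains the results channel and appends one JSON object per line.
func (m *Master) collectResults(path string) error {
    f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
    if err != nil {
        return err
    }
    defer f.Close()

    enc := json.NewEncoder(f)
    for result := range m.resultsCh {
        if err := enc.Encode(result); err != nil {
            log.Printf("failed to write result for %s: %v", result.URL, err)
        }
    }
    return nil
}

In Master.Start you would then launch go m.collectResults("results.jsonl") in place of the logging goroutine.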
Deployment Strategies
Docker-Based Distribution
Create a Dockerfile for containerized deployment:
FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o scraper ./cmd/scraper
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/scraper .
CMD ["./scraper"]
Use Docker Compose for multi-instance deployment:
version: '3.8'

services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  scraper-worker:
    build: .
    environment:
      - WORKER_ID=${WORKER_ID:-worker}
      - REDIS_URL=redis:6379
    depends_on:
      - redis
    deploy:
      replicas: 5
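The compose file hands configuration to each replica through the WORKER_ID and REDIS_URL environment variables, so the worker binary needs to read them at startup. A small sketch that wires them into the Redis-based scraper from earlier (it assumes the os and context imports and the NewRedisDistributedScraper constructor shown above; the fallback values are assumptions for local runs):

// getenv returns the environment variable's value, or fallback when it is unset.
func getenv(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

func main() {
    workerID := getenv("WORKER_ID", "worker-local")
    redisAddr := getenv("REDIS_URL", "localhost:6379")

    scraper := NewRedisDistributedScraper(redisAddr, workerID)
    scraper.addJob("https://example.com")
    scraper.Start(context.Background())
}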
Kubernetes Deployment
For production-scale distributed scraping:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: colly-scraper
spec:
  replicas: 10
  selector:
    matchLabels:
      app: colly-scraper
  template:
    metadata:
      labels:
        app: colly-scraper
    spec:
      containers:
        - name: scraper
          image: your-registry/colly-scraper:latest
          env:
            - name: WORKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: REDIS_URL
              value: "redis-service:6379"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
Best Practices and Considerations
1. Load Balancing and Rate Limiting
Implement intelligent load distribution to avoid overwhelming target servers:
func (ds *DistributedScraper) setupRateLimiting() {
// Domain-specific rate limiting
ds.collector.Limit(&colly.LimitRule{
DomainGlob: "*.example.com",
Parallelism: 1,
Delay: 3 * time.Second,
})
ds.collector.Limit(&colly.LimitRule{
DomainGlob: "*",
Parallelism: 2,
Delay: 1 * time.Second,
})
}
2. Error Handling and Retry Logic
Implement robust error handling across distributed workers:
func (rds *RedisDistributedScraper) handleRetry(url string, retryCount int) {
maxRetries := 3
if retryCount < maxRetries {
job := map[string]interface{}{
"url": url,
"retries": retryCount + 1,
"created_at": time.Now().Unix(),
}
jobData, _ := json.Marshal(job)
// Re-queue with exponential backoff (1s, 2s, 4s, ...)
delay := time.Duration(1<<retryCount) * time.Second
time.AfterFunc(delay, func() {
rds.redisClient.LPush(context.Background(), rds.queueKey, jobData)
})
}
}
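For handleRetry to be useful, the OnError callback has to know how many attempts a URL has already had. One way to carry that number with each request is Colly's per-request Context; the sketch below assumes a strconv import and an arbitrary "retries" key:

// visitWithRetries issues a request whose retry count travels in the Colly context.
func (rds *RedisDistributedScraper) visitWithRetries(url string, retries int) error {
    ctx := colly.NewContext()
    ctx.Put("retries", strconv.Itoa(retries))
    return rds.collector.Request("GET", url, nil, ctx, nil)
}

// The error handler can then retry with a cap instead of re-queuing forever:
//
//  rds.collector.OnError(func(r *colly.Response, err error) {
//      retries, _ := strconv.Atoi(r.Request.Ctx.Get("retries"))
//      rds.handleRetry(r.Request.URL.String(), retries)
//  })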
3. Monitoring and Metrics
Track performance across distributed instances:
type Metrics struct {
ProcessedURLs int64 `json:"processed_urls"`
SuccessfulReqs int64 `json:"successful_requests"`
FailedReqs int64 `json:"failed_requests"`
AverageRespTime time.Duration `json:"average_response_time"`
LastActivity time.Time `json:"last_activity"`
}
// Assumes the Worker struct gains a metrics field (metrics *Metrics) and that
// "sync/atomic" is imported; LastActivity is written without synchronization,
// which is acceptable for rough monitoring.
func (w *Worker) updateMetrics() {
w.collector.OnResponse(func(r *colly.Response) {
atomic.AddInt64(&w.metrics.ProcessedURLs, 1)
atomic.AddInt64(&w.metrics.SuccessfulReqs, 1)
w.metrics.LastActivity = time.Now()
})
w.collector.OnError(func(r *colly.Response, err error) {
atomic.AddInt64(&w.metrics.FailedReqs, 1)
})
}
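In a distributed setup these counters are only useful if they can be read in one place. One option, reusing the Redis client from the earlier example and the Metrics struct above, is to publish a periodic per-worker snapshot into a Redis hash (the scraping:metrics key and the interval are assumptions):

// publishMetrics pushes a JSON snapshot of this worker's counters into a Redis
// hash keyed by worker ID, so a dashboard can read all workers with one HGETALL.
func (rds *RedisDistributedScraper) publishMetrics(ctx context.Context, m *Metrics, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Counters are read without locking; fine for rough monitoring.
            snapshot, err := json.Marshal(m)
            if err != nil {
                continue
            }
            rds.redisClient.HSet(ctx, "scraping:metrics", rds.workerID, snapshot)
        }
    }
}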
Related Topics
For handling complex scenarios that might arise during distributed scraping, you may also want to explore how to run multiple pages in parallel with Puppeteer when JavaScript execution is required, or learn about handling browser sessions in Puppeteer for maintaining state across distributed scraping operations.
Conclusion
Implementing distributed scraping with multiple Colly instances requires careful consideration of job distribution, coordination, and error handling. The patterns shown above provide a solid foundation for building scalable web scraping systems. Start with simple queue-based distribution for single-machine scenarios, then move to Redis-based coordination for true distributed systems across multiple machines.
Remember to always respect robots.txt files, implement appropriate rate limiting, and consider the legal and ethical implications of your scraping activities. Monitor your distributed system's performance and adjust worker counts and rate limits based on target server responses and your infrastructure capacity.