How do I implement scheduled scraping tasks in Go?
Implementing scheduled scraping tasks in Go allows you to automate data collection workflows, monitor websites for changes, and gather information at regular intervals. This guide covers multiple approaches to scheduling scraping tasks, from simple time-based triggers to robust production-ready solutions.
Table of Contents
- Using time.Ticker for Simple Scheduling
- Implementing Cron-like Scheduling
- Building a Task Queue System
- Production-Ready Scheduling with External Tools
- Error Handling and Monitoring
- Best Practices
- Advanced Scheduling Patterns
Using time.Ticker for Simple Scheduling
The most straightforward approach for scheduled scraping in Go is using time.Ticker for regular intervals:
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/PuerkitoBio/goquery"
)
type Scraper struct {
client *http.Client
interval time.Duration
url string
}
func NewScraper(url string, interval time.Duration) *Scraper {
return &Scraper{
client: &http.Client{
Timeout: 30 * time.Second,
},
interval: interval,
url: url,
}
}
func (s *Scraper) scrapeData() error {
resp, err := s.client.Get(s.url)
if err != nil {
return fmt.Errorf("failed to fetch URL: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
    return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
doc, err := goquery.NewDocumentFromReader(resp.Body)
if err != nil {
return fmt.Errorf("failed to parse HTML: %w", err)
}
// Extract data
title := doc.Find("title").Text()
fmt.Printf("Scraped at %s: %s\n", time.Now().Format(time.RFC3339), title)
return nil
}
func (s *Scraper) Start(ctx context.Context) {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
// Run immediately
if err := s.scrapeData(); err != nil {
log.Printf("Scraping error: %v", err)
}
for {
select {
case <-ctx.Done():
log.Println("Scraper stopped")
return
case <-ticker.C:
if err := s.scrapeData(); err != nil {
log.Printf("Scraping error: %v", err)
}
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
scraper := NewScraper("https://example.com", 5*time.Minute)
scraper.Start(ctx)
}
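Because Start blocks until its context is cancelled, you can run several scrapers at different intervals by launching each one in its own goroutine. A minimal sketch (the target URLs and intervals are placeholders, and it additionally needs the sync package):

func runScrapers(ctx context.Context) {
    var wg sync.WaitGroup
    // Hypothetical targets, each scraped on its own schedule
    targets := map[string]time.Duration{
        "https://example.com":      5 * time.Minute,
        "https://example.org/news": 15 * time.Minute,
    }
    for url, interval := range targets {
        wg.Add(1)
        go func(url string, interval time.Duration) {
            defer wg.Done()
            NewScraper(url, interval).Start(ctx) // blocks until ctx is cancelled
        }(url, interval)
    }
    wg.Wait() // returns once every scraper has observed the cancellation
}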
Implementing Cron-like Scheduling
For more complex scheduling requirements, use the robfig/cron library to implement cron-like functionality:
go get github.com/robfig/cron/v3
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/PuerkitoBio/goquery"
"github.com/robfig/cron/v3"
)
type ScheduledScraper struct {
client *http.Client
cron *cron.Cron
jobs map[string]ScrapingJob
}
type ScrapingJob struct {
Name string
URL string
Schedule string
Handler func(data ScrapedData) error
}
type ScrapedData struct {
URL string
Title string
Timestamp time.Time
Content map[string]string
}
func NewScheduledScraper() *ScheduledScraper {
return &ScheduledScraper{
client: &http.Client{
Timeout: 30 * time.Second,
},
cron: cron.New(cron.WithSeconds()),
jobs: make(map[string]ScrapingJob),
}
}
func (ss *ScheduledScraper) AddJob(job ScrapingJob) error {
_, err := ss.cron.AddFunc(job.Schedule, func() {
if err := ss.executeJob(job); err != nil {
log.Printf("Job %s failed: %v", job.Name, err)
}
})
if err != nil {
return fmt.Errorf("failed to add job %s: %w", job.Name, err)
}
ss.jobs[job.Name] = job
log.Printf("Added job: %s with schedule: %s", job.Name, job.Schedule)
return nil
}
func (ss *ScheduledScraper) executeJob(job ScrapingJob) error {
log.Printf("Executing job: %s", job.Name)
resp, err := ss.client.Get(job.URL)
if err != nil {
return fmt.Errorf("failed to fetch %s: %w", job.URL, err)
}
defer resp.Body.Close()
doc, err := goquery.NewDocumentFromReader(resp.Body)
if err != nil {
return fmt.Errorf("failed to parse HTML: %w", err)
}
data := ScrapedData{
URL: job.URL,
Title: doc.Find("title").Text(),
Timestamp: time.Now(),
Content: make(map[string]string),
}
// Extract additional data based on job requirements
doc.Find("h1").Each(func(i int, s *goquery.Selection) {
data.Content[fmt.Sprintf("h1_%d", i)] = s.Text()
})
if job.Handler != nil {
return job.Handler(data)
}
log.Printf("Job %s completed successfully", job.Name)
return nil
}
func (ss *ScheduledScraper) Start() {
ss.cron.Start()
log.Println("Scheduler started")
}
func (ss *ScheduledScraper) Stop() {
ss.cron.Stop()
log.Println("Scheduler stopped")
}
// Example usage
func main() {
scraper := NewScheduledScraper()
// Add a job that runs every hour
newsJob := ScrapingJob{
Name: "news-scraper",
URL: "https://example-news.com",
Schedule: "0 0 * * * *", // Every hour
Handler: func(data ScrapedData) error {
// Process the scraped data
fmt.Printf("News title: %s\n", data.Title)
return nil
},
}
// Add a job that runs every day at 9 AM
dailyJob := ScrapingJob{
Name: "daily-report",
URL: "https://example-reports.com",
Schedule: "0 0 9 * * *", // Daily at 9 AM
Handler: func(data ScrapedData) error {
// Generate daily report
fmt.Printf("Daily report: %s\n", data.Title)
return nil
},
}
scraper.AddJob(newsJob)
scraper.AddJob(dailyJob)
scraper.Start()
// Keep the program running
select {}
}
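With cron.WithSeconds() enabled, schedule expressions use six fields (second, minute, hour, day of month, month, day of week), and robfig/cron also accepts descriptors such as @every. A few illustrative schedules (the job names and URLs are placeholders, and errors are ignored as in the example above):

// Every 30 seconds
scraper.AddJob(ScrapingJob{Name: "every-30s", URL: "https://example.com", Schedule: "*/30 * * * * *"})
// Weekdays at 08:00
scraper.AddJob(ScrapingJob{Name: "weekdays-8am", URL: "https://example.com", Schedule: "0 0 8 * * 1-5"})
// Every two hours, using a descriptor instead of a field expression
scraper.AddJob(ScrapingJob{Name: "every-2h", URL: "https://example.com", Schedule: "@every 2h"})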
Building a Task Queue System
For production environments, implement a robust task queue system with Redis or database backing:
go get github.com/go-redis/redis/v8
go get github.com/vmihailenco/taskq/v3
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/go-redis/redis/v8"
"github.com/vmihailenco/taskq/v3"
"github.com/vmihailenco/taskq/v3/redisq"
"github.com/PuerkitoBio/goquery"
)
type TaskQueueScraper struct {
    factory    taskq.Factory
    queue      taskq.Queue
    scrapeTask *taskq.Task
    client     *http.Client
}
type ScrapeTask struct {
URL string `json:"url"`
Selectors map[string]string `json:"selectors"`
JobID string `json:"job_id"`
}
type ScrapeResult struct {
JobID string `json:"job_id"`
URL string `json:"url"`
Data map[string]string `json:"data"`
Timestamp time.Time `json:"timestamp"`
Error string `json:"error,omitempty"`
}
func NewTaskQueueScraper() *TaskQueueScraper {
    // Initialize Redis connection
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    // Create the task queue; taskq/v3 registers queues through a factory
    factory := redisq.NewFactory()
    queue := factory.RegisterQueue(&taskq.QueueOptions{
        Name:  "scraping-queue",
        Redis: rdb,
    })
    scraper := &TaskQueueScraper{
        factory: factory,
        queue:   queue,
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
    }
    // Register the task handler; message arguments are encoded and passed to it
    scraper.scrapeTask = taskq.RegisterTask(&taskq.TaskOptions{
        Name:    "scrape",
        Handler: scraper.handleScrapeTask,
    })
    return scraper
}
func (tqs *TaskQueueScraper) handleScrapeTask(ctx context.Context, task *ScrapeTask) error {
log.Printf("Processing scrape task: %s", task.JobID)
result := ScrapeResult{
JobID: task.JobID,
URL: task.URL,
Data: make(map[string]string),
Timestamp: time.Now(),
}
resp, err := tqs.client.Get(task.URL)
if err != nil {
result.Error = err.Error()
return tqs.saveResult(result)
}
defer resp.Body.Close()
doc, err := goquery.NewDocumentFromReader(resp.Body)
if err != nil {
result.Error = err.Error()
return tqs.saveResult(result)
}
// Extract data based on provided selectors
for key, selector := range task.Selectors {
result.Data[key] = doc.Find(selector).First().Text()
}
return tqs.saveResult(result)
}
func (tqs *TaskQueueScraper) saveResult(result ScrapeResult) error {
// Save result to database or file
resultJSON, _ := json.MarshalIndent(result, "", " ")
log.Printf("Scrape result: %s", resultJSON)
return nil
}
func (tqs *TaskQueueScraper) ScheduleTask(task ScrapeTask, delay time.Duration) error {
    msg := tqs.scrapeTask.WithArgs(context.Background(), &task)
    if delay > 0 {
        msg.Delay = delay
    }
    return tqs.queue.Add(msg)
}
func (tqs *TaskQueueScraper) StartWorker(ctx context.Context) error {
    log.Println("Starting task queue worker")
    return tqs.factory.StartConsumers(ctx)
}
// Scheduler that adds tasks periodically
func (tqs *TaskQueueScraper) StartScheduler(ctx context.Context) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Schedule periodic scraping tasks
task := ScrapeTask{
URL: "https://example.com",
JobID: fmt.Sprintf("scheduled-%d", time.Now().Unix()),
Selectors: map[string]string{
"title": "title",
"description": "meta[name=description]",
"heading": "h1",
},
}
if err := tqs.ScheduleTask(task, 0); err != nil {
log.Printf("Failed to schedule task: %v", err)
}
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
scraper := NewTaskQueueScraper()
// Start worker and scheduler
go scraper.StartWorker(ctx)
go scraper.StartScheduler(ctx)
// Keep running
select {}
}
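Scheduled runs are not the only way to feed the queue: any part of the application can enqueue a one-off task, optionally delayed. A small sketch that could sit in main after NewTaskQueueScraper (the URL, selector, and delay are placeholders):

// Enqueue a one-off task that a worker will pick up after roughly ten minutes
task := ScrapeTask{
    URL:   "https://example.com/pricing",
    JobID: fmt.Sprintf("adhoc-%d", time.Now().Unix()),
    Selectors: map[string]string{
        "price": ".price",
    },
}
if err := scraper.ScheduleTask(task, 10*time.Minute); err != nil {
    log.Printf("Failed to enqueue task: %v", err)
}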
Production-Ready Scheduling with External Tools
For large-scale production environments, consider using external schedulers like Kubernetes CronJobs or traditional cron:
Kubernetes CronJob Example
apiVersion: batch/v1
kind: CronJob
metadata:
  name: web-scraper
spec:
  schedule: "0 */6 * * *" # Every 6 hours
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: scraper
              image: your-scraper-image:latest
              env:
                - name: SCRAPE_URL
                  value: "https://example.com"
                - name: OUTPUT_PATH
                  value: "/data/scraped"
              volumeMounts:
                - name: output-volume
                  mountPath: /data
          restartPolicy: OnFailure
          volumes:
            - name: output-volume
              persistentVolumeClaim:
                claimName: scraper-pvc
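With an external scheduler like this, the Go program no longer loops itself; each CronJob run executes a one-shot binary that reads its target from the environment and exits, letting Kubernetes handle retries via restartPolicy. A minimal sketch of such an entrypoint (SCRAPE_URL matches the manifest above; everything else is illustrative):

package main

import (
    "fmt"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/PuerkitoBio/goquery"
)

func main() {
    url := os.Getenv("SCRAPE_URL")
    if url == "" {
        log.Fatal("SCRAPE_URL is not set")
    }
    client := &http.Client{Timeout: 30 * time.Second}
    resp, err := client.Get(url)
    if err != nil {
        // A non-zero exit signals failure so the CronJob can retry
        log.Fatalf("fetch failed: %v", err)
    }
    defer resp.Body.Close()
    doc, err := goquery.NewDocumentFromReader(resp.Body)
    if err != nil {
        log.Fatalf("parse failed: %v", err)
    }
    fmt.Printf("%s scraped at %s: %s\n", url, time.Now().Format(time.RFC3339), doc.Find("title").Text())
}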
Docker Compose with Cron
version: '3.8'
services:
  scraper:
    build: .
    environment:
      - CRON_SCHEDULE=0 */4 * * *
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    volumes:
      - ./data:/app/data
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  cron-scheduler:
    image: alpine:latest
    command: |
      sh -c "
      echo '0 */4 * * * docker-compose exec scraper /app/scraper' > /var/spool/cron/crontabs/root
      crond -f
      "
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Error Handling and Monitoring
Implement comprehensive error handling and monitoring for production scraping tasks:
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
scrapeCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "scraper_requests_total",
Help: "Total number of scraping requests",
},
[]string{"job", "status"},
)
scrapeDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "scraper_duration_seconds",
Help: "Duration of scraping requests",
},
[]string{"job"},
)
)
type MonitoredScraper struct {
client *http.Client
maxRetries int
backoff time.Duration
}
func NewMonitoredScraper() *MonitoredScraper {
return &MonitoredScraper{
client: &http.Client{
Timeout: 30 * time.Second,
},
maxRetries: 3,
backoff: time.Second * 2,
}
}
func (ms *MonitoredScraper) ScrapeWithRetry(ctx context.Context, jobName, url string) error {
timer := prometheus.NewTimer(scrapeDuration.WithLabelValues(jobName))
defer timer.ObserveDuration()
var lastErr error
for attempt := 0; attempt <= ms.maxRetries; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(ms.backoff * time.Duration(attempt)):
}
}
err := ms.scrape(url)
if err == nil {
scrapeCounter.WithLabelValues(jobName, "success").Inc()
return nil
}
lastErr = err
log.Printf("Attempt %d failed for job %s: %v", attempt+1, jobName, err)
}
scrapeCounter.WithLabelValues(jobName, "failure").Inc()
return fmt.Errorf("all %d attempts failed, last error: %w", ms.maxRetries+1, lastErr)
}
func (ms *MonitoredScraper) scrape(url string) error {
resp, err := ms.client.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
// Process response...
return nil
}
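ScrapeWithRetry can then be driven by whichever trigger you use; for instance, wiring it into a time.Ticker loop like the one from the first section (the job name, URL, and interval are placeholders):

func runMonitored(ctx context.Context) {
    scraper := NewMonitoredScraper()
    ticker := time.NewTicker(10 * time.Minute)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := scraper.ScrapeWithRetry(ctx, "status-page", "https://example.com/status"); err != nil {
                log.Printf("scrape failed after retries: %v", err)
            }
        }
    }
}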
Best Practices
1. Rate Limiting and Respectful Scraping
import "golang.org/x/time/rate"
type RateLimitedScraper struct {
limiter *rate.Limiter
client *http.Client
}
func NewRateLimitedScraper(requestsPerSecond float64) *RateLimitedScraper {
return &RateLimitedScraper{
limiter: rate.NewLimiter(rate.Limit(requestsPerSecond), 1),
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (rls *RateLimitedScraper) Scrape(ctx context.Context, url string) error {
// Wait for rate limiter
if err := rls.limiter.Wait(ctx); err != nil {
return err
}
// Perform scraping...
return nil
}
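If one scheduler drives jobs against several sites, a single limiter throttles them collectively; keeping a limiter per host lets each target stay within its own budget. A sketch of that idea (the per-host rate is an assumption; it additionally needs context, net/url, and sync):

type HostLimiters struct {
    mu       sync.Mutex
    limiters map[string]*rate.Limiter
    perHost  rate.Limit
}

func NewHostLimiters(requestsPerSecond float64) *HostLimiters {
    return &HostLimiters{
        limiters: make(map[string]*rate.Limiter),
        perHost:  rate.Limit(requestsPerSecond),
    }
}

// Wait blocks until the limiter for the URL's host allows another request
func (h *HostLimiters) Wait(ctx context.Context, rawURL string) error {
    u, err := url.Parse(rawURL)
    if err != nil {
        return err
    }
    h.mu.Lock()
    lim, ok := h.limiters[u.Host]
    if !ok {
        lim = rate.NewLimiter(h.perHost, 1)
        h.limiters[u.Host] = lim
    }
    h.mu.Unlock()
    return lim.Wait(ctx)
}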
2. Configuration Management
type ScraperConfig struct {
Interval time.Duration `yaml:"interval"`
MaxRetries int `yaml:"max_retries"`
Timeout time.Duration `yaml:"timeout"`
RateLimit float64 `yaml:"rate_limit"`
UserAgent string `yaml:"user_agent"`
ProxyURL string `yaml:"proxy_url"`
}
func LoadConfig(path string) (*ScraperConfig, error) {
// Load configuration from YAML file
// Implementation details...
}
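A minimal LoadConfig sketch, assuming gopkg.in/yaml.v3 for parsing. Note that yaml.v3 does not decode duration strings such as "30s" into time.Duration fields on its own, so either store those values as integer nanoseconds or give the fields a custom unmarshaller:

import (
    "os"

    "gopkg.in/yaml.v3"
)

func LoadConfig(path string) (*ScraperConfig, error) {
    raw, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    cfg := &ScraperConfig{}
    if err := yaml.Unmarshal(raw, cfg); err != nil {
        return nil, err
    }
    return cfg, nil
}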
3. Graceful Shutdown
import (
"os"
"os/signal"
"syscall"
)
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // Handle shutdown signals
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-c
        log.Println("Shutting down gracefully...")
        cancel()
    }()
    scraper := NewScheduledScraper()
    scraper.Start()
    // Block until a signal cancels the context, then stop the scheduler
    <-ctx.Done()
    scraper.Stop()
}
4. Database Integration
import "database/sql"
type DatabaseScraper struct {
db *sql.DB
scraper *ScheduledScraper
}
func (ds *DatabaseScraper) SaveResult(data ScrapedData) error {
    // Serialize the content map so it can be stored in a JSON/JSONB column
    contentJSON, err := json.Marshal(data.Content)
    if err != nil {
        return err
    }
    query := `
        INSERT INTO scraped_data (url, title, content, scraped_at)
        VALUES ($1, $2, $3, $4)
    `
    _, err = ds.db.Exec(query, data.URL, data.Title, contentJSON, data.Timestamp)
    return err
}
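To connect the pieces, open the database in main and use SaveResult as a job handler. A sketch assuming PostgreSQL via github.com/lib/pq and an existing scraped_data table (the DSN, schedule, and URL are placeholders):

import (
    "database/sql"

    _ "github.com/lib/pq" // registers the "postgres" driver
)

func newDatabaseScraper() (*DatabaseScraper, error) {
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/scraper?sslmode=disable")
    if err != nil {
        return nil, err
    }
    if err := db.Ping(); err != nil {
        return nil, err
    }
    ds := &DatabaseScraper{db: db, scraper: NewScheduledScraper()}
    // Persist every result by using SaveResult as the job handler
    err = ds.scraper.AddJob(ScrapingJob{
        Name:     "persisted-scrape",
        URL:      "https://example.com",
        Schedule: "0 */30 * * * *", // every 30 minutes
        Handler:  ds.SaveResult,
    })
    if err != nil {
        return nil, err
    }
    return ds, nil
}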
5. Health Checks and Monitoring
import (
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func (ss *ScheduledScraper) StartHealthServer() {
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
http.Handle("/metrics", promhttp.Handler())
log.Println("Health server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Advanced Scheduling Patterns
Jittered Scheduling
Prevent thundering herd problems by adding jitter to your schedules:
import "math/rand"
func (ss *ScheduledScraper) AddJitteredJob(job ScrapingJob, maxJitter time.Duration) error {
    _, err := ss.cron.AddFunc(job.Schedule, func() {
        // Pick a fresh random delay on every run so concurrent jobs spread out
        jitter := time.Duration(rand.Int63n(int64(maxJitter)))
        time.Sleep(jitter)
        if err := ss.executeJob(job); err != nil {
            log.Printf("Job %s failed: %v", job.Name, err)
        }
    })
    return err
}
Conditional Scheduling
Execute jobs based on conditions:
type ConditionalJob struct {
ScrapingJob
Condition func() bool
}
func (ss *ScheduledScraper) executeConditionalJob(job ConditionalJob) error {
if !job.Condition() {
log.Printf("Skipping job %s - condition not met", job.Name)
return nil
}
return ss.executeJob(job.ScrapingJob)
}
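To wire conditional jobs into the cron scheduler above, a small helper can wrap executeConditionalJob; the business-hours gate below is just an illustrative condition:

func (ss *ScheduledScraper) AddConditionalJob(job ConditionalJob) error {
    _, err := ss.cron.AddFunc(job.Schedule, func() {
        if err := ss.executeConditionalJob(job); err != nil {
            log.Printf("Job %s failed: %v", job.Name, err)
        }
    })
    return err
}

func registerBusinessHoursJob(scraper *ScheduledScraper) error {
    return scraper.AddConditionalJob(ConditionalJob{
        ScrapingJob: ScrapingJob{
            Name:     "business-hours-prices",
            URL:      "https://example.com/prices",
            Schedule: "0 */15 * * * *", // every 15 minutes
        },
        // Only run between 08:00 and 18:00 local time
        Condition: func() bool {
            hour := time.Now().Hour()
            return hour >= 8 && hour < 18
        },
    })
}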
Conclusion
Implementing scheduled scraping tasks in Go offers multiple approaches depending on your requirements. For simple periodic tasks, time.Ticker provides a straightforward solution. For complex scheduling, the robfig/cron library offers cron-like functionality. Production environments benefit from task queue systems with Redis backing and external schedulers like Kubernetes CronJobs.
When building scheduled scrapers, always implement proper error handling, rate limiting, monitoring, and graceful shutdown mechanisms. Consider the target website's robots.txt and terms of service, and implement respectful scraping practices to avoid being blocked or causing server overload.
For more advanced scenarios, such as scraping dynamic content, you might also want to explore how to handle authentication in Go web scraping or how to implement rate limiting in Go web scraping applications so that your scheduled tasks run smoothly and responsibly.