How do I implement graceful shutdowns in Go scraping applications?

Implementing graceful shutdowns in Go scraping applications is crucial for maintaining data integrity, properly closing connections, and ensuring smooth deployments. A graceful shutdown allows your application to complete ongoing operations, save state, and clean up resources before terminating.

Understanding Graceful Shutdowns

A graceful shutdown is a controlled termination process that:

  • Stops accepting new requests or tasks
  • Completes ongoing operations within a reasonable time limit
  • Properly closes database connections, file handles, and network connections
  • Saves application state if necessary
  • Releases system resources cleanly

Basic Signal Handling Implementation

The foundation of graceful shutdowns in Go is handling operating system signals, particularly SIGTERM and SIGINT:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

type Scraper struct {
    client     *http.Client
    workers    int
    shutdown   chan struct{}
    wg         sync.WaitGroup
    ctx        context.Context
    cancelFunc context.CancelFunc
}

func NewScraper(workers int) *Scraper {
    ctx, cancel := context.WithCancel(context.Background())

    return &Scraper{
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
        workers:    workers,
        shutdown:   make(chan struct{}),
        ctx:        ctx,
        cancelFunc: cancel,
    }
}

func (s *Scraper) Start() {
    // Set up signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Start worker goroutines
    for i := 0; i < s.workers; i++ {
        s.wg.Add(1)
        go s.worker(i)
    }

    log.Printf("Scraper started with %d workers", s.workers)

    // Wait for shutdown signal
    <-sigChan
    log.Println("Shutdown signal received, initiating graceful shutdown...")

    s.Shutdown()
}

func (s *Scraper) Shutdown() {
    // Cancel context to signal all workers to stop
    s.cancelFunc()

    // Close shutdown channel
    close(s.shutdown)

    // Wait for all workers to finish with timeout
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println("All workers stopped gracefully")
    case <-time.After(30 * time.Second):
        log.Println("Timeout reached, forcing shutdown")
    }
}

func (s *Scraper) worker(id int) {
    defer s.wg.Done()

    log.Printf("Worker %d started", id)

    for {
        select {
        case <-s.ctx.Done():
            log.Printf("Worker %d received shutdown signal", id)
            return
        case <-s.shutdown:
            log.Printf("Worker %d shutting down", id)
            return
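        default:
            // Simulate scraping work
            s.scrapeURL(fmt.Sprintf("https://example.com/page-%d", id))

            // Pause between requests, but wake immediately on shutdown
            select {
            case <-time.After(1 * time.Second):
            case <-s.ctx.Done():
            }
        }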
    }
}

func (s *Scraper) scrapeURL(url string) {
    req, err := http.NewRequestWithContext(s.ctx, "GET", url, nil)
    if err != nil {
        log.Printf("Error creating request: %v", err)
        return
    }

    resp, err := s.client.Do(req)
    if err != nil {
        if s.ctx.Err() != nil {
            log.Printf("Request cancelled due to shutdown: %v", err)
            return
        }
        log.Printf("Error making request: %v", err)
        return
    }
    defer resp.Body.Close()

    log.Printf("Successfully scraped: %s (Status: %d)", url, resp.StatusCode)
}

func main() {
    scraper := NewScraper(3)
    scraper.Start()
}

Advanced Shutdown with Resource Management

For more complex scraping applications, you'll need to manage multiple resources like databases, message queues, and file handles:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    _ "github.com/lib/pq"
)

type ScrapingApp struct {
    db         *sql.DB
    workers    []*Worker
    shutdown   chan struct{}
    wg         sync.WaitGroup
    ctx        context.Context
    cancelFunc context.CancelFunc
}

type Worker struct {
    id       int
    taskChan chan Task
    done     chan struct{}
}

type Task struct {
    URL      string
    Priority int
}

func NewScrapingApp(dbURL string, workerCount int) (*ScrapingApp, error) {
    ctx, cancel := context.WithCancel(context.Background())

    // Initialize database connection
    db, err := sql.Open("postgres", dbURL)
    if err != nil {
        return nil, err
    }

    app := &ScrapingApp{
        db:         db,
        shutdown:   make(chan struct{}),
        ctx:        ctx,
        cancelFunc: cancel,
    }

    // Initialize workers
    for i := 0; i < workerCount; i++ {
        worker := &Worker{
            id:       i,
            taskChan: make(chan Task, 100),
            done:     make(chan struct{}),
        }
        app.workers = append(app.workers, worker)
    }

    return app, nil
}

func (app *ScrapingApp) Start() error {
    // Set up signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

    // Start all workers
    for _, worker := range app.workers {
        app.wg.Add(1)
        go app.runWorker(worker)
    }

    // Start task distributor
    app.wg.Add(1)
    go app.distributeTask()

    log.Printf("Scraping application started with %d workers", len(app.workers))

    // Wait for signals
    for {
        sig := <-sigChan
        switch sig {
        case syscall.SIGINT, syscall.SIGTERM:
            log.Println("Received shutdown signal, initiating graceful shutdown...")
            return app.gracefulShutdown()
        case syscall.SIGHUP:
            log.Println("Received SIGHUP, reloading configuration...")
            // Handle configuration reload
        }
    }
}

func (app *ScrapingApp) gracefulShutdown() error {
    log.Println("Starting graceful shutdown process...")

    // Step 1: Mark the app as shutting down so health checks report it
    // for the whole shutdown, not only after it has finished
    close(app.shutdown)

    // Step 2: Stop accepting new tasks. Workers drain their queues and exit
    // once they see the cancelled context. The task channels are not closed
    // here: the distributor may still be mid-send, and a send on a closed
    // channel panics.
    app.cancelFunc()

    // Step 3: Wait for workers to finish current tasks
    shutdownComplete := make(chan struct{})
    go func() {
        app.wg.Wait()
        close(shutdownComplete)
    }()

    // Step 4: Wait with timeout
    select {
    case <-shutdownComplete:
        log.Println("All workers completed gracefully")
    case <-time.After(30 * time.Second):
        log.Println("Shutdown timeout reached, forcing exit")
    }

    // Step 5: Close database connections
    if err := app.db.Close(); err != nil {
        log.Printf("Error closing database: %v", err)
    }

    log.Println("Graceful shutdown completed")
    return nil
}

func (app *ScrapingApp) runWorker(worker *Worker) {
    defer app.wg.Done()

    log.Printf("Worker %d started", worker.id)

    for {
        select {
        case task, ok := <-worker.taskChan:
            if !ok {
                log.Printf("Worker %d: task channel closed, shutting down", worker.id)
                return
            }

            app.processTask(worker.id, task)

        case <-app.ctx.Done():
            log.Printf("Worker %d: context cancelled, finishing current tasks", worker.id)
            // Drain remaining tasks
            for {
                select {
                case task, ok := <-worker.taskChan:
                    if !ok {
                        return
                    }
                    app.processTask(worker.id, task)
                default:
                    return
                }
            }
        }
    }
}

func (app *ScrapingApp) processTask(workerID int, task Task) {
    log.Printf("Worker %d processing task: %s", workerID, task.URL)

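    // Use a context that is not derived from app.ctx: tasks drained after
    // cancellation must still be able to save their results. The timeout
    // keeps a stuck task from blocking shutdown.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()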

    // Simulate scraping work
    time.Sleep(2 * time.Second)

    // Save results to database
    if err := app.saveResult(ctx, task.URL, "scraped_data"); err != nil {
        log.Printf("Worker %d: error saving result: %v", workerID, err)
    }
}

func (app *ScrapingApp) saveResult(ctx context.Context, url, data string) error {
    query := "INSERT INTO scraping_results (url, data, created_at) VALUES ($1, $2, $3)"
    _, err := app.db.ExecContext(ctx, query, url, data, time.Now())
    return err
}

func (app *ScrapingApp) distributeTask() {
    defer app.wg.Done()

    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    taskCounter := 0

    for {
        select {
        case <-ticker.C:
            if app.ctx.Err() != nil {
                log.Println("Task distributor stopping due to context cancellation")
                return
            }

            // Create and distribute task
            task := Task{
                URL:      fmt.Sprintf("https://example.com/page-%d", taskCounter),
                Priority: taskCounter % 3,
            }

            // Round-robin task distribution
            workerIndex := taskCounter % len(app.workers)

            select {
            case app.workers[workerIndex].taskChan <- task:
                taskCounter++
            case <-app.ctx.Done():
                return
            default:
                log.Printf("Worker %d queue full, skipping task", workerIndex)
            }

        case <-app.ctx.Done():
            log.Println("Task distributor received shutdown signal")
            return
        }
    }
}

func main() {
    app, err := NewScrapingApp("postgres://user:pass@localhost/scraping_db?sslmode=disable", 5)
    if err != nil {
        log.Fatal(err)
    }

    if err := app.Start(); err != nil {
        log.Fatal(err)
    }
}

Shutdown Strategies and Best Practices

1. Timeout Management

Always implement timeouts to prevent indefinite blocking during shutdown:

func (app *ScrapingApp) shutdownWithTimeout(timeout time.Duration) error {
    done := make(chan struct{})

    go func() {
        app.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("shutdown timeout after %v", timeout)
    }
}

2. State Persistence

Save application state during shutdown to enable recovery:

func (app *ScrapingApp) saveState() error {
    state := struct {
        QueuedTasks []Task `json:"queued_tasks"`
        LastURL     string `json:"last_url"`
        Timestamp   int64  `json:"timestamp"`
    }{
        QueuedTasks: app.getQueuedTasks(),
        LastURL:     app.getLastProcessedURL(),
        Timestamp:   time.Now().Unix(),
    }

    data, err := json.Marshal(state)
    if err != nil {
        return err
    }

    return ioutil.WriteFile("app_state.json", data, 0644)
}

3. Health Checks Integration

Implement health checks that respect shutdown state:

func (app *ScrapingApp) healthCheck(w http.ResponseWriter, r *http.Request) {
    select {
    case <-app.shutdown:
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "shutting_down",
        })
        return
    default:
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "healthy",
            "workers": fmt.Sprintf("%d", len(app.workers)),
        })
    }
}

Command Line Testing

Test your graceful shutdown implementation using these commands:

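# Build a binary first: with `go run`, $! is the PID of the go tool,
# not of your scraper, so the signal may never reach it
go build -o scraper .
./scraper &
SCRAPER_PID=$!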

# Let it run for a few seconds
sleep 5

# Send graceful shutdown signal
kill -TERM $SCRAPER_PID

# Check if process exits cleanly
wait $SCRAPER_PID
echo "Exit status: $?"

Testing Graceful Shutdowns

Create comprehensive tests for your shutdown behavior:

package main

import (
    "os"
    "syscall"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestGracefulShutdown(t *testing.T) {
    app, err := NewScrapingApp("test_db_url", 2)
    require.NoError(t, err)

    // Start application in goroutine
    done := make(chan error)
    go func() {
        done <- app.Start()
    }()

    // Wait for startup
    time.Sleep(100 * time.Millisecond)

    // Send shutdown signal
    process, err := os.FindProcess(os.Getpid())
    require.NoError(t, err)
    require.NoError(t, process.Signal(syscall.SIGTERM))

    // Verify graceful shutdown completes within timeout
    select {
    case err := <-done:
        assert.NoError(t, err)
    case <-time.After(5 * time.Second):
        t.Fatal("Shutdown took too long")
    }
}

func TestWorkerCompletion(t *testing.T) {
    scraper := NewScraper(2)

    // Start scraper in background
    go scraper.Start()

    // Wait for workers to start
    time.Sleep(100 * time.Millisecond)

    // Trigger shutdown in the background so the test can bound its duration
    done := make(chan struct{})
    go func() {
        scraper.Shutdown()
        close(done)
    }()

    // Shutdown only returns this quickly if every worker exited;
    // its own fallback timeout is 30 seconds
    select {
    case <-done:
        // Test passed
    case <-time.After(5 * time.Second):
        t.Fatal("Workers did not complete in time")
    }
}

Integration with Container Orchestration

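When deploying to Kubernetes or Docker Swarm, configure appropriate grace periods. Both platforms send SIGTERM, wait for the grace period, and then send SIGKILL, so the grace period must be longer than your application's own shutdown timeout. In Kubernetes, the time spent in a preStop hook also counts against the grace period: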

# Kubernetes deployment example
apiVersion: apps/v1
kind: Deployment
metadata:
  name: go-scraper
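spec:
  # apps/v1 requires a selector that matches the pod template labels;
  # the label value here is illustrative
  selector:
    matchLabels:
      app: go-scraper
  template:
    metadata:
      labels:
        app: go-scraper
    spec: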
      terminationGracePeriodSeconds: 45
      containers:
      - name: scraper
        image: go-scraper:latest
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 5"]
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"

For Docker Compose:

version: '3.8'
services:
  scraper:
    build: .
    stop_grace_period: 30s
    environment:
      - GRACEFUL_TIMEOUT=25s
    depends_on:
      - postgres

Monitoring and Observability

Implement metrics to monitor shutdown behavior:

import "github.com/prometheus/client_golang/prometheus"

var (
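    shutdownDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name: "scraper_shutdown_duration_seconds",
        Help: "Time taken for graceful shutdown",
        // DefBuckets top out at 10s, below the 30s shutdown timeout,
        // so use buckets from 0.25s to 64s instead
        Buckets: prometheus.ExponentialBuckets(0.25, 2, 9),
    })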

    tasksCompleted = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "scraper_tasks_completed_during_shutdown",
        Help: "Number of tasks completed during shutdown",
    })

    activeWorkers = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "scraper_active_workers",
        Help: "Number of active workers",
    })
)

func init() {
    prometheus.MustRegister(shutdownDuration)
    prometheus.MustRegister(tasksCompleted)
    prometheus.MustRegister(activeWorkers)
}

func (app *ScrapingApp) monitoredShutdown() error {
    start := time.Now()
    defer func() {
        shutdownDuration.Observe(time.Since(start).Seconds())
        activeWorkers.Set(0)
    }()

    return app.gracefulShutdown()
}

Error Handling During Shutdown

Implement proper error handling for shutdown scenarios:

func (app *ScrapingApp) robustShutdown() error {
    var shutdownErrors []error

    // Cancel context
    app.cancelFunc()

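    // Close worker channels; recover guards against a double close.
    // The distributor must already have stopped, since recover here
    // cannot catch its panic if it sends on a closed channel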
    for i, worker := range app.workers {
        func() {
            defer func() {
                if r := recover(); r != nil {
                    shutdownErrors = append(shutdownErrors, 
                        fmt.Errorf("panic closing worker %d: %v", i, r))
                }
            }()
            close(worker.taskChan)
        }()
    }

    // Wait for workers with timeout
    done := make(chan struct{})
    go func() {
        defer close(done)
        app.wg.Wait()
    }()

    select {
    case <-done:
        log.Println("All workers completed")
    case <-time.After(30 * time.Second):
        shutdownErrors = append(shutdownErrors, 
            fmt.Errorf("shutdown timeout exceeded"))
    }

    // Close database with error handling
    if app.db != nil {
        if err := app.db.Close(); err != nil {
            shutdownErrors = append(shutdownErrors, 
                fmt.Errorf("database close error: %w", err))
        }
    }

    // Return combined errors
    if len(shutdownErrors) > 0 {
        return fmt.Errorf("shutdown errors: %v", shutdownErrors)
    }

    return nil
}

Conclusion

Implementing graceful shutdowns in Go scraping applications requires careful coordination of goroutines, proper resource cleanup, and robust error handling. By using context cancellation, signal handling, and timeout management, you can ensure your applications shut down cleanly and maintain data integrity.

Key components of a successful graceful shutdown include:

  • Signal handling for OS-level shutdown requests
  • Context cancellation to coordinate worker termination
  • Timeout management to prevent indefinite blocking
  • Resource cleanup for databases, files, and network connections
  • State persistence for recovery after restart
  • Comprehensive testing to verify shutdown behavior

Remember to test your shutdown logic thoroughly and monitor its behavior in production environments. Proper timeout handling techniques and robust error handling practices can complement your graceful shutdown implementation for more reliable scraping applications.

For distributed scraping systems, consider implementing shutdown coordination mechanisms so that each node completes its in-flight work before the entire system shuts down. This reduces the risk of data loss during deployment or maintenance operations.
