How do I implement graceful shutdowns in Go scraping applications?
Implementing graceful shutdowns in Go scraping applications is crucial for maintaining data integrity, properly closing connections, and ensuring smooth deployments. A graceful shutdown allows your application to complete ongoing operations, save state, and clean up resources before terminating.
Understanding Graceful Shutdowns
A graceful shutdown is a controlled termination process that:
- Stops accepting new requests or tasks
- Completes ongoing operations within a reasonable time limit
- Properly closes database connections, file handles, and network connections
- Saves application state if necessary
- Releases system resources cleanly
Basic Signal Handling Implementation
The foundation of graceful shutdowns in Go is handling operating system signals, particularly SIGTERM and SIGINT:
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

type Scraper struct {
    client     *http.Client
    workers    int
    shutdown   chan struct{}
    wg         sync.WaitGroup
    ctx        context.Context
    cancelFunc context.CancelFunc
}

func NewScraper(workers int) *Scraper {
    ctx, cancel := context.WithCancel(context.Background())
    return &Scraper{
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
        workers:    workers,
        shutdown:   make(chan struct{}),
        ctx:        ctx,
        cancelFunc: cancel,
    }
}

func (s *Scraper) Start() {
    // Set up signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Start worker goroutines
    for i := 0; i < s.workers; i++ {
        s.wg.Add(1)
        go s.worker(i)
    }
    log.Printf("Scraper started with %d workers", s.workers)

    // Wait for shutdown signal
    <-sigChan
    log.Println("Shutdown signal received, initiating graceful shutdown...")
    s.Shutdown()
}

func (s *Scraper) Shutdown() {
    // Cancel context to signal all workers to stop
    s.cancelFunc()

    // Close shutdown channel
    close(s.shutdown)

    // Wait for all workers to finish with timeout
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println("All workers stopped gracefully")
    case <-time.After(30 * time.Second):
        log.Println("Timeout reached, forcing shutdown")
    }
}

func (s *Scraper) worker(id int) {
    defer s.wg.Done()
    log.Printf("Worker %d started", id)

    for {
        select {
        case <-s.ctx.Done():
            log.Printf("Worker %d received shutdown signal", id)
            return
        case <-s.shutdown:
            log.Printf("Worker %d shutting down", id)
            return
        default:
            // Simulate scraping work
            s.scrapeURL(fmt.Sprintf("https://example.com/page-%d", id))
            time.Sleep(1 * time.Second)
        }
    }
}

func (s *Scraper) scrapeURL(url string) {
    req, err := http.NewRequestWithContext(s.ctx, "GET", url, nil)
    if err != nil {
        log.Printf("Error creating request: %v", err)
        return
    }

    resp, err := s.client.Do(req)
    if err != nil {
        if s.ctx.Err() != nil {
            log.Printf("Request cancelled due to shutdown: %v", err)
            return
        }
        log.Printf("Error making request: %v", err)
        return
    }
    defer resp.Body.Close()

    log.Printf("Successfully scraped: %s (Status: %d)", url, resp.StatusCode)
}

func main() {
    scraper := NewScraper(3)
    scraper.Start()
}
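On Go 1.16 and later, signal.NotifyContext collapses the signal channel and context cancellation into a single call. A minimal sketch of the same wiring:

package main

import (
    "context"
    "log"
    "os/signal"
    "syscall"
)

func main() {
    // signal.NotifyContext returns a context that is cancelled when one of the
    // listed signals arrives, so no dedicated signal channel is needed.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // ... start workers that watch ctx.Done() ...

    <-ctx.Done() // blocks until a shutdown signal is received
    log.Println("Shutdown signal received, cleaning up...")
    // ... wait for workers with a timeout, then close resources ...
}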
Advanced Shutdown with Resource Management
For more complex scraping applications, you'll need to manage multiple resources like databases, message queues, and file handles:
package main

import (
    "context"
    "database/sql"
    "encoding/json" // used by the state-persistence and health-check helpers shown later
    "fmt"
    "io/ioutil" // used by saveState below; os.WriteFile is the modern replacement (Go 1.16+)
    "log"
    "net/http" // used by the health-check handler shown later
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    _ "github.com/lib/pq"
)
type ScrapingApp struct {
    db         *sql.DB
    workers    []*Worker
    shutdown   chan struct{}
    wg         sync.WaitGroup
    ctx        context.Context
    cancelFunc context.CancelFunc
}

type Worker struct {
    id       int
    taskChan chan Task
    done     chan struct{}
}

type Task struct {
    URL      string
    Priority int
}

func NewScrapingApp(dbURL string, workerCount int) (*ScrapingApp, error) {
    ctx, cancel := context.WithCancel(context.Background())

    // Initialize database connection.
    // Note: sql.Open only validates the DSN; use db.PingContext to verify connectivity.
    db, err := sql.Open("postgres", dbURL)
    if err != nil {
        cancel()
        return nil, err
    }

    app := &ScrapingApp{
        db:         db,
        shutdown:   make(chan struct{}),
        ctx:        ctx,
        cancelFunc: cancel,
    }

    // Initialize workers
    for i := 0; i < workerCount; i++ {
        worker := &Worker{
            id:       i,
            taskChan: make(chan Task, 100),
            done:     make(chan struct{}),
        }
        app.workers = append(app.workers, worker)
    }
    return app, nil
}
func (app *ScrapingApp) Start() error {
    // Set up signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

    // Start all workers
    for _, worker := range app.workers {
        app.wg.Add(1)
        go app.runWorker(worker)
    }

    // Start task distributor
    app.wg.Add(1)
    go app.distributeTask()

    log.Printf("Scraping application started with %d workers", len(app.workers))

    // Wait for signals
    for {
        sig := <-sigChan
        switch sig {
        case syscall.SIGINT, syscall.SIGTERM:
            log.Println("Received shutdown signal, initiating graceful shutdown...")
            return app.gracefulShutdown()
        case syscall.SIGHUP:
            log.Println("Received SIGHUP, reloading configuration...")
            // Handle configuration reload
        }
    }
}
func (app *ScrapingApp) gracefulShutdown() error {
    log.Println("Starting graceful shutdown process...")

    // Step 1: Stop accepting new tasks
    app.cancelFunc()

    // Step 2: Close task channels to signal workers.
    // Note: sending on a closed channel panics, so make sure the task distributor
    // has already stopped sending (e.g., track it with its own WaitGroup) before
    // closing these channels.
    for _, worker := range app.workers {
        close(worker.taskChan)
    }

    // Step 3: Wait for workers to finish current tasks
    shutdownComplete := make(chan struct{})
    go func() {
        app.wg.Wait()
        close(shutdownComplete)
    }()

    // Step 4: Wait with timeout
    select {
    case <-shutdownComplete:
        log.Println("All workers completed gracefully")
    case <-time.After(30 * time.Second):
        log.Println("Shutdown timeout reached, forcing exit")
    }

    // Step 5: Close database connections
    if err := app.db.Close(); err != nil {
        log.Printf("Error closing database: %v", err)
    }

    // Step 6: Close shutdown channel
    close(app.shutdown)

    log.Println("Graceful shutdown completed")
    return nil
}
func (app *ScrapingApp) runWorker(worker *Worker) {
    defer app.wg.Done()
    log.Printf("Worker %d started", worker.id)

    for {
        select {
        case task, ok := <-worker.taskChan:
            if !ok {
                log.Printf("Worker %d: task channel closed, shutting down", worker.id)
                return
            }
            app.processTask(worker.id, task)
        case <-app.ctx.Done():
            log.Printf("Worker %d: context cancelled, finishing current tasks", worker.id)
            // Drain remaining tasks
            for {
                select {
                case task, ok := <-worker.taskChan:
                    if !ok {
                        return
                    }
                    app.processTask(worker.id, task)
                default:
                    return
                }
            }
        }
    }
}
func (app *ScrapingApp) processTask(workerID int, task Task) {
    log.Printf("Worker %d processing task: %s", workerID, task.URL)

    // Derive a per-task context with a timeout. Because it is derived from app.ctx,
    // tasks drained after cancellation will see an already-cancelled context; derive
    // from context.Background() instead if drained tasks must still complete.
    ctx, cancel := context.WithTimeout(app.ctx, 10*time.Second)
    defer cancel()

    // Simulate scraping work
    time.Sleep(2 * time.Second)

    // Save results to database
    if err := app.saveResult(ctx, task.URL, "scraped_data"); err != nil {
        log.Printf("Worker %d: error saving result: %v", workerID, err)
    }
}

func (app *ScrapingApp) saveResult(ctx context.Context, url, data string) error {
    query := "INSERT INTO scraping_results (url, data, created_at) VALUES ($1, $2, $3)"
    _, err := app.db.ExecContext(ctx, query, url, data, time.Now())
    return err
}
func (app *ScrapingApp) distributeTask() {
    defer app.wg.Done()
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    taskCounter := 0
    for {
        select {
        case <-ticker.C:
            if app.ctx.Err() != nil {
                log.Println("Task distributor stopping due to context cancellation")
                return
            }

            // Create and distribute task
            task := Task{
                URL:      fmt.Sprintf("https://example.com/page-%d", taskCounter),
                Priority: taskCounter % 3,
            }

            // Round-robin task distribution
            workerIndex := taskCounter % len(app.workers)
            select {
            case app.workers[workerIndex].taskChan <- task:
                taskCounter++
            case <-app.ctx.Done():
                return
            default:
                log.Printf("Worker %d queue full, skipping task", workerIndex)
            }
        case <-app.ctx.Done():
            log.Println("Task distributor received shutdown signal")
            return
        }
    }
}
func main() {
    app, err := NewScrapingApp("postgres://user:pass@localhost/scraping_db?sslmode=disable", 5)
    if err != nil {
        log.Fatal(err)
    }

    if err := app.Start(); err != nil {
        log.Fatal(err)
    }
}
Shutdown Strategies and Best Practices
1. Timeout Management
Always implement timeouts to prevent indefinite blocking during shutdown:
func (app *ScrapingApp) shutdownWithTimeout(timeout time.Duration) error {
    done := make(chan struct{})
    go func() {
        app.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("shutdown timeout after %v", timeout)
    }
}
2. State Persistence
Save application state during shutdown to enable recovery:
func (app *ScrapingApp) saveState() error {
    state := struct {
        QueuedTasks []Task `json:"queued_tasks"`
        LastURL     string `json:"last_url"`
        Timestamp   int64  `json:"timestamp"`
    }{
        QueuedTasks: app.getQueuedTasks(),      // application-specific helper
        LastURL:     app.getLastProcessedURL(), // application-specific helper
        Timestamp:   time.Now().Unix(),
    }

    data, err := json.Marshal(state)
    if err != nil {
        return err
    }
    return ioutil.WriteFile("app_state.json", data, 0644)
}
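The recovery side is symmetric. A minimal sketch, assuming the same app_state.json file and state layout as above; restoreState is a hypothetical counterpart that re-queues saved tasks on startup:

// restoreState reads app_state.json (if present) and returns the tasks that were
// still queued at shutdown so they can be re-submitted to the workers.
func (app *ScrapingApp) restoreState() ([]Task, error) {
    data, err := ioutil.ReadFile("app_state.json")
    if err != nil {
        if os.IsNotExist(err) {
            return nil, nil // no saved state, start fresh
        }
        return nil, err
    }

    var state struct {
        QueuedTasks []Task `json:"queued_tasks"`
        LastURL     string `json:"last_url"`
        Timestamp   int64  `json:"timestamp"`
    }
    if err := json.Unmarshal(data, &state); err != nil {
        return nil, err
    }
    return state.QueuedTasks, nil
}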
3. Health Checks Integration
Implement health checks that respect shutdown state:
func (app *ScrapingApp) healthCheck(w http.ResponseWriter, r *http.Request) {
    select {
    case <-app.shutdown:
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "shutting_down",
        })
        return
    default:
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]string{
            "status":  "healthy",
            "workers": fmt.Sprintf("%d", len(app.workers)),
        })
    }
}
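The handler still needs an HTTP server, and that server should itself stop gracefully. A minimal sketch, assuming it is started alongside the workers and stopped from gracefulShutdown (the :8080 port and /healthz path are assumptions):

// startHealthServer is a hypothetical helper that serves the health check and
// returns the server so the shutdown path can stop it cleanly.
func (app *ScrapingApp) startHealthServer() *http.Server {
    mux := http.NewServeMux()
    mux.HandleFunc("/healthz", app.healthCheck)

    srv := &http.Server{Addr: ":8080", Handler: mux}
    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Printf("health server error: %v", err)
        }
    }()
    return srv
}

// During shutdown, give in-flight health requests a few seconds to finish:
//     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//     defer cancel()
//     if err := srv.Shutdown(ctx); err != nil {
//         log.Printf("health server shutdown error: %v", err)
//     }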
Command Line Testing
Test your graceful shutdown implementation using these commands:
# Start your scraper
go run main.go &
SCRAPER_PID=$!
# Let it run for a few seconds
sleep 5
# Send graceful shutdown signal
kill -TERM $SCRAPER_PID
# Check if process exits cleanly
wait $SCRAPER_PID
echo "Exit status: $?"
Testing Graceful Shutdowns
Create comprehensive tests for your shutdown behavior:
package main

import (
    "os"
    "syscall"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestGracefulShutdown(t *testing.T) {
    app, err := NewScrapingApp("test_db_url", 2)
    require.NoError(t, err)

    // Start application in goroutine
    done := make(chan error)
    go func() {
        done <- app.Start()
    }()

    // Wait for startup
    time.Sleep(100 * time.Millisecond)

    // Send shutdown signal
    process, _ := os.FindProcess(os.Getpid())
    process.Signal(syscall.SIGTERM)

    // Verify graceful shutdown completes within timeout
    select {
    case err := <-done:
        assert.NoError(t, err)
    case <-time.After(5 * time.Second):
        t.Fatal("Shutdown took too long")
    }
}
func TestWorkerCompletion(t *testing.T) {
    scraper := NewScraper(2)

    // Start scraper in background
    go scraper.Start()

    // Wait for workers to start
    time.Sleep(100 * time.Millisecond)

    // Trigger shutdown in the background and verify all workers stop in time
    done := make(chan struct{})
    go func() {
        scraper.Shutdown()
        close(done)
    }()

    select {
    case <-done:
        // Workers completed quickly, test passed
    case <-time.After(1 * time.Second):
        t.Fatal("Workers did not complete in time")
    }
}
Integration with Container Orchestration
When deploying to Kubernetes or Docker Swarm, configure appropriate grace periods:
# Kubernetes deployment example
apiVersion: apps/v1
kind: Deployment
metadata:
  name: go-scraper
spec:
  selector:
    matchLabels:
      app: go-scraper
  template:
    metadata:
      labels:
        app: go-scraper
    spec:
      terminationGracePeriodSeconds: 45
      containers:
        - name: scraper
          image: go-scraper:latest
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 5"]
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
The 45-second grace period leaves room for the 5-second preStop hook plus the application's own 30-second shutdown timeout before Kubernetes sends SIGKILL. For Docker Compose:
version: '3.8'
services:
  scraper:
    build: .
    stop_grace_period: 30s
    environment:
      - GRACEFUL_TIMEOUT=25s
    depends_on:
      - postgres
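The GRACEFUL_TIMEOUT variable only has an effect if the application reads it. A minimal sketch of wiring it into the shutdown timeout (the variable name and the 25s default are assumptions taken from the compose file above):

// shutdownTimeout reads GRACEFUL_TIMEOUT (e.g. "25s") from the environment and
// falls back to a default when it is unset or malformed. Keep it shorter than
// stop_grace_period so the app finishes before Docker sends SIGKILL.
func shutdownTimeout() time.Duration {
    if v := os.Getenv("GRACEFUL_TIMEOUT"); v != "" {
        if d, err := time.ParseDuration(v); err == nil {
            return d
        }
    }
    return 25 * time.Second
}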
Monitoring and Observability
Implement metrics to monitor shutdown behavior:
import "github.com/prometheus/client_golang/prometheus"
var (
shutdownDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "scraper_shutdown_duration_seconds",
Help: "Time taken for graceful shutdown",
Buckets: prometheus.DefBuckets,
})
tasksCompleted = prometheus.NewCounter(prometheus.CounterOpts{
Name: "scraper_tasks_completed_during_shutdown",
Help: "Number of tasks completed during shutdown",
})
activeWorkers = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "scraper_active_workers",
Help: "Number of active workers",
})
)
func init() {
prometheus.MustRegister(shutdownDuration)
prometheus.MustRegister(tasksCompleted)
prometheus.MustRegister(activeWorkers)
}
func (app *ScrapingApp) monitoredShutdown() error {
start := time.Now()
defer func() {
shutdownDuration.Observe(time.Since(start).Seconds())
activeWorkers.Set(0)
}()
return app.gracefulShutdown()
}
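These metrics are only useful if Prometheus can scrape them. A minimal sketch using the client library's promhttp handler (the :2112 port and /metrics path are assumptions):

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// startMetricsServer exposes the registered metrics for Prometheus to scrape.
func startMetricsServer() {
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        if err := http.ListenAndServe(":2112", nil); err != nil {
            log.Printf("metrics server error: %v", err)
        }
    }()
}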
Error Handling During Shutdown
Implement proper error handling for shutdown scenarios:
func (app *ScrapingApp) robustShutdown() error {
    var shutdownErrors []error

    // Cancel context
    app.cancelFunc()

    // Close worker channels with error handling
    for i, worker := range app.workers {
        func() {
            defer func() {
                if r := recover(); r != nil {
                    shutdownErrors = append(shutdownErrors,
                        fmt.Errorf("panic closing worker %d: %v", i, r))
                }
            }()
            close(worker.taskChan)
        }()
    }

    // Wait for workers with timeout
    done := make(chan struct{})
    go func() {
        defer close(done)
        app.wg.Wait()
    }()

    select {
    case <-done:
        log.Println("All workers completed")
    case <-time.After(30 * time.Second):
        shutdownErrors = append(shutdownErrors,
            fmt.Errorf("shutdown timeout exceeded"))
    }

    // Close database with error handling
    if app.db != nil {
        if err := app.db.Close(); err != nil {
            shutdownErrors = append(shutdownErrors,
                fmt.Errorf("database close error: %w", err))
        }
    }

    // Return combined errors
    if len(shutdownErrors) > 0 {
        return fmt.Errorf("shutdown errors: %v", shutdownErrors)
    }
    return nil
}
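On Go 1.20 and later, errors.Join offers a cleaner way to combine the collected errors while keeping each one available to errors.Is and errors.As. A small sketch of the final step:

import "errors"

// combineShutdownErrors wraps the collected errors with errors.Join (Go 1.20+):
// nil entries are ignored, and a nil result means no errors occurred.
func combineShutdownErrors(errs []error) error {
    return errors.Join(errs...)
}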
Conclusion
Implementing graceful shutdowns in Go scraping applications requires careful coordination of goroutines, proper resource cleanup, and robust error handling. By using context cancellation, signal handling, and timeout management, you can ensure your applications shut down cleanly and maintain data integrity.
Key components of a successful graceful shutdown include:
- Signal handling for OS-level shutdown requests
- Context cancellation to coordinate worker termination
- Timeout management to prevent indefinite blocking
- Resource cleanup for databases, files, and network connections
- State persistence for recovery after restart
- Comprehensive testing to verify shutdown behavior
Remember to test your shutdown logic thoroughly and monitor its behavior in production. Solid timeout handling and error handling complement the shutdown path and make scraping applications more reliable.
For distributed scraping systems, consider implementing shutdown coordination mechanisms to ensure all nodes complete their work before the entire system shuts down. This approach ensures maximum reliability and prevents data loss during deployment or maintenance operations.
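A sketch of one such coordination scheme, reduced to channels for illustration (in a real deployment the signalling would go through your message bus or a coordination store such as etcd or Consul; the names here are illustrative only, and "log" and "time" are assumed to be imported):

// Node represents one scraper instance from the coordinator's point of view.
type Node struct {
    ID       string
    Shutdown chan struct{} // closed by the coordinator to request shutdown
    Done     chan struct{} // closed by the node once its queue is drained
}

// coordinateShutdown asks every node to stop, then waits for acknowledgements
// up to a shared deadline so one slow node cannot block the rollout forever.
func coordinateShutdown(nodes []*Node, deadline time.Duration) {
    for _, n := range nodes {
        close(n.Shutdown)
    }

    timeout := time.After(deadline)
    for _, n := range nodes {
        select {
        case <-n.Done:
            log.Printf("node %s drained", n.ID)
        case <-timeout:
            log.Printf("deadline reached; node %s may not have finished", n.ID)
        }
    }
}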