
How do I handle WebSocket connections in Go scraping?

WebSocket connections provide real-time, bidirectional communication between clients and servers, making them essential for scraping dynamic content that updates continuously. In Go, handling WebSocket connections for scraping requires understanding connection lifecycle, message handling, and proper resource management.

Understanding WebSocket Scraping in Go

WebSockets maintain persistent connections that allow servers to push data to clients instantly. This is particularly useful for scraping:

  • Live chat applications
  • Real-time financial data feeds
  • Sports scores and live updates
  • Stock trading platforms
  • Social media live streams
  • Gaming platforms with real-time updates

Basic WebSocket Connection Setup

Here's how to establish a WebSocket connection using the popular gorilla/websocket library:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/gorilla/websocket"
)

type WebSocketScraper struct {
    conn     *websocket.Conn
    url      string
    headers  http.Header
    timeout  time.Duration
}

func NewWebSocketScraper(url string) *WebSocketScraper {
    return &WebSocketScraper{
        url:     url,
        headers: make(http.Header),
        timeout: 30 * time.Second,
    }
}

func (ws *WebSocketScraper) Connect(ctx context.Context) error {
    dialer := websocket.Dialer{
        HandshakeTimeout: ws.timeout,
        ReadBufferSize:   4096,
        WriteBufferSize:  4096,
    }

    conn, resp, err := dialer.DialContext(ctx, ws.url, ws.headers)
    if err != nil {
        // %w keeps the cause inspectable, e.g. errors.Is(err, websocket.ErrBadHandshake).
        if resp != nil {
            return fmt.Errorf("websocket connection failed: %w (status: %s)", err, resp.Status)
        }
        return fmt.Errorf("websocket connection failed: %w", err)
    }

    ws.conn = conn
    return nil
}

Advanced Connection Management

For production scraping, implement robust connection handling with reconnection logic:

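// Merge into the import block at the top of the file (Go does not allow
// import declarations after other declarations):
import (
    "sync"
)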

type ReconnectingWebSocket struct {
    *WebSocketScraper
    maxReconnectAttempts int
    reconnectDelay       time.Duration
    messageChannel       chan []byte
    errorChannel         chan error
    stopChannel          chan struct{}
    isConnected          bool
    mu                   sync.RWMutex
}

func NewReconnectingWebSocket(url string) *ReconnectingWebSocket {
    return &ReconnectingWebSocket{
        WebSocketScraper:     NewWebSocketScraper(url),
        maxReconnectAttempts: 5,
        reconnectDelay:       5 * time.Second,
        messageChannel:       make(chan []byte, 100),
        errorChannel:         make(chan error, 10),
        stopChannel:          make(chan struct{}),
    }
}

func (rws *ReconnectingWebSocket) Start(ctx context.Context) error {
    var lastErr error
    for attempt := 0; attempt < rws.maxReconnectAttempts; attempt++ {
        if attempt > 0 {
            // Wait between attempts, but give up at once if ctx is cancelled.
            select {
            case <-time.After(rws.reconnectDelay):
            case <-ctx.Done():
                return ctx.Err()
            }
        }

        if err := rws.Connect(ctx); err != nil {
            lastErr = err
            log.Printf("Connection attempt %d failed: %v", attempt+1, err)
            continue
        }

        rws.setConnected(true)
        go rws.readMessages(ctx)
        go rws.handleReconnection(ctx)
        return nil
    }
    return fmt.Errorf("failed to connect after %d attempts: %w", rws.maxReconnectAttempts, lastErr)
}

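func (rws *ReconnectingWebSocket) setConnected(connected bool) {
    rws.mu.Lock()
    defer rws.mu.Unlock()
    rws.isConnected = connected
}

// IsConnected reports the current connection state.
func (rws *ReconnectingWebSocket) IsConnected() bool {
    rws.mu.RLock()
    defer rws.mu.RUnlock()
    return rws.isConnected
}

// Stop closes stopChannel, signalling the goroutines that watch it to exit.
// It is safe to call more than once.
func (rws *ReconnectingWebSocket) Stop() {
    rws.mu.Lock()
    defer rws.mu.Unlock()
    select {
    case <-rws.stopChannel: // already stopped
    default:
        close(rws.stopChannel)
    }
}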

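func (rws *ReconnectingWebSocket) readMessages(ctx context.Context) {
    defer rws.setConnected(false)

    // Capture the connection this reader owns; a reconnect replaces rws.conn.
    conn := rws.conn
    done := make(chan struct{})
    defer close(done)

    // ReadMessage blocks until a frame arrives, so a select around it cannot
    // notice cancellation. Closing the connection is what unblocks the read.
    go func() {
        select {
        case <-ctx.Done():
        case <-rws.stopChannel:
        case <-done:
            return
        }
        conn.Close()
    }()

    for {
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            select {
            case <-ctx.Done(): // shutting down, not a failure
            case <-rws.stopChannel:
            default:
                rws.errorChannel <- err // picked up by handleReconnection
            }
            return
        }

        if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
            continue
        }
        select {
        case rws.messageChannel <- message:
        case <-ctx.Done():
            return
        case <-rws.stopChannel:
            return
        }
    }
}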

Message Handling and Data Extraction

Implement structured message processing for different data formats:

// Merge into the import block at the top of the file:
import (
    "encoding/json"
)

type MessageHandler struct {
    processors map[string]func([]byte) error
}

func NewMessageHandler() *MessageHandler {
    return &MessageHandler{
        processors: make(map[string]func([]byte) error),
    }
}

func (mh *MessageHandler) RegisterProcessor(messageType string, processor func([]byte) error) {
    mh.processors[messageType] = processor
}

func (mh *MessageHandler) ProcessMessage(rawMessage []byte) error {
    // Try to parse as JSON to determine message type
    var envelope struct {
        Type string          `json:"type"`
        Data json.RawMessage `json:"data"`
    }

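    // Fall back to raw handling for non-JSON payloads and for JSON that
    // does not follow the {"type": ..., "data": ...} envelope.
    if err := json.Unmarshal(rawMessage, &envelope); err != nil || envelope.Type == "" {
        return mh.processRawMessage(rawMessage)
    }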

    processor, exists := mh.processors[envelope.Type]
    if !exists {
        return fmt.Errorf("no processor for message type: %s", envelope.Type)
    }

    return processor(envelope.Data)
}

func (mh *MessageHandler) processRawMessage(data []byte) error {
    // Handle raw text or binary messages
    fmt.Printf("Raw message: %s\n", string(data))
    return nil
}

// Example: Processing stock price updates
func processStockUpdate(data []byte) error {
    var stockData struct {
        Symbol    string  `json:"symbol"`
        Price     float64 `json:"price"`
        Timestamp int64   `json:"timestamp"`
        Volume    int64   `json:"volume"`
    }

    if err := json.Unmarshal(data, &stockData); err != nil {
        return fmt.Errorf("failed to parse stock data: %v", err)
    }

    // Store or process the stock data
    fmt.Printf("Stock Update: %s - $%.2f (Volume: %d)\n", 
        stockData.Symbol, stockData.Price, stockData.Volume)

    return nil
}

Authentication and Headers

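Many WebSocket endpoints require authentication. The opening handshake is an ordinary HTTP GET request, so tokens, cookies, and a browser-like User-Agent can be sent as request headers: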

func (ws *WebSocketScraper) SetAuthToken(token string) {
    ws.headers.Set("Authorization", "Bearer "+token)
}

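// SetCustomHeaders adds extra handshake headers. gorilla/websocket sets the
// protocol headers itself (Upgrade, Connection, Sec-WebSocket-Key,
// Sec-WebSocket-Version, Sec-WebSocket-Extensions) and the dial fails if you
// pass them here.
func (ws *WebSocketScraper) SetCustomHeaders(headers map[string]string) {
    for key, value := range headers {
        ws.headers.Set(key, value)
    }
}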

func (ws *WebSocketScraper) SetUserAgent(userAgent string) {
    ws.headers.Set("User-Agent", userAgent)
}

// Example: Connecting to authenticated endpoint
func connectWithAuth(ctx context.Context, url, token string) error {
    scraper := NewWebSocketScraper(url)
    scraper.SetAuthToken(token)
    scraper.SetUserAgent("Go-WebSocket-Scraper/1.0")

    return scraper.Connect(ctx)
}
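Some services ignore an Authorization header during the handshake and expect the token in the URL instead, since browser WebSocket clients cannot set custom headers. If the endpoint you are scraping does that (the page's own network traffic will show it), build the URL with net/url so the token is escaped correctly. The withQueryToken helper and the "token" parameter name are assumptions; use whatever name the site actually sends.

```go
package main

import (
    "fmt"
    "net/url"
)

// withQueryToken returns rawURL with paramName set to token, keeping any
// existing query parameters and escaping the value.
func withQueryToken(rawURL, paramName, token string) (string, error) {
    u, err := url.Parse(rawURL)
    if err != nil {
        return "", err
    }
    q := u.Query()
    q.Set(paramName, token)
    u.RawQuery = q.Encode() // Encode sorts keys and query-escapes values
    return u.String(), nil
}

func main() {
    // Hypothetical endpoint and token.
    s, err := withQueryToken("wss://example.com/stream?lang=en", "token", "abc/123 xyz")
    if err != nil {
        panic(err)
    }
    fmt.Println(s) // wss://example.com/stream?lang=en&token=abc%2F123+xyz
}
```

Tokens in URLs tend to end up in proxy and server logs, so prefer the header form whenever the endpoint accepts it.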

Complete Real-Time Scraping Example

Here's a complete example that streams live BTC/USDT prices from Binance's public ticker stream. It assumes the types from the earlier snippets live in the same package:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strconv"
    "sync"
    "syscall"
)

type CryptoScraper struct {
    ws     *ReconnectingWebSocket
    prices map[string]float64
    mu     sync.RWMutex
}

func NewCryptoScraper() *CryptoScraper {
    return &CryptoScraper{
        ws:     NewReconnectingWebSocket("wss://stream.binance.com:9443/ws/btcusdt@ticker"),
        prices: make(map[string]float64),
    }
}

// processPriceUpdate parses one Binance 24hr ticker event. Binance sends the
// raw event object ({"e":"24hrTicker","s":"BTCUSDT","c":"<last price>",...}),
// not a {"type":...,"data":...} envelope, so it is decoded directly instead
// of being routed through MessageHandler.
func (cs *CryptoScraper) processPriceUpdate(data []byte) error {
    var ticker struct {
        Symbol string `json:"s"` // trading pair
        Price  string `json:"c"` // last price, sent as a string
    }

    if err := json.Unmarshal(data, &ticker); err != nil {
        return err
    }

    price, err := strconv.ParseFloat(ticker.Price, 64)
    if err != nil {
        return err
    }

    cs.mu.Lock()
    cs.prices[ticker.Symbol] = price
    cs.mu.Unlock()

    fmt.Printf("Price Update: %s = $%.2f\n", ticker.Symbol, price)
    return nil
}

func (cs *CryptoScraper) Start(ctx context.Context) error {
    if err := cs.ws.Start(ctx); err != nil {
        return err
    }

    go cs.messageLoop(ctx)
    return nil
}

// messageLoop consumes messages only. Errors on ws.errorChannel are left to
// handleReconnection: reading them here as well would race with it and could
// swallow the signal that triggers a reconnect.
func (cs *CryptoScraper) messageLoop(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case message := <-cs.ws.messageChannel:
            if err := cs.processPriceUpdate(message); err != nil {
                log.Printf("Message processing error: %v", err)
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    scraper := NewCryptoScraper()

    if err := scraper.Start(ctx); err != nil {
        log.Fatal("Failed to start scraper:", err)
    }

    // Handle graceful shutdown
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    <-c
    log.Println("Shutting down gracefully...")
    cancel()
}
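The example writes into prices but never reads it back. If other goroutines (a status endpoint, a periodic exporter) need the latest values, one option is to hand out a copy taken under the read lock so callers never touch the shared map. The priceStore type below is a hypothetical standalone stand-in for CryptoScraper's prices and mu fields.

```go
package main

import (
    "fmt"
    "sync"
)

// priceStore is a hypothetical stand-in for CryptoScraper's prices/mu fields.
type priceStore struct {
    mu     sync.RWMutex
    prices map[string]float64
}

func (ps *priceStore) Set(symbol string, price float64) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    ps.prices[symbol] = price
}

// Snapshot returns a copy of the current prices. The read lock lets many
// readers run at once, and the copy stays valid after the lock is released
// without racing the writer.
func (ps *priceStore) Snapshot() map[string]float64 {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    out := make(map[string]float64, len(ps.prices))
    for k, v := range ps.prices {
        out[k] = v
    }
    return out
}

func main() {
    ps := &priceStore{prices: make(map[string]float64)}
    ps.Set("BTCUSDT", 1.5) // illustrative value, not market data
    snap := ps.Snapshot()
    ps.Set("BTCUSDT", 2.0)
    fmt.Println(snap["BTCUSDT"]) // 1.5: the snapshot ignores later writes
}
```

Copying costs one allocation per call, which is negligible for a handful of symbols; for very large maps you might publish immutable snapshots instead.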

Error Handling and Recovery

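The handleReconnection goroutine launched by Start waits for errors reported by readMessages, closes the broken connection, and redials up to maxReconnectAttempts times: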

func (rws *ReconnectingWebSocket) handleReconnection(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-rws.stopChannel:
            return
        case err := <-rws.errorChannel:
            log.Printf("WebSocket error occurred: %v", err)
            rws.setConnected(false)

            if rws.conn != nil {
                rws.conn.Close() // release the broken connection
            }

            reconnected := false
            for attempt := 0; attempt < rws.maxReconnectAttempts && !reconnected; attempt++ {
                log.Printf("Attempting reconnection %d/%d", attempt+1, rws.maxReconnectAttempts)

                // Wait before redialing, but stop promptly on shutdown.
                select {
                case <-time.After(rws.reconnectDelay):
                case <-ctx.Done():
                    return
                case <-rws.stopChannel:
                    return
                }

                if err := rws.Connect(ctx); err != nil {
                    log.Printf("Reconnection attempt %d failed: %v", attempt+1, err)
                    continue
                }

                rws.setConnected(true)
                go rws.readMessages(ctx)
                log.Println("Successfully reconnected")
                reconnected = true
            }

            if !reconnected {
                log.Printf("Giving up after %d reconnection attempts", rws.maxReconnectAttempts)
                return
            }
        }
    }
}

Best Practices and Performance Tips

  1. Connection Pooling: For multiple endpoints, implement connection pooling:
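type ConnectionPool struct {
    connections map[string]*ReconnectingWebSocket
    mu          sync.RWMutex
}

// NewConnectionPool initializes the map; writing to a nil map would panic.
func NewConnectionPool() *ConnectionPool {
    return &ConnectionPool{connections: make(map[string]*ReconnectingWebSocket)}
}

// GetConnection returns the connection for url, creating it on first use.
// It does not dial; call Start on a newly created connection.
func (cp *ConnectionPool) GetConnection(url string) *ReconnectingWebSocket {
    cp.mu.RLock()
    conn, exists := cp.connections[url]
    cp.mu.RUnlock()
    if exists {
        return conn
    }

    cp.mu.Lock()
    defer cp.mu.Unlock()
    // Re-check: another goroutine may have created it between the two locks.
    if conn, exists = cp.connections[url]; !exists {
        conn = NewReconnectingWebSocket(url)
        cp.connections[url] = conn
    }
    return conn
}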
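  2. Rate Limiting: Throttle outgoing messages, such as subscription requests, so they go out at a fixed pace:
// SendWithRateLimit blocks until the next tick, then writes the message.
// gorilla/websocket supports only one concurrent writer, so call it from a
// single goroutine or guard it with a mutex.
func (ws *WebSocketScraper) SendWithRateLimit(message []byte, rateLimiter *time.Ticker) error {
    <-rateLimiter.C
    return ws.conn.WriteMessage(websocket.TextMessage, message)
}
  3. Memory Management: Cap incoming message size with gorilla's conn.SetReadLimit and keep channel buffers such as messageChannel fixed-size, so a slow consumer cannot exhaust memory.

  4. Graceful Shutdown: Send a close frame, then close the underlying connection: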

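func (ws *WebSocketScraper) Close() error {
    if ws.conn == nil {
        return nil
    }
    // Tell the server we are leaving (close code 1000), then release the socket.
    // The close-frame error is ignored: the peer may already be gone.
    _ = ws.conn.WriteControl(websocket.CloseMessage,
        websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
        time.Now().Add(time.Second))
    return ws.conn.Close()
}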
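For the memory-management point, the pressure point in this design is messageChannel: when processing falls behind, the reader blocks on the send and stops reading, which also means it stops answering pings. For feeds where only the latest values matter, one option is to drop the oldest buffered message when the buffer is full. The pushDropOldest helper below is a hypothetical sketch of that policy; for feeds where every message counts, blocking or persisting to disk is the safer choice.

```go
package main

import "fmt"

// pushDropOldest enqueues msg on ch without blocking. If ch is full it
// discards the oldest queued message to make room. It assumes a single
// producer; with several producers the retry may still find ch full, in
// which case msg itself is dropped and false is returned.
func pushDropOldest(ch chan []byte, msg []byte) bool {
    select {
    case ch <- msg:
        return true
    default:
    }
    // Buffer full: drop one old message (unless a consumer just took it).
    select {
    case <-ch:
    default:
    }
    select {
    case ch <- msg:
        return true
    default:
        return false
    }
}

func main() {
    ch := make(chan []byte, 3)
    for i := 1; i <= 5; i++ {
        pushDropOldest(ch, []byte(fmt.Sprintf("msg-%d", i)))
    }
    close(ch)
    for m := range ch {
        fmt.Println(string(m)) // msg-3, msg-4, msg-5
    }
}
```

In readMessages, this would replace the blocking send on messageChannel, trading completeness for a reader that never stalls.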

Testing WebSocket Connections

Create unit tests for your WebSocket scrapers:

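import (
    "context"
    "net/http"
    "net/http/httptest"
    "strings"
    "testing"
    "time"

    "github.com/gorilla/websocket"
    "github.com/stretchr/testify/assert"
)

func TestWebSocketConnection(t *testing.T) {
    // gorilla/websocket has no websocket.Handler (that is golang.org/x/net/websocket);
    // upgrade the request inside an ordinary handler instead.
    upgrader := websocket.Upgrader{}
    server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            return // Upgrade has already written an HTTP error response
        }
        defer conn.Close()
        conn.WriteMessage(websocket.TextMessage, []byte("test message"))
    }))
    defer server.Close()

    url := "ws" + strings.TrimPrefix(server.URL, "http")
    scraper := NewWebSocketScraper(url)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    assert.NoError(t, scraper.Connect(ctx))
    defer scraper.Close()
}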
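The "ws" + strings.TrimPrefix(server.URL, "http") trick happens to work for both http:// and https:// test servers, but it silently produces a broken URL for anything else (a URL that is already ws:// becomes wsws://). For URLs taken from scraped pages or configuration, mapping the scheme with net/url is less fragile. toWebSocketURL is a hypothetical helper, not part of any library.

```go
package main

import (
    "fmt"
    "net/url"
)

// toWebSocketURL maps http->ws and https->wss, leaves ws/wss unchanged,
// and rejects any other scheme. url.Parse lowercases the scheme for us.
func toWebSocketURL(raw string) (string, error) {
    u, err := url.Parse(raw)
    if err != nil {
        return "", err
    }
    switch u.Scheme {
    case "http":
        u.Scheme = "ws"
    case "https":
        u.Scheme = "wss"
    case "ws", "wss":
        // already a WebSocket URL
    default:
        return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
    }
    return u.String(), nil
}

func main() {
    inputs := []string{"http://127.0.0.1:8080/ws", "https://example.com/feed", "ws://example.com/x", "ftp://example.com"}
    for _, in := range inputs {
        out, err := toWebSocketURL(in)
        fmt.Println(in, "->", out, err)
    }
}
```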

Integration with HTTP Scraping

WebSocket scraping often complements traditional HTTP scraping. A common hybrid pattern is to load the page over HTTP first, to log in, collect session cookies, or discover the WebSocket endpoint, and then open the WebSocket for the live updates.

Single-page applications often open their WebSocket only after JavaScript has run, so the endpoint URL and handshake parameters may not appear in the raw HTML. The browser developer tools' network panel, which lists WebSocket connections and their frames, is the usual place to find them.

Installing Dependencies

To get started with WebSocket scraping in Go, install the necessary dependencies:

go mod init websocket-scraper
go get github.com/gorilla/websocket
go get github.com/stretchr/testify/assert  # For testing

WebSocket connections in Go provide powerful capabilities for real-time data scraping. By implementing proper connection management, error handling, and message processing, you can build robust scrapers that handle live data streams effectively. Remember to respect rate limits, implement graceful shutdowns, and monitor resource usage for production deployments.
