How do I handle WebSocket connections in Go scraping?
WebSocket connections provide real-time, bidirectional communication between clients and servers, making them essential for scraping dynamic content that updates continuously. Handling them in Go requires understanding the connection lifecycle, message handling, and proper resource management.
Understanding WebSocket Scraping in Go
WebSockets maintain persistent connections that allow servers to push data to clients instantly. This is particularly useful for scraping:
- Live chat applications
- Real-time financial data feeds
- Sports scores and live updates
- Stock trading platforms
- Social media live streams
- Gaming platforms with real-time updates
Basic WebSocket Connection Setup
Here's how to establish a WebSocket connection using the popular gorilla/websocket library:
```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/gorilla/websocket"
)

type WebSocketScraper struct {
	conn    *websocket.Conn
	url     string
	headers http.Header
	timeout time.Duration
}

func NewWebSocketScraper(url string) *WebSocketScraper {
	return &WebSocketScraper{
		url:     url,
		headers: make(http.Header),
		timeout: 30 * time.Second,
	}
}

func (ws *WebSocketScraper) Connect(ctx context.Context) error {
	dialer := websocket.Dialer{
		HandshakeTimeout: ws.timeout,
		ReadBufferSize:   4096,
		WriteBufferSize:  4096,
	}

	conn, resp, err := dialer.DialContext(ctx, ws.url, ws.headers)
	if err != nil {
		// On handshake failure, gorilla returns the server's HTTP response;
		// close its body so the underlying connection is not leaked.
		if resp != nil {
			resp.Body.Close()
			return fmt.Errorf("websocket connection failed: %w (status: %s)", err, resp.Status)
		}
		return fmt.Errorf("websocket connection failed: %w", err)
	}

	ws.conn = conn
	return nil
}
```
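With those pieces in place, a minimal usage sketch looks like this (the endpoint URL is a placeholder; substitute the stream you are scraping):

```go
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Placeholder endpoint; not a real stream.
	scraper := NewWebSocketScraper("wss://example.com/feed")
	if err := scraper.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer scraper.conn.Close()

	// Read a single message to confirm the connection works.
	_, msg, err := scraper.conn.ReadMessage()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("received: %s\n", msg)
}
```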
Advanced Connection Management
For production scraping, implement robust connection handling with reconnection logic:
```go
import (
	"sync"
)

type ReconnectingWebSocket struct {
	*WebSocketScraper
	maxReconnectAttempts int
	reconnectDelay       time.Duration
	messageChannel       chan []byte
	errorChannel         chan error
	stopChannel          chan struct{}
	isConnected          bool
	mu                   sync.RWMutex
}

func NewReconnectingWebSocket(url string) *ReconnectingWebSocket {
	return &ReconnectingWebSocket{
		WebSocketScraper:     NewWebSocketScraper(url),
		maxReconnectAttempts: 5,
		reconnectDelay:       5 * time.Second,
		messageChannel:       make(chan []byte, 100),
		errorChannel:         make(chan error, 10),
		stopChannel:          make(chan struct{}),
	}
}

func (rws *ReconnectingWebSocket) Start(ctx context.Context) error {
	for attempt := 0; attempt < rws.maxReconnectAttempts; attempt++ {
		if err := rws.Connect(ctx); err != nil {
			log.Printf("Connection attempt %d failed: %v", attempt+1, err)
			if attempt < rws.maxReconnectAttempts-1 {
				time.Sleep(rws.reconnectDelay)
				continue
			}
			return fmt.Errorf("failed to connect after %d attempts", rws.maxReconnectAttempts)
		}

		rws.setConnected(true)
		go rws.readMessages(ctx)
		go rws.handleReconnection(ctx)
		return nil
	}
	return fmt.Errorf("all connection attempts exhausted")
}

func (rws *ReconnectingWebSocket) setConnected(connected bool) {
	rws.mu.Lock()
	defer rws.mu.Unlock()
	rws.isConnected = connected
}

func (rws *ReconnectingWebSocket) readMessages(ctx context.Context) {
	defer rws.setConnected(false)

	for {
		select {
		case <-ctx.Done():
			return
		case <-rws.stopChannel:
			return
		default:
			// ReadMessage blocks, so cancellation is only observed between
			// messages; closing the connection unblocks it with an error.
			messageType, message, err := rws.conn.ReadMessage()
			if err != nil {
				rws.errorChannel <- err
				return
			}

			if messageType == websocket.TextMessage || messageType == websocket.BinaryMessage {
				select {
				case rws.messageChannel <- message:
				case <-ctx.Done():
					return
				}
			}
		}
	}
}
```
Message Handling and Data Extraction
Implement structured message processing for different data formats:
```go
import (
	"encoding/json"
)

type MessageHandler struct {
	processors map[string]func([]byte) error
}

func NewMessageHandler() *MessageHandler {
	return &MessageHandler{
		processors: make(map[string]func([]byte) error),
	}
}

func (mh *MessageHandler) RegisterProcessor(messageType string, processor func([]byte) error) {
	mh.processors[messageType] = processor
}

func (mh *MessageHandler) ProcessMessage(rawMessage []byte) error {
	// Try to parse as JSON to determine the message type.
	var envelope struct {
		Type string          `json:"type"`
		Data json.RawMessage `json:"data"`
	}

	if err := json.Unmarshal(rawMessage, &envelope); err != nil {
		// Fall back for non-JSON messages.
		return mh.processRawMessage(rawMessage)
	}

	processor, exists := mh.processors[envelope.Type]
	if !exists {
		return fmt.Errorf("no processor for message type: %s", envelope.Type)
	}

	return processor(envelope.Data)
}

func (mh *MessageHandler) processRawMessage(data []byte) error {
	// Handle raw text or binary messages.
	fmt.Printf("Raw message: %s\n", string(data))
	return nil
}

// Example: processing stock price updates.
func processStockUpdate(data []byte) error {
	var stockData struct {
		Symbol    string  `json:"symbol"`
		Price     float64 `json:"price"`
		Timestamp int64   `json:"timestamp"`
		Volume    int64   `json:"volume"`
	}

	if err := json.Unmarshal(data, &stockData); err != nil {
		return fmt.Errorf("failed to parse stock data: %v", err)
	}

	// Store or process the stock data.
	fmt.Printf("Stock Update: %s - $%.2f (Volume: %d)\n",
		stockData.Symbol, stockData.Price, stockData.Volume)
	return nil
}
```
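Wiring the handler together is then a matter of registering processors and feeding it raw frames. The envelope below is a made-up payload in the {type, data} shape ProcessMessage expects:

```go
handler := NewMessageHandler()
handler.RegisterProcessor("stock_update", processStockUpdate)

// Made-up envelope matching the {type, data} format.
msg := []byte(`{"type":"stock_update","data":{"symbol":"ACME","price":42.5,"timestamp":1700000000,"volume":1200}}`)
if err := handler.ProcessMessage(msg); err != nil {
	log.Printf("processing failed: %v", err)
}
```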
Authentication and Headers
Many WebSocket endpoints require authentication. Handle this properly:
```go
func (ws *WebSocketScraper) SetAuthToken(token string) {
	ws.headers.Set("Authorization", "Bearer "+token)
}

func (ws *WebSocketScraper) SetCustomHeaders(headers map[string]string) {
	for key, value := range headers {
		ws.headers.Set(key, value)
	}
}

func (ws *WebSocketScraper) SetUserAgent(userAgent string) {
	ws.headers.Set("User-Agent", userAgent)
}

// Example: connecting to an authenticated endpoint.
func connectWithAuth(ctx context.Context, url, token string) error {
	scraper := NewWebSocketScraper(url)
	scraper.SetAuthToken(token)
	scraper.SetUserAgent("Go-WebSocket-Scraper/1.0")
	return scraper.Connect(ctx)
}
```
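Some servers also validate the Origin header during the handshake and reject mismatches with a 403. If that happens, set it to the origin the site's own frontend connects from (the value here is an assumption about the target site):

```go
// Hypothetical origin; use whatever origin a browser on the site would send.
scraper.SetCustomHeaders(map[string]string{
	"Origin": "https://example.com",
})
```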
Complete Real-Time Scraping Example
Here's a comprehensive example that scrapes live cryptocurrency prices:
```go
package main

// Assumes WebSocketScraper and ReconnectingWebSocket from the sections
// above live in other files of this package.

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
)

type CryptoScraper struct {
	ws     *ReconnectingWebSocket
	prices map[string]float64
	mu     sync.RWMutex
}

func NewCryptoScraper() *CryptoScraper {
	return &CryptoScraper{
		ws:     NewReconnectingWebSocket("wss://stream.binance.com:9443/ws/btcusdt@ticker"),
		prices: make(map[string]float64),
	}
}

func (cs *CryptoScraper) processPriceUpdate(data []byte) error {
	var ticker struct {
		Symbol string `json:"s"`
		Price  string `json:"c"`
	}

	if err := json.Unmarshal(data, &ticker); err != nil {
		return err
	}

	price, err := strconv.ParseFloat(ticker.Price, 64)
	if err != nil {
		return err
	}

	cs.mu.Lock()
	cs.prices[ticker.Symbol] = price
	cs.mu.Unlock()

	fmt.Printf("Price Update: %s = $%.2f\n", ticker.Symbol, price)
	return nil
}

func (cs *CryptoScraper) Start(ctx context.Context) error {
	if err := cs.ws.Start(ctx); err != nil {
		return err
	}
	go cs.messageLoop(ctx)
	return nil
}

func (cs *CryptoScraper) messageLoop(ctx context.Context) {
	// Binance's ticker stream sends flat JSON without a {type, data}
	// envelope, so raw messages go straight to the ticker processor rather
	// than through MessageHandler.ProcessMessage. Errors on errorChannel
	// are left to handleReconnection: each error is delivered to only one
	// receiver, so draining them here too would race with reconnect logic.
	for {
		select {
		case <-ctx.Done():
			return
		case message := <-cs.ws.messageChannel:
			if err := cs.processPriceUpdate(message); err != nil {
				log.Printf("Message processing error: %v", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	scraper := NewCryptoScraper()
	if err := scraper.Start(ctx); err != nil {
		log.Fatal("Failed to start scraper:", err)
	}

	// Handle graceful shutdown on Ctrl-C or SIGTERM.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c

	log.Println("Shutting down gracefully...")
	cancel()
}
```
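To track more symbols over the same connection, Binance's stream API accepts live SUBSCRIBE messages. The format below follows Binance's public WebSocket documentation at the time of writing, so verify it before relying on it (the snippet also needs the gorilla/websocket import):

```go
// Subscribe to an additional stream on the open connection; the message
// format is Binance-specific and should be checked against current docs.
sub := map[string]interface{}{
	"method": "SUBSCRIBE",
	"params": []string{"ethusdt@ticker"},
	"id":     1,
}
payload, _ := json.Marshal(sub)
if err := scraper.ws.conn.WriteMessage(websocket.TextMessage, payload); err != nil {
	log.Printf("subscribe failed: %v", err)
}
```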
Error Handling and Recovery
Implement robust error handling for WebSocket connections:
```go
func (rws *ReconnectingWebSocket) handleReconnection(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case err := <-rws.errorChannel:
			log.Printf("WebSocket error occurred: %v", err)
			rws.setConnected(false)
			if rws.conn != nil {
				rws.conn.Close()
			}

			// Attempt reconnection with a fixed delay between attempts.
			for attempt := 0; attempt < rws.maxReconnectAttempts; attempt++ {
				log.Printf("Attempting reconnection %d/%d", attempt+1, rws.maxReconnectAttempts)
				time.Sleep(rws.reconnectDelay)

				if err := rws.Connect(ctx); err != nil {
					log.Printf("Reconnection attempt %d failed: %v", attempt+1, err)
					continue
				}

				rws.setConnected(true)
				go rws.readMessages(ctx)
				log.Println("Successfully reconnected")
				break
			}
		}
	}
}
```
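Reconnection only kicks in after a failure surfaces. To detect silently dead connections sooner, add a ping/pong heartbeat. The startHeartbeat helper below is a sketch: the name and the 30s/60s intervals are arbitrary choices, while SetReadDeadline, SetPongHandler, and WriteControl are gorilla's own API. Launch it alongside readMessages after each successful connect:

```go
// startHeartbeat pings the server periodically and extends the read deadline
// whenever a pong arrives; a missed pong makes the next read time out, which
// surfaces as an error on errorChannel and triggers reconnection.
func (rws *ReconnectingWebSocket) startHeartbeat(ctx context.Context) {
	const pingInterval = 30 * time.Second // arbitrary; tune per endpoint
	const pongWait = 60 * time.Second

	rws.conn.SetReadDeadline(time.Now().Add(pongWait))
	rws.conn.SetPongHandler(func(string) error {
		return rws.conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			deadline := time.Now().Add(5 * time.Second)
			if err := rws.conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
				return
			}
		}
	}
}
```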
Best Practices and Performance Tips
- Connection Pooling: For multiple endpoints, implement connection pooling:
```go
type ConnectionPool struct {
	connections map[string]*ReconnectingWebSocket
	mu          sync.RWMutex
}

func (cp *ConnectionPool) GetConnection(url string) *ReconnectingWebSocket {
	cp.mu.RLock()
	conn, exists := cp.connections[url]
	cp.mu.RUnlock()

	if !exists {
		cp.mu.Lock()
		// Re-check under the write lock: another goroutine may have created
		// the connection between the RUnlock above and this Lock.
		if conn, exists = cp.connections[url]; !exists {
			conn = NewReconnectingWebSocket(url)
			cp.connections[url] = conn
		}
		cp.mu.Unlock()
	}

	return conn
}
```
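One detail the snippet leaves implicit: the connections map must be initialized before first use, since writing to a nil map panics. A small constructor covers it:

```go
func NewConnectionPool() *ConnectionPool {
	return &ConnectionPool{
		connections: make(map[string]*ReconnectingWebSocket),
	}
}

// Usage (URL is a placeholder):
// pool := NewConnectionPool()
// feed := pool.GetConnection("wss://example.com/feed")
```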
- Rate Limiting: Implement proper rate limiting for outgoing messages:
```go
func (ws *WebSocketScraper) SendWithRateLimit(message []byte, rateLimiter *time.Ticker) error {
	// Block until the next tick. Note that gorilla connections support only
	// one concurrent writer, so serialize calls to this method.
	<-rateLimiter.C
	return ws.conn.WriteMessage(websocket.TextMessage, message)
}
```
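For example, to cap outgoing traffic at roughly five messages per second (the rate and the payload are arbitrary; assumes a connected scraper):

```go
limiter := time.NewTicker(200 * time.Millisecond) // ~5 messages/second
defer limiter.Stop()

// Hypothetical subscription payload.
if err := scraper.SendWithRateLimit([]byte(`{"action":"subscribe"}`), limiter); err != nil {
	log.Printf("send failed: %v", err)
}
```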
- Memory Management: Monitor and limit buffer sizes to prevent unbounded memory use; one lever is capping inbound frame sizes, as sketched below.
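A sketch of a hypothetical LimitReadSize helper; SetReadLimit is gorilla's own API, while the method name is an invention for this example:

```go
// LimitReadSize caps inbound message size; call it after Connect succeeds.
// A frame larger than the limit causes the next read to fail with an error
// instead of buffering unbounded data.
func (ws *WebSocketScraper) LimitReadSize(maxBytes int64) {
	ws.conn.SetReadLimit(maxBytes) // e.g. 512 * 1024 for a 512 KB cap
}
```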
- Graceful Shutdown: Always implement proper cleanup:
```go
func (ws *WebSocketScraper) Close() error {
	if ws.conn == nil {
		return nil
	}
	// Attempt a clean close handshake before dropping the connection;
	// errors are ignored since we are shutting down regardless.
	ws.conn.WriteControl(websocket.CloseMessage,
		websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
		time.Now().Add(time.Second))
	return ws.conn.Close()
}
```
Testing WebSocket Connections
Create unit tests for your WebSocket scrapers. Note that `websocket.Handler` belongs to `golang.org/x/net/websocket`, not gorilla; with gorilla/websocket, a test server upgrades a standard HTTP handler instead (the test also needs the `net/http/httptest`, `strings`, `testing`, and testify `assert` imports):

```go
func TestWebSocketConnection(t *testing.T) {
	upgrader := websocket.Upgrader{}
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			return
		}
		defer conn.Close()
		conn.WriteMessage(websocket.TextMessage, []byte("test message"))
	}))
	defer server.Close()

	// Convert http://127.0.0.1:... into ws://127.0.0.1:...
	url := "ws" + strings.TrimPrefix(server.URL, "http")
	scraper := NewWebSocketScraper(url)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := scraper.Connect(ctx)
	assert.NoError(t, err)
	defer scraper.Close()
}
```
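Because MessageHandler.ProcessMessage is pure logic, it can also be tested without any network connection; a minimal sketch:

```go
func TestProcessMessage(t *testing.T) {
	handler := NewMessageHandler()

	var got []byte
	handler.RegisterProcessor("ping", func(data []byte) error {
		got = data
		return nil
	})

	err := handler.ProcessMessage([]byte(`{"type":"ping","data":{"n":1}}`))
	assert.NoError(t, err)
	assert.JSONEq(t, `{"n":1}`, string(got))
}
```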
Integration with HTTP Scraping
WebSocket scraping often complements traditional HTTP scraping. For scenarios requiring real-time data extraction alongside static content scraping, consider hybrid approaches that combine both techniques.
When dealing with complex web applications that use WebSockets for live updates, you might also need to handle dynamic content loading before establishing WebSocket connections.
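A common hybrid pattern is to fetch a page or API endpoint over HTTP first to discover the stream URL or an auth token, then open the WebSocket. The sketch below assumes the token is exposed via a JSON endpoint; the URL, response shape, and field names are all hypothetical:

```go
// Hypothetical hybrid flow: the REST endpoint and response fields are
// assumptions; adapt them to the actual site.
func connectHybrid(ctx context.Context) (*WebSocketScraper, error) {
	resp, err := http.Get("https://example.com/api/session") // hypothetical
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var session struct {
		WsURL string `json:"ws_url"` // hypothetical field
		Token string `json:"token"`  // hypothetical field
	}
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return nil, err
	}

	scraper := NewWebSocketScraper(session.WsURL)
	scraper.SetAuthToken(session.Token)
	return scraper, scraper.Connect(ctx)
}
```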
Installing Dependencies
To get started with WebSocket scraping in Go, install the necessary dependencies:
```bash
go mod init websocket-scraper
go get github.com/gorilla/websocket
go get github.com/stretchr/testify/assert # For testing
```
WebSocket connections in Go provide powerful capabilities for real-time data scraping. By implementing proper connection management, error handling, and message processing, you can build robust scrapers that handle live data streams effectively. Remember to respect rate limits, implement graceful shutdowns, and monitor resource usage for production deployments.