Table of contents

How can I handle HTTP authentication tokens and refresh cycles?

HTTP authentication tokens and refresh cycles are critical components of modern web applications and APIs. When scraping or interacting with protected resources, you need to implement robust token management to maintain authenticated sessions and handle token expiration gracefully. This guide covers comprehensive strategies for managing authentication tokens, implementing refresh mechanisms, and ensuring secure, uninterrupted data access.

Understanding Token-Based Authentication

Token-based authentication uses cryptographic tokens to verify user identity and maintain session state. The most common types include:

  • JWT (JSON Web Tokens): Self-contained tokens with encoded payload
  • Bearer tokens: Simple string tokens validated server-side
  • OAuth tokens: Access and refresh token pairs for third-party authorization
  • API keys: Long-lived tokens for service authentication

Basic Token Handling Implementation

Python Implementation with Requests

import requests
import json
import time
from datetime import datetime, timedelta
import jwt

class TokenManager:
    def __init__(self, auth_url, client_id, client_secret):
        self.auth_url = auth_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None

    def authenticate(self, username, password):
        """Initial authentication to get tokens"""
        auth_data = {
            'grant_type': 'password',
            'username': username,
            'password': password,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        response = requests.post(self.auth_url, data=auth_data)
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data['access_token']
        self.refresh_token = token_data.get('refresh_token')

        # Calculate expiration time
        expires_in = token_data.get('expires_in', 3600)
        self.token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60)

        return self.access_token

    def is_token_expired(self):
        """Check if current token is expired or about to expire"""
        if not self.token_expires_at:
            return True
        return datetime.now() >= self.token_expires_at

    def refresh_access_token(self):
        """Refresh the access token using refresh token"""
        if not self.refresh_token:
            raise Exception("No refresh token available")

        refresh_data = {
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        response = requests.post(self.auth_url, data=refresh_data)
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data['access_token']

        # Update refresh token if provided
        if 'refresh_token' in token_data:
            self.refresh_token = token_data['refresh_token']

        # Update expiration time
        expires_in = token_data.get('expires_in', 3600)
        self.token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60)

        return self.access_token

    def get_valid_token(self):
        """Get a valid access token, refreshing if necessary"""
        if self.is_token_expired():
            if self.refresh_token:
                self.refresh_access_token()
            else:
                raise Exception("Token expired and no refresh token available")

        return self.access_token

    def make_authenticated_request(self, url, method='GET', **kwargs):
        """Make HTTP request with automatic token management"""
        token = self.get_valid_token()
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {token}'
        kwargs['headers'] = headers

        response = requests.request(method, url, **kwargs)

        # Handle token expiration during request
        if response.status_code == 401:
            self.refresh_access_token()
            headers['Authorization'] = f'Bearer {self.access_token}'
            response = requests.request(method, url, **kwargs)

        return response

# Usage example
token_manager = TokenManager(
    auth_url='https://api.example.com/oauth/token',
    client_id='your_client_id',
    client_secret='your_client_secret'
)

# Initial authentication
token_manager.authenticate('username', 'password')

# Make authenticated requests
response = token_manager.make_authenticated_request('https://api.example.com/data')
print(response.json())

JavaScript Implementation with Axios

class TokenManager {
    constructor(authUrl, clientId, clientSecret) {
        this.authUrl = authUrl;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.accessToken = null;
        this.refreshToken = null;
        this.tokenExpiresAt = null;
        this.refreshPromise = null;
    }

    async authenticate(username, password) {
        const authData = {
            grant_type: 'password',
            username: username,
            password: password,
            client_id: this.clientId,
            client_secret: this.clientSecret
        };

        try {
            const response = await fetch(this.authUrl, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded'
                },
                body: new URLSearchParams(authData)
            });

            if (!response.ok) {
                throw new Error(`Authentication failed: ${response.status}`);
            }

            const tokenData = await response.json();
            this.accessToken = tokenData.access_token;
            this.refreshToken = tokenData.refresh_token;

            // Calculate expiration time (subtract 60 seconds buffer)
            const expiresIn = tokenData.expires_in || 3600;
            this.tokenExpiresAt = new Date(Date.now() + (expiresIn - 60) * 1000);

            return this.accessToken;
        } catch (error) {
            console.error('Authentication error:', error);
            throw error;
        }
    }

    isTokenExpired() {
        if (!this.tokenExpiresAt) return true;
        return new Date() >= this.tokenExpiresAt;
    }

    async refreshAccessToken() {
        // Prevent multiple concurrent refresh requests
        if (this.refreshPromise) {
            return await this.refreshPromise;
        }

        if (!this.refreshToken) {
            throw new Error('No refresh token available');
        }

        this.refreshPromise = this._performTokenRefresh();

        try {
            const result = await this.refreshPromise;
            return result;
        } finally {
            this.refreshPromise = null;
        }
    }

    async _performTokenRefresh() {
        const refreshData = {
            grant_type: 'refresh_token',
            refresh_token: this.refreshToken,
            client_id: this.clientId,
            client_secret: this.clientSecret
        };

        const response = await fetch(this.authUrl, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            body: new URLSearchParams(refreshData)
        });

        if (!response.ok) {
            throw new Error(`Token refresh failed: ${response.status}`);
        }

        const tokenData = await response.json();
        this.accessToken = tokenData.access_token;

        // Update refresh token if provided
        if (tokenData.refresh_token) {
            this.refreshToken = tokenData.refresh_token;
        }

        // Update expiration time
        const expiresIn = tokenData.expires_in || 3600;
        this.tokenExpiresAt = new Date(Date.now() + (expiresIn - 60) * 1000);

        return this.accessToken;
    }

    async getValidToken() {
        if (this.isTokenExpired()) {
            if (this.refreshToken) {
                await this.refreshAccessToken();
            } else {
                throw new Error('Token expired and no refresh token available');
            }
        }

        return this.accessToken;
    }

    async makeAuthenticatedRequest(url, options = {}) {
        const token = await this.getValidToken();

        const headers = {
            ...options.headers,
            'Authorization': `Bearer ${token}`
        };

        let response = await fetch(url, {
            ...options,
            headers
        });

        // Handle token expiration during request
        if (response.status === 401 && this.refreshToken) {
            await this.refreshAccessToken();
            headers['Authorization'] = `Bearer ${this.accessToken}`;

            response = await fetch(url, {
                ...options,
                headers
            });
        }

        return response;
    }
}

// Usage example
const tokenManager = new TokenManager(
    'https://api.example.com/oauth/token',
    'your_client_id',
    'your_client_secret'
);

// Initialize and use
(async () => {
    try {
        await tokenManager.authenticate('username', 'password');

        const response = await tokenManager.makeAuthenticatedRequest(
            'https://api.example.com/data'
        );

        const data = await response.json();
        console.log(data);
    } catch (error) {
        console.error('Request failed:', error);
    }
})();

Advanced Token Management Strategies

JWT Token Decoding and Validation

import jwt
import json
from datetime import datetime

def decode_jwt_token(token, secret=None, verify=False):
    """Decode JWT token to extract payload information"""
    try:
        # Decode without verification for inspection
        decoded = jwt.decode(token, options={"verify_signature": verify})

        # Check expiration
        if 'exp' in decoded:
            exp_timestamp = decoded['exp']
            exp_datetime = datetime.fromtimestamp(exp_timestamp)

            if datetime.now() >= exp_datetime:
                print(f"Token expired at: {exp_datetime}")
                return None

        return decoded
    except jwt.ExpiredSignatureError:
        print("Token has expired")
        return None
    except jwt.InvalidTokenError as e:
        print(f"Invalid token: {e}")
        return None

# Usage
token_payload = decode_jwt_token(access_token)
if token_payload:
    print(f"User ID: {token_payload.get('sub')}")
    print(f"Expires: {datetime.fromtimestamp(token_payload.get('exp'))}")

Secure Token Storage

import keyring
import json
from cryptography.fernet import Fernet

class SecureTokenStorage:
    def __init__(self, service_name):
        self.service_name = service_name
        self.key = self._get_or_create_key()
        self.cipher = Fernet(self.key)

    def _get_or_create_key(self):
        """Get encryption key from secure storage or create new one"""
        key = keyring.get_password(self.service_name, 'encryption_key')
        if not key:
            key = Fernet.generate_key().decode()
            keyring.set_password(self.service_name, 'encryption_key', key)
        return key.encode()

    def store_tokens(self, access_token, refresh_token=None, expires_at=None):
        """Securely store tokens"""
        token_data = {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'expires_at': expires_at.isoformat() if expires_at else None
        }

        encrypted_data = self.cipher.encrypt(json.dumps(token_data).encode())
        keyring.set_password(self.service_name, 'tokens', encrypted_data.decode())

    def load_tokens(self):
        """Load and decrypt stored tokens"""
        encrypted_data = keyring.get_password(self.service_name, 'tokens')
        if not encrypted_data:
            return None

        try:
            decrypted_data = self.cipher.decrypt(encrypted_data.encode())
            token_data = json.loads(decrypted_data.decode())

            if token_data.get('expires_at'):
                token_data['expires_at'] = datetime.fromisoformat(token_data['expires_at'])

            return token_data
        except Exception as e:
            print(f"Failed to decrypt tokens: {e}")
            return None

    def clear_tokens(self):
        """Remove stored tokens"""
        keyring.delete_password(self.service_name, 'tokens')

# Usage
storage = SecureTokenStorage('my_api_service')
storage.store_tokens(access_token, refresh_token, expires_at)
stored_tokens = storage.load_tokens()

Handling Different Authentication Flows

OAuth 2.0 Authorization Code Flow

import requests
import urllib.parse
import secrets
import hashlib
import base64

class OAuth2Manager:
    def __init__(self, client_id, client_secret, redirect_uri, auth_url, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.auth_url = auth_url
        self.token_url = token_url

    def generate_auth_url(self, scopes=None):
        """Generate authorization URL for OAuth2 flow"""
        state = secrets.token_urlsafe(32)

        # PKCE support
        code_verifier = secrets.token_urlsafe(96)
        code_challenge = base64.urlsafe_b64encode(
            hashlib.sha256(code_verifier.encode()).digest()
        ).decode().rstrip('=')

        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256'
        }

        if scopes:
            params['scope'] = ' '.join(scopes)

        auth_url = f"{self.auth_url}?{urllib.parse.urlencode(params)}"

        return auth_url, state, code_verifier

    def exchange_code_for_tokens(self, authorization_code, code_verifier):
        """Exchange authorization code for access tokens"""
        token_data = {
            'grant_type': 'authorization_code',
            'code': authorization_code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code_verifier': code_verifier
        }

        response = requests.post(self.token_url, data=token_data)
        response.raise_for_status()

        return response.json()

Error Handling and Retry Logic

import time
import random
from functools import wraps

def retry_on_auth_failure(max_retries=3, backoff_factor=1):
    """Decorator to retry requests on authentication failures"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    last_exception = e

                    if e.response.status_code == 401 and attempt < max_retries:
                        # Exponential backoff with jitter
                        delay = (backoff_factor * (2 ** attempt)) + random.uniform(0, 1)
                        print(f"Auth failure, retrying in {delay:.2f} seconds...")
                        time.sleep(delay)
                        continue
                    else:
                        raise
                except Exception as e:
                    last_exception = e
                    raise

            raise last_exception
        return wrapper
    return decorator

# Usage with token manager
class RobustTokenManager(TokenManager):
    @retry_on_auth_failure(max_retries=2)
    def make_authenticated_request(self, url, method='GET', **kwargs):
        return super().make_authenticated_request(url, method, **kwargs)

Integration with Web Scraping Tools

When implementing token authentication in web scraping scenarios, you may need to integrate with browser automation tools. Understanding how to handle authentication in Puppeteer can be crucial for scraping JavaScript-heavy applications that require authentication.

For complex authentication flows that involve multiple redirects or dynamic content loading, monitoring network requests in Puppeteer helps track token exchanges and API calls during the authentication process.

Console Commands and Testing

Testing Token Management

# Test token endpoint with curl
curl -X POST https://api.example.com/oauth/token \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=password&username=testuser&password=testpass&client_id=your_client_id&client_secret=your_secret"

# Test API with token
curl -H "Authorization: Bearer your_access_token" \
  https://api.example.com/protected-resource

# Test token refresh
curl -X POST https://api.example.com/oauth/token \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=refresh_token&refresh_token=your_refresh_token&client_id=your_client_id&client_secret=your_secret"

Environment Variable Configuration

# Set environment variables for token management
export API_CLIENT_ID="your_client_id"
export API_CLIENT_SECRET="your_client_secret"
export API_AUTH_URL="https://api.example.com/oauth/token"
export API_BASE_URL="https://api.example.com"

# Use in Python
import os
client_id = os.getenv('API_CLIENT_ID')
client_secret = os.getenv('API_CLIENT_SECRET')

Best Practices and Security Considerations

  1. Token Security: Store tokens securely using system keychains or encrypted storage
  2. Refresh Buffer: Refresh tokens 60 seconds before expiration to avoid race conditions
  3. Concurrent Requests: Implement mutex locks to prevent multiple simultaneous refresh attempts
  4. Error Handling: Gracefully handle token expiration, network failures, and invalid tokens
  5. Logging: Log authentication events while avoiding token exposure in logs
  6. Rate Limiting: Respect API rate limits during token refresh operations

Production Deployment Considerations

  • Use HTTPS for all token-related communications
  • Implement proper token rotation policies
  • Monitor token usage and expiration patterns
  • Set up alerting for authentication failures
  • Use secure configuration management for credentials
  • Implement graceful degradation when authentication services are unavailable

Proper token management ensures reliable, secure access to protected resources while maintaining good performance and user experience in your web scraping and API integration projects.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"

Try in request builder

Related Questions

Get Started Now

WebScraping.AI provides rotating proxies, Chromium rendering and built-in HTML parser for web scraping
Icon