How do I handle chunked transfer encoding with urllib3?

Chunked transfer encoding is an HTTP/1.1 feature that allows servers to send data in chunks without knowing the total content length beforehand. This is particularly useful for streaming large responses, real-time data, or dynamically generated content. urllib3, a powerful HTTP library for Python, provides excellent support for handling chunked responses efficiently.

Understanding Chunked Transfer Encoding

Chunked transfer encoding breaks the response body into smaller pieces (chunks), each prefixed with its size in hexadecimal. The server sends chunks sequentially until it sends a zero-length chunk to signal the end of the response. This mechanism is essential for:

  • Streaming large files without loading them entirely into memory
  • Real-time data feeds where content length is unknown
  • Server-sent events and live data streams
  • API responses that generate content dynamically
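
To make the wire format concrete, here is roughly what a small chunked body looks like on the wire (an illustrative sketch with made-up payloads; urllib3 strips this framing for you, so you never parse it manually):

# Raw chunked body as the server sends it (illustrative only).
# Each chunk is "<size in hex>\r\n<payload>\r\n", terminated by a zero-length chunk.
raw_chunked_body = (
    b"7\r\n"        # next chunk is 7 bytes
    b"Hello, \r\n"
    b"6\r\n"        # next chunk is 6 bytes
    b"world!\r\n"
    b"0\r\n"        # zero-length chunk marks the end of the body
    b"\r\n"
)
# The decoded body is simply b"Hello, world!"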

Basic Chunked Response Handling

urllib3 handles chunked transfer encoding automatically. To work with the response incrementally instead of loading it all at once, pass preload_content=False and read the body in pieces:

import urllib3

# Create a PoolManager instance
http = urllib3.PoolManager()

# Make a request with streaming enabled
response = http.request('GET', 'https://api.example.com/large-dataset', 
                       preload_content=False)

# Check if response uses chunked encoding
if response.headers.get('Transfer-Encoding') == 'chunked':
    print("Response uses chunked transfer encoding")

# Read data in chunks
chunk_size = 8192
while True:
    chunk = response.read(chunk_size)
    if not chunk:
        break
    # Process chunk data (process_chunk is a placeholder for your own handler)
    process_chunk(chunk)

response.release_conn()

Streaming Large Responses

For large responses, streaming keeps memory usage bounded because the full body is never held in memory at once:

import urllib3
import json

def stream_json_data(url):
    http = urllib3.PoolManager()

    # Request with streaming enabled
    response = http.request('GET', url, preload_content=False)

    try:
        # Initialize buffer for incomplete JSON objects
        buffer = ""

        for chunk in response.stream(amt=1024):
            if chunk:
                # Decode chunk to string
                chunk_str = chunk.decode('utf-8')
                buffer += chunk_str

                # Process complete JSON objects
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    if line.strip():
                        try:
                            json_obj = json.loads(line)
                            yield json_obj
                        except json.JSONDecodeError:
                            continue

    finally:
        response.release_conn()

# Usage example
for data_item in stream_json_data('https://api.example.com/stream'):
    print(f"Received: {data_item}")

Handling Chunked File Downloads

When downloading large files, chunked encoding helps manage memory usage:

import urllib3
import os

def download_chunked_file(url, filename):
    http = urllib3.PoolManager()

    response = http.request('GET', url, preload_content=False)

    try:
        total_size = 0
        chunk_size = 8192

        with open(filename, 'wb') as file:
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break

                file.write(chunk)
                total_size += len(chunk)

                # Optional: show progress
                print(f"\rDownloaded: {total_size} bytes", end='', flush=True)

        print(f"\nDownload complete: {filename}")
        return total_size

    except Exception as e:
        # Clean up partial file on error
        if os.path.exists(filename):
            os.remove(filename)
        raise e

    finally:
        response.release_conn()

# Download a large file
download_chunked_file('https://example.com/large-file.zip', 'download.zip')

Advanced Chunked Response Processing

For more sophisticated handling, you can implement custom chunk processors:

import urllib3
import hashlib
import time

class ChunkedProcessor:
    def __init__(self, url, chunk_size=8192):
        self.url = url
        self.chunk_size = chunk_size
        self.http = urllib3.PoolManager()
        self.total_bytes = 0
        self.start_time = time.time()
        self.hash_md5 = hashlib.md5()

    def process_with_validation(self):
        response = self.http.request('GET', self.url, preload_content=False)

        try:
            # Verify chunked encoding
            if response.headers.get('Transfer-Encoding') != 'chunked':
                print("Warning: Response doesn't use chunked encoding")

            for chunk in response.stream(self.chunk_size):
                if chunk:
                    self.total_bytes += len(chunk)
                    self.hash_md5.update(chunk)

                    # Calculate download speed
                    elapsed = time.time() - self.start_time
                    speed = self.total_bytes / elapsed if elapsed > 0 else 0

                    print(f"\rProcessed: {self.total_bytes} bytes "
                          f"({speed:.2f} bytes/sec)", end='', flush=True)

                    # Process chunk data
                    yield chunk

        finally:
            response.release_conn()

        print(f"\nMD5 Hash: {self.hash_md5.hexdigest()}")

# Usage
processor = ChunkedProcessor('https://api.example.com/data-stream')
for chunk in processor.process_with_validation():
    # Process each chunk as needed
    pass

Error Handling and Retry Logic

Robust chunked response handling includes proper error management:

import urllib3
from urllib3.exceptions import ReadTimeoutError, ProtocolError
import time

def robust_chunked_request(url, max_retries=3, timeout=30):
    http = urllib3.PoolManager()

    for attempt in range(max_retries):
        try:
            response = http.request('GET', url, 
                                  preload_content=False,
                                  timeout=timeout)

            chunks = []
            bytes_received = 0

            try:
                for chunk in response.stream(amt=4096):
                    if chunk:
                        chunks.append(chunk)
                        bytes_received += len(chunk)

                        # Optional: implement max size limit
                        if bytes_received > 100 * 1024 * 1024:  # 100MB limit
                            raise ValueError("Response too large")

                # Successfully received all chunks
                return b''.join(chunks)

            except (ReadTimeoutError, ProtocolError) as e:
                print(f"Stream error on attempt {attempt + 1}: {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff

            finally:
                response.release_conn()

        except Exception as e:
            print(f"Request failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

# Usage with error handling
try:
    data = robust_chunked_request('https://api.example.com/chunked-data')
    print(f"Successfully received {len(data)} bytes")
except Exception as e:
    print(f"Failed to receive chunked data: {e}")

Performance Optimization

Optimize chunked response handling for better performance:

import urllib3
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
import time

class OptimizedChunkedReader:
    def __init__(self, url, num_workers=3):
        self.url = url
        self.http = urllib3.PoolManager()
        self.chunk_queue = queue.Queue(maxsize=50)
        self.num_workers = num_workers

    def producer(self, response):
        """Producer thread that reads chunks"""
        try:
            for chunk in response.stream(amt=16384):
                if chunk:
                    self.chunk_queue.put(chunk)
            self.chunk_queue.put(None)  # Signal end
        except Exception as e:
            self.chunk_queue.put(e)
        finally:
            response.release_conn()

    def consumer(self, process_func):
        """Consumer that processes chunks"""
        while True:
            item = self.chunk_queue.get()
            if item is None:  # End signal
                break
            if isinstance(item, Exception):
                raise item
            process_func(item)
            self.chunk_queue.task_done()

    def process_async(self, process_func):
        """Process chunks asynchronously"""
        response = self.http.request('GET', self.url, preload_content=False)

        # Start producer thread
        producer_thread = threading.Thread(
            target=self.producer, 
            args=(response,)
        )
        producer_thread.start()

        # Start consumer threads
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [
                executor.submit(self.consumer, process_func)
                for _ in range(self.num_workers)
            ]

            # Wait for producer to finish
            producer_thread.join()

            # Signal all consumers to stop
            for _ in range(self.num_workers):
                self.chunk_queue.put(None)

            # Wait for all consumers
            for future in futures:
                future.result()

# Usage example
def process_chunk_data(chunk):
    # Simulate processing time
    time.sleep(0.01)
    print(f"Processed chunk of {len(chunk)} bytes")

reader = OptimizedChunkedReader('https://api.example.com/large-stream')
reader.process_async(process_chunk_data)

Integration with Web Scraping

When handling chunked responses in web scraping scenarios, much like monitoring network traffic in browser automation, you need to manage streaming data efficiently:

import urllib3
import json

def scrape_streaming_api(api_url, headers=None):
    """Scrape data from streaming API with chunked encoding"""
    http = urllib3.PoolManager()

    # Add custom headers for scraping
    default_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate',
    }

    if headers:
        default_headers.update(headers)

    response = http.request('GET', api_url, 
                           headers=default_headers,
                           preload_content=False)

    scraped_data = []
    buffer = ""

    try:
        for chunk in response.stream(amt=2048):
            if chunk:
                # Buffer data so JSON lines split across chunk boundaries stay intact
                buffer += chunk.decode('utf-8')

                # Parse complete lines only (assuming a JSON-lines response)
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    if line.strip():
                        try:
                            data = json.loads(line)
                            scraped_data.append(data)
                        except json.JSONDecodeError:
                            continue

    finally:
        response.release_conn()

    return scraped_data

Common Scenarios and Solutions

Handling Server-Sent Events (SSE)

import urllib3
import re

def handle_sse_stream(url):
    """Handle Server-Sent Events with chunked encoding"""
    http = urllib3.PoolManager()
    response = http.request('GET', url, 
                           headers={'Accept': 'text/event-stream'},
                           preload_content=False)

    buffer = ""

    try:
        for chunk in response.stream(amt=1024):
            if chunk:
                buffer += chunk.decode('utf-8')

                # Process complete SSE messages
                while '\n\n' in buffer:
                    message, buffer = buffer.split('\n\n', 1)

                    # Parse SSE message
                    event_data = {}
                    for line in message.split('\n'):
                        if ':' in line:
                            key, value = line.split(':', 1)
                            event_data[key.strip()] = value.strip()

                    if 'data' in event_data:
                        yield event_data['data']

    finally:
        response.release_conn()

# Process SSE stream
for event in handle_sse_stream('https://api.example.com/events'):
    print(f"Received SSE: {event}")

Progress Tracking for Large Downloads

import urllib3
import sys

def download_with_progress(url, filename):
    """Download with progress tracking for chunked responses"""
    http = urllib3.PoolManager()
    response = http.request('GET', url, preload_content=False)

    # Try to get content length if available
    content_length = response.headers.get('Content-Length')
    total_size = int(content_length) if content_length else None

    downloaded = 0
    chunk_size = 8192

    try:
        with open(filename, 'wb') as f:
            for chunk in response.stream(chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)

                    # Show progress
                    if total_size:
                        percent = (downloaded / total_size) * 100
                        sys.stdout.write(f"\rProgress: {percent:.1f}% "
                                       f"({downloaded}/{total_size} bytes)")
                    else:
                        sys.stdout.write(f"\rDownloaded: {downloaded} bytes")
                    sys.stdout.flush()

    finally:
        response.release_conn()

    print("\nDownload complete!")

download_with_progress('https://example.com/large-file.zip', 'download.zip')

Best Practices and Tips

  1. Always use preload_content=False when dealing with large chunked responses to avoid memory issues.

  2. Implement proper cleanup by calling response.release_conn() in a finally block.

  3. Set appropriate timeouts to handle slow or stalled connections, both for the initial connection and for reads during streaming.

  4. Monitor memory usage when processing large streams, especially in long-running applications.

  5. Use appropriate chunk sizes - too small causes overhead, too large uses more memory.

  6. Implement retry logic for network failures during chunk processing.

  7. Validate data integrity using checksums when downloading files.

  8. Handle encoding properly - always decode chunks to the appropriate character encoding.

  9. Buffer incomplete data when processing structured data like JSON or XML.

  10. Use connection pooling for multiple requests to improve performance, as sketched below.
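
To illustrate the last tip, a single PoolManager can be reused across many streaming requests so that keep-alive connections are pooled (a sketch; the URLs, pool sizes, and timeouts are placeholders):

import urllib3

# One shared PoolManager; pool sizes and timeouts are arbitrary examples
http = urllib3.PoolManager(
    num_pools=10,                                 # hosts to keep pools for
    maxsize=10,                                   # connections kept per host
    timeout=urllib3.Timeout(connect=5, read=30),
)

urls = [
    'https://api.example.com/stream/1',
    'https://api.example.com/stream/2',
]

for url in urls:
    response = http.request('GET', url, preload_content=False)
    try:
        for chunk in response.stream(8192):
            pass  # process each chunk
    finally:
        # Returning the connection to the pool lets the next request reuse it
        response.release_conn()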

Troubleshooting Common Issues

Memory Leaks

# Always ensure proper cleanup. Create the response before the try block so
# release_conn() is never called on a variable that was never assigned.
response = http.request('GET', url, preload_content=False)
try:
    for chunk in response.stream(8192):
        pass  # Process chunks...
finally:
    response.release_conn()  # Critical for preventing connection and memory leaks

Incomplete Chunks

# Buffer incomplete data properly
buffer = b""
for chunk in response.stream():
    buffer += chunk
    # Process complete messages only
    while b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        process_line(line)

Connection Timeouts

# Set appropriate timeouts for chunked streams
http = urllib3.PoolManager(timeout=urllib3.Timeout(connect=5, read=30))
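
Timeouts can also be set per request, which helps when only certain streaming endpoints are slow (the URL and values below are placeholders):

import urllib3
from urllib3.exceptions import ReadTimeoutError

http = urllib3.PoolManager()

# 5 seconds to connect, and at most 30 seconds between reads of the stream
response = http.request('GET', 'https://api.example.com/slow-stream',
                        preload_content=False,
                        timeout=urllib3.Timeout(connect=5, read=30))
try:
    for chunk in response.stream(8192):
        pass  # process each chunk
except ReadTimeoutError:
    print("Stream stalled for more than 30 seconds")
finally:
    response.release_conn()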

urllib3's chunked transfer encoding support makes it an excellent choice for handling streaming responses and large data transfers efficiently. By following these patterns and best practices, you can build robust applications that handle chunked data streams reliably while maintaining optimal performance and memory usage.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
