How do I handle chunked transfer encoding with urllib3?
Chunked transfer encoding is an HTTP/1.1 feature that allows servers to send data in chunks without knowing the total content length beforehand. This is particularly useful for streaming large responses, real-time data, or dynamically generated content. urllib3, a powerful HTTP library for Python, provides excellent support for handling chunked responses efficiently.
Understanding Chunked Transfer Encoding
Chunked transfer encoding breaks the response body into smaller pieces (chunks), each prefixed with its size in hexadecimal. The server sends chunks sequentially until it sends a zero-length chunk to signal the end of the response. This mechanism is essential for:
- Streaming large files without loading them entirely into memory
- Real-time data feeds where content length is unknown
- Server-sent events and live data streams
- API responses that generate content dynamically
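For reference, here is what a chunked body looks like on the wire, written as a Python bytes literal with one comment per line (the payload text is made up for illustration):

# Each chunk is "<size in hex>\r\n<data>\r\n"; a zero-size chunk ends the body.
raw_chunked_body = (
    b"7\r\n"           # first chunk is 7 bytes long
    b"Mozilla\r\n"     # 7 bytes of data
    b"9\r\n"           # second chunk is 9 bytes long
    b"Developer\r\n"   # 9 bytes of data
    b"0\r\n"           # zero-length chunk: end of body
    b"\r\n"            # final CRLF after any optional trailers
)

urllib3 strips this framing automatically: response.read() and response.stream() hand you only the payload bytes.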
Basic Chunked Response Handling
urllib3 handles chunked transfer encoding automatically. To work with a chunked response explicitly, disable preloading with the preload_content=False parameter and read the body incrementally:
import urllib3

# Create a PoolManager instance
http = urllib3.PoolManager()

# Make a request with streaming enabled
response = http.request('GET', 'https://api.example.com/large-dataset',
                        preload_content=False)

# Check if response uses chunked encoding
if response.headers.get('Transfer-Encoding') == 'chunked':
    print("Response uses chunked transfer encoding")

# Read data in chunks
chunk_size = 8192
while True:
    chunk = response.read(chunk_size)
    if not chunk:
        break
    # Process chunk data
    process_chunk(chunk)

response.release_conn()
Streaming Large Responses
For large responses, streaming with chunked encoding prevents memory overflow:
import urllib3
import json

def stream_json_data(url):
    http = urllib3.PoolManager()

    # Request with streaming enabled
    response = http.request('GET', url, preload_content=False)

    try:
        # Initialize buffer for incomplete JSON objects
        buffer = ""

        for chunk in response.stream(chunk_size=1024):
            if chunk:
                # Decode chunk to string
                chunk_str = chunk.decode('utf-8')
                buffer += chunk_str

                # Process complete JSON objects (one per line)
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    if line.strip():
                        try:
                            json_obj = json.loads(line)
                            yield json_obj
                        except json.JSONDecodeError:
                            continue
    finally:
        response.release_conn()

# Usage example
for data_item in stream_json_data('https://api.example.com/stream'):
    print(f"Received: {data_item}")
Handling Chunked File Downloads
When downloading large files, chunked encoding helps manage memory usage:
import urllib3
import os

def download_chunked_file(url, filename):
    http = urllib3.PoolManager()

    response = http.request('GET', url, preload_content=False)

    try:
        total_size = 0
        chunk_size = 8192

        with open(filename, 'wb') as file:
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break

                file.write(chunk)
                total_size += len(chunk)

                # Optional: show progress
                print(f"\rDownloaded: {total_size} bytes", end='', flush=True)

        print(f"\nDownload complete: {filename}")
        return total_size

    except Exception:
        # Clean up partial file on error
        if os.path.exists(filename):
            os.remove(filename)
        raise

    finally:
        response.release_conn()

# Download a large file
download_chunked_file('https://example.com/large-file.zip', 'download.zip')
Advanced Chunked Response Processing
For more sophisticated handling, you can implement custom chunk processors:
import urllib3
import hashlib
import time

class ChunkedProcessor:
    def __init__(self, url, chunk_size=8192):
        self.url = url
        self.chunk_size = chunk_size
        self.http = urllib3.PoolManager()
        self.total_bytes = 0
        self.start_time = time.time()
        self.hash_md5 = hashlib.md5()

    def process_with_validation(self):
        response = self.http.request('GET', self.url, preload_content=False)

        try:
            # Verify chunked encoding
            if response.headers.get('Transfer-Encoding') != 'chunked':
                print("Warning: Response doesn't use chunked encoding")

            for chunk in response.stream(self.chunk_size):
                if chunk:
                    self.total_bytes += len(chunk)
                    self.hash_md5.update(chunk)

                    # Calculate download speed
                    elapsed = time.time() - self.start_time
                    speed = self.total_bytes / elapsed if elapsed > 0 else 0
                    print(f"\rProcessed: {self.total_bytes} bytes "
                          f"({speed:.2f} bytes/sec)", end='', flush=True)

                    # Process chunk data
                    yield chunk
        finally:
            response.release_conn()
            print(f"\nMD5 Hash: {self.hash_md5.hexdigest()}")

# Usage
processor = ChunkedProcessor('https://api.example.com/data-stream')
for chunk in processor.process_with_validation():
    # Process each chunk as needed
    pass
Error Handling and Retry Logic
Robust chunked response handling includes proper error management:
import urllib3
from urllib3.exceptions import ReadTimeoutError, ProtocolError
import time

def robust_chunked_request(url, max_retries=3, timeout=30):
    http = urllib3.PoolManager()

    for attempt in range(max_retries):
        try:
            response = http.request('GET', url,
                                    preload_content=False,
                                    timeout=timeout)

            chunks = []
            bytes_received = 0

            try:
                for chunk in response.stream(chunk_size=4096):
                    if chunk:
                        chunks.append(chunk)
                        bytes_received += len(chunk)

                        # Optional: implement a maximum size limit
                        if bytes_received > 100 * 1024 * 1024:  # 100 MB limit
                            raise ValueError("Response too large")

                # Successfully received all chunks
                return b''.join(chunks)

            except (ReadTimeoutError, ProtocolError) as e:
                print(f"Stream error on attempt {attempt + 1}: {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff

            finally:
                response.release_conn()

        except Exception as e:
            print(f"Request failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

# Usage with error handling
try:
    data = robust_chunked_request('https://api.example.com/chunked-data')
    print(f"Successfully received {len(data)} bytes")
except Exception as e:
    print(f"Failed to receive chunked data: {e}")
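urllib3 also ships a built-in Retry helper, which retries the request itself (connection failures and selected status codes) before any chunks are streamed; it does not resume a stream that fails mid-body, so the manual loop above is still useful for that case. A minimal sketch, assuming the same placeholder URL:

import urllib3
from urllib3.util import Retry

# Retry the initial request up to 3 times with exponential backoff;
# also retry on common transient status codes.
retry = Retry(total=3, backoff_factor=0.5,
              status_forcelist=[500, 502, 503, 504])
http = urllib3.PoolManager(retries=retry)

response = http.request('GET', 'https://api.example.com/chunked-data',
                        preload_content=False)
try:
    for chunk in response.stream(chunk_size=4096):
        pass  # process each chunk here
finally:
    response.release_conn()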
Performance Optimization
Optimize chunked response handling for better performance:
import urllib3
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
import time

class OptimizedChunkedReader:
    def __init__(self, url, num_workers=3):
        self.url = url
        self.http = urllib3.PoolManager()
        self.chunk_queue = queue.Queue(maxsize=50)
        self.num_workers = num_workers

    def producer(self, response):
        """Producer thread that reads chunks from the response"""
        try:
            for chunk in response.stream(chunk_size=16384):
                if chunk:
                    self.chunk_queue.put(chunk)
        except Exception as e:
            # Propagate errors to the consumers through the queue
            self.chunk_queue.put(e)
        finally:
            response.release_conn()

    def consumer(self, process_func):
        """Consumer that processes chunks from the queue"""
        while True:
            item = self.chunk_queue.get()
            if item is None:  # End signal
                break
            if isinstance(item, Exception):
                raise item
            process_func(item)
            self.chunk_queue.task_done()

    def process_async(self, process_func):
        """Process chunks asynchronously with one producer and several consumers"""
        response = self.http.request('GET', self.url, preload_content=False)

        # Start producer thread
        producer_thread = threading.Thread(
            target=self.producer,
            args=(response,)
        )
        producer_thread.start()

        # Start consumer threads
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [
                executor.submit(self.consumer, process_func)
                for _ in range(self.num_workers)
            ]

            # Wait for producer to finish
            producer_thread.join()

            # Signal all consumers to stop
            for _ in range(self.num_workers):
                self.chunk_queue.put(None)

            # Wait for all consumers
            for future in futures:
                future.result()

# Usage example
def process_chunk_data(chunk):
    # Simulate processing time
    time.sleep(0.01)
    print(f"Processed chunk of {len(chunk)} bytes")

reader = OptimizedChunkedReader('https://api.example.com/large-stream')
reader.process_async(process_chunk_data)
Integration with Web Scraping
When handling chunked responses in web scraping scenarios, you need to manage streaming data efficiently:
import urllib3
import json

def scrape_streaming_api(api_url, headers=None):
    """Scrape data from a streaming API with chunked encoding"""
    http = urllib3.PoolManager()

    # Add custom headers for scraping
    default_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate',
    }
    if headers:
        default_headers.update(headers)

    response = http.request('GET', api_url,
                            headers=default_headers,
                            preload_content=False)

    scraped_data = []
    buffer = ""

    try:
        for chunk in response.stream(chunk_size=2048):
            if chunk:
                # Buffer data so JSON lines split across chunks stay intact
                buffer += chunk.decode('utf-8')
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    if line.strip():
                        try:
                            data = json.loads(line)
                            scraped_data.append(data)
                        except json.JSONDecodeError:
                            continue
        # Handle a trailing line that has no final newline
        if buffer.strip():
            try:
                scraped_data.append(json.loads(buffer))
            except json.JSONDecodeError:
                pass
    finally:
        response.release_conn()

    return scraped_data
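A quick usage sketch; the endpoint and bearer token below are placeholders, not real values:

# Usage example (hypothetical endpoint and token)
items = scrape_streaming_api('https://api.example.com/feed',
                             headers={'Authorization': 'Bearer YOUR_TOKEN'})
print(f"Scraped {len(items)} records")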
Common Scenarios and Solutions
Handling Server-Sent Events (SSE)
import urllib3

def handle_sse_stream(url):
    """Handle Server-Sent Events with chunked encoding"""
    http = urllib3.PoolManager()

    response = http.request('GET', url,
                            headers={'Accept': 'text/event-stream'},
                            preload_content=False)

    buffer = ""

    try:
        for chunk in response.stream(chunk_size=1024):
            if chunk:
                buffer += chunk.decode('utf-8')

                # Process complete SSE messages (separated by a blank line)
                while '\n\n' in buffer:
                    message, buffer = buffer.split('\n\n', 1)

                    # Parse SSE message fields
                    event_data = {}
                    for line in message.split('\n'):
                        if ':' in line:
                            key, value = line.split(':', 1)
                            event_data[key.strip()] = value.strip()

                    if 'data' in event_data:
                        yield event_data['data']
    finally:
        response.release_conn()

# Process SSE stream
for event in handle_sse_stream('https://api.example.com/events'):
    print(f"Received SSE: {event}")
Progress Tracking for Large Downloads
import urllib3
import sys

def download_with_progress(url, filename):
    """Download with progress tracking for chunked responses"""
    http = urllib3.PoolManager()

    response = http.request('GET', url, preload_content=False)

    # Try to get content length if available
    content_length = response.headers.get('Content-Length')
    total_size = int(content_length) if content_length else None

    downloaded = 0
    chunk_size = 8192

    try:
        with open(filename, 'wb') as f:
            for chunk in response.stream(chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)

                    # Show progress
                    if total_size:
                        percent = (downloaded / total_size) * 100
                        sys.stdout.write(f"\rProgress: {percent:.1f}% "
                                         f"({downloaded}/{total_size} bytes)")
                    else:
                        sys.stdout.write(f"\rDownloaded: {downloaded} bytes")
                    sys.stdout.flush()
    finally:
        response.release_conn()

    print("\nDownload complete!")

download_with_progress('https://example.com/large-file.zip', 'download.zip')
Best Practices and Tips
- Always use preload_content=False when dealing with large chunked responses to avoid memory issues.
- Implement proper cleanup by calling response.release_conn() in a finally block.
- Set appropriate timeouts to handle slow or stalled connections.
- Monitor memory usage when processing large streams, especially in long-running applications.
- Use appropriate chunk sizes - too small causes overhead, too large uses more memory.
- Implement retry logic for network failures during chunk processing.
- Validate data integrity using checksums when downloading files.
- Handle encoding properly - always decode chunks to the appropriate character encoding (see the decoding sketch after this list).
- Buffer incomplete data when processing structured data like JSON or XML.
- Use connection pooling for multiple requests to improve performance.
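One subtlety behind the encoding tip: a multi-byte UTF-8 character can be split across two chunks, in which case a plain chunk.decode('utf-8') raises UnicodeDecodeError. A minimal sketch using the standard library's incremental decoder (the URL is a placeholder):

import codecs
import urllib3

http = urllib3.PoolManager()
response = http.request('GET', 'https://api.example.com/stream',
                        preload_content=False)

# The incremental decoder keeps partial multi-byte sequences between chunks
decoder = codecs.getincrementaldecoder('utf-8')()

try:
    for chunk in response.stream(chunk_size=1024):
        text = decoder.decode(chunk)  # safe even if a character is split across chunks
        if text:
            print(text, end='')
    # Flush any bytes still buffered at the end of the stream
    print(decoder.decode(b'', final=True), end='')
finally:
    response.release_conn()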
Troubleshooting Common Issues
Memory Leaks
# Always ensure proper cleanup
response = http.request('GET', url, preload_content=False)
try:
    ...  # Process chunks
finally:
    response.release_conn()  # Critical for preventing memory leaks
Incomplete Chunks
# Buffer incomplete data properly
buffer = b""
for chunk in response.stream():
    buffer += chunk
    # Process complete messages only
    while b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        process_line(line)
Connection Timeouts
# Set appropriate timeouts for chunked streams
http = urllib3.PoolManager(timeout=urllib3.Timeout(connect=5, read=30))
urllib3's chunked transfer encoding support makes it an excellent choice for handling streaming responses and large data transfers efficiently. By following these patterns and best practices, you can build robust applications that handle chunked data streams reliably while maintaining optimal performance and memory usage.