What is the proper way to handle large response bodies with HTTParty?
Handling large response bodies efficiently is crucial for web scraping applications that process substantial amounts of data. HTTParty, while convenient for most HTTP requests, requires specific techniques to manage memory usage and prevent timeouts when dealing with large responses. This guide covers streaming strategies, memory optimization, and best practices for processing large datasets.
Understanding the Challenge
Large response bodies can cause several issues:

- Memory exhaustion: Loading entire responses into memory
- Timeout errors: Long download times exceeding default limits
- Performance degradation: Blocking operations affecting application responsiveness
- Network interruptions: Risk of losing partial downloads
Memory-Efficient Response Handling
1. Streaming Large Files
HTTParty supports streaming responses through the :stream_body option:
require 'httparty'

class LargeFileDownloader
  include HTTParty

  def self.download_large_file(url, file_path)
    File.open(file_path, 'wb') do |file|
      get(url, {
        stream_body: true,
        timeout: 300 # 5 minutes timeout
      }) do |chunk|
        file.write(chunk)
      end
    end
  end
end

# Usage
begin
  LargeFileDownloader.download_large_file(
    'https://example.com/large-dataset.json',
    '/tmp/dataset.json'
  )
  puts "Download completed successfully"
rescue => e
  puts "Download failed: #{e.message}"
end
2. Processing Streaming JSON Data
For large JSON responses, process data incrementally:
require 'httparty'
require 'json'

class StreamingJSONProcessor
  include HTTParty

  def self.process_large_json(url, &block)
    buffer = ""

    get(url, stream_body: true, timeout: 600) do |chunk|
      buffer += chunk
      # Extract and yield complete JSON objects from the buffer
      process_json_objects(buffer, &block)
    end

    # Process any remaining data
    process_remaining_buffer(buffer, &block) unless buffer.strip.empty?
  end

  def self.process_json_objects(buffer, &block)
    start_pos = 0      # position up to which the buffer has been fully processed
    object_start = nil # index of the '{' that opens the current top-level object
    bracket_count = 0
    in_string = false
    escape_next = false

    buffer.each_char.with_index do |char, index|
      if escape_next
        escape_next = false
        next
      end

      case char
      when '\\'
        escape_next = true if in_string
      when '"'
        in_string = !in_string
      when '{'
        unless in_string
          object_start = index if bracket_count == 0
          bracket_count += 1
        end
      when '}'
        unless in_string
          bracket_count -= 1

          # Complete JSON object found
          if bracket_count == 0 && object_start
            json_str = buffer[object_start..index]
            begin
              object = JSON.parse(json_str)
              yield(object) if block_given?
            rescue JSON::ParserError => e
              puts "JSON parse error: #{e.message}"
            end
            object_start = nil
            start_pos = index + 1
          end
        end
      end
    end

    # Remove processed portion from buffer; incomplete objects stay for the next chunk
    buffer.slice!(0, start_pos) if start_pos > 0
  end

  def self.process_remaining_buffer(buffer, &block)
    remaining_objects = JSON.parse(buffer)
    if remaining_objects.is_a?(Array)
      remaining_objects.each { |obj| yield(obj) if block_given? }
    else
      yield(remaining_objects) if block_given?
    end
  rescue JSON::ParserError
    puts "Warning: Unparseable data remaining in buffer"
  end

  private_class_method :process_json_objects, :process_remaining_buffer
end
# Usage
total_processed = 0

StreamingJSONProcessor.process_large_json('https://api.example.com/large-dataset') do |record|
  # Process each record individually
  puts "Processing record ID: #{record['id']}"

  # Store to database, transform data, etc.
  total_processed += 1

  # Optional: Progress reporting
  puts "Processed #{total_processed} records" if total_processed % 1000 == 0
end

puts "Total records processed: #{total_processed}"
3. Chunked CSV Processing
For large CSV files, process data line by line:
require 'httparty'
require 'csv'

class StreamingCSVProcessor
  include HTTParty

  def self.process_large_csv(url, options = {}, &block)
    buffer = ""
    headers = nil
    row_count = 0

    csv_options = {
      headers: true,
      header_converters: :symbol
    }.merge(options)

    # Options suitable for parsing a single, already-split line
    # (:headers and :header_converters are handled manually below)
    line_options = csv_options.reject { |key, _| [:headers, :header_converters].include?(key) }

    get(url, stream_body: true, timeout: 600) do |chunk|
      buffer += chunk

      # Process complete lines (quoted fields containing newlines are not supported)
      while (line_end = buffer.index("\n"))
        line = buffer.slice!(0, line_end + 1).chomp

        if headers.nil? && csv_options[:headers]
          headers = CSV.parse_line(line, **line_options)
          # Roughly mimic CSV's :symbol header converter
          if headers && csv_options[:header_converters] == :symbol
            headers = headers.map { |h| h.to_s.strip.downcase.gsub(/\s+/, '_').to_sym }
          end
          next
        end

        begin
          row = CSV.parse_line(line, **line_options)
          next if row.nil? || row.empty?

          # Convert to hash if headers are present
          if headers && csv_options[:headers]
            row_hash = headers.zip(row).to_h
            yield(row_hash, row_count) if block_given?
          else
            yield(row, row_count) if block_given?
          end

          row_count += 1

          # Progress reporting
          puts "Processed #{row_count} rows" if row_count % 10_000 == 0
        rescue CSV::MalformedCSVError => e
          puts "Malformed CSV line #{row_count}: #{e.message}"
        end
      end
    end

    # Process any remaining data in buffer
    unless buffer.strip.empty?
      begin
        row = CSV.parse_line(buffer, **line_options)
        if row && !row.empty?
          if headers && csv_options[:headers]
            row_hash = headers.zip(row).to_h
            yield(row_hash, row_count) if block_given?
          else
            yield(row, row_count) if block_given?
          end
          row_count += 1
        end
      rescue CSV::MalformedCSVError => e
        puts "Malformed CSV in buffer: #{e.message}"
      end
    end

    row_count
  end
end
# Usage
total_rows = StreamingCSVProcessor.process_large_csv(
  'https://example.com/large-dataset.csv',
  headers: true,
  col_sep: ','
) do |row, index|
  # Process each row
  puts "Row #{index}: #{row[:name]} - #{row[:email]}"

  # Example: Insert into database
  # User.create!(name: row[:name], email: row[:email])
end

puts "Total rows processed: #{total_rows}"
Advanced Memory Management
1. Response Size Limiting
Prevent memory issues by limiting response size:
class SizeLimitedClient
  include HTTParty

  MAX_RESPONSE_SIZE = 100 * 1024 * 1024 # 100MB limit

  def self.get_with_size_limit(url, options = {})
    max_size = options.delete(:max_size) || MAX_RESPONSE_SIZE
    downloaded_size = 0
    response_data = ""

    response = get(url, options.merge(stream_body: true)) do |chunk|
      downloaded_size += chunk.bytesize

      if downloaded_size > max_size
        raise "Response size limit exceeded: #{downloaded_size} bytes > #{max_size} bytes"
      end

      response_data += chunk
    end

    # Create a mock response object with the accumulated data
    MockResponse.new(response_data, response.code, response.headers)
  end

  class MockResponse
    attr_reader :body, :code, :headers

    def initialize(body, code, headers)
      @body = body
      @code = code
      @headers = headers
    end

    def success?
      (200..299).include?(@code)
    end
  end
end

# Usage
begin
  response = SizeLimitedClient.get_with_size_limit(
    'https://api.example.com/data',
    max_size: 50 * 1024 * 1024, # 50MB limit
    timeout: 300
  )

  puts "Response size: #{response.body.bytesize} bytes"
  data = JSON.parse(response.body)
rescue => e
  puts "Error: #{e.message}"
end
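When the server advertises a Content-Length header, you can often reject an oversized response before downloading anything by issuing a HEAD request first. The sketch below is a minimal pre-check to combine with the streaming limit above; the helper name and URL are illustrative, and not every server sets the header, so a missing value falls back to the streamed limit.

# Hypothetical pre-check: fetch only the headers and inspect Content-Length
def body_small_enough?(url, max_bytes)
  head = HTTParty.head(url, timeout: 30)
  length = head.headers['content-length']
  return true if length.nil? # size unknown: rely on the streaming size limit instead
  length.to_i <= max_bytes
end

url = 'https://api.example.com/data' # placeholder URL
if body_small_enough?(url, 50 * 1024 * 1024)
  response = SizeLimitedClient.get_with_size_limit(url, max_size: 50 * 1024 * 1024)
end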
2. Lazy Loading with Pagination
Handle large datasets through pagination:
class PaginatedDataFetcher
  include HTTParty
  base_uri 'https://api.example.com'

  def self.fetch_all_pages(endpoint, options = {})
    page = 1
    per_page = options[:per_page] || 100
    total_records = 0

    loop do
      puts "Fetching page #{page}..."

      response = get(endpoint, {
        query: {
          page: page,
          per_page: per_page
        }.merge(options[:query] || {}),
        timeout: 120
      })

      unless response.success?
        raise "API request failed: #{response.code} - #{response.message}"
      end

      data = JSON.parse(response.body)
      # Support both enveloped responses ({"data": [...]}, {"results": [...]}) and bare arrays
      records = data.is_a?(Hash) ? (data['data'] || data['results'] || []) : data

      break if records.empty?

      # Process each record
      records.each do |record|
        yield(record) if block_given?
        total_records += 1
      end

      # Check if we've reached the end
      break if records.size < per_page

      page += 1

      # Rate limiting
      sleep(0.5)
    end

    total_records
  end
end
# Usage
total_processed = PaginatedDataFetcher.fetch_all_pages('/users', per_page: 50) do |user|
  puts "Processing user: #{user['name']}"

  # Process individual records without loading everything into memory
  # Database.insert_user(user)
end

puts "Total users processed: #{total_processed}"
Performance Optimization
1. Parallel Processing with Concurrent Downloads
For multiple large files, use concurrent processing:
require 'httparty'
require 'concurrent'

class ConcurrentLargeFileProcessor
  include HTTParty

  def self.process_multiple_files(urls, max_threads: 5)
    thread_pool = Concurrent::ThreadPoolExecutor.new(
      min_threads: 1,
      max_threads: max_threads,
      max_queue: urls.size
    )

    futures = urls.map do |url|
      Concurrent::Future.execute(executor: thread_pool) do
        process_single_file(url)
      end
    end

    # Wait for all downloads to complete
    results = futures.map(&:value)

    thread_pool.shutdown
    thread_pool.wait_for_termination

    results
  end

  def self.process_single_file(url)
    start_time = Time.now
    total_size = 0
    processed_records = 0
    reported_mb = 0

    begin
      get(url, stream_body: true, timeout: 600) do |chunk|
        total_size += chunk.bytesize

        # Process chunk (example: count newlines for record estimation)
        processed_records += chunk.count("\n")

        # Optional: report progress each time another full megabyte has arrived
        current_mb = total_size / (1024 * 1024)
        if current_mb > reported_mb
          reported_mb = current_mb
          puts "#{url}: Downloaded #{current_mb}MB"
        end
      end

      duration = Time.now - start_time

      {
        url: url,
        success: true,
        size: total_size,
        records: processed_records,
        duration: duration
      }
    rescue => e
      {
        url: url,
        success: false,
        error: e.message,
        duration: Time.now - start_time
      }
    end
  end

  private_class_method :process_single_file
end
# Usage
urls = [
  'https://example.com/dataset1.json',
  'https://example.com/dataset2.json',
  'https://example.com/dataset3.json'
]

results = ConcurrentLargeFileProcessor.process_multiple_files(urls, max_threads: 3)

results.each do |result|
  if result[:success]
    puts "✓ #{result[:url]}: #{result[:size]} bytes, #{result[:records]} records (#{result[:duration].round(2)}s)"
  else
    puts "✗ #{result[:url]}: #{result[:error]} (#{result[:duration].round(2)}s)"
  end
end
2. Progress Tracking and Resumable Downloads
Implement progress tracking for better user experience:
class ResumableDownloader
  include HTTParty

  def self.download_with_progress(url, file_path, options = {})
    options = options.dup

    # Resume from a partial file if one exists (assumes the server honors Range requests)
    start_byte = File.exist?(file_path) ? File.size(file_path) : 0
    downloaded = start_byte

    headers = (options.delete(:headers) || {}).dup
    headers['Range'] = "bytes=#{start_byte}-" if start_byte > 0

    start_time = Time.now
    last_progress_time = start_time

    File.open(file_path, start_byte > 0 ? 'ab' : 'wb') do |file|
      response = get(url, {
        stream_body: true,
        timeout: 600
      }.merge(options).merge(headers: headers)) do |chunk|
        file.write(chunk)
        downloaded += chunk.bytesize

        # Progress reporting (every 5 seconds)
        current_time = Time.now
        if current_time - last_progress_time >= 5
          duration = current_time - start_time
          speed = downloaded / duration if duration > 0

          puts "Downloaded: #{format_bytes(downloaded)} " \
               "(#{format_speed(speed)}) " \
               "Time: #{duration.round(2)}s"

          last_progress_time = current_time
        end

        # Optional: Progress callback
        yield(downloaded, chunk.bytesize) if block_given?
      end

      unless response.success?
        raise "Download failed: #{response.code} - #{response.message}"
      end
    end

    total_time = Time.now - start_time
    avg_speed = downloaded / total_time if total_time > 0

    {
      file_path: file_path,
      total_size: downloaded,
      duration: total_time,
      average_speed: avg_speed
    }
  end

  def self.format_bytes(bytes)
    units = ['B', 'KB', 'MB', 'GB']
    size = bytes.to_f
    unit_index = 0

    while size >= 1024 && unit_index < units.length - 1
      size /= 1024
      unit_index += 1
    end

    "#{size.round(2)} #{units[unit_index]}"
  end

  def self.format_speed(bytes_per_second)
    return "0 B/s" if bytes_per_second.nil? || bytes_per_second <= 0

    "#{format_bytes(bytes_per_second)}/s"
  end

  private_class_method :format_bytes, :format_speed
end
# Usage with progress tracking
begin
  result = ResumableDownloader.download_with_progress(
    'https://example.com/large-file.zip',
    '/tmp/large-file.zip'
  ) do |total_downloaded, chunk_size|
    # Custom progress handling
    # puts "Progress: #{total_downloaded} bytes downloaded"
  end

  puts "Download completed:"
  puts "  File: #{result[:file_path]}"
  puts "  Size: #{ResumableDownloader.send(:format_bytes, result[:total_size])}"
  puts "  Time: #{result[:duration].round(2)} seconds"
  puts "  Speed: #{ResumableDownloader.send(:format_speed, result[:average_speed])}"
rescue => e
  puts "Download failed: #{e.message}"
end
Error Handling and Timeouts
Robust Error Handling for Large Responses
class RobustLargeFileClient
  include HTTParty

  def self.safe_large_request(url, options = {})
    retries = 0
    max_retries = options.delete(:max_retries) || 3

    begin
      # Extended timeouts for large files
      default_options = {
        timeout: 1800,      # 30 minutes overall
        read_timeout: 300,  # 5 minutes per read
        open_timeout: 60    # 1 minute to establish the connection
      }

      response = get(url, default_options.merge(options))

      unless response.success?
        raise "HTTP Error: #{response.code} - #{response.message}"
      end

      response
    rescue Net::OpenTimeout, Net::ReadTimeout, Timeout::Error,
           Errno::ECONNRESET, Errno::ECONNREFUSED => e
      retries += 1

      if retries <= max_retries
        wait_time = 2 ** retries
        puts "Network error (attempt #{retries}/#{max_retries}): #{e.message}"
        puts "Retrying in #{wait_time} seconds..."
        sleep(wait_time)
        retry
      else
        raise "Failed after #{max_retries} retries: #{e.message}"
      end
    rescue => e
      puts "Unexpected error: #{e.class} - #{e.message}"
      raise
    end
  end
end
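A usage sketch, following the pattern of the earlier examples; the URL and retry count are illustrative:

# Usage
begin
  response = RobustLargeFileClient.safe_large_request(
    'https://example.com/large-export.json', # placeholder URL
    max_retries: 5
  )
  puts "Fetched #{response.body.bytesize} bytes"
rescue => e
  puts "Request ultimately failed: #{e.message}"
end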
Best Practices
1. Memory Management
- Use streaming for files larger than available RAM
- Process data incrementally rather than loading everything at once
- Implement size limits to prevent memory exhaustion
- Monitor memory usage during development and testing (see the sketch below)
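As a rough way to monitor memory during development, you can sample the process's resident set size (RSS) around a large request. A minimal sketch, assuming a Unix-like system where ps is available; on other platforms a gem such as get_process_mem provides similar data, and the helper name is hypothetical:

# Report resident memory before and after a block, via `ps` (Unix-like systems only)
def with_memory_report(label)
  rss_before = `ps -o rss= -p #{Process.pid}`.to_i # kilobytes
  result = yield
  rss_after = `ps -o rss= -p #{Process.pid}`.to_i
  puts "#{label}: RSS #{rss_before / 1024}MB -> #{rss_after / 1024}MB"
  result
end

with_memory_report("streaming download") do
  LargeFileDownloader.download_large_file(
    'https://example.com/large-dataset.json',
    '/tmp/dataset.json'
  )
end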
2. Timeout Configuration
- Set appropriate timeouts based on expected file sizes
- Use different timeouts for connection establishment and data transfer (see the sketch after this list)
- Implement exponential backoff for retry attempts
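Recent HTTParty versions let you set connection and read timeouts separately, both at the class level and per request. A short sketch of both styles; the class name, URL, and values are illustrative and should be tuned to your expected file sizes:

class TimeoutTunedClient
  include HTTParty

  # Class-level defaults: connection establishment vs. data transfer
  open_timeout 10    # seconds to open the connection
  read_timeout 300   # seconds to wait for each read
end

# Per-request options override the class-level defaults
TimeoutTunedClient.get(
  'https://example.com/large-file.csv',
  open_timeout: 30,
  read_timeout: 900
)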
3. Progress Monitoring
- Provide progress feedback for long-running downloads
- Log download statistics for performance monitoring
- Implement resumable downloads for critical large files
4. Error Recovery
- Handle network interruptions gracefully
- Implement retry logic with appropriate delays
- Validate partial downloads and resume when possible
When dealing with complex scenarios that require authentication handling or timeout management, consider implementing similar robust patterns across your entire scraping infrastructure.
Efficient handling of large response bodies is essential for building scalable web scraping applications. The techniques shown above provide a foundation for processing substantial datasets while maintaining system stability and performance.