How do I handle streaming responses in Guzzle for large data sets?
When working with large data sets from web APIs or file downloads, loading the entire response into memory can quickly exhaust your server's resources. Guzzle's streaming capabilities let you process large responses chunk by chunk, keeping memory usage low regardless of how large the response is.
Understanding Guzzle Streaming
Guzzle streaming allows you to process HTTP responses as they arrive, rather than waiting for the complete response to be downloaded and buffered in memory; the short sketch after the list below shows the difference in peak memory. This approach is essential when dealing with:
- Large file downloads (images, videos, archives)
- API responses with extensive data sets
- Real-time data feeds
- CSV exports from databases
- JSON responses with thousands of records
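To make the memory difference concrete, here is a minimal sketch; the https://api.example.com/large-dataset endpoint is a placeholder, and the helper is only illustrative. Run it once with streaming enabled and once without, in separate processes, and compare the reported peaks.

use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;

// Run once with $useStreaming = true and once with false (in separate processes,
// since peak memory is tracked per process) and compare the output.
function fetchDataset($useStreaming) {
    $client = new Client();

    if ($useStreaming) {
        $response = $client->get('https://api.example.com/large-dataset', [
            RequestOptions::STREAM => true,
        ]);

        $stream = $response->getBody();
        while (!$stream->eof()) {
            $chunk = $stream->read(8192); // only one 8 KB chunk is held at a time
        }
    } else {
        $response = $client->get('https://api.example.com/large-dataset');
        $body = (string) $response->getBody(); // the whole body ends up in this string
    }

    $peakMb = round(memory_get_peak_usage(true) / 1048576, 2);
    echo ($useStreaming ? 'Streaming' : 'Buffered') . " peak memory: {$peakMb} MB\n";
}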
Basic Streaming Implementation
Here's how to implement basic streaming in Guzzle:
use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;
$client = new Client();
// Enable streaming by setting the stream option to true
$response = $client->get('https://api.example.com/large-dataset', [
RequestOptions::STREAM => true,
]);
// Get the response body as a stream
$stream = $response->getBody();
// Process the stream chunk by chunk
while (!$stream->eof()) {
    $chunk = $stream->read(8192); // read up to 8 KB per iteration
// Process the chunk
processChunk($chunk);
}
function processChunk($data) {
// Your processing logic here
echo "Processing " . strlen($data) . " bytes\n";
}
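The eof()/read() loop above recurs in every example that follows. If you prefer plain iteration, it can be wrapped in a small generator; the readChunks() helper below is not part of Guzzle, just a sketch built on the PSR-7 StreamInterface that Guzzle response bodies implement.

use Psr\Http\Message\StreamInterface;

// Yield the response body in fixed-size chunks so callers can simply foreach over it
function readChunks(StreamInterface $stream, $chunkSize = 8192)
{
    while (!$stream->eof()) {
        $chunk = $stream->read($chunkSize);
        if ($chunk !== '') {
            yield $chunk;
        }
    }
}

// Usage with the streamed $response from the example above
foreach (readChunks($response->getBody()) as $chunk) {
    processChunk($chunk);
}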
Streaming Large File Downloads
When downloading large files, streaming prevents memory exhaustion:
use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;
function downloadLargeFile($url, $localPath) {
$client = new Client();
// Open local file for writing
    $fileHandle = fopen($localPath, 'wb'); // 'b' keeps writes binary-safe on Windows
if (!$fileHandle) {
throw new Exception("Cannot open file for writing: $localPath");
}
try {
$response = $client->get($url, [
RequestOptions::STREAM => true,
RequestOptions::TIMEOUT => 300, // 5 minutes timeout
]);
$stream = $response->getBody();
while (!$stream->eof()) {
$chunk = $stream->read(8192);
fwrite($fileHandle, $chunk);
}
echo "File downloaded successfully to: $localPath\n";
} finally {
fclose($fileHandle);
}
}
// Usage
downloadLargeFile('https://example.com/large-file.zip', '/tmp/downloaded-file.zip');
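When the goal is simply to save the response to disk, Guzzle's sink request option can take care of the copying for you: pass it a file path (or an open resource) and the body is written directly to that destination, with no manual read loop. A brief sketch reusing the URL from the example above:

use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;

$client = new Client();

// Guzzle copies the body to the file as it downloads instead of buffering it in PHP
$client->get('https://example.com/large-file.zip', [
    RequestOptions::SINK => '/tmp/downloaded-file.zip',
    RequestOptions::TIMEOUT => 300,
]);

Use sink when you only need the file on disk, and stream => true when your code has to inspect the data as it arrives.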
Processing JSON Streaming Responses
For large JSON datasets delivered as newline-delimited JSON (NDJSON / JSON Lines, one object per line, which is what the buffer splitting below assumes), you can process individual records as they arrive:
use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;
function processStreamingJsonApi($url) {
$client = new Client();
$response = $client->get($url, [
RequestOptions::STREAM => true,
RequestOptions::HEADERS => [
'Accept' => 'application/json',
],
]);
$stream = $response->getBody();
$buffer = '';
$recordCount = 0;
while (!$stream->eof()) {
$chunk = $stream->read(4096);
$buffer .= $chunk;
// Process complete JSON objects
while (($pos = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $pos);
$buffer = substr($buffer, $pos + 1);
if (!empty(trim($line))) {
$record = json_decode($line, true);
if (json_last_error() === JSON_ERROR_NONE) {
processRecord($record);
$recordCount++;
// Log progress every 1000 records
if ($recordCount % 1000 === 0) {
echo "Processed $recordCount records\n";
}
}
}
}
}
// Process any remaining data in buffer
if (!empty(trim($buffer))) {
$record = json_decode($buffer, true);
if (json_last_error() === JSON_ERROR_NONE) {
processRecord($record);
$recordCount++;
}
}
echo "Total records processed: $recordCount\n";
}
function processRecord($record) {
// Your record processing logic
// e.g., save to database, transform data, etc.
}
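processRecord() is left abstract above. When records end up in a database, inserting them one at a time is often the real bottleneck, so a common pattern is to buffer a few hundred records and write them in batches. A rough sketch, assuming a PDO connection and an items(id, name) table that you would replace with your own schema:

// Hypothetical batcher; the table and column names are placeholders for your schema.
class RecordBatcher
{
    private $pdo;
    private $batchSize;
    private $batch = [];

    public function __construct(\PDO $pdo, $batchSize = 500)
    {
        $this->pdo = $pdo;
        $this->batchSize = $batchSize;
    }

    public function add(array $record)
    {
        $this->batch[] = $record;
        if (count($this->batch) >= $this->batchSize) {
            $this->flush();
        }
    }

    public function flush()
    {
        if (empty($this->batch)) {
            return;
        }

        // One transaction per batch keeps the per-insert overhead low
        $this->pdo->beginTransaction();
        $stmt = $this->pdo->prepare('INSERT INTO items (id, name) VALUES (:id, :name)');

        foreach ($this->batch as $record) {
            $stmt->execute([
                ':id'   => $record['id'],
                ':name' => $record['name'],
            ]);
        }

        $this->pdo->commit();
        $this->batch = [];
    }
}

// Usage: call $batcher->add($record) from processRecord(), and call
// $batcher->flush() once after the stream is fully consumed.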
Advanced Streaming with Progress Tracking
For a better user experience, report progress while the download runs. Guzzle's progress request option invokes a callback as data arrives, so the display logic does not need to be duplicated inside the read loop:
use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;
function downloadWithProgress($url, $localPath) {
    $client = new Client();

    $fileHandle = fopen($localPath, 'wb');
    if (!$fileHandle) {
        throw new Exception("Cannot open file for writing: $localPath");
    }

    try {
        $response = $client->get($url, [
            RequestOptions::STREAM => true,
            // Guzzle invokes this callback as data arrives; $downloadTotal is 0 when the size is unknown
            RequestOptions::PROGRESS => function ($downloadTotal, $downloadedBytes, $uploadTotal, $uploadedBytes) {
                if ($downloadTotal > 0) {
                    $percentage = round(($downloadedBytes / $downloadTotal) * 100, 2);
                    echo "\rProgress: {$percentage}% ({$downloadedBytes}/{$downloadTotal} bytes)";
                }
            },
        ]);

        $stream = $response->getBody();
        $downloadedBytes = 0;

        // Alternatively, skip the PROGRESS option and compute progress here from
        // $response->getHeaderLine('Content-Length') as each chunk is written.
        while (!$stream->eof()) {
            $chunk = $stream->read(8192);
            fwrite($fileHandle, $chunk);
            $downloadedBytes += strlen($chunk);
        }
    } finally {
        fclose($fileHandle);
    }

    echo "\nDownload completed: {$downloadedBytes} bytes written to {$localPath}\n";
}
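Guzzle's on_headers request option also pairs well with streaming downloads: the callback runs as soon as the response headers arrive, before any of the body is read, and throwing an exception from it aborts the transfer. A sketch that refuses oversized downloads (the 100 MB cap is an arbitrary example):

use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;
use Psr\Http\Message\ResponseInterface;

$client = new Client();

$response = $client->get('https://example.com/large-file.zip', [
    RequestOptions::STREAM => true,
    RequestOptions::ON_HEADERS => function (ResponseInterface $response) {
        $length = (int) $response->getHeaderLine('Content-Length');
        if ($length > 100 * 1024 * 1024) {
            // Throwing here aborts the transfer before the body is downloaded
            throw new \RuntimeException('Refusing to download a body larger than 100 MB');
        }
    },
]);

Whatever the callback throws is wrapped by Guzzle in a RequestException, so the abort surfaces through your usual error handling.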
Memory-Efficient CSV Processing
When processing large CSV files from APIs:
use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;
function processStreamingCsv($url) {
$client = new Client();
$response = $client->get($url, [
RequestOptions::STREAM => true,
RequestOptions::HEADERS => [
'Accept' => 'text/csv',
],
]);
$stream = $response->getBody();
$buffer = '';
$headers = null;
$rowCount = 0;
while (!$stream->eof()) {
$chunk = $stream->read(4096);
$buffer .= $chunk;
// Process complete lines
while (($pos = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $pos);
$buffer = substr($buffer, $pos + 1);
$csvRow = str_getcsv(trim($line));
if ($headers === null) {
$headers = $csvRow;
continue;
}
if (count($csvRow) === count($headers)) {
$record = array_combine($headers, $csvRow);
processCsvRecord($record);
$rowCount++;
if ($rowCount % 5000 === 0) {
echo "Processed $rowCount CSV rows\n";
}
}
}
}
echo "Total CSV rows processed: $rowCount\n";
}
function processCsvRecord($record) {
// Process individual CSV record
// e.g., validate, transform, store in database
}
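One caveat with the line-splitting approach above: str_getcsv() cannot handle quoted fields that contain embedded newlines. If your source data may include them, an alternative sketch is to expose the PSR-7 stream as a regular PHP stream resource via GuzzleHttp\Psr7\StreamWrapper and let fgetcsv() do the parsing, since it understands multi-line quoted fields:

use GuzzleHttp\Client;
use GuzzleHttp\Psr7\StreamWrapper;
use GuzzleHttp\RequestOptions;

function processStreamingCsvWithFgetcsv($url) {
    $client = new Client();

    $response = $client->get($url, [
        RequestOptions::STREAM => true,
        RequestOptions::HEADERS => [
            'Accept' => 'text/csv',
        ],
    ]);

    // Expose the PSR-7 stream as a native PHP stream resource
    $resource = StreamWrapper::getResource($response->getBody());

    $headers = fgetcsv($resource);
    $rowCount = 0;

    // fgetcsv() reads one record at a time and copes with quoted, multi-line fields
    while (($csvRow = fgetcsv($resource)) !== false) {
        if ($headers !== false && count($csvRow) === count($headers)) {
            processCsvRecord(array_combine($headers, $csvRow));
            $rowCount++;
        }
    }

    fclose($resource);
    echo "Total CSV rows processed: $rowCount\n";
}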
Error Handling and Resilience
Implement robust error handling for streaming operations; failures can surface both when the request is sent and later while the streamed body is being read:
use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\RequestOptions;
function robustStreamingDownload($url, $localPath, $maxRetries = 3) {
    $client = new Client();
    $attempt = 0;

    while ($attempt < $maxRetries) {
        $attempt++;
        $fileHandle = null;

        try {
            $fileHandle = fopen($localPath, 'wb');
            if (!$fileHandle) {
                throw new Exception("Cannot open file for writing: $localPath");
            }

            $response = $client->get($url, [
                RequestOptions::STREAM => true,
                RequestOptions::TIMEOUT => 300,
                RequestOptions::CONNECT_TIMEOUT => 30,
            ]);

            $stream = $response->getBody();
            $bytesWritten = 0;

            while (!$stream->eof()) {
                $chunk = $stream->read(8192);
                if (fwrite($fileHandle, $chunk) === false) {
                    throw new Exception("Failed to write to file");
                }
                $bytesWritten += strlen($chunk);
            }

            fclose($fileHandle);
            echo "Successfully downloaded $bytesWritten bytes\n";
            return true;
        } catch (RequestException | RuntimeException $e) {
            // RequestException covers HTTP-level failures; RuntimeException additionally
            // catches connection errors and read failures while streaming the body.
            if (is_resource($fileHandle)) {
                fclose($fileHandle);
            }

            echo "Attempt $attempt failed: " . $e->getMessage() . "\n";

            if ($attempt >= $maxRetries) {
                throw new Exception("Max retries exceeded: " . $e->getMessage());
            }

            // Wait before retrying (exponential backoff)
            sleep(pow(2, $attempt - 1));
        } catch (Exception $e) {
            if (is_resource($fileHandle)) {
                fclose($fileHandle);
            }
            throw $e;
        }
    }

    return false;
}
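If the server supports HTTP range requests (it advertises Accept-Ranges: bytes), a retry does not have to start from byte zero: open the local file in append mode and request only the missing suffix with a Range header. A rough sketch of that idea, kept separate from the retry loop above:

use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;

function resumeDownload($url, $localPath) {
    $client = new Client();

    // Resume from whatever is already on disk
    $existingBytes = file_exists($localPath) ? filesize($localPath) : 0;
    $fileHandle = fopen($localPath, 'ab');

    $response = $client->get($url, [
        RequestOptions::STREAM => true,
        RequestOptions::HEADERS => [
            'Range' => "bytes={$existingBytes}-", // ask only for the missing tail
        ],
    ]);

    // 206 Partial Content means the server honoured the Range header;
    // a 200 response resends the whole file, so discard the partial copy first
    if ($existingBytes > 0 && $response->getStatusCode() !== 206) {
        ftruncate($fileHandle, 0);
    }

    $stream = $response->getBody();
    while (!$stream->eof()) {
        fwrite($fileHandle, $stream->read(8192));
    }

    fclose($fileHandle);
}

A complete implementation would also handle 416 Range Not Satisfiable, which a server returns when the local copy is already complete.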
Performance Optimization Tips
- Adjust Chunk Size: Experiment with chunk sizes between 4 KB and 64 KB based on your use case
- Use Appropriate Timeouts: Set reasonable timeout and connect_timeout values for large downloads
- Implement Connection Pooling: Reuse a single Client instance across requests so the underlying handler can keep connections alive (a sketch follows the memory example below)
- Monitor Memory Usage: Use memory_get_usage() to track memory consumption
// Memory monitoring during streaming
function monitorMemoryUsage() {
$memoryMB = round(memory_get_usage(true) / 1024 / 1024, 2);
echo "Current memory usage: {$memoryMB} MB\n";
}
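On the connection reuse point above: the simplest approach is to create one Client (optionally with a base_uri) and pass it around instead of constructing a new client per request, so the handler can keep connections alive where the underlying transport allows it. A brief sketch with placeholder export paths:

use GuzzleHttp\Client;
use GuzzleHttp\RequestOptions;

// One shared client: connections can be reused between requests
// where the handler and transport support keep-alive
$client = new Client([
    'base_uri' => 'https://api.example.com',
    RequestOptions::TIMEOUT => 300,
]);

// Placeholder paths; stream each export with the same client instance
foreach (['/exports/part-1.csv', '/exports/part-2.csv', '/exports/part-3.csv'] as $path) {
    $response = $client->get($path, [RequestOptions::STREAM => true]);

    $stream = $response->getBody();
    while (!$stream->eof()) {
        processChunk($stream->read(8192)); // processChunk() as defined earlier
    }
}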
Comparison with Alternative Approaches
While Guzzle streaming is excellent for HTTP-based data processing on the server side, other tools fit specific scenarios:
- When the data is only reachable through a browser-rendered single-page application, a browser automation tool such as Puppeteer can extract large datasets that a plain HTTP client cannot see
- When a download can only be triggered from inside a browser session, Puppeteer's download handling complements Guzzle's server-side streaming
Conclusion
Guzzle's streaming capabilities provide a powerful solution for handling large data sets without exhausting server memory. By processing responses chunk by chunk, you can build scalable applications that handle massive datasets efficiently. Remember to implement proper error handling, progress tracking, and memory monitoring to ensure robust production deployments.
The key to successful streaming implementation is understanding your data format, choosing appropriate chunk sizes, and implementing resilient error handling. With these techniques, you can confidently process datasets of any size while maintaining optimal performance and resource utilization.