How do I implement distributed scraping across multiple devices with Alamofire?
Distributed scraping with Alamofire requires an architecture that coordinates work across iOS/macOS devices, manages shared state, and uses each device's resources efficiently. In return, you can scale your scraping operations horizontally instead of being limited to a single device.
Core Architecture Components
1. Central Coordinator Service
Create a central service to manage work distribution and coordination:
import Foundation
import Alamofire
import UIKit // UIDevice (identifierForVendor) is iOS-only; on macOS, pass an explicit deviceID instead
class DistributedScrapingCoordinator {
private let baseURL: String
private let deviceID: String
private let session: Session
init(baseURL: String, deviceID: String = UIDevice.current.identifierForVendor?.uuidString ?? UUID().uuidString) {
self.baseURL = baseURL
self.deviceID = deviceID
let configuration = URLSessionConfiguration.default
configuration.timeoutIntervalForRequest = 30
configuration.timeoutIntervalForResource = 300
self.session = Session(configuration: configuration)
}
func requestWork() async throws -> WorkBatch? {
let url = "\(baseURL)/api/work/request"
let parameters: [String: Any] = [
"device_id": deviceID,
"capabilities": getDeviceCapabilities()
]
return try await withCheckedThrowingContinuation { continuation in
    // The backend responds with snake_case keys, and with 204 when no work is available
    let decoder = JSONDecoder()
    decoder.keyDecodingStrategy = .convertFromSnakeCase
    session.request(url, method: .post, parameters: parameters, encoding: JSONEncoding.default)
        .validate()
        .responseDecodable(of: WorkBatch.self, decoder: decoder) { response in
            switch response.result {
            case .success(let workBatch):
                continuation.resume(returning: workBatch)
            case .failure(let error):
                // A 204 has no body, so decoding fails; treat it as "no work" rather than an error
                if response.response?.statusCode == 204 {
                    continuation.resume(returning: nil)
                } else {
                    continuation.resume(throwing: error)
                }
            }
        }
}
}
func submitResults(_ results: [ScrapingResult], for batchID: String) async throws {
let url = "\(baseURL)/api/work/submit"
let parameters: [String: Any] = [
"device_id": deviceID,
"batch_id": batchID,
"results": results.map { $0.toDictionary() }
]
return try await withCheckedThrowingContinuation { continuation in
session.request(url, method: .post, parameters: parameters, encoding: JSONEncoding.default)
.validate()
.response { response in
if let error = response.error {
continuation.resume(throwing: error)
} else {
continuation.resume()
}
}
}
}
private func getDeviceCapabilities() -> [String: Any] {
return [
"concurrent_requests": ProcessInfo.processInfo.processorCount,
"memory_available": getAvailableMemory(),
"network_type": getNetworkType()
]
}
}
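The capability helpers referenced above (getAvailableMemory() and getNetworkType()) are not Foundation or Alamofire APIs and are left undefined in the class. One possible sketch, using ProcessInfo and NWPathMonitor, is shown below; treat the specific metrics as placeholders to adapt to your own needs.

import Network

extension DistributedScrapingCoordinator {
    // Coarse stand-in: physical memory rather than true free memory (which needs mach APIs)
    func getAvailableMemory() -> UInt64 {
        ProcessInfo.processInfo.physicalMemory
    }

    // Snapshot of the current interface type. In production, keep one long-lived
    // NWPathMonitor started on a queue so the path is actually populated.
    func getNetworkType() -> String {
        let path = NWPathMonitor().currentPath
        if path.usesInterfaceType(.wifi) { return "wifi" }
        if path.usesInterfaceType(.cellular) { return "cellular" }
        if path.usesInterfaceType(.wiredEthernet) { return "ethernet" }
        return "unknown"
    }
}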
2. Work Distribution Models
Define data structures for work distribution:
struct WorkBatch: Codable {
let id: String
let urls: [String]
let priority: Int
let configuration: ScrapingConfiguration
let estimatedDuration: TimeInterval
}
struct ScrapingConfiguration: Codable {
let userAgent: String?
let headers: [String: String]
let timeout: TimeInterval
let retryCount: Int
let delayBetweenRequests: TimeInterval
}
struct ScrapingResult: Codable {
let url: String
let statusCode: Int
let data: Data?
let error: String?
let timestamp: Date
let processingTime: TimeInterval
func toDictionary() -> [String: Any] {
return [
"url": url,
"status_code": statusCode,
"data": data?.base64EncodedString() ?? "",
"error": error ?? "",
"timestamp": timestamp.timeIntervalSince1970,
"processing_time": processingTime
]
}
}
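For reference, a batch coming back from the coordinator is expected to carry snake_case keys (matching the Django API shown later), which the .convertFromSnakeCase decoder in requestWork() maps onto these models. A quick round-trip check with a hypothetical payload:

import Foundation

let sampleJSON = """
{
  "id": "batch-001",
  "urls": ["https://example.com/page/1", "https://example.com/page/2"],
  "priority": 1,
  "configuration": {
    "user_agent": "MyScraper/1.0",
    "headers": {"Accept": "text/html"},
    "timeout": 30,
    "retry_count": 3,
    "delay_between_requests": 1.0
  },
  "estimated_duration": 4
}
"""

let decoder = JSONDecoder()
decoder.keyDecodingStrategy = .convertFromSnakeCase
do {
    let batch = try decoder.decode(WorkBatch.self, from: Data(sampleJSON.utf8))
    print("Decoded \(batch.urls.count) URLs, retry count \(batch.configuration.retryCount)")
} catch {
    print("Decoding failed: \(error)")
}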
Distributed Scraping Manager
3. Main Scraping Engine
Implement the core scraping logic with proper coordination:
class DistributedScraper {
    private let coordinator: DistributedScrapingCoordinator
    private let maxConcurrentRequests: Int
    private var isRunning = false

    init(coordinator: DistributedScrapingCoordinator, maxConcurrentRequests: Int = 5) {
        self.coordinator = coordinator
        self.maxConcurrentRequests = maxConcurrentRequests
    }
func startScraping() async {
isRunning = true
while isRunning {
do {
guard let workBatch = try await coordinator.requestWork() else {
// No work available, wait before checking again
try await Task.sleep(nanoseconds: 5_000_000_000) // 5 seconds
continue
}
let results = await processWorkBatch(workBatch)
try await coordinator.submitResults(results, for: workBatch.id)
} catch {
print("Error in scraping cycle: \(error)")
try? await Task.sleep(nanoseconds: 10_000_000_000) // 10 seconds; try? because startScraping() doesn't throw
}
}
}
func stopScraping() {
    isRunning = false
}
private func processWorkBatch(_ batch: WorkBatch) async -> [ScrapingResult] {
    return await withTaskGroup(of: ScrapingResult.self) { group in
        var results: [ScrapingResult] = []
        var pendingURLs = batch.urls.makeIterator()
        // Seed the group with at most maxConcurrentRequests tasks so the limit is actually enforced
        for _ in 0..<maxConcurrentRequests {
            guard let url = pendingURLs.next() else { break }
            group.addTask {
                return await self.scrapeURL(url, configuration: batch.configuration)
            }
        }
        // Each time a task finishes, start the next URL until the batch is exhausted
        while let result = await group.next() {
            results.append(result)
            if let url = pendingURLs.next() {
                group.addTask {
                    return await self.scrapeURL(url, configuration: batch.configuration)
                }
            }
        }
        return results
    }
}
private func scrapeURL(_ urlString: String, configuration: ScrapingConfiguration) async -> ScrapingResult {
let startTime = Date()
guard let url = URL(string: urlString) else {
return ScrapingResult(
url: urlString,
statusCode: -1,
data: nil,
error: "Invalid URL",
timestamp: Date(),
processingTime: Date().timeIntervalSince(startTime)
)
}
var request = URLRequest(url: url)
request.timeoutInterval = configuration.timeout
// Apply configuration
if let userAgent = configuration.userAgent {
request.setValue(userAgent, forHTTPHeaderField: "User-Agent")
}
for (key, value) in configuration.headers {
request.setValue(value, forHTTPHeaderField: key)
}
do {
    // The scrape itself uses plain URLSession here; it could equally be routed through Alamofire
    let (data, response) = try await URLSession.shared.data(for: request)
    let httpResponse = response as? HTTPURLResponse
    return ScrapingResult(
        url: urlString,
        statusCode: httpResponse?.statusCode ?? -1, // -1 rather than assuming 200 when there is no HTTP response
data: data,
error: nil,
timestamp: Date(),
processingTime: Date().timeIntervalSince(startTime)
)
} catch {
return ScrapingResult(
url: urlString,
statusCode: -1,
data: nil,
error: error.localizedDescription,
timestamp: Date(),
processingTime: Date().timeIntervalSince(startTime)
)
}
}
}
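One field the scraper above never consumes is ScrapingConfiguration.delayBetweenRequests (retryCount is addressed separately in section 6). If you want a politeness delay before each request, a small helper like the following sketch, awaited at the top of each task-group closure in processWorkBatch, is one option; the name and placement are just illustrative.

extension DistributedScraper {
    // Hypothetical helper: wait the configured delay before issuing the next request in a batch
    func politenessDelay(for configuration: ScrapingConfiguration) async {
        guard configuration.delayBetweenRequests > 0 else { return }
        try? await Task.sleep(nanoseconds: UInt64(configuration.delayBetweenRequests * 1_000_000_000))
    }
}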
Server-Side Coordination
4. Backend API (Python/Django Example)
Create a backend service to coordinate work distribution:
# models.py
from django.db import models
import uuid
class WorkBatch(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
urls = models.JSONField()
priority = models.IntegerField(default=0)
configuration = models.JSONField()
status = models.CharField(max_length=20, choices=[
('pending', 'Pending'),
('assigned', 'Assigned'),
('completed', 'Completed'),
('failed', 'Failed')
], default='pending')
assigned_device = models.CharField(max_length=100, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
completed_at = models.DateTimeField(null=True, blank=True)
class ScrapingDevice(models.Model):
device_id = models.CharField(max_length=100, unique=True)
last_seen = models.DateTimeField(auto_now=True)
capabilities = models.JSONField()
current_workload = models.IntegerField(default=0)
# views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.utils import timezone
from .models import WorkBatch, ScrapingDevice
@api_view(['POST'])
def request_work(request):
device_id = request.data.get('device_id')
capabilities = request.data.get('capabilities', {})
# Update or create device record
device, created = ScrapingDevice.objects.get_or_create(
device_id=device_id,
defaults={'capabilities': capabilities}
)
if not created:
device.capabilities = capabilities
device.save()
# Find available work based on device capabilities
max_concurrent = capabilities.get('concurrent_requests', 1)
if device.current_workload >= max_concurrent:
return Response({'message': 'Device at capacity'}, status=429)
# Get highest priority pending work
work_batch = WorkBatch.objects.filter(
status='pending'
).order_by('-priority', 'created_at').first()
if not work_batch:
return Response(status=204)  # no work available; a 204 must not carry a body
# Assign work to device
work_batch.status = 'assigned'
work_batch.assigned_device = device_id
work_batch.save()
device.current_workload += 1
device.save()
return Response({
'id': str(work_batch.id),
'urls': work_batch.urls,
'priority': work_batch.priority,
'configuration': work_batch.configuration,
'estimated_duration': len(work_batch.urls) * 2  # rough estimate: ~2 seconds per URL
})
@api_view(['POST'])
def submit_results(request):
device_id = request.data.get('device_id')
batch_id = request.data.get('batch_id')
results = request.data.get('results', [])
try:
work_batch = WorkBatch.objects.get(
id=batch_id,
assigned_device=device_id
)
# Process and store results
success_count = sum(1 for r in results if r.get('status_code') == 200)
work_batch.status = 'completed'
work_batch.completed_at = timezone.now()
work_batch.save()
# Update device workload
device = ScrapingDevice.objects.get(device_id=device_id)
device.current_workload = max(0, device.current_workload - 1)
device.save()
return Response({
'message': f'Results submitted successfully. {success_count}/{len(results)} successful.'
})
except WorkBatch.DoesNotExist:
return Response({'error': 'Work batch not found'}, status=404)
Advanced Features
5. Load Balancing and Failover
Implement smart work distribution:
class LoadBalancer {
    private let coordinator: DistributedScrapingCoordinator
    private var deviceMetrics: [String: DeviceMetrics] = [:]

    init(coordinator: DistributedScrapingCoordinator) {
        self.coordinator = coordinator
    }
func distributeWork(urls: [String]) async -> [WorkBatch] {
let availableDevices = await getAvailableDevices()
let optimalBatchSize = calculateOptimalBatchSize(for: availableDevices)
var batches: [WorkBatch] = []
var urlIndex = 0
for device in availableDevices {
let batchSize = min(optimalBatchSize, urls.count - urlIndex)
if batchSize <= 0 { break }
let batchURLs = Array(urls[urlIndex..<(urlIndex + batchSize)])
let batch = WorkBatch(
id: UUID().uuidString,
urls: batchURLs,
priority: calculatePriority(for: device),
configuration: getConfigurationFor(device: device),
estimatedDuration: estimateProcessingTime(for: batchURLs, device: device)
)
batches.append(batch)
urlIndex += batchSize
}
return batches
}
private func calculateOptimalBatchSize(for devices: [DeviceInfo]) -> Int {
    guard !devices.isEmpty else { return 1 }
    let totalCapacity = devices.reduce(0) { $0 + $1.concurrentRequests }
    return max(1, totalCapacity / devices.count)
}
}
struct DeviceInfo {
let deviceId: String
let concurrentRequests: Int
let averageResponseTime: TimeInterval
let successRate: Double
let currentLoad: Int
}
struct DeviceMetrics {
var totalRequests: Int = 0
var successfulRequests: Int = 0
var averageResponseTime: TimeInterval = 0
var lastSeen: Date = Date()
}
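The LoadBalancer above also calls several helpers that are not defined in this article: getAvailableDevices(), calculatePriority(for:), getConfigurationFor(device:), and estimateProcessingTime(for:device:). The sketch below shows plausible shapes for them; the device-listing call is a placeholder you would back with your coordinator's device registry, and the heuristics are illustrative only.

extension LoadBalancer {
    // Placeholder: query the coordinator's device registry here
    func getAvailableDevices() async -> [DeviceInfo] {
        return []
    }

    // Simple heuristic: less-loaded devices get higher priority
    func calculatePriority(for device: DeviceInfo) -> Int {
        return max(0, 10 - device.currentLoad)
    }

    // One shared configuration; per-device tuning (user agents, delays) can go here
    func getConfigurationFor(device: DeviceInfo) -> ScrapingConfiguration {
        return ScrapingConfiguration(userAgent: nil, headers: [:], timeout: 30, retryCount: 3, delayBetweenRequests: 1.0)
    }

    // Crude estimate: average response time per URL, divided by the device's concurrency
    func estimateProcessingTime(for urls: [String], device: DeviceInfo) -> TimeInterval {
        return device.averageResponseTime * Double(urls.count) / Double(max(1, device.concurrentRequests))
    }
}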
6. Error Handling and Retry Logic
Implement robust error handling across the distributed system:
class ResilientScraper {
private let maxRetries = 3
private let backoffMultiplier: TimeInterval = 2.0
// Assumes a scrapeURL(_:configuration:) helper like the one in DistributedScraper is available here
func scrapeWithRetry(url: String, configuration: ScrapingConfiguration, attempt: Int = 1) async -> ScrapingResult {
    let result = await scrapeURL(url, configuration: configuration)
if result.statusCode == 200 || attempt >= maxRetries {
return result
}
// Implement exponential backoff
let delay = configuration.delayBetweenRequests * pow(backoffMultiplier, Double(attempt - 1))
try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
return await scrapeWithRetry(url: url, configuration: configuration, attempt: attempt + 1)
}
func handleDeviceFailure(_ deviceId: String) async {
    // Reassign work from the failed device; redistributeFailedWork(from:) is a hook you would
    // implement against the coordinator's API
    await redistributeFailedWork(from: deviceId)
}
}
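Alamofire 5 also ships a built-in RetryPolicy (a RequestInterceptor) that applies exponential backoff at the session level for retryable errors and idempotent requests. If your retry needs are simple, attaching it to the Session may be enough; the sketch below assumes the default retryable status codes (408 and common 5xx responses) suit your targets.

import Alamofire

// Up to 3 retries with exponential backoff, applied to every request made through this session
let retryPolicy = RetryPolicy(retryLimit: 3)
let resilientSession = Session(interceptor: retryPolicy)

// Example: requests made through resilientSession are retried automatically on retryable failures
// resilientSession.request("https://example.com").responseData { response in ... }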
Monitoring and Analytics
7. Performance Monitoring
Track performance across your distributed system:
class DistributedScrapingMonitor {
private var metrics: [String: Any] = [:]
func trackMetrics(for batch: WorkBatch, results: [ScrapingResult]) {
    guard !results.isEmpty else { return }
    let successRate = Double(results.filter { $0.statusCode == 200 }.count) / Double(results.count)
    let averageResponseTime = results.map { $0.processingTime }.reduce(0, +) / Double(results.count)
metrics["success_rate"] = successRate
metrics["average_response_time"] = averageResponseTime
metrics["total_requests"] = results.count
metrics["timestamp"] = Date().timeIntervalSince1970
sendMetricsToCoordinator()
}
private func sendMetricsToCoordinator() {
// Send metrics to central monitoring system
}
}
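sendMetricsToCoordinator() is left empty above. A minimal sketch of the reporting call is shown below; the /api/metrics endpoint and payload shape are assumptions, not part of the coordinator API defined earlier.

import Alamofire

func reportMetrics(_ metrics: [String: Any], to baseURL: String, deviceID: String) {
    var payload = metrics
    payload["device_id"] = deviceID
    AF.request(
        "\(baseURL)/api/metrics",
        method: .post,
        parameters: payload,
        encoding: JSONEncoding.default
    )
    .validate()
    .response { response in
        if let error = response.error {
            print("Failed to report metrics: \(error)")
        }
    }
}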
Usage Example
8. Putting It All Together
Here's how to implement the complete distributed scraping system:
// Initialize the coordinator
let coordinator = DistributedScrapingCoordinator(
baseURL: "https://your-scraping-coordinator.com"
)
// Create and configure the scraper
let scraper = DistributedScraper(
coordinator: coordinator,
maxConcurrentRequests: 8
)
// Start the distributed scraping process
Task {
await scraper.startScraping()
}
// In a separate coordinator service, distribute work
let urls = [
"https://example1.com",
"https://example2.com",
// ... more URLs
]
let loadBalancer = LoadBalancer(coordinator: coordinator)

Task {
    let workBatches = await loadBalancer.distributeWork(urls: urls)
    // Submit work batches to the coordination service
    // (submitWorkBatch is whatever enqueue call your coordinator API exposes)
    for batch in workBatches {
        await submitWorkBatch(batch)
    }
}
9. Command Line Tools
Create command line utilities for managing the distributed system:
# Start a scraping worker
./scraper-worker --coordinator-url https://coordinator.com --max-concurrent 5
# Monitor system status
./scraper-status --show-devices --show-metrics
# Submit URLs for scraping
./scraper-submit --urls-file urls.txt --priority 1
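If you build these tools in Swift, the worker entry point can be a thin wrapper around DistributedScraper. The sketch below uses the swift-argument-parser package and passes an explicit device ID rather than relying on the UIDevice-based default; the flag names simply mirror the commands above.

import ArgumentParser
import Foundation

@main
struct ScraperWorker: AsyncParsableCommand {
    // Maps to --coordinator-url on the command line
    @Option(name: .customLong("coordinator-url"))
    var coordinatorURL: String

    // Maps to --max-concurrent
    @Option(name: .customLong("max-concurrent"))
    var maxConcurrent: Int = 5

    mutating func run() async throws {
        let coordinator = DistributedScrapingCoordinator(
            baseURL: coordinatorURL,
            deviceID: ProcessInfo.processInfo.hostName
        )
        let scraper = DistributedScraper(
            coordinator: coordinator,
            maxConcurrentRequests: maxConcurrent
        )
        await scraper.startScraping()
    }
}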
When building sophisticated scraping systems, you might also need to consider how to run multiple pages in parallel with Puppeteer for JavaScript-heavy sites, or learn techniques for handling browser sessions in Puppeteer when dealing with complex authentication flows across your distributed system.
Best Practices and Deployment
Performance Optimization
- Batch Size Tuning: Adjust batch sizes based on network conditions and device capabilities
- Connection Pooling: Use Alamofire's session management for efficient connection reuse (see the session-tuning sketch after this list)
- Memory Management: Implement proper cleanup to prevent memory leaks
- Network Optimization: Use compression and optimize request headers
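Connection pooling mostly comes down to sharing one Session and tuning the URLSessionConfiguration behind it, as referenced in the list above. A sketch follows; the specific numbers are starting points, not recommendations.

import Alamofire

let tunedConfiguration = URLSessionConfiguration.default
tunedConfiguration.timeoutIntervalForRequest = 30
tunedConfiguration.httpMaximumConnectionsPerHost = 4  // cap per-host parallelism
tunedConfiguration.waitsForConnectivity = true        // queue requests until the network is available

// One shared Session reuses pooled connections across all requests to the same host
let tunedSession = Session(configuration: tunedConfiguration)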
Security Considerations
- Authentication: Implement secure device authentication with the coordinator (see the interceptor sketch after this list)
- HTTPS: Always use encrypted connections for coordinator communication
- Rate Limiting: Respect target website rate limits across all devices
- IP Rotation: Consider using proxy servers for better distribution
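For device authentication, a common Alamofire pattern is a RequestAdapter that attaches credentials to every coordinator request. The sketch below assumes a bearer token and an X-Device-ID header; how the token is issued, stored, and rotated is up to your backend.

import Alamofire
import Foundation

// Adds a bearer token and a device identifier to every outgoing request
struct DeviceAuthAdapter: RequestInterceptor {
    let deviceID: String
    let token: String

    func adapt(_ urlRequest: URLRequest,
               for session: Session,
               completion: @escaping (Result<URLRequest, Error>) -> Void) {
        var request = urlRequest
        request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
        request.setValue(deviceID, forHTTPHeaderField: "X-Device-ID")
        completion(.success(request))
    }
}

// Attach the adapter to the session the coordinator uses
let authedSession = Session(interceptor: DeviceAuthAdapter(deviceID: "device-123", token: "token-from-your-backend"))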
Monitoring and Maintenance
- Health Checks: Implement regular device health monitoring (see the heartbeat sketch after this list)
- Failover: Design automatic failover for device failures
- Scaling: Support dynamic addition/removal of scraping devices
- Logging: Centralized logging for debugging and analysis
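A health check can be as simple as a periodic heartbeat from each worker. The sketch below assumes a hypothetical /api/heartbeat endpoint that the coordinator backend would need to expose alongside the endpoints shown earlier.

import Alamofire
import Foundation

// Posts a heartbeat every `interval` seconds until the returned task is cancelled
func startHeartbeat(baseURL: String, deviceID: String, interval: TimeInterval = 30) -> Task<Void, Never> {
    Task {
        while !Task.isCancelled {
            AF.request(
                "\(baseURL)/api/heartbeat",
                method: .post,
                parameters: ["device_id": deviceID],
                encoding: JSONEncoding.default
            ).response { _ in } // fire-and-forget; attaching a handler starts the request
            try? await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000))
        }
    }
}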
This distributed scraping architecture with Alamofire provides a scalable foundation for large-scale web scraping operations across multiple iOS/macOS devices, with proper coordination, error handling, and monitoring capabilities.