How do you handle API pagination when scraping large datasets?
API pagination is a crucial aspect of web scraping that allows you to efficiently retrieve large datasets by breaking them into smaller, manageable chunks. Understanding how to properly handle pagination is essential for building robust scrapers that can handle enterprise-scale data collection.
Understanding API Pagination Types
Most APIs implement pagination to limit the amount of data returned in a single request, improving performance and reducing server load. There are several common pagination patterns:
1. Offset-Based Pagination
This is the most common pagination method, using `offset` and `limit` parameters:
```python
import requests
import time

def scrape_with_offset_pagination(base_url, headers=None):
    all_data = []
    offset = 0
    limit = 100

    while True:
        params = {
            'offset': offset,
            'limit': limit
        }

        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()
        items = data.get('items', [])

        if not items:
            break

        all_data.extend(items)
        offset += limit

        # Rate limiting
        time.sleep(0.5)

        print(f"Retrieved {len(items)} items, total: {len(all_data)}")

    return all_data
```
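As a quick usage sketch (the endpoint and token below are placeholders, not a real API), you would call the function like this and pass any authentication through the `headers` argument:

```python
# Hypothetical endpoint and token, shown only to illustrate the call signature
API_URL = "https://api.example.com/v1/products"
auth_headers = {"Authorization": "Bearer YOUR_API_TOKEN"}

products = scrape_with_offset_pagination(API_URL, headers=auth_headers)
print(f"Collected {len(products)} products")
```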
2. Page-Based Pagination
Uses page numbers instead of offsets:
```javascript
async function scrapeWithPagePagination(baseUrl, headers = {}) {
  const allData = [];
  let page = 1;
  const pageSize = 50;

  while (true) {
    const params = new URLSearchParams({
      page: page,
      page_size: pageSize
    });

    try {
      const response = await fetch(`${baseUrl}?${params}`, {
        headers: headers
      });

      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }

      const data = await response.json();
      const items = data.results || data.items || [];

      if (items.length === 0) {
        break;
      }

      allData.push(...items);
      page++;

      // Rate limiting
      await new Promise(resolve => setTimeout(resolve, 500));

      console.log(`Page ${page - 1}: Retrieved ${items.length} items, total: ${allData.length}`);
    } catch (error) {
      console.error(`Error fetching page ${page}:`, error);
      break;
    }
  }

  return allData;
}
```
3. Cursor-Based Pagination
Cursor-based pagination is more efficient for large datasets: instead of a numeric offset, the API returns an opaque cursor that marks your position in the result set:
```python
import requests
import time

def scrape_with_cursor_pagination(base_url, headers=None):
    all_data = []
    cursor = None

    while True:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor

        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()
        items = data.get('data', [])

        if not items:
            break

        all_data.extend(items)

        # Get next cursor
        pagination_info = data.get('pagination', {})
        cursor = pagination_info.get('next_cursor')

        if not cursor:
            break

        time.sleep(0.5)
        print(f"Retrieved {len(items)} items, total: {len(all_data)}")

    return all_data
```
Advanced Pagination Handling Techniques
Handling Rate Limits and Retries
When scraping large datasets, you'll often encounter rate limits. Implement exponential backoff for robust error handling:
```python
import time
import random

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class PaginationScraper:
    def __init__(self, base_url, headers=None, max_retries=3):
        self.base_url = base_url
        self.headers = headers or {}
        self.session = requests.Session()

        # Configure retry strategy
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def scrape_with_exponential_backoff(self):
        all_data = []
        page = 1
        consecutive_errors = 0

        while True:
            try:
                params = {'page': page, 'per_page': 100}
                response = self.session.get(
                    self.base_url,
                    params=params,
                    headers=self.headers,
                    timeout=30
                )

                # Rate limited (the session-level Retry above already retries 429s,
                # so this branch is a fallback for when those retries are exhausted)
                if response.status_code == 429:
                    wait_time = 2 ** consecutive_errors + random.uniform(0, 1)
                    print(f"Rate limited. Waiting {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                    consecutive_errors += 1
                    continue

                response.raise_for_status()
                data = response.json()
                items = data.get('items', [])

                if not items:
                    break

                all_data.extend(items)
                page += 1
                consecutive_errors = 0  # Reset on success

                # Adaptive rate limiting
                time.sleep(0.1 + random.uniform(0, 0.2))

            except Exception as e:
                consecutive_errors += 1
                if consecutive_errors >= 5:
                    print(f"Too many consecutive errors: {e}")
                    break
                wait_time = 2 ** consecutive_errors
                print(f"Error occurred: {e}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

        return all_data
```
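Usage is a matter of constructing the scraper and calling the method; the endpoint and token below are placeholders:

```python
scraper = PaginationScraper(
    "https://api.example.com/v1/orders",          # hypothetical endpoint
    headers={"Authorization": "Bearer YOUR_API_TOKEN"},
    max_retries=5
)
orders = scraper.scrape_with_exponential_backoff()
print(f"Collected {len(orders)} orders")
```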
Parallel Pagination Processing
For page-based APIs where the total number of pages is known (or can be discovered) in advance, you can fetch multiple pages concurrently:
```python
import asyncio

import aiohttp

async def fetch_page(session, url, page, semaphore):
    async with semaphore:  # Limit concurrent requests
        params = {'page': page, 'per_page': 100}
        try:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    return page, data.get('items', [])
                else:
                    print(f"Error fetching page {page}: {response.status}")
                    return page, []
        except Exception as e:
            print(f"Exception fetching page {page}: {e}")
            return page, []

async def scrape_parallel_pagination(base_url, total_pages, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    all_data = []

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_page(session, base_url, page, semaphore)
            for page in range(1, total_pages + 1)
        ]
        results = await asyncio.gather(*tasks)

    # Sort by page number and combine data
    results.sort(key=lambda x: x[0])
    for page, items in results:
        all_data.extend(items)
        print(f"Page {page}: {len(items)} items")

    return all_data
```
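The helper above needs `total_pages` up front. One common way to get it is to issue a single request first and read the pagination metadata, then fan out. The sketch below assumes the API exposes a `total_pages` field and the same endpoint shape as before; both are assumptions you would adapt to the real API:

```python
import asyncio

import requests

def get_total_pages(base_url):
    # Assumes the API reports a 'total_pages' field in its pagination metadata;
    # adjust this to whatever your target API actually returns.
    response = requests.get(base_url, params={'page': 1, 'per_page': 100}, timeout=30)
    response.raise_for_status()
    return response.json().get('total_pages', 1)

if __name__ == "__main__":
    base_url = "https://api.example.com/v1/items"  # hypothetical endpoint
    total_pages = get_total_pages(base_url)
    data = asyncio.run(scrape_parallel_pagination(base_url, total_pages, max_concurrent=5))
    print(f"Collected {len(data)} items across {total_pages} pages")
```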
Handling Dynamic Pagination
Some modern web applications use JavaScript-based pagination that requires browser automation. In such cases, you can use Puppeteer to drive the page and intercept the underlying API calls:
```javascript
const puppeteer = require('puppeteer');

async function scrapeDynamicPagination(url) {
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  const allData = [];

  // Intercept network requests
  await page.setRequestInterception(true);
  page.on('request', (request) => {
    request.continue();
  });

  page.on('response', async (response) => {
    if (response.url().includes('/api/data') && response.status() === 200) {
      try {
        const data = await response.json();
        if (data.items) {
          allData.push(...data.items);
        }
      } catch (error) {
        console.error('Error parsing JSON:', error);
      }
    }
  });

  await page.goto(url);

  // Keep clicking "Load More" or "Next" until no more data
  while (true) {
    try {
      await page.waitForSelector('.next-button', { timeout: 5000 });
      await page.click('.next-button');
      // Wait for data to load (page.waitForTimeout was removed in recent Puppeteer releases,
      // so use a plain delay)
      await new Promise(resolve => setTimeout(resolve, 2000));
    } catch (error) {
      console.log('No more pages available');
      break;
    }
  }

  await browser.close();
  return allData;
}
```
Best Practices for Large Dataset Pagination
1. Implement Robust Error Handling
Always include comprehensive error handling for network issues, rate limiting, and malformed responses:
```python
import json
import time

import requests

def robust_pagination_scraper(base_url, headers=None):
    all_data = []
    page = 1
    max_consecutive_failures = 3
    consecutive_failures = 0

    while consecutive_failures < max_consecutive_failures:
        try:
            params = {'page': page, 'limit': 100}
            response = requests.get(base_url, params=params, headers=headers, timeout=30)

            # Handle different HTTP status codes
            if response.status_code == 404:
                print("Reached end of data (404)")
                break
            elif response.status_code == 429:
                print("Rate limited, waiting...")
                time.sleep(60)  # Wait 1 minute for rate limit reset
                continue
            elif response.status_code >= 500:
                print(f"Server error {response.status_code}, retrying...")
                consecutive_failures += 1
                time.sleep(5)
                continue

            response.raise_for_status()
            data = response.json()

            # Validate response structure
            if not isinstance(data, dict) or 'items' not in data:
                print(f"Unexpected response format on page {page}")
                consecutive_failures += 1
                continue

            items = data['items']
            if not items:
                print("No more items available")
                break

            all_data.extend(items)
            page += 1
            consecutive_failures = 0  # Reset on success

            print(f"Successfully scraped page {page - 1}: {len(items)} items")
            time.sleep(1)  # Rate limiting

        except requests.exceptions.RequestException as e:
            print(f"Request error on page {page}: {e}")
            consecutive_failures += 1
            time.sleep(5)
        except json.JSONDecodeError as e:
            print(f"JSON decode error on page {page}: {e}")
            consecutive_failures += 1
            time.sleep(5)

    return all_data
```
2. Data Persistence and Resume Capability
For very large datasets, implement checkpointing to resume interrupted scraping sessions:
```python
import os
import pickle
import time

import requests

class PersistentPaginationScraper:
    def __init__(self, base_url, checkpoint_file='scraping_checkpoint.pkl'):
        self.base_url = base_url
        self.checkpoint_file = checkpoint_file
        self.load_checkpoint()

    def load_checkpoint(self):
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'rb') as f:
                checkpoint = pickle.load(f)
            self.current_page = checkpoint.get('page', 1)
            self.scraped_data = checkpoint.get('data', [])
            print(f"Resumed from page {self.current_page} with {len(self.scraped_data)} items")
        else:
            self.current_page = 1
            self.scraped_data = []

    def save_checkpoint(self):
        checkpoint = {
            'page': self.current_page,
            'data': self.scraped_data
        }
        with open(self.checkpoint_file, 'wb') as f:
            pickle.dump(checkpoint, f)

    def scrape(self):
        try:
            while True:
                params = {'page': self.current_page, 'per_page': 100}
                response = requests.get(self.base_url, params=params)
                response.raise_for_status()

                data = response.json()
                items = data.get('items', [])

                if not items:
                    break

                self.scraped_data.extend(items)
                self.current_page += 1

                # Save checkpoint every 10 pages
                if self.current_page % 10 == 0:
                    self.save_checkpoint()
                    print(f"Checkpoint saved at page {self.current_page}")

                time.sleep(0.5)
        except KeyboardInterrupt:
            print("Scraping interrupted. Saving checkpoint...")
            self.save_checkpoint()
            raise

        # Final save
        self.save_checkpoint()
        return self.scraped_data
```
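A short usage sketch: re-running the same script after an interruption picks up from the last saved checkpoint, because the constructor reloads it (the endpoint is a placeholder):

```python
scraper = PersistentPaginationScraper(
    "https://api.example.com/v1/articles",        # hypothetical endpoint
    checkpoint_file="articles_checkpoint.pkl"
)
try:
    articles = scraper.scrape()
    print(f"Finished with {len(articles)} articles")
except KeyboardInterrupt:
    print("Stopped early; run the script again to resume from the checkpoint")
```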
3. Memory Management for Large Datasets
When dealing with massive datasets, consider streaming data to files instead of keeping everything in memory:
```python
import csv
import time

import requests

def stream_paginated_data_to_csv(base_url, output_file, headers=None):
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = None
        page = 1
        total_items = 0

        while True:
            params = {'page': page, 'limit': 1000}
            response = requests.get(base_url, params=params, headers=headers)
            response.raise_for_status()

            data = response.json()
            items = data.get('items', [])

            if not items:
                break

            # Initialize CSV writer with headers from the first item
            # (assumes every item shares the same fields as the first one)
            if writer is None:
                fieldnames = items[0].keys()
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()

            # Write items to CSV
            for item in items:
                writer.writerow(item)

            total_items += len(items)
            page += 1

            print(f"Processed page {page - 1}: {len(items)} items (total: {total_items})")
            time.sleep(0.5)

    print(f"Completed! Total items saved: {total_items}")
```
Conclusion
Handling API pagination effectively is crucial for successful large-scale web scraping. The key principles include implementing robust error handling, respecting rate limits, using appropriate pagination patterns, and considering memory management for massive datasets.
When dealing with complex pagination scenarios, you might also need to monitor network requests or handle dynamic content loading. Always test your pagination logic thoroughly and implement proper logging to track your scraping progress.
Remember to always check the API documentation for specific pagination parameters and respect the service's terms of use and rate limiting policies. Proper pagination handling will make your scrapers more reliable, efficient, and maintainable for production use.