How do you handle API pagination when scraping large datasets?

Pagination is how an API splits a large result set into smaller chunks, each returned by a separate request. A scraper that handles pagination correctly can collect a complete dataset without skipping or duplicating records, and without overloading the server it is reading from.

Understanding API Pagination Types

Most APIs implement pagination to limit the amount of data returned in a single request, improving performance and reducing server load. There are several common pagination patterns:

1. Offset-Based Pagination

This is one of the most common pagination methods. The client sends an offset (how many records to skip) and a limit (how many records to return):

import requests
import time

def scrape_with_offset_pagination(base_url, headers=None):
    all_data = []
    offset = 0
    limit = 100

    while True:
        params = {
            'offset': offset,
            'limit': limit
        }

        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()
        items = data.get('items', [])

        if not items:
            break

        all_data.extend(items)
        offset += limit

        # Rate limiting
        time.sleep(0.5)

        print(f"Retrieved {len(items)} items, total: {len(all_data)}")

    return all_data
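
The loop above only stops after it receives an empty page, which costs one extra request. A minimal sketch of an alternative is shown here, assuming the API never returns a short page except at the end of the dataset (worth confirming in the API's documentation). `fetch_page` is a hypothetical callable standing in for the `requests.get` call, so the paging logic can be exercised without a network:

```python
from typing import Callable, Iterator, List


def iter_offset_pages(
    fetch_page: Callable[[int, int], List[dict]],
    limit: int = 100,
) -> Iterator[dict]:
    """Yield items page by page, stopping at the first short or empty page.

    fetch_page(offset, limit) is a hypothetical callable that returns the
    list of items for one page; in a real scraper it would wrap requests.get.
    """
    offset = 0
    while True:
        items = fetch_page(offset, limit)
        yield from items
        # Assumption: a page shorter than `limit` means the data is exhausted.
        if len(items) < limit:
            break
        offset += limit


# Usage with an in-memory stand-in for the API (250 fake records).
records = [{"id": i} for i in range(250)]
calls = []


def fake_fetch(offset, limit):
    calls.append(offset)
    return records[offset:offset + limit]


collected = list(iter_offset_pages(fake_fetch, limit=100))
```

Because the function is a generator, the caller can process or write out each item as it arrives instead of holding the whole dataset in memory.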

2. Page-Based Pagination

The client requests a page number and a page size instead of an offset:

async function scrapeWithPagePagination(baseUrl, headers = {}) {
    const allData = [];
    let page = 1;
    const pageSize = 50;

    while (true) {
        const params = new URLSearchParams({
            page: page,
            page_size: pageSize
        });

        try {
            const response = await fetch(`${baseUrl}?${params}`, {
                headers: headers
            });

            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }

            const data = await response.json();
            const items = data.results || data.items || [];

            if (items.length === 0) {
                break;
            }

            allData.push(...items);
            page++;

            // Rate limiting
            await new Promise(resolve => setTimeout(resolve, 500));

            console.log(`Page ${page - 1}: Retrieved ${items.length} items, total: ${allData.length}`);

        } catch (error) {
            console.error(`Error fetching page ${page}:`, error);
            break;
        }
    }

    return allData;
}

3. Cursor-Based Pagination

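Cursor-based pagination is usually more efficient for large datasets. Each response includes a cursor marking where the next page starts, and the client sends that cursor back with the next request: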

import requests
import time

def scrape_with_cursor_pagination(base_url, headers=None):
    all_data = []
    cursor = None

    while True:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor

        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()
        items = data.get('data', [])

        if not items:
            break

        all_data.extend(items)

        # Get next cursor
        pagination_info = data.get('pagination', {})
        cursor = pagination_info.get('next_cursor')

        if not cursor:
            break

        time.sleep(0.5)
        print(f"Retrieved {len(items)} items, total: {len(all_data)}")

    return all_data
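
A cursor loop can spin forever if the API keeps handing back a cursor it has already issued. The sketch below adds a guard for that case. `fetch_page` is a hypothetical callable that takes a cursor (or `None` for the first page) and returns a tuple of items and the next cursor; it is not part of any library:

```python
from typing import Callable, Iterator, List, Optional, Tuple

# One page of results: (items, next_cursor or None when there are no more pages)
Page = Tuple[List[dict], Optional[str]]


def iter_cursor_pages(
    fetch_page: Callable[[Optional[str]], Page],
) -> Iterator[dict]:
    """Yield items from a cursor-paginated source, aborting on a repeated cursor."""
    seen = set()
    cursor = None
    while True:
        items, next_cursor = fetch_page(cursor)
        yield from items
        if not next_cursor:
            break
        if next_cursor in seen:
            raise RuntimeError(
                f"Cursor {next_cursor!r} repeated; aborting to avoid an infinite loop"
            )
        seen.add(next_cursor)
        cursor = next_cursor


# Usage with an in-memory stand-in for the API.
pages = {
    None: ([{"id": 1}, {"id": 2}], "a"),
    "a": ([{"id": 3}], "b"),
    "b": ([{"id": 4}], None),
}
ids = [item["id"] for item in iter_cursor_pages(lambda cursor: pages[cursor])]
```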

Advanced Pagination Handling Techniques

Handling Rate Limits and Retries

When scraping large datasets, you'll often encounter rate limits. Implement exponential backoff for robust error handling:

import time
import random
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class PaginationScraper:
    def __init__(self, base_url, headers=None, max_retries=3):
        self.base_url = base_url
        self.headers = headers or {}
        self.session = requests.Session()

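        # Configure retry strategy for transient server errors.
        # 429 is deliberately left out: it is handled explicitly in
        # scrape_with_exponential_backoff(), and listing it here would make
        # the adapter retry (and eventually raise) before that code runs.
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
        )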

        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def scrape_with_exponential_backoff(self):
        all_data = []
        page = 1
        consecutive_errors = 0

        while True:
            try:
                params = {'page': page, 'per_page': 100}
                response = self.session.get(
                    self.base_url, 
                    params=params, 
                    headers=self.headers,
                    timeout=30
                )

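                if response.status_code == 429:  # Rate limited
                    consecutive_errors += 1
                    # Give up after repeated rate limiting instead of looping forever
                    if consecutive_errors >= 5:
                        print("Rate limited too many times in a row, giving up")
                        break
                    wait_time = 2 ** consecutive_errors + random.uniform(0, 1)
                    print(f"Rate limited. Waiting {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                    continue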

                response.raise_for_status()
                data = response.json()
                items = data.get('items', [])

                if not items:
                    break

                all_data.extend(items)
                page += 1
                consecutive_errors = 0  # Reset on success

                # Short randomized delay (jitter) between requests
                time.sleep(0.1 + random.uniform(0, 0.2))

            except Exception as e:
                consecutive_errors += 1
                if consecutive_errors >= 5:
                    print(f"Too many consecutive errors: {e}")
                    break

                wait_time = 2 ** consecutive_errors
                print(f"Error occurred: {e}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

        return all_data
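
Some APIs send a `Retry-After` header with a 429 response, giving either a number of seconds or an HTTP date. When it is present, it is generally a better guide than a computed backoff. The helper below is a sketch of how the header could be parsed; the function name and the `default` fallback are illustrative choices, not part of `requests`:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(
    header_value: Optional[str],
    default: float = 1.0,
    now: Optional[datetime] = None,
) -> float:
    """Convert a Retry-After header value into a wait time in seconds.

    Falls back to `default` when the header is missing or unparseable.
    `now` can be injected to make the date branch deterministic in tests.
    """
    if not header_value:
        return default
    value = header_value.strip()
    # Form 1: a non-negative integer number of seconds
    if value.isascii() and value.isdigit():
        return float(value)
    # Form 2: an HTTP date such as "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - current).total_seconds())
```

In the 429 branch above, the wait time could then be computed as `retry_after_seconds(response.headers.get("Retry-After"), default=2 ** consecutive_errors)`, so the exponential backoff is only used when the server gives no hint.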

Parallel Pagination Processing

When an API uses page numbers and you know the total number of pages in advance, you can fetch multiple pages concurrently:

import asyncio
import aiohttp

async def fetch_page(session, url, page, semaphore):
    async with semaphore:  # Limit concurrent requests
        params = {'page': page, 'per_page': 100}
        try:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    return page, data.get('items', [])
                else:
                    print(f"Error fetching page {page}: {response.status}")
                    return page, []
        except Exception as e:
            print(f"Exception fetching page {page}: {e}")
            return page, []

async def scrape_parallel_pagination(base_url, total_pages, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    all_data = []

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_page(session, base_url, page, semaphore) 
            for page in range(1, total_pages + 1)
        ]

        results = await asyncio.gather(*tasks)

        # Sort by page number and combine data
        results.sort(key=lambda x: x[0])
        for page, items in results:
            all_data.extend(items)
            print(f"Page {page}: {len(items)} items")

    return all_data
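
`scrape_parallel_pagination` needs `total_pages` before it starts. Many APIs report a total item count in the first response, from which the page count can be derived. The sketch below assumes a hypothetical `total` field; the actual field name (or response header) depends on the API:

```python
def pages_needed(total_items: int, per_page: int) -> int:
    """Return how many pages are required to cover total_items."""
    if per_page <= 0:
        raise ValueError("per_page must be positive")
    if total_items < 0:
        raise ValueError("total_items must not be negative")
    # Integer ceiling division, avoiding floating-point rounding
    return (total_items + per_page - 1) // per_page


# Hypothetical first-page response; "total" is an assumed field name.
first_page = {"total": 1050, "items": []}
total_pages = pages_needed(first_page["total"], per_page=100)
```

Note that `fetch_page` above returns an empty list for a failed page, so a failure is indistinguishable from a page with no data. For a complete dataset, it is worth recording which pages failed and retrying them afterwards.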

Handling Dynamic Pagination

Some modern web applications use JavaScript-based pagination that requires browser automation. In such cases, you can handle AJAX requests using Puppeteer to intercept API calls:

const puppeteer = require('puppeteer');

async function scrapeDynamicPagination(url) {
    const browser = await puppeteer.launch();
    const page = await browser.newPage();

    const allData = [];

    // Observe API responses as the page loads them.
    // Listening to 'response' events does not require request interception.

    page.on('response', async (response) => {
        if (response.url().includes('/api/data') && response.status() === 200) {
            try {
                const data = await response.json();
                if (data.items) {
                    allData.push(...data.items);
                }
            } catch (error) {
                console.error('Error parsing JSON:', error);
            }
        }
    });

    await page.goto(url);

    // Keep clicking "Load More" or "Next" until no more data
    while (true) {
        try {
            await page.waitForSelector('.next-button', { timeout: 5000 });
            await page.click('.next-button');
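            // page.waitForTimeout() is not available in recent Puppeteer releases, so use a plain timer
            await new Promise(resolve => setTimeout(resolve, 2000)); // Wait for data to load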
        } catch (error) {
            console.log('No more pages available');
            break;
        }
    }

    await browser.close();
    return allData;
}

Best Practices for Large Dataset Pagination

1. Implement Robust Error Handling

Always include comprehensive error handling for network issues, rate limiting, and malformed responses:

import json
import time
import requests

def robust_pagination_scraper(base_url, headers=None):
    all_data = []
    page = 1
    max_consecutive_failures = 3
    consecutive_failures = 0

    while consecutive_failures < max_consecutive_failures:
        try:
            params = {'page': page, 'limit': 100}
            response = requests.get(base_url, params=params, headers=headers, timeout=30)

            # Handle different HTTP status codes
            if response.status_code == 404:
                print("Reached end of data (404)")
                break
            elif response.status_code == 429:
                print("Rate limited, waiting...")
                time.sleep(60)  # Wait 1 minute for rate limit reset
                continue
            elif response.status_code >= 500:
                print(f"Server error {response.status_code}, retrying...")
                consecutive_failures += 1
                time.sleep(5)
                continue

            response.raise_for_status()
            data = response.json()

            # Validate response structure
            if not isinstance(data, dict) or 'items' not in data:
                print(f"Unexpected response format on page {page}")
                consecutive_failures += 1
                continue

            items = data['items']
            if not items:
                print("No more items available")
                break

            all_data.extend(items)
            page += 1
            consecutive_failures = 0  # Reset on success

            print(f"Successfully scraped page {page - 1}: {len(items)} items")
            time.sleep(1)  # Rate limiting

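        except json.JSONDecodeError as e:
            # Checked first: in recent versions of requests the JSON error also
            # subclasses RequestException, so this branch must come before it.
            print(f"JSON decode error on page {page}: {e}")
            consecutive_failures += 1
            time.sleep(5)
        except requests.exceptions.RequestException as e:
            print(f"Request error on page {page}: {e}")
            consecutive_failures += 1
            time.sleep(5)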

    return all_data
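
The examples in this article read items from different keys (`items`, `results`, `data`) because APIs do not agree on a response shape. One way to keep that validation in a single place is a small helper like the sketch below. The key list is an assumption based on the examples here and should be adjusted to the API being scraped:

```python
from typing import Any, List, Sequence


def extract_items(
    payload: Any,
    keys: Sequence[str] = ("items", "results", "data"),
) -> List[Any]:
    """Return the list of records from a decoded JSON response.

    Accepts either a bare JSON array or an object holding the array under
    one of `keys`. Raises ValueError for any other shape, so the caller can
    count it as a failure instead of silently treating it as an empty page.
    """
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        for key in keys:
            value = payload.get(key)
            if isinstance(value, list):
                return value
    raise ValueError(f"Unexpected response format: {type(payload).__name__}")
```

An empty list is still a valid result and signals the end of the data, while an error object such as `{"error": "..."}` raises instead of ending the loop quietly.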

2. Data Persistence and Resume Capability

For very large datasets, implement checkpointing to resume interrupted scraping sessions:

import os
import pickle
import time
import requests

class PersistentPaginationScraper:
    def __init__(self, base_url, checkpoint_file='scraping_checkpoint.pkl'):
        self.base_url = base_url
        self.checkpoint_file = checkpoint_file
        self.load_checkpoint()

    def load_checkpoint(self):
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'rb') as f:
                checkpoint = pickle.load(f)
                self.current_page = checkpoint.get('page', 1)
                self.scraped_data = checkpoint.get('data', [])
                print(f"Resumed from page {self.current_page} with {len(self.scraped_data)} items")
        else:
            self.current_page = 1
            self.scraped_data = []

    def save_checkpoint(self):
        checkpoint = {
            'page': self.current_page,
            'data': self.scraped_data
        }
        with open(self.checkpoint_file, 'wb') as f:
            pickle.dump(checkpoint, f)

    def scrape(self):
        try:
            while True:
                params = {'page': self.current_page, 'per_page': 100}
                response = requests.get(self.base_url, params=params)
                response.raise_for_status()

                data = response.json()
                items = data.get('items', [])

                if not items:
                    break

                self.scraped_data.extend(items)
                self.current_page += 1

                # Save checkpoint every 10 pages
                if self.current_page % 10 == 0:
                    self.save_checkpoint()
                    print(f"Checkpoint saved at page {self.current_page}")

                time.sleep(0.5)

        except KeyboardInterrupt:
            print("Scraping interrupted. Saving checkpoint...")
            self.save_checkpoint()
            raise

        # Final save
        self.save_checkpoint()
        return self.scraped_data
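
If the process is killed while the checkpoint file is being written, the file can be left truncated and the next run will fail to load it. A common way to reduce that risk is to write to a temporary file and then rename it over the old checkpoint. The sketch below uses JSON instead of pickle and stores only small state such as the page number; the function names are illustrative, not taken from the class above:

```python
import json
import os
import tempfile


def save_checkpoint_atomic(path: str, state: dict) -> None:
    """Write state as JSON to a temp file, then rename it over `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # Make sure the bytes reach the disk
        os.replace(tmp_path, path)  # Replaces the old file in one step
    except BaseException:
        # Clean up the temp file if anything went wrong before the rename
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_checkpoint(path: str) -> dict:
    """Load saved state, or start from page 1 if no checkpoint exists."""
    if not os.path.exists(path):
        return {"page": 1}
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


# Usage in a throwaway directory
with tempfile.TemporaryDirectory() as tmp_dir:
    checkpoint_path = os.path.join(tmp_dir, "checkpoint.json")
    initial = load_checkpoint(checkpoint_path)
    save_checkpoint_atomic(checkpoint_path, {"page": 42})
    restored = load_checkpoint(checkpoint_path)
    leftover = [name for name in os.listdir(tmp_dir) if name.endswith(".tmp")]
```

Keeping the scraped records out of the checkpoint (for example by appending them to an output file as they arrive) also keeps each save small, since the class above rewrites all collected data on every checkpoint.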

3. Memory Management for Large Datasets

When dealing with massive datasets, consider streaming data to files instead of keeping everything in memory:

import csv
import time
import requests

def stream_paginated_data_to_csv(base_url, output_file, headers=None):
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = None
        page = 1
        total_items = 0

        while True:
            params = {'page': page, 'limit': 1000}
            response = requests.get(base_url, params=params, headers=headers)
            response.raise_for_status()

            data = response.json()
            items = data.get('items', [])

            if not items:
                break

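            # Initialize CSV writer with headers from first item.
            # extrasaction='ignore' drops fields that only appear in later items;
            # without it, DictWriter raises ValueError on an unexpected key.
            if writer is None:
                fieldnames = list(items[0].keys())
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
                writer.writeheader()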

            # Write items to CSV
            for item in items:
                writer.writerow(item)

            total_items += len(items)
            page += 1

            print(f"Processed page {page - 1}: {len(items)} items (total: {total_items})")
            time.sleep(0.5)

        print(f"Completed! Total items saved: {total_items}")
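
CSV requires a fixed set of columns, which can be awkward when items have nested values or varying keys. JSON Lines (one JSON object per line) is an alternative that streams just as well. The sketch below writes any iterable of items, such as a generator that yields records page by page; `write_jsonl` is an illustrative name:

```python
import json
import os
import tempfile
from typing import Iterable


def write_jsonl(items: Iterable[dict], output_file: str) -> int:
    """Write each item as one JSON object per line; return the item count."""
    count = 0
    with open(output_file, "w", encoding="utf-8") as f:
        for item in items:
            # ensure_ascii=False keeps non-ASCII text readable in the file
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
            count += 1
    return count


def generate_items():
    # Stand-in for a paginated source; items may have different keys
    yield {"id": 1, "name": "café"}
    yield {"id": 2, "tags": ["a", "b"]}


# Usage in a throwaway directory
with tempfile.TemporaryDirectory() as tmp_dir:
    out_path = os.path.join(tmp_dir, "items.jsonl")
    written = write_jsonl(generate_items(), out_path)
    with open(out_path, encoding="utf-8") as f:
        loaded = [json.loads(line) for line in f]
```

Only one item is held in memory at a time, and the file can be read back line by line in the same way.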

Conclusion

Handling API pagination effectively is crucial for successful large-scale web scraping. The key principles include implementing robust error handling, respecting rate limits, using appropriate pagination patterns, and considering memory management for massive datasets.

When dealing with complex pagination scenarios, you might also need to monitor network requests or handle dynamic content loading. Always test your pagination logic thoroughly and implement proper logging to track your scraping progress.

Remember to always check the API documentation for specific pagination parameters and respect the service's terms of use and rate limiting policies. Proper pagination handling will make your scrapers more reliable, efficient, and maintainable for production use.
