How do you implement API request deduplication to avoid redundant calls?

API request deduplication is a crucial optimization technique that prevents unnecessary network calls by identifying and eliminating duplicate requests. This approach significantly improves application performance, reduces server load, and helps avoid hitting API rate limits while building efficient web scraping and data collection systems.

Understanding API Request Deduplication

API request deduplication works by creating unique identifiers for each request and maintaining a cache or registry of completed requests. When a new request is made, the system checks if an identical request has already been processed recently, returning the cached result instead of making a redundant API call.

Key Benefits

  • Performance Optimization: Eliminates unnecessary network latency
  • Resource Conservation: Reduces bandwidth and server load
  • Rate Limit Management: Helps stay within API quotas
  • Cost Reduction: Minimizes usage-based API costs
  • Improved User Experience: Faster response times

Implementation Strategies

1. Request Fingerprinting with Hashing

The foundation of deduplication is creating unique fingerprints for each request:

import hashlib
import json
from typing import Dict, Any, Optional

class RequestDeduplicator:
    def __init__(self):
        self.cache = {}
        self.pending_requests = set()

    def create_request_fingerprint(self, url: str, method: str = 'GET', 
                                 headers: Optional[Dict] = None, 
                                 data: Optional[Dict] = None) -> str:
        """Create a unique fingerprint for the request"""
        request_data = {
            'url': url,
            'method': method.upper(),
            'headers': dict(sorted((headers or {}).items())),
            'data': data
        }

        # Create hash from normalized request data
        request_string = json.dumps(request_data, sort_keys=True)
        return hashlib.sha256(request_string.encode()).hexdigest()

    def is_duplicate(self, fingerprint: str) -> bool:
        """Check if request is duplicate or already pending"""
        return fingerprint in self.cache or fingerprint in self.pending_requests
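
To sanity-check the fingerprinting, here is a quick illustrative example (the URLs and header values are placeholders): requests that differ only in header order map to the same fingerprint, while a changed query string produces a new one.

dedup = RequestDeduplicator()

fp_a = dedup.create_request_fingerprint(
    'https://api.example.com/users?page=1',
    headers={'Accept': 'application/json', 'X-Client': 'scraper'}
)
fp_b = dedup.create_request_fingerprint(
    'https://api.example.com/users?page=1',
    headers={'X-Client': 'scraper', 'Accept': 'application/json'}
)
fp_c = dedup.create_request_fingerprint('https://api.example.com/users?page=2')

assert fp_a == fp_b   # header order is normalized away
assert fp_a != fp_c   # a different query string yields a new fingerprint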

2. Memory-Based Cache Implementation

import time
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class CachedResponse:
    data: Any
    timestamp: float
    expires_at: Optional[float] = None

class MemoryCacheDeduplicator:
    def __init__(self, default_ttl: int = 3600):
        self.cache = {}
        self.default_ttl = default_ttl

    def get_cached_response(self, fingerprint: str) -> Optional[Any]:
        """Retrieve cached response if valid"""
        if fingerprint not in self.cache:
            return None

        cached = self.cache[fingerprint]
        current_time = time.time()

        # Check if cache entry has expired
        if cached.expires_at and current_time > cached.expires_at:
            del self.cache[fingerprint]
            return None

        return cached.data

    def cache_response(self, fingerprint: str, data: Any, ttl: Optional[int] = None):
        """Cache response with TTL"""
        expires_at = None
        if ttl or self.default_ttl:
            expires_at = time.time() + (ttl or self.default_ttl)

        self.cache[fingerprint] = CachedResponse(
            data=data,
            timestamp=time.time(),
            expires_at=expires_at
        )
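
Here is a minimal sketch of how fingerprinting and the in-memory cache fit together around a real HTTP call. It reuses RequestDeduplicator from above, assumes the requests library, and the URL handling and TTL values are illustrative rather than prescriptive.

import requests  # assumed HTTP client
from typing import Any, Dict, Optional

fingerprinter = RequestDeduplicator()
cache = MemoryCacheDeduplicator(default_ttl=600)

def get_json(url: str, headers: Optional[Dict] = None) -> Any:
    """Return a cached response for recently seen requests, otherwise fetch and cache."""
    fingerprint = fingerprinter.create_request_fingerprint(url, headers=headers)

    cached = cache.get_cached_response(fingerprint)
    if cached is not None:
        return cached  # cache hit: no network call is made

    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    data = response.json()

    cache.cache_response(fingerprint, data)
    return data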

3. Redis-Based Distributed Deduplication

For distributed systems, Redis provides a robust solution:

import redis
import pickle
from typing import Any, Optional

class RedisDeduplicator:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.pending_key_prefix = "pending:"
        self.cache_key_prefix = "cache:"

    def get_cached_response(self, fingerprint: str) -> Optional[Any]:
        """Get cached response from Redis"""
        cache_key = f"{self.cache_key_prefix}{fingerprint}"
        cached_data = self.redis.get(cache_key)

        if cached_data:
            return pickle.loads(cached_data)
        return None

    def cache_response(self, fingerprint: str, data: Any, ttl: Optional[int] = None):
        """Cache response in Redis with TTL"""
        cache_key = f"{self.cache_key_prefix}{fingerprint}"
        serialized_data = pickle.dumps(data)

        self.redis.setex(
            cache_key,
            ttl or self.default_ttl,
            serialized_data
        )

    def mark_pending(self, fingerprint: str, ttl: int = 300):
        """Mark request as pending to prevent concurrent duplicates"""
        pending_key = f"{self.pending_key_prefix}{fingerprint}"
        self.redis.setex(pending_key, ttl, "1")

    def is_pending(self, fingerprint: str) -> bool:
        """Check if request is currently being processed"""
        pending_key = f"{self.pending_key_prefix}{fingerprint}"
        return bool(self.redis.get(pending_key))

    def clear_pending(self, fingerprint: str):
        """Clear pending status after request completion"""
        pending_key = f"{self.pending_key_prefix}{fingerprint}"
        self.redis.delete(pending_key)
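
The sketch below shows one way the Redis-backed deduplicator could wrap an HTTP call so that concurrent workers avoid fetching the same URL twice. The polling wait and the requests call are illustrative additions, not part of the class above, and the pending check is not strictly atomic (a SET with NX would be needed for hard guarantees).

import time
import requests  # assumed HTTP client
from typing import Any

redis_client = redis.Redis(host='localhost', port=6379, db=0)
dedup = RedisDeduplicator(redis_client, default_ttl=600)
fingerprinter = RequestDeduplicator()

def fetch_with_dedup(url: str) -> Any:
    fingerprint = fingerprinter.create_request_fingerprint(url)

    cached = dedup.get_cached_response(fingerprint)
    if cached is not None:
        return cached

    # If another worker is already fetching this URL, wait for its cached result
    while dedup.is_pending(fingerprint):
        time.sleep(0.1)
        cached = dedup.get_cached_response(fingerprint)
        if cached is not None:
            return cached

    # Note: this check-then-set is not atomic; use SET with NX for strict guarantees
    dedup.mark_pending(fingerprint)
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        dedup.cache_response(fingerprint, data)
        return data
    finally:
        dedup.clear_pending(fingerprint)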

JavaScript Implementation

For client-side applications, here's a JavaScript implementation:

class APIDeduplicator {
    constructor(defaultTTL = 300000) { // 5 minutes default
        this.cache = new Map();
        this.pendingRequests = new Map();
        this.defaultTTL = defaultTTL;
    }

    createFingerprint(url, options = {}) {
        const requestData = {
            url,
            method: options.method || 'GET',
            headers: this.normalizeHeaders(options.headers || {}),
            body: options.body
        };

        return this.hashString(JSON.stringify(requestData));
    }

    normalizeHeaders(headers) {
        const normalized = {};
        Object.keys(headers).sort().forEach(key => {
            normalized[key.toLowerCase()] = headers[key];
        });
        return normalized;
    }

    hashString(str) {
        let hash = 0;
        for (let i = 0; i < str.length; i++) {
            const char = str.charCodeAt(i);
            hash = ((hash << 5) - hash) + char;
            hash = hash & hash; // Convert to 32-bit integer
        }
        return hash.toString(36);
    }

    async deduplicate(url, options = {}) {
        const fingerprint = this.createFingerprint(url, options);

        // Check cache first
        const cached = this.getCached(fingerprint);
        if (cached) {
            return cached.data;
        }

        // Check if request is already pending
        if (this.pendingRequests.has(fingerprint)) {
            return this.pendingRequests.get(fingerprint);
        }

        // Make the request and cache it
        const requestPromise = this.makeRequest(url, options)
            .then(response => {
                this.cacheResponse(fingerprint, response);
                this.pendingRequests.delete(fingerprint);
                return response;
            })
            .catch(error => {
                this.pendingRequests.delete(fingerprint);
                throw error;
            });

        this.pendingRequests.set(fingerprint, requestPromise);
        return requestPromise;
    }

    getCached(fingerprint) {
        const cached = this.cache.get(fingerprint);
        if (!cached) return null;

        if (Date.now() > cached.expiresAt) {
            this.cache.delete(fingerprint);
            return null;
        }

        return cached;
    }

    cacheResponse(fingerprint, data, ttl = this.defaultTTL) {
        this.cache.set(fingerprint, {
            data,
            expiresAt: Date.now() + ttl
        });
    }

    async makeRequest(url, options) {
        const response = await fetch(url, options);
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
    }
}

// Usage example
const deduplicator = new APIDeduplicator();

async function fetchUserData(userId) {
    return deduplicator.deduplicate(`/api/users/${userId}`, {
        method: 'GET',
        headers: { 'Authorization': 'Bearer token' }
    });
}

Advanced Patterns

Request Queuing and Batching

import asyncio
from collections import defaultdict
from typing import Any, Callable, List

class RequestBatcher:
    def __init__(self, batch_size: int = 10, batch_timeout: float = 1.0):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending_batches = defaultdict(list)
        self.batch_processors = {}

    async def add_to_batch(self, batch_key: str, request_data: dict, 
                          processor: Callable) -> Any:
        """Add request to batch and process when ready"""
        future = asyncio.get_running_loop().create_future()

        self.pending_batches[batch_key].append({
            'data': request_data,
            'future': future
        })

        self.batch_processors[batch_key] = processor

        # Process batch if it's full or start timeout
        if len(self.pending_batches[batch_key]) >= self.batch_size:
            asyncio.create_task(self.process_batch(batch_key))
        else:
            asyncio.create_task(self.schedule_batch_timeout(batch_key))

        return await future

    async def process_batch(self, batch_key: str):
        """Process accumulated batch"""
        batch = self.pending_batches[batch_key]
        if not batch:
            return

        self.pending_batches[batch_key] = []
        processor = self.batch_processors[batch_key]

        try:
            # Extract request data for batch processing
            batch_data = [item['data'] for item in batch]
            results = await processor(batch_data)

            # Resolve individual futures with results
            for item, result in zip(batch, results):
                item['future'].set_result(result)

        except Exception as e:
            # Reject all futures in batch
            for item in batch:
                item['future'].set_exception(e)

    async def schedule_batch_timeout(self, batch_key: str):
        """Process batch after timeout even if not full"""
        await asyncio.sleep(self.batch_timeout)
        await self.process_batch(batch_key)
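
A short usage sketch for the batcher: individual user lookups are funneled into a handful of batched calls. fetch_users_bulk is a hypothetical processor standing in for a real bulk endpoint.

from typing import List

async def fetch_users_bulk(batch: List[dict]) -> List[dict]:
    """Hypothetical processor: one API call resolving many user IDs at once."""
    ids = [item['user_id'] for item in batch]
    # ... a single bulk request, e.g. GET /api/users?ids=1,2,3, would go here ...
    return [{'user_id': user_id, 'name': f'user-{user_id}'} for user_id in ids]

async def main():
    batcher = RequestBatcher(batch_size=5, batch_timeout=0.5)
    results = await asyncio.gather(*[
        batcher.add_to_batch('users', {'user_id': user_id}, fetch_users_bulk)
        for user_id in range(12)
    ])
    print(len(results))  # 12 results served by only 3 calls to fetch_users_bulk

asyncio.run(main())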

Conditional Deduplication

from typing import Callable

class ConditionalDeduplicator:
    def __init__(self):
        self.cache = {}
        self.dedup_rules = {}

    def add_rule(self, pattern: str, should_dedupe: Callable[[dict], bool]):
        """Add conditional deduplication rule"""
        self.dedup_rules[pattern] = should_dedupe

    def should_deduplicate(self, request_data: dict) -> bool:
        """Check if request should be deduplicated based on rules"""
        for pattern, rule_func in self.dedup_rules.items():
            if pattern in request_data.get('url', ''):
                return rule_func(request_data)

        return True  # Default to deduplication

    def process_request(self, request_data: dict):
        """Process request with conditional deduplication.

        create_fingerprint() and make_request() are assumed helpers, e.g. the
        fingerprinting and HTTP calls shown in the earlier examples.
        """
        if not self.should_deduplicate(request_data):
            return self.make_request(request_data)

        fingerprint = self.create_fingerprint(request_data)

        if fingerprint in self.cache:
            return self.cache[fingerprint]

        result = self.make_request(request_data)
        self.cache[fingerprint] = result
        return result

# Usage example
deduplicator = ConditionalDeduplicator()

# Don't dedupe real-time data endpoints
deduplicator.add_rule('/api/live/', lambda req: False)

# Don't dedupe POST requests
deduplicator.add_rule('', lambda req: req.get('method', 'GET') != 'POST')

Integration with Web Scraping

When implementing deduplication in web scraping workflows, consider integrating with existing tools. For applications using browser automation, understanding how to monitor network requests in Puppeteer can help identify duplicate requests at the browser level.

Performance Monitoring

import time
from dataclasses import dataclass
from typing import Callable, Dict

@dataclass
class DeduplicationStats:
    total_requests: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    dedupe_savings_ms: float = 0.0

    @property
    def hit_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.cache_hits / self.total_requests

    def add_hit(self, saved_time_ms: float = 0):
        self.total_requests += 1
        self.cache_hits += 1
        self.dedupe_savings_ms += saved_time_ms

    def add_miss(self):
        self.total_requests += 1
        self.cache_misses += 1

class MonitoredDeduplicator:
    def __init__(self):
        self.cache = {}
        self.stats = DeduplicationStats()
        self.request_times = {}

    def process_request(self, fingerprint: str, request_func: Callable):
        """Process request with performance monitoring"""
        start_time = time.time()

        if fingerprint in self.cache:
            # Estimate saved time based on previous requests
            avg_request_time = self.get_average_request_time()
            self.stats.add_hit(avg_request_time * 1000)  # Convert to ms
            return self.cache[fingerprint]

        # Make actual request
        result = request_func()
        request_time = time.time() - start_time

        # Cache result and update stats
        self.cache[fingerprint] = result
        self.request_times[fingerprint] = request_time
        self.stats.add_miss()

        return result

    def get_average_request_time(self) -> float:
        if not self.request_times:
            return 0.5  # Default estimate
        return sum(self.request_times.values()) / len(self.request_times)

    def get_performance_report(self) -> Dict:
        return {
            'hit_rate': f"{self.stats.hit_rate:.2%}",
            'total_requests': self.stats.total_requests,
            'time_saved_seconds': f"{self.stats.dedupe_savings_ms / 1000:.2f}",
            'cache_efficiency': 'High' if self.stats.hit_rate > 0.7 else 'Medium' if self.stats.hit_rate > 0.3 else 'Low'
        }
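
A brief usage sketch, again assuming the requests library; the endpoint URL is a placeholder. The first call is a miss and hits the network, the second is served from the cache, so the report shows a 50% hit rate.

import requests  # assumed HTTP client

monitored = MonitoredDeduplicator()
fingerprinter = RequestDeduplicator()

url = 'https://api.example.com/products?page=1'
fingerprint = fingerprinter.create_request_fingerprint(url)

# First iteration: cache miss and real request; second iteration: cache hit
for _ in range(2):
    data = monitored.process_request(
        fingerprint,
        lambda: requests.get(url, timeout=30).json()
    )

print(monitored.get_performance_report())
# e.g. {'hit_rate': '50.00%', 'total_requests': 2, ...}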

Best Practices

1. Cache Key Design

  • Include all parameters that affect the response
  • Normalize parameter order and format
  • Consider API versioning in cache keys
  • Handle authentication tokens appropriately (see the sketch below)
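
As a rough illustration of these rules (the header names and helper function are hypothetical), volatile headers such as Authorization can be excluded from the key when responses are not user-specific, and the remaining parameters are normalized before hashing:

import hashlib
import json

# Headers that vary per call or per user but (here) do not change the response body
VOLATILE_HEADERS = {'authorization', 'x-request-id', 'user-agent'}

def build_cache_key(url: str, params: dict, headers: dict, api_version: str = 'v1') -> str:
    normalized = {
        'version': api_version,                     # include API versioning in the key
        'url': url,
        'params': dict(sorted(params.items())),     # normalize parameter order
        'headers': {
            key.lower(): value
            for key, value in sorted(headers.items())
            if key.lower() not in VOLATILE_HEADERS  # keep tokens out of the key
        },
    }
    return hashlib.sha256(json.dumps(normalized, sort_keys=True).encode()).hexdigest()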

2. TTL Strategy

  • Set appropriate expiration times based on data freshness requirements
  • Use shorter TTLs for real-time data
  • Consider using ETags and Last-Modified headers for validation (see the sketch below)
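
A minimal sketch of this idea using the requests library, assuming the API returns an ETag header; the TTL values and endpoint prefixes are illustrative:

import requests  # assumed HTTP client

# Shorter TTLs for fast-changing data, longer for near-static data (values are illustrative)
TTL_BY_PREFIX = {'/api/live/': 5, '/api/prices/': 60, '/api/catalog/': 3600}

def revalidate(url: str, cached_etag: str, cached_body: dict) -> dict:
    """Conditional GET: unchanged data costs a 304 response instead of a full payload."""
    response = requests.get(url, headers={'If-None-Match': cached_etag}, timeout=30)
    if response.status_code == 304:
        return cached_body       # still fresh, keep the cached copy
    response.raise_for_status()
    return response.json()       # changed, replace the cached copy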

3. Memory Management

  • Implement cache size limits
  • Use LRU eviction policies (see the sketch below)
  • Monitor memory usage in production
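
For example, a compact LRU cache with a hard entry limit can be built on OrderedDict; the 1,000-entry limit below is an arbitrary example:

from collections import OrderedDict
from typing import Any, Optional

class LRUCache:
    def __init__(self, max_entries: int = 1000):
        self.max_entries = max_entries
        self._store = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._store:
            return None
        self._store.move_to_end(key)            # mark as most recently used
        return self._store[key]

    def set(self, key: str, value: Any) -> None:
        self._store[key] = value
        self._store.move_to_end(key)
        if len(self._store) > self.max_entries:
            self._store.popitem(last=False)      # evict the least recently used entry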

4. Error Handling

  • Don't cache error responses permanently
  • Implement retry logic for failed requests
  • Clear pending status on errors (see the sketch below)
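
A hedged sketch of these points around the Redis-based deduplicator shown earlier (fetch_with_retries and its parameters are illustrative): failures are retried with backoff, cached only briefly, and the pending flag is always cleared.

import time
import requests  # assumed HTTP client
from typing import Any

def fetch_with_retries(dedup: RedisDeduplicator, fingerprint: str, url: str,
                       max_attempts: int = 3, error_ttl: int = 30) -> Any:
    dedup.mark_pending(fingerprint)
    try:
        for attempt in range(1, max_attempts + 1):
            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                data = response.json()
                dedup.cache_response(fingerprint, data)  # success: cached with the normal TTL
                return data
            except requests.RequestException:
                if attempt == max_attempts:
                    # Remember the failure only briefly so a transient outage is not cached for hours
                    dedup.cache_response(fingerprint, {'error': 'unavailable'}, ttl=error_ttl)
                    raise
                time.sleep(2 ** attempt)                 # simple exponential backoff before retrying
    finally:
        dedup.clear_pending(fingerprint)                 # always clear the pending flag, even on errors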

Testing Deduplication

Assuming your service exposes the example endpoints below for cache statistics and invalidation, you can exercise the cache from the command line:

# Test cache hit rates: the second identical call should be served from the cache
curl -X GET "http://localhost:3000/api/test" -H "X-Request-ID: test-1"
curl -X GET "http://localhost:3000/api/test" -H "X-Request-ID: test-1"

# Monitor cache statistics
curl -X GET "http://localhost:3000/api/cache/stats"

# Test cache invalidation
curl -X DELETE "http://localhost:3000/api/cache/clear"

Implementing proper API request deduplication requires careful consideration of your application's specific needs, data freshness requirements, and performance constraints. When building comprehensive scraping systems, this technique works well alongside other optimization strategies like handling AJAX requests using Puppeteer for dynamic content.

The key to successful deduplication lies in balancing cache effectiveness with data accuracy, ensuring your application remains both performant and reliable while avoiding unnecessary API calls.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
