How do you handle API response filtering and data transformation?
API response filtering and data transformation are crucial aspects of web scraping and API integration. APIs frequently return far more data than an application needs, so responses must be filtered, transformed, and restructured to fit your requirements. In practice this means extracting the relevant records, converting formats, and reshaping data structures so they are useful for your specific use case.
Understanding API Response Filtering
API response filtering is the process of extracting only the data you need from API responses. This is essential for:
- Performance optimization: Reducing memory usage and processing time
- Data relevance: Focusing on specific fields or records
- Cost efficiency: Minimizing data transfer and storage costs
- User experience: Presenting only relevant information
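To make the first point above concrete: even when an API offers no server-side filtering, you can project each record down to just the fields you need as soon as the response arrives. The sketch below is a minimal illustration, not a specific API's behavior; the endpoint URL and the field names (id, name, price) are placeholders.

// Minimal sketch: request a collection and keep only the needed fields.
// The URL and field names are illustrative placeholders.
async function fetchProjectedRecords() {
  const response = await fetch('https://api.example.com/products');
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }
  const records = await response.json();
  // Projecting each record to a smaller object reduces memory use downstream.
  return records.map(({ id, name, price }) => ({ id, name, price }));
}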
Data Transformation Fundamentals
Data transformation involves converting data from one format or structure to another. Common transformations include:
- Format conversion: JSON to XML, CSV to JSON
- Data type conversion: String to number, date parsing
- Structure reshaping: Flattening nested objects, grouping data
- Value normalization: Standardizing formats, units, or representations
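Before the fuller implementations below, here is a compact sketch of the last three points: it converts a data type, normalizes a unit, and flattens one level of nesting. The record shape (id, price_cents, user.name) is an assumption made purely for the example.

// Minimal sketch of data type conversion, value normalization, and reshaping.
// Assumes records shaped like { id, price_cents, user: { name } }.
function normalizeRecord(record) {
  return {
    id: Number(record.id),                           // type conversion: string -> number
    price: (record.price_cents || 0) / 100,          // unit normalization: cents -> currency units
    userName: record.user ? record.user.name : null  // reshaping: flatten one level of nesting
  };
}

// normalizeRecord({ id: '7', price_cents: 1999, user: { name: 'Ada' } })
// -> { id: 7, price: 19.99, userName: 'Ada' }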
Python Implementation Examples
Basic Response Filtering with Python
import requests
import json
from typing import Dict, List, Any

class APIResponseFilter:
    def __init__(self, api_url: str, headers: Dict = None):
        self.api_url = api_url
        self.headers = headers or {}

    def fetch_and_filter(self, filters: Dict[str, Any]) -> List[Dict]:
        """Fetch API data and apply filters"""
        response = requests.get(self.api_url, headers=self.headers)
        response.raise_for_status()
        data = response.json()
        return self.apply_filters(data, filters)

    def apply_filters(self, data: List[Dict], filters: Dict[str, Any]) -> List[Dict]:
        """Apply multiple filters to the data"""
        filtered_data = data
        # Filter by field values
        for field, value in filters.items():
            if field.startswith('min_'):
                field_name = field[4:]  # Remove 'min_' prefix
                filtered_data = [item for item in filtered_data
                                 if item.get(field_name, 0) >= value]
            elif field.startswith('max_'):
                field_name = field[4:]  # Remove 'max_' prefix
                filtered_data = [item for item in filtered_data
                                 if item.get(field_name, 0) <= value]
            else:
                filtered_data = [item for item in filtered_data
                                 if item.get(field) == value]
        return filtered_data

# Usage example
filter_engine = APIResponseFilter('https://api.example.com/products')
filtered_products = filter_engine.fetch_and_filter({
    'category': 'electronics',
    'min_price': 100,
    'max_price': 1000
})
Advanced Data Transformation
import json
from datetime import datetime
from typing import Dict, List

class DataTransformer:
    @staticmethod
    def flatten_nested_dict(nested_dict: Dict, separator: str = '_') -> Dict:
        """Flatten nested dictionary structures"""
        def _flatten(obj, parent_key=''):
            items = []
            if isinstance(obj, dict):
                for k, v in obj.items():
                    new_key = f"{parent_key}{separator}{k}" if parent_key else k
                    items.extend(_flatten(v, new_key).items())
            elif isinstance(obj, list):
                for i, v in enumerate(obj):
                    new_key = f"{parent_key}{separator}{i}" if parent_key else str(i)
                    items.extend(_flatten(v, new_key).items())
            else:
                return {parent_key: obj}
            return dict(items)
        return _flatten(nested_dict)

    @staticmethod
    def normalize_dates(data: List[Dict], date_fields: List[str]) -> List[Dict]:
        """Normalize date formats across the dataset"""
        normalized_data = []
        for item in data:
            normalized_item = item.copy()
            for field in date_fields:
                if field in item and item[field]:
                    try:
                        # Handle multiple date formats
                        date_value = item[field]
                        if isinstance(date_value, str):
                            # Try common date formats
                            for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%d-%m-%Y']:
                                try:
                                    parsed_date = datetime.strptime(date_value, fmt)
                                    normalized_item[field] = parsed_date.isoformat()
                                    break
                                except ValueError:
                                    continue
                    except Exception as e:
                        print(f"Error normalizing date {field}: {e}")
            normalized_data.append(normalized_item)
        return normalized_data
# Example usage
transformer = DataTransformer()

# Sample nested API response
api_response = [
    {
        'id': 1,
        'user': {
            'name': 'John Doe',
            'contact': {
                'email': 'john@example.com',
                'phone': '123-456-7890'
            }
        },
        'created_at': '2023-12-01'
    }
]

# Flatten the structure
flattened = [transformer.flatten_nested_dict(item) for item in api_response]
print(json.dumps(flattened, indent=2))

# Normalize dates
normalized = transformer.normalize_dates(api_response, ['created_at'])
JavaScript Implementation Examples
Client-Side Response Filtering
class APIResponseHandler {
  constructor(baseURL, defaultHeaders = {}) {
    this.baseURL = baseURL;
    this.defaultHeaders = defaultHeaders;
  }

  async fetchAndFilter(endpoint, filters = {}) {
    try {
      const response = await fetch(`${this.baseURL}${endpoint}`, {
        headers: this.defaultHeaders
      });
      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }
      const data = await response.json();
      return this.applyFilters(data, filters);
    } catch (error) {
      console.error('Error fetching data:', error);
      throw error;
    }
  }

  applyFilters(data, filters) {
    if (!Array.isArray(data)) {
      data = [data];
    }
    return data.filter(item => {
      return Object.entries(filters).every(([key, value]) => {
        if (key.startsWith('min_')) {
          const fieldName = key.substring(4);
          return (item[fieldName] || 0) >= value;
        } else if (key.startsWith('max_')) {
          const fieldName = key.substring(4);
          return (item[fieldName] || 0) <= value;
        } else if (key.startsWith('contains_')) {
          const fieldName = key.substring(9);
          return item[fieldName] &&
            item[fieldName].toString().toLowerCase()
              .includes(value.toLowerCase());
        } else {
          return item[key] === value;
        }
      });
    });
  }

  transformData(data, transformations) {
    return data.map(item => {
      const transformed = { ...item };
      transformations.forEach(transform => {
        switch (transform.type) {
          case 'rename':
            // Use `in` so fields holding falsy values (0, '', false) are still renamed
            if (transform.from in item) {
              transformed[transform.to] = item[transform.from];
              delete transformed[transform.from];
            }
            break;
          case 'calculate':
            transformed[transform.field] = transform.formula(item);
            break;
          case 'format':
            if (item[transform.field]) {
              transformed[transform.field] =
                transform.formatter(item[transform.field]);
            }
            break;
        }
      });
      return transformed;
    });
  }
}
// Usage example
const apiHandler = new APIResponseHandler('https://api.example.com');

// Fetch and filter products
apiHandler.fetchAndFilter('/products', {
  category: 'electronics',
  min_price: 50,
  contains_name: 'laptop'
}).then(filteredProducts => {
  // Transform the data
  const transformations = [
    {
      type: 'rename',
      from: 'product_name',
      to: 'title'
    },
    {
      type: 'calculate',
      field: 'discounted_price',
      formula: (item) => item.price * 0.9
    },
    {
      type: 'format',
      field: 'created_at',
      formatter: (date) => new Date(date).toLocaleDateString()
    }
  ];
  const transformedData = apiHandler.transformData(filteredProducts, transformations);
  console.log('Transformed data:', transformedData);
});
Node.js Server-Side Processing
const axios = require('axios');
const _ = require('lodash');

class ServerSideDataProcessor {
  constructor(config = {}) {
    this.timeout = config.timeout || 30000;
    this.retryAttempts = config.retryAttempts || 3;
  }

  async processApiData(apiUrl, processingConfig) {
    try {
      const response = await this.fetchWithRetry(apiUrl);
      let data = response.data;

      // Apply filtering
      if (processingConfig.filters) {
        data = this.applyAdvancedFilters(data, processingConfig.filters);
      }

      // Apply transformations
      if (processingConfig.transformations) {
        data = this.applyTransformations(data, processingConfig.transformations);
      }

      // Apply aggregations
      if (processingConfig.aggregations) {
        data = this.applyAggregations(data, processingConfig.aggregations);
      }

      return data;
    } catch (error) {
      console.error('Error processing API data:', error);
      throw error;
    }
  }

  async fetchWithRetry(url, attempt = 1) {
    try {
      return await axios.get(url, { timeout: this.timeout });
    } catch (error) {
      if (attempt < this.retryAttempts) {
        console.log(`Retry attempt ${attempt + 1} for ${url}`);
        await this.delay(1000 * attempt); // Backoff delay grows with each attempt
        return this.fetchWithRetry(url, attempt + 1);
      }
      throw error;
    }
  }

  applyAdvancedFilters(data, filters) {
    return data.filter(item => {
      return filters.every(filter => {
        switch (filter.operator) {
          case 'equals':
            return item[filter.field] === filter.value;
          case 'contains':
            return item[filter.field] &&
              item[filter.field].toString().includes(filter.value);
          case 'range':
            return item[filter.field] >= filter.min &&
              item[filter.field] <= filter.max;
          case 'in':
            return filter.values.includes(item[filter.field]);
          case 'regex': {
            const regex = new RegExp(filter.pattern, filter.flags || 'i');
            return regex.test(item[filter.field]);
          }
          default:
            return true;
        }
      });
    });
  }

  applyTransformations(data, transformations) {
    return data.map(item => {
      let transformed = _.cloneDeep(item);
      transformations.forEach(transform => {
        switch (transform.type) {
          case 'map_values':
            if (transformed[transform.field]) {
              transformed[transform.field] =
                transform.mapping[transformed[transform.field]] ||
                transformed[transform.field];
            }
            break;
          case 'extract_nested':
            transformed[transform.target] =
              _.get(transformed, transform.path);
            break;
          case 'combine_fields':
            transformed[transform.target] = transform.fields
              .map(field => transformed[field])
              .join(transform.separator || ' ');
            break;
        }
      });
      return transformed;
    });
  }

  // Minimal illustrative aggregation step so the call above is runnable;
  // the rule shape ({ groupBy, metric, field, target }) is an assumption to adapt.
  applyAggregations(data, aggregations) {
    return aggregations.map(agg => {
      const groups = _.groupBy(data, agg.groupBy);
      return Object.entries(groups).map(([groupKey, items]) => ({
        [agg.groupBy]: groupKey,
        [agg.target || agg.metric]: agg.metric === 'sum'
          ? _.sumBy(items, agg.field)
          : items.length
      }));
    });
  }

  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
Working with Complex API Responses
When dealing with complex API responses, especially those from modern web applications, you might need to handle dynamic content loading. This is where techniques for handling AJAX requests using Puppeteer become valuable, as they allow you to capture data that loads asynchronously.
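As a rough sketch of that idea (not a drop-in implementation), you can listen for the page's own network responses in Puppeteer and capture the JSON bodies of its XHR/fetch calls; the '/api/' URL filter below is an assumption you would adapt to the target site.

const puppeteer = require('puppeteer');

// Minimal sketch: collect JSON returned by the API calls a page makes while loading.
async function captureApiResponses(pageUrl) {
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  const captured = [];

  page.on('response', async (response) => {
    const type = response.request().resourceType();
    // '/api/' is a placeholder filter; match the endpoints of the site you target.
    if ((type === 'xhr' || type === 'fetch') && response.url().includes('/api/')) {
      try {
        captured.push({ url: response.url(), data: await response.json() });
      } catch (err) {
        // Skip responses whose bodies are not valid JSON.
      }
    }
  });

  await page.goto(pageUrl, { waitUntil: 'networkidle0' });
  await browser.close();
  return captured;
}

The captured records can then be fed into the same filtering and transformation helpers shown earlier.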
Handling Paginated API Responses
import asyncio
import aiohttp
from typing import AsyncGenerator, Dict

class PaginatedAPIProcessor:
    def __init__(self, base_url: str, headers: Dict = None):
        self.base_url = base_url
        self.headers = headers or {}

    async def fetch_all_pages(self, endpoint: str,
                              params: Dict = None) -> AsyncGenerator[Dict, None]:
        """Fetch all pages from a paginated API"""
        async with aiohttp.ClientSession() as session:
            page = 1
            while True:
                current_params = {**(params or {}), 'page': page}
                async with session.get(
                    f"{self.base_url}{endpoint}",
                    headers=self.headers,
                    params=current_params
                ) as response:
                    if response.status != 200:
                        break
                    data = await response.json()
                    if not data.get('results'):
                        break
                    yield data
                    # Check if there are more pages
                    if not data.get('has_next', False):
                        break
                page += 1
# Usage
async def process_all_data():
    processor = PaginatedAPIProcessor('https://api.example.com')
    all_items = []
    async for page_data in processor.fetch_all_pages('/items'):
        # Filter and transform each page
        filtered_items = [
            item for item in page_data['results']
            if item.get('status') == 'active'
        ]
        all_items.extend(filtered_items)
    return all_items
Performance Optimization Strategies
Memory-Efficient Processing
import requests
from datetime import datetime
from typing import Dict, Iterator

def process_large_dataset_streaming(api_url: str,
                                    chunk_size: int = 1000) -> Iterator[Dict]:
    """Process large datasets in chunks to manage memory usage"""
    offset = 0
    while True:
        params = {'limit': chunk_size, 'offset': offset}
        response = requests.get(api_url, params=params)
        if response.status_code != 200:
            break
        data = response.json()
        items = data.get('items', [])
        if not items:
            break
        # Process each chunk
        for item in items:
            # Apply transformations on individual items
            transformed_item = transform_single_item(item)
            yield transformed_item
        offset += chunk_size
        # Check if we've reached the end
        if len(items) < chunk_size:
            break

def transform_single_item(item: Dict) -> Dict:
    """Transform individual items to reduce memory footprint"""
    return {
        'id': item.get('id'),
        'title': item.get('name', '').strip(),
        'price': float(item.get('price', 0)),
        'category': item.get('category', {}).get('name', 'Unknown'),
        'processed_at': datetime.now().isoformat()
    }
Integration with Modern Frameworks
When building applications that require complex data processing, you might also need to monitor network requests in Puppeteer to understand how data flows through your application and identify optimization opportunities.
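A simple way to get that visibility, sketched here under the assumption that Puppeteer drives the pages you care about, is to log every outgoing request's method, resource type, and URL:

const puppeteer = require('puppeteer');

// Minimal sketch: log outgoing requests to see which endpoints a page actually calls.
async function auditNetworkTraffic(pageUrl) {
  const browser = await puppeteer.launch();
  const page = await browser.newPage();

  page.on('request', (request) => {
    console.log(`${request.method()} ${request.resourceType()} ${request.url()}`);
  });

  await page.goto(pageUrl, { waitUntil: 'networkidle0' });
  await browser.close();
}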
React Integration Example
import React, { useState } from 'react';

const DataProcessor = () => {
  const [processedData, setProcessedData] = useState([]);
  const [loading, setLoading] = useState(false);

  const processApiData = async (filters, transformations) => {
    setLoading(true);
    try {
      const apiHandler = new APIResponseHandler('/api');
      // Fetch and filter data
      const filteredData = await apiHandler.fetchAndFilter('/data', filters);
      // Apply transformations
      const transformedData = apiHandler.transformData(
        filteredData,
        transformations
      );
      setProcessedData(transformedData);
    } catch (error) {
      console.error('Error processing data:', error);
    } finally {
      setLoading(false);
    }
  };

  return (
    <div>
      {loading ? <div>Processing...</div> :
        <DataVisualization data={processedData} />}
    </div>
  );
};
Command Line Tools for Data Processing
For quick data processing tasks, you can use command-line tools:
# Using jq for JSON processing
curl -s "https://api.example.com/data" | \
  jq '.[] | select(.price > 100) | {id: .id, name: .name, price: .price}'

# Using a short inline Python script for JSON-to-CSV transformation
python -c "
import json, sys, csv
data = json.load(sys.stdin)
writer = csv.DictWriter(sys.stdout, fieldnames=['id', 'name', 'price'])
writer.writeheader()
for item in data:
    if item['price'] > 100:
        writer.writerow({k: item[k] for k in ['id', 'name', 'price']})
" < api_response.json > filtered_data.csv
Best Practices and Error Handling
- Validate Data Structure: Always check if expected fields exist before processing
- Handle Missing Values: Provide default values or skip invalid records
- Log Transformations: Keep track of what transformations were applied
- Performance Monitoring: Monitor processing time for large datasets
- Error Recovery: Implement retry logic for network failures
import logging
import time
import requests
from typing import Dict, List

def robust_data_processor(api_url: str, max_retries: int = 3) -> List[Dict]:
    """Robust data processing with error handling"""
    for attempt in range(max_retries):
        try:
            response = requests.get(api_url, timeout=30)
            response.raise_for_status()
            data = response.json()
            # Validate data structure
            if not isinstance(data, list):
                raise ValueError("Expected list of items")
            # Process with error handling
            processed_data = []
            for item in data:
                try:
                    # process_item_safely is your per-item transformation
                    # (see transform_single_item above for one example)
                    processed_item = process_item_safely(item)
                    if processed_item:
                        processed_data.append(processed_item)
                except Exception as e:
                    logging.warning(f"Error processing item {item.get('id', 'unknown')}: {e}")
                    continue
            return processed_data
        except requests.RequestException as e:
            logging.error(f"Request failed (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
    return []
API response filtering and data transformation are essential skills for effective web scraping and API integration. By implementing proper filtering, transformation, and error handling strategies, you can build robust applications that efficiently process large amounts of data while maintaining performance and reliability.