How Do I Integrate Multiple MCP Servers in One Scraping Workflow?

Integrating multiple MCP (Model Context Protocol) servers in a single scraping workflow lets you combine specialized capabilities from different servers. This supports data extraction scenarios that need browser automation, API integration, data processing, and storage operations in the same pipeline.

Understanding Multi-Server MCP Architecture

The Model Context Protocol enables clients to connect to multiple MCP servers concurrently. Each server can provide different tools, resources, and capabilities that complement each other in a scraping workflow. For example, you might use one MCP server for browser automation, another for data transformation, and a third for database operations.
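As a rough illustration of how a client might track which server provides which tool, the sketch below builds a lookup table from per-server tool lists. The server and tool names are placeholders borrowed from the examples in this article, and `build_tool_index` and `find_collisions` are hypothetical helpers, not part of any MCP SDK.

```python
from collections import defaultdict


def build_tool_index(tools_by_server):
    """Map each tool name to the list of servers that expose it."""
    index = defaultdict(list)
    for server_name, tool_names in tools_by_server.items():
        for tool_name in tool_names:
            index[tool_name].append(server_name)
    return dict(index)


def find_collisions(index):
    """Return the tool names that more than one server offers."""
    return {name: servers for name, servers in index.items() if len(servers) > 1}


# Placeholder data; a real client would fill these lists from each server's tool listing.
tools_by_server = {
    "playwright": ["browser_navigate", "browser_snapshot"],
    "webscraping": ["webscraping_ai_fields", "webscraping_ai_question"],
    "database": ["execute_query"],
}
tool_index = build_tool_index(tools_by_server)
```

Checking for collisions up front matters because two servers can publish a tool with the same name, in which case the client has to decide which one a call should be routed to.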

Benefits of Multi-Server Integration

  • Specialized functionality: Each server handles what it does best
  • Modularity: Easier to maintain and update individual components
  • Scalability: Distribute workload across multiple servers
  • Flexibility: Mix and match capabilities based on project needs
  • Resilience: If one server fails, others can continue operating

Setting Up Multiple MCP Servers

Configuration in Claude Desktop

To configure multiple MCP servers, edit your Claude Desktop configuration file (claude_desktop_config.json):

{
  "mcpServers": {
    "playwright-server": {
      "command": "npx",
      "args": ["-y", "@executeautomation/playwright-mcp-server"]
    },
    "webscraping-server": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-webscraping"]
    },
    "database-server": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-postgres",
        "postgresql://user:pass@localhost/scraping_db"
      ]
    },
    "filesystem-server": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/path/to/scraping/output"
      ]
    }
  }
}
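Before launching anything, it can help to sanity-check the configuration. The following is a minimal sketch, assuming the file follows the `mcpServers` layout shown above; `load_server_commands` is a hypothetical helper that only parses the JSON and does not start any server.

```python
import json


def load_server_commands(config_text):
    """Return {server_name: [command, *args]} from a Claude Desktop style config."""
    config = json.loads(config_text)
    commands = {}
    for name, spec in config.get("mcpServers", {}).items():
        if "command" not in spec:
            raise ValueError(f"Server '{name}' is missing a 'command' entry")
        commands[name] = [spec["command"], *spec.get("args", [])]
    return commands


# Inline sample so the sketch runs without touching the real config file.
example_config = """
{
  "mcpServers": {
    "playwright-server": {
      "command": "npx",
      "args": ["-y", "@executeautomation/playwright-mcp-server"]
    }
  }
}
"""
commands = load_server_commands(example_config)
```

In practice you would read the text from claude_desktop_config.json instead of the inline sample.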

Python-Based MCP Client Configuration

When building a custom MCP client in Python, you can connect to multiple servers programmatically:

import asyncio
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def create_multi_server_client(stack):
    """Initialize connections to multiple MCP servers.

    Every transport and session is registered on `stack` (an AsyncExitStack),
    so closing the stack shuts all of them down in reverse order.
    """

    # Server configurations
    servers = {
        'playwright': StdioServerParameters(
            command='npx',
            args=['-y', '@executeautomation/playwright-mcp-server']
        ),
        'webscraping': StdioServerParameters(
            command='npx',
            args=['-y', '@modelcontextprotocol/server-webscraping']
        ),
        'database': StdioServerParameters(
            command='npx',
            # The reference Postgres server takes the connection string as an argument
            args=['-y', '@modelcontextprotocol/server-postgres',
                  'postgresql://user:pass@localhost/db']
        ),
        'filesystem': StdioServerParameters(
            command='npx',
            # Allowed directories are passed as arguments
            args=['-y', '@modelcontextprotocol/server-filesystem',
                  '/path/to/scraping/output']
        )
    }

    # Create sessions for each server
    sessions = {}

    for server_name, params in servers.items():
        # stdio_client and ClientSession are async context managers, not awaitables
        read, write = await stack.enter_async_context(stdio_client(params))
        session = await stack.enter_async_context(ClientSession(read, write))
        await session.initialize()

        sessions[server_name] = session
        print(f"Connected to {server_name} MCP server")

    return sessions

# Usage
async def main():
    async with AsyncExitStack() as stack:
        sessions = await create_multi_server_client(stack)

        # List available tools from all servers
        for server_name, session in sessions.items():
            result = await session.list_tools()  # result object with a .tools list
            print(f"\n{server_name} tools: {[tool.name for tool in result.tools]}")

asyncio.run(main())

Building a Multi-Server Scraping Workflow

Example: E-commerce Product Scraper

Here's a practical example that combines multiple MCP servers to scrape product data, process it, and store it in a database:

import asyncio
import json

async def scrape_ecommerce_products(sessions, url):
    """
    Multi-server workflow for scraping e-commerce products

    Flow:
    1. Playwright server - Navigate and extract data
    2. WebScraping server - Parse additional content
    3. Database server - Store results
    4. Filesystem server - Save images
    """

    playwright_session = sessions['playwright']
    webscraping_session = sessions['webscraping']
    db_session = sessions['database']
    fs_session = sessions['filesystem']

    # Step 1: Navigate using Playwright for JavaScript-heavy pages
    # Tool names differ between Playwright MCP servers; confirm them with list_tools()
    print("Navigating to product page...")
    await playwright_session.call_tool(
        'browser_navigate',
        arguments={'url': url}
    )

    # Wait for content to load
    await playwright_session.call_tool(
        'browser_wait_for',
        arguments={'time': 2}
    )

    # Take snapshot of the page
    snapshot_result = await playwright_session.call_tool(
        'browser_snapshot',
        arguments={}
    )

    # Step 2: Extract product links from snapshot
    product_links = []
    # Parse snapshot to find product URLs
    # (Actual parsing logic would go here)

    # Step 3: For each product, use WebScraping server for detailed extraction
    products = []

    for product_url in product_links[:5]:  # Limit to 5 for example
        print(f"Scraping product: {product_url}")

        # Use WebScraping.AI server to extract structured data
        result = await webscraping_session.call_tool(
            'webscraping_ai_fields',
            arguments={
                'url': product_url,
                'fields': {
                    'title': 'Product title',
                    'price': 'Current price',
                    'description': 'Product description',
                    'image_url': 'Main product image URL',
                    'rating': 'Customer rating',
                    'availability': 'Stock status'
                }
            }
        )

        # call_tool returns a result object, not a dict; this assumes the
        # server replies with JSON text in the first content item
        product_data = json.loads(result.content[0].text)

        products.append(product_data)

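    # Step 4: Store in database
    # Note: 'execute_query' must be a write-capable tool on your database server.
    # The reference @modelcontextprotocol/server-postgres exposes only a read-only
    # 'query' tool, so confirm the tool name and arguments with list_tools().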
    print("Storing products in database...")
    for product in products:
        await db_session.call_tool(
            'execute_query',
            arguments={
                'query': '''
                    INSERT INTO products
                    (title, price, description, image_url, rating, availability, scraped_at)
                    VALUES ($1, $2, $3, $4, $5, $6, NOW())
                    ON CONFLICT (title) DO UPDATE
                    SET price = EXCLUDED.price,
                        availability = EXCLUDED.availability,
                        scraped_at = NOW()
                ''',
                'params': [
                    product['title'],
                    product['price'],
                    product['description'],
                    product['image_url'],
                    product['rating'],
                    product['availability']
                ]
            }
        )

    # Step 5: Download and save product images using filesystem server
    print("Saving product images...")
    for idx, product in enumerate(products):
        if product.get('image_url'):
            # Download image (you'd need an HTTP client here)
            # Then save using filesystem server. Its write_file tool takes text
            # content, so binary image data would need encoding (e.g. base64)
            await fs_session.call_tool(
                'write_file',
                arguments={
                    'path': f'/path/to/scraping/output/images/product_{idx}.jpg',
                    'content': '...'  # Image binary data
                }
            )

    return products
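The link-parsing step left open in the workflow above could be sketched roughly as follows. This assumes the snapshot is plain text in which each link target appears on its own line as `- /url: <href>`; that format, the `/products/` path filter, and the `extract_product_links` helper are all assumptions, so adjust the pattern to whatever your Playwright server actually returns.

```python
import re
from urllib.parse import urljoin

# Assumed snapshot line format: "- /url: <href>". Change this to match your server's output.
URL_LINE = re.compile(r"^\s*-\s*/url:\s*(\S+)\s*$", re.MULTILINE)


def extract_product_links(snapshot_text, base_url, path_prefix="/products/"):
    """Return absolute, de-duplicated product URLs found in a snapshot string."""
    links = []
    for href in URL_LINE.findall(snapshot_text):
        absolute = urljoin(base_url, href.strip("\"'"))
        if path_prefix in absolute and absolute not in links:
            links.append(absolute)
    return links


# Hand-written sample in the assumed format, for demonstration only.
sample_snapshot = """
- link "Blue Widget" [ref=e12]:
  - /url: /products/blue-widget
- link "About us" [ref=e13]:
  - /url: /about
- link "Blue Widget (image)" [ref=e14]:
  - /url: /products/blue-widget
"""
product_links = extract_product_links(sample_snapshot, "https://example.com/shop")
```

Resolving relative links against the page URL and removing duplicates keeps the later per-product loop from fetching the same page twice.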

JavaScript/Node.js Implementation

For Node.js-based workflows, you can use the MCP SDK similarly:

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

async function createMultiServerWorkflow() {
  // Initialize multiple server connections
  const servers = {
    playwright: new Client({
      name: 'playwright-client',
      version: '1.0.0'
    }, {
      capabilities: {}
    }),
    webscraping: new Client({
      name: 'webscraping-client',
      version: '1.0.0'
    }, {
      capabilities: {}
    })
  };

  // Connect to Playwright server
  const playwrightTransport = new StdioClientTransport({
    command: 'npx',
    args: ['-y', '@executeautomation/playwright-mcp-server']
  });
  await servers.playwright.connect(playwrightTransport);

  // Connect to WebScraping server
  const webscrapingTransport = new StdioClientTransport({
    command: 'npx',
    args: ['-y', '@modelcontextprotocol/server-webscraping']
  });
  await servers.webscraping.connect(webscrapingTransport);

  return servers;
}

async function scrapeWithMultipleServers(url) {
  const servers = await createMultiServerWorkflow();

  try {
    // Use Playwright for dynamic content
    // (callTool wraps the tools/call request and its result schema)
    await servers.playwright.callTool({
      name: 'browser_navigate',
      arguments: { url }
    });

    // Get page snapshot
    const snapshot = await servers.playwright.callTool({
      name: 'browser_snapshot',
      arguments: {}
    });

    // Extract specific fields using WebScraping.AI
    const productData = await servers.webscraping.callTool({
      name: 'webscraping_ai_fields',
      arguments: {
        url: url,
        fields: {
          title: 'Product title',
          price: 'Product price',
          description: 'Product description'
        }
      }
    });

    console.log('Scraped data:', productData);
    return productData;

  } finally {
    // Clean up connections
    await servers.playwright.close();
    await servers.webscraping.close();
  }
}

// Execute the workflow
scrapeWithMultipleServers('https://example.com/products/item-123')
  .catch(console.error);  // surface connection or tool errors instead of an unhandled rejection

Best Practices for Multi-Server Integration

1. Error Handling and Fallbacks

When working with multiple servers, implement robust error handling so that a failure in one server does not abort the whole workflow, much as you would when handling browser events in Puppeteer:

async def call_tool_with_fallback(sessions, primary_server, fallback_server, tool_name, arguments):
    """Call a tool with automatic fallback to another server"""
    try:
        result = await sessions[primary_server].call_tool(tool_name, arguments)
        # Tool-level failures come back flagged with isError instead of raising
        if result.isError:
            raise RuntimeError(f"{tool_name} returned an error result")
        return result
    except Exception as e:
        print(f"Primary server {primary_server} failed: {e}")
        print(f"Attempting fallback to {fallback_server}...")
        try:
            # The fallback server must expose a tool with the same name and arguments
            result = await sessions[fallback_server].call_tool(tool_name, arguments)
            return result
        except Exception as fallback_error:
            print(f"Fallback server also failed: {fallback_error}")
            raise
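Transient failures are often worth retrying before switching servers. Below is a minimal sketch of retry with exponential backoff; `call_with_retry` is a hypothetical helper, and `flaky_operation` is a stand-in for a real tool call so the example runs without any server.

```python
import asyncio


async def call_with_retry(operation, attempts=3, base_delay=0.5):
    """Await operation() up to `attempts` times, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the caller decide on a fallback
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


# Stand-in for a flaky tool call; it fails twice and then succeeds.
calls = {"count": 0}


async def flaky_operation():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"
```

A real caller might pass something like `lambda: session.call_tool(tool_name, arguments)` as the operation and fall back to another server only after the retries are exhausted.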

2. Resource Management

Properly manage connections to avoid resource leaks:

from contextlib import AsyncExitStack

async def workflow_with_cleanup(urls):
    # Assumes create_multi_server_client(stack) registers every transport and
    # session on the stack, so leaving this block closes them in reverse order,
    # even when the workflow raises
    async with AsyncExitStack() as stack:
        try:
            sessions = await create_multi_server_client(stack)

            for url in urls:
                await scrape_ecommerce_products(sessions, url)

        except Exception as e:
            print(f"Workflow error: {e}")
            raise
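One way to guarantee cleanup for a variable number of connections is `contextlib.AsyncExitStack` from the standard library. The sketch below uses a hypothetical `fake_connection` context manager in place of real server connections to show that everything opened is closed in reverse order, whether or not the workflow fails.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []


@asynccontextmanager
async def fake_connection(name):
    """Stand-in for a server connection; records open and close events."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


async def run_with_stack(names, fail=False):
    """Open every connection on one stack, optionally simulating a failure."""
    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(fake_connection(name)) for name in names
        ]
        if fail:
            raise RuntimeError("simulated workflow error")
        return connections
```

Because the stack owns every connection, there is no need to loop over sessions in a `finally` block or to remember which ones were opened successfully.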

3. Parallel Processing

Run independent operations concurrently when you need to process multiple pages in parallel:

async def parallel_multi_server_scraping(urls, sessions):
    """Process multiple URLs concurrently using multiple servers"""

    async def scrape_single_url(url):
        try:
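            # Each URL gets its own workflow, but all workflows share the same
            # sessions; a stateful server (such as a single browser page) may
            # need one session per worker to avoid interleaved navigation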
            return await scrape_ecommerce_products(sessions, url)
        except Exception as e:
            print(f"Error scraping {url}: {e}")
            return None

    # Process up to 5 URLs concurrently
    semaphore = asyncio.Semaphore(5)

    async def bounded_scrape(url):
        async with semaphore:
            return await scrape_single_url(url)

    results = await asyncio.gather(
        *[bounded_scrape(url) for url in urls],
        return_exceptions=True
    )

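    # return_exceptions=True can place exception objects in results, so drop those too
    return [r for r in results if r is not None and not isinstance(r, BaseException)]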
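Concurrent workflows that share one stateful session can step on each other. A minimal sketch of one mitigation is to serialize calls to that session with a lock; `SerializedSession` is a hypothetical wrapper, and `FakeBrowserSession` is a test double that only counts overlapping calls.

```python
import asyncio


class SerializedSession:
    """Wrap a session-like object so only one call_tool runs at a time."""

    def __init__(self, session):
        self._session = session
        self._lock = asyncio.Lock()

    async def call_tool(self, name, arguments=None):
        async with self._lock:
            return await self._session.call_tool(name, arguments)


class FakeBrowserSession:
    """Test double that records the highest number of overlapping calls."""

    def __init__(self):
        self.active = 0
        self.max_active = 0

    async def call_tool(self, name, arguments=None):
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        await asyncio.sleep(0.01)
        self.active -= 1
        return name


async def demo():
    fake = FakeBrowserSession()
    wrapped = SerializedSession(fake)
    await asyncio.gather(*[wrapped.call_tool("browser_navigate") for _ in range(5)])
    return fake.max_active
```

Note that this only serializes individual calls. A multi-step sequence such as navigate followed by snapshot would still need a lock held around the whole sequence, or a separate browser session per worker.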

4. Server Health Monitoring

Implement health checks for your MCP servers:

async def check_server_health(sessions):
    """Verify all MCP servers are responding"""
    health_status = {}

    for server_name, session in sessions.items():
        try:
            # Try listing tools as a health check
            result = await session.list_tools()
            health_status[server_name] = {
                'status': 'healthy',
                'tools_count': len(result.tools)  # list_tools returns an object with a .tools list
            }
        except Exception as e:
            health_status[server_name] = {
                'status': 'unhealthy',
                'error': str(e)
            }

    return health_status
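A hung server never raises, so a health check usually needs a timeout as well. The sketch below wraps the check in `asyncio.wait_for`; `check_with_timeout` is a hypothetical helper, and the two session classes are test doubles rather than real MCP sessions.

```python
import asyncio


async def check_with_timeout(session, timeout=5.0):
    """Return 'healthy', 'timeout', or 'unhealthy' for one session-like object."""
    try:
        await asyncio.wait_for(session.list_tools(), timeout=timeout)
        return "healthy"
    except asyncio.TimeoutError:
        return "timeout"
    except Exception:
        return "unhealthy"


class SlowSession:
    """Test double whose list_tools takes a full second to respond."""

    async def list_tools(self):
        await asyncio.sleep(1)


class FastSession:
    """Test double that responds immediately."""

    async def list_tools(self):
        return []
```

Distinguishing a timeout from an error makes it easier to tell a server that is overloaded from one that has crashed.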

Common Patterns and Use Cases

Pattern 1: Browser + API Hybrid Scraping

Combine browser automation for JavaScript-rendered content with API calls for structured data:

async def hybrid_scraping_workflow(sessions, base_url):
    """Use browser for navigation, API for data extraction"""

    # Navigate with browser
    await sessions['playwright'].call_tool(
        'browser_navigate',
        arguments={'url': base_url}
    )

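    # Get current URL (might have redirected)
    url_result = await sessions['playwright'].call_tool(
        'browser_evaluate',
        arguments={'function': '() => window.location.href'}
    )
    # call_tool returns a result object; the text format is server-specific,
    # so this assumes the first content item holds just the URL
    current_url = url_result.content[0].text.strip().strip('"')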

    # Extract data via API for efficiency
    data = await sessions['webscraping'].call_tool(
        'webscraping_ai_fields',
        arguments={
            'url': current_url,
            'fields': {
                'title': 'Page title',
                'content': 'Main content'
            }
        }
    )

    return data
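Passing data from one server to the next usually means unpacking a tool result first. The following sketch assumes a result exposes a `content` list whose text items carry a `.text` attribute and that the server replied with JSON; `result_to_json` is a hypothetical helper, and the `SimpleNamespace` object merely imitates that shape.

```python
import json
from types import SimpleNamespace


def result_to_json(result):
    """Join the text parts of a tool result and parse them as JSON.

    Returns None when the combined text is not valid JSON.
    """
    text = "".join(getattr(item, "text", "") or "" for item in result.content)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


# Stand-in shaped like a tool result, for demonstration only.
fake_result = SimpleNamespace(
    content=[SimpleNamespace(type="text", text='{"title": "Blue Widget", "price": "19.99"}')]
)
parsed = result_to_json(fake_result)
```

Returning `None` for unparseable output gives the caller a single condition to check before deciding whether to fall back to another extraction method.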

Pattern 2: Data Pipeline with Validation

Create a multi-stage pipeline with validation at each step:

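import json

async def validated_scraping_pipeline(sessions, urls):
    """Multi-server pipeline with validation"""

    results = []

    for url in urls:
        # Stage 1: Scrape
        response = await sessions['webscraping'].call_tool(
            'webscraping_ai_question',
            arguments={
                'url': url,
                'question': 'Extract all product information as JSON'
            }
        )

        # Assumes the answer comes back as JSON text in the first content item;
        # anything else is treated as empty data and fails validation
        try:
            raw_data = json.loads(response.content[0].text)
        except (IndexError, AttributeError, json.JSONDecodeError):
            raw_data = {}

        # Stage 2: Validate
        if not validate_product_data(raw_data):
            print(f"Invalid data from {url}, using browser fallback")
            # Fallback to browser-based extraction
            # (browser_based_extraction is a helper you would define yourself)
            raw_data = await browser_based_extraction(sessions['playwright'], url)

        # Stage 3: Transform (transform_product_data is likewise user-defined)
        cleaned_data = transform_product_data(raw_data)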

        # Stage 4: Store
        await sessions['database'].call_tool(
            'execute_query',
            arguments={
                'query': 'INSERT INTO products (...) VALUES (...)',
                'params': cleaned_data
            }
        )

        results.append(cleaned_data)

    return results

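def validate_product_data(data):
    """Validate scraped data meets requirements"""
    required_fields = ['title', 'price', 'description']
    # Reject non-dict input (None, strings, result objects) before checking keys
    return isinstance(data, dict) and all(field in data for field in required_fields)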
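The transform stage is left undefined above. As one possible shape for it, the sketch below trims text fields and converts price strings to `Decimal`. Both functions are illustrative assumptions, and the price parsing assumes US-style formatting with commas as thousands separators.

```python
import re
from decimal import Decimal, InvalidOperation


def parse_price(raw_price):
    """Convert strings such as '$1,299.00' to Decimal; return None if no number is found."""
    match = re.search(r"\d[\d,]*(?:\.\d+)?", str(raw_price))
    if not match:
        return None
    try:
        return Decimal(match.group().replace(",", ""))
    except InvalidOperation:
        return None


def transform_product_data(raw):
    """Trim text fields and normalise the price of one scraped product dict."""
    return {
        "title": str(raw.get("title", "")).strip(),
        "price": parse_price(raw.get("price")),
        "description": str(raw.get("description", "")).strip(),
    }
```

Using `Decimal` rather than `float` avoids rounding surprises when prices are later compared or stored in a numeric database column.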

Troubleshooting Multi-Server Setups

Connection Issues

If servers fail to connect:

  1. Verify each server package installs and starts on its own: npx -y <server-package> (not every server supports a --version flag, so watch for startup errors instead)
  2. Check server logs in Claude Desktop (Help → View Logs)
  3. Ensure environment variables are correctly set
  4. Test servers individually before combining them
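Some of these checks can be automated before any server is launched. A minimal sketch, assuming you know which launcher commands and environment variables your servers need; `preflight` is a hypothetical helper that only inspects the local environment.

```python
import os
import shutil


def preflight(commands, required_env, environ=None):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    environ = os.environ if environ is None else environ
    problems = []
    for command in commands:
        # shutil.which returns None when the executable is not on PATH
        if shutil.which(command) is None:
            problems.append(f"command not found on PATH: {command}")
    for name in required_env:
        if not environ.get(name):
            problems.append(f"environment variable not set: {name}")
    return problems
```

For the configuration in this article, a call might look like `preflight(["npx"], ["POSTGRES_CONNECTION_STRING"])`, with the variable list adjusted to whatever your servers actually read.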

Performance Optimization

  • Connection pooling: Reuse sessions across multiple operations
  • Caching: Cache responses from servers when appropriate
  • Timeout configuration: Set appropriate timeouts for each server type
  • Rate limiting: Implement rate limiting to avoid overwhelming servers

Conclusion

Integrating multiple MCP servers in a scraping workflow provides powerful capabilities for complex data extraction tasks. By combining specialized servers for browser automation, API access, data storage, and file operations, you can build robust, scalable scraping solutions that handle diverse requirements efficiently.

The key to success is proper architecture planning, robust error handling, and understanding each server's strengths. Start with simple two-server integrations and gradually expand as your requirements grow.

From there, a natural next step is to combine browser automation with specialized extraction methods to build more complete data collection systems.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl -G "https://api.webscraping.ai/ai/question" --data-urlencode "url=https://example.com" --data-urlencode "question=What is the main topic?" --data-urlencode "api_key=YOUR_API_KEY"

Extract structured data:

curl -G "https://api.webscraping.ai/ai/fields" --data-urlencode "url=https://example.com" --data-urlencode "fields[title]=Page title" --data-urlencode "fields[price]=Product price" --data-urlencode "api_key=YOUR_API_KEY"
