How do I Implement Scraping Automation with MCP Servers?

Implementing scraping automation with MCP (Model Context Protocol) servers enables you to build intelligent, self-managing data extraction workflows that can run on schedules, respond to events, and adapt to changing website structures. By combining MCP's tool-calling capabilities with automation frameworks, you can create robust scraping systems that require minimal manual intervention.

Understanding MCP-Based Scraping Automation

MCP servers provide a standardized interface for AI assistants to execute web scraping operations. Automation adds scheduling, monitoring, error recovery, and data pipeline management to these capabilities, creating end-to-end solutions for continuous data collection.

Key Components of Automated Scraping

  1. MCP Server: Exposes scraping tools (HTTP requests, browser automation, data extraction)
  2. Scheduler: Triggers scraping tasks at defined intervals
  3. Task Queue: Manages concurrent scraping jobs
  4. Data Pipeline: Processes, validates, and stores extracted data
  5. Monitor: Tracks performance, detects failures, and sends alerts
  6. AI Orchestrator: Uses Claude or another AI to adapt to page changes
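
The sketch below shows one way these components can meet in a single job record. The TypedDict and its field names are illustrative assumptions rather than a required schema; the server code later in this article uses plain dicts with similar keys:

from typing import Dict, Optional, TypedDict

class ScrapingJob(TypedDict):
    """One job record that the scheduler, task queue, pipeline, and monitor all read."""
    job_id: str                  # unique identifier used by the task queue
    url: str                     # target passed to the MCP scraping tool
    selectors: Dict[str, str]    # field name -> CSS selector for extraction
    schedule: Optional[str]      # cron expression consumed by the scheduler
    max_retries: int             # retry budget enforced before the monitor alerts
    enabled: bool                # toggled by the operator or AI orchestrator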

Building an Automated Scraping MCP Server

Basic Automated Scraper Structure

Here's an MCP server with built-in automation capabilities in Python. It registers the scraping tools and job store; the transport and scheduler wiring are sketched after the listing:

from mcp.server import Server
from mcp.types import Tool, TextContent, Resource
import httpx
from bs4 import BeautifulSoup
import asyncio
from datetime import datetime
import json
import logging
from typing import Dict, List, Any

app = Server("automated-scraper")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# In-memory storage for scraped data
scraping_results: Dict[str, List[Dict[str, Any]]] = {}
scraping_schedules: Dict[str, Dict[str, Any]] = {}

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="create_scraping_job",
            description="Create an automated scraping job with scheduling",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {"type": "string", "description": "Unique job identifier"},
                    "url": {"type": "string", "description": "Target URL to scrape"},
                    "selectors": {
                        "type": "object",
                        "description": "CSS selectors for data extraction",
                        "additionalProperties": {"type": "string"}
                    },
                    "schedule": {
                        "type": "string",
                        "description": "Cron expression (e.g., '0 */6 * * *' for every 6 hours)"
                    },
                    "max_retries": {"type": "integer", "default": 3}
                },
                "required": ["job_id", "url", "selectors"]
            }
        ),
        Tool(
            name="execute_scraping_job",
            description="Execute a scraping job immediately",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {"type": "string", "description": "Job identifier to execute"}
                },
                "required": ["job_id"]
            }
        ),
        Tool(
            name="get_scraping_results",
            description="Retrieve results from a scraping job",
            inputSchema={
                "type": "object",
                "properties": {
                    "job_id": {"type": "string", "description": "Job identifier"},
                    "limit": {"type": "integer", "description": "Max results to return", "default": 100}
                },
                "required": ["job_id"]
            }
        ),
        Tool(
            name="list_active_jobs",
            description="List all active scraping jobs",
            inputSchema={"type": "object", "properties": {}}
        )
    ]

async def scrape_url(url: str, selectors: Dict[str, str]) -> Dict[str, Any]:
    """Perform actual web scraping"""
    async with httpx.AsyncClient(timeout=30.0) as client:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = await client.get(url, headers=headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        result = {
            "timestamp": datetime.utcnow().isoformat(),
            "url": url,
            "status_code": response.status_code,
            "data": {}
        }

        for field, selector in selectors.items():
            elements = soup.select(selector)
            if len(elements) == 1:
                result["data"][field] = elements[0].get_text(strip=True)
            else:
                result["data"][field] = [el.get_text(strip=True) for el in elements]

        return result

async def execute_job_with_retry(job_config: Dict[str, Any]) -> Dict[str, Any]:
    """Execute scraping job with retry logic"""
    job_id = job_config["job_id"]
    max_retries = job_config.get("max_retries", 3)

    for attempt in range(max_retries):
        try:
            logger.info(f"Executing job {job_id}, attempt {attempt + 1}/{max_retries}")
            result = await scrape_url(job_config["url"], job_config["selectors"])

            # Store results
            if job_id not in scraping_results:
                scraping_results[job_id] = []
            scraping_results[job_id].append(result)

            logger.info(f"Job {job_id} completed successfully")
            return result

        except Exception as e:
            logger.error(f"Job {job_id} failed on attempt {attempt + 1}: {str(e)}")
            if attempt == max_retries - 1:
                return {
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat(),
                    "job_id": job_id
                }
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        if name == "create_scraping_job":
            job_id = arguments["job_id"]
            scraping_schedules[job_id] = {
                "job_id": job_id,
                "url": arguments["url"],
                "selectors": arguments["selectors"],
                "schedule": arguments.get("schedule"),
                "max_retries": arguments.get("max_retries", 3),
                "created_at": datetime.utcnow().isoformat(),
                "enabled": True
            }

            return [TextContent(
                type="text",
                text=f"Created scraping job '{job_id}' for {arguments['url']}\n" +
                     f"Schedule: {arguments.get('schedule', 'manual')}\n" +
                     f"Selectors: {json.dumps(arguments['selectors'], indent=2)}"
            )]

        elif name == "execute_scraping_job":
            job_id = arguments["job_id"]
            if job_id not in scraping_schedules:
                raise ValueError(f"Job '{job_id}' not found")

            result = await execute_job_with_retry(scraping_schedules[job_id])

            return [TextContent(
                type="text",
                text=f"Scraping job '{job_id}' executed:\n{json.dumps(result, indent=2)}"
            )]

        elif name == "get_scraping_results":
            job_id = arguments["job_id"]
            limit = arguments.get("limit", 100)

            if job_id not in scraping_results:
                return [TextContent(type="text", text=f"No results for job '{job_id}'")]

            results = scraping_results[job_id][-limit:]

            return [TextContent(
                type="text",
                text=json.dumps(results, indent=2)
            )]

        elif name == "list_active_jobs":
            jobs_info = []
            for job_id, config in scraping_schedules.items():
                result_count = len(scraping_results.get(job_id, []))
                jobs_info.append({
                    "job_id": job_id,
                    "url": config["url"],
                    "schedule": config.get("schedule", "manual"),
                    "enabled": config["enabled"],
                    "results_count": result_count
                })

            return [TextContent(
                type="text",
                text=json.dumps(jobs_info, indent=2)
            )]

        else:
            raise ValueError(f"Unknown tool: {name}")

    except Exception as e:
        logger.error(f"Tool execution error: {str(e)}")
        return [TextContent(type="text", text=f"Error: {str(e)}")]

# Resource handler for accessing job configurations
@app.list_resources()
async def list_resources() -> list[Resource]:
    resources = []
    for job_id in scraping_schedules.keys():
        resources.append(
            Resource(
                uri=f"scraper://jobs/{job_id}/config",
                name=f"Job Config: {job_id}",
                mimeType="application/json",
                description=f"Configuration for scraping job {job_id}"
            )
        )
    return resources
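
The listing above registers the tools and resources but, as written, never starts a transport, and the stored cron expressions are not yet executed by anything. Below is a minimal sketch of the missing wiring, assuming the stdio transport from the same mcp package and the third-party croniter library for cron parsing; the run_scheduler helper and its polling interval are illustrative assumptions:

from croniter import croniter
from mcp.server.stdio import stdio_server

async def run_scheduler(poll_seconds: int = 30):
    """Naive scheduler loop: fire each enabled job once its cron time passes."""
    next_runs: Dict[str, datetime] = {}
    while True:
        now = datetime.utcnow()
        for job_id, config in list(scraping_schedules.items()):
            expr = config.get("schedule")
            if not expr or not config.get("enabled"):
                continue
            if job_id not in next_runs:
                next_runs[job_id] = croniter(expr, now).get_next(datetime)
            if now >= next_runs[job_id]:
                asyncio.create_task(execute_job_with_retry(config))
                next_runs[job_id] = croniter(expr, now).get_next(datetime)
        await asyncio.sleep(poll_seconds)

async def main():
    # Run the scheduler loop alongside the MCP stdio transport
    asyncio.create_task(run_scheduler())
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())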

JavaScript/TypeScript Implementation

Here's a comparable implementation in Node.js, which also wires up cron-based scheduling with node-cron:

#!/usr/bin/env node

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";
import * as cheerio from "cheerio";
import cron from "node-cron";

const server = new Server(
  {
    name: "automated-scraper-mcp",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      resources: {}
    },
  }
);

// Storage
const scrapingResults = new Map();
const scrapingJobs = new Map();
const cronTasks = new Map();

// Scraping function with retry logic
async function scrapeWithRetry(url, selectors, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await axios.get(url, {
        headers: {
          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        },
        timeout: 30000
      });

      const $ = cheerio.load(response.data);
      const data = {};

      for (const [field, selector] of Object.entries(selectors)) {
        const elements = $(selector);
        if (elements.length === 1) {
          data[field] = elements.text().trim();
        } else {
          data[field] = elements.map((i, el) => $(el).text().trim()).get();
        }
      }

      return {
        timestamp: new Date().toISOString(),
        url: url,
        statusCode: response.status,
        data: data
      };

    } catch (error) {
      console.error(`Attempt ${attempt + 1} failed: ${error.message}`);
      if (attempt === maxRetries - 1) {
        throw error;
      }
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
    }
  }
}

// Execute job
async function executeJob(jobId) {
  const job = scrapingJobs.get(jobId);
  if (!job) {
    throw new Error(`Job ${jobId} not found`);
  }

  console.log(`Executing job ${jobId} at ${new Date().toISOString()}`);

  try {
    const result = await scrapeWithRetry(job.url, job.selectors, job.maxRetries);

    if (!scrapingResults.has(jobId)) {
      scrapingResults.set(jobId, []);
    }
    scrapingResults.get(jobId).push(result);

    console.log(`Job ${jobId} completed successfully`);
    return result;
  } catch (error) {
    console.error(`Job ${jobId} failed: ${error.message}`);
    throw error;
  }
}

// Schedule job
function scheduleJob(jobId, cronExpression) {
  if (cronTasks.has(jobId)) {
    cronTasks.get(jobId).stop();
  }

  const task = cron.schedule(cronExpression, async () => {
    try {
      await executeJob(jobId);
    } catch (error) {
      console.error(`Scheduled execution of ${jobId} failed:`, error);
    }
  });

  cronTasks.set(jobId, task);
  console.log(`Scheduled job ${jobId} with cron: ${cronExpression}`);
}

// Define tools
server.setRequestHandler("tools/list", async () => {
  return {
    tools: [
      {
        name: "create_scraping_job",
        description: "Create an automated scraping job with optional scheduling",
        inputSchema: {
          type: "object",
          properties: {
            jobId: { type: "string", description: "Unique job identifier" },
            url: { type: "string", description: "Target URL to scrape" },
            selectors: {
              type: "object",
              description: "CSS selectors for data extraction",
              additionalProperties: { type: "string" }
            },
            schedule: {
              type: "string",
              description: "Optional cron expression (e.g., '*/30 * * * *')"
            },
            maxRetries: { type: "number", default: 3 }
          },
          required: ["jobId", "url", "selectors"]
        }
      },
      {
        name: "execute_job",
        description: "Execute a scraping job immediately",
        inputSchema: {
          type: "object",
          properties: {
            jobId: { type: "string", description: "Job to execute" }
          },
          required: ["jobId"]
        }
      },
      {
        name: "get_results",
        description: "Get scraping results for a job",
        inputSchema: {
          type: "object",
          properties: {
            jobId: { type: "string" },
            limit: { type: "number", default: 100 }
          },
          required: ["jobId"]
        }
      },
      {
        name: "stop_job",
        description: "Stop a scheduled job",
        inputSchema: {
          type: "object",
          properties: {
            jobId: { type: "string" }
          },
          required: ["jobId"]
        }
      }
    ]
  };
});

// Handle tool calls
server.setRequestHandler("tools/call", async (request) => {
  const { name, arguments: args } = request.params;

  try {
    if (name === "create_scraping_job") {
      scrapingJobs.set(args.jobId, {
        jobId: args.jobId,
        url: args.url,
        selectors: args.selectors,
        schedule: args.schedule,
        maxRetries: args.maxRetries || 3,
        createdAt: new Date().toISOString(),
        enabled: true
      });

      if (args.schedule) {
        scheduleJob(args.jobId, args.schedule);
      }

      return {
        content: [{
          type: "text",
          text: `Created job '${args.jobId}' for ${args.url}\n` +
                `Schedule: ${args.schedule || 'manual'}\n` +
                `Selectors: ${JSON.stringify(args.selectors, null, 2)}`
        }]
      };
    }

    if (name === "execute_job") {
      const result = await executeJob(args.jobId);
      return {
        content: [{
          type: "text",
          text: JSON.stringify(result, null, 2)
        }]
      };
    }

    if (name === "get_results") {
      const results = scrapingResults.get(args.jobId) || [];
      const limited = results.slice(-args.limit);
      return {
        content: [{
          type: "text",
          text: JSON.stringify(limited, null, 2)
        }]
      };
    }

    if (name === "stop_job") {
      const task = cronTasks.get(args.jobId);
      if (task) {
        task.stop();
        cronTasks.delete(args.jobId);
      }
      return {
        content: [{
          type: "text",
          text: `Stopped job '${args.jobId}'`
        }]
      };
    }

    throw new Error(`Unknown tool: ${name}`);
  } catch (error) {
    return {
      content: [{
        type: "text",
        text: `Error: ${error.message}`
      }],
      isError: true
    };
  }
});

// Start server
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Automated Scraper MCP server running");

Advanced Automation Patterns

Event-Driven Scraping

Trigger scraping based on external events rather than schedules:

import asyncio
from typing import Callable, Dict

class EventDrivenScraper:
    def __init__(self):
        self.event_handlers: Dict[str, Callable] = {}

    def on_event(self, event_type: str):
        """Decorator to register event handlers"""
        def decorator(func: Callable):
            self.event_handlers[event_type] = func
            return func
        return decorator

    async def emit(self, event_type: str, data: dict):
        """Emit an event and trigger associated scraping"""
        handler = self.event_handlers.get(event_type)
        if handler:
            await handler(data)

# Usage
scraper = EventDrivenScraper()

@scraper.on_event("price_check")
async def scrape_prices(data: dict):
    url = data["url"]
    # Reuse the scrape_url helper defined in the MCP server above
    result = await scrape_url(url, {"price": ".product-price"})
    if result["data"]["price"]:
        # Alert if price changed
        print(f"Price update detected: {result['data']['price']}")

# Trigger the event
asyncio.run(scraper.emit("price_check", {"url": "https://example.com/product"}))
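
In a real deployment the emit call usually comes from an external source such as a webhook rather than from the script itself. Here is a minimal sketch of such a trigger, assuming FastAPI as the webhook receiver (FastAPI is not part of the original example):

from fastapi import FastAPI

api = FastAPI()

@api.post("/webhooks/{event_type}")
async def receive_webhook(event_type: str, payload: dict):
    # Forward the webhook payload to the EventDrivenScraper defined above
    await scraper.emit(event_type, payload)
    return {"status": "accepted"}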

Parallel Scraping with Concurrency Control

import asyncio
from asyncio import Semaphore

class ConcurrentScraper:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = Semaphore(max_concurrent)
        self.results = []

    async def scrape_one(self, url: str, selectors: dict):
        """Scrape single URL with semaphore"""
        async with self.semaphore:
            try:
                result = await scrape_url(url, selectors)
                self.results.append(result)
                return result
            except Exception as e:
                logger.error(f"Failed to scrape {url}: {e}")
                return {"error": str(e), "url": url}

    async def scrape_many(self, urls: list, selectors: dict):
        """Scrape multiple URLs concurrently"""
        tasks = [self.scrape_one(url, selectors) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
scraper = ConcurrentScraper(max_concurrent=3)
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3"
]
results = asyncio.run(scraper.scrape_many(urls, {"title": "h1", "content": ".article"}))
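
To feed these concurrent results back into the job store used by the MCP server earlier, a small wrapper is enough. The helper below and its name are assumptions for illustration:

async def scrape_many_for_job(job_id: str, urls: list, selectors: dict,
                              max_concurrent: int = 3):
    """Scrape a URL list concurrently and append the results to the job's history."""
    scraper = ConcurrentScraper(max_concurrent=max_concurrent)
    results = await scraper.scrape_many(urls, selectors)
    scraping_results.setdefault(job_id, []).extend(results)
    return results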

Data Pipeline Integration

Automatically process and store scraped data:

import { promises as fs } from 'fs';

class DataPipeline {
  constructor() {
    this.processors = [];
  }

  addProcessor(processor) {
    this.processors.push(processor);
    return this;
  }

  async process(data) {
    let result = data;
    for (const processor of this.processors) {
      result = await processor(result);
    }
    return result;
  }
}

// Define processors
const cleanData = async (data) => {
  // Remove empty fields
  const cleaned = {};
  for (const [key, value] of Object.entries(data)) {
    if (value && value.length > 0) {
      cleaned[key] = value;
    }
  }
  return cleaned;
};

const validateData = async (data) => {
  // Add validation logic
  if (!data.title || !data.url) {
    throw new Error('Missing required fields');
  }
  return data;
};

const saveToFile = async (data) => {
  const filename = `scraped-${Date.now()}.json`;
  await fs.writeFile(filename, JSON.stringify(data, null, 2));
  console.log(`Saved to ${filename}`);
  return data;
};

// Create pipeline
const dataPipeline = new DataPipeline()
  .addProcessor(cleanData)
  .addProcessor(validateData)
  .addProcessor(saveToFile);

// Use in scraping workflow
async function scrapeAndProcess(url, selectors) {
  const rawData = await scrapeWithRetry(url, selectors);
  const processedData = await dataPipeline.process(rawData.data);
  return processedData;
}

Monitoring and Alerting

Health Checks and Monitoring

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional

@dataclass
class JobHealth:
    job_id: str
    last_success: Optional[datetime]
    last_failure: Optional[datetime]
    consecutive_failures: int
    success_rate: float

class HealthMonitor:
    def __init__(self):
        self.health_data: Dict[str, JobHealth] = {}

    def record_success(self, job_id: str):
        if job_id not in self.health_data:
            self.health_data[job_id] = JobHealth(
                job_id=job_id,
                last_success=None,
                last_failure=None,
                consecutive_failures=0,
                success_rate=1.0
            )

        health = self.health_data[job_id]
        health.last_success = datetime.utcnow()
        health.consecutive_failures = 0
        # Update success rate calculation

    async def record_failure(self, job_id: str):
        if job_id not in self.health_data:
            self.health_data[job_id] = JobHealth(
                job_id=job_id,
                last_success=None,
                last_failure=None,
                consecutive_failures=0,
                success_rate=0.0
            )

        health = self.health_data[job_id]
        health.last_failure = datetime.utcnow()
        health.consecutive_failures += 1

        # Alert if too many failures
        if health.consecutive_failures >= 3:
            await self.send_alert(job_id, "Multiple consecutive failures")

    async def send_alert(self, job_id: str, message: str):
        # Integration with notification service
        logger.warning(f"ALERT for {job_id}: {message}")
        # Could send email, Slack message, etc.
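
For the monitor to be useful, its record methods have to be called from the job runner. Here is a minimal sketch of wiring it into execute_job_with_retry from the server above; the monitor instance and wrapper name are assumptions:

from typing import Any, Dict

monitor = HealthMonitor()

async def execute_job_monitored(job_config: Dict[str, Any]) -> Dict[str, Any]:
    """Run a job and record its outcome in the health monitor."""
    result = await execute_job_with_retry(job_config)
    if "error" in result:
        await monitor.record_failure(job_config["job_id"])
    else:
        monitor.record_success(job_config["job_id"])
    return result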

Adaptive Scraping with AI

Use Claude to adapt to website changes:

# Cache of AI-discovered selectors, keyed by URL
selector_cache: Dict[str, Dict[str, str]] = {}

# In practice, merge this branch into the call_tool handler defined earlier
# rather than registering a second handler
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "adaptive_scrape":
        url = arguments["url"]
        target_data = arguments["target_data"]  # e.g., "product price and title"

        # First attempt with cached selectors
        if url in selector_cache:
            try:
                result = await scrape_url(url, selector_cache[url])
                return [TextContent(type="text", text=json.dumps(result))]
            except Exception:
                pass

        # If failed, fetch HTML and ask AI to find selectors
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            html_sample = response.text[:5000]  # First 5KB

        # This would integrate with Claude API
        prompt = f"""
        Analyze this HTML and provide CSS selectors to extract: {target_data}

        HTML sample:
        {html_sample}

        Respond with JSON: {{"field_name": "css_selector"}}
        """

        # Get selectors from AI (pseudo-code)
        # new_selectors = await ask_claude(prompt)
        # selector_cache[url] = new_selectors
        # result = await scrape_url(url, new_selectors)

        return [TextContent(type="text", text="Adaptive scraping completed")]
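
The ask_claude call above is left as pseudo-code. One possible implementation, assuming the official anthropic Python SDK with an ANTHROPIC_API_KEY in the environment (the model name is a placeholder):

import json
from anthropic import AsyncAnthropic

claude = AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment

async def ask_claude(prompt: str) -> Dict[str, str]:
    """Ask Claude for CSS selectors and parse its JSON answer."""
    message = await claude.messages.create(
        model="claude-3-5-sonnet-latest",  # placeholder model name
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return json.loads(message.content[0].text)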

Deployment and Production Setup

Docker Deployment

# Dockerfile
FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm install

COPY . .

CMD ["node", "automated-scraper.js"]
# docker-compose.yml
version: '3.8'

services:
  scraper:
    build: .
    volumes:
      - ./data:/app/data
    environment:
      - LOG_LEVEL=info
    restart: unless-stopped

Configuration Management

# Install dependencies
npm install @modelcontextprotocol/sdk axios cheerio node-cron

# Configure MCP server in claude_desktop_config.json
{
  "mcpServers": {
    "automated-scraper": {
      "command": "node",
      "args": ["/path/to/automated-scraper.js"]
    }
  }
}

Best Practices for Automated Scraping

  1. Implement Rate Limiting: Respect target websites by limiting request frequency (see the rate-limiting sketch after this list)
  2. Use Exponential Backoff: Gradually increase retry delays after failures
  3. Monitor Resource Usage: Track memory and CPU to prevent resource exhaustion
  4. Log Everything: Maintain detailed logs for debugging and auditing
  5. Handle Pagination: Automatically detect and follow pagination patterns so multi-page listings are captured completely
  6. Validate Data: Implement schema validation for extracted data
  7. Set Timeouts: Configure reasonable timeouts to prevent hanging requests
  8. Cache Intelligently: Store results to avoid redundant scraping
  9. Respect robots.txt: Check and honor website scraping policies
  10. Plan for Failures: Design for graceful degradation when scraping fails
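
As referenced in the first item, a per-domain rate limiter can be layered on top of the scrape_url helper from earlier. The sketch below is an illustration; the delay value and the rate_limited_scrape wrapper are assumptions, not part of any library:

import asyncio
import time
from typing import Any, Dict
from urllib.parse import urlparse

MIN_DELAY_SECONDS = 2.0  # assumed politeness delay between requests to one domain
_last_request: Dict[str, float] = {}
_domain_locks: Dict[str, asyncio.Lock] = {}

async def rate_limited_scrape(url: str, selectors: Dict[str, str]) -> Dict[str, Any]:
    """Wrap scrape_url so requests to the same domain are spaced apart."""
    domain = urlparse(url).netloc
    lock = _domain_locks.setdefault(domain, asyncio.Lock())
    async with lock:
        elapsed = time.monotonic() - _last_request.get(domain, 0.0)
        if elapsed < MIN_DELAY_SECONDS:
            await asyncio.sleep(MIN_DELAY_SECONDS - elapsed)
        _last_request[domain] = time.monotonic()
    return await scrape_url(url, selectors)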

Example: Complete E-commerce Price Monitoring

# Automated price monitoring system built on the MCP scraping server above
# (reuses scraping_schedules, execute_job_with_retry, and logger)
import asyncio
from datetime import datetime

class PriceMonitor:
    def __init__(self):
        self.price_history = {}
        self.alert_thresholds = {}

    async def monitor_product(self, product_id: str, url: str, threshold: float):
        """Monitor product price and alert on changes"""
        self.alert_thresholds[product_id] = threshold

        # Create scraping job
        job_config = {
            "job_id": f"price_{product_id}",
            "url": url,
            "selectors": {
                "price": ".product-price",
                "availability": ".stock-status",
                "title": "h1.product-title"
            },
            "schedule": "0 */6 * * *",  # Every 6 hours
            "max_retries": 3
        }

        scraping_schedules[job_config["job_id"]] = job_config

        # Execute immediately for first result
        result = await execute_job_with_retry(job_config)

        if product_id not in self.price_history:
            self.price_history[product_id] = []

        self.price_history[product_id].append({
            "timestamp": result["timestamp"],
            "price": result["data"]["price"],
            "availability": result["data"]["availability"]
        })

        await self.check_price_alert(product_id, result["data"]["price"])

    async def check_price_alert(self, product_id: str, current_price: str):
        """Check if price dropped below threshold"""
        try:
            price_value = float(current_price.replace('$', '').replace(',', ''))
            threshold = self.alert_thresholds.get(product_id)

            if threshold and price_value <= threshold:
                logger.info(f"PRICE ALERT: {product_id} is now ${price_value}")
                # Send notification
        except ValueError:
            logger.error(f"Could not parse price: {current_price}")

# Usage
monitor = PriceMonitor()
asyncio.run(monitor.monitor_product(
    product_id="laptop_xyz",
    url="https://example.com/laptop",
    threshold=999.99
))
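
Because prices are stored as strings, comparing observations needs a small parsing step. The helpers below are illustrative additions, not part of the class above:

def parse_price(raw: str) -> float:
    """Convert a scraped price string such as '$1,299.00' into a float."""
    return float(raw.replace("$", "").replace(",", ""))

def latest_price_drop(monitor: PriceMonitor, product_id: str) -> float:
    """Difference between the last two observed prices (positive means a drop)."""
    history = monitor.price_history.get(product_id, [])
    if len(history) < 2:
        return 0.0
    previous = parse_price(history[-2]["price"])
    current = parse_price(history[-1]["price"])
    return previous - current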

Conclusion

Implementing scraping automation with MCP servers combines the power of AI-assisted tool calling with robust automation frameworks. By creating scheduled jobs, event-driven workflows, and intelligent monitoring systems, you can build production-ready scraping solutions that require minimal maintenance.

The key to successful automation is balancing efficiency with respect for target websites, implementing comprehensive error handling (retries, timeouts, and exponential backoff), and maintaining detailed monitoring. With proper architecture, your MCP-based scraping automation can scale from simple price monitoring to complex data extraction pipelines serving enterprise applications.

Start with simple scheduled jobs, gradually add features like adaptive selectors and AI-powered error recovery, and always monitor performance to ensure your automation remains reliable and efficient.

Try WebScraping.AI for Your Web Scraping Needs

Looking for a powerful web scraping solution? WebScraping.AI provides an LLM-powered API that combines Chromium JavaScript rendering with rotating proxies for reliable data extraction.

Key Features:

  • AI-powered extraction: Ask questions about web pages or extract structured data fields
  • JavaScript rendering: Full Chromium browser support for dynamic content
  • Rotating proxies: Datacenter and residential proxies from multiple countries
  • Easy integration: Simple REST API with SDKs for Python, Ruby, PHP, and more
  • Reliable & scalable: Built for developers who need consistent results

Getting Started:

Get page content with AI analysis:

curl "https://api.webscraping.ai/ai/question?url=https://example.com&question=What is the main topic?&api_key=YOUR_API_KEY"

Extract structured data:

curl "https://api.webscraping.ai/ai/fields?url=https://example.com&fields[title]=Page title&fields[price]=Product price&api_key=YOUR_API_KEY"
