How do I use MCP integration with my existing scraping tools?
Integrating the Model Context Protocol (MCP) with your existing web scraping tools allows you to combine the power of AI assistants with your established scraping infrastructure. Whether you're using Puppeteer, Playwright, Selenium, or web scraping APIs, MCP provides a standardized way to expose these tools to AI models, enabling natural language control and intelligent automation of complex scraping workflows.
This guide demonstrates how to wrap your existing scraping tools in MCP servers, making them accessible to AI assistants like Claude while preserving all their functionality and your existing codebase.
Understanding MCP Integration Architecture
MCP acts as a bridge between AI assistants and your scraping tools. The basic architecture looks like this:
┌─────────────┐         ┌──────────────┐         ┌─────────────────┐
│   Claude    │◄───────►│  MCP Server  │◄───────►│  Your Existing  │
│     AI      │   MCP   │  (Wrapper)   │         │  Scraping Tool  │
└─────────────┘         └──────────────┘         └─────────────────┘
                                                          │
                                                          ▼
                                                    ┌──────────┐
                                                    │  Target  │
                                                    │ Website  │
                                                    └──────────┘
Your existing scraping logic remains unchanged—you simply wrap it in an MCP server that exposes its capabilities as tools the AI can invoke.
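For instance, a wrapper can be as small as one tool definition plus a handler that calls straight into your code. Here is a minimal sketch using the Python mcp SDK (the scrape_page function is a hypothetical stand-in for your existing scraper):

from mcp.server import Server
from mcp.types import Tool, TextContent

app = Server("my-scraper-mcp")

async def scrape_page(url: str) -> str:
    # Hypothetical stand-in for your existing scraping code
    ...

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [Tool(
        name="scrape_page",
        description="Fetch a page using the existing scraper",
        inputSchema={
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"]
        }
    )]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    # The wrapper only translates between MCP and your function
    return [TextContent(type="text", text=await scrape_page(arguments["url"]))]

The sections below flesh this pattern out for each major tool.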
Integrating MCP with Puppeteer
Puppeteer is one of the most popular browser automation tools. Here's how to create an MCP server that exposes your existing Puppeteer scripts:
Python MCP Server for Puppeteer (via pyppeteer)
import asyncio
import base64
import json

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
from mcp.types import Tool, TextContent
from pyppeteer import launch

app = Server("puppeteer-mcp-server")

# Global browser instance, launched lazily on first use
browser = None


async def get_browser():
    global browser
    if browser is None:
        browser = await launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox']
        )
    return browser


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="puppeteer_navigate",
            description="Navigate to a URL and extract content using Puppeteer",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "URL to navigate to"},
                    "wait_selector": {"type": "string", "description": "CSS selector to wait for"},
                    "timeout": {"type": "number", "description": "Navigation timeout in ms"}
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="puppeteer_screenshot",
            description="Take a screenshot of a webpage",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "selector": {"type": "string", "description": "Element to screenshot"},
                    "full_page": {"type": "boolean", "description": "Capture full page"}
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="puppeteer_execute",
            description="Execute custom JavaScript in the page context",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "script": {"type": "string", "description": "JavaScript to execute"}
                },
                "required": ["url", "script"]
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    browser = await get_browser()
    page = await browser.newPage()
    try:
        if name == "puppeteer_navigate":
            # Your existing Puppeteer navigation logic
            await page.goto(arguments["url"], {
                'timeout': arguments.get('timeout', 30000),
                'waitUntil': 'networkidle2'
            })
            if 'wait_selector' in arguments:
                await page.waitForSelector(arguments['wait_selector'])
            content = await page.content()
            title = await page.title()
            return [TextContent(
                type="text",
                text=f"Title: {title}\n\nHTML Content:\n{content}"
            )]
        elif name == "puppeteer_screenshot":
            await page.goto(arguments["url"])
            if 'selector' in arguments:
                # fullPage does not apply to element screenshots
                element = await page.querySelector(arguments['selector'])
                screenshot = await element.screenshot({'type': 'png'})
            else:
                screenshot = await page.screenshot({
                    'type': 'png',
                    'fullPage': arguments.get('full_page', False)
                })
            # Return the screenshot base64-encoded
            screenshot_b64 = base64.b64encode(screenshot).decode()
            return [TextContent(
                type="text",
                text=f"Screenshot captured (base64):\n{screenshot_b64[:100]}..."
            )]
        elif name == "puppeteer_execute":
            await page.goto(arguments["url"])
            result = await page.evaluate(arguments["script"])
            return [TextContent(
                type="text",
                text=f"Script execution result:\n{json.dumps(result, indent=2)}"
            )]
        else:
            raise ValueError(f"Unknown tool: {name}")
    finally:
        await page.close()


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="puppeteer-mcp",
                server_version="1.0.0"
            )
        )


if __name__ == "__main__":
    asyncio.run(main())
JavaScript/TypeScript MCP Server for Puppeteer
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import puppeteer, { Browser } from "puppeteer";

const server = new Server(
  {
    name: "puppeteer-mcp-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

let browser: Browser | null = null;

async function getBrowser(): Promise<Browser> {
  if (!browser) {
    browser = await puppeteer.launch({
      headless: true,
      args: ['--no-sandbox', '--disable-setuid-sandbox']
    });
  }
  return browser;
}

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "puppeteer_scrape",
        description: "Scrape webpage content using Puppeteer with full JavaScript rendering",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string", description: "URL to scrape" },
            wait_for: { type: "string", description: "CSS selector to wait for" },
            extract_selector: { type: "string", description: "CSS selector to extract" },
          },
          required: ["url"],
        },
      },
      {
        name: "puppeteer_interact",
        description: "Interact with page elements (click, type, etc.)",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string" },
            actions: {
              type: "array",
              description: "Array of actions to perform",
              items: {
                type: "object",
                properties: {
                  type: { type: "string", enum: ["click", "type", "wait"] },
                  selector: { type: "string" },
                  value: { type: "string" }
                }
              }
            }
          },
          required: ["url", "actions"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name } = request.params;
  const args = (request.params.arguments ?? {}) as any;
  const browser = await getBrowser();
  const page = await browser.newPage();
  try {
    if (name === "puppeteer_scrape") {
      // Navigate exactly as your existing Puppeteer code would
      await page.goto(args.url, { waitUntil: 'networkidle2' });
      if (args.wait_for) {
        await page.waitForSelector(args.wait_for);
      }
      let content: string;
      if (args.extract_selector) {
        content = await page.evaluate((selector: string) => {
          const elements = Array.from(document.querySelectorAll(selector));
          return elements.map(el => el.textContent?.trim()).join('\n');
        }, args.extract_selector);
      } else {
        content = await page.content();
      }
      return {
        content: [
          {
            type: "text",
            text: content,
          },
        ],
      };
    }

    if (name === "puppeteer_interact") {
      await page.goto(args.url, { waitUntil: 'networkidle2' });
      for (const action of args.actions) {
        switch (action.type) {
          case "click":
            await page.click(action.selector);
            break;
          case "type":
            await page.type(action.selector, action.value);
            break;
          case "wait":
            await page.waitForSelector(action.selector);
            break;
        }
      }
      const finalContent = await page.content();
      return {
        content: [
          {
            type: "text",
            text: finalContent,
          },
        ],
      };
    }

    throw new Error(`Unknown tool: ${name}`);
  } finally {
    await page.close();
  }
});

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Puppeteer MCP Server running on stdio");
}

main().catch(console.error);
Integrating MCP with Playwright
Playwright is another powerful browser automation tool. Here's how to integrate it with MCP:
import asyncio
import json

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
from mcp.types import Tool, TextContent
from playwright.async_api import async_playwright

app = Server("playwright-mcp-server")

playwright_instance = None
browser = None


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="playwright_scrape",
            description="Scrape content using Playwright with full browser capabilities",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "browser_type": {
                        "type": "string",
                        "enum": ["chromium", "firefox", "webkit"],
                        "description": "Browser engine to use"
                    },
                    "wait_for_selector": {"type": "string"},
                    "timeout": {"type": "number"}
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="playwright_form_fill",
            description="Fill and submit forms using Playwright",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "form_data": {
                        "type": "object",
                        "description": "Key-value pairs for form fields"
                    },
                    "submit_selector": {"type": "string"}
                },
                "required": ["url", "form_data"]
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    global playwright_instance, browser
    if playwright_instance is None:
        playwright_instance = await async_playwright().start()

    # The engine is chosen on the first call; later calls reuse that browser
    browser_type = arguments.get('browser_type', 'chromium')
    if browser is None:
        if browser_type == 'firefox':
            browser = await playwright_instance.firefox.launch()
        elif browser_type == 'webkit':
            browser = await playwright_instance.webkit.launch()
        else:
            browser = await playwright_instance.chromium.launch()

    context = await browser.new_context()
    page = await context.new_page()
    try:
        if name == "playwright_scrape":
            await page.goto(arguments["url"], timeout=arguments.get('timeout', 30000))
            if 'wait_for_selector' in arguments:
                await page.wait_for_selector(arguments['wait_for_selector'])
            content = await page.content()
            title = await page.title()
            return [TextContent(
                type="text",
                text=f"Page Title: {title}\n\nContent:\n{content}"
            )]
        elif name == "playwright_form_fill":
            await page.goto(arguments["url"])
            # Fill form fields
            for selector, value in arguments["form_data"].items():
                await page.fill(selector, str(value))
            # Submit the form if a selector was provided
            if 'submit_selector' in arguments:
                await page.click(arguments['submit_selector'])
                await page.wait_for_load_state('networkidle')
            result_content = await page.content()
            return [TextContent(
                type="text",
                text=f"Form submitted successfully:\n{result_content}"
            )]
        else:
            raise ValueError(f"Unknown tool: {name}")
    finally:
        await page.close()
        await context.close()


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="playwright-mcp",
                server_version="1.0.0"
            )
        )


if __name__ == "__main__":
    asyncio.run(main())
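One detail the server above omits: the shared browser and Playwright instance are never closed. A minimal cleanup sketch, assuming the same module-level playwright_instance and browser globals:

async def shutdown():
    # Close the shared browser and stop Playwright before exiting
    global playwright_instance, browser
    if browser is not None:
        await browser.close()
        browser = None
    if playwright_instance is not None:
        await playwright_instance.stop()
        playwright_instance = None

Calling await shutdown() from a finally block wrapped around app.run() in main() ensures browser processes don't outlive the server.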
Integrating MCP with Web Scraping APIs
If you're using web scraping APIs like WebScraping.AI, you can create an MCP wrapper that preserves your existing API calls:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";

const API_KEY = process.env.WEBSCRAPING_AI_API_KEY;
const BASE_URL = "https://api.webscraping.ai";

const server = new Server(
  {
    name: "webscraping-api-mcp",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// Wrapper for your existing API integration
class WebScrapingAPIClient {
  async getHTML(url: string, options: any = {}) {
    const response = await axios.get(`${BASE_URL}/html`, {
      params: {
        url,
        api_key: API_KEY,
        js: options.js ?? true,
        js_timeout: options.js_timeout ?? 2000,
        proxy: options.proxy ?? 'datacenter',
        ...options
      }
    });
    return response.data;
  }

  async getText(url: string, options: any = {}) {
    const response = await axios.get(`${BASE_URL}/text`, {
      params: {
        url,
        api_key: API_KEY,
        ...options
      }
    });
    return response.data;
  }

  async extractFields(url: string, fields: object, options: any = {}) {
    const response = await axios.post(`${BASE_URL}/fields`,
      { fields },
      {
        params: {
          url,
          api_key: API_KEY,
          ...options
        }
      }
    );
    return response.data;
  }

  async askQuestion(url: string, question: string, options: any = {}) {
    const response = await axios.post(`${BASE_URL}/question`,
      { question },
      {
        params: {
          url,
          api_key: API_KEY,
          ...options
        }
      }
    );
    return response.data;
  }
}

const apiClient = new WebScrapingAPIClient();

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "scrape_html",
        description: "Get HTML content with JavaScript rendering",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string" },
            wait_for: { type: "string", description: "CSS selector to wait for" },
            js_timeout: { type: "number" },
          },
          required: ["url"],
        },
      },
      {
        name: "scrape_text",
        description: "Extract clean text content from a webpage",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string" },
          },
          required: ["url"],
        },
      },
      {
        name: "extract_fields",
        description: "Extract structured data fields using AI",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string" },
            fields: {
              type: "object",
              description: "Field names with extraction instructions"
            },
          },
          required: ["url", "fields"],
        },
      },
      {
        name: "ask_question",
        description: "Ask a question about webpage content",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string" },
            question: { type: "string" },
          },
          required: ["url", "question"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name } = request.params;
  const args = (request.params.arguments ?? {}) as any;
  try {
    let result: any;
    switch (name) {
      case "scrape_html":
        result = await apiClient.getHTML(args.url, {
          wait_for: args.wait_for,
          js_timeout: args.js_timeout
        });
        break;
      case "scrape_text":
        result = await apiClient.getText(args.url);
        break;
      case "extract_fields":
        result = await apiClient.extractFields(args.url, args.fields);
        break;
      case "ask_question":
        result = await apiClient.askQuestion(args.url, args.question);
        break;
      default:
        throw new Error(`Unknown tool: ${name}`);
    }
    return {
      content: [
        {
          type: "text",
          text: typeof result === 'string' ? result : JSON.stringify(result, null, 2),
        },
      ],
    };
  } catch (error: any) {
    return {
      content: [
        {
          type: "text",
          text: `Error: ${error.message}`,
        },
      ],
      isError: true,
    };
  }
});

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("WebScraping.AI MCP Server running");
}

main().catch(console.error);
Integrating MCP with Selenium
For Selenium-based scrapers, create an MCP wrapper around your existing Selenium code:
import asyncio
import json

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
from mcp.types import Tool, TextContent
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

app = Server("selenium-mcp-server")

# Global driver instance, created lazily on first use
driver = None


def get_driver():
    global driver
    if driver is None:
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=chrome_options)
    return driver


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="selenium_navigate",
            description="Navigate to URL and extract content using Selenium",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "wait_element": {"type": "string", "description": "Element to wait for (CSS selector)"},
                    "timeout": {"type": "number"}
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="selenium_find_elements",
            description="Find and extract elements using various selectors",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "selector": {"type": "string"},
                    "by": {
                        "type": "string",
                        "enum": ["css", "xpath", "id", "class", "tag"],
                        "description": "Selector type"
                    }
                },
                "required": ["url", "selector"]
            }
        ),
        Tool(
            name="selenium_execute_script",
            description="Execute JavaScript in the browser context",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "script": {"type": "string"}
                },
                "required": ["url", "script"]
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    driver = get_driver()
    try:
        if name == "selenium_navigate":
            driver.get(arguments["url"])
            if 'wait_element' in arguments:
                timeout = arguments.get('timeout', 10)
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, arguments['wait_element']))
                )
            page_source = driver.page_source
            title = driver.title
            return [TextContent(
                type="text",
                text=f"Title: {title}\n\nPage Source:\n{page_source}"
            )]
        elif name == "selenium_find_elements":
            driver.get(arguments["url"])
            by_type = arguments.get('by', 'css')
            selector = arguments['selector']
            by_mapping = {
                'css': By.CSS_SELECTOR,
                'xpath': By.XPATH,
                'id': By.ID,
                'class': By.CLASS_NAME,
                'tag': By.TAG_NAME
            }
            elements = driver.find_elements(by_mapping[by_type], selector)
            texts = [elem.text for elem in elements]
            return [TextContent(
                type="text",
                text=f"Found {len(elements)} elements:\n" + "\n".join(texts)
            )]
        elif name == "selenium_execute_script":
            driver.get(arguments["url"])
            result = driver.execute_script(arguments["script"])
            return [TextContent(
                type="text",
                text=f"Script result:\n{json.dumps(result, indent=2, default=str)}"
            )]
        else:
            raise ValueError(f"Unknown tool: {name}")
    except Exception as e:
        return [TextContent(
            type="text",
            text=f"Error: {str(e)}"
        )]


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="selenium-mcp",
                server_version="1.0.0"
            )
        )


if __name__ == "__main__":
    asyncio.run(main())
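Because the shared WebDriver is a regular OS process, it should be shut down when the server exits. Since Selenium's API is synchronous, atexit handles this cleanly; a minimal sketch assuming the same global driver:

import atexit

def shutdown_driver():
    # Quit the shared ChromeDriver process when the server exits
    global driver
    if driver is not None:
        driver.quit()
        driver = None

atexit.register(shutdown_driver)

Note also that blocking Selenium calls will stall the server's event loop during long page loads; wrapping them with asyncio.to_thread (e.g. await asyncio.to_thread(driver.get, url)) is one way to keep the loop responsive.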
Configuration and Deployment
Installing Dependencies
For Python MCP servers:
pip install mcp httpx playwright pyppeteer selenium
playwright install # Install browser binaries
For JavaScript/TypeScript MCP servers:
npm install @modelcontextprotocol/sdk puppeteer playwright-core axios
npx playwright install # Install browser binaries
Claude Desktop Configuration
Add your MCP servers to Claude Desktop configuration:
macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
Windows: %APPDATA%\Claude\claude_desktop_config.json
{
  "mcpServers": {
    "puppeteer": {
      "command": "node",
      "args": ["/path/to/puppeteer-mcp-server.js"]
    },
    "playwright": {
      "command": "python",
      "args": ["/path/to/playwright-mcp-server.py"]
    },
    "webscraping-api": {
      "command": "node",
      "args": ["/path/to/webscraping-api-mcp.js"],
      "env": {
        "WEBSCRAPING_AI_API_KEY": "your_api_key_here"
      }
    },
    "selenium": {
      "command": "python",
      "args": ["/path/to/selenium-mcp-server.py"]
    }
  }
}
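After editing the file, restart Claude Desktop so it picks up the new servers. To debug a server outside Claude, the MCP Inspector can launch it directly, for example npx @modelcontextprotocol/inspector python /path/to/selenium-mcp-server.py, which lets you verify tool schemas and invoke tools by hand before wiring anything into the desktop app.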
Best Practices for MCP Integration
1. Preserve Existing Logic
Keep your existing scraping logic intact and use MCP as a wrapper:
# Your existing scraping function
async def scrape_product_data(url: str):
    # ... your existing code ...
    pass

# MCP wrapper
@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "scrape_product":
        # Call your existing function
        result = await scrape_product_data(arguments["url"])
        return [TextContent(type="text", text=json.dumps(result))]
2. Handle Resource Management
When wrapping browser automation tools like Puppeteer, manage browser instances carefully so each tool call doesn't launch a fresh browser:
let browserInstance: Browser | null = null;

async function getBrowser(): Promise<Browser> {
  if (!browserInstance) {
    browserInstance = await puppeteer.launch({
      headless: true,
      args: ['--no-sandbox']
    });
  }
  return browserInstance;
}

// Clean up on shutdown
process.on('SIGINT', async () => {
  if (browserInstance) {
    await browserInstance.close();
  }
  process.exit(0);
});
3. Error Handling and Timeouts
Implement robust error handling and per-call timeouts so a single hung page can't stall the whole server:
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    # Per-call timeout in seconds, defined up front so the except block can use it
    timeout = arguments.get('timeout', 30)
    try:
        # asyncio.timeout requires Python 3.11+; use asyncio.wait_for on older versions
        async with asyncio.timeout(timeout):
            # Your scraping logic
            result = await perform_scraping(arguments)
            return [TextContent(type="text", text=result)]
    except asyncio.TimeoutError:
        return [TextContent(
            type="text",
            text=f"Operation timed out after {timeout} seconds"
        )]
    except Exception as e:
        return [TextContent(
            type="text",
            text=f"Error: {str(e)}"
        )]
4. Combine Multiple Tools
Create workflows that combine different scraping approaches:
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "scrape_and_analyze") {
    const args = request.params.arguments as any;
    // Use Puppeteer for dynamic content
    const browser = await puppeteer.launch();
    const page = await browser.newPage();
    await page.goto(args.url);

    // Walk through pagination, accumulating items page by page
    const data = [];
    let hasNextPage = true;
    while (hasNextPage) {
      const pageData = await page.evaluate(() => {
        // Extract data from the current page
        return Array.from(document.querySelectorAll('.item')).map(item => ({
          title: item.querySelector('h2')?.textContent,
          price: item.querySelector('.price')?.textContent
        }));
      });
      data.push(...pageData);

      // Check for a next page; start waiting for navigation before clicking
      const nextButton = await page.$('.next-page');
      if (nextButton) {
        await Promise.all([page.waitForNavigation(), nextButton.click()]);
      } else {
        hasNextPage = false;
      }
    }

    await browser.close();
    return {
      content: [{ type: "text", text: JSON.stringify(data, null, 2) }]
    };
  }
});
Real-World Integration Examples
E-commerce Price Monitoring
Combine your existing scraping tools with MCP for AI-powered monitoring:
@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "monitor_prices":
        products = arguments["product_urls"]
        results = []
        for url in products:
            # Use your existing Selenium/Puppeteer code
            price_data = await your_existing_price_scraper(url)
            results.append(price_data)
        # AI can now analyze the results
        return [TextContent(
            type="text",
            text=json.dumps(results, indent=2)
        )]
Content Aggregation Pipeline
Leverage MCP to orchestrate complex scraping workflows:
# Install required dependencies
npm install @modelcontextprotocol/sdk puppeteer axios cheerio
# Run your MCP server
node content-aggregator-mcp.js
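Behind those commands, the aggregator is just another thin wrapper over scraping code you already have. A minimal sketch of the tool handler (shown in Python to match the earlier servers, though the command above assumes a Node build; the aggregate_articles tool and scrape_article helper are hypothetical):

import json
from mcp.server import Server
from mcp.types import TextContent

app = Server("content-aggregator-mcp")

async def scrape_article(url: str) -> dict:
    # Hypothetical helper: reuse your existing Puppeteer/Playwright logic here
    ...

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "aggregate_articles":
        limit = arguments.get("limit", 10)
        articles = [await scrape_article(u) for u in arguments["urls"][:limit]]
        return [TextContent(type="text", text=json.dumps(articles, indent=2))]
    raise ValueError(f"Unknown tool: {name}")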
Then use natural language with Claude:
"Scrape the top 10 articles from TechCrunch, extract their titles and summaries,
and save the results to a JSON file"
Advantages of MCP Integration
- Preserve Existing Code: Your current scraping logic remains unchanged
- Natural Language Control: Control complex workflows through conversation
- AI-Powered Decision Making: Let AI determine the best scraping strategy
- Unified Interface: Expose multiple tools through a single MCP interface
- Enhanced Debugging: AI can help troubleshoot scraping issues in real-time
Troubleshooting Common Issues
Browser Not Launching
# Install browser dependencies
npx playwright install-deps
# Or for Puppeteer
npx puppeteer browsers install chrome
MCP Server Not Connecting
Check your configuration paths and ensure the server script is executable:
# Make script executable
chmod +x /path/to/your-mcp-server.py
# Test running directly
python /path/to/your-mcp-server.py
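On macOS, Claude Desktop also writes per-server logs under ~/Library/Logs/Claude/ (for example mcp-server-puppeteer.log); startup failures such as a missing interpreter or module usually show up there first.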
API Rate Limits
Implement rate limiting in your MCP wrapper:
import asyncio
from asyncio import Semaphore

rate_limiter = Semaphore(5)  # At most 5 concurrent requests

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    async with rate_limiter:
        # Your API calls here
        result = await api_call(arguments)
        return [TextContent(type="text", text=result)]
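A semaphore caps concurrency, which is often enough. If the upstream API enforces a requests-per-second limit instead, a small interval-based limiter is closer to what's needed; a sketch (api_call is the same placeholder as above):

import asyncio
import time

class MinIntervalLimiter:
    # Allow at most one call per min_interval seconds (0.2 s ≈ 5 requests/s)
    def __init__(self, min_interval: float):
        self.min_interval = min_interval
        self._last = 0.0
        self._lock = asyncio.Lock()

    async def wait(self):
        async with self._lock:
            delay = self._last + self.min_interval - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._last = time.monotonic()

limiter = MinIntervalLimiter(0.2)

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    await limiter.wait()
    result = await api_call(arguments)
    return [TextContent(type="text", text=result)]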
Conclusion
Integrating MCP with your existing scraping tools provides the best of both worlds: the reliability and customization of your established codebase combined with the intelligence and natural language capabilities of AI assistants. Whether you're using Puppeteer, Playwright, Selenium, or web scraping APIs, MCP integration enables you to build more powerful, flexible, and maintainable scraping solutions.
By wrapping your tools in MCP servers, you can automate complex workflows, handle edge cases more intelligently, and dramatically reduce the time spent on routine scraping tasks—all while preserving your existing investments in scraping infrastructure.