How Do I Create an MCP Server from Scratch?
Creating a Model Context Protocol (MCP) server from scratch allows you to build custom tools and resources that integrate seamlessly with AI assistants like Claude. This guide walks you through building an MCP server for web scraping use cases, from initial setup to production deployment.
Understanding MCP Server Architecture
An MCP server is a lightweight service that exposes tools, resources, and prompts to MCP clients. The server communicates via JSON-RPC 2.0 over standard input/output (stdio) or Server-Sent Events (SSE). For web scraping applications, MCP servers can provide:
- Tools: Executable functions for scraping, data extraction, and browser automation
- Resources: Access to scraped data, configuration files, or cached results
- Prompts: Pre-built templates for common scraping tasks
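For example, the tools/list exchange a client uses to discover your server's tools looks roughly like this on the wire (a simplified sketch; real messages carry the full tool schemas):
{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
{"jsonrpc": "2.0", "id": 1, "result": {"tools": [{"name": "scrape_page", "description": "...", "inputSchema": {"type": "object"}}]}}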
Prerequisites
Before building your MCP server, ensure you have:
- Node.js (v18 or later) or Python (3.10+)
- Basic understanding of async/await patterns
- Familiarity with web scraping concepts
- The MCP SDK for your chosen language
# For TypeScript/JavaScript
npm install @modelcontextprotocol/sdk
# For Python
pip install mcp
Building Your First MCP Server (TypeScript)
Let's create a basic MCP server that provides web scraping capabilities using Puppeteer.
Step 1: Project Setup
mkdir mcp-scraper-server
cd mcp-scraper-server
npm init -y
npm install @modelcontextprotocol/sdk puppeteer zod
npm install -D typescript @types/node tsx
Create a tsconfig.json:
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "./build",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true
},
"include": ["src/**/*"]
}
Step 2: Implement the MCP Server
Create src/index.ts:
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
Tool,
} from "@modelcontextprotocol/sdk/types.js";
import puppeteer, { Browser, Page } from "puppeteer";
import { z } from "zod";
// Tool input schemas
const ScrapePageSchema = z.object({
  url: z.string().url(),
  selector: z.string().optional(),
  waitFor: z.number().default(1000), // .default() already makes the field optional on input
});

const ExtractTextSchema = z.object({
  url: z.string().url(),
  selectors: z.array(z.string()),
  javascript: z.boolean().default(true),
});
class ScraperMCPServer {
private server: Server;
private browser: Browser | null = null;
constructor() {
this.server = new Server(
{
name: "scraper-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
this.setupHandlers();
this.setupErrorHandling();
}
private setupErrorHandling(): void {
this.server.onerror = (error) => {
console.error("[MCP Error]", error);
};
process.on("SIGINT", async () => {
await this.cleanup();
process.exit(0);
});
}
private setupHandlers(): void {
// List available tools
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "scrape_page",
description: "Scrape HTML content from a webpage with optional CSS selector",
inputSchema: {
type: "object",
properties: {
url: {
type: "string",
description: "The URL to scrape",
},
selector: {
type: "string",
description: "Optional CSS selector to extract specific elements",
},
waitFor: {
type: "number",
description: "Milliseconds to wait before scraping (default: 1000)",
},
},
required: ["url"],
},
} as Tool,
{
name: "extract_text",
description: "Extract text content from multiple elements on a page",
inputSchema: {
type: "object",
properties: {
url: {
type: "string",
description: "The URL to extract text from",
},
selectors: {
type: "array",
items: { type: "string" },
description: "Array of CSS selectors to extract text from",
},
javascript: {
type: "boolean",
description: "Whether to execute JavaScript (default: true)",
},
},
required: ["url", "selectors"],
},
} as Tool,
],
}));
// Handle tool calls
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "scrape_page":
return await this.scrapePage(args);
case "extract_text":
return await this.extractText(args);
default:
throw new Error(`Unknown tool: ${name}`);
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
return {
content: [
{
type: "text",
text: `Error: ${errorMessage}`,
},
],
isError: true,
};
}
});
}
private async getBrowser(): Promise<Browser> {
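    // Lazily launch one shared browser; each tool call opens and closes its own page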
if (!this.browser) {
this.browser = await puppeteer.launch({
headless: true,
args: ["--no-sandbox", "--disable-setuid-sandbox"],
});
}
return this.browser;
}
private async scrapePage(args: unknown) {
const { url, selector, waitFor } = ScrapePageSchema.parse(args);
const browser = await this.getBrowser();
const page = await browser.newPage();
try {
await page.goto(url, { waitUntil: "networkidle0" });
      // page.waitForTimeout was removed in recent Puppeteer releases; sleep manually instead
      await new Promise((resolve) => setTimeout(resolve, waitFor));
let content: string;
if (selector) {
content = await page.$eval(selector, (el) => el.outerHTML);
} else {
content = await page.content();
}
return {
content: [
{
type: "text",
text: JSON.stringify({ url, content, selector }, null, 2),
},
],
};
} finally {
await page.close();
}
}
private async extractText(args: unknown) {
const { url, selectors, javascript } = ExtractTextSchema.parse(args);
const browser = await this.getBrowser();
const page = await browser.newPage();
try {
if (!javascript) {
await page.setJavaScriptEnabled(false);
}
await page.goto(url, { waitUntil: "networkidle0" });
const results: Record<string, string[]> = {};
for (const selector of selectors) {
const elements = await page.$$(selector);
const texts = await Promise.all(
elements.map((el) => el.evaluate((node) => node.textContent?.trim() || ""))
);
results[selector] = texts.filter((text) => text.length > 0);
}
return {
content: [
{
type: "text",
text: JSON.stringify({ url, results }, null, 2),
},
],
};
} finally {
await page.close();
}
}
private async cleanup(): Promise<void> {
if (this.browser) {
await this.browser.close();
this.browser = null;
}
}
async run(): Promise<void> {
const transport = new StdioServerTransport();
await this.server.connect(transport);
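    // Log to stderr: stdout carries the JSON-RPC stream and must stay clean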
console.error("Scraper MCP server running on stdio");
}
}
// Start the server
const server = new ScraperMCPServer();
server.run().catch(console.error);
Step 3: Configure package.json
Update your package.json:
{
"name": "mcp-scraper-server",
"version": "1.0.0",
"type": "module",
"bin": {
"mcp-scraper-server": "./build/index.js"
},
"scripts": {
"build": "tsc",
"prepare": "npm run build",
"start": "node build/index.js"
}
}
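With the scripts in place, compile and launch the server to confirm it starts cleanly:
npm run build
npm start
# Expect "Scraper MCP server running on stdio" on stderr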
Building an MCP Server in Python
For Python developers, here's an equivalent implementation built on Playwright and Pydantic.
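Both libraries ship separately from the MCP SDK, so install them first:
pip install mcp playwright pydantic
playwright install chromium  # download the browser binary Playwright will drive
Then create scraper_server.py: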
#!/usr/bin/env python3
import asyncio
import json
from typing import Any, Dict, List, Optional
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from playwright.async_api import async_playwright, Browser, Page
from pydantic import BaseModel, Field
class ScrapePageArgs(BaseModel):
url: str = Field(description="The URL to scrape")
selector: Optional[str] = Field(None, description="Optional CSS selector")
wait_for: int = Field(1000, description="Milliseconds to wait before scraping")
class ExtractTextArgs(BaseModel):
url: str = Field(description="The URL to extract text from")
selectors: List[str] = Field(description="CSS selectors to extract text from")
javascript: bool = Field(True, description="Whether to execute JavaScript")
class ScraperMCPServer:
def __init__(self):
self.server = Server("scraper-server")
self.browser: Optional[Browser] = None
self.playwright = None
        # Register handlers through the SDK's decorator factories;
        # plain attribute assignment would just clobber the Server methods
        self.server.list_tools()(self.handle_list_tools)
        self.server.call_tool()(self.handle_call_tool)
async def handle_list_tools(self) -> List[Tool]:
"""Return list of available tools."""
return [
Tool(
name="scrape_page",
description="Scrape HTML content from a webpage with optional CSS selector",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "The URL to scrape"},
"selector": {"type": "string", "description": "Optional CSS selector"},
"wait_for": {"type": "number", "description": "Wait time in ms"},
},
"required": ["url"],
},
),
Tool(
name="extract_text",
description="Extract text content from multiple elements",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "The URL to extract from"},
"selectors": {
"type": "array",
"items": {"type": "string"},
"description": "CSS selectors",
},
"javascript": {"type": "boolean", "description": "Enable JavaScript"},
},
"required": ["url", "selectors"],
},
),
]
async def handle_call_tool(
self, name: str, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Handle tool execution."""
try:
if name == "scrape_page":
return await self.scrape_page(arguments)
elif name == "extract_text":
return await self.extract_text(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def get_browser(self) -> Browser:
"""Get or create browser instance."""
if not self.browser:
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=True)
return self.browser
async def scrape_page(self, args: Dict[str, Any]) -> List[TextContent]:
"""Scrape a webpage."""
validated_args = ScrapePageArgs(**args)
browser = await self.get_browser()
page = await browser.new_page()
try:
await page.goto(validated_args.url, wait_until="networkidle")
await page.wait_for_timeout(validated_args.wait_for)
if validated_args.selector:
                element = await page.query_selector(validated_args.selector)
                # Use outerHTML so the output matches the TypeScript version
                content = await element.evaluate("el => el.outerHTML") if element else ""
else:
content = await page.content()
result = {
"url": validated_args.url,
"content": content,
"selector": validated_args.selector,
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
finally:
await page.close()
async def extract_text(self, args: Dict[str, Any]) -> List[TextContent]:
"""Extract text from multiple selectors."""
validated_args = ExtractTextArgs(**args)
browser = await self.get_browser()
        # JavaScript can only be toggled when the page is created,
        # so pass it as a context option rather than mutating the page later
        page = await browser.new_page(java_script_enabled=validated_args.javascript)
        try:
            await page.goto(validated_args.url, wait_until="networkidle")
results = {}
for selector in validated_args.selectors:
elements = await page.query_selector_all(selector)
texts = []
for element in elements:
text = await element.inner_text()
if text.strip():
texts.append(text.strip())
results[selector] = texts
return [
TextContent(
type="text",
text=json.dumps({"url": validated_args.url, "results": results}, indent=2),
)
]
finally:
await page.close()
async def cleanup(self):
"""Cleanup resources."""
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
async def run(self):
"""Run the MCP server."""
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="scraper-server",
server_version="1.0.0",
capabilities=self.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
async def main():
server = ScraperMCPServer()
try:
await server.run()
finally:
await server.cleanup()
if __name__ == "__main__":
asyncio.run(main())
Integrating with Claude Desktop
After building your server, register it in Claude Desktop's configuration file (restart Claude Desktop afterwards so it picks up the change):
macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
Windows: %APPDATA%\Claude\claude_desktop_config.json
{
"mcpServers": {
"scraper": {
"command": "node",
"args": ["/path/to/mcp-scraper-server/build/index.js"]
}
}
}
For Python servers (point command at the interpreter that has your dependencies installed, e.g. a virtualenv's python):
{
"mcpServers": {
"scraper": {
"command": "python",
"args": ["/path/to/scraper_server.py"]
}
}
}
Advanced Features
Adding Resource Support
Resources let your MCP server expose data that clients can read. Here's how to surface cached scrape results as a resource:
import { ListResourcesRequestSchema, ReadResourceRequestSchema } from "@modelcontextprotocol/sdk/types.js";
// In your setupHandlers method:
this.server.setRequestHandler(ListResourcesRequestSchema, async () => ({
resources: [
{
uri: "scraper://cache/recent",
name: "Recent Scrapes",
description: "Recently scraped pages",
mimeType: "application/json",
},
],
}));
this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
if (uri === "scraper://cache/recent") {
// Return cached scraping results
return {
contents: [
{
uri,
mimeType: "application/json",
text: JSON.stringify(this.cachedResults, null, 2),
},
],
};
}
throw new Error(`Unknown resource: ${uri}`);
});
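The cachedResults field read above is assumed rather than defined earlier in this guide; a minimal sketch of maintaining it as an in-memory array on the server class:
// Declared alongside the other class fields (hypothetical shape)
private cachedResults: Array<{ url: string; scrapedAt: string; content: string }> = [];

// In scrapePage, after a successful scrape:
this.cachedResults.push({ url, scrapedAt: new Date().toISOString(), content });
this.cachedResults = this.cachedResults.slice(-20); // keep only the 20 most recent entries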
Error Handling and Retry Logic
Implement robust error handling for timeouts and transient network failures:
private async scrapeWithRetry(url: string, maxRetries = 3): Promise<string> {
  let lastError: Error | null = null;
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const browser = await this.getBrowser();
    const page = await browser.newPage();
    try {
      await page.goto(url, {
        waitUntil: "networkidle0",
        timeout: 30000,
      });
      return await page.content();
    } catch (error) {
      lastError = error as Error;
      // Linear backoff: wait 1s, then 2s, then 3s between attempts
      if (attempt < maxRetries - 1) {
        await new Promise((resolve) => setTimeout(resolve, 1000 * (attempt + 1)));
      }
    } finally {
      // Close the page even when goto/content throws, so failed attempts don't leak pages
      await page.close();
    }
  }
  throw lastError ?? new Error("Scraping failed");
}
Browser Automation Extensions
Extend your server with advanced browser automation capabilities, such as handling authenticated sessions and cookies in Puppeteer:
private async handleAuthentication(page: Page, credentials?: { username: string; password: string }) {
if (credentials) {
await page.authenticate(credentials);
}
}
private async handleCookies(page: Page, cookies?: any[]) {
if (cookies && cookies.length > 0) {
await page.setCookie(...cookies);
}
}
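These helpers aren't wired into any tool above. One way to use them, assuming you extend a tool's input schema with optional credentials and cookies fields, is to call them right after opening the page:
const page = await browser.newPage();
await this.handleAuthentication(page, args.credentials); // HTTP basic auth, if provided
await this.handleCookies(page, args.cookies); // pre-seed session cookies before navigating
await page.goto(url, { waitUntil: "networkidle0" });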
Testing Your MCP Server
Create a test client to verify your server works correctly:
// test-client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
async function testServer() {
const transport = new StdioClientTransport({
command: "node",
args: ["./build/index.js"],
});
const client = new Client({
name: "test-client",
version: "1.0.0",
}, {
capabilities: {},
});
await client.connect(transport);
// List tools
const tools = await client.listTools();
console.log("Available tools:", tools);
// Call scrape_page tool
const result = await client.callTool({
name: "scrape_page",
arguments: {
url: "https://example.com",
selector: "h1",
},
});
console.log("Scrape result:", result);
await client.close();
}
testServer().catch(console.error);
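Build the server, then run the client directly with tsx (installed earlier as a dev dependency):
npm run build
npx tsx test-client.ts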
Deployment Considerations
When deploying your MCP server to production:
- Resource Management: Properly close browser instances and manage memory
- Rate Limiting: Implement rate limiting to prevent abuse
- Logging: Add comprehensive logging for debugging
- Security: Validate all inputs and sanitize URLs
- Performance: Consider pooling pages to handle multiple scrapes in parallel (see the sketch below)
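As a sketch of that last point, a small promise-based limiter (a hypothetical helper, not part of the MCP SDK) can cap how many pages are open at once:
class PagePool {
  private active = 0;
  private queue: Array<() => void> = [];

  constructor(private readonly limit: number) {}

  // Wait for a free slot, run the task, then wake the next waiter
  async run<T>(task: () => Promise<T>): Promise<T> {
    while (this.active >= this.limit) {
      await new Promise<void>((resolve) => this.queue.push(resolve));
    }
    this.active++;
    try {
      return await task();
    } finally {
      this.active--;
      this.queue.shift()?.(); // release one waiting task, if any
    }
  }
}

// Usage: const pool = new PagePool(5);
// await pool.run(() => this.scrapePage(args));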
Conclusion
Building an MCP server from scratch gives you complete control over web scraping capabilities exposed to AI assistants. By following this guide, you've created a functional MCP server that can scrape web pages, extract text, and integrate seamlessly with Claude Desktop. You can extend this foundation with additional tools for authentication, form submission, screenshot capture, and more advanced automation tasks.
The modular architecture of MCP servers makes them ideal for building reusable web scraping components that can be shared across projects and teams, providing a standardized interface for AI-powered data extraction workflows.