Model Context Protocol (MCP): Building AI Integrations That Scale
A hands-on guide to the Model Context Protocol. Learn how to build MCP servers and clients, connect AI agents to enterprise tools, and deploy MCP in production with security and observability.
Every AI application eventually needs to talk to the outside world. It needs to query databases, read files, call APIs, search the web, and trigger workflows. The standard approach has been to write custom integration code for every combination of AI model and external tool. If you have 5 AI applications and 10 data sources, you end up writing and maintaining 50 bespoke connectors. Each one has its own authentication logic, error handling, serialization format, and retry strategy. The Model Context Protocol (MCP) eliminates this combinatorial explosion by defining a single, open standard for how AI models connect to external data sources and tools. Build one MCP server for your database, and every MCP-compatible AI client can use it immediately. This guide covers everything you need to build, deploy, and secure MCP integrations in production.
What MCP Solves: The N Times M Integration Problem
Before MCP, the AI integration landscape looked like this: every AI-powered application needed custom glue code for every external system it interacted with. A coding assistant needed one integration for GitHub, another for Jira, another for your internal documentation. A customer support agent needed connectors for your CRM, ticketing system, and knowledge base. Each integration was a one-off effort.
This is the classic N times M problem. With N AI applications and M data sources, you need N * M integrations. Each integration is tightly coupled to both the AI framework and the external tool's API. When either side changes, the integration breaks.
MCP reduces this to N + M. Each AI application implements one MCP client. Each external tool exposes one MCP server. The protocol handles the communication between them. Add a new AI application, and it instantly has access to every existing MCP server. Add a new MCP server, and every existing AI application can use it.
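To make the scaling difference concrete, a quick back-of-the-envelope check using the numbers from the introduction:

```python
# Illustrative arithmetic only: integration count before and after MCP.
n_apps, m_sources = 5, 10

bespoke_connectors = n_apps * m_sources  # one connector per (app, source) pair
mcp_components = n_apps + m_sources      # one client per app, one server per source

print(bespoke_connectors)  # 50
print(mcp_components)      # 15
```

At 5 applications and 10 sources the savings are already large; at enterprise scale (dozens of each) the bespoke approach is simply unmaintainable.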
The MCP Architecture
MCP follows a client-server architecture with four key components:
Host is the AI application that the user interacts with. This could be Claude Desktop, an IDE with AI features, a custom chatbot, or any application that uses an AI model. The host manages the lifecycle of MCP clients and controls which servers a given AI model can access.
Client is the protocol-level component inside the host. It maintains a one-to-one connection with a single MCP server, handles the JSON-RPC message framing, and manages capability negotiation. A host can spin up multiple clients to connect to multiple servers simultaneously.
Server is a lightweight program that exposes specific capabilities through MCP. A server might wrap a database, a file system, an API, or any other data source. Servers declare what they can do (their tools, resources, and prompts) and respond to requests from clients.
Transport is the communication channel between client and server. MCP supports multiple transports: stdio for local processes, HTTP with Server-Sent Events (SSE) for remote servers, and the newer Streamable HTTP transport for production deployments.
The message flow works like this:
User <-> Host Application <-> MCP Client <-> Transport <-> MCP Server <-> External System
All messages between client and server use JSON-RPC 2.0. The client sends requests, the server sends responses. Servers can also send notifications (one-way messages that do not expect a response) for things like progress updates or log messages.
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "query_database",
    "arguments": {
      "sql": "SELECT * FROM users WHERE active = true LIMIT 10"
    }
  }
}

MCP Core Primitives
MCP defines three core primitives that servers expose to clients: Tools, Resources, and Prompts. Each serves a distinct purpose and gives the AI model a different type of capability.
Tools: Actions the AI Can Take
Tools are executable functions that the AI model can invoke. They represent actions with side effects: querying a database, sending an email, creating a file, calling an API. The AI model decides when and how to call tools based on the user's request and the tool's description.
Each tool has a name, a human-readable description (which the AI model uses to decide when to call it), and a JSON Schema defining its input parameters.
{
  "name": "run_sql_query",
  "description": "Execute a read-only SQL query against the analytics database. Use this to answer questions about user behavior, revenue, and product metrics.",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "The SQL query to execute. Must be a SELECT statement."
      },
      "limit": {
        "type": "number",
        "description": "Maximum number of rows to return. Defaults to 100.",
        "default": 100
      }
    },
    "required": ["query"]
  }
}

Tool descriptions are critical. The AI model reads them to decide which tool to call and what arguments to provide. Write descriptions that explain what the tool does, when to use it, and what constraints apply. Vague descriptions lead to misuse.
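As an illustration (the tool names and wording below are invented for this example, not taken from any real server), compare a description the model cannot act on with one that states purpose, usage conditions, and constraints:

```python
# Hypothetical example: the same tool, described two ways.
vague = {
    "name": "query",
    # The model has no idea when to call this or what input is valid.
    "description": "Runs a query.",
}

specific = {
    "name": "run_sql_query",
    # States what it does, when to use it, and the constraints that apply.
    "description": (
        "Execute a read-only SQL query against the analytics database. "
        "Use this to answer questions about user behavior, revenue, and "
        "product metrics. Only SELECT statements are allowed; results are "
        "capped at 1000 rows."
    ),
}
```

The second version tells the model which questions the tool answers and which inputs will be rejected, which is exactly the information it uses when planning a tool call.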
Resources: Data the AI Can Read
Resources are read-only data that the AI can access. Unlike tools, resources do not perform actions or cause side effects. They provide context. A resource might be a file's contents, a database schema, a configuration document, or live data from a monitoring system.
Resources are identified by URIs and can be either static (listed by the server) or dynamic (constructed from URI templates).
{
  "uri": "db://analytics/schema",
  "name": "Analytics Database Schema",
  "description": "Complete schema of the analytics database including all tables, columns, types, and relationships.",
  "mimeType": "application/json"
}

Dynamic resources use URI templates to represent parameterized data:
{
  "uriTemplate": "db://analytics/table/{table_name}/schema",
  "name": "Table Schema",
  "description": "Schema for a specific table in the analytics database."
}

The client reads a resource by sending a resources/read request with the URI. The server returns the content, which can be text or binary (base64-encoded).
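On the wire, that exchange is an ordinary JSON-RPC request/response pair. A sketch of the shapes involved (the URI is the example from above; the schema payload is illustrative):

```python
import json

# Client -> server: read a resource by URI.
request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "resources/read",
    "params": {"uri": "db://analytics/schema"},
}

# Server -> client: "contents" is a list, so one URI can yield several
# items. Text content uses a "text" field; binary content uses a
# base64-encoded "blob" field instead.
response = {
    "jsonrpc": "2.0",
    "id": 2,  # matches the request id
    "result": {
        "contents": [
            {
                "uri": "db://analytics/schema",
                "mimeType": "application/json",
                "text": json.dumps({"users": ["id", "email", "created_at"]}),
            }
        ]
    },
}

print(response["result"]["contents"][0]["mimeType"])  # application/json
```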
Prompts: Reusable Prompt Templates
Prompts are server-defined templates that help the AI model interact with the server's capabilities effectively. They encode best practices and domain knowledge into reusable workflows.
For example, a database MCP server might provide a prompt template for data analysis:
{
  "name": "analyze_table",
  "description": "Generate a comprehensive analysis of a database table including row counts, null rates, value distributions, and anomalies.",
  "arguments": [
    {
      "name": "table_name",
      "description": "The table to analyze",
      "required": true
    },
    {
      "name": "focus_columns",
      "description": "Comma-separated list of columns to focus the analysis on",
      "required": false
    }
  ]
}

When the client requests this prompt, the server returns a structured message sequence that the host application can inject into the AI conversation. This ensures consistent, high-quality interactions without requiring the user to know the right prompts.
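Concretely, a prompts/get request for the template above might produce a result shaped like the following (field names follow the MCP schema; the text itself is an invented rendering for `table_name="users"`):

```python
# What a server might return for prompts/get with name="analyze_table"
# and arguments={"table_name": "users"}.
get_prompt_result = {
    "description": "Analysis workflow for the users table",
    "messages": [
        {
            "role": "user",
            "content": {
                "type": "text",
                "text": (
                    "Analyze the users table. Report row counts, null rates "
                    "per column, value distributions, and any anomalies."
                ),
            },
        }
    ],
}

# The host injects these messages into the AI conversation verbatim.
for message in get_prompt_result["messages"]:
    print(message["role"], "->", message["content"]["type"])  # user -> text
```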
Building Your First MCP Server in Python
Let us build a practical MCP server that wraps a SQLite database, exposing query capabilities as tools and table schemas as resources.
Project Setup
Start by installing the MCP Python SDK:
pip install "mcp[cli]" aiosqlite

Create the project structure:
mkdir db-mcp-server && cd db-mcp-server
touch server.py

Defining the Server
The Python SDK provides a high-level FastMCP class that handles protocol negotiation, message routing, and transport setup. You define tools and resources using decorators.
# server.py
import json
import re

import aiosqlite

from mcp.server.fastmcp import FastMCP

# Initialize the MCP server
mcp = FastMCP(
    name="database-server",
    version="1.0.0",
)

DATABASE_PATH = "analytics.db"


@mcp.resource("db://schema")
async def get_database_schema() -> str:
    """Return the complete database schema as JSON."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        cursor = await db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        )
        tables = await cursor.fetchall()
        schema = {}
        for (table_name,) in tables:
            cursor = await db.execute(f"PRAGMA table_info({table_name})")
            columns = await cursor.fetchall()
            schema[table_name] = [
                {
                    "name": col[1],
                    "type": col[2],
                    "nullable": not col[3],
                    "primary_key": bool(col[5]),
                }
                for col in columns
            ]
        return json.dumps(schema, indent=2)


@mcp.resource("db://tables/{table_name}/sample")
async def get_table_sample(table_name: str) -> str:
    """Return a sample of rows from the specified table."""
    # Validate the table name to prevent injection. Allow letters, digits,
    # and underscores (a bare .isalnum() check would reject legitimate
    # names like "user_events").
    if not re.fullmatch(r"\w+", table_name):
        raise ValueError("Table name must contain only letters, digits, and underscores")
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(f"SELECT * FROM {table_name} LIMIT 5")
        rows = await cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        result = [dict(zip(columns, row)) for row in rows]
        return json.dumps(result, indent=2)


@mcp.tool()
async def run_query(query: str, limit: int = 100) -> str:
    """
    Execute a read-only SQL query against the analytics database.

    Use this tool to answer questions about data in the database.
    Only SELECT statements are allowed. The query will be
    automatically limited to the specified number of rows.

    Args:
        query: SQL SELECT query to execute
        limit: Maximum rows to return (default 100, max 1000)
    """
    # Validate that the query is read-only
    normalized = query.strip().upper()
    if not normalized.startswith("SELECT"):
        raise ValueError("Only SELECT queries are allowed")
    # Reject write keywords anywhere in the query (word-boundary match,
    # so column names like CREATED_AT are not false positives).
    disallowed = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE"]
    for keyword in disallowed:
        if re.search(rf"\b{keyword}\b", normalized):
            raise ValueError(f"Query contains disallowed keyword: {keyword}")
    # Enforce limit
    limit = min(limit, 1000)
    if "LIMIT" not in normalized:
        query = f"{query.rstrip(';')} LIMIT {limit}"
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(query)
        rows = await cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        result = {
            "columns": columns,
            "rows": [dict(zip(columns, row)) for row in rows],
            "row_count": len(rows),
        }
        return json.dumps(result, indent=2)


@mcp.tool()
async def list_tables() -> str:
    """
    List all tables in the analytics database with row counts.

    Use this tool to discover what data is available before
    writing queries.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        cursor = await db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        )
        tables = await cursor.fetchall()
        result = []
        for (table_name,) in tables:
            count_cursor = await db.execute(f"SELECT COUNT(*) FROM {table_name}")
            count = (await count_cursor.fetchone())[0]
            result.append({"table": table_name, "row_count": count})
        return json.dumps(result, indent=2)


@mcp.prompt()
async def analyze_data(table_name: str, question: str) -> str:
    """Generate a prompt for analyzing data in a specific table."""
    # Fetch the schema for context
    schema = await get_database_schema()
    return f"""You are a data analyst. Analyze the following question using
the database schema provided.

Database Schema:
{schema}

Target Table: {table_name}
Question: {question}

First, use the list_tables tool to understand available data.
Then, use the run_query tool to execute SQL queries to answer the question.
Provide your analysis with specific numbers and insights."""


if __name__ == "__main__":
    mcp.run()

Running the Server
Run the server locally using stdio transport (the default):
python server.py

Or use the MCP CLI to test it interactively:
mcp dev server.py

This launches the MCP Inspector, a browser-based tool that lets you list the server's capabilities, call tools, and read resources without writing client code. It is invaluable during development.
To install the server for use with Claude Desktop or other MCP hosts:
mcp install server.py --name "Analytics DB"

This adds the server configuration to the host's MCP settings file.
Building Your First MCP Server in TypeScript
TypeScript is equally well-supported. Let us build a file system MCP server that gives an AI model controlled access to read and search files within a specified directory.
Project Setup
mkdir fs-mcp-server && cd fs-mcp-server
npm init -y
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node
npx tsc --init

The Complete Server
// src/index.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import * as fs from "fs/promises";
import * as path from "path";

const ALLOWED_ROOT = process.env.FS_ROOT || process.cwd();

// Helper to validate that paths stay within the allowed root.
// The path.sep suffix prevents a sibling like "/root-evil" from
// passing a bare startsWith("/root") check.
function validatePath(requestedPath: string): string {
  const resolved = path.resolve(ALLOWED_ROOT, requestedPath);
  if (resolved !== ALLOWED_ROOT && !resolved.startsWith(ALLOWED_ROOT + path.sep)) {
    throw new Error(
      `Access denied: path ${requestedPath} is outside the allowed root`
    );
  }
  return resolved;
}

const server = new McpServer({
  name: "filesystem-server",
  version: "1.0.0",
});

// --- Resources ---

server.resource("directory-listing", "file://directory", async (uri) => {
  const entries = await fs.readdir(ALLOWED_ROOT, {
    withFileTypes: true,
    recursive: true,
  });
  const listing = entries.map((entry) => ({
    name: entry.name,
    type: entry.isDirectory() ? "directory" : "file",
    path: path.relative(
      ALLOWED_ROOT,
      path.join(entry.parentPath || entry.path, entry.name)
    ),
  }));
  return {
    contents: [
      {
        uri: uri.href,
        mimeType: "application/json",
        text: JSON.stringify(listing, null, 2),
      },
    ],
  };
});

// --- Tools ---

server.tool(
  "read_file",
  "Read the contents of a file at the given path relative to the project root. " +
    "Use this to examine source code, configuration files, or documentation.",
  {
    file_path: z
      .string()
      .describe("Relative path to the file from the project root"),
    encoding: z
      .enum(["utf-8", "base64"])
      .default("utf-8")
      .describe("File encoding"),
  },
  async ({ file_path, encoding }) => {
    const fullPath = validatePath(file_path);
    try {
      const stat = await fs.stat(fullPath);
      if (stat.size > 1024 * 1024) {
        return {
          content: [
            {
              type: "text" as const,
              text: `File is too large (${(stat.size / 1024 / 1024).toFixed(1)}MB). Maximum size is 1MB.`,
            },
          ],
          isError: true,
        };
      }
      const content = await fs.readFile(fullPath, encoding as BufferEncoding);
      return {
        content: [{ type: "text" as const, text: content }],
      };
    } catch (err: any) {
      return {
        content: [
          { type: "text" as const, text: `Error reading file: ${err.message}` },
        ],
        isError: true,
      };
    }
  }
);

server.tool(
  "search_files",
  "Search for files whose names match a glob pattern. " +
    "Use this to find relevant files before reading them.",
  {
    pattern: z
      .string()
      .describe(
        'Glob-like pattern to match file names (e.g., "*.ts", "test_*")'
      ),
    directory: z
      .string()
      .default(".")
      .describe("Directory to search in, relative to project root"),
  },
  async ({ pattern, directory }) => {
    const searchDir = validatePath(directory);
    const entries = await fs.readdir(searchDir, {
      withFileTypes: true,
      recursive: true,
    });
    // Simple glob matching: escape regex metacharacters first, then
    // translate * and ? into their regex equivalents (without the
    // escape step, the "." in "*.ts" would match any character).
    const regex = new RegExp(
      "^" +
        pattern
          .replace(/[.+^${}()|[\]\\]/g, "\\$&")
          .replace(/\*/g, ".*")
          .replace(/\?/g, ".") +
        "$"
    );
    const matches = entries
      .filter((entry) => entry.isFile() && regex.test(entry.name))
      .map((entry) =>
        path.relative(
          ALLOWED_ROOT,
          path.join(entry.parentPath || entry.path, entry.name)
        )
      )
      .slice(0, 50); // Limit results
    return {
      content: [
        {
          type: "text" as const,
          text: JSON.stringify(
            { matches, total: matches.length, truncated: matches.length >= 50 },
            null,
            2
          ),
        },
      ],
    };
  }
);

server.tool(
  "get_file_info",
  "Get metadata about a file including size, modification time, and permissions.",
  {
    file_path: z.string().describe("Relative path to the file"),
  },
  async ({ file_path }) => {
    const fullPath = validatePath(file_path);
    const stat = await fs.stat(fullPath);
    const info = {
      path: file_path,
      size_bytes: stat.size,
      size_human: `${(stat.size / 1024).toFixed(1)}KB`,
      modified: stat.mtime.toISOString(),
      created: stat.birthtime.toISOString(),
      is_directory: stat.isDirectory(),
      is_file: stat.isFile(),
      permissions: stat.mode.toString(8),
    };
    return {
      content: [{ type: "text" as const, text: JSON.stringify(info, null, 2) }],
    };
  }
);

// --- Start the server ---
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Filesystem MCP server running on stdio");
}

main().catch((error) => {
  console.error("Fatal error:", error);
  process.exit(1);
});

Building and Running
npx tsc
node dist/index.js

To configure this server for a host application, add it to the host's MCP configuration file. For Claude Desktop, edit claude_desktop_config.json:
{
  "mcpServers": {
    "filesystem": {
      "command": "node",
      "args": ["/absolute/path/to/dist/index.js"],
      "env": {
        "FS_ROOT": "/path/to/your/project"
      }
    }
  }
}

Connecting MCP to AI Agents
Building MCP servers is half the equation. The other half is connecting them to AI agents as clients.
Using the MCP Client SDK Directly
The Python SDK provides a client that can connect to any MCP server:
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Connect to a local MCP server
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"],
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()

            # Discover available tools
            tools = await session.list_tools()
            print("Available tools:")
            for tool in tools.tools:
                print(f"  - {tool.name}: {tool.description}")

            # Call a tool
            result = await session.call_tool(
                "run_query",
                arguments={"query": "SELECT COUNT(*) as total FROM users"},
            )
            print(f"Query result: {result.content[0].text}")

            # Read a resource
            resource = await session.read_resource("db://schema")
            print(f"Schema: {resource.contents[0].text}")


asyncio.run(main())

Integrating with LangChain
MCP tools integrate naturally with LangChain agents. You can convert MCP tools into LangChain-compatible tools and use them in any agent or chain:
import asyncio

from langchain_core.tools import StructuredTool
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


def mcp_tool_to_langchain(tool_def, session):
    """Convert an MCP tool definition into a LangChain StructuredTool."""

    async def call_tool(**kwargs):
        result = await session.call_tool(tool_def.name, arguments=kwargs)
        return result.content[0].text

    # Build the args schema from MCP's JSON Schema
    return StructuredTool.from_function(
        coroutine=call_tool,
        name=tool_def.name,
        description=tool_def.description,
        args_schema=None,  # Can be derived from tool_def.inputSchema
    )


async def main():
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"],
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Convert MCP tools to LangChain tools
            mcp_tools = await session.list_tools()
            lc_tools = [
                mcp_tool_to_langchain(tool, session)
                for tool in mcp_tools.tools
            ]

            # Build a LangChain agent with MCP tools
            llm = ChatOpenAI(model="gpt-4o")
            prompt = ChatPromptTemplate.from_messages([
                ("system", "You are a helpful data analyst."),
                ("human", "{input}"),
                ("placeholder", "{agent_scratchpad}"),
            ])
            agent = create_tool_calling_agent(llm, lc_tools, prompt)
            executor = AgentExecutor(agent=agent, tools=lc_tools)

            result = await executor.ainvoke({
                "input": "How many active users do we have?"
            })
            print(result["output"])


asyncio.run(main())

Building a Custom Agent with MCP
For maximum control, build your own agent loop with direct MCP integration:
import asyncio

from anthropic import Anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def run_agent(user_message: str):
    client = Anthropic()
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"],
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Get available tools in Claude's format
            mcp_tools = await session.list_tools()
            claude_tools = [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema,
                }
                for tool in mcp_tools.tools
            ]

            messages = [{"role": "user", "content": user_message}]

            # Agent loop
            while True:
                response = client.messages.create(
                    model="claude-sonnet-4-20250514",
                    max_tokens=4096,
                    tools=claude_tools,
                    messages=messages,
                )

                # Check if the model wants to use a tool
                if response.stop_reason == "tool_use":
                    # Process each tool use block
                    tool_results = []
                    for block in response.content:
                        if block.type == "tool_use":
                            # Call the tool via MCP
                            result = await session.call_tool(
                                block.name,
                                arguments=block.input,
                            )
                            tool_results.append({
                                "type": "tool_result",
                                "tool_use_id": block.id,
                                "content": result.content[0].text,
                            })
                    messages.append({"role": "assistant", "content": response.content})
                    messages.append({"role": "user", "content": tool_results})
                else:
                    # Model is done - extract the final text response
                    # (default to "" if the response has no text block)
                    final_text = next(
                        (block.text for block in response.content if hasattr(block, "text")),
                        "",
                    )
                    return final_text


result = asyncio.run(run_agent("What are our top 10 customers by revenue?"))
print(result)

MCP in Production
Moving from development to production requires attention to transport selection, authentication, error handling, and observability.
Transport Options
stdio is the simplest transport. The host launches the server as a child process and communicates over standard input/output. This is ideal for local tools and development but does not support remote servers.
HTTP with Server-Sent Events (SSE) enables remote MCP servers. The client sends requests as HTTP POST and receives responses and notifications through an SSE stream. This was the original remote transport, and while it works, it has limitations around connection management.
Streamable HTTP is the recommended transport for production. It uses standard HTTP POST requests for client-to-server communication, with optional SSE for server-to-client streaming. It supports stateless operation (no persistent connection required), which makes it compatible with standard HTTP infrastructure like load balancers, API gateways, and CDNs.
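From the client's side, a Streamable HTTP exchange is ordinary HTTP. The sketch below builds the headers and the initialize request a client would POST to the server's MCP endpoint; the https://mcp.example.com/mcp URL is a placeholder, and while the header and field names follow the spec, check your server's actual mount point and supported protocol version:

```python
import json

# Assumption for illustration: the server mounts MCP at /mcp.
MCP_ENDPOINT = "https://mcp.example.com/mcp"

# The client must accept both JSON (single response) and SSE
# (streamed responses) on the same POST.
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json, text/event-stream",
}

# First message of any MCP session: initialize, carrying the
# protocol version and the client's declared capabilities.
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "example-client", "version": "1.0.0"},
    },
}

body = json.dumps(initialize_request)
```

In stateless mode each POST is self-contained, which is what lets a load balancer route successive requests to different server replicas.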
Here is a production server using Streamable HTTP with the Python SDK:
# production_server.py
import logging

from mcp.server.fastmcp import FastMCP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP(
    name="production-database-server",
    version="2.0.0",
    host="0.0.0.0",
    port=8080,
    # Stateless mode for horizontal scaling
    stateless_http=True,
)


@mcp.tool()
async def run_query(query: str) -> str:
    """Execute a read-only SQL query."""
    logger.info(f"Executing query: {query[:100]}...")
    # ... implementation ...
    return '{"result": "data"}'


if __name__ == "__main__":
    mcp.run(transport="streamable-http")

Authentication with OAuth 2.0
MCP's specification includes an OAuth 2.0 authorization flow for remote servers. The server advertises its authorization server metadata through standard discovery endpoints, and clients handle the token exchange automatically.
For simpler setups, you can use bearer token authentication with middleware:
# auth_middleware.py
import hashlib
import hmac
import os
import time
from functools import wraps

API_KEYS = {
    "client_prod_01": {
        "secret_hash": "sha256_hash_of_secret_here",
        "scopes": ["tools:read", "tools:execute", "resources:read"],
        "rate_limit": 100,  # requests per minute
    },
    "client_dev_01": {
        "secret_hash": "sha256_hash_of_secret_here",
        "scopes": ["tools:read", "resources:read"],
        "rate_limit": 20,
    },
}

# Simple in-memory rate limiter
rate_limit_store: dict[str, list[float]] = {}


def verify_api_key(api_key: str, required_scope: str) -> bool:
    """Verify an API key and check scope permissions."""
    client_id = api_key.split(":")[0] if ":" in api_key else None
    if client_id not in API_KEYS:
        return False
    config = API_KEYS[client_id]

    # Verify the secret
    secret = api_key.split(":", 1)[1] if ":" in api_key else ""
    secret_hash = hashlib.sha256(secret.encode()).hexdigest()
    if not hmac.compare_digest(secret_hash, config["secret_hash"]):
        return False

    # Check scope
    if required_scope not in config["scopes"]:
        return False

    # Check rate limit
    now = time.time()
    window = rate_limit_store.setdefault(client_id, [])
    window[:] = [t for t in window if now - t < 60]
    if len(window) >= config["rate_limit"]:
        return False
    window.append(now)
    return True


def require_auth(scope: str):
    """Decorator to require authentication on MCP tool handlers."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # In a real implementation, extract the token from
            # the MCP session or transport headers
            api_key = os.environ.get("MCP_API_KEY", "")
            if not verify_api_key(api_key, scope):
                raise PermissionError(
                    "Authentication failed or insufficient permissions"
                )
            return await func(*args, **kwargs)

        return wrapper

    return decorator

Error Handling
Production MCP servers must handle errors gracefully. The protocol defines standard error codes, and your tools should return structured error information rather than crashing:
import asyncio
import json

import aiosqlite

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("resilient-server")


@mcp.tool()
async def safe_query(query: str) -> str:
    """Execute a database query with comprehensive error handling."""
    try:
        # Validate input
        if not query or len(query) > 10000:
            return json.dumps({
                "error": "INVALID_INPUT",
                "message": "Query must be between 1 and 10,000 characters",
            })
        if not query.strip().upper().startswith("SELECT"):
            return json.dumps({
                "error": "FORBIDDEN_OPERATION",
                "message": "Only SELECT queries are allowed",
            })

        # Execute with a timeout
        async with aiosqlite.connect("analytics.db") as db:
            try:
                result = await asyncio.wait_for(
                    db.execute(query),
                    timeout=30.0,
                )
                rows = await result.fetchall()
                columns = [d[0] for d in result.description]
                return json.dumps({
                    "success": True,
                    "columns": columns,
                    "rows": [dict(zip(columns, row)) for row in rows],
                    "row_count": len(rows),
                })
            except asyncio.TimeoutError:
                return json.dumps({
                    "error": "QUERY_TIMEOUT",
                    "message": "Query exceeded the 30 second time limit",
                })
    except Exception as e:
        return json.dumps({
            "error": "INTERNAL_ERROR",
            "message": f"An unexpected error occurred: {str(e)}",
        })

Observability
Add structured logging and metrics to understand how your MCP server is being used:
import json
import logging
import time
from functools import wraps

logger = logging.getLogger("mcp.audit")


def audit_log(func):
    """Decorator that logs every tool invocation with timing and metadata."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        tool_name = func.__name__
        log_entry = {
            "event": "tool_invocation",
            "tool": tool_name,
            "arguments": {
                k: v[:200] if isinstance(v, str) else v
                for k, v in kwargs.items()
            },
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        try:
            result = await func(*args, **kwargs)
            elapsed = time.time() - start
            log_entry["status"] = "success"
            log_entry["duration_ms"] = round(elapsed * 1000, 2)
            log_entry["response_size"] = len(result) if isinstance(result, str) else 0
            logger.info(json.dumps(log_entry))
            return result
        except Exception as e:
            elapsed = time.time() - start
            log_entry["status"] = "error"
            log_entry["error"] = str(e)
            log_entry["duration_ms"] = round(elapsed * 1000, 2)
            logger.error(json.dumps(log_entry))
            raise

    return wrapper

Security Considerations
MCP servers extend the reach of AI models into real systems. Security is not optional. A poorly secured MCP server is a direct path from a prompt injection attack to your production database.
Input Validation
Every tool input must be validated before use. Never trust that the AI model will send well-formed, safe inputs. The model's decisions are influenced by user prompts, which could be adversarial.
import re

from pydantic import BaseModel, field_validator


class QueryInput(BaseModel):
    """Validated input for the SQL query tool."""

    query: str
    limit: int = 100

    @field_validator("query")
    @classmethod
    def validate_query(cls, v: str) -> str:
        normalized = v.strip().upper()

        # Must start with SELECT
        if not normalized.startswith("SELECT"):
            raise ValueError("Only SELECT queries are permitted")

        # Block dangerous patterns
        dangerous_patterns = [
            r"\bDROP\b", r"\bDELETE\b", r"\bINSERT\b",
            r"\bUPDATE\b", r"\bALTER\b", r"\bCREATE\b",
            r"\bEXEC\b", r"\bEXECUTE\b", r"\bGRANT\b",
            r"\bREVOKE\b", r"\bUNION\b.*\bSELECT\b",
            r";\s*\w",  # Multiple statements
            r"--",      # SQL comments (potential injection)
            r"/\*",     # Block comments
        ]
        for pattern in dangerous_patterns:
            if re.search(pattern, normalized):
                raise ValueError(
                    f"Query contains forbidden pattern: {pattern}"
                )
        return v

    @field_validator("limit")
    @classmethod
    def validate_limit(cls, v: int) -> int:
        if v < 1 or v > 1000:
            raise ValueError("Limit must be between 1 and 1000")
        return v

Permission Scoping
Different AI agents should have different levels of access. A customer support agent does not need write access to the production database. A code review agent does not need access to financial data. Implement permission scoping at the server level:
from dataclasses import dataclass
from enum import Enum


class Permission(Enum):
    READ_SCHEMA = "read_schema"
    READ_DATA = "read_data"
    WRITE_DATA = "write_data"
    EXECUTE_QUERIES = "execute_queries"
    ACCESS_PII = "access_pii"


@dataclass
class ClientPermissions:
    client_id: str
    permissions: set[Permission]
    allowed_tables: set[str] | None = None  # None means all tables
    max_rows: int = 1000
    query_timeout_seconds: int = 30


# Define permission profiles
PERMISSION_PROFILES = {
    "support_agent": ClientPermissions(
        client_id="support_agent",
        permissions={Permission.READ_DATA, Permission.READ_SCHEMA},
        allowed_tables={"customers", "tickets", "products"},
        max_rows=100,
    ),
    "analytics_agent": ClientPermissions(
        client_id="analytics_agent",
        permissions={
            Permission.READ_DATA,
            Permission.READ_SCHEMA,
            Permission.EXECUTE_QUERIES,
        },
        allowed_tables=None,  # Access to all tables
        max_rows=10000,
    ),
    "admin_agent": ClientPermissions(
        client_id="admin_agent",
        permissions={
            Permission.READ_DATA,
            Permission.READ_SCHEMA,
            Permission.WRITE_DATA,
            Permission.EXECUTE_QUERIES,
            Permission.ACCESS_PII,
        },
        allowed_tables=None,
        max_rows=50000,
    ),
}


def check_permission(
    client_id: str, required: Permission, table: str | None = None
) -> bool:
    """Check if a client has the required permission."""
    profile = PERMISSION_PROFILES.get(client_id)
    if not profile:
        return False
    if required not in profile.permissions:
        return False
    if table and profile.allowed_tables is not None:
        if table not in profile.allowed_tables:
            return False
    return True

Preventing Prompt Injection via MCP Resources
When MCP resources return data that gets inserted into an AI prompt, that data becomes a vector for indirect prompt injection. An attacker who can control the content of a resource (for example, by writing a malicious value into a database field) could manipulate the AI's behavior.
Mitigations:
import re
import html
def sanitize_resource_content(content: str) -> str:
"""
Sanitize content from external sources before returning it
as an MCP resource. This reduces the risk of indirect prompt
injection.
"""
# Remove common prompt injection patterns
injection_patterns = [
r"ignore\s+(all\s+)?(previous|prior|above)\s+instructions",
r"you\s+are\s+now\s+a",
r"new\s+instructions:",
r"system\s*prompt:",
r"<\s*system\s*>",
r"\[INST\]",
r"<<\s*SYS\s*>>",
]
sanitized = content
for pattern in injection_patterns:
sanitized = re.sub(
pattern, "[CONTENT FILTERED]", sanitized, flags=re.IGNORECASE
)
# Escape any HTML/XML-like tags that could confuse the model
sanitized = html.escape(sanitized)
# Truncate excessively long content
max_length = 50000
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + "\n[TRUNCATED]"
return sanitized
Audit Logging
Every tool invocation should be logged with enough context to reconstruct what happened during an incident:
import json
import time
import uuid
from dataclasses import dataclass, asdict
@dataclass
class AuditEvent:
event_id: str
timestamp: str
client_id: str
tool_name: str
arguments: dict
result_status: str
duration_ms: float
error_message: str | None = None
class AuditLogger:
def __init__(self, log_path: str = "mcp_audit.jsonl"):
self.log_path = log_path
def log(self, event: AuditEvent):
with open(self.log_path, "a") as f:
f.write(json.dumps(asdict(event)) + "\n")
def create_event(
self, client_id: str, tool_name: str, arguments: dict
) -> AuditEvent:
return AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%SZ", time.gmtime()
),
client_id=client_id,
tool_name=tool_name,
arguments=self._redact_sensitive(arguments),
result_status="pending",
duration_ms=0,
)
def _redact_sensitive(self, args: dict) -> dict:
"""Redact sensitive fields from audit logs."""
sensitive_keys = {"password", "token", "secret", "api_key", "ssn"}
redacted = {}
for key, value in args.items():
if key.lower() in sensitive_keys:
redacted[key] = "[REDACTED]"
elif isinstance(value, str) and len(value) > 500:
redacted[key] = value[:500] + "...[TRUNCATED]"
else:
redacted[key] = value
return redacted
Building Multi-Server Architectures
Real enterprise deployments rarely use a single MCP server. A production environment might connect an AI agent to a database server, a file system server, a Slack server, a GitHub server, and internal API servers. Managing this constellation of servers requires deliberate architecture.
The Multi-Client Pattern
The MCP specification requires one client per server connection. A host application manages multiple clients, each connected to a different server. The host aggregates the capabilities from all connected servers and presents them to the AI model as a unified set of tools and resources.
# multi_server_client.py
import asyncio
from dataclasses import dataclass
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
@dataclass
class ServerConfig:
name: str
command: str
args: list[str]
env: dict[str, str] | None = None
class MultiServerManager:
"""Manage connections to multiple MCP servers."""
def __init__(self):
self.sessions: dict[str, ClientSession] = {}
self.tool_map: dict[str, str] = {} # tool_name -> server_name
self._contexts = []
async def connect(self, config: ServerConfig):
"""Connect to an MCP server and register its tools."""
params = StdioServerParameters(
command=config.command,
args=config.args,
env=config.env,
)
# Keep references to context managers so they stay open
ctx = stdio_client(params)
read, write = await ctx.__aenter__()
self._contexts.append(ctx)
session = ClientSession(read, write)
await session.__aenter__()
self._contexts.append(session)
await session.initialize()
self.sessions[config.name] = session
# Map tool names to their server
tools = await session.list_tools()
for tool in tools.tools:
qualified_name = f"{config.name}.{tool.name}"
self.tool_map[qualified_name] = config.name
# Also register unqualified for convenience
if tool.name not in self.tool_map:
self.tool_map[tool.name] = config.name
print(
f"Connected to {config.name}: "
f"{len(tools.tools)} tools available"
)
async def call_tool(self, tool_name: str, arguments: dict) -> str:
"""Route a tool call to the correct server."""
server_name = self.tool_map.get(tool_name)
if not server_name:
raise ValueError(f"Unknown tool: {tool_name}")
# Strip server prefix if present
actual_name = (
tool_name.split(".", 1)[1]
if "." in tool_name
else tool_name
)
session = self.sessions[server_name]
result = await session.call_tool(actual_name, arguments=arguments)
return result.content[0].text
async def list_all_tools(self) -> list[dict]:
"""Aggregate tools from all connected servers."""
all_tools = []
for server_name, session in self.sessions.items():
tools = await session.list_tools()
for tool in tools.tools:
all_tools.append({
"name": f"{server_name}.{tool.name}",
"description": f"[{server_name}] {tool.description}",
"input_schema": tool.inputSchema,
})
return all_tools
async def disconnect_all(self):
"""Clean up all connections."""
for ctx in reversed(self._contexts):
await ctx.__aexit__(None, None, None)
self.sessions.clear()
self.tool_map.clear()
self._contexts.clear()
# Usage
async def main():
manager = MultiServerManager()
# Connect to multiple servers
await manager.connect(ServerConfig(
name="database",
command="python",
args=["db_server.py"],
))
await manager.connect(ServerConfig(
name="filesystem",
command="node",
args=["fs_server/dist/index.js"],
env={"FS_ROOT": "/path/to/project"},
))
await manager.connect(ServerConfig(
name="github",
command="npx",
args=["-y", "@modelcontextprotocol/server-github"],
env={"GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_xxxxxxxxxxxx"},
))
# List all tools across servers
tools = await manager.list_all_tools()
for tool in tools:
print(f" {tool['name']}: {tool['description'][:80]}")
# Call tools on specific servers
schema = await manager.call_tool(
"database.run_query",
{"query": "SELECT COUNT(*) FROM users"}
)
print(f"Database result: {schema}")
files = await manager.call_tool(
"filesystem.search_files",
{"pattern": "*.py"}
)
print(f"Files found: {files}")
await manager.disconnect_all()
asyncio.run(main())
Server Discovery and Configuration
In larger organizations, managing server configurations across teams requires a centralized approach. Define server registries that clients can query:
{
"registry_version": "1.0",
"servers": [
{
"name": "analytics-db",
"description": "Read-only access to the analytics database",
"transport": "streamable-http",
"url": "https://mcp.internal.company.com/analytics-db",
"auth": {
"type": "oauth2",
"authorization_url": "https://auth.company.com/authorize",
"token_url": "https://auth.company.com/token",
"scopes": ["mcp:analytics:read"]
},
"capabilities": ["tools", "resources"],
"tags": ["database", "analytics", "read-only"],
"owner": "data-platform-team",
"sla": "99.9%"
},
{
"name": "github-enterprise",
"description": "Access to GitHub Enterprise repositories and issues",
"transport": "streamable-http",
"url": "https://mcp.internal.company.com/github",
"auth": {
"type": "bearer",
"token_env": "GITHUB_MCP_TOKEN"
},
"capabilities": ["tools", "resources", "prompts"],
"tags": ["github", "code", "issues"],
"owner": "devtools-team",
"sla": "99.5%"
},
{
"name": "slack-notifications",
"description": "Send messages and read channels via Slack",
"transport": "streamable-http",
"url": "https://mcp.internal.company.com/slack",
"auth": {
"type": "oauth2",
"authorization_url": "https://slack.com/oauth/v2/authorize",
"token_url": "https://slack.com/api/oauth.v2.access",
"scopes": ["channels:read", "chat:write"]
},
"capabilities": ["tools"],
"tags": ["slack", "messaging", "notifications"],
"owner": "platform-team",
"sla": "99.0%"
}
]
}
Capability Negotiation
When a client connects to a server, the two sides negotiate capabilities during the initialization handshake: the server's initialize response declares which features it supports, and not all servers support all features. A client can also probe each feature directly and degrade gracefully:
async def negotiate_capabilities(session: ClientSession) -> dict:
"""
After initialization, inspect what the server supports
and adapt the client behavior accordingly.
"""
capabilities = {}
# Check for tool support
try:
tools = await session.list_tools()
capabilities["tools"] = [t.name for t in tools.tools]
except Exception:
capabilities["tools"] = []
# Check for resource support
try:
resources = await session.list_resources()
capabilities["resources"] = [r.uri for r in resources.resources]
except Exception:
capabilities["resources"] = []
# Check for prompt support
try:
prompts = await session.list_prompts()
capabilities["prompts"] = [p.name for p in prompts.prompts]
except Exception:
capabilities["prompts"] = []
return capabilities
Testing and Debugging MCP Servers
Rigorous testing is essential for MCP servers because their consumers are AI models, which are nondeterministic. You need confidence that the server itself behaves correctly and consistently.
Unit Testing Tools
Test each tool handler in isolation using standard async test patterns:
# test_server.py
import pytest
import pytest_asyncio
import json
import aiosqlite
# Import the tool functions directly from your server module
from server import run_query, list_tables, get_database_schema
@pytest_asyncio.fixture
async def setup_test_db(tmp_path):
"""Create a temporary test database."""
db_path = str(tmp_path / "test.db")
async with aiosqlite.connect(db_path) as db:
await db.execute("""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
active BOOLEAN DEFAULT 1
)
""")
await db.execute("""
INSERT INTO users (name, email, active) VALUES
('Alice', 'alice@example.com', 1),
('Bob', 'bob@example.com', 1),
('Charlie', 'charlie@example.com', 0)
""")
await db.commit()
return db_path
@pytest.mark.asyncio
async def test_run_query_returns_valid_json(setup_test_db, monkeypatch):
monkeypatch.setattr("server.DATABASE_PATH", setup_test_db)
result = await run_query("SELECT * FROM users WHERE active = 1")
data = json.loads(result)
assert data["row_count"] == 2
assert "columns" in data
assert "name" in data["columns"]
@pytest.mark.asyncio
async def test_run_query_blocks_write_operations(setup_test_db, monkeypatch):
monkeypatch.setattr("server.DATABASE_PATH", setup_test_db)
with pytest.raises(ValueError, match="Only SELECT queries"):
await run_query("DROP TABLE users")
@pytest.mark.asyncio
async def test_run_query_blocks_delete(setup_test_db, monkeypatch):
monkeypatch.setattr("server.DATABASE_PATH", setup_test_db)
with pytest.raises(ValueError, match="Only SELECT queries"):
await run_query("DELETE FROM users WHERE id = 1")
@pytest.mark.asyncio
async def test_run_query_enforces_limit(setup_test_db, monkeypatch):
monkeypatch.setattr("server.DATABASE_PATH", setup_test_db)
result = await run_query("SELECT * FROM users", limit=1)
data = json.loads(result)
assert data["row_count"] <= 1
@pytest.mark.asyncio
async def test_list_tables_returns_all_tables(setup_test_db, monkeypatch):
monkeypatch.setattr("server.DATABASE_PATH", setup_test_db)
result = await list_tables()
data = json.loads(result)
table_names = [t["table"] for t in data]
assert "users" in table_names
@pytest.mark.asyncio
async def test_schema_includes_column_types(setup_test_db, monkeypatch):
monkeypatch.setattr("server.DATABASE_PATH", setup_test_db)
result = await get_database_schema()
schema = json.loads(result)
assert "users" in schema
column_names = [c["name"] for c in schema["users"]]
assert "id" in column_names
assert "name" in column_names
assert "email" in column_names
Integration Testing with the MCP Client
Test the full protocol flow by spinning up the server and connecting a real MCP client:
# test_integration.py
import pytest
import pytest_asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
@pytest_asyncio.fixture
async def mcp_session():
"""Create a live MCP session connected to the server."""
params = StdioServerParameters(
command="python",
args=["server.py"],
)
ctx = stdio_client(params)
read, write = await ctx.__aenter__()
session = ClientSession(read, write)
await session.__aenter__()
await session.initialize()
yield session
await session.__aexit__(None, None, None)
await ctx.__aexit__(None, None, None)
@pytest.mark.asyncio
async def test_server_lists_tools(mcp_session):
tools = await mcp_session.list_tools()
tool_names = [t.name for t in tools.tools]
assert "run_query" in tool_names
assert "list_tables" in tool_names
@pytest.mark.asyncio
async def test_server_lists_resources(mcp_session):
resources = await mcp_session.list_resources()
resource_uris = [r.uri for r in resources.resources]
assert any("schema" in uri for uri in resource_uris)
@pytest.mark.asyncio
async def test_tool_call_returns_result(mcp_session):
result = await mcp_session.call_tool(
"list_tables",
arguments={},
)
assert len(result.content) > 0
assert result.content[0].type == "text"
# Should be valid JSON
import json
data = json.loads(result.content[0].text)
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_resource_read_returns_content(mcp_session):
result = await mcp_session.read_resource("db://schema")
assert len(result.contents) > 0
assert result.contents[0].text is not None
Using the MCP Inspector
The MCP Inspector is a browser-based debugging tool (the Node-based @modelcontextprotocol/inspector package) that the Python SDK's mcp dev command launches for you. It provides a visual interface for interacting with your server during development:
# Launch the inspector connected to your server
mcp dev server.py
# Or connect to a remote server
# Or run the standalone inspector and enter a remote server URL in its UI
npx @modelcontextprotocol/inspector
The Inspector lets you:
- View all registered tools, resources, and prompts
- Call tools with custom arguments and inspect responses
- Read resources and view their contents
- Monitor JSON-RPC messages in real time
- Test error handling by sending malformed requests
For CI/CD pipelines, the integration tests above double as automated protocol checks:
# Run the protocol-level integration tests against your server
pytest test_integration.py -v
Debugging Tips
When things go wrong, these strategies help isolate issues quickly:
Enable protocol-level logging. Set the MCP_LOG_LEVEL environment variable to debug to see every JSON-RPC message exchanged between client and server:
MCP_LOG_LEVEL=debug python server.py
Test tools in isolation. Before debugging through the full MCP protocol, verify your tool functions work correctly when called directly:
# quick_test.py
import asyncio
from server import run_query
async def main():
result = await run_query("SELECT 1 as test")
print(result)
asyncio.run(main())
Validate JSON Schema definitions. Incorrect input schemas cause silent failures where the AI model either never calls the tool or calls it with wrong arguments. Use a JSON Schema validator to check your schemas:
import jsonschema
schema = {
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "number", "default": 100},
},
"required": ["query"],
}
# This should pass
jsonschema.validate({"query": "SELECT 1", "limit": 50}, schema)
# This should fail - missing required field
try:
jsonschema.validate({"limit": 50}, schema)
except jsonschema.ValidationError as e:
print(f"Validation error: {e.message}")
MCP vs Alternatives
MCP is not the only way to connect AI models to external tools. Understanding when to use MCP versus alternatives helps you make the right architectural choice.
Function Calling (Native)
Most LLM providers support function calling natively. You define functions in the API request, and the model returns structured calls to those functions. This is simpler than MCP for basic use cases but has significant limitations at scale.
When to use function calling: You have a small number of tools (under 10), a single AI provider, and no need for reusable integrations across applications.
When MCP is better: You have many tools, multiple AI applications that need the same integrations, or you want to share tool implementations across teams.
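For contrast, here is a minimal sketch of what native function calling looks like. The tool schema is embedded in a provider-specific request (OpenAI-style format shown; the tool name and fields are illustrative, not from any server in this guide):

```python
# Native function calling: the tool schema lives inside one
# provider-specific API request rather than in a reusable server.
query_tool = {
    "type": "function",
    "function": {
        "name": "run_query",
        "description": "Run a read-only SQL query against the app database",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "number", "default": 100},
            },
            "required": ["query"],
        },
    },
}

# Each of your N applications builds a payload like this for its own
# provider and wires up its own dispatch logic -- the N times M problem.
request_payload = {
    "model": "gpt-4o",
    "messages": [{"role": "user", "content": "How many users signed up today?"}],
    "tools": [query_tool],
}
```

With MCP, by comparison, a tool definition like this lives once in a server and is discovered at runtime by every client via list_tools.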
OpenAPI Plugins
OpenAI's plugin system and similar approaches generate tool definitions from OpenAPI specifications. This works well for existing REST APIs but does not support bidirectional communication, streaming, or the resource/prompt primitives that MCP provides.
When to use OpenAPI plugins: You already have well-documented REST APIs and need quick, read-mostly integrations.
When MCP is better: You need bidirectional communication, streaming responses, or rich context beyond simple API calls.
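To make the gap concrete, the sketch below derives a flat tool definition from a single OpenAPI operation (the operation and its fields are hypothetical). The mapping captures request/response only: nothing in an OpenAPI spec gives the server a way to stream updates, expose resources, or offer prompt templates.

```python
# A hypothetical OpenAPI operation, as it would appear in a spec.
openapi_op = {
    "operationId": "searchTickets",
    "summary": "Search support tickets",
    "parameters": [
        {"name": "q", "in": "query", "required": True,
         "schema": {"type": "string"}},
        {"name": "limit", "in": "query", "required": False,
         "schema": {"type": "integer"}},
    ],
}

def openapi_to_tool(op: dict) -> dict:
    """Derive a function-calling style tool from one OpenAPI operation.

    The result is strictly one-way request/response: there is no
    equivalent of MCP's resource or prompt primitives to map onto.
    """
    properties, required = {}, []
    for param in op.get("parameters", []):
        properties[param["name"]] = param["schema"]
        if param.get("required"):
            required.append(param["name"])
    return {
        "name": op["operationId"],
        "description": op.get("summary", ""),
        "input_schema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }

tool = openapi_to_tool(openapi_op)
```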
Custom API Integrations
Building direct integrations gives maximum control but maximum maintenance burden. Every integration is bespoke code that must be maintained, tested, and updated when either the AI framework or the external API changes.
When to use custom integrations: You have unique requirements that no standard protocol can satisfy, or you need to optimize for latency in ways that a general protocol cannot.
When MCP is better: Almost always, unless your requirements are truly unique. The maintenance cost of custom integrations compounds rapidly.
Comparison Summary
| Aspect | Function Calling | OpenAPI Plugins | Custom APIs | MCP |
|---|---|---|---|---|
| Setup complexity | Low | Medium | High | Medium |
| Reusability | Low | Medium | Low | High |
| Ecosystem | Provider-specific | Limited | None | Growing |
| Bidirectional | No | No | Possible | Yes |
| Streaming | Limited | No | Possible | Yes |
| Resource access | No | No | Possible | Yes |
| Prompt templates | No | No | No | Yes |
| Multi-provider | No | No | Yes | Yes |
MCP's strength is in its role as a universal standard. The initial setup cost is slightly higher than native function calling, but the investment pays off as soon as you have more than one AI application or more than a handful of integrations. The protocol is open, the ecosystem is growing rapidly, and the specification continues to evolve with input from the community.
What Comes Next
MCP is still evolving. The specification is adding new capabilities around authorization, server discovery, and richer transport options. The ecosystem of pre-built MCP servers covers major platforms like GitHub, Slack, Google Drive, PostgreSQL, and dozens more, with new servers appearing regularly.
The most impactful step you can take is to identify one integration in your stack that multiple AI applications or agents would benefit from, build an MCP server for it, and start using it. The experience of building and deploying a real MCP server teaches more than any guide can. Start with stdio transport locally, prove the value, then move to Streamable HTTP for production.
If your organization is building AI-powered products or deploying AI agents internally, MCP is rapidly becoming the standard way to connect those agents to your systems. The teams that invest in MCP infrastructure now will have a significant advantage as AI capabilities continue to accelerate.