feat(mcp): initial version of mcp

This commit is contained in:
2025-03-12 19:51:40 -06:00
parent 2f84bc1e27
commit c870eed844
8 changed files with 1551 additions and 11 deletions

514
mcontainer/mcp.py Normal file
View File

@@ -0,0 +1,514 @@
"""
MCP (Model Control Protocol) server management for Monadical Container.
"""
import os
import docker
import logging
import tempfile
from typing import Dict, List, Optional, Any
from docker.errors import DockerException, ImageNotFound, NotFound
from .models import MCPStatus, RemoteMCP, DockerMCP, ProxyMCP, MCPContainer
from .user_config import UserConfigManager
# Configure logging
logger = logging.getLogger(__name__)
class MCPManager:
"""Manager for MCP (Model Control Protocol) servers."""
def __init__(
self,
config_manager: Optional[UserConfigManager] = None,
):
"""Initialize the MCP manager."""
self.config_manager = config_manager or UserConfigManager()
try:
self.client = docker.from_env()
# Test connection
self.client.ping()
except DockerException as e:
logger.error(f"Error connecting to Docker: {e}")
self.client = None
def _ensure_mcp_network(self) -> str:
"""Ensure the MCP network exists and return its name."""
network_name = "mc-mcp-network"
if self.client:
networks = self.client.networks.list(names=[network_name])
if not networks:
self.client.networks.create(network_name, driver="bridge")
return network_name
def list_mcps(self) -> List[Dict[str, Any]]:
"""List all configured MCP servers."""
mcps = self.config_manager.get("mcps", [])
return mcps
def get_mcp(self, name: str) -> Optional[Dict[str, Any]]:
"""Get an MCP configuration by name."""
mcps = self.list_mcps()
for mcp in mcps:
if mcp.get("name") == name:
return mcp
return None
def add_remote_mcp(
self, name: str, url: str, headers: Dict[str, str] = None
) -> Dict[str, Any]:
"""Add a remote MCP server."""
# Create the remote MCP configuration
remote_mcp = RemoteMCP(
name=name,
url=url,
headers=headers or {},
)
# Add to the configuration
mcps = self.list_mcps()
# Remove existing MCP with the same name if it exists
mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# Add the new MCP
mcps.append(remote_mcp.model_dump())
# Save the configuration
self.config_manager.set("mcps", mcps)
return remote_mcp.model_dump()
def add_docker_mcp(
self, name: str, image: str, command: str, env: Dict[str, str] = None
) -> Dict[str, Any]:
"""Add a Docker-based MCP server."""
# Create the Docker MCP configuration
docker_mcp = DockerMCP(
name=name,
image=image,
command=command,
env=env or {},
)
# Add to the configuration
mcps = self.list_mcps()
# Remove existing MCP with the same name if it exists
mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# Add the new MCP
mcps.append(docker_mcp.model_dump())
# Save the configuration
self.config_manager.set("mcps", mcps)
return docker_mcp.model_dump()
def add_proxy_mcp(
self,
name: str,
base_image: str,
proxy_image: str,
command: str,
proxy_options: Dict[str, Any] = None,
env: Dict[str, str] = None,
) -> Dict[str, Any]:
"""Add a proxy-based MCP server."""
# Create the Proxy MCP configuration
proxy_mcp = ProxyMCP(
name=name,
base_image=base_image,
proxy_image=proxy_image,
command=command,
proxy_options=proxy_options or {},
env=env or {},
)
# Add to the configuration
mcps = self.list_mcps()
# Remove existing MCP with the same name if it exists
mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# Add the new MCP
mcps.append(proxy_mcp.model_dump())
# Save the configuration
self.config_manager.set("mcps", mcps)
return proxy_mcp.model_dump()
def remove_mcp(self, name: str) -> bool:
"""Remove an MCP server configuration."""
mcps = self.list_mcps()
# Filter out the MCP with the specified name
updated_mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# If the length hasn't changed, the MCP wasn't found
if len(mcps) == len(updated_mcps):
return False
# Save the updated configuration
self.config_manager.set("mcps", updated_mcps)
# Stop and remove the container if it exists
self.stop_mcp(name)
return True
def get_mcp_container_name(self, mcp_name: str) -> str:
"""Get the Docker container name for an MCP server."""
return f"mc_mcp_{mcp_name}"
def start_mcp(self, name: str) -> Dict[str, Any]:
"""Start an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Get the container name
container_name = self.get_mcp_container_name(name)
# Check if the container already exists
try:
container = self.client.containers.get(container_name)
# If it exists, start it if it's not running
if container.status != "running":
container.start()
# Return the container status
return {
"container_id": container.id,
"status": "running",
"name": name,
}
except NotFound:
# Container doesn't exist, we need to create it
pass
# Ensure the MCP network exists
network_name = self._ensure_mcp_network()
# Handle different MCP types
mcp_type = mcp_config.get("type")
if mcp_type == "remote":
# Remote MCP servers don't need containers
return {
"status": "not_applicable",
"name": name,
"type": "remote",
}
elif mcp_type == "docker":
# Pull the image if needed
try:
self.client.images.get(mcp_config["image"])
except ImageNotFound:
logger.info(f"Pulling image {mcp_config['image']}")
self.client.images.pull(mcp_config["image"])
# Create and start the container
container = self.client.containers.run(
image=mcp_config["image"],
command=mcp_config.get("command"),
name=container_name,
detach=True,
network=network_name,
environment=mcp_config.get("env", {}),
labels={
"mc.mcp": "true",
"mc.mcp.name": name,
"mc.mcp.type": "docker",
},
ports={
"8080/tcp": 8080, # Default SSE port
},
)
return {
"container_id": container.id,
"status": "running",
"name": name,
}
elif mcp_type == "proxy":
# For proxy, we need to create a custom Dockerfile and build an image
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a Dockerfile for the proxy
dockerfile_content = f"""
FROM {mcp_config["proxy_image"]}
# Set environment variables for the proxy
ENV MCP_BASE_IMAGE={mcp_config["base_image"]}
ENV MCP_COMMAND={mcp_config["command"]}
ENV SSE_PORT={mcp_config["proxy_options"].get("sse_port", 8080)}
ENV SSE_HOST={mcp_config["proxy_options"].get("sse_host", "0.0.0.0")}
ENV ALLOW_ORIGIN={mcp_config["proxy_options"].get("allow_origin", "*")}
# Add environment variables from the configuration
{chr(10).join([f'ENV {k}="{v}"' for k, v in mcp_config.get("env", {}).items()])}
"""
# Write the Dockerfile
dockerfile_path = os.path.join(tmp_dir, "Dockerfile")
with open(dockerfile_path, "w") as f:
f.write(dockerfile_content)
# Build the image
custom_image_name = f"mc_mcp_proxy_{name}"
logger.info(f"Building custom proxy image: {custom_image_name}")
self.client.images.build(
path=tmp_dir,
tag=custom_image_name,
rm=True,
)
# Create and start the container
container = self.client.containers.run(
image=custom_image_name,
name=container_name,
detach=True,
network=network_name,
labels={
"mc.mcp": "true",
"mc.mcp.name": name,
"mc.mcp.type": "proxy",
},
ports={
f"{mcp_config['proxy_options'].get('sse_port', 8080)}/tcp": mcp_config[
"proxy_options"
].get("sse_port", 8080),
},
)
return {
"container_id": container.id,
"status": "running",
"name": name,
}
else:
raise ValueError(f"Unsupported MCP type: {mcp_type}")
def stop_mcp(self, name: str) -> bool:
"""Stop an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have containers to stop
if mcp_config.get("type") == "remote":
return True
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get and stop the container
try:
container = self.client.containers.get(container_name)
container.stop(timeout=10)
return True
except NotFound:
# Container doesn't exist
return False
except Exception as e:
logger.error(f"Error stopping MCP container: {e}")
return False
def restart_mcp(self, name: str) -> Dict[str, Any]:
"""Restart an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have containers to restart
if mcp_config.get("type") == "remote":
return {
"status": "not_applicable",
"name": name,
"type": "remote",
}
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get and restart the container
try:
container = self.client.containers.get(container_name)
container.restart(timeout=10)
return {
"container_id": container.id,
"status": "running",
"name": name,
}
except NotFound:
# Container doesn't exist, start it
return self.start_mcp(name)
except Exception as e:
logger.error(f"Error restarting MCP container: {e}")
raise
def get_mcp_status(self, name: str) -> Dict[str, Any]:
"""Get the status of an MCP server."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have containers
if mcp_config.get("type") == "remote":
return {
"status": "not_applicable",
"name": name,
"type": "remote",
"url": mcp_config.get("url"),
}
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get the container status
try:
container = self.client.containers.get(container_name)
status = (
MCPStatus.RUNNING
if container.status == "running"
else MCPStatus.STOPPED
)
# Get container details
container_info = container.attrs
# Extract ports
ports = {}
if (
"NetworkSettings" in container_info
and "Ports" in container_info["NetworkSettings"]
):
for port, mappings in container_info["NetworkSettings"][
"Ports"
].items():
if mappings:
ports[port] = int(mappings[0]["HostPort"])
return {
"status": status.value,
"container_id": container.id,
"name": name,
"type": mcp_config.get("type"),
"image": container_info["Config"]["Image"],
"ports": ports,
"created": container_info["Created"],
}
except NotFound:
# Container doesn't exist
return {
"status": MCPStatus.NOT_FOUND.value,
"name": name,
"type": mcp_config.get("type"),
}
except Exception as e:
logger.error(f"Error getting MCP container status: {e}")
return {
"status": MCPStatus.FAILED.value,
"name": name,
"error": str(e),
}
def get_mcp_logs(self, name: str, tail: int = 100) -> str:
"""Get logs from an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have logs
if mcp_config.get("type") == "remote":
return "Remote MCPs don't have local logs"
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get the container logs
try:
container = self.client.containers.get(container_name)
logs = container.logs(tail=tail, timestamps=True).decode("utf-8")
return logs
except NotFound:
# Container doesn't exist
return f"MCP container '{name}' not found"
except Exception as e:
logger.error(f"Error getting MCP container logs: {e}")
return f"Error getting logs: {str(e)}"
def list_mcp_containers(self) -> List[MCPContainer]:
"""List all MCP containers."""
if not self.client:
raise Exception("Docker client is not available")
# Get all containers with the mc.mcp label
containers = self.client.containers.list(all=True, filters={"label": "mc.mcp"})
result = []
for container in containers:
# Get container details
container_info = container.attrs
# Extract labels
labels = container_info["Config"]["Labels"]
# Extract ports
ports = {}
if (
"NetworkSettings" in container_info
and "Ports" in container_info["NetworkSettings"]
):
for port, mappings in container_info["NetworkSettings"][
"Ports"
].items():
if mappings:
ports[port] = int(mappings[0]["HostPort"])
# Determine status
status = (
MCPStatus.RUNNING
if container.status == "running"
else MCPStatus.STOPPED
)
# Create MCPContainer object
mcp_container = MCPContainer(
name=labels.get("mc.mcp.name", "unknown"),
container_id=container.id,
status=status,
image=container_info["Config"]["Image"],
ports=ports,
created_at=container_info["Created"],
type=labels.get("mc.mcp.type", "unknown"),
)
result.append(mcp_container)
return result