feat(mcp): initial version of mcp

This commit is contained in:
2025-03-12 19:51:40 -06:00
parent 20916c5713
commit 212f271268
8 changed files with 1551 additions and 11 deletions

View File

@@ -9,20 +9,24 @@ from .container import ContainerManager
from .models import SessionStatus
from .user_config import UserConfigManager
from .session import SessionManager
from .mcp import MCPManager
app = typer.Typer(help="Monadical Container Tool")
session_app = typer.Typer(help="Manage MC sessions")
driver_app = typer.Typer(help="Manage MC drivers", no_args_is_help=True)
config_app = typer.Typer(help="Manage MC configuration")
mcp_app = typer.Typer(help="Manage MCP servers")
app.add_typer(session_app, name="session", no_args_is_help=True)
app.add_typer(driver_app, name="driver", no_args_is_help=True)
app.add_typer(config_app, name="config", no_args_is_help=True)
app.add_typer(mcp_app, name="mcp", no_args_is_help=True)
console = Console()
config_manager = ConfigManager()
user_config = UserConfigManager()
session_manager = SessionManager()
container_manager = ContainerManager(config_manager, session_manager)
container_manager = ContainerManager(config_manager, session_manager, user_config)
mcp_manager = MCPManager(config_manager=user_config)
@app.callback(invoke_without_command=True)
@@ -70,6 +74,7 @@ def list_sessions() -> None:
table.add_column("Status")
table.add_column("Ports")
table.add_column("Project")
table.add_column("MCPs")
for session in sessions:
ports_str = ", ".join(
@@ -92,6 +97,9 @@ def list_sessions() -> None:
else str(session.status)
)
# Format MCPs as a comma-separated list
mcps_str = ", ".join(session.mcps) if session.mcps else ""
table.add_row(
session.id,
session.name,
@@ -99,6 +107,7 @@ def list_sessions() -> None:
f"[{status_color}]{status_name}[/{status_color}]",
ports_str,
session.project or "",
mcps_str,
)
console.print(table)
@@ -128,6 +137,12 @@ def create_session(
"--no-mount",
help="Don't mount local directory to /app (ignored if --project is used)",
),
mcp: List[str] = typer.Option(
[],
"--mcp",
"-m",
help="Attach MCP servers to the session (can be specified multiple times)",
),
) -> None:
"""Create a new MC session"""
# Use default driver from user configuration
@@ -203,6 +218,7 @@ def create_session(
mount_local=not no_mount and user_config.get("defaults.mount_local", True),
volumes=volume_mounts,
networks=all_networks,
mcp=mcp,
)
if session:
@@ -210,6 +226,11 @@ def create_session(
console.print(f"Session ID: {session.id}")
console.print(f"Driver: {session.driver}")
if session.mcps:
console.print("MCP Servers:")
for mcp in session.mcps:
console.print(f" - {mcp}")
if session.ports:
console.print("Ports:")
for container_port, host_port in session.ports.items():
@@ -356,6 +377,12 @@ def quick_create(
"--no-mount",
help="Don't mount local directory to /app (ignored if a project is specified)",
),
mcp: List[str] = typer.Option(
[],
"--mcp",
"-m",
help="Attach MCP servers to the session (can be specified multiple times)",
),
) -> None:
"""Create a new MC session with a project repository"""
# Use user config for defaults if not specified
@@ -371,6 +398,7 @@ def quick_create(
name=name,
no_connect=no_connect,
no_mount=no_mount,
mcp=mcp,
)
@@ -739,5 +767,353 @@ def remove_volume(
console.print(f"[green]Removed volume '{volume_to_remove}' from defaults[/green]")
# MCP Management Commands
mcp_remote_app = typer.Typer(help="Manage remote MCP servers")
mcp_docker_app = typer.Typer(help="Manage Docker-based MCP servers")
mcp_proxy_app = typer.Typer(help="Manage proxy-based MCP servers")
mcp_app.add_typer(mcp_remote_app, name="remote", no_args_is_help=True)
mcp_app.add_typer(mcp_docker_app, name="docker", no_args_is_help=True)
mcp_app.add_typer(mcp_proxy_app, name="proxy", no_args_is_help=True)
@mcp_app.command("list")
def list_mcps() -> None:
"""List all configured MCP servers"""
mcps = mcp_manager.list_mcps()
if not mcps:
console.print("No MCP servers configured")
return
# Create a table with the MCP information
table = Table(show_header=True, header_style="bold")
table.add_column("Name")
table.add_column("Type")
table.add_column("Status")
table.add_column("Details")
# Check status of each MCP
for mcp in mcps:
name = mcp.get("name", "")
mcp_type = mcp.get("type", "")
try:
status_info = mcp_manager.get_mcp_status(name)
status = status_info.get("status", "unknown")
# Set status color based on status
status_color = {
"running": "green",
"stopped": "red",
"not_found": "yellow",
"not_applicable": "blue",
"failed": "red",
}.get(status, "white")
# Different details based on MCP type
if mcp_type == "remote":
details = mcp.get("url", "")
elif mcp_type == "docker":
details = mcp.get("image", "")
elif mcp_type == "proxy":
details = (
f"{mcp.get('base_image', '')} (via {mcp.get('proxy_image', '')})"
)
else:
details = ""
table.add_row(
name,
mcp_type,
f"[{status_color}]{status}[/{status_color}]",
details,
)
except Exception as e:
table.add_row(
name,
mcp_type,
"[red]error[/red]",
str(e),
)
console.print(table)
@mcp_app.command("status")
def mcp_status(name: str = typer.Argument(..., help="MCP server name")) -> None:
"""Show detailed status of an MCP server"""
try:
# Get the MCP configuration
mcp_config = mcp_manager.get_mcp(name)
if not mcp_config:
console.print(f"[red]MCP server '{name}' not found[/red]")
return
# Get status information
status_info = mcp_manager.get_mcp_status(name)
# Print detailed information
console.print(f"[bold]MCP Server:[/bold] {name}")
console.print(f"[bold]Type:[/bold] {mcp_config.get('type')}")
status = status_info.get("status")
status_color = {
"running": "green",
"stopped": "red",
"not_found": "yellow",
"not_applicable": "blue",
"failed": "red",
}.get(status, "white")
console.print(f"[bold]Status:[/bold] [{status_color}]{status}[/{status_color}]")
# Type-specific information
if mcp_config.get("type") == "remote":
console.print(f"[bold]URL:[/bold] {mcp_config.get('url')}")
if mcp_config.get("headers"):
console.print("[bold]Headers:[/bold]")
for key, value in mcp_config.get("headers", {}).items():
# Mask sensitive headers
if (
"token" in key.lower()
or "key" in key.lower()
or "auth" in key.lower()
):
console.print(f" {key}: ****")
else:
console.print(f" {key}: {value}")
elif mcp_config.get("type") in ["docker", "proxy"]:
console.print(f"[bold]Image:[/bold] {status_info.get('image')}")
if status_info.get("container_id"):
console.print(
f"[bold]Container ID:[/bold] {status_info.get('container_id')}"
)
if status_info.get("ports"):
console.print("[bold]Ports:[/bold]")
for port, host_port in status_info.get("ports", {}).items():
console.print(f" {port} -> {host_port}")
if status_info.get("created"):
console.print(f"[bold]Created:[/bold] {status_info.get('created')}")
# For proxy type, show additional information
if mcp_config.get("type") == "proxy":
console.print(
f"[bold]Base Image:[/bold] {mcp_config.get('base_image')}"
)
console.print(
f"[bold]Proxy Image:[/bold] {mcp_config.get('proxy_image')}"
)
console.print("[bold]Proxy Options:[/bold]")
for key, value in mcp_config.get("proxy_options", {}).items():
console.print(f" {key}: {value}")
except Exception as e:
console.print(f"[red]Error getting MCP status: {e}[/red]")
@mcp_app.command("start")
def start_mcp(name: str = typer.Argument(..., help="MCP server name")) -> None:
"""Start an MCP server"""
try:
with console.status(f"Starting MCP server '{name}'..."):
result = mcp_manager.start_mcp(name)
if result.get("status") == "running":
console.print(f"[green]Started MCP server '{name}'[/green]")
elif result.get("status") == "not_applicable":
console.print(
f"[blue]MCP server '{name}' is a remote type (no container to start)[/blue]"
)
else:
console.print(f"MCP server '{name}' status: {result.get('status')}")
except Exception as e:
console.print(f"[red]Error starting MCP server: {e}[/red]")
@mcp_app.command("stop")
def stop_mcp(name: str = typer.Argument(..., help="MCP server name")) -> None:
"""Stop an MCP server"""
try:
with console.status(f"Stopping MCP server '{name}'..."):
result = mcp_manager.stop_mcp(name)
if result:
console.print(f"[green]Stopped MCP server '{name}'[/green]")
else:
console.print(f"[yellow]MCP server '{name}' was not running[/yellow]")
except Exception as e:
console.print(f"[red]Error stopping MCP server: {e}[/red]")
@mcp_app.command("restart")
def restart_mcp(name: str = typer.Argument(..., help="MCP server name")) -> None:
"""Restart an MCP server"""
try:
with console.status(f"Restarting MCP server '{name}'..."):
result = mcp_manager.restart_mcp(name)
if result.get("status") == "running":
console.print(f"[green]Restarted MCP server '{name}'[/green]")
elif result.get("status") == "not_applicable":
console.print(
f"[blue]MCP server '{name}' is a remote type (no container to restart)[/blue]"
)
else:
console.print(f"MCP server '{name}' status: {result.get('status')}")
except Exception as e:
console.print(f"[red]Error restarting MCP server: {e}[/red]")
@mcp_app.command("logs")
def mcp_logs(
name: str = typer.Argument(..., help="MCP server name"),
tail: int = typer.Option(100, "--tail", "-n", help="Number of lines to show"),
) -> None:
"""Show logs from an MCP server"""
try:
logs = mcp_manager.get_mcp_logs(name, tail=tail)
console.print(logs)
except Exception as e:
console.print(f"[red]Error getting MCP logs: {e}[/red]")
@mcp_app.command("remove")
def remove_mcp(name: str = typer.Argument(..., help="MCP server name")) -> None:
"""Remove an MCP server configuration"""
try:
with console.status(f"Removing MCP server '{name}'..."):
result = mcp_manager.remove_mcp(name)
if result:
console.print(f"[green]Removed MCP server '{name}'[/green]")
else:
console.print(f"[yellow]MCP server '{name}' not found[/yellow]")
except Exception as e:
console.print(f"[red]Error removing MCP server: {e}[/red]")
@mcp_remote_app.command("add")
def add_remote_mcp(
name: str = typer.Argument(..., help="MCP server name"),
url: str = typer.Argument(..., help="URL of the remote MCP server"),
header: List[str] = typer.Option(
[], "--header", "-H", help="HTTP headers (format: KEY=VALUE)"
),
) -> None:
"""Add a remote MCP server"""
# Parse headers
headers = {}
for h in header:
if "=" in h:
key, value = h.split("=", 1)
headers[key] = value
else:
console.print(
f"[yellow]Warning: Ignoring invalid header format: {h}[/yellow]"
)
try:
with console.status(f"Adding remote MCP server '{name}'..."):
result = mcp_manager.add_remote_mcp(name, url, headers)
console.print(f"[green]Added remote MCP server '{name}'[/green]")
except Exception as e:
console.print(f"[red]Error adding remote MCP server: {e}[/red]")
@mcp_docker_app.command("add")
def add_docker_mcp(
name: str = typer.Argument(..., help="MCP server name"),
image: str = typer.Argument(..., help="Docker image for the MCP server"),
command: str = typer.Option(
"", "--command", "-c", help="Command to run in the container"
),
env: List[str] = typer.Option(
[], "--env", "-e", help="Environment variables (format: KEY=VALUE)"
),
) -> None:
"""Add a Docker-based MCP server"""
# Parse environment variables
environment = {}
for var in env:
if "=" in var:
key, value = var.split("=", 1)
environment[key] = value
else:
console.print(
f"[yellow]Warning: Ignoring invalid environment variable format: {var}[/yellow]"
)
try:
with console.status(f"Adding Docker-based MCP server '{name}'..."):
result = mcp_manager.add_docker_mcp(name, image, command, environment)
console.print(f"[green]Added Docker-based MCP server '{name}'[/green]")
except Exception as e:
console.print(f"[red]Error adding Docker-based MCP server: {e}[/red]")
@mcp_proxy_app.command("add")
def add_proxy_mcp(
name: str = typer.Argument(..., help="MCP server name"),
base_image: str = typer.Argument(..., help="Base MCP Docker image"),
proxy_image: str = typer.Option(
"ghcr.io/sparfenyuk/mcp-proxy:latest",
"--proxy-image",
help="Proxy image for MCP",
),
command: str = typer.Option(
"", "--command", "-c", help="Command to run in the container"
),
sse_port: int = typer.Option(8080, "--sse-port", help="Port for SSE server"),
sse_host: str = typer.Option("0.0.0.0", "--sse-host", help="Host for SSE server"),
allow_origin: str = typer.Option(
"*", "--allow-origin", help="CORS allow-origin header"
),
env: List[str] = typer.Option(
[], "--env", "-e", help="Environment variables (format: KEY=VALUE)"
),
) -> None:
"""Add a proxy-based MCP server"""
# Parse environment variables
environment = {}
for var in env:
if "=" in var:
key, value = var.split("=", 1)
environment[key] = value
else:
console.print(
f"[yellow]Warning: Ignoring invalid environment variable format: {var}[/yellow]"
)
# Prepare proxy options
proxy_options = {
"sse_port": sse_port,
"sse_host": sse_host,
"allow_origin": allow_origin,
}
try:
with console.status(f"Adding proxy-based MCP server '{name}'..."):
result = mcp_manager.add_proxy_mcp(
name, base_image, proxy_image, command, proxy_options, environment
)
console.print(f"[green]Added proxy-based MCP server '{name}'[/green]")
except Exception as e:
console.print(f"[red]Error adding proxy-based MCP server: {e}[/red]")
if __name__ == "__main__":
app()

View File

@@ -5,12 +5,18 @@ import docker
import hashlib
import pathlib
import concurrent.futures
import logging
from typing import Dict, List, Optional, Tuple
from docker.errors import DockerException, ImageNotFound
from .models import Session, SessionStatus
from .config import ConfigManager
from .session import SessionManager
from .mcp import MCPManager
from .user_config import UserConfigManager
# Configure logging
logger = logging.getLogger(__name__)
class ContainerManager:
@@ -18,14 +24,19 @@ class ContainerManager:
self,
config_manager: Optional[ConfigManager] = None,
session_manager: Optional[SessionManager] = None,
user_config_manager: Optional[UserConfigManager] = None,
):
self.config_manager = config_manager or ConfigManager()
self.session_manager = session_manager or SessionManager()
self.user_config_manager = user_config_manager or UserConfigManager()
self.mcp_manager = MCPManager(config_manager=self.user_config_manager)
try:
self.client = docker.from_env()
# Test connection
self.client.ping()
except DockerException as e:
logger.error(f"Error connecting to Docker: {e}")
print(f"Error connecting to Docker: {e}")
sys.exit(1)
@@ -133,6 +144,7 @@ class ContainerManager:
mount_local: bool = True,
volumes: Optional[Dict[str, Dict[str, str]]] = None,
networks: Optional[List[str]] = None,
mcp: Optional[List[str]] = None,
) -> Optional[Session]:
"""Create a new MC session
@@ -144,6 +156,7 @@ class ContainerManager:
mount_local: Whether to mount the current directory to /app
volumes: Optional additional volumes to mount (dict of {host_path: {"bind": container_path, "mode": mode}})
networks: Optional list of additional Docker networks to connect to
mcp: Optional list of MCP server names to attach to the session
"""
try:
# Validate driver exists
@@ -267,7 +280,113 @@ class ContainerManager:
"network", "mc-network"
)
# Create container with default MC network
# Get network list
network_list = [default_network]
# Process MCPs if provided
mcp_configs = []
mcp_names = []
# Ensure MCP is a list
mcps_to_process = mcp or []
# Process each MCP
for mcp_name in mcps_to_process:
# Get the MCP configuration
mcp_config = self.mcp_manager.get_mcp(mcp_name)
if not mcp_config:
print(f"Warning: MCP server '{mcp_name}' not found, skipping")
continue
# Add to the list of processed MCPs
mcp_configs.append(mcp_config)
mcp_names.append(mcp_name)
# Check if the MCP server is running (for Docker-based MCPs)
if mcp_config.get("type") in ["docker", "proxy"]:
# Ensure the MCP is running
try:
print(f"Ensuring MCP server '{mcp_name}' is running...")
self.mcp_manager.start_mcp(mcp_name)
# Add MCP network to the list
mcp_network = self.mcp_manager._ensure_mcp_network()
if mcp_network not in network_list:
network_list.append(mcp_network)
# Get MCP status to extract endpoint information
mcp_status = self.mcp_manager.get_mcp_status(mcp_name)
# Add MCP environment variables with index
idx = len(mcp_names) - 1 # 0-based index for the current MCP
if mcp_config.get("type") == "remote":
# For remote MCP, set the URL and headers
env_vars[f"MCP_{idx}_URL"] = mcp_config.get("url")
if mcp_config.get("headers"):
# Serialize headers as JSON
import json
env_vars[f"MCP_{idx}_HEADERS"] = json.dumps(
mcp_config.get("headers")
)
else:
# For Docker/proxy MCP, set the connection details
# Use the container name as hostname for internal Docker DNS resolution
container_name = self.mcp_manager.get_mcp_container_name(
mcp_name
)
env_vars[f"MCP_{idx}_HOST"] = container_name
# Default port is 8080 unless specified in status
port = next(
iter(mcp_status.get("ports", {}).values()), 8080
)
env_vars[f"MCP_{idx}_PORT"] = str(port)
env_vars[f"MCP_{idx}_URL"] = (
f"http://{container_name}:{port}/sse"
)
# Set type-specific information
env_vars[f"MCP_{idx}_TYPE"] = mcp_config.get("type")
env_vars[f"MCP_{idx}_NAME"] = mcp_name
except Exception as e:
print(f"Warning: Failed to start MCP server '{mcp_name}': {e}")
elif mcp_config.get("type") == "remote":
# For remote MCP, just set environment variables
idx = len(mcp_names) - 1 # 0-based index for the current MCP
env_vars[f"MCP_{idx}_URL"] = mcp_config.get("url")
if mcp_config.get("headers"):
# Serialize headers as JSON
import json
env_vars[f"MCP_{idx}_HEADERS"] = json.dumps(
mcp_config.get("headers")
)
# Set type-specific information
env_vars[f"MCP_{idx}_TYPE"] = "remote"
env_vars[f"MCP_{idx}_NAME"] = mcp_name
# Set environment variables for MCP count if we have any
if mcp_names:
env_vars["MCP_COUNT"] = str(len(mcp_names))
env_vars["MCP_ENABLED"] = "true"
# Serialize all MCP names as JSON
import json
env_vars["MCP_NAMES"] = json.dumps(mcp_names)
# Add user-specified networks
if networks:
for network in networks:
if network not in network_list:
network_list.append(network)
print(f"Adding network {network} to session")
# Create container
container = self.client.containers.create(
image=driver.image,
name=session_name,
@@ -283,17 +402,18 @@ class ContainerManager:
"mc.session.name": session_name,
"mc.driver": driver_name,
"mc.project": project or "",
"mc.mcps": ",".join(mcp_names) if mcp_names else "",
},
network=default_network,
network=network_list[0], # Connect to the first network initially
ports={f"{port}/tcp": None for port in driver.ports},
)
# Start container
container.start()
# Connect to additional networks if specified
if networks:
for network_name in networks:
# Connect to additional networks (after the first one in network_list)
if len(network_list) > 1:
for network_name in network_list[1:]:
try:
# Get or create the network
try:
@@ -310,6 +430,30 @@ class ContainerManager:
except DockerException as e:
print(f"Error connecting to network {network_name}: {e}")
# Connect to additional user-specified networks
if networks:
for network_name in networks:
if (
network_name not in network_list
): # Avoid connecting to the same network twice
try:
# Get or create the network
try:
network = self.client.networks.get(network_name)
except DockerException:
print(
f"Network '{network_name}' not found, creating it..."
)
network = self.client.networks.create(
network_name, driver="bridge"
)
# Connect the container to the network
network.connect(container)
print(f"Connected to network: {network_name}")
except DockerException as e:
print(f"Error connecting to network {network_name}: {e}")
# Get updated port information
container.reload()
ports = {}
@@ -333,6 +477,7 @@ class ContainerManager:
project=project,
created_at=container.attrs["Created"],
ports=ports,
mcps=mcp_names,
)
# Save session to the session manager as JSON-compatible dict

514
mcontainer/mcp.py Normal file
View File

@@ -0,0 +1,514 @@
"""
MCP (Model Control Protocol) server management for Monadical Container.
"""
import os
import docker
import logging
import tempfile
from typing import Dict, List, Optional, Any
from docker.errors import DockerException, ImageNotFound, NotFound
from .models import MCPStatus, RemoteMCP, DockerMCP, ProxyMCP, MCPContainer
from .user_config import UserConfigManager
# Configure logging
logger = logging.getLogger(__name__)
class MCPManager:
"""Manager for MCP (Model Control Protocol) servers."""
def __init__(
self,
config_manager: Optional[UserConfigManager] = None,
):
"""Initialize the MCP manager."""
self.config_manager = config_manager or UserConfigManager()
try:
self.client = docker.from_env()
# Test connection
self.client.ping()
except DockerException as e:
logger.error(f"Error connecting to Docker: {e}")
self.client = None
def _ensure_mcp_network(self) -> str:
"""Ensure the MCP network exists and return its name."""
network_name = "mc-mcp-network"
if self.client:
networks = self.client.networks.list(names=[network_name])
if not networks:
self.client.networks.create(network_name, driver="bridge")
return network_name
def list_mcps(self) -> List[Dict[str, Any]]:
"""List all configured MCP servers."""
mcps = self.config_manager.get("mcps", [])
return mcps
def get_mcp(self, name: str) -> Optional[Dict[str, Any]]:
"""Get an MCP configuration by name."""
mcps = self.list_mcps()
for mcp in mcps:
if mcp.get("name") == name:
return mcp
return None
def add_remote_mcp(
self, name: str, url: str, headers: Dict[str, str] = None
) -> Dict[str, Any]:
"""Add a remote MCP server."""
# Create the remote MCP configuration
remote_mcp = RemoteMCP(
name=name,
url=url,
headers=headers or {},
)
# Add to the configuration
mcps = self.list_mcps()
# Remove existing MCP with the same name if it exists
mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# Add the new MCP
mcps.append(remote_mcp.model_dump())
# Save the configuration
self.config_manager.set("mcps", mcps)
return remote_mcp.model_dump()
def add_docker_mcp(
self, name: str, image: str, command: str, env: Dict[str, str] = None
) -> Dict[str, Any]:
"""Add a Docker-based MCP server."""
# Create the Docker MCP configuration
docker_mcp = DockerMCP(
name=name,
image=image,
command=command,
env=env or {},
)
# Add to the configuration
mcps = self.list_mcps()
# Remove existing MCP with the same name if it exists
mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# Add the new MCP
mcps.append(docker_mcp.model_dump())
# Save the configuration
self.config_manager.set("mcps", mcps)
return docker_mcp.model_dump()
def add_proxy_mcp(
self,
name: str,
base_image: str,
proxy_image: str,
command: str,
proxy_options: Dict[str, Any] = None,
env: Dict[str, str] = None,
) -> Dict[str, Any]:
"""Add a proxy-based MCP server."""
# Create the Proxy MCP configuration
proxy_mcp = ProxyMCP(
name=name,
base_image=base_image,
proxy_image=proxy_image,
command=command,
proxy_options=proxy_options or {},
env=env or {},
)
# Add to the configuration
mcps = self.list_mcps()
# Remove existing MCP with the same name if it exists
mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# Add the new MCP
mcps.append(proxy_mcp.model_dump())
# Save the configuration
self.config_manager.set("mcps", mcps)
return proxy_mcp.model_dump()
def remove_mcp(self, name: str) -> bool:
"""Remove an MCP server configuration."""
mcps = self.list_mcps()
# Filter out the MCP with the specified name
updated_mcps = [mcp for mcp in mcps if mcp.get("name") != name]
# If the length hasn't changed, the MCP wasn't found
if len(mcps) == len(updated_mcps):
return False
# Save the updated configuration
self.config_manager.set("mcps", updated_mcps)
# Stop and remove the container if it exists
self.stop_mcp(name)
return True
def get_mcp_container_name(self, mcp_name: str) -> str:
"""Get the Docker container name for an MCP server."""
return f"mc_mcp_{mcp_name}"
def start_mcp(self, name: str) -> Dict[str, Any]:
"""Start an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Get the container name
container_name = self.get_mcp_container_name(name)
# Check if the container already exists
try:
container = self.client.containers.get(container_name)
# If it exists, start it if it's not running
if container.status != "running":
container.start()
# Return the container status
return {
"container_id": container.id,
"status": "running",
"name": name,
}
except NotFound:
# Container doesn't exist, we need to create it
pass
# Ensure the MCP network exists
network_name = self._ensure_mcp_network()
# Handle different MCP types
mcp_type = mcp_config.get("type")
if mcp_type == "remote":
# Remote MCP servers don't need containers
return {
"status": "not_applicable",
"name": name,
"type": "remote",
}
elif mcp_type == "docker":
# Pull the image if needed
try:
self.client.images.get(mcp_config["image"])
except ImageNotFound:
logger.info(f"Pulling image {mcp_config['image']}")
self.client.images.pull(mcp_config["image"])
# Create and start the container
container = self.client.containers.run(
image=mcp_config["image"],
command=mcp_config.get("command"),
name=container_name,
detach=True,
network=network_name,
environment=mcp_config.get("env", {}),
labels={
"mc.mcp": "true",
"mc.mcp.name": name,
"mc.mcp.type": "docker",
},
ports={
"8080/tcp": 8080, # Default SSE port
},
)
return {
"container_id": container.id,
"status": "running",
"name": name,
}
elif mcp_type == "proxy":
# For proxy, we need to create a custom Dockerfile and build an image
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a Dockerfile for the proxy
dockerfile_content = f"""
FROM {mcp_config["proxy_image"]}
# Set environment variables for the proxy
ENV MCP_BASE_IMAGE={mcp_config["base_image"]}
ENV MCP_COMMAND={mcp_config["command"]}
ENV SSE_PORT={mcp_config["proxy_options"].get("sse_port", 8080)}
ENV SSE_HOST={mcp_config["proxy_options"].get("sse_host", "0.0.0.0")}
ENV ALLOW_ORIGIN={mcp_config["proxy_options"].get("allow_origin", "*")}
# Add environment variables from the configuration
{chr(10).join([f'ENV {k}="{v}"' for k, v in mcp_config.get("env", {}).items()])}
"""
# Write the Dockerfile
dockerfile_path = os.path.join(tmp_dir, "Dockerfile")
with open(dockerfile_path, "w") as f:
f.write(dockerfile_content)
# Build the image
custom_image_name = f"mc_mcp_proxy_{name}"
logger.info(f"Building custom proxy image: {custom_image_name}")
self.client.images.build(
path=tmp_dir,
tag=custom_image_name,
rm=True,
)
# Create and start the container
container = self.client.containers.run(
image=custom_image_name,
name=container_name,
detach=True,
network=network_name,
labels={
"mc.mcp": "true",
"mc.mcp.name": name,
"mc.mcp.type": "proxy",
},
ports={
f"{mcp_config['proxy_options'].get('sse_port', 8080)}/tcp": mcp_config[
"proxy_options"
].get("sse_port", 8080),
},
)
return {
"container_id": container.id,
"status": "running",
"name": name,
}
else:
raise ValueError(f"Unsupported MCP type: {mcp_type}")
def stop_mcp(self, name: str) -> bool:
"""Stop an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have containers to stop
if mcp_config.get("type") == "remote":
return True
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get and stop the container
try:
container = self.client.containers.get(container_name)
container.stop(timeout=10)
return True
except NotFound:
# Container doesn't exist
return False
except Exception as e:
logger.error(f"Error stopping MCP container: {e}")
return False
def restart_mcp(self, name: str) -> Dict[str, Any]:
"""Restart an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have containers to restart
if mcp_config.get("type") == "remote":
return {
"status": "not_applicable",
"name": name,
"type": "remote",
}
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get and restart the container
try:
container = self.client.containers.get(container_name)
container.restart(timeout=10)
return {
"container_id": container.id,
"status": "running",
"name": name,
}
except NotFound:
# Container doesn't exist, start it
return self.start_mcp(name)
except Exception as e:
logger.error(f"Error restarting MCP container: {e}")
raise
def get_mcp_status(self, name: str) -> Dict[str, Any]:
"""Get the status of an MCP server."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have containers
if mcp_config.get("type") == "remote":
return {
"status": "not_applicable",
"name": name,
"type": "remote",
"url": mcp_config.get("url"),
}
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get the container status
try:
container = self.client.containers.get(container_name)
status = (
MCPStatus.RUNNING
if container.status == "running"
else MCPStatus.STOPPED
)
# Get container details
container_info = container.attrs
# Extract ports
ports = {}
if (
"NetworkSettings" in container_info
and "Ports" in container_info["NetworkSettings"]
):
for port, mappings in container_info["NetworkSettings"][
"Ports"
].items():
if mappings:
ports[port] = int(mappings[0]["HostPort"])
return {
"status": status.value,
"container_id": container.id,
"name": name,
"type": mcp_config.get("type"),
"image": container_info["Config"]["Image"],
"ports": ports,
"created": container_info["Created"],
}
except NotFound:
# Container doesn't exist
return {
"status": MCPStatus.NOT_FOUND.value,
"name": name,
"type": mcp_config.get("type"),
}
except Exception as e:
logger.error(f"Error getting MCP container status: {e}")
return {
"status": MCPStatus.FAILED.value,
"name": name,
"error": str(e),
}
def get_mcp_logs(self, name: str, tail: int = 100) -> str:
"""Get logs from an MCP server container."""
if not self.client:
raise Exception("Docker client is not available")
# Get the MCP configuration
mcp_config = self.get_mcp(name)
if not mcp_config:
raise ValueError(f"MCP server '{name}' not found")
# Remote MCPs don't have logs
if mcp_config.get("type") == "remote":
return "Remote MCPs don't have local logs"
# Get the container name
container_name = self.get_mcp_container_name(name)
# Try to get the container logs
try:
container = self.client.containers.get(container_name)
logs = container.logs(tail=tail, timestamps=True).decode("utf-8")
return logs
except NotFound:
# Container doesn't exist
return f"MCP container '{name}' not found"
except Exception as e:
logger.error(f"Error getting MCP container logs: {e}")
return f"Error getting logs: {str(e)}"
def list_mcp_containers(self) -> List[MCPContainer]:
"""List all MCP containers."""
if not self.client:
raise Exception("Docker client is not available")
# Get all containers with the mc.mcp label
containers = self.client.containers.list(all=True, filters={"label": "mc.mcp"})
result = []
for container in containers:
# Get container details
container_info = container.attrs
# Extract labels
labels = container_info["Config"]["Labels"]
# Extract ports
ports = {}
if (
"NetworkSettings" in container_info
and "Ports" in container_info["NetworkSettings"]
):
for port, mappings in container_info["NetworkSettings"][
"Ports"
].items():
if mappings:
ports[port] = int(mappings[0]["HostPort"])
# Determine status
status = (
MCPStatus.RUNNING
if container.status == "running"
else MCPStatus.STOPPED
)
# Create MCPContainer object
mcp_container = MCPContainer(
name=labels.get("mc.mcp.name", "unknown"),
container_id=container.id,
status=status,
image=container_info["Config"]["Image"],
ports=ports,
created_at=container_info["Created"],
type=labels.get("mc.mcp.type", "unknown"),
)
result.append(mcp_container)
return result

View File

@@ -1,5 +1,5 @@
from enum import Enum
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union, Any
from pydantic import BaseModel, Field
@@ -10,6 +10,13 @@ class SessionStatus(str, Enum):
FAILED = "failed"
class MCPStatus(str, Enum):
RUNNING = "running"
STOPPED = "stopped"
NOT_FOUND = "not_found"
FAILED = "failed"
class DriverEnvironmentVariable(BaseModel):
name: str
description: str
@@ -48,6 +55,44 @@ class Driver(BaseModel):
persistent_configs: List[PersistentConfig] = []
class RemoteMCP(BaseModel):
name: str
type: str = "remote"
url: str
headers: Dict[str, str] = Field(default_factory=dict)
class DockerMCP(BaseModel):
name: str
type: str = "docker"
image: str
command: str
env: Dict[str, str] = Field(default_factory=dict)
class ProxyMCP(BaseModel):
name: str
type: str = "proxy"
base_image: str
proxy_image: str
command: str
proxy_options: Dict[str, Any] = Field(default_factory=dict)
env: Dict[str, str] = Field(default_factory=dict)
MCP = Union[RemoteMCP, DockerMCP, ProxyMCP]
class MCPContainer(BaseModel):
name: str
container_id: str
status: MCPStatus
image: str
ports: Dict[str, int] = Field(default_factory=dict)
created_at: str
type: str
class Session(BaseModel):
id: str
name: str
@@ -58,6 +103,7 @@ class Session(BaseModel):
project: Optional[str] = None
created_at: str
ports: Dict[int, int] = Field(default_factory=dict)
mcps: List[str] = Field(default_factory=list) # List of MCP server names
class Config(BaseModel):
@@ -66,3 +112,4 @@ class Config(BaseModel):
defaults: Dict[str, object] = Field(
default_factory=dict
) # Can store strings, booleans, or other values
mcps: List[Dict[str, Any]] = Field(default_factory=list)