""" MCP (Model Control Protocol) server management for Monadical Container. """ import os import docker import logging import tempfile from typing import Dict, List, Optional, Any from docker.errors import DockerException, ImageNotFound, NotFound from .models import MCPStatus, RemoteMCP, DockerMCP, ProxyMCP, MCPContainer from .user_config import UserConfigManager # Configure logging logger = logging.getLogger(__name__) class MCPManager: """Manager for MCP (Model Control Protocol) servers.""" def __init__( self, config_manager: Optional[UserConfigManager] = None, ): """Initialize the MCP manager.""" self.config_manager = config_manager or UserConfigManager() try: self.client = docker.from_env() # Test connection self.client.ping() except DockerException as e: logger.error(f"Error connecting to Docker: {e}") self.client = None def _ensure_mcp_network(self) -> str: """Ensure the MCP network exists and return its name.""" network_name = "mc-mcp-network" if self.client: networks = self.client.networks.list(names=[network_name]) if not networks: self.client.networks.create(network_name, driver="bridge") return network_name def list_mcps(self) -> List[Dict[str, Any]]: """List all configured MCP servers.""" mcps = self.config_manager.get("mcps", []) return mcps def get_mcp(self, name: str) -> Optional[Dict[str, Any]]: """Get an MCP configuration by name.""" mcps = self.list_mcps() for mcp in mcps: if mcp.get("name") == name: return mcp return None def add_remote_mcp( self, name: str, url: str, headers: Dict[str, str] = None ) -> Dict[str, Any]: """Add a remote MCP server.""" # Create the remote MCP configuration remote_mcp = RemoteMCP( name=name, url=url, headers=headers or {}, ) # Add to the configuration mcps = self.list_mcps() # Remove existing MCP with the same name if it exists mcps = [mcp for mcp in mcps if mcp.get("name") != name] # Add the new MCP mcps.append(remote_mcp.model_dump()) # Save the configuration self.config_manager.set("mcps", mcps) return remote_mcp.model_dump() def add_docker_mcp( self, name: str, image: str, command: str, env: Dict[str, str] = None ) -> Dict[str, Any]: """Add a Docker-based MCP server.""" # Create the Docker MCP configuration docker_mcp = DockerMCP( name=name, image=image, command=command, env=env or {}, ) # Add to the configuration mcps = self.list_mcps() # Remove existing MCP with the same name if it exists mcps = [mcp for mcp in mcps if mcp.get("name") != name] # Add the new MCP mcps.append(docker_mcp.model_dump()) # Save the configuration self.config_manager.set("mcps", mcps) return docker_mcp.model_dump() def add_proxy_mcp( self, name: str, base_image: str, proxy_image: str, command: str, proxy_options: Dict[str, Any] = None, env: Dict[str, str] = None, ) -> Dict[str, Any]: """Add a proxy-based MCP server.""" # Create the Proxy MCP configuration proxy_mcp = ProxyMCP( name=name, base_image=base_image, proxy_image=proxy_image, command=command, proxy_options=proxy_options or {}, env=env or {}, ) # Add to the configuration mcps = self.list_mcps() # Remove existing MCP with the same name if it exists mcps = [mcp for mcp in mcps if mcp.get("name") != name] # Add the new MCP mcps.append(proxy_mcp.model_dump()) # Save the configuration self.config_manager.set("mcps", mcps) return proxy_mcp.model_dump() def remove_mcp(self, name: str) -> bool: """Remove an MCP server configuration.""" mcps = self.list_mcps() # Filter out the MCP with the specified name updated_mcps = [mcp for mcp in mcps if mcp.get("name") != name] # If the length hasn't changed, the MCP wasn't found if len(mcps) == len(updated_mcps): return False # Save the updated configuration self.config_manager.set("mcps", updated_mcps) # Stop and remove the container if it exists self.stop_mcp(name) return True def get_mcp_container_name(self, mcp_name: str) -> str: """Get the Docker container name for an MCP server.""" return f"mc_mcp_{mcp_name}" def start_mcp(self, name: str) -> Dict[str, Any]: """Start an MCP server container.""" if not self.client: raise Exception("Docker client is not available") # Get the MCP configuration mcp_config = self.get_mcp(name) if not mcp_config: raise ValueError(f"MCP server '{name}' not found") # Get the container name container_name = self.get_mcp_container_name(name) # Check if the container already exists try: container = self.client.containers.get(container_name) # If it exists, start it if it's not running if container.status != "running": container.start() # Return the container status return { "container_id": container.id, "status": "running", "name": name, } except NotFound: # Container doesn't exist, we need to create it pass # Ensure the MCP network exists network_name = self._ensure_mcp_network() # Handle different MCP types mcp_type = mcp_config.get("type") if mcp_type == "remote": # Remote MCP servers don't need containers return { "status": "not_applicable", "name": name, "type": "remote", } elif mcp_type == "docker": # Pull the image if needed try: self.client.images.get(mcp_config["image"]) except ImageNotFound: logger.info(f"Pulling image {mcp_config['image']}") self.client.images.pull(mcp_config["image"]) # Create and start the container container = self.client.containers.run( image=mcp_config["image"], command=mcp_config.get("command"), name=container_name, detach=True, network=network_name, environment=mcp_config.get("env", {}), labels={ "mc.mcp": "true", "mc.mcp.name": name, "mc.mcp.type": "docker", }, ports={ "8080/tcp": 8080, # Default SSE port }, ) return { "container_id": container.id, "status": "running", "name": name, } elif mcp_type == "proxy": # For proxy, we need to create a custom Dockerfile and build an image with tempfile.TemporaryDirectory() as tmp_dir: # Create entrypoint script for mcp-proxy that runs the base MCP image entrypoint_script = """#!/bin/sh echo "Starting MCP proxy with base image $MCP_BASE_IMAGE (command: $MCP_COMMAND) on port $SSE_PORT" # Verify if Docker socket is available if [ ! -S /var/run/docker.sock ]; then echo "ERROR: Docker socket not available. Cannot run base MCP image." echo "Make sure the Docker socket is mounted from the host." # Create a minimal fallback server for testing cat > /tmp/fallback_server.py << 'EOF' import json, sys, time print(json.dumps({"type": "ready", "message": "Fallback server - Docker socket not available"})) sys.stdout.flush() while True: line = sys.stdin.readline().strip() if line: try: data = json.loads(line) if data.get("type") == "ping": print(json.dumps({"type": "pong", "id": data.get("id")})) else: print(json.dumps({"type": "error", "message": "Docker socket not available"})) except: print(json.dumps({"type": "error"})) sys.stdout.flush() time.sleep(1) EOF exec mcp-proxy \ --sse-port "$SSE_PORT" \ --sse-host "$SSE_HOST" \ --allow-origin "$ALLOW_ORIGIN" \ --pass-environment \ -- \ python /tmp/fallback_server.py exit 1 fi # Pull the base MCP image echo "Pulling base MCP image: $MCP_BASE_IMAGE" docker pull "$MCP_BASE_IMAGE" || true # Prepare the command to run the MCP server if [ -n "$MCP_COMMAND" ]; then CMD="$MCP_COMMAND" else # Default to empty if no command specified CMD="" fi echo "Running MCP server from image $MCP_BASE_IMAGE with command: $CMD" # Run the actual MCP server in the base image and pipe its I/O to mcp-proxy # Using docker run without -d to keep stdio connected exec mcp-proxy \ --sse-port "$SSE_PORT" \ --sse-host "$SSE_HOST" \ --allow-origin "$ALLOW_ORIGIN" \ --pass-environment \ -- \ docker run --rm -i "$MCP_BASE_IMAGE" $CMD """ # Write the entrypoint script entrypoint_path = os.path.join(tmp_dir, "entrypoint.sh") with open(entrypoint_path, "w") as f: f.write(entrypoint_script) # Create a Dockerfile for the proxy dockerfile_content = f""" FROM {mcp_config["proxy_image"]} # Install Docker CLI (trying multiple package managers to handle different base images) USER root RUN (apt-get update && apt-get install -y docker.io) || \\ (apt-get update && apt-get install -y docker-ce-cli) || \\ (apk add --no-cache docker-cli) || \\ (yum install -y docker) || \\ echo "WARNING: Could not install Docker CLI - will fall back to minimal MCP server" # Set environment variables for the proxy ENV MCP_BASE_IMAGE={mcp_config["base_image"]} ENV MCP_COMMAND={mcp_config.get("command", "")} ENV SSE_PORT={mcp_config["proxy_options"].get("sse_port", 8080)} ENV SSE_HOST={mcp_config["proxy_options"].get("sse_host", "0.0.0.0")} ENV ALLOW_ORIGIN={mcp_config["proxy_options"].get("allow_origin", "*")} ENV DEBUG=1 # Add environment variables from the configuration {chr(10).join([f'ENV {k}="{v}"' for k, v in mcp_config.get("env", {}).items()])} # Add entrypoint script COPY entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] """ # Write the Dockerfile dockerfile_path = os.path.join(tmp_dir, "Dockerfile") with open(dockerfile_path, "w") as f: f.write(dockerfile_content) # Build the image custom_image_name = f"mc_mcp_proxy_{name}" logger.info(f"Building custom proxy image: {custom_image_name}") self.client.images.build( path=tmp_dir, tag=custom_image_name, rm=True, ) # Format command for the Docker entrypoint arguments # The MCP proxy container will handle this internally based on # the MCP_BASE_IMAGE and MCP_COMMAND env vars we set logger.info( f"Starting MCP proxy with base_image={mcp_config['base_image']}, command={mcp_config.get('command', '')}" ) # Create and start the container container = self.client.containers.run( image=custom_image_name, name=container_name, detach=True, network=network_name, volumes={ "/var/run/docker.sock": { "bind": "/var/run/docker.sock", "mode": "rw", } }, labels={ "mc.mcp": "true", "mc.mcp.name": name, "mc.mcp.type": "proxy", }, ports={ f"{mcp_config['proxy_options'].get('sse_port', 8080)}/tcp": mcp_config[ "proxy_options" ].get("sse_port", 8080), }, ) return { "container_id": container.id, "status": "running", "name": name, } else: raise ValueError(f"Unsupported MCP type: {mcp_type}") def stop_mcp(self, name: str) -> bool: """Stop an MCP server container.""" if not self.client: raise Exception("Docker client is not available") # Get the MCP configuration mcp_config = self.get_mcp(name) if not mcp_config: raise ValueError(f"MCP server '{name}' not found") # Remote MCPs don't have containers to stop if mcp_config.get("type") == "remote": return True # Get the container name container_name = self.get_mcp_container_name(name) # Try to get and stop the container try: container = self.client.containers.get(container_name) container.stop(timeout=10) return True except NotFound: # Container doesn't exist return False except Exception as e: logger.error(f"Error stopping MCP container: {e}") return False def restart_mcp(self, name: str) -> Dict[str, Any]: """Restart an MCP server container.""" if not self.client: raise Exception("Docker client is not available") # Get the MCP configuration mcp_config = self.get_mcp(name) if not mcp_config: raise ValueError(f"MCP server '{name}' not found") # Remote MCPs don't have containers to restart if mcp_config.get("type") == "remote": return { "status": "not_applicable", "name": name, "type": "remote", } # Get the container name container_name = self.get_mcp_container_name(name) # Try to get and restart the container try: container = self.client.containers.get(container_name) container.restart(timeout=10) return { "container_id": container.id, "status": "running", "name": name, } except NotFound: # Container doesn't exist, start it return self.start_mcp(name) except Exception as e: logger.error(f"Error restarting MCP container: {e}") raise def get_mcp_status(self, name: str) -> Dict[str, Any]: """Get the status of an MCP server.""" if not self.client: raise Exception("Docker client is not available") # Get the MCP configuration mcp_config = self.get_mcp(name) if not mcp_config: raise ValueError(f"MCP server '{name}' not found") # Remote MCPs don't have containers if mcp_config.get("type") == "remote": return { "status": "not_applicable", "name": name, "type": "remote", "url": mcp_config.get("url"), } # Get the container name container_name = self.get_mcp_container_name(name) # Try to get the container status try: container = self.client.containers.get(container_name) status = ( MCPStatus.RUNNING if container.status == "running" else MCPStatus.STOPPED ) # Get container details container_info = container.attrs # Extract ports ports = {} if ( "NetworkSettings" in container_info and "Ports" in container_info["NetworkSettings"] ): for port, mappings in container_info["NetworkSettings"][ "Ports" ].items(): if mappings: ports[port] = int(mappings[0]["HostPort"]) return { "status": status.value, "container_id": container.id, "name": name, "type": mcp_config.get("type"), "image": container_info["Config"]["Image"], "ports": ports, "created": container_info["Created"], } except NotFound: # Container doesn't exist return { "status": MCPStatus.NOT_FOUND.value, "name": name, "type": mcp_config.get("type"), } except Exception as e: logger.error(f"Error getting MCP container status: {e}") return { "status": MCPStatus.FAILED.value, "name": name, "error": str(e), } def get_mcp_logs(self, name: str, tail: int = 100) -> str: """Get logs from an MCP server container.""" if not self.client: raise Exception("Docker client is not available") # Get the MCP configuration mcp_config = self.get_mcp(name) if not mcp_config: raise ValueError(f"MCP server '{name}' not found") # Remote MCPs don't have logs if mcp_config.get("type") == "remote": return "Remote MCPs don't have local logs" # Get the container name container_name = self.get_mcp_container_name(name) # Try to get the container logs try: container = self.client.containers.get(container_name) logs = container.logs(tail=tail, timestamps=True).decode("utf-8") return logs except NotFound: # Container doesn't exist return f"MCP container '{name}' not found" except Exception as e: logger.error(f"Error getting MCP container logs: {e}") return f"Error getting logs: {str(e)}" def list_mcp_containers(self) -> List[MCPContainer]: """List all MCP containers.""" if not self.client: raise Exception("Docker client is not available") # Get all containers with the mc.mcp label containers = self.client.containers.list(all=True, filters={"label": "mc.mcp"}) result = [] for container in containers: # Get container details container_info = container.attrs # Extract labels labels = container_info["Config"]["Labels"] # Extract ports ports = {} if ( "NetworkSettings" in container_info and "Ports" in container_info["NetworkSettings"] ): for port, mappings in container_info["NetworkSettings"][ "Ports" ].items(): if mappings: ports[port] = int(mappings[0]["HostPort"]) # Determine status status = ( MCPStatus.RUNNING if container.status == "running" else MCPStatus.STOPPED ) # Create MCPContainer object mcp_container = MCPContainer( name=labels.get("mc.mcp.name", "unknown"), container_id=container.id, status=status, image=container_info["Config"]["Image"], ports=ports, created_at=container_info["Created"], type=labels.get("mc.mcp.type", "unknown"), ) result.append(mcp_container) return result