""" CLI for Monadical Container Tool. """ import os from typing import List, Optional import typer from rich.console import Console from rich.table import Table from .config import ConfigManager from .container import ContainerManager from .models import SessionStatus from .user_config import UserConfigManager from .session import SessionManager from .mcp import MCPManager app = typer.Typer(help="Monadical Container Tool") session_app = typer.Typer(help="Manage MC sessions") driver_app = typer.Typer(help="Manage MC drivers", no_args_is_help=True) config_app = typer.Typer(help="Manage MC configuration") mcp_app = typer.Typer(help="Manage MCP servers") app.add_typer(session_app, name="session", no_args_is_help=True) app.add_typer(driver_app, name="driver", no_args_is_help=True) app.add_typer(config_app, name="config", no_args_is_help=True) app.add_typer(mcp_app, name="mcp", no_args_is_help=True) console = Console() config_manager = ConfigManager() user_config = UserConfigManager() session_manager = SessionManager() container_manager = ContainerManager(config_manager, session_manager, user_config) mcp_manager = MCPManager(config_manager=user_config) @app.callback(invoke_without_command=True) def main(ctx: typer.Context) -> None: """Monadical Container Tool""" # If no command is specified, create a session if ctx.invoked_subcommand is None: create_session( driver=None, project=None, env=[], volume=[], network=[], name=None, no_connect=False, no_mount=False, ) @app.command() def version() -> None: """Show MC version information""" from importlib.metadata import version as get_version try: version_str = get_version("mcontainer") console.print(f"MC - Monadical Container Tool v{version_str}") except Exception: console.print("MC - Monadical Container Tool (development version)") @session_app.command("list") def list_sessions() -> None: """List active MC sessions""" sessions = container_manager.list_sessions() if not sessions: console.print("No active sessions found") return table = Table(show_header=True, header_style="bold") table.add_column("ID") table.add_column("Name") table.add_column("Driver") table.add_column("Status") table.add_column("Ports") table.add_column("Project") table.add_column("MCPs") for session in sessions: ports_str = ", ".join( [ f"{container_port}:{host_port}" for container_port, host_port in session.ports.items() ] ) status_color = { SessionStatus.RUNNING: "green", SessionStatus.STOPPED: "red", SessionStatus.CREATING: "yellow", SessionStatus.FAILED: "red", }.get(session.status, "white") status_name = ( session.status.value if hasattr(session.status, "value") else str(session.status) ) # Format MCPs as a comma-separated list mcps_str = ", ".join(session.mcps) if session.mcps else "" table.add_row( session.id, session.name, session.driver, f"[{status_color}]{status_name}[/{status_color}]", ports_str, session.project or "", mcps_str, ) console.print(table) @session_app.command("create") def create_session( driver: Optional[str] = typer.Option(None, "--driver", "-d", help="Driver to use"), project: Optional[str] = typer.Option( None, "--project", "-p", help="Project repository URL" ), env: List[str] = typer.Option( [], "--env", "-e", help="Environment variables (KEY=VALUE)" ), volume: List[str] = typer.Option( [], "--volume", "-v", help="Mount volumes (LOCAL_PATH:CONTAINER_PATH)" ), network: List[str] = typer.Option( [], "--network", "-N", help="Connect to additional Docker networks" ), name: Optional[str] = typer.Option(None, "--name", "-n", help="Session name"), no_connect: bool = typer.Option( False, "--no-connect", help="Don't automatically connect to the session" ), no_mount: bool = typer.Option( False, "--no-mount", help="Don't mount local directory to /app (ignored if --project is used)", ), mcp: List[str] = typer.Option( [], "--mcp", "-m", help="Attach MCP servers to the session (can be specified multiple times)", ), ) -> None: """Create a new MC session""" # Use default driver from user configuration if not driver: driver = user_config.get( "defaults.driver", config_manager.config.defaults.get("driver", "goose") ) # Start with environment variables from user configuration environment = user_config.get_environment_variables() # Override with environment variables from command line for var in env: if "=" in var: key, value = var.split("=", 1) environment[key] = value else: console.print( f"[yellow]Warning: Ignoring invalid environment variable format: {var}[/yellow]" ) # Parse volume mounts volume_mounts = {} # Get default volumes from user config default_volumes = user_config.get("defaults.volumes", []) # Combine default volumes with user-specified volumes all_volumes = default_volumes + list(volume) for vol in all_volumes: if ":" in vol: local_path, container_path = vol.split(":", 1) # Convert to absolute path if relative if not os.path.isabs(local_path): local_path = os.path.abspath(local_path) # Validate local path exists if not os.path.exists(local_path): console.print( f"[yellow]Warning: Local path '{local_path}' does not exist. Volume will not be mounted.[/yellow]" ) continue # Add to volume mounts (later entries override earlier ones with same host path) volume_mounts[local_path] = {"bind": container_path, "mode": "rw"} else: console.print( f"[yellow]Warning: Ignoring invalid volume format: {vol}. Use LOCAL_PATH:CONTAINER_PATH.[/yellow]" ) # Get default networks from user config default_networks = user_config.get("defaults.networks", []) # Combine default networks with user-specified networks, removing duplicates all_networks = list(set(default_networks + network)) if all_networks: console.print(f"Networks: {', '.join(all_networks)}") # Show volumes that will be mounted if volume_mounts: console.print("Volumes:") for host_path, mount_info in volume_mounts.items(): console.print(f" {host_path} -> {mount_info['bind']}") with console.status(f"Creating session with driver '{driver}'..."): session = container_manager.create_session( driver_name=driver, project=project, environment=environment, session_name=name, mount_local=not no_mount and user_config.get("defaults.mount_local", True), volumes=volume_mounts, networks=all_networks, mcp=mcp, ) if session: console.print("[green]Session created successfully![/green]") console.print(f"Session ID: {session.id}") console.print(f"Driver: {session.driver}") if session.mcps: console.print("MCP Servers:") for mcp in session.mcps: console.print(f" - {mcp}") if session.ports: console.print("Ports:") for container_port, host_port in session.ports.items(): console.print(f" {container_port} -> {host_port}") # Auto-connect based on user config, unless overridden by --no-connect flag auto_connect = user_config.get("defaults.connect", True) if not no_connect and auto_connect: container_manager.connect_session(session.id) else: console.print( f"\nConnect to the session with:\n mc session connect {session.id}" ) else: console.print("[red]Failed to create session[/red]") @session_app.command("close") def close_session( session_id: Optional[str] = typer.Argument(None, help="Session ID to close"), all_sessions: bool = typer.Option(False, "--all", help="Close all active sessions"), ) -> None: """Close a MC session or all sessions""" if all_sessions: # Get sessions first to display them sessions = container_manager.list_sessions() if not sessions: console.print("No active sessions to close") return console.print(f"Closing {len(sessions)} sessions...") # Simple progress function that prints a line when a session is closed def update_progress(session_id, status, message): if status == "completed": console.print( f"[green]Session {session_id} closed successfully[/green]" ) elif status == "failed": console.print( f"[red]Failed to close session {session_id}: {message}[/red]" ) # Start closing sessions with progress updates count, success = container_manager.close_all_sessions(update_progress) # Final result if success: console.print(f"[green]{count} sessions closed successfully[/green]") else: console.print("[red]Failed to close all sessions[/red]") elif session_id: with console.status(f"Closing session {session_id}..."): success = container_manager.close_session(session_id) if success: console.print(f"[green]Session {session_id} closed successfully[/green]") else: console.print(f"[red]Failed to close session {session_id}[/red]") else: console.print("[red]Error: Please provide a session ID or use --all flag[/red]") @session_app.command("connect") def connect_session( session_id: str = typer.Argument(..., help="Session ID to connect to"), ) -> None: """Connect to a MC session""" console.print(f"Connecting to session {session_id}...") success = container_manager.connect_session(session_id) if not success: console.print(f"[red]Failed to connect to session {session_id}[/red]") @session_app.command("logs") def session_logs( session_id: str = typer.Argument(..., help="Session ID to get logs from"), follow: bool = typer.Option(False, "--follow", "-f", help="Follow log output"), init: bool = typer.Option( False, "--init", "-i", help="Show initialization logs instead of container logs" ), ) -> None: """Stream logs from a MC session""" if init: # Show initialization logs if follow: console.print( f"Streaming initialization logs from session {session_id}... (Ctrl+C to exit)" ) container_manager.get_init_logs(session_id, follow=True) else: logs = container_manager.get_init_logs(session_id) if logs: console.print(logs) else: # Show regular container logs if follow: console.print( f"Streaming logs from session {session_id}... (Ctrl+C to exit)" ) container_manager.get_session_logs(session_id, follow=True) else: logs = container_manager.get_session_logs(session_id) if logs: console.print(logs) @app.command() def stop() -> None: """Stop the current MC session (from inside the container)""" # Check if running inside a container if not os.path.exists("/.dockerenv"): console.print( "[red]This command can only be run from inside a MC container[/red]" ) return # Stop the container from inside console.print("Stopping the current session...") os.system("kill 1") # Send SIGTERM to PID 1 (container's init process) # Main CLI entry point that handles project repository URLs @app.command(name="") def quick_create( project: Optional[str] = typer.Argument(..., help="Project repository URL"), driver: Optional[str] = typer.Option(None, "--driver", "-d", help="Driver to use"), env: List[str] = typer.Option( [], "--env", "-e", help="Environment variables (KEY=VALUE)" ), volume: List[str] = typer.Option( [], "--volume", "-v", help="Mount volumes (LOCAL_PATH:CONTAINER_PATH)" ), network: List[str] = typer.Option( [], "--network", "-N", help="Connect to additional Docker networks" ), name: Optional[str] = typer.Option(None, "--name", "-n", help="Session name"), no_connect: bool = typer.Option( False, "--no-connect", help="Don't automatically connect to the session" ), no_mount: bool = typer.Option( False, "--no-mount", help="Don't mount local directory to /app (ignored if a project is specified)", ), mcp: List[str] = typer.Option( [], "--mcp", "-m", help="Attach MCP servers to the session (can be specified multiple times)", ), ) -> None: """Create a new MC session with a project repository""" # Use user config for defaults if not specified if not driver: driver = user_config.get("defaults.driver") create_session( driver=driver, project=project, env=env, volume=volume, network=network, name=name, no_connect=no_connect, no_mount=no_mount, mcp=mcp, ) @driver_app.command("list") def list_drivers() -> None: """List available MC drivers""" drivers = config_manager.list_drivers() if not drivers: console.print("No drivers found") return table = Table(show_header=True, header_style="bold") table.add_column("Name") table.add_column("Description") table.add_column("Version") table.add_column("Maintainer") table.add_column("Image") for name, driver in drivers.items(): table.add_row( driver.name, driver.description, driver.version, driver.maintainer, driver.image, ) console.print(table) @driver_app.command("build") def build_driver( driver_name: str = typer.Argument(..., help="Driver name to build"), tag: str = typer.Option("latest", "--tag", "-t", help="Image tag"), push: bool = typer.Option( False, "--push", "-p", help="Push image to registry after building" ), ) -> None: """Build a driver Docker image""" # Get driver path driver_path = config_manager.get_driver_path(driver_name) if not driver_path: console.print(f"[red]Driver '{driver_name}' not found[/red]") return # Check if Dockerfile exists dockerfile_path = driver_path / "Dockerfile" if not dockerfile_path.exists(): console.print(f"[red]Dockerfile not found in {driver_path}[/red]") return # Build image name image_name = f"monadical/mc-{driver_name}:{tag}" # Build the image with console.status(f"Building image {image_name}..."): result = os.system(f"cd {driver_path} && docker build -t {image_name} .") if result != 0: console.print("[red]Failed to build driver image[/red]") return console.print(f"[green]Successfully built image: {image_name}[/green]") # Push if requested if push: with console.status(f"Pushing image {image_name}..."): result = os.system(f"docker push {image_name}") if result != 0: console.print("[red]Failed to push driver image[/red]") return console.print(f"[green]Successfully pushed image: {image_name}[/green]") @driver_app.command("info") def driver_info( driver_name: str = typer.Argument(..., help="Driver name to get info for"), ) -> None: """Show detailed information about a driver""" driver = config_manager.get_driver(driver_name) if not driver: console.print(f"[red]Driver '{driver_name}' not found[/red]") return console.print(f"[bold]Driver: {driver.name}[/bold]") console.print(f"Description: {driver.description}") console.print(f"Version: {driver.version}") console.print(f"Maintainer: {driver.maintainer}") console.print(f"Image: {driver.image}") if driver.ports: console.print("\n[bold]Ports:[/bold]") for port in driver.ports: console.print(f" {port}") # Get driver path driver_path = config_manager.get_driver_path(driver_name) if driver_path: console.print(f"\n[bold]Path:[/bold] {driver_path}") # Check for README readme_path = driver_path / "README.md" if readme_path.exists(): console.print("\n[bold]README:[/bold]") with open(readme_path, "r") as f: console.print(f.read()) # Create a network subcommand for config network_app = typer.Typer(help="Manage default networks") config_app.add_typer(network_app, name="network", no_args_is_help=True) # Create a volume subcommand for config volume_app = typer.Typer(help="Manage default volumes") config_app.add_typer(volume_app, name="volume", no_args_is_help=True) # Configuration commands @config_app.command("list") def list_config() -> None: """List all configuration values""" # Create table table = Table(show_header=True, header_style="bold") table.add_column("Configuration", style="cyan") table.add_column("Value") # Add rows from flattened config for key, value in user_config.list_config(): table.add_row(key, str(value)) console.print(table) @config_app.command("get") def get_config( key: str = typer.Argument( ..., help="Configuration key to get (e.g., langfuse.url)" ), ) -> None: """Get a configuration value""" value = user_config.get(key) if value is None: console.print(f"[yellow]Configuration key '{key}' not found[/yellow]") return # Mask sensitive values if ( any(substr in key.lower() for substr in ["key", "token", "secret", "password"]) and value ): display_value = "*****" else: display_value = value console.print(f"{key} = {display_value}") @config_app.command("set") def set_config( key: str = typer.Argument( ..., help="Configuration key to set (e.g., langfuse.url)" ), value: str = typer.Argument(..., help="Value to set"), ) -> None: """Set a configuration value""" try: # Convert string value to appropriate type if value.lower() == "true": typed_value = True elif value.lower() == "false": typed_value = False elif value.isdigit(): typed_value = int(value) else: typed_value = value user_config.set(key, typed_value) # Mask sensitive values in output if ( any( substr in key.lower() for substr in ["key", "token", "secret", "password"] ) and value ): display_value = "*****" else: display_value = typed_value console.print(f"[green]Configuration updated: {key} = {display_value}[/green]") except Exception as e: console.print(f"[red]Error setting configuration: {e}[/red]") @config_app.command("reset") def reset_config( confirm: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"), ) -> None: """Reset configuration to defaults""" if not confirm: should_reset = typer.confirm( "Are you sure you want to reset all configuration to defaults?" ) if not should_reset: console.print("Reset canceled") return user_config.reset() console.print("[green]Configuration reset to defaults[/green]") # Network configuration commands @network_app.command("list") def list_networks() -> None: """List all default networks""" networks = user_config.get("defaults.networks", []) if not networks: console.print("No default networks configured") return table = Table(show_header=True, header_style="bold") table.add_column("Network") for network in networks: table.add_row(network) console.print(table) @network_app.command("add") def add_network( network: str = typer.Argument(..., help="Network name to add to defaults"), ) -> None: """Add a network to default networks""" networks = user_config.get("defaults.networks", []) if network in networks: console.print(f"Network '{network}' is already in defaults") return networks.append(network) user_config.set("defaults.networks", networks) console.print(f"[green]Added network '{network}' to defaults[/green]") @network_app.command("remove") def remove_network( network: str = typer.Argument(..., help="Network name to remove from defaults"), ) -> None: """Remove a network from default networks""" networks = user_config.get("defaults.networks", []) if network not in networks: console.print(f"Network '{network}' is not in defaults") return networks.remove(network) user_config.set("defaults.networks", networks) console.print(f"[green]Removed network '{network}' from defaults[/green]") # Volume configuration commands @volume_app.command("list") def list_volumes() -> None: """List all default volumes""" volumes = user_config.get("defaults.volumes", []) if not volumes: console.print("No default volumes configured") return table = Table(show_header=True, header_style="bold") table.add_column("Local Path") table.add_column("Container Path") for volume in volumes: if ":" in volume: local_path, container_path = volume.split(":", 1) table.add_row(local_path, container_path) else: table.add_row(volume, "[yellow]Invalid format[/yellow]") console.print(table) @volume_app.command("add") def add_volume( volume: str = typer.Argument( ..., help="Volume to add (format: LOCAL_PATH:CONTAINER_PATH)" ), ) -> None: """Add a volume to default volumes""" volumes = user_config.get("defaults.volumes", []) # Validate format if ":" not in volume: console.print( "[red]Invalid volume format. Use LOCAL_PATH:CONTAINER_PATH.[/red]" ) return local_path, container_path = volume.split(":", 1) # Convert to absolute path if relative if not os.path.isabs(local_path): local_path = os.path.abspath(local_path) volume = f"{local_path}:{container_path}" # Validate local path exists if not os.path.exists(local_path): console.print( f"[yellow]Warning: Local path '{local_path}' does not exist.[/yellow]" ) if not typer.confirm("Add anyway?"): return # Check if volume is already in defaults if volume in volumes: console.print(f"Volume '{volume}' is already in defaults") return volumes.append(volume) user_config.set("defaults.volumes", volumes) console.print(f"[green]Added volume '{volume}' to defaults[/green]") @volume_app.command("remove") def remove_volume( volume: str = typer.Argument( ..., help="Volume to remove (format: LOCAL_PATH:CONTAINER_PATH)" ), ) -> None: """Remove a volume from default volumes""" volumes = user_config.get("defaults.volumes", []) # Handle case where user provides just a prefix to match matching_volumes = [v for v in volumes if v.startswith(volume)] if not matching_volumes: console.print(f"No volumes matching '{volume}' found in defaults") return if len(matching_volumes) > 1: console.print(f"Multiple volumes match '{volume}':") for i, v in enumerate(matching_volumes): console.print(f" {i + 1}. {v}") index = typer.prompt( "Enter the number of the volume to remove (0 to cancel)", type=int ) if index == 0 or index > len(matching_volumes): console.print("Volume removal canceled") return volume_to_remove = matching_volumes[index - 1] else: volume_to_remove = matching_volumes[0] volumes.remove(volume_to_remove) user_config.set("defaults.volumes", volumes) console.print(f"[green]Removed volume '{volume_to_remove}' from defaults[/green]") # MCP Management Commands @mcp_app.command("list") def list_mcps() -> None: """List all configured MCP servers""" mcps = mcp_manager.list_mcps() if not mcps: console.print("No MCP servers configured") return # Create a table with the MCP information table = Table(show_header=True, header_style="bold") table.add_column("Name") table.add_column("Type") table.add_column("Status") table.add_column("Ports") table.add_column("Details") # Check status of each MCP for mcp in mcps: name = mcp.get("name", "") mcp_type = mcp.get("type", "") try: status_info = mcp_manager.get_mcp_status(name) status = status_info.get("status", "unknown") # Set status color based on status status_color = { "running": "green", "stopped": "red", "not_found": "yellow", "not_applicable": "blue", "failed": "red", }.get(status, "white") # Get port information ports_info = "" if mcp_type == "proxy" and status == "running": # For running proxy MCP, show the bound ports container_ports = status_info.get("ports", {}) if container_ports: port_mappings = [] for container_port, host_port in container_ports.items(): if host_port: port_mappings.append(f"{host_port}←{container_port}") if port_mappings: ports_info = ", ".join(port_mappings) # For non-running proxy MCP, show the configured host port if not ports_info and mcp_type == "proxy" and mcp.get("host_port"): sse_port = mcp.get("proxy_options", {}).get("sse_port", 8080) ports_info = f"{mcp.get('host_port')}←{sse_port}/tcp (configured)" # Different details based on MCP type if mcp_type == "remote": details = mcp.get("url", "") elif mcp_type == "docker": details = mcp.get("image", "") elif mcp_type == "proxy": details = ( f"{mcp.get('base_image', '')} (via {mcp.get('proxy_image', '')})" ) else: details = "" table.add_row( name, mcp_type, f"[{status_color}]{status}[/{status_color}]", ports_info, details, ) except Exception as e: table.add_row( name, mcp_type, "[red]error[/red]", "", # Empty ports column for error str(e), ) console.print(table) @mcp_app.command("status") def mcp_status(name: str = typer.Argument(..., help="MCP server name")) -> None: """Show detailed status of an MCP server""" try: # Get the MCP configuration mcp_config = mcp_manager.get_mcp(name) if not mcp_config: console.print(f"[red]MCP server '{name}' not found[/red]") return # Get status information status_info = mcp_manager.get_mcp_status(name) # Print detailed information console.print(f"[bold]MCP Server:[/bold] {name}") console.print(f"[bold]Type:[/bold] {mcp_config.get('type')}") status = status_info.get("status") status_color = { "running": "green", "stopped": "red", "not_found": "yellow", "not_applicable": "blue", "failed": "red", }.get(status, "white") console.print(f"[bold]Status:[/bold] [{status_color}]{status}[/{status_color}]") # Type-specific information if mcp_config.get("type") == "remote": console.print(f"[bold]URL:[/bold] {mcp_config.get('url')}") if mcp_config.get("headers"): console.print("[bold]Headers:[/bold]") for key, value in mcp_config.get("headers", {}).items(): # Mask sensitive headers if ( "token" in key.lower() or "key" in key.lower() or "auth" in key.lower() ): console.print(f" {key}: ****") else: console.print(f" {key}: {value}") elif mcp_config.get("type") in ["docker", "proxy"]: console.print(f"[bold]Image:[/bold] {status_info.get('image')}") if status_info.get("container_id"): console.print( f"[bold]Container ID:[/bold] {status_info.get('container_id')}" ) if status_info.get("ports"): console.print("[bold]Container Ports:[/bold]") for port, host_port in status_info.get("ports", {}).items(): if host_port: console.print( f" {port} -> [green]bound to host port {host_port}[/green]" ) else: console.print(f" {port} (internal only)") if status_info.get("created"): console.print(f"[bold]Created:[/bold] {status_info.get('created')}") # For proxy type, show additional information if mcp_config.get("type") == "proxy": console.print( f"[bold]Base Image:[/bold] {mcp_config.get('base_image')}" ) console.print( f"[bold]Proxy Image:[/bold] {mcp_config.get('proxy_image')}" ) # Show configured host port binding if mcp_config.get("host_port"): sse_port = mcp_config.get("proxy_options", {}).get("sse_port", 8080) console.print( f"[bold]Port Binding:[/bold] Container port {sse_port}/tcp -> Host port {mcp_config.get('host_port')}" ) console.print("[bold]Proxy Options:[/bold]") for key, value in mcp_config.get("proxy_options", {}).items(): console.print(f" {key}: {value}") except Exception as e: console.print(f"[red]Error getting MCP status: {e}[/red]") @mcp_app.command("start") def start_mcp( name: Optional[str] = typer.Argument(None, help="MCP server name"), all_servers: bool = typer.Option(False, "--all", help="Start all MCP servers"), ) -> None: """Start an MCP server or all servers""" # Check if we need to start all servers if all_servers: # Get all configured MCP servers mcps = mcp_manager.list_mcps() if not mcps: console.print("[yellow]No MCP servers configured[/yellow]") return # Count of successfully started servers started_count = 0 remote_count = 0 failed_count = 0 console.print(f"Starting {len(mcps)} MCP servers...") for mcp in mcps: mcp_name = mcp.get("name") if not mcp_name: continue try: with console.status(f"Starting MCP server '{mcp_name}'..."): result = mcp_manager.start_mcp(mcp_name) if result.get("status") == "running": console.print(f"[green]Started MCP server '{mcp_name}'[/green]") started_count += 1 elif result.get("status") == "not_applicable": console.print( f"[blue]MCP server '{mcp_name}' is a remote type (no container to start)[/blue]" ) remote_count += 1 else: console.print( f"MCP server '{mcp_name}' status: {result.get('status')}" ) failed_count += 1 except Exception as e: console.print(f"[red]Error starting MCP server '{mcp_name}': {e}[/red]") failed_count += 1 # Show a summary if started_count > 0: console.print( f"[green]Successfully started {started_count} MCP servers[/green]" ) if remote_count > 0: console.print( f"[blue]{remote_count} remote MCP servers (no action needed)[/blue]" ) if failed_count > 0: console.print(f"[red]Failed to start {failed_count} MCP servers[/red]") # Otherwise start a specific server elif name: try: with console.status(f"Starting MCP server '{name}'..."): result = mcp_manager.start_mcp(name) if result.get("status") == "running": console.print(f"[green]Started MCP server '{name}'[/green]") elif result.get("status") == "not_applicable": console.print( f"[blue]MCP server '{name}' is a remote type (no container to start)[/blue]" ) else: console.print(f"MCP server '{name}' status: {result.get('status')}") except Exception as e: console.print(f"[red]Error starting MCP server: {e}[/red]") else: console.print( "[red]Error: Please provide a server name or use --all to start all servers[/red]" ) @mcp_app.command("stop") def stop_mcp( name: Optional[str] = typer.Argument(None, help="MCP server name"), all_servers: bool = typer.Option(False, "--all", help="Stop all MCP servers"), ) -> None: """Stop an MCP server or all servers""" # Check if we need to stop all servers if all_servers: # Get all configured MCP servers mcps = mcp_manager.list_mcps() if not mcps: console.print("[yellow]No MCP servers configured[/yellow]") return # Count of successfully stopped servers stopped_count = 0 not_running_count = 0 failed_count = 0 console.print(f"Stopping {len(mcps)} MCP servers...") for mcp in mcps: mcp_name = mcp.get("name") if not mcp_name: continue try: with console.status(f"Stopping MCP server '{mcp_name}'..."): result = mcp_manager.stop_mcp(mcp_name) if result: console.print(f"[green]Stopped MCP server '{mcp_name}'[/green]") stopped_count += 1 else: console.print( f"[yellow]MCP server '{mcp_name}' was not running[/yellow]" ) not_running_count += 1 except Exception as e: console.print(f"[red]Error stopping MCP server '{mcp_name}': {e}[/red]") failed_count += 1 # Show a summary if stopped_count > 0: console.print( f"[green]Successfully stopped {stopped_count} MCP servers[/green]" ) if not_running_count > 0: console.print( f"[yellow]{not_running_count} MCP servers were not running[/yellow]" ) if failed_count > 0: console.print(f"[red]Failed to stop {failed_count} MCP servers[/red]") # Otherwise stop a specific server elif name: try: with console.status(f"Stopping MCP server '{name}'..."): result = mcp_manager.stop_mcp(name) if result: console.print(f"[green]Stopped MCP server '{name}'[/green]") else: console.print(f"[yellow]MCP server '{name}' was not running[/yellow]") except Exception as e: console.print(f"[red]Error stopping MCP server: {e}[/red]") else: console.print( "[red]Error: Please provide a server name or use --all to stop all servers[/red]" ) @mcp_app.command("restart") def restart_mcp( name: Optional[str] = typer.Argument(None, help="MCP server name"), all_servers: bool = typer.Option(False, "--all", help="Restart all MCP servers"), ) -> None: """Restart an MCP server or all servers""" # Check if we need to restart all servers if all_servers: # Get all configured MCP servers mcps = mcp_manager.list_mcps() if not mcps: console.print("[yellow]No MCP servers configured[/yellow]") return # Count of successfully restarted servers restarted_count = 0 remote_count = 0 failed_count = 0 console.print(f"Restarting {len(mcps)} MCP servers...") for mcp in mcps: mcp_name = mcp.get("name") if not mcp_name: continue try: with console.status(f"Restarting MCP server '{mcp_name}'..."): result = mcp_manager.restart_mcp(mcp_name) if result.get("status") == "running": console.print(f"[green]Restarted MCP server '{mcp_name}'[/green]") restarted_count += 1 elif result.get("status") == "not_applicable": console.print( f"[blue]MCP server '{mcp_name}' is a remote type (no container to restart)[/blue]" ) remote_count += 1 else: console.print( f"MCP server '{mcp_name}' status: {result.get('status')}" ) failed_count += 1 except Exception as e: console.print( f"[red]Error restarting MCP server '{mcp_name}': {e}[/red]" ) failed_count += 1 # Show a summary if restarted_count > 0: console.print( f"[green]Successfully restarted {restarted_count} MCP servers[/green]" ) if remote_count > 0: console.print( f"[blue]{remote_count} remote MCP servers (no action needed)[/blue]" ) if failed_count > 0: console.print(f"[red]Failed to restart {failed_count} MCP servers[/red]") # Otherwise restart a specific server elif name: try: with console.status(f"Restarting MCP server '{name}'..."): result = mcp_manager.restart_mcp(name) if result.get("status") == "running": console.print(f"[green]Restarted MCP server '{name}'[/green]") elif result.get("status") == "not_applicable": console.print( f"[blue]MCP server '{name}' is a remote type (no container to restart)[/blue]" ) else: console.print(f"MCP server '{name}' status: {result.get('status')}") except Exception as e: console.print(f"[red]Error restarting MCP server: {e}[/red]") else: console.print( "[red]Error: Please provide a server name or use --all to restart all servers[/red]" ) @mcp_app.command("logs") def mcp_logs( name: str = typer.Argument(..., help="MCP server name"), tail: int = typer.Option(100, "--tail", "-n", help="Number of lines to show"), ) -> None: """Show logs from an MCP server""" try: logs = mcp_manager.get_mcp_logs(name, tail=tail) console.print(logs) except Exception as e: console.print(f"[red]Error getting MCP logs: {e}[/red]") @mcp_app.command("remove") def remove_mcp(name: str = typer.Argument(..., help="MCP server name")) -> None: """Remove an MCP server configuration""" try: with console.status(f"Removing MCP server '{name}'..."): result = mcp_manager.remove_mcp(name) if result: console.print(f"[green]Removed MCP server '{name}'[/green]") else: console.print(f"[yellow]MCP server '{name}' not found[/yellow]") except Exception as e: console.print(f"[red]Error removing MCP server: {e}[/red]") @mcp_app.command("add") def add_mcp( name: str = typer.Argument(..., help="MCP server name"), base_image: str = typer.Argument(..., help="Base MCP Docker image"), proxy_image: str = typer.Option( "ghcr.io/sparfenyuk/mcp-proxy:latest", "--proxy-image", help="Proxy image for MCP", ), command: str = typer.Option( "", "--command", "-c", help="Command to run in the container" ), sse_port: int = typer.Option( 8080, "--sse-port", help="Port for SSE server inside container" ), sse_host: str = typer.Option("0.0.0.0", "--sse-host", help="Host for SSE server"), allow_origin: str = typer.Option( "*", "--allow-origin", help="CORS allow-origin header" ), host_port: Optional[int] = typer.Option( None, "--host-port", "-p", help="Host port to bind the MCP server to (auto-assigned if not specified)", ), env: List[str] = typer.Option( [], "--env", "-e", help="Environment variables (format: KEY=VALUE)" ), ) -> None: """Add a proxy-based MCP server (default type)""" # Parse environment variables environment = {} for var in env: if "=" in var: key, value = var.split("=", 1) environment[key] = value else: console.print( f"[yellow]Warning: Ignoring invalid environment variable format: {var}[/yellow]" ) # Prepare proxy options proxy_options = { "sse_port": sse_port, "sse_host": sse_host, "allow_origin": allow_origin, } try: with console.status(f"Adding MCP server '{name}'..."): result = mcp_manager.add_proxy_mcp( name, base_image, proxy_image, command, proxy_options, environment, host_port, ) # Get the assigned port assigned_port = result.get("host_port") console.print(f"[green]Added MCP server '{name}'[/green]") if assigned_port: console.print( f"Container port {sse_port} will be bound to host port {assigned_port}" ) except Exception as e: console.print(f"[red]Error adding MCP server: {e}[/red]") @mcp_app.command("add-remote") def add_remote_mcp( name: str = typer.Argument(..., help="MCP server name"), url: str = typer.Argument(..., help="URL of the remote MCP server"), header: List[str] = typer.Option( [], "--header", "-H", help="HTTP headers (format: KEY=VALUE)" ), ) -> None: """Add a remote MCP server""" # Parse headers headers = {} for h in header: if "=" in h: key, value = h.split("=", 1) headers[key] = value else: console.print( f"[yellow]Warning: Ignoring invalid header format: {h}[/yellow]" ) try: with console.status(f"Adding remote MCP server '{name}'..."): mcp_manager.add_remote_mcp(name, url, headers) console.print(f"[green]Added remote MCP server '{name}'[/green]") except Exception as e: console.print(f"[red]Error adding remote MCP server: {e}[/red]") @mcp_app.command("inspector") def run_mcp_inspector( host_port: int = typer.Option( 5173, "--port", "-p", help="Host port for the MCP Inspector" ), detach: bool = typer.Option(False, "--detach", "-d", help="Run in detached mode"), stop: bool = typer.Option(False, "--stop", help="Stop running MCP Inspector(s)"), ) -> None: """Run the MCP Inspector to visualize and debug MCP servers""" import docker import time # Get Docker client quietly try: client = docker.from_env() except Exception as e: console.print(f"[red]Error connecting to Docker: {e}[/red]") return # If stop flag is set, stop all running MCP Inspectors if stop: containers = client.containers.list( all=True, filters={"label": "mc.mcp.inspector=true"} ) if not containers: console.print("[yellow]No running MCP Inspector instances found[/yellow]") return with console.status("Stopping MCP Inspector..."): for container in containers: try: container.stop() container.remove(force=True) except Exception: pass console.print("[green]MCP Inspector stopped[/green]") return # Check if inspector is already running all_inspectors = client.containers.list( all=True, filters={"label": "mc.mcp.inspector=true"} ) # Stop any existing inspectors first for inspector in all_inspectors: try: if inspector.status == "running": inspector.stop(timeout=1) inspector.remove(force=True) except Exception as e: console.print( f"[yellow]Warning: Could not remove existing inspector: {e}[/yellow]" ) # Check if the specified port is already in use import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.bind(("0.0.0.0", host_port)) s.close() except socket.error: console.print( f"[red]Error: Port {host_port} is already in use by another process.[/red]" ) console.print("Please stop any web servers or other processes using this port.") console.print("You can try a different port with --port option") return # Container name with timestamp to avoid conflicts container_name = f"mc_mcp_inspector_{int(time.time())}" with console.status("Starting MCP Inspector..."): # Get MCP servers from configuration all_mcps = mcp_manager.list_mcps() # Get all MCP server URLs (including remote ones) mcp_servers = [] # Collect networks that need to be connected to the Inspector mcp_networks_to_connect = [] # Add remote MCP servers for mcp in all_mcps: if mcp.get("type") == "remote": url = mcp.get("url", "") headers = mcp.get("headers", {}) if url: mcp_servers.append( { "name": mcp.get("name", "Remote MCP"), "url": url, "headers": headers, } ) # Process container-based MCP servers from the configuration for mcp in all_mcps: # We only need to connect to container-based MCPs if mcp.get("type") in ["docker", "proxy"]: mcp_name = mcp.get("name") try: # Get the container name for this MCP container_name = f"mc_mcp_{mcp_name}" container = None # Try to find the container try: container = client.containers.get(container_name) except docker.errors.NotFound: console.print( f"[yellow]Warning: Container for MCP '{mcp_name}' not found[/yellow]" ) continue if container and container.status == "running": # Find all networks this MCP container is connected to for network_name, network_info in ( container.attrs.get("NetworkSettings", {}) .get("Networks", {}) .items() ): # Don't add default bridge network - it doesn't support DNS resolution # Also avoid duplicate networks if ( network_name != "bridge" and network_name not in mcp_networks_to_connect ): mcp_networks_to_connect.append(network_name) # For proxy type, get the SSE port from the config port = "8080" # Default MCP proxy SSE port if mcp.get("type") == "proxy" and "proxy_options" in mcp: port = str( mcp.get("proxy_options", {}).get("sse_port", "8080") ) # Add container-based MCP server URL using just the MCP name as the hostname # This works because we join all networks and the MCP containers have aliases mcp_servers.append( { "name": mcp_name, "url": f"http://{mcp_name}:{port}", "headers": {}, } ) except Exception as e: console.print( f"[yellow]Warning: Error processing MCP '{mcp_name}': {str(e)}[/yellow]" ) # Make sure we have at least one network to connect to if not mcp_networks_to_connect: # Create an MCP-specific network if none exists network_name = "mc-mcp-network" console.print("No MCP networks found, creating a default one") try: networks = client.networks.list(names=[network_name]) if not networks: client.networks.create(network_name, driver="bridge") mcp_networks_to_connect.append(network_name) except Exception as e: console.print( f"[yellow]Warning: Could not create default network: {str(e)}[/yellow]" ) # Pull the image if needed (silently) try: client.images.get("mcp/inspector") except docker.errors.ImageNotFound: client.images.pull("mcp/inspector") try: # Create a custom entrypoint to handle the localhost binding issue and auto-connect to MCP servers script_content = """#!/bin/sh # This script modifies the Express server to bind to all interfaces # Try to find the CLI script CLI_FILE=$(find /app -name "cli.js" | grep -v node_modules | head -1) if [ -z "$CLI_FILE" ]; then echo "Could not find CLI file. Trying common locations..." for path in "/app/client/bin/cli.js" "/app/bin/cli.js" "./client/bin/cli.js" "./bin/cli.js"; do if [ -f "$path" ]; then CLI_FILE="$path" break fi done fi if [ -z "$CLI_FILE" ]; then echo "ERROR: Could not find the MCP Inspector CLI file." exit 1 fi echo "Found CLI file at: $CLI_FILE" # Make a backup of the original file cp "$CLI_FILE" "$CLI_FILE.bak" # Modify the file to use 0.0.0.0 as the host sed -i 's/app.listen(PORT/app.listen(PORT, "0.0.0.0"/g' "$CLI_FILE" sed -i 's/server.listen(port/server.listen(port, "0.0.0.0"/g' "$CLI_FILE" sed -i 's/listen(PORT/listen(PORT, "0.0.0.0"/g' "$CLI_FILE" echo "Modified server to listen on all interfaces (0.0.0.0)" # Start the MCP Inspector echo "Starting MCP Inspector on all interfaces..." exec npm start """ # Write the script to a temp file script_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "mc_inspector_entrypoint.sh" ) with open(script_path, "w") as f: f.write(script_content) os.chmod(script_path, 0o755) # Use the script as the entrypoint # The entrypoint is directly specified in the container.run() call below # Run the MCP Inspector container - use the first network initially initial_network = ( mcp_networks_to_connect[0] if mcp_networks_to_connect else "bridge" ) console.print(f"Starting Inspector on network: {initial_network}") # Check if existing container with the same name exists, and remove it try: existing = client.containers.get("mc_mcp_inspector") if existing.status == "running": existing.stop(timeout=1) existing.remove(force=True) console.print("Removed existing MCP Inspector container") except docker.errors.NotFound: pass except Exception as e: console.print( f"[yellow]Warning: Error removing existing container: {e}[/yellow]" ) # Create network config with just the inspector alias for the initial network network_config = { initial_network: { "aliases": [ "inspector" ] # Allow container to be reached as just "inspector" } } # Log MCP servers that are in the initial network initial_mcp_containers = [] for mcp in all_mcps: if mcp.get("type") in ["docker", "proxy"]: mcp_name = mcp.get("name") container_name = f"mc_mcp_{mcp_name}" try: # Check if this container exists mcp_container = client.containers.get(container_name) # Check if it's in the initial network if initial_network in mcp_container.attrs.get( "NetworkSettings", {} ).get("Networks", {}): initial_mcp_containers.append(mcp_name) except Exception: pass if initial_mcp_containers: console.print( f"MCP servers in initial network: {', '.join(initial_mcp_containers)}" ) container = client.containers.run( image="mcp/inspector", name="mc_mcp_inspector", # Use a fixed name detach=True, network=initial_network, ports={ "5173/tcp": host_port, # Map container port 5173 to host port (frontend) "3000/tcp": 3000, # Map container port 3000 to host port 3000 (backend) }, environment={ "SERVER_PORT": "3000", # Tell the server to use port 3000 (default) }, volumes={ script_path: { "bind": "/entrypoint.sh", "mode": "ro", } }, entrypoint="/entrypoint.sh", labels={ "mc.mcp.inspector": "true", "mc.managed": "true", }, network_mode=None, # Don't use network_mode as we're using network with aliases networking_config=client.api.create_networking_config(network_config), ) # Connect to all additional MCP networks if len(mcp_networks_to_connect) > 1: # Get the networks the container is already connected to container_networks = list( container.attrs["NetworkSettings"]["Networks"].keys() ) for network_name in mcp_networks_to_connect[ 1: ]: # Skip the first one that we already connected to # Skip if already connected to this network if network_name in container_networks: console.print( f"Inspector already connected to network: {network_name}" ) continue try: console.print( f"Connecting Inspector to additional network: {network_name}" ) network = client.networks.get(network_name) # Get all MCP containers in this network mcp_containers = [] # Find all MCP containers that are in this network for mcp in all_mcps: if mcp.get("type") in ["docker", "proxy"]: mcp_name = mcp.get("name") container_name = f"mc_mcp_{mcp_name}" try: # Check if this container exists mcp_container = client.containers.get( container_name ) # Check if it's in the current network if network_name in mcp_container.attrs.get( "NetworkSettings", {} ).get("Networks", {}): mcp_containers.append(mcp_name) except Exception: pass # Connect the inspector with the inspector alias and the individual MCP server aliases network.connect(container, aliases=["inspector"]) console.print(f" Added inspector to network {network_name}") if mcp_containers: console.print( f" MCP servers in this network: {', '.join(mcp_containers)}" ) except Exception as e: console.print( f"[yellow]Warning: Could not connect Inspector to network {network_name}: {str(e)}[/yellow]" ) # Wait a moment for the container to start properly time.sleep(1) except Exception as e: console.print(f"[red]Error running MCP Inspector: {e}[/red]") # Try to clean up try: client.containers.get(container_name).remove(force=True) except Exception: pass return console.print("[bold]MCP Inspector is available at:[/bold]") console.print(f"- Frontend: http://localhost:{host_port}") console.print("- Backend API: http://localhost:3000") if len(mcp_servers) > 0: console.print( f"[green]Auto-connected to {len(mcp_servers)} MCP servers[/green]" ) else: console.print( "[yellow]Warning: No MCP servers found or started. The Inspector will run but won't have any servers to connect to.[/yellow]" ) console.print( "Start MCP servers using 'mc mcp start --all' and then restart the Inspector." ) if not detach: try: console.print("[yellow]Press Ctrl+C to stop the MCP Inspector...[/yellow]") for line in container.logs(stream=True): console.print(line.decode().strip()) except KeyboardInterrupt: with console.status("Stopping MCP Inspector..."): container.stop() container.remove(force=True) console.print("[green]MCP Inspector stopped[/green]") if __name__ == "__main__": app()