import os
import sys
import uuid
import docker
import hashlib
import pathlib
import concurrent.futures
import logging
from typing import Dict, List, Optional, Tuple

from docker.errors import DockerException, ImageNotFound

from .models import Session, SessionStatus
from .config import ConfigManager
from .session import SessionManager
from .mcp import MCPManager
from .user_config import UserConfigManager

# Configure logging
logger = logging.getLogger(__name__)


class ContainerManager:
    def __init__(
        self,
        config_manager: Optional[ConfigManager] = None,
        session_manager: Optional[SessionManager] = None,
        user_config_manager: Optional[UserConfigManager] = None,
    ):
        self.config_manager = config_manager or ConfigManager()
        self.session_manager = session_manager or SessionManager()
        self.user_config_manager = user_config_manager or UserConfigManager()
        self.mcp_manager = MCPManager(config_manager=self.user_config_manager)

        try:
            self.client = docker.from_env()
            # Test connection
            self.client.ping()
        except DockerException as e:
            logger.error(f"Error connecting to Docker: {e}")
            print(f"Error connecting to Docker: {e}")
            sys.exit(1)

    def _ensure_network(self) -> None:
        """Ensure the MC network exists"""
        network_name = self.config_manager.config.docker.get("network", "mc-network")
        networks = self.client.networks.list(names=[network_name])
        if not networks:
            self.client.networks.create(network_name, driver="bridge")

    def _generate_session_id(self) -> str:
        """Generate a unique session ID"""
        return str(uuid.uuid4())[:8]

    def _get_project_config_path(self, project: Optional[str] = None) -> pathlib.Path:
        """Get the path to the project configuration directory

        Args:
            project: Optional project repository URL. If None, uses current directory.

        Returns:
            Path to the project configuration directory
        """
        # Get home directory for the MC config
        mc_home = pathlib.Path.home() / ".mc"

        # If no project URL is provided, use the current directory path
        if not project:
            # Use current working directory as project identifier
            project_id = os.getcwd()
        else:
            # Use project URL as identifier
            project_id = project

        # Create a hash of the project ID to use as directory name
        project_hash = hashlib.md5(project_id.encode()).hexdigest()

        # Create the project config directory path
        config_path = mc_home / "projects" / project_hash / "config"

        # Create the directory if it doesn't exist
        config_path.parent.mkdir(parents=True, exist_ok=True)
        config_path.mkdir(exist_ok=True)

        return config_path

    def list_sessions(self) -> List[Session]:
        """List all active MC sessions"""
        sessions = []
        try:
            containers = self.client.containers.list(
                all=True, filters={"label": "mc.session"}
            )

            for container in containers:
                container_id = container.id
                labels = container.labels
                session_id = labels.get("mc.session.id")

                if not session_id:
                    continue

                status = SessionStatus.RUNNING
                if container.status == "exited":
                    status = SessionStatus.STOPPED
                elif container.status == "created":
                    status = SessionStatus.CREATING

                session = Session(
                    id=session_id,
                    name=labels.get("mc.session.name", f"mc-{session_id}"),
                    driver=labels.get("mc.driver", "unknown"),
                    status=status,
                    container_id=container_id,
                    created_at=container.attrs["Created"],
                    project=labels.get("mc.project"),
                )

                # Get port mappings
                if container.attrs.get("NetworkSettings", {}).get("Ports"):
                    ports = {}
                    for container_port, host_ports in container.attrs[
                        "NetworkSettings"
                    ]["Ports"].items():
                        if host_ports:
                            # Strip /tcp or /udp suffix and convert to int
                            container_port_num = int(container_port.split("/")[0])
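                            # Docker reports host bindings as a list of dicts, e.g.
                            # [{"HostIp": "0.0.0.0", "HostPort": "32768"}]; only the
                            # first binding is recorded for the session.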
                            host_port = int(host_ports[0]["HostPort"])
                            ports[container_port_num] = host_port
                    session.ports = ports

                sessions.append(session)

        except DockerException as e:
            print(f"Error listing sessions: {e}")

        return sessions

    def create_session(
        self,
        driver_name: str,
        project: Optional[str] = None,
        environment: Optional[Dict[str, str]] = None,
        session_name: Optional[str] = None,
        mount_local: bool = False,
        volumes: Optional[Dict[str, Dict[str, str]]] = None,
        networks: Optional[List[str]] = None,
        mcp: Optional[List[str]] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> Optional[Session]:
        """Create a new MC session

        Args:
            driver_name: The name of the driver to use
            project: Optional project repository URL or local directory path
            environment: Optional environment variables
            session_name: Optional session name
            mount_local: Whether to mount the specified local directory to /app
                (ignored if project is None)
            volumes: Optional additional volumes to mount
                (dict of {host_path: {"bind": container_path, "mode": mode}})
            networks: Optional list of additional Docker networks to connect to
            mcp: Optional list of MCP server names to attach to the session
            uid: Optional user ID for the container process
            gid: Optional group ID for the container process
        """
        try:
            # Validate driver exists
            driver = self.config_manager.get_driver(driver_name)
            if not driver:
                print(f"Driver '{driver_name}' not found")
                return None

            # Generate session ID and name
            session_id = self._generate_session_id()
            if not session_name:
                session_name = f"mc-{session_id}"

            # Ensure network exists
            self._ensure_network()

            # Prepare environment variables
            env_vars = environment or {}

            # Add TARGET_UID and TARGET_GID for entrypoint script
            if uid is not None:
                env_vars["TARGET_UID"] = str(uid)
            if gid is not None:
                env_vars["TARGET_GID"] = str(gid)

            # Add project URL to environment if provided
            if project:
                env_vars["MC_PROJECT_URL"] = project

            # Pass API keys from host environment to container for local development
            api_keys = [
                "OPENAI_API_KEY",
                "ANTHROPIC_API_KEY",
                "OPENROUTER_API_KEY",
                "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
                "LANGFUSE_INIT_PROJECT_SECRET_KEY",
                "LANGFUSE_URL",
            ]
            for key in api_keys:
                if key in os.environ and key not in env_vars:
                    env_vars[key] = os.environ[key]

            # Pull image if needed
            try:
                self.client.images.get(driver.image)
            except ImageNotFound:
                print(f"Pulling image {driver.image}...")
                self.client.images.pull(driver.image)

            # Set up volume mounts
            session_volumes = {}

            # Determine if project is a local directory or a Git repository
            is_local_directory = False
            is_git_repo = False

            if project:
                # Check if project is a local directory
                if os.path.isdir(os.path.expanduser(project)):
                    is_local_directory = True
                else:
                    # If not a local directory, assume it's a Git repo URL
                    is_git_repo = True

            # Handle mounting based on project type
            if is_local_directory and mount_local:
                # Mount the specified local directory to /app in the container
                local_dir = os.path.abspath(os.path.expanduser(project))
                session_volumes[local_dir] = {"bind": "/app", "mode": "rw"}
                print(f"Mounting local directory {local_dir} to /app")
                # Clear project for container environment since we're mounting
                project = None
            elif is_git_repo:
                print(
                    f"Git repository URL provided - container will clone {project} into /app during initialization"
                )
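            # docker-py expects volume mappings shaped as
            # {"/host/path": {"bind": "/container/path", "mode": "rw"}}, which is the
            # structure accumulated in session_volumes below.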
            # Add user-specified volumes
            if volumes:
                for host_path, mount_spec in volumes.items():
                    container_path = mount_spec["bind"]

                    # Check for conflicts with /app mount
                    if container_path == "/app" and is_local_directory and mount_local:
                        print(
                            "[yellow]Warning: Volume mount to /app conflicts with local directory mount. "
                            "User-specified mount takes precedence.[/yellow]"
                        )
                        # Remove the local directory mount if there's a conflict
                        if local_dir in session_volumes:
                            del session_volumes[local_dir]

                    # Add the volume
                    session_volumes[host_path] = mount_spec
                    print(f"Mounting volume: {host_path} -> {container_path}")

            # Set up persistent project configuration
            project_config_path = self._get_project_config_path(project)
            print(f"Using project configuration directory: {project_config_path}")

            # Mount the project configuration directory
            session_volumes[str(project_config_path)] = {
                "bind": "/mc-config",
                "mode": "rw",
            }

            # Add environment variables for config path
            env_vars["MC_CONFIG_DIR"] = "/mc-config"
            env_vars["MC_DRIVER_CONFIG_DIR"] = f"/mc-config/{driver_name}"

            # Create driver-specific config directories and set up direct volume mounts
            if driver.persistent_configs:
                print("Setting up persistent configuration directories:")
                for config in driver.persistent_configs:
                    # Get target directory path on host
                    target_dir = project_config_path / config.target.removeprefix(
                        "/mc-config/"
                    )

                    # Create directory if it's a directory type config
                    if config.type == "directory":
                        dir_existed = target_dir.exists()
                        target_dir.mkdir(parents=True, exist_ok=True)
                        if not dir_existed:
                            print(f" - Created directory: {target_dir}")
                    # For files, make sure parent directory exists
                    elif config.type == "file":
                        target_dir.parent.mkdir(parents=True, exist_ok=True)
                        # File will be created by the container if needed

                    # Mount persistent config directly to container path
                    session_volumes[str(target_dir)] = {
                        "bind": config.source,
                        "mode": "rw",
                    }
                    print(
                        f" - Created direct volume mount: {target_dir} -> {config.source}"
                    )

            # Default MC network
            default_network = self.config_manager.config.docker.get(
                "network", "mc-network"
            )

            # Get network list
            network_list = [default_network]
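            # The loop below advertises each attached MCP to the container through
            # indexed environment variables (MCP_<i>_URL, MCP_<i>_TYPE, MCP_<i>_NAME,
            # plus MCP_<i>_HOST/MCP_<i>_PORT for Docker/proxy servers) and sets
            # MCP_COUNT/MCP_NAMES once all MCPs have been processed.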
            # Process MCPs if provided
            mcp_configs = []
            mcp_names = []
            mcp_container_names = []

            # Ensure MCP is a list
            mcps_to_process = mcp if isinstance(mcp, list) else []

            # Process each MCP
            for mcp_name in mcps_to_process:
                # Get the MCP configuration
                mcp_config = self.mcp_manager.get_mcp(mcp_name)
                if not mcp_config:
                    print(f"Warning: MCP server '{mcp_name}' not found, skipping")
                    continue

                # Add to the list of processed MCPs
                mcp_configs.append(mcp_config)
                mcp_names.append(mcp_name)

                # Check if the MCP server is running (for Docker-based MCPs)
                if mcp_config.get("type") in ["docker", "proxy"]:
                    # Ensure the MCP is running
                    container_name = None  # defined up front so the except block below cannot raise NameError
                    try:
                        print(f"Ensuring MCP server '{mcp_name}' is running...")
                        self.mcp_manager.start_mcp(mcp_name)

                        # Store container name for later network connection
                        container_name = self.mcp_manager.get_mcp_container_name(
                            mcp_name
                        )
                        mcp_container_names.append(container_name)

                        # Get MCP status to extract endpoint information
                        mcp_status = self.mcp_manager.get_mcp_status(mcp_name)

                        # Add MCP environment variables with index
                        idx = len(mcp_names) - 1  # 0-based index for the current MCP

                        if mcp_config.get("type") == "remote":
                            # For remote MCP, set the URL and headers
                            env_vars[f"MCP_{idx}_URL"] = mcp_config.get("url")
                            if mcp_config.get("headers"):
                                # Serialize headers as JSON
                                import json

                                env_vars[f"MCP_{idx}_HEADERS"] = json.dumps(
                                    mcp_config.get("headers")
                                )
                        else:
                            # For Docker/proxy MCP, set the connection details
                            # Use both the container name and the short name for internal Docker DNS resolution
                            container_name = self.mcp_manager.get_mcp_container_name(
                                mcp_name
                            )
                            # Use the short name (mcp_name) as the primary hostname
                            env_vars[f"MCP_{idx}_HOST"] = mcp_name

                            # Default port is 8080 unless specified in status
                            port = next(
                                iter(mcp_status.get("ports", {}).values()), 8080
                            )
                            env_vars[f"MCP_{idx}_PORT"] = str(port)

                            # Use the short name in the URL to take advantage of the network alias
                            env_vars[f"MCP_{idx}_URL"] = f"http://{mcp_name}:{port}/sse"
                            # For backward compatibility, also set the full container name URL
                            env_vars[f"MCP_{idx}_CONTAINER_URL"] = (
                                f"http://{container_name}:{port}/sse"
                            )

                        # Set type-specific information
                        env_vars[f"MCP_{idx}_TYPE"] = mcp_config.get("type")
                        env_vars[f"MCP_{idx}_NAME"] = mcp_name

                    except Exception as e:
                        print(f"Warning: Failed to start MCP server '{mcp_name}': {e}")
                        # Remove from the container names list if failed
                        if container_name in mcp_container_names:
                            mcp_container_names.remove(container_name)

                elif mcp_config.get("type") == "remote":
                    # For remote MCP, just set environment variables
                    idx = len(mcp_names) - 1  # 0-based index for the current MCP
                    env_vars[f"MCP_{idx}_URL"] = mcp_config.get("url")
                    if mcp_config.get("headers"):
                        # Serialize headers as JSON
                        import json

                        env_vars[f"MCP_{idx}_HEADERS"] = json.dumps(
                            mcp_config.get("headers")
                        )

                    # Set type-specific information
                    env_vars[f"MCP_{idx}_TYPE"] = "remote"
                    env_vars[f"MCP_{idx}_NAME"] = mcp_name

            # Set environment variables for MCP count if we have any
            if mcp_names:
                env_vars["MCP_COUNT"] = str(len(mcp_names))
                env_vars["MCP_ENABLED"] = "true"
                # Serialize all MCP names as JSON
                import json

                env_vars["MCP_NAMES"] = json.dumps(mcp_names)

            # Add user-specified networks
            if networks:
                for network in networks:
                    if network not in network_list:
                        network_list.append(network)
                        print(f"Adding network {network} to session")

            # Create container
            container = self.client.containers.create(
                image=driver.image,
                name=session_name,
                hostname=session_name,
                detach=True,
                tty=True,
                stdin_open=True,
                environment=env_vars,
                volumes=session_volumes,
                labels={
                    "mc.session": "true",
                    "mc.session.id": session_id,
                    "mc.session.name": session_name,
                    "mc.driver": driver_name,
                    "mc.project": project or "",
                    "mc.mcps": ",".join(mcp_names) if mcp_names else "",
                },
                network=network_list[0],  # Connect to the first network initially
                ports={f"{port}/tcp": None for port in driver.ports},
            )

            # Start container
            container.start()
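            # docker-py's containers.create() attaches the container to a single
            # network, so the remaining networks are connected after creation via
            # Network.connect().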
            # Connect to additional networks (after the first one in network_list)
            if len(network_list) > 1:
                for network_name in network_list[1:]:
                    try:
                        # Get or create the network
                        try:
                            network = self.client.networks.get(network_name)
                        except DockerException:
                            print(f"Network '{network_name}' not found, creating it...")
                            network = self.client.networks.create(
                                network_name, driver="bridge"
                            )

                        # Connect the container to the network with session name as an alias
                        network.connect(container, aliases=[session_name])
                        print(
                            f"Connected to network: {network_name} with alias: {session_name}"
                        )
                    except DockerException as e:
                        print(f"Error connecting to network {network_name}: {e}")

            # Reload the container to get updated network information
            container.reload()

            # Connect directly to each MCP's dedicated network
            for mcp_name in mcp_names:
                try:
                    # Get the dedicated network for this MCP
                    dedicated_network_name = f"mc-mcp-{mcp_name}-network"
                    try:
                        network = self.client.networks.get(dedicated_network_name)
                        # Connect the session container to the MCP's dedicated network
                        network.connect(container, aliases=[session_name])
                        print(
                            f"Connected session to MCP '{mcp_name}' via dedicated network: {dedicated_network_name}"
                        )
                    except DockerException as e:
                        print(
                            f"Error connecting to MCP dedicated network '{dedicated_network_name}': {e}"
                        )
                except Exception as e:
                    print(f"Error connecting session to MCP '{mcp_name}': {e}")

            # Connect to additional user-specified networks
            if networks:
                for network_name in networks:
                    # Check if already connected to this network
                    # NetworkSettings.Networks contains a dict where keys are network names
                    existing_networks = (
                        container.attrs.get("NetworkSettings", {})
                        .get("Networks", {})
                        .keys()
                    )
                    if network_name not in existing_networks:
                        try:
                            # Get or create the network
                            try:
                                network = self.client.networks.get(network_name)
                            except DockerException:
                                print(
                                    f"Network '{network_name}' not found, creating it..."
                                )
                                network = self.client.networks.create(
                                    network_name, driver="bridge"
                                )

                            # Connect the container to the network with session name as an alias
                            network.connect(container, aliases=[session_name])
                            print(
                                f"Connected to network: {network_name} with alias: {session_name}"
                            )
                        except DockerException as e:
                            print(f"Error connecting to network {network_name}: {e}")

            # Get updated port information
            container.reload()
            ports = {}
            if container.attrs.get("NetworkSettings", {}).get("Ports"):
                for container_port, host_ports in container.attrs["NetworkSettings"][
                    "Ports"
                ].items():
                    if host_ports:
                        container_port_num = int(container_port.split("/")[0])
                        host_port = int(host_ports[0]["HostPort"])
                        ports[container_port_num] = host_port

            # Create session object
            session = Session(
                id=session_id,
                name=session_name,
                driver=driver_name,
                status=SessionStatus.RUNNING,
                container_id=container.id,
                environment=env_vars,
                project=project,
                created_at=container.attrs["Created"],
                ports=ports,
                mcps=mcp_names,
                # Assuming Session model has uid and gid fields
                uid=uid,
                gid=gid,
            )

            # Save session to the session manager as JSON-compatible dict
            # Assuming Session model has uid and gid fields added to its definition
            session_data_to_save = session.model_dump(mode="json")
            session_data_to_save["uid"] = uid
            session_data_to_save["gid"] = gid
            self.session_manager.add_session(session_id, session_data_to_save)

            return session

        except DockerException as e:
            print(f"Error creating session: {e}")
            return None

    def close_session(self, session_id: str) -> bool:
        """Close an MC session"""
        try:
            sessions = self.list_sessions()
            for session in sessions:
                if session.id == session_id:
                    return self._close_single_session(session)

            print(f"Session '{session_id}' not found")
            return False
        except DockerException as e:
            print(f"Error closing session: {e}")
            return False
    def connect_session(self, session_id: str) -> bool:
        """Connect to a running MC session"""
        # Retrieve full session data which should include uid/gid
        session_data = self.session_manager.get_session(session_id)

        if not session_data:
            print(f"Session '{session_id}' not found in session manager.")
            # Fallback: try listing via Docker labels if session data is missing
            sessions = self.list_sessions()
            session_obj = next((s for s in sessions if s.id == session_id), None)
            if not session_obj or not session_obj.container_id:
                print(f"Session '{session_id}' not found via Docker either.")
                return False
            container_id = session_obj.container_id
            # Cannot determine user if session data is missing
            user_spec = None
            print(
                f"[yellow]Warning: Session data missing for {session_id}. Connecting as default container user.[/yellow]"
            )
        else:
            container_id = session_data.get("container_id")
            if not container_id:
                print(f"Container ID not found for session {session_id}.")
                return False

            # Check status from Docker directly
            try:
                container = self.client.containers.get(container_id)
                if container.status != "running":
                    print(
                        f"Session '{session_id}' container is not running (status: {container.status})."
                    )
                    return False
            except docker.errors.NotFound:
                print(f"Container {container_id} for session {session_id} not found.")
                # Clean up potentially stale session data
                self.session_manager.remove_session(session_id)
                return False
            except DockerException as e:
                print(f"Error checking container status for session {session_id}: {e}")
                return False

            # Determine user spec from stored session data
            uid = session_data.get("uid")
            gid = session_data.get("gid")
            user_spec = f"{uid}:{gid}" if uid is not None and gid is not None else None

        try:
            # Execute interactive shell in container
            cmd = ["docker", "exec", "-it"]
            if user_spec:
                cmd.extend(["--user", user_spec])
                print(f"Connecting as user {user_spec}...")
            else:
                print("Connecting as default container user...")
            cmd.extend([container_id, "/bin/bash"])

            # Use execvp to replace the current process with docker exec
            # This provides a more seamless shell experience
            os.execvp("docker", cmd)

            # execvp does not return if successful
            return True  # Should not be reached if execvp succeeds
        except FileNotFoundError:
            print(
                "[red]Error: 'docker' command not found. Is Docker installed and in your PATH?[/red]"
            )
            return False
        except DockerException as e:
            print(f"Error connecting to session: {e}")
            return False

    def _close_single_session(self, session: Session) -> bool:
        """Close a single session (helper for parallel processing)

        Args:
            session: The session to close

        Returns:
            bool: Whether the session was successfully closed
        """
        if not session.container_id:
            return False

        try:
            container = self.client.containers.get(session.container_id)
            container.stop()
            container.remove()
            self.session_manager.remove_session(session.id)
            return True
        except DockerException as e:
            print(f"Error closing session {session.id}: {e}")
            return False

    def close_all_sessions(self, progress_callback=None) -> Tuple[int, bool]:
        """Close all MC sessions with parallel processing and progress reporting

        Args:
            progress_callback: Optional callback function to report progress
                The callback should accept (session_id, status, message)

        Returns:
            tuple: (number of sessions closed, success)
        """
        try:
            sessions = self.list_sessions()
            if not sessions:
                return 0, True

            # No need for session status as we receive it via callback

            # Define a wrapper to track progress
            def close_with_progress(session):
                if not session.container_id:
                    return False

                try:
                    container = self.client.containers.get(session.container_id)

                    # Stop and remove container
                    container.stop()
                    container.remove()

                    # Remove from session storage
                    self.session_manager.remove_session(session.id)

                    # Notify about completion
                    if progress_callback:
                        progress_callback(
                            session.id,
                            "completed",
                            f"{session.name} closed successfully",
                        )
                    return True
                except DockerException as e:
                    error_msg = f"Error: {str(e)}"
                    if progress_callback:
                        progress_callback(session.id, "failed", error_msg)
                    print(f"Error closing session {session.id}: {e}")
                    return False

            # Use ThreadPoolExecutor to close sessions in parallel
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=min(10, len(sessions))
            ) as executor:
                # Submit all session closing tasks
                future_to_session = {
                    executor.submit(close_with_progress, session): session
                    for session in sessions
                }

                # Collect results
                closed_count = 0
                for future in concurrent.futures.as_completed(future_to_session):
                    session = future_to_session[future]
                    try:
                        success = future.result()
                        if success:
                            closed_count += 1
                    except Exception as e:
                        print(f"Error closing session {session.id}: {e}")

            return closed_count, closed_count > 0
        except DockerException as e:
            print(f"Error closing all sessions: {e}")
            return 0, False
    def get_session_logs(self, session_id: str, follow: bool = False) -> Optional[str]:
        """Get logs from an MC session"""
        try:
            sessions = self.list_sessions()
            for session in sessions:
                if session.id == session_id and session.container_id:
                    container = self.client.containers.get(session.container_id)
                    if follow:
                        for line in container.logs(stream=True, follow=True):
                            print(line.decode().strip())
                        return None
                    else:
                        return container.logs().decode()

            print(f"Session '{session_id}' not found")
            return None
        except DockerException as e:
            print(f"Error getting session logs: {e}")
            return None

    def get_init_logs(self, session_id: str, follow: bool = False) -> Optional[str]:
        """Get initialization logs from an MC session

        Args:
            session_id: The session ID
            follow: Whether to follow the logs

        Returns:
            The logs as a string, or None if there was an error
        """
        try:
            sessions = self.list_sessions()
            for session in sessions:
                if session.id == session_id and session.container_id:
                    container = self.client.containers.get(session.container_id)

                    # Check if initialization is complete
                    init_complete = False
                    try:
                        exit_code, output = container.exec_run(
                            "grep -q 'INIT_COMPLETE=true' /init.status"
                        )
                        init_complete = exit_code == 0
                    except DockerException:
                        pass

                    if follow and not init_complete:
                        print(
                            f"Following initialization logs for session {session_id}..."
                        )
                        print("Press Ctrl+C to stop following")
                        container.exec_run(
                            "tail -f /init.log", stream=True, demux=True, tty=True
                        )
                        return None
                    else:
                        exit_code, output = container.exec_run("cat /init.log")
                        if exit_code == 0:
                            return output.decode()
                        else:
                            print("No initialization logs found")
                            return None

            print(f"Session '{session_id}' not found")
            return None
        except DockerException as e:
            print(f"Error getting initialization logs: {e}")
            return None
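

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the library's API).
# The driver name "python" is a hypothetical example and must exist in your MC
# configuration; os.getuid()/os.getgid() assume a POSIX host.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    manager = ContainerManager()

    # Create a session from a (hypothetical) driver, mounting the current
    # directory into /app and running the container process as the host user.
    new_session = manager.create_session(
        driver_name="python",
        project=os.getcwd(),
        mount_local=True,
        uid=os.getuid(),
        gid=os.getgid(),
    )

    if new_session:
        print(f"Created session {new_session.id} ({new_session.name})")

        # Fetch the initialization log once, without following.
        init_logs = manager.get_init_logs(new_session.id)
        if init_logs:
            print(init_logs)

        # close_all_sessions accepts an optional progress callback taking
        # (session_id, status, message), as documented on the method.
        def report(session_id, status, message):
            print(f"[{status}] {session_id}: {message}")

        closed, ok = manager.close_all_sessions(progress_callback=report)
        print(f"Closed {closed} session(s); success={ok}")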