Files
cubbi/mcontainer/container.py

485 lines
18 KiB
Python

import os
import sys
import uuid
import docker
import hashlib
import pathlib
import concurrent.futures
from typing import Dict, List, Optional, Tuple
from docker.errors import DockerException, ImageNotFound
from .models import Session, SessionStatus
from .config import ConfigManager
class ContainerManager:
def __init__(self, config_manager: Optional[ConfigManager] = None):
    """Set up the manager and verify that the Docker daemon is reachable.

    Args:
        config_manager: Configuration source to use. When a falsy value
            (such as None) is passed, a new ``ConfigManager()`` is built.

    If the Docker client cannot be created or the daemon does not answer
    a ping, the error is printed and the process exits with status 1.
    """
    if not config_manager:
        config_manager = ConfigManager()
    self.config_manager = config_manager

    try:
        docker_client = docker.from_env()
        self.client = docker_client
        # Ping right away so an unreachable daemon is reported here.
        docker_client.ping()
    except DockerException as e:
        print(f"Error connecting to Docker: {e}")
        sys.exit(1)
def _ensure_network(self) -> None:
    """Create the configured Docker network when it does not exist yet.

    The network name comes from the ``network`` key of the Docker
    configuration and falls back to ``mc-network``. A missing network is
    created with the ``bridge`` driver.
    """
    docker_settings = self.config_manager.config.docker
    name = docker_settings.get("network", "mc-network")
    existing = self.client.networks.list(names=[name])
    if existing:
        return
    self.client.networks.create(name, driver="bridge")
def _generate_session_id(self) -> str:
    """Return a new 8-character session identifier.

    The identifier is the leading eight hex digits of a random UUID4.
    """
    random_uuid = uuid.uuid4()
    # The first dash of the canonical UUID text is at index 8, so the first
    # eight hex digits are the same as the first eight characters of str().
    return random_uuid.hex[:8]
def _get_project_config_path(self, project: Optional[str] = None) -> pathlib.Path:
    """Return the host configuration directory for a project, creating it.

    Args:
        project: Optional project repository URL. When empty or None, the
            current working directory is used as the project identifier.

    Returns:
        ``~/.mc/projects/<md5 hex digest of the identifier>/config``.
    """
    mc_root = pathlib.Path.home() / ".mc"

    # The identifier is hashed so it can serve as a single directory name.
    identifier = project if project else os.getcwd()
    digest = hashlib.md5(identifier.encode()).hexdigest()

    project_root = mc_root / "projects" / digest
    project_root.mkdir(parents=True, exist_ok=True)

    config_dir = project_root / "config"
    config_dir.mkdir(exist_ok=True)
    return config_dir
def list_sessions(self) -> List[Session]:
    """Return a Session for every container labelled ``mc.session``.

    Containers in any state are queried. Containers without an
    ``mc.session.id`` label are skipped. An ``exited`` container is reported
    as STOPPED, a ``created`` one as CREATING, and every other state as
    RUNNING. If Docker raises an error it is printed and the sessions
    collected up to that point are returned.
    """
    found = []
    try:
        labelled = self.client.containers.list(
            all=True, filters={"label": "mc.session"}
        )
        for container in labelled:
            labels = container.labels
            session_id = labels.get("mc.session.id")
            if not session_id:
                continue

            if container.status == "exited":
                status = SessionStatus.STOPPED
            elif container.status == "created":
                status = SessionStatus.CREATING
            else:
                status = SessionStatus.RUNNING

            session = Session(
                id=session_id,
                name=labels.get("mc.session.name", f"mc-{session_id}"),
                driver=labels.get("mc.driver", "unknown"),
                status=status,
                container_id=container.id,
                created_at=container.attrs["Created"],
                project=labels.get("mc.project"),
            )

            # Ports are only set when Docker reports a non-empty port table;
            # keys look like "8080/tcp", so the protocol suffix is dropped.
            port_table = container.attrs.get("NetworkSettings", {}).get("Ports")
            if port_table:
                session.ports = {
                    int(container_port.split("/")[0]): int(bindings[0]["HostPort"])
                    for container_port, bindings in port_table.items()
                    if bindings
                }

            found.append(session)
    except DockerException as e:
        print(f"Error listing sessions: {e}")
    return found
def create_session(
    self,
    driver_name: str,
    project: Optional[str] = None,
    environment: Optional[Dict[str, str]] = None,
    session_name: Optional[str] = None,
    mount_local: bool = True,
) -> Optional[Session]:
    """Create and start a new MC session container.

    Args:
        driver_name: The name of the driver to use.
        project: Optional project repository URL. When given, the local
            directory is not mounted and the URL is passed to the container
            as ``MC_PROJECT_URL``.
        environment: Optional environment variables for the container. The
            mapping is copied; the caller's dict is never modified.
        session_name: Optional session name; defaults to ``mc-<session id>``.
        mount_local: Whether to mount the current directory to /app (only
            applies when no project URL is given).

    Returns:
        The created Session, or None when the driver is unknown or Docker
        reports an error (the error is printed).
    """
    config_mount = "/mc-config/"
    try:
        # Validate driver exists
        driver = self.config_manager.get_driver(driver_name)
        if not driver:
            print(f"Driver '{driver_name}' not found")
            return None

        # Generate session ID and name
        session_id = self._generate_session_id()
        if not session_name:
            session_name = f"mc-{session_id}"

        # Ensure network exists
        self._ensure_network()

        # Work on a copy so the caller's mapping is left untouched.
        env_vars = dict(environment) if environment else {}

        # Add project URL to environment if provided
        if project:
            env_vars["MC_PROJECT_URL"] = project

        # Pass API keys from host environment to container for local
        # development; values supplied by the caller take precedence.
        api_keys = [
            "OPENAI_API_KEY",
            "ANTHROPIC_API_KEY",
            "OPENROUTER_API_KEY",
            "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
            "LANGFUSE_INIT_PROJECT_SECRET_KEY",
            "LANGFUSE_URL",
        ]
        for key in api_keys:
            if key in os.environ and key not in env_vars:
                env_vars[key] = os.environ[key]

        # Pull the image only when it is not available locally
        try:
            self.client.images.get(driver.image)
        except ImageNotFound:
            print(f"Pulling image {driver.image}...")
            self.client.images.pull(driver.image)

        # Set up volume mounts
        volumes = {}
        if not project and mount_local:
            # Mount current directory to /app in the container
            current_dir = os.getcwd()
            volumes[current_dir] = {"bind": "/app", "mode": "rw"}
            print(f"Mounting local directory {current_dir} to /app")
        elif project:
            print(
                f"Project URL provided - container will clone {project} into /app during initialization"
            )

        # Set up persistent project configuration
        project_config_path = self._get_project_config_path(project)
        print(f"Using project configuration directory: {project_config_path}")
        volumes[str(project_config_path)] = {"bind": "/mc-config", "mode": "rw"}

        # Add environment variables for config path
        env_vars["MC_CONFIG_DIR"] = "/mc-config"
        env_vars["MC_DRIVER_CONFIG_DIR"] = f"/mc-config/{driver_name}"

        # Create driver-specific config directories
        if driver.persistent_configs:
            print("Setting up persistent configuration directories:")
            for config in driver.persistent_configs:
                # Map the in-container target onto the host config directory.
                # str.lstrip() removes a *set of characters*, not a prefix
                # (it would turn "/mc-config/goose" into "se"), so the mount
                # prefix is removed explicitly.
                relative_target = config.target
                if relative_target.startswith(config_mount):
                    relative_target = relative_target[len(config_mount):]
                # A leading "/" would make pathlib discard project_config_path.
                target_dir = project_config_path / relative_target.lstrip("/")

                if config.type == "directory":
                    target_dir.mkdir(parents=True, exist_ok=True)
                    print(f"  - Created directory: {target_dir}")
                elif config.type == "file":
                    # Only the parent is created; the file itself is not
                    # created here.
                    target_dir.parent.mkdir(parents=True, exist_ok=True)

        # Create container
        container = self.client.containers.create(
            image=driver.image,
            name=session_name,
            hostname=session_name,
            detach=True,
            tty=True,
            stdin_open=True,
            environment=env_vars,
            volumes=volumes,
            labels={
                "mc.session": "true",
                "mc.session.id": session_id,
                "mc.session.name": session_name,
                "mc.driver": driver_name,
                "mc.project": project or "",
            },
            network=self.config_manager.config.docker.get("network", "mc-network"),
            ports={f"{port}/tcp": None for port in driver.ports},
        )

        # Start container, then refresh attrs to learn the assigned host ports
        container.start()
        container.reload()

        ports = {}
        port_table = container.attrs.get("NetworkSettings", {}).get("Ports")
        if port_table:
            for container_port, host_ports in port_table.items():
                if host_ports:
                    # Keys look like "8080/tcp"; keep only the number.
                    container_port_num = int(container_port.split("/")[0])
                    ports[container_port_num] = int(host_ports[0]["HostPort"])

        # Create session object
        session = Session(
            id=session_id,
            name=session_name,
            driver=driver_name,
            status=SessionStatus.RUNNING,
            container_id=container.id,
            environment=env_vars,
            project=project,
            created_at=container.attrs["Created"],
            ports=ports,
        )

        # Save session to config as JSON-compatible dict
        self.config_manager.add_session(session_id, session.model_dump(mode="json"))
        return session
    except DockerException as e:
        print(f"Error creating session: {e}")
        return None
def close_session(self, session_id: str) -> bool:
    """Close the MC session whose ID equals ``session_id``.

    Returns:
        The result of closing the first matching session, or False when no
        listed session matches or a Docker error is raised.
    """
    try:
        match = next(
            (candidate for candidate in self.list_sessions() if candidate.id == session_id),
            None,
        )
        if match is None:
            print(f"Session '{session_id}' not found")
            return False
        return self._close_single_session(match)
    except DockerException as e:
        print(f"Error closing session: {e}")
        return False
def connect_session(self, session_id: str) -> bool:
    """Open an interactive shell in a running MC session.

    Runs ``docker exec -it <container id> /bin/bash`` attached to the
    current terminal and waits for it to finish.

    Args:
        session_id: The ID of the session to connect to.

    Returns:
        True once the shell command has been run (whatever its exit code).
        False when the session is not found, is not running, the ``docker``
        executable cannot be launched, or Docker reports an error.
    """
    # Local import keeps this block self-contained.
    import subprocess

    try:
        for session in self.list_sessions():
            if session.id != session_id or not session.container_id:
                continue

            if session.status != SessionStatus.RUNNING:
                print(f"Session '{session_id}' is not running")
                return False

            print(f"Connecting to session {session_id}...")
            # An argument list (no shell) avoids any quoting or
            # interpolation issues with the container id.
            command = ["docker", "exec", "-it", session.container_id, "/bin/bash"]
            try:
                subprocess.run(command, check=False)
            except OSError as e:
                # e.g. the docker CLI is not installed or not on PATH
                print(f"Error connecting to session: {e}")
                return False
            return True

        print(f"Session '{session_id}' not found")
        return False
    except DockerException as e:
        print(f"Error connecting to session: {e}")
        return False
def _close_single_session(self, session: Session) -> bool:
    """Stop and remove one session's container, then drop it from the config.

    Args:
        session: The session to close.

    Returns:
        bool: True when the container was stopped and removed and the
        session was removed from the configuration; False when the session
        has no container ID or Docker raised an error (which is printed).
    """
    container_id = session.container_id
    if not container_id:
        return False

    try:
        container = self.client.containers.get(container_id)
        container.stop()
        container.remove()
        # The config entry is only dropped once the container is gone.
        self.config_manager.remove_session(session.id)
    except DockerException as e:
        print(f"Error closing session {session.id}: {e}")
        return False
    return True
def close_all_sessions(self, progress_callback=None) -> Tuple[int, bool]:
    """Close every listed MC session using a pool of worker threads.

    Args:
        progress_callback: Optional callable invoked as
            ``(session_id, status, message)`` with status ``"completed"``
            after a session is closed, or ``"failed"`` when Docker raised an
            error for it. Sessions without a container ID are skipped
            without a callback.

    Returns:
        tuple: ``(closed_count, success)``. ``(0, True)`` when there are no
        sessions; otherwise success is True when at least one session was
        closed. ``(0, False)`` when a Docker error escapes the whole
        operation.
    """
    try:
        sessions = self.list_sessions()
        if not sessions:
            return 0, True

        def worker(target):
            # Runs in a pool thread: stop and remove the container, forget
            # the session, then report the outcome through the callback.
            if not target.container_id:
                return False
            try:
                container = self.client.containers.get(target.container_id)
                container.stop()
                container.remove()
                self.config_manager.remove_session(target.id)
                if progress_callback:
                    progress_callback(
                        target.id,
                        "completed",
                        f"{target.name} closed successfully",
                    )
                return True
            except DockerException as e:
                if progress_callback:
                    progress_callback(target.id, "failed", f"Error: {str(e)}")
                print(f"Error closing session {target.id}: {e}")
                return False

        # At most ten sessions are closed concurrently.
        pool_size = min(10, len(sessions))
        closed = 0
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
            pending = {}
            for item in sessions:
                pending[pool.submit(worker, item)] = item

            for finished in concurrent.futures.as_completed(pending):
                owner = pending[finished]
                try:
                    if finished.result():
                        closed += 1
                except Exception as e:
                    # Anything the worker did not handle itself ends up here.
                    print(f"Error closing session {owner.id}: {e}")
        return closed, closed > 0
    except DockerException as e:
        print(f"Error closing all sessions: {e}")
        return 0, False
def get_session_logs(self, session_id: str, follow: bool = False) -> Optional[str]:
    """Get the container logs of an MC session.

    Args:
        session_id: The ID of the session whose logs are wanted.
        follow: When True, stream the logs to stdout line by line (each
            line stripped of surrounding whitespace) instead of returning
            them.

    Returns:
        The full log text when ``follow`` is False. None when following,
        when the session is not found, or when Docker reports an error.
        Bytes that are not valid UTF-8 are replaced with U+FFFD rather than
        raising ``UnicodeDecodeError``.
    """
    try:
        for session in self.list_sessions():
            if session.id != session_id or not session.container_id:
                continue

            container = self.client.containers.get(session.container_id)
            if not follow:
                # Container output is arbitrary bytes; decode leniently.
                return container.logs().decode(errors="replace")

            for line in container.logs(stream=True, follow=True):
                print(line.decode(errors="replace").strip())
            return None

        print(f"Session '{session_id}' not found")
        return None
    except DockerException as e:
        print(f"Error getting session logs: {e}")
        return None
def get_init_logs(self, session_id: str, follow: bool = False) -> Optional[str]:
    """Get the initialization logs (``/init.log``) of an MC session.

    Args:
        session_id: The session ID.
        follow: Whether to follow the logs. Following only happens while
            ``/init.status`` does not yet contain ``INIT_COMPLETE=true``;
            otherwise the current log content is returned.

    Returns:
        The log text, or None when the logs were streamed to stdout, the
        session was not found, ``/init.log`` could not be read, or Docker
        reported an error. Undecodable bytes are replaced with U+FFFD.
    """
    try:
        for session in self.list_sessions():
            if session.id != session_id or not session.container_id:
                continue

            container = self.client.containers.get(session.container_id)

            # Check if initialization is complete
            init_complete = False
            try:
                exit_code, _ = container.exec_run(
                    "grep -q 'INIT_COMPLETE=true' /init.status"
                )
                init_complete = exit_code == 0
            except DockerException:
                # Best effort: if the check itself fails, treat the
                # initialization as not yet complete.
                pass

            if follow and not init_complete:
                print(f"Following initialization logs for session {session_id}...")
                print("Press Ctrl+C to stop following")
                _, stream = container.exec_run(
                    "tail -f /init.log", stream=True, demux=True, tty=True
                )
                try:
                    # The stream must be consumed for anything to be shown.
                    # With demux=True each item is a (stdout, stderr) pair
                    # whose members may be None.
                    for chunk in stream:
                        parts = chunk if isinstance(chunk, tuple) else (chunk,)
                        for part in parts:
                            if part:
                                print(
                                    part.decode(errors="replace"),
                                    end="",
                                    flush=True,
                                )
                except KeyboardInterrupt:
                    # Ctrl+C is the advertised way to stop following.
                    pass
                return None

            exit_code, output = container.exec_run("cat /init.log")
            if exit_code == 0:
                return output.decode(errors="replace")
            print("No initialization logs found")
            return None

        print(f"Session '{session_id}' not found")
        return None
    except DockerException as e:
        print(f"Error getting initialization logs: {e}")
        return None