mirror of
https://github.com/Monadical-SAS/cubbi.git
synced 2025-12-21 12:49:07 +00:00
feat(cli): phase 1 - local cli with docker integration
This commit is contained in:
5
mcontainer/__init__.py
Normal file
5
mcontainer/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""
|
||||
MC - Monadical Container Tool
|
||||
"""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
4
mcontainer/__main__.py
Normal file
4
mcontainer/__main__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .cli import app
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
314
mcontainer/cli.py
Normal file
314
mcontainer/cli.py
Normal file
@@ -0,0 +1,314 @@
|
||||
import os
|
||||
import sys
|
||||
from typing import List, Optional
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from .config import ConfigManager
|
||||
from .container import ContainerManager
|
||||
from .models import SessionStatus
|
||||
|
||||
app = typer.Typer(help="Monadical Container Tool")
|
||||
session_app = typer.Typer(help="Manage MC sessions")
|
||||
driver_app = typer.Typer(help="Manage MC drivers", no_args_is_help=True)
|
||||
app.add_typer(session_app, name="session", no_args_is_help=True)
|
||||
app.add_typer(driver_app, name="driver", no_args_is_help=True)
|
||||
|
||||
console = Console()
|
||||
config_manager = ConfigManager()
|
||||
container_manager = ContainerManager(config_manager)
|
||||
|
||||
|
||||
@app.callback(invoke_without_command=True)
|
||||
def main(ctx: typer.Context) -> None:
|
||||
"""Monadical Container Tool"""
|
||||
# If no command is specified, create a session
|
||||
if ctx.invoked_subcommand is None:
|
||||
create_session(driver=None, project=None, env=[], name=None)
|
||||
|
||||
|
||||
@app.command()
|
||||
def version() -> None:
|
||||
"""Show MC version information"""
|
||||
from importlib.metadata import version as get_version
|
||||
|
||||
try:
|
||||
version_str = get_version("mcontainer")
|
||||
console.print(f"MC - Monadical Container Tool v{version_str}")
|
||||
except Exception:
|
||||
console.print("MC - Monadical Container Tool (development version)")
|
||||
|
||||
|
||||
@session_app.command("list")
|
||||
def list_sessions() -> None:
|
||||
"""List active MC sessions"""
|
||||
sessions = container_manager.list_sessions()
|
||||
|
||||
if not sessions:
|
||||
console.print("No active sessions found")
|
||||
return
|
||||
|
||||
table = Table(show_header=True, header_style="bold")
|
||||
table.add_column("ID")
|
||||
table.add_column("Name")
|
||||
table.add_column("Driver")
|
||||
table.add_column("Status")
|
||||
table.add_column("Ports")
|
||||
table.add_column("Project")
|
||||
|
||||
for session in sessions:
|
||||
ports_str = ", ".join(
|
||||
[
|
||||
f"{container_port}:{host_port}"
|
||||
for container_port, host_port in session.ports.items()
|
||||
]
|
||||
)
|
||||
|
||||
status_color = {
|
||||
SessionStatus.RUNNING: "green",
|
||||
SessionStatus.STOPPED: "red",
|
||||
SessionStatus.CREATING: "yellow",
|
||||
SessionStatus.FAILED: "red",
|
||||
}.get(session.status, "white")
|
||||
|
||||
table.add_row(
|
||||
session.id,
|
||||
session.name,
|
||||
session.driver,
|
||||
f"[{status_color}]{session.status}[/{status_color}]",
|
||||
ports_str,
|
||||
session.project or "",
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
@session_app.command("create")
|
||||
def create_session(
|
||||
driver: Optional[str] = typer.Option(None, "--driver", "-d", help="Driver to use"),
|
||||
project: Optional[str] = typer.Option(None, "--project", "-p", help="Project repository URL"),
|
||||
env: List[str] = typer.Option(
|
||||
[], "--env", "-e", help="Environment variables (KEY=VALUE)"
|
||||
),
|
||||
name: Optional[str] = typer.Option(None, "--name", "-n", help="Session name"),
|
||||
) -> None:
|
||||
"""Create a new MC session"""
|
||||
# Use default driver if not specified
|
||||
if not driver:
|
||||
driver = config_manager.config.defaults.get("driver", "goose")
|
||||
|
||||
# Parse environment variables
|
||||
environment = {}
|
||||
for var in env:
|
||||
if "=" in var:
|
||||
key, value = var.split("=", 1)
|
||||
environment[key] = value
|
||||
else:
|
||||
console.print(
|
||||
f"[yellow]Warning: Ignoring invalid environment variable format: {var}[/yellow]"
|
||||
)
|
||||
|
||||
with console.status(f"Creating session with driver '{driver}'..."):
|
||||
session = container_manager.create_session(
|
||||
driver_name=driver,
|
||||
project=project,
|
||||
environment=environment,
|
||||
session_name=name,
|
||||
)
|
||||
|
||||
if session:
|
||||
console.print("[green]Session created successfully![/green]")
|
||||
console.print(f"Session ID: {session.id}")
|
||||
console.print(f"Driver: {session.driver}")
|
||||
|
||||
if session.ports:
|
||||
console.print("Ports:")
|
||||
for container_port, host_port in session.ports.items():
|
||||
console.print(f" {container_port} -> {host_port}")
|
||||
|
||||
console.print(
|
||||
f"\nConnect to the session with:\n mc session connect {session.id}"
|
||||
)
|
||||
else:
|
||||
console.print("[red]Failed to create session[/red]")
|
||||
|
||||
|
||||
@session_app.command("close")
|
||||
def close_session(
|
||||
session_id: str = typer.Argument(..., help="Session ID to close"),
|
||||
) -> None:
|
||||
"""Close a MC session"""
|
||||
with console.status(f"Closing session {session_id}..."):
|
||||
success = container_manager.close_session(session_id)
|
||||
|
||||
if success:
|
||||
console.print(f"[green]Session {session_id} closed successfully[/green]")
|
||||
else:
|
||||
console.print(f"[red]Failed to close session {session_id}[/red]")
|
||||
|
||||
|
||||
@session_app.command("connect")
|
||||
def connect_session(
|
||||
session_id: str = typer.Argument(..., help="Session ID to connect to"),
|
||||
) -> None:
|
||||
"""Connect to a MC session"""
|
||||
console.print(f"Connecting to session {session_id}...")
|
||||
success = container_manager.connect_session(session_id)
|
||||
|
||||
if not success:
|
||||
console.print(f"[red]Failed to connect to session {session_id}[/red]")
|
||||
|
||||
|
||||
@session_app.command("logs")
|
||||
def session_logs(
|
||||
session_id: str = typer.Argument(..., help="Session ID to get logs from"),
|
||||
follow: bool = typer.Option(False, "--follow", "-f", help="Follow log output"),
|
||||
) -> None:
|
||||
"""Stream logs from a MC session"""
|
||||
if follow:
|
||||
console.print(f"Streaming logs from session {session_id}... (Ctrl+C to exit)")
|
||||
container_manager.get_session_logs(session_id, follow=True)
|
||||
else:
|
||||
logs = container_manager.get_session_logs(session_id)
|
||||
if logs:
|
||||
console.print(logs)
|
||||
|
||||
|
||||
@app.command()
|
||||
def stop() -> None:
|
||||
"""Stop the current MC session (from inside the container)"""
|
||||
# Check if running inside a container
|
||||
if not os.path.exists("/.dockerenv"):
|
||||
console.print(
|
||||
"[red]This command can only be run from inside a MC container[/red]"
|
||||
)
|
||||
return
|
||||
|
||||
# Stop the container from inside
|
||||
console.print("Stopping the current session...")
|
||||
os.system("kill 1") # Send SIGTERM to PID 1 (container's init process)
|
||||
|
||||
|
||||
# Main CLI entry point that handles project repository URLs
|
||||
@app.command(name="")
|
||||
def quick_create(
|
||||
project: Optional[str] = typer.Argument(..., help="Project repository URL"),
|
||||
driver: Optional[str] = typer.Option(None, "--driver", "-d", help="Driver to use"),
|
||||
env: List[str] = typer.Option(
|
||||
[], "--env", "-e", help="Environment variables (KEY=VALUE)"
|
||||
),
|
||||
name: Optional[str] = typer.Option(None, "--name", "-n", help="Session name"),
|
||||
) -> None:
|
||||
"""Create a new MC session with a project repository"""
|
||||
create_session(driver=driver, project=project, env=env, name=name)
|
||||
|
||||
|
||||
@driver_app.command("list")
|
||||
def list_drivers() -> None:
|
||||
"""List available MC drivers"""
|
||||
drivers = config_manager.list_drivers()
|
||||
|
||||
if not drivers:
|
||||
console.print("No drivers found")
|
||||
return
|
||||
|
||||
table = Table(show_header=True, header_style="bold")
|
||||
table.add_column("Name")
|
||||
table.add_column("Description")
|
||||
table.add_column("Version")
|
||||
table.add_column("Maintainer")
|
||||
table.add_column("Image")
|
||||
|
||||
for name, driver in drivers.items():
|
||||
table.add_row(
|
||||
driver.name,
|
||||
driver.description,
|
||||
driver.version,
|
||||
driver.maintainer,
|
||||
driver.image,
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
@driver_app.command("build")
|
||||
def build_driver(
|
||||
driver_name: str = typer.Argument(..., help="Driver name to build"),
|
||||
tag: str = typer.Option("latest", "--tag", "-t", help="Image tag"),
|
||||
push: bool = typer.Option(False, "--push", "-p", help="Push image to registry after building"),
|
||||
) -> None:
|
||||
"""Build a driver Docker image"""
|
||||
# Get driver path
|
||||
driver_path = config_manager.get_driver_path(driver_name)
|
||||
if not driver_path:
|
||||
console.print(f"[red]Driver '{driver_name}' not found[/red]")
|
||||
return
|
||||
|
||||
# Check if Dockerfile exists
|
||||
dockerfile_path = driver_path / "Dockerfile"
|
||||
if not dockerfile_path.exists():
|
||||
console.print(f"[red]Dockerfile not found in {driver_path}[/red]")
|
||||
return
|
||||
|
||||
# Build image name
|
||||
image_name = f"monadical/mc-{driver_name}:{tag}"
|
||||
|
||||
# Build the image
|
||||
with console.status(f"Building image {image_name}..."):
|
||||
result = os.system(f"cd {driver_path} && docker build -t {image_name} .")
|
||||
|
||||
if result != 0:
|
||||
console.print("[red]Failed to build driver image[/red]")
|
||||
return
|
||||
|
||||
console.print(f"[green]Successfully built image: {image_name}[/green]")
|
||||
|
||||
# Push if requested
|
||||
if push:
|
||||
with console.status(f"Pushing image {image_name}..."):
|
||||
result = os.system(f"docker push {image_name}")
|
||||
|
||||
if result != 0:
|
||||
console.print("[red]Failed to push driver image[/red]")
|
||||
return
|
||||
|
||||
console.print(f"[green]Successfully pushed image: {image_name}[/green]")
|
||||
|
||||
|
||||
@driver_app.command("info")
|
||||
def driver_info(
|
||||
driver_name: str = typer.Argument(..., help="Driver name to get info for"),
|
||||
) -> None:
|
||||
"""Show detailed information about a driver"""
|
||||
driver = config_manager.get_driver(driver_name)
|
||||
if not driver:
|
||||
console.print(f"[red]Driver '{driver_name}' not found[/red]")
|
||||
return
|
||||
|
||||
console.print(f"[bold]Driver: {driver.name}[/bold]")
|
||||
console.print(f"Description: {driver.description}")
|
||||
console.print(f"Version: {driver.version}")
|
||||
console.print(f"Maintainer: {driver.maintainer}")
|
||||
console.print(f"Image: {driver.image}")
|
||||
|
||||
if driver.ports:
|
||||
console.print("\n[bold]Ports:[/bold]")
|
||||
for port in driver.ports:
|
||||
console.print(f" {port}")
|
||||
|
||||
# Get driver path
|
||||
driver_path = config_manager.get_driver_path(driver_name)
|
||||
if driver_path:
|
||||
console.print(f"\n[bold]Path:[/bold] {driver_path}")
|
||||
|
||||
# Check for README
|
||||
readme_path = driver_path / "README.md"
|
||||
if readme_path.exists():
|
||||
console.print("\n[bold]README:[/bold]")
|
||||
with open(readme_path, "r") as f:
|
||||
console.print(f.read())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
199
mcontainer/config.py
Normal file
199
mcontainer/config.py
Normal file
@@ -0,0 +1,199 @@
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
from .models import Config, Driver
|
||||
|
||||
DEFAULT_CONFIG_DIR = Path.home() / ".config" / "mc"
|
||||
DEFAULT_CONFIG_FILE = DEFAULT_CONFIG_DIR / "config.yaml"
|
||||
DEFAULT_DRIVERS_DIR = Path.home() / ".config" / "mc" / "drivers"
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
BUILTIN_DRIVERS_DIR = PROJECT_ROOT / "drivers"
|
||||
|
||||
# Default built-in driver configurations
|
||||
DEFAULT_DRIVERS = {
|
||||
"goose": Driver(
|
||||
name="goose",
|
||||
description="Goose with MCP servers",
|
||||
version="1.0.0",
|
||||
maintainer="team@monadical.com",
|
||||
image="monadical/mc-goose:latest",
|
||||
ports=[8000, 22],
|
||||
),
|
||||
"aider": Driver(
|
||||
name="aider",
|
||||
description="Aider coding assistant",
|
||||
version="1.0.0",
|
||||
maintainer="team@monadical.com",
|
||||
image="monadical/mc-aider:latest",
|
||||
ports=[22],
|
||||
),
|
||||
"claude-code": Driver(
|
||||
name="claude-code",
|
||||
description="Claude Code environment",
|
||||
version="1.0.0",
|
||||
maintainer="team@monadical.com",
|
||||
image="monadical/mc-claude-code:latest",
|
||||
ports=[22],
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class ConfigManager:
|
||||
def __init__(self, config_path: Optional[Path] = None):
|
||||
self.config_path = config_path or DEFAULT_CONFIG_FILE
|
||||
self.config_dir = self.config_path.parent
|
||||
self.drivers_dir = DEFAULT_DRIVERS_DIR
|
||||
self.config = self._load_or_create_config()
|
||||
|
||||
def _load_or_create_config(self) -> Config:
|
||||
"""Load existing config or create a new one with defaults"""
|
||||
if self.config_path.exists():
|
||||
try:
|
||||
with open(self.config_path, "r") as f:
|
||||
config_data = yaml.safe_load(f) or {}
|
||||
|
||||
# Create a new config from scratch, then update with data from file
|
||||
config = Config(
|
||||
docker=config_data.get('docker', {}),
|
||||
defaults=config_data.get('defaults', {})
|
||||
)
|
||||
|
||||
# Add drivers
|
||||
if 'drivers' in config_data:
|
||||
for driver_name, driver_data in config_data['drivers'].items():
|
||||
config.drivers[driver_name] = Driver.model_validate(driver_data)
|
||||
|
||||
# Add sessions (stored as simple dictionaries)
|
||||
if 'sessions' in config_data:
|
||||
config.sessions = config_data['sessions']
|
||||
|
||||
return config
|
||||
except Exception as e:
|
||||
print(f"Error loading config: {e}")
|
||||
return self._create_default_config()
|
||||
else:
|
||||
return self._create_default_config()
|
||||
|
||||
def _create_default_config(self) -> Config:
|
||||
"""Create a default configuration"""
|
||||
self.config_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.drivers_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Load built-in drivers from directories
|
||||
builtin_drivers = self.load_builtin_drivers()
|
||||
|
||||
# Merge with default drivers, with directory drivers taking precedence
|
||||
drivers = {**DEFAULT_DRIVERS, **builtin_drivers}
|
||||
|
||||
config = Config(
|
||||
docker={
|
||||
"socket": "/var/run/docker.sock",
|
||||
"network": "mc-network",
|
||||
},
|
||||
drivers=drivers,
|
||||
defaults={
|
||||
"driver": "goose",
|
||||
},
|
||||
)
|
||||
|
||||
self.save_config(config)
|
||||
return config
|
||||
|
||||
def save_config(self, config: Optional[Config] = None) -> None:
|
||||
"""Save the current config to disk"""
|
||||
if config:
|
||||
self.config = config
|
||||
|
||||
self.config_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Use model_dump with mode="json" for proper serialization of enums
|
||||
config_dict = self.config.model_dump(mode="json")
|
||||
|
||||
# Write to file
|
||||
with open(self.config_path, "w") as f:
|
||||
yaml.dump(config_dict, f)
|
||||
|
||||
def get_driver(self, name: str) -> Optional[Driver]:
|
||||
"""Get a driver by name"""
|
||||
return self.config.drivers.get(name)
|
||||
|
||||
def list_drivers(self) -> Dict[str, Driver]:
|
||||
"""List all available drivers"""
|
||||
return self.config.drivers
|
||||
|
||||
def add_session(self, session_id: str, session_data: dict) -> None:
|
||||
"""Add a session to the config"""
|
||||
# Store session data as a dictionary in the config
|
||||
self.config.sessions[session_id] = session_data
|
||||
self.save_config()
|
||||
|
||||
def remove_session(self, session_id: str) -> None:
|
||||
"""Remove a session from the config"""
|
||||
if session_id in self.config.sessions:
|
||||
del self.config.sessions[session_id]
|
||||
self.save_config()
|
||||
|
||||
def list_sessions(self) -> Dict:
|
||||
"""List all sessions in the config"""
|
||||
return self.config.sessions
|
||||
|
||||
def load_driver_from_dir(self, driver_dir: Path) -> Optional[Driver]:
|
||||
"""Load a driver configuration from a directory"""
|
||||
yaml_path = driver_dir / "mai-driver.yaml" # Keep this name for backward compatibility
|
||||
|
||||
if not yaml_path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(yaml_path, "r") as f:
|
||||
driver_data = yaml.safe_load(f)
|
||||
|
||||
# Extract required fields
|
||||
if not all(k in driver_data for k in ["name", "description", "version", "maintainer"]):
|
||||
print(f"Driver config {yaml_path} missing required fields")
|
||||
return None
|
||||
|
||||
# Create driver object
|
||||
driver = Driver(
|
||||
name=driver_data["name"],
|
||||
description=driver_data["description"],
|
||||
version=driver_data["version"],
|
||||
maintainer=driver_data["maintainer"],
|
||||
image=f"monadical/mc-{driver_data['name']}:latest",
|
||||
ports=driver_data.get("ports", []),
|
||||
)
|
||||
|
||||
return driver
|
||||
except Exception as e:
|
||||
print(f"Error loading driver from {yaml_path}: {e}")
|
||||
return None
|
||||
|
||||
def load_builtin_drivers(self) -> Dict[str, Driver]:
|
||||
"""Load all built-in drivers from the drivers directory"""
|
||||
drivers = {}
|
||||
|
||||
if not BUILTIN_DRIVERS_DIR.exists():
|
||||
return drivers
|
||||
|
||||
for driver_dir in BUILTIN_DRIVERS_DIR.iterdir():
|
||||
if driver_dir.is_dir():
|
||||
driver = self.load_driver_from_dir(driver_dir)
|
||||
if driver:
|
||||
drivers[driver.name] = driver
|
||||
|
||||
return drivers
|
||||
|
||||
def get_driver_path(self, driver_name: str) -> Optional[Path]:
|
||||
"""Get the directory path for a driver"""
|
||||
# Check built-in drivers first
|
||||
builtin_path = BUILTIN_DRIVERS_DIR / driver_name
|
||||
if builtin_path.exists() and builtin_path.is_dir():
|
||||
return builtin_path
|
||||
|
||||
# Then check user drivers
|
||||
user_path = self.drivers_dir / driver_name
|
||||
if user_path.exists() and user_path.is_dir():
|
||||
return user_path
|
||||
|
||||
return None
|
||||
235
mcontainer/container.py
Normal file
235
mcontainer/container.py
Normal file
@@ -0,0 +1,235 @@
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import docker
|
||||
from typing import Dict, List, Optional
|
||||
from docker.errors import DockerException, ImageNotFound
|
||||
|
||||
from .models import Session, SessionStatus
|
||||
from .config import ConfigManager
|
||||
|
||||
|
||||
class ContainerManager:
|
||||
def __init__(self, config_manager: Optional[ConfigManager] = None):
|
||||
self.config_manager = config_manager or ConfigManager()
|
||||
try:
|
||||
self.client = docker.from_env()
|
||||
# Test connection
|
||||
self.client.ping()
|
||||
except DockerException as e:
|
||||
print(f"Error connecting to Docker: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def _ensure_network(self) -> None:
|
||||
"""Ensure the MC network exists"""
|
||||
network_name = self.config_manager.config.docker.get("network", "mc-network")
|
||||
networks = self.client.networks.list(names=[network_name])
|
||||
if not networks:
|
||||
self.client.networks.create(network_name, driver="bridge")
|
||||
|
||||
def _generate_session_id(self) -> str:
|
||||
"""Generate a unique session ID"""
|
||||
return str(uuid.uuid4())[:8]
|
||||
|
||||
def list_sessions(self) -> List[Session]:
|
||||
"""List all active MC sessions"""
|
||||
sessions = []
|
||||
try:
|
||||
containers = self.client.containers.list(
|
||||
all=True, filters={"label": "mc.session"}
|
||||
)
|
||||
|
||||
for container in containers:
|
||||
container_id = container.id
|
||||
labels = container.labels
|
||||
|
||||
session_id = labels.get("mc.session.id")
|
||||
if not session_id:
|
||||
continue
|
||||
|
||||
status = SessionStatus.RUNNING
|
||||
if container.status == "exited":
|
||||
status = SessionStatus.STOPPED
|
||||
elif container.status == "created":
|
||||
status = SessionStatus.CREATING
|
||||
|
||||
session = Session(
|
||||
id=session_id,
|
||||
name=labels.get("mc.session.name", f"mc-{session_id}"),
|
||||
driver=labels.get("mc.driver", "unknown"),
|
||||
status=status,
|
||||
container_id=container_id,
|
||||
created_at=container.attrs["Created"],
|
||||
project=labels.get("mc.project"),
|
||||
)
|
||||
|
||||
# Get port mappings
|
||||
if container.attrs.get("NetworkSettings", {}).get("Ports"):
|
||||
ports = {}
|
||||
for container_port, host_ports in container.attrs[
|
||||
"NetworkSettings"
|
||||
]["Ports"].items():
|
||||
if host_ports:
|
||||
# Strip /tcp or /udp suffix and convert to int
|
||||
container_port_num = int(container_port.split("/")[0])
|
||||
host_port = int(host_ports[0]["HostPort"])
|
||||
ports[container_port_num] = host_port
|
||||
session.ports = ports
|
||||
|
||||
sessions.append(session)
|
||||
|
||||
except DockerException as e:
|
||||
print(f"Error listing sessions: {e}")
|
||||
|
||||
return sessions
|
||||
|
||||
def create_session(
|
||||
self,
|
||||
driver_name: str,
|
||||
project: Optional[str] = None,
|
||||
environment: Optional[Dict[str, str]] = None,
|
||||
session_name: Optional[str] = None,
|
||||
) -> Optional[Session]:
|
||||
"""Create a new MC session"""
|
||||
try:
|
||||
# Validate driver exists
|
||||
driver = self.config_manager.get_driver(driver_name)
|
||||
if not driver:
|
||||
print(f"Driver '{driver_name}' not found")
|
||||
return None
|
||||
|
||||
# Generate session ID and name
|
||||
session_id = self._generate_session_id()
|
||||
if not session_name:
|
||||
session_name = f"mc-{session_id}"
|
||||
|
||||
# Ensure network exists
|
||||
self._ensure_network()
|
||||
|
||||
# Prepare environment variables
|
||||
env_vars = environment or {}
|
||||
|
||||
# Pull image if needed
|
||||
try:
|
||||
self.client.images.get(driver.image)
|
||||
except ImageNotFound:
|
||||
print(f"Pulling image {driver.image}...")
|
||||
self.client.images.pull(driver.image)
|
||||
|
||||
# Create container
|
||||
container = self.client.containers.create(
|
||||
image=driver.image,
|
||||
name=session_name,
|
||||
hostname=session_name,
|
||||
detach=True,
|
||||
tty=True,
|
||||
stdin_open=True,
|
||||
environment=env_vars,
|
||||
labels={
|
||||
"mc.session": "true",
|
||||
"mc.session.id": session_id,
|
||||
"mc.session.name": session_name,
|
||||
"mc.driver": driver_name,
|
||||
"mc.project": project or "",
|
||||
},
|
||||
network=self.config_manager.config.docker.get("network", "mc-network"),
|
||||
ports={f"{port}/tcp": None for port in driver.ports},
|
||||
)
|
||||
|
||||
# Start container
|
||||
container.start()
|
||||
|
||||
# Get updated port information
|
||||
container.reload()
|
||||
ports = {}
|
||||
if container.attrs.get("NetworkSettings", {}).get("Ports"):
|
||||
for container_port, host_ports in container.attrs["NetworkSettings"][
|
||||
"Ports"
|
||||
].items():
|
||||
if host_ports:
|
||||
container_port_num = int(container_port.split("/")[0])
|
||||
host_port = int(host_ports[0]["HostPort"])
|
||||
ports[container_port_num] = host_port
|
||||
|
||||
# Create session object
|
||||
session = Session(
|
||||
id=session_id,
|
||||
name=session_name,
|
||||
driver=driver_name,
|
||||
status=SessionStatus.RUNNING,
|
||||
container_id=container.id,
|
||||
environment=env_vars,
|
||||
project=project,
|
||||
created_at=container.attrs["Created"],
|
||||
ports=ports,
|
||||
)
|
||||
|
||||
# Save session to config as JSON-compatible dict
|
||||
self.config_manager.add_session(session_id, session.model_dump(mode="json"))
|
||||
|
||||
return session
|
||||
|
||||
except DockerException as e:
|
||||
print(f"Error creating session: {e}")
|
||||
return None
|
||||
|
||||
def close_session(self, session_id: str) -> bool:
|
||||
"""Close a MC session"""
|
||||
try:
|
||||
sessions = self.list_sessions()
|
||||
for session in sessions:
|
||||
if session.id == session_id and session.container_id:
|
||||
container = self.client.containers.get(session.container_id)
|
||||
container.stop()
|
||||
container.remove()
|
||||
self.config_manager.remove_session(session_id)
|
||||
return True
|
||||
|
||||
print(f"Session '{session_id}' not found")
|
||||
return False
|
||||
|
||||
except DockerException as e:
|
||||
print(f"Error closing session: {e}")
|
||||
return False
|
||||
|
||||
def connect_session(self, session_id: str) -> bool:
|
||||
"""Connect to a running MC session"""
|
||||
try:
|
||||
sessions = self.list_sessions()
|
||||
for session in sessions:
|
||||
if session.id == session_id and session.container_id:
|
||||
if session.status != SessionStatus.RUNNING:
|
||||
print(f"Session '{session_id}' is not running")
|
||||
return False
|
||||
|
||||
# Execute interactive shell in container
|
||||
os.system(f"docker exec -it {session.container_id} /bin/bash")
|
||||
return True
|
||||
|
||||
print(f"Session '{session_id}' not found")
|
||||
return False
|
||||
|
||||
except DockerException as e:
|
||||
print(f"Error connecting to session: {e}")
|
||||
return False
|
||||
|
||||
def get_session_logs(self, session_id: str, follow: bool = False) -> Optional[str]:
|
||||
"""Get logs from a MC session"""
|
||||
try:
|
||||
sessions = self.list_sessions()
|
||||
for session in sessions:
|
||||
if session.id == session_id and session.container_id:
|
||||
container = self.client.containers.get(session.container_id)
|
||||
if follow:
|
||||
for line in container.logs(stream=True, follow=True):
|
||||
print(line.decode().strip())
|
||||
return None
|
||||
else:
|
||||
return container.logs().decode()
|
||||
|
||||
print(f"Session '{session_id}' not found")
|
||||
return None
|
||||
|
||||
except DockerException as e:
|
||||
print(f"Error getting session logs: {e}")
|
||||
return None
|
||||
1
mcontainer/drivers/__init__.py
Normal file
1
mcontainer/drivers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Driver definitions for MAI"""
|
||||
28
mcontainer/drivers/base.py
Normal file
28
mcontainer/drivers/base.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""
|
||||
Base driver implementation for MAI
|
||||
"""
|
||||
|
||||
from typing import Dict, Optional
|
||||
|
||||
from ..models import Driver
|
||||
|
||||
|
||||
class DriverManager:
|
||||
"""Manager for MAI drivers"""
|
||||
|
||||
@staticmethod
|
||||
def get_default_drivers() -> Dict[str, Driver]:
|
||||
"""Get the default built-in drivers"""
|
||||
from ..config import DEFAULT_DRIVERS
|
||||
|
||||
return DEFAULT_DRIVERS
|
||||
|
||||
@staticmethod
|
||||
def get_driver_metadata(driver_name: str) -> Optional[Dict]:
|
||||
"""Get metadata for a specific driver"""
|
||||
from ..config import DEFAULT_DRIVERS
|
||||
|
||||
if driver_name in DEFAULT_DRIVERS:
|
||||
return DEFAULT_DRIVERS[driver_name].model_dump()
|
||||
|
||||
return None
|
||||
48
mcontainer/models.py
Normal file
48
mcontainer/models.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SessionStatus(str, Enum):
|
||||
CREATING = "creating"
|
||||
RUNNING = "running"
|
||||
STOPPED = "stopped"
|
||||
FAILED = "failed"
|
||||
|
||||
|
||||
class DriverEnvironmentVariable(BaseModel):
|
||||
name: str
|
||||
description: str
|
||||
required: bool = False
|
||||
default: Optional[str] = None
|
||||
sensitive: bool = False
|
||||
|
||||
|
||||
class Driver(BaseModel):
|
||||
name: str
|
||||
description: str
|
||||
version: str
|
||||
maintainer: str
|
||||
image: str
|
||||
environment: List[DriverEnvironmentVariable] = []
|
||||
ports: List[int] = []
|
||||
volumes: List[Dict[str, str]] = []
|
||||
|
||||
|
||||
class Session(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
driver: str
|
||||
status: SessionStatus
|
||||
container_id: Optional[str] = None
|
||||
environment: Dict[str, str] = Field(default_factory=dict)
|
||||
project: Optional[str] = None
|
||||
created_at: str
|
||||
ports: Dict[int, int] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
docker: Dict[str, str] = Field(default_factory=dict)
|
||||
drivers: Dict[str, Driver] = Field(default_factory=dict)
|
||||
sessions: Dict[str, dict] = Field(default_factory=dict) # Store as dict to avoid serialization issues
|
||||
defaults: Dict[str, str] = Field(default_factory=dict)
|
||||
14
mcontainer/service.py
Normal file
14
mcontainer/service.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""
|
||||
MC Service - Container Management Web Service
|
||||
(This is a placeholder for Phase 2)
|
||||
"""
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Run the MC service"""
|
||||
print("MC Service - Container Management Web Service")
|
||||
print("This feature will be implemented in Phase 2")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
1
mcontainer/utils/__init__.py
Normal file
1
mcontainer/utils/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Utility functions for MAI"""
|
||||
61
mcontainer/utils/git.py
Normal file
61
mcontainer/utils/git.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""
|
||||
Git repository handling utilities for MAI
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def parse_git_url(url: str) -> Optional[Tuple[str, str, str]]:
|
||||
"""
|
||||
Parse a Git URL into its components: hostname, owner, repo
|
||||
|
||||
Supports formats:
|
||||
- git@github.com:owner/repo.git
|
||||
- https://github.com/owner/repo.git
|
||||
- github.com/owner/repo
|
||||
|
||||
Returns:
|
||||
Tuple of (hostname, owner, repo) or None if invalid
|
||||
"""
|
||||
# SSH format: git@github.com:owner/repo.git
|
||||
ssh_pattern = r"^(?:git@)?([\w\.-]+)(?::)([\w\.-]+)/([\w\.-]+)(?:\.git)?$"
|
||||
|
||||
# HTTPS format: https://github.com/owner/repo.git
|
||||
https_pattern = r"^(?:https?://)([\w\.-]+)/(?:([\w\.-]+)/([\w\.-]+))(?:\.git)?$"
|
||||
|
||||
# Simple format: github.com/owner/repo
|
||||
simple_pattern = r"^([\w\.-]+)/(?:([\w\.-]+)/([\w\.-]+))(?:\.git)?$"
|
||||
|
||||
for pattern in [ssh_pattern, https_pattern, simple_pattern]:
|
||||
match = re.match(pattern, url)
|
||||
if match:
|
||||
hostname, owner, repo = match.groups()
|
||||
return hostname, owner, repo
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_normalized_url(url: str) -> Optional[str]:
|
||||
"""
|
||||
Convert various Git URL formats to a normalized form
|
||||
|
||||
Returns:
|
||||
Normalized URL (git@hostname:owner/repo.git) or None if invalid
|
||||
"""
|
||||
parsed = parse_git_url(url)
|
||||
if not parsed:
|
||||
return None
|
||||
|
||||
hostname, owner, repo = parsed
|
||||
return f"git@{hostname}:{owner}/{repo}.git"
|
||||
|
||||
|
||||
def get_repository_name(url: str) -> Optional[str]:
|
||||
"""Get the repository name from a Git URL"""
|
||||
parsed = parse_git_url(url)
|
||||
if not parsed:
|
||||
return None
|
||||
|
||||
_, _, repo = parsed
|
||||
return repo.replace(".git", "")
|
||||
Reference in New Issue
Block a user