5 Commits

Author SHA1 Message Date
github-actions
de1b3c0976 0.4.0
Automatically generated by python-semantic-release
2025-08-06 02:46:46 +00:00
75c9849315 feat: add user port support (#26)
* feat: add user port support

* fix: fix unit test and improve isolation

* refactor: remove some fixture
2025-08-05 18:01:09 -06:00
9dc11582a2 docs: update readme (#25)
doc: update readme
2025-08-05 16:21:10 -06:00
472f030924 feat: support for crush (#23) 2025-08-04 09:29:51 -06:00
b8ecad6227 feat: make opencode beautiful by default (#24)
opencode: try having compatible default theme
2025-08-04 15:29:13 +00:00
25 changed files with 1425 additions and 166 deletions

View File

@@ -1,6 +1,35 @@
# CHANGELOG
## v0.4.0 (2025-08-06)
### Documentation
- Update readme ([#25](https://github.com/Monadical-SAS/cubbi/pull/25),
[`9dc1158`](https://github.com/Monadical-SAS/cubbi/commit/9dc11582a21371a069d407390308340a87358a9f))
doc: update readme
### Features
- Add user port support ([#26](https://github.com/Monadical-SAS/cubbi/pull/26),
[`75c9849`](https://github.com/Monadical-SAS/cubbi/commit/75c9849315aebb41ffbd5ac942c7eb3c4a151663))
* feat: add user port support
* fix: fix unit test and improve isolation
* refactor: remove some fixture
- Make opencode beautiful by default ([#24](https://github.com/Monadical-SAS/cubbi/pull/24),
[`b8ecad6`](https://github.com/Monadical-SAS/cubbi/commit/b8ecad6227f6a328517edfc442cd9bcf4d3361dc))
opencode: try having compatible default theme
- Support for crush ([#23](https://github.com/Monadical-SAS/cubbi/pull/23),
[`472f030`](https://github.com/Monadical-SAS/cubbi/commit/472f030924e58973dea0a41188950540550c125d))
## v0.3.0 (2025-07-31)
### Bug Fixes

View File

@@ -48,3 +48,15 @@ Use uv instead:
- **Configuration**: Use environment variables with YAML for configuration
Refer to SPECIFICATIONS.md for detailed architecture and implementation guidance.
## Cubbi images
A cubbi image is a flavored docker image that wrap a tool (let's say goose), and dynamically configure the tool when the image is starting. All cubbi images are defined in `cubbi/images` directory.
Each image must have (let's take goose image for example):
- `goose/cubbi_image.yaml`, list of persistent paths, etc.
- `goose/Dockerfile`, that is used to build the cubbi image with cubbi tools
- `goose/goose_plugin.py`, a plugin file named of the cubbi image name, that is specific for this image, with the intent to configure dynamically the docker image when starting with the preferences of the user (via environment variable). They all import `cubbi_init.py`, but this file is shared accross all images, so it is normal that execution of the plugin import does not work, because the build system will copy the file in place during the build.
- `goose/README.md`, a tiny readme about the image
If you are creating a new image, look about existing images (goose, opencode).

View File

@@ -2,7 +2,7 @@
# Cubbi - Container Tool
Cubbi is a command-line tool for managing ephemeral containers that run AI tools and development environments, with support for MCP servers.
Cubbi is a command-line tool for managing ephemeral containers that run AI tools and development environments, with support for MCP servers. It supports [Aider](https://github.com/Aider-AI/aider), [Crush](https://github.com/charmbracelet/crush), [Claude Code](https://github.com/anthropics/claude-code), [Goose](https://github.com/block/goose), [Opencode](https://github.com/sst/opencode).
![PyPI - Version](https://img.shields.io/pypi/v/cubbi)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/cubbi)
@@ -17,7 +17,6 @@ Cubbi is a command-line tool for managing ephemeral containers that run AI tools
- `cubbix` - Shortcut for `cubbi session create`
- `cubbix .` - Mount the current directory
- `cubbix /path/to/dir` - Mount a specific directory
- `cubbix https://github.com/user/repo` - Clone a repository
## 📋 Requirements
@@ -27,9 +26,6 @@ Cubbi is a command-line tool for managing ephemeral containers that run AI tools
## 📥 Installation
```bash
# Via pip
pip install cubbi
# Via uv
uv tool install cubbi
@@ -43,6 +39,7 @@ Then compile your first image:
```bash
cubbi image build goose
cubbi image build opencode
cubbi image build crush
```
### For Developers
@@ -80,9 +77,19 @@ cubbi session connect SESSION_ID
# Close a session when done
cubbi session close SESSION_ID
# Close a session quickly (kill instead of graceful stop)
cubbi session close SESSION_ID --kill
# Close all sessions at once
cubbi session close --all
# Close all sessions quickly
cubbi session close --all --kill
# Create a session with a specific image
cubbix --image goose
cubbix --image opencode
cubbix --image crush
# Create a session with environment variables
cubbix -e VAR1=value1 -e VAR2=value2
@@ -95,6 +102,11 @@ cubbix -v ~/data:/data -v ./configs:/etc/app/config
cubbix .
cubbix /path/to/project
# Forward ports from container to host
cubbix --port 8000 # Forward port 8000
cubbix --port 8000,3000,5173 # Forward multiple ports (comma-separated)
cubbix --port 8000 --port 3000 # Forward multiple ports (repeated flag)
# Connect to external Docker networks
cubbix --network teamnet --network dbnet
@@ -138,6 +150,7 @@ Cubbi includes an image management system that allows you to build, manage, and
| opencode | no |
| claudecode | no |
| aider | no |
| crush | no |
```bash
# List available images
@@ -146,10 +159,12 @@ cubbi image list
# Get detailed information about an image
cubbi image info goose
cubbi image info opencode
cubbi image info crush
# Build an image
cubbi image build goose
cubbi image build opencode
cubbi image build crush
```
Images are defined in the `cubbi/images/` directory, with each subdirectory containing:
@@ -234,6 +249,26 @@ cubbi config volume remove /local/path
Default volumes will be combined with any volumes specified using the `-v` flag when creating a session.
### Default Ports Configuration
You can configure default ports that will be automatically forwarded in every new session:
```bash
# List default ports
cubbi config port list
# Add a single port to defaults
cubbi config port add 8000
# Add multiple ports to defaults (comma-separated)
cubbi config port add 8000,3000,5173
# Remove a port from defaults
cubbi config port remove 8000
```
Default ports will be combined with any ports specified using the `--port` flag when creating a session.
### Default MCP Servers Configuration
You can configure default MCP servers that sessions will automatically connect to:

View File

@@ -142,6 +142,11 @@ def create_session(
network: List[str] = typer.Option(
[], "--network", "-N", help="Connect to additional Docker networks"
),
port: List[str] = typer.Option(
[],
"--port",
help="Forward ports (e.g., '8000' or '8000,3000' or multiple --port flags)",
),
name: Optional[str] = typer.Option(None, "--name", "-n", help="Session name"),
run_command: Optional[str] = typer.Option(
None,
@@ -319,6 +324,35 @@ def create_session(
"[yellow]Warning: --domains cannot be used with --network. Network restrictions will take precedence.[/yellow]"
)
# Get default ports from user config
default_ports = temp_user_config.get("defaults.ports", [])
# Parse and combine ports from command line
session_ports = []
for port_arg in port:
try:
parsed_ports = [int(p.strip()) for p in port_arg.split(",")]
# Validate port ranges
invalid_ports = [p for p in parsed_ports if not (1 <= p <= 65535)]
if invalid_ports:
console.print(
f"[red]Error: Invalid ports {invalid_ports}. Ports must be between 1 and 65535[/red]"
)
return
session_ports.extend(parsed_ports)
except ValueError:
console.print(
f"[yellow]Warning: Ignoring invalid port format: {port_arg}. Use integers only.[/yellow]"
)
# Combine default ports with session ports, removing duplicates
all_ports = list(set(default_ports + session_ports))
if all_ports:
console.print(f"Forwarding ports: {', '.join(map(str, all_ports))}")
# Get default MCPs from user config if none specified
all_mcps = mcp if isinstance(mcp, list) else []
if not all_mcps:
@@ -372,6 +406,7 @@ def create_session(
mount_local=mount_local,
volumes=volume_mounts,
networks=all_networks,
ports=all_ports,
mcp=all_mcps,
run_command=run_command,
no_shell=no_shell,
@@ -457,6 +492,9 @@ def create_session(
def close_session(
session_id: Optional[str] = typer.Argument(None, help="Session ID to close"),
all_sessions: bool = typer.Option(False, "--all", help="Close all active sessions"),
kill: bool = typer.Option(
False, "--kill", help="Forcefully kill containers instead of graceful stop"
),
) -> None:
"""Close a Cubbi session or all sessions"""
if all_sessions:
@@ -480,7 +518,9 @@ def close_session(
)
# Start closing sessions with progress updates
count, success = container_manager.close_all_sessions(update_progress)
count, success = container_manager.close_all_sessions(
update_progress, kill=kill
)
# Final result
if success:
@@ -489,7 +529,7 @@ def close_session(
console.print("[red]Failed to close all sessions[/red]")
elif session_id:
with console.status(f"Closing session {session_id}..."):
success = container_manager.close_session(session_id)
success = container_manager.close_session(session_id, kill=kill)
if success:
console.print(f"[green]Session {session_id} closed successfully[/green]")
@@ -711,6 +751,10 @@ config_app.add_typer(network_app, name="network", no_args_is_help=True)
volume_app = typer.Typer(help="Manage default volumes")
config_app.add_typer(volume_app, name="volume", no_args_is_help=True)
# Create a port subcommand for config
port_app = typer.Typer(help="Manage default ports")
config_app.add_typer(port_app, name="port", no_args_is_help=True)
# Create an MCP subcommand for config
config_mcp_app = typer.Typer(help="Manage default MCP servers")
config_app.add_typer(config_mcp_app, name="mcp", no_args_is_help=True)
@@ -1021,6 +1065,91 @@ def remove_volume(
console.print(f"[green]Removed volume '{volume_to_remove}' from defaults[/green]")
# Port configuration commands
@port_app.command("list")
def list_ports() -> None:
"""List all default ports"""
ports = user_config.get("defaults.ports", [])
if not ports:
console.print("No default ports configured")
return
table = Table(show_header=True, header_style="bold")
table.add_column("Port")
for port in ports:
table.add_row(str(port))
console.print(table)
@port_app.command("add")
def add_port(
ports_arg: str = typer.Argument(
..., help="Port(s) to add to defaults (e.g., '8000' or '8000,3000,5173')"
),
) -> None:
"""Add port(s) to default ports"""
current_ports = user_config.get("defaults.ports", [])
# Parse ports (support comma-separated)
try:
if "," in ports_arg:
new_ports = [int(p.strip()) for p in ports_arg.split(",")]
else:
new_ports = [int(ports_arg)]
except ValueError:
console.print(
"[red]Error: Invalid port format. Use integers only (e.g., '8000' or '8000,3000')[/red]"
)
return
# Validate port ranges
invalid_ports = [p for p in new_ports if not (1 <= p <= 65535)]
if invalid_ports:
console.print(
f"[red]Error: Invalid ports {invalid_ports}. Ports must be between 1 and 65535[/red]"
)
return
# Add new ports, avoiding duplicates
added_ports = []
for port in new_ports:
if port not in current_ports:
current_ports.append(port)
added_ports.append(port)
if not added_ports:
if len(new_ports) == 1:
console.print(f"Port {new_ports[0]} is already in defaults")
else:
console.print(f"All ports {new_ports} are already in defaults")
return
user_config.set("defaults.ports", current_ports)
if len(added_ports) == 1:
console.print(f"[green]Added port {added_ports[0]} to defaults[/green]")
else:
console.print(f"[green]Added ports {added_ports} to defaults[/green]")
@port_app.command("remove")
def remove_port(
port: int = typer.Argument(..., help="Port to remove from defaults"),
) -> None:
"""Remove a port from default ports"""
ports = user_config.get("defaults.ports", [])
if port not in ports:
console.print(f"Port {port} is not in defaults")
return
ports.remove(port)
user_config.set("defaults.ports", ports)
console.print(f"[green]Removed port {port} from defaults[/green]")
# MCP Management Commands

View File

@@ -154,6 +154,7 @@ class ContainerManager:
mount_local: bool = False,
volumes: Optional[Dict[str, Dict[str, str]]] = None,
networks: Optional[List[str]] = None,
ports: Optional[List[int]] = None,
mcp: Optional[List[str]] = None,
run_command: Optional[str] = None,
no_shell: bool = False,
@@ -634,9 +635,12 @@ class ContainerManager:
},
"command": container_command, # Set the command
"entrypoint": entrypoint, # Set the entrypoint (might be None)
"ports": {f"{port}/tcp": None for port in image.ports},
}
# Add port forwarding if ports are specified
if ports:
container_params["ports"] = {f"{port}/tcp": None for port in ports}
# Use network_mode if domains are specified, otherwise use regular network
if network_mode:
container_params["network_mode"] = network_mode
@@ -773,13 +777,18 @@ class ContainerManager:
return None
def close_session(self, session_id: str) -> bool:
"""Close a Cubbi session"""
def close_session(self, session_id: str, kill: bool = False) -> bool:
"""Close a Cubbi session
Args:
session_id: The ID of the session to close
kill: If True, forcefully kill the container instead of graceful stop
"""
try:
sessions = self.list_sessions()
for session in sessions:
if session.id == session_id:
return self._close_single_session(session)
return self._close_single_session(session, kill=kill)
print(f"Session '{session_id}' not found")
return False
@@ -856,11 +865,12 @@ class ContainerManager:
print(f"Error connecting to session: {e}")
return False
def _close_single_session(self, session: Session) -> bool:
def _close_single_session(self, session: Session, kill: bool = False) -> bool:
"""Close a single session (helper for parallel processing)
Args:
session: The session to close
kill: If True, forcefully kill the container instead of graceful stop
Returns:
bool: Whether the session was successfully closed
@@ -871,7 +881,10 @@ class ContainerManager:
try:
# First, close the main session container
container = self.client.containers.get(session.container_id)
container.stop()
if kill:
container.kill()
else:
container.stop()
container.remove()
# Check for and close any associated network-filter container
@@ -881,7 +894,10 @@ class ContainerManager:
network_filter_name
)
logger.info(f"Stopping network-filter container {network_filter_name}")
network_filter_container.stop()
if kill:
network_filter_container.kill()
else:
network_filter_container.stop()
network_filter_container.remove()
except DockerException:
# Network-filter container might not exist, which is fine
@@ -893,12 +909,15 @@ class ContainerManager:
print(f"Error closing session {session.id}: {e}")
return False
def close_all_sessions(self, progress_callback=None) -> Tuple[int, bool]:
def close_all_sessions(
self, progress_callback=None, kill: bool = False
) -> Tuple[int, bool]:
"""Close all Cubbi sessions with parallel processing and progress reporting
Args:
progress_callback: Optional callback function to report progress
The callback should accept (session_id, status, message)
kill: If True, forcefully kill containers instead of graceful stop
Returns:
tuple: (number of sessions closed, success)
@@ -918,7 +937,10 @@ class ContainerManager:
try:
container = self.client.containers.get(session.container_id)
# Stop and remove container
container.stop()
if kill:
container.kill()
else:
container.stop()
container.remove()
# Check for and close any associated network-filter container
@@ -927,7 +949,10 @@ class ContainerManager:
network_filter_container = self.client.containers.get(
network_filter_name
)
network_filter_container.stop()
if kill:
network_filter_container.kill()
else:
network_filter_container.stop()
network_filter_container.remove()
except DockerException:
# Network-filter container might not exist, which is fine

View File

@@ -70,8 +70,6 @@ environment:
description: HTTPS proxy server URL
required: false
ports: []
volumes:
- mountPath: /app
description: Application directory

View File

@@ -50,8 +50,6 @@ environment:
required: false
default: "false"
ports: []
volumes:
- mountPath: /app
description: Application directory

View File

@@ -0,0 +1,62 @@
FROM python:3.12-slim
LABEL maintainer="team@monadical.com"
LABEL description="Crush AI coding assistant for Cubbi"
# Install system dependencies including gosu for user switching and shadow for useradd/groupadd
RUN apt-get update && apt-get install -y --no-install-recommends \
gosu \
sudo \
passwd \
bash \
curl \
bzip2 \
iputils-ping \
iproute2 \
libxcb1 \
libdbus-1-3 \
nano \
tmux \
git-core \
ripgrep \
openssh-client \
vim \
nodejs \
npm \
&& rm -rf /var/lib/apt/lists/*
# Install deps
WORKDIR /tmp
RUN curl -fsSL https://astral.sh/uv/install.sh -o install.sh && \
sh install.sh && \
mv /root/.local/bin/uv /usr/local/bin/uv && \
mv /root/.local/bin/uvx /usr/local/bin/uvx && \
rm install.sh
# Install crush via npm
RUN npm install -g @charmland/crush
# Create app directory
WORKDIR /app
# Copy initialization system
COPY cubbi_init.py /cubbi/cubbi_init.py
COPY crush_plugin.py /cubbi/crush_plugin.py
COPY cubbi_image.yaml /cubbi/cubbi_image.yaml
COPY init-status.sh /cubbi/init-status.sh
RUN chmod +x /cubbi/cubbi_init.py /cubbi/init-status.sh
RUN echo '[ -x /cubbi/init-status.sh ] && /cubbi/init-status.sh' >> /etc/bash.bashrc
# Set up environment
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV UV_LINK_MODE=copy
# Pre-install the cubbi_init
RUN /cubbi/cubbi_init.py --help
# Set WORKDIR to /app, common practice and expected by cubbi-init.sh
WORKDIR /app
ENTRYPOINT ["/cubbi/cubbi_init.py"]
CMD ["tail", "-f", "/dev/null"]

View File

@@ -0,0 +1,77 @@
# Crush Image for Cubbi
This image provides a containerized environment for running [Crush](https://github.com/charmbracelet/crush), a terminal-based AI coding assistant.
## Features
- Pre-configured environment for Crush AI coding assistant
- Multi-model support (OpenAI, Anthropic, Groq)
- JSON-based configuration
- MCP server integration support
- Session preservation across runs
## Environment Variables
### AI Provider Configuration
| Variable | Description | Required | Default |
|----------|-------------|----------|---------|
| `OPENAI_API_KEY` | OpenAI API key for crush | No | - |
| `ANTHROPIC_API_KEY` | Anthropic API key for crush | No | - |
| `GROQ_API_KEY` | Groq API key for crush | No | - |
| `OPENAI_URL` | Custom OpenAI-compatible API URL | No | - |
| `CUBBI_MODEL` | AI model to use with crush | No | - |
| `CUBBI_PROVIDER` | AI provider to use with crush | No | - |
### Cubbi Core Variables
| Variable | Description | Required | Default |
|----------|-------------|----------|---------|
| `CUBBI_USER_ID` | UID for the container user | No | `1000` |
| `CUBBI_GROUP_ID` | GID for the container user | No | `1000` |
| `CUBBI_RUN_COMMAND` | Command to execute after initialization | No | - |
| `CUBBI_NO_SHELL` | Exit after command execution | No | `false` |
| `CUBBI_CONFIG_DIR` | Directory for persistent configurations | No | `/cubbi-config` |
| `CUBBI_PERSISTENT_LINKS` | Semicolon-separated list of source:target symlinks | No | - |
### MCP Integration Variables
| Variable | Description | Required |
|----------|-------------|----------|
| `MCP_COUNT` | Number of available MCP servers | No |
| `MCP_NAMES` | JSON array of MCP server names | No |
| `MCP_{idx}_NAME` | Name of MCP server at index | No |
| `MCP_{idx}_TYPE` | Type of MCP server | No |
| `MCP_{idx}_HOST` | Hostname of MCP server | No |
| `MCP_{idx}_URL` | Full URL for remote MCP servers | No |
## Build
To build this image:
```bash
cd cubbi/images/crush
docker build -t monadical/cubbi-crush:latest .
```
## Usage
```bash
# Create a new session with this image
cubbix -i crush
# Run crush with specific provider
cubbix -i crush -e CUBBI_PROVIDER=openai -e CUBBI_MODEL=gpt-4
# Test crush installation
cubbix -i crush --no-shell --run "crush --help"
```
## Configuration
Crush uses JSON configuration stored in `/home/cubbi/.config/crush/config.json`. The plugin automatically configures:
- AI providers based on available API keys
- Default models and providers from environment variables
- Session preservation settings
- MCP server integrations

View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Crush-specific plugin for Cubbi initialization
"""
import json
import os
from pathlib import Path
from typing import Any, Dict
from cubbi_init import ToolPlugin
class CrushPlugin(ToolPlugin):
"""Plugin for Crush AI coding assistant initialization"""
@property
def tool_name(self) -> str:
return "crush"
def _get_user_ids(self) -> tuple[int, int]:
"""Get the cubbi user and group IDs from environment"""
user_id = int(os.environ.get("CUBBI_USER_ID", "1000"))
group_id = int(os.environ.get("CUBBI_GROUP_ID", "1000"))
return user_id, group_id
def _set_ownership(self, path: Path) -> None:
"""Set ownership of a path to the cubbi user"""
user_id, group_id = self._get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError as e:
self.status.log(f"Failed to set ownership for {path}: {e}", "WARNING")
def _get_user_config_path(self) -> Path:
"""Get the correct config path for the cubbi user"""
return Path("/home/cubbi/.config/crush")
def _ensure_user_config_dir(self) -> Path:
"""Ensure config directory exists with correct ownership"""
config_dir = self._get_user_config_path()
# Create the full directory path
try:
config_dir.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# Directory already exists, which is fine
pass
except OSError as e:
self.status.log(
f"Failed to create config directory {config_dir}: {e}", "ERROR"
)
return config_dir
# Set ownership for the directories
config_parent = config_dir.parent
if config_parent.exists():
self._set_ownership(config_parent)
if config_dir.exists():
self._set_ownership(config_dir)
return config_dir
def initialize(self) -> bool:
"""Initialize Crush configuration"""
self._ensure_user_config_dir()
return self.setup_tool_configuration()
def setup_tool_configuration(self) -> bool:
"""Set up Crush configuration file"""
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
if not config_dir.exists():
self.status.log(
f"Config directory {config_dir} does not exist and could not be created",
"ERROR",
)
return False
config_file = config_dir / "config.json"
# Load or initialize configuration
if config_file.exists():
try:
with config_file.open("r") as f:
config_data = json.load(f)
except (json.JSONDecodeError, OSError) as e:
self.status.log(f"Failed to load existing config: {e}", "WARNING")
config_data = {}
else:
config_data = {}
# Set default model and provider if specified
# cubbi_model = os.environ.get("CUBBI_MODEL")
# cubbi_provider = os.environ.get("CUBBI_PROVIDER")
# XXX i didn't understood yet the configuration file, tbd later.
try:
with config_file.open("w") as f:
json.dump(config_data, f, indent=2)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
self.status.log(f"Updated Crush configuration at {config_file}")
return True
except Exception as e:
self.status.log(f"Failed to write Crush configuration: {e}", "ERROR")
return False
def integrate_mcp_servers(self, mcp_config: Dict[str, Any]) -> bool:
"""Integrate Crush with available MCP servers"""
if mcp_config["count"] == 0:
self.status.log("No MCP servers to integrate")
return True
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
if not config_dir.exists():
self.status.log(
f"Config directory {config_dir} does not exist and could not be created",
"ERROR",
)
return False
config_file = config_dir / "config.json"
if config_file.exists():
try:
with config_file.open("r") as f:
config_data = json.load(f)
except (json.JSONDecodeError, OSError) as e:
self.status.log(f"Failed to load existing config: {e}", "WARNING")
config_data = {}
else:
config_data = {}
if "mcp_servers" not in config_data:
config_data["mcp_servers"] = {}
for server in mcp_config["servers"]:
server_name = server["name"]
server_host = server["host"]
server_url = server["url"]
if server_name and server_host:
mcp_url = f"http://{server_host}:8080/sse"
self.status.log(f"Adding MCP server: {server_name} - {mcp_url}")
config_data["mcp_servers"][server_name] = {
"uri": mcp_url,
"type": server.get("type", "sse"),
"enabled": True,
}
elif server_name and server_url:
self.status.log(
f"Adding remote MCP server: {server_name} - {server_url}"
)
config_data["mcp_servers"][server_name] = {
"uri": server_url,
"type": server.get("type", "sse"),
"enabled": True,
}
try:
with config_file.open("w") as f:
json.dump(config_data, f, indent=2)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
return True
except Exception as e:
self.status.log(f"Failed to integrate MCP servers: {e}", "ERROR")
return False

View File

@@ -0,0 +1,51 @@
name: crush
description: Crush AI coding assistant environment
version: 1.0.0
maintainer: team@monadical.com
image: monadical/cubbi-crush:latest
init:
pre_command: /cubbi-init.sh
command: /entrypoint.sh
environment:
- name: OPENAI_API_KEY
description: OpenAI API key for crush
required: false
sensitive: true
- name: ANTHROPIC_API_KEY
description: Anthropic API key for crush
required: false
sensitive: true
- name: GROQ_API_KEY
description: Groq API key for crush
required: false
sensitive: true
- name: OPENAI_URL
description: Custom OpenAI-compatible API URL
required: false
- name: CUBBI_MODEL
description: AI model to use with crush
required: false
- name: CUBBI_PROVIDER
description: AI provider to use with crush
required: false
volumes:
- mountPath: /app
description: Application directory
persistent_configs:
- source: "/home/cubbi/.config/crush"
target: "/cubbi-config/crush-config"
type: "directory"
description: "Crush configuration directory"
- source: "/app/.crush"
target: "/cubbi-config/crush-app"
type: "directory"
description: "Crush application data and sessions"

View File

@@ -24,9 +24,6 @@ environment:
required: false
default: https://cloud.langfuse.com
ports:
- 8000
volumes:
- mountPath: /app
description: Application directory

View File

@@ -67,6 +67,7 @@ RUN echo '[ -x /cubbi/init-status.sh ] && /cubbi/init-status.sh' >> /etc/bash.ba
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV UV_LINK_MODE=copy
ENV COLORTERM=truecolor
# Pre-install the cubbi_init
RUN /cubbi/cubbi_init.py --help

View File

@@ -9,8 +9,6 @@ init:
command: /entrypoint.sh
environment: []
ports: []
volumes:
- mountPath: /app
description: Application directory

View File

@@ -182,6 +182,9 @@ class OpencodePlugin(ToolPlugin):
else:
config_data = {}
# Set default theme to system
config_data.setdefault("theme", "system")
# Update with environment variables
opencode_model = os.environ.get("CUBBI_MODEL")
opencode_provider = os.environ.get("CUBBI_PROVIDER")

View File

@@ -51,7 +51,6 @@ class Image(BaseModel):
image: str
init: Optional[ImageInit] = None
environment: List[ImageEnvironmentVariable] = []
ports: List[int] = []
volumes: List[VolumeMount] = []
persistent_configs: List[PersistentConfig] = []

View File

@@ -96,6 +96,7 @@ class UserConfigManager:
"mount_local": True,
"networks": [], # Default networks to connect to (besides cubbi-network)
"volumes": [], # Default volumes to mount, format: "source:dest"
"ports": [], # Default ports to forward, format: list of integers
"mcps": [], # Default MCP servers to connect to
"model": "claude-3-5-sonnet-latest", # Default LLM model to use
"provider": "anthropic", # Default LLM provider to use

View File

@@ -1,6 +1,6 @@
[project]
name = "cubbi"
version = "0.3.0"
version = "0.4.0"
description = "Cubbi Container Tool"
readme = "README.md"
requires-python = ">=3.12"

View File

@@ -2,17 +2,18 @@
Common test fixtures for Cubbi Container tests.
"""
import uuid
import tempfile
import pytest
import docker
import uuid
from pathlib import Path
from unittest.mock import patch
from cubbi.container import ContainerManager
from cubbi.session import SessionManager
import docker
import pytest
from cubbi.config import ConfigManager
from cubbi.container import ContainerManager
from cubbi.models import Session, SessionStatus
from cubbi.session import SessionManager
from cubbi.user_config import UserConfigManager
@@ -41,13 +42,6 @@ requires_docker = pytest.mark.skipif(
)
@pytest.fixture
def temp_dir():
"""Create a temporary directory for test files."""
with tempfile.TemporaryDirectory() as tmp_dir:
yield Path(tmp_dir)
@pytest.fixture
def temp_config_dir():
"""Create a temporary directory for configuration files."""
@@ -56,76 +50,26 @@ def temp_config_dir():
@pytest.fixture
def isolated_config(temp_config_dir):
"""Provide an isolated UserConfigManager instance."""
config_path = temp_config_dir / "config.yaml"
config_path.parent.mkdir(parents=True, exist_ok=True)
return UserConfigManager(str(config_path))
@pytest.fixture
def isolated_session_manager(temp_config_dir):
"""Create an isolated session manager for testing."""
sessions_path = temp_config_dir / "sessions.yaml"
return SessionManager(sessions_path)
@pytest.fixture
def isolated_config_manager():
"""Create an isolated config manager for testing."""
config_manager = ConfigManager()
# Ensure we're using the built-in images, not trying to load from user config
return config_manager
@pytest.fixture
def mock_session_manager():
"""Mock the SessionManager class."""
with patch("cubbi.cli.session_manager") as mock_manager:
yield mock_manager
@pytest.fixture
def mock_container_manager():
"""Mock the ContainerManager class with proper initialization."""
def mock_container_manager(isolate_cubbi_config):
"""Mock the ContainerManager class with proper behaviors for testing."""
mock_session = Session(
id="test-session-id",
name="test-session",
image="goose",
status=SessionStatus.RUNNING,
ports={"8080": "8080"},
ports={8080: 32768},
)
with patch("cubbi.cli.container_manager") as mock_manager:
# Set behaviors to avoid TypeErrors
mock_manager.list_sessions.return_value = []
mock_manager.create_session.return_value = mock_session
mock_manager.close_session.return_value = True
mock_manager.close_all_sessions.return_value = (3, True)
# MCP-related mocks
mock_manager.get_mcp_status.return_value = {
"status": "running",
"container_id": "test-id",
}
mock_manager.start_mcp.return_value = {
"status": "running",
"container_id": "test-id",
}
mock_manager.stop_mcp.return_value = True
mock_manager.restart_mcp.return_value = {
"status": "running",
"container_id": "test-id",
}
mock_manager.get_mcp_logs.return_value = "Test log output"
yield mock_manager
container_manager = isolate_cubbi_config["container_manager"]
@pytest.fixture
def container_manager(isolated_session_manager, isolated_config_manager):
"""Create a container manager with isolated components."""
return ContainerManager(
config_manager=isolated_config_manager, session_manager=isolated_session_manager
)
# Patch the container manager methods for mocking
with (
patch.object(container_manager, "list_sessions", return_value=[]),
patch.object(container_manager, "create_session", return_value=mock_session),
patch.object(container_manager, "close_session", return_value=True),
patch.object(container_manager, "close_all_sessions", return_value=(3, True)),
):
yield container_manager
@pytest.fixture
@@ -137,28 +81,23 @@ def cli_runner():
@pytest.fixture
def test_file_content(temp_dir):
"""Create a test file with content in the temporary directory."""
def test_file_content(temp_config_dir):
"""Create a test file with content in a temporary directory."""
test_content = "This is a test file for volume mounting"
test_file = temp_dir / "test_volume_file.txt"
test_file = temp_config_dir / "test_volume_file.txt"
with open(test_file, "w") as f:
f.write(test_content)
return test_file, test_content
@pytest.fixture
def test_network_name():
"""Generate a unique network name for testing."""
return f"cubbi-test-network-{uuid.uuid4().hex[:8]}"
@pytest.fixture
def docker_test_network(test_network_name):
def docker_test_network():
"""Create a Docker network for testing and clean it up after."""
if not is_docker_available():
pytest.skip("Docker is not available")
return None
test_network_name = f"cubbi-test-network-{uuid.uuid4().hex[:8]}"
client = docker.from_env()
network = client.networks.create(test_network_name, driver="bridge")
@@ -172,8 +111,59 @@ def docker_test_network(test_network_name):
pass
@pytest.fixture(autouse=True, scope="function")
def isolate_cubbi_config(temp_config_dir):
"""
Automatically isolate all Cubbi configuration for every test.
This fixture ensures that tests never touch the user's real configuration
by patching both ConfigManager and UserConfigManager in cli.py to use
temporary directories.
"""
# Create isolated config instances with temporary paths
config_path = temp_config_dir / "config.yaml"
user_config_path = temp_config_dir / "user_config.yaml"
# Create the ConfigManager with a custom config path
isolated_config_manager = ConfigManager(config_path)
# Create the UserConfigManager with a custom config path
isolated_user_config = UserConfigManager(str(user_config_path))
# Create isolated session manager
sessions_path = temp_config_dir / "sessions.yaml"
isolated_session_manager = SessionManager(sessions_path)
# Create isolated container manager
isolated_container_manager = ContainerManager(
isolated_config_manager, isolated_session_manager, isolated_user_config
)
# Patch all the global instances in cli.py and the UserConfigManager class
with (
patch("cubbi.cli.config_manager", isolated_config_manager),
patch("cubbi.cli.user_config", isolated_user_config),
patch("cubbi.cli.session_manager", isolated_session_manager),
patch("cubbi.cli.container_manager", isolated_container_manager),
patch("cubbi.cli.UserConfigManager", return_value=isolated_user_config),
):
# Create isolated MCP manager with isolated user config
from cubbi.mcp import MCPManager
isolated_mcp_manager = MCPManager(config_manager=isolated_user_config)
# Patch the global mcp_manager instance
with patch("cubbi.cli.mcp_manager", isolated_mcp_manager):
yield {
"config_manager": isolated_config_manager,
"user_config": isolated_user_config,
"session_manager": isolated_session_manager,
"container_manager": isolated_container_manager,
"mcp_manager": isolated_mcp_manager,
}
@pytest.fixture
def patched_config_manager(isolated_config):
"""Patch the UserConfigManager in cli.py to use our isolated instance."""
with patch("cubbi.cli.user_config", isolated_config):
yield isolated_config
def patched_config_manager(isolate_cubbi_config):
"""Compatibility fixture - returns the isolated user config."""
return isolate_cubbi_config["user_config"]

View File

@@ -189,4 +189,103 @@ def test_config_reset(cli_runner, patched_config_manager, monkeypatch):
assert patched_config_manager.get("defaults.image") == "goose"
def test_port_list_empty(cli_runner, patched_config_manager):
"""Test listing ports when none are configured."""
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "No default ports configured" in result.stdout
def test_port_add_single(cli_runner, patched_config_manager):
"""Test adding a single port."""
result = cli_runner.invoke(app, ["config", "port", "add", "8000"])
assert result.exit_code == 0
assert "Added port 8000 to defaults" in result.stdout
# Verify it was added
ports = patched_config_manager.get("defaults.ports")
assert 8000 in ports
def test_port_add_multiple(cli_runner, patched_config_manager):
"""Test adding multiple ports with comma separation."""
result = cli_runner.invoke(app, ["config", "port", "add", "8000,3000,5173"])
assert result.exit_code == 0
assert "Added ports [8000, 3000, 5173] to defaults" in result.stdout
# Verify they were added
ports = patched_config_manager.get("defaults.ports")
assert 8000 in ports
assert 3000 in ports
assert 5173 in ports
def test_port_add_duplicate(cli_runner, patched_config_manager):
"""Test adding a port that already exists."""
# Add a port first
patched_config_manager.set("defaults.ports", [8000])
# Try to add the same port again
result = cli_runner.invoke(app, ["config", "port", "add", "8000"])
assert result.exit_code == 0
assert "Port 8000 is already in defaults" in result.stdout
def test_port_add_invalid_format(cli_runner, patched_config_manager):
"""Test adding an invalid port format."""
result = cli_runner.invoke(app, ["config", "port", "add", "invalid"])
assert result.exit_code == 0
assert "Error: Invalid port format" in result.stdout
def test_port_add_invalid_range(cli_runner, patched_config_manager):
"""Test adding a port outside valid range."""
result = cli_runner.invoke(app, ["config", "port", "add", "70000"])
assert result.exit_code == 0
assert "Error: Invalid ports [70000]" in result.stdout
def test_port_list_with_ports(cli_runner, patched_config_manager):
"""Test listing ports when some are configured."""
# Add some ports
patched_config_manager.set("defaults.ports", [8000, 3000])
# List ports
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "8000" in result.stdout
assert "3000" in result.stdout
def test_port_remove(cli_runner, patched_config_manager):
"""Test removing a port."""
# Add a port first
patched_config_manager.set("defaults.ports", [8000])
# Remove the port
result = cli_runner.invoke(app, ["config", "port", "remove", "8000"])
assert result.exit_code == 0
assert "Removed port 8000 from defaults" in result.stdout
# Verify it's gone
ports = patched_config_manager.get("defaults.ports")
assert 8000 not in ports
def test_port_remove_not_found(cli_runner, patched_config_manager):
"""Test removing a port that doesn't exist."""
result = cli_runner.invoke(app, ["config", "port", "remove", "8000"])
assert result.exit_code == 0
assert "Port 8000 is not in defaults" in result.stdout
# patched_config_manager fixture is now in conftest.py

View File

@@ -0,0 +1,90 @@
"""
Test that configuration isolation works correctly and doesn't touch user's real config.
"""
from pathlib import Path
from cubbi.cli import app
def test_config_isolation_preserves_user_config(cli_runner, isolate_cubbi_config):
"""Test that test isolation doesn't affect user's real configuration."""
# Get the user's real config path
real_config_path = Path.home() / ".config" / "cubbi" / "config.yaml"
# If the user has a real config, store its content before test
original_content = None
if real_config_path.exists():
with open(real_config_path, "r") as f:
original_content = f.read()
# Run some config modification commands in the test
result = cli_runner.invoke(app, ["config", "port", "add", "9999"])
assert result.exit_code == 0
result = cli_runner.invoke(app, ["config", "set", "defaults.image", "test-image"])
assert result.exit_code == 0
# Verify the user's real config is unchanged
if original_content is not None:
with open(real_config_path, "r") as f:
current_content = f.read()
assert current_content == original_content
else:
# If no real config existed, it should still not exist
assert not real_config_path.exists()
def test_isolated_config_works_independently(cli_runner, isolate_cubbi_config):
"""Test that the isolated config works correctly for tests."""
# Add a port to isolated config
result = cli_runner.invoke(app, ["config", "port", "add", "8888"])
assert result.exit_code == 0
assert "Added port 8888 to defaults" in result.stdout
# Verify it appears in the list
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "8888" in result.stdout
# Remove the port
result = cli_runner.invoke(app, ["config", "port", "remove", "8888"])
assert result.exit_code == 0
assert "Removed port 8888 from defaults" in result.stdout
# Verify it's gone
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "No default ports configured" in result.stdout
def test_each_test_gets_fresh_config(cli_runner, isolate_cubbi_config):
"""Test that each test gets a fresh, isolated configuration."""
# This test should start with empty ports (fresh config)
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "No default ports configured" in result.stdout
# Add a port
result = cli_runner.invoke(app, ["config", "port", "add", "7777"])
assert result.exit_code == 0
# Verify it's there
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "7777" in result.stdout
def test_another_fresh_config_test(cli_runner, isolate_cubbi_config):
"""Another test to verify each test gets a completely fresh config."""
# This test should also start with empty ports (independent of previous test)
result = cli_runner.invoke(app, ["config", "port", "list"])
assert result.exit_code == 0
assert "No default ports configured" in result.stdout
# The port from the previous test should not be here
result = cli_runner.invoke(app, ["config", "port", "list"])
assert "7777" not in result.stdout

View File

@@ -6,6 +6,8 @@ These tests require Docker to be running.
import subprocess
import time
import uuid
import docker
# Import the requires_docker decorator from conftest
from conftest import requires_docker
@@ -21,13 +23,56 @@ def execute_command_in_container(container_id, command):
return result.stdout.strip()
def wait_for_container_init(container_id, timeout=5.0, poll_interval=0.1):
"""
Wait for a Cubbi container to complete initialization by polling /cubbi/init.status.
Args:
container_id: Docker container ID
timeout: Maximum time to wait in seconds (default: 5.0)
poll_interval: Time between polls in seconds (default: 0.1)
Returns:
bool: True if initialization completed, False if timed out
Raises:
subprocess.CalledProcessError: If docker exec command fails
"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Check if /cubbi/init.status contains INIT_COMPLETE=true
result = execute_command_in_container(
container_id,
"grep -q 'INIT_COMPLETE=true' /cubbi/init.status 2>/dev/null && echo 'COMPLETE' || echo 'PENDING'",
)
if result == "COMPLETE":
return True
except subprocess.CalledProcessError:
# File might not exist yet or container not ready, continue polling
pass
time.sleep(poll_interval)
# Timeout reached
return False
@requires_docker
def test_integration_session_create_with_volumes(container_manager, test_file_content):
def test_integration_session_create_with_volumes(
isolate_cubbi_config, test_file_content
):
"""Test creating a session with a volume mount."""
test_file, test_content = test_file_content
session = None
try:
# Get the isolated container manager
container_manager = isolate_cubbi_config["container_manager"]
# Create a session with a volume mount
session = container_manager.create_session(
image_name="goose",
@@ -39,8 +84,9 @@ def test_integration_session_create_with_volumes(container_manager, test_file_co
assert session is not None
assert session.status == "running"
# Give container time to fully start
time.sleep(2)
# Wait for container initialization to complete
init_success = wait_for_container_init(session.container_id)
assert init_success, "Container initialization timed out"
# Verify the file exists in the container and has correct content
container_content = execute_command_in_container(
@@ -50,19 +96,22 @@ def test_integration_session_create_with_volumes(container_manager, test_file_co
assert container_content == test_content
finally:
# Clean up the container
# Clean up the container (use kill for faster test cleanup)
if session and session.container_id:
container_manager.close_session(session.id)
container_manager.close_session(session.id, kill=True)
@requires_docker
def test_integration_session_create_with_networks(
container_manager, docker_test_network
isolate_cubbi_config, docker_test_network
):
"""Test creating a session connected to a custom network."""
session = None
try:
# Get the isolated container manager
container_manager = isolate_cubbi_config["container_manager"]
# Create a session with the test network
session = container_manager.create_session(
image_name="goose",
@@ -74,8 +123,9 @@ def test_integration_session_create_with_networks(
assert session is not None
assert session.status == "running"
# Give container time to fully start
time.sleep(2)
# Wait for container initialization to complete
init_success = wait_for_container_init(session.container_id)
assert init_success, "Container initialization timed out"
# Verify the container is connected to the test network
# Use inspect to check network connections
@@ -97,6 +147,240 @@ def test_integration_session_create_with_networks(
assert int(network_interfaces) >= 2
finally:
# Clean up the container
# Clean up the container (use kill for faster test cleanup)
if session and session.container_id:
container_manager.close_session(session.id)
container_manager.close_session(session.id, kill=True)
@requires_docker
def test_integration_session_create_with_ports(isolate_cubbi_config):
"""Test creating a session with port forwarding."""
session = None
try:
# Get the isolated container manager
container_manager = isolate_cubbi_config["container_manager"]
# Create a session with port forwarding
session = container_manager.create_session(
image_name="goose",
session_name=f"cubbi-test-ports-{uuid.uuid4().hex[:8]}",
mount_local=False, # Don't mount current directory
ports=[8080, 9000], # Forward these ports
)
assert session is not None
assert session.status == "running"
# Verify ports are mapped
assert isinstance(session.ports, dict)
assert 8080 in session.ports
assert 9000 in session.ports
# Verify port mappings are valid (host ports should be assigned)
assert isinstance(session.ports[8080], int)
assert isinstance(session.ports[9000], int)
assert session.ports[8080] > 0
assert session.ports[9000] > 0
# Wait for container initialization to complete
init_success = wait_for_container_init(session.container_id)
assert init_success, "Container initialization timed out"
# Verify Docker port mappings using Docker client
import docker
client = docker.from_env()
container = client.containers.get(session.container_id)
container_ports = container.attrs["NetworkSettings"]["Ports"]
# Verify both ports are exposed
assert "8080/tcp" in container_ports
assert "9000/tcp" in container_ports
# Verify host port bindings exist
assert container_ports["8080/tcp"] is not None
assert container_ports["9000/tcp"] is not None
assert len(container_ports["8080/tcp"]) > 0
assert len(container_ports["9000/tcp"]) > 0
# Verify host ports match session.ports
host_port_8080 = int(container_ports["8080/tcp"][0]["HostPort"])
host_port_9000 = int(container_ports["9000/tcp"][0]["HostPort"])
assert session.ports[8080] == host_port_8080
assert session.ports[9000] == host_port_9000
finally:
# Clean up the container (use kill for faster test cleanup)
if session and session.container_id:
container_manager.close_session(session.id, kill=True)
@requires_docker
def test_integration_session_create_no_ports(isolate_cubbi_config):
"""Test creating a session without port forwarding."""
session = None
try:
# Get the isolated container manager
container_manager = isolate_cubbi_config["container_manager"]
# Create a session without ports
session = container_manager.create_session(
image_name="goose",
session_name=f"cubbi-test-no-ports-{uuid.uuid4().hex[:8]}",
mount_local=False, # Don't mount current directory
ports=[], # No ports
)
assert session is not None
assert session.status == "running"
# Verify no ports are mapped
assert isinstance(session.ports, dict)
assert len(session.ports) == 0
# Wait for container initialization to complete
init_success = wait_for_container_init(session.container_id)
assert init_success, "Container initialization timed out"
# Verify Docker has no port mappings
import docker
client = docker.from_env()
container = client.containers.get(session.container_id)
container_ports = container.attrs["NetworkSettings"]["Ports"]
# Should have no port mappings (empty dict or None values)
for port_spec, bindings in container_ports.items():
assert bindings is None or len(bindings) == 0
finally:
# Clean up the container (use kill for faster test cleanup)
if session and session.container_id:
container_manager.close_session(session.id, kill=True)
@requires_docker
def test_integration_session_create_with_single_port(isolate_cubbi_config):
"""Test creating a session with a single port forward."""
session = None
try:
# Get the isolated container manager
container_manager = isolate_cubbi_config["container_manager"]
# Create a session with single port
session = container_manager.create_session(
image_name="goose",
session_name=f"cubbi-test-single-port-{uuid.uuid4().hex[:8]}",
mount_local=False, # Don't mount current directory
ports=[3000], # Single port
)
assert session is not None
assert session.status == "running"
# Verify single port is mapped
assert isinstance(session.ports, dict)
assert len(session.ports) == 1
assert 3000 in session.ports
assert isinstance(session.ports[3000], int)
assert session.ports[3000] > 0
# Wait for container initialization to complete
init_success = wait_for_container_init(session.container_id)
assert init_success, "Container initialization timed out"
client = docker.from_env()
container = client.containers.get(session.container_id)
container_ports = container.attrs["NetworkSettings"]["Ports"]
# Should have exactly one port mapping
port_mappings = {
k: v for k, v in container_ports.items() if v is not None and len(v) > 0
}
assert len(port_mappings) == 1
assert "3000/tcp" in port_mappings
finally:
# Clean up the container (use kill for faster test cleanup)
if session and session.container_id:
container_manager.close_session(session.id, kill=True)
@requires_docker
def test_integration_kill_vs_stop_speed(isolate_cubbi_config):
"""Test that kill is faster than stop for container termination."""
import time
# Get the isolated container manager
container_manager = isolate_cubbi_config["container_manager"]
# Create two identical sessions for comparison
session_stop = None
session_kill = None
try:
# Create first session (will be stopped gracefully)
session_stop = container_manager.create_session(
image_name="goose",
session_name=f"cubbi-test-stop-{uuid.uuid4().hex[:8]}",
mount_local=False,
ports=[],
)
# Create second session (will be killed)
session_kill = container_manager.create_session(
image_name="goose",
session_name=f"cubbi-test-kill-{uuid.uuid4().hex[:8]}",
mount_local=False,
ports=[],
)
assert session_stop is not None
assert session_kill is not None
# Wait for both containers to initialize
init_success_stop = wait_for_container_init(session_stop.container_id)
init_success_kill = wait_for_container_init(session_kill.container_id)
assert init_success_stop, "Stop test container initialization timed out"
assert init_success_kill, "Kill test container initialization timed out"
# Time graceful stop
start_time = time.time()
container_manager.close_session(session_stop.id, kill=False)
stop_time = time.time() - start_time
session_stop = None # Mark as cleaned up
# Time kill
start_time = time.time()
container_manager.close_session(session_kill.id, kill=True)
kill_time = time.time() - start_time
session_kill = None # Mark as cleaned up
# Kill should be faster than stop (usually by several seconds)
# We use a generous threshold since system performance can vary
assert (
kill_time < stop_time
), f"Kill ({kill_time:.2f}s) should be faster than stop ({stop_time:.2f}s)"
# Verify both methods successfully closed the containers
# (containers should no longer be in the session list)
remaining_sessions = container_manager.list_sessions()
session_ids = [s.id for s in remaining_sessions]
assert session_stop.id if session_stop else "stop-session" not in session_ids
assert session_kill.id if session_kill else "kill-session" not in session_ids
finally:
# Clean up any remaining containers
if session_stop and session_stop.container_id:
try:
container_manager.close_session(session_stop.id, kill=True)
except Exception:
pass
if session_kill and session_kill.container_id:
try:
container_manager.close_session(session_kill.id, kill=True)
except Exception:
pass

View File

@@ -21,7 +21,7 @@ def test_mcp_list_empty(cli_runner, patched_config_manager):
assert "No MCP servers configured" in result.stdout
def test_mcp_add_remote(cli_runner, patched_config_manager):
def test_mcp_add_remote(cli_runner, isolate_cubbi_config):
"""Test adding a remote MCP server and listing it."""
# Add a remote MCP server
result = cli_runner.invoke(
@@ -49,7 +49,7 @@ def test_mcp_add_remote(cli_runner, patched_config_manager):
assert "http://mcp-se" in result.stdout # Truncated in table view
def test_mcp_add(cli_runner, patched_config_manager):
def test_mcp_add(cli_runner, isolate_cubbi_config):
"""Test adding a proxy-based MCP server and listing it."""
# Add a Docker MCP server
result = cli_runner.invoke(
@@ -350,10 +350,12 @@ def test_mcp_status(cli_runner, patched_config_manager, mock_container_manager):
@pytest.mark.requires_docker
def test_mcp_start(cli_runner, patched_config_manager, mock_container_manager):
def test_mcp_start(cli_runner, isolate_cubbi_config):
"""Test starting an MCP server."""
mcp_manager = isolate_cubbi_config["mcp_manager"]
# Add a Docker MCP
patched_config_manager.set(
isolate_cubbi_config["user_config"].set(
"mcps",
[
{
@@ -365,25 +367,30 @@ def test_mcp_start(cli_runner, patched_config_manager, mock_container_manager):
],
)
# Mock the start operation
mock_container_manager.start_mcp.return_value = {
"container_id": "test-container-id",
"status": "running",
}
# Mock the start_mcp method to avoid actual Docker operations
with patch.object(
mcp_manager,
"start_mcp",
return_value={
"container_id": "test-container-id",
"status": "running",
},
):
# Start the MCP
result = cli_runner.invoke(app, ["mcp", "start", "test-docker-mcp"])
# Start the MCP
result = cli_runner.invoke(app, ["mcp", "start", "test-docker-mcp"])
assert result.exit_code == 0
assert "Started MCP server" in result.stdout
assert "test-docker-mcp" in result.stdout
assert result.exit_code == 0
assert "Started MCP server" in result.stdout
assert "test-docker-mcp" in result.stdout
@pytest.mark.requires_docker
def test_mcp_stop(cli_runner, patched_config_manager, mock_container_manager):
def test_mcp_stop(cli_runner, isolate_cubbi_config):
"""Test stopping an MCP server."""
mcp_manager = isolate_cubbi_config["mcp_manager"]
# Add a Docker MCP
patched_config_manager.set(
isolate_cubbi_config["user_config"].set(
"mcps",
[
{
@@ -395,22 +402,23 @@ def test_mcp_stop(cli_runner, patched_config_manager, mock_container_manager):
],
)
# Mock the stop operation
mock_container_manager.stop_mcp.return_value = True
# Mock the stop_mcp method to avoid actual Docker operations
with patch.object(mcp_manager, "stop_mcp", return_value=True):
# Stop the MCP
result = cli_runner.invoke(app, ["mcp", "stop", "test-docker-mcp"])
# Stop the MCP
result = cli_runner.invoke(app, ["mcp", "stop", "test-docker-mcp"])
assert result.exit_code == 0
assert "Stopped and removed MCP server" in result.stdout
assert "test-docker-mcp" in result.stdout
assert result.exit_code == 0
assert "Stopped and removed MCP server" in result.stdout
assert "test-docker-mcp" in result.stdout
@pytest.mark.requires_docker
def test_mcp_restart(cli_runner, patched_config_manager, mock_container_manager):
def test_mcp_restart(cli_runner, isolate_cubbi_config):
"""Test restarting an MCP server."""
mcp_manager = isolate_cubbi_config["mcp_manager"]
# Add a Docker MCP
patched_config_manager.set(
isolate_cubbi_config["user_config"].set(
"mcps",
[
{
@@ -422,18 +430,21 @@ def test_mcp_restart(cli_runner, patched_config_manager, mock_container_manager)
],
)
# Mock the restart operation
mock_container_manager.restart_mcp.return_value = {
"container_id": "test-container-id",
"status": "running",
}
# Mock the restart_mcp method to avoid actual Docker operations
with patch.object(
mcp_manager,
"restart_mcp",
return_value={
"container_id": "test-container-id",
"status": "running",
},
):
# Restart the MCP
result = cli_runner.invoke(app, ["mcp", "restart", "test-docker-mcp"])
# Restart the MCP
result = cli_runner.invoke(app, ["mcp", "restart", "test-docker-mcp"])
assert result.exit_code == 0
assert "Restarted MCP server" in result.stdout
assert "test-docker-mcp" in result.stdout
assert result.exit_code == 0
assert "Restarted MCP server" in result.stdout
assert "test-docker-mcp" in result.stdout
@pytest.mark.requires_docker

View File

@@ -83,7 +83,9 @@ def test_session_close(cli_runner, mock_container_manager):
assert result.exit_code == 0
assert "closed successfully" in result.stdout
mock_container_manager.close_session.assert_called_once_with("test-session-id")
mock_container_manager.close_session.assert_called_once_with(
"test-session-id", kill=False
)
def test_session_close_all(cli_runner, mock_container_manager):
@@ -113,6 +115,197 @@ def test_session_close_all(cli_runner, mock_container_manager):
mock_container_manager.close_all_sessions.assert_called_once()
def test_session_create_with_ports(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session creation with port forwarding."""
from cubbi.models import Session, SessionStatus
# Mock the create_session to return a session with ports
mock_session = Session(
id="test-session-id",
name="test-session",
image="goose",
status=SessionStatus.RUNNING,
ports={8000: 32768, 3000: 32769},
)
mock_container_manager.create_session.return_value = mock_session
result = cli_runner.invoke(app, ["session", "create", "--port", "8000,3000"])
assert result.exit_code == 0
assert "Session created successfully" in result.stdout
assert "Forwarding ports: 8000, 3000" in result.stdout
# Verify create_session was called with correct ports
mock_container_manager.create_session.assert_called_once()
call_args = mock_container_manager.create_session.call_args
assert call_args.kwargs["ports"] == [8000, 3000]
def test_session_create_with_default_ports(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session creation using default ports."""
from cubbi.models import Session, SessionStatus
# Set up default ports
patched_config_manager.set("defaults.ports", [8080, 9000])
# Mock the create_session to return a session with ports
mock_session = Session(
id="test-session-id",
name="test-session",
image="goose",
status=SessionStatus.RUNNING,
ports={8080: 32768, 9000: 32769},
)
mock_container_manager.create_session.return_value = mock_session
result = cli_runner.invoke(app, ["session", "create"])
assert result.exit_code == 0
assert "Session created successfully" in result.stdout
assert "Forwarding ports: 8080, 9000" in result.stdout
# Verify create_session was called with default ports
mock_container_manager.create_session.assert_called_once()
call_args = mock_container_manager.create_session.call_args
assert call_args.kwargs["ports"] == [8080, 9000]
def test_session_create_combine_default_and_custom_ports(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session creation combining default and custom ports."""
from cubbi.models import Session, SessionStatus
# Set up default ports
patched_config_manager.set("defaults.ports", [8080])
# Mock the create_session to return a session with combined ports
mock_session = Session(
id="test-session-id",
name="test-session",
image="goose",
status=SessionStatus.RUNNING,
ports={8080: 32768, 3000: 32769},
)
mock_container_manager.create_session.return_value = mock_session
result = cli_runner.invoke(app, ["session", "create", "--port", "3000"])
assert result.exit_code == 0
assert "Session created successfully" in result.stdout
# Ports should be combined and deduplicated
assert "Forwarding ports:" in result.stdout
# Verify create_session was called with combined ports
mock_container_manager.create_session.assert_called_once()
call_args = mock_container_manager.create_session.call_args
# Should contain both default (8080) and custom (3000) ports
assert set(call_args.kwargs["ports"]) == {8080, 3000}
def test_session_create_invalid_port_format(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session creation with invalid port format."""
result = cli_runner.invoke(app, ["session", "create", "--port", "invalid"])
assert result.exit_code == 0
assert "Warning: Ignoring invalid port format" in result.stdout
# Session creation should continue with empty ports list (invalid port ignored)
mock_container_manager.create_session.assert_called_once()
call_args = mock_container_manager.create_session.call_args
assert call_args.kwargs["ports"] == [] # Invalid port should be ignored
def test_session_create_invalid_port_range(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session creation with port outside valid range."""
result = cli_runner.invoke(app, ["session", "create", "--port", "70000"])
assert result.exit_code == 0
assert "Error: Invalid ports [70000]" in result.stdout
# Session creation should not happen due to early return
mock_container_manager.create_session.assert_not_called()
def test_session_list_shows_ports(cli_runner, mock_container_manager):
"""Test that session list shows port mappings."""
from cubbi.models import Session, SessionStatus
mock_session = Session(
id="test-session-id",
name="test-session",
image="goose",
status=SessionStatus.RUNNING,
ports={8000: 32768, 3000: 32769},
)
mock_container_manager.list_sessions.return_value = [mock_session]
result = cli_runner.invoke(app, ["session", "list"])
assert result.exit_code == 0
assert "8000:32768" in result.stdout
assert "3000:32769" in result.stdout
def test_session_close_with_kill_flag(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session close with --kill flag."""
result = cli_runner.invoke(app, ["session", "close", "test-session-id", "--kill"])
assert result.exit_code == 0
# Verify close_session was called with kill=True
mock_container_manager.close_session.assert_called_once_with(
"test-session-id", kill=True
)
def test_session_close_all_with_kill_flag(
cli_runner, mock_container_manager, patched_config_manager
):
"""Test session close --all with --kill flag."""
from cubbi.models import Session, SessionStatus
# Mock some sessions to close
mock_sessions = [
Session(
id="session-1",
name="Session 1",
image="goose",
status=SessionStatus.RUNNING,
ports={},
),
Session(
id="session-2",
name="Session 2",
image="goose",
status=SessionStatus.RUNNING,
ports={},
),
]
mock_container_manager.list_sessions.return_value = mock_sessions
mock_container_manager.close_all_sessions.return_value = (2, True)
result = cli_runner.invoke(app, ["session", "close", "--all", "--kill"])
assert result.exit_code == 0
assert "2 sessions closed successfully" in result.stdout
# Verify close_all_sessions was called with kill=True
mock_container_manager.close_all_sessions.assert_called_once()
call_args = mock_container_manager.close_all_sessions.call_args
assert call_args.kwargs["kill"] is True
# For more complex tests that need actual Docker,
# we've implemented them in test_integration_docker.py
# They will run automatically if Docker is available

2
uv.lock generated
View File

@@ -78,7 +78,7 @@ wheels = [
[[package]]
name = "cubbi"
version = "0.3.0"
version = "0.4.0"
source = { editable = "." }
dependencies = [
{ name = "docker" },