refactor: deep clean plugins (#31)

* refactor: deep clean plugins

* refactor: modernize plugin system with Python 3.12+ typing and simplified discovery

- Update typing to Python 3.12+ style (Dict->dict, Optional->union types)
- Simplify plugin discovery using PLUGIN_CLASS exports instead of dir() reflection
- Add public get_user_ids() and set_ownership() functions in cubbi_init
- Add create_directory_with_ownership() helper method to ToolPlugin base class
- Replace initialize() + integrate_mcp_servers() pattern with unified configure()
- Add is_already_configured() checks to prevent overwriting existing configs
- Remove excessive comments and clean up code structure
- All 5 plugins updated: goose, opencode, claudecode, aider, crush

* fix: remove duplicate
This commit is contained in:
2025-08-07 12:10:22 -06:00
committed by GitHub
parent a709071d10
commit 3a7b9213b0
6 changed files with 176 additions and 404 deletions

View File

@@ -3,9 +3,8 @@
import os
import stat
from pathlib import Path
from typing import Dict
from cubbi_init import ToolPlugin, cubbi_config
from cubbi_init import ToolPlugin, cubbi_config, set_ownership
class AiderPlugin(ToolPlugin):
@@ -13,16 +12,6 @@ class AiderPlugin(ToolPlugin):
def tool_name(self) -> str:
return "aider"
def _get_user_ids(self) -> tuple[int, int]:
return cubbi_config.user.uid, cubbi_config.user.gid
def _set_ownership(self, path: Path) -> None:
user_id, group_id = self._get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError as e:
self.status.log(f"Failed to set ownership for {path}: {e}", "WARNING")
def _get_aider_config_dir(self) -> Path:
return Path("/home/cubbi/.aider")
@@ -33,28 +22,23 @@ class AiderPlugin(ToolPlugin):
config_dir = self._get_aider_config_dir()
cache_dir = self._get_aider_cache_dir()
# Create directories
for directory in [config_dir, cache_dir]:
try:
directory.mkdir(mode=0o755, parents=True, exist_ok=True)
self._set_ownership(directory)
except OSError as e:
self.status.log(
f"Failed to create Aider directory {directory}: {e}", "ERROR"
)
self.create_directory_with_ownership(config_dir)
self.create_directory_with_ownership(cache_dir)
return config_dir, cache_dir
def initialize(self) -> bool:
def is_already_configured(self) -> bool:
config_dir = self._get_aider_config_dir()
env_file = config_dir / ".env"
return env_file.exists()
def configure(self) -> bool:
self.status.log("Setting up Aider configuration...")
# Ensure Aider directories exist
config_dir, cache_dir = self._ensure_aider_dirs()
# Set up environment variables for the session
env_vars = self._create_environment_config()
# Create .env file if we have API keys
if env_vars:
env_file = config_dir / ".env"
success = self._write_env_file(env_file, env_vars)
@@ -71,22 +55,26 @@ class AiderPlugin(ToolPlugin):
"INFO",
)
# Always return True to allow container to start
if not cubbi_config.mcps:
self.status.log("No MCP servers to integrate")
return True
self.status.log(
f"Found {len(cubbi_config.mcps)} MCP server(s) - no direct integration available for Aider"
)
return True
def _create_environment_config(self) -> Dict[str, str]:
def _create_environment_config(self) -> dict[str, str]:
env_vars = {}
# Configure Aider with the default model from cubbi config
provider_config = cubbi_config.get_provider_for_default_model()
if provider_config and cubbi_config.defaults.model:
_, model_name = cubbi_config.defaults.model.split("/", 1)
# Set the model for Aider
env_vars["AIDER_MODEL"] = model_name
self.status.log(f"Set Aider model to {model_name}")
# Set provider-specific API key and configuration
if provider_config.type == "anthropic":
env_vars["AIDER_ANTHROPIC_API_KEY"] = provider_config.api_key
self.status.log("Configured Anthropic API key for Aider")
@@ -100,10 +88,7 @@ class AiderPlugin(ToolPlugin):
)
self.status.log("Configured OpenAI API key for Aider")
# Note: Aider uses different environment variable names for some providers
# We map cubbi provider types to Aider's expected variable names
elif provider_config.type == "google":
# Aider may expect GEMINI_API_KEY for Google models
env_vars["GEMINI_API_KEY"] = provider_config.api_key
self.status.log("Configured Google/Gemini API key for Aider")
@@ -122,7 +107,6 @@ class AiderPlugin(ToolPlugin):
"WARNING",
)
# Fallback to legacy environment variable checking for backward compatibility
api_key_mappings = {
"OPENAI_API_KEY": "AIDER_OPENAI_API_KEY",
"ANTHROPIC_API_KEY": "AIDER_ANTHROPIC_API_KEY",
@@ -138,7 +122,6 @@ class AiderPlugin(ToolPlugin):
provider = env_var.replace("_API_KEY", "").lower()
self.status.log(f"Added {provider} API key from environment")
# Check for OpenAI API base URL from legacy environment
openai_url = os.environ.get("OPENAI_URL")
if openai_url:
env_vars["AIDER_OPENAI_API_BASE"] = openai_url
@@ -146,17 +129,14 @@ class AiderPlugin(ToolPlugin):
f"Set OpenAI API base URL to {openai_url} from environment"
)
# Legacy model configuration
model = os.environ.get("AIDER_MODEL")
if model:
env_vars["AIDER_MODEL"] = model
self.status.log(f"Set model to {model} from environment")
# Handle additional API keys from AIDER_API_KEYS
additional_keys = os.environ.get("AIDER_API_KEYS")
if additional_keys:
try:
# Parse format: "provider1=key1,provider2=key2"
for pair in additional_keys.split(","):
if "=" in pair:
provider, key = pair.strip().split("=", 1)
@@ -166,17 +146,14 @@ class AiderPlugin(ToolPlugin):
except Exception as e:
self.status.log(f"Failed to parse AIDER_API_KEYS: {e}", "WARNING")
# Add git configuration
auto_commits = os.environ.get("AIDER_AUTO_COMMITS", "true")
if auto_commits.lower() in ["true", "false"]:
env_vars["AIDER_AUTO_COMMITS"] = auto_commits
# Add dark mode setting
dark_mode = os.environ.get("AIDER_DARK_MODE", "false")
if dark_mode.lower() in ["true", "false"]:
env_vars["AIDER_DARK_MODE"] = dark_mode
# Add proxy settings
for proxy_var in ["HTTP_PROXY", "HTTPS_PROXY"]:
value = os.environ.get(proxy_var)
if value:
@@ -185,7 +162,7 @@ class AiderPlugin(ToolPlugin):
return env_vars
def _write_env_file(self, env_file: Path, env_vars: Dict[str, str]) -> bool:
def _write_env_file(self, env_file: Path, env_vars: dict[str, str]) -> bool:
try:
content = "\n".join(f"{key}={value}" for key, value in env_vars.items())
@@ -193,8 +170,7 @@ class AiderPlugin(ToolPlugin):
f.write(content)
f.write("\n")
# Set ownership and secure file permissions (read/write for owner only)
self._set_ownership(env_file)
set_ownership(env_file)
os.chmod(env_file, stat.S_IRUSR | stat.S_IWUSR)
self.status.log(f"Created Aider environment file at {env_file}")
@@ -203,18 +179,5 @@ class AiderPlugin(ToolPlugin):
self.status.log(f"Failed to write Aider environment file: {e}", "ERROR")
return False
def setup_tool_configuration(self) -> bool:
# Additional tool configuration can be added here if needed
return True
def integrate_mcp_servers(self) -> bool:
if not cubbi_config.mcps:
self.status.log("No MCP servers to integrate")
return True
# Aider doesn't have native MCP support like Claude Code,
# but we could potentially add custom integrations here
self.status.log(
f"Found {len(cubbi_config.mcps)} MCP server(s) - no direct integration available for Aider"
)
return True
PLUGIN_CLASS = AiderPlugin

View File

@@ -4,9 +4,8 @@ import json
import os
import stat
from pathlib import Path
from typing import Dict, Optional
from cubbi_init import ToolPlugin, cubbi_config
from cubbi_init import ToolPlugin, cubbi_config, set_ownership
class ClaudeCodePlugin(ToolPlugin):
@@ -14,39 +13,19 @@ class ClaudeCodePlugin(ToolPlugin):
def tool_name(self) -> str:
return "claudecode"
def _get_user_ids(self) -> tuple[int, int]:
return cubbi_config.user.uid, cubbi_config.user.gid
def _set_ownership(self, path: Path) -> None:
user_id, group_id = self._get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError as e:
self.status.log(f"Failed to set ownership for {path}: {e}", "WARNING")
def _get_claude_dir(self) -> Path:
return Path("/home/cubbi/.claude")
def _ensure_claude_dir(self) -> Path:
claude_dir = self._get_claude_dir()
def is_already_configured(self) -> bool:
settings_file = self._get_claude_dir() / "settings.json"
return settings_file.exists()
try:
claude_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
self._set_ownership(claude_dir)
except OSError as e:
self.status.log(
f"Failed to create Claude directory {claude_dir}: {e}", "ERROR"
)
return claude_dir
def initialize(self) -> bool:
def configure(self) -> bool:
self.status.log("Setting up Claude Code authentication...")
# Ensure Claude directory exists
claude_dir = self._ensure_claude_dir()
claude_dir = self.create_directory_with_ownership(self._get_claude_dir())
claude_dir.chmod(0o700)
# Create settings configuration
settings = self._create_settings()
if settings:
@@ -54,6 +33,7 @@ class ClaudeCodePlugin(ToolPlugin):
success = self._write_settings(settings_file, settings)
if success:
self.status.log("✅ Claude Code authentication configured successfully")
self._integrate_mcp_servers()
return True
else:
return False
@@ -63,14 +43,19 @@ class ClaudeCodePlugin(ToolPlugin):
" Please set ANTHROPIC_API_KEY environment variable", "WARNING"
)
self.status.log(" Claude Code will run without authentication", "INFO")
# Return True to allow container to start without API key
# Users can still use Claude Code with their own authentication methods
self._integrate_mcp_servers()
return True
def _create_settings(self) -> Optional[Dict]:
def _integrate_mcp_servers(self) -> None:
if not cubbi_config.mcps:
self.status.log("No MCP servers to integrate")
return
self.status.log("MCP server integration available for Claude Code")
def _create_settings(self) -> dict | None:
settings = {}
# Get Anthropic provider configuration from cubbi_config
anthropic_provider = None
for provider_name, provider_config in cubbi_config.providers.items():
if provider_config.type == "anthropic":
@@ -78,7 +63,6 @@ class ClaudeCodePlugin(ToolPlugin):
break
if not anthropic_provider or not anthropic_provider.api_key:
# Fallback to environment variable for backward compatibility
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
return None
@@ -86,30 +70,25 @@ class ClaudeCodePlugin(ToolPlugin):
else:
settings["apiKey"] = anthropic_provider.api_key
# Custom authorization token (optional) - still from environment
auth_token = os.environ.get("ANTHROPIC_AUTH_TOKEN")
if auth_token:
settings["authToken"] = auth_token
# Custom headers (optional) - still from environment
custom_headers = os.environ.get("ANTHROPIC_CUSTOM_HEADERS")
if custom_headers:
try:
# Expect JSON string format
settings["customHeaders"] = json.loads(custom_headers)
except json.JSONDecodeError:
self.status.log(
"⚠️ Invalid ANTHROPIC_CUSTOM_HEADERS format, skipping", "WARNING"
)
# Enterprise integration settings - still from environment
if os.environ.get("CLAUDE_CODE_USE_BEDROCK") == "true":
settings["provider"] = "bedrock"
if os.environ.get("CLAUDE_CODE_USE_VERTEX") == "true":
settings["provider"] = "vertex"
# Network proxy settings - still from environment
http_proxy = os.environ.get("HTTP_PROXY")
https_proxy = os.environ.get("HTTPS_PROXY")
if http_proxy or https_proxy:
@@ -119,11 +98,9 @@ class ClaudeCodePlugin(ToolPlugin):
if https_proxy:
settings["proxy"]["https"] = https_proxy
# Telemetry settings - still from environment
if os.environ.get("DISABLE_TELEMETRY") == "true":
settings["telemetry"] = {"enabled": False}
# Tool permissions (allow all by default in Cubbi environment)
settings["permissions"] = {
"tools": {
"read": {"allowed": True},
@@ -137,14 +114,12 @@ class ClaudeCodePlugin(ToolPlugin):
return settings
def _write_settings(self, settings_file: Path, settings: Dict) -> bool:
def _write_settings(self, settings_file: Path, settings: dict) -> bool:
try:
# Write settings with secure permissions
with open(settings_file, "w") as f:
json.dump(settings, f, indent=2)
# Set ownership and secure file permissions (read/write for owner only)
self._set_ownership(settings_file)
set_ownership(settings_file)
os.chmod(settings_file, stat.S_IRUSR | stat.S_IWUSR)
self.status.log(f"Created Claude Code settings at {settings_file}")
@@ -153,16 +128,5 @@ class ClaudeCodePlugin(ToolPlugin):
self.status.log(f"Failed to write Claude Code settings: {e}", "ERROR")
return False
def setup_tool_configuration(self) -> bool:
# Additional tool configuration can be added here if needed
return True
def integrate_mcp_servers(self) -> bool:
if not cubbi_config.mcps:
self.status.log("No MCP servers to integrate")
return True
# Claude Code has built-in MCP support, so we can potentially
# configure MCP servers in the settings if needed
self.status.log("MCP server integration available for Claude Code")
return True
PLUGIN_CLASS = ClaudeCodePlugin

View File

@@ -1,13 +1,11 @@
#!/usr/bin/env python3
import json
import os
from pathlib import Path
from typing import Any, Dict
from typing import Any
from cubbi_init import ToolPlugin, cubbi_config
from cubbi_init import ToolPlugin, cubbi_config, set_ownership
# Standard providers that Crush supports natively
STANDARD_PROVIDERS = ["anthropic", "openai", "google", "openrouter"]
@@ -16,24 +14,19 @@ class CrushPlugin(ToolPlugin):
def tool_name(self) -> str:
return "crush"
def _get_user_ids(self) -> tuple[int, int]:
return cubbi_config.user.uid, cubbi_config.user.gid
def _set_ownership(self, path: Path) -> None:
user_id, group_id = self._get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError as e:
self.status.log(f"Failed to set ownership for {path}: {e}", "WARNING")
def _get_user_config_path(self) -> Path:
return Path("/home/cubbi/.config/crush")
def is_already_configured(self) -> bool:
config_file = self._get_user_config_path() / "crush.json"
return config_file.exists()
def configure(self) -> bool:
return self._setup_tool_configuration() and self._integrate_mcp_servers()
def _map_provider_to_crush_format(
self, provider_name: str, provider_config, is_default_provider: bool = False
) -> Dict[str, Any] | None:
"""Map cubbi provider configuration to crush provider format"""
) -> dict[str, Any] | None:
if not provider_config.base_url:
if provider_config.type in STANDARD_PROVIDERS:
provider_entry = {
@@ -41,16 +34,13 @@ class CrushPlugin(ToolPlugin):
}
return provider_entry
# Custom provider - include base_url and name
provider_entry = {
"api_key": provider_config.api_key,
"base_url": provider_config.base_url,
"models": [],
}
# Add name and type for custom providers
if provider_config.type in STANDARD_PROVIDERS:
# Standard provider with custom URL - determine type and name
if provider_config.type == "anthropic":
provider_entry["type"] = "anthropic"
elif provider_config.type == "openai":
@@ -59,47 +49,15 @@ class CrushPlugin(ToolPlugin):
provider_entry["type"] = "gemini"
elif provider_config.type == "openrouter":
provider_entry["type"] = "openai"
# Set name format as 'provider_name (type)'
provider_entry["name"] = f"{provider_name} ({provider_config.type})"
else:
# Non-standard provider with custom URL
provider_entry["type"] = "openai"
provider_entry["name"] = f"{provider_name} ({provider_config.type})"
return provider_entry
def _ensure_user_config_dir(self) -> Path:
config_dir = self._get_user_config_path()
# Create the full directory path
try:
config_dir.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# Directory already exists, which is fine
pass
except OSError as e:
self.status.log(
f"Failed to create config directory {config_dir}: {e}", "ERROR"
)
return config_dir
# Set ownership for the directories
config_parent = config_dir.parent
if config_parent.exists():
self._set_ownership(config_parent)
if config_dir.exists():
self._set_ownership(config_dir)
return config_dir
def initialize(self) -> bool:
self._ensure_user_config_dir()
return self.setup_tool_configuration()
def setup_tool_configuration(self) -> bool:
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
def _setup_tool_configuration(self) -> bool:
config_dir = self.create_directory_with_ownership(self._get_user_config_path())
if not config_dir.exists():
self.status.log(
f"Config directory {config_dir} does not exist and could not be created",
@@ -109,15 +67,12 @@ class CrushPlugin(ToolPlugin):
config_file = config_dir / "crush.json"
# Initialize Crush configuration with schema
config_data = {"$schema": "https://charm.land/crush.json", "providers": {}}
# Determine the default provider from the default model
default_provider_name = None
if cubbi_config.defaults.model:
default_provider_name = cubbi_config.defaults.model.split("/", 1)[0]
# Get all configured providers using the new provider system
self.status.log(
f"Found {len(cubbi_config.providers)} configured providers for Crush"
)
@@ -128,7 +83,6 @@ class CrushPlugin(ToolPlugin):
provider_name, provider_config, is_default_provider
)
if crush_provider:
# Translate google provider name to gemini for crush configuration
crush_provider_name = (
"gemini" if provider_config.type == "google" else provider_name
)
@@ -145,14 +99,12 @@ class CrushPlugin(ToolPlugin):
}
self.status.log(f"Set default model to {cubbi_config.defaults.model}")
# add model to the crush provider only if custom
provider = cubbi_config.providers.get(provider_part)
if provider and provider.base_url:
config_data["providers"][provider_part]["models"].append(
{"id": model_part, "name": model_part}
)
# Only write config if we have providers configured
if not config_data["providers"]:
self.status.log(
"No providers configured, skipping Crush configuration file creation"
@@ -163,8 +115,7 @@ class CrushPlugin(ToolPlugin):
with config_file.open("w") as f:
json.dump(config_data, f, indent=2)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
set_ownership(config_file)
self.status.log(
f"Created Crush configuration at {config_file} with {len(config_data['providers'])} providers"
@@ -174,13 +125,12 @@ class CrushPlugin(ToolPlugin):
self.status.log(f"Failed to write Crush configuration: {e}", "ERROR")
return False
def integrate_mcp_servers(self) -> bool:
def _integrate_mcp_servers(self) -> bool:
if not cubbi_config.mcps:
self.status.log("No MCP servers to integrate")
return True
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
config_dir = self.create_directory_with_ownership(self._get_user_config_path())
if not config_dir.exists():
self.status.log(
f"Config directory {config_dir} does not exist and could not be created",
@@ -203,7 +153,6 @@ class CrushPlugin(ToolPlugin):
else:
config_data = {"$schema": "https://charm.land/crush.json", "providers": {}}
# Crush uses "mcps" field for MCP server configuration
if "mcps" not in config_data:
config_data["mcps"] = {}
@@ -229,8 +178,7 @@ class CrushPlugin(ToolPlugin):
with config_file.open("w") as f:
json.dump(config_data, f, indent=2)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
set_ownership(config_file)
self.status.log(
f"Integrated {len(cubbi_config.mcps)} MCP servers into Crush configuration"
@@ -239,3 +187,6 @@ class CrushPlugin(ToolPlugin):
except Exception as e:
self.status.log(f"Failed to integrate MCP servers: {e}", "ERROR")
return False
PLUGIN_CLASS = CrushPlugin

View File

@@ -19,7 +19,7 @@ import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any
from pydantic import BaseModel
from ruamel.yaml import YAML
@@ -31,9 +31,9 @@ class UserConfig(BaseModel):
class ProjectConfig(BaseModel):
url: Optional[str] = None
config_dir: Optional[str] = None
image_config_dir: Optional[str] = None
url: str | None = None
config_dir: str | None = None
image_config_dir: str | None = None
class PersistentLink(BaseModel):
@@ -45,20 +45,20 @@ class PersistentLink(BaseModel):
class ProviderConfig(BaseModel):
type: str
api_key: str
base_url: Optional[str] = None
base_url: str | None = None
class MCPConfig(BaseModel):
name: str
type: str
host: Optional[str] = None
port: Optional[int] = None
url: Optional[str] = None
headers: Optional[Dict[str, str]] = None
host: str | None = None
port: int | None = None
url: str | None = None
headers: dict[str, str] | None = None
class DefaultsConfig(BaseModel):
model: Optional[str] = None
model: str | None = None
class SSHConfig(BaseModel):
@@ -66,21 +66,18 @@ class SSHConfig(BaseModel):
class CubbiConfig(BaseModel):
"""Central configuration for container using Pydantic BaseModel"""
version: str = "1.0"
user: UserConfig = UserConfig()
providers: Dict[str, ProviderConfig] = {}
mcps: List[MCPConfig] = []
providers: dict[str, ProviderConfig] = {}
mcps: list[MCPConfig] = []
project: ProjectConfig = ProjectConfig()
persistent_links: List[PersistentLink] = []
persistent_links: list[PersistentLink] = []
defaults: DefaultsConfig = DefaultsConfig()
ssh: SSHConfig = SSHConfig()
run_command: Optional[str] = None
run_command: str | None = None
no_shell: bool = False
def get_provider_for_default_model(self) -> Optional[ProviderConfig]:
"""Get the provider config for the default model"""
def get_provider_for_default_model(self) -> ProviderConfig | None:
if not self.defaults.model or "/" not in self.defaults.model:
return None
@@ -89,7 +86,6 @@ class CubbiConfig(BaseModel):
def load_cubbi_config() -> CubbiConfig:
"""Load configuration from file or return default"""
config_path = Path("/cubbi/config.yaml")
if not config_path.exists():
return CubbiConfig()
@@ -104,9 +100,19 @@ def load_cubbi_config() -> CubbiConfig:
cubbi_config = load_cubbi_config()
class StatusManager:
"""Manages initialization status and logging"""
def get_user_ids() -> tuple[int, int]:
return cubbi_config.user.uid, cubbi_config.user.gid
def set_ownership(path: Path) -> None:
user_id, group_id = get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError:
pass
class StatusManager:
def __init__(
self, log_file: str = "/cubbi/init.log", status_file: str = "/cubbi/init.status"
):
@@ -127,17 +133,14 @@ class StatusManager:
f.flush()
def set_status(self, complete: bool) -> None:
"""Set initialization completion status"""
status = "true" if complete else "false"
with open(self.status_file, "w") as f:
f.write(f"INIT_COMPLETE={status}\n")
def start_initialization(self) -> None:
"""Mark initialization as started"""
self.set_status(False)
def complete_initialization(self) -> None:
"""Mark initialization as completed"""
self.set_status(True)
@@ -156,16 +159,14 @@ class ImageConfig:
version: str
maintainer: str
image: str
persistent_configs: List[PersistentConfig] = field(default_factory=list)
environments_to_forward: List[str] = field(default_factory=list)
persistent_configs: list[PersistentConfig] = field(default_factory=list)
environments_to_forward: list[str] = field(default_factory=list)
class ConfigParser:
"""Parses Cubbi image configuration and environment variables"""
def __init__(self, config_file: str = "/cubbi/cubbi_image.yaml"):
self.config_file = Path(config_file)
self.environment: Dict[str, str] = dict(os.environ)
self.environment: dict[str, str] = dict(os.environ)
def load_image_config(self) -> ImageConfig:
if not self.config_file.exists():
@@ -191,8 +192,6 @@ class ConfigParser:
class UserManager:
"""Manages user and group creation/modification in containers"""
def __init__(self, status: StatusManager):
self.status = status
self.username = "cubbi"
@@ -213,7 +212,6 @@ class UserManager:
f"Setting up user '{self.username}' with UID: {user_id}, GID: {group_id}"
)
# Handle group creation/modification
try:
existing_group = grp.getgrnam(self.username)
if existing_group.gr_gid != group_id:
@@ -228,7 +226,6 @@ class UserManager:
if not self._run_command(["groupadd", "-g", str(group_id), self.username]):
return False
# Handle user creation/modification
try:
existing_user = pwd.getpwnam(self.username)
if existing_user.pw_uid != user_id or existing_user.pw_gid != group_id:
@@ -262,7 +259,6 @@ class UserManager:
):
return False
# Create the sudoers file entry for the 'cubbi' user
sudoers_command = [
"sh",
"-c",
@@ -276,8 +272,6 @@ class UserManager:
class DirectoryManager:
"""Manages directory creation and permission setup"""
def __init__(self, status: StatusManager):
self.status = status
@@ -369,8 +363,6 @@ class DirectoryManager:
class ConfigManager:
"""Manages persistent configuration symlinks and mappings"""
def __init__(self, status: StatusManager):
self.status = status
@@ -420,7 +412,7 @@ class ConfigManager:
return False
def setup_persistent_configs(
self, persistent_configs: List[PersistentConfig], user_id: int, group_id: int
self, persistent_configs: list[PersistentConfig], user_id: int, group_id: int
) -> bool:
if not persistent_configs:
self.status.log("No persistent configurations defined in image config")
@@ -449,13 +441,11 @@ class ConfigManager:
class CommandManager:
"""Manages command execution and user switching"""
def __init__(self, status: StatusManager):
self.status = status
self.username = "cubbi"
def run_as_user(self, command: List[str], user: str = None) -> int:
def run_as_user(self, command: list[str], user: str = None) -> int:
if user is None:
user = self.username
@@ -476,7 +466,7 @@ class CommandManager:
self.status.log(f"Executing user command: {command}")
return self.run_as_user(["sh", "-c", command])
def exec_as_user(self, args: List[str]) -> None:
def exec_as_user(self, args: list[str]) -> None:
if not args:
args = ["tail", "-f", "/dev/null"]
@@ -492,9 +482,7 @@ class CommandManager:
class ToolPlugin(ABC):
"""Base class for tool-specific initialization plugins"""
def __init__(self, status: StatusManager, config: Dict[str, Any]):
def __init__(self, status: StatusManager, config: dict[str, Any]):
self.status = status
self.config = config
@@ -503,14 +491,30 @@ class ToolPlugin(ABC):
def tool_name(self) -> str:
pass
def create_directory_with_ownership(self, path: Path) -> Path:
try:
path.mkdir(parents=True, exist_ok=True)
set_ownership(path)
# Also set ownership on parent directories if they were created
parent = path.parent
if parent.exists() and parent != Path("/"):
set_ownership(parent)
except OSError as e:
self.status.log(f"Failed to create directory {path}: {e}", "ERROR")
return path
@abstractmethod
def initialize(self) -> bool:
def is_already_configured(self) -> bool:
pass
def integrate_mcp_servers(self, mcp_config: Dict[str, Any]) -> bool:
return True
@abstractmethod
def configure(self) -> bool:
pass
def get_resolved_model(self) -> Dict[str, Any] | None:
def get_resolved_model(self) -> dict[str, Any] | None:
model_spec = os.environ.get("CUBBI_MODEL_SPEC", "")
if not model_spec:
return None
@@ -539,7 +543,7 @@ class ToolPlugin(ABC):
"model_spec": model_spec,
}
def get_provider_config(self, provider_name: str) -> Dict[str, str]:
def get_provider_config(self, provider_name: str) -> dict[str, str]:
provider_config = {}
# Map provider names to their environment variables
@@ -568,7 +572,7 @@ class ToolPlugin(ABC):
return provider_config
def get_all_providers_config(self) -> Dict[str, Dict[str, str]]:
def get_all_providers_config(self) -> dict[str, dict[str, str]]:
all_providers = {}
# Check for each standard provider
@@ -601,7 +605,7 @@ class CubbiInitializer:
self.config_manager = ConfigManager(self.status)
self.command_manager = CommandManager(self.status)
def run_initialization(self, final_args: List[str]) -> None:
def run_initialization(self, final_args: list[str]) -> None:
"""Run the complete initialization process"""
try:
self.status.start_initialization()
@@ -684,37 +688,26 @@ class CubbiInitializer:
plugin_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plugin_module)
# Find the plugin class (should inherit from ToolPlugin)
plugin_class = None
for attr_name in dir(plugin_module):
attr = getattr(plugin_module, attr_name)
if (
isinstance(attr, type)
and hasattr(attr, "tool_name")
and hasattr(attr, "initialize")
and attr_name != "ToolPlugin"
): # Skip the base class
plugin_class = attr
break
if not plugin_class:
# Get the plugin class from the standard export variable
if not hasattr(plugin_module, "PLUGIN_CLASS"):
self.status.log(
f"No valid plugin class found in {plugin_file}", "ERROR"
f"No PLUGIN_CLASS variable found in {plugin_file}", "ERROR"
)
return False
plugin_class = plugin_module.PLUGIN_CLASS
# Instantiate and run the plugin
plugin = plugin_class(self.status, {"image_config": image_config})
self.status.log(f"Running {plugin.tool_name}-specific initialization")
if not plugin.initialize():
self.status.log(f"{plugin.tool_name} initialization failed", "ERROR")
return False
if not plugin.integrate_mcp_servers():
self.status.log(f"{plugin.tool_name} MCP integration failed", "ERROR")
return False
if not plugin.is_already_configured():
if not plugin.configure():
self.status.log(f"{plugin.tool_name} configuration failed", "ERROR")
return False
else:
self.status.log(f"{plugin.tool_name} is already configured, skipping")
return True

View File

@@ -3,7 +3,7 @@
import os
from pathlib import Path
from cubbi_init import ToolPlugin, cubbi_config
from cubbi_init import ToolPlugin, cubbi_config, set_ownership
from ruamel.yaml import YAML
@@ -12,62 +12,36 @@ class GoosePlugin(ToolPlugin):
def tool_name(self) -> str:
return "goose"
def _get_user_ids(self) -> tuple[int, int]:
return cubbi_config.user.uid, cubbi_config.user.gid
def is_already_configured(self) -> bool:
config_file = Path("/home/cubbi/.config/goose/config.yaml")
return config_file.exists()
def _set_ownership(self, path: Path) -> None:
user_id, group_id = self._get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError as e:
self.status.log(f"Failed to set ownership for {path}: {e}", "WARNING")
def configure(self) -> bool:
self._ensure_user_config_dir()
if not self.setup_tool_configuration():
return False
return self.integrate_mcp_servers()
def _get_user_config_path(self) -> Path:
return Path("/home/cubbi/.config/goose")
def _ensure_user_config_dir(self) -> Path:
config_dir = self._get_user_config_path()
# Create the full directory path
try:
config_dir.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# Directory already exists, which is fine
pass
except OSError as e:
self.status.log(
f"Failed to create config directory {config_dir}: {e}", "ERROR"
)
return config_dir
# Set ownership for the directories
config_parent = config_dir.parent
if config_parent.exists():
self._set_ownership(config_parent)
if config_dir.exists():
self._set_ownership(config_dir)
return config_dir
return self.create_directory_with_ownership(config_dir)
def _write_env_vars_to_profile(self, env_vars: dict) -> None:
"""Write environment variables to shell profile for interactive sessions"""
try:
# Write to cubbi user's bash profile
profile_path = Path("/home/cubbi/.bashrc")
# Create cubbi env section marker
env_section_start = "# CUBBI GOOSE ENVIRONMENT VARIABLES"
env_section_end = "# END CUBBI GOOSE ENVIRONMENT VARIABLES"
# Read existing profile or create empty
if profile_path.exists():
with open(profile_path, "r") as f:
lines = f.readlines()
else:
lines = []
# Remove existing cubbi env section
new_lines = []
skip_section = False
for line in lines:
@@ -79,20 +53,17 @@ class GoosePlugin(ToolPlugin):
elif not skip_section:
new_lines.append(line)
# Add new env vars section
if env_vars:
new_lines.append(f"\n{env_section_start}\n")
for key, value in env_vars.items():
new_lines.append(f'export {key}="{value}"\n')
new_lines.append(f"{env_section_end}\n")
# Write updated profile
profile_path.parent.mkdir(parents=True, exist_ok=True)
with open(profile_path, "w") as f:
f.writelines(new_lines)
# Set ownership
self._set_ownership(profile_path)
set_ownership(profile_path)
self.status.log(
f"Updated shell profile with {len(env_vars)} environment variables"
@@ -103,12 +74,7 @@ class GoosePlugin(ToolPlugin):
f"Failed to write environment variables to profile: {e}", "ERROR"
)
def initialize(self) -> bool:
self._ensure_user_config_dir()
return self.setup_tool_configuration()
def setup_tool_configuration(self) -> bool:
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
if not config_dir.exists():
self.status.log(
@@ -189,8 +155,7 @@ class GoosePlugin(ToolPlugin):
with config_file.open("w") as f:
yaml.dump(config_data, f)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
set_ownership(config_file)
self.status.log(f"Updated Goose configuration at {config_file}")
return True
@@ -203,7 +168,6 @@ class GoosePlugin(ToolPlugin):
self.status.log("No MCP servers to integrate")
return True
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
if not config_dir.exists():
self.status.log(
@@ -256,10 +220,12 @@ class GoosePlugin(ToolPlugin):
with config_file.open("w") as f:
yaml.dump(config_data, f)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
set_ownership(config_file)
return True
except Exception as e:
self.status.log(f"Failed to integrate MCP servers: {e}", "ERROR")
return False
PLUGIN_CLASS = GoosePlugin

View File

@@ -4,10 +4,10 @@ import json
import os
from pathlib import Path
from cubbi_init import ToolPlugin, cubbi_config
from cubbi_init import ToolPlugin, cubbi_config, set_ownership
# Standard providers that OpenCode supports natively
STANDARD_PROVIDERS = ["anthropic", "openai", "google", "openrouter"]
STANDARD_PROVIDERS: list[str] = ["anthropic", "openai", "google", "openrouter"]
class OpencodePlugin(ToolPlugin):
@@ -15,92 +15,30 @@ class OpencodePlugin(ToolPlugin):
def tool_name(self) -> str:
return "opencode"
def _get_user_ids(self) -> tuple[int, int]:
return cubbi_config.user.uid, cubbi_config.user.gid
def _set_ownership(self, path: Path) -> None:
user_id, group_id = self._get_user_ids()
try:
os.chown(path, user_id, group_id)
except OSError as e:
self.status.log(f"Failed to set ownership for {path}: {e}", "WARNING")
def _get_user_config_path(self) -> Path:
return Path("/home/cubbi/.config/opencode")
def _get_user_data_path(self) -> Path:
return Path("/home/cubbi/.local/share/opencode")
def is_already_configured(self) -> bool:
config_file = self._get_user_config_path() / "config.json"
return config_file.exists()
def _ensure_user_config_dir(self) -> Path:
config_dir = self._get_user_config_path()
def configure(self) -> bool:
self.create_directory_with_ownership(self._get_user_config_path())
# Create the full directory path
try:
config_dir.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# Directory already exists, which is fine
pass
except OSError as e:
self.status.log(
f"Failed to create config directory {config_dir}: {e}", "ERROR"
)
return config_dir
# Set ownership for the directories
config_parent = config_dir.parent
if config_parent.exists():
self._set_ownership(config_parent)
if config_dir.exists():
self._set_ownership(config_dir)
return config_dir
def _ensure_user_data_dir(self) -> Path:
data_dir = self._get_user_data_path()
# Create the full directory path
try:
data_dir.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# Directory already exists, which is fine
pass
except OSError as e:
self.status.log(f"Failed to create data directory {data_dir}: {e}", "ERROR")
return data_dir
# Set ownership for the directories
data_parent = data_dir.parent
if data_parent.exists():
self._set_ownership(data_parent)
if data_dir.exists():
self._set_ownership(data_dir)
return data_dir
def initialize(self) -> bool:
self._ensure_user_config_dir()
# Set up tool configuration with new provider format
config_success = self.setup_tool_configuration()
return config_success
def setup_tool_configuration(self) -> bool:
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
if not config_dir.exists():
self.status.log(
f"Config directory {config_dir} does not exist and could not be created",
"ERROR",
)
if not config_success:
return False
return self.integrate_mcp_servers()
def setup_tool_configuration(self) -> bool:
config_dir = self._get_user_config_path()
config_file = config_dir / "config.json"
# Initialize configuration with schema
config_data = {"$schema": "https://opencode.ai/config.json"}
config_data: dict[str, str | dict[str, dict[str, str | dict[str, str]]]] = {
"$schema": "https://opencode.ai/config.json"
}
# Set default theme to system
config_data["theme"] = "system"
@@ -113,7 +51,7 @@ class OpencodePlugin(ToolPlugin):
# Check if this is a custom provider (has baseURL)
if provider_config.base_url:
# Custom provider - include baseURL and name
provider_entry = {
provider_entry: dict[str, str | dict[str, str]] = {
"options": {
"apiKey": provider_config.api_key,
"baseURL": provider_config.base_url,
@@ -162,6 +100,8 @@ class OpencodePlugin(ToolPlugin):
self.status.log(f"Set default model to {config_data['model']}")
# Add the specific model only to the provider that matches the default model
provider_name: str
model_name: str
provider_name, model_name = cubbi_config.defaults.model.split("/", 1)
if provider_name in config_data["provider"]:
config_data["provider"][provider_name]["models"] = {
@@ -172,8 +112,8 @@ class OpencodePlugin(ToolPlugin):
)
else:
# Fallback to legacy environment variables
opencode_model = os.environ.get("CUBBI_MODEL")
opencode_provider = os.environ.get("CUBBI_PROVIDER")
opencode_model: str | None = os.environ.get("CUBBI_MODEL")
opencode_provider: str | None = os.environ.get("CUBBI_PROVIDER")
if opencode_model and opencode_provider:
config_data["model"] = f"{opencode_provider}/{opencode_model}"
@@ -199,8 +139,7 @@ class OpencodePlugin(ToolPlugin):
with config_file.open("w") as f:
json.dump(config_data, f, indent=2)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
set_ownership(config_file)
self.status.log(
f"Updated OpenCode configuration at {config_file} with {len(config_data.get('provider', {}))} providers"
@@ -215,22 +154,16 @@ class OpencodePlugin(ToolPlugin):
self.status.log("No MCP servers to integrate")
return True
# Ensure directory exists before writing
config_dir = self._ensure_user_config_dir()
if not config_dir.exists():
self.status.log(
f"Config directory {config_dir} does not exist and could not be created",
"ERROR",
)
return False
config_dir = self._get_user_config_path()
config_file = config_dir / "config.json"
if config_file.exists():
with config_file.open("r") as f:
config_data = json.load(f) or {}
config_data: dict[str, str | dict[str, dict[str, str]]] = (
json.load(f) or {}
)
else:
config_data = {}
config_data: dict[str, str | dict[str, dict[str, str]]] = {}
if "mcp" not in config_data:
config_data["mcp"] = {}
@@ -247,8 +180,8 @@ class OpencodePlugin(ToolPlugin):
}
elif mcp.type in ["docker", "proxy"]:
if mcp.name and mcp.host:
mcp_port = mcp.port or 8080
mcp_url = f"http://{mcp.host}:{mcp_port}/sse"
mcp_port: int = mcp.port or 8080
mcp_url: str = f"http://{mcp.host}:{mcp_port}/sse"
self.status.log(f"Adding MCP extension: {mcp.name} - {mcp_url}")
config_data["mcp"][mcp.name] = {
"type": "remote",
@@ -259,10 +192,12 @@ class OpencodePlugin(ToolPlugin):
with config_file.open("w") as f:
json.dump(config_data, f, indent=2)
# Set ownership of the config file to cubbi user
self._set_ownership(config_file)
set_ownership(config_file)
return True
except Exception as e:
self.status.log(f"Failed to integrate MCP servers: {e}", "ERROR")
return False
PLUGIN_CLASS = OpencodePlugin