mirror of
https://github.com/Monadical-SAS/cubbi.git
synced 2025-12-20 12:19:07 +00:00
fix: prevent concurrent YAML corruption in sessions (#36)
fix: add file locking to prevent concurrent YAML corruption in sessions When multiple cubbi instances run simultaneously, they can corrupt the sessions.yaml file due to concurrent writes. This manifests as malformed YAML entries (e.g., "status: running\ning2dc3ff11:"). This commit adds: - fcntl-based file locking for all write operations - Read-modify-write pattern that reloads from disk before each write - Proper lock acquisition/release via context manager All write operations (add_session, remove_session, save) now: 1. Acquire exclusive lock on sessions.yaml 2. Reload latest state from disk 3. Apply modifications 4. Write atomically to file 5. Update in-memory cache 6. Release lock This ensures that concurrent cubbi instances can safely modify the sessions file without corruption.
This commit is contained in:
@@ -2,7 +2,9 @@
|
|||||||
Session storage management for Cubbi Container Tool.
|
Session storage management for Cubbi Container Tool.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import fcntl
|
||||||
import os
|
import os
|
||||||
|
from contextlib import contextmanager
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
|
|
||||||
@@ -11,6 +13,31 @@ import yaml
|
|||||||
DEFAULT_SESSIONS_FILE = Path.home() / ".config" / "cubbi" / "sessions.yaml"
|
DEFAULT_SESSIONS_FILE = Path.home() / ".config" / "cubbi" / "sessions.yaml"
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _file_lock(file_path: Path):
|
||||||
|
"""Context manager for file locking.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: Path to the file to lock
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
File descriptor with exclusive lock
|
||||||
|
"""
|
||||||
|
# Ensure the file exists
|
||||||
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
if not file_path.exists():
|
||||||
|
file_path.touch(mode=0o600)
|
||||||
|
|
||||||
|
# Open file and acquire exclusive lock
|
||||||
|
fd = open(file_path, "r+")
|
||||||
|
try:
|
||||||
|
fcntl.flock(fd.fileno(), fcntl.LOCK_EX)
|
||||||
|
yield fd
|
||||||
|
finally:
|
||||||
|
fcntl.flock(fd.fileno(), fcntl.LOCK_UN)
|
||||||
|
fd.close()
|
||||||
|
|
||||||
|
|
||||||
class SessionManager:
|
class SessionManager:
|
||||||
"""Manager for container sessions."""
|
"""Manager for container sessions."""
|
||||||
|
|
||||||
@@ -42,9 +69,26 @@ class SessionManager:
|
|||||||
return sessions
|
return sessions
|
||||||
|
|
||||||
def save(self) -> None:
|
def save(self) -> None:
|
||||||
"""Save the sessions to file."""
|
"""Save the sessions to file.
|
||||||
with open(self.sessions_path, "w") as f:
|
|
||||||
yaml.safe_dump(self.sessions, f)
|
Note: This method acquires a file lock and merges with existing data
|
||||||
|
to prevent concurrent write issues.
|
||||||
|
"""
|
||||||
|
with _file_lock(self.sessions_path) as fd:
|
||||||
|
# Reload sessions from disk to get latest state
|
||||||
|
fd.seek(0)
|
||||||
|
sessions = yaml.safe_load(fd) or {}
|
||||||
|
|
||||||
|
# Merge current in-memory sessions with disk state
|
||||||
|
sessions.update(self.sessions)
|
||||||
|
|
||||||
|
# Write back to file
|
||||||
|
fd.seek(0)
|
||||||
|
fd.truncate()
|
||||||
|
yaml.safe_dump(sessions, fd)
|
||||||
|
|
||||||
|
# Update in-memory cache
|
||||||
|
self.sessions = sessions
|
||||||
|
|
||||||
def add_session(self, session_id: str, session_data: dict) -> None:
|
def add_session(self, session_id: str, session_data: dict) -> None:
|
||||||
"""Add a session to storage.
|
"""Add a session to storage.
|
||||||
@@ -53,8 +97,21 @@ class SessionManager:
|
|||||||
session_id: The unique session ID
|
session_id: The unique session ID
|
||||||
session_data: The session data (Session model dump as dict)
|
session_data: The session data (Session model dump as dict)
|
||||||
"""
|
"""
|
||||||
self.sessions[session_id] = session_data
|
with _file_lock(self.sessions_path) as fd:
|
||||||
self.save()
|
# Reload sessions from disk to get latest state
|
||||||
|
fd.seek(0)
|
||||||
|
sessions = yaml.safe_load(fd) or {}
|
||||||
|
|
||||||
|
# Apply the modification
|
||||||
|
sessions[session_id] = session_data
|
||||||
|
|
||||||
|
# Write back to file
|
||||||
|
fd.seek(0)
|
||||||
|
fd.truncate()
|
||||||
|
yaml.safe_dump(sessions, fd)
|
||||||
|
|
||||||
|
# Update in-memory cache
|
||||||
|
self.sessions = sessions
|
||||||
|
|
||||||
def get_session(self, session_id: str) -> Optional[dict]:
|
def get_session(self, session_id: str) -> Optional[dict]:
|
||||||
"""Get a session by ID.
|
"""Get a session by ID.
|
||||||
@@ -81,6 +138,19 @@ class SessionManager:
|
|||||||
Args:
|
Args:
|
||||||
session_id: The session ID to remove
|
session_id: The session ID to remove
|
||||||
"""
|
"""
|
||||||
if session_id in self.sessions:
|
with _file_lock(self.sessions_path) as fd:
|
||||||
del self.sessions[session_id]
|
# Reload sessions from disk to get latest state
|
||||||
self.save()
|
fd.seek(0)
|
||||||
|
sessions = yaml.safe_load(fd) or {}
|
||||||
|
|
||||||
|
# Apply the modification
|
||||||
|
if session_id in sessions:
|
||||||
|
del sessions[session_id]
|
||||||
|
|
||||||
|
# Write back to file
|
||||||
|
fd.seek(0)
|
||||||
|
fd.truncate()
|
||||||
|
yaml.safe_dump(sessions, fd)
|
||||||
|
|
||||||
|
# Update in-memory cache
|
||||||
|
self.sessions = sessions
|
||||||
|
|||||||
Reference in New Issue
Block a user