refactor: migrate to SQLAlchemy 2.0 ORM-style patterns

- Replace __table__.join() with ORM-style joins using select_from().outerjoin()
- Replace __table__.delete() with delete(Model) in tests
- Migrate from **row.__dict__ to model_validate() with ConfigDict(from_attributes=True)
- Add ConfigDict(from_attributes=True) to all Pydantic models for proper SQLAlchemy model conversion
- Update all controller methods to use model_validate() instead of dict unpacking

This completes the migration to SQLAlchemy 2.0 recommended patterns while maintaining
backwards compatibility and improving code consistency.
This commit is contained in:
2025-09-23 16:46:37 -06:00
parent a883df0d63
commit e0c71c5548
7 changed files with 51 additions and 40 deletions

View File

@@ -103,6 +103,8 @@ class TranscriptParticipant(BaseModel):
class Transcript(BaseModel):
"""Full transcript model with all fields."""
model_config = ConfigDict(from_attributes=True)
id: str = Field(default_factory=generate_uuid4)
user_id: str | None = None
name: str = Field(default_factory=generate_transcript_name)
@@ -317,8 +319,9 @@ class TranscriptController:
query = query.where(TranscriptModel.title.ilike(f"%{search_term}%"))
# Exclude heavy JSON columns from list queries
# Get all ORM column attributes except excluded ones
transcript_columns = [
col
getattr(TranscriptModel, col.name)
for col in TranscriptModel.__table__.c
if col.name not in exclude_columns
]
@@ -361,7 +364,7 @@ class TranscriptController:
row = result.scalar_one_or_none()
if not row:
return None
return Transcript(**row.__dict__)
return Transcript.model_validate(row)
async def get_by_recording_id(
self, session: AsyncSession, recording_id: str, **kwargs
@@ -378,7 +381,7 @@ class TranscriptController:
row = result.scalar_one_or_none()
if not row:
return None
return Transcript(**row.__dict__)
return Transcript.model_validate(row)
async def get_by_room_id(
self, session: AsyncSession, room_id: str, **kwargs
@@ -396,7 +399,9 @@ class TranscriptController:
field = field.desc()
query = query.order_by(field)
results = await session.execute(query)
return [Transcript(**dict(row)) for row in results.mappings().all()]
return [
Transcript.model_validate(dict(row)) for row in results.mappings().all()
]
async def get_by_id_for_http(
self,
@@ -420,7 +425,7 @@ class TranscriptController:
raise HTTPException(status_code=404, detail="Transcript not found")
# if the transcript is anonymous, share mode is not checked
transcript = Transcript(**row.__dict__)
transcript = Transcript.model_validate(row)
if transcript.user_id is None:
return transcript