Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2026-02-04 18:06:48 +00:00)

Compare commits (6 commits)
| SHA1 |
|---|
| 8fc8d8bf4a |
| c723752b7e |
| 4dc49e5b25 |
| 23d2bc283d |
| c8743fdf1c |
| 8a293882ad |
CHANGELOG.md
@@ -1,5 +1,19 @@
 # Changelog
 
+## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21)
+
+### Features
+
+* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46))
+
+## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21)
+
+### Bug Fixes
+
+* ics non-sync bugfix ([#823](https://github.com/Monadical-SAS/reflector/issues/823)) ([23d2bc2](https://github.com/Monadical-SAS/reflector/commit/23d2bc283d4d02187b250d2055103e0374ee93d6))
+
 ## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20)
New file: Alembic migration 80beb1ea3269 (replace_use_hatchet_with_use_celery)
@@ -0,0 +1,44 @@
+"""replace_use_hatchet_with_use_celery
+
+Revision ID: 80beb1ea3269
+Revises: bd3a729bb379
+Create Date: 2026-01-20 16:26:25.555869
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "80beb1ea3269"
+down_revision: Union[str, None] = "bd3a729bb379"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    with op.batch_alter_table("room", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "use_celery",
+                sa.Boolean(),
+                server_default=sa.text("false"),
+                nullable=False,
+            )
+        )
+        batch_op.drop_column("use_hatchet")
+
+
+def downgrade() -> None:
+    with op.batch_alter_table("room", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "use_hatchet",
+                sa.Boolean(),
+                server_default=sa.text("false"),
+                nullable=False,
+            )
+        )
+        batch_op.drop_column("use_celery")
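To exercise the new revision locally, Alembic's Python API can drive the upgrade and downgrade. A minimal sketch, assuming an alembic.ini at the project root pointing at a development database (both are assumptions, not taken from this diff):

```python
from alembic import command
from alembic.config import Config

# Assumption: alembic.ini lives at the project root and points at a dev database.
cfg = Config("alembic.ini")

# Apply everything up to and including 80beb1ea3269
# (adds room.use_celery, drops room.use_hatchet).
command.upgrade(cfg, "80beb1ea3269")

# Revert just this revision (restores room.use_hatchet, drops room.use_celery).
command.downgrade(cfg, "bd3a729bb379")
```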
@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
         nullable=False,
     ),
     sqlalchemy.Column(
-        "use_hatchet",
+        "use_celery",
         sqlalchemy.Boolean,
         nullable=False,
         server_default=false(),
@@ -97,7 +97,7 @@ class Room(BaseModel):
     ics_last_sync: datetime | None = None
     ics_last_etag: str | None = None
     platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
-    use_hatchet: bool = False
+    use_celery: bool = False
     skip_consent: bool = False
@@ -12,14 +12,9 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
 from reflector.logger import logger
-from reflector.settings import settings
 
 
 def main():
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting CPU workers")
-        return
-
     hatchet = HatchetClientManager.get_client()
 
     logger.info(
@@ -11,7 +11,6 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
 from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
 from reflector.hatchet.workflows.track_processing import track_workflow
 from reflector.logger import logger
-from reflector.settings import settings
 
 SLOTS = 10
 WORKER_NAME = "llm-worker-pool"
@@ -19,10 +18,6 @@ POOL = "llm-io"
 
 
 def main():
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting LLM workers")
-        return
-
     hatchet = HatchetClientManager.get_client()
 
     logger.info(
@@ -319,21 +319,6 @@ class ICSSyncService:
         calendar = self.fetch_service.parse_ics(ics_content)
 
         content_hash = hashlib.md5(ics_content.encode()).hexdigest()
-        if room.ics_last_etag == content_hash:
-            logger.info("No changes in ICS for room", room_id=room.id)
-            room_url = f"{settings.UI_BASE_URL}/{room.name}"
-            events, total_events = self.fetch_service.extract_room_events(
-                calendar, room.name, room_url
-            )
-            return {
-                "status": SyncStatus.UNCHANGED,
-                "hash": content_hash,
-                "events_found": len(events),
-                "total_events": total_events,
-                "events_created": 0,
-                "events_updated": 0,
-                "events_deleted": 0,
-            }
 
         # Extract matching events
         room_url = f"{settings.UI_BASE_URL}/{room.name}"
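The deleted branch is what produced the old "unchanged" early return: it hashed the raw ICS text with MD5 and bailed out when the hash matched the stored etag. A minimal standalone sketch of that check in isolation (simplified signature; the room and settings objects from the service are omitted):

```python
import hashlib

def ics_unchanged(ics_content: str, last_etag: str | None) -> bool:
    """Return True when the raw ICS text hashes to the previously stored etag."""
    content_hash = hashlib.md5(ics_content.encode()).hexdigest()
    return last_etag == content_hash

print(ics_unchanged("BEGIN:VCALENDAR\nEND:VCALENDAR", None))  # False
```

With the branch gone, a matching content hash no longer short-circuits the sync; unchanged events are instead filtered out by the per-event comparison added later in this diff.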
@@ -371,6 +356,44 @@ class ICSSyncService:
         time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
         return time_since_sync.total_seconds() >= room.ics_fetch_interval
 
+    def _event_data_changed(self, existing: CalendarEvent, new_data: EventData) -> bool:
+        """Check if event data has changed by comparing relevant fields.
+
+        IMPORTANT: When adding fields to CalendarEvent/EventData, update this method
+        and the _COMPARED_FIELDS set below for runtime validation.
+        """
+        # Fields that come from ICS and should trigger updates when changed
+        _COMPARED_FIELDS = {
+            "title",
+            "description",
+            "start_time",
+            "end_time",
+            "location",
+            "attendees",
+            "ics_raw_data",
+        }
+
+        # Runtime exhaustiveness check: ensure we're comparing all EventData fields
+        event_data_fields = set(EventData.__annotations__.keys()) - {"ics_uid"}
+        if event_data_fields != _COMPARED_FIELDS:
+            missing = event_data_fields - _COMPARED_FIELDS
+            extra = _COMPARED_FIELDS - event_data_fields
+            raise RuntimeError(
+                f"_event_data_changed() field mismatch: "
+                f"missing={missing}, extra={extra}. "
+                f"Update the comparison logic when adding/removing fields."
+            )
+
+        return (
+            existing.title != new_data["title"]
+            or existing.description != new_data["description"]
+            or existing.start_time != new_data["start_time"]
+            or existing.end_time != new_data["end_time"]
+            or existing.location != new_data["location"]
+            or existing.attendees != new_data["attendees"]
+            or existing.ics_raw_data != new_data["ics_raw_data"]
+        )
+
     async def _sync_events_to_database(
         self, room_id: str, events: list[EventData]
     ) -> SyncStats:
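The runtime exhaustiveness check above relies on a TypedDict exposing its declared keys through `__annotations__`. A minimal standalone sketch of the same pattern with illustrative names (not the repo's types):

```python
from typing import TypedDict

class EventFields(TypedDict):
    ics_uid: str
    title: str
    start_time: str

# Keys the comparison logic claims to cover (ics_uid is the identity key, not data).
COMPARED = {"title", "start_time"}

declared = set(EventFields.__annotations__) - {"ics_uid"}
if declared != COMPARED:
    raise RuntimeError(f"missing={declared - COMPARED}, extra={COMPARED - declared}")

print("comparison covers all declared fields")
```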
@@ -386,11 +409,14 @@ class ICSSyncService:
             )
 
             if existing:
-                updated += 1
+                # Only count as updated if data actually changed
+                if self._event_data_changed(existing, event_data):
+                    updated += 1
+                await calendar_events_controller.upsert(calendar_event)
             else:
                 created += 1
+                await calendar_events_controller.upsert(calendar_event)
 
-            await calendar_events_controller.upsert(calendar_event)
             current_ics_uids.append(event_data["ics_uid"])
 
         # Soft delete events that are no longer in calendar
@@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_multitrack_pipeline import (
     task_pipeline_multitrack_process,
 )
-from reflector.settings import settings
 from reflector.utils.string import NonEmptyString
 
 
@@ -102,8 +101,8 @@ async def validate_transcript_for_processing(
     if transcript.locked:
         return ValidationLocked(detail="Recording is locked")
 
-    # hatchet is idempotent anyways + if it wasn't dispatched successfully
-    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
+    # Check if recording is ready for processing
+    if transcript.status == "idle" and not transcript.workflow_run_id:
         return ValidationNotReady(detail="Recording is not ready for processing")
 
     # Check Celery tasks
@@ -116,7 +115,8 @@ async def validate_transcript_for_processing(
     ):
         return ValidationAlreadyScheduled(detail="already running")
 
-    if settings.HATCHET_ENABLED and transcript.workflow_run_id:
+    # Check Hatchet workflow status if workflow_run_id exists
+    if transcript.workflow_run_id:
         try:
             status = await HatchetClientManager.get_workflow_run_status(
                 transcript.workflow_run_id
@@ -181,19 +181,16 @@ async def dispatch_transcript_processing(
     Returns AsyncResult for Celery tasks, None for Hatchet workflows.
     """
     if isinstance(config, MultitrackProcessingConfig):
-        # Check if room has use_hatchet=True (overrides env vars)
-        room_forces_hatchet = False
+        use_celery = False
         if config.room_id:
             room = await rooms_controller.get_by_id(config.room_id)
-            room_forces_hatchet = room.use_hatchet if room else False
+            use_celery = room.use_celery if room else False
 
-        # Start durable workflow if enabled (Hatchet)
-        # and if room has use_hatchet=True
-        use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
+        use_hatchet = not use_celery
 
-        if room_forces_hatchet:
+        if use_celery:
             logger.info(
-                "Room forces Hatchet workflow",
+                "Room uses legacy Celery processing",
                 room_id=config.room_id,
                 transcript_id=config.transcript_id,
             )
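The routing decision now reduces to a single per-room flag. A minimal standalone sketch of that rule, with simplified names and a stubbed room lookup (not the repo's exact code):

```python
import asyncio
from types import SimpleNamespace

async def pick_backend(room_id, get_room) -> str:
    """Mirror of the dispatch rule above: use_celery opts a room back into
    the legacy Celery path; everything else goes to Hatchet."""
    use_celery = False
    if room_id:
        room = await get_room(room_id)
        use_celery = bool(room and room.use_celery)
    return "celery" if use_celery else "hatchet"

async def fake_get_room(room_id):
    # Stand-in for rooms_controller.get_by_id (illustrative only).
    return SimpleNamespace(use_celery=(room_id == "legacy-room"))

async def demo():
    assert await pick_backend("legacy-room", fake_get_room) == "celery"
    assert await pick_backend("new-room", fake_get_room) == "hatchet"
    assert await pick_backend(None, fake_get_room) == "hatchet"

asyncio.run(demo())
```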
@@ -158,19 +158,10 @@ class Settings(BaseSettings):
     ZULIP_API_KEY: str | None = None
     ZULIP_BOT_EMAIL: str | None = None
 
-    # Durable workflow orchestration
-    # Provider: "hatchet" (or "none" to disable)
-    DURABLE_WORKFLOW_PROVIDER: str = "none"
-
-    # Hatchet workflow orchestration
+    # Hatchet workflow orchestration (always enabled for multitrack processing)
     HATCHET_CLIENT_TOKEN: str | None = None
     HATCHET_CLIENT_TLS_STRATEGY: str = "none"  # none, tls, mtls
     HATCHET_DEBUG: bool = False
 
-    @property
-    def HATCHET_ENABLED(self) -> bool:
-        """True if Hatchet is the active provider."""
-        return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
-
 
 settings = Settings()
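With DURABLE_WORKFLOW_PROVIDER and the HATCHET_ENABLED property gone, Hatchet is configured only through the remaining fields. A hedged sketch of supplying them through the environment, assuming the Settings class reads env vars by field name with no prefix (placeholder values, not real credentials):

```python
import os

# Placeholder values for illustration; a real token comes from your Hatchet deployment.
os.environ["HATCHET_CLIENT_TOKEN"] = "example-token"
os.environ["HATCHET_CLIENT_TLS_STRATEGY"] = "none"  # none, tls, mtls
os.environ["HATCHET_DEBUG"] = "false"

# Importing the settings module after this would pick the values up, e.g.:
# from reflector.settings import Settings
# assert Settings().HATCHET_CLIENT_TOKEN == "example-token"
```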
@@ -287,11 +287,12 @@ async def _process_multitrack_recording_inner(
         room_id=room.id,
     )
 
-    use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
+    use_celery = room and room.use_celery
+    use_hatchet = not use_celery
 
-    if room and room.use_hatchet and not settings.HATCHET_ENABLED:
+    if use_celery:
         logger.info(
-            "Room forces Hatchet workflow",
+            "Room uses legacy Celery processing",
             room_id=room.id,
             transcript_id=transcript.id,
         )
@@ -810,7 +811,6 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        # Fetch room to check use_hatchet flag
        room = None
        if meeting.room_id:
            room = await rooms_controller.get_by_id(meeting.room_id)
@@ -834,10 +834,10 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
+        use_celery = room and room.use_celery
+        use_hatchet = not use_celery
 
         if use_hatchet:
-            # Hatchet requires a transcript for workflow_run_id tracking
             if not transcript:
                 logger.warning(
                     "No transcript for Hatchet reprocessing, skipping",
@@ -2,10 +2,9 @@
 Tests for Hatchet workflow dispatch and routing logic.
 
 These tests verify:
-1. Routing to Hatchet when HATCHET_ENABLED=True
-2. Replay logic for failed workflows
-3. Force flag to cancel and restart
-4. Validation prevents concurrent workflows
+1. Hatchet workflow validation and replay logic
+2. Force flag to cancel and restart workflows
+3. Validation prevents concurrent workflows
 """
 
 from unittest.mock import AsyncMock, patch
@@ -34,25 +33,22 @@ async def test_hatchet_validation_blocks_running_workflow():
         workflow_run_id="running-workflow-123",
     )
 
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.RUNNING
-            )
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationAlreadyScheduled)
-                assert "running" in result.detail.lower()
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.RUNNING
+        )
+
+        with patch(
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False
+
+            result = await validate_transcript_for_processing(mock_transcript)
+
+            assert isinstance(result, ValidationAlreadyScheduled)
+            assert "running" in result.detail.lower()
 
 
 @pytest.mark.usefixtures("setup_database")
@@ -72,24 +68,21 @@ async def test_hatchet_validation_blocks_queued_workflow():
         workflow_run_id="queued-workflow-123",
     )
 
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.QUEUED
-            )
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationAlreadyScheduled)
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.QUEUED
+        )
+
+        with patch(
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False
+
+            result = await validate_transcript_for_processing(mock_transcript)
+
+            assert isinstance(result, ValidationAlreadyScheduled)
 
 
 @pytest.mark.usefixtures("setup_database")
@@ -110,25 +103,22 @@ async def test_hatchet_validation_allows_failed_workflow():
         recording_id="test-recording-id",
     )
 
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.FAILED
-            )
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationOk)
-                assert result.transcript_id == "test-transcript-id"
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.FAILED
+        )
+
+        with patch(
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False
+
+            result = await validate_transcript_for_processing(mock_transcript)
+
+            assert isinstance(result, ValidationOk)
+            assert result.transcript_id == "test-transcript-id"
 
 
 @pytest.mark.usefixtures("setup_database")
@@ -149,24 +139,21 @@ async def test_hatchet_validation_allows_completed_workflow():
         recording_id="test-recording-id",
     )
 
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.COMPLETED
-            )
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationOk)
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.COMPLETED
+        )
+
+        with patch(
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False
+
+            result = await validate_transcript_for_processing(mock_transcript)
+
+            assert isinstance(result, ValidationOk)
 
 
 @pytest.mark.usefixtures("setup_database")
@@ -187,26 +174,23 @@ async def test_hatchet_validation_allows_when_status_check_fails():
         recording_id="test-recording-id",
     )
 
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            # Status check fails (workflow might be deleted)
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                side_effect=ApiException("Workflow not found")
-            )
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                # Should allow processing when we can't get status
-                assert isinstance(result, ValidationOk)
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        # Status check fails (workflow might be deleted)
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            side_effect=ApiException("Workflow not found")
+        )
+
+        with patch(
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False
+
+            result = await validate_transcript_for_processing(mock_transcript)
+
+            # Should allow processing when we can't get status
+            assert isinstance(result, ValidationOk)
 
 
 @pytest.mark.usefixtures("setup_database")
@@ -227,47 +211,11 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
         recording_id="test-recording-id",
     )
 
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            # Should not be called
-            mock_hatchet.get_workflow_run_status = AsyncMock()
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                # Should not check Hatchet status
-                mock_hatchet.get_workflow_run_status.assert_not_called()
-                assert isinstance(result, ValidationOk)
-
-
-@pytest.mark.usefixtures("setup_database")
-@pytest.mark.asyncio
-async def test_hatchet_validation_skipped_when_disabled():
-    """Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
-    from reflector.services.transcript_process import (
-        ValidationOk,
-        validate_transcript_for_processing,
-    )
-
-    mock_transcript = Transcript(
-        id="test-transcript-id",
-        name="Test",
-        status="uploaded",
-        source_kind="room",
-        workflow_run_id="some-workflow-123",
-        recording_id="test-recording-id",
-    )
-
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = False  # Hatchet disabled
-
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        # Should not be called
+        mock_hatchet.get_workflow_run_status = AsyncMock()
+
         with patch(
             "reflector.services.transcript_process.task_is_scheduled_or_active"
@@ -276,7 +224,8 @@ async def test_hatchet_validation_skipped_when_disabled():
             result = await validate_transcript_for_processing(mock_transcript)
 
-            # Should not check Hatchet at all
+            # Should not check Hatchet status
+            mock_hatchet.get_workflow_run_status.assert_not_called()
             assert isinstance(result, ValidationOk)
@@ -189,14 +189,17 @@ async def test_ics_sync_service_sync_room_calendar():
     assert events[0].ics_uid == "sync-event-1"
     assert events[0].title == "Sync Test Meeting"
 
-    # Second sync with same content (should be unchanged)
+    # Second sync with same content (calendar unchanged, but sync always runs)
     # Refresh room to get updated etag and force sync by setting old sync time
     room = await rooms_controller.get_by_id(room.id)
     await rooms_controller.update(
         room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)}
     )
     result = await sync_service.sync_room_calendar(room)
-    assert result["status"] == "unchanged"
+    assert result["status"] == "success"
+    assert result["events_created"] == 0
+    assert result["events_updated"] == 0
+    assert result["events_deleted"] == 0
 
     # Third sync with updated event
     event["summary"] = "Updated Meeting Title"
@@ -288,3 +291,43 @@ async def test_ics_sync_service_error_handling():
     result = await sync_service.sync_room_calendar(room)
     assert result["status"] == "error"
     assert "Network error" in result["error"]
+
+
+@pytest.mark.asyncio
+async def test_event_data_changed_exhaustiveness():
+    """Test that _event_data_changed compares all EventData fields (except ics_uid).
+
+    This test ensures programmers don't forget to update the comparison logic
+    when adding new fields to EventData/CalendarEvent.
+    """
+    from reflector.services.ics_sync import EventData
+
+    sync_service = ICSSyncService()
+
+    from reflector.db.calendar_events import CalendarEvent
+
+    now = datetime.now(timezone.utc)
+    event_data: EventData = {
+        "ics_uid": "test-123",
+        "title": "Test",
+        "description": "Desc",
+        "location": "Loc",
+        "start_time": now,
+        "end_time": now + timedelta(hours=1),
+        "attendees": [],
+        "ics_raw_data": "raw",
+    }
+
+    existing = CalendarEvent(
+        room_id="room1",
+        **event_data,
+    )
+
+    # Will raise RuntimeError if fields are missing from comparison
+    result = sync_service._event_data_changed(existing, event_data)
+    assert result is False
+
+    modified_data = event_data.copy()
+    modified_data["title"] = "Changed Title"
+    result = sync_service._event_data_changed(existing, modified_data)
+    assert result is True
@@ -162,9 +162,24 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
     from datetime import datetime, timezone
 
     from reflector.db.recordings import Recording, recordings_controller
+    from reflector.db.rooms import rooms_controller
     from reflector.db.transcripts import transcripts_controller
 
-    # Create transcript with Daily.co multitrack recording
+    room = await rooms_controller.add(
+        name="test-room",
+        user_id="test-user",
+        zulip_auto_post=False,
+        zulip_stream="",
+        zulip_topic="",
+        is_locked=False,
+        room_mode="normal",
+        recording_type="cloud",
+        recording_trigger="automatic-2nd-participant",
+        is_shared=False,
+    )
+    # Force Celery backend for test
+    await rooms_controller.update(room, {"use_celery": True})
+
     transcript = await transcripts_controller.add(
         "",
         source_kind="room",
@@ -172,6 +187,7 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
         target_language="en",
         user_id="test-user",
         share_mode="public",
+        room_id=room.id,
     )
 
     track_keys = [