From d4cc6be1fed56ea7fba06acb8d50c9de43b26b07 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Fri, 20 Feb 2026 10:12:05 -0600 Subject: [PATCH] feat: add change_seq to transcripts for ingestion support (#868) * feat: add change_seq to transcripts for ingestion support Add a monotonically increasing change_seq column to the transcript table, backed by a PostgreSQL sequence and BEFORE INSERT OR UPDATE trigger. Every mutation gets a new sequence value, letting external ingesters checkpoint and never miss an update. * chore: regenerate frontend API types --- ...3af934249a_add_change_seq_to_transcript.py | 74 +++++++++++++++++++ server/reflector/db/search.py | 3 + server/reflector/db/transcripts.py | 19 ++++- server/reflector/views/transcripts.py | 15 +++- www/app/reflector-api.d.ts | 59 +++++++++++++-- 5 files changed, 159 insertions(+), 11 deletions(-) create mode 100644 server/migrations/versions/623af934249a_add_change_seq_to_transcript.py diff --git a/server/migrations/versions/623af934249a_add_change_seq_to_transcript.py b/server/migrations/versions/623af934249a_add_change_seq_to_transcript.py new file mode 100644 index 00000000..bde1656d --- /dev/null +++ b/server/migrations/versions/623af934249a_add_change_seq_to_transcript.py @@ -0,0 +1,74 @@ +"""add_change_seq_to_transcript + +Revision ID: 623af934249a +Revises: 3aa20b96d963 +Create Date: 2026-02-19 18:53:12.315440 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "623af934249a" +down_revision: Union[str, None] = "3aa20b96d963" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Sequence + op.execute("CREATE SEQUENCE IF NOT EXISTS transcript_change_seq;") + + # Column (nullable first for backfill) + op.add_column("transcript", sa.Column("change_seq", sa.BigInteger(), nullable=True)) + + # Backfill existing rows with sequential values (ordered by created_at for determinism) + op.execute(""" + UPDATE transcript SET change_seq = sub.seq FROM ( + SELECT id, nextval('transcript_change_seq') AS seq + FROM transcript ORDER BY created_at ASC + ) sub WHERE transcript.id = sub.id; + """) + + # Now make NOT NULL + op.alter_column("transcript", "change_seq", nullable=False) + + # Default for any inserts between now and trigger creation + op.alter_column( + "transcript", + "change_seq", + server_default=sa.text("nextval('transcript_change_seq')"), + ) + + # Trigger function + op.execute(""" + CREATE OR REPLACE FUNCTION set_transcript_change_seq() + RETURNS TRIGGER AS $$ + BEGIN + NEW.change_seq := nextval('transcript_change_seq'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """) + + # Trigger (fires on every INSERT or UPDATE) + op.execute(""" + CREATE TRIGGER trigger_transcript_change_seq + BEFORE INSERT OR UPDATE ON transcript + FOR EACH ROW + EXECUTE FUNCTION set_transcript_change_seq(); + """) + + # Index for efficient polling + op.create_index("idx_transcript_change_seq", "transcript", ["change_seq"]) + + +def downgrade() -> None: + op.execute("DROP TRIGGER IF EXISTS trigger_transcript_change_seq ON transcript;") + op.execute("DROP FUNCTION IF EXISTS set_transcript_change_seq();") + op.drop_index("idx_transcript_change_seq", table_name="transcript") + op.drop_column("transcript", "change_seq") + op.execute("DROP SEQUENCE IF EXISTS transcript_change_seq;") diff --git a/server/reflector/db/search.py b/server/reflector/db/search.py index d61bb27f..210dd25e 100644 --- a/server/reflector/db/search.py +++ b/server/reflector/db/search.py @@ -151,6 +151,7 @@ class SearchResultDB(BaseModel): title: str | None = None source_kind: SourceKind room_id: str | None = None + change_seq: int | None = None rank: float = Field(..., ge=0, le=1) @@ -173,6 +174,7 @@ class SearchResult(BaseModel): total_match_count: NonNegativeInt = Field( default=0, description="Total number of matches found in the transcript" ) + change_seq: int | None = None @field_serializer("created_at", when_used="json") def serialize_datetime(self, dt: datetime) -> str: @@ -356,6 +358,7 @@ class SearchController: transcripts.c.user_id, transcripts.c.room_id, transcripts.c.source_kind, + transcripts.c.change_seq, transcripts.c.webvtt, transcripts.c.long_summary, sqlalchemy.case( diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index 0e4520b6..c443d169 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -35,6 +35,8 @@ class SourceKind(enum.StrEnum): FILE = enum.auto() +transcript_change_seq = sqlalchemy.Sequence("transcript_change_seq", metadata=metadata) + transcripts = sqlalchemy.Table( "transcript", metadata, @@ -89,6 +91,12 @@ transcripts = sqlalchemy.Table( sqlalchemy.Column("webvtt", sqlalchemy.Text), # Hatchet workflow run ID for resumption of failed workflows sqlalchemy.Column("workflow_run_id", sqlalchemy.String), + sqlalchemy.Column( + "change_seq", + sqlalchemy.BigInteger, + transcript_change_seq, + server_default=transcript_change_seq.next_value(), + ), sqlalchemy.Index("idx_transcript_recording_id", "recording_id"), sqlalchemy.Index("idx_transcript_user_id", "user_id"), sqlalchemy.Index("idx_transcript_created_at", "created_at"), @@ -229,6 +237,7 @@ class Transcript(BaseModel): audio_deleted: bool | None = None webvtt: str | None = None workflow_run_id: str | None = None # Hatchet workflow run ID for resumption + change_seq: int | None = None @field_serializer("created_at", when_used="json") def serialize_datetime(self, dt: datetime) -> str: @@ -381,6 +390,7 @@ class TranscriptController: source_kind: SourceKind | None = None, room_id: str | None = None, search_term: str | None = None, + change_seq_from: int | None = None, return_query: bool = False, exclude_columns: list[str] = [ "topics", @@ -401,6 +411,7 @@ class TranscriptController: - `filter_recording`: filter out transcripts that are currently recording - `room_id`: filter transcripts by room ID - `search_term`: filter transcripts by search term + - `change_seq_from`: filter transcripts with change_seq > this value """ query = transcripts.select().join( @@ -423,6 +434,9 @@ class TranscriptController: if search_term: query = query.where(transcripts.c.title.ilike(f"%{search_term}%")) + if change_seq_from is not None: + query = query.where(transcripts.c.change_seq > change_seq_from) + # Exclude heavy JSON columns from list queries transcript_columns = [ col for col in transcripts.c if col.name not in exclude_columns @@ -436,9 +450,10 @@ class TranscriptController: ) if order_by is not None: - field = getattr(transcripts.c, order_by[1:]) if order_by.startswith("-"): - field = field.desc() + field = getattr(transcripts.c, order_by[1:]).desc() + else: + field = getattr(transcripts.c, order_by) query = query.order_by(field) if filter_empty: diff --git a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py index 2e1c9d30..cbb41c77 100644 --- a/server/reflector/views/transcripts.py +++ b/server/reflector/views/transcripts.py @@ -111,6 +111,7 @@ class GetTranscriptMinimal(BaseModel): room_id: str | None = None room_name: str | None = None audio_deleted: bool | None = None + change_seq: int | None = None class TranscriptParticipantWithEmail(TranscriptParticipant): @@ -266,12 +267,22 @@ async def transcripts_list( source_kind: SourceKind | None = None, room_id: str | None = None, search_term: str | None = None, + change_seq_from: int | None = None, + sort_by: Literal["created_at", "change_seq"] | None = None, ): if not user and not settings.PUBLIC_MODE: raise HTTPException(status_code=401, detail="Not authenticated") user_id = user["sub"] if user else None + # Default behavior preserved: sort_by=None → "-created_at" + if sort_by == "change_seq": + order_by = "change_seq" # ASC (ascending for checkpoint-based polling) + elif sort_by == "created_at": + order_by = "-created_at" # DESC (newest first, same as current default) + else: + order_by = "-created_at" # default, backward compatible + return await apaginate( get_database(), await transcripts_controller.get_all( @@ -279,7 +290,8 @@ async def transcripts_list( source_kind=SourceKind(source_kind) if source_kind else None, room_id=room_id, search_term=search_term, - order_by="-created_at", + order_by=order_by, + change_seq_from=change_seq_from, return_query=True, ), ) @@ -512,6 +524,7 @@ async def transcript_get( "room_id": transcript.room_id, "room_name": room_name, "audio_deleted": transcript.audio_deleted, + "change_seq": transcript.change_seq, "participants": participants, } diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index af5ec7fa..4dea7901 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -1032,6 +1032,8 @@ export interface components { room_name?: string | null; /** Audio Deleted */ audio_deleted?: boolean | null; + /** Change Seq */ + change_seq?: number | null; }; /** GetTranscriptSegmentTopic */ GetTranscriptSegmentTopic: { @@ -1178,6 +1180,8 @@ export interface components { room_name?: string | null; /** Audio Deleted */ audio_deleted?: boolean | null; + /** Change Seq */ + change_seq?: number | null; /** Participants */ participants: | components["schemas"]["TranscriptParticipantWithEmail"][] @@ -1241,6 +1245,8 @@ export interface components { room_name?: string | null; /** Audio Deleted */ audio_deleted?: boolean | null; + /** Change Seq */ + change_seq?: number | null; /** Participants */ participants: | components["schemas"]["TranscriptParticipantWithEmail"][] @@ -1305,6 +1311,8 @@ export interface components { room_name?: string | null; /** Audio Deleted */ audio_deleted?: boolean | null; + /** Change Seq */ + change_seq?: number | null; /** Participants */ participants: | components["schemas"]["TranscriptParticipantWithEmail"][] @@ -1376,6 +1384,8 @@ export interface components { room_name?: string | null; /** Audio Deleted */ audio_deleted?: boolean | null; + /** Change Seq */ + change_seq?: number | null; /** Participants */ participants: | components["schemas"]["TranscriptParticipantWithEmail"][] @@ -1449,6 +1459,8 @@ export interface components { room_name?: string | null; /** Audio Deleted */ audio_deleted?: boolean | null; + /** Change Seq */ + change_seq?: number | null; /** Participants */ participants: | components["schemas"]["TranscriptParticipantWithEmail"][] @@ -1834,6 +1846,8 @@ export interface components { * @default 0 */ total_match_count: number; + /** Change Seq */ + change_seq?: number | null; }; /** * SourceKind @@ -2146,34 +2160,61 @@ export interface components { }; /** UserTranscriptCreatedData */ UserTranscriptCreatedData: { - /** Id */ + /** + * Id + * @description A non-empty string + */ id: string; }; /** UserTranscriptDeletedData */ UserTranscriptDeletedData: { - /** Id */ + /** + * Id + * @description A non-empty string + */ id: string; }; /** UserTranscriptDurationData */ UserTranscriptDurationData: { - /** Id */ + /** + * Id + * @description A non-empty string + */ id: string; /** Duration */ duration: number; }; /** UserTranscriptFinalTitleData */ UserTranscriptFinalTitleData: { - /** Id */ + /** + * Id + * @description A non-empty string + */ id: string; - /** Title */ + /** + * Title + * @description A non-empty string + */ title: string; }; /** UserTranscriptStatusData */ UserTranscriptStatusData: { - /** Id */ + /** + * Id + * @description A non-empty string + */ id: string; - /** Value */ - value: string; + /** + * Value + * @enum {string} + */ + value: + | "idle" + | "uploaded" + | "recording" + | "processing" + | "error" + | "ended"; }; /** UserWsTranscriptCreated */ UserWsTranscriptCreated: { @@ -2926,6 +2967,8 @@ export interface operations { source_kind?: components["schemas"]["SourceKind"] | null; room_id?: string | null; search_term?: string | null; + change_seq_from?: number | null; + sort_by?: ("created_at" | "change_seq") | null; /** @description Page number */ page?: number; /** @description Page size */