mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-03-21 22:56:47 +00:00
feat: add change_seq to transcripts for ingestion support (#868)
* feat: add change_seq to transcripts for ingestion support Add a monotonically increasing change_seq column to the transcript table, backed by a PostgreSQL sequence and BEFORE INSERT OR UPDATE trigger. Every mutation gets a new sequence value, letting external ingesters checkpoint and never miss an update. * chore: regenerate frontend API types
This commit is contained in:
@@ -0,0 +1,74 @@
|
||||
"""add_change_seq_to_transcript
|
||||
|
||||
Revision ID: 623af934249a
|
||||
Revises: 3aa20b96d963
|
||||
Create Date: 2026-02-19 18:53:12.315440
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "623af934249a"
|
||||
down_revision: Union[str, None] = "3aa20b96d963"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# Sequence
|
||||
op.execute("CREATE SEQUENCE IF NOT EXISTS transcript_change_seq;")
|
||||
|
||||
# Column (nullable first for backfill)
|
||||
op.add_column("transcript", sa.Column("change_seq", sa.BigInteger(), nullable=True))
|
||||
|
||||
# Backfill existing rows with sequential values (ordered by created_at for determinism)
|
||||
op.execute("""
|
||||
UPDATE transcript SET change_seq = sub.seq FROM (
|
||||
SELECT id, nextval('transcript_change_seq') AS seq
|
||||
FROM transcript ORDER BY created_at ASC
|
||||
) sub WHERE transcript.id = sub.id;
|
||||
""")
|
||||
|
||||
# Now make NOT NULL
|
||||
op.alter_column("transcript", "change_seq", nullable=False)
|
||||
|
||||
# Default for any inserts between now and trigger creation
|
||||
op.alter_column(
|
||||
"transcript",
|
||||
"change_seq",
|
||||
server_default=sa.text("nextval('transcript_change_seq')"),
|
||||
)
|
||||
|
||||
# Trigger function
|
||||
op.execute("""
|
||||
CREATE OR REPLACE FUNCTION set_transcript_change_seq()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.change_seq := nextval('transcript_change_seq');
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""")
|
||||
|
||||
# Trigger (fires on every INSERT or UPDATE)
|
||||
op.execute("""
|
||||
CREATE TRIGGER trigger_transcript_change_seq
|
||||
BEFORE INSERT OR UPDATE ON transcript
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION set_transcript_change_seq();
|
||||
""")
|
||||
|
||||
# Index for efficient polling
|
||||
op.create_index("idx_transcript_change_seq", "transcript", ["change_seq"])
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.execute("DROP TRIGGER IF EXISTS trigger_transcript_change_seq ON transcript;")
|
||||
op.execute("DROP FUNCTION IF EXISTS set_transcript_change_seq();")
|
||||
op.drop_index("idx_transcript_change_seq", table_name="transcript")
|
||||
op.drop_column("transcript", "change_seq")
|
||||
op.execute("DROP SEQUENCE IF EXISTS transcript_change_seq;")
|
||||
@@ -151,6 +151,7 @@ class SearchResultDB(BaseModel):
|
||||
title: str | None = None
|
||||
source_kind: SourceKind
|
||||
room_id: str | None = None
|
||||
change_seq: int | None = None
|
||||
rank: float = Field(..., ge=0, le=1)
|
||||
|
||||
|
||||
@@ -173,6 +174,7 @@ class SearchResult(BaseModel):
|
||||
total_match_count: NonNegativeInt = Field(
|
||||
default=0, description="Total number of matches found in the transcript"
|
||||
)
|
||||
change_seq: int | None = None
|
||||
|
||||
@field_serializer("created_at", when_used="json")
|
||||
def serialize_datetime(self, dt: datetime) -> str:
|
||||
@@ -356,6 +358,7 @@ class SearchController:
|
||||
transcripts.c.user_id,
|
||||
transcripts.c.room_id,
|
||||
transcripts.c.source_kind,
|
||||
transcripts.c.change_seq,
|
||||
transcripts.c.webvtt,
|
||||
transcripts.c.long_summary,
|
||||
sqlalchemy.case(
|
||||
|
||||
@@ -35,6 +35,8 @@ class SourceKind(enum.StrEnum):
|
||||
FILE = enum.auto()
|
||||
|
||||
|
||||
transcript_change_seq = sqlalchemy.Sequence("transcript_change_seq", metadata=metadata)
|
||||
|
||||
transcripts = sqlalchemy.Table(
|
||||
"transcript",
|
||||
metadata,
|
||||
@@ -89,6 +91,12 @@ transcripts = sqlalchemy.Table(
|
||||
sqlalchemy.Column("webvtt", sqlalchemy.Text),
|
||||
# Hatchet workflow run ID for resumption of failed workflows
|
||||
sqlalchemy.Column("workflow_run_id", sqlalchemy.String),
|
||||
sqlalchemy.Column(
|
||||
"change_seq",
|
||||
sqlalchemy.BigInteger,
|
||||
transcript_change_seq,
|
||||
server_default=transcript_change_seq.next_value(),
|
||||
),
|
||||
sqlalchemy.Index("idx_transcript_recording_id", "recording_id"),
|
||||
sqlalchemy.Index("idx_transcript_user_id", "user_id"),
|
||||
sqlalchemy.Index("idx_transcript_created_at", "created_at"),
|
||||
@@ -229,6 +237,7 @@ class Transcript(BaseModel):
|
||||
audio_deleted: bool | None = None
|
||||
webvtt: str | None = None
|
||||
workflow_run_id: str | None = None # Hatchet workflow run ID for resumption
|
||||
change_seq: int | None = None
|
||||
|
||||
@field_serializer("created_at", when_used="json")
|
||||
def serialize_datetime(self, dt: datetime) -> str:
|
||||
@@ -381,6 +390,7 @@ class TranscriptController:
|
||||
source_kind: SourceKind | None = None,
|
||||
room_id: str | None = None,
|
||||
search_term: str | None = None,
|
||||
change_seq_from: int | None = None,
|
||||
return_query: bool = False,
|
||||
exclude_columns: list[str] = [
|
||||
"topics",
|
||||
@@ -401,6 +411,7 @@ class TranscriptController:
|
||||
- `filter_recording`: filter out transcripts that are currently recording
|
||||
- `room_id`: filter transcripts by room ID
|
||||
- `search_term`: filter transcripts by search term
|
||||
- `change_seq_from`: filter transcripts with change_seq > this value
|
||||
"""
|
||||
|
||||
query = transcripts.select().join(
|
||||
@@ -423,6 +434,9 @@ class TranscriptController:
|
||||
if search_term:
|
||||
query = query.where(transcripts.c.title.ilike(f"%{search_term}%"))
|
||||
|
||||
if change_seq_from is not None:
|
||||
query = query.where(transcripts.c.change_seq > change_seq_from)
|
||||
|
||||
# Exclude heavy JSON columns from list queries
|
||||
transcript_columns = [
|
||||
col for col in transcripts.c if col.name not in exclude_columns
|
||||
@@ -436,9 +450,10 @@ class TranscriptController:
|
||||
)
|
||||
|
||||
if order_by is not None:
|
||||
field = getattr(transcripts.c, order_by[1:])
|
||||
if order_by.startswith("-"):
|
||||
field = field.desc()
|
||||
field = getattr(transcripts.c, order_by[1:]).desc()
|
||||
else:
|
||||
field = getattr(transcripts.c, order_by)
|
||||
query = query.order_by(field)
|
||||
|
||||
if filter_empty:
|
||||
|
||||
@@ -111,6 +111,7 @@ class GetTranscriptMinimal(BaseModel):
|
||||
room_id: str | None = None
|
||||
room_name: str | None = None
|
||||
audio_deleted: bool | None = None
|
||||
change_seq: int | None = None
|
||||
|
||||
|
||||
class TranscriptParticipantWithEmail(TranscriptParticipant):
|
||||
@@ -266,12 +267,22 @@ async def transcripts_list(
|
||||
source_kind: SourceKind | None = None,
|
||||
room_id: str | None = None,
|
||||
search_term: str | None = None,
|
||||
change_seq_from: int | None = None,
|
||||
sort_by: Literal["created_at", "change_seq"] | None = None,
|
||||
):
|
||||
if not user and not settings.PUBLIC_MODE:
|
||||
raise HTTPException(status_code=401, detail="Not authenticated")
|
||||
|
||||
user_id = user["sub"] if user else None
|
||||
|
||||
# Default behavior preserved: sort_by=None → "-created_at"
|
||||
if sort_by == "change_seq":
|
||||
order_by = "change_seq" # ASC (ascending for checkpoint-based polling)
|
||||
elif sort_by == "created_at":
|
||||
order_by = "-created_at" # DESC (newest first, same as current default)
|
||||
else:
|
||||
order_by = "-created_at" # default, backward compatible
|
||||
|
||||
return await apaginate(
|
||||
get_database(),
|
||||
await transcripts_controller.get_all(
|
||||
@@ -279,7 +290,8 @@ async def transcripts_list(
|
||||
source_kind=SourceKind(source_kind) if source_kind else None,
|
||||
room_id=room_id,
|
||||
search_term=search_term,
|
||||
order_by="-created_at",
|
||||
order_by=order_by,
|
||||
change_seq_from=change_seq_from,
|
||||
return_query=True,
|
||||
),
|
||||
)
|
||||
@@ -512,6 +524,7 @@ async def transcript_get(
|
||||
"room_id": transcript.room_id,
|
||||
"room_name": room_name,
|
||||
"audio_deleted": transcript.audio_deleted,
|
||||
"change_seq": transcript.change_seq,
|
||||
"participants": participants,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user