Files
reflector/server/reflector/tools/exportdb.py
Mathieu Virbel 9eab952c63 feat: postgresql migration and removal of sqlite in pytest (#546)
* feat: remove support of sqlite, 100% postgres

* fix: more migration and make datetime timezone aware in postgres

* fix: change how the database is obtained, and use a contextvar to have a different instance for each event loop

* test: properly use the client fixture that handles lifetime/database connection

* fix: add missing client fixture parameters to test functions

This commit fixes NameError issues where test functions were trying to use
the 'client' fixture but didn't have it as a parameter. The changes include:

1. Added 'client' parameter to test functions in:
   - test_transcripts_audio_download.py (6 functions including fixture)
   - test_transcripts_speaker.py (3 functions)
   - test_transcripts_upload.py (1 function)
   - test_transcripts_rtc_ws.py (2 functions + appserver fixture)

2. Resolved naming conflicts in test_transcripts_rtc_ws.py where both HTTP
   client and StreamClient were using variable name 'client'. StreamClient
   instances are now named 'stream_client' to avoid conflicts.

3. Added missing 'from reflector.app import app' import in rtc_ws tests.

Background: Previously implemented contextvars solution with get_database()
function resolves asyncio event loop conflicts in Celery tasks. The global
client fixture was also created to replace manual AsyncClient instances,
ensuring proper FastAPI application lifecycle management and database
connections during tests.

All tests now pass except for 2 pre-existing RTC WebSocket test failures
related to asyncpg connection issues unrelated to these fixes.

* fix: ensure tasks are correctly closed

* fix: use a separate event loop for the live server

* fix: make default settings point at postgres

* build: remove pytest-docker deps from dev, keep them only in the tests group
2025-08-14 11:40:52 -06:00

70 lines
2.5 KiB
Python

import csv
import pathlib
def _export_transcript(transcript):
    """Yield the CSV rows describing a single transcript record.

    Scalar fields produce three-column rows ``(transcript_id, key, value)``.
    Topics and ``TRANSCRIPT`` events produce five-column rows
    ``(transcript_id, kind, sub_id, key, value)`` where ``sub_id`` is the
    topic id, or the position of the event inside ``transcript.events``.

    A ``topics`` or ``events`` value of ``None`` is treated as empty so that
    one incomplete record does not abort the whole export.
    """
    tid = transcript.id

    scalar_fields = (
        "title",
        "name",
        "created_at",
        "long_summary",
        "short_summary",
        "source_language",
        "target_language",
        "user_id",
        "status",
    )
    for field in scalar_fields:
        yield tid, field, getattr(transcript, field)

    for topic in transcript.topics or []:
        topic_id = topic["id"]
        for key in ("title", "summary", "timestamp", "transcript"):
            yield tid, "topic", topic_id, key, topic[key]

    # Only TRANSCRIPT events carry text; idx is the position among *all*
    # events, not just the transcript ones.
    for idx, entry in enumerate(transcript.events or []):
        if entry["event"] != "TRANSCRIPT":
            continue
        data = entry["data"]
        yield tid, "event_transcript", idx, "text", data["text"]
        translation = data.get("translation")
        if translation is not None:
            yield tid, "event_transcript", idx, "translation", translation


async def export_db(filename: str) -> None:
    """Dump every transcript of a SQLite database into ``export.csv``.

    The database at *filename* is read through ``reflector.db`` and the
    result is written to ``export.csv`` in the current working directory,
    overwriting any existing file. A summary line is printed when done.

    Args:
        filename: Path to the SQLite database file to read.
    """
    # The settings override happens before ``reflector.db`` is imported,
    # presumably so the database layer picks up the overridden URL; keep
    # this order.
    # NOTE(review): confirm the project's database layer still accepts
    # sqlite:// URLs.
    from reflector.settings import settings

    db_path = pathlib.Path(filename).resolve()
    settings.DATABASE_URL = f"sqlite:///{db_path}"

    from reflector.db import get_database, transcripts

    database = get_database()
    await database.connect()
    try:
        rows = await database.fetch_all(transcripts.select())
    finally:
        # Always release the connection, even when the query fails.
        await database.disconnect()

    csv_output = pathlib.Path("export.csv").resolve()
    # newline="" is required by the csv module to avoid doubled line endings;
    # the context manager guarantees the file is flushed and closed.
    with open(csv_output, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["transcript_id", "key", "value", "key", "value"])
        for transcript in rows:
            writer.writerows(_export_transcript(transcript))

    print(f"Exported {len(rows)} transcripts to {csv_output}")
if __name__ == "__main__":
    # Script entry point: read the database path from the command line and
    # drive the async export to completion.
    import argparse
    import asyncio

    cli = argparse.ArgumentParser()
    cli.add_argument("database", help="Sqlite Database file")
    cli_args = cli.parse_args()

    asyncio.run(export_db(cli_args.database))