test: update test fixtures to use @with_session decorator

- Replace manual session management in test fixtures with @with_session decorator
- Simplify async test fixtures by removing explicit session handling
- Update dependencies in pyproject.toml and uv.lock
This commit is contained in:
2025-09-23 12:09:26 -06:00
parent 8ad1270229
commit 27b3b9cdee
14 changed files with 1776 additions and 1837 deletions

View File

@@ -3,17 +3,15 @@ import os
import sys
import pytest
import pytest_asyncio
from sqlalchemy.pool import NullPool
@pytest.fixture(scope="session")
def event_loop():
"""Session-scoped event loop."""
"""Create an instance of the default event loop for the test session."""
if sys.platform.startswith("win") and sys.version_info[:2] >= (3, 8):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
loop = asyncio.get_event_loop_policy().new_event_loop()
loop = asyncio.new_event_loop()
yield loop
loop.close()
@@ -53,65 +51,42 @@ def docker_ip():
return "127.0.0.1"
# Only register docker_services dependent fixtures if docker plugin is available
try:
import pytest_docker # noqa: F401
@pytest.fixture(scope="session")
def postgres_service(docker_ip, docker_services):
"""Ensure that PostgreSQL service is up and responsive."""
port = docker_services.port_for("postgres_test", 5432)
@pytest.fixture(scope="session")
def postgres_service(docker_ip, docker_services):
"""Ensure that PostgreSQL service is up and responsive."""
port = docker_services.port_for("postgres_test", 5432)
def is_responsive():
try:
import psycopg2
def is_responsive():
try:
import psycopg2
conn = psycopg2.connect(
host=docker_ip,
port=port,
dbname="reflector_test",
user="test_user",
password="test_password",
)
conn.close()
return True
except Exception:
return False
conn = psycopg2.connect(
host=docker_ip,
port=port,
dbname="reflector_test",
user="test_user",
password="test_password",
)
conn.close()
return True
except Exception:
return False
docker_services.wait_until_responsive(timeout=30.0, pause=0.1, check=is_responsive)
docker_services.wait_until_responsive(
timeout=30.0, pause=0.1, check=is_responsive
)
# Return connection parameters
return {
"host": docker_ip,
"port": port,
"database": "reflector_test",
"user": "test_user",
"password": "test_password",
}
except ImportError:
# Docker plugin not available, provide a dummy fixture
@pytest.fixture(scope="session")
def postgres_service(docker_ip):
"""Dummy postgres service when docker plugin is not available"""
return {
"host": docker_ip,
"port": 15432, # Default test postgres port
"database": "reflector_test",
"user": "test_user",
"password": "test_password",
}
# Return connection parameters
return {
"host": docker_ip,
"port": port,
"database": "reflector_test",
"user": "test_user",
"password": "test_password",
}
@pytest_asyncio.fixture(scope="session", autouse=True)
async def setup_database(postgres_service):
"""Setup database and run migrations"""
from sqlalchemy.ext.asyncio import create_async_engine
from reflector.db import Base
# Build database URL from connection params
@pytest.fixture(scope="session")
def _database_url(postgres_service):
"""Provide database URL for pytest-async-sqlalchemy."""
db_config = postgres_service
DATABASE_URL = (
f"postgresql+asyncpg://{db_config['user']}:{db_config['password']}"
@@ -123,70 +98,15 @@ async def setup_database(postgres_service):
settings.DATABASE_URL = DATABASE_URL
engine = create_async_engine(
DATABASE_URL,
echo=False,
poolclass=NullPool,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
yield
await engine.dispose()
return DATABASE_URL
@pytest_asyncio.fixture
async def session(setup_database):
"""Provide a transactional database session for tests"""
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
@pytest.fixture(scope="session")
def init_database():
"""Provide database initialization for pytest-async-sqlalchemy."""
from reflector.db import Base
from reflector.settings import settings
engine = create_async_engine(
settings.DATABASE_URL,
echo=False,
poolclass=NullPool,
)
async_session_maker = async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
async with async_session_maker() as session:
# Start a savepoint instead of a transaction to handle nested commits
await session.begin()
# Override commit to use flush instead in tests
original_commit = session.commit
async def flush_instead_of_commit():
await session.flush()
session.commit = flush_instead_of_commit
try:
yield session
await session.rollback()
except Exception:
await session.rollback()
raise
finally:
session.commit = original_commit # Restore original commit
await session.close()
# Properly dispose of the engine to close all connections
await engine.dispose()
return Base.metadata.create_all
@pytest.fixture