Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2026-03-22 07:06:47 +00:00)
feat: add custom S3 endpoint support + Garage standalone storage
Add TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL setting to enable S3-compatible backends (Garage, MinIO). When set, it uses path-style addressing and routes all requests to the custom endpoint. When unset, AWS behavior is unchanged.

- AwsStorage: accept aws_endpoint_url, pass it to all 6 session.client() calls, configure path-style addressing and base_url
- Fix 4 direct AwsStorage constructions in Hatchet workflows to pass endpoint_url (they would have silently targeted the wrong endpoint)
- Standalone: add Garage service to docker-compose.standalone.yml; setup script initializes layout/bucket/key and writes credentials
- Fix compose_cmd() bug: the Mac path was missing the standalone yml
- Add garage.toml template with runtime secret generation via openssl
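For orientation, this is the shape of the new constructor path when aimed at an S3-compatible backend. A minimal sketch; the bucket name, region label, credentials, and endpoint below are placeholders, not values from the setup script:

from reflector.storage.storage_aws import AwsStorage

# Placeholder values; Garage key ids conventionally start with "GK".
storage = AwsStorage(
    aws_bucket_name="reflector-media",
    aws_region="garage",
    aws_access_key_id="GK...",
    aws_secret_access_key="...",
    aws_endpoint_url="http://localhost:3900",  # None (or unset) keeps stock AWS behavior
)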
@@ -171,11 +171,13 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
 
 def _spawn_storage():
     """Create fresh storage instance."""
+    # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
     return AwsStorage(
         aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
         aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
         aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+        aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
     )
 
 
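The TODO above names get_transcripts_storage() as the intended factory. A sketch of the end state once those TODOs land; the import path and the zero-argument signature are assumptions, not code from this commit:

# Hypothetical follow-up: one factory, no duplicated kwargs at call sites.
from reflector.storage import get_transcripts_storage  # import path assumed

def _spawn_storage():
    """Create fresh storage instance."""
    return get_transcripts_storage()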
@@ -49,11 +49,13 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
     from reflector.settings import settings  # noqa: PLC0415
     from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
 
+    # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
     storage = AwsStorage(
         aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
         aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
         aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+        aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
     )
 
     source_url = await storage.get_file_url(
@@ -60,6 +60,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
 
     try:
         # Create fresh storage instance to avoid aioboto3 fork issues
+        # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
         from reflector.settings import settings  # noqa: PLC0415
         from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
 
@@ -68,6 +69,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
             aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
             aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
             aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+            aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
         )
 
         source_url = await storage.get_file_url(
@@ -159,6 +161,7 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
         raise ValueError("Missing padded_key from pad_track")
 
     # Presign URL on demand (avoids stale URLs on workflow replay)
+    # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
     from reflector.settings import settings  # noqa: PLC0415
     from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
 
@@ -167,6 +170,7 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
         aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
         aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+        aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
     )
 
     audio_url = await storage.get_file_url(
@@ -49,6 +49,7 @@ class Settings(BaseSettings):
     TRANSCRIPT_STORAGE_AWS_REGION: str = "us-east-1"
     TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID: str | None = None
     TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
+    TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL: str | None = None
 
     # Platform-specific recording storage (follows {PREFIX}_STORAGE_AWS_{CREDENTIAL} pattern)
     # Whereby storage configuration
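Since Settings extends pydantic's BaseSettings, the new field should be driven by an environment variable of the same name (pydantic's default field-to-environment mapping, assumed here). A placeholder sketch:

from reflector.settings import settings

# With TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL=http://garage:3900 exported before
# startup (placeholder value), the custom-endpoint path activates; left unset,
# the field stays None and AWS behavior is unchanged.
print(settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL)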
@@ -53,6 +53,7 @@ class AwsStorage(Storage):
         aws_access_key_id: str | None = None,
         aws_secret_access_key: str | None = None,
         aws_role_arn: str | None = None,
+        aws_endpoint_url: str | None = None,
     ):
         if not aws_bucket_name:
             raise ValueError("Storage `aws_storage` require `aws_bucket_name`")
@@ -73,17 +74,26 @@ class AwsStorage(Storage):
         self._access_key_id = aws_access_key_id
         self._secret_access_key = aws_secret_access_key
         self._role_arn = aws_role_arn
+        self._endpoint_url = aws_endpoint_url
 
         self.aws_folder = ""
         if "/" in aws_bucket_name:
             self._bucket_name, self.aws_folder = aws_bucket_name.split("/", 1)
-        self.boto_config = Config(retries={"max_attempts": 3, "mode": "adaptive"})
+
+        config_kwargs: dict = {"retries": {"max_attempts": 3, "mode": "adaptive"}}
+        if aws_endpoint_url:
+            config_kwargs["s3"] = {"addressing_style": "path"}
+        self.boto_config = Config(**config_kwargs)
 
         self.session = aioboto3.Session(
             aws_access_key_id=aws_access_key_id,
             aws_secret_access_key=aws_secret_access_key,
             region_name=aws_region,
         )
-        self.base_url = f"https://{self._bucket_name}.s3.amazonaws.com/"
+        if aws_endpoint_url:
+            self.base_url = f"{aws_endpoint_url}/{self._bucket_name}/"
+        else:
+            self.base_url = f"https://{self._bucket_name}.s3.amazonaws.com/"
 
     # Implement credential properties
     @property
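Two details here are easy to miss. The Config kwargs are assembled in a dict so the retry settings survive the addition of the s3 block (the new test asserts exactly this merge), and path-style addressing is the URL shape most self-hosted S3 implementations expect, since they rarely serve one DNS subdomain per bucket. A standalone illustration using only botocore; the bucket and host in the comments are placeholders:

from botocore.config import Config

# Virtual-hosted style (AWS default): https://reflector-media.s3.amazonaws.com/key
# Path style (custom endpoint):       http://garage:3900/reflector-media/key
cfg = Config(
    retries={"max_attempts": 3, "mode": "adaptive"},  # preserved from the base config
    s3={"addressing_style": "path"},  # added only when an endpoint is configured
)
assert cfg.retries["max_attempts"] == 3
assert cfg.s3["addressing_style"] == "path"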
@@ -139,7 +149,9 @@ class AwsStorage(Storage):
         s3filename = f"{folder}/{filename}" if folder else filename
         logger.info(f"Uploading {filename} to S3 {actual_bucket}/{folder}")
 
-        async with self.session.client("s3", config=self.boto_config) as client:
+        async with self.session.client(
+            "s3", config=self.boto_config, endpoint_url=self._endpoint_url
+        ) as client:
             if isinstance(data, bytes):
                 await client.put_object(Bucket=actual_bucket, Key=s3filename, Body=data)
             else:
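Note that self._endpoint_url is passed unconditionally, even when it is None. That is safe: boto3, and aioboto3 which wraps it, treat endpoint_url=None the same as omitting the argument and fall back to default endpoint resolution, so AWS-only deployments are untouched. A self-contained sketch with a placeholder bucket:

import asyncio

import aioboto3

async def check(endpoint_url: str | None) -> None:
    session = aioboto3.Session()
    # endpoint_url=None is equivalent to omitting the argument entirely.
    async with session.client("s3", endpoint_url=endpoint_url) as client:
        await client.head_bucket(Bucket="reflector-media")  # placeholder bucket

asyncio.run(check(None))  # stock AWS resolution
# asyncio.run(check("http://garage:3900"))  # custom endpoint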
@@ -162,7 +174,9 @@ class AwsStorage(Storage):
         actual_bucket = bucket or self._bucket_name
         folder = self.aws_folder
         s3filename = f"{folder}/{filename}" if folder else filename
-        async with self.session.client("s3", config=self.boto_config) as client:
+        async with self.session.client(
+            "s3", config=self.boto_config, endpoint_url=self._endpoint_url
+        ) as client:
             presigned_url = await client.generate_presigned_url(
                 operation,
                 Params={"Bucket": actual_bucket, "Key": s3filename},
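Presigning is the call most sensitive to the endpoint: the SigV4 signature covers the request host and path, so a URL presigned against one endpoint will not validate against another. That is why the four direct AwsStorage constructions in the Hatchet workflows had to start passing endpoint_url as well. A standalone sketch; the credentials, region label, endpoint, bucket, and key are placeholders:

import asyncio

import aioboto3
from botocore.config import Config

async def presign() -> str:
    session = aioboto3.Session(
        aws_access_key_id="GK...",
        aws_secret_access_key="...",
        region_name="garage",
    )
    cfg = Config(s3={"addressing_style": "path"})
    async with session.client(
        "s3", config=cfg, endpoint_url="http://garage:3900"
    ) as client:
        # Yields roughly:
        # http://garage:3900/reflector-media/audio/track.wav?X-Amz-Signature=...
        return await client.generate_presigned_url(
            "get_object",
            Params={"Bucket": "reflector-media", "Key": "audio/track.wav"},
            ExpiresIn=3600,
        )

print(asyncio.run(presign()))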
@@ -177,7 +191,9 @@ class AwsStorage(Storage):
         folder = self.aws_folder
         logger.info(f"Deleting {filename} from S3 {actual_bucket}/{folder}")
         s3filename = f"{folder}/{filename}" if folder else filename
-        async with self.session.client("s3", config=self.boto_config) as client:
+        async with self.session.client(
+            "s3", config=self.boto_config, endpoint_url=self._endpoint_url
+        ) as client:
             await client.delete_object(Bucket=actual_bucket, Key=s3filename)
 
     @handle_s3_client_errors("download")
@@ -186,7 +202,9 @@ class AwsStorage(Storage):
         folder = self.aws_folder
         logger.info(f"Downloading {filename} from S3 {actual_bucket}/{folder}")
         s3filename = f"{folder}/{filename}" if folder else filename
-        async with self.session.client("s3", config=self.boto_config) as client:
+        async with self.session.client(
+            "s3", config=self.boto_config, endpoint_url=self._endpoint_url
+        ) as client:
             response = await client.get_object(Bucket=actual_bucket, Key=s3filename)
             return await response["Body"].read()
 
@@ -201,7 +219,9 @@ class AwsStorage(Storage):
         logger.info(f"Listing objects from S3 {actual_bucket} with prefix '{s3prefix}'")
 
         keys = []
-        async with self.session.client("s3", config=self.boto_config) as client:
+        async with self.session.client(
+            "s3", config=self.boto_config, endpoint_url=self._endpoint_url
+        ) as client:
             paginator = client.get_paginator("list_objects_v2")
             async for page in paginator.paginate(Bucket=actual_bucket, Prefix=s3prefix):
                 if "Contents" in page:
@@ -227,7 +247,9 @@ class AwsStorage(Storage):
         folder = self.aws_folder
         logger.info(f"Streaming {filename} from S3 {actual_bucket}/{folder}")
         s3filename = f"{folder}/{filename}" if folder else filename
-        async with self.session.client("s3", config=self.boto_config) as client:
+        async with self.session.client(
+            "s3", config=self.boto_config, endpoint_url=self._endpoint_url
+        ) as client:
             await client.download_fileobj(
                 Bucket=actual_bucket, Key=s3filename, Fileobj=fileobj
             )
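With the endpoint now threaded through six identical call sites, a small private helper would keep them from drifting apart. A possible follow-up refactor, not part of this commit:

def _s3_client(self):
    # Single place that knows about config and endpoint; call sites become
    # `async with self._s3_client() as client:`.
    return self.session.client(
        "s3", config=self.boto_config, endpoint_url=self._endpoint_url
    )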
@@ -319,3 +319,51 @@ def test_aws_storage_constructor_rejects_mixed_auth():
             aws_secret_access_key="test-secret",
             aws_role_arn="arn:aws:iam::123456789012:role/test-role",
         )
+
+
+@pytest.mark.asyncio
+async def test_aws_storage_custom_endpoint_url():
+    """Test that custom endpoint_url configures path-style addressing and passes endpoint to client."""
+    storage = AwsStorage(
+        aws_bucket_name="reflector-media",
+        aws_region="garage",
+        aws_access_key_id="GKtest",
+        aws_secret_access_key="secret",
+        aws_endpoint_url="http://garage:3900",
+    )
+    assert storage._endpoint_url == "http://garage:3900"
+    assert storage.boto_config.s3["addressing_style"] == "path"
+    assert storage.base_url == "http://garage:3900/reflector-media/"
+    # retries config preserved (merge, not replace)
+    assert storage.boto_config.retries["max_attempts"] == 3
+
+    mock_client = AsyncMock()
+    mock_client.put_object = AsyncMock()
+    mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+    mock_client.__aexit__ = AsyncMock(return_value=None)
+    mock_client.generate_presigned_url = AsyncMock(
+        return_value="http://garage:3900/reflector-media/test.txt"
+    )
+
+    with patch.object(
+        storage.session, "client", return_value=mock_client
+    ) as mock_session_client:
+        await storage.put_file("test.txt", b"data")
+        mock_session_client.assert_called_with(
+            "s3", config=storage.boto_config, endpoint_url="http://garage:3900"
+        )
+
+
+@pytest.mark.asyncio
+async def test_aws_storage_none_endpoint_url():
+    """Test that None endpoint preserves current AWS behavior."""
+    storage = AwsStorage(
+        aws_bucket_name="reflector-bucket",
+        aws_region="us-east-1",
+        aws_access_key_id="AKIAtest",
+        aws_secret_access_key="secret",
+    )
+    assert storage._endpoint_url is None
+    assert storage.base_url == "https://reflector-bucket.s3.amazonaws.com/"
+    # No s3 addressing_style override — boto_config should only have retries
+    assert not hasattr(storage.boto_config, "s3") or storage.boto_config.s3 is None