
Sessions Audio Presigned PUT Migration — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Migrate POST /sessions/{id}/audio?windowIndex=N from a synchronous body-bearing handler to a presigned-PUT URL grant + S3-event-driven completion handler. Removes the 29s API Gateway cap as a failure class for multi-MB WAV chunks.

Architecture: SessionAudioFunction (renamed conceptually; the AWS resource stays AudioPostFunction) becomes a URL minter: it accepts a {sizeBytes} JSON body and returns a presigned PUT URL with strict ContentType and ContentLength bindings. The client PUTs the raw audio bytes directly to S3, bypassing API Gateway. A new AudioUploadCompleteFunction is invoked by an S3 ObjectCreated event on sessions/*/window_*/audio.wav; it performs the DynamoDB ADD completedAudioWindows update and the _claim_and_trigger ingest invoke. Three Lambdas are involved (URL minter, completion handler, ingest), of which only the completion handler is new. No new HTTP endpoints from the client's point of view.
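
The two-step flow can be sketched from the client's side. This is a minimal illustration in Python (the real client is TypeScript); API_BASE, build_grant_request, and build_put_request are hypothetical names, and the sketch only builds the requests without sending them.

```python
import json
import urllib.request

API_BASE = "https://api.example.invalid"  # hypothetical base URL


def build_grant_request(
    session_id: str, window_index: int, size_bytes: int
) -> urllib.request.Request:
    """Step 1: ask the URL minter for a presigned PUT URL (JSON body, no audio)."""
    url = f"{API_BASE}/sessions/{session_id}/audio?windowIndex={window_index}"
    body = json.dumps({"sizeBytes": size_bytes}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST", headers={"Content-Type": "application/json"}
    )


def build_put_request(presigned_url: str, audio: bytes) -> urllib.request.Request:
    """Step 2: PUT the raw bytes to S3 with the headers the URL was signed for."""
    return urllib.request.Request(
        presigned_url,
        data=audio,
        method="PUT",
        headers={"Content-Type": "audio/wav", "Content-Length": str(len(audio))},
    )
```

The sizeBytes sent in step 1 should equal len(audio) in step 2, since the grant binds the URL to that exact length.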

Tech Stack: Python 3.12, AWS SAM, boto3, moto for unit/integration tests, pytest, React Native + TypeScript + Jest.

Spec: docs/plans/2026-05-19-sessions-audio-presigned.md — read before starting.


Conventions

  • All server tests live in main/server/tests/unit/ and main/server/tests/integration/. The tests/worker_utils.py:load_worker_module helper imports handler code by file path.
  • Each task is one red-green-refactor cycle ending in a commit. Tests come first.
  • Commit messages follow Conventional Commits (feat(...), test(...), chore(...), etc.) — enforced by pre-commit hook.
  • Server commands assume cwd is main/server. Client commands assume cwd is main/app.
  • moto is already in main/server/tests/requirements.txt. If a needed mock isn't available, fall back to unittest.mock.patch.
  • Dates: commit messages and the implementation period use 2026-05-20 (today) onward.
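
For orientation, a helper that imports handler code by file path might look like the sketch below. This is an assumption about how such a helper could work, not the contents of tests/worker_utils.py; load_module_from_path is a hypothetical name.

```python
import importlib.util
from pathlib import Path
from types import ModuleType


def load_module_from_path(name: str, path: Path) -> ModuleType:
    """Import a Python file by path under the given module name (hypothetical helper)."""
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    # Runs the file's top-level code on every call, so each load is fresh.
    spec.loader.exec_module(module)
    return module
```

Because the file's top-level code runs at load time, module-level constants such as those read from os.environ are fixed when the module is loaded; patching the environment afterwards does not change them.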

File structure

Server (Python) — new + modified:

| Path | Action | Responsibility |
| --- | --- | --- |
| main/server/api/sessions/audio/app.py | Rewrite | URL minter: validate input, verify session in DDB, generate presigned PUT URL |
| main/server/events/__init__.py | Create | Empty; marks new package |
| main/server/events/audio_upload_complete/__init__.py | Create | Empty; marks new package |
| main/server/events/audio_upload_complete/app.py | Create | S3 event handler: parse key, DDB ADD, claim+trigger ingest |
| main/server/template.yaml | Modify | Rewrite AudioPostFunction env/policies; add AudioUploadCompleteFunction + AudioUploadCompleteDLQ + S3 NotificationConfiguration |
| main/server/tests/unit/test_audio_url_minter.py | Create | Unit tests for rewritten SessionAudioFunction |
| main/server/tests/unit/test_audio_upload_complete.py | Create | Unit tests for new event handler |
| main/server/tests/integration/test_audio_presigned_flow.py | Create | End-to-end with moto: URL grant → S3 PUT → event → DDB → ingest invoke |
| main/server/tests/unit/test_audio_chunk_api.py | Delete | Superseded by test_audio_url_minter.py; old behavior gone |

Client (TypeScript) — modified:

| Path | Action | Responsibility |
| --- | --- | --- |
| main/app/lib/capture-session.ts | Modify | Rewrite audio branch of uploadItem (lines 103-112): 2-step presigned flow |
| main/app/__tests__/capture-session-audio-upload.test.ts | Create | Jest tests for new audio upload flow |

Constants (used across tasks):

MAX_AUDIO_BYTES = 10 * 1024 * 1024  # 10 MB
URL_EXPIRES_SECONDS = 300            # 5 min
S3_KEY_REGEX = r"^sessions/(?P<session_id>[^/]+)/window_(?P<window_index>\d{3})/audio\.wav$"
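
The key format and the regex have to agree, since one Lambda writes the key and another parses it. The sketch below round-trips a key through both; build_key and parse_key are hypothetical helper names used only for illustration.

```python
from __future__ import annotations

import re

S3_KEY_REGEX = r"^sessions/(?P<session_id>[^/]+)/window_(?P<window_index>\d{3})/audio\.wav$"
_KEY_RE = re.compile(S3_KEY_REGEX)


def build_key(session_id: str, window_index: int) -> str:
    """Format the key the way the URL minter does (zero-padded to 3 digits)."""
    return f"sessions/{session_id}/window_{window_index:03d}/audio.wav"


def parse_key(key: str) -> tuple[str, int] | None:
    """Parse the key the way the completion handler does; None if it doesn't match."""
    match = _KEY_RE.match(key)
    if match is None:
        return None
    return match.group("session_id"), int(match.group("window_index"))
```

Note that `:03d` pads but does not truncate, so an index of 1000 or more produces a four-digit segment that the `\d{3}` pattern rejects. Indices therefore round-trip only in the range 0 to 999.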

Task 1: Tests for URL minter happy path + URL bindings

Files:
  • Test: main/server/tests/unit/test_audio_url_minter.py (create)

  • [ ] Step 1: Write the failing test file
# main/server/tests/unit/test_audio_url_minter.py
"""Unit tests for the rewritten SessionAudioFunction (URL minter).

Each test cites a row in docs/plans/2026-05-19-sessions-audio-presigned.md
either from Happy Path step 2 or from the Failure Modes table.
"""

from __future__ import annotations

import json
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from ..worker_utils import load_worker_module

_APP_PATH = (
    Path(__file__).resolve().parents[2] / "api" / "sessions" / "audio" / "app.py"
)


def _load_module():
    return load_worker_module("sessions_audio_app", _APP_PATH)


def _event(session_id: str = "sess-1", window_index: int | str | None = 7, body: dict | None = None) -> dict:
    event: dict = {
        "pathParameters": {"sessionId": session_id} if session_id else {},
    }
    if window_index is not None:
        event["queryStringParameters"] = {"windowIndex": str(window_index)}
    else:
        event["queryStringParameters"] = None
    event["body"] = json.dumps(body) if body is not None else json.dumps({"sizeBytes": 1_923_456})
    return event


def _ddb_session_exists(session_id: str = "sess-1") -> MagicMock:
    """Mock DynamoDB table whose get_item returns a session row."""
    mock_table = MagicMock()
    mock_table.get_item.return_value = {"Item": {"sessionId": session_id}}
    return mock_table


class TestURLMinterHappyPath:
    @staticmethod
    def test_returns_presigned_url_with_correct_s3_key_for_valid_request():
        module = _load_module()
        mock_s3 = MagicMock()
        mock_s3.generate_presigned_url.return_value = "https://s3.example/presigned"
        with (
            patch.object(module, "s3", mock_s3),
            patch.object(module, "table", _ddb_session_exists("sess-1")),
        ):
            result = module.lambda_handler(_event("sess-1", 7, {"sizeBytes": 1_923_456}), None)

        assert result["statusCode"] == 200
        body = json.loads(result["body"])
        assert body["s3Key"] == "sessions/sess-1/window_007/audio.wav"
        assert body["windowIndex"] == 7
        assert body["url"] == "https://s3.example/presigned"
        assert body["expiresIn"] == 300

    @staticmethod
    def test_url_binds_content_type_audio_wav():
        module = _load_module()
        mock_s3 = MagicMock()
        mock_s3.generate_presigned_url.return_value = "https://s3.example/presigned"
        with (
            patch.object(module, "s3", mock_s3),
            patch.object(module, "table", _ddb_session_exists("sess-1")),
        ):
            module.lambda_handler(_event("sess-1", 7, {"sizeBytes": 1000}), None)

        kwargs = mock_s3.generate_presigned_url.call_args.kwargs
        assert kwargs["Params"]["ContentType"] == "audio/wav"

    @staticmethod
    def test_url_binds_exact_content_length_from_size_bytes():
        module = _load_module()
        mock_s3 = MagicMock()
        mock_s3.generate_presigned_url.return_value = "u"
        with (
            patch.object(module, "s3", mock_s3),
            patch.object(module, "table", _ddb_session_exists("sess-1")),
        ):
            module.lambda_handler(_event("sess-1", 7, {"sizeBytes": 1_923_456}), None)

        kwargs = mock_s3.generate_presigned_url.call_args.kwargs
        assert kwargs["Params"]["ContentLength"] == 1_923_456

    @staticmethod
    def test_url_expires_in_300_seconds():
        module = _load_module()
        mock_s3 = MagicMock()
        mock_s3.generate_presigned_url.return_value = "u"
        with (
            patch.object(module, "s3", mock_s3),
            patch.object(module, "table", _ddb_session_exists("sess-1")),
        ):
            module.lambda_handler(_event(), None)

        kwargs = mock_s3.generate_presigned_url.call_args.kwargs
        assert kwargs["ExpiresIn"] == 300
  • [ ] Step 2: Run tests, confirm they fail (no production code yet)

Run: cd main/server && pytest tests/unit/test_audio_url_minter.py -v

Expected: All 4 tests FAIL, because the current app.py returns the legacy response shape, not {url, s3Key, windowIndex, expiresIn}.

  • [ ] Step 3: Implement minimal URL minter in app.py

Replace the entire contents of main/server/api/sessions/audio/app.py:

# main/server/api/sessions/audio/app.py
"""Audio Upload — issues a presigned PUT URL for per-window WAV chunks.

This handler does not accept the audio body. The client uses the returned
URL to PUT bytes directly to S3, bypassing the 29s API Gateway cap.
Completion is handled by AudioUploadCompleteFunction via S3 ObjectCreated.
"""

from __future__ import annotations

import json
import os

import boto3
from shared.logger import create_logger

BUCKET = os.environ.get("BUCKET_NAME", "encache-raw-memory")
SESSIONS_TABLE = os.environ.get("SESSIONS_TABLE_NAME", "encache-sessions")
URL_EXPIRES_SECONDS = 300
MAX_AUDIO_BYTES = 10 * 1024 * 1024  # 10 MB

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(SESSIONS_TABLE)

logger = create_logger({"flow": "audio_url_grant"})


def _bad_request(message: str) -> dict:
    return {
        "statusCode": 400,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"error": message}),
    }


def _not_found(message: str) -> dict:
    return {
        "statusCode": 404,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"error": message}),
    }


def lambda_handler(event, _context):
    session_id = (event.get("pathParameters") or {}).get("sessionId", "")
    if not session_id:
        return _bad_request("sessionId required")

    query_params = event.get("queryStringParameters") or {}
    window_index_str = query_params.get("windowIndex")
    if window_index_str is None:
        return _bad_request("windowIndex required")
    try:
        window_index = int(window_index_str)
    except (ValueError, TypeError):
        return _bad_request("windowIndex must be an integer")
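    if window_index < 0:
        return _bad_request("windowIndex must be non-negative")
    # Keys are zero-padded to three digits and the completion handler's
    # regex expects exactly three, so larger indices would never match.
    if window_index > 999:
        return _bad_request("windowIndex must be at most 999")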

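    raw_body = event.get("body") or ""
    try:
        body = json.loads(raw_body) if raw_body else {}
    except json.JSONDecodeError:
        return _bad_request("body must be valid JSON")
    # Valid JSON can still be a list or scalar, which has no .get().
    if not isinstance(body, dict):
        return _bad_request("body must be a JSON object")
    size_bytes = body.get("sizeBytes")
    # bool is a subclass of int, so reject true/false explicitly.
    if isinstance(size_bytes, bool) or not isinstance(size_bytes, int) or size_bytes <= 0:
        return _bad_request("sizeBytes must be a positive integer")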
    if size_bytes > MAX_AUDIO_BYTES:
        return _bad_request(f"sizeBytes exceeds cap of {MAX_AUDIO_BYTES}")

    resp = table.get_item(Key={"sessionId": session_id}, ProjectionExpression="sessionId")
    if "Item" not in resp:
        return _not_found("session not found")

    s3_key = f"sessions/{session_id}/window_{window_index:03d}/audio.wav"
    url = s3.generate_presigned_url(
        "put_object",
        Params={
            "Bucket": BUCKET,
            "Key": s3_key,
            "ContentType": "audio/wav",
            "ContentLength": size_bytes,
        },
        ExpiresIn=URL_EXPIRES_SECONDS,
    )

    logger(
        {
            "step": "audio_url_granted",
            "additional": {
                "session_id": session_id,
                "window_index": window_index,
                "size_bytes": size_bytes,
                "s3_key": s3_key,
            },
        }
    )

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(
            {
                "url": url,
                "s3Key": s3_key,
                "windowIndex": window_index,
                "expiresIn": URL_EXPIRES_SECONDS,
            }
        ),
    }
  • [ ] Step 4: Run tests, confirm they pass

Run: cd main/server && pytest tests/unit/test_audio_url_minter.py -v

Expected: 4 PASS.

  • [ ] Step 5: Commit
git add main/server/api/sessions/audio/app.py main/server/tests/unit/test_audio_url_minter.py
git commit -m "feat(audio): rewrite SessionAudioFunction as presigned URL minter

Returns {url, s3Key, windowIndex, expiresIn} with strict
ContentType=audio/wav and exact ContentLength binding. Body is no
longer accepted through API Gateway. Completion handled separately by
AudioUploadCompleteFunction via S3 ObjectCreated event.

Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"

Task 2: Tests for URL minter input validation (400 cases)

Files:
  • Test: main/server/tests/unit/test_audio_url_minter.py (append)

  • [ ] Step 1: Append the test class (expected to pass against the Task 1 handler)

Add this class to test_audio_url_minter.py:

class TestURLMinterInputValidation:
    @staticmethod
    def test_returns_400_when_session_id_path_param_missing():
        module = _load_module()
        with patch.object(module, "table", MagicMock()):
            event = {"pathParameters": {}, "queryStringParameters": {"windowIndex": "0"}, "body": json.dumps({"sizeBytes": 1})}
            result = module.lambda_handler(event, None)
        assert result["statusCode"] == 400

    @staticmethod
    def test_returns_400_when_window_index_missing():
        module = _load_module()
        with patch.object(module, "table", _ddb_session_exists()):
            event = _event(window_index=None, body={"sizeBytes": 1})
            result = module.lambda_handler(event, None)
        assert result["statusCode"] == 400

    @staticmethod
    def test_returns_400_when_window_index_negative():
        module = _load_module()
        with patch.object(module, "table", _ddb_session_exists()):
            result = module.lambda_handler(_event(window_index=-1, body={"sizeBytes": 1}), None)
        assert result["statusCode"] == 400

    @staticmethod
    def test_returns_400_when_window_index_non_integer():
        module = _load_module()
        with patch.object(module, "table", _ddb_session_exists()):
            result = module.lambda_handler(_event(window_index="abc", body={"sizeBytes": 1}), None)
        assert result["statusCode"] == 400

    @staticmethod
    def test_returns_400_when_size_bytes_missing():
        module = _load_module()
        with patch.object(module, "table", _ddb_session_exists()):
            result = module.lambda_handler(_event(body={}), None)
        assert result["statusCode"] == 400

    @staticmethod
    def test_returns_400_when_size_bytes_zero_or_negative():
        module = _load_module()
        with patch.object(module, "table", _ddb_session_exists()):
            assert module.lambda_handler(_event(body={"sizeBytes": 0}), None)["statusCode"] == 400
            assert module.lambda_handler(_event(body={"sizeBytes": -1}), None)["statusCode"] == 400

    @staticmethod
    def test_returns_400_when_size_bytes_exceeds_10mb_cap():
        module = _load_module()
        with patch.object(module, "table", _ddb_session_exists()):
            result = module.lambda_handler(_event(body={"sizeBytes": 10 * 1024 * 1024 + 1}), None)
        assert result["statusCode"] == 400
  • [ ] Step 2: Run tests, confirm they pass

Run: cd main/server && pytest tests/unit/test_audio_url_minter.py::TestURLMinterInputValidation -v

Expected: 7 PASS (handler already implements the validation from Task 1).

  • [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_url_minter.py
git commit -m "test(audio): cover URL minter input validation 400 cases"

Task 3: Tests for URL minter session-not-found (404)

Files:
  • Test: main/server/tests/unit/test_audio_url_minter.py (append)

  • [ ] Step 1: Append the test class (expected to pass against the Task 1 handler)
class TestURLMinterSessionLookup:
    @staticmethod
    def test_returns_404_when_session_id_not_in_dynamodb():
        module = _load_module()
        mock_table = MagicMock()
        mock_table.get_item.return_value = {}  # no Item key = not found
        with patch.object(module, "table", mock_table):
            result = module.lambda_handler(_event("unknown-sess", 0, {"sizeBytes": 1}), None)
        assert result["statusCode"] == 404
  • [ ] Step 2: Run, confirm pass

Run: cd main/server && pytest tests/unit/test_audio_url_minter.py::TestURLMinterSessionLookup -v

Expected: PASS (handler already returns 404 on missing Item).

  • [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_url_minter.py
git commit -m "test(audio): cover URL minter 404 on unknown session"

Task 4: Scaffold AudioUploadCompleteFunction + S3 key parser

Files:
  • Create: main/server/events/__init__.py (empty)
  • Create: main/server/events/audio_upload_complete/__init__.py (empty)
  • Create: main/server/events/audio_upload_complete/app.py
  • Create: main/server/tests/unit/test_audio_upload_complete.py

  • [ ] Step 1: Create empty package markers
mkdir -p main/server/events/audio_upload_complete
touch main/server/events/__init__.py
touch main/server/events/audio_upload_complete/__init__.py
  • [ ] Step 2: Write the failing test file
# main/server/tests/unit/test_audio_upload_complete.py
"""Unit tests for AudioUploadCompleteFunction (S3 event handler).

Each test cites a Failure Modes table row or Happy Path step in
docs/plans/2026-05-19-sessions-audio-presigned.md.
"""

from __future__ import annotations

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from ..worker_utils import load_worker_module

_APP_PATH = (
    Path(__file__).resolve().parents[2]
    / "events"
    / "audio_upload_complete"
    / "app.py"
)


def _load_module():
    return load_worker_module("audio_upload_complete_app", _APP_PATH)


def _s3_event(key: str = "sessions/sess-1/window_007/audio.wav") -> dict:
    return {
        "Records": [
            {
                "s3": {
                    "bucket": {"name": "encache-raw-memory"},
                    "object": {"key": key},
                }
            }
        ]
    }


class TestS3KeyParser:
    @staticmethod
    def test_parses_valid_key():
        module = _load_module()
        result = module._parse_key("sessions/sess-1/window_007/audio.wav")
        assert result == ("sess-1", 7)

    @staticmethod
    def test_returns_none_for_malformed_key():
        module = _load_module()
        for bad in [
            "wrong/prefix/window_000/audio.wav",
            "sessions/sess-1/audio.wav",
            "sessions/sess-1/window_xyz/audio.wav",
            "sessions/sess-1/window_007/audio.mp3",
            "",
        ]:
            assert module._parse_key(bad) is None
  • [ ] Step 3: Run, confirm fail

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestS3KeyParser -v

Expected: FAIL (app.py doesn't exist yet, so loading it raises an import error).

  • [ ] Step 4: Implement minimal app.py with parser only
# main/server/events/audio_upload_complete/app.py
"""Audio Upload Complete — S3 ObjectCreated handler.

Triggered by S3 events on sessions/*/window_*/audio.wav. Updates
DynamoDB completedAudioWindows and conditionally triggers ingest.
"""

from __future__ import annotations

import json
import os
import re

import boto3
from shared.logger import create_logger

SESSIONS_TABLE = os.environ.get("SESSIONS_TABLE_NAME", "encache-sessions")
INGEST_FUNCTION = os.environ.get("INGEST_FUNCTION_NAME", "")
DEFAULT_FRAME_COUNT_PER_WINDOW = 60

_KEY_RE = re.compile(
    r"^sessions/(?P<session_id>[^/]+)/window_(?P<window_index>\d{3})/audio\.wav$"
)

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(SESSIONS_TABLE)
lambda_client = boto3.client("lambda")

logger = create_logger({"flow": "audio_upload_complete"})


def _parse_key(key: str) -> tuple[str, int] | None:
    match = _KEY_RE.match(key)
    if not match:
        return None
    return match.group("session_id"), int(match.group("window_index"))


def lambda_handler(event, _context):
    # Implementation grows across subsequent tasks.
    return {"ok": True}
  • [ ] Step 5: Run, confirm pass

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestS3KeyParser -v

Expected: PASS.

  • [ ] Step 6: Commit
git add main/server/events/ main/server/tests/unit/test_audio_upload_complete.py
git commit -m "feat(audio-upload-complete): scaffold S3 event handler + key parser

New event-driven Lambda under main/server/events/. Top-level events/
directory introduced for non-API Lambdas (no existing precedent;
keeps separation from api/ for HTTP and worldmm/ for the ingest
pipeline).

Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"
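
One detail worth checking when the handler body is filled in: S3 event notifications deliver object keys URL-encoded (a space arrives as +, for example). Session IDs such as sess-1 are unaffected, but if IDs can ever contain characters that S3 encodes, the raw key would fail the regex. A minimal sketch, assuming decoding is wanted before _parse_key; decoded_keys is a hypothetical helper, not part of the plan:

```python
from urllib.parse import unquote_plus


def decoded_keys(event: dict) -> list[str]:
    """Return the object keys from an S3 event with URL-encoding undone."""
    keys = []
    for record in event.get("Records", []):
        raw = record.get("s3", {}).get("object", {}).get("key", "")
        # unquote_plus turns '+' back into a space and decodes %XX escapes.
        keys.append(unquote_plus(raw))
    return keys
```

Keys that contain no encoded characters pass through unchanged, so the existing test fixtures would behave the same either way.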

Task 5: AudioUploadComplete — DDB ADD + audio_only mode triggers ingest

Post-execution note: The _claim_and_trigger snippet below was updated after the plan ran to reflect the cubic review fixes that landed in commit bf694ebc0. The shipped helper now rolls back the claim via DDB DELETE ingestTriggeredWindows when the Lambda invoke fails, emitting ingest_claim_rolled_back (or ingest_claim_rollback_failed if the rollback DDB call itself errors). See the spec's failure-modes table for the rationale.
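
The effect of the rollback can be illustrated without AWS. The sketch below replaces the DynamoDB set with an in-memory one and the Lambda invoke with a plain callable; ClaimStore, claim_and_trigger, and the rollback flag are hypothetical names for illustration only, not the shipped helper.

```python
from __future__ import annotations

from typing import Callable


class ClaimStore:
    """In-memory stand-in for the ingestTriggeredWindows set."""

    def __init__(self) -> None:
        self.claimed: set[int] = set()

    def claim(self, window_index: int) -> bool:
        """Conditional add: succeeds only if the window is not already claimed."""
        if window_index in self.claimed:
            return False
        self.claimed.add(window_index)
        return True

    def release(self, window_index: int) -> None:
        """Undo a claim (the DELETE in the real helper)."""
        self.claimed.discard(window_index)


def claim_and_trigger(
    store: ClaimStore,
    window_index: int,
    invoke: Callable[[int], None],
    rollback: bool = True,
) -> bool:
    if not store.claim(window_index):
        return False  # another invocation already claimed this window
    try:
        invoke(window_index)
    except Exception:
        if rollback:
            store.release(window_index)  # let the retry re-claim
        raise
    return True
```

With rollback enabled, a transient invoke failure leaves the set empty, so the retry claims and invokes again. With rollback disabled, the retry sees the stale claim, returns False, and the window is never ingested.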

Files:
  • Modify: main/server/events/audio_upload_complete/app.py
  • Modify: main/server/tests/unit/test_audio_upload_complete.py (append)

  • [ ] Step 1: Append failing tests
class TestAudioOnlyMode:
    @staticmethod
    def test_happy_path_audio_only_mode_triggers_ingest_immediately():
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.return_value = {
            "Attributes": {
                "sessionId": "sess-1",
                "captureMode": "audio_only",
                "completedAudioWindows": {7},
                "completedFrameWindows": set(),
                "userId": "user-1",
            }
        }
        mock_lambda = MagicMock()
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
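            # INGEST_FUNCTION is read at import time, so patch the module
            # attribute rather than os.environ after the module is loaded.
            patch.object(module, "INGEST_FUNCTION", "IngestFn"),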
        ):
            module.lambda_handler(_s3_event("sessions/sess-1/window_007/audio.wav"), None)

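        # DDB ADD called with correct window index. update_item is also
        # called by the claim step, so inspect the first call explicitly.
        update_kwargs = mock_table.update_item.call_args_list[0].kwargs
        assert update_kwargs["Key"] == {"sessionId": "sess-1"}
        assert update_kwargs["UpdateExpression"] == "ADD completedAudioWindows :win"
        assert ":win" in update_kwargs["ExpressionAttributeValues"]
        assert 7 in update_kwargs["ExpressionAttributeValues"][":win"]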

        # Ingest invoked
        invoke_kwargs = mock_lambda.invoke.call_args.kwargs
        assert invoke_kwargs["FunctionName"] == "IngestFn"
        assert invoke_kwargs["InvocationType"] == "Event"
        payload = json.loads(invoke_kwargs["Payload"])
        assert payload["sessionId"] == "sess-1"
        assert payload["windowIndex"] == 7

    @staticmethod
    def test_adds_window_index_to_completed_audio_windows_in_ddb():
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.return_value = {
            "Attributes": {
                "captureMode": "audio_only",
                "completedAudioWindows": {3},
                "completedFrameWindows": set(),
                "userId": "u",
            }
        }
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", MagicMock()),
        ):
            module.lambda_handler(_s3_event("sessions/sess-1/window_003/audio.wav"), None)

        # First call is the completedAudioWindows ADD; a later call may be the claim.
        kwargs = mock_table.update_item.call_args_list[0].kwargs
        assert kwargs["UpdateExpression"] == "ADD completedAudioWindows :win"

Add import json, os at the top of test_audio_upload_complete.py if not already present.

  • [ ] Step 2: Run, confirm fail

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestAudioOnlyMode -v

Expected: FAIL (handler returns {"ok": True} without doing any DDB or invoke work).

  • [ ] Step 3: Implement DDB + ingest trigger logic

Replace the lambda_handler function (and add _claim_and_trigger helper) in main/server/events/audio_upload_complete/app.py:

def _claim_and_trigger(
    session_id: str, window_index: int, user_id: str, frame_count: int
) -> bool:
    """Conditional claim + async invoke of IngestWindowFunction.

    If the invoke fails, the claim is rolled back so AWS async retry can
    re-claim and re-invoke. Without the rollback, a transient invoke
    failure would leave the claim in place and silently drop the window
    on retry (the conditional would reject the second attempt).

    Returns True if we won the claim race AND the invoke succeeded.
    Returns False if another invocation already claimed this window.
    Raises if the invoke fails — caller (lambda_handler) propagates so
    AWS async-invoke retry kicks in. By the time the retry runs, the
    claim has been rolled back so the new attempt can succeed.
    """
    from botocore.exceptions import ClientError

    try:
        table.update_item(
            Key={"sessionId": session_id},
            UpdateExpression="ADD ingestTriggeredWindows :win",
            ConditionExpression=(
                "attribute_not_exists(ingestTriggeredWindows) "
                "OR NOT contains(ingestTriggeredWindows, :winVal)"
            ),
            ExpressionAttributeValues={
                ":win": {window_index},
                ":winVal": window_index,
            },
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            logger(
                {
                    "step": "ingest_claim_lost",
                    "additional": {
                        "session_id": session_id,
                        "window_index": window_index,
                    },
                }
            )
            return False
        raise

    try:
        lambda_client.invoke(
            FunctionName=INGEST_FUNCTION,
            InvocationType="Event",
            Payload=json.dumps(
                {
                    "sessionId": session_id,
                    "userId": user_id,
                    "windowIndex": window_index,
                    "frameCount": frame_count,
                }
            ),
        )
    except Exception:
        # Roll back the claim so AWS async retry can re-claim + re-invoke.
        # Without this, the conditional rejects the retry and the window
        # is silently dropped.
        try:
            table.update_item(
                Key={"sessionId": session_id},
                UpdateExpression="DELETE ingestTriggeredWindows :win",
                ExpressionAttributeValues={":win": {window_index}},
            )
            logger(
                {
                    "step": "ingest_claim_rolled_back",
                    "additional": {
                        "session_id": session_id,
                        "window_index": window_index,
                    },
                }
            )
        except Exception as rollback_err:
            # If rollback fails, log loudly but still re-raise the original
            # invoke failure. The rollback failure is recoverable manually
            # (operator can delete the window from ingestTriggeredWindows).
            logger(
                {
                    "step": "ingest_claim_rollback_failed",
                    "additional": {
                        "session_id": session_id,
                        "window_index": window_index,
                        "error": str(rollback_err),
                    },
                }
            )
        raise

    return True


def lambda_handler(event, _context):
    for record in event.get("Records", []):
        key = record.get("s3", {}).get("object", {}).get("key", "")
        parsed = _parse_key(key)
        if parsed is None:
            logger(
                {"step": "audio_upload_malformed_key", "additional": {"key": key}}
            )
            continue
        session_id, window_index = parsed

        resp = table.update_item(
            Key={"sessionId": session_id},
            UpdateExpression="ADD completedAudioWindows :win",
            ExpressionAttributeValues={":win": {window_index}},
            ReturnValues="ALL_NEW",
        )
        attrs = resp.get("Attributes") or {}
        capture_mode = attrs.get("captureMode", "audio_video")
        user_id = attrs.get("userId", "")
        completed_frames = attrs.get("completedFrameWindows") or set()

        should_trigger = capture_mode == "audio_only" or window_index in completed_frames
        if should_trigger and INGEST_FUNCTION:
            frame_count = 0 if capture_mode == "audio_only" else DEFAULT_FRAME_COUNT_PER_WINDOW
            _claim_and_trigger(session_id, window_index, user_id, frame_count)

        logger(
            {
                "step": "audio_upload_registered",
                "additional": {
                    "session_id": session_id,
                    "window_index": window_index,
                    "capture_mode": capture_mode,
                },
            }
        )

    return {"ok": True}
  • [ ] Step 4: Run, confirm pass

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestAudioOnlyMode -v

Expected: 2 PASS.

  • [ ] Step 5: Commit
git add main/server/events/audio_upload_complete/app.py main/server/tests/unit/test_audio_upload_complete.py
git commit -m "feat(audio-upload-complete): DDB ADD and ingest trigger in audio_only mode"

Task 6: AudioUploadComplete — audio_video mode race with frames

Files:
  • Modify: main/server/tests/unit/test_audio_upload_complete.py (append)

  • [ ] Step 1: Append tests (expected to pass against the Task 5 handler)
class TestAudioVideoMode:
    @staticmethod
    def test_audio_video_mode_triggers_ingest_only_when_frames_ready():
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.return_value = {
            "Attributes": {
                "captureMode": "audio_video",
                "completedAudioWindows": {7},
                "completedFrameWindows": {7},  # frames already done
                "userId": "u",
            }
        }
        mock_lambda = MagicMock()
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
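            # INGEST_FUNCTION is read at import time, so patch the module
            # attribute rather than os.environ after the module is loaded.
            patch.object(module, "INGEST_FUNCTION", "IngestFn"),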
        ):
            module.lambda_handler(_s3_event("sessions/sess-1/window_007/audio.wav"), None)

        # Invoke should fire — frames are ready
        assert mock_lambda.invoke.called

    @staticmethod
    def test_audio_video_mode_does_not_trigger_when_frames_pending():
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.return_value = {
            "Attributes": {
                "captureMode": "audio_video",
                "completedAudioWindows": {7},
                "completedFrameWindows": set(),  # frames NOT done
                "userId": "u",
            }
        }
        mock_lambda = MagicMock()
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
        ):
            module.lambda_handler(_s3_event("sessions/sess-1/window_007/audio.wav"), None)

        # Invoke should NOT fire — frames pending
        assert not mock_lambda.invoke.called
  • [ ] Step 2: Run, confirm pass

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestAudioVideoMode -v

Expected: 2 PASS (Task 5 handler already implements the should_trigger logic).

  • [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_upload_complete.py
git commit -m "test(audio-upload-complete): cover audio_video mode trigger race"
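
The trigger decision exercised by this task reduces to a small predicate. The sketch below restates the should_trigger expression from the Task 5 handler as a standalone function so its cases can be read off directly; the function form is an illustration, not code the plan adds.

```python
from __future__ import annotations


def should_trigger(
    capture_mode: str, window_index: int, completed_frames: set[int]
) -> bool:
    """True when audio completion alone is enough to start ingest for this window."""
    # audio_only sessions never wait for frames; audio_video sessions
    # wait until the same window index appears in completedFrameWindows.
    return capture_mode == "audio_only" or window_index in completed_frames
```

In audio_video mode, when audio lands before frames this predicate is False and the audio handler does nothing further. How ingest is then started once the frames arrive is outside this section.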

Task 7: AudioUploadComplete — error handling paths

Files:
  • Modify: main/server/tests/unit/test_audio_upload_complete.py (append)

  • [ ] Step 1: Append tests (expected to pass against the Task 5 handler)
class TestErrorHandling:
    @staticmethod
    def test_malformed_s3_key_logs_and_returns_success():
        module = _load_module()
        mock_table = MagicMock()
        with patch.object(module, "table", mock_table):
            result = module.lambda_handler(_s3_event("garbage/path/no-match"), None)
        assert result["ok"] is True
        assert not mock_table.update_item.called

    @staticmethod
    def test_unknown_session_returns_success_without_invoking_ingest():
        """DDB returns no Attributes when session row doesn't exist (or row exists but no fields).
        Handler should still succeed without invoking ingest."""
        module = _load_module()
        mock_table = MagicMock()
        # update_item on a non-existent row creates an empty row with the
        # ADD attribute — Attributes will contain only completedAudioWindows.
        mock_table.update_item.return_value = {
            "Attributes": {"completedAudioWindows": {7}}
        }
        mock_lambda = MagicMock()
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
        ):
            result = module.lambda_handler(_s3_event("sessions/ghost/window_007/audio.wav"), None)
        assert result["ok"] is True
        # captureMode defaults to "audio_video"; completedFrameWindows is empty → no trigger
        assert not mock_lambda.invoke.called

    @staticmethod
    def test_ddb_throttle_raises_for_aws_retry():
        from botocore.exceptions import ClientError
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.side_effect = ClientError(
            {"Error": {"Code": "ProvisionedThroughputExceededException", "Message": "throttled"}},
            "UpdateItem",
        )
        with patch.object(module, "table", mock_table):
            with pytest.raises(ClientError):
                module.lambda_handler(_s3_event(), None)

    @staticmethod
    def test_ingest_invoke_failure_raises_for_aws_retry():
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.return_value = {
            "Attributes": {
                "captureMode": "audio_only",
                "userId": "u",
                "completedFrameWindows": set(),
            }
        }
        mock_lambda = MagicMock()
        mock_lambda.invoke.side_effect = RuntimeError("boom")
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
            patch.dict(os.environ, {"INGEST_FUNCTION_NAME": "IngestFn"}),
        ):
            with pytest.raises(RuntimeError):
                module.lambda_handler(_s3_event(), None)
  • [ ] Step 2: Run, confirm pass

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestErrorHandling -v Expected: 4 PASS. Handler already skips malformed keys (Task 5), defaults capture_mode so unknown sessions are no-op, and propagates raises from DDB and lambda_client.
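The split between "log and succeed" and "raise so AWS retries" can be sketched as follows. This is a simplified illustration under stated assumptions: `handle_records`, the injected `register` callable, and the key regex are hypothetical stand-ins for whatever the Task 5 handler actually uses.

```python
import logging
import re
from typing import Callable

logger = logging.getLogger(__name__)

# Assumed key layout, inferred from the test keys in this plan.
_KEY_RE = re.compile(r"^sessions/(?P<session>[^/]+)/window_(?P<index>\d+)/audio\.wav$")


def handle_records(event: dict, register: Callable[[str, int], None]) -> dict:
    """Skip keys that can never succeed; let every other failure raise."""
    for record in event.get("Records", []):
        key = record["s3"]["object"]["key"]
        match = _KEY_RE.match(key)
        if match is None:
            # Retrying cannot fix a malformed key, so log it and move on.
            logger.warning("skipping unrecognized key: %s", key)
            continue
        # Deliberately no try/except: a throttle or invoke failure propagates,
        # the asynchronous invocation fails, and Lambda retries it.
        register(match["session"], int(match["index"]))
    return {"ok": True}
```

The design choice is that only failures a retry could plausibly fix are allowed to surface; swallowing them would hide the event from the retry policy and the DLQ.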

  • [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_upload_complete.py
git commit -m "test(audio-upload-complete): cover error/recovery handling"

Task 8: AudioUploadComplete — duplicate event idempotency

Files: - Modify: main/server/tests/unit/test_audio_upload_complete.py (append)

  • [ ] Step 1: Append tests (expected to pass against the Task 5 handler)
class TestIdempotency:
    @staticmethod
    def test_duplicate_s3_event_does_not_double_trigger_ingest():
        """Second event for same key fails the conditional claim → no second invoke."""
        from botocore.exceptions import ClientError
        module = _load_module()

        mock_table = MagicMock()
        # First call to update_item (audio ADD): succeeds with frames ready
        # Second call (claim): succeeds — first invocation
        # Third call (audio ADD again, second event): succeeds idempotently
        # Fourth call (claim, second event): conditional fails — already claimed
        mock_table.update_item.side_effect = [
            # First event: audio ADD
            {
                "Attributes": {
                    "captureMode": "audio_video",
                    "completedAudioWindows": {7},
                    "completedFrameWindows": {7},
                    "userId": "u",
                }
            },
            # First event: claim
            {"Attributes": {"ingestTriggeredWindows": {7}}},
            # Second event: audio ADD (idempotent — set semantics)
            {
                "Attributes": {
                    "captureMode": "audio_video",
                    "completedAudioWindows": {7},
                    "completedFrameWindows": {7},
                    "userId": "u",
                }
            },
            # Second event: claim fails conditional
            ClientError(
                {"Error": {"Code": "ConditionalCheckFailedException", "Message": "x"}},
                "UpdateItem",
            ),
        ]
        mock_lambda = MagicMock()
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
            patch.dict(os.environ, {"INGEST_FUNCTION_NAME": "IngestFn"}),
        ):
            evt = _s3_event("sessions/sess-1/window_007/audio.wav")
            module.lambda_handler(evt, None)
            module.lambda_handler(evt, None)

        # invoke called exactly once
        assert mock_lambda.invoke.call_count == 1

    @staticmethod
    def test_concurrent_audio_and_frame_completion_invoke_ingest_exactly_once():
        """Both an audio S3 event and a frame completion (from FramePostFunction)
        can race to call _claim_and_trigger for the same window. The conditional
        ensures exactly one invoke fires.

        Simulated by calling _claim_and_trigger twice in sequence — second call
        returns False because the conditional rejects the duplicate.
        """
        from botocore.exceptions import ClientError
        module = _load_module()
        mock_table = MagicMock()
        mock_table.update_item.side_effect = [
            {"Attributes": {"ingestTriggeredWindows": {7}}},  # first claim wins
            ClientError(  # second claim loses
                {"Error": {"Code": "ConditionalCheckFailedException", "Message": "x"}},
                "UpdateItem",
            ),
        ]
        mock_lambda = MagicMock()
        with (
            patch.object(module, "table", mock_table),
            patch.object(module, "lambda_client", mock_lambda),
            patch.dict(os.environ, {"INGEST_FUNCTION_NAME": "IngestFn"}),
        ):
            first = module._claim_and_trigger("sess-1", 7, "u", 60)
            second = module._claim_and_trigger("sess-1", 7, "u", 60)

        assert first is True
        assert second is False
        assert mock_lambda.invoke.call_count == 1
  • [ ] Step 2: Run, confirm pass

Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestIdempotency -v Expected: 2 PASS (claim conditional already implemented in Task 5).
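The exactly-once property rests on the claim being a single atomic conditional write. The sketch below models that semantics in memory so it can be run without AWS; `ClaimLedger` and this `claim_and_trigger` signature are hypothetical and only stand in for the DynamoDB conditional update on `ingestTriggeredWindows`.

```python
from __future__ import annotations

import threading
from typing import Callable


class ClaimLedger:
    """In-memory stand-in for a conditional write on ingestTriggeredWindows."""

    def __init__(self) -> None:
        self._claimed: set[tuple[str, int]] = set()
        self._lock = threading.Lock()

    def claim(self, session_id: str, window_index: int) -> bool:
        """Return True for the first caller only; every later caller loses."""
        with self._lock:  # models the atomicity of one conditional update
            key = (session_id, window_index)
            if key in self._claimed:
                return False  # analogous to ConditionalCheckFailedException
            self._claimed.add(key)
            return True


def claim_and_trigger(
    ledger: ClaimLedger,
    invoke: Callable[[str, int], None],
    session_id: str,
    window_index: int,
) -> bool:
    """Invoke ingest only when this caller wins the claim."""
    if not ledger.claim(session_id, window_index):
        return False
    invoke(session_id, window_index)
    return True
```

Because the check and the write happen in one step, a duplicate S3 event and a racing frame completion both reduce to the same case: the second claimant sees the window already claimed and does nothing.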

  • [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_upload_complete.py
git commit -m "test(audio-upload-complete): cover duplicate S3 event idempotency"

Task 9: Update SAM template — new function + DLQ + IAM

Files: - Modify: main/server/template.yaml

  • [ ] Step 1: Edit AudioPostFunction policies — drop write/invoke perms

Find the AudioPostFunction block (around main/server/template.yaml:306-326). Replace its Environment.Variables and Policies sections:

  AudioPostFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: api/sessions/audio/
      Handler: app.lambda_handler
      Runtime: python3.12
      Layers:
        - !Ref SharedLayer
      Environment:
        Variables:
          BUCKET_NAME: !Ref RawMemoryBucket
          SESSIONS_TABLE_NAME: !Ref SessionsTable
      Policies:
        - !Ref S3AccessPolicy
        - Statement:
            - Effect: Allow
              Action: dynamodb:GetItem
              Resource: !GetAtt SessionsTable.Arn
      Events:
        AudioPost:
          Type: Api
          Properties:
            Path: /sessions/{sessionId}/audio
            Method: post

Notes:

  • Drops the INGEST_FUNCTION_NAME env var (the handler no longer invokes ingest).
  • Drops DynamoDBPolicy (full table write) in favor of an inline read-only GetItem statement.
  • Drops LambdaInvokePolicy.
  • Keeps S3AccessPolicy for presigned URL signing. Signing happens locally, but S3 authorizes each presigned PUT against the signing role's permissions, so the role still needs s3:PutObject on the bucket.

  • [ ] Step 2: Add AudioUploadCompleteDLQ resource

Append to the Resources: section (after the existing DLQ definitions; search for IngestDLQ to find a good neighbor):

  AudioUploadCompleteDLQ:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 720
      MessageRetentionPeriod: 1209600
  • [ ] Step 3: Add AudioUploadCompleteFunction resource

Append (place near IngestWindowFunction for cohesion):

  AudioUploadCompleteFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: events/audio_upload_complete/
      Handler: app.lambda_handler
      Runtime: python3.12
      Timeout: 30
      MemorySize: 256
      Layers:
        - !Ref SharedLayer
      Environment:
        Variables:
          SESSIONS_TABLE_NAME: !Ref SessionsTable
          INGEST_FUNCTION_NAME: !Ref IngestWindowFunction
      Policies:
        - !Ref DynamoDBPolicy
        - !Ref LambdaInvokePolicy
      EventInvokeConfig:
        MaximumRetryAttempts: 2
        DestinationConfig:
          OnFailure:
            Type: SQS
            Destination: !GetAtt AudioUploadCompleteDLQ.Arn
  • [ ] Step 4: Add S3 → Lambda permission grant
  AudioUploadCompletePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt AudioUploadCompleteFunction.Arn
      Principal: s3.amazonaws.com
      SourceArn: !GetAtt RawMemoryBucket.Arn
      SourceAccount: !Ref AWS::AccountId

Note — Step 5 (SAM NotificationConfiguration) removed: Discovered during execution that encache-raw-memory is Terraform-managed (main/devops/main.tf:305), not SAM-managed. The S3 notification wiring lives in Task 9b (added during execution) — see docs/plans/2026-05-20-sessions-audio-presigned-implementation.md Task 9b for the Terraform change. SAM continues to own the Lambda Permission grant (Step 4 above) since the Lambda itself is SAM-managed.

  • [ ] ~~Step 5: Add S3 NotificationConfiguration on RawMemoryBucket~~ — SUPERSEDED, see Task 9b.

~~Find the RawMemoryBucket resource definition (grep RawMemoryBucket: in template.yaml). Add or extend its NotificationConfiguration:~~

~~If RawMemoryBucket already has a NotificationConfiguration, merge the new LambdaConfigurations entry into the existing list. Also ensure RawMemoryBucket has DependsOn: AudioUploadCompletePermission so CloudFormation creates the permission before wiring up the notification (S3 rejects notification configs that point at unauthorized functions).~~

  • [ ] Step 6: Validate template

Run: cd main/server && sam validate --lint Expected: <path>/template.yaml is a valid SAM Template.

  • [ ] Step 7: Commit
git add main/server/template.yaml
git commit -m "chore(infra): add AudioUploadCompleteFunction + DLQ + S3 invoke permission

- New SAM function under events/audio_upload_complete/.
- Dedicated SQS DLQ; failed events land there after 2 retries.
- Lambda permission for S3 to invoke the function (notification wiring is in Terraform, Task 9b).
- Downgrades AudioPostFunction IAM to S3 sign + DDB GetItem only.

Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"

Task 9b: Terraform S3 notification → AudioUploadCompleteFunction

Files: - Modify: main/devops/main.tf

Added during plan execution after discovering the bucket is Terraform-managed.

  • Adds data "aws_lambda_function" "audio_upload_complete" that looks up the SAM-deployed Lambda by deterministic name server-AudioUploadCompleteFunction (name pinned via SAM FunctionName property).
  • Adds aws_s3_bucket_notification.raw_data_audio filtering s3:ObjectCreated:* on prefix sessions/, suffix audio.wav.
  • Lambda permission grant remains in SAM (AudioUploadCompletePermission resource).
  • Deploy order: sam deploy first, then terraform apply.

Validation: cd main/devops && terraform fmt -check && terraform validate.

Implemented in commit 35814ea35 + 384bc8e75 (pinned function name for deterministic lookup).
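S3 notification filters match on a literal prefix and suffix only, with no wildcards, so the filter above admits keys that the handler must still reject. The sketch below illustrates the gap. The prefix and suffix values come from this task; `KEY_RE` is an assumed handler-side pattern, and the function names are hypothetical.

```python
import re
from urllib.parse import unquote_plus

# Values from the Terraform notification described in this task.
FILTER_PREFIX = "sessions/"
FILTER_SUFFIX = "audio.wav"

# Assumed handler-side pattern; the real regex lives in the Task 5 handler.
KEY_RE = re.compile(r"^sessions/[^/]+/window_\d+/audio\.wav$")


def passes_s3_filter(key: str) -> bool:
    """Mimic S3's prefix/suffix matching, which has no wildcard support."""
    return key.startswith(FILTER_PREFIX) and key.endswith(FILTER_SUFFIX)


def is_audio_window_key(raw_key: str) -> bool:
    """Decode the event key (S3 URL-encodes it), then apply the strict pattern."""
    return KEY_RE.match(unquote_plus(raw_key)) is not None
```

A key such as `sessions/sess-1/notes/backup-audio.wav` passes the S3 filter but fails the strict pattern, which is why the handler's malformed-key path (Task 7) matters even with the filter in place.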


Task 10: Integration test — end-to-end with moto

Files: - Create: main/server/tests/integration/test_audio_presigned_flow.py

  • [ ] Step 1: Write the failing integration test file
# main/server/tests/integration/test_audio_presigned_flow.py
"""End-to-end test of the presigned audio flow against moto.

Covers Happy Path steps 2-5 and the URL-signature rejection cases
from the Failure Modes table in
docs/plans/2026-05-19-sessions-audio-presigned.md.
"""

from __future__ import annotations

import json
import time
from pathlib import Path

import boto3
import pytest
import requests
from moto import mock_aws

from ..worker_utils import load_worker_module

_URL_MINTER_PATH = (
    Path(__file__).resolve().parents[2] / "api" / "sessions" / "audio" / "app.py"
)
_COMPLETE_PATH = (
    Path(__file__).resolve().parents[2] / "events" / "audio_upload_complete" / "app.py"
)


@pytest.fixture
def aws_env(monkeypatch):
    monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
    monkeypatch.setenv("BUCKET_NAME", "test-bucket")
    monkeypatch.setenv("SESSIONS_TABLE_NAME", "test-sessions")
    monkeypatch.setenv("INGEST_FUNCTION_NAME", "IngestFn")


def _setup_aws():
    """Create the bucket + sessions table inside an active mock_aws context."""
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="test-bucket")

    ddb = boto3.resource("dynamodb", region_name="us-east-1")
    ddb.create_table(
        TableName="test-sessions",
        KeySchema=[{"AttributeName": "sessionId", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "sessionId", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    table = ddb.Table("test-sessions")
    table.put_item(Item={"sessionId": "sess-1", "captureMode": "audio_only", "userId": "u"})
    return table


@mock_aws
def test_full_upload_flow_writes_audio_and_triggers_ingest(aws_env, monkeypatch):
    table = _setup_aws()

    # Reload modules under the mock context so they pick up moto's clients
    minter = load_worker_module("audio_minter_int", _URL_MINTER_PATH)
    complete = load_worker_module("audio_complete_int", _COMPLETE_PATH)

    # Step 1: client POSTs for URL
    grant_event = {
        "pathParameters": {"sessionId": "sess-1"},
        "queryStringParameters": {"windowIndex": "7"},
        "body": json.dumps({"sizeBytes": 4}),
    }
    grant_resp = minter.lambda_handler(grant_event, None)
    assert grant_resp["statusCode"] == 200
    grant_body = json.loads(grant_resp["body"])
    url = grant_body["url"]

    # Step 2: client PUTs bytes
    put_resp = requests.put(
        url,
        data=b"DATA",
        headers={"Content-Type": "audio/wav", "Content-Length": "4"},
        timeout=10,
    )
    assert put_resp.status_code == 200

    # Step 3: simulate S3 event firing → complete handler
    invoke_calls: list = []
    monkeypatch.setattr(
        complete.lambda_client,
        "invoke",
        lambda **kw: invoke_calls.append(kw),
    )
    s3_event = {
        "Records": [
            {
                "s3": {
                    "bucket": {"name": "test-bucket"},
                    "object": {"key": "sessions/sess-1/window_007/audio.wav"},
                }
            }
        ]
    }
    complete.lambda_handler(s3_event, None)

    # DDB row reflects window completion
    row = table.get_item(Key={"sessionId": "sess-1"})["Item"]
    assert 7 in row.get("completedAudioWindows", set())

    # Ingest invoke captured
    assert len(invoke_calls) == 1
    payload = json.loads(invoke_calls[0]["Payload"])
    assert payload["sessionId"] == "sess-1"
    assert payload["windowIndex"] == 7


@mock_aws
def test_url_signature_rejects_wrong_content_length(aws_env):
    _setup_aws()
    minter = load_worker_module("audio_minter_int2", _URL_MINTER_PATH)
    grant_resp = minter.lambda_handler(
        {
            "pathParameters": {"sessionId": "sess-1"},
            "queryStringParameters": {"windowIndex": "0"},
            "body": json.dumps({"sizeBytes": 100}),
        },
        None,
    )
    url = json.loads(grant_resp["body"])["url"]

    # PUT with mismatched Content-Length should fail
    resp = requests.put(
        url,
        data=b"X" * 50,  # 50 bytes != signed 100
        headers={"Content-Type": "audio/wav", "Content-Length": "50"},
        timeout=10,
    )
    assert resp.status_code >= 400


@mock_aws
def test_url_signature_rejects_wrong_content_type(aws_env):
    _setup_aws()
    minter = load_worker_module("audio_minter_int3", _URL_MINTER_PATH)
    grant_resp = minter.lambda_handler(
        {
            "pathParameters": {"sessionId": "sess-1"},
            "queryStringParameters": {"windowIndex": "0"},
            "body": json.dumps({"sizeBytes": 4}),
        },
        None,
    )
    url = json.loads(grant_resp["body"])["url"]

    resp = requests.put(
        url,
        data=b"DATA",
        headers={"Content-Type": "video/mp4", "Content-Length": "4"},
        timeout=10,
    )
    assert resp.status_code >= 400


@mock_aws
def test_expired_url_returns_403_on_put(aws_env, monkeypatch):
    """Generate a URL with 1s expiry, sleep past it, attempt PUT, expect rejection.

    Adjusts URL_EXPIRES_SECONDS via monkeypatch instead of waiting 5 minutes.
    """
    _setup_aws()
    minter = load_worker_module("audio_minter_int4", _URL_MINTER_PATH)
    monkeypatch.setattr(minter, "URL_EXPIRES_SECONDS", 1)
    grant_resp = minter.lambda_handler(
        {
            "pathParameters": {"sessionId": "sess-1"},
            "queryStringParameters": {"windowIndex": "0"},
            "body": json.dumps({"sizeBytes": 4}),
        },
        None,
    )
    url = json.loads(grant_resp["body"])["url"]

    time.sleep(2)

    resp = requests.put(
        url,
        data=b"DATA",
        headers={"Content-Type": "audio/wav", "Content-Length": "4"},
        timeout=10,
    )
    # Real S3 returns 403 with "Request has expired"; moto may return 403 or 400.
    # Either way the PUT must NOT succeed.
    assert resp.status_code >= 400, (
        f"expected expired URL to reject PUT, got {resp.status_code} "
        f"(if moto skips expiry validation, skip this test and rely on issue #497)"
    )
  • [ ] Step 2: Run, confirm pass (or surface moto behavior to adjust)

Run: cd main/server && pytest tests/integration/test_audio_presigned_flow.py -v Expected: PASS. If moto does not enforce ContentLength/ContentType strictly in your version, mark the two strict-binding tests as pytest.skip("moto does not enforce strict bindings; covered by issue #497") with a comment.
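The strict bindings these tests probe come from parameters that are signed into the URL. A minimal sketch, assuming the minter uses boto3's `generate_presigned_url` with `put_object`: the function name `mint_audio_put_url`, the injected client, and the three-digit window padding (inferred from keys like `window_007`) are assumptions, and the real minter from the earlier tasks may differ.

```python
from typing import Any

URL_EXPIRES_SECONDS = 300  # 5 min expiry, as stated in this plan


def mint_audio_put_url(
    s3_client: Any,
    bucket: str,
    session_id: str,
    window_index: int,
    size_bytes: int,
) -> str:
    """Presign a PUT bound to one key, one content type, and one body size."""
    # Assumed key layout: zero-padded three-digit window index.
    key = f"sessions/{session_id}/window_{window_index:03d}/audio.wav"
    return s3_client.generate_presigned_url(
        "put_object",
        Params={
            "Bucket": bucket,
            "Key": key,
            "ContentType": "audio/wav",   # client must send the same Content-Type
            "ContentLength": size_bytes,  # client must send the same Content-Length
        },
        ExpiresIn=URL_EXPIRES_SECONDS,
        HttpMethod="PUT",
    )
```

In production the client would be `boto3.client("s3")`; passing it in keeps the function testable with a stub. Real S3 validates the signed headers on the PUT, while moto may not, which is the reason for the skip guidance above.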

  • [ ] Step 3: Commit
git add main/server/tests/integration/test_audio_presigned_flow.py
git commit -m "test(audio): integration tests for presigned upload flow with moto"

Task 11: Delete obsolete test_audio_chunk_api.py

Files: - Delete: main/server/tests/unit/test_audio_chunk_api.py

  • [ ] Step 1: Confirm replacement coverage

Run: cd main/server && pytest tests/unit/test_audio_url_minter.py tests/unit/test_audio_upload_complete.py -v Expected: all green. These supersede the old test file.

  • [ ] Step 2: Delete the old file
rm main/server/tests/unit/test_audio_chunk_api.py
  • [ ] Step 3: Run the full suite to confirm no orphan reference

Run: cd main/server && pytest tests/ -v Expected: PASS. No collection errors mentioning test_audio_chunk_api.

  • [ ] Step 4: Commit
git add -A main/server/tests/unit/
git commit -m "chore(test): drop test_audio_chunk_api.py — superseded by URL minter + complete tests"

Task 12: Client — Jest test for happy path audio upload

Post-execution note: The uploadAudioItem snippet below was updated after the plan ran to reflect the cubic review fixes that landed in commit bf694ebc0. The shipped helper wraps the S3 PUT in a 60s AbortController so a stalled upload on a cellular connection can't block the upload queue forever. Without the timeout, the fetch never settles and the queue's per-item retry policy never fires.
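The reasoning behind the timeout applies in any language: a request that never settles produces no error, so nothing downstream can retry it. As a language-neutral illustration (written in Python to match the server code, not the shipped TypeScript helper), the same idea can be sketched with `asyncio.wait_for`; `put_with_deadline` and its factory argument are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


async def put_with_deadline(
    put_factory: Callable[[], Awaitable[int]],
    timeout_s: float,
) -> int:
    """Await an upload, raising a timeout error if it stalls past timeout_s."""
    # wait_for cancels the inner task on timeout, which plays the role
    # that controller.abort() plays for fetch in the client helper.
    return await asyncio.wait_for(put_factory(), timeout=timeout_s)
```

The point is that the deadline converts "hangs forever" into an ordinary failure, which a per-item retry policy can then handle.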

Files: - Create: main/app/__tests__/capture-session-audio-upload.test.ts

  • [ ] Step 1: Write the failing test
// main/app/__tests__/capture-session-audio-upload.test.ts
/**
 * Tests for the rewritten audio branch of the capture-session uploadFn
 * (see main/app/lib/capture-session.ts, replacing the lines that used to
 * POST raw audio bytes through API Gateway).
 *
 * Each test cites a Happy Path step or Failure Modes table row in
 * docs/plans/2026-05-19-sessions-audio-presigned.md.
 */

import { uploadAudioItem } from "../lib/capture-session";

const sessionId = "sess-1";
const baseItem = {
  id: "audio-1",
  type: "audio" as const,
  uri: "file:///tmp/audio.wav",
  sessionId,
  windowIndex: 7,
  retryCount: 0,
  sizeBytes: 1234,
};

describe("uploadAudioItem", () => {
  let originalFetch: typeof global.fetch;
  let mockApiPost: jest.Mock;
  let mockFetch: jest.Mock;
  let mockReadBytes: jest.Mock;

  beforeEach(() => {
    originalFetch = global.fetch;
    mockApiPost = jest.fn().mockResolvedValue({
      url: "https://s3.example/presigned",
      s3Key: `sessions/${sessionId}/window_007/audio.wav`,
      windowIndex: 7,
      expiresIn: 300,
    });
    mockFetch = jest.fn().mockResolvedValue({ ok: true, status: 200 });
    mockReadBytes = jest.fn().mockResolvedValue(new Uint8Array(1234));
    global.fetch = mockFetch as unknown as typeof global.fetch;
  });

  afterEach(() => {
    global.fetch = originalFetch;
    jest.restoreAllMocks();
  });

  test("uploads_audio_when_both_url_grant_and_put_succeed", async () => {
    await uploadAudioItem({
      item: baseItem,
      sessionId,
      apiPost: mockApiPost,
      readBytes: mockReadBytes,
    });

    expect(mockApiPost).toHaveBeenCalledWith(
      `/sessions/${sessionId}/audio?windowIndex=7`,
      JSON.stringify({ sizeBytes: 1234 }),
      expect.objectContaining({
        headers: { "Content-Type": "application/json" },
      }),
    );

    expect(mockFetch).toHaveBeenCalledWith(
      "https://s3.example/presigned",
      expect.objectContaining({
        method: "PUT",
        headers: {
          "Content-Type": "audio/wav",
          "Content-Length": "1234",
        },
      }),
    );
  });

  test("does_not_remove_item_from_queue_if_put_throws", async () => {
    mockFetch.mockRejectedValueOnce(new Error("network down"));
    await expect(
      uploadAudioItem({
        item: baseItem,
        sessionId,
        apiPost: mockApiPost,
        readBytes: mockReadBytes,
      }),
    ).rejects.toThrow(/network down/);
  });

  test("parks_item_when_url_grant_returns_5xx", async () => {
    mockApiPost.mockRejectedValueOnce(Object.assign(new Error("500"), { status: 500 }));
    await expect(
      uploadAudioItem({
        item: baseItem,
        sessionId,
        apiPost: mockApiPost,
        readBytes: mockReadBytes,
      }),
    ).rejects.toMatchObject({ status: 500 });
    expect(mockFetch).not.toHaveBeenCalled();
  });

  test("parks_item_when_put_to_s3_returns_5xx", async () => {
    mockFetch.mockResolvedValueOnce({ ok: false, status: 503 });
    await expect(
      uploadAudioItem({
        item: baseItem,
        sessionId,
        apiPost: mockApiPost,
        readBytes: mockReadBytes,
      }),
    ).rejects.toThrow(/S3 PUT failed/);
  });

  test("parks_item_when_put_returns_403_signature_mismatch", async () => {
    mockFetch.mockResolvedValueOnce({ ok: false, status: 403 });
    await expect(
      uploadAudioItem({
        item: baseItem,
        sessionId,
        apiPost: mockApiPost,
        readBytes: mockReadBytes,
      }),
    ).rejects.toThrow(/S3 PUT failed/);
  });

  test("refetches_url_after_5xx_retry_does_not_reuse_stale_url", async () => {
    // Simulate two consecutive uploads of the same item — each must
    // request its own URL, not reuse a cached one.
    await uploadAudioItem({
      item: baseItem,
      sessionId,
      apiPost: mockApiPost,
      readBytes: mockReadBytes,
    });
    await uploadAudioItem({
      item: baseItem,
      sessionId,
      apiPost: mockApiPost,
      readBytes: mockReadBytes,
    });

    expect(mockApiPost).toHaveBeenCalledTimes(2);
    expect(mockFetch).toHaveBeenCalledTimes(2);
  });
});
  • [ ] Step 2: Run, confirm fail (uploadAudioItem doesn't exist yet)

Run: cd main/app && npx jest __tests__/capture-session-audio-upload.test.ts Expected: FAIL — uploadAudioItem not exported from lib/capture-session.

  • [ ] Step 3: Implement uploadAudioItem in capture-session.ts

Open main/app/lib/capture-session.ts. Find the audio branch of uploadItem (currently around lines 103-112). Extract the audio upload logic into a new exported function. Add near the top of the file (after imports):

export interface UploadAudioDeps {
  item: {
    uri: string;
    windowIndex?: number;
    sizeBytes: number;
  };
  sessionId: string;
  apiPost: (
    path: string,
    body: string,
    options: { headers: Record<string, string>; timeout?: number },
  ) => Promise<{ url: string; s3Key: string; windowIndex: number; expiresIn: number }>;
  readBytes: (uri: string) => Promise<Uint8Array>;
}

export async function uploadAudioItem(deps: UploadAudioDeps): Promise<void> {
  const { item, sessionId, apiPost, readBytes } = deps;
  const windowIndex = item.windowIndex ?? 0;
  const bytes = await readBytes(item.uri);
  const sizeBytes = bytes.byteLength;

  const grant = await apiPost(
    `/sessions/${sessionId}/audio?windowIndex=${windowIndex}`,
    JSON.stringify({ sizeBytes }),
    {
      headers: { "Content-Type": "application/json" },
      timeout: 5000,
    },
  );

  const PUT_TIMEOUT_MS = 60000;
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), PUT_TIMEOUT_MS);
  let putResp: Response;
  try {
    putResp = await fetch(grant.url, {
      method: "PUT",
      headers: {
        "Content-Type": "audio/wav",
        "Content-Length": String(sizeBytes),
      },
      body: bytes as unknown as BodyInit,
      signal: controller.signal,
    });
  } finally {
    clearTimeout(timeoutId);
  }
  if (!putResp.ok) {
    throw new Error(`S3 PUT failed: ${putResp.status}`);
  }
}

Then replace the audio branch in uploadItem (the function around line 88-114) to call this helper:

    if (item.type === "frame") {
      await api.post(`/sessions/${sessionId}/frames`, bytes, {
        headers: { "Content-Type": "image/jpeg" },
        timeout: 10000,
      });
      const filename = item.uri.split("/").pop() || "";
      await markUploaded(sessionId, filename);
    } else {
      const file = new File(toFileUri(item.uri));
      await uploadAudioItem({
        item,
        sessionId,
        apiPost: async (path, body, options) => {
          const res = await api.post<{
            url: string;
            s3Key: string;
            windowIndex: number;
            expiresIn: number;
          }>(path, body, options);
          return res;
        },
        readBytes: async (_uri) => await file.bytes(),
      });
    }

Note: this assumes api.post returns a parsed JSON body for the URL grant call. If api.post only returns the raw response, adapt accordingly using whatever the existing API client exposes for typed reads.

  • [ ] Step 4: Run tests, confirm pass

Run: cd main/app && npx jest __tests__/capture-session-audio-upload.test.ts Expected: 6 PASS.

  • [ ] Step 5: Commit
git add main/app/lib/capture-session.ts main/app/__tests__/capture-session-audio-upload.test.ts
git commit -m "feat(client): switch audio upload to presigned PUT flow

uploadAudioItem requests presigned URL from server, then PUTs raw
WAV bytes directly to S3 — bypasses API Gateway 29s cap. Frame
upload unchanged.

Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"

Task 13: Open PR + run CI

  • [ ] Step 1: Push branch
git push -u origin plan/sessions-audio-presigned
  • [ ] Step 2: Open PR
gh pr create --title "feat(audio): migrate sessions/audio to presigned PUT + S3 event" --body "$(cat <<'EOF'
## Summary

- Rewrites `SessionAudioFunction` as a presigned PUT URL minter (no body through Lambda).
- Adds `AudioUploadCompleteFunction` (new `events/` top-level dir): triggered by S3 ObjectCreated, handles DDB ADD + ingest claim+trigger.
- Removes legacy full-session upload path (no users today).
- Strict URL bindings: `ContentType=audio/wav`, exact `ContentLength`, 5 min expiry.
- Dedicated DLQ for the new Lambda with 2-retry policy.

Eliminates the 29s API Gateway cap failure class for multi-MB WAV chunks (root cause: audio body routes through Lambda memory; 60s WAV window ≈ 2 MB, slow cell upload exceeds cap).

## Spec

`docs/plans/2026-05-19-sessions-audio-presigned.md`

## Test plan

- [ ] `pytest main/server/tests/unit/test_audio_url_minter.py -v` → 12 PASS
- [ ] `pytest main/server/tests/unit/test_audio_upload_complete.py -v` → 10 PASS
- [ ] `pytest main/server/tests/integration/test_audio_presigned_flow.py -v` → up to 4 PASS (strict-binding and expiry cases may skip on moto)
- [ ] `cd main/app && npx jest __tests__/capture-session-audio-upload.test.ts` → 6 PASS
- [ ] `sam validate --lint` clean
- [ ] CI green on this PR
- [ ] (Post-merge) `sam deploy` to dev, exercise capture flow on test device, confirm audio lands in S3 + DDB updates + ingest fires

## Follow-ups

- #496 property-based tests
- #497 real-AWS integration test
- #498 load test for AudioUploadCompleteFunction

🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"
  • [ ] Step 3: Watch CI until green

Run: gh pr checks --watch Expected: all checks pass. If any fail, diagnose and fix in a new commit (do not amend).


Task 14: Deploy + manual verification

  • [ ] Step 1: Deploy to dev environment

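Run: cd main/server && sam deploy --no-confirm-changeset, then cd main/devops && terraform apply (deploy order per Task 9b). Expected: stack updates successfully. CloudFormation events show creation of AudioUploadCompleteFunction, AudioUploadCompleteDLQ, and AudioUploadCompletePermission, and modification of AudioPostFunction. RawMemoryBucket is Terraform-managed, so its notification change shows up in the terraform apply output rather than in CloudFormation events.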

  • [ ] Step 2: Exercise capture flow on test device

Manual: open the RN app on a test device, start a capture session, let it run long enough to produce at least one audio window (60s), then end the session.

  • [ ] Step 3: Verify S3 + DDB + ingest fired

Run (substitute the actual sessionId from device logs):

aws s3 ls s3://encache-raw-memory/sessions/<SESSION_ID>/window_000/
aws dynamodb get-item --table-name encache-sessions --key '{"sessionId":{"S":"<SESSION_ID>"}}' | jq '.Item.completedAudioWindows, .Item.ingestTriggeredWindows'
aws logs tail /aws/lambda/<stack>-AudioUploadCompleteFunction --since 10m

Expected:

  • S3 lists audio.wav in the window directory.
  • DDB shows the window index in both completedAudioWindows and ingestTriggeredWindows.
  • CloudWatch logs show step=audio_upload_registered for the window.

  • [ ] Step 4: Trigger artificial failure to verify DLQ

Manually upload an unknown-shaped object to a sessions/...audio.wav path that does not match the regex, then verify DLQ stays empty (malformed key logs + returns success). Then upload a key that matches but for a session that does not exist in DDB; verify the same.

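For a true DLQ test, temporarily remove lambda:InvokeFunction on the ingest function from AudioUploadCompleteFunction's execution role (the LambdaInvokePolicy attachment). Do not remove the S3 resource-based AudioUploadCompletePermission: without that grant S3 cannot invoke the handler at all, so nothing would ever reach the DLQ. Upload a valid object, verify the DLQ receives the message after the 2 retries are exhausted (Lambda spaces asynchronous retries roughly one and then two minutes apart, so allow a few minutes), then restore the permission.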
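Checking the queue depth during this step can be scripted. A small sketch, assuming a boto3 SQS client is passed in: `dlq_depth` is a hypothetical helper, and the queue URL has to be looked up for the deployed AudioUploadCompleteDLQ.

```python
from typing import Any


def dlq_depth(sqs_client: Any, queue_url: str) -> int:
    """Return the approximate number of visible messages in the queue."""
    resp = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    # SQS returns attribute values as strings.
    return int(resp["Attributes"]["ApproximateNumberOfMessages"])
```

Typical use would be `dlq_depth(boto3.client("sqs"), queue_url)` before and after the induced failure. The count is approximate by design, so poll it a few times rather than trusting a single read.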

  • [ ] Step 5: Mark plan executed

In docs/plans/2026-05-19-sessions-audio-presigned.md, change Status: draft to Status: documentation. Commit:

git add docs/plans/2026-05-19-sessions-audio-presigned.md
git commit -m "docs(plans): mark sessions/audio presigned plan as documentation"

Self-review checklist (for the engineer executing this plan)

After each task: run the full server test suite (pytest main/server/tests/ -v) to catch unintended breakage.

After Task 11 specifically: run the full suite plus cd main/app && npx jest. The audio behavior change has the largest blast radius in this plan; nothing else should be affected, but verify.

Before Task 13 (PR open): ensure all the test commands in the PR template pass locally.

After deploy (Task 14): if anything fails, git revert the deploy commit and re-deploy — see Rollback section in spec.