Sessions Audio Presigned PUT Migration — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Migrate POST /sessions/{id}/audio?windowIndex=N from a synchronous body-bearing handler to a presigned-PUT URL grant + S3-event-driven completion handler. Removes the 29s API Gateway cap as a failure class for multi-MB WAV chunks.
Architecture: SessionAudioFunction (renamed conceptually; AWS resource stays AudioPostFunction) becomes a URL minter — accepts {sizeBytes} JSON, returns a presigned PUT URL with strict ContentType + ContentLength bindings. Client PUTs raw audio bytes directly to S3, bypassing API Gateway. A new AudioUploadCompleteFunction is invoked by an S3 ObjectCreated event on sessions/*/window_*/audio.wav; it does the DynamoDB ADD completedAudioWindows update and the _claim_and_trigger ingest invoke. Three Lambdas, one new. No new HTTP endpoints from the client's POV.
Tech Stack: Python 3.12, AWS SAM, boto3, moto for unit/integration tests, pytest, React Native + TypeScript + Jest.
Spec: docs/plans/2026-05-19-sessions-audio-presigned.md — read before starting.
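The two-step flow described under Architecture can be sketched from the client's side as follows. This is only an illustration: the base URL and the helper names build_grant_request and build_put_request are hypothetical, the requests are constructed but never sent, and the real client is the TypeScript code in capture-session.ts.

```python
import json
import urllib.request

API_BASE = "https://api.example.invalid"  # hypothetical base URL, not from the plan


def build_grant_request(session_id: str, window_index: int, size_bytes: int) -> urllib.request.Request:
    """Step 1: ask the URL minter for a presigned PUT URL (small JSON body, no audio)."""
    url = f"{API_BASE}/sessions/{session_id}/audio?windowIndex={window_index}"
    body = json.dumps({"sizeBytes": size_bytes}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST", headers={"Content-Type": "application/json"}
    )


def build_put_request(presigned_url: str, audio: bytes) -> urllib.request.Request:
    """Step 2: PUT the raw WAV bytes to S3 using the granted URL.

    The headers mirror the ContentType and ContentLength the minter binds into the URL.
    """
    return urllib.request.Request(
        presigned_url,
        data=audio,
        method="PUT",
        headers={"Content-Type": "audio/wav", "Content-Length": str(len(audio))},
    )
```

The size sent in step 1 is the length of the bytes sent in step 2, so the client has to know the final chunk size before asking for a URL.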
Conventions
- All server tests live in main/server/tests/unit/ and main/server/tests/integration/. The tests/worker_utils.py:load_worker_module helper imports handler code by file path.
- Each task is one red-green-refactor cycle ending in a commit. Tests come first.
- Commit messages follow Conventional Commits (feat(...), test(...), chore(...), etc.), enforced by a pre-commit hook.
- Server commands assume cwd is main/server. Client commands assume cwd is main/app.
- moto is already in main/server/tests/requirements.txt. If a needed mock isn't available, fall back to unittest.mock.patch.
- Date in commit messages and the implementation period: 2026-05-20 onward (today).
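The source of load_worker_module is not shown in this plan. A minimal sketch of how such a helper could import a handler by file path, assuming the (module_name, path) signature used by the tests in this plan, might look like this:

```python
import importlib.util
import sys
from pathlib import Path
from types import ModuleType


def load_worker_module(module_name: str, path: Path) -> ModuleType:
    """Hypothetical stand-in: import a Python file by path under the given name."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # register before executing the module body
    spec.loader.exec_module(module)  # runs module-level code, e.g. os.environ reads
    return module
```

With a loader of this shape, module-level constants such as BUCKET or INGEST_FUNCTION are evaluated once, at load time. Changing os.environ after the module has been loaded does not alter them; tests that need a different value have to patch the module attribute instead.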
File structure
Server (Python) — new + modified:
| Path | Action | Responsibility |
|---|---|---|
| main/server/api/sessions/audio/app.py | Rewrite | URL minter: validate input, verify session in DDB, generate presigned PUT URL |
| main/server/events/__init__.py | Create | Empty; marks new package |
| main/server/events/audio_upload_complete/__init__.py | Create | Empty; marks new package |
| main/server/events/audio_upload_complete/app.py | Create | S3 event handler: parse key, DDB ADD, claim+trigger ingest |
| main/server/template.yaml | Modify | Rewrite AudioPostFunction env/policies; add AudioUploadCompleteFunction + AudioUploadCompleteDLQ + S3 NotificationConfiguration |
| main/server/tests/unit/test_audio_url_minter.py | Create | Unit tests for rewritten SessionAudioFunction |
| main/server/tests/unit/test_audio_upload_complete.py | Create | Unit tests for new event handler |
| main/server/tests/integration/test_audio_presigned_flow.py | Create | End-to-end with moto: URL grant → S3 PUT → event → DDB → ingest invoke |
| main/server/tests/unit/test_audio_chunk_api.py | Delete | Superseded by test_audio_url_minter.py; old behavior gone |
Client (TypeScript) — modified:
| Path | Action | Responsibility |
|---|---|---|
| main/app/lib/capture-session.ts | Modify | Rewrite audio branch of uploadItem (lines 103-112) as a 2-step presigned flow |
| main/app/__tests__/capture-session-audio-upload.test.ts | Create | Jest tests for new audio upload flow |
Constants (used across tasks):
MAX_AUDIO_BYTES = 10 * 1024 * 1024 # 10 MB
URL_EXPIRES_SECONDS = 300 # 5 min
S3_KEY_REGEX = r"^sessions/(?P<session_id>[^/]+)/window_(?P<window_index>\d{3})/audio\.wav$"
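The key format and S3_KEY_REGEX are meant to be inverses of each other: the minter formats the key and the event handler parses it back. A small round-trip sketch follows; build_key and parse_key are illustrative names, not the plan's functions.

```python
from __future__ import annotations

import re

S3_KEY_REGEX = r"^sessions/(?P<session_id>[^/]+)/window_(?P<window_index>\d{3})/audio\.wav$"
_KEY_RE = re.compile(S3_KEY_REGEX)


def build_key(session_id: str, window_index: int) -> str:
    """Format the object key with the window index zero-padded to 3 digits."""
    return f"sessions/{session_id}/window_{window_index:03d}/audio.wav"


def parse_key(key: str) -> tuple[str, int] | None:
    """Inverse of build_key; returns None when the key does not match S3_KEY_REGEX."""
    match = _KEY_RE.match(key)
    if match is None:
        return None
    return match.group("session_id"), int(match.group("window_index"))
```

One property worth keeping in mind: `:03d` pads but never truncates, so a window index of 1000 formats as window_1000, which `\d{3}` does not match. Whether indices that large can occur is not stated in this plan.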
Task 1: Tests for URL minter happy path + URL bindings
Files:
- Test: main/server/tests/unit/test_audio_url_minter.py (create)
- [ ] Step 1: Write the failing test file
# main/server/tests/unit/test_audio_url_minter.py
"""Unit tests for the rewritten SessionAudioFunction (URL minter).
Each test cites a row in docs/plans/2026-05-19-sessions-audio-presigned.md
either from Happy Path step 2 or from the Failure Modes table.
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from ..worker_utils import load_worker_module
_APP_PATH = (
Path(__file__).resolve().parents[2] / "api" / "sessions" / "audio" / "app.py"
)
def _load_module():
return load_worker_module("sessions_audio_app", _APP_PATH)
def _event(session_id: str = "sess-1", window_index: int | str | None = 7, body: dict | None = None) -> dict:
event: dict = {
"pathParameters": {"sessionId": session_id} if session_id else {},
}
if window_index is not None:
event["queryStringParameters"] = {"windowIndex": str(window_index)}
else:
event["queryStringParameters"] = None
event["body"] = json.dumps(body) if body is not None else json.dumps({"sizeBytes": 1_923_456})
return event
def _ddb_session_exists(session_id: str = "sess-1") -> MagicMock:
"""Mock DynamoDB table whose get_item returns a session row."""
mock_table = MagicMock()
mock_table.get_item.return_value = {"Item": {"sessionId": session_id}}
return mock_table
class TestURLMinterHappyPath:
@staticmethod
def test_returns_presigned_url_with_correct_s3_key_for_valid_request():
module = _load_module()
mock_s3 = MagicMock()
mock_s3.generate_presigned_url.return_value = "https://s3.example/presigned"
with (
patch.object(module, "s3", mock_s3),
patch.object(module, "table", _ddb_session_exists("sess-1")),
):
result = module.lambda_handler(_event("sess-1", 7, {"sizeBytes": 1_923_456}), None)
assert result["statusCode"] == 200
body = json.loads(result["body"])
assert body["s3Key"] == "sessions/sess-1/window_007/audio.wav"
assert body["windowIndex"] == 7
assert body["url"] == "https://s3.example/presigned"
assert body["expiresIn"] == 300
@staticmethod
def test_url_binds_content_type_audio_wav():
module = _load_module()
mock_s3 = MagicMock()
mock_s3.generate_presigned_url.return_value = "https://s3.example/presigned"
with (
patch.object(module, "s3", mock_s3),
patch.object(module, "table", _ddb_session_exists("sess-1")),
):
module.lambda_handler(_event("sess-1", 7, {"sizeBytes": 1000}), None)
kwargs = mock_s3.generate_presigned_url.call_args.kwargs
assert kwargs["Params"]["ContentType"] == "audio/wav"
@staticmethod
def test_url_binds_exact_content_length_from_size_bytes():
module = _load_module()
mock_s3 = MagicMock()
mock_s3.generate_presigned_url.return_value = "u"
with (
patch.object(module, "s3", mock_s3),
patch.object(module, "table", _ddb_session_exists("sess-1")),
):
module.lambda_handler(_event("sess-1", 7, {"sizeBytes": 1_923_456}), None)
kwargs = mock_s3.generate_presigned_url.call_args.kwargs
assert kwargs["Params"]["ContentLength"] == 1_923_456
@staticmethod
def test_url_expires_in_300_seconds():
module = _load_module()
mock_s3 = MagicMock()
mock_s3.generate_presigned_url.return_value = "u"
with (
patch.object(module, "s3", mock_s3),
patch.object(module, "table", _ddb_session_exists("sess-1")),
):
module.lambda_handler(_event(), None)
kwargs = mock_s3.generate_presigned_url.call_args.kwargs
assert kwargs["ExpiresIn"] == 300
- [ ] Step 2: Run tests, confirm they fail (no production code yet)
Run: cd main/server && pytest tests/unit/test_audio_url_minter.py -v
Expected: All 4 tests FAIL. The current app.py returns the legacy response shape, not {url, s3Key, windowIndex, expiresIn}.
- [ ] Step 3: Implement minimal URL minter in app.py
Replace the entire contents of main/server/api/sessions/audio/app.py:
# main/server/api/sessions/audio/app.py
"""Audio Upload — issues a presigned PUT URL for per-window WAV chunks.
This handler does not accept the audio body. The client uses the returned
URL to PUT bytes directly to S3, bypassing the 29s API Gateway cap.
Completion is handled by AudioUploadCompleteFunction via S3 ObjectCreated.
"""
from __future__ import annotations
import json
import os
import boto3
from shared.logger import create_logger
BUCKET = os.environ.get("BUCKET_NAME", "encache-raw-memory")
SESSIONS_TABLE = os.environ.get("SESSIONS_TABLE_NAME", "encache-sessions")
URL_EXPIRES_SECONDS = 300
MAX_AUDIO_BYTES = 10 * 1024 * 1024 # 10 MB
s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(SESSIONS_TABLE)
logger = create_logger({"flow": "audio_url_grant"})
def _bad_request(message: str) -> dict:
return {
"statusCode": 400,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"error": message}),
}
def _not_found(message: str) -> dict:
return {
"statusCode": 404,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"error": message}),
}
def lambda_handler(event, _context):
session_id = (event.get("pathParameters") or {}).get("sessionId", "")
if not session_id:
return _bad_request("sessionId required")
query_params = event.get("queryStringParameters") or {}
window_index_str = query_params.get("windowIndex")
if window_index_str is None:
return _bad_request("windowIndex required")
try:
window_index = int(window_index_str)
except (ValueError, TypeError):
return _bad_request("windowIndex must be an integer")
if window_index < 0:
return _bad_request("windowIndex must be non-negative")
raw_body = event.get("body") or ""
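    try:
        body = json.loads(raw_body) if raw_body else {}
    except json.JSONDecodeError:
        return _bad_request("body must be valid JSON")
    # Valid JSON that is not an object (e.g. a list) has no .get(); reject it as a 400.
    if not isinstance(body, dict):
        return _bad_request("body must be a JSON object")
    size_bytes = body.get("sizeBytes")
    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if isinstance(size_bytes, bool) or not isinstance(size_bytes, int) or size_bytes <= 0:
        return _bad_request("sizeBytes must be a positive integer")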
if size_bytes > MAX_AUDIO_BYTES:
return _bad_request(f"sizeBytes exceeds cap of {MAX_AUDIO_BYTES}")
resp = table.get_item(Key={"sessionId": session_id}, ProjectionExpression="sessionId")
if "Item" not in resp:
return _not_found("session not found")
s3_key = f"sessions/{session_id}/window_{window_index:03d}/audio.wav"
url = s3.generate_presigned_url(
"put_object",
Params={
"Bucket": BUCKET,
"Key": s3_key,
"ContentType": "audio/wav",
"ContentLength": size_bytes,
},
ExpiresIn=URL_EXPIRES_SECONDS,
)
logger(
{
"step": "audio_url_granted",
"additional": {
"session_id": session_id,
"window_index": window_index,
"size_bytes": size_bytes,
"s3_key": s3_key,
},
}
)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(
{
"url": url,
"s3Key": s3_key,
"windowIndex": window_index,
"expiresIn": URL_EXPIRES_SECONDS,
}
),
}
- [ ] Step 4: Run tests, confirm they pass
Run: cd main/server && pytest tests/unit/test_audio_url_minter.py -v
Expected: 4 PASS.
- [ ] Step 5: Commit
git add main/server/api/sessions/audio/app.py main/server/tests/unit/test_audio_url_minter.py
git commit -m "feat(audio): rewrite SessionAudioFunction as presigned URL minter
Returns {url, s3Key, windowIndex, expiresIn} with strict
ContentType=audio/wav and exact ContentLength binding. Body is no
longer accepted through API Gateway. Completion handled separately by
AudioUploadCompleteFunction via S3 ObjectCreated event.
Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"
Task 2: Tests for URL minter input validation (400 cases)
Files:
- Test: main/server/tests/unit/test_audio_url_minter.py (append)
- [ ] Step 1: Append the test class (expected to pass against the Task 1 handler)
Add this class to test_audio_url_minter.py:
class TestURLMinterInputValidation:
@staticmethod
def test_returns_400_when_session_id_path_param_missing():
module = _load_module()
with patch.object(module, "table", MagicMock()):
event = {"pathParameters": {}, "queryStringParameters": {"windowIndex": "0"}, "body": json.dumps({"sizeBytes": 1})}
result = module.lambda_handler(event, None)
assert result["statusCode"] == 400
@staticmethod
def test_returns_400_when_window_index_missing():
module = _load_module()
with patch.object(module, "table", _ddb_session_exists()):
event = _event(window_index=None, body={"sizeBytes": 1})
result = module.lambda_handler(event, None)
assert result["statusCode"] == 400
@staticmethod
def test_returns_400_when_window_index_negative():
module = _load_module()
with patch.object(module, "table", _ddb_session_exists()):
result = module.lambda_handler(_event(window_index=-1, body={"sizeBytes": 1}), None)
assert result["statusCode"] == 400
@staticmethod
def test_returns_400_when_window_index_non_integer():
module = _load_module()
with patch.object(module, "table", _ddb_session_exists()):
result = module.lambda_handler(_event(window_index="abc", body={"sizeBytes": 1}), None)
assert result["statusCode"] == 400
@staticmethod
def test_returns_400_when_size_bytes_missing():
module = _load_module()
with patch.object(module, "table", _ddb_session_exists()):
result = module.lambda_handler(_event(body={}), None)
assert result["statusCode"] == 400
@staticmethod
def test_returns_400_when_size_bytes_zero_or_negative():
module = _load_module()
with patch.object(module, "table", _ddb_session_exists()):
assert module.lambda_handler(_event(body={"sizeBytes": 0}), None)["statusCode"] == 400
assert module.lambda_handler(_event(body={"sizeBytes": -1}), None)["statusCode"] == 400
@staticmethod
def test_returns_400_when_size_bytes_exceeds_10mb_cap():
module = _load_module()
with patch.object(module, "table", _ddb_session_exists()):
result = module.lambda_handler(_event(body={"sizeBytes": 10 * 1024 * 1024 + 1}), None)
assert result["statusCode"] == 400
- [ ] Step 2: Run tests, confirm they pass
Run: cd main/server && pytest tests/unit/test_audio_url_minter.py::TestURLMinterInputValidation -v
Expected: 7 PASS (handler already implements the validation from Task 1).
- [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_url_minter.py
git commit -m "test(audio): cover URL minter input validation 400 cases"
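The sizeBytes rules exercised by these tests can also be read as a single pure function. The sketch below is illustrative only (size_bytes_error is a hypothetical name). It also covers one case the tests above do not: JSON true/false, which Python would otherwise accept because bool is a subclass of int.

```python
from __future__ import annotations

MAX_AUDIO_BYTES = 10 * 1024 * 1024  # 10 MB, as in the plan's constants


def size_bytes_error(value: object) -> str | None:
    """Return an error message for an invalid sizeBytes, or None when it is acceptable."""
    # bool must be excluded first: isinstance(True, int) is True in Python.
    if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
        return "sizeBytes must be a positive integer"
    if value > MAX_AUDIO_BYTES:
        return f"sizeBytes exceeds cap of {MAX_AUDIO_BYTES}"
    return None
```

The cap is inclusive: exactly 10 MB is accepted and 10 MB plus one byte is rejected, matching the boundary used in test_returns_400_when_size_bytes_exceeds_10mb_cap.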
Task 3: Tests for URL minter session-not-found (404)
Files:
- Test: main/server/tests/unit/test_audio_url_minter.py (append)
- [ ] Step 1: Append the test class (expected to pass against the Task 1 handler)
class TestURLMinterSessionLookup:
@staticmethod
def test_returns_404_when_session_id_not_in_dynamodb():
module = _load_module()
mock_table = MagicMock()
mock_table.get_item.return_value = {} # no Item key = not found
with patch.object(module, "table", mock_table):
result = module.lambda_handler(_event("unknown-sess", 0, {"sizeBytes": 1}), None)
assert result["statusCode"] == 404
- [ ] Step 2: Run, confirm pass
Run: cd main/server && pytest tests/unit/test_audio_url_minter.py::TestURLMinterSessionLookup -v
Expected: PASS (handler already returns 404 on missing Item).
- [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_url_minter.py
git commit -m "test(audio): cover URL minter 404 on unknown session"
Task 4: Scaffold AudioUploadCompleteFunction + S3 key parser
Files:
- Create: main/server/events/__init__.py (empty)
- Create: main/server/events/audio_upload_complete/__init__.py (empty)
- Create: main/server/events/audio_upload_complete/app.py
- Create: main/server/tests/unit/test_audio_upload_complete.py
- [ ] Step 1: Create empty package markers
mkdir -p main/server/events/audio_upload_complete
touch main/server/events/__init__.py
touch main/server/events/audio_upload_complete/__init__.py
- [ ] Step 2: Write the failing test file
# main/server/tests/unit/test_audio_upload_complete.py
"""Unit tests for AudioUploadCompleteFunction (S3 event handler).
Each test cites a Failure Modes table row or Happy Path step in
docs/plans/2026-05-19-sessions-audio-presigned.md.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from ..worker_utils import load_worker_module
_APP_PATH = (
Path(__file__).resolve().parents[2]
/ "events"
/ "audio_upload_complete"
/ "app.py"
)
def _load_module():
return load_worker_module("audio_upload_complete_app", _APP_PATH)
def _s3_event(key: str = "sessions/sess-1/window_007/audio.wav") -> dict:
return {
"Records": [
{
"s3": {
"bucket": {"name": "encache-raw-memory"},
"object": {"key": key},
}
}
]
}
class TestS3KeyParser:
@staticmethod
def test_parses_valid_key():
module = _load_module()
result = module._parse_key("sessions/sess-1/window_007/audio.wav")
assert result == ("sess-1", 7)
@staticmethod
def test_returns_none_for_malformed_key():
module = _load_module()
for bad in [
"wrong/prefix/window_000/audio.wav",
"sessions/sess-1/audio.wav",
"sessions/sess-1/window_xyz/audio.wav",
"sessions/sess-1/window_007/audio.mp3",
"",
]:
assert module._parse_key(bad) is None
- [ ] Step 3: Run, confirm fail
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestS3KeyParser -v
Expected: FAIL with import errors, because app.py doesn't exist yet.
- [ ] Step 4: Implement minimal app.py with parser only
# main/server/events/audio_upload_complete/app.py
"""Audio Upload Complete — S3 ObjectCreated handler.
Triggered by S3 events on sessions/*/window_*/audio.wav. Updates
DynamoDB completedAudioWindows and conditionally triggers ingest.
"""
from __future__ import annotations
import json
import os
import re
import boto3
from shared.logger import create_logger
SESSIONS_TABLE = os.environ.get("SESSIONS_TABLE_NAME", "encache-sessions")
INGEST_FUNCTION = os.environ.get("INGEST_FUNCTION_NAME", "")
DEFAULT_FRAME_COUNT_PER_WINDOW = 60
_KEY_RE = re.compile(
r"^sessions/(?P<session_id>[^/]+)/window_(?P<window_index>\d{3})/audio\.wav$"
)
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(SESSIONS_TABLE)
lambda_client = boto3.client("lambda")
logger = create_logger({"flow": "audio_upload_complete"})
def _parse_key(key: str) -> tuple[str, int] | None:
match = _KEY_RE.match(key)
if not match:
return None
return match.group("session_id"), int(match.group("window_index"))
def lambda_handler(event, _context):
# Implementation grows across subsequent tasks.
return {"ok": True}
- [ ] Step 5: Run, confirm pass
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestS3KeyParser -v
Expected: PASS.
- [ ] Step 6: Commit
git add main/server/events/ main/server/tests/unit/test_audio_upload_complete.py
git commit -m "feat(audio-upload-complete): scaffold S3 event handler + key parser
New event-driven Lambda under main/server/events/. Top-level events/
directory introduced for non-API Lambdas (no existing precedent;
keeps separation from api/ for HTTP and worldmm/ for the ingest
pipeline).
Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"
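One detail of the event shape is easy to miss: S3 URL-encodes object keys in event notifications (a space arrives as +, for example). A minimal sketch of decoding the key before parsing it follows. The helper name object_key_from_record is hypothetical, and whether session IDs can contain characters that need encoding is not stated in this plan; for plain alphanumeric IDs the decode is a no-op.

```python
from urllib.parse import unquote_plus


def object_key_from_record(record: dict) -> str:
    """Extract the object key from one S3 event record and URL-decode it."""
    raw = record.get("s3", {}).get("object", {}).get("key", "")
    return unquote_plus(raw)  # "+" becomes a space, "%40" becomes "@", etc.
```

The decoded key can then be passed to _parse_key unchanged.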
Task 5: AudioUploadComplete — DDB ADD + audio_only mode triggers ingest
Post-execution note: The _claim_and_trigger snippet below was updated after the plan ran to reflect the cubic review fixes that landed in commit bf694ebc0. The shipped helper now rolls back the claim via DDB DELETE ingestTriggeredWindows when the Lambda invoke fails, emitting ingest_claim_rolled_back (or ingest_claim_rollback_failed if the rollback DDB call itself errors). See the spec's failure-modes table for the rationale.
Files:
- Modify: main/server/events/audio_upload_complete/app.py
- Modify: main/server/tests/unit/test_audio_upload_complete.py (append)
- [ ] Step 1: Append failing tests
class TestAudioOnlyMode:
@staticmethod
def test_happy_path_audio_only_mode_triggers_ingest_immediately():
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.return_value = {
"Attributes": {
"sessionId": "sess-1",
"captureMode": "audio_only",
"completedAudioWindows": {7},
"completedFrameWindows": set(),
"userId": "user-1",
}
}
mock_lambda = MagicMock()
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
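            # INGEST_FUNCTION is read from os.environ at import time, so patch the
            # module attribute; patching os.environ after load has no effect.
            patch.object(module, "INGEST_FUNCTION", "IngestFn"),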
):
module.lambda_handler(_s3_event("sessions/sess-1/window_007/audio.wav"), None)
# DDB ADD called with correct window index
        # call_args is the LAST call (the claim); index 0 is the audio ADD.
        update_kwargs = mock_table.update_item.call_args_list[0].kwargs
assert update_kwargs["Key"] == {"sessionId": "sess-1"}
assert ":win" in update_kwargs["ExpressionAttributeValues"]
assert 7 in update_kwargs["ExpressionAttributeValues"][":win"]
# Ingest invoked
invoke_kwargs = mock_lambda.invoke.call_args.kwargs
assert invoke_kwargs["FunctionName"] == "IngestFn"
assert invoke_kwargs["InvocationType"] == "Event"
payload = json.loads(invoke_kwargs["Payload"])
assert payload["sessionId"] == "sess-1"
assert payload["windowIndex"] == 7
@staticmethod
def test_adds_window_index_to_completed_audio_windows_in_ddb():
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.return_value = {
"Attributes": {
"captureMode": "audio_only",
"completedAudioWindows": {3},
"completedFrameWindows": set(),
"userId": "u",
}
}
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", MagicMock()),
):
module.lambda_handler(_s3_event("sessions/sess-1/window_003/audio.wav"), None)
        # Index 0 is the audio ADD even if a later claim call also hits update_item.
        kwargs = mock_table.update_item.call_args_list[0].kwargs
assert kwargs["UpdateExpression"] == "ADD completedAudioWindows :win"
Add import json, os at the top of test_audio_upload_complete.py if not already present.
- [ ] Step 2: Run, confirm fail
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestAudioOnlyMode -v
Expected: FAIL. The handler returns {"ok": True} without doing any DDB or invoke work.
- [ ] Step 3: Implement DDB + ingest trigger logic
Replace the lambda_handler function (and add _claim_and_trigger helper) in main/server/events/audio_upload_complete/app.py:
def _claim_and_trigger(
session_id: str, window_index: int, user_id: str, frame_count: int
) -> bool:
"""Conditional claim + async invoke of IngestWindowFunction.
If the invoke fails, the claim is rolled back so AWS async retry can
re-claim and re-invoke. Without the rollback, a transient invoke
failure would leave the claim in place and silently drop the window
on retry (the conditional would reject the second attempt).
Returns True if we won the claim race AND the invoke succeeded.
Returns False if another invocation already claimed this window.
Raises if the invoke fails — caller (lambda_handler) propagates so
AWS async-invoke retry kicks in. By the time the retry runs, the
claim has been rolled back so the new attempt can succeed.
"""
from botocore.exceptions import ClientError
try:
table.update_item(
Key={"sessionId": session_id},
UpdateExpression="ADD ingestTriggeredWindows :win",
ConditionExpression=(
"attribute_not_exists(ingestTriggeredWindows) "
"OR NOT contains(ingestTriggeredWindows, :winVal)"
),
ExpressionAttributeValues={
":win": {window_index},
":winVal": window_index,
},
)
except ClientError as e:
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
logger(
{
"step": "ingest_claim_lost",
"additional": {
"session_id": session_id,
"window_index": window_index,
},
}
)
return False
raise
try:
lambda_client.invoke(
FunctionName=INGEST_FUNCTION,
InvocationType="Event",
Payload=json.dumps(
{
"sessionId": session_id,
"userId": user_id,
"windowIndex": window_index,
"frameCount": frame_count,
}
),
)
except Exception:
# Roll back the claim so AWS async retry can re-claim + re-invoke.
# Without this, the conditional rejects the retry and the window
# is silently dropped.
try:
table.update_item(
Key={"sessionId": session_id},
UpdateExpression="DELETE ingestTriggeredWindows :win",
ExpressionAttributeValues={":win": {window_index}},
)
logger(
{
"step": "ingest_claim_rolled_back",
"additional": {
"session_id": session_id,
"window_index": window_index,
},
}
)
except Exception as rollback_err:
# If rollback fails, log loudly but still re-raise the original
# invoke failure. The rollback failure is recoverable manually
# (operator can delete the window from ingestTriggeredWindows).
logger(
{
"step": "ingest_claim_rollback_failed",
"additional": {
"session_id": session_id,
"window_index": window_index,
"error": str(rollback_err),
},
}
)
raise
return True
def lambda_handler(event, _context):
for record in event.get("Records", []):
key = record.get("s3", {}).get("object", {}).get("key", "")
parsed = _parse_key(key)
if parsed is None:
logger(
{"step": "audio_upload_malformed_key", "additional": {"key": key}}
)
continue
session_id, window_index = parsed
resp = table.update_item(
Key={"sessionId": session_id},
UpdateExpression="ADD completedAudioWindows :win",
ExpressionAttributeValues={":win": {window_index}},
ReturnValues="ALL_NEW",
)
attrs = resp.get("Attributes") or {}
capture_mode = attrs.get("captureMode", "audio_video")
user_id = attrs.get("userId", "")
completed_frames = attrs.get("completedFrameWindows") or set()
should_trigger = capture_mode == "audio_only" or window_index in completed_frames
if should_trigger and INGEST_FUNCTION:
frame_count = 0 if capture_mode == "audio_only" else DEFAULT_FRAME_COUNT_PER_WINDOW
_claim_and_trigger(session_id, window_index, user_id, frame_count)
logger(
{
"step": "audio_upload_registered",
"additional": {
"session_id": session_id,
"window_index": window_index,
"capture_mode": capture_mode,
},
}
)
return {"ok": True}
- [ ] Step 4: Run, confirm pass
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestAudioOnlyMode -v
Expected: 2 PASS.
- [ ] Step 5: Commit
git add main/server/events/audio_upload_complete/app.py main/server/tests/unit/test_audio_upload_complete.py
git commit -m "feat(audio-upload-complete): DDB ADD and ingest trigger in audio_only mode"
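The claim, invoke, and rollback protocol in _claim_and_trigger can be modeled without AWS to see why the rollback matters. The sketch below replaces DynamoDB with an in-memory set; FakeClaims and claim_and_trigger are illustrative names and not the shipped code.

```python
class FakeClaims:
    """In-memory stand-in for the ingestTriggeredWindows set on one session row."""

    def __init__(self) -> None:
        self.triggered: set[int] = set()

    def claim(self, window_index: int) -> bool:
        # Mirrors the conditional ADD: succeeds only if the window is not yet present.
        if window_index in self.triggered:
            return False
        self.triggered.add(window_index)
        return True

    def release(self, window_index: int) -> None:
        # Mirrors the DELETE rollback.
        self.triggered.discard(window_index)


def claim_and_trigger(claims: FakeClaims, window_index: int, invoke) -> bool:
    """Claim the window, then invoke; roll the claim back if the invoke raises."""
    if not claims.claim(window_index):
        return False  # another invocation already claimed this window
    try:
        invoke(window_index)
    except Exception:
        claims.release(window_index)  # allow a retry to re-claim
        raise
    return True
```

Without the release call, a failed invoke would leave the window in the set, and every retry would hit the "already claimed" branch and return False without ever invoking ingest.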
Task 6: AudioUploadComplete — audio_video mode race with frames
Files:
- Modify: main/server/tests/unit/test_audio_upload_complete.py (append)
- [ ] Step 1: Append tests (expected to pass against the Task 5 handler)
class TestAudioVideoMode:
@staticmethod
def test_audio_video_mode_triggers_ingest_only_when_frames_ready():
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.return_value = {
"Attributes": {
"captureMode": "audio_video",
"completedAudioWindows": {7},
"completedFrameWindows": {7}, # frames already done
"userId": "u",
}
}
mock_lambda = MagicMock()
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
            # Patch the module constant; it was read from os.environ at import time.
            patch.object(module, "INGEST_FUNCTION", "IngestFn"),
):
module.lambda_handler(_s3_event("sessions/sess-1/window_007/audio.wav"), None)
# Invoke should fire — frames are ready
assert mock_lambda.invoke.called
@staticmethod
def test_audio_video_mode_does_not_trigger_when_frames_pending():
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.return_value = {
"Attributes": {
"captureMode": "audio_video",
"completedAudioWindows": {7},
"completedFrameWindows": set(), # frames NOT done
"userId": "u",
}
}
mock_lambda = MagicMock()
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
):
module.lambda_handler(_s3_event("sessions/sess-1/window_007/audio.wav"), None)
# Invoke should NOT fire — frames pending
assert not mock_lambda.invoke.called
- [ ] Step 2: Run, confirm pass
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestAudioVideoMode -v
Expected: 2 PASS (Task 5 handler already implements the should_trigger logic).
- [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_upload_complete.py
git commit -m "test(audio-upload-complete): cover audio_video mode trigger race"
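The trigger decision tested above reduces to a two-input rule. A standalone sketch, with should_trigger as an illustrative function name, is shown here. It also checks membership against Decimal values, since boto3's DynamoDB resource API returns number sets as sets of Decimal rather than int.

```python
from decimal import Decimal


def should_trigger(capture_mode: str, window_index: int, completed_frames: set) -> bool:
    """audio_only windows trigger immediately; audio_video waits for the frames."""
    return capture_mode == "audio_only" or window_index in completed_frames


# The four combinations covered by the tests, using Decimal as DynamoDB would return it.
assert should_trigger("audio_only", 7, set())
assert not should_trigger("audio_video", 7, set())
assert should_trigger("audio_video", 7, {Decimal(7)})
assert not should_trigger("audio_video", 8, {Decimal(7)})
```

The int-versus-Decimal membership test works because Decimal(7) == 7 and both hash to the same value, so the handler does not need to convert the set first.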
Task 7: AudioUploadComplete — error handling paths
Files:
- Modify: main/server/tests/unit/test_audio_upload_complete.py (append)
- [ ] Step 1: Append tests (expected to pass against the Task 5 handler)
class TestErrorHandling:
@staticmethod
def test_malformed_s3_key_logs_and_returns_success():
module = _load_module()
mock_table = MagicMock()
with patch.object(module, "table", mock_table):
result = module.lambda_handler(_s3_event("garbage/path/no-match"), None)
assert result["ok"] is True
assert not mock_table.update_item.called
@staticmethod
def test_unknown_session_returns_success_without_invoking_ingest():
"""DDB returns no Attributes when session row doesn't exist (or row exists but no fields).
Handler should still succeed without invoking ingest."""
module = _load_module()
mock_table = MagicMock()
# update_item on a non-existent row creates an empty row with the
# ADD attribute — Attributes will contain only completedAudioWindows.
mock_table.update_item.return_value = {
"Attributes": {"completedAudioWindows": {7}}
}
mock_lambda = MagicMock()
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
):
result = module.lambda_handler(_s3_event("sessions/ghost/window_007/audio.wav"), None)
assert result["ok"] is True
# captureMode defaults to "audio_video"; completedFrameWindows is empty → no trigger
assert not mock_lambda.invoke.called
@staticmethod
def test_ddb_throttle_raises_for_aws_retry():
from botocore.exceptions import ClientError
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.side_effect = ClientError(
{"Error": {"Code": "ProvisionedThroughputExceededException", "Message": "throttled"}},
"UpdateItem",
)
with patch.object(module, "table", mock_table):
with pytest.raises(ClientError):
module.lambda_handler(_s3_event(), None)
@staticmethod
def test_ingest_invoke_failure_raises_for_aws_retry():
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.return_value = {
"Attributes": {
"captureMode": "audio_only",
"userId": "u",
"completedFrameWindows": set(),
}
}
mock_lambda = MagicMock()
mock_lambda.invoke.side_effect = RuntimeError("boom")
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
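            # Patch the module constant; otherwise the handler skips the invoke
            # and the expected RuntimeError is never raised.
            patch.object(module, "INGEST_FUNCTION", "IngestFn"),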
):
with pytest.raises(RuntimeError):
module.lambda_handler(_s3_event(), None)
- [ ] Step 2: Run, confirm pass
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestErrorHandling -v
Expected: 4 PASS. The handler already skips malformed keys (Task 5), defaults capture_mode so unknown sessions are a no-op, and propagates exceptions raised by DDB and lambda_client.
- [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_upload_complete.py
git commit -m "test(audio-upload-complete): cover error/recovery handling"
Task 8: AudioUploadComplete — duplicate event idempotency
Files:
- Modify: main/server/tests/unit/test_audio_upload_complete.py (append)
- [ ] Step 1: Append tests (expected to pass against the Task 5 handler)
class TestIdempotency:
@staticmethod
def test_duplicate_s3_event_does_not_double_trigger_ingest():
"""Second event for same key fails the conditional claim → no second invoke."""
from botocore.exceptions import ClientError
module = _load_module()
mock_table = MagicMock()
# First call to update_item (audio ADD): succeeds with frames ready
# Second call (claim): succeeds — first invocation
# Third call (audio ADD again, second event): succeeds idempotently
# Fourth call (claim, second event): conditional fails — already claimed
mock_table.update_item.side_effect = [
# First event: audio ADD
{
"Attributes": {
"captureMode": "audio_video",
"completedAudioWindows": {7},
"completedFrameWindows": {7},
"userId": "u",
}
},
# First event: claim
{"Attributes": {"ingestTriggeredWindows": {7}}},
# Second event: audio ADD (idempotent — set semantics)
{
"Attributes": {
"captureMode": "audio_video",
"completedAudioWindows": {7},
"completedFrameWindows": {7},
"userId": "u",
}
},
# Second event: claim fails conditional
ClientError(
{"Error": {"Code": "ConditionalCheckFailedException", "Message": "x"}},
"UpdateItem",
),
]
mock_lambda = MagicMock()
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
            # Patch the module constant; it was read from os.environ at import time.
            patch.object(module, "INGEST_FUNCTION", "IngestFn"),
):
evt = _s3_event("sessions/sess-1/window_007/audio.wav")
module.lambda_handler(evt, None)
module.lambda_handler(evt, None)
# invoke called exactly once
assert mock_lambda.invoke.call_count == 1
@staticmethod
def test_concurrent_audio_and_frame_completion_invoke_ingest_exactly_once():
"""Both an audio S3 event and a frame completion (from FramePostFunction)
can race to call _claim_and_trigger for the same window. The conditional
ensures exactly one invoke fires.
Simulated by calling _claim_and_trigger twice in sequence — second call
returns False because the conditional rejects the duplicate.
"""
from botocore.exceptions import ClientError
module = _load_module()
mock_table = MagicMock()
mock_table.update_item.side_effect = [
{"Attributes": {"ingestTriggeredWindows": {7}}}, # first claim wins
ClientError( # second claim loses
{"Error": {"Code": "ConditionalCheckFailedException", "Message": "x"}},
"UpdateItem",
),
]
mock_lambda = MagicMock()
with (
patch.object(module, "table", mock_table),
patch.object(module, "lambda_client", mock_lambda),
patch.dict(os.environ, {"INGEST_FUNCTION_NAME": "IngestFn"}),
):
first = module._claim_and_trigger("sess-1", 7, "u", 60)
second = module._claim_and_trigger("sess-1", 7, "u", 60)
assert first is True
assert second is False
assert mock_lambda.invoke.call_count == 1
- [ ] Step 2: Run, confirm pass
Run: cd main/server && pytest tests/unit/test_audio_upload_complete.py::TestIdempotency -v
Expected: 2 PASS (claim conditional already implemented in Task 5).
- [ ] Step 3: Commit
git add main/server/tests/unit/test_audio_upload_complete.py
git commit -m "test(audio-upload-complete): cover duplicate S3 event idempotency"
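The idempotency tests above rely on a conditional write acting as a claim: the first caller to record a window index wins, and every later caller is rejected without invoking ingest. The sketch below illustrates that pattern in memory only. FakeTable, ConditionFailed, and claim_and_trigger are hypothetical stand-ins, and nothing here is assumed about the real _claim_and_trigger beyond what the tests show.

```python
class ConditionFailed(Exception):
    """Hypothetical stand-in for DynamoDB's ConditionalCheckFailedException."""


class FakeTable:
    """In-memory stand-in for a table holding one claimed-window set per session."""

    def __init__(self):
        self.triggered = {}  # session_id -> set of claimed window indexes

    def claim(self, session_id, window_index):
        windows = self.triggered.setdefault(session_id, set())
        # Plays the role of a "window not already claimed" condition expression.
        if window_index in windows:
            raise ConditionFailed(f"{session_id}/{window_index} already claimed")
        windows.add(window_index)


def claim_and_trigger(table, invoke, session_id, window_index):
    """Return True only if this call won the claim and fired the invoke."""
    try:
        table.claim(session_id, window_index)
    except ConditionFailed:
        return False  # a previous event already triggered ingest for this window
    invoke(session_id, window_index)
    return True


table = FakeTable()
calls = []
first = claim_and_trigger(table, lambda s, w: calls.append((s, w)), "sess-1", 7)
second = claim_and_trigger(table, lambda s, w: calls.append((s, w)), "sess-1", 7)
print(first, second, len(calls))
```

The claim happens before the invoke, so a duplicate S3 event or a racing frame completion can never produce a second ingest call for the same window.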
Task 9: Update SAM template — new function + DLQ + IAM
Files:
- Modify: main/server/template.yaml
- [ ] Step 1: Edit AudioPostFunction policies (drop write/invoke perms)
Find the AudioPostFunction block (around main/server/template.yaml:306-326). Replace its Environment.Variables and Policies sections:
AudioPostFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: api/sessions/audio/
Handler: app.lambda_handler
Runtime: python3.12
Layers:
- !Ref SharedLayer
Environment:
Variables:
BUCKET_NAME: !Ref RawMemoryBucket
SESSIONS_TABLE_NAME: !Ref SessionsTable
Policies:
- !Ref S3AccessPolicy
- Statement:
- Effect: Allow
Action: dynamodb:GetItem
Resource: !GetAtt SessionsTable.Arn
Events:
AudioPost:
Type: Api
Properties:
Path: /sessions/{sessionId}/audio
Method: post
Notes:
- Drops INGEST_FUNCTION_NAME env (handler no longer invokes ingest).
- Drops DynamoDBPolicy (full table write) in favor of inline read-only GetItem.
- Drops LambdaInvokePolicy.
- Keeps S3AccessPolicy for presigned URL signing (signing is local, but boto3 requires creds).
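The last note says signing is local. To make that concrete, here is a minimal sketch of the Signature Version 4 key derivation, which is a chain of HMAC-SHA256 operations over the secret key with no network call. The credentials are placeholders and the string-to-sign is a simplified illustration, not the real canonical request that boto3 builds when it generates a presigned URL.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    """SigV4 key derivation: four chained HMACs, computed entirely in-process."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sign(signing_key: bytes, string_to_sign: str) -> str:
    return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()


# Placeholder secret and a simplified string-to-sign (illustration only).
key = derive_signing_key("testing", "20260520", "us-east-1", "s3")
sig_wav = sign(key, "PUT\ncontent-type:audio/wav\ncontent-length:4")
sig_mp4 = sign(key, "PUT\ncontent-type:video/mp4\ncontent-length:4")
print(sig_wav != sig_mp4)
```

Because the signature covers whatever values are signed, changing any of them on the client side produces a mismatch when S3 recomputes it. That is why the minter needs credentials (and hence an IAM policy on the role) even though it never calls S3 itself.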
- [ ] Step 2: Add AudioUploadCompleteDLQ resource
Append to the Resources: section (after the existing DLQ definitions; search for IngestDLQ to find a good neighbor):
AudioUploadCompleteDLQ:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 720
MessageRetentionPeriod: 1209600
- [ ] Step 3: Add AudioUploadCompleteFunction resource
Append (place near IngestWindowFunction for cohesion):
AudioUploadCompleteFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: events/audio_upload_complete/
Handler: app.lambda_handler
Runtime: python3.12
Timeout: 30
MemorySize: 256
Layers:
- !Ref SharedLayer
Environment:
Variables:
SESSIONS_TABLE_NAME: !Ref SessionsTable
INGEST_FUNCTION_NAME: !Ref IngestWindowFunction
Policies:
- !Ref DynamoDBPolicy
- !Ref LambdaInvokePolicy
EventInvokeConfig:
MaximumRetryAttempts: 2
DestinationConfig:
OnFailure:
Type: SQS
Destination: !GetAtt AudioUploadCompleteDLQ.Arn
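To illustrate what MaximumRetryAttempts: 2 with an OnFailure destination means in practice, the sketch below simulates asynchronous delivery: one initial attempt plus two retries, and only then a record sent to the failure destination. It is a simplified model, not Lambda's implementation; deliver_async and both handlers are hypothetical, and the delays Lambda inserts between attempts are omitted.

```python
def deliver_async(handler, event, max_retry_attempts=2):
    """Simulate async delivery: one initial attempt plus retries.

    Returns (result, dead_letter). Delays between attempts are omitted.
    """
    attempts = 1 + max_retry_attempts
    last_error = None
    for _ in range(attempts):
        try:
            return handler(event), None
        except Exception as exc:  # any unhandled error counts as a failed attempt
            last_error = exc
    return None, {"event": event, "error": repr(last_error), "attempts": attempts}


attempt_log = []


def always_fails(event):
    attempt_log.append(event)
    raise RuntimeError("ingest invoke denied")


def ignores_malformed_key(event):
    return {"status": "ignored"}  # log and return normally, so nothing is retried


_, dead_letter = deliver_async(always_fails, {"key": "sessions/sess-1/window_007/audio.wav"})
result, no_dead_letter = deliver_async(ignores_malformed_key, {"key": "sessions/bad"})
print(len(attempt_log), dead_letter["attempts"], no_dead_letter)
```

The practical consequence is that the handler should raise only for failures worth retrying. Anything it handles by logging and returning, such as a malformed key, never reaches the DLQ.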
- [ ] Step 4: Add S3 → Lambda permission grant
AudioUploadCompletePermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt AudioUploadCompleteFunction.Arn
Principal: s3.amazonaws.com
SourceArn: !GetAtt RawMemoryBucket.Arn
SourceAccount: !Ref AWS::AccountId
Note — Step 5 (SAM NotificationConfiguration) removed: Discovered during execution that encache-raw-memory is Terraform-managed (main/devops/main.tf:305), not SAM-managed. The S3 notification wiring lives in Task 9b (added during execution) — see docs/plans/2026-05-20-sessions-audio-presigned-implementation.md Task 9b for the Terraform change. SAM continues to own the Lambda Permission grant (Step 4 above) since the Lambda itself is SAM-managed.
- [ ] ~~Step 5: Add S3 NotificationConfiguration on RawMemoryBucket~~ (SUPERSEDED, see Task 9b.)
~~Find the RawMemoryBucket resource definition (grep RawMemoryBucket: in template.yaml). Add or extend its NotificationConfiguration:~~
~~If RawMemoryBucket already has a NotificationConfiguration, merge the new LambdaConfigurations entry into the existing list. Also ensure RawMemoryBucket has DependsOn: AudioUploadCompletePermission so CloudFormation creates the permission before wiring up the notification (S3 rejects notification configs that point at unauthorized functions).~~
- [ ] Step 6: Validate template
Run: cd main/server && sam validate --lint
Expected: <path>/template.yaml is a valid SAM Template.
- [ ] Step 7: Commit
git add main/server/template.yaml
git commit -m "chore(infra): add AudioUploadCompleteFunction + DLQ + S3 invoke permission

- New SAM function under events/audio_upload_complete/.
- Dedicated SQS DLQ; failed events land there after 2 retries.
- Lambda permission for S3 to invoke the function (notification wiring lives in Terraform, see Task 9b).
- Downgrades AudioPostFunction IAM to S3 sign + DDB GetItem only.

Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"
Task 9b: Terraform S3 notification → AudioUploadCompleteFunction
Files:
- Modify: main/devops/main.tf
Added during plan execution after discovering the bucket is Terraform-managed.
- Adds data "aws_lambda_function" "audio_upload_complete", which looks up the SAM-deployed Lambda by its deterministic name server-AudioUploadCompleteFunction (name pinned via the SAM FunctionName property).
- Adds aws_s3_bucket_notification.raw_data_audio, filtering s3:ObjectCreated:* on prefix sessions/, suffix audio.wav.
- Lambda permission grant remains in SAM (AudioUploadCompletePermission resource).
- Deploy order: sam deploy first, then terraform apply.
Validation: cd main/devops && terraform fmt -check && terraform validate.
Implemented in commit 35814ea35 + 384bc8e75 (pinned function name for deterministic lookup).
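The notification filter (prefix sessions/, suffix audio.wav) is coarser than the sessions/*/window_*/audio.wav layout the handler expects, so some keys can reach the Lambda without being valid window uploads. The sketch below shows the difference. KEY_RE is an assumed pattern inferred from that layout, not the handler's actual regex, and parse_key is a hypothetical helper.

```python
import re
from urllib.parse import unquote_plus

# Assumed key layout, inferred from the plan's sessions/*/window_*/audio.wav pattern.
KEY_RE = re.compile(r"^sessions/(?P<session_id>[^/]+)/window_(?P<window>\d+)/audio\.wav$")


def passes_s3_filter(key: str) -> bool:
    """Mirror of the notification filter: prefix 'sessions/' and suffix 'audio.wav'."""
    return key.startswith("sessions/") and key.endswith("audio.wav")


def parse_key(raw_key: str):
    """Return (session_id, window_index), or None for keys the handler should ignore."""
    key = unquote_plus(raw_key)  # object keys in S3 event records arrive URL-encoded
    match = KEY_RE.match(key)
    if match is None:
        return None
    return match.group("session_id"), int(match.group("window"))


print(parse_key("sessions/sess-1/window_007/audio.wav"))
print(passes_s3_filter("sessions/sess-1/notaudio.wav"), parse_key("sessions/sess-1/notaudio.wav"))
```

A key such as sessions/sess-1/notaudio.wav passes the prefix/suffix filter but fails the stricter pattern, which is the case the handler treats as malformed: it logs and returns rather than raising.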
Task 10: Integration test — end-to-end with moto
Files:
- Create: main/server/tests/integration/test_audio_presigned_flow.py
- [ ] Step 1: Write the failing integration test file
# main/server/tests/integration/test_audio_presigned_flow.py
"""End-to-end test of the presigned audio flow against moto.
Covers Happy Path steps 2-5 and the URL-signature rejection cases
from the Failure Modes table in
docs/plans/2026-05-19-sessions-audio-presigned.md.
"""
from __future__ import annotations
import json
import time
from pathlib import Path
import boto3
import pytest
import requests
from moto import mock_aws
from ..worker_utils import load_worker_module
_URL_MINTER_PATH = (
Path(__file__).resolve().parents[2] / "api" / "sessions" / "audio" / "app.py"
)
_COMPLETE_PATH = (
Path(__file__).resolve().parents[2] / "events" / "audio_upload_complete" / "app.py"
)
@pytest.fixture
def aws_env(monkeypatch):
monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
monkeypatch.setenv("BUCKET_NAME", "test-bucket")
monkeypatch.setenv("SESSIONS_TABLE_NAME", "test-sessions")
monkeypatch.setenv("INGEST_FUNCTION_NAME", "IngestFn")
def _setup_aws():
"""Create the bucket + sessions table inside an active mock_aws context."""
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="test-bucket")
ddb = boto3.resource("dynamodb", region_name="us-east-1")
ddb.create_table(
TableName="test-sessions",
KeySchema=[{"AttributeName": "sessionId", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "sessionId", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
table = ddb.Table("test-sessions")
table.put_item(Item={"sessionId": "sess-1", "captureMode": "audio_only", "userId": "u"})
return table
@mock_aws
def test_full_upload_flow_writes_audio_and_triggers_ingest(aws_env, monkeypatch):
table = _setup_aws()
# Reload modules under the mock context so they pick up moto's clients
minter = load_worker_module("audio_minter_int", _URL_MINTER_PATH)
complete = load_worker_module("audio_complete_int", _COMPLETE_PATH)
# Step 1: client POSTs for URL
grant_event = {
"pathParameters": {"sessionId": "sess-1"},
"queryStringParameters": {"windowIndex": "7"},
"body": json.dumps({"sizeBytes": 4}),
}
grant_resp = minter.lambda_handler(grant_event, None)
assert grant_resp["statusCode"] == 200
grant_body = json.loads(grant_resp["body"])
url = grant_body["url"]
# Step 2: client PUTs bytes
put_resp = requests.put(
url,
data=b"DATA",
headers={"Content-Type": "audio/wav", "Content-Length": "4"},
timeout=10,
)
assert put_resp.status_code == 200
# Step 3: simulate S3 event firing → complete handler
invoke_calls: list = []
monkeypatch.setattr(
complete.lambda_client,
"invoke",
lambda **kw: invoke_calls.append(kw),
)
s3_event = {
"Records": [
{
"s3": {
"bucket": {"name": "test-bucket"},
"object": {"key": "sessions/sess-1/window_007/audio.wav"},
}
}
]
}
complete.lambda_handler(s3_event, None)
# DDB row reflects window completion
row = table.get_item(Key={"sessionId": "sess-1"})["Item"]
assert 7 in row.get("completedAudioWindows", set())
# Ingest invoke captured
assert len(invoke_calls) == 1
payload = json.loads(invoke_calls[0]["Payload"])
assert payload["sessionId"] == "sess-1"
assert payload["windowIndex"] == 7
@mock_aws
def test_url_signature_rejects_wrong_content_length(aws_env):
_setup_aws()
minter = load_worker_module("audio_minter_int2", _URL_MINTER_PATH)
grant_resp = minter.lambda_handler(
{
"pathParameters": {"sessionId": "sess-1"},
"queryStringParameters": {"windowIndex": "0"},
"body": json.dumps({"sizeBytes": 100}),
},
None,
)
url = json.loads(grant_resp["body"])["url"]
# PUT with mismatched Content-Length should fail
resp = requests.put(
url,
data=b"X" * 50, # 50 bytes != signed 100
headers={"Content-Type": "audio/wav", "Content-Length": "50"},
timeout=10,
)
assert resp.status_code >= 400
@mock_aws
def test_url_signature_rejects_wrong_content_type(aws_env):
_setup_aws()
minter = load_worker_module("audio_minter_int3", _URL_MINTER_PATH)
grant_resp = minter.lambda_handler(
{
"pathParameters": {"sessionId": "sess-1"},
"queryStringParameters": {"windowIndex": "0"},
"body": json.dumps({"sizeBytes": 4}),
},
None,
)
url = json.loads(grant_resp["body"])["url"]
resp = requests.put(
url,
data=b"DATA",
headers={"Content-Type": "video/mp4", "Content-Length": "4"},
timeout=10,
)
assert resp.status_code >= 400
@mock_aws
def test_expired_url_returns_403_on_put(aws_env, monkeypatch):
"""Generate a URL with 1s expiry, sleep past it, attempt PUT, expect rejection.
Adjusts URL_EXPIRES_SECONDS via monkeypatch instead of waiting 5 minutes.
"""
_setup_aws()
minter = load_worker_module("audio_minter_int4", _URL_MINTER_PATH)
monkeypatch.setattr(minter, "URL_EXPIRES_SECONDS", 1)
grant_resp = minter.lambda_handler(
{
"pathParameters": {"sessionId": "sess-1"},
"queryStringParameters": {"windowIndex": "0"},
"body": json.dumps({"sizeBytes": 4}),
},
None,
)
url = json.loads(grant_resp["body"])["url"]
time.sleep(2)
resp = requests.put(
url,
data=b"DATA",
headers={"Content-Type": "audio/wav", "Content-Length": "4"},
timeout=10,
)
# Real S3 returns 403 with "Request has expired"; moto may return 403 or 400.
# Either way the PUT must NOT succeed.
assert resp.status_code >= 400, (
f"expected expired URL to reject PUT, got {resp.status_code} "
f"(if moto skips expiry validation, skip this test and rely on issue #497)"
)
- [ ] Step 2: Run, confirm pass (or surface moto behavior to adjust)
Run: cd main/server && pytest tests/integration/test_audio_presigned_flow.py -v
Expected: PASS. If your moto version does not strictly enforce ContentLength/ContentType, mark the two strict-binding tests with pytest.skip("moto does not enforce strict bindings; covered by issue #497") and add a comment explaining why.
- [ ] Step 3: Commit
git add main/server/tests/integration/test_audio_presigned_flow.py
git commit -m "test(audio): integration tests for presigned upload flow with moto"
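When the expiry test behaves unexpectedly under moto, it can help to confirm what lifetime the minted URL actually carries. The sketch below reads the expiry from the URL's query string. presigned_expiry is a hypothetical debugging helper; it handles both the SigV4 parameters (X-Amz-Date plus X-Amz-Expires) and the older SigV2 Expires parameter, since which one appears depends on how the boto3 client is configured.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlsplit


def presigned_expiry(url: str) -> datetime:
    """Return the UTC instant at which a presigned URL stops being valid."""
    params = {k: v[0] for k, v in parse_qs(urlsplit(url).query).items()}
    if "X-Amz-Expires" in params:  # SigV4 style: signing time + lifetime in seconds
        signed_at = datetime.strptime(params["X-Amz-Date"], "%Y%m%dT%H%M%SZ")
        signed_at = signed_at.replace(tzinfo=timezone.utc)
        return signed_at + timedelta(seconds=int(params["X-Amz-Expires"]))
    if "Expires" in params:  # SigV2 style: absolute epoch seconds
        return datetime.fromtimestamp(int(params["Expires"]), tz=timezone.utc)
    raise ValueError("URL carries no recognised expiry parameter")


# Made-up URL for illustration; a real one also carries credential and signature fields.
example = (
    "https://test-bucket.s3.amazonaws.com/sessions/sess-1/window_007/audio.wav"
    "?X-Amz-Date=20260520T120000Z&X-Amz-Expires=300&X-Amz-Signature=abc"
)
expiry = presigned_expiry(example)
print(expiry.isoformat())
```

If the computed expiry is in the past and the PUT still succeeds under moto, the mock is not validating expiry, and the test should be skipped as its assertion message suggests.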
Task 11: Delete obsolete test_audio_chunk_api.py
Files:
- Delete: main/server/tests/unit/test_audio_chunk_api.py
- [ ] Step 1: Confirm replacement coverage
Run: cd main/server && pytest tests/unit/test_audio_url_minter.py tests/unit/test_audio_upload_complete.py -v
Expected: all green. These supersede the old test file.
- [ ] Step 2: Delete the old file
- [ ] Step 3: Run the full suite to confirm no orphan reference
Run: cd main/server && pytest tests/ -v
Expected: PASS. No collection errors mentioning test_audio_chunk_api.
- [ ] Step 4: Commit
git add -A main/server/tests/unit/
git commit -m "chore(test): drop test_audio_chunk_api.py — superseded by URL minter + complete tests"
Task 12: Client — Jest test for happy path audio upload
Post-execution note: The uploadAudioItem snippet below was updated after the plan ran to reflect the cubic review fixes that landed in commit bf694ebc0. The shipped helper wraps the S3 PUT in a 60s AbortController so a stalled cell upload can't block the upload queue forever; without it, the fetch never resolves and the queue's per-item retry policy never fires.
Files:
- Create: main/app/__tests__/capture-session-audio-upload.test.ts
- [ ] Step 1: Write the failing test
// main/app/__tests__/capture-session-audio-upload.test.ts
/**
* Tests for the rewritten audio branch of the capture-session uploadFn
* (see main/app/lib/capture-session.ts, replacing the lines that used to
* POST raw audio bytes through API Gateway).
*
* Each test cites a Happy Path step or Failure Modes table row in
* docs/plans/2026-05-19-sessions-audio-presigned.md.
*/
import { uploadAudioItem } from "../lib/capture-session";
const sessionId = "sess-1";
const baseItem = {
id: "audio-1",
type: "audio" as const,
uri: "file:///tmp/audio.wav",
sessionId,
windowIndex: 7,
retryCount: 0,
sizeBytes: 1234,
};
const flushMicrotasks = () => new Promise((r) => setImmediate(r));
describe("uploadAudioItem", () => {
let originalFetch: typeof global.fetch;
let mockApiPost: jest.Mock;
let mockFetch: jest.Mock;
let mockReadBytes: jest.Mock;
beforeEach(() => {
originalFetch = global.fetch;
mockApiPost = jest.fn().mockResolvedValue({
url: "https://s3.example/presigned",
s3Key: `sessions/${sessionId}/window_007/audio.wav`,
windowIndex: 7,
expiresIn: 300,
});
mockFetch = jest.fn().mockResolvedValue({ ok: true, status: 200 });
mockReadBytes = jest.fn().mockResolvedValue(new Uint8Array(1234));
global.fetch = mockFetch as unknown as typeof global.fetch;
});
afterEach(() => {
global.fetch = originalFetch;
jest.restoreAllMocks();
});
test("uploads_audio_when_both_url_grant_and_put_succeed", async () => {
await uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
});
expect(mockApiPost).toHaveBeenCalledWith(
`/sessions/${sessionId}/audio?windowIndex=7`,
JSON.stringify({ sizeBytes: 1234 }),
expect.objectContaining({
headers: { "Content-Type": "application/json" },
}),
);
expect(mockFetch).toHaveBeenCalledWith(
"https://s3.example/presigned",
expect.objectContaining({
method: "PUT",
headers: {
"Content-Type": "audio/wav",
"Content-Length": "1234",
},
}),
);
});
test("does_not_remove_item_from_queue_if_put_throws", async () => {
mockFetch.mockRejectedValueOnce(new Error("network down"));
await expect(
uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
}),
).rejects.toThrow(/network down/);
});
test("parks_item_when_url_grant_returns_5xx", async () => {
mockApiPost.mockRejectedValueOnce(Object.assign(new Error("500"), { status: 500 }));
await expect(
uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
}),
).rejects.toMatchObject({ status: 500 });
expect(mockFetch).not.toHaveBeenCalled();
});
test("parks_item_when_put_to_s3_returns_5xx", async () => {
mockFetch.mockResolvedValueOnce({ ok: false, status: 503 });
await expect(
uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
}),
).rejects.toThrow(/S3 PUT failed/);
});
test("parks_item_when_put_returns_403_signature_mismatch", async () => {
mockFetch.mockResolvedValueOnce({ ok: false, status: 403 });
await expect(
uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
}),
).rejects.toThrow(/S3 PUT failed/);
});
test("refetches_url_after_5xx_retry_does_not_reuse_stale_url", async () => {
// Simulate two consecutive uploads of the same item — each must
// request its own URL, not reuse a cached one.
await uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
});
await uploadAudioItem({
item: baseItem,
sessionId,
apiPost: mockApiPost,
readBytes: mockReadBytes,
});
expect(mockApiPost).toHaveBeenCalledTimes(2);
expect(mockFetch).toHaveBeenCalledTimes(2);
});
});
- [ ] Step 2: Run, confirm fail (uploadAudioItem doesn't exist yet)
Run: cd main/app && npx jest __tests__/capture-session-audio-upload.test.ts
Expected: FAIL. uploadAudioItem is not exported from lib/capture-session.
- [ ] Step 3: Implement uploadAudioItem in capture-session.ts
Open main/app/lib/capture-session.ts. Find the audio branch of uploadItem (currently around lines 103-112). Extract the audio upload logic into a new exported function. Add near the top of the file (after imports):
export interface UploadAudioDeps {
item: {
uri: string;
windowIndex?: number;
sizeBytes: number;
};
sessionId: string;
apiPost: (
path: string,
body: string,
options: { headers: Record<string, string>; timeout?: number },
) => Promise<{ url: string; s3Key: string; windowIndex: number; expiresIn: number }>;
readBytes: (uri: string) => Promise<Uint8Array>;
}
export async function uploadAudioItem(deps: UploadAudioDeps): Promise<void> {
const { item, sessionId, apiPost, readBytes } = deps;
const windowIndex = item.windowIndex ?? 0;
const bytes = await readBytes(item.uri);
const sizeBytes = bytes.byteLength;
const grant = await apiPost(
`/sessions/${sessionId}/audio?windowIndex=${windowIndex}`,
JSON.stringify({ sizeBytes }),
{
headers: { "Content-Type": "application/json" },
timeout: 5000,
},
);
const PUT_TIMEOUT_MS = 60000;
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), PUT_TIMEOUT_MS);
let putResp: Response;
try {
putResp = await fetch(grant.url, {
method: "PUT",
headers: {
"Content-Type": "audio/wav",
"Content-Length": String(sizeBytes),
},
body: bytes as unknown as BodyInit,
signal: controller.signal,
});
} finally {
clearTimeout(timeoutId);
}
if (!putResp.ok) {
throw new Error(`S3 PUT failed: ${putResp.status}`);
}
}
Then replace the audio branch in uploadItem (the function around line 88-114) to call this helper:
if (item.type === "frame") {
await api.post(`/sessions/${sessionId}/frames`, bytes, {
headers: { "Content-Type": "image/jpeg" },
timeout: 10000,
});
const filename = item.uri.split("/").pop() || "";
await markUploaded(sessionId, filename);
} else {
const file = new File(toFileUri(item.uri));
await uploadAudioItem({
item,
sessionId,
apiPost: async (path, body, options) => {
const res = await api.post<{
url: string;
s3Key: string;
windowIndex: number;
expiresIn: number;
}>(path, body, options);
return res;
},
readBytes: async (_uri) => await file.bytes(),
});
}
Note: this assumes api.post returns a parsed JSON body for the URL grant call. If api.post only returns the raw response, adapt accordingly using whatever the existing API client exposes for typed reads.
- [ ] Step 4: Run tests, confirm pass
Run: cd main/app && npx jest __tests__/capture-session-audio-upload.test.ts
Expected: 6 PASS.
- [ ] Step 5: Commit
git add main/app/lib/capture-session.ts main/app/__tests__/capture-session-audio-upload.test.ts
git commit -m "feat(client): switch audio upload to presigned PUT flow
uploadAudioItem requests presigned URL from server, then PUTs raw
WAV bytes directly to S3 — bypasses API Gateway 29s cap. Frame
upload unchanged.
Refs: docs/plans/2026-05-19-sessions-audio-presigned.md"
Task 13: Open PR + run CI
- [ ] Step 1: Push branch
- [ ] Step 2: Open PR
gh pr create --title "feat(audio): migrate sessions/audio to presigned PUT + S3 event" --body "$(cat <<'EOF'
## Summary
- Rewrites \`SessionAudioFunction\` as a presigned PUT URL minter (no body through Lambda).
- Adds \`AudioUploadCompleteFunction\` (new \`events/\` top-level dir) — S3 ObjectCreated triggered, handles DDB ADD + ingest claim+trigger.
- Removes legacy full-session upload path (no users today).
- Strict URL bindings: \`ContentType=audio/wav\`, exact \`ContentLength\`, 5 min expiry.
- Dedicated DLQ for the new Lambda with 2-retry policy.
Eliminates the 29s API Gateway cap failure class for multi-MB WAV chunks (root cause: audio body routes through Lambda memory; 60s WAV window ≈ 2 MB, slow cell upload exceeds cap).
## Spec
\`docs/plans/2026-05-19-sessions-audio-presigned.md\`
## Test plan
- [ ] \`pytest main/server/tests/unit/test_audio_url_minter.py -v\` → 12 PASS
- [ ] \`pytest main/server/tests/unit/test_audio_upload_complete.py -v\` → 10 PASS
- [ ] \`pytest main/server/tests/integration/test_audio_presigned_flow.py -v\` → up to 4 PASS (strict-binding and expiry cases may skip on moto)
- [ ] \`cd main/app && npx jest __tests__/capture-session-audio-upload.test.ts\` → 6 PASS
- [ ] \`sam validate --lint\` clean
- [ ] CI green on this PR
- [ ] (Post-merge) \`sam deploy\` to dev, exercise capture flow on test device, confirm audio lands in S3 + DDB updates + ingest fires
## Follow-ups
- #496 property-based tests
- #497 real-AWS integration test
- #498 load test for AudioUploadCompleteFunction
🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"
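The PR summary cites roughly 2 MB for a 60s WAV window. The arithmetic below reproduces that figure and derives the sustained uplink a client would need to finish inside the 29s cap. The sample rate, bit depth, and channel count are assumptions chosen because they match the stated size; the app's actual recording settings are not given in this plan.

```python
SAMPLE_RATE_HZ = 16_000   # assumption: 16 kHz capture
BYTES_PER_SAMPLE = 2      # assumption: 16-bit PCM
CHANNELS = 1              # assumption: mono
WAV_HEADER_BYTES = 44     # canonical PCM WAV header size
WINDOW_SECONDS = 60
GATEWAY_CAP_SECONDS = 29

# Payload size of one audio window, header included.
window_bytes = (
    SAMPLE_RATE_HZ * BYTES_PER_SAMPLE * CHANNELS * WINDOW_SECONDS + WAV_HEADER_BYTES
)

# Minimum sustained uplink needed to finish the upload before the cap.
min_uplink_kbps = window_bytes * 8 / GATEWAY_CAP_SECONDS / 1000

print(f"{window_bytes} bytes per window")
print(f"{min_uplink_kbps:.1f} kbit/s needed to finish within {GATEWAY_CAP_SECONDS}s")
```

Under these assumptions a window is about 1.92 MB, and any uplink that sustains less than roughly 530 kbit/s would exceed the cap on the old path. The presigned PUT removes that limit because the bytes no longer pass through API Gateway.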
- [ ] Step 3: Watch CI until green
Run: gh pr checks --watch
Expected: all checks pass. If any fail, diagnose and fix in a new commit (do not amend).
Task 14: Deploy + manual verification
- [ ] Step 1: Deploy to dev environment
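Run: cd main/server && sam deploy --no-confirm-changeset
Expected: stack updates successfully. CloudFormation events show creation of AudioUploadCompleteFunction, AudioUploadCompleteDLQ, and AudioUploadCompletePermission, and modification of AudioPostFunction. The bucket notification is not part of this stack because the bucket is Terraform-managed; apply it afterwards with terraform apply in main/devops (see the Task 9b deploy order).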
- [ ] Step 2: Exercise capture flow on test device
Manual: open the RN app on a test device, start a capture session, let it run long enough to produce at least one audio window (60s), then end the session.
- [ ] Step 3: Verify S3 + DDB + ingest fired
Run (substitute the actual sessionId from device logs):
aws s3 ls s3://encache-raw-memory/sessions/<SESSION_ID>/window_000/
aws dynamodb get-item --table-name encache-sessions --key '{"sessionId":{"S":"<SESSION_ID>"}}' | jq '.Item.completedAudioWindows, .Item.ingestTriggeredWindows'
aws logs tail /aws/lambda/<stack>-AudioUploadCompleteFunction --since 10m
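The DynamoDB check above can also be scripted rather than read by eye. The sketch below parses the JSON that aws dynamodb get-item prints, where number sets are serialised as {"NS": [...]} with string members, and reports whether a window index appears in both sets. window_sets and window_fully_processed are hypothetical helper names, and the sample document is made up for illustration.

```python
import json


def window_sets(get_item_output: str) -> dict:
    """Extract the two number sets from `aws dynamodb get-item` JSON output."""
    item = json.loads(get_item_output).get("Item", {})
    result = {}
    for name in ("completedAudioWindows", "ingestTriggeredWindows"):
        # Number sets look like {"NS": ["0", "7"]}; a missing attribute means empty.
        result[name] = {int(n) for n in item.get(name, {}).get("NS", [])}
    return result


def window_fully_processed(get_item_output: str, window_index: int) -> bool:
    """True when the window is recorded as uploaded and as ingest-triggered."""
    sets = window_sets(get_item_output)
    return all(window_index in members for members in sets.values())


# Made-up sample: window 1 has uploaded audio but ingest has not been triggered yet.
sample = json.dumps({
    "Item": {
        "sessionId": {"S": "sess-1"},
        "completedAudioWindows": {"NS": ["0", "1"]},
        "ingestTriggeredWindows": {"NS": ["0"]},
    }
})
print(window_fully_processed(sample, 0), window_fully_processed(sample, 1))
```

A window that shows up in completedAudioWindows but not in ingestTriggeredWindows points at the claim or invoke step, which narrows where to look in the CloudWatch logs.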
Expected:
- S3 listing shows audio.wav in the window directory.
- DDB shows the window index in both completedAudioWindows and ingestTriggeredWindows.
- CloudWatch logs show step=audio_upload_registered for the window.
- [ ] Step 4: Trigger artificial failure to verify DLQ
Manually upload an unknown-shaped object to a sessions/...audio.wav path that does not match the regex, then verify DLQ stays empty (malformed key logs + returns success). Then upload a key that matches but for a session that does not exist in DDB; verify the same.
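For a true DLQ test, temporarily revoke AudioUploadCompleteFunction's lambda:InvokeFunction permission on the ingest function, upload a valid object so the S3 event fires, verify the DLQ receives the message after 2 retries (allow several minutes, since Lambda spaces out asynchronous retries), then restore the permission.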
- [ ] Step 5: Mark plan executed
In docs/plans/2026-05-19-sessions-audio-presigned.md, change Status: draft to Status: documentation. Commit:
git add docs/plans/2026-05-19-sessions-audio-presigned.md
git commit -m "docs(plans): mark sessions/audio presigned plan as documentation"
Self-review checklist (for the engineer executing this plan)
After each task: run the full server test suite (pytest main/server/tests/ -v) to catch unintended breakage.
After Task 11 specifically: run the full suite plus cd main/app && npx jest. The audio behavior change has the largest blast radius of anything in this plan; nothing else should be affected, but verify.
Before Task 13 (PR open): ensure all the test commands in the PR template pass locally.
After deploy (Task 14): if anything fails, git revert the deploy commit and re-deploy — see Rollback section in spec.