Ingest Streaming Pipeline
System Intent
- What this is: The end-to-end pipeline by which a recording session from any of three capture sources (wearable glasses, glasses-audio-only, or phone microphone) is broken into 30-second windowed chunks on-device, uploaded to S3, and handed off to the per-window ingest Lambda. All three capture paths share the same server-side pipeline; differences are confined to client-side capture mechanics and the captureMode flag set at session start.
Mermaid Diagram
flowchart TD
subgraph "Client — Glasses (audio+video)"
GAudio[WearablesModule\nstartAudioCapture\n30s WAV chunks\nonAudioChunkReady event]
GFrames[WearablesModule\nstartRecordingFrames\nevery frame → JPEG on disk\nstartFramePolling every 500ms]
end
subgraph "Client — Glasses-Audio (audio only)"
GAAudio[WearablesModule\nstartAudioCapture\n30s WAV chunks\nonAudioChunkReady event]
end
subgraph "Client — Phone (audio only)"
PAudio[phone-audio-capture.ts\nexpo-av\n30s WAV chunks\nonChunkReady callback]
end
subgraph "Client — PersistentUploadQueue"
Queue[enqueue audio / frame]
end
GAudio -->|windowIndex=n, sizeBytes| Queue
GAAudio -->|windowIndex=n, sizeBytes| Queue
PAudio -->|windowIndex=n, sizeBytes| Queue
GFrames -->|image/jpeg| Queue
subgraph "Server — Audio path"
AudioGrant["POST /sessions/{id}/audio?windowIndex=n\n→ presigned S3 PUT URL (5min TTL)"]
AudioS3[Client PUT audio/wav → S3\nsessions/{id}/window_NNN/audio.wav]
AudioComplete[AudioUploadCompleteFunction\nEventBridge S3 ObjectCreated]
end
subgraph "Server — Frame path (Android/POST)"
FramePost["POST /sessions/{id}/frames\nframes/app.py\nDynamoDB frameCount++\nS3 write frame_NNN.jpg"]
end
subgraph "Server — Frame path (iOS/presigned)"
FramePresign["GET /sessions/{id}/frames/presigned-url\n→ presigned S3 PUT URL (1h TTL)"]
FrameS3["Client PUT image/jpeg → S3\n(URLSession background upload)"]
end
Queue -->|audio| AudioGrant
AudioGrant --> AudioS3
AudioS3 -->|ObjectCreated event| AudioComplete
Queue -->|frame, iOS| FramePresign
FramePresign --> FrameS3
Queue -->|frame, Android| FramePost
AudioComplete -->|captureMode=audio_only OR window in completedFrameWindows| IngestTrigger
FramePost -->|60th frame AND window in completedAudioWindows| IngestTrigger
IngestTrigger[IngestWindowFunction\nasync Lambda invoke]
Flows
Flow: glasses.audioChunks
- Core files:
  - Android: main/app/wearables-module/android/src/main/java/expo/modules/wearablesmodule/WearablesModule.kt
  - iOS: main/app/wearables-module/ios/WearablesModule.swift
  - Client orchestration: main/app/lib/capture-session.ts
Types
AudioChunkEvent {
  filePath: string (filesystem path to WAV file; no file:// prefix on Android, native path on iOS)
  windowIndex: number (0-based, monotonically increasing)
  durationMs: number (actual duration of this chunk)
  sizeBytes: number (file size in bytes)
}
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| glasses.audioChunks.normal | PCM bytes from the capture source (iOS: glasses HFP input; Android: AudioRecord on MediaRecorder.AudioSource.MIC) | onAudioChunkReady event every ~30 s | happy path | Chunk fires when the buffer reaches 960,000 bytes (30 s × 16 kHz × 16-bit × mono) |
| glasses.audioChunks.finalPartial | WearablesModule.stopAudioCapture() | onAudioChunkReady for remaining buffer (any size) | happy path | Final partial chunk is always emitted; may be < 30 s |
Pseudocode
// Shared across Android and iOS
pcmBytesPerChunk = 960_000   // 30s × 16kHz × 16-bit × mono
sampleRate = 16_000 Hz, mono, 16-bit
onPCMData(buffer):
    audioChunkBuffer += buffer
    if audioChunkBuffer.size >= pcmBytesPerChunk:
        flushAudioChunk(isFinal=false)
stopAudioCapture():
    flushAudioChunk(isFinal=true)   // emits partial chunk even if < 30s
flushAudioChunk(isFinal):
    write WAV header + audioChunkBuffer to temp file
    emit onAudioChunkReady { filePath, windowIndex, durationMs, sizeBytes }
    windowIndex++
    clear audioChunkBuffer
// Android: AudioRecord(MediaRecorder.AudioSource.MIC), captures ambient room audio
// iOS: AVAudioEngine tap on HFP input port matching "ray-ban"/"meta" by name
// Both write WAV header + PCM to temp file, emit onAudioChunkReady event
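The byte-threshold chunking above can be sketched in Python, which makes the arithmetic easy to check. This is a hedged illustration, not the WearablesModule code. PcmChunker, on_pcm_data, stop and AudioChunk are hypothetical names. WAVs are written to memory with the standard-library wave module rather than to temp files. Unlike the pseudocode, which flushes the whole buffer once it crosses the threshold, this sketch splits at exactly 960,000 bytes and carries the excess into the next window. That split is an assumption, chosen so that every non-final chunk is exactly 30 s.

```python
import io
import wave
from dataclasses import dataclass, field

SAMPLE_RATE = 16_000   # Hz
SAMPLE_WIDTH = 2       # bytes per sample (16-bit)
CHANNELS = 1           # mono
BYTES_PER_SECOND = SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS   # 32,000
PCM_BYTES_PER_CHUNK = 30 * BYTES_PER_SECOND                # 960,000


@dataclass
class AudioChunk:
    window_index: int
    duration_ms: int
    wav_bytes: bytes   # WAV header + PCM payload


@dataclass
class PcmChunker:
    """Hypothetical stand-in for the native chunking logic."""
    buffer: bytearray = field(default_factory=bytearray)
    window_index: int = 0
    chunks: list[AudioChunk] = field(default_factory=list)

    def on_pcm_data(self, data: bytes) -> None:
        # Accumulate PCM; emit one full 30 s chunk per 960,000 bytes buffered.
        self.buffer += data
        while len(self.buffer) >= PCM_BYTES_PER_CHUNK:
            self._flush(PCM_BYTES_PER_CHUNK)

    def stop(self) -> None:
        # Emit whatever remains as a final partial chunk (< 30 s).
        # Assumption: an empty remainder is skipped instead of producing a 0-frame WAV.
        if self.buffer:
            self._flush(len(self.buffer))

    def _flush(self, n: int) -> None:
        pcm = bytes(self.buffer[:n])
        del self.buffer[:n]
        out = io.BytesIO()
        with wave.open(out, "wb") as w:   # wave writes the RIFF/WAVE header
            w.setnchannels(CHANNELS)
            w.setsampwidth(SAMPLE_WIDTH)
            w.setframerate(SAMPLE_RATE)
            w.writeframes(pcm)
        duration_ms = len(pcm) * 1000 // BYTES_PER_SECOND
        self.chunks.append(AudioChunk(self.window_index, duration_ms, out.getvalue()))
        self.window_index += 1   # 0-based, monotonically increasing
```

Feeding 1,000,000 bytes of PCM emits window 0 (30,000 ms) immediately and leaves 40,000 bytes buffered. stop() then emits those bytes as window 1 (1,250 ms).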
Flow: glasses.frameCapture
- Core files:
  - Android: main/app/wearables-module/android/src/main/java/expo/modules/wearablesmodule/WearablesModule.kt
  - iOS: main/app/wearables-module/ios/WearablesModule.swift
  - Polling: main/app/lib/capture-session.ts → startFramePolling
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| glasses.frames.write | videoFramePublisher events from Meta SDK | JPEG files in temp dir (frame_NNNNN_<timestamp>.jpg) | happy path | Android: 70% quality; iOS: 70% quality via jpegData(compressionQuality: 0.7) |
| glasses.frames.poll | setInterval(500ms) | new frames enqueued into PersistentUploadQueue | happy path | Polling reads directory, compares against dedup index, enqueues unseen frames |
| glasses.frames.finalPoll | stopRecordingFrames() | remaining frames collected and enqueued | happy path | Final sweep on teardown |
Pseudocode
// Frame write (native, event-driven)
videoFramePublisher.listen { frame ->
    jpegData = frame.makeUIImage().jpegData(compressionQuality: 0.7)
    write to framesDirectory/frame_NNNNN_<timestamp>.jpg
    frameCount++
}
// Frame poll (JS, 500ms interval)
setInterval(() => {
    files = readDir(recordingDir)
    unseen = files.filter(f => !uploadedSet.has(f))
    unseen.forEach(f => uploadQueue.enqueue("frame", f))
}, 500)
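One poll tick can be sketched in Python. poll_frames and the enqueued set are hypothetical. The pseudocode above filters only against uploadedSet, so a frame that is queued but not yet uploaded could be enqueued again on the next 500 ms tick. This sketch assumes a second, in-memory set of already-enqueued filenames to prevent that. Whether capture-session.ts guards against re-enqueueing the same way is an open assumption.

```python
import os
import tempfile
from typing import Callable


def poll_frames(
    recording_dir: str,
    uploaded: set[str],
    enqueued: set[str],
    enqueue: Callable[[str, str], None],
) -> list[str]:
    """One poll tick: enqueue JPEGs that are neither uploaded nor already queued."""
    new_frames: list[str] = []
    # Assumes zero-padded frame_NNNNN_ names, so lexical order matches capture order.
    for name in sorted(os.listdir(recording_dir)):
        if not name.endswith(".jpg"):
            continue
        if name in uploaded or name in enqueued:
            continue
        enqueue("frame", os.path.join(recording_dir, name))
        enqueued.add(name)   # prevents re-enqueueing on the next 500 ms tick
        new_frames.append(name)
    return new_frames


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        for n in ("frame_00001_1700000000.jpg", "frame_00002_1700000001.jpg"):
            open(os.path.join(d, n), "wb").close()
        queue: list[tuple[str, str]] = []
        seen: set[str] = set()

        def push(kind: str, path: str) -> None:
            queue.append((kind, path))

        print(poll_frames(d, set(), seen, push))   # both frames
        print(poll_frames(d, set(), seen, push))   # nothing new on the second tick
```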
Flow: phone.audioChunks
- Core files: main/app/lib/phone-audio-capture.ts
Types
AudioChunkEvent {
  filePath: string (absolute file:// URI to WAV file on device)
  windowIndex: number (0-based sequential chunk index)
  durationMs: number (actual duration of this chunk in milliseconds)
  sizeBytes: number (file size in bytes)
}
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| phone.audioChunks.normal | expo-av Audio.Recording | onChunkReady callback every ~30 s | happy path | setInterval(30_000): stops current recording, starts next, finalizes previous |
| phone.audioChunks.finalPartial | stopPhoneAudioCapture() | onChunkReady for remaining partial chunk | happy path | < 30 s; always emitted on stop |
Pseudocode
// Phone path: pure JS using expo-av
chunkTimer = setInterval(() => {
    prevRecording = currentRecording   // snapshot the chunk that is ending
    prevIndex = windowIndex
    currentRecording = startNewChunk()
    windowIndex++
    finalizeChunk(prevRecording, prevIndex)   // emits onChunkReady
}, 30_000)
// HIGH_QUALITY preset: 44.1kHz mono WAV (compatible with /sessions/{id}/audio endpoint)
// Note: server transcription uses Groq whisper-large-v3, which handles both 44.1kHz and 16kHz
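The phone rotation can be sketched in Python with the recorder behind an injected factory, so the ordering can be tested without a device. PhoneChunkRotator, FakeRecording, start_recording and tick are hypothetical names. The real module is TypeScript on expo-av, and tick would be driven by setInterval(30_000). The order used here (stop current, start next, finalize previous) follows the phone.audioChunks.normal row rather than any code shown in this document.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeRecording:
    """Hypothetical stand-in for an expo-av Audio.Recording."""
    chunk_id: int
    stopped: bool = False


@dataclass
class PhoneChunkRotator:
    start_recording: Callable[[int], FakeRecording]        # starts recording window n
    on_chunk_ready: Callable[[int, FakeRecording], None]   # the onChunkReady callback
    window_index: int = 0
    current: Optional[FakeRecording] = None

    def start(self) -> None:
        self.current = self.start_recording(self.window_index)

    def tick(self) -> None:
        """Called every 30,000 ms: stop current, start next, finalize previous."""
        if self.current is None:
            return
        prev, prev_index = self.current, self.window_index   # snapshot before swapping
        prev.stopped = True
        self.window_index += 1
        self.current = self.start_recording(self.window_index)
        self.on_chunk_ready(prev_index, prev)

    def stop(self) -> None:
        """Always emit the final partial chunk (< 30 s) on stop."""
        if self.current is None:
            return
        self.current.stopped = True
        self.on_chunk_ready(self.window_index, self.current)
        self.current = None
```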
Flow: audioUpload
- Core files:
  - Client: main/app/lib/capture-session.ts → uploadAudioItem
  - Server (grant): main/server/api/sessions/audio/app.py
  - Server (complete): main/server/events/audio_upload_complete/app.py
Types
AudioUploadGrantRequest {
  POST /sessions/{sessionId}/audio?windowIndex=n
  body: { sizeBytes: number }
}
AudioUploadGrantResponse {
  url: string (presigned S3 PUT URL, TTL=300s)
  s3Key: string (sessions/{sessionId}/window_NNN/audio.wav)
  windowIndex: number
  expiresIn: number (300)
}
AudioUploadCompleteEvent (EventBridge) {
  source: "aws.s3"
  detail-type: "Object Created"
  detail.object.key: "sessions/{sessionId}/window_NNN/audio.wav"
}
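AudioUploadCompleteFunction has to recover sessionId and windowIndex from detail.object.key. A minimal parsing sketch follows. parse_audio_key, window_from_event and AudioWindowRef are hypothetical. The sketch assumes that NNN is a zero-padded decimal of at least three digits and that sessionId never contains a slash.

```python
import re
from typing import NamedTuple, Optional

# Hypothetical pattern for "sessions/{sessionId}/window_NNN/audio.wav".
AUDIO_KEY_RE = re.compile(
    r"^sessions/(?P<session_id>[^/]+)/window_(?P<window>\d{3,})/audio\.wav$"
)


class AudioWindowRef(NamedTuple):
    session_id: str
    window_index: int


def parse_audio_key(key: str) -> Optional[AudioWindowRef]:
    """Return (sessionId, windowIndex) for an audio.wav key, or None for any other object."""
    m = AUDIO_KEY_RE.match(key)
    if m is None:
        return None
    return AudioWindowRef(m["session_id"], int(m["window"]))


def window_from_event(event: dict) -> Optional[AudioWindowRef]:
    # EventBridge "Object Created" events carry the key at detail.object.key.
    if event.get("source") != "aws.s3" or event.get("detail-type") != "Object Created":
        return None
    return parse_audio_key(event.get("detail", {}).get("object", {}).get("key", ""))
```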
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| audioUpload.grant | AudioUploadGrantRequest | presigned PUT URL | happy path | Lambda validates session exists, sizeBytes <= 10MB, windowIndex >= 0 |
| audioUpload.put | Client PUT to presigned URL | audio.wav written to S3 | happy path | Client timeout: 60s; the PUT goes straight to S3, bypassing API Gateway's 29s limit |
| audioUpload.complete | EventBridge S3 ObjectCreated | DynamoDB completedAudioWindows updated; ingest triggered if conditions met | happy path | Replaces old POST-body path; no API Gateway body size limit |
| audioUpload.ingestTrigger | captureMode=audio_only OR windowIndex in completedFrameWindows | async invoke of IngestWindowFunction | happy path | Dedup claim via conditional DynamoDB ADD to ingestTriggeredWindows |
| audioUpload.grant.badRequest | missing windowIndex, windowIndex < 0, sizeBytes <= 0, sizeBytes > 10MB | HTTP 400 | error | |
| audioUpload.grant.notFound | sessionId not in DynamoDB | HTTP 404 | error | |
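The grant-side checks listed in the table reduce to a few comparisons. The sketch below is hypothetical and dependency-free. It returns an HTTP status plus a body, and it stops short of presigning (boto3 would do that with generate_presigned_url("put_object", Params=..., ExpiresIn=...)), so it runs without AWS credentials. The sketch rests on four assumptions:

- known_sessions stands in for the DynamoDB lookup.
- "10 MB" means 10 × 1024 × 1024 bytes.
- window_NNN is zero-padded to three digits.
- The 400 checks run before the 404 check.

Check MAX_AUDIO_BYTES and the key format in audio/app.py before relying on any of these.

```python
from typing import Optional

MAX_AUDIO_BYTES = 10 * 1024 * 1024   # assumption: "10 MB" read as 10 MiB
URL_EXPIRES_SECONDS = 300            # audio presigned PUT TTL from the table


def grant_audio_upload(
    session_id: str,
    window_index: Optional[str],      # raw ?windowIndex= query value
    size_bytes: object,               # body.sizeBytes as parsed from JSON
    known_sessions: set[str],         # stand-in for the DynamoDB session lookup
) -> tuple[int, dict]:
    """Hypothetical validation for POST /sessions/{id}/audio?windowIndex=n."""
    if window_index is None or not window_index.isdecimal():
        return 400, {"error": "windowIndex must be a non-negative integer"}
    # bool is a subclass of int in Python, so reject it explicitly.
    if not isinstance(size_bytes, int) or isinstance(size_bytes, bool):
        return 400, {"error": "sizeBytes must be an integer"}
    if not 0 < size_bytes <= MAX_AUDIO_BYTES:
        return 400, {"error": f"sizeBytes must be between 1 and {MAX_AUDIO_BYTES}"}
    if session_id not in known_sessions:
        return 404, {"error": "session not found"}
    idx = int(window_index)
    # Assumption: window_NNN is zero-padded to three digits.
    s3_key = f"sessions/{session_id}/window_{idx:03d}/audio.wav"
    # A real handler would now presign a PUT for s3_key with ExpiresIn=URL_EXPIRES_SECONDS.
    return 200, {"s3Key": s3_key, "windowIndex": idx, "expiresIn": URL_EXPIRES_SECONDS}
```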
Flow: frameUpload.android
- Core files: main/server/api/sessions/frames/app.py
Types
FrameUploadRequest {
  POST /sessions/{sessionId}/frames
  body: image/jpeg bytes
}
FrameUploadResponse {
  frameCount: number
  windowIndex: number
  ingestTriggered: boolean
}
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| frameUpload.android.normal | JPEG body (frames 1 to 59 of a window) | { frameCount, windowIndex, ingestTriggered: false } | happy path | DynamoDB atomic increment; S3 write to window_NNN/frame_NNN.jpg |
| frameUpload.android.windowComplete | JPEG body (60th frame for window) | { frameCount, windowIndex, ingestTriggered } (true only when audio for this window is already complete) | happy path | DynamoDB: frameCount→0, currentFrameWindow+1, windowIndex→completedFrameWindows; ingest triggered if audio already complete |
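The windowComplete bookkeeping can be modeled with an in-memory stand-in for the session item. Field names follow the table (frameCount, currentFrameWindow, completedFrameWindows, completedAudioWindows). SessionState and record_frame are hypothetical. A plain increment replaces DynamoDB's atomic ADD, so this single-process sketch says nothing about concurrent uploads. The ingestTriggeredWindows dedup claim is also omitted.

```python
from dataclasses import dataclass, field

FRAMES_PER_WINDOW = 60


@dataclass
class SessionState:
    """Hypothetical in-memory mirror of the DynamoDB session item."""
    frame_count: int = 0
    current_frame_window: int = 0
    completed_frame_windows: set[int] = field(default_factory=set)
    completed_audio_windows: set[int] = field(default_factory=set)


def record_frame(s: SessionState) -> dict:
    """Account for one uploaded JPEG; returns the FrameUploadResponse shape."""
    window = s.current_frame_window
    s.frame_count += 1   # atomic ADD in DynamoDB; a plain increment here
    count = s.frame_count
    ingest = False
    if count == FRAMES_PER_WINDOW:
        # 60th frame: reset the counter, advance the window, mark it complete.
        s.frame_count = 0
        s.current_frame_window += 1
        s.completed_frame_windows.add(window)
        # Trigger only if audio for this window has already landed.
        ingest = window in s.completed_audio_windows
    return {"frameCount": count, "windowIndex": window, "ingestTriggered": ingest}
```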
Flow: frameUpload.ios
- Core files: main/app/lib/capture-session.ts → uploadItem
- Note: The iOS frame presigned URL grant is handled client-side via capture-session.ts:117. There is no server-side Lambda for this grant in the current codebase; the frames/app.py endpoint (POST /sessions/{id}/frames) is the only server-side frames handler and serves the Android path.
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| frameUpload.ios.enqueue | presigned URL grant + WearablesModule.enqueueBackgroundUpload | URLSession background upload task started | happy path | Presigned URL TTL = 3600s (1 hour). Known limitation: the OS may start a background upload after the URL has expired, so the TTL should be >= 86400s |
| frameUpload.ios.complete | Native URLSession delegate | frame filename written to uploaded_frames.json dedup index | happy path | OS manages retries; survives app suspension/termination/reboot |
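The TTL limitation comes down to one comparison. S3 rejects a presigned request that arrives after the URL's expiry, and a background URLSession task may not start until well after the URL was granted. The worked check below uses made-up timestamps. url_still_valid is a hypothetical helper, and the two-hour deferral is an illustrative number, not a measured one.

```python
def url_still_valid(granted_at_s: float, request_at_s: float, ttl_s: int) -> bool:
    """True if a presigned URL granted at granted_at_s is unexpired when S3 receives the request."""
    return request_at_s - granted_at_s <= ttl_s


GRANTED_AT = 0.0
DEFERRED_START = 2 * 3600.0   # hypothetical: OS starts the upload 2 h after the grant

current_ttl_ok = url_still_valid(GRANTED_AT, DEFERRED_START, ttl_s=3_600)     # False
proposed_ttl_ok = url_still_valid(GRANTED_AT, DEFERRED_START, ttl_s=86_400)   # True
```

SigV4 presigned URLs are capped at 7 days (604,800 s), so 86,400 s is within the limit. There is one further caveat from the Amazon S3 documentation. A URL presigned with temporary credentials (for example, a Lambda execution role's) stops working when those credentials expire, even if its expiry is longer. Raising the TTL alone may therefore not be sufficient.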
Flow: ingestTrigger
- Core files: main/server/worldmm/pipeline/ingest_window.py
- See also: docs/docs/ingest-window.md
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| ingestTrigger.audioOnly | { sessionId, userId, windowIndex, frameCount=0 } | transcription only; segment created; processing_status=complete | happy path | Phone and glasses-audio paths; no GPU contact |
| ingestTrigger.audioVideo | { sessionId, userId, windowIndex, frameCount=60 } | frames + audio → caption, NER, triples, visual embedding | happy path | Glasses path; GPU required |
| ingestTrigger.gpuUnavailable | GPU SSM sentinel = "none" | GpuUnavailableError; message routed to IngestDLQ | error | DLQ consumer retries after GPU auto-start |
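Inside IngestWindowFunction, the three paths amount to a routing decision on frameCount and the GPU sentinel. In this hedged sketch, the payload keys come from the table. route_window, the step names, and passing the SSM sentinel in as a plain string (instead of reading it from SSM Parameter Store) are assumptions. Raising GpuUnavailableError makes the async invocation fail, consistent with the gpuUnavailable row, where the failure ends up in IngestDLQ.

```python
class GpuUnavailableError(RuntimeError):
    """Raised when the GPU sentinel says no GPU endpoint is available."""


def route_window(payload: dict, gpu_sentinel: str) -> list[str]:
    """Return the processing steps for one window (hypothetical routing sketch)."""
    steps = ["transcribe_audio"]   # every window has audio
    if payload.get("frameCount", 0) == 0:
        # audio_only sessions (phone, glasses-audio): no GPU contact at all
        return steps + ["create_segment", "mark_complete"]
    if gpu_sentinel == "none":
        # audio+video window but GPU is down: fail so the message reaches IngestDLQ
        raise GpuUnavailableError(f"window {payload['windowIndex']}: GPU unavailable")
    return steps + ["caption", "ner", "triples", "visual_embedding"]
```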
Key Constants
| Constant | Value | Source | Applies to |
|---|---|---|---|
| Audio chunk size (PCM bytes) | 960,000 | WearablesModule.kt, WearablesModule.swift | Glasses audio (both platforms) |
| Audio chunk duration | 30 seconds | 960,000 ÷ (16,000 Hz × 2 bytes × 1 channel) | Glasses audio |
| Audio chunk interval | 30,000 ms | phone-audio-capture.ts setInterval | Phone audio |
| Audio sample rate (glasses) | 16,000 Hz, mono, 16-bit | Android AUDIO_SAMPLE_RATE, iOS sampleRate | Glasses audio |
| Audio sample rate (phone) | 44,100 Hz (HIGH_QUALITY preset) | expo-av | Phone audio |
| Frames per window | 60 | main/server/api/sessions/frames/app.py FRAMES_PER_WINDOW | Glasses video |
| Frame polling interval | 500 ms | capture-session.ts startFramePolling | Glasses video (both platforms) |
| Frame upload timeout | 10,000 ms | capture-session.ts uploadItem | Android frame POST |
| Presigned URL TTL (audio) | 300 s | main/server/api/sessions/audio/app.py URL_EXPIRES_SECONDS | All audio uploads |
| Presigned URL TTL (frame, iOS) | 3,600 s | frames presigned-url Lambda | iOS frame uploads (known limitation) |
| Audio PUT timeout | 60,000 ms | capture-session.ts uploadAudioItem | All audio |
| Max audio size | 10 MB | main/server/api/sessions/audio/app.py MAX_AUDIO_BYTES | All audio |
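Several constants above are derived from one another, and a quick Python check of the arithmetic follows. The check makes three assumptions:

- The phone recording is 16-bit PCM WAV at 44.1 kHz mono, as the phone flow describes.
- The 10 MB cap means 10 × 1024 × 1024 bytes. If the code uses 10,000,000, the conclusion is the same.
- WAV_HEADER_BYTES = 44 reflects a canonical header with no extra chunks.

```python
WAV_HEADER_BYTES = 44   # canonical PCM WAV header (assumption: no extra chunks)


def pcm_bytes(seconds: int, sample_rate: int, bytes_per_sample: int = 2, channels: int = 1) -> int:
    """Raw PCM payload size for a clip of the given length and format."""
    return seconds * sample_rate * bytes_per_sample * channels


glasses_chunk = pcm_bytes(30, 16_000)          # 960,000 bytes
glasses_seconds = 960_000 / (16_000 * 2 * 1)   # 30.0
phone_chunk = pcm_bytes(30, 44_100)            # 2,646,000 bytes
max_audio = 10 * 1024 * 1024                   # 10,485,760 bytes (MiB reading)

# Both full 30 s chunks, header included, fit under the grant's size cap.
fits = all(n + WAV_HEADER_BYTES <= max_audio for n in (glasses_chunk, phone_chunk))
```

At 88,200 bytes per second, a phone chunk would have to exceed about 118 s under the 10 MiB reading before the grant rejected it. The 30 s interval therefore leaves ample headroom.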
Per-Source Differences
| Dimension | Glasses (audio+video) | Glasses-Audio | Phone |
|---|---|---|---|
| captureMode | audio_video | audio_only | audio_only |
| Audio source | WearablesModule (HFP/AVAudioEngine) | WearablesModule (HFP/AVAudioEngine) | expo-av Audio.Recording |
| Audio mic type | Android: MediaRecorder.AudioSource.MIC (ambient, captures all voices); iOS: glasses HFP input via AVAudioEngine tap | same | expo-av HIGH_QUALITY (device mic) |
| Audio sample rate | 16 kHz mono 16-bit | 16 kHz mono 16-bit | 44.1 kHz mono (expo-av preset) |
| Audio chunk mechanism | native byte-count threshold (960,000 bytes) | same | JS setInterval(30_000) |
| Audio chunk delivery | onAudioChunkReady native event | onAudioChunkReady native event | onChunkReady JS callback |
| Video frames captured | yes (videoFramePublisher → JPEG) | no | no |
| Frame upload path (iOS) | URLSession background presigned PUT | N/A | N/A |
| Frame upload path (Android) | POST /sessions/{id}/frames via PersistentUploadQueue (frames found by the 500 ms poll) | N/A | N/A |
| Ingest trigger condition | audio AND frame window both complete | audio complete (audio_only) | audio complete (audio_only) |
| frameCount passed to ingest | 60 | 0 | 0 |
| GPU used in ingest | yes (caption, NER, triples, visual embedding) | no | no |
Shared Pipeline Architecture
All three paths converge at PersistentUploadQueue on the client and at IngestWindowFunction on the server. The shared structure is:
- Client: Audio chunks from all three sources are delivered to PersistentUploadQueue.enqueue("audio", ...) with windowIndex. Frames (glasses only) are enqueued as "frame" items.
- Client: uploadAudioItem requests a presigned URL grant (POST /sessions/{id}/audio?windowIndex=n), then PUTs the audio directly to S3.
- Server: AudioUploadCompleteFunction is triggered by EventBridge on every sessions/*/window_*/audio.wav S3 ObjectCreated event. It updates completedAudioWindows in DynamoDB and conditionally triggers ingest.
- Server: Frame uploads (POST /sessions/{id}/frames or iOS presigned PUT) are counted per window. On the 60th frame of a window, that window is added to completedFrameWindows and ingest is conditionally triggered.
- Server: Ingest is deduped via a conditional ADD to ingestTriggeredWindows in DynamoDB, so only one path wins per window regardless of which arrives first.
- Server: IngestWindowFunction processes each window, loading audio from window_NNN/audio.wav and frames from window_NNN/frame_*.jpg. For audio_only sessions (phone or glasses-audio), frameCount=0 is passed and the Lambda skips the GPU path.
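The exactly-once trigger hinges on a conditional write. Whichever of the audio and frame paths first adds the window to ingestTriggeredWindows wins, and the other sees its condition fail. Below is an in-memory Python sketch of that claim. In DynamoDB this would presumably be an UpdateItem with ADD ingestTriggeredWindows :windowSet, guarded by a ConditionExpression such as NOT contains(ingestTriggeredWindows, :window). Here a threading.Lock stands in for DynamoDB's per-item atomicity. IngestClaims, ready, try_claim and release are hypothetical names. release models the rollback implied by the ingest_claim_rolled_back log key, which is an inference rather than documented behavior.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class IngestClaims:
    """Hypothetical in-memory stand-in for the session item's ingest bookkeeping."""
    capture_mode: str   # "audio_video" or "audio_only"
    completed_audio_windows: set[int] = field(default_factory=set)
    completed_frame_windows: set[int] = field(default_factory=set)
    ingest_triggered_windows: set[int] = field(default_factory=set)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def ready(self, window: int) -> bool:
        # audio_only windows need audio; audio_video windows need audio AND all 60 frames.
        if window not in self.completed_audio_windows:
            return False
        return self.capture_mode == "audio_only" or window in self.completed_frame_windows

    def try_claim(self, window: int) -> bool:
        """Conditional ADD: succeeds only for the first caller per window."""
        with self._lock:
            if not self.ready(window) or window in self.ingest_triggered_windows:
                return False
            self.ingest_triggered_windows.add(window)
            return True

    def release(self, window: int) -> None:
        """Undo a claim, e.g. if the async invoke failed (assumed rollback behavior)."""
        with self._lock:
            self.ingest_triggered_windows.discard(window)
```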
Logs
| Source | Location |
|---|---|
| Audio URL grant Lambda | CloudWatch: /aws/lambda/<stack>-AudioPostFunction (step key: audio_url_granted) |
| Audio upload complete Lambda | CloudWatch: /aws/lambda/<stack>-AudioUploadCompleteFunction (step keys: audio_upload_registered, ingest_claim_lost, ingest_claim_rolled_back, ingest_claim_rollback_failed) |
| Frame upload Lambda | CloudWatch: /aws/lambda/<stack>-FramePostFunction |
| Ingest window Lambda | CloudWatch: /aws/lambda/IngestWindowFunction |
| DLQ consumer Lambda | CloudWatch: /aws/lambda/IngestDLQConsumer |
| Client capture | createFlowLogger("capture") — step keys include capture_start, session_created, capture_phone_mode_starting, capture_audio_starting, capture_flush_queue, frame_upload_enqueued_ios, frame_upload_ios_error |
Deployment
- Mechanism: SAM (server), local only (client app)
- Deploy command: cd main/server && sam build && sam deploy --template .aws-sam/build/template.yaml
- Notes:
  - AudioUploadCompleteFunction is triggered by EventBridge S3 ObjectCreated on encache-raw-memory, key prefix sessions/, suffix audio.wav.
  - The old audio upload path (POST body to Lambda) has been replaced by the presigned URL grant + S3 direct PUT flow. The upload-audio.md doc describes the former design; the current server code at main/server/api/sessions/audio/app.py issues presigned URLs instead of accepting audio bodies.
  - iOS frame uploads use background URLSession with URLSessionConfiguration.background(withIdentifier: "com.encache.frame-uploads"); presigned URL TTL is 3600s (known limitation; should be >= 86400s).
  - Android frame uploads and phone audio uploads use the fetch API path via PersistentUploadQueue.