Ingest Streaming Pipeline

Metadata

  • System type: flow

System Intent

  • What this is: The end-to-end pipeline by which a recording session — from any of three capture sources (wearable glasses, glasses-audio-only, or phone microphone) — is broken into 30-second windowed chunks on-device, uploaded to S3, and handed off to the per-window ingest Lambda. All three capture paths share the same server-side pipeline; differences are confined to client-side capture mechanics and the captureMode flag set at session start.

Mermaid Diagram

flowchart TD
  subgraph "Client — Glasses (audio+video)"
    GAudio[WearablesModule\nstartAudioCapture\n30s WAV chunks\nonAudioChunkReady event]
    GFrames[WearablesModule\nstartRecordingFrames\nevery frame → JPEG on disk\nstartFramePolling every 500ms]
  end

  subgraph "Client — Glasses-Audio (audio only)"
    GAAudio[WearablesModule\nstartAudioCapture\n30s WAV chunks\nonAudioChunkReady event]
  end

  subgraph "Client — Phone (audio only)"
    PAudio[phone-audio-capture.ts\nexpo-av\n30s WAV chunks\nonChunkReady callback]
  end

  subgraph "Client — PersistentUploadQueue"
    Queue[enqueue audio / frame]
  end

  GAudio -->|windowIndex=n, sizeBytes| Queue
  GAAudio -->|windowIndex=n, sizeBytes| Queue
  PAudio -->|windowIndex=n, sizeBytes| Queue
  GFrames -->|image/jpeg| Queue

  subgraph "Server — Audio path"
    AudioGrant["POST /sessions/{id}/audio?windowIndex=n\n→ presigned S3 PUT URL (5min TTL)"]
    AudioS3[Client PUT audio/wav → S3\nsessions/{id}/window_NNN/audio.wav]
    AudioComplete[AudioUploadCompleteFunction\nEventBridge S3 ObjectCreated]
  end

  subgraph "Server — Frame path (Android/POST)"
    FramePost["POST /sessions/{id}/frames\nframes/app.py\nDynamoDB frameCount++\nS3 write frame_NNN.jpg"]
  end

  subgraph "Server — Frame path (iOS/presigned)"
    FramePresign["GET /sessions/{id}/frames/presigned-url\n→ presigned S3 PUT URL (1h TTL)"]
    FrameS3["Client PUT image/jpeg → S3\n(URLSession background upload)"]
  end

  Queue -->|audio| AudioGrant
  AudioGrant --> AudioS3
  AudioS3 -->|ObjectCreated event| AudioComplete

  Queue -->|frame, iOS| FramePresign
  FramePresign --> FrameS3

  Queue -->|frame, Android| FramePost

  AudioComplete -->|captureMode=audio_only OR window in completedFrameWindows| IngestTrigger
  FramePost -->|60th frame AND window in completedAudioWindows| IngestTrigger

  IngestTrigger[IngestWindowFunction\nasync Lambda invoke]

Flows

Flow: glasses.audioChunks

  • Core files:
    • Android: main/app/wearables-module/android/src/main/java/expo/modules/wearablesmodule/WearablesModule.kt
    • iOS: main/app/wearables-module/ios/WearablesModule.swift
    • Client orchestration: main/app/lib/capture-session.ts

Types

AudioChunkEvent {
  filePath: string      (filesystem path to WAV file — no file:// prefix on Android, native path on iOS)
  windowIndex: number   (0-based, monotonically increasing)
  durationMs: number    (actual duration of this chunk)
  sizeBytes: number     (file size in bytes)
}

Paths

path | input | output | path-type | notes
glasses.audioChunks.normal | PCM bytes from glasses HFP port | onAudioChunkReady event every ~30 s | happy path | Chunk fires when buffer reaches 960,000 bytes (30s × 16kHz × 16-bit × mono)
glasses.audioChunks.finalPartial | WearablesModule.stopAudioCapture() | onAudioChunkReady for remaining buffer (any size) | happy path | Final partial chunk is always emitted; may be < 30 s

Pseudocode

// Shared across Android and iOS
pcmBytesPerChunk = 960_000  // 30s × 16kHz × 16-bit × mono
sampleRate = 16_000 Hz, mono, 16-bit

onPCMData(buffer):
  audioChunkBuffer += buffer
  if audioChunkBuffer.size >= pcmBytesPerChunk:
    flushAudioChunk(isFinal=false)

stopAudioCapture():
  flushAudioChunk(isFinal=true)   // emits partial chunk even if < 30s
  emit onAudioChunkReady

// Android: AudioRecord(MediaRecorder.AudioSource.MIC) — captures ambient room audio
// iOS: AVAudioEngine tap on HFP input port matching "ray-ban"/"meta" by name
// Both write WAV header + PCM to temp file, emit onAudioChunkReady event
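The byte-count threshold above can be sketched in Python for illustration. This is a minimal model under stated assumptions, not the native implementation: the class name AudioChunker and its method names are hypothetical, the sketch slices exactly 960,000 bytes per chunk (the native code may flush the whole buffer instead), and it skips the final emit when nothing is buffered.

```python
import io
import wave

SAMPLE_RATE = 16_000   # Hz, from the pseudocode above
BYTES_PER_SAMPLE = 2   # 16-bit
CHANNELS = 1           # mono
CHUNK_SECONDS = 30
PCM_BYTES_PER_CHUNK = SAMPLE_RATE * BYTES_PER_SAMPLE * CHANNELS * CHUNK_SECONDS


class AudioChunker:
    """Hypothetical stand-in for the native chunking logic."""

    def __init__(self, on_chunk_ready):
        self._buffer = bytearray()
        self._window_index = 0
        self._on_chunk_ready = on_chunk_ready

    def on_pcm_data(self, data: bytes) -> None:
        self._buffer += data
        # Drain every full chunk; one large callback could hold more than one.
        while len(self._buffer) >= PCM_BYTES_PER_CHUNK:
            self._flush(PCM_BYTES_PER_CHUNK)

    def stop(self) -> None:
        # Assumption: nothing is emitted when the buffer is empty.
        if self._buffer:
            self._flush(len(self._buffer))

    def _flush(self, n: int) -> None:
        pcm = bytes(self._buffer[:n])
        del self._buffer[:n]
        out = io.BytesIO()
        with wave.open(out, "wb") as w:   # writes the WAV header + PCM
            w.setnchannels(CHANNELS)
            w.setsampwidth(BYTES_PER_SAMPLE)
            w.setframerate(SAMPLE_RATE)
            w.writeframes(pcm)
        payload = out.getvalue()
        bytes_per_second = SAMPLE_RATE * BYTES_PER_SAMPLE * CHANNELS
        self._on_chunk_ready({
            "windowIndex": self._window_index,
            "durationMs": len(pcm) * 1000 // bytes_per_second,
            "sizeBytes": len(payload),
        })
        self._window_index += 1
```

Feeding the chunker 31 seconds of PCM and then calling stop() yields one full 30 s chunk followed by a 1 s partial chunk, with windowIndex 0 and 1.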

Flow: glasses.frameCapture

  • Core files:
    • Android: main/app/wearables-module/android/src/main/java/expo/modules/wearablesmodule/WearablesModule.kt
    • iOS: main/app/wearables-module/ios/WearablesModule.swift
    • Polling: main/app/lib/capture-session.ts (startFramePolling)

Paths

path | input | output | path-type | notes
glasses.frames.write | videoFramePublisher events from Meta SDK | JPEG files in temp dir (frame_NNNNN_<timestamp>.jpg) | happy path | Android: 70% quality; iOS: 70% quality via jpegData(compressionQuality: 0.7)
glasses.frames.poll | setInterval(500ms) | new frames enqueued into PersistentUploadQueue | happy path | Polling reads directory, compares against dedup index, enqueues unseen frames
glasses.frames.finalPoll | stopRecordingFrames() | remaining frames collected and enqueued | happy path | Final sweep on teardown

Pseudocode

// Frame write (native, event-driven). iOS calls shown; Android writes JPEG at the same 70% quality.
videoFramePublisher.listen { frame ->
  jpegData = frame.makeUIImage().jpegData(compressionQuality: 0.7)
  write to framesDirectory/frame_NNNNN_<timestamp>.jpg
  frameCount++
}

// Frame poll (JS, 500ms interval)
setInterval(() => {
  files = readDir(recordingDir)
  unseen = files.filter(f => !uploadedSet.has(f))
  unseen.forEach(f => uploadQueue.enqueue("frame", f))
}, 500)
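The poll-and-dedup step can be illustrated with a short Python sketch. It is a simplified model, not the code in capture-session.ts: the function name poll_new_frames is hypothetical, and it assumes a frame is marked as seen at enqueue time, whereas the real dedup index may only be updated once an upload completes.

```python
import os


def poll_new_frames(frames_dir: str, seen: set) -> list:
    """Return unseen .jpg file names in sorted order and record them in `seen`.

    `seen` plays the role of the dedup index (assumption: frames are marked
    at enqueue time, so a later poll does not enqueue them again).
    """
    new_files = sorted(
        name
        for name in os.listdir(frames_dir)
        if name.endswith(".jpg") and name not in seen
    )
    seen.update(new_files)
    return new_files
```

Calling the function once per 500 ms tick, and one more time on teardown, would cover both the regular poll and the final sweep.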

Flow: phone.audioChunks

  • Core files: main/app/lib/phone-audio-capture.ts

Types

AudioChunkEvent {
  filePath: string      (absolute file:// URI to WAV file on device)
  windowIndex: number   (0-based sequential chunk index)
  durationMs: number    (actual duration of this chunk in milliseconds)
  sizeBytes: number     (file size in bytes)
}

Paths

path | input | output | path-type | notes
phone.audioChunks.normal | expo-av Audio.Recording | onChunkReady callback every ~30 s | happy path | setInterval(30_000): stops current recording, starts next, finalizes previous
phone.audioChunks.finalPartial | stopPhoneAudioCapture() | onChunkReady for remaining partial chunk | happy path | < 30 s; always emitted on stop

Pseudocode

// Phone path — pure JS using expo-av
chunkTimer = setInterval(() => {
  snap currentRecording + windowIndex
  startNewChunk() → nextRecording
  windowIndex++
  finalizeChunk(currentRecording)  // emits onChunkReady
}, 30_000)

// HIGH_QUALITY preset: 44.1kHz mono WAV (compatible with /sessions/{id}/audio endpoint)
// Note: server transcription uses Groq whisper-large-v3, which handles both 44.1kHz and 16kHz
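The rotation order in the timer callback (snapshot, start next, increment, finalize) is what keeps each chunk paired with the right windowIndex. The Python sketch below models that ordering only. ChunkRotator and its injected callables are hypothetical names for illustration; the real path is written in TypeScript on top of expo-av.

```python
class ChunkRotator:
    """Hypothetical model of the 30 s rotation in the phone capture path."""

    def __init__(self, start_recording, finalize, on_chunk_ready):
        self._start_recording = start_recording  # () -> recording handle
        self._finalize = finalize                # (recording) -> dict of chunk fields
        self._on_chunk_ready = on_chunk_ready
        self._window_index = 0
        self._current = start_recording()

    def rotate(self) -> None:
        """Timer tick: snapshot the current recording and index, then swap."""
        previous, index = self._current, self._window_index
        self._current = self._start_recording()
        self._window_index += 1
        self._emit(previous, index)

    def stop(self) -> None:
        """Emit the final partial chunk for the recording still in progress."""
        previous, index = self._current, self._window_index
        self._current = None
        self._emit(previous, index)

    def _emit(self, recording, index) -> None:
        chunk = dict(self._finalize(recording), windowIndex=index)
        self._on_chunk_ready(chunk)
```

Because the index is captured before it is incremented, a slow finalize step cannot cause a chunk to be reported under the next window's index.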

Flow: audioUpload

  • Core files:
    • Client: main/app/lib/capture-session.ts (uploadAudioItem)
    • Server (grant): main/server/api/sessions/audio/app.py
    • Server (complete): main/server/events/audio_upload_complete/app.py

Types

AudioUploadGrantRequest {
  POST /sessions/{sessionId}/audio?windowIndex=n
  body: { sizeBytes: number }
}

AudioUploadGrantResponse {
  url: string           (presigned S3 PUT URL, TTL=300s)
  s3Key: string         (sessions/{sessionId}/window_NNN/audio.wav)
  windowIndex: number
  expiresIn: number     (300)
}

AudioUploadCompleteEvent (EventBridge) {
  source: "aws.s3"
  detail-type: "Object Created"
  detail.object.key: "sessions/{sessionId}/window_NNN/audio.wav"
}
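As an illustration of how a handler might read AudioUploadCompleteEvent, the sketch below extracts the session ID and window index from the object key. It is not the code in audio_upload_complete/app.py: the function name parse_audio_key is hypothetical, and the pattern assumes window_NNN is a decimal, zero-padded index.

```python
import re

# Assumption: NNN in window_NNN is a zero-padded decimal window index.
AUDIO_KEY_RE = re.compile(
    r"^sessions/(?P<session_id>[^/]+)/window_(?P<window>\d+)/audio\.wav$"
)


def parse_audio_key(event: dict):
    """Return (session_id, window_index), or None if this is not an audio upload."""
    if event.get("source") != "aws.s3":
        return None
    if event.get("detail-type") != "Object Created":
        return None
    key = event.get("detail", {}).get("object", {}).get("key", "")
    match = AUDIO_KEY_RE.match(key)
    if match is None:
        return None
    return match.group("session_id"), int(match.group("window"))
```

Returning None for non-matching keys lets the handler ignore frame objects or other files written under the same sessions/ prefix.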

Paths

path | input | output | path-type | notes
audioUpload.grant | AudioUploadGrantRequest | presigned PUT URL | happy path | Lambda validates session exists, sizeBytes <= 10MB, windowIndex >= 0
audioUpload.put | Client PUT to presigned URL | audio.wav written to S3 | happy path | Client timeout: 60s; PUT bypasses API Gateway's 29s limit
audioUpload.complete | EventBridge S3 ObjectCreated | DynamoDB completedAudioWindows updated; ingest triggered if conditions met | happy path | Replaces old POST-body path; no API Gateway body size limit
audioUpload.ingestTrigger | captureMode=audio_only OR windowIndex in completedFrameWindows | async invoke of IngestWindowFunction | happy path | Dedup claim via conditional DynamoDB ADD to ingestTriggeredWindows
audioUpload.grant.badRequest | missing windowIndex, sizeBytes <= 0, sizeBytes > 10MB | HTTP 400 | error |
audioUpload.grant.notFound | sessionId not in DynamoDB | HTTP 404 | error |
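The grant-side checks in the table can be sketched as a pure function. This is an illustrative model rather than the Lambda in audio/app.py: validate_grant_request and session_exists are hypothetical names, the order of the checks is an assumption, "10 MB" is assumed to mean 10 MiB, and the three-digit zero padding of window_NNN is inferred from the key format. The presigned `url` field is omitted because producing it requires an S3 client (for example boto3's generate_presigned_url with ExpiresIn=300).

```python
MAX_AUDIO_BYTES = 10 * 1024 * 1024  # assumption: "10 MB" means 10 MiB
URL_EXPIRES_SECONDS = 300


def validate_grant_request(session_id, query, body, session_exists):
    """Return (status_code, payload) for an audio upload grant request."""
    raw_index = query.get("windowIndex")
    if raw_index is None:
        return 400, {"error": "windowIndex is required"}
    try:
        window_index = int(raw_index)
    except (TypeError, ValueError):
        return 400, {"error": "windowIndex must be an integer"}
    if window_index < 0:
        return 400, {"error": "windowIndex must be >= 0"}

    size = body.get("sizeBytes")
    if not isinstance(size, int) or isinstance(size, bool):
        return 400, {"error": "sizeBytes must be an integer"}
    if size <= 0 or size > MAX_AUDIO_BYTES:
        return 400, {"error": "sizeBytes out of range"}

    if not session_exists(session_id):
        return 404, {"error": "session not found"}

    return 200, {
        "s3Key": f"sessions/{session_id}/window_{window_index:03d}/audio.wav",
        "windowIndex": window_index,
        "expiresIn": URL_EXPIRES_SECONDS,
    }
```

Keeping the validation free of AWS calls makes the 400 and 404 branches easy to unit test without a live table or bucket.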

Flow: frameUpload.android

  • Core files: main/server/api/sessions/frames/app.py

Types

FrameUploadRequest {
  POST /sessions/{sessionId}/frames
  body: image/jpeg bytes
}

FrameUploadResponse {
  frameCount: number
  windowIndex: number
  ingestTriggered: boolean
}

Paths

path | input | output | path-type | notes
frameUpload.android.normal | JPEG body (< 60th frame) | { frameCount, windowIndex, ingestTriggered: false } | happy path | DynamoDB atomic increment; S3 write to window_NNN/frame_NNN.jpg
frameUpload.android.windowComplete | JPEG body (60th frame for window) | { ingestTriggered: true } | happy path | DynamoDB: frameCount→0, currentFrameWindow+1, windowIndex→completedFrameWindows; ingest triggered if audio already complete
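The window rollover on the 60th frame can be modeled with an in-memory session record. This sketch only illustrates the counter transitions described in the table: register_frame is a hypothetical name, a plain dict stands in for the DynamoDB item, and neither the atomicity of the real increment nor the dedup claim is modeled.

```python
FRAMES_PER_WINDOW = 60


def register_frame(session: dict) -> dict:
    """Apply one frame upload to an in-memory session record.

    `session` stands in for the DynamoDB item (assumption); the real handler
    performs the increment atomically on the server.
    """
    session["frameCount"] += 1
    frame_number = session["frameCount"]
    window_index = session["currentFrameWindow"]
    ingest_triggered = False

    if frame_number == FRAMES_PER_WINDOW:
        # Window complete: reset the counter and advance to the next window.
        session["frameCount"] = 0
        session["currentFrameWindow"] += 1
        session["completedFrameWindows"].add(window_index)
        ingest_triggered = window_index in session["completedAudioWindows"]

    return {
        "frameCount": frame_number,
        "windowIndex": window_index,
        "ingestTriggered": ingest_triggered,
    }
```

With audio for window 0 already registered, the 60th call reports ingestTriggered as true and the 61st frame lands in window 1 with frameCount 1.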

Flow: frameUpload.ios

  • Core files: main/app/lib/capture-session.ts (uploadItem)
  • Note: The iOS frame presigned URL grant is handled client-side via capture-session.ts:117. There is no separate server-side Lambda for this grant; the frames/app.py endpoint (POST /sessions/{id}/frames) is the only server-side frames handler and is used by the Android path. The iOS presigned URL flow has no corresponding server-side implementation in the current codebase.

Paths

path | input | output | path-type | notes
frameUpload.ios.enqueue | presigned URL grant + WearablesModule.enqueueBackgroundUpload | URLSession background upload task started | happy path | Presigned URL TTL = 3600s (1 hour); known limitation, should be >= 86400s
frameUpload.ios.complete | Native URLSession delegate | frame filename written to uploaded_frames.json dedup index | happy path | OS manages retries; survives app suspension/termination/reboot

Flow: ingestTrigger

  • Core files: main/server/worldmm/pipeline/ingest_window.py
  • See also: docs/docs/ingest-window.md

Paths

path | input | output | path-type | notes
ingestTrigger.audioOnly | { sessionId, userId, windowIndex, frameCount=0 } | transcription only; segment created; processing_status=complete | happy path | Phone and glasses-audio paths; no GPU contact
ingestTrigger.audioVideo | { sessionId, userId, windowIndex, frameCount=60 } | frames + audio → caption, NER, triples, visual embedding | happy path | Glasses path; GPU required
ingestTrigger.gpuUnavailable | GPU SSM sentinel = "none" | GpuUnavailableError; message routed to IngestDLQ | error | DLQ consumer retries after GPU auto-start
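The three paths above amount to a small routing decision on frameCount and the GPU sentinel. The sketch below is a simplified model, not ingest_window.py: plan_ingest and the step names are hypothetical labels, and checking the sentinel before any other work is an assumption.

```python
class GpuUnavailableError(Exception):
    """Raised when a window needs the GPU but the sentinel reports none."""


def plan_ingest(payload: dict, gpu_sentinel: str) -> list:
    """Return the ordered step names for one window (names are illustrative)."""
    if payload["frameCount"] == 0:
        # Audio-only window: no GPU contact at all.
        return ["transcribe", "create_segment"]
    if gpu_sentinel == "none":
        # Assumption: the check happens before any processing starts.
        raise GpuUnavailableError("GPU required for audio+video window")
    return ["transcribe", "caption", "ner", "triples", "visual_embedding"]
```

An audio-only payload succeeds even when the sentinel is "none", which matches the note that phone and glasses-audio windows never touch the GPU.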

Key Constants

Constant | Value | Source | Applies to
Audio chunk size (PCM bytes) | 960,000 | WearablesModule.kt, WearablesModule.swift | Glasses audio (both platforms)
Audio chunk duration | 30 seconds | 960,000 ÷ (16,000 Hz × 2 bytes × 1 channel) | Glasses audio
Audio chunk interval | 30,000 ms | phone-audio-capture.ts setInterval | Phone audio
Audio sample rate (glasses) | 16,000 Hz, mono, 16-bit | Android AUDIO_SAMPLE_RATE, iOS sampleRate | Glasses audio
Audio sample rate (phone) | 44,100 Hz (HIGH_QUALITY preset) | expo-av | Phone audio
Frames per window | 60 | main/server/api/sessions/frames/app.py FRAMES_PER_WINDOW | Glasses video
Frame polling interval | 500 ms | capture-session.ts startFramePolling | Glasses video (both platforms)
Frame upload timeout | 10,000 ms | capture-session.ts uploadItem | Android frame POST
Presigned URL TTL (audio) | 300 s | main/server/api/sessions/audio/app.py URL_EXPIRES_SECONDS | All audio uploads
Presigned URL TTL (frame, iOS) | 3,600 s | frames presigned-url Lambda | iOS frame uploads (known limitation)
Audio PUT timeout | 60,000 ms | capture-session.ts uploadAudioItem | All audio
Max audio size | 10 MB | main/server/api/sessions/audio/app.py MAX_AUDIO_BYTES | All audio
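A quick arithmetic check shows how much headroom a 30 s chunk has under the 10 MB limit. The figures below rest on assumptions that are not stated in the source: a 44-byte canonical WAV header, 16-bit samples for the phone path, and "10 MB" read as 10 MiB.

```python
WAV_HEADER_BYTES = 44                 # canonical PCM WAV header (assumption)
MAX_AUDIO_BYTES = 10 * 1024 * 1024    # assumption: "10 MB" means 10 MiB


def wav_size_bytes(sample_rate_hz, bits_per_sample, channels, seconds):
    """Size of an uncompressed PCM WAV file of the given length."""
    bytes_per_second = sample_rate_hz * (bits_per_sample // 8) * channels
    return WAV_HEADER_BYTES + bytes_per_second * seconds


def max_whole_seconds(sample_rate_hz, bits_per_sample, channels):
    """Longest whole-second chunk that still fits under MAX_AUDIO_BYTES."""
    bytes_per_second = sample_rate_hz * (bits_per_sample // 8) * channels
    return (MAX_AUDIO_BYTES - WAV_HEADER_BYTES) // bytes_per_second


glasses_chunk = wav_size_bytes(16_000, 16, 1, 30)   # 16 kHz mono 16-bit
phone_chunk = wav_size_bytes(44_100, 16, 1, 30)     # 16-bit assumed for phone
```

Under these assumptions a glasses chunk is 960,044 bytes and a phone chunk is 2,646,044 bytes, so both fit comfortably; a 44.1 kHz mono 16-bit recording would only exceed the limit after roughly 118 seconds.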

Per-Source Differences

Dimension | Glasses (audio+video) | Glasses-Audio | Phone
captureMode | audio_video | audio_only | audio_only
Audio source | WearablesModule (HFP/AVAudioEngine) | WearablesModule (HFP/AVAudioEngine) | expo-av Audio.Recording
Audio mic type | Android: MediaRecorder.AudioSource.MIC (ambient, captures all voices) | same | expo-av HIGH_QUALITY (device mic)
Audio sample rate | 16 kHz mono 16-bit | 16 kHz mono 16-bit | 44.1 kHz mono (expo-av preset)
Audio chunk mechanism | native byte-count threshold (960KB) | same | JS setInterval(30_000)
Audio chunk delivery | onAudioChunkReady native event | onAudioChunkReady native event | onChunkReady JS callback
Video frames captured | yes (videoFramePublisher → JPEG) | no | no
Frame upload path (iOS) | URLSession background presigned PUT | N/A | N/A
Frame upload path (Android) | POST /sessions/{id}/frames every 500ms poll | N/A | N/A
Ingest trigger condition | audio AND frame window both complete | audio complete (audio_only) | audio complete (audio_only)
frameCount passed to ingest | 60 | 0 | 0
GPU used in ingest | yes (caption, NER, triples, visual embedding) | no | no

Shared Pipeline Architecture

All three paths converge at PersistentUploadQueue on the client and at IngestWindowFunction on the server. The shared structure is:

  1. Client: Audio chunks (all three) are delivered to PersistentUploadQueue.enqueue("audio", ...) with windowIndex. Frames (glasses only) are enqueued as "frame" items.
  2. Client: uploadAudioItem issues a presigned URL grant (POST /sessions/{id}/audio?windowIndex=n) then PUTs audio directly to S3.
  3. Server: AudioUploadCompleteFunction is triggered by EventBridge on every sessions/*/window_*/audio.wav S3 ObjectCreated event. It updates completedAudioWindows in DynamoDB and conditionally triggers ingest.
  4. Server: Frame upload (POST /sessions/{id}/frames or iOS presigned PUT) updates completedFrameWindows. On the 60th frame per window, ingest is conditionally triggered.
  5. Server: Ingest is deduped via a conditional ADD to ingestTriggeredWindows in DynamoDB — only one path wins per window regardless of which arrives first.
  6. Server: IngestWindowFunction processes each window: loads audio from window_NNN/audio.wav, loads frames from window_NNN/frame_*.jpg. For audio_only sessions (phone or glasses-audio), frameCount=0 is passed and the Lambda skips the GPU path.
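Steps 3 to 5 describe a race between the audio-complete and frame-complete paths that is settled by a single claim per window. The sketch below models that behavior in memory. It is an illustration under assumptions, not the server code: SessionState and the two handler functions are hypothetical names, and a lock-guarded set stands in for the conditional DynamoDB ADD to ingestTriggeredWindows.

```python
import threading


class SessionState:
    """In-memory stand-in for the DynamoDB session item (assumption)."""

    def __init__(self, capture_mode: str):
        self.capture_mode = capture_mode
        self.completed_audio = set()
        self.completed_frames = set()
        self.ingest_triggered = set()
        self._lock = threading.Lock()

    def claim_ingest(self, window_index: int) -> bool:
        """Record the window as triggered; True only for the first caller."""
        with self._lock:
            if window_index in self.ingest_triggered:
                return False
            self.ingest_triggered.add(window_index)
            return True


def on_audio_complete(state, window_index, invoke) -> bool:
    state.completed_audio.add(window_index)
    ready = (
        state.capture_mode == "audio_only"
        or window_index in state.completed_frames
    )
    if ready and state.claim_ingest(window_index):
        invoke(window_index)
        return True
    return False


def on_frame_window_complete(state, window_index, invoke) -> bool:
    state.completed_frames.add(window_index)
    if window_index in state.completed_audio and state.claim_ingest(window_index):
        invoke(window_index)
        return True
    return False
```

Whichever side arrives second wins the claim and invokes ingest; a replayed event for the same window loses the claim and does nothing, which corresponds to the ingest_claim_lost step key in the logs.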

Logs

Source | Location
Audio URL grant Lambda | CloudWatch: /aws/lambda/<stack>-AudioPostFunction (step key: audio_url_granted)
Audio upload complete Lambda | CloudWatch: /aws/lambda/<stack>-AudioUploadCompleteFunction (step keys: audio_upload_registered, ingest_claim_lost, ingest_claim_rolled_back, ingest_claim_rollback_failed)
Frame upload Lambda | CloudWatch: /aws/lambda/<stack>-FramePostFunction
Ingest window Lambda | CloudWatch: /aws/lambda/IngestWindowFunction
DLQ consumer Lambda | CloudWatch: /aws/lambda/IngestDLQConsumer
Client capture | createFlowLogger("capture"); step keys include capture_start, session_created, capture_phone_mode_starting, capture_audio_starting, capture_flush_queue, frame_upload_enqueued_ios, frame_upload_ios_error

Deployment

  • Mechanism: SAM (server), local only (client app)
  • Deploy command:
    cd main/server && sam build && sam deploy --template .aws-sam/build/template.yaml
    
  • Notes:
    • AudioUploadCompleteFunction is triggered by EventBridge S3 ObjectCreated on encache-raw-memory, key prefix sessions/, suffix audio.wav.
    • The old audio upload path (POST body to Lambda) is replaced by the presigned URL grant + S3 direct PUT flow. The upload-audio.md doc describes the former design; the current server code at main/server/api/sessions/audio/app.py issues presigned URLs instead of accepting audio bodies.
    • iOS frame uploads use background URLSession with URLSessionConfiguration.background(withIdentifier: "com.encache.frame-uploads"); presigned URL TTL is 3600s (known limitation; should be >= 86400s).
    • Android frame uploads and phone audio uploads use the fetch API path via PersistentUploadQueue.