System Diagram: glasses-video-processing-pipeline

Flow Name

glasses-video-processing-pipeline

Summary

This document describes the end-to-end glasses-mode video upload and processing pipeline, the failure mode discovered on 2026-04-26, and the plan to fix it plus create a retry Lambda.


System Overview

Glasses Mode (captureMode = "audio_video")

The user selects "glasses" as their recording device in the mobile app. The capture-session module starts:

  1. WearablesModule.startAudioCapture() — streams 30-second WAV audio chunks from the glasses' Bluetooth HFP microphone; each chunk fires an onAudioChunkReady event.
  2. WearablesModule.startStreamSession() — activates the glasses LED.
  3. WearablesModule.startRecordingFrames() — returns a local directory path; the app polls that directory every 500 ms for new .jpg frames.

Both audio chunks and frames are enqueued in a PersistentUploadQueue and uploaded to the server as they arrive.


Pipeline Stages

Stage 1 — Session Start

  • Endpoint: POST /sessions/start
  • Lambda: SessionStartFunction (main/server/api/sessions/start/app.py)
  • DynamoDB record created:
      • sessionId, userId, captureMode: "audio_video"
      • frameCount: 0, currentFrameWindow: 0, status: "active", createdAt
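The record above can be sketched as a plain item builder. This is a minimal illustration, not the code in app.py: the helper name new_session_item and the ISO-8601 timestamp format for createdAt are assumptions, since the source only names the fields.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def new_session_item(user_id: str, session_id: Optional[str] = None) -> dict:
    """Build the initial DynamoDB item for a glasses-mode session (hypothetical helper)."""
    return {
        "sessionId": session_id or str(uuid.uuid4()),
        "userId": user_id,
        "captureMode": "audio_video",
        "frameCount": 0,
        "currentFrameWindow": 0,
        "status": "active",
        # Assumption: ISO-8601 UTC string; the source does not specify the format.
        "createdAt": datetime.now(timezone.utc).isoformat(),
    }
```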

Stage 2 — Frame Upload

  • Endpoint: POST /sessions/{sessionId}/frames
  • Lambda: FramePostFunction (main/server/api/sessions/frames/app.py)
  • S3 key: sessions/{sessionId}/window_{windowIndex:03d}/frame_{frameNum:03d}.jpg
  • Window completes after 60 frames. When complete:
      • frameCount resets to 0, currentFrameWindow advances.
      • Ingest is triggered only if audio for the same window is also ready.
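The key layout and the 60-frame rollover can be sketched as two small functions. The names frame_key and record_frame are hypothetical, and whether frameNum starts at 0 or 1 is not stated in the source, so the sketch only formats whatever number it is given.

```python
from typing import Optional, Tuple

FRAMES_PER_WINDOW = 60  # from the stage description above


def frame_key(session_id: str, window_index: int, frame_num: int) -> str:
    """Format the S3 key for one frame, zero-padding both indices to three digits."""
    return (
        f"sessions/{session_id}/window_{window_index:03d}/"
        f"frame_{frame_num:03d}.jpg"
    )


def record_frame(frame_count: int, current_window: int) -> Tuple[int, int, Optional[int]]:
    """Count one uploaded frame.

    Returns (new_frame_count, new_current_window, completed_window), where
    completed_window is None unless this frame filled the window.
    """
    frame_count += 1
    if frame_count >= FRAMES_PER_WINDOW:
        # Window is full: reset the counter and advance to the next window.
        return 0, current_window + 1, current_window
    return frame_count, current_window, None
```

In a real handler the counter update would need to be atomic (for example a conditional DynamoDB update); the pure function here only shows the state transition.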

Stage 3 — Audio Chunk Upload

  • Endpoint: POST /sessions/{sessionId}/audio?windowIndex=N
  • Lambda: AudioPostFunction (main/server/api/sessions/audio/app.py)
  • S3 key: sessions/{sessionId}/window_{windowIndex:03d}/audio.wav
  • After storing, triggers ingest for the window if frames are also complete.
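Stages 2 and 3 share the same gate: whichever upload arrives second triggers ingest. A minimal sketch of that check follows, assuming the session state exposes the sets of completed windows. completedAudioWindows appears in the architecture diagram; a matching set for frames is an assumption made for illustration.

```python
from typing import Iterable


def audio_key(session_id: str, window_index: int) -> str:
    """Format the S3 key for a window's audio chunk."""
    return f"sessions/{session_id}/window_{window_index:03d}/audio.wav"


def should_trigger_ingest(
    window_index: int,
    completed_frame_windows: Iterable[int],  # assumed field, mirrors the audio one
    completed_audio_windows: Iterable[int],
) -> bool:
    """Ingest runs only when both frames and audio for the window are ready."""
    return (
        window_index in set(completed_frame_windows)
        and window_index in set(completed_audio_windows)
    )
```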

Stage 4 — Session End

  • Endpoint: POST /sessions/{sessionId}/end
  • Lambda: SessionEndFunction (main/server/api/sessions/end/app.py)
  • Marks session status: "ended". Triggers ingest for any windows that have data but have not yet been processed (safety net for partial windows).

Stage 5 — Ingest Window (IngestWindowFunction)

  • Lambda: IngestWindowFunction (main/server/worldmm/pipeline/ingest_window.py::lambda_handler)
  • Timeout: 600 s, Memory: 2048 MB
  • VPC: yes (needs DB access)
  • On-failure destination: SQS encache-ingest-dlq (max retries = 0)

Steps inside lambda_handler:

  1. Idempotency check: if the segment is already complete, return early.
  2. Load frames from S3 (sessions/{sessionId}/window_{N:03d}/frame_{i:03d}.jpg).
  3. Load audio from S3: tries sessions/{sessionId}/window_{N:03d}/audio.wav first, falls back to sessions/{sessionId}/audio/full.m4a.
  4. Transcribe audio via Groq Whisper (whisper-large-v3). BUG HERE.
  5. Create WorldMMSegment row in PostgreSQL with processing_status: "pending".
  6. Invoke GPU worker for:
      • Caption generation (gpu_worker.caption)
      • NER extraction
      • Triple extraction
      • Visual embedding (gpu_worker.encode_video)
  7. Mark segment processing_status: "complete".
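The audio lookup order in step 3 can be sketched with the S3 read injected as a callable, so the fallback logic is visible without AWS access. The names audio_key_candidates, load_audio, and fetch are hypothetical; fetch stands in for an S3 GetObject wrapper that returns None when the key is missing.

```python
from typing import Callable, List, Optional, Tuple


def audio_key_candidates(session_id: str, window_index: int) -> List[str]:
    """Keys to try, in order: per-window WAV first, then the full-session M4A."""
    return [
        f"sessions/{session_id}/window_{window_index:03d}/audio.wav",
        f"sessions/{session_id}/audio/full.m4a",
    ]


def load_audio(
    session_id: str,
    window_index: int,
    fetch: Callable[[str], Optional[bytes]],
) -> Optional[Tuple[str, bytes]]:
    """Return (key, bytes) for the first candidate that exists, else None."""
    for key in audio_key_candidates(session_id, window_index):
        data = fetch(key)
        if data is not None:
            return key, data
    return None
```

Returning the key alongside the bytes keeps the original file extension available to later steps such as transcription.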

Stage 6 — DLQ Consumer

  • Lambda: IngestDLQConsumer (main/server/worldmm/pipeline/ingest_window.py::sqs_handler)
  • Reads failed events from encache-ingest-dlq and retries them.
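A sketch of how a consumer might recover the original ingest events from the queue, assuming the messages carry the standard envelope that Lambda writes to an on-failure destination (the original event sits under requestPayload). The function name payloads_from_sqs_event is hypothetical, and the real sqs_handler may differ.

```python
import json
from typing import List


def payloads_from_sqs_event(event: dict) -> List[dict]:
    """Extract the original ingest payloads from an SQS-triggered Lambda event."""
    payloads = []
    for record in event.get("Records", []):
        body = json.loads(record["body"])
        # On-failure destination records wrap the original event in
        # "requestPayload"; fall back to the body itself if it is unwrapped.
        payloads.append(body.get("requestPayload", body))
    return payloads
```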

Bug: .wav Audio Written to Temp File with .m4a Extension

Location

main/server/worldmm/pipeline/ingest_window.py, function _transcribe_audio, line 415:

with tempfile.NamedTemporaryFile(suffix=".m4a", delete=True) as f:

Root Cause

The mobile glasses capture saves audio as 30-second WAV chunks uploaded to S3 at sessions/{sessionId}/window_{N:03d}/audio.wav. When IngestWindowFunction loads this audio from S3 and writes it to a temp file for Groq Whisper transcription, it uses .m4a as the file extension. Groq's Whisper API detects the file format from the file extension. Sending a WAV file disguised as .m4a causes a format mismatch that results in an InvalidRequestError or a silent empty transcript, breaking the transcription step and preventing the window from processing successfully.

Fix

Change the suffix in _transcribe_audio from .m4a to .wav:

with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as f:
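Stage 5 also lists a fallback to sessions/{sessionId}/audio/full.m4a, so a hard-coded .wav suffix could mislabel that path in the opposite direction. One possible variant, assuming _transcribe_audio can be passed the S3 key the audio was loaded from, derives the suffix from the key. The helper temp_suffix_for_key and the SUPPORTED_SUFFIXES set are hypothetical.

```python
import os

# Assumption: only the two formats named in the pipeline description are expected.
SUPPORTED_SUFFIXES = {".wav", ".m4a"}


def temp_suffix_for_key(s3_key: str, default: str = ".wav") -> str:
    """Pick the temp-file suffix from the S3 key's extension.

    Falls back to `default` when the key has no recognised extension.
    """
    ext = os.path.splitext(s3_key)[1].lower()
    return ext if ext in SUPPORTED_SUFFIXES else default
```

The result would then be passed as the suffix argument to tempfile.NamedTemporaryFile in place of a literal.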

Retry Lambda: RetriggerIngestFunction

Purpose

Fetch a failed glasses-mode session from S3/DynamoDB and re-trigger the IngestWindowFunction for each window that failed or was skipped.

Inputs (Lambda event payload)

{
  "sessionId": "<uuid>",
  "userId": "<cognito-sub>",
  "windowIndices": [0, 1, 2]
}

windowIndices is optional; if omitted, all non-complete windows are retriggered.

Logic

  1. Fetch session record from DynamoDB (encache-sessions).
  2. Determine which windows need retry: if windowIndices is provided, use it; otherwise query WorldMMSegment for rows where source_session_id = sessionId and processing_status != "complete".
  3. For each window, determine frameCount (60 for a completed frame window, actual count for a partial window, 0 for audio-only).
  4. Invoke IngestWindowFunction asynchronously (InvocationType: "Event") with { sessionId, userId, windowIndex, frameCount }.
  5. Return a summary of invocations made.
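The steps above can be sketched with the data sources and the Lambda call injected, so the selection logic stands alone. All function and parameter names here are hypothetical. One caveat from the Stage 5 ordering: a window that failed at transcription (step 4) never reached segment creation (step 5), so it may have no WorldMMSegment row. The sketch therefore takes a ready-made map of candidate windows rather than assuming the segment query alone is sufficient.

```python
import json
from typing import Callable, Dict, Iterable, List, Optional

FRAMES_PER_WINDOW = 60


def build_ingest_payloads(
    session_id: str,
    user_id: str,
    frame_counts: Dict[int, int],
    window_indices: Optional[Iterable[int]] = None,
) -> List[dict]:
    """Build one IngestWindowFunction payload per window to retry.

    frame_counts maps windowIndex -> frames stored for that window
    (60 for a full window, fewer for a partial one, 0 for audio-only).
    """
    if window_indices is not None:
        indices = sorted(window_indices)
    else:
        indices = sorted(frame_counts)
    return [
        {
            "sessionId": session_id,
            "userId": user_id,
            "windowIndex": i,
            "frameCount": min(frame_counts.get(i, 0), FRAMES_PER_WINDOW),
        }
        for i in indices
    ]


def retrigger(
    event: dict,
    frame_counts: Dict[int, int],
    invoke_async: Callable[[bytes], None],
) -> dict:
    """Invoke ingest once per window and return a summary of what was sent.

    In deployment, invoke_async would wrap boto3's
    lambda_client.invoke(FunctionName=..., InvocationType="Event", Payload=...).
    """
    payloads = build_ingest_payloads(
        event["sessionId"], event["userId"], frame_counts, event.get("windowIndices")
    )
    for payload in payloads:
        invoke_async(json.dumps(payload).encode("utf-8"))
    return {
        "sessionId": event["sessionId"],
        "invoked": [p["windowIndex"] for p in payloads],
    }
```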

Location

New file: main/server/worldmm/pipeline/retrigger_ingest.py

New SAM resource in template.yaml: RetriggerIngestFunction


Architecture Diagram

flowchart TD
    Glasses([Meta Glasses]) -->|JPEG frames via BT| App[Mobile App\ncapture-session.ts]
    Glasses -->|WAV audio HFP| App

    App -->|POST /sessions/start| SessionStart[SessionStartFunction]
    App -->|POST /sessions/sessionId/frames| FramePost[FramePostFunction]
    App -->|POST /sessions/sessionId/audio?windowIndex=N| AudioPost[AudioPostFunction]
    App -->|POST /sessions/sessionId/end| SessionEnd[SessionEndFunction]

    SessionStart -->|PutItem| DynDB[(DynamoDB\nencache-sessions)]
    FramePost -->|PutObject frame_NNN.jpg| S3[(S3\nencache-raw-memory)]
    FramePost -->|UpdateItem frameCount++| DynDB
    AudioPost -->|PutObject audio.wav| S3
    AudioPost -->|UpdateItem completedAudioWindows| DynDB

    FramePost -->|Invoke async when both ready| IngestWindow[IngestWindowFunction]
    AudioPost -->|Invoke async when both ready| IngestWindow
    SessionEnd -->|Invoke async for untriggered windows| IngestWindow

    IngestWindow -->|GetObject frames| S3
    IngestWindow -->|GetObject audio.wav| S3
    IngestWindow -->|BUG: write .wav bytes to .m4a tmp file| Whisper([Groq Whisper API])
    IngestWindow -->|caption + NER + triples + visual embed| GPU([GPU Worker\nVLM2Vec])
    IngestWindow -->|INSERT segment + triples + entities| PSQL[(PostgreSQL\nWorldMM)]

    IngestWindow -->|OnFailure| DLQ[(SQS\nencache-ingest-dlq)]
    DLQ -->|Trigger| DLQConsumer[IngestDLQConsumer]
    DLQConsumer -->|retry| IngestWindow

    Retrigger[RetriggerIngestFunction\nNEW] -->|Invoke async per window| IngestWindow
    Retrigger -->|Query failed segments| PSQL
    Retrigger -->|Read session state| DynDB

Files to Modify

File | Change
main/server/worldmm/pipeline/ingest_window.py | Fix _transcribe_audio: change suffix=".m4a" to suffix=".wav"
main/server/worldmm/pipeline/retrigger_ingest.py | New file: RetriggerIngest Lambda handler
main/server/template.yaml | Add RetriggerIngestFunction SAM resource + IAM policies
main/server/tests/unit/test_ingest_window_transcribe.py | New regression test verifying temp file uses .wav suffix
main/server/tests/unit/test_retrigger_ingest.py | New unit tests for RetriggerIngest Lambda
docs/bugs/2026-04-26-glasses-video-wav-tempfile-m4a-mismatch.md | New bug audit log

Regression Test

New test test_transcribe_audio_uses_wav_suffix in test_ingest_window_transcribe.py:

  • Patch tempfile.NamedTemporaryFile and verify it is called with suffix=".wav".
  • Verify the test fails on the unfixed (buggy) code and passes after the fix.

Plan Stage

Status: Ready for Phase 2 (setup-wizard) and Phase 3 (ralph-fix-and-push).