Ingest Window

Metadata

  • System type: service

System Intent

  • What this is: Cloud Lambda that processes one 30-second recording window. It loads frames and audio from S3, transcribes the audio, generates a visual caption via the GPU worker, extracts named entities and RDF triples, stores a visual embedding, and writes a WorldMMSegment row. When the GPU is unavailable, the Lambda raises GpuUnavailableError so that the failed invocation is routed to IngestDLQ for retry. A companion SQS handler (IngestDLQConsumer) drains the DLQ: when the GPU is down it calls _try_start_gpu() (fire-and-forget) to attempt an EC2 start, extends message visibility by 900 s, and leaves the record on the queue for redelivery. Once a message has been received more than 5 times, the consumer marks the segment failed and acknowledges the message. Messages that remain stuck age out via SQS's 14-day retention; orphaned-segment cleanup is manual.

Mermaid Diagram

flowchart TD
  Trigger["FramePost / AudioUpload / EndSession Lambda\n(async invoke)"]
  LH["lambda_handler\ningest_window.py"]
  SSM_GPU["/encache/gpu/instance_id\n(SSM, read fresh each invocation)"]
  DB["WorldMMSegment\nprocessing_status"]
  GPU["VLM2VecClient\nGPU worker"]
  DLQ["IngestDLQ\n(SQS)"]
  DLQC["sqs_handler\nIngestDLQConsumer"]
  EC2["EC2 describe_instances\n(GPU health check)"]

  Trigger -->|"async Lambda invoke"| LH
  LH -->|"GetParameter"| SSM_GPU
  SSM_GPU -->|"instance_id = 'none' or absent"| LH
  LH -->|"GpuUnavailableError raised"| DLQ
  LH -->|"GPU present: caption + NER + triples + embedding"| GPU
  LH -->|"status=pending on create\nstatus=complete on success"| DB
  LH -->|"stale ID: tag scan + SSM update"| EC2
  DLQ -->|"polled"| DLQC
  DLQC -->|"GetParameter + describe_instances"| EC2
  EC2 -->|"GPU down: _try_start_gpu() then extend 900s"| DLQ
  EC2 -->|"stale ID: tag scan + SSM update"| EC2
  EC2 -->|"GPU up: call lambda_handler"| LH
  DLQC -->|"receiveCount > 5: status=failed"| DB

Flows

Flow: ingestWindow.happyPath

  • Core files: main/server/worldmm/pipeline/ingest_window.py (lambda_handler)
  • Test files: main/server/tests/unit/test_ingest_window_audio_path.py, test_ingest_window_audio_only.py, test_ingest_window_datetime.py, test_ingest_window_transcribe.py

Types

IngestWindowEvent {
  sessionId: string (required)
  userId: string (required)
  windowIndex: int (required)
  frameCount: int (required)
}

IngestWindowSuccess (one of):
  // Happy path with GPU enrichment
  { status: "ok", segmentId: string, enriched: true,
    entityCount: int, tripleCount: int, visualStored: bool }

  // Happy path without GPU enrichment (audio-only window, no frames)
  { status: "ok", segmentId: string, enriched: false }

  // Idempotency short-circuit (segment already at processing_status="complete")
  // Note: snake_case key for legacy reasons.
  { status: "already_processed", segment_id: string }

  // No frames available for a window that expected frames
  { status: "skipped", reason: string }
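
The event and result shapes above can be checked mechanically before any work starts. The following is a minimal sketch, not the handler's actual validation: `parse_ingest_event` and `segment_id_of` are hypothetical helper names, and the non-negative check on `windowIndex` and `frameCount` is an assumption rather than something the spec states.

```python
from typing import Any, Mapping, Optional, TypedDict


class IngestWindowEvent(TypedDict):
    sessionId: str
    userId: str
    windowIndex: int
    frameCount: int


_REQUIRED = {"sessionId": str, "userId": str, "windowIndex": int, "frameCount": int}


def parse_ingest_event(raw: Mapping[str, Any]) -> IngestWindowEvent:
    """Hypothetical validator: every required field must be present and correctly typed."""
    for key, expected in _REQUIRED.items():
        if key not in raw:
            raise ValueError(f"missing required field: {key}")
        value = raw[key]
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(value, expected) or isinstance(value, bool):
            raise ValueError(f"{key} must be {expected.__name__}")
    # Assumption: negative indices and counts are never valid.
    if raw["windowIndex"] < 0 or raw["frameCount"] < 0:
        raise ValueError("windowIndex and frameCount must be non-negative")
    return IngestWindowEvent(
        sessionId=raw["sessionId"],
        userId=raw["userId"],
        windowIndex=raw["windowIndex"],
        frameCount=raw["frameCount"],
    )


def segment_id_of(result: Mapping[str, Any]) -> Optional[str]:
    """Read the segment ID from either success shape (camelCase or legacy snake_case)."""
    return result.get("segmentId") or result.get("segment_id")
```

Callers that consume IngestWindowSuccess need something like `segment_id_of` because the idempotency short-circuit returns `segment_id` while the other shapes return `segmentId`; the `skipped` shape carries neither, so the helper returns None for it.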

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| ingestWindow.happyPath | IngestWindowEvent | IngestWindowSuccess{status=ok} | happy path | GPU present and reachable; segment written complete |
| ingestWindow.alreadyProcessed | IngestWindowEvent | IngestWindowSuccess{status=already_processed} | happy path | Idempotency guard: segment already complete |
| ingestWindow.audioOnly | IngestWindowEvent{frameCount=0} | IngestWindowSuccess{status=ok, enriched=false} | happy path | No frames; transcription only; GPU not contacted |
| ingestWindow.noFramesFound | IngestWindowEvent | IngestWindowSuccess{status=skipped} | error | frameCount > 0 but S3 returned no frames |
| ingestWindow.gpuUnavailable | IngestWindowEvent | GpuUnavailableError raised | error | SSM sentinel = "none" or absent; Lambda destination routes to IngestDLQ |
| ingestWindow.encoderException | IngestWindowEvent | exception re-raised | error | GPU reachable but caption/encode fails; Lambda destination routes to IngestDLQ |

Pseudocode

lambda_handler(event):
  1. Idempotency: if segment already complete → return early.
  2. Load frames from S3 (sessions/{sessionId}/window_{windowIndex:03d}/frame_*.jpg).
     If frameCount > 0 and no frames found → return skipped.
  3. Load audio (per-window chunk or legacy full-session fallback).
     Transcribe via Groq Whisper (whisper-large-v3) if audio present.
  4. Create WorldMMSegment with processing_status="pending".
  5. If frameCount == 0 (audio-only):
       update_segment(status="complete"); return.
  6. Read /encache/gpu/instance_id from SSM fresh (NOT env var — env resolves at deploy and
     becomes stale across warm invocations).
     normalize_gpu_instance_id() maps "none"/empty to None.
     If None: log gpu_unavailable_raised; raise GpuUnavailableError.
  7. Resolve GPU private IP via _resolve_gpu_url():
     ec2.describe_instances([instance_id]).
     If state is stopped/stopping/terminated (stale SSM ID):
       scan EC2 by tag Name=encache-gpu-worker, state=running.
       If a running instance found: update SSM; use its private IP.
       If not found: return dummy URL 0.0.0.0, which triggers GpuUnavailableError.
     Private IP is always preferred — both Lambdas are in the VPC; public IP
     routes through NAT and breaks SG-to-SG rules.
  8. VLM2VecClient: caption (up to 4 uniformly-sampled frames + transcript context).
  9. NER extraction → entity resolution → triple storage.
  10. Visual embedding via encode_video (best-effort; failure logged, not fatal).
  11. update_segment(status="complete", caption=caption); return success payload.
  If any exception escapes steps 7-11 (step 10 failures are logged, not raised): re-raise so the
  OnFailure destination routes the event to IngestDLQ for retry.
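
Three of the steps above are small pure functions and can be sketched in isolation: the S3 key prefix from step 2, the sentinel normalization from step 6, and the uniform frame sampling from step 8. This is an illustrative sketch only. `frame_prefix` and `sample_uniform` are hypothetical names, and the body of `normalize_gpu_instance_id` is one plausible reading of the behavior described (treating the sentinel case-insensitively and stripping whitespace are assumptions).

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def frame_prefix(session_id: str, window_index: int) -> str:
    """S3 prefix for one window's frames; the index is zero-padded to 3 digits."""
    return f"sessions/{session_id}/window_{window_index:03d}/"


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    """Map the "none" sentinel, empty strings, and missing values to None."""
    if raw is None:
        return None
    value = raw.strip()
    # Assumption: the sentinel comparison ignores case.
    if not value or value.lower() == "none":
        return None
    return value


def sample_uniform(frames: Sequence[T], max_frames: int = 4) -> List[T]:
    """Pick at most max_frames items, evenly spaced, keeping first and last."""
    if max_frames < 1:
        raise ValueError("max_frames must be at least 1")
    n = len(frames)
    if n <= max_frames:
        return list(frames)
    if max_frames == 1:
        return [frames[0]]
    # step > 1 here, so the rounded indices are strictly increasing.
    step = (n - 1) / (max_frames - 1)
    return [frames[round(i * step)] for i in range(max_frames)]
```

For a window with ten frames, `sample_uniform` selects indices 0, 3, 6, and 9, so the caption model sees the start, the end, and two evenly spaced frames between them.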

Flow: ingestDlqConsumer

  • Core files: main/server/worldmm/pipeline/ingest_window.py (sqs_handler)
  • Test files: main/server/worldmm/tests/test_ingest_dlq.py

Types

SQSRecord {
  messageId: string
  receiptHandle: string
  body: string (JSON — Lambda OnFailure envelope wrapping original IngestWindowEvent)
  attributes.ApproximateReceiveCount: string (int)
}

BatchItemFailure {
  itemIdentifier: string (messageId)
}

SQSResponse {
  batchItemFailures: BatchItemFailure[]
}
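
Because each SQSRecord body is the Lambda OnFailure envelope rather than the bare event, the consumer has to unwrap it before use. A minimal sketch of that unwrapping follows; `parse_dlq_record` is a hypothetical name, and the fallback to the raw body when no `requestPayload` key is present is an assumption added for robustness, not something the spec describes.

```python
import json
from typing import Any, Dict, Tuple


def parse_dlq_record(record: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Return (original IngestWindowEvent, receive count) for one SQS record."""
    body = json.loads(record["body"])
    # The OnFailure destination stores the original event under "requestPayload".
    # Assumption: if that key is missing, the body is already the raw event.
    payload = body.get("requestPayload", body)
    # SQS delivers ApproximateReceiveCount as a string, so convert before comparing.
    receive_count = int(record["attributes"]["ApproximateReceiveCount"])
    return payload, receive_count
```

Converting the count to an int up front matters because a string comparison such as `"10" > "5"` is False in Python, which would silently disable the retry limit.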

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| dlqConsumer.exhausted | SQSRecord{receiveCount > 5} | segment marked failed; message ack'd | happy path | Bounded retry limit; message removed from queue |
| dlqConsumer.gpuDown | SQSRecord | _try_start_gpu() called; visibility extended 900s; message in batchItemFailures | retry | GPU not running; auto-start attempted, then message stays hidden 15 min |
| dlqConsumer.gpuUp | SQSRecord | delegates to lambda_handler | happy path | Segment reaches complete or exception → back to batchItemFailures |
| dlqConsumer.lambdaFails | SQSRecord | message in batchItemFailures | retry | lambda_handler raises; message returns to queue |

Pseudocode

sqs_handler(event):
  for each SQS record:
    parse envelope: body → requestPayload (Lambda OnFailure wraps original event)
    receive_count = int(attributes.ApproximateReceiveCount)   # delivered as a string

    if receive_count > 5:
      lookup segment by (userId, sessionId, windowIndex)
      update_segment(status="failed")
      log dlq_exhausted
      ACK (do not add to batchItemFailures); continue

    if not _gpu_available():
      _try_start_gpu()           # fire-and-forget; never raises
      sqs.change_message_visibility(receiptHandle, VisibilityTimeout=900)
      log dlq_visibility_extended
      add to batchItemFailures; continue

    try:
      lambda_handler(payload, context)
    except:
      add to batchItemFailures

  return {"batchItemFailures": [...]}

_gpu_available():
  instance_id = normalize(ssm.get_parameter("/encache/gpu/instance_id"))
  if not instance_id: return False
  state = ec2.describe_instances([instance_id])...State.Name
  if state == "running": return True
  // stale SSM ID — tag-based fallback
  if state in (stopped, stopping, terminated):
    running = _find_running_gpu_instance_by_tag()   // tag:Name=encache-gpu-worker, state=running
    if running:
      _update_ssm_gpu_instance_id(running.InstanceId)   // /encache/gpu/instance_id
      return True
    return False
  return False

_try_start_gpu():
  // Fire-and-forget — entire body wrapped in try/except; never raises.
  instance_id = _read_gpu_instance_id()   // None when SSM holds "none"
  source = "ssm"
  if not instance_id:
    // SSM sentinel "none" — scan for a stopped instance by tag
    desc = ec2.describe_instances(
        Filters=[tag:Name=encache-gpu-worker, instance-state-name=stopped])
    instance_id = first result; source = "tag_scan"
  if not instance_id:
    log gpu_autostart_no_instance_found; return
  ec2.start_instances([instance_id])
  log gpu_autostart_attempted {instance_id, source}
  // on any exception: log gpu_autostart_failed {error}; return

Logs

| Source | Location |
|---|---|
| lambda_handler | CloudWatch: /aws/lambda/IngestWindowFunction |
| sqs_handler (IngestDLQConsumer) | CloudWatch: /aws/lambda/IngestDLQConsumer |

Structured log steps emitted by create_logger({"flow": "ingest_window"}):

| Step key | Where | Key fields |
|---|---|---|
| invoked | lambda_handler entry | session_id, window_index, frame_count |
| already_processed | idempotency guard | segment_id |
| frames_loaded | after S3 fetch | frame_count |
| no_frames | skip path | session_id, window_index |
| audio_loaded | after S3 fetch | has_audio |
| transcription_complete | after Whisper | transcript_length |
| segment_created | after DB insert | segment_id |
| segment_retrying | DLQ re-delivery of pending row | segment_id |
| gpu_unavailable_raised | before GpuUnavailableError raise | segment_id, gpu_sentinel |
| caption_generated | after VLM call | caption_preview (first 80 chars) |
| ner_complete | after NER | entity_count |
| triples_extracted | after triple parse | triple_count |
| embedding_stored | after visual embed | segment_id |
| embedding_failed | best-effort embed error | error |
| completed | final success | segment_id, entity_count, triple_count, visual_stored |
| dlq_received | sqs_handler per record | message_id, receive_count, segment_id |
| dlq_visibility_extended | GPU-down branch | message_id, extended_seconds, receive_count, reason |
| dlq_exhausted | receive_count > 5 | segment_id, receive_count |
| gpu_autostart_attempted | _try_start_gpu: instance found and start call succeeded | instance_id, source (ssm or tag_scan) |
| gpu_autostart_no_instance_found | _try_start_gpu: SSM is none and tag scan returned empty | (none listed) |
| gpu_autostart_failed | _try_start_gpu: any exception during start attempt | error |
| gpu_url_resolve_failed | EC2 IP lookup error | instance_id |
| gpu_availability_check_failed | _gpu_available error | error |
| gpu_instance_state_issue | _resolve_gpu_url / _gpu_available: stale ID detected | instance_id, state |
| gpu_tag_scan_failed | tag-scan EC2 call error | region |
| gpu_instance_adopted_by_tag | tag scan found a running replacement | old_instance_id, new_instance_id |
| ssm_gpu_instance_id_updated | SSM write after tag-scan adoption | instance_id |
| ssm_gpu_instance_id_update_failed | SSM write error | instance_id |
| gpu_instance_recovery_failed | stale ID, no running replacement found | instance_id, state |
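
To make the relationship between a step key and its fields concrete, here is one way a step logger of this kind could emit JSON lines. This is a hypothetical stand-in, not the project's create_logger: the name `make_step_logger`, the `step` field name, and the one-JSON-object-per-line format are all assumptions.

```python
import json
import sys
from typing import Any, Callable, Dict, Optional, TextIO


def make_step_logger(
    context: Dict[str, Any], stream: Optional[TextIO] = None
) -> Callable[..., None]:
    """Return log(step, **fields) that writes one JSON object per line."""
    out = stream if stream is not None else sys.stdout

    def log(step: str, **fields: Any) -> None:
        # Fixed context first, then the step key, then step-specific fields.
        record = {**context, "step": step, **fields}
        out.write(json.dumps(record, sort_keys=True) + "\n")

    return log


log = make_step_logger({"flow": "ingest_window"})
log("frames_loaded", frame_count=4)
```

With a line-per-record format like this, each row of the table corresponds to one searchable value of the step field, which is what makes the keys usable as CloudWatch filter terms.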

Data Model

WorldMMSegment.processing_status transitions:

| From | To | Trigger |
|---|---|---|
| (new row) | pending | lambda_handler step 4 |
| pending | complete | Successful enrichment (step 11) or audio-only path (step 5) |
| pending | failed | sqs_handler detects receiveCount > 5 |

A row that remains pending means either that the DLQ drain is still in progress or that the message was dropped when DLQ retention expired (14 days). No reaper exists today; orphaned rows are cleaned up manually if needed.
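
The allowed transitions are few enough to encode as a lookup table, which a manual cleanup script or a future reaper could use to guard against illegal writes. The sketch below is illustrative only: `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names, and treating complete and failed as terminal is inferred from the absence of any outgoing transitions in the table above.

```python
from typing import Dict, Optional, Set

# None stands for "(new row)". complete and failed have no outgoing transitions
# in the documented table, so they are treated as terminal here (an inference).
ALLOWED_TRANSITIONS: Dict[Optional[str], Set[str]] = {
    None: {"pending"},
    "pending": {"complete", "failed"},
    "complete": set(),
    "failed": set(),
}


def can_transition(current: Optional[str], new: str) -> bool:
    """True if processing_status may move from current to new."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

Under this table a DLQ re-delivery of a pending row (logged as segment_retrying) involves no status write at all: the row stays pending until it reaches complete or failed.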

GPU Instance Discovery and Watchdog/SSM Gap

The GPU worker's idle watchdog shuts the instance down after 480 s of inactivity but does not update the SSM parameter /encache/gpu/instance_id. This creates a stale-ID cycle:

  1. Watchdog shuts the instance down; SSM still holds its ID.
  2. Ingest Lambda reads the stale ID, resolves it to the dead instance, and receives connection errors.
  3. No requests reach a replacement instance, so its invocation count stays at zero and the watchdog kills it as well.

The tag-based fallback in both _resolve_gpu_url() and _gpu_available() breaks the cycle: when EC2 reports the stored instance as stopped/stopping/terminated, the code scans for any running EC2 instance tagged Name=encache-gpu-worker. If one is found, it is adopted and its ID is written back to SSM, so subsequent Lambda invocations read the correct ID without needing a scan.

Both Lambdas must use private IP to address the GPU worker. Public IP routes through the VPC NAT gateway, which strips the source SG identity and causes the GPU worker's security group inbound rule to reject the connection.
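
The discovery logic described in this section can be sketched as a single function over SSM and EC2 clients. This is a simplified illustration, not the code in ingest_window.py: `resolve_gpu_private_ip` is a hypothetical name, the clients are passed in so the function can be exercised with fakes (in production they would be `boto3.client("ssm")` and `boto3.client("ec2")`), and error handling for a missing parameter or an unknown instance ID is deliberately omitted.

```python
from typing import Any, Dict, Optional

PARAM = "/encache/gpu/instance_id"
TAG_NAME = "encache-gpu-worker"
STALE_STATES = {"stopped", "stopping", "terminated"}


def _first_instance(resp: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the first instance in a describe_instances response, if any."""
    for reservation in resp.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            return instance
    return None


def resolve_gpu_private_ip(ssm: Any, ec2: Any) -> Optional[str]:
    """Return the running GPU worker's private IP, or None if unavailable."""
    raw = ssm.get_parameter(Name=PARAM)["Parameter"]["Value"]
    instance_id = raw.strip()
    if not instance_id or instance_id.lower() == "none":
        return None  # sentinel: no GPU registered

    inst = _first_instance(ec2.describe_instances(InstanceIds=[instance_id]))
    state = inst["State"]["Name"] if inst else None
    if state == "running":
        # Always the private IP: the public path goes through NAT.
        return inst.get("PrivateIpAddress")

    if state in STALE_STATES:
        # Stale SSM ID: look for a running replacement by tag.
        running = _first_instance(ec2.describe_instances(Filters=[
            {"Name": "tag:Name", "Values": [TAG_NAME]},
            {"Name": "instance-state-name", "Values": ["running"]},
        ]))
        if running:
            # Adopt the replacement so later invocations skip the scan.
            ssm.put_parameter(Name=PARAM, Value=running["InstanceId"],
                              Type="String", Overwrite=True)
            return running.get("PrivateIpAddress")
    return None
```

Returning None in every unavailable case lets the caller map a single condition to GpuUnavailableError, whether the cause is the sentinel, a stale ID with no replacement, or an instance in a transitional state such as pending.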

SSM Parameters

| Parameter | Purpose | Writer | Reader |
|---|---|---|---|
| /encache/gpu/instance_id | Current GPU EC2 instance ID or sentinel "none" | GPU bootstrap (PR #396); tag-scan fallback in _resolve_gpu_url / _gpu_available | lambda_handler, _gpu_available |

Deployment

  • Mechanism: SAM (main/server/template.yaml)
  • Lambda: IngestWindowFunction — invoked async by FramePost, AudioUpload, EndSession Lambdas
  • Lambda: IngestDLQConsumer — SQS event source mapping on IngestDLQ
  • Lambda destination: IngestWindowFunction has OnFailure → IngestDLQ, MaximumRetryAttempts: 0
  • Deploy command:
    cd main/server && sam build && sam deploy
    
  • Notes: Deploy IngestDLQConsumer (step 4 of rollout) before IngestWindowFunction's raise path (step 5). If the consumer is not deployed first, a GPU-down raise will loop fast against the old sqs_handler and burn invocations until visibility timeout expires. IngestDLQConsumer requires EC2Policy attached (ec2:StartInstances, ec2:DescribeInstances) in addition to the existing S3AccessPolicy and DatabaseSsmPolicy; this is defined in main/server/template.yaml.