Ingest Window
Metadata
- System type: service
System Intent
- What this is: Cloud Lambda that processes one 30-second recording window. It loads frames and audio from S3, transcribes the audio, generates a visual caption via the GPU worker, extracts named entities and RDF triples, stores a visual embedding, and writes a WorldMMSegment row. When the GPU is unavailable, the Lambda raises GpuUnavailableError so the invocation is routed to IngestDLQ for retry. A companion SQS handler (IngestDLQConsumer) drains the DLQ: when the GPU is down it calls _try_start_gpu() (fire-and-forget) to attempt an EC2 start, then extends message visibility by 900 s and returns the record to the queue. Stuck messages age out via SQS's 14-day retention; orphaned-segment cleanup is manual.
Mermaid Diagram
flowchart TD
Trigger["FramePost / AudioUpload / EndSession Lambda\n(async invoke)"]
LH["lambda_handler\ningest_window.py"]
SSM_GPU["/encache/gpu/instance_id\n(SSM, read fresh each invocation)"]
DB["WorldMMSegment\nprocessing_status"]
GPU["VLM2VecClient\nGPU worker"]
DLQ["IngestDLQ\n(SQS)"]
DLQC["sqs_handler\nIngestDLQConsumer"]
EC2["EC2 describe_instances\n(GPU health check)"]
Trigger -->|"async Lambda invoke"| LH
LH -->|"GetParameter"| SSM_GPU
SSM_GPU -->|"instance_id = 'none' or absent"| LH
LH -->|"GpuUnavailableError raised"| DLQ
LH -->|"GPU present: caption + NER + triples + embedding"| GPU
LH -->|"status=pending on create\nstatus=complete on success"| DB
LH -->|"stale ID: tag scan + SSM update"| EC2
DLQ -->|"polled"| DLQC
DLQC -->|"GetParameter + describe_instances"| EC2
EC2 -->|"GPU down: _try_start_gpu() then extend 900s"| DLQ
EC2 -->|"stale ID: tag scan + SSM update"| EC2
EC2 -->|"GPU up: call lambda_handler"| LH
DLQC -->|"receiveCount > 5: status=failed"| DB
Flows
Flow: ingestWindow.happyPath
- Core files: main/server/worldmm/pipeline/ingest_window.py → lambda_handler
- Test files: main/server/tests/unit/test_ingest_window_audio_path.py, test_ingest_window_audio_only.py, test_ingest_window_datetime.py, test_ingest_window_transcribe.py
Types
IngestWindowEvent {
  sessionId: string (required)
  userId: string (required)
  windowIndex: int (required)
  frameCount: int (required)
}
IngestWindowSuccess (one of):
  // Happy path with GPU enrichment
  { status: "ok", segmentId: string, enriched: true,
    entityCount: int, tripleCount: int, visualStored: bool }
  // Happy path without GPU enrichment (audio-only window, no frames)
  { status: "ok", segmentId: string, enriched: false }
  // Idempotency short-circuit (segment already at processing_status="complete")
  // Note: snake_case key for legacy reasons.
  { status: "already_processed", segment_id: string }
  // No frames available for a window that expected frames
  { status: "skipped", reason: string }
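The event shape above can be sketched as a typed dictionary with a small validator. This is a minimal illustration, not the project's code: `parse_ingest_event` is a hypothetical helper, and coercing values with `int()`/`str()` is an assumption about how lenient the handler should be.

```python
from typing import Any, TypedDict


class IngestWindowEvent(TypedDict):
    sessionId: str
    userId: str
    windowIndex: int
    frameCount: int


REQUIRED_FIELDS = ("sessionId", "userId", "windowIndex", "frameCount")


def parse_ingest_event(raw: dict[str, Any]) -> IngestWindowEvent:
    """Hypothetical helper: reject events that lack any required field."""
    missing = [key for key in REQUIRED_FIELDS if key not in raw]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    # Assumption: numeric fields may arrive as strings, so coerce them.
    return IngestWindowEvent(
        sessionId=str(raw["sessionId"]),
        userId=str(raw["userId"]),
        windowIndex=int(raw["windowIndex"]),
        frameCount=int(raw["frameCount"]),
    )
```

Validating at the handler boundary keeps a malformed event from creating a pending segment that can never complete.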
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| ingestWindow.happyPath | IngestWindowEvent | IngestWindowSuccess{status=ok} | happy path | GPU present and reachable; segment written complete |
| ingestWindow.alreadyProcessed | IngestWindowEvent | IngestWindowSuccess{status=already_processed} | happy path | Idempotency guard: segment already complete |
| ingestWindow.audioOnly | IngestWindowEvent{frameCount=0} | IngestWindowSuccess{status=ok, enriched=false} | happy path | No frames; transcription only; GPU not contacted |
| ingestWindow.noFramesFound | IngestWindowEvent | IngestWindowSuccess{status=skipped} | error | frameCount > 0 but S3 returned no frames |
| ingestWindow.gpuUnavailable | IngestWindowEvent | GpuUnavailableError raised | error | SSM sentinel = "none" or absent; Lambda destination routes to IngestDLQ |
| ingestWindow.encoderException | IngestWindowEvent | exception re-raised | error | GPU reachable but caption/encode fails; Lambda destination routes to IngestDLQ |
Pseudocode
lambda_handler(event):
  1. Idempotency: if segment already complete → return early.
  2. Load frames from S3 (sessions/{sessionId}/window_{windowIndex:03d}/frame_*.jpg).
     If frameCount > 0 and no frames found → return skipped.
  3. Load audio (per-window chunk or legacy full-session fallback).
     Transcribe via Groq Whisper (whisper-large-v3) if audio present.
  4. Create WorldMMSegment with processing_status="pending".
  5. If frameCount == 0 (audio-only):
       update_segment(status="complete"); return.
  6. Read /encache/gpu/instance_id from SSM fresh (NOT the env var: env resolves at deploy
     and becomes stale across warm invocations).
     normalize_gpu_instance_id() maps "none"/empty to None.
     If None: log gpu_unavailable_raised; raise GpuUnavailableError.
  7. Resolve GPU private IP via _resolve_gpu_url():
       ec2.describe_instances([instance_id]).
       If state is stopped/stopping/terminated (stale SSM ID):
         scan EC2 by tag Name=encache-gpu-worker, state=running.
         If a running instance found: update SSM; use its private IP.
         If not found: return dummy URL 0.0.0.0, which triggers GpuUnavailableError.
     Private IP is always preferred: both Lambdas are in the VPC, and a public IP
     routes through NAT and breaks SG-to-SG rules.
  8. VLM2VecClient: caption (up to 4 uniformly-sampled frames + transcript context).
  9. NER extraction → entity resolution → triple storage.
  10. Visual embedding via encode_video (best-effort; failure logged, not fatal).
  11. update_segment(status="complete", caption=caption); return success payload.
  If any exception in steps 7-11 (other than the best-effort embedding in step 10):
  re-raise (DLQ destination handles retry).
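Three small pieces of the steps above lend themselves to a sketch: the S3 prefix from step 2, the sentinel normalization from step 6, and the uniform frame sampling from step 8. The function bodies below are assumptions about how these could be written. Only the name `normalize_gpu_instance_id`, the key layout, and the limit of 4 frames come from the text; `window_frame_prefix` and `sample_uniform` are hypothetical names.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def window_frame_prefix(session_id: str, window_index: int) -> str:
    """S3 prefix for one window's frames; the index is zero-padded to 3 digits."""
    return f"sessions/{session_id}/window_{window_index:03d}/"


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    """Map the "none" sentinel, empty strings, and missing values to None."""
    if raw is None:
        return None
    value = raw.strip()
    # Assumption: the sentinel is matched case-insensitively.
    if not value or value.lower() == "none":
        return None
    return value


def sample_uniform(frames: Sequence[T], k: int = 4) -> list[T]:
    """Pick up to k evenly spaced items, always keeping the first and last."""
    if k < 1:
        raise ValueError("k must be at least 1")
    n = len(frames)
    if n <= k:
        return list(frames)
    if k == 1:
        return [frames[n // 2]]
    indices = [round(i * (n - 1) / (k - 1)) for i in range(k)]
    return [frames[i] for i in indices]
```

For example, `sample_uniform(list(range(10)))` selects positions 0, 3, 6, and 9, so the caption sees the start, middle, and end of the window.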
Flow: ingestDlqConsumer
- Core files: main/server/worldmm/pipeline/ingest_window.py → sqs_handler
- Test files: main/server/worldmm/tests/test_ingest_dlq.py
Types
SQSRecord {
  messageId: string
  receiptHandle: string
  body: string (JSON: Lambda OnFailure envelope wrapping original IngestWindowEvent)
  attributes.ApproximateReceiveCount: string (int)
}
BatchItemFailure {
  itemIdentifier: string (messageId)
}
SQSResponse {
  batchItemFailures: BatchItemFailure[]
}
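Unwrapping one record of the shape above might look like the following sketch. `parse_dlq_record` is a hypothetical name, and the fallback to the raw body for unwrapped messages is an assumption, not behavior confirmed by the source.

```python
import json
from typing import Any


def parse_dlq_record(record: dict[str, Any]) -> tuple[dict[str, Any], int]:
    """Return (original IngestWindowEvent payload, receive count) for one SQS record."""
    envelope = json.loads(record["body"])
    # Lambda's OnFailure destination nests the original event under "requestPayload".
    # Assumption: fall back to the body itself if a message was enqueued unwrapped.
    payload = envelope.get("requestPayload", envelope)
    # SQS delivers ApproximateReceiveCount as a string, so convert before comparing.
    receive_count = int(record["attributes"]["ApproximateReceiveCount"])
    return payload, receive_count


# Illustrative record; all field values are made up.
example_record = {
    "messageId": "m-1",
    "receiptHandle": "rh-1",
    "body": json.dumps(
        {"requestPayload": {"sessionId": "s1", "userId": "u1", "windowIndex": 2, "frameCount": 4}}
    ),
    "attributes": {"ApproximateReceiveCount": "3"},
}
example_payload, example_count = parse_dlq_record(example_record)
```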
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
| dlqConsumer.exhausted | SQSRecord{receiveCount > 5} | segment marked failed; message ack'd | happy path | Bounded retry limit; message removed from queue |
| dlqConsumer.gpuDown | SQSRecord | _try_start_gpu() called; visibility extended 900s; message in batchItemFailures | retry | GPU not running; auto-start attempted then message stays hidden 15 min |
| dlqConsumer.gpuUp | SQSRecord | delegates to lambda_handler | happy path | Segment reaches complete or exception → back to batchItemFailures |
| dlqConsumer.lambdaFails | SQSRecord | message in batchItemFailures | retry | lambda_handler raises; message returns to queue |
Pseudocode
sqs_handler(event):
  for each SQS record:
    parse envelope: body → requestPayload (Lambda OnFailure wraps original event)
    receive_count = int(ApproximateReceiveCount)   # delivered as a string
    if receive_count > 5:
      lookup segment by (userId, sessionId, windowIndex)
      update_segment(status="failed")
      log dlq_exhausted
      ACK (do not add to batchItemFailures); continue
    if not _gpu_available():
      _try_start_gpu()   # fire-and-forget; never raises
      sqs.change_message_visibility(receiptHandle, VisibilityTimeout=900)
      log dlq_visibility_extended
      add to batchItemFailures; continue
    try:
      lambda_handler(payload, context)
    except:
      add to batchItemFailures
  return {"batchItemFailures": [...]}
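The branching in sqs_handler can be sketched as a pure function with its side effects injected, which makes the three outcomes easy to unit-test. Everything here is illustrative: `drain_records` and its callback parameters are hypothetical names, and the real handler calls SQS, EC2, and the database directly.

```python
from typing import Any, Callable

MAX_RECEIVES = 5  # retry limit taken from the pseudocode above


def drain_records(
    records: list[dict[str, Any]],
    *,
    gpu_available: Callable[[], bool],
    on_exhausted: Callable[[dict[str, Any]], None],
    on_gpu_down: Callable[[dict[str, Any]], None],
    process: Callable[[dict[str, Any]], Any],
) -> dict[str, list[dict[str, str]]]:
    """Classify each record and build the partial-batch response."""
    failures: list[dict[str, str]] = []
    for record in records:
        receive_count = int(record["attributes"]["ApproximateReceiveCount"])
        if receive_count > MAX_RECEIVES:
            # Terminal: mark the segment failed and let SQS delete the message.
            on_exhausted(record)
            continue
        if not gpu_available():
            # Retry later: start the GPU and extend visibility inside the callback.
            on_gpu_down(record)
            failures.append({"itemIdentifier": record["messageId"]})
            continue
        try:
            process(record)
        except Exception:
            # Any processing error returns the message to the queue.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Lambda only honors `batchItemFailures` when the event source mapping sets `FunctionResponseTypes` to `ReportBatchItemFailures`; the source does not state whether the template does so.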
_gpu_available():
  instance_id = normalize(ssm.get_parameter("/encache/gpu/instance_id"))
  if not instance_id: return False
  state = ec2.describe_instances([instance_id])...State.Name
  if state == "running": return True
  // stale SSM ID: tag-based fallback
  if state in (stopped, stopping, terminated):
    running = _find_running_gpu_instance_by_tag()   // tag:Name=encache-gpu-worker, state=running
    if running:
      _update_ssm_gpu_instance_id(running.InstanceId)   // /encache/gpu/instance_id
      return True
    return False
  return False
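The availability check reduces to a small decision function once the AWS calls are passed in. The sketch below assumes that shape; `gpu_available` and its three callable parameters are hypothetical stand-ins for the SSM and EC2 helpers named in the pseudocode.

```python
from typing import Callable, Optional

# States that mark the stored instance ID as stale, per the pseudocode above.
STALE_STATES = frozenset({"stopped", "stopping", "terminated"})


def gpu_available(
    instance_id: Optional[str],
    get_state: Callable[[str], str],
    find_running_by_tag: Callable[[], Optional[str]],
    update_ssm: Callable[[str], None],
) -> bool:
    """Return True if a running GPU instance is known, adopting a replacement if needed."""
    if not instance_id:
        return False
    state = get_state(instance_id)
    if state == "running":
        return True
    if state in STALE_STATES:
        replacement = find_running_by_tag()
        if replacement:
            # Write the adopted ID back so later invocations skip the tag scan.
            update_ssm(replacement)
            return True
    # Covers "pending", "shutting-down", and a stale ID with no replacement.
    return False
```

Note that an instance in the `pending` state is treated as unavailable here, matching the final `return False` in the pseudocode.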
_try_start_gpu():
  // Fire-and-forget: entire body wrapped in try/except; never raises.
  instance_id = _read_gpu_instance_id()   // None when SSM holds "none"
  source = "ssm"
  if not instance_id:
    // SSM sentinel "none": scan for a stopped instance by tag
    desc = ec2.describe_instances(
      Filters=[tag:Name=encache-gpu-worker, instance-state-name=stopped])
    instance_id = first result; source = "tag_scan"
  if not instance_id:
    log gpu_autostart_no_instance_found; return
  ec2.start_instances([instance_id])
  log gpu_autostart_attempted {instance_id, source}
  // on any exception: log gpu_autostart_failed {error}; return
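A runnable approximation of the auto-start logic follows, written against an injected EC2 client so it can be exercised without AWS. It assumes the client exposes boto3's `describe_instances(Filters=...)` and `start_instances(InstanceIds=...)` calls; the function name `try_start_gpu`, its return value, and the use of the stdlib logger are illustrative choices, not the project's implementation.

```python
import logging
from typing import Any, Optional

log = logging.getLogger("ingest_window")
GPU_TAG_NAME = "encache-gpu-worker"


def try_start_gpu(ec2: Any, instance_id: Optional[str]) -> Optional[str]:
    """Request a start for the GPU instance; return its ID, or None. Never raises."""
    try:
        source = "ssm"
        if not instance_id:
            # SSM held the "none" sentinel: look for a stopped instance by tag.
            resp = ec2.describe_instances(
                Filters=[
                    {"Name": "tag:Name", "Values": [GPU_TAG_NAME]},
                    {"Name": "instance-state-name", "Values": ["stopped"]},
                ]
            )
            ids = [
                inst["InstanceId"]
                for reservation in resp.get("Reservations", [])
                for inst in reservation.get("Instances", [])
            ]
            instance_id = ids[0] if ids else None
            source = "tag_scan"
        if not instance_id:
            log.info("gpu_autostart_no_instance_found")
            return None
        ec2.start_instances(InstanceIds=[instance_id])
        log.info("gpu_autostart_attempted instance_id=%s source=%s", instance_id, source)
        return instance_id
    except Exception as exc:
        # Fire-and-forget: swallow every error so the DLQ consumer keeps going.
        log.warning("gpu_autostart_failed error=%s", exc)
        return None
```

Swallowing all exceptions is deliberate in this sketch: the caller extends message visibility regardless, so a failed start only delays recovery until the next delivery.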
Logs
| Source | Location |
|---|---|
| lambda_handler | CloudWatch: /aws/lambda/IngestWindowFunction |
| sqs_handler (IngestDLQConsumer) | CloudWatch: /aws/lambda/IngestDLQConsumer |
Structured log steps emitted by create_logger({"flow": "ingest_window"}):
| Step key | Where | Key fields |
|---|---|---|
| invoked | lambda_handler entry | session_id, window_index, frame_count |
| already_processed | idempotency guard | segment_id |
| frames_loaded | after S3 fetch | frame_count |
| no_frames | skip path | session_id, window_index |
| audio_loaded | after S3 fetch | has_audio |
| transcription_complete | after Whisper | transcript_length |
| segment_created | after DB insert | segment_id |
| segment_retrying | DLQ re-delivery of pending row | segment_id |
| gpu_unavailable_raised | before GpuUnavailableError raise | segment_id, gpu_sentinel |
| caption_generated | after VLM call | caption_preview (first 80 chars) |
| ner_complete | after NER | entity_count |
| triples_extracted | after triple parse | triple_count |
| embedding_stored | after visual embed | segment_id |
| embedding_failed | best-effort embed error | error |
| completed | final success | segment_id, entity_count, triple_count, visual_stored |
| dlq_received | sqs_handler per record | message_id, receive_count, segment_id |
| dlq_visibility_extended | GPU-down branch | message_id, extended_seconds, receive_count, reason |
| dlq_exhausted | receive_count > 5 | segment_id, receive_count |
| gpu_autostart_attempted | _try_start_gpu: instance found and start call succeeded | instance_id, source (ssm or tag_scan) |
| gpu_autostart_no_instance_found | _try_start_gpu: SSM is none and tag scan returned empty | (none) |
| gpu_autostart_failed | _try_start_gpu: any exception during start attempt | error |
| gpu_url_resolve_failed | EC2 IP lookup error | instance_id |
| gpu_availability_check_failed | _gpu_available error | error |
| gpu_instance_state_issue | _resolve_gpu_url / _gpu_available: stale ID detected | instance_id, state |
| gpu_tag_scan_failed | tag-scan EC2 call error | region |
| gpu_instance_adopted_by_tag | tag scan found a running replacement | old_instance_id, new_instance_id |
| ssm_gpu_instance_id_updated | SSM write after tag-scan adoption | instance_id |
| ssm_gpu_instance_id_update_failed | SSM write error | instance_id |
| gpu_instance_recovery_failed | stale ID, no running replacement found | instance_id, state |
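To show how a step key and its fields could end up in one CloudWatch line, here is a minimal stand-in for the logger. The real `create_logger` is not shown in the source, so the name `create_logger_sketch`, the JSON-per-line format, and the `step` field name are all assumptions.

```python
import json
import sys
from typing import Any, Callable, Optional, TextIO


def create_logger_sketch(
    base: dict[str, Any], stream: Optional[TextIO] = None
) -> Callable[..., None]:
    """Return a log(step, **fields) function that writes one JSON object per line."""
    out = stream if stream is not None else sys.stdout

    def log(step: str, **fields: Any) -> None:
        # Base context (e.g. the flow name) is merged into every record.
        record = {**base, "step": step, **fields}
        out.write(json.dumps(record) + "\n")

    return log
```

With this shape, `log("invoked", session_id="s1", window_index=0, frame_count=4)` yields a single line that can be filtered on `step` in CloudWatch Logs Insights.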
Data Model
WorldMMSegment.processing_status transitions:
| From | To | Trigger |
|---|---|---|
| (new row) | pending | lambda_handler step 4 |
| pending | complete | Successful enrichment (step 11) or audio-only path (step 5) |
| pending | failed | sqs_handler detects receiveCount > 5 |
A pending row that is never resolved means either DLQ retention expired (14 days, message dropped) or drain is in progress. No reaper exists today — orphaned rows are cleaned manually if needed.
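The transition table can be encoded as a lookup so that an update helper could refuse illegal moves. This is a sketch under the assumption that `complete` and `failed` are terminal, which the table implies but does not state; `can_transition` is a hypothetical name.

```python
from typing import Optional

# Allowed next states, keyed by current state (None means the row does not exist yet).
ALLOWED_TRANSITIONS: dict[Optional[str], frozenset[str]] = {
    None: frozenset({"pending"}),
    "pending": frozenset({"complete", "failed"}),
    "complete": frozenset(),  # assumption: terminal
    "failed": frozenset(),    # assumption: terminal
}


def can_transition(current: Optional[str], new: str) -> bool:
    """Return True if moving processing_status from current to new is allowed."""
    return new in ALLOWED_TRANSITIONS.get(current, frozenset())
```

A check like this would also make orphaned rows easier to reason about: any row still at `pending` has, by construction, never reached either terminal state.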
GPU Instance Discovery and Watchdog/SSM Gap
The GPU worker's idle watchdog shuts the instance down after 480 s of inactivity but does not update the SSM parameter /encache/gpu/instance_id. This creates a stale-ID cycle:
- Watchdog terminates the instance; SSM still holds its ID.
- Ingest Lambda reads the stale ID, resolves a dead instance, receives connection errors.
- Invocation count stays at zero; watchdog kills any replacement immediately.
The tag-based fallback in both _resolve_gpu_url() and _gpu_available() breaks the cycle: when EC2 reports the stored instance as stopped/stopping/terminated, the code scans for any running EC2 instance tagged Name=encache-gpu-worker. If one is found it is adopted and its ID is written back to SSM. All future Lambda invocations see the correct ID without a scan.
Both Lambdas must use private IP to address the GPU worker. Public IP routes through the VPC NAT gateway, which strips the source SG identity and causes the GPU worker's security group inbound rule to reject the connection.
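Selecting the private address from an EC2 response might look like the sketch below. It reads the `PrivateIpAddress` field of a boto3-style `describe_instances` result and deliberately ignores `PublicIpAddress`; the function name and the sample addresses are illustrative only.

```python
from typing import Any, Optional


def private_ip_of_running_instance(resp: dict[str, Any]) -> Optional[str]:
    """Return the private IP of the first running instance in a describe_instances response."""
    for reservation in resp.get("Reservations", []):
        for inst in reservation.get("Instances", []):
            if inst.get("State", {}).get("Name") == "running":
                # PublicIpAddress is never consulted: traffic via NAT would fail
                # the GPU worker's security-group rule.
                return inst.get("PrivateIpAddress")
    return None


# Made-up response for illustration.
sample_response = {
    "Reservations": [
        {
            "Instances": [
                {
                    "InstanceId": "i-example",
                    "State": {"Name": "running"},
                    "PrivateIpAddress": "10.0.1.23",
                    "PublicIpAddress": "203.0.113.10",
                }
            ]
        }
    ]
}
```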
SSM Parameters
| Parameter | Purpose | Writer | Reader |
|---|---|---|---|
| /encache/gpu/instance_id | Current GPU EC2 instance ID or sentinel "none" | GPU bootstrap (PR #396); tag-scan fallback in _resolve_gpu_url / _gpu_available | lambda_handler, _gpu_available |
Deployment
- Mechanism: SAM (main/server/template.yaml)
- Lambda: IngestWindowFunction, invoked async by the FramePost, AudioUpload, and EndSession Lambdas
- Lambda: IngestDLQConsumer, SQS event source mapping on IngestDLQ
- Lambda destination: IngestWindowFunction has OnFailure → IngestDLQ, MaximumRetryAttempts: 0
- Deploy command:
- Notes: Deploy IngestDLQConsumer (step 4 of rollout) before IngestWindowFunction's raise path (step 5). If the consumer is not deployed first, a GPU-down raise will loop fast against the old sqs_handler and burn invocations until the visibility timeout expires. IngestDLQConsumer requires EC2Policy attached (ec2:StartInstances, ec2:DescribeInstances) in addition to the existing S3AccessPolicy and DatabaseSsmPolicy; this is defined in main/server/template.yaml.