Memory Enrichment Flow
Metadata
- System type: flow
System Intent
- What this is: The end-to-end pipeline that transforms raw audio and video frames from a recording session into a structured knowledge graph (segments, entities, RDF triples, visual embeddings) stored in PostgreSQL. It has two entry points: a cloud Lambda path (IngestWindowFunction) that processes individual 30-second windows in real time as uploads arrive, and a local/script path (ingest_session) used for batch offline processing. Both paths produce the same per-window database artefacts; the local path additionally writes multiscale summaries and semantic triples.
Mermaid Diagram
flowchart TD
Trigger["FramePost / AudioPost / EndSession Lambda\n(async invoke)"]
IWF["IngestWindowFunction\n(ingest_window.lambda_handler)"]
SSM["/encache/gpu/instance_id (SSM)"]
S3["S3: sessions/{sessionId}/window_{n}/"]
Whisper["Groq Whisper\n(whisper-large-v3)"]
GPU["VLM2Vec GPU worker\n(caption + NER/triples + encode_video)"]
PG["PostgreSQL\nWorldMMSegment / WorldMMEntity / WorldMMTriple"]
DLQ["IngestDLQ (SQS)\n14-day retention"]
DLQC["IngestDLQConsumer\n(ingest_window.sqs_handler)"]
EC2["EC2 describe_instances\n(GPU tag fallback)"]
RT["RetriggerIngestFunction\n(retrigger_ingest.lambda_handler)"]
Trigger -->|async Lambda invoke| IWF
IWF -->|"GetItem sessions/{sessionId}"| DDB["DynamoDB encache-sessions\n(session.createdAt)"]
IWF -->|GetObject frame_*.jpg + audio.wav| S3
IWF -->|transcribe| Whisper
IWF -->|GetParameter| SSM
SSM -->|"none or absent → GpuUnavailableError"| DLQ
IWF -->|"GPU present: caption + NER + triples + embed"| GPU
IWF -->|segment + entities + triples + embedding| PG
IWF -->|"stale SSM ID: tag scan"| EC2
EC2 -->|adopt running instance + update SSM| SSM
IWF -->|"exception re-raised → OnFailure"| DLQ
DLQ -->|SQS trigger BatchSize=1| DLQC
DLQC -->|GPU down: extend visibility 900s| DLQ
DLQC -->|GPU up: delegate| IWF
RT -->|reset pending + async invoke| IWF
Flows
Flow: ingestWindow (cloud production path)
- Core files: main/server/worldmm/pipeline/ingest_window.py → lambda_handler
- Test files: main/server/tests/unit/test_ingest_window_audio_only.py, test_ingest_window_no_gpu.py, main/server/tests/integration/test_ingest_segment_first.py
Types
IngestWindowEvent {
sessionId: string (required)
userId: string (required)
windowIndex: int (required)
frameCount: int (required — 0 for audio-only)
}
IngestWindowSuccess (one of):
{ status: "ok", segmentId: string, enriched: true,
entityCount: int, tripleCount: int, visualStored: bool }
{ status: "ok", segmentId: string, enriched: false } // audio-only
{ status: "already_processed", segment_id: string } // idempotency
{ status: "skipped", reason: string } // no frames found
GpuUnavailableError // raised → routes to IngestDLQ via Lambda OnFailure
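A minimal sketch of how a handler might parse the raw event dict into a typed object before step 1 of the pseudocode below. The IngestWindowEvent dataclass, its snake_case field names, and the non-negativity check are illustrative assumptions, not code from ingest_window.py; only the four JSON keys come from the type above.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class IngestWindowEvent:
    """Hypothetical typed view of the IngestWindowEvent payload."""

    session_id: str
    user_id: str
    window_index: int
    frame_count: int  # 0 for audio-only, per the type definition

    @classmethod
    def from_payload(cls, payload: Mapping[str, Any]) -> "IngestWindowEvent":
        # All four keys are required; a missing key raises KeyError.
        event = cls(
            session_id=str(payload["sessionId"]),
            user_id=str(payload["userId"]),
            window_index=int(payload["windowIndex"]),
            frame_count=int(payload["frameCount"]),
        )
        # Assumption: negative values are never valid for either counter.
        if event.window_index < 0 or event.frame_count < 0:
            raise ValueError("windowIndex and frameCount must be non-negative")
        return event

    @property
    def audio_only(self) -> bool:
        # As declared by the caller; step 3 may still discover frames in S3.
        return self.frame_count == 0
```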
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
ingestWindow.happyPath | IngestWindowEvent | IngestWindowSuccess{status=ok,enriched=true} | happy path | GPU present; full caption+NER+triples+embedding |
ingestWindow.audioOnly | IngestWindowEvent{frameCount=0} | IngestWindowSuccess{status=ok,enriched=false} | happy path | No frames; Whisper transcription only |
ingestWindow.alreadyProcessed | IngestWindowEvent | IngestWindowSuccess{status=already_processed} | happy path | Idempotency guard — segment already complete |
ingestWindow.noFrames | IngestWindowEvent{frameCount>0} | IngestWindowSuccess{status=skipped} | error | frameCount > 0 but S3 has no frames |
ingestWindow.gpuUnavailable | IngestWindowEvent | GpuUnavailableError raised | error | SSM sentinel = "none" or absent; OnFailure → IngestDLQ |
ingestWindow.gpuException | IngestWindowEvent | exception re-raised | error | GPU reachable but call fails; OnFailure → IngestDLQ |
Pseudocode
lambda_handler(event):
1. _init_db() — connect PostgreSQL via SSM secrets.
2. Idempotency: segment_exists(userId, sessionId, windowIndex)
if processing_status == "complete" → return already_processed.
3. _load_frames_from_s3(sessionId, windowIndex, frameCount)
If frameCount == 0: list S3 prefix to discover frames (retrigger fallback).
If frameCount > 0 and no frames → return skipped.
4. _load_audio_from_s3() — per-window chunk (.wav) or legacy full-session (.m4a).
_transcribe_audio() via Groq Whisper if audio present.
5. Write WorldMMSegment: processing_status="pending".
If existing pending/failed row: delete its triples + embeddings, reuse its timestamps.
6. Audio-only (frameCount == 0): update_segment(status="complete"); return enriched=false.
7. _read_gpu_instance_id() — live SSM GetParameter (NOT env var; env is stale on warm Lambda).
normalize_gpu_instance_id() maps "none"/empty → None.
If None → raise GpuUnavailableError.
8. _resolve_gpu_url(instance_id):
ec2.describe_instances([instance_id]).
If stopped/stopping/terminated → _find_running_gpu_instance_by_tag()
(EC2 filter: tag:Name=encache-gpu-worker, state=running)
If found: _update_ssm_gpu_instance_id(); use private IP.
If not found: return 0.0.0.0 (triggers GpuUnavailableError downstream).
Private IP preferred — both Lambdas in VPC; public IP routes via NAT, breaks SG rules.
9. GPU enrichment (VLM2VecClient):
caption(uniform_sample(frames, 4), transcript=transcript)
generate(build_ner_prompt(caption)) → entities
generate(build_triple_prompt(caption, entities)) → triples
For each entity: _resolve_or_create_entity() (exact-match DB lookup in this path)
For each triple: create_triple(memory_type="episodic")
10. encode_video(frames) → store_visual_embedding() (best-effort; failure logged, not fatal).
11. update_segment(status="complete", caption=caption); return success payload.
On exception in steps 8-11 (other than the best-effort embedding failure in step 10): re-raise → Lambda OnFailure destination → IngestDLQ.
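Steps 7 and 8 reduce to a small decision function once the AWS calls are injected, which makes the stale-ID fallback easy to unit test. A minimal sketch, assuming the hypothetical callables describe_instance, find_running_by_tag and update_ssm stand in for the EC2 and SSM calls; the dict keys (InstanceId, State.Name, PrivateIpAddress) follow the shape of an EC2 DescribeInstances instance record. This is not the actual _resolve_gpu_url().

```python
from typing import Callable, Optional

# EC2 states that step 8 treats as "this instance ID is stale".
STALE_STATES = {"stopped", "stopping", "terminated"}
UNREACHABLE_HOST = "0.0.0.0"


class GpuUnavailableError(RuntimeError):
    """No usable GPU worker; in the real flow this routes to IngestDLQ."""


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    """Map the SSM "none" sentinel, empty strings and missing values to None."""
    if raw is None or raw.strip() in ("", "none"):
        return None
    return raw.strip()


def resolve_gpu_host(
    raw_ssm_value: Optional[str],
    describe_instance: Callable[[str], dict],
    find_running_by_tag: Callable[[], Optional[dict]],
    update_ssm: Callable[[str], None],
) -> str:
    """Return the private IP of a usable GPU worker (hypothetical helper)."""
    instance_id = normalize_gpu_instance_id(raw_ssm_value)
    if instance_id is None:
        raise GpuUnavailableError("SSM holds the 'none' sentinel or no value")

    instance = describe_instance(instance_id)
    if instance["State"]["Name"] in STALE_STATES:
        # Stale ID: look for a running instance tagged Name=encache-gpu-worker.
        replacement = find_running_by_tag()
        if replacement is None:
            # No replacement: return an unusable host so the GPU call fails downstream.
            return UNREACHABLE_HOST
        # Adopt the replacement and heal the SSM parameter for later invocations.
        update_ssm(replacement["InstanceId"])
        instance = replacement

    # Private IP: both Lambdas run inside the VPC; the public IP would route via NAT.
    return instance["PrivateIpAddress"]
```

In a deployed version, describe_instance would presumably wrap boto3's `ec2.describe_instances(InstanceIds=[...])`, find_running_by_tag the same call with `Filters` on `tag:Name` and `instance-state-name`, and update_ssm `ssm.put_parameter(..., Overwrite=True)`; the tag-scan client would also carry the 5s connect / 10s read timeouts listed under Recent Changes.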
Flow: ingestDlqConsumer
- Core files: main/server/worldmm/pipeline/ingest_window.py → sqs_handler
The DLQ retry policy is owned by the ingest-window system. See ingest-window.md — sections Flow: ingestDlqConsumer and Data Model — for the canonical paths, pseudocode, and processing_status transitions. Summary for this flow diagram: GPU-down extends visibility by 900s and re-queues; GPU-up delegates to lambda_handler; non-GPU exceptions return the message to the queue. There is no receive_count cap (removed in PR #409); poison-pill payloads age out via SQS's 14-day retention.
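The consumer behaviour summarised above (GPU down: extend visibility and re-queue; GPU up: delegate; other errors: return to the queue) maps onto Lambda's partial batch response for SQS triggers. A minimal sketch, assuming injected callables for the GPU probe, the visibility change and the delegate; sqs_handler_sketch is a hypothetical name, and ingest-window.md remains the canonical description. The `batchItemFailures` / `itemIdentifier` shape is the standard response for an SQS trigger with ReportBatchItemFailures enabled.

```python
import json
from typing import Any, Callable, Dict, List, Mapping

VISIBILITY_EXTENSION_SECONDS = 900  # GPU-down re-queue delay from the summary above


def sqs_handler_sketch(
    event: Mapping[str, Any],
    gpu_available: Callable[[], bool],
    extend_visibility: Callable[[str, int], None],
    ingest_window: Callable[[dict], dict],
) -> Dict[str, List[Dict[str, str]]]:
    failures: List[Dict[str, str]] = []
    for record in event["Records"]:  # BatchSize: 1, so normally a single record
        message_id = record["messageId"]
        try:
            if not gpu_available():
                # GPU down: hide the message for 900s, then report it as failed
                # so SQS keeps it instead of deleting it.
                extend_visibility(record["receiptHandle"], VISIBILITY_EXTENSION_SECONDS)
                failures.append({"itemIdentifier": message_id})
                continue
            body = json.loads(record["body"])
            # Assumption: the body is a Lambda OnFailure invocation record, which
            # carries the original IngestWindowEvent under "requestPayload".
            ingest_window(body.get("requestPayload", body))
        except Exception:
            # Non-GPU failure: leave the message on the queue for a later delivery.
            failures.append({"itemIdentifier": message_id})
    return {"batchItemFailures": failures}
```

Here extend_visibility would wrap `sqs.change_message_visibility(QueueUrl=..., ReceiptHandle=..., VisibilityTimeout=900)`, with the queue URL presumably taken from the INGEST_DLQ_URL environment variable. No receive_count check appears, matching the cap's removal in PR #409.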
Flow: retriggerIngest
- Core files: main/server/worldmm/pipeline/retrigger_ingest.py → lambda_handler
- Test files: main/server/tests/unit/test_retrigger_ingest.py
Types
RetriggerEvent {
sessionId: string (required)
userId: string (required)
windowIndices: int[] (optional — if omitted, resolved from DB or DynamoDB)
force: bool (optional — if true, resets complete segments to pending for re-enrichment)
}
RetriggerResponse {
triggered: int[]
skipped: int[]
}
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
retrigger.pending | RetriggerEvent{force=false} | triggered=pending windows | happy path | Queries PostgreSQL for non-complete segments |
retrigger.force | RetriggerEvent{force=true} | triggered=all windows after reset | happy path | Resets complete→pending; use when GPU was down at ingest time |
retrigger.explicit | RetriggerEvent{windowIndices=[...]} | triggered=specified windows | happy path | Bypasses DB lookup |
retrigger.badInput | missing sessionId or userId | status=400 | error | |
Pseudocode
lambda_handler(event):
Resolve window_indices:
if windowIndices provided → use directly
elif force=true → _resolve_all_windows() (all windows regardless of status)
else → _resolve_pending_windows() (processing_status != "complete")
fallback: _enumerate_session_windows() from DynamoDB on DB failure
if force and windows_to_retry:
_reset_segments_to_pending() — sets processing_status="pending" so idempotency
check in IngestWindowFunction does not short-circuit on "complete" rows.
For each window:
frame_count = _resolve_frame_count(session_item, window_index)
async Lambda invoke → IngestWindowFunction
return {triggered: [...], skipped: [...]}
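Window resolution in the retrigger pseudocode is a three-way precedence (explicit list, then force, then pending-only) with a DynamoDB fallback. A minimal sketch with the lookups injected as callables; resolve_window_indices is hypothetical, and applying the DynamoDB fallback to both database queries (not only the pending query) is an assumption.

```python
from typing import Callable, Iterable, List, Optional


def resolve_window_indices(
    window_indices: Optional[Iterable[int]],
    force: bool,
    all_windows_from_db: Callable[[], List[int]],
    pending_windows_from_db: Callable[[], List[int]],
    windows_from_dynamodb: Callable[[], List[int]],
) -> List[int]:
    # Explicit windowIndices bypass every lookup.
    if window_indices is not None:
        return list(window_indices)
    try:
        # force=true: every window regardless of processing_status.
        if force:
            return all_windows_from_db()
        # Default: only windows whose processing_status != "complete".
        return pending_windows_from_db()
    except Exception:
        # PostgreSQL unavailable: enumerate windows from the DynamoDB session item.
        return windows_from_dynamodb()
```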
Flow: ingestSession (local/script path)
- Core files: main/server/worldmm/pipeline/ingest_session.py → ingest_session()
This path processes a full session directory offline. It is not triggered by Lambdas in production; it is used by scripts and local tooling. The enrichment stages are identical to ingestWindow but batched serially and followed by multiscale merging and semantic triple extraction.
Stages:
1. Load metadata.json (session_id, window_count, video_start_ms, audio_start_ms). user_id is passed as a direct parameter to ingest_session(), not read from this file.
2. Transcribe the full audio.m4a via Whisper; align segments to 30-second windows.
3. For each window: caption (vision LLM) → segment write → NER → entity resolution → episodic triples → visual embedding (optional GPU).
4. Multiscale merging: 6 windows → 3min summary; 3 summaries → 10min; 6 ten-min → 1h. Each merged summary is stored as a synthetic WorldMMSegment with source_session_id.
5. Semantic triple extraction from 10min+ summaries → WorldMMTriple{memory_type="semantic"}.
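Stages 2 and 4 involve two bits of arithmetic: placing transcript segments on the 30-second window grid, and grouping windows into the 6 / 3 / 6 merge hierarchy. A minimal sketch under stated assumptions: segments are (start_seconds, text) pairs relative to audio start, window 0 begins at video_start_ms, a segment belongs to the window containing its start time, and partial trailing groups are kept. None of these function names come from ingest_session.py. Three 3-minute summaries span 9 minutes of windows; the sketch keeps the document's "10min" label.

```python
from typing import Dict, List, Sequence, Tuple

WINDOW_MS = 30_000  # 30-second windows
# Fan-in per merge level from stage 4 (labels as the document names them).
MERGE_LEVELS: List[Tuple[str, int]] = [("3min", 6), ("10min", 3), ("1h", 6)]


def align_to_windows(
    segments: Sequence[Tuple[float, str]],
    audio_start_ms: int,
    video_start_ms: int,
) -> Dict[int, List[str]]:
    """Bucket (start_seconds, text) transcript segments into 30-second windows."""
    windows: Dict[int, List[str]] = {}
    for start_s, text in segments:
        # Shift from the audio clock onto the video clock, which defines window 0.
        offset_ms = audio_start_ms + start_s * 1000 - video_start_ms
        if offset_ms < 0:
            continue  # speech before the first video frame has no window
        windows.setdefault(int(offset_ms // WINDOW_MS), []).append(text)
    return windows


def chunk(items: Sequence, size: int) -> List[list]:
    """Split items into consecutive groups of `size`; the last group may be short."""
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def build_levels(window_ids: Sequence[int]) -> Dict[str, List[list]]:
    """Group window ids level by level; each level chunks the previous level's groups."""
    levels: Dict[str, List[list]] = {}
    current: Sequence = list(window_ids)
    for label, fan_in in MERGE_LEVELS:
        current = chunk(current, fan_in)
        levels[label] = current
    return levels
```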
Logs
| Source | Location |
|---|---|
IngestWindowFunction | CloudWatch: /aws/lambda/IngestWindowFunction |
IngestDLQConsumer | CloudWatch: /aws/lambda/IngestDLQConsumer |
RetriggerIngestFunction | CloudWatch: /aws/lambda/RetriggerIngestFunction |
ingest_session (local) | stdout via Python logging module |
Structured log steps from create_logger({"flow": "ingest_window"}):
| Step key | Where | Key fields |
|---|---|---|
invoked | lambda_handler entry | session_id, window_index, frame_count |
already_processed | idempotency guard | segment_id |
frames_loaded | after S3 fetch | frame_count |
no_frames | skip path | session_id, window_index |
audio_loaded | after S3 fetch | has_audio |
transcription_complete | after Whisper | transcript_length |
segment_created | after DB insert | segment_id |
segment_retrying | DLQ re-delivery of pending row | segment_id |
gpu_unavailable_raised | before GpuUnavailableError | segment_id, gpu_sentinel |
caption_generated | after VLM call | caption_preview (first 80 chars) |
ner_complete | after NER | entity_count |
triples_extracted | after triple parse | triple_count |
embedding_stored | after visual embed | segment_id |
embedding_failed | best-effort embed error | error |
completed | final success | segment_id, entity_count, triple_count, visual_stored |
dlq_received | sqs_handler per record | message_id, receive_count, segment_id |
dlq_visibility_extended | GPU-down branch | message_id, extended_seconds, reason |
gpu_instance_state_issue | stale SSM ID detected | instance_id, state |
gpu_instance_adopted_by_tag | tag-scan found running replacement | old_instance_id, new_instance_id |
gpu_instance_recovery_failed | stale ID, no replacement found | instance_id, state |
ssm_gpu_instance_id_updated | SSM write after adoption | instance_id |
session_created_at_fallback | DynamoDB lookup failed; using datetime.now() | session_id |
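The step keys above suggest a logger that binds flow-level fields once and emits one structured record per step. A minimal sketch of such a logger, assuming JSON output; create_logger_sketch and its sink parameter are hypothetical, and the real create_logger may format records differently.

```python
import json
import logging
from typing import Any, Callable, Dict, Optional


def create_logger_sketch(
    base_fields: Dict[str, Any],
    sink: Optional[Callable[[str], None]] = None,
) -> Callable[..., str]:
    """Return a log(step, **fields) function that emits one JSON object per call."""
    logger = logging.getLogger(str(base_fields.get("flow", "enrichment")))

    def log(step: str, **fields: Any) -> str:
        # Base fields first, then the step key, then per-call fields.
        record = {**base_fields, "step": step, **fields}
        line = json.dumps(record, default=str, sort_keys=True)
        if sink is not None:
            sink(line)  # e.g. capture lines in tests
        else:
            logger.info(line)
        return line

    return log
```

For example, caption_generated would be emitted as `log("caption_generated", caption_preview=caption[:80])`, matching the 80-character preview in the table.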
Data Model
WorldMMSegment.processing_status transitions:
| From | To | Trigger |
|---|---|---|
| (new row) | pending | lambda_handler step 5 |
pending | complete | Successful enrichment or audio-only path |
complete | pending | _reset_segments_to_pending() called by RetriggerIngestFunction with force=true |
There is no reaper for orphaned pending rows; they are cleaned up manually if DLQ retention (14 days) expires with no resolution.
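The transition table can double as a test oracle, and the missing reaper suggests a simple check for manual cleanup. A minimal sketch; both helpers are hypothetical. DOCUMENTED_TRANSITIONS encodes only the three rows in the table, so it does not cover the failed status mentioned in step 5 or the reuse of a pending row on DLQ redelivery, and orphaned_pending uses the segment's creation time as a proxy for DLQ message age, which is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import FrozenSet, Iterable, List, Optional, Tuple

DLQ_RETENTION = timedelta(days=14)  # IngestDLQ retention period

# Only the rows of the transition table; None stands for "no row yet".
DOCUMENTED_TRANSITIONS: FrozenSet[Tuple[Optional[str], str]] = frozenset(
    {
        (None, "pending"),        # lambda_handler step 5 inserts the row
        ("pending", "complete"),  # enrichment succeeded, or audio-only path
        ("complete", "pending"),  # _reset_segments_to_pending() with force=true
    }
)


def is_documented_transition(current: Optional[str], new: str) -> bool:
    return (current, new) in DOCUMENTED_TRANSITIONS


def orphaned_pending(
    rows: Iterable[Tuple[str, str, datetime]],
    now: Optional[datetime] = None,
) -> List[str]:
    """Segment ids still pending after the DLQ retention window (creation time as proxy)."""
    now = now or datetime.now(timezone.utc)  # expects timezone-aware datetimes
    cutoff = now - DLQ_RETENTION
    return [seg_id for seg_id, status, created_at in rows
            if status == "pending" and created_at < cutoff]
```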
Recent Changes
The enrichment flow has been actively updated in 2026:
| Commit | Date | Change |
|---|---|---|
2aebbb3f | 2026-05-09 | _resolve_gpu_url() and _gpu_available() now scan EC2 by tag Name=encache-gpu-worker when SSM holds a stale/stopped/terminated instance ID. Found instance is adopted and SSM is updated. EC2 client in tag-scan path has 5s connect / 10s read timeout. |
f1c2f099 | 2026-05-07 | Added DynamoDBPolicy to IngestWindowFunction (missing permission caused all segment timestamps to fall back to datetime.now()). Added force flag to RetriggerIngestFunction + _reset_segments_to_pending(). Added S3 listing fallback in _load_frames_from_s3 when frameCount=0. |
b2fbb303 | earlier | Added receive_count > 5 retry cap to sqs_handler; added _read_gpu_instance_id() and _gpu_available() helpers; added INGEST_DLQ_URL env var to IngestDLQConsumer. |
eb782c57 | 2026-04-05 | Added IngestDLQ (SQS, 14-day retention) and IngestDLQConsumer Lambda; wired OnFailure: SQS destination on IngestWindowFunction. |
189666b3 (PR #409) | earlier | Primary fix: auto-bootstrap GPU when SSM holds 'none' sentinel; extract _persist_instance_id_to_ssm and call it from both adoption and launch paths so the sentinel heals on first successful run. Also bundled in the same squash-merge: scoped mute suppression in discord_alerts to DLQ alarm only; dropped the receive_count > 5 cap from sqs_handler (receive_count inflates during GPU outages, causing the first GPU-up retry to trip the cap; poison pills now age out via 14-day SQS retention); added Discord alerting self-monitoring alarms in Terraform; pinned black to 24.1.1 in CI. |
Deployment
- Mechanism: SAM (main/server/template.yaml)
- Lambdas: IngestWindowFunction, IngestDLQConsumer, RetriggerIngestFunction. IngestWindowFunction has OnFailure: Type: SQS, Destination: IngestDLQ.Arn; MaximumRetryAttempts: 0 (no Lambda-managed retry; the DLQ consumer owns retry policy). IngestDLQConsumer is SQS-triggered with BatchSize: 1, FunctionResponseTypes: [ReportBatchItemFailures].
- Deploy command:
- Notes: Deploy IngestDLQConsumer before changing IngestWindowFunction raise behavior. If the consumer is not present, a GPU-down GpuUnavailableError loops fast against the old visibility timeout.