
Memory Enrichment Flow

Metadata

  • System type: flow

System Intent

  • What this is: The end-to-end pipeline that transforms raw audio and video frames from a recording session into a structured knowledge graph (segments, entities, RDF triples, visual embeddings) stored in PostgreSQL. It has two entry points: a cloud Lambda path (IngestWindowFunction) that processes individual 30-second windows in real time as uploads arrive, and a local/script path (ingest_session) used for batch offline processing. Both paths produce the same database artefacts.

Mermaid Diagram

flowchart TD
  Trigger["FramePost / AudioPost / EndSession Lambda\n(async invoke)"]
  IWF["IngestWindowFunction\n(ingest_window.lambda_handler)"]
  SSM["/encache/gpu/instance_id (SSM)"]
  S3["S3: sessions/{sessionId}/window_{n}/"]
  Whisper["Groq Whisper\n(whisper-large-v3)"]
  GPU["VLM2Vec GPU worker\n(caption + NER/triples + encode_video)"]
  PG["PostgreSQL\nWorldMMSegment / WorldMMEntity / WorldMMTriple"]
  DLQ["IngestDLQ (SQS)\n14-day retention"]
  DLQC["IngestDLQConsumer\n(ingest_window.sqs_handler)"]
  EC2["EC2 describe_instances\n(GPU tag fallback)"]
  RT["RetriggerIngestFunction\n(retrigger_ingest.lambda_handler)"]

  Trigger -->|async Lambda invoke| IWF
  IWF -->|GetItem sessions/{sessionId}| DDB["DynamoDB encache-sessions\n(session.createdAt)"]
  IWF -->|GetObject frame_*.jpg + audio.wav| S3
  IWF -->|transcribe| Whisper
  IWF -->|GetParameter| SSM
  SSM -->|"none or absent → GpuUnavailableError"| DLQ
  IWF -->|"GPU present: caption + NER + triples + embed"| GPU
  IWF -->|segment + entities + triples + embedding| PG
  IWF -->|stale SSM ID: tag scan| EC2
  EC2 -->|adopt running instance + update SSM| SSM
  IWF -->|"exception re-raised → OnFailure"| DLQ
  DLQ -->|SQS trigger BatchSize=1| DLQC
  DLQC -->|GPU down: extend visibility 900s| DLQ
  DLQC -->|GPU up: delegate| IWF
  RT -->|reset pending + async invoke| IWF

Flows

Flow: ingestWindow (cloud production path)

  • Core files: main/server/worldmm/pipeline/ingest_window.py (lambda_handler)
  • Test files: main/server/tests/unit/test_ingest_window_audio_only.py, test_ingest_window_no_gpu.py, main/server/tests/integration/test_ingest_segment_first.py

Types

IngestWindowEvent {
  sessionId: string (required)
  userId: string (required)
  windowIndex: int (required)
  frameCount: int (required — 0 for audio-only)
}

IngestWindowSuccess (one of):
  { status: "ok", segmentId: string, enriched: true,
    entityCount: int, tripleCount: int, visualStored: bool }
  { status: "ok", segmentId: string, enriched: false }      // audio-only
  { status: "already_processed", segment_id: string }       // idempotency
  { status: "skipped", reason: string }                     // no frames found

GpuUnavailableError   // raised → routes to IngestDLQ via Lambda OnFailure
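The event contract above can be written as a `TypedDict` plus a validator, which is useful when building test fixtures. This is a minimal sketch. `parse_ingest_window_event` is a hypothetical helper and is not taken from `ingest_window.py`. The non-negativity check is an assumption that goes beyond the documented types.

```python
from typing import TypedDict


class IngestWindowEvent(TypedDict):
    sessionId: str
    userId: str
    windowIndex: int
    frameCount: int  # 0 means audio-only


class GpuUnavailableError(RuntimeError):
    """Raised so the Lambda OnFailure destination routes the event to IngestDLQ."""


_REQUIRED_FIELDS = {"sessionId": str, "userId": str, "windowIndex": int, "frameCount": int}


def parse_ingest_window_event(event: dict) -> IngestWindowEvent:
    """Hypothetical validator: check presence and type of every required field."""
    for key, expected_type in _REQUIRED_FIELDS.items():
        if key not in event:
            raise ValueError(f"missing required field: {key}")
        value = event[key]
        # bool is a subclass of int, so reject True/False for the int fields.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            raise ValueError(f"{key} must be of type {expected_type.__name__}")
    # Assumption: negative indices/counts are never valid.
    if event["windowIndex"] < 0 or event["frameCount"] < 0:
        raise ValueError("windowIndex and frameCount must be non-negative")
    return IngestWindowEvent(
        sessionId=event["sessionId"],
        userId=event["userId"],
        windowIndex=event["windowIndex"],
        frameCount=event["frameCount"],
    )
```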

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| ingestWindow.happyPath | IngestWindowEvent | IngestWindowSuccess{status=ok, enriched=true} | happy path | GPU present; full caption + NER + triples + embedding |
| ingestWindow.audioOnly | IngestWindowEvent{frameCount=0} | IngestWindowSuccess{status=ok, enriched=false} | happy path | No frames; Whisper transcription only |
| ingestWindow.alreadyProcessed | IngestWindowEvent | IngestWindowSuccess{status=already_processed} | happy path | Idempotency guard: segment already complete |
| ingestWindow.noFrames | IngestWindowEvent{frameCount>0} | IngestWindowSuccess{status=skipped} | error | frameCount > 0 but S3 has no frames |
| ingestWindow.gpuUnavailable | IngestWindowEvent | GpuUnavailableError raised | error | SSM sentinel is "none" or absent; OnFailure → IngestDLQ |
| ingestWindow.gpuException | IngestWindowEvent | exception re-raised | error | GPU reachable but call fails; OnFailure → IngestDLQ |
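Two of the early returns in this table, alreadyProcessed and noFrames, come down to a small decision based on the existing segment row and the S3 frame count. The sketch below is hypothetical: `early_exit` is not a function in the codebase, and the `reason` string is a placeholder because the document does not give the real value. Pending or failed rows fall through so that step 5 of the pseudocode can reuse them.

```python
from typing import Optional


def early_exit(
    existing_status: Optional[str],
    existing_segment_id: Optional[str],
    frame_count: int,
    frames_in_s3: int,
) -> Optional[dict]:
    """Return a short-circuit payload for the window, or None to keep processing.

    existing_status is the processing_status of the existing WorldMMSegment
    row for (userId, sessionId, windowIndex), or None if there is no row.
    """
    # alreadyProcessed: idempotency guard on a completed segment.
    if existing_status == "complete":
        return {"status": "already_processed", "segment_id": existing_segment_id}
    # noFrames: the uploader reported frames but none were found in S3.
    if frame_count > 0 and frames_in_s3 == 0:
        return {"status": "skipped", "reason": "no frames found"}  # placeholder reason
    # Anything else (no row, or a pending/failed row) continues to step 5.
    return None
```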

Pseudocode

lambda_handler(event):
  1. _init_db() — connect PostgreSQL via SSM secrets.
  2. Idempotency: segment_exists(userId, sessionId, windowIndex)
       if processing_status == "complete" → return already_processed.
  3. _load_frames_from_s3(sessionId, windowIndex, frameCount)
       If frameCount == 0: list S3 prefix to discover frames (retrigger fallback).
       If frameCount > 0 and no frames → return skipped.
  4. _load_audio_from_s3() — per-window chunk (.wav) or legacy full-session (.m4a).
     _transcribe_audio() via Groq Whisper if audio present.
  5. Write WorldMMSegment: processing_status="pending".
     If existing pending/failed row: delete its triples + embeddings, reuse its timestamps.
  6. Audio-only (frameCount == 0): update_segment(status="complete"); return enriched=false.
  7. _read_gpu_instance_id() — live SSM GetParameter (NOT env var; env is stale on warm Lambda).
     normalize_gpu_instance_id() maps "none"/empty → None.
     If None → raise GpuUnavailableError.
  8. _resolve_gpu_url(instance_id):
       ec2.describe_instances([instance_id]).
       If stopped/stopping/terminated → _find_running_gpu_instance_by_tag()
         (EC2 filter: tag:Name=encache-gpu-worker, state=running)
         If found: _update_ssm_gpu_instance_id(); use private IP.
         If not found: return 0.0.0.0 (triggers GpuUnavailableError downstream).
        Private IP is preferred: both Lambdas run in the VPC, and a public IP would route via NAT and break the security-group rules.
  9. GPU enrichment (VLM2VecClient):
       caption(uniform_sample(frames, 4), transcript=transcript)
       generate(build_ner_prompt(caption)) → entities
       generate(build_triple_prompt(caption, entities)) → triples
       For each entity: _resolve_or_create_entity() (exact-match DB lookup in this path)
       For each triple: create_triple(memory_type="episodic")
  10. encode_video(frames) → store_visual_embedding() (best-effort; failure logged, not fatal).
  11. update_segment(status="complete", caption=caption); return success payload.
  On exception in steps 8-11 (except the best-effort embedding in step 10): re-raise → Lambda OnFailure destination → IngestDLQ.
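Steps 7 and 8 can be sketched as pure helpers that take a duck-typed EC2 client. The boto3 `describe_instances` call shapes (`InstanceIds`, `Filters`, and the `Reservations[].Instances[]` response) are real. The function names, the `update_ssm` callback, and treating an instance that no longer appears as terminated are all assumptions. The sketch returns an IP rather than a URL because the document does not specify the worker's scheme or port.

```python
from typing import Any, Callable, Optional

GPU_TAG_NAME = "encache-gpu-worker"
UNREACHABLE_IP = "0.0.0.0"  # sentinel the caller turns into GpuUnavailableError
DEAD_STATES = {"stopped", "stopping", "terminated"}


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    """Map the SSM sentinel "none", an empty value, or a missing value to None."""
    if raw is None or raw.strip() in ("", "none"):
        return None
    return raw.strip()


def find_running_gpu_instance_by_tag(ec2: Any) -> Optional[dict]:
    """Return the first running instance tagged Name=encache-gpu-worker, if any.

    In production `ec2` would be a boto3 EC2 client, for example one built with
    botocore.config.Config(connect_timeout=5, read_timeout=10).
    """
    resp = ec2.describe_instances(
        Filters=[
            {"Name": "tag:Name", "Values": [GPU_TAG_NAME]},
            {"Name": "instance-state-name", "Values": ["running"]},
        ]
    )
    for reservation in resp.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            return instance
    return None


def resolve_gpu_ip(ec2: Any, instance_id: str, update_ssm: Callable[[str], None]) -> str:
    """Return the GPU worker's private IP, adopting a tagged replacement if the ID is stale."""
    resp = ec2.describe_instances(InstanceIds=[instance_id])
    reservations = resp.get("Reservations", [])
    instances = reservations[0].get("Instances", []) if reservations else []
    # Assumption: an instance that no longer shows up is treated as terminated.
    state = instances[0]["State"]["Name"] if instances else "terminated"
    if state not in DEAD_STATES:
        # Private IP: the Lambda reaches the worker inside the VPC, not via NAT.
        return instances[0]["PrivateIpAddress"]
    replacement = find_running_gpu_instance_by_tag(ec2)
    if replacement is None:
        return UNREACHABLE_IP
    update_ssm(replacement["InstanceId"])  # heal the stale SSM parameter
    return replacement["PrivateIpAddress"]
```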

Flow: ingestDlqConsumer

  • Core files: main/server/worldmm/pipeline/ingest_window.py (sqs_handler)

The DLQ retry policy is owned by the ingest-window system. See ingest-window.md — sections Flow: ingestDlqConsumer and Data Model — for the canonical paths, pseudocode, and processing_status transitions. Summary for this flow diagram: GPU-down extends visibility by 900s and re-queues; GPU-up delegates to lambda_handler; non-GPU exceptions return the message to the queue. There is no receive_count cap (removed in PR #409); poison-pill payloads age out via SQS's 14-day retention.
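The consumer behaviour summarised above can be sketched as follows. The sketch assumes the SQS message body is the standard Lambda destination record, which carries the original event under `requestPayload`. `gpu_available` and `ingest` are injected stand-ins for `_gpu_available()` and `lambda_handler`, and `sqs_handler_sketch` is a hypothetical name. `change_message_visibility` and the `batchItemFailures` response shape are the real SQS and Lambda APIs used with `ReportBatchItemFailures`.

```python
import json
from typing import Any, Callable

VISIBILITY_EXTENSION_S = 900


def sqs_handler_sketch(
    event: dict,
    sqs: Any,
    queue_url: str,
    gpu_available: Callable[[], bool],
    ingest: Callable[[dict], dict],
) -> dict:
    """Process IngestDLQ records and return a ReportBatchItemFailures response."""
    failures = []
    for record in event.get("Records", []):
        message_id = record["messageId"]
        try:
            body = json.loads(record["body"])
            # Lambda OnFailure destination records carry the original event
            # under "requestPayload"; fall back to the raw body otherwise.
            payload = body.get("requestPayload", body)
            if not gpu_available():
                # GPU down: hide the message for 900s, then let SQS redeliver it.
                sqs.change_message_visibility(
                    QueueUrl=queue_url,
                    ReceiptHandle=record["receiptHandle"],
                    VisibilityTimeout=VISIBILITY_EXTENSION_S,
                )
                failures.append({"itemIdentifier": message_id})
                continue
            ingest(payload)  # GPU up: delegate to the normal handler
        except Exception:
            # Any other error: report the item as failed so it returns to the queue.
            failures.append({"itemIdentifier": message_id})
    return {"batchItemFailures": failures}
```

Because no receive-count cap is applied, a message that keeps failing stays in the queue until the 14-day retention expires, which matches the policy described above.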


Flow: retriggerIngest

  • Core files: main/server/worldmm/pipeline/retrigger_ingest.py (lambda_handler)
  • Test files: main/server/tests/unit/test_retrigger_ingest.py

Types

RetriggerEvent {
  sessionId: string (required)
  userId: string (required)
  windowIndices: int[] (optional — if omitted, resolved from DB or DynamoDB)
  force: bool (optional — if true, resets complete segments to pending for re-enrichment)
}

RetriggerResponse {
  triggered: int[]
  skipped: int[]
}

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| retrigger.pending | RetriggerEvent{force=false} | triggered = pending windows | happy path | Queries PostgreSQL for non-complete segments |
| retrigger.force | RetriggerEvent{force=true} | triggered = all windows after reset | happy path | Resets complete → pending; use when the GPU was down at ingest time |
| retrigger.explicit | RetriggerEvent{windowIndices=[...]} | triggered = specified windows | happy path | Bypasses DB lookup |
| retrigger.badInput | missing sessionId or userId | status=400 | error | |
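The badInput path can be sketched as a validator that runs before any window lookup. The document only specifies the sessionId/userId check. The type checks on `windowIndices` and `force`, the `{"statusCode": 400, "error": ...}` response shape, and the name `validate_retrigger_event` are assumptions.

```python
from typing import Optional


def validate_retrigger_event(event: dict) -> Optional[dict]:
    """Return an error response for bad input, or None if the event is usable."""
    # Documented: sessionId and userId are required.
    missing = [key for key in ("sessionId", "userId") if not event.get(key)]
    if missing:
        return {"statusCode": 400, "error": f"missing required field(s): {', '.join(missing)}"}
    # Assumption: optional fields are type-checked as well.
    indices = event.get("windowIndices")
    if indices is not None and not (
        isinstance(indices, list)
        and all(isinstance(i, int) and not isinstance(i, bool) and i >= 0 for i in indices)
    ):
        return {"statusCode": 400, "error": "windowIndices must be a list of non-negative ints"}
    if not isinstance(event.get("force", False), bool):
        return {"statusCode": 400, "error": "force must be a boolean"}
    return None
```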

Pseudocode

lambda_handler(event):
  Resolve window_indices:
    if windowIndices provided → use directly
    elif force=true → _resolve_all_windows() (all windows regardless of status)
    else → _resolve_pending_windows() (processing_status != "complete")
      fallback: _enumerate_session_windows() from DynamoDB on DB failure

  if force and windows_to_retry:
    _reset_segments_to_pending() — sets processing_status="pending" so idempotency
    check in IngestWindowFunction does not short-circuit on "complete" rows.

  For each window:
    frame_count = _resolve_frame_count(session_item, window_index)
    async Lambda invoke → IngestWindowFunction

  return {triggered: [...], skipped: [...]}
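The window resolution order and fan-out above can be sketched as follows. The lookups are injected callables that stand in for `_resolve_all_windows()`, `_resolve_pending_windows()`, and `_enumerate_session_windows()`. The `force` reset step is left out to keep the example short. `invoke(..., InvocationType="Event")` is the real boto3 Lambda call for an asynchronous invoke. The function names, the function-name argument, and counting a failed invoke as `skipped` are assumptions.

```python
import json
from typing import Any, Callable, Dict, List


def resolve_window_indices(
    event: dict,
    all_windows: Callable[[], List[int]],
    pending_windows: Callable[[], List[int]],
    session_windows_from_dynamodb: Callable[[], List[int]],
) -> List[int]:
    """Apply the documented precedence: explicit list, then force, then pending."""
    if event.get("windowIndices") is not None:
        return list(event["windowIndices"])  # explicit list bypasses the DB
    if event.get("force"):
        return all_windows()  # every window, regardless of status
    try:
        return pending_windows()  # processing_status != "complete"
    except Exception:
        # PostgreSQL unavailable: enumerate windows from the DynamoDB session item.
        return session_windows_from_dynamodb()


def invoke_windows(
    lambda_client: Any,
    function_name: str,
    session_id: str,
    user_id: str,
    windows: List[int],
    frame_count_for: Callable[[int], int],
) -> Dict[str, List[int]]:
    """Fire one asynchronous IngestWindowFunction invoke per window."""
    triggered: List[int] = []
    skipped: List[int] = []
    for index in windows:
        payload = {
            "sessionId": session_id,
            "userId": user_id,
            "windowIndex": index,
            "frameCount": frame_count_for(index),
        }
        try:
            # InvocationType="Event" queues the call and returns without waiting.
            lambda_client.invoke(
                FunctionName=function_name,
                InvocationType="Event",
                Payload=json.dumps(payload).encode("utf-8"),
            )
            triggered.append(index)
        except Exception:
            skipped.append(index)  # assumption: a failed invoke counts as skipped
    return {"triggered": triggered, "skipped": skipped}
```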

Flow: ingestSession (local/script path)

  • Core files: main/server/worldmm/pipeline/ingest_session.py (ingest_session())

This path processes a full session directory offline. It is not triggered by Lambdas in production. Scripts and local tooling call it directly. The per-window enrichment stages are the same as in ingestWindow, but they run serially over every window and are followed by multiscale merging and semantic triple extraction.

Stages:
  1. Load metadata.json (session_id, window_count, video_start_ms, audio_start_ms). user_id is passed directly to ingest_session() as a parameter and is not read from this file.
  2. Transcribe the full audio.m4a via Whisper and align the transcript segments to 30-second windows.
  3. For each window: caption (vision LLM) → segment write → NER → entity resolution → episodic triples → visual embedding (optional, GPU).
  4. Multiscale merging: 6 windows → 3min summary; 3 summaries → 10min; 6 ten-min summaries → 1h. Each merged summary is stored as a synthetic WorldMMSegment with source_session_id.
  5. Semantic triple extraction from 10min+ summaries → WorldMMTriple{memory_type="semantic"}.
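The fan-in arithmetic in stage 4 can be made concrete with a grouping plan. This sketch only computes which lower-level items would be summarised together. The document does not say whether a trailing partial group (for example, the last 4 windows of a 40-window session) is merged or dropped, so that behaviour is exposed as the assumed `keep_partial` flag. `plan_multiscale` and `chunk` are hypothetical helpers.

```python
from typing import Dict, List, Sequence

# (level label, fan-in) as described in stage 4:
# 6 x 30s windows -> 3min; 3 x 3min -> "10min" (9 min of source); 6 x 10min -> 1h.
LEVELS = (("3min", 6), ("10min", 3), ("1h", 6))


def chunk(items: Sequence[int], size: int, keep_partial: bool) -> List[List[int]]:
    """Split items into consecutive groups of `size`, optionally keeping a short tail."""
    groups = [list(items[i:i + size]) for i in range(0, len(items), size)]
    if groups and not keep_partial and len(groups[-1]) < size:
        groups.pop()  # assumption: an incomplete tail group is not summarised
    return groups


def plan_multiscale(window_count: int, keep_partial: bool = False) -> Dict[str, List[List[int]]]:
    """For each level, list the groups of lower-level indices merged into one summary."""
    plan: Dict[str, List[List[int]]] = {}
    current: List[int] = list(range(window_count))  # indices at the level below
    for label, fan_in in LEVELS:
        groups = chunk(current, fan_in, keep_partial)
        plan[label] = groups
        current = list(range(len(groups)))  # summaries become the next level's inputs
    return plan
```

For a 40-window (20-minute) session, the default plan produces six 3min summaries and two 10min summaries, with no 1h summary.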


Logs

| Source | Location |
|---|---|
| IngestWindowFunction | CloudWatch: /aws/lambda/IngestWindowFunction |
| IngestDLQConsumer | CloudWatch: /aws/lambda/IngestDLQConsumer |
| RetriggerIngestFunction | CloudWatch: /aws/lambda/RetriggerIngestFunction |
| ingest_session (local) | stdout via the Python logging module |

Structured log steps from create_logger({"flow": "ingest_window"}):

| Step key | Where | Key fields |
|---|---|---|
| invoked | lambda_handler entry | session_id, window_index, frame_count |
| already_processed | idempotency guard | segment_id |
| frames_loaded | after S3 fetch | frame_count |
| no_frames | skip path | session_id, window_index |
| audio_loaded | after S3 fetch | has_audio |
| transcription_complete | after Whisper | transcript_length |
| segment_created | after DB insert | segment_id |
| segment_retrying | DLQ re-delivery of a pending row | segment_id |
| gpu_unavailable_raised | before GpuUnavailableError | segment_id, gpu_sentinel |
| caption_generated | after VLM call | caption_preview (first 80 chars) |
| ner_complete | after NER | entity_count |
| triples_extracted | after triple parse | triple_count |
| embedding_stored | after visual embed | segment_id |
| embedding_failed | best-effort embed error | error |
| completed | final success | segment_id, entity_count, triple_count, visual_stored |
| dlq_received | sqs_handler, per record | message_id, receive_count, segment_id |
| dlq_visibility_extended | GPU-down branch | message_id, extended_seconds, reason |
| gpu_instance_state_issue | stale SSM ID detected | instance_id, state |
| gpu_instance_adopted_by_tag | tag scan found a running replacement | old_instance_id, new_instance_id |
| gpu_instance_recovery_failed | stale ID, no replacement found | instance_id, state |
| ssm_gpu_instance_id_updated | SSM write after adoption | instance_id |
| session_created_at_fallback | DynamoDB lookup failed; using datetime.now() | session_id |
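Each row in this table corresponds to one call on the step logger. The real signature of `create_logger` is not shown here, so the following is a hypothetical stand-in, `create_step_logger`. It merges the flow context with the per-step fields and emits one JSON line per step.

```python
import json
import logging
from typing import Any, Callable, Dict, Optional


def create_step_logger(
    context: Dict[str, Any], logger: Optional[logging.Logger] = None
) -> Callable[..., Dict[str, Any]]:
    """Hypothetical stand-in for create_logger: one JSON line per named step."""
    log = logger or logging.getLogger(str(context.get("flow", "app")))

    def step(name: str, **fields: Any) -> Dict[str, Any]:
        # Context keys (e.g. "flow"), then the step key, then per-step fields.
        record = {**context, "step": name, **fields}
        log.info(json.dumps(record, default=str, sort_keys=True))
        return record

    return step
```

A call site would then look like `log_step("caption_generated", caption_preview=caption[:80])`. The truncation happens at the call site, so the logger itself stays generic.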

Data Model

WorldMMSegment.processing_status transitions:

| From | To | Trigger |
|---|---|---|
| (new row) | pending | lambda_handler step 5 |
| pending | complete | Successful enrichment or audio-only path |
| complete | pending | _reset_segments_to_pending(), called by RetriggerIngestFunction with force=true |

There is no reaper for orphaned pending rows. If the DLQ message expires (14-day retention) without a successful retry, the row must be cleaned up manually.
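The transition table can be enforced as a whitelist check before any status write. This sketch encodes the rows of the table, plus the step-5 reuse of an existing pending or failed row described in the ingestWindow pseudocode. `None` stands for "no row yet". `is_allowed_transition` is a hypothetical helper and is not part of the codebase.

```python
from typing import Optional

# (from_status, to_status); None means "no row yet".
ALLOWED_TRANSITIONS = {
    (None, "pending"),         # lambda_handler step 5 inserts a new row
    ("pending", "pending"),    # step 5 reuses a pending row on DLQ re-delivery
    ("failed", "pending"),     # step 5 reuses a failed row
    ("pending", "complete"),   # enrichment or audio-only path succeeded
    ("complete", "pending"),   # _reset_segments_to_pending() with force=true
}


def is_allowed_transition(current: Optional[str], new: str) -> bool:
    """Return True if a processing_status write from `current` to `new` is documented."""
    return (current, new) in ALLOWED_TRANSITIONS
```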

Recent Changes

The enrichment flow has been under active development in 2026:

| Commit | Date | Change |
|---|---|---|
| 2aebbb3f | 2026-05-09 | _resolve_gpu_url() and _gpu_available() now scan EC2 by tag Name=encache-gpu-worker when SSM holds a stale/stopped/terminated instance ID. The found instance is adopted and SSM is updated. The EC2 client in the tag-scan path has a 5s connect / 10s read timeout. |
| f1c2f099 | 2026-05-07 | Added DynamoDBPolicy to IngestWindowFunction (the missing permission caused all segment timestamps to fall back to datetime.now()). Added the force flag to RetriggerIngestFunction + _reset_segments_to_pending(). Added the S3 listing fallback in _load_frames_from_s3 when frameCount=0. |
| b2fbb303 | earlier | Added the receive_count > 5 retry cap to sqs_handler; added the _read_gpu_instance_id() and _gpu_available() helpers; added the INGEST_DLQ_URL env var to IngestDLQConsumer. |
| eb782c57 | 2026-04-05 | Added IngestDLQ (SQS, 14-day retention) and the IngestDLQConsumer Lambda; wired OnFailure: SQS destination on IngestWindowFunction. |
| 189666b3 (PR #409) | earlier | Primary fix: auto-bootstrap the GPU when SSM holds the 'none' sentinel; extract _persist_instance_id_to_ssm and call it from both the adoption and launch paths so the sentinel heals on the first successful run. Also bundled in the same squash-merge: scoped mute suppression in discord_alerts to the DLQ alarm only; dropped the receive_count > 5 cap from sqs_handler (receive_count inflates during GPU outages, causing the first GPU-up retry to trip the cap; poison pills now age out via 14-day SQS retention); added Discord alerting self-monitoring alarms in Terraform; pinned black to 24.1.1 in CI. |

Deployment

  • Mechanism: SAM (main/server/template.yaml)
  • Lambdas: IngestWindowFunction, IngestDLQConsumer, RetriggerIngestFunction
  • IngestWindowFunction has OnFailure: Type: SQS, Destination: IngestDLQ.Arn; MaximumRetryAttempts: 0 (no Lambda-managed retry — DLQ consumer owns retry policy).
  • IngestDLQConsumer is SQS-triggered with BatchSize: 1, FunctionResponseTypes: [ReportBatchItemFailures].
  • Deploy command:
    cd main/server && sam build && sam deploy
    
  • Notes: Deploy IngestDLQConsumer before changing IngestWindowFunction's raise behavior. If the consumer is not present, a GpuUnavailableError raised while the GPU is down retries in a fast loop under the old visibility timeout.