
Ingest Segment-First with Processing Status

Plan Metadata

  • Plan type: plan
  • Parent plan: N/A
  • Depends on: docs/plans/memory-fetch-display.md
  • Status: documentation

System Intent

  • What is being built: A resilient ingest pipeline that writes a worldmm_segments row immediately after transcription (before any GPU work), tracks processing completeness via a processing_status column, and uses that status to drive idempotent DLQ retries so GPU enrichment (caption, NER, triples, embeddings) can be re-attempted on incomplete segments without data loss.
  • Primary consumer(s): ingest_window.py (Lambda), DLQ retry path (sqs_handler), feed API (reads all segments regardless of status).
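  • Boundary: IngestWindowFunction receives a window event → writes the segment immediately → attempts GPU enrichment → marks it complete on success or failed on error. On redelivery (Lambda async retry or DLQ replay), the handler looks up the existing segment by user, session, and window index and re-enriches it unless its status is already complete.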

Stage Gate Tracker

  • [x] Stage 1 Mermaid approved
  • [x] Stage 2 I/O contracts approved
  • [x] Stage 3 pseudocode/technical details approved

1. Mermaid Diagram

flowchart TD
    subgraph TRIGGER["Trigger Sources"]
        FRAME["FramePostFunction\napi/sessions/frames/app.py"]:::unchanged
        SESS["SessionEndFunction\napi/sessions/end/app.py"]:::unchanged
        DLQ["IngestDLQConsumer\nworldmm/pipeline/ingest_window.py — sqs_handler"]:::unchanged
    end

    subgraph INGEST["IngestWindowFunction"]
        IW["ingest_window.py\nworldmm/pipeline/ingest_window.py"]:::updated
    end

    subgraph DB["PostgreSQL"]
        SEG[("worldmm_segments\n+ processing_status\nshared/orm/worldmm_orm.py")]:::updated
        MIG["Alembic migration\nadd processing_status column"]:::created
    end

    subgraph GPU["GPU Worker EC2"]
        GPU_W["gpu_worker — caption + NER + embeddings"]:::unchanged
    end

    subgraph S3["S3"]
        FRAMES["Frame images + audio\nsessions/session_id/window_NNN/"]:::unchanged
    end

    FRAME -->|"async invoke — window event"| IW
    SESS -->|"async invoke — window event"| IW
    DLQ -->|"SQS record — window event"| IW

    IW -->|"read frames + audio"| S3
    S3 -->|"bytes"| IW
    IW -->|"transcribe audio"| IW
    IW -->|"create_segment processing_status=pending"| SEG
    IW -->|"caption + NER + triples + embeddings"| GPU_W
    GPU_W -->|"caption, entities, triples, embedding"| IW
    IW -->|"update processing_status=complete"| SEG
    IW -->|"update processing_status=failed on GPU error"| SEG

classDef unchanged fill:#d3d3d3,stroke:#666,stroke-width:1px
classDef updated fill:#ffe58a,stroke:#666,stroke-width:1px
classDef created fill:#a8e6a3,stroke:#666,stroke-width:1px
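The DLQ edge in the diagram (IngestDLQConsumer → IngestWindowFunction) can be sketched as a thin SQS wrapper. This is a minimal sketch. It assumes each SQS record body is the JSON-encoded WindowEvent, and that sqs_handler forwards each decoded event to the same handler used for async invokes. `make_sqs_handler` is a hypothetical factory, not existing code.

```python
import json
from typing import Any, Callable, Dict, List

# Hypothetical type for the window handler: (event, context) -> result dict.
WindowHandler = Callable[[Dict[str, Any], Any], Dict[str, Any]]


def make_sqs_handler(
    handle_window: WindowHandler,
) -> Callable[[Dict[str, Any], Any], List[Dict[str, Any]]]:
    """Build an SQS-triggered handler that replays each record through handle_window."""

    def sqs_handler(event: Dict[str, Any], context: Any) -> List[Dict[str, Any]]:
        results = []
        for record in event.get("Records", []):
            # Assumption: each record body is the JSON-encoded WindowEvent.
            window_event = json.loads(record["body"])
            # Replaying is safe because the ingest handler short-circuits complete
            # segments and re-attempts enrichment on pending/failed ones.
            results.append(handle_window(window_event, context))
        return results

    return sqs_handler
```

In the Lambda module this would be wired as something like `sqs_handler = make_sqs_handler(lambda_handler)`. The factory exists only so the sketch can be exercised with a fake handler.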

2. Black-Box Inputs and Outputs

Global Types

WindowEvent {
  sessionId:    string   (UUID — originating session)
  userId:       string   (owner)
  windowIndex:  int      (0-based window index within session)
  frameCount:   int      (number of frames uploaded for this window)
}

ProcessingStatus = "pending" | "complete" | "failed"

SegmentRow {
  id:                string            (UUID — worldmm_segments PK)
  user_id:           string
  source_session_id: string
  source_window_index: int
  start_time:        string ISO 8601
  end_time:          string ISO 8601
  duration_seconds:  int               (always 30)
  transcript:        string | null
  caption:           string | null     (null until GPU enrichment succeeds)
  s3_frames_key:     string | null
  processing_status: ProcessingStatus  (NEW; DB default "complete" backfills existing rows, new rows are created as "pending")
}
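The global types above can be mirrored with standard-library type hints. This is a minimal sketch that assumes the event arrives as a camelCase dict. `parse_window_event` and `PROCESSING_STATUSES` are hypothetical names. The Python-side default of "pending" reflects what create_segment writes, not the database default.

```python
from dataclasses import dataclass
from typing import Any, Dict, Literal, Optional, Tuple, get_args

ProcessingStatus = Literal["pending", "complete", "failed"]
PROCESSING_STATUSES: Tuple[str, ...] = get_args(ProcessingStatus)


@dataclass(frozen=True)
class WindowEvent:
    session_id: str
    user_id: str
    window_index: int  # 0-based window index within the session
    frame_count: int


def parse_window_event(raw: Dict[str, Any]) -> WindowEvent:
    """Convert the camelCase payload into a WindowEvent; a missing field raises KeyError."""
    return WindowEvent(
        session_id=str(raw["sessionId"]),
        user_id=str(raw["userId"]),
        window_index=int(raw["windowIndex"]),
        frame_count=int(raw["frameCount"]),
    )


@dataclass
class SegmentRow:
    id: str
    user_id: str
    source_session_id: str
    source_window_index: int
    start_time: str                  # ISO 8601
    end_time: str                    # ISO 8601
    duration_seconds: int            # always 30 per the contract
    transcript: Optional[str]
    caption: Optional[str]           # None until GPU enrichment succeeds
    s3_frames_key: Optional[str]
    processing_status: ProcessingStatus = "pending"  # new rows start pending

    def __post_init__(self) -> None:
        # Literal is not enforced at runtime, so validate explicitly.
        if self.processing_status not in PROCESSING_STATUSES:
            raise ValueError(f"invalid processing_status: {self.processing_status!r}")
```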

Flow: ingestWindow

  • Test files: main/server/tests/integration/test_ingest_segment_first.py
  • Core files: main/server/worldmm/pipeline/ingest_window.py, main/server/layers/shared/python/shared/orm/worldmm_orm.py

Paths

| path-name | input | output/expected state change | path-type | notes | updated |
|---|---|---|---|---|---|
| ingestWindow.segment-created-before-gpu | WindowEvent with frames | segment row written with processing_status=pending before any GPU call | happy path | segment must exist even if GPU crashes immediately after | Y |
| ingestWindow.gpu-success | WindowEvent, GPU available | segment updated: caption set, processing_status=complete | happy path | | Y |
| ingestWindow.gpu-failure | WindowEvent, GPU unavailable or error | segment persists with processing_status=failed, caption=null; handler returns success | error | no exception propagated to caller; DLQ not triggered | Y |
| ingestWindow.already-complete | WindowEvent for a segment with processing_status=complete | no-op; return already_processed | subpath | idempotency guard unchanged | Y |
| ingestWindow.already-incomplete | WindowEvent for a segment with processing_status=pending or failed | re-attempt GPU enrichment; update processing_status on outcome | subpath | handles at-least-once delivery from Lambda/SQS | Y |
| ingestWindow.no-frames | WindowEvent with frameCount=0 and no S3 frames | no segment written; return skipped | subpath | existing behavior preserved | |
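The rows of the path table reduce to a small decision on the existing segment's status. This sketch checks the conditions in the same order as the handler pseudocode. The helper names (`next_action`, `status_after_enrichment`) are hypothetical. The action labels other than already_processed and skipped were made up for illustration.

```python
from typing import Literal, Optional

Status = Literal["pending", "complete", "failed"]
Action = Literal["already_processed", "skipped", "create_and_enrich", "reenrich"]


def next_action(existing_status: Optional[Status], has_frames: bool) -> Action:
    """Map a redelivered or first-time window event to the table's path."""
    if existing_status == "complete":
        return "already_processed"   # ingestWindow.already-complete
    if not has_frames:
        return "skipped"             # ingestWindow.no-frames
    if existing_status in ("pending", "failed"):
        return "reenrich"            # ingestWindow.already-incomplete
    return "create_and_enrich"       # ingestWindow.segment-created-before-gpu


def status_after_enrichment(gpu_ok: bool) -> Status:
    """gpu-success -> complete; gpu-failure -> failed (the handler still returns ok)."""
    return "complete" if gpu_ok else "failed"
```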

3. Pseudocode / Technical Details for Critical Flows (Optional)

DB migration

ALTER TABLE worldmm_segments
  ADD COLUMN processing_status VARCHAR(16) NOT NULL DEFAULT 'complete';

Default 'complete' ensures all existing rows are treated as fully processed and skipped on re-delivery. New rows set this explicitly to 'pending' at creation time.
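You can check locally what the 'complete' default does to existing rows. This sketch runs the same DDL against an in-memory SQLite database, used as a stand-in for PostgreSQL only so the example is self-contained. The table is a toy two-column one, not the real worldmm_segments schema.

```python
import sqlite3
from typing import List, Tuple


def demo_processing_status_default() -> List[Tuple[str, str]]:
    """Apply the migration DDL to a toy table and report each row's status."""
    conn = sqlite3.connect(":memory:")
    # Toy worldmm_segments with one row written before the migration.
    conn.execute("CREATE TABLE worldmm_segments (id TEXT PRIMARY KEY, caption TEXT)")
    conn.execute("INSERT INTO worldmm_segments VALUES ('old-1', 'an existing caption')")
    # Same DDL as the migration: the pre-existing row picks up the default.
    conn.execute(
        "ALTER TABLE worldmm_segments "
        "ADD COLUMN processing_status VARCHAR(16) NOT NULL DEFAULT 'complete'"
    )
    # Rows created by the reordered handler set the status explicitly.
    conn.execute(
        "INSERT INTO worldmm_segments (id, caption, processing_status) "
        "VALUES ('new-1', NULL, 'pending')"
    )
    rows = conn.execute(
        "SELECT id, processing_status FROM worldmm_segments ORDER BY id"
    ).fetchall()
    conn.close()
    return rows


print(demo_processing_status_default())
# [('new-1', 'pending'), ('old-1', 'complete')]
```

In the Alembic revision, the equivalent call would be along the lines of `op.add_column("worldmm_segments", sa.Column("processing_status", sa.String(16), nullable=False, server_default="complete"))`. On PostgreSQL 11 and later, adding a column with a constant default is a catalog-only change and does not rewrite the table.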

ingest_window.lambda_handler reorder

1. idempotency check:
     existing = segment_exists(user_id, session_id, window_index)
     if existing and existing.processing_status == "complete":
         return {status: "already_processed", segment_id: existing.id}

2. load frames from S3                   (unchanged)
3. if no frames: return {status: "skipped"}   (unchanged)
4. load + transcribe audio               (unchanged)

5. [NEW] upsert segment with processing_status="pending":
     if existing (pending/failed):
         segment_id = existing.id
         update_segment(segment_id, transcript=..., s3_frames_key=..., processing_status="pending")
     else:
         segment_id = create_segment(caption=None, transcript=..., s3_frames_key=..., processing_status="pending")   # id of the new row

6. [NEW — best-effort GPU block]
   try:
       resolve GPU URL, build gpu_worker
       caption = gpu_worker.caption(sampled_frames, transcript=transcript)
       entities, triples = gpu_worker.ner_and_triples(caption)
       store entities + triples
       store visual embedding (best-effort inner try/except, unchanged)
       update_segment(segment_id, caption=caption, processing_status="complete")
       return {status: "ok", segment_id: segment_id, enriched: True}
   except Exception as e:
       update_segment(segment_id, processing_status="failed")
       log {step: "gpu_enrichment_failed", error: str(e)}
       return {status: "ok", segment_id: segment_id, enriched: False}
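You can exercise the reordered handler end to end with in-memory fakes. This is a minimal sketch, not the actual ingest_window.py code, and it rests on four assumptions:
- `InMemorySegments` is a hypothetical stand-in for the ORM helpers.
- `load_frames`, `transcribe`, and `enrich` are injected callables.
- `enrich` collapses caption, NER, triples, and embeddings into one call.
- The skip status is spelled "skipped", as in the path table.

```python
import logging
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

log = logging.getLogger("ingest_window_sketch")
Key = Tuple[str, str, int]  # (user_id, session_id, window_index)


@dataclass
class Segment:
    id: str
    transcript: Optional[str]
    s3_frames_key: Optional[str]
    caption: Optional[str] = None
    processing_status: str = "pending"


class InMemorySegments:
    """Hypothetical stand-in for segment_exists / create_segment / update_segment."""

    def __init__(self) -> None:
        self._by_key: Dict[Key, Segment] = {}
        self._by_id: Dict[str, Segment] = {}

    def segment_exists(self, user_id: str, session_id: str, window_index: int) -> Optional[Segment]:
        return self._by_key.get((user_id, session_id, window_index))

    def create_segment(self, key: Key, **fields: Any) -> Segment:
        seg = Segment(id=str(uuid.uuid4()), **fields)
        self._by_key[key] = seg
        self._by_id[seg.id] = seg
        return seg

    def update_segment(self, segment_id: str, **fields: Any) -> None:
        seg = self._by_id[segment_id]
        for name, value in fields.items():
            setattr(seg, name, value)


def ingest_window(
    event: Dict[str, Any],
    db: InMemorySegments,
    load_frames: Callable[[Dict[str, Any]], Tuple[List[Any], Optional[str]]],
    transcribe: Callable[[Dict[str, Any]], Optional[str]],
    enrich: Callable[[List[Any], Optional[str]], str],
) -> Dict[str, Any]:
    key: Key = (event["userId"], event["sessionId"], event["windowIndex"])
    existing = db.segment_exists(*key)
    # 1. Idempotency guard: only a complete segment short-circuits.
    if existing is not None and existing.processing_status == "complete":
        return {"status": "already_processed", "segment_id": existing.id}
    # 2-3. Load frames; with none there is nothing to ingest.
    frames, frames_key = load_frames(event)
    if not frames:
        return {"status": "skipped"}
    # 4. Transcription runs before any GPU work.
    transcript = transcribe(event)
    # 5. Persist (or reset) the segment as pending before calling the GPU.
    fields = {"transcript": transcript, "s3_frames_key": frames_key, "processing_status": "pending"}
    if existing is not None:
        segment_id = existing.id
        db.update_segment(segment_id, **fields)
    else:
        segment_id = db.create_segment(key, **fields).id
    # 6. Best-effort enrichment: a GPU error marks the row failed but never raises.
    try:
        caption = enrich(frames, transcript)  # stands in for caption + NER + triples + embeddings
        db.update_segment(segment_id, caption=caption, processing_status="complete")
        return {"status": "ok", "segment_id": segment_id, "enriched": True}
    except Exception as exc:
        db.update_segment(segment_id, processing_status="failed")
        log.warning("gpu_enrichment_failed: %s", exc)
        return {"status": "ok", "segment_id": segment_id, "enriched": False}
```

Injecting the S3, transcription, and GPU steps keeps the ordering (persist pending, then enrich) testable without AWS or a GPU worker. The sketch does not handle one case: a previous attempt may have stored some entities or triples before failing. A re-attempt could then write them again unless that storage step is itself idempotent.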

ORM changes

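  • WorldMMSegment: add processing_status: Mapped[str] column (VARCHAR(16) NOT NULL, server_default 'complete') so the ORM and the migration agree.
  • create_segment: add processing_status param, default "pending".
  • update_segment: new function; updates caption and processing_status by segment_id, and also accepts transcript and s3_frames_key for the in-place update in step 5. Only the fields passed in are changed.
  • segment_exists: return the full WorldMMSegment object (not just its id), or None when no row matches, so the caller can read processing_status.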
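The helper contract above can be illustrated without SQLAlchemy. This sketch uses sqlite3 in place of the ORM session, only to keep it self-contained; the real helpers would go through WorldMMSegment. It returns rows as dicts instead of model objects. It also assumes update_segment must accept transcript and s3_frames_key for the in-place update in step 5.

```python
import sqlite3
from typing import Any, Dict, Optional

# Columns update_segment may change; transcript and s3_frames_key are included on
# the assumption that step 5's in-place update goes through this helper.
UPDATABLE_COLUMNS = ("caption", "transcript", "s3_frames_key", "processing_status")
PROCESSING_STATUSES = ("pending", "complete", "failed")


def update_segment(conn: sqlite3.Connection, segment_id: str, **fields: Any) -> None:
    """Change only the columns passed in; reject unknown columns and statuses."""
    unknown = set(fields) - set(UPDATABLE_COLUMNS)
    if unknown:
        raise ValueError(f"not updatable: {sorted(unknown)}")
    if "processing_status" in fields and fields["processing_status"] not in PROCESSING_STATUSES:
        raise ValueError(f"invalid processing_status: {fields['processing_status']!r}")
    if not fields:
        return
    # Column names come from the whitelist above; values are always bound parameters.
    assignments = ", ".join(f"{name} = ?" for name in fields)
    conn.execute(
        f"UPDATE worldmm_segments SET {assignments} WHERE id = ?",
        (*fields.values(), segment_id),
    )


def segment_exists(conn: sqlite3.Connection, user_id: str, session_id: str,
                   window_index: int) -> Optional[Dict[str, Any]]:
    """Return the whole row as a dict (not just the id), or None if absent."""
    cur = conn.execute(
        "SELECT * FROM worldmm_segments "
        "WHERE user_id = ? AND source_session_id = ? AND source_window_index = ?",
        (user_id, session_id, window_index),
    )
    row = cur.fetchone()
    if row is None:
        return None
    return dict(zip((col[0] for col in cur.description), row))
```

In the SQLAlchemy model itself, the column would likely be declared as `processing_status: Mapped[str] = mapped_column(String(16), nullable=False, server_default="complete")`. That way the database default, not a Python-side default, backfills existing rows.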

After all stages are approved, apply .agent/skills/reconcile-plans/SKILL.md to propagate contract updates across linked plans.