Ingest Segment-First with Processing Status
Plan Metadata
- Plan type: plan
- Parent plan: N/A
- Depends on: docs/plans/memory-fetch-display.md
- Status: documentation
System Intent
- What is being built: A resilient ingest pipeline that writes a worldmm_segments row immediately after transcription (before any GPU work), tracks processing completeness via a processing_status column, and uses that status to drive idempotent DLQ retries, so GPU enrichment (caption, NER, triples, embeddings) can be re-attempted on incomplete segments without data loss.
- Primary consumer(s): ingest_window.py (Lambda), DLQ retry path (sqs_handler), feed API (reads all segments regardless of status).
- Boundary: IngestWindowFunction receives a window event → writes the segment immediately → attempts GPU enrichment → marks it complete on success or failed on error. When the same event is redelivered (for example by the DLQ consumer), the handler looks up the existing segment and re-attempts enrichment unless its status is already complete.
Stage Gate Tracker
- [x] Stage 1 Mermaid approved
- [x] Stage 2 I/O contracts approved
- [x] Stage 3 pseudocode/technical details approved
1. Mermaid Diagram
flowchart TD
subgraph TRIGGER["Trigger Sources"]
FRAME["FramePostFunction\napi/sessions/frames/app.py"]:::unchanged
SESS["SessionEndFunction\napi/sessions/end/app.py"]:::unchanged
DLQ["IngestDLQConsumer\nworldmm/pipeline/ingest_window.py — sqs_handler"]:::unchanged
end
subgraph INGEST["IngestWindowFunction"]
IW["ingest_window.py\nworldmm/pipeline/ingest_window.py"]:::updated
end
subgraph DB["PostgreSQL"]
SEG[("worldmm_segments\n+ processing_status\nshared/orm/worldmm_orm.py")]:::updated
MIG["Alembic migration\nadd processing_status column"]:::created
end
subgraph GPU["GPU Worker EC2"]
GPU_W["gpu_worker: caption + NER + triples + embeddings"]:::unchanged
end
subgraph S3["S3"]
FRAMES["Frame images + audio\nsessions/session_id/window_NNN/"]:::unchanged
end
FRAME -->|"async invoke — window event"| IW
SESS -->|"async invoke — window event"| IW
DLQ -->|"SQS record — window event"| IW
IW -->|"read frames + audio"| S3
S3 -->|"bytes"| IW
IW -->|"transcribe audio"| IW
IW -->|"create_segment processing_status=pending"| SEG
IW -->|"caption + NER + triples + embeddings"| GPU_W
GPU_W -->|"caption, entities, triples, embedding"| IW
IW -->|"update processing_status=complete"| SEG
IW -->|"update processing_status=failed on GPU error"| SEG
classDef unchanged fill:#d3d3d3,stroke:#666,stroke-width:1px
classDef updated fill:#ffe58a,stroke:#666,stroke-width:1px
classDef created fill:#a8e6a3,stroke:#666,stroke-width:1px
2. Black-Box Inputs and Outputs
Global Types
WindowEvent {
sessionId: string (UUID — originating session)
userId: string (owner)
windowIndex: int (0-based window index within session)
frameCount: int (number of frames uploaded for this window)
}
ProcessingStatus = "pending" | "complete" | "failed"
SegmentRow {
id: string (UUID — worldmm_segments PK)
user_id: string
source_session_id: string
source_window_index: int
start_time: string ISO 8601
end_time: string ISO 8601
duration_seconds: int (always 30)
transcript: string | null
caption: string | null (null until GPU enrichment succeeds)
s3_frames_key: string | null
processing_status: ProcessingStatus (NEW; column default "complete" covers existing rows, new rows are created as "pending")
}
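For reference, the event and status types above could be expressed in Python roughly as follows. This is a sketch, not the project's definitions: `parse_window_event` and `is_valid_status` are hypothetical helpers added only to show how a handler might reject a malformed event before touching S3.

```python
from typing import Literal, TypedDict, get_args

# Mirrors ProcessingStatus = "pending" | "complete" | "failed".
ProcessingStatus = Literal["pending", "complete", "failed"]


class WindowEvent(TypedDict):
    sessionId: str    # UUID of the originating session
    userId: str       # owner
    windowIndex: int  # 0-based window index within the session
    frameCount: int   # number of frames uploaded for this window


def is_valid_status(value: str) -> bool:
    """Hypothetical helper: True only for the three ProcessingStatus values."""
    return value in get_args(ProcessingStatus)


def parse_window_event(raw: dict) -> WindowEvent:
    """Hypothetical validator: checks field presence/types and non-negative ints."""
    for key, expected in (("sessionId", str), ("userId", str),
                          ("windowIndex", int), ("frameCount", int)):
        value = raw.get(key)
        # bool is a subclass of int, so exclude it explicitly.
        if not isinstance(value, expected) or isinstance(value, bool):
            raise ValueError(f"invalid or missing {key!r}")
    if raw["windowIndex"] < 0 or raw["frameCount"] < 0:
        raise ValueError("windowIndex and frameCount must be non-negative")
    # Unknown keys are dropped rather than rejected.
    return WindowEvent(sessionId=raw["sessionId"], userId=raw["userId"],
                       windowIndex=raw["windowIndex"], frameCount=raw["frameCount"])
```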
Flow: ingestWindow
- Test files: main/server/tests/integration/test_ingest_segment_first.py
- Core files: main/server/worldmm/pipeline/ingest_window.py, main/server/layers/shared/python/shared/orm/worldmm_orm.py
Paths
| path-name | input | output/expected state change | path-type | notes | updated |
|---|---|---|---|---|---|
| ingestWindow.segment-created-before-gpu | WindowEvent with frames | segment row written with processing_status=pending before any GPU call | happy path | segment must exist even if GPU crashes immediately after | Y |
| ingestWindow.gpu-success | WindowEvent, GPU available | segment updated: caption set, processing_status=complete | happy path | | Y |
| ingestWindow.gpu-failure | WindowEvent, GPU unavailable or error | segment persists with processing_status=failed, caption=null; handler returns success | error | no exception propagated to caller; DLQ not triggered | Y |
| ingestWindow.already-complete | WindowEvent for a segment with processing_status=complete | no-op; return already_processed | subpath | same short-circuit as before, now gated on processing_status=complete | Y |
| ingestWindow.already-incomplete | WindowEvent for a segment with processing_status=pending or failed | re-attempt GPU enrichment; update processing_status on outcome | subpath | handles at-least-once delivery from Lambda/SQS | Y |
| ingestWindow.no-frames | WindowEvent with frameCount=0 and no S3 frames | no segment written; return skipped | subpath | existing behavior preserved | |
3. Pseudocode / Technical Details for Critical Flows (Optional)
DB migration
Default 'complete' ensures all existing rows are treated as fully processed and skipped on re-delivery. New rows set this explicitly to 'pending' at creation time.
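The migration reduces to a single ADD COLUMN. The minimal sketch below runs the DDL against an in-memory SQLite database purely as a stand-in for PostgreSQL; the NOT NULL constraint and the trimmed two-column table are assumptions, not part of the plan. In an Alembic revision the same change would typically be written as `op.add_column("worldmm_segments", sa.Column("processing_status", sa.String(16), nullable=False, server_default="complete"))`.

```python
import sqlite3

# DDL for the planned column: VARCHAR(16) with default 'complete'.
# NOT NULL is an assumption; the plan only specifies type and default.
ADD_PROCESSING_STATUS = (
    "ALTER TABLE worldmm_segments "
    "ADD COLUMN processing_status VARCHAR(16) NOT NULL DEFAULT 'complete'"
)


def migrate_and_insert_demo() -> list:
    conn = sqlite3.connect(":memory:")
    # Simplified stand-in for the pre-migration table (two columns only).
    conn.execute("CREATE TABLE worldmm_segments (id TEXT PRIMARY KEY, caption TEXT)")
    conn.execute("INSERT INTO worldmm_segments (id, caption) VALUES ('old-1', 'a caption')")

    # Existing rows pick up the default and read as 'complete'.
    conn.execute(ADD_PROCESSING_STATUS)

    # New rows are written with an explicit 'pending', as create_segment will do.
    conn.execute(
        "INSERT INTO worldmm_segments (id, caption, processing_status) "
        "VALUES ('new-1', NULL, 'pending')"
    )
    rows = conn.execute(
        "SELECT id, processing_status FROM worldmm_segments ORDER BY id"
    ).fetchall()
    conn.close()
    return rows
```

On PostgreSQL 11 and later, adding a column with a constant default does not rewrite the table, so backfilling existing rows to 'complete' is cheap even on a large worldmm_segments table.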
ingest_window.lambda_handler reorder
1. idempotency check:
   existing = segment_exists(user_id, session_id, window_index)
   if existing and existing.processing_status == "complete":
       return {status: "already_processed", segment_id: existing.id}
2. load frames from S3 (unchanged)
3. if no frames: return {status: skip} (unchanged)
4. load + transcribe audio (unchanged)
5. [NEW] upsert segment with processing_status="pending":
   if existing (pending/failed):
       update transcript + s3_frames_key in place and reset processing_status="pending"
       segment_id = existing.id
   else:
       segment_id = create_segment(caption=None, transcript=..., s3_frames_key=..., processing_status="pending")
6. [NEW: best-effort GPU block]
   try:
       resolve GPU URL, build gpu_worker
       caption = gpu_worker.caption(sampled_frames, transcript=transcript)
       entities, triples = gpu_worker.ner_and_triples(caption)
       store entities + triples
       store visual embedding (best-effort inner try/except, unchanged)
       update_segment(segment_id, caption=caption, processing_status="complete")
       return {status: "ok", segment_id: segment_id, enriched: True}
   except Exception as e:
       update_segment(segment_id, processing_status="failed")
       log {step: "gpu_enrichment_failed", error: str(e)}
       return {status: "ok", segment_id: segment_id, enriched: False}
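The reordered handler can be sketched end to end in plain Python. Everything here is a hypothetical stand-in rather than the project's code: `InMemorySegments` replaces the ORM helpers (and its `segment_exists` takes a single key tuple), while the `load_frames`, `transcribe`, and `enrich` callables replace S3 access, transcription, and the GPU worker, with caption, NER, triples, and embeddings folded into the one `enrich` call. The zero-padded `window_NNN` key format is likewise assumed from the S3 layout in the diagram.

```python
import json
import logging
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

log = logging.getLogger("ingest_window_sketch")
Key = Tuple[str, str, int]  # (user_id, session_id, window_index)


@dataclass
class Segment:
    id: str
    transcript: Optional[str]
    s3_frames_key: Optional[str]
    caption: Optional[str] = None
    processing_status: str = "pending"  # new rows start as pending


class InMemorySegments:
    """Hypothetical stand-in for segment_exists / create_segment / update_segment."""

    def __init__(self) -> None:
        self.rows: Dict[Key, Segment] = {}

    def segment_exists(self, key: Key) -> Optional[Segment]:
        return self.rows.get(key)  # full object, so callers can read processing_status

    def create_segment(self, key: Key, transcript: str, s3_frames_key: str) -> Segment:
        seg = Segment(id=str(uuid.uuid4()), transcript=transcript, s3_frames_key=s3_frames_key)
        self.rows[key] = seg
        return seg

    def update_segment(self, segment_id: str, **fields) -> None:
        seg = next(s for s in self.rows.values() if s.id == segment_id)
        for name, value in fields.items():
            setattr(seg, name, value)


def ingest_window(event: dict, db: InMemorySegments,
                  load_frames: Callable[[dict], List[bytes]],
                  transcribe: Callable[[dict], str],
                  enrich: Callable[[List[bytes], str], str]) -> dict:
    key: Key = (event["userId"], event["sessionId"], event["windowIndex"])

    # 1. Idempotency: only a fully enriched segment short-circuits.
    existing = db.segment_exists(key)
    if existing and existing.processing_status == "complete":
        return {"status": "already_processed", "segment_id": existing.id}

    # 2-3. Load frames; with none there is nothing to ingest.
    frames = load_frames(event)
    if not frames:
        return {"status": "skip"}

    # 4. Transcribe before any GPU work.
    transcript = transcribe(event)
    frames_key = f"sessions/{event['sessionId']}/window_{event['windowIndex']:03d}/"

    # 5. Persist the row as "pending" so it survives a GPU crash.
    if existing:
        segment_id = existing.id
        db.update_segment(segment_id, transcript=transcript,
                          s3_frames_key=frames_key, processing_status="pending")
    else:
        segment_id = db.create_segment(key, transcript, frames_key).id

    # 6. Best-effort enrichment: errors mark the row "failed" but still return ok.
    try:
        caption = enrich(frames, transcript)
        db.update_segment(segment_id, caption=caption, processing_status="complete")
        return {"status": "ok", "segment_id": segment_id, "enriched": True}
    except Exception as exc:
        db.update_segment(segment_id, processing_status="failed")
        log.warning(json.dumps({"step": "gpu_enrichment_failed", "error": str(exc)}))
        return {"status": "ok", "segment_id": segment_id, "enriched": False}
```

Because the failure branch returns success, a segment marked failed is only re-enriched when the same window event is delivered again; the failure alone never reaches the DLQ. If the `update_segment` call inside the `except` branch itself raises (for example, the database is unreachable), the exception escapes and the normal Lambda retry and DLQ path applies.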
ORM changes
- WorldMMSegment: add processing_status: Mapped[str] column (VARCHAR(16) DEFAULT 'complete').
- create_segment: add processing_status param, default "pending".
- update_segment: new function; updates caption and processing_status by segment_id. The step 5 retry path also needs it to update transcript and s3_frames_key.
- segment_exists: return the full WorldMMSegment object (not just id) so the caller can read processing_status.
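A minimal sketch of the three helper contracts follows, written against stdlib sqlite3 so it runs standalone instead of going through the project's SQLAlchemy session. The trimmed schema, the UNIQUE constraint on (user_id, source_session_id, source_window_index), and the column whitelist in update_segment are assumptions. In the ORM itself the column would presumably be declared along the lines of `processing_status: Mapped[str] = mapped_column(String(16), server_default="complete")`.

```python
import sqlite3
import uuid
from dataclasses import dataclass
from typing import Optional

# Hypothetical trimmed schema: only the columns these helpers touch.
SCHEMA = """
CREATE TABLE worldmm_segments (
    id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    source_session_id TEXT NOT NULL,
    source_window_index INTEGER NOT NULL,
    transcript TEXT,
    caption TEXT,
    s3_frames_key TEXT,
    processing_status VARCHAR(16) NOT NULL DEFAULT 'complete',
    UNIQUE (user_id, source_session_id, source_window_index)
)
"""
COLUMNS = ("id, user_id, source_session_id, source_window_index, "
           "transcript, caption, s3_frames_key, processing_status")


@dataclass
class WorldMMSegment:
    id: str
    user_id: str
    source_session_id: str
    source_window_index: int
    transcript: Optional[str]
    caption: Optional[str]
    s3_frames_key: Optional[str]
    processing_status: str


def segment_exists(conn: sqlite3.Connection, user_id: str, session_id: str,
                   window_index: int) -> Optional[WorldMMSegment]:
    # Returns the full row (not just id) so callers can read processing_status.
    row = conn.execute(
        f"SELECT {COLUMNS} FROM worldmm_segments "
        "WHERE user_id = ? AND source_session_id = ? AND source_window_index = ?",
        (user_id, session_id, window_index),
    ).fetchone()
    return WorldMMSegment(*row) if row else None


def create_segment(conn: sqlite3.Connection, user_id: str, session_id: str,
                   window_index: int, *, transcript: Optional[str] = None,
                   caption: Optional[str] = None, s3_frames_key: Optional[str] = None,
                   processing_status: str = "pending") -> str:
    segment_id = str(uuid.uuid4())
    conn.execute(
        f"INSERT INTO worldmm_segments ({COLUMNS}) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        (segment_id, user_id, session_id, window_index,
         transcript, caption, s3_frames_key, processing_status),
    )
    return segment_id


# Column names are interpolated into SQL below, so only whitelisted names pass.
_UPDATABLE = {"caption", "processing_status", "transcript", "s3_frames_key"}


def update_segment(conn: sqlite3.Connection, segment_id: str, **fields: Optional[str]) -> None:
    if not fields or not set(fields) <= _UPDATABLE:
        raise ValueError(f"unsupported fields: {sorted(set(fields) - _UPDATABLE)}")
    assignments = ", ".join(f"{name} = ?" for name in fields)
    conn.execute(f"UPDATE worldmm_segments SET {assignments} WHERE id = ?",
                 (*fields.values(), segment_id))
```

With the assumed UNIQUE constraint, two concurrent deliveries of the same window would produce one insert and one IntegrityError rather than two pending rows for the same window.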
4. Handoff to Related Plan Reconciliation
After all stages are approved, apply .agent/skills/reconcile-plans/SKILL.md to propagate contract updates across linked plans.