WorldMM Vertical Slice — EgoLife Validation Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Process EgoLife data through the full WorldMM pipeline and validate accuracy on 500 QA questions against the paper's 65.6%.

Architecture: Local Python pipeline orchestrator processes EgoLife data via OpenAI Batch API (translation, captioning, NER, triples, merging, semantic extraction) and a remote GPU worker (VLM2Vec visual embeddings). Evaluation runs QA questions with direct GPT-5 API calls.

Tech Stack: Python 3.10+, OpenAI Batch API, FastAPI (GPU worker), VLM2Vec-V2, pysrt, ffmpeg, igraph, numpy, pytest

Spec: docs/superpowers/specs/2026-03-22-worldmm-vertical-slice-design.md


File Structure

main/server/worldmm/
+-- pipeline/
|   +-- __init__.py
|   +-- srt_parser.py          # Parse SRT files, extract timestamp + text entries
|   +-- timestamps.py          # Timestamp utilities (HHMMSS, query_time format)
|   +-- translate.py           # Step 1: batch translate Chinese DenseCaptions
|   +-- sync.py                # Step 2: merge captions + transcripts, match to video files
|   +-- fine_caption.py        # Step 3: rewrite into first-person 30s captions
|   +-- multiscale_runner.py   # Step 4: multi-scale merging (3min, 10min, 1hr)
|   +-- triple_runner.py       # Step 5: NER + episodic triple extraction
|   +-- semantic_runner.py     # Step 6: semantic extraction + consolidation
|   +-- visual_runner.py       # Step 7: VLM2Vec encoding via GPU worker
|   +-- evaluate_pipeline.py   # Step 8: QA evaluation with answer extraction
|   +-- run.py                 # Main orchestrator
+-- gpu_worker/
|   +-- __init__.py
|   +-- server.py              # FastAPI VLM2Vec server
|   +-- setup.sh               # Instance setup script
+-- tests/
    +-- test_srt_parser.py
    +-- test_timestamps.py
    +-- test_translate.py
    +-- test_evaluate_pipeline.py
    +-- fixtures/
        +-- sample.srt

Task 1: SRT Parser + Timestamp Utilities

Files:
- Create: main/server/worldmm/pipeline/__init__.py
- Create: main/server/worldmm/pipeline/srt_parser.py
- Create: main/server/worldmm/pipeline/timestamps.py
- Create: main/server/worldmm/tests/fixtures/sample.srt
- Test: main/server/worldmm/tests/test_srt_parser.py
- Test: main/server/worldmm/tests/test_timestamps.py

  • [ ] Step 1: Create sample SRT fixture

Save to main/server/worldmm/tests/fixtures/sample.srt:

1
00:00:01,066 --> 00:00:02,166
We look at everyone

2
00:00:02,166 --> 00:00:03,533
We continue looking at everyone

3
00:00:03,766 --> 00:00:06,866
We sit and keep looking at everyone

  • [ ] Step 2: Write the failing tests
# main/server/worldmm/tests/test_srt_parser.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))
FIXTURES = Path(__file__).parent / "fixtures"

def test_parse_srt_returns_entries():
    from worldmm.pipeline.srt_parser import parse_srt
    entries = parse_srt(FIXTURES / "sample.srt")
    assert len(entries) == 3
    assert entries[0]["text"] == "We look at everyone"
    assert entries[0]["start_seconds"] == 1.066
    assert entries[0]["end_seconds"] == 2.166

def test_parse_srt_with_hour_offset():
    from worldmm.pipeline.srt_parser import parse_srt
    entries = parse_srt(FIXTURES / "sample.srt", hour_offset=11)
    assert entries[0]["start_hhmmss"] == "110001"
    assert entries[0]["end_hhmmss"] == "110002"

def test_parse_bilingual_transcript():
    from worldmm.pipeline.srt_parser import parse_bilingual_transcript
    text = "Jake: Good then a stopwatch\nJake: Okay, then we need a stopwatch."
    speaker, line1, line2 = parse_bilingual_transcript(text)
    assert speaker == "Jake"
    assert line2 == "Okay, then we need a stopwatch."

def test_extract_hour_from_filename():
    from worldmm.pipeline.srt_parser import extract_hour_from_filename
    assert extract_hour_from_filename("A1_JAKE_DAY1_11000000.srt") == 11
    assert extract_hour_from_filename("A1_JAKE_DAY6_19000000.srt") == 19

# main/server/worldmm/tests/test_timestamps.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

def test_make_query_time():
    from worldmm.pipeline.timestamps import make_query_time
    assert make_query_time("DAY1", "11210217") == 111210217
    assert make_query_time("DAY6", "11153417") == 611153417

def test_seconds_to_hhmmss():
    from worldmm.pipeline.timestamps import seconds_to_hhmmss
    assert seconds_to_hhmmss(3661.5, hour_offset=0) == "010101"
    assert seconds_to_hhmmss(1.066, hour_offset=11) == "110001"

def test_make_segment_timestamp():
    from worldmm.pipeline.timestamps import make_segment_timestamp
    assert make_segment_timestamp("DAY6", "11530000") == "611530000"
    assert make_segment_timestamp("DAY1", "12000000") == "112000000"

def test_compare_timestamps():
    from worldmm.pipeline.timestamps import compare_timestamps
    assert compare_timestamps("111210217", "611530000") < 0
    assert compare_timestamps("611530000", "111210217") > 0
    assert compare_timestamps("111210217", "111210217") == 0
  • [ ] Step 3: Run tests to verify they fail

Run: cd main/server && python -m pytest worldmm/tests/test_srt_parser.py worldmm/tests/test_timestamps.py -v
Expected: FAIL

  • [ ] Step 4: Write the implementations
# main/server/worldmm/pipeline/__init__.py
# (empty)

# main/server/worldmm/pipeline/srt_parser.py
from __future__ import annotations
import re
from pathlib import Path
from typing import Any


def parse_srt(path: Path, hour_offset: int = 0) -> list[dict[str, Any]]:
    text = path.read_text(encoding="utf-8")
    entries = []
    blocks = re.split(r"\n\n+", text.strip())
    for block in blocks:
        lines = block.strip().split("\n")
        if len(lines) < 3:
            continue
        time_match = re.match(
            r"(\d{2}):(\d{2}):(\d{2}),(\d{3})\s*-->\s*(\d{2}):(\d{2}):(\d{2}),(\d{3})",
            lines[1],
        )
        if not time_match:
            continue
        g = [int(x) for x in time_match.groups()]
        start_secs = g[0] * 3600 + g[1] * 60 + g[2] + g[3] / 1000
        end_secs = g[4] * 3600 + g[5] * 60 + g[6] + g[7] / 1000
        caption_text = "\n".join(lines[2:])
        start_h = hour_offset + g[0]
        end_h = hour_offset + g[4]
        entries.append({
            "text": caption_text,
            "start_seconds": round(start_secs, 3),
            "end_seconds": round(end_secs, 3),
            "start_hhmmss": f"{start_h:02d}{g[1]:02d}{g[2]:02d}",
            "end_hhmmss": f"{end_h:02d}{g[5]:02d}{g[6]:02d}",
        })
    return entries


def parse_bilingual_transcript(text: str) -> tuple[str, str, str]:
    lines = text.strip().split("\n")
    if len(lines) < 2:
        return "", text, ""
    def extract(line: str) -> tuple[str, str]:
        match = re.match(r"^(\w+):\s*(.+)$", line.strip())
        return (match.group(1), match.group(2)) if match else ("", line.strip())
    speaker1, text1 = extract(lines[0])
    speaker2, text2 = extract(lines[1])
    return speaker1 or speaker2, text1, text2


def extract_hour_from_filename(filename: str) -> int:
    match = re.search(r"_(\d{2})\d{6}\.", filename)
    if match:
        return int(match.group(1))
    match = re.search(r"_(\d{2})\d{6}$", filename)
    if match:
        return int(match.group(1))
    return 0

# main/server/worldmm/pipeline/timestamps.py
from __future__ import annotations


def make_query_time(date: str, time_str: str) -> int:
    day_num = date.replace("DAY", "")
    return int(f"{day_num}{time_str.zfill(8)}")


def seconds_to_hhmmss(seconds: float, hour_offset: int = 0) -> str:
    total_secs = int(seconds)
    h = hour_offset + total_secs // 3600
    m = (total_secs % 3600) // 60
    s = total_secs % 60
    return f"{h:02d}{m:02d}{s:02d}"


def make_segment_timestamp(date: str, end_time: str) -> str:
    day_num = date.replace("DAY", "")
    return f"{day_num}{end_time.zfill(8)}"


def compare_timestamps(a: str, b: str) -> int:
    ia, ib = int(a), int(b)
    return (ia > ib) - (ia < ib)
  • [ ] Step 5: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_srt_parser.py worldmm/tests/test_timestamps.py -v
Expected: All 8 tests PASS

  • [ ] Step 6: Commit
git add main/server/worldmm/pipeline/ main/server/worldmm/tests/test_srt_parser.py \
        main/server/worldmm/tests/test_timestamps.py main/server/worldmm/tests/fixtures/sample.srt
git commit -m "feat(worldmm): add SRT parser and timestamp utilities"

Task 2: Translation Pipeline

Files:
- Create: main/server/worldmm/pipeline/translate.py
- Test: main/server/worldmm/tests/test_translate.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_translate.py
from __future__ import annotations
import json
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

def test_build_translation_requests():
    from worldmm.pipeline.translate import build_translation_requests
    entries = [
        {"text": "text1", "start_hhmmss": "110001", "end_hhmmss": "110002"},
        {"text": "text2", "start_hhmmss": "110002", "end_hhmmss": "110003"},
    ]
    requests, ids = build_translation_requests(entries, subject="A1_JAKE", date="DAY1")
    assert len(requests) == 2
    assert ids[0] == "0-A1_JAKE-DAY1-110001-110002"
    assert "text1" in str(requests[0]["messages"])

def test_parse_translation_results():
    from worldmm.pipeline.translate import parse_translation_results
    batch_results = {
        "0-A1_JAKE-DAY1-110001-110002": "I look at everyone",
        "1-A1_JAKE-DAY1-110002-110003": "I continue looking",
    }
    entries = [
        {"text": "cn1", "start_hhmmss": "110001", "end_hhmmss": "110002"},
        {"text": "cn2", "start_hhmmss": "110002", "end_hhmmss": "110003"},
    ]
    translated = parse_translation_results(batch_results, entries, "A1_JAKE", "DAY1")
    assert len(translated) == 2
    assert translated[0]["translated_text"] == "I look at everyone"

def test_save_load_translations(tmp_path):
    from worldmm.pipeline.translate import save_translations, load_translations
    data = [{"start_hhmmss": "110001", "translated_text": "hello"}]
    save_translations(data, tmp_path / "DAY1.json")
    assert load_translations(tmp_path / "DAY1.json") == data
  • [ ] Step 2: Run test, verify fail

Run: cd main/server && python -m pytest worldmm/tests/test_translate.py -v

  • [ ] Step 3: Write implementation
# main/server/worldmm/pipeline/translate.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from worldmm.llm.client import build_request


def build_translation_requests(
    entries: list[dict[str, Any]], subject: str, date: str,
) -> tuple[list[dict[str, Any]], list[str]]:
    requests, custom_ids = [], []
    for i, entry in enumerate(entries):
        custom_id = f"{i}-{subject}-{date}-{entry['start_hhmmss']}-{entry['end_hhmmss']}"
        req = build_request(
            model="gpt-5-mini",
            messages=[
                {"role": "system", "content": "Translate the following Chinese text to English. Return only the translation."},
                {"role": "user", "content": entry["text"]},
            ],
            max_completion_tokens=256, reasoning_effort="low",
        )
        requests.append(req)
        custom_ids.append(custom_id)
    return requests, custom_ids


def parse_translation_results(
    batch_results: dict[str, str | None], entries: list[dict[str, Any]], subject: str, date: str,
) -> list[dict[str, Any]]:
    translated = []
    for i, entry in enumerate(entries):
        custom_id = f"{i}-{subject}-{date}-{entry['start_hhmmss']}-{entry['end_hhmmss']}"
        translated.append({**entry, "translated_text": batch_results.get(custom_id, "") or ""})
    return translated


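def save_translations(data: list[dict[str, Any]], path: Path) -> None:
    # Entries still carry the original Chinese text, so write UTF-8 explicitly
    # and keep the characters readable instead of \u-escaped.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")


def load_translations(path: Path) -> list[dict[str, Any]]:
    return json.loads(path.read_text(encoding="utf-8"))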
  • [ ] Step 4: Run tests, verify pass

Run: cd main/server && python -m pytest worldmm/tests/test_translate.py -v

  • [ ] Step 5: Commit

git add main/server/worldmm/pipeline/translate.py main/server/worldmm/tests/test_translate.py
git commit -m "feat(worldmm): add caption translation pipeline for batch API"

Task 3: Sync + Fine Caption + Multiscale + Triple + Semantic + Visual Runners

Files:
- Create: main/server/worldmm/pipeline/sync.py
- Create: main/server/worldmm/pipeline/fine_caption.py
- Create: main/server/worldmm/pipeline/multiscale_runner.py
- Create: main/server/worldmm/pipeline/triple_runner.py
- Create: main/server/worldmm/pipeline/semantic_runner.py
- Create: main/server/worldmm/pipeline/visual_runner.py

These are orchestration modules that wire together the existing core components (from Tasks 1-16 of the core plan). They don't need separate TDD since they delegate to already-tested functions. Write them, verify imports work, commit.

  • [ ] Step 1: Write all runner files

Write each file following the implementations shown in the spec. Each file should import from the existing worldmm.memory.* and worldmm.llm.* modules and add pipeline-specific orchestration.

Key files:
- sync.py: merges translated captions + transcripts by timestamp
- fine_caption.py: groups entries into 30s windows, builds batch requests for first-person rewriting
- multiscale_runner.py: builds batch merge requests for 3min/10min/1hr scales
- triple_runner.py: builds batch NER + triple extraction requests
- semantic_runner.py: builds batch semantic extraction requests
- visual_runner.py: samples frames via ffmpeg, encodes via GPU worker client
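
The windowing that fine_caption.py and multiscale_runner.py rely on could look roughly like the sketch below. It is only an illustration: group_into_windows and SCALES are hypothetical names, the entry shape is borrowed from parse_srt, and the plain string join stands in for the LLM batch request that does the actual rewriting or merging in the plan.

```python
from __future__ import annotations
from typing import Any


def group_into_windows(
    entries: list[dict[str, Any]], window_seconds: float = 30.0
) -> list[dict[str, Any]]:
    """Bucket entries by floor(start_seconds / window_seconds)."""
    buckets: dict[int, list[dict[str, Any]]] = {}
    for entry in entries:
        index = int(entry["start_seconds"] // window_seconds)
        buckets.setdefault(index, []).append(entry)

    windows = []
    for index in sorted(buckets):
        # Keep entries in chronological order inside each window.
        group = sorted(buckets[index], key=lambda e: e["start_seconds"])
        windows.append({
            "start_seconds": index * window_seconds,
            "end_seconds": (index + 1) * window_seconds,
            # Placeholder: the plan sends this text to an LLM instead of joining it.
            "text": " ".join(e["text"] for e in group),
        })
    return windows


# Hypothetical scale table: the 3min/10min/1hr scales from the plan, in seconds.
SCALES = {"3min": 180.0, "10min": 600.0, "1hr": 3600.0}

entries = [
    {"start_seconds": 1.066, "text": "We look at everyone"},
    {"start_seconds": 31.5, "text": "We sit down"},
    {"start_seconds": 2.166, "text": "We continue looking at everyone"},
]
fine = group_into_windows(entries, window_seconds=30.0)
coarse = group_into_windows(fine, window_seconds=SCALES["3min"])
```

Because each window keeps a start_seconds key, the output of one scale can be fed back in as the input of the next coarser scale.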

  • [ ] Step 2: Verify imports

Run: cd main/server && python -c "from worldmm.pipeline import sync, fine_caption, multiscale_runner, triple_runner, semantic_runner, visual_runner; print('All runners OK')"

  • [ ] Step 3: Commit
git add main/server/worldmm/pipeline/
git commit -m "feat(worldmm): add pipeline runner modules for sync, captions, triples, semantic, visual"

Task 4: Evaluation Runner (TDD)

Files:
- Create: main/server/worldmm/pipeline/evaluate_pipeline.py
- Test: main/server/worldmm/tests/test_evaluate_pipeline.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_evaluate_pipeline.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

def test_extract_answer_exact_match():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("Alice", choices) == "B"

def test_extract_answer_letter_extraction():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("The answer is (B)", choices) == "B"
    assert extract_answer("B.", choices) == "B"
    assert extract_answer("**B**", choices) == "B"
    assert extract_answer("Answer: C", choices) == "C"

def test_extract_answer_full_pattern():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("B. Alice", choices) == "B"
    assert extract_answer("(B) Alice", choices) == "B"

def test_extract_answer_no_match():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("I don't know the answer", choices) is None

def test_extract_answer_case_insensitive():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("alice", choices) == "B"

def test_compute_accuracy():
    from worldmm.pipeline.evaluate_pipeline import compute_accuracy
    results = [{"correct": True}, {"correct": False}, {"correct": True}]
    assert abs(compute_accuracy(results) - 66.67) < 0.1

def test_compute_accuracy_by_type():
    from worldmm.pipeline.evaluate_pipeline import compute_accuracy_by_type
    results = [
        {"type": "EntityLog", "correct": True},
        {"type": "EntityLog", "correct": False},
        {"type": "HabitInsight", "correct": True},
    ]
    by_type = compute_accuracy_by_type(results)
    assert abs(by_type["EntityLog"] - 50.0) < 0.1
    assert abs(by_type["HabitInsight"] - 100.0) < 0.1
  • [ ] Step 2: Run test, verify fail

Run: cd main/server && python -m pytest worldmm/tests/test_evaluate_pipeline.py -v

  • [ ] Step 3: Write implementation

# main/server/worldmm/pipeline/evaluate_pipeline.py
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any


def extract_answer(response: str, choices: dict[str, str]) -> str | None:
    response_norm = response.strip().lower()
    # Strategy 1: Exact match
    for letter, value in choices.items():
        if response_norm == value.strip().lower():
            return letter
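    # Strategy 2: Letter extraction. The \b after the "Answer:" letter stops a reply
    # such as "Answer: Alice" from being read as the letter A.
    for pattern in [r"\(([A-D])\)", r"\*\*([A-D])\*\*", r"[Aa]nswer:\s*([A-D])\b", r"^([A-D])\.", r"^([A-D])\s*$", r"^([A-D]):"]: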
        match = re.search(pattern, response.strip())
        if match:
            return match.group(1).upper()
    # Strategy 3: Full pattern
    for letter, value in choices.items():
        for pat in [rf"{letter}\.\s*{re.escape(value)}", rf"\({letter}\)\s*{re.escape(value)}"]:
            if re.search(pat, response, re.IGNORECASE):
                return letter
    return None


def compute_accuracy(results: list[dict[str, Any]]) -> float:
    if not results:
        return 0.0
    return round(100 * sum(1 for r in results if r.get("correct")) / len(results), 2)


def compute_accuracy_by_type(results: list[dict[str, Any]]) -> dict[str, float]:
    by_type: dict[str, list[bool]] = {}
    for r in results:
        by_type.setdefault(r.get("type", "unknown"), []).append(r.get("correct", False))
    return {t: round(100 * sum(v) / len(v), 2) for t, v in by_type.items()}


def load_qa_questions(path: Path) -> list[dict[str, Any]]:
    return json.loads(path.read_text())


def save_results(results: list[dict[str, Any]], path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(results, indent=2))
  • [ ] Step 4: Run tests, verify pass

Run: cd main/server && python -m pytest worldmm/tests/test_evaluate_pipeline.py -v

  • [ ] Step 5: Commit

git add main/server/worldmm/pipeline/evaluate_pipeline.py main/server/worldmm/tests/test_evaluate_pipeline.py
git commit -m "feat(worldmm): add evaluation runner with three-strategy answer extraction"

Task 5: Main Orchestrator

Files: - Create: main/server/worldmm/pipeline/run.py

  • [ ] Step 1: Write run.py

The main orchestrator that runs all 8 pipeline steps in sequence. Takes --data-dir, --subject, --cache-dir, --gpu-worker-url, and --openai-api-key arguments.
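
One way the command-line interface could be declared is sketched below with argparse. The flag names come from the plan; every default value, the help text, and the fallback to an OPENAI_API_KEY environment variable are assumptions made for illustration.

```python
from __future__ import annotations
import argparse
import os
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="worldmm.pipeline.run",
        description="Run the WorldMM EgoLife validation pipeline.",
    )
    parser.add_argument("--data-dir", type=Path, required=True,
                        help="Root directory of the EgoLife data")
    # All defaults below are assumed, not taken from the plan.
    parser.add_argument("--subject", default="A1_JAKE",
                        help="Subject identifier, e.g. A1_JAKE")
    parser.add_argument("--cache-dir", type=Path, default=Path("cache"),
                        help="Where each step stores its cached output")
    parser.add_argument("--gpu-worker-url", default="http://localhost:8000",
                        help="Base URL of the VLM2Vec GPU worker")
    parser.add_argument("--openai-api-key",
                        default=os.environ.get("OPENAI_API_KEY"),
                        help="Falls back to the OPENAI_API_KEY environment variable")
    return parser


# Example invocation with an explicit argument list instead of sys.argv.
args = build_parser().parse_args(["--data-dir", "/data/egolife", "--subject", "A1_JAKE"])
```

Reading the key from the environment by default keeps it out of shell history; passing it on the command line still works when needed.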

Each step checks for cached output before running. Uses submit_batch_and_wait() to handle OpenAI Batch API lifecycle (upload JSONL, create batch, poll until complete, download results).
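
The two behaviours described above might be sketched as follows. This is not the plan's actual implementation: load_or_compute is a hypothetical helper name, and the signature of submit_batch_and_wait is assumed. The client argument is expected to behave like an openai.OpenAI instance, and each request dict is assumed to already be a chat-completions request body.

```python
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any, Callable


def load_or_compute(cache_path: Path, compute: Callable[[], Any]) -> Any:
    """Return the cached JSON if present, otherwise compute and cache it."""
    if cache_path.exists():
        return json.loads(cache_path.read_text(encoding="utf-8"))
    result = compute()
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_path.write_text(json.dumps(result, indent=2), encoding="utf-8")
    return result


def submit_batch_and_wait(
    client: Any,
    requests: list[dict[str, Any]],
    custom_ids: list[str],
    poll_seconds: float = 30.0,
) -> dict[str, str | None]:
    """Upload JSONL, create a batch, poll until it ends, return text by custom_id."""
    # 1. Upload: one JSONL line per request in the Batch API envelope.
    lines = [
        json.dumps({"custom_id": cid, "method": "POST",
                    "url": "/v1/chat/completions", "body": body})
        for cid, body in zip(custom_ids, requests)
    ]
    data = "\n".join(lines).encode("utf-8")
    uploaded = client.files.create(file=("batch.jsonl", data), purpose="batch")

    # 2. Create the batch job.
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )

    # 3. Poll until the batch reaches a terminal status.
    while batch.status not in {"completed", "failed", "expired", "cancelled"}:
        time.sleep(poll_seconds)
        batch = client.batches.retrieve(batch.id)
    if batch.status != "completed":
        raise RuntimeError(f"batch {batch.id} ended with status {batch.status}")

    # 4. Download results; requests without a usable response stay None.
    results: dict[str, str | None] = {cid: None for cid in custom_ids}
    for line in client.files.content(batch.output_file_id).text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        body = (record.get("response") or {}).get("body") or {}
        choices = body.get("choices") or []
        if choices:
            results[record["custom_id"]] = choices[0]["message"]["content"]
    return results
```

The returned mapping has the same shape that parse_translation_results expects for its batch_results argument, so a step could wrap the whole call in load_or_compute to skip finished work on a rerun.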

Key flow:
1. Translate DenseCaptions (batch API, per-day caching)
2. Generate sync data + fine captions (batch API)
3. Multi-scale merging (batch API, per-scale caching)
4. Episodic triple extraction: NER batch then triple batch
5. Semantic extraction (batch API)
6. Visual encoding (GPU worker, incremental caching)
7. Load QA questions, build context from indexed memories, query GPT-5-mini, extract answers, compute accuracy
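
The final evaluation step of the flow could be wired up along these lines. This is a sketch under stated assumptions: evaluate_questions is a hypothetical name, the question keys ("question", "choices", "answer", "type") are an assumed schema rather than the real QA file format, and context retrieval from the indexed memories is left out. The model call and the answer extractor are passed in as callables so the loop can be exercised without network access; the two sample questions are invented purely for the demonstration.

```python
from __future__ import annotations
from typing import Any, Callable


def evaluate_questions(
    questions: list[dict[str, Any]],
    ask_model: Callable[[str], str],
    extract_answer: Callable[[str, dict[str, str]], str | None],
) -> list[dict[str, Any]]:
    """Ask each multiple-choice question and record whether the answer matched."""
    results = []
    for q in questions:
        options = "\n".join(f"{letter}. {text}" for letter, text in q["choices"].items())
        prompt = f"{q['question']}\n{options}\nAnswer with a single letter."
        predicted = extract_answer(ask_model(prompt), q["choices"])
        results.append({
            "type": q.get("type", "unknown"),
            "predicted": predicted,
            "correct": predicted == q["answer"],
        })
    return results


def _stub_model(prompt: str) -> str:
    # Stand-in for the real API call; always answers "B.".
    return "B."


def _first_letter(response: str, choices: dict[str, str]) -> str | None:
    # Simplified stand-in for extract_answer from evaluate_pipeline.py.
    letter = response.strip()[:1].upper()
    return letter if letter in choices else None


choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
questions = [
    {"question": "Who held the stopwatch?", "choices": choices,
     "answer": "B", "type": "EntityLog"},
    {"question": "Who usually cooks?", "choices": choices,
     "answer": "C", "type": "HabitInsight"},
]
results = evaluate_questions(questions, _stub_model, _first_letter)
accuracy = 100 * sum(r["correct"] for r in results) / len(results)
```

The result dicts carry the "type" and "correct" keys that compute_accuracy and compute_accuracy_by_type read.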

  • [ ] Step 2: Verify imports

Run: cd main/server && python -c "from worldmm.pipeline.run import main; print('OK')"

  • [ ] Step 3: Verify help

Run: cd main/server && python -m worldmm.pipeline.run --help

  • [ ] Step 4: Commit
git add main/server/worldmm/pipeline/run.py
git commit -m "feat(worldmm): add main pipeline orchestrator for EgoLife validation"

Task 6: GPU Worker Server

Files:
- Create: main/server/worldmm/gpu_worker/__init__.py
- Create: main/server/worldmm/gpu_worker/server.py
- Create: main/server/worldmm/gpu_worker/setup.sh

  • [ ] Step 1: Write server.py

FastAPI server with three endpoints:
- GET /health: returns model status
- POST /encode-video: accepts {"frames": [base64...]}, returns {"embedding": [float...]}
- POST /encode-text: accepts {"text": "query"}, returns {"embedding": [float...]}
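
From the caller's side (for example visual_runner.py), the /encode-video contract above might be exercised as in the sketch below. It uses only the standard library; the function names, the timeout, and the choice to send raw image bytes as the base64 payload are assumptions, since the plan specifies only the JSON shapes.

```python
from __future__ import annotations
import base64
import json
import urllib.request


def build_encode_video_payload(frames: list[bytes]) -> bytes:
    """Serialize raw frame bytes as {"frames": [base64, ...]}."""
    encoded = [base64.b64encode(frame).decode("ascii") for frame in frames]
    return json.dumps({"frames": encoded}).encode("utf-8")


def parse_embedding(response_body: bytes) -> list[float]:
    """Read the {"embedding": [...]} response into a list of floats."""
    return [float(x) for x in json.loads(response_body)["embedding"]]


def encode_video(worker_url: str, frames: list[bytes], timeout: float = 60.0) -> list[float]:
    """POST frames to the worker and return the embedding (needs a running worker)."""
    request = urllib.request.Request(
        url=f"{worker_url.rstrip('/')}/encode-video",
        data=build_encode_video_payload(frames),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_embedding(response.read())
```

Keeping payload building and response parsing separate from the network call lets both be unit tested without a GPU instance.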

Uses Qwen2-VL-2B-Instruct as base + TIGER-Lab/VLM2Vec-LoRA adapters. Model loaded once at startup into GPU memory.

  • [ ] Step 2: Write setup.sh

Instance setup script: installs Python deps, downloads model weights to local disk.

  • [ ] Step 3: Commit
git add main/server/worldmm/gpu_worker/
git commit -m "feat(worldmm): add VLM2Vec GPU worker server and setup script"

Task 7: Full Integration Verification

  • [ ] Step 1: Run full test suite

Run: cd main/server && python -m pytest worldmm/tests/ -v --tb=short
Expected: All tests PASS

  • [ ] Step 2: Verify pipeline help

Run: cd main/server && python -m worldmm.pipeline.run --help

  • [ ] Step 3: Final commit if fixes needed