WorldMM Vertical Slice — EgoLife Validation Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Process EgoLife data through the full WorldMM pipeline and validate accuracy on 500 QA questions against the paper's 65.6%.
Architecture: Local Python pipeline orchestrator processes EgoLife data via OpenAI Batch API (translation, captioning, NER, triples, merging, semantic extraction) and a remote GPU worker (VLM2Vec visual embeddings). Evaluation runs QA questions with direct GPT-5 API calls.
Tech Stack: Python 3.10+, OpenAI Batch API, FastAPI (GPU worker), VLM2Vec-V2, pysrt, ffmpeg, igraph, numpy, pytest
Spec: docs/superpowers/specs/2026-03-22-worldmm-vertical-slice-design.md
File Structure
main/server/worldmm/
+-- pipeline/
| +-- __init__.py
| +-- srt_parser.py # Parse SRT files, extract timestamp + text entries
| +-- timestamps.py # Timestamp utilities (HHMMSS, query_time format)
| +-- translate.py # Step 1: batch translate Chinese DenseCaptions
| +-- sync.py # Step 2: merge captions + transcripts, match to video files
| +-- fine_caption.py # Step 3: rewrite into first-person 30s captions
| +-- multiscale_runner.py # Step 4: multi-scale merging (3min, 10min, 1hr)
| +-- triple_runner.py # Step 5: NER + episodic triple extraction
| +-- semantic_runner.py # Step 6: semantic extraction + consolidation
| +-- visual_runner.py # Step 7: VLM2Vec encoding via GPU worker
| +-- evaluate_pipeline.py # Step 8: QA evaluation with answer extraction
| +-- run.py # Main orchestrator
+-- gpu_worker/
| +-- __init__.py
| +-- server.py # FastAPI VLM2Vec server
| +-- setup.sh # Instance setup script
+-- tests/
+-- test_srt_parser.py
+-- test_timestamps.py
+-- test_translate.py
+-- test_evaluate_pipeline.py
+-- fixtures/
+-- sample.srt
Task 1: SRT Parser + Timestamp Utilities
Files:
- Create: main/server/worldmm/pipeline/__init__.py
- Create: main/server/worldmm/pipeline/srt_parser.py
- Create: main/server/worldmm/pipeline/timestamps.py
- Create: main/server/worldmm/tests/fixtures/sample.srt
- Test: main/server/worldmm/tests/test_srt_parser.py
- Test: main/server/worldmm/tests/test_timestamps.py
- [ ] Step 1: Create sample SRT fixture
Save to main/server/worldmm/tests/fixtures/sample.srt:
1
00:00:01,066 --> 00:00:02,166
We look at everyone

2
00:00:02,166 --> 00:00:03,533
We continue looking at everyone

3
00:00:03,766 --> 00:00:06,866
We sit and keep looking at everyone
- [ ] Step 2: Write the failing tests
# main/server/worldmm/tests/test_srt_parser.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

FIXTURES = Path(__file__).parent / "fixtures"


def test_parse_srt_returns_entries():
    from worldmm.pipeline.srt_parser import parse_srt
    entries = parse_srt(FIXTURES / "sample.srt")
    assert len(entries) == 3
    assert entries[0]["text"] == "We look at everyone"
    assert entries[0]["start_seconds"] == 1.066
    assert entries[0]["end_seconds"] == 2.166


def test_parse_srt_with_hour_offset():
    from worldmm.pipeline.srt_parser import parse_srt
    entries = parse_srt(FIXTURES / "sample.srt", hour_offset=11)
    assert entries[0]["start_hhmmss"] == "110001"
    assert entries[0]["end_hhmmss"] == "110002"


def test_parse_bilingual_transcript():
    from worldmm.pipeline.srt_parser import parse_bilingual_transcript
    text = "Jake: Good then a stopwatch\nJake: Okay, then we need a stopwatch."
    speaker, line1, line2 = parse_bilingual_transcript(text)
    assert speaker == "Jake"
    assert line2 == "Okay, then we need a stopwatch."


def test_extract_hour_from_filename():
    from worldmm.pipeline.srt_parser import extract_hour_from_filename
    assert extract_hour_from_filename("A1_JAKE_DAY1_11000000.srt") == 11
    assert extract_hour_from_filename("A1_JAKE_DAY6_19000000.srt") == 19
# main/server/worldmm/tests/test_timestamps.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_make_query_time():
    from worldmm.pipeline.timestamps import make_query_time
    assert make_query_time("DAY1", "11210217") == 111210217
    assert make_query_time("DAY6", "11153417") == 611153417


def test_seconds_to_hhmmss():
    from worldmm.pipeline.timestamps import seconds_to_hhmmss
    assert seconds_to_hhmmss(3661.5, hour_offset=0) == "010101"
    assert seconds_to_hhmmss(1.066, hour_offset=11) == "110001"


def test_make_segment_timestamp():
    from worldmm.pipeline.timestamps import make_segment_timestamp
    assert make_segment_timestamp("DAY6", "11530000") == "611530000"
    assert make_segment_timestamp("DAY1", "12000000") == "112000000"


def test_compare_timestamps():
    from worldmm.pipeline.timestamps import compare_timestamps
    assert compare_timestamps("111210217", "611530000") < 0
    assert compare_timestamps("611530000", "111210217") > 0
    assert compare_timestamps("111210217", "111210217") == 0
- [ ] Step 3: Run tests to verify they fail
Run: cd main/server && python -m pytest worldmm/tests/test_srt_parser.py worldmm/tests/test_timestamps.py -v
Expected: FAIL
- [ ] Step 4: Write the implementations
# main/server/worldmm/pipeline/__init__.py
# (empty)

# main/server/worldmm/pipeline/srt_parser.py
from __future__ import annotations

import re
from pathlib import Path
from typing import Any


def parse_srt(path: Path, hour_offset: int = 0) -> list[dict[str, Any]]:
    text = path.read_text(encoding="utf-8")
    entries = []
    blocks = re.split(r"\n\n+", text.strip())
    for block in blocks:
        lines = block.strip().split("\n")
        if len(lines) < 3:
            continue
        time_match = re.match(
            r"(\d{2}):(\d{2}):(\d{2}),(\d{3})\s*-->\s*(\d{2}):(\d{2}):(\d{2}),(\d{3})",
            lines[1],
        )
        if not time_match:
            continue
        g = [int(x) for x in time_match.groups()]
        start_secs = g[0] * 3600 + g[1] * 60 + g[2] + g[3] / 1000
        end_secs = g[4] * 3600 + g[5] * 60 + g[6] + g[7] / 1000
        caption_text = "\n".join(lines[2:])
        start_h = hour_offset + g[0]
        end_h = hour_offset + g[4]
        entries.append({
            "text": caption_text,
            "start_seconds": round(start_secs, 3),
            "end_seconds": round(end_secs, 3),
            "start_hhmmss": f"{start_h:02d}{g[1]:02d}{g[2]:02d}",
            "end_hhmmss": f"{end_h:02d}{g[5]:02d}{g[6]:02d}",
        })
    return entries


def parse_bilingual_transcript(text: str) -> tuple[str, str, str]:
    lines = text.strip().split("\n")
    if len(lines) < 2:
        return "", text, ""

    def extract(line: str) -> tuple[str, str]:
        match = re.match(r"^(\w+):\s*(.+)$", line.strip())
        return (match.group(1), match.group(2)) if match else ("", line.strip())

    speaker1, text1 = extract(lines[0])
    speaker2, text2 = extract(lines[1])
    return speaker1 or speaker2, text1, text2


def extract_hour_from_filename(filename: str) -> int:
    match = re.search(r"_(\d{2})\d{6}\.", filename)
    if match:
        return int(match.group(1))
    match = re.search(r"_(\d{2})\d{6}$", filename)
    if match:
        return int(match.group(1))
    return 0
# main/server/worldmm/pipeline/timestamps.py
from __future__ import annotations


def make_query_time(date: str, time_str: str) -> int:
    day_num = date.replace("DAY", "")
    return int(f"{day_num}{time_str.zfill(8)}")


def seconds_to_hhmmss(seconds: float, hour_offset: int = 0) -> str:
    total_secs = int(seconds)
    h = hour_offset + total_secs // 3600
    m = (total_secs % 3600) // 60
    s = total_secs % 60
    return f"{h:02d}{m:02d}{s:02d}"


def make_segment_timestamp(date: str, end_time: str) -> str:
    day_num = date.replace("DAY", "")
    return f"{day_num}{end_time.zfill(8)}"


def compare_timestamps(a: str, b: str) -> int:
    ia, ib = int(a), int(b)
    return (ia > ib) - (ia < ib)
- [ ] Step 5: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_srt_parser.py worldmm/tests/test_timestamps.py -v
Expected: All 8 tests PASS
- [ ] Step 6: Commit
git add main/server/worldmm/pipeline/ main/server/worldmm/tests/test_srt_parser.py \
main/server/worldmm/tests/test_timestamps.py main/server/worldmm/tests/fixtures/sample.srt
git commit -m "feat(worldmm): add SRT parser and timestamp utilities"
Task 2: Translation Pipeline
Files:
- Create: main/server/worldmm/pipeline/translate.py
- Test: main/server/worldmm/tests/test_translate.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_translate.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_build_translation_requests():
    from worldmm.pipeline.translate import build_translation_requests
    entries = [
        {"text": "text1", "start_hhmmss": "110001", "end_hhmmss": "110002"},
        {"text": "text2", "start_hhmmss": "110002", "end_hhmmss": "110003"},
    ]
    requests, ids = build_translation_requests(entries, subject="A1_JAKE", date="DAY1")
    assert len(requests) == 2
    assert ids[0] == "0-A1_JAKE-DAY1-110001-110002"
    assert "text1" in str(requests[0]["messages"])


def test_parse_translation_results():
    from worldmm.pipeline.translate import parse_translation_results
    batch_results = {
        "0-A1_JAKE-DAY1-110001-110002": "I look at everyone",
        "1-A1_JAKE-DAY1-110002-110003": "I continue looking",
    }
    entries = [
        {"text": "cn1", "start_hhmmss": "110001", "end_hhmmss": "110002"},
        {"text": "cn2", "start_hhmmss": "110002", "end_hhmmss": "110003"},
    ]
    translated = parse_translation_results(batch_results, entries, "A1_JAKE", "DAY1")
    assert len(translated) == 2
    assert translated[0]["translated_text"] == "I look at everyone"


def test_save_load_translations(tmp_path):
    from worldmm.pipeline.translate import save_translations, load_translations
    data = [{"start_hhmmss": "110001", "translated_text": "hello"}]
    save_translations(data, tmp_path / "DAY1.json")
    assert load_translations(tmp_path / "DAY1.json") == data
- [ ] Step 2: Run test, verify fail
Run: cd main/server && python -m pytest worldmm/tests/test_translate.py -v
- [ ] Step 3: Write implementation
# main/server/worldmm/pipeline/translate.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from worldmm.llm.client import build_request


def build_translation_requests(
    entries: list[dict[str, Any]], subject: str, date: str,
) -> tuple[list[dict[str, Any]], list[str]]:
    requests, custom_ids = [], []
    for i, entry in enumerate(entries):
        custom_id = f"{i}-{subject}-{date}-{entry['start_hhmmss']}-{entry['end_hhmmss']}"
        req = build_request(
            model="gpt-5-mini",
            messages=[
                {"role": "system", "content": "Translate the following Chinese text to English. Return only the translation."},
                {"role": "user", "content": entry["text"]},
            ],
            max_completion_tokens=256,
            reasoning_effort="low",
        )
        requests.append(req)
        custom_ids.append(custom_id)
    return requests, custom_ids


def parse_translation_results(
    batch_results: dict[str, str | None], entries: list[dict[str, Any]], subject: str, date: str,
) -> list[dict[str, Any]]:
    translated = []
    for i, entry in enumerate(entries):
        custom_id = f"{i}-{subject}-{date}-{entry['start_hhmmss']}-{entry['end_hhmmss']}"
        translated.append({**entry, "translated_text": batch_results.get(custom_id, "") or ""})
    return translated


def save_translations(data: list[dict[str, Any]], path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2))


def load_translations(path: Path) -> list[dict[str, Any]]:
    return json.loads(path.read_text())
- [ ] Step 4: Run tests, verify pass
- [ ] Step 5: Commit
git add main/server/worldmm/pipeline/translate.py main/server/worldmm/tests/test_translate.py
git commit -m "feat(worldmm): add caption translation pipeline for batch API"
Task 3: Sync + Fine Caption + Multiscale + Triple + Semantic Runners
Files:
- Create: main/server/worldmm/pipeline/sync.py
- Create: main/server/worldmm/pipeline/fine_caption.py
- Create: main/server/worldmm/pipeline/multiscale_runner.py
- Create: main/server/worldmm/pipeline/triple_runner.py
- Create: main/server/worldmm/pipeline/semantic_runner.py
- Create: main/server/worldmm/pipeline/visual_runner.py
These are orchestration modules that wire together the existing core components (from Tasks 1-16 of the core plan). They don't need separate TDD since they delegate to already-tested functions. Write them, verify imports work, commit.
- [ ] Step 1: Write all runner files
Write each file following the implementations shown in the spec. Each file should import from the existing worldmm.memory.* and worldmm.llm.* modules and add pipeline-specific orchestration.
Key files:
- sync.py — merges translated captions + transcripts by timestamp
- fine_caption.py — groups entries into 30s windows, builds batch requests for first-person rewriting
- multiscale_runner.py — builds batch merge requests for 3min/10min/1hr scales
- triple_runner.py — builds batch NER + triple extraction requests
- semantic_runner.py — builds batch semantic extraction requests
- visual_runner.py — samples frames via ffmpeg, encodes via GPU worker client
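As an illustration of the fine_caption.py windowing step, grouping time-sorted caption entries into 30-second windows might look like the following sketch (the function name and the boundary policy of anchoring each window at its first entry are assumptions, not part of the existing modules):

```python
from __future__ import annotations

from typing import Any


def group_into_windows(
    entries: list[dict[str, Any]], window_seconds: float = 30.0
) -> list[list[dict[str, Any]]]:
    """Group entries (sorted by start_seconds) into fixed-size time windows.

    A new window opens when an entry starts at or after the current
    window's end; the window end is anchored to its first entry.
    """
    windows: list[list[dict[str, Any]]] = []
    current: list[dict[str, Any]] = []
    window_end: float | None = None
    for entry in entries:
        start = entry["start_seconds"]
        if window_end is None:
            window_end = start + window_seconds
        elif start >= window_end:
            windows.append(current)
            current = []
            window_end = start + window_seconds
        current.append(entry)
    if current:
        windows.append(current)
    return windows
```

Each resulting window would then be flattened into a single prompt for the first-person rewriting batch request.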
- [ ] Step 2: Verify imports
Run: cd main/server && python -c "from worldmm.pipeline import sync, fine_caption, multiscale_runner, triple_runner, semantic_runner, visual_runner; print('All runners OK')"
- [ ] Step 3: Commit
git add main/server/worldmm/pipeline/
git commit -m "feat(worldmm): add pipeline runner modules for sync, captions, triples, semantic, visual"
Task 4: Evaluation Runner (TDD)
Files:
- Create: main/server/worldmm/pipeline/evaluate_pipeline.py
- Test: main/server/worldmm/tests/test_evaluate_pipeline.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_evaluate_pipeline.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[2]
for p in (_server_root,):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_extract_answer_exact_match():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("Alice", choices) == "B"


def test_extract_answer_letter_extraction():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("The answer is (B)", choices) == "B"
    assert extract_answer("B.", choices) == "B"
    assert extract_answer("**B**", choices) == "B"
    assert extract_answer("Answer: C", choices) == "C"


def test_extract_answer_full_pattern():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("B. Alice", choices) == "B"
    assert extract_answer("(B) Alice", choices) == "B"


def test_extract_answer_no_match():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("I don't know the answer", choices) is None


def test_extract_answer_case_insensitive():
    from worldmm.pipeline.evaluate_pipeline import extract_answer
    choices = {"A": "Tasha", "B": "Alice", "C": "Shure", "D": "Lucia"}
    assert extract_answer("alice", choices) == "B"


def test_compute_accuracy():
    from worldmm.pipeline.evaluate_pipeline import compute_accuracy
    results = [{"correct": True}, {"correct": False}, {"correct": True}]
    assert abs(compute_accuracy(results) - 66.67) < 0.1


def test_compute_accuracy_by_type():
    from worldmm.pipeline.evaluate_pipeline import compute_accuracy_by_type
    results = [
        {"type": "EntityLog", "correct": True},
        {"type": "EntityLog", "correct": False},
        {"type": "HabitInsight", "correct": True},
    ]
    by_type = compute_accuracy_by_type(results)
    assert abs(by_type["EntityLog"] - 50.0) < 0.1
    assert abs(by_type["HabitInsight"] - 100.0) < 0.1
- [ ] Step 2: Run test, verify fail
- [ ] Step 3: Write implementation
# main/server/worldmm/pipeline/evaluate_pipeline.py
from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any


def extract_answer(response: str, choices: dict[str, str]) -> str | None:
    response_norm = response.strip().lower()
    # Strategy 1: exact match against a choice value
    for letter, value in choices.items():
        if response_norm == value.strip().lower():
            return letter
    # Strategy 2: bare letter extraction
    patterns = [
        r"\(([A-D])\)", r"\*\*([A-D])\*\*", r"[Aa]nswer:\s*([A-D])",
        r"^([A-D])\.", r"^([A-D])\s*$", r"^([A-D]):",
    ]
    for pattern in patterns:
        match = re.search(pattern, response.strip())
        if match:
            return match.group(1).upper()
    # Strategy 3: full "letter + value" pattern
    for letter, value in choices.items():
        for pat in [rf"{letter}\.\s*{re.escape(value)}", rf"\({letter}\)\s*{re.escape(value)}"]:
            if re.search(pat, response, re.IGNORECASE):
                return letter
    return None


def compute_accuracy(results: list[dict[str, Any]]) -> float:
    if not results:
        return 0.0
    return round(100 * sum(1 for r in results if r.get("correct")) / len(results), 2)


def compute_accuracy_by_type(results: list[dict[str, Any]]) -> dict[str, float]:
    by_type: dict[str, list[bool]] = {}
    for r in results:
        by_type.setdefault(r.get("type", "unknown"), []).append(r.get("correct", False))
    return {t: round(100 * sum(v) / len(v), 2) for t, v in by_type.items()}


def load_qa_questions(path: Path) -> list[dict[str, Any]]:
    return json.loads(path.read_text())


def save_results(results: list[dict[str, Any]], path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(results, indent=2))
- [ ] Step 4: Run tests, verify pass
- [ ] Step 5: Commit
git add main/server/worldmm/pipeline/evaluate_pipeline.py main/server/worldmm/tests/test_evaluate_pipeline.py
git commit -m "feat(worldmm): add evaluation runner with three-strategy answer extraction"
Task 5: Main Orchestrator
Files:
- Create: main/server/worldmm/pipeline/run.py
- [ ] Step 1: Write run.py
The main orchestrator that runs all 8 pipeline steps in sequence. Takes --data-dir, --subject, --cache-dir, --gpu-worker-url, and --openai-api-key arguments.
Each step checks for cached output before running. Uses submit_batch_and_wait() to handle OpenAI Batch API lifecycle (upload JSONL, create batch, poll until complete, download results).
Key flow:
1. Translate DenseCaptions (batch API, per-day caching)
2. Generate sync data + fine captions (batch API)
3. Multi-scale merging (batch API, per-scale caching)
4. Episodic triple extraction: NER batch then triple batch
5. Semantic extraction (batch API)
6. Visual encoding (GPU worker, incremental caching)
7. Load QA questions, build context from indexed memories, query GPT-5-mini, extract answers, compute accuracy
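submit_batch_and_wait() is assumed to wrap the Batch API lifecycle end to end; its first step, serializing the prepared requests into an input JSONL file, could be sketched as follows (the helper name to_batch_jsonl is hypothetical; the per-line shape follows the OpenAI Batch API input format for /v1/chat/completions):

```python
from __future__ import annotations

import json
from typing import Any


def to_batch_jsonl(requests: list[dict[str, Any]], custom_ids: list[str]) -> str:
    """Serialize chat-completion request bodies into Batch API input JSONL.

    Each output line pairs a caller-supplied custom_id with one request
    body, which is how results are matched back after the batch completes.
    """
    lines = []
    for custom_id, body in zip(custom_ids, requests):
        lines.append(json.dumps({
            "custom_id": custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": body,
        }))
    return "\n".join(lines)
```

The custom_ids here are the same ones produced by build_translation_requests() and friends, which is what lets each cached step map batch results back to its entries.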
- [ ] Step 2: Verify imports
Run: cd main/server && python -c "from worldmm.pipeline.run import main; print('OK')"
- [ ] Step 3: Verify help
Run: cd main/server && python -m worldmm.pipeline.run --help
- [ ] Step 4: Commit
git add main/server/worldmm/pipeline/run.py
git commit -m "feat(worldmm): add main pipeline orchestrator for EgoLife validation"
Task 6: GPU Worker Server
Files:
- Create: main/server/worldmm/gpu_worker/__init__.py
- Create: main/server/worldmm/gpu_worker/server.py
- Create: main/server/worldmm/gpu_worker/setup.sh
- [ ] Step 1: Write server.py
FastAPI server with three endpoints:
- GET /health — returns model status
- POST /encode-video — accepts {"frames": [base64...]}, returns {"embedding": [float...]}
- POST /encode-text — accepts {"text": "query"}, returns {"embedding": [float...]}
Uses Qwen2-VL-2B-Instruct as base + TIGER-Lab/VLM2Vec-LoRA adapters. Model loaded once at startup into GPU memory.
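On the client side (visual_runner.py), building the /encode-video request body is just base64-encoding the sampled frames; a minimal sketch, with a hypothetical helper name (only the {"frames": [base64...]} shape comes from the endpoint contract above):

```python
from __future__ import annotations

import base64
from typing import Any


def build_encode_video_payload(frames: list[bytes]) -> dict[str, Any]:
    """Base64-encode raw frame bytes (e.g. ffmpeg JPEG output) into the
    JSON body expected by the GPU worker's /encode-video endpoint."""
    return {"frames": [base64.b64encode(f).decode("ascii") for f in frames]}
```

The server decodes each string back to bytes before handing the frames to VLM2Vec.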
- [ ] Step 2: Write setup.sh
Instance setup script: installs Python deps, downloads model weights to local disk.
- [ ] Step 3: Commit
git add main/server/worldmm/gpu_worker/
git commit -m "feat(worldmm): add VLM2Vec GPU worker server and setup script"
Task 7: Full Integration Verification
- [ ] Step 1: Run full test suite
Run: cd main/server && python -m pytest worldmm/tests/ -v --tb=short
Expected: All tests PASS
- [ ] Step 2: Verify pipeline help
Run: cd main/server && python -m worldmm.pipeline.run --help
- [ ] Step 3: Final commit if fixes needed