WorldMM Live Memory System — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Implement the WorldMM paper's multi-granularity memory architecture (episodic, semantic, visual + adaptive reasoning agent) with TDD, using GPT-5 mini via Batch API for testing.

Architecture: Lambda + GPU Worker split. All GPT-5 mini calls go through a unified LLM client that dispatches to Batch API (test) or Direct API (prod). VLM2Vec runs on a separate GPU worker. PostgreSQL + pgvector stores graphs, triples, and embeddings. igraph runs PPR in-memory at query time.
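To make the query-time PPR step concrete, here is a minimal pure-Python sketch of personalized PageRank — the same computation the plan delegates to igraph's `personalized_pagerank()`. Node and edge names are illustrative only, not the real schema.

```python
# Minimal personalized-PageRank sketch: random walk with restarts to a seed
# distribution. igraph performs this natively; this shows the computation.
def personalized_pagerank(edges, seeds, damping=0.85, iters=50):
    nodes = sorted({n for e in edges for n in e})
    out = {n: [] for n in nodes}
    for src, dst in edges:
        out[src].append(dst)
    # Restart distribution concentrated on the query's seed entities
    reset = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    rank = dict(reset)
    for _ in range(iters):
        new = {n: (1.0 - damping) * reset[n] for n in nodes}
        for n in nodes:
            if out[n]:
                share = damping * rank[n] / len(out[n])
                for dst in out[n]:
                    new[dst] += share
            else:
                # Dangling nodes return their mass to the restart distribution
                for m in nodes:
                    new[m] += damping * rank[n] * reset[m]
        rank = new
    return rank

edges = [("I", "Katrina"), ("Katrina", "schedule"), ("I", "dining table")]
rank = personalized_pagerank(edges, seeds={"Katrina"})  # seeded at Katrina
```

Seeding at "Katrina" concentrates score on her neighborhood ("schedule") while unrelated nodes ("dining table") score near zero — this locality is what makes PPR useful for entity-anchored retrieval.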

Tech Stack: Python 3.12, SQLAlchemy + pgvector, igraph, OpenAI API (Batch + Direct), FastAPI (GPU worker), pytest

Spec: docs/superpowers/specs/2026-03-22-worldmm-live-memory-design.md


File Structure

main/server/worldmm/
├── __init__.py
├── ingest/
│   ├── __init__.py
│   ├── captioner.py           # build_caption_prompt(), parse_caption_response()
│   └── transcriber.py         # transcribe_audio() — Whisper API client
├── memory/
│   ├── __init__.py
│   ├── episodic/
│   │   ├── __init__.py
│   │   ├── graph.py           # EpisodicGraph: build igraph from triples, run PPR
│   │   ├── openie.py          # build_ner_prompt(), parse_ner(), build_triple_prompt(), parse_triples()
│   │   └── multiscale.py      # build_merge_prompt(), parse_merge_response()
│   ├── semantic/
│   │   ├── __init__.py
│   │   ├── graph.py           # SemanticGraph: single evolving entity graph, PPR with edge scoring
│   │   ├── extraction.py      # build_semantic_triple_prompt(), parse_semantic_triples()
│   │   └── consolidation.py   # find_candidates(), build_consolidation_prompt(), apply_consolidation()
│   └── visual/
│       ├── __init__.py
│       ├── encoder.py         # VLM2VecClient: HTTP calls to GPU worker
│       └── index.py           # query_visual_embeddings() — pgvector cosine similarity
├── retrieval/
│   ├── __init__.py
│   ├── agent.py               # ReasoningAgent: SEARCH/ANSWER loop, up to 5 rounds
│   ├── episodic_retriever.py  # retrieve_episodic(): PPR across scales + cross-scale rerank
│   ├── semantic_retriever.py  # retrieve_semantic(): PPR on entity graph
│   └── visual_retriever.py    # retrieve_visual(): VLM2Vec text query → cosine sim
├── llm/
│   ├── __init__.py
│   ├── client.py              # LLMClient: build_request(), call(), supports batch/direct
│   ├── batch.py               # BatchClient: submit(), poll(), collect_results()
│   └── prompts/
│       ├── caption.txt
│       ├── ner.txt
│       ├── episodic_triples.txt
│       ├── semantic_triples.txt
│       ├── merge_captions.txt
│       ├── consolidation.txt
│       ├── cross_scale_rerank.txt
│       ├── reasoning_agent.txt
│       ├── response.txt
│       └── entity_confirm.txt
└── tests/
    ├── __init__.py
    ├── conftest.py            # shared fixtures, mock LLM helper
    ├── fixtures/              # EgoLife-derived canned data
    │   ├── sample_caption.txt
    │   ├── sample_ner_response.json
    │   ├── sample_triples_response.json
    │   └── sample_semantic_triples_response.json
    ├── test_llm_client.py
    ├── test_openie.py
    ├── test_entity_resolution.py
    ├── test_captioner.py
    ├── test_multiscale.py
    ├── test_episodic_graph.py
    ├── test_semantic_graph.py
    ├── test_consolidation.py
    ├── test_visual_encoder.py
    ├── test_episodic_retriever.py
    ├── test_semantic_retriever.py
    ├── test_visual_retriever.py
    └── test_retrieval_agent.py

ORM models (shared layer):
  • main/server/layers/shared/python/shared/orm/worldmm_orm.py

Entity resolution:
  • main/server/worldmm/memory/episodic/entity_resolution.py  # resolve_entities(): embed → HNSW → LLM confirm → create/alias

Deferred Components (Infrastructure, not algorithm)

These are in the spec but deferred until the algorithm is validated via TDD:

  • ingest/segment_buffer.py — S3 + DynamoDB frame buffering (Lambda infrastructure)
  • ingest/app.py — Lambda handler (deployment infrastructure)
  • ingest/transcriber.py — Whisper API client (integration, not algorithm)
  • gpu_worker/vlm2vec_server.py + Dockerfile — VLM2Vec FastAPI server (mocked in tests)
  • Redis graph caching — performance optimization

These will be added in a follow-up plan after the core algorithm passes TDD validation.



Task 1: ORM Models

Files:
  • Create: main/server/layers/shared/python/shared/orm/worldmm_orm.py
  • Create: main/server/worldmm/__init__.py
  • Create: main/server/worldmm/tests/__init__.py
  • Create: main/server/worldmm/tests/conftest.py
  • Test: main/server/worldmm/tests/test_orm.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_orm.py
from __future__ import annotations

import sys
from pathlib import Path

import pytest

_server_root = Path(__file__).resolve().parents[2]  # .../main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def _setup_db():
    import os
    os.environ.setdefault("DATABASE_URL", "sqlite:///:memory:")
    import shared.orm.orm as orm
    orm.getEngine.cache_clear()
    orm.getSessionFactory.cache_clear()
    # Must import worldmm_orm so models are registered on Base
    import shared.orm.worldmm_orm  # noqa: F401
    orm.initDb()


def test_create_segment():
    _setup_db()
    from shared.orm.worldmm_orm import WorldMMSegment, create_segment
    segment_id = create_segment(
        user_id="user-1",
        start_time="2026-03-22T10:00:00Z",
        end_time="2026-03-22T10:00:30Z",
        duration_seconds=30,
        caption="I stand at the dining table",
        s3_video_key="worldmm/user-1/clips/001.mp4",
        transcript="I'm standing at the dining table",
    )
    assert segment_id is not None
    assert len(segment_id) == 36  # UUID format


def test_create_entity_canonical():
    _setup_db()
    from shared.orm.worldmm_orm import create_entity
    entity_id = create_entity(
        user_id="user-1",
        surface_form="Katrina",
        canonical_name="Katrina",
        canonical_entity_id=None,
    )
    assert entity_id is not None


def test_create_entity_alias():
    _setup_db()
    from shared.orm.worldmm_orm import create_entity
    canonical_id = create_entity(
        user_id="user-1",
        surface_form="Katrina",
        canonical_name="Katrina",
        canonical_entity_id=None,
    )
    alias_id = create_entity(
        user_id="user-1",
        surface_form="Kate",
        canonical_name=None,
        canonical_entity_id=canonical_id,
    )
    assert alias_id != canonical_id


def test_create_triple_with_entity_object():
    _setup_db()
    from shared.orm.worldmm_orm import create_entity, create_segment, create_triple
    seg_id = create_segment(
        user_id="user-1",
        start_time="2026-03-22T10:00:00Z",
        end_time="2026-03-22T10:00:30Z",
        duration_seconds=30,
        caption="test",
    )
    subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
    obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
    triple_id = create_triple(
        segment_id=seg_id,
        user_id="user-1",
        memory_type="episodic",
        subject_entity_id=subj_id,
        predicate="talks to",
        object_entity_id=obj_id,
    )
    assert triple_id is not None


def test_create_triple_with_literal_object():
    _setup_db()
    from shared.orm.worldmm_orm import create_entity, create_segment, create_triple
    seg_id = create_segment(
        user_id="user-1",
        start_time="2026-03-22T10:00:00Z",
        end_time="2026-03-22T10:00:30Z",
        duration_seconds=30,
        caption="test",
    )
    subj_id = create_entity(user_id="user-1", surface_form="meeting", canonical_name="meeting")
    triple_id = create_triple(
        segment_id=seg_id,
        user_id="user-1",
        memory_type="episodic",
        subject_entity_id=subj_id,
        predicate="started_at",
        object_literal="2:30 PM",
    )
    assert triple_id is not None


def test_create_triple_rejects_both_object_types():
    _setup_db()
    from shared.orm.worldmm_orm import create_entity, create_segment, create_triple
    seg_id = create_segment(
        user_id="user-1",
        start_time="2026-03-22T10:00:00Z",
        end_time="2026-03-22T10:00:30Z",
        duration_seconds=30,
        caption="test",
    )
    subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
    obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
    with pytest.raises(ValueError, match="exactly one"):
        create_triple(
            segment_id=seg_id,
            user_id="user-1",
            memory_type="episodic",
            subject_entity_id=subj_id,
            predicate="talks to",
            object_entity_id=obj_id,
            object_literal="some text",
        )


def test_get_triples_for_user():
    _setup_db()
    from shared.orm.worldmm_orm import (
        create_entity, create_segment, create_triple, get_triples_for_user,
    )
    seg_id = create_segment(
        user_id="user-1",
        start_time="2026-03-22T10:00:00Z",
        end_time="2026-03-22T10:00:30Z",
        duration_seconds=30,
        caption="test",
    )
    subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
    obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
    create_triple(
        segment_id=seg_id, user_id="user-1", memory_type="episodic",
        subject_entity_id=subj_id, predicate="talks to", object_entity_id=obj_id,
    )
    create_triple(
        segment_id=seg_id, user_id="user-1", memory_type="semantic",
        subject_entity_id=subj_id, predicate="is friends with", object_entity_id=obj_id,
    )
    episodic = get_triples_for_user("user-1", "episodic")
    semantic = get_triples_for_user("user-1", "semantic")
    assert len(episodic) == 1
    assert len(semantic) == 1
    assert episodic[0]["predicate"] == "talks to"


def test_invalidate_triple():
    _setup_db()
    from shared.orm.worldmm_orm import (
        create_entity, create_segment, create_triple,
        invalidate_triple, get_triples_for_user,
    )
    seg_id = create_segment(
        user_id="user-1",
        start_time="2026-03-22T10:00:00Z",
        end_time="2026-03-22T10:00:30Z",
        duration_seconds=30,
        caption="test",
    )
    subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
    obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
    triple_id = create_triple(
        segment_id=seg_id, user_id="user-1", memory_type="semantic",
        subject_entity_id=subj_id, predicate="is friends with", object_entity_id=obj_id,
    )
    invalidate_triple(triple_id)
    active = get_triples_for_user("user-1", "semantic")
    assert len(active) == 0
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_orm.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'shared.orm.worldmm_orm'

  • [ ] Step 3: Create package init files
# main/server/worldmm/__init__.py
# (empty)

# main/server/worldmm/tests/__init__.py
# (empty)

# main/server/worldmm/tests/conftest.py
# (empty for now — fixtures added in later tasks)
  • [ ] Step 4: Write ORM models
# main/server/layers/shared/python/shared/orm/worldmm_orm.py
from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import CheckConstraint, Column, ForeignKey, Integer, String, Text

from .orm import Base, getSession


class WorldMMSegment(Base):
    __tablename__ = "worldmm_segments"

    # Plain Column assignments (no type annotations): SQLAlchemy 2.0 rejects
    # declarative annotations that are not wrapped in Mapped[...].
    id = Column(String, primary_key=True)
    user_id = Column(String, nullable=False)
    start_time = Column(String, nullable=False)
    end_time = Column(String, nullable=False)
    duration_seconds = Column(Integer, nullable=False)
    caption = Column(Text, nullable=True)
    s3_video_key = Column(String, nullable=True)
    s3_frames_key = Column(String, nullable=True)
    transcript = Column(Text, nullable=True)
    parent_segment_id = Column(String, ForeignKey("worldmm_segments.id"), nullable=True)


class WorldMMEntity(Base):
    __tablename__ = "worldmm_entities"

    id = Column(String, primary_key=True)
    user_id = Column(String, nullable=False)
    canonical_entity_id = Column(String, ForeignKey("worldmm_entities.id"), nullable=True)
    surface_form = Column(String, nullable=False)
    canonical_name = Column(String, nullable=True)
    # Note: embedding column (VECTOR(3072)) for OpenAI text-embedding-3-large
    # Omitted from SQLite tests; added via Alembic migration for PostgreSQL
    # Entity resolution tests mock the embedding lookup directly


class WorldMMTriple(Base):
    __tablename__ = "worldmm_triples"
    __table_args__ = (
        CheckConstraint(
            "(object_entity_id IS NOT NULL AND object_literal IS NULL) OR "
            "(object_entity_id IS NULL AND object_literal IS NOT NULL)",
            name="ck_triple_object_xor",
        ),
    )

    id = Column(String, primary_key=True)
    segment_id = Column(String, ForeignKey("worldmm_segments.id"), nullable=False)
    user_id = Column(String, nullable=False)
    memory_type = Column(String, nullable=False)  # "episodic" or "semantic"
    subject_entity_id = Column(String, ForeignKey("worldmm_entities.id"), nullable=False)
    predicate = Column(String, nullable=False)
    object_entity_id = Column(String, ForeignKey("worldmm_entities.id"), nullable=True)
    object_literal = Column(String, nullable=True)
    created_at = Column(String, nullable=False)
    invalidated_at = Column(String, nullable=True)


class WorldMMVisualEmbedding(Base):
    __tablename__ = "worldmm_visual_embeddings"

    id = Column(String, primary_key=True)
    segment_id = Column(String, ForeignKey("worldmm_segments.id"), nullable=False)
    user_id = Column(String, nullable=False)
    timestamp = Column(String, nullable=False)
    # Note: embedding column (Vector) added only when using PostgreSQL + pgvector
    # SQLite tests skip vector operations


def create_segment(
    user_id: str,
    start_time: str,
    end_time: str,
    duration_seconds: int,
    caption: str | None = None,
    s3_video_key: str | None = None,
    s3_frames_key: str | None = None,
    transcript: str | None = None,
    parent_segment_id: str | None = None,
) -> str:
    segment_id = str(uuid.uuid4())
    session = getSession()
    try:
        session.add(WorldMMSegment(
            id=segment_id,
            user_id=user_id,
            start_time=start_time,
            end_time=end_time,
            duration_seconds=duration_seconds,
            caption=caption,
            s3_video_key=s3_video_key,
            s3_frames_key=s3_frames_key,
            transcript=transcript,
            parent_segment_id=parent_segment_id,
        ))
        session.commit()
        return segment_id
    finally:
        session.close()


def create_entity(
    user_id: str,
    surface_form: str,
    canonical_name: str | None = None,
    canonical_entity_id: str | None = None,
) -> str:
    entity_id = str(uuid.uuid4())
    session = getSession()
    try:
        session.add(WorldMMEntity(
            id=entity_id,
            user_id=user_id,
            canonical_entity_id=canonical_entity_id,
            surface_form=surface_form,
            canonical_name=canonical_name,
        ))
        session.commit()
        return entity_id
    finally:
        session.close()


def create_triple(
    segment_id: str,
    user_id: str,
    memory_type: str,
    subject_entity_id: str,
    predicate: str,
    object_entity_id: str | None = None,
    object_literal: str | None = None,
) -> str:
    if (object_entity_id is None) == (object_literal is None):
        raise ValueError("Triple must have exactly one of object_entity_id or object_literal")
    triple_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    session = getSession()
    try:
        session.add(WorldMMTriple(
            id=triple_id,
            segment_id=segment_id,
            user_id=user_id,
            memory_type=memory_type,
            subject_entity_id=subject_entity_id,
            predicate=predicate,
            object_entity_id=object_entity_id,
            object_literal=object_literal,
            created_at=now,
            invalidated_at=None,
        ))
        session.commit()
        return triple_id
    finally:
        session.close()


def get_triples_for_user(
    user_id: str,
    memory_type: str,
) -> list[dict[str, Any]]:
    session = getSession()
    try:
        rows = (
            session.query(WorldMMTriple)
            .filter(
                WorldMMTriple.user_id == user_id,
                WorldMMTriple.memory_type == memory_type,
                WorldMMTriple.invalidated_at.is_(None),
            )
            .all()
        )
        return [
            {
                "id": r.id,
                "segment_id": r.segment_id,
                "subject_entity_id": r.subject_entity_id,
                "predicate": r.predicate,
                "object_entity_id": r.object_entity_id,
                "object_literal": r.object_literal,
            }
            for r in rows
        ]
    finally:
        session.close()


def invalidate_triple(triple_id: str) -> None:
    now = datetime.now(timezone.utc).isoformat()
    session = getSession()
    try:
        record = session.get(WorldMMTriple, triple_id)
        if record is None:
            raise ValueError("Triple not found")
        record.invalidated_at = now
        session.commit()
    finally:
        session.close()
  • [ ] Step 5: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_orm.py -v
Expected: All 8 tests PASS

  • [ ] Step 6: Commit
git add main/server/layers/shared/python/shared/orm/worldmm_orm.py \
        main/server/worldmm/__init__.py \
        main/server/worldmm/tests/__init__.py \
        main/server/worldmm/tests/conftest.py \
        main/server/worldmm/tests/test_orm.py
git commit -m "feat(worldmm): add ORM models for segments, entities, triples, visual embeddings"
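The object-XOR guard in `create_triple` is the load-bearing invariant of this task; a standalone restatement of the same predicate (helper name is illustrative, not part of the module):

```python
# Restates create_triple's guard: a triple is invalid when object_entity_id
# and object_literal are both set or both missing.
def violates_object_xor(object_entity_id, object_literal):
    return (object_entity_id is None) == (object_literal is None)

checks = {
    (None, None): violates_object_xor(None, None),            # neither → invalid
    ("e1", None): violates_object_xor("e1", None),            # entity only → ok
    (None, "2:30 PM"): violates_object_xor(None, "2:30 PM"),  # literal only → ok
    ("e1", "2:30 PM"): violates_object_xor("e1", "2:30 PM"),  # both → invalid
}
```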

Task 2: LLM Client (Batch/Direct Dispatch)

Files:
  • Create: main/server/worldmm/llm/__init__.py
  • Create: main/server/worldmm/llm/client.py
  • Test: main/server/worldmm/tests/test_llm_client.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_llm_client.py
from __future__ import annotations

import sys
from pathlib import Path
from unittest.mock import MagicMock

_server_root = Path(__file__).resolve().parents[2]  # .../main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_build_request_includes_reasoning_effort():
    from worldmm.llm.client import build_request
    req = build_request(
        model="gpt-5-mini",
        messages=[{"role": "user", "content": "hello"}],
        max_completion_tokens=256,
        reasoning_effort="low",
    )
    assert req["model"] == "gpt-5-mini"
    assert req["max_completion_tokens"] == 256
    assert req["reasoning_effort"] == "low"
    assert req["messages"] == [{"role": "user", "content": "hello"}]


def test_build_request_without_reasoning_effort():
    from worldmm.llm.client import build_request
    req = build_request(
        model="gpt-5",
        messages=[{"role": "user", "content": "hello"}],
        max_completion_tokens=1024,
    )
    assert "reasoning_effort" not in req


def test_call_direct_mode():
    from worldmm.llm.client import LLMClient
    mock_openai = MagicMock()
    mock_openai.chat.completions.create.return_value = MagicMock(
        choices=[MagicMock(message=MagicMock(content="test response"))]
    )
    client = LLMClient(mode="direct", openai_client=mock_openai)
    result = client.call(build_request_dict={
        "model": "gpt-5-mini",
        "messages": [{"role": "user", "content": "hi"}],
        "max_completion_tokens": 256,
        "reasoning_effort": "low",
    })
    assert result == "test response"
    mock_openai.chat.completions.create.assert_called_once()


def test_call_direct_passes_all_params():
    from worldmm.llm.client import LLMClient
    mock_openai = MagicMock()
    mock_openai.chat.completions.create.return_value = MagicMock(
        choices=[MagicMock(message=MagicMock(content="ok"))]
    )
    client = LLMClient(mode="direct", openai_client=mock_openai)
    request = {
        "model": "gpt-5-mini",
        "messages": [{"role": "user", "content": "hi"}],
        "max_completion_tokens": 256,
        "reasoning_effort": "low",
    }
    client.call(build_request_dict=request)
    call_kwargs = mock_openai.chat.completions.create.call_args[1]
    assert call_kwargs["model"] == "gpt-5-mini"
    assert call_kwargs["max_completion_tokens"] == 256
    assert call_kwargs["reasoning_effort"] == "low"
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_llm_client.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.llm'

  • [ ] Step 3: Write the implementation
# main/server/worldmm/llm/__init__.py
# (empty)

# main/server/worldmm/llm/client.py
from __future__ import annotations

from typing import Any


def build_request(
    model: str,
    messages: list[dict[str, Any]],
    max_completion_tokens: int,
    reasoning_effort: str | None = None,
) -> dict[str, Any]:
    request: dict[str, Any] = {
        "model": model,
        "messages": messages,
        "max_completion_tokens": max_completion_tokens,
    }
    if reasoning_effort is not None:
        request["reasoning_effort"] = reasoning_effort
    return request


class LLMClient:
    def __init__(self, mode: str, openai_client: Any) -> None:
        if mode not in ("direct", "batch"):
            raise ValueError(f"Invalid mode: {mode}")
        self._mode = mode
        self._openai = openai_client

    def call(self, build_request_dict: dict[str, Any]) -> str:
        if self._mode == "direct":
            response = self._openai.chat.completions.create(**build_request_dict)
            return response.choices[0].message.content
        raise NotImplementedError("Batch mode dispatch handled by BatchClient")
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_llm_client.py -v
Expected: All 4 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/llm/
git commit -m "feat(worldmm): add LLM client with batch/direct dispatch"
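`llm/batch.py` is deferred past this task, but as orientation: the OpenAI Batch API consumes JSONL files where each line pairs a caller-chosen `custom_id` with a request body. A sketch of how `BatchClient` could wrap a `build_request()` dict (the `custom_id` scheme and helper name are hypothetical):

```python
# Hypothetical sketch for llm/batch.py: wrap a build_request() dict in the
# OpenAI Batch API JSONL line format.
import json

def to_batch_line(custom_id: str, request: dict) -> str:
    return json.dumps({
        "custom_id": custom_id,   # caller-chosen id used to match results back
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": request,          # the same dict build_request() produces
    })

line = to_batch_line("seg-001-ner", {
    "model": "gpt-5-mini",
    "messages": [{"role": "user", "content": "hi"}],
    "max_completion_tokens": 256,
})
```

Because `body` is exactly the `build_request()` output, the same request dicts flow through both the direct and batch paths, which is the point of the unified client.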

Task 3: NER + Triple Extraction Parsing (OpenIE)

Files:
  • Create: main/server/worldmm/memory/__init__.py
  • Create: main/server/worldmm/memory/episodic/__init__.py
  • Create: main/server/worldmm/memory/episodic/openie.py
  • Create: main/server/worldmm/tests/fixtures/sample_ner_response.json
  • Create: main/server/worldmm/tests/fixtures/sample_triples_response.json
  • Test: main/server/worldmm/tests/test_openie.py

  • [ ] Step 1: Create test fixtures
// main/server/worldmm/tests/fixtures/sample_ner_response.json
{
  "entities": ["I", "Katrina", "dining table", "tomorrow's schedule"]
}

// main/server/worldmm/tests/fixtures/sample_triples_response.json
{
  "triples": [
    {"subject": "I", "predicate": "stand at", "object": "dining table"},
    {"subject": "Katrina", "predicate": "asks about", "object": "tomorrow's schedule"}
  ]
}
  • [ ] Step 2: Write the failing test
# main/server/worldmm/tests/test_openie.py
from __future__ import annotations

import json
import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[2]  # .../main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

FIXTURES = Path(__file__).parent / "fixtures"


def test_build_ner_prompt_includes_caption():
    from worldmm.memory.episodic.openie import build_ner_prompt
    messages = build_ner_prompt("I stand at the dining table while Katrina asks about tomorrow.")
    # Must be a list of message dicts
    assert isinstance(messages, list)
    assert any("dining table" in str(m.get("content", "")) for m in messages)


def test_parse_ner_response():
    from worldmm.memory.episodic.openie import parse_ner_response
    raw = (FIXTURES / "sample_ner_response.json").read_text()
    entities = parse_ner_response(raw)
    assert entities == ["I", "Katrina", "dining table", "tomorrow's schedule"]


def test_parse_ner_response_handles_plain_json():
    from worldmm.memory.episodic.openie import parse_ner_response
    entities = parse_ner_response('{"entities": ["Alice", "Bob"]}')
    assert entities == ["Alice", "Bob"]


def test_build_triple_prompt_includes_caption_and_entities():
    from worldmm.memory.episodic.openie import build_triple_prompt
    messages = build_triple_prompt(
        caption="I stand at the dining table",
        entities=["I", "dining table"],
    )
    assert isinstance(messages, list)
    content = str(messages)
    assert "dining table" in content
    assert "I" in content


def test_parse_triples_response():
    from worldmm.memory.episodic.openie import parse_triples_response
    raw = (FIXTURES / "sample_triples_response.json").read_text()
    triples = parse_triples_response(raw)
    assert len(triples) == 2
    assert triples[0] == {"subject": "I", "predicate": "stand at", "object": "dining table"}
    assert triples[1] == {"subject": "Katrina", "predicate": "asks about", "object": "tomorrow's schedule"}


def test_parse_triples_response_filters_invalid():
    from worldmm.memory.episodic.openie import parse_triples_response
    raw = json.dumps({"triples": [
        {"subject": "I", "predicate": "stand at", "object": "table"},
        {"subject": "I", "predicate": "does"},  # missing object — invalid
    ]})
    triples = parse_triples_response(raw)
    assert len(triples) == 1
  • [ ] Step 3: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_openie.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.memory'

  • [ ] Step 4: Write the implementation
# main/server/worldmm/memory/__init__.py
# (empty)

# main/server/worldmm/memory/episodic/__init__.py
# (empty)

# main/server/worldmm/memory/episodic/openie.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    # Fallback inline prompts for early development
    if name == "ner.txt":
        return "Extract named entities from the given paragraph. Return JSON: {\"entities\": [...]}"
    if name == "episodic_triples.txt":
        return (
            "Given the caption and extracted entities, extract RDF-style triples. "
            "Each triple must contain at least one named entity. "
            "Return JSON: {\"triples\": [{\"subject\": ..., \"predicate\": ..., \"object\": ...}, ...]}"
        )
    return ""


def build_ner_prompt(caption: str) -> list[dict[str, str]]:
    system = _load_prompt("ner.txt")
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": caption},
    ]


def parse_ner_response(raw: str) -> list[str]:
    data = json.loads(raw)
    return data.get("entities", [])


def build_triple_prompt(caption: str, entities: list[str]) -> list[dict[str, str]]:
    system = _load_prompt("episodic_triples.txt")
    user_content = f"Caption: {caption}\nEntities: {json.dumps(entities)}"
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_content},
    ]


def parse_triples_response(raw: str) -> list[dict[str, str]]:
    data = json.loads(raw)
    triples = data.get("triples", [])
    return [
        t for t in triples
        if all(k in t for k in ("subject", "predicate", "object"))
    ]
  • [ ] Step 5: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_openie.py -v
Expected: All 6 tests PASS

  • [ ] Step 6: Commit
git add main/server/worldmm/memory/ main/server/worldmm/tests/test_openie.py \
        main/server/worldmm/tests/fixtures/
git commit -m "feat(worldmm): add NER + triple extraction prompt building and parsing"
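One robustness gap worth anticipating: models sometimes wrap JSON output in markdown code fences, which would make `json.loads` in the parsers above fail. A hypothetical helper (not part of the task; the function name is illustrative) that strips fences before parsing:

```python
# Hypothetical pre-processing helper: strip markdown code fences from an LLM
# response before handing it to json.loads.
import json
import re

FENCE = "`" * 3  # built programmatically to avoid a literal fence here

def strip_json_fences(raw: str) -> str:
    pattern = FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE
    match = re.search(pattern, raw, re.DOTALL)
    return match.group(1) if match else raw.strip()

wrapped = FENCE + 'json\n{"entities": ["Alice"]}\n' + FENCE
data = json.loads(strip_json_fences(wrapped))
```

If this proves necessary in practice, calling it at the top of `parse_ner_response` and `parse_triples_response` keeps both parsers tolerant without changing their contracts.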

Task 4: Captioner (Prompt Building + Response Parsing)

Files:
  • Create: main/server/worldmm/ingest/__init__.py
  • Create: main/server/worldmm/ingest/captioner.py
  • Test: main/server/worldmm/tests/test_captioner.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_captioner.py
from __future__ import annotations

import base64
import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[2]  # .../main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_build_caption_prompt_includes_frames_and_transcript():
    from worldmm.ingest.captioner import build_caption_prompt
    # Create 8 fake base64 frames
    frames_b64 = [base64.b64encode(f"frame{i}".encode()).decode() for i in range(8)]
    transcript = "I'm standing at the dining table"
    messages = build_caption_prompt(frames_b64, transcript)

    assert isinstance(messages, list)
    # System message + user message
    assert len(messages) == 2
    user_msg = messages[1]
    assert user_msg["role"] == "user"
    # Content should be a list (multimodal: image_url parts + text)
    assert isinstance(user_msg["content"], list)
    # 8 image parts + 1 text part
    assert len(user_msg["content"]) == 9
    # Check image parts are image_url type
    image_parts = [p for p in user_msg["content"] if p.get("type") == "image_url"]
    assert len(image_parts) == 8
    # Check text part contains transcript
    text_parts = [p for p in user_msg["content"] if p.get("type") == "text"]
    assert len(text_parts) == 1
    assert "dining table" in text_parts[0]["text"]


def test_build_caption_prompt_rejects_wrong_frame_count():
    from worldmm.ingest.captioner import build_caption_prompt
    import pytest
    with pytest.raises(ValueError, match="exactly 8"):
        build_caption_prompt(["frame1", "frame2"], "transcript")


def test_parse_caption_response():
    from worldmm.ingest.captioner import parse_caption_response
    result = parse_caption_response("The person stands at a dining table talking to Katrina.")
    assert result == "The person stands at a dining table talking to Katrina."


def test_parse_caption_response_strips_whitespace():
    from worldmm.ingest.captioner import parse_caption_response
    result = parse_caption_response("  caption text  \n")
    assert result == "caption text"
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_captioner.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.ingest'

  • [ ] Step 3: Write the implementation
# main/server/worldmm/ingest/__init__.py
# (empty)

# main/server/worldmm/ingest/captioner.py
from __future__ import annotations

from pathlib import Path
from typing import Any

_PROMPTS_DIR = Path(__file__).resolve().parents[1] / "llm" / "prompts"


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    if name == "caption.txt":
        return (
            "You are a video captioning assistant. Given 8 frames from a 30-second video clip "
            "and an audio transcript, generate a detailed caption describing what is happening. "
            "Include people, objects, actions, and locations."
        )
    return ""


def build_caption_prompt(
    frames_b64: list[str],
    transcript: str,
) -> list[dict[str, Any]]:
    if len(frames_b64) != 8:
        raise ValueError("Captioner requires exactly 8 frames")

    system = _load_prompt("caption.txt")
    content_parts: list[dict[str, Any]] = []
    for frame in frames_b64:
        content_parts.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{frame}"},
        })
    content_parts.append({
        "type": "text",
        "text": f"Audio transcript: {transcript}",
    })

    return [
        {"role": "system", "content": system},
        {"role": "user", "content": content_parts},
    ]


def parse_caption_response(raw: str) -> str:
    return raw.strip()
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_captioner.py -v
Expected: All 4 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/ingest/ main/server/worldmm/tests/test_captioner.py
git commit -m "feat(worldmm): add captioner prompt building with multimodal frames"
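Outside pytest, the payload shape the captioner must emit can be sanity-checked standalone. This is a sketch of the OpenAI chat `image_url` multimodal content convention the tests assume; the frame strings are placeholders, not real base64 data:

```python
# Sketch of the message structure build_caption_prompt() returns.
# "frameN" stands in for a real base64-encoded JPEG.
frames = [f"frame{i}" for i in range(8)]
content = [
    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{f}"}}
    for f in frames
]
content.append({"type": "text", "text": "Audio transcript: hello"})
messages = [
    {"role": "system", "content": "captioning instructions"},
    {"role": "user", "content": content},
]
assert len(content) == 9  # 8 image parts + 1 text part
```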

Task 5: Multiscale Merging

Files: - Create: main/server/worldmm/memory/episodic/multiscale.py - Test: main/server/worldmm/tests/test_multiscale.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_multiscale.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_build_merge_prompt_includes_all_captions():
    from worldmm.memory.episodic.multiscale import build_merge_prompt
    captions = [
        "Caption 1: I enter the kitchen",
        "Caption 2: I open the fridge",
        "Caption 3: I take out milk",
        "Caption 4: I pour milk into a glass",
        "Caption 5: I drink the milk",
        "Caption 6: I wash the glass",
    ]
    messages = build_merge_prompt(captions, target_duration_seconds=180)
    assert isinstance(messages, list)
    content = str(messages)
    for cap in captions:
        assert cap in content
    assert "3-minute" in content or "180" in content


def test_build_merge_prompt_for_hour_scale():
    from worldmm.memory.episodic.multiscale import build_merge_prompt
    summaries = [f"Summary {i}" for i in range(6)]
    messages = build_merge_prompt(summaries, target_duration_seconds=3600)
    content = str(messages)
    assert "1-hour" in content or "3600" in content


def test_parse_merge_response():
    from worldmm.memory.episodic.multiscale import parse_merge_response
    result = parse_merge_response("The person went to the kitchen and drank milk.")
    assert result == "The person went to the kitchen and drank milk."


def test_build_merge_prompt_rejects_empty():
    from worldmm.memory.episodic.multiscale import build_merge_prompt
    import pytest
    with pytest.raises(ValueError, match="at least one"):
        build_merge_prompt([], target_duration_seconds=180)
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_multiscale.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.memory.episodic.multiscale'

  • [ ] Step 3: Write the implementation
# main/server/worldmm/memory/episodic/multiscale.py
from __future__ import annotations

from pathlib import Path
from typing import Any

_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"

_DURATION_LABELS = {
    180: "3-minute",
    600: "10-minute",
    3600: "1-hour",
}


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    if name == "merge_captions.txt":
        return (
            "You are a summarization assistant. Given a sequence of captions from consecutive "
            "video segments, merge them into a single coherent {duration_label} summary. "
            "Preserve key entities, actions, and temporal order. Be concise but complete."
        )
    return ""


def build_merge_prompt(
    captions: list[str],
    target_duration_seconds: int,
) -> list[dict[str, str]]:
    if not captions:
        raise ValueError("Merge requires at least one caption")

    label = _DURATION_LABELS.get(target_duration_seconds, f"{target_duration_seconds}-second")
    template = _load_prompt("merge_captions.txt")
    system = template.replace("{duration_label}", label)

    numbered = "\n".join(f"{i + 1}. {c}" for i, c in enumerate(captions))
    user_content = f"Merge these captions into a {label} summary:\n\n{numbered}"

    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_content},
    ]


def parse_merge_response(raw: str) -> str:
    return raw.strip()
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_multiscale.py -v
Expected: All 4 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/memory/episodic/multiscale.py \
        main/server/worldmm/tests/test_multiscale.py
git commit -m "feat(worldmm): add multi-scale caption merging prompts"

Task 6: Episodic Graph (igraph + PPR)

Files: - Create: main/server/worldmm/memory/episodic/graph.py - Test: main/server/worldmm/tests/test_episodic_graph.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_episodic_graph.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def _make_triples():
    """Create a small knowledge graph for testing PPR."""
    return [
        {"subject_entity_id": "e1", "predicate": "stand at", "object_entity_id": "e2",
         "object_literal": None, "segment_id": "s1", "id": "t1"},
        {"subject_entity_id": "e1", "predicate": "talks to", "object_entity_id": "e3",
         "object_literal": None, "segment_id": "s1", "id": "t2"},
        {"subject_entity_id": "e3", "predicate": "asks about", "object_entity_id": None,
         "object_literal": "tomorrow's schedule", "segment_id": "s1", "id": "t3"},
    ]


def _make_entity_names():
    return {"e1": "I", "e2": "dining table", "e3": "Katrina"}


def _make_segment_captions():
    return {"s1": "I stand at the dining table while Katrina asks about tomorrow's schedule"}


def test_build_graph_creates_nodes():
    from worldmm.memory.episodic.graph import EpisodicGraph
    g = EpisodicGraph()
    g.build(_make_triples(), _make_entity_names())
    # 3 entity nodes + 1 literal node
    assert g.node_count() == 4


def test_build_graph_creates_edges():
    from worldmm.memory.episodic.graph import EpisodicGraph
    g = EpisodicGraph()
    g.build(_make_triples(), _make_entity_names())
    assert g.edge_count() == 3


def test_ppr_from_seed_entity():
    from worldmm.memory.episodic.graph import EpisodicGraph
    g = EpisodicGraph()
    g.build(_make_triples(), _make_entity_names())
    ranked = g.ppr(seed_entity_ids=["e1"], top_k=5)
    # e1 is the seed, so it should have highest PPR score
    assert ranked[0]["entity_id"] == "e1"
    # All connected entities should appear
    entity_ids = {r["entity_id"] for r in ranked}
    assert "e3" in entity_ids  # directly connected


def test_ppr_returns_segment_ids():
    from worldmm.memory.episodic.graph import EpisodicGraph
    g = EpisodicGraph()
    g.build(_make_triples(), _make_entity_names())
    ranked = g.ppr(seed_entity_ids=["e1"], top_k=5)
    # Each result should include the segment_ids that reference it
    for r in ranked:
        assert "segment_ids" in r


def test_get_captions_for_segments():
    from worldmm.memory.episodic.graph import EpisodicGraph
    g = EpisodicGraph()
    g.build(_make_triples(), _make_entity_names())
    g.set_segment_captions(_make_segment_captions())
    captions = g.get_captions(["s1"])
    assert captions == ["I stand at the dining table while Katrina asks about tomorrow's schedule"]


def test_empty_graph_ppr_returns_empty():
    from worldmm.memory.episodic.graph import EpisodicGraph
    g = EpisodicGraph()
    g.build([], {})
    ranked = g.ppr(seed_entity_ids=["nonexistent"], top_k=5)
    assert ranked == []
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_episodic_graph.py -v
Expected: FAIL — ModuleNotFoundError or ImportError

  • [ ] Step 3: Write the implementation
# main/server/worldmm/memory/episodic/graph.py
from __future__ import annotations

from collections import defaultdict
from typing import Any

import igraph as ig


class EpisodicGraph:
    def __init__(self) -> None:
        self._graph: ig.Graph | None = None
        self._node_id_to_idx: dict[str, int] = {}
        self._idx_to_node_id: dict[int, str] = {}
        self._node_segment_ids: dict[str, set[str]] = defaultdict(set)
        self._segment_captions: dict[str, str] = {}

    def build(
        self,
        triples: list[dict[str, Any]],
        entity_names: dict[str, str],
    ) -> None:
        # Reset derived state so build() can be called more than once
        # without stale segment mappings accumulating.
        self._node_id_to_idx = {}
        self._idx_to_node_id = {}
        self._node_segment_ids = defaultdict(set)

        nodes: set[str] = set()
        edges: list[tuple[str, str]] = []

        for t in triples:
            subj = t["subject_entity_id"]
            nodes.add(subj)
            seg_id = t.get("segment_id")

            if t.get("object_entity_id"):
                obj = t["object_entity_id"]
                nodes.add(obj)
                edges.append((subj, obj))
                if seg_id:
                    self._node_segment_ids[subj].add(seg_id)
                    self._node_segment_ids[obj].add(seg_id)
            elif t.get("object_literal"):
                literal_id = f"lit:{t['id']}"
                nodes.add(literal_id)
                edges.append((subj, literal_id))
                if seg_id:
                    self._node_segment_ids[subj].add(seg_id)
                    self._node_segment_ids[literal_id].add(seg_id)

        if not nodes:
            self._graph = ig.Graph(directed=False)
            return

        node_list = sorted(nodes)
        self._node_id_to_idx = {nid: i for i, nid in enumerate(node_list)}
        self._idx_to_node_id = {i: nid for nid, i in self._node_id_to_idx.items()}

        g = ig.Graph(n=len(node_list), directed=False)
        g.vs["node_id"] = node_list
        g.vs["name"] = [entity_names.get(nid, nid) for nid in node_list]

        edge_indices = []
        for src, dst in edges:
            if src in self._node_id_to_idx and dst in self._node_id_to_idx:
                edge_indices.append((self._node_id_to_idx[src], self._node_id_to_idx[dst]))
        g.add_edges(edge_indices)
        self._graph = g

    def node_count(self) -> int:
        return self._graph.vcount() if self._graph else 0

    def edge_count(self) -> int:
        return self._graph.ecount() if self._graph else 0

    def ppr(
        self,
        seed_entity_ids: list[str],
        top_k: int = 10,
        damping: float = 0.85,
    ) -> list[dict[str, Any]]:
        if not self._graph or self._graph.vcount() == 0:
            return []

        reset_vec = [0.0] * self._graph.vcount()
        valid_seeds = [eid for eid in seed_entity_ids if eid in self._node_id_to_idx]
        if not valid_seeds:
            return []

        weight = 1.0 / len(valid_seeds)
        for eid in valid_seeds:
            reset_vec[self._node_id_to_idx[eid]] = weight

        scores = self._graph.personalized_pagerank(
            reset=reset_vec,
            damping=damping,
            implementation="prpack",
        )

        ranked = []
        for idx, score in enumerate(scores):
            node_id = self._idx_to_node_id[idx]
            ranked.append({
                "entity_id": node_id,
                "name": self._graph.vs[idx]["name"],
                "score": score,
                "segment_ids": sorted(self._node_segment_ids.get(node_id, set())),
            })
        ranked.sort(key=lambda x: x["score"], reverse=True)
        return ranked[:top_k]

    def set_segment_captions(self, captions: dict[str, str]) -> None:
        self._segment_captions = captions

    def get_captions(self, segment_ids: list[str]) -> list[str]:
        return [self._segment_captions[sid] for sid in segment_ids if sid in self._segment_captions]
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_episodic_graph.py -v
Expected: All 6 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/memory/episodic/graph.py \
        main/server/worldmm/tests/test_episodic_graph.py
git commit -m "feat(worldmm): add episodic graph with PPR via igraph"
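For intuition about why test_ppr_from_seed_entity expects the seed on top, personalized PageRank can be reproduced with a few lines of power iteration on the same toy graph. This is a pedagogical sketch with no igraph dependency, not the production path:

```python
# Power-iteration personalized PageRank on the toy graph from the tests:
# e1-e2, e1-e3, e3-lit (undirected), with all restart mass on the seed e1.
nodes = ["e1", "e2", "e3", "lit"]
nbrs = {"e1": ["e2", "e3"], "e2": ["e1"], "e3": ["e1", "lit"], "lit": ["e3"]}
damping, reset = 0.85, {"e1": 1.0}

scores = {n: 1.0 / len(nodes) for n in nodes}
for _ in range(100):
    # Each node keeps (1 - d) of its restart mass plus d times the
    # inflow from neighbors, which split their mass evenly over edges.
    scores = {
        n: (1 - damping) * reset.get(n, 0.0)
        + damping * sum(scores[m] / len(nbrs[m]) for m in nbrs[n])
        for n in nodes
    }

ranked = sorted(scores, key=scores.get, reverse=True)
assert ranked[0] == "e1"  # the seed retains the most probability mass
```

The seed converges to roughly 0.42 of the mass here; the directly connected `e3` outranks the leaf nodes, which is exactly the locality property the retriever relies on.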

Task 7: Semantic Graph (PPR with Edge Scoring)

Files: - Create: main/server/worldmm/memory/semantic/__init__.py - Create: main/server/worldmm/memory/semantic/graph.py - Test: main/server/worldmm/tests/test_semantic_graph.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_semantic_graph.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def _make_semantic_triples():
    return [
        {"id": "t1", "subject_entity_id": "e1", "predicate": "often eats",
         "object_entity_id": "e2", "object_literal": None},
        {"id": "t2", "subject_entity_id": "e1", "predicate": "is friends with",
         "object_entity_id": "e3", "object_literal": None},
        {"id": "t3", "subject_entity_id": "e3", "predicate": "helps with",
         "object_entity_id": None, "object_literal": "expense tracking"},
    ]


def _make_entity_names():
    return {"e1": "I", "e2": "fruits", "e3": "Alice"}


def test_build_semantic_graph():
    from worldmm.memory.semantic.graph import SemanticGraph
    g = SemanticGraph()
    g.build(_make_semantic_triples(), _make_entity_names())
    assert g.node_count() == 4  # 3 entities + 1 literal


def test_ppr_edge_scoring():
    from worldmm.memory.semantic.graph import SemanticGraph
    g = SemanticGraph()
    g.build(_make_semantic_triples(), _make_entity_names())
    ranked_triples = g.ppr_edge_scored(seed_entity_ids=["e1"], top_k=10)
    assert len(ranked_triples) > 0
    # Each result is a triple with a score
    for t in ranked_triples:
        assert "triple_id" in t
        assert "score" in t
        assert "subject" in t
        assert "predicate" in t


def test_ppr_edge_score_is_sum_of_node_scores():
    from worldmm.memory.semantic.graph import SemanticGraph
    g = SemanticGraph()
    triples = [
        {"id": "t1", "subject_entity_id": "e1", "predicate": "knows",
         "object_entity_id": "e2", "object_literal": None},
    ]
    g.build(triples, {"e1": "A", "e2": "B"})
    ranked = g.ppr_edge_scored(seed_entity_ids=["e1"], top_k=10)
    # Score should be PPR(subject) + PPR(object)
    node_scores = g.ppr_node_scores(seed_entity_ids=["e1"])
    expected = node_scores.get("e1", 0) + node_scores.get("e2", 0)
    assert abs(ranked[0]["score"] - expected) < 1e-6


def test_empty_semantic_graph():
    from worldmm.memory.semantic.graph import SemanticGraph
    g = SemanticGraph()
    g.build([], {})
    ranked = g.ppr_edge_scored(seed_entity_ids=["e1"], top_k=10)
    assert ranked == []
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_semantic_graph.py -v
Expected: FAIL

  • [ ] Step 3: Write the implementation
# main/server/worldmm/memory/semantic/__init__.py
# (empty)

# main/server/worldmm/memory/semantic/graph.py
from __future__ import annotations

from collections import defaultdict
from typing import Any

import igraph as ig


class SemanticGraph:
    def __init__(self) -> None:
        self._graph: ig.Graph | None = None
        self._node_id_to_idx: dict[str, int] = {}
        self._idx_to_node_id: dict[int, str] = {}
        self._triples: list[dict[str, Any]] = []
        self._entity_names: dict[str, str] = {}

    def build(
        self,
        triples: list[dict[str, Any]],
        entity_names: dict[str, str],
    ) -> None:
        self._triples = triples
        self._entity_names = entity_names
        nodes: set[str] = set()
        edges: list[tuple[str, str]] = []

        for t in triples:
            subj = t["subject_entity_id"]
            nodes.add(subj)
            if t.get("object_entity_id"):
                obj = t["object_entity_id"]
                nodes.add(obj)
                edges.append((subj, obj))
            elif t.get("object_literal"):
                literal_id = f"lit:{t['id']}"
                nodes.add(literal_id)
                edges.append((subj, literal_id))

        if not nodes:
            self._graph = ig.Graph(directed=False)
            return

        node_list = sorted(nodes)
        self._node_id_to_idx = {nid: i for i, nid in enumerate(node_list)}
        self._idx_to_node_id = {i: nid for nid, i in self._node_id_to_idx.items()}

        g = ig.Graph(n=len(node_list), directed=False)
        g.vs["node_id"] = node_list
        g.vs["name"] = [entity_names.get(nid, nid) for nid in node_list]

        edge_indices = []
        for src, dst in edges:
            if src in self._node_id_to_idx and dst in self._node_id_to_idx:
                edge_indices.append((self._node_id_to_idx[src], self._node_id_to_idx[dst]))
        g.add_edges(edge_indices)
        self._graph = g

    def node_count(self) -> int:
        return self._graph.vcount() if self._graph else 0

    def ppr_node_scores(
        self,
        seed_entity_ids: list[str],
        damping: float = 0.85,
    ) -> dict[str, float]:
        if not self._graph or self._graph.vcount() == 0:
            return {}

        reset_vec = [0.0] * self._graph.vcount()
        valid_seeds = [eid for eid in seed_entity_ids if eid in self._node_id_to_idx]
        if not valid_seeds:
            return {}

        weight = 1.0 / len(valid_seeds)
        for eid in valid_seeds:
            reset_vec[self._node_id_to_idx[eid]] = weight

        scores = self._graph.personalized_pagerank(
            reset=reset_vec,
            damping=damping,
            implementation="prpack",
        )
        return {self._idx_to_node_id[i]: s for i, s in enumerate(scores)}

    def ppr_edge_scored(
        self,
        seed_entity_ids: list[str],
        top_k: int = 10,
        damping: float = 0.85,
    ) -> list[dict[str, Any]]:
        node_scores = self.ppr_node_scores(seed_entity_ids, damping)
        if not node_scores:
            return []

        results = []
        for t in self._triples:
            subj = t["subject_entity_id"]
            if t.get("object_entity_id"):
                obj = t["object_entity_id"]
            elif t.get("object_literal"):
                obj = f"lit:{t['id']}"
            else:
                continue

            score = node_scores.get(subj, 0.0) + node_scores.get(obj, 0.0)
            obj_display = (
                self._entity_names.get(t["object_entity_id"], t.get("object_entity_id", ""))
                if t.get("object_entity_id")
                else t.get("object_literal", "")
            )
            results.append({
                "triple_id": t["id"],
                "score": score,
                "subject": self._entity_names.get(subj, subj),
                "predicate": t["predicate"],
                "object": obj_display,
            })

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_k]
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_semantic_graph.py -v
Expected: All 4 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/memory/semantic/ \
        main/server/worldmm/tests/test_semantic_graph.py
git commit -m "feat(worldmm): add semantic graph with PPR edge scoring"
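The edge-scoring rule pinned down by test_ppr_edge_score_is_sum_of_node_scores is simply score(triple) = PPR(subject) + PPR(object). A standalone numeric sketch, using made-up node scores rather than real PPR output:

```python
# Triple score = PPR(subject) + PPR(object). The node scores below are
# illustrative placeholders, not real PageRank values.
node_scores = {"e1": 0.42, "e2": 0.18, "e3": 0.28, "lit:t3": 0.12}
triples = [
    ("t1", "e1", "often eats", "e2"),
    ("t2", "e1", "is friends with", "e3"),
    ("t3", "e3", "helps with", "lit:t3"),
]
scored = sorted(
    ((tid, node_scores[subj] + node_scores[obj]) for tid, subj, _, obj in triples),
    key=lambda pair: pair[1],
    reverse=True,
)
assert scored[0] == ("t2", node_scores["e1"] + node_scores["e3"])
```

Because both endpoints contribute, triples between two high-PPR entities outrank triples where only one endpoint is near the seeds.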

Task 8: Semantic Extraction + Consolidation

Files: - Create: main/server/worldmm/memory/semantic/extraction.py - Create: main/server/worldmm/memory/semantic/consolidation.py - Create: main/server/worldmm/tests/fixtures/sample_semantic_triples_response.json - Test: main/server/worldmm/tests/test_consolidation.py

  • [ ] Step 1: Create fixture
// main/server/worldmm/tests/fixtures/sample_semantic_triples_response.json
{
  "triples": [
    {"subject": "I", "predicate": "often eats", "object": "fruits and snacks"},
    {"subject": "Alice", "predicate": "expresses romantic feelings toward", "object": "I"}
  ]
}
  • [ ] Step 2: Write the failing test
# main/server/worldmm/tests/test_consolidation.py
from __future__ import annotations

import json
import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

FIXTURES = Path(__file__).parent / "fixtures"


def test_build_semantic_triple_prompt():
    from worldmm.memory.semantic.extraction import build_semantic_triple_prompt
    messages = build_semantic_triple_prompt("Over the past 10 minutes, Blaine cooked dinner at home again.")
    assert isinstance(messages, list)
    content = str(messages)
    assert "Blaine" in content
    assert "long-term" in content.lower() or "pattern" in content.lower() or "habit" in content.lower()


def test_parse_semantic_triples():
    from worldmm.memory.semantic.extraction import parse_semantic_triples
    raw = (FIXTURES / "sample_semantic_triples_response.json").read_text()
    triples = parse_semantic_triples(raw)
    assert len(triples) == 2
    assert triples[0]["predicate"] == "often eats"


def test_find_consolidation_candidates_above_threshold():
    from worldmm.memory.semantic.consolidation import find_consolidation_candidates
    # Simulate embeddings: existing triple embeddings + new triple embedding
    existing = [
        {"id": "t1", "embedding": [1.0, 0.0, 0.0]},
        {"id": "t2", "embedding": [0.0, 1.0, 0.0]},
        {"id": "t3", "embedding": [0.95, 0.05, 0.0]},  # very similar to new
    ]
    new_embedding = [1.0, 0.0, 0.0]
    candidates = find_consolidation_candidates(existing, new_embedding, threshold=0.6)
    candidate_ids = [c["id"] for c in candidates]
    assert "t1" in candidate_ids  # cosine = 1.0
    assert "t3" in candidate_ids  # cosine ≈ 0.95
    assert "t2" not in candidate_ids  # cosine = 0.0


def test_find_consolidation_candidates_respects_threshold():
    from worldmm.memory.semantic.consolidation import find_consolidation_candidates
    existing = [
        {"id": "t1", "embedding": [0.5, 0.5, 0.0]},  # cosine ≈ 0.707
    ]
    new_embedding = [1.0, 0.0, 0.0]
    candidates_high = find_consolidation_candidates(existing, new_embedding, threshold=0.8)
    candidates_low = find_consolidation_candidates(existing, new_embedding, threshold=0.5)
    assert len(candidates_high) == 0
    assert len(candidates_low) == 1


def test_build_consolidation_prompt():
    from worldmm.memory.semantic.consolidation import build_consolidation_prompt
    existing_triples = [
        {"subject": "I", "predicate": "sometimes eats", "object": "fruit"},
    ]
    new_triple = {"subject": "I", "predicate": "often eats", "object": "fruits and snacks"}
    messages = build_consolidation_prompt(existing_triples, new_triple)
    assert isinstance(messages, list)
    content = str(messages)
    assert "sometimes eats" in content
    assert "often eats" in content


def test_parse_consolidation_response_merge():
    from worldmm.memory.semantic.consolidation import parse_consolidation_response
    raw = json.dumps({
        "action": "merge",
        "remove_ids": ["t1"],
        "new_triple": {"subject": "I", "predicate": "regularly eats", "object": "fruits and snacks"},
    })
    result = parse_consolidation_response(raw)
    assert result["action"] == "merge"
    assert result["remove_ids"] == ["t1"]
    assert result["new_triple"]["predicate"] == "regularly eats"


def test_parse_consolidation_response_keep():
    from worldmm.memory.semantic.consolidation import parse_consolidation_response
    raw = json.dumps({"action": "keep"})
    result = parse_consolidation_response(raw)
    assert result["action"] == "keep"
  • [ ] Step 3: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_consolidation.py -v
Expected: FAIL

  • [ ] Step 4: Write the implementation
# main/server/worldmm/memory/semantic/extraction.py
from __future__ import annotations

import json
from pathlib import Path

_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    if name == "semantic_triples.txt":
        return (
            "Extract long-term patterns, habits, preferences, and social relationships from this summary. "
            "Focus on what is generally true, not event-specific details. "
            "Return JSON: {\"triples\": [{\"subject\": ..., \"predicate\": ..., \"object\": ...}, ...]}"
        )
    return ""


def build_semantic_triple_prompt(summary: str) -> list[dict[str, str]]:
    system = _load_prompt("semantic_triples.txt")
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": summary},
    ]


def parse_semantic_triples(raw: str) -> list[dict[str, str]]:
    data = json.loads(raw)
    triples = data.get("triples", [])
    return [
        t for t in triples
        if all(k in t for k in ("subject", "predicate", "object"))
    ]
# main/server/worldmm/memory/semantic/consolidation.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np

_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"


def _cosine_similarity(a: list[float], b: list[float]) -> float:
    a_arr = np.array(a)
    b_arr = np.array(b)
    dot = np.dot(a_arr, b_arr)
    norm = np.linalg.norm(a_arr) * np.linalg.norm(b_arr)
    if norm == 0:
        return 0.0
    return float(dot / norm)


def find_consolidation_candidates(
    existing: list[dict[str, Any]],
    new_embedding: list[float],
    threshold: float = 0.6,
) -> list[dict[str, Any]]:
    candidates = []
    for item in existing:
        sim = _cosine_similarity(item["embedding"], new_embedding)
        if sim >= threshold:
            candidates.append({**item, "similarity": sim})
    candidates.sort(key=lambda x: x["similarity"], reverse=True)
    return candidates


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    if name == "consolidation.txt":
        return (
            "Given existing semantic triples and a new triple, decide how to consolidate. "
            "If the new triple conflicts with or updates an existing one, merge them. "
            "If the new triple is independent, keep it as-is. "
            "Return JSON: {\"action\": \"merge\"|\"keep\", \"remove_ids\": [...], "
            "\"new_triple\": {\"subject\": ..., \"predicate\": ..., \"object\": ...}}"
        )
    return ""


def build_consolidation_prompt(
    existing_triples: list[dict[str, str]],
    new_triple: dict[str, str],
) -> list[dict[str, str]]:
    system = _load_prompt("consolidation.txt")
    user_content = (
        f"Existing triples:\n{json.dumps(existing_triples, indent=2)}\n\n"
        f"New triple:\n{json.dumps(new_triple, indent=2)}"
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_content},
    ]


def parse_consolidation_response(raw: str) -> dict[str, Any]:
    data = json.loads(raw)
    result: dict[str, Any] = {"action": data.get("action", "keep")}
    if result["action"] == "merge":
        result["remove_ids"] = data.get("remove_ids", [])
        result["new_triple"] = data.get("new_triple", {})
    return result
  • [ ] Step 5: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_consolidation.py -v
Expected: All 7 tests PASS

  • [ ] Step 6: Commit
git add main/server/worldmm/memory/semantic/extraction.py \
        main/server/worldmm/memory/semantic/consolidation.py \
        main/server/worldmm/tests/test_consolidation.py \
        main/server/worldmm/tests/fixtures/sample_semantic_triples_response.json
git commit -m "feat(worldmm): add semantic triple extraction and consolidation"
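The similarity gate in find_consolidation_candidates can be checked by hand. This plain-math version of the cosine computation (no numpy) reproduces the two threshold cases the tests exercise:

```python
import math

# Plain-math cosine similarity, mirroring _cosine_similarity in
# consolidation.py without the numpy dependency.
def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm if norm else 0.0

# [0.5, 0.5, 0] vs [1, 0, 0] gives cos = 0.5 / sqrt(0.5) ≈ 0.707:
# above the 0.6 gate but below the 0.8 gate, which is why
# test_find_consolidation_candidates_respects_threshold sees one
# candidate at threshold 0.5 and none at 0.8.
sim = cosine([0.5, 0.5, 0.0], [1.0, 0.0, 0.0])
assert 0.6 <= sim < 0.8
```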

Task 9: Visual Encoder Client

Files: - Create: main/server/worldmm/memory/visual/__init__.py - Create: main/server/worldmm/memory/visual/encoder.py - Test: main/server/worldmm/tests/test_visual_encoder.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_visual_encoder.py
from __future__ import annotations

import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))

EMBEDDING_DIM = 3584


def test_encode_video_sends_correct_payload():
    from worldmm.memory.visual.encoder import VLM2VecClient
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"embedding": [0.1] * EMBEDDING_DIM}

    with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
        mock_requests.post.return_value = mock_response
        client = VLM2VecClient(base_url="http://gpu-worker:8000")
        embedding = client.encode_video(frames_b64=["frame1", "frame2"])

    mock_requests.post.assert_called_once()
    call_args = mock_requests.post.call_args
    assert call_args[0][0] == "http://gpu-worker:8000/encode-video"
    payload = call_args[1]["json"]
    assert payload["frames"] == ["frame1", "frame2"]


def test_encode_video_returns_embedding():
    from worldmm.memory.visual.encoder import VLM2VecClient
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"embedding": [0.5] * EMBEDDING_DIM}

    with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
        mock_requests.post.return_value = mock_response
        client = VLM2VecClient(base_url="http://gpu-worker:8000")
        embedding = client.encode_video(frames_b64=["f1"])

    assert len(embedding) == EMBEDDING_DIM
    assert embedding[0] == 0.5


def test_encode_text_sends_correct_payload():
    from worldmm.memory.visual.encoder import VLM2VecClient
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"embedding": [0.2] * EMBEDDING_DIM}

    with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
        mock_requests.post.return_value = mock_response
        client = VLM2VecClient(base_url="http://gpu-worker:8000")
        embedding = client.encode_text("what did I eat for lunch")

    call_args = mock_requests.post.call_args
    assert call_args[0][0] == "http://gpu-worker:8000/encode-text"
    payload = call_args[1]["json"]
    assert payload["text"] == "what did I eat for lunch"


def test_encode_video_raises_on_error():
    from worldmm.memory.visual.encoder import VLM2VecClient
    mock_response = MagicMock()
    mock_response.status_code = 500
    mock_response.raise_for_status.side_effect = Exception("GPU worker error")

    with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
        mock_requests.post.return_value = mock_response
        client = VLM2VecClient(base_url="http://gpu-worker:8000")
        import pytest
        with pytest.raises(Exception, match="GPU worker error"):
            client.encode_video(frames_b64=["f1"])

- [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_visual_encoder.py -v
Expected: FAIL

- [ ] Step 3: Write the implementation

# main/server/worldmm/memory/visual/__init__.py
# (empty)

# main/server/worldmm/memory/visual/encoder.py
from __future__ import annotations

from typing import Any

import requests


class VLM2VecClient:
    def __init__(self, base_url: str) -> None:
        self._base_url = base_url.rstrip("/")

    def encode_video(self, frames_b64: list[str]) -> list[float]:
        response = requests.post(
            f"{self._base_url}/encode-video",
            json={"frames": frames_b64},
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["embedding"]

    def encode_text(self, text: str) -> list[float]:
        response = requests.post(
            f"{self._base_url}/encode-text",
            json={"text": text},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["embedding"]
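
The client above fails fast on any network error. If transient GPU-worker hiccups turn out to matter in practice, a small caller-side retry helper could wrap the calls; this is a sketch, not part of the planned interface (the name `call_with_retries` and the backoff constants are illustrative):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay_s: float = 0.5,
) -> T:
    """Retry a flaky call with exponential backoff; re-raise the last error."""
    assert attempts >= 1, "attempts must be positive"
    last_exc: Exception | None = None
    for i in range(attempts):
        try:
            return fn()
        except Exception as exc:  # in real code, narrow to requests.RequestException
            last_exc = exc
            if i < attempts - 1:
                time.sleep(base_delay_s * (2 ** i))  # 0.5s, 1s, 2s, ...
    raise last_exc


# Usage sketch:
# embedding = call_with_retries(lambda: client.encode_video(frames_b64=frames))
```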

- [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_visual_encoder.py -v
Expected: All 4 tests PASS

- [ ] Step 5: Commit

git add main/server/worldmm/memory/visual/ \
        main/server/worldmm/tests/test_visual_encoder.py
git commit -m "feat(worldmm): add VLM2Vec GPU worker client"

Task 10: Episodic Retriever (PPR + Cross-Scale Rerank)

Files:
- Create: main/server/worldmm/retrieval/__init__.py
- Create: main/server/worldmm/retrieval/episodic_retriever.py
- Test: main/server/worldmm/tests/test_episodic_retriever.py

- [ ] Step 1: Write the failing test

# main/server/worldmm/tests/test_episodic_retriever.py
from __future__ import annotations

import json
import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def _make_mock_graphs():
    """Create mock EpisodicGraph instances for each temporal scale."""
    from worldmm.memory.episodic.graph import EpisodicGraph

    graphs = {}
    for duration, seg_prefix, top_k in [(30, "s30", 10), (180, "s3m", 5), (600, "s10m", 5), (3600, "s1h", 3)]:
        g = EpisodicGraph()
        triples = [
            {"subject_entity_id": "e1", "predicate": "eats", "object_entity_id": "e2",
             "object_literal": None, "segment_id": f"{seg_prefix}_1", "id": f"t_{seg_prefix}"},
        ]
        g.build(triples, {"e1": "I", "e2": "lunch"})
        g.set_segment_captions({f"{seg_prefix}_1": f"Caption at {duration}s scale"})
        graphs[duration] = g
    return graphs


def test_retrieve_episodic_collects_from_all_scales():
    from worldmm.retrieval.episodic_retriever import retrieve_episodic
    graphs = _make_mock_graphs()
    # Mock the reranker to pass through all candidates
    mock_reranker = lambda candidates, query, top_k: candidates[:top_k]
    results = retrieve_episodic(
        graphs=graphs,
        seed_entity_ids=["e1"],
        query="what did I eat",
        reranker=mock_reranker,
    )
    assert len(results) > 0
    # Captions from multiple scales should survive the pass-through rerank
    captions = [r["caption"] for r in results]
    assert any("30s" in c for c in captions)
    assert any("180s" in c for c in captions)


def test_retrieve_episodic_respects_per_scale_top_k():
    from worldmm.retrieval.episodic_retriever import retrieve_episodic, SCALE_TOP_K
    graphs = _make_mock_graphs()
    mock_reranker = lambda candidates, query, top_k: candidates[:top_k]
    results = retrieve_episodic(
        graphs=graphs,
        seed_entity_ids=["e1"],
        query="test",
        reranker=mock_reranker,
    )
    # Final output should be <= 3 (reranker top_k)
    assert len(results) <= 3


def test_build_rerank_prompt():
    from worldmm.retrieval.episodic_retriever import build_rerank_prompt
    candidates = [
        {"caption": "I ate lunch", "scale": 30, "score": 0.8},
        {"caption": "I prepared food", "scale": 180, "score": 0.6},
    ]
    messages = build_rerank_prompt(candidates, query="what did I eat")
    content = str(messages)
    assert "I ate lunch" in content
    assert "what did I eat" in content


def test_parse_rerank_response():
    from worldmm.retrieval.episodic_retriever import parse_rerank_response
    candidates = [
        {"caption": "A", "scale": 30, "score": 0.8, "segment_id": "s1"},
        {"caption": "B", "scale": 180, "score": 0.6, "segment_id": "s2"},
        {"caption": "C", "scale": 600, "score": 0.4, "segment_id": "s3"},
    ]
    raw = json.dumps({"selected_indices": [0, 2]})
    result = parse_rerank_response(raw, candidates, top_k=3)
    assert len(result) == 2
    assert result[0]["caption"] == "A"
    assert result[1]["caption"] == "C"

- [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_episodic_retriever.py -v
Expected: FAIL

- [ ] Step 3: Write the implementation

# main/server/worldmm/retrieval/__init__.py
# (empty)

# main/server/worldmm/retrieval/episodic_retriever.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Callable

from worldmm.memory.episodic.graph import EpisodicGraph

SCALE_TOP_K = {30: 10, 180: 5, 600: 5, 3600: 3}
RERANK_TOP_K = 3

_PROMPTS_DIR = Path(__file__).resolve().parents[1] / "llm" / "prompts"


def retrieve_episodic(
    graphs: dict[int, EpisodicGraph],
    seed_entity_ids: list[str],
    query: str,
    reranker: Callable[[list[dict[str, Any]], str, int], list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    all_candidates: list[dict[str, Any]] = []

    for scale, top_k in SCALE_TOP_K.items():
        graph = graphs.get(scale)
        if graph is None:
            continue
        ranked = graph.ppr(seed_entity_ids=seed_entity_ids, top_k=top_k)
        # Collect unique segment_ids from ranked nodes
        segment_ids: set[str] = set()
        for node in ranked:
            segment_ids.update(node.get("segment_ids", []))

        ordered_ids = sorted(segment_ids)
        captions = graph.get_captions(ordered_ids)
        top_score = ranked[0]["score"] if ranked else 0.0
        for seg_id, caption in zip(ordered_ids, captions):
            all_candidates.append({
                "caption": caption,
                "scale": scale,
                "score": top_score,
                "segment_id": seg_id,
            })

    if not all_candidates:
        return []

    return reranker(all_candidates, query, RERANK_TOP_K)


def build_rerank_prompt(
    candidates: list[dict[str, Any]],
    query: str,
) -> list[dict[str, str]]:
    prompt_path = _PROMPTS_DIR / "cross_scale_rerank.txt"
    if prompt_path.exists():
        system = prompt_path.read_text().strip()
    else:
        system = (
            "Given a query and candidate captions from different temporal scales, "
            "select the most relevant captions. Return JSON: {\"selected_indices\": [0, 2, ...]}"
        )

    numbered = "\n".join(
        f"{i}. [scale={c['scale']}s] {c['caption']}"
        for i, c in enumerate(candidates)
    )
    user_content = f"Query: {query}\n\nCandidates:\n{numbered}"
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_content},
    ]


def parse_rerank_response(
    raw: str,
    candidates: list[dict[str, Any]],
    top_k: int = RERANK_TOP_K,
) -> list[dict[str, Any]]:
    data = json.loads(raw)
    indices = data.get("selected_indices", [])
    selected = [candidates[i] for i in indices if 0 <= i < len(candidates)]
    return selected[:top_k]
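
EpisodicGraph.ppr is assumed (per the architecture notes) to wrap igraph's personalized_pagerank. The idea the retriever relies on, random walks that restart at the seed entities, can be illustrated with a dependency-free power-iteration sketch (node names are illustrative):

```python
def personalized_pagerank(
    adjacency: dict[str, list[str]],
    seeds: set[str],
    damping: float = 0.85,
    iterations: int = 50,
) -> dict[str, float]:
    """Power-iteration PPR: walks restart at the seed nodes with prob. 1-damping.

    Sketch only: dangling nodes leak mass here; igraph handles them properly.
    """
    nodes = list(adjacency)
    restart = {n: (1.0 / len(seeds) if n in seeds else 0.0) for n in nodes}
    scores = dict(restart)
    for _ in range(iterations):
        nxt = {n: (1 - damping) * restart[n] for n in nodes}
        for n in nodes:
            out = adjacency[n]
            if not out:
                continue
            share = damping * scores[n] / len(out)
            for m in out:
                nxt[m] += share
        scores = nxt
    return scores


graph = {"I": ["lunch", "kitchen"], "lunch": ["I"], "kitchen": []}
scores = personalized_pagerank(graph, seeds={"I"})
# Seed and seed-adjacent nodes score highest; distant nodes decay toward zero.
```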

- [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_episodic_retriever.py -v
Expected: All 4 tests PASS

- [ ] Step 5: Commit

git add main/server/worldmm/retrieval/ \
        main/server/worldmm/tests/test_episodic_retriever.py
git commit -m "feat(worldmm): add episodic retriever with cross-scale PPR and reranking"

Task 11: Semantic + Visual Retrievers

Files:
- Create: main/server/worldmm/retrieval/semantic_retriever.py
- Create: main/server/worldmm/retrieval/visual_retriever.py
- Test: main/server/worldmm/tests/test_semantic_retriever.py
- Test: main/server/worldmm/tests/test_visual_retriever.py

- [ ] Step 1: Write the failing tests

# main/server/worldmm/tests/test_semantic_retriever.py
from __future__ import annotations

import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_retrieve_semantic_returns_triples():
    from worldmm.retrieval.semantic_retriever import retrieve_semantic
    from worldmm.memory.semantic.graph import SemanticGraph
    g = SemanticGraph()
    triples = [
        {"id": "t1", "subject_entity_id": "e1", "predicate": "often eats",
         "object_entity_id": "e2", "object_literal": None},
    ]
    g.build(triples, {"e1": "I", "e2": "fruits"})
    results = retrieve_semantic(graph=g, seed_entity_ids=["e1"], top_k=10)
    assert len(results) == 1
    assert results[0]["predicate"] == "often eats"


def test_retrieve_semantic_empty_graph():
    from worldmm.retrieval.semantic_retriever import retrieve_semantic
    from worldmm.memory.semantic.graph import SemanticGraph
    g = SemanticGraph()
    g.build([], {})
    results = retrieve_semantic(graph=g, seed_entity_ids=["e1"], top_k=10)
    assert results == []

# main/server/worldmm/tests/test_visual_retriever.py
from __future__ import annotations

import sys
from pathlib import Path
from unittest.mock import MagicMock

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_retrieve_visual_calls_encoder_and_returns_results():
    from worldmm.retrieval.visual_retriever import retrieve_visual
    mock_encoder = MagicMock()
    mock_encoder.encode_text.return_value = [0.1] * 3584
    mock_db_query = MagicMock(return_value=[
        {"segment_id": "s1", "similarity": 0.95, "s3_frames_key": "frames/s1"},
        {"segment_id": "s2", "similarity": 0.80, "s3_frames_key": "frames/s2"},
    ])
    results = retrieve_visual(
        query="what food did I eat",
        encoder=mock_encoder,
        db_query_fn=mock_db_query,
        top_k=5,
    )
    mock_encoder.encode_text.assert_called_once_with("what food did I eat")
    assert len(results) == 2
    assert results[0]["segment_id"] == "s1"


def test_retrieve_visual_returns_empty_on_encoder_failure():
    from worldmm.retrieval.visual_retriever import retrieve_visual
    mock_encoder = MagicMock()
    mock_encoder.encode_text.side_effect = Exception("GPU unavailable")
    results = retrieve_visual(
        query="test",
        encoder=mock_encoder,
        db_query_fn=MagicMock(),
        top_k=5,
    )
    assert results == []

- [ ] Step 2: Run tests to verify they fail

Run: cd main/server && python -m pytest worldmm/tests/test_semantic_retriever.py worldmm/tests/test_visual_retriever.py -v
Expected: FAIL

- [ ] Step 3: Write the implementations

# main/server/worldmm/retrieval/semantic_retriever.py
from __future__ import annotations

from typing import Any

from worldmm.memory.semantic.graph import SemanticGraph


def retrieve_semantic(
    graph: SemanticGraph,
    seed_entity_ids: list[str],
    top_k: int = 10,
) -> list[dict[str, Any]]:
    return graph.ppr_edge_scored(seed_entity_ids=seed_entity_ids, top_k=top_k)

# main/server/worldmm/retrieval/visual_retriever.py
from __future__ import annotations

from typing import Any, Callable

from worldmm.memory.visual.encoder import VLM2VecClient


def retrieve_visual(
    query: str,
    encoder: VLM2VecClient,
    db_query_fn: Callable[[list[float], int], list[dict[str, Any]]],
    top_k: int = 5,
) -> list[dict[str, Any]]:
    try:
        query_embedding = encoder.encode_text(query)
    except Exception:
        return []
    return db_query_fn(query_embedding, top_k)
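
db_query_fn is expected to run the pgvector cosine search from the architecture notes (query_visual_embeddings). A sketch of what that query might look like; the table and column names are assumptions for illustration, and <=> is pgvector's cosine-distance operator, so similarity is 1 minus the distance:

```python
# Hypothetical schema: adjust table/column names to the real migration.
VISUAL_SEARCH_SQL = """
SELECT segment_id,
       1 - (embedding <=> :query_vec) AS similarity,
       s3_frames_key
FROM visual_embeddings
ORDER BY embedding <=> :query_vec
LIMIT :top_k
"""


def to_pgvector_literal(embedding: list[float]) -> str:
    """Format a Python list as a pgvector input literal, e.g. '[0.1,0.2]'."""
    return "[" + ",".join(repr(x) for x in embedding) + "]"
```

The named parameters match SQLAlchemy's text() binding style already used elsewhere in the server.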

- [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_semantic_retriever.py worldmm/tests/test_visual_retriever.py -v
Expected: All 4 tests PASS

- [ ] Step 5: Commit

git add main/server/worldmm/retrieval/semantic_retriever.py \
        main/server/worldmm/retrieval/visual_retriever.py \
        main/server/worldmm/tests/test_semantic_retriever.py \
        main/server/worldmm/tests/test_visual_retriever.py
git commit -m "feat(worldmm): add semantic and visual retrievers"

Task 12: Reasoning Agent (SEARCH/ANSWER Loop)

Files:
- Create: main/server/worldmm/retrieval/agent.py
- Test: main/server/worldmm/tests/test_retrieval_agent.py

- [ ] Step 1: Write the failing test

# main/server/worldmm/tests/test_retrieval_agent.py
from __future__ import annotations

import json
import sys
from pathlib import Path
from unittest.mock import MagicMock

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_agent_dispatches_search_to_correct_retriever():
    from worldmm.retrieval.agent import ReasoningAgent
    mock_llm = MagicMock()
    # Round 1: agent says SEARCH episodic
    # Round 2: agent says ANSWER
    mock_llm.call.side_effect = [
        json.dumps({"action": "SEARCH", "memory_type": "episodic", "query": "lunch"}),
        json.dumps({"action": "ANSWER"}),
    ]
    mock_episodic = MagicMock(return_value=[{"caption": "I ate lunch"}])
    mock_semantic = MagicMock(return_value=[])
    mock_visual = MagicMock(return_value=[])
    mock_response_llm = MagicMock()
    mock_response_llm.call.return_value = "You ate lunch at noon."

    agent = ReasoningAgent(
        reasoning_llm=mock_llm,
        response_llm=mock_response_llm,
        retrievers={
            "episodic": mock_episodic,
            "semantic": mock_semantic,
            "visual": mock_visual,
        },
    )
    answer = agent.answer("What did I eat for lunch?")
    mock_episodic.assert_called_once_with("lunch")
    mock_semantic.assert_not_called()
    assert answer == "You ate lunch at noon."


def test_agent_terminates_on_answer_action():
    from worldmm.retrieval.agent import ReasoningAgent
    mock_llm = MagicMock()
    mock_llm.call.return_value = json.dumps({"action": "ANSWER"})
    mock_response_llm = MagicMock()
    mock_response_llm.call.return_value = "I don't have enough context."

    agent = ReasoningAgent(
        reasoning_llm=mock_llm,
        response_llm=mock_response_llm,
        retrievers={"episodic": MagicMock(), "semantic": MagicMock(), "visual": MagicMock()},
    )
    answer = agent.answer("test question")
    # LLM should only be called once (immediate ANSWER)
    assert mock_llm.call.call_count == 1
    assert answer == "I don't have enough context."


def test_agent_forces_answer_after_max_rounds():
    from worldmm.retrieval.agent import ReasoningAgent, MAX_ROUNDS
    mock_llm = MagicMock()
    # Always returns SEARCH — should be forced to stop after MAX_ROUNDS
    mock_llm.call.return_value = json.dumps({
        "action": "SEARCH", "memory_type": "episodic", "query": "test"
    })
    mock_response_llm = MagicMock()
    mock_response_llm.call.return_value = "Forced answer."
    mock_retriever = MagicMock(return_value=[{"caption": "result"}])

    agent = ReasoningAgent(
        reasoning_llm=mock_llm,
        response_llm=mock_response_llm,
        retrievers={"episodic": mock_retriever, "semantic": MagicMock(), "visual": MagicMock()},
    )
    answer = agent.answer("test")
    assert mock_llm.call.call_count == MAX_ROUNDS
    assert answer == "Forced answer."


def test_agent_accumulates_context_across_rounds():
    from worldmm.retrieval.agent import ReasoningAgent
    mock_llm = MagicMock()
    mock_llm.call.side_effect = [
        json.dumps({"action": "SEARCH", "memory_type": "episodic", "query": "lunch"}),
        json.dumps({"action": "SEARCH", "memory_type": "semantic", "query": "eating habits"}),
        json.dumps({"action": "ANSWER"}),
    ]
    mock_episodic = MagicMock(return_value=[{"caption": "I ate sushi"}])
    mock_semantic = MagicMock(return_value=[{"subject": "I", "predicate": "often eats", "object": "sushi"}])
    mock_response_llm = MagicMock()
    mock_response_llm.call.return_value = "You frequently eat sushi."

    agent = ReasoningAgent(
        reasoning_llm=mock_llm,
        response_llm=mock_response_llm,
        retrievers={
            "episodic": mock_episodic,
            "semantic": mock_semantic,
            "visual": MagicMock(),
        },
    )
    answer = agent.answer("What do I usually eat?")

    # Response LLM should receive accumulated context from both rounds
    response_call = mock_response_llm.call.call_args
    request = response_call[1]["build_request_dict"]
    context_str = str(request["messages"])
    assert "sushi" in context_str


def test_parse_agent_action_search():
    from worldmm.retrieval.agent import parse_agent_action
    raw = json.dumps({"action": "SEARCH", "memory_type": "semantic", "query": "test"})
    action = parse_agent_action(raw)
    assert action["action"] == "SEARCH"
    assert action["memory_type"] == "semantic"
    assert action["query"] == "test"


def test_parse_agent_action_answer():
    from worldmm.retrieval.agent import parse_agent_action
    raw = json.dumps({"action": "ANSWER"})
    action = parse_agent_action(raw)
    assert action["action"] == "ANSWER"

- [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_retrieval_agent.py -v
Expected: FAIL

- [ ] Step 3: Write the implementation

# main/server/worldmm/retrieval/agent.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Callable

from worldmm.llm.client import LLMClient, build_request

MAX_ROUNDS = 5

_PROMPTS_DIR = Path(__file__).resolve().parents[1] / "llm" / "prompts"


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    if name == "reasoning_agent.txt":
        return (
            "You are a memory retrieval agent. Given a user's question and any previously "
            "retrieved context, decide whether to search a memory type or answer.\n\n"
            "Available memory types: episodic (events), semantic (relationships/habits), visual (images/scenes).\n\n"
            "Respond with JSON:\n"
            '- To search: {"action": "SEARCH", "memory_type": "episodic"|"semantic"|"visual", "query": "..."}\n'
            '- To answer: {"action": "ANSWER"}'
        )
    if name == "response.txt":
        return (
            "You are a helpful assistant. Given a user's question and retrieved memory context, "
            "provide a clear and accurate answer based on the available information."
        )
    return ""


def parse_agent_action(raw: str) -> dict[str, Any]:
    return json.loads(raw)


class ReasoningAgent:
    def __init__(
        self,
        reasoning_llm: LLMClient,
        response_llm: LLMClient,
        retrievers: dict[str, Callable[[str], list[dict[str, Any]]]],
    ) -> None:
        self._reasoning_llm = reasoning_llm
        self._response_llm = response_llm
        self._retrievers = retrievers

    def answer(self, question: str) -> str:
        rounds: list[dict[str, Any]] = []
        accumulated_context: list[dict[str, Any]] = []

        for _ in range(MAX_ROUNDS):
            messages = self._build_reasoning_messages(question, rounds)
            request = build_request(
                model="gpt-5",
                messages=messages,
                max_completion_tokens=512,
            )
            raw = self._reasoning_llm.call(build_request_dict=request)
            action = parse_agent_action(raw)

            if action["action"] == "ANSWER":
                break

            if action["action"] == "SEARCH":
                memory_type = action["memory_type"]
                query = action["query"]
                retriever = self._retrievers.get(memory_type)
                results = retriever(query) if retriever else []
                rounds.append({
                    "action": "SEARCH",
                    "memory_type": memory_type,
                    "query": query,
                    "results": results,
                })
                accumulated_context.extend(results)

        return self._generate_response(question, accumulated_context)

    def _build_reasoning_messages(
        self,
        question: str,
        rounds: list[dict[str, Any]],
    ) -> list[dict[str, str]]:
        system = _load_prompt("reasoning_agent.txt")
        context_parts = [f"User question: {question}"]
        for i, r in enumerate(rounds):
            context_parts.append(
                f"\nRound {i + 1}: SEARCH({r['memory_type']}, \"{r['query']}\")\n"
                f"Results: {json.dumps(r['results'], default=str)}"
            )
        return [
            {"role": "system", "content": system},
            {"role": "user", "content": "\n".join(context_parts)},
        ]

    def _generate_response(
        self,
        question: str,
        context: list[dict[str, Any]],
    ) -> str:
        system = _load_prompt("response.txt")
        user_content = (
            f"Question: {question}\n\n"
            f"Retrieved context:\n{json.dumps(context, default=str)}"
        )
        request = build_request(
            model="gpt-5",
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": user_content},
            ],
            max_completion_tokens=1024,
        )
        return self._response_llm.call(build_request_dict=request)
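
parse_agent_action assumes the reasoning model returns bare JSON. Models often wrap JSON in a markdown fence, so a more tolerant variant (a sketch, not required by the tests above) could strip fences and fall back to ANSWER on unparseable output:

```python
import json
import re
from typing import Any


def parse_agent_action_tolerant(raw: str) -> dict[str, Any]:
    """Like parse_agent_action, but survives ```json fences and bad output."""
    text = raw.strip()
    # Strip an optional markdown code fence around the JSON payload.
    fence = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if fence:
        text = fence.group(1)
    try:
        action = json.loads(text)
    except json.JSONDecodeError:
        return {"action": "ANSWER"}  # fail safe: answer with whatever we have
    if not isinstance(action, dict) or action.get("action") not in ("SEARCH", "ANSWER"):
        return {"action": "ANSWER"}
    return action
```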

- [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_retrieval_agent.py -v
Expected: All 6 tests PASS

- [ ] Step 5: Commit

git add main/server/worldmm/retrieval/agent.py \
        main/server/worldmm/tests/test_retrieval_agent.py
git commit -m "feat(worldmm): add adaptive reasoning agent with SEARCH/ANSWER loop"

Task 13: Batch Client (Batch API Request Builder + Poller)

Files:
- Create: main/server/worldmm/llm/batch.py
- Test: main/server/worldmm/tests/test_batch_client.py

- [ ] Step 1: Write the failing test

# main/server/worldmm/tests/test_batch_client.py
from __future__ import annotations

import json
import sys
from pathlib import Path

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_build_batch_requests_creates_jsonl():
    from worldmm.llm.batch import build_batch_jsonl
    requests = [
        {"model": "gpt-5-mini", "messages": [{"role": "user", "content": "hello"}],
         "max_completion_tokens": 256, "reasoning_effort": "low"},
        {"model": "gpt-5-mini", "messages": [{"role": "user", "content": "world"}],
         "max_completion_tokens": 256, "reasoning_effort": "low"},
    ]
    lines = build_batch_jsonl(requests, custom_ids=["req-1", "req-2"])
    assert len(lines) == 2
    parsed_0 = json.loads(lines[0])
    assert parsed_0["custom_id"] == "req-1"
    assert parsed_0["method"] == "POST"
    assert parsed_0["url"] == "/v1/chat/completions"
    assert parsed_0["body"]["model"] == "gpt-5-mini"
    assert parsed_0["body"]["max_completion_tokens"] == 256


def test_build_batch_jsonl_auto_generates_ids():
    from worldmm.llm.batch import build_batch_jsonl
    requests = [
        {"model": "gpt-5-mini", "messages": [{"role": "user", "content": "hi"}],
         "max_completion_tokens": 64},
    ]
    lines = build_batch_jsonl(requests)
    parsed = json.loads(lines[0])
    assert parsed["custom_id"].startswith("req-")


def test_parse_batch_results_maps_to_custom_ids():
    from worldmm.llm.batch import parse_batch_results
    result_lines = [
        json.dumps({
            "custom_id": "req-1",
            "response": {
                "status_code": 200,
                "body": {
                    "choices": [{"message": {"content": "response 1"}}]
                }
            }
        }),
        json.dumps({
            "custom_id": "req-2",
            "response": {
                "status_code": 200,
                "body": {
                    "choices": [{"message": {"content": "response 2"}}]
                }
            }
        }),
    ]
    results = parse_batch_results(result_lines)
    assert results["req-1"] == "response 1"
    assert results["req-2"] == "response 2"


def test_parse_batch_results_handles_errors():
    from worldmm.llm.batch import parse_batch_results
    result_lines = [
        json.dumps({
            "custom_id": "req-1",
            "response": {"status_code": 500, "body": {"error": {"message": "server error"}}}
        }),
    ]
    results = parse_batch_results(result_lines)
    assert results["req-1"] is None

- [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_batch_client.py -v
Expected: FAIL

- [ ] Step 3: Write the implementation

# main/server/worldmm/llm/batch.py
from __future__ import annotations

import json
import uuid
from typing import Any


def build_batch_jsonl(
    requests: list[dict[str, Any]],
    custom_ids: list[str] | None = None,
) -> list[str]:
    if custom_ids is None:
        custom_ids = [f"req-{uuid.uuid4().hex[:8]}" for _ in requests]

    lines = []
    for req, cid in zip(requests, custom_ids):
        line = {
            "custom_id": cid,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": req,
        }
        lines.append(json.dumps(line))
    return lines


def parse_batch_results(result_lines: list[str]) -> dict[str, str | None]:
    results: dict[str, str | None] = {}
    for line in result_lines:
        data = json.loads(line)
        custom_id = data["custom_id"]
        response = data.get("response", {})
        if response.get("status_code") == 200:
            body = response.get("body", {})
            choices = body.get("choices", [])
            if choices:
                results[custom_id] = choices[0].get("message", {}).get("content")
            else:
                results[custom_id] = None
        else:
            results[custom_id] = None
    return results
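
The task title mentions a poller, but batch.py above only builds and parses JSONL. A minimal poll loop might look like the following sketch; the fetch callables are injected so the loop stays unit-testable, and in production they would wrap the OpenAI SDK's batches.retrieve and files.content calls:

```python
import time
from typing import Any, Callable


def poll_batch(
    retrieve_fn: Callable[[str], dict[str, Any]],
    fetch_output_fn: Callable[[str], str],
    batch_id: str,
    interval_s: float = 30.0,
    max_polls: int = 2880,  # ~24h at 30s intervals, matching the batch window
) -> list[str]:
    """Poll a batch until it reaches a terminal status; return output JSONL lines."""
    for _ in range(max_polls):
        batch = retrieve_fn(batch_id)
        status = batch["status"]
        if status == "completed":
            return fetch_output_fn(batch["output_file_id"]).splitlines()
        if status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch_id} ended with status {status}")
        time.sleep(interval_s)
    raise TimeoutError(f"Batch {batch_id} did not finish after {max_polls} polls")
```

The returned lines feed directly into parse_batch_results above.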

- [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_batch_client.py -v
Expected: All 4 tests PASS

- [ ] Step 5: Commit

git add main/server/worldmm/llm/batch.py \
        main/server/worldmm/tests/test_batch_client.py
git commit -m "feat(worldmm): add batch API JSONL builder and result parser"

Task 14: Prompt Templates

Files:
- Create: all files in main/server/worldmm/llm/prompts/

- [ ] Step 1: Create prompt template files

Create each file in main/server/worldmm/llm/prompts/ with the prompt content derived from the WorldMM paper. These replace the inline fallback strings in the code.

ner.txt:

Extract all named entities from the following paragraph. Include people (by name or pronoun "I"), objects, locations, and organizations. Return your answer as JSON: {"entities": ["entity1", "entity2", ...]}

episodic_triples.txt:

Given the caption and the extracted named entities, extract RDF-style knowledge triples. Each triple must be [subject, predicate, object] where at least one element is a named entity from the list. Keep pronouns like "I" as-is. Focus on factual, event-specific relationships. Return JSON: {"triples": [{"subject": "...", "predicate": "...", "object": "..."}, ...]}

semantic_triples.txt:

Extract long-term patterns, habits, preferences, and social relationships from this summary. Focus on what is generally true about the people and their behaviors — not what happened in this specific moment. Examples: habitual activities, recurring preferences, interpersonal relationships. Return JSON: {"triples": [{"subject": "...", "predicate": "...", "object": "..."}, ...]}

caption.txt:

You are a video captioning assistant. Given 8 frames sampled uniformly from a 30-second video clip and an audio transcript, generate a detailed caption describing what is happening. Include the people present (by name if identifiable, or by description), their actions, objects they interact with, and the location/setting. Be specific and factual.

merge_captions.txt:

You are a summarization assistant. Given a sequence of captions from consecutive video segments, merge them into a single coherent {duration_label} summary. Preserve key entities, actions, and temporal order. Remove redundancy but keep important details. Be concise but complete.

consolidation.txt:

You are maintaining a knowledge base of semantic triples about a person's life. Given existing triples and a new triple, decide how to consolidate them.

If the new triple conflicts with or updates an existing triple (e.g., "I sometimes eats fruit" vs "I often eats fruits and snacks"), merge them into an updated triple and mark the old ones for removal.

If the new triple adds new information without conflicting, keep it as-is.

Return JSON:
- To merge: {"action": "merge", "remove_ids": ["id1", ...], "new_triple": {"subject": "...", "predicate": "...", "object": "..."}}
- To keep as-is: {"action": "keep"}

cross_scale_rerank.txt:

Given a user's question and candidate captions retrieved from different temporal scales of an episodic memory system, select the most relevant captions that best answer the question. Consider both direct relevance and temporal specificity.

Return JSON: {"selected_indices": [0, 2, ...]} where indices refer to the candidate list positions. Select at most 3 candidates.

reasoning_agent.txt:

You are an adaptive memory retrieval agent. Given a user's question and any previously retrieved context, decide your next action.

Available memory types:
- episodic: Search for specific events and actions (what happened, when, where)
- semantic: Search for long-term knowledge, habits, and relationships (who knows whom, what someone usually does)
- visual: Search for visual scenes and appearances (what something looked like)

Think about what information you still need to answer the question. If you have enough context, answer.

Respond with JSON:
- To search: {"action": "SEARCH", "memory_type": "episodic"|"semantic"|"visual", "query": "your search query"}
- To answer: {"action": "ANSWER"}

response.txt:

You are a helpful memory assistant. Given a user's question and retrieved memory context from their personal experiences, provide a clear, accurate, and helpful answer. Base your answer only on the provided context. If the context is insufficient, say so.

entity_confirm.txt:

Given two entity surface forms, determine if they refer to the same real-world entity. Consider nicknames, abbreviations, pronouns, and contextual equivalence.

Return JSON: {"same_entity": true|false, "canonical_name": "preferred name if same entity"}
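
One subtlety worth noting: merge_captions.txt is the only template with a str.format placeholder ({duration_label}), while most of the others contain literal JSON braces that str.format would misparse. A small sketch of the distinction (the template text here is abbreviated):

```python
# merge_captions.txt is filled with plain str.format; duration_label is its
# only placeholder (the label text here is illustrative):
merge_template = "merge them into a single coherent {duration_label} summary"
print(merge_template.format(duration_label="3-minute"))

# Gotcha: the JSON-bearing templates contain literal braces, so passing them
# through str.format raises; load them verbatim, or double the braces.
json_template = 'Return JSON: {"entities": ["entity1", ...]}'
try:
    json_template.format()
except (KeyError, ValueError, IndexError):
    print("literal JSON braces are parsed as format fields")
```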

  • [ ] Step 2: Run all tests to verify prompts don't break anything

Run: cd main/server && python -m pytest worldmm/tests/ -v
Expected: All tests still PASS (prompt files are now loaded from disk instead of inline fallbacks)

  • [ ] Step 3: Commit
git add main/server/worldmm/llm/prompts/
git commit -m "feat(worldmm): add all prompt templates from WorldMM paper"

Task 15: Entity Resolution Pipeline

Files: - Create: main/server/worldmm/memory/episodic/entity_resolution.py - Test: main/server/worldmm/tests/test_entity_resolution.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_entity_resolution.py
from __future__ import annotations

import json
import sys
from pathlib import Path
from unittest.mock import MagicMock

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_resolve_new_entity_creates_canonical():
    from worldmm.memory.episodic.entity_resolution import resolve_entities
    # Mock embedding function
    mock_embed = MagicMock(return_value=[[0.1] * 10])  # batch embed
    # Mock HNSW search — returns no matches (new entity)
    mock_search = MagicMock(return_value=[])
    # Mock entity creation
    created_ids = []
    def mock_create(user_id, surface_form, canonical_name=None, canonical_entity_id=None):
        eid = f"e-{len(created_ids)}"
        created_ids.append(eid)
        return eid
    mock_llm_confirm = MagicMock()  # should not be called for new entities

    result = resolve_entities(
        surface_forms=["Katrina"],
        user_id="user-1",
        embed_fn=mock_embed,
        search_fn=mock_search,
        create_fn=mock_create,
        llm_confirm_fn=mock_llm_confirm,
    )
    assert len(result) == 1
    assert result["Katrina"] == "e-0"
    mock_llm_confirm.assert_not_called()


def test_resolve_existing_entity_with_llm_confirmation():
    from worldmm.memory.episodic.entity_resolution import resolve_entities
    mock_embed = MagicMock(return_value=[[0.9, 0.1, 0.0]])
    # HNSW search returns a candidate above threshold
    mock_search = MagicMock(return_value=[
        {"id": "existing-e1", "surface_form": "Katrina", "canonical_name": "Katrina", "similarity": 0.92}
    ])
    mock_create = MagicMock()
    # LLM confirms they are the same entity
    mock_llm_confirm = MagicMock(return_value={"same_entity": True, "canonical_name": "Katrina"})

    result = resolve_entities(
        surface_forms=["Kate"],
        user_id="user-1",
        embed_fn=mock_embed,
        search_fn=mock_search,
        create_fn=mock_create,
        llm_confirm_fn=mock_llm_confirm,
    )
    assert result["Kate"] == "existing-e1"
    mock_llm_confirm.assert_called_once()
    # Should create alias row, not new canonical
    mock_create.assert_called_once()
    call_kwargs = mock_create.call_args[1]
    assert call_kwargs["canonical_entity_id"] == "existing-e1"
    assert call_kwargs["surface_form"] == "Kate"


def test_resolve_existing_entity_llm_rejects():
    from worldmm.memory.episodic.entity_resolution import resolve_entities
    mock_embed = MagicMock(return_value=[[0.5, 0.5, 0.0]])
    mock_search = MagicMock(return_value=[
        {"id": "existing-e1", "surface_form": "Katherine", "canonical_name": "Katherine", "similarity": 0.65}
    ])
    created_ids = []
    def mock_create(user_id, surface_form, canonical_name=None, canonical_entity_id=None):
        eid = f"e-{len(created_ids)}"
        created_ids.append({"id": eid, "canonical_entity_id": canonical_entity_id})
        return eid
    # LLM says NOT the same entity
    mock_llm_confirm = MagicMock(return_value={"same_entity": False})

    result = resolve_entities(
        surface_forms=["Kate"],
        user_id="user-1",
        embed_fn=mock_embed,
        search_fn=mock_search,
        create_fn=mock_create,
        llm_confirm_fn=mock_llm_confirm,
    )
    # Should create a NEW canonical entity, not an alias
    assert result["Kate"] == "e-0"
    assert created_ids[0]["canonical_entity_id"] is None


def test_resolve_batches_multiple_entities():
    from worldmm.memory.episodic.entity_resolution import resolve_entities
    # Batch embed returns embeddings for all surface forms at once
    mock_embed = MagicMock(return_value=[[0.1] * 10, [0.2] * 10, [0.3] * 10])
    mock_search = MagicMock(return_value=[])
    created_count = [0]
    def mock_create(user_id, surface_form, canonical_name=None, canonical_entity_id=None):
        eid = f"e-{created_count[0]}"
        created_count[0] += 1
        return eid
    mock_llm_confirm = MagicMock()

    result = resolve_entities(
        surface_forms=["I", "Katrina", "dining table"],
        user_id="user-1",
        embed_fn=mock_embed,
        search_fn=mock_search,
        create_fn=mock_create,
        llm_confirm_fn=mock_llm_confirm,
    )
    assert len(result) == 3
    # embed_fn called once with all 3 surface forms (batched)
    mock_embed.assert_called_once_with(["I", "Katrina", "dining table"])


def test_build_entity_confirm_prompt():
    from worldmm.memory.episodic.entity_resolution import build_entity_confirm_prompt
    messages = build_entity_confirm_prompt("Kate", "Katrina")
    content = str(messages)
    assert "Kate" in content
    assert "Katrina" in content


def test_parse_entity_confirm_response():
    from worldmm.memory.episodic.entity_resolution import parse_entity_confirm_response
    raw = json.dumps({"same_entity": True, "canonical_name": "Katrina"})
    result = parse_entity_confirm_response(raw)
    assert result["same_entity"] is True
    assert result["canonical_name"] == "Katrina"
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_entity_resolution.py -v
Expected: FAIL — ModuleNotFoundError

  • [ ] Step 3: Write the implementation
# main/server/worldmm/memory/episodic/entity_resolution.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Callable

_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"


def _load_prompt(name: str) -> str:
    path = _PROMPTS_DIR / name
    if path.exists():
        return path.read_text().strip()
    if name == "entity_confirm.txt":
        return (
            "Given two entity surface forms, determine if they refer to the same real-world entity. "
            "Return JSON: {\"same_entity\": true|false, \"canonical_name\": \"preferred name\"}"
        )
    return ""


def build_entity_confirm_prompt(
    surface_form_a: str,
    surface_form_b: str,
) -> list[dict[str, str]]:
    system = _load_prompt("entity_confirm.txt")
    user_content = f'Surface form A: "{surface_form_a}"\nSurface form B: "{surface_form_b}"'
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_content},
    ]


def parse_entity_confirm_response(raw: str) -> dict[str, Any]:
    return json.loads(raw)


def resolve_entities(
    surface_forms: list[str],
    user_id: str,
    embed_fn: Callable[[list[str]], list[list[float]]],
    search_fn: Callable[[list[float]], list[dict[str, Any]]],
    create_fn: Callable[..., str],
    llm_confirm_fn: Callable[[str, str], dict[str, Any]],
    threshold: float = 0.6,
) -> dict[str, str]:
    """Resolve surface forms to canonical entity IDs. Batched per window.

    Args:
        surface_forms: list of entity names from NER
        user_id: user who owns the entities
        embed_fn: batch embed function (list[str] -> list[list[float]])
        search_fn: HNSW search function (embedding -> list of candidates)
        create_fn: creates entity row (user_id, surface_form, canonical_name, canonical_entity_id) -> id
        llm_confirm_fn: confirms two surface forms are same entity -> {same_entity, canonical_name}
        threshold: cosine similarity threshold for HNSW candidate matching

    Returns:
        dict mapping surface_form -> canonical_entity_id
    """
    embeddings = embed_fn(surface_forms)
    resolved: dict[str, str] = {}

    for surface_form, embedding in zip(surface_forms, embeddings):
        # Drop weak matches up front so only candidates at or above the
        # similarity threshold can trigger an LLM confirmation call.
        candidates = [
            c for c in search_fn(embedding)
            if c.get("similarity", 0.0) >= threshold
        ]

        if not candidates:
            # New canonical entity
            entity_id = create_fn(
                user_id=user_id,
                surface_form=surface_form,
                canonical_name=surface_form,
                canonical_entity_id=None,
            )
            resolved[surface_form] = entity_id
            continue

        # Check top candidate with LLM confirmation
        top = candidates[0]
        confirm = llm_confirm_fn(surface_form, top["surface_form"])

        if confirm.get("same_entity"):
            # Create alias row pointing to existing canonical
            create_fn(
                user_id=user_id,
                surface_form=surface_form,
                canonical_name=None,
                canonical_entity_id=top["id"],
            )
            resolved[surface_form] = top["id"]
        else:
            # LLM rejected — create new canonical
            entity_id = create_fn(
                user_id=user_id,
                surface_form=surface_form,
                canonical_name=surface_form,
                canonical_entity_id=None,
            )
            resolved[surface_form] = entity_id

    return resolved
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_entity_resolution.py -v
Expected: All 6 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/memory/episodic/entity_resolution.py \
        main/server/worldmm/tests/test_entity_resolution.py
git commit -m "feat(worldmm): add entity resolution pipeline with embedding lookup and LLM confirmation"

Task 16: Visual Index (pgvector query)

Files: - Create: main/server/worldmm/memory/visual/index.py - Test: main/server/worldmm/tests/test_visual_index.py

  • [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_visual_index.py
from __future__ import annotations

import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

_server_root = Path(__file__).resolve().parents[3]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
    if str(p) not in sys.path:
        sys.path.insert(0, str(p))


def test_query_visual_embeddings_calls_db():
    from worldmm.memory.visual.index import query_visual_embeddings
    mock_session = MagicMock()
    mock_row_1 = MagicMock()
    mock_row_1.segment_id = "s1"
    mock_row_1.user_id = "user-1"
    mock_row_1.timestamp = "2026-03-22T10:00:00Z"
    mock_row_2 = MagicMock()
    mock_row_2.segment_id = "s2"
    mock_row_2.user_id = "user-1"
    mock_row_2.timestamp = "2026-03-22T10:00:30Z"
    mock_session.execute.return_value.fetchall.return_value = [
        (mock_row_1, 0.95),
        (mock_row_2, 0.80),
    ]

    results = query_visual_embeddings(
        query_embedding=[0.1] * 3584,
        user_id="user-1",
        top_k=5,
        session=mock_session,
    )
    assert len(results) == 2
    assert results[0]["segment_id"] == "s1"
    assert results[0]["similarity"] == 0.95
    assert results[1]["segment_id"] == "s2"


def test_query_visual_embeddings_empty_result():
    from worldmm.memory.visual.index import query_visual_embeddings
    mock_session = MagicMock()
    mock_session.execute.return_value.fetchall.return_value = []

    results = query_visual_embeddings(
        query_embedding=[0.1] * 3584,
        user_id="user-1",
        top_k=5,
        session=mock_session,
    )
    assert results == []
  • [ ] Step 2: Run test to verify it fails

Run: cd main/server && python -m pytest worldmm/tests/test_visual_index.py -v
Expected: FAIL

  • [ ] Step 3: Write the implementation
# main/server/worldmm/memory/visual/index.py
from __future__ import annotations

from typing import Any

from sqlalchemy import text
from sqlalchemy.orm import Session


def query_visual_embeddings(
    query_embedding: list[float],
    user_id: str,
    top_k: int = 5,
    session: Session | None = None,
) -> list[dict[str, Any]]:
    """Query pgvector for the most similar visual embeddings.

    Uses the cosine distance operator (<=>). Requires an HNSW index on the
    embedding column. In tests, the session is mocked; in production, the
    query runs against the real pgvector extension.
    """
    if session is None:
        from shared.orm.orm import getSession
        session = getSession()

    # pgvector's `<=>` returns cosine distance, so similarity = 1 - distance.
    # The table name below is an assumption — align it with the schema
    # created in the earlier migration task.
    stmt = text(
        "SELECT segment_id, user_id, timestamp, "
        "1 - (embedding <=> CAST(:query_vec AS vector)) AS similarity "
        "FROM worldmm_visual_embeddings "
        "WHERE user_id = :user_id "
        "ORDER BY embedding <=> CAST(:query_vec AS vector) "
        "LIMIT :top_k"
    )
    rows = session.execute(
        stmt,
        # The list's repr (e.g. "[0.1, 0.2]") is valid pgvector text input.
        {"query_vec": str(query_embedding), "user_id": user_id, "top_k": top_k},
    ).fetchall()

    results: list[dict[str, Any]] = []
    for row_data in rows:
        if isinstance(row_data, tuple) and len(row_data) == 2:
            # (row, similarity) pairs — the shape the mocked tests provide.
            row, similarity = row_data
        else:
            # Real SQLAlchemy rows expose the selected columns by name.
            row, similarity = row_data, row_data.similarity
        results.append({
            "segment_id": row.segment_id,
            "user_id": row.user_id,
            "timestamp": row.timestamp,
            "similarity": similarity,
        })
    return results
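As a reference for the `<=>` semantics used above, pgvector's cosine distance relates to similarity as `similarity = 1 - distance`; a pure-Python equivalent for intuition:

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Pure-Python analogue of pgvector's `<=>` cosine distance operator."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Identical vectors: distance ~0, similarity ~1.
d = cosine_distance([0.1, 0.2, 0.3], [0.1, 0.2, 0.3])
similarity = 1.0 - d
```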
  • [ ] Step 4: Run tests to verify they pass

Run: cd main/server && python -m pytest worldmm/tests/test_visual_index.py -v
Expected: All 2 tests PASS

  • [ ] Step 5: Commit
git add main/server/worldmm/memory/visual/index.py \
        main/server/worldmm/tests/test_visual_index.py
git commit -m "feat(worldmm): add visual embedding pgvector query function"

Task 17: Full Test Suite Verification

  • [ ] Step 1: Run the complete test suite

Run: cd main/server && python -m pytest worldmm/tests/ -v --tb=short
Expected: All tests PASS (should be ~55 tests across 15 test files)

  • [ ] Step 2: Verify no import issues across modules

Run: cd main/server && python -c "from worldmm.llm.client import LLMClient, build_request; from worldmm.memory.episodic.graph import EpisodicGraph; from worldmm.memory.semantic.graph import SemanticGraph; from worldmm.retrieval.agent import ReasoningAgent; print('All imports OK')"
Expected: All imports OK

  • [ ] Step 3: Final commit with any fixes

Only if Step 1 or 2 revealed issues that needed fixing.