WorldMM Live Memory System — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Implement the WorldMM paper's multi-granularity memory architecture (episodic, semantic, visual + adaptive reasoning agent) with TDD, using GPT-5 mini via Batch API for testing.
Architecture: Lambda + GPU Worker split. All GPT-5 mini calls go through a unified LLM client that dispatches to Batch API (test) or Direct API (prod). VLM2Vec runs on a separate GPU worker. PostgreSQL + pgvector stores graphs, triples, and embeddings. igraph runs PPR in-memory at query time.
Tech Stack: Python 3.12, SQLAlchemy + pgvector, igraph, OpenAI API (Batch + Direct), FastAPI (GPU worker), pytest
Spec: docs/superpowers/specs/2026-03-22-worldmm-live-memory-design.md
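For orientation, here is a sketch of the per-segment ingest flow the tasks below implement. Every function named is specified in a later task; the wiring itself, the per-call model choice, and the token budgets are illustrative assumptions, not part of the spec.
# Illustrative per-segment ingest flow; see Tasks 2-4 for the real modules.
from worldmm.ingest.captioner import build_caption_prompt, parse_caption_response
from worldmm.llm.client import LLMClient, build_request
from worldmm.memory.episodic.openie import (
    build_ner_prompt, build_triple_prompt,
    parse_ner_response, parse_triples_response,
)

def ingest_segment(llm: LLMClient, frames_b64: list[str], transcript: str) -> list[dict[str, str]]:
    def ask(messages, max_tokens):
        return llm.call(build_request_dict=build_request(
            model="gpt-5-mini", messages=messages,
            max_completion_tokens=max_tokens, reasoning_effort="low",
        ))
    # 1. Caption the 30-second clip from its 8 frames + transcript.
    caption = parse_caption_response(ask(build_caption_prompt(frames_b64, transcript), 512))
    # 2. NER over the caption, then OpenIE triples grounded in those entities.
    entities = parse_ner_response(ask(build_ner_prompt(caption), 256))
    triples = parse_triples_response(ask(build_triple_prompt(caption, entities), 512))
    # Persisting the triples goes through entity resolution (surface form to
    # entity id) and create_triple(); see Task 1 and entity_resolution.py.
    return triples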
File Structure
main/server/worldmm/
├── __init__.py
├── ingest/
│ ├── __init__.py
│ ├── captioner.py # build_caption_prompt(), parse_caption_response()
│ └── transcriber.py # transcribe_audio() — Whisper API client (deferred; see Deferred Components below)
├── memory/
│ ├── __init__.py
│ ├── episodic/
│ │ ├── __init__.py
│ │ ├── graph.py # EpisodicGraph: build igraph from triples, run PPR
│ │ ├── openie.py # build_ner_prompt(), parse_ner(), build_triple_prompt(), parse_triples()
│ │ └── multiscale.py # build_merge_prompt(), parse_merge_response()
│ ├── semantic/
│ │ ├── __init__.py
│ │ ├── graph.py # SemanticGraph: single evolving entity graph, PPR with edge scoring
│ │ ├── extraction.py # build_semantic_triple_prompt(), parse_semantic_triples()
│ │ └── consolidation.py # find_candidates(), build_consolidation_prompt(), apply_consolidation()
│ └── visual/
│ ├── __init__.py
│ ├── encoder.py # VLM2VecClient: HTTP calls to GPU worker
│ └── index.py # query_visual_embeddings() — pgvector cosine similarity
├── retrieval/
│ ├── __init__.py
│ ├── agent.py # ReasoningAgent: SEARCH/ANSWER loop, up to 5 rounds
│ ├── episodic_retriever.py # retrieve_episodic(): PPR across scales + cross-scale rerank
│ ├── semantic_retriever.py # retrieve_semantic(): PPR on entity graph
│ └── visual_retriever.py # retrieve_visual(): VLM2Vec text query → cosine sim
├── llm/
│ ├── __init__.py
│ ├── client.py # LLMClient: build_request(), call(), supports batch/direct
│ └── batch.py # BatchClient: submit(), poll(), collect_results()
├── llm/prompts/
│ ├── caption.txt
│ ├── ner.txt
│ ├── episodic_triples.txt
│ ├── semantic_triples.txt
│ ├── merge_captions.txt
│ ├── consolidation.txt
│ ├── cross_scale_rerank.txt
│ ├── reasoning_agent.txt
│ ├── response.txt
│ └── entity_confirm.txt
└── tests/
├── __init__.py
├── conftest.py # shared fixtures, mock LLM helper
├── fixtures/ # EgoLife-derived canned data
│ ├── sample_caption.txt
│ ├── sample_ner_response.json
│ ├── sample_triples_response.json
│ └── sample_semantic_triples_response.json
├── test_llm_client.py
├── test_openie.py
├── test_entity_resolution.py
├── test_captioner.py
├── test_multiscale.py
├── test_episodic_graph.py
├── test_semantic_graph.py
├── test_consolidation.py
├── test_visual_encoder.py
├── test_episodic_retriever.py
├── test_semantic_retriever.py
├── test_visual_retriever.py
└── test_retrieval_agent.py
ORM models (shared layer):
└── main/server/layers/shared/python/shared/orm/worldmm_orm.py
Entity resolution:
└── main/server/worldmm/memory/episodic/entity_resolution.py # resolve_entities(): embed → HNSW → LLM confirm → create/alias
Deferred Components (Infrastructure, not algorithm)
These are in the spec but deferred until the algorithm is validated via TDD:
- ingest/segment_buffer.py — S3 + DynamoDB frame buffering (Lambda infrastructure)
- ingest/app.py — Lambda handler (deployment infrastructure)
- ingest/transcriber.py — Whisper API client (integration, not algorithm)
- gpu_worker/vlm2vec_server.py + Dockerfile — VLM2Vec FastAPI server (mocked in tests)
- Redis graph caching — performance optimization
These will be added in a follow-up plan after the core algorithm passes TDD validation.
Task 1: ORM Models
Files:
- Create: main/server/layers/shared/python/shared/orm/worldmm_orm.py
- Create: main/server/worldmm/__init__.py
- Create: main/server/worldmm/tests/__init__.py
- Create: main/server/worldmm/tests/conftest.py
- Test: main/server/worldmm/tests/test_orm.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_orm.py
from __future__ import annotations
import sys
from pathlib import Path
import pytest
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def _setup_db():
import os
os.environ.setdefault("DATABASE_URL", "sqlite:///:memory:")
import shared.orm.orm as orm
orm.getEngine.cache_clear()
orm.getSessionFactory.cache_clear()
# Must import worldmm_orm so models are registered on Base
import shared.orm.worldmm_orm # noqa: F401
orm.initDb()
def test_create_segment():
_setup_db()
from shared.orm.worldmm_orm import WorldMMSegment, create_segment
segment_id = create_segment(
user_id="user-1",
start_time="2026-03-22T10:00:00Z",
end_time="2026-03-22T10:00:30Z",
duration_seconds=30,
caption="I stand at the dining table",
s3_video_key="worldmm/user-1/clips/001.mp4",
transcript="I'm standing at the dining table",
)
assert segment_id is not None
assert len(segment_id) == 36 # UUID format
def test_create_entity_canonical():
_setup_db()
from shared.orm.worldmm_orm import create_entity
entity_id = create_entity(
user_id="user-1",
surface_form="Katrina",
canonical_name="Katrina",
canonical_entity_id=None,
)
assert entity_id is not None
def test_create_entity_alias():
_setup_db()
from shared.orm.worldmm_orm import create_entity
canonical_id = create_entity(
user_id="user-1",
surface_form="Katrina",
canonical_name="Katrina",
canonical_entity_id=None,
)
alias_id = create_entity(
user_id="user-1",
surface_form="Kate",
canonical_name=None,
canonical_entity_id=canonical_id,
)
assert alias_id != canonical_id
def test_create_triple_with_entity_object():
_setup_db()
from shared.orm.worldmm_orm import create_entity, create_segment, create_triple
seg_id = create_segment(
user_id="user-1",
start_time="2026-03-22T10:00:00Z",
end_time="2026-03-22T10:00:30Z",
duration_seconds=30,
caption="test",
)
subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
triple_id = create_triple(
segment_id=seg_id,
user_id="user-1",
memory_type="episodic",
subject_entity_id=subj_id,
predicate="talks to",
object_entity_id=obj_id,
)
assert triple_id is not None
def test_create_triple_with_literal_object():
_setup_db()
from shared.orm.worldmm_orm import create_entity, create_segment, create_triple
seg_id = create_segment(
user_id="user-1",
start_time="2026-03-22T10:00:00Z",
end_time="2026-03-22T10:00:30Z",
duration_seconds=30,
caption="test",
)
subj_id = create_entity(user_id="user-1", surface_form="meeting", canonical_name="meeting")
triple_id = create_triple(
segment_id=seg_id,
user_id="user-1",
memory_type="episodic",
subject_entity_id=subj_id,
predicate="started_at",
object_literal="2:30 PM",
)
assert triple_id is not None
def test_create_triple_rejects_both_object_types():
_setup_db()
from shared.orm.worldmm_orm import create_entity, create_segment, create_triple
seg_id = create_segment(
user_id="user-1",
start_time="2026-03-22T10:00:00Z",
end_time="2026-03-22T10:00:30Z",
duration_seconds=30,
caption="test",
)
subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
with pytest.raises(ValueError, match="exactly one"):
create_triple(
segment_id=seg_id,
user_id="user-1",
memory_type="episodic",
subject_entity_id=subj_id,
predicate="talks to",
object_entity_id=obj_id,
object_literal="some text",
)
def test_get_triples_for_user():
_setup_db()
from shared.orm.worldmm_orm import (
create_entity, create_segment, create_triple, get_triples_for_user,
)
seg_id = create_segment(
user_id="user-1",
start_time="2026-03-22T10:00:00Z",
end_time="2026-03-22T10:00:30Z",
duration_seconds=30,
caption="test",
)
subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
create_triple(
segment_id=seg_id, user_id="user-1", memory_type="episodic",
subject_entity_id=subj_id, predicate="talks to", object_entity_id=obj_id,
)
create_triple(
segment_id=seg_id, user_id="user-1", memory_type="semantic",
subject_entity_id=subj_id, predicate="is friends with", object_entity_id=obj_id,
)
episodic = get_triples_for_user("user-1", "episodic")
semantic = get_triples_for_user("user-1", "semantic")
assert len(episodic) == 1
assert len(semantic) == 1
assert episodic[0]["predicate"] == "talks to"
def test_invalidate_triple():
_setup_db()
from shared.orm.worldmm_orm import (
create_entity, create_segment, create_triple,
invalidate_triple, get_triples_for_user,
)
seg_id = create_segment(
user_id="user-1",
start_time="2026-03-22T10:00:00Z",
end_time="2026-03-22T10:00:30Z",
duration_seconds=30,
caption="test",
)
subj_id = create_entity(user_id="user-1", surface_form="I", canonical_name="I")
obj_id = create_entity(user_id="user-1", surface_form="Katrina", canonical_name="Katrina")
triple_id = create_triple(
segment_id=seg_id, user_id="user-1", memory_type="semantic",
subject_entity_id=subj_id, predicate="is friends with", object_entity_id=obj_id,
)
invalidate_triple(triple_id)
active = get_triples_for_user("user-1", "semantic")
assert len(active) == 0
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_orm.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'shared.orm.worldmm_orm'
- [ ] Step 3: Create package init files
# main/server/worldmm/__init__.py
# (empty)
# main/server/worldmm/tests/__init__.py
# (empty)
# main/server/worldmm/tests/conftest.py
# (empty for now — fixtures added in later tasks)
- [ ] Step 4: Write ORM models
# main/server/layers/shared/python/shared/orm/worldmm_orm.py
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import CheckConstraint, Column, ForeignKey, Integer, String, Text
from .orm import Base, getSession

# Plain Column assignments (no class-level type annotations) keep these
# models compatible with both SQLAlchemy 1.4 and 2.0 declarative styles.

class WorldMMSegment(Base):
    __tablename__ = "worldmm_segments"
    id = Column(String, primary_key=True)
    user_id = Column(String, nullable=False)
    start_time = Column(String, nullable=False)
    end_time = Column(String, nullable=False)
    duration_seconds = Column(Integer, nullable=False)
    caption = Column(Text, nullable=True)
    s3_video_key = Column(String, nullable=True)
    s3_frames_key = Column(String, nullable=True)
    transcript = Column(Text, nullable=True)
    parent_segment_id = Column(String, ForeignKey("worldmm_segments.id"), nullable=True)

class WorldMMEntity(Base):
    __tablename__ = "worldmm_entities"
    id = Column(String, primary_key=True)
    user_id = Column(String, nullable=False)
    canonical_entity_id = Column(String, ForeignKey("worldmm_entities.id"), nullable=True)
    surface_form = Column(String, nullable=False)
    canonical_name = Column(String, nullable=True)
    # Note: embedding column (VECTOR(3072)) for OpenAI text-embedding-3-large
    # Omitted from SQLite tests; added via Alembic migration for PostgreSQL
    # Entity resolution tests mock the embedding lookup directly

class WorldMMTriple(Base):
    __tablename__ = "worldmm_triples"
    __table_args__ = (
        CheckConstraint(
            "(object_entity_id IS NOT NULL AND object_literal IS NULL) OR "
            "(object_entity_id IS NULL AND object_literal IS NOT NULL)",
            name="ck_triple_object_xor",
        ),
    )
    id = Column(String, primary_key=True)
    segment_id = Column(String, ForeignKey("worldmm_segments.id"), nullable=False)
    user_id = Column(String, nullable=False)
    memory_type = Column(String, nullable=False)  # "episodic" or "semantic"
    subject_entity_id = Column(String, ForeignKey("worldmm_entities.id"), nullable=False)
    predicate = Column(String, nullable=False)
    object_entity_id = Column(String, ForeignKey("worldmm_entities.id"), nullable=True)
    object_literal = Column(String, nullable=True)
    created_at = Column(String, nullable=False)
    invalidated_at = Column(String, nullable=True)

class WorldMMVisualEmbedding(Base):
    __tablename__ = "worldmm_visual_embeddings"
    id = Column(String, primary_key=True)
    segment_id = Column(String, ForeignKey("worldmm_segments.id"), nullable=False)
    user_id = Column(String, nullable=False)
    timestamp = Column(String, nullable=False)
    # Note: embedding column (Vector) added only when using PostgreSQL + pgvector
    # SQLite tests skip vector operations
def create_segment(
user_id: str,
start_time: str,
end_time: str,
duration_seconds: int,
caption: str | None = None,
s3_video_key: str | None = None,
s3_frames_key: str | None = None,
transcript: str | None = None,
parent_segment_id: str | None = None,
) -> str:
segment_id = str(uuid.uuid4())
session = getSession()
try:
session.add(WorldMMSegment(
id=segment_id,
user_id=user_id,
start_time=start_time,
end_time=end_time,
duration_seconds=duration_seconds,
caption=caption,
s3_video_key=s3_video_key,
s3_frames_key=s3_frames_key,
transcript=transcript,
parent_segment_id=parent_segment_id,
))
session.commit()
return segment_id
finally:
session.close()
def create_entity(
user_id: str,
surface_form: str,
canonical_name: str | None = None,
canonical_entity_id: str | None = None,
) -> str:
entity_id = str(uuid.uuid4())
session = getSession()
try:
session.add(WorldMMEntity(
id=entity_id,
user_id=user_id,
canonical_entity_id=canonical_entity_id,
surface_form=surface_form,
canonical_name=canonical_name,
))
session.commit()
return entity_id
finally:
session.close()
def create_triple(
segment_id: str,
user_id: str,
memory_type: str,
subject_entity_id: str,
predicate: str,
object_entity_id: str | None = None,
object_literal: str | None = None,
) -> str:
if (object_entity_id is None) == (object_literal is None):
raise ValueError("Triple must have exactly one of object_entity_id or object_literal")
triple_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
session = getSession()
try:
session.add(WorldMMTriple(
id=triple_id,
segment_id=segment_id,
user_id=user_id,
memory_type=memory_type,
subject_entity_id=subject_entity_id,
predicate=predicate,
object_entity_id=object_entity_id,
object_literal=object_literal,
created_at=now,
invalidated_at=None,
))
session.commit()
return triple_id
finally:
session.close()
def get_triples_for_user(
user_id: str,
memory_type: str,
) -> list[dict[str, Any]]:
session = getSession()
try:
rows = (
session.query(WorldMMTriple)
.filter(
WorldMMTriple.user_id == user_id,
WorldMMTriple.memory_type == memory_type,
WorldMMTriple.invalidated_at.is_(None),
)
.all()
)
return [
{
"id": r.id,
"segment_id": r.segment_id,
"subject_entity_id": r.subject_entity_id,
"predicate": r.predicate,
"object_entity_id": r.object_entity_id,
"object_literal": r.object_literal,
}
for r in rows
]
finally:
session.close()
def invalidate_triple(triple_id: str) -> None:
now = datetime.now(timezone.utc).isoformat()
session = getSession()
try:
record = session.get(WorldMMTriple, triple_id)
if record is None:
raise ValueError("Triple not found")
record.invalidated_at = now
session.commit()
finally:
session.close()
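The embedding columns referenced in the comments above exist only in PostgreSQL. Below is a hedged sketch of the Alembic migration that would add them, assuming the pgvector Python package; note that pgvector's HNSW index on full-precision vectors caps at 2000 dimensions, so the 3072-dim column is indexed through a halfvec cast (pgvector >= 0.7).
# Hedged migration sketch; dimensions follow the plan (3072 for
# text-embedding-3-large, 3584 for VLM2Vec).
import sqlalchemy as sa
from alembic import op
from pgvector.sqlalchemy import Vector

def upgrade() -> None:
    op.add_column("worldmm_entities",
                  sa.Column("embedding", Vector(3072), nullable=True))
    op.add_column("worldmm_visual_embeddings",
                  sa.Column("embedding", Vector(3584), nullable=True))
    # HNSW over a halfvec expression, because full-precision hnsw
    # indexes are limited to 2000 dimensions.
    op.execute(
        "CREATE INDEX ix_worldmm_entities_embedding ON worldmm_entities "
        "USING hnsw ((embedding::halfvec(3072)) halfvec_cosine_ops)"
    )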
- [ ] Step 5: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_orm.py -v
Expected: All 8 tests PASS
- [ ] Step 6: Commit
git add main/server/layers/shared/python/shared/orm/worldmm_orm.py \
main/server/worldmm/__init__.py \
main/server/worldmm/tests/__init__.py \
main/server/worldmm/tests/conftest.py \
main/server/worldmm/tests/test_orm.py
git commit -m "feat(worldmm): add ORM models for segments, entities, triples, visual embeddings"
Task 2: LLM Client (Batch/Direct Dispatch)
Files:
- Create: main/server/worldmm/llm/__init__.py
- Create: main/server/worldmm/llm/client.py
- Test: main/server/worldmm/tests/test_llm_client.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_llm_client.py
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_build_request_includes_reasoning_effort():
from worldmm.llm.client import build_request
req = build_request(
model="gpt-5-mini",
messages=[{"role": "user", "content": "hello"}],
max_completion_tokens=256,
reasoning_effort="low",
)
assert req["model"] == "gpt-5-mini"
assert req["max_completion_tokens"] == 256
assert req["reasoning_effort"] == "low"
assert req["messages"] == [{"role": "user", "content": "hello"}]
def test_build_request_without_reasoning_effort():
from worldmm.llm.client import build_request
req = build_request(
model="gpt-5",
messages=[{"role": "user", "content": "hello"}],
max_completion_tokens=1024,
)
assert "reasoning_effort" not in req
def test_call_direct_mode():
from worldmm.llm.client import LLMClient
mock_openai = MagicMock()
mock_openai.chat.completions.create.return_value = MagicMock(
choices=[MagicMock(message=MagicMock(content="test response"))]
)
client = LLMClient(mode="direct", openai_client=mock_openai)
result = client.call(build_request_dict={
"model": "gpt-5-mini",
"messages": [{"role": "user", "content": "hi"}],
"max_completion_tokens": 256,
"reasoning_effort": "low",
})
assert result == "test response"
mock_openai.chat.completions.create.assert_called_once()
def test_call_direct_passes_all_params():
from worldmm.llm.client import LLMClient
mock_openai = MagicMock()
mock_openai.chat.completions.create.return_value = MagicMock(
choices=[MagicMock(message=MagicMock(content="ok"))]
)
client = LLMClient(mode="direct", openai_client=mock_openai)
request = {
"model": "gpt-5-mini",
"messages": [{"role": "user", "content": "hi"}],
"max_completion_tokens": 256,
"reasoning_effort": "low",
}
client.call(build_request_dict=request)
call_kwargs = mock_openai.chat.completions.create.call_args[1]
assert call_kwargs["model"] == "gpt-5-mini"
assert call_kwargs["max_completion_tokens"] == 256
assert call_kwargs["reasoning_effort"] == "low"
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_llm_client.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.llm'
- [ ] Step 3: Write the implementation
# main/server/worldmm/llm/__init__.py
# (empty)
# main/server/worldmm/llm/client.py
from __future__ import annotations
from typing import Any
def build_request(
model: str,
messages: list[dict[str, Any]],
max_completion_tokens: int,
reasoning_effort: str | None = None,
) -> dict[str, Any]:
request: dict[str, Any] = {
"model": model,
"messages": messages,
"max_completion_tokens": max_completion_tokens,
}
if reasoning_effort is not None:
request["reasoning_effort"] = reasoning_effort
return request
class LLMClient:
def __init__(self, mode: str, openai_client: Any) -> None:
if mode not in ("direct", "batch"):
raise ValueError(f"Invalid mode: {mode}")
self._mode = mode
self._openai = openai_client
def call(self, build_request_dict: dict[str, Any]) -> str:
if self._mode == "direct":
response = self._openai.chat.completions.create(**build_request_dict)
return response.choices[0].message.content
raise NotImplementedError("Batch mode dispatch handled by BatchClient")
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_llm_client.py -v
Expected: All 4 tests PASS
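A direct-mode usage sketch, outside the test suite, assuming the openai package and an OPENAI_API_KEY in the environment:
from openai import OpenAI
from worldmm.llm.client import LLMClient, build_request

llm = LLMClient(mode="direct", openai_client=OpenAI())
reply = llm.call(build_request_dict=build_request(
    model="gpt-5-mini",
    messages=[{"role": "user", "content": "Say hello"}],
    max_completion_tokens=64,
    reasoning_effort="low",
))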
- [ ] Step 5: Commit
git add main/server/worldmm/llm/
git commit -m "feat(worldmm): add LLM client with batch/direct dispatch"
Task 3: NER + Triple Extraction Parsing (OpenIE)
Files:
- Create: main/server/worldmm/memory/__init__.py
- Create: main/server/worldmm/memory/episodic/__init__.py
- Create: main/server/worldmm/memory/episodic/openie.py
- Create: main/server/worldmm/tests/fixtures/sample_ner_response.json
- Create: main/server/worldmm/tests/fixtures/sample_triples_response.json
- Test: main/server/worldmm/tests/test_openie.py
- [ ] Step 1: Create test fixtures
// main/server/worldmm/tests/fixtures/sample_ner_response.json
{
"entities": ["I", "Katrina", "dining table", "tomorrow's schedule"]
}
// main/server/worldmm/tests/fixtures/sample_triples_response.json
{
"triples": [
{"subject": "I", "predicate": "stand at", "object": "dining table"},
{"subject": "Katrina", "predicate": "asks about", "object": "tomorrow's schedule"}
]
}
- [ ] Step 2: Write the failing test
# main/server/worldmm/tests/test_openie.py
from __future__ import annotations
import json
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
FIXTURES = Path(__file__).parent / "fixtures"
def test_build_ner_prompt_includes_caption():
from worldmm.memory.episodic.openie import build_ner_prompt
messages = build_ner_prompt("I stand at the dining table while Katrina asks about tomorrow.")
# Must be a list of message dicts
assert isinstance(messages, list)
assert any("dining table" in str(m.get("content", "")) for m in messages)
def test_parse_ner_response():
from worldmm.memory.episodic.openie import parse_ner_response
raw = (FIXTURES / "sample_ner_response.json").read_text()
entities = parse_ner_response(raw)
assert entities == ["I", "Katrina", "dining table", "tomorrow's schedule"]
def test_parse_ner_response_handles_plain_json():
from worldmm.memory.episodic.openie import parse_ner_response
entities = parse_ner_response('{"entities": ["Alice", "Bob"]}')
assert entities == ["Alice", "Bob"]
def test_build_triple_prompt_includes_caption_and_entities():
from worldmm.memory.episodic.openie import build_triple_prompt
messages = build_triple_prompt(
caption="I stand at the dining table",
entities=["I", "dining table"],
)
assert isinstance(messages, list)
content = str(messages)
assert "dining table" in content
assert "I" in content
def test_parse_triples_response():
from worldmm.memory.episodic.openie import parse_triples_response
raw = (FIXTURES / "sample_triples_response.json").read_text()
triples = parse_triples_response(raw)
assert len(triples) == 2
assert triples[0] == {"subject": "I", "predicate": "stand at", "object": "dining table"}
assert triples[1] == {"subject": "Katrina", "predicate": "asks about", "object": "tomorrow's schedule"}
def test_parse_triples_response_filters_invalid():
from worldmm.memory.episodic.openie import parse_triples_response
raw = json.dumps({"triples": [
{"subject": "I", "predicate": "stand at", "object": "table"},
{"subject": "I", "predicate": "does"}, # missing object — invalid
]})
triples = parse_triples_response(raw)
assert len(triples) == 1
- [ ] Step 3: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_openie.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.memory'
- [ ] Step 4: Write the implementation
# main/server/worldmm/memory/__init__.py
# (empty)
# main/server/worldmm/memory/episodic/__init__.py
# (empty)
# main/server/worldmm/memory/episodic/openie.py
from __future__ import annotations
import json
from pathlib import Path
_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
# Fallback inline prompts for early development
if name == "ner.txt":
return "Extract named entities from the given paragraph. Return JSON: {\"entities\": [...]}"
if name == "episodic_triples.txt":
return (
"Given the caption and extracted entities, extract RDF-style triples. "
"Each triple must contain at least one named entity. "
"Return JSON: {\"triples\": [{\"subject\": ..., \"predicate\": ..., \"object\": ...}, ...]}"
)
return ""
def build_ner_prompt(caption: str) -> list[dict[str, str]]:
system = _load_prompt("ner.txt")
return [
{"role": "system", "content": system},
{"role": "user", "content": caption},
]
def parse_ner_response(raw: str) -> list[str]:
data = json.loads(raw)
return data.get("entities", [])
def build_triple_prompt(caption: str, entities: list[str]) -> list[dict[str, str]]:
system = _load_prompt("episodic_triples.txt")
user_content = f"Caption: {caption}\nEntities: {json.dumps(entities)}"
return [
{"role": "system", "content": system},
{"role": "user", "content": user_content},
]
def parse_triples_response(raw: str) -> list[dict[str, str]]:
data = json.loads(raw)
triples = data.get("triples", [])
return [
t for t in triples
if all(k in t for k in ("subject", "predicate", "object"))
]
- [ ] Step 5: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_openie.py -v
Expected: All 6 tests PASS
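Note that parse_ner_response() and parse_triples_response() assume the model returns bare JSON. Chat models often wrap JSON in a fenced block; if that shows up in practice, a tolerant loader (an optional hardening step, not required by these tests) can strip the fence before parsing:
import json
import re

def loads_lenient(raw: str) -> dict:
    """json.loads that tolerates a surrounding ```json ... ``` fence."""
    text = raw.strip()
    match = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL)
    if match:
        text = match.group(1)
    return json.loads(text)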
- [ ] Step 6: Commit
git add main/server/worldmm/memory/ main/server/worldmm/tests/test_openie.py \
main/server/worldmm/tests/fixtures/
git commit -m "feat(worldmm): add NER + triple extraction prompt building and parsing"
Task 4: Captioner (Prompt Building + Response Parsing)
Files:
- Create: main/server/worldmm/ingest/__init__.py
- Create: main/server/worldmm/ingest/captioner.py
- Test: main/server/worldmm/tests/test_captioner.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_captioner.py
from __future__ import annotations
import base64
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_build_caption_prompt_includes_frames_and_transcript():
from worldmm.ingest.captioner import build_caption_prompt
# Create 8 fake base64 frames
frames_b64 = [base64.b64encode(f"frame{i}".encode()).decode() for i in range(8)]
transcript = "I'm standing at the dining table"
messages = build_caption_prompt(frames_b64, transcript)
assert isinstance(messages, list)
# System message + user message
assert len(messages) == 2
user_msg = messages[1]
assert user_msg["role"] == "user"
# Content should be a list (multimodal: image_url parts + text)
assert isinstance(user_msg["content"], list)
# 8 image parts + 1 text part
assert len(user_msg["content"]) == 9
# Check image parts are image_url type
image_parts = [p for p in user_msg["content"] if p.get("type") == "image_url"]
assert len(image_parts) == 8
# Check text part contains transcript
text_parts = [p for p in user_msg["content"] if p.get("type") == "text"]
assert len(text_parts) == 1
assert "dining table" in text_parts[0]["text"]
def test_build_caption_prompt_rejects_wrong_frame_count():
from worldmm.ingest.captioner import build_caption_prompt
import pytest
with pytest.raises(ValueError, match="exactly 8"):
build_caption_prompt(["frame1", "frame2"], "transcript")
def test_parse_caption_response():
from worldmm.ingest.captioner import parse_caption_response
result = parse_caption_response("The person stands at a dining table talking to Katrina.")
assert result == "The person stands at a dining table talking to Katrina."
def test_parse_caption_response_strips_whitespace():
from worldmm.ingest.captioner import parse_caption_response
result = parse_caption_response(" caption text \n")
assert result == "caption text"
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_captioner.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.ingest'
- [ ] Step 3: Write the implementation
# main/server/worldmm/ingest/__init__.py
# (empty)
# main/server/worldmm/ingest/captioner.py
from __future__ import annotations
from pathlib import Path
from typing import Any
_PROMPTS_DIR = Path(__file__).resolve().parents[1] / "llm" / "prompts"
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
if name == "caption.txt":
return (
"You are a video captioning assistant. Given 8 frames from a 30-second video clip "
"and an audio transcript, generate a detailed caption describing what is happening. "
"Include people, objects, actions, and locations."
)
return ""
def build_caption_prompt(
frames_b64: list[str],
transcript: str,
) -> list[dict[str, Any]]:
if len(frames_b64) != 8:
raise ValueError("Captioner requires exactly 8 frames")
system = _load_prompt("caption.txt")
content_parts: list[dict[str, Any]] = []
for frame in frames_b64:
content_parts.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{frame}"},
})
content_parts.append({
"type": "text",
"text": f"Audio transcript: {transcript}",
})
return [
{"role": "system", "content": system},
{"role": "user", "content": content_parts},
]
def parse_caption_response(raw: str) -> str:
return raw.strip()
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_captioner.py -v
Expected: All 4 tests PASS
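A usage sketch for one segment, outside the tests. The frame paths and token budget are illustrative assumptions, and llm is the direct-mode LLMClient from Task 2:
import base64
from pathlib import Path
from worldmm.ingest.captioner import build_caption_prompt, parse_caption_response
from worldmm.llm.client import build_request

frames_b64 = [
    base64.b64encode(Path(f"frames/frame_{i}.jpg").read_bytes()).decode()
    for i in range(8)  # build_caption_prompt requires exactly 8 frames
]
request = build_request(
    model="gpt-5-mini",
    messages=build_caption_prompt(frames_b64, "I'm standing at the dining table"),
    max_completion_tokens=512,
)
caption = parse_caption_response(llm.call(build_request_dict=request))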
- [ ] Step 5: Commit
git add main/server/worldmm/ingest/ main/server/worldmm/tests/test_captioner.py
git commit -m "feat(worldmm): add captioner prompt building with multimodal frames"
Task 5: Multiscale Merging
Files:
- Create: main/server/worldmm/memory/episodic/multiscale.py
- Test: main/server/worldmm/tests/test_multiscale.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_multiscale.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_build_merge_prompt_includes_all_captions():
from worldmm.memory.episodic.multiscale import build_merge_prompt
captions = [
"Caption 1: I enter the kitchen",
"Caption 2: I open the fridge",
"Caption 3: I take out milk",
"Caption 4: I pour milk into a glass",
"Caption 5: I drink the milk",
"Caption 6: I wash the glass",
]
messages = build_merge_prompt(captions, target_duration_seconds=180)
assert isinstance(messages, list)
content = str(messages)
for cap in captions:
assert cap in content
assert "3-minute" in content or "180" in content
def test_build_merge_prompt_for_hour_scale():
from worldmm.memory.episodic.multiscale import build_merge_prompt
summaries = [f"Summary {i}" for i in range(6)]
messages = build_merge_prompt(summaries, target_duration_seconds=3600)
content = str(messages)
assert "1-hour" in content or "3600" in content
def test_parse_merge_response():
from worldmm.memory.episodic.multiscale import parse_merge_response
result = parse_merge_response("The person went to the kitchen and made milk.")
assert result == "The person went to the kitchen and made milk."
def test_build_merge_prompt_rejects_empty():
from worldmm.memory.episodic.multiscale import build_merge_prompt
import pytest
with pytest.raises(ValueError, match="at least one"):
build_merge_prompt([], target_duration_seconds=180)
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_multiscale.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'worldmm.memory.episodic.multiscale'
- [ ] Step 3: Write the implementation
# main/server/worldmm/memory/episodic/multiscale.py
from __future__ import annotations
from pathlib import Path
_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"
_DURATION_LABELS = {
180: "3-minute",
600: "10-minute",
3600: "1-hour",
}
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
if name == "merge_captions.txt":
return (
"You are a summarization assistant. Given a sequence of captions from consecutive "
"video segments, merge them into a single coherent {duration_label} summary. "
"Preserve key entities, actions, and temporal order. Be concise but complete."
)
return ""
def build_merge_prompt(
captions: list[str],
target_duration_seconds: int,
) -> list[dict[str, str]]:
if not captions:
raise ValueError("Merge requires at least one caption")
label = _DURATION_LABELS.get(target_duration_seconds, f"{target_duration_seconds}-second")
template = _load_prompt("merge_captions.txt")
system = template.replace("{duration_label}", label)
numbered = "\n".join(f"{i + 1}. {c}" for i, c in enumerate(captions))
user_content = f"Merge these captions into a {label} summary:\n\n{numbered}"
return [
{"role": "system", "content": system},
{"role": "user", "content": user_content},
]
def parse_merge_response(raw: str) -> str:
return raw.strip()
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_multiscale.py -v
Expected: All 4 tests PASS
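As a usage sketch outside the tests, one roll-up step merges six consecutive 30-second captions into a 3-minute summary. The grouping factor and token budget are illustrative assumptions, and llm is again the client from Task 2:
from worldmm.llm.client import build_request
from worldmm.memory.episodic.multiscale import build_merge_prompt, parse_merge_response

captions_30s = [
    "I enter the kitchen", "I open the fridge", "I take out milk",
    "I pour milk into a glass", "I drink the milk", "I wash the glass",
]
request = build_request(
    model="gpt-5-mini",
    messages=build_merge_prompt(captions_30s, target_duration_seconds=180),
    max_completion_tokens=512,
)
summary_3m = parse_merge_response(llm.call(build_request_dict=request))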
- [ ] Step 5: Commit
git add main/server/worldmm/memory/episodic/multiscale.py \
main/server/worldmm/tests/test_multiscale.py
git commit -m "feat(worldmm): add multi-scale caption merging prompts"
Task 6: Episodic Graph (igraph + PPR)
Files:
- Create: main/server/worldmm/memory/episodic/graph.py
- Test: main/server/worldmm/tests/test_episodic_graph.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_episodic_graph.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def _make_triples():
"""Create a small knowledge graph for testing PPR."""
return [
{"subject_entity_id": "e1", "predicate": "stand at", "object_entity_id": "e2",
"object_literal": None, "segment_id": "s1", "id": "t1"},
{"subject_entity_id": "e1", "predicate": "talks to", "object_entity_id": "e3",
"object_literal": None, "segment_id": "s1", "id": "t2"},
{"subject_entity_id": "e3", "predicate": "asks about", "object_entity_id": None,
"object_literal": "tomorrow's schedule", "segment_id": "s1", "id": "t3"},
]
def _make_entity_names():
return {"e1": "I", "e2": "dining table", "e3": "Katrina"}
def _make_segment_captions():
return {"s1": "I stand at the dining table while Katrina asks about tomorrow's schedule"}
def test_build_graph_creates_nodes():
from worldmm.memory.episodic.graph import EpisodicGraph
g = EpisodicGraph()
g.build(_make_triples(), _make_entity_names())
# 3 entity nodes + 1 literal node
assert g.node_count() == 4
def test_build_graph_creates_edges():
from worldmm.memory.episodic.graph import EpisodicGraph
g = EpisodicGraph()
g.build(_make_triples(), _make_entity_names())
assert g.edge_count() == 3
def test_ppr_from_seed_entity():
from worldmm.memory.episodic.graph import EpisodicGraph
g = EpisodicGraph()
g.build(_make_triples(), _make_entity_names())
ranked = g.ppr(seed_entity_ids=["e1"], top_k=5)
# e1 is the seed, so it should have highest PPR score
assert ranked[0]["entity_id"] == "e1"
# All connected entities should appear
entity_ids = {r["entity_id"] for r in ranked}
assert "e3" in entity_ids # directly connected
def test_ppr_returns_segment_ids():
from worldmm.memory.episodic.graph import EpisodicGraph
g = EpisodicGraph()
g.build(_make_triples(), _make_entity_names())
ranked = g.ppr(seed_entity_ids=["e1"], top_k=5)
# Each result should include the segment_ids that reference it
for r in ranked:
assert "segment_ids" in r
def test_get_captions_for_segments():
from worldmm.memory.episodic.graph import EpisodicGraph
g = EpisodicGraph()
g.build(_make_triples(), _make_entity_names())
g.set_segment_captions(_make_segment_captions())
captions = g.get_captions(["s1"])
assert captions == ["I stand at the dining table while Katrina asks about tomorrow's schedule"]
def test_empty_graph_ppr_returns_empty():
from worldmm.memory.episodic.graph import EpisodicGraph
g = EpisodicGraph()
g.build([], {})
ranked = g.ppr(seed_entity_ids=["nonexistent"], top_k=5)
assert ranked == []
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_episodic_graph.py -v
Expected: FAIL — ModuleNotFoundError or ImportError
- [ ] Step 3: Write the implementation
# main/server/worldmm/memory/episodic/graph.py
from __future__ import annotations
from collections import defaultdict
from typing import Any
import igraph as ig
class EpisodicGraph:
def __init__(self) -> None:
self._graph: ig.Graph | None = None
self._node_id_to_idx: dict[str, int] = {}
self._idx_to_node_id: dict[int, str] = {}
self._node_segment_ids: dict[str, set[str]] = defaultdict(set)
self._segment_captions: dict[str, str] = {}
def build(
self,
triples: list[dict[str, Any]],
entity_names: dict[str, str],
) -> None:
nodes: set[str] = set()
edges: list[tuple[str, str]] = []
for t in triples:
subj = t["subject_entity_id"]
nodes.add(subj)
seg_id = t.get("segment_id")
if t.get("object_entity_id"):
obj = t["object_entity_id"]
nodes.add(obj)
edges.append((subj, obj))
if seg_id:
self._node_segment_ids[subj].add(seg_id)
self._node_segment_ids[obj].add(seg_id)
elif t.get("object_literal"):
literal_id = f"lit:{t['id']}"
nodes.add(literal_id)
edges.append((subj, literal_id))
if seg_id:
self._node_segment_ids[subj].add(seg_id)
self._node_segment_ids[literal_id].add(seg_id)
if not nodes:
self._graph = ig.Graph(directed=False)
return
node_list = sorted(nodes)
self._node_id_to_idx = {nid: i for i, nid in enumerate(node_list)}
self._idx_to_node_id = {i: nid for nid, i in self._node_id_to_idx.items()}
g = ig.Graph(n=len(node_list), directed=False)
g.vs["node_id"] = node_list
g.vs["name"] = [entity_names.get(nid, nid) for nid in node_list]
edge_indices = []
for src, dst in edges:
if src in self._node_id_to_idx and dst in self._node_id_to_idx:
edge_indices.append((self._node_id_to_idx[src], self._node_id_to_idx[dst]))
g.add_edges(edge_indices)
self._graph = g
def node_count(self) -> int:
return self._graph.vcount() if self._graph else 0
def edge_count(self) -> int:
return self._graph.ecount() if self._graph else 0
def ppr(
self,
seed_entity_ids: list[str],
top_k: int = 10,
damping: float = 0.85,
) -> list[dict[str, Any]]:
if not self._graph or self._graph.vcount() == 0:
return []
reset_vec = [0.0] * self._graph.vcount()
valid_seeds = [eid for eid in seed_entity_ids if eid in self._node_id_to_idx]
if not valid_seeds:
return []
weight = 1.0 / len(valid_seeds)
for eid in valid_seeds:
reset_vec[self._node_id_to_idx[eid]] = weight
scores = self._graph.personalized_pagerank(
reset=reset_vec,
damping=damping,
implementation="prpack",
)
ranked = []
for idx, score in enumerate(scores):
node_id = self._idx_to_node_id[idx]
ranked.append({
"entity_id": node_id,
"name": self._graph.vs[idx]["name"],
"score": score,
"segment_ids": sorted(self._node_segment_ids.get(node_id, set())),
})
ranked.sort(key=lambda x: x["score"], reverse=True)
return ranked[:top_k]
def set_segment_captions(self, captions: dict[str, str]) -> None:
self._segment_captions = captions
def get_captions(self, segment_ids: list[str]) -> list[str]:
return [self._segment_captions[sid] for sid in segment_ids if sid in self._segment_captions]
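For reference, personalized_pagerank solves the standard fixed point, with damping d and the reset distribution r spread uniformly over the seed set S (exactly the reset_vec built above):
\pi = d\,P^{\top}\pi + (1-d)\,r, \qquad r_i = \begin{cases} 1/|S| & \text{if } i \in S \\ 0 & \text{otherwise} \end{cases}
Here P is the random-walk transition matrix of the undirected graph, so probability mass concentrates on nodes close to the seeds.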
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_episodic_graph.py -v
Expected: All 6 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/memory/episodic/graph.py \
main/server/worldmm/tests/test_episodic_graph.py
git commit -m "feat(worldmm): add episodic graph with PPR via igraph"
Task 7: Semantic Graph (PPR with Edge Scoring)
Files:
- Create: main/server/worldmm/memory/semantic/__init__.py
- Create: main/server/worldmm/memory/semantic/graph.py
- Test: main/server/worldmm/tests/test_semantic_graph.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_semantic_graph.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def _make_semantic_triples():
return [
{"id": "t1", "subject_entity_id": "e1", "predicate": "often eats",
"object_entity_id": "e2", "object_literal": None},
{"id": "t2", "subject_entity_id": "e1", "predicate": "is friends with",
"object_entity_id": "e3", "object_literal": None},
{"id": "t3", "subject_entity_id": "e3", "predicate": "helps with",
"object_entity_id": None, "object_literal": "expense tracking"},
]
def _make_entity_names():
return {"e1": "I", "e2": "fruits", "e3": "Alice"}
def test_build_semantic_graph():
from worldmm.memory.semantic.graph import SemanticGraph
g = SemanticGraph()
g.build(_make_semantic_triples(), _make_entity_names())
assert g.node_count() == 4 # 3 entities + 1 literal
def test_ppr_edge_scoring():
from worldmm.memory.semantic.graph import SemanticGraph
g = SemanticGraph()
g.build(_make_semantic_triples(), _make_entity_names())
ranked_triples = g.ppr_edge_scored(seed_entity_ids=["e1"], top_k=10)
assert len(ranked_triples) > 0
# Each result is a triple with a score
for t in ranked_triples:
assert "triple_id" in t
assert "score" in t
assert "subject" in t
assert "predicate" in t
def test_ppr_edge_score_is_sum_of_node_scores():
from worldmm.memory.semantic.graph import SemanticGraph
g = SemanticGraph()
triples = [
{"id": "t1", "subject_entity_id": "e1", "predicate": "knows",
"object_entity_id": "e2", "object_literal": None},
]
g.build(triples, {"e1": "A", "e2": "B"})
ranked = g.ppr_edge_scored(seed_entity_ids=["e1"], top_k=10)
# Score should be PPR(subject) + PPR(object)
node_scores = g.ppr_node_scores(seed_entity_ids=["e1"])
expected = node_scores.get("e1", 0) + node_scores.get("e2", 0)
assert abs(ranked[0]["score"] - expected) < 1e-6
def test_empty_semantic_graph():
from worldmm.memory.semantic.graph import SemanticGraph
g = SemanticGraph()
g.build([], {})
ranked = g.ppr_edge_scored(seed_entity_ids=["e1"], top_k=10)
assert ranked == []
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_semantic_graph.py -v
Expected: FAIL
- [ ] Step 3: Write the implementation
# main/server/worldmm/memory/semantic/__init__.py
# (empty)
# main/server/worldmm/memory/semantic/graph.py
from __future__ import annotations
from typing import Any
import igraph as ig
class SemanticGraph:
def __init__(self) -> None:
self._graph: ig.Graph | None = None
self._node_id_to_idx: dict[str, int] = {}
self._idx_to_node_id: dict[int, str] = {}
self._triples: list[dict[str, Any]] = []
self._entity_names: dict[str, str] = {}
def build(
self,
triples: list[dict[str, Any]],
entity_names: dict[str, str],
) -> None:
self._triples = triples
self._entity_names = entity_names
nodes: set[str] = set()
edges: list[tuple[str, str]] = []
for t in triples:
subj = t["subject_entity_id"]
nodes.add(subj)
if t.get("object_entity_id"):
obj = t["object_entity_id"]
nodes.add(obj)
edges.append((subj, obj))
elif t.get("object_literal"):
literal_id = f"lit:{t['id']}"
nodes.add(literal_id)
edges.append((subj, literal_id))
if not nodes:
self._graph = ig.Graph(directed=False)
return
node_list = sorted(nodes)
self._node_id_to_idx = {nid: i for i, nid in enumerate(node_list)}
self._idx_to_node_id = {i: nid for nid, i in self._node_id_to_idx.items()}
g = ig.Graph(n=len(node_list), directed=False)
g.vs["node_id"] = node_list
g.vs["name"] = [entity_names.get(nid, nid) for nid in node_list]
edge_indices = []
for src, dst in edges:
if src in self._node_id_to_idx and dst in self._node_id_to_idx:
edge_indices.append((self._node_id_to_idx[src], self._node_id_to_idx[dst]))
g.add_edges(edge_indices)
self._graph = g
def node_count(self) -> int:
return self._graph.vcount() if self._graph else 0
def ppr_node_scores(
self,
seed_entity_ids: list[str],
damping: float = 0.85,
) -> dict[str, float]:
if not self._graph or self._graph.vcount() == 0:
return {}
reset_vec = [0.0] * self._graph.vcount()
valid_seeds = [eid for eid in seed_entity_ids if eid in self._node_id_to_idx]
if not valid_seeds:
return {}
weight = 1.0 / len(valid_seeds)
for eid in valid_seeds:
reset_vec[self._node_id_to_idx[eid]] = weight
scores = self._graph.personalized_pagerank(
reset=reset_vec,
damping=damping,
implementation="prpack",
)
return {self._idx_to_node_id[i]: s for i, s in enumerate(scores)}
def ppr_edge_scored(
self,
seed_entity_ids: list[str],
top_k: int = 10,
damping: float = 0.85,
) -> list[dict[str, Any]]:
node_scores = self.ppr_node_scores(seed_entity_ids, damping)
if not node_scores:
return []
results = []
for t in self._triples:
subj = t["subject_entity_id"]
if t.get("object_entity_id"):
obj = t["object_entity_id"]
elif t.get("object_literal"):
obj = f"lit:{t['id']}"
else:
continue
score = node_scores.get(subj, 0.0) + node_scores.get(obj, 0.0)
obj_display = (
self._entity_names.get(t["object_entity_id"], t.get("object_entity_id", ""))
if t.get("object_entity_id")
else t.get("object_literal", "")
)
results.append({
"triple_id": t["id"],
"score": score,
"subject": self._entity_names.get(subj, subj),
"predicate": t["predicate"],
"object": obj_display,
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:top_k]
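The edge score used above is simply the sum of the endpoint node scores from the same PPR run, which is what test_ppr_edge_score_is_sum_of_node_scores pins down:
\operatorname{score}(t) = \pi(\operatorname{subject}(t)) + \pi(\operatorname{object}(t))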
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_semantic_graph.py -v
Expected: All 4 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/memory/semantic/ \
main/server/worldmm/tests/test_semantic_graph.py
git commit -m "feat(worldmm): add semantic graph with PPR edge scoring"
Task 8: Semantic Extraction + Consolidation
Files:
- Create: main/server/worldmm/memory/semantic/extraction.py
- Create: main/server/worldmm/memory/semantic/consolidation.py
- Create: main/server/worldmm/tests/fixtures/sample_semantic_triples_response.json
- Test: main/server/worldmm/tests/test_consolidation.py
- [ ] Step 1: Create fixture
// main/server/worldmm/tests/fixtures/sample_semantic_triples_response.json
{
"triples": [
{"subject": "I", "predicate": "often eats", "object": "fruits and snacks"},
{"subject": "Alice", "predicate": "expresses romantic feelings toward", "object": "I"}
]
}
- [ ] Step 2: Write the failing test
# main/server/worldmm/tests/test_consolidation.py
from __future__ import annotations
import json
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
FIXTURES = Path(__file__).parent / "fixtures"
def test_build_semantic_triple_prompt():
from worldmm.memory.semantic.extraction import build_semantic_triple_prompt
messages = build_semantic_triple_prompt("Over the past 10 minutes, Blaine cooked dinner at home again.")
assert isinstance(messages, list)
content = str(messages)
assert "Blaine" in content
assert "long-term" in content.lower() or "pattern" in content.lower() or "habit" in content.lower()
def test_parse_semantic_triples():
from worldmm.memory.semantic.extraction import parse_semantic_triples
raw = (FIXTURES / "sample_semantic_triples_response.json").read_text()
triples = parse_semantic_triples(raw)
assert len(triples) == 2
assert triples[0]["predicate"] == "often eats"
def test_find_consolidation_candidates_above_threshold():
from worldmm.memory.semantic.consolidation import find_consolidation_candidates
# Simulate embeddings: existing triple embeddings + new triple embedding
existing = [
{"id": "t1", "embedding": [1.0, 0.0, 0.0]},
{"id": "t2", "embedding": [0.0, 1.0, 0.0]},
{"id": "t3", "embedding": [0.95, 0.05, 0.0]}, # very similar to new
]
new_embedding = [1.0, 0.0, 0.0]
candidates = find_consolidation_candidates(existing, new_embedding, threshold=0.6)
candidate_ids = [c["id"] for c in candidates]
assert "t1" in candidate_ids # cosine = 1.0
assert "t3" in candidate_ids # cosine ≈ 0.95
assert "t2" not in candidate_ids # cosine = 0.0
def test_find_consolidation_candidates_respects_threshold():
from worldmm.memory.semantic.consolidation import find_consolidation_candidates
existing = [
{"id": "t1", "embedding": [0.5, 0.5, 0.0]}, # cosine ≈ 0.707
]
new_embedding = [1.0, 0.0, 0.0]
candidates_high = find_consolidation_candidates(existing, new_embedding, threshold=0.8)
candidates_low = find_consolidation_candidates(existing, new_embedding, threshold=0.5)
assert len(candidates_high) == 0
assert len(candidates_low) == 1
def test_build_consolidation_prompt():
from worldmm.memory.semantic.consolidation import build_consolidation_prompt
existing_triples = [
{"subject": "I", "predicate": "sometimes eats", "object": "fruit"},
]
new_triple = {"subject": "I", "predicate": "often eats", "object": "fruits and snacks"}
messages = build_consolidation_prompt(existing_triples, new_triple)
assert isinstance(messages, list)
content = str(messages)
assert "sometimes eats" in content
assert "often eats" in content
def test_parse_consolidation_response_merge():
from worldmm.memory.semantic.consolidation import parse_consolidation_response
raw = json.dumps({
"action": "merge",
"remove_ids": ["t1"],
"new_triple": {"subject": "I", "predicate": "regularly eats", "object": "fruits and snacks"},
})
result = parse_consolidation_response(raw)
assert result["action"] == "merge"
assert result["remove_ids"] == ["t1"]
assert result["new_triple"]["predicate"] == "regularly eats"
def test_parse_consolidation_response_keep():
from worldmm.memory.semantic.consolidation import parse_consolidation_response
raw = json.dumps({"action": "keep"})
result = parse_consolidation_response(raw)
assert result["action"] == "keep"
- [ ] Step 3: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_consolidation.py -v
Expected: FAIL
- [ ] Step 4: Write the implementation
# main/server/worldmm/memory/semantic/extraction.py
from __future__ import annotations
import json
from pathlib import Path
_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
if name == "semantic_triples.txt":
return (
"Extract long-term patterns, habits, preferences, and social relationships from this summary. "
"Focus on what is generally true, not event-specific details. "
"Return JSON: {\"triples\": [{\"subject\": ..., \"predicate\": ..., \"object\": ...}, ...]}"
)
return ""
def build_semantic_triple_prompt(summary: str) -> list[dict[str, str]]:
system = _load_prompt("semantic_triples.txt")
return [
{"role": "system", "content": system},
{"role": "user", "content": summary},
]
def parse_semantic_triples(raw: str) -> list[dict[str, str]]:
data = json.loads(raw)
triples = data.get("triples", [])
return [
t for t in triples
if all(k in t for k in ("subject", "predicate", "object"))
]
# main/server/worldmm/memory/semantic/consolidation.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"
def _cosine_similarity(a: list[float], b: list[float]) -> float:
a_arr = np.array(a)
b_arr = np.array(b)
dot = np.dot(a_arr, b_arr)
norm = np.linalg.norm(a_arr) * np.linalg.norm(b_arr)
if norm == 0:
return 0.0
return float(dot / norm)
def find_consolidation_candidates(
existing: list[dict[str, Any]],
new_embedding: list[float],
threshold: float = 0.6,
) -> list[dict[str, Any]]:
candidates = []
for item in existing:
sim = _cosine_similarity(item["embedding"], new_embedding)
if sim >= threshold:
candidates.append({**item, "similarity": sim})
candidates.sort(key=lambda x: x["similarity"], reverse=True)
return candidates
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
if name == "consolidation.txt":
return (
"Given existing semantic triples and a new triple, decide how to consolidate. "
"If the new triple conflicts with or updates an existing one, merge them. "
"If the new triple is independent, keep it as-is. "
"Return JSON: {\"action\": \"merge\"|\"keep\", \"remove_ids\": [...], "
"\"new_triple\": {\"subject\": ..., \"predicate\": ..., \"object\": ...}}"
)
return ""
def build_consolidation_prompt(
existing_triples: list[dict[str, str]],
new_triple: dict[str, str],
) -> list[dict[str, str]]:
system = _load_prompt("consolidation.txt")
user_content = (
f"Existing triples:\n{json.dumps(existing_triples, indent=2)}\n\n"
f"New triple:\n{json.dumps(new_triple, indent=2)}"
)
return [
{"role": "system", "content": system},
{"role": "user", "content": user_content},
]
def parse_consolidation_response(raw: str) -> dict[str, Any]:
data = json.loads(raw)
result: dict[str, Any] = {"action": data.get("action", "keep")}
if result["action"] == "merge":
result["remove_ids"] = data.get("remove_ids", [])
result["new_triple"] = data.get("new_triple", {})
return result
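apply_consolidation() is named in the file structure but not implemented in this task. Below is a hedged sketch of how it could tie these helpers to the ORM functions from Task 1; call_llm, embed_text, and entity_id_for are hypothetical callables (LLM call, embedding lookup, entity resolution) that this plan does not define:
from shared.orm.worldmm_orm import create_triple, invalidate_triple
from worldmm.memory.semantic.consolidation import (
    build_consolidation_prompt, find_consolidation_candidates, parse_consolidation_response,
)

def apply_consolidation(call_llm, existing, new_triple, *, segment_id, user_id,
                        embed_text, entity_id_for):
    # existing: [{"id", "embedding", "subject", "predicate", "object"}, ...]
    text = f'{new_triple["subject"]} {new_triple["predicate"]} {new_triple["object"]}'
    candidates = find_consolidation_candidates(existing, embed_text(text), threshold=0.6)
    decision = {"action": "keep"}
    if candidates:
        spo = [{k: c[k] for k in ("subject", "predicate", "object")} for c in candidates]
        decision = parse_consolidation_response(call_llm(
            build_consolidation_prompt(spo, new_triple)))
    if decision["action"] == "merge":
        for triple_id in decision["remove_ids"]:
            invalidate_triple(triple_id)
        new_triple = decision["new_triple"]
    create_triple(
        segment_id=segment_id, user_id=user_id, memory_type="semantic",
        subject_entity_id=entity_id_for(new_triple["subject"]),
        predicate=new_triple["predicate"],
        object_literal=new_triple["object"],
    )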
- [ ] Step 5: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_consolidation.py -v
Expected: All 7 tests PASS
- [ ] Step 6: Commit
git add main/server/worldmm/memory/semantic/extraction.py \
main/server/worldmm/memory/semantic/consolidation.py \
main/server/worldmm/tests/test_consolidation.py \
main/server/worldmm/tests/fixtures/sample_semantic_triples_response.json
git commit -m "feat(worldmm): add semantic triple extraction and consolidation"
Task 9: Visual Encoder Client
Files:
- Create: main/server/worldmm/memory/visual/__init__.py
- Create: main/server/worldmm/memory/visual/encoder.py
- Test: main/server/worldmm/tests/test_visual_encoder.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_visual_encoder.py
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
_server_root = Path(__file__).resolve().parents[2]
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
EMBEDDING_DIM = 3584
def test_encode_video_sends_correct_payload():
from worldmm.memory.visual.encoder import VLM2VecClient
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"embedding": [0.1] * EMBEDDING_DIM}
with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
mock_requests.post.return_value = mock_response
client = VLM2VecClient(base_url="http://gpu-worker:8000")
embedding = client.encode_video(frames_b64=["frame1", "frame2"])
mock_requests.post.assert_called_once()
call_args = mock_requests.post.call_args
assert call_args[0][0] == "http://gpu-worker:8000/encode-video"
payload = call_args[1]["json"]
assert payload["frames"] == ["frame1", "frame2"]
def test_encode_video_returns_embedding():
from worldmm.memory.visual.encoder import VLM2VecClient
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"embedding": [0.5] * EMBEDDING_DIM}
with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
mock_requests.post.return_value = mock_response
client = VLM2VecClient(base_url="http://gpu-worker:8000")
embedding = client.encode_video(frames_b64=["f1"])
assert len(embedding) == EMBEDDING_DIM
assert embedding[0] == 0.5
def test_encode_text_sends_correct_payload():
from worldmm.memory.visual.encoder import VLM2VecClient
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"embedding": [0.2] * EMBEDDING_DIM}
with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
mock_requests.post.return_value = mock_response
client = VLM2VecClient(base_url="http://gpu-worker:8000")
embedding = client.encode_text("what did I eat for lunch")
call_args = mock_requests.post.call_args
assert call_args[0][0] == "http://gpu-worker:8000/encode-text"
payload = call_args[1]["json"]
assert payload["text"] == "what did I eat for lunch"
def test_encode_video_raises_on_error():
from worldmm.memory.visual.encoder import VLM2VecClient
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.raise_for_status.side_effect = Exception("GPU worker error")
with patch("worldmm.memory.visual.encoder.requests") as mock_requests:
mock_requests.post.return_value = mock_response
client = VLM2VecClient(base_url="http://gpu-worker:8000")
import pytest
with pytest.raises(Exception, match="GPU worker error"):
client.encode_video(frames_b64=["f1"])
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_visual_encoder.py -v
Expected: FAIL
- [ ] Step 3: Write the implementation
# main/server/worldmm/memory/visual/__init__.py
# (empty)
# main/server/worldmm/memory/visual/encoder.py
from __future__ import annotations
import requests
class VLM2VecClient:
def __init__(self, base_url: str) -> None:
self._base_url = base_url.rstrip("/")
def encode_video(self, frames_b64: list[str]) -> list[float]:
response = requests.post(
f"{self._base_url}/encode-video",
json={"frames": frames_b64},
timeout=60,
)
response.raise_for_status()
return response.json()["embedding"]
def encode_text(self, text: str) -> list[float]:
response = requests.post(
f"{self._base_url}/encode-text",
json={"text": text},
timeout=30,
)
response.raise_for_status()
return response.json()["embedding"]
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_visual_encoder.py -v
Expected: All 4 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/memory/visual/ \
main/server/worldmm/tests/test_visual_encoder.py
git commit -m "feat(worldmm): add VLM2Vec GPU worker client"
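For reference, a minimal sketch of the GPU-worker surface this client assumes — FastAPI per the tech stack, with route names and response shape matching the client above. The handler internals are assumptions, and the embed stubs stand in for the real VLM2Vec forward pass:
# Sketch of the GPU worker's FastAPI endpoints assumed by VLM2VecClient.
# The _embed_* stubs are placeholders for the real VLM2Vec model calls.
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class VideoRequest(BaseModel):
    frames: list[str]  # base64-encoded frames
class TextRequest(BaseModel):
    text: str
def _embed_frames(frames: list[str]) -> list[float]:
    return [0.0] * 3584  # stub; the real worker runs VLM2Vec here
def _embed_text(text_query: str) -> list[float]:
    return [0.0] * 3584  # stub; the real worker runs VLM2Vec here
@app.post("/encode-video")
def encode_video(req: VideoRequest) -> dict[str, list[float]]:
    return {"embedding": _embed_frames(req.frames)}
@app.post("/encode-text")
def encode_text(req: TextRequest) -> dict[str, list[float]]:
    return {"embedding": _embed_text(req.text)}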
Task 10: Episodic Retriever (PPR + Cross-Scale Rerank)
Files:
- Create: main/server/worldmm/retrieval/__init__.py
- Create: main/server/worldmm/retrieval/episodic_retriever.py
- Test: main/server/worldmm/tests/test_episodic_retriever.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_episodic_retriever.py
from __future__ import annotations
import json
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def _make_mock_graphs():
"""Create mock EpisodicGraph instances for each temporal scale."""
from worldmm.memory.episodic.graph import EpisodicGraph
graphs = {}
    for duration, seg_prefix, _top_k in [(30, "s30", 10), (180, "s3m", 5), (600, "s10m", 5), (3600, "s1h", 3)]:
g = EpisodicGraph()
triples = [
{"subject_entity_id": "e1", "predicate": "eats", "object_entity_id": "e2",
"object_literal": None, "segment_id": f"{seg_prefix}_1", "id": f"t_{seg_prefix}"},
]
g.build(triples, {"e1": "I", "e2": "lunch"})
g.set_segment_captions({f"{seg_prefix}_1": f"Caption at {duration}s scale"})
graphs[duration] = g
return graphs
def test_retrieve_episodic_collects_from_all_scales():
from worldmm.retrieval.episodic_retriever import retrieve_episodic
graphs = _make_mock_graphs()
# Mock the reranker to pass through all candidates
mock_reranker = lambda candidates, query, top_k: candidates[:top_k]
results = retrieve_episodic(
graphs=graphs,
seed_entity_ids=["e1"],
query="what did I eat",
reranker=mock_reranker,
)
assert len(results) > 0
# Should have captions from multiple scales
captions = [r["caption"] for r in results]
assert any("30s" in c for c in captions)
def test_retrieve_episodic_caps_results_at_rerank_top_k():
    from worldmm.retrieval.episodic_retriever import retrieve_episodic, RERANK_TOP_K
    graphs = _make_mock_graphs()
    mock_reranker = lambda candidates, query, top_k: candidates[:top_k]
    results = retrieve_episodic(
        graphs=graphs,
        seed_entity_ids=["e1"],
        query="test",
        reranker=mock_reranker,
    )
    # The reranker receives RERANK_TOP_K as its budget, so the final output must not exceed it
    assert len(results) <= RERANK_TOP_K
def test_build_rerank_prompt():
from worldmm.retrieval.episodic_retriever import build_rerank_prompt
candidates = [
{"caption": "I ate lunch", "scale": 30, "score": 0.8},
{"caption": "I prepared food", "scale": 180, "score": 0.6},
]
messages = build_rerank_prompt(candidates, query="what did I eat")
content = str(messages)
assert "I ate lunch" in content
assert "what did I eat" in content
def test_parse_rerank_response():
from worldmm.retrieval.episodic_retriever import parse_rerank_response
candidates = [
{"caption": "A", "scale": 30, "score": 0.8, "segment_id": "s1"},
{"caption": "B", "scale": 180, "score": 0.6, "segment_id": "s2"},
{"caption": "C", "scale": 600, "score": 0.4, "segment_id": "s3"},
]
raw = json.dumps({"selected_indices": [0, 2]})
result = parse_rerank_response(raw, candidates, top_k=3)
assert len(result) == 2
assert result[0]["caption"] == "A"
assert result[1]["caption"] == "C"
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_episodic_retriever.py -v
Expected: FAIL
- [ ] Step 3: Write the implementation
# main/server/worldmm/retrieval/__init__.py
# (empty)
# main/server/worldmm/retrieval/episodic_retriever.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Callable
from worldmm.memory.episodic.graph import EpisodicGraph
SCALE_TOP_K = {30: 10, 180: 5, 600: 5, 3600: 3}
RERANK_TOP_K = 3
_PROMPTS_DIR = Path(__file__).resolve().parents[1] / "llm" / "prompts"
def retrieve_episodic(
graphs: dict[int, EpisodicGraph],
seed_entity_ids: list[str],
query: str,
reranker: Callable[[list[dict[str, Any]], str, int], list[dict[str, Any]]],
) -> list[dict[str, Any]]:
all_candidates: list[dict[str, Any]] = []
for scale, top_k in SCALE_TOP_K.items():
graph = graphs.get(scale)
if graph is None:
continue
ranked = graph.ppr(seed_entity_ids=seed_entity_ids, top_k=top_k)
# Collect unique segment_ids from ranked nodes
segment_ids: set[str] = set()
for node in ranked:
segment_ids.update(node.get("segment_ids", []))
        ordered_ids = sorted(segment_ids)
        captions = graph.get_captions(ordered_ids)
        for seg_id, caption in zip(ordered_ids, captions):
            all_candidates.append({
                "caption": caption,
                "scale": scale,
                # Use the top PPR node's score as a scale-level relevance score
                "score": ranked[0]["score"] if ranked else 0.0,
                "segment_id": seg_id,
            })
if not all_candidates:
return []
return reranker(all_candidates, query, RERANK_TOP_K)
def build_rerank_prompt(
candidates: list[dict[str, Any]],
query: str,
) -> list[dict[str, str]]:
prompt_path = _PROMPTS_DIR / "cross_scale_rerank.txt"
if prompt_path.exists():
system = prompt_path.read_text().strip()
else:
system = (
"Given a query and candidate captions from different temporal scales, "
"select the most relevant captions. Return JSON: {\"selected_indices\": [0, 2, ...]}"
)
numbered = "\n".join(
f"{i}. [scale={c['scale']}s] {c['caption']}"
for i, c in enumerate(candidates)
)
user_content = f"Query: {query}\n\nCandidates:\n{numbered}"
return [
{"role": "system", "content": system},
{"role": "user", "content": user_content},
]
def parse_rerank_response(
raw: str,
candidates: list[dict[str, Any]],
top_k: int = RERANK_TOP_K,
) -> list[dict[str, Any]]:
data = json.loads(raw)
indices = data.get("selected_indices", [])
    selected = [candidates[i] for i in indices if 0 <= i < len(candidates)]
return selected[:top_k]
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_episodic_retriever.py -v
Expected: All 4 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/retrieval/ \
main/server/worldmm/tests/test_episodic_retriever.py
git commit -m "feat(worldmm): add episodic retriever with cross-scale PPR and reranking"
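retrieve_episodic takes the reranker as an injected callable; build_rerank_prompt and parse_rerank_response are the pieces an LLM-backed reranker composes. A sketch of that wiring through the unified LLM client (model name and token budget here are illustrative, not prescribed by the plan):
# Sketch: an LLM-backed reranker for retrieve_episodic.
from worldmm.llm.client import LLMClient, build_request
from worldmm.retrieval.episodic_retriever import build_rerank_prompt, parse_rerank_response
def make_llm_reranker(llm: LLMClient):
    def rerank(candidates, query, top_k):
        request = build_request(
            model="gpt-5-mini",  # illustrative model choice
            messages=build_rerank_prompt(candidates, query),
            max_completion_tokens=256,
        )
        raw = llm.call(build_request_dict=request)
        return parse_rerank_response(raw, candidates, top_k)
    return rerank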
Task 11: Semantic + Visual Retrievers
Files:
- Create: main/server/worldmm/retrieval/semantic_retriever.py
- Create: main/server/worldmm/retrieval/visual_retriever.py
- Test: main/server/worldmm/tests/test_semantic_retriever.py
- Test: main/server/worldmm/tests/test_visual_retriever.py
- [ ] Step 1: Write the failing tests
# main/server/worldmm/tests/test_semantic_retriever.py
from __future__ import annotations
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_retrieve_semantic_returns_triples():
from worldmm.retrieval.semantic_retriever import retrieve_semantic
from worldmm.memory.semantic.graph import SemanticGraph
g = SemanticGraph()
triples = [
{"id": "t1", "subject_entity_id": "e1", "predicate": "often eats",
"object_entity_id": "e2", "object_literal": None},
]
g.build(triples, {"e1": "I", "e2": "fruits"})
results = retrieve_semantic(graph=g, seed_entity_ids=["e1"], top_k=10)
assert len(results) == 1
assert results[0]["predicate"] == "often eats"
def test_retrieve_semantic_empty_graph():
from worldmm.retrieval.semantic_retriever import retrieve_semantic
from worldmm.memory.semantic.graph import SemanticGraph
g = SemanticGraph()
g.build([], {})
results = retrieve_semantic(graph=g, seed_entity_ids=["e1"], top_k=10)
assert results == []
# main/server/worldmm/tests/test_visual_retriever.py
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_retrieve_visual_calls_encoder_and_returns_results():
from worldmm.retrieval.visual_retriever import retrieve_visual
mock_encoder = MagicMock()
mock_encoder.encode_text.return_value = [0.1] * 3584
mock_db_query = MagicMock(return_value=[
{"segment_id": "s1", "similarity": 0.95, "s3_frames_key": "frames/s1"},
{"segment_id": "s2", "similarity": 0.80, "s3_frames_key": "frames/s2"},
])
results = retrieve_visual(
query="what food did I eat",
encoder=mock_encoder,
db_query_fn=mock_db_query,
top_k=5,
)
mock_encoder.encode_text.assert_called_once_with("what food did I eat")
assert len(results) == 2
assert results[0]["segment_id"] == "s1"
def test_retrieve_visual_returns_empty_on_encoder_failure():
from worldmm.retrieval.visual_retriever import retrieve_visual
mock_encoder = MagicMock()
mock_encoder.encode_text.side_effect = Exception("GPU unavailable")
results = retrieve_visual(
query="test",
encoder=mock_encoder,
db_query_fn=MagicMock(),
top_k=5,
)
assert results == []
- [ ] Step 2: Run tests to verify they fail
Run: cd main/server && python -m pytest worldmm/tests/test_semantic_retriever.py worldmm/tests/test_visual_retriever.py -v
Expected: FAIL
- [ ] Step 3: Write the implementations
# main/server/worldmm/retrieval/semantic_retriever.py
from __future__ import annotations
from typing import Any
from worldmm.memory.semantic.graph import SemanticGraph
def retrieve_semantic(
graph: SemanticGraph,
seed_entity_ids: list[str],
top_k: int = 10,
) -> list[dict[str, Any]]:
return graph.ppr_edge_scored(seed_entity_ids=seed_entity_ids, top_k=top_k)
# main/server/worldmm/retrieval/visual_retriever.py
from __future__ import annotations
from typing import Any, Callable
from worldmm.memory.visual.encoder import VLM2VecClient
def retrieve_visual(
query: str,
encoder: VLM2VecClient,
db_query_fn: Callable[[list[float], int], list[dict[str, Any]]],
top_k: int = 5,
) -> list[dict[str, Any]]:
try:
query_embedding = encoder.encode_text(query)
except Exception:
return []
return db_query_fn(query_embedding, top_k)
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_semantic_retriever.py worldmm/tests/test_visual_retriever.py -v
Expected: All 4 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/retrieval/semantic_retriever.py \
main/server/worldmm/retrieval/visual_retriever.py \
main/server/worldmm/tests/test_semantic_retriever.py \
main/server/worldmm/tests/test_visual_retriever.py
git commit -m "feat(worldmm): add semantic and visual retrievers"
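retrieve_visual leaves db_query_fn abstract; in production it would bind to Task 16's query_visual_embeddings for a given user and session. A sketch of that binding (session lifecycle handling is an assumption):
# Sketch: binding retrieve_visual's db_query_fn to the pgvector query from Task 16.
from worldmm.memory.visual.encoder import VLM2VecClient
from worldmm.memory.visual.index import query_visual_embeddings
from worldmm.retrieval.visual_retriever import retrieve_visual
def visual_search(query: str, user_id: str, session, encoder: VLM2VecClient):
    return retrieve_visual(
        query=query,
        encoder=encoder,
        db_query_fn=lambda emb, k: query_visual_embeddings(
            query_embedding=emb, user_id=user_id, top_k=k, session=session
        ),
        top_k=5,
    )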
Task 12: Reasoning Agent (SEARCH/ANSWER Loop)
Files:
- Create: main/server/worldmm/retrieval/agent.py
- Test: main/server/worldmm/tests/test_retrieval_agent.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_retrieval_agent.py
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_agent_dispatches_search_to_correct_retriever():
from worldmm.retrieval.agent import ReasoningAgent
mock_llm = MagicMock()
# Round 1: agent says SEARCH episodic
# Round 2: agent says ANSWER
mock_llm.call.side_effect = [
json.dumps({"action": "SEARCH", "memory_type": "episodic", "query": "lunch"}),
json.dumps({"action": "ANSWER"}),
]
mock_episodic = MagicMock(return_value=[{"caption": "I ate lunch"}])
mock_semantic = MagicMock(return_value=[])
mock_visual = MagicMock(return_value=[])
mock_response_llm = MagicMock()
mock_response_llm.call.return_value = "You ate lunch at noon."
agent = ReasoningAgent(
reasoning_llm=mock_llm,
response_llm=mock_response_llm,
retrievers={
"episodic": mock_episodic,
"semantic": mock_semantic,
"visual": mock_visual,
},
)
answer = agent.answer("What did I eat for lunch?")
mock_episodic.assert_called_once_with("lunch")
mock_semantic.assert_not_called()
assert answer == "You ate lunch at noon."
def test_agent_terminates_on_answer_action():
from worldmm.retrieval.agent import ReasoningAgent
mock_llm = MagicMock()
mock_llm.call.return_value = json.dumps({"action": "ANSWER"})
mock_response_llm = MagicMock()
mock_response_llm.call.return_value = "I don't have enough context."
agent = ReasoningAgent(
reasoning_llm=mock_llm,
response_llm=mock_response_llm,
retrievers={"episodic": MagicMock(), "semantic": MagicMock(), "visual": MagicMock()},
)
answer = agent.answer("test question")
# LLM should only be called once (immediate ANSWER)
assert mock_llm.call.call_count == 1
assert answer == "I don't have enough context."
def test_agent_forces_answer_after_max_rounds():
from worldmm.retrieval.agent import ReasoningAgent, MAX_ROUNDS
mock_llm = MagicMock()
# Always returns SEARCH — should be forced to stop after MAX_ROUNDS
mock_llm.call.return_value = json.dumps({
"action": "SEARCH", "memory_type": "episodic", "query": "test"
})
mock_response_llm = MagicMock()
mock_response_llm.call.return_value = "Forced answer."
mock_retriever = MagicMock(return_value=[{"caption": "result"}])
agent = ReasoningAgent(
reasoning_llm=mock_llm,
response_llm=mock_response_llm,
retrievers={"episodic": mock_retriever, "semantic": MagicMock(), "visual": MagicMock()},
)
answer = agent.answer("test")
assert mock_llm.call.call_count == MAX_ROUNDS
assert answer == "Forced answer."
def test_agent_accumulates_context_across_rounds():
from worldmm.retrieval.agent import ReasoningAgent
mock_llm = MagicMock()
mock_llm.call.side_effect = [
json.dumps({"action": "SEARCH", "memory_type": "episodic", "query": "lunch"}),
json.dumps({"action": "SEARCH", "memory_type": "semantic", "query": "eating habits"}),
json.dumps({"action": "ANSWER"}),
]
mock_episodic = MagicMock(return_value=[{"caption": "I ate sushi"}])
mock_semantic = MagicMock(return_value=[{"subject": "I", "predicate": "often eats", "object": "sushi"}])
mock_response_llm = MagicMock()
mock_response_llm.call.return_value = "You frequently eat sushi."
agent = ReasoningAgent(
reasoning_llm=mock_llm,
response_llm=mock_response_llm,
retrievers={
"episodic": mock_episodic,
"semantic": mock_semantic,
"visual": MagicMock(),
},
)
answer = agent.answer("What do I usually eat?")
# Response LLM should receive accumulated context from both rounds
response_call = mock_response_llm.call.call_args
request = response_call[1]["build_request_dict"]
context_str = str(request["messages"])
assert "sushi" in context_str
def test_parse_agent_action_search():
from worldmm.retrieval.agent import parse_agent_action
raw = json.dumps({"action": "SEARCH", "memory_type": "semantic", "query": "test"})
action = parse_agent_action(raw)
assert action["action"] == "SEARCH"
assert action["memory_type"] == "semantic"
assert action["query"] == "test"
def test_parse_agent_action_answer():
from worldmm.retrieval.agent import parse_agent_action
raw = json.dumps({"action": "ANSWER"})
action = parse_agent_action(raw)
assert action["action"] == "ANSWER"
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_retrieval_agent.py -v
Expected: FAIL
- [ ] Step 3: Write the implementation
# main/server/worldmm/retrieval/agent.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Callable
from worldmm.llm.client import LLMClient, build_request
MAX_ROUNDS = 5
_PROMPTS_DIR = Path(__file__).resolve().parents[1] / "llm" / "prompts"
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
if name == "reasoning_agent.txt":
return (
"You are a memory retrieval agent. Given a user's question and any previously "
"retrieved context, decide whether to search a memory type or answer.\n\n"
"Available memory types: episodic (events), semantic (relationships/habits), visual (images/scenes).\n\n"
"Respond with JSON:\n"
'- To search: {"action": "SEARCH", "memory_type": "episodic"|"semantic"|"visual", "query": "..."}\n'
'- To answer: {"action": "ANSWER"}'
)
if name == "response.txt":
return (
"You are a helpful assistant. Given a user's question and retrieved memory context, "
"provide a clear and accurate answer based on the available information."
)
return ""
def parse_agent_action(raw: str) -> dict[str, Any]:
return json.loads(raw)
class ReasoningAgent:
def __init__(
self,
reasoning_llm: LLMClient,
response_llm: LLMClient,
retrievers: dict[str, Callable[[str], list[dict[str, Any]]]],
) -> None:
self._reasoning_llm = reasoning_llm
self._response_llm = response_llm
self._retrievers = retrievers
def answer(self, question: str) -> str:
rounds: list[dict[str, Any]] = []
accumulated_context: list[dict[str, Any]] = []
for _ in range(MAX_ROUNDS):
messages = self._build_reasoning_messages(question, rounds)
request = build_request(
model="gpt-5",
messages=messages,
max_completion_tokens=512,
)
raw = self._reasoning_llm.call(build_request_dict=request)
action = parse_agent_action(raw)
if action["action"] == "ANSWER":
break
if action["action"] == "SEARCH":
memory_type = action["memory_type"]
query = action["query"]
retriever = self._retrievers.get(memory_type)
results = retriever(query) if retriever else []
rounds.append({
"action": "SEARCH",
"memory_type": memory_type,
"query": query,
"results": results,
})
accumulated_context.extend(results)
return self._generate_response(question, accumulated_context)
def _build_reasoning_messages(
self,
question: str,
rounds: list[dict[str, Any]],
) -> list[dict[str, str]]:
system = _load_prompt("reasoning_agent.txt")
context_parts = [f"User question: {question}"]
for i, r in enumerate(rounds):
context_parts.append(
f"\nRound {i + 1}: SEARCH({r['memory_type']}, \"{r['query']}\")\n"
f"Results: {json.dumps(r['results'], default=str)}"
)
return [
{"role": "system", "content": system},
{"role": "user", "content": "\n".join(context_parts)},
]
def _generate_response(
self,
question: str,
context: list[dict[str, Any]],
) -> str:
system = _load_prompt("response.txt")
user_content = (
f"Question: {question}\n\n"
f"Retrieved context:\n{json.dumps(context, default=str)}"
)
request = build_request(
model="gpt-5",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_content},
],
max_completion_tokens=1024,
)
return self._response_llm.call(build_request_dict=request)
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_retrieval_agent.py -v
Expected: All 6 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/retrieval/agent.py \
main/server/worldmm/tests/test_retrieval_agent.py
git commit -m "feat(worldmm): add adaptive reasoning agent with SEARCH/ANSWER loop"
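The agent only sees retrievers as query-in, results-out callables; one sketch of assembling that table from Tasks 10, 11, and 16. Here seed_entities_for_query is a hypothetical helper — seed selection (e.g. NER on the question plus entity resolution) is wired up elsewhere:
# Sketch: assembling the retriever table for ReasoningAgent.
# seed_entities_for_query() is hypothetical; see the note above.
from worldmm.retrieval.agent import ReasoningAgent
from worldmm.retrieval.episodic_retriever import retrieve_episodic
from worldmm.retrieval.semantic_retriever import retrieve_semantic
from worldmm.retrieval.visual_retriever import retrieve_visual
def seed_entities_for_query(query: str) -> list[str]:
    return []  # hypothetical: NER on the query + entity resolution
def build_agent(reasoning_llm, response_llm, graphs, semantic_graph, encoder, db_query_fn, reranker):
    retrievers = {
        "episodic": lambda q: retrieve_episodic(
            graphs=graphs, seed_entity_ids=seed_entities_for_query(q), query=q, reranker=reranker
        ),
        "semantic": lambda q: retrieve_semantic(
            graph=semantic_graph, seed_entity_ids=seed_entities_for_query(q)
        ),
        "visual": lambda q: retrieve_visual(q, encoder=encoder, db_query_fn=db_query_fn),
    }
    return ReasoningAgent(reasoning_llm=reasoning_llm, response_llm=response_llm, retrievers=retrievers)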
Task 13: Batch Client (Batch API Request Builder + Poller)
Files:
- Create: main/server/worldmm/llm/batch.py
- Test: main/server/worldmm/tests/test_batch_client.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_batch_client.py
from __future__ import annotations
import json
import sys
from pathlib import Path
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_build_batch_requests_creates_jsonl():
from worldmm.llm.batch import build_batch_jsonl
requests = [
{"model": "gpt-5-mini", "messages": [{"role": "user", "content": "hello"}],
"max_completion_tokens": 256, "reasoning_effort": "low"},
{"model": "gpt-5-mini", "messages": [{"role": "user", "content": "world"}],
"max_completion_tokens": 256, "reasoning_effort": "low"},
]
lines = build_batch_jsonl(requests, custom_ids=["req-1", "req-2"])
assert len(lines) == 2
parsed_0 = json.loads(lines[0])
assert parsed_0["custom_id"] == "req-1"
assert parsed_0["method"] == "POST"
assert parsed_0["url"] == "/v1/chat/completions"
assert parsed_0["body"]["model"] == "gpt-5-mini"
assert parsed_0["body"]["max_completion_tokens"] == 256
def test_build_batch_jsonl_auto_generates_ids():
from worldmm.llm.batch import build_batch_jsonl
requests = [
{"model": "gpt-5-mini", "messages": [{"role": "user", "content": "hi"}],
"max_completion_tokens": 64},
]
lines = build_batch_jsonl(requests)
parsed = json.loads(lines[0])
assert parsed["custom_id"].startswith("req-")
def test_parse_batch_results_maps_to_custom_ids():
from worldmm.llm.batch import parse_batch_results
result_lines = [
json.dumps({
"custom_id": "req-1",
"response": {
"status_code": 200,
"body": {
"choices": [{"message": {"content": "response 1"}}]
}
}
}),
json.dumps({
"custom_id": "req-2",
"response": {
"status_code": 200,
"body": {
"choices": [{"message": {"content": "response 2"}}]
}
}
}),
]
results = parse_batch_results(result_lines)
assert results["req-1"] == "response 1"
assert results["req-2"] == "response 2"
def test_parse_batch_results_handles_errors():
from worldmm.llm.batch import parse_batch_results
result_lines = [
json.dumps({
"custom_id": "req-1",
"response": {"status_code": 500, "body": {"error": {"message": "server error"}}}
}),
]
results = parse_batch_results(result_lines)
assert results["req-1"] is None
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_batch_client.py -v
Expected: FAIL
- [ ] Step 3: Write the implementation
# main/server/worldmm/llm/batch.py
from __future__ import annotations
import json
import uuid
from typing import Any
def build_batch_jsonl(
requests: list[dict[str, Any]],
custom_ids: list[str] | None = None,
) -> list[str]:
if custom_ids is None:
custom_ids = [f"req-{uuid.uuid4().hex[:8]}" for _ in requests]
lines = []
    for req, cid in zip(requests, custom_ids, strict=True):  # lengths must match
line = {
"custom_id": cid,
"method": "POST",
"url": "/v1/chat/completions",
"body": req,
}
lines.append(json.dumps(line))
return lines
def parse_batch_results(result_lines: list[str]) -> dict[str, str | None]:
results: dict[str, str | None] = {}
for line in result_lines:
data = json.loads(line)
custom_id = data["custom_id"]
response = data.get("response", {})
if response.get("status_code") == 200:
body = response.get("body", {})
choices = body.get("choices", [])
if choices:
results[custom_id] = choices[0].get("message", {}).get("content")
else:
results[custom_id] = None
else:
results[custom_id] = None
return results
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_batch_client.py -v
Expected: All 4 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/llm/batch.py \
main/server/worldmm/tests/test_batch_client.py
git commit -m "feat(worldmm): add batch API JSONL builder and result parser"
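build_batch_jsonl and parse_batch_results cover the two ends of the pipeline; the submit/poll/collect_results wrapper named in the file plan sits between them. A sketch of that lifecycle against the OpenAI SDK (temp path, poll interval, and error handling are illustrative):
# Sketch of BatchClient's submit -> poll -> collect lifecycle using the OpenAI SDK.
import time
from pathlib import Path
from openai import OpenAI
from worldmm.llm.batch import build_batch_jsonl, parse_batch_results
def run_batch(requests: list[dict], jsonl_path: Path = Path("/tmp/worldmm_batch.jsonl")) -> dict[str, str | None]:
    client = OpenAI()
    jsonl_path.write_text("\n".join(build_batch_jsonl(requests)))
    input_file = client.files.create(file=jsonl_path.open("rb"), purpose="batch")
    batch = client.batches.create(
        input_file_id=input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    while batch.status not in ("completed", "failed", "expired", "cancelled"):
        time.sleep(30)  # illustrative poll interval
        batch = client.batches.retrieve(batch.id)
    if batch.status != "completed":
        raise RuntimeError(f"batch ended with status {batch.status}")
    output = client.files.content(batch.output_file_id)
    return parse_batch_results(output.text.splitlines())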
Task 14: Prompt Templates
Files:
- Create: all files in main/server/worldmm/llm/prompts/
- [ ] Step 1: Create prompt template files
Create each file in main/server/worldmm/llm/prompts/ with the prompt content derived from the WorldMM paper. These replace the inline fallback strings in the code.
ner.txt:
Extract all named entities from the following paragraph. Include people (by name or pronoun "I"), objects, locations, and organizations. Return your answer as JSON: {"entities": ["entity1", "entity2", ...]}
episodic_triples.txt:
Given the caption and the extracted named entities, extract RDF-style knowledge triples. Each triple must be [subject, predicate, object] where at least one element is a named entity from the list. Keep pronouns like "I" as-is. Focus on factual, event-specific relationships. Return JSON: {"triples": [{"subject": "...", "predicate": "...", "object": "..."}, ...]}
semantic_triples.txt:
Extract long-term patterns, habits, preferences, and social relationships from this summary. Focus on what is generally true about the people and their behaviors — not what happened in this specific moment. Examples: habitual activities, recurring preferences, interpersonal relationships. Return JSON: {"triples": [{"subject": "...", "predicate": "...", "object": "..."}, ...]}
caption.txt:
You are a video captioning assistant. Given 8 frames sampled uniformly from a 30-second video clip and an audio transcript, generate a detailed caption describing what is happening. Include the people present (by name if identifiable, or by description), their actions, objects they interact with, and the location/setting. Be specific and factual.
merge_captions.txt:
You are a summarization assistant. Given a sequence of captions from consecutive video segments, merge them into a single coherent {duration_label} summary. Preserve key entities, actions, and temporal order. Remove redundancy but keep important details. Be concise but complete.
consolidation.txt:
You are maintaining a knowledge base of semantic triples about a person's life. Given existing triples and a new triple, decide how to consolidate them.
If the new triple conflicts with or updates an existing triple (e.g., "I sometimes eats fruit" vs "I often eats fruits and snacks"), merge them into an updated triple and mark the old ones for removal.
If the new triple adds new information without conflicting, keep it as-is.
Return JSON:
- To merge: {"action": "merge", "remove_ids": ["id1", ...], "new_triple": {"subject": "...", "predicate": "...", "object": "..."}}
- To keep as-is: {"action": "keep"}
cross_scale_rerank.txt:
Given a user's question and candidate captions retrieved from different temporal scales of an episodic memory system, select the most relevant captions that best answer the question. Consider both direct relevance and temporal specificity.
Return JSON: {"selected_indices": [0, 2, ...]} where indices refer to the candidate list positions. Select at most 3 candidates.
reasoning_agent.txt:
You are an adaptive memory retrieval agent. Given a user's question and any previously retrieved context, decide your next action.
Available memory types:
- episodic: Search for specific events and actions (what happened, when, where)
- semantic: Search for long-term knowledge, habits, and relationships (who knows whom, what someone usually does)
- visual: Search for visual scenes and appearances (what something looked like)
Think about what information you still need to answer the question. If you have enough context, answer.
Respond with JSON:
- To search: {"action": "SEARCH", "memory_type": "episodic"|"semantic"|"visual", "query": "your search query"}
- To answer: {"action": "ANSWER"}
response.txt:
You are a helpful memory assistant. Given a user's question and retrieved memory context from their personal experiences, provide a clear, accurate, and helpful answer. Base your answer only on the provided context. If the context is insufficient, say so.
entity_confirm.txt:
Given two entity surface forms, determine if they refer to the same real-world entity. Consider nicknames, abbreviations, pronouns, and contextual equivalence.
Return JSON: {"same_entity": true|false, "canonical_name": "preferred name if same entity"}
- [ ] Step 2: Run all tests to verify prompts don't break anything
Run: cd main/server && python -m pytest worldmm/tests/ -v
Expected: All tests still PASS (prompt files are now loaded from disk instead of inline fallbacks)
- [ ] Step 3: Commit
git add main/server/worldmm/llm/prompts/
git commit -m "feat(worldmm): add all prompt templates from WorldMM paper"
Task 15: Entity Resolution Pipeline
Files:
- Create: main/server/worldmm/memory/episodic/entity_resolution.py
- Test: main/server/worldmm/tests/test_entity_resolution.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_entity_resolution.py
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_resolve_new_entity_creates_canonical():
from worldmm.memory.episodic.entity_resolution import resolve_entities
# Mock embedding function
mock_embed = MagicMock(return_value=[[0.1] * 10]) # batch embed
# Mock HNSW search — returns no matches (new entity)
mock_search = MagicMock(return_value=[])
# Mock entity creation
created_ids = []
def mock_create(user_id, surface_form, canonical_name=None, canonical_entity_id=None):
eid = f"e-{len(created_ids)}"
created_ids.append(eid)
return eid
mock_llm_confirm = MagicMock() # should not be called for new entities
result = resolve_entities(
surface_forms=["Katrina"],
user_id="user-1",
embed_fn=mock_embed,
search_fn=mock_search,
create_fn=mock_create,
llm_confirm_fn=mock_llm_confirm,
)
assert len(result) == 1
assert result["Katrina"] == "e-0"
mock_llm_confirm.assert_not_called()
def test_resolve_existing_entity_with_llm_confirmation():
from worldmm.memory.episodic.entity_resolution import resolve_entities
mock_embed = MagicMock(return_value=[[0.9, 0.1, 0.0]])
# HNSW search returns a candidate above threshold
mock_search = MagicMock(return_value=[
{"id": "existing-e1", "surface_form": "Katrina", "canonical_name": "Katrina", "similarity": 0.92}
])
mock_create = MagicMock()
# LLM confirms they are the same entity
mock_llm_confirm = MagicMock(return_value={"same_entity": True, "canonical_name": "Katrina"})
result = resolve_entities(
surface_forms=["Kate"],
user_id="user-1",
embed_fn=mock_embed,
search_fn=mock_search,
create_fn=mock_create,
llm_confirm_fn=mock_llm_confirm,
)
assert result["Kate"] == "existing-e1"
mock_llm_confirm.assert_called_once()
# Should create alias row, not new canonical
mock_create.assert_called_once()
call_kwargs = mock_create.call_args[1]
assert call_kwargs["canonical_entity_id"] == "existing-e1"
assert call_kwargs["surface_form"] == "Kate"
def test_resolve_existing_entity_llm_rejects():
from worldmm.memory.episodic.entity_resolution import resolve_entities
mock_embed = MagicMock(return_value=[[0.5, 0.5, 0.0]])
mock_search = MagicMock(return_value=[
{"id": "existing-e1", "surface_form": "Katherine", "canonical_name": "Katherine", "similarity": 0.65}
])
created_ids = []
def mock_create(user_id, surface_form, canonical_name=None, canonical_entity_id=None):
eid = f"e-{len(created_ids)}"
created_ids.append({"id": eid, "canonical_entity_id": canonical_entity_id})
return eid
# LLM says NOT the same entity
mock_llm_confirm = MagicMock(return_value={"same_entity": False})
result = resolve_entities(
surface_forms=["Kate"],
user_id="user-1",
embed_fn=mock_embed,
search_fn=mock_search,
create_fn=mock_create,
llm_confirm_fn=mock_llm_confirm,
)
# Should create a NEW canonical entity, not an alias
assert result["Kate"] == "e-0"
assert created_ids[0]["canonical_entity_id"] is None
def test_resolve_batches_multiple_entities():
from worldmm.memory.episodic.entity_resolution import resolve_entities
# Batch embed returns embeddings for all surface forms at once
mock_embed = MagicMock(return_value=[[0.1] * 10, [0.2] * 10, [0.3] * 10])
mock_search = MagicMock(return_value=[])
created_count = [0]
def mock_create(user_id, surface_form, canonical_name=None, canonical_entity_id=None):
eid = f"e-{created_count[0]}"
created_count[0] += 1
return eid
mock_llm_confirm = MagicMock()
result = resolve_entities(
surface_forms=["I", "Katrina", "dining table"],
user_id="user-1",
embed_fn=mock_embed,
search_fn=mock_search,
create_fn=mock_create,
llm_confirm_fn=mock_llm_confirm,
)
assert len(result) == 3
# embed_fn called once with all 3 surface forms (batched)
mock_embed.assert_called_once_with(["I", "Katrina", "dining table"])
def test_build_entity_confirm_prompt():
from worldmm.memory.episodic.entity_resolution import build_entity_confirm_prompt
messages = build_entity_confirm_prompt("Kate", "Katrina")
content = str(messages)
assert "Kate" in content
assert "Katrina" in content
def test_parse_entity_confirm_response():
from worldmm.memory.episodic.entity_resolution import parse_entity_confirm_response
raw = json.dumps({"same_entity": True, "canonical_name": "Katrina"})
result = parse_entity_confirm_response(raw)
assert result["same_entity"] is True
assert result["canonical_name"] == "Katrina"
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_entity_resolution.py -v
Expected: FAIL — ModuleNotFoundError
- [ ] Step 3: Write the implementation
# main/server/worldmm/memory/episodic/entity_resolution.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Callable
_PROMPTS_DIR = Path(__file__).resolve().parents[2] / "llm" / "prompts"
def _load_prompt(name: str) -> str:
path = _PROMPTS_DIR / name
if path.exists():
return path.read_text().strip()
if name == "entity_confirm.txt":
return (
"Given two entity surface forms, determine if they refer to the same real-world entity. "
"Return JSON: {\"same_entity\": true|false, \"canonical_name\": \"preferred name\"}"
)
return ""
def build_entity_confirm_prompt(
surface_form_a: str,
surface_form_b: str,
) -> list[dict[str, str]]:
system = _load_prompt("entity_confirm.txt")
user_content = f'Surface form A: "{surface_form_a}"\nSurface form B: "{surface_form_b}"'
return [
{"role": "system", "content": system},
{"role": "user", "content": user_content},
]
def parse_entity_confirm_response(raw: str) -> dict[str, Any]:
return json.loads(raw)
def resolve_entities(
surface_forms: list[str],
user_id: str,
embed_fn: Callable[[list[str]], list[list[float]]],
search_fn: Callable[[list[float]], list[dict[str, Any]]],
create_fn: Callable[..., str],
llm_confirm_fn: Callable[[str, str], dict[str, Any]],
threshold: float = 0.6,
) -> dict[str, str]:
"""Resolve surface forms to canonical entity IDs. Batched per window.
Args:
surface_forms: list of entity names from NER
user_id: user who owns the entities
embed_fn: batch embed function (list[str] -> list[list[float]])
search_fn: HNSW search function (embedding -> list of candidates)
create_fn: creates entity row (user_id, surface_form, canonical_name, canonical_entity_id) -> id
llm_confirm_fn: confirms two surface forms are same entity -> {same_entity, canonical_name}
threshold: cosine similarity threshold for HNSW candidate matching
Returns:
dict mapping surface_form -> canonical_entity_id
"""
embeddings = embed_fn(surface_forms)
resolved: dict[str, str] = {}
for surface_form, embedding in zip(surface_forms, embeddings):
        # Drop HNSW candidates below the similarity threshold before LLM confirmation
        candidates = [c for c in search_fn(embedding) if c.get("similarity", 0.0) >= threshold]
if not candidates:
# New canonical entity
entity_id = create_fn(
user_id=user_id,
surface_form=surface_form,
canonical_name=surface_form,
canonical_entity_id=None,
)
resolved[surface_form] = entity_id
continue
# Check top candidate with LLM confirmation
top = candidates[0]
confirm = llm_confirm_fn(surface_form, top["surface_form"])
if confirm.get("same_entity"):
# Create alias row pointing to existing canonical
create_fn(
user_id=user_id,
surface_form=surface_form,
canonical_name=None,
canonical_entity_id=top["id"],
)
resolved[surface_form] = top["id"]
else:
# LLM rejected — create new canonical
entity_id = create_fn(
user_id=user_id,
surface_form=surface_form,
canonical_name=surface_form,
canonical_entity_id=None,
)
resolved[surface_form] = entity_id
return resolved
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_entity_resolution.py -v
Expected: All 6 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/memory/episodic/entity_resolution.py \
main/server/worldmm/tests/test_entity_resolution.py
git commit -m "feat(worldmm): add entity resolution pipeline with embedding lookup and LLM confirmation"
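As with the reranker in Task 10, llm_confirm_fn is injected; a sketch of composing it from the prompt builder and parser above (model name and token budget are illustrative):
# Sketch: an LLM-backed llm_confirm_fn for resolve_entities.
from worldmm.llm.client import LLMClient, build_request
from worldmm.memory.episodic.entity_resolution import build_entity_confirm_prompt, parse_entity_confirm_response
def make_llm_confirm(llm: LLMClient):
    def confirm(surface_form_a: str, surface_form_b: str) -> dict:
        request = build_request(
            model="gpt-5-mini",  # illustrative model choice
            messages=build_entity_confirm_prompt(surface_form_a, surface_form_b),
            max_completion_tokens=128,
        )
        return parse_entity_confirm_response(llm.call(build_request_dict=request))
    return confirm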
Task 16: Visual Index (pgvector query)
Files:
- Create: main/server/worldmm/memory/visual/index.py
- Test: main/server/worldmm/tests/test_visual_index.py
- [ ] Step 1: Write the failing test
# main/server/worldmm/tests/test_visual_index.py
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock
_server_root = Path(__file__).resolve().parents[2]  # main/server
_layer_root = _server_root / "layers" / "shared" / "python"
for p in (_layer_root, _server_root):
if str(p) not in sys.path:
sys.path.insert(0, str(p))
def test_query_visual_embeddings_calls_db():
from worldmm.memory.visual.index import query_visual_embeddings
mock_session = MagicMock()
mock_row_1 = MagicMock()
mock_row_1.segment_id = "s1"
mock_row_1.user_id = "user-1"
mock_row_1.timestamp = "2026-03-22T10:00:00Z"
mock_row_2 = MagicMock()
mock_row_2.segment_id = "s2"
mock_row_2.user_id = "user-1"
mock_row_2.timestamp = "2026-03-22T10:00:30Z"
mock_session.execute.return_value.fetchall.return_value = [
(mock_row_1, 0.95),
(mock_row_2, 0.80),
]
results = query_visual_embeddings(
query_embedding=[0.1] * 3584,
user_id="user-1",
top_k=5,
session=mock_session,
)
assert len(results) == 2
assert results[0]["segment_id"] == "s1"
assert results[0]["similarity"] == 0.95
assert results[1]["segment_id"] == "s2"
def test_query_visual_embeddings_empty_result():
from worldmm.memory.visual.index import query_visual_embeddings
mock_session = MagicMock()
mock_session.execute.return_value.fetchall.return_value = []
results = query_visual_embeddings(
query_embedding=[0.1] * 3584,
user_id="user-1",
top_k=5,
session=mock_session,
)
assert results == []
- [ ] Step 2: Run test to verify it fails
Run: cd main/server && python -m pytest worldmm/tests/test_visual_index.py -v
Expected: FAIL
- [ ] Step 3: Write the implementation
# main/server/worldmm/memory/visual/index.py
from __future__ import annotations
from typing import Any
from sqlalchemy import text
from sqlalchemy.orm import Session
# Placeholder SQL: table and column names are assumptions pending the ORM model
# in worldmm_orm.py. Uses pgvector's cosine distance operator (<=>), which needs
# an HNSW index on the embedding column to stay fast at scale.
_COSINE_QUERY = text(
    "SELECT ve, 1 - (ve.embedding <=> CAST(:query_vec AS vector)) AS similarity "
    "FROM worldmm_visual_embeddings AS ve "
    "WHERE ve.user_id = :user_id "
    "ORDER BY ve.embedding <=> CAST(:query_vec AS vector) "
    "LIMIT :top_k"
)
def query_visual_embeddings(
    query_embedding: list[float],
    user_id: str,
    top_k: int = 5,
    session: Session | None = None,
) -> list[dict[str, Any]]:
    """Query pgvector for the most similar visual embeddings.
    Each result row is a (row, similarity) pair — the shape produced both by the
    mocked session in tests and by an ORM-style select of (model, score expression).
    """
    if session is None:
        from shared.orm.orm import getSession
        session = getSession()
    rows = session.execute(
        _COSINE_QUERY,
        {"query_vec": str(query_embedding), "user_id": user_id, "top_k": top_k},
    ).fetchall()
    results: list[dict[str, Any]] = []
    for row, similarity in rows:
        results.append({
            "segment_id": row.segment_id,
            "user_id": row.user_id,
            "timestamp": row.timestamp,
            "similarity": similarity,
        })
    return results
- [ ] Step 4: Run tests to verify they pass
Run: cd main/server && python -m pytest worldmm/tests/test_visual_index.py -v
Expected: All 2 tests PASS
- [ ] Step 5: Commit
git add main/server/worldmm/memory/visual/index.py \
main/server/worldmm/tests/test_visual_index.py
git commit -m "feat(worldmm): add visual embedding pgvector query function"
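The query above assumes the HNSW index already exists. A sketch of the one-time DDL, run via SQLAlchemy (table, column, and index names are placeholders pending worldmm_orm.py; vector_cosine_ops matches the <=> operator used in the query):
# Sketch: create the pgvector HNSW index the cosine query relies on.
# Names are placeholders pending the ORM model.
from sqlalchemy import text
def ensure_hnsw_index(session) -> None:
    session.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_worldmm_visual_embedding_hnsw "
        "ON worldmm_visual_embeddings USING hnsw (embedding vector_cosine_ops)"
    ))
    session.commit()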
Task 17: Full Test Suite Verification
- [ ] Step 1: Run the complete test suite
Run: cd main/server && python -m pytest worldmm/tests/ -v --tb=short
Expected: All tests PASS (should be ~55 tests across 15 test files)
- [ ] Step 2: Verify no import issues across modules
Run: cd main/server && python -c "from worldmm.llm.client import LLMClient, build_request; from worldmm.memory.episodic.graph import EpisodicGraph; from worldmm.memory.semantic.graph import SemanticGraph; from worldmm.retrieval.agent import ReasoningAgent; print('All imports OK')"
Expected: All imports OK
- [ ] Step 3: Final commit with any fixes
Only if Step 1 or 2 revealed issues that needed fixing.