Sub-Session Splitting
Plan Metadata
- Plan type: plan
- Parent plan: N/A
- Depends on: docs/docs/ingest-window.md, docs/docs/memories-feed.md
- Status: draft
System Intent
- What is being built: A semantic chunking system that splits long recording sessions into meaningful memory cards in real time. Instead of grouping all segments from a 45-minute session under one feed card, the system identifies topic/scene boundaries and creates separate feed cards for each coherent chunk (e.g., "Breakfast conversation", "Work call", "Evening walk").
- Primary consumer(s): The memories feed Lambda, which currently groups segments by source_session_id. Under the new system, the feed will group by sub_session_id instead, allowing users to see fine-grained memories rather than monolithic session summaries.
- Boundary (black-box scope only):
  - Ingest-time assignment: After a window completes enrichment (transcription, visual caption, entity extraction), determine whether it continues the current sub-session or starts a new one.
  - Database changes: Add a nullable sub_session_id field to WorldMMSegment (older segments have none). Create a new SubSessionMetadata table to store each sub-session's name and summary.
  - Feed regrouping: Change the feed Lambda to group by sub_session_id instead of source_session_id.
  - Backward compatibility: Sessions ingested before sub-session splitting is deployed fall back to source_session_id grouping.
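As a minimal illustration of the database changes, the sketch below uses Python's built-in sqlite3 with hypothetical table and column names; the actual ORM and column types in main/server/worldmm/models.py are not specified in this plan. It shows why sub_session_id should be nullable: rows written before deployment keep NULL, and a COALESCE onto source_session_id yields the backward-compatible group key.

```python
import sqlite3

# Hypothetical, simplified schema; names beyond those in this plan are assumptions.
SCHEMA = """
CREATE TABLE world_mm_segment (
    id                TEXT PRIMARY KEY,
    user_id           TEXT NOT NULL,
    source_session_id TEXT NOT NULL,
    sub_session_id    TEXT,  -- NULL for segments ingested before splitting shipped
    start_time        TEXT NOT NULL,
    processing_status TEXT NOT NULL
);
CREATE INDEX idx_segment_sub_session ON world_mm_segment (sub_session_id);
CREATE TABLE sub_session_metadata (
    sub_session_id TEXT PRIMARY KEY,
    name           TEXT NOT NULL DEFAULT 'Untitled',
    summary        TEXT,
    window_count   INTEGER NOT NULL DEFAULT 0,
    start_time     TEXT,
    end_time       TEXT,
    created_at     TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""


def feed_group_keys(conn: sqlite3.Connection, user_id: str) -> list[tuple[str, int]]:
    """Return (group_key, segment_count) pairs, falling back to source_session_id."""
    rows = conn.execute(
        """
        SELECT COALESCE(sub_session_id, source_session_id) AS group_key, COUNT(*)
        FROM world_mm_segment
        WHERE user_id = ?
        GROUP BY group_key
        ORDER BY group_key
        """,
        (user_id,),
    )
    return [(key, count) for key, count in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO world_mm_segment VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("s1", "u1", "old-session", None, "2024-01-01T08:00:00", "complete"),
        ("s2", "u1", "old-session", None, "2024-01-01T08:00:30", "complete"),
        ("s3", "u1", "new-session", "sub-a", "2024-01-02T08:00:00", "complete"),
        ("s4", "u1", "new-session", "sub-b", "2024-01-02T08:10:00", "complete"),
    ],
)
print(feed_group_keys(conn, "u1"))  # [('old-session', 2), ('sub-a', 1), ('sub-b', 1)]
```

The two legacy rows collapse into one "old-session" card, while the newer session splits into two sub-session cards.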
Stage Gate Tracker
- [x] Stage 1 Mermaid approved
- [x] Stage 2 I/O contracts approved
- [x] Stage 3 pseudocode/technical details approved or skipped
1. Mermaid Diagram
graph TD
IngestWindow["ingest_window.py\nmain/server/worldmm/pipeline/ingest_window.py"]:::updated -->|enriched segment transcript + caption| SubSessionAssigner["sub_session_assigner.py\nmain/server/worldmm/pipeline/sub_session_assigner.py"]:::created
SubSessionAssigner -->|query latest segment by source_session_id| WorldMMSegment["WorldMMSegment table\nmain/server/worldmm/models.py"]:::updated
WorldMMSegment -->|current chain windows transcript + caption| SubSessionAssigner
SubSessionAssigner -->|topic continuation prompt| ClaudeAPI["Claude API\nexternal"]:::unchanged
ClaudeAPI -->|same or new decision| SubSessionAssigner
SubSessionAssigner -->|write sub_session_id| WorldMMSegment
SubSessionAssigner -->|create or update sub-session record| SubSessionMetadata["SubSessionMetadata table\nmain/server/worldmm/models.py"]:::created
FeedLambda["feed app.py\nmain/server/api/memories/feed/app.py"]:::updated -->|query segments grouped by sub_session_id| WorldMMSegment
FeedLambda -->|fetch sub-session title| SubSessionMetadata
WorldMMSegment -->|grouped segment rows| FeedLambda
SubSessionMetadata -->|sub-session name| FeedLambda
FeedLambda -->|MemoryFeedItem per sub-session| MobileApp["Mobile Feed\nexternal"]:::unchanged
classDef unchanged fill:#d3d3d3,stroke:#666,stroke-width:1px;
classDef updated fill:#ffe58a,stroke:#666,stroke-width:1px;
classDef created fill:#a8e6a3,stroke:#666,stroke-width:1px;
2. Black-Box Inputs and Outputs
Global Types
SubSessionId {
value: string (UUID; one per sub-session, unique across all of a user's sub-sessions)
}
SessionContext {
source_session_id: string (original recording session)
current_sub_session_id: string (active sub-session ID)
}
WindowTranscript {
text: string (30-second window transcript)
duration_seconds: int (window length, typically 30)
window_index: int (sequence number in session)
start_time: timestamp (window start time)
}
WindowCaption {
text: string (visual caption from GPU worker)
entities: [string] (named entities extracted from vision)
objects: [string] (detected objects/scenes)
}
SubSessionWindow {
segment_id: string (WorldMMSegment primary key)
transcript: WindowTranscript (transcription data)
caption: WindowCaption (visual/caption data)
}
ContinuationDecision {
continue: boolean (true = same topic, false = new topic)
reasoning: string (explanation of decision)
}
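Because the model's reply is free text, a ContinuationDecision has to be parsed defensively. A minimal sketch, assuming the reply contains a single JSON object that may be wrapped in prose; the function name parse_continuation_decision is hypothetical, and the field is renamed continue_ because continue is a Python keyword. A ValueError here would be handled like a claude_error, triggering the new-sub-session fallback.

```python
import json
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class ContinuationDecision:
    continue_: bool  # "continue" is a Python keyword, hence the trailing underscore
    reasoning: str


# Greedy match from the first "{" to the last "}", tolerating prose around the object.
_JSON_OBJECT = re.compile(r"\{.*\}", re.DOTALL)


def parse_continuation_decision(text: str) -> ContinuationDecision:
    """Parse Claude's reply; raise ValueError if it is not a usable decision."""
    match = _JSON_OBJECT.search(text)
    if match is None:
        raise ValueError("no JSON object in model response")
    data = json.loads(match.group(0))  # json.JSONDecodeError subclasses ValueError
    if not isinstance(data.get("continue"), bool):
        raise ValueError("'continue' must be a boolean")
    return ContinuationDecision(
        continue_=data["continue"],
        reasoning=str(data.get("reasoning", "")),
    )
```

Requiring a real boolean (rather than truthiness) avoids treating a reply like "continue": "no" as a continuation.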
SubSessionMetadata {
sub_session_id: SubSessionId (reference)
name: string (user-facing title, e.g., "Breakfast conversation")
summary: string (optional: short paragraph describing the chunk)
window_count: int (number of segments in this sub-session)
start_time: timestamp (earliest segment in chain)
end_time: timestamp (latest segment in chain)
created_at: timestamp
}
Flow: assignSubSessionToWindow
- Test files: tests/integration/test_sub_session_assignment.py, tests/unit/test_sub_session_continuation_logic.py
- Core files: main/server/worldmm/pipeline/ingest_window.py, main/server/worldmm/pipeline/sub_session_assigner.py
Type Definitions
AssignSubSessionInput {
segment_id: string (required, the newly completed segment)
session_id: string (required, the recording session)
user_id: string (required)
transcript: WindowTranscript (required)
caption: WindowCaption (required)
}
AssignSubSessionOutput {
sub_session_id: SubSessionId (assigned or created)
is_new_chain: boolean (true if new sub-session was created)
chain_window_count: int (total windows now in this chain)
}
AssignSubSessionError {
status: string (one of: "db_error", "claude_error", "validation_error")
message: string (human-readable error)
}
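The validation_error status implies a check that runs before any database or Claude work. A hypothetical sketch, assuming the input arrives as a plain dict; allowing empty transcript text (for example, a silent window) is an assumption, not something this plan specifies.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AssignSubSessionError:
    status: str  # "db_error" | "claude_error" | "validation_error"
    message: str


REQUIRED_ID_FIELDS = ("segment_id", "session_id", "user_id")


def validate_assign_input(payload: dict) -> Optional[AssignSubSessionError]:
    """Return a validation_error for a malformed AssignSubSessionInput, else None."""
    for field in REQUIRED_ID_FIELDS:
        value = payload.get(field)
        if not isinstance(value, str) or not value.strip():
            return AssignSubSessionError("validation_error", f"{field} is required")
    for field in ("transcript", "caption"):
        section = payload.get(field)
        # Assumption: empty text is allowed, but the section and its text must exist.
        if not isinstance(section, dict) or not isinstance(section.get("text"), str):
            return AssignSubSessionError("validation_error", f"{field}.text must be a string")
    return None
```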
Paths
| path | input | output | path-type | notes | updated |
|---|---|---|---|---|---|
| assignSubSession.continueTopic | AssignSubSessionInput | AssignSubSessionOutput{is_new_chain=false} | happy path | New window continues the current topic; inherits the chain's sub_session_id | Y |
| assignSubSession.startNewTopic | AssignSubSessionInput | AssignSubSessionOutput{is_new_chain=true} | happy path | Claude detects a topic change; a new sub_session_id is generated | Y |
| assignSubSession.firstWindowInSession | AssignSubSessionInput | AssignSubSessionOutput{is_new_chain=true, chain_window_count=1} | happy path | No prior segments in the session; create the first sub-session | Y |
| assignSubSession.outOfOrderWindow | AssignSubSessionInput | AssignSubSessionOutput | edge case | Window finishes processing after later windows; uses the best-effort chain context available at assignment time | Y |
| assignSubSession.claudeCallFailure | AssignSubSessionInput | AssignSubSessionError{status=claude_error} | error | Claude API unavailable or exceeds the 5-second timeout; fallback: create a new sub-session ID | Y |
| assignSubSession.databaseError | AssignSubSessionInput | AssignSubSessionError{status=db_error} | error | Query or insert fails; caller (ingest_window) retries via DLQ | Y |
Flow: feedRegrouping
- Test files: tests/integration/test_feed_with_sub_sessions.py
- Core files: main/server/api/memories/feed/app.py→lambda_handler
Type Definitions
FeedRequest {
cursor: int (offset for pagination)
limit: int (max items per page, capped at 50)
}
MemoryFeedItem {
id: string (sub_session_id; source_session_id for segments ingested before sub-session splitting)
time: timestamp (earliest window in sub-session)
type: string (one of: "visual", "audio", "text")
thumbnail: string (optional presigned S3 URL)
processing_status: string (one of: "pending", "complete", "failed")
title: string (optional sub-session title from SubSessionMetadata)
}
FeedResponse {
memories: [MemoryFeedItem] (paginated results)
next_cursor: int | null (next offset, or null if no more pages)
}
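FeedRequest caps limit at 50 and uses an offset cursor. One way to normalize raw query-string values is sketched below, assuming (hypothetically) that malformed values fall back to defaults rather than producing a 400 response; the default limit of 20 is taken from the getFeed pseudocode.

```python
from dataclasses import dataclass
from typing import Optional

MAX_LIMIT = 50      # cap stated in FeedRequest
DEFAULT_LIMIT = 20  # default used by the getFeed pseudocode


@dataclass(frozen=True)
class FeedRequest:
    cursor: int = 0
    limit: int = DEFAULT_LIMIT


def parse_feed_request(params: dict) -> FeedRequest:
    """Coerce query-string params into a FeedRequest, clamping limit to [1, MAX_LIMIT]."""
    try:
        cursor = max(0, int(params.get("cursor", 0)))
        limit = int(params.get("limit", DEFAULT_LIMIT))
    except (TypeError, ValueError):
        return FeedRequest()  # assumption: lenient defaults instead of a client error
    return FeedRequest(cursor=cursor, limit=min(max(limit, 1), MAX_LIMIT))


def next_cursor(cursor: int, limit: int, has_next: bool) -> Optional[int]:
    """Offset-based cursor: None (null in JSON) once the last page has been served."""
    return cursor + limit if has_next else None
```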
Paths
| path | input | output | path-type | notes | updated |
|---|---|---|---|---|---|
| feedRegrouping.happyPath | FeedRequest | FeedResponse with sub_session_id grouping | happy path | Sessions with sub_session_id data group by sub_session_id | Y |
| feedRegrouping.backwardCompat | FeedRequest | FeedResponse with source_session_id grouping for old segments | happy path | Segments without sub_session_id fall back to source_session_id grouping | Y |
| feedRegrouping.mixedOldNew | FeedRequest | FeedResponse mixing old/new grouping | edge case | User has segments from before and after deployment; both grouping strategies coexist | Y |
| feedRegrouping.emptySession | FeedRequest | FeedResponse{memories=[]} | happy path | User has no memories; empty list returned | Y |
3. Pseudocode / Technical Details for Critical Flows (Optional)
Flow: assignSubSessionToWindow
function assignSubSessionToWindow(segment_id, session_id, user_id, transcript, caption):
    // Step 1: Find the most recently completed, already-assigned segment in this session
    // (excluding the new segment itself, which is already 'complete' after enrichment)
    latest_segment = query WorldMMSegment
        where source_session_id = session_id
          and processing_status = 'complete'
          and sub_session_id is not null
          and id != segment_id
        order by start_time DESC
        limit 1
    if latest_segment is null:
        // First window in session; create new sub-session
        sub_session_id = generate_uuid()
        write SubSessionMetadata(sub_session_id, name="Untitled", window_count=1,
                                 start_time=transcript.start_time, end_time=transcript.start_time)
        update WorldMMSegment set sub_session_id = sub_session_id where id = segment_id
        return { sub_session_id, is_new_chain: true, chain_window_count: 1 }
    // Step 2: Fetch all segments in the current chain
    current_chain = query WorldMMSegment
        where sub_session_id = latest_segment.sub_session_id
        order by start_time ASC
    // Step 3: Bound prompt size by truncating long chains
    if current_chain.length > 20:
        // Keep the first 2 windows for topic framing + the last 10 for recency
        chain_context = current_chain[0:2] + current_chain[-10:]
    else:
        chain_context = current_chain
    // Build prompt
    chain_transcripts = [w.transcript.text for w in chain_context]
    chain_captions = [w.caption.text for w in chain_context]
    prompt = """
    You are an expert at identifying semantic boundaries in conversation and scene changes.
    Current topic chain (most recent segments):
    Transcripts: {chain_transcripts}
    Visuals: {chain_captions}
    New incoming window:
    Transcript: {transcript.text}
    Visual: {caption.text}
    Question: Does this new window CONTINUE the same conversational topic and scene,
    or does it start something MEANINGFULLY DIFFERENT?
    Respond with JSON: {{ "continue": true/false, "reasoning": "..." }}
    """
    // Step 4: Call Claude with a 5-second timeout
    try:
        response = claude_api.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=200,
            messages=[{ role: "user", content: prompt }],
            timeout=5
        )
        decision = parse_json(response.content[0].text)
    catch ClaudeApiError, TimeoutError, JsonParseError:
        // Fallback: create new sub-session on API failure, timeout, or unparseable reply
        sub_session_id = generate_uuid()
        write SubSessionMetadata(sub_session_id, name="Untitled", window_count=1,
                                 start_time=transcript.start_time, end_time=transcript.start_time)
        update WorldMMSegment set sub_session_id = sub_session_id where id = segment_id
        return { sub_session_id, is_new_chain: true, chain_window_count: 1 }
    // Step 5: Assign based on decision
    if decision.continue:
        // Inherit the current chain's sub_session_id
        update WorldMMSegment set sub_session_id = latest_segment.sub_session_id where id = segment_id
        update SubSessionMetadata
            set window_count = window_count + 1,
                start_time = min(start_time, transcript.start_time),  // out-of-order windows
                end_time = max(end_time, transcript.start_time)
            where sub_session_id = latest_segment.sub_session_id
        return {
            sub_session_id: latest_segment.sub_session_id,
            is_new_chain: false,
            chain_window_count: current_chain.length + 1
        }
    else:
        // Create new chain
        sub_session_id = generate_uuid()
        write SubSessionMetadata(sub_session_id, name="Untitled", window_count=1,
                                 start_time=transcript.start_time, end_time=transcript.start_time)
        update WorldMMSegment set sub_session_id = sub_session_id where id = segment_id
        return { sub_session_id, is_new_chain: true, chain_window_count: 1 }
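The assignment flow can be sketched as runnable Python with an in-memory store standing in for WorldMMSegment and SubSessionMetadata. The Segment and SubSessionMeta classes, the dict-based store, and the injected decide callable (which in sub_session_assigner.py would wrap the Claude call) are hypothetical; the real module would use the database and the Claude API.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional


@dataclass
class Segment:  # hypothetical stand-in for a WorldMMSegment row
    id: str
    source_session_id: str
    start_time: datetime
    transcript: str
    caption: str
    processing_status: str = "complete"
    sub_session_id: Optional[str] = None


@dataclass
class SubSessionMeta:  # hypothetical stand-in for a SubSessionMetadata row
    sub_session_id: str
    start_time: datetime
    end_time: datetime
    window_count: int = 1
    name: str = "Untitled"


# decide(chain_context, new_segment) -> True (continue), False (new topic), None (Claude failed)
Decide = Callable[[list[Segment], Segment], Optional[bool]]


def chain_context(chain: list[Segment]) -> list[Segment]:
    """Truncate long chains: first 2 windows for framing plus the last 10 for recency."""
    return chain[:2] + chain[-10:] if len(chain) > 20 else chain


def assign_sub_session(seg: Segment, segments: dict[str, Segment],
                       meta: dict[str, SubSessionMeta], decide: Decide) -> tuple[str, bool, int]:
    """Assign seg to a sub-session; returns (sub_session_id, is_new_chain, chain_window_count)."""
    # Step 1: latest completed, already-assigned segment in the same recording session.
    prior = [
        s for s in segments.values()
        if s.source_session_id == seg.source_session_id
        and s.id != seg.id
        and s.processing_status == "complete"
        and s.sub_session_id is not None
    ]
    latest = max(prior, key=lambda s: s.start_time, default=None)
    if latest is not None:
        # Steps 2-3: the current chain, oldest first, truncated for the prompt.
        chain = sorted(
            (s for s in segments.values() if s.sub_session_id == latest.sub_session_id),
            key=lambda s: s.start_time,
        )
        # Steps 4-5: None (failure) is falsy, so it falls through to a new chain.
        if decide(chain_context(chain), seg):
            m = meta[latest.sub_session_id]
            seg.sub_session_id = latest.sub_session_id
            m.window_count += 1
            m.start_time = min(m.start_time, seg.start_time)  # tolerate out-of-order windows
            m.end_time = max(m.end_time, seg.start_time)
            return seg.sub_session_id, False, m.window_count
    # First window, topic change, or Claude failure: start a new chain.
    new_id = str(uuid.uuid4())
    seg.sub_session_id = new_id
    meta[new_id] = SubSessionMeta(new_id, seg.start_time, seg.start_time)
    return new_id, True, 1
```

Injecting decide keeps the boundary logic testable without network calls; the sketch also treats segments whose sub_session_id is still null as unable to anchor a chain, mirroring the `sub_session_id is not null` filter in Step 1.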
Flow: feedRegrouping
function getFeed(user_id, cursor=0, limit=20):
    limit = min(limit, 50)  // FeedRequest caps limit at 50
    // Query all segments for user
    segments = query WorldMMSegment
        where user_id = user_id
        order by start_time DESC, id DESC
    // Group by sub_session_id (or source_session_id if sub_session_id is null)
    grouped = {}
    for segment in segments:
        group_key = segment.sub_session_id or segment.source_session_id
        if group_key not in grouped:
            grouped[group_key] = []
        grouped[group_key].append(segment)
    // Sort groups by their latest segment's start_time, most recent first
    sorted_groups = sorted(grouped.items(),
        key=lambda g: max(s.start_time for s in g[1]),
        reverse=True
    )
    // Pagination: fetch one extra group to detect whether another page exists
    paginated_groups = sorted_groups[cursor : cursor + limit + 1]
    has_next = len(paginated_groups) > limit
    groups_to_return = paginated_groups[:limit]
    // Build response
    memories = []
    for group_id, group_segments in groups_to_return:
        // No metadata row exists for legacy source_session_id groups
        metadata = query SubSessionMetadata where sub_session_id = group_id
        memory_item = {
            id: group_id,
            time: min(s.start_time for s in group_segments),
            type: richest_type([s.type for s in group_segments]),
            thumbnail: first_visual_thumbnail(group_segments),
            processing_status: aggregate_status(group_segments),
            title: metadata.name if metadata else "Untitled Memory"
        }
        memories.append(memory_item)
    return {
        memories,
        next_cursor: cursor + limit if has_next else null
    }
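The grouping and pagination can be checked with an in-memory sketch. FeedSegment, the titles dict (standing in for SubSessionMetadata lookups), and the aggregate_status rule are hypothetical; type and thumbnail are omitted for brevity.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class FeedSegment:  # hypothetical projection of a WorldMMSegment row
    id: str
    source_session_id: str
    sub_session_id: Optional[str]
    start_time: datetime
    processing_status: str  # "pending" | "complete" | "failed"


def aggregate_status(statuses: list[str]) -> str:
    # Hypothetical rule: any failure wins, then any pending work, else complete.
    if "failed" in statuses:
        return "failed"
    return "pending" if "pending" in statuses else "complete"


def get_feed(segments: list[FeedSegment], titles: dict[str, str],
             cursor: int = 0, limit: int = 20) -> dict:
    limit = min(limit, 50)
    groups: dict[str, list[FeedSegment]] = {}
    for seg in segments:
        # Pre-deployment rows have no sub_session_id and fall back to their recording session.
        groups.setdefault(seg.sub_session_id or seg.source_session_id, []).append(seg)
    ordered = sorted(groups.items(), key=lambda g: max(s.start_time for s in g[1]), reverse=True)
    page = ordered[cursor:cursor + limit + 1]  # one extra group signals another page
    has_next = len(page) > limit
    memories = [
        {
            "id": key,
            "time": min(s.start_time for s in segs),
            "processing_status": aggregate_status([s.processing_status for s in segs]),
            "title": titles.get(key, "Untitled Memory"),
        }
        for key, segs in page[:limit]
    ]
    return {"memories": memories, "next_cursor": cursor + limit if has_next else None}
```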
- Implementation notes:
  - Sub-session assignment is a synchronous call within ingest_window.py→lambda_handler, running after the GPU/enrichment steps complete.
  - Claude calls should have a 5-second timeout; if Claude is slow or unavailable, fall back to creating a new sub-session.
  - The "best-effort" out-of-order strategy means that if window 7 finishes before window 6, each window is assigned using whatever chain context exists when it is processed. This may place boundaries slightly earlier than ideal, but is acceptable.
  - Feed Lambda changes are backward compatible: segments without sub_session_id automatically group by source_session_id.
  - Optional: Sub-session names/summaries can be populated asynchronously by a separate Lambda that runs periodically (e.g., every 30 seconds) to re-title completed sub-sessions based on their full transcript.
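A sketch of the 5-second timeout and fallback, assuming an Anthropic Python SDK client is passed in; the function name decide_continuation is hypothetical. with_options(timeout=..., max_retries=0) is an existing SDK option; disabling retries is an assumption made here because the SDK retries failed and timed-out requests by default, which could stretch the wait well beyond 5 seconds.

```python
import json
from typing import Any, Optional

MODEL = "claude-3-5-sonnet-20241022"  # model ID from the pseudocode
TIMEOUT_SECONDS = 5.0


def decide_continuation(client: Any, prompt: str) -> Optional[bool]:
    """Return True/False from Claude, or None on failure so the caller starts a new sub-session."""
    try:
        response = client.with_options(timeout=TIMEOUT_SECONDS, max_retries=0).messages.create(
            model=MODEL,
            max_tokens=200,
            messages=[{"role": "user", "content": prompt}],
        )
        decision = json.loads(response.content[0].text).get("continue")
        return decision if isinstance(decision, bool) else None
    except Exception:  # assumption: narrow to anthropic.APIError plus parse errors, and log, in production
        return None
```

Returning None rather than raising lets the assignment code treat API errors, timeouts, and malformed replies uniformly as "start a new sub-session".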
4. Handoff to Related Plan Reconciliation
After all stages are approved, apply .agent/skills/reconcile-plans/SKILL.md to propagate contract updates across linked plans (ingest-window.md, memories-feed.md).