
Sub-Session Splitting

Plan Metadata

  • Plan type: plan
  • Parent plan: N/A
  • Depends on: docs/docs/ingest-window.md, docs/docs/memories-feed.md
  • Status: draft

System Intent

  • What is being built: A semantic chunking system that splits long recording sessions into meaningful memory cards in real time. Instead of grouping all segments from a 45-minute session under one feed card, the system identifies topic/scene boundaries and creates separate feed cards for each coherent chunk (e.g., "Breakfast conversation", "Work call", "Evening walk").

  • Primary consumer(s): The memories feed Lambda, which currently groups segments by source_session_id. Under the new system, the feed will group by sub_session_id instead, allowing users to see fine-grained memories rather than monolithic session summaries.

  • Boundary (black-box scope only):

  • Ingest time assignment: After a window completes enrichment (transcription, visual caption, entity extraction), determine whether it continues the current sub-session or starts a new one.
  • Database changes: Add sub_session_id field to WorldMMSegment. Create a new SubSessionMetadata table to store sub-session name/summary.
  • Feed regrouping: Change feed Lambda to group by sub_session_id instead of source_session_id.
  • Backward compatibility: Sessions ingested before sub-session splitting is deployed fall back to source_session_id grouping.

Stage Gate Tracker

  • [x] Stage 1 Mermaid approved
  • [x] Stage 2 I/O contracts approved
  • [x] Stage 3 pseudocode/technical details approved or skipped

1. Mermaid Diagram

graph TD
  IngestWindow["ingest_window.py\nmain/server/worldmm/pipeline/ingest_window.py"]:::updated -->|enriched segment transcript + caption| SubSessionAssigner["sub_session_assigner.py\nmain/server/worldmm/pipeline/sub_session_assigner.py"]:::created
  SubSessionAssigner -->|query latest segment by source_session_id| WorldMMSegment["WorldMMSegment table\nmain/server/worldmm/models.py"]:::updated
  WorldMMSegment -->|current chain windows transcript + caption| SubSessionAssigner
  SubSessionAssigner -->|topic continuation prompt| ClaudeAPI["Claude API\nexternal"]:::unchanged
  ClaudeAPI -->|same or new decision| SubSessionAssigner
  SubSessionAssigner -->|write sub_session_id| WorldMMSegment
  SubSessionAssigner -->|create or update sub-session record| SubSessionMetadata["SubSessionMetadata table\nmain/server/worldmm/models.py"]:::created
  FeedLambda["feed app.py\nmain/server/api/memories/feed/app.py"]:::updated -->|query segments grouped by sub_session_id| WorldMMSegment
  FeedLambda -->|fetch sub-session title| SubSessionMetadata
  WorldMMSegment -->|grouped segment rows| FeedLambda
  SubSessionMetadata -->|sub-session name| FeedLambda
  FeedLambda -->|MemoryFeedItem per sub-session| MobileApp["Mobile Feed\nexternal"]:::unchanged

classDef unchanged fill:#d3d3d3,stroke:#666,stroke-width:1px;
classDef updated fill:#ffe58a,stroke:#666,stroke-width:1px;
classDef created fill:#a8e6a3,stroke:#666,stroke-width:1px;

2. Black-Box Inputs and Outputs

Global Types

SubSessionId {
  value: string (UUID; unique across all of a user's sub-sessions)
}

SessionContext {
  source_session_id: string (original recording session)
  current_sub_session_id: string (active sub-session ID)
}

WindowTranscript {
  text: string (30-second window transcript)
  duration_seconds: int (window length, typically 30)
  window_index: int (sequence number in session)
  start_time: timestamp (window start time)
}

WindowCaption {
  text: string (visual caption from GPU worker)
  entities: [string] (named entities extracted from vision)
  objects: [string] (detected objects/scenes)
}

SubSessionWindow {
  segment_id: string (WorldMMSegment primary key)
  transcript: WindowTranscript (transcription data)
  caption: WindowCaption (visual/caption data)
}

ContinuationDecision {
  continue: boolean (true = same topic, false = new topic)
  reasoning: string (explanation of decision)
}

SubSessionMetadata {
  sub_session_id: SubSessionId (reference)
  name: string (user-facing title, e.g., "Breakfast conversation")
  summary: string (optional: short paragraph describing the chunk)
  window_count: int (number of segments in this sub-session)
  start_time: timestamp (earliest segment in chain)
  end_time: timestamp (latest segment in chain)
  created_at: timestamp
}
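As a minimal sketch, the global types could map onto Python dataclasses roughly as follows. This assumes Python on the server side (the pipeline files are `.py`). Field names mirror the contracts above, except that `continue` is renamed `continue_` because `continue` is a Python keyword. The `extend` method is a hypothetical helper that shows how `window_count`, `start_time`, and `end_time` might be updated when a window joins a chain. It uses `min`/`max` so that an out-of-order window cannot move `end_time` backward.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class WindowTranscript:
    text: str               # 30-second window transcript
    duration_seconds: int   # window length, typically 30
    window_index: int       # sequence number within the recording session
    start_time: datetime    # window start time


@dataclass
class WindowCaption:
    text: str                                          # visual caption from the GPU worker
    entities: List[str] = field(default_factory=list)  # named entities extracted from vision
    objects: List[str] = field(default_factory=list)   # detected objects/scenes


@dataclass
class ContinuationDecision:
    continue_: bool  # renamed: "continue" is reserved in Python
    reasoning: str


@dataclass
class SubSessionMetadata:
    sub_session_id: str            # UUID string
    name: str                      # user-facing title
    window_count: int
    start_time: datetime
    end_time: datetime
    created_at: datetime
    summary: Optional[str] = None  # optional short paragraph

    def extend(self, window_start: datetime) -> None:
        """Hypothetical helper: record one more window in this chain."""
        self.window_count += 1
        # min/max keep the bounds correct even if windows arrive out of order
        self.start_time = min(self.start_time, window_start)
        self.end_time = max(self.end_time, window_start)
```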

Flow: assignSubSessionToWindow

  • Test files: tests/integration/test_sub_session_assignment.py, tests/unit/test_sub_session_continuation_logic.py
  • Core files: main/server/worldmm/pipeline/ingest_window.py, main/server/worldmm/pipeline/sub_session_assigner.py

Type Definitions

AssignSubSessionInput {
  segment_id: string (required, the newly completed segment)
  session_id: string (required, the recording session)
  user_id: string (required)
  transcript: WindowTranscript (required)
  caption: WindowCaption (required)
}

AssignSubSessionOutput {
  sub_session_id: SubSessionId (assigned or created)
  is_new_chain: boolean (true if new sub-session was created)
  chain_window_count: int (total windows now in this chain)
}

AssignSubSessionError {
  status: string (one of: "db_error", "claude_error", "validation_error")
  message: string (human-readable error)
}
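One way to enforce the output and error contracts in code is sketched below. It assumes plain dataclasses rather than any particular validation library. The invariant that a new chain starts with `chain_window_count == 1` is inferred from the firstWindowInSession and startNewTopic paths; the class layout itself is illustrative.

```python
from dataclasses import dataclass
from typing import Literal

ErrorStatus = Literal["db_error", "claude_error", "validation_error"]
_VALID_STATUSES = {"db_error", "claude_error", "validation_error"}


@dataclass(frozen=True)
class AssignSubSessionOutput:
    sub_session_id: str
    is_new_chain: bool
    chain_window_count: int

    def __post_init__(self) -> None:
        if self.chain_window_count < 1:
            raise ValueError("chain_window_count must be >= 1")
        # A freshly created chain always contains exactly one window.
        if self.is_new_chain and self.chain_window_count != 1:
            raise ValueError("a new chain must start with chain_window_count == 1")


@dataclass(frozen=True)
class AssignSubSessionError:
    status: ErrorStatus
    message: str  # human-readable error

    def __post_init__(self) -> None:
        # Literal is not enforced at runtime, so check the value explicitly.
        if self.status not in _VALID_STATUSES:
            raise ValueError(f"unknown status: {self.status!r}")
```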

Paths

| path | input | output | path-type | notes | updated |
|---|---|---|---|---|---|
| assignSubSession.continueTopic | AssignSubSessionInput | AssignSubSessionOutput{is_new_chain=false} | happy path | New window continues current topic; inherits chain's sub_session_id | Y |
| assignSubSession.startNewTopic | AssignSubSessionInput | AssignSubSessionOutput{is_new_chain=true} | happy path | Claude detects topic change; new sub_session_id generated | Y |
| assignSubSession.firstWindowInSession | AssignSubSessionInput | AssignSubSessionOutput{is_new_chain=true, chain_window_count=1} | happy path | No prior segments in session; create first sub-session | Y |
| assignSubSession.outOfOrderWindow | AssignSubSessionInput | AssignSubSessionOutput | edge case | Window processes after later windows; uses best-effort chain context available at assignment time | Y |
| assignSubSession.claudeCallFailure | AssignSubSessionInput | AssignSubSessionError{status=claude_error} | error | Claude API unavailable; fallback: create new sub-session ID | Y |
| assignSubSession.databaseError | AssignSubSessionInput | AssignSubSessionError{status=db_error} | error | Query or insert fails; caller (ingest_window) retries via DLQ | Y |
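The happy-path and Claude-failure rows above reduce to a small pure decision, sketched here under stated assumptions. `resolve_assignment` and `Assignment` are hypothetical names. Following the pseudocode's fallback and the implementation notes, a failed or timed-out Claude call is modeled as opening a new chain rather than returning `claude_error`; the `db_error` path is left to the caller's DLQ retry and is not modeled.

```python
import uuid
from typing import NamedTuple, Optional


class Assignment(NamedTuple):
    sub_session_id: str
    is_new_chain: bool
    chain_window_count: int


def resolve_assignment(
    prior_sub_session_id: Optional[str],
    prior_chain_length: int,
    decision_continue: Optional[bool],
) -> Assignment:
    """Map the assignSubSession paths onto an output.

    prior_sub_session_id: sub_session_id of the latest completed segment in the
        session, or None for the first window (firstWindowInSession).
    prior_chain_length: number of windows already in that chain.
    decision_continue: Claude's verdict, or None if the call failed or timed out.
    """
    if prior_sub_session_id is not None and decision_continue is True:
        # continueTopic: inherit the existing chain and grow it by one window
        return Assignment(prior_sub_session_id, False, prior_chain_length + 1)
    # firstWindowInSession, startNewTopic, or Claude failure: open a new chain
    return Assignment(str(uuid.uuid4()), True, 1)
```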

Flow: feedRegrouping

  • Test files: tests/integration/test_feed_with_sub_sessions.py
  • Core files: main/server/api/memories/feed/app.py (lambda_handler)

Type Definitions

FeedRequest {
  cursor: int (offset for pagination)
  limit: int (max items per page, capped at 50)
}

MemoryFeedItem {
  id: string (sub_session_id; falls back to source_session_id for segments ingested before sub-session splitting)
  time: timestamp (earliest window in sub-session)
  type: string (one of: "visual", "audio", "text")
  thumbnail: string (optional presigned S3 URL)
  processing_status: string (one of: "pending", "complete", "failed")
  title: string (optional sub-session title from SubSessionMetadata)
}

FeedResponse {
  memories: [MemoryFeedItem] (paginated results)
  next_cursor: int | null (next offset, or null if no more pages)
}
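A sketch of normalizing an incoming FeedRequest follows. It assumes the Lambda receives `cursor` and `limit` as optional query-string strings. `parse_feed_request` is a hypothetical helper. Clamping an oversized `limit` to 50 (instead of rejecting it) and raising a zero limit to 1 are assumptions; the default of 20 comes from `getFeed`.

```python
from dataclasses import dataclass
from typing import Optional

MAX_LIMIT = 50      # cap from the FeedRequest contract
DEFAULT_LIMIT = 20  # default used by getFeed in the pseudocode


@dataclass(frozen=True)
class FeedRequest:
    cursor: int
    limit: int


def parse_feed_request(cursor: Optional[str], limit: Optional[str]) -> FeedRequest:
    """Parse raw query-string values into a bounded FeedRequest."""
    offset = int(cursor) if cursor else 0
    lim = int(limit) if limit else DEFAULT_LIMIT
    if offset < 0:
        raise ValueError("cursor must be non-negative")
    # Clamp into [1, MAX_LIMIT] so a client can never request an oversized page.
    return FeedRequest(cursor=offset, limit=max(1, min(lim, MAX_LIMIT)))
```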

Paths

| path | input | output | path-type | notes | updated |
|---|---|---|---|---|---|
| feedRegrouping.happyPath | FeedRequest | FeedResponse with sub_session_id grouping | happy path | Sessions with sub_session_id data group by sub_session_id | Y |
| feedRegrouping.backwardCompat | FeedRequest | FeedResponse with source_session_id grouping for old segments | happy path | Segments without sub_session_id fall back to source_session_id grouping | Y |
| feedRegrouping.mixedOldNew | FeedRequest | FeedResponse mixing old/new grouping | edge case | User has segments from before and after deployment; both grouping strategies coexist | Y |
| feedRegrouping.emptySession | FeedRequest | FeedResponse{memories=[]} | happy path | User has no memories; empty list returned | Y |

3. Pseudocode / Technical Details for Critical Flows (Optional)

Flow: assignSubSession

function assignSubSessionToWindow(segment_id, session_id, user_id, transcript, caption):
  // Step 1: Find the most recently started, already-assigned segment in this session,
  //         excluding the window currently being assigned
  latest_segment = query WorldMMSegment
    where source_session_id = session_id
      and processing_status = 'complete'
      and sub_session_id is not null
      and id != segment_id
    order by start_time DESC
    limit 1

  if latest_segment is null:
    // First window in session; create new sub-session
    sub_session_id = generate_uuid()
    write SubSessionMetadata(sub_session_id, name="Untitled", window_count=1,
                             start_time=transcript.start_time, end_time=transcript.start_time)
    update WorldMMSegment set sub_session_id = sub_session_id where id = segment_id
    return { sub_session_id, is_new_chain: true, chain_window_count: 1 }

  // Step 2: Fetch all segments in the current chain
  current_chain = query WorldMMSegment
    where sub_session_id = latest_segment.sub_session_id
    order by start_time ASC

  // Step 3: Prepare context for Claude (truncate if chain is long)
  if current_chain.length > 20:
    // Keep only last 10 windows + first 2 for context
    chain_context = current_chain[0:2] + current_chain[-10:]
  else:
    chain_context = current_chain

  // Build prompt
  chain_transcripts = [w.transcript.text for w in chain_context]
  chain_captions = [w.caption.text for w in chain_context]

  prompt = f"""
    You are an expert at identifying semantic boundaries in conversation and scene changes.

    Current topic chain (most recent segments):
    Transcripts: {chain_transcripts}
    Visuals: {chain_captions}

    New incoming window:
    Transcript: {transcript.text}
    Visual: {caption.text}

    Question: Does this new window CONTINUE the same conversational topic and scene,
    or does it start something MEANINGFULLY DIFFERENT?

    Respond with JSON: {{ "continue": true/false, "reasoning": "..." }}
  """

  // Step 4: Call Claude with a 5-second timeout (see implementation notes)
  try:
    response = claude_api.messages.create(
      model="claude-3-5-sonnet-20241022",
      max_tokens=200,
      messages=[{ role: "user", content: prompt }],
      timeout=5.0
    )
    decision = parse_json(response.content[0].text)
  catch ClaudeApiError, JsonParseError:
    // Fallback: create new sub-session on API failure, timeout, or malformed reply
    sub_session_id = generate_uuid()
    write SubSessionMetadata(...)
    update WorldMMSegment set sub_session_id = sub_session_id where id = segment_id
    return { sub_session_id, is_new_chain: true, chain_window_count: 1 }

  // Step 5: Assign based on decision
  if decision.continue:
    // Inherit the current chain's sub_session_id
    update WorldMMSegment set sub_session_id = latest_segment.sub_session_id where id = segment_id
    update SubSessionMetadata
      set window_count = window_count + 1,
          start_time = min(start_time, transcript.start_time),
          end_time = max(end_time, transcript.start_time)
      where sub_session_id = latest_segment.sub_session_id
    return {
      sub_session_id: latest_segment.sub_session_id,
      is_new_chain: false,
      chain_window_count: current_chain.length + 1
    }
  else:
    // Create new chain
    sub_session_id = generate_uuid()
    write SubSessionMetadata(sub_session_id, name="Untitled", window_count=1,
                             start_time=transcript.start_time, end_time=transcript.start_time)
    update WorldMMSegment set sub_session_id = sub_session_id where id = segment_id
    return { sub_session_id, is_new_chain: true, chain_window_count: 1 }
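Two pieces of the assignment flow are easy to isolate and unit test: the chain-context truncation in Step 3 and parsing of Claude's JSON verdict in Step 4. The sketch below assumes the model returns a bare JSON object. A reply wrapped in prose or Markdown fences would be treated as malformed and sent down the same fallback path as an API failure. The function names are hypothetical.

```python
import json
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def select_chain_context(chain: Sequence[T], max_full: int = 20,
                         head: int = 2, tail: int = 10) -> List[T]:
    """Return the whole chain if it is short, else its first `head` and last `tail` windows."""
    if len(chain) <= max_full:
        return list(chain)
    # len(chain) > max_full >= head + tail, so the two slices never overlap.
    return list(chain[:head]) + list(chain[-tail:])


def parse_continuation(raw: str) -> Optional[bool]:
    """Extract the boolean "continue" field from the model's JSON reply.

    Returns None when the reply is not a JSON object with a boolean "continue",
    so the caller can take the same fallback as an API failure.
    """
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    value = payload.get("continue")
    return value if isinstance(value, bool) else None
```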

Flow: feedRegrouping

function getFeed(user_id, cursor=0, limit=20):
  limit = min(limit, 50)  // FeedRequest caps page size at 50

  // Query all segments for user
  segments = query WorldMMSegment
    where user_id = user_id
    order by start_time DESC, id DESC

  // Group by sub_session_id (or source_session_id if sub_session_id is null)
  grouped = {}
  for segment in segments:
    group_key = segment.sub_session_id or segment.source_session_id
    if group_key not in grouped:
      grouped[group_key] = []
    grouped[group_key].append(segment)

  // Sort groups by latest segment's start_time
  sorted_groups = sorted(grouped.items(),
    key=lambda g: max(s.start_time for s in g[1]),
    reverse=True
  )

  // Pagination
  paginated_groups = sorted_groups[cursor : cursor + limit + 1]
  has_next = len(paginated_groups) > limit
  groups_to_return = paginated_groups[:limit]

  // Build response
  memories = []
  for group_id, group_segments in groups_to_return:
    metadata = query SubSessionMetadata where sub_session_id = group_id (if exists)

    memory_item = {
      id: group_id,
      time: min(s.start_time for s in group_segments),
      type: richest_type([s.type for s in group_segments]),
      thumbnail: first_visual_thumbnail(group_segments),
      processing_status: aggregate_status(group_segments),
      title: metadata.name if metadata else "Untitled Memory"
    }
    memories.append(memory_item)

  return {
    memories,
    next_cursor: cursor + limit if has_next else null
  }
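The grouping and pagination logic of `getFeed` can be checked in isolation over rows that have already been fetched. In this minimal sketch, `Segment`, `group_and_page`, and the precedence rule in `aggregate_status` (failed, then pending, then complete) are assumptions. The `type`, `thumbnail`, and `title` fields are omitted, and a key-based tie-break is added so that groups with identical timestamps paginate deterministically.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple


@dataclass
class Segment:
    id: str
    source_session_id: str
    sub_session_id: Optional[str]  # None for segments ingested before the rollout
    start_time: datetime
    processing_status: str         # "pending" | "complete" | "failed"


def aggregate_status(statuses: List[str]) -> str:
    # Hypothetical rule: any failure wins, then any pending, else complete.
    if "failed" in statuses:
        return "failed"
    if "pending" in statuses:
        return "pending"
    return "complete"


def group_and_page(segments: List[Segment], cursor: int,
                   limit: int) -> Tuple[List[dict], Optional[int]]:
    groups: Dict[str, List[Segment]] = {}
    for seg in segments:
        # Backward compatibility: fall back to the recording session when unset.
        key = seg.sub_session_id or seg.source_session_id
        groups.setdefault(key, []).append(seg)

    # Newest group first, ordered by its latest window; key breaks ties.
    ordered = sorted(groups.items(),
                     key=lambda kv: (max(s.start_time for s in kv[1]), kv[0]),
                     reverse=True)

    page = ordered[cursor:cursor + limit + 1]  # one extra row reveals a next page
    has_next = len(page) > limit
    items = [
        {
            "id": key,
            "time": min(s.start_time for s in segs),
            "processing_status": aggregate_status([s.processing_status for s in segs]),
        }
        for key, segs in page[:limit]
    ]
    return items, (cursor + limit if has_next else None)
```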

  • Implementation notes:
  • Sub-session assignment is a synchronous call within ingest_window.py (lambda_handler), running after the GPU/enrichment steps complete.
  • Claude calls should use a 5-second timeout; if Claude is slow or unavailable, fall back to creating a new sub-session.
  • The "best-effort" out-of-order strategy means that if window 7 finishes before window 6, window 6 is assigned using whatever chain context is available at that moment, which may already include window 7. This may place some boundaries slightly earlier than ideal, which is acceptable.
  • Feed Lambda changes are backward compatible: segments without sub_session_id automatically group by source_session_id.
  • Optional: Sub-session name/summary can be populated asynchronously via a separate Lambda that runs periodically (e.g., every 30 seconds) to re-title completed sub-sessions based on their full transcript.
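The 5-second timeout with fallback could be wrapped around whatever function performs the Claude call, as sketched below using only the standard library. `decide_with_timeout` and the shared pool are hypothetical; a `None` result tells the caller to start a new sub-session. The worker thread is not cancelled on timeout, so in Lambda the abandoned call may keep running until it returns or the invocation ends; a client-side HTTP timeout on the API request itself avoids that.

```python
import concurrent.futures
from typing import Callable, Optional

# Shared pool: a per-call "with" block would wait on shutdown for a slow call,
# which would defeat the timeout.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def decide_with_timeout(classify: Callable[[], bool],
                        timeout_s: float = 5.0) -> Optional[bool]:
    """Run the continuation classifier; return None on timeout or any error."""
    future = _POOL.submit(classify)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        # Claude was too slow: caller falls back to a new sub-session.
        return None
    except Exception:
        # API error or malformed reply raised inside classify: same fallback.
        return None
```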

After all stages are approved, apply .agent/skills/reconcile-plans/SKILL.md to propagate contract updates across linked plans (ingest-window.md, memories-feed.md).