Chat Flow

Metadata

  • System type: flow

System Intent

  • What this is: The end-to-end flow for a user asking a natural-language question about their memories. Covers the React Native chat screen, the synchronous dispatcher Lambda, the async worker Lambda, GPU auto-start, and the POST /chats/messages path that retrieves saved messages.

Mermaid Diagram

flowchart TD
  ChatScreen["ChatScreen (chat.tsx)"] -->|"POST /memories/chat {question, chat_id?}"| APIGW["API Gateway REST (Cognito auth)"]
  APIGW -->|"invoke sync"| DispatcherLambda["MemoriesChatFunction — dispatcher path"]
  DispatcherLambda -->|"create chat / validate ownership"| DDBChats["DynamoDB: Chats table"]
  DispatcherLambda -->|"lambda:InvokeFunction InvocationType=Event"| WorkerLambda["MemoriesChatFunction — async worker path"]
  DispatcherLambda -->|"save user message"| DDBMessages["DynamoDB: Messages table"]
  DispatcherLambda -->|"{chat_id, status=thinking}"| APIGW
  APIGW -->|"200 {chat_id, status=thinking}"| ChatScreen
  ChatScreen -->|"isLoading=true (Thinking...)"| UI["User sees 'Thinking...'"]

  WorkerLambda -->|"ec2:DescribeInstances"| EC2["EC2: GPU worker instance"]
  WorkerLambda -->|"GET /health (exponential backoff)"| GPUWorker["GPU Worker FastAPI\nport 8000"]
  WorkerLambda -->|"ReasoningAgent.answer()"| GPUWorker
  WorkerLambda -->|"save assistant message"| DDBMessages
  WorkerLambda -->|"touch chat last_activity"| DDBChats

  ChatScreen -->|"POST /chats/messages {chat_id}"| GetMsgs["ChatMessagesFunction"]
  GetMsgs -->|"DynamoDB query"| DDBMessages
  GetMsgs -->|"{messages, next_cursor}"| ChatScreen
  ChatScreen -->|"render assistant bubble"| UI

Flows

Flow: sendMessage (frontend → dispatcher → async worker → poll)

  • Test files:
    • main/app/__tests__/chat-with-memory.test.ts
    • main/server/tests/unit/test_async_chat_dispatcher.py
    • main/server/worldmm/tests/test_chat_endpoint.py
  • Core files:
    • main/app/app/chat.tsx (ChatScreen)
    • main/app/lib/api/memory/chatWithMemory.ts
    • main/app/lib/api/chats/getChatMessages.ts
    • main/server/api/memories/chat/app.py (dispatcher + worker Lambda)
    • main/server/layers/shared/python/shared/chat/repository.py

Types

// Frontend request (POST /memories/chat)
ChatRequest {
  question: string (required)
  chat_id: string (optional — omitted for new conversations)
  verbose: boolean (optional — returns trace when true)
}

// Dispatcher response (synchronous, returned within ~1-2s)
DispatcherResponse {
  chat_id: string
  status: "thinking"
}

// Stored chat message (user message saved by the dispatcher, assistant answer saved by the async worker; read back via POST /chats/messages)
ChatMessage {
  message_id: string (ULID)
  chat_id: string
  role: "user" | "assistant"
  content: string
  created_at: string (ISO 8601)
}

GetChatMessagesRequest {
  chat_id: string (required)
  cursor: string | null (optional — DynamoDB ExclusiveStartKey)
  limit: number (optional, default 20)
}

GetChatMessagesResponse {
  messages: ChatMessage[]
  next_cursor: string | null
}

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| sendMessage.new_chat | {question}, no chat_id | {chat_id, status: "thinking"} | happy path | New chat created in Chats table with question as title |
| sendMessage.existing_chat | {question, chat_id} | {chat_id, status: "thinking"} | happy path | Ownership verified; 403 if chat belongs to another user |
| sendMessage.verbose | {question, verbose: true} | answer + trace object | happy path | Worker returns full ReasoningAgent trace rounds |
| sendMessage.gpu_starting | GPU cold-starting | fallback message saved to DynamoDB | graceful degradation | Worker returns "GPU worker is starting" message immediately if URL never resolves |
| sendMessage.gpu_unreachable | GPU terminated, no template | fallback message saved | graceful degradation | All EC2 recovery paths exhausted |
| sendMessage.missing_question | {} | 400 ValueError | error | Dispatcher raises ValueError before any DB writes |
| sendMessage.forbidden | {chat_id} owned by other user | 403 Forbidden | error | Ownership check in ChatRepository |
| sendMessage.async_invoke_fail | Lambda invoke fails | 500 re-raised | error | User message NOT saved if invoke fails (prevents orphaned messages) |

Pseudocode

// === FRONTEND (chat.tsx) ===

askQuestion(question):
  append user message bubble
  setIsLoading(true)  // shows "Thinking..." footer
  { answer, chat_id } = await chatWithMemory(question, chatId, isDebug)
  setChatId(chat_id)
  append assistant message bubble
  invalidate ["chats", "feed"] (refreshes side menu)
  setIsLoading(false)

// chatWithMemory.ts
POST /memories/chat { question, chat_id?, verbose? }
  timeout: 60_000 ms (client-side axios timeout)
  // Note: API Gateway hard limit is 29s — dispatcher returns well within that
  return { answer, chat_id, trace? }

// NOTE: the chat screen does NOT poll for the answer.
// chatWithMemory.ts awaits a single POST /memories/chat response. In production
// that response is the dispatcher's { chat_id, status: "thinking" }, which has no
// answer field, so askQuestion receives chat_id but no answer text.
// The async worker saves the answer to DynamoDB later. It is readable via
// POST /chats/messages, but chat.tsx does not call that endpoint after sending;
// it only uses it to load history when an existing chat is opened.

// === BACKEND DISPATCHER (lambda_handler → _dispatch_async) ===

lambda_handler(event):
  if event._async_worker:
    → run worker path (see below)
  else:
    → run _dispatch_async() (dispatcher path)

_dispatch_async(payload, auth_context):
  validate question != "" else raise ValueError
  validate user_id in auth_context else raise ValueError
  repo = ChatRepository(MESSAGES_TABLE_NAME, CHATS_TABLE_NAME)
  if chat_id is None:
    chat_id = repo.create_chat(user_id, title=question[:120])
  else:
    verify repo.chat_owned_by(user_id, chat_id) else raise Forbidden
  async_payload = { ...payload, _async_worker: True, _user_id: user_id, chat_id }
  lambda.invoke(FunctionName=self, InvocationType="Event", Payload=async_payload)
  repo.save_message(chat_id, "user", question)  // saved AFTER invoke succeeds
  return { chat_id, status: "thinking" }
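A minimal Python sketch of the dispatcher path above, assuming a repository object with the three methods named in the pseudocode and a boto3 Lambda client (`invoke(FunctionName=..., InvocationType="Event", Payload=...)` is the real boto3 call). The `Forbidden` class and the `dispatch_async` signature are hypothetical; inside a real handler, `function_name` could come from `context.function_name`.

```python
import json
from typing import Any, Protocol


class Forbidden(Exception):
    """Hypothetical error: the chat_id belongs to a different user."""


class ChatRepo(Protocol):
    # Hypothetical subset of ChatRepository used by the dispatcher.
    def create_chat(self, user_id: str, title: str) -> str: ...
    def chat_owned_by(self, user_id: str, chat_id: str) -> bool: ...
    def save_message(self, chat_id: str, role: str, content: str) -> None: ...


def dispatch_async(payload: dict, auth_context: dict, repo: ChatRepo,
                   lambda_client: Any, function_name: str) -> dict:
    question = payload.get("question") or ""
    if not question:
        raise ValueError("question is required")  # before any DB writes
    user_id = auth_context.get("user_id")
    if not user_id:
        raise ValueError("user_id missing from auth context")

    chat_id = payload.get("chat_id")
    if chat_id is None:
        chat_id = repo.create_chat(user_id, title=question[:120])
    elif not repo.chat_owned_by(user_id, chat_id):
        raise Forbidden(chat_id)

    async_payload = {**payload, "_async_worker": True, "_user_id": user_id, "chat_id": chat_id}
    # InvocationType="Event" queues the worker invocation and returns without waiting for it.
    lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="Event",
        Payload=json.dumps(async_payload).encode("utf-8"),
    )
    # Saved only after invoke returns, so a failed invoke leaves no orphaned user message.
    repo.save_message(chat_id, "user", question)
    return {"chat_id": chat_id, "status": "thinking"}
```

Injecting the repository and client makes the ordering rule easy to test: a failing `invoke` raises before `save_message` runs.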

// === ASYNC WORKER PATH ===

lambda_handler(event) when event._async_worker == True:
  auth = { user_id: event._user_id }
  result = implementation(payload, auth, _skip_user_message_save=True)
  return { statusCode: 200, body: result }

implementation(payload, auth, _skip_user_message_save=False):
  // 1. DB init
  configure PostgreSQL from SSM secrets
  orm.initDb()

  // 2. DynamoDB setup
  repo = ChatRepository(...)
  if not _skip_user_message_save:
    repo.save_message(chat_id, "user", question)

  // 3. GPU resolution
  gpu_instance_id = normalize(GPU_INSTANCE_ID env var)
  if not gpu_instance_id:
    → find running by tag
    → find stopped by tag and start_instances
    → launch from template with type fallback
  else:
    gpu_worker_url = _resolve_gpu_url(instance_id, port)

  // 4. GPU health polling
  if gpu_worker_url:
    is_healthy, elapsed = wait_for_gpu_health(gpu_worker_url, max_wait_ms=GPU_MAX_WAIT_MS, initial_delay_ms=2000)  // GPU_MAX_WAIT_MS = 15000
    if not is_healthy: gpu_worker_url = None

  // 5. Fallback
  if not gpu_worker_url:
    repo.save_message(chat_id, "assistant", "GPU starting up…")
    repo.touch_chat(user_id, chat_id)
    return fallback

  // 6. Answer
  result = handle_chat(user_id, question, chat_id, gpu_worker_url)
  return result
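Steps 3 to 6 of the worker reduce to one decision: answer on the GPU, or save a fallback message. The sketch below injects GPU resolution, health polling, and answering as callables (hypothetical parameters, not the real `implementation()` signature) and reuses the pseudocode's fallback text as a placeholder.

```python
from typing import Callable, Optional, Tuple

# Placeholder wording copied from the pseudocode; the production text may differ.
FALLBACK_TEXT = "GPU starting up…"


def run_worker(
    user_id: str,
    chat_id: str,
    question: str,
    repo,
    resolve_url: Callable[[], Optional[str]],
    wait_for_health: Callable[[str], Tuple[bool, float]],
    answer: Callable[[str, str], str],
) -> dict:
    gpu_url = resolve_url()  # step 3: None while the instance is stopped, starting or relaunching
    if gpu_url:
        healthy, _elapsed_ms = wait_for_health(gpu_url)  # step 4
        if not healthy:
            gpu_url = None
    # Step 5 or 6: either way exactly one assistant message is saved and the chat is touched.
    text = answer(question, gpu_url) if gpu_url else FALLBACK_TEXT
    repo.save_message(chat_id, "assistant", text)
    repo.touch_chat(user_id, chat_id)
    return {"chat_id": chat_id, "answer": text}
```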

handle_chat(user_id, question, chat_id, gpu_worker_url):
  vlm2vec = VLM2VecClient(gpu_worker_url)
  reasoning_llm = GPULLMClient(vlm2vec)
  embedder = TextEmbedder(vlm2vec)
  episodic_graphs = load_episodic_graphs(user_id)
  semantic_graph = load_semantic_graph(user_id)
  agent = ReasoningAgent(reasoning_llm, response_llm, { episodic, semantic, visual })
  answer, trace = agent.answer(question)
  repo.save_message(chat_id, "assistant", answer)
  repo.touch_chat(user_id, chat_id)
  return { answer, chat_id, trace? }

Flow: gpuAutoStart

  • Core files: main/server/api/memories/chat/app.py
    • _find_gpu_instance_by_tag()
    • _resolve_gpu_url()
    • _launch_gpu_with_type_fallback()
    • _update_ssm_gpu_instance_id()

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| gpuAutoStart.running_by_ssm | GPU_INSTANCE_ID env = valid instance ID, state=running | http://{privateIP}:{port} | happy path | Private IP preferred over public for SG-to-SG rules |
| gpuAutoStart.running_by_tag | SSM ID stale/None, running instance found by tag | URL returned, SSM updated | happy path | Tag filter: Name=encache-gpu-worker, state=running |
| gpuAutoStart.stopped_restart | Instance state=stopped | start_instances called, URL=None (worker waits) | recovery | Returns None immediately; health poll waits for boot |
| gpuAutoStart.stopped_no_capacity | start_instances → InsufficientInstanceCapacity | SSM cleared to "none", launch from template fallback | recovery | normalize_gpu_instance_id("none") → None |
| gpuAutoStart.terminated_relaunch | Instance state=terminated | EC2 run_instances with launch template + type fallback | recovery | SSM updated with new instance ID |
| gpuAutoStart.type_fallback | Primary type unavailable (InsufficientInstanceCapacity) | Try next type in GPU_FALLBACK_INSTANCE_TYPES | recovery | Default fallback order: g6e.2xlarge, g5.2xlarge, g4dn.2xlarge |
| gpuAutoStart.all_exhausted | All instance types fail | GPU URL = None, fallback message saved | error | Logs chat_gpu_all_types_exhausted |
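The stopped_no_capacity row depends on a sentinel round trip: SSM holds the string "none", and normalization turns it back into `None`. A sketch, assuming case-insensitive matching and a slightly wider sentinel set than the documented "none" (the extra values are guesses, not the real gpu_utils.py behavior). `put_parameter` with `Overwrite=True` is the standard boto3 SSM call; the parameter name comes from the Deployment notes.

```python
from typing import Any, Optional

SSM_GPU_INSTANCE_ID = "/encache/gpu/instance_id"
# "none" is the documented sentinel; "" and "null" are assumed extras for this sketch.
NULL_SENTINELS = {"", "none", "null"}


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    """Map SSM's null sentinels (case-insensitively, an assumption) to None."""
    if raw is None:
        return None
    value = raw.strip()
    return None if value.lower() in NULL_SENTINELS else value


def update_ssm_gpu_instance_id(ssm: Any, instance_id: Optional[str]) -> None:
    """Record the current GPU instance; write the "none" sentinel when there is none."""
    ssm.put_parameter(
        Name=SSM_GPU_INSTANCE_ID,
        Value=instance_id or "none",
        Type="String",
        Overwrite=True,
    )
```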

Pseudocode

// GPU resolution when instance_id is set:
_resolve_gpu_url(instance_id, port, launch_template_id, region):
  state = ec2.describe_instances([instance_id]).State.Name
  if state in (stopped, stopping):
    running = _find_running_gpu_instance_by_tag()
    if running: adopt running instance, update SSM, return URL
    if state == stopped:
      try: ec2.start_instances([instance_id])
      except InsufficientInstanceCapacity:
        _update_ssm_gpu_instance_id("none")
        if launch_template_id: _launch_gpu_with_type_fallback(...)
    return None
  if state == terminated:
    running = _find_running_gpu_instance_by_tag()
    if running: adopt running instance, update SSM, return URL
    if launch_template_id: _launch_gpu_with_type_fallback(...)
    return None
  ip = instance.PrivateIpAddress or instance.PublicIpAddress
  return f"http://{ip}:{port}"
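The `_resolve_gpu_url` pseudocode translated into a Python sketch. It assumes a boto3 EC2 client (the `describe_instances` and `start_instances` response shapes are boto3's) and injects the tag lookup, SSM adoption, and template relaunch as hypothetical callables. Capacity errors are detected through botocore's `exc.response["Error"]["Code"]`. One deliberate deviation: it returns `None` when the instance has neither a private nor a public IP, instead of building `http://None:{port}`.

```python
from typing import Any, Callable, Optional

CAPACITY_ERROR = "InsufficientInstanceCapacity"


def _error_code(exc: Exception) -> Optional[str]:
    # botocore.exceptions.ClientError carries the AWS code in exc.response["Error"]["Code"].
    return getattr(exc, "response", {}).get("Error", {}).get("Code")


def _url_for(instance: dict, port: int) -> Optional[str]:
    # Private IP first so security-group-to-security-group rules apply inside the VPC.
    ip = instance.get("PrivateIpAddress") or instance.get("PublicIpAddress")
    return f"http://{ip}:{port}" if ip else None


def resolve_gpu_url(
    ec2: Any,
    instance_id: str,
    port: int,
    find_running_by_tag: Callable[[], Optional[dict]],
    adopt: Callable[[dict], None],
    relaunch_from_template: Callable[[], Optional[str]],
) -> Optional[str]:
    resp = ec2.describe_instances(InstanceIds=[instance_id])
    instance = resp["Reservations"][0]["Instances"][0]
    state = instance["State"]["Name"]

    if state in ("stopped", "stopping", "terminated"):
        running = find_running_by_tag()
        if running:
            adopt(running)  # e.g. write running["InstanceId"] to SSM
            return _url_for(running, port)
        if state == "stopped":
            try:
                ec2.start_instances(InstanceIds=[instance_id])
            except Exception as exc:
                if _error_code(exc) != CAPACITY_ERROR:
                    raise
                relaunch_from_template()  # clears SSM to "none" and launches, per the pseudocode
        elif state == "terminated":
            relaunch_from_template()
        return None  # nothing reachable yet; a "stopping" instance is left alone
    return _url_for(instance, port)
```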

// GPU resolution when instance_id is None (stale SSM / first deploy):
  1. _find_gpu_instance_by_tag(state=running) → adopt if found
  2. _find_gpu_instance_by_tag(state=stopped) → start_instances if found
  3. _launch_gpu_with_type_fallback(launch_template_id) if still None
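The tag-based lookup can use EC2's `tag:Name` and `instance-state-name` filters. A sketch of the three-step chain; `resolve_without_instance_id`, its `launch` callable, and the status strings it returns are hypothetical, and it reads only the first page of `describe_instances` results.

```python
from typing import Any, Callable, Optional, Tuple

GPU_NAME_TAG = "encache-gpu-worker"


def find_gpu_instance_by_tag(ec2: Any, state: str) -> Optional[dict]:
    """First instance tagged Name=encache-gpu-worker in `state` (first result page only)."""
    resp = ec2.describe_instances(
        Filters=[
            {"Name": "tag:Name", "Values": [GPU_NAME_TAG]},
            {"Name": "instance-state-name", "Values": [state]},
        ]
    )
    for reservation in resp.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            return instance
    return None


def resolve_without_instance_id(ec2: Any, launch: Callable[[], Optional[str]]) -> Tuple[Optional[str], str]:
    running = find_gpu_instance_by_tag(ec2, "running")
    if running:  # 1. adopt a worker that is already up
        return running["InstanceId"], "adopted"
    stopped = find_gpu_instance_by_tag(ec2, "stopped")
    if stopped:  # 2. restart a stopped worker
        ec2.start_instances(InstanceIds=[stopped["InstanceId"]])
        return stopped["InstanceId"], "starting"
    return launch(), "launched"  # 3. launch from the template with type fallback
```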

_launch_gpu_with_type_fallback(launch_template_id):
  for type in GPU_FALLBACK_INSTANCE_TYPES:
    try:
      ami = _gpu_ami_for_type(type)  // per-type AMI override from GPU_INSTANCE_TYPE_AMIS
      resp = ec2.run_instances(LaunchTemplate, InstanceType=type, Tags=[Name=encache-gpu-worker])
      _update_ssm_gpu_instance_id(resp.InstanceId)
      return resp.InstanceId
    except InsufficientInstanceCapacity:
      continue  // try next type
    except other ClientError:
      return None  // surface in logs, abort
  return None  // all types exhausted
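A Python sketch of the type-fallback loop using boto3's `run_instances` with a `LaunchTemplate` reference (request parameters override the template's values). Two assumptions: GPU_INSTANCE_TYPE_AMIS is modeled as a dict from instance type to AMI ID, and a per-type AMI is applied as an `ImageId` override, since the pseudocode computes `ami` without showing where it is used. `on_launched` is a hypothetical hook for the SSM update.

```python
from typing import Any, Callable, Dict, Optional, Sequence

DEFAULT_FALLBACK_TYPES = ("g6e.2xlarge", "g5.2xlarge", "g4dn.2xlarge")


def launch_gpu_with_type_fallback(
    ec2: Any,
    launch_template_id: str,
    instance_types: Sequence[str] = DEFAULT_FALLBACK_TYPES,
    ami_for_type: Optional[Dict[str, str]] = None,
    on_launched: Optional[Callable[[str], None]] = None,
) -> Optional[str]:
    ami_for_type = ami_for_type or {}
    for instance_type in instance_types:
        params = {
            "LaunchTemplate": {"LaunchTemplateId": launch_template_id},
            "InstanceType": instance_type,
            "MinCount": 1,
            "MaxCount": 1,
            "TagSpecifications": [
                {"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": "encache-gpu-worker"}]}
            ],
        }
        if instance_type in ami_for_type:
            params["ImageId"] = ami_for_type[instance_type]  # assumed per-type AMI override
        try:
            resp = ec2.run_instances(**params)
        except Exception as exc:
            if not hasattr(exc, "response"):
                raise  # not an AWS ClientError
            if exc.response.get("Error", {}).get("Code") == "InsufficientInstanceCapacity":
                continue  # try the next type
            return None  # any other AWS error: abort (log it in real code)
        instance_id = resp["Instances"][0]["InstanceId"]
        if on_launched:
            on_launched(instance_id)  # e.g. _update_ssm_gpu_instance_id
        return instance_id
    return None  # all types exhausted
```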

Flow: gpuHealthPoll

  • Core files: main/server/api/memories/chat/app.py
    • wait_for_gpu_health()
    • _gpu_is_healthy()

Pseudocode

wait_for_gpu_health(gpu_url, max_wait_ms=15000, initial_delay_ms=2000, lambda_context=None):
  start = time.time()
  lambda_remaining_ms = lambda_context.get_remaining_time_in_millis() if lambda_context else infinity  // captured once, before polling
  retry_delay_ms = initial_delay_ms
  retry_count = 0
  lambda_timeout_guard_ms = 10000  // reserve 10s for response assembly

  loop:
    is_healthy = GET {gpu_url}/health (timeout=3s) == 200
    elapsed_ms = (time.time() - start) * 1000  // computed before any return
    if is_healthy: return (True, elapsed_ms)
    if elapsed_ms >= max_wait_ms: return (False, elapsed_ms)  // max-wait exhausted
    if lambda_remaining_ms - elapsed_ms <= lambda_timeout_guard_ms: return (False, elapsed_ms)  // Lambda timeout guard
    sleep(retry_delay_ms / 1000)
    retry_count += 1
    retry_delay_ms = min(retry_delay_ms * 2, 60000)  // exponential backoff, cap 60s
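A runnable version of the polling loop, with the HTTP check, sleep, and clock injected so the backoff schedule can be tested without a GPU. It swaps in `urllib.request` for whatever HTTP client the real code uses, and models the Lambda guard with a hypothetical `remaining_ms_at_start` parameter, for example `context.get_remaining_time_in_millis()` captured before the loop.

```python
import time
import urllib.request
from typing import Callable, Optional, Tuple


def gpu_is_healthy(gpu_url: str, timeout_s: float = 3.0) -> bool:
    """True only if GET {gpu_url}/health answers HTTP 200 within timeout_s."""
    try:
        with urllib.request.urlopen(f"{gpu_url}/health", timeout=timeout_s) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError (e.g. 503 while models load) and timeouts are all OSError subclasses.
        return False


def wait_for_gpu_health(
    gpu_url: str,
    max_wait_ms: int = 15000,
    initial_delay_ms: int = 2000,
    remaining_ms_at_start: Optional[int] = None,
    guard_ms: int = 10000,
    check: Callable[[str], bool] = gpu_is_healthy,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[bool, float]:
    start = clock()
    delay_ms = initial_delay_ms
    while True:
        healthy = check(gpu_url)
        elapsed_ms = (clock() - start) * 1000
        if healthy:
            return True, elapsed_ms
        if elapsed_ms >= max_wait_ms:
            return False, elapsed_ms  # max-wait budget exhausted
        if remaining_ms_at_start is not None and remaining_ms_at_start - elapsed_ms <= guard_ms:
            return False, elapsed_ms  # keep guard_ms of Lambda time in reserve
        sleep(delay_ms / 1000)
        delay_ms = min(delay_ms * 2, 60000)  # exponential backoff, capped at 60 s
```

Under this sketch's assumptions (checks that fail instantly, e.g. connection refused), the sleeps are 2, 4, 8 and 16 s. The last one starts at about 14 s, just inside the 15 s budget, so the call returns at about 30 s. Nothing waits on that in the async worker path, but it is longer than API Gateway's 29s window.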

// GPU worker /health endpoint (server.py):
GET /health:
  if _embed_model is None or _caption_model is None:
    return HTTP 503 (models still loading)
  return { status: "ready", model: "VLM2Vec-2B+Qwen2-VL-7B", device: "cuda:0" }

Flow: getMessages (history load + post-answer refresh)

  • Test files: main/app/__tests__/get-chat-messages.test.ts
  • Core files:
    • main/app/lib/api/chats/getChatMessages.ts
    • main/server/api/chats/messages/app.py

Paths

| path | input | output | path-type | notes |
|---|---|---|---|---|
| getMessages.load_history | {chat_id} on screen open | {messages[], next_cursor} | happy path | Called once on mount when opening an existing chat |
| getMessages.paginated | {chat_id, cursor} | {messages[], next_cursor} | happy path | Messages sorted descending (newest first); reversed in UI |
| getMessages.forbidden | chat_id not owned by user | 403 | error | ChatRepository.get_messages checks ownership |
| getMessages.missing_chat_id | {} | 400 ValueError | error | Validated in implementation() |

Pseudocode

// Backend: ChatMessagesFunction
implementation(payload, auth_context):
  validate user_id, chat_id
  items, last_key = repo.get_messages(user_id, chat_id, cursor, limit=20)
  return { messages: items, next_cursor: last_key }

// ChatRepository.get_messages:
  verify chat_owned_by(user_id, chat_id) else Forbidden
  DynamoDB.query(
    KeyConditionExpression: "chat_id = :cid",
    ScanIndexForward: False,   // newest first
    Limit: limit,
    ExclusiveStartKey: cursor  // if paginating
  )
  return (items, LastEvaluatedKey)
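A boto3-style sketch of the query above, without the ownership check. The doc types `cursor` as a string while `ExclusiveStartKey` is a key map, so the sketch assumes a base64-encoded JSON cursor (a hypothetical encoding). Because `message_id` is a ULID, whose text form sorts by creation time, `ScanIndexForward=False` on the `chat_id + message_id` key yields newest messages first. JSON encoding works here because both key attributes are strings; numeric attributes come back from the boto3 resource API as `Decimal` and would need conversion.

```python
import base64
import json
from typing import Any, Optional


def encode_cursor(last_key: Optional[dict]) -> Optional[str]:
    if not last_key:
        return None  # no LastEvaluatedKey: this was the final page
    return base64.urlsafe_b64encode(json.dumps(last_key, sort_keys=True).encode()).decode()


def decode_cursor(cursor: Optional[str]) -> Optional[dict]:
    if not cursor:
        return None
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))


def query_messages(table: Any, chat_id: str, cursor: Optional[str] = None, limit: int = 20) -> dict:
    kwargs = {
        "KeyConditionExpression": "chat_id = :cid",
        "ExpressionAttributeValues": {":cid": chat_id},
        "ScanIndexForward": False,  # newest first
        "Limit": limit,
    }
    start_key = decode_cursor(cursor)
    if start_key:
        kwargs["ExclusiveStartKey"] = start_key  # resume after the previous page
    resp = table.query(**kwargs)
    return {"messages": resp.get("Items", []), "next_cursor": encode_cursor(resp.get("LastEvaluatedKey"))}
```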

// Frontend: chat.tsx history load
useEffect (on mount, if params.chatId):
  { messages } = await getChatMessages({ chat_id: params.chatId })
  setMessages([...messages].reverse())  // reverse to chronological order

Flow: ReasoningAgent (multi-step retrieval loop)

  • Core files: main/server/worldmm/retrieval/agent.py

Pseudocode

ReasoningAgent.answer(question):
  rounds = [], accumulated_context = [], trace = []
  for round in range(MAX_ROUNDS=5):
    messages = _build_reasoning_messages(question, rounds)
    raw = reasoning_llm.call(model=gpu, json_object)
    action = parse_agent_action(raw)  // normalizes {decision/action} field
    if action.action == "ANSWER": break
    if action.action == "SEARCH":
      results = retrievers[action.memory_type](action.query)
      // retrievers: "episodic", "semantic", "visual"
      rounds.append({ memory_type, query, results })
      accumulated_context.extend(results)
      trace.append({ round, decision: SEARCH, memory_type, agent_query, results, result_count })
  response = _generate_response(question, accumulated_context)
  return response, trace
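The loop above as a dependency-injected Python sketch. `call_llm`, `retrievers`, and `generate_response` stand in for the GPU LLM client, the three memory retrievers, and `_generate_response`; the `parse_agent_action` here is a guess at how the decision/action field is normalized, not the code in agent.py.

```python
import json
from typing import Any, Callable, Dict, List, Tuple

MAX_ROUNDS = 5


def parse_agent_action(raw: str) -> Dict[str, Any]:
    """Guess at the normalization: accept either "action" or "decision", upper-cased."""
    data = json.loads(raw)
    decision = str(data.get("action") or data.get("decision") or "").upper()
    return {**data, "action": decision}


def answer(
    question: str,
    call_llm: Callable[[str, List[dict]], str],
    retrievers: Dict[str, Callable[[str], List[Any]]],
    generate_response: Callable[[str, List[Any]], Any],
    max_rounds: int = MAX_ROUNDS,
) -> Tuple[Any, List[dict]]:
    rounds: List[dict] = []
    accumulated_context: List[Any] = []
    trace: List[dict] = []
    for round_no in range(max_rounds):
        action = parse_agent_action(call_llm(question, rounds))
        if action["action"] == "ANSWER":
            break
        if action["action"] == "SEARCH":
            memory_type, query = action["memory_type"], action["query"]
            results = retrievers[memory_type](query)  # "episodic", "semantic" or "visual"
            rounds.append({"memory_type": memory_type, "query": query, "results": results})
            accumulated_context.extend(results)
            trace.append({
                "round": round_no, "decision": "SEARCH", "memory_type": memory_type,
                "agent_query": query, "results": results, "result_count": len(results),
            })
    # Runs after ANSWER or after max_rounds searches, whichever comes first.
    return generate_response(question, accumulated_context), trace
```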

Logs

| Source | Location |
|---|---|
| Dispatcher start/invoke | CloudWatch: /aws/lambda/server-MemoriesChatFunction-*, step chat_dispatcher_start, chat_dispatcher_async_invoke_sent |
| GPU resolve / EC2 state | step gpu_instance_state_issue, gpu_instance_adopted_by_tag, gpu_instance_starting_stopped, gpu_instance_relaunching |
| GPU health polling | step gpu_retry_healthy, gpu_retry_exhausted_max_wait, gpu_retry_lambda_timeout_approaching |
| GPU launch | step chat_gpu_instance_launched, chat_gpu_capacity_unavailable, chat_gpu_all_types_exhausted |
| SSM update | step ssm_gpu_instance_id_updated, ssm_gpu_instance_id_update_failed |
| Worker path | step chat_impl_start, chat_gpu_resolved, chat_gpu_retry_failed |
| Answer complete | log_event("chat", "agent_answer_complete", ...) |

Deployment

  • Mechanism: SAM
  • Deploy command:
      cd main/server
      sam build
      sam deploy
  • Notes:
    • Stack: server, region: us-east-1
    • MemoriesChatFunction: Python 3.12, Timeout 60s, MemorySize 2048 MB, in VPC
    • ChatMessagesFunction: Python 3.12, default 30s Timeout
    • ChatsListFunction: Python 3.12, default 30s Timeout
    • GPU_MAX_WAIT_MS=15000 is hardcoded in template.yaml (not overridable via SSM). This caps GPU health polling to keep the total Lambda duration well under the API Gateway REST API 29s hard timeout. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md.
    • GPU config resolved from SSM at deploy time: /encache/gpu/instance_id, /encache/gpu/worker_port, /encache/gpu/launch_template_id, /encache/gpu/instance_type_amis
    • DynamoDB tables: encache-chats (key: user_id + chat_id, GSI: user_id-last_activity-index), encache-chat-messages (key: chat_id + message_id)
    • MemoriesChatFunction has lambda:InvokeFunction permission scoped to itself (for async self-invocation)

Key Files

| File | Role |
|---|---|
| main/app/app/chat.tsx | Chat screen UI: message list, InputDock, "Thinking..." indicator |
| main/app/lib/api/memory/chatWithMemory.ts | POST /memories/chat API call (60s client timeout) |
| main/app/lib/api/chats/getChatMessages.ts | POST /chats/messages API call (history load) |
| main/app/lib/api/chats/fetchChats.ts | POST /chats/list API call (side-menu conversation list) |
| main/app/lib/api/chats/useChatsApi.ts | React Query hooks: useChatsFeed, useChatMessages, useChatWithMemory |
| main/server/api/memories/chat/app.py | Lambda: dispatcher + async worker + GPU resolution + health polling |
| main/server/api/chats/messages/app.py | Lambda: fetch messages for a chat (POST /chats/messages) |
| main/server/api/chats/list/app.py | Lambda: list user's conversations |
| main/server/layers/shared/python/shared/chat/repository.py | ChatRepository: DynamoDB CRUD for chats and messages |
| main/server/layers/shared/python/shared/gpu_utils.py | normalize_gpu_instance_id(): null sentinel handling |
| main/server/worldmm/retrieval/agent.py | ReasoningAgent: multi-step search/answer loop |
| main/server/worldmm/gpu_worker/server.py | FastAPI GPU worker: /health, /encode-video, /encode-text, /caption |

Architecture Notes

Async Dispatcher Pattern

The chat flow uses an async self-invocation pattern to decouple the synchronous API Gateway response from GPU warm-up time:

  1. The dispatcher Lambda runs on the synchronous HTTP request path. It creates/validates the chat, fires an async Lambda invocation (InvocationType=Event), saves the user message to DynamoDB, and returns {chat_id, status: "thinking"} — all within ~1-2 seconds, well under the API Gateway 29s hard timeout.
  2. The async worker Lambda runs independently. It performs all GPU resolution, health polling, ReasoningAgent execution, and saves the assistant answer to DynamoDB.
  3. The frontend currently does not poll for the answer after receiving {status: "thinking"}: chatWithMemory.ts awaits a single POST call and treats its response as final. This means the status: "thinking" response is the terminal response for the current UI, and the worker's saved answer only appears once messages are reloaded via POST /chats/messages.
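If the chat screen were to poll as the Mermaid diagram shows, one simple rule works because the dispatcher saves the user message before returning and POST /chats/messages returns newest first: keep fetching until the newest message is from the assistant. The sketch is in Python to match the other examples, although the real client is TypeScript; `fetch_messages`, the 2 s interval, and the 90 s timeout are hypothetical.

```python
import time
from typing import Callable, Optional


def poll_for_answer(
    fetch_messages: Callable[[str], dict],
    chat_id: str,
    interval_s: float = 2.0,
    timeout_s: float = 90.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[dict]:
    """Poll until the newest message in the chat is an assistant reply, or give up."""
    deadline = clock() + timeout_s
    while True:
        messages = fetch_messages(chat_id)["messages"]  # newest first, as the endpoint returns them
        if messages and messages[0]["role"] == "assistant":
            return messages[0]
        if clock() >= deadline:
            return None
        sleep(interval_s)
```

The worker's GPU fallback message is also an assistant message, so a cold start ends the poll with that text rather than a timeout.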

GPU Instance Lifecycle

The GPU worker auto-shuts down after 1 hour of idle via a watchdog thread in server.py (IDLE_TIMEOUT_S=3600, checked every 30s). The watchdog does NOT update SSM when shutting down, which creates a stale-ID cycle. The tag-based fallback in _resolve_gpu_url and implementation() breaks this cycle by scanning for Name=encache-gpu-worker instances when the SSM ID is stale.
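A sketch of an idle watchdog matching the described behavior (IDLE_TIMEOUT_S=3600, checked every 30s). The class name, the `touch()` hook that request handlers would call, and the injected `shutdown` callable are assumptions, not the code in server.py. Whether an OS-level shutdown stops or terminates the instance depends on EC2's instance-initiated shutdown behavior setting; `_resolve_gpu_url` handles both states.

```python
import threading
import time
from typing import Callable

IDLE_TIMEOUT_S = 3600
CHECK_INTERVAL_S = 30


class IdleWatchdog:
    def __init__(self, shutdown: Callable[[], None], idle_timeout_s: float = IDLE_TIMEOUT_S,
                 check_interval_s: float = CHECK_INTERVAL_S, clock: Callable[[], float] = time.monotonic):
        self._shutdown = shutdown
        self._idle_timeout_s = idle_timeout_s
        self._check_interval_s = check_interval_s
        self._clock = clock
        self._last_activity = clock()
        self._lock = threading.Lock()
        self._stop = threading.Event()

    def touch(self) -> None:
        """Call on every request so activity resets the idle timer."""
        with self._lock:
            self._last_activity = self._clock()

    def idle_expired(self) -> bool:
        with self._lock:
            return self._clock() - self._last_activity >= self._idle_timeout_s

    def run(self) -> None:
        # Event.wait returns False on timeout, so it doubles as an interruptible sleep.
        while not self._stop.wait(self._check_interval_s):
            if self.idle_expired():
                self._shutdown()  # note: nothing here updates SSM, matching the described behavior
                return

    def start(self) -> None:
        threading.Thread(target=self.run, daemon=True).start()

    def stop(self) -> None:
        self._stop.set()
```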

Private IP is always preferred over public IP because both the Lambda and GPU worker run in the same VPC. Using private IP ensures security-group-to-security-group rules apply correctly; public IP routes through NAT and loses SG identity.

API Gateway Timeout Constraint

API Gateway REST API has a hard 29-second maximum integration timeout. The Lambda Timeout is 60s, but any Lambda response arriving after 29s is discarded by API Gateway (the client sees a 504). GPU_MAX_WAIT_MS=15000 was set to keep the Lambda within the 29s window even accounting for exponential backoff overshoot; see docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md for the full diagnosis. With the async dispatcher pattern, health polling runs in the Event-invoked worker, which API Gateway does not wait on, so only the dispatcher's ~1-2s path is subject to the 29s limit.