Chat Flow

Metadata

  • System type: flow

System Intent

  • What this is: The end-to-end flow for a user asking a natural-language question about their memories. Covers the React Native chat screen, the synchronous dispatcher Lambda, the async worker Lambda, GPU auto-start, and the GET /chats/messages polling path.

Mermaid Diagram

flowchart TD
  ChatScreen["ChatScreen (chat.tsx)"] -->|"POST /memories/chat {question, chat_id?}"| APIGW["API Gateway REST (Cognito auth)"]
  APIGW -->|"invoke sync"| DispatcherLambda["MemoriesChatFunction — dispatcher path"]
  DispatcherLambda -->|"create chat / validate ownership"| DDBChats["DynamoDB: Chats table"]
  DispatcherLambda -->|"lambda:InvokeFunction InvocationType=Event"| WorkerLambda["MemoriesChatFunction — async worker path"]
  DispatcherLambda -->|"save user message + placeholder ready=false"| DDBMessages["DynamoDB: Messages table"]
  DispatcherLambda -->|"{chat_id, message_id, status=thinking}"| APIGW
  APIGW -->|"200 {chat_id, message_id, status=thinking}"| ChatScreen
  ChatScreen -->|"poll pollForMessage every 500ms"| GetMsgs["ChatMessagesFunction"]
  GetMsgs -->|"DynamoDB query"| DDBMessages
  GetMsgs -->|"{messages, next_cursor}"| ChatScreen

  WorkerLambda -->|"ec2:DescribeInstances"| EC2["EC2: GPU worker instance"]
  WorkerLambda -->|"GET /health (exponential backoff)"| GPUWorker["GPU Worker FastAPI\nport 8000"]
  WorkerLambda -->|"GPU healthy: ReasoningAgent.answer()"| GPUWorker
  WorkerLambda -->|"GPU healthy: save answer ready=true"| DDBMessages
  WorkerLambda -->|"GPU timeout: mark_gpu_pending + enqueue retry_count=0"| SQS["ChatGpuRetryQueue (SQS)"]
  WorkerLambda -->|"touch chat last_activity"| DDBChats

  SQS -->|"trigger every 2 min"| RetryLambda["ChatGpuRetryFunction"]
  RetryLambda -->|"GET /health"| GPUWorker
  RetryLambda -->|"GPU healthy: save answer ready=true"| DDBMessages
  RetryLambda -->|"push notification"| Expo["Expo Push API"]
  RetryLambda -->|"GPU still down + retries < 15: re-enqueue"| SQS
  RetryLambda -->|"max retries: save failure ready=true + push"| DDBMessages

  ChatScreen -->|"ready=true: render assistant bubble"| UI["User sees answer"]

  AppLayout["_layout.tsx (on auth)"] -->|"PUT /users/push-token"| PushTokenFn["UsersPushTokenFunction"]
  PushTokenFn -->|"SET push_token"| Sessions["Sessions DynamoDB"]
  RetryLambda -->|"GET push_token"| Sessions

Flows

Flow: sendMessage (frontend → dispatcher → async worker → poll)

  • Test files:
  • main/app/__tests__/chat-with-memory.test.ts
  • main/server/tests/unit/test_async_chat_dispatcher.py
  • main/server/worldmm/tests/test_chat_endpoint.py
  • main/server/api/memories/chat/__tests__/test_worker_timeout.py
  • Core files:
  • main/app/app/chat.tsx (ChatScreen)
  • main/app/lib/api/memory/chatWithMemory.ts
  • main/app/lib/api/chats/getChatMessages.ts
  • main/app/lib/api/chats/pollForMessage.ts
  • main/server/api/memories/chat/app.py (dispatcher + worker Lambda)
  • main/server/layers/shared/python/shared/chat/repository.py

Types

// Frontend request (POST /memories/chat)
ChatRequest {
  question: string (required)
  chat_id: string (optional — omitted for new conversations)
  verbose: boolean (optional — returns trace when true)
}

// Dispatcher response (synchronous, returned within ~1-2s)
DispatcherResponse {
  chat_id: string
  message_id: string   // placeholder message ID for polling
  status: "thinking"
}

// ChatMessage — returned by ChatMessagesFunction and pollForMessage
ChatMessage {
  message_id: string (ULID)
  chat_id: string
  role: "user" | "assistant"
  content: string
  ready: boolean        // false while GPU is processing; true when answer is saved
  gpu_pending: boolean  // true when message is queued for GPU retry
  created_at: string (ISO 8601)
}

GetChatMessagesRequest {
  chat_id: string (required)
  message_id: string (optional — filter to a single message for polling)
  cursor: string | null (optional — DynamoDB ExclusiveStartKey)
  limit: number (optional, default 20)
}

GetChatMessagesResponse {
  messages: ChatMessage[]
  next_cursor: string | null
}

Paths

path | input | output | path-type | notes
sendMessage.new_chat | {question}, no chat_id | {chat_id, message_id, status: "thinking"} | happy path | New chat created in Chats table with the first 120 characters of the question as title
sendMessage.existing_chat | {question, chat_id} | {chat_id, message_id, status: "thinking"} | happy path | Ownership verified; 403 if chat belongs to another user
sendMessage.verbose | {question, verbose: true} | answer + trace object | happy path | Worker returns full ReasoningAgent trace rounds
sendMessage.gpu_timeout | GPU cold-starting, GPU_MAX_WAIT_MS exhausted | mark_gpu_pending called; SQS retry enqueued; GpuUnavailableError thrown to frontend | graceful degradation | Frontend shows "Starting up the GPU, we will notify you when we have an answer"; polling continues but message stays ready=false until retry Lambda resolves
sendMessage.gpu_unreachable | GPU terminated, no template | fallback message saved | graceful degradation | All EC2 recovery paths exhausted; legacy sync path only (no message_id)
sendMessage.missing_question | {} | 400 ValueError | error | Dispatcher raises ValueError before any DB writes
sendMessage.forbidden | {chat_id} owned by other user | 403 Forbidden | error | Ownership check in ChatRepository
sendMessage.async_invoke_fail | Lambda invoke fails | 500 re-raised | error | User message NOT saved if invoke fails (prevents orphaned messages)

Pseudocode

// === FRONTEND (chat.tsx) ===

askQuestion(question):
  append user message bubble
  setIsLoading(true)
  try:
    { chat_id, message_id, status, answer, trace } = await chatWithMemory(question, chatId, isDebug)
    setChatId(chat_id)

    if status === "thinking" and message_id:
      // Async path: dispatcher returned handle; poll for ready=true
      append placeholder assistant bubble (id=message_id, content="")
      readyMsg = await pollForMessage({ chat_id, message_id })  // 500ms interval, 60s max
      replace placeholder bubble content with readyMsg.content
      invalidate ["chats", "feed"]

    else if answer is defined:
      // Legacy/verbose path: answer returned directly
      append assistant bubble with answer (+ trace if present)
      invalidate ["chats", "feed"]

  catch GpuUnavailableError:
    // GPU timed out and retry was enqueued backend-side
    append assistant bubble: "Starting up the GPU, we will notify you when we have an answer"
    // push notification will arrive when ChatGpuRetryFunction resolves
  catch PollTimeoutError:
    append assistant bubble: "Sorry, I couldn't process that. Please try again."

// chatWithMemory.ts
POST /memories/chat { question, chat_id?, verbose? }
  timeout: 60_000 ms (client-side axios timeout)
  return { chat_id, message_id?, status?, answer?, trace? }  // message_id is absent on the legacy sync path

// pollForMessage.ts — polls POST /chats/messages?message_id=<id>
pollForMessage({ chat_id, message_id, intervalMs=500, maxWaitMs=60000 }):
  deadline = now + maxWaitMs
  loop:
    { messages } = getChatMessages({ chat_id, message_id })
    msg = messages[0]  // may be undefined if the message is not visible yet
    if msg and msg.ready === true: return msg
    if now >= deadline: throw PollTimeoutError
    sleep(intervalMs)
  // NOTE: if the GPU retry Lambda resolves the message after PollTimeoutError,
  // the push notification will alert the user and they can re-open the chat
  // to see the answer loaded via the history fetch.

// === BACKEND DISPATCHER (lambda_handler → _dispatch_async) ===

lambda_handler(event):
  if event._async_worker:
    → run worker path (see below)
  else:
    → run _dispatch_async() (dispatcher path)

_dispatch_async(payload, auth_context):
  validate question != "" else raise ValueError
  validate user_id in auth_context else raise ValueError
  repo = ChatRepository(MESSAGES_TABLE_NAME, CHATS_TABLE_NAME)
  if chat_id is None:
    chat_id = repo.create_chat(user_id, title=question[:120])
  else:
    verify repo.chat_owned_by(user_id, chat_id) else raise Forbidden
  message_id = save placeholder assistant message (ready=false)  // the message the frontend polls
  async_payload = { ...payload, _async_worker: True, _user_id: user_id, chat_id, message_id }
  lambda.invoke(FunctionName=self, InvocationType="Event", Payload=async_payload)
  repo.save_message(chat_id, "user", question)  // saved AFTER invoke succeeds
  return { chat_id, message_id, status: "thinking" }
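
The ordering in the dispatcher (validate, create or verify the chat, save the placeholder, invoke the worker, then save the user message) can be sketched in Python as follows. This is a minimal sketch, not the code in app.py: `repo` and `invoke_worker` are injected stand-ins for ChatRepository and the Lambda client, and `save_placeholder` is a hypothetical method name.

```python
from typing import Any, Callable, Optional


class Forbidden(Exception):
    """Raised when the chat belongs to another user."""


def dispatch_async(
    payload: dict[str, Any],
    user_id: str,
    repo: Any,  # assumption: exposes the ChatRepository methods used below
    invoke_worker: Callable[[dict[str, Any]], None],  # stand-in for lambda.invoke(InvocationType="Event")
) -> dict[str, str]:
    question = (payload.get("question") or "").strip()
    if not question:
        raise ValueError("question is required")
    if not user_id:
        raise ValueError("user_id is required")

    chat_id: Optional[str] = payload.get("chat_id")
    if chat_id is None:
        chat_id = repo.create_chat(user_id, title=question[:120])
    elif not repo.chat_owned_by(user_id, chat_id):
        raise Forbidden(chat_id)

    # Placeholder assistant message (ready=False) that the client polls on.
    message_id = repo.save_placeholder(chat_id)  # hypothetical method name

    invoke_worker({
        **payload,
        "_async_worker": True,
        "_user_id": user_id,
        "chat_id": chat_id,
        "message_id": message_id,
    })

    # Saved only after the invoke succeeded, so a failed invoke
    # leaves no user message behind.
    repo.save_message(chat_id, "user", question)
    return {"chat_id": chat_id, "message_id": message_id, "status": "thinking"}
```

If `invoke_worker` raises, the exception propagates before `save_message` runs, which is the behaviour the sendMessage.async_invoke_fail path describes.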

// === ASYNC WORKER PATH ===

lambda_handler(event) when event._async_worker == True:
  auth = { user_id: event._user_id }
  result = implementation(payload, auth, _skip_user_message_save=True)
  return { statusCode: 200, body: result }

implementation(payload, auth, _skip_user_message_save=False):
  // 1. DB init
  configure PostgreSQL from SSM secrets
  orm.initDb()

  // 2. DynamoDB setup
  repo = ChatRepository(...)
  if not _skip_user_message_save:
    repo.save_message(chat_id, "user", question)

  // 3. GPU resolution
  gpu_instance_id = normalize(GPU_INSTANCE_ID env var)
  if not gpu_instance_id:
    → find running by tag
    → find stopped by tag and start_instances
    → launch from template with type fallback
  else:
    gpu_worker_url = _resolve_gpu_url(instance_id, port)

  // 4. GPU health polling
  if gpu_worker_url:
    is_healthy, elapsed = wait_for_gpu_health(url, GPU_MAX_WAIT_MS=15000, initial=2000)
    if not is_healthy: gpu_worker_url = None

  // 5. Fallback
  if not gpu_worker_url:
    if message_id:  // async dispatcher path
      repo.mark_gpu_pending(chat_id, message_id)  // sets gpu_pending=True, ready=False, content=""
      _enqueue_gpu_retry(chat_id, message_id, user_id, question, retry_count=0, delay_seconds=120)
      return { status: "gpu_retry_enqueued", chat_id, message_id }
      // chatWithMemory.ts raises GpuUnavailableError on this response
    else:  // legacy synchronous path (no message_id)
      repo.save_message(chat_id, "assistant", "GPU starting up…")
      repo.touch_chat(user_id, chat_id)
      return fallback

  // 6. Answer
  result = handle_chat(user_id, question, chat_id, gpu_worker_url, message_id=message_id)
  return result

handle_chat(user_id, question, chat_id, gpu_worker_url, message_id=None):
  vlm2vec = VLM2VecClient(gpu_worker_url)
  reasoning_llm = GPULLMClient(vlm2vec)
  embedder = TextEmbedder(vlm2vec)
  episodic_graphs = load_episodic_graphs(user_id)
  semantic_graph = load_semantic_graph(user_id)
  agent = ReasoningAgent(reasoning_llm, response_llm, { episodic, semantic, visual })
  answer, trace = agent.answer(question)
  if message_id:  // async path: fill in the placeholder
    repo.update_message_ready(chat_id, message_id, answer)  // ready=True, content=answer
  else:  // legacy synchronous path
    repo.save_message(chat_id, "assistant", answer)
  repo.touch_chat(user_id, chat_id)
  return { answer, chat_id, trace? }

Flow: gpuRetry (ChatGpuRetryFunction — SQS-driven retry)

  • Test files: main/server/api/memories/chat/__tests__/test_gpu_retry.py
  • Core files:
  • main/server/api/memories/chat/retry.py
  • main/server/layers/shared/python/shared/chat/repository.py

Types

// SQS message body (ChatGpuRetryQueue)
GpuRetryRecord {
  chat_id: string
  message_id: string
  user_id: string
  question: string
  retry_count: number   // 0-based; compared against GPU_RETRY_MAX_ATTEMPTS (default 15)
}

Paths

path | input | output | path-type | notes
gpuRetry.gpu_healthy | SQS record, GPU healthy within GPU_RETRY_WAIT_MS | update_message_ready called; Expo push notification sent | happy path | push skipped if no push_token in Sessions table
gpuRetry.gpu_unhealthy_reenqueue | SQS record, GPU unavailable, retry_count < 15 | re-enqueued with retry_count+1, DelaySeconds=120 | graceful degradation | SQS visibility timeout 300s prevents double-processing
gpuRetry.max_retries_exceeded | SQS record, retry_count >= 15 | failure string saved ready=true; failure push notification sent | graceful degradation | failure message: "Sorry, the GPU took too long to start. Please try again."
gpuRetry.no_push_token | GPU healthy, Sessions table has no push_token | answer saved normally; no push sent | happy path | non-fatal; user can open app and see answer via history
gpuRetry.invalid_message | SQS record missing required fields | logged and skipped | error | no re-enqueue, no exception raised

Pseudocode

retry_handler(event):
  for record in event.Records:
    body = parse(record.body)
    { chat_id, message_id, user_id, question, retry_count } = body

    if retry_count >= GPU_RETRY_MAX_ATTEMPTS (default 15):
      repo.update_message_ready(chat_id, message_id, failure_message)
      push_token = get_push_token(user_id)  // reads Sessions table
      if push_token: send_push(push_token, "Couldn't answer", failure_body)
      continue  // done with this record (BatchSize is 1, so it is normally the only one)

    // Resolve running GPU by tag
    running = _find_running_gpu_instance_by_tag(region)
    if not running:
      _re_enqueue(chat_id, message_id, user_id, question, retry_count)  // re-sent with retry_count+1, DelaySeconds=120
      continue

    gpu_worker_url = http://{running.PrivateIpAddress}:{GPU_WORKER_PORT}
    is_healthy, _ = wait_for_gpu_health(gpu_worker_url, GPU_RETRY_WAIT_MS=240000, initial_delay_ms=5000)

    if not is_healthy:
      _re_enqueue(chat_id, message_id, user_id, question, retry_count)  // re-sent with retry_count+1, DelaySeconds=120
      continue

    // GPU healthy: run full reasoning pipeline
    handle_chat(user_id, question, chat_id, gpu_worker_url, message_id=message_id)
    // handle_chat calls update_message_ready internally (ready=True, content=answer)

    push_token = get_push_token(user_id)
    if push_token:
      send_push(push_token, "Your answer is ready", question[:80])
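
The branching in retry_handler reduces to a three-way decision per SQS record. A minimal sketch of that decision as a pure function follows; `RetryAction`, `decide_retry`, and `next_retry_body` are illustrative names that do not appear in retry.py.

```python
from enum import Enum
from typing import Any


class RetryAction(Enum):
    FAIL = "fail"            # write the failure message, send failure push
    REENQUEUE = "reenqueue"  # send the record back with retry_count + 1
    ANSWER = "answer"        # run the reasoning pipeline and notify


def decide_retry(
    retry_count: int,
    gpu_running: bool,
    gpu_healthy: bool,
    max_attempts: int = 15,  # GPU_RETRY_MAX_ATTEMPTS default
) -> RetryAction:
    # The attempt budget is checked first, before any EC2 or health call.
    if retry_count >= max_attempts:
        return RetryAction.FAIL
    if not gpu_running or not gpu_healthy:
        return RetryAction.REENQUEUE
    return RetryAction.ANSWER


def next_retry_body(body: dict[str, Any]) -> dict[str, Any]:
    """Body for the re-enqueued message: same fields, retry_count incremented."""
    return {**body, "retry_count": body["retry_count"] + 1}
```

Keeping the decision separate from the side effects (DynamoDB writes, SQS sends, push calls) makes the three paths in the table above easy to unit-test without AWS clients.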

Environment variables

Env Var | Default | Description
GPU_RETRY_WAIT_MS | 240000 | Max GPU health poll duration per retry attempt (ms)
GPU_RETRY_MAX_ATTEMPTS | 15 | Max retry attempts before writing failure message (~30 min total at 2-min intervals)
CHAT_GPU_RETRY_QUEUE_URL | | SQS queue URL; set by SAM template
SESSIONS_TABLE_NAME | | Sessions DynamoDB table holding push_token per user

Flow: pushTokenRegistration (app startup → Sessions table)

  • Test files: main/app/lib/__tests__/push-notifications.test.ts
  • Core files:
  • main/app/app/_layout.tsx
  • main/app/lib/push-notifications.ts
  • main/app/lib/api/users/pushToken.ts
  • main/server/api/users/push_token/app.py

Types

// PUT /users/push-token request body
PushTokenRequest {
  push_token: string (required, non-empty)
}

// PUT /users/push-token response
PushTokenResponse {
  success: true
}

Paths

path | input | output | path-type | notes
pushTokenRegistration.granted | physical device, permission granted | HTTP 200 {success: true}; Sessions table push_token set | happy path | token stored keyed by user_id (sessionId)
pushTokenRegistration.denied | user denies permission | registerForPushNotifications returns null; savePushToken not called | graceful degradation | no error; push notification silently skipped in retry flow
pushTokenRegistration.simulator | Device.isDevice = false | returns null without requesting permission | graceful degradation | skipped on simulators
pushTokenRegistration.no_project_id | EXPO_PUBLIC_EAS_PROJECT_ID not set | returns null | graceful degradation | logged to console
pushTokenRegistration.missing_token | push_token empty string | HTTP 400 InvalidInputError | error | validated in handle_push_token

Pseudocode

// _layout.tsx — runs after successful auth
useEffect([user]):
  if user:
    token = await registerForPushNotifications()
    if token: savePushToken(token)  // PUT /users/push-token

// push-notifications.ts
registerForPushNotifications():
  if not Device.isDevice: return null
  { status } = getPermissionsAsync()
  if status != "granted": { status } = requestPermissionsAsync()
  if status != "granted": return null
  projectId = EXPO_PUBLIC_EAS_PROJECT_ID
  if not projectId: return null
  token = Notifications.getExpoPushTokenAsync({ projectId })
  return token.data  // "ExponentPushToken[...]"

// UsersPushTokenFunction (PUT /users/push-token)
handle_push_token(payload, auth_context):
  push_token = payload.push_token.strip()
  if not push_token: raise InvalidInputError
  user_id = auth_context.user_id
  Sessions.update_item(Key={sessionId: user_id}, SET push_token=:t, updated_at=:u)
  return { success: True }
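
A minimal Python sketch of the handler above, assuming `sessions_table` is an object with a boto3-Table-style `update_item` method (injected here so the example runs without AWS). The `InvalidInputError` class is defined locally for illustration.

```python
from datetime import datetime, timezone
from typing import Any


class InvalidInputError(ValueError):
    """Maps to HTTP 400 in the API layer."""


def handle_push_token(
    payload: dict[str, Any],
    auth_context: dict[str, Any],
    sessions_table: Any,  # assumption: boto3 Table-like object
) -> dict[str, bool]:
    # Treat a missing key the same as an empty string.
    push_token = str(payload.get("push_token") or "").strip()
    if not push_token:
        raise InvalidInputError("push_token is required")

    user_id = auth_context["user_id"]
    sessions_table.update_item(
        Key={"sessionId": user_id},
        UpdateExpression="SET push_token = :t, updated_at = :u",
        ExpressionAttributeValues={
            ":t": push_token,
            ":u": datetime.now(timezone.utc).isoformat(),
        },
    )
    return {"success": True}
```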

Flow: gpuAutoStart

  • Core files: main/server/api/memories/chat/app.py
  • _find_gpu_instance_by_tag()
  • _resolve_gpu_url()
  • _launch_gpu_with_type_fallback()
  • _update_ssm_gpu_instance_id()

Paths

path | input | output | path-type | notes
gpuAutoStart.running_by_ssm | GPU_INSTANCE_ID env = valid instance ID, state=running | http://{privateIP}:{port} | happy path | Private IP preferred over public for SG-to-SG rules
gpuAutoStart.running_by_tag | SSM ID stale/None, running instance found by tag | URL returned, SSM updated | happy path | Tag filter: Name=encache-gpu-worker, state=running
gpuAutoStart.stopped_restart | Instance state=stopped | start_instances called, URL=None (worker waits) | recovery | Returns None immediately; health poll waits for boot
gpuAutoStart.stopped_no_capacity | start_instances → InsufficientInstanceCapacity | SSM cleared to "none", launch from template fallback | recovery | normalize_gpu_instance_id("none") → None
gpuAutoStart.terminated_relaunch | Instance state=terminated | EC2 run_instances with launch template + type fallback | recovery | SSM updated with new instance ID
gpuAutoStart.type_fallback | Primary type unavailable (InsufficientInstanceCapacity) | Try next type in GPU_FALLBACK_INSTANCE_TYPES | recovery | Default fallback order: g6e.2xlarge,g5.2xlarge,g4dn.2xlarge
gpuAutoStart.all_exhausted | All instance types fail | GPU URL = None, fallback message saved | error | Logs chat_gpu_all_types_exhausted

Pseudocode

// GPU resolution when instance_id is set:
_resolve_gpu_url(instance_id, port, launch_template_id, region):
  state = ec2.describe_instances([instance_id]).State.Name
  if state in (stopped, stopping):
    running = _find_running_gpu_instance_by_tag()
    if running: adopt running instance, update SSM, return URL
    if state == stopped:
      try: ec2.start_instances([instance_id])
      except InsufficientInstanceCapacity:
        _update_ssm_gpu_instance_id("none")
        if launch_template_id: _launch_gpu_with_type_fallback(...)
    return None
  if state == terminated:
    running = _find_running_gpu_instance_by_tag()
    if running: adopt running instance, update SSM, return URL
    if launch_template_id: _launch_gpu_with_type_fallback(...)
    return None
  ip = instance.PrivateIpAddress or instance.PublicIpAddress
  return f"http://{ip}:{port}"

// GPU resolution when instance_id is None (stale SSM / first deploy):
  1. _find_gpu_instance_by_tag(state=running) → adopt if found
  2. _find_gpu_instance_by_tag(state=stopped) → start_instances if found
  3. _launch_gpu_with_type_fallback(launch_template_id) if still None

_launch_gpu_with_type_fallback(launch_template_id):
  for type in GPU_FALLBACK_INSTANCE_TYPES:
    try:
      ami = _gpu_ami_for_type(type)  // per-type AMI override from GPU_INSTANCE_TYPE_AMIS
      resp = ec2.run_instances(LaunchTemplate, InstanceType=type, ImageId=ami (if set), TagSpecifications=[Name=encache-gpu-worker])
      _update_ssm_gpu_instance_id(resp.InstanceId)
      return resp.InstanceId
    except InsufficientInstanceCapacity:
      continue  // try next type
    except other ClientError:
      return None  // surface in logs, abort
  return None  // all types exhausted
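
The fallback loop can be sketched in Python with the EC2 call injected, so the control flow is visible without boto3. `InsufficientCapacity` is a local stand-in for a ClientError whose code is InsufficientInstanceCapacity, and `launch` and `on_launched` are hypothetical callables standing in for `ec2.run_instances` and `_update_ssm_gpu_instance_id`.

```python
from typing import Callable, Iterable, Optional


class InsufficientCapacity(Exception):
    """Stand-in for ClientError(code=InsufficientInstanceCapacity)."""


def launch_with_type_fallback(
    instance_types: Iterable[str],
    launch: Callable[[str], str],  # returns the new instance ID
    on_launched: Callable[[str], None] = lambda _instance_id: None,
) -> Optional[str]:
    for instance_type in instance_types:
        try:
            instance_id = launch(instance_type)
        except InsufficientCapacity:
            continue  # capacity problem: try the next type
        except Exception:
            return None  # any other error aborts the whole attempt
        on_launched(instance_id)  # e.g. record the new ID in SSM
        return instance_id
    return None  # all types exhausted
```

Only capacity errors advance to the next type; other failures (permissions, bad template) would fail identically for every type, so the loop stops early.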

Flow: gpuHealthPoll

  • Core files: main/server/api/memories/chat/app.py
  • wait_for_gpu_health()
  • _gpu_is_healthy()

Pseudocode

wait_for_gpu_health(gpu_url, max_wait_ms=15000, initial_delay_ms=2000, lambda_context):
  start = time.time()
  retry_delay_ms = initial_delay_ms
  retry_count = 0
  lambda_timeout_guard_ms = 10000  // reserve 10s for response assembly

  loop:
    is_healthy = GET {gpu_url}/health (timeout=3s) == 200
    elapsed_ms = (time.time() - start) * 1000  // computed before any return that reports it
    if is_healthy: return (True, elapsed_ms)
    if elapsed_ms >= max_wait_ms: return (False, elapsed_ms)  // max-wait exhausted
    if lambda_remaining_ms - elapsed_ms <= lambda_timeout_guard_ms: return (False, elapsed_ms)  // Lambda timeout guard
    sleep(retry_delay_ms / 1000)
    retry_delay_ms = min(retry_delay_ms * 2, 60000)  // exponential backoff, cap 60s

// GPU worker /health endpoint (server.py):
GET /health:
  if _embed_model is None or _caption_model is None:
    return HTTP 503 (models still loading)
  return { status: "ready", model: "VLM2Vec-2B+Qwen2-VL-7B", device: "cuda:0" }
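
A runnable Python sketch of the polling loop follows. It is a variant written for illustration, not the function in app.py: the health check, clock, and sleep are injected so the loop can be tested deterministically, and `remaining_ms` is assumed to return the live remaining Lambda time (for example `context.get_remaining_time_in_millis`).

```python
import time
from typing import Callable, Optional, Tuple


def wait_for_gpu_health(
    is_healthy: Callable[[], bool],
    max_wait_ms: int = 15_000,
    initial_delay_ms: int = 2_000,
    max_delay_ms: int = 60_000,
    remaining_ms: Optional[Callable[[], int]] = None,
    guard_ms: int = 10_000,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[bool, float]:
    """Poll until healthy, the wait budget is spent, or Lambda is near timeout."""
    start = clock()
    delay_ms = initial_delay_ms
    while True:
        healthy = is_healthy()
        elapsed_ms = (clock() - start) * 1000
        if healthy:
            return True, elapsed_ms
        if elapsed_ms >= max_wait_ms:
            return False, elapsed_ms  # max-wait exhausted
        if remaining_ms is not None and remaining_ms() <= guard_ms:
            return False, elapsed_ms  # leave time to build a response
        sleep(delay_ms / 1000)
        delay_ms = min(delay_ms * 2, max_delay_ms)  # exponential backoff
```

Because the budget is only checked between sleeps, the returned elapsed time can exceed `max_wait_ms` by up to one backoff interval.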

Flow: getMessages (history load + post-answer refresh)

  • Test files: main/app/__tests__/get-chat-messages.test.ts
  • Core files:
  • main/app/lib/api/chats/getChatMessages.ts
  • main/server/api/chats/messages/app.py

Paths

path | input | output | path-type | notes
getMessages.load_history | {chat_id} on screen open | {messages[], next_cursor} | happy path | Called once on mount when opening an existing chat
getMessages.paginated | {chat_id, cursor} | {messages[], next_cursor} | happy path | Messages sorted descending (newest first); reversed in UI
getMessages.forbidden | chat_id not owned by user | 403 | error | ChatRepository.get_messages checks ownership
getMessages.missing_chat_id | {} | 400 ValueError | error | Validated in implementation()

Pseudocode

// Backend: ChatMessagesFunction
implementation(payload, auth_context):
  validate user_id, chat_id
  repo.get_messages(user_id, chat_id, cursor, limit=20)

// ChatRepository.get_messages:
  verify chat_owned_by(user_id, chat_id) else Forbidden
  DynamoDB.query(
    KeyConditionExpression: "chat_id = :cid",
    ScanIndexForward: False,   // newest first
    Limit: limit,
    ExclusiveStartKey: cursor  // if paginating
  )
  return (items, LastEvaluatedKey)

// Frontend: chat.tsx history load
useEffect (on mount, if params.chatId):
  { messages } = await getChatMessages({ chat_id: params.chatId })
  setMessages([...messages].reverse())  // reverse to chronological order
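
The newest-first query plus client-side reversal can be illustrated with an in-memory stand-in for the Messages table. This sketch simplifies the cursor to a plain message_id string (the real cursor is a DynamoDB ExclusiveStartKey), and relies on ULIDs sorting lexicographically by creation time; `query_messages` and `load_all_chronological` are illustrative names.

```python
from typing import Any, Optional

Message = dict[str, Any]


def query_messages(
    items: list[Message],
    chat_id: str,
    limit: int = 20,
    cursor: Optional[str] = None,
) -> tuple[list[Message], Optional[str]]:
    """Return one page, newest first, and a cursor for the next page."""
    rows = sorted(
        (m for m in items if m["chat_id"] == chat_id),
        key=lambda m: m["message_id"],
        reverse=True,  # mirrors ScanIndexForward=False
    )
    if cursor is not None:
        rows = [m for m in rows if m["message_id"] < cursor]
    page = rows[:limit]
    next_cursor = page[-1]["message_id"] if len(rows) > limit else None
    return page, next_cursor


def load_all_chronological(
    items: list[Message], chat_id: str, limit: int = 20
) -> list[Message]:
    """Follow cursors to the end, then reverse into oldest-first order."""
    collected: list[Message] = []
    cursor: Optional[str] = None
    while True:
        page, cursor = query_messages(items, chat_id, limit, cursor)
        collected.extend(page)
        if cursor is None:
            break
    return list(reversed(collected))
```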

Flow: ReasoningAgent (multi-step retrieval loop)

  • Core files: main/server/worldmm/retrieval/agent.py

Pseudocode

ReasoningAgent.answer(question):
  rounds = [], accumulated_context = [], trace = []
  for round in range(MAX_ROUNDS=5):
    messages = _build_reasoning_messages(question, rounds)
    raw = reasoning_llm.call(model=gpu, json_object)
    action = parse_agent_action(raw)  // normalizes {decision/action} field
    if action.action == "ANSWER": break
    if action.action == "SEARCH":
      results = retrievers[action.memory_type](action.query)
      // retrievers: "episodic", "semantic", "visual"
      rounds.append({ memory_type, query, results })
      accumulated_context.extend(results)
      trace.append({ round, decision: SEARCH, memory_type, agent_query, results, result_count })
  response = _generate_response(question, accumulated_context)
  return response, trace
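
The loop above can be exercised with stubbed components. In this sketch the reasoning LLM, the retrievers, and the response generator are plain callables supplied by the caller; their signatures are assumptions made for the example and may differ from agent.py.

```python
import json
from typing import Any, Callable

MAX_ROUNDS = 5


def parse_agent_action(raw: str) -> dict[str, Any]:
    """Accept either an 'action' or a 'decision' field and upper-case it."""
    data = json.loads(raw)
    action = str(data.get("action") or data.get("decision") or "").upper()
    return {**data, "action": action}


def answer(
    question: str,
    reasoning_llm: Callable[[str, list], str],           # returns a JSON string
    retrievers: dict[str, Callable[[str], list]],        # keyed by memory type
    generate_response: Callable[[str, list], str],
    max_rounds: int = MAX_ROUNDS,
) -> tuple[str, list[dict[str, Any]]]:
    rounds: list[dict[str, Any]] = []
    context: list[Any] = []
    trace: list[dict[str, Any]] = []
    for round_no in range(max_rounds):
        action = parse_agent_action(reasoning_llm(question, rounds))
        if action["action"] == "ANSWER":
            break
        if action["action"] == "SEARCH":
            memory_type, query = action["memory_type"], action["query"]
            results = retrievers[memory_type](query)
            rounds.append({"memory_type": memory_type, "query": query, "results": results})
            context.extend(results)
            trace.append({
                "round": round_no,
                "decision": "SEARCH",
                "memory_type": memory_type,
                "agent_query": query,
                "results": results,
                "result_count": len(results),
            })
    # The response is generated whether the loop ended by ANSWER or by
    # running out of rounds.
    return generate_response(question, context), trace
```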

Logs

Source | Location
Dispatcher start/invoke | CloudWatch: /aws/lambda/server-MemoriesChatFunction-* step chat_dispatcher_start, chat_dispatcher_async_invoke_sent
GPU resolve / EC2 state | step gpu_instance_state_issue, gpu_instance_adopted_by_tag, gpu_instance_starting_stopped, gpu_instance_relaunching
GPU health polling | step gpu_retry_healthy, gpu_retry_exhausted_max_wait, gpu_retry_lambda_timeout_approaching
GPU launch | step chat_gpu_instance_launched, chat_gpu_capacity_unavailable, chat_gpu_all_types_exhausted
SSM update | step ssm_gpu_instance_id_updated, ssm_gpu_instance_id_update_failed
Worker path | step chat_impl_start, chat_gpu_resolved, chat_gpu_retry_failed
GPU retry enqueue (worker) | step gpu_retry_enqueued
Answer complete | log_event("chat", "agent_answer_complete", ...)
GPU retry Lambda | CloudWatch: /aws/lambda/server-ChatGpuRetryFunction-* step gpu_retry_handler_invoked, gpu_retry_reenqueued, gpu_retry_not_available_reenqueued, gpu_retry_health_check_failed_reenqueued, gpu_retry_success, gpu_retry_max_attempts_exceeded
Push notification | step push_notification_sent, push_notification_send_failed, push_token_retrieved, push_token_not_found
Push token store | CloudWatch: /aws/lambda/server-UsersPushTokenFunction-* step push_token_stored, push_token_store_failed

Deployment

  • Mechanism: SAM
  • Deploy command:
    cd main/server
    sam build
    sam deploy
    
  • Notes:
  • Stack: server, region: us-east-1
  • MemoriesChatFunction: Python 3.12, Timeout 60s, MemorySize 2048 MB, in VPC
  • ChatMessagesFunction: Python 3.12, default 30s Timeout
  • ChatsListFunction: Python 3.12, default 30s Timeout
  • ChatGpuRetryFunction: Python 3.12, Timeout 300s, MemorySize 2048 MB, in VPC; triggered by ChatGpuRetryQueue SQS events, BatchSize 1
  • UsersPushTokenFunction: Python 3.12, default Timeout; PUT /users/push-token API Gateway route
  • GPU_MAX_WAIT_MS=15000 is hardcoded in template.yaml (not overridable via SSM). This caps GPU health polling to keep the total Lambda duration well under the API Gateway REST API 29s hard timeout. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md.
  • GPU_RETRY_WAIT_MS=240000 for ChatGpuRetryFunction (4 min per retry attempt; SQS visibility timeout 300s prevents double-processing)
  • GPU config resolved from SSM at deploy time: /encache/gpu/instance_id, /encache/gpu/worker_port, /encache/gpu/launch_template_id, /encache/gpu/instance_type_amis
  • DynamoDB tables: encache-chats (key: user_id + chat_id, GSI: user_id-last_activity-index), encache-chat-messages (key: chat_id + message_id)
  • Sessions DynamoDB table stores push_token keyed by user_id (sessionId); updated by UsersPushTokenFunction, read by ChatGpuRetryFunction
  • MemoriesChatFunction has lambda:InvokeFunction permission scoped to itself (for async self-invocation) and sqs:SendMessage permission on ChatGpuRetryQueue
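  • ChatGpuRetryQueue: SQS queue named encache-chat-gpu-retry (FIFO queue names must end in .fifo, and the per-message DelaySeconds used by the retry flow is not available on FIFO queues), VisibilityTimeout 300s, MessageRetentionPeriod 3600s, DLQ (ChatGpuRetryDLQ) with maxReceiveCount 3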

Key Files

File | Role
main/app/app/chat.tsx | Chat screen UI: message list, InputDock, polling, GpuUnavailableError handling
main/app/app/_layout.tsx | Registers Expo push token on auth and calls savePushToken
main/app/lib/push-notifications.ts | registerForPushNotifications(): requests permission, returns Expo push token
main/app/lib/api/users/pushToken.ts | savePushToken(token): PUT /users/push-token API client
main/app/lib/api/memory/chatWithMemory.ts | POST /memories/chat API call (60s client timeout)
main/app/lib/api/chats/getChatMessages.ts | POST /chats/messages API call (history load + polling filter)
main/app/lib/api/chats/pollForMessage.ts | Polls GET /chats/messages for ready=true on a specific message_id
main/app/lib/api/chats/fetchChats.ts | POST /chats/list API call (side-menu conversation list)
main/app/lib/api/chats/useChatsApi.ts | React Query hooks: useChatsFeed, useChatMessages, useChatWithMemory
main/server/api/memories/chat/app.py | Lambda: dispatcher + async worker + GPU resolution + health polling + _enqueue_gpu_retry
main/server/api/memories/chat/retry.py | ChatGpuRetryFunction: SQS-driven GPU retry, answer save, Expo push notification
main/server/api/users/push_token/app.py | UsersPushTokenFunction: PUT /users/push-token, writes Sessions table
main/server/api/chats/messages/app.py | Lambda: GET messages for a chat
main/server/api/chats/list/app.py | Lambda: list user's conversations
main/server/layers/shared/python/shared/chat/repository.py | ChatRepository: mark_gpu_pending, update_message_ready, and other DynamoDB CRUD
main/server/layers/shared/python/shared/gpu_utils.py | normalize_gpu_instance_id(): null sentinel handling
main/server/worldmm/retrieval/agent.py | ReasoningAgent: multi-step search/answer loop
main/server/worldmm/gpu_worker/server.py | FastAPI GPU worker: /health, /encode-video, /encode-text, /caption

Architecture Notes

Async Dispatcher Pattern

The chat flow uses an async self-invocation pattern to decouple the synchronous API Gateway response from GPU warm-up time:

  1. The dispatcher Lambda runs on the synchronous HTTP request path. It creates/validates the chat, saves a placeholder assistant message (ready=false), fires an async Lambda invocation (InvocationType=Event), saves the user message to DynamoDB, and returns {chat_id, message_id, status: "thinking"} — all within ~1-2 seconds, well under the API Gateway 29s hard timeout.
  2. The async worker Lambda runs independently. It performs all GPU resolution, health polling, ReasoningAgent execution, and saves the assistant answer to DynamoDB with ready=true.
  3. The frontend polls POST /chats/messages every 500ms for the specific message_id until ready=true. pollForMessage.ts handles this loop with a 60-second timeout.

GPU Retry via SQS

When the async worker exhausts GPU_MAX_WAIT_MS (15s) without a healthy GPU response, instead of saving a failure string it:

  1. Calls repo.mark_gpu_pending(chat_id, message_id) — sets gpu_pending=True, keeps ready=False, sets content="".
  2. Enqueues a message to ChatGpuRetryQueue with retry_count=0 and DelaySeconds=120.
  3. Returns a gpu_retry_enqueued status — chatWithMemory.ts raises GpuUnavailableError on this response.

ChatGpuRetryFunction (triggered by SQS) retries every 2 minutes, up to GPU_RETRY_MAX_ATTEMPTS=15 (≈ 30 minutes total). When the GPU becomes healthy, it runs the full reasoning pipeline and writes ready=true. Because pollForMessage gives up after 60 seconds and the first retry is delayed by 120 seconds, the original polling loop has normally timed out by then; the user sees the answer through the history fetch when they re-open the chat. A push notification is sent via the Expo Push API so the user knows the answer is ready even if they have left the app.

The SQS visibility timeout (300s) exceeds GPU_RETRY_WAIT_MS (240s) to prevent double-processing of a single retry attempt.

Push Token Registration

On every app startup after successful auth, _layout.tsx calls registerForPushNotifications() (which requests Expo permission) and then savePushToken(token) (which calls PUT /users/push-token). The token is stored in the Sessions DynamoDB table keyed by user_id and read by ChatGpuRetryFunction when sending notifications. If the user has not granted permission or the device is a simulator, registration is silently skipped and push notifications are omitted from the retry flow.

GPU Instance Lifecycle

The GPU worker auto-shuts down after 1 hour of idle via a watchdog thread in server.py (IDLE_TIMEOUT_S=3600, checked every 30s). The watchdog does NOT update SSM when shutting down, which creates a stale-ID cycle. The tag-based fallback in _resolve_gpu_url and implementation() breaks this cycle by scanning for Name=encache-gpu-worker instances when the SSM ID is stale.

Private IP is always preferred over public IP because both the Lambda and GPU worker run in the same VPC. Using private IP ensures security-group-to-security-group rules apply correctly; public IP routes through NAT and loses SG identity.

API Gateway Timeout Constraint

API Gateway REST API has a hard 29-second maximum integration timeout. The Lambda Timeout is 60s, but any Lambda response arriving after 29s is discarded by API Gateway (client sees 504). GPU_MAX_WAIT_MS=15000 is set to keep the dispatcher Lambda within the 29s window even accounting for exponential backoff overshoot. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md for full diagnosis.
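
The backoff overshoot mentioned above can be estimated with a small simulation of the wait_for_gpu_health loop. This is a sketch under stated assumptions: every health check fails and takes exactly `check_ms`, the delays double from 2000 ms, and the Lambda timeout guard never fires (with a 60s Lambda timeout it would only trigger around the 50s mark). The function name is illustrative.

```python
def simulated_poll_elapsed_ms(
    check_ms: int,
    max_wait_ms: int = 15_000,
    initial_delay_ms: int = 2_000,
    max_delay_ms: int = 60_000,
) -> int:
    """Elapsed time when every health check fails and takes check_ms."""
    elapsed, delay = 0, initial_delay_ms
    while True:
        elapsed += check_ms  # failed GET /health
        if elapsed >= max_wait_ms:
            return elapsed  # budget is only tested right after a check
        elapsed += delay  # sleep before the next attempt
        delay = min(delay * 2, max_delay_ms)


if __name__ == "__main__":
    for check_ms in (0, 1_000, 3_000):
        print(check_ms, simulated_poll_elapsed_ms(check_ms))
```

Under these assumptions the loop returns after 15000 ms when each check uses the full 3s timeout, 18000 ms when checks take 1s, and 30000 ms when checks fail immediately, because the budget is tested only between sleeps. Clamping each sleep to the remaining budget is one way to bound the overshoot on any path that still runs behind the 29s limit.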