Chat Flow
Metadata
- System type: flow
System Intent
- What this is: The end-to-end flow for a user asking a natural-language question about their memories. Covers the React Native chat screen, the synchronous dispatcher Lambda, the async worker Lambda, GPU auto-start, the SQS-driven GPU retry with push notifications, and the POST /chats/messages polling path.
Mermaid Diagram
flowchart TD
ChatScreen["ChatScreen (chat.tsx)"] -->|"POST /memories/chat {question, chat_id?}"| APIGW["API Gateway REST (Cognito auth)"]
APIGW -->|"invoke sync"| DispatcherLambda["MemoriesChatFunction — dispatcher path"]
DispatcherLambda -->|"create chat / validate ownership"| DDBChats["DynamoDB: Chats table"]
DispatcherLambda -->|"lambda:InvokeFunction InvocationType=Event"| WorkerLambda["MemoriesChatFunction — async worker path"]
DispatcherLambda -->|"save user message + placeholder ready=false"| DDBMessages["DynamoDB: Messages table"]
DispatcherLambda -->|"{chat_id, message_id, status=thinking}"| APIGW
APIGW -->|"200 {chat_id, message_id, status=thinking}"| ChatScreen
ChatScreen -->|"pollForMessage (every 500ms)"| GetMsgs["ChatMessagesFunction"]
GetMsgs -->|"DynamoDB query"| DDBMessages
GetMsgs -->|"{messages, next_cursor}"| ChatScreen
WorkerLambda -->|"ec2:DescribeInstances"| EC2["EC2: GPU worker instance"]
WorkerLambda -->|"GET /health (exponential backoff)"| GPUWorker["GPU Worker FastAPI\nport 8000"]
WorkerLambda -->|"GPU healthy: ReasoningAgent.answer()"| GPUWorker
WorkerLambda -->|"GPU healthy: save answer ready=true"| DDBMessages
WorkerLambda -->|"GPU timeout: mark_gpu_pending + enqueue retry_count=0"| SQS["ChatGpuRetryQueue (SQS)"]
WorkerLambda -->|"touch chat last_activity"| DDBChats
SQS -->|"trigger every 2 min"| RetryLambda["ChatGpuRetryFunction"]
RetryLambda -->|"GET /health"| GPUWorker
RetryLambda -->|"GPU healthy: save answer ready=true"| DDBMessages
RetryLambda -->|"push notification"| Expo["Expo Push API"]
RetryLambda -->|"GPU still down + retries < 15: re-enqueue"| SQS
RetryLambda -->|"max retries: save failure ready=true + push"| DDBMessages
ChatScreen -->|"ready=true: render assistant bubble"| UI["User sees answer"]
AppLayout["_layout.tsx (on auth)"] -->|"PUT /users/push-token"| PushTokenFn["UsersPushTokenFunction"]
PushTokenFn -->|"SET push_token"| Sessions["Sessions DynamoDB"]
RetryLambda -->|"GET push_token"| Sessions
Flows
Flow: sendMessage (frontend → dispatcher → async worker → poll)
- Test files:
  - main/app/__tests__/chat-with-memory.test.ts
  - main/server/tests/unit/test_async_chat_dispatcher.py
  - main/server/worldmm/tests/test_chat_endpoint.py
  - main/server/api/memories/chat/__tests__/test_worker_timeout.py
- Core files:
  - main/app/app/chat.tsx (ChatScreen)
  - main/app/lib/api/memory/chatWithMemory.ts
  - main/app/lib/api/chats/getChatMessages.ts
  - main/app/lib/api/chats/pollForMessage.ts
  - main/server/api/memories/chat/app.py (dispatcher + worker Lambda)
  - main/server/layers/shared/python/shared/chat/repository.py
Types
// Frontend request (POST /memories/chat)
ChatRequest {
question: string (required)
chat_id: string (optional — omitted for new conversations)
verbose: boolean (optional — returns trace when true)
}
// Dispatcher response (synchronous, returned within ~1-2s)
DispatcherResponse {
chat_id: string
message_id: string // placeholder message ID for polling
status: "thinking"
}
// ChatMessage — returned by ChatMessagesFunction and pollForMessage
ChatMessage {
message_id: string (ULID)
chat_id: string
role: "user" | "assistant"
content: string
ready: boolean // false while GPU is processing; true when answer is saved
gpu_pending: boolean // true when message is queued for GPU retry
created_at: string (ISO 8601)
}
GetChatMessagesRequest {
chat_id: string (required)
message_id: string (optional — filter to a single message for polling)
cursor: string | null (optional — DynamoDB ExclusiveStartKey)
limit: number (optional, default 20)
}
GetChatMessagesResponse {
messages: ChatMessage[]
next_cursor: string | null
}
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
sendMessage.new_chat | {question} no chat_id | {chat_id, message_id, status: "thinking"} | happy path | New chat created in Chats table with the question (first 120 chars) as title |
sendMessage.existing_chat | {question, chat_id} | {chat_id, message_id, status: "thinking"} | happy path | Ownership verified; 403 if chat belongs to another user |
sendMessage.verbose | {question, verbose: true} | answer + trace object | happy path | Worker returns full ReasoningAgent trace rounds |
sendMessage.gpu_timeout | GPU cold-starting, GPU_MAX_WAIT_MS exhausted | mark_gpu_pending called; SQS retry enqueued; GpuUnavailableError thrown to frontend | graceful degradation | Frontend shows "Starting up the GPU, we will notify you when we have an answer"; polling continues but message stays ready=false until retry Lambda resolves |
sendMessage.gpu_unreachable | GPU terminated, no template | fallback message saved | graceful degradation | All EC2 recovery paths exhausted; legacy sync path only (no message_id) |
sendMessage.missing_question | {} | 400 ValueError | error | Dispatcher raises ValueError before any DB writes |
sendMessage.forbidden | {chat_id} owned by other user | 403 Forbidden | error | Ownership check in ChatRepository |
sendMessage.async_invoke_fail | Lambda invoke fails | 500 re-raised | error | User message NOT saved if invoke fails (prevents orphaned messages) |
Pseudocode
// === FRONTEND (chat.tsx) ===
askQuestion(question):
append user message bubble
setIsLoading(true)
try:
{ chat_id, message_id, status, answer, trace } = await chatWithMemory(question, chatId, isDebug)
setChatId(chat_id)
if status === "thinking" and message_id:
// Async path: dispatcher returned handle; poll for ready=true
append placeholder assistant bubble (id=message_id, content="")
readyMsg = await pollForMessage({ chat_id, message_id }) // 500ms interval, 60s max
replace placeholder bubble content with readyMsg.content
invalidate ["chats", "feed"]
else if answer is defined:
// Legacy/verbose path: answer returned directly
append assistant bubble with answer (+ trace if present)
invalidate ["chats", "feed"]
catch GpuUnavailableError:
// GPU timed out and retry was enqueued backend-side
append assistant bubble: "Starting up the GPU, we will notify you when we have an answer"
// push notification will arrive when ChatGpuRetryFunction resolves
catch PollTimeoutError:
append assistant bubble: "Sorry, I couldn't process that. Please try again."
// chatWithMemory.ts
POST /memories/chat { question, chat_id?, verbose? }
timeout: 60_000 ms (client-side axios timeout)
return { chat_id, message_id, status?, answer?, trace? }
// pollForMessage.ts — polls POST /chats/messages?message_id=<id>
pollForMessage({ chat_id, message_id, intervalMs=500, maxWaitMs=60000 }):
deadline = now + maxWaitMs
loop:
{ messages } = getChatMessages({ chat_id, message_id })
msg = messages[0] // may be undefined if the placeholder is not visible yet
if msg and msg.ready === true: return msg
if now >= deadline: throw PollTimeoutError
sleep(intervalMs)
// NOTE: if the GPU retry Lambda resolves the message after PollTimeoutError,
// the push notification will alert the user and they can re-open the chat
// to see the answer loaded via the history fetch.
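The polling loop above can be sketched in Python for illustration; it is not the contents of pollForMessage.ts (which is TypeScript). fetch_messages, the PollTimeoutError class, and the injected clock and sleep functions are assumptions made so the loop can run without a network.

```python
import time
from typing import Callable, Optional


class PollTimeoutError(Exception):
    """Raised when the message is still not ready at the deadline."""


def poll_for_message(
    fetch_messages: Callable[[], list],
    interval_s: float = 0.5,
    max_wait_s: float = 60.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    # fetch_messages is a hypothetical stand-in for
    # getChatMessages({chat_id, message_id}); it returns a list of message dicts.
    deadline = clock() + max_wait_s
    while True:
        messages = fetch_messages()
        msg: Optional[dict] = messages[0] if messages else None
        # Guard against an empty result before reading .ready.
        if msg is not None and msg.get("ready") is True:
            return msg
        if clock() >= deadline:
            raise PollTimeoutError("message not ready before deadline")
        sleep(interval_s)
```

Injecting the clock and sleep keeps the loop deterministic under test; in production they default to the real monotonic clock and time.sleep.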
// === BACKEND DISPATCHER (lambda_handler → _dispatch_async) ===
lambda_handler(event):
if event._async_worker:
→ run worker path (see below)
else:
→ run _dispatch_async() (dispatcher path)
_dispatch_async(payload, auth_context):
validate question != "" else raise ValueError
validate user_id in auth_context else raise ValueError
repo = ChatRepository(MESSAGES_TABLE_NAME, CHATS_TABLE_NAME)
if chat_id is None:
chat_id = repo.create_chat(user_id, title=question[:120])
else:
verify repo.chat_owned_by(user_id, chat_id) else raise Forbidden
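message_id = save placeholder assistant message (ready=false, content="") // ULID; the handle the frontend polls
async_payload = { ...payload, _async_worker: True, _user_id: user_id, chat_id, message_id }
lambda.invoke(FunctionName=self, InvocationType="Event", Payload=async_payload)
repo.save_message(chat_id, "user", question) // saved AFTER invoke succeeds
return { chat_id, message_id, status: "thinking" }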
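A minimal Python sketch of the dispatcher's write ordering, assuming a repository object with create_chat, chat_owned_by, save_placeholder and save_message methods and a boto3-style Lambda client. save_placeholder, the Forbidden class, and the exact payload shape are hypothetical; the point is the ordering: the worker receives the message_id before it runs, and the user message is persisted only after the async invoke has been accepted.

```python
import json


class Forbidden(Exception):
    """Chat exists but belongs to another user (mapped to HTTP 403)."""


def dispatch_async(payload, auth_context, repo, lambda_client, function_name):
    question = (payload.get("question") or "").strip()
    if not question:
        raise ValueError("question is required")
    user_id = auth_context.get("user_id")
    if not user_id:
        raise ValueError("user_id missing from auth context")

    chat_id = payload.get("chat_id")
    if chat_id is None:
        chat_id = repo.create_chat(user_id, title=question[:120])
    elif not repo.chat_owned_by(user_id, chat_id):
        raise Forbidden(chat_id)

    # Placeholder assistant message (ready=False); its id is the polling handle.
    message_id = repo.save_placeholder(chat_id)

    async_payload = {**payload, "_async_worker": True, "_user_id": user_id,
                     "chat_id": chat_id, "message_id": message_id}
    # InvocationType="Event" queues the invocation and returns immediately.
    # If it raises, we propagate before the user message is written.
    lambda_client.invoke(FunctionName=function_name, InvocationType="Event",
                         Payload=json.dumps(async_payload).encode("utf-8"))

    repo.save_message(chat_id, "user", question)
    return {"chat_id": chat_id, "message_id": message_id, "status": "thinking"}
```

Passing the repository and Lambda client in as arguments, rather than constructing them inside, is what lets the invoke-failure path be tested without AWS.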
// === ASYNC WORKER PATH ===
lambda_handler(event) when event._async_worker == True:
auth = { user_id: event._user_id }
result = implementation(payload, auth, _skip_user_message_save=True)
return { statusCode: 200, body: result }
implementation(payload, auth, _skip_user_message_save=False):
// 1. DB init
configure PostgreSQL from SSM secrets
orm.initDb()
// 2. DynamoDB setup
repo = ChatRepository(...)
if not _skip_user_message_save:
repo.save_message(chat_id, "user", question)
// 3. GPU resolution
gpu_instance_id = normalize(GPU_INSTANCE_ID env var)
if not gpu_instance_id:
→ find running by tag
→ find stopped by tag and start_instances
→ launch from template with type fallback
else:
gpu_worker_url = _resolve_gpu_url(instance_id, port)
// 4. GPU health polling
if gpu_worker_url:
is_healthy, elapsed = wait_for_gpu_health(gpu_worker_url, max_wait_ms=GPU_MAX_WAIT_MS (15000), initial_delay_ms=2000)
if not is_healthy: gpu_worker_url = None
// 5. Fallback
if not gpu_worker_url:
if message_id: // async dispatcher path
repo.mark_gpu_pending(chat_id, message_id) // sets gpu_pending=True, ready=False, content=""
_enqueue_gpu_retry(chat_id, message_id, user_id, question, retry_count=0, delay_seconds=120)
return { status: "gpu_retry_enqueued", chat_id, message_id }
// chatWithMemory.ts raises GpuUnavailableError on this response
else: // legacy synchronous path (no message_id)
repo.save_message(chat_id, "assistant", "GPU starting up…")
repo.touch_chat(user_id, chat_id)
return fallback
// 6. Answer
result = handle_chat(user_id, question, chat_id, gpu_worker_url, message_id=message_id)
return result
handle_chat(user_id, question, chat_id, gpu_worker_url, message_id=None):
vlm2vec = VLM2VecClient(gpu_worker_url)
reasoning_llm = GPULLMClient(vlm2vec)
embedder = TextEmbedder(vlm2vec)
episodic_graphs = load_episodic_graphs(user_id)
semantic_graph = load_semantic_graph(user_id)
agent = ReasoningAgent(reasoning_llm, response_llm, { episodic, semantic, visual })
answer, trace = agent.answer(question)
if message_id: repo.update_message_ready(chat_id, message_id, answer) // async path: fill the placeholder, ready=true
else: repo.save_message(chat_id, "assistant", answer) // legacy synchronous path
repo.touch_chat(user_id, chat_id)
return { answer, chat_id, trace? }
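The GPU-unavailable branch (step 5, async path) can be sketched as below, assuming a boto3-style SQS client whose send_message accepts QueueUrl, MessageBody and DelaySeconds, as the real boto3 SQS client does. Note that SQS accepts per-message DelaySeconds only on standard queues; FIFO queues support a queue-level delay only. handle_gpu_unavailable and the repository fake are hypothetical; the body fields mirror GpuRetryRecord.

```python
import json


def handle_gpu_unavailable(repo, sqs_client, queue_url, chat_id, message_id,
                           user_id, question, delay_seconds=120):
    """Async-path fallback: park the placeholder and schedule a GPU retry."""
    if not 0 <= delay_seconds <= 900:
        raise ValueError("SQS DelaySeconds must be between 0 and 900")
    # Mirrors repo.mark_gpu_pending: gpu_pending=True, ready=False, content="".
    repo.mark_gpu_pending(chat_id, message_id)
    body = {
        "chat_id": chat_id,
        "message_id": message_id,
        "user_id": user_id,
        "question": question,
        "retry_count": 0,  # first attempt; compared against GPU_RETRY_MAX_ATTEMPTS
    }
    # A 120s delay gives a cold GPU instance time to boot before the first retry.
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(body),
                            DelaySeconds=delay_seconds)
    return {"status": "gpu_retry_enqueued", "chat_id": chat_id, "message_id": message_id}
```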
Flow: gpuRetry (ChatGpuRetryFunction — SQS-driven retry)
- Test files:
  - main/server/api/memories/chat/__tests__/test_gpu_retry.py
- Core files:
  - main/server/api/memories/chat/retry.py
  - main/server/layers/shared/python/shared/chat/repository.py
Types
// SQS message body (ChatGpuRetryQueue)
GpuRetryRecord {
chat_id: string
message_id: string
user_id: string
question: string
retry_count: number // 0-based; compared against GPU_RETRY_MAX_ATTEMPTS (default 15)
}
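Because the invalid_message path below says malformed records are logged and skipped, a small parser keeps that decision in one place. A sketch, assuming the SQS body is JSON carrying exactly the GpuRetryRecord fields; parse_retry_record and the logger name are hypothetical.

```python
import json
import logging
from dataclasses import dataclass
from typing import Optional

log = logging.getLogger("gpu_retry")


@dataclass(frozen=True)
class GpuRetryRecord:
    chat_id: str
    message_id: str
    user_id: str
    question: str
    retry_count: int  # 0-based


def parse_retry_record(raw_body: str) -> Optional[GpuRetryRecord]:
    """Return a record, or None (after logging) if the body is unusable."""
    try:
        body = json.loads(raw_body)
    except json.JSONDecodeError:
        log.warning("gpu_retry invalid JSON body")
        return None
    if not isinstance(body, dict):
        log.warning("gpu_retry body is not an object")
        return None
    missing = [f for f in ("chat_id", "message_id", "user_id", "question") if not body.get(f)]
    count = body.get("retry_count")
    # bool is a subclass of int in Python, so reject it explicitly.
    if missing or not isinstance(count, int) or isinstance(count, bool) or count < 0:
        log.warning("gpu_retry invalid record: missing=%s retry_count=%r", missing, count)
        return None
    return GpuRetryRecord(body["chat_id"], body["message_id"], body["user_id"],
                          body["question"], count)
```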
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
gpuRetry.gpu_healthy | SQS record, GPU healthy within GPU_RETRY_WAIT_MS | update_message_ready called; Expo push notification sent | happy path | push skipped if no push_token in Sessions table |
gpuRetry.gpu_unhealthy_reenqueue | SQS record, GPU unavailable, retry_count < 15 | re-enqueued with retry_count+1, DelaySeconds=120 | graceful degradation | SQS visibility timeout 300s prevents double-processing |
gpuRetry.max_retries_exceeded | SQS record, retry_count >= 15 | failure string saved ready=true; failure push notification sent | graceful degradation | failure message: "Sorry, the GPU took too long to start. Please try again." |
gpuRetry.no_push_token | GPU healthy, Sessions table has no push_token | answer saved normally; no push sent | happy path | non-fatal; user can open app and see answer via history |
gpuRetry.invalid_message | SQS record missing required fields | logged and skipped | error | no re-enqueue, no exception raised |
Pseudocode
retry_handler(event):
for record in event.Records:
body = parse(record.body)
if body is missing a required field: log and continue // invalid_message path: no re-enqueue, no exception
{ chat_id, message_id, user_id, question, retry_count } = body
if retry_count >= GPU_RETRY_MAX_ATTEMPTS (default 15):
repo.update_message_ready(chat_id, message_id, failure_message)
push_token = get_push_token(user_id) // reads Sessions table
if push_token: send_push(push_token, "Couldn't answer", failure_body)
continue
// Resolve running GPU by tag
running = _find_running_gpu_instance_by_tag(region)
if not running:
_re_enqueue(chat_id, message_id, user_id, question, retry_count) // sends retry_count+1 with DelaySeconds=120
continue
gpu_worker_url = http://{running.PrivateIpAddress}:{GPU_WORKER_PORT}
is_healthy, _ = wait_for_gpu_health(gpu_worker_url, max_wait_ms=GPU_RETRY_WAIT_MS (240000), initial_delay_ms=5000)
if not is_healthy:
_re_enqueue(chat_id, message_id, user_id, question, retry_count) // sends retry_count+1 with DelaySeconds=120
continue
// GPU healthy: run full reasoning pipeline
handle_chat(user_id, question, chat_id, gpu_worker_url, message_id=message_id)
// handle_chat calls update_message_ready internally (ready=True, content=answer)
push_token = get_push_token(user_id)
if push_token:
send_push(push_token, "Your answer is ready", question[:80])
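The per-record branching above reduces to a small pure decision function, which makes the outcomes easy to test in isolation. A sketch; decide_retry_action, next_retry_body and their string results are hypothetical names, with GPU_RETRY_MAX_ATTEMPTS defaulting to 15 as in the table.

```python
from typing import Optional

GPU_RETRY_MAX_ATTEMPTS = 15


def decide_retry_action(retry_count: int, gpu_found: bool, gpu_healthy: Optional[bool],
                        max_attempts: int = GPU_RETRY_MAX_ATTEMPTS) -> str:
    # Budget check comes first: no GPU lookup once attempts are exhausted.
    if retry_count >= max_attempts:
        return "save_failure_and_notify"
    # No running instance tagged Name=encache-gpu-worker: try again later.
    if not gpu_found:
        return "re_enqueue"
    # Instance is running, but /health never returned 200 within GPU_RETRY_WAIT_MS.
    if not gpu_healthy:
        return "re_enqueue"
    return "answer_and_notify"


def next_retry_body(body: dict) -> dict:
    # Re-enqueued copies carry retry_count + 1 (sent with DelaySeconds=120).
    return {**body, "retry_count": body["retry_count"] + 1}
```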
Environment variables
| Env Var | Default | Description |
|---|---|---|
GPU_RETRY_WAIT_MS | 240000 | Max GPU health poll duration per retry attempt (ms) |
GPU_RETRY_MAX_ATTEMPTS | 15 | Max retry attempts before writing the failure message (at least ~30 min total at 2-min intervals; longer when attempts spend time in GPU health polling) |
CHAT_GPU_RETRY_QUEUE_URL | — | SQS queue URL; set by SAM template |
SESSIONS_TABLE_NAME | Sessions | DynamoDB table holding push_token per user |
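
The ~30 min figure is a floor. A worked calculation, assuming every enqueue and re-enqueue uses exactly DelaySeconds=120, that the record with retry_count=15 only writes the failure message, and ignoring Lambda start-up and the worker's initial 15s health wait:

```python
DELAY_S = 120            # DelaySeconds on every enqueue / re-enqueue
GPU_RETRY_WAIT_S = 240   # GPU_RETRY_WAIT_MS / 1000
MAX_ATTEMPTS = 15        # GPU_RETRY_MAX_ATTEMPTS


def retry_window_seconds(max_attempts=MAX_ATTEMPTS, delay_s=DELAY_S, wait_s=GPU_RETRY_WAIT_S):
    deliveries = max_attempts + 1     # retry_count 0..max_attempts; the last one only fails
    polling_attempts = max_attempts   # retry_count 0..max_attempts-1 may poll /health
    shortest_s = deliveries * delay_s                       # no running GPU found each time
    longest_s = shortest_s + polling_attempts * wait_s      # GPU found but never healthy
    return shortest_s, longest_s


shortest_s, longest_s = retry_window_seconds()
print(shortest_s / 60, longest_s / 60)  # 32.0 92.0
```

Under these assumptions the failure message lands roughly 32 to 92 minutes after the first enqueue.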
Flow: pushTokenRegistration (app startup → Sessions table)
- Test files:
  - main/app/lib/__tests__/push-notifications.test.ts
- Core files:
  - main/app/app/_layout.tsx
  - main/app/lib/push-notifications.ts
  - main/app/lib/api/users/pushToken.ts
  - main/server/api/users/push_token/app.py
Types
// PUT /users/push-token request body
PushTokenRequest {
push_token: string (required, non-empty)
}
// PUT /users/push-token response
PushTokenResponse {
success: true
}
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
pushTokenRegistration.granted | physical device, permission granted | HTTP 200 {success: true}; Sessions table push_token set | happy path | token stored keyed by user_id (sessionId) |
pushTokenRegistration.denied | user denies permission | registerForPushNotifications returns null; savePushToken not called | graceful degradation | no error; push notification silently skipped in retry flow |
pushTokenRegistration.simulator | Device.isDevice = false | returns null without requesting permission | graceful degradation | skipped on simulators |
pushTokenRegistration.no_project_id | EXPO_PUBLIC_EAS_PROJECT_ID not set | returns null | graceful degradation | logged to console |
pushTokenRegistration.missing_token | push_token empty string | HTTP 400 InvalidInputError | error | validated in handle_push_token |
Pseudocode
// _layout.tsx — runs after successful auth
useEffect([user]):
if user:
token = await registerForPushNotifications()
if token: savePushToken(token) // PUT /users/push-token
// push-notifications.ts
registerForPushNotifications():
if not Device.isDevice: return null
{ status } = getPermissionsAsync()
if status != "granted": { status } = requestPermissionsAsync()
if status != "granted": return null
projectId = EXPO_PUBLIC_EAS_PROJECT_ID
if not projectId: return null
token = Notifications.getExpoPushTokenAsync({ projectId })
return token.data // "ExponentPushToken[...]"
// UsersPushTokenFunction (PUT /users/push-token)
handle_push_token(payload, auth_context):
push_token = payload.push_token.strip()
if not push_token: raise InvalidInputError
user_id = auth_context.user_id
Sessions.update_item(Key={sessionId: user_id}, SET push_token=:t, updated_at=:u)
return { success: True }
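A Python sketch of handle_push_token, assuming a boto3 DynamoDB Table resource (update_item with Key, UpdateExpression and ExpressionAttributeValues is the real resource API). The InvalidInputError class, the ISO 8601 UTC format for updated_at, and the fake table used in testing are assumptions.

```python
from datetime import datetime, timezone


class InvalidInputError(Exception):
    """Assumed to be mapped to HTTP 400 by the API layer."""


def handle_push_token(payload: dict, auth_context: dict, sessions_table) -> dict:
    push_token = (payload.get("push_token") or "").strip()
    if not push_token:
        raise InvalidInputError("push_token is required")
    user_id = auth_context["user_id"]
    # The Sessions table's partition key is sessionId; here it holds the user_id.
    sessions_table.update_item(
        Key={"sessionId": user_id},
        UpdateExpression="SET push_token = :t, updated_at = :u",
        ExpressionAttributeValues={
            ":t": push_token,
            ":u": datetime.now(timezone.utc).isoformat(),
        },
    )
    return {"success": True}
```

Using SET on an update (rather than put_item) leaves any other attributes on the session item untouched.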
Flow: gpuAutoStart
- Core files:
  - main/server/api/memories/chat/app.py: _find_gpu_instance_by_tag(), _resolve_gpu_url(), _launch_gpu_with_type_fallback(), _update_ssm_gpu_instance_id()
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
gpuAutoStart.running_by_ssm | GPU_INSTANCE_ID env = valid instance ID, state=running | http://{privateIP}:{port} | happy path | Private IP preferred over public for SG-to-SG rules |
gpuAutoStart.running_by_tag | SSM ID stale/None, running instance found by tag | URL returned, SSM updated | happy path | Tag filter: Name=encache-gpu-worker, state=running |
gpuAutoStart.stopped_restart | Instance state=stopped | start_instances called, URL=None (worker waits) | recovery | Returns None immediately; health poll waits for boot |
gpuAutoStart.stopped_no_capacity | start_instances → InsufficientInstanceCapacity | SSM cleared to "none", launch from template fallback | recovery | normalize_gpu_instance_id("none") → None |
gpuAutoStart.terminated_relaunch | Instance state=terminated | EC2 run_instances with launch template + type fallback | recovery | SSM updated with new instance ID |
gpuAutoStart.type_fallback | Primary type unavailable (InsufficientInstanceCapacity) | Try next type in GPU_FALLBACK_INSTANCE_TYPES | recovery | Default fallback order: g6e.2xlarge,g5.2xlarge,g4dn.2xlarge |
gpuAutoStart.all_exhausted | All instance types fail | GPU URL = None, fallback message saved | error | Logs chat_gpu_all_types_exhausted |
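
Two small pieces of the resolution logic can be sketched directly from the table above: treating the SSM value "none" as "no instance", and preferring the private IP when building the worker URL. The exact sentinel set accepted by the real normalize_gpu_instance_id is not documented here; this illustrative version assumes empty strings, "none" and "null" (case-insensitive) all mean no instance. Instance dicts follow the shape of EC2 DescribeInstances entries.

```python
from typing import Optional


def normalize_gpu_instance_id(value: Optional[str]) -> Optional[str]:
    # Illustrative reimplementation; the real helper in shared/gpu_utils.py may differ.
    if value is None:
        return None
    cleaned = value.strip()
    if cleaned.lower() in {"", "none", "null"}:  # assumed sentinel set
        return None
    return cleaned


def gpu_worker_url(instance: dict, port: int) -> Optional[str]:
    # Private IP first: Lambda and GPU worker share a VPC, so SG-to-SG rules apply.
    ip = instance.get("PrivateIpAddress") or instance.get("PublicIpAddress")
    return f"http://{ip}:{port}" if ip else None
```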
Pseudocode
// GPU resolution when instance_id is set:
_resolve_gpu_url(instance_id, port, launch_template_id, region):
state = ec2.describe_instances([instance_id]).State.Name
if state in (stopped, stopping):
running = _find_running_gpu_instance_by_tag()
if running: adopt running instance, update SSM, return URL
if state == stopped:
try: ec2.start_instances([instance_id])
except InsufficientInstanceCapacity:
_update_ssm_gpu_instance_id("none")
if launch_template_id: _launch_gpu_with_type_fallback(...)
return None
if state == terminated:
running = _find_running_gpu_instance_by_tag()
if running: adopt running instance, update SSM, return URL
if launch_template_id: _launch_gpu_with_type_fallback(...)
return None
ip = instance.PrivateIpAddress or instance.PublicIpAddress
return f"http://{ip}:{port}"
// GPU resolution when instance_id is None (stale SSM / first deploy):
1. _find_gpu_instance_by_tag(state=running) → adopt if found
2. _find_gpu_instance_by_tag(state=stopped) → start_instances if found
3. _launch_gpu_with_type_fallback(launch_template_id) if still None
_launch_gpu_with_type_fallback(launch_template_id):
for type in GPU_FALLBACK_INSTANCE_TYPES:
try:
ami = _gpu_ami_for_type(type) // per-type AMI override from GPU_INSTANCE_TYPE_AMIS
resp = ec2.run_instances(LaunchTemplate, InstanceType=type, ImageId=ami (only if an override exists), MinCount=1, MaxCount=1, Tags=[Name=encache-gpu-worker])
instance_id = resp.Instances[0].InstanceId
_update_ssm_gpu_instance_id(instance_id)
return instance_id
except InsufficientInstanceCapacity:
continue // try next type
except other ClientError:
return None // surface in logs, abort
return None // all types exhausted
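The type-fallback loop, sketched against a boto3-style EC2 client. run_instances does require MinCount/MaxCount and returns an Instances list; the ClientError class below is a local stand-in with the same .response["Error"]["Code"] shape as botocore.exceptions.ClientError, so the sketch runs without botocore. The update_ssm callback and the amis mapping (standing in for GPU_INSTANCE_TYPE_AMIS) are assumptions.

```python
from typing import Callable, Dict, List, Optional


class ClientError(Exception):
    """Local stand-in for botocore.exceptions.ClientError (same .response shape)."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


def launch_gpu_with_type_fallback(ec2, launch_template_id: str, instance_types: List[str],
                                  amis: Dict[str, str],
                                  update_ssm: Callable[[str], None]) -> Optional[str]:
    for instance_type in instance_types:
        kwargs = {
            "LaunchTemplate": {"LaunchTemplateId": launch_template_id},
            "InstanceType": instance_type,
            "MinCount": 1,
            "MaxCount": 1,
            "TagSpecifications": [{
                "ResourceType": "instance",
                "Tags": [{"Key": "Name", "Value": "encache-gpu-worker"}],
            }],
        }
        ami = amis.get(instance_type)
        if ami:  # per-type AMI override replaces the launch template's image
            kwargs["ImageId"] = ami
        try:
            resp = ec2.run_instances(**kwargs)
        except ClientError as exc:
            if exc.response["Error"]["Code"] == "InsufficientInstanceCapacity":
                continue  # try the next type
            return None   # any other error: abort and let the caller log it
        instance_id = resp["Instances"][0]["InstanceId"]
        update_ssm(instance_id)
        return instance_id
    return None  # all types exhausted
```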
Flow: gpuHealthPoll
- Core files:
  - main/server/api/memories/chat/app.py: wait_for_gpu_health(), _gpu_is_healthy()
Pseudocode
wait_for_gpu_health(gpu_url, max_wait_ms=15000, initial_delay_ms=2000, lambda_context=None):
start = time.time()
retry_delay_ms = initial_delay_ms
retry_count = 0
lambda_timeout_guard_ms = 10000 // reserve 10s for response assembly
loop:
is_healthy = GET {gpu_url}/health (timeout=3s) == 200
elapsed_ms = (time.time() - start) * 1000
if is_healthy: return (True, elapsed_ms)
if elapsed_ms >= max_wait_ms: return (False, elapsed_ms) // max-wait exhausted
if lambda_context and lambda_context.get_remaining_time_in_millis() <= lambda_timeout_guard_ms: return (False, elapsed_ms) // Lambda timeout guard
sleep(retry_delay_ms / 1000)
retry_delay_ms = min(retry_delay_ms * 2, 60000) // exponential backoff, cap 60s
// GPU worker /health endpoint (server.py):
GET /health:
if _embed_model is None or _caption_model is None:
return HTTP 503 (models still loading)
return { status: "ready", model: "VLM2Vec-2B+Qwen2-VL-7B", device: "cuda:0" }
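A runnable sketch of the backoff loop, with the health probe, clock, sleep and remaining-time lookup injected so the schedule can be checked without a GPU. In production the probe would be an HTTP GET to {gpu_url}/health with a 3s timeout that returns True only on 200, and remaining_ms would wrap the Lambda context's get_remaining_time_in_millis(); those bindings are assumptions.

```python
import time
from typing import Callable, Optional, Tuple


def wait_for_gpu_health(probe: Callable[[], bool],
                        max_wait_ms: int = 15_000,
                        initial_delay_ms: int = 2_000,
                        remaining_ms: Optional[Callable[[], int]] = None,
                        clock: Callable[[], float] = time.monotonic,
                        sleep: Callable[[float], None] = time.sleep,
                        guard_ms: int = 10_000,
                        cap_ms: int = 60_000) -> Tuple[bool, float]:
    start = clock()
    delay_ms = initial_delay_ms
    while True:
        healthy = probe()  # False for 503 (models still loading) or a failed request
        elapsed_ms = (clock() - start) * 1000
        if healthy:
            return True, elapsed_ms
        if elapsed_ms >= max_wait_ms:
            return False, elapsed_ms  # max-wait exhausted
        if remaining_ms is not None and remaining_ms() <= guard_ms:
            return False, elapsed_ms  # Lambda timeout guard: keep time to respond
        sleep(delay_ms / 1000)
        delay_ms = min(delay_ms * 2, cap_ms)  # exponential backoff, capped at 60s
```

The max-wait check only runs between probes, and the sleep is not clipped to the remaining budget, so the loop can run past max_wait_ms; a variant that sleeps min(delay_ms, max_wait_ms - elapsed_ms) would bound it more tightly.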
Flow: getMessages (history load + post-answer refresh)
- Test files:
  - main/app/__tests__/get-chat-messages.test.ts
- Core files:
  - main/app/lib/api/chats/getChatMessages.ts
  - main/server/api/chats/messages/app.py
Paths
| path | input | output | path-type | notes |
|---|---|---|---|---|
getMessages.load_history | {chat_id} on screen open | {messages[], next_cursor} | happy path | Called once on mount when opening an existing chat |
getMessages.paginated | {chat_id, cursor} | {messages[], next_cursor} | happy path | Messages sorted descending (newest first); reversed in UI |
getMessages.forbidden | chat_id not owned by user | 403 | error | ChatRepository.get_messages checks ownership |
getMessages.missing_chat_id | {} | 400 ValueError | error | Validated in implementation() |
Pseudocode
// Backend: ChatMessagesFunction
implementation(payload, auth_context):
validate user_id, chat_id
repo.get_messages(user_id, chat_id, cursor, limit=20)
// ChatRepository.get_messages:
verify chat_owned_by(user_id, chat_id) else Forbidden
DynamoDB.query(
KeyConditionExpression: "chat_id = :cid",
ScanIndexForward: False, // newest first
Limit: limit,
ExclusiveStartKey: cursor // if paginating
)
return (items, LastEvaluatedKey)
// Frontend: chat.tsx history load
useEffect (on mount, if params.chatId):
{ messages } = await getChatMessages({ chat_id: params.chatId })
setMessages([...messages].reverse()) // reverse to chronological order
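The API exposes cursor as a string, while DynamoDB's ExclusiveStartKey and LastEvaluatedKey are maps, so some encoding sits between them. The source does not say which; the sketch below assumes URL-safe base64 over JSON and also shows the query arguments and the newest-first to chronological reversal done in chat.tsx. encode_cursor, decode_cursor, build_query_kwargs and to_chronological are hypothetical.

```python
import base64
import json
from typing import Optional


def encode_cursor(last_evaluated_key: Optional[dict]) -> Optional[str]:
    if not last_evaluated_key:
        return None  # no more pages
    raw = json.dumps(last_evaluated_key, sort_keys=True).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def decode_cursor(cursor: Optional[str]) -> Optional[dict]:
    if not cursor:
        return None
    return json.loads(base64.urlsafe_b64decode(cursor.encode("ascii")))


def build_query_kwargs(chat_id: str, limit: int = 20, cursor: Optional[str] = None) -> dict:
    # Arguments for a boto3 Table.query call.
    kwargs = {
        "KeyConditionExpression": "chat_id = :cid",
        "ExpressionAttributeValues": {":cid": chat_id},
        "ScanIndexForward": False,  # newest first
        "Limit": limit,
    }
    start_key = decode_cursor(cursor)
    if start_key:
        kwargs["ExclusiveStartKey"] = start_key
    return kwargs


def to_chronological(messages: list) -> list:
    # Equivalent of [...messages].reverse() in chat.tsx: a reversed copy.
    return list(reversed(messages))
```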
Flow: ReasoningAgent (multi-step retrieval loop)
- Core files:
main/server/worldmm/retrieval/agent.py
Pseudocode
ReasoningAgent.answer(question):
rounds = [], accumulated_context = [], trace = []
for round in range(MAX_ROUNDS=5):
messages = _build_reasoning_messages(question, rounds)
raw = reasoning_llm.call(model=gpu, json_object)
action = parse_agent_action(raw) // normalizes {decision/action} field
if action.action == "ANSWER": break
if action.action == "SEARCH":
results = retrievers[action.memory_type](action.query)
// retrievers: "episodic", "semantic", "visual"
rounds.append({ memory_type, query, results })
accumulated_context.extend(results)
trace.append({ round, decision: SEARCH, memory_type, agent_query, results, result_count })
response = _generate_response(question, accumulated_context)
return response, trace
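A self-contained Python sketch of the SEARCH/ANSWER loop, with the LLM and retrievers replaced by plain callables. This parse_agent_action only illustrates the decision/action normalization mentioned above; the JSON field names (memory_type, query) and the stub callables are assumptions.

```python
import json
from typing import Callable, Dict, List

MAX_ROUNDS = 5


def parse_agent_action(raw: str) -> dict:
    data = json.loads(raw)
    # Accept either {"decision": ...} or {"action": ...}; normalize to "action".
    action = (data.get("action") or data.get("decision") or "").upper()
    return {**data, "action": action}


def answer(question: str,
           call_llm: Callable[[str, List[dict]], str],
           retrievers: Dict[str, Callable[[str], List[str]]],
           generate_response: Callable[[str, List[str]], str]):
    rounds: List[dict] = []
    context: List[str] = []
    trace: List[dict] = []
    for round_no in range(MAX_ROUNDS):
        action = parse_agent_action(call_llm(question, rounds))
        if action["action"] == "ANSWER":
            break
        if action["action"] == "SEARCH":
            memory_type, query = action["memory_type"], action["query"]
            results = retrievers[memory_type](query)  # "episodic" | "semantic" | "visual"
            rounds.append({"memory_type": memory_type, "query": query, "results": results})
            context.extend(results)
            trace.append({"round": round_no, "decision": "SEARCH", "memory_type": memory_type,
                          "agent_query": query, "results": results,
                          "result_count": len(results)})
    return generate_response(question, context), trace
```

Capping the loop at MAX_ROUNDS bounds GPU time even if the model never emits ANSWER; the response is then generated from whatever context was gathered.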
Logs
| Source | Location |
|---|---|
| Dispatcher start/invoke | CloudWatch: /aws/lambda/server-MemoriesChatFunction-* step chat_dispatcher_start, chat_dispatcher_async_invoke_sent |
| GPU resolve / EC2 state | step gpu_instance_state_issue, gpu_instance_adopted_by_tag, gpu_instance_starting_stopped, gpu_instance_relaunching |
| GPU health polling | step gpu_retry_healthy, gpu_retry_exhausted_max_wait, gpu_retry_lambda_timeout_approaching |
| GPU launch | step chat_gpu_instance_launched, chat_gpu_capacity_unavailable, chat_gpu_all_types_exhausted |
| SSM update | step ssm_gpu_instance_id_updated, ssm_gpu_instance_id_update_failed |
| Worker path | step chat_impl_start, chat_gpu_resolved, chat_gpu_retry_failed |
| GPU retry enqueue (worker) | step gpu_retry_enqueued |
| Answer complete | log_event("chat", "agent_answer_complete", ...) |
| GPU retry Lambda | CloudWatch: /aws/lambda/server-ChatGpuRetryFunction-* step gpu_retry_handler_invoked, gpu_retry_reenqueued, gpu_retry_not_available_reenqueued, gpu_retry_health_check_failed_reenqueued, gpu_retry_success, gpu_retry_max_attempts_exceeded |
| Push notification | step push_notification_sent, push_notification_send_failed, push_token_retrieved, push_token_not_found |
| Push token store | CloudWatch: /aws/lambda/server-UsersPushTokenFunction-* step push_token_stored, push_token_store_failed |
Deployment
- Mechanism: SAM
- Deploy command:
- Stack: server, region: us-east-1
- Notes:
  - MemoriesChatFunction: Python 3.12, Timeout 60s, MemorySize 2048 MB, in VPC
  - ChatMessagesFunction: Python 3.12, default 30s Timeout
  - ChatsListFunction: Python 3.12, default 30s Timeout
  - ChatGpuRetryFunction: Python 3.12, Timeout 300s, MemorySize 2048 MB, in VPC; triggered by ChatGpuRetryQueue SQS events, BatchSize 1
  - UsersPushTokenFunction: Python 3.12, default Timeout; PUT /users/push-token API Gateway route
  - GPU_MAX_WAIT_MS=15000 is hardcoded in template.yaml (not overridable via SSM). This caps GPU health polling to keep the total Lambda duration well under the API Gateway REST API 29s hard timeout. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md.
  - GPU_RETRY_WAIT_MS=240000 for ChatGpuRetryFunction (4 min per retry attempt; SQS visibility timeout 300s prevents double-processing)
  - GPU config resolved from SSM at deploy time: /encache/gpu/instance_id, /encache/gpu/worker_port, /encache/gpu/launch_template_id, /encache/gpu/instance_type_amis
  - DynamoDB tables: encache-chats (key: user_id + chat_id, GSI: user_id-last_activity-index), encache-chat-messages (key: chat_id + message_id)
  - Sessions DynamoDB table stores push_token keyed by user_id (sessionId); updated by UsersPushTokenFunction, read by ChatGpuRetryFunction
  - MemoriesChatFunction has lambda:InvokeFunction permission scoped to itself (for async self-invocation) and sqs:SendMessage permission on ChatGpuRetryQueue
  - ChatGpuRetryQueue: SQS FIFO queue named encache-chat-gpu-retry, VisibilityTimeout 300s, MessageRetentionPeriod 3600s, DLQ (ChatGpuRetryDLQ) with maxReceiveCount 3
Key Files
| File | Role |
|---|---|
main/app/app/chat.tsx | Chat screen UI — message list, InputDock, polling, GpuUnavailableError handling |
main/app/app/_layout.tsx | Registers Expo push token on auth and calls savePushToken |
main/app/lib/push-notifications.ts | registerForPushNotifications() — requests permission, returns Expo push token |
main/app/lib/api/users/pushToken.ts | savePushToken(token) — PUT /users/push-token API client |
main/app/lib/api/memory/chatWithMemory.ts | POST /memories/chat API call (60s client timeout) |
main/app/lib/api/chats/getChatMessages.ts | POST /chats/messages API call (history load + polling filter) |
main/app/lib/api/chats/pollForMessage.ts | Polls POST /chats/messages (via getChatMessages) for ready=true on a specific message_id |
main/app/lib/api/chats/fetchChats.ts | POST /chats/list API call (side-menu conversation list) |
main/app/lib/api/chats/useChatsApi.ts | React Query hooks: useChatsFeed, useChatMessages, useChatWithMemory |
main/server/api/memories/chat/app.py | Lambda: dispatcher + async worker + GPU resolution + health polling + _enqueue_gpu_retry |
main/server/api/memories/chat/retry.py | ChatGpuRetryFunction — SQS-driven GPU retry, answer save, Expo push notification |
main/server/api/users/push_token/app.py | UsersPushTokenFunction — PUT /users/push-token, writes Sessions table |
main/server/api/chats/messages/app.py | Lambda: returns messages for a chat (history load and polling) |
main/server/api/chats/list/app.py | Lambda: list user's conversations |
main/server/layers/shared/python/shared/chat/repository.py | ChatRepository — mark_gpu_pending, update_message_ready, and other DynamoDB CRUD |
main/server/layers/shared/python/shared/gpu_utils.py | normalize_gpu_instance_id() — null sentinel handling |
main/server/worldmm/retrieval/agent.py | ReasoningAgent — multi-step search/answer loop |
main/server/worldmm/gpu_worker/server.py | FastAPI GPU worker — /health, /encode-video, /encode-text, /caption |
Architecture Notes
Async Dispatcher Pattern
The chat flow uses an async self-invocation pattern to decouple the synchronous API Gateway response from GPU warm-up time:
- The dispatcher Lambda runs on the synchronous HTTP request path. It creates or validates the chat, saves a placeholder assistant message (ready=false), fires an async Lambda invocation (InvocationType=Event), saves the user message to DynamoDB, and returns {chat_id, message_id, status: "thinking"}, all within ~1-2 seconds, well under the API Gateway 29s hard timeout.
- The async worker Lambda runs independently. It performs all GPU resolution, health polling, and ReasoningAgent execution, and saves the assistant answer to DynamoDB with ready=true.
- The frontend polls POST /chats/messages every 500ms for the specific message_id until ready=true. pollForMessage.ts handles this loop with a 60-second timeout.
GPU Retry via SQS
When the async worker exhausts GPU_MAX_WAIT_MS (15s) without a healthy GPU response, instead of saving a failure string it:
- Calls repo.mark_gpu_pending(chat_id, message_id), which sets gpu_pending=True, keeps ready=False, and sets content="".
- Enqueues a message to ChatGpuRetryQueue with retry_count=0 and DelaySeconds=120.
- Returns a gpu_retry_enqueued status; chatWithMemory.ts raises GpuUnavailableError on this response.
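ChatGpuRetryFunction (triggered by SQS) retries about every 2 minutes, up to GPU_RETRY_MAX_ATTEMPTS=15 (at least ~30 minutes in total, longer when attempts spend time in health polling). When the GPU becomes healthy, it runs the full reasoning pipeline and writes ready=true. Because the first retry runs at least 120s after enqueue while pollForMessage gives up after 60s, the answer normally reaches the client through the history fetch when the chat is reopened. A push notification is sent via the Expo Push API so the user knows the answer is ready even if they have left the app.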
The SQS visibility timeout (300s) exceeds GPU_RETRY_WAIT_MS (240s) to prevent double-processing of a single retry attempt.
Push Token Registration
On every app startup after successful auth, _layout.tsx calls registerForPushNotifications() (which requests Expo permission) and then savePushToken(token) (which calls PUT /users/push-token). The token is stored in the Sessions DynamoDB table keyed by user_id and read by ChatGpuRetryFunction when sending notifications. If the user has not granted permission or the device is a simulator, registration is silently skipped and push notifications are omitted from the retry flow.
GPU Instance Lifecycle
The GPU worker auto-shuts down after 1 hour of idle via a watchdog thread in server.py (IDLE_TIMEOUT_S=3600, checked every 30s). The watchdog does NOT update SSM when shutting down, which creates a stale-ID cycle. The tag-based fallback in _resolve_gpu_url and implementation() breaks this cycle by scanning for Name=encache-gpu-worker instances when the SSM ID is stale.
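The idle watchdog described above can be sketched as a small class whose check runs on a 30s timer thread. IdleWatchdog, its method names, which requests count as activity, and the shutdown callback are hypothetical; the source states only IDLE_TIMEOUT_S=3600 and the 30s check interval. As described, nothing here touches SSM, which is exactly what leaves the stale ID behind.

```python
import threading
import time
from typing import Callable

IDLE_TIMEOUT_S = 3600
CHECK_INTERVAL_S = 30


class IdleWatchdog:
    def __init__(self, shutdown: Callable[[], None], idle_timeout_s: float = IDLE_TIMEOUT_S,
                 clock: Callable[[], float] = time.monotonic):
        self._shutdown = shutdown
        self._idle_timeout_s = idle_timeout_s
        self._clock = clock
        self._lock = threading.Lock()
        self._last_activity = clock()
        self._fired = False

    def touch(self) -> None:
        """Record activity (assumed: called on each encode/caption request)."""
        with self._lock:
            self._last_activity = self._clock()

    def check(self) -> bool:
        """Run once per CHECK_INTERVAL_S; fires shutdown at most once."""
        with self._lock:
            idle_s = self._clock() - self._last_activity
            should_fire = not self._fired and idle_s >= self._idle_timeout_s
            if should_fire:
                self._fired = True
        if should_fire:
            self._shutdown()  # e.g. power off the instance; SSM is not updated
        return should_fire

    def run_forever(self, stop: threading.Event) -> None:
        # Intended to run in a daemon thread; Event.wait doubles as the 30s timer.
        while not stop.wait(CHECK_INTERVAL_S):
            self.check()
```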
Private IP is always preferred over public IP because both the Lambda and GPU worker run in the same VPC. Using private IP ensures security-group-to-security-group rules apply correctly; public IP routes through NAT and loses SG identity.
API Gateway Timeout Constraint
API Gateway REST API has a hard 29-second maximum integration timeout. The Lambda Timeout is 60s, but any Lambda response arriving after 29s is discarded by API Gateway (client sees 504). GPU_MAX_WAIT_MS=15000 is set to keep the dispatcher Lambda within the 29s window even accounting for exponential backoff overshoot. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md for full diagnosis.
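How far the backoff in wait_for_gpu_health can overshoot GPU_MAX_WAIT_MS depends on probe latency. Assuming each /health probe returns at once (for example a fast 503 while models are still loading), the Lambda timeout guard does not fire, and the 60s backoff cap is not reached, probe k starts at time t_k, with d_0 = initial_delay_ms:

```latex
\begin{aligned}
t_k &= \sum_{j=0}^{k-1} d_0\, 2^{j} = d_0\,(2^{k} - 1), \qquad d_0 = 2\ \text{s},\\
(t_0, t_1, t_2, t_3, t_4) &= (0,\ 2,\ 6,\ 14,\ 30)\ \text{s},\\
t_3 = 14\ \text{s} < 15\ \text{s} &\;\Rightarrow\; \text{loop exits after the probe at } t_4 = 30\ \text{s}.
\end{aligned}
```

So a fast-failing health check keeps the loop alive for about 30 s, roughly twice GPU_MAX_WAIT_MS, while probes that each hit the 3 s timeout finish at 15 s. If this loop ran on a synchronous API Gateway path, the fast-failure case would exceed the 29 s limit; on the async worker path it only consumes worker Lambda time.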