Chat Flow
System Intent
- What this is: The end-to-end flow for a user asking a natural-language question about their memories. Covers the React Native chat screen, the synchronous dispatcher Lambda, the async worker Lambda, GPU auto-start, and the POST /chats/messages read path that returns the saved answer (the current chat screen does not poll it).
Mermaid Diagram
flowchart TD
ChatScreen["ChatScreen (chat.tsx)"] -->|"POST /memories/chat {question, chat_id?}"| APIGW["API Gateway REST (Cognito auth)"]
APIGW -->|"invoke sync"| DispatcherLambda["MemoriesChatFunction — dispatcher path"]
DispatcherLambda -->|"create chat / validate ownership"| DDBChats["DynamoDB: Chats table"]
DispatcherLambda -->|"lambda:InvokeFunction InvocationType=Event"| WorkerLambda["MemoriesChatFunction — async worker path"]
DispatcherLambda -->|"save user message"| DDBMessages["DynamoDB: Messages table"]
DispatcherLambda -->|"{chat_id, status=thinking}"| APIGW
APIGW -->|"200 {chat_id, status=thinking}"| ChatScreen
ChatScreen -->|"isLoading=true (Thinking...)"| UI["User sees 'Thinking...'"]
WorkerLambda -->|"ec2:DescribeInstances"| EC2["EC2: GPU worker instance"]
WorkerLambda -->|"GET /health (exponential backoff)"| GPUWorker["GPU Worker FastAPI\nport 8000"]
WorkerLambda -->|"ReasoningAgent.answer()"| GPUWorker
WorkerLambda -->|"save assistant message"| DDBMessages
WorkerLambda -->|"touch chat last_activity"| DDBChats
ChatScreen -->|"POST /chats/messages {chat_id}"| GetMsgs["ChatMessagesFunction"]
GetMsgs -->|"DynamoDB query"| DDBMessages
GetMsgs -->|"{messages, next_cursor}"| ChatScreen
ChatScreen -->|"render assistant bubble"| UI
Flows
Flow: sendMessage (frontend → dispatcher → async worker → POST /chats/messages)
- Test files:
  main/app/__tests__/chat-with-memory.test.ts
  main/server/tests/unit/test_async_chat_dispatcher.py
  main/server/worldmm/tests/test_chat_endpoint.py
- Core files:
  main/app/app/chat.tsx (ChatScreen)
  main/app/lib/api/memory/chatWithMemory.ts
  main/app/lib/api/chats/getChatMessages.ts
  main/server/api/memories/chat/app.py (dispatcher + worker Lambda)
  main/server/layers/shared/python/shared/chat/repository.py
Types
// Frontend request (POST /memories/chat)
ChatRequest {
question: string (required)
chat_id: string (optional — omitted for new conversations)
verbose: boolean (optional — returns trace when true)
}
// Dispatcher response (synchronous, returned within ~1-2s)
DispatcherResponse {
chat_id: string
status: "thinking"
}
// Stored chat message (the async worker saves its answer as role "assistant"; read back via POST /chats/messages)
ChatMessage {
message_id: string (ULID)
chat_id: string
role: "user" | "assistant"
content: string
created_at: string (ISO 8601)
}
GetChatMessagesRequest {
chat_id: string (required)
cursor: string | null (optional — DynamoDB ExclusiveStartKey)
limit: number (optional, default 20)
}
GetChatMessagesResponse {
messages: ChatMessage[]
next_cursor: string | null
}
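ChatMessage.message_id is a ULID, and encache-chat-messages is keyed on chat_id + message_id, so the newest-first query in getMessages (ScanIndexForward: False) relies on ULIDs sorting by creation time. That holds because a ULID's first 10 characters are its 48-bit millisecond timestamp in Crockford base32, most significant digit first. The sketch below encodes only that prefix to show the ordering property; `ulid_time_prefix` is a hypothetical helper, not the ULID library the server actually uses.

```python
# Crockford base32 alphabet from the ULID spec (no I, L, O, U); it is in ASCII order.
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def ulid_time_prefix(ms: int) -> str:
    """Encode a 48-bit millisecond timestamp as the 10-character ULID time prefix."""
    if not 0 <= ms < 2**48:
        raise ValueError("ULID timestamps must fit in 48 bits")
    chars = []
    for _ in range(10):  # 10 chars x 5 bits = 50 bits, enough for 48
        chars.append(CROCKFORD[ms & 0x1F])
        ms >>= 5
    return "".join(reversed(chars))  # most significant character first


earlier = ulid_time_prefix(1_700_000_000_000)
later = ulid_time_prefix(1_700_000_000_001)
assert earlier < later  # string order matches time order
```

Because every character carries 5 bits and the alphabet is in ASCII order, comparing two prefixes as strings compares their timestamps; messages created in the same millisecond are ordered by the ULID's random suffix instead.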
Paths
| path | input | output | path-type | notes |
| --- | --- | --- | --- | --- |
| sendMessage.new_chat | {question}, no chat_id | {chat_id, status: "thinking"} | happy path | New chat created in Chats table with question as title |
| sendMessage.existing_chat | {question, chat_id} | {chat_id, status: "thinking"} | happy path | Ownership verified; 403 if chat belongs to another user |
| sendMessage.verbose | {question, verbose: true} | answer + trace object | happy path | Worker returns full ReasoningAgent trace rounds |
| sendMessage.gpu_starting | GPU cold-starting | fallback message saved to DynamoDB | graceful degradation | Worker returns "GPU worker is starting" message immediately if URL never resolves |
| sendMessage.gpu_unreachable | GPU terminated, no template | fallback message saved | graceful degradation | All EC2 recovery paths exhausted |
| sendMessage.missing_question | {} | 400 ValueError | error | Dispatcher raises ValueError before any DB writes |
| sendMessage.forbidden | {chat_id} owned by other user | 403 Forbidden | error | Ownership check in ChatRepository |
| sendMessage.async_invoke_fail | Lambda invoke fails | 500 re-raised | error | User message NOT saved if invoke fails (prevents orphaned messages) |
Pseudocode
// === FRONTEND (chat.tsx) ===
askQuestion(question):
  append user message bubble
  setIsLoading(true) // shows "Thinking..." footer
  { answer, chat_id } = await chatWithMemory(question, chatId, isDebug)
  setChatId(chat_id)
  append assistant message bubble
  invalidate ["chats", "feed"] (refreshes side menu)
  setIsLoading(false)
// chatWithMemory.ts
POST /memories/chat { question, chat_id?, verbose? }
timeout: 60_000 ms (client-side axios timeout)
// Note: API Gateway hard limit is 29s — dispatcher returns well within that
return { answer, chat_id, trace? }
// NOTE: the chat screen does NOT poll for the answer.
// chatWithMemory.ts awaits a single POST /memories/chat response, which in
// production is the dispatcher result { chat_id, status: "thinking" }. That
// response has no `answer` field, so askQuestion's `answer` is not populated by it.
// The async worker later saves the answer to DynamoDB, where it is readable via
// POST /chats/messages, but chat.tsx only calls that endpoint to load history on
// mount; it does not poll after sending.
// === BACKEND DISPATCHER (lambda_handler → _dispatch_async) ===
lambda_handler(event):
  if event._async_worker:
    → run worker path (see below)
  else:
    → run _dispatch_async() (dispatcher path)
_dispatch_async(payload, auth_context):
  validate question != "" else raise ValueError
  validate user_id in auth_context else raise ValueError
  repo = ChatRepository(MESSAGES_TABLE_NAME, CHATS_TABLE_NAME)
  if chat_id is None:
    chat_id = repo.create_chat(user_id, title=question[:120])
  else:
    verify repo.chat_owned_by(user_id, chat_id) else raise Forbidden
  async_payload = { ...payload, _async_worker: True, _user_id: user_id, chat_id }
  lambda.invoke(FunctionName=self, InvocationType="Event", Payload=async_payload)
  repo.save_message(chat_id, "user", question) // saved AFTER invoke succeeds
  return { chat_id, status: "thinking" }
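The dispatcher steps translate fairly directly to Python. This is a minimal sketch, assuming the ChatRepository methods named in the pseudocode (create_chat, chat_owned_by, save_message) and a boto3 Lambda client passed in as `lambda_client`; `dispatch_async`, `Forbidden` and `ChatRepo` are hypothetical names, and the real `_dispatch_async` may differ. The one boto3 call used, `invoke(FunctionName=..., InvocationType="Event", Payload=...)`, is the standard asynchronous-invoke form.

```python
import json
from typing import Any, Protocol


class Forbidden(Exception):
    """Hypothetical: raised when chat_id belongs to another user (mapped to 403)."""


class ChatRepo(Protocol):  # hypothetical subset of ChatRepository
    def create_chat(self, user_id: str, title: str) -> str: ...
    def chat_owned_by(self, user_id: str, chat_id: str) -> bool: ...
    def save_message(self, chat_id: str, role: str, content: str) -> None: ...


def dispatch_async(payload: dict[str, Any], auth_context: dict[str, Any],
                   repo: ChatRepo, lambda_client: Any, function_name: str) -> dict[str, str]:
    question = payload.get("question") or ""
    if not question:
        raise ValueError("question is required")  # before any DB writes
    user_id = auth_context.get("user_id")
    if not user_id:
        raise ValueError("user_id missing from auth context")

    chat_id = payload.get("chat_id")
    if chat_id is None:
        chat_id = repo.create_chat(user_id, title=question[:120])
    elif not repo.chat_owned_by(user_id, chat_id):
        raise Forbidden(chat_id)

    async_payload = {**payload, "_async_worker": True, "_user_id": user_id, "chat_id": chat_id}
    # InvocationType="Event" queues the worker invocation and returns without waiting for it.
    lambda_client.invoke(FunctionName=function_name, InvocationType="Event",
                         Payload=json.dumps(async_payload).encode("utf-8"))
    # Saved only after invoke succeeds, so a failed invoke leaves no orphaned user message.
    repo.save_message(chat_id, "user", question)
    return {"chat_id": chat_id, "status": "thinking"}
```

Inside the Lambda, `function_name` would be the function's own name (for example `context.function_name`), which is what the self-scoped `lambda:InvokeFunction` permission in the deployment notes allows.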
// === ASYNC WORKER PATH ===
lambda_handler(event) when event._async_worker == True:
  auth = { user_id: event._user_id }
  result = implementation(event, auth, _skip_user_message_save=True)
  return { statusCode: 200, body: result }
implementation(payload, auth, _skip_user_message_save=False):
  // 1. DB init
  configure PostgreSQL from SSM secrets
  orm.initDb()
  // 2. DynamoDB setup
  repo = ChatRepository(...)
  if not _skip_user_message_save:
    repo.save_message(chat_id, "user", question)
  // 3. GPU resolution
  gpu_instance_id = normalize(GPU_INSTANCE_ID env var)
  if not gpu_instance_id:
    → running instance found by tag: adopt it, gpu_worker_url = its URL
    → else stopped instance found by tag: start_instances, gpu_worker_url = None
    → else launch from template with type fallback, gpu_worker_url = None
  else:
    gpu_worker_url = _resolve_gpu_url(gpu_instance_id, port)
  // 4. GPU health polling
  if gpu_worker_url:
    is_healthy, elapsed = wait_for_gpu_health(gpu_worker_url, GPU_MAX_WAIT_MS=15000, initial=2000)
    if not is_healthy: gpu_worker_url = None
  // 5. Fallback
  if not gpu_worker_url:
    repo.save_message(chat_id, "assistant", "GPU starting up…")
    repo.touch_chat(user_id, chat_id)
    return fallback
  // 6. Answer
  result = handle_chat(user_id, question, chat_id, gpu_worker_url)
  return result
handle_chat(user_id, question, chat_id, gpu_worker_url):
  vlm2vec = VLM2VecClient(gpu_worker_url)
  reasoning_llm = GPULLMClient(vlm2vec)
  embedder = TextEmbedder(vlm2vec)
  episodic_graphs = load_episodic_graphs(user_id)
  semantic_graph = load_semantic_graph(user_id)
  agent = ReasoningAgent(reasoning_llm, response_llm, { episodic, semantic, visual })
  answer, trace = agent.answer(question)
  repo.save_message(chat_id, "assistant", answer)
  repo.touch_chat(user_id, chat_id)
  return { answer, chat_id, trace? }
Flow: gpuAutoStart
- Core files:
  main/server/api/memories/chat/app.py: _find_gpu_instance_by_tag(), _resolve_gpu_url(), _launch_gpu_with_type_fallback(), _update_ssm_gpu_instance_id()
Paths
| path | input | output | path-type | notes |
| --- | --- | --- | --- | --- |
| gpuAutoStart.running_by_ssm | GPU_INSTANCE_ID env = valid instance ID, state=running | http://{privateIP}:{port} | happy path | Private IP preferred over public for SG-to-SG rules |
| gpuAutoStart.running_by_tag | SSM ID stale/None, running instance found by tag | URL returned, SSM updated | happy path | Tag filter: Name=encache-gpu-worker, state=running |
| gpuAutoStart.stopped_restart | Instance state=stopped | start_instances called, URL=None | recovery | Returns None immediately, so health polling is skipped and the worker saves the fallback message for this request (see sendMessage.gpu_starting) |
| gpuAutoStart.stopped_no_capacity | start_instances → InsufficientInstanceCapacity | SSM cleared to "none", launch from template fallback | recovery | normalize_gpu_instance_id("none") → None |
| gpuAutoStart.terminated_relaunch | Instance state=terminated | EC2 run_instances with launch template + type fallback | recovery | SSM updated with new instance ID |
| gpuAutoStart.type_fallback | Primary type unavailable (InsufficientInstanceCapacity) | Try next type in GPU_FALLBACK_INSTANCE_TYPES | recovery | Default fallback order: g6e.2xlarge, g5.2xlarge, g4dn.2xlarge |
| gpuAutoStart.all_exhausted | All instance types fail | GPU URL = None, fallback message saved | error | Logs chat_gpu_all_types_exhausted |
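The stopped_no_capacity path writes the literal string "none" to SSM, and GPU_INSTANCE_ID is populated from SSM at deploy time, so the worker has to read that sentinel as "no instance". A minimal sketch of what normalize_gpu_instance_id() in gpu_utils.py might do; only "none" is documented, so treating empty strings and "null" the same way, and the trimming and case-folding, are assumptions.

```python
from typing import Optional

# Only "none" is documented; "" and "null" are assumed to mean the same thing.
_NULL_SENTINELS = {"", "none", "null"}


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    """Map SSM/env sentinel values to None; pass real instance IDs through, trimmed."""
    if raw is None:
        return None
    value = raw.strip()
    return None if value.lower() in _NULL_SENTINELS else value
```

Normalizing once at the top of the worker lets every later branch use a plain `if not gpu_instance_id` check.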
Pseudocode
// GPU resolution when instance_id is set:
_resolve_gpu_url(instance_id, port, launch_template_id, region):
  instance = ec2.describe_instances([instance_id]).Reservations[0].Instances[0]
  state = instance.State.Name
  if state in (stopped, stopping):
    running = _find_running_gpu_instance_by_tag()
    if running: adopt running instance, update SSM, return URL
    if state == stopped:
      try: ec2.start_instances([instance_id])
      except InsufficientInstanceCapacity:
        _update_ssm_gpu_instance_id("none")
        if launch_template_id: _launch_gpu_with_type_fallback(...)
    return None
  if state == terminated:
    running = _find_running_gpu_instance_by_tag()
    if running: adopt running instance, update SSM, return URL
    if launch_template_id: _launch_gpu_with_type_fallback(...)
    return None
  ip = instance.PrivateIpAddress or instance.PublicIpAddress
  return f"http://{ip}:{port}"
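The final step of _resolve_gpu_url reduces to reading a DescribeInstances response. A minimal sketch, assuming the boto3 response shape (Reservations[0].Instances[0] with State.Name, PrivateIpAddress, PublicIpAddress); `gpu_url_from_describe` is a hypothetical helper that returns the state alongside the URL so the caller can run the restart or relaunch branches.

```python
from typing import Any, Optional

# States the resolution logic handles by restart/relaunch instead of returning a URL.
_RECOVERY_STATES = {"stopped", "stopping", "terminated"}


def gpu_url_from_describe(resp: dict[str, Any], port: int) -> tuple[str, Optional[str]]:
    """Return (state, url) for the single instance in a describe_instances response."""
    instance = resp["Reservations"][0]["Instances"][0]
    state = instance["State"]["Name"]
    if state in _RECOVERY_STATES:
        return state, None  # caller runs the restart / relaunch branches
    # Private IP first: Lambda and GPU worker share a VPC, so SG-to-SG rules apply.
    ip = instance.get("PrivateIpAddress") or instance.get("PublicIpAddress")
    return state, (f"http://{ip}:{port}" if ip else None)
```

Typical use would be `gpu_url_from_describe(ec2.describe_instances(InstanceIds=[instance_id]), port)`.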
// GPU resolution when instance_id is None (stale SSM / first deploy):
1. _find_gpu_instance_by_tag(state=running) → adopt if found
2. _find_gpu_instance_by_tag(state=stopped) → start_instances if found
3. _launch_gpu_with_type_fallback(launch_template_id) if still None
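Steps 1-3 can be sketched with two real EC2 calls: `describe_instances` with the `tag:Name` and `instance-state-name` filters, and `start_instances(InstanceIds=[...])`. This is a minimal sketch under assumptions: `find_gpu_instance_by_tag`, `resolve_gpu_without_instance_id` and the `launch` callable are hypothetical names, only the first result page is read, and the SSM update after adopting or launching an instance is left out.

```python
from typing import Any, Callable, Optional

GPU_NAME_TAG = "encache-gpu-worker"


def find_gpu_instance_by_tag(ec2: Any, state: str) -> Optional[dict[str, Any]]:
    """First instance tagged Name=encache-gpu-worker in `state` (first page only)."""
    resp = ec2.describe_instances(Filters=[
        {"Name": "tag:Name", "Values": [GPU_NAME_TAG]},
        {"Name": "instance-state-name", "Values": [state]},
    ])
    for reservation in resp.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            return instance
    return None


def resolve_gpu_without_instance_id(
    ec2: Any, port: int, launch: Callable[[], Optional[str]]
) -> tuple[Optional[str], Optional[str]]:
    """Returns (instance_id, url); url is None unless an instance is already running."""
    running = find_gpu_instance_by_tag(ec2, "running")
    if running:
        ip = running.get("PrivateIpAddress") or running.get("PublicIpAddress")
        return running["InstanceId"], (f"http://{ip}:{port}" if ip else None)
    stopped = find_gpu_instance_by_tag(ec2, "stopped")
    if stopped:
        ec2.start_instances(InstanceIds=[stopped["InstanceId"]])
        return stopped["InstanceId"], None  # still booting; this request gets the fallback
    return launch(), None  # e.g. launch from the template with type fallback
```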
_launch_gpu_with_type_fallback(launch_template_id):
  for type in GPU_FALLBACK_INSTANCE_TYPES:
    try:
      ami = _gpu_ami_for_type(type) // per-type AMI override from GPU_INSTANCE_TYPE_AMIS
      resp = ec2.run_instances(LaunchTemplate, InstanceType=type, ImageId=ami if set, Tags=[Name=encache-gpu-worker])
      _update_ssm_gpu_instance_id(resp.Instances[0].InstanceId)
      return resp.Instances[0].InstanceId
    except InsufficientInstanceCapacity:
      continue // try next type
    except other ClientError:
      return None // surface in logs, abort
  return None // all types exhausted
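A runnable sketch of the type-fallback loop, assuming a boto3 EC2 client. It relies on botocore's `ClientError` exposing the AWS error code at `exc.response["Error"]["Code"]`, read duck-typed so the sketch needs no botocore import. `run_instances` requires `MinCount`/`MaxCount`, which the pseudocode omits. `ami_for_type` and `record_instance_id` stand in for `_gpu_ami_for_type` and `_update_ssm_gpu_instance_id`; their signatures here are assumptions.

```python
from typing import Any, Callable, Optional

GPU_NAME_TAG = "encache-gpu-worker"


def _aws_error_code(exc: Exception) -> Optional[str]:
    # botocore.exceptions.ClientError carries the AWS code at exc.response["Error"]["Code"].
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return None
    return response.get("Error", {}).get("Code")


def launch_gpu_with_type_fallback(
    ec2: Any,
    launch_template_id: str,
    instance_types: list[str],
    ami_for_type: Callable[[str], Optional[str]],
    record_instance_id: Callable[[str], None],
) -> Optional[str]:
    for instance_type in instance_types:
        params: dict[str, Any] = {
            "LaunchTemplate": {"LaunchTemplateId": launch_template_id},
            "InstanceType": instance_type,
            "MinCount": 1,
            "MaxCount": 1,
            "TagSpecifications": [{
                "ResourceType": "instance",
                "Tags": [{"Key": "Name", "Value": GPU_NAME_TAG}],
            }],
        }
        ami = ami_for_type(instance_type)
        if ami:
            params["ImageId"] = ami  # per-type AMI override
        try:
            resp = ec2.run_instances(**params)
        except Exception as exc:
            code = _aws_error_code(exc)
            if code is None:
                raise  # not an AWS API error; let it surface
            if code == "InsufficientInstanceCapacity":
                continue  # try the next instance type
            return None  # any other AWS error: abort (caller logs it)
        instance_id = resp["Instances"][0]["InstanceId"]
        record_instance_id(instance_id)  # e.g. write it to /encache/gpu/instance_id
        return instance_id
    return None  # every type hit InsufficientInstanceCapacity
```

Only capacity errors advance to the next type. Permission or template errors would fail identically for every type, so retrying them just adds latency.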
Flow: gpuHealthPoll
- Core files:
  main/server/api/memories/chat/app.py: wait_for_gpu_health(), _gpu_is_healthy()
Pseudocode
wait_for_gpu_health(gpu_url, max_wait_ms=15000, initial_delay_ms=2000, lambda_context=None):
  start = time.time()
  lambda_remaining_ms = lambda_context.get_remaining_time_in_millis() // read once at start; guard skipped if no context
  retry_delay_ms = initial_delay_ms
  retry_count = 0
  lambda_timeout_guard_ms = 10000 // reserve 10s for response assembly
  loop:
    is_healthy = GET {gpu_url}/health (timeout=3s) == 200
    elapsed_ms = (time.time() - start) * 1000
    if is_healthy: return (True, elapsed_ms)
    if elapsed_ms >= max_wait_ms: return (False, elapsed_ms) // max-wait exhausted
    if lambda_remaining_ms - elapsed_ms <= lambda_timeout_guard_ms: return (False, elapsed_ms) // Lambda timeout guard
    sleep(retry_delay_ms / 1000)
    retry_delay_ms = min(retry_delay_ms * 2, 60000) // exponential backoff, cap 60s
    retry_count += 1
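A runnable Python sketch of wait_for_gpu_health using only the standard library. Assumptions: `urllib.request` performs the GET; `time.monotonic` replaces `time.time` so wall-clock adjustments cannot skew elapsed time; the Lambda remaining time is read once at start via `context.get_remaining_time_in_millis()`; and the keyword-only `check`/`sleep`/`clock` parameters are hypothetical additions that let the loop be tested without a GPU or real sleeps.

```python
import http.client
import time
import urllib.request
from typing import Any, Callable, Optional


def gpu_is_healthy(gpu_url: str, timeout_s: float = 3.0) -> bool:
    """True only if GET {gpu_url}/health answers 200 within timeout_s."""
    try:
        with urllib.request.urlopen(f"{gpu_url}/health", timeout=timeout_s) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException):
        # URLError and HTTPError (e.g. the 503 sent while models load) and timeouts are OSErrors.
        return False


def wait_for_gpu_health(
    gpu_url: str,
    max_wait_ms: float = 15000,
    initial_delay_ms: float = 2000,
    lambda_context: Optional[Any] = None,
    *,
    check: Callable[[str], bool] = gpu_is_healthy,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
    guard_ms: float = 10000,
    max_delay_ms: float = 60000,
) -> tuple[bool, float]:
    start = clock()
    # Remaining Lambda time is read once; elapsed time is subtracted on each pass.
    remaining_ms = (lambda_context.get_remaining_time_in_millis()
                    if lambda_context is not None else None)
    delay_ms = initial_delay_ms
    while True:
        healthy = check(gpu_url)
        elapsed_ms = (clock() - start) * 1000
        if healthy:
            return True, elapsed_ms
        if elapsed_ms >= max_wait_ms:
            return False, elapsed_ms  # max-wait exhausted
        if remaining_ms is not None and remaining_ms - elapsed_ms <= guard_ms:
            return False, elapsed_ms  # Lambda timeout guard
        sleep(delay_ms / 1000)
        delay_ms = min(delay_ms * 2, max_delay_ms)  # exponential backoff, capped
```

As in the pseudocode, the exit checks run before each sleep, so the last sleep is not clamped to the remaining budget.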
// GPU worker /health endpoint (server.py):
GET /health:
  if _embed_model is None or _caption_model is None:
    return HTTP 503 (models still loading)
  return { status: "ready", model: "VLM2Vec-2B+Qwen2-VL-7B", device: "cuda:0" }
Flow: getMessages (history load + post-answer refresh)
- Test files:
  main/app/__tests__/get-chat-messages.test.ts
- Core files:
  main/app/lib/api/chats/getChatMessages.ts
  main/server/api/chats/messages/app.py
Paths
| path | input | output | path-type | notes |
| --- | --- | --- | --- | --- |
| getMessages.load_history | {chat_id} on screen open | {messages[], next_cursor} | happy path | Called once on mount when opening an existing chat |
| getMessages.paginated | {chat_id, cursor} | {messages[], next_cursor} | happy path | Messages sorted descending (newest first); reversed in UI |
| getMessages.forbidden | chat_id not owned by user | 403 | error | ChatRepository.get_messages checks ownership |
| getMessages.missing_chat_id | {} | 400 ValueError | error | Validated in implementation() |
Pseudocode
// Backend: ChatMessagesFunction
implementation(payload, auth_context):
  validate user_id, chat_id
  items, last_key = repo.get_messages(user_id, chat_id, cursor, limit=20)
  return { messages: items, next_cursor: last_key }
// ChatRepository.get_messages:
  verify chat_owned_by(user_id, chat_id) else Forbidden
  DynamoDB.query(
    KeyConditionExpression: "chat_id = :cid",
    ScanIndexForward: False, // newest first
    Limit: limit,
    ExclusiveStartKey: cursor // if paginating
  )
  return (items, LastEvaluatedKey)
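GetChatMessagesRequest.cursor is a string, while DynamoDB's ExclusiveStartKey and LastEvaluatedKey are maps, so some encoding sits between them. This sketch assumes the key is JSON-serialized and URL-safe base64 encoded; the encoding ChatRepository actually uses is not documented here. It also adds the `ExpressionAttributeValues` entry a string `KeyConditionExpression` needs with boto3's `Table.query`. `encode_cursor`, `decode_cursor` and `build_messages_query` are hypothetical names.

```python
import base64
import json
from typing import Any, Optional


def encode_cursor(last_evaluated_key: Optional[dict[str, Any]]) -> Optional[str]:
    """Turn DynamoDB's LastEvaluatedKey into an opaque next_cursor string."""
    if not last_evaluated_key:
        return None  # no LastEvaluatedKey means this was the last page
    raw = json.dumps(last_evaluated_key, sort_keys=True, separators=(",", ":"))
    return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")


def decode_cursor(cursor: Optional[str]) -> Optional[dict[str, Any]]:
    """Inverse of encode_cursor; malformed input raises ValueError (a 400 upstream)."""
    if not cursor:
        return None
    key = json.loads(base64.urlsafe_b64decode(cursor.encode("ascii")))
    if not isinstance(key, dict):
        raise ValueError("malformed cursor")
    return key


def build_messages_query(chat_id: str, cursor: Optional[str] = None, limit: int = 20) -> dict[str, Any]:
    """Keyword arguments for a boto3 Table.query over encache-chat-messages."""
    query: dict[str, Any] = {
        "KeyConditionExpression": "chat_id = :cid",
        "ExpressionAttributeValues": {":cid": chat_id},
        "ScanIndexForward": False,  # newest first; the UI reverses to chronological
        "Limit": limit,
    }
    start_key = decode_cursor(cursor)
    if start_key is not None:
        # Reject a cursor minted for a different chat instead of passing it through.
        if start_key.get("chat_id") != chat_id:
            raise ValueError("cursor does not belong to this chat")
        query["ExclusiveStartKey"] = start_key
    return query
```

Malformed base64 raises `binascii.Error` and bad JSON raises `json.JSONDecodeError`; both subclass `ValueError`, so they fall into the same 400 path as a missing chat_id.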
// Frontend: chat.tsx history load
useEffect (on mount, if params.chatId):
  { messages } = await getChatMessages({ chat_id: params.chatId })
  setMessages([...messages].reverse()) // reverse to chronological order
Flow: ReasoningAgent (multi-step retrieval loop)
- Core files:
main/server/worldmm/retrieval/agent.py
Pseudocode
ReasoningAgent.answer(question):
  rounds = [], accumulated_context = [], trace = []
  for round in range(MAX_ROUNDS=5):
    messages = _build_reasoning_messages(question, rounds)
    raw = reasoning_llm.call(model=gpu, json_object)
    action = parse_agent_action(raw) // normalizes {decision/action} field
    if action.action == "ANSWER": break
    if action.action == "SEARCH":
      results = retrievers[action.memory_type](action.query)
      // retrievers: "episodic", "semantic", "visual"
      rounds.append({ memory_type, query, results })
      accumulated_context.extend(results)
      trace.append({ round, decision: SEARCH, memory_type, agent_query, results, result_count })
  response = _generate_response(question, accumulated_context)
  return response, trace
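The retrieval loop can be sketched with the LLM calls and retrievers injected as plain callables, which makes its control flow testable on its own. This is a sketch, not agent.py: `reason` stands in for `_build_reasoning_messages` plus `reasoning_llm.call`, `respond` for `_generate_response`, and the precedence of `action` over `decision` when both keys appear, plus the skipping of unknown memory types, are assumptions.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Optional

MAX_ROUNDS = 5


@dataclass
class AgentAction:
    action: str  # "SEARCH" or "ANSWER"
    memory_type: Optional[str] = None
    query: Optional[str] = None


def parse_agent_action(raw: str) -> AgentAction:
    """Normalize the model's JSON, accepting either an "action" or a "decision" key."""
    data = json.loads(raw)
    # Assumed precedence: "action" wins if both keys are present.
    kind = str(data.get("action") or data.get("decision") or "").upper()
    return AgentAction(kind, data.get("memory_type"), data.get("query"))


def answer(
    question: str,
    reason: Callable[[str, list], str],            # builds messages + calls the reasoning LLM
    retrievers: dict[str, Callable[[str], list]],  # "episodic", "semantic", "visual"
    respond: Callable[[str, list], str],           # final response generation
    max_rounds: int = MAX_ROUNDS,
) -> tuple[str, list[dict[str, Any]]]:
    rounds: list[dict[str, Any]] = []
    context: list[Any] = []
    trace: list[dict[str, Any]] = []
    for round_no in range(max_rounds):
        action = parse_agent_action(reason(question, rounds))
        if action.action == "ANSWER":
            break
        if action.action == "SEARCH" and action.memory_type in retrievers:
            results = retrievers[action.memory_type](action.query)
            rounds.append({"memory_type": action.memory_type, "query": action.query, "results": results})
            context.extend(results)
            trace.append({"round": round_no, "decision": "SEARCH", "memory_type": action.memory_type,
                          "agent_query": action.query, "results": results, "result_count": len(results)})
        # Unknown actions or memory types are skipped; the round still counts toward max_rounds.
    return respond(question, context), trace
```

Because the loop always ends in `respond`, a model that never emits ANSWER still produces a reply after `max_rounds` searches, built from whatever context was gathered.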
Logs
| Source | Location |
| --- | --- |
| Dispatcher start/invoke | CloudWatch: /aws/lambda/server-MemoriesChatFunction-* step chat_dispatcher_start, chat_dispatcher_async_invoke_sent |
| GPU resolve / EC2 state | step gpu_instance_state_issue, gpu_instance_adopted_by_tag, gpu_instance_starting_stopped, gpu_instance_relaunching |
| GPU health polling | step gpu_retry_healthy, gpu_retry_exhausted_max_wait, gpu_retry_lambda_timeout_approaching |
| GPU launch | step chat_gpu_instance_launched, chat_gpu_capacity_unavailable, chat_gpu_all_types_exhausted |
| SSM update | step ssm_gpu_instance_id_updated, ssm_gpu_instance_id_update_failed |
| Worker path | step chat_impl_start, chat_gpu_resolved, chat_gpu_retry_failed |
| Answer complete | log_event("chat", "agent_answer_complete", ...) |
Deployment
- Mechanism: SAM
- Deploy command:
  cd main/server
  sam build
  sam deploy
- Notes:
  - Stack: server, region: us-east-1
  - MemoriesChatFunction: Python 3.12, Timeout 60s, MemorySize 2048 MB, in VPC
  - ChatMessagesFunction: Python 3.12, default 30s Timeout
  - ChatsListFunction: Python 3.12, default 30s Timeout
  - GPU_MAX_WAIT_MS=15000 is hardcoded in template.yaml (not overridable via SSM). This caps GPU health polling to keep the total Lambda duration well under the API Gateway REST API 29s hard timeout. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md.
  - GPU config resolved from SSM at deploy time: /encache/gpu/instance_id, /encache/gpu/worker_port, /encache/gpu/launch_template_id, /encache/gpu/instance_type_amis
  - DynamoDB tables: encache-chats (key: user_id + chat_id, GSI: user_id-last_activity-index), encache-chat-messages (key: chat_id + message_id)
  - MemoriesChatFunction has lambda:InvokeFunction permission scoped to itself (for async self-invocation)
Key Files
| File | Role |
| --- | --- |
| main/app/app/chat.tsx | Chat screen UI: message list, InputDock, "Thinking..." indicator |
| main/app/lib/api/memory/chatWithMemory.ts | POST /memories/chat API call (60s client timeout) |
| main/app/lib/api/chats/getChatMessages.ts | POST /chats/messages API call (history load) |
| main/app/lib/api/chats/fetchChats.ts | POST /chats/list API call (side-menu conversation list) |
| main/app/lib/api/chats/useChatsApi.ts | React Query hooks: useChatsFeed, useChatMessages, useChatWithMemory |
| main/server/api/memories/chat/app.py | Lambda: dispatcher + async worker + GPU resolution + health polling |
| main/server/api/chats/messages/app.py | Lambda: fetch messages for a chat (POST /chats/messages) |
| main/server/api/chats/list/app.py | Lambda: list user's conversations |
| main/server/layers/shared/python/shared/chat/repository.py | ChatRepository: DynamoDB CRUD for chats and messages |
| main/server/layers/shared/python/shared/gpu_utils.py | normalize_gpu_instance_id(): null sentinel handling |
| main/server/worldmm/retrieval/agent.py | ReasoningAgent: multi-step search/answer loop |
| main/server/worldmm/gpu_worker/server.py | FastAPI GPU worker: /health, /encode-video, /encode-text, /caption |
Architecture Notes
Async Dispatcher Pattern
The chat flow uses an async self-invocation pattern to decouple the synchronous API Gateway response from GPU warm-up time:
- The dispatcher Lambda runs on the synchronous HTTP request path. It creates or validates the chat, fires an async Lambda invocation (InvocationType=Event), saves the user message to DynamoDB, and returns {chat_id, status: "thinking"}, all within ~1-2 seconds, well under the API Gateway 29s hard timeout.
- The async worker Lambda runs independently. It performs all GPU resolution, health polling, and ReasoningAgent execution, and saves the assistant answer to DynamoDB.
- The frontend currently does not poll for the answer after receiving {status: "thinking"}. chatWithMemory.ts awaits a single POST call whose response is that dispatcher result, so status: "thinking" is the terminal response for the current UI. The saved answer appears only when the chat's history is loaded through POST /chats/messages on opening the chat.
GPU Instance Lifecycle
The GPU worker auto-shuts down after 1 hour of idle via a watchdog thread in server.py (IDLE_TIMEOUT_S=3600, checked every 30s). The watchdog does NOT update SSM when shutting down, which creates a stale-ID cycle. The tag-based fallback in _resolve_gpu_url and implementation() breaks this cycle by scanning for Name=encache-gpu-worker instances when the SSM ID is stale.
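A minimal sketch of that idle-watchdog pattern, assuming a monotonic clock and a caller-supplied `on_idle` callback (in server.py this would be whatever stops the instance). `IdleWatchdog` and `on_idle` are hypothetical names. Like the watchdog described above, the sketch does not write to SSM, which is exactly why the tag-based fallback is needed.

```python
import threading
import time
from typing import Callable


class IdleWatchdog:
    """Calls on_idle once when no activity has been recorded for idle_timeout_s."""

    def __init__(self, idle_timeout_s: float, check_interval_s: float,
                 on_idle: Callable[[], None], clock: Callable[[], float] = time.monotonic):
        self._idle_timeout_s = idle_timeout_s
        self._check_interval_s = check_interval_s
        self._on_idle = on_idle
        self._clock = clock
        self._last_activity = clock()
        self._lock = threading.Lock()
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def touch(self) -> None:
        """Record activity; a request handler would call this on every request."""
        with self._lock:
            self._last_activity = self._clock()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stopped.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait returns False on timeout, so this wakes every check_interval_s until stop().
        while not self._stopped.wait(self._check_interval_s):
            with self._lock:
                idle_s = self._clock() - self._last_activity
            if idle_s >= self._idle_timeout_s:
                self._on_idle()  # shuts down only; SSM is left pointing at this instance
                return
```

With the documented values this would be `IdleWatchdog(3600, 30, on_idle=...)`, so an idle instance stops somewhere between 60 and 60.5 minutes after its last request.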
Private IP is always preferred over public IP because both the Lambda and GPU worker run in the same VPC. Using private IP ensures security-group-to-security-group rules apply correctly; public IP routes through NAT and loses SG identity.
API Gateway Timeout Constraint
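API Gateway REST API has a hard 29-second maximum integration timeout. The Lambda Timeout is 60s, but any Lambda response arriving after 29s is discarded by API Gateway (the client sees a 504). GPU_MAX_WAIT_MS=15000 caps GPU health polling with that 29s window in mind, allowing for exponential backoff overshoot. With the async dispatcher pattern, health polling runs in the worker rather than on the synchronous request path: the dispatcher returns in ~1-2s, and the worker is bounded by the 60s Lambda Timeout instead. See docs/bugs/2026-05-14-chat-504-api-gateway-29s-hard-timeout.md for the full diagnosis.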
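The "exponential backoff overshoot" can be made concrete. The polling loop checks the max-wait limit before each sleep, not after, so the final sleep can run well past GPU_MAX_WAIT_MS. This idealized model assumes every /health probe fails instantly and the Lambda timeout guard never fires. `health_check_times_ms` is a hypothetical helper that lists when each probe starts.

```python
def health_check_times_ms(initial_delay_ms: int = 2000, max_wait_ms: int = 15000,
                          max_delay_ms: int = 60000) -> list[int]:
    """Start times (ms) of each /health probe when every probe fails instantly.

    Mirrors the polling loop: probe, stop if elapsed >= max_wait_ms, otherwise sleep
    and double the delay (capped). The Lambda timeout guard is ignored here.
    """
    times: list[int] = []
    t, delay = 0, initial_delay_ms
    while True:
        times.append(t)
        if t >= max_wait_ms:
            return times
        t += delay
        delay = min(delay * 2, max_delay_ms)


print(health_check_times_ms())  # [0, 2000, 6000, 14000, 30000]
```

With the defaults the last probe starts at 30,000 ms, about twice GPU_MAX_WAIT_MS and already past 29s, before counting up to 3s per probe for the /health timeout. A 15s cap therefore does not by itself bound polling to 15s.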