GPU Retry & Push Notification
System Intent
- What this is: The backend retry loop and push notification pipeline that activates when the async chat worker exhausts its GPU health-check budget. On timeout, the worker marks the placeholder message as gpu_pending and enqueues a retry record on ChatGpuRetryQueue. ChatGpuRetryFunction retries up to 15 times at 2-minute intervals (≈ 30 minutes of queue delay, plus any time spent waiting on health checks). On success it saves the answer (ready=true) and sends an Expo push notification. Once the retry limit is reached it saves a failure message (also ready=true) and sends a failure notification. The frontend polling loop already picks up the eventual ready=true update; the push notification lets the user leave the app and still be alerted.
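As a rough check on the ≈ 30-minute figure, the retry window can be estimated from the stated defaults. This is a sketch only: `retry_window_seconds` is a hypothetical helper, not part of the codebase, and it assumes one SQS delay before every delivery and at most one full health-check wait per attempt.

```python
def retry_window_seconds(max_attempts: int = 15,
                         delay_s: int = 120,
                         health_wait_s: int = 240) -> tuple[int, int]:
    """Return (lower, upper) bounds in seconds until the failure message is saved.

    Assumption: the record is delivered once for each retry_count from 0 up to
    max_attempts, and every delivery is preceded by one SQS delay. Deliveries
    with retry_count < max_attempts may spend up to health_wait_s on /health.
    """
    deliveries = max_attempts + 1                  # retry_count 0..max_attempts inclusive
    lower = deliveries * delay_s                   # every attempt fails immediately
    upper = lower + max_attempts * health_wait_s   # every attempt uses the full wait budget
    return lower, upper


lower, upper = retry_window_seconds()
print(lower / 60, upper / 60)  # 32.0 92.0 (minutes)
```

Under these assumptions the user waits between roughly 32 and 92 minutes before the failure notification, which is why the queue delay alone is quoted as ≈ 30 minutes.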
Mermaid Diagram
flowchart TD
Worker["MemoriesChatFunction (async worker)"] -->|"GPU_MAX_WAIT_MS exhausted\nmark_gpu_pending + enqueue retry_count=0"| SQS["ChatGpuRetryQueue\n(SQS, encache-chat-gpu-retry)"]
SQS -->|"DelaySeconds=120\ntrigger"| RetryFn["ChatGpuRetryFunction\napi/memories/chat/retry.py"]
RetryFn -->|"_find_running_gpu_instance_by_tag"| EC2["EC2 (GPU worker)"]
EC2 -->|"running instance found"| RetryFn
RetryFn -->|"wait_for_gpu_health GPU_RETRY_WAIT_MS=240000 (240 s)"| GPU["GPU Worker /health"]
GPU -->|"healthy"| RetryFn
RetryFn -->|"handle_chat → save answer ready=true"| DDB["DynamoDB Messages"]
RetryFn -->|"GET push_token"| Sessions["Sessions DynamoDB"]
RetryFn -->|"POST /push/send title=Your answer is ready"| Expo["Expo Push API"]
Expo -->|"push notification"| App["Mobile App"]
RetryFn -->|"GPU not found or unhealthy\nretry_count < 15: re-enqueue retry_count+1"| SQS
RetryFn -->|"retry_count >= 15:\nsave failure msg ready=true"| DDB
RetryFn -->|"retry_count >= 15:\nPOST /push/send title=Couldn't answer"| Expo
DDB -->|"ready=true picked up by\npollForMessage loop"| App
AppLayout["_layout.tsx (on auth)"] -->|"PUT /users/push-token"| PushTokenFn["UsersPushTokenFunction"]
PushTokenFn -->|"SET push_token"| Sessions
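The "POST /push/send" edge in the diagram can be illustrated with a minimal request builder. This is a sketch under assumptions: the function name `build_expo_push_request` is hypothetical, and the real `_send_push_notification` may use a different HTTP client or extra fields. The endpoint URL and the `to`/`title`/`body` fields follow Expo's public push API.

```python
import json
import urllib.request

# Public Expo push endpoint; the diagram abbreviates it as "/push/send".
EXPO_PUSH_URL = "https://exp.host/--/api/v2/push/send"


def build_expo_push_request(push_token: str, title: str, body: str) -> urllib.request.Request:
    """Build (but do not send) the HTTP request for one push notification."""
    payload = {"to": push_token, "title": title, "body": body}
    return urllib.request.Request(
        EXPO_PUSH_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


request = build_expo_push_request(
    "ExponentPushToken[example]", "Your answer is ready", "What did I do last summer?"
)
# To actually send it: urllib.request.urlopen(request, timeout=10)
```

Keeping request construction separate from sending makes the payload easy to unit-test without network access.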
Flows
Flow: gpuRetry (ChatGpuRetryFunction)
- Test files:
  main/server/api/memories/chat/__tests__/test_gpu_retry.py
- Core files:
  main/server/api/memories/chat/retry.py
  main/server/api/memories/chat/app.py (_find_running_gpu_instance_by_tag, handle_chat, wait_for_gpu_health)
  main/server/layers/shared/python/shared/chat/repository.py
Types
// SQS message body (ChatGpuRetryQueue)
GpuRetryRecord {
  chat_id: string
  message_id: string
  user_id: string
  question: string
  retry_count: number // 0-based; compared against GPU_RETRY_MAX_ATTEMPTS (default 15)
}
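The record shape above maps naturally onto a small parser. The sketch below is illustrative, not the handler's actual code: `parse_retry_record` and the dataclass are hypothetical names, and treating a missing `retry_count` as 0 is an assumption.

```python
import json
from dataclasses import dataclass
from typing import Optional

REQUIRED_FIELDS = ("chat_id", "message_id", "user_id", "question")


@dataclass(frozen=True)
class GpuRetryRecord:
    chat_id: str
    message_id: str
    user_id: str
    question: str
    retry_count: int = 0  # assumption: a missing retry_count is treated as 0


def parse_retry_record(body: str) -> Optional[GpuRetryRecord]:
    """Parse an SQS message body; return None when it is malformed or incomplete."""
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    # Every required field must be present and non-empty.
    if any(not data.get(field) for field in REQUIRED_FIELDS):
        return None
    try:
        retry_count = int(data.get("retry_count", 0))
    except (TypeError, ValueError):
        return None
    return GpuRetryRecord(
        chat_id=str(data["chat_id"]),
        message_id=str(data["message_id"]),
        user_id=str(data["user_id"]),
        question=str(data["question"]),
        retry_count=retry_count,
    )
```

Returning None instead of raising lets the caller log and skip a bad record rather than fail the whole Lambda invocation.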
Paths
| path | input | output | path-type | notes |
| gpuRetry.gpu_healthy | SQS record, GPU healthy within GPU_RETRY_WAIT_MS (240 s) | update_message_ready(content=answer, ready=True); Expo push notification sent | happy path | push skipped if push_token absent from Sessions table |
| gpuRetry.gpu_not_found_reenqueue | SQS record, no running GPU instance by tag | re-enqueued with retry_count+1, DelaySeconds=120 | graceful degradation | _find_running_gpu_instance_by_tag returns None |
| gpuRetry.gpu_unhealthy_reenqueue | SQS record, GPU found but /health fails within GPU_RETRY_WAIT_MS | re-enqueued with retry_count+1, DelaySeconds=120 | graceful degradation | SQS visibility timeout 300s prevents double-processing |
| gpuRetry.max_retries_exceeded | retry_count >= GPU_RETRY_MAX_ATTEMPTS (15) | update_message_ready(content=failure_message, ready=True); failure push notification sent | graceful degradation | failure message: "Sorry, the GPU took too long to start. Please try again." |
| gpuRetry.no_push_token | GPU healthy, Sessions table has no push_token for user | answer saved normally; Expo Push API not called | happy path | non-fatal; user sees answer on next poll or history load |
| gpuRetry.invalid_message | SQS record missing chat_id, message_id, user_id, or question | logged and skipped; no error raised | error | |
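The branching in the table can be summarized as a pure decision function. This is a sketch: `RetryOutcome` and `classify_attempt` are hypothetical names introduced for illustration, and in the real handler the health check only runs after an instance has been found.

```python
from enum import Enum


class RetryOutcome(Enum):
    SAVE_FAILURE = "max_retries_exceeded"
    REENQUEUE_NOT_FOUND = "gpu_not_found_reenqueue"
    REENQUEUE_UNHEALTHY = "gpu_unhealthy_reenqueue"
    RUN_CHAT = "gpu_healthy"


def classify_attempt(retry_count: int, gpu_found: bool, gpu_healthy: bool,
                     max_attempts: int = 15) -> RetryOutcome:
    """Map one retry attempt onto the path names used in the table."""
    if retry_count >= max_attempts:
        return RetryOutcome.SAVE_FAILURE          # checked first, before any GPU lookup
    if not gpu_found:
        return RetryOutcome.REENQUEUE_NOT_FOUND   # no running instance matched the tag
    if not gpu_healthy:
        return RetryOutcome.REENQUEUE_UNHEALTHY   # /health did not pass within the wait budget
    return RetryOutcome.RUN_CHAT                  # run handle_chat, then notify


print(classify_attempt(retry_count=14, gpu_found=True, gpu_healthy=False).value)
# gpu_unhealthy_reenqueue
```

The no_push_token and invalid_message paths are orthogonal to this decision: the first only affects whether a notification is sent, and the second is rejected before any attempt is classified.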
Pseudocode
retry_handler(event, context):
  for record in event.Records:
    body = json.loads(record.body)
    { chat_id, message_id, user_id, question, retry_count } = body
    validate chat_id, message_id, user_id, question present; otherwise log and continue
    max_attempts = GPU_RETRY_MAX_ATTEMPTS env (default 15)
    gpu_wait_ms = GPU_RETRY_WAIT_MS env (default 240000)
    if retry_count >= max_attempts:
      repo.update_message_ready(chat_id, message_id, failure_message)
      push_token = _get_push_token(user_id) // Sessions.get_item(Key={sessionId: user_id})
      if push_token:
        _send_push_notification(push_token, "Couldn't answer",
          "The GPU took too long to start. Please open the app and retry.")
      continue // done with this record; move on to the next one
    // GPU resolution: tag-based only (no SSM read in retry Lambda)
    running = _find_running_gpu_instance_by_tag(region)
    if not running:
      _re_enqueue(chat_id, message_id, user_id, question, retry_count) // retry_count+1 inside
      continue
    ip = running.PrivateIpAddress or running.PublicIpAddress
    gpu_worker_url = http://{ip}:{GPU_WORKER_PORT}
    is_healthy, _ = wait_for_gpu_health(gpu_worker_url, max_wait_ms=gpu_wait_ms,
      initial_delay_ms=5000, lambda_context=None)
    if not is_healthy:
      _re_enqueue(chat_id, message_id, user_id, question, retry_count)
      continue
    // Run full reasoning pipeline
    handle_chat(user_id, question, chat_id, gpu_worker_url,
      gpu_instance_id=running.InstanceId, verbose=False,
      _repo=repo, message_id=message_id)
    // handle_chat calls update_message_ready internally (ready=True, content=answer)
    push_token = _get_push_token(user_id)
    if push_token:
      _send_push_notification(push_token, "Your answer is ready", question[:80])

_re_enqueue(chat_id, message_id, user_id, question, retry_count):
  sqs.send_message(
    QueueUrl=CHAT_GPU_RETRY_QUEUE_URL,
    MessageBody=json.dumps({...retry_count+1}),
    DelaySeconds=120,
  )
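The re-enqueue step can be sketched as a builder for the `send_message` arguments. The function name `build_reenqueue_kwargs` is hypothetical and the body layout mirrors GpuRetryRecord; the actual `_re_enqueue` may differ. The 0 to 900 second bound on `DelaySeconds` is the SQS limit.

```python
import json


def build_reenqueue_kwargs(queue_url: str, chat_id: str, message_id: str,
                           user_id: str, question: str, retry_count: int,
                           delay_seconds: int = 120) -> dict:
    """Return keyword arguments for an SQS send_message call with retry_count + 1."""
    if not 0 <= delay_seconds <= 900:  # SQS rejects delays outside 0..900 seconds
        raise ValueError("delay_seconds must be between 0 and 900")
    body = {
        "chat_id": chat_id,
        "message_id": message_id,
        "user_id": user_id,
        "question": question,
        "retry_count": retry_count + 1,  # the increment happens here, not in the caller
    }
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(body),
        "DelaySeconds": delay_seconds,
    }


kwargs = build_reenqueue_kwargs("https://example.invalid/queue", "c1", "m1", "u1", "why?", retry_count=2)
# With boto3 installed and credentials configured:
#   boto3.client("sqs").send_message(**kwargs)
```

Incrementing inside the builder keeps call sites identical for the "not found" and "unhealthy" branches, which both pass the current retry_count unchanged.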
Flow: workerTimeout (async worker GPU-timeout path)
- Test files:
  main/server/api/memories/chat/__tests__/test_worker_timeout.py
- Core files:
  main/server/api/memories/chat/app.py (implementation, _enqueue_gpu_retry)
  main/server/layers/shared/python/shared/chat/repository.py (mark_gpu_pending)
Paths
| path | input | output | path-type | notes |
| workerTimeout.async_path | _async_worker=True, message_id set, GPU health check exhausted | mark_gpu_pending called; SQS message enqueued retry_count=0, DelaySeconds=120; returns {status: "gpu_retry_enqueued"} | happy path | frontend receives GpuUnavailableError from chatWithMemory.ts |
| workerTimeout.legacy_sync | no message_id (legacy path), GPU health check exhausted | static fallback message saved ready=True | graceful degradation | kept for backwards compat; no push notification |
Pseudocode
implementation(payload, auth_context, _skip_user_message_save=False):
  ...
  if not gpu_worker_url:
    if message_id: // async dispatcher path
      repo.mark_gpu_pending(chat_id, message_id)
      // mark_gpu_pending: SET gpu_pending=True, ready=False, content="", updated_at=now
      _enqueue_gpu_retry(chat_id, message_id, user_id, question, retry_count=0, delay_seconds=120)
      return { status: "gpu_retry_enqueued", chat_id, message_id }
    else: // legacy sync path
      repo.save_message(chat_id, "assistant", fallback_string)
      repo.touch_chat(user_id, chat_id)
      return { answer: fallback_string, chat_id }
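The two branches above are easy to exercise in isolation if the repository and the enqueue function are injected. The following is a test-style sketch: `on_gpu_unavailable` and `RecordingRepo` are hypothetical names, and the real `implementation` function does considerably more around this branch.

```python
class RecordingRepo:
    """In-memory stand-in that records repository calls, for illustration only."""

    def __init__(self):
        self.calls = []

    def mark_gpu_pending(self, chat_id, message_id):
        self.calls.append(("mark_gpu_pending", chat_id, message_id))

    def save_message(self, chat_id, role, content):
        self.calls.append(("save_message", chat_id, role, content))

    def touch_chat(self, user_id, chat_id):
        self.calls.append(("touch_chat", user_id, chat_id))


def on_gpu_unavailable(repo, enqueue_retry, chat_id, message_id, user_id, question, fallback):
    """Mirror the GPU-timeout branch: async path enqueues a retry, legacy path saves a fallback."""
    if message_id:  # async dispatcher path: a placeholder message already exists
        repo.mark_gpu_pending(chat_id, message_id)
        enqueue_retry(chat_id, message_id, user_id, question, retry_count=0, delay_seconds=120)
        return {"status": "gpu_retry_enqueued", "chat_id": chat_id, "message_id": message_id}
    # legacy sync path: no placeholder, so save a static fallback answer
    repo.save_message(chat_id, "assistant", fallback)
    repo.touch_chat(user_id, chat_id)
    return {"answer": fallback, "chat_id": chat_id}


repo = RecordingRepo()
enqueued = []
result = on_gpu_unavailable(repo, lambda *a, **k: enqueued.append((a, k)),
                            "c1", "m1", "u1", "why?", "GPU unavailable")
print(result["status"])  # gpu_retry_enqueued
```

Passing the collaborators in as arguments is one way to test both paths without DynamoDB or SQS; the project's own tests may use mocks instead.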
Flow: pushTokenRegistration (app startup)
- Test files:
  main/app/lib/__tests__/push-notifications.test.ts
- Core files:
  main/app/app/_layout.tsx
  main/app/lib/push-notifications.ts
  main/app/lib/api/users/pushToken.ts
  main/server/api/users/push_token/app.py
Types
// PUT /users/push-token request body
PushTokenRequest {
  push_token: string (required, non-empty)
}
// PUT /users/push-token response (HTTP 200)
PushTokenResponse {
  success: true
}
Paths
| path | input | output | path-type | notes |
| pushTokenRegistration.granted | physical device, permission granted, EXPO_PUBLIC_EAS_PROJECT_ID set | HTTP 200 {success: true}; Sessions table push_token set | happy path | updated_at written alongside push_token |
| pushTokenRegistration.denied | user denies notification permission | registerForPushNotifications returns null; savePushToken not called | graceful degradation | silent; retry flow omits push |
| pushTokenRegistration.simulator | Device.isDevice = false | returns null; no permission request | graceful degradation | |
| pushTokenRegistration.no_project_id | EXPO_PUBLIC_EAS_PROJECT_ID not set | returns null | graceful degradation | logged to console |
| pushTokenRegistration.missing_token | empty push_token in request body | HTTP 400 InvalidInputError code=INVALID_INPUT | error | |
| pushTokenRegistration.unauthenticated | no user_id in auth context | HTTP 401 AuthError code=AUTH_MISSING | error | |
Pseudocode
// _layout.tsx
useEffect([user]):
  if user:
    token = await registerForPushNotifications()
    if token: await savePushToken(token)

// push-notifications.ts
registerForPushNotifications():
  if not Device.isDevice: return null
  { status } = await Notifications.getPermissionsAsync()
  if status != "granted":
    { status } = await Notifications.requestPermissionsAsync()
  if status != "granted": return null
  projectId = process.env.EXPO_PUBLIC_EAS_PROJECT_ID
  if not projectId: return null
  token = await Notifications.getExpoPushTokenAsync({ projectId })
  return token.data // "ExponentPushToken[...]"

// UsersPushTokenFunction (PUT /users/push-token)
handle_push_token(payload, auth_context):
  push_token = payload.get("push_token", "").strip()
  if not push_token: raise InvalidInputError
  user_id = auth_context.user_id
  Sessions.update_item(
    Key={ sessionId: user_id },
    UpdateExpression="SET push_token = :t, updated_at = :u",
    ExpressionAttributeValues={ ":t": push_token, ":u": now }
  )
  return { success: True }
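On the server side, the validation and the DynamoDB update can be sketched as a builder for the `update_item` arguments. Names here are illustrative: `build_push_token_update` is hypothetical, the local `InvalidInputError` stands in for the project's own exception, and storing `updated_at` as an ISO 8601 string is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


class InvalidInputError(Exception):
    """Stand-in for the project's InvalidInputError (mapped to HTTP 400 in the paths table)."""


def build_push_token_update(user_id: str, payload: dict,
                            now: Optional[datetime] = None) -> dict:
    """Validate the request body and return keyword arguments for Table.update_item."""
    push_token = str(payload.get("push_token") or "").strip()
    if not push_token:
        raise InvalidInputError("push_token is required")
    timestamp = (now or datetime.now(timezone.utc)).isoformat()  # assumed timestamp format
    return {
        "Key": {"sessionId": user_id},
        "UpdateExpression": "SET push_token = :t, updated_at = :u",
        # Every :placeholder in the expression needs a matching value here.
        "ExpressionAttributeValues": {":t": push_token, ":u": timestamp},
    }


kwargs = build_push_token_update("user-123", {"push_token": " ExponentPushToken[abc] "})
# With a boto3 DynamoDB Table resource named `sessions_table` (hypothetical):
#   sessions_table.update_item(**kwargs)
```

Because `update_item` creates the attribute if it does not exist, no table migration is needed to add push_token to existing session rows.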
Logs
| Source | Location |
| Worker enqueue | CloudWatch: /aws/lambda/server-MemoriesChatFunction-* step gpu_retry_enqueued |
| Retry Lambda invoked | CloudWatch: /aws/lambda/server-ChatGpuRetryFunction-* step gpu_retry_handler_invoked |
| Retry reenqueue | step gpu_retry_reenqueued, gpu_retry_not_available_reenqueued, gpu_retry_health_check_failed_reenqueued |
| Retry success | step gpu_retry_success |
| Max retries exceeded | step gpu_retry_max_attempts_exceeded |
| Push notification sent | step push_notification_sent, push_notification_send_failed |
| Push token read | step push_token_retrieved, push_token_not_found, push_token_retrieval_failed |
| Push token stored | CloudWatch: /aws/lambda/server-UsersPushTokenFunction-* step push_token_stored, push_token_store_failed |
Deployment
- Mechanism:
  SAM
- Deploy command:
  cd main/server
  sam build
  sam deploy
- Notes:
  ChatGpuRetryQueue: SQS queue encache-chat-gpu-retry, VisibilityTimeout 300s (exceeds the 240s GPU_RETRY_WAIT_MS), MessageRetentionPeriod 3600s, DLQ (ChatGpuRetryDLQ) maxReceiveCount 3.
  ChatGpuRetryFunction: Python 3.12, Timeout 300s, MemorySize 2048 MB, in VPC. Env vars: GPU_RETRY_WAIT_MS=240000, GPU_RETRY_MAX_ATTEMPTS=15, CHAT_GPU_RETRY_QUEUE_URL, SESSIONS_TABLE_NAME, MESSAGES_TABLE_NAME, CHATS_TABLE_NAME.
  UsersPushTokenFunction: Python 3.12, default Timeout, no VPC. Env var: SESSIONS_TABLE_NAME.
  API Gateway route: PUT /users/push-token (Cognito-authenticated).
  MemoriesChatFunction has sqs:SendMessage permission on ChatGpuRetryQueue.Arn.
- Push token stored in Sessions DynamoDB table (existing table; new push_token attribute added via update_item).
- Expo push token obtained using EXPO_PUBLIC_EAS_PROJECT_ID env var (must be set in EAS build config).
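The timeouts in these notes depend on each other, so a small sanity check can catch a mismatch before deploy. This is a sketch only: `check_retry_config` is a hypothetical helper, not part of the SAM template or the codebase, and the rules encode the relationships stated above plus the SQS 0 to 900 second limit on DelaySeconds.

```python
def check_retry_config(visibility_timeout_s: int, lambda_timeout_s: int,
                       gpu_retry_wait_ms: int, delay_seconds: int) -> list[str]:
    """Return a list of human-readable problems; an empty list means the values are consistent."""
    problems = []
    wait_s = gpu_retry_wait_ms / 1000
    if visibility_timeout_s <= wait_s:
        problems.append("VisibilityTimeout must exceed GPU_RETRY_WAIT_MS to avoid double-processing")
    if visibility_timeout_s < lambda_timeout_s:
        problems.append("VisibilityTimeout should be at least the Lambda timeout")
    if lambda_timeout_s <= wait_s:
        problems.append("Lambda timeout must exceed GPU_RETRY_WAIT_MS or the health wait cannot finish")
    if not 0 <= delay_seconds <= 900:
        problems.append("DelaySeconds must be between 0 and 900")
    return problems


# Values from the notes above: 300s visibility, 300s Lambda timeout, 240000 ms wait, 120s delay.
print(check_retry_config(300, 300, 240000, 120))  # []
```

With the documented defaults the check passes, leaving about 60 seconds of Lambda time after a full health-check wait for handle_chat and the push notification.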
Key Files
| File | Role |
| main/server/api/memories/chat/retry.py | ChatGpuRetryFunction handler: GPU resolution, health check, answer save, push notification, re-enqueue |
| main/server/api/memories/chat/app.py | _enqueue_gpu_retry() called on worker GPU timeout; also provides _find_running_gpu_instance_by_tag, handle_chat, wait_for_gpu_health used by retry Lambda |
| main/server/api/users/push_token/app.py | UsersPushTokenFunction handler: validates and stores push token in Sessions table |
| main/server/layers/shared/python/shared/chat/repository.py | mark_gpu_pending() and update_message_ready(): DynamoDB writes on GPU timeout and retry completion |
| main/app/app/_layout.tsx | Calls registerForPushNotifications + savePushToken on auth |
| main/app/lib/push-notifications.ts | Expo permission request and token retrieval |
| main/app/lib/api/users/pushToken.ts | savePushToken(token): PUT /users/push-token API client |
| main/app/app/chat.tsx | Catches GpuUnavailableError; shows "Starting up the GPU, we will notify you when we have an answer" |