GPU Retry & Push Notification

Metadata

  • System type: flow

System Intent

  • What this is: The backend retry loop and push notification pipeline that activates when the async chat worker exhausts its GPU health-check budget. On timeout, the worker marks the placeholder message as gpu_pending and enqueues a retry record on ChatGpuRetryQueue. ChatGpuRetryFunction retries up to 15 times (≈ 30 minutes of queue delay at 2-minute intervals, plus any time spent in health checks). On success it saves the answer (ready=true) and sends an Expo push notification. Once the retry limit is reached, it saves a failure message (also ready=true) and sends a failure notification. The frontend polling loop already picks up the eventual ready=true update; the push notification exists so that a user who has left the app is still alerted.
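
The "≈ 30 minutes" figure can be checked with a little arithmetic. The sketch below is only an estimate built from the defaults quoted in this document (15 attempts, DelaySeconds=120, a 240 s health-check wait). It assumes every delivery, including the final one that saves the failure message, is preceded by one queue delay, and it ignores Lambda start-up and SQS delivery latency. The function name is hypothetical.

```python
# Values copied from this document; nothing here reads live configuration.
MAX_ATTEMPTS = 15          # GPU_RETRY_MAX_ATTEMPTS default
DELAY_SECONDS = 120        # DelaySeconds applied to every enqueue
HEALTH_WAIT_SECONDS = 240  # GPU_RETRY_WAIT_MS = 240000


def retry_window_seconds(per_attempt_seconds: int = 0) -> int:
    """Seconds from the worker's first enqueue until the failure message is saved.

    Deliveries with retry_count 0..MAX_ATTEMPTS-1 each try the GPU and then
    re-enqueue; the delivery with retry_count == MAX_ATTEMPTS saves the failure.
    Each delivery is preceded by one queue delay.
    """
    deliveries = MAX_ATTEMPTS + 1
    return deliveries * DELAY_SECONDS + MAX_ATTEMPTS * per_attempt_seconds


# Lower bound: every attempt fails immediately (no running instance found).
best_case = retry_window_seconds(0)
# Upper bound: every attempt spends the full health-check wait before giving up.
worst_case = retry_window_seconds(HEALTH_WAIT_SECONDS)
print(best_case / 60, worst_case / 60)  # prints "32.0 92.0"
```

Under these assumptions the user waits between roughly 32 and 92 minutes before the failure message appears, depending on how often a GPU instance is found but never becomes healthy.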

Mermaid Diagram

flowchart TD
  Worker["MemoriesChatFunction (async worker)"] -->|"GPU_MAX_WAIT_MS exhausted\nmark_gpu_pending + enqueue retry_count=0"| SQS["ChatGpuRetryQueue\n(SQS, encache-chat-gpu-retry)"]
  SQS -->|"DelaySeconds=120\ntrigger"| RetryFn["ChatGpuRetryFunction\napi/memories/chat/retry.py"]

  RetryFn -->|"_find_running_gpu_instance_by_tag"| EC2["EC2 (GPU worker)"]
  EC2 -->|"running instance found"| RetryFn
  RetryFn -->|"wait_for_gpu_health GPU_RETRY_WAIT_MS=240s"| GPU["GPU Worker /health"]
  GPU -->|"healthy"| RetryFn
  RetryFn -->|"handle_chat → save answer ready=true"| DDB["DynamoDB Messages"]
  RetryFn -->|"GET push_token"| Sessions["Sessions DynamoDB"]
  RetryFn -->|"POST /push/send title=Your answer is ready"| Expo["Expo Push API"]
  Expo -->|"push notification"| App["Mobile App"]

  RetryFn -->|"GPU not found or unhealthy\nretry_count < 15: re-enqueue retry_count+1"| SQS
  RetryFn -->|"retry_count >= 15:\nsave failure msg ready=true\nPOST /push/send title=Couldn't answer"| DDB

  DDB -->|"ready=true picked up by\npollForMessage loop"| App

  AppLayout["_layout.tsx (on auth)"] -->|"PUT /users/push-token"| PushTokenFn["UsersPushTokenFunction"]
  PushTokenFn -->|"SET push_token"| Sessions
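
The `POST /push/send` edge in the diagram could be built roughly as follows. This is a sketch, not the contents of retry.py: it assumes the Lambda targets Expo's public push endpoint and sends only the `to`, `title`, and `body` fields. `build_push_request` is a hypothetical helper name, and the token shown is a placeholder.

```python
import json
import urllib.request

# Expo's public push endpoint; assumed to be what "POST /push/send" refers to.
EXPO_PUSH_URL = "https://exp.host/--/api/v2/push/send"


def build_push_request(push_token: str, title: str, body: str) -> urllib.request.Request:
    """Build (but do not send) a single-recipient Expo push request."""
    payload = {"to": push_token, "title": title, "body": body}
    return urllib.request.Request(
        EXPO_PUSH_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


question = "What did I do last summer?"
req = build_push_request(
    "ExponentPushToken[example]",  # placeholder token
    "Your answer is ready",
    question[:80],                 # same truncation as the retry pseudocode
)
# Sending would be: urllib.request.urlopen(req, timeout=10)
```

Keeping request construction separate from the network call makes the payload easy to assert on in unit tests without reaching Expo.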

Flows

Flow: gpuRetry (ChatGpuRetryFunction)

  • Test files: main/server/api/memories/chat/__tests__/test_gpu_retry.py
  • Core files:
    • main/server/api/memories/chat/retry.py
    • main/server/api/memories/chat/app.py (_find_running_gpu_instance_by_tag, handle_chat, wait_for_gpu_health)
    • main/server/layers/shared/python/shared/chat/repository.py

Types

// SQS message body (ChatGpuRetryQueue)
GpuRetryRecord {
  chat_id: string
  message_id: string
  user_id: string
  question: string
  retry_count: number   // 0-based; compared against GPU_RETRY_MAX_ATTEMPTS (default 15)
}
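
One way to parse and validate this record in Python is sketched below. The dataclass mirrors the fields listed above. Treating a missing retry_count as 0 and returning None for malformed input are assumptions made for illustration, not confirmed behaviour of retry.py, and `parse_retry_record` is a hypothetical name.

```python
import json
from dataclasses import dataclass
from typing import Optional

REQUIRED_FIELDS = ("chat_id", "message_id", "user_id", "question")


@dataclass(frozen=True)
class GpuRetryRecord:
    chat_id: str
    message_id: str
    user_id: str
    question: str
    retry_count: int = 0  # 0-based


def parse_retry_record(raw_body: str) -> Optional[GpuRetryRecord]:
    """Return a record, or None when the body is malformed or a field is missing."""
    try:
        body = json.loads(raw_body)
        # Assumption: an absent retry_count is treated as the first attempt.
        retry_count = int(body.get("retry_count", 0))
    except (ValueError, TypeError, AttributeError):
        # Invalid JSON, a non-object body, or a non-numeric retry_count.
        return None
    if any(not body.get(field) for field in REQUIRED_FIELDS):
        return None
    return GpuRetryRecord(
        chat_id=body["chat_id"],
        message_id=body["message_id"],
        user_id=body["user_id"],
        question=body["question"],
        retry_count=retry_count,
    )
```

A caller can then log and skip any record for which the function returns None, which matches the invalid_message behaviour described for this flow.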

Paths

| path | input | output | path-type | notes |
| --- | --- | --- | --- | --- |
| gpuRetry.gpu_healthy | SQS record, GPU healthy within GPU_RETRY_WAIT_MS=240s | update_message_ready(content=answer, ready=True); Expo push notification sent | happy path | push skipped if push_token absent from Sessions table |
| gpuRetry.gpu_not_found_reenqueue | SQS record, no running GPU instance by tag | re-enqueued with retry_count+1, DelaySeconds=120 | graceful degradation | _find_running_gpu_instance_by_tag returns None |
| gpuRetry.gpu_unhealthy_reenqueue | SQS record, GPU found but /health fails within GPU_RETRY_WAIT_MS | re-enqueued with retry_count+1, DelaySeconds=120 | graceful degradation | SQS visibility timeout 300s prevents double-processing |
| gpuRetry.max_retries_exceeded | retry_count >= GPU_RETRY_MAX_ATTEMPTS (15) | update_message_ready(content=failure_message, ready=True); failure push notification sent | graceful degradation | failure message: "Sorry, the GPU took too long to start. Please try again." |
| gpuRetry.no_push_token | GPU healthy, Sessions table has no push_token for user | answer saved normally; Expo Push API not called | happy path | non-fatal; user sees answer on next poll or history load |
| gpuRetry.invalid_message | SQS record missing chat_id, message_id, user_id, or question | logged and skipped; no error raised | error | |
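
The table above can be read as a decision order: the retry limit is checked first, then instance discovery, then health, then the push token. The sketch below encodes that order as a pure function, which is handy for table-driven tests. `classify_retry` and its boolean inputs are hypothetical; the real handler interleaves these checks with I/O.

```python
from typing import Optional


def classify_retry(
    retry_count: int,
    gpu_found: bool,
    gpu_healthy: bool,
    push_token: Optional[str],
    max_attempts: int = 15,
) -> str:
    """Map one validated retry record to the path name it would follow."""
    if retry_count >= max_attempts:
        return "gpuRetry.max_retries_exceeded"
    if not gpu_found:
        return "gpuRetry.gpu_not_found_reenqueue"
    if not gpu_healthy:
        return "gpuRetry.gpu_unhealthy_reenqueue"
    if not push_token:
        # Answer is still saved; only the Expo call is skipped.
        return "gpuRetry.no_push_token"
    return "gpuRetry.gpu_healthy"


print(classify_retry(3, gpu_found=True, gpu_healthy=False, push_token="tok"))
# prints "gpuRetry.gpu_unhealthy_reenqueue"
```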

Pseudocode

retry_handler(event, context):
  for record in event.Records:
    body = json.loads(record.body)
    { chat_id, message_id, user_id, question, retry_count } = body

    if any of chat_id, message_id, user_id, question is missing:
      log and skip this record   // no error raised

    max_attempts = GPU_RETRY_MAX_ATTEMPTS env (default 15)
    gpu_wait_ms  = GPU_RETRY_WAIT_MS env (default 240000)

    if retry_count >= max_attempts:
      repo.update_message_ready(chat_id, message_id, failure_message)
      push_token = _get_push_token(user_id)     // Sessions.get_item(Key={sessionId: user_id})
      if push_token:
        _send_push_notification(push_token, "Couldn't answer",
          "The GPU took too long to start. Please open the app and retry.")
      return

    // GPU resolution — tag-based only (no SSM read in retry Lambda)
    running = _find_running_gpu_instance_by_tag(region)
    if not running:
      _re_enqueue(chat_id, message_id, user_id, question, retry_count)  // retry_count+1 inside
      return

    ip = running.PrivateIpAddress or running.PublicIpAddress
    gpu_worker_url = http://{ip}:{GPU_WORKER_PORT}

    is_healthy, _ = wait_for_gpu_health(gpu_worker_url, max_wait_ms=gpu_wait_ms,
                                        initial_delay_ms=5000, lambda_context=None)
    if not is_healthy:
      _re_enqueue(chat_id, message_id, user_id, question, retry_count)
      return

    // Run full reasoning pipeline
    handle_chat(user_id, question, chat_id, gpu_worker_url,
                gpu_instance_id=running.InstanceId, verbose=False,
                _repo=repo, message_id=message_id)
    // handle_chat calls update_message_ready internally (ready=True, content=answer)

    push_token = _get_push_token(user_id)
    if push_token:
      _send_push_notification(push_token, "Your answer is ready", question[:80])

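// Helper: increments retry_count and re-enqueues with a fixed 2-minute delay
_re_enqueue(chat_id, message_id, user_id, question, retry_count):
  sqs.send_message(
    QueueUrl=CHAT_GPU_RETRY_QUEUE_URL,
    MessageBody=json.dumps({ chat_id, message_id, user_id, question, retry_count: retry_count + 1 }),
    DelaySeconds=120,
  )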
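
A runnable sketch of the re-enqueue step follows. It takes the SQS client as a parameter so that it can be exercised with a test double. With boto3, the client would come from `boto3.client("sqs")`, whose `send_message` accepts the `QueueUrl`, `MessageBody`, and `DelaySeconds` keyword arguments used here. `build_retry_message`, `re_enqueue`, `RecordingSqs`, and the queue URL are hypothetical names and values.

```python
import json
from typing import Any, Dict, List

RETRY_DELAY_SECONDS = 120  # matches DelaySeconds in the pseudocode


def build_retry_message(queue_url: str, chat_id: str, message_id: str,
                        user_id: str, question: str, retry_count: int) -> Dict[str, Any]:
    """Build send_message kwargs with retry_count incremented by one."""
    body = {
        "chat_id": chat_id,
        "message_id": message_id,
        "user_id": user_id,
        "question": question,
        "retry_count": retry_count + 1,
    }
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(body),
        "DelaySeconds": RETRY_DELAY_SECONDS,
    }


def re_enqueue(sqs_client: Any, queue_url: str, chat_id: str, message_id: str,
               user_id: str, question: str, retry_count: int) -> None:
    sqs_client.send_message(
        **build_retry_message(queue_url, chat_id, message_id, user_id, question, retry_count)
    )


class RecordingSqs:
    """Test double that records calls instead of talking to SQS."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []

    def send_message(self, **kwargs: Any) -> Dict[str, str]:
        self.sent.append(kwargs)
        return {"MessageId": "fake"}


fake = RecordingSqs()
re_enqueue(fake, "https://example.invalid/queue", "c1", "m1", "u1", "hi", retry_count=4)
```

Because the increment happens inside the helper, callers pass the retry_count they received and never compute the next value themselves.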

Flow: workerTimeout (async worker GPU-timeout path)

  • Test files: main/server/api/memories/chat/__tests__/test_worker_timeout.py
  • Core files:
    • main/server/api/memories/chat/app.py (implementation, _enqueue_gpu_retry)
    • main/server/layers/shared/python/shared/chat/repository.py (mark_gpu_pending)

Paths

| path | input | output | path-type | notes |
| --- | --- | --- | --- | --- |
| workerTimeout.async_path | _async_worker=True, message_id set, GPU health check exhausted | mark_gpu_pending called; SQS message enqueued retry_count=0, DelaySeconds=120; returns {status: "gpu_retry_enqueued"} | happy path | frontend receives GpuUnavailableError from chatWithMemory.ts |
| workerTimeout.legacy_sync | no message_id (legacy path), GPU health check exhausted | static fallback message saved ready=True | graceful degradation | kept for backwards compat; no push notification |

Pseudocode

implementation(payload, auth_context, _skip_user_message_save=False):
  ...
  if not gpu_worker_url:
    if message_id:   // async dispatcher path
      repo.mark_gpu_pending(chat_id, message_id)
      // mark_gpu_pending: SET gpu_pending=True, ready=False, content="", updated_at=now
      _enqueue_gpu_retry(chat_id, message_id, user_id, question, retry_count=0, delay_seconds=120)
      return { status: "gpu_retry_enqueued", chat_id, message_id }
    else:            // legacy sync path
      repo.save_message(chat_id, "assistant", fallback_string)
      repo.touch_chat(user_id, chat_id)
      return { answer: fallback_string, chat_id }
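
The two branches can be exercised without AWS by injecting a recording repository and an enqueue callable. The sketch below shows that testing approach. `handle_gpu_unavailable`, `RecordingRepo`, and the fallback text are hypothetical stand-ins, since the document does not quote the real fallback string.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in for the real fallback text, which the document elides.
FALLBACK_MESSAGE = "The GPU is unavailable right now. Please try again later."


class RecordingRepo:
    """Test double that records repository calls instead of writing to DynamoDB."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []

    def mark_gpu_pending(self, chat_id: str, message_id: str) -> None:
        self.calls.append(("mark_gpu_pending", chat_id, message_id))

    def save_message(self, chat_id: str, role: str, content: str) -> None:
        self.calls.append(("save_message", chat_id, role, content))

    def touch_chat(self, user_id: str, chat_id: str) -> None:
        self.calls.append(("touch_chat", user_id, chat_id))


def handle_gpu_unavailable(repo: Any, enqueue_retry: Callable[..., None],
                           chat_id: str, user_id: str, question: str,
                           message_id: Optional[str]) -> Dict[str, Any]:
    if message_id:
        # Async dispatcher path: hand off to the retry queue.
        repo.mark_gpu_pending(chat_id, message_id)
        enqueue_retry(chat_id, message_id, user_id, question,
                      retry_count=0, delay_seconds=120)
        return {"status": "gpu_retry_enqueued", "chat_id": chat_id, "message_id": message_id}
    # Legacy sync path: save a static fallback, no retry and no push.
    repo.save_message(chat_id, "assistant", FALLBACK_MESSAGE)
    repo.touch_chat(user_id, chat_id)
    return {"answer": FALLBACK_MESSAGE, "chat_id": chat_id}


queued: List[Dict[str, Any]] = []
repo = RecordingRepo()
result = handle_gpu_unavailable(
    repo,
    lambda *args, **kwargs: queued.append({"args": args, **kwargs}),
    chat_id="c1", user_id="u1", question="hi", message_id="m1",
)
```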

Flow: pushTokenRegistration (app startup)

  • Test files: main/app/lib/__tests__/push-notifications.test.ts
  • Core files:
    • main/app/app/_layout.tsx
    • main/app/lib/push-notifications.ts
    • main/app/lib/api/users/pushToken.ts
    • main/server/api/users/push_token/app.py

Types

// PUT /users/push-token request body
PushTokenRequest {
  push_token: string (required, non-empty)
}

// PUT /users/push-token response (HTTP 200)
PushTokenResponse {
  success: true
}

Paths

| path | input | output | path-type | notes |
| --- | --- | --- | --- | --- |
| pushTokenRegistration.granted | physical device, permission granted, EXPO_PUBLIC_EAS_PROJECT_ID set | HTTP 200 {success: true}; Sessions table push_token set | happy path | updated_at written alongside push_token |
| pushTokenRegistration.denied | user denies notification permission | registerForPushNotifications returns null; savePushToken not called | graceful degradation | silent; retry flow omits push |
| pushTokenRegistration.simulator | Device.isDevice = false | returns null; no permission request | graceful degradation | |
| pushTokenRegistration.no_project_id | EXPO_PUBLIC_EAS_PROJECT_ID not set | returns null | graceful degradation | logged to console |
| pushTokenRegistration.missing_token | empty push_token in request body | HTTP 400 InvalidInputError code=INVALID_INPUT | error | |
| pushTokenRegistration.unauthenticated | no user_id in auth context | HTTP 401 AuthError code=AUTH_MISSING | error | |

Pseudocode

// _layout.tsx
useEffect([user]):
  if user:
    token = await registerForPushNotifications()
    if token: await savePushToken(token)

// push-notifications.ts
registerForPushNotifications():
  if not Device.isDevice: return null
  { status } = await Notifications.getPermissionsAsync()
  if status != "granted":
    { status } = await Notifications.requestPermissionsAsync()
  if status != "granted": return null
  projectId = process.env.EXPO_PUBLIC_EAS_PROJECT_ID
  if not projectId: return null
  token = await Notifications.getExpoPushTokenAsync({ projectId })
  return token.data   // "ExponentPushToken[...]"

// UsersPushTokenFunction (PUT /users/push-token)
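handle_push_token(payload, auth_context):
  push_token = payload.get("push_token", "").strip()
  if not push_token: raise InvalidInputError   // HTTP 400, code=INVALID_INPUT
  user_id = auth_context.user_id
  if not user_id: raise AuthError              // HTTP 401, code=AUTH_MISSING
  Sessions.update_item(
    Key={ sessionId: user_id },
    UpdateExpression="SET push_token = :t, updated_at = :u",
    ExpressionAttributeValues={ ":t": push_token, ":u": now }
  )
  return { success: True }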
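
The server-side handler might look like the following in Python. This is a sketch under assumptions: the exception classes are local stand-ins for the project's own InvalidInputError and AuthError, updated_at is written as an ISO 8601 UTC string, and the table object is injected so that a test double can replace a boto3 DynamoDB Table resource, whose `update_item` accepts the keyword arguments used here.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class InvalidInputError(Exception):
    code = "INVALID_INPUT"  # maps to HTTP 400


class AuthError(Exception):
    code = "AUTH_MISSING"   # maps to HTTP 401


def handle_push_token(payload: Dict[str, Any], user_id: Optional[str], table: Any,
                      now: Optional[datetime] = None) -> Dict[str, bool]:
    push_token = str(payload.get("push_token") or "").strip()
    if not push_token:
        raise InvalidInputError("push_token is required")
    if not user_id:
        raise AuthError("no user_id in auth context")
    # Assumption: timestamps are stored as ISO 8601 strings in UTC.
    timestamp = (now or datetime.now(timezone.utc)).isoformat()
    table.update_item(
        Key={"sessionId": user_id},
        UpdateExpression="SET push_token = :t, updated_at = :u",
        ExpressionAttributeValues={":t": push_token, ":u": timestamp},
    )
    return {"success": True}


class RecordingTable:
    """Test double standing in for the Sessions table."""

    def __init__(self) -> None:
        self.updates: List[Dict[str, Any]] = []

    def update_item(self, **kwargs: Any) -> None:
        self.updates.append(kwargs)


table = RecordingTable()
response = handle_push_token({"push_token": " ExponentPushToken[example] "}, "u1", table)
```

Because update_item creates the attribute when it does not yet exist, no table migration is needed to add push_token to existing Sessions items.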

Logs

| Source | Location |
| --- | --- |
| Worker enqueue | CloudWatch: /aws/lambda/server-MemoriesChatFunction-* step gpu_retry_enqueued |
| Retry Lambda invoked | CloudWatch: /aws/lambda/server-ChatGpuRetryFunction-* step gpu_retry_handler_invoked |
| Retry reenqueue | step gpu_retry_reenqueued, gpu_retry_not_available_reenqueued, gpu_retry_health_check_failed_reenqueued |
| Retry success | step gpu_retry_success |
| Max retries exceeded | step gpu_retry_max_attempts_exceeded |
| Push notification sent | step push_notification_sent, push_notification_send_failed |
| Push token read | step push_token_retrieved, push_token_not_found, push_token_retrieval_failed |
| Push token stored | CloudWatch: /aws/lambda/server-UsersPushTokenFunction-* step push_token_stored, push_token_store_failed |

Deployment

  • Mechanism: SAM
  • Deploy command:
    cd main/server
    sam build
    sam deploy
    
  • Notes:
    • ChatGpuRetryQueue: SQS queue encache-chat-gpu-retry, VisibilityTimeout 300s (exceeds the 240s health-check wait set by GPU_RETRY_WAIT_MS=240000), MessageRetentionPeriod 3600s, DLQ (ChatGpuRetryDLQ) with maxReceiveCount 3.
    • ChatGpuRetryFunction: Python 3.12, Timeout 300s, MemorySize 2048 MB, in VPC. Env vars: GPU_RETRY_WAIT_MS=240000, GPU_RETRY_MAX_ATTEMPTS=15, CHAT_GPU_RETRY_QUEUE_URL, SESSIONS_TABLE_NAME, MESSAGES_TABLE_NAME, CHATS_TABLE_NAME.
    • UsersPushTokenFunction: Python 3.12, default Timeout, no VPC. Env var: SESSIONS_TABLE_NAME. API Gateway route: PUT /users/push-token (Cognito-authenticated).
    • MemoriesChatFunction has sqs:SendMessage permission on ChatGpuRetryQueue.Arn.
    • The push token is stored in the existing Sessions DynamoDB table; the new push_token attribute is added via update_item.
    • The Expo push token is obtained using the EXPO_PUBLIC_EAS_PROJECT_ID env var, which must be set in the EAS build config.

Key Files

| File | Role |
| --- | --- |
| main/server/api/memories/chat/retry.py | ChatGpuRetryFunction handler: GPU resolution, health check, answer save, push notification, re-enqueue |
| main/server/api/memories/chat/app.py | _enqueue_gpu_retry() called on worker GPU timeout; also provides _find_running_gpu_instance_by_tag, handle_chat, wait_for_gpu_health used by retry Lambda |
| main/server/api/users/push_token/app.py | UsersPushTokenFunction handler: validates and stores push token in Sessions table |
| main/server/layers/shared/python/shared/chat/repository.py | mark_gpu_pending() and update_message_ready(): DynamoDB writes on GPU timeout and retry completion |
| main/app/app/_layout.tsx | Calls registerForPushNotifications + savePushToken on auth |
| main/app/lib/push-notifications.ts | Expo permission request and token retrieval |
| main/app/lib/api/users/pushToken.ts | savePushToken(token): PUT /users/push-token API client |
| main/app/app/chat.tsx | Catches GpuUnavailableError; shows "Starting up the GPU, we will notify you when we have an answer" |