Skip to content

GPU Caption Queue + DLQ-Backed Alerting

Plan Metadata

  • Plan type: plan
  • Parent plan: N/A
  • Depends on: PR #396 (auto-bootstrap GPU when SSM holds 'none' sentinel) — merged
  • Status: draft

Status semantics: - draft: Plan is being created or updated and is not final. - approved: Plan is approved but not yet applied in code. - documentation: Code currently exists and matches the plan contract.

Update rule: - When an existing plan is edited, set status to draft until re-approved.

System Intent

  • What is being built: Replace the silent "mark complete on no GPU" path in the visual-enrichment pipeline with a queue-and-retry mechanism backed by the existing IngestDLQ. Add a single CloudWatch alarm on DLQ depth that pages Discord when the system is seriously broken. Provide a Discord slash command to mute alerts during known-degraded windows.
  • Primary consumer(s): IngestWindowFunction (visual enrichment Lambda); IngestDLQConsumer (re-invokes ingest); DiscordAlertsFunction (notification sink); DiscordInteractionFunction (slash-command handler).
  • Boundary (black-box scope only): VLM2VecClient._ensure_running() is the GPU-bootstrap signal — internals out of scope (see PR #396). aws_sns_topic.cost_alerts and the discord_alerts Lambda are the existing alarm fan-out — internals out of scope.

Motivation

The existing pipeline silently degraded for 90+ days because ingest_window.py:223 marked segments processing_status="complete" whenever GPU_INSTANCE_ID was empty. CloudTrail showed zero GPU lifecycle events the entire window. PR #396 addressed bootstrap; this plan addresses the observability and recoverability gaps that allowed the regression to hide.

Goals:

  1. No more silent skip — segments awaiting enrichment must be visible somewhere queryable.
  2. Captions recover automatically once GPU returns. Not lost.
  3. Operator gets paged on Discord when the system is seriously broken (precision over count not required).
  4. Operator can mute alerts for a bounded window (default 1 day, override allowed).

Non-goals:

  1. Per-segment retry-count tracking in DB.
  2. Alarm precision beyond "something is wrong" (we accept conflation of GPU-down with other transient bugs).
  3. New SQS queue or new drain Lambda — reuse existing IngestDLQ + IngestDLQConsumer.

Stage Gate Tracker

  • [ ] Stage 1 Mermaid approved
  • [ ] Stage 2 Flows approved
  • [ ] Stage 3 Logs + Deployment approved or skipped

Mermaid Diagram

graph TD
  IngestWindow["IngestWindowFunction\ningest_window.lambda_handler"]:::changed
  GpuClient["VLM2VecClient._ensure_running\n(per PR #396)"]:::unchanged
  IngestDLQ["IngestDLQ\n(SQS, existing)"]:::unchanged
  DLQConsumer["IngestDLQConsumer\ningest_window.sqs_handler"]:::changed
  DBSegment["WorldMMSegment\nprocessing_status"]:::unchanged
  DLQAlarm["aws_cloudwatch_metric_alarm\nencache-ingest-dlq-backlog"]:::added
  SNSCostAlerts["aws_sns_topic.cost_alerts\n(existing)"]:::unchanged
  DiscordAlerts["DiscordAlertsFunction\nhandler.py"]:::changed
  MuteSSM["/encache/alerts/dlq_muted_until\n(SSM)"]:::added
  DiscordInteraction["DiscordInteractionFunction"]:::changed
  Operator["Operator (Discord)"]:::external

  IngestWindow -->|"raises GpuUnavailableError\nor encoder exception"| IngestDLQ
  IngestWindow -->|"writes status=pending\nthen status=complete on success"| DBSegment
  IngestDLQ -->|"polled by"| DLQConsumer
  DLQConsumer -->|"checks GPU,\nextends visibility 15min if down,\nelse calls lambda_handler"| IngestWindow
  DLQConsumer -->|"on receiveCount > 5"| DBSegment
  GpuClient -.->|"called inside\nlambda_handler"| IngestWindow

  IngestDLQ -.->|"ApproximateNumberOfMessagesVisible"| DLQAlarm
  DLQAlarm -->|"breach > 100 for 10min"| SNSCostAlerts
  SNSCostAlerts --> DiscordAlerts
  DiscordAlerts -->|"reads before notify"| MuteSSM
  DiscordAlerts -->|"if not muted"| Operator

  Operator -->|"/mute-alerts <duration>"| DiscordInteraction
  DiscordInteraction -->|"writes timestamp"| MuteSSM

  classDef added fill:#c8e6c9,stroke:#2e7d32
  classDef changed fill:#fff9c4,stroke:#f57f17
  classDef unchanged fill:#eceff1,stroke:#546e7a
  classDef external fill:#bbdefb,stroke:#1565c0

Behavior Contract

IngestWindowFunction (ingest_window.lambda_handler)

Replaces silent-skip path at lines 218-235.

INPUT: SQS/EventBridge event with sessionId, userId, windowIndex, frameCount

1. Insert WorldMMSegment row with processing_status="pending" (unchanged from current line 201).
2. Resolve gpu_instance_id by READING SSM /encache/gpu/instance_id directly
   (NOT from os.environ). Reason: env vars resolve at deploy time via
   {{resolve:ssm}} and become stale across warm invocations. Fresh SSM read
   ensures DLQConsumer sees current GPU state on retry.
   - normalize_gpu_instance_id(ssm.get_parameter(...).Parameter.Value)
3. If gpu_instance_id is None or sentinel "none":
     raise GpuUnavailableError("SSM sentinel='none', no GPU provisioned").
4. Else: call VLM2VecClient(base_url=..., instance_id=gpu_instance_id).
   Bootstrap, caption, NER, triples (current behavior — encoder handles
   bootstrap per PR #396).
   - On success: update_segment(processing_status="complete", caption=...).
   - On any exception: re-raise (do NOT mark "failed" inline).
5. Lambda destination config (MaximumRetryAttempts: 0, OnFailure: IngestDLQ)
   routes the raised exception to IngestDLQ automatically.

INVARIANTS:
- A segment row is NEVER marked "complete" without a real caption.
- A segment row is NEVER marked "failed" by lambda_handler directly.
- Every segment row reaches "complete" or "failed" eventually (via DLQConsumer).
- gpu_instance_id is read fresh from SSM each invocation (not cached env).

Note on env var: GPU_INSTANCE_ID env (set via {{resolve:ssm:/encache/gpu/instance_id}}) becomes stale on warm Lambda invocations. PR #391 patched env in-process for the chat path. Here we instead drop env reliance entirely — SSM GetParameter adds ~50ms but eliminates the staleness class of bug. IAM DatabaseSsmPolicy already grants /encache/gpu/* read.

IngestDLQConsumer (ingest_window.sqs_handler)

Replaces current sqs_handler (lines 76-102).

INPUT: SQS event with batched DLQ messages

For each record:
  receive_count = int(record.attributes.ApproximateReceiveCount)
  payload = parse_envelope(record.body)
  segment_id = lookup_segment_id(payload)  # by sessionId + windowIndex

  IF receive_count > 5:
    update_segment(segment_id, processing_status="failed")
    log {"step": "dlq_exhausted", "segment_id": segment_id, "receive_count": receive_count}
    ACK message (do not return failure)
    continue

  IF NOT _gpu_available():
    sqs.change_message_visibility(record, VisibilityTimeout=900)  # 15 min
    return failure for this messageId  # keeps it on queue, hidden 15min
    continue

  TRY:
    lambda_handler(payload, context)  # success → segment marked complete inside
  EXCEPT Exception:
    return failure for this messageId  # back to queue, will retry

OUTPUT: {"batchItemFailures": [...]}

HELPERS:
  _gpu_available():
    instance_id = normalize_gpu_instance_id(ssm.get("/encache/gpu/instance_id"))
    if not instance_id: return False
    state = ec2.describe_instances([instance_id]).reservations[0].instances[0].state.name
    return state == "running"

INVARIANTS:
- A message that has been received >5 times always ends with segment="failed" and is ack'd.
- A message processed when GPU is unavailable always extends visibility before returning failure.
- DLQConsumer never enriches inline; it only delegates to lambda_handler.

CloudWatch Alarm

New resource in main/devops/main.tf.

resource "aws_cloudwatch_metric_alarm" "ingest_dlq_backlog" {
  alarm_name          = "encache-ingest-dlq-backlog"
  alarm_description   = "Ingest DLQ has >100 messages for 10min — likely GPU outage or systemic ingest failure."
  namespace           = "AWS/SQS"
  metric_name         = "ApproximateNumberOfMessagesVisible"
  dimensions          = { QueueName = "encache-ingest-dlq" }
  statistic           = "Maximum"
  period              = 300
  evaluation_periods  = 2
  threshold           = 100
  comparison_operator = "GreaterThanThreshold"
  treat_missing_data  = "notBreaching"
  alarm_actions       = [aws_sns_topic.cost_alerts.arn]
  ok_actions          = [aws_sns_topic.cost_alerts.arn]
}

Reuses existing aws_sns_topic.cost_alerts (main.tf:663) → discord_alerts Lambda.

DiscordAlertsFunction (mute-aware)

Modify main/devops/lambdas/discord_alerts/handler.py.

ON SNS event:
  muted_until = ssm.get_parameter("/encache/alerts/dlq_muted_until", default="0")
  IF int(muted_until) > now_unix():
    log {"step": "alert_suppressed", "muted_until": muted_until}
    return 200
  ELSE:
    POST to DISCORD_WEBHOOK_URL with formatted alarm message (existing behavior)

/encache/alerts/dlq_muted_until is a String SSM param holding a unix timestamp. Defaults to "0" (never muted) when absent.

DiscordInteractionFunction (new commands)

Modify main/devops/lambdas/discord_interaction/handler.py (or equivalent).

Add two slash commands:

/mute-alerts [duration]
  duration: optional string, default "1d"
  formats accepted: "30m", "4h", "1d", "2d", "12h"

  seconds = parse_duration(duration)
  until = now_unix() + seconds
  ssm.put_parameter(
    Name="/encache/alerts/dlq_muted_until",
    Value=str(until),
    Type="String",
    Overwrite=True
  )
  respond with: f"Alerts muted until <timestamp> ({duration})"

/unmute-alerts
  ssm.put_parameter(
    Name="/encache/alerts/dlq_muted_until",
    Value="0",
    Type="String",
    Overwrite=True
  )
  respond with: "Alerts active"

parse_duration accepts ^(\d+)(m|h|d)$. Reject any other format.

Data Model

No schema changes. Reuses existing processing_status column. Status transitions:

From To Trigger
(insert) pending lambda_handler line 201
pending complete Successful enrichment
pending failed DLQConsumer detects receive_count > 5

Note: pending rows that never reach complete or failed indicate either (a) DLQ retention expiry (14 days) — segment is effectively orphaned, or (b) drain in progress. A reaper cron may be added later if orphan rate is non-zero. Out of scope for this plan.

SSM Parameters

New: /encache/alerts/dlq_muted_until - Type: String (unix timestamp as string) - Default: not-present → treated as "0" - Written by: discord_interaction Lambda - Read by: discord_alerts Lambda

IAM additions: - discord_interaction role: add ssm:PutParameter on /encache/alerts/* - discord_alerts role: add ssm:GetParameter on /encache/alerts/*

Logs

All new log lines use create_logger({"flow": "ingest_window"}) (or "flow": "discord_alerts" etc.).

Step Where Fields
gpu_unavailable_raised lambda_handler before raise segment_id, gpu_sentinel
dlq_received sqs_handler per record message_id, receive_count, segment_id
dlq_visibility_extended sqs_handler GPU-down branch message_id, extended_seconds
dlq_exhausted sqs_handler receive_count > 5 segment_id, receive_count
alert_suppressed discord_alerts mute branch muted_until
alert_muted discord_interaction /mute-alerts duration, muted_until
alert_unmuted discord_interaction /unmute-alerts (none)

Deployment

Order matters. Deploy in this sequence to avoid silent-fail regression during rollout:

  1. Add CloudWatch alarm + IAM SSM parameter perms (terraform apply).
  2. Update discord_alerts Lambda to read mute SSM (deploy first; safe even if param absent).
  3. Add /mute-alerts and /unmute-alerts slash commands to discord_interaction Lambda.
  4. Update IngestDLQConsumer (sqs_handler) with bounded retries and visibility extension.
  5. Last: update IngestWindowFunction (lambda_handler) to raise instead of silent-skip.

Step 5 is last because before that point, raising in step 5 would route to the OLD sqs_handler, which calls lambda_handler immediately on consume — would loop fast against a downed GPU and burn invocations until visibility timeout. Step 4 must be live first so the consumer extends visibility properly.

Cost Implications

  • Drain rate: SQS visibility extension caps re-consume at ~4×/hour per message. With 2,880 stuck messages from a 24h outage, re-consumes ≈ 11,520/hr until GPU returns. Each consume invokes Lambda (~0.5s, 2GB) plus one ec2.describe_instances for the gating check. Estimated burn: ~$0.20/hr in Lambda compute + minimal SQS request charges. A full 24h outage = ~$5. Acceptable for the user-base size; revisit if scale grows.
  • Optimization (deferred): SQS event source mapping MaximumConcurrency and BatchSize > 1 could amortize the GPU-down check across messages. Out of scope.
  • After GPU returns: each message processed once (assuming first try succeeds). No extra cost vs. current architecture.
  • SQS message retention: 14 days (existing). After that, unprocessed messages drop. Segment row stays at pending. Operator must reap manually if it matters.

Test Contract

Unit tests (server-side)

  • test_ingest_window_gpu_unavailable_raises: Given GPU_INSTANCE_ID="", lambda_handler raises GpuUnavailableError AND segment row exists with processing_status="pending".
  • test_ingest_window_encoder_exception_propagates: When VLM2VecClient.caption raises (e.g., 500 from GPU), lambda_handler re-raises. Segment row stays at pending.
  • test_dlq_consumer_gpu_down_extends_visibility: Given GPU SSM returns "none", sqs_handler calls change_message_visibility with 900s and returns the message in batchItemFailures.
  • test_dlq_consumer_gpu_up_invokes_handler: Given GPU running, sqs_handler calls lambda_handler with the payload.
  • test_dlq_consumer_marks_failed_after_5_receives: With ApproximateReceiveCount=6, sqs_handler calls update_segment(status="failed") and ack's the message.

Unit tests (discord_alerts)

  • test_alert_suppressed_when_muted: SSM param holds future timestamp → handler returns 200 without HTTP POST.
  • test_alert_fires_when_unmuted: SSM param "0" → handler POSTs to webhook.
  • test_alert_fires_when_param_missing: SSM ParameterNotFound → handler POSTs (default-open).

Unit tests (discord_interaction)

  • test_mute_alerts_default_duration: /mute-alerts with no arg writes now + 86400 to SSM.
  • test_mute_alerts_custom_duration: /mute-alerts 4h writes now + 14400.
  • test_mute_alerts_invalid_duration: /mute-alerts banana returns error response, does not write SSM.
  • test_unmute_alerts_writes_zero: /unmute-alerts writes "0" to SSM.

Integration / smoke

  • Manual: trigger ingest with GPU_INSTANCE_ID SSM forced to "none". Verify: segment row at pending, message in DLQ, no Discord alert until 100+ messages, alert fires once threshold breached.
  • Manual: /mute-alerts 1h → trigger alarm condition → no Discord ping. Wait 1h → alarm re-fires.

Out of Scope (deferred)

  • Reaper cron for orphaned pending rows (post-14d DLQ expiry).
  • Per-segment retry telemetry (counts, duration histograms).
  • Alarming on GPU CloudTrail event drought (the original reviewer suggestion).
  • Migrating cost_alerts SNS topic to a dedicated system_alerts topic for clarity.

Open Questions

None at design time. Re-open if: orphan rate post-deploy is non-zero, or alert false-positive rate is high enough to need precision (would push toward pending_gpu DB column design instead).