GPU Caption Queue + DLQ-Backed Alerting
Plan Metadata
- Plan type: plan
- Parent plan: N/A
- Depends on: PR #396 (auto-bootstrap GPU when SSM holds 'none' sentinel), merged
- Status: draft
Status semantics:
- draft: Plan is being created or updated and is not final.
- approved: Plan is approved but not yet applied in code.
- documentation: Code currently exists and matches the plan contract.
Update rule:
- When an existing plan is edited, set status to draft until re-approved.
System Intent
- What is being built: Replace the silent "mark complete on no GPU" path in the visual-enrichment pipeline with a queue-and-retry mechanism backed by the existing IngestDLQ. Add a single CloudWatch alarm on DLQ depth that pages Discord when the system is seriously broken. Provide Discord slash commands to mute and unmute alerts during known-degraded windows.
- Primary consumer(s): IngestWindowFunction (visual-enrichment Lambda); IngestDLQConsumer (re-invokes ingest); DiscordAlertsFunction (notification sink); DiscordInteractionFunction (slash-command handler).
- Boundary (black-box scope only): VLM2VecClient._ensure_running() is the GPU-bootstrap signal; its internals are out of scope (see PR #396). aws_sns_topic.cost_alerts and the discord_alerts Lambda are the existing alarm fan-out; their internals are out of scope.
Motivation
The existing pipeline silently degraded for 90+ days because ingest_window.py:223 marked segments processing_status="complete" whenever GPU_INSTANCE_ID was empty. CloudTrail showed zero GPU lifecycle events for that entire window. PR #396 addressed bootstrap; this plan addresses the observability and recoverability gaps that let the regression hide.
Goals:
- No more silent skips: segments awaiting enrichment must be visible somewhere queryable.
- Captions recover automatically once the GPU returns; they are not lost.
- The operator gets paged on Discord when the system is seriously broken (alert precision is not required; see Non-goals).
- The operator can mute alerts for a bounded window (default 1 day, override allowed).
Non-goals:
- Per-segment retry-count tracking in DB.
- Alarm precision beyond "something is wrong" (we accept conflation of GPU-down with other transient bugs).
- New SQS queue or new drain Lambda; reuse the existing IngestDLQ + IngestDLQConsumer.
Stage Gate Tracker
- [ ] Stage 1 Mermaid approved
- [ ] Stage 2 Flows approved
- [ ] Stage 3 Logs + Deployment approved or skipped
Mermaid Diagram
graph TD
IngestWindow["IngestWindowFunction\ningest_window.lambda_handler"]:::changed
GpuClient["VLM2VecClient._ensure_running\n(per PR #396)"]:::unchanged
IngestDLQ["IngestDLQ\n(SQS, existing)"]:::unchanged
DLQConsumer["IngestDLQConsumer\ningest_window.sqs_handler"]:::changed
DBSegment["WorldMMSegment\nprocessing_status"]:::unchanged
DLQAlarm["aws_cloudwatch_metric_alarm\nencache-ingest-dlq-backlog"]:::added
SNSCostAlerts["aws_sns_topic.cost_alerts\n(existing)"]:::unchanged
DiscordAlerts["DiscordAlertsFunction\nhandler.py"]:::changed
MuteSSM["/encache/alerts/dlq_muted_until\n(SSM)"]:::added
DiscordInteraction["DiscordInteractionFunction"]:::changed
Operator["Operator (Discord)"]:::external
IngestWindow -->|"raises GpuUnavailableError\nor encoder exception"| IngestDLQ
IngestWindow -->|"writes status=pending\nthen status=complete on success"| DBSegment
IngestDLQ -->|"polled by"| DLQConsumer
DLQConsumer -->|"checks GPU,\nextends visibility 15min if down,\nelse calls lambda_handler"| IngestWindow
DLQConsumer -->|"on receiveCount > 5"| DBSegment
GpuClient -.->|"called inside\nlambda_handler"| IngestWindow
IngestDLQ -.->|"ApproximateNumberOfMessagesVisible"| DLQAlarm
DLQAlarm -->|"breach > 100 for 10min"| SNSCostAlerts
SNSCostAlerts --> DiscordAlerts
DiscordAlerts -->|"reads before notify"| MuteSSM
DiscordAlerts -->|"if not muted"| Operator
Operator -->|"/mute-alerts <duration>"| DiscordInteraction
DiscordInteraction -->|"writes timestamp"| MuteSSM
classDef added fill:#c8e6c9,stroke:#2e7d32
classDef changed fill:#fff9c4,stroke:#f57f17
classDef unchanged fill:#eceff1,stroke:#546e7a
classDef external fill:#bbdefb,stroke:#1565c0
Behavior Contract
IngestWindowFunction (ingest_window.lambda_handler)
Replaces the silent-skip path at lines 218-235.
INPUT: SQS/EventBridge event with sessionId, userId, windowIndex, frameCount
1. Insert a WorldMMSegment row with processing_status="pending" (unchanged from current line 201).
   DLQ retries re-enter lambda_handler with the same payload, so this insert must be
   idempotent per sessionId + windowIndex (e.g. an upsert).
2. Resolve gpu_instance_id by READING SSM /encache/gpu/instance_id directly
   (NOT from os.environ). Reason: env vars are resolved once at deploy time via
   {{resolve:ssm}}, so the function keeps the deploy-time value (warm or cold)
   until the next deploy. A fresh SSM read ensures DLQConsumer sees the current
   GPU state on retry.
   - normalize_gpu_instance_id(ssm.get_parameter(...)["Parameter"]["Value"])
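3. If gpu_instance_id is None or the sentinel "none":
   log {"step": "gpu_unavailable_raised", "segment_id": segment_id, "gpu_sentinel": <raw SSM value>}
   raise GpuUnavailableError("SSM sentinel='none', no GPU provisioned")
4. Else: call VLM2VecClient(base_url=..., instance_id=gpu_instance_id).
   Bootstrap, caption, NER, triples (current behavior; the encoder handles
   bootstrap per PR #396).
   - On success: update_segment(processing_status="complete", caption=...).
   - On any exception: re-raise (do NOT mark "failed" inline).
5. Lambda destination config (MaximumRetryAttempts: 0, OnFailure: IngestDLQ)
   routes the raised exception to IngestDLQ automatically, as an invocation record
   whose requestPayload holds the original event. Destinations apply to
   asynchronous invocations (e.g. from EventBridge); failures of an SQS-triggered
   invocation follow the source queue's redrive policy instead.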
INVARIANTS:
- A segment row is NEVER marked "complete" without a real caption.
- A segment row is NEVER marked "failed" by lambda_handler directly.
- Every segment row eventually reaches "complete" or "failed" (via DLQConsumer), unless its DLQ message expires after the 14-day retention (see Data Model).
- gpu_instance_id is read fresh from SSM on each invocation (not from a cached env var).
Note on env var: the GPU_INSTANCE_ID env var (set via {{resolve:ssm:/encache/gpu/instance_id}}) holds the value resolved at deploy time, so it goes stale as soon as the GPU state in SSM changes, including on warm Lambda invocations. PR #391 patched env in-process for the chat path. Here we instead drop env reliance entirely: SSM GetParameter adds ~50ms but eliminates this class of staleness bug. IAM DatabaseSsmPolicy already grants read on /encache/gpu/*.
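Steps 2 and 3 above can be sketched in Python as follows. This is a minimal sketch, not the existing code: it assumes normalize_gpu_instance_id maps None, blank strings, and the "none" sentinel (case-insensitively) to None, and resolve_gpu_instance_id is a hypothetical helper. The SSM read is injected as a callable so the logic runs without AWS; in the Lambda it would wrap boto3's ssm.get_parameter(Name=...)["Parameter"]["Value"].

```python
from typing import Callable, Optional

GPU_PARAM = "/encache/gpu/instance_id"


class GpuUnavailableError(RuntimeError):
    """Raised when SSM reports no provisioned GPU."""


def normalize_gpu_instance_id(raw: Optional[str]) -> Optional[str]:
    # Assumed behavior: None, blank, and the "none" sentinel all mean "no GPU".
    if raw is None:
        return None
    value = raw.strip()
    if not value or value.lower() == "none":
        return None
    return value


def resolve_gpu_instance_id(get_param: Callable[[str], str]) -> str:
    """Read the GPU id fresh on every call; never consult os.environ."""
    raw = get_param(GPU_PARAM)
    instance_id = normalize_gpu_instance_id(raw)
    if instance_id is None:
        raise GpuUnavailableError(f"SSM sentinel={raw!r}, no GPU provisioned")
    return instance_id


if __name__ == "__main__":
    # In the Lambda, get_param might be:
    #   lambda name: ssm.get_parameter(Name=name)["Parameter"]["Value"]
    fake_ssm = {GPU_PARAM: "i-0123456789abcdef0"}
    print(resolve_gpu_instance_id(fake_ssm.__getitem__))
```

Raising instead of returning a flag keeps step 5 simple: any failure, whether "no GPU" or an encoder error, leaves the handler the same way and is routed to IngestDLQ.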
IngestDLQConsumer (ingest_window.sqs_handler)
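Replaces the current sqs_handler (lines 76-102).
INPUT: SQS event with batched DLQ messages
failures = []
For each record:
  receive_count = int(record["attributes"]["ApproximateReceiveCount"])
  payload = parse_envelope(record["body"])   # original event from the destination record
  segment_id = lookup_segment_id(payload)   # by sessionId + windowIndex
  log {"step": "dlq_received", "message_id": record["messageId"], "receive_count": receive_count, "segment_id": segment_id}
  IF receive_count > 5:
    update_segment(segment_id, processing_status="failed")
    log {"step": "dlq_exhausted", "segment_id": segment_id, "receive_count": receive_count}
    continue   # ACK: not reported as a failure, so Lambda deletes the message
  IF NOT _gpu_available():
    sqs.change_message_visibility(
      QueueUrl=DLQ_URL,   # IngestDLQ queue URL, e.g. from config
      ReceiptHandle=record["receiptHandle"],
      VisibilityTimeout=900,   # 15 min
    )
    log {"step": "dlq_visibility_extended", "message_id": record["messageId"], "extended_seconds": 900}
    failures.append({"itemIdentifier": record["messageId"]})   # stays on queue, hidden 15 min
    continue
  TRY:
    lambda_handler(payload, context)   # success → segment marked complete inside
  EXCEPT Exception:
    failures.append({"itemIdentifier": record["messageId"]})   # back to queue, will retry
OUTPUT: {"batchItemFailures": failures}   (requires ReportBatchItemFailures on the event source mapping)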
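HELPERS:
  _gpu_available():
    raw = ssm.get_parameter(Name="/encache/gpu/instance_id")["Parameter"]["Value"]
    instance_id = normalize_gpu_instance_id(raw)
    if not instance_id: return False
    reservations = ec2.describe_instances(InstanceIds=[instance_id])["Reservations"]
    state = reservations[0]["Instances"][0]["State"]["Name"]
    return state == "running"
INVARIANTS:
- A message received more than 5 times always ends with segment="failed" and is ack'd.
- A message processed while the GPU is unavailable always has its visibility extended before failure is returned.
- DLQConsumer never enriches inline; it only delegates to lambda_handler.
Note on retry budget: the GPU-down branch also consumes receives, since every receive increments ApproximateReceiveCount. At one receive per 15-minute visibility window, a message reaches receive_count = 6 and is marked "failed" after roughly 75 minutes of continuous GPU outage, without any enrichment attempt. Captions therefore recover automatically only for outages shorter than that.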
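A minimal, testable Python sketch of the sqs_handler loop above, assuming its side effects are injected through a hypothetical ConsumerDeps object. In production these callables would wrap the SSM/EC2 GPU check, sqs.change_message_visibility, update_segment, lookup_segment_id, and lambda_handler. parse_envelope here assumes the message is a Lambda destination record and falls back to the raw body otherwise.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

MAX_RECEIVES = 5
GPU_DOWN_BACKOFF_SECONDS = 900  # 15 minutes


@dataclass
class ConsumerDeps:
    """Hypothetical injection seam for the handler's side effects."""
    gpu_available: Callable[[], bool]
    extend_visibility: Callable[[str, int], None]  # (receipt_handle, seconds)
    mark_failed: Callable[[str], None]  # segment_id -> processing_status="failed"
    lookup_segment_id: Callable[[Dict[str, Any]], str]
    run_ingest: Callable[[Dict[str, Any]], None]  # stands in for lambda_handler
    log: Callable[[Dict[str, Any]], None] = print


def parse_envelope(body: str) -> Dict[str, Any]:
    # Destination records carry the original event under "requestPayload".
    message = json.loads(body)
    return message.get("requestPayload", message)


def sqs_handler(event: Dict[str, Any], deps: ConsumerDeps) -> Dict[str, List[Dict[str, str]]]:
    failures: List[Dict[str, str]] = []
    for record in event["Records"]:
        message_id = record["messageId"]
        receive_count = int(record["attributes"]["ApproximateReceiveCount"])
        payload = parse_envelope(record["body"])
        segment_id = deps.lookup_segment_id(payload)
        deps.log({"step": "dlq_received", "message_id": message_id,
                  "receive_count": receive_count, "segment_id": segment_id})

        if receive_count > MAX_RECEIVES:
            deps.mark_failed(segment_id)
            deps.log({"step": "dlq_exhausted", "segment_id": segment_id,
                      "receive_count": receive_count})
            continue  # not reported as failed, so Lambda deletes (acks) it

        if not deps.gpu_available():
            deps.extend_visibility(record["receiptHandle"], GPU_DOWN_BACKOFF_SECONDS)
            deps.log({"step": "dlq_visibility_extended", "message_id": message_id,
                      "extended_seconds": GPU_DOWN_BACKOFF_SECONDS})
            failures.append({"itemIdentifier": message_id})
            continue

        try:
            deps.run_ingest(payload)  # marks the segment complete on success
        except Exception:
            failures.append({"itemIdentifier": message_id})  # retry on a later receive
    return {"batchItemFailures": failures}
```

Keeping boto3 out of the loop lets the receive-count, GPU-down, and exception branches be unit-tested with plain fakes, in the spirit of the test_dlq_consumer_* cases in the Test Contract.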
CloudWatch Alarm
New resource in main/devops/main.tf.
resource "aws_cloudwatch_metric_alarm" "ingest_dlq_backlog" {
alarm_name = "encache-ingest-dlq-backlog"
alarm_description = "Ingest DLQ has >100 messages for 10min — likely GPU outage or systemic ingest failure."
namespace = "AWS/SQS"
metric_name = "ApproximateNumberOfMessagesVisible"
dimensions = { QueueName = "encache-ingest-dlq" }
statistic = "Maximum"
period = 300
evaluation_periods = 2
threshold = 100
comparison_operator = "GreaterThanThreshold"
treat_missing_data = "notBreaching"
alarm_actions = [aws_sns_topic.cost_alerts.arn]
ok_actions = [aws_sns_topic.cost_alerts.arn]
}
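Reuses the existing aws_sns_topic.cost_alerts (main.tf:663) → discord_alerts Lambda. Caveat: messages whose visibility the consumer has extended are in flight, so they count toward ApproximateNumberOfMessagesNotVisible rather than ApproximateNumberOfMessagesVisible. With the consumer actively polling during a GPU outage, the visible count can therefore sit well below the real backlog; alarming on the sum of both metrics (CloudWatch metric math) would track the full DLQ depth.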
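To make the alarm's breach condition concrete, here is a simplified Python model of how the Terraform settings combine: each datapoint is the 5-minute Maximum, the alarm breaches only when both of the last two periods are strictly above 100, and a missing datapoint counts as not breaching. This is an approximation for reasoning about thresholds; is_breaching is a hypothetical helper, and CloudWatch's real evaluation (datapoints-to-alarm, late data) is more involved.

```python
from typing import Optional, Sequence

THRESHOLD = 100          # threshold
EVALUATION_PERIODS = 2   # evaluation_periods (each period = 300 s)


def is_breaching(period_maxima: Sequence[Optional[float]]) -> bool:
    """Simplified model: every one of the last N periods must be > threshold.

    Each value is the per-period Maximum of the queue-depth metric; None marks a
    period with no data, which treat_missing_data = "notBreaching" treats as good.
    """
    window = list(period_maxima)[-EVALUATION_PERIODS:]
    if len(window) < EVALUATION_PERIODS:
        return False
    return all(value is not None and value > THRESHOLD for value in window)


print(is_breaching([40, 150, 130]))  # True: the last two periods are both above 100
```

Note that GreaterThanThreshold is strict, so a backlog holding at exactly 100 never pages.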
DiscordAlertsFunction (mute-aware)
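Modify main/devops/lambdas/discord_alerts/handler.py.
ON SNS event:
  TRY:
    muted_until = ssm.get_parameter(Name="/encache/alerts/dlq_muted_until")["Parameter"]["Value"]
  EXCEPT ssm.exceptions.ParameterNotFound:
    muted_until = "0"   # absent parameter: never muted
  IF int(muted_until) > now_unix():
    log {"step": "alert_suppressed", "muted_until": muted_until}
    return 200
  ELSE:
    POST to DISCORD_WEBHOOK_URL with formatted alarm message (existing behavior)
/encache/alerts/dlq_muted_until is a String SSM parameter holding a unix timestamp; when absent it is treated as "0" (never muted). Because discord_alerts handles every message on the cost_alerts topic, an active mute suppresses all alarms routed through that topic, not only encache-ingest-dlq-backlog. CloudWatch sends alarm notifications only on state changes, so an ALARM transition suppressed during a mute is not re-sent when the mute expires.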
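The mute check can be sketched as a pure function, assuming the SSM read is injected and a local ParameterNotFound class stands in for boto3's ssm.exceptions.ParameterNotFound. is_muted is a hypothetical name. It also treats an unparsable stored value as unmuted, an assumption that extends the plan's default-open rule.

```python
import time
from typing import Callable, Optional

MUTE_PARAM = "/encache/alerts/dlq_muted_until"


class ParameterNotFound(Exception):
    """Stand-in for boto3's ssm.exceptions.ParameterNotFound."""


def is_muted(get_param: Callable[[str], str], now: Optional[float] = None) -> bool:
    """True while the stored unix timestamp lies in the future."""
    now_unix = int(time.time() if now is None else now)
    try:
        muted_until = int(get_param(MUTE_PARAM))
    except (ParameterNotFound, ValueError):
        # Absent or unparsable value: fail open so alerts still reach Discord.
        return False
    return muted_until > now_unix
```

In the handler, a True result maps to the alert_suppressed log plus an early return 200; anything else falls through to the existing webhook POST.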
DiscordInteractionFunction (new commands)
Modify main/devops/lambdas/discord_interaction/handler.py (or equivalent).
Add two slash commands:
/mute-alerts [duration]
  duration: optional string, default "1d"
  accepted formats: <N>m, <N>h, <N>d (e.g. "30m", "4h", "12h", "1d", "2d")
  seconds = parse_duration(duration)   # invalid format → error response, no SSM write
  until = now_unix() + seconds
  ssm.put_parameter(
    Name="/encache/alerts/dlq_muted_until",
    Value=str(until),
    Type="String",
    Overwrite=True
  )
  log {"step": "alert_muted", "duration": duration, "muted_until": until}
  respond with: f"Alerts muted until <timestamp> ({duration})"
/unmute-alerts
  ssm.put_parameter(
    Name="/encache/alerts/dlq_muted_until",
    Value="0",
    Type="String",
    Overwrite=True
  )
  log {"step": "alert_unmuted"}
  respond with: "Alerts active"
parse_duration accepts ^(\d+)(m|h|d)$ and rejects any other format.
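A sketch of parse_duration and the /mute-alerts body, assuming the SSM write is injected as a callable that takes boto3's put_parameter keyword arguments. mute_alerts is a hypothetical name, and Discord request parsing and response formatting are omitted.

```python
import re
import time
from typing import Callable, Optional

MUTE_PARAM = "/encache/alerts/dlq_muted_until"
DEFAULT_DURATION = "1d"
_DURATION_RE = re.compile(r"(\d+)(m|h|d)")
_UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400}


def parse_duration(text: str) -> int:
    """Convert "30m" / "4h" / "1d" style strings to seconds; reject anything else."""
    match = _DURATION_RE.fullmatch(text)
    if match is None:
        raise ValueError(f"invalid duration {text!r}; expected <N>m, <N>h or <N>d")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


def mute_alerts(put_param: Callable[..., None], duration: Optional[str] = None,
                now: Optional[float] = None) -> str:
    """Hypothetical /mute-alerts body: validate first, then write the expiry timestamp."""
    duration = duration or DEFAULT_DURATION
    try:
        seconds = parse_duration(duration)
    except ValueError as err:
        return f"Error: {err}"  # invalid input never reaches SSM
    until = int(time.time() if now is None else now) + seconds
    put_param(Name=MUTE_PARAM, Value=str(until), Type="String", Overwrite=True)
    return f"Alerts muted until {until} ({duration})"
```

re.fullmatch is used instead of a ^...$ search because $ also matches just before a trailing newline, so "1d\n" would otherwise be accepted.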
Data Model
No schema changes. Reuses existing processing_status column. Status transitions:
| From | To | Trigger |
|---|---|---|
| (insert) | pending | lambda_handler line 201 |
| pending | complete | Successful enrichment |
| pending | failed | DLQConsumer detects receive_count > 5 |
Note: a pending row that never reaches complete or failed indicates either (a) DLQ retention expiry (14 days), in which case the segment is effectively orphaned, or (b) a drain still in progress. A reaper cron may be added later if the orphan rate is non-zero; it is out of scope for this plan.
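The transition table can be encoded as a small allowlist, for example as a hypothetical guard inside update_segment (the plan does not require one). None stands for a row that does not exist yet; check_transition is an illustrative name.

```python
from typing import Optional

# Allowed processing_status transitions from the Data Model table.
# None stands for "no row yet" (the initial insert).
ALLOWED_TRANSITIONS = {
    (None, "pending"),        # lambda_handler insert
    ("pending", "complete"),  # successful enrichment
    ("pending", "failed"),    # DLQConsumer: receive_count > 5
}


def check_transition(current: Optional[str], new: str) -> None:
    """Raise ValueError for any status change the table does not allow."""
    if (current, new) not in ALLOWED_TRANSITIONS:
        raise ValueError(f"illegal processing_status transition {current!r} -> {new!r}")
```

Plain strings are used rather than a str-based Enum because Enum members hash by name, so a set of Enum pairs would not match raw "pending" strings read from the database.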
SSM Parameters
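New: /encache/alerts/dlq_muted_until
- Type: String (unix timestamp as string)
- Default: not present → treated as "0"
- Written by: discord_interaction Lambda
- Read by: discord_alerts Lambda
IAM additions:
- discord_interaction role: add ssm:PutParameter on /encache/alerts/*
- discord_alerts role: add ssm:GetParameter on /encache/alerts/*
- IngestDLQConsumer role, if not already granted: ec2:DescribeInstances for _gpu_available() (this action does not support resource-level scoping, so Resource is "*") and sqs:ChangeMessageVisibility on IngestDLQ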
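The two SSM grants above, built as Python policy documents. This assumes the standard aws partition, and the region and account ID in the example are placeholders; alerts_param_arn and alert_role_policies are hypothetical helpers. The detail worth noting is the ARN shape: for a parameter name that starts with a slash, the ARN is ...:parameter/encache/alerts/*, without a doubled slash.

```python
import json
from typing import Any, Dict


def alerts_param_arn(region: str, account_id: str, partition: str = "aws") -> str:
    # Names starting with "/" map to ".../parameter/<name without leading slash>".
    return f"arn:{partition}:ssm:{region}:{account_id}:parameter/encache/alerts/*"


def _allow(action: str, resource: str) -> Dict[str, Any]:
    return {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": [action], "Resource": resource}],
    }


def alert_role_policies(region: str, account_id: str) -> Dict[str, Dict[str, Any]]:
    arn = alerts_param_arn(region, account_id)
    return {
        "discord_interaction": _allow("ssm:PutParameter", arn),
        "discord_alerts": _allow("ssm:GetParameter", arn),
    }


if __name__ == "__main__":
    # Placeholder region/account for illustration only.
    print(json.dumps(alert_role_policies("us-east-1", "123456789012"), indent=2))
```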
Logs
All new log lines use create_logger({"flow": "ingest_window"}) in ingest_window.py, and the matching flow name ({"flow": "discord_alerts"}, {"flow": "discord_interaction"}) in the Discord Lambdas.
| Step | Where | Fields |
|---|---|---|
| gpu_unavailable_raised | lambda_handler before raise | segment_id, gpu_sentinel |
| dlq_received | sqs_handler per record | message_id, receive_count, segment_id |
| dlq_visibility_extended | sqs_handler GPU-down branch | message_id, extended_seconds |
| dlq_exhausted | sqs_handler receive_count > 5 | segment_id, receive_count |
| alert_suppressed | discord_alerts mute branch | muted_until |
| alert_muted | discord_interaction /mute-alerts | duration, muted_until |
| alert_unmuted | discord_interaction /unmute-alerts | (none) |
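The project's create_logger signature is not shown in this plan, so here is a hypothetical standard-library stand-in that emits one JSON object per step with the base {"flow": ...} fields merged in, matching the shape of the table's rows. StepLogger and its methods are illustrative names only.

```python
import json
import logging
from typing import Any, Dict


class StepLogger:
    """Hypothetical stand-in for create_logger: binds base fields such as {"flow": ...}."""

    def __init__(self, base: Dict[str, Any]):
        self._base = dict(base)
        self._logger = logging.getLogger(str(base.get("flow", "app")))

    def render(self, step: str, **fields: Any) -> str:
        # One JSON object per line: base fields, then the step name, then extras.
        return json.dumps({**self._base, "step": step, **fields}, sort_keys=True)

    def step(self, step: str, **fields: Any) -> None:
        self._logger.info(self.render(step, **fields))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    log = StepLogger({"flow": "ingest_window"})
    log.step("dlq_visibility_extended", message_id="m-1", extended_seconds=900)
```

One JSON object per line keeps each step filterable in CloudWatch Logs Insights by flow and step.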
Deployment
Order matters. Deploy in this sequence to avoid a silent-fail regression during rollout:
1. Add the CloudWatch alarm and the IAM SSM parameter permissions (terraform apply).
2. Update the discord_alerts Lambda to read the mute SSM parameter (deploy before the slash commands; safe even if the parameter is absent).
3. Add the /mute-alerts and /unmute-alerts slash commands to the discord_interaction Lambda.
4. Update IngestDLQConsumer (sqs_handler) with bounded retries and visibility extension.
5. Last: update IngestWindowFunction (lambda_handler) to raise instead of silently skipping.
Step 5 goes last because, until step 4 is live, the exceptions it raises would route to the OLD sqs_handler, which calls lambda_handler immediately on consume. That handler would loop fast against a downed GPU, burning an invocation each time the message's visibility timeout lapses. Step 4 must be live first so the consumer extends visibility properly.
Cost Implications
- Drain rate: SQS visibility extension caps re-consumption at ~4×/hour per message. With 2,880 stuck messages from a 24h outage, that is ≈ 11,520 re-consumes/hour until the GPU returns. Each consume invokes Lambda (~0.5s, 2GB) plus one ec2.describe_instances call for the gating check. Estimated burn: ~$0.20/hr in Lambda compute plus minimal SQS request charges, so a full 24h outage costs ~$5. This is an upper bound: with the receive_count > 5 cutoff, each message is consumed at most six times before it is marked failed. Acceptable for the current user-base size; revisit if scale grows.
- Optimization (deferred): the SQS event source mapping's MaximumConcurrency and a BatchSize > 1 could amortize the GPU-down check across messages. Out of scope.
- After the GPU returns: each message is processed once (assuming the first try succeeds). No extra cost vs. the current architecture.
- SQS message retention: 14 days (existing). After that, unprocessed messages are dropped and the segment row stays at pending. The operator must reap manually if it matters.
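The drain-rate arithmetic, worked in Python. The Lambda prices are assumptions (typical x86 on-demand list prices per GB-second and per request) and should be checked against current pricing; SQS and EC2 API charges are ignored, and every stuck message is assumed to be re-consumed four times per hour. hourly_drain_cost is a hypothetical helper.

```python
# Assumed prices: check current AWS Lambda pricing for your region/architecture.
PRICE_PER_GB_SECOND = 0.0000166667
PRICE_PER_REQUEST = 0.20 / 1_000_000

STUCK_MESSAGES = 2_880
RECEIVES_PER_HOUR = 60 / 15  # one receive per 15-minute visibility extension
DURATION_SECONDS = 0.5
MEMORY_GB = 2.0


def hourly_drain_cost(messages: int = STUCK_MESSAGES) -> dict:
    invocations = messages * RECEIVES_PER_HOUR
    compute = invocations * DURATION_SECONDS * MEMORY_GB * PRICE_PER_GB_SECOND
    requests = invocations * PRICE_PER_REQUEST
    return {"invocations": invocations, "compute": compute, "total": compute + requests}


hour = hourly_drain_cost()
print(f"{hour['invocations']:.0f} invocations/hr, "
      f"~${hour['total']:.2f}/hr, ~${24 * hour['total']:.2f} per 24h")
```

Under these assumptions the result lands close to the ~$0.20/hr and ~$5 figures above; the per-GB-second compute term dominates the per-request term by roughly two orders of magnitude.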
Test Contract
Unit tests (server-side)
- test_ingest_window_gpu_unavailable_raises: Given SSM /encache/gpu/instance_id holds "none" (or is empty), lambda_handler raises GpuUnavailableError AND the segment row exists with processing_status="pending".
- test_ingest_window_encoder_exception_propagates: When VLM2VecClient.caption raises (e.g., a 500 from the GPU), lambda_handler re-raises. The segment row stays at pending.
- test_dlq_consumer_gpu_down_extends_visibility: Given the GPU SSM parameter returns "none", sqs_handler calls change_message_visibility with 900s and returns the message in batchItemFailures.
- test_dlq_consumer_gpu_up_invokes_handler: Given the GPU is running, sqs_handler calls lambda_handler with the payload.
- test_dlq_consumer_marks_failed_after_5_receives: With ApproximateReceiveCount=6, sqs_handler calls update_segment(processing_status="failed") and acks the message.
Unit tests (discord_alerts)
- test_alert_suppressed_when_muted: SSM param holds a future timestamp → handler returns 200 without an HTTP POST.
- test_alert_fires_when_unmuted: SSM param is "0" → handler POSTs to the webhook.
- test_alert_fires_when_param_missing: SSM raises ParameterNotFound → handler POSTs (default-open).
Unit tests (discord_interaction)
- test_mute_alerts_default_duration: /mute-alerts with no argument writes now + 86400 to SSM.
- test_mute_alerts_custom_duration: /mute-alerts 4h writes now + 14400.
- test_mute_alerts_invalid_duration: /mute-alerts banana returns an error response and does not write SSM.
- test_unmute_alerts_writes_zero: /unmute-alerts writes "0" to SSM.
Integration / smoke
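- Manual: trigger ingest with the /encache/gpu/instance_id SSM parameter forced to "none". Verify: segment row at pending, message in DLQ, no Discord alert until the backlog exceeds 100 messages for 10 minutes, and the alert fires once that threshold is breached.
- Manual: /mute-alerts 1h → trigger the alarm condition → no Discord ping. After 1h, let the alarm return to OK and breach again → the alert fires. (CloudWatch notifies only on state changes, so an alarm that stays in ALARM is not re-sent when the mute expires.)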
Out of Scope (deferred)
- Reaper cron for orphaned pending rows (post-14d DLQ expiry).
- Per-segment retry telemetry (counts, duration histograms).
- Alarming on GPU CloudTrail event drought (the original reviewer suggestion).
- Migrating the cost_alerts SNS topic to a dedicated system_alerts topic for clarity.
Open Questions
None at design time. Re-open if the post-deploy orphan rate is non-zero, or if the alert false-positive rate is high enough to require precision (which would push toward a pending_gpu DB column design instead).