Plan: DLQ GPU Auto-Start
Date: 2026-05-11
Status: Draft
Feature: When a segment lands in the IngestDLQ and the GPU EC2 instance is not running, automatically attempt to start it.
System Intent
Problem
When the GPU EC2 instance is stopped and a segment fails ingest, the message lands in the IngestDLQ. IngestDLQConsumer (sqs_handler) detects _gpu_available() == False, extends message visibility by 900 s, and parks the message. The GPU is never started — it must be started manually before the parked messages drain.
This creates unbounded queue backlog during GPU outages and requires operator intervention for what is otherwise a recoverable state.
Goal
When sqs_handler determines the GPU is unavailable, attempt to start it before extending visibility. The start attempt is fire-and-forget: it must not block record processing, must not raise on any error, and must log every action taken.
Systems Involved
| System | Role |
|---|---|
| IngestDLQConsumer Lambda | Entry point; sqs_handler in ingest_window.py |
| AWS SSM /encache/gpu/instance_id | Stores instance ID or sentinel "none" |
| AWS EC2 describe_instances | Used to check instance state |
| AWS EC2 start_instances | Used to start a stopped instance |
| EC2 tag Name=encache-gpu-worker | Fallback scan when SSM holds "none" or stale value |
| EC2Policy IAM Managed Policy | Already attached to IngestWindowFunction; must also be attached to IngestDLQConsumer |
| test_ingest_dlq.py | Existing unit tests; new tests are added here |
Constraints
- The auto-start call must be wrapped in try/except Exception and must never propagate an exception to sqs_handler.
- It is fire-and-forget: we do not wait for the instance to reach the running state.
- After the start attempt, the existing visibility-extend + batchItemFailures path continues unchanged.
- The function must be added as a module-level helper so it can be patched in tests without mocking boto3 directly.
Stage Gate Tracker
| Gate | Status |
|---|---|
| System intent approved | Pending |
| Mermaid diagram approved | Pending |
| Flow: GPU auto-start on DLQ approved | Pending |
| Implementation complete | Pending |
| Tests passing | Pending |
Mermaid Diagram
sequenceDiagram
participant SQS as IngestDLQ (SQS)
participant Consumer as IngestDLQConsumer<br/>(sqs_handler)
participant GPUCheck as _gpu_available()
participant SSM as SSM<br/>/encache/gpu/instance_id
participant EC2 as EC2 API
participant Handler as lambda_handler
SQS->>Consumer: SQS record (segment payload)
Consumer->>GPUCheck: _gpu_available()?
GPUCheck->>SSM: get_parameter(instance_id)
SSM-->>GPUCheck: instance_id or "none"
GPUCheck->>EC2: describe_instances
EC2-->>GPUCheck: state
alt GPU running
GPUCheck-->>Consumer: True
Consumer->>Handler: lambda_handler(payload)
Handler-->>Consumer: ok / raises
else GPU unavailable
GPUCheck-->>Consumer: False
Consumer->>Consumer: _try_start_gpu() [fire-and-forget]
note over Consumer,EC2: Wrapped in try/except — never raises
Consumer->>SSM: get_parameter(instance_id)
SSM-->>Consumer: instance_id or "none"
alt instance_id is "none"
Consumer->>EC2: describe_instances(filter: tag Name=encache-gpu-worker, state=stopped)
EC2-->>Consumer: instance list
end
Consumer->>EC2: start_instances(instance_id)
EC2-->>Consumer: StartingInstances response
Consumer->>Consumer: log: gpu_autostart_attempted
Consumer->>SQS: change_message_visibility(900s)
Consumer-->>SQS: batchItemFailures=[messageId]
end
Flows
Flow: GPU auto-start on DLQ
Trigger: sqs_handler processes a DLQ record and _gpu_available() returns False.
Preconditions:
- INGEST_DLQ_URL env var is set (validated at function startup).
- IngestDLQConsumer has EC2Policy attached (see Files to Change).
Input contract
| Parameter | Source | Type | Notes |
|---|---|---|---|
| SQS record | Lambda event Records[n] | dict | Body is JSON-encoded ingest payload |
| receiptHandle | record["receiptHandle"] | str | Required for change_message_visibility |
| SSM /encache/gpu/instance_id | boto3 SSM read | str | May be "none" sentinel |
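The record fields in the input contract can be pulled out in one place before any GPU check runs. The following is a minimal sketch, not the existing sqs_handler code: DlqRecord and parse_dlq_record are hypothetical names introduced here for illustration, and the default receive count of 1 is an assumption.

```python
import json
from typing import Any, NamedTuple


class DlqRecord(NamedTuple):
    """Hypothetical container for the fields read from one SQS record."""
    message_id: str
    receipt_handle: str
    receive_count: int
    payload: dict[str, Any]


def parse_dlq_record(record: dict[str, Any]) -> DlqRecord:
    """Extract the fields listed in the input contract from one SQS record."""
    attributes = record.get("attributes", {})
    return DlqRecord(
        message_id=record["messageId"],
        receipt_handle=record["receiptHandle"],
        # SQS delivers ApproximateReceiveCount as a string; default is assumed.
        receive_count=int(attributes.get("ApproximateReceiveCount", "1")),
        # The body is the JSON-encoded ingest payload.
        payload=json.loads(record["body"]),
    )


record = {
    "messageId": "m1",
    "receiptHandle": "r1",
    "attributes": {"ApproximateReceiveCount": "2"},
    "body": json.dumps({"sessionId": "s1", "windowIndex": 0}),
}
parsed = parse_dlq_record(record)
```

Indexing receiptHandle directly (rather than using .get) fails fast on a malformed record, which may or may not match what the real handler wants.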
Output contract
| Outcome | Behaviour |
|---|---|
| Start attempted, instance found via SSM | Logs gpu_autostart_attempted with instance_id, source="ssm" |
| Start attempted, instance found via EC2 tag scan | Logs gpu_autostart_attempted with instance_id, source="tag_scan" |
| No instance found (SSM=none, no stopped instance by tag) | Logs gpu_autostart_no_instance_found |
| Any exception during start attempt | Caught; logs gpu_autostart_failed with error; execution continues |
| After all branches | change_message_visibility(900s), message added to batchItemFailures |
Pseudocode
def _try_start_gpu() -> None:
    """Attempt to start the GPU EC2 instance. Fire-and-forget: never raises."""
    region = os.environ.get("AWS_REGION", "us-east-1")
    try:
        # Step 1: resolve instance ID
        instance_id = _read_gpu_instance_id()  # returns None when SSM holds "none"
        source = "ssm"
        if not instance_id:
            # SSM holds sentinel "none": scan EC2 for a stopped instance by tag
            ec2_scan = boto3.client("ec2", region_name=region)
            desc = ec2_scan.describe_instances(
                Filters=[
                    {"Name": "tag:Name", "Values": ["encache-gpu-worker"]},
                    {"Name": "instance-state-name", "Values": ["stopped"]},
                ]
            )
            for reservation in desc.get("Reservations", []):
                for inst in reservation.get("Instances", []):
                    instance_id = inst["InstanceId"]
                    source = "tag_scan"
                    break
                if instance_id:
                    break
        if not instance_id:
            logger({
                "step": "gpu_autostart_no_instance_found",
                "additional": {},
            })
            return
        # Step 2: start the instance
        ec2 = boto3.client("ec2", region_name=region)
        ec2.start_instances(InstanceIds=[instance_id])
        logger({
            "step": "gpu_autostart_attempted",
            "additional": {"instance_id": instance_id, "source": source},
        })
    except Exception as exc:
        logger({
            "step": "gpu_autostart_failed",
            "additional": {"error": str(exc)},
        })
# --- Inside sqs_handler, in the gpu_unavailable branch ---
if not _gpu_available():
    _try_start_gpu()  # fire-and-forget, wrapped in try/except inside
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=record.get("receiptHandle"),
        VisibilityTimeout=900,
    )
    logger({
        "step": "dlq_visibility_extended",
        "additional": {
            "message_id": message_id,
            "extended_seconds": 900,
            "receive_count": receive_count,
            "reason": "gpu_unavailable",
        },
    })
    failures.append({"itemIdentifier": message_id})
    continue
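The nested loop in the tag-scan step only needs the first matching instance, so it could be flattened into a single generator expression. This is a sketch under that assumption; first_instance_id is a hypothetical helper name, and the input is a plain dict shaped like a describe_instances response.

```python
from typing import Any, Optional


def first_instance_id(describe_response: dict[str, Any]) -> Optional[str]:
    """Return the first InstanceId in a describe_instances-style response, or None."""
    return next(
        (
            inst["InstanceId"]
            for reservation in describe_response.get("Reservations", [])
            for inst in reservation.get("Instances", [])
        ),
        None,
    )


# A reservation with no instances is skipped; the first real instance wins.
response = {
    "Reservations": [
        {"Instances": []},
        {"Instances": [{"InstanceId": "i-tagfound"}, {"InstanceId": "i-other"}]},
    ]
}
found = first_instance_id(response)
missing = first_instance_id({"Reservations": []})
```

A pure function like this can be unit-tested without patching boto3, which may be a small convenience on top of the module-level helper constraint.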
Acceptance Criteria and Test Cases
All tests live in main/server/worldmm/tests/test_ingest_dlq.py.
AC-1: Auto-start called when GPU unavailable and SSM holds a real instance ID
Given _gpu_available() returns False, SSM returns i-abc123 (stopped), and ec2.start_instances succeeds. Then _try_start_gpu calls ec2.start_instances(InstanceIds=["i-abc123"]) exactly once and logs gpu_autostart_attempted with source="ssm".
def test_try_start_gpu_uses_ssm_instance_id():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value="i-abc123"),
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()
    mock_ec2.start_instances.assert_called_once_with(InstanceIds=["i-abc123"])
AC-2: Auto-start falls back to tag scan when SSM holds sentinel "none"
Given _read_gpu_instance_id() returns None and the EC2 tag scan finds one stopped instance i-tagfound. Then start_instances is called with i-tagfound and gpu_autostart_attempted is logged with source="tag_scan".
def test_try_start_gpu_tag_scan_fallback():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    mock_ec2.describe_instances.return_value = {
        "Reservations": [{"Instances": [{"InstanceId": "i-tagfound"}]}]
    }
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value=None),
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()
    mock_ec2.start_instances.assert_called_once_with(InstanceIds=["i-tagfound"])
AC-3: No start attempt when SSM is "none" and no stopped instance found by tag
Given _read_gpu_instance_id() returns None, EC2 tag scan returns empty. Then start_instances is never called.
def test_try_start_gpu_no_instance_found():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    mock_ec2.describe_instances.return_value = {"Reservations": []}
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value=None),
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()
    mock_ec2.start_instances.assert_not_called()
AC-4: Exception during start_instances does not propagate to sqs_handler
Given ec2.start_instances raises an exception (for example, a ClientError). Then _try_start_gpu returns normally; sqs_handler still extends visibility and returns batchItemFailures.
def test_try_start_gpu_exception_does_not_propagate():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    mock_ec2.start_instances.side_effect = Exception("access denied")
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value="i-abc"),
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()  # must not raise
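The gpu_autostart_failed log records only str(exc). If the AWS error code turns out to be useful for triage, it could be read from the exception without importing botocore in the helper. The sketch below assumes the exception exposes a response dict in the way botocore's ClientError does; error_code and FakeClientError are hypothetical names used only for illustration, and logging the code is not part of the plan as written.

```python
from typing import Optional


def error_code(exc: BaseException) -> Optional[str]:
    """Return the AWS error code carried by a botocore-style exception, if any."""
    response = getattr(exc, "response", None)
    if isinstance(response, dict):
        return response.get("Error", {}).get("Code")
    # Plain exceptions carry no structured error code.
    return None


class FakeClientError(Exception):
    """Stand-in that mimics the .response attribute of botocore's ClientError."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


denied = error_code(FakeClientError("UnauthorizedOperation"))
plain = error_code(Exception("access denied"))
```

Duck-typing on the response attribute keeps the helper usable in tests that raise a bare Exception, as AC-4 does.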
AC-5: sqs_handler calls _try_start_gpu when GPU is down, then extends visibility
Given _gpu_available() returns False. Then sqs_handler calls _try_start_gpu() once before calling change_message_visibility(900).
def test_sqs_handler_calls_try_start_gpu_when_gpu_down():
    from worldmm.pipeline.ingest_window import sqs_handler
    payload = {"sessionId": "s1", "userId": "u1", "windowIndex": 0, "frameCount": 30}
    sqs_event = {"Records": [{"messageId": "m1", "receiptHandle": "r1",
                              "attributes": {"ApproximateReceiveCount": "1"},
                              "body": json.dumps(payload)}]}
    mock_sqs = MagicMock()
    with (
        patch("worldmm.pipeline.ingest_window._gpu_available", return_value=False),
        patch("worldmm.pipeline.ingest_window._try_start_gpu") as mock_start,
        patch("boto3.client", return_value=mock_sqs),
        patch.dict(os.environ, {"INGEST_DLQ_URL": "https://sqs.../queue"}),
    ):
        result = sqs_handler(sqs_event, None)
    mock_start.assert_called_once()
    mock_sqs.change_message_visibility.assert_called_once_with(
        QueueUrl="https://sqs.../queue", ReceiptHandle="r1", VisibilityTimeout=900
    )
    assert result["batchItemFailures"][0]["itemIdentifier"] == "m1"
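AC-5 requires the start attempt to happen before the visibility extension, but separate assert_called_once checks do not verify ordering. One way to check it, sketched here with unittest.mock only, is to attach both mocks to a shared parent and compare its mock_calls. handle_gpu_down is a hypothetical stand-in for the gpu_unavailable branch, not the real sqs_handler, and the queue URL is a placeholder.

```python
from unittest.mock import MagicMock, call


def handle_gpu_down(try_start_gpu, sqs, queue_url: str, receipt_handle: str) -> None:
    """Stand-in for the gpu_unavailable branch: start attempt, then extend visibility."""
    try_start_gpu()
    sqs.change_message_visibility(
        QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=900
    )


# Attaching both mocks to one parent records their calls in a single ordered list.
parent = MagicMock()
mock_start = MagicMock()
mock_sqs = MagicMock()
parent.attach_mock(mock_start, "start")
parent.attach_mock(mock_sqs, "sqs")

handle_gpu_down(mock_start, mock_sqs, "https://example.invalid/queue", "r1")

expected_calls = [
    call.start(),
    call.sqs.change_message_visibility(
        QueueUrl="https://example.invalid/queue",
        ReceiptHandle="r1",
        VisibilityTimeout=900,
    ),
]
order_ok = parent.mock_calls == expected_calls
```

In the real test the same idea would apply to the mocks returned by patch(...), attached to a parent before sqs_handler is invoked.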
AC-6: sqs_handler does NOT call _try_start_gpu when GPU is running
Given _gpu_available() returns True. Then _try_start_gpu is not called.
def test_sqs_handler_does_not_call_try_start_gpu_when_gpu_up():
    from worldmm.pipeline.ingest_window import sqs_handler
    payload = {"sessionId": "s1", "userId": "u1", "windowIndex": 0, "frameCount": 0}
    sqs_event = {"Records": [{"messageId": "m1",
                              "attributes": {"ApproximateReceiveCount": "1"},
                              "body": json.dumps(payload)}]}
    with (
        patch("worldmm.pipeline.ingest_window._gpu_available", return_value=True),
        patch("worldmm.pipeline.ingest_window.lambda_handler", return_value={"status": "ok"}),
        patch("worldmm.pipeline.ingest_window._try_start_gpu") as mock_start,
    ):
        sqs_handler(sqs_event, None)
    mock_start.assert_not_called()
Files to Change
| File | Change |
|---|---|
| main/server/worldmm/pipeline/ingest_window.py | Add _try_start_gpu() helper; call it in sqs_handler in the not _gpu_available() branch before change_message_visibility. |
| main/server/template.yaml | Add !Ref EC2Policy to the Policies list of IngestDLQConsumer. (EC2Policy already covers ec2:StartInstances, ec2:DescribeInstances; it is currently attached only to IngestWindowFunction.) |
| main/server/worldmm/tests/test_ingest_dlq.py | Add test cases AC-1 through AC-6 listed above. |
template.yaml diff (key section)
IngestDLQConsumer:
  Type: AWS::Serverless::Function
  Properties:
    ...
    Policies:
      - !Ref S3AccessPolicy
      - !Ref DatabaseSsmPolicy
      - !Ref EC2Policy  # <-- ADD THIS LINE
      - Statement:
          - Effect: Allow
            Action:
              - sqs:ChangeMessageVisibility
            Resource:
              - !GetAtt IngestDLQ.Arn
Notes and Edge Cases
- Already starting/pending state: If the instance is already in pending state, start_instances is expected to be a no-op that returns without error. Other cases can raise: an instance that is still stopping typically fails with IncorrectInstanceState, and a stale instance ID fails with InvalidInstanceID.NotFound. The fire-and-forget wrapper absorbs all of these.
- IAM: EC2Policy already contains ec2:StartInstances and ec2:DescribeInstances with Resource: "*". Attaching it to IngestDLQConsumer is sufficient; no new IAM policy is needed.
- No SSM write: _try_start_gpu does not call _update_ssm_gpu_instance_id. The instance ID found by tag scan is used only for start_instances; SSM is updated by the existing _resolve_gpu_url / _find_running_gpu_instance_by_tag logic on the next successful ingest invocation.
- Message cadence: The 900s visibility timeout means the DLQ consumer re-checks GPU availability every 15 minutes. A typical GPU cold start takes ~3–5 min, so the instance should be running by the time the message is re-delivered, and the message is then processed normally. If the instance takes longer than 15 min to boot, the message is parked again and retried on the following cycle.
- No duplicate starts: Multiple queued messages may each trigger _try_start_gpu. Because BatchSize: 1 is set on the DLQ trigger, each record is a separate invocation, so at most one start attempt fires per Lambda execution. Even if multiple invocations run concurrently, start_instances on an already-starting instance is idempotent (EC2 ignores the request).
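The message-cadence note can be made concrete with a little arithmetic. The sketch below is a simplified model, not a measurement: it assumes the start attempt happens at t=0, the message is re-delivered at each multiple of the visibility timeout, and the GPU counts as available as soon as boot completes. redelivery_cycles is a hypothetical helper name, and the 20-minute boot is an invented slow case.

```python
import math

VISIBILITY_TIMEOUT_S = 900  # visibility extension used by the plan


def redelivery_cycles(boot_seconds: float, visibility_s: int = VISIBILITY_TIMEOUT_S) -> int:
    """Number of visibility windows that pass before a re-delivery finds the GPU up.

    Simplified model: start attempt at t=0, re-delivery at each multiple of
    visibility_s, GPU usable as soon as boot completes.
    """
    return max(1, math.ceil(boot_seconds / visibility_s))


typical = redelivery_cycles(5 * 60)   # 5-minute cold start
slow = redelivery_cycles(20 * 60)     # hypothetical 20-minute boot
typical_wait_s = typical * VISIBILITY_TIMEOUT_S
```

Under this model a 5-minute cold start still costs a full 15-minute wait, because the message stays invisible until the timeout expires regardless of how quickly the instance comes up.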