
Plan: DLQ GPU Auto-Start

Date: 2026-05-11
Status: Draft
Feature: When a segment lands in the IngestDLQ and the GPU EC2 instance is not running, automatically attempt to start it.


System Intent

Problem

When the GPU EC2 instance is stopped and a segment fails ingest, the message lands in the IngestDLQ. IngestDLQConsumer (sqs_handler) sees _gpu_available() return False, extends the message's visibility timeout to 900 s, and parks it. Nothing ever starts the GPU: an operator must start it manually before the parked messages can drain.

This creates unbounded queue backlog during GPU outages and requires operator intervention for what is otherwise a recoverable state.

Goal

When sqs_handler determines the GPU is unavailable, attempt to start it before extending visibility. The start attempt is fire-and-forget: it must not wait for the instance to boot, must not raise on any error, and must log every action taken.

Systems Involved

| System | Role |
|---|---|
| IngestDLQConsumer Lambda | Entry point; sqs_handler in ingest_window.py |
| AWS SSM /encache/gpu/instance_id | Stores instance ID or sentinel "none" |
| AWS EC2 describe_instances | Used to check instance state |
| AWS EC2 start_instances | Used to start a stopped instance |
| EC2 tag Name=encache-gpu-worker | Fallback scan when SSM holds the "none" sentinel |
| EC2Policy IAM Managed Policy | Already attached to IngestWindowFunction; must also be attached to IngestDLQConsumer |
| test_ingest_dlq.py | Existing unit tests; new tests are added here |

Constraints

  • The auto-start call must be wrapped in try/except Exception and must never propagate an exception to sqs_handler.
  • It is fire-and-forget: we do not wait for the instance to reach running state.
  • After the start attempt, the existing visibility-extend + batchItemFailures path continues unchanged.
  • The function must be added as a module-level helper so it can be patched in tests without mocking boto3 directly.
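The patching constraint works because Python resolves a module-level name each time the calling function runs: sqs_handler looks up _try_start_gpu in its module globals at call time, so replacing that module attribute swaps the call for every caller. The sketch below demonstrates this with an in-memory stand-in module; fake_ingest_window, handle, and the RuntimeError body are hypothetical and only mimic the shape of the real branch.

```python
import types
from unittest.mock import patch

# Hypothetical stand-in for worldmm.pipeline.ingest_window, built in memory.
SOURCE = '''
def _try_start_gpu():
    raise RuntimeError("would call AWS")

def handle(gpu_up):
    # Mirrors the sqs_handler branch: attempt a start only when the GPU is down.
    if not gpu_up:
        _try_start_gpu()
    return "processed" if gpu_up else "parked"
'''
fake = types.ModuleType("fake_ingest_window")
exec(SOURCE, fake.__dict__)

# Patching the module attribute replaces the helper for every caller in that
# module, so no boto3 client has to be mocked.
with patch.object(fake, "_try_start_gpu") as mock_start:
    result = fake.handle(gpu_up=False)

mock_start.assert_called_once()
print(result)  # parked
```

The same mechanism is why the tests patch worldmm.pipeline.ingest_window._try_start_gpu by its module path rather than patching boto3.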

Stage Gate Tracker

| Gate | Status |
|---|---|
| System intent approved | Pending |
| Mermaid diagram approved | Pending |
| Flow: GPU auto-start on DLQ approved | Pending |
| Implementation complete | Pending |
| Tests passing | Pending |

Mermaid Diagram

sequenceDiagram
    participant SQS as IngestDLQ (SQS)
    participant Consumer as IngestDLQConsumer<br/>(sqs_handler)
    participant GPUCheck as _gpu_available()
    participant SSM as SSM<br/>/encache/gpu/instance_id
    participant EC2 as EC2 API
    participant Handler as lambda_handler

    SQS->>Consumer: SQS record (segment payload)
    Consumer->>GPUCheck: _gpu_available()?
    GPUCheck->>SSM: get_parameter(instance_id)
    SSM-->>GPUCheck: instance_id or "none"
    GPUCheck->>EC2: describe_instances
    EC2-->>GPUCheck: state

    alt GPU running
        GPUCheck-->>Consumer: True
        Consumer->>Handler: lambda_handler(payload)
        Handler-->>Consumer: ok / raises
    else GPU unavailable
        GPUCheck-->>Consumer: False
        Consumer->>Consumer: _try_start_gpu() [fire-and-forget]
        note over Consumer,EC2: Wrapped in try/except, never raises
        Consumer->>SSM: get_parameter(instance_id)
        SSM-->>Consumer: instance_id or "none"
        alt instance_id is "none"
            Consumer->>EC2: describe_instances(filter: tag Name=encache-gpu-worker, state=stopped)
            EC2-->>Consumer: instance list
        end
        Consumer->>EC2: start_instances(instance_id)
        EC2-->>Consumer: StartingInstances response
        Consumer->>Consumer: log: gpu_autostart_attempted
        Consumer->>SQS: change_message_visibility(900s)
        Consumer-->>SQS: batchItemFailures=[messageId]
    end

Flows

Flow: GPU auto-start on DLQ

Trigger: sqs_handler processes a DLQ record and _gpu_available() returns False.

Preconditions:

  • INGEST_DLQ_URL env var is set (validated at function startup).
  • IngestDLQConsumer has EC2Policy attached (see Files to Change).

Input contract

| Parameter | Source | Type | Notes |
|---|---|---|---|
| SQS record | Lambda event Records[n] | dict | Body is JSON-encoded ingest payload |
| receiptHandle | record["receiptHandle"] | str | Required for change_message_visibility |
| SSM /encache/gpu/instance_id | boto3 SSM read | str | May be "none" sentinel |
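The record-level fields in the input contract can be pulled out of one SQS record as sketched below. DlqRecord and parse_dlq_record are hypothetical names for illustration; the real sqs_handler may read these fields inline. The field names (messageId, receiptHandle, attributes.ApproximateReceiveCount, body) are the standard SQS event keys the tests already use.

```python
import json
from typing import Any, Dict, NamedTuple


class DlqRecord(NamedTuple):
    message_id: str
    receipt_handle: str
    receive_count: int
    payload: Dict[str, Any]


def parse_dlq_record(record: Dict[str, Any]) -> DlqRecord:
    """Hypothetical helper: extract the fields sqs_handler needs from one record."""
    return DlqRecord(
        message_id=record["messageId"],
        # Required for change_message_visibility; a KeyError means a malformed event.
        receipt_handle=record["receiptHandle"],
        # SQS delivers ApproximateReceiveCount as a string.
        receive_count=int(record.get("attributes", {}).get("ApproximateReceiveCount", "0")),
        # The body is the JSON-encoded ingest payload.
        payload=json.loads(record["body"]),
    )


record = {
    "messageId": "m1",
    "receiptHandle": "r1",
    "attributes": {"ApproximateReceiveCount": "2"},
    "body": json.dumps({"sessionId": "s1", "windowIndex": 0}),
}
parsed = parse_dlq_record(record)
print(parsed.receive_count)  # 2
```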

Output contract

| Outcome | Behaviour |
|---|---|
| Start attempted, instance found via SSM | Logs gpu_autostart_attempted with instance_id, source="ssm" |
| Start attempted, instance found via EC2 tag scan | Logs gpu_autostart_attempted with instance_id, source="tag_scan" |
| No instance found (SSM=none, no stopped instance by tag) | Logs gpu_autostart_no_instance_found; start_instances is not called |
| Any exception during start attempt | Caught; logs gpu_autostart_failed with error; execution continues |
| After all branches | change_message_visibility(900s); message added to batchItemFailures |
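Each outcome above is identified by the "step" field of the structured dict passed to logger. Assuming tests patch worldmm.pipeline.ingest_window.logger with a MagicMock, a small helper like the one below (logged_steps and assert_logged are hypothetical names) can check an outcome and a subset of its "additional" fields.

```python
from typing import Any, Dict, List
from unittest.mock import MagicMock


def logged_steps(mock_logger: MagicMock) -> List[Dict[str, Any]]:
    """Return the structured dicts passed to a patched logger, in call order."""
    return [c.args[0] for c in mock_logger.call_args_list if c.args]


def assert_logged(mock_logger: MagicMock, step: str, **additional: Any) -> None:
    """Assert some logger call used `step` and matched every key in `additional`."""
    for entry in logged_steps(mock_logger):
        extra = entry.get("additional", {})
        if entry.get("step") == step and all(extra.get(k) == v for k, v in additional.items()):
            return
    raise AssertionError(f"no log with step={step!r} and additional={additional!r}")


# Usage: simulate what _try_start_gpu would log on the SSM path.
mock_logger = MagicMock()
mock_logger({
    "step": "gpu_autostart_attempted",
    "additional": {"instance_id": "i-abc123", "source": "ssm"},
})
assert_logged(mock_logger, "gpu_autostart_attempted", source="ssm")
```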

Pseudocode

import os

import boto3

# logger and _read_gpu_instance_id are existing module-level helpers in ingest_window.py.


def _try_start_gpu() -> None:
    """Attempt to start the GPU EC2 instance. Fire-and-forget: never raises."""
    region = os.environ.get("AWS_REGION", "us-east-1")
    try:
        # One EC2 client serves both the tag scan and the start request.
        ec2 = boto3.client("ec2", region_name=region)

        # Step 1: resolve the instance ID (None when SSM holds "none")
        instance_id = _read_gpu_instance_id()
        source = "ssm"

        if not instance_id:
            # SSM holds the "none" sentinel: scan EC2 for a stopped instance by tag
            desc = ec2.describe_instances(
                Filters=[
                    {"Name": "tag:Name", "Values": ["encache-gpu-worker"]},
                    {"Name": "instance-state-name", "Values": ["stopped"]},
                ]
            )
            # Take the first matching instance across all reservations
            for reservation in desc.get("Reservations", []):
                for inst in reservation.get("Instances", []):
                    instance_id = inst["InstanceId"]
                    source = "tag_scan"
                    break
                if instance_id:
                    break

        if not instance_id:
            logger({
                "step": "gpu_autostart_no_instance_found",
                "additional": {},
            })
            return

        # Step 2: request a start; do not wait for the instance to reach "running"
        ec2.start_instances(InstanceIds=[instance_id])
        logger({
            "step": "gpu_autostart_attempted",
            "additional": {"instance_id": instance_id, "source": source},
        })

    except Exception as exc:  # deliberately broad: this helper must never raise
        logger({
            "step": "gpu_autostart_failed",
            "additional": {"error": str(exc)},
        })


# --- Inside sqs_handler, in the gpu_unavailable branch ---
if not _gpu_available():
    _try_start_gpu()          # fire-and-forget, wrapped in try/except inside
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=record.get("receiptHandle"),
        VisibilityTimeout=900,
    )
    logger({
        "step": "dlq_visibility_extended",
        "additional": {
            "message_id": message_id,
            "extended_seconds": 900,
            "receive_count": receive_count,
            "reason": "gpu_unavailable",
        },
    })
    failures.append({"itemIdentifier": message_id})
    continue
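The nested loop with two break statements in the tag-scan branch can be collapsed into a small pure function, which is also testable without any AWS mocks. This is a sketch of an optional refactor; first_instance_id is a hypothetical name, not part of the plan. It relies only on the documented describe_instances response shape (Reservations, each with an Instances list).

```python
from typing import Any, Dict, Optional


def first_instance_id(describe_response: Dict[str, Any]) -> Optional[str]:
    """Return the first InstanceId in a describe_instances response, or None."""
    return next(
        (
            inst["InstanceId"]
            for reservation in describe_response.get("Reservations", [])
            for inst in reservation.get("Instances", [])
        ),
        None,
    )


# Usage: an empty first reservation is skipped; the first real instance wins.
resp = {"Reservations": [{"Instances": []}, {"Instances": [{"InstanceId": "i-tagfound"}]}]}
print(first_instance_id(resp))  # i-tagfound
```

Inside _try_start_gpu, the loop would become instance_id = first_instance_id(desc), with source set to "tag_scan" only when a value comes back.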

Acceptance Criteria and Test Cases

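All tests live in main/server/worldmm/tests/test_ingest_dlq.py. The snippets below rely on module-level import json, import os, and from unittest.mock import MagicMock, patch in that file.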

AC-1: Auto-start called when GPU unavailable and SSM holds a real instance ID

Given _gpu_available() returns False, SSM returns i-abc123 (stopped), ec2.start_instances succeeds. Then _try_start_gpu calls ec2.start_instances(InstanceIds=["i-abc123"]) exactly once, logs gpu_autostart_attempted with source="ssm".

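def test_try_start_gpu_uses_ssm_instance_id():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value="i-abc123"),
        patch("worldmm.pipeline.ingest_window.logger") as mock_logger,
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()
    mock_ec2.start_instances.assert_called_once_with(InstanceIds=["i-abc123"])
    mock_ec2.describe_instances.assert_not_called()  # no tag scan when SSM resolves
    mock_logger.assert_called_once_with({
        "step": "gpu_autostart_attempted",
        "additional": {"instance_id": "i-abc123", "source": "ssm"},
    })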

AC-2: Auto-start falls back to tag scan when SSM holds sentinel "none"

Given _read_gpu_instance_id() returns None, EC2 tag scan finds one stopped instance i-tagfound. Then start_instances is called with i-tagfound, logs gpu_autostart_attempted with source="tag_scan".

def test_try_start_gpu_tag_scan_fallback():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    mock_ec2.describe_instances.return_value = {
        "Reservations": [{"Instances": [{"InstanceId": "i-tagfound"}]}]
    }
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value=None),
        patch("worldmm.pipeline.ingest_window.logger") as mock_logger,
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()
    mock_ec2.start_instances.assert_called_once_with(InstanceIds=["i-tagfound"])
    mock_logger.assert_called_once_with({
        "step": "gpu_autostart_attempted",
        "additional": {"instance_id": "i-tagfound", "source": "tag_scan"},
    })

AC-3: No start attempt when SSM is "none" and no stopped instance found by tag

Given _read_gpu_instance_id() returns None, EC2 tag scan returns empty. Then start_instances is never called.

def test_try_start_gpu_no_instance_found():
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    mock_ec2.describe_instances.return_value = {"Reservations": []}
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value=None),
        patch("worldmm.pipeline.ingest_window.logger") as mock_logger,
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()
    mock_ec2.start_instances.assert_not_called()
    mock_logger.assert_called_once_with({
        "step": "gpu_autostart_no_instance_found",
        "additional": {},
    })

AC-4: Exception during start_instances does not propagate to sqs_handler

Given ec2.start_instances raises ClientError. Then _try_start_gpu returns normally and logs gpu_autostart_failed. Because it returns normally, sqs_handler continues to the visibility-extend and batchItemFailures path exercised in AC-5.

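def test_try_start_gpu_exception_does_not_propagate():
    from botocore.exceptions import ClientError
    from worldmm.pipeline.ingest_window import _try_start_gpu
    mock_ec2 = MagicMock()
    mock_ec2.start_instances.side_effect = ClientError(
        {"Error": {"Code": "UnauthorizedOperation", "Message": "access denied"}},
        "StartInstances",
    )
    with (
        patch("worldmm.pipeline.ingest_window._read_gpu_instance_id", return_value="i-abc"),
        patch("worldmm.pipeline.ingest_window.logger") as mock_logger,
        patch("boto3.client", return_value=mock_ec2),
    ):
        _try_start_gpu()  # must not raise
    assert mock_logger.call_args.args[0]["step"] == "gpu_autostart_failed"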

AC-5: sqs_handler calls _try_start_gpu when GPU is down, then extends visibility

Given _gpu_available() returns False. Then sqs_handler calls _try_start_gpu() once before calling change_message_visibility(900).

def test_sqs_handler_calls_try_start_gpu_when_gpu_down():
    from worldmm.pipeline.ingest_window import sqs_handler
    payload = {"sessionId": "s1", "userId": "u1", "windowIndex": 0, "frameCount": 30}
    sqs_event = {"Records": [{"messageId": "m1", "receiptHandle": "r1",
                               "attributes": {"ApproximateReceiveCount": "1"},
                               "body": json.dumps(payload)}]}
    call_order = []
    mock_sqs = MagicMock()
    # Record call order so the test can check the start attempt happens first
    mock_sqs.change_message_visibility.side_effect = (
        lambda **kwargs: call_order.append("visibility")
    )
    with (
        patch("worldmm.pipeline.ingest_window._gpu_available", return_value=False),
        patch("worldmm.pipeline.ingest_window._try_start_gpu",
              side_effect=lambda: call_order.append("start")) as mock_start,
        patch("boto3.client", return_value=mock_sqs),
        patch.dict(os.environ, {"INGEST_DLQ_URL": "https://sqs.../queue"}),
    ):
        result = sqs_handler(sqs_event, None)
    mock_start.assert_called_once()
    mock_sqs.change_message_visibility.assert_called_once_with(
        QueueUrl="https://sqs.../queue", ReceiptHandle="r1", VisibilityTimeout=900
    )
    assert call_order == ["start", "visibility"]
    assert result["batchItemFailures"][0]["itemIdentifier"] == "m1"

AC-6: sqs_handler does NOT call _try_start_gpu when GPU is running

Given _gpu_available() returns True. Then _try_start_gpu is not called.

def test_sqs_handler_does_not_call_try_start_gpu_when_gpu_up():
    from worldmm.pipeline.ingest_window import sqs_handler
    payload = {"sessionId": "s1", "userId": "u1", "windowIndex": 0, "frameCount": 0}
    sqs_event = {"Records": [{"messageId": "m1", "receiptHandle": "r1",
                               "attributes": {"ApproximateReceiveCount": "1"},
                               "body": json.dumps(payload)}]}
    with (
        patch("worldmm.pipeline.ingest_window._gpu_available", return_value=True),
        patch("worldmm.pipeline.ingest_window.lambda_handler",
              return_value={"status": "ok"}) as mock_handler,
        patch("worldmm.pipeline.ingest_window._try_start_gpu") as mock_start,
        patch("boto3.client", return_value=MagicMock()),
        patch.dict(os.environ, {"INGEST_DLQ_URL": "https://sqs.../queue"}),
    ):
        sqs_handler(sqs_event, None)
    mock_start.assert_not_called()
    mock_handler.assert_called_once()

Files to Change

| File | Change |
|---|---|
| main/server/worldmm/pipeline/ingest_window.py | Add _try_start_gpu() helper; call it in sqs_handler in the not _gpu_available() branch before change_message_visibility. |
| main/server/template.yaml | Add !Ref EC2Policy to the Policies list of IngestDLQConsumer. (EC2Policy already covers ec2:StartInstances, ec2:DescribeInstances; it is currently attached only to IngestWindowFunction.) |
| main/server/worldmm/tests/test_ingest_dlq.py | Add test cases AC-1 through AC-6 listed above. |

template.yaml diff (key section)

  IngestDLQConsumer:
    Type: AWS::Serverless::Function
    Properties:
      ...
      Policies:
        - !Ref S3AccessPolicy
        - !Ref DatabaseSsmPolicy
        - !Ref EC2Policy          # <-- ADD THIS LINE
        - Statement:
            - Effect: Allow
              Action:
                - sqs:ChangeMessageVisibility
              Resource:
                - !GetAtt IngestDLQ.Arn

Notes and Edge Cases

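  • Already pending or running: If the instance is already pending or running, start_instances typically succeeds without effect and reports the current state. If the instance is still stopping, EC2 rejects the call with IncorrectInstanceState; an ID that no longer exists returns InvalidInstanceID.NotFound. The fire-and-forget wrapper absorbs these errors and the next redelivery retries. Because the tag scan runs only when SSM holds "none", a stale ID in SSM surfaces as gpu_autostart_failed rather than falling back to the tag scan.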
  • IAM: EC2Policy already contains ec2:StartInstances and ec2:DescribeInstances with Resource: "*". Attaching it to IngestDLQConsumer is sufficient; no new IAM policy is needed.
  • No SSM write: _try_start_gpu does not call _update_ssm_gpu_instance_id. The instance ID found by tag scan is used only for start_instances; SSM is updated by the existing _resolve_gpu_url / _find_running_gpu_instance_by_tag logic on the next successful ingest invocation.
  • Message cadence: The 900 s visibility timeout means the DLQ consumer re-checks GPU availability every 15 minutes. With a typical GPU cold start of ~3–5 min, the instance is usually running by the first redelivery, and the message is then processed normally. If boot takes longer than 15 min, the message is parked again for another cycle; the repeated _try_start_gpu call against a pending instance is harmless.
  • Duplicate starts are harmless: BatchSize: 1 is set on the DLQ trigger, so each record is a separate invocation and each invocation makes at most one start attempt. Several parked messages can still be delivered to concurrent invocations, each calling start_instances; on an instance that is already pending or running this is effectively a no-op, and an IncorrectInstanceState error from a still-stopping instance is absorbed by the wrapper.
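The cadence note reduces to simple arithmetic: with a fixed 900 s timeout, a parked message is re-checked at 900 s, 1800 s, and so on after the first park, so it is processed at the first re-check after boot completes. The sketch below assumes idealized conditions (boot starts at the first park, redelivery happens exactly on the timeout, and _gpu_available() turns True as soon as boot finishes); seconds_until_processed is a hypothetical name.

```python
import math


def seconds_until_processed(boot_seconds: float, visibility_timeout: int = 900) -> int:
    """Delay from the first park until a redelivery finds the GPU running.

    Idealized assumptions: boot begins at t=0 (the first park), each redelivery
    arrives exactly visibility_timeout seconds after the previous park, and the
    GPU counts as available the moment boot completes.
    """
    # The t=0 check already failed, so at least one full timeout must elapse.
    parks = max(1, math.ceil(boot_seconds / visibility_timeout))
    return parks * visibility_timeout


print(seconds_until_processed(300))   # 900
print(seconds_until_processed(1200))  # 1800
```

For the 3–5 min cold start cited above this gives 900 s, so a message can sit for roughly 10 to 12 minutes after the instance is already running; shortening the timeout would trade that latency for more frequent re-checks.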