Memory Upload Flow Contract

Plan Metadata

  • Plan type: plan
  • Parent plan: N/A
  • Depends on: N/A
  • Status: documentation

System Intent

  • What is being built: A source-of-truth contract for the client-side createNewMemoryFromFile flow, which orchestrates the multipart upload (MPU) and the post-MPU ingest memory pipeline.
  • Primary consumer(s): Mobile client upload orchestration (createNewMemoryFromFile), which calls the three MPU API flows (start, parts, complete) and aborts on failure.
  • Boundary (black-box scope only): Client-side createNewMemoryFromFile orchestration across MPU (start, parts, complete, abort), memory persistence contracts through Memory ORM/DB state transitions, and the post-MPU ingest memory pipeline that links vectors to the saved memory record.

Stage Gate Tracker

  • [x] Stage 1 Mermaid approved
  • [x] Stage 2 I/O contracts approved
  • [x] Stage 3 pseudocode approved or skipped

1. Mermaid Diagram

Reference: .agent/skills/create-mermaid-diagram/SKILL.md

graph TD
  subgraph Scope[Black Box Scope Client Upload]
    A[Client Orchestrator - main/app/lib/api/memory/createNewMemoryFromFile.ts]
  end

  subgraph Backend[Server Backend]
    direction TB
    B[Start Upload Lambda - main/server/api/memories/uploads/start/app.py]
    E[Part URL Lambda - main/server/api/memories/uploads/parts/app.py]
    F[Complete Upload Lambda - main/server/api/memories/uploads/complete/app.py]
    G[Abort Upload Lambda - main/server/api/memories/uploads/abort/app.py]
    H[Ingest Memory Pipeline - main/server/memories/pipeline/ingest/app.py]
    C[S3 Raw Memory Bucket - external/aws/s3]
    D[Memory ORM State status and embedding_vector - main/server/layers/shared/python/shared/orm/memory_orm.py]
  end

  A -->|1 start request POST /memories/uploads| B
  B -->|2 create memory record status started| D
  B -->|3 create multipart upload session| C
  B -->|4 start response memory_id key upload_id| A

  A -->|5 part url request POST /memories/uploads/:uploadId/parts| E
  E -->|6 presigned upload_part url| A
  A -->|7 upload part bytes PUT| C
  C -->|8 part etag| A

  A -->|9 complete request POST /memories/uploads/:uploadId/complete| F
  F -->|10 complete multipart upload| C
  F -->|11 update memory status uploaded| D
  F -->|12 complete response location etag| A

  A -->|error abort request| G
  G -->|abort multipart upload| C
  G -->|update memory status aborted| D

  C -.->|no deployed S3 event trigger mapping in template.yaml| H
  H -->|write embedding_vector for memory_id| D

  classDef done fill:#d3d3d3,stroke:#666,stroke-width:1px;
  class A,B,C,D,E,F,G,H done;

2. Black-Box Inputs and Outputs

Keep this short. Define types in JSON-style blocks and capture each flow with path-level rows.

Global Types

Define shared types used across multiple flows.

Current helper behavior note: for these memory lambdas, invoke_lambda returns non-AppError exceptions (including ValueError) as HTTP 500; only AuthError preserves a 401 with its explicit error code.

Identifier {
  value: string (stable unique identifier)
}

CompleteMemoryUploadPart {
  PartNumber: number (1-based multipart part number)
  ETag: string (S3 part checksum token)
}

MemoryOrmMemoryRecord {
  memory_id: Identifier (primary record id)
  user_id: string (owner identity)
  memory_type: string (MIME media type)
  raw_memory: string (S3 object URL)
  memory_upload_id: string (S3 multipart upload id)
  memory_upload_status: "started" | "in_progress" | "uploaded" | "failed" | "aborted" | "processed"
  embedding_vector: vector[384] | null (written by post-MPU ingest flow)
}
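
The memory_upload_status values above imply a lifecycle. The sketch below encodes the allowed transitions as inferred from the flows in this contract (started at create, uploaded/aborted/failed on MPU outcome, processed after ingest writes the embedding). This is an illustrative assumption, not the ORM's actual code; in particular, the in_progress edges are this document's inference.

```typescript
// Assumed status state machine, inferred from this contract's flows.
type UploadStatus = "started" | "in_progress" | "uploaded" | "failed" | "aborted" | "processed";

const ALLOWED_TRANSITIONS: Record<UploadStatus, UploadStatus[]> = {
  started: ["in_progress", "uploaded", "aborted", "failed"], // created by startMemoryUpload
  in_progress: ["uploaded", "aborted", "failed"],            // inferred intermediate state
  uploaded: ["processed", "failed"],                         // completeMemoryUpload → ingest
  failed: [],                                                // terminal
  aborted: [],                                               // terminal
  processed: [],                                             // terminal
};

function canTransition(from: UploadStatus, to: UploadStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```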

StandardError {
  status: number (HTTP or equivalent status)
  code: string (stable machine-readable code)
  message: string (human-readable summary)
}
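
Since non-AppError exceptions surface as generic 500s, a client consuming these flows may want to narrow unknown thrown values to the StandardError shape before branching on code. A minimal hedged sketch (the guard is hypothetical, not part of the current codebase):

```typescript
// Hypothetical guard: narrow an unknown error to the StandardError shape above.
interface StandardError {
  status: number;  // HTTP or equivalent status
  code: string;    // stable machine-readable code
  message: string; // human-readable summary
}

function isStandardError(err: unknown): err is StandardError {
  if (typeof err !== "object" || err === null) return false;
  const e = err as Record<string, unknown>;
  return (
    typeof e.status === "number" &&
    typeof e.code === "string" &&
    typeof e.message === "string"
  );
}
```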

Flow: main/app/lib/api/memory/startMemoryUpload.ts.startMemoryUpload, main/app/__tests__/memory-api-mpu.test.ts, main/server/tests/unit/test_memories_uploads_start_app.py

Type Definitions

StartMemoryUploadPayload {
  user_id: string (required)
  memory_type: string (required)
}

StartMemoryUploadResponse {
  memory_id: Identifier
  bucket: string
  key: string
  url: string
  upload_id: string
}

Paths

| path-name | input | output / expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
| startMemoryUpload.success | StartMemoryUploadPayload | StartMemoryUploadResponse; MemoryOrmMemoryRecord created with memory_upload_status=started and embedding_vector=null | happy path | starts S3 MPU and creates ORM record |
| startMemoryUpload.invalid-input | StartMemoryUploadPayload | StandardError status=500 code=internal-error | error | missing memory_type currently raises ValueError, mapped to 500 by invoke_lambda |
| startMemoryUpload.auth-missing-or-invalid | StartMemoryUploadPayload | StandardError status=401 code=AUTH_MISSING or AUTH_INVALID | error | AuthError path from helper preserves 401 + code |
| startMemoryUpload.s3-create-multipart-failed | StartMemoryUploadPayload | StandardError status=500 code=internal-error | error | S3 create_multipart_upload failure maps to generic 500 path |
| startMemoryUpload.db-write-failed | StartMemoryUploadPayload | StandardError status=500 code=memory-record-create-failed | error | createMemory failure after S3 init; leaves potential orphaned MPU |
| startMemoryUpload.db-config-or-credential-failed | StartMemoryUploadPayload | StandardError status=500 code=db-config-missing | error | DB env/SSM credential retrieval failure |
| startMemoryUpload.unexpected-runtime-error | StartMemoryUploadPayload | StandardError status=500 code=internal-error | error | generic exception path in helper returns status 500 |

Flow: main/app/lib/api/memory/uploadMemoryPart.ts.uploadMemoryPart, main/app/__tests__/memory-api-mpu.test.ts, main/server/tests/unit/test_memories_uploads_parts_app.py

Type Definitions

UploadMemoryPartPayload {
  user_id: string (required)
  key: string (required)
  upload_id: string (required)
  part_number: number (required, integer > 0)
  content_type: string (optional)
}

UploadMemoryPartResponse {
  upload_id: string
  key: string
  part_number: number
  url: string
  headers: map<string,string>
}

Paths

| path-name | input | output / expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
| uploadMemoryPart.success | UploadMemoryPartPayload | UploadMemoryPartResponse; no ORM state change | happy path | returns presigned S3 upload_part URL |
| uploadMemoryPart.invalid-input | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | missing key/upload_id or part_number <= 0 currently raises ValueError |
| uploadMemoryPart.ownership-error | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | ownership mismatch currently raises ValueError |
| uploadMemoryPart.s3-presign-failed | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | S3 generate_presigned_url failure maps to generic 500 path |
| uploadMemoryPart.invalid-or-expired-upload-id | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | invalid/expired upload session currently maps to generic 500 path |
| uploadMemoryPart.server-config-error | UploadMemoryPartPayload | StandardError status=500 code=server-config-error | error | bucket/region/runtime configuration issue |
| uploadMemoryPart.unexpected-runtime-error | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | unexpected lambda exception path |
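
Because part_number must be a positive integer, the client has to chunk the file deterministically before requesting presigned URLs. A hedged sketch of such a chunking helper (planParts is a hypothetical name; the 5 MiB floor is S3's documented minimum for every part except the last):

```typescript
// Hypothetical chunking helper: compute 1-based part numbers and byte ranges.
interface PartRange {
  partNumber: number; // 1-based, matching UploadMemoryPartPayload.part_number
  start: number;      // inclusive byte offset
  end: number;        // exclusive byte offset
}

const MIN_PART_SIZE = 5 * 1024 * 1024; // S3 minimum for all parts except the last

function planParts(fileSize: number, partSize: number = MIN_PART_SIZE): PartRange[] {
  if (fileSize <= 0) throw new Error("file must be non-empty");
  if (partSize < MIN_PART_SIZE) throw new Error("part size below S3 minimum");
  const parts: PartRange[] = [];
  for (let start = 0, n = 1; start < fileSize; start += partSize, n += 1) {
    parts.push({ partNumber: n, start, end: Math.min(start + partSize, fileSize) });
  }
  return parts;
}
```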

Flow: main/app/lib/api/memory/createNewMemoryFromFile.ts.createNewMemoryFromFile.uploadPartToS3, main/app/__tests__/memory-create-from-file.test.ts

Type Definitions

UploadPartToS3Input {
  url: string (required)
  headers: map<string,string> (required)
  body: bytes (required)
}

UploadPartToS3Output {
  etag: string (required)
}

Paths

| path-name | input | output / expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
| createNewMemoryFromFile.uploadPartToS3.success | UploadPartToS3Input | UploadPartToS3Output; S3 part persisted | happy path | etag is collected for completion payload |
| createNewMemoryFromFile.uploadPartToS3.transfer-error | UploadPartToS3Input | StandardError message=Failed to upload part <n>: <status> | error | client throws generic Error on non-2xx response |
| createNewMemoryFromFile.uploadPartToS3.missing-etag | UploadPartToS3Input | StandardError message=Missing ETag for part <n>. | error | client throws generic Error and aborts flow |
| createNewMemoryFromFile.uploadPartToS3.network-timeout-or-abort | UploadPartToS3Input | StandardError message=<fetch runtime error> | error | network failures or abort signal bubble as runtime errors |
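
The error paths above can be sketched as follows. This is a minimal illustration, assuming a fetch-compatible `put` function is injected so the logic can be exercised without a network; the real client's signature may differ, though the thrown messages mirror the table rows.

```typescript
// Sketch of uploadPartToS3: PUT bytes to a presigned URL, require an ETag.
interface PutResponse {
  ok: boolean;
  status: number;
  headers: { get(name: string): string | null };
}
type PutFn = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: Uint8Array },
) => Promise<PutResponse>;

async function uploadPartToS3(
  url: string,
  headers: Record<string, string>,
  body: Uint8Array,
  partNumber: number,
  put: PutFn,
): Promise<{ etag: string }> {
  const res = await put(url, { method: "PUT", headers, body });
  // transfer-error path: non-2xx response becomes a generic Error
  if (!res.ok) throw new Error(`Failed to upload part ${partNumber}: ${res.status}`);
  // missing-etag path: a 2xx response without an ETag header also fails the flow
  const etag = res.headers.get("ETag");
  if (!etag) throw new Error(`Missing ETag for part ${partNumber}.`);
  return { etag };
}
```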

Flow: main/app/lib/api/memory/completeMemoryUpload.ts.completeMemoryUpload, main/app/__tests__/memory-api-mpu.test.ts, main/server/tests/unit/test_memories_uploads_complete_app.py

Type Definitions

CompleteMemoryUploadPayload {
  user_id: string (required)
  memory_id: Identifier (required)
  key: string (required)
  upload_id: string (required)
  parts: CompleteMemoryUploadPart[] (required, non-empty)
}

CompleteMemoryUploadResponse {
  location: string | null
  etag: string | null
  bucket: string
  key: string
}

Paths

| path-name | input | output / expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
| completeMemoryUpload.success | CompleteMemoryUploadPayload | CompleteMemoryUploadResponse; MemoryOrmMemoryRecord.memory_upload_status transitions to uploaded | happy path | finalizes S3 MPU and marks ORM record uploaded |
| completeMemoryUpload.invalid-input | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | missing key/upload_id/memory_id/parts currently raises ValueError |
| completeMemoryUpload.ownership-error | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | ownership mismatch currently raises ValueError |
| completeMemoryUpload.invalid-parts-manifest | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | parts list malformed/out-of-order/etag mismatch for S3 completion |
| completeMemoryUpload.invalid-or-expired-upload-id | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | upload_id not valid for target key at completion time |
| completeMemoryUpload.s3-complete-failed | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | S3 complete_multipart_upload failure |
| completeMemoryUpload.orm-status-update-failed | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | updateMemoryStatus failure after S3 complete |
| completeMemoryUpload.unexpected-runtime-error | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | generic exception path in helper |
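
The invalid-parts-manifest path suggests the client should normalize its collected parts before calling completeMemoryUpload. A hedged sketch of such a guard (buildPartsManifest is a hypothetical helper; S3 expects PartNumbers ascending and each part carrying its ETag):

```typescript
// Hypothetical client-side guard: sort and validate the parts manifest
// (non-empty, contiguous 1-based PartNumbers, ETag present on every part).
interface CompleteMemoryUploadPart {
  PartNumber: number; // 1-based multipart part number
  ETag: string;       // S3 part checksum token
}

function buildPartsManifest(parts: CompleteMemoryUploadPart[]): CompleteMemoryUploadPart[] {
  if (parts.length === 0) throw new Error("parts must be non-empty");
  const sorted = [...parts].sort((a, b) => a.PartNumber - b.PartNumber);
  sorted.forEach((p, i) => {
    if (p.PartNumber !== i + 1) throw new Error(`non-contiguous part number: ${p.PartNumber}`);
    if (!p.ETag) throw new Error(`missing ETag for part ${p.PartNumber}`);
  });
  return sorted;
}
```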

Flow: main/app/lib/api/memory/abortMemoryUpload.ts.abortMemoryUpload, main/app/__tests__/memory-create-from-file.test.ts, main/server/tests/unit/test_memories_uploads_abort_app.py

Type Definitions

AbortMemoryUploadPayload {
  user_id: string (required)
  memory_id: Identifier (required)
  key: string (required)
  upload_id: string (required)
}

AbortMemoryUploadResponse {
  message: string
  bucket: string
  key: string
}

Paths

| path-name | input | output / expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
| abortMemoryUpload.success | AbortMemoryUploadPayload | AbortMemoryUploadResponse; MemoryOrmMemoryRecord.memory_upload_status transitions to aborted | happy path | best-effort cleanup on failure path |
| abortMemoryUpload.invalid-input | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | missing key/upload_id/memory_id currently raises ValueError |
| abortMemoryUpload.ownership-error | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | ownership mismatch currently raises ValueError |
| abortMemoryUpload.invalid-or-expired-upload-id | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | upload_id is invalid/expired for target key |
| abortMemoryUpload.s3-abort-failed | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | S3 abort_multipart_upload failure |
| abortMemoryUpload.orm-status-update-failed | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | updateMemoryStatus failure after S3 abort call |
| abortMemoryUpload.unexpected-runtime-error | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | generic exception path in helper |
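
"Best-effort cleanup" means an abort failure must never mask the error that triggered it. A minimal sketch of that pattern (abortBestEffort is a hypothetical wrapper; `abort` stands in for the real abortMemoryUpload call):

```typescript
// Hypothetical wrapper: attempt abort, then always rethrow the original error.
async function abortBestEffort(
  originalError: unknown,
  abort: () => Promise<void>,
): Promise<never> {
  try {
    await abort();
  } catch {
    // Swallow: the caller should see the original failure, not the cleanup one.
  }
  throw originalError;
}
```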

Flow: main/server/memories/pipeline/ingest/app.py.implementation, main/server/tests/unit/test_memories_pipeline_ingest_app.py

Type Definitions

PostUploadIngestMemoryInput {
  memory_id: Identifier (required)
  user_id: string (required)
  key: string (required)
  bucket: string (required)
}

PostUploadIngestMemoryOutput {
  memory_id: Identifier
  memory_upload_status: "processed"
}

Paths

| path-name | input | output / expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
| implementation.success | PostUploadIngestMemoryInput | PostUploadIngestMemoryOutput; MemoryOrmMemoryRecord.embedding_vector written and memory_upload_status transitions to processed | happy path | implemented in main/server/memories/pipeline/ingest/app.py; deployment trigger wiring is separate |
| implementation.not-found | PostUploadIngestMemoryInput | StandardError status=500 code=internal-error | error | unless ingest lambda raises NotFoundError/AppError, helper maps this to 500 |
| implementation.vector-write-failed | PostUploadIngestMemoryInput | StandardError status=500 code=internal-error | error | failed embedding/vector write leaves record unprocessed |

3. Pseudocode for Critical Flows (Optional)

  • Flow name: createNewMemoryFromFile (current)
    validate file size > 0
    start = call startMemoryUpload(user_id, memory_type)
    for each part in file:
      presigned = call uploadMemoryPart(start.key, start.upload_id, part_number)
      PUT part bytes to presigned.url
      assert ETag exists and append to parts list
    complete = call completeMemoryUpload(start.key, start.upload_id, start.memory_id, parts)
    return merged upload result
    on any failure after start:
      best-effort call abortMemoryUpload(start.key, start.upload_id, start.memory_id)
      rethrow original error
    
  • Implementation notes:
  • Memory ORM lifecycle contract is owned by this plan: createMemory creates records in the started state; updateMemoryStatus drives upload transitions (uploaded, aborted, failed, processed); updateMemoryEmbedding persists the vector[384] embedding and marks the record processed.
  • Current code includes an ingest lambda contract in main/server/memories/pipeline/ingest/app.py that writes embedding_vector; template.yaml does not currently declare an automatic S3 event mapping to this function.
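
The pseudocode above can be compressed into a testable sketch, with the four MPU API calls injected so the control flow (including best-effort abort) can be exercised in isolation. All shapes here are assumptions drawn from the section 2 contracts, not the real client signatures.

```typescript
// Illustrative orchestration of start → parts → complete, with abort on failure.
interface MpuApi {
  start(): Promise<{ memory_id: string; key: string; upload_id: string }>;
  partUrl(partNumber: number): Promise<{ url: string }>;
  putPart(url: string, bytes: Uint8Array, partNumber: number): Promise<{ etag: string }>;
  complete(parts: { PartNumber: number; ETag: string }[]): Promise<{ location: string | null }>;
  abort(): Promise<void>;
}

async function createNewMemoryFromFile(file: Uint8Array, partSize: number, api: MpuApi) {
  if (file.length === 0) throw new Error("file must be non-empty"); // validate size > 0
  const start = await api.start();
  try {
    const parts: { PartNumber: number; ETag: string }[] = [];
    for (let off = 0, n = 1; off < file.length; off += partSize, n += 1) {
      const { url } = await api.partUrl(n);                         // presigned part URL
      const { etag } = await api.putPart(url, file.slice(off, off + partSize), n);
      parts.push({ PartNumber: n, ETag: etag });                    // collect ETag per part
    }
    const complete = await api.complete(parts);
    return { memory_id: start.memory_id, ...complete };             // merged upload result
  } catch (err) {
    try { await api.abort(); } catch { /* best-effort cleanup */ }
    throw err;                                                      // rethrow original error
  }
}
```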

After all stages are approved, apply .agent/skills/reconcile-plans/SKILL.md to propagate contract updates across linked plans.