Memory Upload Flow Contract
- Plan type: plan
- Parent plan: N/A
- Depends on: N/A
- Status: documentation
System Intent
- What is being built: A source-of-truth contract for the client-side createNewMemoryFromFile flow, which orchestrates multipart upload (MPU) and the post-MPU ingest memory pipeline.
- Primary consumer(s): Mobile client upload orchestration (createNewMemoryFromFile), which calls the three MPU API flows (start, parts, complete) and aborts on failure.
- Boundary (black-box scope only): Client-side createNewMemoryFromFile orchestration across MPU (start, parts, complete, abort), memory persistence contracts through Memory ORM/DB state transitions, and the post-MPU ingest memory pipeline that links vectors to the saved memory record.
Stage Gate Tracker
- [x] Stage 1 Mermaid approved
- [x] Stage 2 I/O contracts approved
- [x] Stage 3 pseudocode approved or skipped
1. Mermaid Diagram
Reference: .agent/skills/create-mermaid-diagram/SKILL.md
    graph TD
        subgraph Scope[Black Box Scope Client Upload]
            A[Client Orchestrator - main/app/lib/api/memory/createNewMemoryFromFile.ts]
        end
        subgraph Backend[Server Backend]
            direction TB
            B[Start Upload Lambda - main/server/api/memories/uploads/start/app.py]
            E[Part URL Lambda - main/server/api/memories/uploads/parts/app.py]
            F[Complete Upload Lambda - main/server/api/memories/uploads/complete/app.py]
            G[Abort Upload Lambda - main/server/api/memories/uploads/abort/app.py]
            H[Ingest Memory Pipeline - main/server/memories/pipeline/ingest/app.py]
            C[S3 Raw Memory Bucket - external/aws/s3]
            D[Memory ORM State status and embedding_vector - main/server/layers/shared/python/shared/orm/memory_orm.py]
        end
        A -->|1 start request POST /memories/uploads| B
        B -->|2 create memory record status started| D
        B -->|3 create multipart upload session| C
        B -->|4 start response memory_id key upload_id| A
        A -->|5 part url request POST /memories/uploads/:uploadId/parts| E
        E -->|6 presigned upload_part url| A
        A -->|7 upload part bytes PUT| C
        C -->|8 part etag| A
        A -->|9 complete request POST /memories/uploads/:uploadId/complete| F
        F -->|10 complete multipart upload| C
        F -->|11 update memory status uploaded| D
        F -->|12 complete response location etag| A
        A -->|error abort request| G
        G -->|abort multipart upload| C
        G -->|update memory status aborted| D
        C -.->|no deployed S3 event trigger mapping in template.yaml| H
        H -->|write embedding_vector for memory_id| D
        classDef done fill:#d3d3d3,stroke:#666,stroke-width:1px;
        class A,B,C,D,E,F,G,H done;
2. I/O Contracts
Keep this short. Define types in JSON-style blocks and capture each flow with path-level rows.
Global Types
Define shared types used across multiple flows.
Current helper behavior note: for these memory lambdas, invoke_lambda returns non-AppError exceptions (including ValueError) as HTTP 500; only AuthError preserves a 401 with an explicit error code.
Identifier {
value: string (stable unique identifier)
}
CompleteMemoryUploadPart {
PartNumber: number (1-based multipart part number)
ETag: string (S3 part checksum token)
}
MemoryOrmMemoryRecord {
memory_id: Identifier (primary record id)
user_id: string (owner identity)
memory_type: string (MIME media type)
raw_memory: string (S3 object URL)
memory_upload_id: string (S3 multipart upload id)
memory_upload_status: "started" | "in_progress" | "uploaded" | "failed" | "aborted" | "processed"
embedding_vector: vector[384] | null (written by post MPU ingest flow)
}
StandardError {
status: number (HTTP or equivalent status)
code: string (stable machine-readable code)
message: string (human-readable summary)
}
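On the client side, the shared types above can be mirrored in TypeScript. This is an illustrative sketch, not the real client module; the isStandardError guard is a hypothetical helper for narrowing unknown error payloads before surfacing them to callers.

```typescript
// Illustrative TypeScript mirrors of the shared contract types above.
// The authoritative definitions are the JSON-style blocks in this section.
type Identifier = string; // stable unique identifier

interface CompleteMemoryUploadPart {
  PartNumber: number; // 1-based multipart part number
  ETag: string;       // S3 part checksum token
}

interface StandardError {
  status: number;  // HTTP or equivalent status
  code: string;    // stable machine-readable code
  message: string; // human-readable summary
}

// Hypothetical narrowing helper: checks whether an unknown error payload
// matches the StandardError shape.
function isStandardError(value: unknown): value is StandardError {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.status === "number" &&
    typeof v.code === "string" &&
    typeof v.message === "string"
  );
}
```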
Flow: main/app/lib/api/memory/startMemoryUpload.ts.startMemoryUpload, main/app/__tests__/memory-api-mpu.test.ts, main/server/tests/unit/test_memories_uploads_start_app.py
Type Definitions
StartMemoryUploadPayload {
user_id: string (required)
memory_type: string (required)
}
StartMemoryUploadResponse {
memory_id: Identifier
bucket: string
key: string
url: string
upload_id: string
}
Paths
| path-name | input | output/expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
startMemoryUpload.success | StartMemoryUploadPayload | StartMemoryUploadResponse; MemoryOrmMemoryRecord created with memory_upload_status=started and embedding_vector=null | happy path | starts S3 MPU and creates ORM record |
startMemoryUpload.invalid-input | StartMemoryUploadPayload | StandardError status=500 code=internal-error | error | missing memory_type currently raises ValueError, mapped to 500 by invoke_lambda |
startMemoryUpload.auth-missing-or-invalid | StartMemoryUploadPayload | StandardError status=401 code=AUTH_MISSING or AUTH_INVALID | error | AuthError path from helper preserves 401 + code |
startMemoryUpload.s3-create-multipart-failed | StartMemoryUploadPayload | StandardError status=500 code=internal-error | error | S3 create_multipart_upload failure maps to generic 500 path |
startMemoryUpload.db-write-failed | StartMemoryUploadPayload | StandardError status=500 code=memory-record-create-failed | error | createMemory failure after S3 init; leaves potential orphaned MPU |
startMemoryUpload.db-config-or-credential-failed | StartMemoryUploadPayload | StandardError status=500 code=db-config-missing | error | DB env/SSM credential retrieval failure |
startMemoryUpload.unexpected-runtime-error | StartMemoryUploadPayload | StandardError status=500 code=internal-error | error | generic exception path in helper returns status 500 |
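Because a missing memory_type currently surfaces as a generic 500 (ValueError in the lambda), a client could pre-validate the payload before calling start. The validateStartPayload helper below is hypothetical and not part of the current code:

```typescript
interface StartMemoryUploadPayload {
  user_id: string;
  memory_type: string;
}

// Hypothetical client-side pre-flight check. The server currently raises
// ValueError (surfaced as a 500 internal-error) for a missing memory_type,
// so validating locally gives the caller a clearer failure mode.
function validateStartPayload(
  payload: Partial<StartMemoryUploadPayload>
): string[] {
  const problems: string[] = [];
  if (!payload.user_id) problems.push("user_id is required");
  if (!payload.memory_type) problems.push("memory_type is required");
  return problems;
}
```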
Flow: main/app/lib/api/memory/uploadMemoryPart.ts.uploadMemoryPart, main/app/__tests__/memory-api-mpu.test.ts, main/server/tests/unit/test_memories_uploads_parts_app.py
Type Definitions
UploadMemoryPartPayload {
user_id: string (required)
key: string (required)
upload_id: string (required)
part_number: number (required, integer > 0)
content_type: string (optional)
}
UploadMemoryPartResponse {
upload_id: string
key: string
part_number: number
url: string
headers: map<string,string>
}
Paths
| path-name | input | output/expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
uploadMemoryPart.success | UploadMemoryPartPayload | UploadMemoryPartResponse; no ORM state change | happy path | returns presigned S3 upload_part URL |
uploadMemoryPart.invalid-input | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | missing key/upload_id or part_number <= 0 currently raises ValueError |
uploadMemoryPart.ownership-error | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | ownership mismatch currently raises ValueError |
uploadMemoryPart.s3-presign-failed | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | S3 generate_presigned_url failure maps to generic 500 path |
uploadMemoryPart.invalid-or-expired-upload-id | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | invalid/expired upload session currently maps to generic 500 path |
uploadMemoryPart.server-config-error | UploadMemoryPartPayload | StandardError status=500 code=server-config-error | error | bucket/region/runtime configuration issue |
uploadMemoryPart.unexpected-runtime-error | UploadMemoryPartPayload | StandardError status=500 code=internal-error | error | unexpected lambda exception path |
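The invalid-input row for this flow (missing key/upload_id or part_number <= 0) can likewise be guarded client-side. validatePartPayload is a hypothetical helper mirroring the server-side checks, not the real module:

```typescript
interface UploadMemoryPartPayload {
  user_id: string;
  key: string;
  upload_id: string;
  part_number: number;
  content_type?: string;
}

// Hypothetical pre-flight check mirroring the server-side validation:
// missing key/upload_id or a non-positive part_number currently surfaces
// as a generic 500 from the lambda.
function validatePartPayload(p: Partial<UploadMemoryPartPayload>): string[] {
  const problems: string[] = [];
  if (!p.key) problems.push("key is required");
  if (!p.upload_id) problems.push("upload_id is required");
  if (!Number.isInteger(p.part_number) || (p.part_number as number) <= 0) {
    problems.push("part_number must be an integer > 0");
  }
  return problems;
}
```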
Flow: main/app/lib/api/memory/createNewMemoryFromFile.ts.createNewMemoryFromFile.uploadPartToS3, main/app/__tests__/memory-create-from-file.test.ts
Type Definitions
UploadPartToS3Input {
url: string (required)
headers: map<string,string> (required)
body: bytes (required)
}
UploadPartToS3Output {
etag: string (required)
}
Paths
| path-name | input | output/expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
createNewMemoryFromFile.uploadPartToS3.success | UploadPartToS3Input | UploadPartToS3Output; S3 part persisted | happy path | etag is collected for completion payload |
createNewMemoryFromFile.uploadPartToS3.transfer-error | UploadPartToS3Input | StandardError message=Failed to upload part <n>: <status> | error | client throws generic Error on non-2xx response |
createNewMemoryFromFile.uploadPartToS3.missing-etag | UploadPartToS3Input | StandardError message=Missing ETag for part <n>. | error | client throws generic Error and aborts flow |
createNewMemoryFromFile.uploadPartToS3.network-timeout-or-abort | UploadPartToS3Input | StandardError message=<fetch runtime error> | error | network failures or abort signal bubble as runtime errors |
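The per-part loop behind this flow needs deterministic byte ranges for each part. The sketch below assumes the S3 minimum part size of 5 MiB as the chunk size; the actual chunk size used by createNewMemoryFromFile is not specified in this contract.

```typescript
const PART_SIZE = 5 * 1024 * 1024; // S3 minimum part size (assumed chunk size)

interface PartRange {
  partNumber: number; // 1-based, matching CompleteMemoryUploadPart.PartNumber
  start: number;      // inclusive byte offset
  end: number;        // exclusive byte offset
}

// Pure helper: compute the byte range for each multipart part. All parts
// are partSize bytes except possibly the last.
function computePartRanges(fileSize: number, partSize = PART_SIZE): PartRange[] {
  if (fileSize <= 0) throw new Error("file size must be > 0");
  const ranges: PartRange[] = [];
  for (let start = 0, n = 1; start < fileSize; start += partSize, n += 1) {
    ranges.push({
      partNumber: n,
      start,
      end: Math.min(start + partSize, fileSize),
    });
  }
  return ranges;
}
```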
Flow: main/app/lib/api/memory/completeMemoryUpload.ts.completeMemoryUpload, main/app/__tests__/memory-api-mpu.test.ts, main/server/tests/unit/test_memories_uploads_complete_app.py
Type Definitions
CompleteMemoryUploadPayload {
user_id: string (required)
memory_id: Identifier (required)
key: string (required)
upload_id: string (required)
parts: CompleteMemoryUploadPart[] (required, non-empty)
}
CompleteMemoryUploadResponse {
location: string | null
etag: string | null
bucket: string
key: string
}
Paths
| path-name | input | output/expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
completeMemoryUpload.success | CompleteMemoryUploadPayload | CompleteMemoryUploadResponse; MemoryOrmMemoryRecord.memory_upload_status transitions to uploaded | happy path | finalizes S3 MPU and marks ORM record uploaded |
completeMemoryUpload.invalid-input | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | missing key/upload_id/memory_id/parts currently raises ValueError |
completeMemoryUpload.ownership-error | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | ownership mismatch currently raises ValueError |
completeMemoryUpload.invalid-parts-manifest | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | parts list malformed/out-of-order/etag mismatch for S3 completion |
completeMemoryUpload.invalid-or-expired-upload-id | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | upload_id not valid for target key at completion time |
completeMemoryUpload.s3-complete-failed | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | S3 complete_multipart_upload failure |
completeMemoryUpload.orm-status-update-failed | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | updateMemoryStatus failure after S3 complete |
completeMemoryUpload.unexpected-runtime-error | CompleteMemoryUploadPayload | StandardError status=500 code=internal-error | error | generic exception path in helper |
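The invalid-parts-manifest row covers out-of-order or incomplete manifests. A hypothetical client-side guard could sort the collected parts and verify a contiguous 1..N sequence with non-empty ETags before calling complete (S3 requires ascending part numbers on completion); normalizePartsManifest below is illustrative, not the real code:

```typescript
interface CompleteMemoryUploadPart {
  PartNumber: number;
  ETag: string;
}

// Hypothetical guard: sort the collected parts and verify they form a
// contiguous 1..N sequence with non-empty ETags before calling complete.
function normalizePartsManifest(
  parts: CompleteMemoryUploadPart[]
): CompleteMemoryUploadPart[] {
  if (parts.length === 0) throw new Error("parts manifest must be non-empty");
  const sorted = [...parts].sort((a, b) => a.PartNumber - b.PartNumber);
  sorted.forEach((p, i) => {
    if (p.PartNumber !== i + 1) {
      throw new Error(`missing or duplicate part ${i + 1}`);
    }
    if (!p.ETag) throw new Error(`missing ETag for part ${p.PartNumber}`);
  });
  return sorted;
}
```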
Flow: main/app/lib/api/memory/abortMemoryUpload.ts.abortMemoryUpload, main/app/__tests__/memory-create-from-file.test.ts, main/server/tests/unit/test_memories_uploads_abort_app.py
Type Definitions
AbortMemoryUploadPayload {
user_id: string (required)
memory_id: Identifier (required)
key: string (required)
upload_id: string (required)
}
AbortMemoryUploadResponse {
message: string
bucket: string
key: string
}
Paths
| path-name | input | output/expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
abortMemoryUpload.success | AbortMemoryUploadPayload | AbortMemoryUploadResponse; MemoryOrmMemoryRecord.memory_upload_status transitions to aborted | happy path | best-effort cleanup on failure path |
abortMemoryUpload.invalid-input | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | missing key/upload_id/memory_id currently raises ValueError |
abortMemoryUpload.ownership-error | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | ownership mismatch currently raises ValueError |
abortMemoryUpload.invalid-or-expired-upload-id | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | upload_id is invalid/expired for target key |
abortMemoryUpload.s3-abort-failed | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | S3 abort_multipart_upload failure |
abortMemoryUpload.orm-status-update-failed | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | updateMemoryStatus failure after S3 abort call |
abortMemoryUpload.unexpected-runtime-error | AbortMemoryUploadPayload | StandardError status=500 code=internal-error | error | generic exception path in helper |
Flow: main/server/memories/pipeline/ingest/app.py.implementation, main/server/tests/unit/test_memories_pipeline_ingest_app.py
Type Definitions
PostUploadIngestMemoryInput {
memory_id: Identifier (required)
user_id: string (required)
key: string (required)
bucket: string (required)
}
PostUploadIngestMemoryOutput {
memory_id: Identifier
memory_upload_status: "processed"
}
Paths
| path-name | input | output/expected state change | path-type | notes |
| --- | --- | --- | --- | --- |
implementation.success | PostUploadIngestMemoryInput | PostUploadIngestMemoryOutput; MemoryOrmMemoryRecord.embedding_vector written and memory_upload_status transitions to processed | happy path | implemented in main/server/memories/pipeline/ingest/app.py; deployment trigger wiring is separate |
implementation.not-found | PostUploadIngestMemoryInput | StandardError status=500 code=internal-error | error | unless ingest lambda raises NotFoundError/AppError, helper maps this to 500 |
implementation.vector-write-failed | PostUploadIngestMemoryInput | StandardError status=500 code=internal-error | error | failed embedding/vector write leaves record unprocessed |
3. Pseudocode for Critical Flows (Optional)
- Flow name: createNewMemoryFromFile (current)

    validate file size > 0
    start = call startMemoryUpload(user_id, memory_type)
    for each part in file:
        presigned = call uploadMemoryPart(start.key, start.upload_id, part_number)
        PUT part bytes to presigned.url
        assert ETag exists and append to parts list
    complete = call completeMemoryUpload(start.key, start.upload_id, start.memory_id, parts)
    return merged upload result
    on any failure after start:
        best-effort call abortMemoryUpload(start.key, start.upload_id, start.memory_id)
        rethrow original error
- Implementation notes:
  - The Memory ORM lifecycle contract is owned by this plan: createMemory creates records in started; updateMemoryStatus drives upload transitions (uploaded, aborted, failed, processed); updateMemoryEmbedding persists vector[384] and marks the record processed.
  - Current code includes an ingest lambda contract in main/server/memories/pipeline/ingest/app.py that writes embedding_vector; template.yaml does not currently declare an automatic S3 event mapping to this function.
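The lifecycle described in the notes above can be captured as a small transition table. The allowed transitions below are inferred from those notes for illustration; they are an assumption, not a published state machine:

```typescript
type MemoryUploadStatus =
  | "started"
  | "in_progress"
  | "uploaded"
  | "failed"
  | "aborted"
  | "processed";

// Transition table inferred from the lifecycle notes (an assumption for
// illustration): started records may progress, be finalized, or be cleaned
// up; only uploaded records reach processed via the ingest pipeline.
const ALLOWED_TRANSITIONS: Record<MemoryUploadStatus, MemoryUploadStatus[]> = {
  started: ["in_progress", "uploaded", "failed", "aborted"],
  in_progress: ["uploaded", "failed", "aborted"],
  uploaded: ["processed", "failed"],
  failed: [],
  aborted: [],
  processed: [],
};

function canTransition(from: MemoryUploadStatus, to: MemoryUploadStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```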
After all stages are approved, apply .agent/skills/reconcile-plans/SKILL.md to propagate contract updates across linked plans.