Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,48 @@
"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED": "true"
},
"path": "./src/otel/otel_logger_example.py"
},
{
"name": "Filesystem SerDes Basic",
"description": "Basic usage of filesystem serdes in ALWAYS mode to store step results on a mounted filesystem",
"handler": "filesystem_serdes_basic.handler",
"integration": false,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"environment": {
"FILESYSTEM_MOUNT_PATH": "/mnt/s3"
},
"path": "./src/filesystem_serdes/filesystem_serdes_basic.py"
},
{
"name": "Filesystem SerDes Overflow",
"description": "Filesystem serdes in OVERFLOW mode — inline for small payloads, file for large",
"handler": "filesystem_serdes_overflow.handler",
"integration": false,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"environment": {
"FILESYSTEM_MOUNT_PATH": "/mnt/s3"
},
"path": "./src/filesystem_serdes/filesystem_serdes_overflow.py"
},
{
"name": "Filesystem SerDes Preview",
"description": "Filesystem serdes with preview configuration for observability",
"handler": "filesystem_serdes_preview.handler",
"integration": false,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"environment": {
"FILESYSTEM_MOUNT_PATH": "/mnt/s3"
},
"path": "./src/filesystem_serdes/filesystem_serdes_preview.py"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Example demonstrating basic filesystem serdes usage.

This example shows how to use create_filesystem_serdes to store step results
on a durable filesystem (e.g., Amazon S3 Files or EFS mounted to Lambda).

In ALWAYS mode (the default), every step result is written to a file on the
mounted filesystem, and only a file pointer is stored in the checkpoint. This
keeps checkpoint sizes small regardless of payload size.

WARNING: This requires a durable filesystem mount (S3 Files or EFS).
Do NOT use /tmp — it is ephemeral and not shared across invocations.
"""

import os
from typing import Any

from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.filesystem_serdes import (
FileSystemSerdesConfig,
FileSystemSerdesMode,
create_filesystem_serdes,
)


# Mount path for the durable filesystem (S3 Files or EFS)
# In production, this would be /mnt/s3 or /mnt/efs
MOUNT_PATH = os.environ.get("FILESYSTEM_MOUNT_PATH", "/mnt/s3")


@durable_execution
def handler(event: Any, context: DurableContext) -> dict[str, Any]:
"""Process data using filesystem serdes for large payloads.

This example demonstrates:
1. Creating a filesystem serdes instance
2. Using it with context.step() via StepConfig
3. The serdes transparently writes results to mounted storage
"""
# Create filesystem serdes - ALWAYS mode writes every result to file
fs_serdes = create_filesystem_serdes(MOUNT_PATH)

# Step 1: Generate some data (result stored on filesystem)
data = context.step(
lambda _: {
"users": [
{"id": i, "name": f"user_{i}", "email": f"user{i}@example.com"}
for i in range(10)
],
"metadata": {"total": 10, "page": 1},
},
name="generate_data",
config=StepConfig(serdes=fs_serdes),
)

# Step 2: Process the data (result also stored on filesystem)
processed = context.step(
lambda _: {
"processed_count": len(data["users"]),
"user_ids": [u["id"] for u in data["users"]],
},
name="process_data",
config=StepConfig(serdes=fs_serdes),
)

return {
"success": True,
"processed_count": processed["processed_count"],
"user_ids": processed["user_ids"],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Example demonstrating filesystem serdes in OVERFLOW mode.

In OVERFLOW mode, small values are stored inline in the checkpoint (as JSON),
and only values exceeding the ~256KB checkpoint size limit overflow to a file.
This is ideal for mixed workloads where most payloads are small but some may
be large.
"""

import os
from typing import Any

from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.filesystem_serdes import (
FileSystemSerdesConfig,
FileSystemSerdesMode,
create_filesystem_serdes,
)


MOUNT_PATH = os.environ.get("FILESYSTEM_MOUNT_PATH", "/mnt/s3")


@durable_execution
def handler(event: Any, context: DurableContext) -> dict[str, Any]:
"""Demonstrate OVERFLOW mode — inline for small, file for large.

This example shows that:
- Small step results stay inline in the checkpoint (fast, no file I/O)
- Large step results automatically overflow to the filesystem
"""
# OVERFLOW mode: inline if small, file if > ~256KB
fs_serdes = create_filesystem_serdes(
MOUNT_PATH,
FileSystemSerdesConfig(storage_mode=FileSystemSerdesMode.OVERFLOW),
)

# Step 1: Small payload — stays inline in checkpoint
small_result = context.step(
lambda _: {"status": "ok", "count": 42},
name="small_step",
config=StepConfig(serdes=fs_serdes),
)

# Step 2: Large payload — overflows to filesystem
large_result = context.step(
lambda _: {
"records": [
{"id": i, "data": "x" * 1000}
for i in range(300) # ~300KB payload
]
},
name="large_step",
config=StepConfig(serdes=fs_serdes),
)

return {
"small_status": small_result["status"],
"large_record_count": len(large_result["records"]),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Example demonstrating filesystem serdes with preview configuration.

When a preview generator is configured, the checkpoint envelope stores a compact
preview alongside the file pointer. This makes key fields visible in the console
and API without reading the full file from storage.
"""

import os
from typing import Any

from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.filesystem_serdes import (
FileSystemSerdesConfig,
PreviewConfig,
PreviewField,
PreviewMode,
build_preview,
create_filesystem_serdes,
)


MOUNT_PATH = os.environ.get("FILESYSTEM_MOUNT_PATH", "/mnt/s3")


@durable_execution
def handler(event: Any, context: DurableContext) -> dict[str, Any]:
"""Demonstrate filesystem serdes with preview for observability.

The preview config controls which fields appear inline in the checkpoint:
- include: fields to show
- mask: fields to show with masked values (e.g., "***")
- exclude: fields to never show (takes priority over mask)
"""
# Configure filesystem serdes with preview
fs_serdes = create_filesystem_serdes(
MOUNT_PATH,
FileSystemSerdesConfig(
generate_preview=lambda value: build_preview(
value,
PreviewConfig(
mode=PreviewMode.EXCLUDE_ALL,
include=[
PreviewField(name="order_id"),
PreviewField(name="status"),
PreviewField(name="total"),
],
mask=[PreviewField(name="email")],
),
),
),
)

# Process an order — full data stored on filesystem,
# but order_id, status, total, and masked email are visible in checkpoint
order = context.step(
lambda _: {
"order_id": "ORD-12345",
"status": "completed",
"total": 99.99,
"email": "customer@example.com",
"items": [
{"sku": "ITEM-001", "quantity": 2, "price": 29.99},
{"sku": "ITEM-002", "quantity": 1, "price": 40.01},
],
"shipping_address": {
"street": "123 Main St",
"city": "Seattle",
"state": "WA",
},
},
name="process_order",
config=StepConfig(serdes=fs_serdes),
)

return {
"order_id": order["order_id"],
"status": order["status"],
"item_count": len(order["items"]),
}
69 changes: 69 additions & 0 deletions packages/aws-durable-execution-sdk-python-examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,75 @@
}
}
}
},
"FilesystemSerdesBasic": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "filesystem_serdes_basic.handler",
"Description": "Basic usage of filesystem serdes in ALWAYS mode to store step results on a mounted filesystem",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"Environment": {
"Variables": {
"FILESYSTEM_MOUNT_PATH": "/mnt/s3"
}
}
}
},
"FilesystemSerdesOverflow": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "filesystem_serdes_overflow.handler",
"Description": "Filesystem serdes in OVERFLOW mode \u2014 inline for small payloads, file for large",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"Environment": {
"Variables": {
"FILESYSTEM_MOUNT_PATH": "/mnt/s3"
}
}
}
},
"FilesystemSerdesPreview": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "filesystem_serdes_preview.handler",
"Description": "Filesystem serdes with preview configuration for observability",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"Environment": {
"Variables": {
"FILESYSTEM_MOUNT_PATH": "/mnt/s3"
}
}
}
}
}
}
Loading