From 84e6e0cf16a7367b92351db5bc1ab5e2ff2f5cb3 Mon Sep 17 00:00:00 2001 From: Ayushi Ahjolia Date: Fri, 26 Jun 2026 16:48:30 -0700 Subject: [PATCH 1/2] feat(serdes): Add filesystem serdes for large payload storage --- .../examples-catalog.json | 42 + .../src/filesystem_serdes/__init__.py | 0 .../filesystem_serdes_basic.py | 71 ++ .../filesystem_serdes_overflow.py | 61 ++ .../filesystem_serdes_preview.py | 81 ++ .../template.yaml | 69 ++ .../test/filesystem_serdes/__init__.py | 0 .../test_filesystem_serdes_basic.py | 146 ++++ .../filesystem_serdes.py | 320 ++++++++ .../preview.py | 199 +++++ .../tests/e2e/filesystem_serdes_int_test.py | 494 ++++++++++++ .../tests/filesystem_serdes_test.py | 727 ++++++++++++++++++ .../tests/preview_test.py | 222 ++++++ 13 files changed, 2432 insertions(+) create mode 100644 packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/__init__.py create mode 100644 packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_basic.py create mode 100644 packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_overflow.py create mode 100644 packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_preview.py create mode 100644 packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/__init__.py create mode 100644 packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/test_filesystem_serdes_basic.py create mode 100644 packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/filesystem_serdes.py create mode 100644 packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/preview.py create mode 100644 packages/aws-durable-execution-sdk-python/tests/e2e/filesystem_serdes_int_test.py create mode 100644 packages/aws-durable-execution-sdk-python/tests/filesystem_serdes_test.py create mode 100644 packages/aws-durable-execution-sdk-python/tests/preview_test.py diff --git a/packages/aws-durable-execution-sdk-python-examples/examples-catalog.json b/packages/aws-durable-execution-sdk-python-examples/examples-catalog.json index 76bede86..a8085ada 100644 --- a/packages/aws-durable-execution-sdk-python-examples/examples-catalog.json +++ b/packages/aws-durable-execution-sdk-python-examples/examples-catalog.json @@ -654,6 +654,48 @@ "OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED": "true" }, "path": "./src/otel/otel_logger_example.py" + }, + { + "name": "Filesystem SerDes Basic", + "description": "Basic usage of filesystem serdes in ALWAYS mode to store step results on a mounted filesystem", + "handler": "filesystem_serdes_basic.handler", + "integration": false, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "environment": { + "FILESYSTEM_MOUNT_PATH": "/mnt/s3" + }, + "path": "./src/filesystem_serdes/filesystem_serdes_basic.py" + }, + { + "name": "Filesystem SerDes Overflow", + "description": "Filesystem serdes in OVERFLOW mode — inline for small payloads, file for large", + "handler": "filesystem_serdes_overflow.handler", + "integration": false, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "environment": { + "FILESYSTEM_MOUNT_PATH": "/mnt/s3" + }, + "path": "./src/filesystem_serdes/filesystem_serdes_overflow.py" + }, + { + "name": "Filesystem SerDes Preview", + "description": "Filesystem serdes with preview configuration for observability", + "handler": "filesystem_serdes_preview.handler", + "integration": false, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "environment": { + "FILESYSTEM_MOUNT_PATH": "/mnt/s3" + }, + "path": "./src/filesystem_serdes/filesystem_serdes_preview.py" } ] } diff --git a/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/__init__.py b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_basic.py b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_basic.py new file mode 100644 index 00000000..b545ad36 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_basic.py @@ -0,0 +1,71 @@ +"""Example demonstrating basic filesystem serdes usage. + +This example shows how to use create_filesystem_serdes to store step results +on a durable filesystem (e.g., Amazon S3 Files or EFS mounted to Lambda). + +In ALWAYS mode (the default), every step result is written to a file on the +mounted filesystem, and only a file pointer is stored in the checkpoint. This +keeps checkpoint sizes small regardless of payload size. + +WARNING: This requires a durable filesystem mount (S3 Files or EFS). +Do NOT use /tmp — it is ephemeral and not shared across invocations. +""" + +import os +from typing import Any + +from aws_durable_execution_sdk_python.config import StepConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.filesystem_serdes import ( + FileSystemSerdesConfig, + FileSystemSerdesMode, + create_filesystem_serdes, +) + + +# Mount path for the durable filesystem (S3 Files or EFS) +# In production, this would be /mnt/s3 or /mnt/efs +MOUNT_PATH = os.environ.get("FILESYSTEM_MOUNT_PATH", "/mnt/s3") + + +@durable_execution +def handler(event: Any, context: DurableContext) -> dict[str, Any]: + """Process data using filesystem serdes for large payloads. + + This example demonstrates: + 1. Creating a filesystem serdes instance + 2. Using it with context.step() via StepConfig + 3. The serdes transparently writes results to mounted storage + """ + # Create filesystem serdes - ALWAYS mode writes every result to file + fs_serdes = create_filesystem_serdes(MOUNT_PATH) + + # Step 1: Generate some data (result stored on filesystem) + data = context.step( + lambda _: { + "users": [ + {"id": i, "name": f"user_{i}", "email": f"user{i}@example.com"} + for i in range(10) + ], + "metadata": {"total": 10, "page": 1}, + }, + name="generate_data", + config=StepConfig(serdes=fs_serdes), + ) + + # Step 2: Process the data (result also stored on filesystem) + processed = context.step( + lambda _: { + "processed_count": len(data["users"]), + "user_ids": [u["id"] for u in data["users"]], + }, + name="process_data", + config=StepConfig(serdes=fs_serdes), + ) + + return { + "success": True, + "processed_count": processed["processed_count"], + "user_ids": processed["user_ids"], + } diff --git a/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_overflow.py b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_overflow.py new file mode 100644 index 00000000..7661810a --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_overflow.py @@ -0,0 +1,61 @@ +"""Example demonstrating filesystem serdes in OVERFLOW mode. + +In OVERFLOW mode, small values are stored inline in the checkpoint (as JSON), +and only values exceeding the ~256KB checkpoint size limit overflow to a file. +This is ideal for mixed workloads where most payloads are small but some may +be large. +""" + +import os +from typing import Any + +from aws_durable_execution_sdk_python.config import StepConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.filesystem_serdes import ( + FileSystemSerdesConfig, + FileSystemSerdesMode, + create_filesystem_serdes, +) + + +MOUNT_PATH = os.environ.get("FILESYSTEM_MOUNT_PATH", "/mnt/s3") + + +@durable_execution +def handler(event: Any, context: DurableContext) -> dict[str, Any]: + """Demonstrate OVERFLOW mode — inline for small, file for large. + + This example shows that: + - Small step results stay inline in the checkpoint (fast, no file I/O) + - Large step results automatically overflow to the filesystem + """ + # OVERFLOW mode: inline if small, file if > ~256KB + fs_serdes = create_filesystem_serdes( + MOUNT_PATH, + FileSystemSerdesConfig(storage_mode=FileSystemSerdesMode.OVERFLOW), + ) + + # Step 1: Small payload — stays inline in checkpoint + small_result = context.step( + lambda _: {"status": "ok", "count": 42}, + name="small_step", + config=StepConfig(serdes=fs_serdes), + ) + + # Step 2: Large payload — overflows to filesystem + large_result = context.step( + lambda _: { + "records": [ + {"id": i, "data": "x" * 1000} + for i in range(300) # ~300KB payload + ] + }, + name="large_step", + config=StepConfig(serdes=fs_serdes), + ) + + return { + "small_status": small_result["status"], + "large_record_count": len(large_result["records"]), + } diff --git a/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_preview.py b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_preview.py new file mode 100644 index 00000000..494d823b --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-examples/src/filesystem_serdes/filesystem_serdes_preview.py @@ -0,0 +1,81 @@ +"""Example demonstrating filesystem serdes with preview configuration. + +When a preview generator is configured, the checkpoint envelope stores a compact +preview alongside the file pointer. This makes key fields visible in the console +and API without reading the full file from storage. +""" + +import os +from typing import Any + +from aws_durable_execution_sdk_python.config import StepConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.filesystem_serdes import ( + FileSystemSerdesConfig, + PreviewConfig, + PreviewField, + PreviewMode, + build_preview, + create_filesystem_serdes, +) + + +MOUNT_PATH = os.environ.get("FILESYSTEM_MOUNT_PATH", "/mnt/s3") + + +@durable_execution +def handler(event: Any, context: DurableContext) -> dict[str, Any]: + """Demonstrate filesystem serdes with preview for observability. + + The preview config controls which fields appear inline in the checkpoint: + - include: fields to show + - mask: fields to show with masked values (e.g., "***") + - exclude: fields to never show (takes priority over mask) + """ + # Configure filesystem serdes with preview + fs_serdes = create_filesystem_serdes( + MOUNT_PATH, + FileSystemSerdesConfig( + generate_preview=lambda value: build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[ + PreviewField(name="order_id"), + PreviewField(name="status"), + PreviewField(name="total"), + ], + mask=[PreviewField(name="email")], + ), + ), + ), + ) + + # Process an order — full data stored on filesystem, + # but order_id, status, total, and masked email are visible in checkpoint + order = context.step( + lambda _: { + "order_id": "ORD-12345", + "status": "completed", + "total": 99.99, + "email": "customer@example.com", + "items": [ + {"sku": "ITEM-001", "quantity": 2, "price": 29.99}, + {"sku": "ITEM-002", "quantity": 1, "price": 40.01}, + ], + "shipping_address": { + "street": "123 Main St", + "city": "Seattle", + "state": "WA", + }, + }, + name="process_order", + config=StepConfig(serdes=fs_serdes), + ) + + return { + "order_id": order["order_id"], + "status": order["status"], + "item_count": len(order["items"]), + } diff --git a/packages/aws-durable-execution-sdk-python-examples/template.yaml b/packages/aws-durable-execution-sdk-python-examples/template.yaml index 5e39d9e6..0cbb9a82 100644 --- a/packages/aws-durable-execution-sdk-python-examples/template.yaml +++ b/packages/aws-durable-execution-sdk-python-examples/template.yaml @@ -1068,6 +1068,75 @@ } } } + }, + "FilesystemSerDesBasic": { + "Type": "AWS::Serverless::Function", + "Properties": { + "CodeUri": "build/", + "Handler": "filesystem_serdes_basic.handler", + "Description": "Basic usage of filesystem serdes in ALWAYS mode to store step results on a mounted filesystem", + "Role": { + "Fn::GetAtt": [ + "DurableFunctionRole", + "Arn" + ] + }, + "DurableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "Environment": { + "Variables": { + "FILESYSTEM_MOUNT_PATH": "/mnt/s3" + } + } + } + }, + "FilesystemSerDesOverflow": { + "Type": "AWS::Serverless::Function", + "Properties": { + "CodeUri": "build/", + "Handler": "filesystem_serdes_overflow.handler", + "Description": "Filesystem serdes in OVERFLOW mode — inline for small payloads, file for large", + "Role": { + "Fn::GetAtt": [ + "DurableFunctionRole", + "Arn" + ] + }, + "DurableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "Environment": { + "Variables": { + "FILESYSTEM_MOUNT_PATH": "/mnt/s3" + } + } + } + }, + "FilesystemSerDesPreview": { + "Type": "AWS::Serverless::Function", + "Properties": { + "CodeUri": "build/", + "Handler": "filesystem_serdes_preview.handler", + "Description": "Filesystem serdes with preview configuration for observability", + "Role": { + "Fn::GetAtt": [ + "DurableFunctionRole", + "Arn" + ] + }, + "DurableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "Environment": { + "Variables": { + "FILESYSTEM_MOUNT_PATH": "/mnt/s3" + } + } + } } } } \ No newline at end of file diff --git a/packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/__init__.py b/packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/test_filesystem_serdes_basic.py b/packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/test_filesystem_serdes_basic.py new file mode 100644 index 00000000..16479ee8 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-examples/test/filesystem_serdes/test_filesystem_serdes_basic.py @@ -0,0 +1,146 @@ +"""Integration tests for filesystem serdes examples. + +These tests use a temporary directory as the filesystem mount path to verify +that the filesystem serdes correctly writes and reads files during durable +execution. They bypass the standard durable_runner fixture because they need +to control the FILESYSTEM_MOUNT_PATH environment variable before handler import. +""" + +import importlib +import json +import os + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +def test_filesystem_serdes_basic(tmp_path, monkeypatch): + """Test basic filesystem serdes in ALWAYS mode. + + Verifies that: + - The handler completes successfully + - Step results are written as JSON files to the mounted filesystem + - Deserialized results are correct + """ + monkeypatch.setenv("FILESYSTEM_MOUNT_PATH", str(tmp_path)) + + from src.filesystem_serdes import filesystem_serdes_basic + + importlib.reload(filesystem_serdes_basic) + + runner = DurableFunctionTestRunner(handler=filesystem_serdes_basic.handler) + + with runner: + result = runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify the workflow completed successfully + assert result_data["success"] is True + assert result_data["processed_count"] == 10 + assert len(result_data["user_ids"]) == 10 + assert result_data["user_ids"] == list(range(10)) + + # Verify files were actually written to the temp directory + json_files = [] + for root, dirs, files in os.walk(str(tmp_path)): + for f in files: + if f.endswith(".json"): + json_files.append(os.path.join(root, f)) + + # At least 2 files (one per step) + assert len(json_files) >= 2 + + # Verify file contents are valid JSON + for file_path in json_files: + with open(file_path) as f: + data = json.load(f) + assert data is not None + + +@pytest.mark.example +def test_filesystem_serdes_overflow(tmp_path, monkeypatch): + """Test filesystem serdes in OVERFLOW mode. + + Verifies that: + - Small payloads stay inline (no file written for small step) + - Large payloads overflow to file + - The handler returns correct results + """ + monkeypatch.setenv("FILESYSTEM_MOUNT_PATH", str(tmp_path)) + + from src.filesystem_serdes import filesystem_serdes_overflow + + importlib.reload(filesystem_serdes_overflow) + + runner = DurableFunctionTestRunner(handler=filesystem_serdes_overflow.handler) + + with runner: + result = runner.run(input="test", timeout=30) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data["small_status"] == "ok" + assert result_data["large_record_count"] == 300 + + # In overflow mode, at least the large step should have written a file + json_files = [] + for root, dirs, files in os.walk(str(tmp_path)): + for f in files: + if f.endswith(".json"): + json_files.append(os.path.join(root, f)) + + assert len(json_files) >= 1 + + +@pytest.mark.example +def test_filesystem_serdes_preview(tmp_path, monkeypatch): + """Test filesystem serdes with preview configuration. + + Verifies that: + - The handler completes successfully + - Files are written containing the FULL data (not just preview) + - The handler returns correct results + """ + monkeypatch.setenv("FILESYSTEM_MOUNT_PATH", str(tmp_path)) + + from src.filesystem_serdes import filesystem_serdes_preview + + importlib.reload(filesystem_serdes_preview) + + runner = DurableFunctionTestRunner(handler=filesystem_serdes_preview.handler) + + with runner: + result = runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify the order was processed + assert result_data["order_id"] == "ORD-12345" + assert result_data["status"] == "completed" + assert result_data["item_count"] == 2 + + # Verify a file was written for the order step + json_files = [] + for root, dirs, files in os.walk(str(tmp_path)): + for f in files: + if f.endswith(".json"): + json_files.append(os.path.join(root, f)) + + assert len(json_files) >= 1 + + # Verify the written file contains the full order data (not just preview) + with open(json_files[0]) as f: + stored_data = json.load(f) + assert "order_id" in stored_data + assert "items" in stored_data # Full data, not truncated preview + assert "shipping_address" in stored_data diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/filesystem_serdes.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/filesystem_serdes.py new file mode 100644 index 00000000..62552381 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/filesystem_serdes.py @@ -0,0 +1,320 @@ +"""Filesystem-based serialization for durable functions. + +This module provides a SerDes implementation that stores serialized values on a +durable filesystem (Amazon S3 Files or Amazon EFS mounted to Lambda). + +WARNING: Do NOT use with Lambda's ephemeral /tmp storage. Lambda's /tmp is local +to a single execution environment and is not shared across invocations. On replay, +a different environment may be used and the file will not be found. + +Use only with a durable, shared filesystem such as: +- Amazon S3 Files — mount an S3 bucket as a filesystem via the Lambda console or IaC +- Amazon EFS — mount an EFS file system to your Lambda function + +Key Features: +- Two storage modes: ALWAYS (write every value to file) and OVERFLOW (inline if + small, overflow to file if exceeds checkpoint size limit) +- Two path encoding modes: URI (human-readable) and HASH (fixed-length, safe) +- Optional preview support for storing compact summaries inline in checkpoints +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os +import re +import sys +from dataclasses import dataclass +from enum import StrEnum +from typing import Any, Protocol + +from aws_durable_execution_sdk_python.preview import ( + FieldMatchMode, + PreviewConfig, + PreviewField, + PreviewMode, + build_preview, +) +from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext + + +# Re-export preview types so users can import everything from filesystem_serdes +__all__ = [ + "FieldMatchMode", + "FileSystemPathEncoding", + "FileSystemSerdesConfig", + "FileSystemSerdesMode", + "PreviewConfig", + "PreviewField", + "PreviewMode", + "build_preview", + "create_filesystem_serdes", +] + +logger = logging.getLogger(__name__) + +# Checkpoint size limit in bytes (256KB) +_CHECKPOINT_SIZE_LIMIT_BYTES = 256 * 1024 + +# Subtract 1KB headroom for the envelope wrapper and other checkpoint metadata +_OVERFLOW_THRESHOLD_BYTES = _CHECKPOINT_SIZE_LIMIT_BYTES - 1024 + + +class FileSystemSerdesMode(StrEnum): + """Controls when data is written to the filesystem. + + - ALWAYS: Every value is written to a file; the checkpoint stores only a + file pointer. Best for consistently large payloads or when you want + predictable checkpoint sizes. + + - OVERFLOW: Data is written inline (as JSON) unless it exceeds the durable + function checkpoint size limit (~256KB), in which case it overflows to a + file. Best for mixed workloads where most payloads are small. + """ + + ALWAYS = "ALWAYS" + OVERFLOW = "OVERFLOW" + + +class FileSystemPathEncoding(StrEnum): + """Controls how the durable execution ARN and operation ID are turned into + on-disk directory and file names. + + - URI: The per-execution directory is a compact, human-navigable path built + from the ARN's function name, execution name and invocation id + (//); the file name is the + operation ID percent-encoded. Names stay readable, but a very long + operation ID may exceed the filesystem's per-name length limit (commonly + 255 bytes). If the ARN does not match the expected durable-execution shape, + the whole ARN is percent-encoded into a single directory segment instead. + + - HASH: The ARN (directory) and operation ID (file name) are each replaced + by their SHA-256 hex digest. Names are a fixed length (64 chars) and always + filesystem-safe regardless of the characters or length of the original + value, at the cost of no longer being human-readable. + """ + + URI = "URI" + HASH = "HASH" + + +class GeneratePreview(Protocol): + """Protocol for preview generation functions.""" + + def __call__(self, value: Any) -> dict[str, Any] | None: ... + + +@dataclass(frozen=True) +class FileSystemSerdesConfig: + """Configuration options for create_filesystem_serdes. + + Attributes: + storage_mode: Controls when data is written to the filesystem. + Default: FileSystemSerdesMode.ALWAYS + path_encoding: Controls how the ARN (directory) and operation ID (file + name) are encoded into path segments. Default: FileSystemPathEncoding.URI + generate_preview: Optional function that generates a preview object from + the value. When provided, the preview is stored inline in the + checkpoint envelope alongside the file pointer, making data visible + in the console and API without reading the full file. + """ + + storage_mode: FileSystemSerdesMode = FileSystemSerdesMode.ALWAYS + path_encoding: FileSystemPathEncoding = FileSystemPathEncoding.URI + generate_preview: GeneratePreview | None = None + + +# Matches a durable execution ARN of the form: +# arn::lambda:::function::/durable-execution// +_DURABLE_EXECUTION_ARN_PATTERN = re.compile( + r"^arn:[^:]*:lambda:[^:]*:[^:]*:function:([^:/]+):[^:/]+/durable-execution/([^/]+)/([^/]+)$" +) + + +def _encode_segment(value: str, encoding: FileSystemPathEncoding) -> str: + """Encode a path segment (ARN or operation ID) into a filesystem-safe name. + + For URI encoding, uses percent-encoding of all characters except unreserved + (RFC 3986). For HASH encoding, uses SHA-256 hex digest. + """ + if encoding == FileSystemPathEncoding.HASH: + return hashlib.sha256(value.encode()).hexdigest() + from urllib.parse import quote + + return quote(value, safe="") + + +def _parse_durable_execution_arn( + arn: str, +) -> tuple[str, str, str] | None: + """Parse a durable execution ARN into (functionName, executionName, invocationId). + + Returns None if the ARN doesn't match the expected pattern. + """ + match = _DURABLE_EXECUTION_ARN_PATTERN.match(arn) + if not match: + return None + return match.group(1), match.group(2), match.group(3) + + +def _resolve_execution_dir( + base_path: str, + arn: str, + path_encoding: FileSystemPathEncoding, +) -> str: + """Resolve the per-execution directory under base_path. + + In URI mode, derives a compact human-navigable path from the execution's + function name, execution name and invocation id. If the ARN doesn't match + the expected shape, the whole ARN is URI-encoded into a single segment. + + In HASH mode, the whole ARN is hashed into a single fixed-length segment. + """ + if path_encoding == FileSystemPathEncoding.URI: + parts = _parse_durable_execution_arn(arn) + if parts: + function_name, execution_name, invocation_id = parts + return os.path.join(base_path, function_name, execution_name, invocation_id) + return os.path.join(base_path, _encode_segment(arn, path_encoding)) + + +def _write_to_file( + base_path: str, + value: Any, + context: SerDesContext, + path_encoding: FileSystemPathEncoding, +) -> str: + """Write value as JSON to a file and return the file path.""" + dir_path = _resolve_execution_dir( + base_path, context.durable_execution_arn, path_encoding + ) + os.makedirs(dir_path, exist_ok=True) + file_name = f"{_encode_segment(context.operation_id, path_encoding)}.json" + file_path = os.path.join(dir_path, file_name) + with open(file_path, "w", encoding="utf-8") as f: + json.dump(value, f) + return file_path + + +class _FileSystemSerDes(SerDes[Any]): + """SerDes that stores values on a durable filesystem. + + The checkpoint stores a JSON envelope that is either: + - {"data": ""} — value stored inline (OVERFLOW mode, under threshold) + - {"file": ""} — value stored in a file + - {"file": "", "preview": {...}} — file pointer with inline preview + """ + + def __init__(self, base_path: str, config: FileSystemSerdesConfig) -> None: + self._base_path = base_path + self._storage_mode = config.storage_mode + self._path_encoding = config.path_encoding + self._generate_preview = config.generate_preview + + def serialize(self, value: Any, serdes_context: SerDesContext) -> str: + """Serialize value to a JSON envelope string. + + In ALWAYS mode, writes to file and returns a file pointer envelope. + In OVERFLOW mode, stores inline if small, overflows to file if large. + """ + if self._storage_mode == FileSystemSerdesMode.ALWAYS: + file_path = _write_to_file( + self._base_path, value, serdes_context, self._path_encoding + ) + envelope: dict[str, Any] = {"file": file_path} + if self._generate_preview: + preview = self._generate_preview(value) + if preview: + envelope["preview"] = preview + return json.dumps(envelope) + + # OVERFLOW mode: serialize inline first, overflow to file if too large + inline_json = json.dumps(value) + envelope_str = json.dumps({"data": inline_json}) + if len(envelope_str.encode("utf-8")) > _OVERFLOW_THRESHOLD_BYTES: + file_path = _write_to_file( + self._base_path, value, serdes_context, self._path_encoding + ) + envelope = {"file": file_path} + if self._generate_preview: + preview = self._generate_preview(value) + if preview: + envelope["preview"] = preview + return json.dumps(envelope) + + return envelope_str + + def deserialize(self, data: str, serdes_context: SerDesContext) -> Any: # noqa: ARG002 + """Deserialize from a JSON envelope string. + + Reads from file if envelope contains a file pointer, otherwise parses + inline data. + """ + envelope = json.loads(data) + + if "file" in envelope: + with open(envelope["file"], encoding="utf-8") as f: + return json.load(f) + + return json.loads(envelope["data"]) + + +def create_filesystem_serdes( + base_path: str, + config: FileSystemSerdesConfig | None = None, +) -> SerDes[Any]: + """Create a SerDes that stores serialized values on a durable filesystem. + + WARNING: Do NOT use with Lambda's ephemeral /tmp storage. + Lambda's /tmp filesystem is local to a single execution environment and is + not shared across invocations or function instances. On replay, a different + execution environment may be used and the file will not be found, causing + deserialization to fail. + + Use only with a durable, shared filesystem such as: + - Amazon S3 Files — mount an S3 bucket as a filesystem via the Lambda + console or IaC + - Amazon EFS — mount an EFS file system to your Lambda function + + Both options provide persistence across invocations and are accessible from + multiple concurrent function instances, which is required for correct replay + behavior. + + The checkpoint stores a JSON envelope that is either: + - {"data": ""} — value stored inline (OVERFLOW mode, under + threshold) + - {"file": ""} — value stored in a file + - {"file": "", "preview": {...}} — file pointer with inline preview + (when preview is configured) + + Args: + base_path: Directory path where data files will be stored (e.g. "/mnt/s3" + for S3 Files, "/mnt/efs" for EFS). + config: Optional configuration options. Defaults to ALWAYS mode with + URI path encoding. + + Returns: + A SerDes instance that reads/writes JSON files under base_path. + + Example: + # Always write to S3 Files mount (default) + fs_serdes = create_filesystem_serdes("/mnt/s3") + + # Only overflow to filesystem when payload exceeds ~256KB + fs_serdes = create_filesystem_serdes("/mnt/s3", FileSystemSerdesConfig( + storage_mode=FileSystemSerdesMode.OVERFLOW, + )) + + # With preview: show id and masked email in checkpoint + fs_serdes = create_filesystem_serdes("/mnt/s3", FileSystemSerdesConfig( + generate_preview=lambda value: build_preview(value, PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="id"), PreviewField(name="status")], + mask=[PreviewField(name="email")], + )), + )) + """ + effective_config = config or FileSystemSerdesConfig() + return _FileSystemSerDes(base_path, effective_config) diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/preview.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/preview.py new file mode 100644 index 00000000..0cccf9cf --- /dev/null +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/preview.py @@ -0,0 +1,199 @@ +"""Preview generation utilities for durable function checkpoints. + +This module provides a standalone utility for building compact preview objects +from values. Previews can be stored inline in checkpoint envelopes to make key +fields visible in the console and API without reading the full stored data. + +The preview system is designed to be used with any SerDes implementation — not +just the filesystem serdes. For example, you could use build_preview with a +custom DynamoDB-backed serdes or any other external storage serdes. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Any + + +class PreviewMode(StrEnum): + """Controls which fields are included in the preview by default. + + - INCLUDE_ALL: Include all fields, then apply exclude and mask rules. + - EXCLUDE_ALL: Exclude all fields, then apply include and mask rules. + """ + + INCLUDE_ALL = "INCLUDE_ALL" + EXCLUDE_ALL = "EXCLUDE_ALL" + + +class FieldMatchMode(StrEnum): + """Controls whether a preview field is matched by name anywhere in the + object tree, or by exact dot-notation path from the root. + + - ANYWHERE: Match the field name at any depth in the object tree (default). + - PATH: Match by exact dot-notation path from root. A single segment + (e.g. "email") matches only the root-level field. + """ + + ANYWHERE = "ANYWHERE" + PATH = "PATH" + + +@dataclass(frozen=True) +class PreviewField: + """A field selector used in preview include/exclude/mask lists. + + Attributes: + name: Field name or dot-notation path. + match: How to match the field. Defaults to FieldMatchMode.ANYWHERE. + """ + + name: str + match: FieldMatchMode = FieldMatchMode.ANYWHERE + + +@dataclass(frozen=True) +class PreviewConfig: + """Configuration for build_preview. + + Attributes: + mode: Whether to start with all fields included or all excluded. + include: Fields to include (used with EXCLUDE_ALL mode). + exclude: Fields to exclude (used with INCLUDE_ALL mode). + mask: Fields to mask — if visible, their value is replaced with mask_string. + mask_string: String used to replace masked field values. Default: "***" + max_preview_bytes: Maximum size in bytes for the preview object + (JSON-serialized). Fields are added until this limit is reached. + Default: 4096. + """ + + mode: PreviewMode + include: list[PreviewField] = field(default_factory=list) + exclude: list[PreviewField] = field(default_factory=list) + mask: list[PreviewField] = field(default_factory=list) + mask_string: str = "***" + max_preview_bytes: int = 4096 + + +def _field_matches(path: str, preview_field: PreviewField) -> bool: + """Check if a field at the given path matches a PreviewField rule.""" + if preview_field.match == FieldMatchMode.PATH: + return path == preview_field.name + # ANYWHERE: match if any segment of the path matches the field name + return preview_field.name in path.split(".") + + +def _is_matched(path: str, fields: list[PreviewField]) -> bool: + """Check if a path matches any field in the list.""" + return any(_field_matches(path, f) for f in fields) + + +def build_preview( + value: Any, + config: PreviewConfig, +) -> dict[str, Any] | None: + """Build a preview object from value according to config. + + Traverses the object tree and collects fields based on the include/exclude/mask + rules in config. The result is a nested object mirroring the original structure, + capped at config.max_preview_bytes (default 4096 bytes). + + Priority rules: + - exclude always wins — excluded fields are never shown, even if in mask + - mask implies visibility — masked fields are shown (with mask_string) unless + excluded + + Limitations: + - Field names containing dots are not supported (indistinguishable from path + separators) + - Array structure is not preserved — fields from array elements are merged + into a plain object at the array's path + - When array elements have heterogeneous shapes at the same field path, + later elements overwrite earlier primitives in the preview + + Args: + value: The value to build a preview from. + config: Preview configuration. + + Returns: + A nested dict representing the preview, or None if no fields are visible + or value is not a dict/object. + """ + if not isinstance(value, dict): + return None + + dangerous_keys = {"__proto__", "constructor", "prototype"} + pairs: list[tuple[str, Any]] = [] + + def collect(obj: Any, path_prefix: str) -> None: + if obj is None or not isinstance(obj, dict | list): + return + + if isinstance(obj, list): + for item in obj: + collect(item, path_prefix) + return + + for key in obj: + if key in dangerous_keys: + continue + if "." in str(key): + continue + + path = f"{path_prefix}.{key}" if path_prefix else str(key) + masked = _is_matched(path, config.mask) + excluded = _is_matched(path, config.exclude) + visible = not excluded and ( + masked + or ( + config.mode == PreviewMode.INCLUDE_ALL + if not _is_matched(path, config.include) + else True + ) + ) + + if not visible: + if not excluded: + collect(obj[key], path) + continue + + if masked: + pairs.append((path, config.mask_string)) + continue + + if isinstance(obj[key], dict | list): + collect(obj[key], path) + else: + pairs.append((path, obj[key])) + + collect(value, "") + if not pairs: + return None + + # Apply byte budget + accepted: list[tuple[str, Any]] = [] + estimated_size = 2 # "{}" + for path, val in pairs: + entry_size = len(f'"{path}":{json.dumps(val)},'.encode()) + if estimated_size + entry_size > config.max_preview_bytes: + break + accepted.append((path, val)) + estimated_size += entry_size + + if not accepted: + return None + + # Build nested result dict + result: dict[str, Any] = {} + for path, val in accepted: + parts = path.split(".") + node = result + for i in range(len(parts) - 1): + if not isinstance(node.get(parts[i]), dict): + node[parts[i]] = {} + node = node[parts[i]] + node[parts[-1]] = val + + return result if result else None diff --git a/packages/aws-durable-execution-sdk-python/tests/e2e/filesystem_serdes_int_test.py b/packages/aws-durable-execution-sdk-python/tests/e2e/filesystem_serdes_int_test.py new file mode 100644 index 00000000..bfab3417 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python/tests/e2e/filesystem_serdes_int_test.py @@ -0,0 +1,494 @@ +"""Integration tests for filesystem serdes end-to-end execution. + +Tests the full execution flow with filesystem serdes: +- First invocation: step executes, writes to filesystem, checkpoints envelope +- Replay: step deserializes from checkpointed envelope, reads from filesystem +""" + +from __future__ import annotations + +import json +import os +from typing import TYPE_CHECKING, Any +from unittest.mock import Mock, patch + +import pytest + +from aws_durable_execution_sdk_python.config import StepConfig +from aws_durable_execution_sdk_python.context import DurableContext, durable_step +from aws_durable_execution_sdk_python.execution import ( + InvocationStatus, + durable_execution, +) +from aws_durable_execution_sdk_python.filesystem_serdes import ( + FileSystemPathEncoding, + FileSystemSerdesConfig, + FileSystemSerdesMode, + PreviewConfig, + PreviewField, + PreviewMode, + build_preview, + create_filesystem_serdes, +) +from aws_durable_execution_sdk_python.lambda_service import ( + CheckpointOutput, + CheckpointUpdatedExecutionState, + Operation, + OperationAction, + OperationStatus, + OperationType, + StepDetails, +) + +if TYPE_CHECKING: + from aws_durable_execution_sdk_python.types import StepContext + + +def _create_lambda_context(): + """Create a mock Lambda context.""" + ctx = Mock() + ctx.aws_request_id = "test-request-id" + ctx.client_context = None + ctx.identity = None + ctx._epoch_deadline_time_in_ms = 0 # noqa: SLF001 + ctx.invoked_function_arn = "test-arn" + ctx.tenant_id = None + return ctx + + +def _create_initial_event(input_payload: str = "{}"): + """Create a fresh execution event (first invocation).""" + return { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test-func:1/durable-execution/exec-001/inv-001", + "CheckpointToken": "test-token", + "InitialExecutionState": { + "Operations": [ + { + "Id": "execution-1", + "Type": "EXECUTION", + "Status": "STARTED", + "ExecutionDetails": {"InputPayload": input_payload}, + } + ], + "NextMarker": "", + }, + "LocalRunner": True, + } + + +def _create_replay_event(operations: list[dict], input_payload: str = "{}"): + """Create a replay event with pre-existing operations.""" + base_ops = [ + { + "Id": "execution-1", + "Type": "EXECUTION", + "Status": "STARTED", + "ExecutionDetails": {"InputPayload": input_payload}, + } + ] + return { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test-func:1/durable-execution/exec-001/inv-001", + "CheckpointToken": "test-token", + "InitialExecutionState": { + "Operations": base_ops + operations, + "NextMarker": "", + }, + "LocalRunner": True, + } + + +def test_filesystem_serdes_first_invocation(tmp_path): + """Test first invocation: step executes and writes result to filesystem. + + Verifies that: + - The handler completes successfully + - The checkpoint payload contains a file pointer envelope + - The file was actually written to the filesystem + """ + mount_path = str(tmp_path) + + @durable_execution + def my_handler(event, context: DurableContext) -> dict[str, Any]: + fs_serdes = create_filesystem_serdes(mount_path) + result = context.step( + lambda _: {"order_id": "ORD-123", "total": 99.99}, + name="process_order", + config=StepConfig(serdes=fs_serdes), + ) + return result + + with patch( + "aws_durable_execution_sdk_python.execution.LambdaClient" + ) as mock_client_class: + mock_client = Mock() + mock_client_class.initialize_client.return_value = mock_client + + checkpoint_calls = [] + + def mock_checkpoint( + durable_execution_arn, + checkpoint_token, + updates, + client_token="token", # noqa: S107 + ): + checkpoint_calls.append(updates) + return CheckpointOutput( + checkpoint_token="new_token", # noqa: S106 + new_execution_state=CheckpointUpdatedExecutionState(), + ) + + mock_client.checkpoint = mock_checkpoint + + event = _create_initial_event() + result = my_handler(event, _create_lambda_context()) + + assert result["Status"] == InvocationStatus.SUCCEEDED.value + + # Verify the result is correct + result_data = json.loads(result["Result"]) + assert result_data == {"order_id": "ORD-123", "total": 99.99} + + # Verify checkpoint payload contains file pointer envelope + all_operations = [op for batch in checkpoint_calls for op in batch] + succeed_ops = [op for op in all_operations if op.action == OperationAction.SUCCEED] + assert len(succeed_ops) >= 1 + + # The step's succeed payload should be a file envelope + step_payload = succeed_ops[0].payload + envelope = json.loads(step_payload) + assert "file" in envelope + assert os.path.exists(envelope["file"]) + + # Verify file contains the actual data + with open(envelope["file"]) as f: + stored_data = json.load(f) + assert stored_data == {"order_id": "ORD-123", "total": 99.99} + + +def test_filesystem_serdes_replay_from_checkpoint(tmp_path): + """Test replay: step result is deserialized from filesystem via envelope. + + Simulates a replay scenario where: + 1. A file already exists on the filesystem (from a previous invocation) + 2. The checkpoint contains a file pointer envelope + 3. On replay, the serdes reads the file to get the result + """ + mount_path = str(tmp_path) + + # Generate the deterministic step ID that the SDK will produce + from tests.test_helpers import operation_id_sequence + + step_id = next(operation_id_sequence()) + + # Pre-create the file that would have been written in the first invocation + arn = "arn:aws:lambda:us-east-1:123456789012:function:test-func:1/durable-execution/exec-001/inv-001" + dir_path = os.path.join(mount_path, "test-func", "exec-001", "inv-001") + os.makedirs(dir_path, exist_ok=True) + + from urllib.parse import quote + + file_name = f"{quote(step_id, safe='')}.json" + file_path = os.path.join(dir_path, file_name) + with open(file_path, "w") as f: + json.dump({"order_id": "ORD-123", "total": 99.99}, f) + + # The envelope that would have been checkpointed + envelope = json.dumps({"file": file_path}) + + @durable_execution + def my_handler(event, context: DurableContext) -> dict[str, Any]: + fs_serdes = create_filesystem_serdes(mount_path) + result = context.step( + lambda _: (_ for _ in ()).throw( + RuntimeError("Should not execute on replay") + ), + name="process_order", + config=StepConfig(serdes=fs_serdes), + ) + return result + + with patch( + "aws_durable_execution_sdk_python.execution.LambdaClient" + ) as mock_client_class: + mock_client = Mock() + mock_client_class.initialize_client.return_value = mock_client + + checkpoint_calls = [] + + def mock_checkpoint( + durable_execution_arn, + checkpoint_token, + updates, + client_token="token", # noqa: S107 + ): + checkpoint_calls.append(updates) + return CheckpointOutput( + checkpoint_token="new_token", # noqa: S106 + new_execution_state=CheckpointUpdatedExecutionState(), + ) + + mock_client.checkpoint = mock_checkpoint + + # Create replay event with the step already SUCCEEDED using the correct ID + event = _create_replay_event( + [ + { + "Id": step_id, + "Type": "STEP", + "Status": "SUCCEEDED", + "ParentId": "execution-1", + "StepDetails": {"Result": envelope}, + } + ] + ) + + result = my_handler(event, _create_lambda_context()) + + assert result["Status"] == InvocationStatus.SUCCEEDED.value + + # Verify the deserialized result matches the file content + result_data = json.loads(result["Result"]) + assert result_data == {"order_id": "ORD-123", "total": 99.99} + + +def test_filesystem_serdes_overflow_mode_small_inline(tmp_path): + """Test OVERFLOW mode: small values stay inline in the checkpoint.""" + mount_path = str(tmp_path) + + @durable_execution + def my_handler(event, context: DurableContext) -> dict[str, Any]: + fs_serdes = create_filesystem_serdes( + mount_path, + FileSystemSerdesConfig(storage_mode=FileSystemSerdesMode.OVERFLOW), + ) + result = context.step( + lambda _: {"status": "ok", "count": 5}, + name="small_step", + config=StepConfig(serdes=fs_serdes), + ) + return result + + with patch( + "aws_durable_execution_sdk_python.execution.LambdaClient" + ) as mock_client_class: + mock_client = Mock() + mock_client_class.initialize_client.return_value = mock_client + + checkpoint_calls = [] + + def mock_checkpoint( + durable_execution_arn, + checkpoint_token, + updates, + client_token="token", # noqa: S107 + ): + checkpoint_calls.append(updates) + return CheckpointOutput( + checkpoint_token="new_token", # noqa: S106 + new_execution_state=CheckpointUpdatedExecutionState(), + ) + + mock_client.checkpoint = mock_checkpoint + + event = _create_initial_event() + result = my_handler(event, _create_lambda_context()) + + assert result["Status"] == InvocationStatus.SUCCEEDED.value + result_data = json.loads(result["Result"]) + assert result_data == {"status": "ok", "count": 5} + + # Verify checkpoint payload is inline (data envelope, no file) + all_operations = [op for batch in checkpoint_calls for op in batch] + succeed_ops = [op for op in all_operations if op.action == OperationAction.SUCCEED] + assert len(succeed_ops) >= 1 + + step_payload = succeed_ops[0].payload + envelope = json.loads(step_payload) + assert "data" in envelope + assert "file" not in envelope + + # No files should have been written + json_files = list(tmp_path.rglob("*.json")) + assert len(json_files) == 0 + + +def test_filesystem_serdes_with_preview(tmp_path): + """Test that preview is stored in the checkpoint envelope alongside file pointer.""" + mount_path = str(tmp_path) + + @durable_execution + def my_handler(event, context: DurableContext) -> dict[str, Any]: + fs_serdes = create_filesystem_serdes( + mount_path, + FileSystemSerdesConfig( + generate_preview=lambda value: build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="order_id")], + mask=[PreviewField(name="email")], + ), + ), + ), + ) + result = context.step( + lambda _: { + "order_id": "ORD-456", + "email": "secret@example.com", + "items": [{"sku": "A", "qty": 2}], + }, + name="order_step", + config=StepConfig(serdes=fs_serdes), + ) + return result + + with patch( + "aws_durable_execution_sdk_python.execution.LambdaClient" + ) as mock_client_class: + mock_client = Mock() + mock_client_class.initialize_client.return_value = mock_client + + checkpoint_calls = [] + + def mock_checkpoint( + durable_execution_arn, + checkpoint_token, + updates, + client_token="token", # noqa: S107 + ): + checkpoint_calls.append(updates) + return CheckpointOutput( + checkpoint_token="new_token", # noqa: S106 + new_execution_state=CheckpointUpdatedExecutionState(), + ) + + mock_client.checkpoint = mock_checkpoint + + event = _create_initial_event() + result = my_handler(event, _create_lambda_context()) + + assert result["Status"] == InvocationStatus.SUCCEEDED.value + + # Verify the checkpoint envelope includes preview + all_operations = [op for batch in checkpoint_calls for op in batch] + succeed_ops = [op for op in all_operations if op.action == OperationAction.SUCCEED] + + step_payload = succeed_ops[0].payload + envelope = json.loads(step_payload) + + assert "file" in envelope + assert "preview" in envelope + assert envelope["preview"]["order_id"] == "ORD-456" + assert envelope["preview"]["email"] == "***" + assert "items" not in envelope["preview"] + + +def test_filesystem_serdes_multiple_steps(tmp_path): + """Test multiple steps using filesystem serdes in a single execution.""" + mount_path = str(tmp_path) + + @durable_execution + def my_handler(event, context: DurableContext) -> dict[str, Any]: + fs_serdes = create_filesystem_serdes(mount_path) + + step1 = context.step( + lambda _: {"step": 1, "data": "first"}, + name="step_one", + config=StepConfig(serdes=fs_serdes), + ) + step2 = context.step( + lambda _: {"step": 2, "data": "second", "prev": step1["data"]}, + name="step_two", + config=StepConfig(serdes=fs_serdes), + ) + return {"results": [step1, step2]} + + with patch( + "aws_durable_execution_sdk_python.execution.LambdaClient" + ) as mock_client_class: + mock_client = Mock() + mock_client_class.initialize_client.return_value = mock_client + + def mock_checkpoint( + durable_execution_arn, + checkpoint_token, + updates, + client_token="token", # noqa: S107 + ): + return CheckpointOutput( + checkpoint_token="new_token", # noqa: S106 + new_execution_state=CheckpointUpdatedExecutionState(), + ) + + mock_client.checkpoint = mock_checkpoint + + event = _create_initial_event() + result = my_handler(event, _create_lambda_context()) + + assert result["Status"] == InvocationStatus.SUCCEEDED.value + + result_data = json.loads(result["Result"]) + assert result_data["results"][0] == {"step": 1, "data": "first"} + assert result_data["results"][1] == { + "step": 2, + "data": "second", + "prev": "first", + } + + # Two separate files should exist + json_files = list(tmp_path.rglob("*.json")) + assert len(json_files) == 2 + + +def test_filesystem_serdes_hash_encoding(tmp_path): + """Test that HASH path encoding produces fixed-length file names.""" + mount_path = str(tmp_path) + + @durable_execution + def my_handler(event, context: DurableContext) -> dict[str, Any]: + fs_serdes = create_filesystem_serdes( + mount_path, + FileSystemSerdesConfig(path_encoding=FileSystemPathEncoding.HASH), + ) + result = context.step( + lambda _: {"encoded": True}, + name="hash_step", + config=StepConfig(serdes=fs_serdes), + ) + return result + + with patch( + "aws_durable_execution_sdk_python.execution.LambdaClient" + ) as mock_client_class: + mock_client = Mock() + mock_client_class.initialize_client.return_value = mock_client + + def mock_checkpoint( + durable_execution_arn, + checkpoint_token, + updates, + client_token="token", # noqa: S107 + ): + return CheckpointOutput( + checkpoint_token="new_token", # noqa: S106 + new_execution_state=CheckpointUpdatedExecutionState(), + ) + + mock_client.checkpoint = mock_checkpoint + + event = _create_initial_event() + result = my_handler(event, _create_lambda_context()) + + assert result["Status"] == InvocationStatus.SUCCEEDED.value + + # Verify HASH encoding: directory and file should be hex digests + json_files = list(tmp_path.rglob("*.json")) + assert len(json_files) == 1 + + file_name = json_files[0].name + # Hash (64 chars) + ".json" (5 chars) = 69 chars + assert len(file_name) == 69 + + # Directory should be a hash too + dir_name = json_files[0].parent.name + assert len(dir_name) == 64 diff --git a/packages/aws-durable-execution-sdk-python/tests/filesystem_serdes_test.py b/packages/aws-durable-execution-sdk-python/tests/filesystem_serdes_test.py new file mode 100644 index 00000000..863a42af --- /dev/null +++ b/packages/aws-durable-execution-sdk-python/tests/filesystem_serdes_test.py @@ -0,0 +1,727 @@ +"""Unit tests for filesystem serdes module.""" + +import hashlib +import json +import os + +import pytest + +from aws_durable_execution_sdk_python.filesystem_serdes import ( + FileSystemPathEncoding, + FileSystemSerdesConfig, + FileSystemSerdesMode, + PreviewConfig, + PreviewField, + PreviewMode, + _OVERFLOW_THRESHOLD_BYTES, + _encode_segment, + _parse_durable_execution_arn, + _resolve_execution_dir, + _write_to_file, + build_preview, + create_filesystem_serdes, +) +from aws_durable_execution_sdk_python.serdes import SerDesContext + +BASE_PATH = "/mnt/s3" +TEST_ARN = "arn:aws:lambda:us-east-1:123456789012:function:test-function:1/durable-execution/test-exec-id/test-invocation-id" +TEST_OPERATION_ID = "step-1" + +# A non-durable-execution ARN (fallback path) +NON_DURABLE_ARN = "arn:aws:lambda:us-east-1:123456789012:function:my-func" + +MOCK_CONTEXT = SerDesContext( + operation_id=TEST_OPERATION_ID, + durable_execution_arn=TEST_ARN, +) + +# --- ARN Parsing Tests --- + + +def test_parse_durable_execution_arn_valid(): + """Test parsing a valid durable execution ARN.""" + result = _parse_durable_execution_arn(TEST_ARN) + assert result is not None + function_name, execution_name, invocation_id = result + assert function_name == "test-function" + assert execution_name == "test-exec-id" + assert invocation_id == "test-invocation-id" + + +def test_parse_durable_execution_arn_with_different_partition(): + """Test parsing ARN with different partition (e.g., aws-cn).""" + arn = "arn:aws-cn:lambda:cn-north-1:123456789012:function:my-func:3/durable-execution/exec-123/inv-456" + result = _parse_durable_execution_arn(arn) + assert result is not None + assert result == ("my-func", "exec-123", "inv-456") + + +def test_parse_durable_execution_arn_non_durable(): + """Test parsing a non-durable-execution ARN returns None.""" + result = _parse_durable_execution_arn(NON_DURABLE_ARN) + assert result is None + + +def test_parse_durable_execution_arn_empty_string(): + """Test parsing an empty string returns None.""" + result = _parse_durable_execution_arn("") + assert result is None + + +def test_parse_durable_execution_arn_random_string(): + """Test parsing a random string returns None.""" + result = _parse_durable_execution_arn("not-an-arn-at-all") + assert result is None + + +def test_parse_durable_execution_arn_missing_version(): + """Test parsing an ARN without version qualifier returns None.""" + arn = "arn:aws:lambda:us-east-1:123456789012:function:test-function/durable-execution/exec/inv" + result = _parse_durable_execution_arn(arn) + assert result is None + + +# --- Path Encoding Tests --- + + +def test_encode_segment_uri_simple(): + """Test URI encoding of a simple string.""" + result = _encode_segment("step-1", FileSystemPathEncoding.URI) + assert result == "step-1" + + +def test_encode_segment_uri_with_special_chars(): + """Test URI encoding of a string with special characters.""" + result = _encode_segment("../invoices/2026", FileSystemPathEncoding.URI) + # Should percent-encode slashes (path separators) + assert "/" not in result + # Dots are safe in file names, only slashes cause traversal + assert "%2F" in result + + +def test_encode_segment_uri_with_spaces(): + """Test URI encoding of a string with spaces.""" + result = _encode_segment("my step name", FileSystemPathEncoding.URI) + assert " " not in result + assert "%20" in result + + +def test_encode_segment_hash(): + """Test HASH encoding produces SHA-256 hex digest.""" + result = _encode_segment("step-1", FileSystemPathEncoding.HASH) + expected = hashlib.sha256("step-1".encode()).hexdigest() + assert result == expected + assert len(result) == 64 + + +def test_encode_segment_hash_fixed_length(): + """Test HASH encoding produces fixed-length output regardless of input.""" + short_result = _encode_segment("x", FileSystemPathEncoding.HASH) + long_result = _encode_segment("x" * 10000, FileSystemPathEncoding.HASH) + assert len(short_result) == 64 + assert len(long_result) == 64 + + +# --- Resolve Execution Dir Tests --- + + +def test_resolve_execution_dir_uri_with_durable_arn(): + """Test URI mode derives compact directory from ARN parts.""" + result = _resolve_execution_dir(BASE_PATH, TEST_ARN, FileSystemPathEncoding.URI) + expected = os.path.join( + BASE_PATH, "test-function", "test-exec-id", "test-invocation-id" + ) + assert result == expected + + +def test_resolve_execution_dir_uri_with_non_durable_arn(): + """Test URI mode falls back to encoding the whole ARN for non-durable ARN.""" + result = _resolve_execution_dir( + BASE_PATH, NON_DURABLE_ARN, FileSystemPathEncoding.URI + ) + from urllib.parse import quote + + expected = os.path.join(BASE_PATH, quote(NON_DURABLE_ARN, safe="")) + assert result == expected + + +def test_resolve_execution_dir_hash(): + """Test HASH mode hashes the whole ARN into a single segment.""" + result = _resolve_execution_dir(BASE_PATH, TEST_ARN, FileSystemPathEncoding.HASH) + expected = os.path.join(BASE_PATH, hashlib.sha256(TEST_ARN.encode()).hexdigest()) + assert result == expected + + +# --- Write to File Tests --- + + +def test_write_to_file_creates_directory_and_writes(tmp_path): + """Test that _write_to_file creates directory structure and writes JSON.""" + context = SerDesContext( + operation_id="my-step", + durable_execution_arn=TEST_ARN, + ) + value = {"id": 1, "name": "Alice"} + + file_path = _write_to_file( + str(tmp_path), value, context, FileSystemPathEncoding.URI + ) + + assert os.path.exists(file_path) + with open(file_path) as f: + written_data = json.load(f) + assert written_data == value + + +def test_write_to_file_with_hash_encoding(tmp_path): + """Test _write_to_file with HASH encoding.""" + context = SerDesContext( + operation_id="my-step", + durable_execution_arn=TEST_ARN, + ) + value = {"test": "data"} + + file_path = _write_to_file( + str(tmp_path), value, context, FileSystemPathEncoding.HASH + ) + + assert os.path.exists(file_path) + # File name should be the hash of operation_id + .json + file_name = os.path.basename(file_path) + expected_name = f"{hashlib.sha256('my-step'.encode()).hexdigest()}.json" + assert file_name == expected_name + + +def test_write_to_file_unsafe_operation_id(tmp_path): + """Test _write_to_file with an unsafe operation ID containing path traversal.""" + context = SerDesContext( + operation_id="../invoices/2026", + durable_execution_arn=TEST_ARN, + ) + value = {"id": 1} + + file_path = _write_to_file( + str(tmp_path), value, context, FileSystemPathEncoding.URI + ) + + assert os.path.exists(file_path) + # The file should be in the expected directory, not escaped + dir_path = os.path.dirname(file_path) + expected_dir = _resolve_execution_dir( + str(tmp_path), TEST_ARN, FileSystemPathEncoding.URI + ) + assert dir_path == expected_dir + # No path traversal + assert "/../" not in file_path + + +class TestAlwaysMode: + """Tests for FileSystemSerdesMode.ALWAYS.""" + + def test_serialize_writes_to_file_and_returns_envelope(self, tmp_path): + """Serialize should write to file and return file pointer envelope.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = {"id": 1, "name": "Alice"} + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "file" in envelope + assert "data" not in envelope + assert os.path.exists(envelope["file"]) + + def test_deserialize_reads_from_file(self, tmp_path): + """Deserialize should read value from file pointer envelope.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = {"id": 1, "name": "Alice"} + + serialized = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(serialized, MOCK_CONTEXT) + + assert deserialized == value + + def test_roundtrip_complex_data(self, tmp_path): + """Test round-trip with complex nested data.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = { + "users": [ + {"id": 1, "name": "Alice", "active": True}, + {"id": 2, "name": "Bob", "active": False}, + ], + "metadata": {"count": 2, "page": 1}, + "tags": ["admin", "verified"], + } + + serialized = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(serialized, MOCK_CONTEXT) + + assert deserialized == value + + def test_serialize_none_value(self, tmp_path): + """Test serialization of None value.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result = serdes.serialize(None, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "file" in envelope + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + assert deserialized is None + + def test_serialize_string_value(self, tmp_path): + """Test serialization of a plain string value.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result = serdes.serialize("hello world", MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == "hello world" + + def test_serialize_numeric_value(self, tmp_path): + """Test serialization of numeric values.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + for value in [42, 3.14, -100, 0]: + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + assert deserialized == value + + def test_serialize_list_value(self, tmp_path): + """Test serialization of list values.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = [1, 2, 3, "hello", None, True] + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_multiple_operations_different_files(self, tmp_path): + """Different operations should produce different files.""" + serdes = create_filesystem_serdes(str(tmp_path)) + ctx1 = SerDesContext(operation_id="step-1", durable_execution_arn=TEST_ARN) + ctx2 = SerDesContext(operation_id="step-2", durable_execution_arn=TEST_ARN) + + result1 = serdes.serialize({"a": 1}, ctx1) + result2 = serdes.serialize({"b": 2}, ctx2) + + envelope1 = json.loads(result1) + envelope2 = json.loads(result2) + + assert envelope1["file"] != envelope2["file"] + assert os.path.exists(envelope1["file"]) + assert os.path.exists(envelope2["file"]) + + +class TestOverflowMode: + """Tests for FileSystemSerdesMode.OVERFLOW.""" + + def _create_overflow_serdes(self, tmp_path): + return create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig(storage_mode=FileSystemSerdesMode.OVERFLOW), + ) + + def test_small_value_stored_inline(self, tmp_path): + """Small values should be stored inline in the envelope.""" + serdes = self._create_overflow_serdes(tmp_path) + value = {"id": 1} + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "data" in envelope + assert "file" not in envelope + assert json.loads(envelope["data"]) == value + + def test_small_value_roundtrip(self, tmp_path): + """Small values should round-trip through inline storage.""" + serdes = self._create_overflow_serdes(tmp_path) + value = {"name": "test", "value": 123} + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_large_value_overflows_to_file(self, tmp_path): + """Values exceeding threshold should overflow to file.""" + serdes = self._create_overflow_serdes(tmp_path) + # Create a value that exceeds the ~255KB threshold + value = {"data": "x" * (256 * 1024)} + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "file" in envelope + assert "data" not in envelope + assert os.path.exists(envelope["file"]) + + def test_large_value_roundtrip(self, tmp_path): + """Large values should round-trip through file storage.""" + serdes = self._create_overflow_serdes(tmp_path) + value = {"data": "x" * (256 * 1024)} + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_deserialize_file_pointer_envelope(self, tmp_path): + """Deserialize should handle file pointer envelope from ALWAYS mode.""" + always_serdes = create_filesystem_serdes(str(tmp_path)) + overflow_serdes = self._create_overflow_serdes(tmp_path) + value = {"id": 1, "name": "Alice"} + + # Serialize with ALWAYS (creates file), deserialize with OVERFLOW + serialized = always_serdes.serialize(value, MOCK_CONTEXT) + deserialized = overflow_serdes.deserialize(serialized, MOCK_CONTEXT) + + assert deserialized == value + + def test_deserialize_inline_data_envelope(self, tmp_path): + """Deserialize should handle inline data envelope.""" + serdes = self._create_overflow_serdes(tmp_path) + value = {"id": 1} + + envelope = json.dumps({"data": json.dumps(value)}) + deserialized = serdes.deserialize(envelope, MOCK_CONTEXT) + + assert deserialized == value + + +class TestPathEncodingIntegration: + """Tests for path encoding modes with full serialize/deserialize cycle.""" + + def test_uri_encoding_with_durable_arn(self, tmp_path): + """URI encoding should derive compact directory from ARN.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result = serdes.serialize({"id": 1}, MOCK_CONTEXT) + envelope = json.loads(result) + + file_path = envelope["file"] + # Should contain the function name and execution details in the path + assert "test-function" in file_path + assert "test-exec-id" in file_path + assert "test-invocation-id" in file_path + + def test_uri_encoding_with_non_durable_arn(self, tmp_path): + """URI encoding should fallback for non-durable ARNs.""" + serdes = create_filesystem_serdes(str(tmp_path)) + context = SerDesContext( + operation_id="step-1", + durable_execution_arn=NON_DURABLE_ARN, + ) + + result = serdes.serialize({"id": 1}, context) + envelope = json.loads(result) + + file_path = envelope["file"] + # Should NOT contain raw colons (they get encoded) + dir_name = os.path.basename(os.path.dirname(file_path)) + assert ":" not in dir_name + + def test_hash_encoding(self, tmp_path): + """HASH encoding should produce fixed-length segment names.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig(path_encoding=FileSystemPathEncoding.HASH), + ) + + result = serdes.serialize({"id": 1}, MOCK_CONTEXT) + envelope = json.loads(result) + + file_path = envelope["file"] + file_name = os.path.basename(file_path) + # Hash (64 chars) + ".json" (5 chars) + assert len(file_name) == 69 + # Directory should also be a hash + dir_name = os.path.basename(os.path.dirname(file_path)) + assert len(dir_name) == 64 + + def test_hash_encoding_roundtrip(self, tmp_path): + """HASH encoding should support full round-trip.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig(path_encoding=FileSystemPathEncoding.HASH), + ) + value = {"complex": {"nested": [1, 2, 3]}} + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_uri_encoding_unsafe_entity_id(self, tmp_path): + """URI encoding should safely encode path-traversal entity IDs.""" + serdes = create_filesystem_serdes(str(tmp_path)) + unsafe_context = SerDesContext( + operation_id="../invoices/2026", + durable_execution_arn=TEST_ARN, + ) + + result = serdes.serialize({"id": 1}, unsafe_context) + envelope = json.loads(result) + + file_path = envelope["file"] + # The file should be under the expected directory + assert str(tmp_path) in file_path + # No path traversal + assert "/../" not in file_path + + def test_uri_encoding_safe_ids_unchanged(self, tmp_path): + """URI encoding should not alter safe IDs.""" + serdes = create_filesystem_serdes(str(tmp_path)) + context = SerDesContext( + operation_id="simple-step-id", + durable_execution_arn=TEST_ARN, + ) + + result = serdes.serialize({"id": 1}, context) + envelope = json.loads(result) + + file_path = envelope["file"] + assert "simple-step-id.json" in file_path + + +class TestPreviewWithSerdes: + """Tests for preview feature integrated with filesystem serdes.""" + + def test_always_mode_with_preview(self, tmp_path): + """ALWAYS mode should include preview in envelope alongside file pointer.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig( + generate_preview=lambda value: build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="id")], + mask=[PreviewField(name="secret")], + ), + ), + ), + ) + value = {"id": "abc", "secret": "s3cr3t", "other": "ignored"} + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "file" in envelope + assert "preview" in envelope + assert envelope["preview"] == {"id": "abc", "secret": "***"} + + def test_deserialize_ignores_preview(self, tmp_path): + """Deserialize should ignore preview field and read from file.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig( + generate_preview=lambda value: build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + ), + ), + ), + ) + value = {"id": "abc", "full_data": "complete"} + + serialized = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(serialized, MOCK_CONTEXT) + + # Full data is returned, not just the preview + assert deserialized == value + + def test_overflow_mode_no_preview_for_inline(self, tmp_path): + """OVERFLOW mode should not include preview for inline payloads.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig( + storage_mode=FileSystemSerdesMode.OVERFLOW, + generate_preview=lambda value: build_preview( + value, + PreviewConfig(mode=PreviewMode.INCLUDE_ALL), + ), + ), + ) + value = {"id": "abc"} # small — stays inline + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "data" in envelope + assert "preview" not in envelope + + def test_overflow_mode_includes_preview_when_overflows(self, tmp_path): + """OVERFLOW mode should include preview when payload overflows to file.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig( + storage_mode=FileSystemSerdesMode.OVERFLOW, + generate_preview=lambda value: build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="id")], + ), + ), + ), + ) + value = {"id": "abc", "data": "x" * (256 * 1024)} + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "file" in envelope + assert "preview" in envelope + assert envelope["preview"] == {"id": "abc"} + + def test_no_preview_when_generate_returns_none(self, tmp_path): + """No preview field in envelope when generate_preview returns None.""" + serdes = create_filesystem_serdes( + str(tmp_path), + FileSystemSerdesConfig( + generate_preview=lambda value: None, + ), + ) + value = {"id": "abc"} + + result = serdes.serialize(value, MOCK_CONTEXT) + envelope = json.loads(result) + + assert "file" in envelope + assert "preview" not in envelope + + +class TestSerdesApiIntegration: + """Tests for filesystem serdes with the top-level serialize/deserialize functions.""" + + def test_works_with_top_level_serialize_function(self, tmp_path): + """FileSystem serdes should work with the top-level serialize function.""" + from aws_durable_execution_sdk_python.serdes import deserialize, serialize + + fs_serdes = create_filesystem_serdes(str(tmp_path)) + value = {"id": 1, "name": "test"} + + serialized = serialize(fs_serdes, value, TEST_OPERATION_ID, TEST_ARN) + deserialized = deserialize(fs_serdes, serialized, TEST_OPERATION_ID, TEST_ARN) + + assert deserialized == value + + def test_serialization_error_handling(self, tmp_path): + """Serialize should raise ExecutionError on failure via top-level API.""" + from aws_durable_execution_sdk_python.exceptions import ExecutionError + from aws_durable_execution_sdk_python.serdes import serialize + + fs_serdes = create_filesystem_serdes("/nonexistent/readonly/path") + + with pytest.raises(ExecutionError, match="Serialization failed"): + serialize(fs_serdes, {"data": "test"}, TEST_OPERATION_ID, TEST_ARN) + + def test_deserialization_error_handling(self, tmp_path): + """Deserialize should raise ExecutionError on failure via top-level API.""" + from aws_durable_execution_sdk_python.exceptions import ExecutionError + from aws_durable_execution_sdk_python.serdes import deserialize + + fs_serdes = create_filesystem_serdes(str(tmp_path)) + # Invalid envelope pointing to nonexistent file + invalid_data = json.dumps({"file": "/nonexistent/file.json"}) + + with pytest.raises(ExecutionError, match="Deserialization failed"): + deserialize(fs_serdes, invalid_data, TEST_OPERATION_ID, TEST_ARN) + + +class TestEdgeCases: + """Tests for edge cases and error scenarios.""" + + def test_empty_dict(self, tmp_path): + """Empty dict should serialize and deserialize correctly.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result = serdes.serialize({}, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == {} + + def test_empty_list(self, tmp_path): + """Empty list should serialize and deserialize correctly.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result = serdes.serialize([], MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == [] + + def test_deeply_nested_structure(self, tmp_path): + """Deeply nested structures should serialize correctly.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = {"a": {"b": {"c": {"d": {"e": "deep"}}}}} + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_unicode_content(self, tmp_path): + """Unicode content should be handled correctly.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = {"emoji": "🚀", "japanese": "日本語", "arabic": "العربية"} + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_boolean_values(self, tmp_path): + """Boolean values should serialize correctly.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = {"active": True, "deleted": False} + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_large_number_values(self, tmp_path): + """Large numbers should serialize correctly.""" + serdes = create_filesystem_serdes(str(tmp_path)) + value = { + "big_int": 2**53 - 1, + "negative": -(2**53), + "float": 1.7976931348623157e308, + } + + result = serdes.serialize(value, MOCK_CONTEXT) + deserialized = serdes.deserialize(result, MOCK_CONTEXT) + + assert deserialized == value + + def test_same_context_overwrites_file(self, tmp_path): + """Serializing with same context should overwrite the file.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result1 = serdes.serialize({"version": 1}, MOCK_CONTEXT) + result2 = serdes.serialize({"version": 2}, MOCK_CONTEXT) + + # Both should point to the same file path + envelope1 = json.loads(result1) + envelope2 = json.loads(result2) + assert envelope1["file"] == envelope2["file"] + + # File should contain the latest value + deserialized = serdes.deserialize(result2, MOCK_CONTEXT) + assert deserialized == {"version": 2} + + def test_default_config(self, tmp_path): + """Default config should use ALWAYS mode and URI encoding.""" + serdes = create_filesystem_serdes(str(tmp_path)) + + result = serdes.serialize({"id": 1}, MOCK_CONTEXT) + envelope = json.loads(result) + + # ALWAYS mode: file pointer, no inline data + assert "file" in envelope + assert "data" not in envelope + # URI encoding: human-readable path + assert "test-function" in envelope["file"] diff --git a/packages/aws-durable-execution-sdk-python/tests/preview_test.py b/packages/aws-durable-execution-sdk-python/tests/preview_test.py new file mode 100644 index 00000000..59ac2678 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python/tests/preview_test.py @@ -0,0 +1,222 @@ +"""Unit tests for the preview module.""" + +from aws_durable_execution_sdk_python.preview import ( + FieldMatchMode, + PreviewConfig, + PreviewField, + PreviewMode, + build_preview, +) + + +class TestBuildPreview: + """Tests for the build_preview function.""" + + def test_include_all_mode(self): + """INCLUDE_ALL should include all fields by default.""" + value = {"id": "123", "email": "alice@example.com", "ssn": "000-00-0000"} + result = build_preview(value, PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) + + assert result is not None + assert result["id"] == "123" + assert result["email"] == "alice@example.com" + assert result["ssn"] == "000-00-0000" + + def test_include_all_with_exclude(self): + """INCLUDE_ALL + exclude should omit excluded fields.""" + value = {"id": "123", "email": "alice@example.com", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + exclude=[PreviewField(name="ssn")], + ), + ) + + assert result is not None + assert "ssn" not in result + assert result["id"] == "123" + + def test_exclude_all_with_include(self): + """EXCLUDE_ALL + include should only include specified fields.""" + value = {"id": "123", "email": "alice@example.com", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="id"), PreviewField(name="email")], + ), + ) + + assert result is not None + assert result["id"] == "123" + assert result["email"] == "alice@example.com" + assert "ssn" not in result + + def test_mask_replaces_value(self): + """Mask should replace visible field value with mask_string.""" + value = {"id": "123", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + mask=[PreviewField(name="ssn")], + ), + ) + + assert result is not None + assert result["ssn"] == "***" + assert result["id"] == "123" + + def test_mask_custom_string(self): + """Mask should use custom mask_string.""" + value = {"id": "123", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + mask=[PreviewField(name="ssn")], + mask_string="[REDACTED]", + ), + ) + + assert result is not None + assert result["ssn"] == "[REDACTED]" + + def test_mask_implies_visibility_in_exclude_all(self): + """Mask implies visibility in EXCLUDE_ALL — masked field shown even without include.""" + value = {"id": "123", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + mask=[PreviewField(name="ssn")], + ), + ) + + assert result is not None + assert result["ssn"] == "***" + assert "id" not in result + + def test_exclude_wins_over_mask(self): + """Exclude always wins — excluded field is not shown even if in mask.""" + value = {"id": "123", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + exclude=[PreviewField(name="ssn")], + mask=[PreviewField(name="ssn")], + ), + ) + + assert result is not None + assert "ssn" not in result + + def test_path_match_mode(self): + """PATH match should only match exact path.""" + value = {"email": "root@example.com", "user": {"email": "nested@example.com"}} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="email", match=FieldMatchMode.PATH)], + ), + ) + + assert result is not None + assert result["email"] == "root@example.com" + assert "user" not in result + + def test_anywhere_match_mode(self): + """ANYWHERE match should match field at any depth.""" + value = {"email": "root@example.com", "user": {"email": "nested@example.com"}} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.EXCLUDE_ALL, + include=[PreviewField(name="email")], + ), + ) + + assert result is not None + assert result["email"] == "root@example.com" + assert result["user"]["email"] == "nested@example.com" + + def test_nested_objects(self): + """Preview should handle nested objects.""" + value = {"user": {"name": "Alice", "role": "admin"}} + result = build_preview(value, PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) + + assert result is not None + assert result["user"]["name"] == "Alice" + assert result["user"]["role"] == "admin" + + def test_arrays_merged(self): + """Array structure is not preserved — fields merged into a plain object.""" + value = {"items": [{"secret": "xyz"}, {"secret": "abc"}]} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + mask=[PreviewField(name="secret")], + ), + ) + + assert result is not None + assert result["items"]["secret"] == "***" + + def test_max_preview_bytes_budget(self): + """Preview should respect maxPreviewBytes budget.""" + value = {"id": "123", "email": "alice@example.com", "ssn": "000-00-0000"} + result = build_preview( + value, + PreviewConfig( + mode=PreviewMode.INCLUDE_ALL, + max_preview_bytes=20, + ), + ) + + assert result is not None + assert len(result) < len(value) + + def test_returns_none_for_non_dict(self): + """Preview should return None for non-dict values.""" + assert ( + build_preview("string", PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) is None + ) + assert build_preview(42, PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) is None + assert build_preview(None, PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) is None + assert ( + build_preview([1, 2, 3], PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) + is None + ) + + def test_returns_none_when_no_fields_visible(self): + """Preview should return None when no fields match.""" + value = {"id": "123", "name": "Alice"} + result = build_preview( + value, + PreviewConfig(mode=PreviewMode.EXCLUDE_ALL), + ) + + assert result is None + + def test_skips_dangerous_keys(self): + """Preview should skip dangerous keys like __proto__.""" + value = {"id": "123", "__proto__": "malicious", "constructor": "bad"} + result = build_preview(value, PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) + + assert result is not None + assert result["id"] == "123" + assert "__proto__" not in result + assert "constructor" not in result + + def test_skips_keys_with_dots(self): + """Preview should skip keys containing dots.""" + value = {"id": "123", "some.dotted.key": "value"} + result = build_preview(value, PreviewConfig(mode=PreviewMode.INCLUDE_ALL)) + + assert result is not None + assert result["id"] == "123" + assert "some.dotted.key" not in result From acf1008ce7a32d9cb6c3d79f1c4c367ee067cadc Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Fri, 26 Jun 2026 23:56:57 +0000 Subject: [PATCH 2/2] chore: update SAM template --- .../template.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/aws-durable-execution-sdk-python-examples/template.yaml b/packages/aws-durable-execution-sdk-python-examples/template.yaml index 0cbb9a82..6f434698 100644 --- a/packages/aws-durable-execution-sdk-python-examples/template.yaml +++ b/packages/aws-durable-execution-sdk-python-examples/template.yaml @@ -1069,7 +1069,7 @@ } } }, - "FilesystemSerDesBasic": { + "FilesystemSerdesBasic": { "Type": "AWS::Serverless::Function", "Properties": { "CodeUri": "build/", @@ -1092,12 +1092,12 @@ } } }, - "FilesystemSerDesOverflow": { + "FilesystemSerdesOverflow": { "Type": "AWS::Serverless::Function", "Properties": { "CodeUri": "build/", "Handler": "filesystem_serdes_overflow.handler", - "Description": "Filesystem serdes in OVERFLOW mode — inline for small payloads, file for large", + "Description": "Filesystem serdes in OVERFLOW mode \u2014 inline for small payloads, file for large", "Role": { "Fn::GetAtt": [ "DurableFunctionRole", @@ -1115,7 +1115,7 @@ } } }, - "FilesystemSerDesPreview": { + "FilesystemSerdesPreview": { "Type": "AWS::Serverless::Function", "Properties": { "CodeUri": "build/",