Add MapAsync for concurrent collection processing #2408
Pull request overview
This PR adds a new concurrent collection-processing primitive, IDurableContext.MapAsync, to the Amazon.Lambda.DurableExecution SDK. It mirrors the existing ParallelAsync concurrency engine while introducing Map-specific defaults (notably a permissive AllCompleted() completion policy) and Map-specific exception semantics.
Changes:
- Introduces MapAsync + MapConfig + MapException, and a new MapOperation built on shared concurrency orchestration.
- Refactors the ParallelAsync implementation by extracting shared logic into ConcurrentOperation<T> and generalizing parent checkpoint summaries to BatchSummary.
- Adds unit and integration tests plus integration test Lambda function projects for Map scenarios (happy path, partial failure, max concurrency, first-successful, failure tolerance, replay determinism).
Reviewed changes
Copilot reviewed 41 out of 41 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| MAP-IMPLEMENTATION-PLAN.md | Design/implementation plan for MapAsync and related refactors. |
| Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs | Updates replay JSON payload shape from Branches to Units for parallel tests. |
| Libraries/test/Amazon.Lambda.DurableExecution.Tests/MapOperationTests.cs | Adds comprehensive unit tests for MapAsync behavior, replay, naming, and completion policies. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/MapReplayDeterminismFunction.csproj | New integration test function project for map replay determinism. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Function.cs | Workflow that forces suspend/resume to validate per-item replay determinism. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Dockerfile | Container packaging for the replay determinism test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/MapPartialFailureFunction.csproj | New integration test function project for permissive-default partial failure. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Function.cs | Workflow validating Map default AllCompleted() preserves partial failures without failing workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Dockerfile | Container packaging for the partial failure test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/MapMaxConcurrencyFunction.csproj | New integration test function project for MaxConcurrency. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Function.cs | Workflow validating dispatch throttling via durable waits + timestamps. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Dockerfile | Container packaging for the max concurrency test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/MapHappyPathFunction.csproj | New integration test function project for map happy path. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Function.cs | Workflow validating step-per-item processing and ItemNamer visibility. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Dockerfile | Container packaging for the happy path test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/MapFirstSuccessfulFunction.csproj | New integration test function project for first-successful short-circuit. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Function.cs | Workflow validating FirstSuccessful() and started/unfinished item reporting. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Dockerfile | Container packaging for the first-successful test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/MapFailureToleranceFunction.csproj | New integration test function project for failure tolerance exceeded. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Function.cs | Workflow validating failure tolerance triggers MapException and fails workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Dockerfile | Container packaging for the failure tolerance test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapReplayDeterminismTest.cs | Integration test asserting deterministic item operation IDs and replayed step results. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapPartialFailureTest.cs | Integration test asserting permissive default allows partial failure with SUCCEEDED workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapMaxConcurrencyTest.cs | Integration test asserting MaxConcurrency throttles dispatch waves. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapHappyPathTest.cs | Integration test validating end-to-end happy path and history events/names. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFirstSuccessfulTest.cs | Integration test validating first-successful short-circuit behavior. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFailureToleranceTest.cs | Integration test validating failure tolerance triggers FAILED workflow + MapException indication. |
| Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs | Adds OperationSubTypes.Map and OperationSubTypes.MapItem. |
| Libraries/src/Amazon.Lambda.DurableExecution/MapConfig.cs | Adds Map configuration object with permissive default completion config and ItemNamer. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs | Removes the Parallel-specific summary type (replaced by shared BatchSummary). |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs | Refactors ParallelOperation to a thin subclass of ConcurrentOperation. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs | Removes Parallel-specific JSON context (replaced by BatchJsonContext). |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/MapOperation.cs | Adds MapOperation as a thin subclass of ConcurrentOperation. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs | Adds extracted shared orchestration/replay/checkpoint engine for Parallel + Map. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchSummary.cs | Adds shared parent checkpoint payload type for concurrent ops. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchJsonContext.cs | Adds shared source-gen JSON context for BatchSummary payloads. |
| Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs | Adds public MapAsync<TItem, TResult> API with XML docs. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs | Adds MapException type parallel to ParallelException. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs | Wires MapAsync into the durable context runtime implementation. |
| Docs/durable-execution-design.md | Updates design docs: Map default completion behavior and removes ItemBatcher references. |
    internal sealed class BatchSummary
    {
        [JsonPropertyName("CompletionReason")]
        public string? CompletionReason { get; set; }

        [JsonPropertyName("Units")]
        public IList<BatchUnitSummary> Units { get; set; } = new List<BatchUnitSummary>();
    }
This is fine, since we are in preview.
    using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
    using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

    namespace Amazon.Lambda.DurableExecution.Internal;
Nearly all of the logic shared between Map and Parallel was moved to ConcurrentOperation.
…CCEED

The parent context for both ParallelAsync and MapAsync now always checkpoints as Action=SUCCEED with the summary (including CompletionReason) in the Payload field, matching the wire format used by the Python, JS, and Java SDKs.

Previously the C# SDK emitted Action=FAIL for FailureToleranceExceeded, which left ContextDetails.Result empty (Payload is forbidden on FAIL updates) and made it impossible for replay to reconstruct a caught ParallelException/MapException deterministically. The exception is now thrown SDK-side after the checkpoint, based on the CompletionReason in the reconstructed BatchResult.
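The checkpoint-then-throw ordering described in this commit message can be illustrated with a small standalone sketch. Everything in it is hypothetical: SummarySketch, CheckpointSketch, ToleranceExceededSketchException, and the literal string "FailureToleranceExceeded" are stand-ins chosen for illustration, not the SDK's BatchSummary, OperationUpdate, or wire values. The point is only that when the summary always travels in the payload of a SUCCEED update, the first execution and a replay can go through the same resolve step and reach the same outcome.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical stand-in for the parent summary; not the SDK's BatchSummary.
public sealed class SummarySketch
{
    public string? CompletionReason { get; set; }
}

// Hypothetical stand-in for a checkpoint update (action plus optional payload).
public sealed record CheckpointSketch(string Action, string? Payload);

// Hypothetical exception playing the role of ParallelException/MapException.
public sealed class ToleranceExceededSketchException : Exception
{
    public SummarySketch Summary { get; }

    public ToleranceExceededSketchException(SummarySketch summary)
        : base("Failure tolerance exceeded.") => Summary = summary;
}

public static class ParentCheckpointSketch
{
    // Always checkpoint as SUCCEED so the summary can ride in the payload,
    // even when the completion criteria were violated.
    public static CheckpointSketch Build(SummarySketch summary) =>
        new CheckpointSketch("SUCCEED", JsonSerializer.Serialize(summary));

    // Used both after the first execution and on replay: rebuild the summary
    // from the payload, then decide whether to throw.
    public static SummarySketch Resolve(CheckpointSketch checkpoint)
    {
        var summary = JsonSerializer.Deserialize<SummarySketch>(checkpoint.Payload ?? "{}")
                      ?? new SummarySketch();

        // Assumed reason string, for illustration only.
        if (summary.CompletionReason == "FailureToleranceExceeded")
            throw new ToleranceExceededSketchException(summary);

        return summary;
    }
}
```

Because the throw happens after the summary has been persisted, a workflow that catches the exception sees the same reconstructed summary on every replay.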
#2216
What

Adds IDurableContext.MapAsync to Amazon.Lambda.DurableExecution. MapAsync processes a collection in parallel with one child context per item, mirroring the Python/JS/Java SDKs where Map is a sibling of Parallel sharing one concurrency engine. It reuses the IBatchResult<T> family and concurrency/completion machinery introduced by ParallelAsync in #2375.

Public API:

- IDurableContext.MapAsync<TItem, TResult>(IReadOnlyList<TItem> items, Func<IDurableContext, TItem, int, IReadOnlyList<TItem>, CancellationToken, Task<TResult>> func, string? name, MapConfig? config, CancellationToken): returns Task<IBatchResult<TResult>>. The per-item func receives the durable context, the item, its zero-based index, the full source list (matching the Python/JS SDKs), and a CancellationToken linking the caller's token with the SDK's workflow-shutdown signal (also tripped cooperatively when a sibling satisfies the CompletionConfig and the map short-circuits).
- MapConfig: MaxConcurrency (int?, null = unlimited, must be ≥1), CompletionConfig, NestingType, ItemNamer. Defaults CompletionConfig to AllCompleted() (permissive), matching Python/Java Map; this intentionally differs from ParallelConfig's AllSuccessful(). NestingType defaults to Nested; NestingType.Flat is not yet supported and throws NotSupportedException when invoked.
- MapConfig.ItemNamer: Func<object, int, string>? that supplies a per-item branch name (item + zero-based index) surfaced in traces and on IBatchItem<T>.Name. When null, branches are named by index ("0", "1", ...). (No ItemBatcher; it is not implemented in any reference SDK.)
- MapException: thrown when the CompletionConfig signals FailureToleranceExceeded, so callers can distinguish Map failures from Parallel failures. Exposes a type-erased IBatchResult? Result (cast to IBatchResult<T> when the item type is known) and the CompletionReason. Base type for map failures; catching it stays forward-compatible.

Behavior: by default (AllCompleted) every item runs regardless of per-item failures, which are captured on the corresponding IBatchItem<T> and surfaced via IBatchResult<T>.Failed rather than thrown. The map throws MapException only when the CompletionConfig criteria are violated. For fail-fast semantics, set CompletionConfig to AllSuccessful() or call IBatchResult<T>.ThrowIfError() on the result.

Implementation notes:

- A ConcurrentOperation<T> base holds all orchestration, completion, checkpoint, and replay logic. ParallelOperation and MapOperation are thin subclasses supplying only the per-unit (name, func), sub-type labels, and failure-exception factory. Adds Map/MapItem to OperationSubTypes.
- Generalizes ParallelSummary/ParallelJsonContext into shared BatchSummary/BatchJsonContext.

Per-item checkpoint payloads are serialized via the ILambdaSerializer registered on ILambdaContext.Serializer, the same pattern as ParallelAsync/StepAsync/RunInChildContextAsync. The AOT story is determined entirely by which serializer the user registers with the runtime (e.g., SourceGeneratorLambdaJsonSerializer<TContext>). MapConfig does not expose a serializer slot.

Testing

24 new unit tests in MapOperationTests.cs, mirroring the Parallel set:

- CompletionConfig matrix: AllSuccessful, AllCompleted, FirstSuccessful, MinSuccessful, ToleratedFailureCount, ToleratedFailurePercentage, covering both pass and fail thresholds.
- MaxConcurrency enforced; unbounded when null.
- ItemNamer.
- IBatchResult<T> accessors and GetResults/GetErrors/ThrowIfError semantics.

6 new integration functions/tests mirroring the Parallel set (require AWS credentials to run): MapHappyPath, MapPartialFailure, MapFailureTolerance, MapFirstSuccessful, MapMaxConcurrency, MapReplayDeterminism.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
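To make the permissive default described under Behavior concrete, here is a minimal standalone sketch of "run every item, capture failures per item, do not throw". It does not use the SDK and is not its implementation: ItemOutcome, PermissiveMapSketch, and MapAllCompletedAsync are hypothetical names, there is no durable context, checkpointing, or replay, and the concurrency limit is modeled with a plain SemaphoreSlim purely as an assumption about how a limit of this kind could be expressed.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for one item's outcome; not the SDK's IBatchItem<T>.
public sealed record ItemOutcome<T>(int Index, T? Result, Exception? Error)
{
    public bool Succeeded => Error is null;
}

public static class PermissiveMapSketch
{
    // Runs func for every item and records each failure instead of throwing,
    // which is the spirit of the AllCompleted default described above.
    public static async Task<IReadOnlyList<ItemOutcome<TResult>>> MapAllCompletedAsync<TItem, TResult>(
        IReadOnlyList<TItem> items,
        Func<TItem, int, CancellationToken, Task<TResult>> func,
        int? maxConcurrency = null,
        CancellationToken cancellationToken = default)
    {
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency), "Must be >= 1 or null.");

        // null means unlimited: no gate is created at all.
        using var gate = maxConcurrency is int limit ? new SemaphoreSlim(limit) : null;

        async Task<ItemOutcome<TResult>> RunOneAsync(TItem item, int index)
        {
            if (gate is not null) await gate.WaitAsync(cancellationToken);
            try
            {
                var result = await func(item, index, cancellationToken);
                return new ItemOutcome<TResult>(index, result, null);
            }
            catch (Exception ex)
            {
                // Capture the failure on the item; siblings keep running.
                return new ItemOutcome<TResult>(index, default, ex);
            }
            finally
            {
                gate?.Release();
            }
        }

        var tasks = items.Select((item, index) => RunOneAsync(item, index)).ToArray();
        return await Task.WhenAll(tasks); // outcomes come back in input order
    }
}
```

A caller that wants fail-fast behavior on top of this sketch would inspect the outcomes and throw on the first one whose Succeeded is false, which is analogous to what the description says ThrowIfError() does on the real result type.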