Add stateful-history delta delivery to the workflow worker#1112
Add stateful-history delta delivery to the workflow worker#1112JoshVanL wants to merge 1 commit into
Conversation
durabletask-go adds the stateful-history optimization: a worker caches an instance's committed history between turns on its work-item stream, and the sidecar sends only the new events instead of the full history each turn. This implements the worker side in the vendored durabletask runtime; the backend is daprd, so there is no other production change. Regenerate the durabletask proto stubs (dapr/ext/workflow/_durabletask/internal) for the new contract: the CachedHistory message, optional WorkflowRequest.cachedHistory, GetWorkItemsRequest.capabilities, and WORKER_CAPABILITY_STATEFUL_HISTORY (the never-implemented HISTORY_STREAMING value is now reserved). worker.py (TaskHubGrpcWorker): - Add _WorkflowHistoryCache: a lock-guarded, per-stream cache of each instance's committed history, reclaimed by a sliding TTL, an instance-count cap, and an optional byte budget (LRU eviction). Eviction is always safe because a miss is recovered via the GetInstanceHistory RPC. - Advertise WORKER_CAPABILITY_STATEFUL_HISTORY on GetWorkItems. - Before replay, reconstruct the full history: for a delta work item (cachedHistory) prepend the cached prefix to the delta, falling back to GetInstanceHistory on any miss (cold stream, eviction, prefix-length mismatch); otherwise use the full pastEvents. - After a turn, cache the committed history (never the new events), and drop it when the turn ends the execution (a completeWorkflow action, whatever its status); reset the whole cache on stream reconnect; sweep TTL on a janitor thread. - Add a disable_stateful_history opt-out and history_cache_ttl / _max_instances / _max_bytes tuning to TaskHubGrpcWorker. Requires: dapr/durabletask-protobuf#54 dapr/durabletask-go#110 dapr/dapr#10142 Signed-off-by: joshvanl <me@joshvanl.dev>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1112 +/- ##
==========================================
- Coverage 86.63% 82.25% -4.38%
==========================================
Files 84 116 +32
Lines 4473 9681 +5208
==========================================
+ Hits 3875 7963 +4088
- Misses 598 1718 +1120 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Implements the worker-side support for Durable Task’s stateful-history (delta) delivery optimization in the vendored workflow runtime. This adds a per-stream committed-history cache so daprd can omit already-known history prefixes on subsequent turns, reducing per-turn payload sizes while preserving correctness via fallback history fetches.
Changes:
- Add a lock-guarded, bounded (TTL / max instances / optional byte budget) per-stream workflow history cache and integrate it into
TaskHubGrpcWorkerhistory replay. - Advertise
WORKER_CAPABILITY_STATEFUL_HISTORYviaGetWorkItemsRequest.capabilitiesand reconstruct full history for delta work items (fallback toGetInstanceHistoryon cache miss). - Regenerate durabletask protobuf stubs and add unit/e2e tests covering cache behavior and end-to-end correctness.
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
dapr/ext/workflow/_durabletask/worker.py |
Adds history cache + janitor, capability advertisement, delta history reconstruction, and cache maintenance on turn completion. |
tests/ext/workflow/durabletask/test_worker_history_cache.py |
New unit tests for cache bounds/eviction and _resolve_history fallback behavior. |
tests/ext/workflow/durabletask/test_orchestration_e2e.py |
Adds an e2e workflow that exercises multi-turn behavior with optimization enabled/disabled. |
dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.pyi |
Updated type stubs for new CachedHistory, capabilities, and capability enum changes. |
dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py |
Regenerated protobuf runtime module reflecting the updated contract. |
Files not reviewed (1)
- dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py: Generated file
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if not self._disable_stateful_history: | ||
| self._history_janitor = Thread( | ||
| target=self._sweep_history_cache_loop, name='WorkerHistoryJanitor', daemon=True | ||
| ) | ||
| self._history_janitor.start() |
| cached = self._history_cache.get(req.instanceId) | ||
| if cached is not None and len(cached) == req.cachedHistory.eventCount: | ||
| return cached + list(req.pastEvents) | ||
|
|
||
| response = stub.GetInstanceHistory(pb.GetInstanceHistoryRequest(instanceId=req.instanceId)) | ||
| return list(response.events) |
durabletask-go adds the stateful-history optimization: a worker caches an instance's committed history between turns on its work-item stream, and the sidecar sends only the new events instead of the full history each turn. This implements the worker side in the vendored durabletask runtime; the backend is daprd, so there is no other production change.
Regenerate the durabletask proto stubs
(dapr/ext/workflow/_durabletask/internal) for the new contract: the CachedHistory message, optional WorkflowRequest.cachedHistory, GetWorkItemsRequest.capabilities, and WORKER_CAPABILITY_STATEFUL_HISTORY (the never-implemented HISTORY_STREAMING value is now reserved).
worker.py (TaskHubGrpcWorker):
Requires:
dapr/durabletask-protobuf#54
dapr/durabletask-go#110
dapr/dapr#10142