Skip to content

Add stateful-history delta delivery to the workflow worker#1112

Draft
JoshVanL wants to merge 1 commit into
dapr:mainfrom
JoshVanL:stateful-history
Draft

Add stateful-history delta delivery to the workflow worker#1112
JoshVanL wants to merge 1 commit into
dapr:mainfrom
JoshVanL:stateful-history

Conversation

@JoshVanL

@JoshVanL JoshVanL commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

durabletask-go adds the stateful-history optimization: a worker caches an instance's committed history between turns on its work-item stream, and the sidecar sends only the new events instead of the full history each turn. This implements the worker side in the vendored durabletask runtime; the backend is daprd, so there is no other production change.

Regenerate the durabletask proto stubs
(dapr/ext/workflow/_durabletask/internal) for the new contract: the CachedHistory message, optional WorkflowRequest.cachedHistory, GetWorkItemsRequest.capabilities, and WORKER_CAPABILITY_STATEFUL_HISTORY (the never-implemented HISTORY_STREAMING value is now reserved).

worker.py (TaskHubGrpcWorker):

  • Add _WorkflowHistoryCache: a lock-guarded, per-stream cache of each instance's committed history, reclaimed by a sliding TTL, an instance-count cap, and an optional byte budget (LRU eviction). Eviction is always safe because a miss is recovered via the GetInstanceHistory RPC.
  • Advertise WORKER_CAPABILITY_STATEFUL_HISTORY on GetWorkItems.
  • Before replay, reconstruct the full history: for a delta work item (cachedHistory) prepend the cached prefix to the delta, falling back to GetInstanceHistory on any miss (cold stream, eviction, prefix-length mismatch); otherwise use the full pastEvents.
  • After a turn, cache the committed history (never the new events), and drop it when the turn ends the execution (a completeWorkflow action, whatever its status); reset the whole cache on stream reconnect; sweep TTL on a janitor thread.
  • Add a disable_stateful_history opt-out and history_cache_ttl / _max_instances / _max_bytes tuning to TaskHubGrpcWorker.

Requires:
dapr/durabletask-protobuf#54
dapr/durabletask-go#110
dapr/dapr#10142

durabletask-go adds the stateful-history optimization: a worker caches
an instance's committed history between turns on its work-item stream,
and the sidecar sends only the new events instead of the full history
each turn. This implements the worker side in the vendored durabletask
runtime; the backend is daprd, so there is no other production change.

Regenerate the durabletask proto stubs
(dapr/ext/workflow/_durabletask/internal) for the new contract: the
CachedHistory message, optional WorkflowRequest.cachedHistory,
GetWorkItemsRequest.capabilities, and WORKER_CAPABILITY_STATEFUL_HISTORY
(the never-implemented HISTORY_STREAMING value is now reserved).

worker.py (TaskHubGrpcWorker):
- Add _WorkflowHistoryCache: a lock-guarded, per-stream cache of each instance's
  committed history, reclaimed by a sliding TTL, an instance-count cap, and an
  optional byte budget (LRU eviction). Eviction is always safe because a miss is
  recovered via the GetInstanceHistory RPC.
- Advertise WORKER_CAPABILITY_STATEFUL_HISTORY on GetWorkItems.
- Before replay, reconstruct the full history: for a delta work item
  (cachedHistory) prepend the cached prefix to the delta, falling back to
  GetInstanceHistory on any miss (cold stream, eviction, prefix-length mismatch);
  otherwise use the full pastEvents.
- After a turn, cache the committed history (never the new events), and drop it
  when the turn ends the execution (a completeWorkflow action, whatever its
  status); reset the whole cache on stream reconnect; sweep TTL on a janitor
  thread.
- Add a disable_stateful_history opt-out and history_cache_ttl / _max_instances /
  _max_bytes tuning to TaskHubGrpcWorker.

Requires:
dapr/durabletask-protobuf#54
dapr/durabletask-go#110
dapr/dapr#10142

Signed-off-by: joshvanl <me@joshvanl.dev>
@codecov

codecov Bot commented Jun 30, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 58.89571% with 67 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.25%. Comparing base (bffb749) to head (f4cdb08).
⚠️ Report is 163 commits behind head on main.

Files with missing lines Patch % Lines
.../_durabletask/internal/orchestrator_service_pb2.py 1.63% 60 Missing ⚠️
dapr/ext/workflow/_durabletask/worker.py 93.13% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1112      +/-   ##
==========================================
- Coverage   86.63%   82.25%   -4.38%     
==========================================
  Files          84      116      +32     
  Lines        4473     9681    +5208     
==========================================
+ Hits         3875     7963    +4088     
- Misses        598     1718    +1120     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements the worker-side support for Durable Task’s stateful-history (delta) delivery optimization in the vendored workflow runtime. This adds a per-stream committed-history cache so daprd can omit already-known history prefixes on subsequent turns, reducing per-turn payload sizes while preserving correctness via fallback history fetches.

Changes:

  • Add a lock-guarded, bounded (TTL / max instances / optional byte budget) per-stream workflow history cache and integrate it into TaskHubGrpcWorker history replay.
  • Advertise WORKER_CAPABILITY_STATEFUL_HISTORY via GetWorkItemsRequest.capabilities and reconstruct full history for delta work items (fallback to GetInstanceHistory on cache miss).
  • Regenerate durabletask protobuf stubs and add unit/e2e tests covering cache behavior and end-to-end correctness.

Reviewed changes

Copilot reviewed 4 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
dapr/ext/workflow/_durabletask/worker.py Adds history cache + janitor, capability advertisement, delta history reconstruction, and cache maintenance on turn completion.
tests/ext/workflow/durabletask/test_worker_history_cache.py New unit tests for cache bounds/eviction and _resolve_history fallback behavior.
tests/ext/workflow/durabletask/test_orchestration_e2e.py Adds an e2e workflow that exercises multi-turn behavior with optimization enabled/disabled.
dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.pyi Updated type stubs for new CachedHistory, capabilities, and capability enum changes.
dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py Regenerated protobuf runtime module reflecting the updated contract.
Files not reviewed (1)
  • dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py: Generated file

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +527 to +531
if not self._disable_stateful_history:
self._history_janitor = Thread(
target=self._sweep_history_cache_loop, name='WorkerHistoryJanitor', daemon=True
)
self._history_janitor.start()
Comment on lines +1049 to +1054
cached = self._history_cache.get(req.instanceId)
if cached is not None and len(cached) == req.cachedHistory.eventCount:
return cached + list(req.pastEvents)

response = stub.GetInstanceHistory(pb.GetInstanceHistoryRequest(instanceId=req.instanceId))
return list(response.events)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants