diff --git a/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py b/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py index e9c39e64..4d429d29 100644 --- a/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py +++ b/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.py @@ -30,7 +30,7 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aorchestrator_service.proto\x1a\x13orchestration.proto\x1a\x14history_events.proto\x1a\x1aorchestrator_actions.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x02\n\x0f\x41\x63tivityRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x10workflowInstance\x18\x04 \x01(\x0b\x32\x11.WorkflowInstance\x12\x0e\n\x06taskId\x18\x05 \x01(\x05\x12)\n\x12parentTraceContext\x18\x06 \x01(\x0b\x32\r.TraceContext\x12\x17\n\x0ftaskExecutionId\x18\x07 \x01(\t\x12\x32\n\x11propagatedHistory\x18\x08 \x01(\x0b\x32\x12.PropagatedHistoryH\x00\x88\x01\x01\x42\x14\n\x12_propagatedHistory\"\xaa\x01\n\x10\x41\x63tivityResponse\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0e\n\x06taskId\x18\x02 \x01(\x05\x12,\n\x06result\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x0e\x66\x61ilureDetails\x18\x04 \x01(\x0b\x32\x13.TaskFailureDetails\x12\x17\n\x0f\x63ompletionToken\x18\x05 \x01(\t\"\xbc\x02\n\x0fWorkflowRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x31\n\x0b\x65xecutionId\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12!\n\npastEvents\x18\x03 \x03(\x0b\x32\r.HistoryEvent\x12 \n\tnewEvents\x18\x04 \x03(\x0b\x32\r.HistoryEvent\x12 \n\x18requiresHistoryStreaming\x18\x06 \x01(\x08\x12 \n\x06router\x18\x07 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x12\x32\n\x11propagatedHistory\x18\x08 \x01(\x0b\x32\x12.PropagatedHistoryH\x01\x88\x01\x01\x42\t\n\x07_routerB\x14\n\x12_propagatedHistoryJ\x04\x08\x05\x10\x06\"\x82\x02\n\x10WorkflowResponse\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12 \n\x07\x61\x63tions\x18\x02 \x03(\x0b\x32\x0f.WorkflowAction\x12\x32\n\x0c\x63ustomStatus\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x17\n\x0f\x63ompletionToken\x18\x04 \x01(\t\x12\x37\n\x12numEventsProcessed\x18\x05 \x01(\x0b\x32\x1b.google.protobuf.Int32Value\x12&\n\x07version\x18\x06 \x01(\x0b\x32\x10.WorkflowVersionH\x00\x88\x01\x01\x42\n\n\x08_version\"\xaf\x03\n\x15\x43reateInstanceRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12;\n\x17scheduledStartTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\x0b\x65xecutionId\x18\x07 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12.\n\x04tags\x18\x08 \x03(\x0b\x32 .CreateInstanceRequest.TagsEntry\x12)\n\x12parentTraceContext\x18\t \x01(\x0b\x32\r.TraceContext\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01J\x04\x08\x06\x10\x07R\x1aorchestrationIdReusePolicy\",\n\x16\x43reateInstanceResponse\x12\x12\n\ninstanceId\x18\x01 \x01(\t\"E\n\x12GetInstanceRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x1b\n\x13getInputsAndOutputs\x18\x02 \x01(\x08\"L\n\x13GetInstanceResponse\x12\x0e\n\x06\x65xists\x18\x01 \x01(\x08\x12%\n\rworkflowState\x18\x02 \x01(\x0b\x32\x0e.WorkflowState\"b\n\x11RaiseEventRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\x14\n\x12RaiseEventResponse\"g\n\x10TerminateRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06output\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x11\n\trecursive\x18\x03 \x01(\x08\"\x13\n\x11TerminateResponse\"R\n\x0eSuspendRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\x11\n\x0fSuspendResponse\"Q\n\rResumeRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\x10\n\x0eResumeResponse\"\x9e\x01\n\x15PurgeInstancesRequest\x12\x14\n\ninstanceId\x18\x01 \x01(\tH\x00\x12\x33\n\x13purgeInstanceFilter\x18\x02 \x01(\x0b\x32\x14.PurgeInstanceFilterH\x00\x12\x11\n\trecursive\x18\x03 \x01(\x08\x12\x12\n\x05\x66orce\x18\x04 \x01(\x08H\x01\x88\x01\x01\x42\t\n\x07requestB\x08\n\x06_force\"\xaa\x01\n\x13PurgeInstanceFilter\x12\x33\n\x0f\x63reatedTimeFrom\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\rcreatedTimeTo\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\rruntimeStatus\x18\x03 \x03(\x0e\x32\x14.OrchestrationStatus\"f\n\x16PurgeInstancesResponse\x12\x1c\n\x14\x64\x65letedInstanceCount\x18\x01 \x01(\x05\x12.\n\nisComplete\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.BoolValue\"-\n\x13GetWorkItemsRequestJ\x04\x08\x01\x10\x02J\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04J\x04\x08\n\x10\x0b\"\x9a\x01\n\x08WorkItem\x12+\n\x0fworkflowRequest\x18\x01 \x01(\x0b\x32\x10.WorkflowRequestH\x00\x12+\n\x0f\x61\x63tivityRequest\x18\x02 \x01(\x0b\x32\x10.ActivityRequestH\x00\x12\x17\n\x0f\x63ompletionToken\x18\n \x01(\tB\t\n\x07requestJ\x04\x08\x03\x10\x04J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06\"\x16\n\x14\x43ompleteTaskResponse\"\x85\x02\n\x1dRerunWorkflowFromEventRequest\x12\x18\n\x10sourceInstanceID\x18\x01 \x01(\t\x12\x0f\n\x07\x65ventID\x18\x02 \x01(\r\x12\x1a\n\rnewInstanceID\x18\x03 \x01(\tH\x00\x88\x01\x01\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x16\n\x0eoverwriteInput\x18\x05 \x01(\x08\x12\'\n\x1anewChildWorkflowInstanceID\x18\x06 \x01(\tH\x01\x88\x01\x01\x42\x10\n\x0e_newInstanceIDB\x1d\n\x1b_newChildWorkflowInstanceID\"7\n\x1eRerunWorkflowFromEventResponse\x12\x15\n\rnewInstanceID\x18\x01 \x01(\t\"r\n\x16ListInstanceIDsRequest\x12\x1e\n\x11\x63ontinuationToken\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x15\n\x08pageSize\x18\x02 \x01(\rH\x01\x88\x01\x01\x42\x14\n\x12_continuationTokenB\x0b\n\t_pageSize\"d\n\x17ListInstanceIDsResponse\x12\x13\n\x0binstanceIds\x18\x01 \x03(\t\x12\x1e\n\x11\x63ontinuationToken\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x14\n\x12_continuationToken\"/\n\x19GetInstanceHistoryRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\";\n\x1aGetInstanceHistoryResponse\x12\x1d\n\x06\x65vents\x18\x01 \x03(\x0b\x32\r.HistoryEvent*^\n\x10WorkerCapability\x12!\n\x1dWORKER_CAPABILITY_UNSPECIFIED\x10\x00\x12\'\n#WORKER_CAPABILITY_HISTORY_STREAMING\x10\x01\x32\xe8\x08\n\x15TaskHubSidecarService\x12\x37\n\x05Hello\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12@\n\rStartInstance\x12\x16.CreateInstanceRequest\x1a\x17.CreateInstanceResponse\x12\x38\n\x0bGetInstance\x12\x13.GetInstanceRequest\x1a\x14.GetInstanceResponse\x12\x41\n\x14WaitForInstanceStart\x12\x13.GetInstanceRequest\x1a\x14.GetInstanceResponse\x12\x46\n\x19WaitForInstanceCompletion\x12\x13.GetInstanceRequest\x1a\x14.GetInstanceResponse\x12\x35\n\nRaiseEvent\x12\x12.RaiseEventRequest\x1a\x13.RaiseEventResponse\x12:\n\x11TerminateInstance\x12\x11.TerminateRequest\x1a\x12.TerminateResponse\x12\x34\n\x0fSuspendInstance\x12\x0f.SuspendRequest\x1a\x10.SuspendResponse\x12\x31\n\x0eResumeInstance\x12\x0e.ResumeRequest\x1a\x0f.ResumeResponse\x12\x41\n\x0ePurgeInstances\x12\x16.PurgeInstancesRequest\x1a\x17.PurgeInstancesResponse\x12\x31\n\x0cGetWorkItems\x12\x14.GetWorkItemsRequest\x1a\t.WorkItem0\x01\x12@\n\x14\x43ompleteActivityTask\x12\x11.ActivityResponse\x1a\x15.CompleteTaskResponse\x12I\n\x18\x43ompleteOrchestratorTask\x12\x11.WorkflowResponse\x1a\x15.CompleteTaskResponse\"\x03\x88\x02\x01\x12@\n\x14\x43ompleteWorkflowTask\x12\x11.WorkflowResponse\x1a\x15.CompleteTaskResponse\x12Y\n\x16RerunWorkflowFromEvent\x12\x1e.RerunWorkflowFromEventRequest\x1a\x1f.RerunWorkflowFromEventResponse\x12\x44\n\x0fListInstanceIDs\x12\x17.ListInstanceIDsRequest\x1a\x18.ListInstanceIDsResponse\x12M\n\x12GetInstanceHistory\x12\x1a.GetInstanceHistoryRequest\x1a\x1b.GetInstanceHistoryResponseBV\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aorchestrator_service.proto\x1a\x13orchestration.proto\x1a\x14history_events.proto\x1a\x1aorchestrator_actions.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc6\x02\n\x0f\x41\x63tivityRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12-\n\x07version\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x10workflowInstance\x18\x04 \x01(\x0b\x32\x11.WorkflowInstance\x12\x0e\n\x06taskId\x18\x05 \x01(\x05\x12)\n\x12parentTraceContext\x18\x06 \x01(\x0b\x32\r.TraceContext\x12\x17\n\x0ftaskExecutionId\x18\x07 \x01(\t\x12\x32\n\x11propagatedHistory\x18\x08 \x01(\x0b\x32\x12.PropagatedHistoryH\x00\x88\x01\x01\x42\x14\n\x12_propagatedHistory\"\xaa\x01\n\x10\x41\x63tivityResponse\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0e\n\x06taskId\x18\x02 \x01(\x05\x12,\n\x06result\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x0e\x66\x61ilureDetails\x18\x04 \x01(\x0b\x32\x13.TaskFailureDetails\x12\x17\n\x0f\x63ompletionToken\x18\x05 \x01(\t\"#\n\rCachedHistory\x12\x12\n\neventCount\x18\x01 \x01(\x05\"\xfa\x02\n\x0fWorkflowRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x31\n\x0b\x65xecutionId\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12!\n\npastEvents\x18\x03 \x03(\x0b\x32\r.HistoryEvent\x12 \n\tnewEvents\x18\x04 \x03(\x0b\x32\r.HistoryEvent\x12 \n\x18requiresHistoryStreaming\x18\x06 \x01(\x08\x12 \n\x06router\x18\x07 \x01(\x0b\x32\x0b.TaskRouterH\x00\x88\x01\x01\x12\x32\n\x11propagatedHistory\x18\x08 \x01(\x0b\x32\x12.PropagatedHistoryH\x01\x88\x01\x01\x12*\n\rcachedHistory\x18\t \x01(\x0b\x32\x0e.CachedHistoryH\x02\x88\x01\x01\x42\t\n\x07_routerB\x14\n\x12_propagatedHistoryB\x10\n\x0e_cachedHistoryJ\x04\x08\x05\x10\x06\"\x82\x02\n\x10WorkflowResponse\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12 \n\x07\x61\x63tions\x18\x02 \x03(\x0b\x32\x0f.WorkflowAction\x12\x32\n\x0c\x63ustomStatus\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x17\n\x0f\x63ompletionToken\x18\x04 \x01(\t\x12\x37\n\x12numEventsProcessed\x18\x05 \x01(\x0b\x32\x1b.google.protobuf.Int32Value\x12&\n\x07version\x18\x06 \x01(\x0b\x32\x10.WorkflowVersionH\x00\x88\x01\x01\x42\n\n\x08_version\"\xaf\x03\n\x15\x43reateInstanceRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x07version\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12;\n\x17scheduledStartTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\x0b\x65xecutionId\x18\x07 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12.\n\x04tags\x18\x08 \x03(\x0b\x32 .CreateInstanceRequest.TagsEntry\x12)\n\x12parentTraceContext\x18\t \x01(\x0b\x32\r.TraceContext\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01J\x04\x08\x06\x10\x07R\x1aorchestrationIdReusePolicy\",\n\x16\x43reateInstanceResponse\x12\x12\n\ninstanceId\x18\x01 \x01(\t\"E\n\x12GetInstanceRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x1b\n\x13getInputsAndOutputs\x18\x02 \x01(\x08\"L\n\x13GetInstanceResponse\x12\x0e\n\x06\x65xists\x18\x01 \x01(\x08\x12%\n\rworkflowState\x18\x02 \x01(\x0b\x32\x0e.WorkflowState\"b\n\x11RaiseEventRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x05input\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\x14\n\x12RaiseEventResponse\"g\n\x10TerminateRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06output\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x11\n\trecursive\x18\x03 \x01(\x08\"\x13\n\x11TerminateResponse\"R\n\x0eSuspendRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\x11\n\x0fSuspendResponse\"Q\n\rResumeRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\x12,\n\x06reason\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"\x10\n\x0eResumeResponse\"\x9e\x01\n\x15PurgeInstancesRequest\x12\x14\n\ninstanceId\x18\x01 \x01(\tH\x00\x12\x33\n\x13purgeInstanceFilter\x18\x02 \x01(\x0b\x32\x14.PurgeInstanceFilterH\x00\x12\x11\n\trecursive\x18\x03 \x01(\x08\x12\x12\n\x05\x66orce\x18\x04 \x01(\x08H\x01\x88\x01\x01\x42\t\n\x07requestB\x08\n\x06_force\"\xaa\x01\n\x13PurgeInstanceFilter\x12\x33\n\x0f\x63reatedTimeFrom\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\rcreatedTimeTo\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\rruntimeStatus\x18\x03 \x03(\x0e\x32\x14.OrchestrationStatus\"f\n\x16PurgeInstancesResponse\x12\x1c\n\x14\x64\x65letedInstanceCount\x18\x01 \x01(\x05\x12.\n\nisComplete\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.BoolValue\"V\n\x13GetWorkItemsRequest\x12\'\n\x0c\x63\x61pabilities\x18\x04 \x03(\x0e\x32\x11.WorkerCapabilityJ\x04\x08\x01\x10\x02J\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04J\x04\x08\n\x10\x0b\"\x9a\x01\n\x08WorkItem\x12+\n\x0fworkflowRequest\x18\x01 \x01(\x0b\x32\x10.WorkflowRequestH\x00\x12+\n\x0f\x61\x63tivityRequest\x18\x02 \x01(\x0b\x32\x10.ActivityRequestH\x00\x12\x17\n\x0f\x63ompletionToken\x18\n \x01(\tB\t\n\x07requestJ\x04\x08\x03\x10\x04J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06\"\x16\n\x14\x43ompleteTaskResponse\"\x85\x02\n\x1dRerunWorkflowFromEventRequest\x12\x18\n\x10sourceInstanceID\x18\x01 \x01(\t\x12\x0f\n\x07\x65ventID\x18\x02 \x01(\r\x12\x1a\n\rnewInstanceID\x18\x03 \x01(\tH\x00\x88\x01\x01\x12+\n\x05input\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x16\n\x0eoverwriteInput\x18\x05 \x01(\x08\x12\'\n\x1anewChildWorkflowInstanceID\x18\x06 \x01(\tH\x01\x88\x01\x01\x42\x10\n\x0e_newInstanceIDB\x1d\n\x1b_newChildWorkflowInstanceID\"7\n\x1eRerunWorkflowFromEventResponse\x12\x15\n\rnewInstanceID\x18\x01 \x01(\t\"r\n\x16ListInstanceIDsRequest\x12\x1e\n\x11\x63ontinuationToken\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x15\n\x08pageSize\x18\x02 \x01(\rH\x01\x88\x01\x01\x42\x14\n\x12_continuationTokenB\x0b\n\t_pageSize\"d\n\x17ListInstanceIDsResponse\x12\x13\n\x0binstanceIds\x18\x01 \x03(\t\x12\x1e\n\x11\x63ontinuationToken\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x14\n\x12_continuationToken\"/\n\x19GetInstanceHistoryRequest\x12\x12\n\ninstanceId\x18\x01 \x01(\t\";\n\x1aGetInstanceHistoryResponse\x12\x1d\n\x06\x65vents\x18\x01 \x03(\x0b\x32\r.HistoryEvent*\x88\x01\n\x10WorkerCapability\x12!\n\x1dWORKER_CAPABILITY_UNSPECIFIED\x10\x00\x12&\n\"WORKER_CAPABILITY_STATEFUL_HISTORY\x10\x02\"\x04\x08\x01\x10\x01*#WORKER_CAPABILITY_HISTORY_STREAMING2\xe8\x08\n\x15TaskHubSidecarService\x12\x37\n\x05Hello\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12@\n\rStartInstance\x12\x16.CreateInstanceRequest\x1a\x17.CreateInstanceResponse\x12\x38\n\x0bGetInstance\x12\x13.GetInstanceRequest\x1a\x14.GetInstanceResponse\x12\x41\n\x14WaitForInstanceStart\x12\x13.GetInstanceRequest\x1a\x14.GetInstanceResponse\x12\x46\n\x19WaitForInstanceCompletion\x12\x13.GetInstanceRequest\x1a\x14.GetInstanceResponse\x12\x35\n\nRaiseEvent\x12\x12.RaiseEventRequest\x1a\x13.RaiseEventResponse\x12:\n\x11TerminateInstance\x12\x11.TerminateRequest\x1a\x12.TerminateResponse\x12\x34\n\x0fSuspendInstance\x12\x0f.SuspendRequest\x1a\x10.SuspendResponse\x12\x31\n\x0eResumeInstance\x12\x0e.ResumeRequest\x1a\x0f.ResumeResponse\x12\x41\n\x0ePurgeInstances\x12\x16.PurgeInstancesRequest\x1a\x17.PurgeInstancesResponse\x12\x31\n\x0cGetWorkItems\x12\x14.GetWorkItemsRequest\x1a\t.WorkItem0\x01\x12@\n\x14\x43ompleteActivityTask\x12\x11.ActivityResponse\x1a\x15.CompleteTaskResponse\x12I\n\x18\x43ompleteOrchestratorTask\x12\x11.WorkflowResponse\x1a\x15.CompleteTaskResponse\"\x03\x88\x02\x01\x12@\n\x14\x43ompleteWorkflowTask\x12\x11.WorkflowResponse\x1a\x15.CompleteTaskResponse\x12Y\n\x16RerunWorkflowFromEvent\x12\x1e.RerunWorkflowFromEventRequest\x1a\x1f.RerunWorkflowFromEventResponse\x12\x44\n\x0fListInstanceIDs\x12\x17.ListInstanceIDsRequest\x1a\x18.ListInstanceIDsResponse\x12M\n\x12GetInstanceHistory\x12\x1a.GetInstanceHistoryRequest\x1a\x1b.GetInstanceHistoryResponseBV\n+io.dapr.durabletask.implementation.protobufZ\x0b/api/protos\xaa\x02\x19\x44\x61pr.DurableTask.Protobufb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -42,66 +42,68 @@ _globals['_CREATEINSTANCEREQUEST_TAGSENTRY']._serialized_options = b'8\001' _globals['_TASKHUBSIDECARSERVICE'].methods_by_name['CompleteOrchestratorTask']._loaded_options = None _globals['_TASKHUBSIDECARSERVICE'].methods_by_name['CompleteOrchestratorTask']._serialized_options = b'\210\002\001' - _globals['_WORKERCAPABILITY']._serialized_start=3673 - _globals['_WORKERCAPABILITY']._serialized_end=3767 + _globals['_WORKERCAPABILITY']._serialized_start=3814 + _globals['_WORKERCAPABILITY']._serialized_end=3950 _globals['_ACTIVITYREQUEST']._serialized_start=196 _globals['_ACTIVITYREQUEST']._serialized_end=522 _globals['_ACTIVITYRESPONSE']._serialized_start=525 _globals['_ACTIVITYRESPONSE']._serialized_end=695 - _globals['_WORKFLOWREQUEST']._serialized_start=698 - _globals['_WORKFLOWREQUEST']._serialized_end=1014 - _globals['_WORKFLOWRESPONSE']._serialized_start=1017 - _globals['_WORKFLOWRESPONSE']._serialized_end=1275 - _globals['_CREATEINSTANCEREQUEST']._serialized_start=1278 - _globals['_CREATEINSTANCEREQUEST']._serialized_end=1709 - _globals['_CREATEINSTANCEREQUEST_TAGSENTRY']._serialized_start=1632 - _globals['_CREATEINSTANCEREQUEST_TAGSENTRY']._serialized_end=1675 - _globals['_CREATEINSTANCERESPONSE']._serialized_start=1711 - _globals['_CREATEINSTANCERESPONSE']._serialized_end=1755 - _globals['_GETINSTANCEREQUEST']._serialized_start=1757 - _globals['_GETINSTANCEREQUEST']._serialized_end=1826 - _globals['_GETINSTANCERESPONSE']._serialized_start=1828 - _globals['_GETINSTANCERESPONSE']._serialized_end=1904 - _globals['_RAISEEVENTREQUEST']._serialized_start=1906 - _globals['_RAISEEVENTREQUEST']._serialized_end=2004 - _globals['_RAISEEVENTRESPONSE']._serialized_start=2006 - _globals['_RAISEEVENTRESPONSE']._serialized_end=2026 - _globals['_TERMINATEREQUEST']._serialized_start=2028 - _globals['_TERMINATEREQUEST']._serialized_end=2131 - _globals['_TERMINATERESPONSE']._serialized_start=2133 - _globals['_TERMINATERESPONSE']._serialized_end=2152 - _globals['_SUSPENDREQUEST']._serialized_start=2154 - _globals['_SUSPENDREQUEST']._serialized_end=2236 - _globals['_SUSPENDRESPONSE']._serialized_start=2238 - _globals['_SUSPENDRESPONSE']._serialized_end=2255 - _globals['_RESUMEREQUEST']._serialized_start=2257 - _globals['_RESUMEREQUEST']._serialized_end=2338 - _globals['_RESUMERESPONSE']._serialized_start=2340 - _globals['_RESUMERESPONSE']._serialized_end=2356 - _globals['_PURGEINSTANCESREQUEST']._serialized_start=2359 - _globals['_PURGEINSTANCESREQUEST']._serialized_end=2517 - _globals['_PURGEINSTANCEFILTER']._serialized_start=2520 - _globals['_PURGEINSTANCEFILTER']._serialized_end=2690 - _globals['_PURGEINSTANCESRESPONSE']._serialized_start=2692 - _globals['_PURGEINSTANCESRESPONSE']._serialized_end=2794 - _globals['_GETWORKITEMSREQUEST']._serialized_start=2796 - _globals['_GETWORKITEMSREQUEST']._serialized_end=2841 - _globals['_WORKITEM']._serialized_start=2844 - _globals['_WORKITEM']._serialized_end=2998 - _globals['_COMPLETETASKRESPONSE']._serialized_start=3000 - _globals['_COMPLETETASKRESPONSE']._serialized_end=3022 - _globals['_RERUNWORKFLOWFROMEVENTREQUEST']._serialized_start=3025 - _globals['_RERUNWORKFLOWFROMEVENTREQUEST']._serialized_end=3286 - _globals['_RERUNWORKFLOWFROMEVENTRESPONSE']._serialized_start=3288 - _globals['_RERUNWORKFLOWFROMEVENTRESPONSE']._serialized_end=3343 - _globals['_LISTINSTANCEIDSREQUEST']._serialized_start=3345 - _globals['_LISTINSTANCEIDSREQUEST']._serialized_end=3459 - _globals['_LISTINSTANCEIDSRESPONSE']._serialized_start=3461 - _globals['_LISTINSTANCEIDSRESPONSE']._serialized_end=3561 - _globals['_GETINSTANCEHISTORYREQUEST']._serialized_start=3563 - _globals['_GETINSTANCEHISTORYREQUEST']._serialized_end=3610 - _globals['_GETINSTANCEHISTORYRESPONSE']._serialized_start=3612 - _globals['_GETINSTANCEHISTORYRESPONSE']._serialized_end=3671 - _globals['_TASKHUBSIDECARSERVICE']._serialized_start=3770 - _globals['_TASKHUBSIDECARSERVICE']._serialized_end=4898 + _globals['_CACHEDHISTORY']._serialized_start=697 + _globals['_CACHEDHISTORY']._serialized_end=732 + _globals['_WORKFLOWREQUEST']._serialized_start=735 + _globals['_WORKFLOWREQUEST']._serialized_end=1113 + _globals['_WORKFLOWRESPONSE']._serialized_start=1116 + _globals['_WORKFLOWRESPONSE']._serialized_end=1374 + _globals['_CREATEINSTANCEREQUEST']._serialized_start=1377 + _globals['_CREATEINSTANCEREQUEST']._serialized_end=1808 + _globals['_CREATEINSTANCEREQUEST_TAGSENTRY']._serialized_start=1731 + _globals['_CREATEINSTANCEREQUEST_TAGSENTRY']._serialized_end=1774 + _globals['_CREATEINSTANCERESPONSE']._serialized_start=1810 + _globals['_CREATEINSTANCERESPONSE']._serialized_end=1854 + _globals['_GETINSTANCEREQUEST']._serialized_start=1856 + _globals['_GETINSTANCEREQUEST']._serialized_end=1925 + _globals['_GETINSTANCERESPONSE']._serialized_start=1927 + _globals['_GETINSTANCERESPONSE']._serialized_end=2003 + _globals['_RAISEEVENTREQUEST']._serialized_start=2005 + _globals['_RAISEEVENTREQUEST']._serialized_end=2103 + _globals['_RAISEEVENTRESPONSE']._serialized_start=2105 + _globals['_RAISEEVENTRESPONSE']._serialized_end=2125 + _globals['_TERMINATEREQUEST']._serialized_start=2127 + _globals['_TERMINATEREQUEST']._serialized_end=2230 + _globals['_TERMINATERESPONSE']._serialized_start=2232 + _globals['_TERMINATERESPONSE']._serialized_end=2251 + _globals['_SUSPENDREQUEST']._serialized_start=2253 + _globals['_SUSPENDREQUEST']._serialized_end=2335 + _globals['_SUSPENDRESPONSE']._serialized_start=2337 + _globals['_SUSPENDRESPONSE']._serialized_end=2354 + _globals['_RESUMEREQUEST']._serialized_start=2356 + _globals['_RESUMEREQUEST']._serialized_end=2437 + _globals['_RESUMERESPONSE']._serialized_start=2439 + _globals['_RESUMERESPONSE']._serialized_end=2455 + _globals['_PURGEINSTANCESREQUEST']._serialized_start=2458 + _globals['_PURGEINSTANCESREQUEST']._serialized_end=2616 + _globals['_PURGEINSTANCEFILTER']._serialized_start=2619 + _globals['_PURGEINSTANCEFILTER']._serialized_end=2789 + _globals['_PURGEINSTANCESRESPONSE']._serialized_start=2791 + _globals['_PURGEINSTANCESRESPONSE']._serialized_end=2893 + _globals['_GETWORKITEMSREQUEST']._serialized_start=2895 + _globals['_GETWORKITEMSREQUEST']._serialized_end=2981 + _globals['_WORKITEM']._serialized_start=2984 + _globals['_WORKITEM']._serialized_end=3138 + _globals['_COMPLETETASKRESPONSE']._serialized_start=3140 + _globals['_COMPLETETASKRESPONSE']._serialized_end=3162 + _globals['_RERUNWORKFLOWFROMEVENTREQUEST']._serialized_start=3165 + _globals['_RERUNWORKFLOWFROMEVENTREQUEST']._serialized_end=3426 + _globals['_RERUNWORKFLOWFROMEVENTRESPONSE']._serialized_start=3428 + _globals['_RERUNWORKFLOWFROMEVENTRESPONSE']._serialized_end=3483 + _globals['_LISTINSTANCEIDSREQUEST']._serialized_start=3485 + _globals['_LISTINSTANCEIDSREQUEST']._serialized_end=3599 + _globals['_LISTINSTANCEIDSRESPONSE']._serialized_start=3601 + _globals['_LISTINSTANCEIDSRESPONSE']._serialized_end=3701 + _globals['_GETINSTANCEHISTORYREQUEST']._serialized_start=3703 + _globals['_GETINSTANCEHISTORYREQUEST']._serialized_end=3750 + _globals['_GETINSTANCEHISTORYRESPONSE']._serialized_start=3752 + _globals['_GETINSTANCEHISTORYRESPONSE']._serialized_end=3811 + _globals['_TASKHUBSIDECARSERVICE']._serialized_start=3953 + _globals['_TASKHUBSIDECARSERVICE']._serialized_end=5081 # @@protoc_insertion_point(module_scope) diff --git a/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.pyi b/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.pyi index a75bb3d8..dcce421a 100644 --- a/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.pyi +++ b/dapr/ext/workflow/_durabletask/internal/orchestrator_service_pb2.pyi @@ -19,10 +19,10 @@ from dapr.ext.workflow._durabletask.internal import orchestrator_actions_pb2 as import sys import typing as _typing -if sys.version_info >= (3, 11): - from typing import TypeAlias as _TypeAlias, Never as _Never +if sys.version_info >= (3, 10): + from typing import TypeAlias as _TypeAlias else: - from typing_extensions import TypeAlias as _TypeAlias, Never as _Never + from typing_extensions import TypeAlias as _TypeAlias DESCRIPTOR: _descriptor.FileDescriptor @@ -33,21 +33,31 @@ class _WorkerCapability: class _WorkerCapabilityEnumTypeWrapper(_enum_type_wrapper._EnumTypeWrapper[_WorkerCapability.ValueType], _builtins.type): DESCRIPTOR: _descriptor.EnumDescriptor WORKER_CAPABILITY_UNSPECIFIED: _WorkerCapability.ValueType # 0 - WORKER_CAPABILITY_HISTORY_STREAMING: _WorkerCapability.ValueType # 1 - """Indicates that the worker is capable of streaming instance history as a more optimized - alternative to receiving the full history embedded in the workflow work-item. - When set, the service may return work items without any history events as an optimization. - It is strongly recommended that all SDKs support this capability. + WORKER_CAPABILITY_STATEFUL_HISTORY: _WorkerCapability.ValueType # 2 + """Indicates that the worker retains an instance's accumulated history in + memory between workflow turns on the same work-item stream, so that the + service can send only the new events (the delta) instead of the full + history each turn. When the service has dispatched a turn for an + instance to this stream and believes the stream is still warm for it, it + may set WorkflowRequest.cachedHistory and drop the committed-history + prefix the worker already holds from pastEvents, leaving only the delta + there. On a cache miss the worker recovers the full history via the + GetInstanceHistory RPC, so the optimization never affects correctness. """ class WorkerCapability(_WorkerCapability, metaclass=_WorkerCapabilityEnumTypeWrapper): ... WORKER_CAPABILITY_UNSPECIFIED: WorkerCapability.ValueType # 0 -WORKER_CAPABILITY_HISTORY_STREAMING: WorkerCapability.ValueType # 1 -"""Indicates that the worker is capable of streaming instance history as a more optimized -alternative to receiving the full history embedded in the workflow work-item. -When set, the service may return work items without any history events as an optimization. -It is strongly recommended that all SDKs support this capability. +WORKER_CAPABILITY_STATEFUL_HISTORY: WorkerCapability.ValueType # 2 +"""Indicates that the worker retains an instance's accumulated history in +memory between workflow turns on the same work-item stream, so that the +service can send only the new events (the delta) instead of the full +history each turn. When the service has dispatched a turn for an +instance to this stream and believes the stream is still warm for it, it +may set WorkflowRequest.cachedHistory and drop the committed-history +prefix the worker already holds from pastEvents, leaving only the delta +there. On a cache miss the worker recovers the full history via the +GetInstanceHistory RPC, so the optimization never affects correctness. """ Global___WorkerCapability: _TypeAlias = WorkerCapability # noqa: Y015 @@ -132,10 +142,45 @@ class ActivityResponse(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["completionToken", b"completionToken", "failureDetails", b"failureDetails", "instanceId", b"instanceId", "result", b"result", "taskId", b"taskId"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___ActivityResponse: _TypeAlias = ActivityResponse # noqa: Y015 +@_typing.final +class CachedHistory(_message.Message): + """CachedHistory is set on a WorkflowRequest when the service has intentionally + omitted the committed history prefix the worker is expected to already hold + for this instance from a previous turn on the same stream (see + WORKER_CAPABILITY_STATEFUL_HISTORY). Its presence means pastEvents carries + only the delta since the worker was last brought up to date; its absence + means pastEvents is the full committed history. The worker reconstructs the + full past history by prepending its cached events to pastEvents. The service + only sets this for workers that advertised + WORKER_CAPABILITY_STATEFUL_HISTORY and that it believes to be warm for the + instance, so it is always safe for a worker to fall back to the + GetInstanceHistory RPC. + """ + + DESCRIPTOR: _descriptor.Descriptor + + EVENTCOUNT_FIELD_NUMBER: _builtins.int + eventCount: _builtins.int + """eventCount is the number of leading (committed) history events the + service believes the worker already holds, i.e. the length of the prefix + omitted from pastEvents. The worker's cached prefix must contain exactly + this many events; if it does not, the worker must treat this as a cache + miss and fetch the full history via GetInstanceHistory before applying + newEvents. + """ + def __init__( + self, + *, + eventCount: _builtins.int = ..., + ) -> None: ... + _ClearFieldArgType: _TypeAlias = _typing.Literal["eventCount", b"eventCount"] # noqa: Y015 + def ClearField(self, field_name: _ClearFieldArgType) -> None: ... + +Global___CachedHistory: _TypeAlias = CachedHistory # noqa: Y015 + @_typing.final class WorkflowRequest(_message.Message): DESCRIPTOR: _descriptor.Descriptor @@ -147,6 +192,7 @@ class WorkflowRequest(_message.Message): REQUIRESHISTORYSTREAMING_FIELD_NUMBER: _builtins.int ROUTER_FIELD_NUMBER: _builtins.int PROPAGATEDHISTORY_FIELD_NUMBER: _builtins.int + CACHEDHISTORY_FIELD_NUMBER: _builtins.int instanceId: _builtins.str requiresHistoryStreaming: _builtins.bool @_builtins.property @@ -164,6 +210,14 @@ class WorkflowRequest(_message.Message): workflow function can access it via ctx. """ + @_builtins.property + def cachedHistory(self) -> Global___CachedHistory: + """cachedHistory, when present, signals that pastEvents holds only the + delta and the worker must reconstruct the omitted prefix from its own + cache (or fetch it via GetInstanceHistory on a miss). Absent for + full-history sends. + """ + def __init__( self, *, @@ -174,16 +228,21 @@ class WorkflowRequest(_message.Message): requiresHistoryStreaming: _builtins.bool = ..., router: _orchestration_pb2.TaskRouter | None = ..., propagatedHistory: _history_events_pb2.PropagatedHistory | None = ..., + cachedHistory: Global___CachedHistory | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _typing.Literal["_propagatedHistory", b"_propagatedHistory", "_router", b"_router", "executionId", b"executionId", "propagatedHistory", b"propagatedHistory", "router", b"router"] # noqa: Y015 + _HasFieldArgType: _TypeAlias = _typing.Literal["_cachedHistory", b"_cachedHistory", "_propagatedHistory", b"_propagatedHistory", "_router", b"_router", "cachedHistory", b"cachedHistory", "executionId", b"executionId", "propagatedHistory", b"propagatedHistory", "router", b"router"] # noqa: Y015 def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _typing.Literal["_propagatedHistory", b"_propagatedHistory", "_router", b"_router", "executionId", b"executionId", "instanceId", b"instanceId", "newEvents", b"newEvents", "pastEvents", b"pastEvents", "propagatedHistory", b"propagatedHistory", "requiresHistoryStreaming", b"requiresHistoryStreaming", "router", b"router"] # noqa: Y015 + _ClearFieldArgType: _TypeAlias = _typing.Literal["_cachedHistory", b"_cachedHistory", "_propagatedHistory", b"_propagatedHistory", "_router", b"_router", "cachedHistory", b"cachedHistory", "executionId", b"executionId", "instanceId", b"instanceId", "newEvents", b"newEvents", "pastEvents", b"pastEvents", "propagatedHistory", b"propagatedHistory", "requiresHistoryStreaming", b"requiresHistoryStreaming", "router", b"router"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... + _WhichOneofReturnType__cachedHistory: _TypeAlias = _typing.Literal["cachedHistory"] # noqa: Y015 + _WhichOneofArgType__cachedHistory: _TypeAlias = _typing.Literal["_cachedHistory", b"_cachedHistory"] # noqa: Y015 _WhichOneofReturnType__propagatedHistory: _TypeAlias = _typing.Literal["propagatedHistory"] # noqa: Y015 _WhichOneofArgType__propagatedHistory: _TypeAlias = _typing.Literal["_propagatedHistory", b"_propagatedHistory"] # noqa: Y015 _WhichOneofReturnType__router: _TypeAlias = _typing.Literal["router"] # noqa: Y015 _WhichOneofArgType__router: _TypeAlias = _typing.Literal["_router", b"_router"] # noqa: Y015 @_typing.overload + def WhichOneof(self, oneof_group: _WhichOneofArgType__cachedHistory) -> _WhichOneofReturnType__cachedHistory | None: ... + @_typing.overload def WhichOneof(self, oneof_group: _WhichOneofArgType__propagatedHistory) -> _WhichOneofReturnType__propagatedHistory | None: ... @_typing.overload def WhichOneof(self, oneof_group: _WhichOneofArgType__router) -> _WhichOneofReturnType__router | None: ... @@ -252,11 +311,8 @@ class CreateInstanceRequest(_message.Message): key: _builtins.str = ..., value: _builtins.str = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["key", b"key", "value", b"value"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... INSTANCEID_FIELD_NUMBER: _builtins.int NAME_FIELD_NUMBER: _builtins.int @@ -296,7 +352,6 @@ class CreateInstanceRequest(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["executionId", b"executionId", "input", b"input", "instanceId", b"instanceId", "name", b"name", "parentTraceContext", b"parentTraceContext", "scheduledStartTimestamp", b"scheduledStartTimestamp", "tags", b"tags", "version", b"version"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___CreateInstanceRequest: _TypeAlias = CreateInstanceRequest # noqa: Y015 @@ -311,11 +366,8 @@ class CreateInstanceResponse(_message.Message): *, instanceId: _builtins.str = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___CreateInstanceResponse: _TypeAlias = CreateInstanceResponse # noqa: Y015 @@ -333,11 +385,8 @@ class GetInstanceRequest(_message.Message): instanceId: _builtins.str = ..., getInputsAndOutputs: _builtins.bool = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["getInputsAndOutputs", b"getInputsAndOutputs", "instanceId", b"instanceId"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___GetInstanceRequest: _TypeAlias = GetInstanceRequest # noqa: Y015 @@ -360,7 +409,6 @@ class GetInstanceResponse(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["exists", b"exists", "workflowState", b"workflowState"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___GetInstanceResponse: _TypeAlias = GetInstanceResponse # noqa: Y015 @@ -386,7 +434,6 @@ class RaiseEventRequest(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["input", b"input", "instanceId", b"instanceId", "name", b"name"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___RaiseEventRequest: _TypeAlias = RaiseEventRequest # noqa: Y015 @@ -399,11 +446,6 @@ class RaiseEventResponse(_message.Message): def __init__( self, ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _Never # noqa: Y015 - def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___RaiseEventResponse: _TypeAlias = RaiseEventResponse # noqa: Y015 @@ -429,7 +471,6 @@ class TerminateRequest(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId", "output", b"output", "recursive", b"recursive"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___TerminateRequest: _TypeAlias = TerminateRequest # noqa: Y015 @@ -442,11 +483,6 @@ class TerminateResponse(_message.Message): def __init__( self, ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _Never # noqa: Y015 - def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___TerminateResponse: _TypeAlias = TerminateResponse # noqa: Y015 @@ -469,7 +505,6 @@ class SuspendRequest(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId", "reason", b"reason"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___SuspendRequest: _TypeAlias = SuspendRequest # noqa: Y015 @@ -482,11 +517,6 @@ class SuspendResponse(_message.Message): def __init__( self, ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _Never # noqa: Y015 - def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___SuspendResponse: _TypeAlias = SuspendResponse # noqa: Y015 @@ -509,7 +539,6 @@ class ResumeRequest(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId", "reason", b"reason"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___ResumeRequest: _TypeAlias = ResumeRequest # noqa: Y015 @@ -522,11 +551,6 @@ class ResumeResponse(_message.Message): def __init__( self, ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _Never # noqa: Y015 - def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___ResumeResponse: _TypeAlias = ResumeResponse # noqa: Y015 @@ -600,7 +624,6 @@ class PurgeInstanceFilter(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["createdTimeFrom", b"createdTimeFrom", "createdTimeTo", b"createdTimeTo", "runtimeStatus", b"runtimeStatus"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___PurgeInstanceFilter: _TypeAlias = PurgeInstanceFilter # noqa: Y015 @@ -623,7 +646,6 @@ class PurgeInstancesResponse(_message.Message): def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["deletedInstanceCount", b"deletedInstanceCount", "isComplete", b"isComplete"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___PurgeInstancesResponse: _TypeAlias = PurgeInstancesResponse # noqa: Y015 @@ -631,14 +653,22 @@ Global___PurgeInstancesResponse: _TypeAlias = PurgeInstancesResponse # noqa: Y0 class GetWorkItemsRequest(_message.Message): DESCRIPTOR: _descriptor.Descriptor + CAPABILITIES_FIELD_NUMBER: _builtins.int + @_builtins.property + def capabilities(self) -> _containers.RepeatedScalarFieldContainer[Global___WorkerCapability.ValueType]: + """capabilities advertises the optional protocol features this worker + supports, so the service can opt into optimizations on a per-stream + basis. Workers that leave this empty receive the default (fully + self-contained) behavior. + """ + def __init__( self, + *, + capabilities: _abc.Iterable[Global___WorkerCapability.ValueType] | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _Never # noqa: Y015 + _ClearFieldArgType: _TypeAlias = _typing.Literal["capabilities", b"capabilities"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___GetWorkItemsRequest: _TypeAlias = GetWorkItemsRequest # noqa: Y015 @@ -680,11 +710,6 @@ class CompleteTaskResponse(_message.Message): def __init__( self, ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... - _ClearFieldArgType: _TypeAlias = _Never # noqa: Y015 - def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___CompleteTaskResponse: _TypeAlias = CompleteTaskResponse # noqa: Y015 @@ -769,11 +794,8 @@ class RerunWorkflowFromEventResponse(_message.Message): *, newInstanceID: _builtins.str = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["newInstanceID", b"newInstanceID"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___RerunWorkflowFromEventResponse: _TypeAlias = RerunWorkflowFromEventResponse # noqa: Y015 @@ -862,11 +884,8 @@ class GetInstanceHistoryRequest(_message.Message): *, instanceId: _builtins.str = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["instanceId", b"instanceId"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___GetInstanceHistoryRequest: _TypeAlias = GetInstanceHistoryRequest # noqa: Y015 @@ -884,10 +903,7 @@ class GetInstanceHistoryResponse(_message.Message): *, events: _abc.Iterable[_history_events_pb2.HistoryEvent] | None = ..., ) -> None: ... - _HasFieldArgType: _TypeAlias = _Never # noqa: Y015 - def HasField(self, field_name: _HasFieldArgType) -> _builtins.bool: ... _ClearFieldArgType: _TypeAlias = _typing.Literal["events", b"events"] # noqa: Y015 def ClearField(self, field_name: _ClearFieldArgType) -> None: ... - def WhichOneof(self, oneof_group: _Never) -> None: ... Global___GetInstanceHistoryResponse: _TypeAlias = GetInstanceHistoryResponse # noqa: Y015 diff --git a/dapr/ext/workflow/_durabletask/worker.py b/dapr/ext/workflow/_durabletask/worker.py index bdc9ff48..d14a48b3 100644 --- a/dapr/ext/workflow/_durabletask/worker.py +++ b/dapr/ext/workflow/_durabletask/worker.py @@ -16,12 +16,13 @@ import os import random import threading +import time import warnings from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from threading import Event, Thread from types import GeneratorType -from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar, Union +from typing import Any, Callable, Generator, Iterator, Optional, Sequence, TypeVar, Union import grpc from google.protobuf import empty_pb2, timestamp_pb2 @@ -223,6 +224,125 @@ def _is_message_too_large(rpc_error: grpc.RpcError) -> bool: # TODO: refactor this to closely match durabletask-go/client/worker_grpc.go instead of this. +_DEFAULT_HISTORY_CACHE_TTL = 3600.0 +_DEFAULT_HISTORY_CACHE_MAX_INSTANCES = 100_000 +_HISTORY_CACHE_SWEEP_INTERVAL = 60.0 + + +class _CachedHistory: + """One instance's cached committed history on a work-item stream.""" + + __slots__ = ('events', 'last_access', 'num_bytes') + + def __init__(self, events: list[pb.HistoryEvent], last_access: float, num_bytes: int): + self.events = events + self.last_access = last_access + self.num_bytes = num_bytes + + +class _WorkflowHistoryCache: + """Per-stream cache of each instance's committed history, enabling delta work items. + + A worker advertising WORKER_CAPABILITY_STATEFUL_HISTORY keeps the committed history it + replayed for each instance so the sidecar can send only the new events (the delta). + Entries are reclaimed by a sliding TTL, an instance-count cap, and an optional byte + budget (LRU eviction). Eviction is always safe: a miss is recovered via the + GetInstanceHistory RPC, so it only costs one extra fetch. + """ + + def __init__( + self, + *, + ttl: float = _DEFAULT_HISTORY_CACHE_TTL, + max_instances: int = _DEFAULT_HISTORY_CACHE_MAX_INSTANCES, + max_bytes: int = 0, + clock: Callable[[], float] = time.monotonic, + ): + self._ttl = ttl if ttl > 0 else _DEFAULT_HISTORY_CACHE_TTL + self._max_instances = ( + max_instances if max_instances > 0 else _DEFAULT_HISTORY_CACHE_MAX_INSTANCES + ) + self._max_bytes = max_bytes if max_bytes > 0 else 0 + self._clock = clock + self._lock = threading.Lock() + self._entries: dict[str, _CachedHistory] = {} + self._total_bytes = 0 + + def get(self, instance_id: str) -> Optional[list[pb.HistoryEvent]]: + """Returns an instance's cached committed history, refreshing its TTL.""" + with self._lock: + entry = self._entries.get(instance_id) + if entry is None: + return None + entry.last_access = self._clock() + return entry.events + + def put(self, instance_id: str, events: list[pb.HistoryEvent]) -> None: + """Caches an instance's committed history, evicting LRU entries to stay in bounds.""" + num_bytes = sum(event.ByteSize() for event in events) + with self._lock: + existing = self._entries.get(instance_id) + if existing is not None: + self._total_bytes -= existing.num_bytes + self._entries[instance_id] = _CachedHistory(events, self._clock(), num_bytes) + self._total_bytes += num_bytes + self._evict_to_fit(instance_id) + + def delete(self, instance_id: str) -> None: + """Drops an instance's cached history (e.g. once it completes).""" + with self._lock: + self._remove(instance_id) + + def reset(self) -> None: + """Clears the cache; used when the stream reconnects (and starts cold).""" + with self._lock: + self._entries.clear() + self._total_bytes = 0 + + def sweep_expired(self) -> None: + """Evicts entries whose last turn was longer ago than the TTL.""" + now = self._clock() + with self._lock: + expired = [ + instance_id + for instance_id, entry in self._entries.items() + if now - entry.last_access > self._ttl + ] + for instance_id in expired: + self._remove(instance_id) + + def _remove(self, instance_id: str) -> None: + entry = self._entries.pop(instance_id, None) + if entry is not None: + self._total_bytes -= entry.num_bytes + + def _evict_to_fit(self, keep: str) -> None: + """Evicts LRU entries until within the count and byte bounds. + + Always keeps the just-touched entry so the active working set is never evicted out + from under the current turn; a lone entry over the byte budget is kept (soft overage). + """ + while len(self._entries) > 1: + over_count = len(self._entries) > self._max_instances + over_bytes = self._max_bytes > 0 and self._total_bytes > self._max_bytes + if not over_count and not over_bytes: + return + victim = self._lru_except(keep) + if victim is None: + return + self._remove(victim) + + def _lru_except(self, keep: str) -> Optional[str]: + oldest_id: Optional[str] = None + oldest_access = 0.0 + for instance_id, entry in self._entries.items(): + if instance_id == keep: + continue + if oldest_id is None or entry.last_access < oldest_access: + oldest_id, oldest_access = instance_id, entry.last_access + return oldest_id + + class TaskHubGrpcWorker: """A gRPC-based worker for processing durable task orchestrations and activities. @@ -323,6 +443,10 @@ def __init__( channel_options: Optional[Sequence[tuple[str, Any]]] = None, stop_timeout: float = 30.0, keepalive_interval: float = 30.0, + disable_stateful_history: bool = False, + history_cache_ttl: float = _DEFAULT_HISTORY_CACHE_TTL, + history_cache_max_instances: int = _DEFAULT_HISTORY_CACHE_MAX_INSTANCES, + history_cache_max_bytes: int = 0, ): self._registry = _Registry() self._host_address = host_address if host_address else shared.get_default_host_address() @@ -355,6 +479,14 @@ def __init__( self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options, self._logger) self._activity_executor = _ActivityExecutor(self._logger) + self._disable_stateful_history = disable_stateful_history + self._history_cache = _WorkflowHistoryCache( + ttl=history_cache_ttl, + max_instances=history_cache_max_instances, + max_bytes=history_cache_max_bytes, + ) + self._history_janitor: Optional[Thread] = None + @property def concurrency_options(self) -> ConcurrencyOptions: """Get the current concurrency options for this worker.""" @@ -392,6 +524,11 @@ def run_loop(): loop.run_until_complete(self._async_run_loop()) self._logger.info(f'Starting gRPC worker that connects to {self._host_address}') + if not self._disable_stateful_history: + self._history_janitor = Thread( + target=self._sweep_history_cache_loop, name='WorkerHistoryJanitor', daemon=True + ) + self._history_janitor.start() self._runLoop = Thread(target=run_loop, name='WorkerRunLoop') self._runLoop.start() while not self._stream_ready.wait(timeout=1): @@ -483,6 +620,9 @@ def invalidate_connection(): self._current_channel = None current_stub = None self._response_stream = None + # The sidecar drops this stream's warm set on disconnect, so start the + # next stream cold to stay in sync. + self._history_cache.reset() if current_reader_thread is not None: current_reader_thread.join(timeout=5) @@ -523,6 +663,10 @@ def should_invalidate_connection(rpc_error): assert current_stub is not None stub = current_stub get_work_items_request = pb.GetWorkItemsRequest() + if not self._disable_stateful_history: + get_work_items_request.capabilities.append( + pb.WORKER_CAPABILITY_STATEFUL_HISTORY + ) try: self._response_stream = stub.GetWorkItems(get_work_items_request) self._logger.info( @@ -885,6 +1029,49 @@ def _handle_grpc_execution_error(self, rpc_error: grpc.RpcError, request_type: s else: self._logger.exception(f'Failed to deliver {request_type} result: {rpc_error}') + def _sweep_history_cache_loop(self): + """Periodically reclaims TTL-expired history cache entries until shutdown.""" + while not self._shutdown.wait(_HISTORY_CACHE_SWEEP_INTERVAL): + self._history_cache.sweep_expired() + + def _resolve_history( + self, req: pb.WorkflowRequest, stub: stubs.TaskHubSidecarServiceStub + ) -> list[pb.HistoryEvent]: + """Resolves the full committed history to replay for a workflow work item. + + For a full send it is the request's pastEvents. For a delta send (cachedHistory) it + is the cached prefix plus the delta, recovered via GetInstanceHistory on any cache + miss (cold stream, eviction, or a prefix-length mismatch). + """ + if not req.HasField('cachedHistory'): + return list(req.pastEvents) + + cached = self._history_cache.get(req.instanceId) + if cached is not None and len(cached) == req.cachedHistory.eventCount: + return cached + list(req.pastEvents) + + response = stub.GetInstanceHistory(pb.GetInstanceHistoryRequest(instanceId=req.instanceId)) + return list(response.events) + + def _update_history_cache( + self, instance_id: str, committed_history: list[pb.HistoryEvent], actions + ) -> None: + """Refreshes the per-stream history cache after a turn. + + Caches only the committed history (never the not-yet-committed new events), and + drops the entry when the turn ends the execution. The current instance ends via a + completeWorkflow action whatever its status (completed/failed/terminated/ + continued-as-new); a terminateWorkflow action targets a different instance and is + deliberately not treated as a reset. + """ + if self._disable_stateful_history: + return + ended = any(a.WhichOneof('workflowActionType') == 'completeWorkflow' for a in actions) + if ended: + self._history_cache.delete(instance_id) + return + self._history_cache.put(instance_id, committed_history) + def _execute_orchestrator( self, req: pb.WorkflowRequest, @@ -898,9 +1085,11 @@ def _execute_orchestrator( if req.HasField('propagatedHistory') else None ) + old_events = self._resolve_history(req, stub) result = executor.execute( - req.instanceId, req.pastEvents, req.newEvents, propagated_history=propagated + req.instanceId, old_events, req.newEvents, propagated_history=propagated ) + self._update_history_cache(req.instanceId, old_events, result.actions) version = None if result.version_name: diff --git a/tests/ext/workflow/durabletask/test_orchestration_e2e.py b/tests/ext/workflow/durabletask/test_orchestration_e2e.py index 9aac1c22..07e5c9a8 100644 --- a/tests/ext/workflow/durabletask/test_orchestration_e2e.py +++ b/tests/ext/workflow/durabletask/test_orchestration_e2e.py @@ -121,6 +121,56 @@ def sequence(ctx: task.OrchestrationContext, start_val: int): assert state.serialized_custom_status is None +def _run_accumulate(disable_stateful_history: bool) -> Optional[client.WorkflowState]: + """Runs a 20-turn activity sequence and returns its terminal state. + + Each sequential activity is a distinct turn with a longer committed history, so a + stateful worker receives most turns as cached-history deltas. Requires a sidecar built + with the durabletask-go stateful-history change. + """ + + def plus_one(_: task.ActivityContext, value: int) -> int: + return value + 1 + + def accumulate(ctx: task.OrchestrationContext, start: int): + current = start + for _ in range(20): + current = yield ctx.call_activity(plus_one, input=current) + return current + + with worker.TaskHubGrpcWorker( + stop_timeout=2.0, disable_stateful_history=disable_stateful_history + ) as w: + w.add_orchestrator(accumulate) + w.add_activity(plus_one) + w.start() + + with client.TaskHubGrpcClient() as task_hub_client: + instance_id = task_hub_client.schedule_new_orchestration(accumulate, input=0) + return task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30) + + +def test_stateful_history_multi_turn(): + """A multi-turn workflow completes correctly with the stateful-history delta path on. + + Deltas are exercised because the worker advertises WORKER_CAPABILITY_STATEFUL_HISTORY by + default; the reconstructed history must yield the same result a full-history run would. + Wire-level delta verification lives in the durabletask-go and dapr integration suites. + """ + state = _run_accumulate(disable_stateful_history=False) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_output == json.dumps(20) + + +def test_stateful_history_disabled_matches(): + """With the optimization disabled (full history every turn) the result is identical.""" + state = _run_accumulate(disable_stateful_history=True) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_output == json.dumps(20) + + def test_activity_error_handling(): def throw(_: task.ActivityContext, input: int) -> int: raise RuntimeError('Kah-BOOOOM!!!') diff --git a/tests/ext/workflow/durabletask/test_worker_history_cache.py b/tests/ext/workflow/durabletask/test_worker_history_cache.py new file mode 100644 index 00000000..b4f9e237 --- /dev/null +++ b/tests/ext/workflow/durabletask/test_worker_history_cache.py @@ -0,0 +1,229 @@ +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the worker's stateful-history cache and history reconstruction. + +These mirror the Go reference (durabletask-go client/worker_history_test.go): the cache +bounds (TTL, instance count, byte budget) with LRU eviction, and the worker's resolution +of full vs delta work items with a GetInstanceHistory fallback on a cache miss. +""" + +from typing import cast + +import dapr.ext.workflow._durabletask.internal.orchestrator_service_pb2_grpc as stubs +import dapr.ext.workflow._durabletask.internal.protos as pb +from dapr.ext.workflow._durabletask.worker import TaskHubGrpcWorker, _WorkflowHistoryCache + + +class _Clock: + """A controllable monotonic clock for deterministic TTL tests.""" + + def __init__(self) -> None: + self.now = 0.0 + + def __call__(self) -> float: + return self.now + + +def _events(count: int) -> list[pb.HistoryEvent]: + """Events with non-zero serialized size (eventId 0 is the proto default → 0 bytes).""" + return [pb.HistoryEvent(eventId=i + 1) for i in range(count)] + + +class _FakeStub: + """A stub whose GetInstanceHistory returns a fixed history and counts calls.""" + + def __init__(self, events: list[pb.HistoryEvent]) -> None: + self._events = events + self.get_instance_history_calls = 0 + + def GetInstanceHistory(self, request: pb.GetInstanceHistoryRequest): + self.get_instance_history_calls += 1 + return pb.GetInstanceHistoryResponse(events=self._events) + + +# --- cache bounds ----------------------------------------------------------------- + + +def test_get_put_delete_reset(): + cache = _WorkflowHistoryCache() + + assert cache.get('a') is None + cache.put('a', _events(3)) + cached = cache.get('a') + assert cached is not None and len(cached) == 3 + + cache.delete('a') + assert cache.get('a') is None + + cache.put('b', _events(1)) + cache.reset() + assert cache.get('b') is None + + +def test_count_cap_evicts_lru(): + clock = _Clock() + cache = _WorkflowHistoryCache(max_instances=2, clock=clock) + + cache.put('a', _events(1)) + clock.now += 1 + cache.put('b', _events(1)) + clock.now += 1 + cache.put('c', _events(1)) # over the cap → evict LRU ('a') + + assert cache.get('a') is None + assert cache.get('b') is not None + assert cache.get('c') is not None + + +def test_byte_cap_evicts_lru(): + entry_bytes = sum(e.ByteSize() for e in _events(4)) + assert entry_bytes > 0 + clock = _Clock() + cache = _WorkflowHistoryCache(max_bytes=entry_bytes + 1, clock=clock) + + cache.put('a', _events(4)) + clock.now += 1 + cache.put('b', _events(4)) # two entries exceed the byte budget → evict LRU ('a') + + assert cache.get('a') is None + assert cache.get('b') is not None + assert cache._total_bytes <= entry_bytes + 1 + + +def test_single_oversized_entry_kept(): + cache = _WorkflowHistoryCache(max_bytes=1) + cache.put('big', _events(5)) + assert cache.get('big') is not None + + +def test_byte_accounting(): + cache = _WorkflowHistoryCache() + + cache.put('a', _events(3)) + cache.put('b', _events(2)) + assert cache._total_bytes == sum(e.ByteSize() for e in _events(3)) + sum( + e.ByteSize() for e in _events(2) + ) + + cache.put('a', _events(6)) # replace adjusts the running total to the new size + assert cache._total_bytes == sum(e.ByteSize() for e in _events(6)) + sum( + e.ByteSize() for e in _events(2) + ) + + cache.delete('a') + assert cache._total_bytes == sum(e.ByteSize() for e in _events(2)) + + cache.reset() + assert cache._total_bytes == 0 + + +def test_ttl_sweep_is_sliding(): + clock = _Clock() + cache = _WorkflowHistoryCache(ttl=60.0, clock=clock) + + cache.put('idle', _events(2)) + cache.put('active', _events(2)) + + clock.now += 120 # past the TTL... + assert cache.get('active') is not None # ...but a turn refreshes 'active' + + cache.sweep_expired() + assert cache.get('idle') is None + assert cache.get('active') is not None + + +def test_non_positive_config_uses_defaults(): + cache = _WorkflowHistoryCache(ttl=0, max_instances=-1, max_bytes=-5) + assert cache._ttl > 0 + assert cache._max_instances > 0 + assert cache._max_bytes == 0 # unlimited + + +# --- worker history resolution ---------------------------------------------------- + + +def _worker(**kwargs) -> TaskHubGrpcWorker: + return TaskHubGrpcWorker(host_address='localhost:0', **kwargs) + + +def _resolve( + worker: TaskHubGrpcWorker, req: pb.WorkflowRequest, stub: _FakeStub +) -> list[pb.HistoryEvent]: + return worker._resolve_history(req, cast(stubs.TaskHubSidecarServiceStub, stub)) + + +def test_resolve_full_send_returns_past_events(): + worker = _worker() + stub = _FakeStub(_events(99)) + req = pb.WorkflowRequest(instanceId='a', pastEvents=_events(4)) + + resolved = _resolve(worker, req, stub) + assert len(resolved) == 4 + assert stub.get_instance_history_calls == 0 + + +def test_resolve_cache_hit_reconstructs(): + worker = _worker() + worker._history_cache.put('a', _events(5)) + stub = _FakeStub(_events(99)) + req = pb.WorkflowRequest(instanceId='a', pastEvents=_events(3)) + req.cachedHistory.eventCount = 5 + + resolved = _resolve(worker, req, stub) + assert len(resolved) == 8 # cached prefix (5) + delta (3) + assert stub.get_instance_history_calls == 0 + + +def test_resolve_cache_miss_fetches_full_history(): + worker = _worker() + stub = _FakeStub(_events(7)) + req = pb.WorkflowRequest(instanceId='a', pastEvents=_events(3)) + req.cachedHistory.eventCount = 5 + + resolved = _resolve(worker, req, stub) + assert len(resolved) == 7 # recovered via GetInstanceHistory + assert stub.get_instance_history_calls == 1 + + +def test_resolve_length_mismatch_is_miss(): + worker = _worker() + worker._history_cache.put('a', _events(4)) # worker holds 4... + stub = _FakeStub(_events(7)) + req = pb.WorkflowRequest(instanceId='a', pastEvents=_events(3)) + req.cachedHistory.eventCount = 5 # ...but the sidecar expects 5 → fetch + + resolved = _resolve(worker, req, stub) + assert len(resolved) == 7 + assert stub.get_instance_history_calls == 1 + + +def test_update_cache_stores_then_evicts_on_complete(): + worker = _worker() + running = [pb.WorkflowAction(scheduleTask=pb.ScheduleTaskAction())] + worker._update_history_cache('a', _events(6), running) + assert worker._history_cache.get('a') is not None + + completed = [pb.WorkflowAction(completeWorkflow=pb.CompleteWorkflowAction())] + worker._update_history_cache('a', _events(6), completed) + assert worker._history_cache.get('a') is None + + +def test_disabled_worker_does_not_cache_and_passes_full_history(): + worker = _worker(disable_stateful_history=True) + worker._update_history_cache('a', _events(6), []) + assert worker._history_cache.get('a') is None + + stub = _FakeStub(_events(99)) + req = pb.WorkflowRequest(instanceId='a', pastEvents=_events(4)) + resolved = _resolve(worker, req, stub) + assert len(resolved) == 4 + assert stub.get_instance_history_calls == 0