Add First-Class Google Gen AI SDK Integration to Contrib #1378
JasonSteving99 wants to merge 22 commits
- Semgrep found 1 Risk: Affected versions of litellm are vulnerable to Exposure of Sensitive Information to an Unauthorized Actor / Use of Password Hash With Insufficient Computational Effort / Use of a Broken or Risky Cryptographic Algorithm. LiteLLM exposes password hashes via authenticated API endpoints and its login flow accepts a stored SHA-256 hash as a valid password, allowing an authenticated user to steal another user's hash and log in as that user, resulting in full authentication bypass and privilege escalation. Fix: Upgrade this library to at least version 1.83.0 at sdk-python/uv.lock:2102. Reference(s): GHSA-69x8-hrgq-fjj8
- Semgrep found 1 Risk: Affected versions of litellm are vulnerable to Incorrect Authorization. This vulnerability stems from inadequate enforcement of access control policies, allowing authenticated users to perform actions beyond their intended privilege level and potentially alter sensitive system configurations or access restricted resources. Fix: Upgrade this library to at least version 1.83.0 at sdk-python/uv.lock:2102. Reference(s): GHSA-53mr-6c8q-9789, CVE-2026-35029
- Semgrep found 1 Risk: Affected versions of google-adk are vulnerable to Missing Authentication for Critical Function. Google Agent Development Kit (ADK) contains a missing authentication flaw that can let an unauthenticated remote attacker reach code-injection paths and execute arbitrary code on the server running the ADK instance, including ADK Web deployments on Python, Cloud Run, and GKE. Fix: Upgrade this library to at least version 1.28.1 at sdk-python/uv.lock:982. Reference(s): GHSA-rg7c-g689-fr3x, CVE-2026-4810
# Temporal Integration for the Google Gemini SDK
This adds a first-class integration that lets users call the Gemini SDK's `AsyncClient` directly from within Temporal workflows. Every API call and tool invocation becomes a durable Temporal activity — giving full crash recovery, visibility in workflow event history, and replay safety — while keeping credentials entirely on the worker side.
## How it works
The integration shims three layers of the Gemini SDK so that workflows can use `client.models`, `client.files`, `client.file_search_stores`, `client.chats`, and all other SDK modules naturally:
### `TemporalApiClient` (`_temporal_api_client.py`)
A `BaseApiClient` subclass that replaces the SDK's HTTP layer. Instead of making network calls, `async_request` and `async_request_streamed` serialize the request and dispatch it through `workflow.execute_activity`. The real HTTP call happens inside the activity on the worker, where the actual `genai.Client` with real credentials lives. Sync methods raise immediately. Per-request `http_options` are validated (non-serializable fields like `httpx_client` are rejected), and `timeout` is mapped to Temporal's `start_to_close_timeout`.
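The validation and timeout mapping described above could look roughly like the sketch below. It is illustrative only, not the PR's code: the function names, the dict-based options, and the assumption that the per-request `timeout` is expressed in milliseconds are all hypothetical, and the 60-second default is a placeholder.

```python
from datetime import timedelta
from typing import Any, Optional

# Fields that cannot be serialized across the activity boundary.
# Only `httpx_client` is named in the description; the set is an assumption.
NON_SERIALIZABLE_FIELDS = {"httpx_client"}


def validate_http_options(http_options: dict[str, Any]) -> dict[str, Any]:
    """Reject non-serializable fields and drop unset (None) entries."""
    set_fields = {k for k, v in http_options.items() if v is not None}
    bad = sorted(NON_SERIALIZABLE_FIELDS & set_fields)
    if bad:
        raise ValueError(f"http_options fields not supported in workflows: {bad}")
    return {k: v for k, v in http_options.items() if v is not None}


def timeout_to_start_to_close(
    timeout_ms: Optional[int], default: timedelta = timedelta(seconds=60)
) -> timedelta:
    """Map a per-request timeout (assumed milliseconds) to an activity timeout."""
    if timeout_ms is None:
        return default
    return timedelta(milliseconds=timeout_ms)
```

Failing at validation time, rather than letting serialization fail later, keeps the error close to the call site in the workflow.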
### `TemporalAsyncFiles` / `TemporalAsyncFileSearchStores` (`_temporal_files.py`, `_temporal_file_search_stores.py`)
Subclasses of `AsyncFiles` and `AsyncFileSearchStores` that override `upload`, `download`, `register_files`, and `upload_to_file_search_store` to dispatch the entire operation as a Temporal activity. This avoids filesystem access (`os` module) and credential token refresh in the workflow sandbox. Methods like `get`, `delete`, `list` are inherited and work through the `TemporalApiClient`'s `async_request` activity. File uploads accept `str` paths (resolved on the worker), `os.PathLike`, or `io.IOBase` (bytes serialized across the activity boundary).
### `TemporalAsyncClient` (`_temporal_async_client.py`)
An `AsyncClient` subclass that wires in `TemporalAsyncFiles` and `TemporalAsyncFileSearchStores`. All other SDK modules (`models`, `tunings`, `caches`, `batches`, `live`, `tokens`, `operations`) are inherited unchanged since they only use `async_request` under the hood.
### `GeminiPlugin` (`_gemini_plugin.py`)
A `SimplePlugin` that registers all activities, configures the Pydantic data converter, and passes `google.genai` through the workflow sandbox. Users pass a fully configured `genai.Client` — the plugin never constructs one itself. An optional `extra_credentials` parameter supports operations like `register_files` that need separate GCS credentials.
### `activity_as_tool` (`workflow.py`)
Wraps any `@activity.defn` function so it looks like a regular async callable to Gemini's automatic function calling (AFC). When the model decides to call the tool, the SDK invokes the wrapper, which dispatches through `workflow.execute_activity`. Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity.
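A minimal sketch of the wrapping idea, using only the standard library. `fake_execute_activity` is a hypothetical stand-in for `workflow.execute_activity`, and `as_tool` is not the PR's `activity_as_tool`; the point is only that `functools.wraps` keeps the name, docstring, and signature that function-calling introspection relies on.

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable


async def fake_execute_activity(fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
    """Hypothetical stand-in for workflow.execute_activity: just awaits fn."""
    return await fn(*args)


def as_tool(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Wrap fn so callers still see its original name, docstring, and signature."""
    sig = inspect.signature(fn)

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Normalize keyword arguments into positional order before dispatch.
        bound = sig.bind(*args, **kwargs)
        bound.apply_defaults()
        return await fake_execute_activity(fn, *bound.args)

    return wrapper


async def get_weather(city: str, units: str = "metric") -> str:
    """Return a canned weather string for a city."""
    return f"{city}: 20 degrees ({units})"


tool = as_tool(get_weather)
```

Calling `asyncio.run(tool(city="Oslo"))` goes through the stand-in dispatcher, while `inspect.signature(tool)` still reports the parameters of `get_weather`.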
### Batched streaming
`generate_content_stream` is supported via a batched approach: the `async_request_streamed` activity collects all chunks from the real streaming response and returns them as a list. The workflow-side `TemporalApiClient` yields them back as an async generator so the SDK sees the expected interface.
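The batched approach can be illustrated with plain `asyncio`. Everything here is a stand-in: `real_stream` plays the SDK's streaming response, `streamed_activity` plays the worker-side activity, and `workflow_side_stream` plays the workflow-side shim; none of these names come from the PR.

```python
import asyncio
from typing import AsyncIterator


async def real_stream() -> AsyncIterator[str]:
    """Stand-in for the SDK's real streaming response."""
    for chunk in ("Hel", "lo, ", "world"):
        yield chunk


async def streamed_activity() -> list[str]:
    """Worker side: drain the whole stream and return the chunks as one list."""
    return [chunk async for chunk in real_stream()]


async def workflow_side_stream() -> AsyncIterator[str]:
    """Workflow side: re-expose the batched list as an async generator."""
    for chunk in await streamed_activity():
        yield chunk


async def main() -> str:
    return "".join([chunk async for chunk in workflow_side_stream()])
```

The consumer still iterates with `async for`, but no chunk is visible until the activity has finished, which is the trade-off of batching.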
## Usage
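```python
import os

from google import genai
from google.genai import types
from temporalio import workflow
from temporalio.contrib.google_gemini_sdk import (
    GeminiPlugin,
    activity_as_tool,
    gemini_client,
)

# Worker side: the real client and its credentials never leave the worker.
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
plugin = GeminiPlugin(client)


# Workflow side
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        client = gemini_client()
        response = await client.models.generate_content(
            model="gemini-2.5-flash",
            contents=query,
            config=types.GenerateContentConfig(
                # my_tool is an @activity.defn function defined elsewhere.
                tools=[activity_as_tool(my_tool)],
            ),
        )
        return response.text
```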
## Testing
31 integration tests covering:
- Basic `generate_content` and multi-chunk streaming
- AFC tool calling (single-arg, multi-arg, workflow methods, sequential multi-tool, failure propagation)
- Per-request `http_options` propagation (headers, api_version, base_url)
- File upload via str path and `io.BytesIO`, file download
- File search store upload
- Multi-turn chat via `client.chats`
- `TemporalAsyncClient` wiring verification
- `TemporalApiClient` error paths (sync raises, low-level upload/download raises)
- `activity_as_tool` validation and signature preservation
- A full end-to-end integration test that exercises all real activity implementations (generate, stream, file upload, download, store upload, RAG query, store delete) with a mocked `genai.Client` — ensuring the actual activity code in `_gemini_activity.py` is covered, not just the workflow-side shims.
Pull request overview
Adds a first-class Temporal contrib integration for the Google Gemini (google-genai) SDK so workflows can use an AsyncClient while executing all SDK HTTP calls and tool invocations as durable Temporal activities (credentials stay on the worker).
Changes:
- Introduces `temporalio.contrib.google_gemini_sdk` with a worker plugin, workflow helpers, activity implementations, and serializable request/response models.
- Adds Temporal-aware shims for Gemini file operations and streamed responses.
- Adds a comprehensive integration test suite and wires the optional `google-genai` dependency via a new extra.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| uv.lock | Adds google-genai dependency via a new google-gemini extra and updates lock metadata. |
| pyproject.toml | Defines google-gemini optional dependency group (google-genai>=1.66.0). |
| `tests/contrib/google_gemini_sdk/__init__.py` | Test package marker for the new contrib integration. |
| tests/contrib/google_gemini_sdk/test_gemini.py | Integration + unit tests for API calls, streaming, file ops, tool calling, and plugin wiring. |
| `temporalio/contrib/google_gemini_sdk/__init__.py` | Public module entry points and docs for GeminiPlugin, gemini_client, and activity_as_tool. |
| temporalio/contrib/google_gemini_sdk/workflow.py | Workflow-side helpers: gemini_client() and activity_as_tool() wrapper. |
| temporalio/contrib/google_gemini_sdk/justfile | Local dev commands/examples for running workers and sample workflows. |
| temporalio/contrib/google_gemini_sdk/_models.py | Pydantic models that cross the activity boundary (requests/responses/options). |
| temporalio/contrib/google_gemini_sdk/_temporal_api_client.py | Workflow-side BaseApiClient shim that routes SDK HTTP calls through activities. |
| temporalio/contrib/google_gemini_sdk/_temporal_async_client.py | AsyncClient subclass wiring Temporal-aware files and file_search_stores. |
| temporalio/contrib/google_gemini_sdk/_temporal_files.py | Temporal-aware AsyncFiles overriding upload/download/register to run on the worker. |
| temporalio/contrib/google_gemini_sdk/_temporal_file_search_stores.py | Temporal-aware AsyncFileSearchStores overriding upload to run on the worker. |
| temporalio/contrib/google_gemini_sdk/_gemini_activity.py | Worker-side activities that execute real SDK calls using the provided genai.Client. |
| temporalio/contrib/google_gemini_sdk/_gemini_plugin.py | GeminiPlugin that registers activities, configures the data converter, and sandbox passthrough. |
…ni-sdk-integration # Conflicts: # uv.lock
…ni-sdk-integration # Conflicts: # .github/CODEOWNERS # pyproject.toml # uv.lock
pydoctor runs with warnings-as-errors: the bullet list needed a blank line before it, and the :func: reference must use the unqualified name since pydoctor relocates __all__ re-exports to the package page.
Client-side MCP (Gemini Developer API): TemporalMcpClientSession subclasses
mcp.ClientSession and routes list_tools/call_tool through {server}-list-tools
and {server}-call-tool activities, so the SDK's in-workflow AFC loop drives MCP
tools while the real session lives on the worker. Servers register on the
plugin via mcp_servers={name: factory} with a pooled, idle-evicted worker-side
connection (mcp_connection_idle_timeout). Server-side MCP (Vertex McpServer
config and Interactions API MCP steps) flows through unchanged as data.
Also in this change:
- Interactions API and managed agents support, plus files/file-search activities
- Collapse activity_config defaults to a single documented 60s start_to_close
- activity_as_tool requires an explicit timeout (matches openai_agents/strands)
- Replay and side-effect (ActivityTaskScheduled count) tests, incl. MCP
- mcp is an optional dep: lazy import + TYPE_CHECKING so the package imports
without it; declared in the dev group for tests
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Add GoogleGenAIError(ApplicationError) and register it via the plugin's workflow_failure_exception_types so it terminally fails the workflow rather than retrying the task; activity_as_tool validation now raises it.
- Reject the SDK's own retry config instead of silently overriding it: the plugin raises ValueError if the genai.Client has http_options.retry_options, and the workflow-side client raises GoogleGenAIError on per-request retry_options (previously dropped silently). Both point users to the activity retry_policy via activity_config.
- Classify API-call activity errors: catch google.genai.errors.APIError and re-raise as ApplicationError with non_retryable set by HTTP status (408/429/5xx retryable, 4xx fail fast), with the SDK error class name as the type.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
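The status-based classification in this commit message amounts to a small pure function. The sketch below is an illustration under assumptions, not the PR's code: the function name is hypothetical, and treating statuses outside the 4xx/5xx range as retryable is a guess.

```python
# Statuses the commit message lists as retryable despite being 4xx.
RETRYABLE_4XX = {408, 429}


def is_non_retryable(status: int) -> bool:
    """Return True when an API error with this HTTP status should fail fast."""
    if status in RETRYABLE_4XX or 500 <= status <= 599:
        return False  # timeouts, rate limits, and server errors: retry
    # Remaining 4xx errors are client mistakes; anything else is left
    # retryable here, which is an assumption of this sketch.
    return 400 <= status <= 499
```

The result would feed the `non_retryable` flag on the re-raised `ApplicationError`, leaving the retry schedule itself to the activity's `retry_policy`.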
…n passthrough
- Add a user-facing README (overview, install, hello world, tool calling, MCP, retries/errors, Vertex, composing).
- Rename the plugin to the conventional "google_genai.GoogleGenAIPlugin".
- Document the replay-determinism survey: the generate_content/AFC/MCP paths are replay-safe; note the one in-workflow caveat (Vertex batches.create auto-naming).
- Add pydantic_core and annotated_types to sandbox passthrough so the SDK's in-workflow Pydantic validation doesn't reimport them after workflow load.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Vertex AI: document that project/location must be set explicitly on the workflow-side client; auto-discovery can't run in the sandbox and would make in-workflow request formatting non-deterministic.
- MCP: drop the server-side/interactions bullet (works unchanged, no wiring) and focus the section on the client-side path the plugin actually wires.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ni-sdk-integration # Conflicts: # pyproject.toml # uv.lock
google-genai 2.8.0 regressed automatic function calling for plain workflow-method tools: the tool executes (its function response is correct) but its in-workflow state mutation is no longer visible on replay/query, failing test_workflow_method_as_tool. The activity_as_tool path is unaffected. Cap at < 2.8.0 until a fixed release ships upstream.
Also simplify exclude-newer-package to disable the cutoff for google-adk outright (= false) rather than a dated pin that needs later cleanup.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Export GoogleGenAIError in __all__ (fixes unused-import in __init__ and the "not exported" warning where tests import it from the package).
- Sort imports in _gemini_activity.py (ruff I001).
- Suppress reportUnusedClass on _TemporalApiClient (used in the sibling module) and reportUnusedFunction on the autouse MCP fixture, matching repo convention.
- Use collections.abc.AsyncIterator and annotate the test helper parameter.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add temporalio/contrib/google_genai/testing.py so users can test workflows that use TemporalAsyncClient without real Gemini API calls (the agent-framework "test fakes are table-stakes" expectation, matching openai_agents/testing.py).
- text_response / function_call_response build canned generate_content bodies
- GeminiTestServer scripts model responses (incl. AFC turns and streaming) and exposes a GoogleGenAIPlugin via .plugin(); records requests for assertions
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Set TemporalAsyncClient(streaming_topic=...) and host a WorkflowStream in the workflow's @workflow.init; each generate_content_stream chunk is then published to that topic (as a parsed GenerateContentResponse) as it arrives, so external WorkflowStreamClient consumers observe model output in real time while the workflow runs durably. The workflow's own iteration is unchanged (still batched).
- _models: _GeminiApiRequest carries streaming_topic + streaming_batch_interval_ms
- TemporalAsyncClient/_TemporalApiClient: streaming_topic + streaming_batch_interval; fail fast (GoogleGenAIError) if a topic is set but no WorkflowStream is hosted
- streamed activity publishes via WorkflowStreamClient.from_within_activity; publishing is best-effort and never breaks the batched return
- README Streaming section + module docstring; tests in test_gemini_streaming.py
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
pydoctor (warnings-as-errors) couldn't resolve the `~`-prefixed `:class:` cross-references added for streaming/testing docstrings. Drop the `~` prefix to use the full dotted path, matching the form already used elsewhere (e.g. openai_agents references temporalio.contrib.workflow_streams.WorkflowStream).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
# First-class Temporal integration for the Google Gen AI SDK
Adds `temporalio.contrib.google_genai`, letting you use the Gemini SDK's `AsyncClient` directly inside Temporal workflows. Every Gemini API call, tool call, file operation, interaction, managed-agent operation, and MCP call becomes a durable Temporal activity (crash recovery, full event-history visibility, and replay safety) while credentials stay entirely on the worker.
## Key properties
- `genai.Client` lives only on the worker, inside activities; no API keys or tokens appear in event history.
- `activity_as_tool` wrappers work with no manual agent loop.
- The SDK's own retry config is rejected (not silently overridden); use the activity `retry_policy`. API errors are classified retryable/non-retryable.
## What's supported
- `client.models.generate_content` / `generate_content_stream`
- `activity_as_tool(fn, ...)` or a plain workflow method
- `client.files.upload` / `download` / `register_files`
- `client.file_search_stores.*`
- `client.interactions.create` / `get` / `cancel` / `delete` (+ streamed)
- `client.agents.create` / `get` / `list` / `delete`
- `TemporalMcpClientSession(name)` in `tools=[...]`; `{name}-list-tools` / `{name}-call-tool` activities
- `streaming_topic=` + `WorkflowStream`

`client.webhooks` is not supported in workflows and raises.
## How it works
The integration shims the SDK at the layers where I/O happens, so workflows use the native client surface unchanged:
- `TemporalAsyncClient` subclasses `AsyncClient` and injects a private `_TemporalApiClient` (a `BaseApiClient` whose `async_request` dispatches HTTP through `workflow.execute_activity` instead of the network). The SDK's request-formatting and AFC loop run in the workflow; only the raw HTTP call crosses into an activity.
- `models`, `tunings`, `caches`, `batches`, `tokens`, etc. are inherited unchanged.
- File upload, download, and register run as whole activities (avoiding filesystem access and token refresh in the sandbox).
- … `BaseApiClient` via a vendored client, so each operation is routed as a whole through its own activity; streamed interactions are drained in the activity and replayed to the workflow.
- `GoogleGenAIPlugin` (`SimplePlugin`) registers the activities, installs a Pydantic data converter, configures sandbox passthrough (`google.genai`, `mcp`, `pydantic_core`, `annotated_types`), and registers `GoogleGenAIError` as a workflow-failure type. You pass a fully-configured `genai.Client`; the plugin never builds one.
## Tool calling
`activity_as_tool` wraps any `@activity.defn` as a Gemini tool; the AFC loop (in-workflow) dispatches it as a durable activity. A timeout is required (no default, matching `openai_agents`/`strands`). Plain workflow methods can also be passed as tools and run in-workflow.
## MCP
- Client-side MCP: register servers on the plugin via `GoogleGenAIPlugin(client, mcp_servers={name: factory})` and place `TemporalMcpClientSession(name)` in a `generate_content` `tools` list. The SDK's AFC loop drives it; `list_tools`/`call_tool` run as activities against a pooled, idle-evicted worker-side connection. The full tool parameter schema reaches the model.
- Server-side MCP (Vertex `Tool(mcp_servers=[McpServer(...)])`) and Interactions API MCP steps are executed by Google's backend and flow through unchanged.
- `mcp` is an optional dependency (lazy-imported; `TemporalMcpClientSession` is exposed lazily so importing the package doesn't require it).
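A pooled, idle-evicted connection can be sketched as follows. This is a generic illustration, not the plugin's implementation: `IdleEvictingPool` and its parameters are hypothetical names, the clock is injectable only to make the behaviour easy to exercise, and a real pool would also close the evicted connection.

```python
import time
from typing import Callable, Dict, Generic, Tuple, TypeVar

T = TypeVar("T")


class IdleEvictingPool(Generic[T]):
    """Keep one connection per server name; drop it after an idle period."""

    def __init__(
        self,
        factories: Dict[str, Callable[[], T]],
        idle_timeout: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._factories = factories
        self._idle_timeout = idle_timeout
        self._clock = clock
        self._live: Dict[str, Tuple[T, float]] = {}  # name -> (conn, last used)

    def evict_idle(self) -> None:
        """Forget connections unused for at least idle_timeout seconds."""
        now = self._clock()
        for name, (_, last_used) in list(self._live.items()):
            if now - last_used >= self._idle_timeout:
                del self._live[name]

    def get(self, name: str) -> T:
        """Return the pooled connection, creating it from its factory if needed."""
        self.evict_idle()
        entry = self._live.get(name)
        conn = entry[0] if entry is not None else self._factories[name]()
        self._live[name] = (conn, self._clock())  # refresh last-used time
        return conn
```

Each `get` refreshes the last-used time, so a connection that is called regularly stays pooled, while one that sits idle past the timeout is rebuilt from its factory on the next use.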
## Streaming
Set `TemporalAsyncClient(streaming_topic="...")` and host a `temporalio.contrib.workflow_streams.WorkflowStream` in `@workflow.init`. Each `generate_content_stream` chunk is published to that topic (as a parsed `GenerateContentResponse`) as it arrives, so an external `WorkflowStreamClient` can observe model output in real time while the workflow runs durably. The workflow's own iteration is unchanged (still batched for the SDK).
## Testing utilities
`temporalio.contrib.google_genai.testing` ships `text_response` / `function_call_response` builders and `GeminiTestServer`, which scripts model responses and returns a `GoogleGenAIPlugin`, so users can test Gemini workflows (including AFC tool loops and streaming) without a real LLM endpoint.
## Usage
## Testing
58 tests (`test_gemini.py`, `test_gemini_mcp.py`, `test_gemini_streaming.py`), including:
- … sequential multi-tool, failure propagation), batched streaming, per-request `http_options`, multi-turn chat
- … `io.BytesIO`), download, file-search upload, full end-to-end with the real activities + a mocked client
- … exact activity-scheduling counts; Vertex/interactions MCP pass-through
- … `WorkflowStream` + fail-fast when no stream is hosted
- … `Replayer` with the plugin) and side-effects (`max_cached_workflows=0`, exact `ActivityTaskScheduled` counts)
## Notes
- `google-genai` is pinned `<2.8.0`: 2.8.0 regressed in-workflow AFC for plain workflow-method tools (the tool runs but its state mutation isn't visible on replay/query). Lift once a fixed release ships upstream.
- Vertex AI: pass `project`/`location` explicitly to `TemporalAsyncClient`; the SDK's environment auto-discovery can't run deterministically in the sandbox.
- … `time`/`uuid`/`random` on the in-workflow generate/AFC path; the one in-workflow exception (`batches.create` auto-naming on Vertex) is documented.