diff --git a/.ai/skills/diffusers-cli/SKILL.md b/.ai/skills/diffusers-cli/SKILL.md new file mode 100644 index 000000000000..cf70620044d7 --- /dev/null +++ b/.ai/skills/diffusers-cli/SKILL.md @@ -0,0 +1,68 @@ +--- +name: diffusers-cli +description: > + Use when the user wants to run a diffusers pipeline from a terminal (one-off + generation, batch jobs, smoke-testing a new model), submit jobs to HF Jobs + hardware via `--remote`, introspect a pipeline's input schema before + calling it, or attach a LoRA at inference time. Prefer this over writing + ad-hoc Python scripts for generation tasks. +--- + +## Overview + +`diffusers-cli` is the shipped CLI in `src/diffusers/commands/`. Subcommands relevant to agentic use: + +| Command | Purpose | +| --------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `generate` | Run any `DiffusionPipeline` or `ModularPipeline`. Forwards `--pipeline-kwargs` verbatim, saves output by sniffing its runtime type, optionally runs on HF Jobs via `--remote`. | +| `describe` | Print the input schema for a pipeline repo (kwarg names, types, defaults, descriptions). **No weights downloaded** — only the small index file. | +| `custom_blocks` | Package a local `ModularPipelineBlocks` subclass for the Hub. | +| `env` | Print versions of diffusers + torch + transformers + accelerate + safetensors + CUDA + GPU info. Use when investigating environment issues, dtype/precision support, or building bug reports. | + +## When to read which file + +Most agentic work goes through `generate`. Read the matching reference file before constructing a command: + +- **[`generate.md`](generate.md)** — full reference for `diffusers-cli generate`. 
Covers `--pipeline-kwargs` + semantics and the shell-quoting gotcha, LoRA via `--lora`, optimization flags (`--dtype`, `--cpu-offload`, + `--attention-backend`, `--vae-tiling/slicing`), output handling and `--push-to` bucket uploads, the full + `--remote` HF Jobs flow (image, container command, log streaming, timing payload, artifact download), and + context parallel (`--context-parallel`) for both local-torchrun and `--remote` paths. + +The other commands are small enough that `diffusers-cli --help` is the canonical reference: + +```bash +diffusers-cli describe --help +diffusers-cli custom_blocks --help +diffusers-cli env --help +``` + +## When NOT to use this skill + +- Multi-stage workflows where you need intermediate tensor manipulation between pipelines → write Python. +- Training or fine-tuning → CLI only covers inference. +- Anything requiring custom `device_map`, `quantization_config`, or other low-level loader knobs not exposed by + the CLI flags → write Python. + +## Verifying the CLI is installed + +The console entry point is registered in `pyproject.toml` (`diffusers-cli = +"diffusers.commands.diffusers_cli:main"`). If `diffusers-cli` is not on PATH after `pip install -e .`, reinstall +with `pip install -e . --force-reinstall --no-deps` and check `which diffusers-cli`. If the installed binary is +missing recent features (e.g. you see `unrecognized arguments: --lora`), reinstall. + +## Output formats + +`--format {auto, human, agent, json}` (top-level flag, must appear before the subcommand): + +- **`human`** — plain-text indented output for terminals (default when not running under an agent harness). No ANSI color. +- **`agent`** — TSV tables and `key=value` lines. Auto-selected when an agent env var is present + (`CLAUDECODE`, `CLAUDE_CODE`, `CODEX_SANDBOX`, `CURSOR_AI`, `AIDER_AI_CONTEXT`, + `GH_COPILOT_AGENT`). Token-cheap for LLM agents to read. +- **`json`** — compact JSON. 
Use for programmatic parsing (scripts, services) where type fidelity and nested + structures matter. + +`stdout` carries data; `stderr` carries hints/warnings/progress — parseable output is never polluted. + +Rule of thumb: `--format json` for scripts that will `json.loads()` the output, otherwise leave it on +auto-detect (`agent` for LLMs, `human` for terminals). diff --git a/.ai/skills/diffusers-cli/generate.md b/.ai/skills/diffusers-cli/generate.md new file mode 100644 index 000000000000..4ba9738ba94e --- /dev/null +++ b/.ai/skills/diffusers-cli/generate.md @@ -0,0 +1,175 @@ +# `diffusers-cli generate` — reference + +Full surface for `diffusers-cli generate`. Use this file as the source of truth when constructing a `generate` +invocation. The top-level [`SKILL.md`](SKILL.md) covers when to use the CLI; this file covers how. + +## The describe → generate flow + +For any model you haven't called before, run `describe` first to learn its input contract, then `generate` with +the right `--pipeline-kwargs`: + +```bash +# 1. Discover what kwargs the pipeline takes (no weight download) +diffusers-cli --format json describe --model black-forest-labs/FLUX.2-klein-9B + +# 2. Run it +diffusers-cli generate \ + --model black-forest-labs/FLUX.2-klein-9B \ + --pipeline-kwargs '{"prompt": "Make the cats fur grey", "image": "https://blobcdn.same.energy/a/d0/58/d058b51c2329b0ea4057e9f12cd9a1da36347e34"}' \ + --dtype bf16 +``` + +`--format json describe` emits a `{task, model, pipeline_class, inputs[]}` payload where each input is +`{name, type_hint, default, required, description}`. + +## Standard vs modular detection + +`generate` auto-detects which kind of pipeline it's calling: + +1. If `model_index.json` exists in the repo → `DiffusionPipeline.from_pretrained` path. +2. Otherwise → `ModularPipeline.from_pretrained` path. + +You don't need to tell it which. For modular repos that ship custom block code, pass `--trust-remote-code`. 
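
A minimal sketch of the detection rule above, restricted to local directories. `detect_pipeline_kind` is a hypothetical helper and not part of the CLI; the shipped code resolves the index file through `try_fetch_config` in `_common.py`, which also handles Hub repos.

```python
import json
import tempfile
from pathlib import Path

# ``model_index.json`` is the value of ``DiffusionPipeline.config_name``
# according to the ``describe`` module docstring in this PR.
INDEX_FILENAME = "model_index.json"


def detect_pipeline_kind(model_dir: str) -> str:
    """Hypothetical helper: classify a local pipeline directory.

    Returns "standard" when the index file is present and "modular" otherwise,
    mirroring the two-step rule described above.
    """
    index_path = Path(model_dir) / INDEX_FILENAME
    return "standard" if index_path.exists() else "modular"


# Demonstrate both branches against a throwaway directory.
with tempfile.TemporaryDirectory() as repo:
    kind_before = detect_pipeline_kind(repo)
    # The class name below is a placeholder, not a real pipeline class.
    (Path(repo) / INDEX_FILENAME).write_text(json.dumps({"_class_name": "ExamplePipeline"}))
    kind_after = detect_pipeline_kind(repo)

print(kind_before, kind_after)
```

The same check against a Hub repo needs a download step like the `hf_hub_download` call in `try_fetch_config`; that path is left out here so the sketch runs offline.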
+ +## `--pipeline-kwargs` semantics + +A JSON object passed straight through to `pipeline(**kwargs)`. String values at known image-input keys (`image`, +`mask_image`, `control_image`, `ip_adapter_image`, `image_2`) are auto-loaded as PIL images, so you can pass URLs +or local paths directly: + +```bash +diffusers-cli generate \ + --model black-forest-labs/FLUX.2-klein-9B \ + --pipeline-kwargs '{"image": "https://example.com/cat.png", "prompt": "make the fur grey", "strength": 0.6}' +``` + +**Shell-quoting gotcha**: keep the JSON on one line. `\` continues a line only between arguments; inside the +single quotes it is passed through literally, and a raw backslash or a newline inside a JSON string breaks `json.loads`. + +## LoRA adapters (`--lora`) + +Attach a LoRA after the pipeline loads via a JSON spec: + +```bash +diffusers-cli generate \ + --model black-forest-labs/FLUX.2-klein-9B \ + --pipeline-kwargs '{"prompt": "a tiny grey cat"}' \ + --lora '{"lora_id": "alvdansen/littletinies", "lora_scale": 0.8}' +``` + +Calls `pipeline.load_lora_weights(, adapter_name="default")` and, if `lora_scale` is present, +`pipeline.set_adapters(["default"], adapter_weights=[])`. Errors clearly if the pipeline doesn't support +LoRA or `lora_id` is missing. + +## Optimization flags + +- `--dtype {auto, bf16, fp16, fp32, …}` — pipeline weight dtype. `bf16` is the right default for modern DiTs on + A100/H100. +- `--cpu-offload {model, group}` — `model` uses `enable_model_cpu_offload`, `group` uses + `enable_group_offload(offload_type="leaf_level", use_stream=True)`. Use `group` to fit a 9B+ model on a single A100. +- `--attention-backend {default, flash_hub, flash_varlen_hub, flash_4_hub, sage_hub}` — hub-hosted kernels, + auto-downloaded on first use. Failures (kernel not available, CUDA arch mismatch, network) raise a clear + `SystemExit` listing the alternatives instead of silently reverting to the default. +- `--vae-tiling` / `--vae-slicing` — lower peak VAE decode VRAM. 
+- `--context-parallel` — Ulysses-style context parallelism on a DiT. See [Context parallel](#context-parallel) below. + +`disable_mmap=True` is always passed to `from_pretrained` — sequential reads are faster than mmap page-faults on +most filesystems. + +## Output handling + +`generate` sniffs the pipeline return type and saves accordingly: + +- `PIL.Image` / list of them → `outputs/generate-.png` +- Frame sequence (≥2 PILs or ndarrays) → `outputs/generate-0.mp4` (uses `--fps`, default 8) +- NumPy audio array → `outputs/generate-0.wav` (uses `--sampling-rate`) +- Anything else → JSON dump + +Override the destination with `--output ` (file or directory). + +Use `--push-to /` to upload outputs to an HF bucket after saving. The bucket is created if it +doesn't exist; objects land under `/`. + +## Remote execution (`--remote`) + +Add `--remote` to submit the same call as a Hugging Face Job: + +```bash +diffusers-cli generate \ + --model black-forest-labs/FLUX.2-klein-9B \ + --pipeline-kwargs '{"prompt": "Make the cats fur grey", "image": "https://blobcdn.same.energy/a/d0/58/d058b51c2329b0ea4057e9f12cd9a1da36347e34"}' \ + --remote --flavor a100-large \ + --dtype bf16 \ + --cpu-offload group +``` + +What happens: + +1. Your HF token is picked up (from `--token` or your login). +2. A bucket (`/jobs-artifacts` by default) is created if it doesn't exist. +3. The job runs in a PyTorch container that already has torch + CUDA preinstalled. Only the small Python + deps (`diffusers`, `accelerate`, `transformers`, `safetensors`) are installed at container start — about + 50 MB instead of 3 GB. +4. Container logs stream to your terminal. When the job finishes, the CLI downloads every file the job + uploaded to the bucket under its `run_id` prefix into `./outputs/`. +5. A timing breakdown (`queued_seconds`, `run_seconds`, `total_seconds`) is printed and included in the JSON + payload. + +Flags: + +- `--flavor ` — HF Jobs hardware (e.g. 
`a10g-small`, `a100-large`, `4xa100-large`). +- `--timeout ` — max wallclock (e.g. `30m`, `2h`). Defaults to `10m`. +- `--dependencies ` — extra pip deps (repeatable). +- `--namespace ` — run under a different account. +- `--no-wait` — submit, return job id, don't stream logs. +- `--push-to ` — override the artifact bucket id. + +## Context parallel + +`--context-parallel` enables Ulysses CP on a DiT-based pipeline. **Locally** the user must launch via torchrun: + +```bash +torchrun --nproc-per-node=2 -m diffusers.commands.diffusers_cli generate \ + --model black-forest-labs/FLUX.2-klein-9B \ + --pipeline-kwargs '{"prompt": "Make the cats fur grey"}' \ + --dtype bf16 \ + --context-parallel +``` + +**Remotely** the CLI handles the torchrun wrapping — just pass `--context-parallel` to a `--remote` invocation on +a multi-GPU flavor: + +```bash +diffusers-cli generate \ + --model black-forest-labs/FLUX.2-klein-9B \ + --pipeline-kwargs '{"prompt": "Make the cats fur grey", "image": "https://blobcdn.same.energy/a/d0/58/d058b51c2329b0ea4057e9f12cd9a1da36347e34"}' \ + --remote --flavor 4xa100-large \ + --dtype bf16 \ + --context-parallel +``` + +Inside the container, CP swaps the entrypoint to `torchrun --nproc-per-node=gpu -m +diffusers.commands.diffusers_cli`, initializes a hybrid process group (`cpu:gloo,cuda:nccl` — NCCL for the +attention all-to-all, Gloo for `ulysses_anything`'s per-rank size coordination), pins each rank to +`cuda:{LOCAL_RANK}`, and gates output saving/printing to rank 0 only. + +**Memory note**: CP shards the sequence, **not the weights**. Every rank still holds the full transformer. Wins +are wall-clock attention speedup and headroom for very long sequences, not "fit a model that doesn't fit." For +weight sharding you'd want TP or FSDP — not exposed in the CLI yet. + +CP is DiT-only. UNet pipelines raise a clear error directing you to a DiT pipeline (FLUX, SD3, HunyuanDiT, +AuraFlow, …). 
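
To make the memory note concrete, here is a rough back-of-the-envelope sketch. The function name, the 9e9 parameter count, and the 8192-token sequence are illustrative assumptions rather than measured values, and the model ignores activations, other pipeline components, and communication buffers.

```python
from __future__ import annotations


def per_rank_footprint(
    param_count: int, bytes_per_param: int, seq_len: int, world_size: int
) -> tuple[int, int]:
    """Hypothetical helper: (weight bytes per rank, sequence tokens per rank shard)."""
    # Weights are replicated: every rank still holds the full transformer.
    weight_bytes = param_count * bytes_per_param
    # The sequence is sharded; this assumes it divides evenly across ranks.
    tokens_per_rank = seq_len // world_size
    return weight_bytes, tokens_per_rank


# Illustrative numbers only: 9e9 parameters in bf16 (2 bytes each), 8192 tokens.
single_gpu = per_rank_footprint(9_000_000_000, 2, 8192, world_size=1)
four_gpus = per_rank_footprint(9_000_000_000, 2, 8192, world_size=4)

print(single_gpu)  # weights and tokens without context parallelism
print(four_gpus)   # same weight bytes per rank, a quarter of the tokens
```

Under these assumptions the weight term is unchanged by adding ranks and only the sequence shard shrinks, which is why CP helps with long sequences but does not help a model that is too large for one GPU.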
+ +## Output mode (`--format`) + +The CLI auto-detects when running under an AI coding agent (Claude Code, Codex, Cursor, Aider, GH Copilot Agent — via +`CLAUDECODE`, `CLAUDE_CODE`, `CODEX_SANDBOX`, `CURSOR_AI`, `AIDER_AI_CONTEXT`, `GH_COPILOT_AGENT`) and switches output to **agent +mode** automatically — TSV tables, `key=value` results, compact JSON dicts, no progress bars. + +Override explicitly with `--format {auto, human, agent, json}` placed **before** the subcommand: + +```bash +diffusers-cli --format json generate --model --pipeline-kwargs '...' +``` + +The legacy `--json` flag on `generate` still works as a shortcut for `--format json`. diff --git a/src/diffusers/commands/_common.py b/src/diffusers/commands/_common.py new file mode 100644 index 000000000000..bd95b3f88969 --- /dev/null +++ b/src/diffusers/commands/_common.py @@ -0,0 +1,43 @@ +# Copyright 2026 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Shared helpers used by multiple ``diffusers-cli`` subcommands. + +Anything imported by more than one command file lives here so command modules stay standalone — no cross-command +imports between e.g. ``describe`` and ``generate``. +""" + +from __future__ import annotations + +from argparse import Namespace +from pathlib import Path + + +def try_fetch_config(args: Namespace, filename: str) -> str | None: + """Resolve ``filename`` for ``args.model`` (local path or Hub repo). Return None if absent. 
+ + Used by ``generate`` (to detect modular vs standard pipelines) and ``describe`` (to read the pipeline class for + schema introspection) — no weights are downloaded, only the small index file. + """ + local = Path(args.model) + if local.exists(): + candidate = local / filename + return str(candidate) if candidate.exists() else None + + from huggingface_hub import hf_hub_download + from huggingface_hub.utils import EntryNotFoundError, HfHubHTTPError, RepositoryNotFoundError + + try: + return hf_hub_download(args.model, filename, revision=args.revision, token=args.token) + except (EntryNotFoundError, HfHubHTTPError, RepositoryNotFoundError): + return None diff --git a/src/diffusers/commands/_output.py b/src/diffusers/commands/_output.py new file mode 100644 index 000000000000..5b08e2c909e4 --- /dev/null +++ b/src/diffusers/commands/_output.py @@ -0,0 +1,144 @@ +# Copyright 2026 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Output formatting for ``diffusers-cli``. + +Commands print through the singleton ``out`` instead of calling ``print`` directly. ``out`` picks the right format +(human, agent, or json) based on the top-level ``--format`` flag, so commands don't have to check the mode themselves. +""" + +from __future__ import annotations + +import json +import os +from enum import Enum +from typing import Any, Sequence + + +# Environment variables set by known AI coding agents. 
If any of these is set, `--format auto` +# picks AGENT mode instead of HUMAN. +_AGENT_ENV_VARS = ( + "CLAUDECODE", # Claude Code + "CLAUDE_CODE", # alt spelling + "CODEX_SANDBOX", # Codex + "CURSOR_AI", # Cursor + "AIDER_AI_CONTEXT", # Aider + "GH_COPILOT_AGENT", # GitHub Copilot Agent +) + + +def is_agent() -> bool: + """Return True if the CLI is being run by an AI coding agent.""" + return any(os.environ.get(v) for v in _AGENT_ENV_VARS) + + +class OutputFormat(str, Enum): + AUTO = "auto" + HUMAN = "human" + AGENT = "agent" + JSON = "json" + + +class Output: + """Picks the print format for each method based on the active mode (human / agent / json).""" + + mode: OutputFormat + + def __init__(self) -> None: + self.set_mode(OutputFormat.AUTO) + + def set_mode(self, mode: OutputFormat) -> None: + """Set the active output mode. AUTO becomes AGENT or HUMAN based on is_agent().""" + if mode == OutputFormat.AUTO: + mode = OutputFormat.AGENT if is_agent() else OutputFormat.HUMAN + self.mode = mode + + # ------------------------------------------------------------------ stdout + + def text(self, msg: str) -> None: + """Print a line of text. Same in every mode.""" + print(msg) + + def dict(self, data: dict[str, Any]) -> None: + """Print a dict as JSON. Indented for HUMAN, compact for AGENT and JSON.""" + indent = 2 if self.mode == OutputFormat.HUMAN else None + print(json.dumps(data, indent=indent, default=str)) + + def result(self, message: str, **data: Any) -> None: + """Print a result summary. + + - HUMAN: the message line followed by `` key: value`` lines. + - AGENT: ``key=value`` pairs separated by spaces on one line. + - JSON: compact JSON of the data dict. 
+ """ + if self.mode == OutputFormat.HUMAN: + print(message) + for k, v in data.items(): + if v is not None: + print(f" {k}: {v}") + elif self.mode == OutputFormat.AGENT: + parts = [f"{k}={v}" for k, v in data.items() if v is not None] + print(" ".join(parts) if parts else message) + elif self.mode == OutputFormat.JSON: + print(json.dumps(data, default=str)) + + def table( + self, + items: Sequence[dict[str, Any]], + *, + headers: list[str] | None = None, + ) -> None: + """Print a list of dicts as a table. + + - HUMAN: columns padded so each column lines up. + - AGENT: tab-separated values, one row per line. + - JSON: the list itself as a JSON array. + + ``headers`` defaults to the keys of the first item. + """ + if not items: + if self.mode in (OutputFormat.HUMAN, OutputFormat.AGENT): + print("No results.") + elif self.mode == OutputFormat.JSON: + print("[]") + return + + if headers is None: + headers = list(items[0].keys()) + + if self.mode == OutputFormat.JSON: + print(json.dumps(list(items), default=str)) + return + + rows = [[_cell(item.get(h)) for h in headers] for item in items] + if self.mode == OutputFormat.AGENT: + print("\t".join(headers)) + for row in rows: + print("\t".join(row)) + return + + # HUMAN: pad each column to its widest cell so they line up. + widths = [max(len(h), *(len(r[i]) for r in rows)) for i, h in enumerate(headers)] + print(" ".join(h.ljust(widths[i]) for i, h in enumerate(headers))) + for row in rows: + print(" ".join(c.ljust(widths[i]) for i, c in enumerate(row))) + + +def _cell(value: Any) -> str: + if value is None: + return "" + return str(value) + + +# Shared instance imported by every subcommand. 
+out = Output() diff --git a/src/diffusers/commands/custom_blocks.py b/src/diffusers/commands/custom_blocks.py index 953240c5a2c3..324978c83d3a 100644 --- a/src/diffusers/commands/custom_blocks.py +++ b/src/diffusers/commands/custom_blocks.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" -Usage example: - TODO +"""``diffusers-cli custom_blocks`` — package a local ``ModularPipelineBlocks`` subclass for the Hub. + +Parses ``block.py`` (or ``--block_module_name``), instantiates the chosen block, and calls ``save_pretrained`` in the +current working directory. """ import ast @@ -28,7 +29,6 @@ EXPECTED_PARENT_CLASSES = ["ModularPipelineBlocks"] -CONFIG = "config.json" def conversion_command_factory(args: Namespace): @@ -38,7 +38,27 @@ def conversion_command_factory(args: Namespace): class CustomBlocksCommand(BaseDiffusersCLICommand): @staticmethod def register_subcommand(parser: ArgumentParser): - conversion_parser = parser.add_parser("custom_blocks") + from argparse import RawDescriptionHelpFormatter + + epilog = ( + "Examples\n" + " $ diffusers-cli custom_blocks\n" + " $ diffusers-cli custom_blocks --block_module_name my_block.py\n" + " $ diffusers-cli custom_blocks --block_module_name my_block.py --block_class_name MyDenoiseBlock\n" + "\n" + "Learn more\n" + " Use `diffusers-cli --help` for more information about a command.\n" + " Read the documentation at https://huggingface.co/docs/diffusers\n" + ) + + conversion_parser = parser.add_parser( + "custom_blocks", + help="Package a local ModularPipelineBlocks subclass for the Hub.", + usage="\n diffusers-cli custom_blocks [options]", + epilog=epilog, + formatter_class=RawDescriptionHelpFormatter, + ) + conversion_parser._optionals.title = "Options" conversion_parser.add_argument( "--block_module_name", type=str, @@ -85,11 +105,6 @@ def run(self): spec.loader.exec_module(module) getattr(module, child_class)().save_pretrained(os.getcwd()) - # 
or, we could create it manually. - # automap = self._create_automap(parent_class=parent_class, child_class=child_class) - # with open(CONFIG, "w") as f: - # json.dump(automap, f) - def _choose_block(self, candidates, chosen=None): for cls, base in candidates: if cls == chosen: @@ -125,8 +140,3 @@ def _get_base_name(self, node: ast.expr): val = self._get_base_name(node.value) return f"{val}.{node.attr}" if val else node.attr return None - - def _create_automap(self, parent_class, child_class): - module = str(self.block_module_name).replace(".py", "").rsplit(".", 1)[-1] - auto_map = {f"{parent_class}": f"{module}.{child_class}"} - return {"auto_map": auto_map} diff --git a/src/diffusers/commands/describe.py b/src/diffusers/commands/describe.py new file mode 100644 index 000000000000..12b894dfe9ce --- /dev/null +++ b/src/diffusers/commands/describe.py @@ -0,0 +1,239 @@ +# Copyright 2026 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""``diffusers-cli describe`` — print the input schema for any pipeline repo. + +Tries ``DiffusionPipeline.config_name`` first (so standard repos get their ``__call__`` signature introspected); falls +back to ``ModularPipelineBlocks.from_pretrained`` for modular repos. No weights are downloaded — only the small index +file (and any custom block code if ``--trust-remote-code`` is set). 
+""" + +from __future__ import annotations + +import inspect +import json +import re +from argparse import ArgumentParser, Namespace, _SubParsersAction +from typing import Any + +from . import BaseDiffusersCLICommand +from ._common import try_fetch_config +from ._output import OutputFormat, out + + +def _describe(args: Namespace) -> None: + """Print the pipeline's input schema. + + Tries ``DiffusionPipeline.config_name`` (= ``model_index.json``) first; if present, introspects the declared + pipeline class's ``__call__`` signature. Otherwise falls back to ``ModularPipelineBlocks.from_pretrained`` and + reads the block-declared ``inputs``. No weights downloaded either way. + """ + import diffusers + + model_index = try_fetch_config(args, diffusers.DiffusionPipeline.config_name) + if model_index is not None: + with open(model_index) as f: + index = json.load(f) + class_name = index.get("_class_name") + if class_name is None: + raise SystemExit( + f"{diffusers.DiffusionPipeline.config_name} for {args.model!r} has no `_class_name` field." + ) + pipeline_cls = getattr(diffusers, class_name, None) + if pipeline_cls is None: + raise SystemExit( + f"Pipeline class {class_name!r} declared in {diffusers.DiffusionPipeline.config_name} " + "is not exported by the installed diffusers." 
+ ) + + sig = inspect.signature(pipeline_cls.__call__) + descriptions = _parse_docstring_args(pipeline_cls.__call__.__doc__) if args.verbose else {} + schema: list[dict[str, Any]] = [] + for name, param in sig.parameters.items(): + if name == "self": + continue + if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): + continue + has_default = param.default is not inspect.Parameter.empty + schema.append( + { + "name": name, + "type_hint": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None, + "default": param.default if has_default else None, + "required": not has_default, + "description": descriptions.get(name, ""), + } + ) + else: + kwargs: dict[str, Any] = {"trust_remote_code": args.trust_remote_code} + if args.revision: + kwargs["revision"] = args.revision + if args.token: + kwargs["token"] = args.token + try: + blocks = diffusers.ModularPipelineBlocks.from_pretrained(args.model, **kwargs) + except Exception as e: + raise SystemExit( + f"Could not describe {args.model!r}: no {diffusers.DiffusionPipeline.config_name} and " + f"loading as a modular pipeline failed ({type(e).__name__}: {e}). " + "Is this a diffusers pipeline repo? Pass --trust-remote-code if it ships custom block code." 
+ ) from e + + class_name = type(blocks).__name__ + schema = [ + { + "name": p.name, + "type_hint": str(p.type_hint) if p.type_hint is not None else None, + "default": p.default, + "required": p.required, + "description": p.description, + } + for p in blocks.inputs + ] + + if out.mode == OutputFormat.JSON: + out.dict({"task": "describe", "model": args.model, "pipeline_class": class_name, "inputs": schema}) + elif out.mode == OutputFormat.AGENT: + out.table(schema, headers=["name", "required", "type_hint", "default", "description"]) + else: + out.text(f"{class_name} ({args.model}) inputs:") + for entry in schema: + tag = "required" if entry["required"] else f"optional, default={entry['default']!r}" + out.text(f" {entry['name']} ({tag})") + if entry["type_hint"]: + out.text(f" type: {entry['type_hint']}") + if entry["description"]: + out.text(f" desc: {entry['description']}") + + +def _parse_docstring_args(docstring: str | None) -> dict[str, str]: + """Extract per-argument descriptions from a Google-style ``Args:`` block. + + Returns a ``{name: description}`` mapping. Best-effort — unrecognised formats just yield an empty dict rather than + raising. + """ + if not docstring: + return {} + + lines = docstring.expandtabs().splitlines() + start = None + section_indent = 0 + for i, line in enumerate(lines): + if line.strip() in ("Args:", "Arguments:", "Parameters:"): + start = i + 1 + section_indent = len(line) - len(line.lstrip()) + break + if start is None: + return {} + + descriptions: dict[str, str] = {} + current_name: str | None = None + current_lines: list[str] = [] + arg_indent: int | None = None + name_pattern = re.compile(r"^(\w+)\s*(?:\([^)]*\))?\s*:?\s*(.*)$") + + def _flush() -> None: + if current_name and current_lines: + descriptions[current_name] = " ".join(s.strip() for s in current_lines).strip() + + for line in lines[start:]: + if not line.strip(): + continue + indent = len(line) - len(line.lstrip()) + # A new top-level section ends the Args block. 
+ if indent <= section_indent and line.strip().endswith(":"): + break + if arg_indent is None: + arg_indent = indent + if indent == arg_indent: + _flush() + current_lines = [] + match = name_pattern.match(line.strip()) + if match: + current_name = match.group(1) + tail = match.group(2).strip() + if tail: + current_lines.append(tail) + else: + current_name = None + elif current_name is not None and indent > arg_indent: + current_lines.append(line.strip()) + _flush() + return descriptions + + +class DescribeCommand(BaseDiffusersCLICommand): + task = "describe" + + @staticmethod + def register_subcommand(subparsers: _SubParsersAction) -> None: + from argparse import RawDescriptionHelpFormatter + + epilog = ( + "Examples\n" + " $ diffusers-cli describe -m stabilityai/stable-diffusion-xl-base-1.0\n" + " $ diffusers-cli describe -m black-forest-labs/FLUX.1-dev --verbose\n" + " $ diffusers-cli --format json describe -m stabilityai/stable-diffusion-xl-base-1.0\n" + "\n" + "Learn more\n" + " Use `diffusers-cli --help` for more information about a command.\n" + " Read the documentation at https://huggingface.co/docs/diffusers\n" + ) + + parser: ArgumentParser = subparsers.add_parser( + "describe", + help="Print the input schema for a diffusers pipeline repo. 
No weights downloaded.", + usage="\n diffusers-cli describe [options]", + epilog=epilog, + formatter_class=RawDescriptionHelpFormatter, + ) + parser._optionals.title = "Options" + parser.add_argument( + "--model", + "-m", + required=True, + help="Model id on the Hugging Face Hub or local path.", + ) + parser.add_argument( + "--revision", + default=None, + help="Model revision (branch, tag, or commit SHA).", + ) + parser.add_argument( + "--token", + default=None, + help="Hugging Face token for gated/private models.", + ) + parser.add_argument( + "--trust-remote-code", + action="store_true", + help="Allow custom code from the Hub (required for modular pipelines that ship block code).", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help=( + "Also include per-argument descriptions from the pipeline's __call__ docstring. " + "Modular pipelines always include block-declared descriptions; --verbose populates " + "the equivalent field for standard pipelines by parsing the Google-style Args: block." 
+ ), + ) + parser.set_defaults(func=DescribeCommand) + + def __init__(self, args: Namespace): + self.args = args + + def run(self) -> None: + _describe(self.args) diff --git a/src/diffusers/commands/diffusers_cli.py b/src/diffusers/commands/diffusers_cli.py index a27ac24f2a3e..8deb98c9916b 100644 --- a/src/diffusers/commands/diffusers_cli.py +++ b/src/diffusers/commands/diffusers_cli.py @@ -15,23 +15,44 @@ from argparse import ArgumentParser +from ._output import OutputFormat, out from .custom_blocks import CustomBlocksCommand +from .describe import DescribeCommand from .env import EnvironmentCommand from .fp16_safetensors import FP16SafetensorsCommand +from .generate import GenerateCommand def main(): - parser = ArgumentParser("Diffusers CLI tool", usage="diffusers-cli []") - commands_parser = parser.add_subparsers(help="diffusers-cli command helpers") + parser = ArgumentParser( + prog="diffusers-cli", + usage="\n diffusers-cli [--format ] [options]", + ) + parser._optionals.title = "Options" + parser.add_argument( + "--format", + choices=[m.value for m in OutputFormat], + default=OutputFormat.AUTO.value, + help=( + "Output format. 'auto' (default) picks 'agent' when an AI coding agent is detected " + "(via CLAUDECODE/CURSOR_AI/AIDER_AI_CONTEXT/... env vars) and 'human' otherwise. " + "Must appear before the subcommand." 
+ ), + ) + commands_parser = parser.add_subparsers(title="Commands", metavar="") # Register commands EnvironmentCommand.register_subcommand(commands_parser) FP16SafetensorsCommand.register_subcommand(commands_parser) CustomBlocksCommand.register_subcommand(commands_parser) + GenerateCommand.register_subcommand(commands_parser) + DescribeCommand.register_subcommand(commands_parser) # Let's go args = parser.parse_args() + out.set_mode(OutputFormat(args.format)) + if not hasattr(args, "func"): parser.print_help() exit(1) diff --git a/src/diffusers/commands/env.py b/src/diffusers/commands/env.py index ba37cfe98646..34962fd10c7b 100644 --- a/src/diffusers/commands/env.py +++ b/src/diffusers/commands/env.py @@ -55,7 +55,12 @@ def info_command_factory(_): class EnvironmentCommand(BaseDiffusersCLICommand): @staticmethod def register_subcommand(parser: ArgumentParser) -> None: - download_parser = parser.add_parser("env") + download_parser = parser.add_parser( + "env", + help="Print versions of diffusers and its dependencies (for bug reports).", + usage="\n diffusers-cli env", + ) + download_parser._optionals.title = "Options" download_parser.set_defaults(func=info_command_factory) def run(self) -> dict: diff --git a/src/diffusers/commands/fp16_safetensors.py b/src/diffusers/commands/fp16_safetensors.py index 382d6c39bd19..44e374b5707d 100644 --- a/src/diffusers/commands/fp16_safetensors.py +++ b/src/diffusers/commands/fp16_safetensors.py @@ -33,6 +33,13 @@ def conversion_command_factory(args: Namespace): + warnings.warn( + "`diffusers-cli fp16_safetensors` is deprecated and will be removed in a future version. " + "Convert weights to fp16 safetensors directly with `safetensors.torch.save_file` or via " + "`pipeline.save_pretrained(..., safe_serialization=True, variant='fp16')`.", + FutureWarning, + stacklevel=2, + ) if args.use_auth_token: warnings.warn( "The `--use_auth_token` flag is deprecated and will be removed in a future version." 
@@ -44,7 +51,12 @@ def conversion_command_factory(args: Namespace): class FP16SafetensorsCommand(BaseDiffusersCLICommand): @staticmethod def register_subcommand(parser: ArgumentParser): - conversion_parser = parser.add_parser("fp16_safetensors") + conversion_parser = parser.add_parser( + "fp16_safetensors", + help="[DEPRECATED] Convert a Hub checkpoint's weights to fp16 safetensors and push back as a PR.", + usage="\n diffusers-cli fp16_safetensors [options]", + ) + conversion_parser._optionals.title = "Options" conversion_parser.add_argument( "--ckpt_id", type=str, diff --git a/src/diffusers/commands/generate.py b/src/diffusers/commands/generate.py new file mode 100644 index 000000000000..da9eadbafe84 --- /dev/null +++ b/src/diffusers/commands/generate.py @@ -0,0 +1,996 @@ +# Copyright 2026 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""``diffusers-cli generate`` — single agentic entry point. + +Runs any diffusers pipeline (standard or modular) by forwarding ``--pipeline-kwargs`` verbatim, saves the output by +sniffing its runtime type, and can submit the same call to HF Jobs via ``--remote``. +""" + +from __future__ import annotations + +import json +import os +import sys +from argparse import ArgumentParser, Namespace, _SubParsersAction +from pathlib import Path +from typing import Any + +from diffusers.utils import load_image + +from . 
import BaseDiffusersCLICommand +from ._common import try_fetch_config +from ._output import out + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +DEFAULT_OUTPUT_DIR = "outputs" +DTYPE_CHOICES = ("auto", "float16", "fp16", "bfloat16", "bf16", "float32", "fp32") +CPU_OFFLOAD_CHOICES = ("model", "group") + + +def _hub_attention_backends() -> tuple[str, ...]: + """Hub-hosted attention backends sourced from ``_HUB_KERNELS_REGISTRY``. + + Single source of truth: if the registry grows or shrinks, the CLI choices follow. + """ + from diffusers.models.attention_dispatch import _HUB_KERNELS_REGISTRY + + return tuple(sorted(backend.value for backend in _HUB_KERNELS_REGISTRY)) + + +ATTENTION_BACKEND_CHOICES = ("default", *_hub_attention_backends()) + +# Keys whose string value should be resolved via ``diffusers.utils.load_image`` +# before being passed to the pipeline call. +_IMAGE_INPUT_KEYS = ( + "image", + "mask_image", + "control_image", + "ip_adapter_image", + "image_2", +) + +# Source for the diffusers install used by --remote jobs. While iterating on a +# feature branch, point at the GitHub tarball URL — uv installs it over plain +# HTTP and the container doesn't need ``git``. Once merged, switch back to a +# PyPI release pin. ``--dependencies "diffusers @ ..."`` on the local command +# appends additional dependencies but does not replace this default install. +DIFFUSERS_SOURCE = ( + "diffusers @ https://github.com/huggingface/diffusers/archive/refs/heads/diffuser-cli-for-agent.tar.gz" +) +_DEFAULT_REMOTE_DEPS = ( + DIFFUSERS_SOURCE, + "accelerate", + "transformers", + "safetensors", + "sentencepiece", # required by several text-encoder tokenizers (T5, LLaMA, …) + "ftfy", # required by older CLIP text-encoder paths +) + +# Base container image — provides torch + CUDA so ``uv pip install --system`` +# only has to add the small Python deps. 
cuda12.8 is the highest cuda12.x tag +# below the HF Jobs host driver's CUDA 12.9 max. +_DEFAULT_REMOTE_IMAGE = "pytorch/pytorch:2.10.0-cuda12.8-cudnn9-runtime" + +# Installed console-script name invoked inside the container after the deps land. +_CONTAINER_CLI_BINARY = "diffusers-cli" + +RUN_ID_ENV = "DIFFUSERS_CLI_RUN_ID" + +# Namespace keys that control *how* a remote job runs locally, not what runs +# inside the container. They are stripped when forwarding argv to the container. +HF_JOBS_KEYS = frozenset( + { + "remote", + "flavor", + "timeout", + "dependencies", + "namespace", + "no_wait", + "poll_interval", + "func", + "format", # top-level --format is a local rendering flag; never forward to the container + } +) + + +# --------------------------------------------------------------------------- +# Argparse helpers +# --------------------------------------------------------------------------- + + +def _add_loading_arguments(parser: ArgumentParser) -> None: + parser.add_argument("--model", "-m", required=True, help="Model id on the Hugging Face Hub or local path.") + parser.add_argument("--device", default=None, help="Device to run on (e.g. cpu, cuda, cuda:0, mps).") + parser.add_argument("--dtype", default="auto", choices=DTYPE_CHOICES, help="Torch dtype for pipeline weights.") + parser.add_argument("--variant", default=None, help='Optional weight variant (e.g. "fp16").') + parser.add_argument("--revision", default=None, help="Model revision (branch, tag, or commit SHA).") + parser.add_argument("--token", default=None, help="Hugging Face token for gated/private models.") + parser.add_argument("--trust-remote-code", action="store_true", help="Allow custom code from the Hub.") + parser.add_argument( + "--lora", + default=None, + help=( + "JSON object describing a LoRA adapter to attach after the pipeline loads. " + 'Shape: {"lora_id": "", "lora_scale": }. ' + 'Example: \'{"lora_id": "alvdansen/littletinies", "lora_scale": 0.8}\'.' 
+ ), + ) + + +def _add_optimization_arguments(parser: ArgumentParser) -> None: + parser.add_argument( + "--cpu-offload", + choices=CPU_OFFLOAD_CHOICES, + default=None, + help=( + "Offload pipeline components to CPU during inference. " + "'model' uses enable_model_cpu_offload, " + "'group' uses pipeline.enable_group_offload(leaf_level, use_stream=True)." + ), + ) + parser.add_argument( + "--attention-backend", + choices=ATTENTION_BACKEND_CHOICES, + default="default", + help=( + "Override the attention backend on the transformer/UNet. " + "Only Hub-hosted kernels are exposed — they auto-download on first use." + ), + ) + parser.add_argument("--vae-tiling", action="store_true", help="Enable VAE tiling (lower peak VRAM).") + parser.add_argument("--vae-slicing", action="store_true", help="Enable VAE slicing (lower peak VRAM).") + parser.add_argument( + "--context-parallel", + action="store_true", + help=( + "Enable Ulysses-style context parallelism (ulysses_anything mode). " + "Requires a DiT-based pipeline and launching the CLI under torchrun with ≥2 GPUs." + ), + ) + parser.add_argument( + "--compile", + nargs="?", + const="{}", + default=None, + metavar="JSON", + help=( + "torch.compile every denoiser submodule on the pipeline. Accepts an optional JSON " + 'object of kwargs forwarded to ``torch.compile``, e.g. \'{"mode": "max-autotune", ' + '"fullgraph": true}\'. Bare ``--compile`` uses torch defaults. Adds a one-time compilation ' + "cost on the first step but speeds up every subsequent step — worth it for multi-step " + "generation (50+ steps)." + ), + ) + + +def _add_output_arguments(parser: ArgumentParser) -> None: + parser.add_argument( + "--output", + "-o", + default=None, + help="Output file or directory. Defaults to ./outputs/<task>-<index>.<ext>.", + ) + parser.add_argument( + "--push-to", + default=None, + help=( + "Upload the generated files to this HF bucket id after saving (created if missing). " + "When --remote is set, defaults to <username>/jobs-artifacts."
+ ), + ) + + +def _add_remote_arguments(parser: ArgumentParser) -> None: + parser.add_argument( + "--remote", + action="store_true", + help="Submit this command to Hugging Face Jobs instead of running locally.", + ) + parser.add_argument( + "--flavor", + default="a10g-small", + help="HF Jobs hardware flavor for --remote (e.g. a10g-small, a100-large, cpu-basic).", + ) + parser.add_argument( + "--timeout", + default="10m", + help="HF Jobs timeout for --remote (e.g. 30m, 2h). Defaults to 10m.", + ) + parser.add_argument( + "--dependencies", + action="append", + default=None, + help="Extra pip dependencies for the --remote job. Repeat to add multiple.", + ) + parser.add_argument( + "--namespace", + default=None, + help="HF namespace to run the --remote job under (defaults to the current user).", + ) + parser.add_argument( + "--no-wait", + action="store_true", + help="Don't wait for the --remote job to finish — submit and print the job id.", + ) + parser.add_argument( + "--poll-interval", + type=float, + default=5.0, + help="Seconds between job-status polls when waiting for --remote completion.", + ) + + +# --------------------------------------------------------------------------- +# Pipeline loading + optimization +# --------------------------------------------------------------------------- + + +def _resolve_dtype(name: str | None): + if name in (None, "auto"): + return "auto" + import torch + + mapping = { + "fp32": torch.float32, + "float32": torch.float32, + "fp16": torch.float16, + "float16": torch.float16, + "bf16": torch.bfloat16, + "bfloat16": torch.bfloat16, + } + if name not in mapping: + raise ValueError(f"Unknown dtype: {name}") + return mapping[name] + + +def _resolve_device(name: str | None) -> str: + if name: + return name + + from diffusers.utils.torch_utils import torch_device + + # Under torchrun, LOCAL_RANK identifies this process's assigned GPU. 
Without this + # pin every rank falls back to cuda:0 and OOMs as the pipeline replicates onto a + # single device. Only applies to cuda — torch_device already handles npu/xpu/mps/etc. + if torch_device == "cuda": + local_rank = os.environ.get("LOCAL_RANK") + if local_rank is not None: + import torch + + torch.cuda.set_device(int(local_rank)) + return f"cuda:{local_rank}" + return torch_device + + +def _map_to_device(pipeline: Any, args: Namespace, device: str) -> Any: + """Move the pipeline to ``device``, or hand off to the chosen CPU-offload helper.""" + if args.cpu_offload is None: + return pipeline.to(device) + if args.cpu_offload == "model": + pipeline.enable_model_cpu_offload(device=device) + elif args.cpu_offload == "group": + import torch + + pipeline.enable_group_offload( + onload_device=torch.device(device), + offload_type="leaf_level", + use_stream=True, + ) + return pipeline + + +def _denoiser(pipeline: Any) -> Any | None: + """Return the pipeline's denoiser submodule (transformer or unet) or None.""" + for attr in ("transformer", "unet"): + module = getattr(pipeline, attr, None) + if module is not None: + return module + return None + + +def _set_attention_backend(pipeline: Any, backend: str) -> None: + module = _denoiser(pipeline) + if module is None or not hasattr(module, "set_attention_backend"): + return + try: + module.set_attention_backend(backend) + except (ValueError, ImportError, RuntimeError) as e: + raise SystemExit( + f"Failed to set attention backend {backend!r}: {type(e).__name__}: {e}. " + f"Allowed backends: {', '.join(ATTENTION_BACKEND_CHOICES)}." 
+ ) from e + + +def _enable_context_parallel(pipeline: Any) -> None: + import torch + + if not torch.distributed.is_available(): + raise SystemExit("--context-parallel requires a torch build with distributed support.") + + if not torch.distributed.is_initialized(): + # Hybrid backend: ulysses_anything's per-rank size coordination wants Gloo on CPU + # (avoids H2D/D2H for a tiny int tensor); the main attention all-to-all stays on NCCL. + torch.distributed.init_process_group(backend="cpu:gloo,cuda:nccl") + + transformer = getattr(pipeline, "transformer", None) + if transformer is None or not hasattr(transformer, "enable_parallelism"): + raise SystemExit( + "--context-parallel requires a DiT-based pipeline. " + f"{type(pipeline).__name__} does not expose a `transformer` with `enable_parallelism`." + ) + + from diffusers import ContextParallelConfig + + transformer.enable_parallelism( + config=ContextParallelConfig( + ulysses_degree=torch.distributed.get_world_size(), + ring_degree=1, + ulysses_anything=True, + ) + ) + + +def _apply_optimizations(pipeline: Any, args: Namespace) -> None: + """Apply VAE tiling/slicing, attention backend, context-parallel, and torch.compile toggles.""" + vae = getattr(pipeline, "vae", None) + if args.vae_tiling and vae is not None and hasattr(vae, "enable_tiling"): + vae.enable_tiling() + if args.vae_slicing and vae is not None and hasattr(vae, "enable_slicing"): + vae.enable_slicing() + if args.attention_backend != "default": + _set_attention_backend(pipeline, args.attention_backend) + if args.context_parallel: + _enable_context_parallel(pipeline) + if args.compile is not None: + _compile_denoiser(pipeline, args.compile) + + +def _compile_denoiser(pipeline: Any, compile_spec: str) -> None: + """Compile every ``transformer*`` and ``unet*`` submodule on the pipeline. + + ``compile_spec`` is the raw JSON string from ``--compile`` (``"{}"`` for bare flag). Decoded into kwargs and + forwarded verbatim to the compile call. 
+ + Prefers regional compilation via ``module.compile_repeated_blocks(**kwargs)`` — only compiles the repeated inner + blocks (the bulk of the compute), much faster first-step latency than compiling the whole module. Falls back to + full ``torch.compile`` if the model doesn't expose ``_repeated_blocks``. + """ + import torch + + try: + compile_kwargs = json.loads(compile_spec) + except json.JSONDecodeError as e: + raise SystemExit(f"--compile must be valid JSON: {e}") from e + if not isinstance(compile_kwargs, dict): + raise SystemExit("--compile must decode to a JSON object.") + + for attr in dir(pipeline): + if not (attr.startswith("transformer") or attr.startswith("unet")): + continue + module = getattr(pipeline, attr, None) + if not isinstance(module, torch.nn.Module): + continue + + if getattr(module, "_repeated_blocks", None): + # Regional compile — only the repeated blocks. Mutates `module` in place. + module.compile_repeated_blocks(**compile_kwargs) + else: + # No regional metadata declared; fall back to compiling the whole module. 
+ setattr(pipeline, attr, torch.compile(module, **compile_kwargs)) + + +def _from_pretrained_kwargs(args: Namespace) -> dict[str, Any]: + dtype = _resolve_dtype(args.dtype) + kwargs: dict[str, Any] = {"trust_remote_code": args.trust_remote_code, "disable_mmap": True} + if dtype != "auto": + kwargs["torch_dtype"] = dtype + if args.variant: + kwargs["variant"] = args.variant + if args.revision: + kwargs["revision"] = args.revision + if args.token: + kwargs["token"] = args.token + return kwargs + + +def _load_pipeline(args: Namespace, modular: bool) -> Any: + import diffusers + + pipeline_cls = diffusers.ModularPipeline if modular else diffusers.DiffusionPipeline + pipeline = pipeline_cls.from_pretrained(args.model, **_from_pretrained_kwargs(args)) + if not hasattr(pipeline, "to"): + return pipeline + pipeline = _map_to_device(pipeline, args, _resolve_device(args.device)) + _apply_optimizations(pipeline, args) + _load_lora(pipeline, args) + return pipeline + + +def _load_lora(pipeline: Any, args: Namespace) -> None: + """Attach a LoRA adapter from a JSON spec like ``{"lora_id": "...", "lora_scale": 0.8}``.""" + if not args.lora: + return + try: + spec = json.loads(args.lora) + except json.JSONDecodeError as e: + raise SystemExit(f"--lora must be valid JSON: {e}") from e + if not isinstance(spec, dict): + raise SystemExit("--lora must decode to a JSON object.") + lora_id = spec.get("lora_id") + if not lora_id: + raise SystemExit("--lora must include a 'lora_id' field.") + if not hasattr(pipeline, "load_lora_weights"): + raise SystemExit(f"{type(pipeline).__name__} does not support LoRA loading.") + + pipeline.load_lora_weights(lora_id, adapter_name="default") + scale = spec.get("lora_scale") + if scale is not None and hasattr(pipeline, "set_adapters"): + pipeline.set_adapters(["default"], adapter_weights=[float(scale)]) + + +# --------------------------------------------------------------------------- +# Modular pipeline detection + introspection +# 
--------------------------------------------------------------------------- + + +def _is_modular_repo(args: Namespace) -> bool: + """Detect by trying ``DiffusionPipeline.config_name`` first; modular iff that's absent.""" + from diffusers import DiffusionPipeline + + return try_fetch_config(args, DiffusionPipeline.config_name) is None + + +# --------------------------------------------------------------------------- +# Pipeline call helpers +# --------------------------------------------------------------------------- + + +def _parse_pipeline_kwargs(raw: str | None) -> dict[str, Any]: + if not raw: + return {} + try: + parsed = json.loads(raw) + except json.JSONDecodeError as e: + raise SystemExit(f"--pipeline-kwargs must be valid JSON: {e}") from e + if not isinstance(parsed, dict): + raise SystemExit("--pipeline-kwargs must decode to a JSON object.") + return parsed + + +def _resolve_image_inputs(call_kwargs: dict[str, Any]) -> None: + """Replace string paths/URLs at known image-input keys with PIL images.""" + for key in _IMAGE_INPUT_KEYS: + value = call_kwargs.get(key) + if isinstance(value, str): + call_kwargs[key] = load_image(value) + + +def _get_generator(seed: int | None, device: str): + if seed is None: + return None + import torch + + generator_device = "cpu" if device == "mps" else device + return torch.Generator(device=generator_device).manual_seed(seed) + + +def _result_to_savable(result: Any) -> Any: + """Unwrap a pipeline-output object into the raw payload the saver can sniff.""" + if hasattr(result, "images"): + return result.images + if hasattr(result, "frames"): + frames = result.frames + return frames[0] if isinstance(frames, (list, tuple)) and frames else frames + if hasattr(result, "audios"): + return result.audios + return result + + +# --------------------------------------------------------------------------- +# Output saving (auto-sniff by type) +# --------------------------------------------------------------------------- + + +def 
_default_output_paths(task: str, num: int, explicit: str | None, ext: str) -> list[Path]: + if explicit is None: + base = Path(DEFAULT_OUTPUT_DIR) + base.mkdir(parents=True, exist_ok=True) + return [base / f"{task}-{i}.{ext}" for i in range(num)] + + p = Path(explicit) + if explicit.endswith(os.sep) or p.is_dir(): + p.mkdir(parents=True, exist_ok=True) + return [p / f"{task}-{i}.{ext}" for i in range(num)] + + p.parent.mkdir(parents=True, exist_ok=True) + if num == 1: + return [p] + stem, suffix = p.stem, p.suffix or f".{ext}" + return [p.with_name(f"{stem}-{i}{suffix}") for i in range(num)] + + +def _as_pil_list(value: Any): + try: + from PIL.Image import Image as PILImage + except ImportError: + return None + if isinstance(value, PILImage): + return [value] + if isinstance(value, (list, tuple)) and value and all(isinstance(v, PILImage) for v in value): + return list(value) + return None + + +def _as_frame_sequence(value: Any): + try: + from PIL.Image import Image as PILImage + except ImportError: + PILImage = None # type: ignore[assignment] + + if isinstance(value, (list, tuple)) and len(value) >= 2: + first = value[0] + if PILImage is not None and isinstance(first, PILImage): + return list(value) + try: + import numpy as np + + if isinstance(first, np.ndarray): + return list(value) + except ImportError: + pass + return None + + +def _as_audio_arrays(value: Any): + try: + import numpy as np + except ImportError: + return None + if isinstance(value, np.ndarray) and value.ndim <= 2: + return [value] + if isinstance(value, (list, tuple)) and value and all(isinstance(v, np.ndarray) for v in value): + return list(value) + return None + + +def _save_audio_arrays(audios, sampling_rate: int, args: Namespace, task: str) -> list[str]: + """Write each numpy audio array to a 16-bit PCM WAV at ``sampling_rate`` Hz. + + Uses the stdlib ``wave`` module so no scipy dependency is required. 
+ """ + import wave + + import numpy as np + + paths = _default_output_paths(task, len(audios), args.output, ext="wav") + saved: list[str] = [] + for audio, path in zip(audios, paths): + data = np.asarray(audio) + if data.dtype.kind == "f": + data = (np.clip(data, -1.0, 1.0) * 32767).astype(np.int16) + else: + data = data.astype(np.int16) + if data.ndim == 1: + n_channels = 1 + else: + # Heuristic: shorter axis is channels (interleaved layout for `wave` is + # samples × channels, so transpose if needed). + if data.shape[0] < data.shape[-1]: + data = data.T + n_channels = data.shape[1] + with wave.open(str(path), "wb") as w: + w.setnchannels(n_channels) + w.setsampwidth(2) # 16-bit PCM + w.setframerate(sampling_rate) + w.writeframes(data.tobytes()) + saved.append(str(path)) + return saved + + +def _save_output(value: Any, args: Namespace, task: str) -> list[str]: + """Save ``value`` by sniffing its runtime type.""" + pil_images = _as_pil_list(value) + if pil_images is not None: + paths = _default_output_paths(task, len(pil_images), args.output, ext="png") + for img, path in zip(pil_images, paths): + img.save(path) + return [str(p) for p in paths] + + frames = _as_frame_sequence(value) + if frames is not None: + from diffusers.utils import export_to_video + + path = _default_output_paths(task, 1, args.output, ext="mp4")[0] + export_to_video(frames, str(path), fps=args.fps) + return [str(path)] + + audios = _as_audio_arrays(value) + if audios is not None: + return _save_audio_arrays(audios, args.sampling_rate or 16000, args, task) + + path = _default_output_paths(task, 1, args.output, ext="json")[0] + Path(path).write_text(json.dumps(value, default=str, indent=2)) + return [str(path)] + + +# --------------------------------------------------------------------------- +# Hub bucket upload (--push-to) +# --------------------------------------------------------------------------- + + +def _push_outputs(args: Namespace, saved_paths: list[str], task: str) -> dict[str, Any] 
| None: + """Upload ``saved_paths`` to the ``--push-to`` bucket. Returns a summary or None.""" + if not args.push_to: + return None + + from huggingface_hub import HfApi + + api = HfApi(token=args.token) + api.create_bucket(args.push_to, exist_ok=True) + + prefix = os.environ.get(RUN_ID_ENV) or task + add = [(local, f"{prefix}/{Path(local).name}") for local in saved_paths] + api.batch_bucket_files(args.push_to, add=add) + + uploaded = [f"hf://buckets/{args.push_to}/{dest}" for _, dest in add] + return {"bucket_id": args.push_to, "uploaded": uploaded} + + +# --------------------------------------------------------------------------- +# Remote submission (HF Jobs) +# --------------------------------------------------------------------------- + + +def _build_task_kwargs(args: Namespace) -> dict[str, Any]: + """Pick out the kwargs the container should invoke the task with.""" + out: dict[str, Any] = {} + for key, value in vars(args).items(): + if key in HF_JOBS_KEYS or value is None or value is False: + continue + out[key] = value + return out + + +def _kwargs_to_argv(task: str, task_kwargs: dict[str, Any]) -> list[str]: + """Render ``task_kwargs`` as the argv list the container's argparse will see.""" + argv: list[str] = [task] + for key, value in task_kwargs.items(): + flag = "--" + key.replace("_", "-") + if value is True: + argv.append(flag) + elif isinstance(value, list): + for item in value: + argv.extend([flag, str(item)]) + else: + argv.extend([flag, str(value)]) + return argv + + +def _maybe_submit_remote(args: Namespace, task: str) -> bool: + """If ``--remote`` was set, submit this invocation to HF Jobs and return True.""" + if not args.remote: + return False + + print( + f"[diffusers-cli] preparing remote {task!r} job on flavor={args.flavor!r}...", + file=sys.stderr, + flush=True, + ) + + import shlex + import uuid + + from huggingface_hub import HfApi, get_token, run_job + + hf_token = args.token or get_token() + api = HfApi(token=hf_token) + + if not 
args.push_to: + args.push_to = f"{api.whoami()['name']}/jobs-artifacts" + + run_id = uuid.uuid4().hex[:12] + + task_kwargs = _build_task_kwargs(args) + dependencies = list(_DEFAULT_REMOTE_DEPS) + if args.dependencies: + dependencies.extend(args.dependencies) + + secrets = {"HF_TOKEN": hf_token} if hf_token else None + env = { + RUN_ID_ENV: run_id, + "HF_ENABLE_PARALLEL_LOADING": "1", # thread-pool the safetensors load step + } + + if Path(args.model).exists(): + print( + f"[diffusers-cli] WARNING: --model {args.model!r} is a local path; the container can't see it. " + "Pass a Hub repo id so the job can download it.", + file=sys.stderr, + flush=True, + ) + + # Build the in-container shell command: install the small Python deps into the + # image's system Python (where torch + CUDA already live) via ``uv pip install + # --system``, then exec the CLI with the same argv. --break-system-packages + # bypasses PEP 668; safe here because the container is ephemeral. + # For --context-parallel, wrap with torchrun so torch.distributed initializes + # across every visible GPU before our generate command runs. 
+ install_cmd = shlex.join(["uv", "pip", "install", "--system", "--break-system-packages", *dependencies]) + cli_argv = _kwargs_to_argv(task, task_kwargs) + if args.context_parallel: + cli_argv = ["torchrun", "--nproc-per-node=gpu", "-m", "diffusers.commands.diffusers_cli", *cli_argv] + else: + cli_argv = [_CONTAINER_CLI_BINARY, *cli_argv] + cli_cmd = shlex.join(cli_argv) + container_cmd = ["sh", "-c", f"{install_cmd} && {cli_cmd}"] + + job = run_job( + image=_DEFAULT_REMOTE_IMAGE, + command=container_cmd, + flavor=args.flavor, + timeout=args.timeout, + namespace=args.namespace, + secrets=secrets, + env=env, + token=hf_token, + ) + + payload: dict[str, Any] = { + "task": "remote-submit", + "job_id": getattr(job, "id", None), + "job_status": str(getattr(job, "status", "")), + "flavor": args.flavor, + "push_to": args.push_to, + "run_id": run_id, + } + + if args.no_wait: + _format_result(payload) + return True + + print( + f"[diffusers-cli] submitted job {job.id} (run_id={run_id}); " + f"watch at {getattr(job, 'url', 'https://huggingface.co/jobs')}", + file=sys.stderr, + flush=True, + ) + final_status = _wait_for_job(api, job.id, args.namespace, args.poll_interval) + payload["job_status"] = final_status + payload["timing"] = _job_timing(api, job.id, args.namespace) + payload["outputs"] = _download_job_artifacts(api, args.push_to, run_id, args.output) + _format_result(payload) + return True + + +def _job_timing(api: Any, job_id: str, namespace: str | None) -> dict[str, float | None]: + """Return queue/run/total wallclock seconds for ``job_id`` from inspect_job timestamps. + + inspect_job sometimes returns finished_at=None for a few seconds after the container exits while HF Jobs propagates + the terminal state; retry briefly so we don't miss run/total. 
+ """ + import time + + info = api.inspect_job(job_id=job_id, namespace=namespace) + for _ in range(5): + if info.finished_at is not None: + break + time.sleep(1.0) + info = api.inspect_job(job_id=job_id, namespace=namespace) + + def _delta(start, end) -> float | None: + return (end - start).total_seconds() if (start is not None and end is not None) else None + + timing = { + "queued_seconds": _delta(info.created_at, info.started_at), + "run_seconds": _delta(info.started_at, info.finished_at), + "total_seconds": _delta(info.created_at, info.finished_at), + } + parts = [f"{k.replace('_seconds', '')}={v:.1f}s" for k, v in timing.items() if v is not None] + if parts: + print(f"[diffusers-cli] timing: {' '.join(parts)}", file=sys.stderr, flush=True) + return timing + + +def _wait_for_job(api: Any, job_id: str, namespace: str | None, poll_interval: float) -> str: + """Stream container logs to stderr until the job terminates; return the final stage.""" + fetch = getattr(api, "fetch_job_logs", None) + if fetch is not None: + try: + for line in fetch(job_id=job_id, namespace=namespace, follow=True): + print(line, file=sys.stderr, flush=True) + except TypeError: + return _poll_for_job(api, job_id, namespace, poll_interval) + info = api.inspect_job(job_id=job_id, namespace=namespace) + return str(info.status.stage) if info.status else "UNKNOWN" + return _poll_for_job(api, job_id, namespace, poll_interval) + + +def _poll_for_job(api: Any, job_id: str, namespace: str | None, poll_interval: float) -> str: + """Heartbeat-style fallback when ``fetch_job_logs`` isn't available.""" + import time + + terminal = {"COMPLETED", "CANCELED", "ERROR", "DELETED"} + last_stage: str | None = None + while True: + info = api.inspect_job(job_id=job_id, namespace=namespace) + stage = str(info.status.stage) if info.status else "UNKNOWN" + if stage != last_stage: + if last_stage is not None: + print("", file=sys.stderr, flush=True) + print(f"[diffusers-cli] job {job_id}: {stage}", file=sys.stderr, 
flush=True) + last_stage = stage + else: + print(".", end="", file=sys.stderr, flush=True) + if stage in terminal: + print("", file=sys.stderr, flush=True) + return stage + time.sleep(poll_interval) + + +def _download_job_artifacts(api: Any, bucket_id: str, run_id: str, output: str | None) -> list[str]: + """Download every file under ``<run_id>/`` from ``bucket_id`` into a local directory.""" + from huggingface_hub import BucketFile + + local_dir = Path(output) if output else Path(DEFAULT_OUTPUT_DIR) + local_dir.mkdir(parents=True, exist_ok=True) + + pairs: list[tuple[Any, Path]] = [] + for entry in api.list_bucket_tree(bucket_id, prefix=f"{run_id}/", recursive=True): + if not isinstance(entry, BucketFile): + continue + pairs.append((entry, local_dir / Path(entry.path).name)) + + if not pairs: + return [] + api.download_bucket_files(bucket_id, files=pairs) + return [str(local) for _, local in pairs] + + +# --------------------------------------------------------------------------- +# Result formatting +# --------------------------------------------------------------------------- + + +def _format_result(payload: dict[str, Any]) -> None: + """Route the result payload through the output sink.""" + out.result(payload.get("task", "done"), **payload) + + +# --------------------------------------------------------------------------- +# Subcommand +# --------------------------------------------------------------------------- + + +class GenerateCommand(BaseDiffusersCLICommand): + task = "generate" + + @staticmethod + def register_subcommand(subparsers: _SubParsersAction) -> None: + from argparse import RawDescriptionHelpFormatter + + epilog = ( + "Examples\n" + " $ diffusers-cli generate -m black-forest-labs/FLUX.1-dev --dtype bf16 \\\n" + ' --pipeline-kwargs \'{"prompt": "a cat on the moon"}\'\n' + " $ diffusers-cli generate -m black-forest-labs/FLUX.1-dev --dtype bf16 \\\n" + ' --pipeline-kwargs \'{"prompt": "make the fur grey", "image": "https://example.com/cat.png"}\'\n' + " $ 
diffusers-cli generate -m black-forest-labs/FLUX.1-dev --dtype bf16 \\\n" + ' --pipeline-kwargs \'{"prompt": "a tiny cat"}\' \\\n' + ' --lora \'{"lora_id": "alvdansen/littletinies", "lora_scale": 0.8}\'\n' + " $ diffusers-cli generate -m black-forest-labs/FLUX.1-dev --dtype bf16 \\\n" + ' --pipeline-kwargs \'{"prompt": "a cat"}\' --remote --flavor a100-large\n' + " $ diffusers-cli generate -m black-forest-labs/FLUX.1-dev --dtype bf16 --context-parallel \\\n" + ' --pipeline-kwargs \'{"prompt": "a cat"}\' --remote --flavor 4xa100-large\n' + "\n" + "Learn more\n" + " Use `diffusers-cli --help` for more information about a command.\n" + " Read the documentation at https://huggingface.co/docs/diffusers\n" + ) + + parser: ArgumentParser = subparsers.add_parser( + "generate", + help="Run any diffusers pipeline locally or remotely with HF Jobs.", + usage="\n diffusers-cli generate [options]", + epilog=epilog, + formatter_class=RawDescriptionHelpFormatter, + ) + parser._optionals.title = "Options" + _add_loading_arguments(parser) + _add_optimization_arguments(parser) + parser.add_argument( + "--pipeline-kwargs", + default=None, + help=( + "JSON object of kwargs passed to the pipeline call. String values at known " + f"image-input keys ({', '.join(_IMAGE_INPUT_KEYS)}) are auto-loaded as PIL images." 
+ ), + ) + parser.add_argument( + "--output-key", + default=None, + help="For modular pipelines: name of the intermediate to extract (passed as `output=` to the call).", + ) + parser.add_argument("--seed", type=int, default=None, help="Random seed for reproducibility.") + parser.add_argument( + "--fps", + type=int, + default=8, + help="FPS used when the output happens to be a frame sequence.", + ) + parser.add_argument( + "--sampling-rate", + type=int, + default=None, + help="Sample rate used when the output happens to be an audio array.", + ) + _add_remote_arguments(parser) + _add_output_arguments(parser) + parser.set_defaults(func=GenerateCommand) + + def __init__(self, args: Namespace): + self.args = args + + def run(self) -> None: + is_modular = _is_modular_repo(self.args) + + if _maybe_submit_remote(self.args, self.task): + return + + pipeline = _load_pipeline(self.args, modular=is_modular) + + call_kwargs = _parse_pipeline_kwargs(self.args.pipeline_kwargs) + _resolve_image_inputs(call_kwargs) + + if self.args.output_key is not None: + call_kwargs["output"] = self.args.output_key + + device = pipeline.device.type if hasattr(pipeline, "device") else "cpu" + generator = _get_generator(self.args.seed, device) + if generator is not None: + call_kwargs["generator"] = generator + + try: + result = pipeline(**call_kwargs) + + # Under torchrun, ranks > 0 produce identical output to rank 0 (CP shards the + # transformer compute but ranks reduce to the same final tensors). Save/push/print + # from rank 0 only to avoid clobbering bucket files 4x and printing 4x. 
+ if os.environ.get("RANK", "0") == "0": + savable = result if is_modular else _result_to_savable(result) + saved = _save_output(savable, self.args, self.task) + pushed = _push_outputs(self.args, saved, self.task) + + _format_result( + { + "task": self.task, + "model": self.args.model, + "device": device, + "pipeline_class": type(pipeline).__name__, + "modular": is_modular, + "outputs": saved, + "pushed": pushed, + "seed": self.args.seed, + "output_key": self.args.output_key, + }, + ) + finally: + import torch + + if torch.distributed.is_available() and torch.distributed.is_initialized(): + torch.distributed.destroy_process_group()
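
The rank-0 gating used in `run` can be looked at in isolation. What follows is a minimal sketch, not code from this PR: `is_primary_rank` and `save_once` are hypothetical helper names, and the only assumption is that torchrun exports `RANK` to each worker process while a plain single-process run leaves it unset.

```python
import os
from typing import Callable, Mapping, Optional


def is_primary_rank(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True for rank 0, or when RANK is unset (single-process run)."""
    # Hypothetical helper: mirrors the `os.environ.get("RANK", "0") == "0"` check.
    env = os.environ if env is None else env
    return env.get("RANK", "0") == "0"


def save_once(save: Callable[[], list], env: Optional[Mapping[str, str]] = None) -> list:
    """Call ``save`` only on the primary rank; other ranks report no saved paths."""
    if is_primary_rank(env):
        return save()
    return []


if __name__ == "__main__":
    def fake_save() -> list:
        # Stand-in for the real save step; the path is illustrative only.
        return ["outputs/image_0.png"]

    print(save_once(fake_save, {"RANK": "0"}))  # primary rank saves
    print(save_once(fake_save, {"RANK": "1"}))  # non-primary rank skips
    print(save_once(fake_save, {}))             # no RANK: treated as rank 0
```

Passing the environment as an argument keeps the check testable without mutating `os.environ`; the real command reads the process environment directly.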