diff --git a/.claude/settings.json b/.claude/settings.json new file mode 100644 index 0000000..9030888 --- /dev/null +++ b/.claude/settings.json @@ -0,0 +1,5 @@ +{ + "enabledPlugins": { + "frontend-design@claude-plugins-official": true + } +} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 674f4d1..48e88e4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: steps: - uses: actions/checkout@v6 - - uses: dorny/paths-filter@v3 + - uses: dorny/paths-filter@v4 id: filter with: filters: | @@ -72,14 +72,14 @@ jobs: otp-version: ${{ env.OTP_VERSION }} - name: Restore dependencies cache - uses: actions/cache@v4 + uses: actions/cache@v6 with: path: durable/deps key: ${{ runner.os }}-durable-mix-${{ hashFiles('durable/mix.lock') }} restore-keys: ${{ runner.os }}-durable-mix- - name: Restore build cache - uses: actions/cache@v4 + uses: actions/cache@v6 with: path: durable/_build key: ${{ runner.os }}-durable-build-${{ env.OTP_VERSION }}-${{ env.ELIXIR_VERSION }}-${{ hashFiles('durable/mix.lock') }} @@ -120,26 +120,26 @@ jobs: otp-version: ${{ env.OTP_VERSION }} - name: Set up pnpm - uses: pnpm/action-setup@v4 + uses: pnpm/action-setup@v6 with: version: 9 - name: Set up Node - uses: actions/setup-node@v4 + uses: actions/setup-node@v6 with: node-version: "20" cache: pnpm cache-dependency-path: durable_dashboard/assets/pnpm-lock.yaml - name: Restore dependencies cache - uses: actions/cache@v4 + uses: actions/cache@v6 with: path: durable_dashboard/deps key: ${{ runner.os }}-dashboard-mix-${{ hashFiles('durable_dashboard/mix.lock', 'durable/mix.lock') }} restore-keys: ${{ runner.os }}-dashboard-mix- - name: Restore build cache - uses: actions/cache@v4 + uses: actions/cache@v6 with: path: durable_dashboard/_build key: ${{ runner.os }}-dashboard-build-${{ env.OTP_VERSION }}-${{ env.ELIXIR_VERSION }}-${{ hashFiles('durable_dashboard/mix.lock', 'durable/mix.lock') }} diff --git a/durable/README.md b/durable/README.md index 
76ff249..9649a1f 100644 --- a/durable/README.md +++ b/durable/README.md @@ -544,13 +544,14 @@ The macro adds the dashboard's pipelines, asset routes, and live routes in-place — no `forward`, no extra endpoint. You get: - Overview with live status counts and recent executions -- Workflow list with filters, search, and pagination -- Workflow detail with summary, ReactFlow graph, topology, logs, I/O, - and child execution history +- Workflow catalog + an Executions list with a faceted filter bar (workflow, + status, time range, execution id) driven by shareable URL params +- Execution detail with Summary, ReactFlow graph, Timeline, Logs, I/O, and + History trace (plus a Family tab for parent/child runs) - Pending inputs queue - Schedules list with toggle / trigger actions - Settings view -- ⌘K command palette +- ⌘K command palette — jump to any page, workflow, or recent run See [`durable_dashboard/README.md`](durable_dashboard/) for the full option list and design notes. diff --git a/durable/lib/durable/config.ex b/durable/lib/durable/config.ex index fbf7f6d..0209988 100644 --- a/durable/lib/durable/config.ex +++ b/durable/lib/durable/config.ex @@ -45,6 +45,8 @@ defmodule Durable.Config do heartbeat_interval: pos_integer(), scheduled_modules: [module()], scheduler_interval: pos_integer(), + sleep_waker_interval: pos_integer(), + sleep_waker_batch_size: pos_integer(), log_level: false | :debug | :info | :warning | :error, pubsub: atom() | nil, owns_pubsub?: boolean() @@ -60,6 +62,8 @@ defmodule Durable.Config do :heartbeat_interval, :scheduled_modules, :scheduler_interval, + :sleep_waker_interval, + :sleep_waker_batch_size, :log_level, :pubsub, owns_pubsub?: false @@ -111,6 +115,18 @@ defmodule Durable.Config do default: 60_000, doc: "Milliseconds between scheduler polls for due schedules" ], + sleep_waker_interval: [ + type: :pos_integer, + default: 1_000, + doc: + "Milliseconds between sleep-waker sweeps that revive workflows " <> + "whose `sleep/1` or 
`schedule_at/1` wait has elapsed" + ], + sleep_waker_batch_size: [ + type: :pos_integer, + default: 100, + doc: "Maximum number of sleeping workflows to wake in a single sweep" + ], log_level: [ type: {:in, [false, :debug, :info, :warning, :error]}, default: false, diff --git a/durable/lib/durable/executor.ex b/durable/lib/durable/executor.ex index 29f0ee7..6b61c34 100644 --- a/durable/lib/durable/executor.ex +++ b/durable/lib/durable/executor.ex @@ -172,6 +172,7 @@ defmodule Durable.Executor do |> Ecto.Changeset.change( context: new_context, status: :pending, + scheduled_at: nil, locked_by: nil, locked_at: nil ) @@ -458,7 +459,7 @@ defmodule Durable.Executor do {:ok, execution} = execution - |> Ecto.Changeset.change(status: :waiting) + |> Ecto.Changeset.change(status: :waiting, scheduled_at: nil) |> Repo.update(config) {:waiting, execution} @@ -768,7 +769,8 @@ defmodule Durable.Executor do exec |> Ecto.Changeset.change( context: Map.merge(exec.context || %{}, parallel_context), - status: :waiting + status: :waiting, + scheduled_at: nil ) |> Repo.update(config) @@ -1150,12 +1152,36 @@ defmodule Durable.Executor do process_ctx |> Map.merge(data) |> sanitize_for_json() + |> drop_consumed_sleep_marker(execution.current_step) execution |> Ecto.Changeset.change(context: merged) |> Repo.update(config) end + # The SleepWaker stamps `__sleep_satisfied__ => step_name` into context so + # the woken step's `sleep/1`/`schedule_at/1` returns instead of re-throwing. + # Once that step completes we drop the marker — otherwise it lives in context + # forever and any later re-entry of the SAME step name (a loop/`each` body, or + # a `decision` goto that revisits it) would see the stale marker and silently + # skip its sleep. The marker is therefore a strict one-shot keyed to the step + # that consumed it. (Handles both atom and string key shapes since context can + # arrive atomized from the process dict or string-keyed from a step return.) 
+ defp drop_consumed_sleep_marker(context, current_step) when is_binary(current_step) do + context + |> drop_marker_if_step(:__sleep_satisfied__, current_step) + |> drop_marker_if_step("__sleep_satisfied__", current_step) + end + + defp drop_consumed_sleep_marker(context, _current_step), do: context + + defp drop_marker_if_step(context, key, current_step) do + case Map.get(context, key) do + ^current_step -> Map.delete(context, key) + _ -> context + end + end + defp mark_completed(config, execution, final_data) do # Sanitize before persisting — the final step may return data containing # raw tuples (e.g., child executions in a parallel block complete with @@ -1169,7 +1195,7 @@ defmodule Durable.Executor do completed_at: DateTime.utc_now(), current_step: nil }) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) + |> Ecto.Changeset.change(locked_by: nil, locked_at: nil, scheduled_at: nil) |> Repo.update(config) DurablePubSub.broadcast_workflow(config, :workflow_completed, workflow_event(execution)) @@ -1194,7 +1220,7 @@ defmodule Durable.Executor do error: safe_error, completed_at: DateTime.utc_now() }) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) + |> Ecto.Changeset.change(locked_by: nil, locked_at: nil, scheduled_at: nil) |> Repo.update(config) rescue e -> @@ -1210,7 +1236,7 @@ defmodule Durable.Executor do error: fallback, completed_at: DateTime.utc_now() }) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) + |> Ecto.Changeset.change(locked_by: nil, locked_at: nil, scheduled_at: nil) |> Repo.update(config) end @@ -1285,7 +1311,13 @@ defmodule Durable.Executor do where: p.workflow_id == ^execution.parent_workflow_id and p.event_name == ^parallel_event_name and - p.status == :pending + p.status == :pending, + # `limit: 1` keeps `Repo.one` from RAISING if a duplicate parallel + # event ever exists (e.g. a re-run of fan_out before context + # committed). 
One pending event per child is the norm; defending the + # claim path against the pathological case avoids crashing the child's + # whole completion handler. + limit: 1 ) ) @@ -1296,36 +1328,89 @@ defmodule Durable.Executor do end end - # Notify parent via WaitGroup (parallel child completion) + # Notify parent via WaitGroup (parallel child completion). + # + # Three updates run inside one transaction: + # + # 1. Mark the child's PendingEvent as :received. + # 2. Lock the WaitGroup row (FOR UPDATE), merge the event into + # received_events, flip to :completed when the wait condition is + # satisfied. The lock is what closes the lost-update race that + # previously allowed concurrent siblings to overwrite each other. + # 3. If the group just transitioned to :completed, flip the parent + # :waiting -> :pending so the queue poller picks it up. + # + # If any stage fails the whole transaction rolls back, so the system + # never observes "event :received but wait group not updated" or "wait + # group complete but parent still :waiting" — the states zombie + # detection later misreads as a crash. 
defp notify_parallel_parent(config, execution, pending_event, status, data) do payload = Durable.Orchestration.build_result_payload(status, data) + parent_id = execution.parent_workflow_id - # Fulfill the pending event - {:ok, _} = - pending_event - |> PendingEvent.receive_changeset(payload) - |> Repo.update(config) + multi = + Ecto.Multi.new() + |> Ecto.Multi.update(:event, PendingEvent.receive_changeset(pending_event, payload)) + |> Ecto.Multi.run(:wait_group, fn repo, _ -> + WaitGroup.add_event_locked( + repo, + pending_event.wait_group_id, + pending_event.event_name, + payload + ) + end) + |> Ecto.Multi.run(:parent, fn repo, %{wait_group: wg_result} -> + if wg_result.just_completed do + resume_parent_in_multi(repo, parent_id, wg_result.wait_group.received_events) + else + {:ok, %{no_op: true}} + end + end) + + case Repo.transaction(config, multi) do + {:ok, _} -> + :ok - # Update the WaitGroup and resume parent if all children are done - maybe_complete_wait_group(config, pending_event, payload, execution.parent_workflow_id) + {:error, stage, reason, _} -> + Logger.error( + "[Durable] failed to atomically fulfill parallel child event + wait group + resume parent: " <> + "stage=#{stage} reason=#{inspect(reason)} parent=#{parent_id}" + ) - :ok + {:error, reason} + end end - defp maybe_complete_wait_group(_config, %{wait_group_id: nil}, _payload, _parent_id), do: :ok + # Resume a parent (or any waiting workflow) inside an Ecto.Multi.run. + # Mirrors the body of `resume_workflow/3` minus the standalone Repo.update, + # so the resume composes into a transaction with the upstream event / + # wait group updates. 
+ @doc false + def resume_parent_in_multi(repo, workflow_id, additional_context) do + safe_context = sanitize_for_json(additional_context) - defp maybe_complete_wait_group(config, pending_event, payload, parent_id) do - wait_group = Repo.get(config, WaitGroup, pending_event.wait_group_id) + case repo.get(WorkflowExecution, workflow_id) do + nil -> + {:error, :workflow_not_found} - if wait_group && wait_group.status == :pending do - {:ok, updated_group} = - wait_group - |> WaitGroup.add_event_changeset(pending_event.event_name, payload) - |> Repo.update(config) + %WorkflowExecution{status: :waiting} = exec -> + new_context = Map.merge(exec.context || %{}, safe_context) - if updated_group.status == :completed do - resume_workflow(parent_id) - end + exec + |> Ecto.Changeset.change( + context: new_context, + status: :pending, + scheduled_at: nil, + locked_by: nil, + locked_at: nil + ) + |> repo.update() + + %WorkflowExecution{status: status} -> + # Already moved on (cancelled, completed, etc.). Tolerate; the + # upstream event/wait group state is still committed by the + # surrounding Multi. 
+ {:ok, %{status: status, no_op: true}} end end @@ -1390,6 +1475,7 @@ defmodule Durable.Executor do |> Ecto.Changeset.change( context: new_context, status: :pending, + scheduled_at: nil, locked_by: nil, locked_at: nil ) @@ -1536,7 +1622,7 @@ defmodule Durable.Executor do {:ok, _exec} = exec |> WorkflowExecution.compensation_failed_changeset(results, original_error) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) + |> Ecto.Changeset.change(locked_by: nil, locked_at: nil, scheduled_at: nil) |> Repo.update(config) {:error, original_error} @@ -1545,7 +1631,12 @@ defmodule Durable.Executor do {:ok, _exec} = exec |> WorkflowExecution.compensated_changeset(results) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil, error: original_error) + |> Ecto.Changeset.change( + locked_by: nil, + locked_at: nil, + scheduled_at: nil, + error: original_error + ) |> Repo.update(config) {:error, original_error} @@ -1558,9 +1649,19 @@ defmodule Durable.Executor do defp handle_sleep(config, execution, opts) do wake_at = calculate_wake_time(opts) + # Clear the lock alongside the :waiting + scheduled_at update. The + # SleepWaker eventually flips this row back to :pending so the queue + # poller can re-claim it; if the lock weren't cleared here it would + # sit stale on the row until either stale-lock recovery (which only + # acts on :running) or the (already-fixed) zombie sweeper got to it. 
{:ok, execution} = execution - |> Ecto.Changeset.change(status: :waiting, scheduled_at: wake_at) + |> Ecto.Changeset.change( + status: :waiting, + scheduled_at: wake_at, + locked_by: nil, + locked_at: nil + ) |> Repo.update(config) {:waiting, execution} @@ -1587,7 +1688,7 @@ defmodule Durable.Executor do {:ok, execution} = execution - |> Ecto.Changeset.change(status: :waiting) + |> Ecto.Changeset.change(status: :waiting, scheduled_at: nil) |> Repo.update(config) {:waiting, execution} @@ -1618,7 +1719,7 @@ defmodule Durable.Executor do {:ok, execution} = execution - |> Ecto.Changeset.change(status: :waiting) + |> Ecto.Changeset.change(status: :waiting, scheduled_at: nil) |> Repo.update(config) DurablePubSub.broadcast_workflow(config, :workflow_waiting, workflow_event(execution)) @@ -1674,7 +1775,7 @@ defmodule Durable.Executor do {:ok, execution} = execution - |> Ecto.Changeset.change(status: :waiting) + |> Ecto.Changeset.change(status: :waiting, scheduled_at: nil) |> Repo.update(config) {:waiting, execution} diff --git a/durable/lib/durable/executor/backoff.ex b/durable/lib/durable/executor/backoff.ex index fecdb63..cf1dcde 100644 --- a/durable/lib/durable/executor/backoff.ex +++ b/durable/lib/durable/executor/backoff.ex @@ -18,7 +18,14 @@ defmodule Durable.Executor.Backoff do } @default_base 1_000 - @default_max_backoff 3_600_000 + # Backoff is currently an in-process `Process.sleep/1` that blocks the worker + # and holds the job lock for its duration. A multi-minute (let alone 1-hour) + # default would pin a queue concurrency slot and risk crossing the stale-lock + # timeout. Cap the default well under `stale_lock_timeout` (300s). Callers + # that genuinely want longer, durable backoff should set `max_backoff` + # explicitly — and ideally a future re-enqueue-based backoff will lift this + # ceiling without blocking a worker. + @default_max_backoff 30_000 @doc """ Calculates the delay before the next retry attempt. 
@@ -32,7 +39,9 @@ defmodule Durable.Executor.Backoff do ## Options - `:base` - Base delay in milliseconds (default: 1000) - - `:max_backoff` - Maximum delay in milliseconds (default: 3600000 = 1 hour) + - `:max_backoff` - Maximum delay in milliseconds (default: 30000 = 30s). + Kept low because the backoff blocks the worker in-process; raise it only + if your `stale_lock_timeout` comfortably exceeds the chosen ceiling. ## Examples @@ -83,15 +92,21 @@ defmodule Durable.Executor.Backoff do Backoff.calculate_with_jitter(:exponential, 1, %{base: 1000}) """ - @spec calculate_with_jitter(strategy(), pos_integer(), opts()) :: pos_integer() + @spec calculate_with_jitter(strategy(), pos_integer(), opts()) :: non_neg_integer() def calculate_with_jitter(strategy, attempt, opts \\ %{}) do base_delay = calculate(strategy, attempt, opts) - jitter_factor = 0.25 - jitter_range = trunc(base_delay * jitter_factor) - - # Add random jitter between -jitter_range and +jitter_range - jitter = :rand.uniform(jitter_range * 2) - jitter_range - max(0, base_delay + jitter) + jitter_range = trunc(base_delay * 0.25) + + # `:rand.uniform/1` requires a positive argument. For small delays (a base + # under ~4 ms, e.g. fast test retries) `jitter_range` truncates to 0 and + # `:rand.uniform(0)` would crash the worker mid-retry. Skip jitter when + # there's no meaningful range to jitter over. 
+ if jitter_range <= 0 do + base_delay + else + jitter = :rand.uniform(jitter_range * 2) - jitter_range + max(0, base_delay + jitter) + end end @doc """ diff --git a/durable/lib/durable/executor/step_runner.ex b/durable/lib/durable/executor/step_runner.ex index 70f80fe..be4eed1 100644 --- a/durable/lib/durable/executor/step_runner.ex +++ b/durable/lib/durable/executor/step_runner.ex @@ -16,6 +16,8 @@ defmodule Durable.Executor.StepRunner do alias Durable.Repo alias Durable.Storage.Schemas.StepExecution + import Ecto.Query, only: [from: 2] + require Logger @type result :: @@ -44,7 +46,36 @@ defmodule Durable.Executor.StepRunner do @spec execute(Step.t(), map(), String.t(), Config.t()) :: result() def execute(%Step{} = step, data, workflow_id, %Config{} = config) do max_attempts = get_max_attempts(step) - execute_with_retry(step, data, workflow_id, 1, max_attempts, config) + start_attempt = next_attempt(config, workflow_id, step) + execute_with_retry(step, data, workflow_id, start_attempt, max_attempts, config) + end + + # Seed the retry counter from durable state instead of always starting at 1. + # + # The retry budget must survive a worker crash / stale-lock recovery: if a + # step is on attempt 3 of 5 when the node dies, the resumed run must continue + # at attempt 4 — not restart at 1 and re-run the side effect up to 5 more + # times (the previous behaviour silently exceeded max_attempts globally and + # contradicted the "durable/resumable" contract). + # + # We count `:failed` step_executions for (workflow_id, step_name), NOT the max + # attempt number: a `:waiting` row is a *suspension* (sleep/event), not a + # failed attempt, so it must not advance the retry counter. Each genuine retry + # writes exactly one `:failed` row, so `failed_count + 1` is the next attempt. 
+ defp next_attempt(config, workflow_id, %Step{name: name}) do + step_name = Atom.to_string(name) + + failed_count = + Repo.one( + config, + from(s in StepExecution, + where: + s.workflow_id == ^workflow_id and s.step_name == ^step_name and s.status == :failed, + select: count(s.id) + ) + ) + + (failed_count || 0) + 1 end defp execute_with_retry(step, data, workflow_id, attempt, max_attempts, config) do @@ -154,7 +185,11 @@ defmodule Durable.Executor.StepRunner do :call_workflow ] do %{step_exec: step_exec, config: config} = ctx - {:ok, _} = update_step_execution(config, step_exec, :waiting) + # A call_workflow throw carries the spawned child id — record it on this + # step's row so step→child is queryable (the parent context map is the + # fallback, not the source of truth). + child_id = if wait_type == :call_workflow, do: opts[:child_id], else: nil + {:ok, _} = update_step_execution(config, step_exec, :waiting, child_id) {wait_type, opts} end @@ -263,9 +298,12 @@ defmodule Durable.Executor.StepRunner do end end - defp update_step_execution(config, step_exec, :waiting) do + defp update_step_execution(config, step_exec, :waiting, child_id) do + changes = + if child_id, do: [status: :waiting, child_workflow_id: child_id], else: [status: :waiting] + case step_exec - |> Ecto.Changeset.change(status: :waiting) + |> Ecto.Changeset.change(changes) |> Repo.update(config) do {:ok, updated} = ok -> DurablePubSub.broadcast_step(config, :step_waiting, step_event(updated)) diff --git a/durable/lib/durable/migration/migrations/v20260623000000_add_lock_fencing.ex b/durable/lib/durable/migration/migrations/v20260623000000_add_lock_fencing.ex new file mode 100644 index 0000000..6fd6105 --- /dev/null +++ b/durable/lib/durable/migration/migrations/v20260623000000_add_lock_fencing.ex @@ -0,0 +1,34 @@ +defmodule Durable.Migration.Migrations.V20260623000000AddLockFencing do + @moduledoc false + # Adds a per-claim fencing token to workflow_executions. 
+ # + # Without it, a worker whose heartbeat stalls past `stale_lock_timeout` can be + # fenced out by stale-lock recovery: recovery releases the row, another worker + # re-claims it, and BOTH run the same workflow. `locked_by` is the node id + # (not unique per claim), so the original worker couldn't tell its claim had + # been superseded. `lock_token` is a fresh UUID stamped on every claim, so a + # worker can detect (via its heartbeat) that the row now carries a different + # token and abort, and a late ack/nack from a fenced worker becomes a no-op. + use Durable.Migration.Base + + @impl true + def version, do: 20_260_623_000_000 + + @impl true + def up(prefix) do + alter table(:workflow_executions, prefix: prefix) do + add_if_not_exists(:lock_token, :binary_id) + end + + :ok + end + + @impl true + def down(prefix) do + alter table(:workflow_executions, prefix: prefix) do + remove_if_exists(:lock_token, :binary_id) + end + + :ok + end +end diff --git a/durable/lib/durable/migration/migrations/v20260623000001_add_child_workflow_link.ex b/durable/lib/durable/migration/migrations/v20260623000001_add_child_workflow_link.ex new file mode 100644 index 0000000..db7dc84 --- /dev/null +++ b/durable/lib/durable/migration/migrations/v20260623000001_add_child_workflow_link.ex @@ -0,0 +1,36 @@ +defmodule Durable.Migration.Migrations.V20260623000001AddChildWorkflowLink do + @moduledoc false + # Adds a queryable step→child link (`step_executions.child_workflow_id`). + # + # A step that spawns a sub-workflow via `call_workflow`/`start_workflow` could + # previously only be tied back to its child WorkflowExecution by parsing the + # parent's `__call_children` JSONB context — mutable, lossy if the context is + # rewritten, and not queryable. This column records the spawned child id + # directly on the parent's step_execution row. 
+ use Durable.Migration.Base + + @impl true + def version, do: 20_260_623_000_001 + + @impl true + def up(prefix) do + alter table(:step_executions, prefix: prefix) do + add_if_not_exists(:child_workflow_id, :binary_id) + end + + create_if_not_exists(index(:step_executions, [:child_workflow_id], prefix: prefix)) + + :ok + end + + @impl true + def down(prefix) do + drop_if_exists(index(:step_executions, [:child_workflow_id], prefix: prefix)) + + alter table(:step_executions, prefix: prefix) do + remove_if_exists(:child_workflow_id, :binary_id) + end + + :ok + end +end diff --git a/durable/lib/durable/migration/migrator.ex b/durable/lib/durable/migration/migrator.ex index 4139bc7..27cdcd9 100644 --- a/durable/lib/durable/migration/migrator.ex +++ b/durable/lib/durable/migration/migrator.ex @@ -15,7 +15,9 @@ defmodule Durable.Migration.Migrator do @migrations [ Durable.Migration.Migrations.V20260103000000InitialSchema, Durable.Migration.Migrations.V20260104000000AddWaitPrimitives, - Durable.Migration.Migrations.V20260413000000AddSchedulerResilience + Durable.Migration.Migrations.V20260413000000AddSchedulerResilience, + Durable.Migration.Migrations.V20260623000000AddLockFencing, + Durable.Migration.Migrations.V20260623000001AddChildWorkflowLink ] @doc """ diff --git a/durable/lib/durable/query.ex b/durable/lib/durable/query.ex index 4d688ff..b358d7c 100644 --- a/durable/lib/durable/query.ex +++ b/durable/lib/durable/query.ex @@ -158,6 +158,31 @@ defmodule Durable.Query do |> Enum.map(&execution_to_map(&1, false, false)) end + @doc """ + Returns a `%{parent_workflow_id => child_count}` map for the given parent + ids, in a single query. Used to render a "N children" drill-down affordance + on top-level run rows without an N+1. Parents with no children are omitted. 
+ + ## Options + + - `:durable` - The Durable instance name (default: Durable) + """ + @spec child_counts([String.t()], keyword()) :: %{String.t() => non_neg_integer()} + def child_counts(parent_ids, opts \\ []) + def child_counts([], _opts), do: %{} + + def child_counts(parent_ids, opts) when is_list(parent_ids) do + config = get_config(opts) + + from(w in WorkflowExecution, + where: w.parent_workflow_id in ^parent_ids, + group_by: w.parent_workflow_id, + select: {w.parent_workflow_id, count(w.id)} + ) + |> then(&Repo.all(config, &1)) + |> Map.new() + end + @doc """ Returns workflow execution counts grouped by status. @@ -182,6 +207,92 @@ defmodule Durable.Query do |> Map.new() end + @doc """ + Lists distinct workflow definitions, derived from execution history. + + Returns one row per `{workflow_module, workflow_name}` pair that has + ever been executed, with aggregate stats. There is no compile-time + registry of definitions in the engine — the executor only resolves + modules by name on demand — so the dashboard derives the catalog from + the persisted executions. + + Each row contains: + * `:workflow_module` (string) + * `:workflow_name` (string) + * `:total_runs` (non_neg_integer) + * `:running_count` (non_neg_integer) + * `:waiting_count` (non_neg_integer) + * `:failed_count` (non_neg_integer) + * `:last_run_at` (DateTime.t() | nil) — most recent inserted_at + * `:last_status` (atom | nil) — status of the most recent run + + Sorted by `last_run_at DESC NULLS LAST`. + + ## Options + + - `:durable` - The Durable instance name (default: Durable) + """ + @spec list_workflows(keyword()) :: [map()] + def list_workflows(opts \\ []) do + config = get_config(opts) + + # Two queries instead of a window function: + # 1. Aggregate counts + max(inserted_at) per (module, name) + # 2. 
The status of the row that matched max(inserted_at) per pair + # Joining a window function would be cleaner but Ecto's window API + # is awkward and the row count here is "number of distinct + # workflows in this app" — typically tens, not millions. + aggregate_query = + from(w in WorkflowExecution, + # Top-level runs only. Parallel/`each`/`call_workflow` children + # inherit the parent's (module, name), so counting them here would + # inflate total_runs and the status counts by the fan-out width and + # could drive last_status off a child row. + where: is_nil(w.parent_workflow_id), + group_by: [w.workflow_module, w.workflow_name], + select: %{ + workflow_module: w.workflow_module, + workflow_name: w.workflow_name, + total_runs: count(w.id), + running_count: fragment("count(*) FILTER (WHERE ? = 'running')", w.status), + waiting_count: fragment("count(*) FILTER (WHERE ? = 'waiting')", w.status), + failed_count: fragment("count(*) FILTER (WHERE ? = 'failed')", w.status), + last_run_at: max(w.inserted_at) + } + ) + + aggregates = Repo.all(config, aggregate_query) + + aggregates + |> Enum.map(&attach_last_status(config, &1)) + |> sort_by_last_run_desc() + end + + defp sort_by_last_run_desc(rows) do + {with_dates, without} = Enum.split_with(rows, &(not is_nil(&1.last_run_at))) + Enum.sort_by(with_dates, & &1.last_run_at, {:desc, DateTime}) ++ without + end + + defp attach_last_status(_config, %{last_run_at: nil} = row) do + Map.put(row, :last_status, nil) + end + + defp attach_last_status(config, row) do + query = + from(w in WorkflowExecution, + where: + is_nil(w.parent_workflow_id) and + w.workflow_module == ^row.workflow_module and + w.workflow_name == ^row.workflow_name and + w.inserted_at == ^row.last_run_at, + select: w.status, + limit: 1 + ) + + last_status = Repo.one(config, query) + Map.put(row, :last_status, last_status) + end + @doc """ Lists workflow executions with total count for pagination. 
@@ -238,9 +349,35 @@ defmodule Durable.Query do {:workflow_name, name}, q -> from(w in q, where: w.workflow_name == ^name) + # Top-level runs only — exclude parallel/`each`/`call_workflow` children + # (which carry a non-nil parent_workflow_id and inherit the parent's + # workflow_name verbatim). Without this, run lists are flooded with + # indistinguishable child rows and per-definition counts are inflated by + # the fan-out width. Lists default to this; the detail page's Family tab + # and the per-parent drill-down surface the children. + {:top_level_only, true}, q -> + from(w in q, where: is_nil(w.parent_workflow_id)) + + # Drill-down: only the children of a specific parent execution. + {:parent_workflow_id, parent_id}, q when is_binary(parent_id) -> + from(w in q, where: w.parent_workflow_id == ^parent_id) + {:status, status}, q when is_atom(status) -> from(w in q, where: w.status == ^status) + {:status, statuses}, q when is_list(statuses) and statuses != [] -> + from(w in q, where: w.status in ^statuses) + + # Exact execution id. + {:id, id}, q when is_binary(id) and id != "" -> + from(w in q, where: w.id == ^id) + + # Id prefix — matches the short (8-char) id shown in lists or any leading + # slice of the full UUID. Cast the uuid to text so LIKE works. Callers + # must strip LIKE wildcards from user input (the dashboard does). 
+ {:id_prefix, prefix}, q when is_binary(prefix) and prefix != "" -> + from(w in q, where: fragment("?::text LIKE ?", w.id, ^(prefix <> "%"))) + {:queue, queue}, q -> queue_str = to_string(queue) from(w in q, where: w.queue == ^queue_str) @@ -268,6 +405,7 @@ defmodule Durable.Query do context: execution.context, current_step: execution.current_step, error: execution.error, + parent_workflow_id: execution.parent_workflow_id, scheduled_at: execution.scheduled_at, started_at: execution.started_at, completed_at: execution.completed_at, @@ -298,7 +436,8 @@ defmodule Durable.Query do error: step.error, duration_ms: step.duration_ms, started_at: step.started_at, - completed_at: step.completed_at + completed_at: step.completed_at, + child_workflow_id: step.child_workflow_id } if include_logs do diff --git a/durable/lib/durable/queue/adapter.ex b/durable/lib/durable/queue/adapter.ex index 75d48e8..25f06ca 100644 --- a/durable/lib/durable/queue/adapter.ex +++ b/durable/lib/durable/queue/adapter.ex @@ -27,7 +27,12 @@ defmodule Durable.Queue.Adapter do input: map(), context: map(), scheduled_at: DateTime.t() | nil, - current_step: String.t() | nil + current_step: String.t() | nil, + # Per-claim fencing token stamped by `fetch_jobs`. Workers pass it back + # to `heartbeat`/`ack`/`nack` so a claim superseded by stale-lock + # recovery can be detected (heartbeat → `{:error, :fenced}`) and a + # late ack/nack from a fenced worker becomes a no-op. + lock_token: String.t() | nil } @doc """ @@ -121,7 +126,21 @@ defmodule Durable.Queue.Adapter do @callback recover_zombie_workflows(config :: Config.t(), timeout_seconds :: pos_integer()) :: {:ok, non_neg_integer()} | {:error, term()} - @optional_callbacks recover_zombie_workflows: 2 + @doc """ + Wakes workflows whose `sleep/1` or `schedule_at/1` wait has elapsed. 
+ + Atomically transitions rows where `status = :waiting AND scheduled_at <= NOW()` + back to `:pending`, clears the lock, and merges a `__sleep_satisfied__` + marker into context so the step body's next `sleep`/`schedule_at` call + returns immediately instead of re-throwing. + + Returns the count of workflows woken. Optional so older adapters keep + working — when not implemented, the SleepWaker simply skips its sweep. + """ + @callback wake_sleeping_workflows(config :: Config.t(), batch_size :: pos_integer()) :: + {:ok, non_neg_integer()} | {:error, term()} + + @optional_callbacks recover_zombie_workflows: 2, wake_sleeping_workflows: 2 @doc """ Returns the default adapter module. diff --git a/durable/lib/durable/queue/adapters/postgres.ex b/durable/lib/durable/queue/adapters/postgres.ex index 5dd0754..fc0ff5e 100644 --- a/durable/lib/durable/queue/adapters/postgres.ex +++ b/durable/lib/durable/queue/adapters/postgres.ex @@ -35,9 +35,9 @@ defmodule Durable.Queue.Adapters.Postgres do FOR UPDATE SKIP LOCKED ) UPDATE #{prefix}.workflow_executions - SET locked_by = $3, locked_at = NOW(), status = 'running' + SET locked_by = $3, locked_at = NOW(), status = 'running', lock_token = gen_random_uuid() WHERE id IN (SELECT id FROM claimable) - RETURNING id, workflow_module, workflow_name, queue, priority, input, context, scheduled_at, current_step; + RETURNING id, workflow_module, workflow_name, queue, priority, input, context, scheduled_at, current_step, lock_token; """ case Repo.query(config, sql, [queue, limit, node_id]) do @@ -54,7 +54,15 @@ defmodule Durable.Queue.Adapters.Postgres do @impl true def ack(%Config{} = config, job_id) when is_binary(job_id) do - ack_with_retry(config, job_id, _attempt = 1, _max_attempts = 3) + ack_with_retry(config, job_id, _attempt = 1, _max_attempts = 3, nil) + end + + # Fencing-aware ack. 
When the caller supplies the `lock_token` it claimed + # with, an ack from a worker whose claim was already superseded (stale + # recovery + reclaim by another worker) is a no-op instead of stomping the + # new owner's lock. + def ack(%Config{} = config, job_id, lock_token) when is_binary(job_id) do + ack_with_retry(config, job_id, 1, 3, lock_token) end # Retry the ack on transient failures (DB blip, connection lost). If we @@ -62,83 +70,124 @@ defmodule Durable.Queue.Adapters.Postgres do # released it 5 minutes later — at which point it would silently re-execute # because there's no idempotency key (Bug M-5). The retry buys us time; # the telemetry surfaces persistent failures so operators can intervene. - defp ack_with_retry(%Config{} = config, job_id, attempt, max_attempts) do + defp ack_with_retry(%Config{} = config, job_id, attempt, max_attempts, lock_token) do case Repo.get(config, WorkflowExecution, job_id) do nil -> {:error, :not_found} execution -> - case execution - |> WorkflowExecution.unlock_changeset() - |> Repo.update(config) do - {:ok, _} -> - :ok - - {:error, _reason} when attempt < max_attempts -> - backoff_ms = :rand.uniform(50) * attempt - Process.sleep(backoff_ms) - ack_with_retry(config, job_id, attempt + 1, max_attempts) - - {:error, reason} -> - :telemetry.execute( - [:durable, :queue, :ack_failed], - %{count: 1, attempts: attempt}, - %{job_id: job_id, reason: reason, durable: config.name} - ) - - Logger.error( - "[Durable] ack failed after #{attempt} attempts for job #{job_id}: " <> - inspect(reason) - ) + ack_loaded(config, execution, job_id, attempt, max_attempts, lock_token) + end + end - {:error, reason} - end + defp ack_loaded(config, execution, job_id, attempt, max_attempts, lock_token) do + if fenced?(execution, lock_token) do + # Another worker owns this row now (stale recovery + reclaim); our ack + # would stomp their claim — no-op. 
+ :ok + else + case execution + |> WorkflowExecution.unlock_changeset() + |> Repo.update(config) do + {:ok, _} -> + :ok + + {:error, _reason} when attempt < max_attempts -> + backoff_ms = :rand.uniform(50) * attempt + Process.sleep(backoff_ms) + ack_with_retry(config, job_id, attempt + 1, max_attempts, lock_token) + + {:error, reason} -> + :telemetry.execute( + [:durable, :queue, :ack_failed], + %{count: 1, attempts: attempt}, + %{job_id: job_id, reason: reason, durable: config.name} + ) + + Logger.error( + "[Durable] ack failed after #{attempt} attempts for job #{job_id}: " <> + inspect(reason) + ) + + {:error, reason} + end end end @impl true def nack(%Config{} = config, job_id, reason) when is_binary(job_id) do + do_nack(config, job_id, reason, nil) + end + + def nack(%Config{} = config, job_id, reason, lock_token) when is_binary(job_id) do + do_nack(config, job_id, reason, lock_token) + end + + defp do_nack(config, job_id, reason, lock_token) do case Repo.get(config, WorkflowExecution, job_id) do nil -> {:error, :not_found} execution -> - error = normalize_error(reason) + if fenced?(execution, lock_token) do + :ok + else + error = normalize_error(reason) - execution - |> Ecto.Changeset.change( - status: :failed, - error: error, - completed_at: DateTime.utc_now(), - locked_by: nil, - locked_at: nil - ) - |> Repo.update(config) + execution + |> Ecto.Changeset.change( + status: :failed, + error: error, + completed_at: DateTime.utc_now(), + locked_by: nil, + locked_at: nil, + lock_token: nil + ) + |> Repo.update(config) - :ok + :ok + end end end @impl true def reschedule(%Config{} = config, job_id, run_at) when is_binary(job_id) do + do_reschedule(config, job_id, run_at, nil) + end + + def reschedule(%Config{} = config, job_id, run_at, lock_token) when is_binary(job_id) do + do_reschedule(config, job_id, run_at, lock_token) + end + + defp do_reschedule(config, job_id, run_at, lock_token) do case Repo.get(config, WorkflowExecution, job_id) do nil -> {:error, 
:not_found} execution -> - execution - |> Ecto.Changeset.change( - status: :pending, - scheduled_at: run_at, - locked_by: nil, - locked_at: nil - ) - |> Repo.update(config) + if fenced?(execution, lock_token) do + :ok + else + execution + |> Ecto.Changeset.change( + status: :pending, + scheduled_at: run_at, + locked_by: nil, + locked_at: nil, + lock_token: nil + ) + |> Repo.update(config) - :ok + :ok + end end end + # A fenced row is one whose current lock_token differs from the token the + # caller claimed with. `nil` token = legacy/un-fenced caller → never fenced. + defp fenced?(_execution, nil), do: false + defp fenced?(%WorkflowExecution{lock_token: current}, token), do: current != token + @impl true def recover_stale_locks(%Config{} = config, timeout_seconds) when timeout_seconds > 0 do cutoff = DateTime.add(DateTime.utc_now(), -timeout_seconds, :second) @@ -157,7 +206,10 @@ defmodule Durable.Queue.Adapters.Postgres do set: [ status: :pending, locked_by: nil, - locked_at: nil + locked_at: nil, + # Clear the fencing token on release so the next claim stamps a fresh + # one and the fenced-out worker's token can never match again. + lock_token: nil ] ) @@ -184,6 +236,11 @@ defmodule Durable.Queue.Adapters.Postgres do where: w.status == :waiting, where: w.updated_at < ^cutoff, where: is_nil(w.locked_by) or w.locked_at < ^cutoff, + # Sleeping workflows (`sleep/1` / `schedule_at/1`) carry a non-nil + # scheduled_at and are revived by the SleepWaker, not the zombie + # sweeper. Excluding them here prevents a multi-minute sleep from + # being misclassified as a crash and marked :failed. 
+ where: is_nil(w.scheduled_at), where: not exists( from(p in PendingInput, @@ -252,22 +309,118 @@ defmodule Durable.Queue.Adapters.Postgres do e -> {:error, Exception.message(e)} end + @impl true + def wake_sleeping_workflows(%Config{} = config, batch_size) + when is_integer(batch_size) and batch_size > 0 do + prefix = config.prefix + + # Atomically wake any rows whose sleep has elapsed: + # + # 1. Flip status :waiting -> :pending so the queue poller can claim them. + # 2. Clear the lock (set `locked_by`/`locked_at` to NULL) so a stale `locked_by` from the + # worker that originally suspended them doesn't keep them invisible + # to fetch_jobs after the wake. + # 3. Merge `__sleep_satisfied__ => current_step` into context so the + # step body's next sleep/schedule_at call returns immediately + # instead of re-throwing. + # + # FOR UPDATE SKIP LOCKED keeps multiple SleepWakers (across nodes) + # from contending on the same rows; if a competitor is mid-update, + # we just skip and pick the row up next tick. + # `clock_timestamp()` rather than `NOW()` so the comparison reflects + # the *real* wall clock, not the transaction-start snapshot. This + # matters for two cases: (1) the SQL Sandbox-based test suite, where + # one transaction wraps the whole test and NOW() would be from + # before the row's scheduled_at was set; and (2) any future caller + # that wraps the sweep in a longer-running transaction. In normal + # production calls the two are essentially identical. + # `current_step IS NOT NULL` is a safety guard: the marker we stamp is + # `__sleep_satisfied__ => current_step`, and `Wait.sleep_satisfied?/0` + # only matches when that value equals the running step's name. A row with + # a NULL current_step would get an empty-string marker that never matches, + # so the woken step would re-throw its sleep and re-suspend every tick — a + # busy churn.
A legitimately sleeping row always has current_step set + # (the executor stamps it before each step), so excluding NULL ones costs + # nothing and removes the churn failure mode. + # + # `scheduled_at = NULL` on wake keeps the invariant that scheduled_at is + # non-null ONLY while a row is genuinely sleeping — so a later + # event/input/wait-group suspend (which sets :waiting) can never be + # mistaken for an elapsed sleep and prematurely woken. + sql = """ + WITH wakeable AS ( + SELECT id FROM #{prefix}.workflow_executions + WHERE status = 'waiting' + AND scheduled_at IS NOT NULL + AND scheduled_at <= clock_timestamp() + AND current_step IS NOT NULL + ORDER BY scheduled_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE #{prefix}.workflow_executions w + SET status = 'pending', + scheduled_at = NULL, + locked_by = NULL, + locked_at = NULL, + context = jsonb_set( + COALESCE(w.context, '{}'::jsonb), + '{__sleep_satisfied__}', + to_jsonb(w.current_step), + true + ), + updated_at = clock_timestamp() + WHERE w.id IN (SELECT id FROM wakeable); + """ + + case Repo.query(config, sql, [batch_size]) do + {:ok, %{num_rows: count}} -> {:ok, count} + {:error, reason} -> {:error, reason} + end + end + @impl true def heartbeat(%Config{} = config, job_id) when is_binary(job_id) do + do_heartbeat(config, job_id, nil) + end + + # Fencing-aware heartbeat. When the worker passes the token it claimed with, + # a refresh that touches 0 rows is classified: `:fenced` means another worker + # now owns this row (running with a different token) and the caller should + # abort to avoid a double-execution; `:not_found` is a benign miss (the row + # completed/suspended/cancelled, or was released but not yet reclaimed). 
+ def heartbeat(%Config{} = config, job_id, lock_token) when is_binary(job_id) do + do_heartbeat(config, job_id, lock_token) + end + + defp do_heartbeat(config, job_id, lock_token) do now = DateTime.utc_now() - query = + base = from(w in WorkflowExecution, where: w.id == ^job_id, where: w.status == :running ) - {count, _} = Repo.update_all(config, query, set: [locked_at: now]) + guarded = + if lock_token, do: from(w in base, where: w.lock_token == ^lock_token), else: base - if count == 1 do - :ok - else - {:error, :not_found} + {count, _} = Repo.update_all(config, guarded, set: [locked_at: now]) + + cond do + count == 1 -> :ok + is_nil(lock_token) -> {:error, :not_found} + true -> classify_lost_lock(config, job_id, lock_token) + end + end + + defp classify_lost_lock(config, job_id, lock_token) do + case Repo.get(config, WorkflowExecution, job_id) do + %WorkflowExecution{status: :running, lock_token: current} when current != lock_token -> + {:error, :fenced} + + _ -> + {:error, :not_found} end end @@ -330,7 +483,8 @@ defmodule Durable.Queue.Adapters.Postgres do input: decode_json(job.input), context: decode_json(job.context), scheduled_at: job.scheduled_at, - current_step: job.current_step + current_step: job.current_step, + lock_token: decode_uuid(job.lock_token) } end diff --git a/durable/lib/durable/queue/poller.ex b/durable/lib/durable/queue/poller.ex index 6ca5a28..b96b93e 100644 --- a/durable/lib/durable/queue/poller.ex +++ b/durable/lib/durable/queue/poller.ex @@ -28,6 +28,7 @@ defmodule Durable.Queue.Poller do :node_id, :active_jobs, :worker_refs, + :job_tokens, :paused, :timer_ref ] @@ -119,6 +120,7 @@ defmodule Durable.Queue.Poller do node_id: generate_node_id(), active_jobs: MapSet.new(), worker_refs: %{}, + job_tokens: %{}, paused: false, timer_ref: nil } @@ -216,7 +218,8 @@ defmodule Durable.Queue.Poller do state = %{ state | active_jobs: MapSet.delete(state.active_jobs, job_id), - worker_refs: Map.delete(state.worker_refs, ref) + worker_refs: 
Map.delete(state.worker_refs, ref), + job_tokens: Map.delete(state.job_tokens, job_id) } {:noreply, state} @@ -272,7 +275,8 @@ defmodule Durable.Queue.Poller do %{ state | active_jobs: MapSet.put(state.active_jobs, job.id), - worker_refs: Map.put(state.worker_refs, ref, job.id) + worker_refs: Map.put(state.worker_refs, ref, job.id), + job_tokens: Map.put(state.job_tokens, job.id, job[:lock_token]) } {:error, reason} -> @@ -283,18 +287,24 @@ defmodule Durable.Queue.Poller do defp handle_job_completion(state, job_id, result) do adapter = Adapter.default_adapter() + token = Map.get(state.job_tokens, job_id) case result do :ok -> - adapter.ack(state.config, job_id) + adapter.ack(state.config, job_id, token) :waiting -> # Job is waiting for sleep/event/input - don't ack, leave as-is # The executor already updated the status to :waiting :ok + :fenced -> + # The worker detected it was fenced out and aborted. The row now + # belongs to its new owner — don't ack/nack it, just clean up locally. + :ok + {:error, reason} -> - adapter.nack(state.config, job_id, reason) + adapter.nack(state.config, job_id, reason, token) end # Find and remove the monitor ref for this job @@ -309,7 +319,12 @@ defmodule Durable.Queue.Poller do if ref, do: Process.demonitor(ref, [:flush]) - %{state | active_jobs: MapSet.delete(state.active_jobs, job_id), worker_refs: worker_refs} + %{ + state + | active_jobs: MapSet.delete(state.active_jobs, job_id), + worker_refs: worker_refs, + job_tokens: Map.delete(state.job_tokens, job_id) + } end defp schedule_poll(state, delay) do diff --git a/durable/lib/durable/queue/worker.ex b/durable/lib/durable/queue/worker.ex index 288c2df..7eef977 100644 --- a/durable/lib/durable/queue/worker.ex +++ b/durable/lib/durable/queue/worker.ex @@ -20,7 +20,7 @@ defmodule Durable.Queue.Worker do alias Durable.Config alias Durable.Queue.Adapter - defstruct [:job, :config, :poller_pid, :started_at, :task_ref, :heartbeat_timer] + defstruct [:job, :config, :poller_pid, 
:started_at, :task_ref, :task_pid, :heartbeat_timer] @type t :: %__MODULE__{ job: map(), @@ -28,6 +28,7 @@ defmodule Durable.Queue.Worker do poller_pid: pid(), started_at: integer(), task_ref: reference() | nil, + task_pid: pid() | nil, heartbeat_timer: reference() | nil } @@ -73,28 +74,30 @@ defmodule Durable.Queue.Worker do # Start heartbeat timer timer = schedule_heartbeat(state.config.heartbeat_interval) - {:noreply, %{state | task_ref: task.ref, heartbeat_timer: timer}} + {:noreply, %{state | task_ref: task.ref, task_pid: task.pid, heartbeat_timer: timer}} end @impl true def handle_info(:heartbeat, state) do - # Send heartbeat to update locked_at + # Refresh the lock, passing the per-claim fencing token so a heartbeat can + # tell whether we've been fenced out by stale-lock recovery + reclaim. adapter = Adapter.default_adapter() - case adapter.heartbeat(state.config, state.job.id) do + case adapter.heartbeat(state.config, state.job.id, state.job[:lock_token]) do + {:error, :fenced} -> + self_fence(state) + :ok -> - :ok + emit_heartbeat_telemetry(state.job) + timer = schedule_heartbeat(state.config.heartbeat_interval) + {:noreply, %{state | heartbeat_timer: timer}} {:error, reason} -> Logger.warning("Heartbeat failed for job #{state.job.id}: #{inspect(reason)}") + emit_heartbeat_telemetry(state.job) + timer = schedule_heartbeat(state.config.heartbeat_interval) + {:noreply, %{state | heartbeat_timer: timer}} end - - # Emit telemetry - emit_heartbeat_telemetry(state.job) - - # Schedule next heartbeat - timer = schedule_heartbeat(state.config.heartbeat_interval) - {:noreply, %{state | heartbeat_timer: timer}} end # Task completed successfully @@ -170,6 +173,33 @@ defmodule Durable.Queue.Worker do }} end + # We've been fenced: stale-lock recovery released our claim and another worker + # re-claimed it. Abort the in-flight Task so we don't run the workflow to + # completion a second time (and stomp the new owner). 
Report `:fenced` to the + # poller so it cleans us up WITHOUT acking/nacking the row. + defp self_fence(state) do + Logger.error( + "[Durable] worker for job #{state.job.id} was fenced (lock reclaimed by another " <> + "worker after stale-lock recovery) — aborting to avoid a duplicate execution" + ) + + cancel_heartbeat(state.heartbeat_timer) + if state.task_ref, do: Process.demonitor(state.task_ref, [:flush]) + if state.task_pid && Process.alive?(state.task_pid), do: Process.exit(state.task_pid, :kill) + + duration_ms = System.monotonic_time(:millisecond) - state.started_at + + :telemetry.execute( + [:durable, :queue, :worker_fenced], + %{count: 1, duration_ms: duration_ms}, + %{job_id: state.job.id, queue: state.job.queue} + ) + + send(state.poller_pid, {:job_complete, state.job.id, :fenced, duration_ms}) + + {:stop, :normal, %{state | heartbeat_timer: nil}} + end + defp schedule_heartbeat(interval) do Process.send_after(self(), :heartbeat, interval) end diff --git a/durable/lib/durable/storage/schemas/step_execution.ex b/durable/lib/durable/storage/schemas/step_execution.ex index 58056a4..ee48bac 100644 --- a/durable/lib/durable/storage/schemas/step_execution.ex +++ b/durable/lib/durable/storage/schemas/step_execution.ex @@ -56,6 +56,11 @@ defmodule Durable.Storage.Schemas.StepExecution do field(:compensation_for, :string) field(:is_compensation, :boolean, default: false) + # Queryable parent→child link: the WorkflowExecution this step spawned via + # `call_workflow`/`start_workflow`. Lets step→child be resolved directly + # instead of parsing the parent's mutable `__call_children` JSONB context. 
+ field(:child_workflow_id, :binary_id) + belongs_to(:workflow, Durable.Storage.Schemas.WorkflowExecution, foreign_key: :workflow_id) timestamps(type: :utc_datetime_usec) @@ -74,7 +79,8 @@ defmodule Durable.Storage.Schemas.StepExecution do :completed_at, :duration_ms, :compensation_for, - :is_compensation + :is_compensation, + :child_workflow_id ] @doc """ diff --git a/durable/lib/durable/storage/schemas/wait_group.ex b/durable/lib/durable/storage/schemas/wait_group.ex index fd45c21..67ec08a 100644 --- a/durable/lib/durable/storage/schemas/wait_group.ex +++ b/durable/lib/durable/storage/schemas/wait_group.ex @@ -7,6 +7,7 @@ defmodule Durable.Storage.Schemas.WaitGroup do use Ecto.Schema import Ecto.Changeset + import Ecto.Query @type wait_type :: :any | :all @@ -118,6 +119,46 @@ defmodule Durable.Storage.Schemas.WaitGroup do |> cast(changes, [:received_events, :status, :completed_at]) end + @doc """ + Locks the wait group row `FOR UPDATE`, merges the event into + `received_events`, and (when the wait condition is satisfied) flips + `status` to `:completed`. Must be called inside a transaction. + + Returns `{:ok, %{wait_group: updated, just_completed: boolean}}` on + success — `just_completed` is true iff this call transitioned the + group from `:pending` to `:completed`. Already-completed/timed-out + groups are treated as a no-op (`just_completed: false`) so late + arrivals don't double-resume the parent. + + Without the row lock, two concurrent callers can read the same + `received_events`, each merge in only their own event, and have the + later UPDATE silently overwrite the earlier one — leaving the group + permanently short an entry and the parent stuck in `:waiting`. 
+ """ + def add_event_locked(repo, wait_group_id, event_name, payload) do + query = + from(w in __MODULE__, + where: w.id == ^wait_group_id, + lock: "FOR UPDATE" + ) + + case repo.one(query) do + nil -> + {:error, :not_found} + + %__MODULE__{status: :pending} = wait_group -> + with {:ok, updated} <- + wait_group + |> add_event_changeset(event_name, payload) + |> repo.update() do + {:ok, %{wait_group: updated, just_completed: updated.status == :completed}} + end + + %__MODULE__{} = wait_group -> + {:ok, %{wait_group: wait_group, just_completed: false}} + end + end + @doc """ Creates a changeset for timing out a wait group. """ diff --git a/durable/lib/durable/storage/schemas/workflow_execution.ex b/durable/lib/durable/storage/schemas/workflow_execution.ex index fedd307..81b75c2 100644 --- a/durable/lib/durable/storage/schemas/workflow_execution.ex +++ b/durable/lib/durable/storage/schemas/workflow_execution.ex @@ -77,6 +77,9 @@ defmodule Durable.Storage.Schemas.WorkflowExecution do field(:completed_at, :utc_datetime_usec) field(:locked_by, :string) field(:locked_at, :utc_datetime_usec) + # Per-claim fencing token (fresh UUID stamped on each claim). Lets a worker + # detect its claim was superseded by stale-lock recovery + reclaim. 
+ field(:lock_token, :binary_id) # Compensation/Saga support field(:compensation_results, {:array, :map}, default: []) diff --git a/durable/lib/durable/supervisor.ex b/durable/lib/durable/supervisor.ex index 35bcf49..a356637 100644 --- a/durable/lib/durable/supervisor.ex +++ b/durable/lib/durable/supervisor.ex @@ -115,6 +115,7 @@ defmodule Durable.Supervisor do [ {Durable.Queue.Manager, config: config}, {Durable.Wait.TimeoutWorker, config: config}, + {Durable.Wait.SleepWaker, config: config}, {Durable.Scheduler, config: config, interval: config.scheduler_interval, diff --git a/durable/lib/durable/wait.ex b/durable/lib/durable/wait.ex index 8eacd2a..0b1804e 100644 --- a/durable/lib/durable/wait.ex +++ b/durable/lib/durable/wait.ex @@ -101,10 +101,27 @@ defmodule Durable.Wait do sleep(hours(2)) sleep(days(1)) + ## Resumption semantics + + Like `wait_for_event/2`, `sleep/1` is a resumption barrier: when the + step re-runs after the wake fires, the SleepWaker has merged a + `:__sleep_satisfied__` marker into context that this call recognises + and returns `nil` for instead of re-throwing. Any side effects + *before* `sleep/1` therefore run twice (once on suspend, again on + resume) — keep them idempotent or move them into a prior step. + + Multiple `sleep/1` calls inside one step body are not supported: the + marker is per-step, not per-call, so the second call on resume would + return immediately rather than wait. Use one wait per step. + """ @spec sleep(integer()) :: nil def sleep(duration_ms) when is_integer(duration_ms) do - throw({:sleep, duration_ms: duration_ms}) + if sleep_satisfied?() do + nil + else + throw({:sleep, duration_ms: duration_ms}) + end end @doc """ @@ -116,10 +133,40 @@ defmodule Durable.Wait do schedule_at(next_business_day(hour: 9)) schedule_at(next_weekday(:monday, hour: 9)) + See `sleep/1` for resumption semantics — they are identical. 
""" @spec schedule_at(DateTime.t()) :: nil def schedule_at(%DateTime{} = datetime) do - throw({:sleep, until: datetime}) + if sleep_satisfied?() do + nil + else + throw({:sleep, until: datetime}) + end + end + + # On resume after a sleep wake, the SleepWaker writes + # `:__sleep_satisfied__ => ""` into the workflow's context. + # When the step body re-runs, this guard makes `sleep`/`schedule_at` + # return `nil` instead of throwing again — so the rest of the step + # body runs, the step completes, and the workflow advances. + # + # The marker is keyed by step name (not by call site), so it survives + # exactly until the user step finishes. Subsequent sleep steps get + # their own marker written by the next wake. + defp sleep_satisfied? do + context = Process.get(:durable_context, %{}) + step = Process.get(:durable_current_step) + + cond do + Map.get(context, :__cancelled__) == true -> + true + + step != nil and Map.get(context, :__sleep_satisfied__) == Atom.to_string(step) -> + true + + true -> + false + end end # ============================================================================ @@ -605,42 +652,93 @@ defmodule Durable.Wait do # Sanitize at the API boundary — same rationale as provide_input/4. safe_payload = Durable.Executor.sanitize_for_json(payload) - with {:ok, pending_event} <- find_pending_event(config, workflow_id, event_name), - {:ok, _} <- receive_pending_event(config, pending_event, safe_payload), - {:ok, _} <- maybe_resume_workflow(config, workflow_id, event_name, safe_payload, opts) do - :ok + with {:ok, pending_event} <- find_pending_event(config, workflow_id, event_name) do + # PendingEvent.payload is typed `:map`. Non-map payloads (e.g. an event + # sent with just a string identifier) get wrapped under "value" so the + # cast always succeeds. The wait group / resume context use the + # unwrapped payload so user code reading `received_events` sees the + # original shape it sent. 
+ storable_payload = + if is_map(safe_payload), do: safe_payload, else: %{"value" => safe_payload} + + multi = + build_send_event_multi( + workflow_id, + event_name, + pending_event, + storable_payload, + safe_payload + ) + + case Repo.transaction(config, multi) do + {:ok, _} -> + :ok + + {:error, stage, reason, _} -> + require Logger + + Logger.error( + "[Durable] failed to atomically fulfill event + (optional wait_group) + resume: " <> + "stage=#{stage} reason=#{inspect(reason)} workflow=#{workflow_id} event=#{event_name}" + ) + + {:error, reason} + end end end - defp maybe_resume_workflow(config, workflow_id, event_name, payload, opts) do - # Check if this is part of a wait group - case find_wait_group_for_event(config, workflow_id, event_name) do - {:ok, wait_group} -> - # Update the wait group with the received event - handle_wait_group_event(config, wait_group, event_name, payload, opts) + # Single Ecto.Multi covering: + # + # 1. PendingEvent -> :received + # 2. If the event belongs to a wait group: lock + merge into received_events, + # flip to :completed when satisfied (closes the lost-update race). + # 3. Resume the workflow (`:waiting` -> `:pending`) when the wait condition + # is satisfied — either the single-event case, or the wait group just + # transitioned to :completed. + # + # All three steps share the same transaction so a crash anywhere can't leave + # us with the "event :received but parent stuck :waiting" state that zombie + # detection misreads as a workflow crash. 
+ defp build_send_event_multi( + workflow_id, + event_name, + pending_event, + storable_payload, + resume_payload + ) do + Ecto.Multi.new() + |> Ecto.Multi.update( + :event, + PendingEvent.receive_changeset(pending_event, storable_payload) + ) + |> Ecto.Multi.run(:resume, fn repo, _ -> + resume_after_event(repo, workflow_id, event_name, pending_event, resume_payload) + end) + end + + defp resume_after_event(repo, workflow_id, event_name, pending_event, payload) do + case pending_event.wait_group_id do + nil -> + # Plain wait_for_event — no wait group, resume immediately. + Durable.Executor.resume_parent_in_multi(repo, workflow_id, %{event_name => payload}) - {:error, :not_found} -> - # Single event - resume immediately - event_data = %{event_name => payload} - Durable.Executor.resume_workflow(workflow_id, event_data, opts) + wait_group_id -> + merge_wait_group_and_maybe_resume(repo, wait_group_id, event_name, payload) end end - defp handle_wait_group_event(config, wait_group, event_name, payload, opts) do - {:ok, updated_group} = - wait_group - |> WaitGroup.add_event_changeset(event_name, payload) - |> Repo.update(config) - - if updated_group.status == :completed do - # All required events received - resume workflow - Durable.Executor.resume_workflow( - updated_group.workflow_id, - updated_group.received_events, - opts - ) - else - {:ok, updated_group} + defp merge_wait_group_and_maybe_resume(repo, wait_group_id, event_name, payload) do + with {:ok, wg_result} <- + WaitGroup.add_event_locked(repo, wait_group_id, event_name, payload) do + if wg_result.just_completed do + Durable.Executor.resume_parent_in_multi( + repo, + wg_result.wait_group.workflow_id, + wg_result.wait_group.received_events + ) + else + {:ok, %{no_op: true, wait_group: wg_result.wait_group}} + end end end @@ -810,21 +908,6 @@ defmodule Durable.Wait do end end - defp find_wait_group_for_event(config, workflow_id, event_name) do - query = - from(w in WaitGroup, - where: - w.workflow_id == 
^workflow_id and - ^event_name in w.event_names and - w.status == :pending - ) - - case Repo.one(config, query) do - nil -> {:error, :not_found} - wait_group -> {:ok, wait_group} - end - end - defp complete_pending_input(config, pending, response) do # PendingInput.response is typed `:map`. Non-map responses (a single # string from wait_for_choice/wait_for_text, an approval atom, etc.) @@ -846,23 +929,6 @@ defmodule Durable.Wait do |> Repo.update(config) end - defp receive_pending_event(config, pending_event, payload) do - # PendingEvent.payload is typed `:map`. Same wrap pattern as - # complete_pending_input/3 — non-map payloads (e.g. an event sent with - # just a string identifier) get stored under `"value"` so the cast - # always succeeds and the workflow resume isn't blocked. - storable_payload = - if is_map(payload) do - payload - else - %{"value" => payload} - end - - pending_event - |> PendingEvent.receive_changeset(storable_payload) - |> Repo.update(config) - end - defp get_waiting_execution(config, workflow_id) do case Repo.get(config, WorkflowExecution, workflow_id) do nil -> {:error, :not_found} diff --git a/durable/lib/durable/wait/sleep_waker.ex b/durable/lib/durable/wait/sleep_waker.ex new file mode 100644 index 0000000..cce4ba5 --- /dev/null +++ b/durable/lib/durable/wait/sleep_waker.ex @@ -0,0 +1,133 @@ +defmodule Durable.Wait.SleepWaker do + @moduledoc """ + Background worker that revives workflows whose `sleep/1` or + `schedule_at/1` wait has elapsed. + + When a step body calls `sleep` or `schedule_at`, the executor flips the + workflow to `:waiting` and stamps `scheduled_at`. This worker + periodically asks the queue adapter to flip any such row back to + `:pending` once `scheduled_at <= NOW()`, so the queue poller can + re-claim it. The adapter also merges a `:__sleep_satisfied__` marker + into the workflow's context so the step body's next sleep call + returns immediately rather than re-throwing. 
+ + Polls every `:sleep_waker_interval` milliseconds (default 1_000) and + wakes up to `:sleep_waker_batch_size` workflows per tick (default + 100) — both configurable via `Durable.Config`. + """ + + use GenServer + + require Logger + + alias Durable.Queue.Adapter + + defstruct [:config, :interval, :batch_size] + + @doc """ + Starts the sleep waker. + + ## Options + + - `:config` - The Durable config (required) + - `:interval` - Override the config's `:sleep_waker_interval` (optional) + - `:batch_size` - Override the config's `:sleep_waker_batch_size` (optional) + """ + def start_link(opts) do + config = Keyword.fetch!(opts, :config) + interval = Keyword.get(opts, :interval, config.sleep_waker_interval) + batch_size = Keyword.get(opts, :batch_size, config.sleep_waker_batch_size) + + GenServer.start_link( + __MODULE__, + %__MODULE__{config: config, interval: interval, batch_size: batch_size}, + name: worker_name(config.name) + ) + end + + @doc """ + Returns the process name for a given Durable instance. + """ + def worker_name(durable_name) do + Module.concat([durable_name, Wait, SleepWaker]) + end + + @doc """ + Manually triggers a sweep, returning `{:ok, woken_count}`. + + Useful in tests and operational tooling — the periodic timer keeps + ticking independently. 
+ """ + def wake_now(durable_name \\ Durable) do + GenServer.call(worker_name(durable_name), :wake_now) + end + + # GenServer callbacks + + @impl true + def init(state) do + schedule_tick(state.interval) + + Logger.info( + "[Durable] sleep waker started for #{inspect(state.config.name)}, " <> + "interval=#{state.interval}ms, batch_size=#{state.batch_size}" + ) + + {:ok, state} + end + + @impl true + def handle_call(:wake_now, _from, state) do + {:reply, do_wake(state), state} + end + + @impl true + def handle_info(:tick, state) do + do_wake(state) + schedule_tick(state.interval) + {:noreply, state} + end + + # Private + + defp do_wake(%__MODULE__{config: config, batch_size: batch_size}) do + adapter = Adapter.default_adapter() + + if function_exported?(adapter, :wake_sleeping_workflows, 2) do + case adapter.wake_sleeping_workflows(config, batch_size) do + {:ok, 0} = ok -> + ok + + {:ok, count} = ok -> + Logger.debug( + "[Durable] sleep waker woke #{count} workflow(s) for #{inspect(config.name)}" + ) + + emit_telemetry(count, config.name) + ok + + {:error, reason} = err -> + Logger.error( + "[Durable] sleep waker sweep failed for #{inspect(config.name)}: " <> + inspect(reason) + ) + + err + end + else + {:ok, 0} + end + end + + defp schedule_tick(interval) do + Process.send_after(self(), :tick, interval) + end + + defp emit_telemetry(count, durable_name) do + :telemetry.execute( + [:durable, :wait, :sleep_woken], + %{count: count}, + %{durable: durable_name} + ) + end +end diff --git a/durable/lib/durable/wait/timeout_worker.ex b/durable/lib/durable/wait/timeout_worker.ex index 347f560..0addcd1 100644 --- a/durable/lib/durable/wait/timeout_worker.ex +++ b/durable/lib/durable/wait/timeout_worker.ex @@ -214,6 +214,8 @@ defmodule Durable.Wait.TimeoutWorker do "in workflow #{pending_event.workflow_id}" ) + maybe_cancel_timed_out_child(config, pending_event.event_name) + {:error, stage, reason, _changes} -> Logger.error( "Failed event timeout transaction for 
#{pending_event.workflow_id}: " <> @@ -222,6 +224,27 @@ defmodule Durable.Wait.TimeoutWorker do end end + # A `call_workflow` parent waits on a PendingEvent named + # `__child_done:`. When that wait times out the parent resumes with + # its `timeout_value`, but the child keeps running — wasted work whose result + # is then silently dropped (and which could mutate shared state the parent + # has moved past). Cancel the abandoned child so the timeout actually stops + # it. Best-effort: a child that already finished/cancelled is a harmless no-op. + defp maybe_cancel_timed_out_child(config, "__child_done:" <> child_id) + when byte_size(child_id) > 0 do + Logger.warning( + "[Durable] call_workflow wait timed out — cancelling abandoned child #{child_id}" + ) + + Executor.cancel_workflow(child_id, "parent_call_timeout", durable: config.name) + rescue + e -> + Logger.warning("[Durable] failed to cancel timed-out child #{child_id}: #{inspect(e)}") + :ok + end + + defp maybe_cancel_timed_out_child(_config, _event_name), do: :ok + # Atomically: persist the pending row's :timeout transition AND flip the # owning workflow back to :pending with the timeout payload merged into # context. If either step fails the entire transaction rolls back, so the diff --git a/durable/test/durable/integration/concurrency_test.exs b/durable/test/durable/integration/concurrency_test.exs new file mode 100644 index 0000000..c60de26 --- /dev/null +++ b/durable/test/durable/integration/concurrency_test.exs @@ -0,0 +1,156 @@ +defmodule Durable.Integration.ConcurrencyTest do + @moduledoc """ + Real-concurrency coverage that the shared SQL Sandbox cannot provide. + + Every other DB test runs inside ONE sandboxed connection wrapped in a single + transaction, so two "concurrent" operations actually serialize and the + locking primitives (`FOR UPDATE SKIP LOCKED`, `FOR UPDATE` row locks) are + never contended — a regression that dropped the locks would still pass. 
These + tests use `Sandbox.unboxed_run/2` to run against REAL, committed connections + on separate backends, so the locks are genuinely exercised. + + They commit + truncate, so they're tagged `:integration` and excluded from the + default suite. Run with: `mix test --only integration`. + """ + use ExUnit.Case, async: false + + @moduletag :integration + + alias Durable.Config + alias Durable.Queue.Adapters.Postgres + alias Durable.Storage.Schemas.{PendingEvent, WaitGroup, WorkflowExecution} + alias Ecto.Adapters.SQL.Sandbox + + @repo Durable.TestRepo + + setup do + start_supervised!({Durable, repo: @repo, queue_enabled: false, pubsub: :start}) + truncate!() + on_exit(&truncate!/0) + %{config: Config.get(Durable)} + end + + defp truncate! do + Sandbox.unboxed_run(@repo, fn -> + @repo.query!( + "TRUNCATE durable.workflow_executions, durable.scheduled_workflows RESTART IDENTITY CASCADE" + ) + end) + end + + defp committed(fun), do: Sandbox.unboxed_run(@repo, fun) + + describe "fetch_jobs/4 — FOR UPDATE SKIP LOCKED across real backends" do + test "two workers claiming concurrently never double-claim a job", %{config: config} do + committed(fn -> + for i <- 1..40 do + @repo.insert!(%WorkflowExecution{ + workflow_module: "Elixir.IntegrationWorkflow", + workflow_name: "claim_#{i}", + status: :pending, + queue: "default", + priority: 0, + input: %{}, + context: %{} + }) + end + end) + + claim = fn node -> + committed(fn -> + config |> Postgres.fetch_jobs("default", 30, node) |> Enum.map(& &1.id) + end) + end + + [a, b] = + [ + Task.async(fn -> claim.("node_a") end), + Task.async(fn -> claim.("node_b") end) + ] + |> Task.await_many(15_000) + + overlap = MapSet.intersection(MapSet.new(a), MapSet.new(b)) + + assert MapSet.size(overlap) == 0, + "two backends claimed the same job(s): #{inspect(MapSet.to_list(overlap))}" + + # Every job is claimed exactly once across the two workers. 
+ assert length(a) + length(b) == 40 + assert MapSet.size(MapSet.new(a ++ b)) == 40 + end + end + + describe "WaitGroup.add_event_locked/4 — FOR UPDATE under real contention" do + test "concurrent sibling completions never lose an event", %{config: _config} do + # A wait group expecting 8 events, plus its pending event rows. + event_names = for i <- 1..8, do: "evt_#{i}" + + {wf_id, wg_id} = + committed(fn -> + wf = + @repo.insert!(%WorkflowExecution{ + workflow_module: "Elixir.IntegrationWorkflow", + workflow_name: "wg_parent", + status: :waiting, + queue: "default", + priority: 0, + input: %{}, + context: %{} + }) + + wg = + %WaitGroup{} + |> WaitGroup.changeset(%{ + workflow_id: wf.id, + step_name: "fan", + wait_type: :all, + event_names: event_names + }) + |> @repo.insert!() + + for name <- event_names do + %PendingEvent{} + |> PendingEvent.changeset(%{ + workflow_id: wf.id, + event_name: name, + step_name: "fan", + wait_group_id: wg.id, + wait_type: :all + }) + |> @repo.insert!() + end + + {wf.id, wg.id} + end) + + _ = wf_id + + # 8 backends each merge a distinct event concurrently. Each runs inside a + # transaction (as production does via the executor's Ecto.Multi) so the + # FOR UPDATE row lock is held across the read-modify-write on + # received_events and serializes the siblings — without it, concurrent + # merges overwrite each other and events are lost. + event_names + |> Enum.map(fn name -> + Task.async(fn -> + committed(fn -> + @repo.transaction(fn -> + WaitGroup.add_event_locked(@repo, wg_id, name, %{"ok" => true}) + end) + end) + end) + end) + |> Task.await_many(15_000) + + received = committed(fn -> @repo.get!(WaitGroup, wg_id).received_events end) + + assert map_size(received) == 8, + "lost an event under contention: #{inspect(Map.keys(received))}" + + assert Enum.sort(Map.keys(received)) == Enum.sort(event_names) + + # Exactly one task observed the group transition to :completed. 
+ assert committed(fn -> @repo.get!(WaitGroup, wg_id).status end) == :completed + end + end +end diff --git a/durable/test/durable/orchestration_test.exs b/durable/test/durable/orchestration_test.exs index f7a68fc..d393636 100644 --- a/durable/test/durable/orchestration_test.exs +++ b/durable/test/durable/orchestration_test.exs @@ -13,7 +13,7 @@ defmodule Durable.OrchestrationTest do alias Durable.Config alias Durable.Executor - alias Durable.Storage.Schemas.{PendingEvent, WorkflowExecution} + alias Durable.Storage.Schemas.{PendingEvent, StepExecution, WorkflowExecution} import Ecto.Query @@ -65,6 +65,30 @@ defmodule Durable.OrchestrationTest do assert parent.context["after_call"] == true end + test "records the spawned child id on the calling step (queryable parent→child link)" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + assert parent.status == :waiting + + child_id = parent.context["__child:simple_child_workflow"] + assert child_id != nil + + # The parent's :call_child step row links directly to the spawned child — + # no need to parse the __call_children JSONB context. + linked = + repo.all( + from(s in StepExecution, + where: s.workflow_id == ^parent.id and not is_nil(s.child_workflow_id) + ) + ) + + assert [step] = linked + assert step.step_name == "call_child" + assert step.child_workflow_id == child_id + end + test "parent calls child, child fails, parent gets error" do config = Config.get(Durable) repo = config.repo diff --git a/durable/test/durable/parallel_race_test.exs b/durable/test/durable/parallel_race_test.exs new file mode 100644 index 0000000..2fb3377 --- /dev/null +++ b/durable/test/durable/parallel_race_test.exs @@ -0,0 +1,228 @@ +defmodule Durable.ParallelRaceTest do + @moduledoc """ + Regression tests for the parallel-children completion path. 
Each test + exercises a state shape the production incident actually exhibited: + + * Multiple children of an `each` / parallel step completing + concurrently can race on the shared WaitGroup row. Without + row-level locking, the read-modify-write on `received_events` + drops one of the children's payloads, the wait group never + reaches `:completed`, the parent never resumes, and stale-lock + recovery later marks the parent as a zombie. + * Each `__parallel_done` PendingEvent must transition to `:received` + AND the WaitGroup must record the event AND (when the wait is + satisfied) the parent must flip `:waiting` -> `:pending`. All + three updates run inside a single transaction so a crash mid-way + can't leave a half-applied state behind. + """ + + use Durable.DataCase, async: false + + alias Durable.Config + alias Durable.Executor + alias Durable.Storage.Schemas.{PendingEvent, WaitGroup, WorkflowExecution} + + import Ecto.Query + + describe "WaitGroup.add_event_locked/4" do + test "sequentially merges every event and only completes once the wait condition is satisfied" do + config = Config.get(Durable) + repo = config.repo + + {:ok, workflow} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(%{ + workflow_module: "Test.Module", + workflow_name: "test", + status: :waiting, + queue: "default", + priority: 0, + input: %{}, + context: %{} + }) + |> repo.insert() + + event_names = ["a", "b", "c"] + + {:ok, wait_group} = + %WaitGroup{} + |> WaitGroup.changeset(%{ + workflow_id: workflow.id, + step_name: "fan_out", + wait_type: :all, + event_names: event_names + }) + |> repo.insert() + + # First two events: still :pending. + Enum.each(["a", "b"], fn name -> + repo.transaction(fn -> + {:ok, result} = + WaitGroup.add_event_locked(repo, wait_group.id, name, %{"name" => name}) + + refute result.just_completed + assert result.wait_group.status == :pending + end) + end) + + # Third event flips the wait group to :completed exactly once. 
+ repo.transaction(fn -> + {:ok, result} = WaitGroup.add_event_locked(repo, wait_group.id, "c", %{"name" => "c"}) + + assert result.just_completed + assert result.wait_group.status == :completed + end) + + reloaded = repo.get!(WaitGroup, wait_group.id) + assert reloaded.status == :completed + + assert Map.keys(reloaded.received_events) |> Enum.sort() == event_names + end + + test "late arrivals after :completed are an idempotent no-op" do + config = Config.get(Durable) + repo = config.repo + + {:ok, workflow} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(%{ + workflow_module: "Test.Module", + workflow_name: "test", + status: :waiting, + queue: "default", + priority: 0, + input: %{}, + context: %{} + }) + |> repo.insert() + + {:ok, wait_group} = + %WaitGroup{} + |> WaitGroup.changeset(%{ + workflow_id: workflow.id, + step_name: "any_branch", + wait_type: :any, + event_names: ["a", "b"] + }) + |> repo.insert() + + # First event satisfies the :any wait. + repo.transaction(fn -> + {:ok, result} = WaitGroup.add_event_locked(repo, wait_group.id, "a", %{"first" => true}) + assert result.just_completed + end) + + # Late "b" arrival doesn't grow received_events and doesn't mark + # itself as just_completed (which would re-resume the parent). 
+ repo.transaction(fn -> + {:ok, result} = WaitGroup.add_event_locked(repo, wait_group.id, "b", %{"second" => true}) + + refute result.just_completed + assert result.wait_group.status == :completed + refute Map.has_key?(result.wait_group.received_events, "b") + end) + + reloaded = repo.get!(WaitGroup, wait_group.id) + assert Map.keys(reloaded.received_events) == ["a"] + end + end + + describe "parallel children completing concurrently" do + test "all children's events land in received_events and parent resumes" do + # 5 concurrently-completing children is enough for the lost-update + # race in the old code to almost always drop at least one event; + # the new code locks the wait_group row, so every event ends up in + # received_events regardless of execution order. + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FiveWayParallelWorkflow, %{}) + assert parent.status == :waiting + + children = list_children(repo, parent.id) + assert length(children) == 5 + + # Drive all children concurrently. With the sandbox in shared mode + # the spawned tasks serialize on the test connection, but the test + # still exercises the full multi-step transaction for each child + # and asserts the merged final state. 
+ children + |> Task.async_stream( + fn child -> Executor.execute_workflow(child.id, config) end, + max_concurrency: 5, + ordered: false, + timeout: 10_000 + ) + |> Stream.run() + + [wait_group] = repo.all(from(w in WaitGroup, where: w.workflow_id == ^parent.id)) + assert wait_group.status == :completed + assert map_size(wait_group.received_events) == 5 + + pending_events = repo.all(from(p in PendingEvent, where: p.workflow_id == ^parent.id)) + assert length(pending_events) == 5 + assert Enum.all?(pending_events, &(&1.status == :received)) + + reloaded = repo.get!(WorkflowExecution, parent.id) + assert reloaded.status == :pending + + Executor.execute_workflow(reloaded.id, config) + finalized = repo.get!(WorkflowExecution, parent.id) + assert finalized.status == :completed + end + end + + # ============================================================================ + # Helpers + # ============================================================================ + + defp create_and_execute_workflow(module, input) do + config = Config.get(Durable) + repo = config.repo + {:ok, workflow_def} = module.__default_workflow__() + + {:ok, execution} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(%{ + workflow_module: Atom.to_string(module), + workflow_name: workflow_def.name, + status: :pending, + queue: "default", + priority: 0, + input: input, + context: %{} + }) + |> repo.insert() + + Executor.execute_workflow(execution.id, config) + {:ok, repo.get!(WorkflowExecution, execution.id)} + end + + defp list_children(repo, parent_id) do + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_id, + order_by: [asc: :inserted_at] + ) + ) + end +end + +defmodule FiveWayParallelWorkflow do + use Durable + use Durable.Helpers + + workflow "five_way_parallel" do + step(:setup, fn data -> {:ok, assign(data, :initialized, true)} end) + + parallel do + step(:branch_1, fn data -> {:ok, assign(data, :b1, true)} end) + step(:branch_2, fn data -> {:ok, 
assign(data, :b2, true)} end) + step(:branch_3, fn data -> {:ok, assign(data, :b3, true)} end) + step(:branch_4, fn data -> {:ok, assign(data, :b4, true)} end) + step(:branch_5, fn data -> {:ok, assign(data, :b5, true)} end) + end + + step(:done, fn data -> {:ok, assign(data, :done, true)} end) + end +end diff --git a/durable/test/durable/query_test.exs b/durable/test/durable/query_test.exs new file mode 100644 index 0000000..f899e65 --- /dev/null +++ b/durable/test/durable/query_test.exs @@ -0,0 +1,184 @@ +defmodule Durable.QueryTest do + @moduledoc """ + Covers the parent/child-aware query surface used by the dashboard: + + * `top_level_only` / `parent_workflow_id` filters + * `list_workflows/1` catalog excluding children (no fan-out double-count) + * `child_counts/2` + * `parent_workflow_id` exposed on the execution map + """ + use Durable.DataCase, async: false + + alias Durable.Config + alias Durable.Query + alias Durable.Storage.Schemas.WorkflowExecution + + setup do + config = Config.get(Durable) + %{config: config, repo: config.repo} + end + + defp insert_exec(repo, attrs) do + defaults = %{ + workflow_module: "Elixir.MyApp.SomeWorkflow", + workflow_name: "some_workflow", + status: :completed, + queue: "default", + priority: 0, + input: %{}, + context: %{} + } + + %WorkflowExecution{} + |> Ecto.Changeset.change(Map.merge(defaults, Map.new(attrs))) + |> repo.insert!() + end + + describe "top_level_only / parent_workflow_id filters" do + test "top_level_only excludes children; parent_workflow_id selects them", %{repo: repo} do + parent = insert_exec(repo, %{workflow_name: "fan_out", status: :completed}) + _c1 = insert_exec(repo, %{workflow_name: "fan_out", parent_workflow_id: parent.id}) + _c2 = insert_exec(repo, %{workflow_name: "fan_out", parent_workflow_id: parent.id}) + other = insert_exec(repo, %{workflow_name: "lonely", status: :running}) + + top = Query.list_executions(top_level_only: true) + top_ids = MapSet.new(top, & &1.id) + + assert 
MapSet.member?(top_ids, parent.id) + assert MapSet.member?(top_ids, other.id) + refute Enum.any?(top, &(&1.parent_workflow_id != nil)) + + kids = Query.list_executions(parent_workflow_id: parent.id) + assert length(kids) == 2 + assert Enum.all?(kids, &(&1.parent_workflow_id == parent.id)) + end + + test "list_executions_with_total respects top_level_only in both list and count", %{ + repo: repo + } do + parent = insert_exec(repo, %{workflow_name: "p"}) + insert_exec(repo, %{workflow_name: "p", parent_workflow_id: parent.id}) + insert_exec(repo, %{workflow_name: "p", parent_workflow_id: parent.id}) + + {rows, total} = Query.list_executions_with_total(top_level_only: true) + + assert total == Enum.count(rows) + assert Enum.all?(rows, &is_nil(&1.parent_workflow_id)) + end + end + + describe "list_workflows/1 catalog" do + test "counts only top-level runs — fan-out children don't inflate totals", %{repo: repo} do + parent = insert_exec(repo, %{workflow_name: "fan_out", workflow_module: "Elixir.FanOut"}) + + # 5 children inheriting the parent's (module, name). + for _ <- 1..5 do + insert_exec(repo, %{ + workflow_name: "fan_out", + workflow_module: "Elixir.FanOut", + parent_workflow_id: parent.id + }) + end + + row = Enum.find(Query.list_workflows(), &(&1.workflow_name == "fan_out")) + + assert row, "fan_out definition should appear in the catalog" + # Exactly one top-level run, not 6. 
+ assert row.total_runs == 1 + end + end + + describe "child_counts/2" do + test "returns parent_id => count, omitting childless parents", %{repo: repo} do + p1 = insert_exec(repo, %{}) + p2 = insert_exec(repo, %{}) + p3 = insert_exec(repo, %{}) + insert_exec(repo, %{parent_workflow_id: p1.id}) + insert_exec(repo, %{parent_workflow_id: p1.id}) + insert_exec(repo, %{parent_workflow_id: p2.id}) + + counts = Query.child_counts([p1.id, p2.id, p3.id]) + + assert counts[p1.id] == 2 + assert counts[p2.id] == 1 + refute Map.has_key?(counts, p3.id) + end + + test "empty parent list short-circuits to %{}" do + assert Query.child_counts([]) == %{} + end + end + + describe "execution map" do + test "exposes parent_workflow_id", %{repo: repo} do + parent = insert_exec(repo, %{}) + child = insert_exec(repo, %{parent_workflow_id: parent.id}) + + {:ok, map} = Query.get_execution(child.id) + assert map.parent_workflow_id == parent.id + + {:ok, parent_map} = Query.get_execution(parent.id) + assert parent_map.parent_workflow_id == nil + end + end + + describe "id / id_prefix / status-list / time filters" do + test "id selects an exact execution", %{repo: repo} do + a = insert_exec(repo, %{workflow_name: "a"}) + _b = insert_exec(repo, %{workflow_name: "b"}) + + rows = Query.list_executions(id: a.id) + + assert Enum.map(rows, & &1.id) == [a.id] + end + + test "id_prefix matches the leading slice of the uuid", %{repo: repo} do + a = insert_exec(repo, %{workflow_name: "a"}) + _b = insert_exec(repo, %{workflow_name: "b"}) + prefix = String.slice(a.id, 0, 8) + + rows = Query.list_executions(id_prefix: prefix) + + assert Enum.any?(rows, &(&1.id == a.id)) + assert Enum.all?(rows, &String.starts_with?(&1.id, prefix)) + end + + test "status accepts a list and matches any (IN)", %{repo: repo} do + f = insert_exec(repo, %{status: :failed}) + r = insert_exec(repo, %{status: :running}) + _c = insert_exec(repo, %{status: :completed}) + + rows = Query.list_executions(status: [:failed, :running]) + 
ids = MapSet.new(rows, & &1.id) + + assert MapSet.member?(ids, f.id) + assert MapSet.member?(ids, r.id) + refute Enum.any?(rows, &(&1.status == :completed)) + end + + test "single status atom still filters", %{repo: repo} do + f = insert_exec(repo, %{status: :failed}) + _c = insert_exec(repo, %{status: :completed}) + + rows = Query.list_executions(status: :failed) + + assert Enum.any?(rows, &(&1.id == f.id)) + assert Enum.all?(rows, &(&1.status == :failed)) + end + + test "from bounds results by inserted_at", %{repo: repo} do + now = DateTime.utc_now() + + old = + insert_exec(repo, %{workflow_name: "old", inserted_at: DateTime.add(now, -3_600, :second)}) + + recent = insert_exec(repo, %{workflow_name: "recent"}) + + rows = Query.list_executions(from: DateTime.add(now, -60, :second)) + ids = MapSet.new(rows, & &1.id) + + assert MapSet.member?(ids, recent.id) + refute MapSet.member?(ids, old.id) + end + end +end diff --git a/durable/test/durable/queue/adapters/postgres_test.exs b/durable/test/durable/queue/adapters/postgres_test.exs index 698a85a..c2fce64 100644 --- a/durable/test/durable/queue/adapters/postgres_test.exs +++ b/durable/test/durable/queue/adapters/postgres_test.exs @@ -484,6 +484,88 @@ defmodule Durable.Queue.Adapters.PostgresTest do end end + describe "lock fencing" do + test "fetch_jobs stamps a unique lock_token per claim, persisted on the row" do + insert_execution(workflow_name: "fence_a") + insert_execution(workflow_name: "fence_b") + + jobs = Postgres.fetch_jobs(config(), "default", 2, "node_a") + assert length(jobs) == 2 + + tokens = Enum.map(jobs, & &1.lock_token) + assert Enum.all?(tokens, &is_binary/1) + assert tokens == Enum.uniq(tokens) + + for job <- jobs do + assert repo().get!(WorkflowExecution, job.id).lock_token == job.lock_token + end + end + + test "heartbeat: matching token refreshes; stale token is :fenced; no token is legacy-ok" do + insert_execution(workflow_name: "fence_hb") + [job] = Postgres.fetch_jobs(config(), "default", 1, 
"node_a") + + assert :ok = Postgres.heartbeat(config(), job.id, job.lock_token) + assert :ok = Postgres.heartbeat(config(), job.id) + + # The row is :running with token A; a worker holding a different token has + # been superseded → :fenced so it can abort instead of double-executing. + assert {:error, :fenced} = Postgres.heartbeat(config(), job.id, Ecto.UUID.generate()) + end + + test "a finished row reports :not_found, not :fenced (no spurious abort on completion)" do + insert_execution(workflow_name: "fence_done") + [job] = Postgres.fetch_jobs(config(), "default", 1, "node_a") + + repo().get!(WorkflowExecution, job.id) + |> Ecto.Changeset.change(status: :completed) + |> repo().update!() + + assert {:error, :not_found} = Postgres.heartbeat(config(), job.id, job.lock_token) + end + + test "ack with a stale token is a no-op; the real owner's ack releases the row" do + insert_execution(workflow_name: "fence_ack") + [job] = Postgres.fetch_jobs(config(), "default", 1, "node_a") + + assert :ok = Postgres.ack(config(), job.id, Ecto.UUID.generate()) + still = repo().get!(WorkflowExecution, job.id) + assert still.status == :running + assert still.locked_by == "node_a" + + assert :ok = Postgres.ack(config(), job.id, job.lock_token) + assert repo().get!(WorkflowExecution, job.id).locked_by == nil + end + + test "nack with a stale token is a no-op" do + insert_execution(workflow_name: "fence_nack") + [job] = Postgres.fetch_jobs(config(), "default", 1, "node_a") + + assert :ok = Postgres.nack(config(), job.id, %{message: "boom"}, Ecto.UUID.generate()) + assert repo().get!(WorkflowExecution, job.id).status == :running + + assert :ok = Postgres.nack(config(), job.id, %{message: "boom"}, job.lock_token) + assert repo().get!(WorkflowExecution, job.id).status == :failed + end + + test "recover_stale_locks clears the lock_token so the fenced token can't re-match" do + insert_execution(workflow_name: "fence_recover") + [job] = Postgres.fetch_jobs(config(), "default", 1, "node_a") + 
assert is_binary(job.lock_token) + + repo().get!(WorkflowExecution, job.id) + |> Ecto.Changeset.change(locked_at: DateTime.add(DateTime.utc_now(), -600, :second)) + |> repo().update!() + + {:ok, n} = Postgres.recover_stale_locks(config(), 300) + assert n >= 1 + + recovered = repo().get!(WorkflowExecution, job.id) + assert recovered.status == :pending + assert recovered.lock_token == nil + end + end + describe "get_stats/2" do test "returns queue statistics" do insert_execution(workflow_name: "pending1", status: :pending) @@ -505,6 +587,133 @@ defmodule Durable.Queue.Adapters.PostgresTest do # Helper functions + describe "wake_sleeping_workflows/2" do + test "flips elapsed sleeps back to :pending and writes the satisfied marker" do + past = DateTime.add(DateTime.utc_now(), -60, :second) + + sleeper = + insert_execution( + workflow_name: "sleeper", + status: :waiting, + scheduled_at: past, + current_step: "wait_step", + locked_by: "stale_node", + locked_at: DateTime.add(DateTime.utc_now(), -120, :second) + ) + + {:ok, count} = Postgres.wake_sleeping_workflows(config(), 100) + assert count == 1 + + reloaded = repo().get!(WorkflowExecution, sleeper.id) + assert reloaded.status == :pending + assert reloaded.locked_by == nil + assert reloaded.locked_at == nil + assert reloaded.context["__sleep_satisfied__"] == "wait_step" + end + + test "leaves :waiting rows whose scheduled_at is still in the future" do + future = DateTime.add(DateTime.utc_now(), 3600, :second) + + sleeper = + insert_execution( + workflow_name: "future_sleeper", + status: :waiting, + scheduled_at: future, + current_step: "wait_step" + ) + + {:ok, count} = Postgres.wake_sleeping_workflows(config(), 100) + assert count == 0 + + reloaded = repo().get!(WorkflowExecution, sleeper.id) + assert reloaded.status == :waiting + end + + test "leaves :waiting rows with no scheduled_at (event/input waits)" do + waiter = + insert_execution( + workflow_name: "event_waiter", + status: :waiting, + current_step: "await" + 
) + + {:ok, count} = Postgres.wake_sleeping_workflows(config(), 100) + assert count == 0 + + reloaded = repo().get!(WorkflowExecution, waiter.id) + assert reloaded.status == :waiting + end + + test "preserves prior context keys when merging the marker" do + past = DateTime.add(DateTime.utc_now(), -60, :second) + + sleeper = + insert_execution( + workflow_name: "rich_sleeper", + status: :waiting, + scheduled_at: past, + current_step: "wait_step" + ) + + {1, _} = + repo().update_all( + from(w in WorkflowExecution, where: w.id == ^sleeper.id), + set: [context: %{"customer_email" => "alice@example.com"}] + ) + + {:ok, _count} = Postgres.wake_sleeping_workflows(config(), 100) + + reloaded = repo().get!(WorkflowExecution, sleeper.id) + assert reloaded.context["customer_email"] == "alice@example.com" + assert reloaded.context["__sleep_satisfied__"] == "wait_step" + end + + test "respects batch_size cap" do + past = DateTime.add(DateTime.utc_now(), -60, :second) + + for i <- 1..3 do + insert_execution( + workflow_name: "batched_#{i}", + status: :waiting, + scheduled_at: past, + current_step: "wait" + ) + end + + {:ok, count} = Postgres.wake_sleeping_workflows(config(), 2) + assert count == 2 + end + end + + describe "recover_zombie_workflows/2 — sleeping workflows are not zombies" do + test "leaves :waiting rows whose scheduled_at is set, even when stale" do + long_ago = DateTime.add(DateTime.utc_now(), -3600, :second) + # An ostensibly "stuck" sleeper: stale updated_at, no pending + # input/event, no lock — would be flagged a zombie by the old + # logic. Now exempt because scheduled_at is non-nil; the + # SleepWaker is the right component to deal with it. 
+ sleeper = + insert_execution( + workflow_name: "long_sleeper", + status: :waiting, + scheduled_at: DateTime.add(DateTime.utc_now(), 3600, :second), + current_step: "wait_step" + ) + + {1, _} = + repo().update_all( + from(w in WorkflowExecution, where: w.id == ^sleeper.id), + set: [updated_at: long_ago] + ) + + {:ok, count} = Postgres.recover_zombie_workflows(config(), 300) + assert count == 0 + + reloaded = repo().get!(WorkflowExecution, sleeper.id) + assert reloaded.status == :waiting + end + end + defp insert_execution(opts) do attrs = %{ workflow_module: "TestWorkflow", @@ -515,6 +724,7 @@ defmodule Durable.Queue.Adapters.PostgresTest do input: %{}, context: %{}, scheduled_at: Keyword.get(opts, :scheduled_at), + current_step: Keyword.get(opts, :current_step), locked_by: Keyword.get(opts, :locked_by), locked_at: Keyword.get(opts, :locked_at) } diff --git a/durable/test/durable/retry_test.exs b/durable/test/durable/retry_test.exs new file mode 100644 index 0000000..27736ca --- /dev/null +++ b/durable/test/durable/retry_test.exs @@ -0,0 +1,128 @@ +defmodule Durable.RetryTest do + @moduledoc """ + Retry semantics, with emphasis on the *durable* budget: a step's retry + count must survive a worker crash / stale-lock recovery rather than + restarting at attempt 1 and re-running side effects past max_attempts. 
+ """ + use Durable.DataCase, async: false + + alias Durable.Config + alias Durable.Storage.Schemas.{StepExecution, WorkflowExecution} + + import Durable.DataCase, only: [create_and_execute_workflow: 2] + import Ecto.Query + + defp failed_attempts(repo, workflow_id) do + repo.all( + from(s in StepExecution, + where: s.workflow_id == ^workflow_id and s.status == :failed, + order_by: [asc: s.attempt], + select: s.attempt + ) + ) + end + + defp seed_failed_attempt!(repo, workflow_id, step_name, attempt) do + %StepExecution{} + |> Ecto.Changeset.change(%{ + workflow_id: workflow_id, + step_name: step_name, + step_type: "step", + attempt: attempt, + status: :failed, + error: %{"type" => "error", "message" => "prior crash"} + }) + |> repo.insert!() + end + + test "the in-process retry loop still drives a flaky step to success" do + config = Config.get(Durable) + repo = config.repo + + {:ok, execution} = create_and_execute_workflow(FlakyOnceWorkflow, %{"fail_times" => 2}) + + assert execution.status == :completed + + # 2 failed attempts + 1 completed = 3 rows; attempts 1,2 failed, 3 ok. + assert failed_attempts(repo, execution.id) == [1, 2] + end + + test "retry budget is durable: resume continues at the next attempt, not 1" do + config = Config.get(Durable) + repo = config.repo + + # A run that crashed mid-retry: 2 failed attempts already recorded, and + # current_step still points at the failing step. + {:ok, execution} = + %WorkflowExecution{} + |> Ecto.Changeset.change(%{ + workflow_module: Atom.to_string(AlwaysFailsWorkflow), + workflow_name: "always_fails_wf", + status: :pending, + queue: "default", + priority: 0, + input: %{}, + context: %{}, + current_step: "always_fails" + }) + |> repo.insert() + + seed_failed_attempt!(repo, execution.id, "always_fails", 1) + seed_failed_attempt!(repo, execution.id, "always_fails", 2) + + Durable.Executor.execute_workflow(execution.id, config) + + # max_attempts is 3. 
With the budget seeded from the 2 prior failures the + # resume runs ONLY attempt 3 and then terminally fails — it does NOT + # restart at 1 and burn attempts 1,2,3 again (which the old behaviour did, + # yielding [1,2,1,2,3] and 5 total invocations). + assert failed_attempts(repo, execution.id) == [1, 2, 3] + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :failed + end +end + +defmodule FlakyOnceWorkflow do + use Durable + use Durable.Helpers + use Durable.Context + + # Fails the first `fail_times` attempts (counted in memory by + # Durable.RetryTestHelper.attempt_for/1), then succeeds — exercises the in-process loop. + workflow "flaky_once_wf" do + step(:flaky, [retry: [max_attempts: 5, backoff: :constant, base: 1]], fn data -> + fail_times = data["fail_times"] || 0 + attempt = Durable.RetryTestHelper.attempt_for(:flaky) + + if attempt <= fail_times do + {:error, %{reason: "transient", attempt: attempt}} + else + {:ok, assign(data, :done, true)} + end + end) + end +end + +defmodule AlwaysFailsWorkflow do + use Durable + use Durable.Helpers + + workflow "always_fails_wf" do + step(:always_fails, [retry: [max_attempts: 3, backoff: :constant, base: 1]], fn _data -> + {:error, %{reason: "permanent"}} + end) + end +end + +defmodule Durable.RetryTestHelper do + @moduledoc false + # Per-step in-memory attempt counter for FlakyOnceWorkflow. The workflow + # body can't see the runner's attempt number directly, so we count calls. 
+ def attempt_for(step) do + key = {__MODULE__, step} + n = (Process.get(key) || 0) + 1 + Process.put(key, n) + n + end +end diff --git a/durable/test/durable/wait/timeout_worker_integration_test.exs b/durable/test/durable/wait/timeout_worker_integration_test.exs index 57fe84a..74129d6 100644 --- a/durable/test/durable/wait/timeout_worker_integration_test.exs +++ b/durable/test/durable/wait/timeout_worker_integration_test.exs @@ -93,6 +93,55 @@ defmodule Durable.Wait.TimeoutWorkerIntegrationTest do end end + describe "call_workflow child timeout" do + test "cancels the abandoned child when the parent's call_workflow wait times out" do + config = Config.get(Durable) + repo = config.repo + + # A running child whose orchestration parent is waiting on its completion. + child = insert_exec(repo, :running) + parent = insert_exec(repo, :waiting) + + # Parent's call_workflow wait, already past its timeout. + %PendingEvent{} + |> Ecto.Changeset.change(%{ + workflow_id: parent.id, + event_name: "__child_done:#{child.id}", + step_name: "charge_customer", + status: :pending, + wait_type: :single, + timeout_at: DateTime.add(DateTime.utc_now(), -1, :second), + timeout_value: %{"__atom__" => "child_timeout"} + }) + |> repo.insert!() + + TimeoutWorker.check_timeouts(Durable) + _ = :sys.get_state(TimeoutWorker.worker_name(Durable)) + + # The abandoned child is cancelled synchronously within the sweep, so by + # the fence it must be :cancelled — previously it was left running, its + # eventual result silently dropped. + assert repo.get!(WorkflowExecution, child.id).status == :cancelled + end + end + + defp insert_exec(repo, status) do + %WorkflowExecution{} + |> Ecto.Changeset.change(%{ + # A non-resolvable module is fine: the child is never executed (only + # cancelled) and the resumed parent fails module resolution cleanly + # rather than affecting this assertion. 
+ workflow_module: "Elixir.NoSuchModuleForTimeoutTest", + workflow_name: "orphan_test", + status: status, + queue: "default", + priority: 0, + input: %{}, + context: %{} + }) + |> repo.insert!() + end + defp repo_pending_event(%Config{repo: repo}, workflow_id, event_name) do import Ecto.Query diff --git a/durable/test/durable/wait_test.exs b/durable/test/durable/wait_test.exs index 60bfb92..f32d2ef 100644 --- a/durable/test/durable/wait_test.exs +++ b/durable/test/durable/wait_test.exs @@ -20,6 +20,21 @@ defmodule Durable.WaitTest do import Ecto.Query + # The SleepWaker keys off `scheduled_at <= clock_timestamp()` (real wall + # clock). A `sleep(1)` schedules only 1ms out, which races the + # immediately-following wake in a warm full-suite run and makes these tests + # flaky (sometimes `woken == 0`). Back-dating the row's scheduled_at makes + # the elapsed condition deterministically true without changing what the + # test exercises (the wake -> resume mechanism). + defp force_sleep_elapsed!(repo, workflow_id) do + repo.update_all( + from(w in WorkflowExecution, where: w.id == ^workflow_id and w.status == :waiting), + set: [scheduled_at: DateTime.add(DateTime.utc_now(), -60, :second)] + ) + + :ok + end + # ============================================================================ # Pause Primitives Tests # ============================================================================ @@ -47,9 +62,19 @@ defmodule Durable.WaitTest do assert diff_ms >= 29_000 and diff_ms <= 31_000 end - # Note: Resume for sleep is complex as the step re-runs from beginning - # This would require tracking sleep state at the step level - # For now, we just test that sleep correctly suspends the workflow + test "clears the lock when suspending" do + config = Config.get(Durable) + repo = config.repo + + {:ok, execution} = create_and_execute_workflow(SleepTestWorkflow, %{}) + + execution = repo.get!(WorkflowExecution, execution.id) + # The waker depends on a clear lock — without this 
guarantee a + # stale `locked_by` from the worker that suspended the row would + # keep it invisible to fetch_jobs after the wake. + assert execution.locked_by == nil + assert execution.locked_at == nil + end end describe "schedule_at/1" do @@ -75,6 +100,181 @@ defmodule Durable.WaitTest do end end + # ============================================================================ + # Sleep wake-up (full lifecycle: suspend -> waker -> resume -> complete) + # ============================================================================ + + describe "sleep + waker resumption" do + alias Durable.Queue.Adapter + alias Durable.Wait.SleepWaker + + test "sweeper flips elapsed sleep back to :pending and writes marker" do + config = Config.get(Durable) + repo = config.repo + + {:ok, execution} = create_and_execute_workflow(ShortSleepWorkflow, %{}) + assert execution.status == :waiting + force_sleep_elapsed!(repo, execution.id) + + adapter = Adapter.default_adapter() + {:ok, woken} = adapter.wake_sleeping_workflows(config, 100) + assert woken == 1 + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :pending + assert execution.locked_by == nil + assert execution.locked_at == nil + assert execution.context["__sleep_satisfied__"] == "sleep_step" + end + + test "resuming after wake completes the workflow past the sleep" do + config = Config.get(Durable) + repo = config.repo + + {:ok, execution} = create_and_execute_workflow(ShortSleepWorkflow, %{}) + assert execution.status == :waiting + force_sleep_elapsed!(repo, execution.id) + + Adapter.default_adapter().wake_sleeping_workflows(config, 100) + + # Drive the resumed workflow synchronously, the way the queue + # poller would after fetch_jobs claims the now-:pending row. 
+ Durable.Executor.execute_workflow(execution.id, config) + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :completed + assert execution.context["before"] == true + assert execution.context["slept"] == true + assert execution.context["completed"] == true + end + + test "schedule_at resumes the same way as sleep" do + config = Config.get(Durable) + repo = config.repo + + {:ok, execution} = create_and_execute_workflow(ShortScheduleAtWorkflow, %{}) + assert execution.status == :waiting + + Adapter.default_adapter().wake_sleeping_workflows(config, 100) + Durable.Executor.execute_workflow(execution.id, config) + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :completed + assert execution.context["scheduled"] == true + end + + test "two sequential sleep steps both wake correctly" do + config = Config.get(Durable) + repo = config.repo + + {:ok, execution} = create_and_execute_workflow(TwoSleepWorkflow, %{}) + assert execution.status == :waiting + force_sleep_elapsed!(repo, execution.id) + + Adapter.default_adapter().wake_sleeping_workflows(config, 100) + Durable.Executor.execute_workflow(execution.id, config) + execution = repo.get!(WorkflowExecution, execution.id) + # Suspended again at the second sleep. The first sleep's marker was + # dropped the moment :first_sleep completed (one-shot), so it does not + # linger into the second suspension. 
+ assert execution.status == :waiting + assert execution.context["first_done"] == true + assert execution.context["__sleep_satisfied__"] == nil + + force_sleep_elapsed!(repo, execution.id) + Adapter.default_adapter().wake_sleeping_workflows(config, 100) + Durable.Executor.execute_workflow(execution.id, config) + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :completed + assert execution.context["second_done"] == true + # One-shot marker: once its consuming step completes the marker is + # removed, so the finished workflow carries no `__sleep_satisfied__`. + # (A lingering marker would make a loop/`each` re-entry of the same + # sleep step skip its sleep.) + assert execution.context["__sleep_satisfied__"] == nil + end + + test "the sweep is a no-op when no rows are eligible" do + config = Config.get(Durable) + adapter = Adapter.default_adapter() + + assert {:ok, 0} = adapter.wake_sleeping_workflows(config, 100) + end + + test "a sleep followed by an event-wait is NOT prematurely re-woken (scheduled_at cleared)" do + # Regression for the critical bug: scheduled_at was never cleared once a + # sleep woke, so when the *next* step suspended on an event the stale + # scheduled_at matched the SleepWaker query and the row was flipped + # :waiting -> :pending every tick, churning status and piling up + # duplicate PendingEvent rows. + config = Config.get(Durable) + repo = config.repo + adapter = Adapter.default_adapter() + + {:ok, execution} = create_and_execute_workflow(SleepThenEventWorkflow, %{}) + assert execution.status == :waiting + assert execution.scheduled_at != nil + force_sleep_elapsed!(repo, execution.id) + + # Wake the sleep and resume — the workflow runs :nap, then suspends on + # the "go" event in :await. 
+ assert {:ok, 1} = adapter.wake_sleeping_workflows(config, 100) + Durable.Executor.execute_workflow(execution.id, config) + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :waiting + assert execution.context["napped"] == true + # The event-wait suspension must clear scheduled_at so the SleepWaker + # leaves the row alone. + assert execution.scheduled_at == nil + + pending_count = fn -> + repo.aggregate( + from(p in PendingEvent, where: p.workflow_id == ^execution.id and p.status == :pending), + :count + ) + end + + assert pending_count.() == 1 + + # The waker must now be a no-op for this row, no matter how many ticks + # fire. Before the fix each call woke it and duplicated the pending event. + assert {:ok, 0} = adapter.wake_sleeping_workflows(config, 100) + assert {:ok, 0} = adapter.wake_sleeping_workflows(config, 100) + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :waiting + assert pending_count.() == 1 + + # And the event still resumes the workflow to completion. + :ok = Wait.send_event(execution.id, "go", %{ok: true}) + Durable.Executor.execute_workflow(execution.id, config) + + execution = repo.get!(WorkflowExecution, execution.id) + assert execution.status == :completed + assert execution.context["result"]["ok"] == true + end + + test "wake_now/1 dispatches via the named SleepWaker" do + # The DataCase default boots Durable with `queue_enabled: false`, so the + # SleepWaker isn't running. Start one manually and verify wake_now/1 + # routes to the *named* GenServer and returns a sweep result. The actual + # waking of an elapsed row is covered deterministically by the + # adapter-level tests above (which call wake_sleeping_workflows directly + # on the test connection); this test isolates the named-dispatch path, + # which would otherwise be entangled with cross-process sandbox + # visibility under the shared-mode harness. 
+ config = Config.get(Durable) + pid = start_supervised!({SleepWaker, config: config, interval: 60_000}) + assert is_pid(pid) + assert Process.whereis(SleepWaker.worker_name(Durable)) == pid + + assert {:ok, count} = SleepWaker.wake_now(Durable) + assert is_integer(count) and count >= 0 + end + end + # ============================================================================ # Event Waiting Tests # ============================================================================ @@ -746,9 +946,19 @@ defmodule Durable.WaitTest do wait_group = get_wait_group(repo, execution.id) assert wait_group.status == :completed - # Trying to send second event should fail - either :not_found or :not_waiting - result = Wait.send_event(execution.id, "failure", %{"status" => "error"}) - assert result in [{:error, :not_found}, {:error, :not_waiting}] + # A late event for a wait group that has already :completed is a + # silent no-op — the wait_group's received_events does NOT grow, + # and the workflow does not resume a second time. This is the + # intentional contract: late arrivals are idempotent rather than + # error-returning, because non-atomic implementations of + # send_event used to leak inconsistent states (event :received + # but wait group not updated) and tests can't rely on the error + # type that previously surfaced. + :ok = Wait.send_event(execution.id, "failure", %{"status" => "error"}) + + wait_group = get_wait_group(repo, execution.id) + assert wait_group.status == :completed + refute Map.has_key?(wait_group.received_events, "failure") end end @@ -1000,6 +1210,81 @@ defmodule ScheduleAtTestWorkflow do end end +# Short-duration workflows for the wake-up integration tests. The 1ms +# sleep / past-DateTime is essentially "wake immediately on next sweep," +# which is what the tests need to drive the full lifecycle without +# slowing the suite down. 
+ +defmodule ShortSleepWorkflow do + use Durable + use Durable.Helpers + use Durable.Wait + + workflow "short_sleep" do + step(:before_step, fn data -> {:ok, assign(data, :before, true)} end) + + step(:sleep_step, fn data -> + sleep(1) + {:ok, assign(data, :slept, true)} + end) + + step(:after_step, fn data -> {:ok, assign(data, :completed, true)} end) + end +end + +defmodule ShortScheduleAtWorkflow do + use Durable + use Durable.Helpers + use Durable.Wait + + workflow "short_schedule_at" do + step(:schedule_step, fn data -> + schedule_at(DateTime.add(DateTime.utc_now(), -1, :second)) + {:ok, assign(data, :scheduled, true)} + end) + end +end + +defmodule TwoSleepWorkflow do + use Durable + use Durable.Helpers + use Durable.Wait + + workflow "two_sleep" do + step(:first_sleep, fn data -> + sleep(1) + {:ok, assign(data, :first_done, true)} + end) + + step(:second_sleep, fn data -> + sleep(1) + {:ok, assign(data, :second_done, true)} + end) + end +end + +defmodule SleepThenEventWorkflow do + use Durable + use Durable.Helpers + use Durable.Wait + + # A sleep step immediately followed by an event-wait step. This is the + # shape that exposed the `scheduled_at` premature-wake bug: after the sleep + # woke and completed, the row's stale scheduled_at made the SleepWaker + # repeatedly flip the event-waiting row back to :pending. 
+ workflow "sleep_then_event" do + step(:nap, fn data -> + sleep(1) + {:ok, assign(data, :napped, true)} + end) + + step(:await, fn data -> + result = wait_for_event("go") + {:ok, assign(data, :result, result)} + end) + end +end + defmodule EventWaitTestWorkflow do use Durable use Durable.Helpers diff --git a/durable/test/test_helper.exs b/durable/test/test_helper.exs index c76980f..aa65500 100644 --- a/durable/test/test_helper.exs +++ b/durable/test/test_helper.exs @@ -1,6 +1,11 @@ # Start the test repo {:ok, _} = Durable.TestRepo.start_link() -ExUnit.start() +# `:integration` tests use REAL (committed) connections via +# `Sandbox.unboxed_run/2` to exercise genuine multi-backend concurrency +# (FOR UPDATE SKIP LOCKED, FOR UPDATE row locks) that the shared SQL Sandbox +# structurally cannot. They truncate tables, so they must run in isolation — +# excluded from the default suite, run with `mix test --only integration`. +ExUnit.start(exclude: [:integration]) Ecto.Adapters.SQL.Sandbox.mode(Durable.TestRepo, :manual) diff --git a/durable_dashboard/DESIGN.md b/durable_dashboard/DESIGN.md index 7c19316..96ebd8d 100644 --- a/durable_dashboard/DESIGN.md +++ b/durable_dashboard/DESIGN.md @@ -260,7 +260,19 @@ component. ### `<.code>` -Inline code chip — IDs, JSON snippets, durations, queue names. +Inline code chip — IDs, JSON snippets, durations, queue names. Pass +`copy={"the text"}` to append a hover-revealed `<.copy_button>` (used on the +execution id, settings config values). + +### `<.copy_button>` + +A small ghost clipboard button that copies `text` to the clipboard and flashes +a check (clipboard ⇄ check via `.copy-btn[data-copied]` CSS). It carries +`data-copy` and is driven by **one delegated listener** (`hooks/copy.ts`, wired +in `main.ts`) — no `phx-hook`/`id` per button. Reveal it on hover of its host +(`opacity-0 group-hover/…:opacity-100`), never always-on (DENSITY §8). 
Already +baked into `<.json>` (top-right of every JSON block) and `<.code copy={…}>`; +drop it beside any other copy-worthy value rather than hand-rolling a button. ### `<.kbd>` @@ -268,8 +280,40 @@ Keyboard hint inside command palette / tooltips. ### `<.relative_time at={dt}>` -Humanized "2m ago" with absolute ISO in `title`. Always use this; never -hand-format times. +Humanized "2m ago" (past) or "in 2h" (future) for durations. Always use this; +never hand-format times. It is sign-aware — a future timestamp (e.g. a +schedule's next run) reads "in …", never "just now". The hover tooltip is +localized to the viewer's timezone by the same `local_time.ts` hook (it carries +`data-rel` so only the title is rewritten; the relative text stays). Use +`local_time` instead when the user needs the exact moment shown inline. + +### `<.local_time at={dt} format="time|datetime|date">` + +For **absolute** timestamps the user reads. The server only knows UTC, so a +raw ISO string is ambiguous; this emits a `