From de410f44bfe5b2a15a696956a50f6abc202ced75 Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Thu, 4 Jun 2026 16:32:25 -0700 Subject: [PATCH 1/5] feat(stovepipe): add change ingestion hooks and push filtering Introduce ChangeInfo, ChangeIngester/ChangeHandler, a no-op logging handler stub, and URI watch filtering for downstream ingestion. --- stovepipe/core/filter/filter.go | 41 +++++++++++++++++ stovepipe/entity/entity.go | 37 ++++++++++++++- stovepipe/extension/changeingester/logging.go | 46 +++++++++++++++++++ stovepipe/extension/extension.go | 18 ++++++++ 4 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 stovepipe/core/filter/filter.go create mode 100644 stovepipe/extension/changeingester/logging.go diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go new file mode 100644 index 00000000..356f6d56 --- /dev/null +++ b/stovepipe/core/filter/filter.go @@ -0,0 +1,41 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package filter implements a filter for commit events. +package filter + +import ( + "strings" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// Config controls which VCS URIs are watched. +// WatchedURIPrefixes is a list of URI prefixes to match against ChangeInfo.URI. +// Example: "git://github.com/uber/go-code/refs/heads/main" +// watches all commits on the main branch of uber/go-code. +type Config struct { + WatchedURIPrefixes []string +} + +// ShouldProcess returns true if the commit event's URI matches +// any of the configured watched prefixes. +func ShouldProcess(cfg Config, event entity.ChangeInfo) bool { + for _, prefix := range cfg.WatchedURIPrefixes { + if strings.HasPrefix(event.URI, prefix) { + return true + } + } + return false +} diff --git a/stovepipe/entity/entity.go b/stovepipe/entity/entity.go index 15ac8680..80c625df 100644 --- a/stovepipe/entity/entity.go +++ b/stovepipe/entity/entity.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -12,5 +12,38 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package entity holds Stovepipe-specific domain types (distinct from shared repo entity/). +// Package entity holds Stovepipe-specific domain entities. package entity + +// ChangeInfo represents a new change detected on a VCS remote. +// It is intentionally VCS-agnostic: the URI scheme carries the +// provider identity, mirroring the github:// scheme used in +// SubmitQueue's ChangeInfo. +// +// URI format: "git://///" +// Example: "git://github.com/uber/go-code/refs/heads/main/c3a4d5e6f789..." +// +// Fields are immutable after construction. +type ChangeInfo struct { + // URI is the canonical VCS identifier for this change. + // Scheme is "git://"; path encodes host, repo, ref, and new revision. + // This mirrors the ChangeInfo.URI pattern used in SubmitQueue. + URI string `json:"uri"` + + // PreviousURI is the URI of the prior revision on the same ref, if known. + // Empty string if unavailable. + // Example: "git://github.com/uber/go-code/refs/heads/main/aabbccdd..." + PreviousURI string `json:"previous_uri,omitempty"` + + // Author is the identity of the person who authored the change. + Author Author `json:"author"` +} + +// Author identifies the person who authored a change. +// Mirrors SubmitQueue's Author to keep the two domains consistent. +type Author struct { + // Name is the display name of the author. + Name string `json:"name"` + // Email is the email address of the author. + Email string `json:"email,omitempty"` +} diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go new file mode 100644 index 00000000..e3c50e57 --- /dev/null +++ b/stovepipe/extension/changeingester/logging.go @@ -0,0 +1,46 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package changeingester provides ChangeIngester implementations. +package changeingester + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension" + "go.uber.org/zap" +) + +// LoggingHandler is a stub ChangeHandler that logs received changes. +// Replace with real persistence logic once DB schema is ready. +type LoggingHandler struct { + logger *zap.Logger +} + +// New constructs a new LoggingHandler. +// The return type enforces interface compliance at compile time. +func New(logger *zap.Logger) extension.ChangeHandler { + return LoggingHandler{logger: logger} +} + +func (h LoggingHandler) IngestChange(ctx context.Context, info entity.ChangeInfo) error { + h.logger.Info("ingested change", + zap.String("uri", info.URI), + zap.String("previous_uri", info.PreviousURI), + zap.String("author", info.Author.Name), + zap.String("author_email", info.Author.Email), + ) + return nil +} diff --git a/stovepipe/extension/extension.go b/stovepipe/extension/extension.go index 92d4bcea..5a891faf 100644 --- a/stovepipe/extension/extension.go +++ b/stovepipe/extension/extension.go @@ -14,3 +14,21 @@ // Package extension holds Stovepipe-specific extension implementations. package extension + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// ChangeIngester subscribes to change events from a VCS source +// and dispatches them for processing. The source and VCS are +// implementation details left to the injected backend. +type ChangeIngester interface { + Start(ctx context.Context) error +} + +// ChangeHandler processes a single change received from the ingester. +type ChangeHandler interface { + IngestChange(ctx context.Context, info entity.ChangeInfo) error +} From bbc3a5cd8ecee1885c7b9ea43a3221196db63e42 Mon Sep 17 00:00:00 2001 From: gsturges <59843992+gsturges@users.noreply.github.com> Date: Mon, 8 Jun 2026 10:19:26 -0700 Subject: [PATCH 2/5] Apply suggestion from @behinddwalls Co-authored-by: Preetam Dwivedi --- stovepipe/core/filter/filter.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go index 356f6d56..31e72513 100644 --- a/stovepipe/core/filter/filter.go +++ b/stovepipe/core/filter/filter.go @@ -25,14 +25,8 @@ import ( // WatchedURIPrefixes is a list of URI prefixes to match against ChangeInfo.URI. // Example: "git://github.com/uber/go-code/refs/heads/main" // watches all commits on the main branch of uber/go-code. -type Config struct { - WatchedURIPrefixes []string -} - -// ShouldProcess returns true if the commit event's URI matches -// any of the configured watched prefixes. -func ShouldProcess(cfg Config, event entity.ChangeInfo) bool { - for _, prefix := range cfg.WatchedURIPrefixes { +func ShouldProcess(event entity.ChangeInfo, watchedPrefixes []string) bool { + for _, prefix := range watchedPrefixes { if strings.HasPrefix(event.URI, prefix) { return true } From 8c8aa2a1b1a7734a04c83448f7dcd8bbad759df0 Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Tue, 9 Jun 2026 11:08:15 -0700 Subject: [PATCH 3/5] feat(stovepipe): scaffold pipeline entities and storage extension Add Commit and Batch as the core domain entities the validation pipeline operates on, each with status types and optimistic-locking Version fields. Add CommitStore and BatchStore extension interfaces scoped to their respective service owners (gateway and orchestrator). Fix missing BUILD.bazel files for the filter and changeingester packages. Flatten ChangeInfo.Author into AuthorName/AuthorEmail fields directly. --- stovepipe/core/filter/BUILD.bazel | 9 +++ stovepipe/entity/BUILD.bazel | 3 +- stovepipe/entity/batch.go | 63 +++++++++++++++++++ stovepipe/entity/commit.go | 57 +++++++++++++++++ stovepipe/entity/entity.go | 16 ++--- stovepipe/extension/BUILD.bazel | 1 + .../extension/changeingester/BUILD.bazel | 13 ++++ stovepipe/extension/changeingester/logging.go | 4 +- stovepipe/extension/storage/BUILD.bazel | 13 ++++ stovepipe/extension/storage/batch_store.go | 43 +++++++++++++ stovepipe/extension/storage/commit_store.go | 48 ++++++++++++++ stovepipe/extension/storage/storage.go | 44 +++++++++++++ 12 files changed, 299 insertions(+), 15 deletions(-) create mode 100644 stovepipe/core/filter/BUILD.bazel create mode 100644 stovepipe/entity/batch.go create mode 100644 stovepipe/entity/commit.go create mode 100644 stovepipe/extension/changeingester/BUILD.bazel create mode 100644 stovepipe/extension/storage/BUILD.bazel create mode 100644 stovepipe/extension/storage/batch_store.go create mode 100644 stovepipe/extension/storage/commit_store.go create mode 100644 stovepipe/extension/storage/storage.go diff --git a/stovepipe/core/filter/BUILD.bazel b/stovepipe/core/filter/BUILD.bazel new file mode 100644 index 00000000..454754b5 --- /dev/null +++ b/stovepipe/core/filter/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "filter", + srcs = ["filter.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/filter", + visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], +) diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index a0c70059..cda29058 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -3,8 +3,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "entity", srcs = [ + "batch.go", + "commit.go", "entity.go", - "ingest_request.go", ], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], diff --git a/stovepipe/entity/batch.go b/stovepipe/entity/batch.go new file mode 100644 index 00000000..feb580c4 --- /dev/null +++ b/stovepipe/entity/batch.go @@ -0,0 +1,63 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// BatchStatus is the state of a validation batch as it moves through the pipeline. +type BatchStatus string + +const ( + // BatchStatusUnknown is the unreachable default. It should never be seen in the system. + BatchStatusUnknown BatchStatus = "" + // BatchStatusPending means the batch has been created but speculate has not yet run. + BatchStatusPending BatchStatus = "pending" + // BatchStatusBuilding means the batch is moving through the speculate→build→buildsignal→bisect cycle. + BatchStatusBuilding BatchStatus = "building" + // BatchStatusSucceeded means all commits in the batch's range have been validated green. + BatchStatusSucceeded BatchStatus = "succeeded" + // BatchStatusFailed means an offending commit in the range has been isolated and marked failed. + BatchStatusFailed BatchStatus = "failed" +) + +// IsBatchStatusTerminal returns true if the batch has reached a final state. +func IsBatchStatusTerminal(s BatchStatus) bool { + return s == BatchStatusSucceeded || s == BatchStatusFailed +} + +// Batch is a contiguous range of trunk commits submitted for validation together. +// The range spans from FromSHA (oldest, inclusive) to ToSHA (newest, inclusive) +// and represents all commits since the last known green on the branch. +// Bisection creates sub-range batches from the same type — there is no separate +// bisection entity; the state of the search lives in the ordinary batch results. +type Batch struct { + // ID is the unique identifier for this batch. + ID string + // FromSHA is the oldest commit SHA in the validation range (inclusive). + FromSHA string + // ToSHA is the newest commit SHA in the validation range (inclusive). + ToSHA string + // Repository is the repository this batch validates. + Repository string + // Branch is the branch this batch validates. + Branch string + // Status is the current state of this batch. + Status BatchStatus + // Version is incremented on each update and used for optimistic locking. + // Version arithmetic lives in the controller; the store performs a pure conditional write. + Version int32 + // CreatedAt is the time this batch was created, in milliseconds since epoch. + CreatedAt int64 + // UpdatedAt is the time this batch was last updated, in milliseconds since epoch. + UpdatedAt int64 +} diff --git a/stovepipe/entity/commit.go b/stovepipe/entity/commit.go new file mode 100644 index 00000000..b5850f86 --- /dev/null +++ b/stovepipe/entity/commit.go @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// CommitStatus is the validation state of a trunk commit as determined by Stovepipe. +type CommitStatus string + +const ( + // CommitStatusUnknown is the default state when a commit is first ingested. + // The commit has landed on main but has not yet been validated. + CommitStatusUnknown CommitStatus = "" + // CommitStatusSucceeded means the relevant targets build and test successfully at this commit. + CommitStatusSucceeded CommitStatus = "succeeded" + // CommitStatusFailed means a target is broken at this commit; it is the offending change. + CommitStatusFailed CommitStatus = "failed" +) + +// IsCommitStatusTerminal returns true if the status is a final, irreversible state. +func IsCommitStatusTerminal(s CommitStatus) bool { + return s == CommitStatusSucceeded || s == CommitStatusFailed +} + +// Commit is a trunk commit tracked by Stovepipe. The SHA scoped by Repository and +// Branch is the natural identity and dedup key: a commit announced by both a webhook +// and a poll backfill resolves to the same record and is processed once. +type Commit struct { + // SHA is the full commit hash. Identity key; immutable after creation. + SHA string + // Repository is the repository URI (e.g. "github.com/uber/go-code"). + Repository string + // Branch is the target branch (e.g. "main"). + Branch string + // CommitterTimeMs is the committer timestamp in milliseconds since epoch. + // Used to order commits within a range and to establish the trunk sequence. + CommitterTimeMs int64 + // Status is the current validation state of this commit. + Status CommitStatus + // Version is incremented on each update and used for optimistic locking. + // Version arithmetic lives in the controller; the store performs a pure conditional write. + Version int32 + // CreatedAt is the time this commit was first recorded, in milliseconds since epoch. + CreatedAt int64 + // UpdatedAt is the time this commit was last updated, in milliseconds since epoch. + UpdatedAt int64 +} diff --git a/stovepipe/entity/entity.go b/stovepipe/entity/entity.go index 80c625df..a7136aa3 100644 --- a/stovepipe/entity/entity.go +++ b/stovepipe/entity/entity.go @@ -27,7 +27,6 @@ package entity type ChangeInfo struct { // URI is the canonical VCS identifier for this change. // Scheme is "git://"; path encodes host, repo, ref, and new revision. - // This mirrors the ChangeInfo.URI pattern used in SubmitQueue. URI string `json:"uri"` // PreviousURI is the URI of the prior revision on the same ref, if known. @@ -35,15 +34,8 @@ type ChangeInfo struct { // Example: "git://github.com/uber/go-code/refs/heads/main/aabbccdd..." PreviousURI string `json:"previous_uri,omitempty"` - // Author is the identity of the person who authored the change. - Author Author `json:"author"` -} - -// Author identifies the person who authored a change. -// Mirrors SubmitQueue's Author to keep the two domains consistent. -type Author struct { - // Name is the display name of the author. - Name string `json:"name"` - // Email is the email address of the author. - Email string `json:"email,omitempty"` + // AuthorName is the display name of the person who authored the change. + AuthorName string `json:"author_name,omitempty"` + // AuthorEmail is the email address of the author. + AuthorEmail string `json:"author_email,omitempty"` } diff --git a/stovepipe/extension/BUILD.bazel b/stovepipe/extension/BUILD.bazel index bbf4f83b..ac032b39 100644 --- a/stovepipe/extension/BUILD.bazel +++ b/stovepipe/extension/BUILD.bazel @@ -5,4 +5,5 @@ go_library( srcs = ["extension.go"], importpath = "github.com/uber/submitqueue/stovepipe/extension", visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], ) diff --git a/stovepipe/extension/changeingester/BUILD.bazel b/stovepipe/extension/changeingester/BUILD.bazel new file mode 100644 index 00000000..4575d4b3 --- /dev/null +++ b/stovepipe/extension/changeingester/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "changeingester", + srcs = ["logging.go"], + importpath = "github.com/uber/submitqueue/stovepipe/extension/changeingester", + visibility = ["//visibility:public"], + deps = [ + "//stovepipe/entity", + "//stovepipe/extension", + "@org_uber_go_zap//:zap", + ], +) diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go index e3c50e57..69e6dc2f 100644 --- a/stovepipe/extension/changeingester/logging.go +++ b/stovepipe/extension/changeingester/logging.go @@ -39,8 +39,8 @@ func (h LoggingHandler) IngestChange(ctx context.Context, info entity.ChangeInfo h.logger.Info("ingested change", zap.String("uri", info.URI), zap.String("previous_uri", info.PreviousURI), - zap.String("author", info.Author.Name), - zap.String("author_email", info.Author.Email), + zap.String("author_name", info.AuthorName), + zap.String("author_email", info.AuthorEmail), ) return nil } diff --git a/stovepipe/extension/storage/BUILD.bazel b/stovepipe/extension/storage/BUILD.bazel new file mode 100644 index 00000000..d06b7577 --- /dev/null +++ b/stovepipe/extension/storage/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "storage", + srcs = [ + "batch_store.go", + "commit_store.go", + "storage.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/extension/storage", + visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], +) diff --git a/stovepipe/extension/storage/batch_store.go b/stovepipe/extension/storage/batch_store.go new file mode 100644 index 00000000..316d7a59 --- /dev/null +++ b/stovepipe/extension/storage/batch_store.go @@ -0,0 +1,43 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=batch_store.go -destination=mock/batch_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// BatchStore is the orchestrator-owned store for in-flight validation batches. +// A batch represents a contiguous range of trunk commits under validation. +// Bisection reuses the same type — sub-range probes are ordinary batches +// driven through the same speculate→build→buildsignal→bisect loop. +type BatchStore interface { + // Get retrieves a batch by ID. Returns ErrNotFound if no record exists. + Get(ctx context.Context, id string) (entity.Batch, error) + + // Create records a new batch with status BatchStatusPending. + // Returns ErrAlreadyExists if a batch with the same ID already exists. + Create(ctx context.Context, batch entity.Batch) error + + // UpdateStatus updates the batch's status and advances the version from + // oldVersion to newVersion. Returns ErrVersionMismatch if the current + // persisted version does not match oldVersion; the caller must re-read and retry. + // Version arithmetic is owned by the caller (controller); the store performs + // a pure conditional write. + UpdateStatus(ctx context.Context, id string, oldVersion, newVersion int32, status entity.BatchStatus) error +} diff --git a/stovepipe/extension/storage/commit_store.go b/stovepipe/extension/storage/commit_store.go new file mode 100644 index 00000000..579b9c6a --- /dev/null +++ b/stovepipe/extension/storage/commit_store.go @@ -0,0 +1,48 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=commit_store.go -destination=mock/commit_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// CommitStore is the gateway-owned store for trunk commit health state. +// It is the authoritative record of each commit's validation status and is the +// only storage the gateway's GetStatus RPC reads from. +// +// The (Repository, Branch, SHA) triple is the identity key and dedup handle: +// a commit announced by both a webhook and a poll backfill resolves to the same +// row and is processed once. +type CommitStore interface { + // Get retrieves a commit by its (repository, branch, sha) identity key. + // Returns ErrNotFound if no record exists. + Get(ctx context.Context, repository, branch, sha string) (entity.Commit, error) + + // Create records a new commit. The status must be CommitStatusUnknown. + // Returns ErrAlreadyExists if a commit with the same identity already exists; + // callers treat this as a successful dedup, not a failure. + Create(ctx context.Context, commit entity.Commit) error + + // UpdateStatus updates the commit's status and advances the version from + // oldVersion to newVersion. Returns ErrVersionMismatch if the current + // persisted version does not match oldVersion; the caller must re-read and retry. + // Version arithmetic is owned by the caller (controller); the store performs + // a pure conditional write. + UpdateStatus(ctx context.Context, repository, branch, sha string, oldVersion, newVersion int32, status entity.CommitStatus) error +} diff --git a/stovepipe/extension/storage/storage.go b/stovepipe/extension/storage/storage.go new file mode 100644 index 00000000..be4b980a --- /dev/null +++ b/stovepipe/extension/storage/storage.go @@ -0,0 +1,44 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package storage defines the storage extension interfaces for the Stovepipe domain. +// The gateway owns the CommitStore (commit-status store and event log). +// The orchestrator owns the BatchStore (in-flight batches and pipeline working state). +// The two services share no storage; they communicate only through the messaging queue. +package storage + +import ( + "errors" + "fmt" +) + +// ErrNotFound is returned by storage implementations when the requested record is not found. +var ErrNotFound = errors.New("record not found") + +// IsNotFound returns true if any error in the chain is ErrNotFound. +func IsNotFound(err error) bool { + return errors.Is(err, ErrNotFound) +} + +// WrapNotFound wraps ErrNotFound with the underlying implementation error. +func WrapNotFound(err error) error { + return fmt.Errorf("%w: %w", ErrNotFound, err) +} + +// ErrAlreadyExists is returned when attempting to create a record whose identity already exists. +var ErrAlreadyExists = errors.New("record already exists") + +// ErrVersionMismatch is returned when an optimistic-locking CAS write finds the persisted +// version does not match the expected version. Callers should retry from a fresh read. +var ErrVersionMismatch = errors.New("version mismatch") From 4205299fcbea3726add057027d37897fcc88c4a2 Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Tue, 9 Jun 2026 17:34:44 -0700 Subject: [PATCH 4/5] refactor(stovepipe): scope entities and extensions to gateway only Replace ChangeInfo with ChangeEvent as the ingestion payload type and introduce ChangeURI as the lightweight pipeline reference. Update Commit entity and create CommitStatusentity. Update ChangeHandler and the filter to accept ChangeEvent. Remove the storage extension and Batch entity, which belong to the orchestrator. --- stovepipe/core/filter/filter.go | 4 +- stovepipe/entity/BUILD.bazel | 17 +---- stovepipe/entity/batch.go | 63 ------------------ stovepipe/entity/change_event.go | 59 +++++++++++++++++ stovepipe/entity/change_uri.go | 36 +++++++++++ stovepipe/entity/commit.go | 64 ++++++++++--------- stovepipe/entity/entity.go | 41 ------------ stovepipe/extension/changeingester/logging.go | 9 ++- stovepipe/extension/extension.go | 2 +- stovepipe/extension/storage/BUILD.bazel | 13 ---- stovepipe/extension/storage/batch_store.go | 43 ------------- stovepipe/extension/storage/commit_store.go | 48 -------------- stovepipe/extension/storage/storage.go | 44 ------------- 13 files changed, 140 insertions(+), 303 deletions(-) delete mode 100644 stovepipe/entity/batch.go create mode 100644 stovepipe/entity/change_event.go create mode 100644 stovepipe/entity/change_uri.go delete mode 100644 stovepipe/entity/entity.go delete mode 100644 stovepipe/extension/storage/BUILD.bazel delete mode 100644 stovepipe/extension/storage/batch_store.go delete mode 100644 stovepipe/extension/storage/commit_store.go delete mode 100644 stovepipe/extension/storage/storage.go diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go index 31e72513..4130bb5a 100644 --- a/stovepipe/core/filter/filter.go +++ b/stovepipe/core/filter/filter.go @@ -22,10 +22,10 @@ import ( ) // Config controls which VCS URIs are watched. -// WatchedURIPrefixes is a list of URI prefixes to match against ChangeInfo.URI. +// WatchedURIPrefixes is a list of URI prefixes to match against ChangeEvent.URI. // Example: "git://github.com/uber/go-code/refs/heads/main" // watches all commits on the main branch of uber/go-code. -func ShouldProcess(event entity.ChangeInfo, watchedPrefixes []string) bool { +func ShouldProcess(event entity.ChangeEvent, watchedPrefixes []string) bool { for _, prefix := range watchedPrefixes { if strings.HasPrefix(event.URI, prefix) { return true diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index cda29058..17ed12c7 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -3,22 +3,11 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "entity", srcs = [ - "batch.go", + "change_event.go", + "change_uri.go", "commit.go", - "entity.go", ], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], - deps = ["//platform/base/change"], -) - -go_test( - name = "entity_test", - srcs = ["ingest_request_test.go"], - embed = [":entity"], - deps = [ - "//platform/base/change", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - ], + deps = ["//platform/base/change/git"], ) diff --git a/stovepipe/entity/batch.go b/stovepipe/entity/batch.go deleted file mode 100644 index feb580c4..00000000 --- a/stovepipe/entity/batch.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -// BatchStatus is the state of a validation batch as it moves through the pipeline. -type BatchStatus string - -const ( - // BatchStatusUnknown is the unreachable default. It should never be seen in the system. - BatchStatusUnknown BatchStatus = "" - // BatchStatusPending means the batch has been created but speculate has not yet run. - BatchStatusPending BatchStatus = "pending" - // BatchStatusBuilding means the batch is moving through the speculate→build→buildsignal→bisect cycle. - BatchStatusBuilding BatchStatus = "building" - // BatchStatusSucceeded means all commits in the batch's range have been validated green. - BatchStatusSucceeded BatchStatus = "succeeded" - // BatchStatusFailed means an offending commit in the range has been isolated and marked failed. - BatchStatusFailed BatchStatus = "failed" -) - -// IsBatchStatusTerminal returns true if the batch has reached a final state. -func IsBatchStatusTerminal(s BatchStatus) bool { - return s == BatchStatusSucceeded || s == BatchStatusFailed -} - -// Batch is a contiguous range of trunk commits submitted for validation together. -// The range spans from FromSHA (oldest, inclusive) to ToSHA (newest, inclusive) -// and represents all commits since the last known green on the branch. -// Bisection creates sub-range batches from the same type — there is no separate -// bisection entity; the state of the search lives in the ordinary batch results. -type Batch struct { - // ID is the unique identifier for this batch. - ID string - // FromSHA is the oldest commit SHA in the validation range (inclusive). - FromSHA string - // ToSHA is the newest commit SHA in the validation range (inclusive). - ToSHA string - // Repository is the repository this batch validates. - Repository string - // Branch is the branch this batch validates. - Branch string - // Status is the current state of this batch. - Status BatchStatus - // Version is incremented on each update and used for optimistic locking. - // Version arithmetic lives in the controller; the store performs a pure conditional write. - Version int32 - // CreatedAt is the time this batch was created, in milliseconds since epoch. - CreatedAt int64 - // UpdatedAt is the time this batch was last updated, in milliseconds since epoch. - UpdatedAt int64 -} diff --git a/stovepipe/entity/change_event.go b/stovepipe/entity/change_event.go new file mode 100644 index 00000000..d410aa20 --- /dev/null +++ b/stovepipe/entity/change_event.go @@ -0,0 +1,59 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import ( + "encoding/json" + "fmt" + + entitygit "github.com/uber/submitqueue/platform/base/change/git" +) + +// ChangeEvent represents a single new trunk change entering the pipeline, published to the +// start topic. A trunk change is one commit, so the event carries one git-backed URI. It is +// source-agnostic: both the webhook and the reconciliation poller emit it. Additional fields +// (e.g. source, committer time) can be added later as ingestion needs them. +type ChangeEvent struct { + // URI identifies the commit that entered the pipeline (git://owner/repo/branch/revision). + URI string `json:"uri"` +} + +// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload. +func (e ChangeEvent) ToBytes() ([]byte, error) { + return json.Marshal(e) +} + +// Validate checks that the change event carries a valid git-backed commit URI. +func (e ChangeEvent) Validate() error { + if e.URI == "" { + return fmt.Errorf("change event requires a commit URI") + } + if _, err := entitygit.ParseChangeID(e.URI); err != nil { + return fmt.Errorf("change event URI: %w", err) + } + return nil +} + +// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes. +func ChangeEventFromBytes(data []byte) (ChangeEvent, error) { + var event ChangeEvent + if err := json.Unmarshal(data, &event); err != nil { + return ChangeEvent{}, err + } + if err := event.Validate(); err != nil { + return ChangeEvent{}, err + } + return event, nil +} diff --git a/stovepipe/entity/change_uri.go b/stovepipe/entity/change_uri.go new file mode 100644 index 00000000..7abc45f3 --- /dev/null +++ b/stovepipe/entity/change_uri.go @@ -0,0 +1,36 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import "encoding/json" + +// ChangeURI is the lightweight reference passed between pipeline stages. It +// carries only the change identity; stages re-resolve any state they need. +type ChangeURI struct { + // URI is the change identity (git://owner/repo/branch/revision). + URI string `json:"uri"` +} + +// ToBytes serializes the ChangeURI to JSON bytes for a queue message payload. +func (c ChangeURI) ToBytes() ([]byte, error) { + return json.Marshal(c) +} + +// ChangeURIFromBytes deserializes a ChangeURI from JSON bytes. +func ChangeURIFromBytes(data []byte) (ChangeURI, error) { + var ref ChangeURI + err := json.Unmarshal(data, &ref) + return ref, err +} diff --git a/stovepipe/entity/commit.go b/stovepipe/entity/commit.go index b5850f86..ef1c3b10 100644 --- a/stovepipe/entity/commit.go +++ b/stovepipe/entity/commit.go @@ -14,44 +14,50 @@ package entity -// CommitStatus is the validation state of a trunk commit as determined by Stovepipe. -type CommitStatus string +// CommitStatusKind is the validation state of a trunk commit as determined by Stovepipe. +type CommitStatusKind string const ( - // CommitStatusUnknown is the default state when a commit is first ingested. + // CommitStatusKindUnknown is the default state when a commit is first ingested. // The commit has landed on main but has not yet been validated. - CommitStatusUnknown CommitStatus = "" - // CommitStatusSucceeded means the relevant targets build and test successfully at this commit. - CommitStatusSucceeded CommitStatus = "succeeded" - // CommitStatusFailed means a target is broken at this commit; it is the offending change. - CommitStatusFailed CommitStatus = "failed" + CommitStatusKindUnknown CommitStatusKind = "" + // CommitStatusKindIngested means the commit has been received and recorded by the gateway. + CommitStatusKindIngested CommitStatusKind = "ingested" + // CommitStatusKindQueued means the commit is waiting to enter the validation pipeline. + CommitStatusKindQueued CommitStatusKind = "queued" + // CommitStatusKindProcessing means the commit is actively being validated. + CommitStatusKindProcessing CommitStatusKind = "processing" + // CommitStatusKindSucceeded means the relevant targets build and test successfully at this commit. + CommitStatusKindSucceeded CommitStatusKind = "succeeded" + // CommitStatusKindFailed means a target is broken at this commit; it is the offending change. + CommitStatusKindFailed CommitStatusKind = "failed" ) // IsCommitStatusTerminal returns true if the status is a final, irreversible state. -func IsCommitStatusTerminal(s CommitStatus) bool { - return s == CommitStatusSucceeded || s == CommitStatusFailed +func IsCommitStatusTerminal(s CommitStatusKind) bool { + return s == CommitStatusKindSucceeded || s == CommitStatusKindFailed } -// Commit is a trunk commit tracked by Stovepipe. The SHA scoped by Repository and -// Branch is the natural identity and dedup key: a commit announced by both a webhook -// and a poll backfill resolves to the same record and is processed once. +// Commit is a trunk commit tracked by Stovepipe's gateway. +// URI is the primary key — it is the canonical change identity from the originating ChangeEvent. type Commit struct { - // SHA is the full commit hash. Identity key; immutable after creation. - SHA string - // Repository is the repository URI (e.g. "github.com/uber/go-code"). - Repository string - // Branch is the target branch (e.g. "main"). - Branch string - // CommitterTimeMs is the committer timestamp in milliseconds since epoch. - // Used to order commits within a range and to establish the trunk sequence. - CommitterTimeMs int64 - // Status is the current validation state of this commit. - Status CommitStatus - // Version is incremented on each update and used for optimistic locking. - // Version arithmetic lives in the controller; the store performs a pure conditional write. - Version int32 + // URI is the canonical change identity from the originating ChangeEvent. + URI string + // SequenceNumber is the number of commits reachable from this commit on the trunk branch, + // derived from `git rev-list --count`. Higher values are newer. + // Must be populated at ingestion time — a zero value indicates the field was not set. + SequenceNumber int64 // CreatedAt is the time this commit was first recorded, in milliseconds since epoch. CreatedAt int64 - // UpdatedAt is the time this commit was last updated, in milliseconds since epoch. - UpdatedAt int64 +} + +// CommitStatus is a point-in-time validation status entry for a Commit. +// Multiple CommitStatus records form the status history of a single Commit. +type CommitStatus struct { + // CommitURI is the URI of the Commit this status belongs to. + CommitURI string + // Status is the validation state recorded at this point in time. + Status CommitStatusKind + // CreatedAt is the time this status was recorded, in milliseconds since epoch. + CreatedAt int64 } diff --git a/stovepipe/entity/entity.go b/stovepipe/entity/entity.go deleted file mode 100644 index a7136aa3..00000000 --- a/stovepipe/entity/entity.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package entity holds Stovepipe-specific domain entities. -package entity - -// ChangeInfo represents a new change detected on a VCS remote. -// It is intentionally VCS-agnostic: the URI scheme carries the -// provider identity, mirroring the github:// scheme used in -// SubmitQueue's ChangeInfo. -// -// URI format: "git://///" -// Example: "git://github.com/uber/go-code/refs/heads/main/c3a4d5e6f789..." -// -// Fields are immutable after construction. -type ChangeInfo struct { - // URI is the canonical VCS identifier for this change. - // Scheme is "git://"; path encodes host, repo, ref, and new revision. - URI string `json:"uri"` - - // PreviousURI is the URI of the prior revision on the same ref, if known. - // Empty string if unavailable. - // Example: "git://github.com/uber/go-code/refs/heads/main/aabbccdd..." - PreviousURI string `json:"previous_uri,omitempty"` - - // AuthorName is the display name of the person who authored the change. - AuthorName string `json:"author_name,omitempty"` - // AuthorEmail is the email address of the author. - AuthorEmail string `json:"author_email,omitempty"` -} diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go index 69e6dc2f..7375eca5 100644 --- a/stovepipe/extension/changeingester/logging.go +++ b/stovepipe/extension/changeingester/logging.go @@ -25,6 +25,8 @@ import ( // LoggingHandler is a stub ChangeHandler that logs received changes. // Replace with real persistence logic once DB schema is ready. +// Implementations must resolve entity.Commit.SequenceNumber (via `git rev-list --count`) +// before persisting — a zero value indicates the field was not populated. type LoggingHandler struct { logger *zap.Logger } @@ -35,12 +37,9 @@ func New(logger *zap.Logger) extension.ChangeHandler { return LoggingHandler{logger: logger} } -func (h LoggingHandler) IngestChange(ctx context.Context, info entity.ChangeInfo) error { +func (h LoggingHandler) IngestChange(ctx context.Context, event entity.ChangeEvent) error { h.logger.Info("ingested change", - zap.String("uri", info.URI), - zap.String("previous_uri", info.PreviousURI), - zap.String("author_name", info.AuthorName), - zap.String("author_email", info.AuthorEmail), + zap.String("uri", event.URI), ) return nil } diff --git a/stovepipe/extension/extension.go b/stovepipe/extension/extension.go index 5a891faf..f4de9644 100644 --- a/stovepipe/extension/extension.go +++ b/stovepipe/extension/extension.go @@ -30,5 +30,5 @@ type ChangeIngester interface { // ChangeHandler processes a single change received from the ingester. type ChangeHandler interface { - IngestChange(ctx context.Context, info entity.ChangeInfo) error + IngestChange(ctx context.Context, event entity.ChangeEvent) error } diff --git a/stovepipe/extension/storage/BUILD.bazel b/stovepipe/extension/storage/BUILD.bazel deleted file mode 100644 index d06b7577..00000000 --- a/stovepipe/extension/storage/BUILD.bazel +++ /dev/null @@ -1,13 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "storage", - srcs = [ - "batch_store.go", - "commit_store.go", - "storage.go", - ], - importpath = "github.com/uber/submitqueue/stovepipe/extension/storage", - visibility = ["//visibility:public"], - deps = ["//stovepipe/entity"], -) diff --git a/stovepipe/extension/storage/batch_store.go b/stovepipe/extension/storage/batch_store.go deleted file mode 100644 index 316d7a59..00000000 --- a/stovepipe/extension/storage/batch_store.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -//go:generate mockgen -source=batch_store.go -destination=mock/batch_store_mock.go -package=mock - -import ( - "context" - - "github.com/uber/submitqueue/stovepipe/entity" -) - -// BatchStore is the orchestrator-owned store for in-flight validation batches. -// A batch represents a contiguous range of trunk commits under validation. -// Bisection reuses the same type — sub-range probes are ordinary batches -// driven through the same speculate→build→buildsignal→bisect loop. -type BatchStore interface { - // Get retrieves a batch by ID. Returns ErrNotFound if no record exists. - Get(ctx context.Context, id string) (entity.Batch, error) - - // Create records a new batch with status BatchStatusPending. - // Returns ErrAlreadyExists if a batch with the same ID already exists. - Create(ctx context.Context, batch entity.Batch) error - - // UpdateStatus updates the batch's status and advances the version from - // oldVersion to newVersion. Returns ErrVersionMismatch if the current - // persisted version does not match oldVersion; the caller must re-read and retry. - // Version arithmetic is owned by the caller (controller); the store performs - // a pure conditional write. - UpdateStatus(ctx context.Context, id string, oldVersion, newVersion int32, status entity.BatchStatus) error -} diff --git a/stovepipe/extension/storage/commit_store.go b/stovepipe/extension/storage/commit_store.go deleted file mode 100644 index 579b9c6a..00000000 --- a/stovepipe/extension/storage/commit_store.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -//go:generate mockgen -source=commit_store.go -destination=mock/commit_store_mock.go -package=mock - -import ( - "context" - - "github.com/uber/submitqueue/stovepipe/entity" -) - -// CommitStore is the gateway-owned store for trunk commit health state. -// It is the authoritative record of each commit's validation status and is the -// only storage the gateway's GetStatus RPC reads from. -// -// The (Repository, Branch, SHA) triple is the identity key and dedup handle: -// a commit announced by both a webhook and a poll backfill resolves to the same -// row and is processed once. -type CommitStore interface { - // Get retrieves a commit by its (repository, branch, sha) identity key. - // Returns ErrNotFound if no record exists. - Get(ctx context.Context, repository, branch, sha string) (entity.Commit, error) - - // Create records a new commit. The status must be CommitStatusUnknown. - // Returns ErrAlreadyExists if a commit with the same identity already exists; - // callers treat this as a successful dedup, not a failure. - Create(ctx context.Context, commit entity.Commit) error - - // UpdateStatus updates the commit's status and advances the version from - // oldVersion to newVersion. Returns ErrVersionMismatch if the current - // persisted version does not match oldVersion; the caller must re-read and retry. - // Version arithmetic is owned by the caller (controller); the store performs - // a pure conditional write. - UpdateStatus(ctx context.Context, repository, branch, sha string, oldVersion, newVersion int32, status entity.CommitStatus) error -} diff --git a/stovepipe/extension/storage/storage.go b/stovepipe/extension/storage/storage.go deleted file mode 100644 index be4b980a..00000000 --- a/stovepipe/extension/storage/storage.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package storage defines the storage extension interfaces for the Stovepipe domain. -// The gateway owns the CommitStore (commit-status store and event log). -// The orchestrator owns the BatchStore (in-flight batches and pipeline working state). -// The two services share no storage; they communicate only through the messaging queue. -package storage - -import ( - "errors" - "fmt" -) - -// ErrNotFound is returned by storage implementations when the requested record is not found. -var ErrNotFound = errors.New("record not found") - -// IsNotFound returns true if any error in the chain is ErrNotFound. -func IsNotFound(err error) bool { - return errors.Is(err, ErrNotFound) -} - -// WrapNotFound wraps ErrNotFound with the underlying implementation error. -func WrapNotFound(err error) error { - return fmt.Errorf("%w: %w", ErrNotFound, err) -} - -// ErrAlreadyExists is returned when attempting to create a record whose identity already exists. -var ErrAlreadyExists = errors.New("record already exists") - -// ErrVersionMismatch is returned when an optimistic-locking CAS write finds the persisted -// version does not match the expected version. Callers should retry from a fresh read. -var ErrVersionMismatch = errors.New("version mismatch") From e7fd8f3f7426d27fcdf480d6f6ee173d6038e3a7 Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Thu, 18 Jun 2026 17:02:47 -0700 Subject: [PATCH 5/5] feat(stovepipe): add IngestController and commit status domain entities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the Ingest RPC controller wired to the proto contract defined in api/stovepipe/gateway/proto/gateway.proto. The controller validates the queue and change URIs, generates a SPID via the counter extension, publishes the IngestRequest to TopicKeyStart, and returns the SPID for lifecycle tracking. Also introduces the commit status domain: CommitStatusKind enum (ingested → queued → processing → succeeded/failed), IsCommitStatusTerminal, and CommitStatus as an append-only history record keyed by SPID. - Remove ChangeEvent/ChangeURI (superseded by IngestRequest) - Remove changeingester stub (superseded by IngestController) - Update filter.ShouldProcess to accept a URI string directly - Update entity BUILD.bazel to reflect current file set Co-Authored-By: Claude Sonnet 4.6 (1M context) --- stovepipe/core/filter/BUILD.bazel | 1 - stovepipe/core/filter/filter.go | 17 +- stovepipe/entity/BUILD.bazel | 19 +- stovepipe/entity/change_event.go | 59 ----- stovepipe/entity/change_uri.go | 36 --- stovepipe/entity/commit.go | 63 ----- stovepipe/extension/BUILD.bazel | 1 - .../extension/changeingester/BUILD.bazel | 13 -- stovepipe/extension/changeingester/logging.go | 45 ---- stovepipe/extension/extension.go | 18 -- stovepipe/gateway/controller/BUILD.bazel | 25 +- stovepipe/gateway/controller/ingest.go | 138 +++++++++++ stovepipe/gateway/controller/ingest_test.go | 216 ++++++++++++++++++ 13 files changed, 396 insertions(+), 255 deletions(-) delete mode 100644 stovepipe/entity/change_event.go delete mode 100644 stovepipe/entity/change_uri.go delete mode 100644 stovepipe/entity/commit.go delete mode 100644 stovepipe/extension/changeingester/BUILD.bazel delete mode 100644 stovepipe/extension/changeingester/logging.go create mode 100644 stovepipe/gateway/controller/ingest.go create mode 100644 stovepipe/gateway/controller/ingest_test.go diff --git a/stovepipe/core/filter/BUILD.bazel b/stovepipe/core/filter/BUILD.bazel index 454754b5..923cee0a 100644 --- a/stovepipe/core/filter/BUILD.bazel +++ b/stovepipe/core/filter/BUILD.bazel @@ -5,5 +5,4 @@ go_library( srcs = ["filter.go"], importpath = "github.com/uber/submitqueue/stovepipe/core/filter", visibility = ["//visibility:public"], - deps = ["//stovepipe/entity"], ) diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go index 4130bb5a..5dd86eeb 100644 --- a/stovepipe/core/filter/filter.go +++ b/stovepipe/core/filter/filter.go @@ -15,19 +15,14 @@ // Package filter implements a filter for commit events. package filter -import ( - "strings" +import "strings" - "github.com/uber/submitqueue/stovepipe/entity" -) - -// Config controls which VCS URIs are watched. -// WatchedURIPrefixes is a list of URI prefixes to match against ChangeEvent.URI. -// Example: "git://github.com/uber/go-code/refs/heads/main" -// watches all commits on the main branch of uber/go-code. -func ShouldProcess(event entity.ChangeEvent, watchedPrefixes []string) bool { +// ShouldProcess reports whether a commit URI should be processed by the pipeline. +// watchedPrefixes is a list of URI prefixes to match against the commit URI. +// Example prefix: "git://github.com/uber/go-code/refs/heads/main" +func ShouldProcess(uri string, watchedPrefixes []string) bool { for _, prefix := range watchedPrefixes { - if strings.HasPrefix(event.URI, prefix) { + if strings.HasPrefix(uri, prefix) { return true } } diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index 17ed12c7..8f48ac81 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -2,12 +2,19 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "entity", - srcs = [ - "change_event.go", - "change_uri.go", - "commit.go", - ], + srcs = ["ingest_request.go"], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], - deps = ["//platform/base/change/git"], + deps = ["//platform/base/change"], +) + +go_test( + name = "entity_test", + srcs = ["ingest_request_test.go"], + embed = [":entity"], + deps = [ + "//platform/base/change", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], ) diff --git a/stovepipe/entity/change_event.go b/stovepipe/entity/change_event.go deleted file mode 100644 index d410aa20..00000000 --- a/stovepipe/entity/change_event.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -import ( - "encoding/json" - "fmt" - - entitygit "github.com/uber/submitqueue/platform/base/change/git" -) - -// ChangeEvent represents a single new trunk change entering the pipeline, published to the -// start topic. A trunk change is one commit, so the event carries one git-backed URI. It is -// source-agnostic: both the webhook and the reconciliation poller emit it. Additional fields -// (e.g. source, committer time) can be added later as ingestion needs them. -type ChangeEvent struct { - // URI identifies the commit that entered the pipeline (git://owner/repo/branch/revision). - URI string `json:"uri"` -} - -// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload. -func (e ChangeEvent) ToBytes() ([]byte, error) { - return json.Marshal(e) -} - -// Validate checks that the change event carries a valid git-backed commit URI. -func (e ChangeEvent) Validate() error { - if e.URI == "" { - return fmt.Errorf("change event requires a commit URI") - } - if _, err := entitygit.ParseChangeID(e.URI); err != nil { - return fmt.Errorf("change event URI: %w", err) - } - return nil -} - -// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes. -func ChangeEventFromBytes(data []byte) (ChangeEvent, error) { - var event ChangeEvent - if err := json.Unmarshal(data, &event); err != nil { - return ChangeEvent{}, err - } - if err := event.Validate(); err != nil { - return ChangeEvent{}, err - } - return event, nil -} diff --git a/stovepipe/entity/change_uri.go b/stovepipe/entity/change_uri.go deleted file mode 100644 index 7abc45f3..00000000 --- a/stovepipe/entity/change_uri.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -import "encoding/json" - -// ChangeURI is the lightweight reference passed between pipeline stages. It -// carries only the change identity; stages re-resolve any state they need. -type ChangeURI struct { - // URI is the change identity (git://owner/repo/branch/revision). - URI string `json:"uri"` -} - -// ToBytes serializes the ChangeURI to JSON bytes for a queue message payload. -func (c ChangeURI) ToBytes() ([]byte, error) { - return json.Marshal(c) -} - -// ChangeURIFromBytes deserializes a ChangeURI from JSON bytes. -func ChangeURIFromBytes(data []byte) (ChangeURI, error) { - var ref ChangeURI - err := json.Unmarshal(data, &ref) - return ref, err -} diff --git a/stovepipe/entity/commit.go b/stovepipe/entity/commit.go deleted file mode 100644 index ef1c3b10..00000000 --- a/stovepipe/entity/commit.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -// CommitStatusKind is the validation state of a trunk commit as determined by Stovepipe. -type CommitStatusKind string - -const ( - // CommitStatusKindUnknown is the default state when a commit is first ingested. - // The commit has landed on main but has not yet been validated. - CommitStatusKindUnknown CommitStatusKind = "" - // CommitStatusKindIngested means the commit has been received and recorded by the gateway. - CommitStatusKindIngested CommitStatusKind = "ingested" - // CommitStatusKindQueued means the commit is waiting to enter the validation pipeline. - CommitStatusKindQueued CommitStatusKind = "queued" - // CommitStatusKindProcessing means the commit is actively being validated. - CommitStatusKindProcessing CommitStatusKind = "processing" - // CommitStatusKindSucceeded means the relevant targets build and test successfully at this commit. - CommitStatusKindSucceeded CommitStatusKind = "succeeded" - // CommitStatusKindFailed means a target is broken at this commit; it is the offending change. - CommitStatusKindFailed CommitStatusKind = "failed" -) - -// IsCommitStatusTerminal returns true if the status is a final, irreversible state. -func IsCommitStatusTerminal(s CommitStatusKind) bool { - return s == CommitStatusKindSucceeded || s == CommitStatusKindFailed -} - -// Commit is a trunk commit tracked by Stovepipe's gateway. -// URI is the primary key — it is the canonical change identity from the originating ChangeEvent. -type Commit struct { - // URI is the canonical change identity from the originating ChangeEvent. - URI string - // SequenceNumber is the number of commits reachable from this commit on the trunk branch, - // derived from `git rev-list --count`. Higher values are newer. - // Must be populated at ingestion time — a zero value indicates the field was not set. - SequenceNumber int64 - // CreatedAt is the time this commit was first recorded, in milliseconds since epoch. - CreatedAt int64 -} - -// CommitStatus is a point-in-time validation status entry for a Commit. -// Multiple CommitStatus records form the status history of a single Commit. -type CommitStatus struct { - // CommitURI is the URI of the Commit this status belongs to. - CommitURI string - // Status is the validation state recorded at this point in time. - Status CommitStatusKind - // CreatedAt is the time this status was recorded, in milliseconds since epoch. - CreatedAt int64 -} diff --git a/stovepipe/extension/BUILD.bazel b/stovepipe/extension/BUILD.bazel index ac032b39..bbf4f83b 100644 --- a/stovepipe/extension/BUILD.bazel +++ b/stovepipe/extension/BUILD.bazel @@ -5,5 +5,4 @@ go_library( srcs = ["extension.go"], importpath = "github.com/uber/submitqueue/stovepipe/extension", visibility = ["//visibility:public"], - deps = ["//stovepipe/entity"], ) diff --git a/stovepipe/extension/changeingester/BUILD.bazel b/stovepipe/extension/changeingester/BUILD.bazel deleted file mode 100644 index 4575d4b3..00000000 --- a/stovepipe/extension/changeingester/BUILD.bazel +++ /dev/null @@ -1,13 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "changeingester", - srcs = ["logging.go"], - importpath = "github.com/uber/submitqueue/stovepipe/extension/changeingester", - visibility = ["//visibility:public"], - deps = [ - "//stovepipe/entity", - "//stovepipe/extension", - "@org_uber_go_zap//:zap", - ], -) diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go deleted file mode 100644 index 7375eca5..00000000 --- a/stovepipe/extension/changeingester/logging.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package changeingester provides ChangeIngester implementations. -package changeingester - -import ( - "context" - - "github.com/uber/submitqueue/stovepipe/entity" - "github.com/uber/submitqueue/stovepipe/extension" - "go.uber.org/zap" -) - -// LoggingHandler is a stub ChangeHandler that logs received changes. -// Replace with real persistence logic once DB schema is ready. -// Implementations must resolve entity.Commit.SequenceNumber (via `git rev-list --count`) -// before persisting — a zero value indicates the field was not populated. -type LoggingHandler struct { - logger *zap.Logger -} - -// New constructs a new LoggingHandler. -// The return type enforces interface compliance at compile time. -func New(logger *zap.Logger) extension.ChangeHandler { - return LoggingHandler{logger: logger} -} - -func (h LoggingHandler) IngestChange(ctx context.Context, event entity.ChangeEvent) error { - h.logger.Info("ingested change", - zap.String("uri", event.URI), - ) - return nil -} diff --git a/stovepipe/extension/extension.go b/stovepipe/extension/extension.go index f4de9644..92d4bcea 100644 --- a/stovepipe/extension/extension.go +++ b/stovepipe/extension/extension.go @@ -14,21 +14,3 @@ // Package extension holds Stovepipe-specific extension implementations. package extension - -import ( - "context" - - "github.com/uber/submitqueue/stovepipe/entity" -) - -// ChangeIngester subscribes to change events from a VCS source -// and dispatches them for processing. The source and VCS are -// implementation details left to the injected backend. -type ChangeIngester interface { - Start(ctx context.Context) error -} - -// ChangeHandler processes a single change received from the ingester. -type ChangeHandler interface { - IngestChange(ctx context.Context, event entity.ChangeEvent) error -} diff --git a/stovepipe/gateway/controller/BUILD.bazel b/stovepipe/gateway/controller/BUILD.bazel index 94c6639f..56a6a494 100644 --- a/stovepipe/gateway/controller/BUILD.bazel +++ b/stovepipe/gateway/controller/BUILD.bazel @@ -2,12 +2,22 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "controller", - srcs = ["ping.go"], + srcs = [ + "ingest.go", + "ping.go", + ], importpath = "github.com/uber/submitqueue/stovepipe/gateway/controller", visibility = ["//visibility:public"], deps = [ "//api/stovepipe/gateway/protopb", + "//platform/base/change", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/errs", + "//platform/extension/counter", "//platform/metrics", + "//stovepipe/core/topickey", + "//stovepipe/entity", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -15,13 +25,24 @@ go_library( go_test( name = "controller_test", - srcs = ["ping_test.go"], + srcs = [ + "ingest_test.go", + "ping_test.go", + ], embed = [":controller"], deps = [ + "//api/base/change/protopb", "//api/stovepipe/gateway/protopb", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/counter/mock", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/topickey", + "//stovepipe/entity", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", "@org_uber_go_zap//:zap", ], ) diff --git a/stovepipe/gateway/controller/ingest.go b/stovepipe/gateway/controller/ingest.go new file mode 100644 index 00000000..5154d5ae --- /dev/null +++ b/stovepipe/gateway/controller/ingest.go @@ -0,0 +1,138 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + "github.com/uber/submitqueue/platform/extension/counter" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/core/topickey" + "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/zap" +) + +// ErrInvalidRequest is returned when the request fails validation. +// This error should be mapped to codes.InvalidArgument at the gRPC layer. +var ErrInvalidRequest = errs.NewUserError(errors.New("invalid request")) + +// IsInvalidRequest returns true if any error in the error chain is ErrInvalidRequest. +func IsInvalidRequest(err error) bool { + return errors.Is(err, ErrInvalidRequest) +} + +// IngestController handles ingest business logic for the stovepipe gateway. +type IngestController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + registry consumer.TopicRegistry +} + +// NewIngestController creates a new instance of the stovepipe ingest controller. +// The controller publishes ingest requests to the topic registered under +// topickey.TopicKeyStart in the registry. +func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, registry consumer.TopicRegistry) *IngestController { + return &IngestController{ + logger: logger, + metricsScope: scope.SubScope("ingest_controller"), + counter: counter, + registry: registry, + } +} + +// Ingest validates the request, generates a SPID, publishes the ingest request +// to the pipeline queue, and returns the SPID for tracking. +func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) { + const opName = "ingest" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + if req.Queue == "" { + return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest) + } + if req.Change == nil || len(req.Change.Uris) == 0 { + return nil, fmt.Errorf("IngestController requires the request to have at least one change URI specified: %w", ErrInvalidRequest) + } + + queue := req.Queue + + seq, err := c.counter.Next(ctx, "ingest/"+queue) + if err != nil { + return nil, fmt.Errorf("IngestController failed to generate SPID for queue=%s: %w", queue, err) + } + + ingestRequest := entity.IngestRequest{ + ID: fmt.Sprintf("%s/%d", queue, seq), + Queue: queue, + Change: change.Change{URIs: req.Change.GetUris()}, + } + + c.logger.Debugw("ingest request created", + "queue", queue, + "spid", ingestRequest.ID, + "change_uris", ingestRequest.Change.URIs, + ) + + if err := c.publishToQueue(ctx, ingestRequest); err != nil { + return nil, fmt.Errorf("IngestController failed to publish request to queue: %w", err) + } + + c.logger.Infow("ingest request published to queue", + "queue", queue, + "spid", ingestRequest.ID, + "topic_key", topickey.TopicKeyStart, + ) + metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1) + + return &pb.IngestResponse{ + Spid: ingestRequest.ID, + }, nil +} + +// publishToQueue serializes the ingest request and publishes it to the start topic. +func (c *IngestController) publishToQueue(ctx context.Context, ingestRequest entity.IngestRequest) error { + payload, err := ingestRequest.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize ingest request: %w", err) + } + + msg := entityqueue.NewMessage(ingestRequest.ID, payload, ingestRequest.Queue, nil) + + q, ok := c.registry.Queue(topickey.TopicKeyStart) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyStart) + } + + topicName, ok := c.registry.TopicName(topickey.TopicKeyStart) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyStart) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish ingest request message: %w", err) + } + + return nil +} diff --git a/stovepipe/gateway/controller/ingest_test.go b/stovepipe/gateway/controller/ingest_test.go new file mode 100644 index 00000000..19d2c644 --- /dev/null +++ b/stovepipe/gateway/controller/ingest_test.go @@ -0,0 +1,216 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + changepb "github.com/uber/submitqueue/api/base/change/protopb" + pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + countermock "github.com/uber/submitqueue/platform/extension/counter/mock" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/stovepipe/core/topickey" + "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +const ( + testGitURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01" + testQueueName = "stovepipe-monorepo" +) + +// newIngestTestRegistry builds a TopicRegistry for TopicKeyStart wired to a mock +// Queue/Publisher and returns both so callers can set EXPECT() on the publisher. +func newIngestTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, *queuemock.MockPublisher) { + t.Helper() + pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyStart, Name: "start", Queue: q}, + }) + require.NoError(t, err) + return registry, pub +} + +// newIngestTestRegistryWithNoopPublisher returns a registry whose publisher +// silently accepts any Publish call. +func newIngestTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) consumer.TopicRegistry { + t.Helper() + registry, pub := newIngestTestRegistry(t, ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return registry +} + +func TestNewIngestController(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + require.NotNil(t, c) +} + +func TestIngest_ReturnsSPID(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/1", resp.Spid) +} + +func TestIngest_CounterDomainIncludesQueue(t *testing.T) { + var capturedDomain string + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, domain string) (int64, error) { + capturedDomain = domain + return 1, nil + }, + ) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "ingest/"+testQueueName, capturedDomain) +} + +func TestIngest_ReturnsErrorOnCounterFailure(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) +} + +func TestIngest_ReturnsErrorOnEmptyQueue(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: "", + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngest_ReturnsErrorOnNilChange(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: nil, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngest_ReturnsErrorOnEmptyChangeURIs(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{}}, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngest_PublishesToQueue(t *testing.T) { + var publishedTopic string + var publishedMessage entityqueue.Message + + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(42), nil) + + registry, publisher := newIngestTestRegistry(t, ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg entityqueue.Message) error { + publishedTopic = topic + publishedMessage = msg + return nil + }, + ) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, registry) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/42", resp.Spid) + assert.Equal(t, "start", publishedTopic) + assert.Equal(t, "stovepipe-monorepo/42", publishedMessage.ID) + assert.Equal(t, testQueueName, publishedMessage.PartitionKey) + + deserialized, err := entity.IngestRequestFromBytes(publishedMessage.Payload) + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/42", deserialized.ID) + assert.Equal(t, testQueueName, deserialized.Queue) + assert.Equal(t, []string{testGitURI}, deserialized.Change.URIs) +} + +func TestIngest_ReturnsErrorOnPublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) + + registry, publisher := newIngestTestRegistry(t, ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, registry) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) +}