diff --git a/stovepipe/core/filter/BUILD.bazel b/stovepipe/core/filter/BUILD.bazel new file mode 100644 index 00000000..923cee0a --- /dev/null +++ b/stovepipe/core/filter/BUILD.bazel @@ -0,0 +1,8 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "filter", + srcs = ["filter.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/filter", + visibility = ["//visibility:public"], +) diff --git a/stovepipe/entity/entity.go b/stovepipe/core/filter/filter.go similarity index 54% rename from stovepipe/entity/entity.go rename to stovepipe/core/filter/filter.go index 15ac8680..5dd86eeb 100644 --- a/stovepipe/entity/entity.go +++ b/stovepipe/core/filter/filter.go @@ -12,5 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package entity holds Stovepipe-specific domain types (distinct from shared repo entity/). -package entity +// Package filter implements a filter for commit events. +package filter + +import "strings" + +// ShouldProcess reports whether a commit URI should be processed by the pipeline. +// watchedPrefixes is a list of URI prefixes to match against the commit URI. +// Example prefix: "git://github.com/uber/go-code/refs/heads/main" +func ShouldProcess(uri string, watchedPrefixes []string) bool { + for _, prefix := range watchedPrefixes { + if strings.HasPrefix(uri, prefix) { + return true + } + } + return false +} diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index a0c70059..8f48ac81 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -2,10 +2,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "entity", - srcs = [ - "entity.go", - "ingest_request.go", - ], + srcs = ["ingest_request.go"], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], deps = ["//platform/base/change"], diff --git a/stovepipe/gateway/controller/BUILD.bazel b/stovepipe/gateway/controller/BUILD.bazel index 94c6639f..56a6a494 100644 --- a/stovepipe/gateway/controller/BUILD.bazel +++ b/stovepipe/gateway/controller/BUILD.bazel @@ -2,12 +2,22 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "controller", - srcs = ["ping.go"], + srcs = [ + "ingest.go", + "ping.go", + ], importpath = "github.com/uber/submitqueue/stovepipe/gateway/controller", visibility = ["//visibility:public"], deps = [ "//api/stovepipe/gateway/protopb", + "//platform/base/change", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/errs", + "//platform/extension/counter", "//platform/metrics", + "//stovepipe/core/topickey", + "//stovepipe/entity", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -15,13 +25,24 @@ go_library( go_test( name = "controller_test", - srcs = ["ping_test.go"], + srcs = [ + "ingest_test.go", + "ping_test.go", + ], embed = [":controller"], deps = [ + "//api/base/change/protopb", "//api/stovepipe/gateway/protopb", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/counter/mock", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/topickey", + "//stovepipe/entity", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", "@org_uber_go_zap//:zap", ], ) diff --git a/stovepipe/gateway/controller/ingest.go b/stovepipe/gateway/controller/ingest.go new file mode 100644 index 00000000..5154d5ae --- /dev/null +++ b/stovepipe/gateway/controller/ingest.go @@ -0,0 +1,138 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + "github.com/uber/submitqueue/platform/extension/counter" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/core/topickey" + "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/zap" +) + +// ErrInvalidRequest is returned when the request fails validation. +// This error should be mapped to codes.InvalidArgument at the gRPC layer. +var ErrInvalidRequest = errs.NewUserError(errors.New("invalid request")) + +// IsInvalidRequest returns true if any error in the error chain is ErrInvalidRequest. +func IsInvalidRequest(err error) bool { + return errors.Is(err, ErrInvalidRequest) +} + +// IngestController handles ingest business logic for the stovepipe gateway. +type IngestController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + registry consumer.TopicRegistry +} + +// NewIngestController creates a new instance of the stovepipe ingest controller. +// The controller publishes ingest requests to the topic registered under +// topickey.TopicKeyStart in the registry. +func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, registry consumer.TopicRegistry) *IngestController { + return &IngestController{ + logger: logger, + metricsScope: scope.SubScope("ingest_controller"), + counter: counter, + registry: registry, + } +} + +// Ingest validates the request, generates a SPID, publishes the ingest request +// to the pipeline queue, and returns the SPID for tracking. +func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) { + const opName = "ingest" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + if req.Queue == "" { + return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest) + } + if req.Change == nil || len(req.Change.Uris) == 0 { + return nil, fmt.Errorf("IngestController requires the request to have at least one change URI specified: %w", ErrInvalidRequest) + } + + queue := req.Queue + + seq, err := c.counter.Next(ctx, "ingest/"+queue) + if err != nil { + return nil, fmt.Errorf("IngestController failed to generate SPID for queue=%s: %w", queue, err) + } + + ingestRequest := entity.IngestRequest{ + ID: fmt.Sprintf("%s/%d", queue, seq), + Queue: queue, + Change: change.Change{URIs: req.Change.GetUris()}, + } + + c.logger.Debugw("ingest request created", + "queue", queue, + "spid", ingestRequest.ID, + "change_uris", ingestRequest.Change.URIs, + ) + + if err := c.publishToQueue(ctx, ingestRequest); err != nil { + return nil, fmt.Errorf("IngestController failed to publish request to queue: %w", err) + } + + c.logger.Infow("ingest request published to queue", + "queue", queue, + "spid", ingestRequest.ID, + "topic_key", topickey.TopicKeyStart, + ) + metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1) + + return &pb.IngestResponse{ + Spid: ingestRequest.ID, + }, nil +} + +// publishToQueue serializes the ingest request and publishes it to the start topic. +func (c *IngestController) publishToQueue(ctx context.Context, ingestRequest entity.IngestRequest) error { + payload, err := ingestRequest.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize ingest request: %w", err) + } + + msg := entityqueue.NewMessage(ingestRequest.ID, payload, ingestRequest.Queue, nil) + + q, ok := c.registry.Queue(topickey.TopicKeyStart) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyStart) + } + + topicName, ok := c.registry.TopicName(topickey.TopicKeyStart) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyStart) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish ingest request message: %w", err) + } + + return nil +} diff --git a/stovepipe/gateway/controller/ingest_test.go b/stovepipe/gateway/controller/ingest_test.go new file mode 100644 index 00000000..19d2c644 --- /dev/null +++ b/stovepipe/gateway/controller/ingest_test.go @@ -0,0 +1,216 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + changepb "github.com/uber/submitqueue/api/base/change/protopb" + pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + countermock "github.com/uber/submitqueue/platform/extension/counter/mock" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/stovepipe/core/topickey" + "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +const ( + testGitURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01" + testQueueName = "stovepipe-monorepo" +) + +// newIngestTestRegistry builds a TopicRegistry for TopicKeyStart wired to a mock +// Queue/Publisher and returns both so callers can set EXPECT() on the publisher. +func newIngestTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, *queuemock.MockPublisher) { + t.Helper() + pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyStart, Name: "start", Queue: q}, + }) + require.NoError(t, err) + return registry, pub +} + +// newIngestTestRegistryWithNoopPublisher returns a registry whose publisher +// silently accepts any Publish call. +func newIngestTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) consumer.TopicRegistry { + t.Helper() + registry, pub := newIngestTestRegistry(t, ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return registry +} + +func TestNewIngestController(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + require.NotNil(t, c) +} + +func TestIngest_ReturnsSPID(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/1", resp.Spid) +} + +func TestIngest_CounterDomainIncludesQueue(t *testing.T) { + var capturedDomain string + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, domain string) (int64, error) { + capturedDomain = domain + return 1, nil + }, + ) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "ingest/"+testQueueName, capturedDomain) +} + +func TestIngest_ReturnsErrorOnCounterFailure(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) +} + +func TestIngest_ReturnsErrorOnEmptyQueue(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: "", + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngest_ReturnsErrorOnNilChange(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: nil, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngest_ReturnsErrorOnEmptyChangeURIs(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, newIngestTestRegistryWithNoopPublisher(t, ctrl)) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{}}, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngest_PublishesToQueue(t *testing.T) { + var publishedTopic string + var publishedMessage entityqueue.Message + + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(42), nil) + + registry, publisher := newIngestTestRegistry(t, ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg entityqueue.Message) error { + publishedTopic = topic + publishedMessage = msg + return nil + }, + ) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, registry) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/42", resp.Spid) + assert.Equal(t, "start", publishedTopic) + assert.Equal(t, "stovepipe-monorepo/42", publishedMessage.ID) + assert.Equal(t, testQueueName, publishedMessage.PartitionKey) + + deserialized, err := entity.IngestRequestFromBytes(publishedMessage.Payload) + require.NoError(t, err) + assert.Equal(t, "stovepipe-monorepo/42", deserialized.ID) + assert.Equal(t, testQueueName, deserialized.Queue) + assert.Equal(t, []string{testGitURI}, deserialized.Change.URIs) +} + +func TestIngest_ReturnsErrorOnPublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + cnt := countermock.NewMockCounter(ctrl) + cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) + + registry, publisher := newIngestTestRegistry(t, ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) + + c := NewIngestController(zap.NewNop().Sugar(), tally.NoopScope, cnt, registry) + _, err := c.Ingest(context.Background(), &pb.IngestRequest{ + Queue: testQueueName, + Change: &changepb.Change{Uris: []string{testGitURI}}, + }) + + require.Error(t, err) +}