diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index 578bf3b624..612e74756e 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -734,6 +734,7 @@ func (a *PipedAPI) ListEvents(ctx context.Context, req *pipedservice.ListEventsR return nil, err } + // Build options based on the request. opts := datastore.ListOptions{ Filters: []datastore.ListFilter{ { @@ -741,17 +742,37 @@ func (a *PipedAPI) ListEvents(ctx context.Context, req *pipedservice.ListEventsR Operator: "==", Value: projectID, }, + }, + } + if req.From > 0 { + opts.Filters = append(opts.Filters, datastore.ListFilter{ + Field: "CreatedAt", + Operator: ">=", + Value: req.From, + }) + } + if req.To > 0 { + opts.Filters = append(opts.Filters, datastore.ListFilter{ + Field: "CreatedAt", + Operator: "<", + Value: req.To, + }) + } + switch req.Order { + case pipedservice.ListOrder_ASC: + opts.Orders = []datastore.Order{ { - Field: "CreatedAt", - Operator: ">=", - Value: req.From, + Field: "CreatedAt", + Direction: datastore.Asc, }, + } + case pipedservice.ListOrder_DESC: + opts.Orders = []datastore.Order{ { - Field: "CreatedAt", - Operator: "<", - Value: req.To, + Field: "CreatedAt", + Direction: datastore.Desc, }, - }, + } } events, err := a.eventStore.ListEvents(ctx, opts) diff --git a/pkg/app/api/service/pipedservice/service.proto b/pkg/app/api/service/pipedservice/service.proto index aeb5c00ab2..a083997f5a 100644 --- a/pkg/app/api/service/pipedservice/service.proto +++ b/pkg/app/api/service/pipedservice/service.proto @@ -142,6 +142,12 @@ service PipedService { rpc ListEvents(ListEventsRequest) returns (ListEventsResponse) {} } +enum ListOrder { + NONE = 0; + ASC = 1; + DESC = 2; +} + message PingRequest { pipe.model.PipedStats piped_stats = 1 [(validate.rules).message.required = true]; } @@ -362,9 +368,11 @@ message GetLatestEventResponse { pipe.model.Event event = 1 [(validate.rules).message.required = true]; } + message ListEventsRequest { - int64 from = 1 [(validate.rules).int64.gt = 0]; - int64 to = 2 [(validate.rules).int64.gt = 0]; + int64 from = 1; + int64 to = 2; + ListOrder order = 3 [(validate.rules).enum.defined_only = true]; } message ListEventsResponse { diff --git a/pkg/app/piped/apistore/eventstore/BUILD.bazel b/pkg/app/piped/apistore/eventstore/BUILD.bazel new file mode 100644 index 0000000000..d9f96977ac --- /dev/null +++ b/pkg/app/piped/apistore/eventstore/BUILD.bazel @@ -0,0 +1,22 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["store.go"], + importpath = "github.com/pipe-cd/pipe/pkg/app/piped/apistore/eventstore", + visibility = ["//visibility:public"], + deps = [ + "//pkg/app/api/service/pipedservice:go_default_library", + "//pkg/model:go_default_library", + "@org_golang_google_grpc//:go_default_library", + "@org_uber_go_zap//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + size = "small", + srcs = ["store_test.go"], + embed = [":go_default_library"], + deps = ["@com_github_stretchr_testify//assert:go_default_library"], +) diff --git a/pkg/app/piped/apistore/eventstore/store.go b/pkg/app/piped/apistore/eventstore/store.go new file mode 100644 index 0000000000..c2df5aa1fd --- /dev/null +++ b/pkg/app/piped/apistore/eventstore/store.go @@ -0,0 +1,195 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventstore + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "time" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" + "github.com/pipe-cd/pipe/pkg/model" +) + +// Getter helps get an event. All objects returned here must be treated as read-only. +type Getter interface { + // GetLatest returns the latest event that meets the given conditions. + GetLatest(ctx context.Context, name string, labels map[string]string) (*model.Event, bool) +} + +type Store interface { + // Run starts syncing the event list with the control-plane. + Run(ctx context.Context) error + // Getter returns a getter for retrieving an event. + Getter() Getter +} + +type apiClient interface { + GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest, opts ...grpc.CallOption) (*pipedservice.GetLatestEventResponse, error) + ListEvents(ctx context.Context, req *pipedservice.ListEventsRequest, opts ...grpc.CallOption) (*pipedservice.ListEventsResponse, error) +} + +type store struct { + apiClient apiClient + syncInterval time.Duration + gracePeriod time.Duration + logger *zap.Logger + + // Mark that it has handled all events that was created before this UNIX time. + milestone int64 + mu sync.RWMutex + // The key is supposed to be a string consists of name and labels. + // And the value is the address to the latest Event. + latestEvents map[string]*model.Event +} + +const ( + defaultSyncInterval = time.Minute +) + +// NewStore creates a new event store instance. +// This syncs with the control plane to keep the list of events for this runner up-to-date. +func NewStore(apiClient apiClient, gracePeriod time.Duration, logger *zap.Logger) Store { + return &store{ + apiClient: apiClient, + syncInterval: defaultSyncInterval, + gracePeriod: gracePeriod, + latestEvents: make(map[string]*model.Event), + logger: logger.Named("event-store"), + } +} + +// Run starts runner that periodically makes the Events in the cache up-to-date +// by fetching from the control-plane. +func (s *store) Run(ctx context.Context) error { + s.logger.Info("start running event store") + + syncTicker := time.NewTicker(s.syncInterval) + defer syncTicker.Stop() + + // Do first sync without waiting the first ticker. + s.milestone = time.Now().Add(-time.Hour).Unix() + s.sync(ctx) + + for { + select { + case <-syncTicker.C: + if err := s.sync(ctx); err != nil { + s.logger.Error("failed to sync events", zap.Error(err)) + } + + case <-ctx.Done(): + s.logger.Info("event store has been stopped") + return nil + } + } +} + +// sync fetches a list of events newly created after its own milestone, +// and updates the cache of latest events. +func (s *store) sync(ctx context.Context) error { + resp, err := s.apiClient.ListEvents(ctx, &pipedservice.ListEventsRequest{ + From: s.milestone, + Order: pipedservice.ListOrder_ASC, + }) + if err != nil { + return fmt.Errorf("failed to list events: %w", err) + } + if len(resp.Events) == 0 { + return nil + } + + // Eliminate events that have duplicated key. + filtered := make(map[string]*model.Event, len(resp.Events)) + for _, e := range resp.Events { + key := makeEventKey(e.Name, e.Labels) + filtered[key] = e + } + // Make the cache up-to-date. + s.mu.Lock() + for key, event := range filtered { + cached, ok := s.latestEvents[key] + if ok && cached.CreatedAt > event.CreatedAt { + continue + } + s.latestEvents[key] = event + } + s.mu.Unlock() + + // Set the latest one within the result as the next time's "from". + s.milestone = resp.Events[len(resp.Events)-1].CreatedAt + 1 + return nil +} + +func (s *store) Getter() Getter { + return s +} + +func (s *store) GetLatest(ctx context.Context, name string, labels map[string]string) (*model.Event, bool) { + key := makeEventKey(name, labels) + s.mu.RLock() + event, ok := s.latestEvents[key] + s.mu.RUnlock() + if ok { + return event, true + } + + // If not found in the cache, fetch from the control-plane. + resp, err := s.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{ + Name: name, + Labels: labels, + }) + if err != nil { + s.logger.Error("failed to get the latest event", zap.Error(err)) + return nil, false + } + + s.mu.Lock() + defer s.mu.Unlock() + cached, ok := s.latestEvents[key] + if ok && cached.CreatedAt > event.CreatedAt { + return resp.Event, true + } + s.latestEvents[key] = resp.Event + return resp.Event, true +} + +// makeEventKey builds a unique identifier based on the given name and labels. +// It returns the exact same string as long as both are the same. +func makeEventKey(name string, labels map[string]string) string { + if len(labels) == 0 { + return name + } + + var b strings.Builder + b.WriteString(name) + + // Guarantee uniqueness by sorting by keys. + keys := make([]string, 0, len(labels)) + for key := range labels { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + b.WriteString(fmt.Sprintf("/%s:%s", key, labels[key])) + } + return b.String() +} diff --git a/pkg/app/piped/apistore/eventstore/store_test.go b/pkg/app/piped/apistore/eventstore/store_test.go new file mode 100644 index 0000000000..486cc30dc4 --- /dev/null +++ b/pkg/app/piped/apistore/eventstore/store_test.go @@ -0,0 +1,63 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventstore + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMakeEventKey(t *testing.T) { + testcases := []struct { + testname string + name string + labels map[string]string + want string + }{ + { + testname: "no name and labels given", + want: "", + }, + { + testname: "no labels given", + name: "name1", + want: "name1", + }, + { + testname: "no name given", + labels: map[string]string{ + "key1": "value1", + }, + want: "/key1:value1", + }, + { + testname: "labels given", + name: "name1", + labels: map[string]string{ + "key1": "value", + "key2": "value", + "key3": "value", + }, + want: "name1/key1:value/key2:value/key3:value", + }, + } + for _, tc := range testcases { + t.Run(tc.testname, func(t *testing.T) { + got := makeEventKey(tc.name, tc.labels) + assert.Equal(t, tc.want, got) + }) + } +}