diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index c60488ed60..80907e316d 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -43,6 +43,7 @@ type PipedAPI struct { pipedStatsStore datastore.PipedStatsStore pipedStore datastore.PipedStore projectStore datastore.ProjectStore + eventStore datastore.EventStore stageLogStore stagelogstore.Store applicationLiveStateStore applicationlivestatestore.Store commandStore commandstore.Store @@ -63,6 +64,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore. pipedStatsStore: datastore.NewPipedStatsStore(ds), pipedStore: datastore.NewPipedStore(ds), projectStore: datastore.NewProjectStore(ds), + eventStore: datastore.NewEventStore(ds), stageLogStore: sls, applicationLiveStateStore: alss, commandStore: cs, @@ -679,6 +681,87 @@ func (a *PipedAPI) ReportApplicationLiveStateEvents(ctx context.Context, req *pi return &pipedservice.ReportApplicationLiveStateEventsResponse{}, nil } +// GetLatestEvent returns the latest event that meets the given conditions. +func (a *PipedAPI) GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest) (*pipedservice.GetLatestEventResponse, error) { + projectID, _, _, err := rpcauth.ExtractPipedToken(ctx) + if err != nil { + return nil, err + } + + // Try to fetch the most recently registered event. + opts := datastore.ListOptions{ + PageSize: 1, + Filters: []datastore.ListFilter{ + { + Field: "ProjectId", + Operator: "==", + Value: projectID, + }, + { + Field: "Name", + Operator: "==", + Value: req.Name, + }, + // TODO: Enable to use labels to filter Events + }, + Orders: []datastore.Order{ + { + Field: "UpdatedAt", + Direction: datastore.Desc, + }, + }, + } + events, err := a.eventStore.ListEvents(ctx, opts) + if err != nil { + a.logger.Error("failed to list events", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to list event") + } + if len(events) == 0 { + return nil, status.Error(codes.NotFound, "no events found") + } + + return &pipedservice.GetLatestEventResponse{ + Event: events[0], + }, nil +} + +// ListEvents returns a list of Events inside the given range. +func (a *PipedAPI) ListEvents(ctx context.Context, req *pipedservice.ListEventsRequest) (*pipedservice.ListEventsResponse, error) { + projectID, _, _, err := rpcauth.ExtractPipedToken(ctx) + if err != nil { + return nil, err + } + + opts := datastore.ListOptions{ + Filters: []datastore.ListFilter{ + { + Field: "ProjectId", + Operator: "==", + Value: projectID, + }, + { + Field: "CreatedAt", + Operator: ">=", + Value: req.From, + }, + { + Field: "CreatedAt", + Operator: "<", + Value: req.To, + }, + }, + } + + events, err := a.eventStore.ListEvents(ctx, opts) + if err != nil { + a.logger.Error("failed to list events", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to list events") + } + return &pipedservice.ListEventsResponse{ + Events: events, + }, nil +} + // validateAppBelongsToPiped checks if the given application belongs to the given piped. // It gives back an error unless the application belongs to the piped. func (a *PipedAPI) validateAppBelongsToPiped(ctx context.Context, appID, pipedID string) error { diff --git a/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go b/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go index c7baa2287e..a3d5d2fa52 100644 --- a/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go +++ b/pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go @@ -463,4 +463,20 @@ func (c *fakeClient) ReportApplicationLiveStateEvents(ctx context.Context, req * return &pipedservice.ReportApplicationLiveStateEventsResponse{}, nil } +func (c *fakeClient) GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest, opts ...grpc.CallOption) (*pipedservice.GetLatestEventResponse, error) { + c.logger.Info("fake client received GetLatestEvent rpc", zap.Any("request", req)) + return &pipedservice.GetLatestEventResponse{ + Event: &model.Event{ + Id: "dev", + Name: "dev", + ProjectId: "dev", + }, + }, nil +} + +func (c *fakeClient) ListEvents(ctx context.Context, req *pipedservice.ListEventsRequest, opts ...grpc.CallOption) (*pipedservice.ListEventsResponse, error) { + c.logger.Info("fake client received ListEvents rpc", zap.Any("request", req)) + return &pipedservice.ListEventsResponse{}, nil +} + var _ pipedservice.PipedServiceClient = (*fakeClient)(nil) diff --git a/pkg/app/api/service/pipedservice/service.proto b/pkg/app/api/service/pipedservice/service.proto index 9e5d056de9..aeb5c00ab2 100644 --- a/pkg/app/api/service/pipedservice/service.proto +++ b/pkg/app/api/service/pipedservice/service.proto @@ -27,6 +27,7 @@ import "pkg/model/deployment.proto"; import "pkg/model/logblock.proto"; import "pkg/model/piped.proto"; import "pkg/model/piped_stats.proto"; +import "pkg/model/event.proto"; // PipedService contains all RPC definitions for piped. // All of these RPCs are only called by piped and authenticated by using PIPED_TOKEN. @@ -133,6 +134,12 @@ service PipedService { // and then another Handler service will pick them inorder to apply to build new state. // By that way we can control the traffic to the datastore in a better way. rpc ReportApplicationLiveStateEvents(ReportApplicationLiveStateEventsRequest) returns (ReportApplicationLiveStateEventsResponse) {} + + // GetLatestEvent returns the latest event that meets the given conditions. + rpc GetLatestEvent(GetLatestEventRequest) returns (GetLatestEventResponse) {} + + // ListEvents returns a list of Events inside the given range. + rpc ListEvents(ListEventsRequest) returns (ListEventsResponse) {} } message PingRequest { @@ -345,3 +352,21 @@ message ReportApplicationLiveStateEventsRequest { message ReportApplicationLiveStateEventsResponse { repeated string failed_ids = 1; } + +message GetLatestEventRequest { + string name = 1 [(validate.rules).string.min_len = 1]; + map labels = 2; +} + +message GetLatestEventResponse { + pipe.model.Event event = 1 [(validate.rules).message.required = true]; +} + +message ListEventsRequest { + int64 from = 1 [(validate.rules).int64.gt = 0]; + int64 to = 2 [(validate.rules).int64.gt = 0]; +} + +message ListEventsResponse { + repeated pipe.model.Event events = 1; +} diff --git a/pkg/datastore/BUILD.bazel b/pkg/datastore/BUILD.bazel index b01f65b054..2892905d77 100644 --- a/pkg/datastore/BUILD.bazel +++ b/pkg/datastore/BUILD.bazel @@ -9,7 +9,7 @@ go_library( "datastore.go", "deploymentstore.go", "environmentstore.go", - "event.go", + "eventstore.go", "mock.go", "pipedstatsstore.go", "pipedstore.go", @@ -32,7 +32,7 @@ go_test( "commandstore_test.go", "deploymentstore_test.go", "environmentstore_test.go", - "event_test.go", + "eventstore_test.go", "pipedstatsstore_test.go", "pipedstore_test.go", "projectstore_test.go", diff --git a/pkg/datastore/event.go b/pkg/datastore/eventstore.go similarity index 74% rename from pkg/datastore/event.go rename to pkg/datastore/eventstore.go index e6509dc130..690b15ef2a 100644 --- a/pkg/datastore/event.go +++ b/pkg/datastore/eventstore.go @@ -31,6 +31,7 @@ var ( type EventStore interface { AddEvent(ctx context.Context, e model.Event) error + ListEvents(ctx context.Context, opts ListOptions) ([]*model.Event, error) } type eventStore struct { @@ -60,3 +61,23 @@ func (s *eventStore) AddEvent(ctx context.Context, e model.Event) error { } return s.ds.Create(ctx, eventModelKind, e.Id, &e) } + +func (s *eventStore) ListEvents(ctx context.Context, opts ListOptions) ([]*model.Event, error) { + it, err := s.ds.Find(ctx, eventModelKind, opts) + if err != nil { + return nil, err + } + es := make([]*model.Event, 0) + for { + var e model.Event + err := it.Next(&e) + if err == ErrIteratorDone { + break + } + if err != nil { + return nil, err + } + es = append(es, &e) + } + return es, nil +} diff --git a/pkg/datastore/event_test.go b/pkg/datastore/eventstore_test.go similarity index 100% rename from pkg/datastore/event_test.go rename to pkg/datastore/eventstore_test.go