Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions pkg/app/api/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type PipedAPI struct {
pipedStatsStore datastore.PipedStatsStore
pipedStore datastore.PipedStore
projectStore datastore.ProjectStore
eventStore datastore.EventStore
stageLogStore stagelogstore.Store
applicationLiveStateStore applicationlivestatestore.Store
commandStore commandstore.Store
Expand All @@ -63,6 +64,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.
pipedStatsStore: datastore.NewPipedStatsStore(ds),
pipedStore: datastore.NewPipedStore(ds),
projectStore: datastore.NewProjectStore(ds),
eventStore: datastore.NewEventStore(ds),
stageLogStore: sls,
applicationLiveStateStore: alss,
commandStore: cs,
Expand Down Expand Up @@ -679,6 +681,87 @@ func (a *PipedAPI) ReportApplicationLiveStateEvents(ctx context.Context, req *pi
return &pipedservice.ReportApplicationLiveStateEventsResponse{}, nil
}

// GetLatestEvent returns the latest event that meets the given conditions.
func (a *PipedAPI) GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest) (*pipedservice.GetLatestEventResponse, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still debating letting it give back a list like ListNotCompletedDeployments. Honestly not sure it's able to perform IN query against map[string]string for all Datastore type well. I attempted to put off it but seems like I'd better carefully look into it at this point.

projectID, _, _, err := rpcauth.ExtractPipedToken(ctx)
if err != nil {
return nil, err
}

// Try to fetch the most recently registered event.
opts := datastore.ListOptions{
PageSize: 1,
Filters: []datastore.ListFilter{
{
Field: "ProjectId",
Operator: "==",
Value: projectID,
},
{
Field: "Name",
Operator: "==",
Value: req.Name,
},
// TODO: Enable to use labels to filter Events
},
Orders: []datastore.Order{
{
Field: "UpdatedAt",
Direction: datastore.Desc,
},
},
}
events, err := a.eventStore.ListEvents(ctx, opts)
if err != nil {
a.logger.Error("failed to list events", zap.Error(err))
return nil, status.Error(codes.Internal, "failed to list event")
}
if len(events) == 0 {
return nil, status.Error(codes.NotFound, "no events found")
}

return &pipedservice.GetLatestEventResponse{
Event: events[0],
}, nil
}

// ListEvents returns a list of Events inside the given range.
func (a *PipedAPI) ListEvents(ctx context.Context, req *pipedservice.ListEventsRequest) (*pipedservice.ListEventsResponse, error) {
projectID, _, _, err := rpcauth.ExtractPipedToken(ctx)
if err != nil {
return nil, err
}

opts := datastore.ListOptions{
Filters: []datastore.ListFilter{
{
Field: "ProjectId",
Operator: "==",
Value: projectID,
},
{
Field: "CreatedAt",
Operator: ">=",
Value: req.From,
},
{
Field: "CreatedAt",
Operator: "<",
Value: req.To,
},
},
}

events, err := a.eventStore.ListEvents(ctx, opts)
if err != nil {
a.logger.Error("failed to list events", zap.Error(err))
return nil, status.Error(codes.Internal, "failed to list events")
}
return &pipedservice.ListEventsResponse{
Events: events,
}, nil
}

// validateAppBelongsToPiped checks if the given application belongs to the given piped.
// It gives back an error unless the application belongs to the piped.
func (a *PipedAPI) validateAppBelongsToPiped(ctx context.Context, appID, pipedID string) error {
Expand Down
16 changes: 16 additions & 0 deletions pkg/app/api/service/pipedservice/pipedclientfake/fakeclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,4 +463,20 @@ func (c *fakeClient) ReportApplicationLiveStateEvents(ctx context.Context, req *
return &pipedservice.ReportApplicationLiveStateEventsResponse{}, nil
}

func (c *fakeClient) GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest, opts ...grpc.CallOption) (*pipedservice.GetLatestEventResponse, error) {
c.logger.Info("fake client received GetLatestEvent rpc", zap.Any("request", req))
return &pipedservice.GetLatestEventResponse{
Event: &model.Event{
Id: "dev",
Name: "dev",
ProjectId: "dev",
},
}, nil
}

func (c *fakeClient) ListEvents(ctx context.Context, req *pipedservice.ListEventsRequest, opts ...grpc.CallOption) (*pipedservice.ListEventsResponse, error) {
c.logger.Info("fake client received ListEvents rpc", zap.Any("request", req))
return &pipedservice.ListEventsResponse{}, nil
}

var _ pipedservice.PipedServiceClient = (*fakeClient)(nil)
25 changes: 25 additions & 0 deletions pkg/app/api/service/pipedservice/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import "pkg/model/deployment.proto";
import "pkg/model/logblock.proto";
import "pkg/model/piped.proto";
import "pkg/model/piped_stats.proto";
import "pkg/model/event.proto";

// PipedService contains all RPC definitions for piped.
// All of these RPCs are only called by piped and authenticated by using PIPED_TOKEN.
Expand Down Expand Up @@ -133,6 +134,12 @@ service PipedService {
// and then another Handler service will pick them inorder to apply to build new state.
// By that way we can control the traffic to the datastore in a better way.
rpc ReportApplicationLiveStateEvents(ReportApplicationLiveStateEventsRequest) returns (ReportApplicationLiveStateEventsResponse) {}

// GetLatestEvent returns the latest event that meets the given conditions.
rpc GetLatestEvent(GetLatestEventRequest) returns (GetLatestEventResponse) {}

// ListEvents returns a list of Events inside the given range.
rpc ListEvents(ListEventsRequest) returns (ListEventsResponse) {}
}

message PingRequest {
Expand Down Expand Up @@ -345,3 +352,21 @@ message ReportApplicationLiveStateEventsRequest {
message ReportApplicationLiveStateEventsResponse {
repeated string failed_ids = 1;
}

message GetLatestEventRequest {
string name = 1 [(validate.rules).string.min_len = 1];
map<string,string> labels = 2;
}

message GetLatestEventResponse {
pipe.model.Event event = 1 [(validate.rules).message.required = true];
}

message ListEventsRequest {
int64 from = 1 [(validate.rules).int64.gt = 0];
int64 to = 2 [(validate.rules).int64.gt = 0];
}

message ListEventsResponse {
repeated pipe.model.Event events = 1;
}
4 changes: 2 additions & 2 deletions pkg/datastore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_library(
"datastore.go",
"deploymentstore.go",
"environmentstore.go",
"event.go",
"eventstore.go",
"mock.go",
"pipedstatsstore.go",
"pipedstore.go",
Expand All @@ -32,7 +32,7 @@ go_test(
"commandstore_test.go",
"deploymentstore_test.go",
"environmentstore_test.go",
"event_test.go",
"eventstore_test.go",
"pipedstatsstore_test.go",
"pipedstore_test.go",
"projectstore_test.go",
Expand Down
21 changes: 21 additions & 0 deletions pkg/datastore/event.go → pkg/datastore/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (

type EventStore interface {
AddEvent(ctx context.Context, e model.Event) error
ListEvents(ctx context.Context, opts ListOptions) ([]*model.Event, error)
}

type eventStore struct {
Expand Down Expand Up @@ -60,3 +61,23 @@ func (s *eventStore) AddEvent(ctx context.Context, e model.Event) error {
}
return s.ds.Create(ctx, eventModelKind, e.Id, &e)
}

func (s *eventStore) ListEvents(ctx context.Context, opts ListOptions) ([]*model.Event, error) {
it, err := s.ds.Find(ctx, eventModelKind, opts)
if err != nil {
return nil, err
}
es := make([]*model.Event, 0)
for {
var e model.Event
err := it.Next(&e)
if err == ErrIteratorDone {
break
}
if err != nil {
return nil, err
}
es = append(es, &e)
}
return es, nil
}
File renamed without changes.