Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement RFD 19: Event Iteration API #6731

Merged
merged 32 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e3db8dd
implement rfd 19
xacrimon May 12, 2021
63eb715
mostly refactor internal handlers minus helpers to grpc events
xacrimon May 12, 2021
112fc09
migrated some tests
xacrimon May 12, 2021
7307651
fix optimized array sizing
xacrimon May 12, 2021
9a36827
implemented ToEventFields and GetSessionID
xacrimon May 13, 2021
3ad5728
update some code to handle an error
xacrimon May 13, 2021
1c99f18
add deserialization code
xacrimon May 13, 2021
f13f52b
update test mock impl
xacrimon May 13, 2021
3c61a4a
Merge branch 'master' into joel/event-api-pagination
xacrimon May 13, 2021
1caf18e
fix lint
xacrimon May 13, 2021
df8ce88
apply feedback
xacrimon May 14, 2021
954422d
refactor auditlog test
xacrimon May 14, 2021
44ee806
newline check
xacrimon May 14, 2021
5afea94
fix test
xacrimon May 14, 2021
e914828
more helpful event conversion error
xacrimon May 14, 2021
467e0ee
adjust comments
xacrimon May 14, 2021
58011e5
adjust session query priveleges
xacrimon May 14, 2021
742d9b2
adjust log
xacrimon May 14, 2021
fe95d6b
log event type only
xacrimon May 14, 2021
12d2c5e
use objecttostruct
xacrimon May 14, 2021
2509fa4
use fast unmarshalling
xacrimon May 14, 2021
81462f3
feedback
xacrimon May 14, 2021
b378e23
embed iauditlog in forward to get rid of wrappers
xacrimon May 14, 2021
7ffce90
fix typo
xacrimon May 14, 2021
8a9b873
Merge branch 'master' into joel/event-api-pagination
xacrimon May 14, 2021
82e019b
fix json error due to non pointer
xacrimon May 14, 2021
8ca3e9c
fix test
xacrimon May 14, 2021
82cad40
expand pagination test
xacrimon May 14, 2021
6d1162e
feedback from andrej
xacrimon May 17, 2021
377f74e
update proto
xacrimon May 17, 2021
dd271a8
Merge branch 'master' into joel/event-api-pagination
xacrimon May 17, 2021
f3d5cbc
Merge branch 'master' into joel/event-api-pagination
xacrimon May 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,60 @@ func (c *Client) DeleteAllNodes(ctx context.Context, namespace string) error {
return trail.FromGRPC(err)
}

// SearchEvents allows searching for events with a full pagination support.
func (c *Client) SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]events.AuditEvent, string, error) {
request := &proto.GetEventsRequest{
Namespace: namespace,
StartDate: fromUTC,
EndDate: toUTC,
EventTypes: eventTypes,
Limit: int32(limit),
StartKey: startKey,
}

response, err := c.grpc.GetEvents(ctx, request)
if err != nil {
return nil, "", trail.FromGRPC(err)
}

decodedEvents := make([]events.AuditEvent, 0, len(response.Items))
for _, rawEvent := range response.Items {
event, err := events.FromOneOf(*rawEvent)
if err != nil {
return nil, "", trace.Wrap(err)
}
decodedEvents = append(decodedEvents, event)
}

return decodedEvents, response.LastKey, nil
}

// SearchSessionEvents allows searching for session events with a full pagination support.
func (c *Client) SearchSessionEvents(ctx context.Context, fromUTC time.Time, toUTC time.Time, limit int, startKey string) ([]events.AuditEvent, string, error) {
request := &proto.GetSessionEventsRequest{
StartDate: fromUTC,
EndDate: toUTC,
Limit: int32(limit),
StartKey: startKey,
}

response, err := c.grpc.GetSessionEvents(ctx, request)
if err != nil {
return nil, "", trail.FromGRPC(err)
}

decodedEvents := make([]events.AuditEvent, 0, len(response.Items))
for _, rawEvent := range response.Items {
event, err := events.FromOneOf(*rawEvent)
if err != nil {
return nil, "", trace.Wrap(err)
}
decodedEvents = append(decodedEvents, event)
}

return decodedEvents, response.LastKey, nil
}

// GetClusterNetworkingConfig gets cluster networking configuration.
func (c *Client) GetClusterNetworkingConfig(ctx context.Context) (types.ClusterNetworkingConfig, error) {
resp, err := c.grpc.GetClusterNetworkingConfig(ctx, &empty.Empty{}, c.callOpts...)
Expand Down
1,840 changes: 1,461 additions & 379 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions api/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,42 @@ message SingleUseUserCert {
}
}

message GetEventsRequest {
// Namespace, if not set, defaults to 'default'
string Namespace = 1;
// Oldest date of returned events
xacrimon marked this conversation as resolved.
Show resolved Hide resolved
google.protobuf.Timestamp StartDate = 2
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
// Newest date of returned events
google.protobuf.Timestamp EndDate = 3
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
// EventTypes is optional, if not set, returns all events
repeated string EventTypes = 4;
// Maximum amount of events returned
int32 Limit = 5;
// When supplied the search will resume from the last key
string StartKey = 6;
}

message GetSessionEventsRequest {
// Oldest date of returned events
google.protobuf.Timestamp StartDate = 1
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
// Newest date of returned events
google.protobuf.Timestamp EndDate = 2
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
int32 Limit = 3;
string StartKey = 4;
xacrimon marked this conversation as resolved.
Show resolved Hide resolved
}

message Events {
repeated events.OneOf Items = 1;
// the key of the last event if the returned set did not contain all events found i.e limit <
// actual amount. this is the key clients can supply in another API request to continue fetching
// events from the previous last position
string LastKey = 2;
}

// AuthService is authentication/authorization service implementation
service AuthService {
// SendKeepAlives allows node to send a stream of keep alive requests
Expand Down Expand Up @@ -1054,4 +1090,9 @@ service AuthService {
rpc GetClusterNetworkingConfig(google.protobuf.Empty) returns (types.ClusterNetworkingConfigV2);
// SetClusterNetworkingConfig sets cluster networking configuration.
rpc SetClusterNetworkingConfig(types.ClusterNetworkingConfigV2) returns (google.protobuf.Empty);

// Out-of-session request for audit events.
rpc GetEvents(GetEventsRequest) returns (Events);
// In-session request for audit events.
rpc GetSessionEvents(GetSessionEventsRequest) returns (Events);
}
2 changes: 1 addition & 1 deletion e
Submodule e updated from a7e421 to dee358
4 changes: 2 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1509,12 +1509,12 @@ func (s *IntSuite) twoClustersTunnel(c *check.C, now time.Time, proxyRecordMode
stopCh := time.After(5 * time.Second)

// only look for exec events
execQuery := fmt.Sprintf("%s=%s", events.EventType, events.ExecEvent)
eventTypes := []string{events.ExecEvent}

for {
select {
case <-tickCh:
eventsInSite, err := site.SearchEvents(now, now.Add(1*time.Hour), execQuery, 0)
eventsInSite, _, err := site.SearchEvents(now, now.Add(1*time.Hour), defaults.Namespace, eventTypes, 0, "")
if err != nil {
return trace.Wrap(err)
}
Expand Down
11 changes: 4 additions & 7 deletions lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1819,12 +1819,9 @@ func (s *APIServer) searchEvents(auth ClientI, w http.ResponseWriter, r *http.Re
return nil, trace.BadParameter("failed to parse limit: %q", limit)
}
}
// remove 'to', 'from' and 'limit' fields, passing the rest of the query unmodified
// to whatever pluggable search is implemented by the backend
query.Del("to")
query.Del("from")
query.Del("limit")
eventsList, err := auth.SearchEvents(from, to, query.Encode(), limit)

eventTypes := query[events.EventType]
eventsList, _, err := auth.SearchEvents(from, to, defaults.Namespace, eventTypes, limit, "")
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1864,7 +1861,7 @@ func (s *APIServer) searchSessionEvents(auth ClientI, w http.ResponseWriter, r *
}
}
// only pull back start and end events to build list of completed sessions
eventsList, err := auth.SearchSessionEvents(from, to, limit)
eventsList, _, err := auth.SearchSessionEvents(from, to, limit, "")
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
44 changes: 28 additions & 16 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -1996,22 +1996,6 @@ func (a *ServerWithRoles) GetSessionEvents(namespace string, sid session.ID, aft
return a.alog.GetSessionEvents(namespace, sid, afterN, includePrintEvents)
}

func (a *ServerWithRoles) SearchEvents(from, to time.Time, query string, limit int) ([]events.EventFields, error) {
if err := a.action(defaults.Namespace, services.KindEvent, services.VerbList); err != nil {
return nil, trace.Wrap(err)
}

return a.alog.SearchEvents(from, to, query, limit)
}

func (a *ServerWithRoles) SearchSessionEvents(from, to time.Time, limit int) ([]events.EventFields, error) {
if err := a.action(defaults.Namespace, services.KindSession, services.VerbList); err != nil {
return nil, trace.Wrap(err)
}

return a.alog.SearchSessionEvents(from, to, limit)
}

// GetNamespaces returns a list of namespaces
func (a *ServerWithRoles) GetNamespaces() ([]services.Namespace, error) {
if err := a.action(defaults.Namespace, services.KindNamespace, services.VerbList); err != nil {
Expand Down Expand Up @@ -2914,6 +2898,34 @@ func (a *ServerWithRoles) IsMFARequired(ctx context.Context, req *proto.IsMFAReq
return a.authServer.isMFARequired(ctx, a.context.Checker, req)
}

// SearchEvents allows searching audit events with pagination support.
func (a *ServerWithRoles) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) (events []events.AuditEvent, lastKey string, err error) {
if err := a.action(defaults.Namespace, services.KindEvent, services.VerbList); err != nil {
return nil, "", trace.Wrap(err)
}

events, lastKey, err = a.alog.SearchEvents(fromUTC, toUTC, namespace, eventTypes, limit, startKey)
if err != nil {
return nil, "", trace.Wrap(err)
}

return events, lastKey, nil
}

// SearchSessionEvents allows searching session audit events with pagination support.
func (a *ServerWithRoles) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, startKey string) (events []events.AuditEvent, lastKey string, err error) {
if err := a.action(defaults.Namespace, services.KindEvent, services.VerbList); err != nil {
xacrimon marked this conversation as resolved.
Show resolved Hide resolved
return nil, "", trace.Wrap(err)
}

events, lastKey, err = a.alog.SearchSessionEvents(fromUTC, toUTC, limit, startKey)
if err != nil {
return nil, "", trace.Wrap(err)
}

return events, lastKey, nil
}

// NewAdminAuthServer returns auth server authorized as admin,
// used for auth server cached access
func NewAdminAuthServer(authServer *Server, sessions session.Service, alog events.IAuditLog) (ClientI, error) {
Expand Down
41 changes: 10 additions & 31 deletions lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,45 +1451,24 @@ func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int,
return retval, nil
}

// SearchEvents returns events that fit the criteria
func (c *Client) SearchEvents(from, to time.Time, query string, limit int) ([]events.EventFields, error) {
q, err := url.ParseQuery(query)
// SearchEvents allows searching for audit events with pagination support.
func (c *Client) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]events.AuditEvent, string, error) {
events, lastKey, err := c.APIClient.SearchEvents(context.TODO(), fromUTC, toUTC, namespace, eventTypes, limit, startKey)
if err != nil {
return nil, trace.BadParameter("query")
return nil, "", trace.Wrap(err)
}
q.Set("from", from.Format(time.RFC3339))
q.Set("to", to.Format(time.RFC3339))
q.Set("limit", fmt.Sprintf("%v", limit))
response, err := c.Get(c.Endpoint("events"), q)
if err != nil {
return nil, trace.Wrap(err)
}
retval := make([]events.EventFields, 0)
if err := json.Unmarshal(response.Bytes(), &retval); err != nil {
return nil, trace.Wrap(err)
}
return retval, nil

return events, lastKey, nil
}

// SearchSessionEvents returns session related events to find completed sessions.
func (c *Client) SearchSessionEvents(from, to time.Time, limit int) ([]events.EventFields, error) {
query := url.Values{
"to": []string{to.Format(time.RFC3339)},
"from": []string{from.Format(time.RFC3339)},
"limit": []string{fmt.Sprintf("%v", limit)},
}

response, err := c.Get(c.Endpoint("events", "session"), query)
func (c *Client) SearchSessionEvents(fromUTC time.Time, toUTC time.Time, limit int, startKey string) ([]events.AuditEvent, string, error) {
events, lastKey, err := c.APIClient.SearchSessionEvents(context.TODO(), fromUTC, toUTC, limit, startKey)
if err != nil {
return nil, trace.Wrap(err)
}

retval := make([]events.EventFields, 0)
if err := json.Unmarshal(response.Bytes(), &retval); err != nil {
return nil, trace.Wrap(err)
return nil, "", trace.Wrap(err)
}

return retval, nil
return events, lastKey, nil
}

// GetNamespaces returns a list of namespaces
Expand Down
58 changes: 58 additions & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2512,6 +2512,64 @@ func (g *GRPCServer) authenticate(ctx context.Context) (*grpcContext, error) {
}, nil
}

// GetEvents searches for events on the backend and sends them back in a response.
func (g *GRPCServer) GetEvents(ctx context.Context, req *proto.GetEventsRequest) (*proto.Events, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trail.ToGRPC(err)
}

rawEvents, lastkey, err := auth.ServerWithRoles.SearchEvents(req.StartDate, req.EndDate, req.Namespace, req.EventTypes, int(req.Limit), req.StartKey)
if err != nil {
return nil, trail.ToGRPC(err)
}

var res *proto.Events = &proto.Events{}

encodedEvents := make([]*apievents.OneOf, 0, len(rawEvents))

for _, rawEvent := range rawEvents {
event, err := events.ToOneOf(rawEvent)
if err != nil {
return nil, trail.ToGRPC(err)
}
encodedEvents = append(encodedEvents, event)
}

res.Items = encodedEvents
res.LastKey = lastkey
return res, nil
}

// GetEvents searches for session events on the backend and sends them back in a response.
func (g *GRPCServer) GetSessionEvents(ctx context.Context, req *proto.GetSessionEventsRequest) (*proto.Events, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trail.ToGRPC(err)
}

rawEvents, lastkey, err := auth.ServerWithRoles.SearchSessionEvents(req.StartDate, req.EndDate, int(req.Limit), req.StartKey)
if err != nil {
return nil, trail.ToGRPC(err)
}

var res *proto.Events = &proto.Events{}

encodedEvents := make([]*apievents.OneOf, 0, len(rawEvents))

for _, rawEvent := range rawEvents {
event, err := events.ToOneOf(rawEvent)
if err != nil {
return nil, trail.ToGRPC(err)
}
encodedEvents = append(encodedEvents, event)
}

res.Items = encodedEvents
res.LastKey = lastkey
return res, nil
}

// GRPCServerConfig specifies GRPC server configuration
type GRPCServerConfig struct {
// APIConfig is GRPC server API configuration
Expand Down
15 changes: 8 additions & 7 deletions lib/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,13 +1299,13 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
Time: time.Now().UTC().UnixNano(),
EventIndex: 0,
EventType: events.SessionStartEvent,
Data: marshal(events.EventFields{events.EventLogin: "alice", "val": "three"}),
Data: marshal(events.EventFields{events.EventLogin: "alice"}),
},
{
Time: time.Now().UTC().UnixNano(),
EventIndex: 1,
EventType: events.SessionEndEvent,
Data: marshal(events.EventFields{events.EventLogin: "alice", "val": "three"}),
Data: marshal(events.EventFields{events.EventLogin: "alice"}),
},
},
Version: events.V3,
Expand Down Expand Up @@ -1347,23 +1347,24 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
// try searching for events with no filter (empty query) - should get all 3 events:
to := time.Now().In(time.UTC).Add(time.Hour)
from := to.Add(-time.Hour * 2)
history, err := clt.SearchEvents(from, to, "", 0)
history, _, err := clt.SearchEvents(from, to, defaults.Namespace, nil, 0, "")
c.Assert(err, check.IsNil)
c.Assert(history, check.NotNil)
// Extra event is the upload event
c.Assert(len(history), check.Equals, 5)

// try searching for only "session.end" events (real query)
history, err = clt.SearchEvents(from, to,
fmt.Sprintf("%s=%s", events.EventType, events.SessionEndEvent), 0)
history, _, err = clt.SearchEvents(from, to, defaults.Namespace, []string{events.SessionEndEvent}, 0, "")
c.Assert(err, check.IsNil)
c.Assert(history, check.NotNil)
c.Assert(len(history), check.Equals, 2)
var found bool
for _, event := range history {
if event.GetString(events.SessionEventID) == string(anotherSessionID) {
realEvent, ok := event.(*events.SessionEnd)
c.Assert(ok, check.Equals, true)
if realEvent.GetSessionID() == string(anotherSessionID) {
found = true
c.Assert(event.GetString("val"), check.Equals, "three")
c.Assert(realEvent.Login, check.Equals, "alice")
}
}
c.Assert(found, check.Equals, true)
Expand Down
Loading