Skip to content

Commit

Permalink
implement rfd 19
Browse files Browse the repository at this point in the history
  • Loading branch information
xacrimon committed May 10, 2021
1 parent 363348d commit 88f07de
Show file tree
Hide file tree
Showing 25 changed files with 1,916 additions and 641 deletions.
47 changes: 47 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
"io"
"net"
"sync"
Expand Down Expand Up @@ -1293,6 +1294,52 @@ func (c *Client) DeleteAllNodes(ctx context.Context, namespace string) error {
return trail.FromGRPC(err)
}

// SearchEvents allows searching for events with a full pagination support.
func (c *Client) SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]map[string]interface{}, string, error) {
request := &proto.GetEventsRequest{
Namespace: namespace,
StartDate: fromUTC,
EndDate: toUTC,
EventTypes: eventTypes,
Limit: int32(limit),
StartKey: startKey,
}

response, err := c.grpc.GetEvents(ctx, request)
if err != nil {
return nil, "", trail.FromGRPC(err)
}

var decodedEvents []map[string]interface{}
if err := json.Unmarshal(response.Items, &decodedEvents); err != nil {
return nil, "", trace.Wrap(err)
}

return decodedEvents, response.LastKey, nil
}

// SearchSessionEvents allows searching for session events with a full pagination support.
func (c *Client) SearchSessionEvents(ctx context.Context, fromUTC time.Time, toUTC time.Time, limit int, startKey string) ([]map[string]interface{}, string, error) {
request := &proto.GetSessionEventsRequest{
StartDate: fromUTC,
EndDate: toUTC,
Limit: int32(limit),
StartKey: startKey,
}

response, err := c.grpc.GetSessionEvents(ctx, request)
if err != nil {
return nil, "", trail.FromGRPC(err)
}

var decodedEvents []map[string]interface{}
if err := json.Unmarshal(response.Items, &decodedEvents); err != nil {
return nil, "", trace.Wrap(err)
}

return decodedEvents, response.LastKey, nil
}

// GetClusterNetworkingConfig gets cluster networking configuration.
func (c *Client) GetClusterNetworkingConfig(ctx context.Context) (types.ClusterNetworkingConfig, error) {
resp, err := c.grpc.GetClusterNetworkingConfig(ctx, &empty.Empty{}, c.callOpts...)
Expand Down
1,835 changes: 1,454 additions & 381 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions api/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,42 @@ message SingleUseUserCert {
}
}

message GetEventsRequest {
// Namespace, if not set, defaults to 'default'
string Namespace = 1;
// Oldest date of returned events
google.protobuf.Timestamp StartDate = 2
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
// Newest date of returned events
google.protobuf.Timestamp EndDate = 3
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
// EventTypes is optional, if not set, returns all events
repeated string EventTypes = 4;
// Maximum amount of events returned
int32 Limit = 5;
// When supplied the search will resume from the last key
string StartKey = 6;
}

message GetSessionEventsRequest {
// Oldest date of returned events
google.protobuf.Timestamp StartDate = 1
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
// Newest date of returned events
google.protobuf.Timestamp EndDate = 2
[ (gogoproto.stdtime) = true, (gogoproto.nullable) = false ];
int32 Limit = 3;
string StartKey = 4;
}

message Events {
bytes Items = 1;
// the key of the last event if the returned set did not contain all events found i.e limit <
// actual amount. this is the key clients can supply in another API request to continue fetching
// events from the previous last position
string LastKey = 2;
}

// AuthService is authentication/authorization service implementation
service AuthService {
// SendKeepAlives allows node to send a stream of keep alive requests
Expand Down Expand Up @@ -1043,4 +1079,9 @@ service AuthService {
rpc GetClusterNetworkingConfig(google.protobuf.Empty) returns (types.ClusterNetworkingConfigV2);
// SetClusterNetworkingConfig sets cluster networking configuration.
rpc SetClusterNetworkingConfig(types.ClusterNetworkingConfigV2) returns (google.protobuf.Empty);

// Out-of-session request for audit events.
rpc GetEvents(GetEventsRequest) returns (Events);
// In-session request for audit events.
rpc GetSessionEvents(GetSessionEventsRequest) returns (Events);
}
2 changes: 1 addition & 1 deletion e
Submodule e updated from a7e421 to 7d5351
4 changes: 2 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1509,12 +1509,12 @@ func (s *IntSuite) twoClustersTunnel(c *check.C, now time.Time, proxyRecordMode
stopCh := time.After(5 * time.Second)

// only look for exec events
execQuery := fmt.Sprintf("%s=%s", events.EventType, events.ExecEvent)
eventTypes := []string{events.ExecEvent}

for {
select {
case <-tickCh:
eventsInSite, err := site.SearchEvents(now, now.Add(1*time.Hour), execQuery, 0)
eventsInSite, _, err := site.SearchEvents(now, now.Add(1*time.Hour), defaults.Namespace, eventTypes, 0, "")
if err != nil {
return trace.Wrap(err)
}
Expand Down
11 changes: 4 additions & 7 deletions lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1819,12 +1819,9 @@ func (s *APIServer) searchEvents(auth ClientI, w http.ResponseWriter, r *http.Re
return nil, trace.BadParameter("failed to parse limit: %q", limit)
}
}
// remove 'to', 'from' and 'limit' fields, passing the rest of the query unmodified
// to whatever pluggable search is implemented by the backend
query.Del("to")
query.Del("from")
query.Del("limit")
eventsList, err := auth.SearchEvents(from, to, query.Encode(), limit)

eventTypes := query[events.EventType]
eventsList, _, err := auth.SearchEvents(from, to, defaults.Namespace, eventTypes, limit, "")
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1864,7 +1861,7 @@ func (s *APIServer) searchSessionEvents(auth ClientI, w http.ResponseWriter, r *
}
}
// only pull back start and end events to build list of completed sessions
eventsList, err := auth.SearchSessionEvents(from, to, limit)
eventsList, _, err := auth.SearchSessionEvents(from, to, limit, "")
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
44 changes: 28 additions & 16 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -1996,22 +1996,6 @@ func (a *ServerWithRoles) GetSessionEvents(namespace string, sid session.ID, aft
return a.alog.GetSessionEvents(namespace, sid, afterN, includePrintEvents)
}

func (a *ServerWithRoles) SearchEvents(from, to time.Time, query string, limit int) ([]events.EventFields, error) {
if err := a.action(defaults.Namespace, services.KindEvent, services.VerbList); err != nil {
return nil, trace.Wrap(err)
}

return a.alog.SearchEvents(from, to, query, limit)
}

func (a *ServerWithRoles) SearchSessionEvents(from, to time.Time, limit int) ([]events.EventFields, error) {
if err := a.action(defaults.Namespace, services.KindSession, services.VerbList); err != nil {
return nil, trace.Wrap(err)
}

return a.alog.SearchSessionEvents(from, to, limit)
}

// GetNamespaces returns a list of namespaces
func (a *ServerWithRoles) GetNamespaces() ([]services.Namespace, error) {
if err := a.action(defaults.Namespace, services.KindNamespace, services.VerbList); err != nil {
Expand Down Expand Up @@ -2914,6 +2898,34 @@ func (a *ServerWithRoles) IsMFARequired(ctx context.Context, req *proto.IsMFAReq
return a.authServer.isMFARequired(ctx, a.context.Checker, req)
}

// SearchEvents allows searching audit events with pagination support.
func (a *ServerWithRoles) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) (events []events.EventFields, lastKey string, err error) {
if err := a.action(defaults.Namespace, services.KindEvent, services.VerbList); err != nil {
return nil, "", trace.Wrap(err)
}

events, lastKey, err = a.alog.SearchEvents(fromUTC, toUTC, namespace, eventTypes, limit, startKey)
if err != nil {
return nil, "", trace.Wrap(err)
}

return events, lastKey, nil
}

// SearchSessionEvents allows searching session audit events with pagination support.
func (a *ServerWithRoles) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, startKey string) (events []events.EventFields, lastKey string, err error) {
if err := a.action(defaults.Namespace, services.KindEvent, services.VerbList); err != nil {
return nil, "", trace.Wrap(err)
}

events, lastKey, err = a.alog.SearchSessionEvents(fromUTC, toUTC, limit, startKey)
if err != nil {
return nil, "", trace.Wrap(err)
}

return events, lastKey, nil
}

// NewAdminAuthServer returns auth server authorized as admin,
// used for auth server cached access
func NewAdminAuthServer(authServer *Server, sessions session.Service, alog events.IAuditLog) (ClientI, error) {
Expand Down
47 changes: 19 additions & 28 deletions lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,45 +1451,36 @@ func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int,
return retval, nil
}

// SearchEvents returns events that fit the criteria
func (c *Client) SearchEvents(from, to time.Time, query string, limit int) ([]events.EventFields, error) {
q, err := url.ParseQuery(query)
// SearchEvents allows searching for audit events with pagination support.
func (c *Client) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]events.EventFields, string, error) {
eventsOpaque, lastKey, err := c.APIClient.SearchEvents(context.TODO(), fromUTC, toUTC, namespace, eventTypes, limit, startKey)
if err != nil {
return nil, trace.BadParameter("query")
return nil, "", trace.Wrap(err)
}
q.Set("from", from.Format(time.RFC3339))
q.Set("to", to.Format(time.RFC3339))
q.Set("limit", fmt.Sprintf("%v", limit))
response, err := c.Get(c.Endpoint("events"), q)
if err != nil {
return nil, trace.Wrap(err)
}
retval := make([]events.EventFields, 0)
if err := json.Unmarshal(response.Bytes(), &retval); err != nil {
return nil, trace.Wrap(err)

var eventsConcrete []events.EventFields

for _, event := range eventsOpaque {
eventsConcrete = append(eventsConcrete, events.EventFields(event))
}
return retval, nil

return eventsConcrete, lastKey, nil
}

// SearchSessionEvents returns session related events to find completed sessions.
func (c *Client) SearchSessionEvents(from, to time.Time, limit int) ([]events.EventFields, error) {
query := url.Values{
"to": []string{to.Format(time.RFC3339)},
"from": []string{from.Format(time.RFC3339)},
"limit": []string{fmt.Sprintf("%v", limit)},
}

response, err := c.Get(c.Endpoint("events", "session"), query)
func (c *Client) SearchSessionEvents(fromUTC time.Time, toUTC time.Time, limit int, startKey string) ([]events.EventFields, string, error) {
eventsOpaque, lastKey, err := c.APIClient.SearchSessionEvents(context.TODO(), fromUTC, toUTC, limit, startKey)
if err != nil {
return nil, trace.Wrap(err)
return nil, "", trace.Wrap(err)
}

retval := make([]events.EventFields, 0)
if err := json.Unmarshal(response.Bytes(), &retval); err != nil {
return nil, trace.Wrap(err)
var eventsConcrete []events.EventFields

for _, event := range eventsOpaque {
eventsConcrete = append(eventsConcrete, events.EventFields(event))
}

return retval, nil
return eventsConcrete, lastKey, nil
}

// GetNamespaces returns a list of namespaces
Expand Down
49 changes: 49 additions & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auth
import (
"context"
"crypto/tls"
"encoding/json"
"io"
"net"
"time"
Expand Down Expand Up @@ -2512,6 +2513,54 @@ func (g *GRPCServer) authenticate(ctx context.Context) (*grpcContext, error) {
}, nil
}

// GetEvents searches for events on the backend and sends them back in a response.
func (g *GRPCServer) GetEvents(ctx context.Context, req *proto.GetEventsRequest) (*proto.Events, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trail.ToGRPC(err)
}

events, lastkey, err := auth.ServerWithRoles.SearchEvents(req.StartDate, req.EndDate, req.Namespace, req.EventTypes, int(req.Limit), req.StartKey)
if err != nil {
return nil, trail.ToGRPC(err)
}

var res *proto.Events = &proto.Events{}

encodedEvents, err := json.Marshal(events)
if err != nil {
return nil, trail.ToGRPC(err)
}

res.Items = encodedEvents
res.LastKey = lastkey
return res, nil
}

// GetEvents searches for session events on the backend and sends them back in a response.
func (g *GRPCServer) GetSessionEvents(ctx context.Context, req *proto.GetSessionEventsRequest) (*proto.Events, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trail.ToGRPC(err)
}

events, lastkey, err := auth.ServerWithRoles.SearchSessionEvents(req.StartDate, req.EndDate, int(req.Limit), req.StartKey)
if err != nil {
return nil, trail.ToGRPC(err)
}

var res *proto.Events = &proto.Events{}

encodedEvents, err := json.Marshal(events)
if err != nil {
return nil, trail.ToGRPC(err)
}

res.Items = encodedEvents
res.LastKey = lastkey
return res, nil
}

// GRPCServerConfig specifies GRPC server configuration
type GRPCServerConfig struct {
// APIConfig is GRPC server API configuration
Expand Down
5 changes: 2 additions & 3 deletions lib/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,15 +1347,14 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
// try searching for events with no filter (empty query) - should get all 3 events:
to := time.Now().In(time.UTC).Add(time.Hour)
from := to.Add(-time.Hour * 2)
history, err := clt.SearchEvents(from, to, "", 0)
history, _, err := clt.SearchEvents(from, to, defaults.Namespace, nil, 0, "")
c.Assert(err, check.IsNil)
c.Assert(history, check.NotNil)
// Extra event is the upload event
c.Assert(len(history), check.Equals, 5)

// try searching for only "session.end" events (real query)
history, err = clt.SearchEvents(from, to,
fmt.Sprintf("%s=%s", events.EventType, events.SessionEndEvent), 0)
history, _, err = clt.SearchEvents(from, to, defaults.Namespace, []string{events.SessionEndEvent}, 0, "")
c.Assert(err, check.IsNil)
c.Assert(history, check.NotNil)
c.Assert(len(history), check.Equals, 2)
Expand Down
19 changes: 11 additions & 8 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,19 +579,22 @@ type IAuditLog interface {
// replay recorded session streams.
GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]EventFields, error)

// SearchEvents is a flexible way to find events. The format of a query string
// depends on the implementing backend. A recommended format is urlencoded
// (good enough for Lucene/Solr)
// SearchEvents is a flexible way to find events.
//
// Pagination is also defined via backend-specific query format.
// Event types to filter can be specified and pagination is handled by an iterator key that allows
// a query to be resumed.
//
// The only mandatory requirement is a date range (UTC). Results must always
// show up sorted by date (newest first)
SearchEvents(fromUTC, toUTC time.Time, query string, limit int) ([]EventFields, error)
SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, startKey string) ([]EventFields, string, error)

// SearchSessionEvents returns session related events only. This is used to
// find completed session.
SearchSessionEvents(fromUTC time.Time, toUTC time.Time, limit int) ([]EventFields, error)
// SearchSessionEvents is a flexible way to find session events.
// Only session events are returned by this function.
// This is used to find completed session.
//
// Event types to filter can be specified and pagination is handled by an iterator key that allows
// a query to be resumed.
SearchSessionEvents(fromUTC time.Time, toUTC time.Time, limit int, startKey string) ([]EventFields, string, error)

// WaitForDelivery waits for resources to be released and outstanding requests to
// complete after calling Close method
Expand Down
Loading

0 comments on commit 88f07de

Please sign in to comment.