Skip to content

Commit

Permalink
Merge pull request #1773 from gravitational/sasha/ultimate-aws
Browse files Browse the repository at this point in the history
External events and sessions storage.
  • Loading branch information
klizhentas authored Mar 15, 2018
2 parents de18a22 + bad1b04 commit 724e827
Show file tree
Hide file tree
Showing 91 changed files with 42,072 additions and 136 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ const (
// ComponentSession is an active session.
ComponentSession = "session"

// ComponentDynamoDB represents dynamodb clients
ComponentDynamoDB = "dynamodb"

// DebugEnvVar tells tests to use verbose debug output
DebugEnvVar = "DEBUG"

Expand Down Expand Up @@ -209,6 +212,16 @@ const (

// Off means mode is off
Off = "off"

// SchemeS3 is S3 file scheme, means upload or download to S3 like object
// storage
SchemeS3 = "s3"

// SchemeFile is a local disk file storage
SchemeFile = "file"

// LogsDir is a log subdirectory for events and logs
LogsDir = "log"
)

// Component generates "component:subcomponent1:subcomponent2" strings used
Expand Down
2 changes: 1 addition & 1 deletion e
Submodule e updated from cfd6b0 to 3b7ad4
13 changes: 11 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/gravitational/teleport/lib/backend/dir"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/services"
Expand Down Expand Up @@ -70,6 +71,9 @@ type TeleInstance struct {
// Nodes is a list of additional nodes
// started with this instance
Nodes []*service.TeleportProcess

// UploadEventsC is a channel for upload events
UploadEventsC chan *events.UploadEvent
}

type User struct {
Expand Down Expand Up @@ -177,8 +181,9 @@ func NewInstance(cfg InstanceConfig) *TeleInstance {
fatalIf(err)

i := &TeleInstance{
Ports: cfg.Ports,
Hostname: cfg.NodeName,
Ports: cfg.Ports,
Hostname: cfg.NodeName,
UploadEventsC: make(chan *events.UploadEvent, 100),
}
secrets := InstanceSecrets{
SiteName: cfg.ClusterName,
Expand Down Expand Up @@ -326,6 +331,7 @@ func (i *TeleInstance) CreateEx(trustedSecrets []*InstanceSecrets, tconf *servic
tconf = service.MakeDefaultConfig()
}
tconf.DataDir = dataDir
tconf.UploadEventsC = i.UploadEventsC
tconf.Auth.ClusterName, err = services.NewClusterName(services.ClusterNameSpecV2{
ClusterName: i.Secrets.SiteName,
})
Expand Down Expand Up @@ -454,6 +460,7 @@ func (i *TeleInstance) StartNode(name string, sshPort int) (*service.TeleportPro
tconf.HostUUID = name
tconf.Hostname = name
tconf.DataDir = dataDir
tconf.UploadEventsC = i.UploadEventsC
var ttl time.Duration
tconf.CachePolicy = service.CachePolicy{
Enabled: true,
Expand Down Expand Up @@ -507,6 +514,7 @@ func (i *TeleInstance) StartNodeAndProxy(name string, sshPort, proxyWebPort, pro
tconf.Token = "token"
tconf.HostUUID = name
tconf.Hostname = name
tconf.UploadEventsC = i.UploadEventsC
tconf.DataDir = dataDir
var ttl time.Duration
tconf.CachePolicy = service.CachePolicy{
Expand Down Expand Up @@ -576,6 +584,7 @@ func (i *TeleInstance) StartProxy(cfg ProxyConfig) error {
tconf.AuthServers = append(tconf.AuthServers, *authServer)
tconf.CachePolicy = service.CachePolicy{Enabled: true}
tconf.DataDir = dataDir
tconf.UploadEventsC = i.UploadEventsC
tconf.HostUUID = cfg.Name
tconf.Hostname = cfg.Name
tconf.Token = "token"
Expand Down
51 changes: 43 additions & 8 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2016 Gravitational, Inc.
Copyright 2016-2018 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -162,23 +162,37 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
var tests = []struct {
inRecordLocation string
inForwardAgent bool
auditSessionsURI string
}{

// normal teleport
{
services.RecordAtNode,
false,
inRecordLocation: services.RecordAtNode,
inForwardAgent: false,
},
// recording proxy
{
services.RecordAtProxy,
true,
inRecordLocation: services.RecordAtProxy,
inForwardAgent: true,
},
// normal teleport with upload to file server
{
inRecordLocation: services.RecordAtNode,
inForwardAgent: false,
auditSessionsURI: c.MkDir(),
},
{
inRecordLocation: services.RecordAtProxy,
inForwardAgent: false,
auditSessionsURI: c.MkDir(),
},
}

for _, tt := range tests {
makeConfig := func() (*check.C, []string, []*InstanceSecrets, *service.Config) {
clusterConfig, err := services.NewClusterConfig(services.ClusterConfigSpecV3{
SessionRecording: tt.inRecordLocation,
Audit: services.AuditConfig{AuditSessionsURI: tt.auditSessionsURI},
})
c.Assert(err, check.IsNil)

Expand All @@ -189,7 +203,6 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
tconf.Proxy.DisableWebService = true
tconf.Proxy.DisableWebInterface = true
tconf.SSH.Enabled = true

return c, nil, nil, tconf
}
t := s.newTeleportWithConfig(makeConfig())
Expand Down Expand Up @@ -285,7 +298,29 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
myTerm.Type("\aecho hi\n\r\aexit\n\r\a")

// wait for session to end:
<-endC
select {
case <-endC:
case <-time.After(10 * time.Second):
c.Fatalf("Timeout waiting for session to finish")
}

// wait for the upload of the right session to complete
if tt.auditSessionsURI != "" {
timeoutC := time.After(10 * time.Second)
loop:
for {
select {
case event := <-t.UploadEventsC:
if event.SessionID != string(session.ID) {
log.Debugf("Skipping mismatching session %v, expecting upload of %v.", event.SessionID, session.ID)
continue
}
break loop
case <-timeoutC:
c.Fatalf("Timeout waiting for upload of session %v to complete to %v", session.ID, tt.auditSessionsURI)
}
}
}

// read back the entire session (we have to try several times until we get back
// everything because the session is closing)
Expand Down Expand Up @@ -323,7 +358,7 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
select {
case <-tickCh:
// Get all session events from the backend.
sessionEvents, err := site.GetSessionEvents(defaults.Namespace, session.ID, 0)
sessionEvents, err := site.GetSessionEvents(defaults.Namespace, session.ID, 0, false)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
7 changes: 6 additions & 1 deletion lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,7 +1883,12 @@ func (s *APIServer) getSessionEvents(auth ClientI, w http.ResponseWriter, r *htt
if err != nil {
afterN = 0
}
return auth.GetSessionEvents(namespace, *sid, afterN)
includePrintEvents, err := strconv.ParseBool(r.URL.Query().Get("print"))
if err != nil {
includePrintEvents = false
}

return auth.GetSessionEvents(namespace, *sid, afterN, includePrintEvents)
}

type upsertNamespaceReq struct {
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,12 +808,12 @@ func (a *AuthWithRoles) GetSessionChunk(namespace string, sid session.ID, offset
return a.alog.GetSessionChunk(namespace, sid, offsetBytes, maxBytes)
}

func (a *AuthWithRoles) GetSessionEvents(namespace string, sid session.ID, afterN int) ([]events.EventFields, error) {
func (a *AuthWithRoles) GetSessionEvents(namespace string, sid session.ID, afterN int, includePrintEvents bool) ([]events.EventFields, error) {
if err := a.action(namespace, services.KindSession, services.VerbRead); err != nil {
return nil, trace.Wrap(err)
}

return a.alog.GetSessionEvents(namespace, sid, afterN)
return a.alog.GetSessionEvents(namespace, sid, afterN, includePrintEvents)
}

func (a *AuthWithRoles) SearchEvents(from, to time.Time, query string, limit int) ([]events.EventFields, error) {
Expand Down
5 changes: 4 additions & 1 deletion lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,14 +1686,17 @@ func (c *Client) GetSessionChunk(namespace string, sid session.ID, offsetBytes,
//
// This function is usually used in conjunction with GetSessionReader to
// replay recorded session streams.
func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int) (retval []events.EventFields, err error) {
func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int, includePrintEvents bool) (retval []events.EventFields, err error) {
if namespace == "" {
return nil, trace.BadParameter(MissingNamespaceError)
}
query := make(url.Values)
if afterN > 0 {
query.Set("after", strconv.Itoa(afterN))
}
if includePrintEvents {
query.Set("print", fmt.Sprintf("%v", includePrintEvents))
}
response, err := c.Get(c.Endpoint("namespaces", namespace, "sessions", string(sid), "events"), query)
if err != nil {
return nil, trace.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion lib/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
})
c.Assert(err, check.IsNil)
// ask for strictly session events:
e, err := clt.GetSessionEvents(defaults.Namespace, sess.ID, 0)
e, err := clt.GetSessionEvents(defaults.Namespace, sess.ID, 0, true)
c.Assert(err, check.IsNil)
c.Assert(len(e), check.Equals, 2)
c.Assert(e[0].GetString("val"), check.Equals, "one")
Expand Down
48 changes: 13 additions & 35 deletions lib/backend/dir/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"os"
"path"
"path/filepath"
"syscall"
"time"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/trace"

log "github.com/sirupsen/logrus"
"github.com/gravitational/teleport/lib/utils"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -161,10 +160,10 @@ func (bk *Backend) CreateVal(bucket []string, key string, val []byte, ttl time.D
return trace.ConvertSystemError(err)
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
if err := f.Truncate(0); err != nil {
return trace.ConvertSystemError(err)
}
Expand All @@ -175,27 +174,6 @@ func (bk *Backend) CreateVal(bucket []string, key string, val []byte, ttl time.D
return trace.Wrap(bk.applyTTL(dirPath, key, ttl))
}

func writeLock(f *os.File) error {
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}

func readLock(f *os.File) error {
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_SH); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}

func unlock(f *os.File) error {
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}

// UpsertVal updates or inserts value with a given TTL into a bucket
// ForeverTTL for no TTL
func (bk *Backend) UpsertVal(bucket []string, key string, val []byte, ttl time.Duration) error {
Expand All @@ -214,10 +192,10 @@ func (bk *Backend) UpsertVal(bucket []string, key string, val []byte, ttl time.D
return trace.ConvertSystemError(err)
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
if err := f.Truncate(0); err != nil {
return trace.ConvertSystemError(err)
}
Expand Down Expand Up @@ -249,10 +227,10 @@ func (bk *Backend) GetVal(bucket []string, key string) ([]byte, error) {
return nil, trace.ConvertSystemError(err)
}
defer f.Close()
if err := readLock(f); err != nil {
if err := utils.FSReadLock(f); err != nil {
return nil, trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
bytes, err := ioutil.ReadAll(f)
if err != nil {
return nil, trace.ConvertSystemError(err)
Expand All @@ -278,10 +256,10 @@ func (bk *Backend) DeleteKey(bucket []string, key string) error {
return trace.ConvertSystemError(err)
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
if err := os.Remove(bk.ttlFile(dirPath, key)); err != nil {
if !os.IsNotExist(err) {
log.Warn(err)
Expand Down Expand Up @@ -345,10 +323,10 @@ func removeFile(path string) error {
return nil
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
err = os.Remove(path)
if err != nil {
err = trace.ConvertSystemError(err)
Expand Down
Loading

0 comments on commit 724e827

Please sign in to comment.