Skip to content

Commit

Permalink
Log events to multiple destinations
Browse files Browse the repository at this point in the history
This commit implements #2070

```yaml
teleport:
  storage:
    type: dir
    audit_events_uri:  [file:///var/lib/teleport/events, dynamodb://test_grv8_events]
    audit_sessions_uri: s3://testgrv8records
```
  • Loading branch information
klizhentas committed Jul 17, 2018
1 parent 0561ab1 commit e595c37
Show file tree
Hide file tree
Showing 22 changed files with 1,146 additions and 1,357 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
# name = "github.com/x/y"
# version = "2.4.0"

ignored = ["github.com/Sirupsen/logrus"]

[prune]
go-tests = true
unused-packages = true
non-go = true

ignored = ["github.com/Sirupsen/logrus"]

[[constraint]]
name = "github.com/alecthomas/template"
branch = "master"
Expand Down Expand Up @@ -130,7 +130,7 @@ ignored = ["github.com/Sirupsen/logrus"]

[[constraint]]
name = "github.com/gravitational/trace"
version = "1.1.3"
version = "1.1.5"

[[constraint]]
name = "github.com/coreos/go-oidc"
Expand Down
57 changes: 54 additions & 3 deletions lib/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package auth

import (
"context"
"encoding/base32"
"encoding/json"
"fmt"
"net/url"
"os"
"path/filepath"
"time"

"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -842,9 +845,22 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
return data
}

uploadDir := c.MkDir()

// emit two events: "one" and "two" for this session, and event "three"
// for some other session
err = clt.PostSessionSlice(events.SessionSlice{
err = os.MkdirAll(filepath.Join(uploadDir, "upload", "sessions", defaults.Namespace), 0755)
forwarder, err := events.NewForwarder(events.ForwarderConfig{
Namespace: defaults.Namespace,
SessionID: sess.ID,
ServerID: teleport.ComponentUpload,
DataDir: uploadDir,
RecordSessions: true,
ForwardTo: clt,
})
c.Assert(err, check.IsNil)

err = forwarder.PostSessionSlice(events.SessionSlice{
Namespace: defaults.Namespace,
SessionID: string(sess.ID),
Chunks: []*events.SessionChunk{
Expand All @@ -861,11 +877,21 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
Data: marshal(events.EventFields{events.EventLogin: "bob", "val": "two"}),
},
},
Version: events.V2,
Version: events.V3,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)

anotherSessionID := session.NewID()
forwarder, err = events.NewForwarder(events.ForwarderConfig{
Namespace: defaults.Namespace,
SessionID: sess.ID,
ServerID: teleport.ComponentUpload,
DataDir: uploadDir,
RecordSessions: true,
ForwardTo: clt,
})
c.Assert(err, check.IsNil)
err = clt.PostSessionSlice(events.SessionSlice{
Namespace: defaults.Namespace,
SessionID: string(anotherSessionID),
Expand All @@ -883,9 +909,34 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
Data: marshal(events.EventFields{events.EventLogin: "alice", "val": "three"}),
},
},
Version: events.V2,
Version: events.V3,
})
c.Assert(err, check.IsNil)
c.Assert(forwarder.Close(), check.IsNil)

// start uploader process
eventsC := make(chan *events.UploadEvent, 100)
uploader, err := events.NewUploader(events.UploaderConfig{
ServerID: "upload",
DataDir: uploadDir,
Namespace: defaults.Namespace,
Context: context.TODO(),
ScanPeriod: 100 * time.Millisecond,
AuditLog: clt,
EventsC: eventsC,
})
c.Assert(err, check.IsNil)
err = uploader.Scan()
c.Assert(err, check.IsNil)

// scanner should upload the events
select {
case event := <-eventsC:
c.Assert(event, check.NotNil)
c.Assert(event.Error, check.IsNil)
case <-time.After(time.Second):
c.Fatalf("Timeout wating for the upload event")
}

// ask for strictly session events:
e, err := clt.GetSessionEvents(defaults.Namespace, sess.ID, 0, true)
Expand Down
6 changes: 3 additions & 3 deletions lib/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (s *ConfigTestSuite) TestApplyConfig(c *check.C) {
conf, err := ReadConfig(bytes.NewBufferString(SmallConfigString))
c.Assert(err, check.IsNil)
c.Assert(conf, check.NotNil)
c.Assert(conf.Proxy.PublicAddr, check.DeepEquals, Strings{"web3:443"})
c.Assert(conf.Proxy.PublicAddr, check.DeepEquals, utils.Strings{"web3:443"})

cfg := service.MakeDefaultConfig()
err = ApplyFileConfig(conf, cfg)
Expand Down Expand Up @@ -542,7 +542,7 @@ func checkStaticConfig(c *check.C, conf *FileConfig) {
c.Assert(conf.SSH.Commands[1].Name, check.Equals, "date")
c.Assert(conf.SSH.Commands[1].Command, check.DeepEquals, []string{"/bin/date"})
c.Assert(conf.SSH.Commands[1].Period.Nanoseconds(), check.Equals, int64(20000000))
c.Assert(conf.SSH.PublicAddr, check.DeepEquals, Strings{
c.Assert(conf.SSH.PublicAddr, check.DeepEquals, utils.Strings{
"luna3:22",
})

Expand All @@ -569,7 +569,7 @@ func checkStaticConfig(c *check.C, conf *FileConfig) {
c.Assert(conf.Auth.StaticTokens, check.DeepEquals,
StaticTokens{"proxy,node:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "auth:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"})

c.Assert(conf.Auth.PublicAddr, check.DeepEquals, Strings{
c.Assert(conf.Auth.PublicAddr, check.DeepEquals, utils.Strings{
"auth.default.svc.cluster.local:3080",
})

Expand Down
47 changes: 5 additions & 42 deletions lib/config/fileconf.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2015 Gravitational, Inc.
Copyright 2015-2018 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -139,6 +139,7 @@ var (
"proxy_checks_host_keys": false,
"audit_table_name": false,
"audit_sessions_uri": false,
"audit_events_uri": false,
"pam": true,
"service_name": false,
"client_idle_timeout": false,
Expand Down Expand Up @@ -524,7 +525,7 @@ type Auth struct {

// PublicAddr sets SSH host principals and TLS DNS names to auth
// server certificates
PublicAddr Strings `yaml:"public_addr,omitempty"`
PublicAddr utils.Strings `yaml:"public_addr,omitempty"`

// KubeCACertFile is a path to kubernetes certificate authority certificate file
KubeCACertFile string `yaml:"kube_ca_cert_file,omitempty"`
Expand Down Expand Up @@ -669,7 +670,7 @@ type SSH struct {
PermitUserEnvironment bool `yaml:"permit_user_env,omitempty"`
PAM *PAM `yaml:"pam,omitempty"`
// PublicAddr sets SSH host principals for SSH service
PublicAddr Strings `yaml:"public_addr,omitempty"`
PublicAddr utils.Strings `yaml:"public_addr,omitempty"`
}

// CommandLabel is `command` section of `ssh_service` in the config file
Expand Down Expand Up @@ -718,7 +719,7 @@ type Proxy struct {
// CertFile is a TLS Certificate file
CertFile string `yaml:"https_cert_file,omitempty"`
// PublicAddr is a publicly advertised address of the proxy
PublicAddr Strings `yaml:"public_addr,omitempty"`
PublicAddr utils.Strings `yaml:"public_addr,omitempty"`
// ProxyProtocol turns on support for HAProxy proxy protocol
// this is the option that has be turned on only by administrator,
// as only admin knows whether service is in front of trusted load balancer
Expand Down Expand Up @@ -961,41 +962,3 @@ func (u *U2F) Parse() (*services.U2F, error) {
Facets: facets,
}, nil
}

// Strings is a list of string that can unmarshal from list or a single yaml value
type Strings []string

// UnmarshalYAML is used to allow Strings to unmarshal from
// scalar string value or from the list
func (s *Strings) UnmarshalYAML(unmarshal func(interface{}) error) error {
// try unmarshal as string
var val string
err := unmarshal(&val)
if err == nil {
*s = []string{val}
return nil
}

// try unmarshal as slice
var slice []string
err = unmarshal(&slice)
if err == nil {
*s = slice
return nil
}

return err
}

// Addrs returns strings list converted to address list
func (s Strings) Addrs(defaultPort int) ([]utils.NetAddr, error) {
addrs := make([]utils.NetAddr, len(s))
for i, val := range s {
addr, err := utils.ParseHostPortAddr(val, defaultPort)
if err != nil {
return nil, trace.Wrap(err)
}
addrs[i] = *addr
}
return addrs, nil
}
Loading

0 comments on commit e595c37

Please sign in to comment.