Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(enterprise): audit logs for alpha and zero #7295

Merged
merged 33 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6c8d67d
starting work on audit logs
aman-bansal Jan 13, 2021
c77f3d0
making audit logs to work with alpha
aman-bansal Jan 14, 2021
6ba4f3b
making audit logs to work with zero too
aman-bansal Jan 14, 2021
8f4b3bc
go mod tidy
aman-bansal Jan 14, 2021
33aff9c
fixing minor things to clean
aman-bansal Jan 14, 2021
d78651f
adding endpoint to grpc audit
aman-bansal Jan 14, 2021
8f33f61
fixing alpha and zero default directories
aman-bansal Jan 14, 2021
eb2b7c7
fixing alpha and zero default directories
aman-bansal Jan 15, 2021
7515a1b
adding skip method for grpc audits
aman-bansal Jan 15, 2021
6e3d770
renaming rw
aman-bansal Jan 15, 2021
6706ac5
refactoring basic things
aman-bansal Jan 15, 2021
72db976
audit fix
aman-bansal Jan 15, 2021
97729e5
making zero and alpha logs from the start itself.
aman-bansal Jan 15, 2021
9b5c43e
fixing zero init audit process
aman-bansal Jan 15, 2021
d7adbf8
adding licence file + build tags
aman-bansal Jan 18, 2021
0710595
adding dgraph audit tool and logs encryption
aman-bansal Jan 19, 2021
40e0194
adding logwriter to handle encryption and everything
aman-bansal Jan 19, 2021
cdd76e8
adding test cases to check requests are getting logged into the audit…
aman-bansal Jan 19, 2021
87bea22
adding interceptor ee version
aman-bansal Jan 19, 2021
3248e2a
basic refactoring and log message truncate functionality
aman-bansal Jan 19, 2021
2ffcbfe
fixing auditing request handler
aman-bansal Jan 19, 2021
512f291
refactoring alpha and audit_ee
aman-bansal Jan 20, 2021
6336d09
refactoring some error handling
aman-bansal Jan 20, 2021
5b7c9f4
making audit conf more reliable
aman-bansal Jan 20, 2021
7cd693d
fixing goimports
aman-bansal Jan 20, 2021
6373287
Merge branch 'master' into aman/audit_logs
aman-bansal Jan 20, 2021
642c296
fixing merge conflicts
aman-bansal Jan 20, 2021
d97d78d
making log writer performant using buffered writer
aman-bansal Jan 24, 2021
05ea358
fixing zero test case and comments
aman-bansal Jan 24, 2021
d01d743
gracefully closing all the go routines
aman-bansal Jan 26, 2021
2fa132d
Merge branch 'master' into aman/audit_logs
aman-bansal Jan 26, 2021
2498f6d
fixing oss build
aman-bansal Jan 26, 2021
e35918c
fixing failed test case
aman-bansal Jan 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 51 additions & 22 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"time"

badgerpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/dgraph/ee/audit"

"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/ee/enc"
Expand Down Expand Up @@ -191,6 +193,14 @@ they form a Raft group and provide synchronous replication.
`Cache percentages summing up to 100 for various caches (FORMAT:
PostingListCache,PstoreBlockCache,PstoreIndexCache,WAL).`)

flag.String("audit", "",
`Various audit options.
dir=/path/to/audits to define the path where to store the audit logs.
compress=true/false to enabled the compression of old audit logs (default behaviour is false).
encrypt_file=enc/key/file enables the audit log encryption with the key path provided with the
flag.
Sample flag could look like --audit dir=aa;encrypt_file=/filepath;compress=true`)

// TLS configurations
x.RegisterServerTLSFlags(flag)
}
Expand Down Expand Up @@ -379,6 +389,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
grpc.MaxSendMsgSize(x.GrpcMaxSize),
grpc.MaxConcurrentStreams(1000),
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
grpc.UnaryInterceptor(audit.AuditRequestGRPC),
}
if tlsCfg != nil {
opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
Expand Down Expand Up @@ -417,15 +428,18 @@ func setupServer(closer *z.Closer) {
log.Fatal(err)
}

http.HandleFunc("/query", queryHandler)
http.HandleFunc("/query/", queryHandler)
http.HandleFunc("/mutate", mutationHandler)
http.HandleFunc("/mutate/", mutationHandler)
http.HandleFunc("/commit", commitHandler)
http.HandleFunc("/alter", alterHandler)
http.HandleFunc("/health", healthCheck)
http.HandleFunc("/state", stateHandler)
http.HandleFunc("/jemalloc", x.JemallocHandler)
baseMux := http.NewServeMux()
http.Handle("/", audit.AuditRequestHttp(baseMux))

baseMux.HandleFunc("/query", queryHandler)
baseMux.HandleFunc("/query/", queryHandler)
baseMux.HandleFunc("/mutate", mutationHandler)
baseMux.HandleFunc("/mutate/", mutationHandler)
baseMux.HandleFunc("/commit", commitHandler)
baseMux.HandleFunc("/alter", alterHandler)
baseMux.HandleFunc("/health", healthCheck)
baseMux.HandleFunc("/state", stateHandler)
baseMux.HandleFunc("/jemalloc", x.JemallocHandler)

// TODO: Figure out what this is for?
http.HandleFunc("/debug/store", storeStatsHandler)
Expand All @@ -451,8 +465,9 @@ func setupServer(closer *z.Closer) {
var gqlHealthStore *admin.GraphQLHealthStore
// Do not use := notation here because adminServer is a global variable.
mainServer, adminServer, gqlHealthStore = admin.NewServers(introspection, &globalEpoch, closer)
http.Handle("/graphql", mainServer.HTTPHandler())
http.HandleFunc("/probe/graphql", func(w http.ResponseWriter, r *http.Request) {
baseMux.Handle("/graphql", mainServer.HTTPHandler())
baseMux.HandleFunc("/probe/graphql", func(w http.ResponseWriter,
r *http.Request) {
healthStatus := gqlHealthStore.GetHealth()
httpStatusCode := http.StatusOK
if !healthStatus.Healthy {
Expand All @@ -463,18 +478,19 @@ func setupServer(closer *z.Closer) {
x.Check2(w.Write([]byte(fmt.Sprintf(`{"status":"%s","schemaUpdateCounter":%d}`,
healthStatus.StatusMsg, atomic.LoadUint64(&globalEpoch)))))
})
http.Handle("/admin", allowedMethodsHandler(allowedMethods{
baseMux.Handle("/admin", allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPost: true,
http.MethodOptions: true,
}, adminAuthHandler(adminServer.HTTPHandler())))

http.Handle("/admin/schema", adminAuthHandler(http.HandlerFunc(func(w http.ResponseWriter,
baseMux.Handle("/admin/schema", adminAuthHandler(http.HandlerFunc(func(
w http.ResponseWriter,
r *http.Request) {
adminSchemaHandler(w, r, adminServer)
})))

http.Handle("/admin/schema/validate", http.HandlerFunc(func(w http.ResponseWriter,
baseMux.HandleFunc("/admin/schema/validate", func(w http.ResponseWriter,
r *http.Request) {
schema := readRequest(w, r)
w.Header().Set("Content-Type", "application/json")
Expand All @@ -489,26 +505,28 @@ func setupServer(closer *z.Closer) {
w.WriteHeader(http.StatusBadRequest)
errs := strings.Split(strings.TrimSpace(err.Error()), "\n")
x.SetStatusWithErrors(w, x.ErrorInvalidRequest, errs)
}))
})

http.Handle("/admin/shutdown", allowedMethodsHandler(allowedMethods{http.MethodGet: true},
baseMux.Handle("/admin/shutdown", allowedMethodsHandler(allowedMethods{http.
MethodGet: true},
adminAuthHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
shutDownHandler(w, r, adminServer)
}))))

http.Handle("/admin/draining", allowedMethodsHandler(allowedMethods{
baseMux.Handle("/admin/draining", allowedMethodsHandler(allowedMethods{
http.MethodPut: true,
http.MethodPost: true,
}, adminAuthHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
drainingHandler(w, r, adminServer)
}))))

http.Handle("/admin/export", allowedMethodsHandler(allowedMethods{http.MethodGet: true},
baseMux.Handle("/admin/export", allowedMethodsHandler(
allowedMethods{http.MethodGet: true},
adminAuthHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
exportHandler(w, r, adminServer)
}))))

http.Handle("/admin/config/cache_mb", allowedMethodsHandler(allowedMethods{
baseMux.Handle("/admin/config/cache_mb", allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPut: true,
}, adminAuthHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -520,10 +538,10 @@ func setupServer(closer *z.Closer) {
glog.Infof("Bringing up GraphQL HTTP admin API at %s/admin", addr)

// Add OpenCensus z-pages.
zpages.Handle(http.DefaultServeMux, "/z")
zpages.Handle(baseMux, "/z")

http.HandleFunc("/", homeHandler)
http.HandleFunc("/ui/keywords", keywordHandler)
baseMux.Handle("/", http.HandlerFunc(homeHandler))
baseMux.Handle("/ui/keywords", http.HandlerFunc(keywordHandler))

// Initialize the servers.
admin.ServerCloser.AddRunning(3)
Expand Down Expand Up @@ -585,6 +603,9 @@ func run() {
walCache := (cachePercent[3] * (totalCache << 20)) / 100

ctype, clevel := x.ParseCompression(Alpha.Conf.GetString("badger.compression"))

conf, err := audit.GetAuditConf(Alpha.Conf.GetString("audit"))
x.Check(err)
opts := worker.Options{
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
Expand All @@ -597,6 +618,7 @@ func run() {

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Audit: conf,
}

secretFile := Alpha.Conf.GetString("acl_secret_file")
Expand Down Expand Up @@ -658,6 +680,8 @@ func run() {
LudicrousConcurrency: Alpha.Conf.GetInt("ludicrous_concurrency"),
TLSClientConfig: tlsClientConf,
TLSServerConfig: tlsServerConf,
HmacSecret: opts.HmacSecret,
Audit: opts.Audit != nil,
}
x.WorkerConfig.Parse(Alpha.Conf)

Expand Down Expand Up @@ -699,6 +723,9 @@ func run() {

worker.InitServerState()

// Audit is enterprise feature.
x.Check(audit.InitAuditorIfNecessary(opts.Audit, worker.EnterpriseEnabled))

if Alpha.Conf.GetBool("expose_trace") {
// TODO: Remove this once we get rid of event logs.
trace.AuthRequest = func(req *http.Request) (any, sensitive bool) {
Expand Down Expand Up @@ -792,6 +819,8 @@ func run() {
adminCloser.SignalAndWait()
glog.Infoln("adminCloser closed.")

audit.Close()
glog.Infoln("audit logs if enabled are closed.")
worker.State.Dispose()
x.RemoveCidFile()
glog.Info("worker.State disposed.")
Expand Down
3 changes: 1 addition & 2 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/tok"
Expand Down Expand Up @@ -132,7 +131,7 @@ func handleError(err error, isRetry bool) {
dur := time.Duration(1+rand.Intn(10)) * time.Minute
fmt.Printf("Server is overloaded. Will retry after %s.\n", dur.Round(time.Minute))
time.Sleep(dur)
case err != zero.ErrConflict && err != dgo.ErrAborted:
case err != x.ErrConflict && err != dgo.ErrAborted:
fmt.Printf("Error while mutating: %v s.Code %v\n", s.Message(), s.Code())
}
}
Expand Down
2 changes: 2 additions & 0 deletions dgraph/cmd/root_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package cmd

import (
acl "github.com/dgraph-io/dgraph/ee/acl"
"github.com/dgraph-io/dgraph/ee/audit"
"github.com/dgraph-io/dgraph/ee/backup"
)

Expand All @@ -24,5 +25,6 @@ func init() {
&backup.LsBackup,
&backup.ExportBackup,
&acl.CmdAcl,
&audit.CmdAudit,
)
}
4 changes: 4 additions & 0 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"net/http"
"time"

"github.com/dgraph-io/dgraph/ee/audit"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
Expand Down Expand Up @@ -91,6 +93,8 @@ func (n *node) updateEnterpriseState(closer *z.Closer) {
active := time.Now().UTC().Before(expiry)
if !active {
n.server.expireLicense()
audit.Close()
glog.Infoln("audit logs if enabled are closed.")
glog.Warningf("Your enterprise license has expired and enterprise features are " +
"disabled. To continue using enterprise features, apply a valid license. To receive " +
"a new license, contact us at https://dgraph.io/contact.")
Expand Down
5 changes: 1 addition & 4 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (o *Oracle) commit(src *api.TxnContext) error {
defer o.Unlock()

if o.hasConflict(src) {
return ErrConflict
return x.ErrConflict
}
// We store src.Keys as string to ensure compatibility with all the various language clients we
// have. But, really they are just uint64s encoded as strings. We use base 36 during creation of
Expand Down Expand Up @@ -310,9 +310,6 @@ func (o *Oracle) MaxPending() uint64 {
return o.maxAssigned
}

// ErrConflict is returned when commit couldn't succeed due to conflicts.
var ErrConflict = errors.New("Transaction conflict")

// proposeTxn proposes a txn update, and then updates src to reflect the state
// of the commit after proposal is run.
func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
Expand Down
7 changes: 7 additions & 0 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync"
"time"

"github.com/dgraph-io/dgraph/ee/audit"

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/dgraph/conn"
Expand Down Expand Up @@ -403,6 +405,11 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
// Check expiry and set enabled accordingly.
expiry := time.Unix(state.License.ExpiryTs, 0).UTC()
state.License.Enabled = time.Now().UTC().Before(expiry)
if state.License.Enabled && opts.audit != nil {
if err := audit.InitAuditor(opts.audit); err != nil {
glog.Errorf("error while initializing audit logs %+v", err)
}
}
}
if p.Snapshot != nil {
if err := n.applySnapshot(p.Snapshot); err != nil {
Expand Down
46 changes: 38 additions & 8 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/dgraph-io/dgraph/ee/audit"

"go.opencensus.io/plugin/ocgrpc"
otrace "go.opencensus.io/trace"
"go.opencensus.io/zpages"
Expand All @@ -54,6 +57,7 @@ type options struct {
w string
rebalanceInterval time.Duration
tlsClientConfig *tls.Config
audit *audit.AuditConf
}

var opts options
Expand Down Expand Up @@ -95,6 +99,15 @@ instances to achieve high-availability.
flag.StringP("wal", "w", "zw", "Directory storing WAL.")
flag.Duration("rebalance_interval", 8*time.Minute, "Interval for trying a predicate move.")
flag.String("enterprise_license", "", "Path to the enterprise license file.")

flag.String("audit", "",
`Various audit options.
dir=/path/to/audits to define the path where to store the audit logs.
compress=true/false to enabled the compression of old audit logs (default behaviour is false).
encrypt_file=enc/key/file enables the audit log encryption with the key path provided with the
flag.
Sample flag could look like --audit dir=aa;encrypt_file=/filepath;compress=true`)

// TLS configurations
x.RegisterServerTLSFlags(flag)
}
Expand All @@ -118,6 +131,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
grpc.MaxSendMsgSize(x.GrpcMaxSize),
grpc.MaxConcurrentStreams(1000),
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
grpc.UnaryInterceptor(audit.AuditRequestGRPC),
}

tlsConf, err := x.LoadServerTLSConfigForInternalPort(Zero.Conf)
Expand Down Expand Up @@ -186,6 +200,8 @@ func run() {
x.Check(err)

raft := x.NewSuperFlag(Zero.Conf.GetString("raft")).MergeAndCheckDefault(raftDefault)
conf, err := audit.GetAuditConf(Zero.Conf.GetString("audit"))
x.Check(err)
opts = options{
bindall: Zero.Conf.GetBool("bindall"),
portOffset: Zero.Conf.GetInt("port_offset"),
Expand All @@ -195,6 +211,7 @@ func run() {
w: Zero.Conf.GetString("wal"),
rebalanceInterval: Zero.Conf.GetDuration("rebalance_interval"),
tlsClientConfig: tlsConf,
audit: conf,
}
glog.Infof("Setting Config to: %+v", opts)
x.WorkerConfig.Parse(Zero.Conf)
Expand All @@ -215,6 +232,14 @@ func run() {
}
}

if opts.audit != nil {
wd, err := filepath.Abs(opts.w)
x.Check(err)
ad, err := filepath.Abs(opts.audit.Dir)
x.Check(err)
x.AssertTruef(ad != wd, "WAL and Audit directory cannot be the same ('%s').", opts.audit.Dir)
}

if opts.rebalanceInterval <= 0 {
log.Fatalf("ERROR: Rebalance interval must be greater than zero. Found: %d",
opts.rebalanceInterval)
Expand Down Expand Up @@ -255,14 +280,17 @@ func run() {
x.Check(err)
go x.StartListenHttpAndHttps(httpListener, tlsCfg, st.zero.closer)

http.HandleFunc("/health", st.pingResponse)
http.HandleFunc("/state", st.getState)
http.HandleFunc("/removeNode", st.removeNode)
http.HandleFunc("/moveTablet", st.moveTablet)
http.HandleFunc("/assign", st.assign)
http.HandleFunc("/enterpriseLicense", st.applyEnterpriseLicense)
http.HandleFunc("/jemalloc", x.JemallocHandler)
zpages.Handle(http.DefaultServeMux, "/z")
baseMux := http.NewServeMux()
http.Handle("/", audit.AuditRequestHttp(baseMux))

baseMux.HandleFunc("/health", st.pingResponse)
baseMux.HandleFunc("/state", st.getState)
baseMux.HandleFunc("/removeNode", st.removeNode)
baseMux.HandleFunc("/moveTablet", st.moveTablet)
baseMux.HandleFunc("/assign", st.assign)
baseMux.HandleFunc("/enterpriseLicense", st.applyEnterpriseLicense)
baseMux.HandleFunc("/jemalloc", x.JemallocHandler)
zpages.Handle(baseMux, "/z")

// This must be here. It does not work if placed before Grpc init.
x.Check(st.node.initAndStartNode())
Expand Down Expand Up @@ -322,4 +350,6 @@ func run() {
glog.Infof("Raft WAL closed with err: %v\n", err)
st.zero.orc.close()
glog.Infoln("All done. Goodbye!")
audit.Close()
glog.Infoln("audit logs if enabled are closed.")
}
Loading