-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enterprise - Support reading a signed file with details of Enterprise license #3824
Changes from 14 commits
b0f67ae
706746d
ce2b931
69871db
5dd9a02
c7bff37
c880a3b
9f0fe3e
6bfb453
6eda1a5
80b9c11
bbc98fc
41fa1c6
2a4949d
23ca13b
08809dd
27ca7ee
de2beaf
8f06a59
1f7d3f6
a2e5669
2c277d3
277f10b
2fa8fb3
a538d66
5b223dc
fd1be0d
b5dcda7
85f85ca
c5bb124
a053cd8
04b4641
101e1c0
09969a0
68064d6
ba7f3fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright 2017-2018 Dgraph Labs, Inc. and Contributors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package zero | ||
|
||
import ( | ||
"encoding/json" | ||
"io/ioutil" | ||
"os" | ||
|
||
"github.com/pkg/errors" | ||
"golang.org/x/crypto/openpgp" | ||
"golang.org/x/crypto/openpgp/armor" | ||
) | ||
|
||
func enterpriseDetails(signedFile string, e *enterprise) error { | ||
publicKeyFile, err := os.Open(Zero.Conf.GetString("public_key")) | ||
if err != nil { | ||
return errors.Wrapf(err, "while opening public key file") | ||
} | ||
defer publicKeyFile.Close() | ||
|
||
entityList, err := openpgp.ReadArmoredKeyRing(publicKeyFile) | ||
if err != nil { | ||
return errors.Wrapf(err, "while reading public key") | ||
} | ||
|
||
sf, err := os.Open(signedFile) | ||
if err != nil { | ||
return errors.Wrapf(err, "while opening signed license file: %v", signedFile) | ||
} | ||
|
||
// The signed file is expected to be have ASCII encoding, so we have to decode it before | ||
// reading. | ||
b, err := armor.Decode(sf) | ||
if err != nil { | ||
return errors.Wrapf(err, "while decoding license file") | ||
} | ||
|
||
md, err := openpgp.ReadMessage(b.Body, entityList, nil, nil) | ||
if err != nil { | ||
return errors.Wrapf(err, "while reading PGP message from license file") | ||
} | ||
|
||
// We need to read the body for the signature verification check to happen. | ||
// md.Signature would be non-nil after reading the body if the verification is successfull. | ||
buf, err := ioutil.ReadAll(md.UnverifiedBody) | ||
if err != nil { | ||
return errors.Wrapf(err, "while reading body from signed license file") | ||
} | ||
if md.Signature == nil { | ||
return errors.New("invalid signature while trying to verify license file") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not positive, but looks like maybe you could wrap the error from |
||
} | ||
|
||
err = json.Unmarshal(buf, e) | ||
return errors.Wrapf(err, "while JSON unmarshaling body of license file") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -356,6 +356,9 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) { | |
return p.Key, err | ||
} | ||
} | ||
if p.Enterprise != nil { | ||
state.Enterprise = p.Enterprise | ||
} | ||
|
||
if p.MaxLeaseId > state.MaxLeaseId { | ||
state.MaxLeaseId = p.MaxLeaseId | ||
|
@@ -502,11 +505,59 @@ func (n *node) initAndStartNode() error { | |
}() | ||
} | ||
|
||
if fpath := Zero.Conf.GetString("enterprise_license"); len(fpath) > 0 { | ||
var e enterprise | ||
if err := enterpriseDetails(fpath, &e); err != nil { | ||
x.CheckfNoTrace(err) | ||
} | ||
|
||
proposal := &pb.ZeroProposal{ | ||
Enterprise: &pb.Enterprise{ | ||
Entity: e.Entity, | ||
MaxNodes: e.MaxNodes, | ||
ExpiryTs: e.Expiry.Unix(), | ||
}, | ||
} | ||
|
||
go func() { | ||
for { | ||
err := n.proposeAndWait(context.Background(), proposal) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to double check, is the |
||
if err == nil { | ||
glog.Infof("Enterprise state proposed to the cluster") | ||
break | ||
} | ||
if err == errInvalidProposal { | ||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should anything be logged in this case or is it expected to no-op silently on an invalidProposal? |
||
} | ||
glog.Errorf("While proposing enterprise state: %v. Retrying...", err) | ||
time.Sleep(3 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should there be any limit to the number of times this is tried or possibly some sort of exponential backoff? Or maybe worth a TODO for the future. |
||
} | ||
}() | ||
} | ||
|
||
go n.Run() | ||
go n.BatchAndSendMessages() | ||
return nil | ||
} | ||
|
||
// periodically checks the validity of the enterprise license and updates the membership state. | ||
func (n *node) updateEnterpriseStatePeriodically(closer *y.Closer) { | ||
defer closer.Done() | ||
|
||
ticker := time.NewTicker(5 * time.Second) | ||
defer ticker.Stop() | ||
|
||
n.server.updateEnterpriseState() | ||
for { | ||
select { | ||
case <-ticker.C: | ||
n.server.updateEnterpriseState() | ||
case <-closer.HasBeenClosed(): | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) { | ||
defer closer.Done() | ||
ticker := time.NewTicker(10 * time.Second) | ||
|
@@ -604,14 +655,15 @@ func (n *node) Run() { | |
// snapshot can cause select loop to block while deleting entries, so run | ||
// it in goroutine | ||
readStateCh := make(chan raft.ReadState, 100) | ||
closer := y.NewCloser(4) | ||
closer := y.NewCloser(5) | ||
defer func() { | ||
closer.SignalAndWait() | ||
n.closer.Done() | ||
glog.Infof("Zero Node.Run finished.") | ||
}() | ||
|
||
go n.snapshotPeriodically(closer) | ||
go n.updateEnterpriseStatePeriodically(closer) | ||
go n.updateZeroMembershipPeriodically(closer) | ||
go n.checkQuorum(closer) | ||
go n.RunReadIndexLoop(closer, readStateCh) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,6 +95,9 @@ instances to achieve high-availability. | |
// about the status of supporting annotation logs through the datadog exporter | ||
flag.String("datadog.collector", "", "Send opencensus traces to Datadog. As of now, the trace"+ | ||
" exporter does not support annotation logs and would discard them.") | ||
flag.String("enterprise_license", "", "Path to the enterprise license file") | ||
// TODO - Only for testing, remove before shipping. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could consider upgrading this to a FIXME if it is supposed to be removed before shipping to prevent it sneaking by if FIXME caries a higher weight on your team. |
||
flag.String("public_key", "", "Path to public key.") | ||
} | ||
|
||
func setupListener(addr string, port int, kind string) (listener net.Listener, err error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,12 @@ var ( | |
errServerShutDown = errors.New("Server is being shut down") | ||
) | ||
|
||
type enterprise struct { | ||
Entity string `json:"entity"` | ||
MaxNodes uint64 `json:"max_nodes"` | ||
Expiry time.Time `json:"expiry"` | ||
} | ||
|
||
// Server implements the zero server. | ||
type Server struct { | ||
x.SafeMutex | ||
|
@@ -264,6 +270,26 @@ func (s *Server) updateZeroLeader() { | |
} | ||
} | ||
|
||
// updateEnterpriseState periodically checks the validity of the enterprise license | ||
// based on its expiry. | ||
func (s *Server) updateEnterpriseState() { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
||
// Return early if enterprise is not enabled. This would happen when user didn't supply us a | ||
// license file yet. | ||
if s.state.GetEnterprise() == nil { | ||
return | ||
} | ||
|
||
expiry := time.Unix(s.state.Enterprise.ExpiryTs, 0) | ||
if time.Now().Before(expiry) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like could potentially simplify this to one line since you already have a boolean expression:
|
||
s.state.Enterprise.Enabled = true | ||
} else { | ||
s.state.Enterprise.Enabled = false | ||
} | ||
} | ||
|
||
func (s *Server) removeZero(nodeId uint64) { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
@@ -397,7 +423,7 @@ func (s *Server) removeNode(ctx context.Context, nodeId uint64, groupId uint32) | |
return s.Node.proposeAndWait(ctx, zp) | ||
} | ||
|
||
// Connect is used to connect the very first time with group zero. | ||
// Connect is used by Alpha nodes to connect the very first time with group zero. | ||
func (s *Server) Connect(ctx context.Context, | ||
m *pb.Member) (resp *pb.ConnectionState, err error) { | ||
// Ensures that connect requests are always serialized | ||
|
@@ -435,10 +461,13 @@ func (s *Server) Connect(ctx context.Context, | |
} | ||
} | ||
|
||
numberOfNodes := len(ms.Zeros) | ||
for _, group := range ms.Groups { | ||
for _, member := range group.Members { | ||
switch { | ||
case member.Addr == m.Addr && m.Id == 0: | ||
// TODO - Verify if we need the m.Id == 0 condition here and why. | ||
// If we have this member, then we should just connect to it and return. | ||
case member.Addr == m.Addr: | ||
glog.Infof("Found a member with the same address. Returning: %+v", member) | ||
conn.GetPools().Connect(m.Addr) | ||
return &pb.ConnectionState{ | ||
|
@@ -460,9 +489,18 @@ func (s *Server) Connect(ctx context.Context, | |
" with same ID: %+v", member) | ||
} | ||
} | ||
numberOfNodes++ | ||
} | ||
} | ||
|
||
// TODO - Zero MaxNodes should probably be an error. | ||
maxNodes := s.state.GetEnterprise().GetMaxNodes() | ||
if s.state.GetEnterprise().GetEnabled() && maxNodes != 0 && | ||
uint64(numberOfNodes) >= maxNodes { | ||
return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+ | ||
"number of nodes: [%v] permitted for your enterprise license.", maxNodes) | ||
} | ||
|
||
// Create a connection and check validity of the address by doing an Echo. | ||
conn.GetPools().Connect(m.Addr) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to close the
sf
file handle at some point or possiblydefer sf.Close()
after checking the error below?