Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions go/vt/topo/etcd2topo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,38 @@ limitations under the License.
package etcd2topo

import (
"flag"
"context"
"fmt"
"path"

"context"
"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/servenv"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
)

var (
leaseTTL = flag.Int("topo_etcd_lease_ttl", 30, "Lease TTL for locks and leader election. The client will use KeepAlive to keep the lease going.")
leaseTTL = 30
)

func init() {
for _, cmd := range []string{"vttablet", "vtctl", "vtctld", "mysqlctl", "mysqlctld", "vttestserver", "vtcombo", "vtctldclient", "vtexplain", "vtgate",
"vtgr", "vtorc", "vtbackup"} {
servenv.OnParseFor(cmd, registerEtcd2TopoLockFlags)
}
}

func registerEtcd2TopoLockFlags(fs *pflag.FlagSet) {
fs.IntVar(&leaseTTL, "topo_etcd_lease_ttl", leaseTTL, "Lease TTL for locks and leader election. The client will use KeepAlive to keep the lease going.")
}

// newUniqueEphemeralKV creates a new file in the provided directory.
// It is linked to the Lease.
// Errors returned are converted to topo errors.
Expand Down Expand Up @@ -141,7 +153,7 @@ func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.Lock
nodePath = path.Join(s.root, nodePath, locksPath)

// Get a lease, set its KeepAlive.
lease, err := s.cli.Grant(ctx, int64(*leaseTTL))
lease, err := s.cli.Grant(ctx, int64(leaseTTL))
if err != nil {
return nil, convertError(err, nodePath)
}
Expand Down
33 changes: 23 additions & 10 deletions go/vt/topo/etcd2topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,25 @@ package etcd2topo
import (
"crypto/tls"
"crypto/x509"
"flag"
"strings"
"time"

"google.golang.org/grpc"
"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/servenv"

"go.etcd.io/etcd/client/pkg/v3/tlsutil"
"google.golang.org/grpc"

clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/vt/topo"
)

var (
clientCertPath = flag.String("topo_etcd_tls_cert", "", "path to the client cert to use to connect to the etcd topo server, requires topo_etcd_tls_key, enables TLS")
clientKeyPath = flag.String("topo_etcd_tls_key", "", "path to the client key to use to connect to the etcd topo server, enables TLS")
serverCaPath = flag.String("topo_etcd_tls_ca", "", "path to the ca to use to validate the server cert when connecting to the etcd topo server")
clientCertPath string
clientKeyPath string
serverCaPath string
)

// Factory is the consul topo.Factory implementation.
Expand All @@ -78,6 +81,20 @@ type Server struct {
running chan struct{}
}

func init() {
for _, cmd := range []string{"vttablet", "vtctl", "vtctld", "mysqlctl", "mysqlctld", "vttestserver", "vtcombo", "vtctldclient", "vtexplain", "vtgate",
"vtgr", "vtorc", "vtbackup"} {
servenv.OnParseFor(cmd, registerEtcd2TopoFlags)
}
topo.RegisterFactory("etcd2", Factory{})
}

func registerEtcd2TopoFlags(fs *pflag.FlagSet) {
fs.StringVar(&clientCertPath, "topo_etcd_tls_cert", clientCertPath, "path to the client cert to use to connect to the etcd topo server, requires topo_etcd_tls_key, enables TLS")
fs.StringVar(&clientKeyPath, "topo_etcd_tls_key", clientKeyPath, "path to the client key to use to connect to the etcd topo server, enables TLS")
fs.StringVar(&serverCaPath, "topo_etcd_tls_ca", serverCaPath, "path to the ca to use to validate the server cert when connecting to the etcd topo server")
}

// Close implements topo.Server.Close.
// It will nil out the global and cells fields, so any attempt to
// re-use this server will panic.
Expand Down Expand Up @@ -153,9 +170,5 @@ func NewServerWithOpts(serverAddr, root, certPath, keyPath, caPath string) (*Ser
func NewServer(serverAddr, root string) (*Server, error) {
// TODO: Rename this to a name to signifies this function uses the process-wide TLS settings.

return NewServerWithOpts(serverAddr, root, *clientCertPath, *clientKeyPath, *serverCaPath)
}

func init() {
topo.RegisterFactory("etcd2", Factory{})
return NewServerWithOpts(serverAddr, root, clientCertPath, clientKeyPath, serverCaPath)
}
4 changes: 2 additions & 2 deletions go/vt/topo/etcd2topo/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func testKeyspaceLock(t *testing.T, ts *topo.Server) {
}

// Long TTL, unlock before lease runs out.
*leaseTTL = 1000
leaseTTL = 1000
lockDescriptor, err := conn.Lock(ctx, keyspacePath, "ttl")
if err != nil {
t.Fatalf("Lock failed: %v", err)
Expand All @@ -281,7 +281,7 @@ func testKeyspaceLock(t *testing.T, ts *topo.Server) {
}

// Short TTL, make sure it doesn't expire.
*leaseTTL = 1
leaseTTL = 1
lockDescriptor, err = conn.Lock(ctx, keyspacePath, "short ttl")
if err != nil {
t.Fatalf("Lock failed: %v", err)
Expand Down