Skip to content

Commit

Permalink
Merge branch 'main' of github.com:ava-labs/avalanche-network-runner i…
Browse files Browse the repository at this point in the history
…nto add-upgradesconfig
  • Loading branch information
darioush committed Aug 17, 2022
2 parents 69a67ed + f5f0419 commit dae8043
Show file tree
Hide file tree
Showing 65 changed files with 3,811 additions and 4,169 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/build-test-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: "v1.45"
version: "v1.47.0"
working-directory: .
args: --timeout 3m
unit_test:
Expand All @@ -38,7 +38,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.17.2" # The Go version to use.
go-version: 1.18
- run: go test -v -timeout 10m -race ./...
e2e_test:
name: e2e tests
Expand All @@ -51,10 +51,10 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.18
- name: Run e2e tests
shell: bash
run: scripts/tests.e2e.sh 1.7.12 1.7.13
run: scripts/tests.e2e.sh 1.7.15 1.7.16 0.2.4
release:
needs: [lint_test, unit_test, e2e_test]
runs-on: ubuntu-latest
Expand All @@ -66,7 +66,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.18
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ run:
- api/mocks
- local/mocks
- rpcpb
go: "1.17"
go: "1.18"

linters:
disable:
Expand Down
219 changes: 103 additions & 116 deletions README.md

Large diffs are not rendered by default.

23 changes: 16 additions & 7 deletions api/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,35 @@ type EthClient interface {
// ethClient websocket ethclient.Client with mutexed api calls and lazy conn (on first call)
// All calls are wrapped in a mutex, and try to create a connection if it doesn't exist yet
type ethClient struct {
ipAddr string
port uint
client ethclient.Client
lock sync.Mutex
ipAddr string
chainID string
port uint
client ethclient.Client
lock sync.Mutex
}

// NewEthClient mainly takes ip/port info for usage in future calls
// Connection can't be initialized in constructor because node is not ready when the constructor is called
// It follows convention of most avalanchego api constructors that can be called without having a ready node
func NewEthClient(ipAddr string, port uint) EthClient {
// default to using the C chain
return NewEthClientWithChainID(ipAddr, port, "C")
}

// NewEthClientWithChainID creates an EthClient initialized to connect to
// ipAddr/port and communicate with the given chainID.
func NewEthClientWithChainID(ipAddr string, port uint, chainID string) EthClient {
return &ethClient{
ipAddr: ipAddr,
port: port,
ipAddr: ipAddr,
port: port,
chainID: chainID,
}
}

// connect attempts to connect with websocket ethclient API
func (c *ethClient) connect() error {
if c.client == nil {
client, err := ethclient.Dial(fmt.Sprintf("ws://%s:%d/ext/bc/C/ws", c.ipAddr, c.port))
client, err := ethclient.Dial(fmt.Sprintf("ws://%s:%d/ext/bc/%s/ws", c.ipAddr, c.port, c.chainID))
if err != nil {
return err
}
Expand Down
94 changes: 38 additions & 56 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"time"

"github.com/ava-labs/avalanche-network-runner/local"
"github.com/ava-labs/avalanche-network-runner/pkg/logutil"
"github.com/ava-labs/avalanche-network-runner/rpcpb"
"github.com/ava-labs/avalanchego/utils/logging"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -23,7 +23,6 @@ import (
)

type Config struct {
LogLevel string
Endpoint string
DialTimeout time.Duration
}
Expand Down Expand Up @@ -52,6 +51,7 @@ type Client interface {

type client struct {
cfg Config
log logging.Logger

conn *grpc.ClientConn

Expand All @@ -62,16 +62,8 @@ type client struct {
closeOnce sync.Once
}

func New(cfg Config) (Client, error) {
lcfg := logutil.GetDefaultZapLoggerConfig()
lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(cfg.LogLevel))
logger, err := lcfg.Build()
if err != nil {
return nil, err
}
_ = zap.ReplaceGlobals(logger)

zap.L().Debug("dialing server", zap.String("endpoint", cfg.Endpoint))
func New(cfg Config, log logging.Logger) (Client, error) {
log.Debug("dialing server at ", zap.String("endpoint", cfg.Endpoint))

ctx, cancel := context.WithTimeout(context.Background(), cfg.DialTimeout)
conn, err := grpc.DialContext(
Expand All @@ -87,6 +79,7 @@ func New(cfg Config) (Client, error) {

return &client{
cfg: cfg,
log: log,
conn: conn,
pingc: rpcpb.NewPingServiceClient(conn),
controlc: rpcpb.NewControlServiceClient(conn),
Expand All @@ -95,7 +88,7 @@ func New(cfg Config) (Client, error) {
}

func (c *client) Ping(ctx context.Context) (*rpcpb.PingResponse, error) {
zap.L().Info("ping")
c.log.Info("ping")

// ref. https://grpc-ecosystem.github.io/grpc-gateway/docs/tutorials/adding_annotations/
// curl -X POST -k http://localhost:8081/v1/ping -d ''
Expand All @@ -120,8 +113,8 @@ func (c *client) Start(ctx context.Context, execPath string, opts ...OpOption) (
if ret.pluginDir != "" {
req.PluginDir = &ret.pluginDir
}
if len(ret.customVMs) > 0 {
req.CustomVms = ret.customVMs
if len(ret.blockchainSpecs) > 0 {
req.BlockchainSpecs = ret.blockchainSpecs
}
if ret.globalNodeConfig != "" {
req.GlobalNodeConfig = &ret.globalNodeConfig
Expand All @@ -130,7 +123,7 @@ func (c *client) Start(ctx context.Context, execPath string, opts ...OpOption) (
req.CustomNodeConfigs = ret.customNodeConfigs
}

zap.L().Info("start")
c.log.Info("start")
return c.controlc.Start(ctx, req)
}

Expand All @@ -139,7 +132,7 @@ func (c *client) CreateBlockchains(ctx context.Context, blockchainSpecs []*rpcpb
BlockchainSpecs: blockchainSpecs,
}

zap.L().Info("create blockchains")
c.log.Info("create blockchains")
return c.controlc.CreateBlockchains(ctx, req)
}

Expand All @@ -153,17 +146,17 @@ func (c *client) CreateSubnets(ctx context.Context, opts ...OpOption) (*rpcpb.Cr
req.NumSubnets = &ret.numSubnets
}

zap.L().Info("create subnets")
c.log.Info("create subnets")
return c.controlc.CreateSubnets(ctx, req)
}

func (c *client) Health(ctx context.Context) (*rpcpb.HealthResponse, error) {
zap.L().Info("health")
c.log.Info("health")
return c.controlc.Health(ctx, &rpcpb.HealthRequest{})
}

func (c *client) URIs(ctx context.Context) ([]string, error) {
zap.L().Info("uris")
c.log.Info("uris")
resp, err := c.controlc.URIs(ctx, &rpcpb.URIsRequest{})
if err != nil {
return nil, err
Expand All @@ -172,7 +165,7 @@ func (c *client) URIs(ctx context.Context) ([]string, error) {
}

func (c *client) Status(ctx context.Context) (*rpcpb.StatusResponse, error) {
zap.L().Info("status")
c.log.Info("status")
return c.controlc.Status(ctx, &rpcpb.StatusRequest{})
}

Expand All @@ -187,10 +180,10 @@ func (c *client) StreamStatus(ctx context.Context, pushInterval time.Duration) (
ch := make(chan *rpcpb.ClusterInfo, 1)
go func() {
defer func() {
zap.L().Debug("closing stream send", zap.Error(stream.CloseSend()))
c.log.Debug("closing stream send", zap.Error(stream.CloseSend()))
close(ch)
}()
zap.L().Info("start receive routine")
c.log.Info("start receive routine")
for {
select {
case <-ctx.Done():
Expand All @@ -209,13 +202,13 @@ func (c *client) StreamStatus(ctx context.Context, pushInterval time.Duration) (
}

if errors.Is(err, io.EOF) {
zap.L().Debug("received EOF from client; returning to close the stream from server side")
c.log.Debug("received EOF from client; returning to close the stream from server side")
return
}
if isClientCanceled(stream.Context().Err(), err) {
zap.L().Warn("failed to receive status request from gRPC stream due to client cancellation", zap.Error(err))
c.log.Warn("failed to receive status request from gRPC stream due to client cancellation", zap.Error(err))
} else {
zap.L().Warn("failed to receive status request from gRPC stream", zap.Error(err))
c.log.Warn("failed to receive status request from gRPC stream", zap.Error(err))
}
return
}
Expand All @@ -224,7 +217,7 @@ func (c *client) StreamStatus(ctx context.Context, pushInterval time.Duration) (
}

func (c *client) Stop(ctx context.Context) (*rpcpb.StopResponse, error) {
zap.L().Info("stop")
c.log.Info("stop")
return c.controlc.Stop(ctx, &rpcpb.StopRequest{})
}

Expand All @@ -233,27 +226,19 @@ func (c *client) AddNode(ctx context.Context, name string, execPath string, opts
ret.applyOpts(opts)

req := &rpcpb.AddNodeRequest{
Name: name,
StartRequest: &rpcpb.StartRequest{},
Name: name,
ExecPath: execPath,
NodeConfig: &ret.globalNodeConfig,
ChainConfigs: ret.chainConfigs,
UpgradeConfigs: ret.upgradeConfigs,
}
if ret.whitelistedSubnets != "" {
req.StartRequest.WhitelistedSubnets = &ret.whitelistedSubnets
}
if ret.execPath != "" {
req.StartRequest.ExecPath = ret.execPath
}
if ret.pluginDir != "" {
req.StartRequest.PluginDir = &ret.pluginDir
}
req.StartRequest.ChainConfigs = ret.chainConfigs
req.StartRequest.UpgradeConfigs = ret.upgradeConfigs

zap.L().Info("add node", zap.String("name", name))
c.log.Info("add node", zap.String("name", name))
return c.controlc.AddNode(ctx, req)
}

func (c *client) RemoveNode(ctx context.Context, name string) (*rpcpb.RemoveNodeResponse, error) {
zap.L().Info("remove node", zap.String("name", name))
c.log.Info("remove node", zap.String("name", name))
return c.controlc.RemoveNode(ctx, &rpcpb.RemoveNodeRequest{Name: name})
}

Expand All @@ -268,23 +253,20 @@ func (c *client) RestartNode(ctx context.Context, name string, opts ...OpOption)
if ret.whitelistedSubnets != "" {
req.WhitelistedSubnets = &ret.whitelistedSubnets
}
if ret.rootDataDir != "" {
req.RootDataDir = &ret.rootDataDir
}
req.ChainConfigs = ret.chainConfigs
req.UpgradeConfigs = ret.upgradeConfigs

zap.L().Info("restart node", zap.String("name", name))
c.log.Info("restart node", zap.String("name", name))
return c.controlc.RestartNode(ctx, req)
}

func (c *client) AttachPeer(ctx context.Context, nodeName string) (*rpcpb.AttachPeerResponse, error) {
zap.L().Info("attaching peer", zap.String("node-name", nodeName))
c.log.Info("attaching peer", zap.String("name", nodeName))
return c.controlc.AttachPeer(ctx, &rpcpb.AttachPeerRequest{NodeName: nodeName})
}

func (c *client) SendOutboundMessage(ctx context.Context, nodeName string, peerID string, op uint32, msgBody []byte) (*rpcpb.SendOutboundMessageResponse, error) {
zap.L().Info("sending outbound message", zap.String("node-name", nodeName), zap.String("peer-id", peerID))
c.log.Info("sending outbound message", zap.String("name", nodeName), zap.String("peer-ID", peerID))
return c.controlc.SendOutboundMessage(ctx, &rpcpb.SendOutboundMessageRequest{
NodeName: nodeName,
PeerId: peerID,
Expand All @@ -294,12 +276,12 @@ func (c *client) SendOutboundMessage(ctx context.Context, nodeName string, peerI
}

func (c *client) SaveSnapshot(ctx context.Context, snapshotName string) (*rpcpb.SaveSnapshotResponse, error) {
zap.L().Info("save snapshot", zap.String("snapshot-name", snapshotName))
c.log.Info("save snapshot", zap.String("snapshot-name", snapshotName))
return c.controlc.SaveSnapshot(ctx, &rpcpb.SaveSnapshotRequest{SnapshotName: snapshotName})
}

func (c *client) LoadSnapshot(ctx context.Context, snapshotName string, opts ...OpOption) (*rpcpb.LoadSnapshotResponse, error) {
zap.L().Info("load snapshot", zap.String("snapshot-name", snapshotName))
c.log.Info("load snapshot", zap.String("snapshot-name", snapshotName))
ret := &Op{}
ret.applyOpts(opts)
req := rpcpb.LoadSnapshotRequest{
Expand All @@ -322,12 +304,12 @@ func (c *client) LoadSnapshot(ctx context.Context, snapshotName string, opts ...
}

func (c *client) RemoveSnapshot(ctx context.Context, snapshotName string) (*rpcpb.RemoveSnapshotResponse, error) {
zap.L().Info("remove snapshot", zap.String("snapshot-name", snapshotName))
c.log.Info("remove snapshot", zap.String("snapshot-name", snapshotName))
return c.controlc.RemoveSnapshot(ctx, &rpcpb.RemoveSnapshotRequest{SnapshotName: snapshotName})
}

func (c *client) GetSnapshotNames(ctx context.Context) ([]string, error) {
zap.L().Info("get snapshot names")
c.log.Info("get snapshot names")
resp, err := c.controlc.GetSnapshotNames(ctx, &rpcpb.GetSnapshotNamesRequest{})
if err != nil {
return nil, err
Expand All @@ -349,7 +331,7 @@ type Op struct {
globalNodeConfig string
rootDataDir string
pluginDir string
customVMs map[string]string
blockchainSpecs []*rpcpb.BlockchainSpec
customNodeConfigs map[string]string
numSubnets uint32
chainConfigs map[string]string
Expand Down Expand Up @@ -400,10 +382,10 @@ func WithPluginDir(pluginDir string) OpOption {
}
}

// Map from VM name to its genesis path.
func WithCustomVMs(customVMs map[string]string) OpOption {
// Slice of BlockchainSpec
func WithBlockchainSpecs(blockchainSpecs []*rpcpb.BlockchainSpec) OpOption {
return func(op *Op) {
op.customVMs = customVMs
op.blockchainSpecs = blockchainSpecs
}
}

Expand Down
Loading

0 comments on commit dae8043

Please sign in to comment.