Skip to content

Commit

Permalink
Default use random port allocator for runtime controllers (fluid-clou…
Browse files Browse the repository at this point in the history
…dnative#2580)

* Validate port allocating policy before setting up

Signed-off-by: dongyun.xzh <[email protected]>

* Expose configurable option for port allocating policy

Signed-off-by: dongyun.xzh <[email protected]>

* Fix unit tests

Signed-off-by: dongyun.xzh <[email protected]>

* Fix static check

Signed-off-by: dongyun.xzh <[email protected]>

* Fix static check

Signed-off-by: dongyun.xzh <[email protected]>

* Fix comments for `SetupRuntimePortAllocator()`

Signed-off-by: dongyun.xzh <[email protected]>

* Support configurable port allocate policy

Signed-off-by: dongyun.xzh <[email protected]>

---------

Signed-off-by: dongyun.xzh <[email protected]>
Signed-off-by: cheyang <[email protected]>
  • Loading branch information
TrafalgarZZZ authored and cheyang committed Feb 18, 2023
1 parent 193366d commit 4d0b3bc
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ spec:
- --runtime-workers={{ .Values.runtime.goosefs.runtimeWorkers }}
- --pprof-addr=:6060
- --enable-leader-election
- --port-allocate-policy={{ .Values.runtime.goosefs.portAllocatePolicy }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spec:
- --runtime-workers={{ .Values.runtime.jindo.runtimeWorkers }}
- --pprof-addr=:6060
- --enable-leader-election
- --port-allocate-policy={{ .Values.runtime.jindo.portAllocatePolicy }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
4 changes: 3 additions & 1 deletion charts/fluid/fluid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 20000-26000
portAllocatePolicy: bitmap
portAllocatePolicy: random
enabled: false
init:
image: fluidcloudnative/init-users:v0.8.3-336791f
Expand All @@ -50,6 +50,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 18000-19999
portAllocatePolicy: random
enabled: false
engine: jindofsx
queryUfsTotal: true
Expand All @@ -67,6 +68,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 26000-32000
portAllocatePolicy: random
enabled: false
init:
image: fluidcloudnative/init-users:v0.8.3-336791f
Expand Down
9 changes: 7 additions & 2 deletions cmd/alluxio/app/alluxio.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func init() {
alluxioCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for Alluxio")
alluxioCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for AlluxioRuntime controller")
alluxioCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "bitmap", "Set port allocating policy, available choice is bitmap or random(default bitmap).")
alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
}

func handle() {
Expand Down Expand Up @@ -128,7 +128,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocatorWithType(mgr.GetClient(), pr, portallocator.AllocatePolicy(portAllocatePolicy), alluxio.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, alluxio.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting alluxioruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
9 changes: 8 additions & 1 deletion cmd/goosefs/app/goosefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
portRange string
maxConcurrentReconciles int
pprofAddr string
portAllocatePolicy string
)

var cmd = &cobra.Command{
Expand All @@ -73,6 +74,7 @@ func init() {
startCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
startCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
startCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for GooseFS")
startCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
startCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
startCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for GooseFSRuntime controller")
cmd.AddCommand(startCmd)
Expand Down Expand Up @@ -129,7 +131,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, goosefs.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, goosefs.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting goosefsruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions cmd/jindo/app/jindo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ var (
development bool
// The new mode
eventDriven bool
portRange string
maxConcurrentReconciles int
pprofAddr string
portRange string
portAllocatePolicy string
)

var jindoCmd = &cobra.Command{
Expand All @@ -74,6 +75,7 @@ func init() {
jindoCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
jindoCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
jindoCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "18000-19999", "Set available port range for Jindo")
jindoCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
jindoCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for JindoRuntime controller")
jindoCmd.Flags().BoolVar(&eventDriven, "event-driven", true, "The reconciler's loop strategy. if it's false, it indicates period driven.")
jindoCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
Expand Down Expand Up @@ -130,7 +132,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, jindo.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, jindo.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting jindoruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddc/alluxio/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
}
portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -238,8 +241,11 @@ func TestGenerateAlluxioValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/shutdown_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func TestShutdown(t *testing.T) {

pr := net.ParsePortRangeOrDie("20000-21000")

portallocator.SetupRuntimePortAllocator(nil, pr, dummy)
err = portallocator.SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}

var testCase = []struct {
expectedWorkers int32
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ func TestAlluxioEngineReleasePorts(t *testing.T) {
Log: fake.NullLogger(),
}

portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
allocator, _ := portallocator.GetRuntimePortAllocator()
patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts",
func(_ *portallocator.RuntimePortAllocator, ports []int) {
Expand Down
40 changes: 35 additions & 5 deletions pkg/ddc/alluxio/transform_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -16,11 +13,12 @@ limitations under the License.
package alluxio

import (
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"reflect"
"testing"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -176,6 +174,7 @@ func TestTransformWorkers(t *testing.T) {
Worker: datav1alpha1.AlluxioCompTemplateSpec{
NetworkMode: datav1alpha1.HostNetworkMode,
},
TieredStore: datav1alpha1.TieredStore{},
},
},
wantValue: &Alluxio{
Expand All @@ -184,11 +183,33 @@ func TestTransformWorkers(t *testing.T) {
},
},
},
"test network mode case 4": {
runtime: &datav1alpha1.AlluxioRuntime{
Spec: datav1alpha1.AlluxioRuntimeSpec{
Worker: datav1alpha1.AlluxioCompTemplateSpec{
NetworkMode: datav1alpha1.HostNetworkMode,
NodeSelector: map[string]string{
"workerSelector": "true",
},
},
TieredStore: datav1alpha1.TieredStore{},
},
},
wantValue: &Alluxio{
Worker: Worker{
HostNetwork: true,
NodeSelector: map[string]string{
"workerSelector": "true",
},
},
},
},
}

engine := &AlluxioEngine{Log: fake.NullLogger()}
for k, v := range testCases {
gotValue := &Alluxio{}
engine.runtimeInfo, _ = base.BuildRuntimeInfo("test", "test", "alluxio", v.runtime.Spec.TieredStore)
if err := engine.transformWorkers(v.runtime, gotValue); err == nil {
if gotValue.Worker.HostNetwork != v.wantValue.Worker.HostNetwork {
t.Errorf("check %s failure, got:%t,want:%t",
Expand All @@ -197,6 +218,15 @@ func TestTransformWorkers(t *testing.T) {
v.wantValue.Worker.HostNetwork,
)
}
if len(v.wantValue.Worker.NodeSelector) > 0 {
if !reflect.DeepEqual(v.wantValue.Worker.NodeSelector, gotValue.Worker.NodeSelector) {
t.Errorf("check %s failure, got:%v,want:%v",
k,
gotValue.Worker.NodeSelector,
v.wantValue.Worker.NodeSelector,
)
}
}
}
}
}
Expand Down
25 changes: 21 additions & 4 deletions pkg/ddc/base/portallocator/port_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package portallocator

import (
"fmt"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/net"
Expand All @@ -31,6 +33,15 @@ const (
BitMap AllocatePolicy = "bitmap"
)

func ValidateEnum(allocatePolicyStr string) (AllocatePolicy, error) {
switch AllocatePolicy(allocatePolicyStr) {
case Random, BitMap:
return AllocatePolicy(allocatePolicyStr), nil
default:
return AllocatePolicy(allocatePolicyStr), fmt.Errorf("runtime-port-allocator can only be random or bitmap")
}
}

type BatchAllocatorInterface interface {
Allocate(int) error

Expand Down Expand Up @@ -60,9 +71,15 @@ type RuntimePortAllocator struct {
// rpa is a global singleton of type RuntimePortAllocator
var rpa *RuntimePortAllocator

// SetupRuntimePortAllocator instantiates the global singleton rpa, use BitMap port allocating policy
func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, getReservedPorts func(client client.Client) (ports []int, err error)) {
SetupRuntimePortAllocatorWithType(client, pr, BitMap, getReservedPorts)
// SetupRuntimePortAllocator instantiates the global singleton rpa, set up port allocating policy according to the given allocatePolicyStr.
// Currently the valid policies are either "random" or "bitmap".
func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, allocatePolicyStr string, getReservedPorts func(client client.Client) (ports []int, err error)) error {
policy, err := ValidateEnum(allocatePolicyStr)
if err != nil {
return err
}
SetupRuntimePortAllocatorWithType(client, pr, policy, getReservedPorts)
return nil
}

// SetupRuntimePortAllocatorWithType instantiates the global singleton rpa with specified port allocating policy
Expand Down Expand Up @@ -90,7 +107,7 @@ func (alloc *RuntimePortAllocator) createAndRestorePortAllocator() (err error) {
case BitMap:
alloc.pa, err = newBitMapAllocator(alloc.pr, alloc.log)
default:
err = errors.New("allocate-port-policy can only be random or bitmap")
err = errors.New("runtime-port-allocator can only be random or bitmap")
}

if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/ddc/base/portallocator/port_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,24 @@ var errDummy = func(client client.Client) (ports []int, err error) {

func TestRuntimePortAllocatorWithError(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
SetupRuntimePortAllocator(nil, pr, errDummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", errDummy)
if err != nil {
t.Fatalf("failed to setup runtime port allocator due to %v", err)
}

_, err := GetRuntimePortAllocator()
_, err = GetRuntimePortAllocator()
if err == nil {
t.Errorf("Expecetd error when GetRuntimePortAllocator")
}
}

func TestRuntimePortAllocator(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
SetupRuntimePortAllocator(nil, pr, dummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Errorf("get non-nil err when GetRuntimePortAllocator")
return
}

allocator, err := GetRuntimePortAllocator()
if err != nil {
Expand Down Expand Up @@ -71,7 +78,11 @@ func TestRuntimePortAllocator(t *testing.T) {

func TestRuntimePortAllocatorRelease(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-20010")
SetupRuntimePortAllocator(nil, pr, dummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Errorf("get non-nil err when GetRuntimePortAllocator")
return
}

preservedPorts, _ := dummy(nil)

Expand Down
15 changes: 10 additions & 5 deletions pkg/ddc/goosefs/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ func TestSetupMasterInternal(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}

err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)

if err != nil {

Expand Down Expand Up @@ -400,9 +403,11 @@ func TestGenerateGooseFSValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)

err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)

if err != nil {

Expand Down
Loading

0 comments on commit 4d0b3bc

Please sign in to comment.