Skip to content

Commit

Permalink
Default use random port allocator for runtime controllers (#2580) (#2621
Browse files Browse the repository at this point in the history
)

* Validate port allocating policy before setting up



* Expose configurable option for port allocating policy



* Fix unit tests



* Fix static check



* Fix static check



* Fix comments for `SetupRuntimePortAllocator()`



* Support configurable port allocate policy



---------

Signed-off-by: dongyun.xzh <[email protected]>
Signed-off-by: cheyang <[email protected]>
Co-authored-by: TzZtzt <[email protected]>
  • Loading branch information
cheyang and TrafalgarZZZ authored Feb 18, 2023
1 parent 990a0b9 commit 885b5a7
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ spec:
- --runtime-workers={{ .Values.runtime.goosefs.runtimeWorkers }}
- --pprof-addr=:6060
- --enable-leader-election
- --port-allocate-policy={{ .Values.runtime.goosefs.portAllocatePolicy }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spec:
- --runtime-workers={{ .Values.runtime.jindo.runtimeWorkers }}
- --pprof-addr=:6060
- --enable-leader-election
- --port-allocate-policy={{ .Values.runtime.jindo.portAllocatePolicy }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
4 changes: 3 additions & 1 deletion charts/fluid/fluid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 20000-26000
portAllocatePolicy: bitmap
portAllocatePolicy: random
enabled: false
init:
image: fluidcloudnative/init-users:v0.8.3-336791f
Expand All @@ -50,6 +50,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 18000-19999
portAllocatePolicy: random
enabled: false
engine: jindofsx
queryUfsTotal: true
Expand All @@ -67,6 +68,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 26000-32000
portAllocatePolicy: random
enabled: false
init:
image: fluidcloudnative/init-users:v0.8.3-336791f
Expand Down
9 changes: 7 additions & 2 deletions cmd/alluxio/app/alluxio.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func init() {
alluxioCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for Alluxio")
alluxioCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for AlluxioRuntime controller")
alluxioCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "bitmap", "Set port allocating policy, available choice is bitmap or random(default bitmap).")
alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
}

func handle() {
Expand Down Expand Up @@ -128,7 +128,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocatorWithType(mgr.GetClient(), pr, portallocator.AllocatePolicy(portAllocatePolicy), alluxio.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, alluxio.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting alluxioruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
9 changes: 8 additions & 1 deletion cmd/goosefs/app/goosefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
portRange string
maxConcurrentReconciles int
pprofAddr string
portAllocatePolicy string
)

var cmd = &cobra.Command{
Expand All @@ -73,6 +74,7 @@ func init() {
startCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
startCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
startCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for GooseFS")
startCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
startCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
startCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for GooseFSRuntime controller")
cmd.AddCommand(startCmd)
Expand Down Expand Up @@ -129,7 +131,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, goosefs.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, goosefs.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting goosefsruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions cmd/jindo/app/jindo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ var (
development bool
// The new mode
eventDriven bool
portRange string
maxConcurrentReconciles int
pprofAddr string
portRange string
portAllocatePolicy string
)

var jindoCmd = &cobra.Command{
Expand All @@ -74,6 +75,7 @@ func init() {
jindoCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
jindoCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
jindoCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "18000-19999", "Set available port range for Jindo")
jindoCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
jindoCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for JindoRuntime controller")
jindoCmd.Flags().BoolVar(&eventDriven, "event-driven", true, "The reconciler's loop strategy. if it's false, it indicates period driven.")
jindoCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
Expand Down Expand Up @@ -130,7 +132,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, jindo.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, jindo.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting jindoruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddc/alluxio/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
}
portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -238,8 +241,11 @@ func TestGenerateAlluxioValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/shutdown_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func TestShutdown(t *testing.T) {

pr := net.ParsePortRangeOrDie("20000-21000")

portallocator.SetupRuntimePortAllocator(nil, pr, dummy)
err = portallocator.SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}

var testCase = []struct {
expectedWorkers int32
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ func TestAlluxioEngineReleasePorts(t *testing.T) {
Log: fake.NullLogger(),
}

portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
allocator, _ := portallocator.GetRuntimePortAllocator()
patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts",
func(_ *portallocator.RuntimePortAllocator, ports []int) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ddc/alluxio/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ limitations under the License.
package alluxio

import (
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"reflect"
"testing"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down
25 changes: 21 additions & 4 deletions pkg/ddc/base/portallocator/port_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package portallocator

import (
"fmt"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/net"
Expand All @@ -31,6 +33,15 @@ const (
BitMap AllocatePolicy = "bitmap"
)

func ValidateEnum(allocatePolicyStr string) (AllocatePolicy, error) {
switch AllocatePolicy(allocatePolicyStr) {
case Random, BitMap:
return AllocatePolicy(allocatePolicyStr), nil
default:
return AllocatePolicy(allocatePolicyStr), fmt.Errorf("runtime-port-allocator can only be random or bitmap")
}
}

type BatchAllocatorInterface interface {
Allocate(int) error

Expand Down Expand Up @@ -60,9 +71,15 @@ type RuntimePortAllocator struct {
// rpa is a global singleton of type RuntimePortAllocator
var rpa *RuntimePortAllocator

// SetupRuntimePortAllocator instantiates the global singleton rpa, use BitMap port allocating policy
func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, getReservedPorts func(client client.Client) (ports []int, err error)) {
SetupRuntimePortAllocatorWithType(client, pr, BitMap, getReservedPorts)
// SetupRuntimePortAllocator instantiates the global singleton rpa, set up port allocating policy according to the given allocatePolicyStr.
// Currently the valid policies are either "random" or "bitmap".
func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, allocatePolicyStr string, getReservedPorts func(client client.Client) (ports []int, err error)) error {
policy, err := ValidateEnum(allocatePolicyStr)
if err != nil {
return err
}
SetupRuntimePortAllocatorWithType(client, pr, policy, getReservedPorts)
return nil
}

// SetupRuntimePortAllocatorWithType instantiates the global singleton rpa with specified port allocating policy
Expand Down Expand Up @@ -90,7 +107,7 @@ func (alloc *RuntimePortAllocator) createAndRestorePortAllocator() (err error) {
case BitMap:
alloc.pa, err = newBitMapAllocator(alloc.pr, alloc.log)
default:
err = errors.New("allocate-port-policy can only be random or bitmap")
err = errors.New("runtime-port-allocator can only be random or bitmap")
}

if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/ddc/base/portallocator/port_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,24 @@ var errDummy = func(client client.Client) (ports []int, err error) {

func TestRuntimePortAllocatorWithError(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
SetupRuntimePortAllocator(nil, pr, errDummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", errDummy)
if err != nil {
t.Fatalf("failed to setup runtime port allocator due to %v", err)
}

_, err := GetRuntimePortAllocator()
_, err = GetRuntimePortAllocator()
if err == nil {
t.Errorf("Expecetd error when GetRuntimePortAllocator")
}
}

func TestRuntimePortAllocator(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
SetupRuntimePortAllocator(nil, pr, dummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Errorf("get non-nil err when GetRuntimePortAllocator")
return
}

allocator, err := GetRuntimePortAllocator()
if err != nil {
Expand Down Expand Up @@ -71,7 +78,11 @@ func TestRuntimePortAllocator(t *testing.T) {

func TestRuntimePortAllocatorRelease(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-20010")
SetupRuntimePortAllocator(nil, pr, dummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Errorf("get non-nil err when GetRuntimePortAllocator")
return
}

preservedPorts, _ := dummy(nil)

Expand Down
15 changes: 10 additions & 5 deletions pkg/ddc/goosefs/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ func TestSetupMasterInternal(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}

err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)

if err != nil {

Expand Down Expand Up @@ -400,9 +403,11 @@ func TestGenerateGooseFSValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)

err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)

if err != nil {

Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/goosefs/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,10 @@ func TestGooseFSEngineReleasePorts(t *testing.T) {
Log: fake.NullLogger(),
}

portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts)
if err != nil {
t.Fatalf("Failed to set up runtime port allocator due to %v", err)
}
allocator, _ := portallocator.GetRuntimePortAllocator()
patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts",
func(_ *portallocator.RuntimePortAllocator, ports []int) {
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddc/jindo/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
}
portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -234,8 +237,11 @@ func TestGenerateJindoValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddc/jindofsx/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
}
portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -234,8 +237,11 @@ func TestGenerateJindoValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
Loading

0 comments on commit 885b5a7

Please sign in to comment.