Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default use random port allocator for runtime controllers #2580

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ spec:
- --runtime-workers={{ .Values.runtime.goosefs.runtimeWorkers }}
- --pprof-addr=:6060
- --enable-leader-election
- --port-allocate-policy={{ .Values.runtime.goosefs.portAllocatePolicy }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spec:
- --runtime-workers={{ .Values.runtime.jindo.runtimeWorkers }}
- --pprof-addr=:6060
- --enable-leader-election
- --port-allocate-policy={{ .Values.runtime.jindo.portAllocatePolicy }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
4 changes: 3 additions & 1 deletion charts/fluid/fluid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 20000-26000
portAllocatePolicy: bitmap
portAllocatePolicy: random
enabled: false
init:
image: fluidcloudnative/init-users:v0.9.0-fcf2004
Expand All @@ -48,6 +48,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 18000-19999
portAllocatePolicy: random
enabled: false
engine: jindofsx
queryUfsTotal: true
Expand All @@ -65,6 +66,7 @@ runtime:
replicas: 1
runtimeWorkers: 3
portRange: 26000-32000
portAllocatePolicy: random
enabled: false
init:
image: fluidcloudnative/init-users:v0.9.0-fcf2004
Expand Down
9 changes: 7 additions & 2 deletions cmd/alluxio/app/alluxio.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func init() {
alluxioCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for Alluxio")
alluxioCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for AlluxioRuntime controller")
alluxioCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "bitmap", "Set port allocating policy, available choice is bitmap or random(default bitmap).")
alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
}

func handle() {
Expand Down Expand Up @@ -128,7 +128,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocatorWithType(mgr.GetClient(), pr, portallocator.AllocatePolicy(portAllocatePolicy), alluxio.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, alluxio.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting alluxioruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/dataset/app/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/spf13/cobra"
zapOpt "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

Expand Down
9 changes: 8 additions & 1 deletion cmd/goosefs/app/goosefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
portRange string
maxConcurrentReconciles int
pprofAddr string
portAllocatePolicy string
)

var cmd = &cobra.Command{
Expand All @@ -73,6 +74,7 @@ func init() {
startCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
startCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
startCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for GooseFS")
startCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
startCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
startCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for GooseFSRuntime controller")
cmd.AddCommand(startCmd)
Expand Down Expand Up @@ -129,7 +131,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, goosefs.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, goosefs.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting goosefsruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions cmd/jindo/app/jindo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ var (
development bool
// The new mode
eventDriven bool
portRange string
maxConcurrentReconciles int
pprofAddr string
portRange string
portAllocatePolicy string
)

var jindoCmd = &cobra.Command{
Expand All @@ -74,6 +75,7 @@ func init() {
jindoCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
jindoCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
jindoCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "18000-19999", "Set available port range for Jindo")
jindoCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).")
jindoCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for JindoRuntime controller")
jindoCmd.Flags().BoolVar(&eventDriven, "event-driven", true, "The reconciler's loop strategy. if it's false, it indicates period driven.")
jindoCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
Expand Down Expand Up @@ -130,7 +132,12 @@ func handle() {
}
setupLog.Info("port range parsed", "port range", pr.String())

portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, jindo.GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, jindo.GetReservedPorts)
if err != nil {
setupLog.Error(err, "failed to setup runtime port allocator")
os.Exit(1)
}
setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy)

setupLog.Info("starting jindoruntime-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
cdatabackup "github.com/fluid-cloudnative/fluid/pkg/databackup"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/go-logr/logr"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand All @@ -38,6 +37,7 @@ import (
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

const controllerName string = "DataBackupController"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/v1alpha1/dataload/dataload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/v1alpha1/dataset/dataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/controllers/deploy"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"sigs.k8s.io/controller-runtime/pkg/controller"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"

Expand Down
14 changes: 10 additions & 4 deletions pkg/ddc/alluxio/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,11 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
}
portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -239,8 +242,11 @@ func TestGenerateAlluxioValueFile(t *testing.T) {
},
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts)
err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/shutdown_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func TestShutdown(t *testing.T) {

pr := net.ParsePortRangeOrDie("20000-21000")

portallocator.SetupRuntimePortAllocator(nil, pr, dummy)
err = portallocator.SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}

var testCase = []struct {
expectedWorkers int32
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ func TestAlluxioEngineReleasePorts(t *testing.T) {
Log: fake.NullLogger(),
}

portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
allocator, _ := portallocator.GetRuntimePortAllocator()
patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts",
func(_ *portallocator.RuntimePortAllocator, ports []int) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/alluxio/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,10 @@ func TestAlluxioEngine_allocateSinglePort(t *testing.T) {

func TestAlluxioEngine_allocatePorts(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
portallocator.SetupRuntimePortAllocator(nil, pr, dummy)
err := portallocator.SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Fatal(err.Error())
}
type fields struct {
runtime *datav1alpha1.AlluxioRuntime
name string
Expand Down
25 changes: 21 additions & 4 deletions pkg/ddc/base/portallocator/port_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package portallocator

import (
"fmt"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/net"
Expand All @@ -31,6 +33,15 @@ const (
BitMap AllocatePolicy = "bitmap"
)

func ValidateEnum(allocatePolicyStr string) (AllocatePolicy, error) {
switch AllocatePolicy(allocatePolicyStr) {
case Random, BitMap:
return AllocatePolicy(allocatePolicyStr), nil
default:
return AllocatePolicy(allocatePolicyStr), fmt.Errorf("runtime-port-allocator can only be random or bitmap")
}
}

type BatchAllocatorInterface interface {
Allocate(int) error

Expand Down Expand Up @@ -60,9 +71,15 @@ type RuntimePortAllocator struct {
// rpa is a global singleton of type RuntimePortAllocator
var rpa *RuntimePortAllocator

// SetupRuntimePortAllocator instantiates the global singleton rpa, use BitMap port allocating policy
func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, getReservedPorts func(client client.Client) (ports []int, err error)) {
SetupRuntimePortAllocatorWithType(client, pr, BitMap, getReservedPorts)
// SetupRuntimePortAllocator instantiates the global singleton rpa, set up port allocating policy according to the given allocatePolicyStr.
// Currently the valid policies are either "random" or "bitmap".
func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, allocatePolicyStr string, getReservedPorts func(client client.Client) (ports []int, err error)) error {
policy, err := ValidateEnum(allocatePolicyStr)
if err != nil {
return err
}
SetupRuntimePortAllocatorWithType(client, pr, policy, getReservedPorts)
return nil
}

// SetupRuntimePortAllocatorWithType instantiates the global singleton rpa with specified port allocating policy
Expand Down Expand Up @@ -90,7 +107,7 @@ func (alloc *RuntimePortAllocator) createAndRestorePortAllocator() (err error) {
case BitMap:
alloc.pa, err = newBitMapAllocator(alloc.pr, alloc.log)
default:
err = errors.New("allocate-port-policy can only be random or bitmap")
err = errors.New("runtime-port-allocator can only be random or bitmap")
}

if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/ddc/base/portallocator/port_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,24 @@ var errDummy = func(client client.Client) (ports []int, err error) {

func TestRuntimePortAllocatorWithError(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
SetupRuntimePortAllocator(nil, pr, errDummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", errDummy)
if err != nil {
t.Fatalf("failed to setup runtime port allocator due to %v", err)
}

_, err := GetRuntimePortAllocator()
_, err = GetRuntimePortAllocator()
if err == nil {
t.Errorf("Expecetd error when GetRuntimePortAllocator")
}
}

func TestRuntimePortAllocator(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-21000")
SetupRuntimePortAllocator(nil, pr, dummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Errorf("get non-nil err when GetRuntimePortAllocator")
return
}

allocator, err := GetRuntimePortAllocator()
if err != nil {
Expand Down Expand Up @@ -71,7 +78,11 @@ func TestRuntimePortAllocator(t *testing.T) {

func TestRuntimePortAllocatorRelease(t *testing.T) {
pr := net.ParsePortRangeOrDie("20000-20010")
SetupRuntimePortAllocator(nil, pr, dummy)
err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy)
if err != nil {
t.Errorf("get non-nil err when GetRuntimePortAllocator")
return
}

preservedPorts, _ := dummy(nil)

Expand Down
15 changes: 10 additions & 5 deletions pkg/ddc/eac/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ func TestSetupMasterInternal(t *testing.T) {
runtime: eacruntime,
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)

err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
// check release found
err := gohook.Hook(helm.CheckRelease, mockExecCheckReleaseCommonFound, nil)
err = gohook.Hook(helm.CheckRelease, mockExecCheckReleaseCommonFound, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -244,9 +246,12 @@ func TestGenerateEACValueFile(t *testing.T) {
runtime: eacruntime,
}

portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}

err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockCreateConfigMap, nil)
err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockCreateConfigMap, nil)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddc/eac/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ func TestEACEngineReleasePorts(t *testing.T) {
Log: fake.NullLogger(),
}

portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts)
err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}
allocator, _ := portallocator.GetRuntimePortAllocator()
patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts",
func(_ *portallocator.RuntimePortAllocator, ports []int) {
Expand Down
Loading