4 changes: 2 additions & 2 deletions apis/apps/v1/cluster_types.go
@@ -623,9 +623,9 @@ type ClusterSharding struct {
// between the desired and actual number of shards.
// KubeBlocks provides lifecycle management for sharding, including:
//
// - Executing the shardProvision Action defined in the ShardingDefinition when the number of shards increases.
// - Executing the shardAdd Action defined in the ShardingDefinition when the number of shards increases.
// This allows for custom actions to be performed after a new shard is provisioned.
// - Executing the shardTerminate Action defined in the ShardingDefinition when the number of shards decreases.
// - Executing the shardRemove Action defined in the ShardingDefinition when the number of shards decreases.
// This enables custom cleanup or data migration tasks to be executed before a shard is terminated.
// Resources and data associated with the corresponding Component will also be deleted.
//
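For orientation, here is a minimal, hypothetical Go sketch of a ShardingDefinition that opts in to the shardAdd and shardRemove actions described in the comment above. The LifecycleActions, ShardAdd, and ShardRemove fields appear in this PR; the ShardingDefinitionSpec and Action type names are assumptions about the surrounding KubeBlocks API, and the action bodies are omitted.

```go
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
)

// shardingDefWithShardActions sketches a ShardingDefinition whose shards get custom
// lifecycle handling: shardAdd runs after a new shard's Component is provisioned
// (e.g. rebalance data onto it), and shardRemove runs before a shard's Component is
// deleted (e.g. drain its data to the remaining shards).
func shardingDefWithShardActions() *appsv1.ShardingDefinition {
	return &appsv1.ShardingDefinition{
		ObjectMeta: metav1.ObjectMeta{Name: "example-sharding"},
		Spec: appsv1.ShardingDefinitionSpec{ // assumed spec type name
			LifecycleActions: &appsv1.ShardingLifecycleActions{
				ShardAdd:    &appsv1.Action{}, // e.g. an exec action that joins/rebalances the new shard
				ShardRemove: &appsv1.Action{}, // e.g. an exec action that drains the shard first
			},
		},
	}
}
```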
4 changes: 2 additions & 2 deletions config/crd/bases/apps.kubeblocks.io_clusters.yaml
@@ -11385,9 +11385,9 @@ spec:
KubeBlocks provides lifecycle management for sharding, including:


- Executing the shardProvision Action defined in the ShardingDefinition when the number of shards increases.
- Executing the shardAdd Action defined in the ShardingDefinition when the number of shards increases.
This allows for custom actions to be performed after a new shard is provisioned.
- Executing the shardTerminate Action defined in the ShardingDefinition when the number of shards decreases.
- Executing the shardRemove Action defined in the ShardingDefinition when the number of shards decreases.
This enables custom cleanup or data migration tasks to be executed before a shard is terminated.
Resources and data associated with the corresponding Component will also be deleted.
format: int32
1 change: 1 addition & 0 deletions controllers/apps/cluster/cluster_controller.go
@@ -125,6 +125,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// TODO: transformers are vertices, their dependencies are edges, make plan Build stage a DAG.
plan, errBuild := planBuilder.
AddTransformer(
// handle cluster termination policy
&clusterTerminationPolicyTransformer{},
// handle cluster deletion
&clusterDeletionTransformer{},
258 changes: 243 additions & 15 deletions controllers/apps/cluster/transformer_cluster_component.go
@@ -26,6 +26,7 @@ import (
"slices"
"strconv"
"strings"
"time"

"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
@@ -38,9 +39,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/sharding"
ictrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
@@ -80,19 +83,19 @@ func (t *clusterComponentTransformer) transform(transCtx *clusterTransformContex

createSet, deleteSet, updateSet := setDiff(runningSet, protoSet)

if err := deleteCompNShardingInOrder(transCtx, dag, deleteSet, pointer.Bool(true)); err != nil {
if err = deleteCompNShardingInOrder(transCtx, dag, deleteSet, pointer.Bool(true)); err != nil {
return err
}

var delayedErr error
if err := t.handleUpdate(transCtx, dag, updateSet); err != nil {
if err = t.handleUpdate(transCtx, dag, updateSet); err != nil {
if !ictrlutil.IsDelayedRequeueError(err) {
return err
}
delayedErr = err
}

if err := t.handleCreate(transCtx, dag, createSet); err != nil {
if err = t.handleCreate(transCtx, dag, createSet); err != nil {
return err
}

@@ -809,21 +812,50 @@ type clusterShardingHandler struct {
scaleIn *bool
}

const (
// kbShardingPostProvisionKey is an annotation key marking sharding components that still need the post-provision action
kbShardingPostProvisionKey = "kubeblocks.io/sharding-post-provision"
// kbShardingPreTerminateDoneKey is an annotation key marking sharding components whose pre-terminate action has completed
kbShardingPreTerminateDoneKey = "kubeblocks.io/sharding-pre-terminate-done"
// kbShardingAddKey is an annotation key marking sharding components added while scaling out the sharding
kbShardingAddKey = "kubeblocks.io/sharding-add"
// kbShardingRemoveDoneKey is an annotation key marking sharding components whose shard-remove action has completed during scale-in
kbShardingRemoveDoneKey = "kubeblocks.io/sharding-remove-done"

kbShardingPostProvisionAction = "shardingPostProvision"
kbShardingPreTerminateAction = "shardingPreTerminate"
kbShardingAddAction = "shardingAdd"
kbShardingRemoveAction = "shardingRemove"
)

func (h *clusterShardingHandler) create(transCtx *clusterTransformContext, dag *graph.DAG, name string) error {
protoComps, err := h.protoComps(transCtx, name, nil)
if err != nil {
return err
}

shardingDef := transCtx.shardingDefs[name]
now := time.Now().Format(time.RFC3339Nano)
addPostProvisionAnnotation := func(comp *appsv1.Component) {
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil || shardingDef.Spec.LifecycleActions.ShardAdd == nil {
return
}

if comp.Annotations == nil {
comp.Annotations = make(map[string]string)
}
comp.Annotations[kbShardingPostProvisionKey] = now
}

graphCli, _ := transCtx.Client.(model.GraphClient)
for i := range protoComps {
addPostProvisionAnnotation(protoComps[i])
graphCli.Create(dag, protoComps[i])
}

// initClusterCompNShardingStatus(transCtx, name)

// TODO:
// 1. sharding post-provision
// 2. provision strategy
// TODO: provision strategy

return nil
}
@@ -835,10 +867,15 @@ func (h *clusterShardingHandler) delete(transCtx *clusterTransformContext, dag *
return err
}

// TODO: sharding pre-terminate

graphCli, _ := transCtx.Client.(model.GraphClient)
for i := range runningComps {
err = doShardingLifecycleAction(transCtx, dag, transCtx.shardingDefs[name], &runningComps[i], name, kbShardingPreTerminateAction)
if err != nil {
transCtx.Logger.Error(err, "failed to do shard lifecycle actions",
"action", kbShardingPreTerminateAction, "component", runningComps[i].Name)
continue
}

h.deleteComp(transCtx, graphCli, dag, &runningComps[i], nil)
}

@@ -889,36 +926,62 @@ func (h *clusterShardingHandler) update(transCtx *clusterTransformContext, dag *
toCreate, toDelete, toUpdate := mapDiff(runningCompsMap, protoCompsMap)

// TODO: update strategy
h.deleteComps(transCtx, dag, runningCompsMap, toDelete)
h.updateComps(transCtx, dag, runningCompsMap, protoCompsMap, toUpdate)
h.createComps(transCtx, dag, protoCompsMap, toCreate)
h.deleteComps(transCtx, dag, runningCompsMap, toDelete, name)
h.updateComps(transCtx, dag, runningCompsMap, protoCompsMap, toUpdate, name)
h.createComps(transCtx, dag, protoCompsMap, toCreate, name)

return nil
}

func (h *clusterShardingHandler) createComps(transCtx *clusterTransformContext, dag *graph.DAG,
protoComps map[string]*appsv1.Component, createSet sets.Set[string]) {
protoComps map[string]*appsv1.Component, createSet sets.Set[string], shardingName string) {
graphCli, _ := transCtx.Client.(model.GraphClient)
now := time.Now().Format(time.RFC3339Nano)
addShardAddAnnotation := func(comp *appsv1.Component) {
shardingDef := transCtx.shardingDefs[shardingName]
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil || shardingDef.Spec.LifecycleActions.ShardAdd == nil {
return
}

if comp.Annotations == nil {
comp.Annotations = make(map[string]string)
}
comp.Annotations[kbShardingAddKey] = now
}

for name := range createSet {
addShardAddAnnotation(protoComps[name])
graphCli.Create(dag, protoComps[name])
// TODO: shard post-provision
}
}

// deleteComps deletes the subcomponents of the sharding when the shards count is updated.
func (h *clusterShardingHandler) deleteComps(transCtx *clusterTransformContext, dag *graph.DAG,
runningComps map[string]*appsv1.Component, deleteSet sets.Set[string]) {
runningComps map[string]*appsv1.Component, deleteSet sets.Set[string], shardingName string) {
graphCli, _ := transCtx.Client.(model.GraphClient)

for name := range deleteSet {
err := doShardingLifecycleAction(transCtx, dag, transCtx.shardingDefs[shardingName], runningComps[name], shardingName, kbShardingRemoveAction)
if err != nil {
transCtx.Logger.Error(err, "failed to do shard lifecycle actions",
"action", kbShardingRemoveAction, "component", name)
continue
}

h.deleteComp(transCtx, graphCli, dag, runningComps[name], pointer.Bool(true))
}
}

func (h *clusterShardingHandler) updateComps(transCtx *clusterTransformContext, dag *graph.DAG,
runningComps map[string]*appsv1.Component, protoComps map[string]*appsv1.Component, updateSet sets.Set[string]) {
runningComps map[string]*appsv1.Component, protoComps map[string]*appsv1.Component, updateSet sets.Set[string], shardingName string) {
graphCli, _ := transCtx.Client.(model.GraphClient)
for name := range updateSet {
running, proto := runningComps[name], protoComps[name]
err := handleShardingAddNPostProvision(transCtx, dag, transCtx.shardingDefs[shardingName], running)
if err != nil {
continue
}

if obj := copyAndMergeComponent(running, proto); obj != nil {
graphCli.Update(dag, running, obj)
}
@@ -1262,3 +1325,168 @@ func buildComponentSidecar(proto, running *appsv1.Component, sidecarName string,
}
return checkedAppend(sidecar, sidecarDef)
}

func lifecycleAction4Sharding(transCtx *clusterTransformContext, comp *appsv1.Component, lifecycleAction *appsv1.ShardingLifecycleActions, shardingName string) (lifecycle.ShardingLifecycle, error) {
synthesizedComp, err := synthesizedComponent(transCtx, comp)
if err != nil {
return nil, err
}

pods, err := component.ListOwnedPods(transCtx.Context, transCtx.Client,
synthesizedComp.Namespace, synthesizedComp.ClusterName, synthesizedComp.Name)
if err != nil {
return nil, err
}

return lifecycle.NewShardingLifecycle(transCtx.Cluster.Namespace, transCtx.Cluster.Name, synthesizedComp.Name, shardingName, lifecycleAction, synthesizedComp.TemplateVars, nil, pods...)
}

func synthesizedComponent(transCtx *clusterTransformContext, comp *appsv1.Component) (*component.SynthesizedComponent, error) {
synthesizedComp, err := component.BuildSynthesizedComponent(transCtx.Context, transCtx.Client, transCtx.componentDefs[comp.Spec.CompDef], comp)
if err != nil {
return nil, ictrlutil.NewRequeueError(appsutil.RequeueDuration,
fmt.Sprintf("build synthesized component failed at pre-terminate: %s", err.Error()))
}
synthesizedComp.TemplateVars, _, err = component.ResolveTemplateNEnvVars(transCtx.Context, transCtx.Client, synthesizedComp, transCtx.componentDefs[comp.Spec.CompDef].Spec.Vars)
if err != nil {
return nil, err
}
return synthesizedComp, nil
}

func handleShardingAddNPostProvision(transCtx *clusterTransformContext, dag *graph.DAG, shardingDef *appsv1.ShardingDefinition, comp *appsv1.Component) error {
var err error
switch {
case comp.Annotations[kbShardingAddKey] != "" && comp.Annotations[kbShardingPostProvisionKey] != "":
return fmt.Errorf("sharding component %s has both add and post-provision annotations", comp.Name)
case comp.Annotations[kbShardingAddKey] != "":
err = doShardingLifecycleAction(transCtx, dag, shardingDef, comp, shardingDef.Name, kbShardingAddAction)
if err != nil {
transCtx.Logger.Error(err, "failed to do shard lifecycle actions",
"action", kbShardingAddAction, "component", comp.Name)
}
case comp.Annotations[kbShardingPostProvisionKey] != "":
err = doShardingLifecycleAction(transCtx, dag, shardingDef, comp, shardingDef.Name, kbShardingPostProvisionAction)
if err != nil {
transCtx.Logger.Error(err, "failed to do shard lifecycle actions",
"action", kbShardingPostProvisionAction, "component", comp.Name)
}
}

return err
}

func doShardingLifecycleAction(transCtx *clusterTransformContext,
dag *graph.DAG,
shardingDef *appsv1.ShardingDefinition,
comp *appsv1.Component,
shardingName string,
actionName string) error {
if shardingDef == nil || shardingDef.Spec.LifecycleActions == nil {
return nil
}

var checkAnnotationExist bool
var annotation string
var needsAction func() bool
var actionFunc func(lfa lifecycle.ShardingLifecycle) error
switch actionName {
case kbShardingPostProvisionAction:
checkAnnotationExist = false
annotation = kbShardingPostProvisionKey
needsAction = func() bool {
return shardingDef.Spec.LifecycleActions.PostProvision != nil
}
actionFunc = func(lfa lifecycle.ShardingLifecycle) error {
return lfa.PostProvision(transCtx.Context, transCtx.Client, nil)
}
case kbShardingPreTerminateAction:
checkAnnotationExist = true
annotation = kbShardingPreTerminateDoneKey
needsAction = func() bool {
return shardingDef.Spec.LifecycleActions.PreTerminate != nil
}
actionFunc = func(lfa lifecycle.ShardingLifecycle) error {
return lfa.PreTerminate(transCtx.Context, transCtx.Client, nil)
}
case kbShardingRemoveAction:
checkAnnotationExist = true
annotation = kbShardingRemoveDoneKey
needsAction = func() bool {
if shardingDef.Spec.LifecycleActions.ShardRemove == nil {
return false
}
if comp.Annotations != nil && comp.Annotations[kbShardingAddKey] != "" {
// shardAdd is not done yet, skip shardRemove
return false
}
return true
}
actionFunc = func(lfa lifecycle.ShardingLifecycle) error {
return lfa.ShardRemove(transCtx.Context, transCtx.Client, nil)
}
case kbShardingAddAction:
checkAnnotationExist = false
annotation = kbShardingAddKey
needsAction = func() bool {
return shardingDef.Spec.LifecycleActions.ShardAdd != nil
}
actionFunc = func(lfa lifecycle.ShardingLifecycle) error {
return lfa.ShardAdd(transCtx.Context, transCtx.Client, nil)
}
default:
return fmt.Errorf("unknown sharding lifecycle action: %s", actionName)
}

if !needsAction() {
return nil
}

if checkShardingActionDone(comp, checkAnnotationExist, annotation) {
return nil
}

lfa, err := lifecycleAction4Sharding(transCtx, comp, shardingDef.Spec.LifecycleActions, shardingName)
if err != nil {
return err
}

err = actionFunc(lfa)
if err != nil {
return lifecycle.IgnoreNotDefined(err)
}

markShardingActionDone(transCtx, dag, comp, checkAnnotationExist, annotation)
return nil
}

func checkShardingActionDone(comp *appsv1.Component, checkExist bool, annotation string) bool {
if comp.Annotations == nil {
return !checkExist
}

val, ok := comp.Annotations[annotation]
return (ok && val != "") == checkExist
}

func markShardingActionDone(transCtx *clusterTransformContext, dag *graph.DAG, comp *appsv1.Component, checkExist bool, annotation string) {
graphCli, _ := transCtx.Client.(model.GraphClient)
now := time.Now().Format(time.RFC3339Nano)

compCopyObj := comp.DeepCopy()
if compCopyObj.Annotations == nil {
compCopyObj.Annotations = make(map[string]string)
}

val, ok := compCopyObj.Annotations[annotation]
if (ok && val != "") == checkExist {
return
}

if checkExist {
compCopyObj.Annotations[annotation] = now
} else {
compCopyObj.Annotations[annotation] = ""
}
graphCli.Update(dag, comp, compCopyObj)
}
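The helpers above gate every lifecycle action on a Component annotation, and the checkExist flag flips what that annotation means: for preTerminate and shardRemove it is a done-marker (kubeblocks.io/sharding-pre-terminate-done, kubeblocks.io/sharding-remove-done), while for postProvision and shardAdd the annotation marks a pending action and is cleared once the action has run. A small standalone sketch of that predicate, mirroring checkShardingActionDone:

```go
package main

import "fmt"

// done mirrors checkShardingActionDone: with checkExist=true the action is done once the
// annotation is present and non-empty; with checkExist=false it is done once the annotation
// is absent or has been cleared.
func done(annotations map[string]string, checkExist bool, key string) bool {
	if annotations == nil {
		return !checkExist
	}
	val, ok := annotations[key]
	return (ok && val != "") == checkExist
}

func main() {
	// Scale-out: createComps stamps kubeblocks.io/sharding-add, so shardAdd is still pending.
	added := map[string]string{"kubeblocks.io/sharding-add": "2024-01-01T00:00:00Z"}
	fmt.Println(done(added, false, "kubeblocks.io/sharding-add")) // false: shardAdd must run

	// markShardingActionDone clears the pending marker after the action succeeds.
	added["kubeblocks.io/sharding-add"] = ""
	fmt.Println(done(added, false, "kubeblocks.io/sharding-add")) // true

	// Scale-in: shardRemove is done only once the remove-done marker has been written.
	removing := map[string]string{}
	fmt.Println(done(removing, true, "kubeblocks.io/sharding-remove-done")) // false
	removing["kubeblocks.io/sharding-remove-done"] = "2024-01-01T00:00:00Z"
	fmt.Println(done(removing, true, "kubeblocks.io/sharding-remove-done")) // true
}
```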
4 changes: 2 additions & 2 deletions deploy/helm/crds/apps.kubeblocks.io_clusters.yaml
@@ -11385,9 +11385,9 @@ spec:
KubeBlocks provides lifecycle management for sharding, including:


- Executing the shardProvision Action defined in the ShardingDefinition when the number of shards increases.
- Executing the shardAdd Action defined in the ShardingDefinition when the number of shards increases.
This allows for custom actions to be performed after a new shard is provisioned.
- Executing the shardTerminate Action defined in the ShardingDefinition when the number of shards decreases.
- Executing the shardRemove Action defined in the ShardingDefinition when the number of shards decreases.
This enables custom cleanup or data migration tasks to be executed before a shard is terminated.
Resources and data associated with the corresponding Component will also be deleted.
format: int32