Skip to content

Commit

Permalink
fix: sharding cluster can not be deleted (#8643)
Browse files Browse the repository at this point in the history
(cherry picked from commit fd36c73)
  • Loading branch information
Y-Rookie committed Dec 18, 2024
1 parent 926fbe3 commit e81ea18
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
38 changes: 34 additions & 4 deletions controllers/apps/cluster_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,40 @@ func (c *clusterTransformContext) GetLogger() logr.Logger {
return c.Logger
}

func (c *clusterTransformContext) sharding(name string) bool {
// hack: use shardingComps to determine if the entity is sharding or component
_, ok := c.shardingComps[name]
return ok
// sharding use to determine if the entity is sharding or component
func (c *clusterTransformContext) sharding(name string) (bool, error) {
// fast return when the object has already been initialized
if len(c.components) > 0 || len(c.shardings) > 0 {
_, ok := c.shardingComps[name]
return ok, nil
}

// sharding defined in cd topology
if len(c.Cluster.Spec.ClusterDef) > 0 && len(c.Cluster.Spec.Topology) > 0 {
if c.clusterDef == nil {
return false, fmt.Errorf("clusterDefinition is not initialized")
}
for _, topo := range c.clusterDef.Spec.Topologies {
if c.Cluster.Spec.Topology != topo.Name {
continue
}
for _, sharding := range topo.Shardings {
if sharding.Name == name {
return true, nil
}
}
return false, nil
}
return false, fmt.Errorf("topology %s not found in ClusterDefinition %s", c.Cluster.Spec.Topology, c.Cluster.Spec.ClusterDef)
}

// sharding defined in cluster.spec.shardings
for _, sharding := range c.Cluster.Spec.Shardings {
if sharding.Name == name {
return true, nil
}
}
return false, nil
}

func (c *clusterTransformContext) total() int {
Expand Down
18 changes: 15 additions & 3 deletions controllers/apps/transformer_cluster_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ func (c *notExistPrecondition) match(transCtx *clusterTransformContext, dag *gra
}

func (c *notExistPrecondition) predecessorExist(transCtx *clusterTransformContext, dag *graph.DAG, name string) (bool, error) {
if transCtx.sharding(name) {
isSharding, err := transCtx.sharding(name)
if err != nil {
return false, err
}
if isSharding {
return c.shardingExist(transCtx, dag, name)
}
return c.compExist(transCtx, dag, name)
Expand Down Expand Up @@ -453,7 +457,11 @@ func (c *phasePrecondition) match(transCtx *clusterTransformContext, dag *graph.
}

func (c *phasePrecondition) predecessorMatch(transCtx *clusterTransformContext, dag *graph.DAG, name string) (bool, error) {
if transCtx.sharding(name) {
isSharding, err := transCtx.sharding(name)
if err != nil {
return false, err
}
if isSharding {
return c.shardingMatch(transCtx, dag, name)
}
return c.compMatch(transCtx, dag, name)
Expand Down Expand Up @@ -542,7 +550,11 @@ type clusterCompNShardingHandler struct {
}

func (h *clusterCompNShardingHandler) handle(transCtx *clusterTransformContext, dag *graph.DAG, name string) error {
if transCtx.sharding(name) {
isSharding, err := transCtx.sharding(name)
if err != nil {
return err
}
if isSharding {
handler := &clusterShardingHandler{scaleIn: h.scaleIn}
switch h.op {
case createOp:
Expand Down

0 comments on commit e81ea18

Please sign in to comment.