Skip to content

Commit

Permalink
fix: use pod name to get lock if hashVal is nil (#1148)
Browse files Browse the repository at this point in the history
* fix: use pod name to get lock if hashVal is nil
* fix uniqueid without random strings
* parse mountinfo after the mount pod is ready, so that later mountinfo changes are not missed
* handle an empty podHash in StopFd

Signed-off-by: zwwhdls <weiwei.zhu@juicefs.io>
  • Loading branch information
zwwhdls authored Oct 21, 2024
1 parent c545bcd commit 26da478
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 39 deletions.
12 changes: 12 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

corev1 "k8s.io/api/core/v1"

"github.com/juicedata/juicefs-csi-driver/pkg/common"
k8s "github.com/juicedata/juicefs-csi-driver/pkg/k8sclient"
)

Expand Down Expand Up @@ -105,6 +106,17 @@ func IsInterVolume(name string) bool {

var PodLocks [1024]sync.Mutex

// GetPodLockKey returns the key that identifies which pod lock to use for pod.
//
// The value of the pod's common.PodJuiceHashLabelKey label is used when it is
// non-empty; otherwise the pod name is returned. A nil pod yields "".
func GetPodLockKey(pod *corev1.Pod) string {
	if pod == nil {
		return ""
	}
	// Prefer the hash label; pods without it fall back to their own name.
	if key := pod.Labels[common.PodJuiceHashLabelKey]; key != "" {
		return key
	}
	return pod.Name
}

func GetPodLock(podHashVal string) *sync.Mutex {
h := fnv.New32a()
h.Write([]byte(podHashVal))
Expand Down
38 changes: 12 additions & 26 deletions pkg/controller/pod_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ func getPodStatus(pod *corev1.Pod) podStatus {
func (p *PodDriver) checkAnnotations(ctx context.Context, pod *corev1.Pod) error {
log := util.GenLog(ctx, podDriverLog, "")
// check refs in mount pod, the corresponding pod exists or not
hashVal := pod.Labels[common.PodJuiceHashLabelKey]
if hashVal == "" {
return fmt.Errorf("pod %s/%s has no hash label", pod.Namespace, pod.Name)
}
lock := config.GetPodLock(hashVal)
lock := config.GetPodLock(config.GetPodLockKey(pod))
lock.Lock()
defer lock.Unlock()

Expand Down Expand Up @@ -239,10 +235,7 @@ func (p *PodDriver) podCompleteHandler(ctx context.Context, pod *corev1.Pod) (Re
}
log := util.GenLog(ctx, podDriverLog, "podCompleteHandler")
hashVal := pod.Labels[common.PodJuiceHashLabelKey]
if hashVal == "" {
return Result{}, fmt.Errorf("pod %s/%s has no hash label", pod.Namespace, pod.Name)
}
lock := config.GetPodLock(hashVal)
lock := config.GetPodLock(config.GetPodLockKey(pod))
lock.Lock()
defer lock.Unlock()

Expand All @@ -251,7 +244,7 @@ func (p *PodDriver) podCompleteHandler(ctx context.Context, pod *corev1.Pod) (Re
return Result{}, err
}
if !hasAvailable {
newPodName := podmount.GenPodNameByUniqueId(pod.Labels[common.PodUniqueIdLabelKey], true)
newPodName := podmount.GenPodNameByUniqueId(resource.GetUniqueId(*pod), true)
log.Info("need to create a new one", "newPodName", newPodName)
newPod, err := p.newMountPod(ctx, pod, newPodName)
if err != nil {
Expand Down Expand Up @@ -293,11 +286,7 @@ func (p *PodDriver) podErrorHandler(ctx context.Context, pod *corev1.Pod) (Resul
return Result{}, nil
}
log := util.GenLog(ctx, podDriverLog, "podErrorHandler")
hashVal := pod.Labels[common.PodJuiceHashLabelKey]
if hashVal == "" {
return Result{}, fmt.Errorf("pod %s/%s has no hash label", pod.Namespace, pod.Name)
}
lock := config.GetPodLock(hashVal)
lock := config.GetPodLock(config.GetPodLockKey(pod))
lock.Lock()
defer lock.Unlock()

Expand Down Expand Up @@ -408,11 +397,8 @@ func (p *PodDriver) podDeletedHandler(ctx context.Context, pod *corev1.Pod) (Res
existTargets := make(map[string]string)

hashVal := pod.Labels[common.PodJuiceHashLabelKey]
if hashVal == "" {
return Result{}, fmt.Errorf("pod %s/%s has no hash label", pod.Namespace, pod.Name)
}

lock := config.GetPodLock(hashVal)
lock := config.GetPodLock(config.GetPodLockKey(pod))
lock.Lock()
defer lock.Unlock()

Expand Down Expand Up @@ -451,7 +437,7 @@ func (p *PodDriver) podDeletedHandler(ctx context.Context, pod *corev1.Pod) (Res
// create
if len(existTargets) != 0 && !hasAvailPod {
// create pod
newPodName := podmount.GenPodNameByUniqueId(pod.Labels[common.PodUniqueIdLabelKey], true)
newPodName := podmount.GenPodNameByUniqueId(resource.GetUniqueId(*pod), true)
log.Info("pod targetPath not empty, need to create a new one", "newPodName", newPodName)
// delete tmp file
log.Info("delete tmp state file because it is not smoothly upgrade")
Expand Down Expand Up @@ -488,11 +474,7 @@ func (p *PodDriver) podPendingHandler(ctx context.Context, pod *corev1.Pod) (Res
return Result{}, nil
}
log := util.GenLog(ctx, podDriverLog, "podPendingHandler")
hashVal := pod.Labels[common.PodJuiceHashLabelKey]
if hashVal == "" {
return Result{}, fmt.Errorf("pod %s/%s has no hash label", pod.Namespace, pod.Name)
}
lock := config.GetPodLock(hashVal)
lock := config.GetPodLock(config.GetPodLockKey(pod))
lock.Lock()
defer lock.Unlock()

Expand Down Expand Up @@ -578,7 +560,7 @@ func (p *PodDriver) podReadyHandler(ctx context.Context, pod *corev1.Pod) (Resul
supFusePass := util.SupportFusePass(pod.Spec.Containers[0].Image)
podHashVal := pod.Labels[common.PodJuiceHashLabelKey]

lock := config.GetPodLock(podHashVal)
lock := config.GetPodLock(config.GetPodLockKey(pod))
lock.Lock()
defer lock.Unlock()

Expand All @@ -605,6 +587,10 @@ func (p *PodDriver) podReadyHandler(ctx context.Context, pod *corev1.Pod) (Resul

func (p *PodDriver) recover(ctx context.Context, pod *corev1.Pod, mntPath string) error {
log := util.GenLog(ctx, podDriverLog, "recover")
if err := p.mit.parse(); err != nil {
log.Error(err, "parse mount info error")
return err
}
for k, target := range pod.Annotations {
if k == util.GetReferenceKey(target) {
mi := p.mit.resolveTarget(ctx, target)
Expand Down
14 changes: 13 additions & 1 deletion pkg/controller/pod_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ func TestPodDriver_podReadyHandler(t *testing.T) {
}
patch1 := ApplyFuncSeq(os.Stat, outputs)
defer patch1.Reset()
patch4 := ApplyFunc(mount.ParseMountInfo, func(filename string) ([]mount.MountInfo, error) {
return genMountInfos(), nil
})
defer patch4.Reset()
_, err := d.podReadyHandler(context.Background(), readyPod)
So(err, ShouldBeNil)
})
Expand All @@ -524,7 +528,7 @@ func TestPodDriver_podReadyHandler(t *testing.T) {
patch2 := ApplyFuncSeq(os.Stat, outputs)
defer patch2.Reset()
_, err := d.podReadyHandler(context.Background(), readyPod)
So(err, ShouldBeNil)
So(err, ShouldNotBeNil)
})
Convey("pod ready add target mntPath not exists ", func() {
d := NewPodDriver(&k8sclient.K8sClient{Interface: fake.NewSimpleClientset()}, mount.SafeFormatAndMount{
Expand All @@ -537,6 +541,10 @@ func TestPodDriver_podReadyHandler(t *testing.T) {
}
patch1 := ApplyFuncSeq(os.Stat, outputs)
defer patch1.Reset()
patch4 := ApplyFunc(mount.ParseMountInfo, func(filename string) ([]mount.MountInfo, error) {
return genMountInfos(), nil
})
defer patch4.Reset()
_, err := d.podReadyHandler(context.Background(), readyPod)
So(err, ShouldBeNil)
})
Expand Down Expand Up @@ -703,6 +711,10 @@ func TestPodDriver_podReadyHandler(t *testing.T) {
}},
},
}
patch4 := ApplyFunc(mount.ParseMountInfo, func(filename string) ([]mount.MountInfo, error) {
return genMountInfos(), nil
})
defer patch4.Reset()
_, err := d.podReadyHandler(context.Background(), pod)
So(err, ShouldBeNil)
})
Expand Down
4 changes: 0 additions & 4 deletions pkg/controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
reconcilerLog.Error(err, "doReconcile GetNodeRunningPods error")
goto finish
}
if err := mit.parse(); err != nil {
reconcilerLog.Error(err, "doReconcile ParseMountInfo error")
goto finish
}

for i := range podList.Items {
pod := &podList.Items[i]
Expand Down
3 changes: 3 additions & 0 deletions pkg/fuse/passfd/passfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (fs *Fds) GetFdAddress(ctx context.Context, podHashVal string) (string, err
}

func (fs *Fds) StopFd(ctx context.Context, podHashVal string) {
if podHashVal == "" {
return
}
fs.globalMu.Lock()
f := fs.fds[podHashVal]
if f == nil {
Expand Down
11 changes: 3 additions & 8 deletions pkg/juicefs/juicefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,6 @@ func (j *juicefs) JfsUnmount(ctx context.Context, volumeId, mountPath string) er
mountPods := []corev1.Pod{}
var mountPod *corev1.Pod
var podName string
var hashVal string
// get pod by exact name
oldPodName := podmount.GenPodNameByUniqueId(uniqueId, false)
pod, err := j.K8sClient.GetPod(ctx, oldPodName, config.Namespace)
Expand All @@ -533,9 +532,6 @@ func (j *juicefs) JfsUnmount(ctx context.Context, volumeId, mountPath string) er
mountPods = append(mountPods, pods...)
// find pod by target
key := util.GetReferenceKey(mountPath)
lock := config.GetPodLock(hashVal)
lock.Lock()
defer lock.Unlock()
for _, po := range mountPods {
if po.DeletionTimestamp != nil || resource.IsPodComplete(&po) {
continue
Expand All @@ -547,11 +543,10 @@ func (j *juicefs) JfsUnmount(ctx context.Context, volumeId, mountPath string) er
}
if mountPod != nil {
podName = mountPod.Name
hashVal = mountPod.Labels[common.PodJuiceHashLabelKey]
if hashVal == "" {
return fmt.Errorf("pod %s/%s has no hash label", mountPod.Namespace, mountPod.Name)
}
}
lock := config.GetPodLock(config.GetPodLockKey(mountPod))
lock.Lock()
defer lock.Unlock()

// umount target path
if err = mnt.UmountTarget(ctx, mountPath, podName); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,15 @@ func GetCommPath(basePath string, pod corev1.Pod) (string, error) {
}
return path.Join(basePath, hashVal, "fuse_fd_comm.1"), nil
}

// GetUniqueId returns the unique id of a mount pod.
//
// The value of the common.PodUniqueIdLabelKey label is returned when it is
// non-empty. Otherwise the id is derived from the pod name: it is everything
// that follows the first occurrence of "<nodeName>-", where nodeName is
// pod.Spec.NodeName. If the pod name does not contain that marker, "" is
// returned rather than panicking on an out-of-range index.
func GetUniqueId(pod corev1.Pod) string {
	if uniqueId := pod.Labels[common.PodUniqueIdLabelKey]; uniqueId != "" {
		return uniqueId
	}

	// for backward compatibility
	// pod created by version before: https://github.com/juicedata/juicefs-csi-driver/pull/370
	parts := strings.SplitN(pod.Name, pod.Spec.NodeName+"-", 2)
	if len(parts) < 2 {
		// The name does not embed the node name, so no id can be extracted.
		return ""
	}
	return parts[1]
}
33 changes: 33 additions & 0 deletions pkg/util/resource/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,36 @@ func TestGetAllRefKeys(t *testing.T) {
})
}
}

// TestGetUniqueId verifies that GetUniqueId extracts the unique id from the
// pod name when the pod carries no labels: the expected id is the part of the
// name that follows "<nodeName>-".
func TestGetUniqueId(t *testing.T) {
	cases := []struct {
		name string
		pod  corev1.Pod
		want string
	}{
		{
			name: "test",
			pod: corev1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name: "juicefs-test1-123-3456-6789-123",
				},
				Spec: corev1.PodSpec{
					NodeName: "test1",
				},
			},
			want: "123-3456-6789-123",
		},
	}
	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got := GetUniqueId(tc.pod)
			if got == tc.want {
				return
			}
			t.Errorf("GetUniqueId() = %v, want %v", got, tc.want)
		})
	}
}

0 comments on commit 26da478

Please sign in to comment.