diff --git a/api/v1/self_healing_types.go b/api/v1/self_healing_types.go
index 90501a57..cd6c34e2 100644
--- a/api/v1/self_healing_types.go
+++ b/api/v1/self_healing_types.go
@@ -26,6 +26,10 @@ type SelfHealSpec struct {
 	//
 	// +optional
 	HeightDriftMitigation *HeightDriftMitigationSpec `json:"heightDriftMitigation"`
+	// Take action when a pod is stuck.
+	//
+	// +optional
+	StuckPodMitigation *StuckPodMitigationSpec `json:"stuckPodMitigation"`
 }
 
 type PVCAutoScaleSpec struct {
@@ -63,6 +67,12 @@ type HeightDriftMitigationSpec struct {
 	Threshold uint32 `json:"threshold"`
 }
 
+// StuckPodMitigationSpec configures deletion of pods that are stuck.
+type StuckPodMitigationSpec struct {
+	// Threshold is how long, in seconds, a pod may remain stuck before it is deleted.
+	Threshold uint32 `json:"threshold"`
+}
+
 type SelfHealingStatus struct {
 	// PVC auto-scaling status.
 	// +optional
diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go
index a2ea16cb..fdffcbe1 100644
--- a/api/v1/zz_generated.deepcopy.go
+++ b/api/v1/zz_generated.deepcopy.go
@@ -693,6 +693,11 @@ func (in *SelfHealSpec) DeepCopyInto(out *SelfHealSpec) {
 		*out = new(HeightDriftMitigationSpec)
 		**out = **in
 	}
+	if in.StuckPodMitigation != nil {
+		in, out := &in.StuckPodMitigation, &out.StuckPodMitigation
+		*out = new(StuckPodMitigationSpec)
+		**out = **in
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SelfHealSpec.
@@ -794,6 +799,21 @@ func (in *ServiceSpec) DeepCopy() *ServiceSpec {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StuckPodMitigationSpec) DeepCopyInto(out *StuckPodMitigationSpec) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StuckPodMitigationSpec.
+func (in *StuckPodMitigationSpec) DeepCopy() *StuckPodMitigationSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(StuckPodMitigationSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *SyncInfoPodStatus) DeepCopyInto(out *SyncInfoPodStatus) {
 	*out = *in
diff --git a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
index df0f8b39..aed78565 100644
--- a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
+++ b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
@@ -5669,6 +5669,17 @@ spec:
                     - increaseQuantity
                     - usedSpacePercentage
                     type: object
+                  stuckPodMitigation:
+                    description: Take action when a pod is stuck.
+                    properties:
+                      threshold:
+                        description: Threshold is how long, in seconds, a pod may
+                          remain stuck before it is deleted.
+                        format: int32
+                        type: integer
+                    required:
+                    - threshold
+                    type: object
                 type: object
               service:
                 description: |-
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index e93a714f..b2b5ba22 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -43,9 +43,16 @@ rules:
   resources:
   - pods
   verbs:
+  - delete
   - get
   - list
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - pods/log
+  verbs:
+  - get
 - apiGroups:
   - ""
   resources:
diff --git a/controllers/selfhealing_controller.go b/controllers/selfhealing_controller.go
index c5b7bc99..80e38863 100644
--- a/controllers/selfhealing_controller.go
+++ b/controllers/selfhealing_controller.go
@@ -28,6 +28,7 @@ import (
 	"github.com/strangelove-ventures/cosmos-operator/internal/fullnode"
 	"github.com/strangelove-ventures/cosmos-operator/internal/healthcheck"
 	"github.com/strangelove-ventures/cosmos-operator/internal/kube"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,6 +43,7 @@ type
SelfHealingReconciler struct {
 	driftDetector fullnode.DriftDetection
 	pvcAutoScaler *fullnode.PVCAutoScaler
 	recorder      record.EventRecorder
+	stuckDetector *fullnode.StuckPodDetection
 }
 
 func NewSelfHealing(
@@ -61,6 +63,9 @@ func NewSelfHealing(
 	}
 }
 
+//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
+//+kubebuilder:rbac:groups="",resources=pods/log,verbs=get
+
 // Reconcile reconciles only the self-healing spec in CosmosFullNode. If changes needed, this controller
 // updates a CosmosFullNode status subresource thus triggering another reconcile loop. The CosmosFullNode
 // uses the status object to reconcile its state.
@@ -85,6 +90,7 @@ func (r *SelfHealingReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 
 	r.pvcAutoScale(ctx, reporter, crd)
 	r.mitigateHeightDrift(ctx, reporter, crd)
+	r.mitigateStuckPods(ctx, reporter, crd)
 
 	return ctrl.Result{RequeueAfter: 60 * time.Second}, nil
 }
@@ -120,21 +126,47 @@ func (r *SelfHealingReconciler) mitigateHeightDrift(ctx context.Context, reporte
 	}
 
 	pods := r.driftDetector.LaggingPods(ctx, crd)
+	deleted := r.DeletePods(pods, "HeightDriftMitigationDeletePod", reporter, ctx)
+	if deleted > 0 {
+		msg := fmt.Sprintf("Height lagged behind by %d or more blocks; deleted pod(s)", crd.Spec.SelfHeal.HeightDriftMitigation.Threshold)
+		reporter.RecordInfo("HeightDriftMitigation", msg)
+	}
+}
+
+// mitigateStuckPods deletes the pods reported as stuck when StuckPodMitigation is configured.
+func (r *SelfHealingReconciler) mitigateStuckPods(ctx context.Context, reporter kube.Reporter, crd *cosmosv1.CosmosFullNode) {
+	if crd.Spec.SelfHeal.StuckPodMitigation == nil {
+		return
+	}
+	if r.stuckDetector == nil {
+		// StuckPods has a value receiver, so calling it through a nil pointer would panic.
+		reporter.Error(fmt.Errorf("stuck pod detector is not configured"), "Skipping stuck pod mitigation")
+		return
+	}
+
+	pods := r.stuckDetector.StuckPods(ctx, crd)
+	deleted := r.DeletePods(pods, "StuckPodMitigationDeletePod", reporter, ctx)
+	if deleted > 0 {
+		msg := fmt.Sprintf("Stuck for %d seconds; deleted pod(s)", crd.Spec.SelfHeal.StuckPodMitigation.Threshold)
+		reporter.RecordInfo("StuckPodMitigation", msg)
+	}
+}
+
+// DeletePods deletes each pod and returns how many deletions were not reported as failures.
+// A failed deletion is reported under reason and the loop moves on to the next pod.
+func (r *SelfHealingReconciler) DeletePods(pods []*v1.Pod, reason string, reporter kube.Reporter, ctx context.Context) int {
 	var deleted int
 	for _, pod := range pods {
 		// CosmosFullNodeController will detect missing pod and re-create it.
 		if err := r.Delete(ctx, pod); kube.IgnoreNotFound(err) != nil {
 			reporter.Error(err, "Failed to delete pod", "pod", pod.Name)
-			reporter.RecordError("HeightDriftMitigationDeletePod", err)
+			reporter.RecordError(reason, err)
 			continue
 		}
-		reporter.Info("Deleted pod for meeting height drift threshold", "pod", pod.Name)
+		reporter.Info("Deleted pod", "reason", reason, "pod", pod.Name)
 		deleted++
 	}
-	if deleted > 0 {
-		msg := fmt.Sprintf("Height lagged behind by %d or more blocks; deleted pod(s)", crd.Spec.SelfHeal.HeightDriftMitigation.Threshold)
-		reporter.RecordInfo("HeightDriftMitigation", msg)
-	}
+	return deleted
 }
 
 // SetupWithManager sets up the controller with the Manager.
diff --git a/internal/fullnode/stuck_detection.go b/internal/fullnode/stuck_detection.go
new file mode 100644
index 00000000..a75de0ec
--- /dev/null
+++ b/internal/fullnode/stuck_detection.go
@@ -0,0 +1,123 @@
+package fullnode
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strings"
+	"time"
+
+	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
+	"github.com/strangelove-ventures/cosmos-operator/internal/kube"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// StuckPodDetection finds pods that appear stuck by inspecting their logs.
+type StuckPodDetection struct {
+	available      func(pods []*corev1.Pod, minReady time.Duration, now time.Time) []*corev1.Pod
+	collector      StatusCollector
+	computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
+}
+
+// NewStuckDetection returns a StuckPodDetection that reads pod statuses from collector.
+func NewStuckDetection(collector StatusCollector) StuckPodDetection {
+	return StuckPodDetection{
+		available:      kube.AvailablePods,
+		collector:      collector,
+		computeRollout: kube.ComputeRollout,
+	}
+}
+
+// StuckPods returns pods that are stuck on a block height due to a cometbft issue that manifests on sentries using horcrux.
+// A pod counts as stuck when the last line of its log is the "SignerListener: Connected"
+// message and that line is at least the configured threshold old. Detection is best effort:
+// pods whose logs cannot be read or interpreted are never reported.
+func (d StuckPodDetection) StuckPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod {
+	pods := d.collector.Collect(ctx, client.ObjectKeyFromObject(crd)).Synced().Pods()
+	if len(pods) == 0 {
+		return nil
+	}
+
+	// Build the client once per call rather than once per pod, and never panic the operator.
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		fmt.Println("Error loading in-cluster config:", err)
+		return nil
+	}
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		fmt.Println("Error creating kubernetes client:", err)
+		return nil
+	}
+
+	// Threshold is in seconds; fall back to the previous hard-coded minute when it is unset.
+	threshold := time.Minute
+	if m := crd.Spec.SelfHeal.StuckPodMitigation; m != nil && m.Threshold > 0 {
+		threshold = time.Duration(m.Threshold) * time.Second
+	}
+
+	now := time.Now().UTC()
+	var stuck []*corev1.Pod
+	for _, pod := range pods {
+		lastLine, err := getPodLogsLastLine(ctx, clientset, pod)
+		if err != nil {
+			fmt.Printf("Error getting logs for pod %s: %v\n", pod.Name, err)
+			continue
+		}
+		if isPodStuck(lastLine, now, threshold) {
+			stuck = append(stuck, pod)
+		}
+	}
+	return stuck
+}
+
+// isPodStuck reports whether lastLine is the signer-connected message and is at least
+// threshold older than now. Lines whose timestamp cannot be parsed are not considered stuck.
+func isPodStuck(lastLine string, now time.Time, threshold time.Duration) bool {
+	if !strings.Contains(lastLine, "SignerListener: Connected") {
+		return false
+	}
+	logged, err := extractTimeFromLog(lastLine)
+	if err != nil {
+		fmt.Println("Error parsing time from log:", err)
+		return false
+	}
+	// The line carries only a clock time, so place it on now's date.
+	// NOTE(review): assumes the pod logs in the same zone as now - confirm.
+	at := time.Date(now.Year(), now.Month(), now.Day(), logged.Hour(), logged.Minute(), 0, 0, now.Location())
+	if at.Sub(now) > 12*time.Hour {
+		at = at.AddDate(0, 0, -1) // written before midnight, read after it
+	}
+	return now.Sub(at) >= threshold
+}
+
+// extractTimeFromLog parses the leading "3:04PM"-style timestamp of a log line.
+func extractTimeFromLog(log string) (time.Time, error) {
+	parts := strings.Fields(log)
+	if len(parts) == 0 {
+		return time.Time{}, fmt.Errorf("log line is empty")
+	}
+	return time.Parse("3:04PM", parts[0])
+}
+
+// getPodLogsLastLine returns the last log line of pod, requesting only the tail from the API.
+// NOTE(review): no container is selected; multi-container pods may need PodLogOptions.Container.
+func getPodLogsLastLine(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) (string, error) {
+	tail := int64(1)
+	opts := corev1.PodLogOptions{TailLines: &tail}
+	stream, err := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &opts).Stream(ctx)
+	if err != nil {
+		return "", err
+	}
+	defer stream.Close()
+	raw, err := io.ReadAll(stream)
+	if err != nil {
+		return "", err
+	}
+	text := strings.TrimRight(string(raw), "\n")
+	return text[strings.LastIndex(text, "\n")+1:], nil
+}