Ai
1 Star 0 Fork 0

乐观的兔子/jwt

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.go 9.58 KB
一键复制 编辑 原始数据 按行查看 历史
乐观的兔子 提交于 2024-08-26 16:03 +08:00 . commit
package main
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"
"os"
"sort"
"time"
)
// PodUsage 结构体用于存储单个 Pod 的 CPU 和内存使用情况
type PodUsage struct {
Pod corev1.Pod // Pod 对象
CPUUsage int64 // CPU 使用量(毫核)
MemUsage int64 // 内存使用量(字节)
}
func main() {
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
logrus.SetOutput(os.Stdout)
// 创建日志文件
logFile, err := os.OpenFile("k8s_node_optimization.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
logrus.Fatalf("无法创建日志文件: %v", err)
}
defer logFile.Close()
logrus.SetOutput(logFile)
logrus.Info("开始节点优化操作")
// 加载 Kubernetes 配置
config, err := clientcmd.BuildConfigFromFlags("", "config/meta.kubeconfig")
if err != nil {
logrus.Fatalf("无法加载 kubeconfig: %v", err)
}
// 初始化 Kubernetes 客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
logrus.Fatalf("无法初始化 Kubernetes 客户端: %v", err)
}
// 初始化 Metrics API 客户端
metricsClientset, err := metricsclientset.NewForConfig(config)
if err != nil {
logrus.Fatalf("无法初始化 Metrics API 客户端: %v", err)
}
// 获取所有节点的信息
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
logrus.Fatalf("无法获取节点列表: %v", err)
}
// 获取所有节点的资源使用情况
nodeMetrics, err := metricsClientset.MetricsV1beta1().NodeMetricses().List(context.TODO(), metav1.ListOptions{})
if err != nil {
logrus.Fatalf("无法获取节点指标: %v", err)
}
// 按节点负载排序,优先处理负载高的节点
sort.Slice(nodes.Items, func(i, j int) bool {
return calculateNodeScore(&nodeMetrics.Items[i]) > calculateNodeScore(&nodeMetrics.Items[j])
})
var unschedulableNodes []string // 存储标记为不可调度的节点名称
// 顺序处理每个节点的优化操作
for _, node := range nodes.Items {
if err := optimizeNode(node, nodeMetrics, clientset, metricsClientset, &unschedulableNodes); err != nil {
logrus.Errorf("优化节点 %s 时出错: %v", node.Name, err)
}
}
// 恢复所有节点为可调度状态
restoreNodes(clientset, unschedulableNodes)
logrus.Info("节点优化操作完成")
}
// optimizeNode 优化单个节点的资源使用情况
func optimizeNode(node corev1.Node, nodeMetrics *metricsv1beta1.NodeMetricsList, clientset *kubernetes.Clientset, metricsClientset *metricsclientset.Clientset, unschedulableNodes *[]string) error {
// 计算当前不可调度的节点数
unschedulableCount := len(*unschedulableNodes)
totalNodes := len(nodeMetrics.Items)
// 如果不可调度节点数已经接近总节点数的一半,则跳过当前节点的优化
if unschedulableCount >= totalNodes/2 {
logrus.Warnf("跳过节点 %s 的优化,因为已有过多的节点不可调度", node.Name)
return nil
}
for _, m := range nodeMetrics.Items {
if m.Name == node.Name {
cpuQuantity := m.Usage[corev1.ResourceCPU]
memQuantity := m.Usage[corev1.ResourceMemory]
cpuUsage := cpuQuantity.MilliValue()
memUsage := memQuantity.Value()
logrus.Infof("节点名称: %s, CPU 使用量: %d mCPU, 内存使用量: %d bytes", node.Name, cpuUsage, memUsage)
// 标记节点为不可调度
if err := cordonNode(clientset, node.Name); err != nil {
logrus.Fatalf("标记节点 %s 为不可调度状态失败: %v", node.Name, err)
}
*unschedulableNodes = append(*unschedulableNodes, node.Name)
podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name),
})
if err != nil {
logrus.Fatalf("无法获取节点 %s 上的 Pod 列表: %v", node.Name, err)
}
var podUsages []PodUsage
for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
logrus.Warnf("Pod %s 正在删除,跳过...", pod.Name)
continue
}
podMetrics, err := metricsClientset.MetricsV1beta1().PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil {
logrus.Warnf("无法获取 Pod %s 的 Metrics 数据,跳过: %v", pod.Name, err)
continue
}
var totalCPUUsage, totalMemUsage int64
for _, container := range podMetrics.Containers {
cpuQuantity := container.Usage[corev1.ResourceCPU]
memQuantity := container.Usage[corev1.ResourceMemory]
cpuUsage := cpuQuantity.MilliValue()
memUsage := memQuantity.Value()
totalCPUUsage += cpuUsage
totalMemUsage += memUsage
}
podUsages = append(podUsages, PodUsage{
Pod: pod,
CPUUsage: totalCPUUsage,
MemUsage: totalMemUsage,
})
}
sort.Slice(podUsages, func(i, j int) bool {
return podUsages[i].MemUsage > podUsages[j].MemUsage
})
for i := 0; i < len(podUsages); i++ {
targetNode, err := selectBalancedNode(podUsages[i], nodeMetrics)
if err != nil {
logrus.Warnf("未能找到适合的目标节点进行迁移: %v", err)
continue
}
if err := migratePodWithRetry(clientset, podUsages[i], targetNode, 3); err != nil {
logrus.Warnf("迁移 Pod %s 到节点 %s 失败: %v,跳过...", podUsages[i].Pod.Name, targetNode, err)
continue
}
time.Sleep(10 * time.Second)
}
}
}
return nil
}
// cordonNode 将指定节点标记为不可调度
func cordonNode(clientset *kubernetes.Clientset, nodeName string) error {
_, err := clientset.CoreV1().Nodes().Patch(context.TODO(), nodeName, "application/strategic-merge-patch+json", []byte(`{"spec":{"unschedulable":true}}`), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("标记节点 %s 为不可调度状态失败: %v", nodeName, err)
}
logrus.Infof("节点 %s 已标记为不可调度", nodeName)
return nil
}
// restoreNodes 恢复所有不可调度节点为可调度状态
func restoreNodes(clientset *kubernetes.Clientset, unschedulableNodes []string) {
if len(unschedulableNodes) > 0 {
logrus.Info("恢复所有节点为可调度状态...")
for _, nodeName := range unschedulableNodes {
_, err := clientset.CoreV1().Nodes().Patch(context.TODO(), nodeName, "application/strategic-merge-patch+json", []byte(`{"spec":{"unschedulable":false}}`), metav1.PatchOptions{})
if err != nil {
logrus.Warnf("恢复节点 %s 为可调度状态失败: %v", nodeName, err)
} else {
logrus.Infof("节点 %s 已恢复为可调度", nodeName)
}
}
} else {
logrus.Info("所有节点的负载均正常,无需恢复操作")
}
}
// calculateNodeScore 计算节点的资源负载得分
func calculateNodeScore(nodeUsage *metricsv1beta1.NodeMetrics) int64 {
cpuQuantity := nodeUsage.Usage[corev1.ResourceCPU]
memQuantity := nodeUsage.Usage[corev1.ResourceMemory]
cpuUsage := cpuQuantity.MilliValue()
memUsage := memQuantity.Value()
return cpuUsage + memUsage
}
// selectBalancedNode 选择最平衡的目标节点,用于迁移指定的 Pod
func selectBalancedNode(pod PodUsage, nodeMetrics *metricsv1beta1.NodeMetricsList) (string, error) {
var selectedNode string
var lowestScore int64 = -1
for _, node := range nodeMetrics.Items {
nodeScore := calculateNodeScore(&node)
if lowestScore == -1 || nodeScore < lowestScore {
lowestScore = nodeScore
selectedNode = node.Name
}
}
if selectedNode == "" {
return "", fmt.Errorf("无法找到合适的目标节点进行迁移")
}
logrus.Infof("为 Pod 选择的目标节点: %s, 节点得分: %d", selectedNode, lowestScore)
return selectedNode, nil
}
// retry 重试机制,确保迁移 Pod 的稳定性
func retry(attempts int, sleep time.Duration, fn func() error) (err error) {
for i := 0; i < attempts; i++ {
if err = fn(); err == nil {
return nil
}
logrus.Warnf("第 %d 次尝试失败,等待 %v 后重试... 错误: %v", i+1, sleep, err)
time.Sleep(sleep)
}
return fmt.Errorf("多次重试后操作仍然失败: %w", err)
}
// migratePodWithRetry 使用重试机制迁移 Pod 到目标节点
func migratePodWithRetry(clientset *kubernetes.Clientset, podUsage PodUsage, targetNode string, retryAttempts int) error {
return retry(retryAttempts, 5*time.Second, func() error {
err := migratePod(clientset, podUsage.Pod, targetNode)
if err != nil {
logrus.Warnf("迁移 Pod %s 到节点 %s 失败: %v, 正在重试...", podUsage.Pod.Name, targetNode, err)
}
return err
})
}
// migratePod 将 Pod 驱逐到目标节点
func migratePod(clientset *kubernetes.Clientset, pod corev1.Pod, targetNode string) error {
eviction := &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: &metav1.DeleteOptions{},
}
err := clientset.CoreV1().Pods(pod.Namespace).EvictV1(context.TODO(), eviction)
if err != nil {
logrus.Warnf("驱逐 Pod %s 失败: %v", pod.Name, err)
return nil
}
logrus.Infof("驱逐 Pod %s 成功,等待其迁移到节点 %s...", pod.Name, targetNode)
for {
time.Sleep(5 * time.Second)
updatedPod, err := clientset.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil {
logrus.Warnf("获取 Pod %s 状态失败: %v", pod.Name, err)
return nil
}
if updatedPod.Spec.NodeName == targetNode {
logrus.Infof("Pod %s 已成功迁移到节点 %s", pod.Name, targetNode)
break
}
}
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/optimistic-rabbit/jwt.git
git@gitee.com:optimistic-rabbit/jwt.git
optimistic-rabbit
jwt
jwt
master

搜索帮助