59 Star 21 Fork 78

Ascend/mind-cluster

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
frame.go 8.79 KB
一键复制 编辑 原始数据 按行查看 历史
luxiang6 提交于 5个月前 . cleancode修改
/*
Copyright(C)2020-2023. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package module910x8 is used for Huawei A800/9000 Ascend910 pin affinity scheduling.
*/
package module910x8
import (
"errors"
"fmt"
"reflect"
"k8s.io/api/core/v1"
"k8s.io/klog"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/internal/base"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/internal/rescheduling"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/plugin"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/util"
)
// New builds the module910x8 handler registered under the given plugin name.
// It sets the plugin name, the NPU910 card annotation name and prefix, a nil
// default job scheduler config and the per-node NPU maximum, then fills in the
// network-unhealthy annotation key and the affinity score table.
func New(name string) base.AscendHandler {
	// Affinity score table consumed by the scoring helpers of this package.
	affinityScores := [][]int{
		{util.AffScore0, util.AffScore2, util.AffScore1, util.AffScore3},
		{util.AffScore4, util.AffScore0, util.AffScore2, util.AffScore1},
		{util.AffScore4, util.AffScore4, util.AffScore4, util.AffScore4},
		{util.AffScore4, util.AffScore4, util.AffScore4, util.AffScore0},
	}

	handler := &module910x8{}
	handler.SetPluginName(name)
	handler.SetAnnoName(util.NPU910CardName)
	handler.SetAnnoPreVal(util.NPU910CardNamePre)
	handler.SetDefaultJobSchedulerConfig(nil)
	handler.SetMaxNodeNPUNum(nodeNPUNumber)
	handler.netUnhealthyKey = networkUnhealthyNPU
	handler.affScoreList = affinityScores
	return handler
}
// ValidNPUJob validates the job held by the plugin.
//
// A nil receiver or a failing train-mode check yields a result with Pass set
// to false and both Reason and Message set to the error text. Otherwise the
// verdict is delegated to the rescheduling handler.
func (tp *module910x8) ValidNPUJob() *api.ValidateResult {
	// reject turns an error into a failed validation result.
	reject := func(cause error) *api.ValidateResult {
		return &api.ValidateResult{
			Pass:    false,
			Reason:  cause.Error(),
			Message: cause.Error(),
		}
	}

	// 1. The receiver itself must be usable.
	if tp == nil {
		nilErr := fmt.Errorf("nil plugin %s", SchedulerName)
		klog.V(util.LogErrorLev).Infof("ValidNPUJob err: %s.", nilErr)
		return reject(nilErr)
	}

	// 2. The job train mode (distributed or single) must be acceptable.
	if modeErr := tp.checkJobTrainMode(); modeErr != nil {
		klog.V(util.LogErrorLev).Infof("checkJobTrainMode: %s.", modeErr)
		return reject(modeErr)
	}

	// 3. Let the rescheduling handler have the final word.
	return tp.reHandle.ValidJobByReschedule(tp.SchedulerJobAttr)
}
// PreStartAction stores the rescheduling handler used by later scheduling
// steps. The argument i must be a *rescheduling.ReScheduler; any other dynamic
// type is reported as an error and leaves the plugin unchanged. The session
// argument is ignored.
func (tp *module910x8) PreStartAction(i interface{}, _ *framework.Session) error {
	if reScheduler, isReScheduler := i.(*rescheduling.ReScheduler); isReScheduler {
		tp.reHandle = reScheduler
		return nil
	}
	return fmt.Errorf("preStartAction failed %s, interface is not ReScheduler", SchedulerName)
}
// CheckNodeNPUByTask reports whether node can satisfy the NPU request of task.
//
// The checks run in this order: argument sanity, node label, the task's
// requested NPU number, membership of the task's job in tp.Jobs, the node's
// usable NPU topology, and the topology/request match. When a rescheduling
// handler is set it is consulted last. A nil return means the node is accepted.
func (tp *module910x8) CheckNodeNPUByTask(task *api.TaskInfo, node plugin.NPUNode) error {
	if tp == nil || task == nil || len(node.Annotation) == 0 {
		argErr := errors.New(util.ArgumentError)
		klog.V(util.LogErrorLev).Infof("CheckNodeNPUByTask err: %s", argErr.Error())
		return argErr
	}

	// logged records a failure under the plugin name and hands the same error back.
	logged := func(cause error) error {
		klog.V(util.LogErrorLev).Infof("%s CheckNodeNPUByTask err: %s", tp.GetPluginName(), cause.Error())
		return cause
	}

	if labelErr := checkNodeLabelOK(node); labelErr != nil {
		return labelErr
	}

	reqNPUNum, reqErr := tp.GetTaskReqNPUNum(task)
	if reqErr != nil {
		return logged(reqErr)
	}

	if _, isNPUJob := tp.Jobs[task.Job]; !isNPUJob {
		return logged(fmt.Errorf("task<%s> is not npu task", task.Name))
	}

	usableTop, topErr := tp.getUsableTopFromNode(node, tp.NPUTaskNum > 1)
	if topErr != nil {
		return logged(topErr)
	}

	if judgeErr := tp.judgeNodeAndTaskNPU(reqNPUNum, usableTop); judgeErr != nil {
		// The detailed cause is only logged; the caller gets a fixed message
		// that carries the node's network-unhealthy card annotation.
		_ = logged(judgeErr)
		return fmt.Errorf("npu topology not meet job require,network unhealthy card is [ %s ]",
			node.Annotation[tp.netUnhealthyKey])
	}

	if tp.reHandle == nil {
		return nil
	}
	if reErr := tp.reHandle.CheckNodeNPUByTask(task, node, tp.ReqNPUName); reErr != nil {
		return fmt.Errorf("rescheduling %s", reErr.Error())
	}
	return nil
}
// ScoreBestNPUNodes scores each candidate node from the task's requested NPU
// number and the node's usable NPU topology, writing the result into scoreMap.
//
// Nodes that are nil, missing from tp.Nodes, or for which the usable topology,
// the best affinity score or the allocatable NPU amount cannot be obtained are
// skipped with a warning; scoreMap keeps whatever value it already held for
// them. Afterwards the rescheduling handler, if tp.reHandle is set, may adjust
// scoreMap; a failure there is only logged.
//
// An error is returned only for invalid arguments or when the task's requested
// NPU number cannot be determined.
func (tp *module910x8) ScoreBestNPUNodes(task *api.TaskInfo, nodes []*api.NodeInfo, scoreMap map[string]float64) error {
	if tp == nil || task == nil || len(nodes) == 0 || len(scoreMap) == 0 {
		err := errors.New(util.ArgumentError)
		klog.V(util.LogErrorLev).Infof("ScoreBestNPUNodes %v.", err.Error())
		return err
	}
	taskNPUNum, err := tp.GetTaskReqNPUNum(task)
	if err != nil {
		klog.V(util.LogErrorLev).Infof("%s ScoreBestNPUNodes err: %s", tp.GetPluginName(), err.Error())
		return err
	}
	for _, node := range nodes {
		if reflect.ValueOf(node).IsNil() {
			continue
		}
		nNode, ok := tp.Nodes[node.Name]
		if !ok {
			klog.V(util.LogWarningLev).Infof("%s ScoreBestNPUNodes node<%s> is not npu node",
				tp.GetPluginName(), node.Name)
			continue
		}
		cardIds, err := tp.getUsableTopFromNode(nNode, tp.NPUTaskNum > 1)
		if err != nil {
			klog.V(util.LogWarningLev).Infof("%s ScoreBestNPUNodes err: %s", tp.GetPluginName(), err.Error())
			continue
		}
		bestScore, err := tp.getNodeBestScore(taskNPUNum, cardIds)
		if err != nil {
			klog.V(util.LogWarningLev).Infof("%s ScoreBestNPUNodes err: %s", tp.GetPluginName(), err.Error())
			continue
		}
		healthyNPUNum, ok := nNode.Allocate[v1.ResourceName(tp.GetAnnoName())]
		if !ok {
			klog.V(util.LogWarningLev).Infof("%s ScoreBestNPUNodes node<%s> get allocate npu failed",
				tp.GetPluginName(), node.Name)
			continue
		}
		// Fewer usable cards give a larger tie-breaker on top of the weighted
		// affinity term.
		sortScore := tp.MaxNodeNPUNum - len(cardIds)
		scoreMap[node.Name] = nodeWeight*float64(int(healthyNPUNum/util.NPUHexKilo)*npuNumPerHccs-bestScore) +
			float64(sortScore)
	}
	// reHandle is only populated by PreStartAction, so it may still be nil here;
	// guard it the same way CheckNodeNPUByTask does instead of calling through
	// a nil handler.
	if tp.reHandle != nil {
		if reErr := tp.reHandle.ScoreBestNPUNodes(task, scoreMap); reErr != nil {
			klog.V(util.LogErrorLev).Infof(
				"%s rescheduling ScoreBestNPUNodes failed :%s.", SchedulerName, reErr.Error())
		}
	}
	klog.V(util.LogInfoLev).Infof("%s ScoreBestNPUNodes task<%s> scoreMap<%v>", tp.GetPluginName(),
		task.Name, scoreMap)
	return nil
}
// UseAnnotation picks the NPUs on node that task will use, records the
// selection on the task's pod via SetNPUTopologyToPodFn, and returns the node
// information updated by UpdateNodeInfo. It returns nil when the arguments are
// invalid or when no suitable NPUs can be selected.
func (tp *module910x8) UseAnnotation(task *api.TaskInfo, node plugin.NPUNode) *plugin.NPUNode {
	if tp == nil || task == nil || len(node.Annotation) == 0 {
		klog.V(util.LogErrorLev).Infof("UseAnnotation %s.", util.ArgumentError)
		return nil
	}
	klog.V(util.LogDebugLev).Infof("%s UseAnnotation task<%s> node<%s> resource<%s> Annotation: %s",
		tp.GetPluginName(), task.Name, node.Name, tp.GetAnnoName(), util.SafePrint(node.Annotation))

	chosenNPU, selectErr := tp.selectNPUFromNode(task, node)
	if selectErr != nil {
		klog.V(util.LogErrorLev).Infof("%s UseAnnotation err:%s.", tp.GetPluginName(), selectErr.Error())
		return nil
	}
	klog.V(util.LogInfoLev).Infof("%s UseAnnotation task<%s> select npu <%v>.",
		tp.GetPluginName(), task.Name, chosenNPU)

	tp.SetNPUTopologyToPodFn(task, chosenNPU, node)
	return tp.UpdateNodeInfo(node, chosenNPU)
}
// selectNPUFromNode returns the NPU ids on node that should be handed to task.
//
// A task asking for nodeNPUNumber NPUs gets the node's whole usable topology,
// but only if that topology has exactly nodeNPUNumber entries. For smaller
// requests the usable NPUs are split into two groups by getNodeHccsArray, and
// the priority list from getNPUAllocPriorityArray is walked in order: the first
// group whose size equals the current priority supplies the NPUs, with the
// left group checked before the right one. An error is returned when no group
// matches any priority.
func (tp *module910x8) selectNPUFromNode(task *api.TaskInfo, node plugin.NPUNode) ([]int, error) {
	reqNum, err := tp.GetTaskReqNPUNum(task)
	if err != nil {
		klog.V(util.LogErrorLev).Infof("%s GetTaskReqNPUNum err: %s", tp.GetPluginName(), err.Error())
		return nil, err
	}
	usable, err := tp.getUsableTopFromNode(node, tp.NPUTaskNum > 1)
	if err != nil {
		klog.V(util.LogErrorLev).Infof("%s getUsableTopFromNode err: %s", tp.GetPluginName(), err.Error())
		return nil, err
	}

	// Whole-node request: every NPU of the node has to be usable.
	if reqNum == nodeNPUNumber {
		if len(usable) != nodeNPUNumber {
			err = fmt.Errorf("node<%s> top<%v> can not meet task req<%d>", node.Name, usable, reqNum)
			klog.V(util.LogErrorLev).Infof("%s selectNPUFromNode err: %s", tp.GetPluginName(), err.Error())
			return nil, err
		}
		return usable, nil
	}

	priorities, err := getNPUAllocPriorityArray(reqNum)
	if err != nil {
		klog.V(util.LogErrorLev).Info(err.Error())
		return nil, err
	}
	klog.V(util.LogInfoLev).Infof("%s selectNPUFromNode %s[%d] priority:%v in %v.", tp.GetPluginName(),
		task.Name, reqNum, priorities, usable)

	left, right := getNodeHccsArray(usable)
	for _, wanted := range priorities {
		// Cases are tried top to bottom, so the left group wins a tie.
		switch wanted {
		case len(left):
			return left[:reqNum], nil
		case len(right):
			return right[:reqNum], nil
		}
	}

	err = fmt.Errorf("node<%s> top<%v> can not meet task req<%d>", node.Name, len(usable), reqNum)
	klog.V(util.LogErrorLev).Infof("%s selectNPUFromNode err: %s", tp.GetPluginName(), err.Error())
	return nil, err
}
// ReleaseAnnotation is the release hook of this plugin. It performs no
// bookkeeping: the task is ignored and a pointer to the received copy of node
// is returned unchanged.
func (tp *module910x8) ReleaseAnnotation(_ *api.TaskInfo, node plugin.NPUNode) *plugin.NPUNode {
	unchanged := &node
	return unchanged
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ascend/mind-cluster.git
git@gitee.com:ascend/mind-cluster.git
ascend
mind-cluster
mind-cluster
v6.0.0

搜索帮助