1 Star 0 Fork 0

陈天/taibak

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rtask.go 10.49 KB
一键复制 编辑 原始数据 按行查看 历史
package models
import (
"errors"
"strings"
"time"
"github.com/go-xorm/xorm"
)
// TaskProtocol identifies the mechanism a task uses to run.
type TaskProtocol int8

// Known TaskProtocol values. The numbers are spelled out explicitly so that
// inserting a new constant cannot silently renumber the existing ones.
const (
	// TaskDataBase is the database protocol.
	TaskDataBase TaskProtocol = 1
	// TaskDoSql executes SQL.
	TaskDoSql TaskProtocol = 2
	// TaskHTTP uses the HTTP protocol.
	TaskHTTP TaskProtocol = 3
	// TaskRPC executes a command through RPC.
	TaskRPC TaskProtocol = 4
)
// TaskExeType describes what kind of work a task performs.
type TaskExeType int8

// Known TaskExeType values, written out explicitly.
const (
	// BakDb backs up a database.
	BakDb TaskExeType = 1
	// ExeSql executes a script.
	ExeSql TaskExeType = 2
)
// TaskLevel distinguishes parent tasks from the tasks that depend on them.
type TaskLevel int8

// Known TaskLevel values, numbered from 1.
const (
	// TaskLevelParent marks a parent task.
	TaskLevelParent TaskLevel = iota + 1
	// TaskLevelChild marks a child (dependent) task.
	TaskLevelChild
)
// TaskDependencyStatus describes how strictly a dependent task is tied to
// its parent task.
type TaskDependencyStatus int8

// Known TaskDependencyStatus values, numbered from 1.
const (
	// TaskDependencyStatusStrong is a strong dependency.
	TaskDependencyStatusStrong TaskDependencyStatus = iota + 1
	// TaskDependencyStatusWeak is a weak dependency.
	TaskDependencyStatusWeak
)
// TaskHTTPMethod is the HTTP request method used by a task.
type TaskHTTPMethod int8

// Known TaskHTTPMethod values.
const (
	// TaskHTTPMethodGet selects the GET method.
	TaskHTTPMethodGet TaskHTTPMethod = 1
	// TaskHTTPMethodPost selects the POST method.
	TaskHTTPMethodPost TaskHTTPMethod = 2

	// TaskHttpMethodPost is the original, inconsistently cased name of
	// TaskHTTPMethodPost. It is kept so existing callers keep compiling.
	//
	// Deprecated: use TaskHTTPMethodPost instead.
	TaskHttpMethodPost = TaskHTTPMethodPost
)
// DTaskHostDetail carries the display name of the data host attached to a
// task. In this file it is filled from DHost.Name by setDataHostsForTasks
// and Detail.
type DTaskHostDetail struct {
	// Dhostname is the data host's name, serialized as "dhostname".
	Dhostname string `json:"dhostname"`
}
// Rtask is the xorm model for a task. It is stored in the "task" table
// (see TableName).
type Rtask struct {
	// Primary key, auto-incremented.
	Id int `json:"id" xorm:"int pk autoincr"`
	// Task name.
	Name string `json:"name" xorm:"varchar(32) notnull"`
	// Task level. 1: main (parent) task, 2: dependent task.
	Level TaskLevel `json:"level" xorm:"tinyint notnull index default 1"`
	// IDs of dependent tasks; multiple IDs are comma-separated.
	DependencyTaskId string `json:"dependency_task_id" xorm:"varchar(64) notnull default ''"`
	// Dependency relation. 1: strong dependency (dependent tasks run only
	// when the main task succeeds), 2: weak dependency.
	DependencyStatus TaskDependencyStatus `json:"dependency_status" xorm:"tinyint notnull default 1"`
	// Crontab expression.
	Spec string `json:"spec" xorm:"varchar(64) notnull"`
	// Protocol, one of the TaskProtocol constants (1: TaskDataBase,
	// 2: TaskDoSql, 3: TaskHTTP, 4: TaskRPC).
	// NOTE(review): the previous comment here read "1: http, 2: system
	// command", which does not match the constants declared in this file.
	Protocol TaskProtocol `json:"protocol" xorm:"tinyint notnull index"`
	// URL or shell command.
	Command string `json:"command" xorm:"varchar(256) notnull"`
	// HTTP request method.
	HttpMethod TaskHTTPMethod `json:"http_method" xorm:"tinyint notnull default 1"`
	// Execution timeout in seconds; 0 means no limit.
	Timeout int `json:"timeout" xorm:"mediumint notnull default 0"`
	// Whether multiple instances of the task may run.
	Multi int8 `json:"multi" xorm:"tinyint notnull default 1"`
	// Number of retries.
	RetryTimes int8 `json:"retry_times" xorm:"tinyint notnull default 0"`
	// Interval between retries.
	RetryInterval int16 `json:"retry_interval" xorm:"smallint notnull default 0"`
	// Whether to notify when the task finishes. 0: never, 1: on failure,
	// 2: whenever execution ends, 3: when the result matches a keyword.
	NotifyStatus int8 `json:"notify_status" xorm:"tinyint notnull default 1"`
	// Notification type. 1: mail, 2: slack, 3: webhook.
	NotifyType int8 `json:"notify_type" xorm:"tinyint notnull default 0"`
	// Notification receiver IDs (primary keys of the setting table);
	// multiple IDs are comma-separated.
	NotifyReceiverId string `json:"notify_receiver_id" xorm:"varchar(256) notnull default '' "`
	// Presumably the keyword matched when NotifyStatus is 3 — confirm.
	NotifyKeyword string `json:"notify_keyword" xorm:"varchar(128) notnull default '' "`
	// Tag; parseWhere filters on it by exact match.
	Tag string `json:"tag" xorm:"varchar(32) notnull default ''"`
	// Remark.
	Remark string `json:"remark" xorm:"varchar(100) notnull default ''"`
	// Status. 1: normal, 0: stopped.
	Status Status `json:"status" xorm:"tinyint notnull index default 0"`
	// Creation time.
	Created time.Time `json:"created" xorm:"datetime notnull created"`
	// Deletion time.
	Deleted time.Time `json:"deleted" xorm:"datetime deleted"`
	// ID passed to DHost.Find when resolving DataBHost.
	DhostId int `json:"dhost_id" xorm:"int"`
	// Kind of work the task performs (see the TaskExeType constants).
	TaskType TaskExeType `json:"tasktype" xorm:"tinyint notnull"`
	// Embedded helper, excluded from JSON and from the table mapping. This
	// file uses its parsePageAndPageSize, PageSize and pageLimitOffset.
	BaseModel `json:"-" xorm:"-"`
	// Hosts attached to the task; not a table column. Filled by
	// setHostsForTasks and Detail.
	Hosts []TaskHostDetail `json:"hosts" xorm:"-"`
	// DHost DHost `json:"dhost" xorm:"-"`
	// Data host display info; not a table column. Filled by
	// setDataHostsForTasks and Detail.
	DataBHost DTaskHostDetail `json:"datahosts" xorm:"-"`
	// Not a table column; nothing in this file assigns it.
	NextRunTime time.Time `json:"next_run_time" xorm:"-"`
}
// taskHostTableNamebak returns the prefixed task_host table name followed by
// the alias "th", as a two-element slice.
func taskHostTableNamebak() []string {
	table := TablePrefix + "task_host"
	return []string{table, "th"}
}
// TableName returns the name of the table that stores Rtask rows.
// TablePrefix is deliberately not prepended here.
func (task *Rtask) TableName() string {
	const tableName = "task"
	return tableName
}
// Create inserts the task and returns task.Id as it stands after the insert.
// On failure it returns 0 and the insert error.
func (task *Rtask) Create() (insertId int, err error) {
	if _, err = Db.Insert(task); err != nil {
		return 0, err
	}
	return task.Id, nil
}
// UpdateBean updates the row with the given id from the receiver's fields,
// restricted to the columns listed below. Columns outside the list (for
// example level, status and dhost_id) are not part of the update.
// It returns the value reported by the underlying Update call.
func (task *Rtask) UpdateBean(id int) (int64, error) {
	// The column list is passed through verbatim, line breaks included.
	const updatableCols = `name,spec,protocol,command,timeout,multi,
retry_times,retry_interval,remark,notify_status,
notify_type,notify_receiver_id, dependency_task_id, dependency_status, tag,http_method, notify_keyword`
	return Db.ID(id).Cols(updatableCols).Update(task)
}
// Update writes the column/value pairs in data to the task row with the
// given id and returns the value reported by the underlying Update call.
func (task *Rtask) Update(id int, data CommonMap) (int64, error) {
	session := Db.Table(task).ID(id)
	return session.Update(data)
}
// Delete removes the task with the given id and returns the value reported
// by the underlying Delete call.
//
// NOTE(review): Rtask has a field tagged `deleted`, so xorm presumably
// performs a soft delete here — confirm against the xorm version in use.
func (task *Rtask) Delete(id int) (int64, error) {
	// ID replaces the deprecated Id, matching UpdateBean and Update.
	return Db.ID(id).Delete(task)
}
// Disable sets the status column of the task with the given id to Disabled.
func (task *Rtask) Disable(id int) (int64, error) {
	fields := CommonMap{"status": Disabled}
	return task.Update(id, fields)
}
// Enable sets the status column of the task with the given id to Enabled.
func (task *Rtask) Enable(id int) (int64, error) {
	fields := CommonMap{"status": Enabled}
	return task.Update(id, fields)
}
// ActiveList returns one page of tasks whose status is Enabled and whose
// level is TaskLevelParent, with Hosts and DataBHost filled in.
//
// page and pageSize are handed to parsePageAndPageSize; the query is then
// limited with task.PageSize and task.pageLimitOffset().
//
// If the query fails, the (empty) list and the error are returned. If
// loading the hosts fails, nil and that error are returned.
func (task *Rtask) ActiveList(page, pageSize int) ([]Rtask, error) {
	params := CommonMap{"Page": page, "PageSize": pageSize}
	task.parsePageAndPageSize(params)

	list := make([]Rtask, 0)
	err := Db.Where("status = ? AND level = ?", Enabled, TaskLevelParent).
		Limit(task.PageSize, task.pageLimitOffset()).
		Find(&list)
	if err != nil {
		return list, err
	}

	// A host lookup failure must not be swallowed: carrying on would hand
	// back tasks with incomplete Hosts and no sign that anything went wrong.
	if _, err = task.setHostsForTasks(list); err != nil {
		return nil, err
	}
	return task.setDataHostsForTasks(list, true)
}
// ActiveListByHostId returns every task attached to the given host whose
// status is Enabled and whose level is TaskLevelParent, with Hosts and
// DataBHost filled in. Per the original comment it is used to refresh task
// host information after a host has been modified.
//
// When the host has no tasks, an empty list and a nil error are returned
// without querying the task table. Errors from looking up the task IDs or
// the hosts are returned with a nil list.
func (task *Rtask) ActiveListByHostId(hostId int16) ([]Rtask, error) {
	taskHostModel := new(TaskHost)
	taskIds, err := taskHostModel.GetTaskIdsByHostId(hostId)
	if err != nil {
		return nil, err
	}

	list := make([]Rtask, 0)
	// An IN condition with no values is either unsatisfiable or, on some
	// xorm versions, silently dropped, which would select every active task.
	// Returning early gives the same answer in both cases.
	if len(taskIds) == 0 {
		return list, nil
	}

	err = Db.Where("status = ? AND level = ?", Enabled, TaskLevelParent).
		In("id", taskIds...).
		Find(&list)
	if err != nil {
		return list, err
	}

	// Propagate host lookup failures instead of returning (nil, nil).
	tasks, err := task.setHostsForTasks(list)
	if err != nil {
		return nil, err
	}
	return task.setDataHostsForTasks(tasks, true)
}
// setHostsForTasks fills the Hosts field of every element of tasks in place,
// using TaskHost.GetHostIdsByTaskId, and returns the same slice.
// The first lookup error aborts the loop and is returned with a nil slice;
// elements processed before the failure keep their updated Hosts.
func (task *Rtask) setHostsForTasks(tasks []Rtask) ([]Rtask, error) {
	hostModel := new(TaskHost)
	for idx := range tasks {
		details, err := hostModel.GetHostIdsByTaskId(tasks[idx].Id)
		if err != nil {
			return nil, err
		}
		tasks[idx].Hosts = details
	}
	return tasks, nil
}
// setDataHostsForTasks fills DataBHost.Dhostname on every element of tasks
// in place, by looking up the element's DhostId with DHost.Find, and returns
// the same slice. Elements with a negative DhostId are skipped, and a failed
// lookup leaves that element's Dhostname untouched.
//
// showDb is currently unused; it is kept so existing call sites compile.
//
// NOTE(review): the returned error is the result of the last lookup that
// was performed, so a failure on an earlier element is overwritten by a
// later success. This is kept as-is because callers return the value
// directly; confirm whether best-effort or fail-fast is intended.
func (task *Rtask) setDataHostsForTasks(tasks []Rtask, showDb bool) ([]Rtask, error) {
	var err error
	for i := range tasks {
		if tasks[i].DhostId < 0 {
			continue
		}
		// Use a fresh DHost for every lookup. Reusing one value would carry
		// the previous host's fields into the next Find: they could act as
		// extra query conditions, or be read back as this task's host name
		// when the lookup finds nothing.
		dhost := DHost{}
		err = dhost.Find(tasks[i].DhostId)
		if err == nil {
			tasks[i].DataBHost.Dhostname = dhost.Name
		}
	}
	return tasks, err
}
// NameExist reports whether a task with the given name and status Enabled
// exists. When id is greater than zero, the row with that id is left out of
// the check.
func (task *Rtask) NameExist(name string, id int) (bool, error) {
	where := "name = ? AND status = ?"
	args := []interface{}{name, Enabled}
	if id > 0 {
		where = "name = ? AND status = ? AND id != ?"
		args = append(args, id)
	}
	total, err := Db.Where(where, args...).Count(task)
	return total > 0, err
}
// GetStatus loads the task with the given id into the receiver and returns
// its Status. It returns 0 with the query error if the lookup fails, and 0
// with a "not exist" error if no row is found.
func (task *Rtask) GetStatus(id int) (Status, error) {
	// ID replaces the deprecated Id, matching UpdateBean and Update.
	found, err := Db.ID(id).Get(task)
	switch {
	case err != nil:
		return 0, err
	case !found:
		return 0, errors.New("not exist")
	}
	return task.Status, nil
}
// Detail loads the task with the given id together with its related hosts.
//
// Hosts is filled for TaskRPC and TaskDataBase tasks. For TaskDataBase
// tasks DataBHost.Dhostname is also filled from the DHost identified by
// DhostId. The first error encountered is returned along with whatever was
// loaded up to that point.
//
// NOTE(review): a missing row is not reported as an error; the zero Rtask
// is returned with a nil error — confirm that callers check for this.
func (task *Rtask) Detail(id int) (Rtask, error) {
	t := Rtask{}
	if _, err := Db.Where("id=?", id).Get(&t); err != nil {
		return t, err
	}

	if t.Protocol == TaskRPC || t.Protocol == TaskDataBase {
		hosts, err := new(TaskHost).GetHostIdsByTaskId(id)
		t.Hosts = hosts
		// Return now, so that the data host lookup below cannot overwrite
		// this error for TaskDataBase tasks.
		if err != nil {
			return t, err
		}
	}

	if t.Protocol == TaskDataBase {
		dhost := DHost{}
		if err := dhost.Find(t.DhostId); err != nil {
			return t, err
		}
		t.DataBHost.Dhostname = dhost.Name
	}
	return t, nil
}
// List returns one page of tasks matching params, newest id first, with
// Hosts and DataBHost filled in.
//
// params is read twice: by parsePageAndPageSize for paging, and by
// parseWhere for the filters. The task table (alias "t") is left-joined
// with the task host table and grouped by task id, so a task attached to
// several hosts appears once.
//
// Query errors and host lookup errors are returned with a nil list.
func (task *Rtask) List(params CommonMap) ([]Rtask, error) {
	task.parsePageAndPageSize(params)

	list := make([]Rtask, 0)
	session := Db.Alias("t").Join("LEFT", taskHostTableName(), "t.id = th.task_id")
	task.parseWhere(session, params)
	err := session.GroupBy("t.id").
		Desc("t.id").
		Cols("t.*").
		Limit(task.PageSize, task.pageLimitOffset()).
		Find(&list)
	if err != nil {
		return nil, err
	}

	// Propagate host lookup failures instead of returning (nil, nil).
	tasks, err := task.setHostsForTasks(list)
	if err != nil {
		return nil, err
	}
	return task.setDataHostsForTasks(tasks, false)
}
// GetDependencyTaskList returns the tasks of level TaskLevelChild whose id
// appears in ids, a comma-separated list, with Hosts filled in.
// An empty ids string yields an empty list without querying.
func (task *Rtask) GetDependencyTaskList(ids string) ([]Rtask, error) {
	result := make([]Rtask, 0)
	if len(ids) == 0 {
		return result, nil
	}

	// The pieces are passed to the query as strings, exactly as split.
	parts := strings.Split(ids, ",")
	idArgs := make([]interface{}, 0, len(parts))
	for _, part := range parts {
		idArgs = append(idArgs, part)
	}

	session := Db.Alias("t").
		Where("t.level = ?", TaskLevelChild).
		In("t.id", idArgs).
		Cols("t.*")
	if err := session.Find(&result); err != nil {
		return result, err
	}
	return task.setHostsForTasks(result)
}
// Total returns the number of tasks matching params, using the same join,
// filters and grouping as List but without paging.
//
// NOTE(review): the count is taken by loading every matching row and
// measuring the slice, which may be costly on large tables.
func (task *Rtask) Total(params CommonMap) (int64, error) {
	query := Db.Alias("t").Join("LEFT", taskHostTableName(), "t.id = th.task_id")
	task.parseWhere(query, params)

	rows := make([]Rtask, 0)
	err := query.GroupBy("t.id").Find(&rows)
	total := int64(len(rows))
	return total, err
}
// parseWhere adds AND conditions to session for each recognized key in
// params that holds a usable value:
//
//	"Id"       int > 0       -> t.id = ?
//	"HostId"   int > 0       -> th.host_id = ?
//	"Name"     non-empty str -> t.name LIKE %name%
//	"Protocol" int > 0       -> protocol = ?
//	"Status"   int > -1      -> status = ?
//	"Tag"      non-empty str -> tag = ?
//
// The session must alias the task table as "t" and the task host table as
// "th". A key present with a value of a different dynamic type than shown
// above makes the type assertion panic.
func (task *Rtask) parseWhere(session *xorm.Session, params CommonMap) {
	if len(params) == 0 {
		return
	}

	if v, found := params["Id"]; found && v.(int) > 0 {
		session.And("t.id = ?", v)
	}
	if v, found := params["HostId"]; found && v.(int) > 0 {
		session.And("th.host_id = ?", v)
	}
	if v, found := params["Name"]; found && v.(string) != "" {
		session.And("t.name LIKE ?", "%"+v.(string)+"%")
	}
	if v, found := params["Protocol"]; found && v.(int) > 0 {
		session.And("protocol = ?", v)
	}
	if v, found := params["Status"]; found && v.(int) > -1 {
		session.And("status = ?", v)
	}
	if v, found := params["Tag"]; found && v.(string) != "" {
		session.And("tag = ? ", v)
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/hakwolf/taibak.git
git@gitee.com:hakwolf/taibak.git
hakwolf
taibak
taibak
000f062082a1

搜索帮助