1 Star 1 Fork 0

D10.天地弦 / gobase

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
bypass.go 7.67 KB
一键复制 编辑 原始数据 按行查看 历史
D10.天地弦 提交于 2024-04-23 14:47 . * 添加Lock/Unlock/GetID/Tag属性
package bypass
import (
"fmt"
"sort"
"strings"
"sync"
"time"
)
type ByPassItem struct {
lk sync.RWMutex
sn int16
id string
online int
max int
usagecnt int
upflag int8
data interface{}
Tag interface{}
blockT time.Time
failCnt int
lastFailMsg string
lastFailT time.Time
}
func (this *ByPassItem) RLock() {
this.lk.RLock()
}
func (this *ByPassItem) RUnlock() {
this.lk.RUnlock()
}
func (this *ByPassItem) Lock() {
this.lk.Lock()
}
func (this *ByPassItem) Unlock() {
this.lk.Unlock()
}
func (this *ByPassItem) GetID() string {
return this.id
}
func (this *ByPassItem) directGet() {
this.lk.Lock()
defer this.lk.Unlock()
this.online++
this.usagecnt++
}
func (this *ByPassItem) tryGet() bool {
this.lk.Lock()
defer this.lk.Unlock()
if this.max == 0 || this.online < this.max {
if this.blockT.IsZero() || time.Now().After(this.blockT) {
this.online++
this.usagecnt++
return true
}
}
return false
}
func (this *ByPassItem) GetData() interface{} {
return this.data
}
// 分流组管理,管理分流节点
type ByPassGroup struct {
sn int16
strategy int8 // 组的分流策略 0:依次分流(前面不可用时使用后面的,遇到不可用时跳过),1:轮流分流(遇到不可用时跳过)
id string
selidx int // 当前选取序号(-1:未开始选择)
lk sync.RWMutex
lst []*ByPassItem
itmMap map[string]*ByPassItem
loadflag int8
batchUpdateflag int8
}
func NewByPassGroup() *ByPassGroup {
rval := &ByPassGroup{itmMap: make(map[string]*ByPassItem), selidx: -1}
return rval
}
func (this *ByPassGroup) ConfigStrategy(v int8) {
if this.batchUpdateflag == 0 {
this.lk.Lock()
defer this.lk.Unlock()
}
this.strategy = v
}
func (this *ByPassGroup) innerSortBy() {
if len(this.lst) > 0 {
this.lst = this.lst[:0]
}
for _, v := range this.itmMap {
this.lst = append(this.lst, v)
}
sort.Slice(this.lst, func(i, j int) bool {
return this.lst[i].sn < this.lst[j].sn
})
}
func (this *ByPassGroup) UpdateByPassNode(id string, sn int16, max int, data interface{}) (isChanged bool) {
if this.batchUpdateflag == 0 {
this.lk.Lock()
defer this.lk.Unlock()
}
itm := this.itmMap[id]
if itm == nil {
itm = &ByPassItem{id: id}
this.itmMap[id] = itm
isChanged = true
}
if itm.sn != sn {
isChanged = true
itm.sn = sn
}
itm.data = data
itm.max = max
if this.batchUpdateflag == 1 { // 批量更新中
itm.upflag = 1
}
if isChanged && this.batchUpdateflag == 0 {
this.innerSortBy()
}
return
}
func (this *ByPassGroup) Status() string {
var sb strings.Builder
this.lk.RLock()
defer this.lk.RUnlock()
idx := this.selidx
if idx < 0 {
idx = 0
}
if len(this.id) > 0 {
sb.WriteString(fmt.Sprintf("group:%s, ", this.id))
}
sb.WriteString(fmt.Sprintf("strategy:%d(%d)\n", this.strategy, idx%len(this.lst)))
for i := 0; i < len(this.lst); i++ {
s := this.lst[i]
sb.WriteString(fmt.Sprintf("%s\t%d/%d/%d\t%s\t%d,%s,%s\n", s.id, s.online, s.max, s.usagecnt,
DateTimeString2(s.blockT), s.failCnt, DateTimeString2(s.lastFailT), s.lastFailMsg))
}
sb.WriteString("\n")
return sb.String()
}
// 开始更新, 和EndUpdate配套使用
func (this *ByPassGroup) BeginUpdate() {
this.lk.Lock()
this.batchUpdateflag = 1
}
// removeflag true:清理没有更新过的
// upcnt 本次更新数量
func (this *ByPassGroup) EndUpdate(removeflag bool) (upcnt int) {
for k, v := range this.itmMap {
if v.upflag == 0 {
if removeflag { // 清理不存在的
v.upflag = -1
delete(this.itmMap, k)
upcnt++
}
} else {
upcnt++
v.upflag = 0
}
}
this.batchUpdateflag = 0
this.innerSortBy()
this.lk.Unlock()
return
}
func (this *ByPassGroup) DeleteByPassItem(id string) (isChanged bool) {
if this.batchUpdateflag == 0 {
this.lk.Lock()
defer this.lk.Unlock()
}
if itm, ok := this.itmMap[id]; ok {
itm.data = nil
delete(this.itmMap, id)
isChanged = true
}
if isChanged && this.batchUpdateflag == 0 {
this.innerSortBy()
}
return
}
// 获取分流节点
func (this *ByPassGroup) GetByPassNode() (node *ByPassItem) {
this.lk.RLock()
defer this.lk.RUnlock()
l := len(this.lst)
if l == 0 {
return nil
}
startidx := 0
if this.strategy == 1 { // 轮流
startidx = this.selidx + 1
if startidx < 0 {
startidx = 0
}
}
for i := startidx; i < startidx+len(this.lst); i++ {
idx := i % l
sitem := this.lst[i%l]
if sitem.tryGet() {
this.selidx = idx
return sitem
}
}
return nil
// 上面都获取不到,获取第一个
// sitem := this.lst[0]
// sitem.directGet()
// return sitem
}
// 获取分流节点
func innerReleaseByPassNode(bypassNode *ByPassItem, fail bool, block time.Duration, failmsg string) {
if bypassNode == nil {
return
}
if bypassNode.upflag == -1 {
return
}
bypassNode.lk.Lock()
defer bypassNode.lk.Unlock()
bypassNode.online--
if fail {
bypassNode.failCnt++
if block > 0 {
bypassNode.blockT = time.Now().Add(block)
}
bypassNode.lastFailT = time.Now()
bypassNode.lastFailMsg = failmsg
}
}
// 获取分流节点
func (this *ByPassGroup) ReleaseByPassNode(node *ByPassItem, fail bool, block time.Duration, failmsg string) {
innerReleaseByPassNode(node, fail, block, failmsg)
}
type ByPassMananger struct {
lk sync.RWMutex
lst map[string]*ByPassGroup
defaultItem *ByPassGroup
}
func NewByPassManager() *ByPassMananger {
rval := &ByPassMananger{
lst: make(map[string]*ByPassGroup),
}
return rval
}
func (this *ByPassMananger) CheckGetByPassGroup(id string, newflag bool) *ByPassGroup {
this.lk.RLock()
itm := this.lst[id]
this.lk.RUnlock()
if itm != nil {
return itm
}
if newflag {
this.lk.Lock()
defer this.lk.Unlock()
itm = this.lst[id]
if itm == nil {
itm = NewByPassGroup()
itm.id = id
this.lst[id] = itm
}
return itm
} else {
return nil
}
}
func (this *ByPassMananger) GetStatusText(lvl int) string {
if lvl == 1 {
var sb strings.Builder
this.lk.RLock()
lst := make([]*ByPassGroup, 0, len(this.lst))
for _, itm := range this.lst {
lst = append(lst, itm)
}
this.lk.RUnlock()
sort.Slice(lst, func(i, j int) bool {
if lst[i].sn != lst[j].sn {
return lst[i].sn < lst[j].sn
}
return lst[i].id < lst[j].id
})
for _, g0 := range lst {
defflag := ""
if g0 == this.defaultItem {
defflag = "(def)"
}
g0.lk.RLock()
idx := g0.selidx
if idx < 0 {
idx = 0
}
sb.WriteString(fmt.Sprintf("group:%s%s, strategy:%d(%d)\n", g0.id, defflag, g0.strategy, idx%len(g0.lst)))
for i := 0; i < len(g0.lst); i++ {
s := g0.lst[i]
sb.WriteString(fmt.Sprintf("%s\t%v\t%d/%d/%d\t%s\t%d,%s,%s\n", s.id, s.data, s.online, s.max, s.usagecnt,
DateTimeString2(s.blockT), s.failCnt, DateTimeString2(s.lastFailT), s.lastFailMsg))
}
sb.WriteString("\n")
g0.lk.RUnlock()
}
return sb.String()
}
return ""
}
func (this *ByPassMananger) TrySetDefault(id string) bool {
if len(id) == 0 {
return false
}
itm := this.CheckGetByPassGroup(id, false)
if itm == nil {
return false
}
this.defaultItem = itm
return true
}
// 获取一个分流节点
func (this *ByPassMananger) GetByPassNode(groupid string) (rval *ByPassItem) {
var group *ByPassGroup
if len(groupid) > 0 {
group = this.CheckGetByPassGroup(groupid, false)
}
if group == nil {
group = this.defaultItem
if group == nil {
return nil
}
}
return group.GetByPassNode()
}
// 释放一个分流节点
func (this *ByPassMananger) ReleaseByPassNode(itm *ByPassItem, fail bool, block time.Duration, failmsg string) {
innerReleaseByPassNode(itm, fail, block, failmsg)
}
func DateTimeString2(val time.Time) string {
if val.IsZero() {
return ""
} else {
return val.Format("2006-01-02 15:04:05")
}
}
1
https://gitee.com/ymofen/gobase.git
git@gitee.com:ymofen/gobase.git
ymofen
gobase
gobase
v1.2.24053

搜索帮助