Ai
1 Star 1 Fork 0

李沐风岚/murphy

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
dbmanager.go 3.34 KB
一键复制 编辑 原始数据 按行查看 历史
U 提交于 2024-08-08 21:56 +08:00 . murphy
package mdb
import (
"database/sql"
"fmt"
"gitee.com/oshine/murphy/core/config"
"github.com/goccy/go-json"
"github.com/gomodule/redigo/redis"
"gopkg.in/yaml.v3"
"log"
"os"
"reflect"
"time"
)
type DbManager struct {
Db *sql.DB
Rdb *redis.Pool
Ssdb *redis.Pool
}
// DbHandler 数据库句柄池
var DbHandler = DbManager{}
type Redis struct {
Host string
Port string
Db string
Init int
MaxCon int `yaml:"maxCon"`
Idle int
Psw string
}
type Database struct {
Adapter string
User string
Pass string
Host string
Port string
Db string
Prefix string
}
type DbConfig struct {
Database *Database
Redis, Ssdb *Redis
}
func InitAsyncTask(fs []IAsync) {
var funMap = make(map[string]IAsync)
{
if len(fs) == 0 {
log.Println("There is no async task, Async task stop.")
return
}
for _, v := range fs {
f := reflect.TypeOf(v)
n := f.Elem().Name()
funMap[n] = v
}
}
execTask := func(tid, r []uint8) {
var p = &DelayObj{}
err := json.Unmarshal(r, p)
if err != nil {
println(err.Error())
}
log.Println("async start:", p.Params)
if nr, ok := funMap[p.Name]; ok {
nr.AsyncStart(p)
}
rds := DbHandler.GetRds(9)
defer rds.Close()
rds.Send("del", tid)
rds.Send("zrem", START_QUEUE_NAME, tid)
err = rds.Flush()
if err != nil {
println("async del error:", err.Error())
}
}
go func() {
s := `local qname='` + DEFAULT_QUEUE_NAME + `'
local r=redis.call('zrangebyscore', qname,'-inf',KEYS[1],'limit',0,1);
if #r==1 then
local id=r[1]
if redis.call('zrem',qname,id)==1 then
redis.call('zadd','` + START_QUEUE_NAME + `', KEYS[1],id)
return {id,redis.call('get',id)}
end;
end
`
var (
rds redis.Conn
r1 any
err error
lsleep = time.Millisecond * 20000
fsleep = time.Second
sc = redis.NewScript(1, s)
)
link := func() {
time.Sleep(fsleep)
if rds != nil {
rds.Close()
}
rds = DbHandler.GetRds(9)
}
link()
s = ""
for {
//-- 解锁
r1, err = sc.Do(rds, time.Now().Unix())
if err != nil {
println("async read error:", err.Error())
link()
continue
}
if r1 != nil {
switch r1.(type) {
case []any:
r := r1.([]any)
go execTask(r[0].([]uint8), r[1].([]uint8))
time.Sleep(fsleep)
continue
default:
println("async read unknown type")
}
}
time.Sleep(lsleep)
}
}()
}
// 1.Parse yaml config
//
// 2.Init database pool
//
// 3.Load .env file
func ParseDbConfigAndInit(path string) bool {
file, err := os.ReadFile(path)
if err != nil {
fmt.Println(err.Error())
return false
}
err = config.LoadEnv()
if err != nil {
fmt.Println(err.Error())
}
dc := &DbConfig{}
err = yaml.Unmarshal(file, dc)
if err != nil {
fmt.Println(err.Error())
return false
}
config.FromEnv(dc.Database)
config.FromEnv(dc.Redis)
config.FromEnv(dc.Ssdb)
err = DbHandler.InitDb(dc.Database)
if err != nil {
println("connect database fail with host:", dc.Database.Host+", port:"+dc.Database.Port+", user:"+dc.Database.User)
return false
}
if !DbHandler.InitRds(dc.Redis) {
return false
}
if !DbHandler.InitSsdb(dc.Ssdb) {
return false
}
TABLE_PREFIX = dc.Database.Prefix
murphy()
return true
}
func murphy() {
p, _ := os.ReadFile("murphy.logo")
v, _ := os.ReadFile("ver")
fmt.Println(fmt.Sprintf(string(p), string(v)))
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/oshine/murphy.git
git@gitee.com:oshine/murphy.git
oshine
murphy
murphy
v1.0.26

搜索帮助