diff --git a/entry.go b/entry.go index 3711fccca39e6a8c8572a3b0ce747bd3b2a5346f..d845f74879f23252195975a3ed9d483b4486239a 100644 --- a/entry.go +++ b/entry.go @@ -21,14 +21,14 @@ func (e *Entry) Obsoleted() bool { if e.Obsolete <= 0 { return true } - if e.Obsolete < time.Now().UnixNano() { + if e.Obsolete < time.Now().Unix() { return true } return false } func (e *Entry) GetObsoleteTTL() (second int64) { - return (e.Obsolete - time.Now().UnixNano()) / int64(time.Second) + return e.Obsolete - time.Now().Unix() } // Expired means that the data is unavailable and data needs to be synchronized @@ -36,14 +36,14 @@ func (e *Entry) Expired() bool { if e.Expiration <= 0 { return true } - if e.Expiration < time.Now().UnixNano() { + if e.Expiration < time.Now().Unix() { return true } return false } func (e *Entry) GetExpireTTL() (second int64) { - return (e.Expiration - time.Now().UnixNano()) / int64(time.Second) + return e.Expiration - time.Now().Unix() } func (e *Entry) String() string { @@ -55,8 +55,8 @@ func NewEntry(v interface{}, second int) *Entry { ttl := second var od, e int64 if second > 0 { - od = time.Now().Add(time.Duration(second) * time.Second).UnixNano() - e = time.Now().Add(time.Duration(second*EntryLazyFactor) * time.Second).UnixNano() + od = time.Now().Add(time.Duration(second) * time.Second).Unix() + e = time.Now().Add(time.Duration(second*EntryLazyFactor) * time.Second).Unix() } return &Entry{ Value: v, diff --git a/example-run.sh b/example-run.sh index a36d13d1beff8e12c732620247e72ce65a30ddc1..733749d496a9ad9b5df5f54d8e7466fd5d8c15ff 100644 --- a/example-run.sh +++ b/example-run.sh @@ -1,7 +1,10 @@ +rm -rf vendor +rm -rf go.mod +rm -rf go.sum go mod init go mod tidy go mod vendor cd ./example || exit -echo "[g2cache.example] 使用默认的redis配置" +rm -rf g2cache-example go build -o g2cache-example main.go ./g2cache-example \ No newline at end of file diff --git a/example/main.go b/example/main.go index 382a30e7654cccb18c539746b36fd2cb9bc4d422..274a083e78326498227952abe2d0d2745d37d257 100644 --- a/example/main.go +++ b/example/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "gitee.com/kelvins-io/g2cache" "log" "math" @@ -12,23 +13,26 @@ import ( ) var ( - testTotal = math.MaxInt8 + testTotal = math.MaxInt16 ) +var appName = "example-" + func main() { g2cache.CacheDebug = true g2cache.CacheMonitor = true - g2cache.CacheMonitorSecond = 5 - g2cache.OutCachePubSub = false - g2cache.EntryLazyFactor = 10 - g2cache.DefaultGPoolWorkerNum = 300 - g2cache.DefaultGPoolJobQueueChanLen = 3000 + g2cache.CacheMonitorSecond = 10 + g2cache.OutCachePubSub = true + g2cache.EntryLazyFactor = 18 + g2cache.DefaultGPoolWorkerNum = 20 + g2cache.DefaultGPoolJobQueueChanLen = 300 g2cache.DefaultFreeCacheSize = 100 * 1024 * 1024 // 100MB g2cache.DefaultPubSubRedisChannel = "g2cache-pubsub-channel" g2cache.DefaultRedisConf.DSN = "127.0.0.1:6379" g2cache.DefaultRedisConf.DB = 1 g2cache.DefaultRedisConf.Pwd = "" g2cache.DefaultRedisConf.MaxConn = 30 + g2cache.DefaultPubSubRedisConf = g2cache.DefaultRedisConf g2,err := g2cache.New(nil, nil) if err != nil { log.Println("g2cache New",err) @@ -39,7 +43,10 @@ func main() { m := g2cache.HitStatisticsOut.String() _, _ = writer.Write([]byte(m)) }) - err := http.ListenAndServe("0.0.0.0:6060", nil) + port := 6000+rand.Intn(1000) + addr := fmt.Sprintf("0.0.0.0:%d",port) + log.Println("g2cache-example run at",addr) + err := http.ListenAndServe(addr, nil) if err != nil { log.Println("server ", err) } @@ -63,23 +70,23 @@ func main() { func delKey(g2 *g2cache.G2Cache, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < testTotal; i++ { - key := g2cache.GenKey("g2cache-key", rand.Intn(math.MaxInt8)) + key := g2cache.GenKey(appName, rand.Intn(math.MaxInt8)) err := g2.Del(key, false) if err != nil { log.Fatalln(err) } - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) } } func setKey(g2 *g2cache.G2Cache, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < testTotal; i++ { - key := g2cache.GenKey("g2cache-key", rand.Intn(math.MaxInt8)) + key := g2cache.GenKey(appName, rand.Intn(math.MaxInt8)) obj := &Object{ ID: i, Value: key, - Address: []string{"example setKey 未来星球🌲✨", time.Now().String()}, + Address: []string{"example setKey 未来星球🌲✨", appName}, Car: &Car{ Name: "概念🚗,✈,🚚️", Price: float64(i) / 100, @@ -89,14 +96,14 @@ func setKey(g2 *g2cache.G2Cache, wg *sync.WaitGroup) { if err != nil { log.Fatalln(err) } - time.Sleep(7 * time.Second) + time.Sleep(5 * time.Second) } } func getKey(g2 *g2cache.G2Cache, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < testTotal; i++ { - key := g2cache.GenKey("g2cache-key", rand.Intn(math.MaxInt8)) + key := g2cache.GenKey(appName, rand.Intn(math.MaxInt8)) var o Object // ttl is second err := g2.Get(key, 30, &o, func() (interface{}, error) { @@ -104,7 +111,7 @@ func getKey(g2 *g2cache.G2Cache, wg *sync.WaitGroup) { return &Object{ ID: i, Value: key, - Address: []string{"example getKey 未来星球🌲✨", time.Now().String()}, + Address: []string{"example getKey 未来星球🌲✨", appName}, Car: &Car{ Name: "概念🚗,✈,🚚️", Price: float64(i) / 100, @@ -120,7 +127,7 @@ func getKey(g2 *g2cache.G2Cache, wg *sync.WaitGroup) { // return //} //fmt.Printf("%s\n",out) - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) } } @@ -139,3 +146,8 @@ type Car struct { //func (o *Object) DeepCopy() interface{} { // return &(*o) //} + +func init() { + rand.Seed(time.Now().UnixNano()) + appName += fmt.Sprintf("%d",rand.Intn(100)) +} \ No newline at end of file diff --git a/g2cache.go b/g2cache.go index f2a09f91f4d8a1decc335e6dead6242ff8b559eb..eaff93a4e36306f918ee9e814e4b78c53553af1f 100644 --- a/g2cache.go +++ b/g2cache.go @@ -25,9 +25,8 @@ var ( DefaultGPoolJobQueueChanLen = 1000 ) -var ( - HitStatisticsOut HitStatistics -) +var HitStatisticsOut HitStatistics +var json = jsoniter.ConfigCompatibleWithStandardLibrary type G2Cache struct { GID string // Identifies the number of an instance @@ -252,7 +251,7 @@ func (g *G2Cache) syncOutCache(key string, ttlSecond int, fn LoadDataSourceFunc) if ok && OutCachePubSub { _err = pubsub.Publish(g.GID, key, SetPublishType, e) if _err != nil { - eS, _ := jsoniter.MarshalToString(e) + eS, _ := json.MarshalToString(e) LogErrF("syncOutCache key=%s,val=%s,err=%v\n", key, eS, err) } } @@ -295,11 +294,10 @@ func (g *G2Cache) set(key string, obj interface{}, ttlSecond int, wait bool) (er if wait { return g.setInternal(key, v) } - g.gPool.SendJob(func() { _err := g.setInternal(key, v) if _err != nil { - objS, _ := jsoniter.MarshalToString(v) + objS, _ := json.MarshalToString(v) LogErrF("setInternal key: %s,obj: %s ,err: %v", key, objS, err) } }) @@ -375,17 +373,9 @@ func (g *G2Cache) subscribe() error { return g.subscribeInternal() } -func (g *G2Cache) subscribeInternal() error { - pubsub, ok := g.out.(PubSub) - if !ok { - return CacheNotImplementPubSub - } - err := pubsub.Subscribe(g.channel) - if err != nil { - return err - } - - for meta := range g.channel { +func (g *G2Cache) subscribeHandle() error { + for ele := range g.channel { + meta := *ele // for range pointer please to do select { case <-g.stop: return OutStorageClose @@ -394,32 +384,65 @@ func (g *G2Cache) subscribeInternal() error { if meta.Gid == g.GID { continue } - key := meta.Key + if meta.Key == "" { + if CacheDebug { + LogDebugF("subscribeHandle receive meta.Key is null: %+v\n", meta) + } + continue + } + if CacheDebug { + metaDump, _ := json.MarshalToString(meta) + LogDebugF("subscribeHandle receive meta: %v\n", metaDump) + } + switch meta.Action { case DelPublishType: g.gPool.SendJob(func() { - if err = g.out.Del(key); err != nil { - LogErrF("out del key=%s, err=%v\n", key, err) + if err := g.out.Del(meta.Key); err != nil { + LogErrF("out del key=%s, err=%v\n", meta.Key, err) } - if err := g.local.Del(key); err != nil { - LogErrF("local del key=%s, err=%v\n", key, err) + if err := g.local.Del(meta.Key); err != nil { + LogErrF("local del key=%s, err=%v\n", meta.Key, err) } }) case SetPublishType: + if meta.Data.Value == nil { + if CacheDebug { + LogDebugF("subscribeHandle receive meta.Data is nil: %+v\n", meta) + } + continue + } g.gPool.SendJob(func() { - if err = g.local.Set(meta.Key, meta.Data); err != nil { - dataS, _ := jsoniter.MarshalToString(meta.Data) - LogErrF("local set key=%s,val=%s, err=%v\n", key, dataS, err) + if err := g.local.Set(meta.Key, meta.Data); err != nil { + dataS, _ := json.MarshalToString(meta.Data) + LogErrF("local set key=%s,val=%s, err=%v\n", meta.Key, dataS, err) } - if err = g.out.Set(key, meta.Data); err != nil { - dataS, _ := jsoniter.MarshalToString(meta.Data) - LogErrF("out set key=%s,val=%s, err=%v\n", key, dataS, err) + if err := g.out.Set(meta.Key, meta.Data); err != nil { + dataS, _ := json.MarshalToString(meta.Data) + LogErrF("out set key=%s,val=%s, err=%v\n", meta.Key, dataS, err) } }) default: continue } } + return nil +} + +func (g *G2Cache) subscribeInternal() error { + pubsub, ok := g.out.(PubSub) + if !ok { + return CacheNotImplementPubSub + } + + g.gPool.SendJob(wrapFuncErr(func() error { + return g.subscribeHandle() + })) + + err := pubsub.Subscribe(g.channel) + if err != nil { + return err + } return nil } diff --git a/g2cache_helper.go b/g2cache_helper.go index 368b43b0d09615766eb7b33eb72d2d4a4c270c88..31d27e96d4e8ac44523b650af22baa5cb7703a4a 100644 --- a/g2cache_helper.go +++ b/g2cache_helper.go @@ -2,12 +2,10 @@ package g2cache import ( "bytes" - "encoding/hex" "errors" "fmt" - jsoniter "github.com/json-iterator/go" + "github.com/google/uuid" "github.com/mohae/deepcopy" - "math/rand" "reflect" "strconv" "sync/atomic" @@ -96,7 +94,7 @@ type HitStatistics struct { } func (h *HitStatistics) String() string { - v, _ := jsoniter.MarshalToString(h) + v, _ := json.MarshalToString(h) return v } @@ -118,29 +116,6 @@ func (h *HitStatistics) StatisticsLocalStorage() { h.HitLocalStorageTotalRate = float64(atomic.LoadInt64(&h.HitLocalStorageTotal)) / float64(atomic.LoadInt64(&h.AccessGetTotal)) } -func encodeUUID(u []byte) string { - buf := make([]byte, 36) - hex.Encode(buf[0:8], u[0:4]) - buf[8] = '-' - hex.Encode(buf[9:13], u[4:6]) - buf[13] = '-' - hex.Encode(buf[14:18], u[6:8]) - buf[18] = '-' - hex.Encode(buf[19:23], u[8:10]) - buf[23] = '-' - hex.Encode(buf[24:], u[10:]) - return string(buf) -} - -// NewUUID create v4 uuid -// More powerful UUID libraries can be used: https://github.com/google/uuid func NewUUID() (string, error) { - b := make([]byte, 16) - _, err := rand.Read(b) - if err != nil { - return "", err - } - b[6] = (b[6] & 0x0f) | 0x40 - b[8] = (b[8] & 0x3f) | 0x80 - return encodeUUID(b), nil + return uuid.New().String(), nil } diff --git a/grpool.go b/grpool.go index 3a3e2e5378a46ea09bb40160851efd2ee429c1a7..b5b5b99dc2f0677f0658a8215798049002a93188 100644 --- a/grpool.go +++ b/grpool.go @@ -2,8 +2,8 @@ package g2cache // thank https://github.com/ivpusic/grpool import ( + "runtime" "sync" - "sync/atomic" "time" ) @@ -27,8 +27,8 @@ func (w *worker) start() { if CacheDebug { LogDebugF("Pool [%d] worker <-stop\n", w.id) } - if len(w.pool.JobQueue) != 0 { - for job := range w.pool.JobQueue { + if len(w.pool.jobQueue) != 0 { + for job := range w.pool.jobQueue { runJob(w.id, job) } } @@ -36,7 +36,7 @@ func (w *worker) start() { LogDebugF("Pool [%d] worker exit\n", w.id) } return - case job, ok := <-w.pool.JobQueue: + case job, ok := <-w.pool.jobQueue: if ok { runJob(w.id, job) } @@ -49,13 +49,22 @@ func runJob(id int64, f func()) { defer func() { if err := recover(); err != nil { if CacheDebug { - LogErrF("Pool [%d] Job panic err: %v\n", id, err) + LogErrF("Pool [%d] Job panic err: %v, stack: %v\n", id, err,string(outputStackErr())) } } }() f() } +func outputStackErr() []byte { + var ( + buf [4096]byte + ) + n := runtime.Stack(buf[:], false) + return buf[:n] +} + + func newWorker(id int64, pool *Pool) *worker { w := &worker{ id: id, @@ -69,8 +78,7 @@ func newWorker(id int64, pool *Pool) *worker { type Job func() type Pool struct { - jobTotal int64 - JobQueue chan Job + jobQueue chan Job workers []*worker stopOne sync.Once stopped chan struct{} @@ -85,7 +93,7 @@ type Pool struct { func NewPool(numWorkers int, jobQueueLen int) *Pool { pool := &Pool{ - JobQueue: make(chan Job, jobQueueLen), + jobQueue: make(chan Job, jobQueueLen), workers: make([]*worker, numWorkers), stopped: make(chan struct{}), } @@ -104,12 +112,7 @@ func NewPool(numWorkers int, jobQueueLen int) *Pool { } func (p *Pool) wrapJob(job func()) func() { - return func() { - defer func() { - atomic.AddInt64(&p.jobTotal, -1) - }() - job() - } + return job } func (p *Pool) SendJobWithTimeout(job func(), t time.Duration) bool { @@ -118,8 +121,7 @@ func (p *Pool) SendJobWithTimeout(job func(), t time.Duration) bool { return false case <-time.After(t): return false - case p.JobQueue <- p.wrapJob(job): - atomic.AddInt64(&p.jobTotal, 1) + case p.jobQueue <- p.wrapJob(job): return true } } @@ -134,16 +136,14 @@ func (p *Pool) SendJobWithDeadline(job func(), t time.Time) bool { return false case <-time.After(s): return false - case p.JobQueue <- p.wrapJob(job): - atomic.AddInt64(&p.jobTotal, 1) + case p.jobQueue <- p.wrapJob(job): return true } } func (p *Pool) SendJob(job func()) { select { - case p.JobQueue <- p.wrapJob(job): - atomic.AddInt64(&p.jobTotal, 1) + case p.jobQueue <- p.wrapJob(job): case <-p.stopped: return } @@ -157,7 +157,7 @@ func (p *Pool) monitor() { t.Stop() return case <-t.C: - LogDebug("Pool jobTotal current num ", atomic.LoadInt64(&p.jobTotal)) + LogDebug("Pool jobQueue current len ", len(p.jobQueue)) } } } @@ -187,7 +187,7 @@ func (p *Pool) release() { }) }) <-force - close(p.JobQueue) + close(p.jobQueue) } // Will release resources used by pool diff --git a/interface.go b/interface.go index a63d668f7c5e1f58004490a45840ddfee0eed8e9..fd9996b03107fa356d649c696aa365cd9067ab24 100644 --- a/interface.go +++ b/interface.go @@ -20,7 +20,7 @@ type OutCache interface { // only out storage pub sub type PubSub interface { - Subscribe(data chan *ChannelMeta) error + Subscribe(data chan<- *ChannelMeta) error Publish(gid, key string, action int8, data *Entry) error } diff --git a/local_freecache.go b/local_freecache.go index 472299c28f41dc8e0c927c8cf353aef033c42601..a4184c1f6fa689a1c89b4e922a999994d8a27036 100644 --- a/local_freecache.go +++ b/local_freecache.go @@ -2,7 +2,6 @@ package g2cache import ( "github.com/coocood/freecache" - jsoniter "github.com/json-iterator/go" "sync" ) @@ -30,7 +29,7 @@ func (c *FreeCache) Set(key string, e *Entry) error { return LocalStorageClose default: } - s, _ := jsoniter.Marshal(e) + s, _ := json.Marshal(e) // local storage should set Obsolete time obsolete := e.GetObsoleteTTL() return c.storage.Set([]byte(key), s, int(obsolete)) @@ -61,7 +60,7 @@ func (c *FreeCache) Get(key string, obj interface{}) (*Entry, bool, error) { } e := new(Entry) e.Value = obj // Save the reflection structure of obj - err = jsoniter.Unmarshal(b, e) + err = json.Unmarshal(b, e) if err != nil { return nil, false, err } diff --git a/out_redis.go b/out_redis.go index 2dd09a311a0d6e7c3e08cf929b7dcbbdbf90e51e..b629f0a8c92e79de6a89f36b93c4d755c6840094 100644 --- a/out_redis.go +++ b/out_redis.go @@ -1,23 +1,26 @@ package g2cache import ( + "fmt" "github.com/gomodule/redigo/redis" - jsoniter "github.com/json-iterator/go" "sync" ) var DefaultPubSubRedisChannel = "g2cache-pubsub-channel" var DefaultRedisConf RedisConf +var DefaultPubSubRedisConf RedisConf func init() { DefaultRedisConf.DSN = "127.0.0.1:6379" DefaultRedisConf.MaxConn = 10 + DefaultPubSubRedisConf = DefaultRedisConf } type RedisCache struct { - pool *redis.Pool - stop chan struct{} - stopOnce sync.Once + pool *redis.Pool + pubsubPool *redis.Pool + stop chan struct{} + stopOnce sync.Once } type RedisConf struct { @@ -27,14 +30,24 @@ type RedisConf struct { MaxConn int } -func NewRedisCache() (*RedisCache,error) { - pool,err := GetRedisPool(&DefaultRedisConf) +func NewRedisCache() (*RedisCache, error) { + pool, err := GetRedisPool(&DefaultRedisConf) if err != nil { - return nil,err + return nil, fmt.Errorf("redis pool init err %v", err) } + + var pubsubPool *redis.Pool + if OutCachePubSub { + pubsubPool, err = GetRedisPool(&DefaultPubSubRedisConf) + if err != nil { + return nil, fmt.Errorf("redis pubsubPool init err %v", err) + } + } + c := &RedisCache{ - pool: pool, - stop: make(chan struct{}, 1), + pool: pool, + pubsubPool: pubsubPool, + stop: make(chan struct{}, 1), } return c, nil } @@ -63,7 +76,7 @@ func (r *RedisCache) Set(key string, obj *Entry) error { return OutStorageClose default: } - str, err := jsoniter.MarshalToString(obj) + str, err := json.MarshalToString(obj) if err != nil { return err } @@ -76,20 +89,23 @@ func (r *RedisCache) DistributedEnable() bool { return true } -func (r *RedisCache) Subscribe(ch chan *ChannelMeta) error { +func (r *RedisCache) Subscribe(ch chan<- *ChannelMeta) error { select { case <-r.stop: return OutStorageClose default: } - conn := r.pool.Get() + conn := r.pubsubPool.Get() defer conn.Close() psc := redis.PubSubConn{Conn: conn} if err := psc.Subscribe(DefaultPubSubRedisChannel); err != nil { - LogErrF("rds subscribe key=%v, err=%v\n", DefaultPubSubRedisChannel, err) + LogErrF("rds subscribe channel=%v, err=%v\n", DefaultPubSubRedisChannel, err) return err } + if CacheDebug { + LogDebugF("rds subscribe channel=%v start ...\n", DefaultPubSubRedisChannel) + } LOOP: for { @@ -101,14 +117,19 @@ LOOP: switch v := psc.Receive().(type) { case redis.Message: meta := &ChannelMeta{} - err := jsoniter.Unmarshal(v.Data, meta) + err := json.Unmarshal(v.Data, meta) if err != nil || meta.Key == "" { - LogErrF("rds Subscribe Unmarshal data: %+v,err:%v",v.Data,err) + LogErrF("rds subscribe Unmarshal data: %+v,err:%v\n", v.Data, err) continue } + select { + case <-r.stop: + return OutStorageClose + default: + } ch <- meta case error: - LogErrF("rds receive error, msg=%v\n", v) + LogErrF("rds subscribe receive error, msg=%v\n", v) break LOOP } } @@ -122,7 +143,7 @@ func (r *RedisCache) Get(key string, obj interface{}) (*Entry, bool, error) { default: } str, err := RedisGetString(key, r.pool) - if err != nil { + if err != nil { if err == redis.ErrNil { return nil, false, nil } @@ -133,7 +154,7 @@ func (r *RedisCache) Get(key string, obj interface{}) (*Entry, bool, error) { } var e Entry e.Value = obj // Save the reflection structure of obj - err = jsoniter.UnmarshalFromString(str, &e) + err = json.UnmarshalFromString(str, &e) if err != nil { return nil, false, err } @@ -153,11 +174,11 @@ func (r *RedisCache) Publish(gid, key string, action int8, value *Entry) error { Action: action, Data: value, } - s, err := jsoniter.MarshalToString(meta) + s, err := json.MarshalToString(meta) if err != nil { return err } - return RedisPublish(DefaultPubSubRedisChannel, s, r.pool) + return RedisPublish(DefaultPubSubRedisChannel, s, r.pubsubPool) } func (r *RedisCache) ThreadSafe() {} diff --git a/redis_helper_test.go b/redis_helper_test.go index b076fbb21dbd9c7215a51b1361826cd7568aa736..06e298b1da0f6e6384dfe69e0d578311dcb136bb 100644 --- a/redis_helper_test.go +++ b/redis_helper_test.go @@ -9,8 +9,14 @@ func TestGetRedisPool(t *testing.T) { DefaultRedisConf.DSN = "127.0.0.1:6379" DefaultRedisConf.DB = 0 DefaultRedisConf.Pwd = "" - DefaultRedisConf.MaxConn = 2 - pool,err := GetRedisPool(&DefaultRedisConf) + DefaultRedisConf.MaxConn = 3 + pool, err := GetRedisPool(&DefaultRedisConf) + if err != nil { + t.Fatal(err) + return + } + DefaultPubSubRedisConf = DefaultRedisConf + pubsubPool, err := GetRedisPool(&DefaultPubSubRedisConf) if err != nil { t.Fatal(err) return @@ -25,5 +31,11 @@ func TestGetRedisPool(t *testing.T) { t.Fatal(err) return } - t.Log("v=", v) + t.Log("GET surprise=", v) + err = RedisPublish(DefaultPubSubRedisChannel, "set surprise g2cache", pubsubPool) + if err != nil { + t.Fatal(err) + return + } + t.Logf("channel %s publish ok", DefaultPubSubRedisChannel) }