diff --git a/examples/simple/main.go b/examples/simple/main.go index bf717171d2a4431520ca290b80e97c37f4b8b471..a90ee8e97320a414b796cb9193aded77830e7bb7 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -48,13 +48,15 @@ func main() { }() worker = crontab.NewWorker(crontab.EveryMinute, &crontabs.Example{}) - worker.SetRunOnStartup(true) + worker.SetRunOnStartup(true).SetGloballyUnique(true) if err = cron.Add(worker); err == nil { - if err = cron.Add( - crontab.NewWorker(crontab.EveryHour, &my{}). - SetRunOnStartup(true), - ); err == nil { + // err = cron.Add( + // crontab.NewWorker(crontab.EveryHour, &my{}). + // SetRunOnStartup(true), + // ); + + if err == nil { err = cron.Start(ctx) } } diff --git a/go.mod b/go.mod index 6e3226731b0134aa1403c86fcbcc4154f672900c..9ca84a1338ea639bed616aa74c6552ce70ce2600 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( gitee.com/go-libs/process v1.0.1 + gitee.com/go-libs/redis v1.0.0 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 ) @@ -13,13 +14,17 @@ require ( ) require ( + gitee.com/go-libs/config v1.0.1 // indirect github.com/IBM/sarama v1.43.1 // indirect github.com/andybalholm/brotli v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect + github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/go.sum b/go.sum index 544c980d0d3275e92d0ec0aaab5755920561e333..ec636037614ef90cd15eefac92115bfea34c22bc 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,22 @@ +gitee.com/go-libs/config v1.0.1 h1:9q+pfk7g9xX/t87foZ81inNcyysDLCBqhbX78mh76vA= +gitee.com/go-libs/config v1.0.1/go.mod h1:azzGgnpQ4cTP7EXl7uctzfZ0L+y+v7JGYf65qfSwsXs= gitee.com/go-libs/process v1.0.1 h1:vk4TdCxRRgTBxNkZTanoYaxTOOQtWR8Mo8NQ8gw9q1Y= gitee.com/go-libs/process v1.0.1/go.mod h1:oQ5Q2fNJr8PvDRqW6axCIWJAmt6d7k9bYWpF+lFBOgc= +gitee.com/go-libs/redis v1.0.0 h1:lFlXsi4qSBRx77xxRc+XdZ+C+LfjIV+h1PhSYk1Y9WI= +gitee.com/go-libs/redis v1.0.0/go.mod h1:Lk6cLLppHPoFOvfTc7wxhFWW6lMqQUuKaFJzUu/Vx/8= gitee.com/go-wares/log v0.1.10 h1:DGvvH6DOF8YXLcBTqABGGhkrwe+OhmD3sV9W6NHcB4E= gitee.com/go-wares/log v0.1.10/go.mod h1:HDs8rnqgyYDGzrUM+SNJx6/R1MK7JpKG5jqAY1HIW/o= github.com/IBM/sarama v1.43.1 h1:Z5uz65Px7f4DhI/jQqEm/tV9t8aU+JUdTyW/K/fCXpA= github.com/IBM/sarama v1.43.1/go.mod h1:GG5q1RURtDNPz8xxJs3mgX6Ytak8Z9eLhAkJPObe2xE= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= @@ -18,6 +26,9 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -49,6 +60,9 @@ github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLA github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -92,6 +106,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -99,13 +114,16 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/worker.go b/src/worker.go index fc977e5276ed609cc2c8eb4578a86b641f3d1add..54040aef4003cbd637ab3ca4e898d4d6685b1775 100644 --- a/src/worker.go +++ b/src/worker.go @@ -18,6 +18,7 @@ package src import ( "context" "fmt" + "gitee.com/go-libs/redis" "gitee.com/go-wares/log" "github.com/gorhill/cronexpr" "reflect" @@ -151,13 +152,6 @@ func (o *Worker) run(ctx context.Context, tm time.Time) (do bool, err error) { } } - // Unique on cluster checker, Return false if job configured as globally-unique - // and previous scheduling in any node not completed. It's dependent on redis - // lock. - if o.globallyUnique { - // todo: distributed lock with redis. - } - // End access check. do = true return @@ -166,7 +160,10 @@ func (o *Worker) run(ctx context.Context, tm time.Time) (do bool, err error) { // Schedule // job handlers. func (o *Worker) schedule(ctx context.Context, tm time.Time) (err error) { - var span log.Span + var ( + locker redis.Locker + span log.Span + ) // Set // last schedule state when scheduler begin. @@ -196,10 +193,39 @@ func (o *Worker) schedule(ctx context.Context, tm time.Time) (err error) { // Close span. span.End() + // Release distributed lock resource. + if locker != nil { + locker.Release() + } + // Reset statistic. atomic.AddInt32(&o.runProcessing, -1) }() + // Unique on cluster checker, Return false if job configured as globally-unique + // and previous scheduling in any node not completed. It's dependent on redis + // lock. + if o.globallyUnique { + span.Info(`apply distributed lock resource`) + + // Apply + // a distributed lock resource. + if locker, err = redis.Lock(span.Context(), o.class); err != nil { + return + } + + // Locked + // by other goroutine. + if locker == nil { + span.Info(`distributed lock resource applied by other goroutine`) + return + } + + // Enable + // distributed lock resource renew. + locker.Renew() + } + // Before // handler called. if v, ok := o.job.(JobBefore); ok {