From 38e3c6b78e1db069ca8c54065c36bad03710e174 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 10 Jul 2024 13:46:04 +0800 Subject: [PATCH 01/12] init --- doc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc.go b/doc.go index 67271fb..b977c49 100644 --- a/doc.go +++ b/doc.go @@ -9,6 +9,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-10 // Package crontab // provides a ticker manager like linux crontab. -- Gitee From bdaa9e2ce6110d77b0f81b559b020df04362ddbe Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Tue, 16 Jul 2024 15:58:34 +0800 Subject: [PATCH 02/12] init --- go.sum | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 go.sum diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..544c980 --- /dev/null +++ b/go.sum @@ -0,0 +1,111 @@ +gitee.com/go-libs/process v1.0.1 h1:vk4TdCxRRgTBxNkZTanoYaxTOOQtWR8Mo8NQ8gw9q1Y= +gitee.com/go-libs/process v1.0.1/go.mod h1:oQ5Q2fNJr8PvDRqW6axCIWJAmt6d7k9bYWpF+lFBOgc= +gitee.com/go-wares/log v0.1.10 h1:DGvvH6DOF8YXLcBTqABGGhkrwe+OhmD3sV9W6NHcB4E= +gitee.com/go-wares/log v0.1.10/go.mod h1:HDs8rnqgyYDGzrUM+SNJx6/R1MK7JpKG5jqAY1HIW/o= +github.com/IBM/sarama v1.43.1 h1:Z5uz65Px7f4DhI/jQqEm/tV9t8aU+JUdTyW/K/fCXpA= +github.com/IBM/sarama v1.43.1/go.mod h1:GG5q1RURtDNPz8xxJs3mgX6Ytak8Z9eLhAkJPObe2xE= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= +github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY= +github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.52.0 h1:wqBQpxH71XW0e2g+Og4dzQM8pk34aFYlA1Ga8db7gU0= +github.com/valyala/fasthttp v1.52.0/go.mod h1:hf5C4QnVMkNXMspnsUlfM3WitlgYflyhHYoKol/szxQ= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -- Gitee From 998f4a73c2e463d5168f031a8d7a0f945c0e40bf Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Tue, 16 Jul 2024 15:58:37 +0800 Subject: [PATCH 03/12] init --- examples/simple/config/app.yaml | 4 + examples/simple/config/log.yaml | 10 + examples/simple/crontabs/example.go | 45 +++++ examples/simple/main.go | 61 ++++++ go.mod | 37 ++++ init.go | 46 +++++ src/cron.go | 198 +++++++++++++++++++ src/job.go | 60 ++++++ src/timeline.go | 41 ++++ src/worker.go | 289 ++++++++++++++++++++++++++++ 10 files changed, 791 insertions(+) create mode 100644 examples/simple/config/app.yaml create mode 100644 examples/simple/config/log.yaml create mode 100644 examples/simple/crontabs/example.go create mode 100644 examples/simple/main.go create mode 100644 init.go create mode 100644 src/cron.go create mode 100644 src/job.go create mode 100644 src/timeline.go create mode 100644 src/worker.go diff --git a/examples/simple/config/app.yaml b/examples/simple/config/app.yaml new file mode 100644 index 0000000..f6e3c28 --- /dev/null +++ b/examples/simple/config/app.yaml @@ -0,0 +1,4 @@ +name: "example" +version: "1.0.0" +host: "" +port: 8080 diff --git a/examples/simple/config/log.yaml b/examples/simple/config/log.yaml new file mode 100644 index 0000000..d330693 --- /dev/null +++ b/examples/simple/config/log.yaml @@ -0,0 +1,10 @@ +level: debug +time-format: "2006-01-02T15:04:05.999999999Z07:00" +adapter: term +term-adapter: + color: true +tracer: "" +jaeger-tracer: + endpoint: http://localhost:14268/api/traces + username: + password: diff --git a/examples/simple/crontabs/example.go b/examples/simple/crontabs/example.go new file mode 100644 index 0000000..c9aa052 --- /dev/null +++ b/examples/simple/crontabs/example.go @@ -0,0 +1,45 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-15 + +package crontabs + +import ( + "context" + "gitee.com/go-wares/log" +) + +type Example struct{} + +func (o *Example) Do(ctx context.Context) (err error) { + log.Infofc(ctx, `job.Do()`) + return +} + +func (o *Example) OnBefore(ctx context.Context) (err error) { + log.Infofc(ctx, `job.OnBefore()`) + return +} + +func (o *Example) OnFailed(ctx context.Context, err error) { + log.Infofc(ctx, `job.OnFailed() ~ %v`, err) +} + +func (o *Example) OnFinish(ctx context.Context) { + log.Infofc(ctx, `job.OnFinish()`) +} + +func (o *Example) OnSucceed(ctx context.Context) { + log.Infofc(ctx, `job.OnSucceed()`) +} diff --git a/examples/simple/main.go b/examples/simple/main.go new file mode 100644 index 0000000..0452aa9 --- /dev/null +++ b/examples/simple/main.go @@ -0,0 +1,61 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-13 + +package main + +import ( + "context" + "gitee.com/go-libs/crontab" + "gitee.com/go-libs/crontab/examples/simple/crontabs" + "gitee.com/go-wares/log" + "time" +) + +type my struct { +} + +func (o *my) Do(ctx context.Context) (err error) { return nil } + +func main() { + var ( + cron = crontab.New("main") + ctx, cancel = context.WithCancel(context.Background()) + err error + worker *crontab.Worker + ) + + defer log.Close() + + go func() { + time.Sleep(time.Second * 5) + if err != nil { + return + } + + cancel() + }() + + worker = crontab.NewWorker(crontab.TimelineEveryMinute, &crontabs.Example{}) + worker.SetRunOnStartup(true) + + if err = cron.Add(worker); err == nil { + if err = cron.Add( + crontab.NewWorker(crontab.TimelineEveryHour, &my{}). + SetRunOnStartup(true), + ); err == nil { + err = cron.Start(ctx) + } + } +} diff --git a/go.mod b/go.mod index dcafba9..6e32267 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,40 @@ module gitee.com/go-libs/crontab go 1.18 + +require ( + gitee.com/go-libs/process v1.0.1 + github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 +) + +require ( + gitee.com/go-wares/log v0.1.10 + github.com/google/uuid v1.6.0 // indirect +) + +require ( + github.com/IBM/sarama v1.43.1 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.6.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.17.7 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.52.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.22.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/init.go b/init.go new file mode 100644 index 0000000..6807df2 --- /dev/null +++ b/init.go @@ -0,0 +1,46 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-13 + +package crontab + +import "gitee.com/go-libs/crontab/src" + +// Export src types. + +type ( + Cron = src.Cron + Job = src.Job + JobBefore = src.JobBefore + JobFailed = src.JobFailed + JobFinish = src.JobFinish + JobSucceed = src.JobSucceed + Timeline = src.Timeline + Worker = src.Worker +) + +// Export constants. + +const ( + TimelineEverySecond = src.TimelineEverySecond + TimelineEveryMinute = src.TimelineEveryMinute + TimelineEveryHour = src.TimelineEveryHour +) + +// Export src functions. + +var ( + New = src.New + NewWorker = src.NewWorker +) diff --git a/src/cron.go b/src/cron.go new file mode 100644 index 0000000..b333000 --- /dev/null +++ b/src/cron.go @@ -0,0 +1,198 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-15 + +package src + +import ( + "context" + "fmt" + "gitee.com/go-libs/process" + "gitee.com/go-wares/log" + "sync" + "sync/atomic" + "time" +) + +type ( + // Cron + // is a scheduler for job schedules. + Cron interface { + // Add + // a worker list into scheduler. + Add(worker *Worker) (err error) + + // Start + // starts a scheduler as service. + Start(ctx context.Context) (err error) + + // Stop + // stops a scheduler service. + Stop() + } + + cron struct { + executing int32 + mapper map[string]*Worker + mu *sync.RWMutex + name string + process process.Process + } +) + +// New +// creates a new cron instance. +func New(name string) Cron { + o := &cron{mu: &sync.RWMutex{}, mapper: make(map[string]*Worker), name: name} + o.process = process.New(o.name, o) + return o +} + +// Add +// a worker list into scheduler. +func (o *cron) Add(worker *Worker) (err error) { + o.mu.Lock() + defer o.mu.Unlock() + + // Return + // if passed worker is nil. + if worker == nil { + return + } + + // Parse + // worker timed execution time. + if worker.expression, err = worker.timeline.Parse(); err != nil { + return + } + + // Ignore if worker added into mapper already. + if _, ok := o.mapper[worker.class]; ok { + err = fmt.Errorf(`worker added already: %s`, worker.class) + return + } + + // Bind + // relation fields then update mapper. + worker.cron = o + o.mapper[worker.class] = worker + return +} + +// Start +// starts a scheduler as service. +func (o *cron) Start(ctx context.Context) (err error) { return o.process.Start(ctx) } + +// Stop +// stops a scheduler service. +func (o *cron) Stop() { o.process.Stop() } + +// +---------------------------------------------------------------------------+ +// | Implemented on process | +// +---------------------------------------------------------------------------+ + +// After +// is a handler for process. +func (o *cron) After(_ context.Context) (err error) { + for { + if atomic.LoadInt32(&o.executing) == 0 { + break + } + time.Sleep(time.Millisecond * 10) + } + + if log.Config().DebugOn() { + log.Debugf(`[crontab=%s] cleanup crontab component`, o.name) + } + return +} + +// Run +// is a handler for process, It's called by process. Do not call it directly. +func (o *cron) Run(ctx context.Context) (err error) { + ticker := time.NewTicker(time.Second) + + // Iterate channel messages. + for { + select { + case tm := <-ticker.C: + // Call scheduler every second. + go o.schedule(ctx, tm) + + case <-ctx.Done(): + // Context cancelled. + ticker.Stop() + return + } + } +} + +func (o *cron) decrement() { + atomic.AddInt32(&o.executing, -1) +} + +func (o *cron) increment() { + atomic.AddInt32(&o.executing, 1) +} + +// Schedule +// schedules the crontab workers. +func (o *cron) schedule(ctx context.Context, tm time.Time) { + var list = make([]*Worker, 0) + + if log.Config().DebugOn() { + log.Debugf(`[crontab=%s] schedule at %s`, o.name, tm) + } + + // Iterate added workers. + for _, worker := range func() map[string]*Worker { + o.mu.RLock() + defer o.mu.RUnlock() + return o.mapper + }() { + if worker.Match(tm) { + list = append(list, worker) + } + } + + // Do nothing + // if no workers matched. + if len(list) == 0 { + return + } + + // Prepare + // to schedule matched workers. + wait := new(sync.WaitGroup) + span := log.NewSpanWithContext(ctx, `crontab`) + span.SetAttr(`crontab.scheduler.name`, o.name) + span.SetAttr(`crontab.scheduler.time`, tm.Format(time.RFC3339)) + + // Cleanup + // when scheduler done. + defer func() { + wait.Wait() + span.End() + }() + + // Iterate + // matched workers. + for _, x := range list { + wait.Add(1) + go func(worker *Worker) { + defer wait.Done() + worker.Do(span.Context(), tm) + }(x) + } +} diff --git a/src/job.go b/src/job.go new file mode 100644 index 0000000..e2ddb53 --- /dev/null +++ b/src/job.go @@ -0,0 +1,60 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-15 + +package src + +import ( + "context" +) + +type ( + // Job + // is an interface used to define a handler for a job. + // + // Step: + // - Before + // - Do + // - Failed/Succeed + // - Finish + Job interface { + // Do + // job handler when system time matches policy time. + Do(ctx context.Context) (err error) + } + + // JobBefore + // is an interface used to define a before handler for AOP. + JobBefore interface { + OnBefore(ctx context.Context) (err error) + } + + // JobFailed + // is an interface used to define a failed handler for AOP. + JobFailed interface { + OnFailed(ctx context.Context, err error) + } + + // JobFinish + // is an interface used to define a finish handler for AOP. + JobFinish interface { + OnFinish(ctx context.Context) + } + + // JobSucceed + // is an interface used to define a succeed handler for AOP. + JobSucceed interface { + OnSucceed(ctx context.Context) + } +) diff --git a/src/timeline.go b/src/timeline.go new file mode 100644 index 0000000..84276a0 --- /dev/null +++ b/src/timeline.go @@ -0,0 +1,41 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-15 + +package src + +import "github.com/gorhill/cronexpr" + +// Timeline +// is a type name used for timed execution time. +type Timeline string + +const ( + TimelineEverySecond Timeline = "*/1 * * * * * *" + + TimelineEveryMinute Timeline = "0 */1 * * * * *" + TimelineEveryHour Timeline = "0 0 */1 * * * *" +) + +// Parse +// parses the timed execution time. +func (o Timeline) Parse() (*cronexpr.Expression, error) { + return cronexpr.Parse(o.String()) +} + +// String +// returns a formatted string for timed execution time. +func (o Timeline) String() string { + return string(o) +} diff --git a/src/worker.go b/src/worker.go new file mode 100644 index 0000000..bf72733 --- /dev/null +++ b/src/worker.go @@ -0,0 +1,289 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-15 + +package src + +import ( + "context" + "fmt" + "gitee.com/go-wares/log" + "github.com/gorhill/cronexpr" + "reflect" + "sync/atomic" + "time" +) + +const ( + // DefaultNodeUnique + // when enabled, only one job is allowed to be in execution per node. + DefaultNodeUnique = true + + // DefaultGloballyUnique + // when enabled, only one job is allowed to be in execution per cluster. It's work dependent + // on redis. + DefaultGloballyUnique = false + + // DefaultRunOnStartup + // when enabled, crontab is immediately scheduled upon startup. + DefaultRunOnStartup = false +) + +// Worker +// is a component of crontab. +type Worker struct { + class string + cron *cron + expression *cronexpr.Expression + job Job + timeline Timeline + + runCount int32 + runTime time.Time + runTotal uint64 + + nodeUnique, globallyUnique bool + runOnStartup bool +} + +// NewWorker +// creates a worker for crontab. +func NewWorker(timeline Timeline, job Job) *Worker { + ref := reflect.TypeOf(job) + worker := &Worker{timeline: timeline, job: job} + + if ref.Kind() == reflect.Pointer { + elem := ref.Elem() + worker.class = fmt.Sprintf(`%s.%s`, elem.PkgPath(), elem.Name()) + } else { + worker.class = fmt.Sprintf(`%s.%s`, ref.PkgPath(), ref.Name()) + } + + worker.nodeUnique = DefaultNodeUnique + worker.globallyUnique = DefaultGloballyUnique + worker.runOnStartup = DefaultRunOnStartup + return worker +} + +// Do +// crontab worker. +func (o *Worker) Do(ctx context.Context, tm time.Time) { + // Set last scheduled time. + o.runTime = tm + o.cron.increment() + + // Statistics increment. + atomic.AddUint64(&o.runTotal, 1) + atomic.AddInt32(&o.runCount, 1) + + // Create span based on parent. + span := log.NewSpanWithContext(ctx, o.class) + span.SetAttr(`crontab.scheduled.class`, o.class) + span.SetAttr(`crontab.scheduled.count`, o.runTotal) + span.SetAttr(`crontab.scheduled.time`, tm) + span.SetAttr(`crontab.scheduled.time.next`, o.expression.Next(tm)) + span.SetAttr(`crontab.scheduled.timeline`, o.timeline) + + // Called when done. + defer func() { + atomic.AddInt32(&o.runCount, -1) + span.End() + o.cron.decrement() + }() + + // Before handler. + if v, ok := o.job.(JobBefore); ok { + if o.doBefore(span.Context(), v.OnBefore) != nil { + return + } + } + + // Finish handler. + defer func() { + if v, ok := o.job.(JobFinish); ok { + o.doFinish(span.Context(), v.OnFinish) + } + }() + + // Job handler. + if err := o.do(span.Context(), o.job.Do); err != nil { + // Failed handler. + if v, ok := o.job.(JobFailed); ok { + o.doFailed(span.Context(), err, v.OnFailed) + } + } else { + // Succeed handler. + if v, ok := o.job.(JobSucceed); ok { + o.doSucceed(span.Context(), v.OnSucceed) + } + } +} + +func (o *Worker) NodeUnique() bool { + return o.nodeUnique +} + +func (o *Worker) GloballyUnique() bool { + return o.globallyUnique +} + +func (o *Worker) RunOnStartup() bool { + return o.runOnStartup +} + +// Match +// matches the system time with worker timeline, Return true if match succeed. +func (o *Worker) Match(tm time.Time) bool { + // Return true + // if worker scheduled for first time and run on startup is enabled. + if o.runOnStartup && atomic.LoadUint64(&o.runTotal) == 0 { + return true + } + + // Return false + // if system time not matched on timed execution time. + if o.expression.Next(tm.Add(-1*time.Second)).Unix() != tm.Unix() { + return false + } + + // Return false + // if worker scheduling by other goroutine in the same node and node unique + // enabled. + if o.nodeUnique && atomic.LoadInt32(&o.runCount) > 0 { + return false + } + + // Return false + // if worker scheduling by other goroutine in the same node order other node + // goroutine. + if o.globallyUnique { + } + + // Allow schedule. + return true +} + +func (o *Worker) SetNodeUnique(v bool) *Worker { + o.nodeUnique = v + return o +} + +func (o *Worker) SetGloballyUnique(v bool) *Worker { + o.globallyUnique = v + return o +} + +func (o *Worker) SetRunOnStartup(v bool) *Worker { + o.runOnStartup = v + return o +} + +// +---------------------------------------------------------------------------+ +// | Access methods | +// +---------------------------------------------------------------------------+ + +func (o *Worker) do(ctx context.Context, handler func(context.Context) error) (err error) { + // Span created. + span := log.NewSpanWithContext(ctx, `on:do`) + + // Call when handler done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + err = fmt.Errorf(`%v`, r) + span.Fatal(`fatal occurred on handler: %v`, err) + } + // Close span. + span.End() + }() + + // Call job handler. + err = handler(span.Context()) + return +} + +func (o *Worker) doBefore(ctx context.Context, handler func(ctx context.Context) error) (err error) { + // Span created. + span := log.NewSpanWithContext(ctx, `on:before`) + + // Call when handler done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + err = fmt.Errorf(`%v`, r) + span.Fatal(`fatal occurred on before handler: %v`, err) + } + // Close span. + span.End() + }() + + // Call job handler. + err = handler(span.Context()) + return +} + +func (o *Worker) doFailed(ctx context.Context, err error, handler func(context.Context, error)) { + // Span created. + span := log.NewSpanWithContext(ctx, `on:failed`) + + // Call when handler done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + err = fmt.Errorf(`%v`, r) + span.Fatal(`fatal occurred on failed handler: %v`, err) + } + // Close span. + span.End() + }() + + // Call job handler. + handler(span.Context(), err) +} + +func (o *Worker) doFinish(ctx context.Context, handler func(context.Context)) { + // Span created. + span := log.NewSpanWithContext(ctx, `on:finish`) + + // Call when handler done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal occurred on finish handler: %v`, r) + } + // Close span. + span.End() + }() + + // Call job handler. + handler(span.Context()) +} + +func (o *Worker) doSucceed(ctx context.Context, handler func(context.Context)) { + // Span created. + span := log.NewSpanWithContext(ctx, `on:succeed`) + + // Call when handler done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal occurred on succeed handler: %v`, r) + } + // Close span. + span.End() + }() + + // Call job handler. + handler(span.Context()) +} -- Gitee From 47913a4510d04777d0a0b076b0f1065f3914aa55 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Tue, 16 Jul 2024 15:59:14 +0800 Subject: [PATCH 04/12] init --- examples/simple/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple/main.go b/examples/simple/main.go index 0452aa9..b7b31c4 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -26,7 +26,7 @@ import ( type my struct { } -func (o *my) Do(ctx context.Context) (err error) { return nil } +func (o *my) Do(_ context.Context) (err error) { return nil } func main() { var ( -- Gitee From a551a586e446538bdfb28ed5536c8d1a5afd4263 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Tue, 16 Jul 2024 16:08:40 +0800 Subject: [PATCH 05/12] init --- README.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/README.md b/README.md index 62b13b8..9beb7a2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,42 @@ # Crontab A processor run in goroutine like linux crontab. + +## Timed execution crontab + +| Field name | Mandatory? | Allowed values | Allowed special characters | +|--------------|------------|-----------------|----------------------------| +| Seconds | No | 0-59 | * / , - | +| Minutes | Yes | 0-59 | * / , - | +| Hours | Yes | 0-23 | * / , - | +| Day of month | Yes | 1-31 | * / , - L W | +| Month | Yes | 1-12 or JAN-DEC | * / , - | +| Day of week | Yes | 0-6 or SUN-SAT | * / , - L # | +| Year | No | 1970–2099 | * / , - | + +#### Asterisk ( * ) +The asterisk indicates that the cron expression matches for all values of the field. E.g., using an asterisk in the 4th field (month) indicates every month. + +#### Slash ( / ) +Slashes describe increments of ranges. For example `3-59/15` in the minute field indicate the third minute of the hour and every 15 minutes thereafter. The form `*/...` is equivalent to the form "first-last/...", that is, an increment over the largest possible range of the field. + +#### Comma ( , ) +Commas are used to separate items of a list. For example, using `MON,WED,FRI` in the 5th field (day of week) means Mondays, Wednesdays and Fridays. + +#### Hyphen ( - ) +Hyphens define ranges. For example, 2000-2010 indicates every year between 2000 and 2010 AD, inclusive. + +#### L +`L` stands for "last". When used in the day-of-week field, it allows you to specify constructs such as "the last Friday" (`5L`) of a given month. In the day-of-month field, it specifies the last day of the month. + +#### W +The `W` character is allowed for the day-of-month field. This character is used to specify the business day (Monday-Friday) nearest the given day. As an example, if you were to specify `15W` as the value for the day-of-month field, the meaning is: "the nearest business day to the 15th of the month." + +So, if the 15th is a Saturday, the trigger fires on Friday the 14th. If the 15th is a Sunday, the trigger fires on Monday the 16th. If the 15th is a Tuesday, then it fires on Tuesday the 15th. However if you specify `1W` as the value for day-of-month, and the 1st is a Saturday, the trigger fires on Monday the 3rd, as it does not 'jump' over the boundary of a month's days. + +The `W` character can be specified only when the day-of-month is a single day, not a range or list of days. + +The `W` character can also be combined with `L`, i.e. `LW` to mean "the last business day of the month." + +#### Hash ( # ) +`#` is allowed for the day-of-week field, and must be followed by a number between one and five. It allows you to specify constructs such as "the second Friday" of a given month. -- Gitee From 97de57759de26af0cf77323985f6ff4872e97d7f Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 13:54:00 +0800 Subject: [PATCH 06/12] init --- src/cron.go | 188 ++--------------------------- src/crontab.go | 219 ++++++++++++++++++++++++++++++++++ src/crontab_test.go | 60 ++++++++++ src/errors.go | 34 ++++++ src/job.go | 58 ++++++--- src/strategy.go | 48 ++++++++ src/worker.go | 284 ++++++++------------------------------------ src/worker_test.go | 38 ++++++ 8 files changed, 498 insertions(+), 431 deletions(-) create mode 100644 src/crontab.go create mode 100644 src/crontab_test.go create mode 100644 src/errors.go create mode 100644 src/strategy.go create mode 100644 src/worker_test.go diff --git a/src/cron.go b/src/cron.go index b333000..7242dc0 100644 --- a/src/cron.go +++ b/src/cron.go @@ -11,188 +11,14 @@ // limitations under the License. // // Author: wsfuyibing <682805@qq.com> -// Date: 2024-07-15 +// Date: 2024-07-16 package src -import ( - "context" - "fmt" - "gitee.com/go-libs/process" - "gitee.com/go-wares/log" - "sync" - "sync/atomic" - "time" -) - -type ( - // Cron - // is a scheduler for job schedules. - Cron interface { - // Add - // a worker list into scheduler. - Add(worker *Worker) (err error) - - // Start - // starts a scheduler as service. - Start(ctx context.Context) (err error) - - // Stop - // stops a scheduler service. - Stop() - } - - cron struct { - executing int32 - mapper map[string]*Worker - mu *sync.RWMutex - name string - process process.Process - } -) - -// New -// creates a new cron instance. -func New(name string) Cron { - o := &cron{mu: &sync.RWMutex{}, mapper: make(map[string]*Worker), name: name} - o.process = process.New(o.name, o) - return o -} - -// Add -// a worker list into scheduler. -func (o *cron) Add(worker *Worker) (err error) { - o.mu.Lock() - defer o.mu.Unlock() - - // Return - // if passed worker is nil. - if worker == nil { - return - } - - // Parse - // worker timed execution time. - if worker.expression, err = worker.timeline.Parse(); err != nil { - return - } - - // Ignore if worker added into mapper already. - if _, ok := o.mapper[worker.class]; ok { - err = fmt.Errorf(`worker added already: %s`, worker.class) - return - } - - // Bind - // relation fields then update mapper. - worker.cron = o - o.mapper[worker.class] = worker - return -} - -// Start -// starts a scheduler as service. -func (o *cron) Start(ctx context.Context) (err error) { return o.process.Start(ctx) } - -// Stop -// stops a scheduler service. -func (o *cron) Stop() { o.process.Stop() } - -// +---------------------------------------------------------------------------+ -// | Implemented on process | -// +---------------------------------------------------------------------------+ - -// After -// is a handler for process. -func (o *cron) After(_ context.Context) (err error) { - for { - if atomic.LoadInt32(&o.executing) == 0 { - break - } - time.Sleep(time.Millisecond * 10) - } - - if log.Config().DebugOn() { - log.Debugf(`[crontab=%s] cleanup crontab component`, o.name) - } - return -} - -// Run -// is a handler for process, It's called by process. Do not call it directly. -func (o *cron) Run(ctx context.Context) (err error) { - ticker := time.NewTicker(time.Second) - - // Iterate channel messages. - for { - select { - case tm := <-ticker.C: - // Call scheduler every second. - go o.schedule(ctx, tm) - - case <-ctx.Done(): - // Context cancelled. - ticker.Stop() - return - } - } -} - -func (o *cron) decrement() { - atomic.AddInt32(&o.executing, -1) -} - -func (o *cron) increment() { - atomic.AddInt32(&o.executing, 1) -} - -// Schedule -// schedules the crontab workers. -func (o *cron) schedule(ctx context.Context, tm time.Time) { - var list = make([]*Worker, 0) - - if log.Config().DebugOn() { - log.Debugf(`[crontab=%s] schedule at %s`, o.name, tm) - } - - // Iterate added workers. - for _, worker := range func() map[string]*Worker { - o.mu.RLock() - defer o.mu.RUnlock() - return o.mapper - }() { - if worker.Match(tm) { - list = append(list, worker) - } - } - - // Do nothing - // if no workers matched. - if len(list) == 0 { - return - } - - // Prepare - // to schedule matched workers. - wait := new(sync.WaitGroup) - span := log.NewSpanWithContext(ctx, `crontab`) - span.SetAttr(`crontab.scheduler.name`, o.name) - span.SetAttr(`crontab.scheduler.time`, tm.Format(time.RFC3339)) - - // Cleanup - // when scheduler done. - defer func() { - wait.Wait() - span.End() - }() - - // Iterate - // matched workers. - for _, x := range list { - wait.Add(1) - go func(worker *Worker) { - defer wait.Done() - worker.Do(span.Context(), tm) - }(x) - } +// Cron +// is an interface of Crontab. +type Cron interface { + // Add + // a worker into crontab. + Add(worker *Worker) error } diff --git a/src/crontab.go b/src/crontab.go new file mode 100644 index 0000000..f6a11eb --- /dev/null +++ b/src/crontab.go @@ -0,0 +1,219 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-16 + +package src + +import ( + "context" + "gitee.com/go-libs/process" + "gitee.com/go-wares/log" + "sort" + "sync" + "time" +) + +// Crontab +// is a scheduler for crontab. +type Crontab struct { + mu *sync.RWMutex + process process.Process + name string + workers map[string]*Worker +} + +// NewCrontab +// creates a Crontab instance. +func NewCrontab(name string) *Crontab { + o := &Crontab{mu: new(sync.RWMutex), name: name} + o.process = process.New(o.name, o) + o.workers = make(map[string]*Worker) + return o +} + +// Add +// a worker into crontab. +func (o *Crontab) Add(worker *Worker) error { + o.mu.Lock() + defer o.mu.Unlock() + + // Return error + // if passed worker is nil. + if worker == nil { + return Error{ + Type: "crontab", + Message: "worker can not be nil", + } + } + + // Return error + // if job is nil or not allowed. + if worker.job == nil { + return Error{ + Type: "worker", + Message: "job for worker not allowed", + } + } + + // Return error + // if timed execution time not valid. + var err error + if worker.expression, err = worker.strategy.Parse(); err != nil { + return Error{ + Type: "strategy", + Message: "invalid strategy", + Value: worker.strategy.String(), + } + } + + // Return error + // if worker added already. + if _, ok := o.workers[worker.class]; ok { + return Error{ + Type: "crontab", + Message: "worker added already", + Value: worker.class, + } + } + + // Update + // workers mapper. + o.workers[worker.class] = worker + return nil +} + +// Del +// deletes a worker from crontab. +func (o *Crontab) Del(worker *Worker) { + if worker != nil { + o.DelByClass(worker.class) + } +} + +// DelByClass +// deletes a worker from crontab by class name. +func (o *Crontab) DelByClass(class string) { + o.mu.Lock() + defer o.mu.Unlock() + + if _, ok := o.workers[class]; ok { + delete(o.workers, class) + } +} + +// GetName +// returns the crontab name. +func (o *Crontab) GetName() string { return o.name } + +func (o *Crontab) Start(ctx context.Context) (err error) { + return o.process.Start(ctx) +} + +func (o *Crontab) Stop() { + o.process.Stop() +} + +// +---------------------------------------------------------------------------+ +// | Implemented on process | +// +---------------------------------------------------------------------------+ + +// Run +// crontab service. It's called by process, do not call it directly. +func (o *Crontab) Run(ctx context.Context) (err error) { + // Creates + // a ticker to schedule. + tick := time.NewTicker(time.Second) + + // Range + // ticker and context channel. + for { + select { + case tm := <-tick.C: + go o.schedule(ctx, tm) + case <-ctx.Done(): + tick.Stop() + tick = nil + return + } + } +} + +// +---------------------------------------------------------------------------+ +// | Access methods | +// +---------------------------------------------------------------------------+ + +func (o *Crontab) schedule(ctx context.Context, tm time.Time) { + var ( + count, total, failed, succeed int + list = make([]string, 0) + mapper = make(map[string]*Worker) + span log.Span + wait *sync.WaitGroup + ) + + // Iterate added workers in crontab. + for _, worker := range func() map[string]*Worker { + o.mu.RLock() + defer o.mu.RUnlock() + return o.workers + }() { + total++ + if worker.Access(tm) { + list = append(list, worker.class) + mapper[worker.class] = worker + } + } + + // Do nothing + // if no workers accessed. + if count = len(list); count == 0 { + return + } + + // Prepare + // scheduler span and cleanup when done. + span = log.NewSpanWithContext(ctx, "crontab.scheduler") + span.Info(`scheduler begin: count="%d", total="%d"`, count, total) + wait = new(sync.WaitGroup) + + defer func() { + wait.Wait() + span.Info(`scheduler finish: failed="%d", succeed="%d"`, failed, succeed) + span.End() + }() + + sort.Strings(list) + + span.SetAttr("crontab.scheduler.count", count) + span.SetAttr("crontab.scheduler.total", total) + span.SetAttr("crontab.scheduler.name", o.name) + span.SetAttr("crontab.scheduler.jobs", list) + span.SetAttr("crontab.scheduler.time", tm.Format("2006-01-02 15:04:05 Z07:00")) + + // Iterate + // access workers. + for _, class := range list { + if v, ok := mapper[class]; ok { + wait.Add(1) + go func(worker *Worker) { + defer wait.Done() + if worker.run(span.Context(), tm) == nil { + succeed++ + } else { + failed++ + } + }(v) + } + } +} diff --git a/src/crontab_test.go b/src/crontab_test.go new file mode 100644 index 0000000..c824394 --- /dev/null +++ b/src/crontab_test.go @@ -0,0 +1,60 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-16 + +package src + +import ( + "context" + "testing" + "time" +) + +func TestNewCrontab(t *testing.T) { + ExampleNewCrontab() +} + +func ExampleNewCrontab() { + crontab := NewCrontab("example") + println("crontab:", crontab.name) + // Output: + // crontab: example +} + +func TestCrontab_Start(t *testing.T) { + ExampleCrontab_Start() +} + +func ExampleCrontab_Start() { + var ( + crontab *Crontab + ctx = context.Background() + err error + worker *Worker + ) + + go func() { + time.Sleep(time.Second * 5) + + if err == nil { + crontab.Stop() + } + }() + + worker = NewWorker(EveryMinute, &ej{}) + crontab = NewCrontab("example") + if err = crontab.Add(worker); err == nil { + err = crontab.Start(ctx) + } +} diff --git a/src/errors.go b/src/errors.go new file mode 100644 index 0000000..11060e1 --- /dev/null +++ b/src/errors.go @@ -0,0 +1,34 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-16 + +package src + +import ( + "errors" + "fmt" +) + +type Error struct { + Type string + Message, Value string +} + +func (o Error) Error() string { + return fmt.Sprintf(`type=%s: %s`, o.Type, o.Message) +} + +var ( + ErrInvalidStrategy = errors.New("invalid strategy") +) diff --git a/src/job.go b/src/job.go index e2ddb53..b3fdfcd 100644 --- a/src/job.go +++ b/src/job.go @@ -11,49 +11,73 @@ // limitations under the License. // // Author: wsfuyibing <682805@qq.com> -// Date: 2024-07-15 +// Date: 2024-07-16 package src -import ( - "context" -) +import "context" type ( // Job - // is an interface used to define a handler for a job. + // is a required handler of a job, it's called by worker. If an error is + // returned then call JobFailed.OnFailed() otherwise JobSucceed.OnSucceed() + // called. + // + // Called sequence: + // + // 1. OnBefore + // 2. Do + // 3. OnFailed / OnSucceed + // 4. OnFinish + // + // Code: // - // Step: - // - Before - // - Do - // - Failed/Succeed - // - Finish + // func (o *YourJob) Do(ctx context.Context) (err error){ + // // ... + // return + // } Job interface { - // Do - // job handler when system time matches policy time. Do(ctx context.Context) (err error) } // JobBefore - // is an interface used to define a before handler for AOP. + // is an optional handler of a job, it's called by worker. If an error + // returned then quit other handlers otherwise Job.Do() called. + // + // func (o *YourJob) OnBefore(ctx context.Context) (err error){ + // // ... + // return + // } JobBefore interface { OnBefore(ctx context.Context) (err error) } // JobFailed - // is an interface used to define a failed handler for AOP. + // is an optional handler of a job, it's called by worker. + // + // func (o *YourJob) OnFailed(ctx context.Context){ + // // ... + // } JobFailed interface { - OnFailed(ctx context.Context, err error) + OnFailed(ctx context.Context) } // JobFinish - // is an interface used to define a finish handler for AOP. + // is an optional handler of a job, it's called by worker. + // + // func (o *YourJob) OnFinish(ctx context.Context){ + // // ... + // } JobFinish interface { OnFinish(ctx context.Context) } // JobSucceed - // is an interface used to define a succeed handler for AOP. + // is an optional handler of a job, it's called by worker. + // + // func (o *YourJob) OnSucceed(ctx context.Context){ + // // ... + // } JobSucceed interface { OnSucceed(ctx context.Context) } diff --git a/src/strategy.go b/src/strategy.go new file mode 100644 index 0000000..ed54b0d --- /dev/null +++ b/src/strategy.go @@ -0,0 +1,48 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-16 + +package src + +import ( + "github.com/gorhill/cronexpr" +) + +// Strategy +// is a type name used to define timed execution time for crontab like follows. +// +// - "*/1 * * * * * *" (every second) +// - "0 1/* * * * * *" (every minute) +// - "0 0 1/* * * * *" (every hour) +type Strategy string + +// Common strategies. + +const ( + EverySecond Strategy = "*/1 * * * * * *" + EveryMinute Strategy = "0 */1 * * * * *" + EveryHour Strategy = "0 0 */1 * * * *" +) + +// Parse +// timed execution time. +func (o Strategy) Parse() (*cronexpr.Expression, error) { + return cronexpr.Parse(o.String()) +} + +// String +// convert strategy as string. +func (o Strategy) String() string { + return string(o) +} diff --git a/src/worker.go b/src/worker.go index bf72733..5f1aeea 100644 --- a/src/worker.go +++ b/src/worker.go @@ -11,7 +11,7 @@ // limitations under the License. // // Author: wsfuyibing <682805@qq.com> -// Date: 2024-07-15 +// Date: 2024-07-16 package src @@ -25,265 +25,83 @@ import ( "time" ) -const ( - // DefaultNodeUnique - // when enabled, only one job is allowed to be in execution per node. - DefaultNodeUnique = true - - // DefaultGloballyUnique - // when enabled, only one job is allowed to be in execution per cluster. It's work dependent - // on redis. - DefaultGloballyUnique = false - - // DefaultRunOnStartup - // when enabled, crontab is immediately scheduled upon startup. - DefaultRunOnStartup = false -) - // Worker -// is a component of crontab. +// is a job manager that schedule in crontab. It's call job handlers in a +// goroutine. type Worker struct { class string - cron *cron + crontab *Crontab expression *cronexpr.Expression job Job - timeline Timeline - - runCount int32 - runTime time.Time - runTotal uint64 + strategy Strategy - nodeUnique, globallyUnique bool - runOnStartup bool + runProcessing int32 + runTime time.Time + runTotal uint64 + runOnStartup bool } // NewWorker -// creates a worker for crontab. -func NewWorker(timeline Timeline, job Job) *Worker { - ref := reflect.TypeOf(job) - worker := &Worker{timeline: timeline, job: job} - - if ref.Kind() == reflect.Pointer { - elem := ref.Elem() - worker.class = fmt.Sprintf(`%s.%s`, elem.PkgPath(), elem.Name()) - } else { - worker.class = fmt.Sprintf(`%s.%s`, ref.PkgPath(), ref.Name()) - } - - worker.nodeUnique = DefaultNodeUnique - worker.globallyUnique = DefaultGloballyUnique - worker.runOnStartup = DefaultRunOnStartup - return worker -} - -// Do -// crontab worker. -func (o *Worker) Do(ctx context.Context, tm time.Time) { - // Set last scheduled time. - o.runTime = tm - o.cron.increment() - - // Statistics increment. - atomic.AddUint64(&o.runTotal, 1) - atomic.AddInt32(&o.runCount, 1) - - // Create span based on parent. - span := log.NewSpanWithContext(ctx, o.class) - span.SetAttr(`crontab.scheduled.class`, o.class) - span.SetAttr(`crontab.scheduled.count`, o.runTotal) - span.SetAttr(`crontab.scheduled.time`, tm) - span.SetAttr(`crontab.scheduled.time.next`, o.expression.Next(tm)) - span.SetAttr(`crontab.scheduled.timeline`, o.timeline) - - // Called when done. - defer func() { - atomic.AddInt32(&o.runCount, -1) - span.End() - o.cron.decrement() - }() - - // Before handler. - if v, ok := o.job.(JobBefore); ok { - if o.doBefore(span.Context(), v.OnBefore) != nil { - return - } - } - - // Finish handler. - defer func() { - if v, ok := o.job.(JobFinish); ok { - o.doFinish(span.Context(), v.OnFinish) - } - }() - - // Job handler. - if err := o.do(span.Context(), o.job.Do); err != nil { - // Failed handler. - if v, ok := o.job.(JobFailed); ok { - o.doFailed(span.Context(), err, v.OnFailed) - } +// creates a worker with job. +func NewWorker(strategy Strategy, job Job) *Worker { + o := &Worker{strategy: strategy, job: job} + r := reflect.TypeOf(job) + + // Worker class. + if r.Kind() == reflect.Ptr { + // Class from pointer instance. + e := r.Elem() + o.class = fmt.Sprintf(`%s.%s`, e.PkgPath(), e.Name()) + } else if r.Kind() == reflect.Struct { + // Class from struct instance. + o.class = fmt.Sprintf(`%s.%s`, r.PkgPath(), r.Name()) } else { - // Succeed handler. - if v, ok := o.job.(JobSucceed); ok { - o.doSucceed(span.Context(), v.OnSucceed) + // Unsupported job type. + if job != nil { + o.job = nil } } + return o } -func (o *Worker) NodeUnique() bool { - return o.nodeUnique -} - -func (o *Worker) GloballyUnique() bool { - return o.globallyUnique -} - -func (o *Worker) RunOnStartup() bool { - return o.runOnStartup -} - -// Match -// matches the system time with worker timeline, Return true if match succeed. -func (o *Worker) Match(tm time.Time) bool { - // Return true - // if worker scheduled for first time and run on startup is enabled. - if o.runOnStartup && atomic.LoadUint64(&o.runTotal) == 0 { +func (o *Worker) Access(tm time.Time) bool { + if atomic.LoadUint64(&o.runTotal) == 0 { return true } - - // Return false - // if system time not matched on timed execution time. - if o.expression.Next(tm.Add(-1*time.Second)).Unix() != tm.Unix() { - return false - } - - // Return false - // if worker scheduling by other goroutine in the same node and node unique - // enabled. - if o.nodeUnique && atomic.LoadInt32(&o.runCount) > 0 { - return false - } - - // Return false - // if worker scheduling by other goroutine in the same node order other node - // goroutine. - if o.globallyUnique { - } - - // Allow schedule. - return true + return false } -func (o *Worker) SetNodeUnique(v bool) *Worker { - o.nodeUnique = v - return o -} +// GetClass +// gets worker class. +func (o *Worker) GetClass() string { return o.class } -func (o *Worker) SetGloballyUnique(v bool) *Worker { - o.globallyUnique = v - return o -} +// GetStrategy +// gets worker strategy. +func (o *Worker) GetStrategy() Strategy { return o.strategy } -func (o *Worker) SetRunOnStartup(v bool) *Worker { - o.runOnStartup = v - return o -} +// Run +// runs the worker. +func (o *Worker) run(ctx context.Context, tm time.Time) (err error) { + var ( + layout = "2006-01-02 15:04:05 Z07:00" + span log.Span + ) -// +---------------------------------------------------------------------------+ -// | Access methods | -// +---------------------------------------------------------------------------+ - -func (o *Worker) do(ctx context.Context, handler func(context.Context) error) (err error) { - // Span created. - span := log.NewSpanWithContext(ctx, `on:do`) - - // Call when handler done. - defer func() { - // Catch runtime panic. - if r := recover(); r != nil { - err = fmt.Errorf(`%v`, r) - span.Fatal(`fatal occurred on handler: %v`, err) - } - // Close span. - span.End() - }() + o.runTime = tm - // Call job handler. - err = handler(span.Context()) - return -} + atomic.AddUint64(&o.runTotal, 1) + atomic.AddInt32(&o.runProcessing, 1) -func (o *Worker) doBefore(ctx context.Context, handler func(ctx context.Context) error) (err error) { - // Span created. - span := log.NewSpanWithContext(ctx, `on:before`) + span = log.NewSpanWithContext(ctx, o.class) + span.SetAttr("crontab.scheduled.class", o.class) + span.SetAttr("crontab.scheduled.strategy", o.strategy) + span.SetAttr("crontab.scheduled.time", tm.Format(layout)) + span.SetAttr("crontab.scheduled.time.next", o.expression.Next(tm).Format(layout)) - // Call when handler done. defer func() { - // Catch runtime panic. - if r := recover(); r != nil { - err = fmt.Errorf(`%v`, r) - span.Fatal(`fatal occurred on before handler: %v`, err) - } - // Close span. span.End() + atomic.AddInt32(&o.runProcessing, -1) }() - // Call job handler. - err = handler(span.Context()) return } - -func (o *Worker) doFailed(ctx context.Context, err error, handler func(context.Context, error)) { - // Span created. - span := log.NewSpanWithContext(ctx, `on:failed`) - - // Call when handler done. - defer func() { - // Catch runtime panic. - if r := recover(); r != nil { - err = fmt.Errorf(`%v`, r) - span.Fatal(`fatal occurred on failed handler: %v`, err) - } - // Close span. - span.End() - }() - - // Call job handler. - handler(span.Context(), err) -} - -func (o *Worker) doFinish(ctx context.Context, handler func(context.Context)) { - // Span created. - span := log.NewSpanWithContext(ctx, `on:finish`) - - // Call when handler done. - defer func() { - // Catch runtime panic. - if r := recover(); r != nil { - span.Fatal(`fatal occurred on finish handler: %v`, r) - } - // Close span. - span.End() - }() - - // Call job handler. - handler(span.Context()) -} - -func (o *Worker) doSucceed(ctx context.Context, handler func(context.Context)) { - // Span created. - span := log.NewSpanWithContext(ctx, `on:succeed`) - - // Call when handler done. - defer func() { - // Catch runtime panic. - if r := recover(); r != nil { - span.Fatal(`fatal occurred on succeed handler: %v`, r) - } - // Close span. - span.End() - }() - - // Call job handler. - handler(span.Context()) -} diff --git a/src/worker_test.go b/src/worker_test.go new file mode 100644 index 0000000..872f9f3 --- /dev/null +++ b/src/worker_test.go @@ -0,0 +1,38 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: wsfuyibing <682805@qq.com> +// Date: 2024-07-16 + +package src + +import ( + "context" + "testing" +) + +type ej struct{} + +func (o *ej) Do(_ context.Context) (err error) { return } + +func TestNewWorker(t *testing.T) { + ExampleNewWorker() +} + +func ExampleNewWorker() { + worker := NewWorker(EveryMinute, &ej{}) + println("class:", worker.class) + println("strategy:", worker.strategy) + // Output: + // class: gitee.com/go-libs/crontab/src.ej + // strategy: 0 */1 * * * * * +} -- Gitee From d66aaf7655d8b149f533122b75e3a2dae0a67cb2 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 13:54:24 +0800 Subject: [PATCH 07/12] init --- .gitignore | 5 ++++- init.go | 30 ++++++++++++++---------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 62c8935..773efd0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -.idea/ \ No newline at end of file +.idea/ +config/ + +*.bak \ No newline at end of file diff --git a/init.go b/init.go index 6807df2..e8f8c49 100644 --- a/init.go +++ b/init.go @@ -15,32 +15,30 @@ package crontab -import "gitee.com/go-libs/crontab/src" +import ( + "gitee.com/go-libs/crontab/src" +) // Export src types. type ( - Cron = src.Cron - Job = src.Job - JobBefore = src.JobBefore - JobFailed = src.JobFailed - JobFinish = src.JobFinish - JobSucceed = src.JobSucceed - Timeline = src.Timeline - Worker = src.Worker + Cron = src.Cron + Crontab = src.Crontab + Job = src.Job + Strategy = src.Strategy + Worker = src.Worker ) // Export constants. const ( - TimelineEverySecond = src.TimelineEverySecond - TimelineEveryMinute = src.TimelineEveryMinute - TimelineEveryHour = src.TimelineEveryHour + EverySecond = src.EverySecond + EveryMinute = src.EveryMinute + EveryHour = src.EveryHour ) // Export src functions. -var ( - New = src.New - NewWorker = src.NewWorker -) +func New(name string) Cron { + return src.NewCrontab(name) +} -- Gitee From 823113daeee8dfde4a7ee5728d67dd3775af5832 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 17:25:40 +0800 Subject: [PATCH 08/12] init --- examples/simple/crontabs/example.go | 10 +- examples/simple/main.go | 8 +- init.go | 18 +- src/cron.go | 13 ++ src/crontab.go | 159 ++++++++++------- src/crontab_test.go | 2 +- src/errors.go | 5 - src/job.go | 8 +- src/worker.go | 263 +++++++++++++++++++++++++--- 9 files changed, 373 insertions(+), 113 deletions(-) diff --git a/examples/simple/crontabs/example.go b/examples/simple/crontabs/example.go index c9aa052..f6c62d9 100644 --- a/examples/simple/crontabs/example.go +++ b/examples/simple/crontabs/example.go @@ -22,11 +22,6 @@ import ( type Example struct{} -func (o *Example) Do(ctx context.Context) (err error) { - log.Infofc(ctx, `job.Do()`) - return -} - func (o *Example) OnBefore(ctx context.Context) (err error) { log.Infofc(ctx, `job.OnBefore()`) return @@ -40,6 +35,11 @@ func (o *Example) OnFinish(ctx context.Context) { log.Infofc(ctx, `job.OnFinish()`) } +func (o *Example) OnRun(ctx context.Context) (err error) { + log.Infofc(ctx, `job.Do()`) + return +} + func (o *Example) OnSucceed(ctx context.Context) { log.Infofc(ctx, `job.OnSucceed()`) } diff --git a/examples/simple/main.go b/examples/simple/main.go index b7b31c4..bf71717 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -26,7 +26,7 @@ import ( type my struct { } -func (o *my) Do(_ context.Context) (err error) { return nil } +func (o *my) OnRun(_ context.Context) (err error) { return nil } func main() { var ( @@ -39,7 +39,7 @@ func main() { defer log.Close() go func() { - time.Sleep(time.Second * 5) + time.Sleep(time.Minute * 2) if err != nil { return } @@ -47,12 +47,12 @@ func main() { cancel() }() - worker = crontab.NewWorker(crontab.TimelineEveryMinute, &crontabs.Example{}) + worker = crontab.NewWorker(crontab.EveryMinute, &crontabs.Example{}) worker.SetRunOnStartup(true) if err = cron.Add(worker); err == nil { if err = cron.Add( - crontab.NewWorker(crontab.TimelineEveryHour, &my{}). + crontab.NewWorker(crontab.EveryHour, &my{}). SetRunOnStartup(true), ); err == nil { err = cron.Start(ctx) diff --git a/init.go b/init.go index e8f8c49..82119f3 100644 --- a/init.go +++ b/init.go @@ -22,11 +22,15 @@ import ( // Export src types. type ( - Cron = src.Cron - Crontab = src.Crontab - Job = src.Job - Strategy = src.Strategy - Worker = src.Worker + Cron = src.Cron + Crontab = src.Crontab + Job = src.Job + JobBefore = src.JobBefore + JobFailed = src.JobFailed + JobFinish = src.JobFinish + JobSucceed = src.JobSucceed + Strategy = src.Strategy + Worker = src.Worker ) // Export constants. @@ -42,3 +46,7 @@ const ( func New(name string) Cron { return src.NewCrontab(name) } + +var ( + NewWorker = src.NewWorker +) diff --git a/src/cron.go b/src/cron.go index 7242dc0..6c2ef53 100644 --- a/src/cron.go +++ b/src/cron.go @@ -15,10 +15,23 @@ package src +import ( + "context" + "gitee.com/go-libs/process" +) + // Cron // is an interface of Crontab. type Cron interface { // Add // a worker into crontab. Add(worker *Worker) error + + GetWorker(class string) *Worker + GetWorkers() map[string]*Worker + Process() process.Process + Start(ctx context.Context) error + Started() bool + Stop() + Stopped() bool } diff --git a/src/crontab.go b/src/crontab.go index f6a11eb..bb54630 100644 --- a/src/crontab.go +++ b/src/crontab.go @@ -18,23 +18,25 @@ package src import ( "context" "gitee.com/go-libs/process" - "gitee.com/go-wares/log" - "sort" "sync" + "sync/atomic" "time" ) // Crontab -// is a scheduler for crontab. +// is a scheduler for crontab. It's worked as a provider of the process. type Crontab struct { mu *sync.RWMutex - process process.Process name string + process process.Process workers map[string]*Worker + + scheduleFailed, scheduleSucceed uint64 + scheduleRunning int32 } // NewCrontab -// creates a Crontab instance. +// creates a new crontab instance. func NewCrontab(name string) *Crontab { o := &Crontab{mu: new(sync.RWMutex), name: name} o.process = process.New(o.name, o) @@ -43,7 +45,7 @@ func NewCrontab(name string) *Crontab { } // Add -// a worker into crontab. +// adds a worker into crontab. func (o *Crontab) Add(worker *Worker) error { o.mu.Lock() defer o.mu.Unlock() @@ -53,7 +55,7 @@ func (o *Crontab) Add(worker *Worker) error { if worker == nil { return Error{ Type: "crontab", - Message: "worker can not be nil", + Message: "the worker you added can not be nil", } } @@ -62,7 +64,7 @@ func (o *Crontab) Add(worker *Worker) error { if worker.job == nil { return Error{ Type: "worker", - Message: "job for worker not allowed", + Message: "the job for worker not allowed", } } @@ -72,7 +74,7 @@ func (o *Crontab) Add(worker *Worker) error { if worker.expression, err = worker.strategy.Parse(); err != nil { return Error{ Type: "strategy", - Message: "invalid strategy", + Message: "invalid timed execution strategy for the worker", Value: worker.strategy.String(), } } @@ -82,7 +84,7 @@ func (o *Crontab) Add(worker *Worker) error { if _, ok := o.workers[worker.class]; ok { return Error{ Type: "crontab", - Message: "worker added already", + Message: "the worker added already", Value: worker.class, } } @@ -107,6 +109,8 @@ func (o *Crontab) DelByClass(class string) { o.mu.Lock() defer o.mu.Unlock() + // Remove + // from mapping if exists. if _, ok := o.workers[class]; ok { delete(o.workers, class) } @@ -114,20 +118,75 @@ func (o *Crontab) DelByClass(class string) { // GetName // returns the crontab name. -func (o *Crontab) GetName() string { return o.name } +func (o *Crontab) GetName() string { + return o.name +} +// GetWorker +// returns the worker with given class. +func (o *Crontab) GetWorker(class string) *Worker { + o.mu.RLock() + defer o.mu.RUnlock() + if worker, ok := o.workers[class]; ok { + return worker + } + return nil +} + +// GetWorkers +// returns the added worker mapping. +func (o *Crontab) GetWorkers() map[string]*Worker { + o.mu.RLock() + defer o.mu.RUnlock() + return o.workers +} + +// Process +// returns a process of the crontab. +func (o *Crontab) Process() process.Process { + return o.process +} + +// Start +// starts the crontab service by call process. func (o *Crontab) Start(ctx context.Context) (err error) { return o.process.Start(ctx) } +// Started +// return true if process started. +func (o *Crontab) Started() bool { + return o.process.Started() +} + +// Stop +// stops the crontab service by call process. func (o *Crontab) Stop() { o.process.Stop() } +// Stopped +// return true if process stopped. +func (o *Crontab) Stopped() bool { + return o.process.Stopped() +} + // +---------------------------------------------------------------------------+ // | Implemented on process | // +---------------------------------------------------------------------------+ +// After +// wait all running scheduler done before stopped. It's called by process, do not +// call it directly. +func (o *Crontab) After(_ context.Context) (err error) { + for { + if atomic.LoadInt32(&o.scheduleRunning) == 0 { + return + } + time.Sleep(time.Millisecond * 10) + } +} + // Run // crontab service. It's called by process, do not call it directly. func (o *Crontab) Run(ctx context.Context) (err error) { @@ -154,66 +213,34 @@ func (o *Crontab) Run(ctx context.Context) (err error) { // +---------------------------------------------------------------------------+ func (o *Crontab) schedule(ctx context.Context, tm time.Time) { - var ( - count, total, failed, succeed int - list = make([]string, 0) - mapper = make(map[string]*Worker) - span log.Span - wait *sync.WaitGroup - ) - - // Iterate added workers in crontab. - for _, worker := range func() map[string]*Worker { + // Iterate + // added workers in crontab. + for _, x := range func() map[string]*Worker { o.mu.RLock() defer o.mu.RUnlock() return o.workers }() { - total++ - if worker.Access(tm) { - list = append(list, worker.class) - mapper[worker.class] = worker - } - } - - // Do nothing - // if no workers accessed. - if count = len(list); count == 0 { - return - } - - // Prepare - // scheduler span and cleanup when done. - span = log.NewSpanWithContext(ctx, "crontab.scheduler") - span.Info(`scheduler begin: count="%d", total="%d"`, count, total) - wait = new(sync.WaitGroup) - - defer func() { - wait.Wait() - span.Info(`scheduler finish: failed="%d", succeed="%d"`, failed, succeed) - span.End() - }() - - sort.Strings(list) - - span.SetAttr("crontab.scheduler.count", count) - span.SetAttr("crontab.scheduler.total", total) - span.SetAttr("crontab.scheduler.name", o.name) - span.SetAttr("crontab.scheduler.jobs", list) - span.SetAttr("crontab.scheduler.time", tm.Format("2006-01-02 15:04:05 Z07:00")) - - // Iterate - // access workers. - for _, class := range list { - if v, ok := mapper[class]; ok { - wait.Add(1) - go func(worker *Worker) { - defer wait.Done() - if worker.run(span.Context(), tm) == nil { - succeed++ + // Increment running statistic. + atomic.AddInt32(&o.scheduleRunning, 1) + // Schedule in a goroutine. + go func(worker *Worker) { + // Decrement + // when done. + defer atomic.AddInt32(&o.scheduleRunning, -1) + // Run worker. + if do, err := worker.run(ctx, tm); do { + // Increment + // result statistic. + if err == nil { + // Succeed + // to run. + atomic.AddUint64(&o.scheduleSucceed, 1) } else { - failed++ + // Failed + // to run. + atomic.AddUint64(&o.scheduleFailed, 1) } - }(v) - } + } + }(x) } } diff --git a/src/crontab_test.go b/src/crontab_test.go index c824394..3804a02 100644 --- a/src/crontab_test.go +++ b/src/crontab_test.go @@ -27,7 +27,7 @@ func TestNewCrontab(t *testing.T) { func ExampleNewCrontab() { crontab := NewCrontab("example") - println("crontab:", crontab.name) + println("crontab:", crontab.GetName()) // Output: // crontab: example } diff --git a/src/errors.go b/src/errors.go index 11060e1..052a486 100644 --- a/src/errors.go +++ b/src/errors.go @@ -16,7 +16,6 @@ package src import ( - "errors" "fmt" ) @@ -28,7 +27,3 @@ type Error struct { func (o Error) Error() string { return fmt.Sprintf(`type=%s: %s`, o.Type, o.Message) } - -var ( - ErrInvalidStrategy = errors.New("invalid strategy") -) diff --git a/src/job.go b/src/job.go index b3fdfcd..5f081a9 100644 --- a/src/job.go +++ b/src/job.go @@ -32,17 +32,17 @@ type ( // // Code: // - // func (o *YourJob) Do(ctx context.Context) (err error){ + // func (o *YourJob) OnRun(ctx context.Context) (err error){ // // ... // return // } Job interface { - Do(ctx context.Context) (err error) + OnRun(ctx context.Context) (err error) } // JobBefore // is an optional handler of a job, it's called by worker. If an error - // returned then quit other handlers otherwise Job.Do() called. + // returned then quit other handlers otherwise Job.OnRun() called. // // func (o *YourJob) OnBefore(ctx context.Context) (err error){ // // ... @@ -59,7 +59,7 @@ type ( // // ... // } JobFailed interface { - OnFailed(ctx context.Context) + OnFailed(ctx context.Context, err error) } // JobFinish diff --git a/src/worker.go b/src/worker.go index 5f1aeea..0635b72 100644 --- a/src/worker.go +++ b/src/worker.go @@ -25,6 +25,14 @@ import ( "time" ) +const ( + DefaultGloballyUnique = false + DefaultNodeUnique = true + DefaultRunOnStartup = false + + DefaultSchedulerTimeFormat = "2006-01-02 15:04:05 Z07:00" +) + // Worker // is a job manager that schedule in crontab. It's call job handlers in a // goroutine. @@ -35,10 +43,10 @@ type Worker struct { job Job strategy Strategy - runProcessing int32 - runTime time.Time - runTotal uint64 - runOnStartup bool + runProcessing int32 + runTime time.Time + runTotal uint64 + runOnStartup, nodeUnique, globallyUnique bool } // NewWorker @@ -61,14 +69,12 @@ func NewWorker(strategy Strategy, job Job) *Worker { o.job = nil } } - return o -} -func (o *Worker) Access(tm time.Time) bool { - if atomic.LoadUint64(&o.runTotal) == 0 { - return true - } - return false + // With default fields and return. + o.SetGloballyUnique(DefaultGloballyUnique) + o.SetNodeUnique(DefaultNodeUnique) + o.SetRunOnStartup(DefaultRunOnStartup) + return o } // GetClass @@ -77,31 +83,242 @@ func (o *Worker) GetClass() string { return o.class } // GetStrategy // gets worker strategy. -func (o *Worker) GetStrategy() Strategy { return o.strategy } +func (o *Worker) GetStrategy() Strategy { + return o.strategy +} + +// SetGloballyUnique +// skip if previous scheduling in any node is not completed and value of v +// is true. +func (o *Worker) SetGloballyUnique(v bool) *Worker { + o.globallyUnique = v + return o +} + +// SetNodeUnique +// skip if previous scheduling is not completed and value of v is true. +func (o *Worker) SetNodeUnique(v bool) *Worker { + o.nodeUnique = v + return o +} + +// SetRunOnStartup +// schedule the job once immediately when crontab starts if enabled. +func (o *Worker) SetRunOnStartup(v bool) *Worker { + o.runOnStartup = v + return o +} // Run // runs the worker. -func (o *Worker) run(ctx context.Context, tm time.Time) (err error) { - var ( - layout = "2006-01-02 15:04:05 Z07:00" - span log.Span - ) +func (o *Worker) run(ctx context.Context, tm time.Time) (do bool, err error) { + // Called + // when done. + defer func() { + if do { + err = o.schedule(ctx, tm) + } + }() - o.runTime = tm + // Accessed + // if job configured as run-on-startup and schedule at first time. + if o.runOnStartup { + if atomic.LoadUint64(&o.runTotal) == 0 { + do = true + return + } + } + + // Compare + // timed execution time with strategy. Return false if not matched. + if tm.Unix() != o.expression.Next(tm.Add(-1*time.Second)).Unix() { + return + } + + // Unique on node checker, Return false if job configured as node-unique and + // previous scheduling not completed. + if o.nodeUnique { + if atomic.LoadInt32(&o.runProcessing) > 0 { + return + } + } + + // Unique on cluster checker, Return false if job configured as globally-unique + // and previous scheduling in any node not completed. It's dependent on redis + // lock. + if o.globallyUnique { + // todo: distributed lock with redis. + } - atomic.AddUint64(&o.runTotal, 1) + // End access check. + do = true + return +} + +// Schedule +// job handlers. +func (o *Worker) schedule(ctx context.Context, tm time.Time) (err error) { + var span log.Span + + // Set + // last schedule state when scheduler begin. + o.runTime = tm + count := atomic.AddUint64(&o.runTotal, 1) atomic.AddInt32(&o.runProcessing, 1) + // Create span. span = log.NewSpanWithContext(ctx, o.class) - span.SetAttr("crontab.scheduled.class", o.class) - span.SetAttr("crontab.scheduled.strategy", o.strategy) - span.SetAttr("crontab.scheduled.time", tm.Format(layout)) - span.SetAttr("crontab.scheduled.time.next", o.expression.Next(tm).Format(layout)) + span.Info(`schedule begin: class="%s"`, o.class) + span.SetAttr(`crontab.scheduler.count`, count) + span.SetAttr(`crontab.scheduler.class`, o.class) + span.SetAttr(`crontab.scheduler.strategy`, o.strategy) + span.SetAttr(`crontab.scheduler.time`, tm.Format(DefaultSchedulerTimeFormat)) + span.SetAttr(`crontab.scheduler.time.next`, o.expression.Next(tm).Format(DefaultSchedulerTimeFormat)) + // Cleanup + // when scheduler finish. defer func() { + // Log scheduler result. + if err != nil { + span.Info("schedule failed: %v", err) + } else { + span.Info("schedule succeed") + } + + // Close span. span.End() + + // Reset statistic. atomic.AddInt32(&o.runProcessing, -1) }() + // Before + // handler called. + if v, ok := o.job.(JobBefore); ok { + if err = o.scheduleBefore(span.Context(), v.OnBefore); err != nil { + return + } + } + + // Finish + // handler called. + defer func() { + if v, ok := o.job.(JobFinish); ok { + o.scheduleFinish(span.Context(), v.OnFinish) + } + }() + + // Process + // handler called. + if err = o.scheduleRun(span.Context(), o.job.OnRun); err != nil { + // Failed + // handler called. + if v, ok := o.job.(JobFailed); ok { + o.scheduleFailed(span.Context(), err, v.OnFailed) + } + } else { + // Succeed + // handler called. + if v, ok := o.job.(JobSucceed); ok { + o.scheduleSucceed(span.Context(), v.OnSucceed) + } + } + return +} + +func (o *Worker) scheduleBefore(ctx context.Context, handler func(context.Context) error) (err error) { + var span = log.NewSpanWithContext(ctx, `on:before`) + + // Cleanup + // when done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal on before: %v`, r) + } + + // Close span. + span.End() + }() + + // Call handler. + err = handler(span.Context()) + return +} + +func (o *Worker) scheduleRun(ctx context.Context, handler func(context.Context) error) (err error) { + var span = log.NewSpanWithContext(ctx, `on:run`) + + // Cleanup + // when done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal on do: %v`, r) + } + + // Close span. + span.End() + }() + + // Call handler. + err = handler(span.Context()) return } + +func (o *Worker) scheduleFailed(ctx context.Context, err error, handler func(context.Context, error)) { + var span = log.NewSpanWithContext(ctx, `on:failed`) + + // Cleanup + // when done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal on failed: %v`, r) + } + + // Close span. + span.End() + }() + + // Call handler. + handler(span.Context(), err) +} + +func (o *Worker) scheduleFinish(ctx context.Context, handler func(context.Context)) { + var span = log.NewSpanWithContext(ctx, `on:finish`) + + // Cleanup + // when done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal on finish: %v`, r) + } + + // Close span. + span.End() + }() + + // Call handler. + handler(span.Context()) +} + +func (o *Worker) scheduleSucceed(ctx context.Context, handler func(context.Context)) { + var span = log.NewSpanWithContext(ctx, `on:succeed`) + + // Cleanup + // when done. + defer func() { + // Catch runtime panic. + if r := recover(); r != nil { + span.Fatal(`fatal on succeed: %v`, r) + } + + // Close span. + span.End() + }() + + // Call handler. + handler(span.Context()) +} -- Gitee From 4827b32ccd2f73b3889f39bdefbb4af1ab54b171 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 17:26:47 +0800 Subject: [PATCH 09/12] init --- init.go | 1 - src/worker_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/init.go b/init.go index 82119f3..4881628 100644 --- a/init.go +++ b/init.go @@ -36,7 +36,6 @@ type ( // Export constants. const ( - EverySecond = src.EverySecond EveryMinute = src.EveryMinute EveryHour = src.EveryHour ) diff --git a/src/worker_test.go b/src/worker_test.go index 872f9f3..35b5db9 100644 --- a/src/worker_test.go +++ b/src/worker_test.go @@ -22,7 +22,7 @@ import ( type ej struct{} -func (o *ej) Do(_ context.Context) (err error) { return } +func (o *ej) OnRun(_ context.Context) (err error) { return } func TestNewWorker(t *testing.T) { ExampleNewWorker() -- Gitee From d0ad2e95322f71e913d4f07de6d6f2e3532f3b4e Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 17:27:02 +0800 Subject: [PATCH 10/12] init --- src/strategy.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/strategy.go b/src/strategy.go index ed54b0d..6b88eeb 100644 --- a/src/strategy.go +++ b/src/strategy.go @@ -30,7 +30,6 @@ type Strategy string // Common strategies. const ( - EverySecond Strategy = "*/1 * * * * * *" EveryMinute Strategy = "0 */1 * * * * *" EveryHour Strategy = "0 0 */1 * * * *" ) -- Gitee From fef26d74618b7bcc66be8dbca46a072af9289b9b Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 17:28:50 +0800 Subject: [PATCH 11/12] init --- src/timeline.go | 41 ----------------------------------------- 1 file changed, 41 deletions(-) delete mode 100644 src/timeline.go diff --git a/src/timeline.go b/src/timeline.go deleted file mode 100644 index 84276a0..0000000 --- a/src/timeline.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Author: wsfuyibing <682805@qq.com> -// Date: 2024-07-15 - -package src - -import "github.com/gorhill/cronexpr" - -// Timeline -// is a type name used for timed execution time. -type Timeline string - -const ( - TimelineEverySecond Timeline = "*/1 * * * * * *" - - TimelineEveryMinute Timeline = "0 */1 * * * * *" - TimelineEveryHour Timeline = "0 0 */1 * * * *" -) - -// Parse -// parses the timed execution time. -func (o Timeline) Parse() (*cronexpr.Expression, error) { - return cronexpr.Parse(o.String()) -} - -// String -// returns a formatted string for timed execution time. -func (o Timeline) String() string { - return string(o) -} -- Gitee From ee164521d3256b6c5d696c11c012724033da0951 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Wed, 17 Jul 2024 18:01:45 +0800 Subject: [PATCH 12/12] update comment --- init.go | 53 ++++++++++++++++++++++++++++++++++++++++--------- src/crontab.go | 2 +- src/strategy.go | 13 +++++++++++- src/worker.go | 18 ++++++++++++----- 4 files changed, 70 insertions(+), 16 deletions(-) diff --git a/init.go b/init.go index 4881628..2f39538 100644 --- a/init.go +++ b/init.go @@ -22,30 +22,65 @@ import ( // Export src types. type ( - Cron = src.Cron - Crontab = src.Crontab - Job = src.Job - JobBefore = src.JobBefore - JobFailed = src.JobFailed - JobFinish = src.JobFinish + // Cron + // is an alias for src.Cron. + Cron = src.Cron + + // Crontab + // is an alias for src.Crontab. + Crontab = src.Crontab + + // Job + // is an alias for src.Job. + Job = src.Job + + // JobBefore + // is an alias for src.JobBefore. + JobBefore = src.JobBefore + + // JobFailed + // is an alias for src.JobFailed. + JobFailed = src.JobFailed + + // JobFinish + // is an alias for src.JobFinish. + JobFinish = src.JobFinish + + // JobSucceed + // is an alias for src.JobSucceed. JobSucceed = src.JobSucceed - Strategy = src.Strategy - Worker = src.Worker + + // Strategy + // is an alias for src.Strategy. + Strategy = src.Strategy + + // Worker + // is an alias for src.Worker. + Worker = src.Worker ) // Export constants. const ( + // EveryMinute + // is an alias for src.EveryMinute strategy. EveryMinute = src.EveryMinute - EveryHour = src.EveryHour + + // EveryHour + // is an alias for src.EveryHour strategy. + EveryHour = src.EveryHour ) // Export src functions. +// New +// creates a crontab with given name. func New(name string) Cron { return src.NewCrontab(name) } var ( + // NewWorker + // is an alias for src.Worker creator. NewWorker = src.NewWorker ) diff --git a/src/crontab.go b/src/crontab.go index bb54630..176b4a7 100644 --- a/src/crontab.go +++ b/src/crontab.go @@ -24,7 +24,7 @@ import ( ) // Crontab -// is a scheduler for crontab. It's worked as a provider of the process. +// is a scheduler for crontab. It's worked as a provider for process. type Crontab struct { mu *sync.RWMutex name string diff --git a/src/strategy.go b/src/strategy.go index 6b88eeb..3cc0635 100644 --- a/src/strategy.go +++ b/src/strategy.go @@ -30,8 +30,19 @@ type Strategy string // Common strategies. const ( + // EveryMinute + // schedule once every 0 seconds per minute like follows. + // + // 1. 09:00:00 + // 2. 09:01:00 EveryMinute Strategy = "0 */1 * * * * *" - EveryHour Strategy = "0 0 */1 * * * *" + + // EveryHour + // schedule once every 0 seconds and 0 minutes per hour like follows. + // + // 1. 09:00:00 + // 2. 10:00:00 + EveryHour Strategy = "0 0 */1 * * * *" ) // Parse diff --git a/src/worker.go b/src/worker.go index 0635b72..fc977e5 100644 --- a/src/worker.go +++ b/src/worker.go @@ -26,11 +26,19 @@ import ( ) const ( + // DefaultGloballyUnique + // default to disable globally unique. DefaultGloballyUnique = false - DefaultNodeUnique = true - DefaultRunOnStartup = false - DefaultSchedulerTimeFormat = "2006-01-02 15:04:05 Z07:00" + // DefaultNodeUnique + // default to enabled node unique. + DefaultNodeUnique = true + + // DefaultRunOnStartup + // default to disable run for crontab startup. + DefaultRunOnStartup = false + + defaultSchedulerTimeFormat = "2006-01-02 15:04:05 Z07:00" ) // Worker @@ -172,8 +180,8 @@ func (o *Worker) schedule(ctx context.Context, tm time.Time) (err error) { span.SetAttr(`crontab.scheduler.count`, count) span.SetAttr(`crontab.scheduler.class`, o.class) span.SetAttr(`crontab.scheduler.strategy`, o.strategy) - span.SetAttr(`crontab.scheduler.time`, tm.Format(DefaultSchedulerTimeFormat)) - span.SetAttr(`crontab.scheduler.time.next`, o.expression.Next(tm).Format(DefaultSchedulerTimeFormat)) + span.SetAttr(`crontab.scheduler.time`, tm.Format(defaultSchedulerTimeFormat)) + span.SetAttr(`crontab.scheduler.time.next`, o.expression.Next(tm).Format(defaultSchedulerTimeFormat)) // Cleanup // when scheduler finish. -- Gitee