diff --git a/go.mod b/go.mod index f4c627e45f93e5684bcf435e0b32896a60c865d6..523770679ae348ffacd256d5bc9f8d0c462608ed 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitee.com/gomq/servlet_rabbitmq go 1.22 require ( - gitee.com/go-libs/log v1.1.5 + gitee.com/go-libs/log v1.1.6 gitee.com/go-libs/process v1.1.2 gitee.com/gomq/sr v1.0.0 github.com/rabbitmq/amqp091-go v1.9.0 diff --git a/go.sum b/go.sum index 79743ba283f1bf90fedd2a684a6871fae7cd8088..03c7adbfd829333d401bb66441aa4d22a1f5fb14 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ gitee.com/go-libs/config v1.0.3 h1:dxNXDqelPxTxjmxc27lJCdQA+/IkYV8wOdd0qIhQeJM= gitee.com/go-libs/config v1.0.3/go.mod h1:azzGgnpQ4cTP7EXl7uctzfZ0L+y+v7JGYf65qfSwsXs= -gitee.com/go-libs/log v1.1.5 h1:WJ7brMbXLe+08Ms9OOSir0YOWRA98xkrtH5iH2IPadU= -gitee.com/go-libs/log v1.1.5/go.mod h1:qILAmim7XJZNAWj5KnJ7q0y/51mfUs4vlanbQbMWYlI= +gitee.com/go-libs/log v1.1.6 h1:1n42NSCGh9A4BYVoEEY8uTPIzjPqZ5SOdqjHAG4lwPo= +gitee.com/go-libs/log v1.1.6/go.mod h1:qILAmim7XJZNAWj5KnJ7q0y/51mfUs4vlanbQbMWYlI= gitee.com/go-libs/process v1.1.2 h1:98nKJ1k3kzBoSPamwYEpyb4avJQilRBmrhx/QURchAk= gitee.com/go-libs/process v1.1.2/go.mod h1:xv7IjHtnK0NhHd+Si40RErxNp9izLjlWeiV4Ki18qKc= gitee.com/gomq/sr v1.0.0 h1:DvktuBvOrnZ5jkmzG404aL0WuDS7PWoO2e8YBdJPES8= diff --git a/src/consumer.go b/src/consumer.go index c04f3a70bae4414010e6a58572774b1c6c95f2d4..704fa28162c1aaf956b574b8fcf4ac30fcf014c3 100644 --- a/src/consumer.go +++ b/src/consumer.go @@ -37,6 +37,7 @@ type consumer struct { name string process process.Process receiving int32 + restarts int64 delayExchange sr.TopicName queue sr.QueueName @@ -205,6 +206,7 @@ func (o *consumer) Run(ctx context.Context) (err error) { // for first runner. if o.initializer { if err = o.manager.remoting.Initialize(ctx, o.subscription); err != nil { + o.sleeper(ctx) return } o.initializer = false @@ -221,13 +223,16 @@ func (o *consumer) Run(ctx context.Context) (err error) { // receive counter. defer atomic.AddInt32(&o.receiving, -1) - // Loop - // receiver until context cancelled. + // Loop receiver until context cancelled. for { if ctx == nil || ctx.Err() != nil { return } + o.receive(ctx) + + // Too many redo receiver for connection closed. + o.sleeper(ctx) } }() @@ -369,3 +374,19 @@ func (o *consumer) receive(ctx context.Context) { }(v) } } + +func (o *consumer) sleeper(ctx context.Context) { + if ctx.Err() == nil { + n := atomic.AddInt64(&o.restarts, 1) + + if n > 30 { + time.Sleep(time.Second * 15) + } else if n > 20 { + time.Sleep(time.Second * 10) + } else if n > 10 { + time.Sleep(time.Second * 5) + } else if n > 3 { + time.Sleep(time.Second * 3) + } + } +}