From c522e416d44262062aa3883bdcc6e13cb485c8d5 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Thu, 12 Dec 2024 17:20:33 +0800 Subject: [PATCH 1/2] sleep for connection closed --- src/consumer.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/consumer.go b/src/consumer.go index c04f3a7..2057f69 100644 --- a/src/consumer.go +++ b/src/consumer.go @@ -37,6 +37,7 @@ type consumer struct { name string process process.Process receiving int32 + restarts int64 delayExchange sr.TopicName queue sr.QueueName @@ -221,13 +222,28 @@ func (o *consumer) Run(ctx context.Context) (err error) { // receive counter. defer atomic.AddInt32(&o.receiving, -1) - // Loop - // receiver until context cancelled. + // Loop receiver until context cancelled. for { if ctx == nil || ctx.Err() != nil { return } + o.receive(ctx) + + // Too many redo receiver for connection closed. + if ctx.Err() == nil { + n := atomic.AddInt64(&o.restarts, 1) + + if n > 30 { + time.Sleep(time.Second * 15) + } else if n > 20 { + time.Sleep(time.Second * 10) + } else if n > 10 { + time.Sleep(time.Second * 6) + } else { + time.Sleep(time.Second * 3) + } + } } }() -- Gitee From 32670abf3bd4ceff72eba6c33a81d8f151e51d0f Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Thu, 12 Dec 2024 18:12:07 +0800 Subject: [PATCH 2/2] sleep for connection closed --- go.mod | 2 +- go.sum | 4 ++-- src/consumer.go | 31 ++++++++++++++++++------------- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index f4c627e..5237706 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitee.com/gomq/servlet_rabbitmq go 1.22 require ( - gitee.com/go-libs/log v1.1.5 + gitee.com/go-libs/log v1.1.6 gitee.com/go-libs/process v1.1.2 gitee.com/gomq/sr v1.0.0 github.com/rabbitmq/amqp091-go v1.9.0 diff --git a/go.sum b/go.sum index 79743ba..03c7adb 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ gitee.com/go-libs/config v1.0.3 h1:dxNXDqelPxTxjmxc27lJCdQA+/IkYV8wOdd0qIhQeJM= gitee.com/go-libs/config v1.0.3/go.mod h1:azzGgnpQ4cTP7EXl7uctzfZ0L+y+v7JGYf65qfSwsXs= -gitee.com/go-libs/log v1.1.5 h1:WJ7brMbXLe+08Ms9OOSir0YOWRA98xkrtH5iH2IPadU= -gitee.com/go-libs/log v1.1.5/go.mod h1:qILAmim7XJZNAWj5KnJ7q0y/51mfUs4vlanbQbMWYlI= +gitee.com/go-libs/log v1.1.6 h1:1n42NSCGh9A4BYVoEEY8uTPIzjPqZ5SOdqjHAG4lwPo= +gitee.com/go-libs/log v1.1.6/go.mod h1:qILAmim7XJZNAWj5KnJ7q0y/51mfUs4vlanbQbMWYlI= gitee.com/go-libs/process v1.1.2 h1:98nKJ1k3kzBoSPamwYEpyb4avJQilRBmrhx/QURchAk= gitee.com/go-libs/process v1.1.2/go.mod h1:xv7IjHtnK0NhHd+Si40RErxNp9izLjlWeiV4Ki18qKc= gitee.com/gomq/sr v1.0.0 h1:DvktuBvOrnZ5jkmzG404aL0WuDS7PWoO2e8YBdJPES8= diff --git a/src/consumer.go b/src/consumer.go index 2057f69..704fa28 100644 --- a/src/consumer.go +++ b/src/consumer.go @@ -206,6 +206,7 @@ func (o *consumer) Run(ctx context.Context) (err error) { // for first runner. if o.initializer { if err = o.manager.remoting.Initialize(ctx, o.subscription); err != nil { + o.sleeper(ctx) return } o.initializer = false @@ -231,19 +232,7 @@ func (o *consumer) Run(ctx context.Context) (err error) { o.receive(ctx) // Too many redo receiver for connection closed. - if ctx.Err() == nil { - n := atomic.AddInt64(&o.restarts, 1) - - if n > 30 { - time.Sleep(time.Second * 15) - } else if n > 20 { - time.Sleep(time.Second * 10) - } else if n > 10 { - time.Sleep(time.Second * 6) - } else { - time.Sleep(time.Second * 3) - } - } + o.sleeper(ctx) } }() @@ -385,3 +374,19 @@ func (o *consumer) receive(ctx context.Context) { }(v) } } + +func (o *consumer) sleeper(ctx context.Context) { + if ctx.Err() == nil { + n := atomic.AddInt64(&o.restarts, 1) + + if n > 30 { + time.Sleep(time.Second * 15) + } else if n > 20 { + time.Sleep(time.Second * 10) + } else if n > 10 { + time.Sleep(time.Second * 5) + } else if n > 3 { + time.Sleep(time.Second * 3) + } + } +} -- Gitee