From bd0a80780ae135d0d1287cab38eab25fe42cf133 Mon Sep 17 00:00:00 2001 From: wsfuyibing Date: Thu, 26 Dec 2024 20:57:01 +0800 Subject: [PATCH] delay for producer --- src/client.go | 3 ++- src/consumer.go | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/client.go b/src/client.go index 100f77d..3ae1170 100644 --- a/src/client.go +++ b/src/client.go @@ -31,6 +31,7 @@ const ( defaultMessageTtl = 604800000 // 7 days (milliseconds) headerConsumedCount = "msr-consumed-count" + headerDelaySeconds = "msr-delay-seconds" headerProducedCount = "msr-produced-count" headerProducedTime = "msr-produced-time" ) @@ -226,7 +227,7 @@ func (o *client) GeneratePublishingWithPayload(payload *sr.Payload) amqp.Publish // Expiration supported. if payload.DelaySeconds > 0 { - publishing.Expiration = fmt.Sprintf(`%d`, payload.DelaySeconds*1000) + publishing.Headers[headerDelaySeconds] = payload.DelaySeconds * 1000 } // Attach counter. diff --git a/src/consumer.go b/src/consumer.go index 860fb48..06fd236 100644 --- a/src/consumer.go +++ b/src/consumer.go @@ -70,8 +70,9 @@ func (o *consumer) OnAcknowledge(ctx context.Context, delivery amqp.Delivery, re func (o *consumer) OnDelivery(conn *amqp.Connection, delivery amqp.Delivery) { var ( - header = o.getHeaders(delivery) - msg = sr.NewMessageWithHeader(header) + delayMilliseconds int64 + header = o.getHeaders(delivery) + msg = sr.NewMessageWithHeader(header) ct = time.Now() ctx = msg.Ctx @@ -105,8 +106,16 @@ func (o *consumer) OnDelivery(conn *amqp.Connection, delivery amqp.Delivery) { return } + // Delay by header. + if v, ok := delivery.Headers[headerDelaySeconds]; ok { + delete(delivery.Headers, headerDelaySeconds) + if vn, ve := strconv.ParseInt(fmt.Sprintf(`%v`, v), 10, 64); ve == nil && vn > 0 { + delayMilliseconds = vn + } + } + // Republish for delay subscription. - if delay := o.subscription.GetDelayMilliseconds(); delay > 0 { + if delay := o.subscription.GetDelayMilliseconds() + delayMilliseconds; delay > 0 { if ms := delay - msg.Delay.Milliseconds(); ms >= 10 { cts := log.NewContextForChildInfo(ctx, `[servlet=rabbitmq][%s] republish message: for="delay subscription", message=%s`, o.name, msg) dur := time.Duration(ms) * time.Millisecond -- Gitee