diff --git a/src/client.go b/src/client.go index 100f77db7cbd2bb718601fb7c1531adecd593a15..3ae11706d4061b13024611f10cf41e433718c53b 100644 --- a/src/client.go +++ b/src/client.go @@ -31,6 +31,7 @@ const ( defaultMessageTtl = 604800000 // 7 days (milliseconds) headerConsumedCount = "msr-consumed-count" + headerDelaySeconds = "msr-delay-seconds" headerProducedCount = "msr-produced-count" headerProducedTime = "msr-produced-time" ) @@ -226,7 +227,7 @@ func (o *client) GeneratePublishingWithPayload(payload *sr.Payload) amqp.Publish // Expiration supported. if payload.DelaySeconds > 0 { - publishing.Expiration = fmt.Sprintf(`%d`, payload.DelaySeconds*1000) + publishing.Headers[headerDelaySeconds] = payload.DelaySeconds * 1000 } // Attach counter. diff --git a/src/consumer.go b/src/consumer.go index 860fb48ce1c6972e57e4cc1f1240c458d1c268c6..06fd236aa52bd2b13695081f5ecfb5ed6317cb1b 100644 --- a/src/consumer.go +++ b/src/consumer.go @@ -70,8 +70,9 @@ func (o *consumer) OnAcknowledge(ctx context.Context, delivery amqp.Delivery, re func (o *consumer) OnDelivery(conn *amqp.Connection, delivery amqp.Delivery) { var ( - header = o.getHeaders(delivery) - msg = sr.NewMessageWithHeader(header) + delayMilliseconds int64 + header = o.getHeaders(delivery) + msg = sr.NewMessageWithHeader(header) ct = time.Now() ctx = msg.Ctx @@ -105,8 +106,16 @@ func (o *consumer) OnDelivery(conn *amqp.Connection, delivery amqp.Delivery) { return } + // Delay by header. + if v, ok := delivery.Headers[headerDelaySeconds]; ok { + delete(delivery.Headers, headerDelaySeconds) + if vn, ve := strconv.ParseInt(fmt.Sprintf(`%v`, v), 10, 64); ve == nil && vn > 0 { + delayMilliseconds = vn + } + } + // Republish for delay subscription. - if delay := o.subscription.GetDelayMilliseconds(); delay > 0 { + if delay := o.subscription.GetDelayMilliseconds() + delayMilliseconds; delay > 0 { if ms := delay - msg.Delay.Milliseconds(); ms >= 10 { cts := log.NewContextForChildInfo(ctx, `[servlet=rabbitmq][%s] republish message: for="delay subscription", message=%s`, o.name, msg) dur := time.Duration(ms) * time.Millisecond