4 Star 6 Fork 2

Gitee 极速下载 / taskq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/vmihailenco/taskq
克隆/下载
queue.go 11.96 KB
一键复制 编辑 原始数据 按行查看 历史
Vladimir Mihailenco 提交于 2018-08-27 15:45 . Change ReserveN
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
package azsqs
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/internal"
"github.com/go-msgqueue/msgqueue/internal/msgutil"
"github.com/go-msgqueue/msgqueue/memqueue"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)
// delayUntilAttr is the name of the SQS message attribute in which addBatch
// stores an RFC 3339 timestamp for messages whose delay exceeds what a single
// SQS DelaySeconds value is allowed to express; ReserveN parses it back.
const delayUntilAttr = "MsgqueueDelayUntil"
// manager builds SQS-backed queues that all share one SQS client and one
// account id.
type manager struct {
// sqs is the client handed to every queue created by this manager.
sqs *sqs.SQS
// accountId is passed to NewQueue and ends up as the queue owner account id
// when the queue URL is looked up.
accountId string
}
// Compile-time check that *manager satisfies msgqueue.Manager.
var _ msgqueue.Manager = (*manager)(nil)
// NewQueue creates a queue configured by opt, using the manager's SQS client
// and account id.
func (m *manager) NewQueue(opt *msgqueue.Options) msgqueue.Queue {
	var queue msgqueue.Queue = NewQueue(m.sqs, m.accountId, opt)
	return queue
}
// Queues returns every queue reported by the package-level Queues function,
// converted to the msgqueue.Queue interface. The result is nil when there are
// no queues.
func (manager) Queues() []msgqueue.Queue {
	var result []msgqueue.Queue
	for _, registered := range Queues() {
		var asInterface msgqueue.Queue = registered
		result = append(result, asInterface)
	}
	return result
}
// NewManager returns a msgqueue.Manager whose queues use the given SQS client
// and account id.
func NewManager(sqs *sqs.SQS, accountId string) msgqueue.Manager {
	m := new(manager)
	m.sqs = sqs
	m.accountId = accountId
	return m
}
// Queue is a msgqueue.Queue backed by an SQS queue. Adds and deletes are first
// put on in-memory queues and then forwarded to SQS in batches.
type Queue struct {
// sqs is the client used for every SQS API call.
sqs *sqs.SQS
// accountId is sent as the queue owner account id when resolving the queue URL.
accountId string
// opt holds the options this queue was created with.
opt *msgqueue.Options
// addQueue buffers messages passed to Add; its handler feeds addBatcher.
addQueue *memqueue.Queue
// addBatcher groups buffered messages and hands them to addBatch.
addBatcher *msgqueue.Batcher
// delQueue buffers messages passed to Delete; its handler feeds delBatcher.
delQueue *memqueue.Queue
// delBatcher groups buffered deletions and hands them to deleteBatch.
delBatcher *msgqueue.Batcher
// mu guards _queueURL.
mu sync.RWMutex
// _queueURL caches the resolved queue URL; read it through queueURL().
_queueURL string
// p is the processor created on the first call to Processor().
p *msgqueue.Processor
}
// Compile-time check that *Queue satisfies msgqueue.Queue.
var _ msgqueue.Queue = (*Queue)(nil)
// NewQueue initializes opt, builds a Queue around the given SQS client and
// account id, sets up its in-memory add/delete queues and registers it via
// registerQueue before returning it.
func NewQueue(sqs *sqs.SQS, accountId string, opt *msgqueue.Options) *Queue {
	opt.Init()

	queue := &Queue{
		sqs:       sqs,
		accountId: accountId,
		opt:       opt,
	}
	queue.initAddQueue()
	queue.initDelQueue()

	registerQueue(queue)
	return queue
}
// initAddQueue creates the in-memory queue that buffers outgoing messages and
// the batcher that flushes them through addBatch.
func (q *Queue) initAddQueue() {
	memOpt := &msgqueue.Options{
		Name:      "azsqs:" + q.opt.Name + ":add",
		GroupName: q.opt.GroupName,

		BufferSize: 1000,
		RetryLimit: 3,
		MinBackoff: time.Second,
		Handler:    msgqueue.HandlerFunc(q.addBatcherAdd),

		Redis: q.opt.Redis,
	}
	// When the queue has its own handler, install it (unwrapping the message
	// first) as the fallback handler of the in-memory queue.
	if q.opt.Handler != nil {
		memOpt.FallbackHandler = msgutil.UnwrapMessageHandler(
			msgqueue.NewHandler(q.opt.Handler, q.opt.Compress),
		)
	}

	batcherOpt := &msgqueue.BatcherOptions{
		Handler:     q.addBatch,
		ShouldBatch: q.shouldBatchAdd,
	}

	q.addQueue = memqueue.NewQueue(memOpt)
	q.addBatcher = msgqueue.NewBatcher(q.addQueue.Processor(), batcherOpt)
}
// initDelQueue creates the in-memory queue that buffers deletions and the
// batcher that flushes them through deleteBatch.
func (q *Queue) initDelQueue() {
	memOpt := &msgqueue.Options{
		Name:      "azsqs:" + q.opt.Name + ":delete",
		GroupName: q.opt.GroupName,

		BufferSize: 1000,
		RetryLimit: 3,
		MinBackoff: time.Second,
		Handler:    msgqueue.HandlerFunc(q.delBatcherAdd),

		Redis: q.opt.Redis,
	}
	batcherOpt := &msgqueue.BatcherOptions{
		Handler:     q.deleteBatch,
		ShouldBatch: q.shouldBatchDelete,
	}

	q.delQueue = memqueue.NewQueue(memOpt)
	q.delBatcher = msgqueue.NewBatcher(q.delQueue.Processor(), batcherOpt)
}
// Name returns the queue name taken from the queue options.
func (q *Queue) Name() string {
	opt := q.opt
	return opt.Name
}
// String returns a short human-readable description of the queue.
func (q *Queue) String() string {
	name := q.Name()
	return fmt.Sprintf("Queue<Name=%s>", name)
}
// Options returns the options the queue was created with. The pointer is
// shared with the queue, not copied.
func (q *Queue) Options() *msgqueue.Options {
	opt := q.opt
	return opt
}
// GetAddQueue returns the in-memory queue that buffers messages before they
// are sent to SQS.
func (q *Queue) GetAddQueue() *memqueue.Queue {
	adds := q.addQueue
	return adds
}
// GetDeleteQueue returns the in-memory queue that buffers deletions before
// they are sent to SQS.
func (q *Queue) GetDeleteQueue() *memqueue.Queue {
	deletes := q.delQueue
	return deletes
}
// Len returns the value of the queue's ApproximateNumberOfMessages attribute
// as reported by SQS.
//
// It returns an error when the SQS call fails, when the attribute is absent
// from the response, or when its value is not a valid integer.
func (q *Queue) Len() (int, error) {
	const attrName = "ApproximateNumberOfMessages"

	params := &sqs.GetQueueAttributesInput{
		QueueUrl:       aws.String(q.queueURL()),
		AttributeNames: []*string{aws.String(attrName)},
	}
	resp, err := q.sqs.GetQueueAttributes(params)
	if err != nil {
		return 0, err
	}

	// A missing map entry yields a nil *string; dereferencing it would panic.
	prop := resp.Attributes[attrName]
	if prop == nil {
		return 0, errors.New("azsqs: " + attrName + " attribute is missing from the response")
	}
	return strconv.Atoi(*prop)
}
// Processor returns the queue's processor, creating it from the queue options
// on the first call and reusing it afterwards.
func (q *Queue) Processor() *msgqueue.Processor {
	if q.p != nil {
		return q.p
	}
	q.p = msgqueue.NewProcessor(q, q.opt)
	return q.p
}
// Add wraps msg and puts it on the in-memory add queue, from which it is
// later sent to SQS in a batch.
func (q *Queue) Add(msg *msgqueue.Message) error {
	wrapped := msgutil.WrapMessage(msg)
	return q.addQueue.Add(wrapped)
}
// Call builds a message from args and adds it to the queue.
func (q *Queue) Call(args ...interface{}) error {
	return q.Add(msgqueue.NewMessage(args...))
}
// CallOnce works like Call, but it returns ErrDuplicate if a message with the
// same args was already added within period. The message is tagged through
// SetDelayName(period, args...) before being added.
func (q *Queue) CallOnce(period time.Duration, args ...interface{}) error {
	once := msgqueue.NewMessage(args...)
	once.SetDelayName(period, args...)
	return q.Add(once)
}
// queueURL returns the URL of the SQS queue, resolving and caching it on
// first use. It returns an empty string when the URL cannot be resolved; the
// failure is logged and the next call tries again.
func (q *Queue) queueURL() string {
	// Fast path: the URL is already cached.
	q.mu.RLock()
	queueURL := q._queueURL
	q.mu.RUnlock()
	if queueURL != "" {
		return queueURL
	}

	q.mu.Lock()
	defer q.mu.Unlock()

	// Another goroutine may have resolved the URL while this one was waiting
	// for the write lock; re-check so the SQS calls are not repeated.
	if q._queueURL != "" {
		return q._queueURL
	}

	// Creation is best effort: its failure is only logged and the lookup
	// below decides the result (presumably the queue may already exist).
	if _, err := q.createQueue(); err != nil {
		internal.Logf("azsqs: CreateQueue failed: %s", err)
	}

	queueURL, err := q.getQueueURL()
	if err != nil {
		internal.Logf("azsqs: GetQueueUrl failed: %s", err)
		return ""
	}
	q._queueURL = queueURL
	return queueURL
}
// createQueue asks SQS to create a queue named after this queue, with
// VisibilityTimeout set to the configured ReservationTimeout in whole
// seconds, and returns the URL reported by SQS.
func (q *Queue) createQueue() (string, error) {
	timeoutSecs := int(q.opt.ReservationTimeout / time.Second)

	out, err := q.sqs.CreateQueue(&sqs.CreateQueueInput{
		QueueName: aws.String(q.Name()),
		Attributes: map[string]*string{
			"VisibilityTimeout": aws.String(strconv.Itoa(timeoutSecs)),
		},
	})
	if err != nil {
		return "", err
	}
	return *out.QueueUrl, nil
}
// getQueueURL looks up the URL of the queue by name and owner account id.
func (q *Queue) getQueueURL() (string, error) {
	out, err := q.sqs.GetQueueUrl(&sqs.GetQueueUrlInput{
		QueueName:              aws.String(q.Name()),
		QueueOwnerAWSAccountId: &q.accountId,
	})
	if err != nil {
		return "", err
	}
	return *out.QueueUrl, nil
}
// ReserveN receives up to n messages from SQS and converts them to msgqueue
// messages.
//
// n is capped at 10 and waitTimeout at 20 seconds, the limits of a single SQS
// ReceiveMessage call. reservationTimeout is not used by this implementation.
//
// For each message, ReservedCount is taken from the ApproximateReceiveCount
// attribute and Delay is the time remaining until the timestamp stored in the
// delayUntilAttr message attribute (zero when absent or already past). A body
// of "_" is the placeholder addBatch writes for empty bodies and is mapped
// back to "".
//
// An error is returned when the SQS call fails or when an attribute cannot be
// parsed; in that case no messages are returned.
func (q *Queue) ReserveN(n int, reservationTimeout time.Duration, waitTimeout time.Duration) ([]*msgqueue.Message, error) {
	const (
		maxMessages = 10
		maxWait     = 20 * time.Second
	)
	if n > maxMessages {
		n = maxMessages
	}
	// SQS rejects long-poll waits above 20 seconds, so clamp instead of failing.
	if waitTimeout > maxWait {
		waitTimeout = maxWait
	}

	in := &sqs.ReceiveMessageInput{
		QueueUrl:              aws.String(q.queueURL()),
		MaxNumberOfMessages:   aws.Int64(int64(n)),
		WaitTimeSeconds:       aws.Int64(int64(waitTimeout / time.Second)),
		AttributeNames:        []*string{aws.String("ApproximateReceiveCount")},
		MessageAttributeNames: []*string{aws.String(delayUntilAttr)},
	}
	out, err := q.sqs.ReceiveMessage(in)
	if err != nil {
		return nil, err
	}

	msgs := make([]*msgqueue.Message, len(out.Messages))
	for i, sqsMsg := range out.Messages {
		var reservedCount int
		if v, ok := sqsMsg.Attributes["ApproximateReceiveCount"]; ok && v != nil {
			reservedCount, err = strconv.Atoi(*v)
			if err != nil {
				return nil, err
			}
		}

		var delay time.Duration
		// StringValue is nil when the attribute was stored with a non-string
		// data type; treat that like a missing attribute instead of panicking.
		if v, ok := sqsMsg.MessageAttributes[delayUntilAttr]; ok && v != nil && v.StringValue != nil {
			until, err := time.Parse(time.RFC3339, *v.StringValue)
			if err != nil {
				return nil, err
			}
			delay = time.Until(until)
			if delay < 0 {
				delay = 0
			}
		}

		// aws.StringValue maps a nil pointer to "" so a malformed SQS message
		// cannot crash the consumer.
		body := aws.StringValue(sqsMsg.Body)
		if body == "_" {
			body = ""
		}

		msgs[i] = &msgqueue.Message{
			Body:          body,
			Delay:         delay,
			ReservationId: aws.StringValue(sqsMsg.ReceiptHandle),
			ReservedCount: reservedCount,
		}
	}
	return msgs, nil
}
// Release makes msg visible again after msg.Delay by changing its SQS
// visibility timeout (in whole seconds).
//
// The call is attempted at most three times. It is retried only while the
// error text contains "Please try again". From the second attempt on, an
// error containing "Message does not exist" is treated as success. Any other
// error, or the last error after three attempts, is returned.
func (q *Queue) Release(msg *msgqueue.Message) error {
	const maxAttempts = 3

	in := &sqs.ChangeMessageVisibilityInput{
		QueueUrl:          aws.String(q.queueURL()),
		ReceiptHandle:     &msg.ReservationId,
		VisibilityTimeout: aws.Int64(int64(msg.Delay / time.Second)),
	}

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if _, lastErr = q.sqs.ChangeMessageVisibility(in); lastErr == nil {
			return nil
		}

		text := lastErr.Error()
		switch {
		case attempt > 1 && strings.Contains(text, "Message does not exist"):
			return nil
		case !strings.Contains(text, "Please try again"):
			return lastErr
		}
	}
	return lastErr
}
// Delete schedules msg for deletion: the wrapped message is put on the
// in-memory delete queue and removed from SQS later as part of a batch.
func (q *Queue) Delete(msg *msgqueue.Message) error {
	wrapped := msgutil.WrapMessage(msg)
	return q.delQueue.Add(wrapped)
}
// Purge deletes all messages from the queue using the SQS PurgeQueue API.
func (q *Queue) Purge() error {
	_, err := q.sqs.PurgeQueue(&sqs.PurgeQueueInput{
		QueueUrl: aws.String(q.queueURL()),
	})
	return err
}
// Close is CloseTimeout with a 30 second timeout.
func (q *Queue) Close() error {
	const defaultTimeout = 30 * time.Second
	return q.CloseTimeout(defaultTimeout)
}
// CloseTimeout shuts the queue down: it stops the processor (if one was
// created), then closes the add batcher, the add queue, the delete batcher
// and the delete queue, in that order. Every step runs even when an earlier
// one failed; the first error encountered is returned.
func (q *Queue) CloseTimeout(timeout time.Duration) error {
	var firstErr error
	keepFirst := func(err error) {
		if firstErr == nil && err != nil {
			firstErr = err
		}
	}

	if q.p != nil {
		keepFirst(q.p.StopTimeout(timeout))
	}

	keepFirst(q.addBatcher.Close())
	keepFirst(q.addQueue.CloseTimeout(timeout))

	keepFirst(q.delBatcher.Close())
	keepFirst(q.delQueue.CloseTimeout(timeout))

	return firstErr
}
// addBatcherAdd is the handler of the in-memory add queue; it forwards msg to
// the add batcher.
func (q *Queue) addBatcherAdd(msg *msgqueue.Message) error {
	batcher := q.addBatcher
	return batcher.Add(msg)
}
// addBatch sends msgs to SQS with a single SendMessageBatch call.
//
// Each entry id is the index of the message in msgs, which is how failed
// entries are mapped back to their message. Messages whose body cannot be
// encoded are logged and left out of the request. An empty body is replaced
// by "_" because SQS requires a non-empty body.
//
// Delays of up to 15 minutes are passed as DelaySeconds. Longer delays use
// the 15 minute maximum and additionally store the absolute due time in the
// delayUntilAttr message attribute, which ReserveN turns back into the
// remaining delay.
//
// It returns an error when msgs is empty, a message cannot be unwrapped, or
// the SQS call fails. Per-entry failures do not fail the batch: sender faults
// are only logged, other failures are recorded in the Err field of the
// corresponding message.
func (q *Queue) addBatch(msgs []*msgqueue.Message) error {
	const maxDelay = 15 * time.Minute

	if len(msgs) == 0 {
		return errors.New("azsqs: no messages to add")
	}

	in := &sqs.SendMessageBatchInput{
		QueueUrl: aws.String(q.queueURL()),
	}

	for i, msg := range msgs {
		msg, err := msgutil.UnwrapMessage(msg)
		if err != nil {
			return err
		}

		body, err := msg.EncodeBody(q.opt.Compress)
		if err != nil {
			internal.Logf("azsqs: EncodeBody failed: %s", err)
			continue
		}
		if body == "" {
			body = "_" // SQS requires body.
		}

		entry := &sqs.SendMessageBatchRequestEntry{
			Id:          aws.String(strconv.Itoa(i)),
			MessageBody: aws.String(body),
		}

		if msg.Delay <= maxDelay {
			entry.DelaySeconds = aws.Int64(int64(msg.Delay / time.Second))
		} else {
			entry.DelaySeconds = aws.Int64(int64(maxDelay / time.Second))
			// The attribute holds the absolute time at which the message is
			// due. ReserveN computes the remaining delay as (due - now), so
			// the due time must be now+Delay; subtracting maxDelay here would
			// make the message due 15 minutes too early.
			delayUntil := time.Now().Add(msg.Delay)
			entry.MessageAttributes = map[string]*sqs.MessageAttributeValue{
				delayUntilAttr: {
					DataType:    aws.String("String"),
					StringValue: aws.String(delayUntil.Format(time.RFC3339)),
				},
			}
		}

		in.Entries = append(in.Entries, entry)
	}

	out, err := q.sqs.SendMessageBatch(in)
	if err != nil {
		internal.Logf("azsqs: SendMessageBatch msgs=%d size=%d failed: %s",
			len(msgs), q.batchSize(msgs), err)
		return err
	}

	for _, entry := range out.Failed {
		if entry.SenderFault != nil && *entry.SenderFault {
			internal.Logf(
				"azsqs: SendMessageBatch failed with code=%s message=%q",
				tos(entry.Code), tos(entry.Message))
			continue
		}

		msg := findMessageById(msgs, tos(entry.Id))
		if msg != nil {
			msg.Err = fmt.Errorf("%s: %s", tos(entry.Code), tos(entry.Message))
		} else {
			internal.Logf("azsqs: can't find message with id=%s", tos(entry.Id))
		}
	}

	return nil
}
// shouldBatchAdd reports whether the batch should keep growing once msg is
// appended to it: the encoded bodies of batch plus msg must not exceed
// 250 KiB and the resulting batch must hold fewer than 10 messages.
func (q *Queue) shouldBatchAdd(batch []*msgqueue.Message, msg *msgqueue.Message) bool {
	const (
		maxBytes    = 250 * 1024
		maxMessages = 10
	)

	candidate := append(batch, msg)
	withinSize := q.batchSize(candidate) <= maxBytes
	return withinSize && len(candidate) < maxMessages
}
// batchSize returns the total length of the encoded bodies of the messages in
// batch. Messages that cannot be unwrapped or encoded are logged and counted
// as zero.
func (q *Queue) batchSize(batch []*msgqueue.Message) int {
	total := 0
	for i := range batch {
		unwrapped, err := msgutil.UnwrapMessage(batch[i])
		if err != nil {
			internal.Logf("azsqs: UnwrapMessage failed: %s", err)
			continue
		}

		encoded, err := unwrapped.EncodeBody(q.opt.Compress)
		if err != nil {
			internal.Logf("azsqs: Message.EncodeBody failed: %s", err)
			continue
		}

		total += len(encoded)
	}
	return total
}
// delBatcherAdd is the handler of the in-memory delete queue; it forwards msg
// to the delete batcher.
func (q *Queue) delBatcherAdd(msg *msgqueue.Message) error {
	batcher := q.delBatcher
	return batcher.Add(msg)
}
// deleteBatch removes msgs from SQS with a single DeleteMessageBatch call.
//
// Each entry id is the index of the message in msgs. It returns an error when
// msgs is empty, a message cannot be unwrapped, or the SQS call fails.
// Per-entry failures do not fail the batch: sender faults are only logged,
// other failures are recorded in the Err field of the corresponding message.
func (q *Queue) deleteBatch(msgs []*msgqueue.Message) error {
	if len(msgs) == 0 {
		return errors.New("azsqs: no messages to delete")
	}

	entries := make([]*sqs.DeleteMessageBatchRequestEntry, 0, len(msgs))
	for idx, wrapped := range msgs {
		unwrapped, err := msgutil.UnwrapMessage(wrapped)
		if err != nil {
			return err
		}
		entries = append(entries, &sqs.DeleteMessageBatchRequestEntry{
			Id:            aws.String(strconv.Itoa(idx)),
			ReceiptHandle: &unwrapped.ReservationId,
		})
	}

	out, err := q.sqs.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
		QueueUrl: aws.String(q.queueURL()),
		Entries:  entries,
	})
	if err != nil {
		internal.Logf("azsqs: DeleteMessageBatch failed: %s", err)
		return err
	}

	for _, failed := range out.Failed {
		if failed.SenderFault != nil && *failed.SenderFault {
			internal.Logf(
				"azsqs: DeleteMessageBatch failed with code=%s message=%q",
				tos(failed.Code), tos(failed.Message),
			)
			continue
		}

		target := findMessageById(msgs, tos(failed.Id))
		if target == nil {
			internal.Logf("azsqs: can't find message with id=%s", tos(failed.Id))
			continue
		}
		target.Err = fmt.Errorf("%s: %s", tos(failed.Code), tos(failed.Message))
	}

	return nil
}
// shouldBatchDelete reports whether the batch should keep growing once msg is
// counted in: true while the batch including msg holds fewer than 10 messages.
func (q *Queue) shouldBatchDelete(batch []*msgqueue.Message, msg *msgqueue.Message) bool {
	const maxEntries = 10
	pending := len(batch) + 1
	return pending < maxEntries
}
// findMessageById returns the message whose batch entry id is id. Entry ids
// are the decimal index of the message in msgs. It returns nil when id is not
// a number or is outside the bounds of msgs.
func findMessageById(msgs []*msgqueue.Message, id string) *msgqueue.Message {
	i, err := strconv.Atoi(id)
	if err != nil {
		return nil
	}
	// Reject negative indexes as well: "-1" parses fine and would otherwise
	// panic on the slice access.
	if i < 0 || i >= len(msgs) {
		return nil
	}
	return msgs[i]
}
// tos dereferences s for logging and error messages, returning the
// placeholder "<nil>" for a nil pointer.
func tos(s *string) string {
	if s != nil {
		return *s
	}
	return "<nil>"
}
1
https://gitee.com/mirrors/taskq.git
git@gitee.com:mirrors/taskq.git
mirrors
taskq
taskq
v1.8.2

搜索帮助