# redisqueue **Repository Path**: qp84/redisqueue ## Basic Information - **Project Name**: redisqueue - **Description**: No description available - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-02-15 - **Last Updated**: 2025-02-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # redisqueue ![Version](https://img.shields.io/badge/version-v2.1.0-green.svg) [![GoDoc](https://godoc.org/gitee.com/qp84/redisqueue?status.svg)](https://pkg.go.dev/gitee.com/qp84/redisqueue/v2?tab=doc) [![Build Status](https://travis-ci.org/robinjoseph08/redisqueue.svg?branch=master)](https://travis-ci.org/robinjoseph08/redisqueue) [![Coverage Status](https://coveralls.io/repos/github/robinjoseph08/redisqueue/badge.svg?branch=master)](https://coveralls.io/github/robinjoseph08/redisqueue?branch=master) [![Go Report Card](https://goreportcard.com/badge/gitee.com/qp84/redisqueue)](https://goreportcard.com/report/gitee.com/qp84/redisqueue) ![License](https://img.shields.io/github/license/robinjoseph08/redisqueue.svg) `redisqueue` provides a producer and consumer of a queue that uses [Redis streams](https://redis.io/topics/streams-intro). ## Features - A `Producer` struct to make enqueuing messages easy. - A `Consumer` struct to make processing messages concurrenly. - Claiming and acknowledging messages if there's no error, so that if a consumer dies while processing, the message it was working on isn't lost. This guarantees at least once delivery. - A "visibility timeout" so that if a message isn't processed in a designated time frame, it will be be processed by another consumer. - A max length on the stream so that it doesn't store the messages indefinitely and run out of memory. - Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight messages complete. - A channel that will surface any errors so you can handle them centrally. - Graceful handling of panics to avoid crashing the whole process. - A concurrency setting to control how many goroutines are spawned to process messages. - A batch size setting to limit the total messages in flight. - Support for multiple streams. ## Installation `redisqueue` requires a Go version with Modules support and uses import versioning. So please make sure to initialize a Go module before installing `redisqueue`: ```sh go mod init github.com/my/repo go get gitee.com/qp84/redisqueue/v2 ``` Import: ```go import "gitee.com/qp84/redisqueue/v2" ``` ## Example Here's an example of a producer that inserts 1000 messages into a queue: ```go package main import ( "fmt" "gitee.com/qp84/redisqueue/v2" ) func main() { p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{ StreamMaxLength: 10000, ApproximateMaxLength: true, }) if err != nil { panic(err) } for i := 0; i < 1000; i++ { err := p.Enqueue(&redisqueue.Message{ Stream: "redisqueue:test", Values: map[string]interface{}{ "index": i, }, }) if err != nil { panic(err) } if i%100 == 0 { fmt.Printf("enqueued %d\n", i) } } } ``` And here's an example of a consumer that reads the messages off of that queue: ```go package main import ( "fmt" "time" "gitee.com/qp84/redisqueue/v2" ) func main() { c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{ VisibilityTimeout: 60 * time.Second, BlockingTimeout: 5 * time.Second, ReclaimInterval: 1 * time.Second, BufferSize: 100, Concurrency: 10, }) if err != nil { panic(err) } c.Register("redisqueue:test", process) go func() { for err := range c.Errors { // handle errors accordingly fmt.Printf("err: %+v\n", err) } }() fmt.Println("starting") c.Run() fmt.Println("stopped") } func process(msg *redisqueue.Message) error { fmt.Printf("processing message: %v\n", msg.Values["index"]) return nil } ```