Metrics
0
Watch 5 Star 12 Fork 4

五十风 / YTaskGoGPL-3.0

Sign up for free
Explore and code with more than 2 million developers,Free private repositories !:)
Sign up
YTask is an asynchronous task queue for handling distributed jobs in golang (go异步任务队列) spread retract

Clone or download
Cancel
Notice: Creating folder will generate an empty file .keep, because not support in Git
Loading...
README.md

YTask

YTask is an asynchronous task queue for handling distributed jobs in golang
golang异步任务/队列 框架

  • 中文文档 (Chinese document has more detailed instructions. If you know Chinese, read Chinese document)
  • En Doc
  • Github

install

go get github.com/gojuukaze/YTask/v2

architecture diagram

architecture_diagram

todo

  • save result
  • task retry
  • support amqp
  • run multi group
  • more option in TaskCtl
  • support more type

doc

Quick Start

server demo

package main

import (
	"context"
	"github.com/gojuukaze/YTask/v2"
	"os"
	"os/signal"
	"syscall"
)
// User is the sample payload type of the server demo: the appendUser worker
// takes one as an argument and returns a slice of them.
type User struct {
	Id   int
	Name string
}

// add is the demo worker registered under the name "add".
// It returns the sum of a and b.
func add(a int, b int) int {
	sum := a + b
	return sum
}

// appendUser is the demo worker registered under the name "append_user".
// The returned slice starts with user, followed by one User per index of
// ids, built from ids[i] and names[i]. names is indexed with the indices of
// ids, so it must be at least as long as ids.
func appendUser(user User, ids []int, names []string) []User {
	users := []User{user}
	for idx, id := range ids {
		users = append(users, User{Id: id, Name: names[idx]})
	}
	return users
}

// main is the server demo: it builds a server on a Redis broker and backend,
// registers the two workers in "group1", runs that group with 3 worker
// goroutines, and shuts the server down on SIGINT or SIGTERM.
func main() {
	// For the server, you do not need to set up the poolSize
	// (the last argument is left at 0).
	redisBroker := ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 0)
	redisBackend := ytask.Backend.NewRedisBackend("127.0.0.1", "6379", "", 0, 0)

	srv := ytask.Server.NewServer(
		ytask.Config.Broker(&redisBroker),
		ytask.Config.Backend(&redisBackend),
		ytask.Config.Debug(true),
		ytask.Config.StatusExpires(60*5),
		ytask.Config.ResultExpires(60*5),
	)

	// Register the worker functions under their group and worker names.
	srv.Add("group1", "add", add)
	srv.Add("group1", "append_user", appendUser)

	srv.Run("group1", 3)

	// Block until an interrupt or termination signal arrives, then shut down.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	srv.Shutdown(context.Background())
}

client demo

package main

import (
	"fmt"
	"github.com/gojuukaze/YTask/v2"
	"github.com/gojuukaze/YTask/v2/server"
	"time"
)
// User is the sample payload type of the client demo: one is sent as an
// argument of the "append_user" task, and the task result is decoded into a
// slice of them.
type User struct {
	Id   int
	Name string
}
var client server.Client


// main is the client demo: it sends the "add" and "append_user" tasks to
// "group1" and prints their results.
//
// Every error is checked where it occurs. If a task cannot be sent or its
// result cannot be fetched, the error is printed and the demo stops instead
// of reading a result that was never obtained.
func main() {
	// For the client, you need to set up the poolSize (5 here).
	broker := ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 5)
	backend := ytask.Backend.NewRedisBackend("127.0.0.1", "6379", "", 0, 5)

	ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		ytask.Config.Debug(true),
		ytask.Config.StatusExpires(60*5),
		ytask.Config.ResultExpires(60*5),
	)

	client = ser.GetClient()

	// task add
	taskId, err := client.Send("group1", "add", 123, 44)
	if err != nil {
		fmt.Println(err)
		return
	}
	// 2*time.Second is the timeout, 300*time.Millisecond the sleep time.
	result, err := client.GetResult(taskId, 2*time.Second, 300*time.Millisecond)
	if err != nil {
		fmt.Println(err)
		return
	}

	if result.IsSuccess() {
		// Read the first return value with the typed getter ...
		sum, err := result.GetInt64(0)
		if err != nil {
			fmt.Println(err)
		}
		// ... or decode it into a variable of your own.
		var sum2 int
		if err := result.Get(0, &sum2); err != nil {
			fmt.Println(err)
		}
		fmt.Println("add(123,44) =", int(sum))
	} else {
		fmt.Println("result failure")
	}

	// task append user
	taskId, err = client.Send("group1", "append_user", User{1, "aa"}, []int{322, 11}, []string{"bb", "cc"})
	if err != nil {
		fmt.Println(err)
		return
	}
	result, err = client.GetResult(taskId, 2*time.Second, 300*time.Millisecond)
	if err != nil {
		fmt.Println(err)
		return
	}
	var users []User
	if err := result.Get(0, &users); err != nil {
		fmt.Println(err)
	}
	fmt.Println(users)
}

other example

Also take a look at the example directory

cd example/v2
go run server/main.go 

go run send/main.go

usage

server

  • init
import "github.com/gojuukaze/YTask/v2"

ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		...
)

server config

Config require default code other
Broker * ytask.Config.Broker
Backend nil ytask.Config.Backend
Debug FALSE ytask.Config.Debug
StatusExpires 1day ytask.Config.StatusExpires "task status expires in ex seconds, -1:forever"
ResultExpires 1day ytask.Config.ResultExpires "task result expires in ex seconds, -1:forever"

add worker func

// group1 : group name is also the query name
// add : worker name 
// addFunc : worker func
ser.Add("group1","add",addFunc)

run and shutdown

// group1 : run group name
// 3 : number of worker goroutine
ser.Run("group1",3)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
ser.Shutdown(context.Background())

You cannot run multiple groups with the same server.

ser:=ytask.Server.NewServer(...)
ser.Run("g1",1)
// panic
ser.Run("g2",1)

Support for running multiple groups on one server is under development.

client

get client

import "github.com/gojuukaze/YTask/v2"

ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		...
)

client = ser.GetClient()

send msg

// group1 : group name
// add : worker name
// 12,33 ... : func args
// return :
//   - taskId : taskId
//   - err : error
taskId,err:=client.Send("group1","add",12,33)

// set retry count
taskId,err=client.SetTaskCtl(client.RetryCount, 5).Send("group1","add",12,33)

get result

// taskId :
// 3*time.Second : timeout
// 300*time.Millisecond : sleep time
result, _ := client.GetResult(taskId, 3*time.Second, 300*time.Millisecond)

// check that the task succeeded before reading its return values
if result.IsSuccess(){
    // get worker func return
    a,err:=result.GetInt64(0)
    b,err:=result.GetBool(1)
    
    // or
    var a int
    var b bool
    err:=result.Get(0, &a)
    err:=result.Get(1, &b)

    // or
    var a int
    var b bool
    err:=result.Gets(&a, &b)
}

Warning!!!
Although YTask provides the ability to get results, don't rely on it too heavily.
If a backend error prevents the result from being saved, YTask will not retry saving it: retrying indefinitely would keep tasks from starting or finishing.
If you particularly need task results, it is recommended that you save them yourself inside the task function.

retry

default retry count is 3

There are 2 ways to trigger a retry:

  • use panic

// add shows the first way to trigger a retry: panic inside the worker func.
func add(a, b int) {
	const reason = "xx"
	panic(reason)
}
  • use TaskCtl

// add shows the second way to trigger a retry: report the error through the
// TaskCtl passed as the worker's first parameter.
func add(ctl *controller.TaskCtl, a, b int) {
	ctl.Retry(errors.New("xx"))
}

set retry count

  • in client
client.SetTaskCtl(client.RetryCount, 5).Send("group1", "retry", 123, 44)

disable retry

  • in server
// add disables retry from the server side by setting the task's retry count
// to 0 through its TaskCtl.
func add(ctl *controller.TaskCtl, a, b int) {
	ctl.SetRetryCount(0)
}
  • in client
client.SetTaskCtl(client.RetryCount, 0).Send("group1", "retry", 123, 44)

broker

redisBroker

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 6379 : port
// "" : password
// 0 : db
// 10 : connection pool size. 
//      For server, if poolSize is 0, the pool size will be set automatically.
//      For client, you need to set up the poolSize by yourself
ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 10)

custom broker

// BrokerInterface is the set of methods a custom broker must implement.
type BrokerInterface interface {
	// Next gets a task message from the queue named queryName.
	Next(queryName string) (message.Message, error)
	// Send sends the task message msg to the queue named queryName.
	Send(queryName string, msg message.Message) error
	// Activate activates the connection.
	Activate()
	// SetPoolSize and GetPoolSize presumably set and return the connection
	// pool size — NOTE(review): confirm against the redis broker.
	SetPoolSize(int)
	GetPoolSize()int
}

backend

redisBackend

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 6379 : port
// "" : password
// 0 : db
// 10 : connection pool size. 
//      For server, if poolSize is 0, the pool size will be set automatically.
//      For client, you need to set up the poolSize by yourself
ytask.Backend.NewRedisBackend("127.0.0.1", "6379", "", 0, 10)

custom backend

// BackendInterface is the set of methods a custom backend must implement.
type BackendInterface interface {
	// SetResult stores result; exTime is presumably its expiry in seconds,
	// matching the StatusExpires/ResultExpires options — TODO confirm.
	SetResult(result message.Result, exTime int) error
	// GetResult returns the result stored under key.
	GetResult(key string) (message.Result, error)
	// Activate activates the connection.
	Activate()
	// SetPoolSize and GetPoolSize presumably set and return the connection
	// pool size — NOTE(review): confirm against the redis backend.
	SetPoolSize(int)
	GetPoolSize() int
}

support type

Supports all types that can be serialized to JSON

log

import (
"github.com/gojuukaze/YTask/v2/log"
"github.com/gojuukaze/go-watch-file")

// write to file
file,err:=watchFile.OpenWatchFile("xx.log")
if err != nil {
	panic(err)
}
log.YTaskLog.SetOutput(file)

// set level
log.YTaskLog.SetLevel(logrus.InfoLevel)

error

error type

// Error type codes. Compare an error against one of them with
// yerrors.IsEqual, e.g. yerrors.IsEqual(err, yerrors.ErrTypeNilResult).
const (
	ErrTypeEmptyQuery      = 1
	ErrTypeUnsupportedType = 2
	ErrTypeOutOfRange      = 3
	ErrTypeNilResult       = 4
	ErrTypeTimeOut         = 5
)

compare err

import 	"github.com/gojuukaze/YTask/v2/yerrors"
yerrors.IsEqual(err, yerrors.ErrTypeNilResult)

Comments ( 0 )

You need to Sign in for post a comment

Go
1
https://gitee.com/gojuukaze/YTask.git
git@gitee.com:gojuukaze/YTask.git

Help Search