1 Star 1 Fork 1

378077287 / exchanges

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
websocket_service.go 13.62 KB
一键复制 编辑 原始数据 按行查看 历史
package binance
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/bitly/go-simplejson"
)
var (
	// baseURL is the prefix for single-stream endpoints; the Ws*Serve
	// functions in this file append "/<stream name>" or "/<listen key>" to it.
	baseURL = "wss://stream.binance.com:9443/ws"
	// baseFutureURL is the prefix used by WsFutureUserDataServe.
	baseFutureURL = "wss://fstream.binance.com/ws"
	// combinedBaseURL is the prefix for combined streams; the stream names
	// are appended after "streams=", separated by "/".
	combinedBaseURL = "wss://stream.binance.com:9443/stream?streams="
	// WebsocketTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled
	WebsocketTimeout = time.Second * 60
	// WebsocketKeepalive enables sending ping/pong messages to check the connection stability
	WebsocketKeepalive = false
)
// Bid is a single bid entry of a depth message: a price and the quantity
// at that price. Both values are kept as strings.
type Bid struct {
	Price, Quantity string
}
// Ask is a single ask entry of a depth message: a price and the quantity
// at that price. Both values are kept as strings.
type Ask struct {
	Price, Quantity string
}
// newJSON parses data with simplejson. It returns the parsed document, or a
// nil document together with the parse error when data cannot be decoded.
func newJSON(data []byte) (j *simplejson.Json, err error) {
	parsed, parseErr := simplejson.NewJson(data)
	if parseErr == nil {
		return parsed, nil
	}
	return nil, parseErr
}
// WsPartialDepthEvent define websocket partial depth book event.
// It is the value handed to a WsPartialDepthHandler by WsPartialDepthServe
// and WsCombinedPartialDepthServe.
type WsPartialDepthEvent struct {
	// Symbol is filled in by the serving function, not read from the
	// message body: WsPartialDepthServe copies its symbol argument and
	// WsCombinedPartialDepthServe derives it from the stream name.
	Symbol string
	// LastUpdateID is read from the "lastUpdateId" key.
	LastUpdateID int64 `json:"lastUpdateId"`
	// Bids holds the entries of the "bids" array, in message order.
	Bids []Bid `json:"bids"`
	// Asks holds the entries of the "asks" array, in message order.
	Asks []Ask `json:"asks"`
}
// WsPartialDepthHandler handle websocket partial depth event.
// It is invoked once per successfully parsed message with a freshly
// allocated event.
type WsPartialDepthHandler func(event *WsPartialDepthEvent)
// WsPartialDepthServe subscribes to the "<symbol>@depth<levels>" stream under
// baseURL and passes every parsed message to handler.
//
// symbol is lower-cased for the stream name but copied unchanged into the
// event's Symbol field; levels is appended verbatim after "@depth". Messages
// that are not valid JSON are reported through errHandler and dropped. The
// return values are those of wsServe.
func WsPartialDepthServe(symbol string, levels string, handler WsPartialDepthHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s@depth%s", baseURL, strings.ToLower(symbol), levels)
	onMessage := func(message []byte) {
		root, parseErr := newJSON(message)
		if parseErr != nil {
			errHandler(parseErr)
			return
		}

		bids := root.Get("bids")
		asks := root.Get("asks")
		event := &WsPartialDepthEvent{
			Symbol:       symbol,
			LastUpdateID: root.Get("lastUpdateId").MustInt64(),
			Bids:         make([]Bid, len(bids.MustArray())),
			Asks:         make([]Ask, len(asks.MustArray())),
		}
		// Each entry is a two-element array: [price, quantity].
		for i := range event.Bids {
			level := bids.GetIndex(i)
			event.Bids[i] = Bid{
				Price:    level.GetIndex(0).MustString(),
				Quantity: level.GetIndex(1).MustString(),
			}
		}
		for i := range event.Asks {
			level := asks.GetIndex(i)
			event.Asks[i] = Ask{
				Price:    level.GetIndex(0).MustString(),
				Quantity: level.GetIndex(1).MustString(),
			}
		}
		handler(event)
	}
	return wsServe(newWsConfig(endpoint), onMessage, errHandler)
}
// WsCombinedPartialDepthServe is similar to WsPartialDepthServe, but it
// serves several symbols over one combined connection.
//
// symbolLevels maps a symbol to the level suffix appended after "@depth".
// One "<symbol>@depth<levels>" stream name is built per entry (symbol
// lower-cased) and the names are joined with "/" after combinedBaseURL. Map
// iteration order is random, so the order of the streams in the URL is not
// fixed. An empty map leaves combinedBaseURL unchanged instead of cutting off
// its trailing "=".
//
// Every message is expected to have the form {"stream": ..., "data": {...}}.
// The event's Symbol is the upper-cased part of the stream name before the
// first "@". Messages that are not valid JSON are reported through
// errHandler and dropped. Missing or unexpectedly typed fields yield zero
// values rather than a panic, matching WsPartialDepthServe. The return
// values are those of wsServe.
func WsCombinedPartialDepthServe(symbolLevels map[string]string, handler WsPartialDepthHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	streams := make([]string, 0, len(symbolLevels))
	for s, l := range symbolLevels {
		streams = append(streams, fmt.Sprintf("%s@depth%s", strings.ToLower(s), l))
	}
	endpoint := combinedBaseURL + strings.Join(streams, "/")
	cfg := newWsConfig(endpoint)
	wsHandler := func(message []byte) {
		j, err := newJSON(message)
		if err != nil {
			errHandler(err)
			return
		}
		event := new(WsPartialDepthEvent)
		stream := j.Get("stream").MustString()
		// strings.Split always returns at least one element, so [0] is safe.
		event.Symbol = strings.ToUpper(strings.Split(stream, "@")[0])

		// Use the simplejson getters instead of raw type assertions so that a
		// malformed payload cannot panic inside the websocket read loop.
		data := j.Get("data")
		event.LastUpdateID = data.Get("lastUpdateId").MustInt64()

		bids := data.Get("bids")
		event.Bids = make([]Bid, len(bids.MustArray()))
		for i := range event.Bids {
			item := bids.GetIndex(i)
			event.Bids[i] = Bid{
				Price:    item.GetIndex(0).MustString(),
				Quantity: item.GetIndex(1).MustString(),
			}
		}

		asks := data.Get("asks")
		event.Asks = make([]Ask, len(asks.MustArray()))
		for i := range event.Asks {
			item := asks.GetIndex(i)
			event.Asks[i] = Ask{
				Price:    item.GetIndex(0).MustString(),
				Quantity: item.GetIndex(1).MustString(),
			}
		}
		handler(event)
	}
	return wsServe(cfg, wsHandler, errHandler)
}
// WsDepthHandler handle websocket depth event.
// WsDepthServe invokes it once per successfully parsed message with a
// freshly allocated event.
type WsDepthHandler func(event *WsDepthEvent)
// WsDepthServe subscribes to the diff depth stream "<symbol>@depth@100ms"
// under baseURL and passes every parsed message to handler.
//
// The decoder below reads the keys e, E, s, U, u, b and a, which is the
// diff-depth payload. The partial-book stream ("@depth<levels>") carries
// lastUpdateId/bids/asks instead and would leave every field of the event at
// its zero value, so the diff stream is the one subscribed to here.
//
// symbol is lower-cased to form the stream name. Messages that are not valid
// JSON are reported through errHandler and dropped. Missing fields decode to
// zero values. The return values are those of wsServe.
func WsDepthServe(symbol string, handler WsDepthHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s@depth@100ms", baseURL, strings.ToLower(symbol))
	cfg := newWsConfig(endpoint)
	wsHandler := func(message []byte) {
		j, err := newJSON(message)
		if err != nil {
			errHandler(err)
			return
		}
		event := new(WsDepthEvent)
		event.Event = j.Get("e").MustString()
		event.Time = j.Get("E").MustInt64()
		event.Symbol = j.Get("s").MustString()
		event.UpdateID = j.Get("u").MustInt64()
		event.FirstUpdateID = j.Get("U").MustInt64()

		// Each entry of "b" and "a" is a two-element array: [price, quantity].
		bids := j.Get("b")
		event.Bids = make([]Bid, len(bids.MustArray()))
		for i := range event.Bids {
			item := bids.GetIndex(i)
			event.Bids[i] = Bid{
				Price:    item.GetIndex(0).MustString(),
				Quantity: item.GetIndex(1).MustString(),
			}
		}

		asks := j.Get("a")
		event.Asks = make([]Ask, len(asks.MustArray()))
		for i := range event.Asks {
			item := asks.GetIndex(i)
			event.Asks[i] = Ask{
				Price:    item.GetIndex(0).MustString(),
				Quantity: item.GetIndex(1).MustString(),
			}
		}
		handler(event)
	}
	return wsServe(cfg, wsHandler, errHandler)
}
// WsDepthEvent define websocket depth event.
// WsDepthServe fills it field by field from the message keys named in the
// struct tags.
type WsDepthEvent struct {
	Event  string `json:"e"`
	Time   int64  `json:"E"` // NOTE(review): presumably a millisecond Unix timestamp — confirm
	Symbol string `json:"s"`
	// UpdateID ("u") and FirstUpdateID ("U") differ only by letter case in
	// the message.
	UpdateID      int64 `json:"u"`
	FirstUpdateID int64 `json:"U"`
	Bids          []Bid `json:"b"`
	Asks          []Ask `json:"a"`
}
// WsKlineHandler handle websocket kline event.
// WsKlineServe invokes it once per successfully decoded message.
type WsKlineHandler func(event *WsKlineEvent)
// WsKlineServe subscribes to the "<symbol>@kline_<interval>" stream under
// baseURL (interval like 15m, 30s) and passes each message, decoded into a
// WsKlineEvent, to handler.
//
// symbol is lower-cased to form the stream name; interval is used verbatim.
// Messages that fail to decode are reported through errHandler and are not
// delivered to handler. The return values are those of wsServe.
func WsKlineServe(symbol string, interval string, handler WsKlineHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s@kline_%s", baseURL, strings.ToLower(symbol), interval)
	onMessage := func(message []byte) {
		var kline WsKlineEvent
		if decodeErr := json.Unmarshal(message, &kline); decodeErr != nil {
			errHandler(decodeErr)
			return
		}
		handler(&kline)
	}
	return wsServe(newWsConfig(endpoint), onMessage, errHandler)
}
// WsKlineEvent define websocket kline event.
// It is decoded with encoding/json from each message received by
// WsKlineServe.
type WsKlineEvent struct {
	Event  string `json:"e"`
	Time   int64  `json:"E"` // NOTE(review): presumably a millisecond Unix timestamp — confirm
	Symbol string `json:"s"`
	// Kline is the nested object under key "k".
	Kline WsKline `json:"k"`
}
// WsKline define websocket kline.
// It is the nested "k" object of a WsKlineEvent. Prices and volumes are kept
// as the JSON strings they arrive in; IDs, timestamps and the trade count
// are int64.
type WsKline struct {
	// NOTE(review): StartTime/EndTime are presumably millisecond Unix
	// timestamps — confirm against the stream documentation.
	StartTime    int64  `json:"t"`
	EndTime      int64  `json:"T"`
	Symbol       string `json:"s"`
	Interval     string `json:"i"`
	FirstTradeID int64  `json:"f"`
	LastTradeID  int64  `json:"L"`
	Open         string `json:"o"`
	Close        string `json:"c"`
	High         string `json:"h"`
	Low          string `json:"l"`
	Volume       string `json:"v"`
	TradeNum     int64  `json:"n"`
	// IsFinal is the boolean under key "x".
	IsFinal              bool   `json:"x"`
	QuoteVolume          string `json:"q"`
	ActiveBuyVolume      string `json:"V"`
	ActiveBuyQuoteVolume string `json:"Q"`
}
// WsAggTradeHandler handle websocket aggregate trade event.
// WsAggTradeServe invokes it once per successfully decoded message.
type WsAggTradeHandler func(event *WsAggTradeEvent)
// WsAggTradeServe subscribes to the "<symbol>@aggTrade" stream under baseURL
// and passes each message, decoded into a WsAggTradeEvent, to handler.
//
// symbol is lower-cased to form the stream name. Messages that fail to decode
// are reported through errHandler and are not delivered to handler. The
// return values are those of wsServe.
//
// Nothing is written to stdout; any logging of the endpoint is left to the
// caller, as in the other Ws*Serve functions.
func WsAggTradeServe(symbol string, handler WsAggTradeHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s@aggTrade", baseURL, strings.ToLower(symbol))
	cfg := newWsConfig(endpoint)
	wsHandler := func(message []byte) {
		event := new(WsAggTradeEvent)
		if err := json.Unmarshal(message, event); err != nil {
			errHandler(err)
			return
		}
		handler(event)
	}
	return wsServe(cfg, wsHandler, errHandler)
}
// WsAggTradeEvent define websocket aggregate trade event.
// It is decoded with encoding/json from each message received by
// WsAggTradeServe. Price and Quantity are kept as JSON strings.
type WsAggTradeEvent struct {
	Event                 string `json:"e"`
	Time                  int64  `json:"E"` // NOTE(review): presumably a millisecond Unix timestamp — confirm
	Symbol                string `json:"s"`
	AggTradeID            int64  `json:"a"`
	Price                 string `json:"p"`
	Quantity              string `json:"q"`
	FirstBreakdownTradeID int64  `json:"f"`
	LastBreakdownTradeID  int64  `json:"l"`
	TradeTime             int64  `json:"T"`
	IsBuyerMaker          bool   `json:"m"`
	// encoding/json falls back to case-insensitive key matching, so without
	// a field that matches "M" exactly, the "M" key could be decoded into
	// IsBuyerMaker (tagged "m").
	Placeholder bool `json:"M"` // add this field to avoid case insensitive unmarshaling
}
// WsTradeHandler handle websocket trade event.
// WsTradeServe invokes it once per successfully decoded message.
type WsTradeHandler func(event *WsTradeEvent)
// WsTradeServe subscribes to the "<symbol>@trade" stream under baseURL and
// passes each message, decoded into a WsTradeEvent, to handler.
//
// symbol is lower-cased to form the stream name. Messages that fail to decode
// are reported through errHandler and are not delivered to handler. The
// return values are those of wsServe.
func WsTradeServe(symbol string, handler WsTradeHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s@trade", baseURL, strings.ToLower(symbol))
	onMessage := func(message []byte) {
		var trade WsTradeEvent
		if decodeErr := json.Unmarshal(message, &trade); decodeErr != nil {
			errHandler(decodeErr)
			return
		}
		handler(&trade)
	}
	return wsServe(newWsConfig(endpoint), onMessage, errHandler)
}
// WsTradeEvent define websocket trade event.
// It is decoded with encoding/json from each message received by
// WsTradeServe. Price and Quantity are kept as JSON strings.
type WsTradeEvent struct {
	Event         string `json:"e"`
	Time          int64  `json:"E"` // NOTE(review): presumably a millisecond Unix timestamp — confirm
	Symbol        string `json:"s"`
	TradeID       int64  `json:"t"`
	Price         string `json:"p"`
	Quantity      string `json:"q"`
	BuyerOrderID  int64  `json:"b"`
	SellerOrderID int64  `json:"a"`
	TradeTime     int64  `json:"T"`
	IsBuyerMaker  bool   `json:"m"`
	// encoding/json falls back to case-insensitive key matching, so without
	// a field that matches "M" exactly, the "M" key could be decoded into
	// IsBuyerMaker (tagged "m").
	Placeholder bool `json:"M"` // add this field to avoid case insensitive unmarshaling
}
// WsUserDataServe connects to "<baseURL>/<listenKey>" and hands the raw
// messages to handler; errors go to errHandler. No decoding is done here.
// The return values are those of wsServe.
func WsUserDataServe(listenKey string, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	userStreamURL := fmt.Sprintf("%s/%s", baseURL, listenKey)
	return wsServe(newWsConfig(userStreamURL), handler, errHandler)
}
// WsFutureUserDataServe connects to "<base>/<listenKey>" and hands the raw
// messages to handler; errors go to errHandler. No decoding is done here.
//
// base is baseFutureURL by default. If wsConfig is supplied, the Endpoint of
// its first element is used as the base for this call only; the package-level
// baseFutureURL is left untouched, so an override does not leak into later
// calls or race with concurrent ones. A nil first element or an empty
// Endpoint falls back to the default. Additional elements are ignored. The
// return values are those of wsServe.
func WsFutureUserDataServe(listenKey string, handler WsHandler, errHandler ErrHandler, wsConfig ...*WsConfig) (doneC, stopC chan struct{}, err error) {
	base := baseFutureURL
	if len(wsConfig) > 0 && wsConfig[0] != nil && wsConfig[0].Endpoint != "" {
		base = wsConfig[0].Endpoint
	}
	endpoint := fmt.Sprintf("%s/%s", base, listenKey)
	cfg := newWsConfig(endpoint)
	return wsServe(cfg, handler, errHandler)
}
// WsMarketStatHandler handle websocket that push single market statistics for 24hr.
// WsMarketStatServe invokes it once per successfully decoded message.
type WsMarketStatHandler func(event *WsMarketStatEvent)
// WsMarketStatServe subscribes to the "<symbol>@ticker" stream under baseURL
// (24hr statistics for a single market) and passes each message, decoded
// into a WsMarketStatEvent, to handler.
//
// symbol is lower-cased to form the stream name. Messages that fail to decode
// are reported through errHandler and are not delivered to handler. The
// return values are those of wsServe.
func WsMarketStatServe(symbol string, handler WsMarketStatHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/%s@ticker", baseURL, strings.ToLower(symbol))
	onMessage := func(message []byte) {
		stat := new(WsMarketStatEvent)
		if decodeErr := json.Unmarshal(message, stat); decodeErr != nil {
			errHandler(decodeErr)
			return
		}
		handler(stat)
	}
	return wsServe(newWsConfig(endpoint), onMessage, errHandler)
}
// WsAllMarketsStatHandler handle websocket that push all markets statistics for 24hr.
// It receives the decoded slice by value; WsAllMarketsStatServe invokes it
// once per successfully decoded message.
type WsAllMarketsStatHandler func(event WsAllMarketsStatEvent)
// WsAllMarketsStatServe subscribes to the "!ticker@arr" stream under baseURL
// (24hr statistics for all markets) and passes each message, decoded into a
// WsAllMarketsStatEvent slice, to handler.
//
// Messages that fail to decode are reported through errHandler and are not
// delivered to handler. The return values are those of wsServe.
func WsAllMarketsStatServe(handler WsAllMarketsStatHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/!ticker@arr", baseURL)
	onMessage := func(message []byte) {
		var stats WsAllMarketsStatEvent
		if decodeErr := json.Unmarshal(message, &stats); decodeErr != nil {
			errHandler(decodeErr)
			return
		}
		handler(stats)
	}
	return wsServe(newWsConfig(endpoint), onMessage, errHandler)
}
// WsAllMarketsStatEvent define array of websocket market statistics events.
// WsAllMarketsStatServe decodes one whole message (a JSON array) into it.
type WsAllMarketsStatEvent []*WsMarketStatEvent
// WsMarketStatEvent define websocket market statistics event.
// It is decoded with encoding/json by WsMarketStatServe (one object per
// message) and WsAllMarketsStatServe (as array elements). Prices, quantities
// and volumes are kept as the JSON strings they arrive in; timestamps, trade
// IDs and the count are int64. Several keys differ only by letter case
// (p/P, b/B, a/A, q/Q, c/C, o/O, l/L), so every one of them has its own
// exactly tagged field.
type WsMarketStatEvent struct {
	Event              string `json:"e"`
	Time               int64  `json:"E"` // NOTE(review): presumably a millisecond Unix timestamp — confirm
	Symbol             string `json:"s"`
	PriceChange        string `json:"p"`
	PriceChangePercent string `json:"P"`
	WeightedAvgPrice   string `json:"w"`
	PrevClosePrice     string `json:"x"`
	LastPrice          string `json:"c"`
	CloseQty           string `json:"Q"`
	BidPrice           string `json:"b"`
	BidQty             string `json:"B"`
	AskPrice           string `json:"a"`
	AskQty             string `json:"A"`
	OpenPrice          string `json:"o"`
	HighPrice          string `json:"h"`
	LowPrice           string `json:"l"`
	BaseVolume         string `json:"v"`
	QuoteVolume        string `json:"q"`
	OpenTime           int64  `json:"O"`
	CloseTime          int64  `json:"C"`
	FirstID            int64  `json:"F"`
	LastID             int64  `json:"L"`
	Count              int64  `json:"n"`
}
// WsAllMiniMarketsStatServeHandler handle websocket that push all mini-ticker market statistics for 24hr.
// It receives the decoded slice by value; WsAllMiniMarketsStatServe invokes
// it once per successfully decoded message.
type WsAllMiniMarketsStatServeHandler func(event WsAllMiniMarketsStatEvent)
// WsAllMiniMarketsStatServe subscribes to the "!miniTicker@arr" stream under
// baseURL (mini version of the 24hr statistics for all markets) and passes
// each message, decoded into a WsAllMiniMarketsStatEvent slice, to handler.
//
// Messages that fail to decode are reported through errHandler and are not
// delivered to handler. The return values are those of wsServe.
func WsAllMiniMarketsStatServe(handler WsAllMiniMarketsStatServeHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
	endpoint := fmt.Sprintf("%s/!miniTicker@arr", baseURL)
	onMessage := func(message []byte) {
		var stats WsAllMiniMarketsStatEvent
		if decodeErr := json.Unmarshal(message, &stats); decodeErr != nil {
			errHandler(decodeErr)
			return
		}
		handler(stats)
	}
	return wsServe(newWsConfig(endpoint), onMessage, errHandler)
}
// WsAllMiniMarketsStatEvent define array of websocket market mini-ticker statistics events.
// WsAllMiniMarketsStatServe decodes one whole message (a JSON array) into it.
type WsAllMiniMarketsStatEvent []*WsMiniMarketsStatEvent
// WsMiniMarketsStatEvent define websocket market mini-ticker statistics event.
// It is one element of a WsAllMiniMarketsStatEvent. Prices and volumes are
// kept as the JSON strings they arrive in.
type WsMiniMarketsStatEvent struct {
	Event       string `json:"e"`
	Time        int64  `json:"E"` // NOTE(review): presumably a millisecond Unix timestamp — confirm
	Symbol      string `json:"s"`
	LastPrice   string `json:"c"`
	OpenPrice   string `json:"o"`
	HighPrice   string `json:"h"`
	LowPrice    string `json:"l"`
	BaseVolume  string `json:"v"`
	QuoteVolume string `json:"q"`
}
Go
1
https://gitee.com/378077287/exchanges.git
git@gitee.com:378077287/exchanges.git
378077287
exchanges
exchanges
v0.0.8

搜索帮助