40 Star 146 Fork 3

Gitee 极速下载/grafana

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/grafana/grafana
克隆/下载
stream_manager.go 2.01 KB
一键复制 编辑 原始数据 按行查看 历史
package live
import (
"context"
"net/http"
"sync"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
)
type StreamManager struct {
log log.Logger
streams map[string]*Stream
streamRWMutex *sync.RWMutex
hub *hub
}
func NewStreamManager() *StreamManager {
return &StreamManager{
hub: newHub(),
log: log.New("stream.manager"),
streams: make(map[string]*Stream),
streamRWMutex: &sync.RWMutex{},
}
}
func (sm *StreamManager) Run(context context.Context) {
log.Info("Initializing Stream Manager")
go func() {
sm.hub.run(context)
log.Info("Stopped Stream Manager")
}()
}
func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) {
sm.log.Info("Upgrading to WebSocket")
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
sm.log.Error("Failed to upgrade connection to WebSocket", "error", err)
return
}
c := newConnection(ws, sm.hub)
sm.hub.register <- c
go c.writePump()
c.readPump()
}
func (s *StreamManager) GetStreamList() m.StreamList {
list := make(m.StreamList, 0)
for _, stream := range s.streams {
list = append(list, &m.StreamInfo{
Name: stream.name,
})
}
return list
}
func (s *StreamManager) Push(packet *m.StreamPacket) {
stream, exist := s.streams[packet.Stream]
if !exist {
s.log.Info("Creating metric stream", "name", packet.Stream)
stream = NewStream(packet.Stream)
s.streams[stream.name] = stream
}
stream.Push(packet)
}
type Stream struct {
subscribers []*connection
name string
}
func NewStream(name string) *Stream {
return &Stream{
subscribers: make([]*connection, 0),
name: name,
}
}
func (s *Stream) Push(packet *m.StreamPacket) {
messageBytes, _ := simplejson.NewFromAny(packet).Encode()
for _, sub := range s.subscribers {
// check if channel is open
// if _, ok := h.connections[sub]; !ok {
// delete(s.subscribers, sub)
// continue
// }
sub.send <- messageBytes
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
JavaScript
1
https://gitee.com/mirrors/grafana.git
git@gitee.com:mirrors/grafana.git
mirrors
grafana
grafana
v5.3.2

搜索帮助

0d507c66 1850385 C8b1a773 1850385