代码拉取完成,页面将自动刷新
package live
import (
"context"
"net/http"
"sync"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
)
// StreamManager keeps the set of named metric streams and owns the hub that
// WebSocket connections register with.
type StreamManager struct {
// log is the manager's own logger.
log log.Logger
// streams holds the known streams, keyed by stream name.
streams map[string]*Stream
// streamRWMutex is presumably meant to guard streams.
// NOTE(review): confirm that every accessor of streams actually takes it.
streamRWMutex *sync.RWMutex
// hub is handed to each new connection and receives its registration.
hub *hub
}
// NewStreamManager returns a StreamManager with a fresh hub, a logger named
// "stream.manager", an empty stream table and its own read/write mutex.
func NewStreamManager() *StreamManager {
	manager := new(StreamManager)

	// Fields are filled in the same order the struct literal used.
	manager.hub = newHub()
	manager.log = log.New("stream.manager")
	manager.streams = map[string]*Stream{}
	manager.streamRWMutex = new(sync.RWMutex)

	return manager
}
// Run starts the hub's loop in a background goroutine and returns immediately.
//
// ctx is passed to the hub unchanged; the goroutine ends when sm.hub.run
// returns (presumably once ctx is cancelled — confirm in hub.run).
//
// The parameter is named ctx rather than context so it no longer shadows the
// imported context package, and messages go through the manager's own logger
// like the other StreamManager methods do.
func (sm *StreamManager) Run(ctx context.Context) {
	sm.log.Info("Initializing Stream Manager")

	go func() {
		sm.hub.run(ctx)
		sm.log.Info("Stopped Stream Manager")
	}()
}
// Serve upgrades an incoming HTTP request to a WebSocket, registers the
// resulting connection with the hub and then services it.
//
// The write pump runs in its own goroutine; the read pump runs on the calling
// goroutine, so Serve does not return until readPump does. If the upgrade
// fails the error is logged and Serve returns without registering anything.
func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) {
	sm.log.Info("Upgrading to WebSocket")

	socket, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		sm.log.Error("Failed to upgrade connection to WebSocket", "error", upgradeErr)
		return
	}

	client := newConnection(socket, sm.hub)
	sm.hub.register <- client

	go client.writePump()
	client.readPump()
}
// GetStreamList returns one StreamInfo per known stream, carrying the stream's
// name. The result is a non-nil (possibly empty) list; its order follows map
// iteration and is therefore unspecified.
//
// The streams map is read under the manager's read lock so that the listing
// does not race with writers that take the write lock. The manager must have
// been created with NewStreamManager, which allocates the mutex.
func (sm *StreamManager) GetStreamList() m.StreamList {
	sm.streamRWMutex.RLock()
	defer sm.streamRWMutex.RUnlock()

	// The final length is known up front, so size the slice once.
	list := make(m.StreamList, 0, len(sm.streams))
	for _, stream := range sm.streams {
		list = append(list, &m.StreamInfo{
			Name: stream.name,
		})
	}
	return list
}
// Push delivers packet to the stream named by packet.Stream, creating that
// stream first if it does not exist yet.
//
// The lookup/creation step runs under the manager's write lock so concurrent
// callers cannot race on the streams map or create the same stream twice. The
// lock is released before the packet is forwarded, because forwarding sends on
// subscriber channels and may block.
func (sm *StreamManager) Push(packet *m.StreamPacket) {
	stream := sm.getOrCreateStream(packet.Stream)
	stream.Push(packet)
}

// getOrCreateStream returns the stream registered under name, registering a new one if needed.
func (sm *StreamManager) getOrCreateStream(name string) *Stream {
	sm.streamRWMutex.Lock()
	defer sm.streamRWMutex.Unlock()

	stream, exist := sm.streams[name]
	if !exist {
		sm.log.Info("Creating metric stream", "name", name)
		stream = NewStream(name)
		sm.streams[stream.name] = stream
	}
	return stream
}
// Stream is a named metric stream together with the connections subscribed to it.
type Stream struct {
// subscribers are the connections that receive every packet pushed to the stream.
subscribers []*connection
// name identifies the stream; StreamManager uses it as the key in its stream map.
name string
}
// NewStream returns a Stream called name with an empty, non-nil subscriber list.
func NewStream(name string) *Stream {
	stream := new(Stream)
	stream.name = name
	stream.subscribers = []*connection{}
	return stream
}
// Push encodes packet as JSON once and sends the encoded bytes to every
// subscriber of the stream.
//
// Nothing is encoded when the stream has no subscribers. If encoding fails the
// error is logged and the packet is dropped, rather than sending an empty
// payload to every subscriber.
func (s *Stream) Push(packet *m.StreamPacket) {
	if len(s.subscribers) == 0 {
		return
	}

	messageBytes, err := simplejson.NewFromAny(packet).Encode()
	if err != nil {
		// Stream carries no logger of its own, so create one on this rare error path.
		log.New("stream").Error("Failed to encode stream packet", "stream", s.name, "error", err)
		return
	}

	for _, sub := range s.subscribers {
		// TODO: skip and remove subscribers whose connection is no longer
		// registered with the hub before sending.
		// This is a plain channel send, so it may block until the
		// subscriber's send channel accepts the message.
		sub.send <- messageBytes
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。