代码拉取完成,页面将自动刷新
package rtsp
import (
"bufio"
"context"
"errors"
"io"
"log"
"net"
"sync"
"time"
"gitee.com/raininfall/router"
)
// ServerContext is the context handed to RTSP request handlers for one
// server connection. It is both a router.Context and a context.Context, and
// adds per-connection storage plus helpers for replying to the peer.
type ServerContext interface {
	router.Context
	context.Context

	// SetExtra associates val with key on this connection.
	SetExtra(key interface{}, val interface{})
	// GetExtra returns the value stored under key; the boolean reports
	// whether the key was present.
	GetExtra(key interface{}) (interface{}, bool)

	// Request returns the request attached to the context.
	Request() *Request
	// Stop shuts the connection context down.
	Stop() error

	// SendResponse queues a response for delivery to the peer.
	SendResponse(response *Response) error
	// SendResponseError queues a response derived from err.
	SendResponseError(err error) error
	// SendBytes queues raw buffers for delivery to the peer.
	SendBytes(bufs net.Buffers) error
	// TrySendBytes is the non-blocking counterpart of SendBytes.
	TrySendBytes(bufs net.Buffers) error

	// ContextServer returns the ServerContext view of the context.
	ContextServer() ServerContext

	// Do hands fn to the connection's internal loop.
	Do(fn func()) error

	// Quick responses for common status codes.
	NotFound()
	InternalServerError()
	BadRequest(body []byte)
	BadGateway(body []byte)
}
// serverContext is the ServerContext implementation used for a single
// connection handled by Server.handleConnection.
type serverContext struct {
	// Context is the router context of the request being dispatched; the
	// middleware installed by NewServer overwrites it before each handler.
	router.Context

	// ctx is the connection's cancellable context and cancel stops it.
	ctx    context.Context
	cancel context.CancelFunc

	// sendQueue carries outbound buffers to sendLoop, which writes them to
	// writer.
	sendQueue chan net.Buffers
	writer    io.WriteCloser

	// request is the request currently being served.
	request *Request

	// extra holds the key/value pairs managed by SetExtra and GetExtra.
	extra map[interface{}]interface{}

	// loopChannel carries the closures submitted through Do to loop.
	loopChannel chan func()
}
// ContextServer returns the receiver itself, typed as a ServerContext.
func (c *serverContext) ContextServer() ServerContext {
	var self ServerContext = c
	return self
}
// SetExtra records val under key in the context's extra map, replacing any
// value previously stored for the same key.
func (c *serverContext) SetExtra(key interface{}, val interface{}) {
	store := c.extra
	store[key] = val
}
// GetExtra looks key up in the context's extra map. It returns the stored
// value and true when the key exists, or nil and false when it does not.
func (c *serverContext) GetExtra(key interface{}) (interface{}, bool) {
	if stored, found := c.extra[key]; found {
		return stored, true
	}
	return nil, false
}
// Deadline reports the deadline of the connection's underlying context, as
// required by context.Context.
func (c *serverContext) Deadline() (deadline time.Time, ok bool) {
	deadline, ok = c.ctx.Deadline()
	return deadline, ok
}
// Done returns the done channel of the connection's underlying context, as
// required by context.Context.
func (c *serverContext) Done() <-chan struct{} {
	stopped := c.ctx.Done()
	return stopped
}
// Err returns the error of the connection's underlying context, as required
// by context.Context.
func (c *serverContext) Err() error {
	cause := c.ctx.Err()
	return cause
}
// Value looks key up in the connection's underlying context, as required by
// context.Context. It does not consult the extra map used by GetExtra.
func (c *serverContext) Value(key interface{}) interface{} {
	found := c.ctx.Value(key)
	return found
}
// Request returns the request currently attached to the context.
func (c *serverContext) Request() *Request {
	current := c.request
	return current
}
// Path returns the URL path of the current request.
func (c *serverContext) Path() string {
	target := c.request.URL
	return target.Path
}
// Method returns the method of the current request.
func (c *serverContext) Method() string {
	current := c.request
	return current.Method
}
// Stop cancels the connection's context. It always returns nil.
func (c *serverContext) Stop() error {
	stop := c.cancel
	stop()
	return nil
}
// Do submits do to loopChannel so that it is executed by the connection's
// loop goroutine. It blocks until the closure has been queued or the
// connection context is cancelled; in the latter case an error is returned
// and the closure is not queued.
func (c *serverContext) Do(do func()) error {
	stopped := c.ctx.Done()
	select {
	case c.loopChannel <- do:
		return nil
	case <-stopped:
		return errors.New("Already stop")
	}
}
// NotFound answers the current request with StatusNotFound.
func (c *serverContext) NotFound() {
	reply := NewResponseOf(c.request)
	reply.StatusCode = StatusNotFound
	// Best effort: the result of queuing the reply is not reported.
	_ = c.SendResponse(reply)
}
// InternalServerError answers the current request with
// StatusInternalServerError.
func (c *serverContext) InternalServerError() {
	reply := NewResponseOf(c.request)
	reply.StatusCode = StatusInternalServerError
	// Best effort: the result of queuing the reply is not reported.
	_ = c.SendResponse(reply)
}
// BadRequest answers the current request with StatusBadRequest, using body
// as the response body.
func (c *serverContext) BadRequest(body []byte) {
	reply := NewResponseOf(c.request)
	reply.StatusCode, reply.Body = StatusBadRequest, body
	// Best effort: the result of queuing the reply is not reported.
	_ = c.SendResponse(reply)
}
// BadGateway answers the current request with StatusBadGateway, using body
// as the response body.
func (c *serverContext) BadGateway(body []byte) {
	reply := NewResponseOf(c.request)
	reply.StatusCode, reply.Body = StatusBadGateway, body
	// Best effort: the result of queuing the reply is not reported.
	_ = c.SendResponse(reply)
}
// SendResponse serialises response with its Bytes method and queues the
// result through SendBytes, returning whatever SendBytes returns.
func (c *serverContext) SendResponse(response *Response) error {
	payload := [][]byte{response.Bytes()}
	return c.SendBytes(payload)
}
// SendResponseError turns err into a response to the current request and
// queues it. A *StatusError yields StatusBadRequest with the error's body;
// any other error yields StatusInternalServerError with err.Error() as the
// body. The returned error is the one reported by SendResponse.
func (c *serverContext) SendResponseError(err error) error {
	reply := NewResponseOf(c.request)
	switch cause := err.(type) {
	case *StatusError:
		reply.StatusCode = StatusBadRequest
		reply.Body = []byte(cause.body)
	default:
		reply.StatusCode = StatusInternalServerError
		reply.Body = []byte(err.Error())
	}
	return c.SendResponse(reply)
}
// SendBytes places bufs on the send queue. It blocks until the queue accepts
// the buffers or the connection context is cancelled; in the latter case an
// error is returned.
func (c *serverContext) SendBytes(bufs net.Buffers) error {
	stopped := c.ctx.Done()
	select {
	case c.sendQueue <- bufs:
		return nil
	case <-stopped:
		return errors.New("RTSP server session stopped")
	}
}
// TrySendBytes attempts to place bufs on the send queue without blocking.
// It returns an error when the connection context is cancelled and
// ErrorQueueFull when the queue cannot take the buffers right now.
func (c *serverContext) TrySendBytes(bufs net.Buffers) error {
	stopped := c.ctx.Done()
	select {
	case c.sendQueue <- bufs:
		return nil
	case <-stopped:
		return errors.New("RTSP server session stopped")
	default:
	}
	return ErrorQueueFull
}
// loop runs the closures submitted through Do, one at a time, until the
// connection context is cancelled.
func (c *serverContext) loop() {
	stopped := c.ctx.Done()
	for {
		select {
		case task := <-c.loopChannel:
			task()
		case <-stopped:
			return
		}
	}
}
// sendLoop writes queued buffers to the connection's writer.
//
// It runs in two phases: while the connection context is live it writes
// every buffer taken from sendQueue; once the context is cancelled it writes
// whatever is still queued and then stops. A write error is logged and ends
// the loop immediately. On exit the writer is closed and the context is
// stopped.
func (c *serverContext) sendLoop() {
	defer c.Stop()
	defer c.writer.Close()

	stopped := c.ctx.Done()

serving:
	for {
		select {
		case <-stopped:
			break serving
		case pending := <-c.sendQueue:
			if _, err := pending.WriteTo(c.writer); err != nil {
				log.Println(err)
				return
			}
		}
	}

	// Drain phase: flush what was queued before cancellation, without
	// waiting for new buffers.
	for {
		select {
		case pending := <-c.sendQueue:
			if _, err := pending.WriteTo(c.writer); err != nil {
				log.Println(err)
				return
			}
		default:
			return
		}
	}
}
// Server is an RTSP server. It embeds a context.Context and a
// *router.Router, so the methods of both are available on Server directly.
type Server struct {
	context.Context
	*router.Router

	// cancel cancels the embedded Context; Stop calls it.
	cancel context.CancelFunc

	// wait tracks the accept workers; Start blocks on it.
	wait sync.WaitGroup

	// Options is the configuration passed to NewServer.
	Options *ServerOptions

	// listener is the TCP listener opened by Start.
	listener *net.TCPListener
}
// ServerOptions holds the configuration of a Server.
type ServerOptions struct {
	// WorkerNumber is how many accept workers Server.Start launches.
	WorkerNumber int
}
// DefaultServerOptions returns a freshly allocated ServerOptions with
// WorkerNumber set to 1024.
func DefaultServerOptions() *ServerOptions {
	options := new(ServerOptions)
	options.WorkerNumber = 1024
	return options
}
// NewServer builds a Server with a fresh cancellable context, a new router
// and the given options. The options pointer is stored as-is.
//
// A middleware is installed on the router that recovers the *serverContext
// from the router context's input, attaches the router context to it, and
// passes the resulting ServerContext on to the next handler.
func NewServer(options *ServerOptions) *Server {
	ctx, cancel := context.WithCancel(context.Background())
	server := &Server{
		Context: ctx,
		cancel:  cancel,
		Router:  router.New(),
		Options: options,
	}

	bind := func(next router.HandlerFunc) router.HandlerFunc {
		return func(rc router.Context) {
			sc := rc.ContextInput().(*serverContext)
			sc.Context = rc
			next(sc.ContextServer())
		}
	}
	server.Router.Use(bind)

	return server
}
// Start resolves addr, opens a TCP listener on it, launches the accept
// workers and blocks until every worker has returned.
//
// It returns an error when addr cannot be resolved or the listener cannot
// be opened; otherwise it returns nil once all workers have exited. When
// Options is nil the values from DefaultServerOptions are used.
func (s *Server) Start(addr string) error {
	address, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return err
	}
	s.listener, err = net.ListenTCP("tcp", address)
	if err != nil {
		return err
	}

	// Options is an exported pointer and may be nil; fall back to the
	// defaults instead of dereferencing nil.
	options := s.Options
	if options == nil {
		options = DefaultServerOptions()
	}

	for i := 0; i < options.WorkerNumber; i++ {
		// Register the worker before its goroutine starts. If the counter
		// were only incremented inside the goroutine, Wait below could see
		// a zero counter and return while workers are still starting.
		s.wait.Add(1)
		go func() {
			defer s.wait.Done()
			s.work()
		}()
	}
	s.wait.Wait()
	return nil
}
// handleConnection serves one connection: it reads requests from reader and
// dispatches each through the router, with replies written to writer.
//
// A per-connection serverContext is created as a child of the server's
// context, and two goroutines are started for it: sendLoop, which writes
// queued data to writer, and loop, which runs the request handlers. The
// function returns when reading fails or when the connection context has
// been cancelled; on return the connection context is stopped.
func (s *Server) handleConnection(reader *bufio.Reader, writer io.WriteCloser) {
	sc := &serverContext{
		sendQueue:   make(chan net.Buffers, 512),
		writer:      writer,
		extra:       make(map[interface{}]interface{}),
		loopChannel: make(chan func(), 16),
	}
	sc.ctx, sc.cancel = context.WithCancel(s)
	defer sc.Stop()

	go sc.sendLoop()
	go sc.loop()

	for {
		request, _, err := ReadRequestOrInterleaved(reader)
		if err != nil {
			log.Println(err)
			return
		}
		// NOTE(review): a nil request with a nil error is presumably an
		// interleaved data frame — confirm against ReadRequestOrInterleaved.
		if request == nil {
			continue
		}

		// Handlers run on the loop goroutine, so request is only assigned
		// from there.
		err = sc.Do(func() {
			sc.request = request
			if !s.Router.Serve(sc) {
				sc.NotFound()
			}
		})
		if err != nil {
			// Do only fails once the connection context is cancelled, so no
			// later request could be dispatched: stop reading.
			return
		}
	}
}
// work is the body of one accept worker. It registers itself with the
// server's wait group, then repeatedly accepts a TCP connection and serves
// it to completion with handleConnection before accepting the next one. A
// single buffered reader is reused across connections. The worker exits,
// after logging, on the first accept error.
func (s *Server) work() {
	s.wait.Add(1)
	defer s.wait.Done()

	const readBufferSize = 256 * 1024
	buffered := bufio.NewReaderSize(nil, readBufferSize)

	for {
		conn, acceptErr := s.listener.AcceptTCP()
		if acceptErr != nil {
			log.Println(acceptErr)
			return
		}
		buffered.Reset(conn)
		s.handleConnection(buffered, conn)
	}
}
// Stop cancels the server's context and, when a listener has been opened,
// closes it. The result of closing the listener is not reported; Stop
// always returns nil.
func (s *Server) Stop() error {
	s.cancel()
	if listener := s.listener; listener != nil {
		_ = listener.Close()
	}
	return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。