1 Star 1 Fork 0

raininfall/RTSP

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
server.go 6.09 KB
一键复制 编辑 原始数据 按行查看 历史
Songrq 提交于 2019-10-24 14:59 . 2019年10月24日 14:59:30
package rtsp
import (
"bufio"
"context"
"errors"
"io"
"log"
"net"
"sync"
"time"
"gitee.com/raininfall/router"
)
// ServerContext of RTSP server connection
type ServerContext interface {
router.Context
context.Context
SetExtra(interface{}, interface{})
GetExtra(interface{}) (interface{}, bool)
Request() *Request
Stop() error
SendResponse(*Response) error
SendResponseError(error) error
SendBytes(net.Buffers) error
TrySendBytes(net.Buffers) error
ContextServer() ServerContext
// To Internal loop
Do(func()) error
// Quick response
NotFound()
InternalServerError()
BadRequest([]byte)
BadGateway([]byte)
}
type serverContext struct {
router.Context
ctx context.Context
cancel context.CancelFunc
sendQueue chan net.Buffers
writer io.WriteCloser
request *Request
extra map[interface{}]interface{}
loopChannel chan func()
}
func (c *serverContext) ContextServer() ServerContext {
return c
}
func (c *serverContext) SetExtra(key interface{}, val interface{}) {
c.extra[key] = val
}
func (c *serverContext) GetExtra(key interface{}) (interface{}, bool) {
val, ok := c.extra[key]
return val, ok
}
func (c *serverContext) Deadline() (deadline time.Time, ok bool) {
return c.ctx.Deadline()
}
func (c *serverContext) Done() <-chan struct{} {
return c.ctx.Done()
}
func (c *serverContext) Err() error {
return c.ctx.Err()
}
func (c *serverContext) Value(key interface{}) interface{} {
return c.ctx.Value(key)
}
func (c *serverContext) Request() *Request {
return c.request
}
func (c *serverContext) Path() string {
return c.request.URL.Path
}
func (c *serverContext) Method() string {
return c.request.Method
}
func (c *serverContext) Stop() error {
c.cancel()
return nil
}
func (c *serverContext) Do(do func()) error {
select {
case <-c.ctx.Done():
return errors.New("Already stop")
case c.loopChannel <- do:
return nil
}
}
func (c *serverContext) NotFound() {
response := NewResponseOf(c.request)
response.StatusCode = StatusNotFound
c.SendResponse(response)
}
func (c *serverContext) InternalServerError() {
response := NewResponseOf(c.request)
response.StatusCode = StatusInternalServerError
c.SendResponse(response)
}
func (c *serverContext) BadRequest(body []byte) {
response := NewResponseOf(c.request)
response.StatusCode = StatusBadRequest
response.Body = body
c.SendResponse(response)
}
func (c *serverContext) BadGateway(body []byte) {
response := NewResponseOf(c.request)
response.StatusCode = StatusBadGateway
response.Body = body
c.SendResponse(response)
}
func (c *serverContext) SendResponse(response *Response) error {
return c.SendBytes([][]byte{response.Bytes()})
}
func (c *serverContext) SendResponseError(err error) error {
response := NewResponseOf(c.request)
if se, ok := err.(*StatusError); ok {
response.StatusCode = StatusBadRequest
response.Body = []byte(se.body)
return c.SendResponse(response)
}
response.StatusCode = StatusInternalServerError
response.Body = []byte(err.Error())
return c.SendResponse(response)
}
func (c *serverContext) SendBytes(bufs net.Buffers) error {
select {
case <-c.ctx.Done():
return errors.New("RTSP server session stopped")
case c.sendQueue <- bufs:
return nil
}
}
func (c *serverContext) TrySendBytes(bufs net.Buffers) error {
select {
case <-c.ctx.Done():
return errors.New("RTSP server session stopped")
case c.sendQueue <- bufs:
return nil
default:
return ErrorQueueFull
}
}
func (c *serverContext) loop() {
done := c.ctx.Done()
for {
select {
case <-done:
return
case do := <-c.loopChannel:
do()
}
}
}
func (c *serverContext) sendLoop() {
defer c.Stop()
defer c.writer.Close()
ok := true
for ok {
select {
case _, ok = <-c.ctx.Done():
case bufs := <-c.sendQueue:
if _, err := bufs.WriteTo(c.writer); err != nil {
log.Println(err)
return
}
}
}
for {
select {
case bufs := <-c.sendQueue:
if _, err := bufs.WriteTo(c.writer); err != nil {
log.Println(err)
return
}
default:
return
}
}
}
// Server of RTSP
type Server struct {
context.Context
*router.Router
cancel context.CancelFunc
wait sync.WaitGroup
Options *ServerOptions
listener *net.TCPListener
}
// ServerOptions of RTSP
type ServerOptions struct {
WorkerNumber int
}
// DefaultServerOptions of RTSP
func DefaultServerOptions() *ServerOptions {
return &ServerOptions{
WorkerNumber: 1024,
}
}
// NewServer of RTSP
func NewServer(options *ServerOptions) *Server {
ctx, cancel := context.WithCancel(context.Background())
s := &Server{
Context: ctx,
cancel: cancel,
Router: router.New(),
Options: options,
}
s.Router.Use(func(next router.HandlerFunc) router.HandlerFunc {
return func(c router.Context) {
cc := c.ContextInput().(*serverContext)
cc.Context = c
next(cc.ContextServer())
}
})
return s
}
// Start the server
func (s *Server) Start(addr string) error {
address, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return err
}
s.listener, err = net.ListenTCP("tcp", address)
if err != nil {
return err
}
for i := 0; i < s.Options.WorkerNumber; i++ {
go s.work()
}
s.wait.Wait()
return nil
}
func (s *Server) handleConnection(reader *bufio.Reader, writer io.WriteCloser) {
ctx := &serverContext{
sendQueue: make(chan net.Buffers, 512),
writer: writer,
extra: make(map[interface{}]interface{}),
loopChannel: make(chan func(), 16),
}
ctx.ctx, ctx.cancel = context.WithCancel(s)
defer ctx.Stop()
go ctx.sendLoop()
go ctx.loop()
for {
request, _, err := ReadRequestOrInterleaved(reader)
if err != nil {
log.Println(err)
break
}
if request == nil {
continue
}
ctx.Do(func() {
ctx.request = request
if !s.Router.Serve(ctx) {
ctx.NotFound()
return
}
})
}
}
func (s *Server) work() {
s.wait.Add(1)
defer s.wait.Done()
reader := bufio.NewReaderSize(nil, 256*1024)
for {
conn, err := s.listener.AcceptTCP()
if nil != err {
log.Println(err)
return
}
reader.Reset(conn)
s.handleConnection(reader, conn)
}
}
// Stop the server
func (s *Server) Stop() error {
s.cancel()
if s.listener != nil {
s.listener.Close()
}
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/raininfall/rtsp.git
git@gitee.com:raininfall/rtsp.git
raininfall
rtsp
RTSP
master

搜索帮助