1 Star 0 Fork 0

我还是我 / gortsplib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
client.go 43.14 KB
一键复制 编辑 原始数据 按行查看 历史
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906
/*
Package gortsplib is a RTSP 1.0 library for the Go programming language.
Examples are available at https://github.com/bluenviron/gortsplib/tree/main/examples
*/
package gortsplib
import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/auth"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/bytecounter"
"github.com/bluenviron/gortsplib/v4/pkg/conn"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
)
// convert an URL into an address, in particular:
// * add default port
// * handle IPv6 with or without square brackets.
// Adapted from net/http:
// https://cs.opensource.google/go/go/+/refs/tags/go1.20.5:src/net/http/transport.go;l=2747
func canonicalAddr(u *base.URL) string {
addr := u.Hostname()
port := u.Port()
if port == "" {
if u.Scheme == "rtsp" {
port = "554"
} else { // rtsps
port = "322"
}
}
return net.JoinHostPort(addr, port)
}
func isAnyPort(p int) bool {
return p == 0 || p == 1
}
func findBaseURL(sd *sdp.SessionDescription, res *base.Response, u *base.URL) (*base.URL, error) {
// use global control attribute
if control, ok := sd.Attribute("control"); ok && control != "*" {
ret, err := base.ParseURL(control)
if err != nil {
return nil, fmt.Errorf("invalid control attribute: '%v'", control)
}
// add credentials
ret.User = u.User
return ret, nil
}
// use Content-Base
if cb, ok := res.Header["Content-Base"]; ok {
if len(cb) != 1 {
return nil, fmt.Errorf("invalid Content-Base: '%v'", cb)
}
ret, err := base.ParseURL(cb[0])
if err != nil {
return nil, fmt.Errorf("invalid Content-Base: '%v'", cb)
}
// add credentials
ret.User = u.User
return ret, nil
}
// use URL of request
return u, nil
}
func prepareForAnnounce(desc *description.Session) {
for i, media := range desc.Medias {
media.Control = "trackID=" + strconv.FormatInt(int64(i), 10)
}
}
func supportsGetParameter(header base.Header) bool {
pub, ok := header["Public"]
if !ok || len(pub) != 1 {
return false
}
for _, m := range strings.Split(pub[0], ",") {
if base.Method(strings.Trim(m, " ")) == base.GetParameter {
return true
}
}
return false
}
type clientState int
const (
clientStateInitial clientState = iota
clientStatePrePlay
clientStatePlay
clientStatePreRecord
clientStateRecord
)
func (s clientState) String() string {
switch s {
case clientStateInitial:
return "initial"
case clientStatePrePlay:
return "prePlay"
case clientStatePlay:
return "play"
case clientStatePreRecord:
return "preRecord"
case clientStateRecord:
return "record"
}
return "unknown"
}
type optionsReq struct {
url *base.URL
res chan clientRes
}
type describeReq struct {
url *base.URL
res chan clientRes
}
type announceReq struct {
url *base.URL
desc *description.Session
res chan clientRes
}
type setupReq struct {
baseURL *base.URL
media *description.Media
rtpPort int
rtcpPort int
res chan clientRes
}
type playReq struct {
ra *headers.Range
res chan clientRes
}
type recordReq struct {
res chan clientRes
}
type pauseReq struct {
res chan clientRes
}
type clientRes struct {
sd *description.Session // describe only
res *base.Response
err error
}
// ClientOnRequestFunc is the prototype of Client.OnRequest.
type ClientOnRequestFunc func(*base.Request)
// ClientOnResponseFunc is the prototype of Client.OnResponse.
type ClientOnResponseFunc func(*base.Response)
// ClientOnTransportSwitchFunc is the prototype of Client.OnTransportSwitch.
type ClientOnTransportSwitchFunc func(err error)
// ClientOnPacketLostFunc is the prototype of Client.OnPacketLost.
type ClientOnPacketLostFunc func(err error)
// ClientOnDecodeErrorFunc is the prototype of Client.OnDecodeError.
type ClientOnDecodeErrorFunc func(err error)
// OnPacketRTPFunc is the prototype of the callback passed to OnPacketRTP().
type OnPacketRTPFunc func(*rtp.Packet)
// OnPacketRTPAnyFunc is the prototype of the callback passed to OnPacketRTP(Any).
type OnPacketRTPAnyFunc func(*description.Media, format.Format, *rtp.Packet)
// OnPacketRTCPFunc is the prototype of the callback passed to OnPacketRTCP().
type OnPacketRTCPFunc func(rtcp.Packet)
// OnPacketRTCPAnyFunc is the prototype of the callback passed to OnPacketRTCPAny().
type OnPacketRTCPAnyFunc func(*description.Media, rtcp.Packet)
// Client is a RTSP client.
type Client struct {
//
// RTSP parameters (all optional)
//
// timeout of read operations.
// It defaults to 10 seconds.
ReadTimeout time.Duration
// timeout of write operations.
// It defaults to 10 seconds.
WriteTimeout time.Duration
// a TLS configuration to connect to TLS (RTSPS) servers.
// It defaults to nil.
TLSConfig *tls.Config
// enable communication with servers which don't provide UDP server ports
// or use different server ports than the announced ones.
// This can be a security issue.
// It defaults to false.
AnyPortEnable bool
// transport protocol (UDP, Multicast or TCP).
// If nil, it is chosen automatically (first UDP, then, if it fails, TCP).
// It defaults to nil.
Transport *Transport
// If the client is reading with UDP, it must receive
// at least a packet within this timeout, otherwise it switches to TCP.
// It defaults to 3 seconds.
InitialUDPReadTimeout time.Duration
// Size of the queue of outgoing packets.
// It defaults to 256.
WriteQueueSize int
// maximum size of outgoing RTP / RTCP packets.
// This must be less than the UDP MTU (1472 bytes).
// It defaults to 1472.
MaxPacketSize int
// user agent header.
// It defaults to "gortsplib"
UserAgent string
// disable automatic RTCP sender reports.
DisableRTCPSenderReports bool
// explicitly request back channels to the server.
RequestBackChannels bool
// pointer to a variable that stores received bytes.
BytesReceived *uint64
// pointer to a variable that stores sent bytes.
BytesSent *uint64
//
// system functions (all optional)
//
// function used to initialize the TCP client.
// It defaults to (&net.Dialer{}).DialContext.
DialContext func(ctx context.Context, network, address string) (net.Conn, error)
// function used to initialize UDP listeners.
// It defaults to net.ListenPacket.
ListenPacket func(network, address string) (net.PacketConn, error)
//
// callbacks (all optional)
//
// called when sending a request to the server.
OnRequest ClientOnRequestFunc
// called when receiving a response from the server.
OnResponse ClientOnResponseFunc
// called when receiving a request from the server.
OnServerRequest ClientOnRequestFunc
// called when sending a response to the server.
OnServerResponse ClientOnResponseFunc
// called when the transport protocol changes.
OnTransportSwitch ClientOnTransportSwitchFunc
// called when the client detects lost packets.
OnPacketLost ClientOnPacketLostFunc
// called when a non-fatal decode error occurs.
OnDecodeError ClientOnDecodeErrorFunc
//
// private
//
timeNow func() time.Time
senderReportPeriod time.Duration
receiverReportPeriod time.Duration
checkTimeoutPeriod time.Duration
connURL *base.URL
ctx context.Context
ctxCancel func()
state clientState
nconn net.Conn
conn *conn.Conn
session string
sender *auth.Sender
cseq int
optionsSent bool
useGetParameter bool
lastDescribeURL *base.URL
baseURL *base.URL
effectiveTransport *Transport
backChannelSetupped bool
stdChannelSetupped bool
medias map[*description.Media]*clientMedia
tcpCallbackByChannel map[int]readFunc
lastRange *headers.Range
checkTimeoutTimer *time.Timer
checkTimeoutInitial bool
tcpLastFrameTime *int64
keepalivePeriod time.Duration
keepaliveTimer *time.Timer
closeError error
writer asyncProcessor
reader *clientReader
timeDecoder *rtptime.GlobalDecoder
mustClose bool
// in
chOptions chan optionsReq
chDescribe chan describeReq
chAnnounce chan announceReq
chSetup chan setupReq
chPlay chan playReq
chRecord chan recordReq
chPause chan pauseReq
chReadError chan error
chReadResponse chan *base.Response
chReadRequest chan *base.Request
// out
done chan struct{}
}
// Start initializes the connection to a server.
func (c *Client) Start(scheme string, host string) error {
// RTSP parameters
if c.ReadTimeout == 0 {
c.ReadTimeout = 10 * time.Second
}
if c.WriteTimeout == 0 {
c.WriteTimeout = 10 * time.Second
}
if c.InitialUDPReadTimeout == 0 {
c.InitialUDPReadTimeout = 3 * time.Second
}
if c.WriteQueueSize == 0 {
c.WriteQueueSize = 256
} else if (c.WriteQueueSize & (c.WriteQueueSize - 1)) != 0 {
return fmt.Errorf("WriteQueueSize must be a power of two")
}
if c.MaxPacketSize == 0 {
c.MaxPacketSize = udpMaxPayloadSize
} else if c.MaxPacketSize > udpMaxPayloadSize {
return fmt.Errorf("MaxPacketSize must be less than %d", udpMaxPayloadSize)
}
if c.UserAgent == "" {
c.UserAgent = "gortsplib"
}
if c.BytesReceived == nil {
c.BytesReceived = new(uint64)
}
if c.BytesSent == nil {
c.BytesSent = new(uint64)
}
// system functions
if c.DialContext == nil {
c.DialContext = (&net.Dialer{}).DialContext
}
if c.ListenPacket == nil {
c.ListenPacket = net.ListenPacket
}
// callbacks
if c.OnRequest == nil {
c.OnRequest = func(*base.Request) {
}
}
if c.OnResponse == nil {
c.OnResponse = func(*base.Response) {
}
}
if c.OnServerRequest == nil {
c.OnServerRequest = func(*base.Request) {
}
}
if c.OnServerResponse == nil {
c.OnServerResponse = func(*base.Response) {
}
}
if c.OnTransportSwitch == nil {
c.OnTransportSwitch = func(err error) {
log.Println(err.Error())
}
}
if c.OnPacketLost == nil {
c.OnPacketLost = func(err error) {
log.Println(err.Error())
}
}
if c.OnDecodeError == nil {
c.OnDecodeError = func(err error) {
log.Println(err.Error())
}
}
// private
if c.timeNow == nil {
c.timeNow = time.Now
}
if c.senderReportPeriod == 0 {
c.senderReportPeriod = 10 * time.Second
}
if c.receiverReportPeriod == 0 {
// some cameras require a maximum of 5secs between keepalives
c.receiverReportPeriod = 5 * time.Second
}
if c.checkTimeoutPeriod == 0 {
c.checkTimeoutPeriod = 1 * time.Second
}
ctx, ctxCancel := context.WithCancel(context.Background())
c.connURL = &base.URL{
Scheme: scheme,
Host: host,
}
c.ctx = ctx
c.ctxCancel = ctxCancel
c.checkTimeoutTimer = emptyTimer()
c.keepalivePeriod = 30 * time.Second
c.keepaliveTimer = emptyTimer()
c.chOptions = make(chan optionsReq)
c.chDescribe = make(chan describeReq)
c.chAnnounce = make(chan announceReq)
c.chSetup = make(chan setupReq)
c.chPlay = make(chan playReq)
c.chRecord = make(chan recordReq)
c.chPause = make(chan pauseReq)
c.chReadError = make(chan error)
c.chReadResponse = make(chan *base.Response)
c.chReadRequest = make(chan *base.Request)
c.done = make(chan struct{})
go c.run()
return nil
}
// StartRecording connects to the address and starts publishing given media.
func (c *Client) StartRecording(address string, desc *description.Session) error {
u, err := base.ParseURL(address)
if err != nil {
return err
}
err = c.Start(u.Scheme, u.Host)
if err != nil {
return err
}
_, err = c.Announce(u, desc)
if err != nil {
c.Close()
return err
}
err = c.SetupAll(u, desc.Medias)
if err != nil {
c.Close()
return err
}
_, err = c.Record()
if err != nil {
c.Close()
return err
}
return nil
}
// Close closes all client resources and waits for them to close.
func (c *Client) Close() {
c.ctxCancel()
<-c.done
}
// Wait waits until all client resources are closed.
// This can happen when a fatal error occurs or when Close() is called.
func (c *Client) Wait() error {
<-c.done
return c.closeError
}
func (c *Client) run() {
defer close(c.done)
c.closeError = c.runInner()
c.ctxCancel()
c.doClose()
}
func (c *Client) runInner() error {
for {
select {
case req := <-c.chOptions:
res, err := c.doOptions(req.url)
req.res <- clientRes{res: res, err: err}
if c.mustClose {
return err
}
case req := <-c.chDescribe:
sd, res, err := c.doDescribe(req.url)
req.res <- clientRes{sd: sd, res: res, err: err}
if c.mustClose {
return err
}
case req := <-c.chAnnounce:
res, err := c.doAnnounce(req.url, req.desc)
req.res <- clientRes{res: res, err: err}
if c.mustClose {
return err
}
case req := <-c.chSetup:
res, err := c.doSetup(req.baseURL, req.media, req.rtpPort, req.rtcpPort)
req.res <- clientRes{res: res, err: err}
if c.mustClose {
return err
}
case req := <-c.chPlay:
res, err := c.doPlay(req.ra)
req.res <- clientRes{res: res, err: err}
if c.mustClose {
return err
}
case req := <-c.chRecord:
res, err := c.doRecord()
req.res <- clientRes{res: res, err: err}
if c.mustClose {
return err
}
case req := <-c.chPause:
res, err := c.doPause()
req.res <- clientRes{res: res, err: err}
if c.mustClose {
return err
}
case <-c.checkTimeoutTimer.C:
err := c.doCheckTimeout()
if err != nil {
return err
}
c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
case <-c.keepaliveTimer.C:
err := c.doKeepAlive()
if err != nil {
return err
}
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
case err := <-c.chReadError:
c.reader = nil
return err
case res := <-c.chReadResponse:
c.OnResponse(res)
// these are responses to keepalives, ignore them.
case req := <-c.chReadRequest:
err := c.handleServerRequest(req)
if err != nil {
return err
}
case <-c.ctx.Done():
return liberrors.ErrClientTerminated{}
}
}
}
func (c *Client) waitResponse(requestCseqStr string) (*base.Response, error) {
t := time.NewTimer(c.ReadTimeout)
defer t.Stop()
for {
select {
case <-t.C:
return nil, liberrors.ErrClientRequestTimedOut{}
case err := <-c.chReadError:
c.reader = nil
return nil, err
case res := <-c.chReadResponse:
c.OnResponse(res)
// accept response if CSeq equals request CSeq, or if CSeq is not present
if cseq, ok := res.Header["CSeq"]; !ok || len(cseq) != 1 || strings.TrimSpace(cseq[0]) == requestCseqStr {
return res, nil
}
case req := <-c.chReadRequest:
err := c.handleServerRequest(req)
if err != nil {
return nil, err
}
case <-c.ctx.Done():
return nil, liberrors.ErrClientTerminated{}
}
}
}
func (c *Client) handleServerRequest(req *base.Request) error {
c.OnServerRequest(req)
if req.Method != base.Options {
return liberrors.ErrClientUnhandledMethod{Method: req.Method}
}
h := base.Header{
"User-Agent": base.HeaderValue{c.UserAgent},
}
if cseq, ok := req.Header["CSeq"]; ok {
h["CSeq"] = cseq
}
res := &base.Response{
StatusCode: base.StatusOK,
Header: h,
}
c.OnServerResponse(res)
c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
return c.conn.WriteResponse(res)
}
func (c *Client) doClose() {
if c.state == clientStatePlay || c.state == clientStateRecord {
c.stopWriter()
c.stopReadRoutines()
}
if c.nconn != nil && c.baseURL != nil {
header := base.Header{}
if c.backChannelSetupped {
header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"}
}
c.do(&base.Request{ //nolint:errcheck
Method: base.Teardown,
URL: c.baseURL,
Header: header,
}, true)
}
if c.reader != nil {
c.nconn.Close()
c.reader.wait()
c.reader = nil
c.nconn = nil
c.conn = nil
} else if c.nconn != nil {
c.nconn.Close()
c.nconn = nil
c.conn = nil
}
for _, cm := range c.medias {
cm.close()
}
}
func (c *Client) reset() {
c.doClose()
c.state = clientStateInitial
c.session = ""
c.sender = nil
c.cseq = 0
c.optionsSent = false
c.useGetParameter = false
c.baseURL = nil
c.effectiveTransport = nil
c.backChannelSetupped = false
c.stdChannelSetupped = false
c.medias = nil
c.tcpCallbackByChannel = nil
}
func (c *Client) checkState(allowed map[clientState]struct{}) error {
if _, ok := allowed[c.state]; ok {
return nil
}
allowedList := make([]fmt.Stringer, len(allowed))
i := 0
for a := range allowed {
allowedList[i] = a
i++
}
return liberrors.ErrClientInvalidState{AllowedList: allowedList, State: c.state}
}
func (c *Client) trySwitchingProtocol() error {
c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP{})
prevConnURL := c.connURL
prevBaseURL := c.baseURL
prevMedias := c.medias
c.reset()
v := TransportTCP
c.effectiveTransport = &v
c.connURL = prevConnURL
// some Hikvision cameras require a describe before a setup
_, _, err := c.doDescribe(c.lastDescribeURL)
if err != nil {
return err
}
for i, cm := range prevMedias {
_, err := c.doSetup(prevBaseURL, cm.media, 0, 0)
if err != nil {
return err
}
c.medias[i].onPacketRTCP = cm.onPacketRTCP
for j, tr := range cm.formats {
c.medias[i].formats[j].onPacketRTP = tr.onPacketRTP
}
}
_, err = c.doPlay(c.lastRange)
if err != nil {
return err
}
return nil
}
func (c *Client) trySwitchingProtocol2(medi *description.Media, baseURL *base.URL) (*base.Response, error) {
c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP2{})
prevConnURL := c.connURL
c.reset()
v := TransportTCP
c.effectiveTransport = &v
c.connURL = prevConnURL
// some Hikvision cameras require a describe before a setup
_, _, err := c.doDescribe(c.lastDescribeURL)
if err != nil {
return nil, err
}
return c.doSetup(baseURL, medi, 0, 0)
}
func (c *Client) startReadRoutines() {
// allocate writer here because it's needed by RTCP receiver / sender
if c.state == clientStateRecord || c.backChannelSetupped {
c.writer.allocateBuffer(c.WriteQueueSize)
} else {
// when reading, buffer is only used to send RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers.
c.writer.allocateBuffer(8)
}
c.timeDecoder = rtptime.NewGlobalDecoder()
for _, cm := range c.medias {
cm.start()
}
if c.state == clientStatePlay && c.stdChannelSetupped {
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
switch *c.effectiveTransport {
case TransportUDP:
c.checkTimeoutTimer = time.NewTimer(c.InitialUDPReadTimeout)
c.checkTimeoutInitial = true
case TransportUDPMulticast:
c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
default: // TCP
c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
v := c.timeNow().Unix()
c.tcpLastFrameTime = &v
}
}
if *c.effectiveTransport == TransportTCP {
c.reader.setAllowInterleavedFrames(true)
}
}
func (c *Client) stopReadRoutines() {
if c.reader != nil {
c.reader.setAllowInterleavedFrames(false)
}
c.checkTimeoutTimer = emptyTimer()
c.keepaliveTimer = emptyTimer()
for _, cm := range c.medias {
cm.stop()
}
c.timeDecoder = nil
}
func (c *Client) startWriter() {
c.writer.start()
}
func (c *Client) stopWriter() {
c.writer.stop()
}
func (c *Client) connOpen() error {
if c.nconn != nil {
return nil
}
if c.connURL.Scheme != "rtsp" && c.connURL.Scheme != "rtsps" {
return liberrors.ErrClientUnsupportedScheme{Scheme: c.connURL.Scheme}
}
if c.connURL.Scheme == "rtsps" && c.Transport != nil && *c.Transport != TransportTCP {
return liberrors.ErrClientRTSPSTCP{}
}
dialCtx, dialCtxCancel := context.WithTimeout(c.ctx, c.ReadTimeout)
defer dialCtxCancel()
nconn, err := c.DialContext(dialCtx, "tcp", canonicalAddr(c.connURL))
if err != nil {
return err
}
if c.connURL.Scheme == "rtsps" {
tlsConfig := c.TLSConfig
if tlsConfig == nil {
tlsConfig = &tls.Config{}
}
tlsConfig.ServerName = c.connURL.Hostname()
nconn = tls.Client(nconn, tlsConfig)
}
c.nconn = nconn
bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent)
c.conn = conn.NewConn(bc)
c.reader = &clientReader{
c: c,
}
c.reader.start()
return nil
}
func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error) {
if !c.optionsSent && req.Method != base.Options {
_, err := c.doOptions(req.URL)
if err != nil {
return nil, err
}
}
if req.Header == nil {
req.Header = make(base.Header)
}
if c.session != "" {
req.Header["Session"] = base.HeaderValue{c.session}
}
c.cseq++
cseqStr := strconv.FormatInt(int64(c.cseq), 10)
req.Header["CSeq"] = base.HeaderValue{cseqStr}
req.Header["User-Agent"] = base.HeaderValue{c.UserAgent}
if c.sender != nil {
c.sender.AddAuthorization(req)
}
c.OnRequest(req)
c.nconn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
err := c.conn.WriteRequest(req)
if err != nil {
return nil, err
}
if skipResponse {
return nil, nil
}
res, err := c.waitResponse(cseqStr)
if err != nil {
c.mustClose = true
return nil, err
}
// get session from response
if v, ok := res.Header["Session"]; ok {
var sx headers.Session
err := sx.Unmarshal(v)
if err != nil {
return nil, liberrors.ErrClientSessionHeaderInvalid{Err: err}
}
c.session = sx.Session
if sx.Timeout != nil && *sx.Timeout > 0 {
c.keepalivePeriod = time.Duration(*sx.Timeout) * time.Second * 8 / 10
}
}
// send request again with authentication
if res.StatusCode == base.StatusUnauthorized && req.URL.User != nil && c.sender == nil {
pass, _ := req.URL.User.Password()
user := req.URL.User.Username()
sender, err := auth.NewSender(res.Header["WWW-Authenticate"], user, pass)
if err != nil {
return nil, liberrors.ErrClientAuthSetup{Err: err}
}
c.sender = sender
return c.do(req, skipResponse)
}
return res, nil
}
func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
for _, ct := range c.medias {
lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime)
if lft != 0 {
return false
}
lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime)
if lft != 0 {
return false
}
}
return true
}
func (c *Client) isInUDPTimeout() bool {
now := c.timeNow()
for _, ct := range c.medias {
lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
}
return true
}
func (c *Client) isInTCPTimeout() bool {
now := c.timeNow()
lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0)
return now.Sub(lft) >= c.ReadTimeout
}
func (c *Client) doCheckTimeout() error {
if *c.effectiveTransport == TransportUDP ||
*c.effectiveTransport == TransportUDPMulticast {
if c.checkTimeoutInitial && !c.backChannelSetupped {
c.checkTimeoutInitial = false
if c.atLeastOneUDPPacketHasBeenReceived() {
err := c.trySwitchingProtocol()
if err != nil {
return err
}
}
} else if c.isInUDPTimeout() {
return liberrors.ErrClientUDPTimeout{}
}
} else if c.isInTCPTimeout() {
return liberrors.ErrClientTCPTimeout{}
}
return nil
}
func (c *Client) doKeepAlive() error {
// some cameras do not reply to keepalives, do not wait for responses.
_, err := c.do(&base.Request{
Method: func() base.Method {
// the VLC integrated rtsp server requires GET_PARAMETER
if c.useGetParameter {
return base.GetParameter
}
return base.Options
}(),
// use the stream base URL, otherwise some cameras do not reply
URL: c.baseURL,
}, true)
return err
}
func (c *Client) doOptions(u *base.URL) (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStateInitial: {},
clientStatePrePlay: {},
clientStatePreRecord: {},
})
if err != nil {
return nil, err
}
err = c.connOpen()
if err != nil {
return nil, err
}
res, err := c.do(&base.Request{
Method: base.Options,
URL: u,
}, false)
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
// since this method is not implemented by every RTSP server,
// return an error only if status code is not 404
if res.StatusCode == base.StatusNotFound {
return res, nil
}
return nil, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage}
}
c.optionsSent = true
c.useGetParameter = supportsGetParameter(res.Header)
return res, nil
}
// Options sends an OPTIONS request.
func (c *Client) Options(u *base.URL) (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.chOptions <- optionsReq{url: u, res: cres}:
res := <-cres
return res.res, res.err
case <-c.done:
return nil, c.closeError
}
}
func (c *Client) doDescribe(u *base.URL) (*description.Session, *base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStateInitial: {},
clientStatePrePlay: {},
clientStatePreRecord: {},
})
if err != nil {
return nil, nil, err
}
err = c.connOpen()
if err != nil {
return nil, nil, err
}
header := base.Header{
"Accept": base.HeaderValue{"application/sdp"},
}
if c.RequestBackChannels {
header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"}
}
res, err := c.do(&base.Request{
Method: base.Describe,
URL: u,
Header: header,
}, false)
if err != nil {
return nil, nil, err
}
if res.StatusCode != base.StatusOK {
// redirect
if res.StatusCode >= base.StatusMovedPermanently &&
res.StatusCode <= base.StatusUseProxy &&
len(res.Header["Location"]) == 1 {
c.reset()
ru, err := base.ParseURL(res.Header["Location"][0])
if err != nil {
return nil, nil, err
}
if u.User != nil {
ru.User = u.User
}
c.connURL = &base.URL{
Scheme: ru.Scheme,
Host: ru.Host,
}
return c.doDescribe(ru)
}
return nil, res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage}
}
ct, ok := res.Header["Content-Type"]
if !ok || len(ct) != 1 {
return nil, nil, liberrors.ErrClientContentTypeMissing{}
}
// strip encoding information from Content-Type header
ct = base.HeaderValue{strings.Split(ct[0], ";")[0]}
if ct[0] != "application/sdp" {
return nil, nil, liberrors.ErrClientContentTypeUnsupported{CT: ct}
}
var ssd sdp.SessionDescription
err = ssd.Unmarshal(res.Body)
if err != nil {
return nil, nil, liberrors.ErrClientSDPInvalid{Err: err}
}
var desc description.Session
err = desc.Unmarshal(&ssd)
if err != nil {
return nil, nil, liberrors.ErrClientSDPInvalid{Err: err}
}
baseURL, err := findBaseURL(&ssd, res, u)
if err != nil {
return nil, nil, err
}
desc.BaseURL = baseURL
c.lastDescribeURL = u
return &desc, res, nil
}
// Describe sends a DESCRIBE request.
func (c *Client) Describe(u *base.URL) (*description.Session, *base.Response, error) {
cres := make(chan clientRes)
select {
case c.chDescribe <- describeReq{url: u, res: cres}:
res := <-cres
return res.sd, res.res, res.err
case <-c.done:
return nil, nil, c.closeError
}
}
func (c *Client) doAnnounce(u *base.URL, desc *description.Session) (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStateInitial: {},
})
if err != nil {
return nil, err
}
err = c.connOpen()
if err != nil {
return nil, err
}
prepareForAnnounce(desc)
byts, err := desc.Marshal(false)
if err != nil {
return nil, err
}
res, err := c.do(&base.Request{
Method: base.Announce,
URL: u,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: byts,
}, false)
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, liberrors.ErrClientBadStatusCode{
Code: res.StatusCode, Message: res.StatusMessage,
}
}
c.baseURL = u.Clone()
c.state = clientStatePreRecord
return res, nil
}
// Announce sends an ANNOUNCE request.
func (c *Client) Announce(u *base.URL, desc *description.Session) (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.chAnnounce <- announceReq{url: u, desc: desc, res: cres}:
res := <-cres
return res.res, res.err
case <-c.done:
return nil, c.closeError
}
}
func (c *Client) doSetup(
baseURL *base.URL,
medi *description.Media,
rtpPort int,
rtcpPort int,
) (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStateInitial: {},
clientStatePrePlay: {},
clientStatePreRecord: {},
})
if err != nil {
return nil, err
}
err = c.connOpen()
if err != nil {
return nil, err
}
if c.baseURL != nil && *baseURL != *c.baseURL {
return nil, liberrors.ErrClientCannotSetupMediasDifferentURLs{}
}
th := headers.Transport{
Mode: func() *headers.TransportMode {
if c.state == clientStatePreRecord {
v := headers.TransportModeRecord
return &v
}
v := headers.TransportModePlay
return &v
}(),
}
cm := &clientMedia{
c: c,
onPacketRTCP: func(rtcp.Packet) {},
}
if c.effectiveTransport == nil {
if c.connURL.Scheme == "rtsps" { // always use TCP if encrypted
v := TransportTCP
c.effectiveTransport = &v
} else if c.Transport != nil { // take transport from config
c.effectiveTransport = c.Transport
}
}
var desiredTransport Transport
if c.effectiveTransport != nil {
desiredTransport = *c.effectiveTransport
} else {
desiredTransport = TransportUDP
}
switch desiredTransport {
case TransportUDP:
if (rtpPort == 0 && rtcpPort != 0) ||
(rtpPort != 0 && rtcpPort == 0) {
return nil, liberrors.ErrClientUDPPortsZero{}
}
if rtpPort != 0 && rtcpPort != (rtpPort+1) {
return nil, liberrors.ErrClientUDPPortsNotConsecutive{}
}
err := cm.allocateUDPListeners(
false,
nil,
net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)),
net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)),
)
if err != nil {
return nil, err
}
v1 := headers.TransportDeliveryUnicast
th.Delivery = &v1
th.Protocol = headers.TransportProtocolUDP
th.ClientPorts = &[2]int{cm.udpRTPListener.port(), cm.udpRTCPListener.port()}
case TransportUDPMulticast:
v1 := headers.TransportDeliveryMulticast
th.Delivery = &v1
th.Protocol = headers.TransportProtocolUDP
case TransportTCP:
v1 := headers.TransportDeliveryUnicast
th.Delivery = &v1
th.Protocol = headers.TransportProtocolTCP
ch := c.findFreeChannelPair()
th.InterleavedIDs = &[2]int{ch, ch + 1}
}
mediaURL, err := medi.URL(baseURL)
if err != nil {
cm.close()
return nil, err
}
header := base.Header{
"Transport": th.Marshal(),
}
if medi.IsBackChannel {
header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"}
}
res, err := c.do(&base.Request{
Method: base.Setup,
URL: mediaURL,
Header: header,
}, false)
if err != nil {
cm.close()
return nil, err
}
if res.StatusCode != base.StatusOK {
cm.close()
// switch transport automatically
if res.StatusCode == base.StatusUnsupportedTransport &&
c.effectiveTransport == nil {
c.OnTransportSwitch(liberrors.ErrClientSwitchToTCP2{})
v := TransportTCP
c.effectiveTransport = &v
return c.doSetup(baseURL, medi, 0, 0)
}
return nil, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage}
}
var thRes headers.Transport
err = thRes.Unmarshal(res.Header["Transport"])
if err != nil {
cm.close()
return nil, liberrors.ErrClientTransportHeaderInvalid{Err: err}
}
switch desiredTransport {
case TransportUDP, TransportUDPMulticast:
if thRes.Protocol == headers.TransportProtocolTCP {
cm.close()
// switch transport automatically
if c.effectiveTransport == nil &&
c.Transport == nil {
c.baseURL = baseURL
return c.trySwitchingProtocol2(medi, baseURL)
}
return nil, liberrors.ErrClientServerRequestedTCP{}
}
}
switch desiredTransport {
case TransportUDP:
if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast {
cm.close()
return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{}
}
serverPortsValid := thRes.ServerPorts != nil && !isAnyPort(thRes.ServerPorts[0]) && !isAnyPort(thRes.ServerPorts[1])
if (c.state == clientStatePreRecord || !c.AnyPortEnable) && !serverPortsValid {
cm.close()
return nil, liberrors.ErrClientServerPortsNotProvided{}
}
var readIP net.IP
if thRes.Source != nil {
readIP = *thRes.Source
} else {
readIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP
}
if serverPortsValid {
if !c.AnyPortEnable {
cm.udpRTPListener.readPort = thRes.ServerPorts[0]
}
cm.udpRTPListener.writeAddr = &net.UDPAddr{
IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP,
Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone,
Port: thRes.ServerPorts[0],
}
}
cm.udpRTPListener.readIP = readIP
if serverPortsValid {
if !c.AnyPortEnable {
cm.udpRTCPListener.readPort = thRes.ServerPorts[1]
}
cm.udpRTCPListener.writeAddr = &net.UDPAddr{
IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP,
Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone,
Port: thRes.ServerPorts[1],
}
}
cm.udpRTCPListener.readIP = readIP
case TransportUDPMulticast:
if thRes.Delivery == nil || *thRes.Delivery != headers.TransportDeliveryMulticast {
return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{}
}
if thRes.Ports == nil {
return nil, liberrors.ErrClientTransportHeaderNoPorts{}
}
if thRes.Destination == nil {
return nil, liberrors.ErrClientTransportHeaderNoDestination{}
}
var readIP net.IP
if thRes.Source != nil {
readIP = *thRes.Source
} else {
readIP = c.nconn.RemoteAddr().(*net.TCPAddr).IP
}
err := cm.allocateUDPListeners(
true,
readIP,
net.JoinHostPort(thRes.Destination.String(), strconv.FormatInt(int64(thRes.Ports[0]), 10)),
net.JoinHostPort(thRes.Destination.String(), strconv.FormatInt(int64(thRes.Ports[1]), 10)),
)
if err != nil {
return nil, err
}
cm.udpRTPListener.readIP = readIP
cm.udpRTPListener.readPort = thRes.Ports[0]
cm.udpRTPListener.writeAddr = &net.UDPAddr{
IP: *thRes.Destination,
Port: thRes.Ports[0],
}
cm.udpRTCPListener.readIP = readIP
cm.udpRTCPListener.readPort = thRes.Ports[1]
cm.udpRTCPListener.writeAddr = &net.UDPAddr{
IP: *thRes.Destination,
Port: thRes.Ports[1],
}
case TransportTCP:
if thRes.Protocol != headers.TransportProtocolTCP {
return nil, liberrors.ErrClientServerRequestedUDP{}
}
if thRes.Delivery != nil && *thRes.Delivery != headers.TransportDeliveryUnicast {
return nil, liberrors.ErrClientTransportHeaderInvalidDelivery{}
}
if thRes.InterleavedIDs == nil {
return nil, liberrors.ErrClientTransportHeaderNoInterleavedIDs{}
}
if (thRes.InterleavedIDs[0] + 1) != thRes.InterleavedIDs[1] {
return nil, liberrors.ErrClientTransportHeaderInvalidInterleavedIDs{}
}
if c.isChannelPairInUse(thRes.InterleavedIDs[0]) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrClientTransportHeaderInterleavedIDsInUse{}
}
cm.tcpChannel = thRes.InterleavedIDs[0]
}
if c.medias == nil {
c.medias = make(map[*description.Media]*clientMedia)
}
c.medias[medi] = cm
cm.setMedia(medi)
c.baseURL = baseURL
c.effectiveTransport = &desiredTransport
if medi.IsBackChannel {
c.backChannelSetupped = true
} else {
c.stdChannelSetupped = true
}
if c.state == clientStateInitial {
c.state = clientStatePrePlay
}
return res, nil
}
func (c *Client) isChannelPairInUse(channel int) bool {
for _, cm := range c.medias {
if (cm.tcpChannel+1) == channel || cm.tcpChannel == channel || cm.tcpChannel == (channel+1) {
return true
}
}
return false
}
func (c *Client) findFreeChannelPair() int {
for i := 0; ; i += 2 { // prefer even channels
if !c.isChannelPairInUse(i) {
return i
}
}
}
// Setup sends a SETUP request.
// rtpPort and rtcpPort are used only if transport is UDP.
// if rtpPort and rtcpPort are zero, they are chosen automatically.
func (c *Client) Setup(
baseURL *base.URL,
media *description.Media,
rtpPort int,
rtcpPort int,
) (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.chSetup <- setupReq{
baseURL: baseURL,
media: media,
rtpPort: rtpPort,
rtcpPort: rtcpPort,
res: cres,
}:
res := <-cres
return res.res, res.err
case <-c.done:
return nil, c.closeError
}
}
// SetupAll setups all the given medias.
func (c *Client) SetupAll(baseURL *base.URL, medias []*description.Media) error {
for _, m := range medias {
_, err := c.Setup(baseURL, m, 0, 0)
if err != nil {
return err
}
}
return nil
}
func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStatePrePlay: {},
})
if err != nil {
return nil, err
}
c.state = clientStatePlay
c.startReadRoutines()
// Range is mandatory in Parrot Streaming Server
if ra == nil {
ra = &headers.Range{
Value: &headers.RangeNPT{
Start: 0,
},
}
}
header := base.Header{
"Range": ra.Marshal(),
}
if c.backChannelSetupped {
header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"}
}
res, err := c.do(&base.Request{
Method: base.Play,
URL: c.baseURL,
Header: header,
}, false)
if err != nil {
c.stopReadRoutines()
c.state = clientStatePrePlay
return nil, err
}
if res.StatusCode != base.StatusOK {
c.stopReadRoutines()
c.state = clientStatePrePlay
return nil, liberrors.ErrClientBadStatusCode{
Code: res.StatusCode, Message: res.StatusMessage,
}
}
// open the firewall by sending empty packets to the counterpart.
// do this before sending the request.
// don't do this with multicast, otherwise the RTP packet is going to be broadcasted
// to all listeners, including us, messing up the stream.
if *c.effectiveTransport == TransportUDP {
for _, cm := range c.medias {
byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal()
cm.udpRTPListener.write(byts) //nolint:errcheck
byts, _ = (&rtcp.ReceiverReport{}).Marshal()
cm.udpRTCPListener.write(byts) //nolint:errcheck
}
}
c.startWriter()
c.lastRange = ra
return res, nil
}
// Play sends a PLAY request.
// This can be called only after Setup().
func (c *Client) Play(ra *headers.Range) (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.chPlay <- playReq{ra: ra, res: cres}:
res := <-cres
return res.res, res.err
case <-c.done:
return nil, c.closeError
}
}
func (c *Client) doRecord() (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStatePreRecord: {},
})
if err != nil {
return nil, err
}
c.state = clientStateRecord
c.startReadRoutines()
res, err := c.do(&base.Request{
Method: base.Record,
URL: c.baseURL,
}, false)
if err != nil {
c.stopReadRoutines()
c.state = clientStatePreRecord
return nil, err
}
if res.StatusCode != base.StatusOK {
c.stopReadRoutines()
c.state = clientStatePreRecord
return nil, liberrors.ErrClientBadStatusCode{
Code: res.StatusCode, Message: res.StatusMessage,
}
}
c.startWriter()
return nil, nil
}
// Record sends a RECORD request.
// This can be called only after Announce() and Setup().
func (c *Client) Record() (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.chRecord <- recordReq{res: cres}:
res := <-cres
return res.res, res.err
case <-c.done:
return nil, c.closeError
}
}
func (c *Client) doPause() (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStatePlay: {},
clientStateRecord: {},
})
if err != nil {
return nil, err
}
c.stopWriter()
res, err := c.do(&base.Request{
Method: base.Pause,
URL: c.baseURL,
}, false)
if err != nil {
c.startWriter()
return nil, err
}
if res.StatusCode != base.StatusOK {
c.startWriter()
return nil, liberrors.ErrClientBadStatusCode{
Code: res.StatusCode, Message: res.StatusMessage,
}
}
c.stopReadRoutines()
switch c.state {
case clientStatePlay:
c.state = clientStatePrePlay
case clientStateRecord:
c.state = clientStatePreRecord
}
return res, nil
}
// Pause sends a PAUSE request.
// This can be called only after Play() or Record().
func (c *Client) Pause() (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.chPause <- pauseReq{res: cres}:
res := <-cres
return res.res, res.err
case <-c.done:
return nil, c.closeError
}
}
// Seek asks the server to re-start the stream from a specific timestamp.
func (c *Client) Seek(ra *headers.Range) (*base.Response, error) {
_, err := c.Pause()
if err != nil {
return nil, err
}
return c.Play(ra)
}
// OnPacketRTPAny sets the callback that is called when a RTP packet is read from any setupped media.
func (c *Client) OnPacketRTPAny(cb OnPacketRTPAnyFunc) {
for _, cm := range c.medias {
cmedia := cm.media
for _, forma := range cm.media.Formats {
c.OnPacketRTP(cm.media, forma, func(pkt *rtp.Packet) {
cb(cmedia, forma, pkt)
})
}
}
}
// OnPacketRTCPAny sets the callback that is called when a RTCP packet is read from any setupped media.
func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
for _, cm := range c.medias {
cmedia := cm.media
c.OnPacketRTCP(cm.media, func(pkt rtcp.Packet) {
cb(cmedia, pkt)
})
}
}
// OnPacketRTP sets the callback that is called when a RTP packet is read.
func (c *Client) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) {
cm := c.medias[medi]
ct := cm.formats[forma.PayloadType()]
ct.onPacketRTP = cb
}
// OnPacketRTCP sets the callback that is called when a RTCP packet is read.
func (c *Client) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) {
cm := c.medias[medi]
cm.onPacketRTCP = cb
}
// WritePacketRTP writes a RTP packet to the server.
func (c *Client) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error {
return c.WritePacketRTPWithNTP(medi, pkt, c.timeNow())
}
// WritePacketRTPWithNTP writes a RTP packet to the server.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, c.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
select {
case <-c.done:
return c.closeError
default:
}
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.writePacketRTP(byts, pkt, ntp)
}
// WritePacketRTCP writes a RTCP packet to the server.
func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return err
}
select {
case <-c.done:
return c.closeError
default:
}
cm := c.medias[medi]
return cm.writePacketRTCP(byts)
}
// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder.Decode(ct.format, pkt)
}
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from RTCP sender reports.
func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.rtcpReceiver.PacketNTP(pkt.Timestamp)
}
func (c *Client) readResponse(res *base.Response) {
c.chReadResponse <- res
}
func (c *Client) readRequest(req *base.Request) {
c.chReadRequest <- req
}
func (c *Client) readError(err error) {
c.chReadError <- err
}
Go
1
https://gitee.com/likunde_admin/gortsplib.git
git@gitee.com:likunde_admin/gortsplib.git
likunde_admin
gortsplib
gortsplib
main

搜索帮助